Source code for hyeenna.analysis

import os
import itertools
import numpy as np
import pandas as pd
import xarray as xr
import scipy.stats as st
from joblib import Parallel, delayed
from .estimators import entropy
from .estimators import transfer_entropy as te
from .estimators import conditional_transfer_entropy as cte


def estimate_timescales(X: np.ndarray, Y: np.ndarray, lag_list: list,
                        window_list: list,
                        sample_size: int=5000) -> xr.DataArray:
    """
    Compute the transfer entropy (TE) over a range of lag counts
    and window sizes.

    Parameters
    ----------
    X: np.array
        Source data
    Y: np.array
        Target data
    lag_list: list
        A list enumerating the lag counts to compute TE with
    window_list: list
        A list enumerating the window widths to compute TE with
    sample_size: int
        Number of samples to use when computing TE

    Returns
    -------
    out: xr.DataArray
        A DataArray containing the computed transfer entropies for every
        combination of lag and window given in the input parameters
    """
    out = xr.DataArray(np.zeros((len(lag_list), len(window_list))),
                       coords={'lag': lag_list, 'window': window_list},
                       dims=('lag', 'window'))
    for l, w in itertools.product(lag_list, window_list):
        ss = np.min([sample_size, (len(X)-l-w)//2])
        max_start = len(X) - l - w - ss
        si = np.random.randint(0, max_start)
        Xs, Ys = X[si:si+ss], Y[si:si+ss]
        out.loc[{'lag': l, 'window': w}] = te(Xs, Ys, l, 1, w, 1)
    return out

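# Illustrative usage sketch for `estimate_timescales` (not part of the
# original module). The synthetic lagged-noise series below are
# assumptions made purely for demonstration.
def _example_estimate_timescales():
    rng = np.random.RandomState(42)
    x = rng.normal(size=20000)
    # Target copies the source with a 3-step delay plus noise
    y = np.roll(x, 3) + 0.25 * rng.normal(size=20000)
    scores = estimate_timescales(x, y,
                                 lag_list=[1, 2, 3, 4, 5],
                                 window_list=[1, 2, 3],
                                 sample_size=5000)
    # `scores` is an xarray.DataArray indexed by ('lag', 'window');
    # for this construction the largest values should sit near lag=3.
    return scores
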
def _run_one_estimator_stats(estimator, data, params, sample_size):
    """
    The kernel function of the `estimator_stats` function.
    Runs a single instance of the estimator.
    """
    # Setting seed to None keeps joblib from blowing up
    np.random.seed(None)
    X = list(data.values())[0]
    l, w = params.get('l', 1), params.get('omega', 1)
    ss = np.min([sample_size, (len(X)-l-w)])
    max_start = len(X) - l - w - ss
    if max_start < 1:
        raise ValueError("Maximum start index is less than 1!" + os.linesep
                         + "Computed value {}".format(max_start) + os.linesep
                         + "Components leading to this error:" + os.linesep
                         + "  Data length: {}".format(len(X)) + os.linesep
                         + "  Lag size: {}".format(l) + os.linesep
                         + "  Window size: {}".format(w) + os.linesep
                         + "  Sample size: {}".format(ss))
    # Draw a random contiguous subsample from each input variable
    si = np.random.randint(0, max_start)
    data2 = {k: v[:][si:si+ss] for k, v in data.items()}
    return estimator(**data2, **params)

def estimator_stats(estimator: callable, data: dict, params: dict,
                    nruns: int=10, sample_size: int=3000) -> dict:
    """
    Compute some statistics about a given estimator.

    Parameters
    ----------
    estimator: callable
        The estimator to compute statistics on.
        Suggested to be from the HYEENNA library.
    data: dict
        Input data to feed into the estimator
    params: dict
        Parameters to feed into the estimator
    nruns: int (default: 10)
        Number of times to run the estimator.
    sample_size: int (default: 3000)
        Size of sample to draw from `data` to feed into the estimator

    Returns
    -------
    stats: dict
        A dictionary containing sample statistics along with the actual
        results from each run of the estimator.
    """
    results = Parallel(n_jobs=nruns)(delayed(_run_one_estimator_stats)(
        estimator, data, params, sample_size) for i in range(nruns))
    stats = {'mean': np.mean(results),
             'median': np.median(results),
             'variance': np.var(results, ddof=1),
             'max': np.max(results),
             'min': np.min(results),
             'results': results}
    return stats

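# Illustrative usage sketch for `estimator_stats` (not part of the
# original module). The coupled series are assumptions, and the keyword
# names passed to the transfer entropy estimator ('X', 'Y', 'tau',
# 'omega', 'k', 'l') mirror how `estimate_info_transfer_network` below
# invokes `te`.
def _example_estimator_stats():
    rng = np.random.RandomState(0)
    x = rng.normal(size=(20000, 1))
    y = np.roll(x, 1, axis=0) + 0.5 * rng.normal(size=(20000, 1))
    stats = estimator_stats(estimator=te,
                            data={'X': x, 'Y': y},
                            params={'tau': 1, 'omega': 1, 'k': 1, 'l': 1},
                            nruns=10,
                            sample_size=3000)
    # 'mean', 'median', 'variance', 'max', and 'min' summarize the
    # repeated estimates; 'results' holds the individual runs.
    return stats
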
def _run_one_shuffle_test(estimator, data, params, sample_size):
    """
    The kernel function of the `shuffle_test` function. Runs a single
    instance of the estimator on a shuffled random surrogate.
    """
    np.random.seed(None)
    X = list(data.values())[0]
    l, w = params.get('l', 1), params.get('omega', 1)
    ss = np.min([sample_size, (len(X)-l-w)//2])
    max_start = len(X) - l - w - ss
    if max_start < 1:
        raise ValueError("Maximum start index is less than 1!" + os.linesep
                         + "Computed value {}".format(max_start) + os.linesep
                         + "Components leading to this error:" + os.linesep
                         + "  Data length: {}".format(len(X)) + os.linesep
                         + "  Lag size: {}".format(l) + os.linesep
                         + "  Window size: {}".format(w) + os.linesep
                         + "  Sample size: {}".format(ss))
    si = np.random.randint(0, max_start)
    data2 = {key: val[:][si:si+ss].copy() for key, val in data.items()}
    # Shuffle each variable independently to destroy temporal structure
    # and produce a surrogate under the null hypothesis
    for key, val in data2.items():
        np.random.shuffle(val)
    return estimator(**data2, **params)

def shuffle_test(estimator: callable, data: dict, params: dict,
                 confidence: float=0.99, nruns: int=10,
                 sample_size: int=3000) -> dict:
    """
    Compute a one-tailed Z test against a sample of shuffled surrogates.

    Parameters
    ----------
    estimator: callable
        The estimator to compute statistics on.
        Suggested to be from the HYEENNA library.
    data: dict
        Input data to feed into the estimator
    params: dict
        Parameters to feed into the estimator
    confidence: float (default: 0.99)
        Confidence level to conduct the test at.
    nruns: int (default: 10)
        Number of times to run the estimator.
    sample_size: int (default: 3000)
        Size of sample to draw from `data` to feed into the estimator

    Returns
    -------
    stats: dict
        A dictionary with statistics from the standard `estimator_stats`
        function along with statistics computed on the shuffled
        surrogates. Most important are the 'test_value' and
        'significant' keys, which hold the value the test is performed
        on and whether the result was statistically significant at the
        given confidence level.
    """
    stats = estimator_stats(estimator, data, params, nruns, sample_size)
    stats['test_value'] = np.random.choice(stats['results'])
    shuffled_te = Parallel(n_jobs=nruns)(delayed(_run_one_shuffle_test)(
        estimator, data, params, sample_size) for i in range(nruns))
    stats['shuffled_results'] = shuffled_te
    stats['ci'] = [np.percentile(shuffled_te, 1),
                   np.percentile(shuffled_te, 99)]
    stats['shuffled_median'] = np.median(shuffled_te)
    stats['shuffled_mean'] = np.mean(shuffled_te)
    stats['shuffled_variance'] = np.var(shuffled_te, ddof=1)
    # One-tailed Z test threshold: surrogate mean plus the z-score at
    # the requested confidence times the surrogate standard deviation
    stats['shuffled_thresh'] = (stats['shuffled_mean']
                                + st.norm.ppf(confidence)
                                * np.sqrt(stats['shuffled_variance']))
    stats['significant'] = stats['test_value'] > stats['shuffled_thresh']
    return stats

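# Illustrative usage sketch for `shuffle_test` (not part of the original
# module), here with the conditional transfer entropy estimator. The
# synthetic chain z -> x -> y is an assumption, and the keyword names
# ('X', 'Y', 'Z', 'tau', 'omega', 'nu', 'k', 'l', 'm') mirror how
# `estimate_info_transfer_network` below invokes `cte`.
def _example_shuffle_test():
    rng = np.random.RandomState(1)
    n = 20000
    z = rng.normal(size=(n, 1))
    x = 0.7 * np.roll(z, 1, axis=0) + 0.3 * rng.normal(size=(n, 1))
    y = 0.7 * np.roll(x, 1, axis=0) + 0.3 * rng.normal(size=(n, 1))
    result = shuffle_test(
        estimator=cte,
        data={'X': x, 'Y': y, 'Z': z},
        params={'tau': 1, 'omega': 1, 'nu': 1, 'k': 1, 'l': 1, 'm': 1},
        confidence=0.99, nruns=10, sample_size=3000)
    # 'significant' reports whether the unshuffled estimate exceeds the
    # surrogate-based threshold at the requested confidence level.
    return result['test_value'], result['significant']
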
def estimate_info_transfer_network(varlist: list, names: list,
                                   tau: int=1, omega: int=1, nu: int=1,
                                   k: int=1, l: int=1, m: int=1,
                                   condition: bool=True, nruns: int=10,
                                   sample_size: int=3000) -> pd.DataFrame:
    """
    Compute the pairwise transfer entropy for a list of given variables,
    resulting in an information transfer network.

    Parameters
    ----------
    varlist: list
        List of given variable data
    names: list
        List of names corresponding to the data given in `varlist`
    tau: int (default=1)
        Lag value for source variables
    omega: int (default=1)
        Lag value for conditioning target variable history
    nu: int (default=1)
        Lag value for conditioning source variable histories
    k: int (default=1)
        Window length for source variables (applied to the same
        variable as the `tau` parameter)
    l: int (default=1)
        Window length for target variable histories (applied to the
        same variable as the `omega` parameter)
    m: int (default=1)
        Window length for source conditioning variables (applied to the
        same variable as the `nu` parameter)
    condition: bool (default=True)
        Whether to condition on all variables, or just the target
        variable history.
    nruns: int (default=10)
        Number of estimates to compute for each connection.
        The mean value is reported.
    sample_size: int (default=3000)
        Size of samples to take during estimation of transfer entropy.

    Returns
    -------
    df: pd.DataFrame
        Dataframe representing the information transfer network.
        Both rows and columns are populated with the given `names`.
    """
    # Calculate all needed variable combinations
    mapping = {n: d for n, d in zip(names, varlist)}
    permutations = [list(p) for p in itertools.permutations(names, 2)]
    for combo in permutations:
        # Append the remaining variables as conditioning candidates
        combo.extend([n for n in names if n not in combo])

    # Subsample data and put it together with combination list
    analysis_sets = []
    for combo in permutations:
        analysis_sets.append([mapping[c] for c in combo])

    # Compute scores
    scores = []
    params = {'tau': tau, 'omega': omega, 'nu': nu, 'k': k, 'l': l, 'm': m}
    for c, s in zip(permutations, analysis_sets):
        if condition:
            X = np.array(s[0]).reshape(-1, 1)
            Y = np.array(s[1]).reshape(-1, 1)
            Z = np.array(s[2:]).T
            args = {'estimator': cte,
                    'data': {'X': X, 'Y': Y, 'Z': Z},
                    'params': params,
                    'nruns': nruns,
                    'sample_size': sample_size}
        else:
            X = np.array(s[0]).reshape(-1, 1)
            Y = np.array(s[1]).reshape(-1, 1)
            args = {'estimator': te,
                    'data': {'X': X, 'Y': Y},
                    'params': params,
                    'nruns': nruns,
                    'sample_size': sample_size}
        res = shuffle_test(**args)
        scores.append(res['mean'] * res['significant'])

    # Reformat into a dataframe
    df = pd.DataFrame(columns=names, index=names)
    for link, score in zip(permutations, scores):
        if score < 1e-4:
            score = 0
        df.loc[link[0], link[1]] = score
    # Diagonal: entropy of each variable minus its total outgoing
    # transfer entropy
    for name, var in zip(names, varlist):
        e_tot = entropy(var)
        tot_exp = df.loc[name, :].sum()
        df.loc[name, name] = e_tot - tot_exp
    return df

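# Illustrative usage sketch for `estimate_info_transfer_network` (not
# part of the original module). The three coupled series forming the
# chain a -> b -> c are synthetic assumptions; a real application would
# substitute observed data.
def _example_estimate_info_transfer_network():
    rng = np.random.RandomState(7)
    n = 20000
    a = rng.normal(size=n)
    b = 0.8 * np.roll(a, 1) + 0.2 * rng.normal(size=n)
    c = 0.8 * np.roll(b, 1) + 0.2 * rng.normal(size=n)
    network = estimate_info_transfer_network(
        varlist=[a, b, c],
        names=['a', 'b', 'c'],
        tau=1, omega=1, nu=1, k=1, l=1, m=1,
        condition=True, nruns=10, sample_size=3000)
    # Rows are sources and columns are targets; the diagonal holds the
    # entropy of each variable not accounted for by its estimated
    # outgoing transfers.
    return network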