from .regressors import Event, Confound, Intercept
import numpy as np
import pandas as pd
from sklearn import linear_model
import scipy as sp
from .plotting import plot_timecourses
from nilearn import input_data, image
from nilearn._utils import load_niimg
[docs]class ResponseFitter(object):
"""ResponseFitter takes an input signal and performs deconvolution on it.
To do this, it requires event times, and possible covariates.
ResponseFytter can, for each event_type, use different basis function sets,
see Event."""
def __init__(self,
input_signal,
sample_rate,
oversample_design_matrix=20,
add_intercept=True, **kwargs):
""" Initialize a ResponseFitter object.
Parameters
----------
input_signal : numpy array, dimensions preferably (X, n)
input data, of X timeseries of n timepoints
sampled at the frequency at which we would
like to conduct this analysis
sample_rate : float
frequency in Hz at which input data are sampled
**kwargs : dict
keyward arguments to be internalized by the ResponseFitter object
"""
super(ResponseFitter, self).__init__()
self.__dict__.update(kwargs)
self.sample_rate = sample_rate
self.sample_duration = 1.0/self.sample_rate
self.oversample_design_matrix = oversample_design_matrix
self.input_signal_time_points = np.linspace(0,
input_signal.shape[0] * self.sample_duration,
input_signal.shape[0],
endpoint=False)
self.input_signal = pd.DataFrame(input_signal)
self.input_signal.index = pd.Index(self.input_signal_time_points,
name='time')
self.X = pd.DataFrame(index=self.input_signal.index)
if add_intercept:
self.add_intercept()
self.events = {}
def add_intercept(self, name='intercept'):
intercept = Intercept(name, self)
self._add_regressor(intercept)
[docs] def add_confounds(self, name, confound):
"""Add a timeseries or set of timeseries to the general
design matrix as a confound
Parameters
----------
confound : array
Confound of (n_timepoints) or (n_timepoints, n_confounds)
"""
confound = Confound(name, self, confound)
self._add_regressor(confound)
def _add_regressor(self, regressor, oversample=1):
regressor.create_design_matrix(oversample=oversample)
if self.X.shape[1] == 0:
self.X = pd.concat((regressor.X, self.X), 1)
else:
self.X = pd.concat((self.X, regressor.X), 1)
[docs] def add_event(self,
event_name,
onset_times=None,
basis_set='fir',
interval=[0,10],
n_regressors=None,
durations=None,
covariates=None,
**kwargs):
"""
create design matrix for a given event_type.
Parameters
----------
event_name : string
Name of the event_type, used as key to lookup this event_type's
characteristics
**kwargs : dict
keyward arguments to be internalized by the generated and
internalized Event object. Needs to consist of the
necessary arguments to create an Event object,
see Event constructor method.
"""
assert event_name not in self.X.columns.get_level_values(0), "The event_name %s is already in use" % event_name
ev = Event(name=event_name,
onset_times=onset_times,
basis_set=basis_set,
interval=interval,
n_regressors=n_regressors,
durations=durations,
covariates=covariates,
fitter=self,
**kwargs)
self._add_regressor(ev)
self.events[event_name] = ev
[docs] def regress(self, type='ols', cv=20, alphas=None, store_residuals=False):
"""Regress a created design matrix on the input_data.
Creates internal variables betas, residuals, rank and s.
The beta values are then injected into the event_type objects the
ResponseFitter contains.
Parameters
----------
type : string, optional
the type of fit to be done. Options are 'ols' for np.linalg.lstsq,
'ridge' for CV ridge regression.
"""
if type == 'ols':
self.betas, self.ssquares, self.rank, self.s = \
np.linalg.lstsq(self.X, self.input_signal, rcond=-1)
self._send_betas_to_regressors()
if store_residuals:
self._residuals = self.input_signal - self.predict_from_design_matrix()
elif type == 'ridge': # betas and residuals are internalized by ridge_regress
if self.input_signal.shape[1] > 1:
raise NotImplementedError('No support for multidimensional signals yet')
self.ridge_regress(cv=cv, alphas=alphas, store_residuals=store_residuals)
[docs] def ridge_regress(self, cv=20, alphas=None, store_residuals=False):
"""
run CV ridge regression instead of ols fit. Uses sklearn's RidgeCV class
Parameters
----------
cv : int
number of cross-validation folds
alphas : np.array
the alpha/lambda values to try out in the CV ridge regression
"""
if alphas is None:
alphas = np.logspace(7, 0, 20)
self.rcv = linear_model.RidgeCV(alphas=alphas,
fit_intercept=False,
cv=cv)
self.rcv.fit(self.X, self.input_signal)
self.betas = self.rcv.coef_.T
if store_residuals:
self._residuals = self.input_signal - self.rcv.predict(self.X)
self._send_betas_to_regressors()
def _send_betas_to_regressors(self):
self.betas = pd.DataFrame(self.betas,
index=self.X.columns,
columns=self.input_signal.columns)
for key in self.events:
self.events[key].betas = self.betas.loc[[key]]
[docs] def predict_from_design_matrix(self, X=None):
"""
predict a signal given a design matrix. Requires regression to have
been run.
Parameters
----------
X : np.array, (timepoints, n_regressors)
the design matrix for which to predict data.
"""
# check if we have already run the regression - which is necessary
if X is None:
X = self.X
assert hasattr(self, 'betas'), 'no betas found, please run regression before prediction'
assert X.shape[1] == self.betas.shape[0], \
"""designmatrix needs to have the same number of regressors
as the betas already calculated"""
prediction = self.X.dot(self.betas)
return prediction
def get_timecourses(self,
oversample=None,
melt=False):
assert hasattr(self, 'betas'), 'no betas found, please run regression before prediction'
if oversample is None:
oversample = self.oversample_design_matrix
timecourses = pd.DataFrame()
for event_type in self.events:
tc = self.events[event_type].get_timecourses(oversample=oversample)
timecourses = pd.concat((timecourses, tc), ignore_index=False)
if melt:
timecourses = timecourses.reset_index().melt(id_vars=['event type',
'covariate',
'time'],
var_name='roi')
return timecourses
def plot_timecourses(self,
oversample=None,
*args,
**kwargs):
tc = self.get_timecourses(melt=True,
oversample=oversample)
tc['subj_idx'] = 'dummy'
plot_timecourses(tc, *args, **kwargs)
[docs] def get_rsq(self):
"""
calculate the rsq of a given fit.
calls predict_from_design_matrix to predict the signal that has been fit
"""
assert hasattr(self, 'betas'), \
'no betas found, please run regression before rsq'
return 1 - ((self.get_residuals()**2).sum() / (self.input_signal**2).sum())
def get_residuals(self):
if not hasattr(self, '_residuals'):
return self.input_signal - self.predict_from_design_matrix()
else:
return self._residuals
[docs] def get_epochs(self, onsets, interval, remove_incomplete_epochs=True):
"""
Return a matrix corresponding to specific onsets, within a given
interval. Matrix size is (n_onsets, n_timepoints_within_interval).
Note that any events that are in the ResponseFitter-object will
be regressed out before calculating the epochs.
"""
# If no other events are defined, no need to regress them out
if self.X.shape[1] == 0:
signal = self.input_signal
else:
self.regress(store_residuals=True)
signal = self._residuals
onsets = np.array(onsets)
indices = np.array([signal.index.get_loc(onset, method='nearest') for onset in onsets + interval[0]])
interval_duration = interval[1] - interval[0]
interval_n_samples = int(interval_duration * self.sample_rate) + 1
indices = np.tile(indices[:, np.newaxis], (1, interval_n_samples)) + np.arange(interval_n_samples)
# Set elements in epochs that fall out of time series to nan
indices[indices >= signal.shape[0]] = -1
# Make dummy element to fill epochs with nans if they fall out of the timeseries
signal = pd.concat((signal, pd.DataFrame(np.zeros((1, signal.shape[1])) * np.nan,
columns=signal.columns,
index=[np.nan])),
0)
# Calculate epochs
epochs = signal.values[indices].swapaxes(-1, -2)
epochs = epochs.reshape((epochs.shape[0], np.prod(epochs.shape[1:])))
columns = pd.MultiIndex.from_product([signal.columns,
np.linspace(interval[0], interval[1], interval_n_samples)],
names=['roi', 'time'])
epochs = pd.DataFrame(epochs,
columns=columns,
index=pd.Index(onsets, name='onset'))
# Get rid of incomplete epochs:
if remove_incomplete_epochs:
epochs = epochs[~epochs.isnull().any(1)]
return epochs
def get_time_to_peak(self, oversample=None, cutoff=1.0, negative_peak=False):
if oversample is None:
oversample = self.oversample_design_matrix
return self.get_timecourses(oversample=oversample)\
.groupby(['event type', 'covariate'], as_index=False)\
.apply(get_time_to_peak_from_timecourse,
negative_peak=negative_peak,
cutoff=cutoff)\
.reset_index(level=[ -1], drop=True)\
.pivot_table(columns='area', index='peak')[['time to peak', 'prominence']]
class ConcatenatedResponseFitter(ResponseFitter):
def __init__(self, response_fitters):
self.response_fitters = response_fitters
self.X = pd.concat([rf.X for rf in self.response_fitters]).fillna(0)
for attr in ['sample_rate', 'oversample_design_matrix']:
check_properties_response_fitters(self.response_fitters, attr)
setattr(self, attr, getattr(self.response_fitters[0], attr))
self.input_signal = pd.concat([rf.input_signal for rf in self.response_fitters])
self.events = {}
for rf in self.response_fitters:
self.events.update(rf.events)
def add_intercept(self, *args, **kwargs):
raise Exception('ConcatenatedResponseFitter does not allow for adding'\
'intercepts anymore. Do this in the original response '\
'fitters that get concatenated')
def add_confounds(self, *args, **kwargs):
raise Exception('ConcatenatedResponseFitter does not allow for adding '\
'confounds. Do this in the original response '\
'fytters that get concatenated')
def add_event(self, *args, **kwargs):
raise Exception('ConcatenatedResponseFitter does not allow for adding'\
'events.')
def plot_timecourses(self,
oversample=None,
*args,
**kwargs):
tc = self.get_timecourses(melt=True,
oversample=oversample)
tc['subj_idx'] = 'dummy'
plot_timecourses(tc, *args, **kwargs)
def get_epochs(self, onsets, interval, remove_incomplete_epochs=True):
raise NotImplementedError()
def check_properties_response_fitters(response_fitters, attribute):
attribute_values = [getattr(rf, attribute) for rf in response_fitters]
assert(all([v == attribute_values[0] for v in attribute_values])), "%s not equal across response fitters!" % attribute