diff --git a/nitools/cifti.py b/nitools/cifti.py index 7d82c86..745f0af 100644 --- a/nitools/cifti.py +++ b/nitools/cifti.py @@ -4,6 +4,8 @@ import nibabel as nb import matplotlib.pyplot as plt import nitools as nt +import subprocess +import os def make_label_cifti(data, bm_axis, labels=None, @@ -26,7 +28,7 @@ def make_label_cifti(data, bm_axis, label_RGBA (list): List of rgba vectors for labels Returns: - cifti (GiftiImage): Label gifti image + cifti (GiftiImage): Label gifti image """ if data.ndim == 1: # reshape to (1, num_vertices) @@ -213,7 +215,7 @@ def split_cifti_to_giftis(cifti_img, type = "label", column_names = None): )) return gii -def volume_from_cifti(cifti, struct_names=[]): +def volume_from_cifti(cifti, struct_names=None): """ Gets the 4D nifti object containing the data for all subcortical (volume-based) structures @@ -222,7 +224,7 @@ def volume_from_cifti(cifti, struct_names=[]): cifti object containing the data struct_names (list or None): List of structure names that are included - defaults to None + defaults to None (all) Returns: nii_vol(niftiImage): nifti object containing the data @@ -232,20 +234,22 @@ def volume_from_cifti(cifti, struct_names=[]): # get the data array with all the time points, all the structures d_array = cifti.get_fdata(dtype=np.float32) - struct_names = [nb.cifti2.BrainModelAxis.to_cifti_brain_structure_name(n) for n in struct_names] + if struct_names is None: + struct_names = [a for a,_,_ in bmf.iter_structures()] + else: + struct_names = [nb.cifti2.BrainModelAxis.to_cifti_brain_structure_name(n) for n in struct_names] # initialize a matrix representing 4D data (x, y, z, time_point) vol = np.zeros(bmf.volume_shape + (d_array.shape[0],)) for idx, (nam,slc,bm) in enumerate(bmf.iter_structures()): - if (any(s in nam for s in struct_names)): + if (nam in struct_names): ijk = bm.voxel bm_data = d_array[:, slc] i = (ijk[:,0] > -1) # fill in data vol[ijk[i, 0], ijk[i, 1], ijk[i, 2], :]=bm_data[:,i].T - # save as nii nii_vol_4d = nb.Nifti1Image(vol,bmf.affine) return nii_vol_4d @@ -327,7 +331,7 @@ def smooth_cifti(cifti_input, # make up the command # overwrite the temp file created smooth_cmd = f"wb_command -cifti-smoothing 'temp.dscalar.nii' {surface_sigma} {volume_sigma} {direction} {cifti_output} -left-surface {left_surface} -right-surface {right_surface} -fix-zeros-surface" - subprocess.run(smooth_cmd, shell=True) + subprocess.run(smooth_cmd, shell=True, capture_output=True) # remove the temp file os.remove("temp.dscalar.nii") diff --git a/nitools/gifti.py b/nitools/gifti.py index 165ec6e..b377751 100644 --- a/nitools/gifti.py +++ b/nitools/gifti.py @@ -296,8 +296,9 @@ def get_gifti_colortable(gifti,ignore_zero=False): labels = labels[1:] cmap = mpl.colors.LinearSegmentedColormap.from_list('mylist', rgba, N=len(rgba)) - mpl.cm.unregister_cmap("mycolormap") - mpl.cm.register_cmap("mycolormap", cmap) + # Removed - incomplatible with newer versions of matplotlib + # mpl.cm.unregister_cmap("mycolormap") + # mpl.cm.register_cmap("mycolormap", cmap) return rgba, cmap diff --git a/nitools/spm.py b/nitools/spm.py index 549a7a7..8f66dea 100644 --- a/nitools/spm.py +++ b/nitools/spm.py @@ -12,8 +12,13 @@ from __future__ import annotations from os.path import normpath, dirname import numpy as np +import nibabel as nb import nitools as nt from scipy.io import loadmat +import pandas as pd +from scipy.stats import gamma +from scipy.special import gammaln +from scipy.linalg import block_diag class SpmGlm: @@ -35,7 +40,20 @@ def get_info_from_spm_mat(self): Args: spm_mat_path (str): _description_ """ - SPM = loadmat(f"{self.path}/SPM.mat", simplify_cells=True)['SPM'] + try: + # SPM = loadmat(f"{self.path}/SPM.mat", simplify_cells=True)['SPM'] + SPM = loadmat(f"{self.path}/SPM.mat", simplify_cells=True) + if 'SPM' in SPM: + SPM = SPM['SPM'] + spm_file_loaded_with_scipyio = True + except Exception as e: + print(e) + print( + f"Error loading SPM.mat file. The file was saved as mat-file version 7.3 (see https://www.mathworks.com/help/matlab/import_export/mat-file-versions.html). Try loading the mat-file with Matlab, and saving it as mat-file version 7.0 intead. Use this command: ") + print(f"cp {self.path}/SPM.mat {self.path}/SPM.mat.backup") + print( + f"matlab -nodesktop -nosplash -r \"load('{self.path}/SPM.mat'); save('{self.path}/SPM.mat', '-struct', 'SPM', '-v7'); exit\"") + # Get basic information from SPM.mat self.nscans = SPM['nscan'] self.nruns = len(self.nscans) @@ -45,7 +63,7 @@ def get_info_from_spm_mat(self): self.run_number = [] # Extract run number and condition name from SPM names for reg_name in SPM['xX']['name']: - s=reg_name.split(' ') + s = reg_name.split(' ') self.run_number.append(int(s[0][3:-1])) self.beta_names.append(s[1]) self.run_number = np.array(self.run_number) @@ -55,10 +73,20 @@ def get_info_from_spm_mat(self): # Get the necesssary matrices to reestimate the GLM for getting the residuals self.filter_matrices = [k['X0'] for k in SPM['xX']['K']] self.reg_of_interest = SPM['xX']['iC'] - self.design_matrix = SPM['xX']['xKXs']['X'] # Filtered and whitened design matrix - self.eff_df = SPM['xX']['erdf'] # Effective degrees of freedom - self.weight = SPM['xX']['W'] # Weight matrix for whitening - self.pinvX = SPM['xX']['pKX'] # Pseudo-inverse of (filtered and weighted) design matrix + self.design_matrix = SPM['xX']['xKXs']['X'].astype(float) # Filtered and whitened design matrix + self.eff_df = SPM['xX']['erdf'] # Effective degrees of freedom + self.weight = SPM['xX']['W'] # Weight matrix for whitening + self.pinvX = SPM['xX']['pKX'] # Pseudo-inverse of (filtered and weighted) design matrix + # self.pinvX = np.linalg.pinv(self.design_matrix) + self.gSF = SPM['xGX']['gSF'] # global scaling factor + self.X = SPM["xX"]["X"] + self.bf = SPM['xBF']['bf'] + self.Volterra = SPM['xBF']['Volterra'] + self.Sess = SPM['Sess'] + self.T = SPM["xBF"]["T"] + self.T0 = SPM["xBF"]["T0"] + + def relocate_file(self, fpath: str) -> str: """SPM file entries to current project directory and OS. @@ -76,10 +104,12 @@ def relocate_file(self, fpath: str) -> str: """ norm_fpath = fpath.replace('\\', '/') base_path = dirname(self.path).replace('\\', '/') - c = norm_fpath.find('func') - return base_path + '/' + norm_fpath[c:] + c = norm_fpath.find('imaging_data') + g = base_path.find('glm') + return base_path[:g] + norm_fpath[c:] + # return norm_fpath - def get_betas(self,mask): + def get_betas(self, mask): """ Samples the beta images of an estimated SPM GLM at the mask locations also returns the ResMS values, and the obseration descriptors (run and condition) name @@ -95,19 +125,26 @@ def get_betas(self,mask): obs_descriptors (dict): with lists reg_name and run_number (N long) """ - coords = nt.get_mask_coords(mask) + if isinstance(mask, str): + mask = nb.load(mask) + if isinstance(mask, nb.Nifti1Image): + coords = nt.get_mask_coords(mask) + elif isinstance(mask, np.ndarray) and (mask.shape[0] == 3): + coords = mask + else: + raise ValueError('Mask should be a 3xP array or coordinates, a nifti1image, or nifti file name') # Generate the list of relevant beta images: - indx = self.reg_of_interest-1 + indx = self.reg_of_interest - 1 beta_files = [f'{self.path}/{self.beta_files[i]}' for i in indx] # Get the data from beta and ResMS files rms_file = [f'{self.path}/ResMS.nii'] - data = nt.sample_images(beta_files + rms_file,coords,use_dataobj=False) + data = nt.sample_images(beta_files + rms_file, coords, use_dataobj=False) # Return the data and the observation descriptors info = {'reg_name': self.beta_names[indx], 'run_number': self.run_number[indx]} - return data[:-1,:], data[-1,:], info + return data[:-1, :], data[-1, :], info - def get_residuals(self,mask): + def get_residuals(self, mask): """ Collects 3d images of a range of GLM residuals (typical SPM GLM results) and corresponding metadata @@ -117,22 +154,32 @@ def get_residuals(self,mask): res_range (range): range of to be saved residual images per run """ # Sample the relevant time series data - coords = nt.get_mask_coords(mask) - data = nt.sample_images(self.rawdata_files,coords,use_dataobj=True) + if isinstance(mask, str): + mask = nb.load(mask) + if isinstance(mask, nb.Nifti1Image): + coords = nt.get_mask_coords(mask) + elif isinstance(mask, np.ndarray) and (mask.shape[0] == 3): + coords = mask + else: + raise ValueError('Mask should be a 3xP array or coordinates, a nifti1image, or nifti file name') + + data = nt.sample_images(self.rawdata_files, coords, use_dataobj=True) + + data = data * self.gSF[:, None] # Filter and temporal pre-whiten the data - fdata= self.spm_filter(self.weight @ data) # spm_filter + fdata = self.spm_filter(self.weight @ data) # spm_filter - # Estimate the beta coefficients abd residuals + # Estimate the beta coefficients and residuals beta = self.pinvX @ fdata residuals = fdata - self.design_matrix @ beta # Return the regressors of interest - indx = self.reg_of_interest-1 + indx = self.reg_of_interest - 1 info = {'reg_name': self.beta_names[indx], 'run_number': self.run_number[indx]} - return residuals, beta[indx,:], info + return residuals, beta[indx, :], info - def spm_filter(self,data): + def spm_filter(self, data): """ Does high pass-filtering and temporal weighting of the data (indentical to spm_filter) @@ -142,10 +189,549 @@ def spm_filter(self,data): data (ndarray): 2d array of time series data (TxP) """ scan_bounds = self.nscans.cumsum() - scan_bounds = np.insert(scan_bounds,0,0) + scan_bounds = np.insert(scan_bounds, 0, 0) fdata = data.copy() for i in range(self.nruns): - Y = fdata[scan_bounds[i]:scan_bounds[i+1],:]; - Y = Y - self.filter_matrices[i] @ (self.filter_matrices[i].T @ Y) + Y = fdata[scan_bounds[i]:scan_bounds[i + 1], :] + if self.filter_matrices[i].size > 0: + # Only apply with filter matrices are not empty + Y = Y - self.filter_matrices[i] @ (self.filter_matrices[i].T @ Y) + fdata[scan_bounds[i]:scan_bounds[i + 1], :] = Y return fdata + + def rerun_glm(self, data): + """ + Re-estimate the GLM (without hyperparameter estimation) on new data + + Args: + data (ndarray): 2d array of time series data (TxP) + Returns: + beta (ndarray): 2d array of beta coefficients (PxQ) + info (dict): with lists reg_name and run_number (Q long) + data_filt (ndarray): 2d array of filtered time series data (TxP) + data_hat (ndarray): 2d array of predicted time series data (TxP) - + This is predicted only using regressors of interest (without the constant or other nuisance regressors) + data_adj (ndarray): 2d array of adjusted time series data (TxP) + This is filtered timeseries with constants and other nuisance regressors substrated out + residuals (ndarray): 2d array of residuals (TxP) + """ + + # Filter and temporal pre-whiten the data + data_filt = self.spm_filter(self.weight @ data) # spm_filter + + # Estimate the beta coefficients and residuals + beta = self.pinvX @ data_filt + residuals = data_filt - self.design_matrix @ beta + # Get estimated (predicted) timeseries (regressors of interest without the constant) + data_hat = self.design_matrix[:, self.reg_of_interest[:-1]] @ beta[self.reg_of_interest[:-1], :] + + # Get adjusted timeseries + data_adj = data_hat + residuals + + # Return the regressors of interest (apart from the constant) + indx = self.reg_of_interest - 1 + info = pd.DataFrame({'reg_name': self.beta_names[indx], 'run_number': self.run_number[indx]}) + + return beta[indx, :], info, data_filt, data_hat, data_adj, residuals + + + def convolve_glm(self, bf): + """ + Re-convolves the SPM structure with a new basis function. + + Args: + bf (ndarray): new basis function (output of spm_hrf): + + """ + + self.bf = bf + Xx_blocks = [] + Xb_blocks = [] + + num_scan = self.nscans + + for s in range(len(self.Sess)): + # Number of scans for this session + k = num_scan[s] + + # Get stimulus functions U + U = self.Sess[s]["U"] + num_cond = len(U) + + # Convolve stimulus functions with basis functions + X, Xn, Fc = _spm_Volterra(U, self.bf, self.Volterra) + + # Resample regressors at acquisition times (32 bin offset) + X = X[np.arange(k) * self.T + self.T0 + 32, :] + + # # Orthogonalize within trial type + # for i in range(len(Fc)): + # X[:, Fc[i]["i"][0] - 1] = _spm_orth(X[:, Fc[i]["i"][0] - 1]) + + # Get user-specified regressors + C = self.Sess[s]["C"]["C"] + if C.size > 0: + #num_reg = C.shape[1] + X = np.hstack([X, _spm_detrend(C)]) + #else: + #num_reg = 0 + + # # Store session info + # if Xx.size == 0: + # Xx = X + # Xb = np.ones((k, 1)) + # else: + # Xx = _blkdiag([Xx, X]) + # Xb = _blkdiag([Xb, np.ones((k, 1))]) + + Xx_blocks.append(X) + Xb_blocks.append(np.ones((k, 1), dtype=X.dtype)) + + Xx = block_diag(*Xx_blocks) + Xb = block_diag(*Xb_blocks) + + # Finalize design matrix + self.X = np.hstack([Xx, Xb]) + + # Compute weighted and filtered design matrix + if hasattr(self, "weight"): + self.design_matrix = self.spm_filter(self.weight @ self.X) + else: + self.design_matrix = self.spm_filter(self.X) + + self.design_matrix = self.design_matrix.astype(float) + + # Compute pseudoinverse of weighted and filtered design matrix + self.pinvX = np.linalg.pinv(self.design_matrix) + + + def update_hrf_params(self, P, mask_img): + """ + Updates the HRF parameters of the SPM structure and return the timeseries calculated with the new HRF parameters. + Args: + P (ndarray): HRF parameters (SPM12 default: [6, 16, 1, 1, 6, 0, 32]) + mask_img (str or Nifti1Image): mask image (e.g., ROI mask) + Returns: + data_filt (ndarray): 2d array of filtered time series data (TxP) + data_hat (ndarray): 2d array of predicted time series data (TxP) - + This is predicted only using regressors of interest (without the constant or other nuisance regressors) + data_adj (ndarray): 2d array of adjusted time series data (TxP) + This is filtered timeseries with constants and other nuisance regressors substrated out + residuals (ndarray): 2d array of residuals (TxP) + """ + if isinstance(mask_img, str): + mask_img = nb.load(mask_img) + if isinstance(mask_img, nb.Nifti1Image): + coords = nt.get_mask_coords(mask_img) + else: + raise TypeError("mask_img must be a nifti1 image or a string") + + hrf, p = spm_hrf(1, P) + self.convolve_glm(hrf) + + # get raw time series in roi + data = nt.sample_images(self.rawdata_files, coords) + + # rerun glm + _, info, data_filt, data_hat, data_adj, residuals = self.rerun_glm(data) + + return data_filt, data_hat, data_adj, residuals + + +def _cut(X, pre, at, post, padding='last'): + """ + Cut segment from signal X. + + Parameters: + X : np.ndarray + Input data (rows: time samples, cols: cols) + pre : int + N samples before `at` + at : int or None + Sample index at which to cut. If None or NaN, returns NaNs. + post : int + N samples after `at` + padding : str, optional + Padding strategy when time is not available ('nan', 'zero', 'last'). Default is 'last'. + + Returns: + np.ndarray + The extracted segment of the trajectory. + """ + if at is None or np.isnan(at): + return np.full((pre + post + 1, X.shape[1]), np.nan) + + rows, cols = X.shape + start, end = max(0, at - pre), min(at + post + 1, rows) + y0 = X[start:end, :] + + pad_before, pad_after = max(0, pre - (at - start)), max(0, post - (rows - at - 1)) + + if padding == 'nan': + y = np.pad(y0, ((pad_before, pad_after),(0, 0), ), mode='constant', constant_values=np.nan) + elif padding == 'zero': + y = np.pad(y0, ((pad_before, pad_after), (0, 0), ), mode='constant', constant_values=0) + elif padding == 'last': + y = np.pad(y0, ( (pad_before, pad_after), (0, 0), ), mode='edge') + else: + raise ValueError("Unknown padding option. Use: 'nan', 'last', 'zero'") + + return y + + +def cut(X, pre, at, post, padding='last'): + """ + Takes a vector of sample locations (at) and returns the signal (X) aligned and averaged around those locations + Args: + X (np.array): + Signal to cut. + pre (int): + N samples before cut + at (np.array or list): + Cut locations in samples + post (int): + N samples after cut + padding (str, optional): + Padding strategy when time is not available ('nan', 'zero', 'last'). Default is 'last'. + stats (str): + Sufficient statistic used for the area + mean: Mean of the voxels + whitemean: Mean of the voxel, spatially weighted by noise + axis (int): + Axis along which the sufficient statistic is applied. If X.shape = (n_voxels, n_samples) then axis=0 + ResMS (np.array): + Residual mean squares of shape (n_voxels,) only used if stats='prewhiten' for spatial prewhitening + + Returns: + + y (np.array): + Signal cut around locations at. + + """ + + y_tmp = [] + for a in at: + y_tmp.append(_cut(X, pre, a, post, padding)) + + return np.array(y_tmp) + + +def _blkdiag(matrices): + """ + Constructs a block diagonal matrix from multiple input matrices. + + Parameters: + matrices : list of ndarray + Matrices to be placed on the block diagonal. + + Returns: + y : ndarray + Block diagonal concatenated matrix. + """ + if len(matrices) == 0: + return np.array([]) + + # Determine final shape + rows = np.array([m.shape[0] for m in matrices]).sum() + cols = np.array([m.shape[1] for m in matrices]).sum() + + # Preallocate result matrix with zeros + y = np.zeros((rows, cols), dtype=matrices[0].dtype) + + # Fill the block diagonal + r_offset, c_offset = 0, 0 + for m in matrices: + r, c = m.shape + y[r_offset:r_offset + r, c_offset:c_offset + c] = m + r_offset += r + c_offset += c + + return y + + +def _spm_detrend(x, p=0): + """ + Polynomial detrending over columns. + + Parameters: + x : ndarray or list of ndarrays + Data matrix (or list of matrices). + p : int, optional + Order of polynomial (default: 0, i.e., mean subtraction). + + Returns: + y : ndarray or list of ndarrays + Detrended data matrix. + """ + + # Handle case where x is a list (equivalent to cell arrays in MATLAB) + if isinstance(x, list): + return [spm_detrend(xi, p) for xi in x] + + # Check dimensions + m, n = x.shape + if m == 0 or n == 0: + return np.array([]) + + # Mean subtraction (order 0) + if p == 0: + return x - np.mean(x, axis=0, keepdims=True) + + # Polynomial adjustment + G = np.zeros((m, p + 1)) + for i in range(p + 1): + G[:, i] = (np.arange(1, m + 1) ** i) + + y = x - G @ np.linalg.pinv(G) @ x + return y + + +def _spm_orth(X, OPT='pad'): + """ + Recursive Gram-Schmidt orthogonalisation of basis functions + + Parameters: + X : ndarray + Input matrix + OPT : str, optional + 'norm' for Euclidean normalization + 'pad' for zero padding of null space (default) + + Returns: + X : ndarray + Orthogonalized matrix + """ + # Turn off warnings (equivalent to MATLAB warning('off','all')) + np.seterr(all='ignore') + + if X.ndim==1: + X = X[:, np.newaxis] + n, m = X.shape + X = X[:, np.any(X, axis=0)] # Remove zero columns + rankX = np.linalg.matrix_rank(X) + + try: + x = X[:, [0]] + j = [0] + + for i in range(1, X.shape[1]): + D = X[:, [i]] + D = D - x @ np.linalg.pinv(x) @ D + + if np.linalg.norm(D, 1) > np.exp(-32): + x = np.hstack((x, D)) + j.append(i) + + if len(j) == rankX: + break + except: + x = np.zeros((n, 0)) + j = [] + + # Restore warnings + np.seterr(all='warn') + + # Normalization if requested + if OPT == 'pad': + X_out = np.zeros((n, m)) + X_out[:, j] = x + elif OPT == 'norm': + X_out = x / np.linalg.norm(x, axis=0, keepdims=True) + else: + X_out = x + + # drop extra dimensions if input was a vector + X_out = np.squeeze(X_out) + + return X_out + + +def _spm_Volterra(U, bf, V=1): + """ + Generalized convolution of inputs (U) with basis set (bf) + + Parameters: + U : list of dict + Input structure array containing 'u' (time series) and 'name' (labels) + bf : ndarray + Basis functions + V : int, optional + Order of Volterra expansion (default: 1) + + Returns: + X : ndarray + Design Matrix + Xname : list + Names of regressors (columns) in X + Fc : list of dict + Contains indices and names for each input + """ + + def _convolve_boxcar(u, dur, dt): + """ + Convolve a single column of u with a boxcar of the + appropriate duration (in microtime bins), matching SPM. + """ + du = int(round(dur / dt)) + if du > 1: + boxcar = np.ones(du) / du + u_conv = np.convolve(u_col, boxcar, mode='full')[:n_bins] + return u_conv + + X = [] + Xname = [] + Fc = [] + + if bf.ndim == 2: + num_basis = bf.shape[1] + else: + num_basis = 1 + + # First-order terms + for i, u_dict in enumerate(U): + dt = u_dict['dt'] # get microtime resolution + ind = [] + ip = [] + for k in range(u_dict['u'].shape[1]): + dur = u_dict.get('dur', 0) # get boxcar length (if dur=0 then delta) + dur_k = dur if np.isscalar(dur) else dur[k] # handle scalar or array + du = int(round(dur_k / dt)) + for p in range(num_basis): + if num_basis > 1: + x = u_dict['u'].todense()[:, k] + d = np.arange(x.shape[0]) + if du > 1: + x = np.convolve(x, np.ones(du) / du, mode='full')[:len(d)] + x = np.convolve(x, bf[:, p], mode='full')[:len(d)] + else: + x = u_dict['u'].todense()[:, k] + x = np.asarray(x).flatten() + d = np.arange(x.shape[0]) + if du > 1: + x = np.convolve(x, np.ones(du) / du, mode='full')[:len(d)] + x = np.convolve(x, bf, mode='full')[:len(d)] + x = x[:, None] + X.append(x) + + Xname.append(f"{u_dict['name'][k]}*bf({p + 1})") + ind.append(len(X)) + ip.append(k + 1) + + Fc.append({'i': ind, 'name': u_dict['name'][0], 'p': ip}) + + X = np.column_stack(X) if X else np.empty((len(U[0]['u']), 0)) + + # Return if first order + if V == 1: + return X, Xname, Fc + + # Second-order terms + for i in range(len(U)): + for j in range(i, len(U)): + ind = [] + ip = [] + for p in range(bf.shape[1]): + for q in range(bf.shape[1]): + x = U[i]['u'][:, 0] + y = U[j]['u'][:, 0] + x = np.convolve(x, bf[:, p], mode='full')[:len(d)] + y = np.convolve(y, bf[:, q], mode='full')[:len(d)] + X = np.column_stack((X, x * y)) + + Xname.append(f"{U[i]['name'][0]}*bf({p + 1})x{U[j]['name'][0]}*bf({q + 1})") + ind.append(X.shape[1]) + ip.append(1) + + Fc.append({'i': ind, 'name': f"{U[i]['name'][0]}x{U[j]['name'][0]}", 'p': ip}) + + return X, Xname, Fc + + +def spm_hrf(RT, P=None, T=16): + """ + Haemodynamic response function (HRF) + + Parameters: + RT : float + Scan repeat time in seconds (TR) + P : list or ndarray, optional + Parameters of the response function (two Gamma functions), defaults to [6, 16, 1, 1, 6, 0, 32] + T : int, optional + Microtime resolution (default: 16) + + Returns: + hrf : ndarray + Hemodynamic response function + p : ndarray + Parameters of the response function + """ + p = np.array([6., 16., 1., 1., 6., 0., 32.], dtype=float) + + # MATLAB behavior: overwrite first len(P) entries only + if P is not None: + P = np.asarray(P, dtype=float).ravel() + p[:len(P)] = P + + RT = RT / T + dt = RT / T + u = np.arange(0, int(np.ceil(p[6] / dt)) + 1) - p[5] / dt + u = np.maximum(u, np.finfo(float).tiny) + + hrf = _spm_Gpdf(u, p[0] / p[2], dt / p[2]) - _spm_Gpdf(u, p[1] / p[3], dt / p[3]) / p[4] + + # MATLAB: hrf([0:floor(p7/RT)]*T + 1) -> Python 0-based + idx = (np.arange(0, int(np.floor(p[6] / RT)) + 1) * T).astype(int) + hrf = hrf[idx] + + hrf = hrf / hrf.sum() + return hrf, p + + +def _spm_Gpdf(x, h, l): + """ + Equivalent of spm_Gpdf in MATLAB: computes the PDF of the Gamma distribution + with shape h and scale l for values x. + + Parameters: + - x: input values (can be scalar or array) + - h: shape parameter (> 0) + - l: scale parameter (> 0) + + Returns: + - f: Gamma PDF evaluated at x + """ + + x = np.asarray(x, dtype=np.float64) + h = np.asarray(h, dtype=np.float64) + l = np.asarray(l, dtype=np.float64) + + # Clamp x to avoid the x==0 singularity when h<1 + x = np.maximum(x, np.finfo(float).tiny) + + # Broadcast all inputs to same shape + x, h, l = np.broadcast_arrays(x, h, l) + + f = np.zeros_like(x) + + # Invalid values: h <= 0 or l <= 0 + valid = (h > 0) & (l > 0) + invalid = ~valid + f[invalid] = np.nan + + # # Degenerate cases at x == 0 + # zero_x = (x == 0) & valid + # f[zero_x & (h < 1)] = np.inf + # f[zero_x & (h == 1)] = l[zero_x & (h == 1)] + # f[zero_x & (h > 1)] = 0 + + # Compute for x > 0 + pos = (x > 0) & valid + f[pos] = np.exp( + (h[pos] - 1) * np.log(x[pos]) + + h[pos] * np.log(l[pos]) - + l[pos] * x[pos] - + gammaln(h[pos]) + ) + + return f \ No newline at end of file diff --git a/nitools/volume.py b/nitools/volume.py index 12a25d0..1d02faa 100644 --- a/nitools/volume.py +++ b/nitools/volume.py @@ -164,6 +164,7 @@ def euclidean_dist_sq(coordA,coordB): def sample_image(img,xm,ym,zm,interpolation): """ Return values after resample image + ignores NaN values Args: img (Nifti image): @@ -218,15 +219,13 @@ def sample_image(img,xm,ym,zm,interpolation): c011 = D[ir, jr+1, kr+1] c001 = D[ir, jr, kr+1] - c00 = c000*(1-id)+c100*id - c01 = c001*(1-id)+c101*id - c10 = c010*(1-id)+c110*id - c11 = c011*(1-id)+c111*id - - c0 = c00*(1-jd)+c10*jd - c1 = c01*(1-jd)+c11*jd - - value = c0*(1-kd)+c1*kd + c00 = nan_weighted_mean(c100,c000,id) + c01 = nan_weighted_mean(c101,c001,id) + c10 = nan_weighted_mean(c110,c010,id) + c11 = nan_weighted_mean(c111,c011,id) + c0 = nan_weighted_mean(c10,c00,jd) + c1 = nan_weighted_mean(c11,c01,jd) + value = nan_weighted_mean(c1,c0,kd) elif interpolation == 0: ir = np.rint(im).astype('int') jr = np.rint(jm).astype('int') @@ -245,6 +244,18 @@ def sample_image(img,xm,ym,zm,interpolation): value[invalid]=0 return value +def nan_weighted_mean(d1,d2,weight): + j = np.isnan(d1) + k = np.isnan(d2) + if j.sum()==0 & k.sum()==0: + return d1*weight + d2*(1-weight) + else: + dd1 = d1.copy() + dd2 = d2.copy() + dd1[j]=dd2[j] + dd2[k]=dd1[k] + return (dd1*weight + dd2*(1-weight)) + def check_voxel_range(img,i,j,k): """ Checks if i,j,k voxels coordinates are within an image @@ -294,17 +305,17 @@ def change_nifti_numformat(infile, """Changes the number format of a nifti file and saves it. Args: - infile (str): file name of input file - outfile (str): file name output file + infile (str): file name of input file + outfile (str): file name output file new_numformat (str): New number format. Defaults to 'uint16'. - type_cast_data (bool): If true, typecasts the data as well. + type_cast_data (bool): If true, typecasts the data as well. Note: - typecast_data changes the data format and can lead to and wrap-around of values, as the data is typecasted as well. - Do only when correcting a previous mistake in processing. + typecast_data changes the data format and can lead to and wrap-around of values, as the data is typecasted as well. + Do only when correcting a previous mistake in processing. """ A = nb.load(infile) X = A.get_fdata() - if typecast_data: + if typecast_data: X = X.astype(new_numformat) head = A.header.copy() head.set_data_dtype(new_numformat) @@ -314,20 +325,20 @@ def change_nifti_numformat(infile, def sample_images(imgs,coords,use_dataobj=True): """ Samples a list of images at a set of voxels (coordinates nearest neighbor) - 3d images are being returned as a single value per voxel. 4d images are being returned as a vector per voxel. - The function respects ths SPM-convention of specifying time slices 'img.nii,1' - Note that either all or none of the image names need to follow the 'img.nii,1' convention. + 3d images are being returned as a single value per voxel. 4d images are being returned as a vector per voxel. + The function respects ths SPM-convention of specifying time slices 'img.nii,1' + Note that either all or none of the image names need to follow the 'img.nii,1' convention. Args: imgs (list): - List of Nifti file names (SPM convention for time slices is respected) + List of Nifti file names (SPM convention for time slices is respected) coords (np-array): - 3xN matrix of coordinates of to be sampled voxels + 3xN matrix of coordinates of to be sampled voxels Returns: values (np-array): Array contains all values in the image """ n = len(imgs) - if ',' in imgs[0]: # SPM-convention of indicating time slice with ',x' notation + if ',' in imgs[0]: # SPM-convention of indicating time slice with ',x' notation fnames=[f.split(',')[0] for f in imgs] slice=np.array([f.split(',')[1] for f in imgs],dtype=int)-1 unique_fnames,img_indx = np.unique(fnames,return_inverse=True) @@ -344,7 +355,7 @@ def sample_images(imgs,coords,use_dataobj=True): img_indx.append(np.array([i],dtype=int)) slice.append(np.array([0],dtype=int)) slice = np.concatenate(slice) - img_indx = np.concatenate(img_indx) + img_indx = np.concatenate(img_indx) N = len(slice) # Load the header from all the input files voxels = coords_to_voxelidxs(coords,input_vols[0]) @@ -355,7 +366,7 @@ def sample_images(imgs,coords,use_dataobj=True): data_block = v.dataobj[min(voxels[0]):max(voxels[0])+1,min(voxels[1]):max(voxels[1])+1,min(voxels[2]):max(voxels[2])+1] D = data_block[voxels[0]-min(voxels[0]),voxels[1]-min(voxels[1]),voxels[2]-min(voxels[2])] if v.ndim==3: - data[dindx,:]=D + data[dindx,:]=D else: data[dindx,:]=D[:,slice[dindx]].T else: