diff --git a/elephant/statistics.py b/elephant/statistics.py
index 6ad8bb3bc..314c3215c 100644
--- a/elephant/statistics.py
+++ b/elephant/statistics.py
@@ -74,6 +74,7 @@
 import numpy as np
 import quantities as pq
 import scipy.stats
+import scipy.signal
 from scipy.special import erf
 
 import elephant.conversion as conv
@@ -603,13 +604,12 @@ def lvr(time_intervals, R=5*pq.ms, with_nan=False):
 def instantaneous_rate(spiketrains, sampling_period, kernel='auto',
                        cutoff=5.0, t_start=None, t_stop=None, trim=False,
                        center_kernel=True, border_correction=False):
-    """
+    r"""
     Estimates instantaneous firing rate by kernel convolution.
 
     Visualization of this function is covered in Viziphant:
     :func:`viziphant.statistics.plot_instantaneous_rates_colormesh`.
 
-
     Parameters
     ----------
     spiketrains : neo.SpikeTrain or list of neo.SpikeTrain
@@ -622,7 +622,7 @@ def instantaneous_rate(spiketrains, sampling_period, kernel='auto',
         The string 'auto' or callable object of class `kernels.Kernel`.
         The kernel is used for convolution with the spike train and its
         standard deviation determines the time resolution of the instantaneous
-        rate estimation. Currently implemented kernel forms are rectangular,
+        rate estimation. Currently, implemented kernel forms are rectangular,
         triangular, epanechnikovlike, gaussian, laplacian, exponential, and
         alpha function.
         If 'auto', the optimized kernel width for the rate estimation is
@@ -630,29 +630,34 @@ def instantaneous_rate(spiketrains, sampling_period, kernel='auto',
         Gaussian kernel is constructed with this width. Automatized
         calculation of the kernel width is not available for other than
         Gaussian kernel shapes.
+        Note: The kernel width is not adaptive, i.e., it is calculated as a global optimum across the data.
+
         Default: 'auto'
     cutoff : float, optional
         This factor determines the cutoff of the probability distribution of
         the kernel, i.e., the considered width of the kernel in terms of
        multiples of the standard deviation sigma.
+
         Default: 5.0
     t_start : pq.Quantity, optional
         Start time of the interval used to compute the firing rate.
         If None, `t_start` is assumed equal to `t_start` attribute of
         `spiketrain`.
+
         Default: None
     t_stop : pq.Quantity, optional
-        End time of the interval used to compute the firing rate (included).
+        End time of the interval used to compute the firing rate.
         If None, `t_stop` is assumed equal to `t_stop` attribute of
         `spiketrain`.
+
         Default: None
     trim : bool, optional
         Accounts for the asymmetry of a kernel.
         If False, the output of the Fast Fourier Transformation being a longer
-        vector than the input vector by the size of the kernel is reduced back
-        to the original size of the considered time interval of the
+        vector than the input vector (output = input + kernel - 1) is reduced
+        back to the original size of the considered time interval of the
         `spiketrain` using the median of the kernel. False (no trimming) is
         equivalent to 'same' convolution mode for symmetrical kernels.
         If True, only the region of the convolved signal is returned, where
@@ -661,12 +666,14 @@ def instantaneous_rate(spiketrains, sampling_period, kernel='auto',
         Transformation by a total of two times the size of the kernel, and
         `t_start` and `t_stop` are adjusted. True (trimming) is equivalent to
         'valid' convolution mode for symmetrical kernels.
+
         Default: False
     center_kernel : bool, optional
         If set to True, the kernel will be translated such that its median is
         centered on the spike, thus putting equal weight before and after the
         spike.
         If False, no adjustment is performed such that the spike sits at the
         origin of the kernel.
+
         Default: True
     border_correction : bool, optional
         Apply a border correction to prevent underestimating the firing rates
@@ -689,35 +696,51 @@ def instantaneous_rate(spiketrains, sampling_period, kernel='auto',
     Raises
     ------
     TypeError
-        If `spiketrain` is not an instance of `neo.SpikeTrain`.
+        * If `spiketrain` is not an instance of `neo.SpikeTrain`.
+        * If `sampling_period` is not a `pq.Quantity`.
+        * If `sampling_period` is not larger than zero.
+        * If `kernel` is neither instance of `kernels.Kernel` nor string
+          'auto'.
+        * If `cutoff` is neither `float` nor `int`.
+        * If `t_start` and `t_stop` are neither None nor a `pq.Quantity`.
+        * If `trim` is not `bool`.
+    ValueError
+        * If `sampling_period` is smaller than zero.
+        * If `kernel` is 'auto' and the function was unable to calculate
+          optimal kernel width for instantaneous rate from input data.
-        If `sampling_period` is not a `pq.Quantity`.
+    Warns
+    -----
+    UserWarning
+        * If `cutoff` is less than `min_cutoff` attribute of `kernel`, the
+          width of the kernel is adjusted to a minimally allowed width.
-        If `sampling_period` is not larger than zero.
+    Notes
+    -----
+    * The resulting instantaneous firing rate values smaller than ``0``, which
+      may happen due to machine precision errors, are clipped to zero.
-        If `kernel` is neither instance of `kernels.Kernel` nor string 'auto'.
+    * The instantaneous firing rate estimate is calculated based on half-open
+      intervals ``[)``, except the last one, e.g. if ``t_start = 0s``,
+      ``t_stop = 4s`` and ``sampling_period = 1s``, the intervals are:
-        If `cutoff` is neither `float` nor `int`.
+      ``[0, 1)`` ``[1, 2)`` ``[2, 3)`` ``[3, 4]``.
-        If `t_start` and `t_stop` are neither None nor a `pq.Quantity`.
+      This induces a sampling bias, which can lead to a time shift of the
+      estimated rate if the `sampling_period` is chosen large relative to the
+      duration ``(t_stop - t_start)``. One possibility to counteract this is
+      to choose a smaller `sampling_period`.
-        If `trim` is not `bool`.
-    ValueError
-        If `sampling_period` is smaller than zero.
+    * The last interval of the given duration ``(t_stop - t_start)`` is
+      dropped if it is shorter than `sampling_period`,
+      e.g. if ``t_start = 0s``, ``t_stop = 4.5s`` and
+      ``sampling_period = 1s``, the intervals considered are:
-        If `kernel` is 'auto' and the function was unable to calculate optimal
-        kernel width for instantaneous rate from input data.
+      ``[0, 1)`` ``[1, 2)`` ``[2, 3)`` ``[3, 4]``;
+
+      the last interval ``[4, 4.5]`` is excluded from all calculations.
-    Warns
-    -----
-    UserWarning
-        If `cutoff` is less than `min_cutoff` attribute of `kernel`, the width
-        of the kernel is adjusted to a minimally allowed width.
-    Notes
-    -----
-    The resulting instantaneous firing rate values smaller than ``0``, which
-    can happen due to machine precision errors, are clipped to zero.
 
     Examples
     --------
@@ -787,32 +810,29 @@ def optimal_kernel(st):
     if kernel == 'auto':
         kernel = optimal_kernel(spiketrains)
         spiketrains = [spiketrains]
-    elif not isinstance(spiketrains, (list, tuple, SpikeTrainList)):
-        raise TypeError(
-            "'spiketrains' must be a list of neo.SpikeTrain's or a single "
-            "neo.SpikeTrain. Found: '{}'".format(type(spiketrains)))
+
+    if not all([isinstance(elem, neo.SpikeTrain) for elem in spiketrains]):
+        raise TypeError(f"'spiketrains' must be a list of neo.SpikeTrain's or "
+                        f"a single neo.SpikeTrain. Found: {type(spiketrains)}")
 
     if not is_time_quantity(sampling_period):
-        raise TypeError(
-            "The 'sampling_period' must be a time Quantity. \n"
-            "Found: {}".format(type(sampling_period)))
+        raise TypeError(f"The 'sampling_period' must be a time Quantity. "
+                        f"Found: {type(sampling_period)}")
 
     if sampling_period.magnitude < 0:
-        raise ValueError("The 'sampling_period' ({}) must be non-negative.".
-                         format(sampling_period))
+        raise ValueError(f"The 'sampling_period' ({sampling_period}) "
+                         f"must be non-negative.")
 
     if not (isinstance(kernel, kernels.Kernel) or kernel == 'auto'):
-        raise TypeError(
-            "'kernel' must be either instance of class elephant.kernels.Kernel"
-            " or the string 'auto'. Found: %s, value %s" % (type(kernel),
-                                                            str(kernel)))
+        raise TypeError(f"'kernel' must be an instance of class "
+                        f"elephant.kernels.Kernel or string 'auto'. Found: "
+                        f"{type(kernel)}, value {str(kernel)}")
 
     if not isinstance(cutoff, (float, int)):
         raise TypeError("'cutoff' must be float or integer")
 
     if not is_time_quantity(t_start, allow_none=True):
         raise TypeError("'t_start' must be a time Quantity")
-
     if not is_time_quantity(t_stop, allow_none=True):
         raise TypeError("'t_stop' must be a time Quantity")
 
@@ -822,6 +842,7 @@ def optimal_kernel(st):
     check_neo_consistency(spiketrains,
                           object_type=neo.SpikeTrain,
                           t_start=t_start, t_stop=t_stop)
+
     if kernel == 'auto':
         if len(spiketrains) == 1:
             kernel = optimal_kernel(spiketrains[0])
@@ -835,77 +856,77 @@ def optimal_kernel(st):
     if t_stop is None:
         t_stop = spiketrains[0].t_stop
 
-    units = pq.CompoundUnit(
-        "{}*s".format(sampling_period.rescale('s').item()))
+    # Rescale units for consistent calculation
     t_start = t_start.rescale(spiketrains[0].units)
     t_stop = t_stop.rescale(spiketrains[0].units)
 
-    n_bins = int(((t_stop - t_start) / sampling_period).simplified) + 1
-    time_vectors = np.zeros((len(spiketrains), n_bins), dtype=np.float64)
-    hist_range_end = t_stop + sampling_period.rescale(spiketrains[0].units)
+    # Calculate parameters for np.histogram
+    n_bins = int(((t_stop - t_start) / sampling_period).simplified)
+    hist_range_end = t_start + n_bins * \
+        sampling_period.rescale(spiketrains[0].units)
+    hist_range = (t_start.item(), hist_range_end.item())
+
+    # Preallocation
+    histogram_arr = np.zeros((len(spiketrains), n_bins), dtype=np.float64)
     for i, st in enumerate(spiketrains):
-        time_vectors[i], _ = np.histogram(st.magnitude, bins=n_bins,
-                                          range=hist_range)
+        histogram_arr[i], _ = np.histogram(st.magnitude, bins=n_bins,
+                                           range=hist_range)
+
+    histogram_arr = histogram_arr.T  # make it (time, units)
 
+    # Kernel
     if cutoff < kernel.min_cutoff:
         cutoff = kernel.min_cutoff
         warnings.warn("The width of the kernel was adjusted to a minimally "
                       "allowed width.")
 
-    # An odd number of points correctly resolves the median index and the
-    # fact that the peak of an instantaneous rate should be centered at t=0
-    # for symmetric kernels applied on a single spike at t=0.
-    # See issue https://github.com/NeuralEnsemble/elephant/issues/360
-    n_half = math.ceil(cutoff * (
-        kernel.sigma / sampling_period).simplified.item())
-    cutoff_sigma = cutoff * kernel.sigma.rescale(units).magnitude
-    if center_kernel:
-        # t_arr must be centered at the kernel median.
-        # Not centering on the kernel median leads to underestimating the
-        # instantaneous rate in cases when sampling_period >> kernel.sigma.
-        median = kernel.icdf(0.5).rescale(units).item()
+    scaling_unit = pq.CompoundUnit(f"{sampling_period.rescale('s').item()}*s")
+    cutoff_sigma = cutoff * kernel.sigma.rescale(scaling_unit).magnitude
+    if center_kernel:  # t_arr_kernel is centered on the kernel median.
+        median = kernel.icdf(0.5).rescale(scaling_unit).item()
     else:
         median = 0
-    t_arr = np.linspace(-cutoff_sigma + median, stop=cutoff_sigma + median,
-                        num=2 * n_half + 1, endpoint=True) * units
-
-    if center_kernel:
-        # keep the full convolve range and do the trimming afterwards;
-        # trimming is performed according to the kernel median index
-        fft_mode = 'full'
-    elif trim:
-        # no median index trimming is involved
+
+    # An odd number of points correctly resolves the median index of the
+    # kernel. This avoids a time shift in the rate estimate for symmetric
+    # kernels. A number x given by 'x = 2 * n + 1' with n being an integer is
+    # always odd. Using `math.ceil` to calculate `t_arr_kernel_half` ensures
+    # an integer value, hence the number of points for the kernel (num) given
+    # by `num=2 * t_arr_kernel_half + 1` is always odd.
+    # (See Issue #360, https://github.com/NeuralEnsemble/elephant/issues/360)
+    t_arr_kernel_half = math.ceil(
+        cutoff * (kernel.sigma / sampling_period).simplified.item())
+    t_arr_kernel_length = 2 * t_arr_kernel_half + 1
+
+    # Shift kernel using the calculated median
+    t_arr_kernel = np.linspace(start=-cutoff_sigma + median,
+                               stop=cutoff_sigma + median,
+                               num=t_arr_kernel_length,
+                               endpoint=True) * scaling_unit
+
+    # Calculate the kernel values with t_arr_kernel
+    kernel_arr = np.expand_dims(
+        kernel(t_arr_kernel).rescale(pq.Hz).magnitude, axis=1)
+
+    # Define mode for scipy.signal.fftconvolve
+    if trim:
         fft_mode = 'valid'
     else:
-        # no median index trimming is involved
         fft_mode = 'same'
 
-    time_vectors = time_vectors.T  # make it (time, units)
-    kernel_arr = np.expand_dims(kernel(t_arr).rescale(pq.Hz).magnitude, axis=1)
-    rate = scipy.signal.fftconvolve(time_vectors,
+    rate = scipy.signal.fftconvolve(histogram_arr,
                                     kernel_arr,
                                     mode=fft_mode)
-    # the convolution of non-negative vectors is non-negative
+    # The convolution of non-negative vectors is non-negative
     rate = np.clip(rate, a_min=0, a_max=None, out=rate)
 
-    if center_kernel:  # account for the kernel asymmetry
-        median_id = kernel.median_index(t_arr)
-        # the size of kernel() output matches the input size, len(t_arr)
-        kernel_array_size = len(t_arr)
-        if not trim:
-            rate = rate[median_id: -kernel_array_size + median_id]
-        else:
-            rate = rate[2 * median_id: -2 * (kernel_array_size - median_id)]
-        t_start = t_start + median_id * units
-        t_stop = t_stop - (kernel_array_size - median_id) * units
-    else:
-        # FIXME: don't shrink the output array
-        # (to be consistent with center_kernel=True)
-        # n points have n-1 intervals;
-        # instantaneous rate is a list of intervals;
-        # hence, the last element is excluded
-        rate = rate[:-1]
+    # Adjust t_start and t_stop
+    if fft_mode == 'valid':
+        median_id = kernel.median_index(t_arr_kernel)
+        kernel_array_size = len(kernel_arr)
+        t_start = t_start + median_id * scaling_unit
+        t_stop = t_stop - (kernel_array_size - median_id) * scaling_unit
 
     kernel_annotation = dict(type=type(kernel).__name__,
                              sigma=str(kernel.sigma),
@@ -968,10 +989,11 @@ def time_histogram(spiketrains, bin_size, t_start=None, t_stop=None,
         Default: None
     output : {'counts', 'mean', 'rate'}, optional
         Normalization of the histogram.
         Can be one of:
-        * 'counts': spike counts at each bin (as integer numbers)
-        * 'mean': mean spike counts per spike train
-        * 'rate': mean spike rate per spike train. Like 'mean', but the
-          counts are additionally normalized by the bin width.
+        * 'counts': spike counts at each bin (as integer numbers).
+        * 'mean': mean spike counts per spike train.
+        * 'rate': mean spike rate per spike train. Like 'mean', but the
+          counts are additionally normalized by the bin width.
+
         Default: 'counts'
     binary : bool, optional
         If True, indicates whether all `neo.SpikeTrain` objects should first
@@ -1120,25 +1142,28 @@ class Complexity(object):
     bin_size : pq.Quantity or None, optional
         Width of the histogram's time bins with units of time.
         The user must specify the `bin_size` or the `sampling_rate`.
-        * If None and the `sampling_rate` is available
-          1/`sampling_rate` is used.
-        * If both are given then `bin_size` is used.
+          * If None and the `sampling_rate` is available
+            1/`sampling_rate` is used.
+          * If both are given then `bin_size` is used.
+
         Default: None
     binary : bool, optional
-        * If True then the time histograms will be binary.
-        * If False the total number of synchronous spikes is counted in the
-          time histogram.
+          * If True then the time histograms will be binary.
+          * If False the total number of synchronous spikes is counted in the
+            time histogram.
+
         Default: True
     spread : int, optional
         Number of bins in which to check for synchronous spikes.
         Spikes that occur separated by `spread - 1` or less empty bins are
         considered synchronous.
-        * ``spread = 0`` corresponds to a bincount accross spike trains.
-        * ``spread = 1`` corresponds to counting consecutive spikes.
-        * ``spread = 2`` corresponds to counting consecutive spikes and
-          spikes separated by exactly 1 empty bin.
-        * ``spread = n`` corresponds to counting spikes separated by exactly
-          or less than `n - 1` empty bins.
+          * ``spread = 0`` corresponds to a bincount across spike trains.
+          * ``spread = 1`` corresponds to counting consecutive spikes.
+          * ``spread = 2`` corresponds to counting consecutive spikes and
+            spikes separated by exactly 1 empty bin.
+          * ``spread = n`` corresponds to counting spikes separated by
+            `n - 1` or fewer empty bins.
+
         Default: 0
     tolerance : float or None, optional
         Tolerance for rounding errors in the binning process and in the input
@@ -1151,18 +1176,20 @@ class Complexity(object):
     epoch : neo.Epoch
         An epoch object containing complexity values, left edges and durations
         of all intervals with at least one spike.
-        * ``epoch.array_annotations['complexity']`` contains the
-          complexity values per spike.
-        * ``epoch.times`` contains the left edges.
-        * ``epoch.durations`` contains the durations.
+          * ``epoch.array_annotations['complexity']`` contains the
+            complexity values per spike.
+          * ``epoch.times`` contains the left edges.
+          * ``epoch.durations`` contains the durations.
+
     time_histogram : neo.Analogsignal
         A `neo.AnalogSignal` object containing the histogram values.
         `neo.AnalogSignal[j]` is the histogram computed between
         `t_start + j * binsize` and `t_start + (j + 1) * binsize`.
-        * If ``binary = True`` : Number of neurons that spiked in each bin,
-          regardless of the number of spikes.
-        * If ``binary = False`` : Number of neurons and spikes per neurons
-          in each bin.
+          * If ``binary = True`` : Number of neurons that spiked in each bin,
+            regardless of the number of spikes.
+          * If ``binary = False`` : Number of neurons and spikes per neuron
+            in each bin.
+
     complexity_histogram : np.ndarray
         The number of occurrences of events of different complexities.
         `complexity_hist[i]` corresponds to the number of events of
@@ -1196,11 +1223,11 @@ class Complexity(object):
 
     Notes
     -----
-    * Note that with most common parameter combinations spike times can end up
-      on bin edges. This makes the binning susceptible to rounding errors which
-      is accounted for by moving spikes which are within tolerance of the next
-      bin edge into the following bin. This can be adjusted using the tolerance
-      parameter and turned off by setting `tolerance=None`.
+    Note that with most common parameter combinations spike times can end up
+    on bin edges. This makes the binning susceptible to rounding errors, which
+    is accounted for by moving spikes that are within tolerance of the next
+    bin edge into the following bin. This can be adjusted using the tolerance
+    parameter and turned off by setting `tolerance=None`.
 
     See also
     --------
diff --git a/elephant/test/test_spike_train_generation.py b/elephant/test/test_spike_train_generation.py
index 457e376c6..6a1434a2e 100644
--- a/elephant/test/test_spike_train_generation.py
+++ b/elephant/test/test_spike_train_generation.py
@@ -864,7 +864,7 @@ def test_recovered_firing_rate_profile(self):
         rate_recovered = rate_recovered.flatten().magnitude
         trim = (rate_profile.shape[0] - rate_recovered.shape[0]) // 2
         rate_profile_valid = rate_profile.magnitude.squeeze()
-        rate_profile_valid = rate_profile_valid[trim: -trim - 1]
+        rate_profile_valid = rate_profile_valid[trim: -trim]
         assert_allclose(rate_recovered, rate_profile_valid,
                         rtol=0, atol=rtol * rate.item())
 
diff --git a/elephant/test/test_statistics.py b/elephant/test/test_statistics.py
index d693e604e..1c7649593 100644
--- a/elephant/test/test_statistics.py
+++ b/elephant/test/test_statistics.py
@@ -22,7 +22,7 @@
 from elephant.spike_train_generation import StationaryPoissonProcess
 
 
-class isi_TestCase(unittest.TestCase):
+class IsiTestCase(unittest.TestCase):
     def setUp(self):
         self.test_array_2d = np.array([[0.3, 0.56, 0.87, 1.23],
                                        [0.02, 0.71, 1.82, 8.46],
@@ -82,10 +82,10 @@ def test_unsorted_array(self):
         np.random.seed(0)
         array = np.random.rand(100)
         with self.assertWarns(UserWarning):
-            isi = statistics.isi(array)
+            statistics.isi(array)
 
 
-class isi_cv_TestCase(unittest.TestCase):
+class IsiCvTestCase(unittest.TestCase):
     def setUp(self):
         self.test_array_regular = np.arange(1, 6)
 
@@ -102,7 +102,7 @@ def test_cv_isi_regular_array_is_zero(self):
         self.assertEqual(res, targ)
 
 
-class mean_firing_rate_TestCase(unittest.TestCase):
+class MeanFiringRateTestCase(unittest.TestCase):
     def setUp(self):
         self.test_array_3d = np.ones([5, 7, 13])
         self.test_array_2d = np.array([[0.3, 0.56, 0.87, 1.23],
@@ -504,67 +504,77 @@ def setUp(self):
         # generation of a multiply used specific kernel
         self.kernel = kernels.TriangularKernel(sigma=0.03 * pq.s)
 
+        # calculate instantaneous rate
+        self.sampling_period = 0.01 * pq.s
+        self.inst_rate = statistics.instantaneous_rate(
+            self.spike_train, self.sampling_period, self.kernel, cutoff=0)
 
-    def test_instantaneous_rate_and_warnings(self):
-        st = self.spike_train
-        sampling_period = 0.01 * pq.s
+    def test_instantaneous_rate_warnings(self):
         with self.assertWarns(UserWarning):
             # Catches warning: The width of the kernel was adjusted to a
             # minimally allowed width.
-            inst_rate = statistics.instantaneous_rate(
-                st, sampling_period, self.kernel, cutoff=0)
-        self.assertIsInstance(inst_rate, neo.core.AnalogSignal)
-        self.assertEqual(
-            inst_rate.sampling_period.simplified, sampling_period.simplified)
-        self.assertEqual(inst_rate.simplified.units, pq.Hz)
-        self.assertEqual(inst_rate.t_stop.simplified, st.t_stop.simplified)
-        self.assertEqual(inst_rate.t_start.simplified, st.t_start.simplified)
+            statistics.instantaneous_rate(self.spike_train,
+                                          self.sampling_period,
+                                          self.kernel, cutoff=0)
 
-    def test_error_instantaneous_rate(self):
-        self.assertRaises(
+    def test_instantaneous_rate_errors(self):
+        self.assertRaises(  # input is not neo.SpikeTrain
             TypeError, statistics.instantaneous_rate,
             spiketrains=[1, 2, 3] * pq.s,
             sampling_period=0.01 * pq.ms, kernel=self.kernel)
-        self.assertRaises(
-            TypeError, statistics.instantaneous_rate, spiketrains=[1, 2, 3],
-            sampling_period=0.01 * pq.ms, kernel=self.kernel)
-        st = self.spike_train
-        self.assertRaises(
-            TypeError, statistics.instantaneous_rate, spiketrains=st,
-            sampling_period=0.01, kernel=self.kernel)
-        self.assertRaises(
-            ValueError, statistics.instantaneous_rate, spiketrains=st,
-            sampling_period=-0.01 * pq.ms, kernel=self.kernel)
-        self.assertRaises(
-            TypeError, statistics.instantaneous_rate, spiketrains=st,
-            sampling_period=0.01 * pq.ms, kernel='NONE')
-        self.assertRaises(TypeError, statistics.instantaneous_rate,
-                          self.spike_train,
-                          sampling_period=0.01 * pq.s, kernel='wrong_string',
-                          t_start=self.st_tr[0] * pq.s,
-                          t_stop=self.st_tr[1] * pq.s,
-                          trim=False)
-        self.assertRaises(
-            TypeError, statistics.instantaneous_rate, spiketrains=st,
-            sampling_period=0.01 * pq.ms, kernel=self.kernel,
-            cutoff=20 * pq.ms)
-        self.assertRaises(
-            TypeError, statistics.instantaneous_rate, spiketrains=st,
-            sampling_period=0.01 * pq.ms, kernel=self.kernel, t_start=2)
-        self.assertRaises(
-            TypeError, statistics.instantaneous_rate, spiketrains=st,
-            sampling_period=0.01 * pq.ms, kernel=self.kernel,
-            t_stop=20 * pq.mV)
-        self.assertRaises(
-            TypeError, statistics.instantaneous_rate, spiketrains=st,
-            sampling_period=0.01 * pq.ms, kernel=self.kernel, trim=1)
-
-        # cannot estimate a kernel for a list of spiketrains
-        self.assertRaises(ValueError, statistics.instantaneous_rate,
-                          spiketrains=[st, st], sampling_period=10 * pq.ms,
-                          kernel='auto')
-
-    def test_rate_estimation_consistency(self):
+        self.assertRaises(  # sampling period is not a time quantity
+            TypeError, statistics.instantaneous_rate,
+            spiketrains=self.spike_train, kernel=self.kernel,
+            sampling_period=0.01)
+        self.assertRaises(  # sampling period is < 0
+            ValueError, statistics.instantaneous_rate,
+            spiketrains=self.spike_train, kernel=self.kernel,
+            sampling_period=-0.01 * pq.ms)
+        self.assertRaises(  # kernel is neither a Kernel instance nor 'auto'
+            TypeError, statistics.instantaneous_rate,
+            spiketrains=self.spike_train, sampling_period=0.01 * pq.ms,
+            kernel='NONE')
+        self.assertRaises(  # kernel is an invalid string
+            TypeError, statistics.instantaneous_rate,
+            spiketrains=self.spike_train, sampling_period=0.01 * pq.s,
+            kernel='wrong_string')
+        self.assertRaises(  # cutoff is not float or int
+            TypeError, statistics.instantaneous_rate,
+            spiketrains=self.spike_train, sampling_period=0.01 * pq.ms,
+            kernel=self.kernel, cutoff=20 * pq.ms)
+        self.assertRaises(  # t_start is not a time quantity
+            TypeError, statistics.instantaneous_rate,
+            spiketrains=self.spike_train, sampling_period=0.01 * pq.ms,
+            kernel=self.kernel, t_start=2)
+        self.assertRaises(  # t_stop is not a time quantity
+            TypeError, statistics.instantaneous_rate,
+            spiketrains=self.spike_train, sampling_period=0.01 * pq.ms,
+            kernel=self.kernel, t_stop=20 * pq.mV)
+        self.assertRaises(  # trim is not bool
+            TypeError, statistics.instantaneous_rate,
+            spiketrains=self.spike_train, sampling_period=0.01 * pq.ms,
+            kernel=self.kernel, trim=1)
+        self.assertRaises(  # can't estimate a kernel for a list of spiketrains
+            ValueError, statistics.instantaneous_rate,
+            spiketrains=[self.spike_train, self.spike_train],
+            sampling_period=10 * pq.ms, kernel='auto')
+
+    def test_instantaneous_rate_output(self):
+        # return type correct
+        self.assertIsInstance(self.inst_rate, neo.core.AnalogSignal)
+        # sampling_period input and output same
+        self.assertEqual(self.inst_rate.sampling_period.simplified,
+                         self.sampling_period.simplified)
+        # return correct units pq.Hz
+        self.assertEqual(self.inst_rate.simplified.units, pq.Hz)
+        # input and output t_stop same
+        self.assertEqual(self.spike_train.t_stop.simplified,
+                         self.inst_rate.t_stop.simplified)
+        # input and output t_start same
+        self.assertEqual(self.inst_rate.t_start.simplified,
+                         self.spike_train.t_start.simplified)
+
+    def test_instantaneous_rate_rate_estimation_consistency(self):
         """
         Test, whether the integral of the rate estimation curve is (almost)
         equal to the number of spikes of the spike train.
         """
@@ -575,6 +585,7 @@
             issubclass(kern_cls, kernels.Kernel) and
             kern_cls is not kernels.Kernel and
             kern_cls is not kernels.SymmetricKernel)
+        # set sigma
         kernels_available = [kern_cls(sigma=0.5 * pq.s, invert=False)
                              for kern_cls in kernel_types]
         kernels_available.append('auto')
@@ -595,45 +606,54 @@
                     border_correction=border_correction
                 )
                 num_spikes = len(self.spike_train)
-                auc = spint.cumtrapz(
+                area_under_curve = spint.cumtrapz(
                     y=rate_estimate.magnitude[:, 0],
                     x=rate_estimate.times.rescale('s').magnitude)[-1]
-                self.assertAlmostEqual(num_spikes, auc,
+                self.assertAlmostEqual(num_spikes, area_under_curve,
                                        delta=0.01 * num_spikes)
 
-    def test_not_center_kernel(self):
-        # issue 107
+    def test_instantaneous_rate_regression_107(self):
+        # Create a spiketrain with t_start=0s, t_stop=2s and a single spike at
+        # t=1s. Now choose an asymmetric kernel starting at t=0 to avoid a
+        # rise in firing rate before the response onset, so to speak, to
+        # avoid 'looking into the future' from the perspective of the neuron.
        t_spike = 1 * pq.s
-        st = neo.SpikeTrain([t_spike], t_start=0 * pq.s, t_stop=2 * pq.s,
-                            units=pq.s)
+        spiketrain = neo.SpikeTrain(
+            [t_spike], t_start=0 * pq.s, t_stop=2 * pq.s, units=pq.s)
         kernel = kernels.AlphaKernel(200 * pq.ms)
-        fs = 0.1 * pq.ms
-        rate = statistics.instantaneous_rate(st,
-                                             sampling_period=fs,
-                                             kernel=kernel,
-                                             center_kernel=False)
+        sampling_period = 0.1 * pq.ms
+        rate = statistics.instantaneous_rate(
+            spiketrains=spiketrain, sampling_period=sampling_period,
+            kernel=kernel, center_kernel=False)
+        # find positive nonzero rate estimates
         rate_nonzero_index = np.nonzero(rate > 1e-6)[0]
-        # where the mass is concentrated
-        rate_mass = rate.times.rescale(t_spike.units)[rate_nonzero_index]
-        all_after_response_onset = (rate_mass >= t_spike).all()
+        # find times where the mass is concentrated, i.e. where the rate is
+        # estimated > 0
rate is estimated>0 + rate_mass_times = rate.times.rescale(t_spike.units)[rate_nonzero_index] + # all times, where rate is >0 should occur after response onset + # (t_spike is at 1s) + all_after_response_onset = (rate_mass_times >= t_spike).all() self.assertTrue(all_after_response_onset) - def test_regression_288(self): - np.random.seed(9) - sampling_period = 200 * pq.ms - spiketrain = StationaryPoissonProcess( - 10 * pq.Hz, t_start=0 * pq.s, t_stop=10 * pq.s - ).generate_spiketrain() - kernel = kernels.AlphaKernel(sigma=5 * pq.ms, invert=True) + def test_instantaneous_rate_regression_288(self): # check that instantaneous_rate "works" for kernels with small sigma - # without triggering an incomprehensible error - rate = statistics.instantaneous_rate(spiketrain, - sampling_period=sampling_period, - kernel=kernel) - self.assertEqual( - len(rate), (spiketrain.t_stop / sampling_period).simplified.item()) - - def test_small_kernel_sigma(self): + # without triggering an incomprehensible error: + # ValueError: zero-size array to reduction operation minimum which has + # no identity + try: + np.random.seed(9) + sampling_period = 200 * pq.ms + spiketrain = StationaryPoissonProcess( + 10 * pq.Hz, t_start=0 * pq.s, + t_stop=10 * pq.s).generate_spiketrain() + kernel = kernels.AlphaKernel(sigma=5 * pq.ms, invert=True) + rate = statistics.instantaneous_rate( + spiketrain, sampling_period=sampling_period, kernel=kernel) + except ValueError: + self.fail('When providing a kernel on a much smaller time scale ' + 'than sampling rate requested the instantaneous rate ' + 'estimation will fail on numpy level ') + + def test_instantaneous_rate_small_kernel_sigma(self): # Test that the instantaneous rate is overestimated when # kernel.sigma << sampling_period and center_kernel is True. # The setup is set to match the issue 288. 
@@ -642,8 +662,9 @@
         np.random.seed(0)
         sigma = 5 * pq.ms
         rate_expected = 10 * pq.Hz
         spiketrain = StationaryPoissonProcess(
-            rate_expected, t_start=0 * pq.s, t_stop=10 * pq.s
-        ).generate_spiketrain()
+            rate_expected, t_start=0 * pq.s,
+            t_stop=10 * pq.s).generate_spiketrain()
+
         kernel_types = tuple(
             kern_cls for kern_cls in kernels.__dict__.values()
             if isinstance(kern_cls, type) and
@@ -659,13 +680,14 @@
                     kernel=kernel, center_kernel=True)
                 self.assertGreater(rate.mean(), rate_expected)
 
-    def test_spikes_on_edges(self):
+    def test_instantaneous_rate_spikes_on_edges(self):
         # this test demonstrates that the trimming (convolve valid mode)
-        # removes the edge spikes, underestimating the true firing rate and
-        # thus is not able to reconstruct the number of spikes in a
-        # spiketrain (see test_rate_estimation_consistency)
+        # removes the edges of the rate estimate, underestimating the true
+        # firing rate and thus is not able to reconstruct the number of spikes
+        # in a spiketrain (see test_rate_estimation_consistency)
         cutoff = 5
         sampling_period = 0.01 * pq.s
+        # with t_spikes = [-5, 5]s the isi is 10s, so 1/isi = 0.1 Hz
         t_spikes = np.array([-cutoff, cutoff]) * pq.s
         spiketrain = neo.SpikeTrain(t_spikes, t_start=t_spikes[0],
                                     t_stop=t_spikes[-1])
@@ -685,9 +707,14 @@
                 kernel=kernel, cutoff=cutoff, trim=True,
                 center_kernel=center_kernel)
-            assert_array_almost_equal(rate.magnitude, 0, decimal=3)
-
-    def test_trim_as_convolve_mode(self):
+            assert_array_almost_equal(rate.magnitude, 0, decimal=2)
+
+    def test_instantaneous_rate_center_kernel(self):
+        # this test is obsolete since trimming is now always done by
+        # scipy.signal.fftconvolve; in an earlier version trimming was
+        # implemented for center_kernel = True.
+        # This test now verifies that an already centered kernel is not
+        # affected by center_kernel = True.
         cutoff = 5
         sampling_period = 0.01 * pq.s
         t_spikes = np.linspace(-cutoff, cutoff, num=(2 * cutoff + 1)) * pq.s
@@ -706,15 +733,16 @@
         for trim in (False, True):
             rate_centered = statistics.instantaneous_rate(
                 spiketrain, sampling_period=sampling_period,
-                kernel=kernel, cutoff=cutoff, trim=trim)
+                kernel=kernel, cutoff=cutoff, trim=trim,
+                center_kernel=True)
 
-            rate_convolve = statistics.instantaneous_rate(
+            rate_not_centered = statistics.instantaneous_rate(
                 spiketrain, sampling_period=sampling_period,
                 kernel=kernel, cutoff=cutoff, trim=trim,
                 center_kernel=False)
-            assert_array_almost_equal(rate_centered, rate_convolve)
+            assert_array_almost_equal(rate_centered, rate_not_centered)
 
-    def test_instantaneous_rate_spiketrainlist(self):
+    def test_instantaneous_rate_list_of_spiketrains(self):
         np.random.seed(19)
         duration_effective = self.st_dur - 2 * self.st_margin
         st_num_spikes = np.random.poisson(self.st_rate * duration_effective)
@@ -725,37 +753,37 @@
         spike_train2 = neo.SpikeTrain(spike_train2 * pq.s,
                                       t_start=self.st_tr[0] * pq.s,
                                       t_stop=self.st_tr[1] * pq.s)
-        st_rate_1 = statistics.instantaneous_rate(self.spike_train,
-                                                  sampling_period=0.01 * pq.s,
-                                                  kernel=self.kernel)
-        st_rate_2 = statistics.instantaneous_rate(spike_train2,
-                                                  sampling_period=0.01 * pq.s,
-                                                  kernel=self.kernel)
+
+        st_rate_1 = statistics.instantaneous_rate(
+            self.spike_train, sampling_period=self.sampling_period,
+            kernel=self.kernel)
+        st_rate_2 = statistics.instantaneous_rate(
+            spike_train2, sampling_period=self.sampling_period,
+            kernel=self.kernel)
+        rate_concat = np.c_[st_rate_1, st_rate_2]
+
         combined_rate = statistics.instantaneous_rate(
             [self.spike_train, spike_train2],
-            sampling_period=0.01 * pq.s,
+            sampling_period=self.sampling_period,
             kernel=self.kernel)
-        rate_concat = np.c_[st_rate_1, st_rate_2]
         # 'time_vector.dtype' in instantaneous_rate() is changed from float64
         # to float32 which results in 3e-6 abs difference
         assert_array_almost_equal(combined_rate.magnitude,
                                   rate_concat.magnitude, decimal=5)
 
-    # Regression test for #144
     def test_instantaneous_rate_regression_144(self):
         # The following spike train contains spikes that are so close to each
         # other, that the optimal kernel cannot be detected. Therefore, the
         # function should react with a ValueError.
         st = neo.SpikeTrain([2.12, 2.13, 2.15] * pq.s, t_stop=10 * pq.s)
-        self.assertRaises(ValueError, statistics.instantaneous_rate, st,
-                          1 * pq.ms)
+        self.assertRaises(
+            ValueError, statistics.instantaneous_rate, st, 1 * pq.ms)
 
-    # Regression test for #245
     def test_instantaneous_rate_regression_245(self):
         # This test makes sure that the correct kernel width is chosen when
         # selecting 'auto' as kernel
         spiketrain = neo.SpikeTrain(
-            range(1, 30) * pq.ms, t_start=0 * pq.ms, t_stop=30 * pq.ms)
+            pq.ms * range(1, 30), t_start=0 * pq.ms, t_stop=30 * pq.ms)
 
         # This is the correct procedure to attain the kernel: first, the result
         # of sskernel retrieves the kernel bandwidth of an optimal Gaussian
@@ -789,17 +817,16 @@ def test_instantaneous_rate_grows_with_sampling_period(self):
         for sampling_period in np.linspace(1, 1000, num=10) * pq.ms:
             with self.subTest(sampling_period=sampling_period):
                 rate = statistics.instantaneous_rate(
-                    spiketrain,
-                    sampling_period=sampling_period,
-                    kernel=kernel)
+                    spiketrain, sampling_period=sampling_period, kernel=kernel)
                 rates_mean.append(rate.mean())
         # rate means are greater or equal the expected rate
         assert_array_less(rate_expected, rates_mean)
         # check sorted
         self.assertTrue(np.all(rates_mean[:-1] < rates_mean[1:]))
 
-    # Regression test for #360
-    def test_centered_at_origin(self):
+    def test_instantaneous_rate_regression_360(self):
+        # This test checks if the resulting rate is centered for a spiketrain
+        # with spikes at [-0.0001, 0, 0.0001].
         # Skip RectangularKernel because it doesn't have a strong peak.
         kernel_types = tuple(
             kern_cls for kern_cls in kernels.__dict__.values()
@@ -818,11 +845,11 @@
                 rate = statistics.instantaneous_rate(spiketrain,
                                                      sampling_period=20 * pq.ms,
                                                      kernel=kernel)
-                # the peak time must be centered at origin
+                # the peak time must be centered at the origin t=0
                 self.assertEqual(rate.times[np.argmax(rate)], 0)
 
         # second part: a single spike at t=0
-        periods = [2 ** c for c in range(-3, 6)]
+        periods = [2 ** exp for exp in range(-3, 6)]
         for period in periods:
             with self.subTest(period=period):
                 spiketrain = neo.SpikeTrain(np.array([0]) * pq.s,
@@ -835,7 +862,7 @@
                                                      kernel=kernel)
                 self.assertEqual(rate.times[np.argmax(rate)], 0)
 
-    def test_annotations(self):
+    def test_instantaneous_rate_annotations(self):
         spiketrain = neo.SpikeTrain([1, 2], t_stop=2 * pq.s, units=pq.s)
         kernel = kernels.AlphaKernel(sigma=100 * pq.ms)
         rate = statistics.instantaneous_rate(spiketrain,
@@ -847,30 +874,86 @@
         self.assertIn('kernel', rate.annotations)
         self.assertEqual(rate.annotations['kernel'], kernel_annotation)
 
-    def test_border_correction(self):
-        np.random.seed(0)
-        n_spiketrains = 125
-        rate = 50. * pq.Hz
-        t_start = 0. * pq.ms
-        t_stop = 1000. * pq.ms
-
-        sampling_period = 0.1 * pq.ms
-
-        trial_list = StationaryPoissonProcess(
-            rate=rate, t_start=t_start, t_stop=t_stop
-        ).generate_n_spiketrains(n_spiketrains)
-
-        for correction in (True, False):
-            rates = []
-            for trial in trial_list:
-                # calculate the instantaneous rate, discard extra dimension
-                instantaneous_rate = statistics.instantaneous_rate(
-                    spiketrains=trial,
-                    sampling_period=sampling_period,
-                    kernel='auto',
-                    border_correction=correction
-                )
-                rates.append(instantaneous_rate)
+    def test_instantaneous_rate_regression_374(self):
+        # Check if the last interval is dropped.
+        # In this example a spiketrain with t_start=0, t_stop=9.8, and spikes
+        # at [9.65, 9.7, 9.75]s is used.
+        # When calculating the rate estimate with a sampling_period of 1s,
+        # the last interval [9.0, 9.8) should be dropped and not considered
+        # in the calculation.
+        spike_times = np.array([9.65, 9.7, 9.75]) * pq.s
+
+        spiketrain = neo.SpikeTrain(spike_times,
+                                    t_start=0,
+                                    t_stop=9.8)
+        kernel = kernels.GaussianKernel(sigma=250 * pq.ms)
+        sampling_period = 1000 * pq.ms
+        rate = statistics.instantaneous_rate(
+            spiketrain,
+            sampling_period=sampling_period,
+            kernel=kernel, center_kernel=False, trim=False, cutoff=1)
+        assert_array_almost_equal(rate.magnitude, 0)
+
+    def test_instantaneous_rate_rate_times(self):
+        # check if the differences between rate.times are equal to the
+        # sampling_period
+        st = self.spike_train
+        periods = [1, 0.99, 0.35, 11, st.duration] * pq.s
+        for period in periods:
+            rate = statistics.instantaneous_rate(st,
+                                                 sampling_period=period,
+                                                 kernel=self.kernel,
+                                                 center_kernel=True,
+                                                 trim=False)
+            rate_times_diff = np.diff(rate.times)
+            period_times = np.full_like(rate_times_diff, period)
+            assert_array_almost_equal(rate_times_diff, period_times)
+
+    def test_instantaneous_rate_bin_edges(self):
+        # This test checks if the bin edges used to calculate the rate
+        # estimate are multiples of the sampling period. In the following
+        # example, the rate maximum is expected to be at 5.785s.
+        # See PR#453 https://github.com/NeuralEnsemble/elephant/pull/453
+        spike_times = np.array(
+            [4.45, 4.895, 5.34, 5.785, 6.23, 6.675, 7.12]) * pq.s
+        # add 0.01 s
+        shifted_spike_times = spike_times + .01 * pq.s
+
+        spiketrain = neo.SpikeTrain(shifted_spike_times,
+                                    t_start=0,
+                                    t_stop=10)
+        kernel = kernels.GaussianKernel(sigma=500 * pq.ms)
+        sampling_period = 445 * pq.ms
+        rate = statistics.instantaneous_rate(
+            spiketrain,
+            sampling_period=sampling_period,
+            kernel=kernel, center_kernel=True, trim=False)
+        self.assertAlmostEqual(spike_times[3].magnitude.item(),
+                               rate.times[rate.argmax()].magnitude.item())
+
+    def test_instantaneous_rate_border_correction(self):
+        np.random.seed(0)
+        n_spiketrains = 125
+        rate = 50. * pq.Hz
+        t_start = 0. * pq.ms
+        t_stop = 1000. * pq.ms
+
+        sampling_period = 0.1 * pq.ms
+
+        trial_list = StationaryPoissonProcess(
+            rate=rate, t_start=t_start, t_stop=t_stop
+        ).generate_n_spiketrains(n_spiketrains)
+
+        for correction in (True, False):
+            rates = []
+            for trial in trial_list:
+                # calculate the instantaneous rate, discard extra dimension
+                instantaneous_rate = statistics.instantaneous_rate(
+                    spiketrains=trial,
+                    sampling_period=sampling_period,
+                    kernel='auto',
+                    border_correction=correction
+                )
+                rates.append(instantaneous_rate)
 
         # The average estimated rate gives the average estimated value of
         # the firing rate in each time bin.
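Reviewer's note (not part of the patch): the interval semantics documented in
the new Notes section of `instantaneous_rate` can be sanity-checked with a
minimal sketch. The spike times, the Gaussian kernel, and the 4.5 s duration
below are illustrative assumptions chosen to mirror the documented example,
not values taken from the patch; the sketch assumes a version of elephant
with this patch applied.

    import neo
    import quantities as pq
    from elephant import kernels, statistics

    # Duration 4.5 s is not a multiple of the 1 s sampling period, so the
    # trailing partial interval [4, 4.5] s is dropped by the new binning:
    # n_bins = int(4.5 / 1) = 4, one sample per full interval.
    spiketrain = neo.SpikeTrain([0.5, 1.5, 2.5] * pq.s,
                                t_start=0 * pq.s, t_stop=4.5 * pq.s)
    rate = statistics.instantaneous_rate(
        spiketrain,
        sampling_period=1 * pq.s,
        kernel=kernels.GaussianKernel(sigma=250 * pq.ms))
    assert rate.shape == (4, 1)  # [0,1) [1,2) [2,3) [3,4]; [4,4.5] dropped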