From c812f1ce331e45ef9ba81c9e84b63504828c1507 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Fri, 12 May 2023 16:29:35 +0200 Subject: [PATCH 01/20] fix spike_time_tiling_coefficient --- elephant/spike_train_correlation.py | 56 +++++++++++------------------ 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/elephant/spike_train_correlation.py b/elephant/spike_train_correlation.py index b64667e45..7c003ff39 100644 --- a/elephant/spike_train_correlation.py +++ b/elephant/spike_train_correlation.py @@ -20,6 +20,7 @@ import neo import numpy as np +import quantities import quantities as pq import scipy.signal from scipy import integrate @@ -824,7 +825,10 @@ def cross_correlation_histogram( @deprecated_alias(spiketrain_1='spiketrain_i', spiketrain_2='spiketrain_j') -def spike_time_tiling_coefficient(spiketrain_i, spiketrain_j, dt=0.005 * pq.s): +def spike_time_tiling_coefficient(spiketrain_i : neo.core.SpikeTrain, + spiketrain_j : neo.core.SpikeTrain, + dt: quantities.Quantity + = 0.005 * pq.s) -> float: """ Calculates the Spike Time Tiling Coefficient (STTC) as described in :cite:`correlation-Cutts2014_14288` following their implementation in C. @@ -892,43 +896,23 @@ def spike_time_tiling_coefficient(spiketrain_i, spiketrain_j, dt=0.005 * pq.s): """ - def run_P(spiketrain_i, spiketrain_j): + def run_P(spiketrain_i : neo.core.SpikeTrain, + spiketrain_j : neo.core.SpikeTrain, + dt : quantities.Quantity=dt) -> int: """ - Check every spike in train 1 to see if there's a spike in train 2 + Check every spike in train i to see if there's a spike in train j within dt """ - N2 = len(spiketrain_j) - - # Search spikes of spiketrain_i in spiketrain_j - # ind will contain index of - ind = np.searchsorted(spiketrain_j.times, spiketrain_i.times) - - # To prevent IndexErrors - # If a spike of spiketrain_i is after the last spike of spiketrain_j, - # the index is N2, however spiketrain_j[N2] raises an IndexError. - # By shifting this index, the spike of spiketrain_i will be compared - # to the last 2 spikes of spiketrain_j (negligible overhead). - # Note: Not necessary for index 0 that will be shifted to -1, - # because spiketrain_j[-1] is valid (additional negligible comparison) - ind[ind == N2] = N2 - 1 - - # Compare to nearest spike in spiketrain_j BEFORE spike in spiketrain_i - close_left = np.abs( - spiketrain_j.times[ind - 1] - spiketrain_i.times) <= dt - # Compare to nearest spike in spiketrain_j AFTER (or simultaneous) - # spike in spiketrain_j - close_right = np.abs( - spiketrain_j.times[ind] - spiketrain_i.times) <= dt - - # spiketrain_j spikes that are in [-dt, dt] range of spiketrain_i - # spikes are counted only ONCE (as per original implementation) - close = close_left + close_right - - # Count how many spikes in spiketrain_i have a "partner" in - # spiketrain_j - return np.count_nonzero(close) - - def run_T(spiketrain): + within_dt = 0 + for spike_i in spiketrain_i.times: + for spike_j in spiketrain_j.times: + if abs(spike_i - spike_j) <= dt: + within_dt += 1 + break + + return within_dt + + def run_T(spiketrain : neo.core.SpikeTrain) -> float: """ Calculate the proportion of the total recording time 'tiled' by spikes. """ @@ -948,7 +932,7 @@ def run_T(spiketrain): else: # if more than a single spike in the train # Calculate difference between consecutive spikes - diff = np.diff(spiketrain) + diff = abs(np.diff(spiketrain)) # Find spikes whose tiles overlap idx = np.where(diff < 2 * dt)[0] From e618914d46cbaa929fbc1006d73c81047eca5cb4 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Mon, 22 May 2023 21:29:53 +0200 Subject: [PATCH 02/20] Revert "fix spike_time_tiling_coefficient" This reverts commit c812f1ce331e45ef9ba81c9e84b63504828c1507. --- elephant/spike_train_correlation.py | 56 ++++++++++++++++++----------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/elephant/spike_train_correlation.py b/elephant/spike_train_correlation.py index 7c003ff39..b64667e45 100644 --- a/elephant/spike_train_correlation.py +++ b/elephant/spike_train_correlation.py @@ -20,7 +20,6 @@ import neo import numpy as np -import quantities import quantities as pq import scipy.signal from scipy import integrate @@ -825,10 +824,7 @@ def cross_correlation_histogram( @deprecated_alias(spiketrain_1='spiketrain_i', spiketrain_2='spiketrain_j') -def spike_time_tiling_coefficient(spiketrain_i : neo.core.SpikeTrain, - spiketrain_j : neo.core.SpikeTrain, - dt: quantities.Quantity - = 0.005 * pq.s) -> float: +def spike_time_tiling_coefficient(spiketrain_i, spiketrain_j, dt=0.005 * pq.s): """ Calculates the Spike Time Tiling Coefficient (STTC) as described in :cite:`correlation-Cutts2014_14288` following their implementation in C. @@ -896,23 +892,43 @@ def spike_time_tiling_coefficient(spiketrain_i : neo.core.SpikeTrain, """ - def run_P(spiketrain_i : neo.core.SpikeTrain, - spiketrain_j : neo.core.SpikeTrain, - dt : quantities.Quantity=dt) -> int: + def run_P(spiketrain_i, spiketrain_j): """ - Check every spike in train i to see if there's a spike in train j + Check every spike in train 1 to see if there's a spike in train 2 within dt """ - within_dt = 0 - for spike_i in spiketrain_i.times: - for spike_j in spiketrain_j.times: - if abs(spike_i - spike_j) <= dt: - within_dt += 1 - break - - return within_dt - - def run_T(spiketrain : neo.core.SpikeTrain) -> float: + N2 = len(spiketrain_j) + + # Search spikes of spiketrain_i in spiketrain_j + # ind will contain index of + ind = np.searchsorted(spiketrain_j.times, spiketrain_i.times) + + # To prevent IndexErrors + # If a spike of spiketrain_i is after the last spike of spiketrain_j, + # the index is N2, however spiketrain_j[N2] raises an IndexError. + # By shifting this index, the spike of spiketrain_i will be compared + # to the last 2 spikes of spiketrain_j (negligible overhead). + # Note: Not necessary for index 0 that will be shifted to -1, + # because spiketrain_j[-1] is valid (additional negligible comparison) + ind[ind == N2] = N2 - 1 + + # Compare to nearest spike in spiketrain_j BEFORE spike in spiketrain_i + close_left = np.abs( + spiketrain_j.times[ind - 1] - spiketrain_i.times) <= dt + # Compare to nearest spike in spiketrain_j AFTER (or simultaneous) + # spike in spiketrain_j + close_right = np.abs( + spiketrain_j.times[ind] - spiketrain_i.times) <= dt + + # spiketrain_j spikes that are in [-dt, dt] range of spiketrain_i + # spikes are counted only ONCE (as per original implementation) + close = close_left + close_right + + # Count how many spikes in spiketrain_i have a "partner" in + # spiketrain_j + return np.count_nonzero(close) + + def run_T(spiketrain): """ Calculate the proportion of the total recording time 'tiled' by spikes. """ @@ -932,7 +948,7 @@ def run_T(spiketrain : neo.core.SpikeTrain) -> float: else: # if more than a single spike in the train # Calculate difference between consecutive spikes - diff = abs(np.diff(spiketrain)) + diff = np.diff(spiketrain) # Find spikes whose tiles overlap idx = np.where(diff < 2 * dt)[0] From b9c98e84e3ee58cbea9669637aec446f355f91d7 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Mon, 22 May 2023 22:08:05 +0200 Subject: [PATCH 03/20] add DOI to citation --- doc/bib/elephant.bib | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/doc/bib/elephant.bib b/doc/bib/elephant.bib index 1f0bfac2a..c4164aef3 100644 --- a/doc/bib/elephant.bib +++ b/doc/bib/elephant.bib @@ -243,7 +243,8 @@ @article{Cutts2014_14288 number={43}, pages={14288--14303}, year={2014}, - publisher={Soc Neuroscience} + publisher={Soc Neuroscience}, + doi={10.1523/JNEUROSCI.2767-14.2014} } @article{Holt1996_1806, From 3353e57fc2311cdde40eca11c61cd62293756547 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Mon, 22 May 2023 23:29:22 +0200 Subject: [PATCH 04/20] restructure unittests --- elephant/test/test_spike_train_correlation.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/elephant/test/test_spike_train_correlation.py b/elephant/test/test_spike_train_correlation.py index f4230763e..5847dda3b 100644 --- a/elephant/test/test_spike_train_correlation.py +++ b/elephant/test/test_spike_train_correlation.py @@ -727,7 +727,7 @@ def setUp(self): self.st_2 = neo.SpikeTrain( self.test_array_1d_2, units='ms', t_stop=50.) - def test_sttc(self): + def test_sttc_different_units(self): # test for result target = 0.495860165593 self.assertAlmostEqual(target, sc.sttc(self.st_1, self.st_2, @@ -737,28 +737,33 @@ def test_sttc(self): self.assertAlmostEqual(target, sc.sttc(self.st_1, self.st_2, 5.0 * pq.ms)) + def test_sttc_not_enough_spiketrains(self): + # test no spiketrains self.assertTrue(np.isnan(sc.sttc([], []))) # test one spiketrain self.assertTrue(np.isnan(sc.sttc(self.st_1, []))) + def test_sttc_one_spike(self): # test for one spike in a spiketrain st1 = neo.SpikeTrain([1], units='ms', t_stop=1.) st2 = neo.SpikeTrain([5], units='ms', t_stop=10.) self.assertEqual(sc.sttc(st1, st2), 1.0) self.assertTrue(bool(sc.sttc(st1, st2, 0.1 * pq.ms) < 0)) + def test_sttc_high_value_dt(self): # test for high value of dt self.assertEqual(sc.sttc(self.st_1, self.st_2, dt=5 * pq.s), 1.0) + def test_sttc_edge_cases(self): # test for TA = PB = 1 but TB /= PA /= 1 and vice versa + st2 = neo.SpikeTrain([5], units='ms', t_stop=10.) st3 = neo.SpikeTrain([1, 5, 9], units='ms', t_stop=10.) target2 = 1. / 3. - self.assertAlmostEqual(target2, sc.sttc(st3, st2, - 0.003 * pq.s)) - self.assertAlmostEqual(target2, sc.sttc(st2, st3, - 0.003 * pq.s)) + + self.assertAlmostEqual(target2, sc.sttc(st3, st2, 0.003 * pq.s)) + self.assertAlmostEqual(target2, sc.sttc(st2, st3, 0.003 * pq.s)) def test_exist_alias(self): # Test if alias cch still exists. From 73121f3ab1e7c31f302d3216919f0dc4bf7d1ffb Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Mon, 22 May 2023 23:30:13 +0200 Subject: [PATCH 05/20] add documentation for sttc input spiketrains should have sorted spiketimes --- doc/conf.py | 7 +++-- elephant/spike_train_correlation.py | 40 ++++++++++++++++------------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/doc/conf.py b/doc/conf.py index e98a1a05a..59b90aa3d 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -349,8 +349,11 @@ # configuration for intersphinx: refer to Viziphant intersphinx_mapping = { - 'viziphant': ('https://viziphant.readthedocs.io/en/latest/', None), - 'numpy': ('https://numpy.org/doc/stable', None) + 'viziphant': ('https://viziphant.readthedocs.io/en/stable/', None), + 'numpy': ('https://numpy.org/doc/stable', None), + 'neo': ('https://neo.readthedocs.io/en/stable/', None), + 'quantities': ('https://python-quantities.readthedocs.io/en/stable/', None), + 'python': ('https://docs.python.org/3/', None) } diff --git a/elephant/spike_train_correlation.py b/elephant/spike_train_correlation.py index b64667e45..2b08e9f7c 100644 --- a/elephant/spike_train_correlation.py +++ b/elephant/spike_train_correlation.py @@ -824,7 +824,9 @@ def cross_correlation_histogram( @deprecated_alias(spiketrain_1='spiketrain_i', spiketrain_2='spiketrain_j') -def spike_time_tiling_coefficient(spiketrain_i, spiketrain_j, dt=0.005 * pq.s): +def spike_time_tiling_coefficient(spiketrain_i: neo.core.SpikeTrain, + spiketrain_j: neo.core.SpikeTrain, + dt: pq.Quantity = 0.005 * pq.s) -> float: """ Calculates the Spike Time Tiling Coefficient (STTC) as described in :cite:`correlation-Cutts2014_14288` following their implementation in C. @@ -832,7 +834,7 @@ def spike_time_tiling_coefficient(spiketrain_i, spiketrain_j, dt=0.005 * pq.s): It has been proposed as a replacement for the correlation index as it presents several advantages (e.g. it's not confounded by firing rate, appropriately distinguishes lack of correlation from anti-correlation, - periods of silence don't add to the correlation and it's sensitive to + periods of silence don't add to the correlation, and it's sensitive to firing patterns). The STTC is calculated as follows: @@ -845,7 +847,7 @@ def spike_time_tiling_coefficient(spiketrain_i, spiketrain_j, dt=0.005 * pq.s): in train 1, `PB` is the same proportion for the spikes in train 2; `TA` is the proportion of total recording time within `[-dt, +dt]` of any spike in train 1, TB is the same proportion for train 2. - For :math:`TA = PB = 1`and for :math:`TB = PA = 1` + For :math:`TA = PB = 1` and for :math:`TB = PA = 1` the resulting :math:`0/0` is replaced with :math:`1`, since every spike from the train with :math:`T = 1` is within `[-dt, +dt]` of a spike of the other train. @@ -857,9 +859,9 @@ def spike_time_tiling_coefficient(spiketrain_i, spiketrain_j, dt=0.005 * pq.s): Parameters ---------- - spiketrain_i, spiketrain_j : neo.SpikeTrain + spiketrain_i, spiketrain_j : :class:`neo.core.SpikeTrain` Spike trains to cross-correlate. They must have the same `t_start` and - `t_stop`. + `t_stop`, further the spike times must be sorted. dt : pq.Quantity. The synchronicity window is used for both: the quantification of the proportion of total recording time that lies `[-dt, +dt]` of each spike @@ -869,9 +871,9 @@ def spike_time_tiling_coefficient(spiketrain_i, spiketrain_j, dt=0.005 * pq.s): Returns ------- - index : float or np.nan - The spike time tiling coefficient (STTC). Returns np.nan if any spike - train is empty. + index : :class:`float` or :obj:`numpy.nan` + The spike time tiling coefficient (STTC). Returns :obj:`numpy.nan` if + any spike train is empty. Notes ----- @@ -892,9 +894,11 @@ def spike_time_tiling_coefficient(spiketrain_i, spiketrain_j, dt=0.005 * pq.s): """ - def run_P(spiketrain_i, spiketrain_j): + def run_P(spiketrain_i: neo.core.SpikeTrain, + spiketrain_j: neo.core.SpikeTrain, + dt: pq.Quantity = dt) -> int: """ - Check every spike in train 1 to see if there's a spike in train 2 + Check every spike in train i to see if there's a spike in train j within dt """ N2 = len(spiketrain_j) @@ -903,7 +907,7 @@ def run_P(spiketrain_i, spiketrain_j): # ind will contain index of ind = np.searchsorted(spiketrain_j.times, spiketrain_i.times) - # To prevent IndexErrors + # To prevent IndexErrors: # If a spike of spiketrain_i is after the last spike of spiketrain_j, # the index is N2, however spiketrain_j[N2] raises an IndexError. # By shifting this index, the spike of spiketrain_i will be compared @@ -913,12 +917,12 @@ def run_P(spiketrain_i, spiketrain_j): ind[ind == N2] = N2 - 1 # Compare to nearest spike in spiketrain_j BEFORE spike in spiketrain_i - close_left = np.abs( - spiketrain_j.times[ind - 1] - spiketrain_i.times) <= dt + close_left = np.abs(spiketrain_j.times[ind - 1] - spiketrain_i.times + ) <= dt # Compare to nearest spike in spiketrain_j AFTER (or simultaneous) # spike in spiketrain_j - close_right = np.abs( - spiketrain_j.times[ind] - spiketrain_i.times) <= dt + close_right = np.abs(spiketrain_j.times[ind] - spiketrain_i.times + ) <= dt # spiketrain_j spikes that are in [-dt, dt] range of spiketrain_i # spikes are counted only ONCE (as per original implementation) @@ -928,7 +932,7 @@ def run_P(spiketrain_i, spiketrain_j): # spiketrain_j return np.count_nonzero(close) - def run_T(spiketrain): + def run_T(spiketrain: neo.core.SpikeTrain) -> float: """ Calculate the proportion of the total recording time 'tiled' by spikes. """ @@ -974,9 +978,9 @@ def run_T(spiketrain): else: TA = run_T(spiketrain_i) TB = run_T(spiketrain_j) - PA = run_P(spiketrain_i, spiketrain_j) + PA = run_P(spiketrain_i, spiketrain_j, dt) PA = PA / N1 - PB = run_P(spiketrain_j, spiketrain_i) + PB = run_P(spiketrain_j, spiketrain_i, dt) PB = PB / N2 # check if the P and T values are 1 to avoid division by zero # This only happens for TA = PB = 1 and/or TB = PA = 1, From a70581f19b3e8fd3c2ae2da8d8d7a21234d5d55d Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Tue, 23 May 2023 10:25:36 +0200 Subject: [PATCH 06/20] refactor run_P --- elephant/spike_train_correlation.py | 77 ++++++++++++----------------- 1 file changed, 31 insertions(+), 46 deletions(-) diff --git a/elephant/spike_train_correlation.py b/elephant/spike_train_correlation.py index 2b08e9f7c..fb8f4b32e 100644 --- a/elephant/spike_train_correlation.py +++ b/elephant/spike_train_correlation.py @@ -861,7 +861,7 @@ def spike_time_tiling_coefficient(spiketrain_i: neo.core.SpikeTrain, ---------- spiketrain_i, spiketrain_j : :class:`neo.core.SpikeTrain` Spike trains to cross-correlate. They must have the same `t_start` and - `t_stop`, further the spike times must be sorted. + `t_stop`. dt : pq.Quantity. The synchronicity window is used for both: the quantification of the proportion of total recording time that lies `[-dt, +dt]` of each spike @@ -894,43 +894,22 @@ def spike_time_tiling_coefficient(spiketrain_i: neo.core.SpikeTrain, """ - def run_P(spiketrain_i: neo.core.SpikeTrain, - spiketrain_j: neo.core.SpikeTrain, - dt: pq.Quantity = dt) -> int: + def run_P(spiketrain_j: neo.core.SpikeTrain, + spiketrain_i: neo.core.SpikeTrain, + dt: pq.Quantity = dt) -> float: """ - Check every spike in train i to see if there's a spike in train j - within dt + Returns number of spikes in spiketrain_j which lie within +- dt of + any spike from spiketrain_i, divided by the total number of spikes in + spiketrain_j """ - N2 = len(spiketrain_j) - - # Search spikes of spiketrain_i in spiketrain_j - # ind will contain index of - ind = np.searchsorted(spiketrain_j.times, spiketrain_i.times) - - # To prevent IndexErrors: - # If a spike of spiketrain_i is after the last spike of spiketrain_j, - # the index is N2, however spiketrain_j[N2] raises an IndexError. - # By shifting this index, the spike of spiketrain_i will be compared - # to the last 2 spikes of spiketrain_j (negligible overhead). - # Note: Not necessary for index 0 that will be shifted to -1, - # because spiketrain_j[-1] is valid (additional negligible comparison) - ind[ind == N2] = N2 - 1 - - # Compare to nearest spike in spiketrain_j BEFORE spike in spiketrain_i - close_left = np.abs(spiketrain_j.times[ind - 1] - spiketrain_i.times - ) <= dt - # Compare to nearest spike in spiketrain_j AFTER (or simultaneous) - # spike in spiketrain_j - close_right = np.abs(spiketrain_j.times[ind] - spiketrain_i.times - ) <= dt - - # spiketrain_j spikes that are in [-dt, dt] range of spiketrain_i - # spikes are counted only ONCE (as per original implementation) - close = close_left + close_right - - # Count how many spikes in spiketrain_i have a "partner" in - # spiketrain_j - return np.count_nonzero(close) + + tiled_spikes_j = np.isclose( + spiketrain_j.times.simplified.magnitude[:, np.newaxis], + spiketrain_i.times.simplified.magnitude, + atol=dt.simplified.item()) + tiled_spike_indices = np.any(tiled_spikes_j, axis=1) + tiled_spikes_j = spiketrain_j[tiled_spike_indices] + return len(tiled_spikes_j)/len(spiketrain_j) def run_T(spiketrain: neo.core.SpikeTrain) -> float: """ @@ -970,18 +949,24 @@ def run_T(spiketrain: neo.core.SpikeTrain) -> float: T = time_A / (spiketrain.t_stop - spiketrain.t_start) return T.simplified.item() # enforce simplification, strip units - N1 = len(spiketrain_i) - N2 = len(spiketrain_j) - - if N1 == 0 or N2 == 0: + # def calculate_covered_time(spikes, dt): + # sorted_spikes = np.sort(spikes) + # diff_spikes = np.diff(sorted_spikes) + # valid_durations = diff_spikes[diff_spikes >= 2 * dt] + # covered_time = np.sum(valid_durations) + 2 * dt * len(valid_durations) + # overlap_time = np.sum(diff_spikes[diff_spikes <= 2 * dt]) - len( + # valid_durations) * (2 * dt) + # total_time = np.sum(diff_spikes) + 2 * dt * len(diff_spikes) + # return covered_time, overlap_time, total_time + + if len(spiketrain_i) == 0 or len(spiketrain_j) == 0: index = np.nan else: - TA = run_T(spiketrain_i) - TB = run_T(spiketrain_j) - PA = run_P(spiketrain_i, spiketrain_j, dt) - PA = PA / N1 - PB = run_P(spiketrain_j, spiketrain_i, dt) - PB = PB / N2 + TA = run_T(spiketrain_j) + TB = run_T(spiketrain_i) + PA = run_P(spiketrain_j, spiketrain_i, dt) + PB = run_P(spiketrain_i, spiketrain_j, dt) + # check if the P and T values are 1 to avoid division by zero # This only happens for TA = PB = 1 and/or TB = PA = 1, # which leads to 0/0 in the calculation of the index. From 14ede19b17d0184dbf37d1df363b62e08d743590 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Tue, 23 May 2023 12:07:52 +0200 Subject: [PATCH 07/20] refactor run_T --- elephant/spike_train_correlation.py | 77 +++++++++++------------------ 1 file changed, 29 insertions(+), 48 deletions(-) diff --git a/elephant/spike_train_correlation.py b/elephant/spike_train_correlation.py index fb8f4b32e..4fb6f5d9d 100644 --- a/elephant/spike_train_correlation.py +++ b/elephant/spike_train_correlation.py @@ -893,7 +893,7 @@ def spike_time_tiling_coefficient(spiketrain_i: neo.core.SpikeTrain, 0.4958601655933762 """ - + spiketrain_j def run_P(spiketrain_j: neo.core.SpikeTrain, spiketrain_i: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: @@ -902,7 +902,6 @@ def run_P(spiketrain_j: neo.core.SpikeTrain, any spike from spiketrain_i, divided by the total number of spikes in spiketrain_j """ - tiled_spikes_j = np.isclose( spiketrain_j.times.simplified.magnitude[:, np.newaxis], spiketrain_i.times.simplified.magnitude, @@ -911,59 +910,41 @@ def run_P(spiketrain_j: neo.core.SpikeTrain, tiled_spikes_j = spiketrain_j[tiled_spike_indices] return len(tiled_spikes_j)/len(spiketrain_j) - def run_T(spiketrain: neo.core.SpikeTrain) -> float: + def run_T(spiketrain: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: """ Calculate the proportion of the total recording time 'tiled' by spikes. """ - N = len(spiketrain) - time_A = 2 * N * dt # maximum possible time - - if N == 1: # for only a single spike in the train - - # Check difference between start of recording and single spike - if spiketrain[0] - spiketrain.t_start < dt: - time_A += - dt + spiketrain[0] - spiketrain.t_start - - # Check difference between single spike and end of recording - elif spiketrain[0] + dt > spiketrain.t_stop: - time_A += - dt - spiketrain[0] + spiketrain.t_stop - - else: # if more than a single spike in the train - - # Calculate difference between consecutive spikes - diff = np.diff(spiketrain) - - # Find spikes whose tiles overlap - idx = np.where(diff < 2 * dt)[0] - # Subtract overlapping "2*dt" tiles and add differences instead - time_A += - 2 * dt * len(idx) + diff[idx].sum() - - # Check if spikes are within +/-dt of the start and/or end - # if so, subtract overlap of first and/or last spike - if (spiketrain[0] - spiketrain.t_start) < dt: - time_A += spiketrain[0] - dt - spiketrain.t_start - if (spiketrain.t_stop - spiketrain[N - 1]) < dt: - time_A += - spiketrain[-1] - dt + spiketrain.t_stop - - # Calculate the proportion of total recorded time to "tiled" time - T = time_A / (spiketrain.t_stop - spiketrain.t_start) - return T.simplified.item() # enforce simplification, strip units - - # def calculate_covered_time(spikes, dt): - # sorted_spikes = np.sort(spikes) - # diff_spikes = np.diff(sorted_spikes) - # valid_durations = diff_spikes[diff_spikes >= 2 * dt] - # covered_time = np.sum(valid_durations) + 2 * dt * len(valid_durations) - # overlap_time = np.sum(diff_spikes[diff_spikes <= 2 * dt]) - len( - # valid_durations) * (2 * dt) - # total_time = np.sum(diff_spikes) + 2 * dt * len(diff_spikes) - # return covered_time, overlap_time, total_time + dt = dt.simplified.item() + t_start = spiketrain.t_start.simplified.item() + t_stop = spiketrain.t_stop.simplified.item() + sorted_spikes = np.sort(spiketrain.times.simplified.magnitude) + + diff_spikes = np.diff(sorted_spikes) + overlap_durations = diff_spikes[diff_spikes <= 2 * dt] + covered_time_overlap = np.sum(overlap_durations) + non_overlap_durations = diff_spikes[diff_spikes > 2 * dt] + covered_time_non_overlap = len(non_overlap_durations) * 2 * dt + + # Check if spikes are within +/-dt of the start and/or end + # if so, subtract overlap of first and/or last spike + if (sorted_spikes[0] - t_start) < dt: + covered_time_overlap += sorted_spikes[0] - t_start + else: + covered_time_non_overlap+=dt + if (t_stop - sorted_spikes[- 1]) < dt: + covered_time_overlap += t_stop - sorted_spikes[-1] + else: + covered_time_non_overlap+=dt + + total_time_covered = covered_time_overlap + covered_time_non_overlap + total_time = t_stop - t_start + return total_time_covered / total_time if len(spiketrain_i) == 0 or len(spiketrain_j) == 0: index = np.nan else: - TA = run_T(spiketrain_j) - TB = run_T(spiketrain_i) + TA = run_T(spiketrain_j, dt) + TB = run_T(spiketrain_i, dt) PA = run_P(spiketrain_j, spiketrain_i, dt) PB = run_P(spiketrain_i, spiketrain_j, dt) From 7fb535f8e07a1559e58a87829ed7c2a1244e9d49 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Tue, 23 May 2023 12:31:49 +0200 Subject: [PATCH 08/20] add checks for t_start and t_stop in run_t --- elephant/spike_train_correlation.py | 36 ++++++++++++++--------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/elephant/spike_train_correlation.py b/elephant/spike_train_correlation.py index 4fb6f5d9d..531c964c7 100644 --- a/elephant/spike_train_correlation.py +++ b/elephant/spike_train_correlation.py @@ -893,8 +893,7 @@ def spike_time_tiling_coefficient(spiketrain_i: neo.core.SpikeTrain, 0.4958601655933762 """ - spiketrain_j - def run_P(spiketrain_j: neo.core.SpikeTrain, + def run_p(spiketrain_j: neo.core.SpikeTrain, spiketrain_i: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: """ @@ -910,7 +909,7 @@ def run_P(spiketrain_j: neo.core.SpikeTrain, tiled_spikes_j = spiketrain_j[tiled_spike_indices] return len(tiled_spikes_j)/len(spiketrain_j) - def run_T(spiketrain: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: + def run_t(spiketrain: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: """ Calculate the proportion of the total recording time 'tiled' by spikes. """ @@ -926,15 +925,15 @@ def run_T(spiketrain: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: covered_time_non_overlap = len(non_overlap_durations) * 2 * dt # Check if spikes are within +/-dt of the start and/or end - # if so, subtract overlap of first and/or last spike - if (sorted_spikes[0] - t_start) < dt: + # if so, add overlap of first and/or last spike + if sorted_spikes[0] - t_start < dt: covered_time_overlap += sorted_spikes[0] - t_start else: - covered_time_non_overlap+=dt - if (t_stop - sorted_spikes[- 1]) < dt: + covered_time_non_overlap += dt + if t_stop - sorted_spikes[- 1] < dt: covered_time_overlap += t_stop - sorted_spikes[-1] else: - covered_time_non_overlap+=dt + covered_time_non_overlap += dt total_time_covered = covered_time_overlap + covered_time_non_overlap total_time = t_stop - t_start @@ -943,10 +942,10 @@ def run_T(spiketrain: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: if len(spiketrain_i) == 0 or len(spiketrain_j) == 0: index = np.nan else: - TA = run_T(spiketrain_j, dt) - TB = run_T(spiketrain_i, dt) - PA = run_P(spiketrain_j, spiketrain_i, dt) - PB = run_P(spiketrain_i, spiketrain_j, dt) + TA = run_t(spiketrain_j, dt) + TB = run_t(spiketrain_i, dt) + PA = run_p(spiketrain_j, spiketrain_i, dt) + PB = run_p(spiketrain_i, spiketrain_j, dt) # check if the P and T values are 1 to avoid division by zero # This only happens for TA = PB = 1 and/or TB = PA = 1, @@ -954,16 +953,15 @@ def run_T(spiketrain: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: # In those cases, every spike in the train with P = 1 # is within dt of a spike in the other train, # so we set the respective (partial) index to 1. - if PA * TB == 1: - if PB * TA == 1: - index = 1. - else: - index = 0.5 + 0.5 * (PB - TA) / (1 - PB * TA) + if PA * TB == 1 and PB * TA == 1: + index = 1. + elif PA * TB == 1: + index = 0.5 + 0.5 * (PB - TA) / (1 - PB * TA) elif PB * TA == 1: index = 0.5 + 0.5 * (PA - TB) / (1 - PA * TB) else: - index = 0.5 * (PA - TB) / (1 - PA * TB) + 0.5 * (PB - TA) / ( - 1 - PB * TA) + index = 0.5 * (PA - TB) / (1 - PA * TB) + \ + 0.5 * (PB - TA) / (1 - PB * TA) return index From dd472e8fc74621b7bc6d8833bc25f76d055f2205 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Tue, 23 May 2023 13:33:59 +0200 Subject: [PATCH 09/20] add input checks and unittests --- elephant/spike_train_correlation.py | 25 ++++++++++----- elephant/test/test_spike_train_correlation.py | 32 ++++++++++++++++--- 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/elephant/spike_train_correlation.py b/elephant/spike_train_correlation.py index 531c964c7..c1242b208 100644 --- a/elephant/spike_train_correlation.py +++ b/elephant/spike_train_correlation.py @@ -25,7 +25,7 @@ from scipy import integrate from elephant.conversion import BinnedSpikeTrain -from elephant.utils import deprecated_alias +from elephant.utils import deprecated_alias, check_neo_consistency __all__ = [ "covariance", @@ -893,6 +893,15 @@ def spike_time_tiling_coefficient(spiketrain_i: neo.core.SpikeTrain, 0.4958601655933762 """ + # input checks + if dt <= 0 * pq.s: + raise ValueError(f"dt must be > 0, found: {dt}") + + check_neo_consistency([spiketrain_j, spiketrain_i], neo.core.SpikeTrain) + + if dt.units != spiketrain_i.units: + dt = dt.rescale(spiketrain_i.units) + def run_p(spiketrain_j: neo.core.SpikeTrain, spiketrain_i: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: @@ -902,9 +911,9 @@ def run_p(spiketrain_j: neo.core.SpikeTrain, spiketrain_j """ tiled_spikes_j = np.isclose( - spiketrain_j.times.simplified.magnitude[:, np.newaxis], - spiketrain_i.times.simplified.magnitude, - atol=dt.simplified.item()) + spiketrain_j.times.magnitude[:, np.newaxis], + spiketrain_i.times.magnitude, + atol=dt.item()) tiled_spike_indices = np.any(tiled_spikes_j, axis=1) tiled_spikes_j = spiketrain_j[tiled_spike_indices] return len(tiled_spikes_j)/len(spiketrain_j) @@ -913,10 +922,10 @@ def run_t(spiketrain: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: """ Calculate the proportion of the total recording time 'tiled' by spikes. """ - dt = dt.simplified.item() - t_start = spiketrain.t_start.simplified.item() - t_stop = spiketrain.t_stop.simplified.item() - sorted_spikes = np.sort(spiketrain.times.simplified.magnitude) + dt = dt.item() + t_start = spiketrain.t_start.item() + t_stop = spiketrain.t_stop.item() + sorted_spikes = np.sort(spiketrain.times.magnitude) diff_spikes = np.diff(sorted_spikes) overlap_durations = diff_spikes[diff_spikes <= 2 * dt] diff --git a/elephant/test/test_spike_train_correlation.py b/elephant/test/test_spike_train_correlation.py index 5847dda3b..45290e464 100644 --- a/elephant/test/test_spike_train_correlation.py +++ b/elephant/test/test_spike_train_correlation.py @@ -727,7 +727,25 @@ def setUp(self): self.st_2 = neo.SpikeTrain( self.test_array_1d_2, units='ms', t_stop=50.) - def test_sttc_different_units(self): + def test_sttc_dt_smaller_zero(self): + self.assertRaises(ValueError, sc.sttc, self.st_1, self.st_2, + dt=0 * pq.s) + self.assertRaises(ValueError, sc.sttc, self.st_1, self.st_2, + dt=-1 * pq.ms) + + def test_sttc_different_t_stop(self): + st_1 = neo.SpikeTrain([1], units='ms', t_stop=10.) + st_2 = neo.SpikeTrain([5], units='ms', t_stop=10.) + st_2.t_stop = 1 * pq.ms + self.assertRaises(ValueError, sc.sttc, st_1, st_2) + + def test_sttc_different_t_start(self): + st_1 = neo.SpikeTrain([1], units='ms', t_stop=10.) + st_2 = neo.SpikeTrain([5], units='ms', t_stop=10.) + st_2.t_start = 1 * pq.ms + self.assertRaises(ValueError, sc.sttc, st_1, st_2) + + def test_sttc_different_units_dt(self): # test for result target = 0.495860165593 self.assertAlmostEqual(target, sc.sttc(self.st_1, self.st_2, @@ -737,17 +755,21 @@ def test_sttc_different_units(self): self.assertAlmostEqual(target, sc.sttc(self.st_1, self.st_2, 5.0 * pq.ms)) - def test_sttc_not_enough_spiketrains(self): + def test_sttc_different_units_spiketrains(self): + st1 = neo.SpikeTrain([1], units='ms', t_stop=10.) + st2 = neo.SpikeTrain([5], units='s', t_stop=10.) + self.assertRaises(ValueError, sc.sttc, st1, st2) + def test_sttc_not_enough_spiketrains(self): # test no spiketrains - self.assertTrue(np.isnan(sc.sttc([], []))) + self.assertRaises(TypeError, sc.sttc, [], []) # test one spiketrain - self.assertTrue(np.isnan(sc.sttc(self.st_1, []))) + self.assertRaises(TypeError, sc.sttc, self.st_1, []) def test_sttc_one_spike(self): # test for one spike in a spiketrain - st1 = neo.SpikeTrain([1], units='ms', t_stop=1.) + st1 = neo.SpikeTrain([1], units='ms', t_stop=10.) st2 = neo.SpikeTrain([5], units='ms', t_stop=10.) self.assertEqual(sc.sttc(st1, st2), 1.0) self.assertTrue(bool(sc.sttc(st1, st2, 0.1 * pq.ms) < 0)) From a9880161b63298a858b83d408037c77a4f06ee58 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Tue, 23 May 2023 14:12:01 +0200 Subject: [PATCH 10/20] add regression test for Issue #563 --- elephant/test/test_spike_train_correlation.py | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/elephant/test/test_spike_train_correlation.py b/elephant/test/test_spike_train_correlation.py index 45290e464..c50658d08 100644 --- a/elephant/test/test_spike_train_correlation.py +++ b/elephant/test/test_spike_train_correlation.py @@ -787,6 +787,50 @@ def test_sttc_edge_cases(self): self.assertAlmostEqual(target2, sc.sttc(st3, st2, 0.003 * pq.s)) self.assertAlmostEqual(target2, sc.sttc(st2, st3, 0.003 * pq.s)) + def test_sttc_unsorted_spiketimes(self): + # regression test for issue #563 + # https://github.com/NeuralEnsemble/elephant/issues/563 + spiketrain_E7 = neo.SpikeTrain( + [1678., 23786.3, 34641.8, 71520.7, 73606.9, 78383.3, + 97387.9, 144313.4, 4607.6, 19275.1, 152894.2, 44240.1], + units='ms', t_stop=160000 * pq.ms) + + spiketrain_E3 = neo.SpikeTrain( + [1678., 23786.3, 34641.8, 71520.7, 73606.9, 78383.3, + 97387.9, 144313.4, 4607.6, 19275.1, 152894.2, 44240.1], + units='ms', t_stop=160000 * pq.ms) + sttc_unsorted_E7_E3 = sc.sttc(spiketrain_E7, + spiketrain_E3, dt=0.10 * pq.s) + self.assertAlmostEqual(sttc_unsorted_E7_E3, 1) + spiketrain_E7.sort() + spiketrain_E3.sort() + sttc_sorted_E7_E3 = sc.sttc(spiketrain_E7, + spiketrain_E3, dt=0.10 * pq.s) + self.assertAlmostEqual(sttc_unsorted_E7_E3, sttc_sorted_E7_E3) + + spiketrain_E8 = neo.SpikeTrain( + [20646.8, 25875.1, 26154.4, 35121., 55909.7, 79164.8, + 110849.8, 117484.1, 3731.5, 4213.9, 119995.1, 123748.1, + 171016.8, 172989., 185145.2, 12043.5, 185995.9, 186740.1, + 12629.8, 23394.3, 34993.2], units='ms', t_stop=210000 * pq.ms) + + spiketrain_B3 = neo.SpikeTrain( + [10600.7, 19699.6, 22803., 40769.3, 121385.7, 127402.9, + 130829.2, 134363.8, 1193.5, 8012.7, 142037.3, 146628.2, + 165925.3, 168489.3, 175194.3, 10339.8, 178676.4, 180807.2, + 201431.3, 22231.1, 38113.4], units='ms', t_stop=210000 * pq.ms) + + self.assertTrue( + sc.sttc(spiketrain_E8, spiketrain_B3, dt=0.10 * pq.s) < 1) + + sttc_unsorted_E8_B3 = sc.sttc(spiketrain_E8, + spiketrain_B3, dt=0.10 * pq.s) + spiketrain_E8.sort() + spiketrain_B3.sort() + sttc_sorted_E8_B3 = sc.sttc(spiketrain_E8, + spiketrain_B3, dt=0.10 * pq.s) + self.assertAlmostEqual(sttc_unsorted_E8_B3, sttc_sorted_E8_B3) + def test_exist_alias(self): # Test if alias cch still exists. self.assertEqual(sc.spike_time_tiling_coefficient, sc.sttc) From 64c8ad99edb54acf195ee2bf9ee403f374a46405 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Tue, 23 May 2023 19:33:04 +0200 Subject: [PATCH 11/20] adjusted t_stop in regression test to 300 000 ms --- elephant/test/test_spike_train_correlation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/elephant/test/test_spike_train_correlation.py b/elephant/test/test_spike_train_correlation.py index c50658d08..21b6a32fd 100644 --- a/elephant/test/test_spike_train_correlation.py +++ b/elephant/test/test_spike_train_correlation.py @@ -793,12 +793,12 @@ def test_sttc_unsorted_spiketimes(self): spiketrain_E7 = neo.SpikeTrain( [1678., 23786.3, 34641.8, 71520.7, 73606.9, 78383.3, 97387.9, 144313.4, 4607.6, 19275.1, 152894.2, 44240.1], - units='ms', t_stop=160000 * pq.ms) + units='ms', t_stop=300000 * pq.ms) spiketrain_E3 = neo.SpikeTrain( [1678., 23786.3, 34641.8, 71520.7, 73606.9, 78383.3, 97387.9, 144313.4, 4607.6, 19275.1, 152894.2, 44240.1], - units='ms', t_stop=160000 * pq.ms) + units='ms', t_stop=300000 * pq.ms) sttc_unsorted_E7_E3 = sc.sttc(spiketrain_E7, spiketrain_E3, dt=0.10 * pq.s) self.assertAlmostEqual(sttc_unsorted_E7_E3, 1) @@ -812,13 +812,13 @@ def test_sttc_unsorted_spiketimes(self): [20646.8, 25875.1, 26154.4, 35121., 55909.7, 79164.8, 110849.8, 117484.1, 3731.5, 4213.9, 119995.1, 123748.1, 171016.8, 172989., 185145.2, 12043.5, 185995.9, 186740.1, - 12629.8, 23394.3, 34993.2], units='ms', t_stop=210000 * pq.ms) + 12629.8, 23394.3, 34993.2], units='ms', t_stop=300000 * pq.ms) spiketrain_B3 = neo.SpikeTrain( [10600.7, 19699.6, 22803., 40769.3, 121385.7, 127402.9, 130829.2, 134363.8, 1193.5, 8012.7, 142037.3, 146628.2, 165925.3, 168489.3, 175194.3, 10339.8, 178676.4, 180807.2, - 201431.3, 22231.1, 38113.4], units='ms', t_stop=210000 * pq.ms) + 201431.3, 22231.1, 38113.4], units='ms', t_stop=300000 * pq.ms) self.assertTrue( sc.sttc(spiketrain_E8, spiketrain_B3, dt=0.10 * pq.s) < 1) From 7d69e01aa5b423aacad811c4aba52bc77123fbc6 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Wed, 24 May 2023 16:59:17 +0200 Subject: [PATCH 12/20] add validation tests --- elephant/test/test_spike_train_correlation.py | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/elephant/test/test_spike_train_correlation.py b/elephant/test/test_spike_train_correlation.py index 21b6a32fd..f2942d19f 100644 --- a/elephant/test/test_spike_train_correlation.py +++ b/elephant/test/test_spike_train_correlation.py @@ -6,18 +6,20 @@ :license: Modified BSD, see LICENSE.txt for details. """ +import math import unittest import neo +from neo.io import NixIO import numpy as np import quantities as pq -from numpy.testing.utils import assert_array_equal, assert_array_almost_equal +from numpy.testing import assert_array_equal, assert_array_almost_equal import elephant.conversion as conv import elephant.spike_train_correlation as sc +from elephant.datasets import download_datasets, ELEPHANT_TMP_DIR from elephant.spike_train_generation import homogeneous_poisson_process, \ homogeneous_gamma_process -import math class CovarianceTestCase(unittest.TestCase): @@ -831,11 +833,39 @@ def test_sttc_unsorted_spiketimes(self): spiketrain_B3, dt=0.10 * pq.s) self.assertAlmostEqual(sttc_unsorted_E8_B3, sttc_sorted_E8_B3) - def test_exist_alias(self): + def test_sttc_validation_test(self): + """This test checks the results of elephants implementation of + the spike time tiling coefficient against the results of the + original c-implementation. + The c-code and the test data is located at + NeuralEnsemble/elephant-data/unittest/spike_train_correlation/ + spike_time_tiling_coefficient""" + + repo_path = r"unittest/spike_train_correlation/spike_time_tiling_coefficient/data" # noqa + + files_to_download = [("spike_time_tiling_coefficient_results.nix ", + "db4e81febc0ca48f1c125891a85c9a7a")] + + for filename, checksum in files_to_download: + download_datasets(repo_path=f"{repo_path}/{filename}", + checksum=checksum) + + reader = NixIO( + ELEPHANT_TMP_DIR / 'spike_time_tiling_coefficient_results.nix', + mode='ro') + test_data_block = reader.read() + + for segment in test_data_block[0].segments: + spiketrain_i = segment.spiketrains[0] + spiketrain_j = segment.spiketrains[1] + dt = segment.annotations['dt'] + sttc_result = segment.annotations['sttc_result'] + self.assertAlmostEqual(sc.sttc(spiketrain_i, spiketrain_j, dt), + sttc_result) + def test_sttc_exist_alias(self): # Test if alias cch still exists. self.assertEqual(sc.spike_time_tiling_coefficient, sc.sttc) - class SpikeTrainTimescaleTestCase(unittest.TestCase): def test_timescale_calculation(self): From cdebceb0388065425b448ee0b2777d3fe9ca2f7a Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Fri, 26 May 2023 10:58:48 +0200 Subject: [PATCH 13/20] add check if spike times are sorted, if not sort the spikes --- elephant/spike_train_correlation.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/elephant/spike_train_correlation.py b/elephant/spike_train_correlation.py index c1242b208..32a9aec76 100644 --- a/elephant/spike_train_correlation.py +++ b/elephant/spike_train_correlation.py @@ -925,7 +925,10 @@ def run_t(spiketrain: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: dt = dt.item() t_start = spiketrain.t_start.item() t_stop = spiketrain.t_stop.item() - sorted_spikes = np.sort(spiketrain.times.magnitude) + sorted_spikes = spiketrain.times.magnitude + # Check if spikes are sorted + if (np.diff(sorted_spikes) < 0).any(): + sorted_spikes = np.sort(sorted_spikes) diff_spikes = np.diff(sorted_spikes) overlap_durations = diff_spikes[diff_spikes <= 2 * dt] From 7d2e3dfe4a7422a894496a2e08bacc150b626c4e Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Fri, 27 Oct 2023 16:48:58 +0200 Subject: [PATCH 14/20] add documentation for unit test --- elephant/test/test_spike_train_correlation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/elephant/test/test_spike_train_correlation.py b/elephant/test/test_spike_train_correlation.py index f2942d19f..ce2396fe2 100644 --- a/elephant/test/test_spike_train_correlation.py +++ b/elephant/test/test_spike_train_correlation.py @@ -749,6 +749,7 @@ def test_sttc_different_t_start(self): def test_sttc_different_units_dt(self): # test for result + # target obtained with pencil and paper according to original paper. target = 0.495860165593 self.assertAlmostEqual(target, sc.sttc(self.st_1, self.st_2, 0.005 * pq.s)) From 5120a4d647c87660bafb31c1d45e831795717b69 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Mon, 30 Oct 2023 10:18:05 +0100 Subject: [PATCH 15/20] add comments for run_p and run_t --- elephant/spike_train_correlation.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/elephant/spike_train_correlation.py b/elephant/spike_train_correlation.py index 32a9aec76..da79abdf2 100644 --- a/elephant/spike_train_correlation.py +++ b/elephant/spike_train_correlation.py @@ -910,34 +910,48 @@ def run_p(spiketrain_j: neo.core.SpikeTrain, any spike from spiketrain_i, divided by the total number of spikes in spiketrain_j """ + # Create a boolean array where each element represents whether a spike + # in spiketrain_j lies within +- dt of any spike in spiketrain_i. tiled_spikes_j = np.isclose( spiketrain_j.times.magnitude[:, np.newaxis], spiketrain_i.times.magnitude, atol=dt.item()) + # Determine which spikes in spiketrain_j satisfy the time window + # condition. tiled_spike_indices = np.any(tiled_spikes_j, axis=1) + # Extract the spike times in spiketrain_j that satisfy the condition. tiled_spikes_j = spiketrain_j[tiled_spike_indices] + # Calculate the ratio of matching spikes in j to the total spikes in j. return len(tiled_spikes_j)/len(spiketrain_j) def run_t(spiketrain: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: """ Calculate the proportion of the total recording time 'tiled' by spikes. """ + # Get the numerical value of 'dt'. dt = dt.item() + # Get the start and stop times of the spike train. t_start = spiketrain.t_start.item() t_stop = spiketrain.t_stop.item() + # Get the spike times as a NumPy array. sorted_spikes = spiketrain.times.magnitude - # Check if spikes are sorted + # Check if spikes are sorted and sort them if not. if (np.diff(sorted_spikes) < 0).any(): sorted_spikes = np.sort(sorted_spikes) + # Calculate the time differences between consecutive spikes. diff_spikes = np.diff(sorted_spikes) + # Calculate durations of spike overlaps within a time window of 2 * dt. overlap_durations = diff_spikes[diff_spikes <= 2 * dt] covered_time_overlap = np.sum(overlap_durations) + + # Calculate the durations of non-overlapping spikes. non_overlap_durations = diff_spikes[diff_spikes > 2 * dt] covered_time_non_overlap = len(non_overlap_durations) * 2 * dt - # Check if spikes are within +/-dt of the start and/or end - # if so, add overlap of first and/or last spike + # Check if the first and last spikes are within +/-dt of the start + # and end. + # If so, adjust the overlapping and non-overlapping times accordingly. if sorted_spikes[0] - t_start < dt: covered_time_overlap += sorted_spikes[0] - t_start else: @@ -947,8 +961,12 @@ def run_t(spiketrain: neo.core.SpikeTrain, dt: pq.Quantity = dt) -> float: else: covered_time_non_overlap += dt + # Calculate the total time covered by spikes and the total recording + # time. total_time_covered = covered_time_overlap + covered_time_non_overlap total_time = t_stop - t_start + # Calculate and return the proportion of the total recording time + # covered by spikes. return total_time_covered / total_time if len(spiketrain_i) == 0 or len(spiketrain_j) == 0: From 0da0988c81888476757366f82edb719dd67866c8 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Mon, 30 Oct 2023 10:19:11 +0100 Subject: [PATCH 16/20] fix pep8 issues --- elephant/test/test_spike_train_correlation.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/elephant/test/test_spike_train_correlation.py b/elephant/test/test_spike_train_correlation.py index ce2396fe2..c4c4b9a72 100644 --- a/elephant/test/test_spike_train_correlation.py +++ b/elephant/test/test_spike_train_correlation.py @@ -863,10 +863,12 @@ def test_sttc_validation_test(self): sttc_result = segment.annotations['sttc_result'] self.assertAlmostEqual(sc.sttc(spiketrain_i, spiketrain_j, dt), sttc_result) + def test_sttc_exist_alias(self): # Test if alias cch still exists. self.assertEqual(sc.spike_time_tiling_coefficient, sc.sttc) + class SpikeTrainTimescaleTestCase(unittest.TestCase): def test_timescale_calculation(self): From 32e6d78585c35666ad246f14d3ff5409db3a8797 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Mon, 30 Oct 2023 10:21:23 +0100 Subject: [PATCH 17/20] fix pep8 --- elephant/test/test_spike_train_correlation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elephant/test/test_spike_train_correlation.py b/elephant/test/test_spike_train_correlation.py index c4c4b9a72..71121e0aa 100644 --- a/elephant/test/test_spike_train_correlation.py +++ b/elephant/test/test_spike_train_correlation.py @@ -842,7 +842,7 @@ def test_sttc_validation_test(self): NeuralEnsemble/elephant-data/unittest/spike_train_correlation/ spike_time_tiling_coefficient""" - repo_path = r"unittest/spike_train_correlation/spike_time_tiling_coefficient/data" # noqa + repo_path = r"unittest/spike_train_correlation/spike_time_tiling_coefficient/data" # noqa files_to_download = [("spike_time_tiling_coefficient_results.nix ", "db4e81febc0ca48f1c125891a85c9a7a")] From cd8c805fd7be362a115e12c92671c75733cd743f Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Mon, 30 Oct 2023 10:25:49 +0100 Subject: [PATCH 18/20] update checksum for new results file --- elephant/test/test_spike_train_correlation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elephant/test/test_spike_train_correlation.py b/elephant/test/test_spike_train_correlation.py index 71121e0aa..f94553f01 100644 --- a/elephant/test/test_spike_train_correlation.py +++ b/elephant/test/test_spike_train_correlation.py @@ -845,7 +845,7 @@ def test_sttc_validation_test(self): repo_path = r"unittest/spike_train_correlation/spike_time_tiling_coefficient/data" # noqa files_to_download = [("spike_time_tiling_coefficient_results.nix ", - "db4e81febc0ca48f1c125891a85c9a7a")] + "e3749d79046622494660a03e89950f51")] for filename, checksum in files_to_download: download_datasets(repo_path=f"{repo_path}/{filename}", From 3cd118a78b7f92d15382616edbc97064eef71d03 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Mon, 30 Oct 2023 18:01:01 +0100 Subject: [PATCH 19/20] change path to elephant temp --- elephant/test/test_spike_train_correlation.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/elephant/test/test_spike_train_correlation.py b/elephant/test/test_spike_train_correlation.py index f94553f01..7aa3e6a99 100644 --- a/elephant/test/test_spike_train_correlation.py +++ b/elephant/test/test_spike_train_correlation.py @@ -848,12 +848,10 @@ def test_sttc_validation_test(self): "e3749d79046622494660a03e89950f51")] for filename, checksum in files_to_download: - download_datasets(repo_path=f"{repo_path}/{filename}", - checksum=checksum) + filepath = download_datasets(repo_path=f"{repo_path}/{filename}", + checksum=checksum) - reader = NixIO( - ELEPHANT_TMP_DIR / 'spike_time_tiling_coefficient_results.nix', - mode='ro') + reader = NixIO(filepath, mode='ro') test_data_block = reader.read() for segment in test_data_block[0].segments: From 2488b6635de635e4fb8ad0a1ecc2d8b01f254c83 Mon Sep 17 00:00:00 2001 From: Moritz-Alexander-Kern Date: Mon, 30 Oct 2023 18:05:46 +0100 Subject: [PATCH 20/20] remove space from filename --- elephant/test/test_spike_train_correlation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elephant/test/test_spike_train_correlation.py b/elephant/test/test_spike_train_correlation.py index 7aa3e6a99..ff4d088ca 100644 --- a/elephant/test/test_spike_train_correlation.py +++ b/elephant/test/test_spike_train_correlation.py @@ -844,7 +844,7 @@ def test_sttc_validation_test(self): repo_path = r"unittest/spike_train_correlation/spike_time_tiling_coefficient/data" # noqa - files_to_download = [("spike_time_tiling_coefficient_results.nix ", + files_to_download = [("spike_time_tiling_coefficient_results.nix", "e3749d79046622494660a03e89950f51")] for filename, checksum in files_to_download: