-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPendingNotebookCode.py
More file actions
1914 lines (1419 loc) · 112 KB
/
PendingNotebookCode.py
File metadata and controls
1914 lines (1419 loc) · 112 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
## This file serves as overflow from active Jupyter-lab notebooks, to eventually be refactored.
from copy import deepcopy
from pathlib import Path
from typing import List, Optional, Dict, Tuple
from neuropy.core.epoch import ensure_dataframe
import numpy as np
import pandas as pd
from matplotlib.colors import ListedColormap
import pyvista as pv
import pyvistaqt as pvqt # conda install -c conda-forge pyvistaqt
from pyphocorehelpers.programming_helpers import metadata_attributes
from pyphocorehelpers.function_helpers import function_attributes
# from pyphoplacecellanalysis.PhoPositionalData.analysis.interactive_placeCell_config import print_subsession_neuron_differences
from collections import Counter # debug_detect_repeated_values
import scipy # pho_compute_rank_order
from pyphoplacecellanalysis.General.Pipeline.Stages.ComputationFunctions.MultiContextComputationFunctions.RankOrderComputations import RankOrderAnalyses # for _compute_single_rank_order_shuffle
# ==================================================================================================================== #
# 2024-01-19 - Marginals #
# ==================================================================================================================== #
from pyphoplacecellanalysis.Analysis.Decoder.reconstruction import DecodedFilterEpochsResult
class CurrTesting:
    """ Scratch container for in-progress notebook analysis helpers (overflow from Jupyter-lab notebooks).

    NOTE: the methods below do not take `self`/`cls`; they are written to be called directly off the class,
    e.g. `CurrTesting.pho_compute_rank_order(...)`.
    """
    # Pre-2023-21 ________________________________________________________________________________________________________ #
    def _plot_directional_likelihoods_df(directional_likelihoods_df):
        """ 2023-12-21 - Plots the long/short relative direction likelihoods, annotating each epoch with an arrow
        marking the long decoder's best direction (index 0 -> up-arrow, index 1 -> down-arrow).
        """
        import matplotlib.pyplot as plt  # BUGFIX: `plt` was never imported anywhere in this file, so this function raised NameError at call time.
        df = deepcopy(directional_likelihoods_df)
        fig = plt.figure(num='directional_likelihoods_df Matplotlib figure')
        plt.plot(df.index, df["long_relative_direction_likelihoods"], label="Long Direction")
        plt.plot(df.index, df["short_relative_direction_likelihoods"], label="Short Direction")
        for i, idx in enumerate(df["long_best_direction_indices"]):
            if idx == 0:
                plt.annotate("↑", (df.index[i], df["long_relative_direction_likelihoods"][i]), textcoords="offset points", xytext=(0, 10))
            elif idx == 1:
                plt.annotate("↓", (df.index[i], df["long_relative_direction_likelihoods"][i]), textcoords="offset points", xytext=(0, -10))
        plt.xlabel("Index")
        plt.ylabel("Likelihood")
        plt.legend()
        plt.show()

    def pho_compute_rank_order(track_templates, curr_epoch_spikes_df: pd.DataFrame, rank_method: str = "average", stats_nan_policy: str = 'omit') -> Dict[str, Tuple]:
        """ 2023-12-20 - Actually working spearman rank-ordering!!

        For each decoder template, computes the Spearman rank correlation between the spike-time ranks of the
        epoch's spikes and that template's per-aclu peak ranks.

        NOTE: mutates `curr_epoch_spikes_df` (adds a "spike_rank" column) and sets `track_templates.rank_method`.
        Requires at most one spike per aclu in `curr_epoch_spikes_df` (asserted below).

        Usage:
            curr_epoch_spikes_df = deepcopy(active_plotter.get_active_epoch_spikes_df())[['t_rel_seconds', 'aclu', 'shank', 'cluster', 'qclu', 'maze_id', 'flat_spike_idx', 'Probe_Epoch_id']]
            results = CurrTesting.pho_compute_rank_order(track_templates, curr_epoch_spikes_df)

        Returns:
            dict mapping decoder_name -> (spearman_statistic, p_value, n_missing_aclus)
        """
        curr_epoch_spikes_df["spike_rank"] = curr_epoch_spikes_df["t_rel_seconds"].rank(method=rank_method)
        n_spikes = np.shape(curr_epoch_spikes_df)[0]
        curr_epoch_spikes_aclus = deepcopy(curr_epoch_spikes_df.aclu.to_numpy())
        curr_epoch_spikes_aclu_ranks = deepcopy(curr_epoch_spikes_df.spike_rank.to_numpy())
        n_unique_aclus = np.shape(curr_epoch_spikes_df.aclu.unique())[0]
        assert n_spikes == n_unique_aclus, f"there is more than one spike in curr_epoch_spikes_df for an aclu! n_spikes: {n_spikes}, n_unique_aclus: {n_unique_aclus}"
        track_templates.rank_method = rank_method  # side effect: record the rank method used on the templates object
        decoder_aclu_peak_rank_dict_dict = track_templates.decoder_aclu_peak_rank_dict_dict
        template_spearman_real_results = {}
        for a_decoder_name, a_decoder_aclu_peak_rank_dict in decoder_aclu_peak_rank_dict_dict.items():
            # template_corresponding_aclu_rank_list: the template rank for each aclu present in `curr_epoch_spikes_aclus`; NaN where the template lacks that aclu.
            template_corresponding_aclu_rank_list = np.array([a_decoder_aclu_peak_rank_dict.get(key, np.nan) for key in curr_epoch_spikes_aclus])
            curr_epoch_spikes_aclu_rank_list = curr_epoch_spikes_aclu_ranks
            n_missing_aclus = np.isnan(template_corresponding_aclu_rank_list).sum()
            real_long_rank_stats = scipy.stats.spearmanr(curr_epoch_spikes_aclu_rank_list, template_corresponding_aclu_rank_list, nan_policy=stats_nan_policy)
            print(f'real_long_rank_stats: {real_long_rank_stats}')
            template_spearman_real_results[a_decoder_name] = (*real_long_rank_stats, n_missing_aclus)
        return template_spearman_real_results

    def debug_detect_repeated_values(data, exceeding_count: int = 1) -> Dict:
        """
        Identify and return a map of all repeated values in a list-like or NumPy array.
        Args:
            data: Any list-like or NumPy array (arrays are flattened before counting).
            exceeding_count: values occurring more than this many times are included (default: 1).
        Returns:
            A dictionary mapping each repeated value to its count.
        """
        if isinstance(data, np.ndarray):
            data = data.flatten()
        return {key: value for key, value in Counter(data).items() if value > exceeding_count}
# ==================================================================================================================== #
# OLD #
# ==================================================================================================================== #
## Laps Stuff:
# Whether placefields are recomputed even when cached results exist.
should_force_recompute_placefields = True
# Whether the 2D summary plots are rendered.
should_display_2D_plots = True
# Module-level verbose-print flag used by helpers below.
_debug_print = False
# ==================================================================================================================== #
# 2023-10-31 - Debug Plotting for Directional Placefield Templates #
# ==================================================================================================================== #
# from pyphoplacecellanalysis.SpecificResults.PhoDiba2023Paper import build_shared_sorted_neuronIDs
# from pyphoplacecellanalysis.Pho2D.matplotlib.visualize_heatmap import visualize_heatmap_pyqtgraph
# ratemap = long_pf1D.ratemap
# included_unit_neuron_IDs = EITHER_subset.track_exclusive_aclus
# rediculous_final_sorted_all_included_neuron_ID, rediculous_final_sorted_all_included_pfmap = build_shared_sorted_neuronIDs(ratemap, included_unit_neuron_IDs, sort_ind=new_all_aclus_sort_indicies.copy())
# heatmap_pf1D_win, heatmap_pf1D_img = visualize_heatmap_pyqtgraph(rediculous_final_sorted_all_included_pfmap, show_yticks=False, title=f"pf1D Sorted Visualization", defer_show=True)
# active_curves_sorted = long_pf1D.ratemap.normalized_tuning_curves[is_included][included_new_all_aclus_sort_indicies]
# heatmap_pf1D_win, heatmap_pf1D_img = visualize_heatmap_pyqtgraph(active_curves_sorted, show_yticks=False, title=f"pf1D Sorted Visualization", defer_show=True)
# _out = visualize_heatmap_pyqtgraph(np.vstack([odd_shuffle_helper.long_pf_peak_ranks, odd_shuffle_helper.short_pf_peak_ranks, even_shuffle_helper.long_pf_peak_ranks, even_shuffle_helper.short_pf_peak_ranks]), show_value_labels=True, show_xticks=True, show_yticks=True, show_colorbar=False)
from scipy import stats # _recover_samples_per_sec_from_laps_df
def _recover_samples_per_sec_from_laps_df(global_laps_df, time_start_column_name='start_t_rel_seconds', time_stop_column_name='end_t_rel_seconds',
extra_indexed_column_start_column_name='start_position_index', extra_indexed_column_stop_column_name='end_position_index') -> float:
""" Recovers the index/Denoting with λ(Θ) the probability that a neuron be active in a given room, the null hypothesis was therefore that all neurons could be assigned the same value λ (which would depend on Θ)time relation for the specified index columns by computing both the time duration and the number of indicies spanned by a given epoch.
returns the `mode_samples_per_sec` corresponding to that column.
ASSUMES REGULAR SAMPLEING!
Usage:
global_laps_df = global_laps.to_dataframe()
position_mode_samples_per_sec = _recover_samples_per_sec_from_laps_df(global_laps_df, time_start_column_name='start_t_rel_seconds', time_stop_column_name='end_t_rel_seconds',
extra_indexed_column_start_column_name='start_position_index', extra_indexed_column_stop_column_name='end_position_index')
position_mode_samples_per_sec # 29.956350269267112
"""
duration_sec = global_laps_df[time_stop_column_name] - global_laps_df[time_start_column_name]
num_position_samples = global_laps_df[extra_indexed_column_stop_column_name] - global_laps_df[extra_indexed_column_start_column_name]
samples_per_sec = (num_position_samples/duration_sec).to_numpy()
mode_samples_per_sec = stats.mode(samples_per_sec)[0] # take the mode of all the epochs
return mode_samples_per_sec
# ==================================================================================================================== #
# 2023-10-26 - Directional Placefields to generate four templates #
# ==================================================================================================================== #
# from pyphoplacecellanalysis.General.Pipeline.Stages.ComputationFunctions.MultiContextComputationFunctions.DirectionalPlacefieldGlobalComputationFunctions import DirectionalLapsHelpers
# ==================================================================================================================== #
# 2023-10-19 Weighted Correlation #
# ==================================================================================================================== #
from neuropy.core import Epoch
from pyphoplacecellanalysis.Analysis.Decoder.reconstruction import DecodedFilterEpochsResult
@function_attributes(short_name=None, tags=['weighted_correlation', 'decoder', 'epoch', 'obsolite'], input_requires=[], output_provides=[], uses=['WeightedCorr'], used_by=['add_weighted_correlation_result'], creation_date='2023-10-19 07:54', related_items=[])
def compute_epoch_weighted_correlation(xbin_centers, curr_time_bins, curr_long_epoch_p_x_given_n, method='spearman') -> List[float]:
    """ Computes the weighted correlation for one epoch from its decoded posterior.

    The 2D posterior (n_x_bins, n_time_bins) is flattened into (x, y, w) triples for WeightedCorr:
    X/Y are the flattened time-bin/x-bin coordinate vectors; W carries the posterior weights.

    Returns a list with one correlation scalar per requested method.
    """
    from neuropy.utils.external.WeightedCorr import WeightedCorr
    num_x_bins = len(xbin_centers)
    num_t_bins = len(curr_time_bins)
    flat_len = int(float(num_t_bins) * float(num_x_bins))
    # Each time bin is repeated across all x-bins; the x-bin centers tile once per time bin.
    flat_t = np.repeat(curr_time_bins, num_x_bins)
    flat_x = np.tile(xbin_centers, num_t_bins)
    assert np.shape(flat_t) == np.shape(flat_x)
    flat_w = np.reshape(curr_long_epoch_p_x_given_n, newshape=flat_len, order='F') # order='F': the first axis (xbins) changes fastest
    # Sanity check: the second time-bin's 2D column must equal its slice of the flattened weights.
    assert np.allclose(curr_long_epoch_p_x_given_n[:, 1], flat_w[num_x_bins:num_x_bins + num_x_bins])
    data_df = pd.DataFrame({'x': flat_t, 'y': flat_x, 'w': flat_w})
    # Normalize a single method name to a one-element tuple so both call styles work:
    methods = (method, ) if isinstance(method, str) else method
    corr_obj = WeightedCorr(xyw=data_df[['x', 'y', 'w']])
    return [corr_obj(method=a_method) for a_method in methods]
@function_attributes(short_name=None, tags=['weighted_correlation', 'decoder', 'obsolite'], input_requires=[], output_provides=[], uses=['compute_epoch_weighted_correlation'], used_by=[], creation_date='2023-10-19 07:54', related_items=[])
def add_weighted_correlation_result(xbin_centers, a_long_decoder_result: DecodedFilterEpochsResult, a_short_decoder_result: DecodedFilterEpochsResult, method=('pearson', 'spearman'), debug_print = False):
    """ builds the weighted correlation for each epoch respective to the posteriors decoded by each decoder (long/short)

    Returns:
        (epoch_long_weighted_corr_results, epoch_short_weighted_corr_results): two np.ndarrays, one row per epoch,
        one column per entry in `method`.
    """
    epoch_long_weighted_corr_results = []
    epoch_short_weighted_corr_results = []
    for decoded_epoch_idx in np.arange(a_long_decoder_result.num_filter_epochs):
        curr_epoch_time_bin_container = a_long_decoder_result.time_bin_containers[decoded_epoch_idx]
        curr_time_bins = curr_epoch_time_bin_container.centers
        curr_n_time_bins = len(curr_time_bins)
        if debug_print:
            print(f'curr_n_time_bins: {curr_n_time_bins}')
        ## Long Decoding:
        curr_long_epoch_p_x_given_n = a_long_decoder_result.p_x_given_n_list[decoded_epoch_idx] # .shape: (n_x_bins, n_epoch_time_bins)
        if debug_print:
            # BUGFIX: this print was previously unconditional (ignored `debug_print`), spamming stdout once per epoch.
            print(f'np.shape(curr_long_epoch_p_x_given_n): {np.shape(curr_long_epoch_p_x_given_n)}')
        weighted_corr_result = compute_epoch_weighted_correlation(xbin_centers, curr_time_bins, curr_long_epoch_p_x_given_n, method=method)
        epoch_long_weighted_corr_results.append(weighted_corr_result)
        ## Short Decoding:
        curr_short_epoch_p_x_given_n = a_short_decoder_result.p_x_given_n_list[decoded_epoch_idx] # .shape: (n_x_bins, n_epoch_time_bins)
        weighted_corr_result = compute_epoch_weighted_correlation(xbin_centers, curr_time_bins, curr_short_epoch_p_x_given_n, method=method)
        epoch_short_weighted_corr_results.append(weighted_corr_result)
    return np.array(epoch_long_weighted_corr_results), np.array(epoch_short_weighted_corr_results)
# ==================================================================================================================== #
# 2023-10-11 #
# ==================================================================================================================== #
from pyphoplacecellanalysis.SpecificResults.AcrossSessionResults import AcrossSessionTables
def build_and_merge_all_sessions_joined_neruon_fri_df(global_data_root_parent_path, BATCH_DATE_TO_USE, included_session_contexts):
    """ Loads each existing per-session `_joined_neruon_fri_df.pkl` table, concatenates them, and writes the merged
    result back out under `global_data_root_parent_path`.

    Usage:
        # BATCH_DATE_TO_USE = '2023-10-05_NewParameters'
        BATCH_DATE_TO_USE = '2023-10-07'
        from PendingNotebookCode import build_and_merge_all_sessions_joined_neruon_fri_df
        all_sessions_joined_neruon_fri_df, out_path = build_and_merge_all_sessions_joined_neruon_fri_df(global_data_root_parent_path, BATCH_DATE_TO_USE, included_session_contexts)

    Raises:
        FileNotFoundError: when no per-session pickle files exist for the given batch date.

    TODO: seems like it should probably go into AcrossSessionResults or AcrossSessionTables
    """
    # "Both" layout mode: <root>/<BATCH_DATE>/<BATCH_DATE>_<session-description>_joined_neruon_fri_df.pkl
    joined_neruon_fri_df_file_paths = [global_data_root_parent_path.joinpath(BATCH_DATE_TO_USE, f'{BATCH_DATE_TO_USE}_{a_ctxt.get_description(separator="-", include_property_names=False)}_joined_neruon_fri_df.pkl') for a_ctxt in included_session_contexts]
    joined_neruon_fri_df_file_paths = [a_path for a_path in joined_neruon_fri_df_file_paths if a_path.exists()] # only get the paths that exist
    data_frames = [AcrossSessionTables.load_table_from_file(global_data_root_parent_path=a_path.parent, output_filename=a_path.name) for a_path in joined_neruon_fri_df_file_paths]
    data_frames = [df for df in data_frames if df is not None] # drop empty results so concat doesn't fail on a bad load
    # ROBUSTNESS: `pd.concat([])` raises a cryptic ValueError; fail with a clear message instead.
    if len(data_frames) == 0:
        raise FileNotFoundError(f'no "_joined_neruon_fri_df.pkl" files found for BATCH_DATE_TO_USE: {BATCH_DATE_TO_USE} under {global_data_root_parent_path}')
    print(f'joined_neruon_fri_df: concatenating dataframes from {len(data_frames)}')
    all_sessions_joined_neruon_fri_df = pd.concat(data_frames, ignore_index=True)
    ## Finally save out the combined result:
    all_sessions_joined_neruon_fri_df_basename = f'{BATCH_DATE_TO_USE}_MERGED_joined_neruon_fri_df'
    out_path = global_data_root_parent_path.joinpath(all_sessions_joined_neruon_fri_df_basename).resolve()
    AcrossSessionTables.write_table_to_files(all_sessions_joined_neruon_fri_df, global_data_root_parent_path=global_data_root_parent_path, output_basename=all_sessions_joined_neruon_fri_df_basename)
    print(f'>>\t done with {out_path}')
    return all_sessions_joined_neruon_fri_df, out_path
# 2023-07-13 - Helpers for future swapping of the x and y axis on many of the plots like Kamran suggested.
def _swap_x_and_y_axis(x_frs, y_frs, should_swap_axis:bool=True):
""" swaps the order of the arguments depending on the value of `should_swap_axis`. Can be used to reverse the x and y axes for a plot."""
if not should_swap_axis:
return (x_frs, y_frs) # return in same order
else:
return (y_frs, x_frs)
# 2023-07-05 - MiracleWrapper Idea
"""
# Wish it was easier to get things in and out of functions.
def a_fn(...):
pf1d_compare_graphics, (example_epoch_rasters_L, example_epoch_rasters_S), example_stacked_epoch_graphics = a_computation_fn(...)
miracle_wrapper = MiracleWrapper(pf1d_compare_graphics, (example_epoch_rasters_L, example_epoch_rasters_S), example_stacked_epoch_graphics)
miracle_wrapper.add(pf1d_compare_graphics, (example_epoch_rasters_L, example_epoch_rasters_S), example_stacked_epoch_graphics)
return miracle_wrapper
miracle_wrapper = a_fn(...)
# Ideally, you could get them out "by magic" by specifying the same name that they were put in with on the LHS of an assignment eqn:
pf1d_compare_graphics, (example_epoch_rasters_L, example_epoch_rasters_S), example_stacked_epoch_graphics = miracle_wrapper.magic_unwrap()
"""
# ==================================================================================================================== #
# 2023-05-16 - Manual Post-hoc Conformance for Laps and Long/Short Bins #
# ==================================================================================================================== #
def _update_computation_configs_with_laps_and_shared_grid_bins(curr_active_pipeline, enable_interactive_bounds_selection:bool = False):
    """ 2023-05-16 - A post-hoc version of updating the computation configs and recomputing with the laps as the
    computation_epochs and the shared bins as the grid_bin_bounds.
    In the future shouldn't need this, as I updated the KDiba default active computation configs to determine these
    properties prior to computation by default.

    Returns the same `curr_active_pipeline` after recomputation (mutated in place).
    """
    from neuropy.analyses.placefields import PlacefieldComputationParameters
    long_epoch_name, short_epoch_name, global_epoch_name = curr_active_pipeline.find_LongShortGlobal_epoch_names()
    long_session, short_session, global_session = [curr_active_pipeline.filtered_sessions[an_epoch_name] for an_epoch_name in [long_epoch_name, short_epoch_name, global_epoch_name]]
    long_results, short_results, global_results = [curr_active_pipeline.computation_results[an_epoch_name]['computed_data'] for an_epoch_name in [long_epoch_name, short_epoch_name, global_epoch_name]]
    active_computation_configs_dict = {'default': curr_active_pipeline.computation_results[global_epoch_name].computation_config} # get the old pf_params from global
    ## Duplicate the default computation config to modify it:
    temp_comp_params = deepcopy(active_computation_configs_dict['default'])
    # Determine the grid_bin_bounds from the long session:
    grid_bin_bounding_session = long_session
    grid_bin_bounds = PlacefieldComputationParameters.compute_grid_bin_bounds(grid_bin_bounding_session.position.x, grid_bin_bounding_session.position.y)
    # e.g. ((22.736279243974774, 261.696733348342), (125.5644705153173, 151.21507349463707))
    if enable_interactive_bounds_selection:
        # Interactive grid_bin_bounds selector (optional):
        from neuropy.utils.matplotlib_helpers import add_rectangular_selector
        # Show an interactive rectangular selection for the occupancy:
        fig, ax = curr_active_pipeline.computation_results['maze'].computed_data.pf2D.plot_occupancy()
        rect_selector, set_extents = add_rectangular_selector(fig, ax, initial_selection=grid_bin_bounds) # (24.82, 257.88), (125.52, 149.19)
        # TODO 2023-05-16: allow the user to customize selection (block) before continuing and read back the refined bounds.
        # BUGFIX: previously this branch never assigned `final_grid_bin_bounds`, causing a NameError below when
        # enable_interactive_bounds_selection=True. Fall back to the unrefined bounds until read-back is implemented.
        final_grid_bin_bounds = grid_bin_bounds
    else:
        # no interactive selection/refinement:
        final_grid_bin_bounds = grid_bin_bounds
    temp_comp_params.pf_params.grid_bin = (1.5, 1.5) # (1.5cm x 1.5cm)
    temp_comp_params.pf_params.grid_bin_bounds = final_grid_bin_bounds # same bounds for all
    temp_comp_params.pf_params.computation_epochs = global_session.laps.as_epoch_obj().get_non_overlapping().filtered_by_duration(1.0, 30.0) # laps specifically for use in the placefields with non-overlapping, duration, constraints: the lap must be at least 1 second long and at most 30 seconds long
    # Add it to the dict of computation configs:
    active_computation_configs_dict['custom'] = temp_comp_params
    # Compute with the new computation config:
    computation_functions_name_includelist = ['_perform_baseline_placefield_computation', '_perform_time_dependent_placefield_computation', '_perform_extended_statistics_computation',
                                              '_perform_position_decoding_computation',
                                              '_perform_firing_rate_trends_computation',
                                              '_perform_pf_find_ratemap_peaks_computation',
                                              '_perform_time_dependent_pf_sequential_surprise_computation',  # BUGFIX: missing trailing comma here previously concatenated this name with the next entry into one bogus string, silently skipping both computations
                                              '_perform_two_step_position_decoding_computation',
                                              # '_perform_recursive_latent_placefield_decoding'
                                              ] # '_perform_pf_find_ratemap_peaks_peak_prominence2d_computation'
    curr_active_pipeline.perform_computations(computation_functions_name_includelist=computation_functions_name_includelist, computation_functions_name_excludelist=None, fail_on_exception=True, debug_print=False, overwrite_extant_results=True)
    return curr_active_pipeline
# ==================================================================================================================== #
# 2023-05-08 - Paginated Plots #
# ==================================================================================================================== #
# from PendingNotebookCode import PaginationController
# from pyphocorehelpers.DataStructure.general_parameter_containers import VisualizationParameters, RenderPlotsData, RenderPlots
# from pyphocorehelpers.gui.PhoUIContainer import PhoUIContainer
# From plot_paginated_decoded_epoch_slices
# ==================================================================================================================== #
# 2023-05-02 - Factor out Paginator and plotting stuff #
# ==================================================================================================================== #
# from pyphocorehelpers.plotting.figure_management import PhoActiveFigureManager2D
# ==================================================================================================================== #
# 2023-05-02 - Factor out interactive matplotlib/pyqtgraph helper code (untested) #
# ==================================================================================================================== #
import matplotlib
from attrs import define, field, Factory
@define(slots=True, eq=False) #eq=False enables hashing by object identity
class SelectionManager:
    """ Takes a list of matplotlib Axes that can have their selection toggled/un-toggled for inclusion/exclusion.
    Adds the ability to toggle selections for each axis by clicking, and a grey background for selected objects vs. white for unselected.

    Usage:
        from pyphoplacecellanalysis.General.Pipeline.Stages.DisplayFunctions.DecoderPredictionError import plot_decoded_epoch_slices
        laps_plot_tuple = plot_decoded_epoch_slices(long_results_obj.active_filter_epochs, long_results_obj.all_included_filter_epochs_decoder_result, global_pos_df=global_session.position.df, variable_name='lin_pos', xbin=long_results_obj.original_1D_decoder.xbin,
            name='stacked_epoch_slices_long_results_obj', debug_print=False, debug_test_max_num_slices=32)
        curr_viz_params, _curr_plot_data, _curr_plots, _curr_ui_container = laps_plot_tuple
        # Create a SelectionManager instance
        sm = SelectionManager(_curr_plots.axs)
    """
    axes: list = field(default=Factory(list))  # the managed matplotlib Axes
    is_selected: dict = field(default=Factory(dict))  # maps each Axes -> bool selection state
    fig: Optional[matplotlib.figure.Figure] = field(default=None) # Matplotlib.Figure
    cid: Optional[int] = field(default=None) # mpl_connect callback id (usable for later disconnect)

    def __attrs_post_init__(self):
        # Get figure from first axes:
        assert len(self.axes) > 0
        first_ax = self.axes[0]
        self.fig = first_ax.get_figure()
        self.cid = self.fig.canvas.mpl_connect('button_press_event', self.on_click)
        # Set initial selection to False
        for ax in self.axes:
            self.is_selected[ax] = False

    def on_click(self, event):
        """ matplotlib 'button_press_event' callback: toggles the clicked Axes' selection and recolors it. """
        # Get the clicked Axes object
        ax = event.inaxes
        # BUGFIX: clicks outside any Axes have event.inaxes == None, which previously raised KeyError here.
        # Also ignore clicks on Axes this manager does not track (e.g. a colorbar on the same figure).
        if ax is None or ax not in self.is_selected:
            return
        # Toggle the selection status of the clicked Axes
        self.is_selected[ax] = not self.is_selected[ax]
        # Set the face color of the clicked Axes based on its selection status
        if self.is_selected[ax]:
            ax.patch.set_facecolor('gray')
        else:
            ax.patch.set_facecolor('white')
        # Redraw the figure to show the updated selection
        event.canvas.draw()
@define(slots=True, eq=False) #eq=False enables hashing by object identity
class PaginatedSelectionManager:
    """ Takes a list of matplotlib Axes that can have their selection toggled/un-toggled for inclusion/exclusion.
    Adds the ability to toggle selections for each axis by clicking, and a grey background for selected objects vs. white for unselected.
    Unlike `SelectionManager`, selection state is keyed by *data index* (not by Axes object) so it survives page changes.
    Usage:
        from pyphoplacecellanalysis.General.Pipeline.Stages.DisplayFunctions.DecoderPredictionError import plot_decoded_epoch_slices
        laps_plot_tuple = plot_decoded_epoch_slices(long_results_obj.active_filter_epochs, long_results_obj.all_included_filter_epochs_decoder_result, global_pos_df=global_session.position.df, variable_name='lin_pos', xbin=long_results_obj.original_1D_decoder.xbin,
        name='stacked_epoch_slices_long_results_obj', debug_print=False, debug_test_max_num_slices=32)
        curr_viz_params, _curr_plot_data, _curr_plots, _curr_ui_container = laps_plot_tuple
        # Create a SelectionManager instance
        sel_man = PaginatedSelectionManager(axes=self.plots.axs, fig=self.plots.fig)
        def on_page_change(updated_page_idx):
            # print(f'on_page_change(updated_page_idx: {updated_page_idx})')
            # Update on page change:
            sel_man.perform_update()
        ui.mw.ui.paginator_controller_widget.jump_to_page.connect(on_page_change)
    """
    axes: list = field(default=Factory(list))  # the Axes shown on the currently-visible page
    is_selected: dict = field(default=Factory(dict))  # {data_index: bool} -- keyed by data index so selection survives paging
    fig: Optional[matplotlib.figure.Figure] = field(default=None)  # owning Matplotlib Figure
    callback_id: Optional[int] = field(default=None)  # mpl_connect('button_press_event') connection id

    def __attrs_post_init__(self):
        # Get figure from first axes:
        assert len(self.axes) > 0
        first_ax = self.axes[0]
        self.fig = first_ax.get_figure()
        self.callback_id = self.fig.canvas.mpl_connect('button_press_event', self.on_click)
        # NOTE: is_selected is intentionally NOT pre-filled here -- entries are created lazily
        # per data index (see .get(..., False) below) since data indices change with the page.

    def perform_update(self):
        """ called to update the selection when the page is changed or something else happens. """
        current_page_idx, curr_page_data_indicies = _get_current_page_data_indicies()
        # BUG FIX: this slotted class has no `self.plots` attribute; the axes list is `self.axes`
        assert len(self.axes) == len(curr_page_data_indicies), f"len(self.axes): {len(self.axes)}, len(curr_page_data_indicies): {len(curr_page_data_indicies)}"
        for ax, found_data_idx in zip(self.axes, list(curr_page_data_indicies)): # TODO: might fail for the last page?
            is_selected = self.is_selected.get(found_data_idx, False)
            ax.patch.set_facecolor('gray' if is_selected else 'white')
        # Redraw the figure to show the updated selection
        self.fig.canvas.draw()

    def on_click(self, event):
        """ Click handler: toggles the selection of the clicked axes' underlying data index and recolors it. """
        ax = event.inaxes
        if ax is None:
            return # clicked outside of all axes; ignore
        # Find the axes (BUG FIX: was `self.plots.axs`, which cannot exist on this slots=True class)
        found_index = safe_find_index_in_list(self.axes, ax)
        if found_index is None:
            return # presumably safe_find_index_in_list returns None when ax isn't managed -- TODO confirm
        current_page_idx, curr_page_data_indicies = _get_current_page_data_indicies()
        found_data_index = curr_page_data_indicies[found_index]
        # Toggle the selection status of the clicked Axes; if never set before, assume that it's not selected
        self.is_selected[found_data_index] = not self.is_selected.get(found_data_index, False)
        # Set the face color of the clicked Axes based on its selection status
        ax.patch.set_facecolor('gray' if self.is_selected[found_data_index] else 'white')
        # Redraw the figure to show the updated selection
        event.canvas.draw()
# ==================================================================================================================== #
# 2023-04-17 - Factor out interactive diagnostic figure code #
# ==================================================================================================================== #
## Create a diagnostic plot that plots a stack of the three curves used for computations in the given epoch:
import pyphoplacecellanalysis.External.pyqtgraph as pg
# ==================================================================================================================== #
# 2023-04-10 - Long short expected surprise #
# ==================================================================================================================== #
def _scramble_curve(pf: np.ndarray, roll_num_bins:int = 10, method='circ'):
""" Circularly rotates the 1D placefield """
return np.roll(pf, roll_num_bins)
# ==================================================================================================================== #
# 2023-03-09 - Parameter Sweeping #
# ==================================================================================================================== #
def _compute_parameter_sweep(spikes_df, active_pos, all_param_sweep_options: dict) -> dict:
    """ Computes the PfNDs for all the swept parameters (combinations of grid_bin, smooth, etc)

    Returns a dict mapping frozenset(sweep_params.items()) -> PfND built with those params.

    Usage:
        from PendingNotebookCode import _compute_parameter_sweep
        smooth_options = [(None, None), (0.5, 0.5), (1.0, 1.0), (2.0, 2.0), (5.0, 5.0)]
        grid_bin_options = [(1,1),(5,5),(10,10)]
        all_param_sweep_options = cartesian_product(smooth_options, grid_bin_options)
        param_sweep_option_n_values = dict(smooth=len(smooth_options), grid_bin=len(grid_bin_options))
        output_pfs = _compute_parameter_sweep(spikes_df, active_pos, all_param_sweep_options)
    """
    def _build_single_pf(a_sweep_dict):
        # fresh deep copies each sweep so PfND construction can't mutate the shared inputs
        return PfND(deepcopy(spikes_df).spikes.sliced_by_neuron_type('pyramidal'), deepcopy(active_pos.linear_pos_obj), **a_sweep_dict)

    # key by the frozen parameter set so each unique sweep combination is addressable
    return {frozenset(a_sweep_dict.items()): _build_single_pf(a_sweep_dict) for a_sweep_dict in all_param_sweep_options}
# ==================================================================================================================== #
# 2022-02-17 - Giving up on Rank-Order Sequence Analysis #
# ==================================================================================================================== #
"""
after convincing Kamran that the sample size of the diferent replays made them uncomparable.
"""
# # 2023-02-16 - Simple "weighted-center-of-mass" method of determining cell firing order in a timeseries
# +
@metadata_attributes(short_name=None, tags=['rank-order', 'spikes'], input_requires=[], output_provides=[], uses=[], used_by=[], creation_date='2023-02-16 00:00', related_items=[])
class SpikesRankOrder:
    """ Simple "weighted-center-of-mass" method of determining cell firing order in a timeseries """

    @staticmethod  # BUG FIX: had no `self` parameter, so it was broken as an instance method; the Usage calls it like a plain function
    def compute_rankordered_spikes_during_epochs(active_spikes_df, active_epochs):
        """ Ranks the aclus within each epoch by the time of their first spike.

        Returns: (ranked_aclus_dict, active_spikes_df, all_probe_epoch_ids, all_aclus)
            ranked_aclus_dict: nested {Probe_Epoch_id: {aclu: rank}}
        Usage:
            from neuropy.utils.efficient_interval_search import filter_epochs_by_num_active_units
            active_sess = curr_active_pipeline.filtered_sessions['maze']
            active_epochs = active_sess.perform_compute_estimated_replay_epochs(min_epoch_included_duration=None, max_epoch_included_duration=None, maximum_speed_thresh=None) # filter on nothing basically
            active_spikes_df = active_sess.spikes_df.spikes.sliced_by_neuron_type('pyr') # only look at pyramidal cells
            spike_trimmed_active_epochs, _extra_outputs = filter_epochs_by_num_active_units(active_spikes_df, active_epochs, min_inclusion_fr_active_thresh=2.0, min_num_unique_aclu_inclusions=1)
            epoch_ranked_aclus_dict, active_spikes_df, all_probe_epoch_ids, all_aclus = compute_rankordered_spikes_during_epochs(active_spikes_df, active_epochs)
        """
        from neuropy.utils.mixins.time_slicing import add_epochs_id_identity
        # add the active_epoch's id to each spike in active_spikes_df to make filtering and grouping easier and more efficient:
        active_spikes_df = add_epochs_id_identity(active_spikes_df, epochs_df=active_epochs.to_dataframe(), epoch_id_key_name='Probe_Epoch_id', epoch_label_column_name=None, override_time_variable_name='t_rel_seconds', no_interval_fill_value=-1) # uses new add_epochs_id_identity
        # Get all aclus and epoch_idxs used throughout the entire spikes_df:
        all_aclus = active_spikes_df['aclu'].unique()
        all_probe_epoch_ids = active_spikes_df['Probe_Epoch_id'].unique()
        selected_spikes = active_spikes_df.groupby(['Probe_Epoch_id', 'aclu'])[active_spikes_df.spikes.time_variable_name].first() # first spike time per (epoch, aclu)
        # selected_spikes = active_spikes_df.groupby(['Probe_Epoch_id', 'aclu'])[active_spikes_df.spikes.time_variable_name].median() # median spikes
        # rank the aclu values by their first t value in each Probe_Epoch_id
        ranked_aclus = selected_spikes.groupby('Probe_Epoch_id').rank(method='dense') # resolve ties in ranking by assigning the same rank to each and then incrementing for the next item
        # create a nested dictionary of {Probe_Epoch_id: {aclu: rank}} from the ranked_aclu values
        ranked_aclus_dict = {}
        for (epoch_id, aclu), rank in zip(ranked_aclus.index, ranked_aclus):
            ranked_aclus_dict.setdefault(epoch_id, {})[aclu] = rank
        return ranked_aclus_dict, active_spikes_df, all_probe_epoch_ids, all_aclus
# -
# +
def compute_rankordered_stats(epoch_ranked_aclus_dict):
    """ Spearman rank-order tests:
    WARNING, from documentation: Although calculation of the p-value does not make strong assumptions about the distributions underlying the samples, it is only accurate for very large samples (>500 observations). For smaller sample sizes, consider a permutation test (see Examples section below).
    Usage:
        epoch_ranked_aclus_stats_corr_values, epoch_ranked_aclus_stats_p_values, (outside_epochs_ranked_aclus_stats_corr_value, outside_epochs_ranked_aclus_stats_p_value) = compute_rankordered_stats(epoch_ranked_aclus_dict)
    """
    import scipy.stats
    # one spearmanr result per epoch: correlate aclu ids (keys) against their ranks (values)
    stats_per_epoch = {}
    for epoch_id, rank_dict in epoch_ranked_aclus_dict.items():
        aclu_ids = np.array(list(rank_dict.keys()))
        aclu_ranks = np.array(list(rank_dict.values()))
        stats_per_epoch[epoch_id] = scipy.stats.spearmanr(aclu_ids, aclu_ranks)
    # Spearman statistic (correlation) values:
    abs_corr_values = np.array([np.abs(a_result.statistic) for a_result in stats_per_epoch.values()])
    p_values = np.array([a_result.pvalue for a_result in stats_per_epoch.values()])
    # the first entry corresponds to the -1 ("outside any epoch") id; split it off so the
    # remaining arrays correspond only to valid epoch_ids
    outside_epochs_corr_value = abs_corr_values[0]
    outside_epochs_p_value = p_values[0]
    return abs_corr_values[1:], p_values[1:], (outside_epochs_corr_value, outside_epochs_p_value)
# # + [markdown] jp-MarkdownHeadingCollapsed=true tags=[]
# # ### 2023-02-16 TODO: try to overcome issue with small sample sizes mentioned above by performing the permutation test:
# # +
# # def statistic(x): # permute only `x`
# # return scipy.stats.spearmanr(x, y).statistic
# # res_exact = scipy.stats.permutation_test((x,), statistic, permutation_type='pairings')
# res_asymptotic = scipy.stats.spearmanr(x, y)
# res_exact.pvalue, res_asymptotic.pvalue # asymptotic pvalue is too low
# # scipy.stats.permutation_test((x,), (lambda x: scipy.stats.spearmanr(x, y).statistic), permutation_type='pairings')
# ## Compute the exact value using permutations:
# # epoch_ranked_aclus_stats_exact_dict = {epoch_id:scipy.stats.permutation_test((np.array(list(rank_dict.keys())),), (lambda x: scipy.stats.spearmanr(x, np.array(list(rank_dict.values()))).statistic), permutation_type='pairings') for epoch_id, rank_dict in epoch_ranked_aclus_dict.items()}
# epoch_ranked_aclus_stats_exact_dict = {epoch_id:scipy.stats.permutation_test((np.array(list(rank_dict.values())),), (lambda y: scipy.stats.spearmanr(np.array(list(rank_dict.keys())), y).statistic), permutation_type='pairings') for epoch_id, rank_dict in epoch_ranked_aclus_dict.items()} # ValueError: each sample in `data` must contain two or more observations along `axis`.
# epoch_ranked_aclus_stats_exact_dict
# ==================================================================================================================== #
# 2022-12-22 - Posterior Confidences/Certainties #
# ==================================================================================================================== #
def _compute_epoch_posterior_confidences(X_decoding_of_Y_epochs_results):
""" average over positions to find the maximum likelihood in the posterior (value only) for each timebin. This is a rough estimate for how certain we are about each timebin.
Usage:
from PendingNotebookCode import _compute_epoch_posterior_confidences
long_decoding_of_short_epochs_results = _compute_epoch_posterior_confidences(long_decoding_of_short_epochs_results)
short_decoding_of_long_epochs_results = _compute_epoch_posterior_confidences(short_decoding_of_long_epochs_results)
"""
# loop through each returned epoch and compute its measurez:
X_decoding_of_Y_epochs_results.posterior_uncertainty_measure = [] # one for each decoded epoch
## combined_plottables variables refer to concatenating the values for each epoch so they can be plotted using a single matplotlib command:
X_decoding_of_Y_epochs_results.combined_plottables_x = []
X_decoding_of_Y_epochs_results.combined_plottables_y = []
for i, time_bin_container, p_x_given_n in zip(np.arange(X_decoding_of_Y_epochs_results.num_filter_epochs), X_decoding_of_Y_epochs_results.time_bin_containers, X_decoding_of_Y_epochs_results.p_x_given_n_list):
# average over positions to find the maximum likelihood in the posterior (value only) for each timebin. This is a rough estimate for how certain we are about each timebin.
posterior_uncertainty_measure = np.max(p_x_given_n, axis=0) # each value will be between (0.0, 1.0]
X_decoding_of_Y_epochs_results.posterior_uncertainty_measure.append(posterior_uncertainty_measure)
X_decoding_of_Y_epochs_results.combined_plottables_x.append(time_bin_container.centers)
X_decoding_of_Y_epochs_results.combined_plottables_y.append(posterior_uncertainty_measure)
return X_decoding_of_Y_epochs_results
# ==================================================================================================================== #
# 2022-12-20 - Overlapping Intervals #
# ==================================================================================================================== #
# https://www.baeldung.com/cs/finding-all-overlapping-intervals
# def eraseOverlapIntervals(intervals):
# """ https://leetcode.com/problems/non-overlapping-intervals/solutions/91702/python-simple-greedy-10-lines/ """
# if len(intervals) == 0:
# return 0
# intervals = sorted(intervals, key = lambda x:x[1])
# removeNum, curBorder = -1, intervals[0][1]
# for interval in intervals:
# if interval[0] < curBorder:
# removeNum += 1
# else:
# curBorder = interval[1]
# return removeNum
def eraseOverlapIntervals(intervals: List[List[int]]) -> int:
    """ Minimum number of intervals to remove so the remainder are non-overlapping (greedy by earliest end).
    https://leetcode.com/problems/non-overlapping-intervals/solutions/424634/python-greedy-w-optimizations-faster-then-99-5/ """
    total = len(intervals)
    if total <= 1:
        return 0
    kept = 0
    prev_end = float("-inf")
    # greedily keep every interval that starts at/after the last kept end
    for start, end in sorted(intervals, key=lambda iv: iv[1]):
        if start >= prev_end:
            prev_end = end
            kept += 1
    return total - kept
def removeCoveredIntervals(intervals: List[List[int]]) -> List[List[int]]:
    """
    Remove every interval covered by another ([a,b] is covered by [c,d] when c <= a and b <= d);
    returns the surviving intervals as [start, end] pairs, ordered by start.
    https://leetcode.com/problems/remove-covered-intervals/solutions/879665/python-faster-than-99-using-dict/
    Alternatives:
    https://leetcode.com/problems/remove-covered-intervals/solutions/878478/python-simple-solution-explained-video-code-fastest/
    https://leetcode.com/problems/remove-covered-intervals/solutions/1784520/python3-sorting-explained/?orderBy=most_votes&languageTags=python3
    """
    # keep only the farthest end for each distinct start (shorter ones with the same start are covered)
    best_end_by_start = {}
    for start, end in intervals:
        if start not in best_end_by_start or best_end_by_start[start] < end:
            best_end_by_start[start] = end
    out_intervals = []
    highest_end = float("-inf")  # works for negative coordinates too (original used -1)
    # scanning starts in ascending order, an interval survives iff it ends beyond everything seen so far
    for start in sorted(best_end_by_start):
        end = best_end_by_start[start]
        if end > highest_end:
            highest_end = end
            out_intervals.append([start, end])  # BUG FIX: original appended only the end value, not the [start, end] pair promised by the signature
    return out_intervals
def merge(intervals: List[List[int]], in_place=False) -> List[List[int]]:
    """ Given intervals[i] = [start_i, end_i], merge all overlapping intervals and return the
    non-overlapping intervals that cover all the input intervals.
    NOTE: mutates the input list unless in_place=False (the default), which works on a deep copy.
    https://leetcode.com/problems/merge-intervals/solutions/350272/python3-sort-o-nlog-n/ """
    if not in_place:
        # work on a copy so the caller's intervals aren't modified
        intervals = deepcopy(intervals)
    intervals.sort(key=lambda iv: iv[0])
    merged = []
    for iv in intervals:
        if merged and merged[-1][-1] >= iv[0]:
            # overlaps the previously-kept interval: extend its end
            merged[-1][-1] = max(merged[-1][-1], iv[-1])
        else:
            # no overlap (or first interval): keep as-is
            merged.append(iv)
    return merged
# Divide Intervals Into Minimum Number of Groups _____________________________________________________________________ #
# """You are given a 2D integer array intervals where intervals[i] = [lefti, righti] represents the inclusive interval [lefti, righti].
# You have to divide the intervals into one or more groups such that each interval is in exactly one group, and no two intervals that are in the same group intersect each other.
# Return the minimum number of groups you need to make.
# """
import heapq
def minGroups(intervals: List[List[int]]) -> int:
    """ Minimum number of groups needed so that no two intervals in the same group intersect
    (intervals are inclusive [left, right]).
    https://leetcode.com/problems/divide-intervals-into-minimum-number-of-groups/solutions/2560020/min-heap/?orderBy=most_votes&languageTags=python3
    Alternatives:
    https://leetcode.com/problems/divide-intervals-into-minimum-number-of-groups/solutions/2568422/python-runtime-o-nlogn-96-12-memory-o-n/
    """
    pq = []  # min-heap of right end-points: one entry per group currently in use
    for left, right in sorted(intervals):
        if pq and pq[0] < left:
            # the group that frees up earliest can absorb this interval -- reuse it
            heapq.heappop(pq)
        heapq.heappush(pq, right)
    # BUG FIX: original returned the heap itself; the signature (and the problem) ask for the group count
    return len(pq)
## overlap between ranges:
def overlap(a, b, c, d):
    """ Size of the overlap between intervals [a,b] and [c,d].
    Returns 0 for identical intervals and None for non-overlapping intervals as required.
    https://stackoverflow.com/questions/11026167/interval-overlap-size
    For ranges, see: https://stackoverflow.com/questions/6821156/how-to-find-range-overlap-in-python
    `range(max(x[0], y[0]), min(x[-1], y[-1])+1)`
    """
    if a == c and b == d:
        return 0  # identical intervals report 0 by convention
    size = min(b, d) - max(a, c)
    if size >= 0:
        return size
    return None  # disjoint intervals
# def merge(self, intervals: List[List[int]]) -> List[List[int]]:
# """ https://dev.to/codekagei/algorithm-to-merge-overlapping-intervals-found-in-a-list-python-solution-5819 """
# if len(intervals) <= 1:
# return intervals
# output = []
# intervals.sort()
# current = intervals[0]
# output.append(current)
# for i in range(len(intervals)):
# current2 = current[1];
# next1 = intervals[i][0]
# next2 = intervals[i][1]
# if current2 >= next1:
# current[1] = max(current2, next2)
# else:
# current = intervals[i]
# output.append(current)
# return output
# ==================================================================================================================== #
# 2022-12-18 - Added Standardization of Position bins between short and long #
# ==================================================================================================================== #
from neuropy.analyses.placefields import PfND # for re-binning pf1D
from pyphoplacecellanalysis.General.Mixins.CrossComputationComparisonHelpers import _compare_computation_results
def merge_overlapping_intervals(intervals):
    """ Merge overlapping [start, end] intervals; returns an np.array of merged intervals sorted by start.

    intervals: an iterable of 2-element intervals (lists, tuples, or np.array rows).

    Fixes vs. the original ChatGPT version (the documented "Doesn't seem to work"):
      - empty input no longer raises IndexError (returns an empty array)
      - each interval is copied to a fresh list, so tuple/ndarray rows no longer fail on
        item assignment and the caller's data is never mutated
    """
    if len(intervals) == 0:
        return np.array([])
    # copy every interval into a mutable [start, end] list, sorted by start time
    sorted_intervals = sorted(([start, end] for start, end in intervals), key=lambda iv: iv[0])
    # Initialize the result with the first interval
    result = [sorted_intervals[0]]
    for start, end in sorted_intervals[1:]:
        if start <= result[-1][1]:
            # overlaps the last kept interval: extend its end to the maximum of the two
            result[-1][1] = max(result[-1][1], end)
        else:
            # disjoint: start a new interval
            result.append([start, end])
    return np.array(result)
def split_overlapping_intervals(intervals):
    """ Doesn't seem to work. Generated by Chat-GPT.
    Sorts by start time; whenever an interval overlaps the previously-emitted one, emits
    [previous_end, current_end] instead of the interval itself. Returns an np.array.
    """
    ordered = sorted(intervals, key=lambda iv: iv[0])
    out = []
    for iv in ordered:
        prev_end = out[-1][1] if out else None
        if prev_end is not None and iv[0] <= prev_end:
            # overlaps the last emitted interval: emit only the non-overlapping tail
            out.append([prev_end, iv[1]])
        else:
            out.append(iv)
    return np.array(out)
### Piso-based interval overlap removal
# ## Build non-overlapping intervals with piso. Unsure of the computation efficiency, but the ouptuts are correct.
# import piso
# piso.register_accessors()
# print(f'pre: {active_filter_epochs.shape[0]}')
# valid_intervals = pd.arrays.IntervalArray.from_arrays(left=active_filter_epochs.start.values, right=active_filter_epochs.end.values).piso.symmetric_difference()
# valid_active_filter_epochs = np.vstack([valid_intervals.left.values.T, valid_intervals.right.values.T]).T
# print(f'post: {valid_active_filter_epochs.shape[0]}') # (37, 2)
# active_filter_epochs = valid_active_filter_epochs
def interleave(list1, list2):
    """ Chat-GPT: interleave the two lists pairwise, truncating to the shorter one.
    NOTE: shadowed by the itertools-based `interleave` defined just below. """
    result = []
    for first, second in zip(list1, list2):
        result.append(first)
        result.append(second)
    return result
import itertools
def interleave(list1, list2):
    """ Interleave two lists; leftover elements of the longer list are appended at the end.

    Fixes vs. the original "human solution":
      - `itertools.izip_longest` does not exist in Python 3 (renamed `zip_longest`), so the
        original raised AttributeError
      - the `if x` filter dropped ALL falsy elements (0, '', False, None), not just the
        zip_longest padding; a private sentinel now removes only the padding
    """
    _pad = object()  # unique sentinel: cannot collide with any real element
    chained = itertools.chain.from_iterable(itertools.zip_longest(list1, list2, fillvalue=_pad))
    return [x for x in chained if x is not _pad]
def _get_common_cell_pf_results(long_neuron_ids, short_neuron_ids):
    """ Prints the aclus shared/exclusive between the long and short sessions and sanity-checks
    that the normalized tuning curves restricted to the shared aclus are shape-consistent.

    Returns None: results are only printed and asserted, nothing is returned.

    NOTE(review): reads `long_pf1D`, `short_pf1D`, `global_neuron_ids`, and `global_pf1D` from the
    enclosing (module/global) scope -- they are not parameters, so this will NameError unless
    those globals exist at call time. Presumably notebook-extracted code; verify before reuse.
    """
    ## get shared neuron info:
    # this must be done after we rebuild the short_pf1D bins (if we need to) so they continue to match:
    pf_neurons_diff = _compare_computation_results(long_neuron_ids, short_neuron_ids)
    shared_aclus = pf_neurons_diff.intersection #.shape (56,)
    print(f'shared_aclus: {shared_aclus}.\t np.shape: {np.shape(shared_aclus)}')
    # curr_any_context_neurons = pf_neurons_diff.either
    long_only_aclus = pf_neurons_diff.lhs_only
    short_only_aclus = pf_neurons_diff.rhs_only
    print(f'long_only_aclus: {long_only_aclus}.\t np.shape: {np.shape(long_only_aclus)}')
    print(f'short_only_aclus: {short_only_aclus}.\t np.shape: {np.shape(short_only_aclus)}')
    ## Get the normalized_tuning_curves only for the shared aclus (that are common across (long/short/global):
    # boolean mask selecting the shared aclus out of the long session's neuron ids
    long_is_included = np.isin(long_neuron_ids, shared_aclus) #.shape # (104, 63)
    long_incl_aclus = np.array(long_neuron_ids)[long_is_included] #.shape # (98,)
    long_incl_curves = long_pf1D.ratemap.normalized_tuning_curves[long_is_included] #.shape # (98, 63)
    assert long_incl_aclus.shape[0] == long_incl_curves.shape[0] # (98,) == (98, 63)
    short_is_included = np.isin(short_neuron_ids, shared_aclus)
    short_incl_aclus = np.array(short_neuron_ids)[short_is_included] #.shape (98,)
    short_incl_curves = short_pf1D.ratemap.normalized_tuning_curves[short_is_included] #.shape # (98, 40)
    assert short_incl_aclus.shape[0] == short_incl_curves.shape[0] # (98,) == (98, 63)
    # assert short_incl_curves.shape[1] == long_incl_curves.shape[1] # short and long should have the same bins
    global_is_included = np.isin(global_neuron_ids, shared_aclus)
    global_incl_aclus = np.array(global_neuron_ids)[global_is_included] #.shape (98,)
    global_incl_curves = global_pf1D.ratemap.normalized_tuning_curves[global_is_included] #.shape # (98, 63)
    assert global_incl_aclus.shape[0] == global_incl_curves.shape[0] # (98,) == (98, 63)
    assert global_incl_curves.shape[1] == long_incl_curves.shape[1] # global and long should have the same bins
    # every shared long aclu must also appear in the short and global selections
    assert np.alltrue(np.isin(long_incl_aclus, short_incl_aclus))
    assert np.alltrue(np.isin(long_incl_aclus, global_incl_aclus))
    return
# ==================================================================================================================== #
# 2022-12-15 Importing from TestNeuropyPipeline241 #
# ==================================================================================================================== #
def _update_nearest_decoded_most_likely_position_callback(start_t, end_t):
""" Only uses end_t
Implicitly captures: ipspikesDataExplorer, _get_nearest_decoded_most_likely_position_callback