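"""SDS CAS/EC extractor and type resolver.

A small NiceGUI app: upload a safety data sheet (SDS) PDF, extract CAS and EC
numbers from it with pdfplumber, and resolve them against the SVHC, ZZS and
potential-ZZS CSV lists expected in the local ``data`` directory.
"""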
from nicegui import ui
import pdfplumber
import re
import tempfile
from pathlib import Path
import csv
from typing import Dict, List, Tuple
from dataclasses import dataclass
import os
import gc


@dataclass
class Entry:
    name: str
    ec: str
    cas: str
    types: List[str]


CAS_REGEX = r'\b\d{2,7}-\d{2}-\d\b'
EC_REGEX = r'\b\d{3}-\d{3}-\d\b'
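# Format examples: the CAS number for water, 7732-18-5, matches CAS_REGEX, and
# its EC number, 231-791-2, matches EC_REGEX. Neither pattern validates the
# CAS check digit; these are purely shape-based matches.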


def extract_from_pdf(path: str) -> Tuple[List[str], List[str], Dict[str, str], Dict[str, str], List[Tuple[str, str]]]:
    """Extract CAS and EC identifiers from a PDF and try to infer nearby substance names.

    Attempts a table-aware parse using word bounding boxes. Returns:
    - the list of CAS numbers found
    - the list of EC numbers found
    - cas_name_map: mapping cas -> suggested name (from a nearby cell/column)
    - ec_name_map: mapping ec -> suggested name
    - pairs: list of (cas, ec) tuples detected in the same row/box

    Falls back to a plain-text regex scan if table parsing yields no words.
    """
    cas_found = []
    ec_found = []
    cas_name_map: Dict[str, str] = {}
    ec_name_map: Dict[str, str] = {}
    pairs: List[Tuple[str, str]] = []
    # keep positions of detected tokens to allow positional pairing heuristics
    cas_positions: List[Tuple[str, float, float]] = []
    ec_positions: List[Tuple[str, float, float]] = []
    # load the mapping tables lazily, only when they are actually needed
    try:
        ensure_maps_loaded()
    except Exception:
        # ignore if ensure_maps_loaded is not available in some contexts
        pass
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            words = page.extract_words()
            if not words:
                # fall back to whole-page text
                t = page.extract_text() or ''
                for c in re.findall(CAS_REGEX, t):
                    if c not in cas_found:
                        cas_found.append(c)
                for e in re.findall(EC_REGEX, t):
                    if e not in ec_found:
                        ec_found.append(e)
                continue
            # compute word centers
            for w in words:
                w['x0'] = float(w.get('x0', 0))
                w['x1'] = float(w.get('x1', 0))
                w['top'] = float(w.get('top', 0))
                w['bottom'] = float(w.get('bottom', 0))
                w['cx'] = (w['x0'] + w['x1']) / 2.0
                w['cy'] = (w['top'] + w['bottom']) / 2.0
                # record CAS/EC positions for later pairing heuristics
                for c in re.findall(CAS_REGEX, w.get('text', '') or ''):
                    cas_positions.append((c, w['cx'], w['cy']))
                for e in re.findall(EC_REGEX, w.get('text', '') or ''):
                    ec_positions.append((e, w['cx'], w['cy']))
            # cluster columns by x center (simple greedy clustering)
            centers = sorted({round(w['cx'], 1) for w in words})
            cols = []
            for c in centers:
                if not cols:
                    cols.append([c])
                elif abs(c - (sum(cols[-1]) / len(cols[-1]))) < 40:
                    cols[-1].append(c)
                else:
                    cols.append([c])
            col_centers = [sum(group) / len(group) for group in cols]
            # assign each word to its nearest column
            for w in words:
                best_i = min(range(len(col_centers)), key=lambda i: abs(w['cx'] - col_centers[i]))
                w['col'] = best_i
            # cluster rows by y (group words on the same line)
            words_sorted = sorted(words, key=lambda x: x['cy'])
            rows_clusters = []
            for w in words_sorted:
                if not rows_clusters:
                    rows_clusters.append([w])
                else:
                    last = rows_clusters[-1]
                    # allow a slightly larger vertical tolerance for row clustering
                    if abs(w['cy'] - (sum(x['cy'] for x in last) / len(last))) < 9:
                        last.append(w)
                    else:
                        rows_clusters.append([w])
            # build a lightweight table: first collect the cell texts for all rows
            all_row_cells = []
            for r in rows_clusters:
                cols_text = {}
                for w in sorted(r, key=lambda x: x['x0']):
                    cols_text.setdefault(w['col'], []).append(w['text'])
                cell_texts = {ci: ' '.join(ws).strip() for ci, ws in cols_text.items()}
                all_row_cells.append(cell_texts)
            # attempt to detect a header row (look for 'name' or language variants)
            header_row_idx = None
            name_col_idx = None
            cas_col_idx = None
            ec_col_idx = None
            name_keywords = ['name', 'naam', 'substance', 'stoff', 'stofnaam']
            cas_keywords = ['cas', 'cas-nummer', 'cas number']
            ec_keywords = ['eg-nummer', 'eg nummer', 'ec number', 'einecs']
            for i, cell_texts in enumerate(all_row_cells[:8]):
                hits = 0
                for ci, txt in cell_texts.items():
                    tl = txt.lower()
                    if any(k in tl for k in name_keywords):
                        hits += 2
                    if any(k in tl for k in cas_keywords + ec_keywords):
                        hits += 1
                if hits >= 2:
                    header_row_idx = i
                    # determine the specific columns from the header
                    for ci, txt in all_row_cells[i].items():
                        tl = txt.lower()
                        if any(k in tl for k in name_keywords) and name_col_idx is None:
                            name_col_idx = ci
                        if any(k in tl for k in cas_keywords) and cas_col_idx is None:
                            cas_col_idx = ci
                        if any(k in tl for k in ec_keywords) and ec_col_idx is None:
                            ec_col_idx = ci
                    break

            # helper to filter out likely non-name tokens
            def looks_like_name(s: str) -> bool:
                if not s:
                    return False
                if any(t in s.lower() for t in ['catalog', 'catalognumber', 'catalogue', 'lot', 'batch', 'temp', 'temperature', '°c', '°f']):
                    return False
                # avoid purely numeric tokens such as temperatures
                if any(ch.isdigit() for ch in s):
                    # allow digits if there are enough letters too (e.g. chemical names with numbers)
                    letters = sum(c.isalpha() for c in s)
                    # require at least 3 letters to accept a token that contains digits
                    if letters < 3:
                        return False
                return True

            def clean_name_token(s: str) -> str:
                if not s:
                    return ''
                t = s.strip()
                # normalize newlines/spaces
                t = t.replace('\n', ' ')
                t = re.sub(r'\s+', ' ', t)
                # remove common id prefixes and language variants (including ELINCS)
                t = re.sub(r'(?i)\b(elincs|einecs|inecs|cas(?: number)?|ec(?: number)?|ec-no(?:\.|:)?|eg-?nummer|egnr|nummer|reg\.no|reg nummer)[:\s-]*', '', t)
                # remove leading labels like 'Name:' or 'Substance:'
                t = re.sub(r'(?i)\b(name|substance|chemical|product|compound)[:\s-]*', '', t)
                # strip leading bare CAS/EC tokens (e.g. '78-93-3 ' or '472-160-3 ')
                t = re.sub(r'^\s*(?:\d{2,7}-\d{2}-\d|\d{3}-\d{3}-\d)[:\s-]*', '', t)
                # remove trailing registration phrases like 'Reg.nr.: Exempt'
                t = re.sub(r'(?i)\breg\.?\s*nr[:\.\s-]*.*$', '', t)
                # strip surrounding punctuation and repeated whitespace
                t = t.strip(' -:;,.')
                t = re.sub(r'\s+', ' ', t)
                return t.strip()

            # now process the rows, skipping the header row if detected
            for ridx, cell_texts in enumerate(all_row_cells):
                if header_row_idx is not None and ridx == header_row_idx:
                    continue
                # collect the identifiers in this row
                row_cas = []
                row_ec = []
                for txt in cell_texts.values():
                    for c in re.findall(CAS_REGEX, txt):
                        row_cas.append(c)
                    for e in re.findall(EC_REGEX, txt):
                        row_ec.append(e)
                # determine a name candidate: prefer the header-defined name column
                name_candidate = ''
                if name_col_idx is not None and name_col_idx in cell_texts:
                    cand = clean_name_token(cell_texts[name_col_idx])
                    if looks_like_name(cand):
                        name_candidate = cand
                # otherwise prefer the neighboring column(s) of the id cell(s)
                if not name_candidate:
                    chosen = None
                    # try CAS cells first
                    if row_cas:
                        for c in row_cas:
                            for ci, txt in cell_texts.items():
                                if c in txt:
                                    # prefer the right column, then the left; also try combining with nearby rows
                                    curr = clean_name_token(cell_texts.get(ci, ''))
                                    right = clean_name_token(cell_texts.get(ci + 1, ''))
                                    left = clean_name_token(cell_texts.get(ci - 1, ''))
                                    # next rows (up to 2), same column and the column to the right
                                    next_same = ''
                                    next_right = ''
                                    next2_same = ''
                                    next2_right = ''
                                    if ridx + 1 < len(all_row_cells):
                                        next_cells = all_row_cells[ridx + 1]
                                        next_same = clean_name_token(next_cells.get(ci, ''))
                                        next_right = clean_name_token(next_cells.get(ci + 1, ''))
                                    if ridx + 2 < len(all_row_cells):
                                        next2_cells = all_row_cells[ridx + 2]
                                        next2_same = clean_name_token(next2_cells.get(ci, ''))
                                        next2_right = clean_name_token(next2_cells.get(ci + 1, ''))
                                    # build candidate combinations using up to two previous/next
                                    # rows and neighboring columns (prefer longer, valid names)
                                    candidates = []
                                    combos = []
                                    prev_same = ''
                                    prev_right = ''
                                    prev2_same = ''
                                    prev2_right = ''
                                    if ridx - 1 >= 0:
                                        prev_cells = all_row_cells[ridx - 1]
                                        prev_same = clean_name_token(prev_cells.get(ci, ''))
                                        prev_right = clean_name_token(prev_cells.get(ci + 1, ''))
                                    if ridx - 2 >= 0:
                                        prev2_cells = all_row_cells[ridx - 2]
                                        prev2_same = clean_name_token(prev2_cells.get(ci, ''))
                                        prev2_right = clean_name_token(prev2_cells.get(ci + 1, ''))
                                    combos.append(' '.join(filter(None, [prev2_same, prev_same, curr, right, next_same, next2_same, cell_texts.get(ci + 2, '')])))
                                    combos.append(' '.join(filter(None, [prev_same, curr, right, next_same, next2_same])))
                                    combos.append(' '.join(filter(None, [curr, right, next_same, next2_same])))
                                    combos.append(' '.join(filter(None, [right, next_same, next2_same, next2_right])))
                                    combos.append(' '.join(filter(None, [prev2_same, prev_same, curr, next_same, next2_same])))
                                    combos.append(' '.join(filter(None, [cell_texts.get(ci - 2, ''), cell_texts.get(ci - 1, ''), curr, right])))
                                    combos.append(' '.join(filter(None, [cell_texts.get(ci - 1, ''), curr, cell_texts.get(ci + 1, ''), cell_texts.get(ci + 2, '')])))
                                    combos.append(curr)
                                    combos.append(right)
                                    combos.append(next_same)
                                    combos.append(next2_same)
                                    combos.append(prev_same)
                                    combos.append(prev2_same)
                                    combos.append(left)
                                    for cval in combos:
                                        if cval:
                                            candidates.append(clean_name_token(cval))
                                    # fallback single tokens
                                    if next_same:
                                        candidates.append(next_same)
                                    if right:
                                        candidates.append(right)
                                    if left:
                                        candidates.append(left)
                                    # pick the best candidate that looks like a name and is longest
                                    best = None
                                    best_score = -1e9
                                    for cand in candidates:
                                        if not cand or not looks_like_name(cand):
                                            continue
                                        letters = sum(ch.isalpha() for ch in cand)
                                        digits = sum(ch.isdigit() for ch in cand)
                                        word_count = len(cand.split())
                                        score = letters - digits * 2 + word_count * 0.5
                                        if score > best_score:
                                            best_score = score
                                            best = cand
                                    if best:
                                        # attempt to append short alphabetic-only suffix tokens from nearby cells
                                        suffixes = []
                                        alpha_re = re.compile(r"^[A-Za-zÀ-ÖØ-öø-ÿ\-()/.]+$")
                                        hazard_blacklist = ['flam', 'liq', 'flamm', 'aquatic', 'chronic', 'stot', 'acute', 'skin', 'irrit', 'sens', 'reg', 'nr', 'exempt', 'inhoud', 'content', 'ingredient']
                                        # search the neighbors: previous 2 rows, next 2 rows, columns -3..+3
                                        for rr in range(ridx - 2, ridx + 3):
                                            if rr < 0 or rr >= len(all_row_cells):
                                                continue
                                            cells_r = all_row_cells[rr]
                                            for cc in range(ci - 3, ci + 4):
                                                if cc == ci:
                                                    continue
                                                txt = clean_name_token(cells_r.get(cc, ''))
                                                if not txt:
                                                    continue
                                                for tok in txt.split():
                                                    t = clean_name_token(tok)
                                                    if t and alpha_re.match(t) and len(t) >= 4 and t.lower() not in best.lower():
                                                        # avoid hazard/keyphrase tokens
                                                        low = t.lower()
                                                        if any(h in low for h in hazard_blacklist):
                                                            continue
                                                        # make sure it is not a CAS/EC-like token
                                                        if not re.match(r"^\d{2,7}-\d{2}-\d$", t) and not re.match(r"^\d{3}-\d{3}-\d$", t):
                                                            suffixes.append(t)
                                        if suffixes:
                                            # append unique suffixes in order
                                            seen = set()
                                            append_parts = []
                                            for s in suffixes:
                                                if s.lower() in seen:
                                                    continue
                                                seen.add(s.lower())
                                                append_parts.append(s)
                                                if len(append_parts) >= 3:
                                                    break
                                            best = (best + ' ' + ' '.join(append_parts)).strip()
                                        chosen = best
                                        break
                            if chosen:
                                break
                    # try the EC cells if still not found
                    if not chosen and row_ec:
                        for e in row_ec:
                            for ci, txt in cell_texts.items():
                                if e in txt:
                                    curr = clean_name_token(cell_texts.get(ci, ''))
                                    right = clean_name_token(cell_texts.get(ci + 1, ''))
                                    left = clean_name_token(cell_texts.get(ci - 1, ''))
                                    next_same = ''
                                    next_right = ''
                                    next2_same = ''
                                    next2_right = ''
                                    if ridx + 1 < len(all_row_cells):
                                        next_cells = all_row_cells[ridx + 1]
                                        next_same = clean_name_token(next_cells.get(ci, ''))
                                        next_right = clean_name_token(next_cells.get(ci + 1, ''))
                                    if ridx + 2 < len(all_row_cells):
                                        next2_cells = all_row_cells[ridx + 2]
                                        next2_same = clean_name_token(next2_cells.get(ci, ''))
                                        next2_right = clean_name_token(next2_cells.get(ci + 1, ''))
                                    candidates = []
                                    combos = []
                                    combos.append(' '.join(filter(None, [cell_texts.get(ci - 2, ''), cell_texts.get(ci - 1, ''), curr, right, next_same, next_right, next2_same, cell_texts.get(ci + 2, ''), cell_texts.get(ci + 3, '')])))
                                    combos.append(' '.join(filter(None, [curr, right, next_same, next2_same])))
                                    combos.append(' '.join(filter(None, [right, next_same, next2_same])))
                                    combos.append(' '.join(filter(None, [curr, next_same, next2_same])))
                                    combos.append(' '.join(filter(None, [cell_texts.get(ci + 1, ''), cell_texts.get(ci + 2, ''), cell_texts.get(ci + 3, ''), next_same, next2_same])))
                                    combos.append(curr)
                                    combos.append(right)
                                    combos.append(next_same)
                                    combos.append(next2_same)
                                    combos.append(left)
                                    for cval in combos:
                                        if cval:
                                            candidates.append(clean_name_token(cval))
                                    best = None
                                    best_score = -1e9
                                    for cand in candidates:
                                        if not cand or not looks_like_name(cand):
                                            continue
                                        letters = sum(ch.isalpha() for ch in cand)
                                        digits = sum(ch.isdigit() for ch in cand)
                                        word_count = len(cand.split())
                                        score = letters - digits * 2 + word_count * 0.5
                                        if score > best_score:
                                            best_score = score
                                            best = cand
                                    if best:
                                        chosen = best
                                        break
                            if chosen:
                                break
                    if chosen:
                        name_candidate = clean_name_token(chosen)
                # assign mappings and pairs
                for c in row_cas:
                    if c not in cas_found:
                        cas_found.append(c)
                    if name_candidate:
                        cas_name_map.setdefault(c, name_candidate)
                for e in row_ec:
                    if e not in ec_found:
                        ec_found.append(e)
                    if name_candidate:
                        ec_name_map.setdefault(e, name_candidate)
                for c in row_cas:
                    for e in row_ec:
                        pairs.append((c, e))
    # final dedupe and sort
    cas_found = sorted(set(cas_found))
    ec_found = sorted(set(ec_found))
    pairs = list(dict.fromkeys(pairs))
    # If no explicit pairs were found, try pairing CAS and EC by positional
    # proximity using the token centers captured earlier. This helps when the
    # table clustering split identifiers into adjacent rows.
    if not pairs and cas_positions and ec_positions:
        used_ec = set()
        for c, cx, cy in cas_positions:
            best_j = None
            best_score = 1e9
            for j, (e, ex, ey) in enumerate(ec_positions):
                if j in used_ec:
                    continue
                # give vertical distance more weight
                score = abs(ey - cy) * 2 + abs(ex - cx)
                if score < best_score:
                    best_score = score
                    best_j = j
            # accept the pairing if reasonably close (heuristic)
            if best_j is not None and best_score < 80:
                pairs.append((c, ec_positions[best_j][0]))
                used_ec.add(best_j)
    # If we found explicit pairs, filter out stray CAS/EC matches that are
    # likely false positives (e.g. header/footer tokens). Keep only the
    # identifiers that appear in pairs or that are present in the CSV mapping
    # tables (the authoritative source).
    if pairs:
        pair_cas = {c for c, _ in pairs}
        pair_ec = {e for _, e in pairs}
        cas_map_glob = globals().get('CAS_MAP', {}) or {}
        ec_map_glob = globals().get('EC_MAP', {}) or {}
        filtered_cas = set(pair_cas)
        filtered_ec = set(pair_ec)
        # also retain any CAS/EC that have authoritative mappings
        for c in cas_found:
            if c in cas_map_glob:
                filtered_cas.add(c)
        for e in ec_found:
            if e in ec_map_glob:
                filtered_ec.add(e)
        cas_found = sorted(filtered_cas)
        ec_found = sorted(filtered_ec)
    # If possible, also augment the pairs using known mappings from the CSV maps
    # (CAS_MAP and EC_MAP are loaded lazily; the ensure_maps_loaded() call above
    # makes them available by the time this code runs).
    try:
        extra = []
        # prefer CAS -> EC mappings
        for c in cas_found:
            m = globals().get('CAS_MAP', {}).get(c)
            if m and m.ec:
                extra.append((c, m.ec))
        # also allow EC -> CAS mappings
        for e in ec_found:
            m = globals().get('EC_MAP', {}).get(e)
            if m and m.cas:
                extra.append((m.cas, e))
        # append the extras while preserving order and uniqueness
        for p in extra:
            if p not in pairs:
                pairs.append(p)
    except Exception:
        # be forgiving if CAS_MAP/EC_MAP are not available
        pass
    # Fill the name-suggestion maps from the CSV-backed maps when available
    try:
        for c, e in pairs:
            if c and not cas_name_map.get(c):
                m = globals().get('CAS_MAP', {}).get(c)
                if m and m.name:
                    cas_name_map[c] = m.name
            if e and not ec_name_map.get(e):
                m = globals().get('EC_MAP', {}).get(e)
                if m and m.name:
                    ec_name_map[e] = m.name
        for c in cas_found:
            if c and not cas_name_map.get(c):
                m = globals().get('CAS_MAP', {}).get(c)
                if m and m.name:
                    cas_name_map[c] = m.name
        for e in ec_found:
            if e and not ec_name_map.get(e):
                m = globals().get('EC_MAP', {}).get(e)
                if m and m.name:
                    ec_name_map[e] = m.name
    except Exception:
        pass
    # Merge CAS/EC name fragments when the CAS name looks like a prefix and the
    # EC name is a short alphabetic suffix
    try:
        for c, e in pairs:
            cn = cas_name_map.get(c, '')
            en = ec_name_map.get(e, '')
            if cn and en and en.lower() not in cn.lower():
                # accept en if it is alphabetic-ish and not a hazard phrase
                if re.match(r"^[A-Za-zÀ-ÖØ-öø-ÿ\-()/. ]+$", en) and len(re.sub(r"[^A-Za-z]", "", en)) >= 4:
                    # if the prefix ends with a closing paren and the suffix starts with a letter, don't insert an extra space
                    if cn.endswith(')') and en and en[0].isalpha():
                        merged = (cn + en).strip()
                    else:
                        merged = (cn + ' ' + en).strip()
                    cas_name_map[c] = merged
                    ec_name_map[e] = merged
    except Exception:
        pass
    return cas_found, ec_found, cas_name_map, ec_name_map, pairs
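
# A minimal usage sketch for the extractor (the file name is hypothetical):
#
#   cas, ec, cas_names, ec_names, pairs = extract_from_pdf('example_sds.pdf')
#   for c, e in pairs:
#       print(c, e, cas_names.get(c) or ec_names.get(e, ''))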


def load_svhc_csv(filename: str) -> Tuple[Dict[str, Entry], Dict[str, Entry]]:
    """Load the SVHC CSV and return two mappings: cas -> entry and ec -> entry.

    The CSV is expected to have headers including at least 'Substance name',
    'EC number', and 'CAS number'. Empty values or '-' are treated as missing.
    """
    data_dir = Path(__file__).parent / 'data'
    path = data_dir / filename
    cas_map: Dict[str, Entry] = {}
    ec_map: Dict[str, Entry] = {}
    if not path.exists():
        return cas_map, ec_map
    # Try several encodings commonly encountered on Windows and in exports.
    encodings_to_try = ['utf-8', 'utf-8-sig', 'cp1252', 'latin-1']
    rows = None
    for enc in encodings_to_try:
        try:
            with path.open(newline='', encoding=enc) as f:
                reader = csv.DictReader(f)
                rows = list(reader)
            break
        except UnicodeDecodeError:
            rows = None
            continue
    # If still None (very unlikely), reopen with latin-1 and replace invalid bytes.
    if rows is None:
        with path.open(newline='', encoding='latin-1', errors='replace') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
    for row in rows:
        name = (row.get('Substance name') or '').strip()
        ec = (row.get('EC number') or '').strip()
        cas = (row.get('CAS number') or '').strip()
        # normalize '-' to '' (empty values are already '')
        if ec == '-':
            ec = ''
        if cas == '-':
            cas = ''
        entry = Entry(name=name, ec=ec, cas=cas, types=['SVHC'])
        if cas:
            # if there is an existing entry, merge the types and prefer a non-empty name
            existing = cas_map.get(cas)
            if existing:
                for t in entry.types:
                    if t not in existing.types:
                        existing.types.append(t)
                if not existing.name and entry.name:
                    existing.name = entry.name
                cas_map[cas] = existing
            else:
                cas_map[cas] = entry
        if ec:
            existing = ec_map.get(ec)
            if existing:
                for t in entry.types:
                    if t not in existing.types:
                        existing.types.append(t)
                if not existing.name and entry.name:
                    existing.name = entry.name
                ec_map[ec] = existing
            else:
                ec_map[ec] = entry
    return cas_map, ec_map
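
# Illustrative shape of the expected CSV (the header names are the ones the
# loader reads; the data row is only an example):
#
#   Substance name,EC number,CAS number
#   Lead chromate,231-846-0,7758-97-6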
# Initially do not load CSV maps into memory; load them on demand to save memory
CAS_MAP = None
EC_MAP = None


def load_zzs_csv(filename: str, cas_map: Dict[str, Entry], ec_map: Dict[str, Entry]):
    """Load the ZZS CSV (semicolon-delimited) and merge its entries into the provided maps.

    Expected headers: 'CAS-nummer', 'EG-nummer', 'Nederlandse stofnaam', 'Engelse stofnaam'.
    Uses the Dutch name where available. Adds the type 'ZZS' to matched entries
    and merges them with existing ones.
    """
    data_dir = Path(__file__).parent / 'data'
    path = data_dir / filename
    if not path.exists():
        return
    # the ZZS CSV uses a semicolon delimiter and may have encoding issues too
    encodings_to_try = ['utf-8', 'utf-8-sig', 'cp1252', 'latin-1']
    rows = None
    for enc in encodings_to_try:
        try:
            with path.open(newline='', encoding=enc) as f:
                reader = csv.DictReader(f, delimiter=';')
                rows = list(reader)
            break
        except UnicodeDecodeError:
            rows = None
            continue
    if rows is None:
        with path.open(newline='', encoding='latin-1', errors='replace') as f:
            reader = csv.DictReader(f, delimiter=';')
            rows = list(reader)
    for row in rows:
        cas = (row.get('CAS-nummer') or '').strip()
        ec = (row.get('EG-nummer') or '').strip()
        name_nl = (row.get('Nederlandse stofnaam') or '').strip()
        name_en = (row.get('Engelse stofnaam') or '').strip()
        name = name_nl or name_en or ''
        if ec == '-':
            ec = ''
        if cas == '-':
            cas = ''
        entry = Entry(name=name, ec=ec, cas=cas, types=['ZZS'])
        if cas:
            existing = cas_map.get(cas)
            if existing:
                for t in entry.types:
                    if t not in existing.types:
                        existing.types.append(t)
                if not existing.name and entry.name:
                    existing.name = entry.name
                cas_map[cas] = existing
            else:
                cas_map[cas] = entry
        if ec:
            existing = ec_map.get(ec)
            if existing:
                for t in entry.types:
                    if t not in existing.types:
                        existing.types.append(t)
                if not existing.name and entry.name:
                    existing.name = entry.name
                ec_map[ec] = existing
            else:
                ec_map[ec] = entry
# previously we eagerly loaded CSVs here; maps are now loaded lazily by ensure_maps_loaded()


def load_potential_zzs_csv(filename: str, cas_map: Dict[str, Entry], ec_map: Dict[str, Entry]):
    """Load the Potential ZZS CSV (semicolon-delimited) and merge its entries into the provided maps.

    Expected headers, as in the ZZS list: 'CAS-nummer', 'EG-nummer',
    'Nederlandse stofnaam', 'Engelse stofnaam'. Adds the type 'Potential ZZS'
    to matched entries and merges them with existing ones.
    """
    data_dir = Path(__file__).parent / 'data'
    path = data_dir / filename
    if not path.exists():
        return
    encodings_to_try = ['utf-8', 'utf-8-sig', 'cp1252', 'latin-1']
    rows = None
    for enc in encodings_to_try:
        try:
            with path.open(newline='', encoding=enc) as f:
                reader = csv.DictReader(f, delimiter=';')
                rows = list(reader)
            break
        except UnicodeDecodeError:
            rows = None
            continue
    if rows is None:
        with path.open(newline='', encoding='latin-1', errors='replace') as f:
            reader = csv.DictReader(f, delimiter=';')
            rows = list(reader)
    for row in rows:
        cas = (row.get('CAS-nummer') or '').strip()
        ec = (row.get('EG-nummer') or '').strip()
        name_nl = (row.get('Nederlandse stofnaam') or '').strip()
        name_en = (row.get('Engelse stofnaam') or '').strip()
        name = name_nl or name_en or ''
        if ec == '-':
            ec = ''
        if cas == '-':
            cas = ''
        entry = Entry(name=name, ec=ec, cas=cas, types=['Potential ZZS'])
        if cas:
            existing = cas_map.get(cas)
            if existing:
                for t in entry.types:
                    if t not in existing.types:
                        existing.types.append(t)
                if not existing.name and entry.name:
                    existing.name = entry.name
                cas_map[cas] = existing
            else:
                cas_map[cas] = entry
        if ec:
            existing = ec_map.get(ec)
            if existing:
                for t in entry.types:
                    if t not in existing.types:
                        existing.types.append(t)
                if not existing.name and entry.name:
                    existing.name = entry.name
                ec_map[ec] = existing
            else:
                ec_map[ec] = entry

# The potential-ZZS list is also merged lazily, via ensure_maps_loaded().
def ensure_maps_loaded():
    """Load the CAS/EC mapping CSVs into the module globals if not already loaded.

    Loading happens once and the maps are retained for subsequent calls. This
    keeps memory usage low until a PDF is actually processed.
    """
    global CAS_MAP, EC_MAP
    if CAS_MAP is not None and EC_MAP is not None:
        return
    cas_map, ec_map = load_svhc_csv('candidate_list_full-2025-09-15.csv')
    # merge the ZZS and potential-ZZS lists into the maps
    load_zzs_csv('ZZS.csv', cas_map, ec_map)
    load_potential_zzs_csv('DownloadPotentieleZZSlijst.csv', cas_map, ec_map)
    CAS_MAP = cas_map
    EC_MAP = ec_map
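
# Note: the lazy load above is not guarded by a lock. If two uploads were ever
# handled concurrently, the CSVs might be parsed twice; the end state is the
# same either way, since the final assignments simply replace the globals.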


def resolve_to_rows(found_cas: List[str], found_ec: List[str], cas_name_map: Dict[str, str] = None, ec_name_map: Dict[str, str] = None, pairs: List[Tuple[str, str]] = None) -> List[dict]:
    """Given lists of extracted CAS and EC identifiers, return unique rows for the results table.

    Each row is a dict with the keys 'EC', 'CAS', 'Substance', and 'Type'.
    Rows are deduplicated by the (cas, ec) pair.
    """
    cas_name_map = cas_name_map or {}
    ec_name_map = ec_name_map or {}
    pairs = pairs or []
    # make sure the maps are present when resolving to rows
    try:
        ensure_maps_loaded()
    except Exception:
        pass
    rows = []
    seen = set()
    # handle the detected pairs first
    paired = set()
    for cas, ec in pairs:
        if (cas, ec) in paired:
            continue
        paired.add((cas, ec))
        cas_entry = CAS_MAP.get(cas)
        ec_entry = EC_MAP.get(ec)
        name = ''
        types = []
        if cas_entry:
            name = cas_entry.name or name
            for t in cas_entry.types:
                if t not in types:
                    types.append(t)
        if ec_entry:
            name = name or ec_entry.name
            for t in ec_entry.types:
                if t not in types:
                    types.append(t)
        # fall back to the PDF suggestions
        name = name or cas_name_map.get(cas) or ec_name_map.get(ec) or ''
        row_key = (cas or '', ec or '')
        if row_key in seen:
            continue
        seen.add(row_key)
        row_type = ','.join(types) if types else 'unresolved'
        rows.append({'EC': ec, 'CAS': cas, 'Substance': name, 'Type': row_type})
    # then handle the remaining CAS matches
    for cas in found_cas:
        if any(cas == p[0] for p in paired):
            continue
        mapped = CAS_MAP.get(cas)
        ec_val = mapped.ec if mapped and mapped.ec else ''
        name = mapped.name if mapped else ''
        if not name:
            name = cas_name_map.get(cas, '')
        row_key = (cas or '', ec_val or '')
        if row_key in seen:
            continue
        seen.add(row_key)
        types = mapped.types if mapped else []
        row_type = ','.join(types) if types else 'unresolved'
        rows.append({'EC': ec_val, 'CAS': cas, 'Substance': name, 'Type': row_type})
    # then handle the remaining EC matches
    for ec in found_ec:
        if any(ec == p[1] for p in paired):
            continue
        mapped = EC_MAP.get(ec)
        cas_val = mapped.cas if mapped and mapped.cas else ''
        name = mapped.name if mapped else ''
        if not name:
            name = ec_name_map.get(ec, '')
        row_key = (cas_val or '', ec or '')
        if row_key in seen:
            continue
        seen.add(row_key)
        types = mapped.types if mapped else []
        row_type = ','.join(types) if types else 'unresolved'
        rows.append({'EC': ec, 'CAS': cas_val, 'Substance': name, 'Type': row_type})
    # show unresolved rows after resolved rows (the stable sort keeps the relative order)
    rows.sort(key=lambda r: r.get('Type', '') == 'unresolved')
    return rows
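
# Illustrative result row (values are examples, not output from a real SDS):
#   {'EC': '231-846-0', 'CAS': '7758-97-6', 'Substance': 'Lead chromate', 'Type': 'SVHC'}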


async def handle_pdf_upload(event):
    uploaded_file = event.file  # SmallFileUpload object
    # read the PDF bytes from the upload
    pdf_bytes = await uploaded_file.read()
    # write them to a temporary file so pdfplumber can open a real path
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
    tmp.write(pdf_bytes)
    tmp.flush()
    cas, ec, cas_name_map, ec_name_map, pairs = extract_from_pdf(tmp.name)
    # optional small notification with the counts
    try:
        ui.notify(f"Found {len(cas)} CAS and {len(ec)} EC", position='top')
    except Exception:
        pass
    # resolve to a set of rows for display
    resolved_rows = resolve_to_rows(cas, ec, cas_name_map=cas_name_map, ec_name_map=ec_name_map, pairs=pairs)
    results_table.rows = resolved_rows
    # clean up the large temporaries to keep the memory footprint small
    del pdf_bytes
    cas = ec = cas_name_map = ec_name_map = pairs = None
    try:
        tmp.close()
    except Exception:
        pass
    try:
        os.unlink(tmp.name)
    except Exception:
        pass
    gc.collect()
    # hide the upload to prevent further uploads and show the clear button
    try:
        upload.set_visibility(False)
    except Exception:
        pass
    try:
        clear_button.set_visibility(True)
    except Exception:
        pass


ui.markdown("## SDS CAS/EC extractor and type resolver")
ui.label("Upload an SDS PDF to extract CAS and EC numbers")
# upload component (keep a reference so we can hide it after one upload)
upload = ui.upload(
    label="Upload SDS PDF",
    auto_upload=True,
    on_upload=handle_pdf_upload,
)
ui.separator()


# Clear button to reset the UI; hidden until after an upload
def clear_all():
    try:
        results_table.rows = []
    except Exception:
        pass
    try:
        upload.set_visibility(True)
    except Exception:
        pass
    try:
        clear_button.set_visibility(False)
    except Exception:
        pass
    try:
        ui.notify('Cleared. You can upload a new PDF.', position='top')
    except Exception:
        pass


clear_button = ui.button('Clear', on_click=clear_all)
clear_button.set_visibility(False)

ui.markdown("### Substances found, with type matching where available")
# results table columns: EC, CAS, Substance, Type
results_table = ui.table(
    columns=[
        {'name': 'EC', 'field': 'EC', 'label': 'EC'},
        {'name': 'CAS', 'field': 'CAS', 'label': 'CAS'},
        {'name': 'Substance', 'field': 'Substance', 'label': 'Substance'},
        {'name': 'Type', 'field': 'Type', 'label': 'Type'},
    ],
    rows=[],
).props('small')

if __name__ in {"__main__", "__mp_main__"}:
    ui.run(port=8080)
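
# Run with `python sdser.py`; NiceGUI then serves the app on http://localhost:8080.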