# oed_prettifier.py
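"""Convert a TSV dictionary dump into a StarDict dictionary.

Reads the source TSV (preserving "##" metadata lines), splits homographs,
cleans HTML, deduplicates entries, and writes StarDict output via pyglossary.
Processing is fanned out across worker processes.
"""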
import argparse
import itertools
import logging
import os
import shutil
import subprocess
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path

from pyglossary.glossary_v2 import Glossary

from duplicate_handler import DuplicateHandler
from processing_worker import process_entry_line_worker

@dataclass
class ConverterConfig:
input_tsv: Path
output_ifo: str
add_syns: bool
workers: int | None
debug_words: list[str] | None
dump_html: bool
dump_logs: bool
class DictionaryConverter:
"""Orchestrates the conversion from a TSV file to a Stardict dictionary."""
def __init__(self, config: ConverterConfig):
if not config.input_tsv.is_file():
sys.exit(f"Error: Input TSV file not found at '{config.input_tsv}'")
self.input_tsv = config.input_tsv
self.output_ifo_name = config.output_ifo
self.add_syns = config.add_syns
self.dump_html = config.dump_html
self.dump_logs = config.dump_logs
self.debug_words = set(config.debug_words) if config.debug_words else None
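        # Debug mode pins the pool to a single worker (keeps output for the traced
        # words sequential); otherwise clamp the requested count to [1, cpu_count],
        # defaulting to cpu_count - 1.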
if self.debug_words:
self.workers = 1
elif config.workers is not None:
self.workers = max(1, min(config.workers, os.cpu_count() or 1))
else:
self.workers = max(1, (os.cpu_count() or 1) - 1)
self.start_time = time.time()
self.metrics = {
'source_entry_count': 0, 'split_entry_count': 0, 'final_entry_count': 0,
'malformed_lines': 0, 'dotted_words': 0, 'dot_corrected': 0,
'synonyms_added_count': 0, 'total_entries': 0
}
self.processing_errors = []
self.unique_headwords = set()
Glossary.init()
self.glos = Glossary()
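        # pyglossary logs to stderr; raise it to DEBUG only when tracing specific words.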
pyg_log = logging.getLogger("pyglossary")
if self.debug_words:
pyg_log.setLevel(logging.DEBUG)
else:
pyg_log.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
pyg_log.addHandler(handler)
def _create_entry(self, all_words: list[str], final_definition: str):
"""Helper function to create a glossary entry from processed data."""
main_headword = all_words[0]
other_words = set(all_words[1:])
other_words.discard(main_headword)
        sorted_words = [main_headword] + sorted(other_words)
entry = self.glos.newEntry(word=sorted_words, defi=final_definition, defiFormat='h')
self.glos.addEntry(entry)
self.metrics['final_entry_count'] += 1
def run(self):
"""Reads a TSV file, processes entries in parallel, and prepares the glossary."""
if self.debug_words:
print(f"--> Running in DEBUG mode for headword(s): {', '.join(sorted(self.debug_words))}")
label = "process" if self.workers == 1 else "processes"
print(f"--> Using {self.workers} worker {label}.")
print(f"--> Reading and processing '{self.input_tsv}'...")
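        # Pass 1: read every non-empty line, routing "##"-prefixed metadata lines to
        # the glossary info table and queueing the rest for the worker pool.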
try:
with open(self.input_tsv, 'r', encoding='utf-8') as f:
all_lines = []
for line in f:
stripped_line = line.strip()
if not stripped_line:
continue
if stripped_line.startswith('##'):
self._process_metadata_line(stripped_line)
else:
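                        # In debug mode, keep only lines whose headword (the first
                        # tab-separated field) matches one of the requested words.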
if self.debug_words:
word = stripped_line.split('\t', 1)[0]
if word in self.debug_words:
all_lines.append(stripped_line)
else:
all_lines.append(stripped_line)
if self.metrics['total_entries'] == 0:
self.metrics['total_entries'] = len(all_lines)
# Package lines with the add_syns flag for the workers
tasks = [(line, self.add_syns, self.debug_words) for line in all_lines]
completed_count = 0
spinner = itertools.cycle(['▁', '▂', '▃', '▄', '▅', '▆', '▇', '█', '▇', '▆', '▅', '▄', '▃', '▂'])
redundancy_reaper = DuplicateHandler(self.output_ifo_name)
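            # DuplicateHandler collects every candidate entry, then drops duplicates
            # and mismatched variants during quarantine_trial() before we drain it.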
print("--> Submitting tasks to workers... this might take a few seconds.")
with ProcessPoolExecutor(max_workers=self.workers) as executor:
# Submit all tasks at once and get a dictionary of future-to-task mappings
futures = {executor.submit(process_entry_line_worker, task): task for task in tasks}
# Start the line for the spinner/progress
print("--> Processing: ", end='', flush=True)
# Process results as they are completed
for future in as_completed(futures):
try:
result = future.result()
if result['status'] == 'ok':
m = result['metrics']
is_split = m.get('split_entry', 0) > 0
for res in result['results']:
if self.dump_html and self.debug_words:
print(f"\n\n--> HTML Dump for '{res['words'][0]}':\n{res['definition']}\n")
redundancy_reaper.add(res['words'], res['definition'], self.debug_words, is_split_part=is_split)
self.unique_headwords.add(res['words'][0])
self.metrics['source_entry_count'] += m['source_entry']
self.metrics['split_entry_count'] += m['split_entry']
self.metrics['dotted_words'] += m['dotted_words']
self.metrics['dot_corrected'] += m['dot_corrected']
self.metrics['synonyms_added_count'] += m['synonyms_added']
elif result['type'] == 'malformed_line':
self.metrics['malformed_lines'] += 1
elif result['type'] == 'processing_error':
self.processing_errors.append(result)
except Exception as e:
# Handle potential errors from the worker process itself
original_task_line = futures[future][0] # Get the line from the original task
self.processing_errors.append({
'status': 'error',
'type': 'future_error',
                            'line': original_task_line[:100],  # preview only; the truncation marker is added at print time
'error': str(e)
})
completed_count += 1
                    # Throttle spinner/progress updates to avoid excessive printing;
                    # 97 is prime, so refreshes land at irregular-looking intervals rather than on round numbers.
if completed_count % 97 == 0 or completed_count == len(tasks):
percent = (completed_count / len(tasks)) * 100
print(f"\r--> Processing: {next(spinner)} {completed_count:,}/{len(tasks):,} ({percent:.1f}%)", end='', flush=True)
# Clear the progress line before printing the final summary
print("\r" + " " * 80, end='\r')
print("\n--> Sanity checks and deduplication in progress...")
redundancy_reaper.quarantine_trial(self.debug_words)
if self.dump_logs:
redundancy_reaper.write_logs()
u_hashes, d_hashes_count, mismatched, total_dropped = redundancy_reaper.get_stats()
self.metrics['unique_hashes'] = u_hashes
self.metrics['duplicated_hashes'] = d_hashes_count
self.metrics['mismatched'] = mismatched
self.metrics['total_dropped'] = total_dropped
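            # Entries that survived deduplication become the final glossary entries.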
for entry in redundancy_reaper.drain():
self._create_entry(entry['words'], entry['definition'])
print("\n--> Processing complete. Writing Stardict files...\n")
self._write_output()
self._print_summary()
except Exception as e:
sys.exit(f"An unexpected error occurred: {e}")
finally:
self._cleanup()
def _process_metadata_line(self, line: str):
"""Parses a metadata line and updates the glossary info."""
meta_parts = line.lstrip('#').strip().split('\t', 1)
if len(meta_parts) == 2:
key, value = meta_parts
if key.strip() == 'wordcount' and not self.debug_words:
try:
self.metrics['total_entries'] = int(value.strip())
except ValueError:
pass
print(f" - Found metadata: '{key.strip()}' = '{value.strip()}'")
self.glos.setInfo(key.strip(), value.strip())
def _write_output(self):
"""Writes the final Stardict files, including CSS and .syn handling."""
output_dir = Path(self.output_ifo_name)
output_dir.mkdir(parents=True, exist_ok=True)
        # And back to Stardict we go!
# self.glos.setInfo("sourceLang", "English")
# self.glos.setInfo("targetLang", "English")
self.glos.setInfo("author", "Oxford University Press")
self.glos.setInfo("copyright", "© 1989 Oxford University Press")
self.glos.setInfo("website", "https://www.oed.com")
self.glos.setInfo("email", "")
if self.debug_words:
self.glos.setInfo("title", "debug OED")
        self.glos.setInfo("description", "This dictionary was created using Commodore64user's oed_prettifier. If you encounter any formatting issues, do not hesitate to report them " \
            "over at the GitHub repo. Happy reading!")
self.glos.setInfo("date", time.strftime("%Y-%m-%d"))
script_dir = Path(__file__).resolve().parent
css_path = script_dir / 'style.css'
if css_path.is_file():
try:
with open(css_path, 'rb') as f_css:
css_content = f_css.read()
# Create a data entry for the stylesheet
css_entry = self.glos.newDataEntry(f"../{self.output_ifo_name}.css", css_content)
self.glos.addEntry(css_entry)
print(f"--> Attached stylesheet: '{css_path}'")
except Exception as e:
print(f"--> Warning: Could not read or add CSS file '{css_path}'. Error: {e}")
else:
            print("--> No 'style.css' file found in the script's directory. Skipping.")
try:
output_base_path = output_dir / Path(self.output_ifo_name).name
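            # The "Stardict" writer emits a separate .syn index for alternate forms;
            # "StardictMergeSyns" folds them into the main index, so no .syn file is produced.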
if self.add_syns:
self.glos.write(str(output_base_path), formatName="Stardict", dictzip=True)
else: # don't create a syn file for the 1000-ish abbreviations we're adding.
self.glos.write(str(output_base_path), formatName="StardictMergeSyns", dictzip=True)
syn_dz_path = output_base_path.with_suffix('.syn.dz')
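            # pyglossary may leave the synonym index dictzipped (.syn.dz); unpack it with
            # the external dictzip tool, presumably for readers that need a plain .syn file.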
if syn_dz_path.is_file():
print(f"--> Decompressing '{syn_dz_path}'...")
max_retries, retry_delay = 5, 1
for attempt in range(max_retries):
try:
                        # Run dictzip directly (no shell, so odd characters in the path are safe);
                        # check=True raises CalledProcessError on failure.
                        subprocess.run(["dictzip", "-d", str(syn_dz_path)], check=True)
break
except subprocess.CalledProcessError as e:
if attempt < max_retries - 1:
print(f"\n--> Attempt {attempt + 1} failed, retrying in {retry_delay}s...")
time.sleep(retry_delay)
else:
print(f"\n--> All {max_retries} attempts to decompress failed.")
raise e
except Exception as e:
sys.exit(f"An error occurred during the write process: {e}")
def _cleanup(self):
"""Performs final cleanup of temporary files and directories."""
print("\nPerforming final cleanup...")
pycache_dir = "__pycache__"
if os.path.exists(pycache_dir) and os.path.isdir(pycache_dir):
try:
shutil.rmtree(pycache_dir)
print(f"Successfully removed {pycache_dir} directory.")
except OSError as e:
print(f"Error cleaning up {pycache_dir}: {e}")
else:
print("No __pycache__ directory found to clean.")
def _print_summary(self):
"""Prints the final metrics and summary of the conversion process."""
end_time = time.time()
duration = end_time - self.start_time
minutes, seconds = divmod(duration, 60)
print("\n----------------------------------------------------")
print(f"Process complete. New dictionary '{self.output_ifo_name}.ifo' created.")
print("----------------------------------------------------")
print("Metrics:")
print(f"- Entries read from source TSV: {self.metrics['source_entry_count']:,}")
print(f"- Entries with homographs split: {self.metrics['split_entry_count']:,}")
print(f"- Unique headwords processed: {len(self.unique_headwords):,}")
print(f"- Unique definition hashes: {self.metrics.get('unique_hashes', 0):,}")
print(f"- Malformed lines skipped: {self.metrics['malformed_lines']:,}")
if self.processing_errors:
print(f"- Unexpected processing errors: {len(self.processing_errors):,}")
if self.add_syns:
print(f"- Synonyms added from b-tags: {self.metrics['synonyms_added_count']:,}")
print(f"- Words found ending in full stops: {self.metrics['dotted_words']:,}")
print(f"- Full stops corrected: {self.metrics['dot_corrected']:,}")
print(f"- Hashes with duplicates: {self.metrics.get('duplicated_hashes', 0):,}")
print(f"- Mismatched entries dropped: {self.metrics.get('mismatched', 0):,}")
print(f"- Total entries dropped: {self.metrics.get('total_dropped', 0):,}")
print(f"- Total final entries written: {self.metrics['final_entry_count']:,}")
print(f"- Total execution time: {int(minutes):02d}:{int(seconds):02d}")
print("----------------------------------------------------\n")
if self.processing_errors:
print("\nEncountered processing errors on the following lines:")
for err in self.processing_errors[:20]: # Show first 20 errors
print(f" - Error: {err['error']}\n Line: {err['line'][:100]}...")
if len(self.processing_errors) > 20:
print(f" ... and {len(self.processing_errors) - 20} more.")
print()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Reads a TSV dictionary, preserves metadata, splits homographs, cleans HTML, and writes a Stardict file.",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument("input_tsv", type=Path, help="Path to the source .tsv file.")
parser.add_argument("output_ifo", type=str, help="Base name for the new output Stardict files (e.g., 'OED_2ed').")
parser.add_argument("--add-syns", action="store_true", help="Scan HTML for b-tags and add their cleaned content as synonyms for the entry.")
parser.add_argument("--workers", type=int, default=None, help="Number of worker processes to use. Defaults to the number of system cores minus one.")
parser.add_argument("--debug", nargs='+', help="Run the script only for the specified headword(s) to speed up testing.")
parser.add_argument("-d", "--dump", action="store_true", help="When used with --debug, prints the final HTML of the processed entry to the console.")
parser.add_argument("--dump-logs", action="store_true", help="Writes duplication audit logs.")
args = parser.parse_args()
# Create the config object
config = ConverterConfig(
input_tsv=args.input_tsv,
output_ifo=args.output_ifo,
add_syns=args.add_syns,
workers=args.workers,
debug_words=args.debug,
dump_html=args.dump,
dump_logs=args.dump_logs
)
converter = DictionaryConverter(config)
converter.run()
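
# Example invocations (hypothetical file names):
#   python oed_prettifier.py oed.tsv OED_2ed --add-syns --workers 4
#   python oed_prettifier.py oed.tsv OED_2ed --debug "set" --dump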