From 131f83fe9af36dec98fdbb09114d023a0a25b528 Mon Sep 17 00:00:00 2001
From: Yadu Nand B
Date: Wed, 3 Sep 2025 19:52:10 +0000
Subject: [PATCH 1/2] Minor type fixes

---
 deduplication/__main__.py | 3 +++
 deduplication/args.py     | 2 ++
 2 files changed, 5 insertions(+)

diff --git a/deduplication/__main__.py b/deduplication/__main__.py
index ee61c36..7c2bc2b 100644
--- a/deduplication/__main__.py
+++ b/deduplication/__main__.py
@@ -1,8 +1,11 @@
 from deduplication.workflows import *
 from deduplication.args import parse_args
 
+
 args = parse_args()
 
+args.sim_threshold = float(args.sim_threshold)
+
 if args.mode == "bloom":
     if args.single:
         assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list"
diff --git a/deduplication/args.py b/deduplication/args.py
index ce7e792..98f4b7a 100644
--- a/deduplication/args.py
+++ b/deduplication/args.py
@@ -50,11 +50,13 @@ def parse_args():
     parser.add_argument(
         "--sim-threshold",
         help="Jaccard Similarity threshold for deduplication, should be in [0, 1]. Default is 0.8",
+        type=float,
         default=0.8,
     )
     parser.add_argument(
         "--num-perm",
         help="Number of hash functions for MinHashing. Default is 128",
+        type=int,
         default=128,
     )
     parser.add_argument(

From 0f263c18754ab4d8c22ec9fa9471d1311ac00982 Mon Sep 17 00:00:00 2001
From: Yadu Babuji
Date: Mon, 22 Sep 2025 19:16:20 -0500
Subject: [PATCH 2/2] Add `skip_insertion` kwarg to all LSHBloom based workflows and components

* When `skip_insertion` is enabled, unique entries found are not inserted
  into the index. This enables deduplicating against an index without
  modifying it.
---
 deduplication/__main__.py  |  6 +++---
 deduplication/args.py      |  5 +++++
 deduplication/lshbloom.py  | 13 +++++++------
 deduplication/workflows.py | 10 +++++++---
 4 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/deduplication/__main__.py b/deduplication/__main__.py
index 7c2bc2b..91bf67e 100644
--- a/deduplication/__main__.py
+++ b/deduplication/__main__.py
@@ -9,12 +9,12 @@
 if args.mode == "bloom":
     if args.single:
         assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list"
-        dedup_single_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing)
+        dedup_single_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing, skip_insertion=args.skip_insertion)
     elif args.multi:
-        dedup_multi_bloom(args.input, args.minhash_dir, args.num, args.fp, args.output_file, args.name, args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing)
+        dedup_multi_bloom(args.input, args.minhash_dir, args.num, args.fp, args.output_file, args.name, args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing, skip_insertion=args.skip_insertion)
     else:
         assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list"
-        dedup_single_file_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing)
+        dedup_single_file_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing, skip_insertion=args.skip_insertion)
 else:
     if args.single:
         assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list"
diff --git a/deduplication/args.py b/deduplication/args.py
index 98f4b7a..9de9429 100644
--- a/deduplication/args.py
+++ b/deduplication/args.py
@@ -99,5 +99,10 @@ def parse_args():
         help="If set, will skip the minhashing step of each workflow (useful if minhashes have been precomputed at minhash_dir)",
         action="store_true"
     )
+    parser.add_argument(
+        "--skip-insertion",
+        help="If set, will skip inserting unique documents into the index (works only with LSHBloom)",
+        action="store_true"
+    )
 
     return parser.parse_args()
diff --git a/deduplication/lshbloom.py b/deduplication/lshbloom.py
index 7a0b5d6..78e8739 100644
--- a/deduplication/lshbloom.py
+++ b/deduplication/lshbloom.py
@@ -31,7 +31,7 @@ def __init__(self, minhash_dir: str, lsh_params: Dict):
         self.minhash_dir = minhash_dir
         self.lsh = MinHashLSHBloom(**lsh_params)
 
-    def deduplicate_corpus(self) -> List[Tuple[str]]:
+    def deduplicate_corpus(self, skip_insertion: bool = False) -> List[Tuple[str]]:
         """
         Deduplicates documents in the given corpus and adds them to the LSH index if appropriate.
         Documents without existing duplicates will be stored in the LSH index for future deduplication.
@@ -45,12 +45,12 @@ def deduplicate_corpus(self) -> List[Tuple[str]]:
             if f.endswith(".pkl")
         ]
         for minhashfile in minhash_files:
-            dups = self.deduplicate_minhash_file(minhashfile)
+            dups = self.deduplicate_minhash_file(minhashfile, skip_insertion=skip_insertion)
             duplicate_list.extend(dups)
 
         return duplicate_list
 
-    def deduplicate_and_insert(self, params: Tuple) -> List[Tuple[str]]:
+    def deduplicate_and_insert(self, params: Tuple, skip_insertion: bool = False) -> List[Tuple[str]]:
         """
         Deduplicates a MinHash signature corresponding to a document using the provided LSH index.
         If the document is not duplicated in the LSH index, it is added to the index.
@@ -67,12 +67,13 @@ def deduplicate_and_insert(self, params: Tuple) -> List[Tuple[str]]:
 
         # insert if not duplicated in index
         if not result:
-            self.lsh.insert(m_query)
+            if not skip_insertion:
+                self.lsh.insert(m_query)
             return None
 
         return [(key,)]
 
-    def deduplicate_minhash_file(self, minhashfile: str) -> List[Tuple[str]]:
+    def deduplicate_minhash_file(self, minhashfile: str, skip_insertion: bool = False) -> List[Tuple[str]]:
         """
         Deduplicate documents in the given minhash file and adds them to the LSH index if appropriate.
         Documents without existing duplicates will be stored in the LSH index for future deduplication.
@@ -91,7 +92,7 @@ def deduplicate_minhash_file(self, minhashfile: str) -> List[Tuple[str]]:
         # can't multiprocess here as insertion requires C++ dependencies that are not compatible with pickle
         with tqdm(total=len(minhash_list), desc=fname) as pbar:
             for i in range(len(minhash_list)):
-                result = self.deduplicate_and_insert(minhash_list[i])
+                result = self.deduplicate_and_insert(minhash_list[i], skip_insertion=skip_insertion)
                 if result:
                     duplicate_list.extend(result)
                 pbar.update()
diff --git a/deduplication/workflows.py b/deduplication/workflows.py
index f0e4be9..ca512cb 100644
--- a/deduplication/workflows.py
+++ b/deduplication/workflows.py
@@ -121,6 +121,7 @@ def dedup_single_bloom(
     save_dir: str = "./",
     compute_minhashes: bool = True,
     clear: bool = False,
+    skip_insertion: bool = False,
 ):
     if clear:
         clear_dir(save_dir)
@@ -138,7 +139,7 @@ def dedup_single_bloom(
         m.process()
 
     index = LSHBloom(minhash_dir, lsh_params)
-    duplicates = index.deduplicate_corpus()
+    duplicates = index.deduplicate_corpus(skip_insertion=skip_insertion)
 
     write_duplicates_to_csv(duplicates, csvfile, corpus_name, header=["dup_key"])
 
@@ -155,6 +156,7 @@ def dedup_multi_bloom(
    save_dir: str = "./",
    compute_minhashes: bool = True,
    clear: bool = False,
+   skip_insertion: bool = False,
 ):
    assert len(input_dirs) == len(minhash_dirs) == len(corpus_names), \
        f"Expected len(input_dirs) == len(minhash_dirs) == len(corpus_names), got {len(input_dirs)}, {len(minhash_dirs)}, {len(corpus_names)}"
@@ -174,7 +176,8 @@ def dedup_multi_bloom(
            n_hash_funcs,
            save_dir,
            compute_minhashes,
-           clear=False
+           clear=False,
+           skip_insertion=skip_insertion
        )
 
 def dedup_single_file_bloom(
@@ -189,6 +192,7 @@ def dedup_single_file_bloom(
    save_dir: str = "./",
    compute_minhashes: bool = True,
    clear: bool = False,
+   skip_insertion: bool = False,
 ):
    if clear:
        clear_dir(save_dir)
@@ -208,5 +212,5 @@ def dedup_single_file_bloom(
    fname = input_file.split("/")[-1]
    minhash_file = f"{minhash_dir}/{fname[:-6]}.pkl"
    index = LSHBloom(minhash_dir, lsh_params)
-   duplicates = index.deduplicate_minhash_file(minhash_file)
+   duplicates = index.deduplicate_minhash_file(minhash_file, skip_insertion=skip_insertion)
    write_duplicates_to_csv(duplicates, csvfile, corpus_name, header=["dup_key"])
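Usage sketch (illustrative, not part of either patch): the new `--skip-insertion` flag is forwarded as `skip_insertion=True` into the LSHBloom workflows, so the index is only queried and never modified. Used directly, the component behaves roughly as below; the import path and the `lsh_params` keys are assumptions, not taken from this diff.

    # Read-only deduplication against an existing LSHBloom index (sketch).
    from deduplication.lshbloom import LSHBloom  # assumed import path

    # Hypothetical MinHashLSHBloom kwargs; the workflows build the real dict.
    lsh_params = {"threshold": 0.8, "num_perm": 128}

    index = LSHBloom("/path/to/minhash_dir", lsh_params)

    # With skip_insertion=True, duplicates are still reported, but unique
    # documents are NOT inserted, so the index is left unchanged and repeated
    # runs against the same index give identical results.
    duplicates = index.deduplicate_corpus(skip_insertion=True)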