This repository was archived by the owner on Jan 22, 2026. It is now read-only.

Commit 2f030ed

tdsmith authored and jklukas committed
Use threads instead of processes in Dataset.summaries
Dataset.summaries uses a concurrent.futures.ProcessPoolExecutor to fetch multiple files from S3 at once. ProcessPoolExecutor uses multiprocessing underneath, which defaults to using fork() on Unix. Using fork() is dangerous and prone to deadlocks: https://codewithoutrules.com/2018/09/04/python-multiprocessing/

This is a possible source of the deadlocks observed during calls to Dataset.records.

Using threads should not be a performance regression, since the operation we're parallelizing is network-bound, not CPU-bound, so there should not be much contention for the GIL.
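A minimal sketch of the rationale: ThreadPoolExecutor and ProcessPoolExecutor share the same executor API, so the swap is a one-line change, and threads are sufficient for I/O-bound work because the GIL is released while waiting. Here `fetch_summary` is a hypothetical stand-in for the S3 fetch, using a sleep in place of real network I/O.

```python
from concurrent import futures
import time

def fetch_summary(key):
    # Hypothetical stand-in for a network-bound S3 fetch: the sleep
    # releases the GIL, just as blocking socket I/O would.
    time.sleep(0.05)
    return (key, len(key))

keys = ["a/1", "a/2", "b/1", "b/2"]

# ThreadPoolExecutor exposes the same interface as ProcessPoolExecutor,
# so replacing one with the other requires no other code changes.
with futures.ThreadPoolExecutor(max_workers=4) as executor:
    # executor.map preserves input order in its results.
    results = list(executor.map(fetch_summary, keys))

print(results)
```

Because the workers spend their time blocked on (simulated) I/O rather than computing, the four fetches overlap and the batch completes in roughly one sleep interval instead of four, with no fork() involved.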
1 parent fb68074 · commit 2f030ed

1 file changed

moztelemetry/dataset.py (2 additions & 2 deletions)
```diff
@@ -137,7 +137,7 @@ def __init__(self,
             datasets
         :param prefix: a prefix to the
         :param clauses: mapping of fields -> callables to refine the dataset
-        :param max_concurrency: number of processes to spawn when collecting S3 summaries,
+        :param max_concurrency: number of threads to spawn when collecting S3 summaries,
             defaults to 1.5 * cpu_count
         """
         self.bucket = bucket
@@ -283,7 +283,7 @@ def summaries(self, sc, limit=None):
         # on the prefix directory)
         clauses['prefix'] = lambda x: True

-        with futures.ProcessPoolExecutor(self.max_concurrency) as executor:
+        with futures.ThreadPoolExecutor(self.max_concurrency) as executor:
             scanned = self._scan(schema, [self.prefix], clauses, executor)
             keys = sc.parallelize(scanned).flatMap(self.store.list_keys)
             return keys.take(limit) if limit else keys.collect()
```

0 commit comments
