forked from datafold/data-diff
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy paththread_utils.py
More file actions
132 lines (110 loc) · 4.45 KB
/
thread_utils.py
File metadata and controls
132 lines (110 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import itertools
import threading
from collections import deque
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import Future, ThreadPoolExecutor
from queue import PriorityQueue
from time import sleep
from typing import Any
import attrs
_SENTINEL = object()
def _chain_future(source: Future, dest: Future) -> None:
"""Propagate the outcome (result, exception, or cancellation) from source to dest."""
if dest.cancelled():
return
try:
if source.cancelled():
dest.cancel()
elif exc := source.exception():
dest.set_exception(exc)
else:
dest.set_result(source.result())
except Exception as exc:
try:
dest.set_exception(exc)
except Exception:
pass
class PriorityThreadPoolExecutor:
    """Thread pool that executes tasks in priority order (higher runs first).

    A dedicated dispatcher thread pulls work items off a PriorityQueue and
    forwards them to a standard ThreadPoolExecutor — no CPython internals.
    ``submit`` returns a proxy Future whose outcome mirrors the inner pool's
    future once the task has run.  May also be used as a context manager;
    exiting the ``with`` block calls :meth:`shutdown`.
    """

    def __init__(self, max_workers: int | None = None) -> None:
        self._inner = ThreadPoolExecutor(max_workers=max_workers)
        self._queue: PriorityQueue = PriorityQueue()
        # Monotonic tie-breaker: keeps FIFO order among equal priorities and
        # guarantees the queue never compares the (unorderable) payload tuples.
        self._counter = itertools.count().__next__
        self._shutdown = False
        self._dispatcher = threading.Thread(target=self._dispatch, daemon=True)
        self._dispatcher.start()

    def _dispatch(self) -> None:
        """Dispatcher loop: move queued tasks into the inner pool until the sentinel."""
        while True:
            proxy = None
            try:
                _priority, _count, item = self._queue.get()
                if item is _SENTINEL:
                    break
                fn, args, kwargs, proxy = item
                # Honor the Future.cancel() contract: a proxy cancelled before
                # dispatch must never have its callable executed.  This also
                # marks the proxy RUNNING, so it can no longer be cancelled.
                if not proxy.set_running_or_notify_cancel():
                    continue
                inner_future = self._inner.submit(fn, *args, **kwargs)
                inner_future.add_done_callback(lambda f, p=proxy: _chain_future(f, p))
            except Exception as exc:
                # e.g. the inner pool refused the submission; surface the error
                # on the proxy instead of killing the dispatcher thread.
                if proxy is not None and not proxy.done():
                    try:
                        proxy.set_exception(exc)
                    except Exception:
                        pass

    def submit(self, fn, /, *args, priority: int = 0, **kwargs) -> Future:
        """Schedule ``fn(*args, **kwargs)``; higher ``priority`` runs first.

        Returns a Future that resolves to the call's outcome.
        Raises RuntimeError if the executor has been shut down.
        """
        if self._shutdown:
            raise RuntimeError("cannot submit after shutdown")
        proxy = Future()
        # Negate priority because PriorityQueue pops the *smallest* tuple first.
        self._queue.put((-priority, self._counter(), (fn, args, kwargs, proxy)))
        return proxy

    def shutdown(self, wait: bool = True) -> None:
        """Stop accepting work, drain already-queued tasks, shut the inner pool down.

        Raises RuntimeError if the dispatcher fails to exit within 30 seconds.
        """
        self._shutdown = True
        # +inf sorts after every real task's -priority, so queued work drains first.
        self._queue.put((float("inf"), self._counter(), _SENTINEL))
        self._dispatcher.join(timeout=30)
        if self._dispatcher.is_alive():
            raise RuntimeError("PriorityThreadPoolExecutor dispatcher did not shut down within 30s")
        self._inner.shutdown(wait=wait)

    def __enter__(self) -> "PriorityThreadPoolExecutor":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.shutdown(wait=True)
@attrs.define(frozen=False, init=False)
class ThreadedYielder(Iterable):
    """Yields results from multiple threads into a single iterator, ordered by priority.

    To add a source iterator, call ``submit()`` with a function that returns an iterator.
    Priority for the iterator can be provided via the keyword argument 'priority'. (higher runs first)
    """

    # Priority-ordered pool that runs the submitted producer functions.
    _pool: PriorityThreadPoolExecutor
    # Futures for submitted work, in submission order; drained by __iter__.
    _futures: deque
    # Buffer of produced items awaiting the consumer; deque append/popleft
    # are safe across threads.
    _yield: deque = attrs.field(alias="_yield")  # aliased: "yield" is a Python keyword
    # Exception captured from a worker (the most recent one wins);
    # re-raised to the consumer in __iter__.
    _exception: Exception | None = None
    # When True, a producer's whole result is yielded as one list item
    # instead of being flattened item-by-item.
    yield_list: bool = False

    def __init__(self, max_workers: int | None = None, yield_list: bool = False) -> None:
        """Create the yielder with its own priority thread pool.

        max_workers: forwarded to the underlying PriorityThreadPoolExecutor.
        yield_list: if True, yield each producer's result as a single item.
        """
        super().__init__()
        self._pool = PriorityThreadPoolExecutor(max_workers)
        self._futures = deque()
        self._yield = deque()
        self._exception = None
        self.yield_list = yield_list

    def _worker(self, fn, *args, **kwargs) -> None:
        """Run one producer on a pool thread; buffer its items or capture its exception."""
        try:
            res = fn(*args, **kwargs)
            if res is not None:
                if self.yield_list:
                    # Hand the whole result to the consumer as one item.
                    self._yield.append(res)
                else:
                    # Flatten: extend the buffer with each produced item.
                    self._yield += res
        except Exception as e:
            # Stash the failure; __iter__ re-raises it on the consumer thread.
            self._exception = e

    def submit(self, fn: Callable, *args, priority: int = 0, **kwargs) -> None:
        """Schedule ``fn(*args, **kwargs)`` on the pool; higher priority runs first."""
        self._futures.append(self._pool.submit(self._worker, fn, *args, priority=priority, **kwargs))

    def __iter__(self) -> Iterator[Any]:
        """Yield buffered items until every submitted producer has finished.

        Re-raises any exception captured by a worker.  Polls with a short
        sleep while the oldest outstanding future is still running.
        """
        while True:
            if self._exception:
                raise self._exception
            while self._yield:
                yield self._yield.popleft()
            if not self._futures:
                # No more tasks
                return
            if self._futures[0].done():
                self._futures.popleft()
            else:
                # Oldest task still running; back off briefly before re-checking.
                sleep(0.001)