Skip to content

Commit 892389a

Browse files
committed
feat: simplify types
1 parent 4d8277e commit 892389a

6 files changed

Lines changed: 45 additions & 44 deletions

File tree

src/duron/_core/invoke.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def __init__(
9090
self._watchers: list[
9191
tuple[
9292
dict[str, str],
93-
StreamObserver[object],
93+
StreamObserver,
9494
]
9595
] = []
9696

@@ -207,7 +207,7 @@ def watch_stream(
207207
observer: ObserverStream[_T_co, None] = ObserverStream()
208208
self._watchers.append((
209209
labels,
210-
cast("StreamObserver[object]", cast("StreamObserver[_T_co]", observer)),
210+
cast("ObserverStream[object, None]", observer),
211211
))
212212
return observer
213213

@@ -334,7 +334,7 @@ def __init__(
334334
log: LogStorage,
335335
codec: Codec,
336336
*,
337-
watchers: list[tuple[dict[str, str], StreamObserver[object]]] | None = None,
337+
watchers: list[tuple[dict[str, str], StreamObserver]] | None = None,
338338
) -> None:
339339
self._loop = create_loop(asyncio.get_running_loop())
340340
self._task = self._loop.create_task(task)
@@ -518,7 +518,7 @@ async def enqueue_log(
518518
offset = await self._log.append(self._lease, entry)
519519
await self.handle_message(offset, entry)
520520

521-
async def enqueue_op(self, id_: str, fut: OpFuture[object]) -> None:
521+
async def enqueue_op(self, id_: str, fut: OpFuture) -> None:
522522
op = cast("Op", fut.params)
523523
match op:
524524
case FnCall():
@@ -566,7 +566,7 @@ async def cb() -> None:
566566
entry["ts"] = self.now()
567567
await self.enqueue_log(entry)
568568

569-
def done(f: OpFuture[object]) -> None:
569+
def done(f: OpFuture) -> None:
570570
if f.cancelled():
571571
sid = f.id
572572
if self._task_manager.has_pending(sid):

src/duron/_core/ops.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,14 @@
22

33
import asyncio
44
from dataclasses import dataclass
5-
from typing import TYPE_CHECKING, Any, Generic, Protocol, TypeVar
6-
from typing_extensions import dataclass_transform, overload
5+
from typing import TYPE_CHECKING, Final, cast
6+
from typing_extensions import (
7+
Any,
8+
Protocol,
9+
TypeVar,
10+
dataclass_transform,
11+
overload,
12+
)
713

814
if TYPE_CHECKING:
915
from collections.abc import Callable, Coroutine, Mapping
@@ -20,9 +26,7 @@ def frozen(_cls: type[_T]) -> type[_T]: ...
2026
else:
2127
frozen = dataclass(slots=True)
2228

23-
_In_contra = TypeVar("_In_contra", contravariant=True)
24-
25-
_EMPTY_DICT: dict[str, Any] = {}
29+
_EMPTY_DICT: Final = cast("dict[str, Any]", {})
2630

2731

2832
@frozen
@@ -36,9 +40,7 @@ def labels(self) -> Mapping[str, str]:
3640

3741
@property
3842
def name(self) -> str:
39-
if (name := self._name) is not None:
40-
return name
41-
return "<unnamed>"
43+
return name if (name := self._name) is not None else "<unnamed>"
4244

4345
@staticmethod
4446
def extend(
@@ -70,14 +72,14 @@ class FnCall:
7072
annotations: OpAnnotations
7173

7274

73-
class StreamObserver(Protocol, Generic[_In_contra]):
74-
def on_next(self, log_offset: int, value: _In_contra, /) -> None: ...
75+
class StreamObserver(Protocol):
76+
def on_next(self, log_offset: int, value: object, /) -> None: ...
7577
def on_close(self, log_offset: int, error: Exception | None, /) -> None: ...
7678

7779

7880
@frozen
7981
class StreamCreate:
80-
observer: StreamObserver[Any] | None
82+
observer: StreamObserver | None
8183
dtype: TypeHint[Any]
8284
annotations: OpAnnotations
8385

@@ -123,18 +125,20 @@ class ExternalPromiseComplete:
123125

124126

125127
@overload
126-
def create_op(loop: EventLoop, params: FnCall) -> OpFuture[object]: ...
128+
def create_op(loop: EventLoop, params: FnCall) -> asyncio.Future[object]: ...
127129
@overload
128-
def create_op(loop: EventLoop, params: StreamCreate) -> OpFuture[str]: ...
130+
def create_op(loop: EventLoop, params: StreamCreate) -> asyncio.Future[str]: ...
129131
@overload
130-
def create_op(loop: EventLoop, params: StreamEmit) -> OpFuture[None]: ...
132+
def create_op(loop: EventLoop, params: StreamEmit) -> asyncio.Future[None]: ...
131133
@overload
132-
def create_op(loop: EventLoop, params: StreamClose) -> OpFuture[None]: ...
134+
def create_op(loop: EventLoop, params: StreamClose) -> asyncio.Future[None]: ...
133135
@overload
134-
def create_op(loop: EventLoop, params: Barrier) -> OpFuture[int]: ...
136+
def create_op(loop: EventLoop, params: Barrier) -> asyncio.Future[int]: ...
135137
@overload
136-
def create_op(loop: EventLoop, params: ExternalPromiseCreate) -> OpFuture[object]: ...
138+
def create_op(loop: EventLoop, params: ExternalPromiseCreate) -> OpFuture: ...
137139
@overload
138-
def create_op(loop: EventLoop, params: ExternalPromiseComplete) -> OpFuture[None]: ...
139-
def create_op(loop: EventLoop, params: Op) -> OpFuture[Any]:
140+
def create_op(
141+
loop: EventLoop, params: ExternalPromiseComplete
142+
) -> asyncio.Future[None]: ...
143+
def create_op(loop: EventLoop, params: Op) -> asyncio.Future[Any]:
140144
return loop.create_op(params, external=asyncio.get_running_loop() is not loop)

src/duron/_core/signal.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import sys
55
from asyncio.exceptions import CancelledError
66
from collections import deque
7-
from typing import TYPE_CHECKING, Generic
7+
from typing import TYPE_CHECKING, Generic, cast
88
from typing_extensions import Any, TypeVar, final
99

1010
from duron._core.ops import (
@@ -131,7 +131,7 @@ async def create_signal(
131131
loop,
132132
StreamCreate(
133133
dtype=dtype,
134-
observer=s,
134+
observer=cast("Signal[object]", s),
135135
annotations=annotations,
136136
),
137137
)

src/duron/_core/stream.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from types import TracebackType
3535

3636
from duron._core.context import Context
37-
from duron._loop import EventLoop, OpFuture
37+
from duron._loop import EventLoop
3838
from duron.typing import TypeHint
3939

4040
_P = ParamSpec("_P")
@@ -192,7 +192,7 @@ async def create_stream(
192192
loop,
193193
StreamCreate(
194194
dtype=dtype,
195-
observer=s,
195+
observer=cast("ObserverStream[object, None]", s),
196196
annotations=annotations,
197197
),
198198
)
@@ -396,7 +396,7 @@ async def __aenter__(self) -> _StreamRun[_U, _T]:
396396
self._loop,
397397
StreamCreate(
398398
dtype=self._dtype,
399-
observer=self._stream,
399+
observer=cast("_StreamRun[object, _T]", self._stream),
400400
annotations=OpAnnotations.extend(
401401
None,
402402
name=self._stream.name(),
@@ -457,7 +457,7 @@ def start_worker(self, sink: StreamWriter[_U]) -> asyncio.Future[object]:
457457
annotations=OpAnnotations.extend(None, name=self.name()),
458458
),
459459
)
460-
self._task = cast("OpFuture[_T]", op)
460+
self._task = cast("asyncio.Future[_T]", op)
461461
return op
462462

463463
async def _worker(self, sink: StreamWriter[_U]) -> _T:

src/duron/_core/stream_manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
@dataclass(slots=True)
2323
class _StreamInfo:
24-
observers: Sequence[StreamObserver[object]]
24+
observers: Sequence[StreamObserver]
2525
dtype: TypeHint[Any]
2626
labels: Mapping[str, str]
2727
op_span: OpSpan | None
@@ -33,7 +33,7 @@ class StreamManager:
3333

3434
def __init__(
3535
self,
36-
watchers: list[tuple[dict[str, str], StreamObserver[object]]] | None = None,
36+
watchers: list[tuple[dict[str, str], StreamObserver]] | None = None,
3737
) -> None:
3838
self._streams: dict[
3939
str,
@@ -44,7 +44,7 @@ def __init__(
4444
def create_stream(
4545
self,
4646
stream_id: str,
47-
observer: StreamObserver[object] | None,
47+
observer: StreamObserver | None,
4848
dtype: TypeHint[Any],
4949
labels: Mapping[str, str],
5050
op_span: OpSpan | None,

src/duron/_loop.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from dataclasses import dataclass
1010
from hashlib import blake2b
1111
from heapq import heappop, heappush
12-
from typing import TYPE_CHECKING, Generic
12+
from typing import TYPE_CHECKING
1313
from typing_extensions import (
1414
Any,
1515
TypeVar,
@@ -21,14 +21,13 @@
2121

2222
from duron.log import derive_id, random_id
2323

24-
_T = TypeVar("_T")
25-
2624
if TYPE_CHECKING:
2725
import sys
2826
from asyncio.futures import Future
2927
from collections.abc import Callable, Coroutine, Generator
3028
from contextvars import Context
3129

30+
_T = TypeVar("_T")
3231
_Ts = TypeVarTuple("_Ts")
3332

3433
if sys.version_info >= (3, 12):
@@ -37,13 +36,11 @@
3736
_TaskCompatibleCoro = Generator[Any, None, _T] | Coroutine[Any, Any, _T]
3837

3938

40-
logger = logging.getLogger(__name__)
41-
42-
39+
logger = logging.getLogger("duron.loop")
4340
_task_ctx: contextvars.ContextVar[_TaskCtx] = contextvars.ContextVar("duron.task")
4441

4542

46-
class OpFuture(asyncio.Future[_T], Generic[_T]):
43+
class OpFuture(asyncio.Future[object]):
4744
__slots__: tuple[str, ...] = ("id", "params")
4845

4946
def __init__(
@@ -59,7 +56,7 @@ def __init__(
5956

6057
@dataclass(slots=True)
6158
class WaitSet:
62-
ops: list[OpFuture[object]]
59+
ops: list[OpFuture]
6360
timer: int | None
6461
event: asyncio.Event
6562

@@ -104,7 +101,7 @@ def __init__(self, host: asyncio.AbstractEventLoop) -> None:
104101
self._exc_handler: (
105102
Callable[[asyncio.AbstractEventLoop, dict[str, object]], object] | None
106103
) = None
107-
self._ops: dict[str, OpFuture[object]] = {}
104+
self._ops: dict[str, OpFuture] = {}
108105
self._ctx: _TaskCtx = _TaskCtx(parent_id="")
109106
self._key: bytes = b""
110107
self._now_us: int = 0
@@ -261,13 +258,13 @@ def poll_completion(self, task: Future[_T]) -> WaitSet | None:
261258
if prev_task:
262259
tasks._enter_task(self._host, prev_task) # noqa: SLF001
263260

264-
def create_op(self, params: object, *, external: bool = False) -> OpFuture[object]:
261+
def create_op(self, params: object, *, external: bool = False) -> OpFuture:
265262
if external:
266263
id_ = random_id()
267264
self._event.set()
268265
else:
269266
id_ = self.generate_op_id()
270-
op_fut: OpFuture[object] = OpFuture(id_, params, self)
267+
op_fut = OpFuture(id_, params, self)
271268
self._ops[id_] = op_fut
272269
return op_fut
273270

0 commit comments

Comments
 (0)