Skip to content

Commit ce2b550

Browse files
committed
feat: use protocols and cleanup task loop
1 parent 0b46ca8 commit ce2b550

10 files changed

Lines changed: 185 additions & 159 deletions

File tree

src/duron/context.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
from __future__ import annotations
22

33
import asyncio
4-
from typing import TYPE_CHECKING, Any, TypeVar, cast
4+
from typing import TYPE_CHECKING, TypeVar, cast
5+
6+
from typing_extensions import overload
57

68
from duron.event_loop import EventLoop
79
from duron.ops import FnCall
810

911
if TYPE_CHECKING:
10-
from collections.abc import Callable, Coroutine
12+
from collections.abc import Awaitable, Callable
1113

1214
_T = TypeVar("_T")
1315

@@ -19,7 +21,11 @@ def __init__(self):
1921
self._loop: EventLoop = loop
2022
pass
2123

22-
async def run(self, fn: Callable[..., Coroutine[Any, Any, _T] | _T]) -> _T:
24+
@overload
25+
async def run(self, fn: Callable[[], Awaitable[_T]]) -> _T: ...
26+
@overload
27+
async def run(self, fn: Callable[[], _T]) -> _T: ...
28+
async def run(self, fn: Callable[[], Awaitable[_T] | _T]) -> _T:
2329
return cast("_T", await self._loop.create_op(FnCall(callable=fn)))
2430

2531

src/duron/log/codec.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import pickle
66
from typing import TYPE_CHECKING, cast, final
77

8-
from typing_extensions import Protocol
8+
from typing_extensions import Protocol, override
99

1010
BaseModel: type[_pydantic.BaseModel] | None = None
1111
try:
@@ -18,10 +18,10 @@
1818
if TYPE_CHECKING:
1919
from duron.log.entry import JSONValue
2020

21-
__all__ = ["BaseCodec", "default_codec"]
21+
__all__ = ["Codec", "DEFAULT_CODEC"]
2222

2323

24-
class BaseCodec(Protocol):
24+
class Codec(Protocol):
2525
def encode_json(self, result: object) -> JSONValue: ...
2626
def decode_json(self, encoded: JSONValue) -> object: ...
2727

@@ -30,7 +30,7 @@ def decode_state(self, state: str) -> object: ...
3030

3131

3232
@final
33-
class _DefaultCodec:
33+
class _DefaultCodec(Codec):
3434
def __init__(self) -> None:
3535
self._type_cache: dict[str, type] = {}
3636

@@ -49,6 +49,7 @@ def _lookup_model(self, qual: str) -> type:
4949
else:
5050
raise TypeError(f"Imported object is not a type: {obj!r}")
5151

52+
@override
5253
def encode_json(self, result: object) -> JSONValue:
5354
if BaseModel and isinstance(result, BaseModel):
5455
cls = result.__class__
@@ -57,6 +58,7 @@ def encode_json(self, result: object) -> JSONValue:
5758
return model
5859
return cast("JSONValue", result)
5960

61+
@override
6062
def decode_json(self, encoded: JSONValue) -> object:
6163
if isinstance(encoded, dict) and "_duron.pydantic" in encoded:
6264
model = self._lookup_model(cast("str", encoded["_duron.pydantic"]))
@@ -67,11 +69,13 @@ def decode_json(self, encoded: JSONValue) -> object:
6769
)
6870
return encoded
6971

72+
@override
7073
def encode_state(self, obj: object) -> str:
7174
return base64.b64encode(pickle.dumps(obj, protocol=5)).decode("ascii")
7275

76+
@override
7377
def decode_state(self, state: str) -> object:
7478
return pickle.loads(base64.b64decode(state.encode()))
7579

7680

77-
default_codec = _DefaultCodec()
81+
DEFAULT_CODEC = _DefaultCodec()

src/duron/log/entry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class StreamCloseEntry(BaseEntry):
4646
error: NotRequired[ErrorInfo]
4747

4848

49-
class UnknownEntry(BaseEntry):
49+
class AnyEntry(BaseEntry):
5050
type: str
5151

5252

@@ -59,7 +59,7 @@ class UnknownEntry(BaseEntry):
5959
)
6060

6161

62-
def is_entry(entry: Entry | UnknownEntry) -> TypeGuard[Entry]:
62+
def is_entry(entry: Entry | AnyEntry) -> TypeGuard[Entry]:
6363
return entry["type"] in {
6464
"promise/create",
6565
"promise/complete",

src/duron/log/storage/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
from .base import BaseLogStorage, Lease, Offset
1+
from .base import LogStorage
22

3-
__all__ = ["BaseLogStorage", "Offset", "Lease"]
3+
__all__ = ["LogStorage"]

src/duron/log/storage/base.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,30 @@
11
from __future__ import annotations
22

3-
from abc import ABC, abstractmethod
4-
from typing import TYPE_CHECKING, NewType
3+
from typing import TYPE_CHECKING, Generic, Protocol
4+
5+
from typing_extensions import TypeVar
56

67
if TYPE_CHECKING:
78
from collections.abc import AsyncGenerator
89

9-
from duron.log.entry import UnknownEntry
10+
from duron.log.entry import AnyEntry
1011

1112
from ..entry import Entry
1213

13-
Offset = NewType("Offset", bytes)
14-
Lease = NewType("Lease", bytes)
14+
15+
__all__ = ["LogStorage"]
16+
17+
_TOffset = TypeVar("_TOffset")
18+
_TLease = TypeVar("_TLease")
1519

1620

17-
class BaseLogStorage(ABC):
18-
@abstractmethod
21+
class LogStorage(Protocol, Generic[_TOffset, _TLease]):
1922
def stream(
20-
self, start: Offset | None, live: bool
21-
) -> AsyncGenerator[tuple[Offset, Entry | UnknownEntry], None]: ...
23+
self, start: _TOffset | None, live: bool
24+
) -> AsyncGenerator[tuple[_TOffset, AnyEntry], None]: ...
2225

23-
@abstractmethod
24-
async def acquire_lease(self) -> Lease: ...
26+
async def acquire_lease(self) -> _TLease: ...
2527

26-
@abstractmethod
27-
async def release_lease(self, token: Lease): ...
28+
async def release_lease(self, token: _TLease): ...
2829

29-
@abstractmethod
30-
async def append(self, token: Lease, entry: Entry) -> Offset: ...
30+
async def append(self, token: _TLease, entry: Entry) -> _TOffset: ...

src/duron/log/storage/simple.py

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,19 @@
88

99
from typing_extensions import override
1010

11-
from .base import BaseLogStorage, Lease, Offset
11+
from .base import LogStorage
1212

1313
if TYPE_CHECKING:
1414
from collections.abc import AsyncGenerator
1515

16-
from duron.log.entry import UnknownEntry
16+
from duron.log.entry import AnyEntry
1717

1818
from ..entry import Entry
1919

2020

21-
class FileLogStorage(BaseLogStorage):
21+
class FileLogStorage(LogStorage[int, str]):
2222
_log_file: Path
23-
_leases: Lease | None
23+
_leases: str | None
2424
_lock: asyncio.Lock
2525

2626
def __init__(self, log_file: str | Path):
@@ -31,12 +31,12 @@ def __init__(self, log_file: str | Path):
3131

3232
@override
3333
async def stream(
34-
self, start: Offset | None, live: bool
35-
) -> AsyncGenerator[tuple[Offset, Entry | UnknownEntry], None]:
34+
self, start: int | None, live: bool
35+
) -> AsyncGenerator[tuple[int, AnyEntry], None]:
3636
if not self._log_file.exists():
3737
return
3838

39-
start_offset: int = int.from_bytes(start, "little") if start is not None else 0
39+
start_offset: int = start if start is not None else 0
4040

4141
with open(self._log_file, "rb") as f:
4242
# Seek to start offset
@@ -51,8 +51,8 @@ async def stream(
5151
entry = json.loads(line.decode().strip())
5252
if isinstance(entry, dict):
5353
yield (
54-
Offset(line_start_offset.to_bytes(8, "little")),
55-
cast("UnknownEntry", cast("object", entry)),
54+
line_start_offset,
55+
cast("AnyEntry", cast("object", entry)),
5656
)
5757
except (json.JSONDecodeError, UnicodeDecodeError):
5858
pass
@@ -70,29 +70,29 @@ async def stream(
7070
entry = json.loads(line.decode().strip())
7171
if isinstance(entry, dict):
7272
yield (
73-
Offset(line_start_offset.to_bytes(8, "little")),
74-
cast("UnknownEntry", cast("object", entry)),
73+
line_start_offset,
74+
cast("AnyEntry", cast("object", entry)),
7575
)
7676
except (json.JSONDecodeError, UnicodeDecodeError):
7777
pass
7878
else:
7979
await asyncio.sleep(0.1)
8080

8181
@override
82-
async def acquire_lease(self) -> Lease:
83-
lease_id = Lease(uuid.uuid4().bytes)
82+
async def acquire_lease(self) -> str:
83+
lease_id = str(uuid.uuid4())
8484
async with self._lock:
8585
self._leases = lease_id
8686
return lease_id
8787

8888
@override
89-
async def release_lease(self, token: Lease) -> None:
89+
async def release_lease(self, token: str) -> None:
9090
async with self._lock:
9191
if token == self._leases:
9292
self._leases = None
9393

9494
@override
95-
async def append(self, token: Lease, entry: Entry) -> Offset:
95+
async def append(self, token: str, entry: Entry) -> int:
9696
async with self._lock:
9797
if token != self._leases:
9898
raise ValueError("Invalid lease token")
@@ -102,34 +102,34 @@ async def append(self, token: Lease, entry: Entry) -> Offset:
102102
json.dump(entry, f, separators=(",", ":"))
103103
_ = f.write("\n")
104104

105-
return Offset(entry_offset.to_bytes(8, "little"))
105+
return entry_offset
106106

107107

108-
class MemoryLogStorage(BaseLogStorage):
109-
_entries: list[Entry]
110-
_leases: Lease | None
108+
class MemoryLogStorage(LogStorage[int, str]):
109+
_entries: list[AnyEntry]
110+
_leases: str | None
111111
_lock: asyncio.Lock
112112
_condition: asyncio.Condition
113113

114-
def __init__(self, entries: list[Entry] | None = None):
114+
def __init__(self, entries: list[AnyEntry] | None = None):
115115
self._entries = entries or []
116116
self._leases = None
117117
self._lock = asyncio.Lock()
118118
self._condition = asyncio.Condition(self._lock)
119119

120120
@override
121121
async def stream(
122-
self, start: Offset | None, live: bool
123-
) -> AsyncGenerator[tuple[Offset, Entry | UnknownEntry], None]:
124-
start_index: int = int.from_bytes(start, "little") if start is not None else 0
122+
self, start: int | None, live: bool
123+
) -> AsyncGenerator[tuple[int, AnyEntry], None]:
124+
start_index: int = start if start is not None else 0
125125

126126
# Yield existing entries
127127
async with self._lock:
128128
entries_snapshot = self._entries.copy()
129129

130130
for index in range(start_index, len(entries_snapshot)):
131131
yield (
132-
Offset(index.to_bytes(8, "little")),
132+
index,
133133
entries_snapshot[index],
134134
)
135135

@@ -146,36 +146,36 @@ async def stream(
146146

147147
for index in range(last_seen_index + 1, current_length):
148148
yield (
149-
Offset(index.to_bytes(8, "little")),
149+
index,
150150
self._entries[index],
151151
)
152152
last_seen_index = index
153153

154154
@override
155-
async def acquire_lease(self) -> Lease:
156-
lease_id = Lease(uuid.uuid4().bytes)
155+
async def acquire_lease(self) -> str:
156+
lease_id = str(uuid.uuid4())
157157
async with self._lock:
158158
self._leases = lease_id
159159
return lease_id
160160

161161
@override
162-
async def release_lease(self, token: Lease) -> None:
162+
async def release_lease(self, token: str) -> None:
163163
async with self._lock:
164164
if token == self._leases:
165165
self._leases = None
166166

167167
@override
168-
async def append(self, token: Lease, entry: Entry) -> Offset:
168+
async def append(self, token: str, entry: Entry) -> int:
169169
async with self._condition:
170170
if token != self._leases:
171171
raise ValueError("Invalid lease token")
172172

173173
index = len(self._entries)
174-
self._entries.append(entry)
174+
self._entries.append(cast("AnyEntry", cast("object", entry)))
175175
self._condition.notify_all()
176176

177-
return Offset(index.to_bytes(8, "little"))
177+
return index
178178

179-
async def entries(self) -> list[Entry]:
179+
async def entries(self) -> list[AnyEntry]:
180180
async with self._lock:
181181
return self._entries.copy()

src/duron/ops.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@
44
from typing import TYPE_CHECKING, Any
55

66
if TYPE_CHECKING:
7-
from collections.abc import Callable, Coroutine
7+
from collections.abc import Awaitable, Callable, Coroutine
88

99

1010
@dataclass(slots=True)
1111
class FnCall:
12-
callable: Callable[..., Coroutine[Any, Any, object] | object]
12+
callable: Callable[[], Awaitable[object] | object]
1313

1414

15-
Op = FnCall
15+
@dataclass(slots=True)
16+
class TaskRun:
17+
task: Coroutine[Any, Any, object]
18+
19+
20+
Op = FnCall | TaskRun

0 commit comments

Comments
 (0)