Skip to content

Commit 4287923

Browse files
committed
refactor: move modules
1 parent 268c410 commit 4287923

11 files changed

Lines changed: 134 additions & 143 deletions

File tree

src/duron/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from duron.context import get_context
2-
from duron.mark import durable
2+
from duron.fn import fn
33
from duron.task import task
44

5-
__all__ = ["durable", "task", "get_context"]
5+
__all__ = ["fn", "task", "get_context"]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
pass
1717

1818
if TYPE_CHECKING:
19-
from duron.log.entry import JSONValue
19+
from duron.log import JSONValue
2020

2121
__all__ = ["Codec", "DEFAULT_CODEC"]
2222

src/duron/mark.py renamed to src/duron/fn.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,38 +9,40 @@
99
cast,
1010
)
1111

12-
from duron.log.codec import DEFAULT_CODEC
12+
from duron.codec import DEFAULT_CODEC
1313

1414
if TYPE_CHECKING:
1515
from collections.abc import Callable
1616

17-
from duron.log.codec import Codec
17+
from duron.codec import Codec
1818

1919

2020
_T_co = TypeVar("_T_co", covariant=True)
2121
_P = ParamSpec("_P")
2222

2323

24-
@dataclass
25-
class DurableFnParams:
26-
codec: Codec
24+
class Fn(Protocol[_P, _T_co]):
25+
__duron__: FnOptions
2726

27+
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T_co: ...
2828

29-
class DurableFn(Protocol[_P, _T_co]):
30-
__durable__: DurableFnParams
3129

32-
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T_co: ...
30+
@dataclass(slots=True)
31+
class FnOptions:
32+
codec: Codec
3333

3434

35-
def durable() -> Callable[[Callable[_P, _T_co]], DurableFn[_P, _T_co]]:
35+
def fn(
36+
*, codec: Codec = DEFAULT_CODEC
37+
) -> Callable[[Callable[_P, _T_co]], Fn[_P, _T_co]]:
3638
"""
3739
Mark a function as durable, meaning its execution can be recorded and
3840
replayed.
3941
"""
4042

41-
def decorate(fn: Callable[_P, _T_co]) -> DurableFn[_P, _T_co]:
42-
d = cast("DurableFn[_P, _T_co]", fn)
43-
d.__durable__ = DurableFnParams(codec=DEFAULT_CODEC)
43+
def decorate(fn: Callable[_P, _T_co]) -> Fn[_P, _T_co]:
44+
d = cast("Fn[_P, _T_co]", fn)
45+
d.__duron__ = FnOptions(codec=codec)
4446
return d
4547

4648
return decorate

src/duron/log/__init__.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING, Generic, Protocol
4+
5+
from typing_extensions import TypedDict, TypeVar
6+
7+
if TYPE_CHECKING:
8+
from collections.abc import AsyncGenerator
9+
from typing import Literal, TypeGuard
10+
11+
from typing_extensions import NotRequired
12+
13+
14+
__all__ = ["LogStorage", "Entry", "AnyEntry", "is_entry"]
15+
16+
_TOffset = TypeVar("_TOffset")
17+
_TLease = TypeVar("_TLease")
18+
JSONValue = dict[str, "JSONValue"] | list["JSONValue"] | str | int | float | bool | None
19+
20+
21+
class _BaseEntry(TypedDict):
22+
id: str
23+
# Unix timestamp in microseconds
24+
ts: int
25+
meta: NotRequired[dict[str, str]]
26+
27+
28+
class ErrorInfo(TypedDict):
29+
code: int
30+
message: str
31+
state: NotRequired[str] # opaque
32+
33+
34+
class PromiseCreateEntry(_BaseEntry):
35+
type: Literal["promise/create"]
36+
37+
38+
class PromiseCompleteEntry(_BaseEntry):
39+
type: Literal["promise/complete"]
40+
promise_id: str
41+
result: NotRequired[JSONValue]
42+
error: NotRequired[ErrorInfo]
43+
44+
45+
class StreamCreateEntry(_BaseEntry):
46+
type: Literal["stream/create"]
47+
48+
49+
class StreamEmitEntry(_BaseEntry):
50+
type: Literal["stream/emit"]
51+
stream_id: str
52+
value: NotRequired[JSONValue]
53+
state: NotRequired[str] # opaque
54+
55+
56+
class StreamCompleteEntry(_BaseEntry):
57+
type: Literal["stream/complete"]
58+
stream_id: str
59+
error: NotRequired[ErrorInfo]
60+
61+
62+
class AnyEntry(_BaseEntry):
63+
type: str
64+
65+
66+
Entry = (
67+
PromiseCreateEntry
68+
| PromiseCompleteEntry
69+
| StreamCreateEntry
70+
| StreamEmitEntry
71+
| StreamCompleteEntry
72+
)
73+
74+
75+
def is_entry(entry: Entry | AnyEntry) -> TypeGuard[Entry]:
76+
return entry["type"] in {
77+
"promise/create",
78+
"promise/complete",
79+
"stream/create",
80+
"stream/emit",
81+
"stream/complete",
82+
}
83+
84+
85+
class LogStorage(Protocol, Generic[_TOffset, _TLease]):
86+
def stream(
87+
self, start: _TOffset | None, live: bool
88+
) -> AsyncGenerator[tuple[_TOffset, AnyEntry], None]: ...
89+
90+
async def acquire_lease(self) -> _TLease: ...
91+
92+
async def release_lease(self, token: _TLease): ...
93+
94+
async def append(self, token: _TLease, entry: Entry): ...
95+
96+
async def flush(self, token: _TLease): ...

src/duron/log/entry.py

Lines changed: 0 additions & 69 deletions
This file was deleted.
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,12 @@
88

99
from typing_extensions import override
1010

11-
from .base import LogStorage
11+
from . import LogStorage
1212

1313
if TYPE_CHECKING:
1414
from collections.abc import AsyncGenerator
1515

16-
from duron.log.entry import AnyEntry
17-
18-
from ..entry import Entry
16+
from . import AnyEntry, Entry
1917

2018

2119
class FileLogStorage(LogStorage[int, str]):

src/duron/log/storage/__init__.py

Lines changed: 0 additions & 3 deletions
This file was deleted.

src/duron/log/storage/base.py

Lines changed: 0 additions & 32 deletions
This file was deleted.

src/duron/task.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,23 @@
1919

2020
from duron.context import get_context
2121
from duron.event_loop import create_loop
22-
from duron.log.entry import is_entry
22+
from duron.log import is_entry
2323
from duron.ops import FnCall, TaskRun
2424

2525
if TYPE_CHECKING:
2626
from collections.abc import Callable, Coroutine
2727
from types import TracebackType
2828

29+
from duron.codec import Codec
2930
from duron.event_loop import WaitSet
30-
from duron.log.codec import Codec
31-
from duron.log.entry import (
31+
from duron.fn import Fn
32+
from duron.log import (
3233
Entry,
3334
ErrorInfo,
3435
JSONValue,
36+
LogStorage,
3537
PromiseCompleteEntry,
3638
)
37-
from duron.log.storage import LogStorage
38-
from duron.mark import DurableFn
3939
from duron.ops import Op
4040

4141
_TOffset = TypeVar("_TOffset")
@@ -48,7 +48,7 @@
4848

4949

5050
def task(
51-
task_co: DurableFn[_P, Coroutine[Any, Any, _T]],
51+
task_co: Fn[_P, Coroutine[Any, Any, _T]],
5252
log: LogStorage[_TOffset, _TLease],
5353
) -> TaskGuard[_P, _T]:
5454
return TaskGuard(Task(task_co, log))
@@ -74,7 +74,7 @@ async def __aexit__(
7474
class Task(Generic[_P, _T]):
7575
def __init__(
7676
self,
77-
task_fn: DurableFn[_P, Coroutine[Any, Any, _T]],
77+
task_fn: Fn[_P, Coroutine[Any, Any, _T]],
7878
log: LogStorage[_TOffset, _TLease],
7979
codec: Codec | None = None,
8080
) -> None:
@@ -83,7 +83,7 @@ def __init__(
8383
self._run: _TaskRun | None = None
8484

8585
async def start(self, *args: _P.args, **kwargs: _P.kwargs) -> None:
86-
codec = self._task_fn.__durable__.codec
86+
codec = self._task_fn.__duron__.codec
8787
init: TaskInitParams = {
8888
"version": _CURRENT_VERSION,
8989
"args": [codec.encode_json(arg) for arg in args],
@@ -104,7 +104,7 @@ def cb() -> TaskInitParams:
104104
self._run = _TaskRun(
105105
task,
106106
cast("LogStorage[object, object]", self._log),
107-
self._task_fn.__durable__.codec,
107+
self._task_fn.__duron__.codec,
108108
)
109109
await self._run.resume()
110110

@@ -121,14 +121,14 @@ class TaskInitParams(TypedDict):
121121

122122

123123
async def _task_prelude(
124-
task_fn: DurableFn[..., Coroutine[Any, Any, object]],
124+
task_fn: Fn[..., Coroutine[Any, Any, object]],
125125
init: Callable[[], TaskInitParams],
126126
) -> object:
127127
ctx = get_context()
128128
init_params = await ctx.run(init)
129129
if init_params["version"] != _CURRENT_VERSION:
130130
raise Exception("version mismatch")
131-
codec = task_fn.__durable__.codec
131+
codec = task_fn.__duron__.codec
132132
args = (codec.decode_json(arg) for arg in init_params["args"])
133133
kwargs = {k: codec.decode_json(v) for k, v in init_params["kwargs"].items()}
134134
return await task_fn(*args, **kwargs)

tests/test_log.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@
88
import pytest
99
from typing_extensions import TypeVar
1010

11-
from duron.log.storage.simple import FileLogStorage, MemoryLogStorage
11+
from duron.log.storage import FileLogStorage, MemoryLogStorage
1212

1313
if TYPE_CHECKING:
14-
from duron.log.entry import Entry
15-
from duron.log.storage import LogStorage
14+
from duron.log import Entry, LogStorage
1615

1716
_TOffset = TypeVar("_TOffset")
1817
_TLease = TypeVar("_TLease")

0 commit comments

Comments
 (0)