Skip to content

Commit d91b5f5

Browse files
committed
feat: initial log
1 parent 9ea72aa commit d91b5f5

7 files changed

Lines changed: 203 additions & 129 deletions

File tree

pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ description = ""
55
readme = "README.md"
66
requires-python = ">=3.10"
77
dependencies = [
8-
"pydantic>=2.11.7",
98
"typing-extensions>=4.15.0",
109
]
1110

src/duron/log/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from .entry import Entry
2+
from .storage import BaseLogStorage, FileLogStorage, Lease, Offset
3+
4+
__all__ = ["Entry", "BaseLogStorage", "Offset", "Lease", "FileLogStorage"]

src/duron/log/entry.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from typing import Literal
2+
3+
from typing_extensions import TypedDict
4+
5+
6+
class BaseEntry(TypedDict):
7+
id: str
8+
timestamp: int
9+
metadata: dict[str, str]
10+
11+
12+
class ErrorInfo(TypedDict):
13+
code: int
14+
message: str
15+
data: object
16+
17+
18+
class PromiseCreateEntry(BaseEntry):
19+
type: Literal["promise/create"]
20+
21+
22+
class PromiseCompleteEntry(BaseEntry):
23+
type: Literal["promise/complete"]
24+
promise_id: str
25+
result: object
26+
error: ErrorInfo | None
27+
28+
29+
class StreamCreateEntry(BaseEntry):
30+
type: Literal["stream/create"]
31+
32+
33+
class StreamEmitEntry(BaseEntry):
34+
type: Literal["stream/emit"]
35+
stream_id: str
36+
value: object
37+
state: object
38+
39+
40+
class StreamCloseEntry(BaseEntry):
41+
type: Literal["stream/close"]
42+
stream_id: str
43+
error: ErrorInfo | None
44+
45+
46+
class UnknownEntry(BaseEntry):
47+
type: str
48+
49+
50+
Entry = (
51+
PromiseCreateEntry
52+
| PromiseCompleteEntry
53+
| StreamCreateEntry
54+
| StreamEmitEntry
55+
| StreamCloseEntry
56+
| UnknownEntry
57+
)

src/duron/log/storage/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from .base import BaseLogStorage, Lease, Offset
2+
from .file import FileLogStorage
3+
4+
__all__ = ["BaseLogStorage", "Offset", "Lease", "FileLogStorage"]

src/duron/log/storage/base.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from __future__ import annotations
2+
3+
from abc import ABC, abstractmethod
4+
from typing import TYPE_CHECKING, NewType
5+
6+
if TYPE_CHECKING:
7+
from collections.abc import AsyncGenerator
8+
9+
from ..entry import Entry
10+
11+
Offset = NewType("Offset", bytes)
12+
Lease = NewType("Lease", bytes)
13+
14+
15+
class BaseLogStorage(ABC):
16+
@abstractmethod
17+
def stream(
18+
self, start: Offset | None, live: bool
19+
) -> AsyncGenerator[tuple[Offset, Entry], None]: ...
20+
21+
@abstractmethod
22+
async def acquire_lease(self) -> Lease: ...
23+
24+
@abstractmethod
25+
async def release_lease(self, token: Lease): ...
26+
27+
@abstractmethod
28+
async def append(self, token: Lease, entry: Entry) -> Offset: ...

src/duron/log/storage/file.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import json
5+
import threading
6+
import uuid
7+
from pathlib import Path
8+
from typing import TYPE_CHECKING, cast
9+
10+
from typing_extensions import override
11+
12+
from .base import BaseLogStorage, Lease, Offset
13+
14+
if TYPE_CHECKING:
15+
from collections.abc import AsyncGenerator
16+
17+
from ..entry import Entry
18+
19+
20+
class FileLogStorage(BaseLogStorage):
21+
log_file: Path
22+
_leases: Lease | None
23+
_lock: threading.Lock
24+
25+
def __init__(self, log_file: str | Path):
26+
self.log_file = Path(log_file)
27+
self.log_file.parent.mkdir(parents=True, exist_ok=True)
28+
self._leases = None
29+
self._lock = threading.Lock()
30+
31+
@override
32+
async def stream(
33+
self, start: Offset | None, live: bool
34+
) -> AsyncGenerator[tuple[Offset, Entry], None]:
35+
if not self.log_file.exists():
36+
return
37+
38+
start_offset: int = int.from_bytes(start, "little") if start is not None else 0
39+
current_line = 0
40+
41+
with open(self.log_file) as f:
42+
# Skip to start offset
43+
for _ in range(start_offset):
44+
_ = f.readline()
45+
current_line += 1
46+
47+
# Read existing lines from start offset
48+
while True:
49+
line = f.readline()
50+
if line:
51+
try:
52+
entry = json.loads(line.strip())
53+
if isinstance(entry, dict):
54+
yield (
55+
Offset(current_line.to_bytes(8, "little")),
56+
cast("Entry", cast("object", entry)),
57+
)
58+
except json.JSONDecodeError:
59+
pass
60+
current_line += 1
61+
else:
62+
# Reached end of file
63+
break
64+
65+
# If live mode, continue tailing
66+
if live:
67+
while True:
68+
line = f.readline()
69+
if line:
70+
try:
71+
entry = json.loads(line.strip())
72+
if isinstance(entry, dict):
73+
yield (
74+
Offset(current_line.to_bytes(8, "little")),
75+
cast("Entry", cast("object", entry)),
76+
)
77+
except json.JSONDecodeError:
78+
pass
79+
current_line += 1
80+
else:
81+
await asyncio.sleep(0.1)
82+
83+
@override
84+
async def acquire_lease(self) -> Lease:
85+
lease_id = Lease(uuid.uuid4().bytes)
86+
with self._lock:
87+
self._leases = lease_id
88+
return lease_id
89+
90+
@override
91+
async def release_lease(self, token: Lease) -> None:
92+
with self._lock:
93+
if token == self._leases:
94+
self._leases = None
95+
96+
@override
97+
async def append(self, token: Lease, entry: Entry) -> Offset:
98+
with self._lock:
99+
if token != self._leases:
100+
raise ValueError("Invalid lease token")
101+
102+
with open(self.log_file, "a") as f:
103+
json.dump(entry, f, separators=(",", ":"))
104+
_ = f.write("\n")
105+
106+
with open(self.log_file) as f:
107+
lines = f.readlines()
108+
109+
return Offset((len(lines) - 1).to_bytes(8, "little"))

0 commit comments

Comments
 (0)