Skip to content

Commit 12530e6

Browse files
committed
feat: log flush interface
1 parent 42805e9 commit 12530e6

3 files changed

Lines changed: 15 additions & 3 deletions

File tree

src/duron/log/storage/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,5 @@ async def acquire_lease(self) -> _TLease: ...
2828
async def release_lease(self, token: _TLease): ...
2929

3030
async def append(self, token: _TLease, entry: Entry) -> _TOffset: ...
31+
32+
async def flush(self, token: _TLease, offset: _TOffset): ...

src/duron/log/storage/simple.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ async def append(self, token: str, entry: Entry) -> int:
104104

105105
return entry_offset
106106

107+
@override
108+
async def flush(self, token: str, offset: int):
109+
pass
110+
107111

108112
class MemoryLogStorage(LogStorage[int, str]):
109113
_entries: list[AnyEntry]
@@ -176,6 +180,10 @@ async def append(self, token: str, entry: Entry) -> int:
176180

177181
return index
178182

183+
@override
184+
async def flush(self, token: str, offset: int):
185+
pass
186+
179187
async def entries(self) -> list[AnyEntry]:
180188
async with self._lock:
181189
return self._entries.copy()

src/duron/task.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,14 @@ async def handle_message(self, e: Entry) -> None:
256256
else:
257257
pass
258258

259-
async def enqueue_log(self, entry: Entry) -> None:
259+
async def enqueue_log(self, entry: Entry, flush: bool = False) -> None:
260260
if not self._running:
261261
self._pending_msg.append(entry)
262262
else:
263-
_ = await self._log.append(self._running, entry)
263+
offset = await self._log.append(self._running, entry)
264264
await self.handle_message(entry)
265+
if flush:
266+
await self._log.flush(self._running, offset)
265267

266268
async def enqueue_op(self, id: bytes, op: Op | object) -> None:
267269
match op:
@@ -313,7 +315,7 @@ async def cb() -> None:
313315
except BaseException as e:
314316
entry["error"] = _encode_error(e, self._codec)
315317

316-
await self.enqueue_log(entry)
318+
await self.enqueue_log(entry, True)
317319

318320
_ = self._loop.create_task(cb())
319321

0 commit comments

Comments
 (0)