From 3223f5ece9cd675b31d4b1310bd86704bf0c75de Mon Sep 17 00:00:00 2001 From: sujitn Date: Sun, 22 Feb 2026 21:22:58 +0000 Subject: [PATCH] feat: add CheckpointResult type for checkpoint() return value - Add PyCheckpointResult frozen pyclass with checkpoint_id getter, __bool__, __int__, and __repr__ support - Change Connection.checkpoint() to return CheckpointResult instead of int | None - Add CheckpointResult to type stubs, exports, and __all__ - Add deprecation note to CheckpointStatus in types.py - Add comprehensive test suite (10 tests) covering disabled/enabled checkpointing, result properties, and after-close behavior --- python/laminardb/__init__.py | 2 + python/laminardb/_laminardb.pyi | 20 ++++++++- python/laminardb/types.py | 7 ++- src/checkpoint.rs | 39 ++++++++++++++++ src/connection.rs | 12 ++--- src/lib.rs | 4 ++ tests/test_checkpoint.py | 79 +++++++++++++++++++++++++++++++++ 7 files changed, 155 insertions(+), 8 deletions(-) create mode 100644 src/checkpoint.rs create mode 100644 tests/test_checkpoint.py diff --git a/python/laminardb/__init__.py b/python/laminardb/__init__.py index 78f32b3..f08d7b2 100644 --- a/python/laminardb/__init__.py +++ b/python/laminardb/__init__.py @@ -40,6 +40,7 @@ AsyncStreamSubscription, AsyncSubscription, CheckpointError, + CheckpointResult, Connection, ConnectionError, ConnectorError, @@ -155,6 +156,7 @@ def mv(conn: Connection, name: str, sql_def: str | None = None) -> MaterializedV "execute", "mv", # Core classes + "CheckpointResult", "Connection", "ExecuteResult", "LaminarConfig", diff --git a/python/laminardb/_laminardb.pyi b/python/laminardb/_laminardb.pyi index 778b31e..56892d9 100644 --- a/python/laminardb/_laminardb.pyi +++ b/python/laminardb/_laminardb.pyi @@ -117,6 +117,22 @@ class LaminarConfig: def __repr__(self) -> str: ... +# --------------------------------------------------------------------------- +# CheckpointResult +# --------------------------------------------------------------------------- + +class CheckpointResult: + """The result of a checkpoint operation.""" + + @property + def checkpoint_id(self) -> int: + """The checkpoint ID assigned by the database.""" + ... + + def __bool__(self) -> bool: ... + def __int__(self) -> int: ... + def __repr__(self) -> str: ... + # --------------------------------------------------------------------------- # ExecuteResult # --------------------------------------------------------------------------- @@ -282,8 +298,8 @@ class Connection: """Start the streaming pipeline.""" ... - def checkpoint(self) -> int | None: - """Trigger a checkpoint. Returns the checkpoint ID or None.""" + def checkpoint(self) -> CheckpointResult: + """Trigger a checkpoint. Returns a CheckpointResult.""" ... def execute(self, sql: str) -> ExecuteResult: diff --git a/python/laminardb/types.py b/python/laminardb/types.py index 21cc7f4..81b560c 100644 --- a/python/laminardb/types.py +++ b/python/laminardb/types.py @@ -87,7 +87,12 @@ class Watermark: @dataclass(frozen=True) class CheckpointStatus: - """Status of the checkpoint system.""" + """Status of the checkpoint system. + + .. deprecated:: + Use :class:`laminardb.CheckpointResult` returned by + ``Connection.checkpoint()`` instead. + """ checkpoint_id: int | None enabled: bool diff --git a/src/checkpoint.rs b/src/checkpoint.rs new file mode 100644 index 0000000..6000cd5 --- /dev/null +++ b/src/checkpoint.rs @@ -0,0 +1,39 @@ +//! Checkpoint result class. + +use pyo3::prelude::*; + +/// The result of a checkpoint operation. +#[pyclass(name = "CheckpointResult", frozen)] +pub struct PyCheckpointResult { + checkpoint_id: u64, +} + +unsafe impl Send for PyCheckpointResult {} +unsafe impl Sync for PyCheckpointResult {} + +#[pymethods] +impl PyCheckpointResult { + /// The checkpoint ID assigned by the database. + #[getter] + fn checkpoint_id(&self) -> u64 { + self.checkpoint_id + } + + fn __bool__(&self) -> bool { + true + } + + fn __int__(&self) -> u64 { + self.checkpoint_id + } + + fn __repr__(&self) -> String { + format!("CheckpointResult(checkpoint_id={})", self.checkpoint_id) + } +} + +impl PyCheckpointResult { + pub fn from_id(checkpoint_id: u64) -> Self { + Self { checkpoint_id } + } +} diff --git a/src/connection.rs b/src/connection.rs index 551eacd..26b15fd 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -13,6 +13,7 @@ use pyo3_arrow::PySchema; use crate::async_support::{AsyncSubscription, runtime}; use crate::catalog::{PyQueryInfo, PySinkInfo, PySourceInfo, PyStreamInfo}; +use crate::checkpoint::PyCheckpointResult; use crate::conversion; use crate::error::{ConnectionError, IntoPyResult, QueryError}; use crate::execute::ExecuteResult; @@ -336,15 +337,16 @@ impl PyConnection { self.closed } - /// Trigger a checkpoint. Returns the checkpoint ID on success, or None. - fn checkpoint(&self, py: Python<'_>) -> PyResult> { + /// Trigger a checkpoint. Returns a CheckpointResult on success. + fn checkpoint(&self, py: Python<'_>) -> PyResult { self.check_closed()?; let inner = self.inner.clone(); - py.allow_threads(|| { + let id = py.allow_threads(|| { let _rt = runtime().enter(); let conn = inner.lock(); - conn.checkpoint().into_pyresult().map(Some) - }) + conn.checkpoint().into_pyresult() + })?; + Ok(PyCheckpointResult::from_id(id)) } /// Whether checkpointing is enabled for this connection. diff --git a/src/lib.rs b/src/lib.rs index 7d5f5f6..8aa664c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ mod async_support; mod catalog; +mod checkpoint; mod config; mod connection; mod conversion; @@ -45,6 +46,9 @@ fn _laminardb(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; + // Checkpoint + m.add_class::()?; + // Catalog info classes m.add_class::()?; m.add_class::()?; diff --git a/tests/test_checkpoint.py b/tests/test_checkpoint.py new file mode 100644 index 0000000..f7f2790 --- /dev/null +++ b/tests/test_checkpoint.py @@ -0,0 +1,79 @@ +"""Tests for checkpoint APIs.""" + +import pytest + +import laminardb + + +class TestCheckpointDisabled: + """Tests for checkpoint operations when checkpointing is disabled (default).""" + + def test_checkpoint_raises_when_disabled(self, db): + with pytest.raises(laminardb.LaminarError): + db.checkpoint() + + def test_is_checkpoint_enabled_false(self, db): + assert db.is_checkpoint_enabled is False + + +class TestCheckpointEnabled: + """Tests for checkpoint operations when checkpointing is enabled.""" + + @pytest.fixture + def ckpt_db(self, tmp_path): + storage = tmp_path / "storage" + storage.mkdir() + config = laminardb.LaminarConfig( + storage_dir=str(storage), + checkpoint_interval_ms=60_000, + ) + conn = laminardb.open("ckpt_test", config=config) + conn.create_table( + "events", + {"ts": "int64", "value": "float64"}, + ) + conn.start() + yield conn + conn.close() + + def test_is_checkpoint_enabled_true(self, ckpt_db): + assert ckpt_db.is_checkpoint_enabled is True + + def test_checkpoint_returns_result(self, ckpt_db): + result = ckpt_db.checkpoint() + assert isinstance(result, laminardb.CheckpointResult) + assert result.checkpoint_id >= 0 + + def test_checkpoint_result_bool(self, ckpt_db): + result = ckpt_db.checkpoint() + assert bool(result) is True + + def test_checkpoint_result_int(self, ckpt_db): + result = ckpt_db.checkpoint() + assert int(result) == result.checkpoint_id + + def test_checkpoint_result_repr(self, ckpt_db): + result = ckpt_db.checkpoint() + assert "CheckpointResult" in repr(result) + assert str(result.checkpoint_id) in repr(result) + + def test_multiple_checkpoints_increasing_ids(self, ckpt_db): + r1 = ckpt_db.checkpoint() + r2 = ckpt_db.checkpoint() + assert r2.checkpoint_id > r1.checkpoint_id + + +class TestCheckpointAfterClose: + """Tests that checkpoint methods raise after the connection is closed.""" + + def test_checkpoint_after_close_raises(self, tmp_path): + conn = laminardb.open(str(tmp_path / "test.db")) + conn.close() + with pytest.raises(laminardb.ConnectionError, match="closed"): + conn.checkpoint() + + def test_is_checkpoint_enabled_after_close_raises(self, tmp_path): + conn = laminardb.open(str(tmp_path / "test.db")) + conn.close() + with pytest.raises(laminardb.ConnectionError, match="closed"): + _ = conn.is_checkpoint_enabled