Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/laminardb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
AsyncStreamSubscription,
AsyncSubscription,
CheckpointError,
CheckpointResult,
Connection,
ConnectionError,
ConnectorError,
Expand Down Expand Up @@ -155,6 +156,7 @@ def mv(conn: Connection, name: str, sql_def: str | None = None) -> MaterializedV
"execute",
"mv",
# Core classes
"CheckpointResult",
"Connection",
"ExecuteResult",
"LaminarConfig",
Expand Down
20 changes: 18 additions & 2 deletions python/laminardb/_laminardb.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,22 @@ class LaminarConfig:

def __repr__(self) -> str: ...

# ---------------------------------------------------------------------------
# CheckpointResult
# ---------------------------------------------------------------------------

class CheckpointResult:
"""The result of a checkpoint operation."""

@property
def checkpoint_id(self) -> int:
"""The checkpoint ID assigned by the database."""
...

def __bool__(self) -> bool: ...
def __int__(self) -> int: ...
def __repr__(self) -> str: ...

# ---------------------------------------------------------------------------
# ExecuteResult
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -282,8 +298,8 @@ class Connection:
"""Start the streaming pipeline."""
...

def checkpoint(self) -> int | None:
"""Trigger a checkpoint. Returns the checkpoint ID or None."""
def checkpoint(self) -> CheckpointResult:
"""Trigger a checkpoint. Returns a CheckpointResult."""
...

def execute(self, sql: str) -> ExecuteResult:
Expand Down
7 changes: 6 additions & 1 deletion python/laminardb/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ class Watermark:

@dataclass(frozen=True)
class CheckpointStatus:
"""Status of the checkpoint system."""
"""Status of the checkpoint system.

.. deprecated::
Use :class:`laminardb.CheckpointResult` returned by
``Connection.checkpoint()`` instead.
"""

checkpoint_id: int | None
enabled: bool
Expand Down
39 changes: 39 additions & 0 deletions src/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//! Checkpoint result class.

use pyo3::prelude::*;

/// The result of a checkpoint operation.
#[pyclass(name = "CheckpointResult", frozen)]
pub struct PyCheckpointResult {
checkpoint_id: u64,
}

unsafe impl Send for PyCheckpointResult {}
unsafe impl Sync for PyCheckpointResult {}

#[pymethods]
impl PyCheckpointResult {
/// The checkpoint ID assigned by the database.
#[getter]
fn checkpoint_id(&self) -> u64 {
self.checkpoint_id
}

fn __bool__(&self) -> bool {
true
}

fn __int__(&self) -> u64 {
self.checkpoint_id
}

fn __repr__(&self) -> String {
format!("CheckpointResult(checkpoint_id={})", self.checkpoint_id)
}
}

impl PyCheckpointResult {
pub fn from_id(checkpoint_id: u64) -> Self {
Self { checkpoint_id }
}
}
12 changes: 7 additions & 5 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use pyo3_arrow::PySchema;

use crate::async_support::{AsyncSubscription, runtime};
use crate::catalog::{PyQueryInfo, PySinkInfo, PySourceInfo, PyStreamInfo};
use crate::checkpoint::PyCheckpointResult;
use crate::conversion;
use crate::error::{ConnectionError, IntoPyResult, QueryError};
use crate::execute::ExecuteResult;
Expand Down Expand Up @@ -336,15 +337,16 @@ impl PyConnection {
self.closed
}

/// Trigger a checkpoint. Returns the checkpoint ID on success, or None.
fn checkpoint(&self, py: Python<'_>) -> PyResult<Option<u64>> {
/// Trigger a checkpoint. Returns a CheckpointResult on success.
fn checkpoint(&self, py: Python<'_>) -> PyResult<PyCheckpointResult> {
self.check_closed()?;
let inner = self.inner.clone();
py.allow_threads(|| {
let id = py.allow_threads(|| {
let _rt = runtime().enter();
let conn = inner.lock();
conn.checkpoint().into_pyresult().map(Some)
})
conn.checkpoint().into_pyresult()
})?;
Ok(PyCheckpointResult::from_id(id))
}

/// Whether checkpointing is enabled for this connection.
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

mod async_support;
mod catalog;
mod checkpoint;
mod config;
mod connection;
mod conversion;
Expand Down Expand Up @@ -45,6 +46,9 @@ fn _laminardb(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<execute::ExecuteResult>()?;
m.add_class::<config::PyLaminarConfig>()?;

// Checkpoint
m.add_class::<checkpoint::PyCheckpointResult>()?;

// Catalog info classes
m.add_class::<catalog::PySourceInfo>()?;
m.add_class::<catalog::PySinkInfo>()?;
Expand Down
79 changes: 79 additions & 0 deletions tests/test_checkpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Tests for checkpoint APIs."""

import pytest

import laminardb


class TestCheckpointDisabled:
"""Tests for checkpoint operations when checkpointing is disabled (default)."""

def test_checkpoint_raises_when_disabled(self, db):
with pytest.raises(laminardb.LaminarError):
db.checkpoint()

def test_is_checkpoint_enabled_false(self, db):
assert db.is_checkpoint_enabled is False


class TestCheckpointEnabled:
"""Tests for checkpoint operations when checkpointing is enabled."""

@pytest.fixture
def ckpt_db(self, tmp_path):
storage = tmp_path / "storage"
storage.mkdir()
config = laminardb.LaminarConfig(
storage_dir=str(storage),
checkpoint_interval_ms=60_000,
)
conn = laminardb.open("ckpt_test", config=config)
conn.create_table(
"events",
{"ts": "int64", "value": "float64"},
)
conn.start()
yield conn
conn.close()

def test_is_checkpoint_enabled_true(self, ckpt_db):
assert ckpt_db.is_checkpoint_enabled is True

def test_checkpoint_returns_result(self, ckpt_db):
result = ckpt_db.checkpoint()
assert isinstance(result, laminardb.CheckpointResult)
assert result.checkpoint_id >= 0

def test_checkpoint_result_bool(self, ckpt_db):
result = ckpt_db.checkpoint()
assert bool(result) is True

def test_checkpoint_result_int(self, ckpt_db):
result = ckpt_db.checkpoint()
assert int(result) == result.checkpoint_id

def test_checkpoint_result_repr(self, ckpt_db):
result = ckpt_db.checkpoint()
assert "CheckpointResult" in repr(result)
assert str(result.checkpoint_id) in repr(result)

def test_multiple_checkpoints_increasing_ids(self, ckpt_db):
r1 = ckpt_db.checkpoint()
r2 = ckpt_db.checkpoint()
assert r2.checkpoint_id > r1.checkpoint_id


class TestCheckpointAfterClose:
"""Tests that checkpoint methods raise after the connection is closed."""

def test_checkpoint_after_close_raises(self, tmp_path):
conn = laminardb.open(str(tmp_path / "test.db"))
conn.close()
with pytest.raises(laminardb.ConnectionError, match="closed"):
conn.checkpoint()

def test_is_checkpoint_enabled_after_close_raises(self, tmp_path):
conn = laminardb.open(str(tmp_path / "test.db"))
conn.close()
with pytest.raises(laminardb.ConnectionError, match="closed"):
_ = conn.is_checkpoint_enabled