Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/cashet/async_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ async def _async_store_lock(


def _is_stale_claim(commit: Commit, ttl: timedelta) -> bool:
return datetime.now(UTC) - commit.claimed_at > ttl
# Use created_at so that long-pending tasks (created but never successfully
# claimed) are still eligible for reclaim, rather than being stuck forever.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 Critical The change from commit.claimed_at to commit.created_at correctly ensures that stale-claim detection uses the task's immutable creation time. However, consider adding a safeguard for the edge case where created_at is in the future (e.g., because of clock skew between workers): datetime.now(UTC) - commit.created_at would then be negative, so the commit could not be considered stale until the local clock has passed created_at by more than ttl. The current code has no such guard, and defensive checking could prevent unexpected behavior. This is a minor concern; the logic is otherwise sound.

return datetime.now(UTC) - commit.created_at > ttl


class AsyncLocalExecutor:
Expand Down
3 changes: 3 additions & 0 deletions src/cashet/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class Commit:
output_ref: ObjectRef | None = None
parent_hash: str | None = None
status: TaskStatus = TaskStatus.PENDING
# created_at is the canonical anchor for task lifetime; it never changes
# and is used for stale-claim detection so that pending tasks cannot hide
# behind a recent heartbeat.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 Low The comment on created_at clarifies its role as the canonical anchor. Consider also adding a note that this field is set once at commit creation and should never be updated, to prevent accidental misuse.

created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
claimed_at: datetime = field(default_factory=lambda: datetime.now(UTC))
error: str | None = None
Expand Down
2 changes: 1 addition & 1 deletion src/cashet/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def find_running_by_fingerprint(self, fingerprint: str) -> Commit | None:
row = conn.execute(
"""SELECT * FROM commits
WHERE fingerprint = ? AND status = 'running'
ORDER BY claimed_at DESC LIMIT 1""",
ORDER BY created_at DESC LIMIT 1""",
(fingerprint,),
).fetchone()
if row is None:
Expand Down
22 changes: 22 additions & 0 deletions tests/test_async_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import UTC, datetime, timedelta
from pathlib import Path

import pytest
Expand Down Expand Up @@ -281,6 +282,27 @@ def non_cached() -> int:
assert await ref1.load() == 1
assert await ref2.load() == 2

async def test_old_created_at_causes_reclaim_despite_fresh_claim(
self, async_client: AsyncClient
) -> None:
import cashet.dag as dag
import cashet.hashing as hashing
from cashet.models import TaskStatus

def work() -> int:
return 42

task_def = hashing.build_task_def(work, (), {})
input_refs = dag.resolve_input_refs((), {})
commit = dag.build_commit(task_def, input_refs)
commit.status = TaskStatus.RUNNING
commit.created_at = datetime.now(UTC) - timedelta(seconds=400)
commit.claimed_at = datetime.now(UTC) - timedelta(seconds=5)
await async_client.store.put_commit(commit)

ref = await async_client.submit(work)
assert await ref.load() == 42

async def test_task_decorator_callable_returns_async_result_ref(
self, async_client: AsyncClient
) -> None:
Expand Down
21 changes: 21 additions & 0 deletions tests/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,27 @@ def slow() -> int:
with pytest.raises(TaskError, match="TimeoutError"):
client.submit(slow, _timeout=0.01)

def test_old_created_at_trumps_recent_claimed_at(self, store_dir: Path) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Medium The test test_old_created_at_trumps_recent_claimed_at correctly validates the new logic. It uses a synthetic commit with created_at 400s ago and claimed_at 5s ago, then expects the task to be reclaimed and return 42. This covers the critical scenario described in the PR. The async counterpart (test_async_client.py) mirrors it well.

import cashet.dag as dag
import cashet.hashing as hashing
from cashet.models import TaskStatus

client = Client(store_dir=store_dir)

def work() -> int:
return 42

task_def = hashing.build_task_def(work, (), {})
input_refs = dag.resolve_input_refs((), {})
commit = dag.build_commit(task_def, input_refs)
commit.status = TaskStatus.RUNNING
commit.created_at = datetime.now(UTC) - timedelta(seconds=400)
commit.claimed_at = datetime.now(UTC) - timedelta(seconds=5)
client.store.put_commit(commit)

ref = client.submit(work)
assert ref.load() == 42

def test_running_claim_lookup_is_not_limited_to_1000_rows(self, store_dir: Path) -> None:
import cashet.dag as dag
import cashet.hashing as hashing
Expand Down
Loading