Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Changelog

## 0.4.2 — 1.5.2026.

### Performance
- `delete_by_tags` in SQLite batches all matching rows into a single DELETE with one
orphan-detection pass instead of row-by-row `_delete_commit_body` calls.
- `delete_by_tags` in Redis uses tag-set indexes (`cashet:tag:{key}`,
`cashet:tag:{key}:{value}`) with SINTER instead of a full `zrevrange(all)` scan.
- `find_by_fingerprint` in Redis pushes TTL filtering server-side via
`ZREVRANGEBYSCORE` using `expires_at` timestamp as the sorted-set score.

### Added
- `cashet invalidate -t key=value` / `-t key` CLI command.

### Fixed
- Deterministic TTL and GC tests using `freezegun` instead of `time.sleep`.

## 0.4.1 — 1.5.2026.

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "cashet"
version = "0.4.1"
version = "0.4.2"
description = "A Python memoization cache with Redis, async support, DAG pipelines, and an HTTP server"
readme = "README.md"
license = "MIT"
Expand Down
17 changes: 17 additions & 0 deletions src/cashet/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,23 @@ def rm_cmd(hash: str) -> None:
raise SystemExit(1)


@main.command("invalidate")
@click.option("--tag", "-t", "tags", multiple=True, required=True, help="Tag key=value or key")
def invalidate_cmd(tags: tuple[str, ...]) -> None:
"""Delete all commits matching tag criteria"""
client = _client()
parsed = _parse_tags(tags)
if parsed is None:
console.print("[red]At least one --tag is required.[/red]")
raise SystemExit(1)
try:
deleted = client.invalidate(parsed)
except ValueError as e:
console.print(f"[red]{e}[/red]")
raise SystemExit(1) from None
console.print(f"[green]Invalidated {deleted} commit(s).[/green]")


def _parse_size(size_str: str) -> int:
size_str = size_str.strip().upper()
units = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}
Expand Down
46 changes: 34 additions & 12 deletions src/cashet/redis_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ def _status_key(status: str) -> str:
return f"cashet:index:status:{status}"


def _tag_key(key: str) -> str:
return f"cashet:tag:{key}"


def _tag_value_key(key: str, value: str) -> str:
return f"cashet:tag:{key}:{value}"


def _access_key() -> str:
return "cashet:index:last_accessed"

Expand Down Expand Up @@ -234,13 +242,17 @@ def _index_commit_commands(pipe: Any, commit: Commit) -> None:
pipe.set(_commit_key(commit.hash), _encode_commit(commit))
ts = commit.created_at.timestamp()
pipe.zadd("cashet:index:all", {commit.hash: ts})
pipe.zadd(_fp_key(commit.fingerprint), {commit.hash: ts})
expires_ts = commit.expires_at.timestamp() if commit.expires_at else float("inf")
pipe.zadd(_fp_key(commit.fingerprint), {commit.hash: expires_ts})
pipe.zadd(_func_key(commit.task_def.func_name), {commit.hash: ts})
now_ts = datetime.now(UTC).timestamp()
pipe.zadd(_access_key(), {commit.hash: now_ts})
for status in TaskStatus:
pipe.srem(_status_key(status.value), commit.hash)
pipe.sadd(_status_key(commit.status.value), commit.hash)
for key, val in commit.tags.items():
pipe.sadd(_tag_key(key), commit.hash)
pipe.sadd(_tag_value_key(key, val), commit.hash)


def _remove_commit_index_commands(pipe: Any, commit: Commit, resolved_hash: str) -> None:
Expand All @@ -251,6 +263,9 @@ def _remove_commit_index_commands(pipe: Any, commit: Commit, resolved_hash: str)
pipe.zrem(_access_key(), resolved_hash)
for status in TaskStatus:
pipe.srem(_status_key(status.value), resolved_hash)
for key, val in commit.tags.items():
pipe.srem(_tag_key(key), resolved_hash)
pipe.srem(_tag_value_key(key, val), resolved_hash)


def _commit_hash_from_key(raw: Any) -> str:
Expand Down Expand Up @@ -367,14 +382,14 @@ async def get_commit(self, hash: str) -> Commit | None:
return _decode_commit(data)

async def find_by_fingerprint(self, fingerprint: str) -> Commit | None:
hashes = await self._redis.zrevrange(_fp_key(fingerprint), 0, -1)
now = datetime.now(UTC)
now_ts = datetime.now(UTC).timestamp()
hashes = await self._redis.zrevrangebyscore(
_fp_key(fingerprint), max="+inf", min=f"({now_ts}"
)
for h in hashes:
h_str = h.decode() if isinstance(h, bytes) else h
commit = await self.get_commit(h_str)
if commit is not None and commit.status in (TaskStatus.COMPLETED, TaskStatus.CACHED):
if commit.expires_at is not None and commit.expires_at <= now:
continue
await self._touch_commit(h_str)
return commit
return None
Expand Down Expand Up @@ -562,16 +577,17 @@ async def delete_commit(self, hash: str) -> bool:
return await self._delete_commit(hash)

async def delete_by_tags(self, tags: dict[str, str | None]) -> int:
hashes = await self._redis.zrevrange("cashet:index:all", 0, -1)
set_keys: list[str] = []
for key, val in tags.items():
set_keys.append(_tag_key(key) if val is None else _tag_value_key(key, val))
if len(set_keys) == 1:
hashes = await self._redis.smembers(set_keys[0])
else:
hashes = await self._redis.sinter(set_keys)
deleted = 0
for h in hashes:
h_str = h.decode() if isinstance(h, bytes) else h
commit = await self.get_commit(h_str)
if (
commit is not None
and _matches_tags(commit, tags)
and await self._delete_commit_obj(commit)
):
if await self._delete_commit_obj_by_hash(h_str):
deleted += 1
return deleted

Expand All @@ -581,6 +597,12 @@ async def _delete_commit(self, hash: str) -> bool:
return False
return await self._delete_commit_obj(commit)

async def _delete_commit_obj_by_hash(self, hash: str) -> bool:
commit = await self.get_commit(hash)
if commit is None:
return False
return await self._delete_commit_obj(commit)

async def _delete_commit_obj(self, commit: Commit) -> bool:
resolved_hash = commit.hash
pipe = self._redis.pipeline()
Expand Down
35 changes: 23 additions & 12 deletions src/cashet/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ def delete_commit(self, hash: str) -> bool:

def delete_by_tags(self, tags: dict[str, str | None]) -> int:
conn = self._connect(immediate=True)
query = "SELECT hash FROM commits WHERE 1=1"
query = "SELECT hash, output_hash, input_refs FROM commits WHERE 1=1"
params: list[Any] = []
for key, val in tags.items():
if val is None:
Expand All @@ -676,23 +676,34 @@ def delete_by_tags(self, tags: dict[str, str | None]) -> int:
params.append(f"$.{key}")
params.append(val)
rows = conn.execute(query, params).fetchall()
deleted = 0
all_orphans: list[str] = []
if not rows:
conn.execute("ROLLBACK")
return 0
hashes = [r[0] for r in rows]
candidates: set[str] = set()
for r in rows:
if r[1]:
candidates.add(r[1])
if r[2]:
for h in json.loads(r[2]):
candidates.add(h)
try:
for row in rows:
success, orphans = self._delete_commit_body(conn, row[0])
if success:
deleted += 1
all_orphans.extend(orphans)
placeholders = ", ".join("?" for _ in hashes)
conn.execute(
f"UPDATE commits SET parent_hash = NULL WHERE parent_hash IN ({placeholders})",
hashes,
)
conn.execute(
f"DELETE FROM commits WHERE hash IN ({placeholders})", hashes
)
deleted = len(hashes)
all_orphans = self._find_orphan_objects(conn, candidates) if candidates else []
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
if all_orphans:
logger.info(
"orphan objects cleaned count=%d",
len(all_orphans),
)
logger.info("orphan objects cleaned count=%d", len(all_orphans))
self._delete_orphan_objects(conn, all_orphans)
return deleted

Expand Down
43 changes: 43 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,3 +349,46 @@ def add(x: int, y: int) -> int:

client2 = Client(store_dir=store_dir2)
assert client2.get(ref.commit_hash) == 3


class TestInvalidate:
def test_invalidate_deletes_matching(self, cli_runner: CliRunner, store_dir: Path) -> None:
client = Client(store_dir=store_dir)

def val() -> int:
return 1

client.submit(val, _tags={"env": "test"})
result = _invoke(cli_runner, ["invalidate", "-t", "env=test"], store_dir)
assert result.exit_code == 0
assert "Invalidated 1 commit(s)" in result.output

def test_invalidate_bare_key(self, cli_runner: CliRunner, store_dir: Path) -> None:
client = Client(store_dir=store_dir)

def val_a() -> int:
return 1

def val_b() -> int:
return 2

client.submit(val_a, _tags={"env": "prod"})
client.submit(val_b, _tags={"env": "staging"})
result = _invoke(cli_runner, ["invalidate", "-t", "env"], store_dir)
assert result.exit_code == 0
assert "Invalidated 2 commit(s)" in result.output

def test_invalidate_no_match(self, cli_runner: CliRunner, store_dir: Path) -> None:
client = Client(store_dir=store_dir)

def val() -> int:
return 1

client.submit(val, _tags={"env": "prod"})
result = _invoke(cli_runner, ["invalidate", "-t", "env=staging"], store_dir)
assert result.exit_code == 0
assert "Invalidated 0 commit(s)" in result.output

def test_invalidate_requires_tag(self, cli_runner: CliRunner, store_dir: Path) -> None:
result = _invoke(cli_runner, ["invalidate"], store_dir)
assert result.exit_code != 0
72 changes: 72 additions & 0 deletions tests/test_redis_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,75 @@ def run(x: int) -> None:

assert call_count == 1
assert len({r.hash for r in refs}) == 1

def test_delete_by_tags_exact_match(self, redis_store: RedisStore) -> None:
task_def = TaskDef(
func_hash="a" * 64,
func_name="f",
func_source="def f(): pass",
args_hash="b" * 64,
args_snapshot=b"",
tags={"env": "test"},
)
commit = Commit(
hash="d" * 64, task_def=task_def, tags={"env": "test"}, status=TaskStatus.COMPLETED
)
redis_store.put_commit(commit)
assert redis_store.get_commit("d" * 64) is not None

deleted = redis_store.delete_by_tags({"env": "test"})
assert deleted == 1
assert redis_store.get_commit("d" * 64) is None

def test_delete_by_tags_bare_key(self, redis_store: RedisStore) -> None:
for i, env in enumerate(["prod", "staging"]):
task_def = TaskDef(
func_hash=f"{i:064d}",
func_name="f",
func_source="def f(): pass",
args_hash="b" * 64,
args_snapshot=b"",
tags={"env": env},
)
commit = Commit(
hash=f"{i:064d}", task_def=task_def, tags={"env": env},
status=TaskStatus.COMPLETED,
)
redis_store.put_commit(commit)

deleted = redis_store.delete_by_tags({"env": None})
assert deleted == 2

def test_delete_by_tags_multi_condition(self, redis_store: RedisStore) -> None:
task_def = TaskDef(
func_hash="a" * 64,
func_name="f",
func_source="def f(): pass",
args_hash="b" * 64,
args_snapshot=b"",
tags={"env": "prod", "model": "v2"},
)
commit = Commit(
hash="e" * 64, task_def=task_def, tags={"env": "prod", "model": "v2"},
status=TaskStatus.COMPLETED,
)
redis_store.put_commit(commit)

tag_unmatched = TaskDef(
func_hash="c" * 64,
func_name="f",
func_source="def f(): pass",
args_hash="d" * 64,
args_snapshot=b"",
tags={"env": "prod", "model": "v1"},
)
commit2 = Commit(
hash="f" * 64, task_def=tag_unmatched, tags={"env": "prod", "model": "v1"},
status=TaskStatus.COMPLETED,
)
redis_store.put_commit(commit2)

deleted = redis_store.delete_by_tags({"env": "prod", "model": "v2"})
assert deleted == 1
assert redis_store.get_commit("e" * 64) is None
assert redis_store.get_commit("f" * 64) is not None
Loading
Loading