From 7cab78810ee85be01aeb937a6f7ab45b7dbd0927 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Fri, 1 May 2026 18:27:48 +0200 Subject: [PATCH 1/7] perf: batch delete_by_tags in SQLite into single DELETE pass --- src/cashet/store.py | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/src/cashet/store.py b/src/cashet/store.py index a06a5db..a80c60a 100644 --- a/src/cashet/store.py +++ b/src/cashet/store.py @@ -665,7 +665,7 @@ def delete_commit(self, hash: str) -> bool: def delete_by_tags(self, tags: dict[str, str | None]) -> int: conn = self._connect(immediate=True) - query = "SELECT hash FROM commits WHERE 1=1" + query = "SELECT hash, output_hash, input_refs FROM commits WHERE 1=1" params: list[Any] = [] for key, val in tags.items(): if val is None: @@ -676,23 +676,34 @@ def delete_by_tags(self, tags: dict[str, str | None]) -> int: params.append(f"$.{key}") params.append(val) rows = conn.execute(query, params).fetchall() - deleted = 0 - all_orphans: list[str] = [] + if not rows: + conn.execute("ROLLBACK") + return 0 + hashes = [r[0] for r in rows] + candidates: set[str] = set() + for r in rows: + if r[1]: + candidates.add(r[1]) + if r[2]: + for h in json.loads(r[2]): + candidates.add(h) try: - for row in rows: - success, orphans = self._delete_commit_body(conn, row[0]) - if success: - deleted += 1 - all_orphans.extend(orphans) + placeholders = ", ".join("?" for _ in hashes) + conn.execute( + f"UPDATE commits SET parent_hash = NULL WHERE parent_hash IN ({placeholders})", + hashes, + ) + conn.execute( + f"DELETE FROM commits WHERE hash IN ({placeholders})", hashes + ) + deleted = len(hashes) + all_orphans = self._find_orphan_objects(conn, candidates) if candidates else [] conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise if all_orphans: - logger.info( - "orphan objects cleaned count=%d", - len(all_orphans), - ) + logger.info("orphan objects cleaned count=%d", len(all_orphans)) self._delete_orphan_objects(conn, all_orphans) return deleted From 529c0b206fb664e2db583c55ad56d0ec0bb271a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Fri, 1 May 2026 18:27:53 +0200 Subject: [PATCH 2/7] perf: tag-set indexes for Redis delete_by_tags, TTL pushdown in find_by_fingerprint --- src/cashet/redis_store.py | 46 +++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/src/cashet/redis_store.py b/src/cashet/redis_store.py index d07c523..bf0ee4d 100644 --- a/src/cashet/redis_store.py +++ b/src/cashet/redis_store.py @@ -62,6 +62,14 @@ def _status_key(status: str) -> str: return f"cashet:index:status:{status}" +def _tag_key(key: str) -> str: + return f"cashet:tag:{key}" + + +def _tag_value_key(key: str, value: str) -> str: + return f"cashet:tag:{key}:{value}" + + def _access_key() -> str: return "cashet:index:last_accessed" @@ -234,13 +242,17 @@ def _index_commit_commands(pipe: Any, commit: Commit) -> None: pipe.set(_commit_key(commit.hash), _encode_commit(commit)) ts = commit.created_at.timestamp() pipe.zadd("cashet:index:all", {commit.hash: ts}) - pipe.zadd(_fp_key(commit.fingerprint), {commit.hash: ts}) + expires_ts = commit.expires_at.timestamp() if commit.expires_at else float("inf") + pipe.zadd(_fp_key(commit.fingerprint), {commit.hash: expires_ts}) pipe.zadd(_func_key(commit.task_def.func_name), {commit.hash: ts}) now_ts = datetime.now(UTC).timestamp() pipe.zadd(_access_key(), {commit.hash: now_ts}) for status in TaskStatus: pipe.srem(_status_key(status.value), commit.hash) pipe.sadd(_status_key(commit.status.value), commit.hash) + for key, val in commit.tags.items(): + pipe.sadd(_tag_key(key), commit.hash) + pipe.sadd(_tag_value_key(key, val), commit.hash) def _remove_commit_index_commands(pipe: Any, commit: Commit, resolved_hash: str) -> None: @@ -251,6 +263,9 @@ def _remove_commit_index_commands(pipe: Any, commit: Commit, resolved_hash: str) pipe.zrem(_access_key(), resolved_hash) for status in TaskStatus: pipe.srem(_status_key(status.value), resolved_hash) + for key, val in commit.tags.items(): + pipe.srem(_tag_key(key), resolved_hash) + pipe.srem(_tag_value_key(key, val), resolved_hash) def _commit_hash_from_key(raw: Any) -> str: @@ -367,14 +382,14 @@ async def get_commit(self, hash: str) -> Commit | None: return _decode_commit(data) async def find_by_fingerprint(self, fingerprint: str) -> Commit | None: - hashes = await self._redis.zrevrange(_fp_key(fingerprint), 0, -1) - now = datetime.now(UTC) + now_ts = datetime.now(UTC).timestamp() + hashes = await self._redis.zrevrangebyscore( + _fp_key(fingerprint), max="+inf", min=f"({now_ts}" + ) for h in hashes: h_str = h.decode() if isinstance(h, bytes) else h commit = await self.get_commit(h_str) if commit is not None and commit.status in (TaskStatus.COMPLETED, TaskStatus.CACHED): - if commit.expires_at is not None and commit.expires_at <= now: - continue await self._touch_commit(h_str) return commit return None @@ -562,16 +577,17 @@ async def delete_commit(self, hash: str) -> bool: return await self._delete_commit(hash) async def delete_by_tags(self, tags: dict[str, str | None]) -> int: - hashes = await self._redis.zrevrange("cashet:index:all", 0, -1) + set_keys: list[str] = [] + for key, val in tags.items(): + set_keys.append(_tag_key(key) if val is None else _tag_value_key(key, val)) + if len(set_keys) == 1: + hashes = await self._redis.smembers(set_keys[0]) + else: + hashes = await self._redis.sinter(set_keys) deleted = 0 for h in hashes: h_str = h.decode() if isinstance(h, bytes) else h - commit = await self.get_commit(h_str) - if ( - commit is not None - and _matches_tags(commit, tags) - and await self._delete_commit_obj(commit) - ): + if await self._delete_commit_obj_by_hash(h_str): deleted += 1 return deleted @@ -581,6 +597,12 @@ async def _delete_commit(self, hash: str) -> bool: return False return await self._delete_commit_obj(commit) + async def _delete_commit_obj_by_hash(self, hash: str) -> bool: + commit = await self.get_commit(hash) + if commit is None: + return False + return await self._delete_commit_obj(commit) + async def _delete_commit_obj(self, commit: Commit) -> bool: resolved_hash = commit.hash pipe = self._redis.pipeline() From 1d3ab6bd7a7bb21446996806352d0deebce02450 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Fri, 1 May 2026 18:27:58 +0200 Subject: [PATCH 3/7] feat: add invalidate CLI command --- src/cashet/cli.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/cashet/cli.py b/src/cashet/cli.py index 30b4b07..1c2c13c 100644 --- a/src/cashet/cli.py +++ b/src/cashet/cli.py @@ -276,6 +276,23 @@ def rm_cmd(hash: str) -> None: raise SystemExit(1) +@main.command("invalidate") +@click.option("--tag", "-t", "tags", multiple=True, required=True, help="Tag key=value or key") +def invalidate_cmd(tags: tuple[str, ...]) -> None: + """Delete all commits matching tag criteria""" + client = _client() + parsed = _parse_tags(tags) + if parsed is None: + console.print("[red]At least one --tag is required.[/red]") + raise SystemExit(1) + try: + deleted = client.invalidate(parsed) + except ValueError as e: + console.print(f"[red]{e}[/red]") + raise SystemExit(1) from None + console.print(f"[green]Invalidated {deleted} commit(s).[/green]") + + def _parse_size(size_str: str) -> int: size_str = size_str.strip().upper() units = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4} From 4d7c9549ddcd59b6a285bdf42313385a7f5fc143 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Fri, 1 May 2026 18:28:03 +0200 Subject: [PATCH 4/7] test: freezegun for TTL/GC tests, CLI invalidate and Redis delete_by_tags coverage --- tests/test_cli.py | 43 +++++++++++++++++++++++ tests/test_redis_store.py | 72 +++++++++++++++++++++++++++++++++++++++ tests/test_store.py | 63 +++++++++++++++++----------------- 3 files changed, 147 insertions(+), 31 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index 999687f..9c6ff88 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -349,3 +349,46 @@ def add(x: int, y: int) -> int: client2 = Client(store_dir=store_dir2) assert client2.get(ref.commit_hash) == 3 + + +class TestInvalidate: + def test_invalidate_deletes_matching(self, cli_runner: CliRunner, store_dir: Path) -> None: + client = Client(store_dir=store_dir) + + def val() -> int: + return 1 + + client.submit(val, _tags={"env": "test"}) + result = _invoke(cli_runner, ["invalidate", "-t", "env=test"], store_dir) + assert result.exit_code == 0 + assert "Invalidated 1 commit(s)" in result.output + + def test_invalidate_bare_key(self, cli_runner: CliRunner, store_dir: Path) -> None: + client = Client(store_dir=store_dir) + + def val_a() -> int: + return 1 + + def val_b() -> int: + return 2 + + client.submit(val_a, _tags={"env": "prod"}) + client.submit(val_b, _tags={"env": "staging"}) + result = _invoke(cli_runner, ["invalidate", "-t", "env"], store_dir) + assert result.exit_code == 0 + assert "Invalidated 2 commit(s)" in result.output + + def test_invalidate_no_match(self, cli_runner: CliRunner, store_dir: Path) -> None: + client = Client(store_dir=store_dir) + + def val() -> int: + return 1 + + client.submit(val, _tags={"env": "prod"}) + result = _invoke(cli_runner, ["invalidate", "-t", "env=staging"], store_dir) + assert result.exit_code == 0 + assert "Invalidated 0 commit(s)" in result.output + + def test_invalidate_requires_tag(self, cli_runner: CliRunner, store_dir: Path) -> None: + result = _invoke(cli_runner, ["invalidate"], store_dir) + assert result.exit_code != 0 diff --git a/tests/test_redis_store.py b/tests/test_redis_store.py index 9915127..c0f25c7 100644 --- a/tests/test_redis_store.py +++ b/tests/test_redis_store.py @@ -560,3 +560,75 @@ def run(x: int) -> None: assert call_count == 1 assert len({r.hash for r in refs}) == 1 + + def test_delete_by_tags_exact_match(self, redis_store: RedisStore) -> None: + task_def = TaskDef( + func_hash="a" * 64, + func_name="f", + func_source="def f(): pass", + args_hash="b" * 64, + args_snapshot=b"", + tags={"env": "test"}, + ) + commit = Commit( + hash="d" * 64, task_def=task_def, tags={"env": "test"}, status=TaskStatus.COMPLETED + ) + redis_store.put_commit(commit) + assert redis_store.get_commit("d" * 64) is not None + + deleted = redis_store.delete_by_tags({"env": "test"}) + assert deleted == 1 + assert redis_store.get_commit("d" * 64) is None + + def test_delete_by_tags_bare_key(self, redis_store: RedisStore) -> None: + for i, env in enumerate(["prod", "staging"]): + task_def = TaskDef( + func_hash=f"{i:064d}", + func_name="f", + func_source="def f(): pass", + args_hash="b" * 64, + args_snapshot=b"", + tags={"env": env}, + ) + commit = Commit( + hash=f"{i:064d}", task_def=task_def, tags={"env": env}, + status=TaskStatus.COMPLETED, + ) + redis_store.put_commit(commit) + + deleted = redis_store.delete_by_tags({"env": None}) + assert deleted == 2 + + def test_delete_by_tags_multi_condition(self, redis_store: RedisStore) -> None: + task_def = TaskDef( + func_hash="a" * 64, + func_name="f", + func_source="def f(): pass", + args_hash="b" * 64, + args_snapshot=b"", + tags={"env": "prod", "model": "v2"}, + ) + commit = Commit( + hash="e" * 64, task_def=task_def, tags={"env": "prod", "model": "v2"}, + status=TaskStatus.COMPLETED, + ) + redis_store.put_commit(commit) + + tag_unmatched = TaskDef( + func_hash="c" * 64, + func_name="f", + func_source="def f(): pass", + args_hash="d" * 64, + args_snapshot=b"", + tags={"env": "prod", "model": "v1"}, + ) + commit2 = Commit( + hash="f" * 64, task_def=tag_unmatched, tags={"env": "prod", "model": "v1"}, + status=TaskStatus.COMPLETED, + ) + redis_store.put_commit(commit2) + + deleted = redis_store.delete_by_tags({"env": "prod", "model": "v2"}) + assert deleted == 1 + assert redis_store.get_commit("e" * 64) is None + assert redis_store.get_commit("f" * 64) is not None diff --git a/tests/test_store.py b/tests/test_store.py index 0bf665d..308932f 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -716,7 +716,7 @@ def boom() -> None: assert client.stats()["total_commits"] == 0 def test_gc_exact_orphan_detection(self, client: Client) -> None: - import time + from freezegun import freeze_time def produce() -> bytes: return b"keep_me" @@ -724,16 +724,17 @@ def produce() -> bytes: def consume(data: bytes) -> bytes: return data + b"_consumed" - ref1 = client.submit(produce) - time.sleep(1.1) - client.submit(consume, ref1) - assert client.stats()["total_commits"] == 2 - assert client.stats()["stored_objects"] >= 1 + with freeze_time("2026-01-01 00:00:00") as frozen: + ref1 = client.submit(produce) + frozen.tick(1.1) + client.submit(consume, ref1) + assert client.stats()["total_commits"] == 2 + assert client.stats()["stored_objects"] >= 1 - cutoff = datetime.now(UTC) - timedelta(seconds=0.5) - client.store.evict(cutoff) - assert client.stats()["total_commits"] == 1 - assert client.stats()["stored_objects"] >= 1 + cutoff = datetime.now(UTC) - timedelta(seconds=0.5) + client.store.evict(cutoff) + assert client.stats()["total_commits"] == 1 + assert client.stats()["stored_objects"] >= 1 def test_client_close(self, store_dir: Path) -> None: client = Client(store_dir=store_dir) @@ -1113,6 +1114,8 @@ def val() -> int: assert commit.task_def.ttl == timedelta(seconds=3600) def test_ttl_expiration_causes_reexecution(self, client: Client) -> None: + from freezegun import freeze_time + call_count = 0 def val() -> int: @@ -1120,21 +1123,18 @@ def val() -> int: call_count += 1 return call_count - ref1 = client.submit(val, _ttl=0.05) - assert ref1.load() == 1 + with freeze_time("2026-01-01 00:00:00") as frozen: + ref1 = client.submit(val, _ttl=0.05) + assert ref1.load() == 1 - # Should still be cached immediately - ref2 = client.submit(val, _ttl=0.05) - assert ref2.load() == 1 - assert ref1.commit_hash == ref2.commit_hash + ref2 = client.submit(val, _ttl=0.05) + assert ref2.load() == 1 + assert ref1.commit_hash == ref2.commit_hash - # Wait for TTL to expire - import time - - time.sleep(0.1) + frozen.tick(0.1) - ref3 = client.submit(val, _ttl=0.05) - assert ref3.load() == 2 + ref3 = client.submit(val, _ttl=0.05) + assert ref3.load() == 2 def test_ttl_on_task_decorator(self, client: Client) -> None: @client.task(ttl=3600) @@ -1187,6 +1187,8 @@ def b() -> int: assert commit.expires_at is not None def test_ttl_on_submit_many_causes_reexecution(self, client: Client) -> None: + from freezegun import freeze_time + call_count = 0 def val_a() -> int: @@ -1199,18 +1201,17 @@ def val_b() -> int: call_count += 1 return call_count - refs1 = client.submit_many([val_a, val_b], _ttl=0.05) - assert [r.load() for r in refs1] == [1, 2] + with freeze_time("2026-01-01 00:00:00") as frozen: + refs1 = client.submit_many([val_a, val_b], _ttl=0.05) + assert [r.load() for r in refs1] == [1, 2] - refs2 = client.submit_many([val_a, val_b], _ttl=0.05) - assert [r.load() for r in refs2] == [1, 2] - - import time + refs2 = client.submit_many([val_a, val_b], _ttl=0.05) + assert [r.load() for r in refs2] == [1, 2] - time.sleep(0.1) + frozen.tick(0.1) - refs3 = client.submit_many([val_a, val_b], _ttl=0.05) - assert [r.load() for r in refs3] == [3, 4] + refs3 = client.submit_many([val_a, val_b], _ttl=0.05) + assert [r.load() for r in refs3] == [3, 4] class TestTagInvalidation: From 24f3c4f7a125c4f80f242f13b3d3a2b3911a8f5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Fri, 1 May 2026 18:28:08 +0200 Subject: [PATCH 5/7] chore: bump version to 0.4.2 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f134726..2a429fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cashet" -version = "0.4.1" +version = "0.4.2" description = "A Python memoization cache with Redis, async support, DAG pipelines, and an HTTP server" readme = "README.md" license = "MIT" From d49b142522b414213eac4ce05ca755fbb94bce8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Fri, 1 May 2026 18:28:13 +0200 Subject: [PATCH 6/7] docs: add 0.4.2 changelog --- CHANGELOG.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dc5050..335ed89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,21 @@ # Changelog +## 0.4.2 (unreleased) + +### Performance +- `delete_by_tags` in SQLite batches all matching rows into a single DELETE with one + orphan-detection pass instead of row-by-row `_delete_commit_body` calls. +- `delete_by_tags` in Redis uses tag-set indexes (`cashet:tag:{key}`, + `cashet:tag:{key}:{value}`) with SINTER instead of a full `zrevrange(all)` scan. +- `find_by_fingerprint` in Redis pushes TTL filtering server-side via + `ZREVRANGEBYSCORE` using `expires_at` timestamp as the sorted-set score. + +### Added +- `cashet invalidate -t key=value` / `-t key` CLI command. + +### Fixed +- Deterministic TTL and GC tests using `freezegun` instead of `time.sleep`. + ## 0.4.1 — 1.5.2026. ### Fixed From 7fddf391bcda43d4b9c36c96d919a80fb1c6b014 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Jolovi=C4=87?= Date: Fri, 1 May 2026 18:33:43 +0200 Subject: [PATCH 7/7] docs: finalize 0.4.2 release date in changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 335ed89..3fcc642 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.4.2 (unreleased) +## 0.4.2 — 1.5.2026. ### Performance - `delete_by_tags` in SQLite batches all matching rows into a single DELETE with one