From a5efd7d03595497487fb60e4c06fd97155ff2f4d Mon Sep 17 00:00:00 2001 From: Daniel Date: Sat, 21 Feb 2026 12:55:25 +0800 Subject: [PATCH 1/2] Implemented Stream tombstones --- CHANGELOG.md | 10 + .../example/lib/store_state_based.dart | 9 + .../lib/store_state_based_local_db.dart | 6 + .../lib/store_state_based_transactional.dart | 14 ++ .../lib/src/persistence/event_store.dart | 14 +- .../lib/src/persistence/session_impl.dart | 20 ++ .../event_application_mode_test.mocks.dart | 9 + .../session_apply_async_test.mocks.dart | 9 + .../session_concurrency_retry_test.mocks.dart | 18 ++ .../session_load_all_async_test.mocks.dart | 18 ++ .../test/persistence/session_test.mocks.dart | 18 ++ .../lib/src/combining_builder.dart | 2 +- .../src/persistence/state_based_session.dart | 64 +++++- .../target_persistence_adapter.dart | 10 + .../persistence/state_based_session_test.dart | 199 +++++++++++++++--- .../state_based_session_test.mocks.dart | 19 +- .../state_based_store_test.mocks.dart | 19 +- .../lib/src/hive_event_store.dart | 35 ++- .../lib/src/hive_persistence_adapter.dart | 7 + .../lib/src/in_memory_event_store.dart | 28 ++- .../src/in_memory_persistence_adapter.dart | 7 + .../in_memory_persistence_adapter_test.dart | 52 +++++ .../lib/src/sembast_event_store.dart | 29 ++- .../lib/src/sembast_persistence_adapter.dart | 7 + packages/continuum_uow/lib/src/session.dart | 32 ++- .../continuum_uow/lib/src/session_base.dart | 18 ++ .../test/unit/session_base_test.dart | 81 +++++++ .../test/unit/transactional_runner_test.dart | 3 + 28 files changed, 696 insertions(+), 61 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c0efe8..d6d73b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added `deleteAsync(StreamId)` to `Session` for marking aggregates for deletion within a unit of work. No prior load is required — the stream ID alone is sufficient. +- Added `deleteAsync(StreamId)` to `TargetPersistenceAdapter` for physical entity deletion in state-based persistence. +- Added `softDeleteStreamAsync(StreamId)` to `EventStore` for tombstone-based stream deletion in event sourcing. +- `StateBasedSession.saveChangesAsync` now processes deletion-marked entities by calling `adapter.deleteAsync` and removing them from the identity map. +- Event sourcing `SessionImpl.saveChangesAsync` now soft-deletes marked streams via `EventStore.softDeleteStreamAsync`, writing a tombstone flag instead of rejecting deletion. +- `InMemoryEventStore`, `HiveEventStore`, and `SembastEventStore` implement `softDeleteStreamAsync` — soft-deleted streams are excluded from `loadStreamAsync` and `getStreamIdsByAggregateTypeAsync` but their events are retained for auditability. +- `InMemoryPersistenceAdapter`, `HivePersistenceAdapter`, and `SembastPersistenceAdapter` now implement `deleteAsync`. + ## [5.2.0] - 2026-02-20 diff --git a/packages/continuum/example/lib/store_state_based.dart b/packages/continuum/example/lib/store_state_based.dart index 2b92ae5..4ba6561 100644 --- a/packages/continuum/example/lib/store_state_based.dart +++ b/packages/continuum/example/lib/store_state_based.dart @@ -77,6 +77,15 @@ class FakeUserApiAdapter implements TargetPersistenceAdapter { '(${pendingOperations.length} event(s)) → 200 OK', ); } + + @override + Future deleteAsync(StreamId streamId) async { + // Simulate network latency. + await Future.delayed(const Duration(milliseconds: 50)); + + _backendDb.remove(streamId.value); + print(' [Backend] DELETE /users/${streamId.value} → 200 OK'); + } } /// Simple record representing server-side user state. diff --git a/packages/continuum/example/lib/store_state_based_local_db.dart b/packages/continuum/example/lib/store_state_based_local_db.dart index ba96906..d5a6080 100644 --- a/packages/continuum/example/lib/store_state_based_local_db.dart +++ b/packages/continuum/example/lib/store_state_based_local_db.dart @@ -135,6 +135,12 @@ final class UserLocalDbAdapter implements TargetPersistenceAdapter { 'deactivatedAt': target.deactivatedAt?.toIso8601String(), }); } + + @override + Future deleteAsync(StreamId streamId) async { + // Remove the target from the local database. + await _db.delete(streamId.value); + } } // ── Example ───────────────────────────────────────────────────────────────── diff --git a/packages/continuum/example/lib/store_state_based_transactional.dart b/packages/continuum/example/lib/store_state_based_transactional.dart index fa66c73..ffdf9ee 100644 --- a/packages/continuum/example/lib/store_state_based_transactional.dart +++ b/packages/continuum/example/lib/store_state_based_transactional.dart @@ -91,6 +91,14 @@ final class FakeBackendApi { print(' [Backend] PATCH /users/$id → 200 OK'); } + + /// Simulates `DELETE /users/:id` — removes a user from the backend. + Future deleteUserAsync(String id) async { + await Future.delayed(const Duration(milliseconds: 30)); + + _database.remove(id); + print(' [Backend] DELETE /users/$id → 200 OK'); + } } /// Simple record representing server-side user state. @@ -183,6 +191,12 @@ final class UserApiAdapter implements TargetPersistenceAdapter { } } } + + @override + Future deleteAsync(StreamId streamId) async { + // Delete the user from the backend. + await _api.deleteUserAsync(streamId.value); + } } // ── Example ───────────────────────────────────────────────────────────────── diff --git a/packages/continuum_event_sourcing/lib/src/persistence/event_store.dart b/packages/continuum_event_sourcing/lib/src/persistence/event_store.dart index 4fe78dc..0d10268 100644 --- a/packages/continuum_event_sourcing/lib/src/persistence/event_store.dart +++ b/packages/continuum_event_sourcing/lib/src/persistence/event_store.dart @@ -42,8 +42,20 @@ abstract interface class EventStore { /// passed to [appendEventsAsync] or [AtomicEventStore.appendEventsToStreamsAsync] /// when events were first persisted for a stream. /// - /// Returns an empty list if no streams match. + /// Returns an empty list if no streams match. Streams that have been + /// soft-deleted via [softDeleteStreamAsync] are excluded. Future> getStreamIdsByAggregateTypeAsync( String aggregateType, ); + + /// Marks a stream as deleted using a tombstone flag in stream + /// metadata. + /// + /// The events remain in the store but the stream is treated as + /// non-existent: [loadStreamAsync] returns an empty list and + /// [getStreamIdsByAggregateTypeAsync] excludes it. + /// + /// Must be idempotent — soft-deleting a non-existent or already + /// deleted stream is a no-op. + Future softDeleteStreamAsync(StreamId streamId); } diff --git a/packages/continuum_event_sourcing/lib/src/persistence/session_impl.dart b/packages/continuum_event_sourcing/lib/src/persistence/session_impl.dart index 25b0a1d..fa0efc8 100644 --- a/packages/continuum_event_sourcing/lib/src/persistence/session_impl.dart +++ b/packages/continuum_event_sourcing/lib/src/persistence/session_impl.dart @@ -267,6 +267,12 @@ final class SessionImpl extends SessionBase { for (var attempt = 0; attempt <= maxRetries; attempt++) { try { final batch = await _persistPendingEventsAsync(); + + // Soft-delete streams marked for deletion via tombstone metadata. + // This happens after successful event persistence so pending + // operations on the same stream are not lost. + await _softDeleteMarkedStreamsAsync(); + return batch; } on ConcurrencyException { final isLastAttempt = attempt == maxRetries; @@ -281,6 +287,20 @@ final class SessionImpl extends SessionBase { return CommitBatch.empty; } + /// Soft-deletes all streams in [streamsMarkedForDeletion] via + /// [EventStore.softDeleteStreamAsync] and removes them from the + /// identity map. + Future _softDeleteMarkedStreamsAsync() async { + // Copy to avoid concurrent modification during iteration. + final streamIds = Set.of(streamsMarkedForDeletion); + + for (final streamId in streamIds) { + await _eventStore.softDeleteStreamAsync(streamId); + trackedEntities.remove(streamId); + streamsMarkedForDeletion.remove(streamId); + } + } + /// Core persistence logic extracted so [saveChangesAsync] can retry it. Future _persistPendingEventsAsync() async { final pendingEntries = >[]; diff --git a/packages/continuum_event_sourcing/test/persistence/event_application_mode_test.mocks.dart b/packages/continuum_event_sourcing/test/persistence/event_application_mode_test.mocks.dart index 5039fb9..6adfe44 100644 --- a/packages/continuum_event_sourcing/test/persistence/event_application_mode_test.mocks.dart +++ b/packages/continuum_event_sourcing/test/persistence/event_application_mode_test.mocks.dart @@ -78,4 +78,13 @@ class MockEventStore extends _i1.Mock implements _i2.EventStore { ), ) as _i3.Future>); + + @override + _i3.Future softDeleteStreamAsync(_i5.StreamId? streamId) => + (super.noSuchMethod( + Invocation.method(#softDeleteStreamAsync, [streamId]), + returnValue: _i3.Future.value(), + returnValueForMissingStub: _i3.Future.value(), + ) + as _i3.Future); } diff --git a/packages/continuum_event_sourcing/test/persistence/session_apply_async_test.mocks.dart b/packages/continuum_event_sourcing/test/persistence/session_apply_async_test.mocks.dart index 3d2c091..8ee2664 100644 --- a/packages/continuum_event_sourcing/test/persistence/session_apply_async_test.mocks.dart +++ b/packages/continuum_event_sourcing/test/persistence/session_apply_async_test.mocks.dart @@ -78,4 +78,13 @@ class MockEventStore extends _i1.Mock implements _i2.EventStore { ), ) as _i3.Future>); + + @override + _i3.Future softDeleteStreamAsync(_i5.StreamId? streamId) => + (super.noSuchMethod( + Invocation.method(#softDeleteStreamAsync, [streamId]), + returnValue: _i3.Future.value(), + returnValueForMissingStub: _i3.Future.value(), + ) + as _i3.Future); } diff --git a/packages/continuum_event_sourcing/test/persistence/session_concurrency_retry_test.mocks.dart b/packages/continuum_event_sourcing/test/persistence/session_concurrency_retry_test.mocks.dart index 64d1d8d..12e4783 100644 --- a/packages/continuum_event_sourcing/test/persistence/session_concurrency_retry_test.mocks.dart +++ b/packages/continuum_event_sourcing/test/persistence/session_concurrency_retry_test.mocks.dart @@ -80,6 +80,15 @@ class MockEventStore extends _i1.Mock implements _i2.EventStore { ), ) as _i3.Future>); + + @override + _i3.Future softDeleteStreamAsync(_i5.StreamId? streamId) => + (super.noSuchMethod( + Invocation.method(#softDeleteStreamAsync, [streamId]), + returnValue: _i3.Future.value(), + returnValueForMissingStub: _i3.Future.value(), + ) + as _i3.Future); } /// A class which mocks [AtomicEventStore]. @@ -142,4 +151,13 @@ class MockAtomicEventStore extends _i1.Mock implements _i7.AtomicEventStore { ), ) as _i3.Future>); + + @override + _i3.Future softDeleteStreamAsync(_i5.StreamId? streamId) => + (super.noSuchMethod( + Invocation.method(#softDeleteStreamAsync, [streamId]), + returnValue: _i3.Future.value(), + returnValueForMissingStub: _i3.Future.value(), + ) + as _i3.Future); } diff --git a/packages/continuum_event_sourcing/test/persistence/session_load_all_async_test.mocks.dart b/packages/continuum_event_sourcing/test/persistence/session_load_all_async_test.mocks.dart index 23909c8..b58f8ad 100644 --- a/packages/continuum_event_sourcing/test/persistence/session_load_all_async_test.mocks.dart +++ b/packages/continuum_event_sourcing/test/persistence/session_load_all_async_test.mocks.dart @@ -80,6 +80,15 @@ class MockEventStore extends _i1.Mock implements _i2.EventStore { ), ) as _i3.Future>); + + @override + _i3.Future softDeleteStreamAsync(_i5.StreamId? streamId) => + (super.noSuchMethod( + Invocation.method(#softDeleteStreamAsync, [streamId]), + returnValue: _i3.Future.value(), + returnValueForMissingStub: _i3.Future.value(), + ) + as _i3.Future); } /// A class which mocks [AtomicEventStore]. @@ -142,4 +151,13 @@ class MockAtomicEventStore extends _i1.Mock implements _i7.AtomicEventStore { ), ) as _i3.Future>); + + @override + _i3.Future softDeleteStreamAsync(_i5.StreamId? streamId) => + (super.noSuchMethod( + Invocation.method(#softDeleteStreamAsync, [streamId]), + returnValue: _i3.Future.value(), + returnValueForMissingStub: _i3.Future.value(), + ) + as _i3.Future); } diff --git a/packages/continuum_event_sourcing/test/persistence/session_test.mocks.dart b/packages/continuum_event_sourcing/test/persistence/session_test.mocks.dart index 263f178..353f2f2 100644 --- a/packages/continuum_event_sourcing/test/persistence/session_test.mocks.dart +++ b/packages/continuum_event_sourcing/test/persistence/session_test.mocks.dart @@ -80,6 +80,15 @@ class MockEventStore extends _i1.Mock implements _i2.EventStore { ), ) as _i3.Future>); + + @override + _i3.Future softDeleteStreamAsync(_i5.StreamId? streamId) => + (super.noSuchMethod( + Invocation.method(#softDeleteStreamAsync, [streamId]), + returnValue: _i3.Future.value(), + returnValueForMissingStub: _i3.Future.value(), + ) + as _i3.Future); } /// A class which mocks [AtomicEventStore]. @@ -142,4 +151,13 @@ class MockAtomicEventStore extends _i1.Mock implements _i7.AtomicEventStore { ), ) as _i3.Future>); + + @override + _i3.Future softDeleteStreamAsync(_i5.StreamId? streamId) => + (super.noSuchMethod( + Invocation.method(#softDeleteStreamAsync, [streamId]), + returnValue: _i3.Future.value(), + returnValueForMissingStub: _i3.Future.value(), + ) + as _i3.Future); } diff --git a/packages/continuum_generator/lib/src/combining_builder.dart b/packages/continuum_generator/lib/src/combining_builder.dart index 74d495e..ff11982 100644 --- a/packages/continuum_generator/lib/src/combining_builder.dart +++ b/packages/continuum_generator/lib/src/combining_builder.dart @@ -154,7 +154,7 @@ class CombiningBuilder implements Builder { buffer.writeln('/// Backward-compatible alias for [\$targetList].'); buffer.writeln('///'); buffer.writeln('/// This package historically exposed discovered targets as `\$aggregateList`.'); - buffer.writeln("@Deprecated('Use \$targetList instead.')"); + buffer.writeln("@Deprecated('Use \targetList instead.')"); buffer.writeln('final List \$aggregateList = \$targetList;'); buffer.writeln(); } diff --git a/packages/continuum_state/lib/src/persistence/state_based_session.dart b/packages/continuum_state/lib/src/persistence/state_based_session.dart index 7d548f4..f7dc4ba 100644 --- a/packages/continuum_state/lib/src/persistence/state_based_session.dart +++ b/packages/continuum_state/lib/src/persistence/state_based_session.dart @@ -90,15 +90,19 @@ final class StateBasedSession extends SessionBase { @override Future saveChangesAsync({int maxRetries = 1}) async { - // Collect all entities with pending operations. + // Collect entities with pending operations, excluding those marked + // for deletion (deletions are handled separately below). final pendingEntries = >[]; for (final entry in trackedEntities.entries) { - if (entry.value.pendingOperations.isNotEmpty) { + if (!streamsMarkedForDeletion.contains(entry.key) && + entry.value.pendingOperations.isNotEmpty) { pendingEntries.add(entry); } } - if (pendingEntries.isEmpty) return CommitBatch.empty; + if (pendingEntries.isEmpty && streamsMarkedForDeletion.isEmpty) { + return CommitBatch.empty; + } // In deferred mode, apply all pending operations to entities before // persisting so the adapter receives the post-operation state. @@ -140,13 +144,31 @@ final class StateBasedSession extends SessionBase { } } + // Delete each stream that was marked for deletion via its adapter. + final deletionStreamIds = Set.of(streamsMarkedForDeletion); + for (final streamId in deletionStreamIds) { + try { + final adapter = _resolveAdapterForDeletion(streamId); + await adapter.deleteAsync(streamId); + savedStreams.add(streamId); + + // Remove the entity from the identity map and the deletion + // set after successful deletion. + trackedEntities.remove(streamId); + streamsMarkedForDeletion.remove(streamId); + } on Exception catch (exception) { + failedStreams.add(streamId); + firstFailure ??= exception; + } + } + // Determine what to throw (if anything). if (failedStreams.isNotEmpty) { _handleSaveFailure( savedStreams: savedStreams, failedStreams: failedStreams, firstFailure: firstFailure!, - totalStreams: pendingEntries.length, + totalStreams: pendingEntries.length + deletionStreamIds.length, ); } @@ -240,4 +262,38 @@ final class StateBasedSession extends SessionBase { } return adapter; } + + /// Resolves the adapter to use for deleting a stream. + /// + /// If the entity is tracked, uses the tracked entity type for an + /// exact adapter lookup. Otherwise falls back to the first + /// registered adapter — deletion is idempotent, so calling it on + /// an adapter whose backend has no matching key is a safe no-op. + /// + /// Throws [InvalidOperationException] when no adapters are + /// registered at all. + TargetPersistenceAdapter _resolveAdapterForDeletion( + StreamId streamId, + ) { + // Prefer the tracked entity type for precise resolution. + final state = trackedEntities[streamId]; + if (state != null) { + final adapter = _adapters[state.entityType]; + if (adapter != null) return adapter; + } + + // Untracked stream — use the single registered adapter. + // Multi-adapter sessions that delete untracked streams are a + // rare edge case; for those, the caller should load first to + // disambiguate. + if (_adapters.isEmpty) { + throw const InvalidOperationException( + message: + 'Cannot delete stream: no TargetPersistenceAdapter is ' + 'registered in this session.', + ); + } + + return _adapters.values.first; + } } diff --git a/packages/continuum_state/lib/src/persistence/target_persistence_adapter.dart b/packages/continuum_state/lib/src/persistence/target_persistence_adapter.dart index ea1c74a..1686295 100644 --- a/packages/continuum_state/lib/src/persistence/target_persistence_adapter.dart +++ b/packages/continuum_state/lib/src/persistence/target_persistence_adapter.dart @@ -30,4 +30,14 @@ abstract interface class TargetPersistenceAdapter { TTarget target, List pendingOperations, ); + + /// Deletes a target from the backend. + /// + /// Removes the entity identified by [streamId] from the underlying + /// store. Called during commit when a session marks a stream for + /// deletion via [Session.deleteAsync]. + /// + /// Must be idempotent: deleting a non-existent target should not + /// throw. Throws on transient failures (network errors, etc.). + Future deleteAsync(StreamId streamId); } \ No newline at end of file diff --git a/packages/continuum_state/test/persistence/state_based_session_test.dart b/packages/continuum_state/test/persistence/state_based_session_test.dart index 101094f..2aa100a 100644 --- a/packages/continuum_state/test/persistence/state_based_session_test.dart +++ b/packages/continuum_state/test/persistence/state_based_session_test.dart @@ -39,7 +39,7 @@ void main() { group('loadAsync', () { test( - '5.1 first call fetches from adapter, second call returns cached', + 'First call fetches from adapter, second call returns cached', () async { // Arrange final session = createSession(); @@ -62,7 +62,7 @@ void main() { ); test( - '5.2 throws InvalidOperationException when no adapter registered', + 'Throws InvalidOperationException when no adapter registered', () async { // Arrange — session has no adapters registered. final session = createSession(adapters: {}); @@ -76,7 +76,7 @@ void main() { }, ); - test('5.3 rethrows StreamNotFoundException from adapter', () async { + test('Rethrows StreamNotFoundException from adapter', () async { // Arrange final session = createSession(); const streamId = StreamId('missing-1'); @@ -95,7 +95,7 @@ void main() { group('applyAsync', () { test( - '5.4 creation event creates aggregate via factory without calling adapter', + 'Creation event creates aggregate via factory without calling adapter', () async { // Arrange final session = createSession(); @@ -118,7 +118,7 @@ void main() { ); test( - '5.5 mutation event on untracked stream calls fetchAsync first', + 'Mutation event on untracked stream calls fetchAsync first', () async { // Arrange final session = createSession(); @@ -147,7 +147,7 @@ void main() { ); test( - '5.6 event on already-tracked stream applies directly without adapter', + 'Event on already-tracked stream applies directly without adapter', () async { // Arrange — load stream so it's tracked. final session = createSession(); @@ -180,7 +180,7 @@ void main() { ); test( - '5.7 creation event on already-tracked stream throws InvalidOperationException', + 'Creation event on already-tracked stream throws InvalidOperationException', () async { // Arrange — create a stream so it's tracked. final session = createSession(); @@ -204,7 +204,7 @@ void main() { }); group('EventApplicationMode', () { - test('5.8 eager mode applies event immediately', () async { + test('Eager mode applies event immediately', () async { // Arrange — eager mode is the default. final session = createSession( applicationMode: EventApplicationMode.eager, @@ -225,7 +225,7 @@ void main() { expect(counter.value, equals(15)); }); - test('5.9 deferred mode records event but aggregate unchanged', () async { + test('Deferred mode records event but aggregate unchanged', () async { // Arrange — deferred mode. final session = createSession( applicationMode: EventApplicationMode.deferred, @@ -249,7 +249,7 @@ void main() { group('saveChangesAsync', () { test( - '5.10 calls persistAsync with aggregate and pending operations', + 'Calls persistAsync with aggregate and pending operations', () async { // Arrange final session = createSession(); @@ -288,7 +288,7 @@ void main() { }, ); - test('5.11 deferred mode applies events before persistAsync', () async { + test('Deferred mode applies events before persistAsync', () async { // Arrange final session = createSession( applicationMode: EventApplicationMode.deferred, @@ -325,7 +325,7 @@ void main() { }); test( - '5.12 no pending operations completes without calling adapter', + 'No pending operations completes without calling adapter', () async { // Arrange final session = createSession(); @@ -338,7 +338,7 @@ void main() { }, ); - test('5.13 pending operations cleared after successful save', () async { + test('Pending operations cleared after successful save', () async { // Arrange final session = createSession(); const streamId = StreamId('counter-1'); @@ -361,7 +361,7 @@ void main() { verify(mockAdapter.persistAsync(any, any, any)).called(1); }); - test('5.14 multiple streams each call persistAsync independently', () async { + test('Multiple streams each call persistAsync independently', () async { // Arrange — two separate streams share the same adapter. final session = createSession(); const streamId1 = StreamId('counter-1'); @@ -388,9 +388,9 @@ void main() { }); }); - group('concurrency retry', () { + group('Concurrency retry', () { test( - '5.15 ConcurrencyException triggers re-fetch, re-apply, and retry', + 'ConcurrencyException triggers re-fetch, re-apply, and retry', () async { // Arrange final session = createSession(); @@ -438,7 +438,7 @@ void main() { ); test( - '5.16 exhausted retries rethrows ConcurrencyException', + 'Exhausted retries rethrows ConcurrencyException', () async { // Arrange final session = createSession(); @@ -474,7 +474,7 @@ void main() { }, ); - test('5.17 maxRetries parameter controls attempt count', () async { + test('MaxRetries parameter controls attempt count', () async { // Arrange final session = createSession(); const streamId = StreamId('counter-1'); @@ -511,9 +511,9 @@ void main() { }); }); - group('partial save', () { + group('Partial save', () { test( - '5.18 first stream succeeds, second fails → PartialSaveException', + 'First stream succeeds, second fails → PartialSaveException', () async { // Arrange final session = createSession(); @@ -563,7 +563,7 @@ void main() { ); test( - '5.19 saved streams cleared, failed streams retain pending operations', + 'Saved streams cleared, failed streams retain pending operations', () async { // Arrange final session = createSession(); @@ -610,7 +610,7 @@ void main() { ); test( - '5.20 single-stream failure rethrows original exception', + 'Single-stream failure rethrows original exception', () async { // Arrange — only one stream has pending operations. final session = createSession(); @@ -636,7 +636,7 @@ void main() { ); test( - '5.21 all-streams-fail rethrows first failure', + 'All-streams-fail rethrows first failure', () async { // Arrange — two streams, both fail. final session = createSession(); @@ -668,9 +668,9 @@ void main() { ); }); - group('exception propagation', () { + group('Exception propagation', () { test( - '5.22 TransientAdapterException propagates unchanged from persistAsync', + 'TransientAdapterException propagates unchanged from persistAsync', () async { // Arrange final session = createSession(); @@ -696,7 +696,7 @@ void main() { ); test( - '5.23 PermanentAdapterException propagates unchanged from persistAsync', + 'PermanentAdapterException propagates unchanged from persistAsync', () async { // Arrange final session = createSession(); @@ -722,7 +722,7 @@ void main() { ); test( - '5.24 unknown exception propagates unchanged from persistAsync', + 'Unknown exception propagates unchanged from persistAsync', () async { // Arrange final session = createSession(); @@ -748,8 +748,8 @@ void main() { ); }); - group('discard', () { - test('5.25 discardStream clears pending operations for one stream', () async { + group('Discard', () { + test('DiscardStream clears pending operations for one stream', () async { // Arrange final session = createSession(); const streamId = StreamId('counter-1'); @@ -767,7 +767,7 @@ void main() { verifyNever(mockAdapter.persistAsync(any, any, any)); }); - test('5.26 discardAll clears pending operations for all streams', () async { + test('DiscardAll clears pending operations for all streams', () async { // Arrange final session = createSession(); const streamId1 = StreamId('counter-1'); @@ -790,4 +790,143 @@ void main() { verifyNever(mockAdapter.persistAsync(any, any, any)); }); }); + + group('DeleteAsync', () { + test('Deleting a loaded entity calls adapter.deleteAsync on save', () async { + // Arrange — load an entity, then mark it for deletion. + final session = createSession(); + const streamId = StreamId('counter-1'); + final counter = Counter(42); + + when(mockAdapter.fetchAsync(streamId)).thenAnswer((_) async => counter); + when(mockAdapter.deleteAsync(streamId)).thenAnswer((_) async {}); + + await session.loadAsync(streamId); + + // Act — mark for deletion and commit. + session.deleteAsync(streamId); + await session.saveChangesAsync(); + + // Assert — deleteAsync was called on the adapter, persistAsync was not. + verify(mockAdapter.deleteAsync(streamId)).called(1); + verifyNever(mockAdapter.persistAsync(any, any, any)); + }); + + test('Deleting a created entity calls adapter.deleteAsync on save', () async { + // Arrange — create an entity via applyAsync, then mark for deletion. + final session = createSession(); + const streamId = StreamId('counter-1'); + + when(mockAdapter.deleteAsync(streamId)).thenAnswer((_) async {}); + + await session.applyAsync( + streamId, + CounterCreated(eventId: const EventId('e-1'), initial: 10), + ); + + // Act — mark for deletion and commit. + session.deleteAsync(streamId); + await session.saveChangesAsync(); + + // Assert — deleteAsync was called; the creation was never persisted + // because the entity was marked for deletion before commit. + verify(mockAdapter.deleteAsync(streamId)).called(1); + verifyNever(mockAdapter.persistAsync(any, any, any)); + }); + + test('Deleting removes entity from identity map after save', () async { + // Arrange — load, mark for deletion, save. + final session = createSession(); + const streamId = StreamId('counter-1'); + final counter = Counter(42); + + when(mockAdapter.fetchAsync(streamId)).thenAnswer((_) async => counter); + when(mockAdapter.deleteAsync(streamId)).thenAnswer((_) async {}); + + await session.loadAsync(streamId); + session.deleteAsync(streamId); + await session.saveChangesAsync(); + + // Act — loading the same stream again should call fetchAsync + // because it is no longer in the identity map. + final freshCounter = Counter(99); + when(mockAdapter.fetchAsync(streamId)).thenAnswer((_) async => freshCounter); + + final reloaded = await session.loadAsync(streamId); + + // Assert — the reloaded entity is the fresh one, not the deleted one. + expect(reloaded.value, equals(99)); + verify(mockAdapter.fetchAsync(streamId)).called(2); + }); + + test('DeleteAsync accepts untracked stream without throwing', () async { + // Arrange + final session = createSession(); + const streamId = StreamId('unknown-stream'); + + when(mockAdapter.deleteAsync(streamId)).thenAnswer((_) async {}); + + // Act — deleting an untracked stream should not throw. + session.deleteAsync(streamId); + await session.saveChangesAsync(); + + // Assert — adapter.deleteAsync was called (idempotent no-op on + // the backend side). + verify(mockAdapter.deleteAsync(streamId)).called(1); + }); + + test('DeleteAsync failure does not remove entity from identity map', () async { + // Arrange — load entity, mark for deletion, but adapter throws. + final session = createSession(); + const streamId = StreamId('counter-1'); + final counter = Counter(42); + + when(mockAdapter.fetchAsync(streamId)).thenAnswer((_) async => counter); + when(mockAdapter.deleteAsync(streamId)).thenThrow( + StateError('Backend unavailable'), + ); + + await session.loadAsync(streamId); + session.deleteAsync(streamId); + + // Act & Assert — save should throw due to adapter failure. + expect( + () => session.saveChangesAsync(), + throwsA(isA()), + ); + }); + + test('Save with mixed persist and delete handles both', () async { + // Arrange — create two entities, delete one, persist the other. + final session = createSession(); + const streamId1 = StreamId('counter-1'); + const streamId2 = StreamId('counter-2'); + final counter2 = Counter(50); + + when(mockAdapter.fetchAsync(streamId2)).thenAnswer((_) async => counter2); + when(mockAdapter.deleteAsync(streamId1)).thenAnswer((_) async {}); + when(mockAdapter.persistAsync(any, any, any)).thenAnswer((_) async {}); + + // Create counter-1 and mark for deletion. + await session.applyAsync( + streamId1, + CounterCreated(eventId: const EventId('e-1'), initial: 10), + ); + session.deleteAsync(streamId1); + + // Load counter-2 and apply a mutation. + await session.loadAsync(streamId2); + await session.applyAsync( + streamId2, + CounterIncremented(eventId: const EventId('e-2'), amount: 5), + ); + + // Act — save should handle both persist and delete. + await session.saveChangesAsync(); + + // Assert — deleteAsync for counter-1, persistAsync for counter-2. + verify(mockAdapter.deleteAsync(streamId1)).called(1); + verify(mockAdapter.persistAsync(streamId2, any, any)).called(1); + }); + }); } diff --git a/packages/continuum_state/test/persistence/state_based_session_test.mocks.dart b/packages/continuum_state/test/persistence/state_based_session_test.mocks.dart index 4b1e324..3b8ce8b 100644 --- a/packages/continuum_state/test/persistence/state_based_session_test.mocks.dart +++ b/packages/continuum_state/test/persistence/state_based_session_test.mocks.dart @@ -6,7 +6,8 @@ import 'dart:async' as _i4; import 'package:continuum/continuum.dart' as _i5; -import 'package:continuum_state/src/persistence/target_persistence_adapter.dart' as _i2; +import 'package:continuum_state/src/persistence/target_persistence_adapter.dart' + as _i2; import 'package:mockito/mockito.dart' as _i1; import 'package:mockito/src/dummies.dart' as _i6; @@ -30,7 +31,8 @@ import '../_fixtures/counter_fixtures.dart' as _i3; /// A class which mocks [TargetPersistenceAdapter]. /// /// See the documentation for Mockito's code generation for more information. -class MockTargetPersistenceAdapter extends _i1.Mock implements _i2.TargetPersistenceAdapter<_i3.Counter> { +class MockTargetPersistenceAdapter extends _i1.Mock + implements _i2.TargetPersistenceAdapter<_i3.Counter> { @override _i4.Future<_i3.Counter> fetchAsync(_i5.StreamId? streamId) => (super.noSuchMethod( @@ -53,17 +55,26 @@ class MockTargetPersistenceAdapter extends _i1.Mock implements _i2.TargetPersist @override _i4.Future persistAsync( _i5.StreamId? streamId, - _i3.Counter? aggregate, + _i3.Counter? target, List<_i5.Operation>? pendingOperations, ) => (super.noSuchMethod( Invocation.method(#persistAsync, [ streamId, - aggregate, + target, pendingOperations, ]), returnValue: _i4.Future.value(), returnValueForMissingStub: _i4.Future.value(), ) as _i4.Future); + + @override + _i4.Future deleteAsync(_i5.StreamId? streamId) => + (super.noSuchMethod( + Invocation.method(#deleteAsync, [streamId]), + returnValue: _i4.Future.value(), + returnValueForMissingStub: _i4.Future.value(), + ) + as _i4.Future); } diff --git a/packages/continuum_state/test/persistence/state_based_store_test.mocks.dart b/packages/continuum_state/test/persistence/state_based_store_test.mocks.dart index 9b3d8e9..f7bbfc8 100644 --- a/packages/continuum_state/test/persistence/state_based_store_test.mocks.dart +++ b/packages/continuum_state/test/persistence/state_based_store_test.mocks.dart @@ -6,7 +6,8 @@ import 'dart:async' as _i4; import 'package:continuum/continuum.dart' as _i5; -import 'package:continuum_state/continuum_state.dart' as _i2; +import 'package:continuum_state/src/persistence/target_persistence_adapter.dart' + as _i2; import 'package:mockito/mockito.dart' as _i1; import 'package:mockito/src/dummies.dart' as _i6; @@ -30,7 +31,8 @@ import '../_fixtures/counter_fixtures.dart' as _i3; /// A class which mocks [TargetPersistenceAdapter]. /// /// See the documentation for Mockito's code generation for more information. -class MockTargetPersistenceAdapter extends _i1.Mock implements _i2.TargetPersistenceAdapter<_i3.Counter> { +class MockTargetPersistenceAdapter extends _i1.Mock + implements _i2.TargetPersistenceAdapter<_i3.Counter> { @override _i4.Future<_i3.Counter> fetchAsync(_i5.StreamId? streamId) => (super.noSuchMethod( @@ -53,17 +55,26 @@ class MockTargetPersistenceAdapter extends _i1.Mock implements _i2.TargetPersist @override _i4.Future persistAsync( _i5.StreamId? streamId, - _i3.Counter? aggregate, + _i3.Counter? target, List<_i5.Operation>? pendingOperations, ) => (super.noSuchMethod( Invocation.method(#persistAsync, [ streamId, - aggregate, + target, pendingOperations, ]), returnValue: _i4.Future.value(), returnValueForMissingStub: _i4.Future.value(), ) as _i4.Future); + + @override + _i4.Future deleteAsync(_i5.StreamId? streamId) => + (super.noSuchMethod( + Invocation.method(#deleteAsync, [streamId]), + returnValue: _i4.Future.value(), + returnValueForMissingStub: _i4.Future.value(), + ) + as _i4.Future); } diff --git a/packages/continuum_store_hive/lib/src/hive_event_store.dart b/packages/continuum_store_hive/lib/src/hive_event_store.dart index 338bbd0..3a07c7a 100644 --- a/packages/continuum_store_hive/lib/src/hive_event_store.dart +++ b/packages/continuum_store_hive/lib/src/hive_event_store.dart @@ -34,6 +34,14 @@ final class HiveEventStore implements AtomicEventStore, ProjectionEventStore { /// enabling [getStreamIdsByAggregateTypeAsync] queries. final Box _aggregateTypesBox; + /// The Hive box for storing soft-deletion tombstone flags. + /// + /// Maps stream ID values to `true` when the stream has been + /// soft-deleted via [softDeleteStreamAsync]. Streams present in + /// this box are excluded from [loadStreamAsync] and + /// [getStreamIdsByAggregateTypeAsync]. + final Box _deletedStreamsBox; + /// Global sequence counter for ordered projections. int _globalSequence; @@ -46,11 +54,13 @@ final class HiveEventStore implements AtomicEventStore, ProjectionEventStore { required Box streamsBox, required Box transactionsBox, required Box aggregateTypesBox, + required Box deletedStreamsBox, required int globalSequence, }) : _eventsBox = eventsBox, _streamsBox = streamsBox, _transactionsBox = transactionsBox, _aggregateTypesBox = aggregateTypesBox, + _deletedStreamsBox = deletedStreamsBox, _globalSequence = globalSequence, _exclusiveOperation = Future.value(); @@ -63,6 +73,7 @@ final class HiveEventStore implements AtomicEventStore, ProjectionEventStore { final streamsBox = await Hive.openBox('${boxName}_streams'); final transactionsBox = await Hive.openBox('$boxName$_transactionsBoxSuffix'); final aggregateTypesBox = await Hive.openBox('${boxName}_aggregate_types'); + final deletedStreamsBox = await Hive.openBox('${boxName}_deleted_streams'); await _recoverIncompleteTransactionsAsync( eventsBox: eventsBox, @@ -78,6 +89,7 @@ final class HiveEventStore implements AtomicEventStore, ProjectionEventStore { streamsBox: streamsBox, transactionsBox: transactionsBox, aggregateTypesBox: aggregateTypesBox, + deletedStreamsBox: deletedStreamsBox, globalSequence: globalSequence, ); } @@ -140,6 +152,11 @@ final class HiveEventStore implements AtomicEventStore, ProjectionEventStore { @override Future> loadStreamAsync(StreamId streamId) async { return _runExclusiveAsync>(() async { + // Soft-deleted streams are treated as non-existent. + if (_deletedStreamsBox.containsKey(streamId.value)) { + return []; + } + // Get the current version for this stream final int? currentVersion = _streamsBox.get(streamId.value); if (currentVersion == null) { @@ -299,6 +316,7 @@ final class HiveEventStore implements AtomicEventStore, ProjectionEventStore { await _streamsBox.close(); await _transactionsBox.close(); await _aggregateTypesBox.close(); + await _deletedStreamsBox.close(); }); } @@ -309,15 +327,26 @@ final class HiveEventStore implements AtomicEventStore, ProjectionEventStore { return _runExclusiveAsync>(() async { final List result = []; for (final dynamic key in _aggregateTypesBox.keys) { - final String? storedType = _aggregateTypesBox.get(key); - if (storedType == aggregateType) { - result.add(StreamId(key as String)); + final String keyString = key as String; + final String? storedType = _aggregateTypesBox.get(keyString); + // Exclude soft-deleted streams from type-based queries. + if (storedType == aggregateType && !_deletedStreamsBox.containsKey(keyString)) { + result.add(StreamId(keyString)); } } return result; }); } + @override + Future softDeleteStreamAsync(StreamId streamId) async { + return _runExclusiveAsync(() async { + // Idempotent — marking an already-deleted or non-existent stream + // is a no-op (Hive put is naturally idempotent). + await _deletedStreamsBox.put(streamId.value, true); + }); + } + /// Creates a composite key for an event. String _eventKey(StreamId streamId, int version) => '${streamId.value}:$version'; diff --git a/packages/continuum_store_hive/lib/src/hive_persistence_adapter.dart b/packages/continuum_store_hive/lib/src/hive_persistence_adapter.dart index 1a09c98..b4dee4a 100644 --- a/packages/continuum_store_hive/lib/src/hive_persistence_adapter.dart +++ b/packages/continuum_store_hive/lib/src/hive_persistence_adapter.dart @@ -78,6 +78,13 @@ final class HivePersistenceAdapter implements TargetPersistenceAdapt await _box.put(streamId.value, json); } + @override + Future deleteAsync(StreamId streamId) async { + // Remove the target from the Hive box. Idempotent — deleting a + // non-existent key is a no-op in Hive. + await _box.delete(streamId.value); + } + /// Closes the underlying Hive box. /// /// Call this when the adapter is no longer needed to release resources. diff --git a/packages/continuum_store_memory/lib/src/in_memory_event_store.dart b/packages/continuum_store_memory/lib/src/in_memory_event_store.dart index e00857e..4171f0b 100644 --- a/packages/continuum_store_memory/lib/src/in_memory_event_store.dart +++ b/packages/continuum_store_memory/lib/src/in_memory_event_store.dart @@ -15,11 +15,23 @@ final class InMemoryEventStore implements AtomicEventStore, ProjectionEventStore /// Per-stream aggregate type metadata for type-based lookups. final Map _aggregateTypes = {}; + /// Tombstone set — stream IDs that have been soft-deleted. + /// + /// Soft-deleted streams are excluded from [loadStreamAsync] and + /// [getStreamIdsByAggregateTypeAsync] but their events remain in + /// [_streams] for auditability. + final Set _deletedStreams = {}; + /// Global sequence counter for optional global ordering. int _globalSequence = 0; @override Future> loadStreamAsync(StreamId streamId) async { + // Soft-deleted streams are treated as non-existent. + if (_deletedStreams.contains(streamId)) { + return []; + } + // Return a copy of the events to prevent external modification final events = _streams[streamId]; if (events == null) { @@ -113,8 +125,19 @@ final class InMemoryEventStore implements AtomicEventStore, ProjectionEventStore Future> getStreamIdsByAggregateTypeAsync( String aggregateType, ) async { - // Filter the aggregate type map for streams matching the given type. - return _aggregateTypes.entries.where((entry) => entry.value == aggregateType).map((entry) => entry.key).toList(); + // Filter the aggregate type map for streams matching the given type, + // excluding soft-deleted streams. + return _aggregateTypes.entries + .where((entry) => entry.value == aggregateType && !_deletedStreams.contains(entry.key)) + .map((entry) => entry.key) + .toList(); + } + + @override + Future softDeleteStreamAsync(StreamId streamId) async { + // Idempotent — adding an already-deleted or non-existent stream + // is a no-op. + _deletedStreams.add(streamId); } /// Validates optimistic concurrency expectations. @@ -144,6 +167,7 @@ final class InMemoryEventStore implements AtomicEventStore, ProjectionEventStore void clear() { _streams.clear(); _aggregateTypes.clear(); + _deletedStreams.clear(); _globalSequence = 0; } diff --git a/packages/continuum_store_memory/lib/src/in_memory_persistence_adapter.dart b/packages/continuum_store_memory/lib/src/in_memory_persistence_adapter.dart index 63d77f2..7b0f2f3 100644 --- a/packages/continuum_store_memory/lib/src/in_memory_persistence_adapter.dart +++ b/packages/continuum_store_memory/lib/src/in_memory_persistence_adapter.dart @@ -57,6 +57,13 @@ final class InMemoryPersistenceAdapter implements TargetPersistenceA _storage[streamId.value] = _toJson(target); } + @override + Future deleteAsync(StreamId streamId) async { + // Remove the target from in-memory storage. Idempotent — removing + // a non-existent key is a no-op. + _storage.remove(streamId.value); + } + /// The number of stored targets. int get length => _storage.length; diff --git a/packages/continuum_store_memory/test/in_memory_persistence_adapter_test.dart b/packages/continuum_store_memory/test/in_memory_persistence_adapter_test.dart index 393437c..5c0edfc 100644 --- a/packages/continuum_store_memory/test/in_memory_persistence_adapter_test.dart +++ b/packages/continuum_store_memory/test/in_memory_persistence_adapter_test.dart @@ -207,6 +207,58 @@ void main() { expect(fetched.age, isNull); }); }); + + group('deleteAsync', () { + test('removes a stored aggregate', () async { + // Arrange — store a profile, then delete it. + const streamId = StreamId('profile-del'); + final profile = _Profile(id: 'profile-del', name: 'Deleted', age: 1); + await adapter.persistAsync(streamId, profile, []); + + // Act — delete the stored aggregate. + await adapter.deleteAsync(streamId); + + // Assert — fetch should throw because the entity is gone. + await expectLater( + adapter.fetchAsync(streamId), + throwsA(isA()), + ); + expect(adapter.length, equals(0)); + }); + + test('is idempotent for non-existent stream', () async { + // Arrange — empty store. + const streamId = StreamId('nonexistent'); + + // Act & Assert — deleting a missing key should not throw. + await adapter.deleteAsync(streamId); + expect(adapter.length, equals(0)); + }); + + test('only removes the targeted stream', () async { + // Arrange — store two profiles. + const streamId1 = StreamId('keep'); + const streamId2 = StreamId('remove'); + await adapter.persistAsync( + streamId1, + _Profile(id: 'keep', name: 'Keep', age: 1), + [], + ); + await adapter.persistAsync( + streamId2, + _Profile(id: 'remove', name: 'Remove', age: 2), + [], + ); + + // Act — delete only the second profile. + await adapter.deleteAsync(streamId2); + + // Assert — first profile is still there, second is gone. + final fetched = await adapter.fetchAsync(streamId1); + expect(fetched.name, equals('Keep')); + expect(adapter.length, equals(1)); + }); + }); }); } diff --git a/packages/continuum_store_sembast/lib/src/sembast_event_store.dart b/packages/continuum_store_sembast/lib/src/sembast_event_store.dart index 3b6c8a3..5e7cd6f 100644 --- a/packages/continuum_store_sembast/lib/src/sembast_event_store.dart +++ b/packages/continuum_store_sembast/lib/src/sembast_event_store.dart @@ -26,6 +26,9 @@ final class SembastEventStore implements AtomicEventStore, ProjectionEventStore /// Store name for per-stream aggregate type metadata. static const String _aggregateTypesStoreName = 'aggregate_types'; + /// Store name for soft-deletion tombstone flags. + static const String _deletedStreamsStoreName = 'deleted_streams'; + /// The Sembast database instance. final Database _database; @@ -41,6 +44,12 @@ final class SembastEventStore implements AtomicEventStore, ProjectionEventStore /// each stream, enabling [getStreamIdsByAggregateTypeAsync] queries. final StoreRef _aggregateTypesStore; + /// The Sembast store for soft-deletion tombstone flags, keyed by stream ID. + /// + /// Streams present in this store are treated as deleted and excluded + /// from [loadStreamAsync] and [getStreamIdsByAggregateTypeAsync]. + final StoreRef _deletedStreamsStore; + /// Global sequence counter for ordered projections. int _globalSequence; @@ -50,6 +59,7 @@ final class SembastEventStore implements AtomicEventStore, ProjectionEventStore _eventsStore = StoreRef(_eventsStoreName), _streamsStore = StoreRef(_streamsStoreName), _aggregateTypesStore = StoreRef(_aggregateTypesStoreName), + _deletedStreamsStore = StoreRef(_deletedStreamsStoreName), _globalSequence = globalSequence; /// Opens a Sembast event store using the provided [databaseFactory] and [dbPath]. @@ -101,6 +111,12 @@ final class SembastEventStore implements AtomicEventStore, ProjectionEventStore @override Future> loadStreamAsync(StreamId streamId) async { + // Soft-deleted streams are treated as non-existent. + final bool? isDeleted = await _deletedStreamsStore.record(streamId.value).get(_database); + if (isDeleted == true) { + return []; + } + // Look up the current version for this stream. final RecordSnapshot? streamRecord = await _streamsStore.record(streamId.value).getSnapshot(_database); @@ -291,7 +307,18 @@ final class SembastEventStore implements AtomicEventStore, ProjectionEventStore // Scan the aggregate types store for streams matching the given type. final List> records = await _aggregateTypesStore.find(_database); - return records.where((record) => record.value == aggregateType).map((record) => StreamId(record.key)).toList(); + // Load the set of soft-deleted stream IDs so they can be excluded. + final List> deletedRecords = await _deletedStreamsStore.find(_database); + final Set deletedStreamIds = deletedRecords.map((record) => record.key).toSet(); + + return records.where((record) => record.value == aggregateType && !deletedStreamIds.contains(record.key)).map((record) => StreamId(record.key)).toList(); + } + + @override + Future softDeleteStreamAsync(StreamId streamId) async { + // Idempotent — marking an already-deleted or non-existent stream + // is a no-op (Sembast put is naturally idempotent). + await _deletedStreamsStore.record(streamId.value).put(_database, true); } // --------------------------------------------------------------------------- diff --git a/packages/continuum_store_sembast/lib/src/sembast_persistence_adapter.dart b/packages/continuum_store_sembast/lib/src/sembast_persistence_adapter.dart index 0363c43..ecfc462 100644 --- a/packages/continuum_store_sembast/lib/src/sembast_persistence_adapter.dart +++ b/packages/continuum_store_sembast/lib/src/sembast_persistence_adapter.dart @@ -71,6 +71,13 @@ final class SembastPersistenceAdapter implements TargetPersistenceAd await _store.record(streamId.value).put(_database, json); } + @override + Future deleteAsync(StreamId streamId) async { + // Remove the target from the Sembast store. Idempotent — deleting + // a non-existent record is a no-op in Sembast. + await _store.record(streamId.value).delete(_database); + } + /// Returns the number of stored targets. Future countAsync() async { return _store.count(_database); diff --git a/packages/continuum_uow/lib/src/session.dart b/packages/continuum_uow/lib/src/session.dart index 56fea5a..0461355 100644 --- a/packages/continuum_uow/lib/src/session.dart +++ b/packages/continuum_uow/lib/src/session.dart @@ -11,24 +11,24 @@ abstract interface class Session { /// Loads a single aggregate by its [streamId]. /// /// Returns the aggregate instance hydrated from persistence. The - /// aggregate is cached in the identity map — subsequent calls with + /// target is cached in the identity map — subsequent calls with /// the same [streamId] return the cached instance. - Future loadAsync(StreamId streamId); + Future loadAsync(StreamId streamId); - /// Loads all aggregates of type [TAggregate]. + /// Loads all targets of type [TTarget]. /// - /// Returns all aggregate instances of the requested type from - /// persistence. Each loaded aggregate is cached in the identity map. - Future> loadAllAsync(); + /// Returns all target instances of the requested type from + /// persistence. Each loaded target is cached in the identity map. + Future> loadAllAsync(); - /// Applies an [operation] to the aggregate identified by [streamId]. + /// Applies an [operation] to the target identified by [streamId]. /// /// The operation parameter type is [Operation], allowing both /// [ContinuumEvent] instances and plain [Operation] implementations /// to be used interchangeably. /// - /// Returns the aggregate instance after the operation is processed. - Future applyAsync( + /// Returns the target instance after the operation is processed. + Future applyAsync( StreamId streamId, Operation operation, ); @@ -41,15 +41,25 @@ abstract interface class Session { /// many times the save should be retried on concurrency conflicts. Future saveChangesAsync({int maxRetries = 1}); + /// Marks the target identified by [streamId] for deletion. + /// + /// The target does not need to be loaded first — the stream ID + /// alone is sufficient. If the target is tracked in the identity + /// map it will be removed on save; otherwise the deletion is + /// forwarded directly to the persistence backend. + /// + /// The actual deletion happens when [saveChangesAsync] is called. + void deleteAsync(StreamId streamId); + /// Discards all pending operations for the given [streamId]. /// - /// The aggregate remains in the identity map but its pending + /// The target remains in the identity map but its pending /// operations are cleared. void discardStream(StreamId streamId); /// Discards all pending operations across all tracked streams. /// - /// All aggregates remain in the identity map but their pending + /// All targets remain in the identity map but their pending /// operations are cleared. void discardAll(); } diff --git a/packages/continuum_uow/lib/src/session_base.dart b/packages/continuum_uow/lib/src/session_base.dart index bbcf24b..e8bb4a9 100644 --- a/packages/continuum_uow/lib/src/session_base.dart +++ b/packages/continuum_uow/lib/src/session_base.dart @@ -37,6 +37,14 @@ abstract class SessionBase implements Session { /// in-memory entity instance for the lifetime of the session. final Map trackedEntities = {}; + /// Stream IDs marked for deletion during the next commit. + /// + /// Populated by [deleteAsync], consumed and cleared by + /// [saveChangesAsync]. Subclass implementations must iterate + /// this set during save to perform the actual deletion. + @protected + final Set streamsMarkedForDeletion = {}; + /// Creates a session base with shared dependencies. /// /// Requires an [AggregateFactoryRegistry] for creation dispatch, @@ -266,6 +274,16 @@ abstract class SessionBase implements Session { } } + @override + void deleteAsync(StreamId streamId) { + // Record the stream ID for deletion during the next commit. + // The actual deletion is handled by the subclass in + // saveChangesAsync. No prior load is required — the stream ID + // alone is sufficient for both state-based adapters and event + // store tombstone writes. + streamsMarkedForDeletion.add(streamId); + } + @override void discardStream(StreamId streamId) { final state = trackedEntities[streamId]; diff --git a/packages/continuum_uow/test/unit/session_base_test.dart b/packages/continuum_uow/test/unit/session_base_test.dart index 98a25e3..f0eed69 100644 --- a/packages/continuum_uow/test/unit/session_base_test.dart +++ b/packages/continuum_uow/test/unit/session_base_test.dart @@ -52,6 +52,9 @@ final class _InMemoryTestSession extends SessionBase { Map? store, }) : store = store ?? {}; + /// Exposes the protected deletion set for test assertions. + Set get deletionSet => streamsMarkedForDeletion; + @override Future loadAsync(StreamId streamId) async { final entry = store[streamId]; @@ -88,6 +91,7 @@ final class _InMemoryTestSession extends SessionBase { @override Future saveChangesAsync({int maxRetries = 1}) async { final pendingEntries = trackedEntities.entries.where((e) => e.value.pendingOperations.isNotEmpty).toList(); + final deletionEntries = trackedEntities.entries.where((e) => streamsMarkedForDeletion.contains(e.key)).toList(); // Apply deferred operations before saving. applyDeferredOperations(pendingEntries); @@ -112,6 +116,15 @@ final class _InMemoryTestSession extends SessionBase { trackedEntities[entry.key] = entry.value.withClearedPendingOperations(); } + // Remove deleted entities from the identity map and clear the + // deletion set (handles both tracked and untracked deletions). + for (final entry in deletionEntries) { + trackedEntities.remove(entry.key); + } + for (final streamId in Set.of(streamsMarkedForDeletion)) { + streamsMarkedForDeletion.remove(streamId); + } + return CommitBatch(entries: entries); } @@ -551,5 +564,73 @@ void main() { ); }); }); + + group('deleteAsync', () { + test('marks a tracked entity for deletion', () async { + // Arrange — create an entity, then mark it for deletion. + final (:factories, :appliers) = _buildCounterRegistries(); + final session = _InMemoryTestSession( + aggregateFactories: factories, + eventAppliers: appliers, + applicationMode: EventApplicationMode.eager, + ); + const streamId = StreamId('counter-1'); + + await session.applyAsync<_Counter>( + streamId, + const _CreateCounterOperation(10), + ); + + // Act — mark for deletion. + session.deleteAsync(streamId); + + // Assert — the entity should be marked for deletion in the + // identity map. + expect( + session.deletionSet.contains(streamId), + isTrue, + ); + }); + + test('Accepts untracked stream without throwing', () { + // Arrange — session has no entities loaded. + final (:factories, :appliers) = _buildCounterRegistries(); + final session = _InMemoryTestSession( + aggregateFactories: factories, + eventAppliers: appliers, + applicationMode: EventApplicationMode.eager, + ); + const streamId = StreamId('unknown-stream'); + + // Act — deleting an untracked stream should not throw. + session.deleteAsync(streamId); + + // Assert — the stream is in the deletion set. + expect(session.deletionSet.contains(streamId), isTrue); + }); + + test('saveChangesAsync removes deleted entities from identity map', () async { + // Arrange — create, delete, save. + final (:factories, :appliers) = _buildCounterRegistries(); + final session = _InMemoryTestSession( + aggregateFactories: factories, + eventAppliers: appliers, + applicationMode: EventApplicationMode.eager, + ); + const streamId = StreamId('counter-1'); + + await session.applyAsync<_Counter>( + streamId, + const _CreateCounterOperation(10), + ); + session.deleteAsync(streamId); + + // Act — save should process the deletion. + await session.saveChangesAsync(); + + // Assert — entity should no longer be in the identity map. + expect(session.trackedEntities.containsKey(streamId), isFalse); + }); + }); }); } diff --git a/packages/continuum_uow/test/unit/transactional_runner_test.dart b/packages/continuum_uow/test/unit/transactional_runner_test.dart index 502d59b..b930cd1 100644 --- a/packages/continuum_uow/test/unit/transactional_runner_test.dart +++ b/packages/continuum_uow/test/unit/transactional_runner_test.dart @@ -42,6 +42,9 @@ final class _StubSession implements Session { @override void discardStream(StreamId streamId) {} + @override + void deleteAsync(StreamId streamId) {} + @override void discardAll() {} } From 080aa2c0958549f9f568596c7f44d8df5f64555c Mon Sep 17 00:00:00 2001 From: Daniel Date: Sat, 21 Feb 2026 13:07:36 +0800 Subject: [PATCH 2/2] Test fixes --- .../continuum/example/lib/continuum.g.dart | 10 ++++++++-- .../lib/src/combining_builder.dart | 5 ++++- .../test/combining_builder_test.dart | 19 +++++++++++-------- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/packages/continuum/example/lib/continuum.g.dart b/packages/continuum/example/lib/continuum.g.dart index 83ac7c7..aa32c66 100644 --- a/packages/continuum/example/lib/continuum.g.dart +++ b/packages/continuum/example/lib/continuum.g.dart @@ -9,16 +9,22 @@ import 'abstract_interface_targets.dart'; import 'domain/projections/user_profile_projection.dart'; import 'domain/user.dart'; -/// All discovered aggregates in this package. +/// All discovered operation targets in this package. /// /// Pass this list to [EventSourcingStore] for automatic /// registration of all serializers, factories, and appliers. -final List $aggregateList = [ +final List $targetList = [ $AbstractUserBase, $User, $UserContract, ]; +/// Backward-compatible alias for [$targetList]. +/// +/// This package historically exposed discovered targets as `$aggregateList`. +@Deprecated(r'Use $targetList instead.') +final List $aggregateList = $targetList; + /// All discovered projections in this package. /// /// Use this list to register all projections with the registry, diff --git a/packages/continuum_generator/lib/src/combining_builder.dart b/packages/continuum_generator/lib/src/combining_builder.dart index ff11982..0ccc41d 100644 --- a/packages/continuum_generator/lib/src/combining_builder.dart +++ b/packages/continuum_generator/lib/src/combining_builder.dart @@ -154,7 +154,10 @@ class CombiningBuilder implements Builder { buffer.writeln('/// Backward-compatible alias for [\$targetList].'); buffer.writeln('///'); buffer.writeln('/// This package historically exposed discovered targets as `\$aggregateList`.'); - buffer.writeln("@Deprecated('Use \targetList instead.')"); + // Use a raw string literal so `$targetList` is not treated as string + // interpolation by the Dart compiler. + // Emit the `$` via `${'\$'}` so the generator does not try to interpolate. + buffer.writeln("@Deprecated(r'Use ${'\$'}targetList instead.')"); buffer.writeln('final List \$aggregateList = \$targetList;'); buffer.writeln(); } diff --git a/packages/continuum_generator/test/combining_builder_test.dart b/packages/continuum_generator/test/combining_builder_test.dart index 82d3558..8c505b3 100644 --- a/packages/continuum_generator/test/combining_builder_test.dart +++ b/packages/continuum_generator/test/combining_builder_test.dart @@ -34,10 +34,10 @@ void main() { }); group('CombiningBuilder', () { - test('generates lib/continuum.g.dart for aggregate roots', () async { - // Arrange: Provide a synthetic `$lib$` input and a single annotated - // aggregate. This verifies the happy path where at least one aggregate - // exists and a combining output should be produced. + test('generates lib/continuum.g.dart for operation targets', () async { + // Arrange: Provide a synthetic `$lib$` input and a single operation + // target. This verifies the happy path where at least one target exists + // and a combining output should be produced. final builder = continuumCombiningBuilder(const BuilderOptions({})); // Act + Assert: The output should exist and contain expected imports and @@ -46,14 +46,17 @@ void main() { builder, { 'continuum_generator|lib/user.dart': """ -import 'package:bounded/bounded.dart'; +import 'package:continuum/continuum.dart'; final class UserId extends TypedIdentity { const UserId(super.value); } -class User extends AggregateRoot { - User(super.id); +@OperationTarget() +class User { + User(this.id); + + final UserId id; } """, }, @@ -67,7 +70,7 @@ class User extends AggregateRoot { contains("import 'user.dart';"), contains(r'final List $targetList = ['), contains(r' $User,'), - contains(r"@Deprecated('Use $targetList instead.')"), + contains(r"@Deprecated(r'Use $targetList instead.')"), contains(r'final List $aggregateList = $targetList;'), ), ),