Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `deleteAsync(StreamId)` to `Session` for marking aggregates for deletion within a unit of work. No prior load is required — the stream ID alone is sufficient.
- Added `deleteAsync(StreamId)` to `TargetPersistenceAdapter` for physical entity deletion in state-based persistence.
- Added `softDeleteStreamAsync(StreamId)` to `EventStore` for tombstone-based stream deletion in event sourcing.
- `StateBasedSession.saveChangesAsync` now processes deletion-marked entities by calling `adapter.deleteAsync` and removing them from the identity map.
- Event sourcing `SessionImpl.saveChangesAsync` now soft-deletes marked streams via `EventStore.softDeleteStreamAsync`, writing a tombstone flag instead of rejecting deletion.
- `InMemoryEventStore`, `HiveEventStore`, and `SembastEventStore` implement `softDeleteStreamAsync` — soft-deleted streams are excluded from `loadStreamAsync` and `getStreamIdsByAggregateTypeAsync` but their events are retained for auditability.
- `InMemoryPersistenceAdapter`, `HivePersistenceAdapter`, and `SembastPersistenceAdapter` now implement `deleteAsync`.


## [5.2.0] - 2026-02-20

Expand Down
10 changes: 8 additions & 2 deletions packages/continuum/example/lib/continuum.g.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions packages/continuum/example/lib/store_state_based.dart
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ class FakeUserApiAdapter implements TargetPersistenceAdapter<User> {
'(${pendingOperations.length} event(s)) → 200 OK',
);
}

@override
Future<void> deleteAsync(StreamId streamId) async {
// Simulate network latency.
await Future<void>.delayed(const Duration(milliseconds: 50));

_backendDb.remove(streamId.value);
print(' [Backend] DELETE /users/${streamId.value} → 200 OK');
}
}

/// Simple record representing server-side user state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ final class UserLocalDbAdapter implements TargetPersistenceAdapter<User> {
'deactivatedAt': target.deactivatedAt?.toIso8601String(),
});
}

@override
Future<void> deleteAsync(StreamId streamId) async {
// Remove the target from the local database.
await _db.delete(streamId.value);
}
}

// ── Example ─────────────────────────────────────────────────────────────────
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ final class FakeBackendApi {

print(' [Backend] PATCH /users/$id → 200 OK');
}

/// Simulates `DELETE /users/:id` — removes a user from the backend.
Future<void> deleteUserAsync(String id) async {
await Future<void>.delayed(const Duration(milliseconds: 30));

_database.remove(id);
print(' [Backend] DELETE /users/$id → 200 OK');
}
}

/// Simple record representing server-side user state.
Expand Down Expand Up @@ -183,6 +191,12 @@ final class UserApiAdapter implements TargetPersistenceAdapter<User> {
}
}
}

@override
Future<void> deleteAsync(StreamId streamId) async {
// Delete the user from the backend.
await _api.deleteUserAsync(streamId.value);
}
}

// ── Example ─────────────────────────────────────────────────────────────────
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,20 @@ abstract interface class EventStore {
/// passed to [appendEventsAsync] or [AtomicEventStore.appendEventsToStreamsAsync]
/// when events were first persisted for a stream.
///
/// Returns an empty list if no streams match.
/// Returns an empty list if no streams match. Streams that have been
/// soft-deleted via [softDeleteStreamAsync] are excluded.
Future<List<StreamId>> getStreamIdsByAggregateTypeAsync(
String aggregateType,
);

/// Marks a stream as deleted using a tombstone flag in stream
/// metadata.
///
/// The events remain in the store but the stream is treated as
/// non-existent: [loadStreamAsync] returns an empty list and
/// [getStreamIdsByAggregateTypeAsync] excludes it.
///
/// Must be idempotent — soft-deleting a non-existent or already
/// deleted stream is a no-op.
Future<void> softDeleteStreamAsync(StreamId streamId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@ final class SessionImpl extends SessionBase {
for (var attempt = 0; attempt <= maxRetries; attempt++) {
try {
final batch = await _persistPendingEventsAsync();

// Soft-delete streams marked for deletion via tombstone metadata.
// This happens after successful event persistence so pending
// operations on the same stream are not lost.
await _softDeleteMarkedStreamsAsync();

return batch;
} on ConcurrencyException {
final isLastAttempt = attempt == maxRetries;
Expand All @@ -281,6 +287,20 @@ final class SessionImpl extends SessionBase {
return CommitBatch.empty;
}

/// Soft-deletes all streams in [streamsMarkedForDeletion] via
/// [EventStore.softDeleteStreamAsync] and removes them from the
/// identity map.
Future<void> _softDeleteMarkedStreamsAsync() async {
// Copy to avoid concurrent modification during iteration.
final streamIds = Set<StreamId>.of(streamsMarkedForDeletion);

for (final streamId in streamIds) {
await _eventStore.softDeleteStreamAsync(streamId);
trackedEntities.remove(streamId);
streamsMarkedForDeletion.remove(streamId);
}
}

/// Core persistence logic extracted so [saveChangesAsync] can retry it.
Future<CommitBatch> _persistPendingEventsAsync() async {
final pendingEntries = <MapEntry<StreamId, TrackedEntity>>[];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,13 @@ class MockEventStore extends _i1.Mock implements _i2.EventStore {
),
)
as _i3.Future<List<_i5.StreamId>>);

@override
_i3.Future<void> softDeleteStreamAsync(_i5.StreamId? streamId) =>
(super.noSuchMethod(
Invocation.method(#softDeleteStreamAsync, [streamId]),
returnValue: _i3.Future<void>.value(),
returnValueForMissingStub: _i3.Future<void>.value(),
)
as _i3.Future<void>);
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,13 @@ class MockEventStore extends _i1.Mock implements _i2.EventStore {
),
)
as _i3.Future<List<_i5.StreamId>>);

@override
_i3.Future<void> softDeleteStreamAsync(_i5.StreamId? streamId) =>
(super.noSuchMethod(
Invocation.method(#softDeleteStreamAsync, [streamId]),
returnValue: _i3.Future<void>.value(),
returnValueForMissingStub: _i3.Future<void>.value(),
)
as _i3.Future<void>);
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ class MockEventStore extends _i1.Mock implements _i2.EventStore {
),
)
as _i3.Future<List<_i5.StreamId>>);

@override
_i3.Future<void> softDeleteStreamAsync(_i5.StreamId? streamId) =>
(super.noSuchMethod(
Invocation.method(#softDeleteStreamAsync, [streamId]),
returnValue: _i3.Future<void>.value(),
returnValueForMissingStub: _i3.Future<void>.value(),
)
as _i3.Future<void>);
}

/// A class which mocks [AtomicEventStore].
Expand Down Expand Up @@ -142,4 +151,13 @@ class MockAtomicEventStore extends _i1.Mock implements _i7.AtomicEventStore {
),
)
as _i3.Future<List<_i5.StreamId>>);

@override
_i3.Future<void> softDeleteStreamAsync(_i5.StreamId? streamId) =>
(super.noSuchMethod(
Invocation.method(#softDeleteStreamAsync, [streamId]),
returnValue: _i3.Future<void>.value(),
returnValueForMissingStub: _i3.Future<void>.value(),
)
as _i3.Future<void>);
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ class MockEventStore extends _i1.Mock implements _i2.EventStore {
),
)
as _i3.Future<List<_i5.StreamId>>);

@override
_i3.Future<void> softDeleteStreamAsync(_i5.StreamId? streamId) =>
(super.noSuchMethod(
Invocation.method(#softDeleteStreamAsync, [streamId]),
returnValue: _i3.Future<void>.value(),
returnValueForMissingStub: _i3.Future<void>.value(),
)
as _i3.Future<void>);
}

/// A class which mocks [AtomicEventStore].
Expand Down Expand Up @@ -142,4 +151,13 @@ class MockAtomicEventStore extends _i1.Mock implements _i7.AtomicEventStore {
),
)
as _i3.Future<List<_i5.StreamId>>);

@override
_i3.Future<void> softDeleteStreamAsync(_i5.StreamId? streamId) =>
(super.noSuchMethod(
Invocation.method(#softDeleteStreamAsync, [streamId]),
returnValue: _i3.Future<void>.value(),
returnValueForMissingStub: _i3.Future<void>.value(),
)
as _i3.Future<void>);
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ class MockEventStore extends _i1.Mock implements _i2.EventStore {
),
)
as _i3.Future<List<_i5.StreamId>>);

@override
_i3.Future<void> softDeleteStreamAsync(_i5.StreamId? streamId) =>
(super.noSuchMethod(
Invocation.method(#softDeleteStreamAsync, [streamId]),
returnValue: _i3.Future<void>.value(),
returnValueForMissingStub: _i3.Future<void>.value(),
)
as _i3.Future<void>);
}

/// A class which mocks [AtomicEventStore].
Expand Down Expand Up @@ -142,4 +151,13 @@ class MockAtomicEventStore extends _i1.Mock implements _i7.AtomicEventStore {
),
)
as _i3.Future<List<_i5.StreamId>>);

@override
_i3.Future<void> softDeleteStreamAsync(_i5.StreamId? streamId) =>
(super.noSuchMethod(
Invocation.method(#softDeleteStreamAsync, [streamId]),
returnValue: _i3.Future<void>.value(),
returnValueForMissingStub: _i3.Future<void>.value(),
)
as _i3.Future<void>);
}
5 changes: 4 additions & 1 deletion packages/continuum_generator/lib/src/combining_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ class CombiningBuilder implements Builder {
buffer.writeln('/// Backward-compatible alias for [\$targetList].');
buffer.writeln('///');
buffer.writeln('/// This package historically exposed discovered targets as `\$aggregateList`.');
buffer.writeln("@Deprecated('Use \$targetList instead.')");
// Use a raw string literal so `$targetList` is not treated as string
// interpolation by the Dart compiler.
// Emit the `$` via `${'\$'}` so the generator does not try to interpolate.
buffer.writeln("@Deprecated(r'Use ${'\$'}targetList instead.')");
buffer.writeln('final List<GeneratedAggregate> \$aggregateList = \$targetList;');
buffer.writeln();
}
Expand Down
19 changes: 11 additions & 8 deletions packages/continuum_generator/test/combining_builder_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ void main() {
});

group('CombiningBuilder', () {
test('generates lib/continuum.g.dart for aggregate roots', () async {
// Arrange: Provide a synthetic `$lib$` input and a single annotated
// aggregate. This verifies the happy path where at least one aggregate
// exists and a combining output should be produced.
test('generates lib/continuum.g.dart for operation targets', () async {
// Arrange: Provide a synthetic `$lib$` input and a single operation
// target. This verifies the happy path where at least one target exists
// and a combining output should be produced.
final builder = continuumCombiningBuilder(const BuilderOptions({}));

// Act + Assert: The output should exist and contain expected imports and
Expand All @@ -46,14 +46,17 @@ void main() {
builder,
{
'continuum_generator|lib/user.dart': """
import 'package:bounded/bounded.dart';
import 'package:continuum/continuum.dart';

final class UserId extends TypedIdentity<String> {
const UserId(super.value);
}

class User extends AggregateRoot<UserId> {
User(super.id);
@OperationTarget()
class User {
User(this.id);

final UserId id;
}
""",
},
Expand All @@ -67,7 +70,7 @@ class User extends AggregateRoot<UserId> {
contains("import 'user.dart';"),
contains(r'final List<GeneratedAggregate> $targetList = ['),
contains(r' $User,'),
contains(r"@Deprecated('Use $targetList instead.')"),
contains(r"@Deprecated(r'Use $targetList instead.')"),
contains(r'final List<GeneratedAggregate> $aggregateList = $targetList;'),
),
),
Expand Down
Loading