From 52bbf48d852f7a3f86da9ae6ad1b91ffb269b2bf Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 19 Feb 2026 15:20:29 +0800 Subject: [PATCH 1/3] Removed obsolete examples --- CHANGELOG.md | 13 + .../example/lib/hybrid/backend_api.dart | 36 -- .../continuum/example/lib/hybrid/dtos.dart | 64 ---- .../example/lib/hybrid_multi_step_form.dart | 73 ---- .../lib/hybrid_optimistic_creation.dart | 68 ---- .../example/lib/hybrid_profile_edit.dart | 70 ---- .../lib/store_state_based_transactional.dart | 316 ++++++++++++++++++ packages/continuum/example/main.dart | 14 +- 8 files changed, 335 insertions(+), 319 deletions(-) delete mode 100644 packages/continuum/example/lib/hybrid/backend_api.dart delete mode 100644 packages/continuum/example/lib/hybrid/dtos.dart delete mode 100644 packages/continuum/example/lib/hybrid_multi_step_form.dart delete mode 100644 packages/continuum/example/lib/hybrid_optimistic_creation.dart delete mode 100644 packages/continuum/example/lib/hybrid_profile_edit.dart create mode 100644 packages/continuum/example/lib/store_state_based_transactional.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 77ab3b3..0fa6c3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Removed + +- Removed obsolete hybrid examples from the `example/` package. The + following example files and their helper DTOs were removed because the + recommended pattern is now demonstrated in + `example/lib/store_state_based_transactional.dart`: + - `example/lib/hybrid_optimistic_creation.dart` + - `example/lib/hybrid_profile_edit.dart` + - `example/lib/hybrid_multi_step_form.dart` + - `example/lib/hybrid/backend_api.dart` + - `example/lib/hybrid/dtos.dart` + + ## [5.0.0] - 2026-02-19 ### Breaking Changes diff --git a/packages/continuum/example/lib/hybrid/backend_api.dart b/packages/continuum/example/lib/hybrid/backend_api.dart deleted file mode 100644 index 6620f40..0000000 --- a/packages/continuum/example/lib/hybrid/backend_api.dart +++ /dev/null @@ -1,36 +0,0 @@ -import 'dart:math'; - -import 'package:continuum_example/hybrid/dtos.dart'; - -/// Simulated backend API for hybrid-mode examples. -final class BackendApi { - final Random _random = Random(); - - Future createUser(CreateUserRequest request) async { - await Future.delayed(const Duration(milliseconds: 300)); - if (request.email.contains('invalid')) { - throw ApiValidationException('Email domain not allowed'); - } - - return UserDto( - id: 'user-${_random.nextInt(10000)}', - name: request.name, - email: request.email, - isActive: true, - ); - } - - Future updateUser(String userId, UpdateUserRequest request) async { - await Future.delayed(const Duration(milliseconds: 200)); - if (_random.nextDouble() < 0.1) { - throw ApiNetworkException('Connection timeout'); - } - - return UserDto( - id: userId, - name: request.name ?? 'Jane Doe', - email: request.email ?? 'existing@example.com', - isActive: true, - ); - } -} diff --git a/packages/continuum/example/lib/hybrid/dtos.dart b/packages/continuum/example/lib/hybrid/dtos.dart deleted file mode 100644 index 34f30f4..0000000 --- a/packages/continuum/example/lib/hybrid/dtos.dart +++ /dev/null @@ -1,64 +0,0 @@ -final class CreateUserRequest { - final String name; - final String email; - - CreateUserRequest({required this.name, required this.email}); - - Map toJson() { - return {'name': name, 'email': email}; - } -} - -final class UpdateUserRequest { - final String? name; - final String? email; - - UpdateUserRequest({this.name, this.email}); - - Map toJson() { - final Map result = {}; - if (name != null) { - result['name'] = name!; - } - if (email != null) { - result['email'] = email!; - } - return result; - } -} - -final class UserDto { - final String id; - final String name; - final String email; - final bool isActive; - - UserDto({ - required this.id, - required this.name, - required this.email, - required this.isActive, - }); -} - -final class ApiValidationException implements Exception { - final String message; - - ApiValidationException(this.message); - - @override - String toString() { - return message; - } -} - -final class ApiNetworkException implements Exception { - final String message; - - ApiNetworkException(this.message); - - @override - String toString() { - return message; - } -} diff --git a/packages/continuum/example/lib/hybrid_multi_step_form.dart b/packages/continuum/example/lib/hybrid_multi_step_form.dart deleted file mode 100644 index 02a11e9..0000000 --- a/packages/continuum/example/lib/hybrid_multi_step_form.dart +++ /dev/null @@ -1,73 +0,0 @@ -// ignore_for_file: file_names - -/// Example 6: Hybrid Mode - Multi-Step Forms with Cancel -/// -/// This example shows how to use events for multi-step forms where the user -/// can cancel without persisting anything. Events are only applied locally -/// until the user clicks "Submit". -/// -/// IMPORTANT: In hybrid mode there's NO EventSourcingStore or Session! -/// The aggregate is just a regular Dart object in memory. When you navigate -/// away, it gets garbage collected automatically if nothing references it. -library; - -import 'package:continuum_example/domain/events/email_changed.dart'; -import 'package:continuum_example/domain/events/user_registered.dart'; -import 'package:continuum_example/domain/user.dart'; - -void main() { - print('═══════════════════════════════════════════════════════════════════'); - print('Example 6: Hybrid Mode - Multi-Step Forms with Cancel'); - print('═══════════════════════════════════════════════════════════════════'); - print(''); - print('User fills out a registration wizard with multiple steps.'); - print('Each step applies events locally. Nothing sent until "Submit".'); - print(''); - - // User starts the registration wizard - print('Step 1: User enters basic info'); - final draftUser = User.createFromUserRegistered( - UserRegistered( - userId: const UserId('draft'), - name: 'Draft User', - email: 'step1@example.com', - ), - ); - print(' Current email: ${draftUser.email}'); - print(''); - - // User progresses to step 2 - print('Step 2: User updates email'); - draftUser.applyEvent( - EmailChanged( - newEmail: 'step2@example.com', - ), - ); - print(' Current email: ${draftUser.email}'); - print(''); - - // User progresses to step 3 - print('Step 3: User finalizes email'); - draftUser.applyEvent( - EmailChanged( - newEmail: 'final@example.com', - ), - ); - print(' Current email: ${draftUser.email}'); - print(''); - - // User clicks "Cancel" - print('User clicks "Cancel" button'); - print(' [Action] Navigate away - the draftUser object goes out of scope'); - print(' [Backend] No API calls made'); - print(' [Memory] draftUser has no more references → garbage collected'); - print(''); - - print('✓ No cleanup needed! Just let the object go out of scope.'); - print(' Dart\'s GC automatically reclaims memory when nothing references it.'); - print(''); - print('If user clicked "Submit" instead:'); - print(' → Convert draftUser state to CreateUserRequest(...)'); - print(' → Send to backend API'); - print(' → Replace with backend-returned authoritative state'); -} diff --git a/packages/continuum/example/lib/hybrid_optimistic_creation.dart b/packages/continuum/example/lib/hybrid_optimistic_creation.dart deleted file mode 100644 index 21494c1..0000000 --- a/packages/continuum/example/lib/hybrid_optimistic_creation.dart +++ /dev/null @@ -1,68 +0,0 @@ -// ignore_for_file: file_names - -/// Example 4: Hybrid Mode - Optimistic User Creation -/// -/// This example demonstrates hybrid mode where the frontend uses events for -/// local state modeling but sends DTOs (not events) to the backend. The backend -/// is the source of truth. -library; - -import 'package:continuum_example/domain/events/user_registered.dart'; -import 'package:continuum_example/domain/user.dart'; -import 'package:continuum_example/hybrid/backend_api.dart'; -import 'package:continuum_example/hybrid/dtos.dart'; - -void main() async { - print('═══════════════════════════════════════════════════════════════════'); - print('Example 4: Hybrid Mode - Optimistic User Creation'); - print('═══════════════════════════════════════════════════════════════════'); - print(''); - print('Frontend events are for LOCAL STATE only.'); - print('Backend has its own events - we just get the resulting state.'); - print(''); - - final api = BackendApi(); - - // Step 1: User fills out registration form. - // Show the new user immediately using a local event (optimistic update). - print('Step 1: User fills out form'); - final optimisticUser = User.createFromUserRegistered( - UserRegistered( - userId: const UserId('temp-new-user'), - name: 'Jane Doe', - email: 'jane@example.com', - ), - ); - - print(' [UI] Showing optimistic state: ${optimisticUser.name}'); - print(' [UI] Spinner indicates "Saving..."'); - print(''); - - // Step 2: Convert aggregate state to API request (NOT the event!) - print('Step 2: Send request to backend'); - final createRequest = CreateUserRequest( - name: optimisticUser.name, - email: optimisticUser.email, - ); - - print(' [API] POST /users ${createRequest.toJson()}'); - print(''); - - try { - // Step 3: Call the backend - final backendUser = await api.createUser(createRequest); - - // Step 4: SUCCESS! Discard optimistic state, use backend state. - print('Step 3: Backend responds'); - print(' [Backend] Created user with ID: ${backendUser.id}'); - print(' [UI] Replace optimistic state with authoritative backend state'); - print(''); - print('✓ Success! User sees instant feedback, backend remains source of truth.'); - } on ApiValidationException catch (e) { - print('Step 3: Backend error'); - print(' [Backend] Validation error: $e'); - print(' [UI] Show error message, user can fix and retry'); - print(''); - print('✓ Local state preserved for retry.'); - } -} diff --git a/packages/continuum/example/lib/hybrid_profile_edit.dart b/packages/continuum/example/lib/hybrid_profile_edit.dart deleted file mode 100644 index 45c8e61..0000000 --- a/packages/continuum/example/lib/hybrid_profile_edit.dart +++ /dev/null @@ -1,70 +0,0 @@ -// ignore_for_file: file_names - -/// Example 5: Hybrid Mode - Optimistic Profile Editing -/// -/// This example shows instant UI feedback when editing a profile. The frontend -/// applies events locally while the backend request is in flight. -library; - -import 'package:continuum_example/domain/events/email_changed.dart'; -import 'package:continuum_example/domain/events/user_registered.dart'; -import 'package:continuum_example/domain/user.dart'; -import 'package:continuum_example/hybrid/backend_api.dart'; -import 'package:continuum_example/hybrid/dtos.dart'; - -void main() async { - print('═══════════════════════════════════════════════════════════════════'); - print('Example 5: Hybrid Mode - Optimistic Profile Editing'); - print('═══════════════════════════════════════════════════════════════════'); - print(''); - - final api = BackendApi(); - - // Imagine we loaded this user from a previous API call - print('Initial state (loaded from backend):'); - final existingUser = User.createFromUserRegistered( - UserRegistered( - userId: const UserId('user-456'), - name: 'Jane Doe', - email: 'jane@example.com', - ), - ); - - print(' Email: ${existingUser.email}'); - print(''); - - // Step 1: User changes email in the form. - // Apply event LOCALLY for instant UI update. - print('Step 1: User changes email in form'); - existingUser.applyEvent( - EmailChanged( - newEmail: 'jane.doe@company.com', - ), - ); - - print(' [UI] Instant update: ${existingUser.email}'); - print(' [UI] Saving icon appears...'); - print(''); - - // Step 2: Convert to API request (just the changed field) - print('Step 2: Send update to backend'); - final updateRequest = UpdateUserRequest(email: existingUser.email); - print(' [API] PATCH /users/user-456 ${updateRequest.toJson()}'); - print(''); - - try { - final updatedUser = await api.updateUser('user-456', updateRequest); - print('Step 3: Backend confirms'); - print(' [Backend] Updated: ${updatedUser.email}'); - print(' [UI] Remove saving icon'); - print(''); - print('✓ User saw instant feedback, no waiting for network!'); - } on ApiNetworkException catch (e) { - print('Step 3: Network error'); - print(' [Backend] Error: $e'); - print(' [UI] Show "Save failed. Retry?" button'); - print(' [UI] Optimistic state still visible: ${existingUser.email}'); - print(''); - print('✓ Local state preserved - user can retry without losing changes.'); - } -} diff --git a/packages/continuum/example/lib/store_state_based_transactional.dart b/packages/continuum/example/lib/store_state_based_transactional.dart new file mode 100644 index 0000000..e694835 --- /dev/null +++ b/packages/continuum/example/lib/store_state_based_transactional.dart @@ -0,0 +1,316 @@ +/// State-Based Persistence with TransactionalRunner +/// +/// Demonstrates how to use `StateBasedStore` with `TransactionalRunner` for +/// apps backed by a traditional REST API. The backend is the source of truth — +/// it returns aggregates directly, and accepts commands like "update email" +/// or "deactivate account". There is no event store; events exist only in +/// memory as domain-level state mutations. +/// +/// What you'll learn: +/// - How to implement `AggregatePersistenceAdapter` backed by a REST API +/// - How `TransactionalRunner` manages session lifecycle (auto-commit) +/// - How to run multiple mutations in a single transaction +/// - How the adapter receives the post-mutation aggregate and pending +/// operations, letting it choose how to translate them into API calls +/// +/// Real-world use case: A mobile or web app where the backend owns +/// the user table. The frontend applies events locally for state management, +/// then the adapter syncs changed state to the backend on commit. +library; + +import 'package:continuum/continuum.dart'; +import 'package:continuum_state/continuum_state.dart'; +import 'package:continuum_uow/continuum_uow.dart'; + +import 'continuum.g.dart'; +import 'domain/events/email_changed.dart'; +import 'domain/events/user_deactivated.dart'; +import 'domain/events/user_registered.dart'; +import 'domain/user.dart'; + +// ── Fake Backend ──────────────────────────────────────────────────────────── + +/// Simulates a REST API backend that stores user records. +/// +/// In a real app this would be an HTTP client making requests to +/// endpoints like `GET /users/:id`, `PATCH /users/:id`, etc. +final class FakeBackendApi { + /// Server-side database keyed by user ID. + final Map _database = {}; + + /// Simulates `POST /users` — creates a new user on the backend. + Future createUserAsync(String id, String name, String email) async { + await Future.delayed(const Duration(milliseconds: 30)); + + _database[id] = UserRecord( + id: id, + name: name, + email: email, + isActive: true, + deactivatedAt: null, + ); + + print(' [Backend] POST /users/$id → 201 Created'); + } + + /// Simulates `GET /users/:id` — fetches user state from the backend. + Future getUserAsync(String id) async { + await Future.delayed(const Duration(milliseconds: 30)); + + final record = _database[id]; + if (record == null) { + throw StateError('User not found: $id'); + } + + print(' [Backend] GET /users/$id → 200 OK'); + return record; + } + + /// Simulates `PATCH /users/:id` — updates specific fields on the backend. + Future updateUserAsync( + String id, { + String? email, + bool? isActive, + DateTime? deactivatedAt, + }) async { + await Future.delayed(const Duration(milliseconds: 30)); + + final record = _database[id]; + if (record == null) { + throw StateError('User not found: $id'); + } + + // The backend applies only the fields that were sent. + _database[id] = UserRecord( + id: id, + name: record.name, + email: email ?? record.email, + isActive: isActive ?? record.isActive, + deactivatedAt: deactivatedAt ?? record.deactivatedAt, + ); + + print(' [Backend] PATCH /users/$id → 200 OK'); + } +} + +/// Simple record representing server-side user state. +/// +/// In a real app this would be a DTO parsed from JSON. +final class UserRecord { + final String id; + final String name; + final String email; + final bool isActive; + final DateTime? deactivatedAt; + + const UserRecord({ + required this.id, + required this.name, + required this.email, + required this.isActive, + required this.deactivatedAt, + }); +} + +// ── Adapter ───────────────────────────────────────────────────────────────── + +/// Bridges the `User` aggregate to the fake REST API. +/// +/// `fetchAsync` maps `GET /users/:id` → `User` domain object. +/// `persistAsync` inspects pending operations and translates them +/// into the appropriate backend calls (POST for creation, PATCH for +/// mutations). +/// +/// In production, replace `FakeBackendApi` with a real HTTP client. +final class UserApiAdapter implements AggregatePersistenceAdapter { + /// The backend API client injected at construction time. + final FakeBackendApi _api; + + /// Creates an adapter backed by the given API client. + UserApiAdapter({required FakeBackendApi api}) : _api = api; + + @override + Future fetchAsync(StreamId streamId) async { + // Load user state from the backend and reconstruct the aggregate. + final record = await _api.getUserAsync(streamId.value); + + // The backend returns all fields — we reconstruct the domain + // aggregate using its creation factory. This is the only place + // where backend ↔ domain mapping happens. + return User.createFromUserRegistered( + UserRegistered( + userId: UserId(record.id), + name: record.name, + email: record.email, + ), + ); + } + + @override + Future persistAsync( + StreamId streamId, + User aggregate, + List pendingOperations, + ) async { + // Inspect the pending operations to decide which backend calls + // to make. The adapter has full flexibility here — it can send + // the final aggregate state, translate each operation into a + // separate API call, or batch them. + for (final operation in pendingOperations) { + switch (operation) { + case UserRegistered(): + // Creation event → POST new user to the backend. + await _api.createUserAsync( + streamId.value, + aggregate.name, + aggregate.email, + ); + + case EmailChanged(): + // Email mutation → PATCH with the new email. + await _api.updateUserAsync( + streamId.value, + email: aggregate.email, + ); + + case UserDeactivated(): + // Deactivation → PATCH with the active flag and timestamp. + await _api.updateUserAsync( + streamId.value, + isActive: false, + deactivatedAt: aggregate.deactivatedAt, + ); + } + } + } +} + +// ── Example ───────────────────────────────────────────────────────────────── + +void main() async { + print('═══════════════════════════════════════════════════════════════════'); + print('State-Based Persistence with TransactionalRunner'); + print('═══════════════════════════════════════════════════════════════════'); + print(''); + + // The backend API client — in production, inject your HTTP client here. + final backendApi = FakeBackendApi(); + + // The adapter bridges the User aggregate to the backend. + final userAdapter = UserApiAdapter(api: backendApi); + + // Construct a StateBasedStore with one adapter per aggregate type. + // $aggregateList provides the generated event-application registries + // so the session knows how to apply events to aggregates. + final store = StateBasedStore( + adapters: {User: userAdapter}, + aggregates: $aggregateList, + ); + + // TransactionalRunner manages session lifecycle automatically: + // 1. Opens a session before the action + // 2. Commits (saveChangesAsync) after the action succeeds + // 3. Abandons the session if the action throws + final runner = TransactionalRunner(store: store); + + final userId = const StreamId('user-42'); + + // ── Transaction 1: Create a new user ────────────────────────────────── + + print('Transaction 1: Register a new user'); + print(''); + + await runner.runAsync(() async { + // Access the ambient session provided by the runner. + final session = TransactionalRunner.currentSession; + + // Apply the creation event — the session tracks the new aggregate + // and the adapter will receive a UserRegistered operation on commit. + final user = await session.applyAsync( + userId, + UserRegistered( + userId: const UserId('user-42'), + name: 'Alice Smith', + email: 'alice@example.com', + ), + ); + + print(' [Memory] Created: ${user.name} <${user.email}>'); + }); + // ← TransactionalRunner auto-committed here: the adapter's + // persistAsync received the UserRegistered operation and + // called POST /users/user-42. + + print(''); + + // ── Transaction 2: Load and update email ────────────────────────────── + + print('Transaction 2: Change email address'); + print(''); + + await runner.runAsync(() async { + final session = TransactionalRunner.currentSession; + + // Load the user from the backend via the adapter's fetchAsync. + final user = await session.loadAsync(userId); + print(' [Memory] Loaded: ${user.name} <${user.email}>'); + + // Apply a mutation — the session records this operation and + // the adapter will translate it into PATCH /users/user-42. + await session.applyAsync( + userId, + EmailChanged(newEmail: 'alice@company.com'), + ); + + print(' [Memory] Updated: ${user.name} <${user.email}>'); + }); + // ← Auto-committed: adapter received EmailChanged and called + // PATCH /users/user-42 with the new email. + + print(''); + + // ── Transaction 3: Multiple mutations in one transaction ────────────── + + print('Transaction 3: Change email and deactivate (batched)'); + print(''); + + await runner.runAsync(() async { + final session = TransactionalRunner.currentSession; + + // Load the user — fetched from the backend again. + final user = await session.loadAsync(userId); + print(' [Memory] Loaded: ${user.name} <${user.email}>'); + + // Apply two mutations in the same transaction. + // Both will be persisted atomically on commit. + await session.applyAsync( + userId, + EmailChanged(newEmail: 'alice.archived@company.com'), + ); + + await session.applyAsync( + userId, + UserDeactivated(deactivatedAt: DateTime.now()), + ); + + print( + ' [Memory] Updated: ${user.name} <${user.email}> ' + 'active=${user.isActive}', + ); + }); + // ← Auto-committed: adapter received both EmailChanged and + // UserDeactivated, calling PATCH twice. + + print(''); + + // ── Summary ─────────────────────────────────────────────────────────── + + print('═══════════════════════════════════════════════════════════════════'); + print('Key Takeaways:'); + print(' 1. StateBasedStore + adapter = no event store needed'); + print(' 2. Backend is the source of truth (REST API, GraphQL, etc.)'); + print(' 3. TransactionalRunner auto-commits on success'); + print(' 4. The adapter translates operations → backend API calls'); + print(' 5. Same session.applyAsync / loadAsync API as event sourcing'); + print('═══════════════════════════════════════════════════════════════════'); +} diff --git a/packages/continuum/example/main.dart b/packages/continuum/example/main.dart index bc1c76b..8c03d82 100644 --- a/packages/continuum/example/main.dart +++ b/packages/continuum/example/main.dart @@ -17,15 +17,14 @@ /// store_atomic_rollback.dart - Atomic rollback on conflict /// /// STATE-BASED PERSISTENCE (StateBasedStore + Adapter): -/// store_state_based.dart - State-based backend persistence +/// store_state_based.dart - State-based backend persistence +/// store_state_based_transactional.dart - State-based with TransactionalRunner /// /// PROJECTIONS (Read Models): /// projection_example.dart - Projection with code generation /// /// HYBRID MODE (Frontend Events + Backend State): -/// hybrid_optimistic_creation.dart - Optimistic user creation -/// hybrid_profile_edit.dart - Instant feedback when editing -/// hybrid_multi_step_form.dart - Multi-step forms with cancel +/// (removed - hybrid examples consolidated into state-based examples) /// /// To run any example: /// cd example @@ -53,15 +52,14 @@ void main() { print(' store_atomic_rollback.dart - Atomic rollback on conflict'); print(''); print('STATE-BASED PERSISTENCE (StateBasedStore + Adapter):'); - print(' store_state_based.dart - State-based backend persistence'); + print(' store_state_based.dart - State-based backend persistence'); + print(' store_state_based_transactional.dart - State-based with TransactionalRunner'); print(''); print('PROJECTIONS (Read Models):'); print(' projection_example.dart - Projection with code generation'); print(''); print('HYBRID MODE (Frontend Events + Backend State):'); - print(' hybrid_optimistic_creation.dart - Optimistic user creation'); - print(' hybrid_profile_edit.dart - Instant feedback editing'); - print(' hybrid_multi_step_form.dart - Multi-step forms with cancel'); + print(' (removed - see store_state_based_transactional.dart)'); print(''); print('Run any example:'); print(' dart run aggregate_creation.dart'); From 0fd2b643793ce5b6887b43de8604083ab1e075f8 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 19 Feb 2026 21:05:16 +0800 Subject: [PATCH 2/3] Added a new example --- CHANGELOG.md | 5 + .../lib/store_state_based_local_db.dart | 269 ++++++++++++++++++ packages/continuum/example/main.dart | 2 + 3 files changed, 276 insertions(+) create mode 100644 packages/continuum/example/lib/store_state_based_local_db.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fa6c3e..565cfcf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- New example `example/lib/store_state_based_transactional.dart` demonstrating `StateBasedStore` with `TransactionalRunner` and a REST API adapter. +- New example `example/lib/store_state_based_local_db.dart` demonstrating `StateBasedStore` with a local key-value database adapter. + ### Removed - Removed obsolete hybrid examples from the `example/` package. The diff --git a/packages/continuum/example/lib/store_state_based_local_db.dart b/packages/continuum/example/lib/store_state_based_local_db.dart new file mode 100644 index 0000000..18a24ed --- /dev/null +++ b/packages/continuum/example/lib/store_state_based_local_db.dart @@ -0,0 +1,269 @@ +/// State-Based Persistence with Local Database +/// +/// Demonstrates how to use `StateBasedStore` with a local database. +/// The adapter stores and loads aggregates as whole entities — no event +/// sourcing, no remote backend. This is the simplest persistence model: +/// load the full object, mutate it with domain events, and write it back. +/// +/// What you'll learn: +/// - How to implement `AggregatePersistenceAdapter` for a local database +/// - How the adapter serializes/deserializes whole aggregates (JSON maps) +/// - How `TransactionalRunner` auto-commits changes to the local DB +/// - How create vs. update is handled inside `persistAsync` +/// +/// Real-world use case: A mobile app that stores user profiles in a local +/// database (Hive, Sembast, Isar, SQLite, etc.) with no backend involved. +/// The database holds the full current state of each aggregate. +library; + +import 'dart:convert'; + +import 'package:continuum/continuum.dart'; +import 'package:continuum_state/continuum_state.dart'; +import 'package:continuum_uow/continuum_uow.dart'; + +import 'continuum.g.dart'; +import 'domain/events/email_changed.dart'; +import 'domain/events/user_deactivated.dart'; +import 'domain/events/user_registered.dart'; +import 'domain/user.dart'; + +// ── Fake Local Database ───────────────────────────────────────────────────── + +/// Simulates a local key-value database (like Hive, Sembast, or SharedPreferences). +/// +/// In a real app, replace this with your actual database client. The API +/// intentionally mimics a simple key-value store: `put`, `get`, `delete`. +final class FakeLocalDatabase { + /// Internal storage — simulates an on-disk key-value store. + final Map _storage = {}; + + /// Writes a JSON-encoded record to the database. + Future put(String key, Map value) async { + // Simulate disk I/O latency. + await Future.delayed(const Duration(milliseconds: 5)); + + _storage[key] = jsonEncode(value); + print(' [DB] PUT "$key" → ${jsonEncode(value)}'); + } + + /// Reads a JSON-encoded record from the database, + /// or `null` if the key does not exist. + Future?> get(String key) async { + await Future.delayed(const Duration(milliseconds: 5)); + + final raw = _storage[key]; + if (raw == null) { + print(' [DB] GET "$key" → null'); + return null; + } + + print(' [DB] GET "$key" → found'); + return jsonDecode(raw) as Map; + } + + /// Deletes a record from the database. + Future delete(String key) async { + await Future.delayed(const Duration(milliseconds: 5)); + _storage.remove(key); + print(' [DB] DELETE "$key"'); + } + + /// Returns all keys currently stored. + Iterable get keys => _storage.keys; +} + +// ── Adapter ───────────────────────────────────────────────────────────────── + +/// Bridges the `User` aggregate to a local key-value database. +/// +/// `fetchAsync` reads a JSON map from the database and reconstructs the +/// aggregate. `persistAsync` serializes the aggregate's current state +/// and writes it back. The pending operations list is ignored here — we +/// simply store the whole entity every time. +/// +/// This is the simplest adapter strategy: full-entity read/write. More +/// sophisticated adapters could diff fields, use SQL UPDATE for changed +/// columns only, etc. +final class UserLocalDbAdapter implements AggregatePersistenceAdapter { + /// The local database instance. + final FakeLocalDatabase _db; + + /// Creates an adapter backed by the given local database. + UserLocalDbAdapter({required FakeLocalDatabase db}) : _db = db; + + @override + Future fetchAsync(StreamId streamId) async { + final json = await _db.get(streamId.value); + if (json == null) { + throw StateError('User not found in local DB: ${streamId.value}'); + } + + // Deserialize from JSON map → domain aggregate. + // First create via the creation factory, then restore mutable fields + // that may have changed since creation. + final user = User.createFromUserRegistered( + UserRegistered( + userId: UserId(json['id'] as String), + name: json['name'] as String, + email: json['email'] as String, + ), + ); + + // Restore fields that are not part of the creation event. + user.email = json['email'] as String; + user.isActive = json['isActive'] as bool; + final deactivatedAtRaw = json['deactivatedAt'] as String?; + user.deactivatedAt = deactivatedAtRaw != null ? DateTime.parse(deactivatedAtRaw) : null; + + return user; + } + + @override + Future persistAsync( + StreamId streamId, + User aggregate, + List pendingOperations, + ) async { + // Serialize the full aggregate state to a JSON map and write it. + // We ignore pendingOperations entirely — just store the whole entity. + await _db.put(streamId.value, { + 'id': aggregate.id.value, + 'name': aggregate.name, + 'email': aggregate.email, + 'isActive': aggregate.isActive, + 'deactivatedAt': aggregate.deactivatedAt?.toIso8601String(), + }); + } +} + +// ── Example ───────────────────────────────────────────────────────────────── + +void main() async { + print('═══════════════════════════════════════════════════════════════════'); + print('State-Based Persistence with Local Database'); + print('═══════════════════════════════════════════════════════════════════'); + print(''); + + // The local database — in production, this would be Hive, Sembast, + // Isar, SharedPreferences, SQLite, etc. + final db = FakeLocalDatabase(); + + // The adapter bridges the User aggregate to the local database. + final userAdapter = UserLocalDbAdapter(db: db); + + // Construct a StateBasedStore — identical setup as the backend + // example, just with a different adapter implementation. + final store = StateBasedStore( + adapters: {User: userAdapter}, + aggregates: $aggregateList, + ); + + // TransactionalRunner manages session lifecycle. + final runner = TransactionalRunner(store: store); + + final userId = const StreamId('user-1'); + + // ── Transaction 1: Create a new user ────────────────────────────────── + + print('Transaction 1: Create a new user'); + print(''); + + await runner.runAsync(() async { + final session = TransactionalRunner.currentSession; + + final user = await session.applyAsync( + userId, + UserRegistered( + userId: const UserId('user-1'), + name: 'Bob Miller', + email: 'bob@example.com', + ), + ); + + print(' [Memory] Created: ${user.name} <${user.email}>'); + }); + // ← Auto-committed: adapter serialized the full User to the local DB. + + print(''); + + // ── Transaction 2: Load from DB, change email ──────────────────────── + + print('Transaction 2: Load from DB, change email'); + print(''); + + await runner.runAsync(() async { + final session = TransactionalRunner.currentSession; + + // Load the user from the local database. + final user = await session.loadAsync(userId); + print(' [Memory] Loaded: ${user.name} <${user.email}>'); + + // Apply a mutation — the session records this operation. + await session.applyAsync( + userId, + EmailChanged(newEmail: 'bob.miller@company.com'), + ); + + print(' [Memory] Updated: ${user.name} <${user.email}>'); + }); + // ← Auto-committed: adapter wrote the updated User back to the DB. + + print(''); + + // ── Transaction 3: Load, deactivate ─────────────────────────────────── + + print('Transaction 3: Load from DB, deactivate account'); + print(''); + + await runner.runAsync(() async { + final session = TransactionalRunner.currentSession; + + final user = await session.loadAsync(userId); + print( + ' [Memory] Loaded: ${user.name} <${user.email}> ' + 'active=${user.isActive}', + ); + + await session.applyAsync( + userId, + UserDeactivated(deactivatedAt: DateTime(2026, 2, 19)), + ); + + print( + ' [Memory] Updated: active=${user.isActive} ' + 'deactivatedAt=${user.deactivatedAt}', + ); + }); + // ← Auto-committed: full entity written back with isActive=false. + + print(''); + + // ── Verify: Load one more time to prove persistence ─────────────────── + + print('Verification: Load from DB to prove persistence'); + print(''); + + await runner.runAsync(() async { + final session = TransactionalRunner.currentSession; + + final user = await session.loadAsync(userId); + print( + ' [Memory] Final state: ${user.name} <${user.email}> ' + 'active=${user.isActive} deactivatedAt=${user.deactivatedAt}', + ); + }); + + print(''); + + // ── Summary ─────────────────────────────────────────────────────────── + + print('═══════════════════════════════════════════════════════════════════'); + print('Key Takeaways:'); + print(' 1. Same StateBasedStore API — only the adapter changes'); + print(' 2. Adapter stores/loads the whole entity as JSON'); + print(' 3. No backend needed — works offline with local storage'); + print(' 4. Replace FakeLocalDatabase with Hive, Sembast, Isar, etc.'); + print(' 5. pendingOperations can be ignored when doing full-entity writes'); + print('═══════════════════════════════════════════════════════════════════'); +} diff --git a/packages/continuum/example/main.dart b/packages/continuum/example/main.dart index 8c03d82..d1a10db 100644 --- a/packages/continuum/example/main.dart +++ b/packages/continuum/example/main.dart @@ -19,6 +19,7 @@ /// STATE-BASED PERSISTENCE (StateBasedStore + Adapter): /// store_state_based.dart - State-based backend persistence /// store_state_based_transactional.dart - State-based with TransactionalRunner +/// store_state_based_local_db.dart - State-based with local database /// /// PROJECTIONS (Read Models): /// projection_example.dart - Projection with code generation @@ -54,6 +55,7 @@ void main() { print('STATE-BASED PERSISTENCE (StateBasedStore + Adapter):'); print(' store_state_based.dart - State-based backend persistence'); print(' store_state_based_transactional.dart - State-based with TransactionalRunner'); + print(' store_state_based_local_db.dart - State-based with local database'); print(''); print('PROJECTIONS (Read Models):'); print(' projection_example.dart - Projection with code generation'); From 7eb92a45f638956c756053fec108f1498e079814 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 20 Feb 2026 13:11:49 +0800 Subject: [PATCH 3/3] Added new default implementations for the store adapters --- CHANGELOG.md | 3 + .../lib/continuum_store_hive.dart | 5 +- .../lib/src/hive_persistence_adapter.dart | 90 ++++++ packages/continuum_store_hive/pubspec.yaml | 1 + .../test/hive_persistence_adapter_test.dart | 258 +++++++++++++++++ .../lib/continuum_store_memory.dart | 6 +- .../src/in_memory_persistence_adapter.dart | 65 +++++ packages/continuum_store_memory/pubspec.yaml | 1 + .../in_memory_persistence_adapter_test.dart | 246 +++++++++++++++++ .../lib/continuum_store_sembast.dart | 5 +- .../lib/src/sembast_persistence_adapter.dart | 78 ++++++ packages/continuum_store_sembast/pubspec.yaml | 1 + .../sembast_persistence_adapter_test.dart | 260 ++++++++++++++++++ 13 files changed, 1013 insertions(+), 6 deletions(-) create mode 100644 packages/continuum_store_hive/lib/src/hive_persistence_adapter.dart create mode 100644 packages/continuum_store_hive/test/hive_persistence_adapter_test.dart create mode 100644 packages/continuum_store_memory/lib/src/in_memory_persistence_adapter.dart create mode 100644 packages/continuum_store_memory/test/in_memory_persistence_adapter_test.dart create mode 100644 packages/continuum_store_sembast/lib/src/sembast_persistence_adapter.dart create mode 100644 packages/continuum_store_sembast/test/sembast_persistence_adapter_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 565cfcf..7dd4de4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - New example `example/lib/store_state_based_transactional.dart` demonstrating `StateBasedStore` with `TransactionalRunner` and a REST API adapter. - New example `example/lib/store_state_based_local_db.dart` demonstrating `StateBasedStore` with a local key-value database adapter. +- `InMemoryPersistenceAdapter` in `continuum_store_memory` — a ready-made `AggregatePersistenceAdapter` backed by a plain `Map`, suitable for testing and development. +- `HivePersistenceAdapter` in `continuum_store_hive` — a ready-made `AggregatePersistenceAdapter` backed by a Hive `Box`, for local persistence. +- `SembastPersistenceAdapter` in `continuum_store_sembast` — a ready-made `AggregatePersistenceAdapter` backed by a Sembast store, for local persistence. ### Removed diff --git a/packages/continuum_store_hive/lib/continuum_store_hive.dart b/packages/continuum_store_hive/lib/continuum_store_hive.dart index ac9b895..d10fd1f 100644 --- a/packages/continuum_store_hive/lib/continuum_store_hive.dart +++ b/packages/continuum_store_hive/lib/continuum_store_hive.dart @@ -1,8 +1,9 @@ /// Hive-backed store implementations for the continuum library. /// -/// Provides persistent local [EventStore] and [ReadModelStore] -/// implementations using Hive. +/// Provides persistent local [EventStore], [ReadModelStore], and +/// [AggregatePersistenceAdapter] implementations using Hive. library; export 'src/hive_event_store.dart'; +export 'src/hive_persistence_adapter.dart'; export 'src/hive_read_model_store.dart'; diff --git a/packages/continuum_store_hive/lib/src/hive_persistence_adapter.dart b/packages/continuum_store_hive/lib/src/hive_persistence_adapter.dart new file mode 100644 index 0000000..2234013 --- /dev/null +++ b/packages/continuum_store_hive/lib/src/hive_persistence_adapter.dart @@ -0,0 +1,90 @@ +import 'dart:convert'; + +import 'package:continuum/continuum.dart'; +import 'package:continuum_state/continuum_state.dart'; +import 'package:hive/hive.dart'; + +/// Hive-backed implementation of [AggregatePersistenceAdapter]. +/// +/// Stores aggregates as JSON strings in a Hive [Box]. +/// Aggregates survive app restarts and device reboots. +/// +/// The caller provides [toJson] and [fromJson] callbacks for +/// aggregate serialization. The adapter stores the full entity +/// state on every persist — pending operations are ignored. +final class HivePersistenceAdapter implements AggregatePersistenceAdapter { + /// The Hive box storing JSON-encoded aggregates. + final Box _box; + + /// Serializes an aggregate to a JSON-compatible map. + final Map Function(TAggregate aggregate) _toJson; + + /// Deserializes an aggregate from a JSON-compatible map. + final TAggregate Function(Map json) _fromJson; + + /// Private constructor — use [openAsync] factory. + HivePersistenceAdapter._({ + required Box box, + required Map Function(TAggregate aggregate) toJson, + required TAggregate Function(Map json) fromJson, + }) : _box = box, + _toJson = toJson, + _fromJson = fromJson; + + /// Opens a Hive-backed persistence adapter with the given [boxName]. + /// + /// If the box already exists, it is reopened with existing data. + /// + /// The [toJson] and [fromJson] callbacks handle aggregate + /// serialization. Both callbacks are responsible for mapping all + /// mutable aggregate state — not just the creation fields. + static Future> openAsync({ + required String boxName, + required Map Function(TAggregate aggregate) toJson, + required TAggregate Function(Map json) fromJson, + }) async { + final box = await Hive.openBox(boxName); + return HivePersistenceAdapter._( + box: box, + toJson: toJson, + fromJson: fromJson, + ); + } + + @override + Future fetchAsync(StreamId streamId) async { + final json = _box.get(streamId.value); + if (json == null) { + throw StateError( + 'Aggregate not found in Hive store: ${streamId.value}', + ); + } + + // Decode the stored JSON string back into an aggregate. + final decoded = jsonDecode(json) as Map; + return _fromJson(decoded); + } + + @override + Future persistAsync( + StreamId streamId, + TAggregate aggregate, + List pendingOperations, + ) async { + // Encode the full aggregate state to a JSON string and store it. + // Pending operations are ignored — we always write the complete + // entity. + final json = jsonEncode(_toJson(aggregate)); + await _box.put(streamId.value, json); + } + + /// Closes the underlying Hive box. + /// + /// Call this when the adapter is no longer needed to release resources. + Future closeAsync() async { + await _box.close(); + } + + /// The number of stored aggregates. + int get length => _box.length; +} diff --git a/packages/continuum_store_hive/pubspec.yaml b/packages/continuum_store_hive/pubspec.yaml index e4dc963..b0ff09f 100644 --- a/packages/continuum_store_hive/pubspec.yaml +++ b/packages/continuum_store_hive/pubspec.yaml @@ -13,6 +13,7 @@ dependencies: zooper_flutter_core: ^2.0.0 continuum: ^5.0.0 continuum_event_sourcing: ^5.0.0 + continuum_state: ^5.0.0 hive: ^2.2.3 dev_dependencies: diff --git a/packages/continuum_store_hive/test/hive_persistence_adapter_test.dart b/packages/continuum_store_hive/test/hive_persistence_adapter_test.dart new file mode 100644 index 0000000..ca16e06 --- /dev/null +++ b/packages/continuum_store_hive/test/hive_persistence_adapter_test.dart @@ -0,0 +1,258 @@ +import 'dart:io'; + +import 'package:continuum/continuum.dart'; +import 'package:continuum_store_hive/continuum_store_hive.dart'; +import 'package:hive/hive.dart'; +import 'package:test/test.dart'; + +void main() { + group('HivePersistenceAdapter', () { + late Directory tempDir; + late HivePersistenceAdapter<_Profile> adapter; + + setUp(() async { + tempDir = await Directory.systemTemp.createTemp('hive_persistence_test_'); + Hive.init(tempDir.path); + adapter = await HivePersistenceAdapter.openAsync<_Profile>( + boxName: 'profiles', + toJson: (profile) => profile.toJson(), + fromJson: _Profile.fromJson, + ); + }); + + tearDown(() async { + await adapter.closeAsync(); + await Hive.close(); + if (tempDir.existsSync()) { + tempDir.deleteSync(recursive: true); + } + }); + + group('fetchAsync', () { + test('returns the stored aggregate when it exists', () async { + // Arrange — persist a profile first. + final streamId = const StreamId('profile-1'); + final profile = _Profile( + id: 'profile-1', + name: 'Alice', + age: 30, + ); + await adapter.persistAsync(streamId, profile, []); + + // Act + final fetched = await adapter.fetchAsync(streamId); + + // Assert — round-trip must preserve all fields. + expect(fetched.id, equals('profile-1')); + expect(fetched.name, equals('Alice')); + expect(fetched.age, equals(30)); + }); + + test('throws StateError when the aggregate does not exist', () async { + // Arrange — empty store. + final streamId = const StreamId('nonexistent'); + + // Act & Assert — fetching a missing aggregate must throw. + await expectLater( + adapter.fetchAsync(streamId), + throwsA(isA()), + ); + }); + }); + + group('persistAsync', () { + test('stores the aggregate so it can be fetched later', () async { + // Arrange + final streamId = const StreamId('profile-2'); + final profile = _Profile(id: 'profile-2', name: 'Bob', age: 25); + + // Act + await adapter.persistAsync(streamId, profile, []); + + // Assert — the aggregate must be retrievable. + final fetched = await adapter.fetchAsync(streamId); + expect(fetched.name, equals('Bob')); + }); + + test('overwrites existing aggregate on repeated persist', () async { + // Arrange — persist an initial version. + final streamId = const StreamId('profile-3'); + final original = _Profile(id: 'profile-3', name: 'Carol', age: 40); + await adapter.persistAsync(streamId, original, []); + + // Act — persist an updated version with the same stream ID. + final updated = _Profile( + id: 'profile-3', + name: 'Carol Updated', + age: 41, + ); + await adapter.persistAsync(streamId, updated, []); + + // Assert — fetch must return the updated version. + final fetched = await adapter.fetchAsync(streamId); + expect(fetched.name, equals('Carol Updated')); + expect(fetched.age, equals(41)); + }); + + test('ignores pending operations and stores full state', () async { + // Arrange — create a fake operation list. + final streamId = const StreamId('profile-4'); + final profile = _Profile(id: 'profile-4', name: 'Dave', age: 35); + final operations = [_FakeOperation()]; + + // Act — operations should be ignored. + await adapter.persistAsync(streamId, profile, operations); + + // Assert — the stored data must reflect full aggregate state. + final fetched = await adapter.fetchAsync(streamId); + expect(fetched.name, equals('Dave')); + }); + + test('stores multiple distinct aggregates independently', () async { + // Arrange + final profile1 = _Profile(id: 'a', name: 'One', age: 1); + final profile2 = _Profile(id: 'b', name: 'Two', age: 2); + + // Act + await adapter.persistAsync(const StreamId('a'), profile1, []); + await adapter.persistAsync(const StreamId('b'), profile2, []); + + // Assert — each aggregate must be independently retrievable. + final fetched1 = await adapter.fetchAsync(const StreamId('a')); + final fetched2 = await adapter.fetchAsync(const StreamId('b')); + expect(fetched1.name, equals('One')); + expect(fetched2.name, equals('Two')); + }); + }); + + group('length', () { + test('returns zero when the store is empty', () { + // Assert — no aggregates have been persisted yet. + expect(adapter.length, equals(0)); + }); + + test('reflects the number of stored aggregates', () async { + // Arrange + await adapter.persistAsync( + const StreamId('a'), + _Profile(id: 'a', name: 'A', age: 1), + [], + ); + await adapter.persistAsync( + const StreamId('b'), + _Profile(id: 'b', name: 'B', age: 2), + [], + ); + + // Assert — two distinct aggregates were stored. + expect(adapter.length, equals(2)); + }); + }); + + group('persistence', () { + test('data survives close and reopen', () async { + // Arrange — persist, then close. + final streamId = const StreamId('persistent-1'); + final profile = _Profile( + id: 'persistent-1', + name: 'Persistent', + age: 99, + ); + await adapter.persistAsync(streamId, profile, []); + await adapter.closeAsync(); + + // Act — reopen from the same Hive directory. + adapter = await HivePersistenceAdapter.openAsync<_Profile>( + boxName: 'profiles', + toJson: (profile) => profile.toJson(), + fromJson: _Profile.fromJson, + ); + final fetched = await adapter.fetchAsync(streamId); + + // Assert — data must have been persisted to disk. + expect(fetched.id, equals('persistent-1')); + expect(fetched.name, equals('Persistent')); + expect(fetched.age, equals(99)); + }); + }); + + group('closeAsync', () { + test('closes the underlying Hive box', () async { + // Arrange — persist something first. + await adapter.persistAsync( + const StreamId('x'), + _Profile(id: 'x', name: 'X', age: 1), + [], + ); + + // Act + await adapter.closeAsync(); + + // Assert — reopening and fetching proves the previous close + // released the box. If the box were still locked, openAsync + // would reuse the cached instance. + adapter = await HivePersistenceAdapter.openAsync<_Profile>( + boxName: 'profiles', + toJson: (profile) => profile.toJson(), + fromJson: _Profile.fromJson, + ); + final fetched = await adapter.fetchAsync(const StreamId('x')); + expect(fetched.name, equals('X')); + }); + }); + + group('serialization round-trip', () { + test('preserves null fields through serialization', () async { + // Arrange — a profile with a nullable field set to null. + final streamId = const StreamId('nullable'); + final profile = _Profile( + id: 'nullable', + name: 'NullTest', + age: null, + ); + + // Act + await adapter.persistAsync(streamId, profile, []); + final fetched = await adapter.fetchAsync(streamId); + + // Assert — null must survive the JSON encode/decode round-trip. + expect(fetched.age, isNull); + }); + }); + }); +} + +// --------------------------------------------------------------------------- +// Test fixtures +// --------------------------------------------------------------------------- + +/// A simple data class representing a profile, used as the +/// aggregate under test. +final class _Profile { + /// The profile identifier. + final String id; + + /// The display name. + final String name; + + /// The age, which may be null. + final int? age; + + /// Creates a profile with the given fields. + _Profile({required this.id, required this.name, required this.age}); + + /// Serializes the profile to a JSON-compatible map. + Map toJson() => {'id': id, 'name': name, 'age': age}; + + /// Deserializes a profile from a JSON-compatible map. + static _Profile fromJson(Map json) { + return _Profile( + id: json['id']! as String, + name: json['name']! as String, + age: json['age'] as int?, + ); + } +} + +/// A fake [Operation] for testing that operations are ignored. +final class _FakeOperation implements Operation {} diff --git a/packages/continuum_store_memory/lib/continuum_store_memory.dart b/packages/continuum_store_memory/lib/continuum_store_memory.dart index 5ecad1f..93ff9ce 100644 --- a/packages/continuum_store_memory/lib/continuum_store_memory.dart +++ b/packages/continuum_store_memory/lib/continuum_store_memory.dart @@ -1,8 +1,10 @@ /// In-memory store implementations for the continuum library. /// -/// Provides simple in-memory [EventStore] and [ReadModelStore] -/// implementations suitable for testing and development. +/// Provides simple in-memory [EventStore], [ReadModelStore], and +/// [AggregatePersistenceAdapter] implementations suitable for +/// testing and development. library; export 'src/in_memory_event_store.dart'; +export 'src/in_memory_persistence_adapter.dart'; export 'src/in_memory_read_model_store.dart'; diff --git a/packages/continuum_store_memory/lib/src/in_memory_persistence_adapter.dart b/packages/continuum_store_memory/lib/src/in_memory_persistence_adapter.dart new file mode 100644 index 0000000..46141ad --- /dev/null +++ b/packages/continuum_store_memory/lib/src/in_memory_persistence_adapter.dart @@ -0,0 +1,65 @@ +import 'package:continuum/continuum.dart'; +import 'package:continuum_state/continuum_state.dart'; + +/// In-memory implementation of [AggregatePersistenceAdapter]. +/// +/// Stores aggregates as JSON maps in a plain [Map], suitable for +/// testing and development. Data is lost when the instance is +/// garbage collected. +/// +/// The caller provides [toJson] and [fromJson] callbacks for +/// aggregate serialization. The [toJson] callback converts the +/// aggregate to a JSON-compatible map, and [fromJson] reconstructs +/// it from the stored map. +final class InMemoryPersistenceAdapter implements AggregatePersistenceAdapter { + /// Internal storage keyed by stream ID value. + final Map> _storage = {}; + + /// Serializes an aggregate to a JSON-compatible map. + final Map Function(TAggregate aggregate) _toJson; + + /// Deserializes an aggregate from a JSON-compatible map. + final TAggregate Function(Map json) _fromJson; + + /// Creates an in-memory persistence adapter. + /// + /// The [toJson] callback converts the aggregate to a JSON map for + /// storage. The [fromJson] callback reconstructs the aggregate from + /// the stored JSON map. Both callbacks are responsible for mapping + /// all mutable aggregate state — not just the creation fields. + InMemoryPersistenceAdapter({ + required Map Function(TAggregate aggregate) toJson, + required TAggregate Function(Map json) fromJson, + }) : _toJson = toJson, + _fromJson = fromJson; + + @override + Future fetchAsync(StreamId streamId) async { + final json = _storage[streamId.value]; + if (json == null) { + throw StateError( + 'Aggregate not found in in-memory store: ${streamId.value}', + ); + } + + // Reconstruct the aggregate from the stored JSON map. + return _fromJson(json); + } + + @override + Future persistAsync( + StreamId streamId, + TAggregate aggregate, + List pendingOperations, + ) async { + // Store the full aggregate state as a JSON map. The pending + // operations are ignored — we always write the complete entity. + _storage[streamId.value] = _toJson(aggregate); + } + + /// The number of stored aggregates. + int get length => _storage.length; + + /// Clears all stored aggregates. + void clear() => _storage.clear(); +} diff --git a/packages/continuum_store_memory/pubspec.yaml b/packages/continuum_store_memory/pubspec.yaml index 8c8f01e..9b4d7a0 100644 --- a/packages/continuum_store_memory/pubspec.yaml +++ b/packages/continuum_store_memory/pubspec.yaml @@ -13,6 +13,7 @@ dependencies: zooper_flutter_core: ^2.0.0 continuum: ^5.0.0 continuum_event_sourcing: ^5.0.0 + continuum_state: ^5.0.0 dev_dependencies: build_runner: ^2.4.0 diff --git a/packages/continuum_store_memory/test/in_memory_persistence_adapter_test.dart b/packages/continuum_store_memory/test/in_memory_persistence_adapter_test.dart new file mode 100644 index 0000000..393437c --- /dev/null +++ b/packages/continuum_store_memory/test/in_memory_persistence_adapter_test.dart @@ -0,0 +1,246 @@ +import 'package:continuum/continuum.dart'; +import 'package:continuum_store_memory/continuum_store_memory.dart'; +import 'package:test/test.dart'; + +void main() { + group('InMemoryPersistenceAdapter', () { + late InMemoryPersistenceAdapter<_Profile> adapter; + + setUp(() { + adapter = InMemoryPersistenceAdapter<_Profile>( + toJson: (profile) => profile.toJson(), + fromJson: _Profile.fromJson, + ); + }); + + group('fetchAsync', () { + test('returns the stored aggregate when it exists', () async { + // Arrange — persist a profile first. + final streamId = const StreamId('profile-1'); + final profile = _Profile( + id: 'profile-1', + name: 'Alice', + age: 30, + ); + await adapter.persistAsync(streamId, profile, []); + + // Act + final fetched = await adapter.fetchAsync(streamId); + + // Assert — round-trip must preserve all fields. + expect(fetched.id, equals('profile-1')); + expect(fetched.name, equals('Alice')); + expect(fetched.age, equals(30)); + }); + + test('throws StateError when the aggregate does not exist', () async { + // Arrange — empty store. + final streamId = const StreamId('nonexistent'); + + // Act & Assert — fetching a missing aggregate must throw. + await expectLater( + adapter.fetchAsync(streamId), + throwsA(isA()), + ); + }); + }); + + group('persistAsync', () { + test('stores the aggregate so it can be fetched later', () async { + // Arrange + final streamId = const StreamId('profile-2'); + final profile = _Profile(id: 'profile-2', name: 'Bob', age: 25); + + // Act + await adapter.persistAsync(streamId, profile, []); + + // Assert — the aggregate must be retrievable. + final fetched = await adapter.fetchAsync(streamId); + expect(fetched.name, equals('Bob')); + }); + + test('overwrites existing aggregate on repeated persist', () async { + // Arrange — persist an initial version. + final streamId = const StreamId('profile-3'); + final original = _Profile(id: 'profile-3', name: 'Carol', age: 40); + await adapter.persistAsync(streamId, original, []); + + // Act — persist an updated version with the same stream ID. + final updated = _Profile(id: 'profile-3', name: 'Carol Updated', age: 41); + await adapter.persistAsync(streamId, updated, []); + + // Assert — fetch must return the updated version. + final fetched = await adapter.fetchAsync(streamId); + expect(fetched.name, equals('Carol Updated')); + expect(fetched.age, equals(41)); + }); + + test('ignores pending operations and stores full state', () async { + // Arrange — create a fake operation list. + final streamId = const StreamId('profile-4'); + final profile = _Profile(id: 'profile-4', name: 'Dave', age: 35); + final operations = [_FakeOperation()]; + + // Act — operations should be ignored. + await adapter.persistAsync(streamId, profile, operations); + + // Assert — the stored data must reflect full aggregate state. + final fetched = await adapter.fetchAsync(streamId); + expect(fetched.name, equals('Dave')); + }); + + test('stores multiple distinct aggregates independently', () async { + // Arrange + final profile1 = _Profile(id: 'a', name: 'One', age: 1); + final profile2 = _Profile(id: 'b', name: 'Two', age: 2); + + // Act + await adapter.persistAsync(const StreamId('a'), profile1, []); + await adapter.persistAsync(const StreamId('b'), profile2, []); + + // Assert — each aggregate must be independently retrievable. + final fetched1 = await adapter.fetchAsync(const StreamId('a')); + final fetched2 = await adapter.fetchAsync(const StreamId('b')); + expect(fetched1.name, equals('One')); + expect(fetched2.name, equals('Two')); + }); + }); + + group('length', () { + test('returns zero when the store is empty', () { + // Assert — no aggregates have been persisted yet. + expect(adapter.length, equals(0)); + }); + + test('reflects the number of stored aggregates', () async { + // Arrange + await adapter.persistAsync( + const StreamId('a'), + _Profile(id: 'a', name: 'A', age: 1), + [], + ); + await adapter.persistAsync( + const StreamId('b'), + _Profile(id: 'b', name: 'B', age: 2), + [], + ); + + // Assert — two distinct aggregates were stored. + expect(adapter.length, equals(2)); + }); + + test('does not increment when overwriting the same stream ID', () async { + // Arrange + final streamId = const StreamId('same'); + await adapter.persistAsync( + streamId, + _Profile(id: 'same', name: 'V1', age: 1), + [], + ); + + // Act — overwrite with a new version. + await adapter.persistAsync( + streamId, + _Profile(id: 'same', name: 'V2', age: 2), + [], + ); + + // Assert — length must stay at 1 since it's the same key. + expect(adapter.length, equals(1)); + }); + }); + + group('clear', () { + test('removes all stored aggregates', () async { + // Arrange — store some aggregates. + await adapter.persistAsync( + const StreamId('a'), + _Profile(id: 'a', name: 'A', age: 1), + [], + ); + await adapter.persistAsync( + const StreamId('b'), + _Profile(id: 'b', name: 'B', age: 2), + [], + ); + + // Act + adapter.clear(); + + // Assert — store must be empty. + expect(adapter.length, equals(0)); + }); + + test('causes subsequent fetches to throw', () async { + // Arrange — store and then clear. + final streamId = const StreamId('c'); + await adapter.persistAsync( + streamId, + _Profile(id: 'c', name: 'C', age: 3), + [], + ); + adapter.clear(); + + // Act & Assert — the aggregate is gone. + await expectLater( + adapter.fetchAsync(streamId), + throwsA(isA()), + ); + }); + }); + + group('serialization round-trip', () { + test('preserves null fields through serialization', () async { + // Arrange — a profile with a nullable field set to null. + final streamId = const StreamId('nullable'); + final profile = _Profile( + id: 'nullable', + name: 'NullTest', + age: null, + ); + + // Act + await adapter.persistAsync(streamId, profile, []); + final fetched = await adapter.fetchAsync(streamId); + + // Assert — null must survive the round-trip. + expect(fetched.age, isNull); + }); + }); + }); +} + +// --------------------------------------------------------------------------- +// Test fixtures +// --------------------------------------------------------------------------- + +/// A simple data class representing a profile, used as the +/// aggregate under test. +final class _Profile { + /// The profile identifier. + final String id; + + /// The display name. + final String name; + + /// The age, which may be null. + final int? age; + + /// Creates a profile with the given fields. + _Profile({required this.id, required this.name, required this.age}); + + /// Serializes the profile to a JSON-compatible map. + Map toJson() => {'id': id, 'name': name, 'age': age}; + + /// Deserializes a profile from a JSON-compatible map. + static _Profile fromJson(Map json) { + return _Profile( + id: json['id']! as String, + name: json['name']! as String, + age: json['age'] as int?, + ); + } +} + +/// A fake [Operation] for testing that operations are ignored. +final class _FakeOperation implements Operation {} diff --git a/packages/continuum_store_sembast/lib/continuum_store_sembast.dart b/packages/continuum_store_sembast/lib/continuum_store_sembast.dart index f9e3db0..70acac5 100644 --- a/packages/continuum_store_sembast/lib/continuum_store_sembast.dart +++ b/packages/continuum_store_sembast/lib/continuum_store_sembast.dart @@ -1,8 +1,9 @@ /// Sembast-backed store implementations for the continuum library. /// -/// Provides persistent local [EventStore] and [ReadModelStore] -/// implementations using Sembast. +/// Provides persistent local [EventStore], [ReadModelStore], and +/// [AggregatePersistenceAdapter] implementations using Sembast. library; export 'src/sembast_event_store.dart'; +export 'src/sembast_persistence_adapter.dart'; export 'src/sembast_read_model_store.dart'; diff --git a/packages/continuum_store_sembast/lib/src/sembast_persistence_adapter.dart b/packages/continuum_store_sembast/lib/src/sembast_persistence_adapter.dart new file mode 100644 index 0000000..d7325c2 --- /dev/null +++ b/packages/continuum_store_sembast/lib/src/sembast_persistence_adapter.dart @@ -0,0 +1,78 @@ +import 'dart:convert'; + +import 'package:continuum/continuum.dart'; +import 'package:continuum_state/continuum_state.dart'; +import 'package:sembast/sembast.dart'; + +/// Sembast-backed implementation of [AggregatePersistenceAdapter]. +/// +/// Stores aggregates as JSON strings in a Sembast [StoreRef]. +/// Aggregates survive app restarts and device reboots. +/// +/// The caller provides [toJson] and [fromJson] callbacks for +/// aggregate serialization. The adapter stores the full entity +/// state on every persist — pending operations are ignored. +final class SembastPersistenceAdapter implements AggregatePersistenceAdapter { + /// The Sembast database instance. + final Database _database; + + /// The Sembast store for JSON-encoded aggregates, keyed by string. + final StoreRef _store; + + /// Serializes an aggregate to a JSON-compatible map. + final Map Function(TAggregate aggregate) _toJson; + + /// Deserializes an aggregate from a JSON-compatible map. + final TAggregate Function(Map json) _fromJson; + + /// Creates a Sembast-backed persistence adapter. + /// + /// The [database] must already be opened by the caller. The + /// [storeName] identifies the Sembast store within the database — + /// use a unique name per aggregate type to prevent collisions. + /// + /// The [toJson] and [fromJson] callbacks handle aggregate + /// serialization. Both callbacks are responsible for mapping all + /// mutable aggregate state — not just the creation fields. + SembastPersistenceAdapter({ + required Database database, + required String storeName, + required Map Function(TAggregate aggregate) toJson, + required TAggregate Function(Map json) fromJson, + }) : _database = database, + _store = StoreRef(storeName), + _toJson = toJson, + _fromJson = fromJson; + + @override + Future fetchAsync(StreamId streamId) async { + final json = await _store.record(streamId.value).get(_database); + if (json == null) { + throw StateError( + 'Aggregate not found in Sembast store: ${streamId.value}', + ); + } + + // Decode the stored JSON string back into an aggregate. + final decoded = jsonDecode(json) as Map; + return _fromJson(decoded); + } + + @override + Future persistAsync( + StreamId streamId, + TAggregate aggregate, + List pendingOperations, + ) async { + // Encode the full aggregate state to a JSON string and store it. + // Pending operations are ignored — we always write the complete + // entity. + final json = jsonEncode(_toJson(aggregate)); + await _store.record(streamId.value).put(_database, json); + } + + /// Returns the number of stored aggregates. + Future countAsync() async { + return _store.count(_database); + } +} diff --git a/packages/continuum_store_sembast/pubspec.yaml b/packages/continuum_store_sembast/pubspec.yaml index 43099a3..b418357 100644 --- a/packages/continuum_store_sembast/pubspec.yaml +++ b/packages/continuum_store_sembast/pubspec.yaml @@ -13,6 +13,7 @@ dependencies: zooper_flutter_core: ^2.0.0 continuum: ^5.0.0 continuum_event_sourcing: ^5.0.0 + continuum_state: ^5.0.0 sembast: ^3.8.6 dev_dependencies: diff --git a/packages/continuum_store_sembast/test/sembast_persistence_adapter_test.dart b/packages/continuum_store_sembast/test/sembast_persistence_adapter_test.dart new file mode 100644 index 0000000..b5a385c --- /dev/null +++ b/packages/continuum_store_sembast/test/sembast_persistence_adapter_test.dart @@ -0,0 +1,260 @@ +import 'dart:io'; + +import 'package:continuum/continuum.dart'; +import 'package:continuum_store_sembast/continuum_store_sembast.dart'; +import 'package:sembast/sembast_io.dart'; +import 'package:test/test.dart'; + +void main() { + group('SembastPersistenceAdapter', () { + late Directory tempDir; + late Database database; + late SembastPersistenceAdapter<_Profile> adapter; + + setUp(() async { + tempDir = await Directory.systemTemp.createTemp( + 'sembast_persistence_test_', + ); + final dbPath = '${tempDir.path}/test.db'; + database = await databaseFactoryIo.openDatabase(dbPath); + adapter = SembastPersistenceAdapter<_Profile>( + database: database, + storeName: 'profiles', + toJson: (profile) => profile.toJson(), + fromJson: _Profile.fromJson, + ); + }); + + tearDown(() async { + await database.close(); + if (tempDir.existsSync()) { + tempDir.deleteSync(recursive: true); + } + }); + + group('fetchAsync', () { + test('returns the stored aggregate when it exists', () async { + // Arrange — persist a profile first. + final streamId = const StreamId('profile-1'); + final profile = _Profile( + id: 'profile-1', + name: 'Alice', + age: 30, + ); + await adapter.persistAsync(streamId, profile, []); + + // Act + final fetched = await adapter.fetchAsync(streamId); + + // Assert — round-trip must preserve all fields. + expect(fetched.id, equals('profile-1')); + expect(fetched.name, equals('Alice')); + expect(fetched.age, equals(30)); + }); + + test('throws StateError when the aggregate does not exist', () async { + // Arrange — empty store. + final streamId = const StreamId('nonexistent'); + + // Act & Assert — fetching a missing aggregate must throw. + await expectLater( + adapter.fetchAsync(streamId), + throwsA(isA()), + ); + }); + }); + + group('persistAsync', () { + test('stores the aggregate so it can be fetched later', () async { + // Arrange + final streamId = const StreamId('profile-2'); + final profile = _Profile(id: 'profile-2', name: 'Bob', age: 25); + + // Act + await adapter.persistAsync(streamId, profile, []); + + // Assert — the aggregate must be retrievable. + final fetched = await adapter.fetchAsync(streamId); + expect(fetched.name, equals('Bob')); + }); + + test('overwrites existing aggregate on repeated persist', () async { + // Arrange — persist an initial version. + final streamId = const StreamId('profile-3'); + final original = _Profile(id: 'profile-3', name: 'Carol', age: 40); + await adapter.persistAsync(streamId, original, []); + + // Act — persist an updated version with the same stream ID. + final updated = _Profile( + id: 'profile-3', + name: 'Carol Updated', + age: 41, + ); + await adapter.persistAsync(streamId, updated, []); + + // Assert — fetch must return the updated version. + final fetched = await adapter.fetchAsync(streamId); + expect(fetched.name, equals('Carol Updated')); + expect(fetched.age, equals(41)); + }); + + test('ignores pending operations and stores full state', () async { + // Arrange — create a fake operation list. + final streamId = const StreamId('profile-4'); + final profile = _Profile(id: 'profile-4', name: 'Dave', age: 35); + final operations = [_FakeOperation()]; + + // Act — operations should be ignored. + await adapter.persistAsync(streamId, profile, operations); + + // Assert — the stored data must reflect full aggregate state. + final fetched = await adapter.fetchAsync(streamId); + expect(fetched.name, equals('Dave')); + }); + + test('stores multiple distinct aggregates independently', () async { + // Arrange + final profile1 = _Profile(id: 'a', name: 'One', age: 1); + final profile2 = _Profile(id: 'b', name: 'Two', age: 2); + + // Act + await adapter.persistAsync(const StreamId('a'), profile1, []); + await adapter.persistAsync(const StreamId('b'), profile2, []); + + // Assert — each aggregate must be independently retrievable. + final fetched1 = await adapter.fetchAsync(const StreamId('a')); + final fetched2 = await adapter.fetchAsync(const StreamId('b')); + expect(fetched1.name, equals('One')); + expect(fetched2.name, equals('Two')); + }); + }); + + group('countAsync', () { + test('returns zero when the store is empty', () async { + // Assert — no aggregates have been persisted yet. + expect(await adapter.countAsync(), equals(0)); + }); + + test('reflects the number of stored aggregates', () async { + // Arrange + await adapter.persistAsync( + const StreamId('a'), + _Profile(id: 'a', name: 'A', age: 1), + [], + ); + await adapter.persistAsync( + const StreamId('b'), + _Profile(id: 'b', name: 'B', age: 2), + [], + ); + + // Assert — two distinct aggregates were stored. + expect(await adapter.countAsync(), equals(2)); + }); + + test('does not increment when overwriting the same stream ID', () async { + // Arrange + final streamId = const StreamId('same'); + await adapter.persistAsync( + streamId, + _Profile(id: 'same', name: 'V1', age: 1), + [], + ); + + // Act — overwrite with a new version. + await adapter.persistAsync( + streamId, + _Profile(id: 'same', name: 'V2', age: 2), + [], + ); + + // Assert — count must stay at 1 since it's the same key. + expect(await adapter.countAsync(), equals(1)); + }); + }); + + group('persistence', () { + test('data survives close and reopen', () async { + // Arrange — persist, then close the database. + final streamId = const StreamId('persistent-1'); + final profile = _Profile( + id: 'persistent-1', + name: 'Persistent', + age: 99, + ); + await adapter.persistAsync(streamId, profile, []); + await database.close(); + + // Act — reopen from the same database file. + final dbPath = '${tempDir.path}/test.db'; + database = await databaseFactoryIo.openDatabase(dbPath); + adapter = SembastPersistenceAdapter<_Profile>( + database: database, + storeName: 'profiles', + toJson: (profile) => profile.toJson(), + fromJson: _Profile.fromJson, + ); + final fetched = await adapter.fetchAsync(streamId); + + // Assert — data must have been persisted to disk. + expect(fetched.id, equals('persistent-1')); + expect(fetched.name, equals('Persistent')); + expect(fetched.age, equals(99)); + }); + }); + + group('serialization round-trip', () { + test('preserves null fields through serialization', () async { + // Arrange — a profile with a nullable field set to null. + final streamId = const StreamId('nullable'); + final profile = _Profile( + id: 'nullable', + name: 'NullTest', + age: null, + ); + + // Act + await adapter.persistAsync(streamId, profile, []); + final fetched = await adapter.fetchAsync(streamId); + + // Assert — null must survive the JSON encode/decode round-trip. + expect(fetched.age, isNull); + }); + }); + }); +} + +// --------------------------------------------------------------------------- +// Test fixtures +// --------------------------------------------------------------------------- + +/// A simple data class representing a profile, used as the +/// aggregate under test. +final class _Profile { + /// The profile identifier. + final String id; + + /// The display name. + final String name; + + /// The age, which may be null. + final int? age; + + /// Creates a profile with the given fields. + _Profile({required this.id, required this.name, required this.age}); + + /// Serializes the profile to a JSON-compatible map. + Map toJson() => {'id': id, 'name': name, 'age': age}; + + /// Deserializes a profile from a JSON-compatible map. + static _Profile fromJson(Map json) { + return _Profile( + id: json['id']! as String, + name: json['name']! as String, + age: json['age'] as int?, + ); + } +} + +/// A fake [Operation] for testing that operations are ignored. +final class _FakeOperation implements Operation {}