feat: copy sqlite-es and event-sorcery crates #3
Walkthrough
This PR establishes a complete event-sourcing framework by introducing two crates—
Changes
Event-Sourcing System with SQLite Persistence
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
c95843c to 4a8f3db
Crates copied verbatim from st0x.issuance (sqlite-es) and st0x.liquidity (event-sorcery). Adds workspace Cargo.toml with strict lints (warnings/clippy::all/pedantic/nursery/unwrap_used/expect_used denied; unsafe_code forbidden), clippy.toml exempting unwrap/expect in tests, and the canonical events+snapshots schema migration. Workspace dependency versions audited and bumped to current semver-compatible releases. cqrs-es held at 0.4.12 — 0.5.0 is a major bump pending deliberate upgrade.
Actionable comments posted: 16
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/event-sorcery/src/lib.rs`:
- Around line 524-639: Both load_all_ids and load_ids_paginated duplicate the
rows.into_iter().fold(...) parse-and-collect logic; extract this into a single
helper (e.g., parse_collect_ids or parse_ids_from_rows) that takes the
Vec<(String,)> and a log-context/message (or an enum flag) and returns
Result<Vec<Entity::Id>, Vec<String>> or (ids, invalid) so both functions can
call it; update load_all_ids and load_ids_paginated to call that helper and
convert the helper result into the existing Ok(...) /
Err(LoadAllIdsError::InvalidIds { ... }) flow, preserving the aggregate_type and
using the appropriate log message context previously used in each function.
In `@crates/event-sorcery/src/lifecycle.rs`:
- Around line 168-221: The Self::Failed arm of apply is re-wrapping any existing
LifecycleError in LifecycleError::AlreadyFailed on every incoming event, causing
unbounded nesting; change apply (the Self::Failed { error, last_valid_entity }
arm) to detect if error is already an AlreadyFailed and reuse its inner failure
(or unwrap one level) instead of boxing it again so the depth is capped at one;
update the code that constructs LifecycleError::AlreadyFailed to either use the
existing failure when matches!(error, LifecycleError::AlreadyFailed{..}) or
extract failure when let LifecycleError::AlreadyFailed{failure, ..} = error and
build a single-layer AlreadyFailed containing that failure plus the new event;
add a regression test that calls apply (via View::update or directly) with at
least two events after a failure and asserts the inner failure is not an
AlreadyFailed.
In `@crates/event-sorcery/src/projection.rs`:
- Around line 415-427: The catch_up upsert bypasses optimistic locking and can
clobber newer rows if called while the reactor is live; either make the
startup-only APIs (catch_up, rebuild, rebuild_all) non-public or make the upsert
version-aware: change the SQL used in this block (the query building around
INSERT INTO {table} ... ON CONFLICT(view_id) DO UPDATE ...) to only replace the
row when the incoming version is newer (e.g. use the excluded/parameter version
in the DO UPDATE ... WHERE existing.version < excluded.version or WHERE version
< ?2) so you never write an older payload/version over a newer one, and keep the
rest of the write logic (bind(view_id), bind(max_seq), bind(&payload),
execute(pool).await?) the same.
- Around line 638-658: The code currently treats non_null_count == 0 as fatal
and returns ProjectionError::StaleColumn; instead, change the behavior so that
when row_count.0 > 0 and non_null_count.0 == 0 you only emit the warn! (target:
"cqrs", %column_name, %table, row_count = %row_count.0, ...) and do not return
Err; remove the return of ProjectionError::StaleColumn for this case (or make it
behind an explicit opt-in flag such as fail_on_all_nulls if you need
configurable strictness), leaving existence checks (row_count) as the only
automatic failure path and letting all-NULL be a warning-only condition.
- Around line 565-597: The react() handler currently swallows load/save failures
by returning Ok(()) which lets subsequent events be applied to stale state;
modify react() so that on load failure (Err from self.repo.load_with_context)
and on terminal save failure (final PersistenceError::OptimisticLockError and
other Err(error) arms from self.repo.update_view) it returns an Err or triggers
the gap/recovery path instead of Ok(()). Concretely, change the match arm
handling Err(error) from load_with_context to propagate/convert that error (e.g.
return Err(ProjectionError::LoadFailure(error)) or call the existing gap-aware
recovery API), and change the final Err branches after update_view to either
return Err(ProjectionError::SaveFailure(error)) or invoke
mark_gap_for_recovery(view_id) / enqueue_full_rebuild(view_id) instead of
returning Ok(()), ensuring react()'s signature and callers accept/handle the
propagated failure.
In `@crates/event-sorcery/src/sqlite_event_repository.rs`:
- Around line 196-202: The SQL returned by the repository's event stream query
(the sqlx::query(...) call used by stream_all_events) orders only by sequence
which is per-aggregate and thus non-deterministic across aggregates; update the
query's ORDER BY clause to include aggregate_id as a tiebreaker (e.g., ORDER BY
sequence, aggregate_id) so replay is deterministic and keep the same selected
columns (aggregate_type, aggregate_id, sequence, ...).
- Around line 181-205: The current match arms use fetch_all(&pool).await which
loads the entire result set into memory; change both branches (the
Some(aggregate_id) and None arms) to create a query then call .fetch(&pool) and
iterate with .try_next().await (or StreamExt) to process each row as it arrives,
mapping each row to the existing event structure and sending it into the
existing channel (e.g., tx/send logic) inside the loop so backpressure is
respected and no intermediate Vec of rows is allocated; keep bindings of
aggregate_type and aggregate_id the same and reuse the same row-to-event
conversion and channel send code used after the original fetch_all.
In `@crates/event-sorcery/src/wire.rs`:
- Around line 137-153: reconcile::<Entity>()'s result is ignored but it can
indicate schema drift; update the flow around
Reconciler::new(self.pool.clone()).reconcile::<Entity>() to capture its return
value, detect when the schema version changed (the reconciliation result/enum
from reconcile::<Entity>()), and if drift/schema bump occurred, rebuild or
reinitialize the projection before calling Projection::sqlite(...).catch_up() so
views that already have max(event) still get their serialized payloads/columns
updated; ensure Projection::sqlite(self.pool.clone()) and
projection.catch_up().await error mapping (ProjectionError -> ReconcileError)
remain intact and run only after the projection has been recreated or migrated
when reconcile indicates a schema change.
In `@crates/sqlite-es/README.md`:
- Around line 50-54: Add a short warning to the README near the
SqliteEventRepository::with_tables example explaining that the passed table
names (as in the let repo = SqliteEventRepository::with_tables(...) example) are
used as SQL identifiers and are not SQL bind parameters, so they must be trusted
constants and must never be derived from user input or request data; mention the
SQL-injection risk and recommend using compile-time constants or
server-controlled configuration for those names.
In `@crates/sqlite-es/src/event_repository.rs`:
- Around line 141-143: The builder method with_stream_channel_size currently
accepts 0 which will cause ReplayStream::new to panic because tokio bounded
channels require buffer > 0; update with_stream_channel_size (and the similar
setter at the other occurrence) to validate the input—either
assert!(stream_channel_size > 0) or coerce via stream_channel_size =
stream_channel_size.max(1)—so that stream_channel_size passed into
ReplayStream::new is always > 0 and cannot trigger the runtime panic.
- Around line 81-93: The persist method currently calls insert_events() and then
update_snapshot() in separate transactions which can lead to partial commits;
change persist to open a single database transaction and perform both event
insertion and snapshot update within that transaction (use the DB
connection/transaction API your crate uses), passing the transaction handle into
the lower-level routines (e.g., modify or add insert_events_tx(...) and
update_snapshot_tx(...) or change insert_events/update_snapshot to accept a &mut
Transaction) so that on any error the transaction is rolled back and only
committed after both operations succeed; update persist to begin transaction,
call the transactional versions of insert_events and update_snapshot::<A>(),
then commit the transaction before returning Ok(()).
- Around line 262-303: The code currently calls
sqlx::query(...).fetch_all(&pool).await which materializes the entire result
set; change it to use sqlx::query(&query).bind(...).fetch(&pool) and iterate the
returned stream (using futures::stream::StreamExt or equivalent) so you process
rows one-by-one and only call feed.push(Ok(event)).await per row; handle and map
any stream/row error into PersistenceError::ConnectionError (pushing Err to
feed) and continue to map deserialization errors from row_to_serialized_event
into PersistenceError::DeserializationError (pushing Err to feed), and stop
iteration if feed.push().await returns Err, ensuring backpressure from feed
regulates the query.
In `@crates/sqlite-es/src/lib.rs`:
- Line 49: The testing module is being exported unconditionally via `pub mod
testing;`, exposing test-only APIs like `create_test_pool()` to production
consumers and forcing the migrations path requirement; change the declaration to
be conditional by adding the attribute `#[cfg(any(test, feature =
"test-support"))]` above `pub mod testing;` so it only compiles for tests or
when the feature is enabled, and then add the `test-support` feature to
crates/sqlite-es/Cargo.toml (documenting it as optional test utilities) so
consumers opt-in when needed.
In `@crates/sqlite-es/src/sql_query.rs`:
- Around line 7-12: The constructor new currently accepts raw Strings for
events_table and snapshots_table and those identifiers are interpolated into SQL
elsewhere (lines ~107-130), creating an injection risk; introduce a validated
Identifier (newtype) that enforces an allowlist regex like
^[A-Za-z_][A-Za-z0-9_]*$ and use that type instead of raw String in pub(crate)
const fn new(events_table: String, snapshots_table: String) (validate at
construction or provide a try_new returning Result), and when composing SQL
always quote identifiers consistently (e.g. double-quote/escape via a helper
like Identifier::quoted()) before interpolating into SQL in the methods that
build queries (the code paths around the SQL composition at lines 107-130).
In `@crates/sqlite-es/src/testing.rs`:
- Around line 9-10: The in-memory SQLite pool must be created with a single
shared connection and the correct URI to avoid isolated in-memory databases;
replace the Pool::<Sqlite>::connect(":memory:").await? call with creating the
pool via
SqlitePoolOptions::new().max_connections(1).connect("sqlite::memory:").await?
and keep running migrations with
sqlx::migrate!("../../migrations").run(&pool).await? so migrations run on the
single shared connection.
In `@crates/sqlite-es/src/view_repository.rs`:
- Around line 169-179: The insert branch that runs when context.version == 0
currently maps any insert conflict to SqliteViewError::Connection, which hides
first-write races from the optimistic-lock retry path; catch the specific
constraint/unique-violation error returned by sqlx when executing
SqlQueryFactory::insert_view(&self.view_table) (the same pattern used for update
conflicts) and map that case to SqliteViewError::OptimisticLockError instead of
Connection, leaving other database errors mapped to Connection; apply the
identical change to the other insert-location noted (the same insert handling
around the later insert block that currently maps to Connection) so concurrent
creators produce OptimisticLockError and trigger retry logic.
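As an illustration of the sql_query.rs finding above, a validated identifier newtype could be sketched roughly as follows. The names `Identifier`, `InvalidIdentifier`, `try_new`, and `quoted` are hypothetical (the review only suggests them), and the allowlist is checked by hand rather than with a regex so the sketch needs no extra dependencies.

```rust
use std::fmt;

/// Hypothetical newtype for a SQL identifier matching
/// ^[A-Za-z_][A-Za-z0-9_]*$ (the allowlist suggested in the review).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Identifier(String);

/// Hypothetical error carrying the rejected input.
#[derive(Debug, PartialEq, Eq)]
pub struct InvalidIdentifier(pub String);

impl fmt::Display for InvalidIdentifier {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "invalid SQL identifier: {:?}", self.0)
    }
}

impl std::error::Error for InvalidIdentifier {}

impl Identifier {
    /// Accepts only ASCII letters, digits, and underscores,
    /// and rejects an empty string or a leading digit.
    pub fn try_new(raw: impl Into<String>) -> Result<Self, InvalidIdentifier> {
        let raw = raw.into();
        let mut chars = raw.chars();
        let starts_ok = chars
            .next()
            .is_some_and(|c| c.is_ascii_alphabetic() || c == '_');
        let rest_ok = chars.all(|c| c.is_ascii_alphanumeric() || c == '_');
        if starts_ok && rest_ok {
            Ok(Self(raw))
        } else {
            Err(InvalidIdentifier(raw))
        }
    }

    /// Double-quoted form for interpolation into SQL text. The allowlist
    /// excludes quote characters, so no escaping is needed here.
    pub fn quoted(&self) -> String {
        format!("\"{}\"", self.0)
    }
}
```

Validating at construction means every later `format!` call that builds SQL only ever sees a checked name, which is the property the finding asks for.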
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 7aee5a21-94c3-48d3-a271-ea3de0bca058
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (21)
- Cargo.toml
- clippy.toml
- crates/event-sorcery/Cargo.toml
- crates/event-sorcery/src/dependency.rs
- crates/event-sorcery/src/lib.rs
- crates/event-sorcery/src/lifecycle.rs
- crates/event-sorcery/src/projection.rs
- crates/event-sorcery/src/reactor.rs
- crates/event-sorcery/src/schema_registry.rs
- crates/event-sorcery/src/sqlite_event_repository.rs
- crates/event-sorcery/src/testing.rs
- crates/event-sorcery/src/wire.rs
- crates/sqlite-es/Cargo.toml
- crates/sqlite-es/README.md
- crates/sqlite-es/src/cqrs.rs
- crates/sqlite-es/src/event_repository.rs
- crates/sqlite-es/src/lib.rs
- crates/sqlite-es/src/sql_query.rs
- crates/sqlite-es/src/testing.rs
- crates/sqlite-es/src/view_repository.rs
- migrations/20251016210348_init.sql
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Graphite / mergeability_check
🧰 Additional context used
📓 Path-based instructions (2)
*
⚙️ CodeRabbit configuration file
Focus on providing constructive criticism. Whenever you see a suboptimal approach, suggest more idiomatic or robust alternative(s). Flag potential footguns. Suggest FP alternatives to mutable/imperative code. Point out architectural flaws like leaky abstractions, tight coupling, wrong level of abstraction, poor type modeling, over-abstraction, unclear domain boundaries. Code should generally be organized based on business concerns rather than technical aspects - suggest improvements if you find violations. Point out gaps in test coverage but suggest tests that are not too coupled to the implementation and actually test domain invariants and business logic
Files:
clippy.toml, Cargo.toml
**/*.md
⚙️ CodeRabbit configuration file
Focus on the contents of the docs and not on cosmetic things like markdown formatting. We use markdown files for various docs including but not limited to guidelines for AI contributors (AGENTS.md), project overview and instructions for human contributors (README.md), and topic-focused references under docs/ (cqrs.md, sqlx.md, ttdd.md). Think about the target audience of a document when deciding what comment to leave. For instructions, suggest better rules and guidelines and point out missing instructions. For topic references, suggest improvements that would make non-obvious framework behavior or pitfalls easier to discover. In all cases, flag needless bloat, prefer clear concise writing, and consider the structure of the document and order of the sections
Files:
crates/sqlite-es/README.md
🪛 OpenGrep (1.20.0)
crates/event-sorcery/src/lib.rs
[ERROR] 505-505: SQL query built via format!() passed to a database method. Use parameterized queries with bind parameters instead.
(coderabbit.sql-injection.rust-format-query)
crates/event-sorcery/src/projection.rs
[ERROR] 318-318: SQL query built via format!() passed to a database method. Use parameterized queries with bind parameters instead.
(coderabbit.sql-injection.rust-format-query)
[ERROR] 336-336: SQL query built via format!() passed to a database method. Use parameterized queries with bind parameters instead.
(coderabbit.sql-injection.rust-format-query)
[ERROR] 418-422: SQL query built via format!() passed to a database method. Use parameterized queries with bind parameters instead.
(coderabbit.sql-injection.rust-format-query)
🔇 Additional comments (21)
crates/event-sorcery/src/testing.rs (8)
25-90: LGTM!
92-133: LGTM!
135-152: LGTM!
154-214: LGTM!
216-297: LGTM!
299-401: LGTM!
403-512: LGTM!
514-670: LGTM!
crates/event-sorcery/src/lib.rs (2)
78-126: LGTM! Also applies to: 188-376, 386-394, 406-449, 461-509
646-678: LGTM! Also applies to: 681-1095
crates/event-sorcery/src/dependency.rs (1)
29-381: LGTM! Also applies to: 383-595
crates/event-sorcery/src/lifecycle.rs (2)
10-77: LGTM! Also applies to: 79-122, 124-167, 222-298
300-555: LGTM!
crates/event-sorcery/src/reactor.rs (1)
12-76: LGTM!
migrations/20251016210348_init.sql (1)
1-27: LGTM!
crates/event-sorcery/src/sqlite_event_repository.rs (1)
50-106: LGTM! Also applies to: 108-163, 240-281, 284-325
Cargo.toml (1)
1-49: LGTM!
clippy.toml (1)
1-6: LGTM!
crates/event-sorcery/Cargo.toml (1)
1-24: LGTM!
crates/sqlite-es/Cargo.toml (1)
1-22: LGTM!
crates/sqlite-es/src/cqrs.rs (1)
1-29: LGTM!
pub async fn load_all_ids<Entity: EventSourced>(
    pool: &SqlitePool,
) -> Result<Vec<Entity::Id>, LoadAllIdsError>
where
    <Entity::Id as FromStr>::Err: Debug,
{
    let rows: Vec<(String,)> = sqlx::query_as(
        "SELECT aggregate_id FROM ( \
             SELECT aggregate_id FROM events WHERE aggregate_type = ?1 \
             UNION \
             SELECT aggregate_id FROM snapshots WHERE aggregate_type = ?1 \
         ) \
         ORDER BY aggregate_id ASC",
    )
    .bind(Entity::AGGREGATE_TYPE)
    .fetch_all(pool)
    .await?;

    let (ids, invalid) = rows.into_iter().fold(
        (Vec::new(), Vec::new()),
        |(mut ids, mut invalid), (id_str,)| {
            match id_str.parse::<Entity::Id>() {
                Ok(id) => ids.push(id),
                Err(parse_error) => {
                    tracing::warn!(
                        target: "cqrs",
                        aggregate_id = id_str,
                        aggregate_type = Entity::AGGREGATE_TYPE,
                        ?parse_error,
                        "Failed to parse aggregate ID"
                    );
                    invalid.push(id_str);
                }
            }
            (ids, invalid)
        },
    );

    if invalid.is_empty() {
        Ok(ids)
    } else {
        Err(LoadAllIdsError::InvalidIds {
            aggregate_type: Entity::AGGREGATE_TYPE,
            ids: invalid,
        })
    }
}

/// Load aggregate IDs with pagination, newest first (by highest rowid).
///
/// Returns up to `limit` IDs starting from `offset`, ordered by most
/// recently created aggregate first (based on the maximum rowid of each
/// aggregate's events or snapshot).
///
/// # Errors
///
/// Returns `LoadAllIdsError` on database errors or unparseable IDs.
pub async fn load_ids_paginated<Entity: EventSourced>(
    pool: &SqlitePool,
    limit: usize,
    offset: usize,
) -> Result<Vec<Entity::Id>, LoadAllIdsError>
where
    <Entity::Id as FromStr>::Err: Debug,
{
    let rows: Vec<(String,)> = sqlx::query_as(
        "SELECT aggregate_id FROM ( \
             SELECT aggregate_id, MAX(rowid) AS latest_rowid \
             FROM events \
             WHERE aggregate_type = ?1 \
             GROUP BY aggregate_id \
             UNION ALL \
             SELECT aggregate_id, MAX(rowid) AS latest_rowid \
             FROM snapshots \
             WHERE aggregate_type = ?1 \
             GROUP BY aggregate_id \
         ) \
         GROUP BY aggregate_id \
         ORDER BY MAX(latest_rowid) DESC \
         LIMIT ?2 OFFSET ?3",
    )
    .bind(Entity::AGGREGATE_TYPE)
    .bind(i64::try_from(limit)?)
    .bind(i64::try_from(offset)?)
    .fetch_all(pool)
    .await?;

    let (ids, invalid) = rows.into_iter().fold(
        (Vec::new(), Vec::new()),
        |(mut ids, mut invalid), (id_str,)| {
            match id_str.parse::<Entity::Id>() {
                Ok(id) => ids.push(id),
                Err(parse_error) => {
                    tracing::warn!(
                        target: "cqrs",
                        aggregate_id = id_str,
                        aggregate_type = Entity::AGGREGATE_TYPE,
                        ?parse_error,
                        "Failed to parse aggregate ID (paginated)"
                    );
                    invalid.push(id_str);
                }
            }
            (ids, invalid)
        },
    );

    if invalid.is_empty() {
        Ok(ids)
    } else {
        Err(LoadAllIdsError::InvalidIds {
            aggregate_type: Entity::AGGREGATE_TYPE,
            ids: invalid,
        })
    }
}
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Extract duplicated parse-and-collect logic.
The rows.into_iter().fold(...) blocks at lines 542-560 and 611-629 are identical except for the log message. Any future change to ID parsing or invalid-ID handling must be applied twice and kept in sync, which is easy to miss. Extract a single helper, e.g.:
♻️ Proposed helper extraction
+fn partition_ids<Id>(
+    rows: Vec<(String,)>,
+    aggregate_type: &'static str,
+    context: &'static str,
+) -> (Vec<Id>, Vec<String>)
+where
+    Id: FromStr,
+    <Id as FromStr>::Err: Debug,
+{
+    rows.into_iter().fold(
+        (Vec::new(), Vec::new()),
+        |(mut ids, mut invalid), (id_str,)| {
+            match id_str.parse::<Id>() {
+                Ok(id) => ids.push(id),
+                Err(parse_error) => {
+                    tracing::warn!(
+                        target: "cqrs",
+                        aggregate_id = id_str,
+                        aggregate_type,
+                        ?parse_error,
+                        context,
+                        "Failed to parse aggregate ID"
+                    );
+                    invalid.push(id_str);
+                }
+            }
+            (ids, invalid)
+        },
+    )
+}
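To show how both loaders might share the extracted helper, here is a dependency-free sketch. `InvalidIds` is a simplified stand-in for the crate's `LoadAllIdsError::InvalidIds`, `ids_or_error` is a hypothetical name, and `eprintln!` replaces `tracing::warn!` so the example compiles on its own.

```rust
use std::fmt::Debug;
use std::str::FromStr;

/// Simplified stand-in for `LoadAllIdsError::InvalidIds` (assumption).
#[derive(Debug, PartialEq, Eq)]
pub struct InvalidIds {
    pub aggregate_type: &'static str,
    pub ids: Vec<String>,
}

/// Splits raw rows into parsed IDs and the strings that failed to parse.
fn partition_ids<Id>(
    rows: Vec<(String,)>,
    aggregate_type: &'static str,
    context: &'static str,
) -> (Vec<Id>, Vec<String>)
where
    Id: FromStr,
    <Id as FromStr>::Err: Debug,
{
    rows.into_iter().fold(
        (Vec::new(), Vec::new()),
        |(mut ids, mut invalid), (id_str,)| {
            match id_str.parse::<Id>() {
                Ok(id) => ids.push(id),
                Err(parse_error) => {
                    // Stand-in for the structured tracing::warn! call.
                    eprintln!(
                        "[{context}] failed to parse {aggregate_type} ID {id_str:?}: {parse_error:?}"
                    );
                    invalid.push(id_str);
                }
            }
            (ids, invalid)
        },
    )
}

/// Shared tail of both loaders: any invalid ID fails the whole call.
fn ids_or_error<Id>(
    rows: Vec<(String,)>,
    aggregate_type: &'static str,
    context: &'static str,
) -> Result<Vec<Id>, InvalidIds>
where
    Id: FromStr,
    <Id as FromStr>::Err: Debug,
{
    let (ids, invalid) = partition_ids(rows, aggregate_type, context);
    if invalid.is_empty() {
        Ok(ids)
    } else {
        Err(InvalidIds { aggregate_type, ids: invalid })
    }
}
```

With this shape, `load_all_ids` and `load_ids_paginated` would differ only in their SQL and in the `context` string they pass.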
    fn apply(&mut self, event: Self::Event) {
        *self = match std::mem::take(self) {
            Self::Uninitialized => Entity::originate(&event).map_or_else(
                || {
                    let err = LifecycleError::EventCantOriginate { event };
                    error!(target: "cqrs", "lifecycle failed during originate: {err}");
                    Self::Failed {
                        error: err,
                        last_valid_entity: None,
                    }
                },
                Self::Live,
            ),

            Self::Live(entity) => match Entity::evolve(&entity, &event) {
                Ok(Some(new_entity)) => Self::Live(new_entity),
                Ok(None) => {
                    let err = LifecycleError::UnexpectedEvent {
                        entity: Box::new(entity.clone()),
                        event,
                    };
                    error!(target: "cqrs", "lifecycle failed during evolve: {err}");
                    Self::Failed {
                        error: err,
                        last_valid_entity: Some(Box::new(entity)),
                    }
                }
                Err(domain_err) => {
                    let err = LifecycleError::Apply(domain_err);
                    error!(target: "cqrs", "lifecycle failed during evolve: {err}");
                    Self::Failed {
                        error: err,
                        last_valid_entity: Some(Box::new(entity)),
                    }
                }
            },

            Self::Failed {
                error,
                last_valid_entity,
            } => {
                let err = LifecycleError::AlreadyFailed {
                    failure: Box::new(error),
                    event,
                };
                error!(target: "cqrs", "lifecycle already failed, ignoring event: {err}");
                Self::Failed {
                    error: err,
                    last_valid_entity,
                }
            }
        };
    }
}
Unbounded AlreadyFailed nesting when events keep arriving on a Failed lifecycle.
The Self::Failed arm at lines 205‑218 always wraps the existing error in a fresh LifecycleError::AlreadyFailed { failure: Box::new(error), event }. If the prior error was already AlreadyFailed, it gets nested again — and again for every subsequent event. The chain grows linearly with the number of events applied after failure, which is reachable via:
- View::update (line 230) calling apply once per envelope during replay of a long event log on an aggregate that failed early.
- Continued command processing against a sticky-failed aggregate.
Each nesting level adds a Box<LifecycleError<Entity>> + a cloned Entity::Event and inflates the JSON when the lifecycle is serialized as a snapshot/view, obscuring the original failure under N layers of wrapping. The existing failed_state_is_sticky test only applies a single event and would not catch this.
Make the wrapping idempotent so the depth is capped at one:
🔧 Proposed fix
 Self::Failed {
     error,
     last_valid_entity,
 } => {
-    let err = LifecycleError::AlreadyFailed {
-        failure: Box::new(error),
-        event,
-    };
+    let err = match error {
+        LifecycleError::AlreadyFailed { .. } => error,
+        original => LifecycleError::AlreadyFailed {
+            failure: Box::new(original),
+            event,
+        },
+    };
     error!(target: "cqrs", "lifecycle already failed, ignoring event: {err}");
     Self::Failed {
         error: err,
         last_valid_entity,
     }
 }
And add a regression test that applies ≥2 events after the failure and asserts the inner failure is not itself AlreadyFailed.
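The regression test requested above could be prototyped against a reduced model before touching the real types. In this sketch `Lifecycle` and `LifecycleError` are simplified, non-generic stand-ins (events are plain strings, the `Live` state is omitted), and `wrap_depth` is a hypothetical helper that counts `AlreadyFailed` layers.

```rust
/// Reduced stand-in for the crate's generic LifecycleError (assumption).
#[derive(Debug, Clone, PartialEq)]
enum LifecycleError {
    Apply(String),
    AlreadyFailed { failure: Box<LifecycleError>, event: String },
}

/// Reduced stand-in for the lifecycle state machine (assumption).
#[derive(Debug, Default)]
enum Lifecycle {
    #[default]
    Uninitialized,
    Failed { error: LifecycleError },
}

impl Lifecycle {
    fn apply(&mut self, event: String) {
        *self = match std::mem::take(self) {
            // In this model every originating event fails, to reach Failed quickly.
            Self::Uninitialized => Self::Failed {
                error: LifecycleError::Apply(format!("cannot originate from {event}")),
            },
            Self::Failed { error } => {
                // Idempotent wrapping: an existing AlreadyFailed is reused as-is.
                let error = match error {
                    already @ LifecycleError::AlreadyFailed { .. } => already,
                    original => LifecycleError::AlreadyFailed {
                        failure: Box::new(original),
                        event,
                    },
                };
                Self::Failed { error }
            }
        };
    }
}

/// Counts how many AlreadyFailed layers wrap the root failure.
fn wrap_depth(error: &LifecycleError) -> usize {
    match error {
        LifecycleError::AlreadyFailed { failure, .. } => 1 + wrap_depth(failure),
        LifecycleError::Apply(_) => 0,
    }
}
```

A test against this model applies three events and checks that `wrap_depth` stays at one. Note that this variant keeps the first post-failure event and drops later ones, which matches the proposed diff rather than the "plus the new event" wording in the agent prompt.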
// Write directly with version = max_seq, bypassing the view repo's
// optimistic lock (which expects version + 1 increments). This is
// safe because catch_up runs once at startup before the main loop.
sqlx::query(&format!(
    "INSERT INTO {table} (view_id, version, payload) \
     VALUES (?1, ?2, ?3) \
     ON CONFLICT(view_id) DO UPDATE SET version = ?2, payload = ?3"
))
.bind(view_id)
.bind(max_seq)
.bind(&payload)
.execute(pool)
.await?;
catch_up can overwrite newer rows outside the startup path.
This upsert deliberately bypasses optimistic locking, but the only thing making that safe is the comment. Because catch_up, rebuild, and rebuild_all are public, a caller can invoke them while the projection reactor is live; if another event advances the row in that window, this statement can write an older payload/version back over it. Please make these APIs startup-only or add version-checked coordination around this write.
🧰 Tools
🪛 OpenGrep (1.20.0)
[ERROR] 418-422: SQL query built via format!() passed to a database method. Use parameterized queries with bind parameters instead.
(coderabbit.sql-injection.rust-format-query)
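One possible shape for the version-checked write is sketched below. `guarded_upsert_sql` and `upsert_if_newer` are hypothetical names; the SQL relies on SQLite's `excluded` pseudo-table and the optional `WHERE` clause on `DO UPDATE`, and the in-memory function only models the intended semantics.

```rust
use std::collections::HashMap;

/// Builds an upsert that replaces a row only when the incoming version is
/// strictly newer. `table` is interpolated, not bound, so it must be a
/// trusted constant.
fn guarded_upsert_sql(table: &str) -> String {
    format!(
        "INSERT INTO {table} (view_id, version, payload) \
         VALUES (?1, ?2, ?3) \
         ON CONFLICT(view_id) DO UPDATE SET \
         version = excluded.version, payload = excluded.payload \
         WHERE excluded.version > {table}.version"
    )
}

/// In-memory model of the same rule: returns true if the row was written.
fn upsert_if_newer(
    views: &mut HashMap<String, (i64, String)>,
    view_id: &str,
    version: i64,
    payload: &str,
) -> bool {
    // A stored row with an equal or higher version wins over the incoming one.
    let is_stale = views.get(view_id).is_some_and(|row| row.0 >= version);
    if is_stale {
        return false;
    }
    views.insert(view_id.to_string(), (version, payload.to_string()));
    true
}
```

With the guard in place, a late `catch_up` write carrying an older `max_seq` becomes a no-op instead of clobbering a row the live reactor has already advanced.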
let (mut lifecycle, context) = match self.repo.load_with_context(&view_id).await {
    Ok(Some(pair)) => pair,
    Ok(None) => (Lifecycle::default(), ViewContext::new(view_id.clone(), 0)),
    Err(error) => {
        warn!(target: "cqrs", %view_id, ?error, "Failed to load view for update");
        return Ok(());
    }
};

lifecycle.apply(event.clone());

match self.repo.update_view(lifecycle, context).await {
    Ok(()) => return Ok(()),
    Err(PersistenceError::OptimisticLockError) if attempt < max_retries => {
        let delay_ms = (base_delay_ms * 2u64.pow(attempt)).min(max_delay_ms);
        warn!(
            target: "cqrs",
            %view_id, attempt = attempt + 1, max_retries, delay_ms,
            "Optimistic lock conflict, retrying view update"
        );
        sleep(Duration::from_millis(delay_ms)).await;
    }
    Err(PersistenceError::OptimisticLockError) => {
        error!(
            target: "cqrs",
            %view_id, max_retries,
            "View update lost: optimistic lock conflict persisted after all retries"
        );
        return Ok(());
    }
    Err(error) => {
        warn!(target: "cqrs", %view_id, ?error, "Failed to save view update");
        return Ok(());
Don't acknowledge dropped projection updates.
react() returns Ok(()) for both load failures and terminal save failures. After one event is skipped, a later event for the same aggregate can be applied to stale state and leave the materialized view wrong until a full rebuild. This needs to fail closed or trigger a gap-aware recovery path before later events are accepted.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/event-sorcery/src/projection.rs` around lines 565 - 597, The react()
handler currently swallows load/save failures by returning Ok(()) which lets
subsequent events be applied to stale state; modify react() so that on load
failure (Err from self.repo.load_with_context) and on terminal save failure
(final PersistenceError::OptimisticLockError and other Err(error) arms from
self.repo.update_view) it returns an Err or triggers the gap/recovery path
instead of Ok(()). Concretely, change the match arm handling Err(error) from
load_with_context to propagate/convert that error (e.g. return
Err(ProjectionError::LoadFailure(error)) or call the existing gap-aware recovery
API), and change the final Err branches after update_view to either return
Err(ProjectionError::SaveFailure(error)) or invoke
mark_gap_for_recovery(view_id) / enqueue_full_rebuild(view_id) instead of
returning Ok(()), ensuring react()'s signature and callers accept/handle the
propagated failure.
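The fail-closed behavior requested here can be sketched in isolation. This is a minimal model, not the crate's code: `PersistenceError`, `ProjectionError`, and `apply_update` are hypothetical stand-ins, and the `load`/`save` closures take the place of `load_with_context` and `update_view`. The point it illustrates is that only lock conflicts are retried, and every terminal failure reaches the caller as an `Err`.

```rust
/// Hypothetical stand-in for the persistence layer's error type.
#[derive(Debug, Clone, PartialEq)]
pub enum PersistenceError {
    OptimisticLock,
    Other(String),
}

/// Hypothetical error returned by the projection instead of `Ok(())`.
#[derive(Debug, PartialEq)]
pub enum ProjectionError {
    LoadFailure(PersistenceError),
    SaveFailure { attempts: u32, source: PersistenceError },
}

/// Applies one view update with bounded retries.
/// `load` returns the current view version; `save` writes the next one.
pub fn apply_update<L, S>(
    mut load: L,
    mut save: S,
    max_retries: u32,
) -> Result<(), ProjectionError>
where
    L: FnMut() -> Result<u64, PersistenceError>,
    S: FnMut(u64) -> Result<(), PersistenceError>,
{
    for attempt in 0..=max_retries {
        // A load failure is propagated rather than swallowed.
        let version = load().map_err(ProjectionError::LoadFailure)?;

        match save(version + 1) {
            Ok(()) => return Ok(()),
            // Retry only lock conflicts, and only while attempts remain.
            Err(PersistenceError::OptimisticLock) if attempt < max_retries => continue,
            // Exhausted retries and all other errors fail closed.
            Err(source) => {
                return Err(ProjectionError::SaveFailure {
                    attempts: attempt + 1,
                    source,
                });
            }
        }
    }
    unreachable!("the final iteration always returns")
}
```

Whether the caller then halts dispatch or records a gap for rebuild is a separate design decision; the sketch only guarantees that the failure is visible.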
if row_count.0 > 0 {
    let non_null_count: (i64,) = sqlx::query_as(&format!(
        "SELECT COUNT(*) FROM {table}
         WHERE {column_name} IS NOT NULL"
    ))
    .fetch_one(pool)
    .await?;

    if non_null_count.0 == 0 {
        warn!(
            target: "cqrs",
            %column_name, %table, row_count = %row_count.0,
            "Generated column has all NULL values, likely stale JSON path"
        );

        return Err(ProjectionError::StaleColumn {
            column: column.clone(),
            table: table.to_string(),
            row_count: row_count.0,
        });
    }
Don't equate “all NULL” with a stale generated column.
A generated column can legitimately be NULL for every current row, e.g. an optional JSON path or a newly introduced field that has not been populated yet. In that case filter() rejects a valid schema with StaleColumn. Existence is the only thing this helper can prove reliably here; the all-NULL case should be opt-in or warning-only.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/event-sorcery/src/projection.rs` around lines 638 - 658, The code
currently treats non_null_count == 0 as fatal and returns
ProjectionError::StaleColumn; instead, change the behavior so that when
row_count.0 > 0 and non_null_count.0 == 0 you only emit the warn! (target:
"cqrs", %column_name, %table, row_count = %row_count.0, ...) and do not return
Err; remove the return of ProjectionError::StaleColumn for this case (or make it
behind an explicit opt-in flag such as fail_on_all_nulls if you need
configurable strictness), leaving existence checks (row_count) as the only
automatic failure path and letting all-NULL be a warning-only condition.
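One way to make the all-NULL case opt-in is a small policy switch. The sketch below is an assumption-laden model: `AllNullPolicy`, `ColumnCheck`, `StaleColumn`, and `check_generated_column` are hypothetical names, and the two counts are passed in directly instead of being queried through sqlx.

```rust
/// Hypothetical switch for how strictly an all-NULL column is treated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllNullPolicy {
    /// Report the condition but accept the schema (suggested default).
    Warn,
    /// Reject the schema, matching the current behavior.
    Fail,
}

/// Outcome of a check that did not fail.
#[derive(Debug, PartialEq, Eq)]
pub enum ColumnCheck {
    Healthy,
    /// Every row is NULL; the caller may log a warning.
    AllNull,
}

/// Hypothetical error mirroring `ProjectionError::StaleColumn`.
#[derive(Debug, PartialEq, Eq)]
pub struct StaleColumn {
    pub column: String,
    pub row_count: i64,
}

/// Classifies a generated column from its row and non-NULL counts.
pub fn check_generated_column(
    column: &str,
    row_count: i64,
    non_null_count: i64,
    policy: AllNullPolicy,
) -> Result<ColumnCheck, StaleColumn> {
    // An empty table or any non-NULL value proves nothing is stale.
    if row_count == 0 || non_null_count > 0 {
        return Ok(ColumnCheck::Healthy);
    }
    match policy {
        AllNullPolicy::Warn => Ok(ColumnCheck::AllNull),
        AllNullPolicy::Fail => Err(StaleColumn {
            column: column.to_string(),
            row_count,
        }),
    }
}
```

With `Warn` as the default, an optional JSON path that is unset in every row no longer blocks `filter()`, while deployments that want the strict check can still select `Fail`.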
let rows = match &aggregate_id {
    Some(id) => {
        sqlx::query(&query)
            .bind(&aggregate_type)
            .bind(id)
            .fetch_all(&pool)
            .await
    }
    None => {
        sqlx::query(&query)
            .bind(&aggregate_type)
            .fetch_all(&pool)
            .await
    }
};

let rows = match rows {
    Ok(rows) => rows,
    Err(e) => {
        let _ = feed
            .push(Err(PersistenceError::ConnectionError(Box::new(e))))
            .await;
        return;
    }
};

for row in &rows {
    let event = match row_to_serialized_event(row) {
        Ok(event) => event,
        Err(e) => {
            let _ = feed
                .push(Err(PersistenceError::DeserializationError(Box::new(e))))
                .await;
            return;
        }
    };

    if feed.push(Ok(event)).await.is_err() {
        return;
    }
}
});
🧩 Analysis chain
🏁 Script executed:
cat crates/sqlite-es/src/event_repository.rs | head -n 320 | tail -n +240
Repository: ST0x-Technology/event-sorcery
Length of output: 2686
🏁 Script executed:
grep -n "ReplayStream" crates/sqlite-es/src/event_repository.rs | head -20
Repository: ST0x-Technology/event-sorcery
Length of output: 506
🏁 Script executed:
rg "struct ReplayStream|impl ReplayStream" -A 20
Repository: ST0x-Technology/event-sorcery
Length of output: 55
🏁 Script executed:
rg "ReplayStream" --type rust -B 2 -A 15 | head -100
Repository: ST0x-Technology/event-sorcery
Length of output: 8343
🏁 Script executed:
fd "*.rs" | xargs grep -l "struct ReplayStream"
Repository: ST0x-Technology/event-sorcery
Length of output: 359
🏁 Script executed:
rg "pub struct ReplayStream|pub type ReplayStream" -A 10
Repository: ST0x-Technology/event-sorcery
Length of output: 55
🏁 Script executed:
cat crates/sqlite-es/src/event_repository.rs | wc -l
Repository: ST0x-Technology/event-sorcery
Length of output: 77
🏁 Script executed:
rg "stream_channel_size" -B 3 -A 3
Repository: ST0x-Technology/event-sorcery
Length of output: 4415
🏁 Script executed:
rg "cqrs_es|cqrs-es" Cargo.toml
Repository: ST0x-Technology/event-sorcery
Length of output: 92
🏁 Script executed:
rg "pub.*fn push|async.*push" --type rust -B 2 -A 5 | head -60
Repository: ST0x-Technology/event-sorcery
Length of output: 55
🏁 Script executed:
rg "ReplayStream::new" -B 5 -A 10
Repository: ST0x-Technology/event-sorcery
Length of output: 2980
🏁 Script executed:
cat crates/sqlite-es/src/event_repository.rs | sed -n '250,303p'
Repository: ST0x-Technology/event-sorcery
Length of output: 1942
🌐 Web query:
cqrs-es 0.4.12 ReplayStream channel bounded backpressure
💡 Result:
In cqrs-es (Rust), the ReplayStream is implemented as a bounded channel whose “backpressure” effect is provided by the channel capacity parameter you pass when creating the stream (channel_size). The docs.rs API shows ReplayStream plus related abstractions (ReplayFeed, QueryReplay) in the persist module. [1][2]
From the project’s store implementations, a concrete stream type is created via ReplayStream::new(channel_size) (older name) or MpscReplayStream::new(channel_size) (newer name), and that channel_size value is threaded through the event streaming functions. This strongly indicates that bounded-capacity behavior (producer awaiting when the buffer is full) is the backpressure mechanism used during replay. [3]
What I could not confirm from available documentation/snippets:
- The exact Tokio channel type used inside ReplayStream/MpscReplayStream (e.g., tokio::sync::mpsc vs another), and whether it uses synchronous send/await semantics or drop semantics.
- The precise bounded backpressure semantics for “replay” (e.g., whether the SQL fetch task pauses when the downstream consumer lags).
If you want, paste the cqrs-es 0.4.12 source for persist::ReplayStream (or its struct definition) and I can map the channel/backpressure behavior precisely to the Tokio primitives used.
Citations:
- 1: https://docs.rs/cqrs-es/latest/cqrs_es/persist/index.html
- 2: https://docs.rs/cqrs-es/latest/cqrs_es/all.html
- 3: serverlesstechnology/mysql-es@main...abstract_replay_stream
🌐 Web query:
sqlx fetch_all vs fetch streaming memory usage
💡 Result:
In sqlx (Rust), fetch_all allocates and buffers the entire result set into a Vec, so its memory usage grows with the total number of rows (and their decoded sizes). By contrast, the fetch method returns a Stream you can iterate row-by-row, so peak memory can stay roughly bounded by the driver/stream buffering and your per-row processing.
1) What fetch_all does to memory
- sqlx::Query::fetch_all collects the full result set “into a Vec”. [1]
- The docs explicitly warn: “This will attempt to collect the full result set of the query into memory” and advise using LIMIT to avoid exhausting memory. [1]
So if you switch between:
- fetch_all: memory ≈ O(total_rows × row_size) (plus Vec overhead)
- fetch/stream iteration: memory ≈ O(buffering + per-row decoded values)
2) What “streaming” (fetch) implies
- sqlx::Query::fetch returns “the generated results as a futures Stream”. [2]
- Because you don’t materialize a Vec, you can process rows incrementally; sqlx’s API is designed to support streaming consumption rather than full collection. [2]
3) Practical guidance
- Use fetch_all only when you truly need all rows at once (and the result size is bounded; add LIMIT). [1]
- For large/unknown result sets, prefer fetch + stream iteration to avoid loading everything into memory. [2][1]
Summary (memory usage):
- fetch_all: unbounded-by-design with respect to row count; can exhaust memory. [1]
- fetch (stream): peak memory can remain much lower because results are consumed incrementally. [2]
Citations:
- 1: https://docs.rs/sqlx/latest/sqlx/query/struct.Map.html
- 2: https://docs.rs/sqlx/latest/sqlx/query/struct.Query.html
🏁 Script executed:
cat crates/sqlite-es/src/event_repository.rs | sed -n '1,30p'
Repository: ST0x-Technology/event-sorcery
Length of output: 1130
🏁 Script executed:
rg "fetch\(" --type rust -B 2 -A 5 crates/sqlite-es/src/event_repository.rs
Repository: ST0x-Technology/event-sorcery
Length of output: 55
🏁 Script executed:
rg "\.fetch\(" --type rust -B 2 -A 3
Repository: ST0x-Technology/event-sorcery
Length of output: 55
🏁 Script executed:
rg "\.fetch_one\(|\.fetch_optional\(" --type rust -B 2 -A 3
Repository: ST0x-Technology/event-sorcery
Length of output: 4897
🏁 Script executed:
rg "stream_events_impl" -B 2 -A 30 | head -100
Repository: ST0x-Technology/event-sorcery
Length of output: 8084
🏁 Script executed:
cat crates/event-sorcery/src/sqlite_event_repository.rs | sed -n '213,280p'
Repository: ST0x-Technology/event-sorcery
Length of output: 2116
🏁 Script executed:
cat crates/event-sorcery/src/sqlite_event_repository.rs | sed -n '195,280p'
Repository: ST0x-Technology/event-sorcery
Length of output: 2757
Replace fetch_all with row-by-row fetch to enable streaming backpressure.
Lines 262–276 materialize the entire result set into memory before pushing to the channel. For large aggregates, this causes memory spikes and stalls, despite the streaming API intent. The channel's backpressure cannot prevent the initial allocation.
Use row-by-row fetch iteration so backpressure from feed.push() can regulate query progress and keep memory bounded by the channel size.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/sqlite-es/src/event_repository.rs` around lines 262 - 303, The code
currently calls sqlx::query(...).fetch_all(&pool).await which materializes the
entire result set; change it to use sqlx::query(&query).bind(...).fetch(&pool)
and iterate the returned stream (using futures::stream::StreamExt or equivalent)
so you process rows one-by-one and only call feed.push(Ok(event)).await per row;
handle and map any stream/row error into PersistenceError::ConnectionError
(pushing Err to feed) and continue to map deserialization errors from
row_to_serialized_event into PersistenceError::DeserializationError (pushing Err
to feed), and stop iteration if feed.push().await returns Err, ensuring
backpressure from feed regulates the query.
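The difference between collecting first and pulling lazily can be shown without sqlx. The sketch below is a std-only analogue, not the repository code: `stream_rows` is a hypothetical helper, a bounded `sync_channel` stands in for the replay feed, and any iterator stands in for the row stream that `fetch` would return. The producer pulls one row at a time, so a full channel pauses the source instead of buffering the whole result set.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Streams rows through a bounded channel, pulling them lazily.
/// `rows` stands in for a row-by-row database stream; `capacity`
/// stands in for the replay channel size.
pub fn stream_rows<I>(rows: I, capacity: usize) -> Receiver<Result<String, String>>
where
    I: IntoIterator<Item = Result<String, String>> + Send + 'static,
{
    let (feed, stream) = sync_channel(capacity);

    thread::spawn(move || {
        for row in rows {
            let failed = row.is_err();
            // `send` blocks while the channel is full, which is the
            // backpressure: the next row is not pulled until there is room.
            // Stop if the consumer hung up or after forwarding an error.
            if feed.send(row).is_err() || failed {
                return;
            }
        }
    });

    stream
}
```

Because the source is only advanced between sends, even an unbounded iterator is safe to pass in, which would not be true of a version that collected into a `Vec` first.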
mod cqrs;
mod event_repository;
mod sql_query;
pub mod testing;
🧩 Analysis chain
🏁 Script executed:
# Find and inspect the main lib.rs file
fd "lib.rs" crates/sqlite-es/src
Repository: ST0x-Technology/event-sorcery
Length of output: 101
🏁 Script executed:
# Check the line 49 and surrounding context
head -60 crates/sqlite-es/src/lib.rs | tail -20
Repository: ST0x-Technology/event-sorcery
Length of output: 661
🏁 Script executed:
# Check if testing module exists
fd "testing.rs" crates/sqlite-es/src
Repository: ST0x-Technology/event-sorcery
Length of output: 105
🏁 Script executed:
# Check the Cargo.toml for crates/sqlite-es
cat crates/sqlite-es/Cargo.toml
Repository: ST0x-Technology/event-sorcery
Length of output: 580
🏁 Script executed:
cat crates/sqlite-es/src/testing.rs
Repository: ST0x-Technology/event-sorcery
Length of output: 475
🏁 Script executed:
# Also check if there's a migrations directory in the workspace
fd "migrations" --type d crates/sqlite-es/
Repository: ST0x-Technology/event-sorcery
Length of output: 55
🏁 Script executed:
# Check workspace root for migrations
fd "migrations" --type d --max-depth 2
Repository: ST0x-Technology/event-sorcery
Length of output: 85
Do not expose testing as an unconditional public module.
The pub mod testing; declaration on line 49 makes test-only utilities and the create_test_pool() function part of the production API surface. This couples crate compilation to the workspace migration path (../../migrations), requiring that path to be valid for all consumers of the crate, not just for tests.
Gate this with #[cfg(any(test, feature = "test-support"))] and define the feature in crates/sqlite-es/Cargo.toml if you want to conditionally expose test utilities.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/sqlite-es/src/lib.rs` at line 49, The testing module is being exported
unconditionally via `pub mod testing;`, exposing test-only APIs like
`create_test_pool()` to production consumers and forcing the migrations path
requirement; change the declaration to be conditional by adding the attribute
`#[cfg(any(test, feature = "test-support"))]` above `pub mod testing;` so it
only compiles for tests or when the feature is enabled, and then add the
`test-support` feature to crates/sqlite-es/Cargo.toml (documenting it as
optional test utilities) so consumers opt-in when needed.
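As a rough illustration of the gating, the sketch below uses an inline module in place of `testing.rs`. `production_api` and `create_test_fixture` are hypothetical names, and the `test-support` feature is assumed to be declared as an empty feature in the crate's Cargo.toml.

```rust
/// Always part of the public API.
pub fn production_api() -> &'static str {
    "always available"
}

/// Compiled only for this crate's own tests, or when a consumer opts in
/// through the (assumed) `test-support` feature. Ordinary builds never
/// compile the module body, so they do not depend on anything it references.
#[cfg(any(test, feature = "test-support"))]
pub mod testing {
    /// Hypothetical helper standing in for `create_test_pool()`.
    pub fn create_test_fixture() -> Vec<&'static str> {
        vec!["fixture"]
    }
}
```

A downstream crate that wants the helpers would then enable the feature on its dev-dependency entry; everyone else gets a smaller API surface and no compile-time reference to the migrations directory.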
pub(crate) const fn new(events_table: String, snapshots_table: String) -> Self {
    Self {
        events_table,
        snapshots_table,
    }
}
Harden dynamic table-name handling before SQL string interpolation.
Lines 7-12 and 107-130 interpolate identifiers directly into SQL. If any table name comes from config/user-controlled input, this becomes an injection and query-integrity footgun.
Introduce a validated identifier type (allowlist like ^[A-Za-z_][A-Za-z0-9_]*$) and quote identifiers consistently before composing SQL.
As per coding guidelines, "Flag potential footguns."
Also applies to: 107-130
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/sqlite-es/src/sql_query.rs` around lines 7 - 12, The constructor new
currently accepts raw Strings for events_table and snapshots_table and those
identifiers are interpolated into SQL elsewhere (lines ~107-130), creating an
injection risk; introduce a validated Identifier (newtype) that enforces an
allowlist regex like ^[A-Za-z_][A-Za-z0-9_]*$ and use that type instead of raw
String in pub(crate) const fn new(events_table: String, snapshots_table: String)
(validate at construction or provide a try_new returning Result), and when
composing SQL always quote identifiers consistently (e.g. double-quote/escape
via a helper like Identifier::quoted()) before interpolating into SQL in the
methods that build queries (the code paths around the SQL composition at lines
107-130).
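A validated identifier newtype along these lines might look as follows. This is a sketch under assumptions: `Identifier`, `InvalidIdentifier`, `try_new`, `quoted`, and `select_events` are illustrative names, the allowlist is checked by hand rather than with a regex crate, and the `aggregate_type` column in the example query is only a placeholder.

```rust
/// A table or column name that passed the allowlist check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Identifier(String);

/// Returned when a name does not match `^[A-Za-z_][A-Za-z0-9_]*$`.
#[derive(Debug, PartialEq, Eq)]
pub struct InvalidIdentifier(pub String);

impl Identifier {
    /// Validates `raw` against the allowlist before accepting it.
    pub fn try_new(raw: &str) -> Result<Self, InvalidIdentifier> {
        let mut chars = raw.chars();
        // First character: ASCII letter or underscore (rejects empty input).
        let starts_ok = matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_');
        // Remaining characters: ASCII letters, digits, or underscores.
        let rest_ok = chars.all(|c| c.is_ascii_alphanumeric() || c == '_');

        if starts_ok && rest_ok {
            Ok(Self(raw.to_string()))
        } else {
            Err(InvalidIdentifier(raw.to_string()))
        }
    }

    /// Double-quoted form for interpolation into SQL. The allowlist
    /// excludes `"`, so no escaping is needed here.
    pub fn quoted(&self) -> String {
        format!("\"{}\"", self.0)
    }
}

/// Example query builder that only accepts validated identifiers.
pub fn select_events(table: &Identifier) -> String {
    format!("SELECT * FROM {} WHERE aggregate_type = ?", table.quoted())
}
```

Validating at construction means the query builders can take `&Identifier` and never see a raw `String`, so the check cannot be skipped by a new call site.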
let pool = Pool::<Sqlite>::connect(":memory:").await?;
sqlx::migrate!("../../migrations").run(&pool).await?;
🧩 Analysis chain
🏁 Script executed:
# First, let's read the actual file to confirm the code matches the snippet
cat -n crates/sqlite-es/src/testing.rs
Repository: ST0x-Technology/event-sorcery
Length of output: 559
🏁 Script executed:
# Check if sqlx is in the dependencies and what version
find . -name "Cargo.toml" -type f | head -5 | xargs grep -l "sqlx" 2>/dev/null
Repository: ST0x-Technology/event-sorcery
Length of output: 150
🏁 Script executed:
# Let's search for how SQLx handles in-memory databases with pooling
find . -name "*.rs" -type f | xargs grep -l "connect.*:memory:" 2>/dev/null | head -10
Repository: ST0x-Technology/event-sorcery
Length of output: 335
🏁 Script executed:
# Check sqlx version in the relevant Cargo.toml
cat crates/sqlite-es/Cargo.toml
Repository: ST0x-Technology/event-sorcery
Length of output: 580
🌐 Web query:
SQLx SQLite in-memory database pooling behavior :memory: multiple connections
💡 Result:
SQLx supports pooling with SQLite :memory: databases, but the behavior requires specific configuration due to SQLite's nature, where plain ":memory:" creates a private database per connection[1][2]. SQLx handles this by automatically enabling shared cache and using a named in-memory URI like "file:sqlx-in-memory-{seqno}?mode=memory&cache=shared" when parsing "sqlite::memory:" via SqliteConnectOptions::from_str(":memory:")[3][4].
When using SqlitePool with multiple connections (default max=10), all connections share the same in-memory database thanks to this shared cache mechanism, introduced in PR #625[5]. However, issues arise because the pool periodically closes idle connections via idle_timeout (default 10m) or max_lifetime (default 30m), and if the pool drops to 0 connections before min_connections (default 0) maintenance recreates them, the shared in-memory DB is destroyed as no connections remain[4][6][7]. Subsequent acquires create a fresh empty DB, causing "no such table" errors intermittently[4].
Workarounds and recommendations:
- Set max_connections(1), idle_timeout(None), max_lifetime(None) for a single persistent connection[4][7].
- Or use min_connections(3+), but not fully reliable[4].
- Prefer SqliteConnectOptions::from_str("sqlite::memory:") over a manual .filename(":memory:"), as the former sets in_memory=true and shared_cache=true correctly[3][2].
Official docs note ":memory:" opens an in-memory DB but warn low-level APIs lack special treatment; recommend from_str for simple cases[2]. No explicit pooling warnings in current docs, but GitHub issues highlight these pitfalls[4][3].
Example for reliable single-connection pool:
let opts = SqliteConnectOptions::from_str("sqlite::memory:").unwrap();
let pool = SqlitePoolOptions::new()
    .max_connections(1)
    .min_connections(1)
    .idle_timeout(None)
    .max_lifetime(None)
    .connect_with(opts)
    .await?;
This keeps the DB alive[4].
For production/multi-conn, consider file-based SQLite or other DBs, as in-memory pooling is fragile under load[8].
Citations:
- 1: https://www2.sqlite.org/inmemorydb.html
- 2: https://docs.rs/sqlx/latest/sqlx/sqlite/struct.SqliteConnectOptions.html
- 3: SqliteConnectOptions::new() doesn't set in_memory = true launchbadge/sqlx#3136
- 4: sqlite in-memory databases do not seem to work with connection pools launchbadge/sqlx#2510
- 5: Add support for pooling in-memory sqlite DB, enabling shared cache launchbadge/sqlx#625
- 6: Keeping in-memory sqlite database connection pool alive launchbadge/sqlx#2011
- 7: Opening "sqlite::memory:" is broken for SqlitePool launchbadge/sqlx#362
- 8: https://news.ycombinator.com/item?id=44715527
🌐 Web query:
SQLx sqlite::memory: vs :memory: connection string behavior
💡 Result:
In sqlx for Rust, the standard connection string for an in-memory SQLite database is "sqlite::memory:" (with double colon), which parses to create a pure in-memory DB with special handling.[1][2] This is listed explicitly in the official documentation as the URL for opening an in-memory database.[1]
":memory:" alone (single colon prefix) is not a full sqlx connection URL; sqlx expects the "sqlite:" scheme. Using SqliteConnectOptions::from_str(":memory:") may work in some parsing contexts but is not the documented format and lacks the scheme.[3][1] Historically, "sqlite::memory:" had parsing bugs in early versions (e.g., turning into invalid "memory:"), but these were fixed with dedicated SQLite URL parsing.[2]
Key behavioral differences and gotchas, especially with connection pools:
- Each ":memory:" connection creates a distinct, private in-memory DB that disappears when the connection closes. In pools with max_connections > 1, different pool connections see empty/separate DBs, causing data loss (tables "disappear").[4][5][2][6] Workarounds: set max_connections=1, or min_connections >=1 with max_lifetime=None and idle_timeout=None to reuse connections.[5]
- sqlx's parser for "sqlite::memory:" sets in_memory=true, shared_cache=true, and rewrites to a unique "file:sqlx-in-memory-{seqno}" URI filename. This enables SQLite shared cache mode, allowing multiple connections in a pool to share the same in-memory DB (abusing URI-named memory DBs).[3][7] Using the builder pattern SqliteConnectOptions::new().filename(":memory:") skips this special logic, leaving in_memory=false, causing pool deadlocks or separate DBs.[3]
Examples:
let pool = SqlitePool::connect("sqlite::memory:").await?; // Special handling, shared via seqno [3]
let opts = SqliteConnectOptions::from_str("sqlite::memory:").unwrap();
let pool = SqlitePool::connect_with(opts).await?; // Same as above [1][3]
For single connection (no pool issues):
let conn = SqliteConnection::connect("sqlite::memory:").await?; // Or "sqlite://:memory:" [11]
":memory:" without "sqlite:" is invalid as a full URL but may parse leniently via from_str; prefer documented "sqlite::memory:".[1][2]
Citations:
- 1: https://docs.rs/sqlx/latest/sqlx/sqlite/struct.SqliteConnectOptions.html
- 2: Opening "sqlite::memory:" is broken for SqlitePool launchbadge/sqlx#362
- 3: SqliteConnectOptions::new() doesn't set in_memory = true launchbadge/sqlx#3136
- 4: https://sqlite.org/inmemorydb.html
- 5: sqlite in-memory databases do not seem to work with connection pools launchbadge/sqlx#2510
- 6: Inmemory table dissappears after query jmoiron/sqlx#164
- 7: Add support for pooling in-memory sqlite DB, enabling shared cache launchbadge/sqlx#625
🏁 Script executed:
# Verify SqlitePoolOptions is available in sqlx
rg "SqlitePoolOptions" --type rust | head -5
Repository: ST0x-Technology/event-sorcery
Length of output: 55
Fix in-memory SQLite pool to prevent flaky test failures.
The current code uses :memory: with Pool::connect(), which creates isolated in-memory databases per connection. With multiple pooled connections, migrations applied on one connection won't be visible to others, causing intermittent "no such table" errors. Additionally, if idle connections are closed and the pool drops to zero connections, the in-memory database is destroyed entirely.
Apply the proposed fix to use max_connections(1) and the correct sqlite::memory: URI format.
Proposed fix
-use sqlx::{Pool, Sqlite};
+use sqlx::{sqlite::SqlitePoolOptions, Pool, Sqlite};

 pub async fn create_test_pool() -> Result<Pool<Sqlite>, sqlx::Error> {
-    let pool = Pool::<Sqlite>::connect(":memory:").await?;
+    let pool = SqlitePoolOptions::new()
+        .max_connections(1)
+        .connect("sqlite::memory:")
+        .await?;
     sqlx::migrate!("../../migrations").run(&pool).await?;
     Ok(pool)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/sqlite-es/src/testing.rs` around lines 9 - 10, The in-memory SQLite
pool must be created with a single shared connection and the correct URI to
avoid isolated in-memory databases; replace the
Pool::<Sqlite>::connect(":memory:").await? call with creating the pool via
SqlitePoolOptions::new().max_connections(1).connect("sqlite::memory:").await?
and keep running migrations with
sqlx::migrate!("../../migrations").run(&pool).await? so migrations run on the
single shared connection.
if context.version == 0 {
    let insert_query = SqlQueryFactory::insert_view(&self.view_table);

    sqlx::query(&insert_query)
        .bind(&context.view_instance_id)
        .bind(new_version)
        .bind(&payload)
        .execute(&self.pool)
        .await
        .map_err(SqliteViewError::Connection)?;
} else {
Handle first-write races as optimistic-lock conflicts.
Line 169 routes version == 0 to insert, but Lines 172-178 convert conflicting inserts into ConnectionError instead of OptimisticLockError. Concurrent creators for the same view_id won’t hit the optimistic-lock retry path.
Also applies to: 191-193
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/sqlite-es/src/view_repository.rs` around lines 169 - 179, The insert
branch that runs when context.version == 0 currently maps any insert conflict to
SqliteViewError::Connection, which hides first-write races from the
optimistic-lock retry path; catch the specific constraint/unique-violation error
returned by sqlx when executing SqlQueryFactory::insert_view(&self.view_table)
(the same pattern used for update conflicts) and map that case to
SqliteViewError::OptimisticLockError instead of Connection, leaving other
database errors mapped to Connection; apply the identical change to the other
insert-location noted (the same insert handling around the later insert block
that currently maps to Connection) so concurrent creators produce
OptimisticLockError and trigger retry logic.
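The error mapping being asked for can be modeled without a database. In the sketch below, `DbError`, `ViewError`, and `map_insert_error` are hypothetical stand-ins for the sqlx error and `SqliteViewError`; in the real code the unique-violation case would have to be detected from the sqlx database error (recent sqlx versions should expose a unique-violation check on it, which is worth verifying against the pinned version).

```rust
/// Hypothetical, simplified view of what the database can report on insert.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DbError {
    /// Another writer already inserted a row for this view id.
    UniqueViolation,
    Other(String),
}

/// Hypothetical stand-in for `SqliteViewError`.
#[derive(Debug, PartialEq, Eq)]
pub enum ViewError {
    OptimisticLock,
    Connection(String),
}

/// Maps a failed first-write insert onto the view error type.
pub fn map_insert_error(error: DbError) -> ViewError {
    match error {
        // Losing the creation race is a version conflict, so it should
        // take the same retry path as a failed versioned update.
        DbError::UniqueViolation => ViewError::OptimisticLock,
        // Everything else remains a connection-level failure.
        DbError::Other(message) => ViewError::Connection(message),
    }
}
```

On retry, the loser of the race reloads the view, sees a non-zero version, and goes through the update branch instead of the insert branch.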
* feat: AGENTS.md, SPEC.md, README, and docs
AGENTS.md adapted from upstream st0x conventions, trimmed to the rules that apply here (drops alloy/brokerage/onchain bits). SPEC.md describes the library's goals, components, and behavior. docs/domain.md is the CQRS/ES glossary and naming conventions. docs/cqrs.md is a usage reference. docs/sqlx.md is the running pitfall log. docs/ttdd.md is the type-driven TDD methodology. CLAUDE.md is a symlink to AGENTS.md so both names resolve to the same source of truth.
* feat: GitHub Actions CI (#5)
* feat: GitHub Actions CI
Matrix runs cargo check / nextest / clippy / fmt --check, plus a separate job for pre-commit hooks. Every step uses 'nix develop -c …' so CI runs the same toolchain as local dev. Caches ~/.cargo and target/ keyed on lockfile + flake.lock.
* feat: replace SqliteProjectionRepo with RepoFamily GAT (#6)
* feat: replace SqliteProjectionRepo with ViewBackend GAT
The previous design parameterized Projection over a Repo type that was bound to ViewRepository<Lifecycle<Entity>, Lifecycle<Entity>>. That bound named the pub(crate) Lifecycle type in a public position, tripping the private_bounds lint, and was suppressed with #[allow(private_bounds)] in three impl blocks pending the crate's extraction. ViewBackend is a GAT-on-trait HKT emulation: a ViewBackend instance is a type-level function (View, Aggregate) -> SomeRepo. Projection<Entity, Backend: ViewBackend> applies the function internally to obtain Backend::Repo<Lifecycle<Entity>, Lifecycle<Entity>>, so Lifecycle saturation happens inside the struct/method bodies, not in any public bound. SqliteViewBackend is the default. The three #[allow(private_bounds)] suppressions are removed. Test repos InMemoryRepo<View, Agg> and ConflictingRepo<View, Agg> made generic with paired ViewBackend adapters so existing tests still compile.
* docs: add examples of how to use the repo (#7)
Adds a runnable examples/ directory at the workspace root covering the core event-sorcery surface:
- basic_entity: smallest setup -- EventSourced, Store, send/load, load_entity, send_command, load_all_ids, count_aggregates, load_ids_paginated, compact_events, incremental_vacuum.
- projection: Materialized = Table with a SQLite generated column for filtered queries; load/load_all/filter/rebuild/rebuild_all; injects a domain Service via Arc<dyn Clock>.
- reactor: multi-entity StockAlert reactor wired across two stores plus a single-entity AuditLog reactor running alongside an auto-projection.
Each example has a README.md explaining the concept and a #[cfg(all(test, feature = "test-support"))] mod tests block exercising replay, TestHarness, TestStore, SpyReactor, and ReactorHarness. examples/README.md indexes the three; the repo-root README.md links to it. CI now runs every example in a dedicated matrix job and includes --all-targets so example test modules execute under cargo nextest.
* feat: nix dev shell + direnv Rainix-based nix flake providing the rust toolchain, sqlx-cli, cargo-expand, cargo-nextest, and the pre-commit hooks listed in .pre-commit-config.yaml. .envrc auto-loads the shell via nix-direnv. flake.lock pins all inputs for reproducibility. * feat: copy sqlite-es and event-sorcery crates (#3) * feat: workspace + copy sqlite-es and event-sorcery crates Crates copied verbatim from st0x.issuance (sqlite-es) and st0x.liquidity (event-sorcery). Adds workspace Cargo.toml with strict lints (warnings/clippy::all/pedantic/nursery/unwrap_used/expect_used denied; unsafe_code forbidden), clippy.toml exempting unwrap/expect in tests, and the canonical events+snapshots schema migration. Workspace dependency versions audited and bumped to current semver-compatible releases. cqrs-es held at 0.4.12 — 0.5.0 is a major bump pending deliberate upgrade. * feat: AGENTS.md, README, and docs (cqrs, sqlx, ttdd) (#4) * feat: AGENTS.md, SPEC.md, README, and docs AGENTS.md adapted from upstream st0x conventions, trimmed to the rules that apply here (drops alloy/brokerage/onchain bits). SPEC.md describes the library's goals, components, and behavior. docs/domain.md is the CQRS/ES glossary and naming conventions. docs/cqrs.md is a usage reference. docs/sqlx.md is the running pitfall log. docs/ttdd.md is the type-driven TDD methodology. CLAUDE.md is a symlink to AGENTS.md so both names resolve to the same source of truth. * feat: GitHub Actions CI (#5) * feat: GitHub Actions CI Matrix runs cargo check / nextest / clippy / fmt --check, plus a separate job for pre-commit hooks. Every step uses 'nix develop -c …' so CI runs the same toolchain as local dev. Caches ~/.cargo and target/ keyed on lockfile + flake.lock. * feat: replace SqliteProjectionRepo with RepoFamily GAT (#6) * feat: replace SqliteProjectionRepo with ViewBackend GAT The previous design parameterized Projection over a Repo type that was bound to ViewRepository<Lifecycle<Entity>, Lifecycle<Entity>>. 
That bound named the pub(crate) Lifecycle type in a public position, tripping the private_bounds lint, and was suppressed with #[allow(private_bounds)] in three impl blocks pending the crate's extraction. ViewBackend is a GAT-on-trait HKT emulation: a ViewBackend instance is a type-level function (View, Aggregate) -> SomeRepo. Projection<Entity, Backend: ViewBackend> applies the function internally to obtain Backend::Repo<Lifecycle<Entity>, Lifecycle<Entity>>, so Lifecycle saturation happens inside the struct/method bodies, not in any public bound. SqliteViewBackend is the default. The three #[allow(private_bounds)] suppressions are removed. Test repos InMemoryRepo<View, Agg> and ConflictingRepo<View, Agg> made generic with paired ViewBackend adapters so existing tests still compile. * docs: add examples of how to use the repo (#7) Adds a runnable examples/ directory at the workspace root covering the core event-sorcery surface: - basic_entity: smallest setup -- EventSourced, Store, send/load, load_entity, send_command, load_all_ids, count_aggregates, load_ids_paginated, compact_events, incremental_vacuum. - projection: Materialized = Table with a SQLite generated column for filtered queries; load/load_all/filter/rebuild/rebuild_all; injects a domain Service via Arc<dyn Clock>. - reactor: multi-entity StockAlert reactor wired across two stores plus a single-entity AuditLog reactor running alongside an auto-projection. Each example has a README.md explaining the concept and a #[cfg(all(test, feature = "test-support"))] mod tests block exercising replay, TestHarness, TestStore, SpyReactor, and ReactorHarness. examples/README.md indexes the three; the repo-root README.md links to it. CI now runs every example in a dedicated matrix job and includes --all-targets so example test modules execute under cargo nextest.

Copies the sqlite-es crate over from st0x.issuance and event-sorcery from st0x.liquidity.

Summary by CodeRabbit