Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions migrations/V029__fix_message_sequences.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Fix broken sequences (many rows stuck at sequence=0) by reassigning
-- using rowid (insertion order) as ground truth.
UPDATE messages SET sequence = (
SELECT sub.correct_seq FROM (
SELECT id, (ROW_NUMBER() OVER (PARTITION BY session_id ORDER BY rowid)) - 1 AS correct_seq
FROM messages
) sub
WHERE sub.id = messages.id
);
119 changes: 24 additions & 95 deletions orbitdock-server/crates/connector-claude/src/lib.rs

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions orbitdock-server/crates/connector-core/src/transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,10 +710,8 @@ pub fn transition(
});

if !is_dup {
// Assign sequence from rows vec, not the inflatable counter
if entry.sequence == 0 {
entry.sequence = state.rows.last().map(|r| r.sequence + 1).unwrap_or(0);
}
// In-memory sequence for live ordering; DB assigns authoritative sequence on persist.
entry.sequence = state.rows.last().map(|r| r.sequence + 1).unwrap_or(0);
state.rows.push(entry.clone());
state.total_row_count = entry.sequence + 1;
state.last_activity_at = Some(now.to_string());
Expand Down
1 change: 1 addition & 0 deletions orbitdock-server/crates/server/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ pub async fn run_server(options: ServerRunOptions) -> anyhow::Result<()> {
.send(crate::infrastructure::persistence::PersistCommand::RowAppend {
session_id: session_id.clone(),
entry: entry.clone(),
sequence_tx: None,
})
.await;
}
Expand Down
12 changes: 12 additions & 0 deletions orbitdock-server/crates/server/src/domain/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,18 @@ impl SessionHandle {
&self.rows
}

/// Update a row's sequence to the DB-assigned value (single source of truth).
pub fn set_row_sequence(&mut self, row_id: &str, sequence: u64) {
if let Some(entry) = self.rows.iter_mut().find(|e| e.id() == row_id) {
entry.sequence = sequence;
}
}

/// Look up a row by ID.
pub fn row_by_id(&self, row_id: &str) -> Option<&ConversationRowEntry> {
self.rows.iter().find(|e| e.id() == row_id)
}

/// Get first prompt
#[allow(dead_code)]
pub fn first_prompt(&self) -> Option<&str> {
Expand Down
16 changes: 10 additions & 6 deletions orbitdock-server/crates/server/src/domain/sessions/transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ pub fn persist_op_to_command(op: PersistOp) -> PersistCommand {
last_activity_at,
},
PersistOp::SessionEnd { id, reason } => PersistCommand::SessionEnd { id, reason },
PersistOp::RowAppend { session_id, entry } => {
PersistCommand::RowAppend { session_id, entry }
}
PersistOp::RowUpsert { session_id, entry } => {
PersistCommand::RowUpsert { session_id, entry }
}
PersistOp::RowAppend { session_id, entry } => PersistCommand::RowAppend {
session_id,
entry,
sequence_tx: None,
},
PersistOp::RowUpsert { session_id, entry } => PersistCommand::RowUpsert {
session_id,
entry,
sequence_tx: None,
},
PersistOp::TokensUpdate {
session_id,
usage,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use serde_json::Value;
use tokio::sync::oneshot;

use orbitdock_protocol::conversation_contracts::ConversationRowEntry;
use orbitdock_protocol::{
Expand All @@ -7,7 +8,7 @@ use orbitdock_protocol::{
};

/// Commands that can be persisted.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum PersistCommand {
/// Create a new session
SessionCreate {
Expand Down Expand Up @@ -45,12 +46,16 @@ pub enum PersistCommand {
RowAppend {
session_id: String,
entry: ConversationRowEntry,
/// When set, the DB-assigned sequence is sent back after INSERT.
sequence_tx: Option<oneshot::Sender<u64>>,
},

/// Upsert a conversation row (update existing or insert new)
RowUpsert {
session_id: String,
entry: ConversationRowEntry,
/// When set, the DB-assigned sequence is sent back after INSERT/UPDATE.
sequence_tx: Option<oneshot::Sender<u64>>,
},

/// Update token usage
Expand Down Expand Up @@ -437,3 +442,20 @@ pub enum PersistCommand {
completed_at: Option<Option<String>>,
},
}

impl PersistCommand {
/// Returns true if this command has a response channel that the caller is awaiting.
/// The writer should flush immediately when any batched command needs a response.
pub fn has_response_channel(&self) -> bool {
matches!(
self,
PersistCommand::RowAppend {
sequence_tx: Some(_),
..
} | PersistCommand::RowUpsert {
sequence_tx: Some(_),
..
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -329,30 +329,45 @@ pub(super) fn execute_command(
)?;
}

PersistCommand::RowAppend { session_id, entry } => {
PersistCommand::RowAppend {
session_id,
entry,
sequence_tx,
} => {
let row_id = entry.id().to_string();
let row_type = row_type_str(&entry.row);
let seq = entry.sequence as i64;
let row_data = serde_json::to_string(&entry.row).unwrap_or_else(|_| "{}".to_string());

// Extract content for last_message updates
let content_text = extract_row_content(&entry.row);
let is_user = matches!(&entry.row, ConversationRow::User(_));

// DB computes sequence as MAX(sequence)+1 — single source of truth.
conn.execute(
"INSERT OR IGNORE INTO messages (id, session_id, type, content, timestamp, sequence, row_data)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
VALUES (?1, ?2, ?3, ?4, ?5,
COALESCE((SELECT MAX(sequence) + 1 FROM messages WHERE session_id = ?2), 0),
?6)",
params![
row_id,
session_id,
row_type,
content_text.as_deref().unwrap_or(""),
chrono_now(),
seq,
row_data,
],
)?;

// Read back DB-assigned sequence and send to caller if requested.
if let Some(tx) = sequence_tx {
let db_seq: i64 = conn.query_row(
"SELECT sequence FROM messages WHERE id = ?1",
params![row_id],
|row| row.get(0),
)?;
let _ = tx.send(db_seq as u64);
}

// Update last_message for dashboard context lines (user + assistant only)
if matches!(
&entry.row,
Expand All @@ -376,31 +391,45 @@ pub(super) fn execute_command(
}
}

PersistCommand::RowUpsert { session_id, entry } => {
PersistCommand::RowUpsert {
session_id,
entry,
sequence_tx,
} => {
let row_id = entry.id().to_string();
let row_type = row_type_str(&entry.row);
let seq = entry.sequence as i64;
let row_data = serde_json::to_string(&entry.row).unwrap_or_else(|_| "{}".to_string());
let content_text = extract_row_content(&entry.row);

// DB computes sequence on insert; ON CONFLICT preserves original ordering.
conn.execute(
"INSERT INTO messages (id, session_id, type, content, timestamp, sequence, row_data)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
VALUES (?1, ?2, ?3, ?4, ?5,
COALESCE((SELECT MAX(sequence) + 1 FROM messages WHERE session_id = ?2), 0),
?6)
ON CONFLICT(id) DO UPDATE SET
content = excluded.content,
row_data = excluded.row_data,
sequence = excluded.sequence",
row_data = excluded.row_data",
params![
row_id,
session_id,
row_type,
content_text.as_deref().unwrap_or(""),
chrono_now(),
seq,
row_data,
],
)?;

// Read back DB-assigned sequence and send to caller if requested.
if let Some(tx) = sequence_tx {
let db_seq: i64 = conn.query_row(
"SELECT sequence FROM messages WHERE id = ?1",
params![row_id],
|row| row.get(0),
)?;
let _ = tx.send(db_seq as u64);
}

// Update last_message for completed user/assistant rows
if matches!(
&entry.row,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,17 @@ fn row_append_stores_correct_sequence() {
let batch = vec![
PersistCommand::RowAppend {
session_id: "test-session".to_string(),
sequence_tx: None,
entry: user_entry("row-0", 0),
},
PersistCommand::RowAppend {
session_id: "test-session".to_string(),
sequence_tx: None,
entry: assistant_entry("row-1", 1),
},
PersistCommand::RowAppend {
session_id: "test-session".to_string(),
sequence_tx: None,
entry: user_entry("row-2", 2),
},
];
Expand All @@ -116,21 +119,21 @@ fn row_append_stores_correct_sequence() {
}

#[test]
fn row_append_with_zero_sequence_stores_zero() {
// This test documents that RowAppend is a dumb write — it persists
// whatever sequence it receives. The caller (AddRowAndBroadcast handler)
// is responsible for assigning the correct sequence BEFORE sending.
fn row_append_with_zero_sequence_gets_db_assigned_sequence() {
// DB computes MAX(sequence)+1 — callers no longer need to assign sequences.
let (conn, db_path, _dir) = setup_test_db();
drop(conn);

let batch = vec![
PersistCommand::RowAppend {
session_id: "test-session".to_string(),
sequence_tx: None,
entry: user_entry("row-a", 0),
},
PersistCommand::RowAppend {
session_id: "test-session".to_string(),
entry: user_entry("row-b", 0), // Bad: duplicate sequence=0
sequence_tx: None,
entry: user_entry("row-b", 0),
},
];

Expand All @@ -139,28 +142,29 @@ fn row_append_with_zero_sequence_stores_zero() {
let conn = Connection::open(&db_path).unwrap();
let rows = load_messages_from_db(&conn, "test-session").unwrap();

// Both rows are stored, both with sequence=0 — this is why callers
// MUST go through the actor (AddRowAndBroadcast) to get correct sequences.
// DB assigns contiguous sequences regardless of the app-provided value.
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].sequence, 0);
assert_eq!(rows[1].sequence, 0);
assert_eq!(rows[1].sequence, 1);
}

#[test]
fn row_upsert_updates_sequence_on_conflict() {
fn row_upsert_preserves_original_sequence_on_conflict() {
let (conn, db_path, _dir) = setup_test_db();
drop(conn);

// First: insert a row with sequence=0 (the old buggy behavior)
// First: insert a row — DB assigns sequence=0
let batch = vec![PersistCommand::RowAppend {
session_id: "test-session".to_string(),
sequence_tx: None,
entry: user_entry("row-0", 0),
}];
super::writer::flush_batch_for_test(&db_path, batch).unwrap();

// Then: upsert the same row with the correct sequence=5
// Then: upsert the same row — sequence should be preserved (not overwritten)
let batch = vec![PersistCommand::RowUpsert {
session_id: "test-session".to_string(),
sequence_tx: None,
entry: user_entry("row-0", 5),
}];
super::writer::flush_batch_for_test(&db_path, batch).unwrap();
Expand All @@ -169,10 +173,10 @@ fn row_upsert_updates_sequence_on_conflict() {
let rows = load_messages_from_db(&conn, "test-session").unwrap();

assert_eq!(rows.len(), 1);
// The sequence should be corrected from 0 to 5
// ON CONFLICT preserves the original DB-assigned sequence
assert_eq!(
rows[0].sequence, 5,
"RowUpsert must update the sequence column"
rows[0].sequence, 0,
"RowUpsert must preserve original sequence on conflict"
);
}

Expand All @@ -183,6 +187,7 @@ fn row_upsert_inserts_when_not_existing() {

let batch = vec![PersistCommand::RowUpsert {
session_id: "test-session".to_string(),
sequence_tx: None,
entry: user_entry("new-row", 3),
}];
super::writer::flush_batch_for_test(&db_path, batch).unwrap();
Expand All @@ -192,7 +197,8 @@ fn row_upsert_inserts_when_not_existing() {

assert_eq!(rows.len(), 1);
assert_eq!(rows[0].id(), "new-row");
assert_eq!(rows[0].sequence, 3);
// DB computes sequence as MAX+1 (0 for first row), ignoring app-provided value
assert_eq!(rows[0].sequence, 0);
}

#[test]
Expand All @@ -204,6 +210,7 @@ fn batch_of_appends_preserves_insertion_order() {
let batch: Vec<PersistCommand> = (0..10)
.map(|i| PersistCommand::RowAppend {
session_id: "test-session".to_string(),
sequence_tx: None,
entry: assistant_entry(&format!("msg-{i}"), i as u64),
})
.collect();
Expand Down Expand Up @@ -238,10 +245,12 @@ fn row_append_ignore_deduplicates_by_id() {
let batch = vec![
PersistCommand::RowAppend {
session_id: "test-session".to_string(),
sequence_tx: None,
entry: user_entry("dup-id", 0),
},
PersistCommand::RowAppend {
session_id: "test-session".to_string(),
sequence_tx: None,
entry: user_entry("dup-id", 5), // Same id, different sequence
},
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ impl PersistenceWriter {
loop {
tokio::select! {
Some(cmd) = self.rx.recv() => {
let needs_flush_now = cmd.has_response_channel();
self.batch.push(cmd);

if self.batch.len() >= self.batch_size {
if needs_flush_now || self.batch.len() >= self.batch_size {
self.flush().await;
}
}
Expand Down
Loading
Loading