Skip to content

Commit d291611

Browse files
committed
Make storage append only, and add concept of sessions
1 parent 6f487ad commit d291611

4 files changed

Lines changed: 335 additions & 106 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ toml = "0.9.8"
2323
reqwest = { version = "0.13", default-features = false, features = ["json", "rustls"] }
2424
regex-lite = "0.1"
2525
serde_json = "1.0"
26+
# UUID v4 for session identifiers
27+
uuid = { version = "1", features = ["v4"] }
2628
# Date for today + recent daily notes (YYYYMMDD)
2729

2830
# IANA timezone support for local-time system prompt line (DST-aware)

src/agent/session.rs

Lines changed: 75 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -44,26 +44,35 @@ impl From<DbError> for SessionError {
4444

4545
/// In-memory session: history and optional summary. Cap history at MAX_HISTORY.
4646
/// Backed by the `chat_history` and `chat_summary` tables in `BrainDb`.
47+
///
48+
/// `pending_inserts` tracks messages added since the last `save()`. Only those
49+
/// are written to the database on the next save (append-only storage).
4750
#[derive(Debug, Clone)]
4851
pub struct Session {
4952
history: Vec<Message>,
53+
pending_inserts: Vec<Message>,
5054
summary: String,
5155
chat_id: String,
56+
session_id: String,
5257
db: Arc<BrainDb>,
5358
}
5459

5560
impl Session {
56-
/// Load session from the database; missing chat_id → empty session.
61+
/// Load session from the database; missing chat_id → empty session with a fresh session_id.
5762
pub async fn load(db: Arc<BrainDb>, chat_id: &str) -> Result<Self, SessionError> {
5863
let chat_id = chat_id.to_string();
5964
let db_clone = Arc::clone(&db);
6065
let chat_id_clone = chat_id.clone();
6166

62-
let (stored, summary) =
63-
tokio::task::spawn_blocking(move || db_clone.load_session(&chat_id_clone))
64-
.await
65-
.map_err(|e| SessionError::Db(format!("spawn_blocking: {e}")))?
66-
.map_err(SessionError::from)?;
67+
// Fetch (or create) the active session UUID and the messages for that session.
68+
let (session_id, stored, summary) = tokio::task::spawn_blocking(move || {
69+
let session_id = db_clone.get_or_create_session_id(&chat_id_clone)?;
70+
let (stored, summary) = db_clone.load_session(&chat_id_clone, &session_id)?;
71+
Ok::<_, crate::memory::db::DbError>((session_id, stored, summary))
72+
})
73+
.await
74+
.map_err(|e| SessionError::Db(format!("spawn_blocking: {e}")))?
75+
.map_err(SessionError::from)?;
6776

6877
let history = stored
6978
.into_iter()
@@ -72,64 +81,81 @@ impl Session {
7281

7382
let mut session = Self {
7483
history,
84+
pending_inserts: Vec::new(),
7585
summary,
7686
chat_id,
87+
session_id,
7788
db,
7889
};
79-
// Enforce cap in case the DB somehow has more than MAX_HISTORY rows.
90+
// Enforce cap in case the DB has more than MAX_HISTORY rows.
8091
session.cap_history();
8192
Ok(session)
8293
}
8394

84-
/// Persist the session (history + summary) to the database.
85-
pub async fn save(&self) -> Result<(), SessionError> {
95+
/// Persist only the new messages (since the last save) to the database, then
96+
/// clear the pending queue. Append-only: previous messages are never deleted.
97+
pub async fn save(&mut self) -> Result<(), SessionError> {
98+
if self.pending_inserts.is_empty() && self.summary.is_empty() {
99+
return Ok(());
100+
}
101+
86102
let stored: Vec<StoredMessage> = self
87-
.history
103+
.pending_inserts
88104
.iter()
89105
.map(message_to_stored)
90106
.collect::<Result<Vec<_>, _>>()?;
91107

92108
let chat_id = self.chat_id.clone();
109+
let session_id = self.session_id.clone();
93110
let summary = self.summary.clone();
94111
let db = Arc::clone(&self.db);
95112

96-
tokio::task::spawn_blocking(move || db.save_session(&chat_id, &stored, &summary))
113+
tokio::task::spawn_blocking(move || db.append_session(&chat_id, &session_id, &stored, &summary))
97114
.await
98115
.map_err(|e| SessionError::Db(format!("spawn_blocking: {e}")))?
99-
.map_err(SessionError::from)
116+
.map_err(SessionError::from)?;
117+
118+
self.pending_inserts.clear();
119+
Ok(())
100120
}
101121

102122
// -----------------------------------------------------------------------
103123
// Mutation helpers
104124
// -----------------------------------------------------------------------
105125

106126
pub fn add_user_message(&mut self, content: &str) {
107-
self.history.push(Message {
127+
let msg = Message {
108128
role: Role::User,
109129
content: content.to_string(),
110130
tool_call_id: None,
111131
tool_calls: None,
112-
});
132+
};
133+
self.pending_inserts.push(msg.clone());
134+
self.history.push(msg);
113135
self.cap_history();
114136
}
115137

116138
pub fn add_assistant_message(&mut self, content: &str, tool_calls: Option<Vec<ToolCall>>) {
117-
self.history.push(Message {
139+
let msg = Message {
118140
role: Role::Assistant,
119141
content: content.to_string(),
120142
tool_call_id: None,
121143
tool_calls,
122-
});
144+
};
145+
self.pending_inserts.push(msg.clone());
146+
self.history.push(msg);
123147
self.cap_history();
124148
}
125149

126150
pub fn add_tool_message(&mut self, tool_call_id: &str, content: &str) {
127-
self.history.push(Message {
151+
let msg = Message {
128152
role: Role::Tool,
129153
content: content.to_string(),
130154
tool_call_id: Some(tool_call_id.to_string()),
131155
tool_calls: None,
132-
});
156+
};
157+
self.pending_inserts.push(msg.clone());
158+
self.history.push(msg);
133159
self.cap_history();
134160
}
135161

@@ -153,6 +179,11 @@ impl Session {
153179
&self.summary
154180
}
155181

182+
#[inline]
183+
pub fn session_id(&self) -> &str {
184+
&self.session_id
185+
}
186+
156187
pub fn set_summary(&mut self, s: String) {
157188
self.summary = s;
158189
}
@@ -265,18 +296,18 @@ mod tests {
265296
assert_eq!(loaded.summary(), "brief");
266297
}
267298

268-
// ── Overwrite on second save ──────────────────────────────────────────────
299+
// ── Append on second save ─────────────────────────────────────────────────
269300

270301
#[tokio::test]
271-
async fn session_save_overwrites() {
302+
async fn session_save_appends() {
272303
let (_tmp, db) = temp_db();
273304

274305
// First save
275306
let mut s1 = Session::load(Arc::clone(&db), "c").await.unwrap();
276307
s1.add_user_message("First");
277308
s1.save().await.unwrap();
278309

279-
// Second save with more messages
310+
// Second save with more messages (same session_id)
280311
let mut s2 = Session::load(Arc::clone(&db), "c").await.unwrap();
281312
assert_eq!(s2.history().len(), 1);
282313
s2.add_assistant_message("OK", None);
@@ -302,6 +333,27 @@ mod tests {
302333
assert_eq!(session.history().first().unwrap().content, "msg 5");
303334
}
304335

336+
// ── All pending inserts reach the DB even when history is capped ──────────
337+
338+
#[tokio::test]
339+
async fn session_all_pending_inserts_saved_to_db() {
340+
let (_tmp, db) = temp_db();
341+
let mut session = Session::load(Arc::clone(&db), "cap2").await.unwrap();
342+
for i in 0..55 {
343+
session.add_user_message(&format!("msg {}", i));
344+
}
345+
// In-memory history is capped at MAX_HISTORY (50)
346+
assert_eq!(session.history().len(), MAX_HISTORY);
347+
348+
// Save — all 55 pending inserts must go to the DB
349+
session.save().await.unwrap();
350+
351+
// Reload: DB has 55 rows, memory caps to 50; oldest in memory is msg 5
352+
let reloaded = Session::load(Arc::clone(&db), "cap2").await.unwrap();
353+
assert_eq!(reloaded.history().len(), MAX_HISTORY);
354+
assert_eq!(reloaded.history().first().unwrap().content, "msg 5");
355+
}
356+
305357
// ── Truncate history ──────────────────────────────────────────────────────
306358

307359
#[test]
@@ -310,8 +362,10 @@ mod tests {
310362
let db = Arc::new(BrainDb::open(tmp.path()).unwrap());
311363
let mut session = Session {
312364
history: Vec::new(),
365+
pending_inserts: Vec::new(),
313366
summary: String::new(),
314367
chat_id: "truncate".to_string(),
368+
session_id: "test-session".to_string(),
315369
db,
316370
};
317371

0 commit comments

Comments
 (0)