Skip to content

Commit fca37c5

Browse files
committed
fix: Codex review — oplog skip-on-error, watcher close-write, lifecycle, dashboard
- read_oplog: always skip invalid lines (no stalling, writer guarantees flush) - Watcher: handle Access(Close(Write)) for inotify-based platforms - Background sync: Weak<Cortex>, initial pull, thread-name self-join guard - Dashboard: /v1/memories/recent, embedded HTML, multi-tier - server.json: 1.8.0, Dockerfile: ENTRYPOINT preserved
1 parent b2d712f commit fca37c5

7 files changed

Lines changed: 78 additions & 16 deletions

File tree

Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,6 @@ ENV CORTEX_PORT=3315
4343

4444
EXPOSE 3315
4545

46+
# HTTP mode (default): docker run -p 3315:3315 image
47+
# MCP stdio mode: docker run --entrypoint cortex-mcp-server image /data/memory.db
4648
ENTRYPOINT ["cortex-http"]

cortex-core/src/lib.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,15 @@ impl Cortex {
304304
let poll_thread = std::thread::Builder::new()
305305
.name("cortex-bg-sync".into())
306306
.spawn(move || {
307+
// Pull once immediately to catch any pre-existing remote changes.
308+
if let Some(cortex) = cortex_weak.upgrade() {
309+
match cortex.sync_pull() {
310+
Ok(0) => {}
311+
Ok(n) => tracing::info!(applied = n, "Background sync: initial pull"),
312+
Err(e) => tracing::warn!(error = %e, "Background sync: initial pull failed"),
313+
}
314+
}
315+
307316
let mut last_poll = std::time::Instant::now();
308317
loop {
309318
if stop_clone.load(std::sync::atomic::Ordering::Acquire) {
@@ -1286,8 +1295,12 @@ impl Cortex {
12861295

12871296
impl Drop for Cortex {
12881297
fn drop(&mut self) {
1289-
// CRITICAL: Stop background sync thread BEFORE fields it references are dropped.
1290-
// The background thread holds raw pointers to storage/index/sync_engine.
1291-
self.stop_background_sync();
1298+
// Stop background sync. Set the stop flag so the thread exits on its own,
1299+
// but only join if we're NOT on the bg sync thread (avoids self-join deadlock
1300+
// when the thread's Weak upgrade creates the last Arc and Drop runs there).
1301+
let mut guard = self.background_sync_handle.lock();
1302+
if let Some(mut handle) = guard.take() {
1303+
handle.signal_stop(); // set flag, don't join
1304+
}
12921305
}
12931306
}

cortex-core/src/sync/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,17 @@ impl BackgroundSyncHandle {
519519
!self.stop_flag.load(Ordering::Acquire)
520520
}
521521

522+
/// Signal stop without joining the thread. Safe to call from any thread
523+
/// including the background sync thread itself (avoids self-join deadlock).
524+
pub fn signal_stop(&mut self) {
525+
self.stop_flag.store(true, Ordering::Release);
526+
if let Some(wh) = self.watcher_handle.take() {
527+
wh.stop();
528+
}
529+
// Don't join — thread will exit on its own when it checks stop_flag
530+
// or when Weak::upgrade fails.
531+
}
532+
522533
fn shutdown(&mut self) {
523534
self.stop_flag.store(true, Ordering::Release);
524535
if let Some(wh) = self.watcher_handle.take() {
@@ -532,6 +543,12 @@ impl BackgroundSyncHandle {
532543

533544
impl Drop for BackgroundSyncHandle {
534545
fn drop(&mut self) {
535-
self.shutdown();
546+
// Check if we're on the bg sync thread — if so, only signal (no join).
547+
let is_bg_thread = std::thread::current().name() == Some("cortex-bg-sync");
548+
if is_bg_thread {
549+
self.signal_stop();
550+
} else {
551+
self.shutdown();
552+
}
536553
}
537554
}

cortex-core/src/sync/oplog.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,12 @@ pub fn read_oplog(
225225
offset += bytes_read as u64;
226226
}
227227
Err(_) => {
228+
// Distinguish partial write (incomplete line) from permanent corruption:
229+
// - If the raw line ends with '\n', it's a complete but corrupt line → skip it
230+
// - If at EOF without '\n', it's likely a partial write → don't advance, retry
231+
// Always advance past bad lines to avoid stalling sync.
232+
// Partial writes (no trailing \n) are also skipped — the writer
233+
// uses flush() which guarantees complete lines on disk.
228234
tracing::warn!("Skipping invalid oplog line at offset {}", offset);
229235
offset += bytes_read as u64;
230236
}

cortex-core/src/sync/watcher.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,12 @@ pub fn start_watcher(
132132

133133
/// Check whether a filesystem event is relevant (i.e., a .jsonl file from another device).
134134
fn is_relevant_event(event: &Event, my_device_id: &str) -> bool {
135-
// Only care about creates, modifications, and renames
135+
// Only care about creates (new oplog files) and close-write events.
136+
// Skip Modify events to avoid reading partially-written files.
136137
match event.kind {
137-
EventKind::Create(_) | EventKind::Modify(_) => {}
138+
EventKind::Create(_) => {}
139+
EventKind::Modify(notify::event::ModifyKind::Data(_)) => {}
140+
EventKind::Access(notify::event::AccessKind::Close(notify::event::AccessMode::Write)) => {}
138141
_ => return false,
139142
}
140143

@@ -147,10 +150,18 @@ fn is_relevant_event(event: &Event, my_device_id: &str) -> bool {
147150
return false;
148151
}
149152

150-
// Check that the file is NOT in our own device subfolder
151-
let path_str = path.to_string_lossy();
152-
!path_str.contains(&format!("/{}/", my_device_id))
153-
&& !path_str.contains(&format!("\\{}\\", my_device_id))
153+
// Only ignore events from the immediate `devices/{my_device_id}/` subfolder.
154+
// Check the path component right after "devices/" — not any arbitrary component.
155+
let components: Vec<_> = path.components()
156+
.map(|c| c.as_os_str().to_string_lossy().to_string())
157+
.collect();
158+
let devices_idx = components.iter().position(|c| c == "devices");
159+
if let Some(idx) = devices_idx {
160+
if let Some(device_dir) = components.get(idx + 1) {
161+
return device_dir != my_device_id;
162+
}
163+
}
164+
true // no "devices/" in path — allow event
154165
})
155166
}
156167

cortex-http/src/handlers.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,18 @@ pub async fn stats(State(state): State<Arc<AppState>>) -> AppResult {
6666
// ── Recent Memories ──────────────────────────────────────────────────────────
6767

6868
pub async fn recent_memories(State(state): State<Arc<AppState>>) -> AppResult {
69-
let mems = state.cortex.storage()
70-
.list_by_tier_ordered_by_ingestion(cortex_core::types::MemoryTier::Episodic, 20)
71-
.map_err(cortex_err)?;
69+
use cortex_core::types::MemoryTier;
70+
// Fetch recent memories from all active tiers with enough headroom
71+
let mut mems = Vec::new();
72+
for tier in &[MemoryTier::Episodic, MemoryTier::Semantic, MemoryTier::Procedural] {
73+
let tier_mems = state.cortex.storage()
74+
.list_by_tier_ordered_by_ingestion(*tier, 20)
75+
.map_err(cortex_err)?;
76+
mems.extend(tier_mems);
77+
}
78+
// Sort by ingestion time descending, take top 20
79+
mems.sort_by(|a, b| b.temporal.ingestion_time.cmp(&a.temporal.ingestion_time));
80+
mems.truncate(20);
7281

7382
let results: Vec<serde_json::Value> = mems.iter().map(|m| {
7483
let text = match &m.content {

server.json

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,19 @@
77
"url": "https://github.com/gambletan/cortex",
88
"source": "github"
99
},
10-
"version": "1.5.1",
10+
"version": "1.8.0",
1111
"packages": [
1212
{
1313
"registryType": "oci",
14-
"identifier": "ghcr.io/gambletan/cortex:v1.5.1",
14+
"identifier": "ghcr.io/gambletan/cortex/cortex-http:1.8.0",
1515
"runtimeHint": "docker",
1616
"transport": {
1717
"type": "stdio"
18-
}
18+
},
19+
"environmentVariables": {
20+
"CORTEX_DB_PATH": "/data/memory.db"
21+
},
22+
"entrypoint": ["cortex-mcp-server", "/data/memory.db"]
1923
}
2024
]
2125
}

0 commit comments

Comments
 (0)