Skip to content

Commit a1b817b

Browse files
committed
feat: Server-side capture storage via PostgreSQL (replaces local filesystem)
Captures are now stored in a `session_captures` DB table instead of local JSONL files. All operations go through HTTP — no filesystem dependency on the server. New/updated endpoints: - POST /api/captures — batch-store capture entries - POST /api/captures/entry — store a single capture entry - GET /api/captures/status — buffer status from DB - POST /api/captures/drain — drain staged captures into memories Stack: - ops/migrate_session_captures.sql — table with RLS + indexes - ICaptureStore interface + PostgresKnowledgeGraphStore impl - KnowledgeGraphService.DrainCapturesAsync with tool-type grouping - SessionCapture model https://claude.ai/code/session_01M5GUmiM8WehUBUvKWiWgcJ
1 parent 8c52225 commit a1b817b

6 files changed

Lines changed: 532 additions & 88 deletions

File tree

SerialMemory.Api/Program.cs

Lines changed: 81 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1619,41 +1619,78 @@ await kgService.IngestMemoryAsync(
16191619
});
16201620

16211621
// ============================================
1622-
// AUTO-CAPTURE API ENDPOINTS
1622+
// AUTO-CAPTURE API ENDPOINTS (server-side DB storage)
16231623
// ============================================
16241624

1625-
var captureSessionDir = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.UserProfile), ".cc-serialmemory", "sessions");
1626-
1627-
// Check capture buffer status
1628-
app.MapGet("/api/captures/status", () =>
1625+
// POST capture entries to the server (replaces local JSONL file writes)
1626+
app.MapPost("/api/captures", async (CaptureIngestRequest request, KnowledgeGraphService kgService) =>
16291627
{
1630-
if (!Directory.Exists(captureSessionDir))
1631-
return Results.Ok(new { files = 0, totalEntries = 0, message = "No capture directory found" });
1628+
try
1629+
{
1630+
if (request.Entries == null || request.Entries.Count == 0)
1631+
return Results.BadRequest(new { error = "At least one entry is required" });
1632+
1633+
if (request.Entries.Count > 1000)
1634+
return Results.BadRequest(new { error = "Maximum 1000 entries per request" });
16321635

1633-
var files = Directory.GetFiles(captureSessionDir, "*.jsonl")
1634-
.Where(f => !f.EndsWith(".drained"))
1635-
.OrderByDescending(File.GetLastWriteTimeUtc)
1636-
.Select(f =>
1636+
var captures = request.Entries.Select(e => new SessionCapture
16371637
{
1638-
var lineCount = File.ReadLines(f).Count(l => !string.IsNullOrWhiteSpace(l));
1639-
return new
1640-
{
1641-
name = Path.GetFileName(f),
1642-
entries = lineCount,
1643-
lastModified = File.GetLastWriteTimeUtc(f)
1644-
};
1645-
})
1646-
.ToList();
1638+
SessionId = request.SessionId ?? e.SessionId,
1639+
Ts = e.Ts ?? DateTime.UtcNow,
1640+
Tool = e.Tool,
1641+
File = e.File,
1642+
Result = e.Result,
1643+
RawJson = e.RawJson
1644+
}).ToList();
16471645

1648-
return Results.Ok(new
1646+
var count = await kgService.StoreCapturesBatchAsync(captures);
1647+
return Results.Ok(new { stored = count, sessionId = request.SessionId });
1648+
}
1649+
catch (Exception ex)
16491650
{
1650-
files = files.Count,
1651-
totalEntries = files.Sum(f => f.entries),
1652-
captures = files
1653-
});
1651+
return Results.Problem(detail: ex.Message, statusCode: 500);
1652+
}
1653+
});
1654+
1655+
// POST a single capture entry (convenience endpoint)
1656+
app.MapPost("/api/captures/entry", async (CaptureEntryRequest entry, KnowledgeGraphService kgService) =>
1657+
{
1658+
try
1659+
{
1660+
var capture = new SessionCapture
1661+
{
1662+
SessionId = entry.SessionId,
1663+
Ts = entry.Ts ?? DateTime.UtcNow,
1664+
Tool = entry.Tool,
1665+
File = entry.File,
1666+
Result = entry.Result,
1667+
RawJson = entry.RawJson
1668+
};
1669+
1670+
var id = await kgService.StoreCaptureAsync(capture);
1671+
return Results.Ok(new { id, stored = true });
1672+
}
1673+
catch (Exception ex)
1674+
{
1675+
return Results.Problem(detail: ex.Message, statusCode: 500);
1676+
}
1677+
});
1678+
1679+
// Check capture buffer status (reads from DB)
1680+
app.MapGet("/api/captures/status", async (KnowledgeGraphService kgService) =>
1681+
{
1682+
try
1683+
{
1684+
var status = await kgService.GetCaptureStatusAsync();
1685+
return Results.Ok(status);
1686+
}
1687+
catch (Exception ex)
1688+
{
1689+
return Results.Problem(detail: ex.Message, statusCode: 500);
1690+
}
16541691
});
16551692

1656-
// Drain captured JSONL session entries into memories
1693+
// Drain staged captures into memories (reads from DB, marks drained)
16571694
app.MapPost("/api/captures/drain", async (
16581695
string? session_id,
16591696
int? max_entries,
@@ -1662,68 +1699,12 @@ await kgService.IngestMemoryAsync(
16621699
{
16631700
try
16641701
{
1665-
if (!Directory.Exists(captureSessionDir))
1666-
return Results.Ok(new { entriesProcessed = 0, memoriesCreated = 0, message = "No capture directory found" });
1667-
1668-
// Find session log file
1669-
string? logFile = null;
1670-
if (!string.IsNullOrEmpty(session_id))
1671-
{
1672-
var specific = Path.Combine(captureSessionDir, $"{session_id}.jsonl");
1673-
if (File.Exists(specific)) logFile = specific;
1674-
}
1675-
logFile ??= Directory.GetFiles(captureSessionDir, "*.jsonl")
1676-
.Where(f => !f.EndsWith(".drained"))
1677-
.OrderByDescending(File.GetLastWriteTimeUtc)
1678-
.FirstOrDefault();
1679-
1680-
if (logFile == null)
1681-
return Results.Ok(new { entriesProcessed = 0, memoriesCreated = 0, message = "No active capture file found" });
1682-
1683-
var maxCount = Math.Clamp(max_entries ?? 500, 1, 5000);
1684-
var lines = File.ReadLines(logFile)
1685-
.Where(l => !string.IsNullOrWhiteSpace(l))
1686-
.Take(maxCount)
1687-
.ToList();
1688-
1689-
if (dry_run == true)
1690-
return Results.Ok(new { entriesFound = lines.Count, dryRun = true, file = Path.GetFileName(logFile) });
1702+
var result = await kgService.DrainCapturesAsync(
1703+
sessionId: session_id,
1704+
maxEntries: max_entries ?? 500,
1705+
dryRun: dry_run ?? false);
16911706

1692-
var memoriesCreated = 0;
1693-
var errors = 0;
1694-
1695-
// Batch lines and ingest as memories
1696-
foreach (var chunk in lines.Chunk(10))
1697-
{
1698-
try
1699-
{
1700-
var content = string.Join("\n", chunk);
1701-
await kgService.IngestMemoryAsync(
1702-
content: content,
1703-
source: "auto-capture",
1704-
metadata: new Dictionary<string, object>
1705-
{
1706-
["memory_type"] = "auto_capture",
1707-
["entry_count"] = chunk.Length
1708-
},
1709-
extractEntities: true,
1710-
dedupMode: "off",
1711-
memoryType: "auto_capture");
1712-
memoriesCreated++;
1713-
}
1714-
catch { errors++; }
1715-
}
1716-
1717-
// Rename to .drained
1718-
try { File.Move(logFile, logFile + ".drained", overwrite: true); } catch { }
1719-
1720-
return Results.Ok(new
1721-
{
1722-
entriesProcessed = lines.Count,
1723-
memoriesCreated,
1724-
errors,
1725-
file = Path.GetFileName(logFile)
1726-
});
1707+
return Results.Ok(result);
17271708
}
17281709
catch (Exception ex)
17291710
{
@@ -6889,6 +6870,19 @@ internal record ContextSummarizeRequest(
68896870
int? MaxMemories = 50,
68906871
bool? StoreSummary = true);
68916872

6873+
// Capture DTOs (server-side DB storage)
6874+
internal record CaptureIngestRequest(
6875+
string? SessionId,
6876+
List<CaptureEntryRequest> Entries);
6877+
6878+
internal record CaptureEntryRequest(
6879+
string? SessionId = null,
6880+
DateTime? Ts = null,
6881+
string? Tool = null,
6882+
string? File = null,
6883+
string? Result = null,
6884+
Dictionary<string, object>? RawJson = null);
6885+
68926886
// Mutation state holder for tracking pause/resume state (thread-safe)
68936887
internal class MutationStateHolder
68946888
{

SerialMemory.Core/Interfaces/IKnowledgeGraphStore.cs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,45 @@ public interface IWorkspaceStore
101101
Task<WorkspaceSnapshot?> GetSnapshotByNameAsync(string workspaceId, string snapshotName, CancellationToken ct = default);
102102
}
103103

104+
/// <summary>
105+
/// Server-side session capture staging operations.
106+
/// Captures are received over HTTP, stored in DB, then drained into memories.
107+
/// </summary>
108+
public interface ICaptureStore
109+
{
110+
Task<Guid> InsertCaptureAsync(SessionCapture capture, CancellationToken ct = default);
111+
Task<int> InsertCapturesBatchAsync(List<SessionCapture> captures, CancellationToken ct = default);
112+
Task<List<SessionCapture>> GetUndrainedCapturesAsync(string? sessionId = null, int limit = 5000, CancellationToken ct = default);
113+
Task<int> MarkCapturesDrainedAsync(List<Guid> captureIds, CancellationToken ct = default);
114+
Task<CaptureStatusResult> GetCaptureStatusAsync(CancellationToken ct = default);
115+
}
116+
117+
/// <summary>
118+
/// Capture buffer status summary.
119+
/// </summary>
120+
public class CaptureStatusResult
121+
{
122+
public int TotalUndrained { get; set; }
123+
public int TotalDrained { get; set; }
124+
public List<CaptureSessionSummary> Sessions { get; set; } = [];
125+
}
126+
127+
/// <summary>
128+
/// Per-session capture summary.
129+
/// </summary>
130+
public class CaptureSessionSummary
131+
{
132+
public string? SessionId { get; set; }
133+
public int EntryCount { get; set; }
134+
public DateTime? FirstTs { get; set; }
135+
public DateTime? LastTs { get; set; }
136+
}
137+
104138
/// <summary>
105139
/// Composite repository interface for knowledge graph operations.
106140
/// Inherits from focused sub-interfaces for backward compatibility.
107141
/// </summary>
108-
public interface IKnowledgeGraphStore : IMemoryStore, IEntityStore, IRelationshipStore, IUserProfileStore, ISessionStore, IStatisticsStore, IWorkspaceStore
142+
public interface IKnowledgeGraphStore : IMemoryStore, IEntityStore, IRelationshipStore, IUserProfileStore, ISessionStore, IStatisticsStore, IWorkspaceStore, ICaptureStore
109143
{
110144
Task<IAsyncDisposable> BeginUnitOfWorkAsync(CancellationToken cancellationToken = default);
111145
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
namespace SerialMemory.Core.Models;
2+
3+
/// <summary>
4+
/// A staged capture entry received over HTTP.
5+
/// Stored server-side in session_captures table, drained into memories.
6+
/// </summary>
7+
public class SessionCapture
8+
{
9+
public Guid Id { get; set; }
10+
public string? SessionId { get; set; }
11+
public DateTime Ts { get; set; } = DateTime.UtcNow;
12+
public string? Tool { get; set; }
13+
public string? File { get; set; }
14+
public string? Result { get; set; }
15+
public Dictionary<string, object>? RawJson { get; set; }
16+
public bool Drained { get; set; }
17+
public DateTime? DrainedAt { get; set; }
18+
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
19+
}

0 commit comments

Comments
 (0)