Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
boards/
config.yaml
test/integration/stub-worker/stub-worker
.superpowers/
.worktrees
7 changes: 6 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,18 @@ When in doubt: **"UI = human, case closed."**
cmd/contextmatrix/main.go → entrypoint, wires dependencies, starts server
internal/board/ → domain types: Card, ProjectConfig, StateMachine
internal/storage/ → Store interface + FilesystemStore implementation
internal/gitops/ → GitManager (commit, pull, push via go-git)
internal/gitops/ → GitManager (commit, pull, push via go-git) + async CommitQueue
internal/gitsync/ → background board-repo sync
internal/lock/ → agent claim/release/heartbeat + timeout checker
internal/service/ → CardService: orchestrates store, git, lock, events, state machine
internal/api/ → REST API handlers (stdlib http.ServeMux) + SSE endpoint
internal/mcp/ → MCP server: tools + prompts
internal/runner/ → webhook client for contextmatrix-runner
internal/chat/ → SQLite-backed chat session manager, SSE hub, runner-log bridge
internal/clock/ → injectable clock for service-layer time invariants
internal/events/ → in-process pub/sub event bus
internal/github/ → GitHub auth helpers shared across services
internal/refresh/ → knowledge-base refresh orchestration
internal/config/ → global config loading
internal/ctxlog/ → request_id context logger (WithRequestID / Logger)
internal/metrics/ → Prometheus metric vars + Register()
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ test:
go test ./cmd/... ./internal/...

test-race:
CGO_ENABLED=1 go test -race ./cmd/... ./internal/...
CGO_ENABLED=1 go test -race -count=1 ./internal/chat/... ./internal/api/... ./internal/mcp/...

fmt:
go fmt ./...
Expand Down
214 changes: 206 additions & 8 deletions cmd/contextmatrix/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
githubauth "github.com/mhersson/contextmatrix-githubauth"

"github.com/mhersson/contextmatrix/internal/api"
"github.com/mhersson/contextmatrix/internal/chat"
chatsqlite "github.com/mhersson/contextmatrix/internal/chat/sqlite"
"github.com/mhersson/contextmatrix/internal/clock"
"github.com/mhersson/contextmatrix/internal/config"
"github.com/mhersson/contextmatrix/internal/events"
Expand Down Expand Up @@ -282,15 +284,142 @@ func main() {

runner.StartEndSessionSubscriber(ctx, bus, svc, runnerClient, slog.Default())
slog.Info("end-session subscriber started")
}

// Chat: SQLite store + manager + SSE hub + idle reaper + warm-idle grace timer.
chatStore, err := chatsqlite.Open(cfg.Chat.DBPath)
if err != nil {
slog.Error("failed to open chat store", "path", cfg.Chat.DBPath, "error", err)
cancel()
os.Exit(1) //nolint:gocritic // cancel called explicitly above
}
defer chatStore.Close()

slog.Info("chat store opened", "path", cfg.Chat.DBPath)

var chatRunner chat.RunnerClient
if cfg.Runner.Enabled {
chatRunner = chat.NewRunnerClient(chat.RunnerClientConfig{
BaseURL: cfg.Runner.URL,
HMACKey: cfg.Runner.APIKey,
MCPAPIKey: cfg.MCPAPIKey,
})
} else {
// Nil runner causes nil-pointer panics at call sites. Use a no-op stub
// that returns an error on every operation — chat features require runner.
chatRunner = chatRunnerDisabled{}
}

chatHub := chat.NewSSEHub(128)

chatMgr := chat.NewManager(chat.Config{
Store: chatStore,
Runner: chatRunner,
Clock: clock.Real(),
IdleTTL: cfg.Chat.IdleTTL,
MaxConcurrent: cfg.Chat.MaxConcurrent,
Hub: chatHub,
ResumeBudgetTokens: cfg.Chat.ResumeBudgetTokens,
RehydrationTimeout: cfg.Chat.RehydrationTimeout,
DefaultModel: cfg.Chat.DefaultModel,
ResolveRepoURL: func(rctx context.Context, project string) (string, error) {
p, err := svc.GetProject(rctx, project)
if err != nil {
return "", err
}

if p.Repo != "" {
return p.Repo, nil
}

repos := p.EffectiveRepos()
if len(repos) > 0 {
return repos[0].URL, nil
}

return "", nil
},
})
go chat.NewIdleReaper(chatMgr, time.Minute).Run(ctx)

// 30s grace timer: last subscriber drop → flip session to warm-idle.
// A new subscriber within 30s cancels the flip.
var graceTimers sync.Map // sessionID → *time.Timer

chatHub.OnLastUnsubscribe = func(sessionID string) {
if existing, ok := graceTimers.LoadAndDelete(sessionID); ok {
existing.(*time.Timer).Stop()
}

timer := time.AfterFunc(30*time.Second, func() {
// If the entry is still in the map it means no new subscriber
// arrived during the grace window — proceed with warm-idle.
if _, loaded := graceTimers.LoadAndDelete(sessionID); !loaded {
return
}

if err := chatMgr.MarkWarmIdle(ctx, sessionID); err != nil {
slog.Warn("chat: warm-idle transition failed", "session_id", sessionID, "error", err)
}
})
graceTimers.Store(sessionID, timer)
}
chatHub.OnSubscribe = func(sessionID string) {
if t, ok := graceTimers.LoadAndDelete(sessionID); ok {
t.(*time.Timer).Stop()
}
// A browser subscriber is a strong "I want this chat" signal.
// Reattach the runner-log consumer if one isn't already bridging
// /logs for this session — covers the case where CM restarted
// while runner containers stayed alive, stranding their consumer
// goroutines. No-op on cold/ending sessions.
if err := chatMgr.Reattach(ctx, sessionID); err != nil {
slog.Warn("chat: reattach on subscribe failed",
"session_id", sessionID, "error", err)
}
}

slog.Info("chat manager initialized", "idle_ttl", cfg.Chat.IdleTTL, "max_concurrent", cfg.Chat.MaxConcurrent)

// Resume runner-log consumers for sessions that survived a CM restart.
// Without this, active/warm-idle sessions stay marked alive in the DB
// while their consumer goroutines are gone (in-memory state lost), so
// the UI can't see runner output even though the container is still
// up. Reattach is idempotent and tolerant of dead containers — the
// consumer exits on first /logs error and the reconcile sweep below
// will flip orphaned sessions to cold.
go func() {
rctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

for _, status := range []chat.Status{chat.StatusActive, chat.StatusWarmIdle} {
sessions, err := chatMgr.ListSessions(rctx, chat.SessionFilter{Status: status})
if err != nil {
slog.Warn("chat: startup reattach list failed",
"status", status, "error", err)

continue
}

for _, s := range sessions {
if err := chatMgr.Reattach(rctx, s.ID); err != nil {
slog.Warn("chat: startup reattach failed",
"session_id", s.ID, "error", err)
}
}
}
}()

// Card + chat reconcile sweep: a single ticker fetches /containers once
// per tick and feeds both reconcilers. Two separate tickers used to
// produce identically-signed HMAC GETs back to back; the runner's
// replay cache rejected the second as a duplicate. The chat reconciler
// flips active/warm-idle sessions whose runner container has
// disappeared (claude crash, runner restart, OOM, manual docker kill)
// to cold so the UI can reopen.
if cfg.Runner.Enabled {
reconcileInterval := cfg.Runner.ReconcileIntervalDuration()
// The sweep takes the CardService (CardLookup) and the runner client
// (ReconcileClient: ListContainers + EndSession + Kill). It uses the
// runner's Docker state as the authoritative "is this container
// running?" input and the card store as the authoritative "should it
// be?" — see internal/runner/reconcile.go for why we no longer gate
// on card.runner_status.
runner.StartReconciliationSweep(ctx, svc, runnerClient, reconcileInterval, slog.Default())
runner.StartReconciliationSweep(ctx, svc, chatReconcilerAdapter{mgr: chatMgr}, runnerClient, reconcileInterval, slog.Default())

if reconcileInterval > 0 {
slog.Info("runner reconciliation sweep started", "interval", reconcileInterval)
Expand All @@ -310,7 +439,7 @@ func main() {
slog.Info("session log manager initialized")

// Create MCP server
mcpSrv := mcpserver.NewServer(svc, cfg.WorkflowSkillsDir)
mcpSrv := mcpserver.NewServer(svc, cfg.WorkflowSkillsDir, chatMgr)

mcpHandler := mcpserver.NewHandler(mcpSrv, cfg.MCPAPIKey)
if cfg.MCPAPIKey != "" {
Expand Down Expand Up @@ -344,6 +473,9 @@ func main() {
Theme: cfg.Theme,
Version: buildVersion(),
MCPHandler: mcpHandler,
ChatManager: chatMgr,
ChatHub: chatHub,
ChatConfig: &cfg.Chat,
})

slog.Info("MCP server registered", "endpoint", "/mcp")
Expand Down Expand Up @@ -466,6 +598,15 @@ func main() {
slog.Error("session manager shutdown error", "error", err)
}

if chatMgr != nil {
chatCloseCtx, chatCloseCancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := chatMgr.Close(chatCloseCtx); err != nil {
slog.Warn("chat manager close failed", "error", err)
}

chatCloseCancel()
}

// Phase 3: signal the rest of the app (timeout checker, syncers'
// periodic loops, runner subscribers) to wind down.
slog.Info("shutdown: phase=ctx_cancel")
Expand Down Expand Up @@ -642,3 +783,60 @@ func newSPAHandler(apiHandler http.Handler, fsys fs.FS) http.Handler {
fileServer.ServeHTTP(w, r)
})
}

// chatRunnerDisabled is a no-op RunnerClient used when the runner integration
// is disabled. Every operation returns an error so callers receive a clear
// "runner not enabled" message rather than a nil-pointer panic.
type chatRunnerDisabled struct{}

func (chatRunnerDisabled) StartChat(_ context.Context, _ chat.StartChatOpts) (string, error) {
return "", fmt.Errorf("chat: runner not enabled")
}

func (chatRunnerDisabled) EndChat(_ context.Context, _ string) error {
return fmt.Errorf("chat: runner not enabled")
}

func (chatRunnerDisabled) SendChatMessage(_ context.Context, _, _, _ string) error {
return fmt.Errorf("chat: runner not enabled")
}

func (chatRunnerDisabled) StreamLogs(ctx context.Context, _ string, _ func(chat.LogEntry)) error {
<-ctx.Done()

return ctx.Err()
}

// chatReconcilerAdapter adapts *chat.Manager to the runner.ChatReconciler
// surface. Keeps the chat package free of any runner-facing type while still
// letting the reconcile sweep enumerate orphan sessions and flip them cold.
type chatReconcilerAdapter struct {
mgr *chat.Manager
}

func (a chatReconcilerAdapter) ListActiveChatSessions(ctx context.Context) ([]runner.ChatSessionRef, error) {
active, err := a.mgr.ListSessions(ctx, chat.SessionFilter{Status: chat.StatusActive})
if err != nil {
return nil, fmt.Errorf("list active: %w", err)
}

warm, err := a.mgr.ListSessions(ctx, chat.SessionFilter{Status: chat.StatusWarmIdle})
if err != nil {
return nil, fmt.Errorf("list warm-idle: %w", err)
}

out := make([]runner.ChatSessionRef, 0, len(active)+len(warm))
for _, s := range active {
out = append(out, runner.ChatSessionRef{ID: s.ID, Status: string(s.Status)})
}

for _, s := range warm {
out = append(out, runner.ChatSessionRef{ID: s.ID, Status: string(s.Status)})
}

return out, nil
}

func (a chatReconcilerAdapter) EndChatSession(ctx context.Context, id string) error {
return a.mgr.EndSession(ctx, id)
}
45 changes: 45 additions & 0 deletions config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,51 @@ runner:
# Env: CONTEXTMATRIX_RUNNER_RECONCILE_INTERVAL
reconcile_interval: "60s"

# Chat (global chat panel)
chat:
# SQLite database for chat sessions and transcripts.
# CONTEXTMATRIX_CHAT_DB_PATH overrides this.
# Default: $XDG_STATE_HOME/contextmatrix/chats.db
# db_path: /var/lib/contextmatrix/chats.db

# How long a chat container survives after browser disconnects.
# CONTEXTMATRIX_CHAT_IDLE_TTL overrides this.
idle_ttl: 1h

# Maximum concurrent chat containers. Default is 8 — enough headroom for
# the multi-pane chat UI's 4 user-facing panes plus 2-4 agent-owned
# background sessions.
# CONTEXTMATRIX_CHAT_MAX_CONCURRENT overrides this.
max_concurrent: 8

# Claude model used when a chat is created without an explicit
# selection in the New Chat dialog. Must be a key in chat.models.
default_model: claude-sonnet-4-6

# Rough token budget for the rehydration payload sent to the runner on
# cold-reopen. Older transcript turns are dropped (first user turn and
# last 20 turns are always preserved) until the estimate fits.
resume_budget_tokens: 40000

# Force the rehydration phase off after this duration even if the
# agent never called chat_rehydration_complete. The first user message
# also ends the phase, so this is a belt-and-suspenders cap.
rehydration_timeout: 10m

# Allowlist of selectable models for new chats. The label is shown in
# the picker; max_tokens drives the context-window denominator in the
# ChatThread header indicator. Adding a new model is a single edit.
models:
claude-sonnet-4-6:
label: "Sonnet 4.6"
max_tokens: 1000000
claude-opus-4-7:
label: "Opus 4.7"
max_tokens: 1000000
claude-haiku-4-5-20251001:
label: "Haiku 4.5"
max_tokens: 200000

# GitHub authentication and integration.
# Used for boards git, task-skills git, issue importing, and branch listing.
github:
Expand Down
Loading
Loading