Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .claude/settings.local.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file should not be committed to the repo. .claude/settings.local.json is a local Claude Code configuration file — the 'local' in the name indicates it is machine-specific. Add it to .gitignore and drop it from this PR. Committing it exposes the allowed Bash commands your dev environment permits, which is unnecessary noise in the repo.

"permissions": {
"allow": [
"Bash(docker compose up *)",
"Bash(curl -s http://localhost:8090/health)",
"Bash(go build *)",
"Bash(go test *)",
"Bash(docker compose *)",
"Bash(go run *)",
"Bash(gh pr *)",
"WebFetch(domain:api.github.com)"
]
}
}
35 changes: 35 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Langship Environment Configuration
# Copy this to .env.local or set these as environment variables

# === CRITICAL: Encryption Key ===
# Master key for sealing all sensitive credentials at rest (PAT tokens, AWS keys, GCP service accounts, KV secrets).
# - Any non-empty string (will be hashed to 32 bytes)
# - In production: use a secure key store (Vault, cloud KMS)
# - If lost, all sealed credentials become unrecoverable
# - Example: openssl rand -hex 32
FLOW_SECRET_KEY=your-secret-key-change-this-in-production

# === API Server ===
FLOW_ADDR=:8090
FLOW_CORS_ORIGINS=http://localhost:3000
FLOW_PUBLIC_URL=http://localhost:8090

# === Database ===
MONGO_URI=mongodb://localhost:27017
MONGO_DB=flow

# === Orchestration (Restate) ===
RESTATE_INGRESS_URL=http://localhost:8081
RESTATE_ADMIN_URL=http://localhost:9070
RESTATE_SERVICE_ADDR=:9080
RESTATE_DEPLOYMENT_URI=http://localhost:9080

# === Build (BuildKit) ===
BUILDKIT_HOST=tcp://127.0.0.1:1234

# === Storage (MinIO / S3) ===
MINIO_ENDPOINT=127.0.0.1:9000
MINIO_ACCESS_KEY=minio
MINIO_SECRET_KEY=minio12345
MINIO_BUCKET=flow-logs
MINIO_USE_SSL=false
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# API server only — no UI, no web build stage.
# The UI lives in its own container (web/Dockerfile) and proxies /api here.

FROM golang:1.24-alpine AS build
FROM golang:1.25-alpine AS build
WORKDIR /src
COPY go.mod go.sum ./
RUN go mod download
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ Adding a node? See the "Adding a node executor" section in
| `FLOW_ADDR` | `:8090` | API listen address |
| `FLOW_CORS_ORIGINS` | `*` (compose: `http://localhost:3000`) | CSV allowlist |
| `FLOW_PUBLIC_URL` | (empty) | Externally-reachable base URL for webhook callback URLs. Set to your `cloudflared` tunnel for GitHub webhooks. |
| `FLOW_SECRET_KEY` | (unset → credential writes refused) | Master key for AES-GCM sealing of credentials/secrets. Any string; hashed to 32 bytes. **Losing it makes sealed data unrecoverable.** |
| `FLOW_SECRET_KEY` | (unset → credential writes refused) | **Required for production.** Master key for AES-256-GCM encryption of sensitive data: agent PAT tokens, AWS keys, GCP service accounts, KV secrets. Any string; hashed to 32 bytes via SHA-256. **CRITICAL: Losing this key makes all sealed credentials unrecoverable.** Store securely (e.g., HashiCorp Vault, cloud KMS). For dev/test: any non-empty string. |
| `MONGO_URI` | (required; compose: `mongodb://localhost:27017`) | |
| `MONGO_DB` | `flow` | |
| `RESTATE_INGRESS_URL` | `http://localhost:8081` | |
Expand Down
10 changes: 10 additions & 0 deletions cmd/flow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/lyzrai/flow/pkg/executors"
"github.com/lyzrai/flow/pkg/logstore"
"github.com/lyzrai/flow/pkg/orchestrator"
"github.com/lyzrai/flow/pkg/secrets"
"github.com/lyzrai/flow/pkg/storage"
)

Expand Down Expand Up @@ -133,6 +134,15 @@ func serve() int {
}()
slog.Info("mongo connected", slog.String("db", mongoDB))

// FLOW_SECRET_KEY is required for credential encryption. Check early so
// the failure is explicit rather than surfacing per-request.
if !secrets.IsConfigured() {
slog.Error("FLOW_SECRET_KEY is not set, exiting",
slog.String("hint", "set FLOW_SECRET_KEY to any non-empty value for encryption of PAT tokens, AWS keys, GCP service accounts, and KV secrets"),
)
return 1
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This startup check is a breaking change for anyone currently running the service without FLOW_SECRET_KEY set (i.e. deployments that predated the secrets package). The README update documents the variable, but there is no migration notice or graceful degradation path — the service simply refuses to start. If this is intentional and the policy is "no key, no start", that should be explicit in the PR description and ideally in a CHANGELOG or migration guide. If there are existing deployments with unsealed agents, operators need to know they must set the key before upgrading, not discover it from a crash on deploy.

// MinIO is optional — when MINIO_ENDPOINT is unset we skip log
// archiving. Live SSE log streaming still works regardless.
var logs logstore.Store
Expand Down
53 changes: 44 additions & 9 deletions pkg/api/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,14 @@ func (s *Server) handleCreateAgent(w http.ResponseWriter, r *http.Request) {
Name: name,
RepoURL: repo,
Ref: ref,
PAT: strings.TrimSpace(body.PAT),
AuthStatus: storage.AuthUntested,
CreatedAt: now,
UpdatedAt: now,
}
if err := a.SetPAT(strings.TrimSpace(body.PAT)); err != nil {
writeError(w, http.StatusInternalServerError, err)
return
}
if err := s.agents.Create(r.Context(), a); err != nil {
writeError(w, http.StatusInternalServerError, err)
return
Expand All @@ -202,9 +205,17 @@ func (s *Server) handleDeleteAgent(w http.ResponseWriter, r *http.Request) {
// dangling hooks pointing at a dead agent ID.
if a, err := s.agents.Get(r.Context(), id); err == nil && a.WebhookID != 0 {
if repo, perr := github.ParseRepo(a.RepoURL); perr == nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decrypt failure here is silently swallowed — if GetPAT() returns an error, the webhook is left installed and the agent is deleted without cleanup. Either propagate the error (return 500 before deleting) or log it explicitly so the leak is at least visible in observability tooling.

ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
_ = github.NewClient(a.PAT).UninstallWebhook(ctx, repo, a.WebhookID)
cancel()
pat, err := a.GetPAT()
if err != nil {
slog.WarnContext(r.Context(), "delete_agent_pat_decrypt_failed",
slog.String("agent_id", id),
slog.Any("error", err))
}
if pat != "" {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
Comment thread
BrawlerXull marked this conversation as resolved.
_ = github.NewClient(pat).UninstallWebhook(ctx, repo, a.WebhookID)
cancel()
}
}
}
if err := s.agents.Delete(r.Context(), id); err != nil {
Expand All @@ -223,14 +234,22 @@ func (s *Server) handleTestAgentAuth(w http.ResponseWriter, r *http.Request) {
writeStorageErr(w, err, "agent not found")
return
}
pat, err := a.GetPAT()
if err != nil {
slog.ErrorContext(r.Context(), "test_auth_pat_decrypt_failed",
slog.String("agent_id", id),
slog.Any("error", err))
writeError(w, http.StatusInternalServerError, errors.New("failed to retrieve agent credentials"))
return
}
repo, err := github.ParseRepo(a.RepoURL)
if err != nil {
writeError(w, http.StatusBadRequest, err)
return
}
ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
defer cancel()
authErr := github.NewClient(a.PAT).TestAuth(ctx, repo)
authErr := github.NewClient(pat).TestAuth(ctx, repo)

now := time.Now().UTC()
a.AuthCheckedAt = &now
Expand Down Expand Up @@ -268,7 +287,15 @@ func (s *Server) handleInstallWebhook(w http.ResponseWriter, r *http.Request) {
writeStorageErr(w, err, "agent not found")
return
}
if a.PAT == "" {
pat, err := a.GetPAT()
if err != nil {
slog.ErrorContext(r.Context(), "pat_decrypt_failed",
slog.String("agent_id", id),
slog.Any("error", err))
writeError(w, http.StatusInternalServerError, errors.New("failed to retrieve agent credentials"))
return
}
if pat == "" {
writeError(w, http.StatusBadRequest, errors.New("agent has no PAT — re-create with one"))
return
}
Expand All @@ -288,7 +315,7 @@ func (s *Server) handleInstallWebhook(w http.ResponseWriter, r *http.Request) {

ctx, cancel := context.WithTimeout(r.Context(), 20*time.Second)
defer cancel()
hookID, err := github.NewClient(a.PAT).InstallWebhook(ctx, repo, callback, secret)
hookID, err := github.NewClient(pat).InstallWebhook(ctx, repo, callback, secret)
if err != nil {
writeError(w, http.StatusBadGateway, err)
return
Expand All @@ -301,7 +328,7 @@ func (s *Server) handleInstallWebhook(w http.ResponseWriter, r *http.Request) {
a.UpdatedAt = now
if err := s.agents.Update(r.Context(), a); err != nil {
// Try to roll back the hook so we don't leak it.
_ = github.NewClient(a.PAT).UninstallWebhook(ctx, repo, hookID)
_ = github.NewClient(pat).UninstallWebhook(ctx, repo, hookID)
writeError(w, http.StatusInternalServerError, err)
return
}
Expand All @@ -319,14 +346,22 @@ func (s *Server) handleUninstallWebhook(w http.ResponseWriter, r *http.Request)
writeError(w, http.StatusBadRequest, errors.New("no webhook installed"))
return
}
pat, err := a.GetPAT()
if err != nil {
slog.ErrorContext(r.Context(), "pat_decrypt_failed",
slog.String("agent_id", id),
slog.Any("error", err))
writeError(w, http.StatusInternalServerError, errors.New("failed to retrieve agent credentials"))
return
}
repo, err := github.ParseRepo(a.RepoURL)
if err != nil {
writeError(w, http.StatusBadRequest, err)
return
}
ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
defer cancel()
if err := github.NewClient(a.PAT).UninstallWebhook(ctx, repo, a.WebhookID); err != nil {
if err := github.NewClient(pat).UninstallWebhook(ctx, repo, a.WebhookID); err != nil {
writeError(w, http.StatusBadGateway, err)
return
}
Expand Down
51 changes: 21 additions & 30 deletions pkg/executors/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"time"

"github.com/lyzrai/flow/pkg/engine"
"github.com/lyzrai/flow/pkg/github"
"github.com/lyzrai/flow/pkg/models"
"github.com/lyzrai/flow/pkg/storage"
)
Expand Down Expand Up @@ -66,6 +65,11 @@ func (e *BuildExecutor) Execute(ctx context.Context, node models.NodeDef, inputs
return nil, fmt.Errorf("build: load agent %q: %w", agentID, err)
}

pat, err := a.GetPAT()
if err != nil {
return nil, fmt.Errorf("build: decrypt PAT: %w", err)
}

mode := strParam(node.Parameters, "mode", "docker")
timeoutSec := intParam(node.Parameters, "timeoutSeconds", 600)
if timeoutSec < 10 {
Expand All @@ -89,7 +93,7 @@ func (e *BuildExecutor) Execute(ctx context.Context, node models.NodeDef, inputs
"main",
))

cloneDir, cleanup, err := cloneRepo(ctx, a, ref, commitSHA, time.Duration(timeoutSec)*time.Second)
cloneDir, cleanup, err := cloneRepo(ctx, a.RepoURL, a.Name, pat, ref, commitSHA, time.Duration(timeoutSec)*time.Second)
if err != nil {
return nil, err
}
Expand All @@ -115,7 +119,7 @@ func (e *BuildExecutor) Execute(ctx context.Context, node models.NodeDef, inputs
"log_tail": logTail,
}
case "docker", "":
summary, logTail, runErr = runDockerMode(hardCtx, node, a, cloneDir, ref, commitSHA)
summary, logTail, runErr = runDockerMode(hardCtx, node, a.Name, pat, cloneDir, ref, commitSHA)
default:
return nil, fmt.Errorf("build: unknown mode %q (want docker|shell)", mode)
}
Expand Down Expand Up @@ -175,7 +179,7 @@ func runShellMode(ctx context.Context, node models.NodeDef, a *storage.Agent, cl

// --- docker (BuildKit) mode ---------------------------------------------

func runDockerMode(ctx context.Context, node models.NodeDef, a *storage.Agent, cloneDir, ref, commitSHA string) (map[string]any, string, error) {
func runDockerMode(ctx context.Context, node models.NodeDef, agentName, pat, cloneDir, ref, commitSHA string) (map[string]any, string, error) {
bkAddr := os.Getenv("BUILDKIT_HOST")
if bkAddr == "" {
// docker-compose publishes buildkitd on 127.0.0.1:1234, so a host
Expand All @@ -192,7 +196,7 @@ func runDockerMode(ctx context.Context, node models.NodeDef, a *storage.Agent, c
buildArgs := parseKVCSV(strParam(node.Parameters, "buildArgs", ""))

if imageName == "" {
imageName = a.Name // "owner/repo"
imageName = agentName // "owner/repo"
}
tag := commitSHA
if tag == "" {
Expand All @@ -202,7 +206,7 @@ func runDockerMode(ctx context.Context, node models.NodeDef, a *storage.Agent, c

contextDir := filepath.Join(cloneDir, filepath.Clean("/"+contextRel))

auth, insecure := authForRegistry(registry, a)
auth, insecure := authForRegistry(registry, pat)

slog.InfoContext(ctx, "build_run_docker",
slog.String("node", node.Name),
Expand Down Expand Up @@ -238,20 +242,19 @@ func runDockerMode(ctx context.Context, node models.NodeDef, a *storage.Agent, c
}

// authForRegistry decides what creds to send to BuildKit for `registry`.
// ghcr.io: use the agent's PAT (must include write:packages).
// ghcr.io: use the PAT (must include write:packages).
// The username is always "x-access-token" for GitHub token auth to ghcr.io.
// (Prior code using agentOwner() is no longer needed; x-access-token is the
// canonical dummy username GitHub accepts for PAT-based auth.)
// localhost:* and registry:* (compose-internal): anonymous + insecure.
Comment thread
BrawlerXull marked this conversation as resolved.
// Anything else: anonymous; user can wire a real auth path later.
func authForRegistry(registry string, a *storage.Agent) (map[string]registryCreds, bool) {
func authForRegistry(registry string, pat string) (map[string]registryCreds, bool) {
host := registryHostname(registry)
insecure := isInsecureRegistry(host)

if host == "ghcr.io" && a.PAT != "" {
owner := agentOwner(a)
if owner == "" {
owner = "x-access-token"
}
if host == "ghcr.io" && pat != "" {
return map[string]registryCreds{
Comment thread
BrawlerXull marked this conversation as resolved.
"ghcr.io": {Username: owner, Password: a.PAT},
"ghcr.io": {Username: "x-access-token", Password: pat},
}, false
}
return map[string]registryCreds{}, insecure
Expand Down Expand Up @@ -280,18 +283,6 @@ func isInsecureRegistry(host string) bool {
return false
}

// agentOwner extracts "owner" from an agent name shaped like "owner/repo".
// Used as the GHCR username when pushing.
func agentOwner(a *storage.Agent) string {
if i := strings.Index(a.Name, "/"); i > 0 {
return a.Name[:i]
}
if r, err := github.ParseRepo(a.RepoURL); err == nil {
return r.Owner
}
return ""
}

// parseKVCSV parses "k=v, k2=v2" into a map.
func parseKVCSV(s string) map[string]string {
out := map[string]string{}
Expand All @@ -315,16 +306,16 @@ func parseKVCSV(s string) map[string]string {

// --- clone helper --------------------------------------------------------

// cloneRepo shallow-clones a's repo into a fresh tmp dir, optionally
// cloneRepo shallow-clones the repo into a fresh tmp dir, optionally
// checking out a specific commit. Returns (cloneDir, cleanupFn, err).
func cloneRepo(ctx context.Context, a *storage.Agent, ref, commit string, timeout time.Duration) (string, func(), error) {
func cloneRepo(ctx context.Context, repoURL, agentName, pat, ref, commit string, timeout time.Duration) (string, func(), error) {
cloneDir, err := os.MkdirTemp("", "flow-build-*")
if err != nil {
return "", nil, fmt.Errorf("build: tmp dir: %w", err)
}
cleanup := func() { os.RemoveAll(cloneDir) }

cloneURL, err := authedCloneURL(a.RepoURL, a.PAT)
cloneURL, err := authedCloneURL(repoURL, pat)
if err != nil {
cleanup()
return "", nil, fmt.Errorf("build: clone url: %w", err)
Expand All @@ -334,7 +325,7 @@ func cloneRepo(ctx context.Context, a *storage.Agent, ref, commit string, timeou
defer cancel()

slog.InfoContext(ctx, "build_clone",
slog.String("agent", a.Name),
slog.String("agent", agentName),
slog.String("ref", ref),
slog.String("dir", cloneDir),
)
Expand Down
8 changes: 6 additions & 2 deletions pkg/executors/promote.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,15 @@ func (e *PromoteExecutor) Execute(ctx context.Context, node models.NodeDef, inpu
if err != nil {
return nil, fmt.Errorf("promote: load agent %q: %w", agentID, err)
}
pat, err := a.GetPAT()
if err != nil {
return nil, fmt.Errorf("promote: decrypt PAT: %w", err)
}
repo, err := github.ParseRepo(a.RepoURL)
if err != nil {
return nil, fmt.Errorf("promote: parse repo %q: %w", a.RepoURL, err)
}
if a.PAT == "" {
if pat == "" {
return nil, errors.New("promote: agent has no PAT (need 'repo' scope to open PRs / merge)")
}

Expand Down Expand Up @@ -93,7 +97,7 @@ func (e *PromoteExecutor) Execute(ctx context.Context, node models.NodeDef, inpu
logger.Log(fmt.Sprintf("[promote:%s] %s/%s: %s → %s",
mode, repo.Owner, repo.Name, from, to))

cli := github.NewClient(a.PAT)
cli := github.NewClient(pat)
title := strParam(node.Parameters, "title", "")
body := strParam(node.Parameters, "body", "")
commitMsg := firstNonEmptyStr(title, fmt.Sprintf("Promote %s → %s", from, to))
Expand Down
Loading