diff --git a/pkg/chain/ethereum/tbtc.go b/pkg/chain/ethereum/tbtc.go index d2d9efaf63..5b359c2cae 100644 --- a/pkg/chain/ethereum/tbtc.go +++ b/pkg/chain/ethereum/tbtc.go @@ -1478,8 +1478,10 @@ func (tc *TbtcChain) GetWallet( walletRegistryWallet, err := tc.walletRegistry.GetWallet(wallet.EcdsaWalletID) if err != nil { - logger.Warnf( - "cannot get wallet registry data for wallet [0x%x]: [%v]", + logger.Errorf( + "wallet registry unavailable for wallet [0x%x]; "+ + "MembersIDsHash will be zero -- signer approval "+ + "operations will fail until the registry recovers: [%v]", wallet.EcdsaWalletID, err, ) diff --git a/pkg/covenantsigner/config.go b/pkg/covenantsigner/config.go index 16ede9a4f9..e11f7fec0a 100644 --- a/pkg/covenantsigner/config.go +++ b/pkg/covenantsigner/config.go @@ -10,7 +10,9 @@ type Config struct { // binds to. Empty defaults to loopback-only. ListenAddress string // AuthToken enables static Bearer authentication for signer endpoints. - // Non-loopback binds must set this. + // Non-loopback binds must set this. Prefer environment variables or + // config files over CLI flags to avoid exposing the token in + // /proc/PID/cmdline. AuthToken string // EnableSelfV1 exposes the self_v1 signer HTTP routes. Keep this disabled // for a qc_v1-first launch unless self_v1 has cleared its own go-live gate. 
diff --git a/pkg/covenantsigner/server.go b/pkg/covenantsigner/server.go index 96ad083120..917a9a5651 100644 --- a/pkg/covenantsigner/server.go +++ b/pkg/covenantsigner/server.go @@ -124,6 +124,7 @@ func Initialize( listener, err := net.Listen("tcp", server.httpServer.Addr) if err != nil { + cancelService() return nil, false, fmt.Errorf("failed to bind covenant signer port [%d]: %w", config.Port, err) } @@ -251,7 +252,7 @@ func newHandler(service *Service, serviceCtx context.Context, authToken string, } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/healthz" { + if r.Method == http.MethodGet && r.URL.Path == "/healthz" { mux.ServeHTTP(w, r) return } diff --git a/pkg/covenantsigner/server_test.go b/pkg/covenantsigner/server_test.go index 4b4f651e65..a81d4c853d 100644 --- a/pkg/covenantsigner/server_test.go +++ b/pkg/covenantsigner/server_test.go @@ -867,7 +867,7 @@ func TestSubmitHandlerPreCancelledContextStillSucceeds(t *testing.T) { type contextKey string -func TestSubmitHandlerPreservesContextValues(t *testing.T) { +func TestSubmitHandlerPreservesServiceContextValues(t *testing.T) { const testKey contextKey = "test-trace-id" const testValue = "trace-abc-123" @@ -889,15 +889,13 @@ func TestSubmitHandlerPreservesContextValues(t *testing.T) { t.Fatal(err) } - // Wrap the handler with middleware that injects a value into the request - // context. The detached context should preserve this value. - innerHandler := newHandler(service, context.Background(), "", true) - wrappedHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - enrichedCtx := context.WithValue(r.Context(), testKey, testValue) - innerHandler.ServeHTTP(w, r.WithContext(enrichedCtx)) - }) + // Inject a value into the service context. The submit handler derives + // its timeout context from serviceCtx (not from the HTTP request), so + // values on the service context must be visible to the engine. 
+ serviceCtx := context.WithValue(context.Background(), testKey, testValue) + handler := newHandler(service, serviceCtx, "", true) - server := httptest.NewServer(wrappedHandler) + server := httptest.NewServer(handler) defer server.Close() submitPayload := mustJSON(t, SignerSubmitInput{ @@ -925,7 +923,7 @@ func TestSubmitHandlerPreservesContextValues(t *testing.T) { defer mu.Unlock() if capturedValue != testValue { t.Fatalf( - "expected context value %q to be preserved through detachment, "+ + "expected service context value %q to be visible in engine, "+ "got %v", testValue, capturedValue, diff --git a/pkg/covenantsigner/service.go b/pkg/covenantsigner/service.go index 3d112f9e3f..fa339844d8 100644 --- a/pkg/covenantsigner/service.go +++ b/pkg/covenantsigner/service.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "errors" "fmt" "reflect" "sync" @@ -229,50 +230,28 @@ func (s *Service) loadPollJob(route TemplateID, input SignerPollInput) (*Job, er return job, nil } -func (s *Service) Submit(ctx context.Context, route TemplateID, input SignerSubmitInput) (StepResult, error) { - submitValidationOptions := validationOptions{ - migrationPlanQuoteTrustRoots: s.migrationPlanQuoteTrustRoots, - depositorTrustRoots: s.depositorTrustRoots, - custodianTrustRoots: s.custodianTrustRoots, - requireFreshMigrationPlanQuote: true, - migrationPlanQuoteVerificationNow: s.now(), - signerApprovalVerifier: s.signerApprovalVerifier, - } - if err := validateSubmitInput(route, input, submitValidationOptions); err != nil { - return StepResult{}, err - } - - normalizedRequest, err := normalizeRouteSubmitRequest( - input.Request, - validationOptions{ - migrationPlanQuoteTrustRoots: s.migrationPlanQuoteTrustRoots, - depositorTrustRoots: s.depositorTrustRoots, - custodianTrustRoots: s.custodianTrustRoots, - signerApprovalVerifier: s.signerApprovalVerifier, - }, - ) - if err != nil { - return StepResult{}, err - } - - requestDigest, err := 
requestDigestFromNormalized(normalizedRequest) - if err != nil { - return StepResult{}, err - } - +// createOrDedup creates a new job under the service mutex, or returns the +// existing job result if the route request is already known. Returns +// (job, nil, nil) for a new job, or (nil, result, nil) for a dedup hit. +func (s *Service) createOrDedup( + route TemplateID, + input SignerSubmitInput, + normalizedRequest RouteSubmitRequest, + requestDigest string, +) (*Job, *StepResult, error) { s.mutex.Lock() + defer s.mutex.Unlock() + if existing, ok, err := s.store.GetByRouteRequest(route, input.RouteRequestID); err != nil { - s.mutex.Unlock() - return StepResult{}, err + return nil, nil, err } else if ok { if existing.RequestDigest != requestDigest { - s.mutex.Unlock() - return StepResult{}, &inputError{ + return nil, nil, &inputError{ "routeRequestId already exists with a different request payload", } } - s.mutex.Unlock() - return mapJobResult(existing), nil + result := mapJobResult(existing) + return nil, &result, nil } requestIDPrefix := "" @@ -282,14 +261,12 @@ func (s *Service) Submit(ctx context.Context, route TemplateID, input SignerSubm case TemplateSelfV1: requestIDPrefix = "kcs_self" default: - s.mutex.Unlock() - return StepResult{}, fmt.Errorf("unsupported route: %s", route) + return nil, nil, fmt.Errorf("unsupported route: %s", route) } requestID, err := newRequestID(requestIDPrefix) if err != nil { - s.mutex.Unlock() - return StepResult{}, err + return nil, nil, err } now := s.now() @@ -309,10 +286,50 @@ func (s *Service) Submit(ctx context.Context, route TemplateID, input SignerSubm } if err := s.store.Put(job); err != nil { - s.mutex.Unlock() + return nil, nil, err + } + + return job, nil, nil +} + +func (s *Service) Submit(ctx context.Context, route TemplateID, input SignerSubmitInput) (StepResult, error) { + submitValidationOptions := validationOptions{ + migrationPlanQuoteTrustRoots: s.migrationPlanQuoteTrustRoots, + depositorTrustRoots: 
s.depositorTrustRoots, + custodianTrustRoots: s.custodianTrustRoots, + requireFreshMigrationPlanQuote: true, + migrationPlanQuoteVerificationNow: s.now(), + signerApprovalVerifier: s.signerApprovalVerifier, + } + if err := validateSubmitInput(route, input, submitValidationOptions); err != nil { return StepResult{}, err } - s.mutex.Unlock() + + normalizedRequest, err := normalizeRouteSubmitRequest( + input.Request, + validationOptions{ + migrationPlanQuoteTrustRoots: s.migrationPlanQuoteTrustRoots, + depositorTrustRoots: s.depositorTrustRoots, + custodianTrustRoots: s.custodianTrustRoots, + signerApprovalVerifier: s.signerApprovalVerifier, + }, + ) + if err != nil { + return StepResult{}, err + } + + requestDigest, err := requestDigestFromNormalized(normalizedRequest) + if err != nil { + return StepResult{}, err + } + + job, existingResult, err := s.createOrDedup(route, input, normalizedRequest, requestDigest) + if err != nil { + return StepResult{}, err + } + if existingResult != nil { + return *existingResult, nil + } transition, err := s.engine.OnSubmit(ctx, job) if err != nil { @@ -329,7 +346,7 @@ func (s *Service) Submit(ctx context.Context, route TemplateID, input SignerSubm s.mutex.Lock() defer s.mutex.Unlock() - currentJob, ok, err := s.store.GetByRequestID(requestID) + currentJob, ok, err := s.store.GetByRequestID(job.RequestID) if err != nil { return StepResult{}, err } @@ -378,7 +395,7 @@ func (s *Service) Poll(ctx context.Context, route TemplateID, input SignerPollIn transition, pollErr := s.engine.OnPoll(ctx, job) if pollErr != nil { - if pollErr != errJobNotFound { + if !errors.Is(pollErr, errJobNotFound) { return StepResult{}, pollErr } } @@ -398,7 +415,7 @@ func (s *Service) Poll(ctx context.Context, route TemplateID, input SignerPollIn return mapJobResult(currentJob), nil } - if pollErr == errJobNotFound { + if errors.Is(pollErr, errJobNotFound) { applyTransition(currentJob, &Transition{ State: JobStateFailed, Reason: ReasonJobNotFound, diff --git 
a/pkg/covenantsigner/store.go b/pkg/covenantsigner/store.go index 3684edb73e..1e70bb2586 100644 --- a/pkg/covenantsigner/store.go +++ b/pkg/covenantsigner/store.go @@ -16,11 +16,13 @@ const jobsDirectory = "covenant-signer/jobs" const lockFileName = ".lock" type Store struct { - handle persistence.BasicHandle - mutex sync.Mutex - lockFile *os.File - byRequestID map[string]*Job - byRouteKey map[string]string + handle persistence.BasicHandle + mutex sync.Mutex + lockFile *os.File + byRequestID map[string]*Job + byRouteKey map[string]string + poisonedRoutes map[string]bool + skippedJobFiles []string } // NewStore creates a new Store backed by the given persistence handle. When @@ -30,9 +32,10 @@ type Store struct { // error. When dataDir is empty (in-memory handles), file locking is skipped. func NewStore(handle persistence.BasicHandle, dataDir string) (*Store, error) { store := &Store{ - handle: handle, - byRequestID: make(map[string]*Job), - byRouteKey: make(map[string]string), + handle: handle, + byRequestID: make(map[string]*Job), + byRouteKey: make(map[string]string), + poisonedRoutes: make(map[string]bool), } if dataDir != "" { @@ -55,6 +58,12 @@ func NewStore(handle persistence.BasicHandle, dataDir string) (*Store, error) { // acquireFileLock creates and acquires an exclusive non-blocking advisory lock // on a lock file inside the jobs directory. The returned file handle must be // kept open for the lifetime of the lock; closing it releases the lock. +// +// IMPORTANT: This uses flock(2), which is advisory and BSD-derived (not POSIX). +// It protects against concurrent processes on the same host but does NOT +// protect against concurrent access over network filesystems (NFS, EFS, +// CIFS). The data directory MUST reside on local or block-level storage +// with single-writer access (e.g., Kubernetes ReadWriteOnce PV). 
func acquireFileLock(dataDir string) (*os.File, error) { lockPath := filepath.Join(dataDir, jobsDirectory, lockFileName) @@ -107,10 +116,50 @@ func (s *Store) Close() error { return err } +var errPoisonedRouteKey = fmt.Errorf( + "route key belongs to a job that could not be loaded; " + + "manual recovery of the corrupt job file is required", +) + func routeKey(route TemplateID, routeRequestID string) string { return fmt.Sprintf("%s:%s", route, routeRequestID) } +// poisonRouteFromPartialJob attempts a lenient parse of content to extract +// Route and RouteRequestID. If successful, the route key is marked as +// poisoned so that future submissions are rejected rather than silently +// creating a duplicate job. +func (s *Store) poisonRouteFromPartialJob(content []byte, fileName string) { + var partial struct { + Route TemplateID `json:"Route"` + RouteRequestID string `json:"RouteRequestID"` + } + if err := json.Unmarshal(content, &partial); err != nil { + return + } + if partial.Route == "" || partial.RouteRequestID == "" { + return + } + key := routeKey(partial.Route, partial.RouteRequestID) + s.poisonedRoutes[key] = true + logger.Warnf( + "poisoned route key [%s] from skipped job file [%s]", + key, + fileName, + ) +} + +// SkippedJobFiles returns the file names of job files that could not be +// loaded during startup. Operators should investigate and repair or remove +// these files. 
+func (s *Store) SkippedJobFiles() []string { + s.mutex.Lock() + defer s.mutex.Unlock() + result := make([]string, len(s.skippedJobFiles)) + copy(result, s.skippedJobFiles) + return result +} + func cloneJob(job *Job) (*Job, error) { payload, err := json.Marshal(job) if err != nil { @@ -155,6 +204,8 @@ func (s *Store) load() error { dataChan, errorChan := s.handle.ReadAll() + var loaded, skipped int + for dataChan != nil || errorChan != nil { select { case descriptor, ok := <-dataChan: @@ -174,6 +225,8 @@ func (s *Store) load() error { descriptor.Name(), err, ) + s.skippedJobFiles = append(s.skippedJobFiles, descriptor.Name()) + skipped++ continue } @@ -184,6 +237,12 @@ func (s *Store) load() error { descriptor.Name(), err, ) + // Attempt partial parse to extract route info for + // poisoning. If the route key is recoverable, block + // future submissions for this route to preserve dedupe. + s.poisonRouteFromPartialJob(content, descriptor.Name()) + s.skippedJobFiles = append(s.skippedJobFiles, descriptor.Name()) + skipped++ continue } @@ -201,31 +260,70 @@ func (s *Store) load() error { // candidate's timestamp is valid, the failure is on // the existing job -- replace it. Otherwise skip the // candidate. - if _, parseErr := time.Parse(time.RFC3339Nano, job.UpdatedAt); parseErr != nil { + _, existingParseErr := time.Parse(time.RFC3339Nano, existing.UpdatedAt) + _, candidateParseErr := time.Parse(time.RFC3339Nano, job.UpdatedAt) + + switch { + case candidateParseErr != nil && existingParseErr == nil: + // Only the candidate is unparseable; keep existing. logger.Warnf( - "skipping job [%s] with invalid timestamp on duplicate route key [%s/%s]: [%v]", + "skipping job [%s] with invalid timestamp on duplicate route key [%s/%s] (keeping [%s]): [%v]", job.RequestID, job.Route, job.RouteRequestID, + existing.RequestID, err, ) continue + case candidateParseErr == nil && existingParseErr != nil: + // Only the existing is unparseable; replace with candidate. 
+ logger.Warnf( + "replacing job [%s] with invalid timestamp on duplicate route key [%s/%s]: [%v]", + existing.RequestID, + job.Route, + job.RouteRequestID, + err, + ) + default: + // Both timestamps are unparseable. Use + // lexicographic RequestID as a deterministic + // tiebreaker so the outcome does not depend + // on file iteration order. + if existing.RequestID <= job.RequestID { + logger.Warnf( + "skipping job [%s] on duplicate route key [%s/%s] (keeping [%s], lexicographic tiebreak): [%v]", + job.RequestID, + job.Route, + job.RouteRequestID, + existing.RequestID, + err, + ) + continue + } + logger.Warnf( + "replacing job [%s] on duplicate route key [%s/%s] (lexicographic tiebreak): [%v]", + existing.RequestID, + job.Route, + job.RouteRequestID, + err, + ) } - logger.Warnf( - "replacing job [%s] with invalid timestamp on duplicate route key [%s/%s]: [%v]", - existing.RequestID, - job.Route, - job.RouteRequestID, - err, - ) } else if existingIsNewerOrSame { continue } + + // Remove the superseded job from the primary index + // so stale entries do not leak in byRequestID. + delete(s.byRequestID, existingID) } } s.byRequestID[job.RequestID] = job s.byRouteKey[key] = job.RequestID + // A valid job for this route supersedes any earlier poison + // from a malformed sibling file for the same route key. 
+ delete(s.poisonedRoutes, key) + loaded++ case err, ok := <-errorChan: if !ok { errorChan = nil @@ -237,6 +335,16 @@ func (s *Store) load() error { } } + if skipped > 0 { + logger.Warnf( + "store load complete: loaded [%d] jobs, skipped [%d] unreadable or malformed files", + loaded, + skipped, + ) + } else if loaded > 0 { + logger.Infof("store load complete: loaded [%d] jobs", loaded) + } + return nil } @@ -261,7 +369,13 @@ func (s *Store) GetByRouteRequest(route TemplateID, routeRequestID string) (*Job s.mutex.Lock() defer s.mutex.Unlock() - requestID, ok := s.byRouteKey[routeKey(route, routeRequestID)] + key := routeKey(route, routeRequestID) + + if s.poisonedRoutes[key] { + return nil, false, errPoisonedRouteKey + } + + requestID, ok := s.byRouteKey[key] if !ok { return nil, false, nil } diff --git a/pkg/covenantsigner/store_test.go b/pkg/covenantsigner/store_test.go index fd271530ed..5c1e1589e8 100644 --- a/pkg/covenantsigner/store_test.go +++ b/pkg/covenantsigner/store_test.go @@ -205,7 +205,7 @@ func TestStoreLoadSelectsNewestJobForDuplicateRouteKeys(t *testing.T) { } } -func TestStoreLoadFailsOnInvalidUpdatedAtForDuplicateRouteKeys(t *testing.T) { +func TestStoreLoadResolvesInvalidUpdatedAtForDuplicateRouteKeys(t *testing.T) { handle := newMemoryHandle() first := &Job{ diff --git a/pkg/tbtc/chain.go b/pkg/tbtc/chain.go index c70e4b73c0..391906c91a 100644 --- a/pkg/tbtc/chain.go +++ b/pkg/tbtc/chain.go @@ -415,6 +415,12 @@ type DepositChainRequest struct { } // WalletChainData represents wallet data stored on-chain. +// +// EcdsaWalletID and MembersIDsHash are sourced from the wallet registry. +// When the registry is unavailable during a fault-isolated GetWallet call, +// these fields contain their zero values. Consumers that require registry +// data (e.g. signer approval certificate computation) must guard against +// zero values -- see ErrMissingWalletID and ErrMissingMembersIDsHash. 
type WalletChainData struct { EcdsaWalletID [32]byte MembersIDsHash [32]byte diff --git a/pkg/tbtc/covenant_signer.go b/pkg/tbtc/covenant_signer.go index ba1ed8b7e2..9858c9cf77 100644 --- a/pkg/tbtc/covenant_signer.go +++ b/pkg/tbtc/covenant_signer.go @@ -16,6 +16,7 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/keep-network/keep-core/pkg/bitcoin" "github.com/keep-network/keep-core/pkg/covenantsigner" + "github.com/keep-network/keep-core/pkg/internal/canonicaljson" "github.com/keep-network/keep-core/pkg/tecdsa" ) @@ -870,10 +871,14 @@ func buildWitnessSignatureBytes(signature *tecdsa.Signature) ([]byte, error) { } func computeQcV1SignerHandoffPayloadHash(payload map[string]any) (string, error) { - // The handoff bundle ID is content-addressed using Go's stable JSON map-key - // ordering. Future non-Go custodian consumers that want to recompute this - // hash must preserve the same canonical field set and serialization rules. - rawPayload, err := json.Marshal(payload) + // The handoff bundle ID is content-addressed using canonical JSON + // (alphabetical key ordering, no HTML escaping, no trailing newline). + // Go's encoding/json.Marshal already sorts map keys alphabetically + // (since Go 1.12), so using canonicaljson.Marshal produces identical + // output for non-HTML content while also disabling HTML escaping for + // safety. Non-Go custodian consumers that recompute this hash must + // use the same canonical serialization rules. + rawPayload, err := canonicaljson.Marshal(payload) if err != nil { return "", err } diff --git a/pkg/tbtc/signer_approval_certificate.go b/pkg/tbtc/signer_approval_certificate.go index 72a5dfd5c8..442e995ccd 100644 --- a/pkg/tbtc/signer_approval_certificate.go +++ b/pkg/tbtc/signer_approval_certificate.go @@ -22,12 +22,18 @@ var ( // ErrMissingWalletID is returned when wallet chain data does not // include a wallet ID, typically because the wallet registry was // unavailable during a fault-isolated GetWallet call. 
- ErrMissingWalletID = fmt.Errorf("wallet chain data must include wallet ID") + ErrMissingWalletID = fmt.Errorf( + "wallet chain data must include wallet ID; " + + "the wallet registry may be unavailable", + ) // ErrMissingMembersIDsHash is returned when wallet chain data does // not include a members IDs hash, typically because the wallet // registry was unavailable during a fault-isolated GetWallet call. - ErrMissingMembersIDsHash = fmt.Errorf("wallet chain data must include members IDs hash") + ErrMissingMembersIDsHash = fmt.Errorf( + "wallet chain data must include members IDs hash; " + + "the wallet registry may be unavailable", + ) ) const (