From 6547392285efcc44b9f89543bab7418ad448b67d Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Thu, 26 Mar 2026 11:52:41 +0100 Subject: [PATCH 1/2] speed up local CRE startup by parallelizing secret generation and JD linking --- system-tests/lib/cre/don.go | 169 ++++++++++------ system-tests/lib/cre/don_test.go | 185 ++++++++++++++++++ .../lib/cre/environment/environment.go | 1 + system-tests/lib/cre/types.go | 43 +++- system-tests/lib/cre/types_test.go | 92 +++++++++ 5 files changed, 429 insertions(+), 61 deletions(-) create mode 100644 system-tests/lib/cre/don_test.go create mode 100644 system-tests/lib/cre/types_test.go diff --git a/system-tests/lib/cre/don.go b/system-tests/lib/cre/don.go index 9f0c6c7073d..97bebf02bc8 100644 --- a/system-tests/lib/cre/don.go +++ b/system-tests/lib/cre/don.go @@ -3,11 +3,11 @@ package cre import ( "context" "fmt" + "math" "net/url" "slices" "strconv" "strings" - "sync" "time" "github.com/pkg/errors" @@ -22,6 +22,7 @@ import ( keystone_changeset "github.com/smartcontractkit/chainlink/deployment/keystone/changeset" ks_contracts_op "github.com/smartcontractkit/chainlink/deployment/keystone/changeset/operations/contracts" + "github.com/smartcontractkit/chainlink-testing-framework/framework" "github.com/smartcontractkit/chainlink-testing-framework/framework/clclient" "github.com/smartcontractkit/chainlink-testing-framework/framework/components/blockchain" "github.com/smartcontractkit/chainlink-testing-framework/framework/components/clnode" @@ -30,6 +31,7 @@ import ( chainselectors "github.com/smartcontractkit/chain-selectors" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/secrets" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/environment/blockchains" @@ -227,7 +229,7 @@ func (d *Don) GetName() string { func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Output) (*Don, error) { don := &Don{ - Nodes: make([]*Node, 0), + Nodes: make([]*Node, len(donMetadata.NodesMetadata)), Name: donMetadata.Name, ID: donMetadata.ID, Flags: donMetadata.Flags, @@ -236,8 +238,6 @@ func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Ou chainCapabilityIndex: donMetadata.ns.chainCapabilityIndex, } - mu := &sync.Mutex{} - errgroup := errgroup.Group{} for idx, nodeMetadata := range donMetadata.NodesMetadata { errgroup.Go(func() error { @@ -246,10 +246,7 @@ func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Ou return fmt.Errorf("failed to create node %d: %w", idx, err) } node.DON = don - - mu.Lock() - don.Nodes = append(don.Nodes, node) - mu.Unlock() + don.Nodes[idx] = node return nil }) @@ -274,8 +271,6 @@ func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Ou } func registerWithJD(ctx context.Context, d *Don, supportedChains []blockchains.Blockchain, cldfEnv *cldf.Environment) error { - mu := &sync.Mutex{} - jd, ok := cldfEnv.Offchain.(*jd.JobDistributor) if !ok { return fmt.Errorf("offchain environment is not a *.jd.JobDistributor, but %T", cldfEnv.Offchain) @@ -293,9 +288,15 @@ func registerWithJD(ctx context.Context, d *Don, supportedChains []blockchains.B for _, role := range node.Roles { switch role { case RoleWorker, RoleBootstrap: + chainConfigStart := time.Now() if err := createJDChainConfigs(ctx, node, supportedChains, jd); err != nil { return fmt.Errorf("failed to create supported chains in node %s: %w", node.Name, err) } + framework.L.Info(). + Str("don", d.Name). + Str("node", node.Name). + Float64("duration_s", roundSeconds(time.Since(chainConfigStart))). + Msg("JD chain-config setup completed") case RoleGateway: // no chains configuration needed for gateway nodes default: @@ -303,9 +304,7 @@ func registerWithJD(ctx context.Context, d *Don, supportedChains []blockchains.B } } - mu.Lock() d.Nodes[idx] = node - mu.Unlock() return nil }) @@ -451,7 +450,19 @@ type JDChainConfigInput struct { ChainType string } -func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockchains.Blockchain, jd *jd.JobDistributor) error { +type nodeChainConfigLister interface { + ListNodeChainConfigs(context.Context, *nodev1.ListNodeChainConfigsRequest, ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) +} + +var ( + jdChainConfigPollInterval = 200 * time.Millisecond + jdChainConfigPollTimeout = 10 * time.Second + jdChainConfigRPCTimeout = 3 * time.Second +) + +func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockchains.Blockchain, jd nodeChainConfigLister) error { + ocr2BundleIDsByType := make(map[string]string) + for _, chain := range supportedChains { var account string chainIDStr := strconv.FormatUint(chain.ChainID(), 10) @@ -507,41 +518,35 @@ func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockc if chain.IsFamily(blockchain.FamilyTron) { chainType = strings.ToUpper(blockchain.FamilyEVM) } - ocr2BundleID, createErr := n.Clients.GQLClient.FetchOCR2KeyBundleID(ctx, chainType) - if createErr != nil { - return fmt.Errorf("failed to fetch OCR2 key bundle id for node %s: %w", n.Name, createErr) - } - if ocr2BundleID == "" { - return fmt.Errorf("no OCR2 key bundle id found for node %s", n.Name) - } + ocr2BundleID, ok := ocr2BundleIDsByType[chainType] + if !ok { + fetchErr := error(nil) + ocr2BundleID, fetchErr = n.Clients.GQLClient.FetchOCR2KeyBundleID(ctx, chainType) + if fetchErr != nil { + return fmt.Errorf("failed to fetch OCR2 key bundle id for node %s: %w", n.Name, fetchErr) + } + if ocr2BundleID == "" { + return fmt.Errorf("no OCR2 key bundle id found for node %s", n.Name) + } + ocr2BundleIDsByType[chainType] = ocr2BundleID + } if n.Keys.OCR2BundleIDs == nil { n.Keys.OCR2BundleIDs = make(map[string]string) } - n.Keys.OCR2BundleIDs[strings.ToLower(chainType)] = ocr2BundleID - // retry twice with 5 seconds interval to create JobDistributorChainConfig - retryErr := retry.Do(ctx, retry.WithMaxDuration(10*time.Second, retry.NewConstant(3*time.Second)), func(ctx context.Context) error { - // check the node chain config to see if this chain already exists - nodeChainConfigs, listErr := jd.ListNodeChainConfigs(context.Background(), &nodev1.ListNodeChainConfigsRequest{ - Filter: &nodev1.ListNodeChainConfigsRequest_Filter{ - NodeIds: []string{n.JobDistributorDetails.NodeID}, - }}) - if listErr != nil { - return retry.RetryableError(fmt.Errorf("failed to list node chain configs for node %s, retrying..: %w", n.Name, listErr)) + // Retry create+observe to preserve the original JD behavior. + retryErr := retry.Do(ctx, retry.WithMaxDuration(jdChainConfigPollTimeout, retry.NewConstant(3*time.Second)), func(ctx context.Context) error { + nodeChainConfigIDs, err := listNodeChainConfigIDs(ctx, jd, n.JobDistributorDetails.NodeID) + if err != nil { + return retry.RetryableError(fmt.Errorf("failed to list node chain configs for node %s: %w", n.Name, err)) } - if nodeChainConfigs != nil { - for _, chainConfig := range nodeChainConfigs.ChainConfigs { - if chainConfig.Chain.Id == chainIDStr { - return nil - } - } + if _, exists := nodeChainConfigIDs[chainIDStr]; exists { + return nil } - // we need to create JD chain config for each chain, because later on changestes ask the node for that chain data - // each node needs to have OCR2 enabled, because p2pIDs are used by some contracts to identify nodes (e.g. capability registry) - _, createErr = n.Clients.GQLClient.CreateJobDistributorChainConfig(ctx, client.JobDistributorChainConfigInput{ + _, err = n.Clients.GQLClient.CreateJobDistributorChainConfig(ctx, client.JobDistributorChainConfigInput{ JobDistributorID: n.JobDistributorDetails.JDID, ChainID: chainIDStr, ChainType: chainType, @@ -554,23 +559,48 @@ func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockc Ocr2KeyBundleID: ocr2BundleID, Ocr2Plugins: `{}`, }) - // TODO: add a check if the chain config failed because of a duplicate in that case, should we update or return success? - if createErr != nil { - return createErr + if err != nil { + return err } - // JD silently fails to update nodeChainConfig. Therefore, we fetch the node config and - // if it's not updated , throw an error return retry.RetryableError(errors.New("retrying CreateChainConfig in JD")) }) - if retryErr != nil { return fmt.Errorf("failed to create JD chain configuration for node %s: %w", n.Name, retryErr) } } + return nil } +func listNodeChainConfigIDs(ctx context.Context, jd nodeChainConfigLister, nodeID string) (map[string]struct{}, error) { + rpcCtx, cancel := context.WithTimeout(ctx, jdChainConfigRPCTimeout) + defer cancel() + + resp, err := jd.ListNodeChainConfigs(rpcCtx, &nodev1.ListNodeChainConfigsRequest{ + Filter: &nodev1.ListNodeChainConfigsRequest_Filter{ + NodeIds: []string{nodeID}, + }, + }) + if err != nil { + return nil, err + } + + chainIDs := make(map[string]struct{}) + if resp == nil { + return chainIDs, nil + } + + for _, chainConfig := range resp.ChainConfigs { + if chainConfig.GetChain() == nil { + continue + } + chainIDs[chainConfig.Chain.Id] = struct{}{} + } + + return chainIDs, nil +} + // AcceptJob accepts the job proposal for the given job proposal spec func (n *Node) AcceptJob(ctx context.Context, spec string) error { // fetch JD to get the job proposals @@ -773,21 +803,46 @@ func LinkToJobDistributor(ctx context.Context, input *LinkDonsToJDInput) error { return errors.New("input is nil") } - var nodeIDs []string + start := time.Now() + dons := input.Dons.List() + donMetadata := input.Topology.DonsMetadata.List() + nodeIDsByDON := make([][]string, len(dons)) - for idx, don := range input.Dons.List() { - supportedChains, schErr := findDonSupportedChains(input.Topology.DonsMetadata.List()[idx], input.Blockchains) - if schErr != nil { - return errors.Wrap(schErr, "failed to find supported chains for DON") - } + errGroup, groupCtx := errgroup.WithContext(ctx) + for idx, don := range dons { + errGroup.Go(func() error { + donStart := time.Now() + supportedChains, schErr := findDonSupportedChains(donMetadata[idx], input.Blockchains) + if schErr != nil { + return errors.Wrap(schErr, "failed to find supported chains for DON") + } - if err := registerWithJD(ctx, don, supportedChains, input.CldfEnvironment); err != nil { - return fmt.Errorf("failed to register DON with JD: %w", err) - } - nodeIDs = append(nodeIDs, don.JDNodeIDs()...) + if err := registerWithJD(groupCtx, don, supportedChains, input.CldfEnvironment); err != nil { + return fmt.Errorf("failed to register DON %s with JD: %w", don.Name, err) + } + + nodeIDsByDON[idx] = don.JDNodeIDs() + framework.L.Info(). + Str("don", don.Name). + Float64("duration_s", roundSeconds(time.Since(donStart))). + Msg("JD registration completed for DON") + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return err + } + + nodeIDs := make([]string, 0) + for _, donNodeIDs := range nodeIDsByDON { + nodeIDs = append(nodeIDs, donNodeIDs...) } input.CldfEnvironment.NodeIDs = nodeIDs + framework.L.Info(). + Float64("duration_s", roundSeconds(time.Since(start))). + Msg("Post-start JD linking completed") return nil } @@ -824,6 +879,10 @@ func findDonSupportedChains(donMetadata *DonMetadata, bcs []blockchains.Blockcha return chains, nil } +func roundSeconds(d time.Duration) float64 { + return math.Round(d.Seconds()*10) / 10 +} + // Make DonMetadata also implement it, just in case? type KeystoneDON interface { KeystoneDONConfig() ks_contracts_op.ConfigureKeystoneDON diff --git a/system-tests/lib/cre/don_test.go b/system-tests/lib/cre/don_test.go new file mode 100644 index 00000000000..dfde624ec68 --- /dev/null +++ b/system-tests/lib/cre/don_test.go @@ -0,0 +1,185 @@ +package cre + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + cldf_chain "github.com/smartcontractkit/chainlink-deployments-framework/chain" + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" + "github.com/smartcontractkit/chainlink-testing-framework/framework/components/blockchain" + webclient "github.com/smartcontractkit/chainlink/deployment/environment/web/sdk/client" + "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/secrets" + "github.com/smartcontractkit/chainlink/system-tests/lib/cre/environment/blockchains" + "github.com/smartcontractkit/chainlink/system-tests/lib/crypto" +) + +type fakeGQLClient struct { + webclient.Client + + fetchOCR2KeyBundleIDFn func(context.Context, string) (string, error) + createJobDistributorChainConfigFn func(context.Context, webclient.JobDistributorChainConfigInput) (string, error) + + fetchOCR2KeyBundleIDCalls []string + createJobDistributorChainConfigCalls []webclient.JobDistributorChainConfigInput +} + +func (f *fakeGQLClient) FetchOCR2KeyBundleID(ctx context.Context, chainType string) (string, error) { + f.fetchOCR2KeyBundleIDCalls = append(f.fetchOCR2KeyBundleIDCalls, chainType) + if f.fetchOCR2KeyBundleIDFn != nil { + return f.fetchOCR2KeyBundleIDFn(ctx, chainType) + } + return "bundle-id", nil +} + +func (f *fakeGQLClient) CreateJobDistributorChainConfig(ctx context.Context, in webclient.JobDistributorChainConfigInput) (string, error) { + f.createJobDistributorChainConfigCalls = append(f.createJobDistributorChainConfigCalls, in) + if f.createJobDistributorChainConfigFn != nil { + return f.createJobDistributorChainConfigFn(ctx, in) + } + return "config-id", nil +} + +type fakeJDChainConfigLister struct { + chainIDsByNodeID map[string]map[string]struct{} +} + +func (f *fakeJDChainConfigLister) ListNodeChainConfigs(_ context.Context, in *nodev1.ListNodeChainConfigsRequest, _ ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) { + resp := &nodev1.ListNodeChainConfigsResponse{} + if in.GetFilter() == nil || len(in.GetFilter().GetNodeIds()) == 0 { + return resp, nil + } + + for chainID := range f.chainIDsByNodeID[in.GetFilter().GetNodeIds()[0]] { + resp.ChainConfigs = append(resp.ChainConfigs, &nodev1.ChainConfig{ + Chain: &nodev1.Chain{Id: chainID}, + }) + } + + return resp, nil +} + +type fakeBlockchain struct { + chainID uint64 + chainFamily string +} + +func (f fakeBlockchain) ChainSelector() uint64 { return f.chainID } +func (f fakeBlockchain) ChainID() uint64 { return f.chainID } +func (f fakeBlockchain) ChainFamily() string { return f.chainFamily } +func (f fakeBlockchain) IsFamily(chainFamily string) bool { + return f.chainFamily == chainFamily +} +func (f fakeBlockchain) Fund(context.Context, string, uint64) error { return nil } +func (f fakeBlockchain) CtfOutput() *blockchain.Output { return nil } +func (f fakeBlockchain) ToCldfChain() (cldf_chain.BlockChain, error) { + return nil, nil +} + +var _ blockchains.Blockchain = fakeBlockchain{} + +func TestCreateJDChainConfigsSkipsExistingConfigs(t *testing.T) { + t.Parallel() + + node := mustNewTestNode(t) + gql := &fakeGQLClient{} + node.Clients.GQLClient = gql + jd := &fakeJDChainConfigLister{ + chainIDsByNodeID: map[string]map[string]struct{}{ + node.JobDistributorDetails.NodeID: {"111": {}}, + }, + } + + err := createJDChainConfigs(context.Background(), node, []blockchains.Blockchain{ + fakeBlockchain{chainID: 111, chainFamily: blockchain.FamilyEVM}, + }, jd) + + require.NoError(t, err) + require.Empty(t, gql.createJobDistributorChainConfigCalls) +} + +func TestCreateJDChainConfigsCreatesMissingConfigsAndReusesBundleIDs(t *testing.T) { + t.Parallel() + + node := mustNewTestNode(t) + jd := &fakeJDChainConfigLister{ + chainIDsByNodeID: map[string]map[string]struct{}{ + node.JobDistributorDetails.NodeID: {}, + }, + } + gql := &fakeGQLClient{ + createJobDistributorChainConfigFn: func(_ context.Context, in webclient.JobDistributorChainConfigInput) (string, error) { + jd.chainIDsByNodeID[node.JobDistributorDetails.NodeID][in.ChainID] = struct{}{} + return "created-" + in.ChainID, nil + }, + } + node.Clients.GQLClient = gql + + err := createJDChainConfigs(context.Background(), node, []blockchains.Blockchain{ + fakeBlockchain{chainID: 111, chainFamily: blockchain.FamilyEVM}, + fakeBlockchain{chainID: 222, chainFamily: blockchain.FamilyEVM}, + }, jd) + + require.NoError(t, err) + require.Len(t, gql.createJobDistributorChainConfigCalls, 2) + require.Equal(t, []string{"EVM"}, gql.fetchOCR2KeyBundleIDCalls) + require.Equal(t, "bundle-id", node.Keys.OCR2BundleIDs["evm"]) +} + +func TestCreateJDChainConfigsFailsVerificationOnTimeout(t *testing.T) { + node := mustNewTestNode(t) + jd := &fakeJDChainConfigLister{ + chainIDsByNodeID: map[string]map[string]struct{}{ + node.JobDistributorDetails.NodeID: {}, + }, + } + node.Clients.GQLClient = &fakeGQLClient{} + + originalInterval := jdChainConfigPollInterval + originalTimeout := jdChainConfigPollTimeout + jdChainConfigPollInterval = time.Millisecond + jdChainConfigPollTimeout = 5 * time.Millisecond + defer func() { + jdChainConfigPollInterval = originalInterval + jdChainConfigPollTimeout = originalTimeout + }() + + err := createJDChainConfigs(context.Background(), node, []blockchains.Blockchain{ + fakeBlockchain{chainID: 111, chainFamily: blockchain.FamilyEVM}, + }, jd) + + require.Error(t, err) + require.Contains(t, err.Error(), "failed to create JD chain configuration") +} + +func mustNewTestNode(t *testing.T) *Node { + t.Helper() + + p2pKey, err := crypto.NewP2PKey("password") + require.NoError(t, err) + evmKey, err := crypto.NewEVMKey("password", 111) + require.NoError(t, err) + + return &Node{ + Name: "node-1", + Keys: &secrets.NodeKeys{ + P2PKey: p2pKey, + EVM: map[uint64]*crypto.EVMKey{ + 111: evmKey, + 222: &crypto.EVMKey{PublicAddress: evmKey.PublicAddress}, + }, + }, + Addresses: Addresses{ + AdminAddress: "0xadmin", + }, + JobDistributorDetails: &JobDistributorDetails{ + NodeID: "node-id-1", + JDID: "jd-id-1", + }, + Clients: NodeClients{}, + Roles: Roles{RoleWorker}, + } +} diff --git a/system-tests/lib/cre/environment/environment.go b/system-tests/lib/cre/environment/environment.go index f3a1ccaa2a8..1dda74a8c1a 100644 --- a/system-tests/lib/cre/environment/environment.go +++ b/system-tests/lib/cre/environment/environment.go @@ -222,6 +222,7 @@ func SetupTestEnvironment( } fmt.Print(libformat.PurpleText("%s", input.StageGen.WrapAndNext("Applied Features in %.2f seconds", input.StageGen.Elapsed().Seconds()))) + fmt.Print(libformat.PurpleText("%s", input.StageGen.Wrap("Starting Job Distributor and DONs"))) queue := worker.New(ctx, 10) defer queue.StopAndWait() // Ensure cleanup on any exit path diff --git a/system-tests/lib/cre/types.go b/system-tests/lib/cre/types.go index 84a9da7074d..88a3c81537a 100644 --- a/system-tests/lib/cre/types.go +++ b/system-tests/lib/cre/types.go @@ -6,9 +6,11 @@ import ( "maps" "os" "path/filepath" + "runtime" "slices" "strconv" "strings" + "time" "github.com/Masterminds/semver/v3" "github.com/ethereum/go-ethereum/common" @@ -17,6 +19,7 @@ import ( "github.com/pelletier/go-toml/v2" "github.com/pkg/errors" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" "github.com/smartcontractkit/chainlink-deployments-framework/datastore" cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment" @@ -574,10 +577,16 @@ func NewDonMetadata(c *NodeSet, id uint64, provider infra.Provider, capabilityCo cfgs[i] = cfg } + newNodesStart := time.Now() nodes, err := newNodes(cfgs) if err != nil { return nil, fmt.Errorf("failed to create nodes metadata: %w", err) } + framework.L.Info(). + Str("don", c.Name). + Int("nodes", len(cfgs)). + Float64("duration_s", roundSeconds(time.Since(newNodesStart))). + Msg("Node metadata generation completed") capConfigs, capErr := processCapabilityConfigs(c, capabilityConfigs) if capErr != nil { @@ -1181,13 +1190,26 @@ func NewNodeMetadata(c NodeMetadataConfig) (*NodeMetadata, error) { func newNodes(cfgs []NodeMetadataConfig) ([]*NodeMetadata, error) { nodes := make([]*NodeMetadata, len(cfgs)) + if len(cfgs) == 0 { + return nodes, nil + } - for i := range nodes { - node, err := NewNodeMetadata(cfgs[i]) - if err != nil { - return nil, fmt.Errorf("failed to create node (index: %d): %w", i, err) - } - nodes[i] = node + errGroup := errgroup.Group{} + errGroup.SetLimit(min(len(cfgs), runtime.GOMAXPROCS(0))) + + for i := range cfgs { + errGroup.Go(func() error { + node, err := NewNodeMetadata(cfgs[i]) + if err != nil { + return fmt.Errorf("failed to create node (index: %d): %w", i, err) + } + nodes[i] = node + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err } return nodes, nil @@ -1424,6 +1446,7 @@ type NodeKeyInput struct { } func NewNodeKeys(input NodeKeyInput) (*secrets.NodeKeys, error) { + start := time.Now() out := &secrets.NodeKeys{ EVM: make(map[uint64]*crypto.EVMKey), Solana: make(map[string]*crypto.SolKey), @@ -1467,6 +1490,14 @@ func NewNodeKeys(input NodeKeyInput) (*secrets.NodeKeys, error) { } out.Solana[chainID] = k } + + framework.L.Debug(). + Int("evm_chains", len(input.EVMChainIDs)). + Int("solana_chains", len(input.SolanaChainIDs)). + Bool("imported", input.ImportedSecrets != ""). + Float64("duration_s", roundSeconds(time.Since(start))). + Msg("Node key generation completed") + return out, nil } diff --git a/system-tests/lib/cre/types_test.go b/system-tests/lib/cre/types_test.go new file mode 100644 index 00000000000..ea3e0356577 --- /dev/null +++ b/system-tests/lib/cre/types_test.go @@ -0,0 +1,92 @@ +package cre + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/secrets" +) + +func TestNewNodesPreservesInputOrder(t *testing.T) { + t.Parallel() + + cfgs := []NodeMetadataConfig{ + {Host: "node-0", Roles: []string{WorkerNode}, Index: 0, Keys: NodeKeyInput{Password: "password"}}, + {Host: "node-1", Roles: []string{BootstrapNode}, Index: 1, Keys: NodeKeyInput{Password: "password"}}, + {Host: "node-2", Roles: []string{GatewayNode}, Index: 2, Keys: NodeKeyInput{Password: "password"}}, + } + + nodes, err := newNodes(cfgs) + + require.NoError(t, err) + require.Len(t, nodes, len(cfgs)) + for i, node := range nodes { + require.Equal(t, cfgs[i].Host, node.Host) + require.Equal(t, cfgs[i].Index, node.Index) + require.Equal(t, cfgs[i].Roles, node.Roles) + } +} + +func TestNewNodesImportedSecretsBypassGeneration(t *testing.T) { + t.Parallel() + + importedKeys, err := NewNodeKeys(NodeKeyInput{ + EVMChainIDs: []uint64{111}, + Password: "password", + }) + require.NoError(t, err) + + importedSecrets, err := importedKeys.ToNodeSecretsTOML() + require.NoError(t, err) + + nodes, err := newNodes([]NodeMetadataConfig{{ + Host: "node-imported", + Roles: []string{WorkerNode}, + Index: 0, + Keys: NodeKeyInput{ + ImportedSecrets: importedSecrets, + }, + }}) + + require.NoError(t, err) + require.Len(t, nodes, 1) + require.Equal(t, importedKeys.PeerID(), nodes[0].Keys.PeerID()) + require.Equal(t, importedKeys.EVM[111].PublicAddress, nodes[0].Keys.EVM[111].PublicAddress) +} + +func TestNewNodesReturnsFailingIndex(t *testing.T) { + t.Parallel() + + importedKeys, err := NewNodeKeys(NodeKeyInput{Password: "password"}) + require.NoError(t, err) + + importedSecrets, err := importedKeys.ToNodeSecretsTOML() + require.NoError(t, err) + + _, err = newNodes([]NodeMetadataConfig{ + { + Host: "node-0", + Roles: []string{WorkerNode}, + Index: 0, + Keys: NodeKeyInput{ImportedSecrets: importedSecrets}, + }, + { + Host: "node-1", + Roles: []string{WorkerNode}, + Index: 1, + Keys: NodeKeyInput{ImportedSecrets: "not valid toml"}, + }, + { + Host: "node-2", + Roles: []string{WorkerNode}, + Index: 2, + Keys: NodeKeyInput{ImportedSecrets: importedSecrets}, + }, + }) + + require.Error(t, err) + require.Contains(t, err.Error(), "index: 1") +} + +var _ = secrets.NodeKeys{} From 274c28bbc246ab11fce68d42030c8f8d1ab8b399 Mon Sep 17 00:00:00 2001 From: ilija Date: Fri, 27 Mar 2026 11:43:59 +0100 Subject: [PATCH 2/2] Fix local registry race --- core/capabilities/launcher.go | 10 ++++-- .../standard_capabilities.go | 35 ++++++++++++++++++- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index 968d92172e2..17e852cb944 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -300,8 +300,14 @@ func (w *launcher) donPairsToUpdate(myID ragetypes.PeerID, localRegistry *regist } func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyncer.LocalRegistry) error { - w.lggr.Debug("CapabilitiesLauncher triggered...") - w.registry.SetLocalRegistry(localRegistry) + // Do not set an empty local registry: capability init (e.g. EVM) calls LocalNode() and fails with + // "empty local registry. no DONs registered". Only set once we have at least one DON so that + // capabilities that depend on the registry see valid data (or keep waiting until syncer pushes non-empty). + if len(localRegistry.IDsToDONs) > 0 { + w.registry.SetLocalRegistry(localRegistry) + } else { + w.lggr.Debugw("CapabilitiesLauncher skipping SetLocalRegistry (empty registry, waiting for first sync with DONs)") + } allDONIDs := w.allDONs(localRegistry) w.lggr.Debugw("All DONs in the local registry", "allDONIDs", allDONIDs) diff --git a/core/services/standardcapabilities/standard_capabilities.go b/core/services/standardcapabilities/standard_capabilities.go index e3e30669cdc..e0122617281 100644 --- a/core/services/standardcapabilities/standard_capabilities.go +++ b/core/services/standardcapabilities/standard_capabilities.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "time" @@ -118,7 +119,9 @@ func (s *StandardCapabilities) Start(ctx context.Context) error { CRESettings: s.creSettings, TriggerEventStore: s.triggerEventStore, } - if err = s.capabilitiesLoop.Service.Initialise(cctx, dependencies); err != nil { + + s.log.Infow("StandardCapabilities calling Initialise on capability service", "command", s.command) + if err = s.retryInitialiseUntilReady(cctx, dependencies); err != nil { s.log.Errorf("error initialising standard capabilities service: %v", err) return } @@ -136,6 +139,36 @@ func (s *StandardCapabilities) Start(ctx context.Context) error { }) } +// retryInitialiseUntilReady calls Initialise and retries on "empty local registry" or +// "metadataRegistry information not available" so that capability init runs after the +// registry syncer has pushed at least one non-empty local registry (startup race fix). +const initRetryTimeout = 90 * time.Second +const initRetryInterval = 3 * time.Second + +func (s *StandardCapabilities) retryInitialiseUntilReady(ctx context.Context, dependencies core.StandardCapabilitiesDependencies) error { + deadline := time.Now().Add(initRetryTimeout) + var lastErr error + for attempt := 0; time.Now().Before(deadline); attempt++ { + lastErr = s.capabilitiesLoop.Service.Initialise(ctx, dependencies) + if lastErr == nil { + return nil + } + msg := lastErr.Error() + if !strings.Contains(msg, "empty local registry") && !strings.Contains(msg, "metadataRegistry information not available") { + return lastErr + } + if attempt > 0 { + s.log.Infow("StandardCapabilities Initialise retry (waiting for registry sync)", "command", s.command, "attempt", attempt+1, "err", lastErr) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(initRetryInterval): + } + } + return fmt.Errorf("initialise still failing after %v (registry never became ready): %w", initRetryTimeout, lastErr) +} + // Ready is a non-blocking check for the service's ready state. Errors if not // ready when called. func (s *StandardCapabilities) Ready() error {