From da13c8291033db4c2020d862d9124088798744ad Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Mon, 30 Mar 2026 12:51:19 +0200 Subject: [PATCH 1/5] parallelize secrets generation, JD linking and job proposal/approvals --- system-tests/lib/cre/contracts/keystone.go | 56 +++-- .../lib/cre/contracts/registry_pickup_wait.go | 205 ++++++++++++++++++ system-tests/lib/cre/don.go | 169 ++++++++++----- system-tests/lib/cre/don/jobs/jobs.go | 85 ++++++-- system-tests/lib/cre/don/jobs/jobs_test.go | 151 +++++++++++++ system-tests/lib/cre/don_test.go | 185 ++++++++++++++++ .../lib/cre/environment/environment.go | 4 +- .../cre/features/consensus/v2/consensus.go | 3 +- .../lib/cre/features/don_time/don_time.go | 2 + system-tests/lib/cre/features/evm/v2/evm.go | 75 ++++++- .../lib/cre/features/jobhelpers/helpers.go | 35 +++ .../cre/features/jobhelpers/helpers_test.go | 20 ++ .../log_event_trigger/log_event_trigger.go | 138 +++++++----- .../features/read_contract/read_contract.go | 138 +++++++----- system-tests/lib/cre/features/sets/sets.go | 6 +- .../lib/cre/features/solana/v2/solana.go | 36 +-- system-tests/lib/cre/types.go | 44 +++- system-tests/lib/cre/types_test.go | 92 ++++++++ 18 files changed, 1202 insertions(+), 242 deletions(-) create mode 100644 system-tests/lib/cre/contracts/registry_pickup_wait.go create mode 100644 system-tests/lib/cre/don/jobs/jobs_test.go create mode 100644 system-tests/lib/cre/don_test.go create mode 100644 system-tests/lib/cre/features/jobhelpers/helpers.go create mode 100644 system-tests/lib/cre/features/jobhelpers/helpers_test.go create mode 100644 system-tests/lib/cre/types_test.go diff --git a/system-tests/lib/cre/contracts/keystone.go b/system-tests/lib/cre/contracts/keystone.go index 97fce670aad..482d9a4a957 100644 --- a/system-tests/lib/cre/contracts/keystone.go +++ b/system-tests/lib/cre/contracts/keystone.go @@ -436,7 +436,7 @@ func toDons(input cre.ConfigureCapabilityRegistryInput) (*dons, error) { return dons, nil } -func 
ConfigureCapabilityRegistry(input cre.ConfigureCapabilityRegistryInput) (CapabilitiesRegistry, error) { +func ConfigureCapabilityRegistry(ctx context.Context, input cre.ConfigureCapabilityRegistryInput) (CapabilitiesRegistry, error) { if err := input.Validate(); err != nil { return nil, errors.Wrap(err, "input validation failed") } @@ -445,6 +445,7 @@ func ConfigureCapabilityRegistry(input cre.ConfigureCapabilityRegistryInput) (Ca if dErr != nil { return nil, errors.Wrap(dErr, "failed to map input to dons") } + var capReg CapabilitiesRegistry if !input.WithV2Registries { for _, don := range dons.donsOrderedByID() { for i, cap := range don.Capabilities { @@ -478,7 +479,7 @@ func ConfigureCapabilityRegistry(input cre.ConfigureCapabilityRegistryInput) (Ca return nil, errors.Wrap(seqErr, "failed to configure capabilities registry") } - capReg, cErr := cre_contracts.GetOwnedContractV2[*kcr.CapabilitiesRegistry]( + capRegContract, cErr := cre_contracts.GetOwnedContractV2[*kcr.CapabilitiesRegistry]( input.CldEnv.DataStore.Addresses(), input.CldEnv.BlockChains.EVMChains()[input.ChainSelector], input.CapabilitiesRegistryAddress.Hex(), @@ -486,33 +487,40 @@ func ConfigureCapabilityRegistry(input cre.ConfigureCapabilityRegistryInput) (Ca if cErr != nil { return nil, errors.Wrap(cErr, "failed to get capabilities registry contract") } - return &registryWrapper{V1: capReg.Contract}, nil - } + capReg = &registryWrapper{V1: capRegContract.Contract} + } else { + // Transform dons data to V2 sequence input format + v2Input := dons.mustToV2ConfigureInput(input.ChainSelector, input.CapabilitiesRegistryAddress.Hex(), input.CapabilityToOCR3Config, input.ExtraSignerFamilies) + _, seqErr := operations.ExecuteSequence( + input.CldEnv.OperationsBundle, + cap_reg_v2_seq.ConfigureCapabilitiesRegistry, + cap_reg_v2_seq.ConfigureCapabilitiesRegistryDeps{ + Env: input.CldEnv, + }, + v2Input, + ) + if seqErr != nil { + return nil, errors.Wrap(seqErr, "failed to configure capabilities registry") + } -
// Transform dons data to V2 sequence input format - v2Input := dons.mustToV2ConfigureInput(input.ChainSelector, input.CapabilitiesRegistryAddress.Hex(), input.CapabilityToOCR3Config, input.ExtraSignerFamilies) - _, seqErr := operations.ExecuteSequence( - input.CldEnv.OperationsBundle, - cap_reg_v2_seq.ConfigureCapabilitiesRegistry, - cap_reg_v2_seq.ConfigureCapabilitiesRegistryDeps{ - Env: input.CldEnv, - }, - v2Input, - ) - if seqErr != nil { - return nil, errors.Wrap(seqErr, "failed to configure capabilities registry") + capRegContract, cErr := cre_contracts.GetOwnedContractV2[*capabilities_registry_v2.CapabilitiesRegistry]( + input.CldEnv.DataStore.Addresses(), + input.CldEnv.BlockChains.EVMChains()[input.ChainSelector], + input.CapabilitiesRegistryAddress.Hex(), + ) + if cErr != nil { + return nil, errors.Wrap(cErr, "failed to get capabilities registry contract") + } + + capReg = &registryWrapper{V2: capRegContract.Contract} } - capReg, cErr := cre_contracts.GetOwnedContractV2[*capabilities_registry_v2.CapabilitiesRegistry]( - input.CldEnv.DataStore.Addresses(), - input.CldEnv.BlockChains.EVMChains()[input.ChainSelector], - input.CapabilitiesRegistryAddress.Hex(), - ) - if cErr != nil { - return nil, errors.Wrap(cErr, "failed to get capabilities registry contract") + // TODO: remove this once the race condition is fixed (CRE-2684) + if waitErr := waitForWorkflowWorkersCapabilityRegistrySync(ctx, input); waitErr != nil { + return nil, errors.Wrap(waitErr, "failed waiting for workflow nodes to sync capability registry state") } - return &registryWrapper{V2: capReg.Contract}, nil + return capReg, nil } type DonInfo struct { diff --git a/system-tests/lib/cre/contracts/registry_pickup_wait.go b/system-tests/lib/cre/contracts/registry_pickup_wait.go new file mode 100644 index 00000000000..4c999930ee8 --- /dev/null +++ b/system-tests/lib/cre/contracts/registry_pickup_wait.go @@ -0,0 +1,205 @@ +package contracts + +import ( + "context" + "database/sql" + "encoding/json" +
"fmt" + "sort" + "strconv" + "strings" + "time" + + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + + "github.com/smartcontractkit/chainlink-testing-framework/framework/components/postgres" + + "github.com/smartcontractkit/chainlink/system-tests/lib/cre" +) + +const ( + capabilityRegistrySyncPollInterval = 5 * time.Second + capabilityRegistrySyncTimeout = 2 * time.Minute + capabilityRegistrySyncQueryTimeout = 3 * time.Second +) + +type workflowWorkerTarget struct { + donName string + nodeIndex int + dbPort int +} + +type capabilityRegistrySyncState struct { + IDsToDONs map[string]json.RawMessage `json:"IDsToDONs"` + IDsToNodes map[string]json.RawMessage `json:"IDsToNodes"` + IDsToCapabilities map[string]json.RawMessage `json:"IDsToCapabilities"` +} + +const latestCapabilityRegistrySyncStateQuery = ` +SELECT data +FROM registry_syncer_states +ORDER BY id DESC +LIMIT 1 +` + +func waitForWorkflowWorkersCapabilityRegistrySync(ctx context.Context, input cre.ConfigureCapabilityRegistryInput) error { + // Waiting for capability registry sync is not supported in Kubernetes mode. 
+ if input.Provider.IsKubernetes() { + return nil + } + targets, tErr := workflowWorkerTargets(input.Topology, input.NodeSets) + if tErr != nil { + return tErr + } + if len(targets) == 0 { + return nil + } + + timeoutCtx, cancel := context.WithTimeout(ctx, capabilityRegistrySyncTimeout) + defer cancel() + + pending := make(map[string]workflowWorkerTarget, len(targets)) + lastState := make(map[string]string, len(targets)) + for _, target := range targets { + key := registryTargetKey(target) + pending[key] = target + lastState[key] = "awaiting first successful registry snapshot check" + } + + ticker := time.NewTicker(capabilityRegistrySyncPollInterval) + defer ticker.Stop() + + for { + for key, target := range pending { + ready, state := hasCapabilityRegistrySyncOnWorker(timeoutCtx, target.dbPort, target.nodeIndex) + if ready { + delete(pending, key) + delete(lastState, key) + continue + } + lastState[key] = state + } + + if len(pending) == 0 { + return nil + } + + select { + case <-timeoutCtx.Done(): + if errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) { + return fmt.Errorf("timed out after %.0f seconds waiting for workflow workers to sync capability registry state: %s", capabilityRegistrySyncTimeout.Seconds(), formatCapabilityRegistrySyncPending(lastState)) + } + return timeoutCtx.Err() + case <-ticker.C: + } + } +} + +func workflowWorkerTargets(topology *cre.Topology, nodeSets []*cre.NodeSet) ([]workflowWorkerTarget, error) { + if topology == nil || topology.DonsMetadata == nil { + return nil, errors.New("topology metadata cannot be nil") + } + + allDons := topology.DonsMetadata.List() + indexByName := make(map[string]int, len(allDons)) + for i, don := range allDons { + indexByName[don.Name] = i + } + + workflowDons, err := topology.DonsMetadata.WorkflowDONs() + if err != nil { + return nil, errors.Wrap(err, "failed to resolve workflow DONs") + } + + targets := make([]workflowWorkerTarget, 0) + for _, workflowDON := range workflowDons { + donIdx, ok := 
indexByName[workflowDON.Name] + if !ok { + return nil, fmt.Errorf("workflow DON %s not found in topology list", workflowDON.Name) + } + if donIdx >= len(nodeSets) || nodeSets[donIdx] == nil { + return nil, fmt.Errorf("nodeset for workflow DON %s is missing", workflowDON.Name) + } + + dbPort := nodeSets[donIdx].DbInput.Port + if dbPort == 0 { + defaultPort, dErr := strconv.Atoi(postgres.Port) + if dErr != nil { + return nil, errors.Wrap(dErr, "failed to convert postgres port to int") + } + dbPort = defaultPort + } + + workers, wErr := workflowDON.Workers() + if wErr != nil { + return nil, errors.Wrapf(wErr, "failed to resolve workers for workflow DON %s", workflowDON.Name) + } + + for _, worker := range workers { + targets = append(targets, workflowWorkerTarget{ + donName: workflowDON.Name, + nodeIndex: worker.Index, + dbPort: dbPort, + }) + } + } + + return targets, nil +} + +func hasCapabilityRegistrySyncOnWorker(ctx context.Context, dbPort, nodeIndex int) (bool, string) { + dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable connect_timeout=3", "127.0.0.1", dbPort, postgres.User, postgres.Password, fmt.Sprintf("db_%d", nodeIndex)) + db, err := sqlx.Open("postgres", dsn) + if err != nil { + return false, fmt.Sprintf("failed to open db connection: %v", err) + } + defer db.Close() + + queryCtx, cancel := context.WithTimeout(ctx, capabilityRegistrySyncQueryTimeout) + defer cancel() + + var rawData []byte + if err = db.GetContext(queryCtx, &rawData, latestCapabilityRegistrySyncStateQuery); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return false, "registry_syncer_states is empty" + } + return false, fmt.Sprintf("failed to query latest registry syncer state: %v", err) + } + if len(rawData) == 0 { + return false, "latest registry_syncer_states row has empty data payload" + } + + var state capabilityRegistrySyncState + if err = json.Unmarshal(rawData, &state); err != nil { + return false, fmt.Sprintf("failed to unmarshal latest 
registry syncer state payload: %v", err) + } + + hasDONs := len(state.IDsToDONs) > 0 + hasNodes := len(state.IDsToNodes) > 0 + hasCapabilities := len(state.IDsToCapabilities) > 0 + if !hasDONs || !hasCapabilities || !hasNodes { + return false, fmt.Sprintf("incomplete registry snapshot (has_dons=%t has_nodes=%t has_capabilities=%t)", hasDONs, hasNodes, hasCapabilities) + } + + return true, "" +} + +func registryTargetKey(target workflowWorkerTarget) string { + return fmt.Sprintf("%s/%d", target.donName, target.nodeIndex) +} + +func formatCapabilityRegistrySyncPending(lastState map[string]string) string { + parts := make([]string, 0, len(lastState)) + keys := make([]string, 0, len(lastState)) + for target := range lastState { + keys = append(keys, target) + } + sort.Strings(keys) + + for _, target := range keys { + reason := lastState[target] + parts = append(parts, fmt.Sprintf("%s (%s)", target, reason)) + } + return strings.Join(parts, "; ") +} diff --git a/system-tests/lib/cre/don.go b/system-tests/lib/cre/don.go index 9f0c6c7073d..97bebf02bc8 100644 --- a/system-tests/lib/cre/don.go +++ b/system-tests/lib/cre/don.go @@ -3,11 +3,11 @@ package cre import ( "context" "fmt" + "math" "net/url" "slices" "strconv" "strings" - "sync" "time" "github.com/pkg/errors" @@ -22,6 +22,7 @@ import ( keystone_changeset "github.com/smartcontractkit/chainlink/deployment/keystone/changeset" ks_contracts_op "github.com/smartcontractkit/chainlink/deployment/keystone/changeset/operations/contracts" + "github.com/smartcontractkit/chainlink-testing-framework/framework" "github.com/smartcontractkit/chainlink-testing-framework/framework/clclient" "github.com/smartcontractkit/chainlink-testing-framework/framework/components/blockchain" "github.com/smartcontractkit/chainlink-testing-framework/framework/components/clnode" @@ -30,6 +31,7 @@ import ( chainselectors "github.com/smartcontractkit/chain-selectors" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" 
"github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/secrets" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/environment/blockchains" @@ -227,7 +229,7 @@ func (d *Don) GetName() string { func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Output) (*Don, error) { don := &Don{ - Nodes: make([]*Node, 0), + Nodes: make([]*Node, len(donMetadata.NodesMetadata)), Name: donMetadata.Name, ID: donMetadata.ID, Flags: donMetadata.Flags, @@ -236,8 +238,6 @@ func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Ou chainCapabilityIndex: donMetadata.ns.chainCapabilityIndex, } - mu := &sync.Mutex{} - errgroup := errgroup.Group{} for idx, nodeMetadata := range donMetadata.NodesMetadata { errgroup.Go(func() error { @@ -246,10 +246,7 @@ func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Ou return fmt.Errorf("failed to create node %d: %w", idx, err) } node.DON = don - - mu.Lock() - don.Nodes = append(don.Nodes, node) - mu.Unlock() + don.Nodes[idx] = node return nil }) @@ -274,8 +271,6 @@ func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Ou } func registerWithJD(ctx context.Context, d *Don, supportedChains []blockchains.Blockchain, cldfEnv *cldf.Environment) error { - mu := &sync.Mutex{} - jd, ok := cldfEnv.Offchain.(*jd.JobDistributor) if !ok { return fmt.Errorf("offchain environment is not a *.jd.JobDistributor, but %T", cldfEnv.Offchain) @@ -293,9 +288,15 @@ func registerWithJD(ctx context.Context, d *Don, supportedChains []blockchains.B for _, role := range node.Roles { switch role { case RoleWorker, RoleBootstrap: + chainConfigStart := time.Now() if err := createJDChainConfigs(ctx, node, supportedChains, jd); err != nil { return fmt.Errorf("failed to create supported chains in node %s: %w", node.Name, err) } + framework.L.Info(). + Str("don", d.Name). + Str("node", node.Name). + Float64("duration_s", roundSeconds(time.Since(chainConfigStart))). 
+ Msg("JD chain-config setup completed") case RoleGateway: // no chains configuration needed for gateway nodes default: @@ -303,9 +304,7 @@ func registerWithJD(ctx context.Context, d *Don, supportedChains []blockchains.B } } - mu.Lock() d.Nodes[idx] = node - mu.Unlock() return nil }) @@ -451,7 +450,19 @@ type JDChainConfigInput struct { ChainType string } -func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockchains.Blockchain, jd *jd.JobDistributor) error { +type nodeChainConfigLister interface { + ListNodeChainConfigs(context.Context, *nodev1.ListNodeChainConfigsRequest, ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) +} + +var ( + jdChainConfigPollInterval = 200 * time.Millisecond + jdChainConfigPollTimeout = 10 * time.Second + jdChainConfigRPCTimeout = 3 * time.Second +) + +func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockchains.Blockchain, jd nodeChainConfigLister) error { + ocr2BundleIDsByType := make(map[string]string) + for _, chain := range supportedChains { var account string chainIDStr := strconv.FormatUint(chain.ChainID(), 10) @@ -507,41 +518,35 @@ func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockc if chain.IsFamily(blockchain.FamilyTron) { chainType = strings.ToUpper(blockchain.FamilyEVM) } - ocr2BundleID, createErr := n.Clients.GQLClient.FetchOCR2KeyBundleID(ctx, chainType) - if createErr != nil { - return fmt.Errorf("failed to fetch OCR2 key bundle id for node %s: %w", n.Name, createErr) - } - if ocr2BundleID == "" { - return fmt.Errorf("no OCR2 key bundle id found for node %s", n.Name) - } + ocr2BundleID, ok := ocr2BundleIDsByType[chainType] + if !ok { + fetchErr := error(nil) + ocr2BundleID, fetchErr = n.Clients.GQLClient.FetchOCR2KeyBundleID(ctx, chainType) + if fetchErr != nil { + return fmt.Errorf("failed to fetch OCR2 key bundle id for node %s: %w", n.Name, fetchErr) + } + if ocr2BundleID == "" { + return fmt.Errorf("no OCR2 key bundle id 
found for node %s", n.Name) + } + ocr2BundleIDsByType[chainType] = ocr2BundleID + } if n.Keys.OCR2BundleIDs == nil { n.Keys.OCR2BundleIDs = make(map[string]string) } - n.Keys.OCR2BundleIDs[strings.ToLower(chainType)] = ocr2BundleID - // retry twice with 5 seconds interval to create JobDistributorChainConfig - retryErr := retry.Do(ctx, retry.WithMaxDuration(10*time.Second, retry.NewConstant(3*time.Second)), func(ctx context.Context) error { - // check the node chain config to see if this chain already exists - nodeChainConfigs, listErr := jd.ListNodeChainConfigs(context.Background(), &nodev1.ListNodeChainConfigsRequest{ - Filter: &nodev1.ListNodeChainConfigsRequest_Filter{ - NodeIds: []string{n.JobDistributorDetails.NodeID}, - }}) - if listErr != nil { - return retry.RetryableError(fmt.Errorf("failed to list node chain configs for node %s, retrying..: %w", n.Name, listErr)) + // Retry create+observe to preserve the original JD behavior. + retryErr := retry.Do(ctx, retry.WithMaxDuration(jdChainConfigPollTimeout, retry.NewConstant(3*time.Second)), func(ctx context.Context) error { + nodeChainConfigIDs, err := listNodeChainConfigIDs(ctx, jd, n.JobDistributorDetails.NodeID) + if err != nil { + return retry.RetryableError(fmt.Errorf("failed to list node chain configs for node %s: %w", n.Name, err)) } - if nodeChainConfigs != nil { - for _, chainConfig := range nodeChainConfigs.ChainConfigs { - if chainConfig.Chain.Id == chainIDStr { - return nil - } - } + if _, exists := nodeChainConfigIDs[chainIDStr]; exists { + return nil } - // we need to create JD chain config for each chain, because later on changestes ask the node for that chain data - // each node needs to have OCR2 enabled, because p2pIDs are used by some contracts to identify nodes (e.g. 
capability registry) - _, createErr = n.Clients.GQLClient.CreateJobDistributorChainConfig(ctx, client.JobDistributorChainConfigInput{ + _, err = n.Clients.GQLClient.CreateJobDistributorChainConfig(ctx, client.JobDistributorChainConfigInput{ JobDistributorID: n.JobDistributorDetails.JDID, ChainID: chainIDStr, ChainType: chainType, @@ -554,23 +559,48 @@ func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockc Ocr2KeyBundleID: ocr2BundleID, Ocr2Plugins: `{}`, }) - // TODO: add a check if the chain config failed because of a duplicate in that case, should we update or return success? - if createErr != nil { - return createErr + if err != nil { + return err } - // JD silently fails to update nodeChainConfig. Therefore, we fetch the node config and - // if it's not updated , throw an error return retry.RetryableError(errors.New("retrying CreateChainConfig in JD")) }) - if retryErr != nil { return fmt.Errorf("failed to create JD chain configuration for node %s: %w", n.Name, retryErr) } } + return nil } +func listNodeChainConfigIDs(ctx context.Context, jd nodeChainConfigLister, nodeID string) (map[string]struct{}, error) { + rpcCtx, cancel := context.WithTimeout(ctx, jdChainConfigRPCTimeout) + defer cancel() + + resp, err := jd.ListNodeChainConfigs(rpcCtx, &nodev1.ListNodeChainConfigsRequest{ + Filter: &nodev1.ListNodeChainConfigsRequest_Filter{ + NodeIds: []string{nodeID}, + }, + }) + if err != nil { + return nil, err + } + + chainIDs := make(map[string]struct{}) + if resp == nil { + return chainIDs, nil + } + + for _, chainConfig := range resp.ChainConfigs { + if chainConfig.GetChain() == nil { + continue + } + chainIDs[chainConfig.Chain.Id] = struct{}{} + } + + return chainIDs, nil +} + // AcceptJob accepts the job proposal for the given job proposal spec func (n *Node) AcceptJob(ctx context.Context, spec string) error { // fetch JD to get the job proposals @@ -773,21 +803,46 @@ func LinkToJobDistributor(ctx context.Context, input 
*LinkDonsToJDInput) error { return errors.New("input is nil") } - var nodeIDs []string + start := time.Now() + dons := input.Dons.List() + donMetadata := input.Topology.DonsMetadata.List() + nodeIDsByDON := make([][]string, len(dons)) - for idx, don := range input.Dons.List() { - supportedChains, schErr := findDonSupportedChains(input.Topology.DonsMetadata.List()[idx], input.Blockchains) - if schErr != nil { - return errors.Wrap(schErr, "failed to find supported chains for DON") - } + errGroup, groupCtx := errgroup.WithContext(ctx) + for idx, don := range dons { + errGroup.Go(func() error { + donStart := time.Now() + supportedChains, schErr := findDonSupportedChains(donMetadata[idx], input.Blockchains) + if schErr != nil { + return errors.Wrap(schErr, "failed to find supported chains for DON") + } - if err := registerWithJD(ctx, don, supportedChains, input.CldfEnvironment); err != nil { - return fmt.Errorf("failed to register DON with JD: %w", err) - } - nodeIDs = append(nodeIDs, don.JDNodeIDs()...) + if err := registerWithJD(groupCtx, don, supportedChains, input.CldfEnvironment); err != nil { + return fmt.Errorf("failed to register DON %s with JD: %w", don.Name, err) + } + + nodeIDsByDON[idx] = don.JDNodeIDs() + framework.L.Info(). + Str("don", don.Name). + Float64("duration_s", roundSeconds(time.Since(donStart))). + Msg("JD registration completed for DON") + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return err + } + + nodeIDs := make([]string, 0) + for _, donNodeIDs := range nodeIDsByDON { + nodeIDs = append(nodeIDs, donNodeIDs...) } input.CldfEnvironment.NodeIDs = nodeIDs + framework.L.Info(). + Float64("duration_s", roundSeconds(time.Since(start))). 
+ Msg("Post-start JD linking completed") return nil } @@ -824,6 +879,10 @@ func findDonSupportedChains(donMetadata *DonMetadata, bcs []blockchains.Blockcha return chains, nil } +func roundSeconds(d time.Duration) float64 { + return math.Round(d.Seconds()*10) / 10 +} + // Make DonMetadata also implement it, just in case? type KeystoneDON interface { KeystoneDONConfig() ks_contracts_op.ConfigureKeystoneDON diff --git a/system-tests/lib/cre/don/jobs/jobs.go b/system-tests/lib/cre/don/jobs/jobs.go index 8231de98711..32ced5d1d81 100644 --- a/system-tests/lib/cre/don/jobs/jobs.go +++ b/system-tests/lib/cre/don/jobs/jobs.go @@ -15,24 +15,73 @@ import ( "github.com/smartcontractkit/chainlink/system-tests/lib/cre" ) -func Approve(ctx context.Context, offChainClient cldf_offchain.Client, dons *cre.Dons, nodeToSpecs map[string][]string) error { +var loadNodeProposalIDs = func(ctx context.Context, node *cre.Node) (map[string]string, error) { + jd, err := node.Clients.GQLClient.GetJobDistributor(ctx, node.JobDistributorDetails.JDID) + if err != nil { + return nil, err + } + if jd.GetJobProposals() == nil { + return nil, fmt.Errorf("no job proposals found for node %s", node.Name) + } + + proposalIDsBySpec := make(map[string]string, len(jd.JobProposals)) + for _, proposal := range jd.JobProposals { + proposalIDsBySpec[proposal.LatestSpec.Definition] = proposal.Id + } + + return proposalIDsBySpec, nil +} + +var approveJobProposalSpec = func(ctx context.Context, node *cre.Node, proposalID string) error { + approvedSpec, err := node.Clients.GQLClient.ApproveJobProposalSpec(ctx, proposalID, false) + if err != nil { + return err + } + if approvedSpec == nil { + return fmt.Errorf("no job proposal spec found for job id %s", proposalID) + } + + return nil +} + +func Approve(ctx context.Context, _ cldf_offchain.Client, dons *cre.Dons, nodeToSpecs map[string][]string) error { + nodeByID := make(map[string]*cre.Node) + for _, don := range dons.List() { + for _, node := range don.Nodes { + 
nodeByID[node.JobDistributorDetails.NodeID] = node + } + } + + eg, egCtx := errgroup.WithContext(ctx) + eg.SetLimit(4) + for nodeID, jobSpecs := range nodeToSpecs { - for _, don := range dons.List() { - for _, node := range don.Nodes { - if node.JobDistributorDetails.NodeID != nodeID { - continue - } + node, ok := nodeByID[nodeID] + if !ok { + return fmt.Errorf("node with id %s not found", nodeID) + } - for _, jobSpec := range jobSpecs { - if err := accept(ctx, node, jobSpec); err != nil { - return err - } + eg.Go(func() error { + proposalIDsBySpec, err := loadNodeProposalIDs(egCtx, node) + if err != nil { + return err + } + + for _, jobSpec := range jobSpecs { + proposalID, ok := proposalIDsBySpec[jobSpec] + if !ok { + return fmt.Errorf("no job proposal found for job spec %s", jobSpec) + } + if err := accept(egCtx, node, proposalID, jobSpec); err != nil { + return err } } - } + + return nil + }) } - return nil + return eg.Wait() } func Create(ctx context.Context, offChainClient cldf_offchain.Client, dons *cre.Dons, jobSpecs cre.DonJobs) error { @@ -62,7 +111,7 @@ func Create(ctx context.Context, offChainClient cldf_offchain.Client, dons *cre. continue } - if err := accept(ctx, node, jobReq.Spec); err != nil { + if err := accept(ctx, node, "", jobReq.Spec); err != nil { return err } } @@ -83,8 +132,14 @@ func Create(ctx context.Context, offChainClient cldf_offchain.Client, dons *cre. 
return nil } -func accept(ctx context.Context, node *cre.Node, jobSpec string) error { - if err := node.AcceptJob(ctx, jobSpec); err != nil { +func accept(ctx context.Context, node *cre.Node, proposalID, jobSpec string) error { + var err error + if proposalID == "" { + err = node.AcceptJob(ctx, jobSpec) + } else { + err = approveJobProposalSpec(ctx, node, proposalID) + } + if err != nil { // Workflow specs get auto approved if strings.Contains(err.Error(), "cannot approve an approved spec") && strings.Contains(jobSpec, `type = "workflow"`) { return nil diff --git a/system-tests/lib/cre/don/jobs/jobs_test.go b/system-tests/lib/cre/don/jobs/jobs_test.go new file mode 100644 index 00000000000..00c17e880d2 --- /dev/null +++ b/system-tests/lib/cre/don/jobs/jobs_test.go @@ -0,0 +1,151 @@ +package jobs + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/system-tests/lib/cre" +) + +func TestApproveFetchesProposalsOncePerNode(t *testing.T) { + restoreLoad := loadNodeProposalIDs + restoreApprove := approveJobProposalSpec + t.Cleanup(func() { + loadNodeProposalIDs = restoreLoad + approveJobProposalSpec = restoreApprove + }) + + nodeA := &cre.Node{Name: "node-a", JobDistributorDetails: &cre.JobDistributorDetails{NodeID: "node-a"}} + nodeB := &cre.Node{Name: "node-b", JobDistributorDetails: &cre.JobDistributorDetails{NodeID: "node-b"}} + dons := &cre.Dons{Dons: []*cre.Don{{Nodes: []*cre.Node{nodeA, nodeB}}}} + + var mu sync.Mutex + fetches := map[string]int{} + loadNodeProposalIDs = func(_ context.Context, node *cre.Node) (map[string]string, error) { + mu.Lock() + fetches[node.JobDistributorDetails.NodeID]++ + mu.Unlock() + if node.JobDistributorDetails.NodeID == "node-a" { + return map[string]string{ + "spec-a-1": "proposal-a-1", + "spec-a-2": "proposal-a-2", + }, nil + } + return map[string]string{ + "spec-b-1": "proposal-b-1", + }, nil + } + approveJobProposalSpec = func(_ 
context.Context, _ *cre.Node, _ string) error { return nil } + + err := Approve(context.Background(), nil, dons, map[string][]string{ + "node-a": {"spec-a-1", "spec-a-2"}, + "node-b": {"spec-b-1"}, + }) + require.NoError(t, err) + require.Equal(t, map[string]int{"node-a": 1, "node-b": 1}, fetches) +} + +func TestApproveRunsAcrossNodesConcurrentlyAndWithinNodeSequentially(t *testing.T) { + restoreLoad := loadNodeProposalIDs + restoreApprove := approveJobProposalSpec + t.Cleanup(func() { + loadNodeProposalIDs = restoreLoad + approveJobProposalSpec = restoreApprove + }) + + nodeA := &cre.Node{Name: "node-a", JobDistributorDetails: &cre.JobDistributorDetails{NodeID: "node-a"}} + nodeB := &cre.Node{Name: "node-b", JobDistributorDetails: &cre.JobDistributorDetails{NodeID: "node-b"}} + dons := &cre.Dons{Dons: []*cre.Don{{Nodes: []*cre.Node{nodeA, nodeB}}}} + + loadNodeProposalIDs = func(_ context.Context, node *cre.Node) (map[string]string, error) { + return map[string]string{ + node.JobDistributorDetails.NodeID + "-1": node.JobDistributorDetails.NodeID + "-proposal-1", + node.JobDistributorDetails.NodeID + "-2": node.JobDistributorDetails.NodeID + "-proposal-2", + }, nil + } + + var mu sync.Mutex + activeGlobal := 0 + maxGlobal := 0 + activePerNode := map[string]int{} + maxPerNode := map[string]int{} + approveJobProposalSpec = func(_ context.Context, node *cre.Node, _ string) error { + nodeID := node.JobDistributorDetails.NodeID + + mu.Lock() + activeGlobal++ + activePerNode[nodeID]++ + if activeGlobal > maxGlobal { + maxGlobal = activeGlobal + } + if activePerNode[nodeID] > maxPerNode[nodeID] { + maxPerNode[nodeID] = activePerNode[nodeID] + } + mu.Unlock() + + time.Sleep(25 * time.Millisecond) + + mu.Lock() + activeGlobal-- + activePerNode[nodeID]-- + mu.Unlock() + return nil + } + + err := Approve(context.Background(), nil, dons, map[string][]string{ + "node-a": {"node-a-1", "node-a-2"}, + "node-b": {"node-b-1", "node-b-2"}, + }) + require.NoError(t, err) + 
require.GreaterOrEqual(t, maxGlobal, 2) + require.Equal(t, 1, maxPerNode["node-a"]) + require.Equal(t, 1, maxPerNode["node-b"]) +} + +func TestApproveMissingNodeID(t *testing.T) { + err := Approve(context.Background(), nil, &cre.Dons{}, map[string][]string{ + "missing-node": {"spec"}, + }) + require.ErrorContains(t, err, "node with id missing-node not found") +} + +func TestApproveMissingProposalMatch(t *testing.T) { + restoreLoad := loadNodeProposalIDs + restoreApprove := approveJobProposalSpec + t.Cleanup(func() { + loadNodeProposalIDs = restoreLoad + approveJobProposalSpec = restoreApprove + }) + + node := &cre.Node{Name: "node-a", JobDistributorDetails: &cre.JobDistributorDetails{NodeID: "node-a"}} + dons := &cre.Dons{Dons: []*cre.Don{{Nodes: []*cre.Node{node}}}} + + loadNodeProposalIDs = func(_ context.Context, _ *cre.Node) (map[string]string, error) { + return map[string]string{}, nil + } + approveJobProposalSpec = func(_ context.Context, _ *cre.Node, _ string) error { return nil } + + err := Approve(context.Background(), nil, dons, map[string][]string{ + "node-a": {"missing-spec"}, + }) + require.ErrorContains(t, err, "no job proposal found for job spec missing-spec") +} + +func TestAcceptTreatsApprovedWorkflowSpecAsSuccess(t *testing.T) { + restoreApprove := approveJobProposalSpec + t.Cleanup(func() { + approveJobProposalSpec = restoreApprove + }) + + approveJobProposalSpec = func(_ context.Context, _ *cre.Node, _ string) error { + return errors.New("cannot approve an approved spec") + } + + err := accept(context.Background(), &cre.Node{Name: "node-a"}, "proposal-id", `type = "workflow"`) + require.NoError(t, err) +} diff --git a/system-tests/lib/cre/don_test.go b/system-tests/lib/cre/don_test.go new file mode 100644 index 00000000000..dfde624ec68 --- /dev/null +++ b/system-tests/lib/cre/don_test.go @@ -0,0 +1,185 @@ +package cre + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + cldf_chain 
"github.com/smartcontractkit/chainlink-deployments-framework/chain" + nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node" + "github.com/smartcontractkit/chainlink-testing-framework/framework/components/blockchain" + webclient "github.com/smartcontractkit/chainlink/deployment/environment/web/sdk/client" + "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/secrets" + "github.com/smartcontractkit/chainlink/system-tests/lib/cre/environment/blockchains" + "github.com/smartcontractkit/chainlink/system-tests/lib/crypto" +) + +type fakeGQLClient struct { + webclient.Client + + fetchOCR2KeyBundleIDFn func(context.Context, string) (string, error) + createJobDistributorChainConfigFn func(context.Context, webclient.JobDistributorChainConfigInput) (string, error) + + fetchOCR2KeyBundleIDCalls []string + createJobDistributorChainConfigCalls []webclient.JobDistributorChainConfigInput +} + +func (f *fakeGQLClient) FetchOCR2KeyBundleID(ctx context.Context, chainType string) (string, error) { + f.fetchOCR2KeyBundleIDCalls = append(f.fetchOCR2KeyBundleIDCalls, chainType) + if f.fetchOCR2KeyBundleIDFn != nil { + return f.fetchOCR2KeyBundleIDFn(ctx, chainType) + } + return "bundle-id", nil +} + +func (f *fakeGQLClient) CreateJobDistributorChainConfig(ctx context.Context, in webclient.JobDistributorChainConfigInput) (string, error) { + f.createJobDistributorChainConfigCalls = append(f.createJobDistributorChainConfigCalls, in) + if f.createJobDistributorChainConfigFn != nil { + return f.createJobDistributorChainConfigFn(ctx, in) + } + return "config-id", nil +} + +type fakeJDChainConfigLister struct { + chainIDsByNodeID map[string]map[string]struct{} +} + +func (f *fakeJDChainConfigLister) ListNodeChainConfigs(_ context.Context, in *nodev1.ListNodeChainConfigsRequest, _ ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error) { + resp := &nodev1.ListNodeChainConfigsResponse{} + if in.GetFilter() == nil || len(in.GetFilter().GetNodeIds()) == 
0 { + return resp, nil + } + + for chainID := range f.chainIDsByNodeID[in.GetFilter().GetNodeIds()[0]] { + resp.ChainConfigs = append(resp.ChainConfigs, &nodev1.ChainConfig{ + Chain: &nodev1.Chain{Id: chainID}, + }) + } + + return resp, nil +} + +type fakeBlockchain struct { + chainID uint64 + chainFamily string +} + +func (f fakeBlockchain) ChainSelector() uint64 { return f.chainID } +func (f fakeBlockchain) ChainID() uint64 { return f.chainID } +func (f fakeBlockchain) ChainFamily() string { return f.chainFamily } +func (f fakeBlockchain) IsFamily(chainFamily string) bool { + return f.chainFamily == chainFamily +} +func (f fakeBlockchain) Fund(context.Context, string, uint64) error { return nil } +func (f fakeBlockchain) CtfOutput() *blockchain.Output { return nil } +func (f fakeBlockchain) ToCldfChain() (cldf_chain.BlockChain, error) { + return nil, nil +} + +var _ blockchains.Blockchain = fakeBlockchain{} + +func TestCreateJDChainConfigsSkipsExistingConfigs(t *testing.T) { + t.Parallel() + + node := mustNewTestNode(t) + gql := &fakeGQLClient{} + node.Clients.GQLClient = gql + jd := &fakeJDChainConfigLister{ + chainIDsByNodeID: map[string]map[string]struct{}{ + node.JobDistributorDetails.NodeID: {"111": {}}, + }, + } + + err := createJDChainConfigs(context.Background(), node, []blockchains.Blockchain{ + fakeBlockchain{chainID: 111, chainFamily: blockchain.FamilyEVM}, + }, jd) + + require.NoError(t, err) + require.Empty(t, gql.createJobDistributorChainConfigCalls) +} + +func TestCreateJDChainConfigsCreatesMissingConfigsAndReusesBundleIDs(t *testing.T) { + t.Parallel() + + node := mustNewTestNode(t) + jd := &fakeJDChainConfigLister{ + chainIDsByNodeID: map[string]map[string]struct{}{ + node.JobDistributorDetails.NodeID: {}, + }, + } + gql := &fakeGQLClient{ + createJobDistributorChainConfigFn: func(_ context.Context, in webclient.JobDistributorChainConfigInput) (string, error) { + jd.chainIDsByNodeID[node.JobDistributorDetails.NodeID][in.ChainID] = struct{}{} + 
return "created-" + in.ChainID, nil + }, + } + node.Clients.GQLClient = gql + + err := createJDChainConfigs(context.Background(), node, []blockchains.Blockchain{ + fakeBlockchain{chainID: 111, chainFamily: blockchain.FamilyEVM}, + fakeBlockchain{chainID: 222, chainFamily: blockchain.FamilyEVM}, + }, jd) + + require.NoError(t, err) + require.Len(t, gql.createJobDistributorChainConfigCalls, 2) + require.Equal(t, []string{"EVM"}, gql.fetchOCR2KeyBundleIDCalls) + require.Equal(t, "bundle-id", node.Keys.OCR2BundleIDs["evm"]) +} + +func TestCreateJDChainConfigsFailsVerificationOnTimeout(t *testing.T) { + node := mustNewTestNode(t) + jd := &fakeJDChainConfigLister{ + chainIDsByNodeID: map[string]map[string]struct{}{ + node.JobDistributorDetails.NodeID: {}, + }, + } + node.Clients.GQLClient = &fakeGQLClient{} + + originalInterval := jdChainConfigPollInterval + originalTimeout := jdChainConfigPollTimeout + jdChainConfigPollInterval = time.Millisecond + jdChainConfigPollTimeout = 5 * time.Millisecond + defer func() { + jdChainConfigPollInterval = originalInterval + jdChainConfigPollTimeout = originalTimeout + }() + + err := createJDChainConfigs(context.Background(), node, []blockchains.Blockchain{ + fakeBlockchain{chainID: 111, chainFamily: blockchain.FamilyEVM}, + }, jd) + + require.Error(t, err) + require.Contains(t, err.Error(), "failed to create JD chain configuration") +} + +func mustNewTestNode(t *testing.T) *Node { + t.Helper() + + p2pKey, err := crypto.NewP2PKey("password") + require.NoError(t, err) + evmKey, err := crypto.NewEVMKey("password", 111) + require.NoError(t, err) + + return &Node{ + Name: "node-1", + Keys: &secrets.NodeKeys{ + P2PKey: p2pKey, + EVM: map[uint64]*crypto.EVMKey{ + 111: evmKey, + 222: &crypto.EVMKey{PublicAddress: evmKey.PublicAddress}, + }, + }, + Addresses: Addresses{ + AdminAddress: "0xadmin", + }, + JobDistributorDetails: &JobDistributorDetails{ + NodeID: "node-id-1", + JDID: "jd-id-1", + }, + Clients: NodeClients{}, + Roles: 
Roles{RoleWorker}, + } +} diff --git a/system-tests/lib/cre/environment/environment.go b/system-tests/lib/cre/environment/environment.go index f3a1ccaa2a8..add8165f88f 100644 --- a/system-tests/lib/cre/environment/environment.go +++ b/system-tests/lib/cre/environment/environment.go @@ -222,6 +222,7 @@ func SetupTestEnvironment( } fmt.Print(libformat.PurpleText("%s", input.StageGen.WrapAndNext("Applied Features in %.2f seconds", input.StageGen.Elapsed().Seconds()))) + fmt.Print(libformat.PurpleText("%s", input.StageGen.Wrap("Starting Job Distributor and DONs"))) queue := worker.New(ctx, 10) defer queue.StopAndWait() // Ensure cleanup on any exit path @@ -343,6 +344,7 @@ func SetupTestEnvironment( CldEnv: creEnvironment.CldfEnvironment, Blockchains: deployedBlockchains.Outputs, Topology: topology, + Provider: input.Provider, CapabilitiesRegistryAddress: ptr.Ptr(crecontracts.MustGetAddressFromMemoryDataStore( deployKeystoneContractsOutput.MemoryDataStore, deployedBlockchains.RegistryChain().ChainSelector(), @@ -364,7 +366,7 @@ func SetupTestEnvironment( capRegInput.CapabilityRegistryConfigFns = append(capRegInput.CapabilityRegistryConfigFns, input.CapabilitiesContractFactoryFunctions...) 
maps.Copy(capRegInput.DONCapabilityWithConfigs, donsCapabilities) - capReg, capRegErr := crecontracts.ConfigureCapabilityRegistry(capRegInput) + capReg, capRegErr := crecontracts.ConfigureCapabilityRegistry(ctx, capRegInput) if capRegErr != nil { return nil, pkgerrors.Wrap(capRegErr, "failed to configure Capability Registry contracts") } diff --git a/system-tests/lib/cre/features/consensus/v2/consensus.go b/system-tests/lib/cre/features/consensus/v2/consensus.go index ee1ca7e5e6f..511849c57f0 100644 --- a/system-tests/lib/cre/features/consensus/v2/consensus.go +++ b/system-tests/lib/cre/features/consensus/v2/consensus.go @@ -84,6 +84,7 @@ func (c *Consensus) PostEnvStartup( ) error { jobsErr := createJobs( ctx, + testLogger, don, dons, creEnv, @@ -97,6 +98,7 @@ func (c *Consensus) PostEnvStartup( func createJobs( ctx context.Context, + testLogger zerolog.Logger, don *cre.Don, dons *cre.Dons, creEnv *cre.Environment, @@ -137,7 +139,6 @@ func createJobs( } specs := make(map[string][]string) - // Create node job if nodeSpecs, err := proposeNodeJob(creEnv, don, command, []string{formatBootstrapPeer(bootstrap)}, configStr); err != nil { return err diff --git a/system-tests/lib/cre/features/don_time/don_time.go b/system-tests/lib/cre/features/don_time/don_time.go index 711fee093f5..8efb751ce48 100644 --- a/system-tests/lib/cre/features/don_time/don_time.go +++ b/system-tests/lib/cre/features/don_time/don_time.go @@ -69,6 +69,7 @@ func (o *DONTime) PostEnvStartup( ) error { jobErr := createJobs( ctx, + testLogger, creEnv, don, dons, @@ -82,6 +83,7 @@ func (o *DONTime) PostEnvStartup( func createJobs( ctx context.Context, + testLogger zerolog.Logger, creEnv *cre.Environment, don *cre.Don, dons *cre.Dons, diff --git a/system-tests/lib/cre/features/evm/v2/evm.go b/system-tests/lib/cre/features/evm/v2/evm.go index bfd2602feff..0c680addf69 100644 --- a/system-tests/lib/cre/features/evm/v2/evm.go +++ b/system-tests/lib/cre/features/evm/v2/evm.go @@ -4,6 +4,7 @@ import ( "bytes" 
"context" "fmt" + "maps" "strconv" "strings" "text/template" @@ -13,6 +14,7 @@ import ( "github.com/Masterminds/semver/v3" "github.com/pkg/errors" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/durationpb" chainselectors "github.com/smartcontractkit/chain-selectors" @@ -38,6 +40,7 @@ import ( "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/jobs/standardcapability" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/environment/blockchains" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/features/evm" + "github.com/smartcontractkit/chainlink/system-tests/lib/cre/features/jobhelpers" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/flags" ) @@ -199,8 +202,6 @@ func createJobs( dons *cre.Dons, creEnv *cre.Environment, ) error { - specs := make(map[string][]string) - var nodeSet cre.NodeSetWithCapabilityConfigs for _, ns := range dons.AsNodeSetWithChainCapabilities() { if ns.GetName() == don.Name { @@ -232,6 +233,16 @@ func createJobs( return fmt.Errorf("failed to get chain ID from registry chain selector %d: %w", creEnv.RegistryChainSelector, rcErr) } + type proposalWork struct { + chainID uint64 + chainIDStr string + chainSelector uint64 + capabilityConfig cre.CapabilityConfig + command string + workerNode *cre.Node + } + + workItems := make([]proposalWork, 0, len(enabledChainIDs)*len(workerNodes)) for _, chainID := range enabledChainIDs { chainSelector, selErr := chainselectors.SelectorFromChainId(chainID) if selErr != nil { @@ -250,6 +261,27 @@ func createJobs( } for _, workerNode := range workerNodes { + workItems = append(workItems, proposalWork{ + chainID: chainID, + chainIDStr: chainIDStr, + chainSelector: chainSelector, + capabilityConfig: capabilityConfig, + command: command, + workerNode: workerNode, + }) + } + } + + results := make([]map[string][]string, len(workItems)) + group, groupCtx := errgroup.WithContext(ctx) + group.SetLimit(max(len(workItems), 4)) + + for 
i, workItem := range workItems { + group.Go(func() error { + chainID := workItem.chainID + chainSelector := workItem.chainSelector + workerNode := workItem.workerNode + evmKey, ok := workerNode.Keys.EVM[chainID] if !ok { return fmt.Errorf("failed to get EVM key (chainID %d, node index %d)", chainID, workerNode.Index) @@ -268,13 +300,13 @@ func createJobs( semver.MustParse("1.0.0"), "", ) - creForwarderAddress, err := creEnv.CldfEnvironment.DataStore.Addresses().Get(creForwarderKey) - if err != nil { - return errors.Wrap(err, "failed to get CRE Forwarder address") + creForwarderAddress, cErr := creEnv.CldfEnvironment.DataStore.Addresses().Get(creForwarderKey) + if cErr != nil { + return errors.Wrap(cErr, "failed to get CRE Forwarder address") } runtimeFallbacks := buildRuntimeValues(chainID, "evm", creForwarderAddress.Address, nodeAddress) - templateData := capabilityConfig.Values + templateData := maps.Clone(workItem.capabilityConfig.Values) var aErr error templateData, aErr = credon.ApplyRuntimeValues(templateData, runtimeFallbacks) @@ -282,9 +314,9 @@ func createJobs( return errors.Wrap(aErr, "failed to apply runtime values") } - tmpl, err := template.New("evmConfig").Parse(configTemplate) - if err != nil { - return errors.Wrapf(err, "failed to parse %s config template", flag) + tmpl, tErr := template.New("evmConfig").Parse(configTemplate) + if tErr != nil { + return errors.Wrapf(tErr, "failed to parse %s config template", flag) } var configBuffer bytes.Buffer @@ -332,11 +364,11 @@ func createJobs( }, Template: job_types.EVM, Inputs: job_types.JobSpecInput{ - "command": command, + "command": workItem.command, "config": configStr, "oracleFactory": cre_jobs_pkg.OracleFactory{ Enabled: true, - ChainID: chainIDStr, + ChainID: workItem.chainIDStr, BootstrapPeers: bootstrapPeers, OCRContractAddress: registryContractAddrRef.Address, OCRKeyBundleID: evmKeyBundle, @@ -361,6 +393,7 @@ func createJobs( return fmt.Errorf("failed to propose EVM v2 worker job spec: %w", 
workerErr) } + specs := make(map[string][]string) for _, r := range workerReport.Reports { out, ok := r.Output.(cre_jobs_ops.ProposeStandardCapabilityJobOutput) if !ok { @@ -371,7 +404,25 @@ func createJobs( return fmt.Errorf("failed to merge worker job specs: %w", mErr) } } - } + + select { + case <-groupCtx.Done(): + return groupCtx.Err() + default: + } + + results[i] = specs + return nil + }) + } + + if wErr := group.Wait(); wErr != nil { + return wErr + } + + specs, mErr := jobhelpers.MergeSpecsByIndex(results) + if mErr != nil { + return mErr } approveErr := jobs.Approve(ctx, creEnv.CldfEnvironment.Offchain, dons, specs) diff --git a/system-tests/lib/cre/features/jobhelpers/helpers.go b/system-tests/lib/cre/features/jobhelpers/helpers.go new file mode 100644 index 00000000000..d87eaf3ec9e --- /dev/null +++ b/system-tests/lib/cre/features/jobhelpers/helpers.go @@ -0,0 +1,35 @@ +package jobhelpers + +import ( + "fmt" + "runtime" + + "dario.cat/mergo" +) + +func MergeSpecsByIndex(results []map[string][]string) (map[string][]string, error) { + merged := make(map[string][]string) + for i, result := range results { + if result == nil { + continue + } + if err := mergo.Merge(&merged, result, mergo.WithAppendSlice); err != nil { + return nil, fmt.Errorf("failed to merge proposal result %d: %w", i, err) + } + } + + return merged, nil +} + +func Parallelism(workItems int) int { + if workItems <= 1 { + return 1 + } + + limit := runtime.GOMAXPROCS(0) + if workItems < limit { + return workItems + } + + return limit +} diff --git a/system-tests/lib/cre/features/jobhelpers/helpers_test.go b/system-tests/lib/cre/features/jobhelpers/helpers_test.go new file mode 100644 index 00000000000..337592e57b5 --- /dev/null +++ b/system-tests/lib/cre/features/jobhelpers/helpers_test.go @@ -0,0 +1,20 @@ +package jobhelpers + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMergeSpecsByIndexPreservesInputOrder(t *testing.T) { + t.Parallel() + + merged, err := 
MergeSpecsByIndex([]map[string][]string{ + {"node-a": {"first-a"}, "node-b": {"first-b"}}, + {"node-a": {"second-a"}}, + {"node-b": {"second-b"}}, + }) + require.NoError(t, err) + require.Equal(t, []string{"first-a", "second-a"}, merged["node-a"]) + require.Equal(t, []string{"first-b", "second-b"}, merged["node-b"]) +} diff --git a/system-tests/lib/cre/features/log_event_trigger/log_event_trigger.go b/system-tests/lib/cre/features/log_event_trigger/log_event_trigger.go index eed2d258aab..658f4d7df76 100644 --- a/system-tests/lib/cre/features/log_event_trigger/log_event_trigger.go +++ b/system-tests/lib/cre/features/log_event_trigger/log_event_trigger.go @@ -4,11 +4,13 @@ import ( "bytes" "context" "fmt" + "maps" "text/template" "dario.cat/mergo" "github.com/pkg/errors" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" kcr "github.com/smartcontractkit/chainlink-evm/gethwrappers/keystone/generated/capabilities_registry_1_1_0" @@ -22,6 +24,7 @@ import ( credon "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/jobs" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/jobs/standardcapability" + "github.com/smartcontractkit/chainlink/system-tests/lib/cre/features/jobhelpers" ) const flag = cre.LogEventTriggerCapability @@ -81,8 +84,6 @@ func (o *LogEventTrigger) PostEnvStartup( dons *cre.Dons, creEnv *cre.Environment, ) error { - specs := make(map[string][]string) - var nodeSet cre.NodeSetWithCapabilityConfigs for _, ns := range dons.AsNodeSetWithChainCapabilities() { if ns.GetName() == don.Name { @@ -99,71 +100,96 @@ func (o *LogEventTrigger) PostEnvStartup( return fmt.Errorf("could not find enabled chainIDs for '%s' in don '%s': %w", flag, don.Name, err) } - for _, chainID := range enabledChainIDs { - capabilityConfig, resolveErr := cre.ResolveCapabilityConfig(nodeSet, flag, 
cre.ChainCapabilityScope(chainID)) - if resolveErr != nil { - return fmt.Errorf("could not resolve capability config for '%s' on chain %d: %w", flag, chainID, resolveErr) - } + results := make([]map[string][]string, len(enabledChainIDs)) + group, groupCtx := errgroup.WithContext(ctx) + group.SetLimit(jobhelpers.Parallelism(len(enabledChainIDs))) - command, cErr := standardcapability.GetCommand(capabilityConfig.BinaryName) - if cErr != nil { - return errors.Wrap(cErr, "failed to get command for Read Contract capability") - } + for i, chainID := range enabledChainIDs { + group.Go(func() error { + capabilityConfig, resolveErr := cre.ResolveCapabilityConfig(nodeSet, flag, cre.ChainCapabilityScope(chainID)) + if resolveErr != nil { + return fmt.Errorf("could not resolve capability config for '%s' on chain %d: %w", flag, chainID, resolveErr) + } - templateData := capabilityConfig.Values - templateData["ChainID"] = chainID + command, cErr := standardcapability.GetCommand(capabilityConfig.BinaryName) + if cErr != nil { + return errors.Wrap(cErr, "failed to get command for Read Contract capability") + } - tmpl, tmplErr := template.New(flag + "-config").Parse(configTemplate) - if tmplErr != nil { - return errors.Wrapf(tmplErr, "failed to parse %s config template", flag) - } + templateData := maps.Clone(capabilityConfig.Values) + templateData["ChainID"] = chainID - var configBuffer bytes.Buffer - if err := tmpl.Execute(&configBuffer, templateData); err != nil { - return errors.Wrapf(err, "failed to execute %s config template", flag) - } - configStr := configBuffer.String() + tmpl, tmplErr := template.New(flag + "-config").Parse(configTemplate) + if tmplErr != nil { + return errors.Wrapf(tmplErr, "failed to parse %s config template", flag) + } - if err := credon.ValidateTemplateSubstitution(configStr, flag); err != nil { - return fmt.Errorf("%s template validation failed: %w\nRendered template: %s", flag, err, configStr) - } + var configBuffer bytes.Buffer + if executeErr := 
tmpl.Execute(&configBuffer, templateData); executeErr != nil { + return errors.Wrapf(executeErr, "failed to execute %s config template", flag) + } + configStr := configBuffer.String() - workerInput := cre_jobs.ProposeJobSpecInput{ - Domain: offchain.ProductLabel, - Environment: cre.EnvironmentName, - DONName: don.Name, - JobName: fmt.Sprintf("log-event-trigger-worker-%d", chainID), - ExtraLabels: map[string]string{cre.CapabilityLabelKey: flag}, - DONFilters: []offchain.TargetDONFilter{ - {Key: offchain.FilterKeyDONName, Value: don.Name}, - }, - Template: job_types.LogEventTrigger, - Inputs: job_types.JobSpecInput{ - "command": command, - "config": configStr, - }, - } + if validateErr := credon.ValidateTemplateSubstitution(configStr, flag); validateErr != nil { + return fmt.Errorf("%s template validation failed: %w\nRendered template: %s", flag, validateErr, configStr) + } - workerVerErr := cre_jobs.ProposeJobSpec{}.VerifyPreconditions(*creEnv.CldfEnvironment, workerInput) - if workerVerErr != nil { - return fmt.Errorf("precondition verification failed for Log Event Trigger worker job: %w", workerVerErr) - } + workerInput := cre_jobs.ProposeJobSpecInput{ + Domain: offchain.ProductLabel, + Environment: cre.EnvironmentName, + DONName: don.Name, + JobName: fmt.Sprintf("log-event-trigger-worker-%d", chainID), + ExtraLabels: map[string]string{cre.CapabilityLabelKey: flag}, + DONFilters: []offchain.TargetDONFilter{ + {Key: offchain.FilterKeyDONName, Value: don.Name}, + }, + Template: job_types.LogEventTrigger, + Inputs: job_types.JobSpecInput{ + "command": command, + "config": configStr, + }, + } - workerReport, workerErr := cre_jobs.ProposeJobSpec{}.Apply(*creEnv.CldfEnvironment, workerInput) - if workerErr != nil { - return fmt.Errorf("failed to propose Log Event Trigger worker job spec: %w", workerErr) - } + workerVerErr := cre_jobs.ProposeJobSpec{}.VerifyPreconditions(*creEnv.CldfEnvironment, workerInput) + if workerVerErr != nil { + return fmt.Errorf("precondition 
verification failed for Log Event Trigger worker job: %w", workerVerErr) + } - for _, r := range workerReport.Reports { - out, ok := r.Output.(cre_jobs_ops.ProposeStandardCapabilityJobOutput) - if !ok { - return fmt.Errorf("unable to cast to ProposeStandardCapabilityJobOutput, actual type: %T", r.Output) + workerReport, workerErr := cre_jobs.ProposeJobSpec{}.Apply(*creEnv.CldfEnvironment, workerInput) + if workerErr != nil { + return fmt.Errorf("failed to propose Log Event Trigger worker job spec: %w", workerErr) } - mErr := mergo.Merge(&specs, out.Specs, mergo.WithAppendSlice) - if mErr != nil { - return fmt.Errorf("failed to merge worker job specs: %w", mErr) + + specs := make(map[string][]string) + for _, r := range workerReport.Reports { + out, ok := r.Output.(cre_jobs_ops.ProposeStandardCapabilityJobOutput) + if !ok { + return fmt.Errorf("unable to cast to ProposeStandardCapabilityJobOutput, actual type: %T", r.Output) + } + mErr := mergo.Merge(&specs, out.Specs, mergo.WithAppendSlice) + if mErr != nil { + return fmt.Errorf("failed to merge worker job specs: %w", mErr) + } } - } + + select { + case <-groupCtx.Done(): + return groupCtx.Err() + default: + } + + results[i] = specs + return nil + }) + } + + if wErr := group.Wait(); wErr != nil { + return wErr + } + + specs, mErr := jobhelpers.MergeSpecsByIndex(results) + if mErr != nil { + return mErr } approveErr := jobs.Approve(ctx, creEnv.CldfEnvironment.Offchain, dons, specs) diff --git a/system-tests/lib/cre/features/read_contract/read_contract.go b/system-tests/lib/cre/features/read_contract/read_contract.go index 05aed9fa941..f5b198aae53 100644 --- a/system-tests/lib/cre/features/read_contract/read_contract.go +++ b/system-tests/lib/cre/features/read_contract/read_contract.go @@ -4,11 +4,13 @@ import ( "bytes" "context" "fmt" + "maps" "text/template" "dario.cat/mergo" "github.com/pkg/errors" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" capabilitiespb 
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" kcr "github.com/smartcontractkit/chainlink-evm/gethwrappers/keystone/generated/capabilities_registry_1_1_0" @@ -22,6 +24,7 @@ import ( credon "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/jobs" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/jobs/standardcapability" + "github.com/smartcontractkit/chainlink/system-tests/lib/cre/features/jobhelpers" ) const flag = cre.ReadContractCapability @@ -72,8 +75,6 @@ func (o *ReadContract) PostEnvStartup( dons *cre.Dons, creEnv *cre.Environment, ) error { - specs := make(map[string][]string) - var nodeSet cre.NodeSetWithCapabilityConfigs for _, ns := range dons.AsNodeSetWithChainCapabilities() { if ns.GetName() == don.Name { @@ -90,71 +91,96 @@ func (o *ReadContract) PostEnvStartup( return fmt.Errorf("could not find enabled chainIDs for '%s' in don '%s': %w", flag, don.Name, err) } - for _, chainID := range enabledChainIDs { - capabilityConfig, resolveErr := cre.ResolveCapabilityConfig(nodeSet, flag, cre.ChainCapabilityScope(chainID)) - if resolveErr != nil { - return fmt.Errorf("could not resolve capability config for '%s' on chain %d: %w", flag, chainID, resolveErr) - } + results := make([]map[string][]string, len(enabledChainIDs)) + group, groupCtx := errgroup.WithContext(ctx) + group.SetLimit(jobhelpers.Parallelism(len(enabledChainIDs))) - command, cErr := standardcapability.GetCommand(capabilityConfig.BinaryName) - if cErr != nil { - return errors.Wrap(cErr, "failed to get command for Read Contract capability") - } + for i, chainID := range enabledChainIDs { + group.Go(func() error { + capabilityConfig, resolveErr := cre.ResolveCapabilityConfig(nodeSet, flag, cre.ChainCapabilityScope(chainID)) + if resolveErr != nil { + return fmt.Errorf("could not resolve capability config for '%s' on chain %d: %w", flag, chainID, resolveErr) + } - templateData := 
capabilityConfig.Values - templateData["ChainID"] = chainID + command, cErr := standardcapability.GetCommand(capabilityConfig.BinaryName) + if cErr != nil { + return errors.Wrap(cErr, "failed to get command for Read Contract capability") + } - tmpl, tmplErr := template.New(flag + "-config").Parse(configTemplate) - if tmplErr != nil { - return errors.Wrapf(tmplErr, "failed to parse %s config template", flag) - } + templateData := maps.Clone(capabilityConfig.Values) + templateData["ChainID"] = chainID - var configBuffer bytes.Buffer - if err := tmpl.Execute(&configBuffer, templateData); err != nil { - return errors.Wrapf(err, "failed to execute %s config template", flag) - } - configStr := configBuffer.String() + tmpl, tmplErr := template.New(flag + "-config").Parse(configTemplate) + if tmplErr != nil { + return errors.Wrapf(tmplErr, "failed to parse %s config template", flag) + } - if err := credon.ValidateTemplateSubstitution(configStr, flag); err != nil { - return fmt.Errorf("%s template validation failed: %w\nRendered template: %s", flag, err, configStr) - } + var configBuffer bytes.Buffer + if executeErr := tmpl.Execute(&configBuffer, templateData); executeErr != nil { + return errors.Wrapf(executeErr, "failed to execute %s config template", flag) + } + configStr := configBuffer.String() - workerInput := cre_jobs.ProposeJobSpecInput{ - Domain: offchain.ProductLabel, - Environment: cre.EnvironmentName, - DONName: don.Name, - JobName: fmt.Sprintf("read-contract-worker-%d", chainID), - ExtraLabels: map[string]string{cre.CapabilityLabelKey: flag}, - DONFilters: []offchain.TargetDONFilter{ - {Key: offchain.FilterKeyDONName, Value: don.Name}, - }, - Template: job_types.ReadContract, - Inputs: job_types.JobSpecInput{ - "command": command, - "config": configStr, - }, - } + if validateErr := credon.ValidateTemplateSubstitution(configStr, flag); validateErr != nil { + return fmt.Errorf("%s template validation failed: %w\nRendered template: %s", flag, validateErr, 
configStr) + } - workerVerErr := cre_jobs.ProposeJobSpec{}.VerifyPreconditions(*creEnv.CldfEnvironment, workerInput) - if workerVerErr != nil { - return fmt.Errorf("precondition verification failed for Read Contract worker job: %w", workerVerErr) - } + workerInput := cre_jobs.ProposeJobSpecInput{ + Domain: offchain.ProductLabel, + Environment: cre.EnvironmentName, + DONName: don.Name, + JobName: fmt.Sprintf("read-contract-worker-%d", chainID), + ExtraLabels: map[string]string{cre.CapabilityLabelKey: flag}, + DONFilters: []offchain.TargetDONFilter{ + {Key: offchain.FilterKeyDONName, Value: don.Name}, + }, + Template: job_types.ReadContract, + Inputs: job_types.JobSpecInput{ + "command": command, + "config": configStr, + }, + } - workerReport, workerErr := cre_jobs.ProposeJobSpec{}.Apply(*creEnv.CldfEnvironment, workerInput) - if workerErr != nil { - return fmt.Errorf("failed to propose Read Contract worker job spec: %w", workerErr) - } + workerVerErr := cre_jobs.ProposeJobSpec{}.VerifyPreconditions(*creEnv.CldfEnvironment, workerInput) + if workerVerErr != nil { + return fmt.Errorf("precondition verification failed for Read Contract worker job: %w", workerVerErr) + } - for _, r := range workerReport.Reports { - out, ok := r.Output.(cre_jobs_ops.ProposeStandardCapabilityJobOutput) - if !ok { - return fmt.Errorf("unable to cast to ProposeStandardCapabilityJobOutput, actual type: %T", r.Output) + workerReport, workerErr := cre_jobs.ProposeJobSpec{}.Apply(*creEnv.CldfEnvironment, workerInput) + if workerErr != nil { + return fmt.Errorf("failed to propose Read Contract worker job spec: %w", workerErr) } - mErr := mergo.Merge(&specs, out.Specs, mergo.WithAppendSlice) - if mErr != nil { - return fmt.Errorf("failed to merge worker job specs: %w", mErr) + + specs := make(map[string][]string) + for _, r := range workerReport.Reports { + out, ok := r.Output.(cre_jobs_ops.ProposeStandardCapabilityJobOutput) + if !ok { + return fmt.Errorf("unable to cast to 
ProposeStandardCapabilityJobOutput, actual type: %T", r.Output) + } + mErr := mergo.Merge(&specs, out.Specs, mergo.WithAppendSlice) + if mErr != nil { + return fmt.Errorf("failed to merge worker job specs: %w", mErr) + } } - } + + select { + case <-groupCtx.Done(): + return groupCtx.Err() + default: + } + + results[i] = specs + return nil + }) + } + + if wErr := group.Wait(); wErr != nil { + return wErr + } + + specs, mErr := jobhelpers.MergeSpecsByIndex(results) + if mErr != nil { + return mErr } approveErr := jobs.Approve(ctx, creEnv.CldfEnvironment.Offchain, dons, specs) diff --git a/system-tests/lib/cre/features/sets/sets.go b/system-tests/lib/cre/features/sets/sets.go index f29a5ad9be8..f737c2491fb 100644 --- a/system-tests/lib/cre/features/sets/sets.go +++ b/system-tests/lib/cre/features/sets/sets.go @@ -22,7 +22,6 @@ import ( func New() cre.Features { return cre.NewFeatures( - &consensus_v1_feature.Consensus{}, &consensus_v2_feature.Consensus{}, &cron_feature.Cron{}, &custom_compute_feature.CustomCompute{}, @@ -36,7 +35,10 @@ func New() cre.Features { &read_contract_feature.ReadContract{}, &web_api_target_feature.WebAPITarget{}, &web_api_trigger_feature.WebAPITrigger{}, - &vault_feature.Vault{}, &solana_v2_feature.Solana{}, + // Keep OCR3 late in PostEnvStartup so ConfigWatcher health waits do not block + // the rest of the job-oriented features from making progress. 
+ &consensus_v1_feature.Consensus{}, + &vault_feature.Vault{}, ) } diff --git a/system-tests/lib/cre/features/solana/v2/solana.go b/system-tests/lib/cre/features/solana/v2/solana.go index 91b2519ec70..83eddee736a 100644 --- a/system-tests/lib/cre/features/solana/v2/solana.go +++ b/system-tests/lib/cre/features/solana/v2/solana.go @@ -4,9 +4,10 @@ import ( "bytes" "context" "fmt" + "maps" + "runtime" "slices" "strconv" - "sync" "text/template" "time" @@ -43,6 +44,7 @@ import ( "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/jobs" "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/jobs/standardcapability" solchain "github.com/smartcontractkit/chainlink/system-tests/lib/cre/environment/blockchains/solana" + "github.com/smartcontractkit/chainlink/system-tests/lib/cre/features/jobhelpers" corechainlink "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" ) @@ -119,7 +121,7 @@ func (s *Solana) PostEnvStartup( ) error { // 1. Deploy & Configure OCR3 Contracts (once solana consensus reads are supported) // 2. 
Create & Approve Solana Standard capability jobs for the DON - jobErr := createJobs(ctx, don, dons, creEnv) + jobErr := createJobs(ctx, testLogger, don, dons, creEnv) if jobErr != nil { return errors.Wrapf(jobErr, "failed to create job for solana chain standard capability") } @@ -139,11 +141,11 @@ func (s *Solana) PostEnvStartup( // post env func createJobs( ctx context.Context, + testLogger zerolog.Logger, don *cre.Don, dons *cre.Dons, creEnv *cre.Environment, ) error { - specs := make(map[string][]string) solChain := extractSolanaFromEnv(creEnv) var nodeSet cre.NodeSetWithCapabilityConfigs @@ -206,9 +208,10 @@ func createJobs( return errors.Wrapf(err, "failed to parse %s config template", flag) } - var specsMu sync.Mutex + results := make([]map[string][]string, len(workerNodes)) group, groupCtx := errgroup.WithContext(ctx) - for _, workerNode := range workerNodes { + group.SetLimit(min(len(workerNodes), runtime.GOMAXPROCS(0))) + for i, workerNode := range workerNodes { group.Go(func() error { key, ok := workerNode.Keys.Solana[chainID] if !ok { @@ -225,19 +228,19 @@ func createJobs( "ChainID": solChainID.String(), } - templateData, aErr := credon.ApplyRuntimeValues(config.Values, runtimeFallbacks) + templateData, aErr := credon.ApplyRuntimeValues(maps.Clone(config.Values), runtimeFallbacks) if aErr != nil { return errors.Wrap(aErr, "failed to apply runtime values") } var configBuffer bytes.Buffer - if err := tmpl.Execute(&configBuffer, templateData); err != nil { - return errors.Wrapf(err, "failed to execute %s config template", flag) + if executeErr := tmpl.Execute(&configBuffer, templateData); executeErr != nil { + return errors.Wrapf(executeErr, "failed to execute %s config template", flag) } configStr := configBuffer.String() - if err := credon.ValidateTemplateSubstitution(configStr, flag); err != nil { - return errors.Wrapf(err, "%s template validation failed", flag) + if validateErr := credon.ValidateTemplateSubstitution(configStr, flag); validateErr != nil 
{ + return errors.Wrapf(validateErr, "%s template validation failed", flag) } workerInput := cre_jobs.ProposeJobSpecInput{ @@ -267,8 +270,7 @@ func createJobs( return fmt.Errorf("failed to propose Solana v2 worker job spec: %w", workerErr) } - specsMu.Lock() - defer specsMu.Unlock() + specs := make(map[string][]string) for _, r := range workerReport.Reports { out, ok := r.Output.(cre_jobs_ops.ProposeStandardCapabilityJobOutput) if !ok { @@ -286,12 +288,18 @@ func createJobs( default: } + results[i] = specs return nil }) } - if err := group.Wait(); err != nil { - return err + if wErr := group.Wait(); wErr != nil { + return wErr + } + + specs, mErr := jobhelpers.MergeSpecsByIndex(results) + if mErr != nil { + return mErr } approveErr := jobs.Approve(ctx, creEnv.CldfEnvironment.Offchain, dons, specs) diff --git a/system-tests/lib/cre/types.go b/system-tests/lib/cre/types.go index 84a9da7074d..20b1c1a2aa2 100644 --- a/system-tests/lib/cre/types.go +++ b/system-tests/lib/cre/types.go @@ -6,9 +6,11 @@ import ( "maps" "os" "path/filepath" + "runtime" "slices" "strconv" "strings" + "time" "github.com/Masterminds/semver/v3" "github.com/ethereum/go-ethereum/common" @@ -17,6 +19,7 @@ import ( "github.com/pelletier/go-toml/v2" "github.com/pkg/errors" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" "github.com/smartcontractkit/chainlink-deployments-framework/datastore" cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment" @@ -385,6 +388,7 @@ type ConfigureCapabilityRegistryInput struct { NodeSets []*NodeSet CapabilityRegistryConfigFns []CapabilityRegistryConfigFn Blockchains []blockchains.Blockchain + Provider infra.Provider CapabilitiesRegistryAddress *common.Address @@ -574,10 +578,16 @@ func NewDonMetadata(c *NodeSet, id uint64, provider infra.Provider, capabilityCo cfgs[i] = cfg } + newNodesStart := time.Now() nodes, err := newNodes(cfgs) if err != nil { return nil, fmt.Errorf("failed to create nodes metadata: %w", err) } + framework.L.Info(). 
+ Str("don", c.Name). + Int("nodes", len(cfgs)). + Float64("duration_s", roundSeconds(time.Since(newNodesStart))). + Msg("Node metadata generation completed") capConfigs, capErr := processCapabilityConfigs(c, capabilityConfigs) if capErr != nil { @@ -1181,13 +1191,26 @@ func NewNodeMetadata(c NodeMetadataConfig) (*NodeMetadata, error) { func newNodes(cfgs []NodeMetadataConfig) ([]*NodeMetadata, error) { nodes := make([]*NodeMetadata, len(cfgs)) + if len(cfgs) == 0 { + return nodes, nil + } - for i := range nodes { - node, err := NewNodeMetadata(cfgs[i]) - if err != nil { - return nil, fmt.Errorf("failed to create node (index: %d): %w", i, err) - } - nodes[i] = node + errGroup := errgroup.Group{} + errGroup.SetLimit(min(len(cfgs), runtime.GOMAXPROCS(0))) + + for i := range cfgs { + errGroup.Go(func() error { + node, err := NewNodeMetadata(cfgs[i]) + if err != nil { + return fmt.Errorf("failed to create node (index: %d): %w", i, err) + } + nodes[i] = node + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err } return nodes, nil @@ -1424,6 +1447,7 @@ type NodeKeyInput struct { } func NewNodeKeys(input NodeKeyInput) (*secrets.NodeKeys, error) { + start := time.Now() out := &secrets.NodeKeys{ EVM: make(map[uint64]*crypto.EVMKey), Solana: make(map[string]*crypto.SolKey), @@ -1467,6 +1491,14 @@ func NewNodeKeys(input NodeKeyInput) (*secrets.NodeKeys, error) { } out.Solana[chainID] = k } + + framework.L.Debug(). + Int("evm_chains", len(input.EVMChainIDs)). + Int("solana_chains", len(input.SolanaChainIDs)). + Bool("imported", input.ImportedSecrets != ""). + Float64("duration_s", roundSeconds(time.Since(start))). 
+ Msg("Node key generation completed") + return out, nil } diff --git a/system-tests/lib/cre/types_test.go b/system-tests/lib/cre/types_test.go new file mode 100644 index 00000000000..ea3e0356577 --- /dev/null +++ b/system-tests/lib/cre/types_test.go @@ -0,0 +1,92 @@ +package cre + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/secrets" +) + +func TestNewNodesPreservesInputOrder(t *testing.T) { + t.Parallel() + + cfgs := []NodeMetadataConfig{ + {Host: "node-0", Roles: []string{WorkerNode}, Index: 0, Keys: NodeKeyInput{Password: "password"}}, + {Host: "node-1", Roles: []string{BootstrapNode}, Index: 1, Keys: NodeKeyInput{Password: "password"}}, + {Host: "node-2", Roles: []string{GatewayNode}, Index: 2, Keys: NodeKeyInput{Password: "password"}}, + } + + nodes, err := newNodes(cfgs) + + require.NoError(t, err) + require.Len(t, nodes, len(cfgs)) + for i, node := range nodes { + require.Equal(t, cfgs[i].Host, node.Host) + require.Equal(t, cfgs[i].Index, node.Index) + require.Equal(t, cfgs[i].Roles, node.Roles) + } +} + +func TestNewNodesImportedSecretsBypassGeneration(t *testing.T) { + t.Parallel() + + importedKeys, err := NewNodeKeys(NodeKeyInput{ + EVMChainIDs: []uint64{111}, + Password: "password", + }) + require.NoError(t, err) + + importedSecrets, err := importedKeys.ToNodeSecretsTOML() + require.NoError(t, err) + + nodes, err := newNodes([]NodeMetadataConfig{{ + Host: "node-imported", + Roles: []string{WorkerNode}, + Index: 0, + Keys: NodeKeyInput{ + ImportedSecrets: importedSecrets, + }, + }}) + + require.NoError(t, err) + require.Len(t, nodes, 1) + require.Equal(t, importedKeys.PeerID(), nodes[0].Keys.PeerID()) + require.Equal(t, importedKeys.EVM[111].PublicAddress, nodes[0].Keys.EVM[111].PublicAddress) +} + +func TestNewNodesReturnsFailingIndex(t *testing.T) { + t.Parallel() + + importedKeys, err := NewNodeKeys(NodeKeyInput{Password: "password"}) + require.NoError(t, err) + + 
importedSecrets, err := importedKeys.ToNodeSecretsTOML() + require.NoError(t, err) + + _, err = newNodes([]NodeMetadataConfig{ + { + Host: "node-0", + Roles: []string{WorkerNode}, + Index: 0, + Keys: NodeKeyInput{ImportedSecrets: importedSecrets}, + }, + { + Host: "node-1", + Roles: []string{WorkerNode}, + Index: 1, + Keys: NodeKeyInput{ImportedSecrets: "not valid toml"}, + }, + { + Host: "node-2", + Roles: []string{WorkerNode}, + Index: 2, + Keys: NodeKeyInput{ImportedSecrets: importedSecrets}, + }, + }) + + require.Error(t, err) + require.Contains(t, err.Error(), "index: 1") +} + +var _ = secrets.NodeKeys{} From f51339ebfc3cfd97f51e90f62461bf4b2c521117 Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Mon, 30 Mar 2026 14:54:19 +0200 Subject: [PATCH 2/5] clean up --- .../lib/cre/contracts/registry_pickup_wait.go | 39 ++++++++++++++++--- system-tests/lib/cre/don/jobs/jobs.go | 2 + .../cre/features/consensus/v2/consensus.go | 2 - .../lib/cre/features/don_time/don_time.go | 2 - .../lib/cre/features/solana/v2/solana.go | 3 +- 5 files changed, 37 insertions(+), 11 deletions(-) diff --git a/system-tests/lib/cre/contracts/registry_pickup_wait.go b/system-tests/lib/cre/contracts/registry_pickup_wait.go index 4c999930ee8..6656129c7c5 100644 --- a/system-tests/lib/cre/contracts/registry_pickup_wait.go +++ b/system-tests/lib/cre/contracts/registry_pickup_wait.go @@ -8,10 +8,12 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/jmoiron/sqlx" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" "github.com/smartcontractkit/chainlink-testing-framework/framework/components/postgres" @@ -22,6 +24,7 @@ const ( capabilityRegistrySyncPollInterval = 5 * time.Second capabilityRegistrySyncTimeout = 2 * time.Minute capabilityRegistrySyncQueryTimeout = 3 * time.Second + capabilityRegistrySyncConcurrency = 4 ) type workflowWorkerTarget struct { @@ -71,14 +74,40 @@ func waitForWorkflowWorkersCapabilityRegistrySync(ctx context.Context, input cre defer ticker.Stop() for { + 
type checkResult struct { + key string + ready bool + state string + } + results := make([]checkResult, 0, len(pending)) + resultsMu := sync.Mutex{} + eg, egCtx := errgroup.WithContext(timeoutCtx) + eg.SetLimit(capabilityRegistrySyncConcurrency) for key, target := range pending { - ready, state := hasCapabilityRegistrySyncOnWorker(timeoutCtx, target.dbPort, target.nodeIndex) - if ready { - delete(pending, key) - delete(lastState, key) + key := key + target := target + eg.Go(func() error { + ready, state := hasCapabilityRegistrySyncOnWorker(egCtx, target.dbPort, target.nodeIndex) + resultsMu.Lock() + results = append(results, checkResult{ + key: key, + ready: ready, + state: state, + }) + resultsMu.Unlock() + return nil + }) + } + if err := eg.Wait(); err != nil { + return err + } + for _, result := range results { + if result.ready { + delete(pending, result.key) + delete(lastState, result.key) continue } - lastState[key] = state + lastState[result.key] = result.state } if len(pending) == 0 { diff --git a/system-tests/lib/cre/don/jobs/jobs.go b/system-tests/lib/cre/don/jobs/jobs.go index 32ced5d1d81..e06d18043d6 100644 --- a/system-tests/lib/cre/don/jobs/jobs.go +++ b/system-tests/lib/cre/don/jobs/jobs.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink/system-tests/lib/cre" ) +// defined as variables to allow for easy testing var loadNodeProposalIDs = func(ctx context.Context, node *cre.Node) (map[string]string, error) { jd, err := node.Clients.GQLClient.GetJobDistributor(ctx, node.JobDistributorDetails.JDID) if err != nil { @@ -32,6 +33,7 @@ var loadNodeProposalIDs = func(ctx context.Context, node *cre.Node) (map[string] return proposalIDsBySpec, nil } +// defined as variables to allow for easy testing var approveJobProposalSpec = func(ctx context.Context, node *cre.Node, proposalID string) error { approvedSpec, err := node.Clients.GQLClient.ApproveJobProposalSpec(ctx, proposalID, false) if err != nil { diff --git 
a/system-tests/lib/cre/features/consensus/v2/consensus.go b/system-tests/lib/cre/features/consensus/v2/consensus.go index 511849c57f0..d094064214b 100644 --- a/system-tests/lib/cre/features/consensus/v2/consensus.go +++ b/system-tests/lib/cre/features/consensus/v2/consensus.go @@ -84,7 +84,6 @@ func (c *Consensus) PostEnvStartup( ) error { jobsErr := createJobs( ctx, - testLogger, don, dons, creEnv, @@ -98,7 +97,6 @@ func (c *Consensus) PostEnvStartup( func createJobs( ctx context.Context, - testLogger zerolog.Logger, don *cre.Don, dons *cre.Dons, creEnv *cre.Environment, diff --git a/system-tests/lib/cre/features/don_time/don_time.go b/system-tests/lib/cre/features/don_time/don_time.go index 8efb751ce48..711fee093f5 100644 --- a/system-tests/lib/cre/features/don_time/don_time.go +++ b/system-tests/lib/cre/features/don_time/don_time.go @@ -69,7 +69,6 @@ func (o *DONTime) PostEnvStartup( ) error { jobErr := createJobs( ctx, - testLogger, creEnv, don, dons, @@ -83,7 +82,6 @@ func (o *DONTime) PostEnvStartup( func createJobs( ctx context.Context, - testLogger zerolog.Logger, creEnv *cre.Environment, don *cre.Don, dons *cre.Dons, diff --git a/system-tests/lib/cre/features/solana/v2/solana.go b/system-tests/lib/cre/features/solana/v2/solana.go index 83eddee736a..ffeee23422d 100644 --- a/system-tests/lib/cre/features/solana/v2/solana.go +++ b/system-tests/lib/cre/features/solana/v2/solana.go @@ -121,7 +121,7 @@ func (s *Solana) PostEnvStartup( ) error { // 1. Deploy & Configure OCR3 Contracts (once solana consensus reads are supported) // 2. 
Create & Approve Solana Standard capability jobs for the DON - jobErr := createJobs(ctx, testLogger, don, dons, creEnv) + jobErr := createJobs(ctx, don, dons, creEnv) if jobErr != nil { return errors.Wrapf(jobErr, "failed to create job for solana chain standard capability") } @@ -141,7 +141,6 @@ func (s *Solana) PostEnvStartup( // post env func createJobs( ctx context.Context, - testLogger zerolog.Logger, don *cre.Don, dons *cre.Dons, creEnv *cre.Environment, From db4d43df614c293e139ef1eda05871419e72c9d6 Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Mon, 30 Mar 2026 15:06:17 +0200 Subject: [PATCH 3/5] lints lints --- system-tests/lib/cre/contracts/registry_pickup_wait.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/system-tests/lib/cre/contracts/registry_pickup_wait.go b/system-tests/lib/cre/contracts/registry_pickup_wait.go index 6656129c7c5..0bb6cfaa634 100644 --- a/system-tests/lib/cre/contracts/registry_pickup_wait.go +++ b/system-tests/lib/cre/contracts/registry_pickup_wait.go @@ -84,8 +84,6 @@ func waitForWorkflowWorkersCapabilityRegistrySync(ctx context.Context, input cre eg, egCtx := errgroup.WithContext(timeoutCtx) eg.SetLimit(capabilityRegistrySyncConcurrency) for key, target := range pending { - key := key - target := target eg.Go(func() error { ready, state := hasCapabilityRegistrySyncOnWorker(egCtx, target.dbPort, target.nodeIndex) resultsMu.Lock() From 36c5dd6f3d661c74d8d4c68a13a8fc9ed22b9235 Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Mon, 30 Mar 2026 15:25:50 +0200 Subject: [PATCH 4/5] allow CRE system/regression tests to run max 10 minutes in the CI --- .github/workflows/cre-regression-system-tests.yaml | 2 +- .github/workflows/cre-system-tests.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/cre-regression-system-tests.yaml b/.github/workflows/cre-regression-system-tests.yaml index a4ec65b103d..92c205baaaa 100644 --- a/.github/workflows/cre-regression-system-tests.yaml +++ 
b/.github/workflows/cre-regression-system-tests.yaml @@ -99,7 +99,7 @@ jobs: # http://docs.github.com/en/actions/how-tos/deploy/configure-and-manage-deployments/control-deployments#using-environments-without-deployments name: integration deployment: false - timeout-minutes: 60 + timeout-minutes: 10 env: ENABLE_AUTO_QUARANTINE: "true" # override Chip Ingress and Chip Config images with remote images. We have added this env var here, instead of the "Start local CRE" step, because diff --git a/.github/workflows/cre-system-tests.yaml b/.github/workflows/cre-system-tests.yaml index ba27f356917..a61f399e962 100644 --- a/.github/workflows/cre-system-tests.yaml +++ b/.github/workflows/cre-system-tests.yaml @@ -146,7 +146,7 @@ jobs: # http://docs.github.com/en/actions/how-tos/deploy/configure-and-manage-deployments/control-deployments#using-environments-without-deployments name: integration deployment: false - timeout-minutes: 60 + timeout-minutes: 10 env: ENABLE_AUTO_QUARANTINE: "true" # override Chip Ingress and Chip Config images with remote images. 
We have added this env var here, instead of the "Start local CRE" step, because From d3eee5bbc1afcd9626568ce33eba488b95fd3a0b Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Mon, 30 Mar 2026 15:51:16 +0200 Subject: [PATCH 5/5] CR changes --- system-tests/lib/cre/don.go | 5 ++--- system-tests/lib/cre/don_test.go | 3 --- system-tests/lib/cre/features/evm/v2/evm.go | 2 +- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/system-tests/lib/cre/don.go b/system-tests/lib/cre/don.go index 97bebf02bc8..ec353744b80 100644 --- a/system-tests/lib/cre/don.go +++ b/system-tests/lib/cre/don.go @@ -455,9 +455,8 @@ type nodeChainConfigLister interface { } var ( - jdChainConfigPollInterval = 200 * time.Millisecond - jdChainConfigPollTimeout = 10 * time.Second - jdChainConfigRPCTimeout = 3 * time.Second + jdChainConfigPollTimeout = 10 * time.Second + jdChainConfigRPCTimeout = 3 * time.Second ) func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockchains.Blockchain, jd nodeChainConfigLister) error { diff --git a/system-tests/lib/cre/don_test.go b/system-tests/lib/cre/don_test.go index dfde624ec68..575ec1dca17 100644 --- a/system-tests/lib/cre/don_test.go +++ b/system-tests/lib/cre/don_test.go @@ -138,12 +138,9 @@ func TestCreateJDChainConfigsFailsVerificationOnTimeout(t *testing.T) { } node.Clients.GQLClient = &fakeGQLClient{} - originalInterval := jdChainConfigPollInterval originalTimeout := jdChainConfigPollTimeout - jdChainConfigPollInterval = time.Millisecond jdChainConfigPollTimeout = 5 * time.Millisecond defer func() { - jdChainConfigPollInterval = originalInterval jdChainConfigPollTimeout = originalTimeout }() diff --git a/system-tests/lib/cre/features/evm/v2/evm.go b/system-tests/lib/cre/features/evm/v2/evm.go index 0c680addf69..4604fe78b41 100644 --- a/system-tests/lib/cre/features/evm/v2/evm.go +++ b/system-tests/lib/cre/features/evm/v2/evm.go @@ -274,7 +274,7 @@ func createJobs( results := make([]map[string][]string, len(workItems)) 
group, groupCtx := errgroup.WithContext(ctx) - group.SetLimit(max(len(workItems), 4)) + group.SetLimit(jobhelpers.Parallelism(len(workItems))) for i, workItem := range workItems { group.Go(func() error {