Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,14 @@ func (w *launcher) donPairsToUpdate(myID ragetypes.PeerID, localRegistry *regist
}

func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyncer.LocalRegistry) error {
w.lggr.Debug("CapabilitiesLauncher triggered...")
w.registry.SetLocalRegistry(localRegistry)
// Do not set an empty local registry: capability init (e.g. EVM) calls LocalNode() and fails with
// "empty local registry. no DONs registered". Only set once we have at least one DON so that
// capabilities that depend on the registry see valid data (or keep waiting until syncer pushes non-empty).
if len(localRegistry.IDsToDONs) > 0 {
w.registry.SetLocalRegistry(localRegistry)
} else {
w.lggr.Debugw("CapabilitiesLauncher skipping SetLocalRegistry (empty registry, waiting for first sync with DONs)")
}

allDONIDs := w.allDONs(localRegistry)
w.lggr.Debugw("All DONs in the local registry", "allDONIDs", allDONIDs)
Expand Down
35 changes: 34 additions & 1 deletion core/services/standardcapabilities/standard_capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -118,7 +119,9 @@ func (s *StandardCapabilities) Start(ctx context.Context) error {
CRESettings: s.creSettings,
TriggerEventStore: s.triggerEventStore,
}
if err = s.capabilitiesLoop.Service.Initialise(cctx, dependencies); err != nil {

s.log.Infow("StandardCapabilities calling Initialise on capability service", "command", s.command)
if err = s.retryInitialiseUntilReady(cctx, dependencies); err != nil {
s.log.Errorf("error initialising standard capabilities service: %v", err)
return
}
Expand All @@ -136,6 +139,36 @@ func (s *StandardCapabilities) Start(ctx context.Context) error {
})
}

// retryInitialiseUntilReady calls Initialise and retries on "empty local registry" or
// "metadataRegistry information not available" so that capability init runs after the
// registry syncer has pushed at least one non-empty local registry (startup race fix).
const initRetryTimeout = 90 * time.Second
const initRetryInterval = 3 * time.Second

func (s *StandardCapabilities) retryInitialiseUntilReady(ctx context.Context, dependencies core.StandardCapabilitiesDependencies) error {
deadline := time.Now().Add(initRetryTimeout)
var lastErr error
for attempt := 0; time.Now().Before(deadline); attempt++ {
lastErr = s.capabilitiesLoop.Service.Initialise(ctx, dependencies)
if lastErr == nil {
return nil
}
msg := lastErr.Error()
if !strings.Contains(msg, "empty local registry") && !strings.Contains(msg, "metadataRegistry information not available") {
return lastErr
}
if attempt > 0 {
s.log.Infow("StandardCapabilities Initialise retry (waiting for registry sync)", "command", s.command, "attempt", attempt+1, "err", lastErr)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(initRetryInterval):
}
}
return fmt.Errorf("initialise still failing after %v (registry never became ready): %w", initRetryTimeout, lastErr)
}

// Ready is a non-blocking check for the service's ready state. Errors if not
// ready when called.
func (s *StandardCapabilities) Ready() error {
Expand Down
169 changes: 114 additions & 55 deletions system-tests/lib/cre/don.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package cre
import (
"context"
"fmt"
"math"
"net/url"
"slices"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -22,6 +22,7 @@ import (
keystone_changeset "github.com/smartcontractkit/chainlink/deployment/keystone/changeset"
ks_contracts_op "github.com/smartcontractkit/chainlink/deployment/keystone/changeset/operations/contracts"

"github.com/smartcontractkit/chainlink-testing-framework/framework"
"github.com/smartcontractkit/chainlink-testing-framework/framework/clclient"
"github.com/smartcontractkit/chainlink-testing-framework/framework/components/blockchain"
"github.com/smartcontractkit/chainlink-testing-framework/framework/components/clnode"
Expand All @@ -30,6 +31,7 @@ import (

chainselectors "github.com/smartcontractkit/chain-selectors"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink/system-tests/lib/cre/don/secrets"
"github.com/smartcontractkit/chainlink/system-tests/lib/cre/environment/blockchains"
Expand Down Expand Up @@ -227,7 +229,7 @@ func (d *Don) GetName() string {

func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Output) (*Don, error) {
don := &Don{
Nodes: make([]*Node, 0),
Nodes: make([]*Node, len(donMetadata.NodesMetadata)),
Name: donMetadata.Name,
ID: donMetadata.ID,
Flags: donMetadata.Flags,
Expand All @@ -236,8 +238,6 @@ func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Ou
chainCapabilityIndex: donMetadata.ns.chainCapabilityIndex,
}

mu := &sync.Mutex{}

errgroup := errgroup.Group{}
for idx, nodeMetadata := range donMetadata.NodesMetadata {
errgroup.Go(func() error {
Expand All @@ -246,10 +246,7 @@ func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Ou
return fmt.Errorf("failed to create node %d: %w", idx, err)
}
node.DON = don

mu.Lock()
don.Nodes = append(don.Nodes, node)
mu.Unlock()
don.Nodes[idx] = node

return nil
})
Expand All @@ -274,8 +271,6 @@ func NewDON(ctx context.Context, donMetadata *DonMetadata, ctfNodes []*clnode.Ou
}

func registerWithJD(ctx context.Context, d *Don, supportedChains []blockchains.Blockchain, cldfEnv *cldf.Environment) error {
mu := &sync.Mutex{}

jd, ok := cldfEnv.Offchain.(*jd.JobDistributor)
if !ok {
return fmt.Errorf("offchain environment is not a *.jd.JobDistributor, but %T", cldfEnv.Offchain)
Expand All @@ -293,19 +288,23 @@ func registerWithJD(ctx context.Context, d *Don, supportedChains []blockchains.B
for _, role := range node.Roles {
switch role {
case RoleWorker, RoleBootstrap:
chainConfigStart := time.Now()
if err := createJDChainConfigs(ctx, node, supportedChains, jd); err != nil {
return fmt.Errorf("failed to create supported chains in node %s: %w", node.Name, err)
}
framework.L.Info().
Str("don", d.Name).
Str("node", node.Name).
Float64("duration_s", roundSeconds(time.Since(chainConfigStart))).
Msg("JD chain-config setup completed")
case RoleGateway:
// no chains configuration needed for gateway nodes
default:
return fmt.Errorf("unknown node role: %s", role)
}
}

mu.Lock()
d.Nodes[idx] = node
mu.Unlock()

return nil
})
Expand Down Expand Up @@ -451,7 +450,19 @@ type JDChainConfigInput struct {
ChainType string
}

func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockchains.Blockchain, jd *jd.JobDistributor) error {
type nodeChainConfigLister interface {
ListNodeChainConfigs(context.Context, *nodev1.ListNodeChainConfigsRequest, ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error)
}

var (
jdChainConfigPollInterval = 200 * time.Millisecond
jdChainConfigPollTimeout = 10 * time.Second
jdChainConfigRPCTimeout = 3 * time.Second
)

func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockchains.Blockchain, jd nodeChainConfigLister) error {
ocr2BundleIDsByType := make(map[string]string)

for _, chain := range supportedChains {
var account string
chainIDStr := strconv.FormatUint(chain.ChainID(), 10)
Expand Down Expand Up @@ -507,41 +518,35 @@ func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockc
if chain.IsFamily(blockchain.FamilyTron) {
chainType = strings.ToUpper(blockchain.FamilyEVM)
}
ocr2BundleID, createErr := n.Clients.GQLClient.FetchOCR2KeyBundleID(ctx, chainType)
if createErr != nil {
return fmt.Errorf("failed to fetch OCR2 key bundle id for node %s: %w", n.Name, createErr)
}
if ocr2BundleID == "" {
return fmt.Errorf("no OCR2 key bundle id found for node %s", n.Name)
}

ocr2BundleID, ok := ocr2BundleIDsByType[chainType]
if !ok {
fetchErr := error(nil)
ocr2BundleID, fetchErr = n.Clients.GQLClient.FetchOCR2KeyBundleID(ctx, chainType)
if fetchErr != nil {
return fmt.Errorf("failed to fetch OCR2 key bundle id for node %s: %w", n.Name, fetchErr)
}
if ocr2BundleID == "" {
return fmt.Errorf("no OCR2 key bundle id found for node %s", n.Name)
}
ocr2BundleIDsByType[chainType] = ocr2BundleID
}
if n.Keys.OCR2BundleIDs == nil {
n.Keys.OCR2BundleIDs = make(map[string]string)
}

n.Keys.OCR2BundleIDs[strings.ToLower(chainType)] = ocr2BundleID

// retry twice with 5 seconds interval to create JobDistributorChainConfig
retryErr := retry.Do(ctx, retry.WithMaxDuration(10*time.Second, retry.NewConstant(3*time.Second)), func(ctx context.Context) error {
// check the node chain config to see if this chain already exists
nodeChainConfigs, listErr := jd.ListNodeChainConfigs(context.Background(), &nodev1.ListNodeChainConfigsRequest{
Filter: &nodev1.ListNodeChainConfigsRequest_Filter{
NodeIds: []string{n.JobDistributorDetails.NodeID},
}})
if listErr != nil {
return retry.RetryableError(fmt.Errorf("failed to list node chain configs for node %s, retrying..: %w", n.Name, listErr))
// Retry create+observe to preserve the original JD behavior.
retryErr := retry.Do(ctx, retry.WithMaxDuration(jdChainConfigPollTimeout, retry.NewConstant(3*time.Second)), func(ctx context.Context) error {
nodeChainConfigIDs, err := listNodeChainConfigIDs(ctx, jd, n.JobDistributorDetails.NodeID)
if err != nil {
return retry.RetryableError(fmt.Errorf("failed to list node chain configs for node %s: %w", n.Name, err))
}
if nodeChainConfigs != nil {
for _, chainConfig := range nodeChainConfigs.ChainConfigs {
if chainConfig.Chain.Id == chainIDStr {
return nil
}
}
if _, exists := nodeChainConfigIDs[chainIDStr]; exists {
return nil
}

// we need to create JD chain config for each chain, because later on changestes ask the node for that chain data
// each node needs to have OCR2 enabled, because p2pIDs are used by some contracts to identify nodes (e.g. capability registry)
_, createErr = n.Clients.GQLClient.CreateJobDistributorChainConfig(ctx, client.JobDistributorChainConfigInput{
_, err = n.Clients.GQLClient.CreateJobDistributorChainConfig(ctx, client.JobDistributorChainConfigInput{
JobDistributorID: n.JobDistributorDetails.JDID,
ChainID: chainIDStr,
ChainType: chainType,
Expand All @@ -554,23 +559,48 @@ func createJDChainConfigs(ctx context.Context, n *Node, supportedChains []blockc
Ocr2KeyBundleID: ocr2BundleID,
Ocr2Plugins: `{}`,
})
// TODO: add a check if the chain config failed because of a duplicate in that case, should we update or return success?
if createErr != nil {
return createErr
if err != nil {
return err
}

// JD silently fails to update nodeChainConfig. Therefore, we fetch the node config and
// if it's not updated , throw an error
return retry.RetryableError(errors.New("retrying CreateChainConfig in JD"))
})

if retryErr != nil {
return fmt.Errorf("failed to create JD chain configuration for node %s: %w", n.Name, retryErr)
}
}

return nil
}

func listNodeChainConfigIDs(ctx context.Context, jd nodeChainConfigLister, nodeID string) (map[string]struct{}, error) {
rpcCtx, cancel := context.WithTimeout(ctx, jdChainConfigRPCTimeout)
defer cancel()

resp, err := jd.ListNodeChainConfigs(rpcCtx, &nodev1.ListNodeChainConfigsRequest{
Filter: &nodev1.ListNodeChainConfigsRequest_Filter{
NodeIds: []string{nodeID},
},
})
if err != nil {
return nil, err
}

chainIDs := make(map[string]struct{})
if resp == nil {
return chainIDs, nil
}

for _, chainConfig := range resp.ChainConfigs {
if chainConfig.GetChain() == nil {
continue
}
chainIDs[chainConfig.Chain.Id] = struct{}{}
}

return chainIDs, nil
}

// AcceptJob accepts the job proposal for the given job proposal spec
func (n *Node) AcceptJob(ctx context.Context, spec string) error {
// fetch JD to get the job proposals
Expand Down Expand Up @@ -773,21 +803,46 @@ func LinkToJobDistributor(ctx context.Context, input *LinkDonsToJDInput) error {
return errors.New("input is nil")
}

var nodeIDs []string
start := time.Now()
dons := input.Dons.List()
donMetadata := input.Topology.DonsMetadata.List()
nodeIDsByDON := make([][]string, len(dons))

for idx, don := range input.Dons.List() {
supportedChains, schErr := findDonSupportedChains(input.Topology.DonsMetadata.List()[idx], input.Blockchains)
if schErr != nil {
return errors.Wrap(schErr, "failed to find supported chains for DON")
}
errGroup, groupCtx := errgroup.WithContext(ctx)
for idx, don := range dons {
errGroup.Go(func() error {
donStart := time.Now()
supportedChains, schErr := findDonSupportedChains(donMetadata[idx], input.Blockchains)
if schErr != nil {
return errors.Wrap(schErr, "failed to find supported chains for DON")
}

if err := registerWithJD(ctx, don, supportedChains, input.CldfEnvironment); err != nil {
return fmt.Errorf("failed to register DON with JD: %w", err)
}
nodeIDs = append(nodeIDs, don.JDNodeIDs()...)
if err := registerWithJD(groupCtx, don, supportedChains, input.CldfEnvironment); err != nil {
return fmt.Errorf("failed to register DON %s with JD: %w", don.Name, err)
}

nodeIDsByDON[idx] = don.JDNodeIDs()
framework.L.Info().
Str("don", don.Name).
Float64("duration_s", roundSeconds(time.Since(donStart))).
Msg("JD registration completed for DON")
return nil
})
}

if err := errGroup.Wait(); err != nil {
return err
}

nodeIDs := make([]string, 0)
for _, donNodeIDs := range nodeIDsByDON {
nodeIDs = append(nodeIDs, donNodeIDs...)
}

input.CldfEnvironment.NodeIDs = nodeIDs
framework.L.Info().
Float64("duration_s", roundSeconds(time.Since(start))).
Msg("Post-start JD linking completed")

return nil
}
Expand Down Expand Up @@ -824,6 +879,10 @@ func findDonSupportedChains(donMetadata *DonMetadata, bcs []blockchains.Blockcha
return chains, nil
}

func roundSeconds(d time.Duration) float64 {
return math.Round(d.Seconds()*10) / 10
}

// Make DonMetadata also implement it, just in case?
type KeystoneDON interface {
KeystoneDONConfig() ks_contracts_op.ConfigureKeystoneDON
Expand Down
Loading
Loading