Skip to content

Conversation

@patrickhuie19
Copy link
Contributor

@patrickhuie19 patrickhuie19 commented Jan 3, 2026

Supporting Private Workflow Registry

This change is part of a series of changes to allow the workflow engine to pull workflows from metadata sources other than the WorkflowRegistry.

For broader context, check out https://docs.google.com/document/d/1K5lLtczMe_HyrLTbdLNmb_jxj40lPCmMZW4OBjn7K5U/edit?tab=t.t17fuwqoj0w9#bookmark=id.i3vs01f9lvvr

For the workflow engine specific changes, this diagram is most relevant:
image

Series

[merged] chainlink-protos: smartcontractkit/chainlink-protos#255
[merged] chainlink-common: smartcontractkit/chainlink-common#1749
[In-review] chainlink: #20708

Changes specific to chainlink

  • Supports AlternativeWorkflowMetadata sources configured via TOML: grpc brokered and file brokered (for on-prem deployments)
  • Each source is responsible for fetching active + paused workflows. They present events directly to the reconciliation loop, so that if there is a failure on their source (RPC read failure, grpc server unavailable), they can gracefully wait for the next polling cycle w/o churning engine deletion.
  • The GRPC workflow metadata source will be deployed to a server. JWT CSA-key based allowlist auth. Retries on retryable errors.
  • Workflow execution and deployment events now include the workflow metadata source name to give us flexibility in our analytics and UI consumers to filter data by source.
  • Adds metrics for future dashboards/alerting.

@patrickhuie19 patrickhuie19 changed the title Feat/multi source workflows Supporting Private Workflow Registry Jan 3, 2026
@github-actions
Copy link
Contributor

github-actions bot commented Jan 5, 2026

I see you updated files related to core. Please run pnpm changeset in the root directory to add a changeset as well as in the text include at least one of the following tags:

  • #added For any new functionality added.
  • #breaking_change For any functionality that requires manual action for the node to boot.
  • #bugfix For bug fixes.
  • #changed For any change to the existing functionality.
  • #db_update For any feature that introduces updates to database schema.
  • #deprecation_notice For any upcoming deprecation functionality.
  • #internal For changesets that need to be excluded from the final changelog.
  • #nops For any feature that is NOP facing and needs to be in the official Release Notes for the release.
  • #removed For any functionality/config that is removed.
  • #updated For any functionality that is updated.
  • #wip For any change that is not ready yet and external communication about it should be held off till it is feature complete.

@trunk-io
Copy link

trunk-io bot commented Jan 5, 2026

Static BadgeStatic BadgeStatic BadgeStatic Badge

Failed Test Failure Summary Logs
Test_CRE_GRPCSource_Lifecycle The test failed during the gRPC source lifecycle, but the specific error causing the failure is not indicated in the logs. Logs ↗︎
Test_CRE_GRPCSource_Lifecycle The test failed without a specific error message, but it appears to be related to issues in the gRPC source lifecycle workflow or environment setup. Logs ↗︎

View Full Report ↗︎Docs

@trunk-io
Copy link

trunk-io bot commented Jan 5, 2026

Static BadgeStatic BadgeStatic BadgeStatic Badge

Failed Test Failure Summary Logs
Test_CRE_V2_Cron_Regression The test named Test_CRE_V2_Cron_Regression failed to complete successfully. Logs ↗︎
Test_CRE_V2_EVM_EstimateGas_Invalid_To_Address_Regression/[v2]_EVM.EstimateGas_-_invalid_'to'_address_fails_with_not_authored_con... The test failed because it could not connect to the local node, causing environment setup to fail. Logs ↗︎
Test_CRE_V2_EVM_EstimateGas_Invalid_To_Address_Regression/[v2]_EVM.EstimateGas_-_invalid_'to'_address_fails_with_cut_hex The test failed because it could not connect to the local node, causing environment setup to fail. Logs ↗︎
Test_CRE_V2_EVM_FilterLogs_Invalid_Addresses_Regression/[v2]_EVM.FilterLogs_-_invalid_addresses_fails_with_invalid_address The test failed because it could not connect to the local node, causing environment setup to fail. Logs ↗︎

... and 48 more

View Full Report ↗︎Docs

@patrickhuie19 patrickhuie19 marked this pull request as ready for review January 5, 2026 04:42
@patrickhuie19 patrickhuie19 requested review from a team as code owners January 5, 2026 04:42
// Graceful degradation: Even if all sources fail, we return an empty list and nil error
// to allow retry on the next polling cycle. Errors are logged at appropriate levels
// (WARN when all sources fail, ERROR for individual source failures).
func (m *MultiSourceWorkflowAggregator) ListWorkflowMetadata(ctx context.Context, don capabilities.DON) ([]WorkflowMetadataView, *commontypes.Head, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@patrickhuie19 I think you should lift this up into the syncer -- you're having to deal with inconsistent heads here, but if you just treated each source independently you'd avoid that

What I'm proposing is that in the syncUsingReconciliationStrategy you just:

  • loop over the sources
  • for each source -> fetch workflows and store associated head
  • once you've fetched from all sources -> merge the result (and assert no overlapping!)
  • then put it through the reconcile logic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handled a little differently: a117d58. updated the diagram in the description of the PR to match. syncUsingReconciliation loops over each source. each source will call ListWorkflowMetadata to get workflows, and then will send active + paused events to generateReconciliationEvents if there are no failures on the List action. In this way theres no deleting engine churn if on one polling cycle the eth mainnet rpc is down or the grpc server is unavailable after a few retries.

# ArtifactStorageHost is the host name that, when present within the workflow metadata binary or config URL, designates that a signed URL should be retrieved from the workflow storage service.
ArtifactStorageHost = 'artifact.cre.chain.link' # Example

[[Capabilities.WorkflowRegistry.AlternativeSources]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Additional a better name here? (Alternative to me implies "replacement of", whereas we want to read from both)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. we have to keep TOML backwards compatible, so it will take a few node revisions to deprecate the old TOML config section in favor of a more flexible section name like [[Capabilities.WorkflowRegistry.Source]] - i wanted to handle any such migration later on once we see usage.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm with Cedric on this. How about "Offchain Sources"?

}

// MaxAlternativeSources is the maximum number of alternative workflow sources
const MaxAlternativeSources = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would avoid enforcing a limit here (if we have to have a limit, I'd raise it to eg. 5); I'm anticipating that this will probably need to change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some context: Our polling cycle in the syncer is 12 seconds, so I think when we see this feature getting a lot of use, we would want to move to an individual polling cycle per source. I don't anticipate that happening for the next 12 months though, so putting a strawman limit on the alternative sources to help us not scatter metadata seems useful. Given that, would you still propose raising the limit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raised to 5: a117d58

MaxConfigSize *utils.FileSize
SyncStrategy *string
WorkflowStorage WorkflowStorage
AlternativeSourcesConfig []AlternativeWorkflowSource `toml:"AlternativeSources"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we would deprecate the old fields (Address, NetworkID, ContractVersion, ChainID)
And introduce a []SourcesConfig that supports both contract + grpc sources
Then in setFrom you can read from the deprecated fields and add a contract source to the sources

Everywhere else in the application uses sources going forward

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will schedule for a follow up ticket, good idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be hard to keep all sources together because config format is very different. How about a split into OnchainSources and OffchainSources? Potentially add XYZSources in the future.


// TryInitialize attempts to initialize the contract reader without blocking.
// Returns true if initialization succeeded or was already done.
func (c *ContractWorkflowSource) TryInitialize(ctx context.Context) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be public if it's only used one line 52?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, thanks

// TryInitialize attempts to initialize the contract reader without blocking.
// Returns true if initialization succeeded or was already done.
func (c *ContractWorkflowSource) TryInitialize(ctx context.Context) bool {
c.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need a lock here? (Same question inside the List function)

This won't get called in parallel (the only thing calling this is the singleton sync routine)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept the lock to match the old pattern, which IFIUC had the lock in case the node was running the vault don functionality associated with the contract reader in addition to the engine deployment functionality. could remove; enables parallelization in tests w/ no cost for a singleton routine.

w.lggr.Debugw("fetching workflow metadata from all sources", "don", don.Families)

// Use the multi-source aggregator to fetch workflows from all configured sources
allWorkflowsMetadata, head, err := w.workflowSources.ListWorkflowMetadata(ctx, don)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One edge case to be careful of here is that if you fail to get a result from one source, but get a result from another and then try to reconcile that partial list of workflows, it will lead to any workflows from the failing source to be deleted

That probably isn't desirable. You'll either need to:

  • just error at this point (even if you fetched one of the sources successfully)
  • or do reconciliation based on the source alone (i.e. all workflows are tagged with a source and you reconcile each in turn)

Probably the second one is what we want

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, thanks for the catch - the second option looks best to me as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

…conciliation per source in syncUsingReconciliationStrategy directly
}

// TryInitialize returns the current ready state (GRPC client initialized in constructor).
func (g *GRPCWorkflowSource) TryInitialize(_ context.Context) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: TryInitialize shouldn't be part of any public interface -- we should treat this as an internal detail of each source

Ready() error

// TryInitialize attempts lazy initialization. Returns true if ready.
TryInitialize(ctx context.Context) bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this out of the interface since it's only ever called internally (and in fact for some sources it does nothing)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes good catch, thanks

switch wfMeta.Status {
case WorkflowStatusActive:
switch engineFound {
// we can't tell the difference between an activation and registration without holding
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@patrickhuie19 Lots of comments are being removed here... some of which contains some useful information about what we're doing -- do we need to remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, we want them, thanks - I thought i added back in.

@cl-sonarqube-production
Copy link

# ArtifactStorageHost is the host name that, when present within the workflow metadata binary or config URL, designates that a signed URL should be retrieved from the workflow storage service.
ArtifactStorageHost = 'artifact.cre.chain.link' # Example

[[Capabilities.WorkflowRegistry.AlternativeSources]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm with Cedric on this. How about "Offchain Sources"?

MaxConfigSize *utils.FileSize
SyncStrategy *string
WorkflowStorage WorkflowStorage
AlternativeSourcesConfig []AlternativeWorkflowSource `toml:"AlternativeSources"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be hard to keep all sources together because config format is very different. How about a split into OnchainSources and OffchainSources? Potentially add XYZSources in the future.

@@ -0,0 +1,105 @@
# Workflow Gateway DON configuration with gRPC alternative workflow source enabled.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change the comment to "workflow DON with Gateway"? The name of the file is already confusing.

- [Debugging core nodes](#debugging-core-nodes)
- [Debugging capabilities (mac)](#debugging-capabilities-mac)
- [Workflow Commands](#workflow-commands)
- [Alternative Workflow Sources](#alternative-workflow-sources)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"offchain"?

- `--binary-url-prefix`: Prefix for the binary URL in the output (e.g., `file:///home/chainlink/workflows/`)
- `--config-url-prefix`: Prefix for the config URL in the output

### Deploying a File-Source Workflow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

}
}

// calculateBackoff calculates the backoff duration for a given attempt with jitter.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably a ready Go library which core already imports. @jmank88 ?


// ContractWorkflowSource implements WorkflowMetadataSource by reading from the on-chain
// workflow registry contract.
type ContractWorkflowSource struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about OnchainWorkflowSource?


// ListWorkflowMetadata fetches workflow metadata from the on-chain contract.
// It lazily initializes the contract reader on first call.
func (c *ContractWorkflowSource) ListWorkflowMetadata(ctx context.Context, don capabilities.DON) ([]WorkflowMetadataView, *commontypes.Head, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this pretty much a copy of the old "getAllWorkflowsMetadata"? Can you preserve the TODO comment that got removed please?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants