Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions pkg/capabilities/base_trigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package capabilities

import (
"context"
log "github.com/smartcontractkit/chainlink-common/pkg/logger"
"sync"
"time"
)

type PendingEvent struct {
TriggerId string
WorkflowId string
EventId string
Payload []byte
FirstAt time.Time
LastSentAt time.Time
Attempts int
}

type EventStore interface {
Insert(ctx context.Context, rec PendingEvent) error
Delete(ctx context.Context, triggerId, workflowId, eventId string) error
List(ctx context.Context) ([]PendingEvent, error)
}

type OutboundSend func(ctx context.Context, ev TriggerEvent, workflowId string) error
type LostHook func(ctx context.Context, rec PendingEvent)

// TODO Implement BaseTriggerCapability - CRE-1523
type BaseTriggerCapability struct {
/*
Keeps track of workflow registrations (similar to LLO streams trigger).
Handles retransmits based on T_retransmit and T_max.
Persists pending events in the DB to be resilient to node restarts.
*/
tRetransmit time.Duration // time window for an event being ACKd before we retransmit
tMax time.Duration // timeout before events are considered lost if not ACKd

store EventStore
send OutboundSend
lost LostHook
lggr *log.Logger

mu sync.Mutex
pending map[string]*PendingEvent // key(triggerID|workflowID|eventID)

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

func (b *BaseTriggerCapability) Start(ctx context.Context) error {
b.ctx, b.cancel = context.WithCancel(ctx)

recs, err := b.store.List(ctx)
if err != nil {
return err
}

// Initialize in-memory persistence
b.pending = make(map[string]*PendingEvent)
for i := range recs {
r := recs[i]
b.pending[key(r.TriggerId, r.WorkflowId, r.EventId)] = &r

Check failure on line 64 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-race-tests

undefined: key

Check failure on line 64 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-test

undefined: key

Check failure on line 64 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / benchmark

undefined: key
}

b.wg.Add(1)
go func() {
defer b.wg.Done()
b.retransmitLoop()
}()

for _, r := range recs {
_ = b.trySend(ctx, r.TriggerId, r.WorkflowId, r.EventId)

Check failure on line 74 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-race-tests

b.trySend undefined (type *BaseTriggerCapability has no field or method trySend)

Check failure on line 74 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-test

b.trySend undefined (type *BaseTriggerCapability has no field or method trySend)

Check failure on line 74 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / benchmark

b.trySend undefined (type *BaseTriggerCapability has no field or method trySend)
}
return nil
}

func (b *BaseTriggerCapability) deliverEvent(
ctx context.Context,
ev TriggerEvent,
workflowIds []string,
) error {
/*
Base Trigger Capability can interact with the Don2Don layer (in the remote capability setting)
as well as directly with a consumer (in the local setting).
*/
now := time.Now()

for _, workflowId := range workflowIds {
rec := PendingEvent{
TriggerId: ev.TriggerId,

Check failure on line 92 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-race-tests

ev.TriggerId undefined (type TriggerEvent has no field or method TriggerId)

Check failure on line 92 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-test

ev.TriggerId undefined (type TriggerEvent has no field or method TriggerId)

Check failure on line 92 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / benchmark

ev.TriggerId undefined (type TriggerEvent has no field or method TriggerId)
WorkflowId: workflowId,
EventId: ev.EventId,

Check failure on line 94 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-race-tests

ev.EventId undefined (type TriggerEvent has no field or method EventId)

Check failure on line 94 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-test

ev.EventId undefined (type TriggerEvent has no field or method EventId)

Check failure on line 94 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / benchmark

ev.EventId undefined (type TriggerEvent has no field or method EventId)
Payload: ev.Payload,

Check failure on line 95 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-race-tests

cannot use ev.Payload (variable of type *anypb.Any) as []byte value in struct literal

Check failure on line 95 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-test

cannot use ev.Payload (variable of type *anypb.Any) as []byte value in struct literal

Check failure on line 95 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / benchmark

cannot use ev.Payload (variable of type *anypb.Any) as []byte value in struct literal
FirstAt: now,
}

if err := b.store.Insert(ctx, rec); err != nil {
return err
}

b.mu.Lock()
b.pending[key(ev.TriggerId, workflowId, ev.EventId)] = &rec

Check failure on line 104 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-race-tests

ev.EventId undefined (type TriggerEvent has no field or method EventId)

Check failure on line 104 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-race-tests

ev.TriggerId undefined (type TriggerEvent has no field or method TriggerId)

Check failure on line 104 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-race-tests

undefined: key

Check failure on line 104 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-test

ev.EventId undefined (type TriggerEvent has no field or method EventId)

Check failure on line 104 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-test

ev.TriggerId undefined (type TriggerEvent has no field or method TriggerId)

Check failure on line 104 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-test

undefined: key

Check failure on line 104 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / benchmark

ev.EventId undefined (type TriggerEvent has no field or method EventId)

Check failure on line 104 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / benchmark

ev.TriggerId undefined (type TriggerEvent has no field or method TriggerId)

Check failure on line 104 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / benchmark

undefined: key
b.mu.Unlock()

_ = b.trySend(ctx, ev.TriggerId, workflowId, ev.EventId)

Check failure on line 107 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-race-tests

ev.TriggerId undefined (type TriggerEvent has no field or method TriggerId)

Check failure on line 107 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-race-tests

b.trySend undefined (type *BaseTriggerCapability has no field or method trySend)

Check failure on line 107 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-test

ev.TriggerId undefined (type TriggerEvent has no field or method TriggerId)

Check failure on line 107 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / build-test

b.trySend undefined (type *BaseTriggerCapability has no field or method trySend)

Check failure on line 107 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / benchmark

ev.TriggerId undefined (type TriggerEvent has no field or method TriggerId)

Check failure on line 107 in pkg/capabilities/base_trigger.go

View workflow job for this annotation

GitHub Actions / benchmark

b.trySend undefined (type *BaseTriggerCapability has no field or method trySend)
}
return nil // only when the event is successfully persisted and ready to be relaibly delivered
}

func (b *BaseTriggerCapability) AckEvent(
ctx context.Context,
triggerId, workflowId, eventId string,
) error {
k := key(triggerId, workflowId, eventId) // NOTE: WorkflowID we want to start ;P

b.mu.Lock()
delete(b.pending, k)
b.mu.Unlock()

return b.store.Delete(ctx, triggerId, workflowId, eventId)
}

func (b *BaseTriggerCapability) retransmitLoop() {
ticker := time.NewTicker(b.tRetransmit / 2)
defer ticker.Stop()

for {
select {
case <-b.ctx.Done():
return
case <-ticker.C:
b.scanPending()
}
}
}

func (b *BaseTriggerCapability) scanPending() {
now := time.Now()

for _, rec := range b.pending {
if now.Sub(rec.FirstAt) >= b.tMax {
_ = b.AckEvent(b.ctx, rec.TriggerId, rec.WorkflowId, rec.EventId)
b.lost(b.ctx, *rec)
continue
}
if rec.LastSentAt.IsZero() || now.Sub(rec.LastSentAt) >= b.tRetransmit {
_ = b.trySend(b.ctx, rec.TriggerId, rec.WorkflowId, rec.EventId)
}
}
}
6 changes: 6 additions & 0 deletions pkg/capabilities/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ type OCRAttributedOnchainSignature struct {
type TriggerExecutable interface {
RegisterTrigger(ctx context.Context, request TriggerRegistrationRequest) (<-chan TriggerResponse, error)
UnregisterTrigger(ctx context.Context, request TriggerRegistrationRequest) error
AckEvent(ctx context.Context, eventId string) error
}

// TriggerCapability interface needs to be implemented by all trigger capabilities.
Expand Down Expand Up @@ -551,6 +552,7 @@ func MustNewRemoteCapabilityInfo(
const (
DefaultRegistrationRefresh = 30 * time.Second
DefaultRegistrationExpiry = 2 * time.Minute
DefaultEventTimeout = 2 * time.Minute // TODO: determine best value
DefaultMessageExpiry = 2 * time.Minute
DefaultBatchSize = 100
DefaultBatchCollectionPeriod = 100 * time.Millisecond
Expand All @@ -561,6 +563,7 @@ const (
type RemoteTriggerConfig struct {
RegistrationRefresh time.Duration
RegistrationExpiry time.Duration
EventTimeout time.Duration
MinResponsesToAggregate uint32
MessageExpiry time.Duration
MaxBatchSize uint32
Expand Down Expand Up @@ -594,6 +597,9 @@ func (c *RemoteTriggerConfig) ApplyDefaults() {
if c.RegistrationExpiry == 0 {
c.RegistrationExpiry = DefaultRegistrationExpiry
}
if c.EventTimeout == 0 {
c.EventTimeout = DefaultEventTimeout
}
if c.MessageExpiry == 0 {
c.MessageExpiry = DefaultMessageExpiry
}
Expand Down
74 changes: 42 additions & 32 deletions pkg/capabilities/pb/registry.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions pkg/capabilities/pb/registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import "google/protobuf/duration.proto";
message RemoteTriggerConfig {
google.protobuf.Duration registrationRefresh = 1;
google.protobuf.Duration registrationExpiry = 2;
uint32 minResponsesToAggregate = 3;
google.protobuf.Duration messageExpiry = 4;
uint32 maxBatchSize = 5;
google.protobuf.Duration batchCollectionPeriod = 6;
google.protobuf.Duration eventTimeout =3;
uint32 minResponsesToAggregate = 4;
google.protobuf.Duration messageExpiry = 5;
uint32 maxBatchSize = 6;
google.protobuf.Duration batchCollectionPeriod = 7;
}

// deprecated - v1 only
Expand Down
16 changes: 16 additions & 0 deletions pkg/capabilities/registry/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,14 @@ func (a *atomicTriggerCapability) GetState() connectivity.State {
return connectivity.State(-1) // unknown
}

func (a *atomicTriggerCapability) AckEvent(ctx context.Context, eventId string) error {
c := a.Load()
if c == nil {
return errors.New("capability unavailable")
}
return (*c).AckEvent(ctx, eventId)
}

func (a *atomicTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
c := a.Load()
if c == nil {
Expand Down Expand Up @@ -360,6 +368,14 @@ func (a *atomicExecuteAndTriggerCapability) GetState() connectivity.State {
return connectivity.State(-1) // unknown
}

func (a *atomicExecuteAndTriggerCapability) AckEvent(ctx context.Context, eventId string) error {
c := a.Load()
if c == nil {
return errors.New("capability unavailable")
}
return (*c).AckEvent(ctx, eventId)
}

func (a *atomicExecuteAndTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
c := a.Load()
if c == nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/capabilities/triggers/mercury_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ func (o *MercuryTriggerService) ProcessReport(reports []datastreams.FeedReport)
return nil
}

func (o *MercuryTriggerService) AckEvent(ctx context.Context, eventId string) error {
// TODO Implement?
return nil
}

func (o *MercuryTriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
wid := req.Metadata.WorkflowID

Expand Down
5 changes: 5 additions & 0 deletions pkg/capabilities/triggers/on_demand_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func (o *OnDemand) SendEvent(ctx context.Context, wid string, event capabilities
return nil
}

func (o *OnDemand) AckEvent(ctx context.Context, eventId string) error {
//TODO Implement?
return nil
}

func (o *OnDemand) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
wid := req.Metadata.WorkflowID
o.mu.Lock()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading