-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.go
More file actions
167 lines (146 loc) · 6.13 KB
/
consumer.go
File metadata and controls
167 lines (146 loc) · 6.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package eventbus
import (
"context"
"fmt"
"sync"
"time"
"google.golang.org/protobuf/types/known/anypb"
eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen"
"github.com/GoCodeAlone/workflow-plugin-eventbus/providers"
sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk"
)
// ── consumer registry ─────────────────────────────────────────────────────────
var (
consumerMu sync.RWMutex
consumerRegistry = make(map[string]*eventbusv1.ConsumerConfig)
)
// RegisterConsumer stores a ConsumerConfig in the global registry under instanceName.
func RegisterConsumer(instanceName string, cfg *eventbusv1.ConsumerConfig) {
consumerMu.Lock()
defer consumerMu.Unlock()
consumerRegistry[instanceName] = cfg
}
// GetConsumer looks up a ConsumerConfig by instance name.
func GetConsumer(instanceName string) (*eventbusv1.ConsumerConfig, bool) {
consumerMu.RLock()
defer consumerMu.RUnlock()
cfg, ok := consumerRegistry[instanceName]
return cfg, ok
}
// UnregisterConsumer removes a ConsumerConfig from the registry.
func UnregisterConsumer(instanceName string) {
consumerMu.Lock()
defer consumerMu.Unlock()
delete(consumerRegistry, instanceName)
}
// GetConsumerByName looks up a ConsumerConfig by its durable consumer name
// (cfg.name), iterating all registered instances. This is used by
// step.eventbus.consume to resolve the consumer config from the durable name
// supplied in ConsumeRequest.consumer.
func GetConsumerByName(durableName string) (*eventbusv1.ConsumerConfig, bool) {
consumerMu.RLock()
defer consumerMu.RUnlock()
for _, cfg := range consumerRegistry {
if cfg.GetName() == durableName {
return cfg, true
}
}
return nil, false
}
// ── ConsumerModuleFactory (TypedModuleProvider) ───────────────────────────────
// ConsumerModuleFactory implements sdk.TypedModuleProvider for the
// eventbus.consumer module type.
type ConsumerModuleFactory struct{}
// Compile-time assertion: ConsumerModuleFactory implements sdk.TypedModuleProvider.
var _ sdk.TypedModuleProvider = (*ConsumerModuleFactory)(nil)
// TypedModuleTypes returns the single module type served by this factory.
func (f *ConsumerModuleFactory) TypedModuleTypes() []string {
return []string{"eventbus.consumer"}
}
// CreateTypedModule unpacks the typed proto config and delegates to NewConsumerModule.
func (f *ConsumerModuleFactory) CreateTypedModule(typeName, name string, config *anypb.Any) (sdk.ModuleInstance, error) {
if typeName != "eventbus.consumer" {
return nil, fmt.Errorf("%w: module type %q", sdk.ErrTypedContractNotHandled, typeName)
}
var cfg eventbusv1.ConsumerConfig
if config != nil {
if err := config.UnmarshalTo(&cfg); err != nil {
return nil, fmt.Errorf("eventbus.consumer %q: unmarshal typed config: %w", name, err)
}
}
return NewConsumerModule(name, &cfg)
}
// ── consumerModule (ModuleInstance) ──────────────────────────────────────────
// consumerModule implements sdk.ModuleInstance for the eventbus.consumer
// module type. It declares a durable JetStream consumer (or Kafka consumer group)
// and registers its config for use by step and trigger modules. No background
// goroutines are started — consumption is pull-based, driven by step execution.
type consumerModule struct {
instanceName string
config *eventbusv1.ConsumerConfig
}
// Compile-time assertion: consumerModule implements sdk.ModuleInstance.
var _ sdk.ModuleInstance = (*consumerModule)(nil)
// NewConsumerModule creates a consumerModule from a typed ConsumerConfig proto.
//
// Returns an error if:
// - config.name is empty
// - config.stream_name is empty
func NewConsumerModule(instanceName string, cfg *eventbusv1.ConsumerConfig) (sdk.ModuleInstance, error) {
if cfg.GetName() == "" {
return nil, fmt.Errorf("eventbus.consumer %q: config.name is required", instanceName)
}
if cfg.GetStreamName() == "" {
return nil, fmt.Errorf("eventbus.consumer %q: config.stream_name is required", instanceName)
}
return &consumerModule{instanceName: instanceName, config: cfg}, nil
}
// Init registers the consumer config in the global registry.
func (m *consumerModule) Init() error {
RegisterConsumer(m.instanceName, m.config)
return nil
}
// Start resolves the broker named by m.config.BrokerRef via LookupRuntime and
// dispatches EnsureConsumer on the runtime so the durable consumer exists
// before any worker pulls. Mirrors streamModule.Start: the lookup is wrapped
// in a bounded-retry loop (10-second budget) because modular's Start phase
// has no guaranteed ordering between broker and consumer modules.
//
// Pull-based consumption still has no background goroutines — the
// step.eventbus.consume step drives fetch calls. This Start only ensures the
// consumer record exists; it does not start a long-lived subscription.
//
// Legacy compat: configs without broker_ref skip the dispatch entirely and
// keep behaving as registered-only (Init populated the global registry).
// This is the transitional shape; Group F refactors the step + trigger code
// to require broker_ref and the legacy path will be removed at that point.
func (m *consumerModule) Start(ctx context.Context) error {
brokerRef := m.config.GetBrokerRef()
if brokerRef == "" {
return nil
}
var (
rb providers.RuntimeBroker
conn providers.Connection
)
if err := retryWithBackoff(ctx, 10*time.Second, func() error {
var lookupErr error
rb, conn, lookupErr = LookupRuntime(brokerRef)
return lookupErr
}); err != nil {
return fmt.Errorf(
"eventbus.consumer %q: broker %q not available within 10s: %w",
m.instanceName, brokerRef, err,
)
}
streamName := m.config.GetStreamName()
if err := rb.EnsureConsumer(ctx, conn, streamName, m.config); err != nil {
return fmt.Errorf("eventbus.consumer %q: ensure: %w", m.instanceName, err)
}
return nil
}
// Stop unregisters the consumer config from the global registry.
func (m *consumerModule) Stop(_ context.Context) error {
UnregisterConsumer(m.instanceName)
return nil
}