From e7b910b2a9871e79eaff7fdb932491d5aa9d411b Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 24 May 2026 20:54:02 -0400 Subject: [PATCH 1/4] feat: add neutral telemetry contracts --- plugin/external/sdk/interfaces.go | 12 +++ telemetry/contracts.go | 119 ++++++++++++++++++++++++++++++ telemetry/contracts_test.go | 58 +++++++++++++++ telemetry/sdk_aliases_test.go | 34 +++++++++ 4 files changed, 223 insertions(+) create mode 100644 telemetry/contracts.go create mode 100644 telemetry/contracts_test.go create mode 100644 telemetry/sdk_aliases_test.go diff --git a/plugin/external/sdk/interfaces.go b/plugin/external/sdk/interfaces.go index fb68918b..b6588f59 100644 --- a/plugin/external/sdk/interfaces.go +++ b/plugin/external/sdk/interfaces.go @@ -6,6 +6,7 @@ import ( "context" pb "github.com/GoCodeAlone/workflow/plugin/external/proto" + "github.com/GoCodeAlone/workflow/telemetry" "google.golang.org/protobuf/types/known/anypb" ) @@ -161,3 +162,14 @@ type ServiceContextInvoker interface { type TypedServiceInvoker interface { InvokeTypedMethod(method string, input *anypb.Any) (*anypb.Any, error) } + +type TelemetryAttrs = telemetry.Attrs +type TelemetryMetricKind = telemetry.MetricKind +type TelemetryMetricRecord = telemetry.MetricRecord +type TelemetryMetricRecorder = telemetry.MetricRecorder +type TelemetryMetricEmitter = telemetry.MetricEmitter +type TelemetryLogRecord = telemetry.LogRecord +type TelemetryLogEmitter = telemetry.LogEmitter +type TelemetrySpanEvent = telemetry.SpanEvent +type TelemetrySpanRecorder = telemetry.SpanRecorder +type TelemetryTraceAnnotator = telemetry.TraceAnnotator diff --git a/telemetry/contracts.go b/telemetry/contracts.go new file mode 100644 index 00000000..e7e01274 --- /dev/null +++ b/telemetry/contracts.go @@ -0,0 +1,119 @@ +package telemetry + +import ( + "context" + "sync" + "time" +) + +type Attrs map[string]string + +type MetricKind string + +const ( + MetricCounter MetricKind = "counter" + MetricGauge MetricKind = "gauge" + MetricHistogram MetricKind = "histogram" +) + +type MetricRecord struct { + Name string + Kind MetricKind + Value float64 + Attrs Attrs + Timestamp time.Time +} + +type MetricRecorder interface { + Counter(name string, value float64, attrs Attrs) + Gauge(name string, value float64, attrs Attrs) + Histogram(name string, value float64, attrs Attrs) +} + +type MetricEmitter interface { + EmitMetrics(context.Context, MetricRecorder) error +} + +type LogRecord struct { + Timestamp time.Time + Level string + Message string + Module string + Attrs Attrs +} + +type LogEmitter interface { + DrainTelemetryLogs(context.Context) []LogRecord +} + +type SpanEvent struct { + Name string + Attrs Attrs + Timestamp time.Time +} + +type SpanRecorder interface { + Event(name string, attrs Attrs) + Attribute(key, value string) +} + +type TraceAnnotator interface { + AnnotateSpan(context.Context, SpanRecorder) +} + +type SnapshotRecorder struct { + mu sync.Mutex + metrics []MetricRecord +} + +func NewSnapshotRecorder() *SnapshotRecorder { + return &SnapshotRecorder{} +} + +func (r *SnapshotRecorder) Counter(name string, value float64, attrs Attrs) { + r.record(MetricCounter, name, value, attrs) +} + +func (r *SnapshotRecorder) Gauge(name string, value float64, attrs Attrs) { + r.record(MetricGauge, name, value, attrs) +} + +func (r *SnapshotRecorder) Histogram(name string, value float64, attrs Attrs) { + r.record(MetricHistogram, name, value, attrs) +} + +func (r *SnapshotRecorder) Metrics() []MetricRecord { + r.mu.Lock() + defer r.mu.Unlock() + + out := make([]MetricRecord, len(r.metrics)) + for i, metric := range r.metrics { + metric.Attrs = copyAttrs(metric.Attrs) + out[i] = metric + } + return out +} + +func (r *SnapshotRecorder) record(kind MetricKind, name string, value float64, attrs Attrs) { + r.mu.Lock() + defer r.mu.Unlock() + + r.metrics = append(r.metrics, MetricRecord{ + Name: name, + Kind: kind, + Value: value, + Attrs: copyAttrs(attrs), + Timestamp: time.Now(), + }) +} + +func copyAttrs(attrs Attrs) Attrs { + if len(attrs) == 0 { + return nil + } + copied := make(Attrs, len(attrs)) + for k, v := range attrs { + copied[k] = v + } + return copied +} diff --git a/telemetry/contracts_test.go b/telemetry/contracts_test.go new file mode 100644 index 00000000..d882c84c --- /dev/null +++ b/telemetry/contracts_test.go @@ -0,0 +1,58 @@ +package telemetry + +import ( + "context" + "testing" + "time" +) + +type testMetricEmitter struct{} + +func (testMetricEmitter) EmitMetrics(_ context.Context, r MetricRecorder) error { + r.Counter("requests_total", 2, Attrs{"tenant": "acme"}) + r.Gauge("active_sessions", 3, nil) + r.Histogram("request_duration_seconds", 0.15, Attrs{"route": "/"}) + return nil +} + +func TestSnapshotRecorderCapturesMetrics(t *testing.T) { + rec := NewSnapshotRecorder() + if err := (testMetricEmitter{}).EmitMetrics(context.Background(), rec); err != nil { + t.Fatal(err) + } + got := rec.Metrics() + if len(got) != 3 { + t.Fatalf("metric count = %d, want 3", len(got)) + } + if got[0].Name != "requests_total" || got[0].Kind != MetricCounter || got[0].Value != 2 { + t.Fatalf("first metric = %#v", got[0]) + } + if got[0].Attrs["tenant"] != "acme" { + t.Fatalf("tenant attr = %q, want acme", got[0].Attrs["tenant"]) + } + if got[0].Timestamp.IsZero() { + t.Fatal("metric timestamp is zero") + } +} + +func TestSnapshotRecorderCopiesAttrs(t *testing.T) { + rec := NewSnapshotRecorder() + attrs := Attrs{"tenant": "acme"} + rec.Counter("requests_total", 1, attrs) + attrs["tenant"] = "other" + got := rec.Metrics() + got[0].Attrs["tenant"] = "mutated" + + got = rec.Metrics() + if got[0].Attrs["tenant"] != "acme" { + t.Fatalf("stored attr = %q, want acme", got[0].Attrs["tenant"]) + } +} + +func TestLogRecordDefaults(t *testing.T) { + now := time.Now() + rec := LogRecord{Timestamp: now, Level: "info", Message: "ok"} + if rec.Timestamp.IsZero() || rec.Level != "info" || rec.Message != "ok" { + t.Fatalf("bad log record: %#v", rec) + } +} diff --git a/telemetry/sdk_aliases_test.go b/telemetry/sdk_aliases_test.go new file mode 100644 index 00000000..b8f27130 --- /dev/null +++ b/telemetry/sdk_aliases_test.go @@ -0,0 +1,34 @@ +package telemetry_test + +import ( + "context" + "testing" + + "github.com/GoCodeAlone/workflow/plugin/external/sdk" + "github.com/GoCodeAlone/workflow/telemetry" +) + +type sdkMetricEmitter struct{} + +func (sdkMetricEmitter) EmitMetrics(_ context.Context, r sdk.TelemetryMetricRecorder) error { + r.Counter("sdk_requests_total", 1, sdk.TelemetryAttrs{"source": "sdk"}) + return nil +} + +func TestSDKTelemetryAliasesMatchCoreContracts(t *testing.T) { + var _ telemetry.MetricEmitter = sdkMetricEmitter{} + var _ sdk.TelemetryMetricEmitter = sdkMetricEmitter{} + + rec := telemetry.NewSnapshotRecorder() + if err := (sdkMetricEmitter{}).EmitMetrics(context.Background(), rec); err != nil { + t.Fatal(err) + } + got := rec.Metrics() + if len(got) != 1 || got[0].Name != "sdk_requests_total" { + t.Fatalf("metrics = %#v", got) + } + + var _ sdk.TelemetryMetricRecord = telemetry.MetricRecord{} + var _ sdk.TelemetryLogRecord = telemetry.LogRecord{} + var _ sdk.TelemetrySpanEvent = telemetry.SpanEvent{} +} From 723da8f8406cd589de90794dbe980da6d8572623 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 24 May 2026 20:59:36 -0400 Subject: [PATCH 2/4] feat: add telemetry bridge --- module/telemetry_bridge.go | 108 +++++++++++++ module/telemetry_bridge_test.go | 77 +++++++++ plugins/observability/plugin_test.go | 48 +++++- plugins/observability/wiring.go | 49 ++++++ telemetry/bridge.go | 232 +++++++++++++++++++++++++++ telemetry/bridge_test.go | 156 ++++++++++++++++++ 6 files changed, 668 insertions(+), 2 deletions(-) create mode 100644 module/telemetry_bridge.go create mode 100644 module/telemetry_bridge_test.go create mode 100644 telemetry/bridge.go create mode 100644 telemetry/bridge_test.go diff --git a/module/telemetry_bridge.go b/module/telemetry_bridge.go new file mode 100644 index 00000000..445cf652 --- /dev/null +++ b/module/telemetry_bridge.go @@ -0,0 +1,108 @@ +package module + +import ( + "context" + "sync" + "time" + + "github.com/GoCodeAlone/modular" + "github.com/GoCodeAlone/workflow/telemetry" +) + +const defaultTelemetryBridgeInterval = 30 * time.Second + +type TelemetryBridgeConfig struct { + Interval time.Duration `yaml:"interval" json:"interval"` + Timeout time.Duration `yaml:"timeout" json:"timeout"` +} + +type TelemetryBridge struct { + name string + config TelemetryBridgeConfig + bridge *telemetry.Bridge + app modular.Application + logger modular.Logger + + mu sync.Mutex + cancel context.CancelFunc +} + +func NewTelemetryBridge(name string, sink telemetry.TelemetrySink, config TelemetryBridgeConfig) *TelemetryBridge { + if config.Interval <= 0 { + config.Interval = defaultTelemetryBridgeInterval + } + return &TelemetryBridge{ + name: name, + config: config, + bridge: telemetry.NewBridge(sink, telemetry.BridgeConfig{Timeout: config.Timeout}), + } +} + +func (m *TelemetryBridge) Name() string { + return m.name +} + +func (m *TelemetryBridge) Init(app modular.Application) error { + m.app = app + m.logger = app.Logger() + return app.RegisterService("telemetry.bridge", m) +} + +func (m *TelemetryBridge) Start(ctx context.Context) error { + m.mu.Lock() + if m.cancel != nil { + m.mu.Unlock() + return nil + } + runCtx, cancel := context.WithCancel(ctx) + m.cancel = cancel + m.mu.Unlock() + + m.collect(runCtx) + go m.run(runCtx) + return nil +} + +func (m *TelemetryBridge) Stop(context.Context) error { + m.mu.Lock() + cancel := m.cancel + m.cancel = nil + m.mu.Unlock() + if cancel != nil { + cancel() + } + return nil +} + +func (m *TelemetryBridge) ProvidesServices() []modular.ServiceProvider { + return []modular.ServiceProvider{ + { + Name: "telemetry.bridge", + Description: "Host-side telemetry bridge for neutral Workflow emitters", + Instance: m, + }, + } +} + +func (m *TelemetryBridge) RequiresServices() []modular.ServiceDependency { + return nil +} + +func (m *TelemetryBridge) run(ctx context.Context) { + ticker := time.NewTicker(m.config.Interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.collect(ctx) + } + } +} + +func (m *TelemetryBridge) collect(ctx context.Context) { + if err := m.bridge.Collect(ctx, m.app); err != nil && m.logger != nil { + m.logger.Warn("telemetry bridge collection failed", "error", err) + } +} diff --git a/module/telemetry_bridge_test.go b/module/telemetry_bridge_test.go new file mode 100644 index 00000000..464561bf --- /dev/null +++ b/module/telemetry_bridge_test.go @@ -0,0 +1,77 @@ +package module + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/GoCodeAlone/workflow/telemetry" +) + +type telemetryBridgeMetricEmitter struct{} + +func (telemetryBridgeMetricEmitter) EmitMetrics(_ context.Context, r telemetry.MetricRecorder) error { + r.Counter("requests_total", 1, nil) + return nil +} + +type telemetryBridgeRecordingSink struct { + mu sync.Mutex + metrics []telemetry.MetricRecord +} + +func (s *telemetryBridgeRecordingSink) RecordMetrics(_ context.Context, records []telemetry.MetricRecord) error { + s.mu.Lock() + defer s.mu.Unlock() + s.metrics = append(s.metrics, records...) + return nil +} + +func (s *telemetryBridgeRecordingSink) RecordLogs(context.Context, []telemetry.LogRecord) error { + return nil +} + +func (s *telemetryBridgeRecordingSink) RecordSpanEvents(context.Context, []telemetry.SpanEvent) error { + return nil +} + +func (s *telemetryBridgeRecordingSink) count() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.metrics) +} + +func TestTelemetryBridgeModuleCollectsOnInterval(t *testing.T) { + app := NewMockApplication() + if err := app.RegisterService("emitter", telemetryBridgeMetricEmitter{}); err != nil { + t.Fatal(err) + } + sink := &telemetryBridgeRecordingSink{} + mod := NewTelemetryBridge("telemetry-bridge", sink, TelemetryBridgeConfig{ + Interval: 10 * time.Millisecond, + Timeout: time.Second, + }) + if err := mod.Init(app); err != nil { + t.Fatal(err) + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := mod.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + if err := mod.Stop(context.Background()); err != nil { + t.Fatalf("Stop: %v", err) + } + }) + + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if sink.count() > 0 { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatal("bridge did not collect metrics") +} diff --git a/plugins/observability/plugin_test.go b/plugins/observability/plugin_test.go index 21e18b05..ef830f45 100644 --- a/plugins/observability/plugin_test.go +++ b/plugins/observability/plugin_test.go @@ -1,8 +1,12 @@ package observability import ( + "context" "testing" + "github.com/GoCodeAlone/modular" + "github.com/GoCodeAlone/workflow/config" + "github.com/GoCodeAlone/workflow/module" "github.com/GoCodeAlone/workflow/plugin" "github.com/GoCodeAlone/workflow/schema" ) @@ -239,12 +243,13 @@ func TestWiringHooks(t *testing.T) { p := New() hooks := p.WiringHooks() - if len(hooks) != 5 { - t.Fatalf("WiringHooks() count = %d, want 5", len(hooks)) + if len(hooks) != 6 { + t.Fatalf("WiringHooks() count = %d, want 6", len(hooks)) } expectedNames := map[string]bool{ "observability.otel-middleware": false, + "observability.telemetry-bridge": false, "observability.health-endpoints": false, "observability.metrics-endpoint": false, "observability.log-endpoint": false, @@ -269,6 +274,45 @@ func TestWiringHooks(t *testing.T) { } } +func TestWireTelemetryBridgeRegistersBridge(t *testing.T) { + app := module.NewMockApplication() + app.RegisterModule(&testTelemetryModule{name: "telemetry"}) + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "telemetry", Type: "observability.telemetry"}, + }, + } + + if err := wireTelemetryBridge(app, cfg); err != nil { + t.Fatal(err) + } + svc, ok := app.Services["telemetry.bridge"].(*module.TelemetryBridge) + if !ok { + t.Fatalf("telemetry.bridge service = %#v", app.Services["telemetry.bridge"]) + } + if err := svc.Stop(context.Background()); err != nil { + t.Fatal(err) + } +} + +type testTelemetryModule struct { + name string +} + +func (m *testTelemetryModule) Name() string { return m.name } +func (m *testTelemetryModule) Init(modular.Application) error { + return nil +} +func (m *testTelemetryModule) ProvidesServices() []modular.ServiceProvider { + return nil +} +func (m *testTelemetryModule) RequiresServices() []modular.ServiceDependency { + return nil +} +func (m *testTelemetryModule) InvokeServiceContext(context.Context, string, map[string]any) (map[string]any, error) { + return map[string]any{"accepted": true}, nil +} + func TestStepFactories(t *testing.T) { p := New() steps := p.StepFactories() diff --git a/plugins/observability/wiring.go b/plugins/observability/wiring.go index 3f02ee5c..f4125efc 100644 --- a/plugins/observability/wiring.go +++ b/plugins/observability/wiring.go @@ -2,11 +2,13 @@ package observability import ( "context" + "time" "github.com/GoCodeAlone/modular" "github.com/GoCodeAlone/workflow/config" "github.com/GoCodeAlone/workflow/module" "github.com/GoCodeAlone/workflow/plugin" + "github.com/GoCodeAlone/workflow/telemetry" ) // wiringHooks returns post-init wiring functions that connect observability @@ -41,7 +43,54 @@ func wiringHooks() []plugin.WiringHook { Priority: 40, // run after health/metrics so routes are stable Hook: wireOpenAPIEndpoints, }, + { + Name: "observability.telemetry-bridge", + Priority: 10, + Hook: wireTelemetryBridge, + }, + } +} + +type legacyServiceInvoker interface { + InvokeService(string, map[string]any) (map[string]any, error) +} + +type legacyServiceInvokerAdapter struct { + invoker legacyServiceInvoker +} + +func (a legacyServiceInvokerAdapter) InvokeServiceContext(_ context.Context, method string, args map[string]any) (map[string]any, error) { + return a.invoker.InvokeService(method, args) +} + +func wireTelemetryBridge(app modular.Application, cfg *config.WorkflowConfig) error { + sink := telemetry.TelemetrySink(telemetry.NoopSink{}) + if cfg != nil { + for _, modCfg := range cfg.Modules { + if modCfg.Type != "observability.telemetry" { + continue + } + mod := app.GetModule(modCfg.Name) + if invoker, ok := mod.(telemetry.ContextServiceInvoker); ok { + sink = telemetry.NewServiceInvokerSink(invoker) + break + } + if invoker, ok := mod.(legacyServiceInvoker); ok { + sink = telemetry.NewServiceInvokerSink(legacyServiceInvokerAdapter{invoker: invoker}) + break + } + } + } + + bridge := module.NewTelemetryBridge("observability.telemetry-bridge", sink, module.TelemetryBridgeConfig{ + Interval: 30 * time.Second, + Timeout: 2 * time.Second, + }) + if err := bridge.Init(app); err != nil { + return err } + app.RegisterModule(bridge) + return bridge.Start(context.Background()) } // wireHealthEndpoints registers health check endpoints on any available router, diff --git a/telemetry/bridge.go b/telemetry/bridge.go new file mode 100644 index 00000000..947f1ec0 --- /dev/null +++ b/telemetry/bridge.go @@ -0,0 +1,232 @@ +package telemetry + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/GoCodeAlone/modular" +) + +const defaultBridgeTimeout = 2 * time.Second + +type TelemetrySink interface { + RecordMetrics(context.Context, []MetricRecord) error + RecordLogs(context.Context, []LogRecord) error + RecordSpanEvents(context.Context, []SpanEvent) error +} + +type BridgeConfig struct { + Timeout time.Duration +} + +type Bridge struct { + sink TelemetrySink + config BridgeConfig +} + +func NewBridge(sink TelemetrySink, config BridgeConfig) *Bridge { + if sink == nil { + sink = NoopSink{} + } + if config.Timeout <= 0 { + config.Timeout = defaultBridgeTimeout + } + return &Bridge{sink: sink, config: config} +} + +func (b *Bridge) Collect(ctx context.Context, app modular.Application) error { + if app == nil { + return nil + } + + var joined error + var metrics []MetricRecord + var logs []LogRecord + var events []SpanEvent + + for name, svc := range app.SvcRegistry() { + if emitter, ok := svc.(MetricEmitter); ok { + rec := NewSnapshotRecorder() + if err := emitter.EmitMetrics(ctx, rec); err != nil { + joined = errors.Join(joined, fmt.Errorf("collect metrics from %s: %w", name, err)) + } + metrics = append(metrics, rec.Metrics()...) + } + if emitter, ok := svc.(LogEmitter); ok { + logs = append(logs, emitter.DrainTelemetryLogs(ctx)...) + } + if annotator, ok := svc.(TraceAnnotator); ok { + rec := newSnapshotSpanRecorder() + annotator.AnnotateSpan(ctx, rec) + events = append(events, rec.Events()...) + } + } + + sinkCtx := ctx + var cancel context.CancelFunc + if b.config.Timeout > 0 { + sinkCtx, cancel = context.WithTimeout(ctx, b.config.Timeout) + defer cancel() + } + if len(metrics) > 0 { + if err := b.sink.RecordMetrics(sinkCtx, metrics); err != nil { + joined = errors.Join(joined, fmt.Errorf("record metrics: %w", err)) + } + } + if len(logs) > 0 { + if err := b.sink.RecordLogs(sinkCtx, logs); err != nil { + joined = errors.Join(joined, fmt.Errorf("record logs: %w", err)) + } + } + if len(events) > 0 { + if err := b.sink.RecordSpanEvents(sinkCtx, events); err != nil { + joined = errors.Join(joined, fmt.Errorf("record span events: %w", err)) + } + } + return joined +} + +type NoopSink struct{} + +func (NoopSink) RecordMetrics(context.Context, []MetricRecord) error { + return nil +} + +func (NoopSink) RecordLogs(context.Context, []LogRecord) error { + return nil +} + +func (NoopSink) RecordSpanEvents(context.Context, []SpanEvent) error { + return nil +} + +type ContextServiceInvoker interface { + InvokeServiceContext(context.Context, string, map[string]any) (map[string]any, error) +} + +type ServiceInvokerSink struct { + invoker ContextServiceInvoker +} + +func NewServiceInvokerSink(invoker ContextServiceInvoker) *ServiceInvokerSink { + return &ServiceInvokerSink{invoker: invoker} +} + +func (s *ServiceInvokerSink) RecordMetrics(ctx context.Context, records []MetricRecord) error { + if s == nil || s.invoker == nil { + return nil + } + _, err := s.invoker.InvokeServiceContext(ctx, "recordMetrics", map[string]any{ + "metrics": metricRecordsToArgs(records), + }) + return err +} + +func (s *ServiceInvokerSink) RecordLogs(ctx context.Context, records []LogRecord) error { + if s == nil || s.invoker == nil { + return nil + } + _, err := s.invoker.InvokeServiceContext(ctx, "recordLogs", map[string]any{ + "logs": logRecordsToArgs(records), + }) + return err +} + +func (s *ServiceInvokerSink) RecordSpanEvents(ctx context.Context, records []SpanEvent) error { + if s == nil || s.invoker == nil { + return nil + } + _, err := s.invoker.InvokeServiceContext(ctx, "recordSpanEvents", map[string]any{ + "events": spanEventsToArgs(records), + }) + return err +} + +type snapshotSpanRecorder struct { + events []SpanEvent +} + +func newSnapshotSpanRecorder() *snapshotSpanRecorder { + return &snapshotSpanRecorder{} +} + +func (r *snapshotSpanRecorder) Event(name string, attrs Attrs) { + r.events = append(r.events, SpanEvent{ + Name: name, + Attrs: copyAttrs(attrs), + Timestamp: time.Now(), + }) +} + +func (r *snapshotSpanRecorder) Attribute(key, value string) { + r.Event("attribute", Attrs{key: value}) +} + +func (r *snapshotSpanRecorder) Events() []SpanEvent { + out := make([]SpanEvent, len(r.events)) + for i, event := range r.events { + event.Attrs = copyAttrs(event.Attrs) + out[i] = event + } + return out +} + +func metricRecordsToArgs(records []MetricRecord) []map[string]any { + out := make([]map[string]any, 0, len(records)) + for _, record := range records { + out = append(out, map[string]any{ + "name": record.Name, + "kind": string(record.Kind), + "value": record.Value, + "attrs": mapString(record.Attrs), + "timestamp": formatTimestamp(record.Timestamp), + }) + } + return out +} + +func logRecordsToArgs(records []LogRecord) []map[string]any { + out := make([]map[string]any, 0, len(records)) + for _, record := range records { + out = append(out, map[string]any{ + "timestamp": formatTimestamp(record.Timestamp), + "level": record.Level, + "message": record.Message, + "module": record.Module, + "attrs": mapString(record.Attrs), + }) + } + return out +} + +func spanEventsToArgs(records []SpanEvent) []map[string]any { + out := make([]map[string]any, 0, len(records)) + for _, record := range records { + out = append(out, map[string]any{ + "name": record.Name, + "attrs": mapString(record.Attrs), + "timestamp": formatTimestamp(record.Timestamp), + }) + } + return out +} + +func mapString(attrs Attrs) map[string]string { + if len(attrs) == 0 { + return nil + } + out := make(map[string]string, len(attrs)) + for k, v := range attrs { + out[k] = v + } + return out +} + +func formatTimestamp(ts time.Time) string { + if ts.IsZero() { + return "" + } + return ts.UTC().Format(time.RFC3339Nano) +} diff --git a/telemetry/bridge_test.go b/telemetry/bridge_test.go new file mode 100644 index 00000000..60e7fcaf --- /dev/null +++ b/telemetry/bridge_test.go @@ -0,0 +1,156 @@ +package telemetry_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/GoCodeAlone/workflow/module" + "github.com/GoCodeAlone/workflow/telemetry" +) + +type bridgeMetricEmitter struct{} + +func (bridgeMetricEmitter) EmitMetrics(_ context.Context, r telemetry.MetricRecorder) error { + r.Counter("requests_total", 2, telemetry.Attrs{"tenant": "acme"}) + r.Gauge("active_sessions", 3, nil) + r.Histogram("request_duration_seconds", 0.15, telemetry.Attrs{"route": "/"}) + return nil +} + +type bridgeLogEmitter struct{} + +func (bridgeLogEmitter) DrainTelemetryLogs(_ context.Context) []telemetry.LogRecord { + return []telemetry.LogRecord{{Level: "info", Message: "ok", Module: "test"}} +} + +type bridgeTraceAnnotator struct{} + +func (bridgeTraceAnnotator) AnnotateSpan(_ context.Context, r telemetry.SpanRecorder) { + r.Event("cache.hit", telemetry.Attrs{"cache": "users"}) + r.Attribute("tenant", "acme") +} + +type recordingSink struct { + metrics []telemetry.MetricRecord + logs []telemetry.LogRecord + events []telemetry.SpanEvent +} + +func (s *recordingSink) RecordMetrics(_ context.Context, records []telemetry.MetricRecord) error { + s.metrics = append(s.metrics, records...) + return nil +} + +func (s *recordingSink) RecordLogs(_ context.Context, records []telemetry.LogRecord) error { + s.logs = append(s.logs, records...) + return nil +} + +func (s *recordingSink) RecordSpanEvents(_ context.Context, records []telemetry.SpanEvent) error { + s.events = append(s.events, records...) + return nil +} + +type failingSink struct{} + +func (failingSink) RecordMetrics(context.Context, []telemetry.MetricRecord) error { + return errors.New("metrics down") +} + +func (failingSink) RecordLogs(context.Context, []telemetry.LogRecord) error { + return nil +} + +func (failingSink) RecordSpanEvents(context.Context, []telemetry.SpanEvent) error { + return nil +} + +func TestBridgeCollectsEmitters(t *testing.T) { + app := module.NewMockApplication() + if err := app.RegisterService("metrics", bridgeMetricEmitter{}); err != nil { + t.Fatal(err) + } + if err := app.RegisterService("logs", bridgeLogEmitter{}); err != nil { + t.Fatal(err) + } + if err := app.RegisterService("traces", bridgeTraceAnnotator{}); err != nil { + t.Fatal(err) + } + + sink := &recordingSink{} + bridge := telemetry.NewBridge(sink, telemetry.BridgeConfig{Timeout: time.Second}) + if err := bridge.Collect(context.Background(), app); err != nil { + t.Fatal(err) + } + if len(sink.metrics) != 3 { + t.Fatalf("metric count = %d, want 3", len(sink.metrics)) + } + if len(sink.logs) != 1 { + t.Fatalf("log count = %d, want 1", len(sink.logs)) + } + if len(sink.events) != 2 { + t.Fatalf("span event count = %d, want 2", len(sink.events)) + } +} + +func TestBridgeSinkFailureIsDiagnostic(t *testing.T) { + app := module.NewMockApplication() + if err := app.RegisterService("metrics", bridgeMetricEmitter{}); err != nil { + t.Fatal(err) + } + + bridge := telemetry.NewBridge(failingSink{}, telemetry.BridgeConfig{Timeout: time.Second}) + if err := bridge.Collect(context.Background(), app); err == nil { + t.Fatal("expected diagnostic error") + } +} + +func TestNoopSinkKeepsEmittersInert(t *testing.T) { + app := module.NewMockApplication() + if err := app.RegisterService("metrics", bridgeMetricEmitter{}); err != nil { + t.Fatal(err) + } + + bridge := telemetry.NewBridge(telemetry.NoopSink{}, telemetry.BridgeConfig{Timeout: time.Second}) + if err := bridge.Collect(context.Background(), app); err != nil { + t.Fatal(err) + } +} + +func TestServiceInvokerSinkConvertsRecords(t *testing.T) { + invoker := &recordingInvoker{} + sink := telemetry.NewServiceInvokerSink(invoker) + err := sink.RecordMetrics(context.Background(), []telemetry.MetricRecord{{ + Name: "requests_total", + Kind: telemetry.MetricCounter, + Value: 1, + Attrs: telemetry.Attrs{"tenant": "acme"}, + Timestamp: time.Unix(1, 0).UTC(), + }}) + if err != nil { + t.Fatal(err) + } + if invoker.method != "recordMetrics" { + t.Fatalf("method = %q, want recordMetrics", invoker.method) + } + metrics, ok := invoker.args["metrics"].([]map[string]any) + if !ok || len(metrics) != 1 { + t.Fatalf("metrics args = %#v", invoker.args["metrics"]) + } + if metrics[0]["timestamp"] != "1970-01-01T00:00:01Z" { + t.Fatalf("timestamp = %#v", metrics[0]["timestamp"]) + } +} + +type recordingInvoker struct { + method string + args map[string]any +} + +func (i *recordingInvoker) InvokeServiceContext(_ context.Context, method string, args map[string]any) (map[string]any, error) { + i.method = method + i.args = args + return map[string]any{"accepted": true}, nil +} From 77a9a6446911d64b252a0e455c11898a44dfdfb1 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 24 May 2026 21:31:54 -0400 Subject: [PATCH 3/4] chore: tidy example module sums --- example/go.sum | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/example/go.sum b/example/go.sum index d0851f94..73264da0 100644 --- a/example/go.sum +++ b/example/go.sum @@ -8,6 +8,8 @@ github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2 github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/datadog-go/v5 v5.8.3 h1:s58CUJ9s8lezjhTNJO/SxkPBv2qZjS3ktpRSqGF5n0s= github.com/DataDog/datadog-go/v5 v5.8.3/go.mod h1:K9kcYBlxkcPP8tvvjZZKs/m1edNAUFzBbdpTUKfCsuw= +github.com/GoCodeAlone/go-plugin v1.7.0 h1:EwnhqPlXiNmp85S+MXnKKvm3YlfA6O4NzBb4+GSlEVY= +github.com/GoCodeAlone/go-plugin v1.7.0/go.mod h1:HbGQRZUIa+jbDfjsaZIMJYvrz+LnxL0mJpggfynSTMk= github.com/GoCodeAlone/modular v1.13.2 h1:qmiSgLrbDUIHzT1CHNh8Zbm3nYCneksymsmECMyG5FI= github.com/GoCodeAlone/modular v1.13.2/go.mod h1:b06Pvgcc8HsGxvl30iO39zGH2jIWz467QEj2+OQL2Do= github.com/GoCodeAlone/modular/modules/auth v1.15.0 h1:pBSkPSf4k4GLSbUQFLuPa+nFbfoJXGzSz9q89VoapZk= @@ -324,6 +326,8 @@ github.com/hashicorp/serf v0.10.2 h1:m5IORhuNSjaxeljg5DeQVDlQyVkhRIjJDimbkCa8aAc github.com/hashicorp/serf v0.10.2/go.mod h1:T1CmSGfSeGfnfNy/w0odXQUR1rfECGd2Qdsp84DjOiY= github.com/hashicorp/vault/api v1.23.0 h1:gXgluBsSECfRWTSW9niY2jwg2e9mMJc4WoHNv4g3h6A= github.com/hashicorp/vault/api v1.23.0/go.mod h1:zransKiB9ftp+kgY8ydjnvCU7Wk8i9L0DYWpXeMj9ko= +github.com/hashicorp/yamux v0.1.2 h1:XtB8kyFOyHXYVFnwT5C3+Bdo8gArse7j2AQ0DA0Uey8= +github.com/hashicorp/yamux v0.1.2/go.mod h1:C+zze2n6e/7wshOZep2A70/aQU6QBRWJO/G6FT1wIns= github.com/itchyny/gojq v0.12.19 h1:ttXA0XCLEMoaLOz5lSeFOZ6u6Q3QxmG46vfgI4O0DEs= github.com/itchyny/gojq v0.12.19/go.mod h1:5galtVPDywX8SPSOrqjGxkBeDhSxEW1gSxoy7tn1iZY= github.com/itchyny/timefmt-go v0.1.8 h1:1YEo1JvfXeAHKdjelbYr/uCuhkybaHCeTkH8Bo791OI= @@ -391,6 +395,8 @@ github.com/minio/highwayhash v1.0.4 h1:asJizugGgchQod2ja9NJlGOWq4s7KsAWr5XUc9Clg github.com/minio/highwayhash v1.0.4/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= +github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= @@ -441,6 +447,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/oklog/run v1.2.0 h1:O8x3yXwah4A73hJdlrwo/2X6J62gE5qTMusH0dvz60E= +github.com/oklog/run v1.2.0/go.mod h1:mgDbKRSwPhJfesJ4PntqFUbKQRZ50NgmZTSPlFA0YFk= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= From 135dea6e6ac7a5f6225b0702f5dc8e6a0fc73f8a Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Sun, 24 May 2026 21:38:36 -0400 Subject: [PATCH 4/4] test: satisfy telemetry alias lint --- telemetry/sdk_aliases_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/telemetry/sdk_aliases_test.go b/telemetry/sdk_aliases_test.go index b8f27130..c6f2b705 100644 --- a/telemetry/sdk_aliases_test.go +++ b/telemetry/sdk_aliases_test.go @@ -28,7 +28,11 @@ func TestSDKTelemetryAliasesMatchCoreContracts(t *testing.T) { t.Fatalf("metrics = %#v", got) } - var _ sdk.TelemetryMetricRecord = telemetry.MetricRecord{} - var _ sdk.TelemetryLogRecord = telemetry.LogRecord{} - var _ sdk.TelemetrySpanEvent = telemetry.SpanEvent{} + requireSDKMetricRecord(telemetry.MetricRecord{}) + requireSDKLogRecord(telemetry.LogRecord{}) + requireSDKSpanEvent(telemetry.SpanEvent{}) } + +func requireSDKMetricRecord(sdk.TelemetryMetricRecord) {} +func requireSDKLogRecord(sdk.TelemetryLogRecord) {} +func requireSDKSpanEvent(sdk.TelemetrySpanEvent) {}