Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions example/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go/v5 v5.8.3 h1:s58CUJ9s8lezjhTNJO/SxkPBv2qZjS3ktpRSqGF5n0s=
github.com/DataDog/datadog-go/v5 v5.8.3/go.mod h1:K9kcYBlxkcPP8tvvjZZKs/m1edNAUFzBbdpTUKfCsuw=
github.com/GoCodeAlone/go-plugin v1.7.0 h1:EwnhqPlXiNmp85S+MXnKKvm3YlfA6O4NzBb4+GSlEVY=
github.com/GoCodeAlone/go-plugin v1.7.0/go.mod h1:HbGQRZUIa+jbDfjsaZIMJYvrz+LnxL0mJpggfynSTMk=
github.com/GoCodeAlone/modular v1.13.2 h1:qmiSgLrbDUIHzT1CHNh8Zbm3nYCneksymsmECMyG5FI=
github.com/GoCodeAlone/modular v1.13.2/go.mod h1:b06Pvgcc8HsGxvl30iO39zGH2jIWz467QEj2+OQL2Do=
github.com/GoCodeAlone/modular/modules/auth v1.15.0 h1:pBSkPSf4k4GLSbUQFLuPa+nFbfoJXGzSz9q89VoapZk=
Expand Down Expand Up @@ -324,6 +326,8 @@ github.com/hashicorp/serf v0.10.2 h1:m5IORhuNSjaxeljg5DeQVDlQyVkhRIjJDimbkCa8aAc
github.com/hashicorp/serf v0.10.2/go.mod h1:T1CmSGfSeGfnfNy/w0odXQUR1rfECGd2Qdsp84DjOiY=
github.com/hashicorp/vault/api v1.23.0 h1:gXgluBsSECfRWTSW9niY2jwg2e9mMJc4WoHNv4g3h6A=
github.com/hashicorp/vault/api v1.23.0/go.mod h1:zransKiB9ftp+kgY8ydjnvCU7Wk8i9L0DYWpXeMj9ko=
github.com/hashicorp/yamux v0.1.2 h1:XtB8kyFOyHXYVFnwT5C3+Bdo8gArse7j2AQ0DA0Uey8=
github.com/hashicorp/yamux v0.1.2/go.mod h1:C+zze2n6e/7wshOZep2A70/aQU6QBRWJO/G6FT1wIns=
github.com/itchyny/gojq v0.12.19 h1:ttXA0XCLEMoaLOz5lSeFOZ6u6Q3QxmG46vfgI4O0DEs=
github.com/itchyny/gojq v0.12.19/go.mod h1:5galtVPDywX8SPSOrqjGxkBeDhSxEW1gSxoy7tn1iZY=
github.com/itchyny/timefmt-go v0.1.8 h1:1YEo1JvfXeAHKdjelbYr/uCuhkybaHCeTkH8Bo791OI=
Expand Down Expand Up @@ -391,6 +395,8 @@ github.com/minio/highwayhash v1.0.4 h1:asJizugGgchQod2ja9NJlGOWq4s7KsAWr5XUc9Clg
github.com/minio/highwayhash v1.0.4/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
Expand Down Expand Up @@ -441,6 +447,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/oklog/run v1.2.0 h1:O8x3yXwah4A73hJdlrwo/2X6J62gE5qTMusH0dvz60E=
github.com/oklog/run v1.2.0/go.mod h1:mgDbKRSwPhJfesJ4PntqFUbKQRZ50NgmZTSPlFA0YFk=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
Expand Down
108 changes: 108 additions & 0 deletions module/telemetry_bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package module

import (
"context"
"sync"
"time"

"github.com/GoCodeAlone/modular"
"github.com/GoCodeAlone/workflow/telemetry"
)

const defaultTelemetryBridgeInterval = 30 * time.Second

type TelemetryBridgeConfig struct {
Interval time.Duration `yaml:"interval" json:"interval"`
Timeout time.Duration `yaml:"timeout" json:"timeout"`
}

type TelemetryBridge struct {
name string
config TelemetryBridgeConfig
bridge *telemetry.Bridge
app modular.Application
logger modular.Logger

mu sync.Mutex
cancel context.CancelFunc
}

func NewTelemetryBridge(name string, sink telemetry.TelemetrySink, config TelemetryBridgeConfig) *TelemetryBridge {
if config.Interval <= 0 {
config.Interval = defaultTelemetryBridgeInterval
}
return &TelemetryBridge{
name: name,
config: config,
bridge: telemetry.NewBridge(sink, telemetry.BridgeConfig{Timeout: config.Timeout}),
}
}

func (m *TelemetryBridge) Name() string {
return m.name
}

func (m *TelemetryBridge) Init(app modular.Application) error {
m.app = app
m.logger = app.Logger()
return app.RegisterService("telemetry.bridge", m)
}

func (m *TelemetryBridge) Start(ctx context.Context) error {
m.mu.Lock()
if m.cancel != nil {
m.mu.Unlock()
return nil
}
runCtx, cancel := context.WithCancel(ctx)
m.cancel = cancel
m.mu.Unlock()

m.collect(runCtx)
go m.run(runCtx)
return nil
}

func (m *TelemetryBridge) Stop(context.Context) error {
m.mu.Lock()
cancel := m.cancel
m.cancel = nil
m.mu.Unlock()
if cancel != nil {
cancel()
}
return nil
}

func (m *TelemetryBridge) ProvidesServices() []modular.ServiceProvider {
return []modular.ServiceProvider{
{
Name: "telemetry.bridge",
Description: "Host-side telemetry bridge for neutral Workflow emitters",
Instance: m,
},
}
}

func (m *TelemetryBridge) RequiresServices() []modular.ServiceDependency {
return nil
}

func (m *TelemetryBridge) run(ctx context.Context) {
ticker := time.NewTicker(m.config.Interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.collect(ctx)
}
}
}

func (m *TelemetryBridge) collect(ctx context.Context) {
if err := m.bridge.Collect(ctx, m.app); err != nil && m.logger != nil {
m.logger.Warn("telemetry bridge collection failed", "error", err)
}
}
77 changes: 77 additions & 0 deletions module/telemetry_bridge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package module

import (
"context"
"sync"
"testing"
"time"

"github.com/GoCodeAlone/workflow/telemetry"
)

type telemetryBridgeMetricEmitter struct{}

func (telemetryBridgeMetricEmitter) EmitMetrics(_ context.Context, r telemetry.MetricRecorder) error {
r.Counter("requests_total", 1, nil)
return nil
}

type telemetryBridgeRecordingSink struct {
mu sync.Mutex
metrics []telemetry.MetricRecord
}

func (s *telemetryBridgeRecordingSink) RecordMetrics(_ context.Context, records []telemetry.MetricRecord) error {
s.mu.Lock()
defer s.mu.Unlock()
s.metrics = append(s.metrics, records...)
return nil
}

func (s *telemetryBridgeRecordingSink) RecordLogs(context.Context, []telemetry.LogRecord) error {
return nil
}

func (s *telemetryBridgeRecordingSink) RecordSpanEvents(context.Context, []telemetry.SpanEvent) error {
return nil
}

func (s *telemetryBridgeRecordingSink) count() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.metrics)
}

func TestTelemetryBridgeModuleCollectsOnInterval(t *testing.T) {
app := NewMockApplication()
if err := app.RegisterService("emitter", telemetryBridgeMetricEmitter{}); err != nil {
t.Fatal(err)
}
sink := &telemetryBridgeRecordingSink{}
mod := NewTelemetryBridge("telemetry-bridge", sink, TelemetryBridgeConfig{
Interval: 10 * time.Millisecond,
Timeout: time.Second,
})
if err := mod.Init(app); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := mod.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := mod.Stop(context.Background()); err != nil {
t.Fatalf("Stop: %v", err)
}
})

deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) {
if sink.count() > 0 {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatal("bridge did not collect metrics")
}
12 changes: 12 additions & 0 deletions plugin/external/sdk/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"

pb "github.com/GoCodeAlone/workflow/plugin/external/proto"
"github.com/GoCodeAlone/workflow/telemetry"
"google.golang.org/protobuf/types/known/anypb"
)

Expand Down Expand Up @@ -161,3 +162,14 @@ type ServiceContextInvoker interface {
type TypedServiceInvoker interface {
InvokeTypedMethod(method string, input *anypb.Any) (*anypb.Any, error)
}

type TelemetryAttrs = telemetry.Attrs
type TelemetryMetricKind = telemetry.MetricKind
type TelemetryMetricRecord = telemetry.MetricRecord
type TelemetryMetricRecorder = telemetry.MetricRecorder
type TelemetryMetricEmitter = telemetry.MetricEmitter
type TelemetryLogRecord = telemetry.LogRecord
type TelemetryLogEmitter = telemetry.LogEmitter
type TelemetrySpanEvent = telemetry.SpanEvent
type TelemetrySpanRecorder = telemetry.SpanRecorder
type TelemetryTraceAnnotator = telemetry.TraceAnnotator
48 changes: 46 additions & 2 deletions plugins/observability/plugin_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package observability

import (
"context"
"testing"

"github.com/GoCodeAlone/modular"
"github.com/GoCodeAlone/workflow/config"
"github.com/GoCodeAlone/workflow/module"
"github.com/GoCodeAlone/workflow/plugin"
"github.com/GoCodeAlone/workflow/schema"
)
Expand Down Expand Up @@ -239,12 +243,13 @@ func TestWiringHooks(t *testing.T) {
p := New()
hooks := p.WiringHooks()

if len(hooks) != 5 {
t.Fatalf("WiringHooks() count = %d, want 5", len(hooks))
if len(hooks) != 6 {
t.Fatalf("WiringHooks() count = %d, want 6", len(hooks))
}

expectedNames := map[string]bool{
"observability.otel-middleware": false,
"observability.telemetry-bridge": false,
"observability.health-endpoints": false,
"observability.metrics-endpoint": false,
"observability.log-endpoint": false,
Expand All @@ -269,6 +274,45 @@ func TestWiringHooks(t *testing.T) {
}
}

func TestWireTelemetryBridgeRegistersBridge(t *testing.T) {
app := module.NewMockApplication()
app.RegisterModule(&testTelemetryModule{name: "telemetry"})
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "telemetry", Type: "observability.telemetry"},
},
}

if err := wireTelemetryBridge(app, cfg); err != nil {
t.Fatal(err)
}
svc, ok := app.Services["telemetry.bridge"].(*module.TelemetryBridge)
if !ok {
t.Fatalf("telemetry.bridge service = %#v", app.Services["telemetry.bridge"])
}
if err := svc.Stop(context.Background()); err != nil {
t.Fatal(err)
}
}

type testTelemetryModule struct {
name string
}

func (m *testTelemetryModule) Name() string { return m.name }
func (m *testTelemetryModule) Init(modular.Application) error {
return nil
}
func (m *testTelemetryModule) ProvidesServices() []modular.ServiceProvider {
return nil
}
func (m *testTelemetryModule) RequiresServices() []modular.ServiceDependency {
return nil
}
func (m *testTelemetryModule) InvokeServiceContext(context.Context, string, map[string]any) (map[string]any, error) {
return map[string]any{"accepted": true}, nil
}

func TestStepFactories(t *testing.T) {
p := New()
steps := p.StepFactories()
Expand Down
49 changes: 49 additions & 0 deletions plugins/observability/wiring.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package observability

import (
"context"
"time"

"github.com/GoCodeAlone/modular"
"github.com/GoCodeAlone/workflow/config"
"github.com/GoCodeAlone/workflow/module"
"github.com/GoCodeAlone/workflow/plugin"
"github.com/GoCodeAlone/workflow/telemetry"
)

// wiringHooks returns post-init wiring functions that connect observability
Expand Down Expand Up @@ -41,7 +43,54 @@ func wiringHooks() []plugin.WiringHook {
Priority: 40, // run after health/metrics so routes are stable
Hook: wireOpenAPIEndpoints,
},
{
Name: "observability.telemetry-bridge",
Priority: 10,
Hook: wireTelemetryBridge,
},
}
}

type legacyServiceInvoker interface {
InvokeService(string, map[string]any) (map[string]any, error)
}

type legacyServiceInvokerAdapter struct {
invoker legacyServiceInvoker
}

func (a legacyServiceInvokerAdapter) InvokeServiceContext(_ context.Context, method string, args map[string]any) (map[string]any, error) {
return a.invoker.InvokeService(method, args)
}

func wireTelemetryBridge(app modular.Application, cfg *config.WorkflowConfig) error {
sink := telemetry.TelemetrySink(telemetry.NoopSink{})
if cfg != nil {
for _, modCfg := range cfg.Modules {
if modCfg.Type != "observability.telemetry" {
continue
}
mod := app.GetModule(modCfg.Name)
if invoker, ok := mod.(telemetry.ContextServiceInvoker); ok {
sink = telemetry.NewServiceInvokerSink(invoker)
break
}
if invoker, ok := mod.(legacyServiceInvoker); ok {
sink = telemetry.NewServiceInvokerSink(legacyServiceInvokerAdapter{invoker: invoker})
break
}
}
}

bridge := module.NewTelemetryBridge("observability.telemetry-bridge", sink, module.TelemetryBridgeConfig{
Interval: 30 * time.Second,
Timeout: 2 * time.Second,
})
if err := bridge.Init(app); err != nil {
return err
}
app.RegisterModule(bridge)
return bridge.Start(context.Background())
}

// wireHealthEndpoints registers health check endpoints on any available router,
Expand Down
Loading
Loading