Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/wfctl/type_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func KnownModuleTypes() map[string]ModuleTypeInfo {
Type: "metrics.collector",
Plugin: "observability",
Stateful: false,
ConfigKeys: []string{"namespace", "subsystem", "metricsPath", "enabledMetrics"},
ConfigKeys: []string{"namespace", "subsystem", "enabledMetrics"},
},
"observability.telemetry": {
Type: "observability.telemetry",
Expand Down
5 changes: 1 addition & 4 deletions data/registry/plugins/http/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@
"wiringHooks": [
"http-auth-provider-wiring",
"http-static-fileserver-registration",
"http-health-endpoint-registration",
"http-metrics-endpoint-registration",
"http-log-endpoint-registration",
"http-openapi-endpoint-registration"
"http-cors-global-wiring"
]
}
}
9 changes: 6 additions & 3 deletions data/registry/plugins/observability/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
"health.checker",
"log.collector",
"observability.otel",
"openapi.generator"
"openapi.generator",
"http.middleware.otel",
"tracing.propagation"
],
"stepTypes": [],
"triggerTypes": [],
"workflowHandlers": [],
"wiringHooks": [
"observability.otel-middleware",
"observability.health-endpoints",
"observability.metrics-endpoint",
"observability.log-endpoint",
"observability.openapi-endpoints"
"observability.openapi-endpoints",
"observability.telemetry-bridge"
]
}
}
8 changes: 0 additions & 8 deletions deploy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ helm upgrade --install workflow ./deploy/helm/workflow \
| `podDisruptionBudget.minAvailable` | 1 | Min available pods |
| `ingress.enabled` | false | Enable Ingress |
| `ingress.className` | "" | Ingress class (e.g., nginx, alb) |
| `monitoring.serviceMonitor.enabled` | false | Enable Prometheus ServiceMonitor |
| `envFromSecret` | "" | Name of K8s Secret for env vars |

### Production Helm values example
Expand Down Expand Up @@ -212,13 +211,6 @@ resources:
cpu: 2000m
memory: 2Gi

monitoring:
enabled: true
serviceMonitor:
enabled: true
labels:
prometheus: kube-prometheus

envFromSecret: workflow-secrets
```

Expand Down
5 changes: 0 additions & 5 deletions deploy/docker-compose/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@ global:
evaluation_interval: 15s

scrape_configs:
- job_name: "workflow-server"
static_configs:
- targets: ["workflow-server:8081"]
metrics_path: /metrics

- job_name: "prometheus"
static_configs:
- targets: ["localhost:9090"]
19 changes: 0 additions & 19 deletions deploy/helm/workflow/templates/servicemonitor.yaml

This file was deleted.

8 changes: 0 additions & 8 deletions deploy/helm/workflow/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,3 @@ persistence:
storageClass: ""
accessMode: ReadWriteOnce
size: 1Gi

# Prometheus monitoring
monitoring:
enabled: false
serviceMonitor:
enabled: false
interval: 30s
labels: {}
23 changes: 0 additions & 23 deletions docs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ servers:
tags:
- name: Health
description: Health check and readiness probes
- name: Metrics
description: Prometheus metrics
- name: Auth
description: JWT authentication (login, register, profile)
- name: Affiliates
Expand Down Expand Up @@ -766,27 +764,6 @@ paths:
schema:
$ref: '#/components/schemas/LiveResponse'

# ─── Metrics (port 8080) ───────────────────────────────────────────
/metrics:
get:
tags: [Metrics]
summary: Prometheus metrics
description: |
Prometheus-format metrics endpoint. Available metrics:
- `workflow_executions_total` (counter)
- `workflow_duration_seconds` (histogram)
- `http_requests_total` (counter)
- `http_request_duration_seconds` (histogram)
- `module_operations_total` (counter)
- `active_workflows` (gauge)
responses:
'200':
description: Prometheus exposition format
content:
text/plain:
schema:
type: string

# ─── Auth (port 8080) ─────────────────────────────────────────────
/api/auth/login:
post:
Expand Down
36 changes: 19 additions & 17 deletions e2e_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/GoCodeAlone/workflow/dynamic"
"github.com/GoCodeAlone/workflow/handlers"
"github.com/GoCodeAlone/workflow/module"
dto "github.com/prometheus/client_model/go"
)

// getFreePort returns an available TCP port for test servers.
Expand Down Expand Up @@ -2462,26 +2463,18 @@ func TestE2E_MetricsAndHealthCheck(t *testing.T) {
mc.SetActiveWorkflows("test-workflow", 5)
mc.RecordModuleOperation("mh-handler", "handle", "success")

// Verify the Prometheus handler serves metrics
metricsHandler := mc.Handler()
if metricsHandler == nil {
t.Fatal("MetricsCollector.Handler() returned nil")
metricFamilies, err := mc.Gather()
if err != nil {
t.Fatalf("Gather metrics failed: %v", err)
}

// Serve metrics via a test HTTP recorder
rec := &testResponseRecorder{headers: make(http.Header), body: &bytes.Buffer{}}
metricsReq, _ := http.NewRequest("GET", "/metrics", nil)
metricsHandler.ServeHTTP(rec, metricsReq)

metricsBody := rec.body.String()
if !strings.Contains(metricsBody, "workflow_executions_total") {
t.Error("Metrics output missing workflow_executions_total")
if !metricFamilyExists(metricFamilies, "workflow_workflow_executions_total") {
t.Error("Metrics output missing workflow_workflow_executions_total")
}
if !strings.Contains(metricsBody, "http_requests_total") {
t.Error("Metrics output missing http_requests_total")
if !metricFamilyExists(metricFamilies, "workflow_http_requests_total") {
t.Error("Metrics output missing workflow_http_requests_total")
}
if !strings.Contains(metricsBody, "active_workflows") {
t.Error("Metrics output missing active_workflows")
if !metricFamilyExists(metricFamilies, "workflow_active_workflows") {
t.Error("Metrics output missing workflow_active_workflows")
}
t.Log(" Prometheus metrics contain expected counters")

Expand Down Expand Up @@ -3006,6 +2999,15 @@ type testResponseRecorder struct {
body *bytes.Buffer
}

func metricFamilyExists(families []*dto.MetricFamily, name string) bool {
for _, family := range families {
if family.GetName() == name {
return true
}
}
return false
}

func (r *testResponseRecorder) Header() http.Header {
return r.headers
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/mattn/go-isatty v0.0.22
github.com/nats-io/nats.go v1.52.0
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/redis/go-redis/v9 v9.19.0
github.com/santhosh-tekuri/jsonschema/v6 v6.0.2
github.com/stretchr/testify v1.11.1
Expand Down Expand Up @@ -218,7 +219,6 @@ require (
github.com/pierrec/lz4/v4 v4.1.26 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/procfs v0.20.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
Expand Down
24 changes: 4 additions & 20 deletions module/metrics.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package module

import (
"net/http"
"strconv"
"time"

"github.com/GoCodeAlone/modular"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"
)

// MetricsCollectorConfig holds configuration for the MetricsCollector module.
type MetricsCollectorConfig struct {
Namespace string `yaml:"namespace" json:"namespace" editor:"type=string,description=Prometheus metric namespace prefix,default=workflow,placeholder=workflow"`
Subsystem string `yaml:"subsystem" json:"subsystem" editor:"type=string,description=Prometheus metric subsystem,placeholder=api"`
MetricsPath string `yaml:"metricsPath" json:"metricsPath" editor:"type=string,description=Endpoint path for Prometheus scraping,default=/metrics,placeholder=/metrics"`
EnabledMetrics []string `yaml:"enabledMetrics" json:"enabledMetrics" editor:"type=array,arrayItemType=string,description=Which metric groups to register (workflow http module active_workflows)"`
}

Expand All @@ -23,7 +21,6 @@ func DefaultMetricsCollectorConfig() MetricsCollectorConfig {
return MetricsCollectorConfig{
Namespace: "workflow",
Subsystem: "",
MetricsPath: "/metrics",
EnabledMetrics: []string{"workflow", "http", "module", "active_workflows"},
}
}
Expand Down Expand Up @@ -135,9 +132,6 @@ func NewMetricsCollectorWithConfig(name string, cfg MetricsCollectorConfig) *Met
return mc
}

// MetricsPath returns the configured metrics endpoint path.
func (m *MetricsCollector) MetricsPath() string { return m.config.MetricsPath }

// Name returns the module name.
func (m *MetricsCollector) Name() string {
return m.name
Expand All @@ -148,9 +142,9 @@ func (m *MetricsCollector) Init(app modular.Application) error {
return app.RegisterService("metrics.collector", m)
}

// Handler returns an HTTP handler that serves Prometheus metrics.
func (m *MetricsCollector) Handler() http.Handler {
return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{})
// Gather returns the current Prometheus metric families without exposing an HTTP endpoint.
func (m *MetricsCollector) Gather() ([]*dto.MetricFamily, error) {
return m.registry.Gather()
}

// RecordWorkflowExecution increments the workflow execution counter.
Expand Down Expand Up @@ -191,16 +185,6 @@ func (m *MetricsCollector) SetActiveWorkflows(workflowType string, count float64
}
}

// MetricsHTTPHandler adapts an http.Handler to the HTTPHandler interface
type MetricsHTTPHandler struct {
Handler http.Handler
}

// Handle implements the HTTPHandler interface
func (h *MetricsHTTPHandler) Handle(w http.ResponseWriter, r *http.Request) {
h.Handler.ServeHTTP(w, r)
}

// ProvidesServices returns the services provided by this module.
func (m *MetricsCollector) ProvidesServices() []modular.ServiceProvider {
return []modular.ServiceProvider{
Expand Down
39 changes: 19 additions & 20 deletions module/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package module

import (
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

dto "github.com/prometheus/client_model/go"
)

func TestNewMetricsCollector(t *testing.T) {
Expand Down Expand Up @@ -56,31 +54,32 @@ func TestMetricsCollector_SetActiveWorkflows(t *testing.T) {
m.SetActiveWorkflows("http", 3)
}

func TestMetricsCollector_Handler(t *testing.T) {
func TestMetricsCollector_Gather(t *testing.T) {
m := NewMetricsCollector("test-metrics")

// Record some metrics first
m.RecordWorkflowExecution("http", "process", "success")
m.RecordHTTPRequest("GET", "/test", 200, 10*time.Millisecond)

handler := m.Handler()
req := httptest.NewRequest("GET", "/metrics", nil)
rec := httptest.NewRecorder()
handler.ServeHTTP(rec, req)

if rec.Code != http.StatusOK {
t.Errorf("expected 200, got %d", rec.Code)
families, err := m.Gather()
if err != nil {
t.Fatalf("Gather failed: %v", err)
}

body, _ := io.ReadAll(rec.Body)
bodyStr := string(body)

if !strings.Contains(bodyStr, "workflow_executions_total") {
t.Error("expected metrics output to contain workflow_executions_total")
if !hasMetricFamily(families, "workflow_workflow_executions_total") {
t.Error("expected gathered metrics to contain workflow_workflow_executions_total")
}
if !strings.Contains(bodyStr, "http_requests_total") {
t.Error("expected metrics output to contain http_requests_total")
if !hasMetricFamily(families, "workflow_http_requests_total") {
t.Error("expected gathered metrics to contain workflow_http_requests_total")
}
}

func hasMetricFamily(families []*dto.MetricFamily, name string) bool {
for _, mf := range families {
if mf.GetName() == name {
return true
}
}
return false
}

func TestMetricsCollector_ProvidesServices(t *testing.T) {
Expand Down
14 changes: 3 additions & 11 deletions module/reflect_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ func TestReflectValidation_HealthCheckerConfig(t *testing.T) {
func TestReflectValidation_MetricsCollectorConfig(t *testing.T) {
fields := schema.GenerateConfigFields(module.MetricsCollectorConfig{})

if len(fields) != 4 {
t.Fatalf("expected 4 tagged fields, got %d", len(fields))
if len(fields) != 3 {
t.Fatalf("expected 3 tagged fields, got %d", len(fields))
}

namespace := fields[0]
Expand All @@ -159,15 +159,7 @@ func TestReflectValidation_MetricsCollectorConfig(t *testing.T) {
t.Errorf("namespace: expected default=workflow, got %v", namespace.DefaultValue)
}

metricsPath := fields[2]
if metricsPath.Key != "metricsPath" {
t.Errorf("expected key=metricsPath, got %q", metricsPath.Key)
}
if metricsPath.DefaultValue != "/metrics" {
t.Errorf("metricsPath: expected default=/metrics, got %v", metricsPath.DefaultValue)
}

enabledMetrics := fields[3]
enabledMetrics := fields[2]
assertField(t, "enabledMetrics", enabledMetrics, schema.FieldTypeArray, false, false)
if enabledMetrics.ArrayItemType != "string" {
t.Errorf("enabledMetrics: expected arrayItemType=string, got %q", enabledMetrics.ArrayItemType)
Expand Down
3 changes: 0 additions & 3 deletions plugins/observability/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ func metricsCollectorFactory(name string, cfg map[string]any) modular.Module {
if v, ok := cfg["subsystem"].(string); ok {
mcCfg.Subsystem = v
}
if v, ok := cfg["metricsPath"].(string); ok {
mcCfg.MetricsPath = v
}
if v, ok := cfg["enabledMetrics"].([]any); ok {
enabled := make([]string, 0, len(v))
for _, item := range v {
Expand Down
Loading
Loading