From 3b7002ab0420a3bb0fc120aad5b46395c1eb84f4 Mon Sep 17 00:00:00 2001 From: Arkadiusz Komarzewski Date: Thu, 6 Nov 2025 12:45:44 +0100 Subject: [PATCH 1/2] DENG-9533 - Go server template: support publishing directly to Pub/Sub --- glean_parser/go_server.py | 49 +++++- glean_parser/templates/go_server.jinja2 | 196 ++++++++++++++++++++++++ glean_parser/translate.py | 3 +- tests/test_go_server.py | 70 +++++++++ 4 files changed, 312 insertions(+), 6 deletions(-) diff --git a/glean_parser/go_server.py b/glean_parser/go_server.py index 9f8fdfd9d..9a9533bf8 100644 --- a/glean_parser/go_server.py +++ b/glean_parser/go_server.py @@ -11,15 +11,18 @@ generates does not use the Glean SDK. It is meant to be used to collect events in server-side environments. In these environments SDK assumptions to measurement window and connectivity don't hold. + Generated code takes care of assembling pings with metrics, and serializing to messages -conforming to Glean schema. +conforming to Glean schema. Two transport modes are supported: +- Cloud Logging (go_server): Logs to stdout in MozLog format for ingestion via GCP log routing +- Pub/Sub (go_server_pubsub): Publishes directly to GCP Pub/Sub topics Warning: this outputter supports limited set of metrics, see `SUPPORTED_METRIC_TYPES` below. Generated code creates two methods for each ping (`RecordPingX` and `RecordPingXWithoutUserInfo`) -that are used for submitting (logging) them. -If pings have `event` metrics assigned, they can be passed to these methods. +that are used for submitting pings. If pings have `event` metrics assigned, they can be +passed to these methods. 
""" from collections import defaultdict @@ -79,7 +82,10 @@ def clean_string(s: str) -> str: def output_go( - objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] + objs: metrics.ObjectTree, + output_dir: Path, + options: Optional[Dict[str, Any]], + transport: str = "logging", ) -> None: """ Given a tree of objects, output Go code to `output_dir`. @@ -90,6 +96,8 @@ def output_go( :param objects: A tree of objects (metrics and pings) as returned from `parser.parse_objects`. :param output_dir: Path to an output directory to write to. + :param transport: Transport mode - either "logging" (Cloud Logging) or + "pubsub" (Pub/Sub direct publishing). Default is "logging". """ template = util.get_jinja2_template( @@ -147,6 +155,37 @@ def output_go( with filepath.open("w", encoding="utf-8") as fd: fd.write( template.render( - parser_version=__version__, pings=ping_to_metrics, events=event_metrics + parser_version=__version__, + pings=ping_to_metrics, + events=event_metrics, + transport=transport, ) ) + + +def output_go_logger( + objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] = None +) -> None: + """ + Given a tree of objects, output Go code using Cloud Logging transport. + + :param objs: A tree of objects (metrics and pings) as returned from + `parser.parse_objects`. + :param output_dir: Path to an output directory to write to. + :param options: options dictionary (currently unused for Go). + """ + output_go(objs, output_dir, options, transport="logging") + + +def output_go_pubsub( + objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] = None +) -> None: + """ + Given a tree of objects, output Go code using Pub/Sub transport. + + :param objs: A tree of objects (metrics and pings) as returned from + `parser.parse_objects`. + :param output_dir: Path to an output directory to write to. + :param options: options dictionary (currently unused for Go). 
+ """ + output_go(objs, output_dir, options, transport="pubsub") diff --git a/glean_parser/templates/go_server.jinja2 b/glean_parser/templates/go_server.jinja2 index f3ebb1481..59f6c054c 100644 --- a/glean_parser/templates/go_server.jinja2 +++ b/glean_parser/templates/go_server.jinja2 @@ -9,6 +9,18 @@ package glean // required imports import ( +{% if transport == "pubsub" %} + "context" + "encoding/json" + "fmt" + "log" + "sync" + "sync/atomic" + "time" + + "cloud.google.com/go/pubsub" + "github.com/google/uuid" +{% else %} "encoding/json" "errors" "fmt" @@ -17,8 +29,10 @@ import ( "time" "github.com/google/uuid" +{% endif %} ) +{% if transport == "logging" %} // log type string used to identify logs to process in the Moz Data Pipeline var gleanEventMozlogType string = "glean-server-event" @@ -32,6 +46,22 @@ type GleanEventsLogger struct { AppChannel string // Channel to differentiate logs from prod/beta/staging/devel Writer io.Writer // Writer to output to. Normal operation expects os.Stdout } +{% else %} +// GleanEventsPublisher publishes Glean events directly to Pub/Sub +type GleanEventsPublisher struct { + AppID string // Application ID to identify application per Glean standards + AppDisplayVersion string // Version of application emitting the event + AppChannel string // Channel to differentiate logs from prod/beta/staging/devel + + topic *pubsub.Topic + ctx context.Context + publishedCount atomic.Int64 + errorCount atomic.Int64 + pendingResults chan *pubsub.PublishResult + wg sync.WaitGroup + once sync.Once +} +{% endif %} // exported type for public method parameters type RequestInfo struct { @@ -89,14 +119,20 @@ type gleanEvent struct { Extra map[string]string `json:"extra"` } +{% if transport == "logging" %} type logEnvelope struct { Timestamp string Logger string Type string Fields ping } +{% endif %} +{% if transport == "pubsub" %} +func (g *GleanEventsPublisher) createClientInfo() clientInfo { +{% else %} func (g GleanEventsLogger) createClientInfo() 
clientInfo { +{% endif %} // Fields with default values are required in the Glean schema, but not used in server context return clientInfo{ TelemetrySDKBuild: "glean_parser v{{ parser_version }}", @@ -120,7 +156,11 @@ func createPingInfo() pingInfo { } } +{% if transport == "pubsub" %} +func (g *GleanEventsPublisher) createPing(documentType string, config RequestInfo, payload pingPayload) (ping, error) { +{% else %} func (g GleanEventsLogger) createPing(documentType string, config RequestInfo, payload pingPayload) (ping, error) { +{% endif %} payloadJson, err := json.Marshal(payload) if err != nil { return ping{}, err @@ -142,6 +182,149 @@ func (g GleanEventsLogger) createPing(documentType string, config RequestInfo, p }, nil } +{% if transport == "pubsub" %} +// Stats holds publishing statistics +type Stats struct { + Published int64 + Errors int64 +} + +// NewGleanEventsPublisher creates a new Pub/Sub-based Glean events publisher +// Authentication uses Application Default Credentials (ADC) - typically GKE Workload Identity +func NewGleanEventsPublisher(ctx context.Context, projectID, topicID, appID, appDisplayVersion, appChannel string) (*GleanEventsPublisher, error) { + // Create Pub/Sub client using Application Default Credentials + client, err := pubsub.NewClient(ctx, projectID) + if err != nil { + return nil, fmt.Errorf("failed to create pubsub client: %w", err) + } + + topic := client.Topic(topicID) + + // Configure publish settings following Pub/Sub best practices for high throughput + // https://cloud.google.com/pubsub/docs/publish-best-practices + topic.PublishSettings = pubsub.PublishSettings{ + CountThreshold: 1000, + ByteThreshold: 10e6, + DelayThreshold: 100 * time.Millisecond, + + // Flow control prevents memory exhaustion when producing faster than publishing + FlowControlSettings: pubsub.FlowControlSettings{ + MaxOutstandingMessages: 100000, + MaxOutstandingBytes: 200e6, + LimitExceededBehavior: pubsub.FlowControlBlock, // Block Publish() calls 
when limit exceeded (backpressure) + }, + } + + publisher := &GleanEventsPublisher{ + AppID: appID, + AppDisplayVersion: appDisplayVersion, + AppChannel: appChannel, + topic: topic, + ctx: ctx, + pendingResults: make(chan *pubsub.PublishResult, 10000), // Buffer for pending results + } + + // Start background goroutine to process publish results asynchronously + publisher.wg.Add(1) + go publisher.processResults() + + return publisher, nil +} + +// processResults runs in background to handle publish results asynchronously +func (g *GleanEventsPublisher) processResults() { + defer g.wg.Done() + + for result := range g.pendingResults { + // Get() blocks until the publish completes or fails + _, err := result.Get(g.ctx) + if err != nil { + g.errorCount.Add(1) + // Log error but don't block on I/O + log.Printf("Pub/Sub publish error: %v", err) + } else { + g.publishedCount.Add(1) + } + } +} + +// construct the Glean envelope and publish to Pub/Sub asynchronously +func (g *GleanEventsPublisher) publish( + documentType string, + requestInfo RequestInfo, + metrics metrics, + events []gleanEvent, +) error { + var telemetryPayload = pingPayload{ + ClientInfo: g.createClientInfo(), + PingInfo: createPingInfo(), + Metrics: metrics, + Events: events, + } + + ping, err := g.createPing(documentType, requestInfo, telemetryPayload) + if err != nil { + return fmt.Errorf("failed to create ping: %w", err) + } + + var envelope = ping + + envelopeJSON, err := json.Marshal(envelope) + if err != nil { + return fmt.Errorf("failed to marshal envelope: %w", err) + } + + // Publish message asynchronously + // Publish() returns immediately with a PublishResult future + result := g.topic.Publish(g.ctx, &pubsub.Message{ + Data: envelopeJSON, + }) + + // Send result to background processor (non-blocking if channel has space) + select { + case g.pendingResults <- result: + // Successfully queued + return nil + case <-g.ctx.Done(): + // Context cancelled + return g.ctx.Err() + } +} + +// Stats 
returns current publishing statistics +func (g *GleanEventsPublisher) Stats() Stats { + return Stats{ + Published: g.publishedCount.Load(), + Errors: g.errorCount.Load(), + } +} + +// Flush waits for all pending publish operations to complete +// Call this before shutdown to ensure no messages are lost +func (g *GleanEventsPublisher) Flush() { + log.Println("Flushing pending publishes...") + + // Stop accepting new messages and flush pending batches + g.topic.Stop() + + // Close results channel to signal background processor to finish + g.once.Do(func() { + close(g.pendingResults) + }) + + // Wait for background processor to finish + g.wg.Wait() + + stats := g.Stats() + log.Printf("Flush complete. Published: %d, Errors: %d", stats.Published, stats.Errors) +} + +// Close performs graceful shutdown, flushing all pending messages +func (g *GleanEventsPublisher) Close() error { + g.Flush() + return nil +} +{% else %} // method called by each ping-specific record method. // construct the ping, wrap it in the envelope, and print to stdout func (g GleanEventsLogger) record( @@ -180,6 +363,7 @@ func (g GleanEventsLogger) record( fmt.Fprintln(g.Writer, string(envelopeJson)) return nil } +{% endif %} {# if any ping has an event metric, create methods and types for them #} {% if events %} @@ -250,7 +434,11 @@ type {{ ping|ping_type_name }} struct { } // Record and submit `{{ ping }}` ping +{% if transport == "pubsub" %} +func (g *GleanEventsPublisher) Record{{ ping|ping_type_name }}( +{% else %} func (g GleanEventsLogger) Record{{ ping|ping_type_name }}( +{% endif %} requestInfo RequestInfo, params {{ ping|ping_type_name }}, ) error { @@ -276,11 +464,19 @@ func (g GleanEventsLogger) Record{{ ping|ping_type_name }}( events = append(events, params.Event.gleanEvent()) } {% endif %} +{% if transport == "pubsub" %} + return g.publish("{{ ping }}", requestInfo, metrics, events) +{% else %} return g.record("{{ ping }}", requestInfo, metrics, events) +{% endif %} } // Record and 
submit `{{ ping }}` ping omitting user request info +{% if transport == "pubsub" %} +func (g *GleanEventsPublisher) Record{{ ping|ping_type_name}}WithoutUserInfo( +{% else %} func (g GleanEventsLogger) Record{{ ping|ping_type_name}}WithoutUserInfo( +{% endif %} params {{ ping|ping_type_name}}, ) error { return g.Record{{ ping|ping_type_name }}(defaultRequestInfo, params) diff --git a/glean_parser/translate.py b/glean_parser/translate.py index 303ea15a6..98363c277 100644 --- a/glean_parser/translate.py +++ b/glean_parser/translate.py @@ -56,7 +56,8 @@ def __init__( OUTPUTTERS = { - "go_server": Outputter(go_server.output_go, []), + "go_server": Outputter(go_server.output_go_logger, []), + "go_server_pubsub": Outputter(go_server.output_go_pubsub, []), "javascript": Outputter(javascript.output_javascript, []), "typescript": Outputter(javascript.output_typescript, []), "javascript_server": Outputter(javascript_server.output_javascript, []), diff --git a/tests/test_go_server.py b/tests/test_go_server.py index 26b1e2825..101e74718 100644 --- a/tests/test_go_server.py +++ b/tests/test_go_server.py @@ -397,3 +397,73 @@ def test_run_logging_custom_ping_with_event(tmp_path): assert validate_ping.validate_ping(input, output, schema_url=schema_url) == 0, ( output.getvalue() ) + + +def test_parser_go_server_pubsub_generation(tmp_path): + """Test that parser generates pubsub transport code correctly""" + translate.translate( + ROOT / "data" / "go_server_events_only_metrics.yaml", + "go_server_pubsub", + tmp_path, + ) + + assert set(x.name for x in tmp_path.iterdir()) == set(["server_events.go"]) + + # Verify pubsub-specific code exists in generated file + with (tmp_path / "server_events.go").open("r", encoding="utf-8") as fd: + content = fd.read() + + # Check for pubsub imports + assert "cloud.google.com/go/pubsub" in content + assert "sync/atomic" in content + assert "context" in content + + # Check for GleanEventsPublisher struct (not GleanEventsLogger) + assert "type 
GleanEventsPublisher struct" in content + assert "type GleanEventsLogger struct" not in content + + # Check for pubsub-specific methods + assert "func NewGleanEventsPublisher" in content + assert "func (g *GleanEventsPublisher) publish(" in content + assert "func (g *GleanEventsPublisher) Stats()" in content + assert "func (g *GleanEventsPublisher) Flush()" in content + assert "func (g *GleanEventsPublisher) Close()" in content + + # Check for Stats type + assert "type Stats struct" in content + + # Check there's no MozLog envelope + assert "type logEnvelope struct" not in content + assert "gleanEventMozlogType" not in content + + # Check Record methods use pointer receiver + assert "func (g *GleanEventsPublisher) RecordEventsPing(" in content + + +def test_parser_go_server_logging_backward_compat(tmp_path): + """Test that default go_server outputter still generates logging code""" + translate.translate( + ROOT / "data" / "go_server_events_only_metrics.yaml", + "go_server", + tmp_path, + ) + + with (tmp_path / "server_events.go").open("r", encoding="utf-8") as fd: + content = fd.read() + + # Check for logging-specific code + assert "type GleanEventsLogger struct" in content + assert "type GleanEventsPublisher struct" not in content + assert "io.Writer" in content + assert "type logEnvelope struct" in content + assert "gleanEventMozlogType" in content + + # Check no pubsub imports + assert "cloud.google.com/go/pubsub" not in content + assert "sync/atomic" not in content + + # Check Record methods use value receiver + assert "func (g GleanEventsLogger) RecordEventsPing(" in content + + # Check SDK build string does not include (pubsub) + assert f"glean_parser v{glean_parser.__version__} (pubsub)" not in content From 79c54b9edfda76a526e29446ee7e60195228644b Mon Sep 17 00:00:00 2001 From: Arkadiusz Komarzewski Date: Thu, 6 Nov 2025 17:40:14 +0100 Subject: [PATCH 2/2] update template - add prometheus telemetry --- glean_parser/templates/go_server.jinja2 | 79 
++++++++++++++----------- tests/test_go_server.py | 20 +++++-- 2 files changed, 57 insertions(+), 42 deletions(-) diff --git a/glean_parser/templates/go_server.jinja2 b/glean_parser/templates/go_server.jinja2 index 59f6c054c..278ae7095 100644 --- a/glean_parser/templates/go_server.jinja2 +++ b/glean_parser/templates/go_server.jinja2 @@ -15,11 +15,12 @@ import ( "fmt" "log" "sync" - "sync/atomic" "time" "cloud.google.com/go/pubsub" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" {% else %} "encoding/json" "errors" @@ -47,6 +48,23 @@ type GleanEventsLogger struct { Writer io.Writer // Writer to output to. Normal operation expects os.Stdout } {% else %} +// Prometheus metrics for monitoring publishing to Pub/Sub +var ( + gleanPublishTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "glean_pubsub_publish_total", + Help: "Total number of Glean events published to Pub/Sub by status (success/error)", + }, + []string{"app_id", "document_type", "status"}, + ) +) + +// publishRequest tracks metadata for async publish operations +type publishRequest struct { + result *pubsub.PublishResult + documentType string +} + // GleanEventsPublisher publishes Glean events directly to Pub/Sub type GleanEventsPublisher struct { AppID string // Application ID to identify application per Glean standards @@ -55,9 +73,7 @@ type GleanEventsPublisher struct { topic *pubsub.Topic ctx context.Context - publishedCount atomic.Int64 - errorCount atomic.Int64 - pendingResults chan *pubsub.PublishResult + pendingResults chan *publishRequest wg sync.WaitGroup once sync.Once } @@ -183,16 +199,9 @@ func (g GleanEventsLogger) createPing(documentType string, config RequestInfo, p } {% if transport == "pubsub" %} -// Stats holds publishing statistics -type Stats struct { - Published int64 - Errors int64 -} - // NewGleanEventsPublisher creates a new Pub/Sub-based Glean events publisher // Authentication 
uses Application Default Credentials (ADC) - typically GKE Workload Identity func NewGleanEventsPublisher(ctx context.Context, projectID, topicID, appID, appDisplayVersion, appChannel string) (*GleanEventsPublisher, error) { - // Create Pub/Sub client using Application Default Credentials client, err := pubsub.NewClient(ctx, projectID) if err != nil { return nil, fmt.Errorf("failed to create pubsub client: %w", err) @@ -200,18 +209,16 @@ func NewGleanEventsPublisher(ctx context.Context, projectID, topicID, appID, app topic := client.Topic(topicID) - // Configure publish settings following Pub/Sub best practices for high throughput - // https://cloud.google.com/pubsub/docs/publish-best-practices topic.PublishSettings = pubsub.PublishSettings{ CountThreshold: 1000, ByteThreshold: 10e6, - DelayThreshold: 100 * time.Millisecond, + DelayThreshold: 10 * time.Millisecond, // Flow control prevents memory exhaustion when producing faster than publishing FlowControlSettings: pubsub.FlowControlSettings{ MaxOutstandingMessages: 100000, MaxOutstandingBytes: 200e6, - LimitExceededBehavior: pubsub.FlowControlBlock, // Block Publish() calls when limit exceeded (backpressure) + LimitExceededBehavior: pubsub.FlowControlBlock, }, } @@ -221,7 +228,7 @@ func NewGleanEventsPublisher(ctx context.Context, projectID, topicID, appID, app AppChannel: appChannel, topic: topic, ctx: ctx, - pendingResults: make(chan *pubsub.PublishResult, 10000), // Buffer for pending results + pendingResults: make(chan *publishRequest, 10000), } // Start background goroutine to process publish results asynchronously @@ -235,16 +242,20 @@ func NewGleanEventsPublisher(ctx context.Context, projectID, topicID, appID, app func (g *GleanEventsPublisher) processResults() { defer g.wg.Done() - for result := range g.pendingResults { - // Get() blocks until the publish completes or fails - _, err := result.Get(g.ctx) + for req := range g.pendingResults { + _, err := req.result.Get(g.ctx) + + status := "success" if err 
!= nil { - g.errorCount.Add(1) - // Log error but don't block on I/O + status = "error" log.Printf("Pub/Sub publish error: %v", err) - } else { - g.publishedCount.Add(1) } + + gleanPublishTotal.WithLabelValues( + g.AppID, + req.documentType, + status, + ).Inc() } } @@ -280,9 +291,15 @@ func (g *GleanEventsPublisher) publish( Data: envelopeJSON, }) - // Send result to background processor (non-blocking if channel has space) + // Create publish request with metadata for metrics tracking + req := &publishRequest{ + result: result, + documentType: documentType, + } + + // Send request to background processor (non-blocking if channel has space) select { - case g.pendingResults <- result: + case g.pendingResults <- req: // Successfully queued return nil case <-g.ctx.Done(): @@ -291,14 +308,6 @@ func (g *GleanEventsPublisher) publish( } } -// Stats returns current publishing statistics -func (g *GleanEventsPublisher) Stats() Stats { - return Stats{ - Published: g.publishedCount.Load(), - Errors: g.errorCount.Load(), - } -} - // Flush waits for all pending publish operations to complete // Call this before shutdown to ensure no messages are lost func (g *GleanEventsPublisher) Flush() { @@ -314,9 +323,7 @@ func (g *GleanEventsPublisher) Flush() { // Wait for background processor to finish g.wg.Wait() - - stats := g.Stats() - log.Printf("Flush complete. 
Published: %d, Errors: %d", stats.Published, stats.Errors) + log.Println("All pending publishes flushed.") } // Close performs graceful shutdown, flushing all pending messages diff --git a/tests/test_go_server.py b/tests/test_go_server.py index 101e74718..a76c4fe2f 100644 --- a/tests/test_go_server.py +++ b/tests/test_go_server.py @@ -415,22 +415,30 @@ def test_parser_go_server_pubsub_generation(tmp_path): # Check for pubsub imports assert "cloud.google.com/go/pubsub" in content - assert "sync/atomic" in content assert "context" in content + # Check for Prometheus imports (not atomic) + assert "github.com/prometheus/client_golang/prometheus" in content + assert "github.com/prometheus/client_golang/prometheus/promauto" in content + assert "sync/atomic" not in content + # Check for GleanEventsPublisher struct (not GleanEventsLogger) assert "type GleanEventsPublisher struct" in content assert "type GleanEventsLogger struct" not in content + # Check for Prometheus metrics + assert "gleanPublishTotal" in content + assert "type publishRequest struct" in content + # Check for pubsub-specific methods assert "func NewGleanEventsPublisher" in content assert "func (g *GleanEventsPublisher) publish(" in content - assert "func (g *GleanEventsPublisher) Stats()" in content assert "func (g *GleanEventsPublisher) Flush()" in content assert "func (g *GleanEventsPublisher) Close()" in content - # Check for Stats type - assert "type Stats struct" in content + # Check Stats methods are removed + assert "func (g *GleanEventsPublisher) Stats()" not in content + assert "type Stats struct" not in content # Check there's no MozLog envelope assert "type logEnvelope struct" not in content @@ -458,9 +466,9 @@ def test_parser_go_server_logging_backward_compat(tmp_path): assert "type logEnvelope struct" in content assert "gleanEventMozlogType" in content - # Check no pubsub imports + # Check no pubsub or Prometheus imports assert "cloud.google.com/go/pubsub" not in content - assert 
"sync/atomic" not in content + assert "prometheus" not in content # Check Record methods use value receiver assert "func (g GleanEventsLogger) RecordEventsPing(" in content