Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 44 additions & 5 deletions glean_parser/go_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
generates does not use the Glean SDK. It is meant to be used to collect events
in server-side environments. In these environments SDK assumptions to measurement
window and connectivity don't hold.

Generated code takes care of assembling pings with metrics, and serializing to messages
conforming to Glean schema.
conforming to Glean schema. Two transport modes are supported:
- Cloud Logging (go_server): Logs to stdout in MozLog format for ingestion via GCP log routing
- Pub/Sub (go_server_pubsub): Publishes directly to GCP Pub/Sub topics

Warning: this outputter supports limited set of metrics,
see `SUPPORTED_METRIC_TYPES` below.

Generated code creates two methods for each ping (`RecordPingX` and `RecordPingXWithoutUserInfo`)
that are used for submitting (logging) them.
If pings have `event` metrics assigned, they can be passed to these methods.
that are used for submitting events. If pings have `event` metrics assigned, they can be
passed to these methods.
"""

from collections import defaultdict
Expand Down Expand Up @@ -79,7 +82,10 @@ def clean_string(s: str) -> str:


def output_go(
objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]]
objs: metrics.ObjectTree,
output_dir: Path,
options: Optional[Dict[str, Any]],
transport: str = "logging",
) -> None:
"""
Given a tree of objects, output Go code to `output_dir`.
Expand All @@ -90,6 +96,8 @@ def output_go(
:param objects: A tree of objects (metrics and pings) as returned from
`parser.parse_objects`.
:param output_dir: Path to an output directory to write to.
:param transport: Transport mode - either "logging" (Cloud Logging) or
"pubsub" (Pub/Sub direct publishing). Default is "logging".
"""

template = util.get_jinja2_template(
Expand Down Expand Up @@ -147,6 +155,37 @@ def output_go(
with filepath.open("w", encoding="utf-8") as fd:
fd.write(
template.render(
parser_version=__version__, pings=ping_to_metrics, events=event_metrics
parser_version=__version__,
pings=ping_to_metrics,
events=event_metrics,
transport=transport,
)
)


def output_go_logger(
objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] = None
) -> None:
"""
Given a tree of objects, output Go code using Cloud Logging transport.

:param objects: A tree of objects (metrics and pings) as returned from
`parser.parse_objects`.
:param output_dir: Path to an output directory to write to.
:param options: options dictionary (currently unused for Go).
"""
output_go(objs, output_dir, options, transport="logging")


def output_go_pubsub(
objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] = None
) -> None:
"""
Given a tree of objects, output Go code using Pub/Sub transport.

:param objects: A tree of objects (metrics and pings) as returned from
`parser.parse_objects`.
:param output_dir: Path to an output directory to write to.
:param options: options dictionary (currently unused for Go).
"""
output_go(objs, output_dir, options, transport="pubsub")
203 changes: 203 additions & 0 deletions glean_parser/templates/go_server.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,19 @@ package glean

// required imports
import (
{% if transport == "pubsub" %}
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"

"cloud.google.com/go/pubsub"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
{% else %}
"encoding/json"
"errors"
"fmt"
Expand All @@ -17,8 +30,10 @@ import (
"time"

"github.com/google/uuid"
{% endif %}
)

{% if transport == "logging" %}
// log type string used to identify logs to process in the Moz Data Pipeline
var gleanEventMozlogType string = "glean-server-event"

Expand All @@ -32,6 +47,37 @@ type GleanEventsLogger struct {
AppChannel string // Channel to differentiate logs from prod/beta/staging/devel
Writer io.Writer // Writer to output to. Normal operation expects os.Stdout
}
{% else %}
// Prometheus metrics for monitoring publishing to Pub/Sub
var (
gleanPublishTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "glean_pubsub_publish_total",
Help: "Total number of Glean events published to Pub/Sub by status (success/error)",
},
[]string{"app_id", "document_type", "status"},
)
)

// publishRequest tracks metadata for async publish operations
type publishRequest struct {
result *pubsub.PublishResult
documentType string
}

// GleanEventsPublisher publishes Glean events directly to Pub/Sub
type GleanEventsPublisher struct {
AppID string // Application ID to identify application per Glean standards
AppDisplayVersion string // Version of application emitting the event
AppChannel string // Channel to differentiate logs from prod/beta/staging/devel

topic *pubsub.Topic
ctx context.Context
pendingResults chan *publishRequest
wg sync.WaitGroup
once sync.Once
}
{% endif %}

// exported type for public method parameters
type RequestInfo struct {
Expand Down Expand Up @@ -89,14 +135,20 @@ type gleanEvent struct {
Extra map[string]string `json:"extra"`
}

{% if transport == "logging" %}
type logEnvelope struct {
Timestamp string
Logger string
Type string
Fields ping
}
{% endif %}

{% if transport == "pubsub" %}
func (g *GleanEventsPublisher) createClientInfo() clientInfo {
{% else %}
func (g GleanEventsLogger) createClientInfo() clientInfo {
{% endif %}
// Fields with default values are required in the Glean schema, but not used in server context
return clientInfo{
TelemetrySDKBuild: "glean_parser v{{ parser_version }}",
Expand All @@ -120,7 +172,11 @@ func createPingInfo() pingInfo {
}
}

{% if transport == "pubsub" %}
func (g *GleanEventsPublisher) createPing(documentType string, config RequestInfo, payload pingPayload) (ping, error) {
{% else %}
func (g GleanEventsLogger) createPing(documentType string, config RequestInfo, payload pingPayload) (ping, error) {
{% endif %}
payloadJson, err := json.Marshal(payload)
if err != nil {
return ping{}, err
Expand All @@ -142,6 +198,140 @@ func (g GleanEventsLogger) createPing(documentType string, config RequestInfo, p
}, nil
}

{% if transport == "pubsub" %}
// NewGleanEventsPublisher creates a new Pub/Sub-based Glean events publisher
// Authentication uses Application Default Credentials (ADC) - typically GKE Workload Identity
func NewGleanEventsPublisher(ctx context.Context, projectID, topicID, appID, appDisplayVersion, appChannel string) (*GleanEventsPublisher, error) {
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return nil, fmt.Errorf("failed to create pubsub client: %w", err)
}

topic := client.Topic(topicID)

topic.PublishSettings = pubsub.PublishSettings{
CountThreshold: 1000,
ByteThreshold: 10e6,
DelayThreshold: 10 * time.Millisecond,

// Flow control prevents memory exhaustion when producing faster than publishing
FlowControlSettings: pubsub.FlowControlSettings{
MaxOutstandingMessages: 100000,
MaxOutstandingBytes: 200e6,
LimitExceededBehavior: pubsub.FlowControlBlock,
},
}

publisher := &GleanEventsPublisher{
AppID: appID,
AppDisplayVersion: appDisplayVersion,
AppChannel: appChannel,
topic: topic,
ctx: ctx,
pendingResults: make(chan *publishRequest, 10000),
}

// Start background goroutine to process publish results asynchronously
publisher.wg.Add(1)
go publisher.processResults()

return publisher, nil
}

// processResults runs in background to handle publish results asynchronously
func (g *GleanEventsPublisher) processResults() {
defer g.wg.Done()

for req := range g.pendingResults {
_, err := req.result.Get(g.ctx)

status := "success"
if err != nil {
status = "error"
log.Printf("Pub/Sub publish error: %v", err)
}

gleanPublishTotal.WithLabelValues(
g.AppID,
req.documentType,
status,
).Inc()
}
}

// construct the Glean envelope and publish to Pub/Sub asynchronously
func (g *GleanEventsPublisher) publish(
documentType string,
requestInfo RequestInfo,
metrics metrics,
events []gleanEvent,
) error {
var telemetryPayload = pingPayload{
ClientInfo: g.createClientInfo(),
PingInfo: createPingInfo(),
Metrics: metrics,
Events: events,
}

ping, err := g.createPing(documentType, requestInfo, telemetryPayload)
if err != nil {
return fmt.Errorf("failed to create ping: %w", err)
}

var envelope = ping

envelopeJSON, err := json.Marshal(envelope)
if err != nil {
return fmt.Errorf("failed to marshal envelope: %w", err)
}

// Publish message asynchronously
// Publish() returns immediately with a PublishResult future
result := g.topic.Publish(g.ctx, &pubsub.Message{
Data: envelopeJSON,
})

// Create publish request with metadata for metrics tracking
req := &publishRequest{
result: result,
documentType: documentType,
}

// Send request to background processor (non-blocking if channel has space)
select {
case g.pendingResults <- req:
// Successfully queued
return nil
case <-g.ctx.Done():
// Context cancelled
return g.ctx.Err()
}
}

// Flush waits for all pending publish operations to complete
// Call this before shutdown to ensure no messages are lost
func (g *GleanEventsPublisher) Flush() {
log.Println("Flushing pending publishes...")

// Stop accepting new messages and flush pending batches
g.topic.Stop()

// Close results channel to signal background processor to finish
g.once.Do(func() {
close(g.pendingResults)
})

// Wait for background processor to finish
g.wg.Wait()
log.Println("All pending publishes flushed.")
}

// Close performs graceful shutdown, flushing all pending messages
func (g *GleanEventsPublisher) Close() error {
g.Flush()
return nil
}
{% else %}
// method called by each ping-specific record method.
// construct the ping, wrap it in the envelope, and print to stdout
func (g GleanEventsLogger) record(
Expand Down Expand Up @@ -180,6 +370,7 @@ func (g GleanEventsLogger) record(
fmt.Fprintln(g.Writer, string(envelopeJson))
return nil
}
{% endif %}
{# if any ping has an event metric, create methods and types for them #}
{% if events %}

Expand Down Expand Up @@ -250,7 +441,11 @@ type {{ ping|ping_type_name }} struct {
}

// Record and submit `{{ ping }}` ping
{% if transport == "pubsub" %}
func (g *GleanEventsPublisher) Record{{ ping|ping_type_name }}(
{% else %}
func (g GleanEventsLogger) Record{{ ping|ping_type_name }}(
{% endif %}
requestInfo RequestInfo,
params {{ ping|ping_type_name }},
) error {
Expand All @@ -276,11 +471,19 @@ func (g GleanEventsLogger) Record{{ ping|ping_type_name }}(
events = append(events, params.Event.gleanEvent())
}
{% endif %}
{% if transport == "pubsub" %}
return g.publish("{{ ping }}", requestInfo, metrics, events)
{% else %}
return g.record("{{ ping }}", requestInfo, metrics, events)
{% endif %}
}

// Record and submit `{{ ping }}` ping omitting user request info
{% if transport == "pubsub" %}
func (g *GleanEventsPublisher) Record{{ ping|ping_type_name}}WithoutUserInfo(
{% else %}
func (g GleanEventsLogger) Record{{ ping|ping_type_name}}WithoutUserInfo(
{% endif %}
params {{ ping|ping_type_name}},
) error {
return g.Record{{ ping|ping_type_name }}(defaultRequestInfo, params)
Expand Down
3 changes: 2 additions & 1 deletion glean_parser/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def __init__(


OUTPUTTERS = {
"go_server": Outputter(go_server.output_go, []),
"go_server": Outputter(go_server.output_go_logger, []),
"go_server_pubsub": Outputter(go_server.output_go_pubsub, []),
"javascript": Outputter(javascript.output_javascript, []),
"typescript": Outputter(javascript.output_typescript, []),
"javascript_server": Outputter(javascript_server.output_javascript, []),
Expand Down
Loading