From 6a6dde41111d2a69985a027bbcc70babfc3532c9 Mon Sep 17 00:00:00 2001 From: "a.okhovat" Date: Sun, 14 Jun 2026 10:36:44 +0330 Subject: [PATCH] tracing/opentracing: add TraceProducer and TraceConsumer with tests --- tracing/opentracing/endpoint.go | 50 ++++++++-- tracing/opentracing/endpoint_options.go | 13 +++ tracing/opentracing/endpoint_test.go | 126 ++++++++++++++++++++++++ 3 files changed, 183 insertions(+), 6 deletions(-) diff --git a/tracing/opentracing/endpoint.go b/tracing/opentracing/endpoint.go index 4df1ef264..3cc10b98d 100644 --- a/tracing/opentracing/endpoint.go +++ b/tracing/opentracing/endpoint.go @@ -19,7 +19,8 @@ import ( // If `ctx` doesn't yet have a Span, the new one is created. func TraceEndpoint(tracer opentracing.Tracer, operationName string, opts ...EndpointOption) endpoint.Middleware { cfg := &EndpointOptions{ - Tags: make(opentracing.Tags), + Tags: make(opentracing.Tags), + SpanReferenceType: opentracing.ChildOfRef, } for _, opt := range opts { @@ -35,11 +36,22 @@ func TraceEndpoint(tracer opentracing.Tracer, operationName string, opts ...Endp } var span opentracing.Span + if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { - span = tracer.StartSpan( - operationName, - opentracing.ChildOf(parentSpan.Context()), - ) + switch cfg.SpanReferenceType { + case opentracing.FollowsFromRef: + span = tracer.StartSpan( + operationName, + opentracing.FollowsFrom(parentSpan.Context()), + ) + case opentracing.ChildOfRef: + fallthrough + default: + span = tracer.StartSpan( + operationName, + opentracing.ChildOf(parentSpan.Context()), + ) + } } else { span = tracer.StartSpan(operationName) } @@ -98,7 +110,7 @@ func TraceEndpoint(tracer opentracing.Tracer, operationName string, opts ...Endp } // TraceServer returns a Middleware that wraps the `next` Endpoint in an -// OpenTracing Span called `operationName` with server span.kind tag.. +// OpenTracing Span called `operationName` with server span.kind tag. func TraceServer(tracer opentracing.Tracer, operationName string, opts ...EndpointOption) endpoint.Middleware { opts = append(opts, WithTags(map[string]interface{}{ otext.SpanKindRPCServer.Key: otext.SpanKindRPCServer.Value, @@ -117,6 +129,32 @@ func TraceClient(tracer opentracing.Tracer, operationName string, opts ...Endpoi return TraceEndpoint(tracer, operationName, opts...) } +// TraceProducer returns a Middleware that wraps the `next` Endpoint in an +// OpenTracing Span called `operationName` with producer span.kind tag. +func TraceProducer(tracer opentracing.Tracer, operationName string, opts ...EndpointOption) endpoint.Middleware { + opts = append(opts, + WithTags(map[string]interface{}{ + otext.SpanKindProducer.Key: otext.SpanKindProducer.Value, + }), + WithSpanReferenceType(opentracing.FollowsFromRef), + ) + + return TraceEndpoint(tracer, operationName, opts...) +} + +// TraceConsumer returns a Middleware that wraps the `next` Endpoint in an +// OpenTracing Span called `operationName` with consumer span.kind tag. +func TraceConsumer(tracer opentracing.Tracer, operationName string, opts ...EndpointOption) endpoint.Middleware { + opts = append(opts, + WithTags(map[string]interface{}{ + otext.SpanKindConsumer.Key: otext.SpanKindConsumer.Value, + }), + WithSpanReferenceType(opentracing.FollowsFromRef), + ) + + return TraceEndpoint(tracer, operationName, opts...) +} + func applyTags(span opentracing.Span, tags opentracing.Tags) { for key, value := range tags { span.SetTag(key, value) diff --git a/tracing/opentracing/endpoint_options.go b/tracing/opentracing/endpoint_options.go index 6854271af..4e742a976 100644 --- a/tracing/opentracing/endpoint_options.go +++ b/tracing/opentracing/endpoint_options.go @@ -25,6 +25,10 @@ type EndpointOptions struct { // GetTags is an optional function that can extract tags // from the context and add them to the span. GetTags func(ctx context.Context) opentracing.Tags + + // SpanReferenceType defines the reference type used when creating a child span. + // Defaults to opentracing.ChildOf. Use opentracing.FollowsFrom for async operations. + SpanReferenceType opentracing.SpanReferenceType } // EndpointOption allows for functional options to endpoint tracing middleware. @@ -72,3 +76,12 @@ func WithTagsFunc(getTags func(ctx context.Context) opentracing.Tags) EndpointOp o.GetTags = getTags } } + +// WithSpanReferenceType sets the reference type for span relationships +// Use opentracing.ChildOf for synchronous operations (default) +// Use opentracing.FollowsFrom for asynchronous operations (producer/consumer) +func WithSpanReferenceType(spanReferenceType opentracing.SpanReferenceType) EndpointOption { + return func(o *EndpointOptions) { + o.SpanReferenceType = spanReferenceType + } +} diff --git a/tracing/opentracing/endpoint_test.go b/tracing/opentracing/endpoint_test.go index 267415b1f..5e7e3828e 100644 --- a/tracing/opentracing/endpoint_test.go +++ b/tracing/opentracing/endpoint_test.go @@ -403,3 +403,129 @@ func TestTraceClient(t *testing.T) { t.Fatalf("Want %q, have %q", want, have) } } + +func TestTraceProducer(t *testing.T) { + tracer := mocktracer.New() + + // Empty/background context. + tracedEndpoint := kitot.TraceProducer(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil { + t.Fatal(err) + } + + // tracedEndpoint created a new Span. + finishedSpans := tracer.FinishedSpans() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + span := finishedSpans[0] + + if want, have := "testOp", span.OperationName; want != have { + t.Fatalf("Want %q, have %q", want, have) + } + + if want, have := map[string]interface{}{ + otext.SpanKindProducer.Key: otext.SpanKindProducer.Value, + }, span.Tags(); fmt.Sprint(want) != fmt.Sprint(have) { + t.Fatalf("Want %q, have %q", want, have) + } +} + +func TestTraceConsumer(t *testing.T) { + tracer := mocktracer.New() + + // Empty/background context. + tracedEndpoint := kitot.TraceConsumer(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil { + t.Fatal(err) + } + + // tracedEndpoint created a new Span. + finishedSpans := tracer.FinishedSpans() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + span := finishedSpans[0] + + if want, have := "testOp", span.OperationName; want != have { + t.Fatalf("Want %q, have %q", want, have) + } + + if want, have := map[string]interface{}{ + otext.SpanKindConsumer.Key: otext.SpanKindConsumer.Value, + }, span.Tags(); fmt.Sprint(want) != fmt.Sprint(have) { + t.Fatalf("Want %q, have %q", want, have) + } +} + +func TestTraceProducerFollowsFromRef(t *testing.T) { + tracer := mocktracer.New() + + // Initialize the ctx with a parent Span. + parentSpan := tracer.StartSpan("parent").(*mocktracer.MockSpan) + defer parentSpan.Finish() + ctx := opentracing.ContextWithSpan(context.Background(), parentSpan) + + tracedEndpoint := kitot.TraceProducer(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { + t.Fatal(err) + } + + // tracedEndpoint created a new Span. + finishedSpans := tracer.FinishedSpans() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + endpointSpan := finishedSpans[0] + if want, have := "testOp", endpointSpan.OperationName; want != have { + t.Fatalf("Want %q, have %q", want, have) + } + + if want, have := map[string]interface{}{ + otext.SpanKindProducer.Key: otext.SpanKindProducer.Value, + }, endpointSpan.Tags(); fmt.Sprint(want) != fmt.Sprint(have) { + t.Fatalf("Want %q, have %q", want, have) + } + + if want, have := parentSpan.SpanContext.SpanID, endpointSpan.ParentID; want != have { + t.Errorf("Want ParentID %q, have %q", want, have) + } +} + +func TestTraceConsumerFollowsFromRef(t *testing.T) { + tracer := mocktracer.New() + + // Initialize the ctx with a parent Span. + parentSpan := tracer.StartSpan("parent").(*mocktracer.MockSpan) + defer parentSpan.Finish() + ctx := opentracing.ContextWithSpan(context.Background(), parentSpan) + + tracedEndpoint := kitot.TraceConsumer(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { + t.Fatal(err) + } + + // tracedEndpoint created a new Span. + finishedSpans := tracer.FinishedSpans() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + endpointSpan := finishedSpans[0] + if want, have := "testOp", endpointSpan.OperationName; want != have { + t.Fatalf("Want %q, have %q", want, have) + } + + if want, have := map[string]interface{}{ + otext.SpanKindConsumer.Key: otext.SpanKindConsumer.Value, + }, endpointSpan.Tags(); fmt.Sprint(want) != fmt.Sprint(have) { + t.Fatalf("Want %q, have %q", want, have) + } + + if want, have := parentSpan.SpanContext.SpanID, endpointSpan.ParentID; want != have { + t.Errorf("Want ParentID %q, have %q", want, have) + } +}