diff --git a/.gitignore b/.gitignore index 220d374d..c26604f2 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ *.dll *.so *.dylib +/bktec # Test binary, built with `go test -c` *.test diff --git a/go.mod b/go.mod index 03c0406c..a3f349af 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/buildkite/test-engine-client -go 1.21 +go 1.23.0 -toolchain go1.22.4 +toolchain go1.24.0 require ( github.com/buildkite/roko v1.3.1 @@ -11,18 +11,22 @@ require ( require ( drjosh.dev/zzglob v0.4.0 + github.com/buildbarn/bb-portal v0.0.0-20250220144241-94f72e8e190c + github.com/google/uuid v1.6.0 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/olekukonko/tablewriter v0.0.5 github.com/pact-foundation/pact-go/v2 v2.0.10 + golang.org/x/net v0.35.0 golang.org/x/sys v0.30.0 + google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb + google.golang.org/grpc v1.70.0 + google.golang.org/protobuf v1.36.5 ) require ( github.com/hashicorp/logutils v1.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect - golang.org/x/net v0.33.0 // indirect - golang.org/x/text v0.21.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect - google.golang.org/grpc v1.67.3 // indirect - google.golang.org/protobuf v1.36.3 // indirect + golang.org/x/text v0.22.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e // indirect ) diff --git a/go.sum b/go.sum index 3ee73105..1eebf23f 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,21 @@ drjosh.dev/zzglob v0.4.0 h1:gOb46aIHyHG8BlYpvZZM4dqR2dpsbKtI82IbYVAYIj4= drjosh.dev/zzglob v0.4.0/go.mod h1:c3V3WPyfG+81h/bNOalEaba0jEQl16i9efSAmWOeOw8= +github.com/buildbarn/bb-portal v0.0.0-20250220144241-94f72e8e190c h1:qLnyVD+ND7Ll3p9Lw0Z7Vk5HirKRZcBRJzHELYe5Z84= +github.com/buildbarn/bb-portal v0.0.0-20250220144241-94f72e8e190c/go.mod h1:GHZ5lGzUtz9LQ2oHt8EweXn0zS8t2sCD9bNBw9R9s8E= github.com/buildkite/roko v1.3.1 h1:t7K30ceLLYn6k7hQP4oq1c7dVlhgD5nRcuSRDEEnY1s= github.com/buildkite/roko v1.3.1/go.mod h1:23R9e6nHxgedznkwwfmqZ6+0VJZJZ2Sg/uVcp2cP46I= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= @@ -20,18 +30,32 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= -google.golang.org/grpc v1.67.3 h1:OgPcDAFKHnH8X3O4WcO4XUc8GRDeKsKReqbQtiCj7N8= -google.golang.org/grpc v1.67.3/go.mod h1:YGaHCc6Oap+FzBJTZLBzkGSYt/cvGPFTPxkn7QfSU8s= -google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= -google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb h1:ITgPrl429bc6+2ZraNSzMDk3I95nmQln2fuPstKwFDE= +google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:sAo5UzpjUwgFBCzupwhcLcxHVDK7vG5IqI30YnwX2eE= +google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e h1:nsxey/MfoGzYNduN0NN/+hqP9iiCIYsrVbXb/8hjFM8= +google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e/go.mod h1:Xsh8gBVxGCcbV8ZeTB9wI5XPyZ5RvC6V3CTeeplHbiA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e h1:YA5lmSs3zc/5w+xsRcHqpETkaYyK63ivEPzNTcUUlSA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= diff --git a/internal/bes/bes.go b/internal/bes/bes.go new file mode 100644 index 00000000..6d247263 --- /dev/null +++ b/internal/bes/bes.go @@ -0,0 +1,151 @@ +// Package bes implements a Bazel Build Event Service gRPC listener: +// https://bazel.build/remote/bep#build-event-service +// It listens for TestResult events, and uploads their XML report to Test +// Engine. +package bes + +import ( + "context" + "fmt" + "io" + "sort" + + slog "github.com/buildkite/test-engine-client/internal/bes/quietslog" + + "google.golang.org/genproto/googleapis/devtools/build/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/emptypb" +) + +type BuildEventServer struct { + handler *BuildEventHandler +} + +// PublishLifecycleEvent is copied verbatim from: +// https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go +// +// PublishLifecycleEvent handles life cycle events. +func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *build.PublishLifecycleEventRequest) (*emptypb.Empty, error) { + slog.InfoContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent())) + return &emptypb.Empty{}, nil +} + +// PublishBuildToolEventStream is copied verbatim from: +// https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go +// The BuildEventHandler and BuildEventChannel that it passes events to mimicks +// the expected interfaces, but provide a bktec-specific implementation. +// +// PublishBuildToolEventStream handles a build tool event stream. +// bktec thanks buildbarn/bb-portal for the basis of this :D +func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error { + slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context()) + + // List of SequenceIds we've received. + // We'll want to ack these once all events are received, as we don't support resumption. + seqNrs := make([]int64, 0) + + ack := func(streamID *build.StreamId, sequenceNumber int64, isClosing bool) { + if err := stream.Send(&build.PublishBuildToolEventStreamResponse{ + StreamId: streamID, + SequenceNumber: sequenceNumber, + }); err != nil { + + // with the option --bes_upload_mode=fully_async or nowait_for_upload_complete + // its not an error when the send fails. the bes gracefully terminated the close + // i.e. sent an EOF. for long running builds that take a while to save to the db (> 1s) + // the context is processed in the background, so by the time we are acknowledging these + // requests, the client connection may have already timed out and these errors can be + // safely ignored + grpcErr := status.Convert(err) + if isClosing && + grpcErr.Code() == codes.Unavailable && + grpcErr.Message() == "transport is closing" { + return + } + + slog.ErrorContext( + stream.Context(), + "Send failed", + "err", + err, + "streamid", + streamID, + "sequenceNumber", + sequenceNumber, + ) + } + } + + var streamID *build.StreamId + reqCh := make(chan *build.PublishBuildToolEventStreamRequest) + errCh := make(chan error) + var eventCh BuildEventChannel + + go func() { + for { + req, err := stream.Recv() + if err != nil { + errCh <- err + return + } + reqCh <- req + } + }() + + for { + select { + case err := <-errCh: + if err == io.EOF { + slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context()) + + if eventCh == nil { + slog.WarnContext(stream.Context(), "No event channel found for stream event", "event", stream.Context()) + return nil + } + + // Validate that all events were received + sort.Slice(seqNrs, func(i, j int) bool { return seqNrs[i] < seqNrs[j] }) + + // TODO: Find out if initial sequence number can be != 1 + expected := int64(1) + for _, seqNr := range seqNrs { + if seqNr != expected { + return status.Error(codes.Unknown, fmt.Sprintf("received unexpected sequence number %d, expected %d", seqNr, expected)) + } + expected++ + } + + err := eventCh.Finalize() + if err != nil { + return err + } + + // Ack all events + for _, seqNr := range seqNrs { + ack(streamID, seqNr, true) + } + + return nil + } + + slog.ErrorContext(stream.Context(), "Recv failed", "err", err) + return err + + case req := <-reqCh: + // First event + if streamID == nil { + streamID = req.OrderedBuildEvent.GetStreamId() + eventCh = s.handler.CreateEventChannel(stream.Context(), req.OrderedBuildEvent) + } + + seqNrs = append(seqNrs, req.OrderedBuildEvent.GetSequenceNumber()) + + if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil { + slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err) + return err + } + } + } +} diff --git a/internal/bes/bes_test.go b/internal/bes/bes_test.go new file mode 100644 index 00000000..57495e2a --- /dev/null +++ b/internal/bes/bes_test.go @@ -0,0 +1,14 @@ +package bes + +import "testing" + +func TestPathFromURI(t *testing.T) { + path, err := pathFromURI("file:///hello/world.txt") + if err != nil { + t.Errorf("pathFromURI error: %v", err) + } + + if want := "/hello/world.txt"; want != path { + t.Errorf("wanted %v got %v", want, path) + } +} diff --git a/internal/bes/channel.go b/internal/bes/channel.go new file mode 100644 index 00000000..621e6a5e --- /dev/null +++ b/internal/bes/channel.go @@ -0,0 +1,107 @@ +package bes + +import ( + "context" + "fmt" + "log/slog" + "net/url" + "time" + + "github.com/buildbarn/bb-portal/third_party/bazel/gen/bes" + "google.golang.org/genproto/googleapis/devtools/build/v1" +) + +// BuildEventChannel in bktec mimics the bb-portal interface so that the +// BuildEventServer.PublishBuildEventServer code can be used verbatim. +// +// BuildEventChannel handles a single BuildEvent stream +type BuildEventChannel interface { + // HandleBuildEvent processes a single BuildEvent + // This method should be called for each received event. + HandleBuildEvent(event *build.BuildEvent) error + + // Finalize does post-processing of a stream of BuildEvents. + // This method should be called after receiving the EOF event. + Finalize() error +} + +type buildEventChannel struct { + ctx context.Context + streamID *build.StreamId + filenames chan<- string +} + +// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. +func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { + if event.GetBazelEvent() == nil { + return nil + } + var bazelEvent bes.BuildEvent + if err := event.GetBazelEvent().UnmarshalTo(&bazelEvent); err != nil { + slog.ErrorContext(c.ctx, "UnmarshalTo failed", "err", err) + return err + } + + payload := bazelEvent.GetPayload() + if testResult, ok := payload.(*bes.BuildEvent_TestResult); ok { + r := testResult.TestResult + files := []string{} + for _, x := range r.GetTestActionOutput() { + if x.GetName() == "test.xml" { + path, err := pathFromURI(x.GetUri()) + if err != nil { + return err // maybe just a log a warning? + } + files = append(files, path) + c.filenames <- path + } + } + slog.Info("TestResult", + "status", r.GetStatus(), + "cached", r.GetCachedLocally(), + "dur", r.GetTestAttemptDuration().AsDuration().String(), + "files", files, + ) + } + + return nil +} + +func pathFromURI(uri string) (string, error) { + url, err := url.Parse(uri) + if err != nil { + return "", err + } + if url.Scheme != "file" { + return "", fmt.Errorf("expected file://..., got %v://...", url.Scheme) + } + return url.Path, nil +} + +// Finalize implements BuildEventChannel.Finalize. +func (c *buildEventChannel) Finalize() error { + // defer the ctx so its not reaped when the client closes the connection + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24) + defer cancel() + + slog.Info("finalizing build event channel") + _ = ctx + // TODO: finalize anything that needs finalizing? + + cancel() + return nil +} + +// noOpBuildEventChannel is an implementation of BuildEventChannel which does no processing of events. +// It is used when receiving a stream of events that we wish to ack without processing. +type noOpBuildEventChannel struct{} + +// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. +func (c *noOpBuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { + return nil +} + +// Finalize implements BuildEventChannel.Finalize. +func (c *noOpBuildEventChannel) Finalize() error { + return nil +} diff --git a/internal/bes/handler.go b/internal/bes/handler.go new file mode 100644 index 00000000..95134283 --- /dev/null +++ b/internal/bes/handler.go @@ -0,0 +1,39 @@ +package bes + +import ( + "context" + + "google.golang.org/genproto/googleapis/devtools/build/v1" +) + +// BuildEventHandler in bktec mimics the bb-portal handler so that the +// BuildEventServer.PublishBuildToolEventStream code can be used verbatim. +// +// BuildEventHandler orchestrates the handling of incoming Build Event streams. +// For each incoming stream, and BuildEventChannel is created, which handles that stream. +// BuildEventHandler is responsible for managing the things that are common to these event streams. +type BuildEventHandler struct { + filenames chan<- string +} + +// NewBuildEventHandler constructs a new BuildEventHandler +func NewBuildEventHandler(filenames chan<- string) *BuildEventHandler { + return &BuildEventHandler{ + filenames: filenames, + } +} + +// CreateEventChannel creates a new BuildEventChannel +func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, initialEvent *build.OrderedBuildEvent) BuildEventChannel { + // If the first event does not have sequence number 1, we have processed this + // invocation previously, and should skip all processing. + if initialEvent.SequenceNumber != 1 { + return &noOpBuildEventChannel{} + } + + return &buildEventChannel{ + ctx: ctx, + streamID: initialEvent.StreamId, + filenames: h.filenames, + } +} diff --git a/internal/bes/listen.go b/internal/bes/listen.go new file mode 100644 index 00000000..2a74452b --- /dev/null +++ b/internal/bes/listen.go @@ -0,0 +1,101 @@ +package bes + +import ( + "context" + "flag" + "fmt" + "log/slog" + "net" + "os" + "os/signal" + "syscall" + + "github.com/buildkite/test-engine-client/internal/env" + "github.com/buildkite/test-engine-client/internal/upload" + "google.golang.org/genproto/googleapis/devtools/build/v1" + "google.golang.org/grpc" +) + +func ListenCLI(argv []string, env env.Env) error { + flags := flag.NewFlagSet("bktec bazel listen", flag.ExitOnError) + portFlag := flags.Int("port", 0, "gRPC port to listen") + listenHostFlag := flags.String("listen-host", "127.0.0.1", "gRPC host to listen") + debugFlag := flags.Bool("debug", false, "debug logging") + flags.Parse(argv) + + if *debugFlag { + slog.SetLogLoggerLevel(slog.LevelDebug) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // a channel to propagate OS signals + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) + + // configure uploader + cfg, err := upload.ConfigFromEnv(env) + if err != nil { + return fmt.Errorf("uploader configuration: %w", err) + } + runEnv, err := upload.RunEnvFromEnv(env) + if err != nil { + return fmt.Errorf("uploader run_env configuration: %w", err) + } + uploader := NewUploader(cfg, runEnv, "junit") + go uploader.Start(ctx) + + // configure gRPC Bazel BES server + addr := fmt.Sprintf("%s:%d", *listenHostFlag, *portFlag) + listener, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("listening on %s: %w", addr, err) + } + opts := []grpc.ServerOption{} + srv := grpc.NewServer(opts...) + build.RegisterPublishBuildEventServer(srv, BuildEventServer{ + handler: &BuildEventHandler{ + filenames: uploader.Filenames, + }, + }) + slog.Info("Bazel BES listener", "addr", "grpc://"+listener.Addr().String()) + go serve(srv, listener) + + // main loop + run := true + sigCount := 0 + for run { + select { + case url, ok := <-uploader.Responses: + if !ok { + slog.Debug("Response channel closed") + run = false + continue + } + slog.Info("Uploaded", "url", url) + case err := <-uploader.Errs: + slog.Error("Upload error", "error", err) + case sig := <-signals: + sigCount++ + srv.Stop() + if sigCount == 1 { + slog.Info("Stopping (again to force)...", "signal", sig) + uploader.Stop() + } else { + slog.Info("Stopping forcefully...", "signal", sig) + cancel() + } + } + } + + slog.Debug("done") + return nil +} + +func serve(s *grpc.Server, listener net.Listener) { + err := s.Serve(listener) + if err != nil { + slog.Error("gRPC server error", "err", err) + } +} diff --git a/internal/bes/quietslog/quietslog.go b/internal/bes/quietslog/quietslog.go new file mode 100644 index 00000000..4bf11c62 --- /dev/null +++ b/internal/bes/quietslog/quietslog.go @@ -0,0 +1,26 @@ +// Package quietslog provides a replacement for slog which downgrades Info +// messages to Debug instead, so that the log output is quieter. This is done +// specifically so that bes.BuildEventServer.PublishBuildToolEventStream() +// source code can be kept unmodified from the bb-portal upstream it's copied +// from. +package quietslog + +import ( + "context" + "log/slog" +) + +// InfoContext delegates to DebugContext of the real logger, making this logger quiet. +func InfoContext(ctx context.Context, msg string, args ...any) { + slog.DebugContext(ctx, msg, args...) +} + +// WarnContext wraps the direct logger directly. +func WarnContext(ctx context.Context, msg string, args ...any) { + slog.WarnContext(ctx, msg, args...) +} + +// ErrorContext wraps the direct logger directly. +func ErrorContext(ctx context.Context, msg string, args ...any) { + slog.ErrorContext(ctx, msg, args...) +} diff --git a/internal/bes/uploader.go b/internal/bes/uploader.go new file mode 100644 index 00000000..8f7fc5a1 --- /dev/null +++ b/internal/bes/uploader.go @@ -0,0 +1,76 @@ +package bes + +import ( + "context" + "log/slog" + + "github.com/buildkite/test-engine-client/internal/upload" +) + +type Uploader struct { + Config upload.Config + RunEnv upload.RunEnvMap + Format string + Filenames chan string + Responses chan string + Errs chan error + + stopping bool +} + +func NewUploader(cfg upload.Config, runEnv upload.RunEnvMap, format string) *Uploader { + // a channel to pass filenames from BES server to uploader + filenames := make(chan string, 1024) + + // a channel to receive response upload URLs + responses := make(chan string) + + // a channel to receive errors from the uploader + errs := make(chan error) + + return &Uploader{ + Config: cfg, + RunEnv: runEnv, + Format: format, + Filenames: filenames, + Responses: responses, + Errs: errs, + } +} + +func (u *Uploader) Start(ctx context.Context) { + for filename := range u.Filenames { + if ctx.Err() != nil { + slog.Debug("Uploader context canceled") + break + } + resp, err := u.UploadFile(ctx, filename) + if err != nil { + u.Errs <- err + continue + } + u.Responses <- resp["upload_url"] + } + slog.Debug("Uploader finished") + close(u.Responses) +} + +// Stop closes the Filenames channel; filenames already buffered on the channel +// will be uploaded before finishing. +func (u *Uploader) Stop() { + if u.stopping { + slog.Warn("Uploader GracefulStop: already stopping") + return + } + slog.Debug("Uploader GracefulStop") + u.stopping = true + close(u.Filenames) +} + +func (u *Uploader) UploadFile(ctx context.Context, filename string) (map[string]string, error) { + resp, err := upload.Upload(ctx, u.Config, u.RunEnv, u.Format, filename) + if err != nil { + return nil, err + } + return resp, nil +} diff --git a/internal/upload/upload.go b/internal/upload/upload.go new file mode 100644 index 00000000..21f0b2bf --- /dev/null +++ b/internal/upload/upload.go @@ -0,0 +1,245 @@ +package upload + +import ( + "bytes" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "log/slog" + "maps" + "mime/multipart" + "net/http" + "os" + "path/filepath" + + "github.com/buildkite/test-engine-client/internal/env" + "github.com/buildkite/test-engine-client/internal/version" + "github.com/google/uuid" + "golang.org/x/net/context" +) + +type RunEnvMap map[string]string + +// Config is upload-specific configuration, but may also contain configuration +// that is redundant with config.Config, since package upload isn't really +// unified/integrated with the rest of bktec yet. +type Config struct { + // UploadUrl is the Test Engine upload API endpoint e.g. https://analytics-api.buildkite.com/v1/uploads + UploadUrl string + + // SuiteToken is the Test Engine upload API suite authentication token + SuiteToken string +} + +func ConfigFromEnv(env env.Env) (Config, error) { + url := env.Get("BUILDKITE_TEST_ENGINE_UPLOAD_URL") + if url == "" { + url = "https://analytics-api.buildkite.com/v1/uploads" + } + + token := env.Get("BUILDKITE_ANALYTICS_TOKEN") + if token == "" { + return Config{}, fmt.Errorf("BUILDKITE_ANALYTICS_TOKEN missing") + } + + return Config{ + UploadUrl: url, + SuiteToken: token, + }, nil +} + +// UploadCLI is a CLI entrypoint for uploading results to Test Engine. +func UploadCLI(flag *flag.FlagSet, env env.Env) error { + cfg, err := ConfigFromEnv(env) + if err != nil { + return fmt.Errorf("configuration error: %w", err) + } + + filename := flag.Arg(1) + if filename == "" { + return fmt.Errorf("expected path to JUnit XML or JSON file") + } + + info, err := os.Stat(filename) + if err != nil { + return fmt.Errorf("file does not exist: %s", filename) + } else if !info.Mode().IsRegular() { + return fmt.Errorf("not a regular file: %s", filename) + } + + var format string + switch filepath.Ext(filename) { + case ".xml": + format = "junit" + case ".json": + format = "json" + default: + return fmt.Errorf("could not infer format (JUnit / JSON) from filename") + } + + runEnv, err := RunEnvFromEnv(env) + if err != nil { + return fmt.Errorf("unable to derive runEnv: %w", err) + } + + slog.Info("Uploading", "key", runEnv["key"], "format", format, "filename", filename) + + ctx := context.Background() + respData, err := Upload(ctx, cfg, runEnv, format, filename) + if err != nil { + return err + } + + slog.Info("Upload successful", "url", respData["upload_url"]) + + return nil +} + +// Upload sends test result data to Test Engine. +func Upload(ctx context.Context, cfg Config, runEnv RunEnvMap, format string, filename string) (map[string]string, error) { + body, err := buildUploadData(runEnv, format, filename) + if err != nil { + return nil, fmt.Errorf("preparing upload data: %w", err) + } + + req, err := http.NewRequestWithContext( + ctx, + http.MethodPost, + cfg.UploadUrl, + body.buf, + ) + if err != nil { + return nil, fmt.Errorf("creating HTTP request: %w", err) + } + + req.Header.Set("Content-Type", body.writer.FormDataContentType()) + req.Header.Set("Authorization", fmt.Sprintf(`Token token="%s"`, cfg.SuiteToken)) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("HTTP error: %w", err) + } + defer resp.Body.Close() + + status := resp.Status + + // Currently this should get HTTP 202 Accepted, but let's be a bit permissive to future changes. + if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusAccepted { + return nil, fmt.Errorf( + "expected HTTP %d or %d from Upload API, got %s", + http.StatusCreated, + http.StatusAccepted, + status, + ) + } + + // try to parse the response, but just warn if that fails + respData := make(map[string]string) + err = json.NewDecoder(resp.Body).Decode(&respData) + if err != nil && !errors.Is(err, io.EOF) { + slog.Warn("failed to parse response", "status", status, "error", err) + } + + return respData, nil +} + +func RunEnvFromEnv(env env.Env) (RunEnvMap, error) { + runEnv := RunEnvMap{ + "collector": "bktec", + "version": version.Version, + } + + if _, ok := env.Lookup("BUILDKITE_BUILD_ID"); ok { + maps.Copy(runEnv, RunEnvMap{ + "CI": "buildkite", + "branch": env.Get("BUILDKITE_BRANCH"), + "commit_sha": env.Get("BUILDKITE_COMMIT"), + "job_id": env.Get("BUILDKITE_JOB_ID"), + "key": env.Get("BUILDKITE_BUILD_ID"), + "message": env.Get("BUILDKITE_MESSAGE"), + "number": env.Get("BUILDKITE_BUILD_NUMBER"), + "url": env.Get("BUILDKITE_BUILD_URL"), + }) + } else { + key, err := uuid.NewV7() + if err != nil { + return nil, fmt.Errorf("UUID generation failed; broken PRNG? %w", err) + } + maps.Copy(runEnv, RunEnvMap{ + "CI": "generic", + "key": key.String(), + }) + } + return runEnv, nil +} + +func buildUploadData(runEnv RunEnvMap, format string, filename string) (*MultipartBody, error) { + var err error + + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("opening %s for reading: %w", filename, err) + } + defer file.Close() + + body := NewMultipartBody() + + if err = body.WriteFormat(format); err != nil { + return nil, err + } + + if err = body.WriteRunEnv(runEnv); err != nil { + return nil, err + } + + if err = body.WriteDataFromFile(file); err != nil { + return nil, err + } + + if err = body.Close(); err != nil { + return nil, err + } + + return body, nil +} + +type MultipartBody struct { + writer multipart.Writer + buf *bytes.Buffer +} + +func NewMultipartBody() *MultipartBody { + buf := &bytes.Buffer{} + return &MultipartBody{ + writer: *multipart.NewWriter(buf), + buf: buf, + } +} + +func (b *MultipartBody) WriteFormat(format string) error { + return b.writer.WriteField("format", format) +} + +func (b *MultipartBody) WriteRunEnv(runEnv RunEnvMap) error { + for k, v := range runEnv { + if err := b.writer.WriteField("run_env["+k+"]", v); err != nil { + return err + } + } + return nil +} + +func (b *MultipartBody) WriteDataFromFile(file *os.File) error { + part, err := b.writer.CreateFormFile("data", file.Name()) + if err != nil { + return fmt.Errorf("MultipartBody: %w", err) + } + _, err = io.Copy(part, file) + return err +} + +func (b *MultipartBody) Close() error { + return b.writer.Close() +} diff --git a/internal/upload/upload_test.go b/internal/upload/upload_test.go new file mode 100644 index 00000000..c774cf88 --- /dev/null +++ b/internal/upload/upload_test.go @@ -0,0 +1,252 @@ +package upload + +import ( + "context" + "fmt" + "io" + "mime" + "mime/multipart" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/buildkite/test-engine-client/internal/env" + "github.com/buildkite/test-engine-client/internal/version" + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" +) + +func TestConfigFromEnv(t *testing.T) { + cfg, err := ConfigFromEnv(env.Map{ + "BUILDKITE_ANALYTICS_TOKEN": "hunter2", + }) + if err != nil { + t.Errorf("ConfigFromEnv(): %v", err) + } + + want := Config{ + UploadUrl: "https://analytics-api.buildkite.com/v1/uploads", + SuiteToken: "hunter2", + } + + if diff := cmp.Diff(want, cfg); diff != "" { + t.Errorf("ConfigFromEnv() (-want +got)\n%s", diff) + } +} + +func TestConfigFromEnv_missingToken(t *testing.T) { + _, err := ConfigFromEnv(env.Map{}) + if err == nil { + t.Fatal("expected error from ConfigFromEnv with no token") + } + + want, got := "BUILDKITE_ANALYTICS_TOKEN missing", err.Error() + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("ConfigFromEnv() (-want +got):\n%s", diff) + } +} + +func TestConfigFromEnv_uploadURL(t *testing.T) { + cfg, _ := ConfigFromEnv(env.Map{ + "BUILDKITE_TEST_ENGINE_UPLOAD_URL": "http://localhost:1234/foo", + "BUILDKITE_ANALYTICS_TOKEN": "hello", + }) + + want := Config{ + UploadUrl: "http://localhost:1234/foo", + SuiteToken: "hello", + } + + if diff := cmp.Diff(want, cfg); diff != "" { + t.Errorf("ConfigFromEnv (-want +got)\n%s", diff) + } +} + +func TestBuildRunEnv(t *testing.T) { + runEnv, err := RunEnvFromEnv(env.Map{ + "BUILDKITE_BUILD_ID": "thebuild", + "BUILDKITE_BRANCH": "trunk", + "BUILDKITE_COMMIT": "cafe", + "BUILDKITE_JOB_ID": "thejob", + "BUILDKITE_MESSAGE": "hello world", + "BUILDKITE_BUILD_NUMBER": "42", + "BUILDKITE_BUILD_URL": "http://localhost/builds/42", + }) + if err != nil { + t.Errorf("buildRunEnv(): %v", err) + } + + want := RunEnvMap{ + "collector": "bktec", + "version": version.Version, + "CI": "buildkite", + "branch": "trunk", + "commit_sha": "cafe", + "job_id": "thejob", + "key": "thebuild", + "message": "hello world", + "number": "42", + "url": "http://localhost/builds/42", + } + + if diff := cmp.Diff(want, runEnv); diff != "" { + t.Errorf("buildRunEnv() (-want +got):\n%s", diff) + } +} + +func TestBuildRunEnv_generic(t *testing.T) { + runEnv, err := RunEnvFromEnv(env.Map{}) + if err != nil { + t.Errorf("buildRunEnv(): %v", err) + } + + want := RunEnvMap{ + "collector": "bktec", + "version": version.Version, + "CI": "generic", + "key": "00000000-0000-0000-0000-000000000000", // placeholder + } + + if diff := cmp.Diff(want, runEnv, cmpKeyValidUUID()); diff != "" { + t.Errorf("buildRunEnv() (-want +got):\n%s", diff) + } +} + +func TestUpload(t *testing.T) { + filename, xml := createTestXML(t) + defer os.Remove(filename) + + // receive request details from the HTTP handler + type requestInfo struct { + Method string + Path string + Authorization string + Data map[string]string + } + var gotRequestInfo requestInfo + + // fake API server + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + data, err := multipartToMap(r) + if err != nil { + t.Errorf("parsing request: %v", err) + } + + gotRequestInfo = requestInfo{ + Method: r.Method, + Path: r.URL.Path, + Authorization: r.Header.Get("Authorization"), + Data: data, + } + + w.WriteHeader(http.StatusAccepted) + io.WriteString(w, `{"id":"theuuid","url":"http://localhost/path/theuuid"}`) + })) + defer srv.Close() + + // Upload! + cfg := Config{ + UploadUrl: srv.URL + "/path", + SuiteToken: "hunter2", + } + runEnv := RunEnvMap{ + "CI": "buildkite", + "key": "thekey", + } + format := "junit" + ctx := context.Background() + responseData, err := Upload(ctx, cfg, runEnv, format, filename) + if err != nil { + t.Fatalf("upload failed: %v", err) + } + + // verify the HTTP request details + wantRequestInfo := requestInfo{ + Method: "POST", + Path: "/path", + Authorization: `Token token="hunter2"`, + Data: map[string]string{ + "data": xml, + "format": "junit", + "run_env[CI]": "buildkite", + "run_env[key]": "thekey", + }, + } + if diff := cmp.Diff(wantRequestInfo, gotRequestInfo); diff != "" { + t.Errorf("HTTP request (-want +got):\n%s", diff) + } + + wantResponseData := map[string]string{ + "id": "theuuid", + "url": "http://localhost/path/theuuid", + } + if diff := cmp.Diff(wantResponseData, responseData); diff != "" { + t.Errorf("HTTP response data (-want +got):\n%s", diff) + } +} + +// cmpKeyValidUUID is an Option for cmp.Diff that validates the values of `key` +// in two maps being compared are both valid UUIDs. Note that Comparer +// functions must be symmetric; they're run as fn(a,b) and fn(b,a). +func cmpKeyValidUUID() cmp.Option { + return cmp.FilterPath(func(path cmp.Path) bool { + return path.Last().String() == `["key"]` + }, cmp.Comparer(func(a, b string) bool { + return uuid.Validate(a) == nil && uuid.Validate(b) == nil + })) +} + +func createTestXML(t *testing.T) (string, string) { + data := `` + f, err := os.CreateTemp("", "test.xml") + if err != nil { + t.Fatal(err) + } + _, err = f.WriteString(data) + if err != nil { + t.Fatal(err) + } + if err := f.Close(); err != nil { + t.Fatal(err) + } + return f.Name(), data +} + +func getMultipartBoundary(contentType string) (string, error) { + mt, params, err := mime.ParseMediaType(contentType) + if err != nil { + return "", err + } + if want := "multipart/form-data"; mt != want { + return "", fmt.Errorf("Content-Type: wanted %s, got %s", want, mt) + } + boundary := params["boundary"] + if boundary == "" { + return "", fmt.Errorf("missing multipart boundary") + } + return boundary, nil +} + +func multipartToMap(r *http.Request) (map[string]string, error) { + boundary, err := getMultipartBoundary(r.Header.Get("Content-Type")) + if err != nil { + return nil, fmt.Errorf("getMultipartBoundary: %w", err) + } + mr := multipart.NewReader(r.Body, boundary) + parsed := map[string]string{} + for { + p, err := mr.NextPart() + if err == io.EOF { + break + } else if err != nil { + return nil, fmt.Errorf("multipartToMap; NextPart: %w", err) + } + partData, err := io.ReadAll(p) + if err != nil { + return nil, fmt.Errorf("multipartToMap; ReadAll: %w", err) + } + parsed[p.FormName()] = string(partData) + } + return parsed, nil +} diff --git a/main.go b/main.go index 3472b9a3..668be0c9 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "errors" "flag" "fmt" + "log" "os" "os/exec" "strconv" @@ -14,11 +15,13 @@ import ( "time" "github.com/buildkite/test-engine-client/internal/api" + "github.com/buildkite/test-engine-client/internal/bes" "github.com/buildkite/test-engine-client/internal/config" "github.com/buildkite/test-engine-client/internal/debug" "github.com/buildkite/test-engine-client/internal/env" "github.com/buildkite/test-engine-client/internal/plan" "github.com/buildkite/test-engine-client/internal/runner" + "github.com/buildkite/test-engine-client/internal/upload" "github.com/buildkite/test-engine-client/internal/version" "github.com/olekukonko/tablewriter" "golang.org/x/sys/unix" @@ -62,6 +65,19 @@ func main() { printStartUpMessage() + // TODO: proper subcommands + if flag.Arg(0) == "upload" { + if err := upload.UploadCLI(flag.CommandLine, env); err != nil { + logErrorAndExit(16, "upload: %v", err) + } + os.Exit(0) + } else if flag.Arg(0) == "bazel" && flag.Arg(1) == "listen" { + if err := bes.ListenCLI(os.Args[3:], env); err != nil { + log.Fatal(err) + } + os.Exit(0) + } + // get config cfg, err := config.New(env) if err != nil {