Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions test/lib/go/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ SHELL := /bin/bash
BIN := bin/telepact-go-dispatcher
GO_SOURCES := $(shell find cmd -type f -name '*.go') $(shell find internal -type f -name '*.go' 2>/dev/null)
CODEGEN_DIR := cmd/dispatcher/gen
GO_ENV := GO111MODULE=on GOCACHE=/tmp/telepact-go-build

all: codegen build

build: $(BIN)

$(BIN): go.mod go.sum $(GO_SOURCES)
mkdir -p bin
GO111MODULE=on go build -o $(BIN) ./cmd/dispatcher
$(GO_ENV) go build -o $(BIN) ./cmd/dispatcher

codegen:
rm -rf $(CODEGEN_DIR)
Expand All @@ -35,13 +36,13 @@ codegen:
gofmt -w $(CODEGEN_DIR)

mod-tidy:
GO111MODULE=on go mod tidy
$(GO_ENV) go mod tidy

run:
GO111MODULE=on go run ./cmd/dispatcher
$(GO_ENV) go run ./cmd/dispatcher

test-server: codegen
GO111MODULE=on go run ./cmd/dispatcher
$(GO_ENV) go run ./cmd/dispatcher

clean:
rm -rf bin
Expand Down
2 changes: 1 addition & 1 deletion test/lib/go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ make
### Run the dispatcher locally

```sh
NATS_URL=nats://127.0.0.1:4222 make test-server
TP_TRANSPORT_URL=stdio://local make test-server
```

The harness writes Prometheus metrics to `metrics.txt` when it shuts down.
88 changes: 44 additions & 44 deletions test/lib/go/cmd/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"sync/atomic"
"time"

nats "github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
telepact "github.com/telepact/telepact/lib/go"
transport "github.com/telepact/telepact/test/lib/go/internal/stdiotransport"
"github.com/vmihailenco/msgpack/v5"
)

Expand All @@ -29,11 +29,11 @@ const backwardsCompatibleChangeSchema = `
`

func main() {
logger := log.New(os.Stdout, "[telepact-go] ", log.LstdFlags|log.Lmicroseconds)
logger := log.New(os.Stderr, "[telepact-go] ", log.LstdFlags|log.Lmicroseconds)

natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
logger.Fatal("NATS_URL env var not set")
transportURL := os.Getenv("TP_TRANSPORT_URL")
if transportURL == "" {
logger.Fatal("TP_TRANSPORT_URL env var not set")
}

subject := os.Getenv("TP_HARNESS_SUBJECT")
Expand All @@ -43,12 +43,12 @@ func main() {

metricsFile := "metrics.txt"

nc, err := nats.Connect(natsURL, nats.Name("telepact-go-test-harness"))
nc, err := transport.Open(transportURL, transport.WithLabel("telepact-go-test-harness"))
if err != nil {
logger.Fatalf("failed to connect to NATS: %v", err)
logger.Fatalf("failed to open stdio transport: %v", err)
}
defer func() {
_ = nc.Drain()
_ = nc.Close()
}()

dispatcher := NewDispatcher(nc, logger, metricsFile)
Expand All @@ -68,28 +68,28 @@ func main() {
}

type Dispatcher struct {
conn *nats.Conn
conn *transport.Transport
logger *log.Logger
metrics *metricRegistry
dispatcherSub *nats.Subscription
servers map[string]*nats.Subscription
dispatcherSub *transport.Listener
servers map[string]*transport.Listener
serversMu sync.Mutex
done chan struct{}
doneOnce sync.Once
}

func NewDispatcher(conn *nats.Conn, logger *log.Logger, metricsFile string) *Dispatcher {
func NewDispatcher(conn *transport.Transport, logger *log.Logger, metricsFile string) *Dispatcher {
return &Dispatcher{
conn: conn,
logger: logger,
metrics: newMetricRegistry(metricsFile),
servers: make(map[string]*nats.Subscription),
servers: make(map[string]*transport.Listener),
done: make(chan struct{}),
}
}

func (d *Dispatcher) Start(subject string) error {
sub, err := d.conn.Subscribe(subject, d.handleMessage)
sub, err := d.conn.Listen(subject, d.handleMessage)
if err != nil {
return err
}
Expand All @@ -105,7 +105,7 @@ func (d *Dispatcher) Done() <-chan struct{} {
func (d *Dispatcher) Close() error {
d.stopAllServers()
if d.dispatcherSub != nil {
if err := d.dispatcherSub.Drain(); err != nil {
if err := d.dispatcherSub.Close(); err != nil {
d.logger.Printf("failed to drain dispatcher subscription: %v", err)
}
}
Expand All @@ -119,10 +119,10 @@ func (d *Dispatcher) WriteMetrics() error {
return d.metrics.WriteToFile()
}

func (d *Dispatcher) handleMessage(msg *nats.Msg) {
func (d *Dispatcher) handleMessage(msg *transport.Envelope) {
response := buildErrorResponse()

if err := d.processCommand(msg.Data); err != nil {
if err := d.processCommand(msg.Payload); err != nil {
d.logger.Printf("dispatcher command failed: %v", err)
} else {
response = buildOKResponse()
Expand Down Expand Up @@ -205,7 +205,7 @@ func (d *Dispatcher) processCommand(data []byte) error {
}
}

func (d *Dispatcher) trackServer(cfg map[string]any, sub *nats.Subscription) error {
func (d *Dispatcher) trackServer(cfg map[string]any, sub *transport.Listener) error {
id := stringValue(cfg["id"])
if id == "" {
return errors.New("missing id in payload")
Expand All @@ -215,7 +215,7 @@ func (d *Dispatcher) trackServer(cfg map[string]any, sub *nats.Subscription) err
defer d.serversMu.Unlock()

if old, exists := d.servers[id]; exists {
_ = old.Drain()
_ = old.Close()
}
d.servers[id] = sub
return nil
Expand All @@ -233,20 +233,20 @@ func (d *Dispatcher) stopServer(id string) error {
return fmt.Errorf("server %s not found", id)
}

return sub.Drain()
return sub.Close()
}

func (d *Dispatcher) stopAllServers() {
d.serversMu.Lock()
subs := make([]*nats.Subscription, 0, len(d.servers))
subs := make([]*transport.Listener, 0, len(d.servers))
for _, sub := range d.servers {
subs = append(subs, sub)
}
d.servers = make(map[string]*nats.Subscription)
d.servers = make(map[string]*transport.Listener)
d.serversMu.Unlock()

for _, sub := range subs {
_ = sub.Drain()
_ = sub.Close()
}
}

Expand Down Expand Up @@ -297,7 +297,7 @@ func parseEnvelope(data []byte) (envelope, error) {
return envelope{Headers: headers, Body: body}, nil
}

func respond(msg *nats.Msg, env envelope) error {
func respond(msg *transport.Envelope, env envelope) error {
payload := []any{env.Headers, env.Body}
data, err := json.Marshal(payload)
if err != nil {
Expand Down Expand Up @@ -495,7 +495,7 @@ func parseServerConfig(cfg map[string]any) (serverConfig, error) {
return result, nil
}

func startClientTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscription, error) {
func startClientTestServer(d *Dispatcher, rawCfg map[string]any) (*transport.Listener, error) {
cfg, err := parseServerConfig(rawCfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -524,12 +524,12 @@ func startClientTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscrip
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

reply, err := d.conn.RequestWithContext(ctx, cfg.ClientBackdoor, bytes)
reply, err := d.conn.Call(ctx, cfg.ClientBackdoor, bytes)
if err != nil {
return telepact.Message{}, err
}

return serializer.Deserialize(reply.Data)
return serializer.Deserialize(reply.Payload)
}

clientOptions := telepact.NewClientOptions()
Expand All @@ -550,7 +550,7 @@ func startClientTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscrip
generatedClient = newGeneratedTypedClient(client)
}

sub, err := d.conn.Subscribe(cfg.ClientFrontdoor, func(msg *nats.Msg) {
sub, err := d.conn.Listen(cfg.ClientFrontdoor, func(msg *transport.Envelope) {
d.handleClientRequest(msg, client, generatedClient, testClient, cfg)
})
if err != nil {
Expand All @@ -561,7 +561,7 @@ func startClientTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscrip
return sub, nil
}

func startMockTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscription, error) {
func startMockTestServer(d *Dispatcher, rawCfg map[string]any) (*transport.Listener, error) {
cfg, err := parseServerConfig(rawCfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -600,9 +600,9 @@ func startMockTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscripti
return nil, err
}

sub, err := d.conn.Subscribe(cfg.FrontdoorTopic, func(msg *nats.Msg) {
sub, err := d.conn.Listen(cfg.FrontdoorTopic, func(msg *transport.Envelope) {
start := time.Now()
resp, err := mockServer.Process(msg.Data)
resp, err := mockServer.Process(msg.Payload)
if err != nil {
d.logger.Printf("mock server process error: %v", err)
_ = msg.Respond(buildUnknownPayload())
Expand All @@ -625,7 +625,7 @@ func startMockTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscripti
return sub, nil
}

func startSchemaTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscription, error) {
func startSchemaTestServer(d *Dispatcher, rawCfg map[string]any) (*transport.Listener, error) {
cfg, err := parseServerConfig(rawCfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -702,7 +702,7 @@ func startSchemaTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscrip
return nil, err
}

sub, err := d.conn.Subscribe(cfg.FrontdoorTopic, func(msg *nats.Msg) {
sub, err := d.conn.Listen(cfg.FrontdoorTopic, func(msg *transport.Envelope) {
d.handleServerRequest(server, cfg.FrontdoorTopic, msg)
})
if err != nil {
Expand All @@ -713,7 +713,7 @@ func startSchemaTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscrip
return sub, nil
}

func startTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscription, error) {
func startTestServer(d *Dispatcher, rawCfg map[string]any) (*transport.Listener, error) {
cfg, err := parseServerConfig(rawCfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -760,13 +760,13 @@ func startTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscription,
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()

reply, err := d.conn.RequestWithContext(ctx, cfg.BackdoorTopic, payloadBytes)
reply, err := d.conn.Call(ctx, cfg.BackdoorTopic, payloadBytes)
if err != nil {
return telepact.Message{}, err
}

var response []any
decoder := json.NewDecoder(bytes.NewReader(reply.Data))
decoder := json.NewDecoder(bytes.NewReader(reply.Payload))
decoder.UseNumber()
if err := decoder.Decode(&response); err != nil {
return telepact.Message{}, err
Expand Down Expand Up @@ -862,18 +862,18 @@ func startTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscription,
return nil, err
}

sub, err := d.conn.Subscribe(cfg.FrontdoorTopic, func(msg *nats.Msg) {
sub, err := d.conn.Listen(cfg.FrontdoorTopic, func(msg *transport.Envelope) {
start := time.Now()
var (
resp telepact.Response
err error
)

if serveAlternate.Load() {
resp, err = alternateServer.Process(msg.Data)
resp, err = alternateServer.Process(msg.Payload)
} else {
override := map[string]any{"@override": "new"}
resp, err = server.ProcessWithHeaders(msg.Data, override)
resp, err = server.ProcessWithHeaders(msg.Payload, override)
}

if err != nil {
Expand All @@ -898,9 +898,9 @@ func startTestServer(d *Dispatcher, rawCfg map[string]any) (*nats.Subscription,
return sub, nil
}

func (d *Dispatcher) handleServerRequest(server *telepact.Server, topic string, msg *nats.Msg) {
func (d *Dispatcher) handleServerRequest(server *telepact.Server, topic string, msg *transport.Envelope) {
start := time.Now()
resp, err := server.Process(msg.Data)
resp, err := server.Process(msg.Payload)
if err != nil {
d.logger.Printf("server.process error: %v", err)
_ = msg.Respond(buildUnknownPayload())
Expand All @@ -917,15 +917,15 @@ func (d *Dispatcher) handleServerRequest(server *telepact.Server, topic string,
}

func (d *Dispatcher) handleClientRequest(
msg *nats.Msg,
msg *transport.Envelope,
client *telepact.Client,
generatedClient *generatedTypedClient,
testClient *telepact.TestClient,
cfg serverConfig,
) {
start := time.Now()

request, err := deserializePseudoJSON(msg.Data)
request, err := deserializePseudoJSON(msg.Payload)
if err != nil {
d.logger.Printf("client request decode error: %v", err)
_ = msg.Respond(buildUnknownPayload())
Expand Down Expand Up @@ -1054,7 +1054,7 @@ func buildUnknownPayload() []byte {
return bytes
}

func respondWithBytes(msg *nats.Msg, data []byte) error {
func respondWithBytes(msg *transport.Envelope, data []byte) error {
return msg.Respond(data)
}

Expand Down
3 changes: 0 additions & 3 deletions test/lib/go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/telepact/telepact/test/lib/go
go 1.22

require (
github.com/nats-io/nats.go v1.37.0
github.com/prometheus/client_golang v1.19.0
github.com/telepact/telepact/lib/go v0.0.0
)
Expand All @@ -12,8 +11,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
6 changes: 0 additions & 6 deletions test/lib/go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
Expand Down
Loading
Loading