diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 3bdd1fc..dd652c0 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -20,6 +20,38 @@ updates: versions: - "1.7.0" - "1.7.1" + - package-ecosystem: gomod + directory: "/api" + schedule: + interval: "weekly" + groups: + gomod: + update-types: + - minor + - patch + - package-ecosystem: gomod + directory: "/tests/integration" + schedule: + interval: "weekly" + groups: + gomod: + update-types: + - minor + - patch + ignore: + - dependency-name: github.com/rabbitmq/rabbitmq-stream-go-client + versions: + - "1.7.0" + - "1.7.1" + - package-ecosystem: gomod + directory: "/tools/anarkey" + schedule: + interval: "weekly" + groups: + gomod: + update-types: + - minor + - patch - package-ecosystem: github-actions directory: "/" schedule: diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e72b520..be3da66 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -62,8 +62,9 @@ jobs: if: always() uses: ludeeus/action-shellcheck@00b27aa7cb85167568cb48a3838b75f4265f2bca + # yamllint does not exist in the act image so skip it if we're in act - name: Run yamllint - if: always() + if: ${{ !env.ACT }} run: | yamllint -c .github/linters/yamllint.yml . 
diff --git a/.gitignore b/.gitignore index a07ee27..5fa637e 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ coverage/ *.out gitleaks .DS_Store +tools/anarkey/logs \ No newline at end of file diff --git a/internal/provider/connectors/amqp091/amqp091.go b/internal/provider/connectors/amqp091/amqp091.go index 8e96026..b833ba3 100644 --- a/internal/provider/connectors/amqp091/amqp091.go +++ b/internal/provider/connectors/amqp091/amqp091.go @@ -1262,7 +1262,7 @@ func (prov *amqp091provider) streamSubscribe(ctx context.Context, bd *BrokerDeta if err := latch.Increment(); err != nil { return } - messageChannel <- m + stm := streamMessage{Body: message.GetData(), Headers: m.GetHeaders()} stm.Ack = func() { latch.Decrement() @@ -1280,7 +1280,11 @@ util.Logger.Tracef("Nack of message(%s) on stream %s%s with offset %d", messageUUID, ctx.Consumer.GetStreamName(), consumerGroup, ctx.Consumer.GetOffset()) latch.Decrement() } + + // add the stream message to active messages before we send it to the client to avoid the client + // potentially acking before we add it to active messages (can happen in low latency envs) bd.activeMessages.Add(messageUUID, stm) + messageChannel <- m atomic.AddInt64(&bd.consumed, 1) } @@ -1933,7 +1937,7 @@ func (bd *BrokerDetails) loadExchanges() { return } if marshErr := json.Unmarshal(body, &results); marshErr != nil { - util.Logger.Debugf("Error unmarshaling exchanges from management API: %s", err.Error()) + util.Logger.Debugf("Error unmarshaling exchanges from management API: %s", marshErr.Error()) return } util.Logger.Debugf("Loaded exchanges from management API: %s", string(body)) diff --git a/internal/provider/connectors/amqp091/streamshim.go b/internal/provider/connectors/amqp091/streamshim.go index 56da5d8..c2bafdc 100644 --- a/internal/provider/connectors/amqp091/streamshim.go +++ b/internal/provider/connectors/amqp091/streamshim.go @@ -274,7 +274,7 @@ 
func (sc *streamConnection) GetLastOffset(streamName string, consumerName string } func (sc *streamConnection) StoreOffset(streamName string, consumerName string, offset int64) error { - util.Logger.Debugf("StoreOffset (%s)(%s)(%d)", consumerName, streamName, offset) + util.Logger.Tracef("StoreOffset (%s)(%s)(%d)", consumerName, streamName, offset) return sc.env.StoreOffset(consumerName, streamName, offset) } diff --git a/tests/integration/go.mod b/tests/integration/go.mod index e5f99f6..edd44eb 100644 --- a/tests/integration/go.mod +++ b/tests/integration/go.mod @@ -52,7 +52,7 @@ require ( golang.org/x/oauth2 v0.35.0 // indirect golang.org/x/sys v0.42.0 // indirect golang.org/x/term v0.41.0 // indirect - golang.org/x/text v0.35.0 // indirect + golang.org/x/text v0.36.0 // indirect golang.org/x/time v0.15.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect diff --git a/tests/integration/go.sum b/tests/integration/go.sum index d70ae23..5cf4fa5 100644 --- a/tests/integration/go.sum +++ b/tests/integration/go.sum @@ -118,8 +118,8 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= -golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= -golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= +golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI= +golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY= golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= golang.org/x/oauth2 v0.35.0 
h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ= @@ -131,12 +131,12 @@ golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= -golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= -golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= -golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= -golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= +golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s= +golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:VPWxll4HlMw1Vs/qXtN7BvhZqsS9cdAittCNvVENElA= diff --git a/tools/anarkey/Makefile b/tools/anarkey/Makefile new file mode 100644 index 0000000..2a0d1e4 --- /dev/null +++ b/tools/anarkey/Makefile @@ -0,0 +1,4 @@ +.PHONY: build + +build: + go build -o build/anarkey . diff --git a/tools/anarkey/README.md b/tools/anarkey/README.md new file mode 100644 index 0000000..5c018fe --- /dev/null +++ b/tools/anarkey/README.md @@ -0,0 +1,355 @@ +# Anarkey + +An interactive load testing tool for the Arke message broker proxy, +built with [Bubble Tea](https://github.com/charmbracelet/bubbletea). 
+ +## Features + +- **Interactive TUI**: User-friendly terminal interface for configuration and monitoring +- **Dynamic Connection Management**: Scale connections up or down during runtime +- **Publisher/Consumer Support**: Test both message publishing and consumption +- **Stream/Queue Support**: Works with both queues and streams +- **Real-time Metrics**: Monitor message counts and throughput rates +- **Flexible Message Generation**: Configure message count or run continuously +- **Debug Logging**: Automatic logging to `logs/` directory for troubleshooting + +## Building + +```bash +go build -o build/anarkey . +``` + +or + +```bash +make build +``` + +## Usage + +Run the tool: + +```bash +./build/anarkey +``` + +Or with a saved configuration file to skip configuration screens: + +```bash +./build/anarkey -config my-config.json +``` + +### Configuration Screens + +The tool will guide you through several configuration screens: + +### 1. Arke Server Configuration + +Configure connection to the Arke proxy server: + +- **Arke Hostname**: The hostname or IP of the Arke server +(default: `localhost`) +- **Arke Port**: The gRPC port (default: `50051`) +- **Arke TLS**: Enable TLS for Arke connection (`true`/`false`) + +### 2. Broker Configuration + +Configure the message broker that Arke connects to: + +- **Broker Hostname**: The hostname or IP of the message broker +(default: `localhost`) +- **Broker Port**: The broker port (default: `5672` for RabbitMQ) +- **Username**: Broker authentication username (default: `guest`) +- **Password**: Broker authentication password (default: `guest`) +- **Broker TLS**: Enable TLS for broker connection (`true`/`false`) + +### 3. Connection Configuration + +Configure the load test connections for both publishers and consumers: + +- **Number of Publisher Connections**: How many publisher connections to +create (default: `1`) +- **Number of Consumer Connections**: How many consumer connections to +create (default: `1`) + +### 4. 
Stream Configuration + +Configure the publish/consume streams for both types: + +- **Number of Publisher Streams**: How many publisher streams per publisher +connection (default: `1`) +- **Number of Consumer Streams**: How many consumer streams per consumer +connection (default: `1`) +- **Source Type**: Either `queue` or `stream` +- **Source Name**: The name of the source/queue/stream (default: `test-source`) +- **Address Name**: The address name for routing (default: `test-address`) +- **Subject**: The routing key/subject for message routing (default: +`test.routing.key`) + - For queue sources: Used as the routing key to bind the queue to the + topic exchange + - For stream sources: Used as the subject for stream addressing +- **Message Count**: Number of messages to publish per publisher stream, or +`0` for continuous (default: `100`) +- **Publish Rate Limit**: Maximum messages per second per publisher stream, +or `0` for unlimited (default: `0`) + - When set to `0`: Publishers send messages as fast as possible + - When set to a positive integer: Each publisher stream is limited to that + many messages per second + - Example: Setting `1000` limits each publisher stream to 1000 msg/s +- **Message File**: Optional path to a JSON file containing a message template +with headers and body (default: empty) + - When left blank: Auto-generated messages are used (e.g., "Load test + message 1 from publisher 0") + - When specified: All publishers use the message template from the file +- **Save Config File**: Optional path to save all configuration settings +to a JSON file (default: empty) + - When specified: Configuration is saved to the file for later reuse + - Can be loaded with `./build/anarkey -config <config-file>` to skip + configuration screens + +**Note**: When using `queue` as the source type, the Address type is +automatically set to `TOPIC` to enable routing key functionality. Stream +sources use `STREAM` as the Address type. 
+ +#### Saving and Loading Configurations + +You can save your configuration settings to a JSON file for reuse: + +1. On the Stream Configuration screen, enter a filename in the "Save Config +File" field (e.g., `my-test-config.json`) +2. Complete the configuration and start the load test +3. The configuration will be saved to the specified file + +To reuse a saved configuration: + +```bash +./build/anarkey -config my-test-config.json +``` + +This will load all settings from the file and immediately start the load test, +taking you directly to the Running screen. + +#### Example Configurations + +Two ready-to-use configuration files are included: + +- **[example-config-queue.json](example-config-queue.json)** — Queue-based +load test with 5 publisher connections (10 streams each), 3 consumer connections +(5 streams each), a rate limit of 1000 msg/s per stream, and a total of +10000 messages per publisher stream. +- **[example-config-stream.json](example-config-stream.json)** — Stream-based +load test with 1 publisher connection and 1 consumer connection, running +continuously at 2000 msg/s per stream. + +Both files reference [example-message.json](example-message.json) as the message +template. Run either directly with: + +```bash +./build/anarkey -config example-config-queue.json +./build/anarkey -config example-config-stream.json +``` + +#### Custom Message Files + +You can provide a JSON file to customize the message content and headers. +The JSON file should have the following format: + + +```json +{ + "headers": { + "Content-Type": "application/json", + "X-Custom-Header": "custom-value", + "Priority": "high" + }, + "body": "{\"order_id\": \"12345\", \"customer\": \"John Doe\", \"items\": [{\"sku\": \"ABC-123\", \"quantity\": 2}], \"total\": 49.99}" +} +``` + + +**Fields:** + +- `headers` (object, optional): Key-value pairs for message headers +- `body` (string, required): The message body content (can be JSON string, +plain text, etc.) 
+ +**UUID Replacement:** + +The body (and header values) support a special `@id@` placeholder that will +be replaced with a unique UUID for each published message: + +- Use `@id@` anywhere in the body or header values +- Each occurrence is replaced with the same UUID per message +- A new UUID is generated for each message published +- Parsing is optimized - the body is only parsed once when the file is loaded + +**Example with UUID:** + +```json +{ + "headers": { + "x-correlation-id": "@id@", + "source": "load-test" + }, + "body": "{\"id\":\"@id@\",\"event\":\"test.event\",\"data\":{\"uniqueId\":\"@id@\"}}" +} +``` + +**Example:** See [example-message.json](example-message.json) for a complete example. + +When a message file is provided: + +- All publishers will use the same message template +- Headers from the file are included in every published message +- The body content is sent as-is (no message counter is added) +- If `@id@` placeholders are present, each message gets a unique UUID +- The file is loaded once when publishers start (optimized parsing) + +### 5. 
Running Screen + +Once started, the tool displays real-time metrics and allows dynamic +connection and stream scaling: + +- Publisher and consumer connection counts with streams per connection +- Total published messages and rate (msg/s) +- Total consumed messages and rate (msg/s) +- **Runtime Connection Scaling**: Adjust the number of publisher and consumer connections + - Use arrow keys or j/k to move between fields + - Type the desired number of connections for publishers or consumers + - Enter `0` to close all connections of that type + - Press Enter to apply changes immediately + - New connections are automatically configured with the current stream settings +- **Runtime Stream Scaling**: Adjust publisher and consumer stream counts per connection + - Use arrow keys or j/k to move between the stream count fields + - Type the desired number of streams for publishers or consumers + - Enter `0` to stop all streams of that type + - Press Enter to apply the changes to all connections of that type + - Changes take effect immediately without restarting +- **Runtime Rate Limiting**: Adjust publisher rate limits in real-time + - Use arrow keys or j/k to navigate to the "Publish Rate Limit" field + - Type the desired rate in messages per second (or `0` for unlimited) + - Press Enter to apply the new rate limit to all publisher streams + - All publisher streams are restarted with the new rate limit + - Changes take effect immediately + +Press `q` or `Ctrl+C` to stop the load test and exit. 
+ +## Navigation + +- **Arrow Keys** or **j/k**: Move between fields +- **Text Input**: Type values directly into fields +- **Backspace**: Delete characters +- **Enter**: Proceed to next screen or start the load test +- **q** or **Ctrl+C**: Quit the application + +## Architecture + +The load tool consists of several key components: + +- **TUI (tui.go)**: Bubble Tea-based terminal user interface +- **Connection Manager (connection.go)**: Manages gRPC connections to Arke +- **Publisher (publisher.go)**: Handles message publishing with declare-only pre-declaration +- **Consumer (consumer.go)**: Handles message consumption and acknowledgment +- **Metrics (types.go)**: Tracks message counts and calculates rates + +## Publisher Behavior + +For publisher connections: + +1. Creates a declare-only consume stream to ensure the source exists +2. Closes the declare-only stream +3. Opens publish streams and begins sending messages +4. Each message is tracked and metrics are updated in real-time +5. **Rate Limiting**: + - When `PublishRateLimit` is set to `0`: Messages are sent as fast as possible + - When `PublishRateLimit` is a positive integer: Messages are rate-limited + using a ticker + - The rate limit is per publisher stream (e.g., 1000 msg/s with 5 streams = 5000 + total msg/s) + - Rate limits can be changed at runtime, which restarts all publisher streams + with the new limit +6. **Completion Behavior**: + - If a non-zero message count is specified, the publisher stream stops after sending + that many messages + - Completed publishers are marked as finished and will **not be restarted** + - When scaling publisher streams, only active (non-completed) publishers are counted + - Continuous publishers (message count = 0) run indefinitely until manually stopped + +## Consumer Behavior + +For consumer connections: + +1. Creates consume streams with the configured source +2. Receives messages and automatically acknowledges them +3. 
Updates metrics for each consumed message + +## Example Scenarios + +### High-Volume Publisher and Consumer Test + +```text +Publisher Connections: 5 +Consumer Connections: 3 +Publisher Streams per Connection: 10 +Consumer Streams per Connection: 5 +Message Count: 0 (continuous) +``` + +### Stream Testing with Multiple Publishers + +```text +Publisher Connections: 3 +Consumer Connections: 2 +Publisher Streams per Connection: 5 +Consumer Streams per Connection: 3 +Source Type: stream +Message Count: 10000 +``` + +### Balanced Load Test + +```text +Publisher Connections: 2 +Consumer Connections: 2 +Publisher Streams per Connection: 3 +Consumer Streams per Connection: 3 +Source Type: queue +Message Count: 5000 +``` + +### Runtime Scaling Example + +Start with minimal configuration and scale up during runtime: + +1. Start with 1 publisher connection, 1 stream +2. Once running, increase publisher streams to 10 +3. Add consumer connections at startup (1 connection, 5 streams) +4. Scale consumer streams up to 10 during runtime + +## Requirements + +- Go 1.25 or later +- Running Arke server +- Accessible message broker (RabbitMQ) + +## Debugging + +The tool automatically creates detailed logs in the `logs/` directory +with timestamps (e.g., `logs/anarkey-20240206-153045.log`). Logs include: + +- Connection establishment and teardown +- Publisher/consumer lifecycle events +- Message sending/receiving errors +- Configuration changes and runtime scaling operations +- Detailed error messages with context + +Check the logs for troubleshooting connection issues, understanding message +flow, or debugging unexpected behavior. 
+ +## Notes + +- The tool automatically creates sources with declare-only mode before publishing +- Metrics update every second +- All gRPC streams are managed with proper context cancellation +- Connections are gracefully closed on exit +- Log files are created in the `logs/` directory with timestamps for each run diff --git a/tools/anarkey/connection.go b/tools/anarkey/connection.go new file mode 100644 index 0000000..8b6e6ff --- /dev/null +++ b/tools/anarkey/connection.go @@ -0,0 +1,236 @@ +package main + +import ( + "context" + "crypto/tls" + "fmt" + "sync" + + "github.com/sassoftware/arke/api" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +// ConnectionManager manages multiple connections to the arke server +type ConnectionManager struct { + config *Config + connections []*Connection + mu sync.RWMutex + grpcConn *grpc.ClientConn + metrics *Metrics +} + +// Connection represents a single connection to arke +type Connection struct { + ID int + Type ConnectionType + grpcConn *grpc.ClientConn + ProducerClient api.ProducerClient + ConsumerClient api.ConsumerClient + config *Config + metrics *Metrics + publishers []*Publisher + consumers []*Consumer + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc +} + +// NewConnectionManager creates a new connection manager +func NewConnectionManager(config *Config, metrics *Metrics) *ConnectionManager { + return &ConnectionManager{ + config: config, + connections: make([]*Connection, 0), + metrics: metrics, + } +} + +// createGRPCConnection establishes a new gRPC connection to the arke server +func (cm *ConnectionManager) createGRPCConnection() (*grpc.ClientConn, error) { + address := fmt.Sprintf("%s:%d", cm.config.ArkeHost, cm.config.ArkePort) + + var opts []grpc.DialOption + if cm.config.ArkeTLS { + // Skip certificate verification for development tool + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, + } + opts = append(opts, 
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + + conn, err := grpc.NewClient(address, opts...) + if err != nil { + return nil, fmt.Errorf("failed to connect to arke server: %w", err) + } + + Logger.Printf("Established new gRPC connection to arke server at %s", address) + return conn, nil +} + +// CreateConnection creates a new connection +func (cm *ConnectionManager) CreateConnection(ctx context.Context, connType ConnectionType) error { + cm.mu.Lock() + defer cm.mu.Unlock() + + Logger.Printf("Creating %v connection to broker %s:%d", connType, cm.config.BrokerHost, cm.config.BrokerPort) + + // Each connection needs its own TCP connection to Arke + grpcConn, err := cm.createGRPCConnection() + if err != nil { + Logger.Printf("Failed to create gRPC connection: %v", err) + return fmt.Errorf("failed to create gRPC connection: %w", err) + } + + connCtx, cancel := context.WithCancel(ctx) + conn := &Connection{ + ID: len(cm.connections), + Type: connType, + grpcConn: grpcConn, + ProducerClient: api.NewProducerClient(grpcConn), + ConsumerClient: api.NewConsumerClient(grpcConn), + config: cm.config, + metrics: cm.metrics, + publishers: make([]*Publisher, 0), + consumers: make([]*Consumer, 0), + ctx: connCtx, + cancel: cancel, + } + + Logger.Printf("Connection %d: created with type %v", conn.ID, connType) + + // Connect to broker + connConfig := &api.ConnectionConfiguration{ + Host: cm.config.BrokerHost, + Port: int32(cm.config.BrokerPort), + Provider: cm.config.BrokerProvider, + Tls: cm.config.BrokerTLS, + Credentials: &api.Credentials{ + Username: cm.config.BrokerUsername, + Password: cm.config.BrokerPassword, + }, + } + + if connType == ConnectionTypePublisher { + Logger.Printf("Connection %d: connecting as PRODUCER", conn.ID) + resp, err := conn.ProducerClient.Connect(ctx, connConfig) + if err != nil { + cancel() + return fmt.Errorf("failed to connect producer: 
%w", err) + } + if !resp.Success { + cancel() + return fmt.Errorf("producer connection failed: %s", resp.Error.Message) + } + } else { + Logger.Printf("Connection %d: connecting as CONSUMER", conn.ID) + resp, err := conn.ConsumerClient.Connect(ctx, connConfig) + if err != nil { + cancel() + return fmt.Errorf("failed to connect consumer: %w", err) + } + if !resp.Success { + cancel() + return fmt.Errorf("consumer connection failed: %s", resp.Error.Message) + } + } + + cm.connections = append(cm.connections, conn) + + Logger.Printf("Connection %d (%v) created successfully", conn.ID, connType) + + return nil +} + +// ScaleConnections scales the number of connections +func (cm *ConnectionManager) ScaleConnections(ctx context.Context, count int, connType ConnectionType) error { + Logger.Printf("Scaling %v connections to %d", connType, count) + + cm.mu.Lock() + // Count existing connections of this type + var existingConns []*Connection + var otherConns []*Connection + for _, conn := range cm.connections { + if conn.Type == connType { + Logger.Printf("ScaleConnections: found existing %v connection (ID: %d)", connType, conn.ID) + existingConns = append(existingConns, conn) + } else { + Logger.Printf("ScaleConnections: found other type %v connection (ID: %d)", conn.Type, conn.ID) + otherConns = append(otherConns, conn) + } + } + currentCount := len(existingConns) + Logger.Printf("ScaleConnections: current %v count: %d, target: %d", connType, currentCount, count) + cm.mu.Unlock() + + if count > currentCount { + // Add connections + for i := currentCount; i < count; i++ { + if err := cm.CreateConnection(ctx, connType); err != nil { + return err + } + } + } else if count < currentCount { + // Remove connections of this type from the end + cm.mu.Lock() + for i := count; i < currentCount; i++ { + if existingConns[i] != nil { + existingConns[i].Close() + } + } + // Rebuild connections list with remaining connections of this type + all other type connections + cm.connections = 
append(existingConns[:count], otherConns...) + cm.mu.Unlock() + } + + return nil +} + +// GetConnections returns all connections +func (cm *ConnectionManager) GetConnections() []*Connection { + cm.mu.RLock() + defer cm.mu.RUnlock() + return cm.connections +} + +// Close closes all connections +func (cm *ConnectionManager) Close() { + cm.mu.Lock() + defer cm.mu.Unlock() + + for _, conn := range cm.connections { + if conn != nil { + conn.Close() + } + } +} + +// Close closes the connection +func (c *Connection) Close() { + c.mu.Lock() + defer c.mu.Unlock() + + // Stop all publishers and consumers + for _, p := range c.publishers { + p.Stop() + } + for _, cons := range c.consumers { + cons.Stop() + } + + // Disconnect + if c.Type == ConnectionTypePublisher && c.ProducerClient != nil { + c.ProducerClient.Disconnect(c.ctx, &api.Empty{}) + } else if c.Type == ConnectionTypeConsumer && c.ConsumerClient != nil { + c.ConsumerClient.Disconnect(c.ctx, &api.Empty{}) + } + + // Close the gRPC connection + if c.grpcConn != nil { + c.grpcConn.Close() + } + + c.cancel() +} diff --git a/tools/anarkey/consumer.go b/tools/anarkey/consumer.go new file mode 100644 index 0000000..da91370 --- /dev/null +++ b/tools/anarkey/consumer.go @@ -0,0 +1,208 @@ +package main + +import ( + "context" + "fmt" + "io" + "sync" + + "github.com/sassoftware/arke/api" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// Consumer handles consuming messages +type Consumer struct { + ID int + conn *Connection + config *StreamConfig + stream api.Consumer_ConsumeClient + ctx context.Context + cancel context.CancelFunc + mu sync.Mutex + stopped bool +} + +// NewConsumer creates a new consumer +func NewConsumer(id int, conn *Connection, config *StreamConfig) *Consumer { + ctx, cancel := context.WithCancel(conn.ctx) + return &Consumer{ + ID: id, + conn: conn, + config: config, + ctx: ctx, + cancel: cancel, + } +} + +// Start starts the consumer +func (c *Consumer) Start() error { + 
c.mu.Lock() + if c.stopped { + c.mu.Unlock() + Logger.Printf("Consumer %d: cannot start - already stopped", c.ID) + return fmt.Errorf("consumer already stopped") + } + c.mu.Unlock() + + Logger.Printf("Consumer %d: starting (source: %s, type: %v)", + c.ID, c.config.SourceName, c.config.SourceType) + + // Create consume stream + stream, err := c.conn.ConsumerClient.Consume(c.ctx) + if err != nil { + Logger.Printf("Consumer %d: failed to create consume stream: %v", c.ID, err) + return fmt.Errorf("failed to create consume stream: %w", err) + } + c.stream = stream + + // Determine address type based on source type + // Queue sources use TOPIC addresses, streams use STREAM addresses + addressType := api.Address_TOPIC + if c.config.SourceType == api.Source_STREAM { + addressType = api.Address_STREAM + } + + options := map[string]string{} + if c.config.SourceType == api.Source_STREAM { + options["Offset"] = "next" + } + + // Send source + source := &api.Source{ + Name: c.config.SourceName, + Type: c.config.SourceType, + Address: &api.Address{ + Name: c.config.AddressName, + Type: addressType, + Subjects: []string{c.config.Subject}, + }, + PrefetchCount: 40, + Options: options, + } + + err = stream.Send(&api.Consume{ + Msg: &api.Consume_Src{Src: source}, + }) + if err != nil { + return fmt.Errorf("failed to send source: %w", err) + } + + // Start consuming + go c.consume() + + return nil +} + +// consume receives and acknowledges messages +func (c *Consumer) consume() { + for { + resp, err := c.stream.Recv() + if err != nil { + if err == io.EOF || status.Code(err) == codes.Canceled { + return + } + Logger.Printf("Consumer %d: receive error: %v", c.ID, err) + return + } + + switch r := resp.Resp.(type) { + case *api.ConsumeResponse_Msg: + // Acknowledge the message + ack := &api.MessageConsumed{ + Uuid: r.Msg.Uuid, + Nack: false, + } + + err := c.stream.Send(&api.Consume{ + Msg: &api.Consume_Ack{Ack: ack}, + }) + if err != nil { + if status.Code(err) != codes.Canceled { + 
Logger.Printf("Consumer %d: failed to send ack: %v", c.ID, err) + } + return + } + + c.conn.metrics.IncrementConsumed() + + case *api.ConsumeResponse_ConsumedResponse: + // Message was acknowledged + if !r.ConsumedResponse.Success && r.ConsumedResponse.Error != nil { + Logger.Printf("Consumer %d: ack failed: %s", c.ID, r.ConsumedResponse.Error.Message) + } + + case *api.ConsumeResponse_Error: + // Error occurred + Logger.Printf("Consumer %d: consume error (fatal=%v): %s", c.ID, r.Error.IsFatal, r.Error.Message) + if r.Error.IsFatal { + return + } + } + } +} + +// Stop stops the consumer +func (c *Consumer) Stop() { + c.mu.Lock() + defer c.mu.Unlock() + + if c.stopped { + return + } + + Logger.Printf("Consumer %d: stopping", c.ID) + + c.stopped = true + if c.stream != nil { + c.stream.CloseSend() + } + c.cancel() +} + +// AddConsumers adds consumers to a connection +func (c *Connection) AddConsumers(config *StreamConfig) error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.Type != ConnectionTypeConsumer { + return fmt.Errorf("cannot add consumers to publisher connection") + } + + startID := len(c.consumers) + for i := 0; i < config.NumConsumerStreams; i++ { + consumer := NewConsumer(startID+i, c, config) + if err := consumer.Start(); err != nil { + return fmt.Errorf("failed to start consumer %d: %w", i, err) + } + c.consumers = append(c.consumers, consumer) + } + + return nil +} + +// ScaleConsumers scales the number of consumers +func (c *Connection) ScaleConsumers(config *StreamConfig) error { + c.mu.Lock() + currentCount := len(c.consumers) + c.mu.Unlock() + + if config.NumConsumerStreams > currentCount { + // Add consumers + newConfig := *config + newConfig.NumConsumerStreams = config.NumConsumerStreams - currentCount + return c.AddConsumers(&newConfig) + } else if config.NumConsumerStreams < currentCount { + // Remove consumers + c.mu.Lock() + for i := config.NumConsumerStreams; i < currentCount; i++ { + if c.consumers[i] != nil { + c.consumers[i].Stop() + } + 
} + c.consumers = c.consumers[:config.NumConsumerStreams] + c.mu.Unlock() + } + + return nil +} diff --git a/tools/anarkey/example-config-queue.json b/tools/anarkey/example-config-queue.json new file mode 100644 index 0000000..f8c4511 --- /dev/null +++ b/tools/anarkey/example-config-queue.json @@ -0,0 +1,27 @@ +{ + "config": { + "arke_host": "localhost", + "arke_port": 50051, + "arke_tls": false, + "broker_host": "localhost", + "broker_port": 5672, + "broker_username": "guest", + "broker_password": "guest", + "broker_tls": false, + "broker_provider": "amqp091", + "num_publisher_connections": 5, + "num_consumer_connections": 3 + }, + "stream_config": { + "num_publisher_streams": 10, + "num_consumer_streams": 5, + "source_type": 0, + "source_name": "test-queue", + "address_name": "test-address", + "subject": "test.routing.key", + "message_count": 10000, + "is_continuous": false, + "publish_rate_limit": 1000, + "message_file": "example-message.json" + } +} diff --git a/tools/anarkey/example-config-stream.json b/tools/anarkey/example-config-stream.json new file mode 100644 index 0000000..e6d88e5 --- /dev/null +++ b/tools/anarkey/example-config-stream.json @@ -0,0 +1,27 @@ +{ + "config": { + "arke_host": "localhost", + "arke_port": 50051, + "arke_tls": false, + "broker_host": "localhost", + "broker_port": 5672, + "broker_username": "guest", + "broker_password": "guest", + "broker_tls": false, + "broker_provider": "amqp091", + "num_publisher_connections": 1, + "num_consumer_connections": 1 + }, + "stream_config": { + "num_publisher_streams": 1, + "num_consumer_streams": 1, + "source_type": 2, + "source_name": "mystream", + "address_name": "mystream", + "subject": "mystream", + "message_count": 0, + "is_continuous": true, + "publish_rate_limit": 2000, + "message_file": "example-message.json" + } +} \ No newline at end of file diff --git a/tools/anarkey/example-message.json b/tools/anarkey/example-message.json new file mode 100644 index 0000000..6ebddb6 --- /dev/null +++ 
b/tools/anarkey/example-message.json @@ -0,0 +1,8 @@ +{ + "headers": { + "content-type": "application/json", + "x-correlation-id": "@id@", + "source": "load-test" + }, + "body": "{\"id\":\"@id@\",\"timestamp\":\"2026-02-13T00:00:00.000Z\",\"event\":\"test.event\",\"data\":{\"message\":\"Load test message with unique ID: @id@\"}}" +} diff --git a/tools/anarkey/go.mod b/tools/anarkey/go.mod new file mode 100644 index 0000000..4252e72 --- /dev/null +++ b/tools/anarkey/go.mod @@ -0,0 +1,36 @@ +module github.com/sassoftware/arke/tools/anarkey + +go 1.25.0 + +require ( + github.com/charmbracelet/bubbletea v1.3.10 + github.com/charmbracelet/lipgloss v1.1.0 + github.com/google/uuid v1.6.0 + github.com/sassoftware/arke/api v1.13.0 + google.golang.org/grpc v1.80.0 +) + +require ( + github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect + github.com/charmbracelet/colorprofile v0.4.3 // indirect + github.com/charmbracelet/x/ansi v0.11.7 // indirect + github.com/charmbracelet/x/cellbuf v0.0.15 // indirect + github.com/charmbracelet/x/term v0.2.2 // indirect + github.com/clipperhouse/displaywidth v0.11.0 // indirect + github.com/clipperhouse/uax29/v2 v2.7.0 // indirect + github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect + github.com/lucasb-eyer/go-colorful v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.21 // indirect + github.com/mattn/go-localereader v0.0.1 // indirect + github.com/mattn/go-runewidth v0.0.23 // indirect + github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect + github.com/muesli/cancelreader v0.2.2 // indirect + github.com/muesli/termenv v0.16.0 // indirect + github.com/rivo/uniseg v0.4.7 // indirect + github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect + golang.org/x/net v0.53.0 // indirect + golang.org/x/sys v0.43.0 // indirect + golang.org/x/text v0.36.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478 // indirect + google.golang.org/protobuf v1.36.11 
// indirect +) diff --git a/tools/anarkey/go.sum b/tools/anarkey/go.sum new file mode 100644 index 0000000..02ef379 --- /dev/null +++ b/tools/anarkey/go.sum @@ -0,0 +1,81 @@ +github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= +github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/charmbracelet/bubbletea v1.3.10 h1:otUDHWMMzQSB0Pkc87rm691KZ3SWa4KUlvF9nRvCICw= +github.com/charmbracelet/bubbletea v1.3.10/go.mod h1:ORQfo0fk8U+po9VaNvnV95UPWA1BitP1E0N6xJPlHr4= +github.com/charmbracelet/colorprofile v0.4.3 h1:QPa1IWkYI+AOB+fE+mg/5/4HRMZcaXex9t5KX76i20Q= +github.com/charmbracelet/colorprofile v0.4.3/go.mod h1:/zT4BhpD5aGFpqQQqw7a+VtHCzu+zrQtt1zhMt9mR4Q= +github.com/charmbracelet/lipgloss v1.1.0 h1:vYXsiLHVkK7fp74RkV7b2kq9+zDLoEU4MZoFqR/noCY= +github.com/charmbracelet/lipgloss v1.1.0/go.mod h1:/6Q8FR2o+kj8rz4Dq0zQc3vYf7X+B0binUUBwA0aL30= +github.com/charmbracelet/x/ansi v0.11.7 h1:kzv1kJvjg2S3r9KHo8hDdHFQLEqn4RBCb39dAYC84jI= +github.com/charmbracelet/x/ansi v0.11.7/go.mod h1:9qGpnAVYz+8ACONkZBUWPtL7lulP9No6p1epAihUZwQ= +github.com/charmbracelet/x/cellbuf v0.0.15 h1:ur3pZy0o6z/R7EylET877CBxaiE1Sp1GMxoFPAIztPI= +github.com/charmbracelet/x/cellbuf v0.0.15/go.mod h1:J1YVbR7MUuEGIFPCaaZ96KDl5NoS0DAWkskup+mOY+Q= +github.com/charmbracelet/x/term v0.2.2 h1:xVRT/S2ZcKdhhOuSP4t5cLi5o+JxklsoEObBSgfgZRk= +github.com/charmbracelet/x/term v0.2.2/go.mod h1:kF8CY5RddLWrsgVwpw4kAa6TESp6EB5y3uxGLeCqzAI= +github.com/clipperhouse/displaywidth v0.11.0 h1:lBc6kY44VFw+TDx4I8opi/EtL9m20WSEFgwIwO+UVM8= +github.com/clipperhouse/displaywidth v0.11.0/go.mod h1:bkrFNkf81G8HyVqmKGxsPufD3JhNl3dSqnGhOoSD/o0= +github.com/clipperhouse/uax29/v2 v2.7.0 h1:+gs4oBZ2gPfVrKPthwbMzWZDaAFPGYK72F0NJv2v7Vk= +github.com/clipperhouse/uax29/v2 
v2.7.0/go.mod h1:EFJ2TJMRUaplDxHKj1qAEhCtQPW2tJSwu5BF98AuoVM= +github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4= +github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/lucasb-eyer/go-colorful v1.4.0 h1:UtrWVfLdarDgc44HcS7pYloGHJUjHV/4FwW4TvVgFr4= +github.com/lucasb-eyer/go-colorful v1.4.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/mattn/go-isatty v0.0.21 h1:xYae+lCNBP7QuW4PUnNG61ffM4hVIfm+zUzDuSzYLGs= +github.com/mattn/go-isatty v0.0.21/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4= +github.com/mattn/go-localereader v0.0.1 h1:ygSAOl7ZXTx4RdPYinUpg6W99U8jWvWi9Ye2JC/oIi4= +github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88= +github.com/mattn/go-runewidth v0.0.23 h1:7ykA0T0jkPpzSvMS5i9uoNn2Xy3R383f9HDx3RybWcw= +github.com/mattn/go-runewidth v0.0.23/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= +github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 h1:ZK8zHtRHOkbHy6Mmr5D264iyp3TiX5OmNcI5cIARiQI= 
+github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6/go.mod h1:CJlz5H+gyd6CUWT45Oy4q24RdLyn7Md9Vj2/ldJBSIo= +github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA= +github.com/muesli/cancelreader v0.2.2/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo= +github.com/muesli/termenv v0.16.0 h1:S5AlUN9dENB57rsbnkPyfdGuWIlkmzJjbFf0Tf5FWUc= +github.com/muesli/termenv v0.16.0/go.mod h1:ZRfOIKPFDYQoDFF4Olj7/QJbW60Ol/kL1pU3VfY/Cnk= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/sassoftware/arke/api v1.13.0 h1:LKRUQccwVLm7dJ1aPm2fFn5YzXyhUz0LpdUThOHjjZo= +github.com/sassoftware/arke/api v1.13.0/go.mod h1:C5d6FXOBNXQo8jeuiqwYjqH1EMMxVnfmhSevxw39FBA= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= +go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= +go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= +go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= +go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= +go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= +go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= +go.opentelemetry.io/otel/trace v1.39.0 
h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= +go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478 h1:RmoJA1ujG+/lRGNfUnOMfhCy5EipVMyvUE+KNbPbTlw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= +google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= diff --git a/tools/anarkey/logger.go b/tools/anarkey/logger.go new file mode 100644 index 0000000..5744555 --- /dev/null +++ b/tools/anarkey/logger.go @@ -0,0 +1,35 @@ +package main + +import ( + "fmt" + "log" + "os" + "path/filepath" + "time" +) + +var Logger *log.Logger + +// InitLogger initializes the 
logger to write to a file +func InitLogger() error { + // Create logs directory if it doesn't exist + logsDir := "logs" + if err := os.MkdirAll(logsDir, 0755); err != nil { + return fmt.Errorf("failed to create logs directory: %w", err) + } + + // Create log file with timestamp + timestamp := time.Now().Format("20060102-150405") + logFile := filepath.Join(logsDir, fmt.Sprintf("anarkey-%s.log", timestamp)) + + file, err := os.OpenFile(logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + return fmt.Errorf("failed to open log file: %w", err) + } + + Logger = log.New(file, "", log.LstdFlags|log.Lshortfile) + Logger.Printf("=== Anarkey Started ===") + Logger.Printf("Log file: %s", logFile) + + return nil +} diff --git a/tools/anarkey/main.go b/tools/anarkey/main.go new file mode 100644 index 0000000..b722f27 --- /dev/null +++ b/tools/anarkey/main.go @@ -0,0 +1,41 @@ +package main + +import ( + "flag" + "fmt" + "os" + + tea "github.com/charmbracelet/bubbletea" +) + +func main() { + // Parse command line flags + configFile := flag.String("config", "", "Path to configuration JSON file (skip config screens)") + flag.Parse() + + // Initialize logger + if err := InitLogger(); err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to initialize logger: %v\n", err) + } + + var model *Model + + // Load config from file if provided + if *configFile != "" { + config, streamConfig, err := LoadConfig(*configFile) + if err != nil { + fmt.Fprintf(os.Stderr, "Error loading config file: %v\n", err) + os.Exit(1) + } + model = NewModelWithConfig(config, streamConfig) + } else { + model = NewModel() + } + + p := tea.NewProgram(model, tea.WithAltScreen()) + + if _, err := p.Run(); err != nil { + fmt.Printf("Error running load tool: %v\n", err) + os.Exit(1) + } +} diff --git a/tools/anarkey/publisher.go b/tools/anarkey/publisher.go new file mode 100644 index 0000000..5fabe97 --- /dev/null +++ b/tools/anarkey/publisher.go @@ -0,0 +1,391 @@ +package main + +import ( + 
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/sassoftware/arke/api"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// Publisher handles publishing messages
type Publisher struct {
	ID     int
	conn   *Connection
	config *StreamConfig
	// stream is the bidirectional Publish stream; it stays nil for STREAM
	// sources, which use the unary PublishOne RPC instead (see Start).
	stream api.Producer_PublishClient
	ctx    context.Context
	cancel context.CancelFunc
	// mu guards stopped, completed, messageCount and messageTemplate reads
	// in the publish loop.
	mu           sync.Mutex
	stopped      bool
	completed    bool // Set to true when message count is reached
	messageCount int
	messageTemplate *MessageTemplate // loaded from file if MessageFile is specified
}

// NewPublisher creates a new publisher. The publisher's context is derived
// from the connection's, so closing the connection cancels every publisher
// attached to it.
func NewPublisher(id int, conn *Connection, config *StreamConfig) *Publisher {
	ctx, cancel := context.WithCancel(conn.ctx)
	return &Publisher{
		ID:     id,
		conn:   conn,
		config: config,
		ctx:    ctx,
		cancel: cancel,
	}
}

// Start starts the publisher: it loads the optional message template,
// declares the source, opens the appropriate publish RPC, and spawns the
// publish goroutine (which exits via p.ctx cancellation in Stop).
func (p *Publisher) Start() error {
	p.mu.Lock()
	if p.stopped || p.completed {
		p.mu.Unlock()
		Logger.Printf("Publisher %d: cannot start - already stopped or completed", p.ID)
		return fmt.Errorf("publisher already stopped or completed")
	}
	p.mu.Unlock()

	Logger.Printf("Publisher %d: starting (rate limit: %d msg/s, message count: %d, continuous: %v)",
		p.ID, p.config.PublishRateLimit, p.config.MessageCount, p.config.IsContinuous)

	// Load message template from file if specified
	if p.config.MessageFile != "" {
		Logger.Printf("Publisher %d: loading message template from %s", p.ID, p.config.MessageFile)
		template, err := LoadMessageFromFile(p.config.MessageFile)
		if err != nil {
			Logger.Printf("Publisher %d: failed to load message file: %v", p.ID, err)
			return fmt.Errorf("failed to load message file: %w", err)
		}
		p.messageTemplate = template
		Logger.Printf("Publisher %d: loaded message template with %d headers, body length: %d",
			p.ID, len(template.Headers), len(template.Body))
	}

	// First, create a declare-only consumer if this is for publishing
	if err := p.declareSource(); err != nil {
		Logger.Printf("Publisher %d: failed to declare source: %v", p.ID, err)
		return fmt.Errorf("failed to declare source: %w", err)
	}

	// STREAM sources use the unary PublishOne RPC; QUEUE sources use the
	// bidirectional Publish stream for better throughput.
	if p.config.SourceType != api.Source_STREAM {
		stream, err := p.conn.ProducerClient.Publish(p.ctx)
		if err != nil {
			Logger.Printf("Publisher %d: failed to create publish stream: %v", p.ID, err)
			return fmt.Errorf("failed to create publish stream: %w", err)
		}
		p.stream = stream
		Logger.Printf("Publisher %d: publish stream created successfully", p.ID)
	} else {
		Logger.Printf("Publisher %d: using PublishOne for stream source", p.ID)
	}

	// Start publishing
	go p.publish()

	return nil
}

// declareSource creates a declare-only consume stream to ensure the source exists
func (p *Publisher) declareSource() error {
	// Create consume stream
	consumeStream, err := p.conn.ConsumerClient.Consume(p.ctx)
	if err != nil {
		return err
	}

	// Determine address type based on source type
	// Queue sources use TOPIC addresses, streams use STREAM addresses
	addressType := api.Address_TOPIC
	if p.config.SourceType == api.Source_STREAM {
		addressType = api.Address_STREAM
	}

	// Send source with declare_only = true
	source := &api.Source{
		Name:        p.config.SourceName,
		Type:        p.config.SourceType,
		DeclareOnly: true,
		Address: &api.Address{
			Name:     p.config.AddressName,
			Type:     addressType,
			Subjects: []string{p.config.Subject},
		},
	}

	err = consumeStream.Send(&api.Consume{
		Msg: &api.Consume_Src{Src: source},
	})
	if err != nil {
		consumeStream.CloseSend()
		return err
	}

	// Wait for declare-only response
	resp, err := consumeStream.Recv()
	if err != nil {
		consumeStream.CloseSend()
		return err
	}

	switch r := resp.Resp.(type) {
	case *api.ConsumeResponse_DeclareOnlyResponse:
		if !r.DeclareOnlyResponse.Success {
			consumeStream.CloseSend()
			return
fmt.Errorf("declare failed: %s", r.DeclareOnlyResponse.Error.Message)
	case *api.ConsumeResponse_Error:
		consumeStream.CloseSend()
		return fmt.Errorf("declare error: %s", r.Error.Message)
	}

	// Close the consume stream
	consumeStream.CloseSend()

	return nil
}

// publish sends messages in a loop until the context is cancelled, the
// publisher is stopped, or (in non-continuous mode) MessageCount attempts
// have been made. It runs on its own goroutine, started by Start.
func (p *Publisher) publish() {
	// Setup rate limiting if configured
	var ticker *time.Ticker
	var tickerChan <-chan time.Time
	if p.config.PublishRateLimit > 0 {
		interval := time.Second / time.Duration(p.config.PublishRateLimit)
		ticker = time.NewTicker(interval)
		defer ticker.Stop()
		tickerChan = ticker.C
	}

	for {
		// If rate limiting is enabled, wait for ticker
		if tickerChan != nil {
			select {
			case <-p.ctx.Done():
				return
			case <-tickerChan:
				// Continue to send message
			}
		} else {
			// No rate limiting, check context without blocking
			select {
			case <-p.ctx.Done():
				return
			default:
				// Continue to send message
			}
		}

		p.mu.Lock()
		if p.stopped {
			p.mu.Unlock()
			return
		}

		// Check if we've reached the message limit
		if !p.config.IsContinuous && p.messageCount >= p.config.MessageCount {
			p.completed = true
			p.mu.Unlock()
			Logger.Printf("Publisher %d: reached message limit (%d messages)", p.ID, p.messageCount)
			p.Stop()
			return
		}

		// Queue sources use TOPIC addresses, streams use STREAM addresses
		addressType := api.Address_TOPIC
		if p.config.SourceType == api.Source_STREAM {
			addressType = api.Address_STREAM
		}

		// Build message from template or generate default
		var msgBody []byte
		var msgHeaders map[string]string
		if p.messageTemplate != nil {
			body, headers := p.messageTemplate.GetBodyAndHeaders()
			msgBody = []byte(body)
			msgHeaders = headers
		} else {
			msgBody = []byte(fmt.Sprintf("Load test message %d from publisher %d", p.messageCount, p.ID))
		}

		msg := &api.Message{
			Body:    msgBody,
			Headers: msgHeaders,
			Address: &api.Address{
				Name:     p.config.AddressName,
				Type:     addressType,
				Subjects: []string{p.config.Subject},
			},
			Persistent: true,
		}
		p.mu.Unlock()

		if p.stream != nil {
			// QUEUE path: bidirectional Publish stream. Any send/recv error
			// ends this publisher; cancellation/EOF are expected during
			// shutdown, everything else was already logged above.
			if err := p.stream.Send(msg); err != nil {
				Logger.Printf("Publisher %d: send error at message %d: %v", p.ID, p.messageCount, err)
				if status.Code(err) == codes.Canceled || err == io.EOF {
					return // expected during shutdown
				}
				return // unrecoverable stream error
			}
			resp, err := p.stream.Recv()
			if err != nil {
				Logger.Printf("Publisher %d: receive error after message %d: %v", p.ID, p.messageCount, err)
				if err == io.EOF || status.Code(err) == codes.Canceled {
					return // expected during shutdown
				}
				return // unrecoverable stream error
			}
			if !resp.Success {
				if resp.Error != nil {
					Logger.Printf("Publisher %d: publish failed for message %d: %s", p.ID, p.messageCount, resp.Error.Message)
				} else {
					Logger.Printf("Publisher %d: publish failed for message %d (no error details)", p.ID, p.messageCount)
				}
			}
		} else {
			// STREAM path: unary PublishOne
			resp, err := p.conn.ProducerClient.PublishOne(p.ctx, msg)
			if err != nil {
				Logger.Printf("Publisher %d: PublishOne error at message %d: %v", p.ID, p.messageCount, err)
				if status.Code(err) == codes.Canceled {
					return // expected during shutdown
				}
				return // unrecoverable RPC error
			}
			if !resp.Success {
				if resp.Error != nil {
					Logger.Printf("Publisher %d: PublishOne failed for message %d: %s", p.ID, p.messageCount, resp.Error.Message)
				} else {
					Logger.Printf("Publisher %d: PublishOne failed for message %d (no error details)", p.ID, p.messageCount)
				}
			}
		}

		// Count the attempt. Broker-reported failures are logged above but
		// still count toward MessageCount and the published metric, so a
		// persistently failing broker cannot spin this loop forever.
		p.mu.Lock()
		p.messageCount++
		p.conn.metrics.IncrementPublished()
		p.mu.Unlock()
	}
}

// Stop stops the publisher
func (p *Publisher) Stop() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.stopped {
		return
	}

	Logger.Printf("Publisher %d: stopping (sent %d messages)", p.ID, p.messageCount)

	p.stopped = true
	if p.stream != nil {
		p.stream.CloseSend()
	}
	p.cancel()
}

// AddPublishers adds publishers to a connection. New publishers are appended
// and started immediately; an error from any Start aborts the loop, leaving
// the earlier, already-started publishers running.
func (c *Connection) AddPublishers(config *StreamConfig) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.Type != ConnectionTypePublisher {
		return fmt.Errorf("cannot add publishers to consumer connection")
	}

	startID := len(c.publishers)
	for i := 0; i < config.NumPublisherStreams; i++ {
		publisher := NewPublisher(startID+i, c, config)
		if err := publisher.Start(); err != nil {
			return fmt.Errorf("failed to start publisher %d: %w", i, err)
		}
		c.publishers = append(c.publishers, publisher)
	}

	return nil
}

// ScalePublishers scales the number of publishers and updates their configuration
func (c *Connection) ScalePublishers(config *StreamConfig) error {
	// NOTE(review): c.mu is released between this counting pass and the
	// scaling below, so a concurrent caller could change c.publishers in
	// between — confirm callers serialize ScalePublishers.
	c.mu.Lock()
	// Count only active publishers (not completed)
	activeCount := 0
	needsRestart := false
	for _, p := range c.publishers {
		p.mu.Lock()
		if !p.completed {
			activeCount++
			// Check if config has changed (rate limit, etc.)
			if p.config.PublishRateLimit != config.PublishRateLimit {
				needsRestart = true
			}
		}
		p.mu.Unlock()
	}
	totalCount := len(c.publishers)
	c.mu.Unlock()

	// If configuration has changed, restart all active publishers.
	// The restart only triggers when the requested stream count equals the
	// current active count; otherwise the add/remove paths below apply.
	if needsRestart && config.NumPublisherStreams == activeCount {
		Logger.Printf("Connection %d: restarting %d publishers with new config (rate limit: %d)",
			c.ID, activeCount, config.PublishRateLimit)

		c.mu.Lock()
		// Stop all active publishers
		for _, p := range c.publishers {
			p.mu.Lock()
			if !p.completed {
				p.mu.Unlock()
				p.Stop()
			} else {
				p.mu.Unlock()
			}
		}
		// Clear the publisher list (completed publishers are dropped too)
		c.publishers = nil
		c.mu.Unlock()

		// Add new publishers with updated config
		return c.AddPublishers(config)
	}

	if config.NumPublisherStreams > activeCount {
		// Add new publishers (only if not in continuous mode or if we need more active ones)
		newConfig := *config
		newConfig.NumPublisherStreams = config.NumPublisherStreams - activeCount
		return c.AddPublishers(&newConfig)
	} else if config.NumPublisherStreams < totalCount {
		// Remove publishers from the end of the list.
		// NOTE(review): this stops and truncates strictly from the tail —
		// it does NOT prioritize removing completed publishers, so active
		// publishers can be removed while completed ones remain.
		c.mu.Lock()
		// First, identify which publishers to remove
		toRemove := totalCount - config.NumPublisherStreams
		removed := 0

		// Remove from end, stopping any that are still active
		for i := totalCount - 1; i >= 0 && removed < toRemove; i-- {
			if c.publishers[i] != nil {
				c.publishers[i].Stop()
				removed++
			}
		}
		c.publishers = c.publishers[:config.NumPublisherStreams]
		c.mu.Unlock()
	}

	return nil
}
diff --git a/tools/anarkey/tui.go b/tools/anarkey/tui.go
new file mode 100644
index 0000000..5486b07
--- /dev/null
+++ b/tools/anarkey/tui.go
@@ -0,0 +1,1091 @@
package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	tea "github.com/charmbracelet/bubbletea"
	"github.com/charmbracelet/lipgloss"
	"github.com/sassoftware/arke/api"
)

// Color styles
var (
	titleStyle = lipgloss.NewStyle().
			Bold(true).
			Foreground(lipgloss.Color("#7D56F4")).
			Background(lipgloss.Color("#282828")).
			Padding(0, 1)

	headerStyle = lipgloss.NewStyle().
			Bold(true).
			Foreground(lipgloss.Color("#00D7FF"))

	boxStyle = lipgloss.NewStyle().
			Border(lipgloss.RoundedBorder()).
			BorderForeground(lipgloss.Color("#874BFD")).
			Padding(0, 1)

	cursorStyle = lipgloss.NewStyle().
			Foreground(lipgloss.Color("#FF5F87")).
			Bold(true)

	labelStyle = lipgloss.NewStyle().
			Foreground(lipgloss.Color("#FAFAFA"))

	valueStyle = lipgloss.NewStyle().
			Foreground(lipgloss.Color("#00FF87")).
			Bold(true)

	metricLabelStyle = lipgloss.NewStyle().
				Foreground(lipgloss.Color("#00D7FF"))

	metricValueStyle = lipgloss.NewStyle().
				Foreground(lipgloss.Color("#FFD700")).
				Bold(true)

	errorStyle = lipgloss.NewStyle().
			Foreground(lipgloss.Color("#FF0000")).
			Bold(true)

	subtleStyle = lipgloss.NewStyle().
			Foreground(lipgloss.Color("#626262"))
)

// Screen represents different screens in the TUI
type Screen int

const (
	ScreenArkeConfig Screen = iota
	ScreenBrokerConfig
	ScreenConnectionConfig
	ScreenStreamConfig
	ScreenRunning
)

// Model represents the bubbletea model
type Model struct {
	screen       Screen
	config       *Config
	streamConfig *StreamConfig
	connManager  *ConnectionManager
	metrics      *Metrics

	// UI state
	cursor  int
	editing bool // true when editing a field
	inputs  map[string]string
	err     error
	width   int
	height  int

	// Running state
	ctx    context.Context
	cancel context.CancelFunc
}

// tickMsg is sent periodically to update metrics
type tickMsg time.Time

// errMsg is sent when an error occurs
type errMsg struct{ err error }

func (e errMsg) Error() string { return e.err.Error() }

// NewModel creates a new bubbletea model starting on the first config screen
// with interactive defaults.
func NewModel() *Model {
	ctx, cancel := context.WithCancel(context.Background())
	metrics := &Metrics{
		lastUpdate: time.Now(),
	}

	return &Model{
		screen: ScreenArkeConfig,
		config: &Config{
			BrokerProvider:          "amqp091",
			NumPublisherConnections: 1,
			NumConsumerConnections:  1,
		},
		streamConfig: &StreamConfig{
			NumPublisherStreams: 1,
			NumConsumerStreams:  1,
			SourceType:          api.Source_QUEUE,
			MessageCount:        10000,
		},
		metrics: metrics,
		inputs:  make(map[string]string),
		ctx:     ctx,
		cancel:  cancel,
	}
}

// NewModelWithConfig creates a new bubbletea model with pre-loaded configuration
func NewModelWithConfig(config *Config, streamConfig *StreamConfig) *Model {
	ctx, cancel := context.WithCancel(context.Background())
	metrics := &Metrics{
		lastUpdate: time.Now(),
	}

	model := &Model{
		screen:       ScreenRunning, // Start at running screen immediately
		config:       config,
		streamConfig: streamConfig,
		metrics:      metrics,
		inputs:       make(map[string]string),
		ctx:          ctx,
		cancel:       cancel,
		cursor:       0,
		editing:      false,
	}

	// Initialize connection manager and start the load test
	Logger.Printf("=== Starting Load Test (from config file) ===")
	Logger.Printf("Arke: %s:%d (TLS: %v)", config.ArkeHost, config.ArkePort, config.ArkeTLS)
	Logger.Printf("Broker: %s:%d (TLS: %v)", config.BrokerHost, config.BrokerPort, config.BrokerTLS)
	Logger.Printf("Publisher Connections: %d, Consumer Connections: %d",
		config.NumPublisherConnections, config.NumConsumerConnections)
	Logger.Printf("Publisher Streams/Conn: %d, Consumer Streams/Conn: %d",
		streamConfig.NumPublisherStreams, streamConfig.NumConsumerStreams)
	Logger.Printf("Source: %s (%v), Rate Limit: %d msg/s",
		streamConfig.SourceName, streamConfig.SourceType, streamConfig.PublishRateLimit)

	// Create connection manager
	model.connManager = NewConnectionManager(config, metrics)

	// Create publisher connections (each creates its own TCP connection to Arke).
	// NOTE(review): setup errors are logged but only the last one is kept in
	// model.err; startup continues with whatever connections succeeded.
	for i := 0; i < config.NumPublisherConnections; i++ {
		if err := model.connManager.CreateConnection(ctx, ConnectionTypePublisher); err != nil {
			Logger.Printf("Error creating publisher connection: %v", err)
			model.err = err
		}
	}

	// Create consumer connections (each creates its own TCP connection to Arke)
	for i := 0; i < config.NumConsumerConnections; i++ {
		if err := model.connManager.CreateConnection(ctx, ConnectionTypeConsumer); err != nil {
			Logger.Printf("Error creating consumer connection: %v", err)
			model.err = err
		}
	}

	// Add streams to each connection
	for _, conn := range model.connManager.GetConnections() {
		if conn.Type == ConnectionTypePublisher {
			if streamConfig.NumPublisherStreams > 0 {
				cfg := &StreamConfig{
					NumPublisherStreams: streamConfig.NumPublisherStreams,
					SourceType:          streamConfig.SourceType,
					SourceName:          streamConfig.SourceName,
					AddressName:         streamConfig.AddressName,
					Subject:             streamConfig.Subject,
					MessageCount:        streamConfig.MessageCount,
					IsContinuous:        streamConfig.IsContinuous,
					MessageFile:         streamConfig.MessageFile,
					PublishRateLimit:    streamConfig.PublishRateLimit,
				}
				if err := conn.AddPublishers(cfg); err != nil {
					Logger.Printf("Error adding publishers: %v", err)
					model.err = err
				}
			}
		} else {
			if streamConfig.NumConsumerStreams > 0 {
				cfg := &StreamConfig{
					NumConsumerStreams: streamConfig.NumConsumerStreams,
					SourceType:         streamConfig.SourceType,
					SourceName:         streamConfig.SourceName,
					AddressName:        streamConfig.AddressName,
					Subject:            streamConfig.Subject,
				}
				if err := conn.AddConsumers(cfg); err != nil {
					Logger.Printf("Error adding consumers: %v", err)
					model.err = err
				}
			}
		}
	}

	return model
}

// Init initializes the model
func (m *Model) Init() tea.Cmd {
	// If we're starting on the Running screen (loaded from config),
	// begin the metrics ticker immediately
	if m.screen == ScreenRunning {
		return tick()
	}
	return nil
}

// Update handles messages
func (m *Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
	switch msg := msg.(type) {
	case tea.KeyMsg:
		return m.handleKeyPress(msg)

	case tea.WindowSizeMsg:
		m.width = msg.Width
		m.height = msg.Height
		return m, nil

	case tickMsg:
		// Re-arm the ticker only while running so it stops when we leave
		// the running screen.
		if m.screen == ScreenRunning && m.metrics != nil {
			m.metrics.Update()
			return m, tick()
		}
		return m, nil

	case errMsg:
		m.err = msg.err
		return m, nil
	}

	return m, nil
}

// handleKeyPress handles keyboard input
func (m *Model) handleKeyPress(msg tea.KeyMsg) (tea.Model, tea.Cmd) {
	switch msg.String() {
	case "ctrl+c":
		if m.connManager != nil {
			m.connManager.Close()
		}
		m.cancel()
		return m, tea.Quit

	case "up":
		// Only navigate when not editing
		if !m.editing && m.cursor > 0 {
			m.cursor--
		}

	case "down":
		// Only navigate when not editing
		if !m.editing {
			maxCursor := m.getMaxCursor()
			if m.cursor < maxCursor {
				m.cursor++
			}
		}

	case "tab":
		// Jump to Continue/Start button when not editing (only on screens with continue button)
		if !m.editing && m.screen != ScreenRunning {
			m.cursor = m.getMaxCursor()
		}

	case "enter":
		return m.handleEnter()

	case "backspace":
		return m.handleBackspace()

	case "esc":
		// Exit editing mode without saving
		if m.editing {
			m.editing = false
		}

	default:
		// Handle text input only when editing
		return m.handleTextInput(msg.String())
	}

	return m, nil
}

// handleEnter handles the enter key
func (m *Model) handleEnter() (tea.Model, tea.Cmd) {
	switch m.screen {
	case ScreenArkeConfig:
		maxCursor := m.getMaxCursor()
		if m.cursor == maxCursor {
			// Continue button pressed
			if m.validateArkeConfig() {
				m.screen = ScreenBrokerConfig
				m.cursor = 0
				m.editing = false
			}
		} else {
			// Toggle editing mode for the current field
			m.editing = !m.editing
		}

	case ScreenBrokerConfig:
		maxCursor := m.getMaxCursor()
		if m.cursor == maxCursor {
			// Continue button pressed
			if m.validateBrokerConfig() {
				m.screen = ScreenConnectionConfig
				m.cursor = 0
				m.editing = false
			}
		} else {
			// Toggle editing mode for the current field
			m.editing = !m.editing
		}

	case ScreenConnectionConfig:
		maxCursor := m.getMaxCursor()
		if m.cursor == maxCursor {
			// Continue button pressed
			if m.validateConnectionConfig() {
				m.screen = ScreenStreamConfig
				m.cursor = 0
				m.editing = false
			}
		} else {
			// Toggle editing mode for the current field
			m.editing = !m.editing
		}

	case ScreenStreamConfig:
		maxCursor := m.getMaxCursor()
		if m.cursor == maxCursor {
			// Start button pressed
			if m.validateStreamConfig() {
				m.editing = false
				return m, m.startLoad()
			}
		} else {
			// Toggle editing mode for the current field
			m.editing = !m.editing
		}

	case ScreenRunning:
		// Running screen has no continue button, all cursor positions are editable fields
		// Toggle editing mode and apply changes when exiting edit mode
		if m.editing {
			// Exiting edit mode - apply the change
			m.editing = false
			return m.handleRunningCommand()
		} else {
			// Entering edit mode
			m.editing = true
		}
	}

	return m, nil
}

// handleBackspace handles the backspace key
func (m *Model) handleBackspace() (tea.Model, tea.Cmd) {
	if !m.editing {
		return m, nil
	}
	field := m.getCurrentField()
	if field != "" {
		current := m.inputs[field]
		if len(current) > 0 {
			m.inputs[field] = current[:len(current)-1]
		}
	}
	return m, nil
}

// getMaxCursor returns the maximum cursor position for the current screen.
// On config screens the last position is the Continue/Start button; on the
// running screen every position is an editable field.
func (m *Model) getMaxCursor() int {
	switch m.screen {
	case ScreenArkeConfig:
		return 3 // arke_host, arke_port, arke_tls, + Continue button
	case ScreenBrokerConfig:
		return 5 // broker_host, broker_port, broker_username, broker_password, broker_tls, + Continue button
	case ScreenConnectionConfig:
		return 2 // num_publisher_connections, num_consumer_connections, + Continue button
	case ScreenStreamConfig:
		return 10 // num_publisher_streams, num_consumer_streams, source_type, source_name, address_name, subject, message_count, publish_rate_limit, message_file, save_config_file, + Start button
	case ScreenRunning:
		return 4 // runtime_publisher_connections, runtime_consumer_connections, runtime_publisher_streams, runtime_consumer_streams, runtime_publish_rate_limit (no continue button)
	}
	return 0
}

// handleTextInput handles text input. Only single-character keys are
// appended, which filters out named keys like "left"/"right".
func (m *Model) handleTextInput(s string) (tea.Model, tea.Cmd) {
	if !m.editing {
		return m, nil
	}
	if len(s) == 1 {
		field := m.getCurrentField()
		if field != "" {
			m.inputs[field] += s
		}
	}
	return m, nil
}

// getCurrentField returns the current input field based on cursor position
func (m *Model) getCurrentField() string {
	switch m.screen {
	case ScreenArkeConfig:
		fields := []string{"arke_host", "arke_port", "arke_tls"}
		if m.cursor < len(fields) {
			return fields[m.cursor]
		}
	case ScreenBrokerConfig:
		fields := []string{"broker_host", "broker_port", "broker_username", "broker_password", "broker_tls"}
		if m.cursor < len(fields) {
			return fields[m.cursor]
		}
	case ScreenConnectionConfig:
		fields := []string{"num_publisher_connections", "num_consumer_connections"}
		if m.cursor < len(fields) {
			return fields[m.cursor]
		}
	case ScreenStreamConfig:
		fields := []string{"num_publisher_streams", "num_consumer_streams", "source_type", "source_name", "address_name", "subject", "message_count", "publish_rate_limit", "message_file", "save_config_file"}
		if m.cursor < len(fields) {
			return fields[m.cursor]
		}
	case ScreenRunning:
		fields := []string{"runtime_publisher_connections", "runtime_consumer_connections", "runtime_publisher_streams", "runtime_consumer_streams", "runtime_publish_rate_limit"}
		if m.cursor < len(fields) {
			return fields[m.cursor]
		}
	}
	return ""
}

// validateArkeConfig validates the arke configuration
func (m *Model) validateArkeConfig() bool {
	m.config.ArkeHost = m.getInput("arke_host", "localhost")
	m.config.ArkePort = m.getInputInt("arke_port", 50051)
	m.config.ArkeTLS = m.getInput("arke_tls", "false") == "true"
	return true
}

// validateBrokerConfig validates the broker configuration
func (m *Model) validateBrokerConfig() bool {
	m.config.BrokerHost = m.getInput("broker_host", "localhost")
	m.config.BrokerPort = m.getInputInt("broker_port", 5672)
	m.config.BrokerUsername = m.getInput("broker_username", "guest")
	m.config.BrokerPassword = m.getInput("broker_password", "guest")
	m.config.BrokerTLS = m.getInput("broker_tls", "false") == "true"
	return true
}

// validateConnectionConfig validates the connection configuration
func (m *Model) validateConnectionConfig() bool {
	m.config.NumPublisherConnections = m.getInputInt("num_publisher_connections", 1)
	m.config.NumConsumerConnections = m.getInputInt("num_consumer_connections", 1)
	return true
}

// validateStreamConfig validates the stream configuration. A message_count
// of 0 selects continuous mode.
func (m *Model) validateStreamConfig() bool {
	m.streamConfig.NumPublisherStreams = m.getInputInt("num_publisher_streams", 1)
	m.streamConfig.NumConsumerStreams = m.getInputInt("num_consumer_streams", 1)
	sourceType := m.getInput("source_type", "queue")
	if sourceType == "stream" {
		m.streamConfig.SourceType = api.Source_STREAM
	} else {
		m.streamConfig.SourceType = api.Source_QUEUE
	}
	m.streamConfig.SourceName = m.getInput("source_name", "test-source")
	m.streamConfig.AddressName = m.getInput("address_name", "test-address")
	m.streamConfig.Subject = m.getInput("subject", "test.routing.key")
	msgCount := m.getInputInt("message_count", 10000)
	if msgCount == 0 {
		m.streamConfig.IsContinuous = true
		m.streamConfig.MessageCount = 0
	} else {
		m.streamConfig.IsContinuous = false
		m.streamConfig.MessageCount = msgCount
	}
	m.streamConfig.PublishRateLimit = m.getInputInt("publish_rate_limit", 0)
	m.streamConfig.MessageFile = m.getInput("message_file", "")

	// Save config if filename provided
	saveFile := m.getInput("save_config_file", "")
	if saveFile != "" {
		if err := SaveConfig(saveFile, m.config, m.streamConfig); err != nil {
			Logger.Printf("Failed to save config: %v", err)
			m.err =
fmt.Errorf("failed to save config: %w", err) + return false + } + Logger.Printf("Configuration saved to %s", saveFile) + } + + return true +} + +// getInput gets an input value or returns a default +func (m *Model) getInput(key, defaultVal string) string { + if val, ok := m.inputs[key]; ok && val != "" { + return val + } + return defaultVal +} + +// getInputInt gets an input value as int or returns a default +func (m *Model) getInputInt(key string, defaultVal int) int { + if val, ok := m.inputs[key]; ok && val != "" { + var result int + fmt.Sscanf(val, "%d", &result) + if result >= 0 { + return result + } + } + return defaultVal +} + +// startLoad starts the load test +func (m *Model) startLoad() tea.Cmd { + return func() tea.Msg { + Logger.Printf("=== Starting Load Test ===") + Logger.Printf("Arke: %s:%d (TLS: %v)", m.config.ArkeHost, m.config.ArkePort, m.config.ArkeTLS) + Logger.Printf("Broker: %s:%d (TLS: %v)", m.config.BrokerHost, m.config.BrokerPort, m.config.BrokerTLS) + Logger.Printf("Publisher Connections: %d, Consumer Connections: %d", + m.config.NumPublisherConnections, m.config.NumConsumerConnections) + Logger.Printf("Publisher Streams/Conn: %d, Consumer Streams/Conn: %d", + m.streamConfig.NumPublisherStreams, m.streamConfig.NumConsumerStreams) + Logger.Printf("Source: %s (%v), Rate Limit: %d msg/s", + m.streamConfig.SourceName, m.streamConfig.SourceType, m.streamConfig.PublishRateLimit) + + // Create connection manager + m.connManager = NewConnectionManager(m.config, m.metrics) + + // Create publisher connections (each creates its own TCP connection to Arke) + for i := 0; i < m.config.NumPublisherConnections; i++ { + if err := m.connManager.CreateConnection(m.ctx, ConnectionTypePublisher); err != nil { + return errMsg{err} + } + } + + // Create consumer connections (each creates its own TCP connection to Arke) + for i := 0; i < m.config.NumConsumerConnections; i++ { + if err := m.connManager.CreateConnection(m.ctx, ConnectionTypeConsumer); err != nil { 
+ return errMsg{err} + } + } + + // Add streams to each connection + for _, conn := range m.connManager.GetConnections() { + if conn.Type == ConnectionTypePublisher { + if m.streamConfig.NumPublisherStreams > 0 { + config := &StreamConfig{ + NumPublisherStreams: m.streamConfig.NumPublisherStreams, + SourceType: m.streamConfig.SourceType, + SourceName: m.streamConfig.SourceName, + AddressName: m.streamConfig.AddressName, + Subject: m.streamConfig.Subject, + MessageCount: m.streamConfig.MessageCount, + IsContinuous: m.streamConfig.IsContinuous, + MessageFile: m.streamConfig.MessageFile, + } + if err := conn.AddPublishers(config); err != nil { + return errMsg{err} + } + } + } else { + if m.streamConfig.NumConsumerStreams > 0 { + config := &StreamConfig{ + NumConsumerStreams: m.streamConfig.NumConsumerStreams, + SourceType: m.streamConfig.SourceType, + SourceName: m.streamConfig.SourceName, + AddressName: m.streamConfig.AddressName, + Subject: m.streamConfig.Subject, + } + if err := conn.AddConsumers(config); err != nil { + return errMsg{err} + } + } + } + } + + m.screen = ScreenRunning + m.cursor = 0 // Reset cursor for running screen + m.editing = false + return tick()() + } +} + +// handleRunningCommand handles commands in running mode +func (m *Model) handleRunningCommand() (tea.Model, tea.Cmd) { + field := m.getCurrentField() + if field == "" { + return m, nil + } + + switch field { + case "runtime_publisher_connections": + numConns := m.getInputInt("runtime_publisher_connections", m.config.NumPublisherConnections) + if numConns != m.config.NumPublisherConnections { + oldCount := m.config.NumPublisherConnections + m.config.NumPublisherConnections = numConns + if err := m.connManager.ScaleConnections(m.ctx, numConns, ConnectionTypePublisher); err != nil { + m.err = err + } else { + // If we added connections, add streams to them + if numConns > oldCount && m.streamConfig.NumPublisherStreams > 0 { + for _, conn := range m.connManager.GetConnections() { + if 
conn.Type == ConnectionTypePublisher && len(conn.publishers) == 0 { + config := &StreamConfig{ + NumPublisherStreams: m.streamConfig.NumPublisherStreams, + SourceType: m.streamConfig.SourceType, + SourceName: m.streamConfig.SourceName, + AddressName: m.streamConfig.AddressName, + Subject: m.streamConfig.Subject, + MessageCount: m.streamConfig.MessageCount, + IsContinuous: m.streamConfig.IsContinuous, + PublishRateLimit: m.streamConfig.PublishRateLimit, + MessageFile: m.streamConfig.MessageFile, + } + if err := conn.AddPublishers(config); err != nil { + m.err = err + } + } + } + } + } + m.inputs["runtime_publisher_connections"] = "" + } + + case "runtime_consumer_connections": + numConns := m.getInputInt("runtime_consumer_connections", m.config.NumConsumerConnections) + if numConns != m.config.NumConsumerConnections { + oldCount := m.config.NumConsumerConnections + m.config.NumConsumerConnections = numConns + if err := m.connManager.ScaleConnections(m.ctx, numConns, ConnectionTypeConsumer); err != nil { + m.err = err + } else { + // If we added connections, add streams to them + if numConns > oldCount && m.streamConfig.NumConsumerStreams > 0 { + for _, conn := range m.connManager.GetConnections() { + if conn.Type == ConnectionTypeConsumer && len(conn.consumers) == 0 { + config := &StreamConfig{ + NumConsumerStreams: m.streamConfig.NumConsumerStreams, + SourceType: m.streamConfig.SourceType, + SourceName: m.streamConfig.SourceName, + AddressName: m.streamConfig.AddressName, + Subject: m.streamConfig.Subject, + } + if err := conn.AddConsumers(config); err != nil { + m.err = err + } + } + } + } + } + m.inputs["runtime_consumer_connections"] = "" + } + + case "runtime_publisher_streams": + numStreams := m.getInputInt("runtime_publisher_streams", m.streamConfig.NumPublisherStreams) + if numStreams != m.streamConfig.NumPublisherStreams { + m.streamConfig.NumPublisherStreams = numStreams + // Scale all publisher connections + for _, conn := range 
m.connManager.GetConnections() { + if conn.Type == ConnectionTypePublisher { + config := &StreamConfig{ + NumPublisherStreams: numStreams, + SourceType: m.streamConfig.SourceType, + SourceName: m.streamConfig.SourceName, + AddressName: m.streamConfig.AddressName, + Subject: m.streamConfig.Subject, + MessageCount: m.streamConfig.MessageCount, + IsContinuous: m.streamConfig.IsContinuous, + PublishRateLimit: m.streamConfig.PublishRateLimit, + MessageFile: m.streamConfig.MessageFile, + } + if err := conn.ScalePublishers(config); err != nil { + m.err = err + } + } + } + m.inputs["runtime_publisher_streams"] = "" + } + + case "runtime_consumer_streams": + numStreams := m.getInputInt("runtime_consumer_streams", m.streamConfig.NumConsumerStreams) + if numStreams != m.streamConfig.NumConsumerStreams { + m.streamConfig.NumConsumerStreams = numStreams + // Scale all consumer connections + for _, conn := range m.connManager.GetConnections() { + if conn.Type == ConnectionTypeConsumer { + config := &StreamConfig{ + NumConsumerStreams: numStreams, + SourceType: m.streamConfig.SourceType, + SourceName: m.streamConfig.SourceName, + AddressName: m.streamConfig.AddressName, Subject: m.streamConfig.Subject} + if err := conn.ScaleConsumers(config); err != nil { + m.err = err + } + } + } + m.inputs["runtime_consumer_streams"] = "" + } + + case "runtime_publish_rate_limit": + rateLimit := m.getInputInt("runtime_publish_rate_limit", m.streamConfig.PublishRateLimit) + if rateLimit != m.streamConfig.PublishRateLimit { + m.streamConfig.PublishRateLimit = rateLimit + // Restart all publishers with new rate limit + for _, conn := range m.connManager.GetConnections() { + if conn.Type == ConnectionTypePublisher { + config := &StreamConfig{ + NumPublisherStreams: m.streamConfig.NumPublisherStreams, + SourceType: m.streamConfig.SourceType, + SourceName: m.streamConfig.SourceName, + AddressName: m.streamConfig.AddressName, + Subject: m.streamConfig.Subject, + MessageCount: 
m.streamConfig.MessageCount, + IsContinuous: m.streamConfig.IsContinuous, + PublishRateLimit: rateLimit, + MessageFile: m.streamConfig.MessageFile, + } + if err := conn.ScalePublishers(config); err != nil { + m.err = err + } + } + } + m.inputs["runtime_publish_rate_limit"] = "" + } + } + + return m, nil +} + +// tick returns a command that sends a tick message +func tick() tea.Cmd { + return tea.Tick(time.Second, func(t time.Time) tea.Msg { + return tickMsg(t) + }) +} + +// View renders the UI +func (m *Model) View() string { + switch m.screen { + case ScreenArkeConfig: + return m.renderArkeConfig() + case ScreenBrokerConfig: + return m.renderBrokerConfig() + case ScreenConnectionConfig: + return m.renderConnectionConfig() + case ScreenStreamConfig: + return m.renderStreamConfig() + case ScreenRunning: + return m.renderRunning() + } + return "" +} + +// renderArkeConfig renders the arke configuration screen +func (m *Model) renderArkeConfig() string { + var b strings.Builder + title := titleStyle.Render("⚡ Arke Load Tool - Arke Server") + b.WriteString(title + "\n\n") + + type field struct { + name string + key string + prompt string + defaultVal string + } + + fields := []field{ + {"Arke Hostname", "arke_host", "Arke hostname", "localhost"}, + {"Arke Port", "arke_port", "Arke port", "50051"}, + {"Arke TLS", "arke_tls", "TLS (true/false)", "false"}, + } + + for i, f := range fields { + var cursor string + if m.cursor == i { + if m.editing { + cursor = cursorStyle.Render("✎ ") + } else { + cursor = cursorStyle.Render("❯ ") + } + } else { + cursor = " " + } + value := m.getInput(f.key, f.defaultVal) + label := labelStyle.Render(f.prompt + ":") + val := valueStyle.Render(value) + b.WriteString(fmt.Sprintf("%s%s %s\n", cursor, label, val)) + } + + // Add Continue button + b.WriteString("\n") + var continueButton string + if m.cursor == len(fields) { + continueButton = cursorStyle.Render("❯ ") + valueStyle.Render("[Continue]") + } else { + continueButton = " " + 
labelStyle.Render("[Continue]") + } + b.WriteString(continueButton + "\n") + + if m.err != nil { + b.WriteString("\n" + errorStyle.Render(fmt.Sprintf("Error: %v", m.err)) + "\n") + } + + b.WriteString("\n" + subtleStyle.Render("↑/↓: Navigate • Tab: Jump to Continue • Enter: Edit/Select • Esc: Cancel edit • Ctrl+C: Quit") + "\n") + return b.String() +} + +// renderBrokerConfig renders the broker configuration screen +func (m *Model) renderBrokerConfig() string { + var b strings.Builder + title := titleStyle.Render("🔌 Arke Load Tool - Broker Config") + b.WriteString(title + "\n\n") + + type field struct { + name string + key string + prompt string + defaultVal string + } + + fields := []field{ + {"Broker Hostname", "broker_host", "Broker hostname", "localhost"}, + {"Broker Port", "broker_port", "Broker port", "5672"}, + {"Username", "broker_username", "Username", "guest"}, + {"Password", "broker_password", "Password", "guest"}, + {"Broker TLS", "broker_tls", "TLS (true/false)", "false"}, + } + + for i, f := range fields { + var cursor string + if m.cursor == i { + if m.editing { + cursor = cursorStyle.Render("✎ ") + } else { + cursor = cursorStyle.Render("❯ ") + } + } else { + cursor = " " + } + value := m.getInput(f.key, f.defaultVal) + // Mask password + if f.key == "broker_password" && value != "" { + value = strings.Repeat("•", len(value)) + } + label := labelStyle.Render(f.prompt + ":") + val := valueStyle.Render(value) + b.WriteString(fmt.Sprintf("%s%s %s\n", cursor, label, val)) + } + + // Add Continue button + b.WriteString("\n") + var continueButton string + if m.cursor == len(fields) { + continueButton = cursorStyle.Render("❯ ") + valueStyle.Render("[Continue]") + } else { + continueButton = " " + labelStyle.Render("[Continue]") + } + b.WriteString(continueButton + "\n") + + if m.err != nil { + b.WriteString("\n" + errorStyle.Render(fmt.Sprintf("Error: %v", m.err)) + "\n") + } + + b.WriteString("\n" + subtleStyle.Render("↑/↓: Navigate • Tab: Jump to 
Continue • Enter: Edit/Select • Esc: Cancel edit • Ctrl+C: Quit") + "\n") + return b.String() +} + +// renderConnectionConfig renders the connection configuration screen +func (m *Model) renderConnectionConfig() string { + var b strings.Builder + title := titleStyle.Render("🔗 Arke Load Tool - Connection Config") + b.WriteString(title + "\n\n") + + type field struct { + name string + key string + prompt string + defaultVal string + } + + fields := []field{ + {"Publisher Connections", "num_publisher_connections", "Number of publisher connections", "1"}, + {"Consumer Connections", "num_consumer_connections", "Number of consumer connections", "1"}, + } + + for i, f := range fields { + cursor := " " + if m.cursor == i { + if m.editing { + cursor = cursorStyle.Render("✎ ") + } else { + cursor = cursorStyle.Render("❯ ") + } + } else { + cursor = " " + } + value := m.getInput(f.key, f.defaultVal) + label := labelStyle.Render(f.prompt + ":") + val := valueStyle.Render(value) + b.WriteString(fmt.Sprintf("%s%s %s\n", cursor, label, val)) + } + + // Add Continue button + b.WriteString("\n") + var continueButton string + if m.cursor == len(fields) { + continueButton = cursorStyle.Render("❯ ") + valueStyle.Render("[Continue]") + } else { + continueButton = " " + labelStyle.Render("[Continue]") + } + b.WriteString(continueButton + "\n") + + if m.err != nil { + b.WriteString("\n" + errorStyle.Render(fmt.Sprintf("Error: %v", m.err)) + "\n") + } + + b.WriteString("\n" + subtleStyle.Render("↑/↓: Navigate • Tab: Jump to Continue • Enter: Edit/Select • Esc: Cancel edit • Ctrl+C: Quit") + "\n") + return b.String() +} + +// renderStreamConfig renders the stream configuration screen +func (m *Model) renderStreamConfig() string { + var b strings.Builder + title := titleStyle.Render("📊 Arke Load Tool - Stream Config") + b.WriteString(title + "\n\n") + + type field struct { + name string + key string + prompt string + defaultVal string + } + + fields := []field{ + {"Publisher Streams", 
"num_publisher_streams", "Number of publisher streams per connection", "1"}, + {"Consumer Streams", "num_consumer_streams", "Number of consumer streams per connection", "1"}, + {"Source Type", "source_type", "Source type (queue/stream)", "queue"}, + {"Source Name", "source_name", "Source name", "test-source"}, + {"Address Name", "address_name", "Address name", "test-address"}, + {"Subject", "subject", "Routing key/subject", "test.routing.key"}, + {"Message Count", "message_count", "Messages per stream (0=continuous)", "10000"}, + {"Publish Rate Limit", "publish_rate_limit", "Messages/sec per stream (0=unlimited)", "0"}, + {"Message File", "message_file", "JSON message file (optional)", ""}, + {"Save Config File", "save_config_file", "Save config to file (optional)", ""}, + } + + for i, f := range fields { + cursor := " " + if m.cursor == i { + if m.editing { + cursor = cursorStyle.Render("✎ ") + } else { + cursor = cursorStyle.Render("❯ ") + } + } else { + cursor = " " + } + value := m.getInput(f.key, f.defaultVal) + label := labelStyle.Render(f.prompt + ":") + val := valueStyle.Render(value) + b.WriteString(fmt.Sprintf("%s%s %s\n", cursor, label, val)) + } + + // Add Start button + b.WriteString("\n") + var startButton string + if m.cursor == len(fields) { + startButton = cursorStyle.Render("❯ ") + valueStyle.Render("[Start Load Test]") + } else { + startButton = " " + labelStyle.Render("[Start Load Test]") + } + b.WriteString(startButton + "\n") + + if m.err != nil { + b.WriteString("\n" + errorStyle.Render(fmt.Sprintf("Error: %v", m.err)) + "\n") + } + + b.WriteString("\n" + subtleStyle.Render("↑/↓: Navigate • Tab: Jump to Start • Enter: Edit/Start • Esc: Cancel edit • Ctrl+C: Quit") + "\n") + return b.String() +} + +// renderRunning renders the running screen with metrics +func (m *Model) renderRunning() string { + var b strings.Builder + title := titleStyle.Render("🚀 Arke Load Tool - Running") + b.WriteString(title + "\n\n") + + pub, con, pubRate, conRate := 
m.metrics.GetStats() + + // Count connections by type + var publisherConns, consumerConns int + for _, conn := range m.connManager.GetConnections() { + if conn.Type == ConnectionTypePublisher { + publisherConns++ + } else { + consumerConns++ + } + } + + // Connection info + pubConnLabel := metricLabelStyle.Render("Publisher Connections:") + pubConnValue := metricValueStyle.Render(fmt.Sprintf("%d", publisherConns)) + pubStreamValue := valueStyle.Render(fmt.Sprintf("(Streams per: %d)", m.streamConfig.NumPublisherStreams)) + b.WriteString(fmt.Sprintf("%s %s %s\n", pubConnLabel, pubConnValue, pubStreamValue)) + + conConnLabel := metricLabelStyle.Render("Consumer Connections: ") + conConnValue := metricValueStyle.Render(fmt.Sprintf("%d", consumerConns)) + conStreamValue := valueStyle.Render(fmt.Sprintf("(Streams per: %d)", m.streamConfig.NumConsumerStreams)) + b.WriteString(fmt.Sprintf("%s %s %s\n\n", conConnLabel, conConnValue, conStreamValue)) + + // Metrics box + metricsHeader := headerStyle.Render("📈 Metrics") + b.WriteString(metricsHeader + "\n") + b.WriteString(strings.Repeat("─", 40) + "\n") + + pubLabel := metricLabelStyle.Render("Published: ") + pubValue := metricValueStyle.Render(fmt.Sprintf("%d messages", pub)) + pubRateValue := valueStyle.Render(fmt.Sprintf("(%.2f msg/s)", pubRate)) + b.WriteString(fmt.Sprintf("%s%s %s\n", pubLabel, pubValue, pubRateValue)) + + conLabel := metricLabelStyle.Render("Consumed: ") + conValue := metricValueStyle.Render(fmt.Sprintf("%d messages", con)) + conRateValue := valueStyle.Render(fmt.Sprintf("(%.2f msg/s)", conRate)) + b.WriteString(fmt.Sprintf("%s%s %s\n\n", conLabel, conValue, conRateValue)) + + // Runtime controls + controlsHeader := headerStyle.Render("⚙️ Runtime Controls") + b.WriteString(controlsHeader + "\n") + b.WriteString(strings.Repeat("─", 40) + "\n") + + type field struct { + name string + key string + prompt string + defaultVal string + } + + fields := []field{ + {"Publisher Connections", 
"runtime_publisher_connections", "Publisher connections", fmt.Sprintf("%d", publisherConns)}, + {"Consumer Connections", "runtime_consumer_connections", "Consumer connections", fmt.Sprintf("%d", consumerConns)}, + {"Publisher Streams", "runtime_publisher_streams", "Publisher streams/conn", fmt.Sprintf("%d", m.streamConfig.NumPublisherStreams)}, + {"Consumer Streams", "runtime_consumer_streams", "Consumer streams/conn", fmt.Sprintf("%d", m.streamConfig.NumConsumerStreams)}, + {"Publish Rate Limit", "runtime_publish_rate_limit", "Msgs/sec per stream (0=unlimited)", fmt.Sprintf("%d", m.streamConfig.PublishRateLimit)}, + } + + for i, f := range fields { + cursor := " " + if m.cursor == i { + if m.editing { + cursor = cursorStyle.Render("✎ ") + } else { + cursor = cursorStyle.Render("❯ ") + } + } else { + cursor = " " + } + value := m.getInput(f.key, f.defaultVal) + label := labelStyle.Render(f.prompt + ":") + val := valueStyle.Render(value) + b.WriteString(fmt.Sprintf("%s%s %s\n", cursor, label, val)) + } + + if m.err != nil { + b.WriteString("\n" + errorStyle.Render(fmt.Sprintf("Error: %v", m.err)) + "\n") + } + + b.WriteString("\n" + subtleStyle.Render("↑/↓: Navigate • Enter: Edit/Apply • Esc: Cancel edit • Ctrl+C: Quit") + "\n") + return b.String() +} diff --git a/tools/anarkey/types.go b/tools/anarkey/types.go new file mode 100644 index 0000000..d6a1f5e --- /dev/null +++ b/tools/anarkey/types.go @@ -0,0 +1,230 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "github.com/sassoftware/arke/api" +) + +// Config holds the configuration for the arke server and broker +type Config struct { + // Arke server settings + ArkeHost string `json:"arke_host"` + ArkePort int `json:"arke_port"` + ArkeTLS bool `json:"arke_tls"` + + // Broker settings + BrokerHost string `json:"broker_host"` + BrokerPort int `json:"broker_port"` + BrokerUsername string `json:"broker_username"` + BrokerPassword string 
`json:"broker_password"` + BrokerTLS bool `json:"broker_tls"` + BrokerProvider string `json:"broker_provider"` + + // Connection settings + NumPublisherConnections int `json:"num_publisher_connections"` + NumConsumerConnections int `json:"num_consumer_connections"` +} + +// ConnectionType represents whether connection is Publisher or Consumer +type ConnectionType int + +const ( + ConnectionTypePublisher ConnectionType = iota + ConnectionTypeConsumer +) + +// StreamConfig holds the configuration for a stream +type StreamConfig struct { + NumPublisherStreams int `json:"num_publisher_streams"` + NumConsumerStreams int `json:"num_consumer_streams"` + SourceType api.Source_TargetType `json:"source_type"` + SourceName string `json:"source_name"` + AddressName string `json:"address_name"` + Subject string `json:"subject"` + MessageCount int `json:"message_count"` // 0 means continuous + IsContinuous bool `json:"is_continuous"` + PublishRateLimit int `json:"publish_rate_limit"` // messages per second per stream, 0 means no limit + MessageFile string `json:"message_file"` // path to JSON file containing message template +} + +// SavedConfig combines Config and StreamConfig for saving/loading +type SavedConfig struct { + Config *Config `json:"config"` + StreamConfig *StreamConfig `json:"stream_config"` +} + +// SaveConfig saves the configuration to a JSON file +func SaveConfig(filename string, config *Config, streamConfig *StreamConfig) error { + savedConfig := &SavedConfig{ + Config: config, + StreamConfig: streamConfig, + } + + data, err := json.MarshalIndent(savedConfig, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal config: %w", err) + } + + if err := os.WriteFile(filename, data, 0644); err != nil { + return fmt.Errorf("failed to write config file: %w", err) + } + + return nil +} + +// LoadConfig loads the configuration from a JSON file +func LoadConfig(filename string) (*Config, *StreamConfig, error) { + data, err := os.ReadFile(filename) + if err != 
// Metrics tracks published/consumed message counts and their rates.
// All fields are guarded by mu; rates are recomputed by Update.
type Metrics struct {
	mu sync.RWMutex

	PublishedCount int64
	ConsumedCount  int64
	PublishRate    float64
	ConsumeRate    float64
	lastPublished  int64
	lastConsumed   int64
	lastUpdate     time.Time
}

// Update recomputes PublishRate/ConsumeRate from the deltas since the
// previous call. A zero elapsed interval leaves the rates untouched.
func (m *Metrics) Update() {
	m.mu.Lock()
	defer m.mu.Unlock()

	now := time.Now()
	elapsed := now.Sub(m.lastUpdate).Seconds()
	if elapsed == 0 {
		return
	}

	m.PublishRate = float64(m.PublishedCount-m.lastPublished) / elapsed
	m.ConsumeRate = float64(m.ConsumedCount-m.lastConsumed) / elapsed

	m.lastPublished = m.PublishedCount
	m.lastConsumed = m.ConsumedCount
	m.lastUpdate = now
}

// IncrementPublished adds one to the published-message count.
func (m *Metrics) IncrementPublished() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.PublishedCount++
}

// IncrementConsumed adds one to the consumed-message count.
func (m *Metrics) IncrementConsumed() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.ConsumedCount++
}

// GetStats returns a consistent snapshot of the counts and rates.
func (m *Metrics) GetStats() (published, consumed int64, pubRate, conRate float64) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.PublishedCount, m.ConsumedCount, m.PublishRate, m.ConsumeRate
}
assembly + headerParts map[string][]string // split header values around @id@ for fast assembly + headersWithUUID []string // list of header keys that contain @id@ +} + +// LoadMessageFromFile loads a message template from a JSON file +func LoadMessageFromFile(filePath string) (*MessageTemplate, error) { + data, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("failed to read message file: %w", err) + } + + var msg MessageTemplate + if err := json.Unmarshal(data, &msg); err != nil { + return nil, fmt.Errorf("failed to parse message JSON: %w", err) + } + + // Check for @id@ placeholder in body and parse once + if strings.Contains(msg.Body, "@id@") { + msg.hasUUIDPlaceholder = true + msg.bodyParts = strings.Split(msg.Body, "@id@") + } + + // Check for @id@ placeholder in headers and parse once + msg.headerParts = make(map[string][]string) + for key, value := range msg.Headers { + if strings.Contains(value, "@id@") { + msg.hasUUIDPlaceholder = true + msg.headerParts[key] = strings.Split(value, "@id@") + msg.headersWithUUID = append(msg.headersWithUUID, key) + } + } + + return &msg, nil +} + +// GetBodyAndHeaders returns the message body and headers, replacing @id@ with a UUID if present +// This method generates one UUID and uses it for all replacements in both body and headers +func (mt *MessageTemplate) GetBodyAndHeaders() (string, map[string]string) { + if !mt.hasUUIDPlaceholder { + return mt.Body, mt.Headers + } + + // Generate one UUID for this message (used for all @id@ occurrences) + id := uuid.New().String() + + // Assemble body from pre-parsed parts + body := mt.Body + if mt.bodyParts != nil { + body = strings.Join(mt.bodyParts, id) + } + + // Assemble headers - copy base headers and replace UUID placeholders + headers := make(map[string]string, len(mt.Headers)) + for key, value := range mt.Headers { + if parts, hasUUID := mt.headerParts[key]; hasUUID { + headers[key] = strings.Join(parts, id) + } else { + headers[key] = value + } + } + 
+ return body, headers +} + +// ConnectionState represents the state of a connection +type ConnectionState struct { + ID int + Active bool + Type ConnectionType +}