diff --git a/.github/workflows/weekly-docker-build.yaml b/.github/workflows/weekly-docker-build.yaml new file mode 100644 index 0000000..2cc5cde --- /dev/null +++ b/.github/workflows/weekly-docker-build.yaml @@ -0,0 +1,123 @@ +name: Weekly Docker Build + +# Rebuilds Docker images from main every Monday at 00:00 UTC so the +# Chainguard base image picks up the latest OS security patches even +# when no code changes have been pushed that week. +# +# Can also be triggered manually via the Actions UI. + +on: + schedule: + - cron: '0 0 * * 1' # Monday 00:00 UTC + workflow_dispatch: # Manual trigger + +env: + IMAGE: cbaugus/rust_loadtest + +jobs: + weekly-build: + name: Build and push weekly Docker images + runs-on: ubuntu-latest + # Only run on main branch (schedule always targets default branch, + # but guard against accidental workflow_dispatch on a feature branch). + if: github.ref == 'refs/heads/main' || github.event_name == 'workflow_dispatch' + + steps: + - name: Checkout main branch + uses: actions/checkout@v4 + with: + ref: main + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Generate date and week tags + id: tags + run: | + echo "date=$(date +%Y-%m-%d)" >> $GITHUB_OUTPUT + echo "week=$(date +weekly-%Y-%U)" >> $GITHUB_OUTPUT + + # ── Standard (Ubuntu) image ───────────────────────────────────────────── + - name: Build and push standard image + uses: docker/build-push-action@v5 + with: + context: . 
+ file: ./Dockerfile + push: true + tags: | + ${{ env.IMAGE }}:latest + ${{ env.IMAGE }}:${{ steps.tags.outputs.date }} + ${{ env.IMAGE }}:${{ steps.tags.outputs.week }} + cache-from: type=registry,ref=${{ env.IMAGE }}:buildcache + cache-to: type=registry,ref=${{ env.IMAGE }}:buildcache,mode=max + + # ── Chainguard (distroless) image ──────────────────────────────────────── + - name: Build and push Chainguard image + uses: docker/build-push-action@v5 + with: + context: . + file: ./Dockerfile.chainguard + push: true + tags: | + ${{ env.IMAGE }}:latest-Chainguard + ${{ env.IMAGE }}:${{ steps.tags.outputs.date }}-Chainguard + ${{ env.IMAGE }}:${{ steps.tags.outputs.week }}-Chainguard + cache-from: type=registry,ref=${{ env.IMAGE }}:buildcache-chainguard + cache-to: type=registry,ref=${{ env.IMAGE }}:buildcache-chainguard,mode=max + + # ── SBOMs ──────────────────────────────────────────────────────────────── + - name: Install Syft + run: curl -sSfL https://raw.githubusercontent.com/anchore/syft/main/install.sh | sh -s -- -b /usr/local/bin + + - name: Generate SBOM — standard image + run: | + syft "docker:${{ env.IMAGE }}:${{ steps.tags.outputs.date }}" \ + -o cyclonedx-json \ + > sbom-standard-${{ steps.tags.outputs.date }}.json + + - name: Generate SBOM — Chainguard image + run: | + syft "docker:${{ env.IMAGE }}:${{ steps.tags.outputs.date }}-Chainguard" \ + -o cyclonedx-json \ + > sbom-chainguard-${{ steps.tags.outputs.date }}.json + + # ── Vulnerability scan ─────────────────────────────────────────────────── + - name: Scan standard image (Trivy) + uses: aquasecurity/trivy-action@master + with: + image-ref: ${{ env.IMAGE }}:${{ steps.tags.outputs.date }} + format: sarif + output: trivy-standard.sarif + severity: CRITICAL,HIGH + continue-on-error: true # don't fail the build; results go to Security tab + + - name: Scan Chainguard image (Trivy) + uses: aquasecurity/trivy-action@master + with: + image-ref: ${{ env.IMAGE }}:${{ steps.tags.outputs.date }}-Chainguard + 
format: sarif + output: trivy-chainguard.sarif + severity: CRITICAL,HIGH + continue-on-error: true + + - name: Upload Trivy results to GitHub Security + uses: github/codeql-action/upload-sarif@v3 + with: + sarif_file: trivy-standard.sarif + continue-on-error: true + + # ── Upload artifacts ───────────────────────────────────────────────────── + - name: Upload SBOMs + uses: actions/upload-artifact@v4 + with: + name: sbom-${{ steps.tags.outputs.date }} + path: | + sbom-standard-${{ steps.tags.outputs.date }}.json + sbom-chainguard-${{ steps.tags.outputs.date }}.json + retention-days: 90 diff --git a/.gitignore b/.gitignore index ccef2d1..f356e30 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ Thumbs.db # Business documents (keep local only) PRODUCT_DESIGN.md BUSINESS_PLAN.md +Product_Readness_Review.md # Local phase/project status notes phase3_status.md @@ -23,3 +24,6 @@ next_steps.md # Phase 3 web app design (separate repo — local planning only) phase3_web_app/ + +# Local/personal Nomad job files (environment-specific, not examples) +nomad/loadtest.nomad.hcl diff --git a/ACCEPTABLE_USE.md b/ACCEPTABLE_USE.md new file mode 100644 index 0000000..1e32c13 --- /dev/null +++ b/ACCEPTABLE_USE.md @@ -0,0 +1,54 @@ +# Acceptable Use Policy + +## rust_loadtest + +This software is a load testing tool designed to help engineers measure and +understand the performance of HTTP services they own or are authorized to test. 
+ +--- + +## Permitted Use + +You may use rust_loadtest to test: + +- Systems you own +- Systems operated by your employer, where your role includes performance testing +- Systems for which you have explicit written permission from the owner to conduct + load or stress testing + +--- + +## Prohibited Use + +You may **not** use rust_loadtest to: + +- Send traffic to any system without the explicit permission of its owner +- Conduct denial-of-service (DoS) or distributed denial-of-service (DDoS) attacks +- Disrupt, degrade, or interfere with the availability of any service, network, + or infrastructure +- Circumvent rate limits, access controls, or security measures on systems you + do not own +- Test third-party services, APIs, or websites without written authorization, + regardless of whether those services are publicly accessible + +--- + +## Your Responsibility + +By running this software you accept full responsibility for: + +- Ensuring you have authorization to generate load against the target system +- Any damage, disruption, or legal consequence resulting from your use +- Compliance with all applicable laws and regulations in your jurisdiction, + including the Computer Fraud and Abuse Act (CFAA), the Computer Misuse Act + (CMA), and equivalent statutes + +The authors and contributors of rust_loadtest bear no liability for any harm +caused by unauthorized or malicious use of this software. 
+ +--- + +## Reporting Misuse + +If you observe rust_loadtest being used to attack or disrupt systems, please +report it to: **cbaugus@users.noreply.github.com** diff --git a/DOCKER_HUB_OVERVIEW.md b/DOCKER_HUB_OVERVIEW.md index a7df8ed..96aa6c9 100644 --- a/DOCKER_HUB_OVERVIEW.md +++ b/DOCKER_HUB_OVERVIEW.md @@ -217,6 +217,67 @@ Send JSON data with POST requests: -e JSON_PAYLOAD='{"key":"value","nested":{"data":"here"}}' ``` +### Large Payload / Upload Testing (`bodySize`) + +Stress-test upload endpoints or request-body parsing by sending a synthetic payload of a specified size on every request — no need to inline large strings in your config. + +Supported units: `B`, `KB`, `MB` + +```yaml +scenarios: + - name: "Upload stress test" + weight: 100 + steps: + - name: "POST 1MB body" + request: + method: "POST" + path: "/api/upload" + bodySize: "1MB" # generates 1 048 576 random bytes per request + headers: + Content-Type: "application/octet-stream" + assertions: + - type: statusCode + expected: 200 + - type: responseTime + max: "5s" +``` + +`bodySize` and `body` are mutually exclusive — use one or the other per step. + +**Combined with JWT auth (multi-step):** +```yaml +scenarios: + - name: "Auth then upload" + weight: 100 + steps: + - name: "Login" + request: + method: "POST" + path: "/auth/login" + body: '{"username":"loadtest","password":"secret"}' + headers: + Content-Type: "application/json" + extract: + - type: jsonPath + name: "jwt_token" + jsonPath: "$.token" + assertions: + - type: statusCode + expected: 200 + + - name: "Upload 512KB" + request: + method: "POST" + path: "/api/upload" + bodySize: "512KB" + headers: + Authorization: "Bearer ${jwt_token}" + Content-Type: "application/octet-stream" + assertions: + - type: statusCode + expected: 200 +``` + ## Live Control API (port 8080) Every node exposes a lightweight HTTP API for real-time inspection and reconfiguration. 
diff --git a/examples/configs/README.md b/examples/configs/README.md index 5147f9a..59be772 100644 --- a/examples/configs/README.md +++ b/examples/configs/README.md @@ -263,7 +263,39 @@ rust-loadtest --config graphql-api.yaml --- -### 8. Spike Test (`spike-test.yaml`) +### 8. Large Payload Test (`large-payload-test.yaml`) + +**Purpose**: Stress-test endpoints that accept large request bodies (uploads, multipart, binary ingestion) + +**Use Cases**: +- Upload endpoint capacity testing +- Request-body parser stress testing +- Memory pressure under large payloads +- Bandwidth / throughput validation + +**Key Features**: +- `bodySize` field generates synthetic random data per request — no YAML bloat +- Supports `B`, `KB`, `MB` units +- Two scenarios: raw upload and JWT-authenticated upload +- `bodySize` and `body` are mutually exclusive per step + +**Quick Start**: +```bash +# Edit baseUrl, then push config to a running node +curl -X POST http://:8080/config \ + -H "Content-Type: application/x-yaml" \ + --data-binary @large-payload-test.yaml +``` + +**Customize**: +- `bodySize`: Change size (`"128KB"`, `"512KB"`, `"1MB"`) +- `path`: Point to your upload endpoint +- `target`: Adjust RPS (start low — large bodies use significant bandwidth) +- Remove the auth scenario if your endpoint is unauthenticated + +--- + +### 9. Spike Test (`spike-test.yaml`) **Purpose**: Sudden traffic spike test for resilience validation @@ -310,6 +342,7 @@ rust-loadtest --config spike-test.yaml | Authenticated | Medium | 20m | 25 | 75 | Auth flows, token management | | Microservices | High | 30m | 40 | 20-150 | Distributed systems, multiple services | | GraphQL | Medium | 20m | 30 | 80 | GraphQL APIs, complex queries | +| Large Payload | Medium | 10m | 20 | 50 | Upload endpoints, body-parser stress | | Spike Test | High | 30m | 150 | Burst | Resilience, auto-scaling | ## Customization Guide @@ -383,7 +416,19 @@ thinkTime: max: "5s" ``` -#### 6. Add Custom Assertions +#### 6. 
Send Large Synthetic Payloads +```yaml +steps: + - name: "Upload 1MB" + request: + method: "POST" + path: "/api/upload" + bodySize: "1MB" # B, KB, or MB — mutually exclusive with `body` + headers: + Content-Type: "application/octet-stream" +``` + +#### 7. Add Custom Assertions ```yaml assertions: - statusCode: 200 diff --git a/examples/configs/large-payload-test.yaml b/examples/configs/large-payload-test.yaml new file mode 100644 index 0000000..1bda4b6 --- /dev/null +++ b/examples/configs/large-payload-test.yaml @@ -0,0 +1,90 @@ +# Large Payload / Upload Stress Test +# +# Tests how your endpoint handles large request bodies. +# Uses `bodySize` to generate synthetic random data of the specified size +# on every request — no need to inline large strings in YAML. +# +# Supported units: B, KB, MB +# +# `bodySize` and `body` are mutually exclusive per step. +# +# Quick Start: +# Edit baseUrl below, then POST this config to a running node: +# +# curl -X POST http://:8080/config \ +# -H "Content-Type: application/x-yaml" \ +# --data-binary @large-payload-test.yaml + +version: "1.0" + +metadata: + name: "Large Payload Upload Stress Test" + description: "Stress-test upload endpoints with synthetic large bodies" + +config: + baseUrl: "https://api.example.com" + workers: 20 + duration: "10m" + timeout: "30s" + skipTlsVerify: false + +load: + model: "rps" + target: 50 + +scenarios: + # ── Scenario 1: Raw 1 MB upload (no auth) ─────────────────────────────────── + - name: "1MB upload" + weight: 50 + steps: + - name: "POST 1MB body" + request: + method: "POST" + path: "/api/upload" + bodySize: "1MB" # 1 048 576 random bytes per request + headers: + Content-Type: "application/octet-stream" + assertions: + - type: statusCode + expected: 200 + - type: responseTime + max: "10s" + + # ── Scenario 2: JWT login then 512 KB upload ───────────────────────────────── + - name: "Auth then 512KB upload" + weight: 50 + steps: + - name: "Login — get JWT" + request: + method: "POST" + path: 
"/auth/login" + body: '{"username":"loadtest","password":"secret"}' + headers: + Content-Type: "application/json" + extract: + - type: jsonPath + name: "jwt_token" + jsonPath: "$.token" + cache: + ttl: "5m" # reuse token for 5 min; re-login on expiry + assertions: + - type: statusCode + expected: 200 + + - name: "Upload 512KB with JWT" + request: + method: "POST" + path: "/api/upload" + bodySize: "512KB" # 524 288 random bytes per request + headers: + Authorization: "Bearer ${jwt_token}" + Content-Type: "application/octet-stream" + assertions: + - type: statusCode + expected: 200 + - type: responseTime + max: "5s" + +standby: + workers: 2 + rps: 0 diff --git a/examples/scenario_example.rs b/examples/scenario_example.rs index 3430a05..5ccdedb 100644 --- a/examples/scenario_example.rs +++ b/examples/scenario_example.rs @@ -94,6 +94,7 @@ fn create_shopping_scenario() -> Scenario { method: "GET".to_string(), path: "/health".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -108,6 +109,7 @@ fn create_shopping_scenario() -> Scenario { method: "GET".to_string(), path: "/products?limit=10".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![ @@ -133,6 +135,7 @@ fn create_shopping_scenario() -> Scenario { // ⭐ Variable substitution: ${product_id} is replaced with extracted value path: "/products/${product_id}".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -157,6 +160,7 @@ fn create_shopping_scenario() -> Scenario { }"# .to_string(), ), + body_size: None, headers: { let mut headers = HashMap::new(); headers.insert("Content-Type".to_string(), "application/json".to_string()); @@ -187,6 +191,7 @@ fn create_shopping_scenario() -> Scenario { }"# .to_string(), ), + body_size: None, headers: { let mut headers = HashMap::new(); headers.insert("Content-Type".to_string(), "application/json".to_string()); @@ -212,6 +217,7 @@ fn create_shopping_scenario() 
-> Scenario { method: "GET".to_string(), path: "/cart".to_string(), body: None, + body_size: None, headers: { let mut headers = HashMap::new(); headers.insert( diff --git a/nomad/config-load-post.yaml b/nomad/config-load-post.yaml new file mode 100644 index 0000000..c82962b --- /dev/null +++ b/nomad/config-load-post.yaml @@ -0,0 +1,65 @@ +# Consul KV test config — dual-host JWT auth flow (Issue #82) +# +# Upload to Consul KV before deploying the cluster: +# +# consul kv put loadtest/config @nomad/config-load-post.yaml +# +# Demonstrates multi-host steps: step 1 fetches a JWT from the auth service +# (relative path → baseUrl), step 2 calls the API service using a full URL +# override. The extracted jwt_token variable is passed between steps. + +version: "1.0" + +metadata: + name: "Dual-host JWT auth flow" + description: "Fetch JWT from auth service, use it against API service" + +config: + baseUrl: "http://192.168.2.23:31362" + workers: 25 + duration: "75h" + timeout: "30s" + skipTlsVerify: false + +load: + model: "rps" + target: 400 + +scenarios: + - name: "JWT Auth → API call" + weight: 100 + steps: + - name: "Login — get JWT" + request: + method: "POST" + path: "/auth/login" # relative → uses baseUrl (auth service) + body: '{"email":"test@example.com","password":"password123"}' + headers: + Content-Type: "application/json" + extract: + - type: jsonPath + name: "jwt_token" + jsonPath: "$.token" + cache: + ttl: "3m" # reuse token for 3 min per worker; re-login on expiry + assertions: + - type: statusCode + expected: 200 + + - name: "Call API with JWT" + request: + method: "POST" + path: "http://ecom-test-target.service.consul:31362/devnull" # full URL → host B + bodySize: "1MB" # 1 048 576 bytes of random data per request + headers: + Authorization: "Bearer ${jwt_token}" + Content-Type: "application/octet-stream" + assertions: + - type: statusCode + expected: 200 + - type: responseTime + max: "2s" + +standby: + workers: 2 + rps: 0 diff --git 
a/nomad/consul-kv-config-example.yaml b/nomad/consul-kv-config-example.yaml index 8464ace..1019963 100644 --- a/nomad/consul-kv-config-example.yaml +++ b/nomad/consul-kv-config-example.yaml @@ -48,10 +48,12 @@ scenarios: - name: "Call API with JWT" request: - method: "GET" + method: "POST" path: "http://ecom-test-target.service.consul:31362/users/me" # full URL → host B + bodySize: "1MB" # 1 048 576 bytes of random data per request headers: Authorization: "Bearer ${jwt_token}" + Content-Type: "application/octet-stream" assertions: - type: statusCode expected: 200 diff --git a/nomad/loadtest-consul-kv-example.nomad.hcl b/nomad/loadtest-consul-kv-example.nomad.hcl index 9af3691..a0fd849 100644 --- a/nomad/loadtest-consul-kv-example.nomad.hcl +++ b/nomad/loadtest-consul-kv-example.nomad.hcl @@ -123,8 +123,11 @@ job "envoy-loadtest" { name = "load-health" tags = ["load-health"] port = "health" + # /ready is unauthenticated and always returns 200 — safe for Nomad probes + # even when HEALTH_AUTH_ENABLED=true protects the full /health endpoint. check { - type = "tcp" + type = "http" + path = "/ready" port = "health" interval = "10s" timeout = "6s" @@ -180,6 +183,22 @@ TEST_DURATION=2h LOAD_MODEL_TYPE=Rps TARGET_RPS=0 +# ── Security (Issue #91 / #92) ──────────────────────────────────────────────── +# Uncomment to protect POST /config and POST /stop with a bearer token: +# API_AUTH_TOKEN=your-secret-token-here +# +# Uncomment to also protect GET /health (GET /ready is always open for Nomad): +# HEALTH_AUTH_ENABLED=true + +# ── Node auto-registration with web app (Issue #89) ────────────────────────── +# All three vars must be set to enable registration; missing any one is a no-op. 
+# NODE_REGISTRY_URL=https://loadtest-control.example.com +# AUTO_REGISTER_PSK=shared-secret-across-all-nodes +# NODE_BASE_URL=http://{{env "attr.unique.network.ip-address"}}:8080 +# NODE_NAME={{env "node.unique.name"}} +# NODE_TAGS={"env":"staging","datacenter":"{{env "node.datacenter"}}"} +# NODE_REGISTRY_INTERVAL=30s + EOH } diff --git a/src/executor.rs b/src/executor.rs index 9911ebb..70e87f6 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -12,6 +12,7 @@ use crate::metrics::{ SCENARIO_STEP_STATUS_CODES, }; use crate::scenario::{Scenario, ScenarioContext, Step}; +use rand::Rng; use std::collections::HashMap; use std::time::Instant; use tokio::time::sleep; @@ -330,10 +331,16 @@ impl ScenarioExecutor { request_builder = request_builder.header(key, substituted_value); } - // Add body if present with variable substitution + // Add body: inline string (with variable substitution) or synthetic generated body if let Some(body) = &step.request.body { let substituted_body = context.substitute_variables(body); request_builder = request_builder.body(substituted_body); + } else if let Some(size) = step.request.body_size { + let synthetic: Vec = rand::thread_rng() + .sample_iter(&rand::distributions::Alphanumeric) + .take(size) + .collect(); + request_builder = request_builder.body(synthetic); } // Execute the request diff --git a/src/lib.rs b/src/lib.rs index 01a5bba..64a6643 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,7 @@ pub mod memory_guard; pub mod metrics; pub mod multi_scenario; pub mod percentiles; +pub mod registry; pub mod scenario; pub mod throughput; pub mod utils; diff --git a/src/main.rs b/src/main.rs index e393752..7a30248 100644 --- a/src/main.rs +++ b/src/main.rs @@ -187,6 +187,160 @@ fn print_pool_report() { info!("{}\n", "=".repeat(120)); } +/// Reads current environment variables and writes an equivalent YAML config +/// file. Called when the binary is run as `rust-loadtest migrate [--output +/// ]`. Exits the process when done. 
+fn run_migrate(args: &[String]) { + // Parse optional --output flag + let output_path = args + .windows(2) + .find(|w| w[0] == "--output" || w[0] == "-o") + .map(|w| w[1].as_str()) + .unwrap_or("config.yaml"); + + let env = |k: &str| std::env::var(k).ok(); + let env_or = |k: &str, default: &str| std::env::var(k).unwrap_or_else(|_| default.to_string()); + + // ── Required ────────────────────────────────────────────────────────────── + let target_url = match env("TARGET_URL") { + Some(u) => u, + None => { + eprintln!("migrate: TARGET_URL is not set — nothing to migrate."); + std::process::exit(1); + } + }; + + // ── Load model ──────────────────────────────────────────────────────────── + let model_type = env_or("LOAD_MODEL_TYPE", "Concurrent"); + let (load_model_str, target_rps_line) = match model_type.to_lowercase().as_str() { + "rps" => { + let rps = env_or("TARGET_RPS", "10"); + ("rps".to_string(), format!(" target: {}", rps)) + } + "ramprps" => { + let min = env_or("MIN_RPS", "1"); + let max = env_or("MAX_RPS", "100"); + let ramp = env_or("RAMP_DURATION", ""); + let ramp_line = if ramp.is_empty() { + String::new() + } else { + format!("\n rampDuration: \"{}\"", ramp) + }; + ( + "rampRps".to_string(), + format!(" minRps: {}\n maxRps: {}{}", min, max, ramp_line), + ) + } + _ => ("concurrent".to_string(), String::new()), + }; + + // ── Workers / duration / timeout ────────────────────────────────────────── + let workers = env_or("NUM_CONCURRENT_TASKS", "10"); + let duration = env_or("TEST_DURATION", "2h"); + let timeout = env_or("REQUEST_TIMEOUT_SECS", "30"); + let timeout_str = format!("{}s", timeout); + let skip_tls = env_or("SKIP_TLS_VERIFY", "false"); + + // ── Request ─────────────────────────────────────────────────────────────── + let method = env_or("REQUEST_TYPE", "GET"); + let send_json = env_or("SEND_JSON", "false"); + let json_payload = env("JSON_PAYLOAD"); + + // ── Body section for single-URL config ──────────────────────────────────── + let 
body_section = if send_json == "true" { + match &json_payload { + Some(payload) => format!( + "\n body: '{}'\n headers:\n Content-Type: application/json", + payload + ), + None => String::new(), + } + } else { + String::new() + }; + + // ── Optional metadata ───────────────────────────────────────────────────── + let node_id = env("CLUSTER_NODE_ID").unwrap_or_default(); + let region = env_or("CLUSTER_REGION", "local"); + let tenant = env("TENANT"); + + let mut metadata_extras = String::new(); + if !node_id.is_empty() { + metadata_extras.push_str(&format!("\n author: \"{}\"", node_id)); + } + if let Some(t) = &tenant { + metadata_extras.push_str(&format!("\n tenant: \"{}\"", t)); + } + + // ── Assemble YAML ───────────────────────────────────────────────────────── + let load_section = if target_rps_line.is_empty() { + format!("load:\n model: \"{}\"", load_model_str) + } else { + format!( + "load:\n model: \"{}\"\n{}", + load_model_str, target_rps_line + ) + }; + + let yaml = format!( + r#"version: "1.0" + +metadata: + name: "Migrated configuration" + description: "Generated by rust-loadtest migrate from environment variables" + region: "{region}"{metadata_extras} + +config: + baseUrl: "{target_url}" + workers: {workers} + duration: "{duration}" + timeout: "{timeout_str}" + skipTlsVerify: {skip_tls} + +{load_section} + +scenarios: + - name: "Default scenario" + weight: 100 + steps: + - name: "{method} {target_url}" + request: + method: "{method}" + path: "/"{body_section} + assertions: + - type: statusCode + expected: 200 +"#, + region = region, + metadata_extras = metadata_extras, + target_url = target_url, + workers = workers, + duration = duration, + timeout_str = timeout_str, + skip_tls = skip_tls, + load_section = load_section, + method = method, + body_section = body_section, + ); + + match std::fs::write(output_path, &yaml) { + Ok(()) => { + eprintln!("migrate: wrote YAML config to '{}'", output_path); + eprintln!(" Review the file, adjust as needed, then POST 
it:"); + eprintln!( + " curl -X POST http://:8080/config \ + --data-binary @{}", + output_path + ); + } + Err(e) => { + eprintln!("migrate: failed to write '{}': {}", output_path, e); + std::process::exit(1); + } + } + std::process::exit(0); +} + /// Prints helpful configuration documentation. fn print_config_help() { eprintln!("Required environment variables:"); @@ -246,8 +400,20 @@ fn print_config_help() { eprintln!( " CLUSTER_HEALTH_ADDR - Health/config HTTP listen address (default: 0.0.0.0:8080)" ); + eprintln!(" API_AUTH_TOKEN - Bearer token required on POST /config and POST /stop"); + eprintln!(" (optional; when unset, endpoints are open)"); + eprintln!(" HEALTH_AUTH_ENABLED - Set to 'true' to require Bearer token on GET /health"); + eprintln!(" (default: false — /health is open, /ready always open)"); + eprintln!(" NODE_REGISTRY_URL - Web app base URL for auto-registration (optional)"); + eprintln!(" AUTO_REGISTER_PSK - Pre-shared key for X-Auto-Register-PSK header"); + eprintln!(" NODE_BASE_URL - This node's reachable URL (e.g. http://10.0.1.5:8080)"); + eprintln!(" NODE_NAME - Human-readable node name (default: CLUSTER_NODE_ID)"); + eprintln!(" NODE_TAGS - JSON tags object (default: {{}})"); + eprintln!(" NODE_REGISTRY_INTERVAL - Heartbeat interval (default: 30s)"); + eprintln!(" GET /ready - Returns {{\"ready\":true}} — no auth (Nomad/K8s probe)"); eprintln!(" GET /health - Returns JSON with live node metrics"); eprintln!(" POST /config - Accepts a YAML config body to reconfigure workers"); + eprintln!(" POST /stop - Stops all workers and transitions node to idle"); eprintln!(); eprintln!("Logging configuration:"); eprintln!(" RUST_LOG - Log level: error, warn, info, debug, trace"); @@ -314,6 +480,8 @@ struct TestState { node_state: &'static str, // "running" | "idle" | "standby" generation: u64, // bumped on each new test; completion-watcher checks this standby: Option, + /// Tenant identifier for the active test run. None when no tenant is set. 
+ tenant: Option, } /// Returns the current Unix timestamp in seconds. @@ -397,6 +565,7 @@ fn spawn_completion_watcher( percentile_tracking_enabled: sb.percentile_tracking_enabled, percentile_sampling_rate: sb.percentile_sampling_rate, region: sb.region.clone(), + tenant: String::new(), // standby mode has no tenant stop_rx: new_stop_rx.clone(), }; tokio::spawn(run_worker(client.clone(), wc, new_start)) @@ -437,6 +606,14 @@ struct WorkerPool { #[tokio::main] async fn main() -> Result<(), Box> { + // ── Subcommand dispatch ──────────────────────────────────────────────────── + let args: Vec = std::env::args().collect(); + if args.get(1).map(|s| s.as_str()) == Some("migrate") { + run_migrate(&args[2..]); + // run_migrate always exits; this is unreachable but satisfies the compiler. + return Ok(()); + } + // Initialize tracing subscriber init_tracing(); @@ -533,11 +710,15 @@ async fn main() -> Result<(), Box> { node_state: "running", generation: 0, standby: None, + tenant: None, })); // ── Standalone health + config HTTP server ───────────────────────────── + // GET /ready → {"ready":true} (no auth — safe for Nomad health checks) // GET /health → JSON with node identity and live metrics + // (requires Bearer token when HEALTH_AUTH_ENABLED=true) // POST /config → accept YAML body, apply new config, restart workers + // POST /stop → stop active test workers { let health_addr = std::env::var("CLUSTER_HEALTH_ADDR").unwrap_or_else(|_| "0.0.0.0:8080".to_string()); @@ -550,6 +731,12 @@ async fn main() -> Result<(), Box> { let region_for_http = config.cluster.region.clone(); let live_metrics_for_http = live_metrics.clone(); let config_tx_for_http = config_tx.clone(); + let worker_pool_for_http = worker_pool.clone(); + let test_state_for_http = test_state.clone(); + let api_token_for_http = std::env::var("API_AUTH_TOKEN").ok(); + let health_auth_enabled_for_http = std::env::var("HEALTH_AUTH_ENABLED") + .map(|v| v == "true" || v == "1") + .unwrap_or(false); tokio::spawn(async 
move { let make_svc = make_service_fn(move |_conn| { @@ -557,20 +744,54 @@ async fn main() -> Result<(), Box> { let region = region_for_http.clone(); let lm = live_metrics_for_http.clone(); let tx = config_tx_for_http.clone(); + let wp = worker_pool_for_http.clone(); + let ts = test_state_for_http.clone(); + let token = api_token_for_http.clone(); + let health_auth_enabled = health_auth_enabled_for_http; async move { Ok::<_, Infallible>(service_fn(move |req: Request| { let node_id = node_id.clone(); let region = region.clone(); let lm = lm.clone(); let tx = tx.clone(); + let wp = wp.clone(); + let ts = ts.clone(); + let token = token.clone(); async move { match (req.method(), req.uri().path()) { + // Unauthenticated liveness probe — safe for + // Nomad / K8s health checks regardless of + // HEALTH_AUTH_ENABLED. + (&Method::GET, "/ready") => Ok::<_, Infallible>( + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(r#"{"ready":true}"#)) + .unwrap(), + ), (&Method::GET, "/health") => { + if health_auth_enabled { + if let Some(ref t) = token { + let auth = req + .headers() + .get("authorization") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + if auth != format!("Bearer {}", t) { + return Ok(Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::from("unauthorized")) + .unwrap()); + } + } + } let m = lm.lock().unwrap().clone(); + let current_tenant = ts.lock().unwrap().tenant.clone(); let body = serde_json::json!({ "status": "ok", "node_id": node_id, "region": region, + "tenant": current_tenant, "node_state": m.node_state, "rps": (m.rps * 100.0).round() / 100.0, "error_rate_pct": (m.error_rate_pct * 100.0).round() / 100.0, @@ -595,6 +816,19 @@ async fn main() -> Result<(), Box> { ) } (&Method::POST, "/config") => { + if let Some(ref t) = token { + let auth = req + .headers() + .get("authorization") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + if auth != format!("Bearer {}", t) { + 
return Ok(Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::from("unauthorized")) + .unwrap()); + } + } let body_bytes = hyper::body::to_bytes(req.into_body()) .await .unwrap_or_default(); @@ -618,6 +852,90 @@ async fn main() -> Result<(), Box> { ), } } + (&Method::POST, "/stop") => { + if let Some(ref t) = token { + let auth = req + .headers() + .get("authorization") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + if auth != format!("Bearer {}", t) { + return Ok(Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::from("unauthorized")) + .unwrap()); + } + } + // Optional JSON body: {"tenant": "acme"}. + // When present, only stop if the tenant matches the + // active test. Omit body to stop unconditionally. + let body_bytes = hyper::body::to_bytes(req.into_body()) + .await + .unwrap_or_default(); + let stop_tenant: Option = if body_bytes.is_empty() { + None + } else { + serde_json::from_slice::(&body_bytes) + .ok() + .and_then(|v| { + v.get("tenant") + .and_then(|t| t.as_str()) + .map(|s| s.to_string()) + }) + }; + // Guard: if caller specifies a tenant, only stop + // if it matches the currently running test. + if let Some(ref filter) = stop_tenant { + let active = ts.lock().unwrap().tenant.clone(); + if active.as_deref() != Some(filter.as_str()) { + let body = serde_json::json!({ + "stopped": false, + "message": "no active test for this tenant" + }) + .to_string(); + return Ok(Response::builder() + .status(StatusCode::CONFLICT) + .header("Content-Type", "application/json") + .body(Body::from(body)) + .unwrap()); + } + } + // Send stop signal to all workers. + { + let pool = wp.lock().await; + let _ = pool.stop_tx.send(true); + } + // Abort worker handles. + { + let mut pool = wp.lock().await; + for h in pool.handles.drain(..) { + h.abort(); + } + } + // Transition node state to idle. 
+ { + let mut state = ts.lock().unwrap(); + state.node_state = "idle"; + state.tenant = None; + state.generation += 1; + } + let m = lm.lock().unwrap().clone(); + let body = serde_json::json!({ + "stopped": true, + "tenant": stop_tenant, + "rps": m.rps, + "workers": m.workers, + "message": "test stopped" + }) + .to_string(); + Ok::<_, Infallible>( + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(body)) + .unwrap(), + ) + } _ => Ok::<_, Infallible>( Response::builder() .status(StatusCode::NOT_FOUND) @@ -641,6 +959,21 @@ async fn main() -> Result<(), Box> { }); } + // ── Node auto-registration (Issue #89) ───────────────────────────────── + // Opt-in: all three vars must be set or registration is skipped silently. + if let Some(reg_cfg) = rust_loadtest::registry::RegistrationConfig::from_env( + &config.cluster.node_id, + &config.cluster.region, + ) { + info!( + registry_url = %reg_cfg.registry_url, + node = %reg_cfg.node_name, + interval_secs = reg_cfg.interval.as_secs(), + "Auto-registration enabled" + ); + rust_loadtest::registry::spawn_registration_task(client.clone(), reg_cfg); + } + // ── Config-watcher / worker-pool reconfiguration ─────────────────────── // Receives YAML from POST /config, drains workers, spawns fresh pool. { @@ -713,6 +1046,7 @@ async fn main() -> Result<(), Box> { let (new_stop_tx, new_stop_rx) = watch::channel(false); let new_start = time::Instant::now(); + let new_tenant = yaml_cfg_parsed.metadata.tenant.clone(); // If the YAML contains scenarios, use scenario workers; otherwise // fall back to the legacy single-URL worker. 
@@ -738,6 +1072,7 @@ async fn main() -> Result<(), Box> { .percentile_tracking_enabled, percentile_sampling_rate: new_cfg.percentile_sampling_rate, region: region_for_watcher.clone(), + tenant: new_tenant.clone().unwrap_or_default(), }; tokio::spawn(run_scenario_worker( new_client.clone(), @@ -764,6 +1099,7 @@ async fn main() -> Result<(), Box> { .percentile_tracking_enabled, percentile_sampling_rate: new_cfg.percentile_sampling_rate, region: region_for_watcher.clone(), + tenant: new_tenant.clone().unwrap_or_default(), stop_rx: new_stop_rx.clone(), }; tokio::spawn(run_worker(new_client.clone(), wc, new_start)) @@ -786,6 +1122,7 @@ async fn main() -> Result<(), Box> { percentile_tracking_enabled: new_cfg.percentile_tracking_enabled, percentile_sampling_rate: new_cfg.percentile_sampling_rate, region: region_for_watcher.clone(), + tenant: new_tenant.clone().unwrap_or_default(), stop_rx: new_stop_rx.clone(), }; tokio::spawn(run_worker(new_client.clone(), wc, new_start)) @@ -808,6 +1145,7 @@ async fn main() -> Result<(), Box> { ts.node_state = "running"; ts.generation += 1; ts.standby = standby_cfg; + ts.tenant = new_tenant.clone(); ts.generation }; spawn_completion_watcher( @@ -886,6 +1224,7 @@ async fn main() -> Result<(), Box> { let mut interval = time::interval(Duration::from_secs(1)); let mut prev_requests: u64 = 0; let mut prev_errors: u64 = 0; + let mut prev_tenant: String = String::new(); // CPU tracking (Linux only) — tracks utime+stime jiffies #[cfg(target_os = "linux")] let mut prev_cpu_ticks: Option = None; @@ -894,14 +1233,29 @@ async fn main() -> Result<(), Box> { interval.tick().await; // ── Request counter (monotonic) ────────────────────────── - let curr_requests = REQUEST_TOTAL.with_label_values(&[®ion]).get(); + let tenant_str = test_state_for_updater + .lock() + .unwrap() + .tenant + .clone() + .unwrap_or_default(); + // Reset delta tracking when the active tenant changes so we + // don't compute a nonsensical RPS across test boundaries. 
+ if tenant_str != prev_tenant { + prev_requests = 0; + prev_errors = 0; + prev_tenant = tenant_str.clone(); + } + let curr_requests = REQUEST_TOTAL + .with_label_values(&[®ion, &tenant_str]) + .get(); // ── Error counter: sum all known categories ─────────────── let curr_errors: u64 = ErrorCategory::all() .iter() .map(|cat| { REQUEST_ERRORS_BY_CATEGORY - .with_label_values(&[cat.label(), ®ion]) + .with_label_values(&[cat.label(), ®ion, &tenant_str]) .get() }) .sum(); @@ -1102,6 +1456,8 @@ async fn main() -> Result<(), Box> { percentile_tracking_enabled: config.percentile_tracking_enabled, percentile_sampling_rate: config.percentile_sampling_rate, region: config.cluster.region.clone(), + // No tenant when running from env-var config (no YAML submitted yet). + tenant: String::new(), // Graceful-stop signal (Issue #79). In cluster mode the // config-watcher fires this before replacing the worker pool. // In standalone mode it is never fired; workers self-terminate diff --git a/src/metrics.rs b/src/metrics.rs index b77e2a7..9866f66 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -17,21 +17,21 @@ lazy_static::lazy_static! { IntCounterVec::new( Opts::new("requests_total", "Total number of HTTP requests made") .namespace(METRIC_NAMESPACE.as_str()), - &["region"] + &["region", "tenant"] ).unwrap(); pub static ref REQUEST_STATUS_CODES: IntCounterVec = IntCounterVec::new( Opts::new("requests_status_codes_total", "Number of HTTP requests by status code") .namespace(METRIC_NAMESPACE.as_str()), - &["status_code", "region"] + &["status_code", "region", "tenant"] ).unwrap(); pub static ref CONCURRENT_REQUESTS: prometheus::GaugeVec = prometheus::GaugeVec::new( Opts::new("concurrent_requests", "Number of HTTP requests currently in flight") .namespace(METRIC_NAMESPACE.as_str()), - &["region"] + &["region", "tenant"] ).unwrap(); pub static ref REQUEST_DURATION_SECONDS: HistogramVec = @@ -40,7 +40,7 @@ lazy_static::lazy_static! 
{ "request_duration_seconds", "HTTP request latencies in seconds." ).namespace(METRIC_NAMESPACE.as_str()), - &["region"] + &["region", "tenant"] ).unwrap(); // === Scenario Metrics === @@ -103,7 +103,7 @@ lazy_static::lazy_static! { IntCounterVec::new( Opts::new("scenario_requests_total", "Total number of requests per scenario") .namespace(METRIC_NAMESPACE.as_str()), - &["scenario"] + &["scenario", "tenant"] ).unwrap(); pub static ref SCENARIO_THROUGHPUT_RPS: prometheus::GaugeVec = @@ -119,7 +119,7 @@ lazy_static::lazy_static! { IntCounterVec::new( Opts::new("request_errors_by_category", "Number of errors by category") .namespace(METRIC_NAMESPACE.as_str()), - &["category", "region"] + &["category", "region", "tenant"] ).unwrap(); // === Connection Pool Metrics (Issue #36) === diff --git a/src/registry.rs b/src/registry.rs new file mode 100644 index 0000000..38af06b --- /dev/null +++ b/src/registry.rs @@ -0,0 +1,147 @@ +//! Node auto-registration with the web app registry (Issue #89). +//! +//! When `NODE_REGISTRY_URL`, `AUTO_REGISTER_PSK`, and `NODE_BASE_URL` are all +//! set, the node POSTs its identity to the web app at startup and re-registers +//! at a configurable interval (heartbeat) to keep its record alive. +//! +//! If any of the three required env vars is missing, registration is silently +//! skipped — the node operates exactly as before (fully backwards-compatible). + +use reqwest::Client; +use std::time::Duration; +use tracing::{error, info, warn}; + +use crate::utils::parse_duration_string; + +/// Configuration for auto-registration, built from environment variables. +pub struct RegistrationConfig { + /// Base URL of the web app, e.g. `https://loadtest-control.example.com` + pub registry_url: String, + /// Pre-shared key sent as `X-Auto-Register-PSK` header. + pub psk: String, + /// This node's reachable URL, e.g. `http://10.0.1.5:8080` + pub node_base_url: String, + /// Human-readable node name shown in the registry UI. 
+ pub node_name: String, + /// Region label forwarded from `ClusterConfig`. + pub region: String, + /// Arbitrary JSON tags, e.g. `{"env":"staging","rack":"A"}`. + pub tags: serde_json::Value, + /// How often to re-register (heartbeat). Default: 30 s. + pub interval: Duration, +} + +impl RegistrationConfig { + /// Build from environment variables. Returns `None` if any required var + /// (`NODE_REGISTRY_URL`, `AUTO_REGISTER_PSK`, `NODE_BASE_URL`) is missing. + pub fn from_env(node_id: &str, region: &str) -> Option { + let registry_url = match std::env::var("NODE_REGISTRY_URL") { + Ok(v) => v, + Err(_) => return None, + }; + let psk = match std::env::var("AUTO_REGISTER_PSK") { + Ok(v) => v, + Err(_) => { + warn!("NODE_REGISTRY_URL is set but AUTO_REGISTER_PSK is missing — skipping registration"); + return None; + } + }; + let node_base_url = match std::env::var("NODE_BASE_URL") { + Ok(v) => v, + Err(_) => { + warn!( + "NODE_REGISTRY_URL is set but NODE_BASE_URL is missing — skipping registration" + ); + return None; + } + }; + + let node_name = std::env::var("NODE_NAME").unwrap_or_else(|_| node_id.to_string()); + + let tags = std::env::var("NODE_TAGS") + .ok() + .and_then(|s| serde_json::from_str::(&s).ok()) + .unwrap_or_else(|| serde_json::json!({})); + + let interval = std::env::var("NODE_REGISTRY_INTERVAL") + .ok() + .and_then(|s| parse_duration_string(&s).ok()) + .unwrap_or(Duration::from_secs(30)); + + Some(Self { + registry_url, + psk, + node_base_url, + node_name, + region: region.to_string(), + tags, + interval, + }) + } +} + +/// Send a single registration POST. Returns `true` on success. +/// Errors are logged but never propagated — the node must keep running. 
+pub async fn register_once(client: &Client, cfg: &RegistrationConfig) -> bool { + let url = format!("{}/api/v1/nodes/register", cfg.registry_url); + let body = serde_json::json!({ + "name": cfg.node_name, + "base_url": cfg.node_base_url, + "region": cfg.region, + "tags": cfg.tags, + }); + + match client + .post(&url) + .header("X-Auto-Register-PSK", &cfg.psk) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + { + Ok(resp) if resp.status().is_success() => { + info!( + url = %url, + node = %cfg.node_name, + base_url = %cfg.node_base_url, + "Node registered with web app" + ); + true + } + Ok(resp) => { + warn!( + url = %url, + status = %resp.status(), + node = %cfg.node_name, + "Node registration rejected by web app" + ); + false + } + Err(e) => { + error!( + url = %url, + error = %e, + node = %cfg.node_name, + "Node registration request failed" + ); + false + } + } +} + +/// Spawn a background task that registers immediately then re-registers at +/// `cfg.interval`. All failures are logged; the task never crashes the node. +pub fn spawn_registration_task(client: Client, cfg: RegistrationConfig) { + tokio::spawn(async move { + // Register immediately on startup. + register_once(&client, &cfg).await; + + // Heartbeat loop — keeps the node alive in the registry. 
+ let mut ticker = tokio::time::interval(cfg.interval); + ticker.tick().await; // first tick fires immediately; skip it + loop { + ticker.tick().await; + register_once(&client, &cfg).await; + } + }); +} diff --git a/src/scenario.rs b/src/scenario.rs index 47f6d40..37bf262 100644 --- a/src/scenario.rs +++ b/src/scenario.rs @@ -25,6 +25,7 @@ use std::time::{Duration, Instant}; /// method: "GET".to_string(), /// path: "/products".to_string(), /// body: None, +/// body_size: None, /// headers: HashMap::new(), /// }, /// extractions: vec![], @@ -171,6 +172,9 @@ pub struct RequestConfig { /// Optional request body (can contain variable references) pub body: Option, + /// Generate a synthetic body of exactly this many bytes (mutually exclusive with `body`). + pub body_size: Option, + /// Request headers (values can contain variable references) pub headers: HashMap, } @@ -463,6 +467,7 @@ mod tests { method: "GET".to_string(), path: "/api/test".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], diff --git a/src/utils.rs b/src/utils.rs index 4682221..66b3c6a 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -40,6 +40,51 @@ pub fn parse_duration_string(s: &str) -> Result { } } +/// Parses a body size string like "512B", "512KB", or "1MB" into bytes. +/// +/// Supported units: +/// - `B` — bytes +/// - `KB` — kibibytes (× 1 024) +/// - `MB` — mebibytes (× 1 048 576) +pub fn parse_body_size(s: &str) -> Result { + let s = s.trim(); + + if s.is_empty() { + return Err("Body size string cannot be empty".to_string()); + } + + let (value_str, multiplier) = if let Some(v) = s.strip_suffix("MB") { + (v, 1024 * 1024usize) + } else if let Some(v) = s.strip_suffix("KB") { + (v, 1024usize) + } else if let Some(v) = s.strip_suffix('B') { + // Reject units like "GB", "TB" — value_str must be purely numeric + if v.trim().chars().any(|c| !c.is_ascii_digit()) { + return Err(format!( + "Unknown body size unit in '{}'. 
Use 'B', 'KB', or 'MB'.", + s + )); + } + (v, 1usize) + } else { + return Err(format!( + "Unknown body size unit in '{}'. Use 'B', 'KB', or 'MB'.", + s + )); + }; + + let value: usize = value_str + .trim() + .parse() + .map_err(|_| format!("Invalid numeric value in body size: '{}'", value_str.trim()))?; + + if value == 0 { + return Err("Body size must be greater than zero".to_string()); + } + + Ok(value * multiplier) +} + /// Parses a comma-separated header string with support for escaped commas. /// /// Use `\,` to include a literal comma in a header value. @@ -278,4 +323,59 @@ mod tests { assert_eq!(result[0], "Connection:keep-alive,close"); assert_eq!(result[1], "Keep-Alive:timeout=5,max=1000,custom=value"); } + + // --- parse_body_size tests --- + + mod body_size { + use super::*; + + #[test] + fn parse_bytes() { + assert_eq!(parse_body_size("128B").unwrap(), 128); + } + + #[test] + fn parse_kilobytes() { + assert_eq!(parse_body_size("512KB").unwrap(), 512 * 1024); + } + + #[test] + fn parse_megabytes() { + assert_eq!(parse_body_size("1MB").unwrap(), 1024 * 1024); + } + + #[test] + fn parse_large_mb() { + assert_eq!(parse_body_size("10MB").unwrap(), 10 * 1024 * 1024); + } + + #[test] + fn trims_whitespace() { + assert_eq!(parse_body_size(" 256KB ").unwrap(), 256 * 1024); + } + + #[test] + fn zero_errors() { + let err = parse_body_size("0KB").unwrap_err(); + assert!(err.contains("greater than zero"), "error was: {}", err); + } + + #[test] + fn empty_errors() { + let err = parse_body_size("").unwrap_err(); + assert!(err.contains("empty"), "error was: {}", err); + } + + #[test] + fn unknown_unit_errors() { + let err = parse_body_size("1GB").unwrap_err(); + assert!(err.contains("Unknown body size unit"), "error was: {}", err); + } + + #[test] + fn invalid_number_errors() { + let err = parse_body_size("abcKB").unwrap_err(); + assert!(err.contains("Invalid numeric"), "error was: {}", err); + } + } } diff --git a/src/worker.rs b/src/worker.rs index baa9a90..b18613b 
100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -51,6 +51,10 @@ pub struct WorkerConfig { /// In standalone mode this is "local"; in cluster mode it is the node's /// geographic region (e.g. "us-central1"). pub region: String, + /// Optional tenant identifier. Empty string when no tenant is configured. + /// Attached as a `tenant` label to all request metrics so per-tenant + /// request counts can be queried from Prometheus for billing. + pub tenant: String, /// Graceful-stop signal (Issue #79). When the sender fires `true` the /// worker finishes its current request and exits at the top of the next /// loop iteration so no in-flight request is aborted. @@ -137,9 +141,11 @@ pub async fn run_worker(client: reqwest::Client, config: WorkerConfig, start_tim // Track metrics CONCURRENT_REQUESTS - .with_label_values(&[&config.region]) + .with_label_values(&[&config.region, &config.tenant]) + .inc(); + REQUEST_TOTAL + .with_label_values(&[&config.region, &config.tenant]) .inc(); - REQUEST_TOTAL.with_label_values(&[&config.region]).inc(); let request_start_time = time::Instant::now(); @@ -152,13 +158,13 @@ pub async fn run_worker(client: reqwest::Client, config: WorkerConfig, start_tim // Use static strings to avoid a heap allocation on every request let status_str = status_code_label(status); REQUEST_STATUS_CODES - .with_label_values(&[status_str, &config.region]) + .with_label_values(&[status_str, &config.region, &config.tenant]) .inc(); // Categorize HTTP errors (Issue #34) if let Some(category) = ErrorCategory::from_status_code(status) { REQUEST_ERRORS_BY_CATEGORY - .with_label_values(&[category.label(), &config.region]) + .with_label_values(&[category.label(), &config.region, &config.tenant]) .inc(); } @@ -179,13 +185,13 @@ pub async fn run_worker(client: reqwest::Client, config: WorkerConfig, start_tim } Err(e) => { REQUEST_STATUS_CODES - .with_label_values(&["error", &config.region]) + .with_label_values(&["error", &config.region, &config.tenant]) .inc(); // 
Categorize request error (Issue #34) let error_category = ErrorCategory::from_reqwest_error(&e); REQUEST_ERRORS_BY_CATEGORY - .with_label_values(&[error_category.label(), &config.region]) + .with_label_values(&[error_category.label(), &config.region, &config.tenant]) .inc(); error!( @@ -201,10 +207,10 @@ pub async fn run_worker(client: reqwest::Client, config: WorkerConfig, start_tim let actual_latency_ms = request_start_time.elapsed().as_millis() as u64; REQUEST_DURATION_SECONDS - .with_label_values(&[&config.region]) + .with_label_values(&[&config.region, &config.tenant]) .observe(request_start_time.elapsed().as_secs_f64()); CONCURRENT_REQUESTS - .with_label_values(&[&config.region]) + .with_label_values(&[&config.region, &config.tenant]) .dec(); // Record latency in percentile tracker (Issue #33, #66, #70, #72) @@ -310,6 +316,8 @@ pub struct ScenarioWorkerConfig { pub percentile_sampling_rate: u8, /// Region label attached to all metrics emitted by this worker (Issue #45). pub region: String, + /// Optional tenant identifier. Empty string when no tenant is configured. + pub tenant: String, } /// Runs a scenario-based worker task that executes multi-step scenarios according to the load model. 
@@ -434,20 +442,22 @@ pub async fn run_scenario_worker( if step.cache_hit { continue; } - REQUEST_TOTAL.with_label_values(&[&config.region]).inc(); + REQUEST_TOTAL + .with_label_values(&[&config.region, &config.tenant]) + .inc(); if let Some(code) = step.status_code { REQUEST_STATUS_CODES - .with_label_values(&[status_code_label(code), &config.region]) + .with_label_values(&[status_code_label(code), &config.region, &config.tenant]) .inc(); } REQUEST_DURATION_SECONDS - .with_label_values(&[&config.region]) + .with_label_values(&[&config.region, &config.tenant]) .observe(step.response_time_ms as f64 / 1000.0); } // Record throughput (Issue #35) SCENARIO_REQUESTS_TOTAL - .with_label_values(&[&config.scenario.name]) + .with_label_values(&[&config.scenario.name, &config.tenant]) .inc(); GLOBAL_THROUGHPUT_TRACKER.record( &config.scenario.name, diff --git a/src/yaml_config.rs b/src/yaml_config.rs index 0bbb7fb..872bfd3 100644 --- a/src/yaml_config.rs +++ b/src/yaml_config.rs @@ -18,6 +18,7 @@ use crate::load_models::LoadModel; use crate::scenario::{ Assertion, Extractor, RequestConfig, Scenario, Step, StepCache, VariableExtraction, }; +use crate::utils::parse_body_size; /// Errors that can occur when loading or parsing YAML configuration. #[derive(Error, Debug)] @@ -62,6 +63,10 @@ pub struct YamlMetadata { pub author: Option, #[serde(default)] pub tags: Vec, + /// Optional tenant identifier for multi-tenant SaaS deployments. + /// When set, all metrics emitted by this test run include a `tenant` label + /// so per-tenant request counts can be queried from Prometheus. + pub tenant: Option, } /// Global configuration settings. @@ -315,6 +320,11 @@ pub struct YamlRequest { pub headers: Option>, pub body: Option, + + /// Generate a synthetic body of this size instead of using `body`. + /// Mutually exclusive with `body`. Supports "512B", "512KB", "1MB". + #[serde(rename = "bodySize")] + pub body_size: Option, } /// Extractor definition in YAML. 
@@ -564,10 +574,33 @@ impl YamlConfig { yaml_step.request.path.clone() }; + // Validate mutual exclusion of body and body_size + if yaml_step.request.body.is_some() && yaml_step.request.body_size.is_some() { + return Err(YamlConfigError::Validation(format!( + "Step '{}': 'body' and 'bodySize' are mutually exclusive — use one or the other", + step_name + ))); + } + + // Parse body_size string to bytes + let body_size = yaml_step + .request + .body_size + .as_deref() + .map(parse_body_size) + .transpose() + .map_err(|e| { + YamlConfigError::Validation(format!( + "Step '{}': invalid bodySize — {}", + step_name, e + )) + })?; + let request = RequestConfig { method: yaml_step.request.method.clone(), path, body: yaml_step.request.body.clone(), + body_size, headers, }; diff --git a/tests/assertion_integration_tests.rs b/tests/assertion_integration_tests.rs index eabca6b..cf43c81 100644 --- a/tests/assertion_integration_tests.rs +++ b/tests/assertion_integration_tests.rs @@ -41,6 +41,7 @@ async fn test_status_code_assertion_pass() { method: "GET".to_string(), path: "/status/200".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -86,6 +87,7 @@ async fn test_status_code_assertion_fail() { method: "GET".to_string(), path: "/status/200".to_string(), // Returns 200, not 404 body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -131,6 +133,7 @@ async fn test_response_time_assertion_pass() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -177,6 +180,7 @@ async fn test_response_time_assertion_fail() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -225,6 +229,7 @@ async fn test_json_path_assertion_existence() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, 
extractions: vec![], @@ -273,6 +278,7 @@ async fn test_json_path_assertion_value_match() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -321,6 +327,7 @@ async fn test_json_path_assertion_value_mismatch() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -372,6 +379,7 @@ async fn test_body_contains_assertion_pass() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -417,6 +425,7 @@ async fn test_body_contains_assertion_fail() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -462,6 +471,7 @@ async fn test_body_matches_regex_assertion() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -506,6 +516,7 @@ async fn test_header_exists_assertion_pass() { method: "GET".to_string(), path: "/headers".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -549,6 +560,7 @@ async fn test_header_exists_assertion_fail() { method: "GET".to_string(), path: "/headers".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -596,6 +608,7 @@ async fn test_multiple_assertions_all_pass() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -651,6 +664,7 @@ async fn test_multiple_assertions_mixed_results() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -710,6 +724,7 @@ async fn test_multi_step_assertion_stops_on_failure() { method: "GET".to_string(), path: "/status/200".to_string(), body: None, + 
body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -723,6 +738,7 @@ async fn test_multi_step_assertion_stops_on_failure() { method: "GET".to_string(), path: "/status/200".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -736,6 +752,7 @@ async fn test_multi_step_assertion_stops_on_failure() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -786,6 +803,7 @@ async fn test_realistic_e_commerce_flow_with_assertions() { method: "GET".to_string(), path: "/health".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -802,6 +820,7 @@ async fn test_realistic_e_commerce_flow_with_assertions() { method: "GET".to_string(), path: "/products?limit=10".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -821,6 +840,7 @@ async fn test_realistic_e_commerce_flow_with_assertions() { method: "GET".to_string(), path: "/status".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], diff --git a/tests/cookie_session_tests.rs b/tests/cookie_session_tests.rs index 1463cdc..b69b3fd 100644 --- a/tests/cookie_session_tests.rs +++ b/tests/cookie_session_tests.rs @@ -42,6 +42,7 @@ async fn test_cookies_persist_across_steps() { }"# .to_string(), ), + body_size: None, headers: { let mut headers = HashMap::new(); headers.insert("Content-Type".to_string(), "application/json".to_string()); @@ -59,6 +60,7 @@ async fn test_cookies_persist_across_steps() { method: "GET".to_string(), path: "/users/me".to_string(), body: None, + body_size: None, headers: HashMap::new(), // No manual auth header needed - cookies handle it }, extractions: vec![], @@ -122,6 +124,7 @@ async fn test_auth_flow_with_token_and_cookies() { }"# .to_string(), ), + body_size: None, headers: { let mut headers = HashMap::new(); 
headers.insert("Content-Type".to_string(), "application/json".to_string()); @@ -145,6 +148,7 @@ async fn test_auth_flow_with_token_and_cookies() { method: "GET".to_string(), path: "/users/me".to_string(), body: None, + body_size: None, headers: { let mut headers = HashMap::new(); // Use extracted token in Authorization header @@ -220,6 +224,7 @@ async fn test_cookie_isolation_between_clients() { }"# .to_string(), ), + body_size: None, headers: { let mut headers = HashMap::new(); headers.insert("Content-Type".to_string(), "application/json".to_string()); @@ -276,6 +281,7 @@ async fn test_shopping_flow_with_session() { method: "GET".to_string(), path: "/products?limit=3".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![VariableExtraction { @@ -299,6 +305,7 @@ async fn test_shopping_flow_with_session() { }"# .to_string(), ), + body_size: None, headers: { let mut headers = HashMap::new(); headers.insert("Content-Type".to_string(), "application/json".to_string()); @@ -325,6 +332,7 @@ async fn test_shopping_flow_with_session() { }"# .to_string(), ), + body_size: None, headers: { let mut headers = HashMap::new(); headers.insert("Content-Type".to_string(), "application/json".to_string()); @@ -343,6 +351,7 @@ async fn test_shopping_flow_with_session() { method: "GET".to_string(), path: "/cart".to_string(), body: None, + body_size: None, headers: { let mut headers = HashMap::new(); headers.insert("Authorization".to_string(), "Bearer ${token}".to_string()); @@ -403,6 +412,7 @@ async fn test_client_without_cookies_fails_session() { }"# .to_string(), ), + body_size: None, headers: { let mut headers = HashMap::new(); headers.insert("Content-Type".to_string(), "application/json".to_string()); diff --git a/tests/csv_data_driven_tests.rs b/tests/csv_data_driven_tests.rs index 4d9b1b9..bb8e962 100644 --- a/tests/csv_data_driven_tests.rs +++ b/tests/csv_data_driven_tests.rs @@ -139,6 +139,7 @@ async fn test_scenario_with_csv_data() { method: 
"POST".to_string(), path: "/post".to_string(), body: Some(r#"{"username": "${username}", "email": "${email}"}"#.to_string()), + body_size: None, headers: { let mut h = HashMap::new(); h.insert("Content-Type".to_string(), "application/json".to_string()); @@ -190,6 +191,7 @@ async fn test_multiple_users_different_data() { method: "GET".to_string(), path: "/get".to_string(), // Simple GET endpoint body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -229,6 +231,23 @@ async fn test_multiple_users_different_data() { #[tokio::test] async fn test_realistic_user_pool() { + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + let server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/get")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + Mock::given(method("GET")) + .and(path("/json")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + // Simulate a realistic user pool with credentials let user_csv = r#"username,password,email,role alice,alice123,alice@company.com,admin @@ -237,6 +256,7 @@ carol,carol789,carol@company.com,user dave,dave012,dave@company.com,manager"#; let ds = CsvDataSource::from_string(user_csv).unwrap(); + let base_url = server.uri(); let scenario = Scenario { name: "User Pool Test".to_string(), @@ -248,6 +268,7 @@ dave,dave012,dave@company.com,manager"#; method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -261,6 +282,7 @@ dave,dave012,dave@company.com,manager"#; method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -274,7 +296,7 @@ dave,dave012,dave@company.com,manager"#; // Simulate 8 virtual users (2 full cycles through 4 users) for i in 0..8 { let client = create_test_client(); - let executor = ScenarioExecutor::new(BASE_URL.to_string(), 
client); + let executor = ScenarioExecutor::new(base_url.clone(), client); let mut context = ScenarioContext::new(); let row = ds.next_row().unwrap(); diff --git a/tests/error_categorization_tests.rs b/tests/error_categorization_tests.rs index 2265e52..cbdae13 100644 --- a/tests/error_categorization_tests.rs +++ b/tests/error_categorization_tests.rs @@ -178,6 +178,7 @@ async fn test_404_error_categorization() { method: "GET".to_string(), path: "/this-endpoint-does-not-exist-12345".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -217,6 +218,7 @@ async fn test_timeout_error_categorization() { method: "GET".to_string(), path: "/health".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -258,6 +260,7 @@ async fn test_network_error_categorization() { method: "GET".to_string(), path: "/health".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -299,6 +302,7 @@ async fn test_mixed_error_types_in_scenario() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -312,6 +316,7 @@ async fn test_mixed_error_types_in_scenario() { method: "GET".to_string(), path: "/status/404".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], diff --git a/tests/http_methods_tests.rs b/tests/http_methods_tests.rs index cde6a13..8e904c9 100644 --- a/tests/http_methods_tests.rs +++ b/tests/http_methods_tests.rs @@ -29,6 +29,7 @@ async fn test_get_request() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -68,6 +69,7 @@ async fn test_post_request() { method: "POST".to_string(), path: "/post".to_string(), body: Some(r#"{"test": "data"}"#.to_string()), + body_size: None, headers: { let mut h = HashMap::new(); h.insert("Content-Type".to_string(), 
"application/json".to_string()); @@ -111,6 +113,7 @@ async fn test_put_request() { method: "PUT".to_string(), path: "/put".to_string(), body: Some(r#"{"update": "data"}"#.to_string()), + body_size: None, headers: { let mut h = HashMap::new(); h.insert("Content-Type".to_string(), "application/json".to_string()); @@ -152,6 +155,7 @@ async fn test_patch_request() { method: "PATCH".to_string(), path: "/patch".to_string(), body: Some(r#"{"patch": "data"}"#.to_string()), + body_size: None, headers: { let mut h = HashMap::new(); h.insert("Content-Type".to_string(), "application/json".to_string()); @@ -193,6 +197,7 @@ async fn test_delete_request() { method: "DELETE".to_string(), path: "/delete".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -230,6 +235,7 @@ async fn test_head_request() { method: "HEAD".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -268,6 +274,7 @@ async fn test_options_request() { method: "OPTIONS".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -296,6 +303,32 @@ async fn test_options_request() { #[tokio::test] async fn test_mixed_methods_scenario() { + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + let server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/get")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/post")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + Mock::given(method("PUT")) + .and(path("/put")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + Mock::given(method("HEAD")) + .and(path("/get")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + let scenario = Scenario { name: "Mixed HTTP Methods".to_string(), weight: 1.0, @@ 
-306,6 +339,7 @@ async fn test_mixed_methods_scenario() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -319,6 +353,7 @@ async fn test_mixed_methods_scenario() { method: "POST".to_string(), path: "/post".to_string(), body: Some(r#"{"action": "check"}"#.to_string()), + body_size: None, headers: { let mut h = HashMap::new(); h.insert("Content-Type".to_string(), "application/json".to_string()); @@ -336,6 +371,7 @@ async fn test_mixed_methods_scenario() { method: "PUT".to_string(), path: "/put".to_string(), body: Some(r#"{"action": "update"}"#.to_string()), + body_size: None, headers: { let mut h = HashMap::new(); h.insert("Content-Type".to_string(), "application/json".to_string()); @@ -353,6 +389,7 @@ async fn test_mixed_methods_scenario() { method: "HEAD".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -364,20 +401,16 @@ async fn test_mixed_methods_scenario() { }; let client = create_test_client(); - let executor = ScenarioExecutor::new(BASE_URL.to_string(), client); + let executor = ScenarioExecutor::new(server.uri(), client); let mut context = ScenarioContext::new(); let result = executor .execute(&scenario, &mut context, &mut SessionStore::new()) .await; - // All steps should execute (some may fail depending on API implementation) - assert!(result.steps.len() >= 2, "Should execute multiple steps"); + assert_eq!(result.steps.len(), 4, "All 4 steps should execute"); assert!(result.steps[0].success, "GET should succeed"); - assert!( - result.steps[3].success || result.steps.len() == 4, - "HEAD should execute" - ); + assert!(result.steps[3].success, "HEAD should succeed"); println!("✅ Mixed methods scenario works"); println!(" Steps executed: {}", result.steps.len()); @@ -393,7 +426,23 @@ async fn test_mixed_methods_scenario() { #[tokio::test] async fn test_case_insensitive_methods() { - // Test that 
methods are case-insensitive + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + let server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/get")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/post")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + // Test that methods are case-insensitive (executor normalises to uppercase before sending) let test_cases: Vec<(&str, &str)> = vec![ ("get", "/get"), ("Get", "/get"), @@ -403,16 +452,17 @@ async fn test_case_insensitive_methods() { ("POST", "/post"), ]; - for (method, path) in test_cases { + for (m, p) in test_cases { let scenario = Scenario { - name: format!("Case Test: {}", method), + name: format!("Case Test: {}", m), weight: 1.0, steps: vec![Step { - name: format!("{} request", method), + name: format!("{} request", m), request: RequestConfig { - method: method.to_string(), - path: path.to_string(), + method: m.to_string(), + path: p.to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -423,14 +473,14 @@ async fn test_case_insensitive_methods() { }; let client = create_test_client(); - let executor = ScenarioExecutor::new(BASE_URL.to_string(), client); + let executor = ScenarioExecutor::new(server.uri(), client); let mut context = ScenarioContext::new(); let result = executor .execute(&scenario, &mut context, &mut SessionStore::new()) .await; - assert!(result.success, "{} should work (case-insensitive)", method); + assert!(result.success, "{} should work (case-insensitive)", m); } println!("✅ HTTP methods are case-insensitive"); @@ -449,6 +499,7 @@ async fn test_rest_crud_flow() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -462,6 +513,7 @@ async fn test_rest_crud_flow() { method: "POST".to_string(), path: 
"/post".to_string(), body: Some(r#"{"name": "Test Item", "price": 99.99}"#.to_string()), + body_size: None, headers: { let mut h = HashMap::new(); h.insert("Content-Type".to_string(), "application/json".to_string()); @@ -481,6 +533,7 @@ async fn test_rest_crud_flow() { body: Some( r#"{"name": "Updated Item", "price": 149.99, "stock": 10}"#.to_string(), ), + body_size: None, headers: { let mut h = HashMap::new(); h.insert("Content-Type".to_string(), "application/json".to_string()); @@ -498,6 +551,7 @@ async fn test_rest_crud_flow() { method: "PATCH".to_string(), path: "/patch".to_string(), body: Some(r#"{"price": 129.99}"#.to_string()), + body_size: None, headers: { let mut h = HashMap::new(); h.insert("Content-Type".to_string(), "application/json".to_string()); @@ -515,6 +569,7 @@ async fn test_rest_crud_flow() { method: "HEAD".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -528,6 +583,7 @@ async fn test_rest_crud_flow() { method: "DELETE".to_string(), path: "/delete".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -568,6 +624,7 @@ async fn test_options_cors_preflight() { method: "OPTIONS".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: { let mut h = HashMap::new(); h.insert( diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 1fce8cf..acba2d2 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -21,18 +21,18 @@ fn init_metrics() { } fn get_total_requests() -> u64 { - REQUEST_TOTAL.with_label_values(&["local"]).get() + REQUEST_TOTAL.with_label_values(&["local", ""]).get() } fn get_status_code_count(code: &str) -> u64 { REQUEST_STATUS_CODES - .with_label_values(&[code, "local"]) + .with_label_values(&[code, "local", ""]) .get() } fn get_duration_count() -> u64 { REQUEST_DURATION_SECONDS - .with_label_values(&["local"]) + .with_label_values(&["local", ""]) 
.get_sample_count() } @@ -64,6 +64,7 @@ async fn worker_sends_get_requests() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; @@ -105,6 +106,7 @@ async fn worker_sends_post_requests() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; @@ -142,6 +144,7 @@ async fn worker_sends_json_post_body() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; @@ -178,6 +181,7 @@ async fn worker_tracks_200_status_codes() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; @@ -218,6 +222,7 @@ async fn worker_tracks_404_status_codes() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; @@ -258,6 +263,7 @@ async fn worker_tracks_500_status_codes() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; @@ -300,6 +306,7 @@ async fn worker_records_request_duration() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; @@ -340,6 +347,7 @@ async fn concurrent_requests_returns_to_zero_after_worker_finishes() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; @@ -347,7 +355,7 @@ async fn concurrent_requests_returns_to_zero_after_worker_finishes() { 
run_worker(client, config, Instant::now()).await; // After worker finishes, concurrent requests gauge should not be negative - let gauge = CONCURRENT_REQUESTS.with_label_values(&["local"]).get(); + let gauge = CONCURRENT_REQUESTS.with_label_values(&["local", ""]).get(); assert!( gauge >= 0.0, "concurrent requests gauge should not be negative, got {}", @@ -376,6 +384,7 @@ async fn worker_handles_connection_error_gracefully() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; @@ -420,6 +429,7 @@ async fn worker_respects_rps_rate_limit() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; @@ -461,6 +471,7 @@ async fn worker_stops_after_test_duration() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; @@ -509,6 +520,7 @@ async fn worker_handles_slow_responses() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), stop_rx: tokio::sync::watch::channel(false).1, }; diff --git a/tests/per_scenario_throughput_tests.rs b/tests/per_scenario_throughput_tests.rs index 26a3b2e..62a8953 100644 --- a/tests/per_scenario_throughput_tests.rs +++ b/tests/per_scenario_throughput_tests.rs @@ -165,6 +165,7 @@ async fn test_scenario_throughput_tracking() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -210,6 +211,7 @@ async fn test_multiple_scenarios_different_throughput() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -229,6 +231,7 @@ async fn 
test_multiple_scenarios_different_throughput() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -242,6 +245,7 @@ async fn test_multiple_scenarios_different_throughput() { method: "GET".to_string(), path: "/delay/1".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], diff --git a/tests/percentile_tracking_tests.rs b/tests/percentile_tracking_tests.rs index e999e48..6f3842f 100644 --- a/tests/percentile_tracking_tests.rs +++ b/tests/percentile_tracking_tests.rs @@ -230,6 +230,7 @@ async fn test_scenario_percentile_tracking() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -243,6 +244,7 @@ async fn test_scenario_percentile_tracking() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], diff --git a/tests/scenario_integration_tests.rs b/tests/scenario_integration_tests.rs index 8a1682c..abc7312 100644 --- a/tests/scenario_integration_tests.rs +++ b/tests/scenario_integration_tests.rs @@ -35,6 +35,7 @@ async fn test_health_check_scenario() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -69,6 +70,7 @@ async fn test_product_browsing_scenario() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -82,6 +84,7 @@ async fn test_product_browsing_scenario() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -127,6 +130,7 @@ async fn test_variable_substitution() { method: "GET".to_string(), path: "/get?product=${product_id}".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ 
-176,6 +180,7 @@ async fn test_multi_step_with_delays() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -189,6 +194,7 @@ async fn test_multi_step_with_delays() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -202,6 +208,7 @@ async fn test_multi_step_with_delays() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -245,6 +252,7 @@ async fn test_scenario_failure_handling() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -258,6 +266,7 @@ async fn test_scenario_failure_handling() { method: "GET".to_string(), path: "/status/404".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -271,6 +280,7 @@ async fn test_scenario_failure_handling() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -319,6 +329,7 @@ async fn test_timestamp_variable() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: { let mut headers = HashMap::new(); // Test timestamp in headers @@ -364,6 +375,7 @@ async fn test_post_request_with_json_body() { }"# .to_string(), ), + body_size: None, headers: { let mut headers = HashMap::new(); headers.insert("Content-Type".to_string(), "application/json".to_string()); @@ -405,6 +417,7 @@ async fn test_scenario_context_isolation() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -439,3 +452,56 @@ async fn test_scenario_context_isolation() { assert_eq!(context1.get_variable("test"), Some(&"value1".to_string())); assert_eq!(context2.get_variable("test"), 
Some(&"value2".to_string())); } + +/// Verify that `body_size` sends a synthetic body of exactly the requested size. +#[tokio::test] +async fn test_body_size_sends_correct_content_length() { + let server = MockServer::start().await; + let base_url = server.uri(); + + Mock::given(method("POST")) + .and(path("/upload")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + let scenario = Scenario { + name: "body_size test".to_string(), + weight: 1.0, + steps: vec![Step { + name: "POST 512B".to_string(), + request: RequestConfig { + method: "POST".to_string(), + path: "/upload".to_string(), + body: None, + body_size: Some(512), + headers: HashMap::new(), + }, + extractions: vec![], + assertions: vec![Assertion::StatusCode(200)], + cache: None, + think_time: None, + }], + }; + + let client = create_test_client(); + let executor = ScenarioExecutor::new(base_url, client); + let mut context = ScenarioContext::new(); + + let result = executor + .execute(&scenario, &mut context, &mut SessionStore::new()) + .await; + + assert!(result.success, "body_size POST should succeed"); + assert_eq!(result.steps.len(), 1); + assert!(result.steps[0].success); + + // Verify wiremock received exactly one request with a 512-byte body + let requests = server.received_requests().await.unwrap(); + assert_eq!(requests.len(), 1); + assert_eq!( + requests[0].body.len(), + 512, + "body should be exactly 512 bytes" + ); +} diff --git a/tests/scenario_worker_tests.rs b/tests/scenario_worker_tests.rs index 49c6b84..cd795e7 100644 --- a/tests/scenario_worker_tests.rs +++ b/tests/scenario_worker_tests.rs @@ -21,6 +21,7 @@ async fn test_scenario_worker_respects_duration() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -40,6 +41,7 @@ async fn test_scenario_worker_respects_duration() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: 
String::new(), }; let client = reqwest::Client::new(); @@ -69,6 +71,7 @@ async fn test_scenario_worker_constant_load() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -90,6 +93,7 @@ async fn test_scenario_worker_constant_load() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), }; let client = reqwest::Client::new(); @@ -113,6 +117,7 @@ async fn test_scenario_worker_with_think_time() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -126,6 +131,7 @@ async fn test_scenario_worker_with_think_time() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -146,6 +152,7 @@ async fn test_scenario_worker_with_think_time() { percentile_tracking_enabled: true, percentile_sampling_rate: 100, region: "local".to_string(), + tenant: String::new(), }; let client = reqwest::Client::new(); diff --git a/tests/think_time_tests.rs b/tests/think_time_tests.rs index 0401b5f..dab1c4a 100644 --- a/tests/think_time_tests.rs +++ b/tests/think_time_tests.rs @@ -49,6 +49,7 @@ async fn test_fixed_think_time() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -62,6 +63,7 @@ async fn test_fixed_think_time() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -133,6 +135,7 @@ async fn test_random_think_time() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -149,6 +152,7 @@ async fn test_random_think_time() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: 
HashMap::new(), }, extractions: vec![], @@ -213,6 +217,7 @@ async fn test_multiple_think_times() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -226,6 +231,7 @@ async fn test_multiple_think_times() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -239,6 +245,7 @@ async fn test_multiple_think_times() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -304,6 +311,7 @@ async fn test_no_think_time() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -317,6 +325,7 @@ async fn test_no_think_time() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -367,6 +376,7 @@ async fn test_realistic_user_behavior() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -383,6 +393,7 @@ async fn test_realistic_user_behavior() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -399,6 +410,7 @@ async fn test_realistic_user_behavior() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], diff --git a/tests/variable_extraction_tests.rs b/tests/variable_extraction_tests.rs index 16e9ee9..49d0cf4 100644 --- a/tests/variable_extraction_tests.rs +++ b/tests/variable_extraction_tests.rs @@ -31,6 +31,7 @@ async fn test_jsonpath_extraction_from_products() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![ @@ -86,6 +87,7 @@ async 
fn test_extraction_and_reuse_in_next_step() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![VariableExtraction { @@ -102,6 +104,7 @@ async fn test_extraction_and_reuse_in_next_step() { method: "GET".to_string(), path: "/get?origin=${origin_ip}".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![], @@ -148,6 +151,7 @@ async fn test_header_extraction() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![VariableExtraction { @@ -195,6 +199,7 @@ async fn test_multiple_extractions_in_single_step() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![ @@ -260,6 +265,7 @@ async fn test_shopping_flow_with_extraction() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![VariableExtraction { @@ -282,6 +288,7 @@ async fn test_shopping_flow_with_extraction() { }"# .to_string(), ), + body_size: None, headers: { let mut headers = HashMap::new(); headers.insert("Content-Type".to_string(), "application/json".to_string()); @@ -302,6 +309,7 @@ async fn test_shopping_flow_with_extraction() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![VariableExtraction { @@ -351,6 +359,7 @@ async fn test_extraction_failure_doesnt_stop_scenario() { method: "GET".to_string(), path: "/json".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![ @@ -373,6 +382,7 @@ async fn test_extraction_failure_doesnt_stop_scenario() { method: "GET".to_string(), path: "/get".to_string(), body: None, + body_size: None, headers: HashMap::new(), }, extractions: vec![],