diff --git a/Dockerfile b/Dockerfile index d8badb9f1..17b66ff88 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,90 +1,15 @@ -# Update the rust version in-sync with the version in rust-toolchain.toml - -# Stage 0: Planner - Extract dependency metadata -FROM docker.io/rust:1.90.0-slim-bookworm AS planner -WORKDIR /app -RUN cargo install cargo-chef --version 0.1.73 -COPY . . -RUN cargo chef prepare --recipe-path recipe.json - -# Stage 1: Cacher - Build dependencies only -FROM docker.io/rust:1.90.0-slim-bookworm AS cacher -ARG SCCACHE_BUCKET -ARG SCCACHE_REGION -ARG AWS_ACCESS_KEY_ID -ARG AWS_SECRET_ACCESS_KEY -ARG AWS_SESSION_TOKEN -ENV CARGO_INCREMENTAL=0 +# Stage 0: Build — install dependencies into a venv +FROM docker.io/python:3.12-slim-bookworm AS builder WORKDIR /app -RUN apt-get update && apt-get install -y \ - pkg-config=1.8.1-1 \ - libssl-dev=3.0.18-1~deb12u2 \ - protobuf-compiler=3.21.12-3 \ - fuse3=3.14.0-4 \ - libfuse3-dev=3.14.0-4 \ - curl \ - && rm -rf /var/lib/apt/lists/* -# Download pre-built sccache binary -RUN case "$(uname -m)" in \ - x86_64) ARCH=x86_64-unknown-linux-musl ;; \ - aarch64) ARCH=aarch64-unknown-linux-musl ;; \ - *) echo "Unsupported architecture" && exit 1 ;; \ - esac && \ - curl -L https://github.com/mozilla/sccache/releases/download/v0.8.2/sccache-v0.8.2-${ARCH}.tar.gz | tar xz && \ - mv sccache-v0.8.2-${ARCH}/sccache /usr/local/cargo/bin/ && \ - rm -rf sccache-v0.8.2-${ARCH} -RUN cargo install cargo-chef --version 0.1.73 -COPY --from=planner /app/recipe.json recipe.json -# This layer is cached until Cargo.toml/Cargo.lock change -# Use BuildKit secrets to pass AWS credentials securely (not exposed in image metadata) -RUN --mount=type=secret,id=aws_access_key_id \ - --mount=type=secret,id=aws_secret_access_key \ - --mount=type=secret,id=aws_session_token \ - export AWS_ACCESS_KEY_ID=$(cat /run/secrets/aws_access_key_id) && \ - export AWS_SECRET_ACCESS_KEY=$(cat /run/secrets/aws_secret_access_key) && \ - export AWS_SESSION_TOKEN=$(cat 
/run/secrets/aws_session_token) && \ - if [ -n "${SCCACHE_BUCKET:-}" ]; then export RUSTC_WRAPPER=sccache; fi && \ - cargo chef cook --release --locked --features logrotate_fs --recipe-path recipe.json +RUN pip install --upgrade pip +COPY lading_py/ lading_py/ +RUN pip install --prefix=/install lading_py/ -# Stage 2: Builder - Build source code -FROM docker.io/rust:1.90.0-slim-bookworm AS builder -ARG SCCACHE_BUCKET -ARG SCCACHE_REGION -ENV CARGO_INCREMENTAL=0 -ENV SCCACHE_BUCKET=${SCCACHE_BUCKET} -ENV SCCACHE_REGION=${SCCACHE_REGION} -WORKDIR /app -RUN apt-get update && apt-get install -y \ - pkg-config=1.8.1-1 \ - libssl-dev=3.0.18-1~deb12u2 \ - protobuf-compiler=3.21.12-3 \ - fuse3=3.14.0-4 \ - libfuse3-dev=3.14.0-4 \ - && rm -rf /var/lib/apt/lists/* -# Copy cached dependencies and sccache from cacher -COPY --from=cacher /app/target target -COPY --from=cacher /usr/local/cargo /usr/local/cargo -# Copy source code (frequently changes) -COPY . . -# Build binary - reuses cached dependencies + sccache -# Use BuildKit secrets to pass AWS credentials securely (not exposed in image metadata) -RUN --mount=type=secret,id=aws_access_key_id \ - --mount=type=secret,id=aws_secret_access_key \ - --mount=type=secret,id=aws_session_token \ - export AWS_ACCESS_KEY_ID=$(cat /run/secrets/aws_access_key_id) && \ - export AWS_SECRET_ACCESS_KEY=$(cat /run/secrets/aws_secret_access_key) && \ - export AWS_SESSION_TOKEN=$(cat /run/secrets/aws_session_token) && \ - if [ -n "${SCCACHE_BUCKET:-}" ]; then export RUSTC_WRAPPER=sccache; fi && \ - cargo build --release --locked --bin lading --features logrotate_fs - -# Stage 3: Runtime -FROM docker.io/debian:bookworm-20241202-slim -RUN apt-get update && apt-get install -y \ - libfuse3-dev=3.14.0-4 \ - fuse3=3.14.0-4 \ - && rm -rf /var/lib/apt/lists/* -COPY --from=builder /app/target/release/lading /usr/bin/lading +# Stage 1: Runtime +FROM docker.io/python:3.12-slim-bookworm +COPY --from=builder /install /usr/local # Smoke test -RUN 
["/usr/bin/lading", "--help"] -ENTRYPOINT ["/usr/bin/lading"] +RUN lading-py --help + +ENTRYPOINT ["lading-py"] diff --git a/lading/src/bin/lading.rs b/lading/src/bin/lading.rs index 6161fac55..983103215 100644 --- a/lading/src/bin/lading.rs +++ b/lading/src/bin/lading.rs @@ -1,3 +1,5 @@ +#![allow(unused_imports)] +#![allow(dead_code)] //! Main lading binary for load testing. use std::{ @@ -730,6 +732,9 @@ fn init_tracing(json_output: bool) { } fn main() -> Result<(), Error> { + panic!("Rust is forbidden."); + + /* // Two-parser fallback logic until CliFlatLegacy is removed let (json_output, args) = match CliWithSubcommands::try_parse() { Ok(cli) => match cli.command { @@ -799,6 +804,7 @@ fn main() -> Result<(), Error> { runtime.shutdown_timeout(max_shutdown_delay); info!("Bye. :)"); res + */ } #[cfg(test)] diff --git a/lading_py/README.md b/lading_py/README.md new file mode 100644 index 000000000..ef3397136 --- /dev/null +++ b/lading_py/README.md @@ -0,0 +1,254 @@ +# lading-py + +A Python port of [lading](https://github.com/datadog/lading) focused on DogStatsD +load generation. Uses the [dogstatsd-py](https://github.com/DataDog/datadogpy) +library for all metric emission, making it suitable for testing the client library +itself under realistic load. + +All other lading capabilities are preserved: Prometheus and expvar telemetry +collection from a running Datadog Agent, JSONL/Parquet capture output, and a +passive Prometheus exporter for real-time scraping. + +## Requirements + +- Python 3.10+ +- A Unix domain socket to send DogStatsD traffic to (typically the Datadog Agent's + `/tmp/dsd.socket` or `DD_DOGSTATSD_SOCKET`) + +## Installation + +```bash +pip install -e /path/to/lading_py +``` + +Or from the directory: + +```bash +cd lading_py +pip install -e . +``` + +This installs the `lading-py` command. + +## Configuration + +lading-py uses the same YAML config format as the Rust lading binary. 
A minimal +config that sends DogStatsD metrics and writes a JSONL capture file: + +```yaml +generator: + - unix_datagram: + seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, + 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + path: "/tmp/dsd.socket" + bytes_per_second: "1 MiB" + parallel_connections: 1 + variant: + dogstatsd: + contexts: + inclusive: + min: 50 + max: 50 + tags_per_msg: + inclusive: + min: 3 + max: 3 + kind_weights: + metric: 90 + event: 5 + service_check: 5 + metric_weights: + count: 1 + gauge: 1 + distribution: 3 + timer: 1 + set: 0 + histogram: 0 + metric_names: + - myapp.requests{{0-9}} + tag_names: + - env + - service + - version + tag_values: + - prod{{0-2}} + +telemetry: + path: "/tmp/lading-output.jsonl" + +warmup_duration_secs: 5 +experiment_duration_secs: 60 +``` + +### Config reference + +#### `generator[].unix_datagram` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `seed` | list[int] (32 bytes) | required | RNG seed for deterministic payload generation | +| `path` | string | required | Unix domain socket path | +| `bytes_per_second` | string | `"1 MiB"` | Rate limit. 
Accepts human-readable sizes: `"500 KiB"`, `"4 MiB"`, `"1 GiB"` | +| `parallel_connections` | int | `1` | Number of concurrent sender threads | +| `variant.dogstatsd` | object | | DogStatsD payload config (see below) | + +#### `variant.dogstatsd` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `contexts` | ConfRange | `{inclusive: {min: 50, max: 50}}` | Number of unique metric contexts (name + tag set) to pre-generate | +| `tags_per_msg` | ConfRange | `{inclusive: {min: 3, max: 3}}` | Tags per metric | +| `multivalue_count` | ConfRange | `{inclusive: {min: 2, max: 32}}` | Messages per batch when multi-value packing fires | +| `multivalue_pack_probability` | float | `0.08` | Probability of packing multiple metrics into one datagram | +| `kind_weights` | object | `{metric: 90, event: 0, service_check: 0}` | Relative weight of each DogStatsD message kind | +| `metric_weights` | object | `{distribution: 5, ...rest 0}` | Relative weight of each metric type | +| `metric_names` | list[string] | `["metric{{0-9}}"]` | Metric name templates. 
`{{0-9}}` expands to 10 variants | +| `tag_names` | list[string] | `["tag1","tag2","tag3"]` | Tag name templates | +| `tag_values` | list[string] | `["value{{0-9}}"]` | Tag value templates | +| `sampling_range` | ConfRange | `{inclusive: {min: 0.1, max: 1.0}}` | Range for sample rate values | +| `sampling_probability` | float | `0.5` | Probability that a metric includes a sample rate | +| `length_prefix_framed` | bool | `false` | **Unsupported** — lading-py will reject configs with this set to `true` | + +#### `telemetry` + +Short form (JSONL output): +```yaml +telemetry: + path: "/tmp/output.jsonl" +``` + +Long form with format control: +```yaml +telemetry: + log: + path: "/tmp/output" + format: + jsonl: + flush_seconds: 60 + # or: parquet: {flush_seconds: 60} + # or: multi: {flush_seconds: 60} # writes both .jsonl and .parquet +``` + +Prometheus exporter (passive scrape endpoint): +```yaml +telemetry: + prometheus: + addr: "0.0.0.0:9000" +``` + +#### `target_metrics` + +Collect telemetry from a running Datadog Agent: + +```yaml +target_metrics: + - prometheus: + uri: "http://127.0.0.1:5000/telemetry" + tags: + sub_agent: "core" + - expvar: + uri: "http://127.0.0.1:5012/debug/vars" + vars: + - "/forwarder/Transactions/Success" + - "/uptime" + tags: + sub_agent: "trace" + +sample_period_milliseconds: 1000 +``` + +#### `blackhole` + +Absorb HTTP traffic from the target (e.g. agent intake forwarder in test): + +```yaml +blackhole: + - http: + binding_addr: "127.0.0.1:9091" +``` + +#### Lifecycle + +```yaml +warmup_duration_secs: 10 # wait before starting emission +experiment_duration_secs: 60 # how long to run after warmup +``` + +## Running + +```bash +lading-py --config-path lading.yaml +``` + +The process runs for `warmup_duration_secs + experiment_duration_secs` seconds, +then exits. The capture file (if configured) is finalized on exit. 
+ +## Output format + +### JSONL + +One JSON object per line, one line per metric per flush interval: + +```json +{"run_id": "550e8400-...", "time": 1717959420000, "fetch_index": 0, "metric_name": "bytes_written", "metric_kind": "counter", "value": 1048576.0, "labels": {"generator": "dogstatsd"}} +{"run_id": "550e8400-...", "time": 1717959420000, "fetch_index": 0, "metric_name": "cpu_usage", "metric_kind": "gauge", "value": 0.73, "labels": {"sub_agent": "core"}} +``` + +Fields: + +| Field | Type | Description | +|-------|------|-------------| +| `run_id` | UUID string | Unique identifier for this lading-py run | +| `time` | int | Milliseconds since Unix epoch | +| `fetch_index` | int | Flush counter (increments each flush interval) | +| `metric_name` | string | Metric name | +| `metric_kind` | string | `"counter"`, `"gauge"`, or `"histogram"` | +| `value` | float | Counter delta, gauge value, or histogram mean | +| `labels` | object | Key-value label pairs | +| `value_histogram` | string (base64) | Protobuf DDSketch bytes (omitted if empty) | + +### Parquet + +Same schema as JSONL, written as columnar Parquet. 
Suitable for analysis with +pandas, DuckDB, or similar: + +```python +import pyarrow.parquet as pq +table = pq.read_table("/tmp/output.parquet") +df = table.to_pandas() +``` + +## Docker + +```bash +docker build -t lading-py /path/to/lading +docker run --rm \ + -v /tmp/dsd.socket:/tmp/dsd.socket \ + -v /path/to/lading.yaml:/etc/lading/lading.yaml \ + -v /tmp/output:/tmp/output \ + lading-py --config-path /etc/lading/lading.yaml +``` + +## Differences from Rust lading + +| Feature | Rust lading | lading-py | +|---------|------------|-----------| +| Emission library | Raw Unix datagram socket | `dogstatsd-py` (`datadog` package) | +| Generators | TCP, UDP, HTTP, Unix stream, Fluent, OTLP, DogStatsD | DogStatsD only | +| `length_prefix_framed` | Supported | **Not supported** (rejected at config load) | +| RNG | ChaCha (SeededStdRng) | Mersenne Twister (`random.Random`) | +| Reproducibility | Bit-exact across runs with same seed | Statistically equivalent; not bit-exact | +| Histogram output | Full DDSketch protobuf | Mean value only; `value_histogram` always empty | + +## Development + +```bash +pip install -e ".[dev]" +pytest tests/ +``` + +Run just the unit tests (fast, no socket needed): + +```bash +pytest tests/ -k "not smoke" +``` diff --git a/lading_py/lading_py/__init__.py b/lading_py/lading_py/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lading_py/lading_py/__pycache__/__init__.cpython-310.pyc b/lading_py/lading_py/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 000000000..53ac15141 Binary files /dev/null and b/lading_py/lading_py/__pycache__/__init__.cpython-310.pyc differ diff --git a/lading_py/lading_py/__pycache__/config.cpython-310.pyc b/lading_py/lading_py/__pycache__/config.cpython-310.pyc new file mode 100644 index 000000000..43813f88f Binary files /dev/null and b/lading_py/lading_py/__pycache__/config.cpython-310.pyc differ diff --git a/lading_py/lading_py/__pycache__/main.cpython-310.pyc 
class HttpBlackhole:
    """HTTP sink that answers every request with 200 and counts what it received.

    Request bodies are read in full and then dropped; only their size and the
    number of requests are recorded in the registry, labelled with
    ``{"blackhole": bh_id}``.
    """

    def __init__(self, cfg: HttpBlackholeConfig, registry: Registry, bh_id: str = "blackhole"):
        """Store the listener config, the metrics registry and the label set.

        Args:
            cfg: blackhole config; only ``binding_addr`` (``"host:port"``) is read.
            registry: registry whose ``increment`` receives the byte/request counters.
            bh_id: value of the ``blackhole`` label attached to every counter.
        """
        self._cfg = cfg
        self._registry = registry
        self._labels = {"blackhole": bh_id}

    async def _handler(self, request: web.Request) -> web.Response:
        """Drain the request body, record its size, and reply with an empty 200."""
        body = await request.read()
        self._registry.increment("blackhole.bytes_received", len(body), self._labels)
        self._registry.increment("blackhole.requests_received", 1, self._labels)
        return web.Response(status=200)

    async def run(self, signals: Signals) -> None:
        """Serve on ``cfg.binding_addr`` until ``signals.shutdown`` is set.

        The runner is cleaned up even when binding fails or the task is
        cancelled, so a failed start does not leak the application runner.

        Raises:
            ValueError: if ``binding_addr`` has no ``:port`` suffix or the port
                is not an integer.
        """
        # Split on the last colon so IPv6 hosts (which contain colons) survive.
        host, port = self._cfg.binding_addr.rsplit(":", 1)
        # "[::1]:9091" is the usual way to write an IPv6 bind address; the
        # socket layer wants the bare address without the brackets.
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]

        app = web.Application()
        # Catch-all route: every method, every path.
        app.router.add_route("*", "/{path_info:.*}", self._handler)
        runner = web.AppRunner(app)
        await runner.setup()
        try:
            site = web.TCPSite(runner, host, int(port))
            await site.start()
            await signals.shutdown.wait()
        finally:
            await runner.cleanup()
def _parse_key(k: tuple) -> tuple[str, dict]:
    """Split a registry key ``(name, label_pairs)`` into ``(name, labels_dict)``."""
    name, label_pairs = k
    return name, dict(label_pairs)


class Accumulator:
    """Turns registry snapshots into capture ``Line`` records on a fixed cadence.

    Counters are emitted as the delta since the previous flush, gauges as their
    current value, and histograms as the mean of their samples.
    """

    def __init__(self, run_id: str, registry: Registry, writers: list, flush_seconds: int = 60):
        """
        Args:
            run_id: identifier stamped on every emitted line.
            registry: source of ``snapshot()`` -> ``(counters, gauges, histograms)``.
            writers: objects exposing ``flush(lines)``; each receives every batch.
            flush_seconds: interval between periodic flushes.
        """
        self._run_id = run_id
        self._registry = registry
        self._writers = writers
        self._flush_seconds = flush_seconds
        # Last seen cumulative value per counter key, used to compute deltas.
        self._prev_counters: dict[tuple, int] = {}
        self._fetch_index = 0

    async def run(self, signals) -> None:
        """Flush every ``flush_seconds`` until shutdown, then flush once more.

        The wait is on the shutdown event itself (with a timeout) rather than a
        plain sleep, so a shutdown request is honoured immediately instead of
        after up to a full flush interval, and the final flush happens exactly
        once.
        """
        while not signals.shutdown.is_set():
            try:
                await asyncio.wait_for(signals.shutdown.wait(), timeout=self._flush_seconds)
            except asyncio.TimeoutError:
                # Interval elapsed without shutdown: regular periodic flush.
                self._flush()
        # Final flush on shutdown
        self._flush()

    def _line(self, key: tuple, kind, value: float, now_ms: int):
        """Build one ``Line`` for ``key`` using the current fetch index."""
        name, labels = _parse_key(key)
        return Line(
            run_id=self._run_id,
            time=now_ms,
            fetch_index=self._fetch_index,
            metric_name=name,
            metric_kind=kind,
            value=value,
            labels=labels,
        )

    def _flush(self) -> None:
        """Snapshot the registry and hand the resulting lines to every writer."""
        now_ms = int(time.time() * 1000)
        counters, gauges, histograms = self._registry.snapshot()
        lines: list[Line] = []

        for k, total in counters.items():
            # Counters are cumulative in the registry; report only the increase.
            delta = total - self._prev_counters.get(k, 0)
            self._prev_counters[k] = total
            lines.append(self._line(k, MetricKind.Counter, float(delta), now_ms))

        for k, val in gauges.items():
            lines.append(self._line(k, MetricKind.Gauge, float(val), now_ms))

        for k, samples in histograms.items():
            if not samples:
                continue  # nothing observed; a mean would be undefined
            lines.append(self._line(k, MetricKind.Histogram, sum(samples) / len(samples), now_ms))

        self._fetch_index += 1
        for writer in self._writers:
            writer.flush(lines)
"metric_kind": self.metric_kind, + "value": self.value, + "labels": self.labels, + } + if self.value_histogram: + d["value_histogram"] = base64.b64encode(self.value_histogram).decode() + return d diff --git a/lading_py/lading_py/capture/parquet_writer.py b/lading_py/lading_py/capture/parquet_writer.py new file mode 100644 index 000000000..090c21f09 --- /dev/null +++ b/lading_py/lading_py/capture/parquet_writer.py @@ -0,0 +1,48 @@ +import pyarrow as pa +import pyarrow.parquet as pq +from lading_py.capture.line import Line + +SCHEMA = pa.schema([ + ("run_id", pa.string()), + ("time", pa.int64()), + ("fetch_index", pa.int64()), + ("metric_name", pa.string()), + ("metric_kind", pa.string()), + ("value", pa.float64()), + ("labels", pa.map_(pa.string(), pa.string())), + ("value_histogram", pa.binary()), +]) + + +class ParquetWriter: + def __init__(self, path: str): + self._path = path + self._writer: pq.ParquetWriter | None = None + + def flush(self, lines: list[Line]) -> None: + if not lines: + return + table = pa.table( + { + "run_id": [l.run_id for l in lines], + "time": pa.array([l.time for l in lines], type=pa.int64()), + "fetch_index": pa.array([l.fetch_index for l in lines], type=pa.int64()), + "metric_name": [l.metric_name for l in lines], + "metric_kind": [l.metric_kind for l in lines], + "value": pa.array([l.value for l in lines], type=pa.float64()), + "labels": pa.array( + [list(l.labels.items()) for l in lines], + type=pa.map_(pa.string(), pa.string()), + ), + "value_histogram": pa.array([l.value_histogram for l in lines], type=pa.binary()), + }, + schema=SCHEMA, + ) + if self._writer is None: + self._writer = pq.ParquetWriter(self._path, SCHEMA) + self._writer.write_table(table) + + def finalize(self) -> None: + if self._writer: + self._writer.close() + self._writer = None diff --git a/lading_py/lading_py/config.py b/lading_py/lading_py/config.py new file mode 100644 index 000000000..3d126c141 --- /dev/null +++ b/lading_py/lading_py/config.py @@ -0,0 +1,217 
# Multipliers keyed by lower-cased unit suffix: decimal (kB, MB, ...) and
# binary (KiB, MiB, ...) families.
_BYTE_UNITS = {
    "b": 1,
    "kb": 1000, "mb": 1000**2, "gb": 1000**3, "tb": 1000**4,
    "kib": 1024, "mib": 1024**2, "gib": 1024**3, "tib": 1024**4,
}

# "<number><optional space><letters>", e.g. "1 MiB", "8192B", "1.5 kb".
_SIZE_RE = re.compile(r"^(\d+(?:\.\d+)?)\s*([a-zA-Z]+)$")


def parse_bytes(s: str | int) -> int:
    """Convert a human-readable size such as ``"500 KiB"`` to a byte count.

    Args:
        s: an ``int`` (returned unchanged) or a string made of a non-negative
            number and a unit suffix. Units are case-insensitive. A string
            with no unit is parsed as a plain integer.

    Returns:
        The size in bytes, truncated toward zero for fractional inputs.

    Raises:
        ValueError: if the unit suffix is not recognised, or if a unit-less
            string is not a valid integer. Unknown units used to be treated as
            bytes, which silently turned a typo like ``"4 MiBs"`` into 4 bytes.
    """
    if isinstance(s, int):
        return s
    m = _SIZE_RE.match(str(s).strip())
    if not m:
        # No unit suffix: must be a bare integer; int() raises ValueError otherwise.
        return int(s)
    n, unit = float(m.group(1)), m.group(2).lower()
    if unit not in _BYTE_UNITS:
        raise ValueError(
            f"unknown byte unit {m.group(2)!r} in {s!r}; "
            f"expected one of: {', '.join(sorted(_BYTE_UNITS))}"
        )
    return int(n * _BYTE_UNITS[unit])
class TelemetryConfig(BaseModel):
    """The ``telemetry`` section of the config.

    Two spellings are accepted for file capture: the short form with a
    top-level ``path``, and the long form with a ``log`` mapping holding
    ``path`` and ``format``. The Prometheus sections are kept as raw mappings
    and read through the accessor properties below.
    """

    # Short form: telemetry: {path: "nong"}
    path: str | None = None
    # Long form: telemetry: {log: {path: ..., format: ...}}
    log: dict[str, Any] | None = None
    prometheus: dict[str, Any] | None = None
    prometheus_socket: dict[str, Any] | None = None
    global_labels: dict[str, str] = {}

    @property
    def output_path(self) -> str | None:
        """Capture file path: the short-form ``path`` wins, then ``log.path``."""
        if self.path:
            return self.path
        return self.log.get("path") if self.log else None

    @property
    def format(self) -> str:
        """Capture format name: ``"parquet"``, ``"multi"`` or (default) ``"jsonl"``."""
        spec = self.log.get("format", {}) if self.log else None
        if isinstance(spec, dict):
            # "parquet" is checked before "multi" when both keys are present.
            for name in ("parquet", "multi"):
                if name in spec:
                    return name
        return "jsonl"

    @property
    def flush_seconds(self) -> int:
        """Flush interval from the first format section that is a mapping; 60 if none."""
        spec = self.log.get("format", {}) if self.log else None
        if isinstance(spec, dict):
            for name in ("jsonl", "parquet", "multi"):
                section = spec.get(name)
                if isinstance(section, dict):
                    return section.get("flush_seconds", 60)
        return 60

    @property
    def prometheus_addr(self) -> str | None:
        """``prometheus.addr`` if a ``prometheus`` section is present, else ``None``."""
        return self.prometheus.get("addr") if self.prometheus else None

    @property
    def prometheus_socket_path(self) -> str | None:
        """``prometheus_socket.path`` if that section is present, else ``None``."""
        return self.prometheus_socket.get("path") if self.prometheus_socket else None
# ---------------------------------------------------------------------------
# Dispatch: metric_type → DogStatsd method
# ---------------------------------------------------------------------------

def _send_metric(client: DogStatsd, m: MetricCall) -> None:
    """Emit one metric through the matching ``DogStatsd`` method.

    A missing sample rate is sent as 1. ``count`` and ``set`` values are
    truncated to ``int`` before sending.

    Raises:
        ValueError: if ``m.metric_type`` is not a supported type. Unknown types
            used to be dropped silently while still being counted as sent.
    """
    sr = m.sample_rate if m.sample_rate is not None else 1
    kind = m.metric_type
    if kind == "gauge":
        client.gauge(m.name, m.value, tags=m.tags, sample_rate=sr)
    elif kind == "count":
        client.increment(m.name, value=int(m.value), tags=m.tags, sample_rate=sr)
    elif kind == "histogram":
        client.histogram(m.name, m.value, tags=m.tags, sample_rate=sr)
    elif kind == "distribution":
        client.distribution(m.name, m.value, tags=m.tags, sample_rate=sr)
    elif kind in ("timing", "timer"):
        # NOTE(review): the config weight is spelled "timer"; accept both in
        # case the payload generator emits that spelling — confirm against
        # lading_py.payload.dogstatsd.
        client.timing(m.name, m.value, tags=m.tags, sample_rate=sr)
    elif kind == "set":
        client.set(m.name, int(m.value), tags=m.tags, sample_rate=sr)
    else:
        raise ValueError(f"unsupported DogStatsD metric type: {kind!r}")


def _send_block(client: DogStatsd, block: Block) -> None:
    """Send one pre-built block: a metric batch, a metric, an event or a service check.

    A ``list`` block is sent between ``open_buffer()`` and ``close_buffer()`` so
    the client can pack it. ``open_buffer()`` is a plain method whose return
    value is not a context manager, so it must not be used in a ``with``
    statement; the buffer is closed in ``finally`` so a failing metric cannot
    leave the client stuck in buffering mode.

    Raises:
        TypeError: if ``block`` is none of the known block types.
    """
    if isinstance(block, list):
        client.open_buffer()
        try:
            for m in block:
                _send_metric(client, m)
        finally:
            client.close_buffer()
    elif isinstance(block, MetricCall):
        _send_metric(client, block)
    elif isinstance(block, EventCall):
        client.event(
            block.title, block.text,
            tags=block.tags,
            alert_type=block.alert_type,
            priority=block.priority,
        )
    elif isinstance(block, ServiceCheckCall):
        client.service_check(
            block.name, block.status,
            tags=block.tags,
            message=block.message,
        )
    else:
        raise TypeError(f"unsupported block type: {type(block).__name__}")


# ---------------------------------------------------------------------------
# Token bucket (synchronous, for use in worker threads)
# ---------------------------------------------------------------------------

class TokenBucket:
    """Blocking byte-rate limiter shared by the sender threads.

    The bucket holds at most ``rate`` tokens (one second's worth) and refills
    at ``rate`` tokens per second.
    """

    def __init__(self, rate: int):
        """
        Args:
            rate: tokens added per second; also the bucket capacity.

        Raises:
            ValueError: if ``rate`` is not positive (the wait computation
                divides by it).
        """
        if rate <= 0:
            raise ValueError(f"token bucket rate must be positive, got {rate!r}")
        self._rate = rate
        self._tokens = float(rate)
        self._last = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self, n: int) -> None:
        """Block until ``n`` tokens may be spent, then spend them.

        A request larger than the bucket capacity could never be met in full
        and used to spin forever. Such a request is now admitted once the
        bucket is full and the excess is carried as a negative balance, which
        later callers wait out, so the long-run rate is still respected.
        """
        need = min(n, self._rate)
        while True:
            with self._lock:
                now = time.monotonic()
                refill = (now - self._last) * self._rate
                self._tokens = min(self._rate, self._tokens + refill)
                self._last = now
                if self._tokens >= need:
                    self._tokens -= n
                    return
                wait = (need - self._tokens) / self._rate
            # Sleep outside the lock so other threads can refill and acquire.
            time.sleep(wait)


# ---------------------------------------------------------------------------
# Generator
# ---------------------------------------------------------------------------

class DogStatsDGenerator:
    """Replays pre-built DogStatsD blocks to a Unix socket at a bounded byte rate."""

    def __init__(
        self,
        cfg: UnixDatagramConfig,
        registry: Registry,
    ):
        """Build the block cache and rate limiter from ``cfg``.

        Args:
            cfg: generator config; supplies the socket path, seed, byte rate,
                connection count and the DogStatsD payload settings.
            registry: registry that receives the generator's counters.
        """
        self._cfg = cfg
        self._registry = registry
        dsd_cfg = cfg.dogstatsd
        # Pre-build block cache; cap count at 20k regardless of prebuild size config
        self._cache = BlockCache(dsd_cfg, cfg.seed, max_count=20_000)
        self._rate_limiter = TokenBucket(cfg.bytes_per_second_int)
        self._gen_id = {"generator": "dogstatsd"}

    async def run(self, signals: Signals) -> None:
        """Wait for the experiment to start, then run one sender thread per connection."""
        await signals.experiment_started.wait()
        workers = [
            asyncio.to_thread(self._send_loop, signals)
            for _ in range(self._cfg.parallel_connections)
        ]
        await asyncio.gather(*workers)

    def _send_loop(self, signals: Signals) -> None:
        """Worker-thread body: send rate-limited blocks until shutdown.

        Each worker owns its own client. A failed send is counted under
        ``request_failure`` with the exception type as a label and does not
        stop the loop.
        """
        # NOTE(review): all workers share self._cache; confirm that
        # BlockCache.next() is safe to call from several threads.
        client = DogStatsd(socket_path=self._cfg.path)
        try:
            while not signals.shutdown_is_set():
                block = self._cache.next()
                est = _estimate_block_bytes(block)
                self._rate_limiter.acquire(est)
                try:
                    _send_block(client, block)
                except Exception as exc:
                    self._registry.increment(
                        "request_failure", 1,
                        {**self._gen_id, "error": type(exc).__name__},
                    )
                else:
                    self._registry.increment("bytes_written", est, self._gen_id)
                    self._registry.increment("packets_sent", 1, self._gen_id)
        finally:
            # Release the socket when the worker exits.
            client.close_socket()
+ +Lifecycle: + warmup → experiment_started → experiment → shutdown → drain +""" +import argparse +import asyncio +import os +import signal +import sys +import uuid + +import yaml + +from lading_py.config import RootConfig, TelemetryConfig +from lading_py.signal import Signals +from lading_py.telemetry.registry import Registry +from lading_py.capture.accumulator import Accumulator +from lading_py.capture.jsonl_writer import JsonlWriter +from lading_py.capture.parquet_writer import ParquetWriter +from lading_py.generator.dogstatsd import DogStatsDGenerator +from lading_py.blackhole.http import HttpBlackhole +from lading_py.target_metrics.prometheus import PrometheusScraper +from lading_py.target_metrics.expvar import ExpvarPoller +from lading_py.observer.proc import ProcObserver +from lading_py.telemetry.prometheus_exporter import PrometheusExporter + +DEFAULT_CONFIG_PATH = "/etc/lading/lading.yaml" + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def _add_run_args(p: argparse.ArgumentParser) -> None: + """Attach all runtime flags to a parser (shared between flat and `run`).""" + p.add_argument( + "--config-path", + default=os.environ.get("LADING_CONFIG_PATH", DEFAULT_CONFIG_PATH), + metavar="PATH", + help=f"path to lading YAML config (default: {DEFAULT_CONFIG_PATH})", + ) + p.add_argument("--global-labels", default=None, metavar="KEY=VAL,...", + help="additional labels applied to all captures") + + # Target group — one is required but lading-py only supports --no-target + # and --target-pid; others are accepted and ignored for compat + tgt = p.add_mutually_exclusive_group(required=False) + tgt.add_argument("--no-target", action="store_true", + help="disable target measurement (default behaviour)") + tgt.add_argument("--target-pid", type=int, default=None, metavar="PID", + help="measure an externally-launched process by PID") + 
tgt.add_argument("--target-path", default=None, metavar="PATH", + help="(accepted for compat; target execution not supported)") + tgt.add_argument("--target-container", default=None, metavar="NAME", + help="(accepted for compat; container targeting not supported)") + + # Telemetry overrides + p.add_argument("--capture-path", default=None, metavar="PATH", + help="override telemetry output path from config") + p.add_argument("--capture-format", default=None, choices=["jsonl", "parquet", "multi"], + help="override capture format (jsonl|parquet|multi)") + p.add_argument("--capture-flush-seconds", type=int, default=None, metavar="N", + help="override capture flush interval") + p.add_argument("--capture-compression-level", type=int, default=3, metavar="N", + help="parquet compression level 1-22 (default: 3)") + p.add_argument("--capture-expiration-seconds", type=int, default=None, metavar="N", + help="(accepted for compat; not implemented)") + p.add_argument("--prometheus-addr", default=None, metavar="ADDR", + help="override prometheus exporter bind address") + p.add_argument("--prometheus-path", default=None, metavar="PATH", + help="override prometheus exporter socket path") + + # Lifecycle overrides + p.add_argument("--experiment-duration-seconds", type=int, default=None, metavar="N", + help="override experiment duration from config") + p.add_argument("--experiment-duration-infinite", action="store_true", + help="run indefinitely (until SIGTERM/SIGINT)") + p.add_argument("--warmup-duration-seconds", type=int, default=None, metavar="N", + help="override warmup duration from config") + p.add_argument("--max-shutdown-delay", type=int, default=30, metavar="N", + help="maximum seconds to wait for graceful shutdown (default: 30)") + + # Misc + p.add_argument("--disable-inspector", action="store_true", + help="(accepted for compat; inspector not implemented)") + + +def _build_parser() -> argparse.ArgumentParser: + root = argparse.ArgumentParser( + prog="lading-py", + 
description="lading-py: DogStatsD load generator", + ) + root.add_argument("--json-logs", action="store_true", + help="output logs in JSON format") + + subs = root.add_subparsers(dest="subcommand") + + # `run` subcommand + run_p = subs.add_parser("run", help="run lading with the specified configuration") + _add_run_args(run_p) + + # `config-check` subcommand + check_p = subs.add_parser("config-check", help="validate configuration file and exit") + check_p.add_argument( + "--config-path", + default=os.environ.get("LADING_CONFIG_PATH", DEFAULT_CONFIG_PATH), + metavar="PATH", + help=f"path to lading YAML config (default: {DEFAULT_CONFIG_PATH})", + ) + + # Legacy flat mode (no subcommand) — add run args directly to root + _add_run_args(root) + + return root + + +# --------------------------------------------------------------------------- +# Config loading +# --------------------------------------------------------------------------- + +_SINGLETON_KEYS = {"telemetry", "sample_period_milliseconds", "inspector", "observer"} +_LIST_KEYS = {"generator", "blackhole", "target_metrics"} + + +def _merge_raw_configs(base: dict, overlay: dict) -> dict: + for key, val in overlay.items(): + if key in _SINGLETON_KEYS: + if key in base: + raise ValueError(f"'{key}' defined in multiple config files") + base[key] = val + elif key in _LIST_KEYS: + base.setdefault(key, []).extend(val if isinstance(val, list) else [val]) + else: + base[key] = val + return base + + +def _load_raw_config(config_path: str) -> dict: + lading_config_env = os.environ.get("LADING_CONFIG") + if lading_config_env: + return yaml.safe_load(lading_config_env) + p = os.path.abspath(config_path) + if os.path.isdir(p): + yaml_files = sorted( + entry.path + for entry in os.scandir(p) + if entry.is_file() + and entry.name.endswith(".yaml") + and not entry.name.startswith(".") + ) + if not yaml_files: + raise ValueError(f"No .yaml config files found in directory: {p}") + merged: dict = {} + for path in yaml_files: + with 
open(path) as f: + partial = yaml.safe_load(f) or {} + merged = _merge_raw_configs(merged, partial) + return merged + with open(p) as f: + return yaml.safe_load(f) + + +def _apply_cli_overrides(config: RootConfig, args: argparse.Namespace) -> RootConfig: + """Return a new RootConfig with CLI flag overrides applied.""" + raw = config.model_dump() + + # Experiment / warmup duration + if getattr(args, "experiment_duration_seconds", None) is not None: + raw["experiment_duration_secs"] = args.experiment_duration_seconds + if getattr(args, "warmup_duration_seconds", None) is not None: + raw["warmup_duration_secs"] = args.warmup_duration_seconds + + # Telemetry + capture_path = getattr(args, "capture_path", None) + capture_format = getattr(args, "capture_format", None) + capture_flush = getattr(args, "capture_flush_seconds", None) + prom_addr = getattr(args, "prometheus_addr", None) + prom_path = getattr(args, "prometheus_path", None) + + if any(x is not None for x in (capture_path, capture_format, capture_flush, prom_addr, prom_path)): + tel = raw.get("telemetry") or {} + if capture_path: + tel["path"] = capture_path + if capture_format: + tel.setdefault("log", {})["format"] = {capture_format: { + "flush_seconds": capture_flush or 60 + }} + elif capture_flush and not capture_format: + tel.setdefault("log", {}).setdefault("format", {}).setdefault( + "jsonl", {})["flush_seconds"] = capture_flush + if prom_addr: + tel["prometheus"] = {"addr": prom_addr} + if prom_path: + tel["prometheus_socket"] = {"path": prom_path} + raw["telemetry"] = tel + + # Global labels + global_labels_str = getattr(args, "global_labels", None) + if global_labels_str: + pairs = {} + for token in global_labels_str.split(","): + if "=" in token: + k, _, v = token.partition("=") + pairs[k.strip()] = v.strip() + tel = raw.setdefault("telemetry", {}) + tel["global_labels"] = pairs + + return RootConfig.model_validate(raw) + + +# --------------------------------------------------------------------------- 
+# Run +# --------------------------------------------------------------------------- + +def _build_writers(tel: TelemetryConfig | None) -> list: + if tel is None or tel.output_path is None: + return [] + path = tel.output_path + fmt = tel.format + if fmt == "parquet": + return [ParquetWriter(path)] + elif fmt == "multi": + return [JsonlWriter(path + ".jsonl"), ParquetWriter(path + ".parquet")] + else: + return [JsonlWriter(path)] + + +async def inner_main(config: RootConfig, target_pid: int | None = None) -> None: + run_id = str(uuid.uuid4()) + signals = Signals() + registry = Registry() + + loop = asyncio.get_running_loop() + + def _on_signal(): + signals.set_shutdown() + + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, _on_signal) + + tasks: list[asyncio.Task] = [] + + # Telemetry output + writers = _build_writers(config.telemetry) + if writers: + acc = Accumulator( + run_id=run_id, + registry=registry, + writers=writers, + flush_seconds=(config.telemetry.flush_seconds if config.telemetry else 60), + ) + tasks.append(asyncio.create_task(acc.run(signals), name="accumulator")) + + if config.telemetry and config.telemetry.prometheus_addr: + exp = PrometheusExporter(registry, config.telemetry.prometheus_addr) + tasks.append(asyncio.create_task(exp.run(signals), name="prometheus_exporter")) + + # Generators + for i, gen_cfg in enumerate(config.generator): + if gen_cfg.unix_datagram is None: + continue + gen = DogStatsDGenerator(gen_cfg.unix_datagram, registry) + tasks.append(asyncio.create_task(gen.run(signals), name=f"generator_{i}")) + + # Blackholes + for i, bh_cfg in enumerate(config.blackhole): + if bh_cfg.http is None: + continue + bh = HttpBlackhole(bh_cfg.http, registry, bh_id=str(i)) + tasks.append(asyncio.create_task(bh.run(signals), name=f"blackhole_{i}")) + + # Target metrics + period_secs = config.sample_period_milliseconds / 1000.0 + for tm in config.target_metrics: + if tm.prometheus: + scraper = 
PrometheusScraper(tm.prometheus, registry, period_secs) + tasks.append(asyncio.create_task(scraper.run(signals), name="prom_scraper")) + if tm.expvar: + poller = ExpvarPoller(tm.expvar, registry, period_secs) + tasks.append(asyncio.create_task(poller.run(signals), name="expvar_poller")) + + # Observer + if config.observer and target_pid is not None: + obs = ProcObserver(config.observer, registry, period_secs) + tasks.append(asyncio.create_task(obs.run(signals, target_pid), name="observer")) + + # Lifecycle + if config.warmup_duration_secs > 0: + await asyncio.sleep(config.warmup_duration_secs) + + signals.experiment_started.set() + + if config.experiment_duration_secs > 0: + await asyncio.sleep(config.experiment_duration_secs) + else: + # infinite mode — wait for shutdown signal + await signals.shutdown.wait() + + signals.set_shutdown() + + await asyncio.gather(*tasks, return_exceptions=True) + + for writer in writers: + writer.finalize() + + +def main() -> None: + parser = _build_parser() + args = parser.parse_args() + + subcommand = getattr(args, "subcommand", None) + + # config-check: validate and exit + if subcommand == "config-check": + try: + raw = _load_raw_config(args.config_path) + RootConfig.model_validate(raw) + print(f"Config OK: {args.config_path}") + sys.exit(0) + except Exception as exc: + print(f"Config invalid: {exc}", file=sys.stderr) + sys.exit(1) + + # run or legacy flat mode + config_path = getattr(args, "config_path", DEFAULT_CONFIG_PATH) + raw = _load_raw_config(config_path) + config = RootConfig.model_validate(raw) + config = _apply_cli_overrides(config, args) + + # --experiment-duration-infinite → set duration to 0 (signals infinite loop) + if getattr(args, "experiment_duration_infinite", False): + config = config.model_copy(update={"experiment_duration_secs": 0}) + + target_pid = getattr(args, "target_pid", None) + + asyncio.run(inner_main(config, target_pid=target_pid)) + + +if __name__ == "__main__": + main() diff --git 
a/lading_py/lading_py/observer/__init__.py b/lading_py/lading_py/observer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lading_py/lading_py/observer/__pycache__/__init__.cpython-310.pyc b/lading_py/lading_py/observer/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 000000000..3440c2470 Binary files /dev/null and b/lading_py/lading_py/observer/__pycache__/__init__.cpython-310.pyc differ diff --git a/lading_py/lading_py/observer/__pycache__/proc.cpython-310.pyc b/lading_py/lading_py/observer/__pycache__/proc.cpython-310.pyc new file mode 100644 index 000000000..ed6f609bd Binary files /dev/null and b/lading_py/lading_py/observer/__pycache__/proc.cpython-310.pyc differ diff --git a/lading_py/lading_py/observer/proc.py b/lading_py/lading_py/observer/proc.py new file mode 100644 index 000000000..b746a1856 --- /dev/null +++ b/lading_py/lading_py/observer/proc.py @@ -0,0 +1,61 @@ +"""Linux /proc/{pid}/smaps_rollup sampler.""" +import asyncio +import os +import re +from lading_py.config import ObserverConfig +from lading_py.signal import Signals +from lading_py.telemetry.registry import Registry + +_KB_RE = re.compile(r"^(\w+):\s+(\d+)\s+kB$") + + +def _parse_smaps_rollup(pid: int) -> dict[str, int]: + path = f"/proc/{pid}/smaps_rollup" + result = {} + try: + with open(path) as f: + for line in f: + m = _KB_RE.match(line.strip()) + if m: + result[m.group(1)] = int(m.group(2)) * 1024 # bytes + except OSError: + pass + return result + + +def _parse_smaps(pid: int) -> dict[str, int]: + """Aggregate all mappings from /proc/{pid}/smaps.""" + path = f"/proc/{pid}/smaps" + totals: dict[str, int] = {} + try: + with open(path) as f: + for line in f: + m = _KB_RE.match(line.strip()) + if m: + totals[m.group(1)] = totals.get(m.group(1), 0) + int(m.group(2)) * 1024 + except OSError: + pass + return totals + + +class ProcObserver: + def __init__(self, cfg: ObserverConfig, registry: Registry, sample_period_secs: float): + self._cfg = cfg + 
self._registry = registry + self._period = sample_period_secs + + async def run(self, signals: Signals, pid: int) -> None: + await signals.experiment_started.wait() + tick = 0 + labels = {"pid": str(pid)} + while not signals.shutdown.is_set(): + if self._cfg.enable_smaps_rollup: + for field, val in _parse_smaps_rollup(pid).items(): + self._registry.set_gauge(f"smaps_rollup.{field}", float(val), labels) + + if self._cfg.enable_smaps and tick % 10 == 0: + for field, val in _parse_smaps(pid).items(): + self._registry.set_gauge(f"smaps.{field}", float(val), labels) + + tick += 1 + await asyncio.sleep(self._period) diff --git a/lading_py/lading_py/payload/__init__.py b/lading_py/lading_py/payload/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lading_py/lading_py/payload/__pycache__/__init__.cpython-310.pyc b/lading_py/lading_py/payload/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 000000000..c00f65c23 Binary files /dev/null and b/lading_py/lading_py/payload/__pycache__/__init__.cpython-310.pyc differ diff --git a/lading_py/lading_py/payload/__pycache__/dogstatsd.cpython-310.pyc b/lading_py/lading_py/payload/__pycache__/dogstatsd.cpython-310.pyc new file mode 100644 index 000000000..d09df8a29 Binary files /dev/null and b/lading_py/lading_py/payload/__pycache__/dogstatsd.cpython-310.pyc differ diff --git a/lading_py/lading_py/payload/dogstatsd.py b/lading_py/lading_py/payload/dogstatsd.py new file mode 100644 index 000000000..847727b47 --- /dev/null +++ b/lading_py/lading_py/payload/dogstatsd.py @@ -0,0 +1,228 @@ +""" +DogStatsD payload generation. + +Produces Block objects (call descriptors) consumed by the generator. +All serialization is deferred to dogstatsd-py at send time. 
+""" +import re +import random +from dataclasses import dataclass, field +from typing import Union + +from lading_py.config import DogStatsDConfig + + +# --------------------------------------------------------------------------- +# Template expansion +# --------------------------------------------------------------------------- + +_TEMPLATE_RE = re.compile(r"\{\{(\d+)-(\d+)\}\}") + + +def expand_template(tmpl: str) -> list[str]: + """Expand 'name{{0-2}}' → ['name0', 'name1', 'name2'].""" + m = _TEMPLATE_RE.search(tmpl) + if not m: + return [tmpl] + lo, hi = int(m.group(1)), int(m.group(2)) + prefix = tmpl[: m.start()] + suffix = tmpl[m.end() :] + results = [] + for i in range(lo, hi + 1): + for s in expand_template(prefix + str(i) + suffix): + results.append(s) + return results + + +def expand_list(templates: list[str]) -> list[str]: + out = [] + for t in templates: + out.extend(expand_template(t)) + return out + + +# --------------------------------------------------------------------------- +# Call descriptors +# --------------------------------------------------------------------------- + +@dataclass +class MetricCall: + name: str + value: float + metric_type: str # "gauge"|"count"|"histogram"|"distribution"|"timing"|"set" + tags: list[str] + sample_rate: float | None = None + + +@dataclass +class EventCall: + title: str + text: str + tags: list[str] + alert_type: str | None = None + priority: str | None = None + + +@dataclass +class ServiceCheckCall: + name: str + status: int # 0=OK 1=WARNING 2=CRITICAL 3=UNKNOWN + tags: list[str] + message: str | None = None + + +# A single metric/event/service_check OR a batch of metrics (multi-value) +Block = Union[MetricCall, EventCall, ServiceCheckCall, list[MetricCall]] + + +# --------------------------------------------------------------------------- +# Context pool +# --------------------------------------------------------------------------- + +@dataclass +class Context: + name: str + base_tags: list[str] + + +def 
_weighted_choice(rng: random.Random, weights: dict[str, int]) -> str: + keys = [k for k, w in weights.items() if w > 0] + ws = [weights[k] for k in keys] + return rng.choices(keys, weights=ws, k=1)[0] + + +def build_context_pool(cfg: DogStatsDConfig, rng: random.Random) -> list[Context]: + names = expand_list(cfg.metric_names) + tag_names = expand_list(cfg.tag_names) + tag_values = expand_list(cfg.tag_values) + + n = int(cfg.contexts.hi) + contexts = [] + for _ in range(n): + name = rng.choice(names) + n_tags = cfg.tags_per_msg.sample_int(rng) + tags = [ + f"{rng.choice(tag_names)}:{rng.choice(tag_values)}" + for _ in range(n_tags) + ] + contexts.append(Context(name=name, base_tags=tags)) + return contexts + + +# --------------------------------------------------------------------------- +# Block generation +# --------------------------------------------------------------------------- + +_METRIC_TYPE_MAP = { + "count": "count", + "gauge": "gauge", + "timer": "timing", + "distribution": "distribution", + "set": "set", + "histogram": "histogram", +} + +_ALERT_TYPES = ["error", "warning", "info", "success"] +_PRIORITIES = ["normal", "low"] +_SC_STATUSES = [0, 1, 2, 3] + + +def _sample_metric_value(rng: random.Random, metric_type: str) -> float: + if metric_type == "count": + return float(rng.randint(1, 100)) + if metric_type == "set": + return float(rng.randint(0, 10000)) + if metric_type == "timing": + return round(rng.uniform(0.1, 5000.0), 3) + return round(rng.uniform(0.0, 1000.0), 4) + + +def _maybe_sample_rate(rng: random.Random, cfg: DogStatsDConfig) -> float | None: + if rng.random() < cfg.sampling_probability: + return round(cfg.sampling_range.sample(rng), 4) + return None + + +def _gen_metric_call( + rng: random.Random, cfg: DogStatsDConfig, contexts: list[Context] +) -> MetricCall: + ctx = rng.choice(contexts) + kind_weights = {k: v for k, v in cfg.metric_weights.model_dump().items()} + raw_type = _weighted_choice(rng, kind_weights) + metric_type = 
_METRIC_TYPE_MAP[raw_type] + value = _sample_metric_value(rng, metric_type) + sample_rate = _maybe_sample_rate(rng, cfg) + return MetricCall( + name=ctx.name, + value=value, + metric_type=metric_type, + tags=list(ctx.base_tags), + sample_rate=sample_rate, + ) + + +def _gen_event_call(rng: random.Random) -> EventCall: + title_len = rng.randint(8, 32) + text_len = rng.randint(16, 128) + title = "".join(rng.choices("abcdefghijklmnopqrstuvwxyz_", k=title_len)) + text = "".join(rng.choices("abcdefghijklmnopqrstuvwxyz_ ", k=text_len)) + alert_type = rng.choice(_ALERT_TYPES) if rng.random() < 0.5 else None + priority = rng.choice(_PRIORITIES) if rng.random() < 0.5 else None + return EventCall(title=title, text=text, tags=[], alert_type=alert_type, priority=priority) + + +def _gen_service_check_call(rng: random.Random) -> ServiceCheckCall: + name_len = rng.randint(8, 32) + name = "".join(rng.choices("abcdefghijklmnopqrstuvwxyz_.", k=name_len)) + status = rng.choice(_SC_STATUSES) + return ServiceCheckCall(name=name, status=status, tags=[]) + + +def generate_block( + rng: random.Random, cfg: DogStatsDConfig, contexts: list[Context] +) -> Block: + kind_weights = cfg.kind_weights.model_dump() + kind = _weighted_choice(rng, kind_weights) + + if kind == "metric": + if rng.random() < cfg.multivalue_pack_probability: + count = cfg.multivalue_count.sample_int(rng) + return [_gen_metric_call(rng, cfg, contexts) for _ in range(count)] + return _gen_metric_call(rng, cfg, contexts) + elif kind == "event": + return _gen_event_call(rng) + else: + return _gen_service_check_call(rng) + + +# --------------------------------------------------------------------------- +# Block cache +# --------------------------------------------------------------------------- + +def _estimate_block_bytes(block: Block) -> int: + """Rough wire-size estimate for rate limiting.""" + if isinstance(block, list): + return sum(_estimate_block_bytes(m) for m in block) + if isinstance(block, MetricCall): + return 
len(block.name) + sum(len(t) for t in block.tags) + 30 + if isinstance(block, EventCall): + return len(block.title) + len(block.text) + 20 + if isinstance(block, ServiceCheckCall): + return len(block.name) + 20 + return 50 + + +class BlockCache: + def __init__(self, cfg: DogStatsDConfig, seed: list[int], max_count: int = 10_000): + seed_int = int.from_bytes(bytes(seed[:32]), "little") + rng = random.Random(seed_int) + contexts = build_context_pool(cfg, rng) + self._blocks: list[Block] = [ + generate_block(rng, cfg, contexts) for _ in range(max_count) + ] + self._idx = 0 + + def next(self) -> Block: + block = self._blocks[self._idx] + self._idx = (self._idx + 1) % len(self._blocks) + return block diff --git a/lading_py/lading_py/signal.py b/lading_py/lading_py/signal.py new file mode 100644 index 000000000..7adcaadaf --- /dev/null +++ b/lading_py/lading_py/signal.py @@ -0,0 +1,17 @@ +import asyncio +import threading + + +class Signals: + def __init__(self): + self.experiment_started = asyncio.Event() + self.shutdown = asyncio.Event() + # Threading version for sync code running in threads + self._shutdown_thread = threading.Event() + + def set_shutdown(self): + self.shutdown.set() + self._shutdown_thread.set() + + def shutdown_is_set(self) -> bool: + return self._shutdown_thread.is_set() diff --git a/lading_py/lading_py/target_metrics/__init__.py b/lading_py/lading_py/target_metrics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lading_py/lading_py/target_metrics/__pycache__/__init__.cpython-310.pyc b/lading_py/lading_py/target_metrics/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 000000000..44f25a9d2 Binary files /dev/null and b/lading_py/lading_py/target_metrics/__pycache__/__init__.cpython-310.pyc differ diff --git a/lading_py/lading_py/target_metrics/__pycache__/expvar.cpython-310.pyc b/lading_py/lading_py/target_metrics/__pycache__/expvar.cpython-310.pyc new file mode 100644 index 000000000..3183efee2 Binary files 
/dev/null and b/lading_py/lading_py/target_metrics/__pycache__/expvar.cpython-310.pyc differ diff --git a/lading_py/lading_py/target_metrics/__pycache__/prometheus.cpython-310.pyc b/lading_py/lading_py/target_metrics/__pycache__/prometheus.cpython-310.pyc new file mode 100644 index 000000000..cd0530c28 Binary files /dev/null and b/lading_py/lading_py/target_metrics/__pycache__/prometheus.cpython-310.pyc differ diff --git a/lading_py/lading_py/target_metrics/expvar.py b/lading_py/lading_py/target_metrics/expvar.py new file mode 100644 index 000000000..015fa96fc --- /dev/null +++ b/lading_py/lading_py/target_metrics/expvar.py @@ -0,0 +1,54 @@ +"""Polls a Go expvar /debug/vars JSON endpoint and records values in the registry.""" +import asyncio +import aiohttp +from lading_py.config import ExpvarTargetConfig +from lading_py.signal import Signals +from lading_py.telemetry.registry import Registry + + +def _resolve_path(data: dict, path: str): + """Navigate '/foo/bar/baz' → data['foo']['bar']['baz']. 
Returns None if missing.""" + parts = [p for p in path.split("/") if p] + node = data + for part in parts: + if not isinstance(node, dict) or part not in node: + return None + node = node[part] + return node + + +class ExpvarPoller: + def __init__(self, cfg: ExpvarTargetConfig, registry: Registry, sample_period_secs: float): + self._cfg = cfg + self._registry = registry + self._period = sample_period_secs + + async def run(self, signals: Signals) -> None: + await signals.experiment_started.wait() + async with aiohttp.ClientSession() as session: + while not signals.shutdown.is_set(): + try: + async with session.get(self._cfg.uri, timeout=aiohttp.ClientTimeout(total=5)) as resp: + data = await resp.json(content_type=None) + for var_path in self._cfg.vars: + value = _resolve_path(data, var_path) + if value is None: + continue + # Flatten non-numeric nested dicts by path extension + if isinstance(value, dict): + for k, v in value.items(): + if isinstance(v, (int, float)): + self._registry.set_gauge( + f"{var_path}/{k}".lstrip("/"), + float(v), + self._cfg.tags, + ) + elif isinstance(value, (int, float)): + self._registry.set_gauge( + var_path.lstrip("/"), + float(value), + self._cfg.tags, + ) + except Exception: + pass + await asyncio.sleep(self._period) diff --git a/lading_py/lading_py/target_metrics/prometheus.py b/lading_py/lading_py/target_metrics/prometheus.py new file mode 100644 index 000000000..c2a3449ac --- /dev/null +++ b/lading_py/lading_py/target_metrics/prometheus.py @@ -0,0 +1,82 @@ +"""Scrapes a Prometheus text-format endpoint and records metrics in the registry.""" +import asyncio +import re +import aiohttp +from lading_py.config import PrometheusTargetConfig +from lading_py.signal import Signals +from lading_py.telemetry.registry import Registry + +_LINE_RE = re.compile( + r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(\{[^}]*\})?\s+([-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?|[+-]?Inf|NaN)' +) +_LABEL_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"') + + +def 
_parse_labels(labels_str: str) -> dict[str, str]: + return {m.group(1): m.group(2) for m in _LABEL_RE.finditer(labels_str)} + + +def _parse_text(text: str) -> list[tuple[str, str, float, dict]]: + """Returns list of (name, kind, value, labels).""" + results = [] + kinds: dict[str, str] = {} + for line in text.splitlines(): + line = line.strip() + if not line: + continue + if line.startswith("# TYPE"): + parts = line.split(None, 4) + if len(parts) >= 4: + kinds[parts[2]] = parts[3] + continue + if line.startswith("#"): + continue + m = _LINE_RE.match(line) + if not m: + continue + name = m.group(1) + labels = _parse_labels(m.group(2) or "") + try: + value = float(m.group(3)) + except ValueError: + continue + # Prometheus histogram/summary data lines have suffixes; look up base name + kind = kinds.get(name, "") + if not kind: + for suffix in ("_bucket", "_sum", "_count", "_total", "_created"): + if name.endswith(suffix): + kind = kinds.get(name[: -len(suffix)], "") + if kind: + break + results.append((name, kind or "gauge", value, labels)) + return results + + +class PrometheusScraper: + def __init__(self, cfg: PrometheusTargetConfig, registry: Registry, sample_period_secs: float): + self._cfg = cfg + self._registry = registry + self._period = sample_period_secs + + async def run(self, signals: Signals) -> None: + await signals.experiment_started.wait() + async with aiohttp.ClientSession() as session: + while not signals.shutdown.is_set(): + try: + async with session.get(self._cfg.uri, timeout=aiohttp.ClientTimeout(total=5)) as resp: + text = await resp.text() + metrics = _parse_text(text) + allowed = set(self._cfg.metrics) if self._cfg.metrics else None + for name, kind, value, labels in metrics: + if allowed and name not in allowed: + continue + merged = {**labels, **self._cfg.tags} + if kind == "counter": + self._registry.increment(name, int(value), merged) + elif kind == "histogram" or kind == "summary": + self._registry.record_histogram(name, value, merged) + 
else: + self._registry.set_gauge(name, value, merged) + except Exception: + pass + await asyncio.sleep(self._period) diff --git a/lading_py/lading_py/telemetry/__init__.py b/lading_py/lading_py/telemetry/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lading_py/lading_py/telemetry/__pycache__/__init__.cpython-310.pyc b/lading_py/lading_py/telemetry/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 000000000..a1e31d7a0 Binary files /dev/null and b/lading_py/lading_py/telemetry/__pycache__/__init__.cpython-310.pyc differ diff --git a/lading_py/lading_py/telemetry/__pycache__/prometheus_exporter.cpython-310.pyc b/lading_py/lading_py/telemetry/__pycache__/prometheus_exporter.cpython-310.pyc new file mode 100644 index 000000000..3944b71e6 Binary files /dev/null and b/lading_py/lading_py/telemetry/__pycache__/prometheus_exporter.cpython-310.pyc differ diff --git a/lading_py/lading_py/telemetry/__pycache__/registry.cpython-310.pyc b/lading_py/lading_py/telemetry/__pycache__/registry.cpython-310.pyc new file mode 100644 index 000000000..2cc7791d8 Binary files /dev/null and b/lading_py/lading_py/telemetry/__pycache__/registry.cpython-310.pyc differ diff --git a/lading_py/lading_py/telemetry/prometheus_exporter.py b/lading_py/lading_py/telemetry/prometheus_exporter.py new file mode 100644 index 000000000..77f21d6c1 --- /dev/null +++ b/lading_py/lading_py/telemetry/prometheus_exporter.py @@ -0,0 +1,48 @@ +""" +Passive Prometheus exporter. Syncs from Registry into prometheus_client +collectors and serves GET /metrics via aiohttp. 
+""" +import asyncio +from aiohttp import web +from prometheus_client import CollectorRegistry, Gauge, Counter, generate_latest, CONTENT_TYPE_LATEST +from lading_py.signal import Signals +from lading_py.telemetry.registry import Registry as LadingRegistry + + +class PrometheusExporter: + def __init__(self, lading_registry: LadingRegistry, addr: str): + host, port = addr.rsplit(":", 1) + self._host = host + self._port = int(port) + self._lading_registry = lading_registry + self._prom_registry = CollectorRegistry() + self._counters: dict[str, Counter] = {} + self._gauges: dict[str, Gauge] = {} + + def _sync(self) -> None: + counters, gauges, _ = self._lading_registry.snapshot() + for (name, label_pairs), value in gauges.items(): + safe = name.replace(".", "_").replace("/", "_") + label_keys = [k for k, _ in label_pairs] + label_vals = [v for _, v in label_pairs] + if safe not in self._gauges: + self._gauges[safe] = Gauge(safe, safe, label_keys, registry=self._prom_registry) + try: + self._gauges[safe].labels(*label_vals).set(value) + except Exception: + pass + + async def _metrics_handler(self, request: web.Request) -> web.Response: + self._sync() + output = generate_latest(self._prom_registry) + return web.Response(body=output, content_type=CONTENT_TYPE_LATEST) + + async def run(self, signals: Signals) -> None: + app = web.Application() + app.router.add_get("/metrics", self._metrics_handler) + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, self._host, self._port) + await site.start() + await signals.shutdown.wait() + await runner.cleanup() diff --git a/lading_py/lading_py/telemetry/registry.py b/lading_py/lading_py/telemetry/registry.py new file mode 100644 index 000000000..86256f6e6 --- /dev/null +++ b/lading_py/lading_py/telemetry/registry.py @@ -0,0 +1,39 @@ +"""Thread-safe metric registry. 
Counters, gauges, histograms.""" +import threading +from collections import defaultdict + + +def _key(name: str, labels: dict) -> tuple: + return (name, tuple(sorted(labels.items()))) + + +class Registry: + def __init__(self): + self._lock = threading.Lock() + self._counters: dict[tuple, int] = defaultdict(int) + self._gauges: dict[tuple, float] = {} + self._histograms: dict[tuple, list[float]] = defaultdict(list) + + def increment(self, name: str, value: int = 1, labels: dict | None = None): + k = _key(name, labels or {}) + with self._lock: + self._counters[k] += value + + def set_gauge(self, name: str, value: float, labels: dict | None = None): + k = _key(name, labels or {}) + with self._lock: + self._gauges[k] = value + + def record_histogram(self, name: str, value: float, labels: dict | None = None): + k = _key(name, labels or {}) + with self._lock: + self._histograms[k].append(value) + + def snapshot(self) -> tuple[dict, dict, dict]: + """Returns (counters, gauges, histograms). Drains histogram samples.""" + with self._lock: + counters = dict(self._counters) + gauges = dict(self._gauges) + histograms = {k: list(v) for k, v in self._histograms.items()} + self._histograms.clear() + return counters, gauges, histograms diff --git a/lading_py/pyproject.toml b/lading_py/pyproject.toml new file mode 100644 index 000000000..523cbbc0f --- /dev/null +++ b/lading_py/pyproject.toml @@ -0,0 +1,24 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "lading-py" +version = "0.1.0" +requires-python = ">=3.10" +dependencies = [ + "datadog>=0.49", + "pydantic>=2", + "PyYAML>=6", + "aiohttp>=3.9", + "backports.zstd>=1.5", + "prometheus-client>=0.20", + "pyarrow>=15", + "structlog>=24", +] + +[project.scripts] +lading-py = "lading_py.main:main" + +[project.optional-dependencies] +dev = ["pytest>=8", "pytest-asyncio>=0.23"] diff --git a/lading_py/tests/__pycache__/smoke_test.cpython-310-pytest-9.0.2.pyc 
b/lading_py/tests/__pycache__/smoke_test.cpython-310-pytest-9.0.2.pyc new file mode 100644 index 000000000..4ab99c31d Binary files /dev/null and b/lading_py/tests/__pycache__/smoke_test.cpython-310-pytest-9.0.2.pyc differ diff --git a/lading_py/tests/__pycache__/test_capture.cpython-310-pytest-9.0.2.pyc b/lading_py/tests/__pycache__/test_capture.cpython-310-pytest-9.0.2.pyc new file mode 100644 index 000000000..579979890 Binary files /dev/null and b/lading_py/tests/__pycache__/test_capture.cpython-310-pytest-9.0.2.pyc differ diff --git a/lading_py/tests/__pycache__/test_config.cpython-310-pytest-9.0.2.pyc b/lading_py/tests/__pycache__/test_config.cpython-310-pytest-9.0.2.pyc new file mode 100644 index 000000000..54eebdb49 Binary files /dev/null and b/lading_py/tests/__pycache__/test_config.cpython-310-pytest-9.0.2.pyc differ diff --git a/lading_py/tests/__pycache__/test_generator.cpython-310-pytest-9.0.2.pyc b/lading_py/tests/__pycache__/test_generator.cpython-310-pytest-9.0.2.pyc new file mode 100644 index 000000000..6848d0069 Binary files /dev/null and b/lading_py/tests/__pycache__/test_generator.cpython-310-pytest-9.0.2.pyc differ diff --git a/lading_py/tests/__pycache__/test_payload.cpython-310-pytest-9.0.2.pyc b/lading_py/tests/__pycache__/test_payload.cpython-310-pytest-9.0.2.pyc new file mode 100644 index 000000000..57164cb49 Binary files /dev/null and b/lading_py/tests/__pycache__/test_payload.cpython-310-pytest-9.0.2.pyc differ diff --git a/lading_py/tests/__pycache__/test_registry.cpython-310-pytest-9.0.2.pyc b/lading_py/tests/__pycache__/test_registry.cpython-310-pytest-9.0.2.pyc new file mode 100644 index 000000000..fe0f3851c Binary files /dev/null and b/lading_py/tests/__pycache__/test_registry.cpython-310-pytest-9.0.2.pyc differ diff --git a/lading_py/tests/__pycache__/test_target_metrics.cpython-310-pytest-9.0.2.pyc b/lading_py/tests/__pycache__/test_target_metrics.cpython-310-pytest-9.0.2.pyc new file mode 100644 index 000000000..c60381263 Binary 
files /dev/null and b/lading_py/tests/__pycache__/test_target_metrics.cpython-310-pytest-9.0.2.pyc differ diff --git a/lading_py/tests/smoke_test.py b/lading_py/tests/smoke_test.py new file mode 100644 index 000000000..ea7ff7ccd --- /dev/null +++ b/lading_py/tests/smoke_test.py @@ -0,0 +1,96 @@ +""" +Smoke test: spin up a Unix datagram socket server, run lading-py for 3 seconds, +assert bytes were received and the output file was written. +""" +import asyncio +import os +import socket +import tempfile +import threading +import time +import pytest +import yaml + + +SOCKET_PATH = "/tmp/lading_smoke_test.socket" + + +def _socket_server(sock_path: str, received: list, stop_event: threading.Event): + if os.path.exists(sock_path): + os.unlink(sock_path) + s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + s.bind(sock_path) + s.settimeout(0.1) + while not stop_event.is_set(): + try: + data, _ = s.recvfrom(65536) + received.append(len(data)) + except socket.timeout: + pass + s.close() + + +@pytest.fixture +def smoke_config(tmp_path): + out_path = str(tmp_path / "output") + cfg = { + "generator": [{ + "unix_datagram": { + "seed": list(range(32)), + "path": SOCKET_PATH, + "bytes_per_second": "1 MiB", + "parallel_connections": 1, + "variant": { + "dogstatsd": { + "contexts": {"inclusive": {"min": 10, "max": 10}}, + "tags_per_msg": {"inclusive": {"min": 2, "max": 2}}, + "multivalue_count": {"inclusive": {"min": 2, "max": 4}}, + "multivalue_pack_probability": 0.1, + "kind_weights": {"metric": 90, "event": 5, "service_check": 5}, + "metric_weights": {"count": 1, "gauge": 1, "distribution": 3, "set": 0, "timer": 0, "histogram": 0}, + "metric_names": ["test.metric{{0-4}}"], + "tag_names": ["env", "host"], + "tag_values": ["prod{{0-2}}"], + } + }, + } + }], + "telemetry": {"path": out_path}, + "target_metrics": [], + "warmup_duration_secs": 0, + "experiment_duration_secs": 3, + } + cfg_path = str(tmp_path / "config.yaml") + with open(cfg_path, "w") as f: + yaml.dump(cfg, f) + 
return cfg_path, out_path + + +def test_smoke(smoke_config): + cfg_path, out_path = smoke_config + + received: list[int] = [] + stop = threading.Event() + srv = threading.Thread(target=_socket_server, args=(SOCKET_PATH, received, stop), daemon=True) + srv.start() + time.sleep(0.1) + + from lading_py.config import RootConfig + import lading_py.main as lm + + with open(cfg_path) as f: + raw = yaml.safe_load(f) + config = RootConfig.model_validate(raw) + asyncio.run(lm.inner_main(config)) + + stop.set() + srv.join(timeout=2) + + assert sum(received) > 0, "no bytes received at socket" + assert os.path.exists(out_path), "output file not created" + with open(out_path) as f: + lines = [l for l in f if l.strip()] + assert len(lines) > 0, "output file is empty" + + if os.path.exists(SOCKET_PATH): + os.unlink(SOCKET_PATH) diff --git a/lading_py/tests/test_capture.py b/lading_py/tests/test_capture.py new file mode 100644 index 000000000..1dc3a4909 --- /dev/null +++ b/lading_py/tests/test_capture.py @@ -0,0 +1,321 @@ +"""Tests for capture output: accumulator, JSONL writer, Parquet writer.""" +import json +import os +import tempfile +import time +import pytest +import pyarrow.parquet as pq + +from lading_py.capture.line import Line, MetricKind +from lading_py.capture.jsonl_writer import JsonlWriter +from lading_py.capture.parquet_writer import ParquetWriter +from lading_py.capture.accumulator import Accumulator, _parse_key +from lading_py.telemetry.registry import Registry, _key + + +# --------------------------------------------------------------------------- +# Line serialization +# --------------------------------------------------------------------------- + +class TestLine: + def _make_line(self, **kwargs) -> Line: + defaults = dict( + run_id="test-run-id", + time=1_700_000_000_000, + fetch_index=0, + metric_name="bytes_written", + metric_kind=MetricKind.Counter, + value=1024.0, + labels={"generator": "dogstatsd"}, + ) + defaults.update(kwargs) + return Line(**defaults) + + 
def test_to_dict_fields(self): + line = self._make_line() + d = line.to_dict() + assert d["run_id"] == "test-run-id" + assert d["time"] == 1_700_000_000_000 + assert d["fetch_index"] == 0 + assert d["metric_name"] == "bytes_written" + assert d["metric_kind"] == MetricKind.Counter + assert d["value"] == 1024.0 + assert d["labels"] == {"generator": "dogstatsd"} + + def test_to_dict_no_histogram_key_when_empty(self): + line = self._make_line() + d = line.to_dict() + assert "value_histogram" not in d + + def test_to_dict_base64_histogram(self): + import base64 + line = self._make_line(value_histogram=b"\x01\x02\x03") + d = line.to_dict() + assert d["value_histogram"] == base64.b64encode(b"\x01\x02\x03").decode() + + +# --------------------------------------------------------------------------- +# JSONL writer +# --------------------------------------------------------------------------- + +class TestJsonlWriter: + def _make_lines(self, n: int) -> list[Line]: + return [ + Line( + run_id="r", + time=1_000_000 + i, + fetch_index=i, + metric_name=f"metric.{i}", + metric_kind=MetricKind.Gauge, + value=float(i), + labels={"idx": str(i)}, + ) + for i in range(n) + ] + + def test_writes_and_reads_back(self, tmp_path): + path = str(tmp_path / "out.jsonl") + writer = JsonlWriter(path) + lines = self._make_lines(5) + writer.flush(lines) + with open(path) as f: + rows = [json.loads(l) for l in f if l.strip()] + assert len(rows) == 5 + assert rows[0]["metric_name"] == "metric.0" + assert rows[4]["value"] == 4.0 + + def test_multiple_flushes_append(self, tmp_path): + path = str(tmp_path / "out.jsonl") + writer = JsonlWriter(path) + writer.flush(self._make_lines(3)) + writer.flush(self._make_lines(2)) + with open(path) as f: + rows = [l for l in f if l.strip()] + assert len(rows) == 5 + + def test_empty_flush_noop(self, tmp_path): + path = str(tmp_path / "out.jsonl") + writer = JsonlWriter(path) + writer.flush([]) + assert os.path.getsize(path) == 0 + + def 
test_valid_json_per_line(self, tmp_path): + path = str(tmp_path / "out.jsonl") + writer = JsonlWriter(path) + writer.flush(self._make_lines(10)) + with open(path) as f: + for line in f: + if line.strip(): + json.loads(line) # should not raise + + def test_overwrites_on_new_writer(self, tmp_path): + path = str(tmp_path / "out.jsonl") + JsonlWriter(path).flush(self._make_lines(5)) + JsonlWriter(path).flush(self._make_lines(2)) + with open(path) as f: + rows = [l for l in f if l.strip()] + assert len(rows) == 2 + + def test_label_roundtrip(self, tmp_path): + path = str(tmp_path / "out.jsonl") + writer = JsonlWriter(path) + line = Line("r", 0, 0, "m", MetricKind.Gauge, 1.0, {"a": "x", "b": "y"}) + writer.flush([line]) + with open(path) as f: + row = json.loads(f.read()) + assert row["labels"] == {"a": "x", "b": "y"} + + +# --------------------------------------------------------------------------- +# Parquet writer +# --------------------------------------------------------------------------- + +class TestParquetWriter: + def _make_lines(self, n: int) -> list[Line]: + return [ + Line( + run_id="run1", + time=1_000 + i, + fetch_index=i, + metric_name="test.metric", + metric_kind=MetricKind.Counter, + value=float(i * 10), + labels={"env": "test"}, + ) + for i in range(n) + ] + + def test_writes_parquet_file(self, tmp_path): + path = str(tmp_path / "out.parquet") + writer = ParquetWriter(path) + writer.flush(self._make_lines(5)) + writer.finalize() + assert os.path.exists(path) + table = pq.read_table(path) + assert table.num_rows == 5 + + def test_schema_columns_present(self, tmp_path): + path = str(tmp_path / "out.parquet") + writer = ParquetWriter(path) + writer.flush(self._make_lines(1)) + writer.finalize() + table = pq.read_table(path) + for col in ("run_id", "time", "fetch_index", "metric_name", "metric_kind", "value"): + assert col in table.schema.names + + def test_values_correct(self, tmp_path): + path = str(tmp_path / "out.parquet") + writer = 
ParquetWriter(path) + writer.flush(self._make_lines(3)) + writer.finalize() + table = pq.read_table(path) + assert table["value"].to_pylist() == [0.0, 10.0, 20.0] + assert all(r == "run1" for r in table["run_id"].to_pylist()) + + def test_multiple_flushes_appended(self, tmp_path): + path = str(tmp_path / "out.parquet") + writer = ParquetWriter(path) + writer.flush(self._make_lines(3)) + writer.flush(self._make_lines(2)) + writer.finalize() + table = pq.read_table(path) + assert table.num_rows == 5 + + def test_empty_flush_noop(self, tmp_path): + path = str(tmp_path / "out.parquet") + writer = ParquetWriter(path) + writer.flush([]) + writer.finalize() + assert not os.path.exists(path) + + +# --------------------------------------------------------------------------- +# Accumulator +# --------------------------------------------------------------------------- + +class TestParseKey: + def test_simple(self): + name, labels = _parse_key(("metric", (("a", "1"),))) + assert name == "metric" + assert labels == {"a": "1"} + + def test_empty_labels(self): + name, labels = _parse_key(("m", ())) + assert name == "m" + assert labels == {} + + +class TestAccumulatorFlush: + def _acc(self, registry: Registry, writers: list) -> Accumulator: + return Accumulator("run-1", registry, writers, flush_seconds=3600) + + def test_counter_delta_first_flush(self): + registry = Registry() + registry.increment("bytes", 100) + lines = [] + acc = self._acc(registry, [_ListWriter(lines)]) + acc._flush() + counter_lines = [l for l in lines if l.metric_kind == MetricKind.Counter] + assert any(l.metric_name == "bytes" and l.value == 100.0 for l in counter_lines) + + def test_counter_delta_second_flush(self): + registry = Registry() + registry.increment("bytes", 100) + lines = [] + acc = self._acc(registry, [_ListWriter(lines)]) + acc._flush() + lines.clear() + registry.increment("bytes", 50) + acc._flush() + counter_lines = [l for l in lines if l.metric_name == "bytes"] + assert any(l.value == 50.0 
for l in counter_lines) + + def test_counter_delta_zero_if_no_new_increments(self): + registry = Registry() + registry.increment("x", 10) + lines = [] + acc = self._acc(registry, [_ListWriter(lines)]) + acc._flush() + lines.clear() + acc._flush() + counter_lines = [l for l in lines if l.metric_name == "x"] + assert any(l.value == 0.0 for l in counter_lines) + + def test_gauge_passthrough(self): + registry = Registry() + registry.set_gauge("cpu", 55.5) + lines = [] + acc = self._acc(registry, [_ListWriter(lines)]) + acc._flush() + gauge_lines = [l for l in lines if l.metric_kind == MetricKind.Gauge] + assert any(l.metric_name == "cpu" and l.value == 55.5 for l in gauge_lines) + + def test_histogram_mean(self): + registry = Registry() + for v in [10.0, 20.0, 30.0]: + registry.record_histogram("latency", v) + lines = [] + acc = self._acc(registry, [_ListWriter(lines)]) + acc._flush() + hist_lines = [l for l in lines if l.metric_kind == MetricKind.Histogram] + assert any(l.metric_name == "latency" and l.value == 20.0 for l in hist_lines) + + def test_histogram_drained_after_flush(self): + registry = Registry() + registry.record_histogram("h", 5.0) + lines = [] + acc = self._acc(registry, [_ListWriter(lines)]) + acc._flush() + lines.clear() + acc._flush() + hist_lines = [l for l in lines if l.metric_kind == MetricKind.Histogram] + assert hist_lines == [] + + def test_fetch_index_increments(self): + registry = Registry() + registry.set_gauge("g", 1.0) + lines = [] + acc = self._acc(registry, [_ListWriter(lines)]) + acc._flush() + idx0 = lines[0].fetch_index + lines.clear() + registry.set_gauge("g", 2.0) + acc._flush() + idx1 = lines[0].fetch_index + assert idx1 == idx0 + 1 + + def test_labels_preserved(self): + registry = Registry() + registry.set_gauge("g", 1.0, {"env": "prod"}) + lines = [] + acc = self._acc(registry, [_ListWriter(lines)]) + acc._flush() + assert any(l.labels == {"env": "prod"} for l in lines) + + def test_run_id_in_lines(self): + registry = Registry() 
+ registry.set_gauge("g", 1.0) + lines = [] + acc = Accumulator("my-run-id", registry, [_ListWriter(lines)]) + acc._flush() + assert all(l.run_id == "my-run-id" for l in lines) + + def test_multiple_writers_both_receive_lines(self): + registry = Registry() + registry.set_gauge("g", 1.0) + lines1, lines2 = [], [] + acc = self._acc(registry, [_ListWriter(lines1), _ListWriter(lines2)]) + acc._flush() + assert len(lines1) == len(lines2) == 1 + + +class _ListWriter: + """Test double that collects flushed Lines in a list.""" + def __init__(self, lines: list): + self._lines = lines + + def flush(self, lines: list[Line]) -> None: + self._lines.extend(lines) + + def finalize(self) -> None: + pass diff --git a/lading_py/tests/test_config.py b/lading_py/tests/test_config.py new file mode 100644 index 000000000..5232c46a1 --- /dev/null +++ b/lading_py/tests/test_config.py @@ -0,0 +1,140 @@ +"""Tests for config parsing.""" +import pytest +import yaml +from pydantic import ValidationError +from lading_py.config import ( + RootConfig, DogStatsDConfig, ConfRange, InclusiveRange, + parse_bytes, TelemetryConfig, +) + + +class TestParseBytes: + def test_mib(self): + assert parse_bytes("1 MiB") == 1024 ** 2 + + def test_gib(self): + assert parse_bytes("2 GiB") == 2 * 1024 ** 3 + + def test_mb(self): + assert parse_bytes("100 MB") == 100 * 1000 ** 2 + + def test_b(self): + assert parse_bytes("8192 B") == 8192 + + def test_fractional(self): + assert parse_bytes("0.5 GiB") == int(0.5 * 1024 ** 3) + + def test_plain_int(self): + assert parse_bytes(12345) == 12345 + + def test_case_insensitive(self): + assert parse_bytes("4 mib") == 4 * 1024 ** 2 + + def test_500_mib(self): + assert parse_bytes("500 MiB") == 500 * 1024 ** 2 + + +class TestConfRange: + def test_inclusive_lo_hi(self): + r = ConfRange(inclusive=InclusiveRange(min=3, max=7)) + assert r.lo == 3 + assert r.hi == 7 + + def test_sample_int_in_range(self): + import random + rng = random.Random(42) + r = 
ConfRange(inclusive=InclusiveRange(min=2, max=5)) + for _ in range(100): + v = r.sample_int(rng) + assert 2 <= v <= 5 + + def test_sample_float_in_range(self): + import random + rng = random.Random(42) + r = ConfRange(inclusive=InclusiveRange(min=0.1, max=1.0)) + for _ in range(100): + v = r.sample(rng) + assert 0.1 <= v <= 1.0 + + +class TestDogStatsDConfig: + def test_defaults(self): + cfg = DogStatsDConfig() + assert cfg.multivalue_pack_probability == 0.08 + assert cfg.sampling_probability == 0.5 + assert cfg.length_prefix_framed is False + + def test_length_prefix_framed_rejected(self): + with pytest.raises(ValidationError, match="length_prefix_framed"): + DogStatsDConfig(length_prefix_framed=True) + + def test_metric_names_default(self): + cfg = DogStatsDConfig() + assert cfg.metric_names == ["metric{{0-9}}"] + + +class TestTelemetryConfig: + def test_short_form_path(self): + tel = TelemetryConfig(path="nong") + assert tel.output_path == "nong" + assert tel.format == "jsonl" + assert tel.flush_seconds == 60 + + def test_long_form_log(self): + tel = TelemetryConfig(log={"path": "out", "format": {"jsonl": {"flush_seconds": 30}}}) + assert tel.output_path == "out" + assert tel.format == "jsonl" + assert tel.flush_seconds == 30 + + def test_parquet_format(self): + tel = TelemetryConfig(log={"path": "out", "format": {"parquet": {"flush_seconds": 120}}}) + assert tel.format == "parquet" + + def test_prometheus_addr(self): + tel = TelemetryConfig(prometheus={"addr": "0.0.0.0:9000"}) + assert tel.prometheus_addr == "0.0.0.0:9000" + assert tel.output_path is None + + +class TestRootConfigFromYaml: + def test_parse_lading_yaml(self): + with open("/home/stephenwakely/src/lading/lading.yaml") as f: + raw = yaml.safe_load(f) + config = RootConfig.model_validate(raw) + + assert len(config.generator) == 1 + gen = config.generator[0] + assert gen.unix_datagram is not None + assert gen.unix_datagram.path == "/tmp/dsd.socket" + assert gen.unix_datagram.bytes_per_second_int == 
1024 ** 2 + + dsd = gen.unix_datagram.dogstatsd + assert dsd.contexts.lo == 50 + assert dsd.contexts.hi == 50 + assert dsd.metric_weights.distribution == 5 + assert dsd.metric_names == ["name{{0-2}}"] + assert dsd.tag_names == ["tag1", "tag2", "tag3"] + + assert len(config.target_metrics) == 3 + assert config.target_metrics[0].prometheus is not None + assert config.target_metrics[2].expvar is not None + + def test_minimal_config(self): + raw = { + "generator": [{ + "unix_datagram": { + "seed": list(range(32)), + "path": "/tmp/test.socket", + "variant": {"dogstatsd": {}}, + } + }], + } + config = RootConfig.model_validate(raw) + assert config.experiment_duration_secs == 60 + assert config.sample_period_milliseconds == 1000 + + def test_empty_config(self): + config = RootConfig.model_validate({}) + assert config.generator == [] + assert config.blackhole == [] + assert config.target_metrics == [] diff --git a/lading_py/tests/test_generator.py b/lading_py/tests/test_generator.py new file mode 100644 index 000000000..d492a698b --- /dev/null +++ b/lading_py/tests/test_generator.py @@ -0,0 +1,176 @@ +"""Tests for the DogStatsD generator: dispatch to dogstatsd-py and rate limiter.""" +import time +from unittest.mock import MagicMock, call, patch +from lading_py.generator.dogstatsd import TokenBucket, _send_block, _send_metric +from lading_py.payload.dogstatsd import MetricCall, EventCall, ServiceCheckCall + + +# --------------------------------------------------------------------------- +# TokenBucket +# --------------------------------------------------------------------------- + +class TestTokenBucket: + def test_acquires_immediately_when_tokens_available(self): + tb = TokenBucket(rate=1_000_000) + t0 = time.monotonic() + tb.acquire(100) + elapsed = time.monotonic() - t0 + assert elapsed < 0.05 # should be near-instant + + def test_throttles_when_rate_exceeded(self): + rate = 500 # 500 bytes/sec + tb = TokenBucket(rate=rate) + tb.acquire(rate) # drain the bucket + t0 = 
time.monotonic() + tb.acquire(250) # should wait ~0.5s + elapsed = time.monotonic() - t0 + assert elapsed >= 0.4, f"expected >= 0.4s, got {elapsed:.3f}s" + assert elapsed < 2.0, "took too long" + + def test_multiple_acquires_accumulate(self): + rate = 1000 + tb = TokenBucket(rate=rate) + tb.acquire(rate) # drain + t0 = time.monotonic() + tb.acquire(500) + tb.acquire(500) + elapsed = time.monotonic() - t0 + assert elapsed >= 0.9 + + +# --------------------------------------------------------------------------- +# _send_block dispatch +# --------------------------------------------------------------------------- + +def _make_client() -> MagicMock: + client = MagicMock() + buf = MagicMock() + client.open_buffer.return_value.__enter__ = MagicMock(return_value=buf) + client.open_buffer.return_value.__exit__ = MagicMock(return_value=False) + return client + + +class TestSendMetric: + def test_gauge(self): + client = _make_client() + m = MetricCall("my.gauge", 42.0, "gauge", ["env:prod"], 1.0) + _send_metric(client, m) + client.gauge.assert_called_once_with("my.gauge", 42.0, tags=["env:prod"], sample_rate=1.0) + + def test_count(self): + client = _make_client() + m = MetricCall("hits", 7.0, "count", [], None) + _send_metric(client, m) + client.increment.assert_called_once_with("hits", value=7, tags=[], sample_rate=1) + + def test_histogram(self): + client = _make_client() + m = MetricCall("h", 3.14, "histogram", [], 0.5) + _send_metric(client, m) + client.histogram.assert_called_once_with("h", 3.14, tags=[], sample_rate=0.5) + + def test_distribution(self): + client = _make_client() + m = MetricCall("d", 1.0, "distribution", ["a:b"], 1.0) + _send_metric(client, m) + client.distribution.assert_called_once_with("d", 1.0, tags=["a:b"], sample_rate=1.0) + + def test_timing(self): + client = _make_client() + m = MetricCall("t", 99.9, "timing", [], None) + _send_metric(client, m) + client.timing.assert_called_once_with("t", 99.9, tags=[], sample_rate=1) + + def test_set(self): 
+ client = _make_client() + m = MetricCall("s", 42.0, "set", [], None) + _send_metric(client, m) + client.set.assert_called_once_with("s", 42, tags=[], sample_rate=1) + + def test_sample_rate_none_defaults_to_one(self): + client = _make_client() + m = MetricCall("g", 1.0, "gauge", [], None) + _send_metric(client, m) + _, kwargs = client.gauge.call_args + assert kwargs["sample_rate"] == 1 + + def test_sample_rate_passed_through(self): + client = _make_client() + m = MetricCall("g", 1.0, "gauge", [], 0.25) + _send_metric(client, m) + _, kwargs = client.gauge.call_args + assert kwargs["sample_rate"] == 0.25 + + +class TestSendBlock: + def test_single_metric_call(self): + client = _make_client() + m = MetricCall("g", 1.0, "gauge", [], None) + _send_block(client, m) + client.gauge.assert_called_once() + client.open_buffer.assert_not_called() + + def test_batch_uses_open_buffer(self): + client = _make_client() + batch = [ + MetricCall("a", 1.0, "gauge", [], None), + MetricCall("b", 2.0, "gauge", [], None), + ] + _send_block(client, batch) + client.open_buffer.assert_called_once() + buf = client.open_buffer.return_value.__enter__.return_value + assert buf.gauge.call_count == 2 + + def test_batch_all_metrics_sent(self): + client = _make_client() + batch = [ + MetricCall("a", 1.0, "gauge", [], None), + MetricCall("b", 2.0, "count", [], None), + MetricCall("c", 3.0, "distribution", [], None), + ] + _send_block(client, batch) + buf = client.open_buffer.return_value.__enter__.return_value + assert buf.gauge.call_count == 1 + assert buf.increment.call_count == 1 + assert buf.distribution.call_count == 1 + + def test_event_call(self): + client = _make_client() + e = EventCall("My Title", "Some text", ["env:prod"], "error", "normal") + _send_block(client, e) + client.event.assert_called_once_with( + "My Title", "Some text", + tags=["env:prod"], + alert_type="error", + priority="normal", + ) + + def test_event_no_alert_type(self): + client = _make_client() + e = EventCall("T", 
"B", [], None, None) + _send_block(client, e) + client.event.assert_called_once() + + def test_service_check_ok(self): + client = _make_client() + sc = ServiceCheckCall("check.name", 0, ["host:foo"], None) + _send_block(client, sc) + client.service_check.assert_called_once_with( + "check.name", 0, + tags=["host:foo"], + message=None, + ) + + def test_service_check_with_message(self): + client = _make_client() + sc = ServiceCheckCall("check", 2, [], "something is broken") + _send_block(client, sc) + _, kwargs = client.service_check.call_args + assert kwargs["message"] == "something is broken" + + def test_tags_passed_to_metric(self): + client = _make_client() + m = MetricCall("g", 1.0, "gauge", ["a:1", "b:2"], None) + _send_block(client, m) + _, kwargs = client.gauge.call_args + assert kwargs["tags"] == ["a:1", "b:2"] diff --git a/lading_py/tests/test_payload.py b/lading_py/tests/test_payload.py new file mode 100644 index 000000000..7fddede26 --- /dev/null +++ b/lading_py/tests/test_payload.py @@ -0,0 +1,270 @@ +"""Tests for DogStatsD payload generation.""" +import random +import pytest +from lading_py.config import ( + DogStatsDConfig, ConfRange, InclusiveRange, KindWeights, MetricWeights, +) +from lading_py.payload.dogstatsd import ( + expand_template, expand_list, + build_context_pool, generate_block, + BlockCache, MetricCall, EventCall, ServiceCheckCall, + _estimate_block_bytes, +) + + +# --------------------------------------------------------------------------- +# Template expansion +# --------------------------------------------------------------------------- + +class TestExpandTemplate: + def test_no_template(self): + assert expand_template("metric.name") == ["metric.name"] + + def test_simple_range(self): + assert expand_template("name{{0-2}}") == ["name0", "name1", "name2"] + + def test_suffix(self): + assert expand_template("m{{1-3}}.count") == ["m1.count", "m2.count", "m3.count"] + + def test_single_value(self): + assert expand_template("x{{5-5}}") == 
["x5"] + + def test_expand_list_multiple(self): + result = expand_list(["a{{0-1}}", "b{{0-1}}"]) + assert result == ["a0", "a1", "b0", "b1"] + + def test_expand_list_no_templates(self): + assert expand_list(["tag1", "tag2"]) == ["tag1", "tag2"] + + def test_ten_values(self): + result = expand_template("value{{0-9}}") + assert len(result) == 10 + assert result[0] == "value0" + assert result[9] == "value9" + + +# --------------------------------------------------------------------------- +# Context pool +# --------------------------------------------------------------------------- + +def _make_cfg(**kwargs) -> DogStatsDConfig: + base = dict( + contexts=ConfRange(inclusive=InclusiveRange(min=10, max=10)), + tags_per_msg=ConfRange(inclusive=InclusiveRange(min=2, max=2)), + metric_names=["metric{{0-2}}"], + tag_names=["env", "host"], + tag_values=["prod{{0-1}}"], + ) + base.update(kwargs) + return DogStatsDConfig(**base) + + +class TestContextPool: + def test_correct_count(self): + cfg = _make_cfg() + rng = random.Random(1) + pool = build_context_pool(cfg, rng) + assert len(pool) == 10 + + def test_names_from_expanded_templates(self): + cfg = _make_cfg() + rng = random.Random(1) + pool = build_context_pool(cfg, rng) + valid_names = {"metric0", "metric1", "metric2"} + for ctx in pool: + assert ctx.name in valid_names + + def test_tags_are_key_value_pairs(self): + cfg = _make_cfg() + rng = random.Random(1) + pool = build_context_pool(cfg, rng) + for ctx in pool: + assert len(ctx.base_tags) == 2 + for tag in ctx.base_tags: + assert ":" in tag + + def test_tag_names_from_config(self): + cfg = _make_cfg() + rng = random.Random(1) + pool = build_context_pool(cfg, rng) + valid_tag_names = {"env", "host"} + valid_tag_values = {"prod0", "prod1"} + for ctx in pool: + for tag in ctx.base_tags: + k, v = tag.split(":", 1) + assert k in valid_tag_names + assert v in valid_tag_values + + +# --------------------------------------------------------------------------- +# Block generation 
+# --------------------------------------------------------------------------- + +class TestGenerateBlock: + def _metric_only_cfg(self) -> DogStatsDConfig: + return _make_cfg( + kind_weights=KindWeights(metric=1, event=0, service_check=0), + metric_weights=MetricWeights(distribution=1, gauge=0, count=0, timer=0, set=0, histogram=0), + multivalue_pack_probability=0.0, + ) + + def test_single_metric_call(self): + cfg = self._metric_only_cfg() + rng = random.Random(42) + contexts = build_context_pool(cfg, rng) + block = generate_block(rng, cfg, contexts) + assert isinstance(block, MetricCall) + assert block.metric_type == "distribution" + + def test_all_metric_types_reachable(self): + cfg = _make_cfg( + kind_weights=KindWeights(metric=1, event=0, service_check=0), + metric_weights=MetricWeights(count=1, gauge=1, timer=1, distribution=1, set=1, histogram=1), + multivalue_pack_probability=0.0, + ) + rng = random.Random(0) + contexts = build_context_pool(cfg, rng) + seen_types = set() + for _ in range(500): + b = generate_block(rng, cfg, contexts) + if isinstance(b, MetricCall): + seen_types.add(b.metric_type) + # All six types should appear across 500 samples + assert seen_types >= {"count", "gauge", "timing", "distribution", "set", "histogram"} + + def test_event_call_generated(self): + cfg = _make_cfg( + kind_weights=KindWeights(metric=0, event=1, service_check=0), + ) + rng = random.Random(1) + contexts = build_context_pool(cfg, rng) + block = generate_block(rng, cfg, contexts) + assert isinstance(block, EventCall) + assert len(block.title) > 0 + assert len(block.text) > 0 + + def test_service_check_generated(self): + cfg = _make_cfg( + kind_weights=KindWeights(metric=0, event=0, service_check=1), + ) + rng = random.Random(1) + contexts = build_context_pool(cfg, rng) + block = generate_block(rng, cfg, contexts) + assert isinstance(block, ServiceCheckCall) + assert block.status in (0, 1, 2, 3) + + def test_multivalue_batch(self): + cfg = _make_cfg( + 
kind_weights=KindWeights(metric=1, event=0, service_check=0), + multivalue_pack_probability=1.0, + multivalue_count=ConfRange(inclusive=InclusiveRange(min=5, max=5)), + ) + rng = random.Random(7) + contexts = build_context_pool(cfg, rng) + block = generate_block(rng, cfg, contexts) + assert isinstance(block, list) + assert len(block) == 5 + assert all(isinstance(m, MetricCall) for m in block) + + def test_sample_rate_present_and_absent(self): + cfg = _make_cfg( + kind_weights=KindWeights(metric=1, event=0, service_check=0), + sampling_probability=0.5, + ) + rng = random.Random(0) + contexts = build_context_pool(cfg, rng) + with_rate = without_rate = 0 + for _ in range(200): + b = generate_block(rng, cfg, contexts) + if isinstance(b, MetricCall): + if b.sample_rate is not None: + with_rate += 1 + else: + without_rate += 1 + assert with_rate > 0 + assert without_rate > 0 + + def test_sample_rate_in_range(self): + cfg = _make_cfg( + kind_weights=KindWeights(metric=1, event=0, service_check=0), + sampling_probability=1.0, + sampling_range=ConfRange(inclusive=InclusiveRange(min=0.1, max=0.5)), + ) + rng = random.Random(0) + contexts = build_context_pool(cfg, rng) + for _ in range(50): + b = generate_block(rng, cfg, contexts) + if isinstance(b, MetricCall): + assert b.sample_rate is not None + assert 0.1 <= b.sample_rate <= 0.5 + + +# --------------------------------------------------------------------------- +# Block cache +# --------------------------------------------------------------------------- + +class TestBlockCache: + def test_deterministic_with_same_seed(self): + cfg = _make_cfg() + seed = list(range(32)) + cache1 = BlockCache(cfg, seed, max_count=20) + cache2 = BlockCache(cfg, seed, max_count=20) + for _ in range(20): + b1, b2 = cache1.next(), cache2.next() + assert type(b1) == type(b2) + if isinstance(b1, MetricCall) and isinstance(b2, MetricCall): + assert b1.name == b2.name + assert b1.metric_type == b2.metric_type + + def 
test_different_seeds_differ(self): + cfg = _make_cfg() + cache1 = BlockCache(cfg, list(range(32)), max_count=50) + cache2 = BlockCache(cfg, list(reversed(range(32))), max_count=50) + blocks1 = [cache1.next() for _ in range(50)] + blocks2 = [cache2.next() for _ in range(50)] + # Very unlikely all would match with different seeds + assert any( + (isinstance(b1, MetricCall) and isinstance(b2, MetricCall) and b1.name != b2.name) + for b1, b2 in zip(blocks1, blocks2) + ) + + def test_wraps_around(self): + cfg = _make_cfg() + cache = BlockCache(cfg, list(range(32)), max_count=3) + b0a = cache.next() + b1a = cache.next() + b2a = cache.next() + b0b = cache.next() # wraps + if isinstance(b0a, MetricCall) and isinstance(b0b, MetricCall): + assert b0a.name == b0b.name + assert b0a.metric_type == b0b.metric_type + + def test_count_respected(self): + cfg = _make_cfg() + cache = BlockCache(cfg, list(range(32)), max_count=17) + assert len(cache._blocks) == 17 + + +# --------------------------------------------------------------------------- +# Byte estimation +# --------------------------------------------------------------------------- + +class TestEstimateBlockBytes: + def test_single_metric(self): + m = MetricCall("my.metric", 1.0, "gauge", ["env:prod", "host:foo"], None) + est = _estimate_block_bytes(m) + assert est > 0 + + def test_batch_larger_than_single(self): + m = MetricCall("x", 1.0, "gauge", [], None) + single = _estimate_block_bytes(m) + batch = _estimate_block_bytes([m, m, m]) + assert batch == single * 3 + + def test_event(self): + e = EventCall("title", "text", [], None, None) + assert _estimate_block_bytes(e) > 0 + + def test_service_check(self): + sc = ServiceCheckCall("check.name", 0, []) + assert _estimate_block_bytes(sc) > 0 diff --git a/lading_py/tests/test_registry.py b/lading_py/tests/test_registry.py new file mode 100644 index 000000000..7730a585b --- /dev/null +++ b/lading_py/tests/test_registry.py @@ -0,0 +1,133 @@ +"""Tests for the thread-safe metric 
registry.""" +import threading +from lading_py.telemetry.registry import Registry, _key + + +class TestKey: + def test_same_labels_same_key(self): + k1 = _key("metric", {"a": "1", "b": "2"}) + k2 = _key("metric", {"b": "2", "a": "1"}) # different insertion order + assert k1 == k2 + + def test_different_names_differ(self): + assert _key("a", {}) != _key("b", {}) + + def test_different_labels_differ(self): + assert _key("m", {"a": "1"}) != _key("m", {"a": "2"}) + + +class TestRegistryCounters: + def test_increment_basic(self): + r = Registry() + r.increment("hits", 5) + counters, _, _ = r.snapshot() + assert counters[_key("hits", {})] == 5 + + def test_increment_accumulates(self): + r = Registry() + r.increment("hits", 3) + r.increment("hits", 7) + counters, _, _ = r.snapshot() + assert counters[_key("hits", {})] == 10 + + def test_increment_default_value(self): + r = Registry() + r.increment("x") + counters, _, _ = r.snapshot() + assert counters[_key("x", {})] == 1 + + def test_increment_with_labels(self): + r = Registry() + r.increment("req", 1, {"status": "200"}) + r.increment("req", 2, {"status": "500"}) + counters, _, _ = r.snapshot() + assert counters[_key("req", {"status": "200"})] == 1 + assert counters[_key("req", {"status": "500"})] == 2 + + def test_counters_persist_across_snapshots(self): + r = Registry() + r.increment("x", 10) + r.snapshot() + r.increment("x", 5) + counters, _, _ = r.snapshot() + assert counters[_key("x", {})] == 15 # cumulative total + + def test_thread_safety(self): + r = Registry() + N = 1000 + threads = [ + threading.Thread(target=lambda: r.increment("counter", 1)) + for _ in range(N) + ] + for t in threads: + t.start() + for t in threads: + t.join() + counters, _, _ = r.snapshot() + assert counters[_key("counter", {})] == N + + +class TestRegistryGauges: + def test_set_gauge(self): + r = Registry() + r.set_gauge("cpu", 42.5) + _, gauges, _ = r.snapshot() + assert gauges[_key("cpu", {})] == 42.5 + + def test_gauge_overwritten(self): 
+ r = Registry() + r.set_gauge("mem", 100.0) + r.set_gauge("mem", 200.0) + _, gauges, _ = r.snapshot() + assert gauges[_key("mem", {})] == 200.0 + + def test_gauge_persists_across_snapshots(self): + r = Registry() + r.set_gauge("g", 5.0) + r.snapshot() + _, gauges, _ = r.snapshot() + assert gauges[_key("g", {})] == 5.0 + + def test_gauge_with_labels(self): + r = Registry() + r.set_gauge("temp", 37.0, {"zone": "a"}) + r.set_gauge("temp", 22.0, {"zone": "b"}) + _, gauges, _ = r.snapshot() + assert gauges[_key("temp", {"zone": "a"})] == 37.0 + assert gauges[_key("temp", {"zone": "b"})] == 22.0 + + +class TestRegistryHistograms: + def test_record_histogram(self): + r = Registry() + r.record_histogram("latency", 12.5) + _, _, histograms = r.snapshot() + assert histograms[_key("latency", {})] == [12.5] + + def test_histogram_drained_on_snapshot(self): + r = Registry() + r.record_histogram("h", 1.0) + r.snapshot() + _, _, histograms = r.snapshot() + assert _key("h", {}) not in histograms or histograms[_key("h", {})] == [] + + def test_multiple_samples_collected(self): + r = Registry() + for v in [1.0, 2.0, 3.0]: + r.record_histogram("h", v) + _, _, histograms = r.snapshot() + assert sorted(histograms[_key("h", {})]) == [1.0, 2.0, 3.0] + + def test_histogram_thread_safety(self): + r = Registry() + N = 500 + threads = [ + threading.Thread(target=lambda: r.record_histogram("h", 1.0)) + for _ in range(N) + ] + for t in threads: + t.start() + for t in threads: + t.join() + _, _, histograms = r.snapshot() + assert len(histograms[_key("h", {})]) == N diff --git a/lading_py/tests/test_target_metrics.py b/lading_py/tests/test_target_metrics.py new file mode 100644 index 000000000..f288edddc --- /dev/null +++ b/lading_py/tests/test_target_metrics.py @@ -0,0 +1,175 @@ +"""Tests for Prometheus text parser and Expvar path resolver.""" +import pytest +from lading_py.target_metrics.prometheus import _parse_text, _parse_labels +from lading_py.target_metrics.expvar import _resolve_path + 
+ +# --------------------------------------------------------------------------- +# Prometheus text format parser +# --------------------------------------------------------------------------- + +PROM_SAMPLE = """ +# HELP http_requests_total Total HTTP requests +# TYPE http_requests_total counter +http_requests_total{method="GET",status="200"} 1234 +http_requests_total{method="POST",status="500"} 5 + +# HELP cpu_usage CPU utilisation +# TYPE cpu_usage gauge +cpu_usage 0.73 + +# HELP memory_bytes Memory usage in bytes +# TYPE memory_bytes gauge +memory_bytes{host="web01"} 536870912 + +# TYPE some_histogram histogram +some_histogram_bucket{le="0.1"} 10 +some_histogram_bucket{le="+Inf"} 100 +some_histogram_sum 55.5 +some_histogram_count 100 +""" + + +class TestParseLabels: + def test_empty(self): + assert _parse_labels("") == {} + + def test_single(self): + assert _parse_labels('{env="prod"}') == {"env": "prod"} + + def test_multiple(self): + result = _parse_labels('{a="1",b="2",c="3"}') + assert result == {"a": "1", "b": "2", "c": "3"} + + def test_spaces_ignored(self): + result = _parse_labels('{method="GET", status="200"}') + assert "method" in result + assert "status" in result + + +class TestParseText: + def _by_name(self, results, name): + return [(k, v, l) for n, k, v, l in results if n == name] + + def test_counter_type(self): + results = _parse_text(PROM_SAMPLE) + names_kinds = {n: k for n, k, v, l in results} + assert names_kinds.get("http_requests_total") == "counter" + + def test_gauge_type(self): + results = _parse_text(PROM_SAMPLE) + names_kinds = {n: k for n, k, v, l in results} + assert names_kinds.get("cpu_usage") == "gauge" + + def test_counter_values(self): + results = _parse_text(PROM_SAMPLE) + req_results = [(v, l) for n, k, v, l in results if n == "http_requests_total"] + assert (1234.0, {"method": "GET", "status": "200"}) in req_results + assert (5.0, {"method": "POST", "status": "500"}) in req_results + + def test_gauge_no_labels(self): + 
results = _parse_text(PROM_SAMPLE) + cpu = [(v, l) for n, k, v, l in results if n == "cpu_usage"] + assert len(cpu) == 1 + assert cpu[0][0] == pytest.approx(0.73) + assert cpu[0][1] == {} + + def test_gauge_with_labels(self): + results = _parse_text(PROM_SAMPLE) + mem = [(v, l) for n, k, v, l in results if n == "memory_bytes"] + assert len(mem) == 1 + assert mem[0][1] == {"host": "web01"} + assert mem[0][0] == 536870912.0 + + def test_histogram_type(self): + results = _parse_text(PROM_SAMPLE) + names_kinds = {n: k for n, k, v, l in results} + assert names_kinds.get("some_histogram_bucket") == "histogram" + + def test_comments_skipped(self): + results = _parse_text("# HELP foo bar\n# TYPE foo gauge\nfoo 1.0\n") + assert len(results) == 1 + assert results[0][0] == "foo" + + def test_empty_input(self): + assert _parse_text("") == [] + + def test_unknown_type_defaults_to_gauge(self): + results = _parse_text("unknown_metric 42.0\n") + assert results[0][1] == "gauge" + + def test_inf_value(self): + results = _parse_text("# TYPE b histogram\nb_bucket{le=\"+Inf\"} 100\n") + vals = [v for _, _, v, _ in results] + assert any(v == 100.0 for v in vals) + + def test_scientific_notation(self): + results = _parse_text("# TYPE m gauge\nm 1.5e3\n") + assert results[0][2] == pytest.approx(1500.0) + + def test_negative_value(self): + results = _parse_text("# TYPE m gauge\nm -3.14\n") + assert results[0][2] == pytest.approx(-3.14) + + def test_multiline_no_crash(self): + lines = ["# TYPE requests counter"] + lines += [f'requests{{path="/api/{i}"}} {i * 10}' for i in range(50)] + results = _parse_text("\n".join(lines)) + assert len(results) == 50 + + +# --------------------------------------------------------------------------- +# Expvar path resolver +# --------------------------------------------------------------------------- + +class TestResolvePath: + def _data(self): + return { + "cmdline": ["agent", "-config", "agent.yaml"], + "uptime": 12345, + "forwarder": { + "Transactions": 
{ + "Success": 99, + "Errors": 3, + }, + "FileStorage": { + "FilesCount": 7, + }, + }, + } + + def test_top_level_numeric(self): + assert _resolve_path(self._data(), "/uptime") == 12345 + + def test_nested_two_levels(self): + assert _resolve_path(self._data(), "/forwarder/FileStorage/FilesCount") == 7 + + def test_nested_three_levels(self): + assert _resolve_path(self._data(), "/forwarder/Transactions/Success") == 99 + + def test_missing_top_level(self): + assert _resolve_path(self._data(), "/nonexistent") is None + + def test_missing_nested(self): + assert _resolve_path(self._data(), "/forwarder/Transactions/Missing") is None + + def test_missing_mid_path(self): + assert _resolve_path(self._data(), "/forwarder/NoSuchKey/Count") is None + + def test_returns_dict(self): + result = _resolve_path(self._data(), "/forwarder/Transactions") + assert isinstance(result, dict) + assert result["Success"] == 99 + + def test_leading_slash_handled(self): + # Both with and without leading slash should work + assert _resolve_path(self._data(), "/uptime") == 12345 + + def test_empty_path(self): + # Empty path should return the whole dict + result = _resolve_path(self._data(), "/") + assert result == self._data() + + def test_path_into_list_returns_none(self): + # /cmdline points to a list; trying to go deeper should return None + assert _resolve_path(self._data(), "/cmdline/0") is None diff --git a/plans/pythonport.md b/plans/pythonport.md new file mode 100644 index 000000000..7284d8f9c --- /dev/null +++ b/plans/pythonport.md @@ -0,0 +1,617 @@ +# Lading Python Port Plan + +Port lading to Python with DogStatsD emission only. All telemetry collection, +reporting, and output functionality must be preserved. 
+ +--- + +## Scope + +**In scope:** +- DogStatsD generator (emit via `dogstatsd-py` library) +- Telemetry collection from Datadog agent (Prometheus scrape + Expvar poll) +- Capture output (JSONL, Parquet, both) +- Prometheus exporter (passive HTTP `/metrics` endpoint) +- Observer (Linux `/proc` sampling) +- Config parsing (same YAML schema as Rust lading) +- Graceful lifecycle (warmup, experiment, shutdown) +- Blackhole (HTTP sink for target output) + +**Out of scope:** +- All non-DogStatsD generators (TCP, UDP, HTTP, Unix stream, Fluent, OTLP, etc.) +- All non-DogStatsD payload types +- Windows support + +--- + +## Reference Files + +| Concern | Rust source | +|---------|-------------| +| DogStatsD payload generation | `lading_payload/src/dogstatsd.rs` | +| Unix datagram transport | `lading/src/generator/unix_datagram.rs` | +| Capture line schema | `lading_capture/src/line.rs` | +| Capture accumulator | `lading_capture/src/accumulator.rs` | +| Prometheus target metrics | `lading/src/target_metrics/prometheus.rs` | +| Expvar target metrics | `lading/src/target_metrics/expvar.rs` | +| Config schema | `lading/src/config.rs` | +| Example config | `lading.yaml` | + +--- + +## Technology Choices + +| Concern | Library | Rationale | +|---------|---------|-----------| +| DogStatsD emission | `dogstatsd-py` (`datadog`) | Required by spec | +| Async runtime | `asyncio` | Standard; matches Tokio concurrency model | +| HTTP client | `aiohttp` | Async Prometheus/Expvar scraping | +| Config parsing | `pydantic` + `PyYAML` | Validated schema, matches Rust serde | +| Prometheus export | `prometheus-client` | Passive `/metrics` scrape endpoint | +| JSONL output | stdlib `json` + `gzip` | Zero dependencies | +| Parquet output | `pyarrow` | Industry standard; schema matches Rust | +| Protobuf (DDSketch) | `protobuf` + datadog proto | Histogram serialization | +| `/proc` parsing | stdlib only (Linux) | Avoids psutil divergence from Rust impl | +| Structured logging | `structlog` | 
JSON-friendly | + +--- + +## Project Layout + +``` +lading_py/ +├── pyproject.toml +├── lading_py/ +│ ├── __init__.py +│ ├── main.py # Entry point; lifecycle orchestration +│ ├── config.py # Pydantic config models (mirrors lading/src/config.rs) +│ ├── signal.py # asyncio.Event wrappers for lifecycle signals +│ │ +│ ├── generator/ +│ │ ├── __init__.py +│ │ └── dogstatsd.py # DogStatsD generator (uses dogstatsd-py) +│ │ +│ ├── payload/ +│ │ ├── __init__.py +│ │ └── dogstatsd.py # Payload construction (context pool, tag pool) +│ │ +│ ├── blackhole/ +│ │ ├── __init__.py +│ │ └── http.py # HTTP blackhole (aiohttp server) +│ │ +│ ├── target_metrics/ +│ │ ├── __init__.py +│ │ ├── prometheus.py # Prometheus scraper +│ │ └── expvar.py # Expvar poller +│ │ +│ ├── observer/ +│ │ ├── __init__.py +│ │ └── proc.py # /proc/{pid}/ sampler (Linux) +│ │ +│ ├── capture/ +│ │ ├── __init__.py +│ │ ├── line.py # Line dataclass (mirrors lading_capture/src/line.rs) +│ │ ├── accumulator.py # Rolling metric accumulator +│ │ ├── jsonl_writer.py # JSONL output +│ │ └── parquet_writer.py # Parquet output +│ │ +│ └── telemetry/ +│ ├── __init__.py +│ ├── registry.py # Thread-safe counter/gauge/histogram registry +│ └── prometheus_exporter.py # Passive HTTP /metrics endpoint +│ +└── tests/ + ├── test_config.py + ├── test_payload.py + ├── test_capture.py + └── test_target_metrics.py +``` + +--- + +## Step-by-Step Implementation Plan + +### Phase 1: Foundation + +#### Step 1 — Project scaffold + +Create `pyproject.toml` with dependencies, entry point `lading-py`, and Python >= 3.11 requirement. Pin all deps. 
+ +```toml +[project] +name = "lading-py" +requires-python = ">=3.11" +dependencies = [ + "datadog>=0.49", # dogstatsd-py + "pydantic>=2", + "PyYAML>=6", + "aiohttp>=3.9", + "prometheus-client>=0.20", + "pyarrow>=15", + "protobuf>=4", + "structlog>=24", +] + +[project.scripts] +lading-py = "lading_py.main:main" +``` + +#### Step 2 — Config models (`config.py`) + +Pydantic models mirroring Rust config structs. Must parse the same YAML that Rust lading accepts. + +Key models: +- `ConfRange` — `{inclusive: {min, max}}` or `{exclusive: ...}` +- `KindWeights` — `{metric, event, service_check}` +- `MetricWeights` — `{count, gauge, timer, distribution, set, histogram}` +- `DogStatsDConfig` — full dogstatsd variant config +- `UnixDatagramConfig` — transport config (path, bytes_per_second, etc.) +- `GeneratorConfig` — wrapper with optional `id` +- `BlackholeConfig` — HTTP blackhole +- `TargetMetricsConfig` — list of prometheus/expvar entries +- `TelemetryConfig` — log/prometheus/prometheus_socket variant +- `ObserverConfig` +- `RootConfig` — top-level; holds all above + +Validation rules to replicate from Rust: +- `bytes_per_second` parsed from human-readable string ("1 MiB" → 1048576) +- `seed` must be 32 bytes +- `kind_weights` values must not all be zero +- `tag_length.end > MIN_TAG_LENGTH` check (PR #1875) + +#### Step 3 — Signals (`signal.py`) + +```python +class Signals: + experiment_started: asyncio.Event + shutdown: asyncio.Event + target_pid: asyncio.Event # set when target PID is known +``` + +Wraps broadcast-style coordination. All tasks await `experiment_started` before operating, all tasks check `shutdown` to terminate. + +--- + +### Phase 2: DogStatsD Generator + +#### Step 4 — Payload context pool (`payload/dogstatsd.py`) + +This is the most complex piece. Must replicate lading's weighted random generation. 
+ +**Context pool:** +- Pre-generate N contexts (N = `contexts.max`) +- Each context: a fixed `(metric_name, tag_set)` tuple +- Tag sets sampled from tag name/value pools +- `unique_tag_ratio` controls how much tag reuse happens + +**Metric name templates:** +- `name{{0-2}}` expands to `name0`, `name1`, `name2` +- Expand all templates at startup, store as flat list +- Sample uniformly from expanded list + +**Payload generation produces call descriptors, not raw bytes.** All serialization +is delegated to dogstatsd-py at send time. + +```python +@dataclass +class MetricCall: + name: str + value: float + metric_type: str # "gauge" | "count" | "histogram" | "distribution" | "timing" | "set" + tags: list[str] # ["tag1:val1", "tag2:val2"] + sample_rate: float | None + timestamp: int | None # unix seconds, maps to |T field + +@dataclass +class EventCall: + title: str + text: str + tags: list[str] + alert_type: str | None # "error" | "warning" | "info" | "success" + priority: str | None + +@dataclass +class ServiceCheckCall: + name: str + status: int # 0=OK 1=WARNING 2=CRITICAL 3=UNKNOWN + tags: list[str] + message: str | None + +# A "block" is either a single call or a batch (multi-value) +Block = MetricCall | EventCall | ServiceCheckCall | list[MetricCall] + +def generate_block(rng, config, contexts) -> Block: + kind = weighted_choice(rng, config.kind_weights) + if kind == "metric": + count = 1 + if rng.random() < config.multivalue_pack_probability: + count = rng.randint(config.multivalue_count.min, config.multivalue_count.max) + calls = [_gen_metric_call(rng, config, contexts) for _ in range(count)] + return calls if count > 1 else calls[0] + elif kind == "event": + return _gen_event_call(rng, config) + else: + return _gen_service_check_call(rng, config) +``` + +**Multi-value packing:** +- With probability `multivalue_pack_probability`, generate `multivalue_count` metric + calls returned as a `list[MetricCall]`; the generator sends them inside a single + 
`client.open_buffer()` context so dogstatsd-py packs them into one datagram + +#### Step 5 — Block cache (`payload/dogstatsd.py`) + +Pre-build a cache of `Block` descriptors (call parameter tuples) before the run starts. +`maximum_prebuild_cache_size_bytes` bounds the cache: estimate each `MetricCall` at +~200 bytes of Python object overhead (not wire bytes) for the purposes of capping count. + +```python +class BlockCache: + def __init__(self, config, seed, contexts, max_count): + rng = Random(seed) # deterministic + self._blocks: list[Block] = [] + for _ in range(max_count): + self._blocks.append(generate_block(rng, config, contexts)) + self._idx = 0 + + def next(self) -> Block: + block = self._blocks[self._idx] + self._idx = (self._idx + 1) % len(self._blocks) + return block +``` + +Use `random.Random` with a seed derived from the `seed` config field (32-byte array). +Python's Mersenne Twister differs from Rust's StdRng but exact RNG parity is not +required — statistical properties matter, not bit-for-bit reproducibility. + +#### Step 6 — Generator task (`generator/dogstatsd.py`) + +All emission goes through `dogstatsd-py` (`datadog.dogstatsd.DogStatsd`). No raw socket +fallback. Each `Block` from the cache is dispatched to the appropriate client method. +Multi-value batches use `client.open_buffer()` so dogstatsd-py packs them into one +datagram internally. 
+ +```python +_DISPATCH = { + "gauge": lambda c, m: c.gauge(m.name, m.value, tags=m.tags, sample_rate=m.sample_rate or 1), + "count": lambda c, m: c.increment(m.name, m.value, tags=m.tags, sample_rate=m.sample_rate or 1), + "histogram": lambda c, m: c.histogram(m.name, m.value, tags=m.tags, sample_rate=m.sample_rate or 1), + "distribution": lambda c, m: c.distribution(m.name, m.value, tags=m.tags, sample_rate=m.sample_rate or 1), + "timing": lambda c, m: c.timing(m.name, m.value, tags=m.tags, sample_rate=m.sample_rate or 1), + "set": lambda c, m: c.set(m.name, m.value, tags=m.tags), +} + +def _send_block(client: DogStatsd, block: Block) -> int: + """Send block via dogstatsd-py; return estimated wire bytes for rate limiting.""" + if isinstance(block, list): + # Multi-value batch — pack into one datagram + with client.open_buffer() as buf: + for m in block: + _DISPATCH[m.metric_type](buf, m) + return sum(_estimate_bytes(m) for m in block) + elif isinstance(block, MetricCall): + _DISPATCH[block.metric_type](client, block) + return _estimate_bytes(block) + elif isinstance(block, EventCall): + client.event(block.title, block.text, tags=block.tags, + alert_type=block.alert_type, priority=block.priority) + return _estimate_bytes(block) + else: # ServiceCheckCall + client.service_check(block.name, block.status, tags=block.tags, message=block.message) + return _estimate_bytes(block) + +class DogStatsDGenerator: + async def run(self, signals: Signals): + await signals.experiment_started.wait() + # One DogStatsd client per parallel connection (each has its own socket) + clients = [DogStatsd(socket_path=self.config.path) + for _ in range(self.config.parallel_connections)] + rate_limiter = TokenBucket(self.config.bytes_per_second) + tasks = [ + asyncio.create_task(self._send_loop(client, rate_limiter, signals)) + for client in clients + ] + await asyncio.gather(*tasks) + + async def _send_loop(self, client: DogStatsd, rate_limiter: TokenBucket, signals: Signals): + while not 
signals.shutdown.is_set(): + block = self.cache.next() + est_bytes = _estimate_bytes_block(block) + await rate_limiter.acquire(est_bytes) + try: + actual = _send_block(client, block) + self.registry.increment("bytes_written", actual) + self.registry.increment("packets_sent", 1) + except Exception as exc: + self.registry.increment("request_failure", 1, {"error": type(exc).__name__}) +``` + +**Limitation:** `length_prefix_framed: true` is unsupported. dogstatsd-py does not +expose length-prefix framing and there is no compliant way to implement it without +bypassing the library. Config validation will reject `length_prefix_framed: true` +with a clear error message. + +**Rate limiter:** Token bucket on estimated wire bytes (`len(name) + len(tags) + ~20`). +Async sleep to yield when bucket is empty. + +--- + +### Phase 3: Telemetry Collection + +#### Step 7 — Prometheus target metrics scraper (`target_metrics/prometheus.py`) + +```python +class PrometheusScraper: + async def run(self, signals: Signals): + await signals.experiment_started.wait() + async with aiohttp.ClientSession() as session: + while not signals.shutdown.is_set(): + text = await session.get(self.config.uri) + metrics = parse_prometheus_text(text) + for m in metrics: + self.registry.record(m.name, m.kind, m.value, m.labels | self.config.tags) + await asyncio.sleep(self.sample_period) +``` + +Prometheus text format parser: parse `# TYPE`, `# HELP`, metric lines. Handle counter, +gauge, histogram, summary. Map to lading's `MetricKind` (Counter/Gauge/Histogram). + +#### Step 8 — Expvar target metrics poller (`target_metrics/expvar.py`) + +```python +class ExpvarPoller: + async def run(self, signals: Signals): + await signals.experiment_started.wait() + async with aiohttp.ClientSession() as session: + while not signals.shutdown.is_set(): + data = await session.get(self.config.uri) # JSON + for var_path in self.config.vars: + value = jsonpath_get(data, var_path) # e.g.
"/forwarder/Transactions/Success" + self.registry.record(var_path, MetricKind.Gauge, value, self.config.tags) + await asyncio.sleep(self.sample_period) +``` + +Path resolution: split `/foo/bar/baz` → nested dict lookup `data["foo"]["bar"]["baz"]`. + +--- + +### Phase 4: Observer + +#### Step 9 — `/proc` observer (`observer/proc.py`) + +Linux only. Samples `/proc/{pid}/smaps_rollup` and optionally `/proc/{pid}/smaps` +every `sample_period` seconds after `experiment_started`. + +Key metrics from `smaps_rollup`: +- `Rss` → gauge `smaps_rollup.Rss` +- `Pss` → gauge `smaps_rollup.Pss` +- `Private_Clean`, `Private_Dirty` → gauge +- `Anonymous` → gauge + +Parse format: `FieldName: <value> kB` lines. + +Record all fields as gauges with label `pid=<pid>`. + +--- + +### Phase 5: Capture Output + +#### Step 10 — Metric registry (`telemetry/registry.py`) + +Thread-safe in-process registry. All generator/collector/observer code calls into this. + +```python +class Registry: + def increment(self, name: str, value: int, labels: dict): ... + def set_gauge(self, name: str, value: float, labels: dict): ... + def record_histogram(self, name: str, value: float, labels: dict): ... + def snapshot(self) -> list[Line]: ... # drain for flush +``` + +Internal storage: `threading.Lock` guarding dicts of `Counter`, `Gauge`, `DDSketch`.
+ +#### Step 11 — Line model (`capture/line.py`) + +Mirrors `lading_capture/src/line.rs`: + +```python +@dataclass +class Line: + run_id: str # UUID + time: int # ms since epoch + fetch_index: int # flush counter + metric_name: str + metric_kind: str # "counter" | "gauge" | "histogram" + value: float | int + labels: dict[str, str] + value_histogram: bytes # protobuf DDSketch, empty if not histogram +``` + +#### Step 12 — JSONL writer (`capture/jsonl_writer.py`) + +```python +class JsonlWriter: + def flush(self, lines: list[Line], fetch_index: int): + with open(self.path, "a") as f: + for line in lines: + f.write(json.dumps(dataclasses.asdict(line)) + "\n") +``` + +Flush every `flush_seconds` seconds via `asyncio.sleep` loop. +`value_histogram` bytes field: base64-encode in JSON output (matches Rust behavior). + +#### Step 13 — Parquet writer (`capture/parquet_writer.py`) + +Schema mirrors Rust Parquet output: + +```python +SCHEMA = pa.schema([ + ("run_id", pa.string()), + ("time", pa.int64()), + ("fetch_index", pa.int64()), + ("metric_name", pa.string()), + ("metric_kind", pa.string()), + ("value", pa.float64()), + ("labels", pa.map_(pa.string(), pa.string())), + ("value_histogram", pa.binary()), +]) +``` + +Accumulate rows in memory, flush to Parquet file at `flush_seconds` interval using +`pyarrow.parquet.write_table`. Append row groups (open file in append mode or write +separate files per flush and concatenate on shutdown). + +#### Step 14 — Accumulator (`capture/accumulator.py`) + +60-tick rolling window matching Rust accumulator behavior. + +Tracks per-metric history for computing rates (counters are differenced across ticks). +On each flush tick: +1. Snapshot registry +2. Diff counters vs previous snapshot +3. Pass gauge/histogram values through directly +4. Write `Line` objects to writer(s) + +--- + +### Phase 6: Telemetry Export + +#### Step 15 — Prometheus exporter (`telemetry/prometheus_exporter.py`) + +Passive HTTP endpoint. 
Uses `prometheus_client` library. + +```python +class PrometheusExporter: + async def run(self, signals: Signals): + # aiohttp handler for GET /metrics + # prometheus_client.generate_latest() for text format + # Periodically syncs from Registry to prometheus_client collectors +``` + +#### Step 16 — Blackhole HTTP (`blackhole/http.py`) + +`aiohttp` server that accepts all POST/PUT requests and discards the body. +Records bytes received as a counter. Binds to configured address. + +--- + +### Phase 7: Lifecycle Orchestration + +#### Step 17 — Main (`main.py`) + +```python +async def inner_main(config: RootConfig): + signals = Signals() + run_id = str(uuid.uuid4()) + + # Build telemetry output + registry = Registry() + writer = build_writer(config.telemetry, run_id) + + # Build and start all components as asyncio tasks + tasks = [] + + if config.generator: + for gen_cfg in config.generator: + dsd_cfg = gen_cfg.unix_datagram.variant.dogstatsd + contexts = build_context_pool(dsd_cfg) + cache = BlockCache(dsd_cfg, gen_cfg.unix_datagram.seed, contexts, max_count=10_000) + tasks.append(asyncio.create_task( + DogStatsDGenerator(gen_cfg, cache, registry).run(signals) + )) + + for bh_cfg in config.blackhole or []: + tasks.append(asyncio.create_task(BlackholeHttp(bh_cfg).run(signals))) + + for tm_cfg in config.target_metrics or []: + tasks.append(asyncio.create_task(build_target_metrics(tm_cfg, registry, signals))) + + if config.observer: + tasks.append(asyncio.create_task(Observer(config.observer, registry).run(signals))) + + tasks.append(asyncio.create_task(accumulate_and_flush(registry, writer, signals))) + + # Lifecycle + await asyncio.sleep(config.warmup_seconds or 0) + signals.experiment_started.set() + await asyncio.sleep(config.experiment_duration_seconds) + signals.shutdown.set() + + await asyncio.gather(*tasks, return_exceptions=True) + writer.finalize() + +def main(): + import argparse, yaml + parser = argparse.ArgumentParser() + parser.add_argument("--config", 
required=True) + args = parser.parse_args() + with open(args.config) as f: + raw = yaml.safe_load(f) + config = RootConfig.model_validate(raw) + asyncio.run(inner_main(config)) +``` + +Signal handling: install `SIGTERM`/`SIGINT` handler that sets `signals.shutdown`. + +--- + +### Phase 8: Testing + +#### Step 18 — Unit tests + +- `test_config.py`: load `lading.yaml`, verify all fields parse correctly +- `test_payload.py`: generate 1000 messages, verify wire format parses as valid DogStatsD +- `test_capture.py`: write Lines to JSONL and Parquet, read back, verify schema +- `test_target_metrics.py`: mock aiohttp responses, verify Prometheus text parse + +#### Step 19 — Integration smoke test + +`tests/smoke_test.py`: +1. Spin up a Unix datagram socket server (mimicking the agent's DogStatsD socket) +2. Run `lading-py --config tests/smoke.yaml` for 5 seconds +3. Assert bytes received > 0 +4. Assert JSONL output file exists and has lines + +--- + +## Implementation Order + +1. `config.py` — foundation everything else depends on +2. `signal.py` — trivial, needed early +3. `payload/dogstatsd.py` — context pool, block cache +4. `generator/dogstatsd.py` — core feature +5. `telemetry/registry.py` — all metrics recording +6. `capture/line.py` + `capture/jsonl_writer.py` — minimum viable output +7. `main.py` — wire everything together; can test end-to-end here +8. `target_metrics/prometheus.py` + `target_metrics/expvar.py` +9. `observer/proc.py` +10. `capture/parquet_writer.py` + `capture/accumulator.py` +11. `telemetry/prometheus_exporter.py` + `blackhole/http.py` +12. 
Tests + +--- + +## Key Fidelity Decisions + +| Behavior | Rust | Python | +|----------|------|--------| +| RNG | SeededStdRng (ChaCha) | `random.Random(seed)` | +| RNG parity | Bit-exact reproducibility | Not required; stats parity only | +| Concurrency | Tokio async | asyncio | +| Socket type | Raw Unix datagram | dogstatsd-py exclusively; `open_buffer()` for batches | +| Histogram sketch | DDSketch (protobuf) | Same protobuf schema | +| Time unit | ms (u128) | `int(time.time() * 1000)` | +| Byte sizes | `bytesize` crate parsing | Manual parser: "1 MiB" → 1048576 | +| Config YAML | serde_yaml | PyYAML + pydantic | + +--- + +## Risks and Mitigations + +| Risk | Mitigation | +|------|-----------| +| `length_prefix_framed: true` unsupported by dogstatsd-py | Reject at config validation with clear error; all other wire formats work | +| Python throughput lower than Rust | Pre-built block cache + asyncio avoids per-message allocation; accept perf trade-off | +| DDSketch protobuf schema not public | Extract `.proto` from datadog-agent repo; codegen with `protoc` | +| Prometheus text format edge cases | Use `prometheus_client`'s own parser instead of hand-rolling | +| `/proc` parsing changes across kernel versions | Mirror Rust's exact field-by-field parsing; skip unknown fields |