diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml index 4b7bab0aaf55..67004a7d2e76 100644 --- a/.github/workflows/skywalking.yaml +++ b/.github/workflows/skywalking.yaml @@ -713,6 +713,10 @@ jobs: config: test/e2e-v2/cases/kong/e2e.yaml - name: Flink config: test/e2e-v2/cases/flink/e2e.yaml + - name: Airflow + config: test/e2e-v2/cases/airflow/e2e.yaml + - name: Airflow Cluster + config: test/e2e-v2/cases/airflow/e2e-cluster.yaml - name: OTLP Trace config: test/e2e-v2/cases/otlp-traces/e2e.yaml diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index b30c394b83aa..fec941aa8677 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -276,6 +276,7 @@ * Fix: TTL query add metadata TTL. * Fix: PersistentWorker used wrong TTL for metrics cache if the storage is BanyanDB. * Add iOS/iPadOS app monitoring via OpenTelemetry Swift SDK (SWIP-11). Includes the `IOS` layer, `IOSHTTPSpanListener` for outbound HTTP client metrics (supports OTel Swift `.old`/`.stable`/`.httpDup` semantic-convention modes via stable-then-legacy attribute fallback), `IOSMetricKitSpanListener` for daily MetricKit metrics (exit counts split by foreground/background, app-launch / hang-time percentile histograms with finite 30 s overflow ceiling), LAL rules for crash/hang diagnostics, Mobile menu, and iOS dashboards. +* Add Apache Airflow monitoring via native OpenTelemetry metrics (SWIP-7). New `AIRFLOW` layer with Service (cluster) and Instance (host) dimensions, MAL rules under `otel-rules/airflow/`, setup docs, mock OTLP e2e (`cases/airflow/e2e.yaml`: full SWIP-7, 30 checks), and real Celery-cluster integration smoke (`e2e-cluster.yaml`: scheduler + two workers + triggerer; deferrable and dataset DAGs with ~4-minute live workload; 25 checks — native scheduler/executor/triggerer OTLP plus e2e Celery sidecar pool gauges on one worker; metrics needing synthetic OTLP or rare failure events such as `pool_queued_slots` are mock-only). 
See `test/e2e-v2/cases/airflow/README.md`. Horizon UI dashboards ship separately in `apache/skywalking-horizon-ui` under the Workflow Scheduler menu group. * Fix LAL `layer: auto` mode dropping logs after extractor set the layer. Codegen now propagates `layer "..."` assignments to `LogMetadata.layer` so `FilterSpec.doSink()` sees the script-decided layer. * Fix MetricKit histogram percentile metrics being reported at 1000× their true value — the listener now marks its `SampleFamily` with `defaultHistogramBucketUnit(MILLISECONDS)` so MAL's default SECONDS→MS rescale of `le` labels is not applied. * Add WeChat and Alipay Mini Program monitoring via the SkyAPM mini-program-monitor SDK (SWIP-12). Two new layers (`WECHAT_MINI_PROGRAM`, `ALIPAY_MINI_PROGRAM`); two new JavaScript componentIds (`WeChat-MiniProgram: 10002`, `AliPay-MiniProgram: 10003`). Service / instance / endpoint entities are produced by MAL + LAL, not trace analysis — mini-programs are client-side (exit-only) so `RPCAnalysisListener` stays unchanged (same pattern as browser and iOS). MAL rules per platform × scope under `otel-rules/miniprogram/` with explicit `.service(...)` / `.endpoint(...)` chains (empty `expSuffix` so endpoint-scope rules aren't overridden), histogram percentile via `.histogram("le", TimeUnit.MILLISECONDS)` to keep ms bucket bounds intact, and request-cpm derived from the histogram `_count` family. LAL `layer: auto` rule produces both layers via `miniprogram.platform` dispatch and emits error-count samples consumed by per-platform log-MAL rules. Per-layer menu entries and service / instance / endpoint dashboards with Trace and Log sub-tabs. @@ -300,6 +301,7 @@ #### Documentation * Update LAL documentation with `sourceAttribute()` function and `layer: auto` mode. * Add iOS app monitoring setup documentation. +* Add Apache Airflow monitoring setup documentation (SWIP-7). 
* Add WeChat / Alipay Mini Program monitoring setup documentation, plus a client-side-monitoring section in the security guide covering public-internet ingress (OTLP + `/v3/segments`) for mobile / browser / mini-program SDKs. * Improve downsampling documentation diff --git a/docs/en/concepts-and-designs/service-hierarchy.md b/docs/en/concepts-and-designs/service-hierarchy.md index 5f3c5144fcf1..12b2918d5422 100644 --- a/docs/en/concepts-and-designs/service-hierarchy.md +++ b/docs/en/concepts-and-designs/service-hierarchy.md @@ -38,6 +38,7 @@ If you want to customize it according to your own needs, please refer to [Servic | PULSAR | K8S_SERVICE | [PULSAR On K8S_SERVICE](#pulsar-on-k8s_service) | | SO11Y_OAP | K8S_SERVICE | [SO11Y_OAP On K8S_SERVICE](#so11y_oap-on-k8s_service) | | KONG | K8S_SERVICE | [KONG On K8S_SERVICE](#kong-on-k8s_service) | +| AIRFLOW | K8S_SERVICE | [AIRFLOW On K8S_SERVICE](#airflow-on-k8s_service) | - The following sections will describe the **default matching rules** in detail and use the `upper-layer On lower-layer` format. - The example service name are based on SkyWalking [Showcase](https://github.com/apache/skywalking-showcase) default deployment. @@ -229,6 +230,14 @@ If you want to customize it according to your own needs, please refer to [Servic - KONG.service.name: `kong::kong.skywalking-showcase` - K8S_SERVICE.service.name: `skywalking-showcase::kong.skywalking-showcase` +#### AIRFLOW On K8S_SERVICE +- Rule name: `short-name` +- Matching expression: `{ (u, l) -> u.shortName == l.shortName }` +- Description: AIRFLOW.service.shortName == K8S_SERVICE.service.shortName +- Matched Example: + - AIRFLOW.service.name: `airflow::airflow.skywalking-showcase` + - K8S_SERVICE.service.name: `skywalking-showcase::airflow.skywalking-showcase` + ### Build Through Specific Agents Use agent tech involved(such as eBPF) and deployment tools(such as operator and agent injector) to detect the service hierarchy relations. 
diff --git a/docs/en/setup/backend/backend-airflow-monitoring.md b/docs/en/setup/backend/backend-airflow-monitoring.md new file mode 100644 index 000000000000..46f69305cde4 --- /dev/null +++ b/docs/en/setup/backend/backend-airflow-monitoring.md @@ -0,0 +1,239 @@ +# Airflow monitoring + +## Airflow metrics via native OpenTelemetry + +SkyWalking receives Airflow metrics through Airflow's native OpenTelemetry exporter and the +[OpenTelemetry receiver](opentelemetry-receiver.md), then aggregates them with +[MAL](../../concepts-and-designs/mal.md). + +## Data flow + +1. Enable OpenTelemetry metrics in Airflow (`pip install 'apache-airflow[otel]'`, `otel_on = True` + or standard `OTEL_EXPORTER_OTLP_*` environment variables). +2. Airflow **pushes** OTLP metrics to OpenTelemetry Collector. +3. OpenTelemetry Collector forwards metrics to SkyWalking OAP via OTLP gRPC exporter. +4. OAP applies MAL rules under `otel-rules/airflow/` and stores Service / Instance entities on + `Layer: AIRFLOW`. + +```mermaid +graph LR; + Airflow("Airflow") --> Collector("OTel Collector") + Collector --> OAP("SkyWalking OAP") + OAP --> UI("Horizon UI") +``` + +In the Horizon UI, Airflow appears under the **Workflow Scheduler** menu group. + +## Setup + +### 1. Enable Airflow OpenTelemetry metrics + +Install the OTel extra and enable metrics export. See the +[Airflow metrics documentation](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/metrics.html). + +Example environment variables for Airflow 3.x: + +```bash +pip install 'apache-airflow[otel]' + +export OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 +export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +export OTEL_RESOURCE_ATTRIBUTES=cluster=prod-airflow +``` + +`cluster` is required so SkyWalking can name the Airflow Service (`airflow::prod-airflow`). You +can also inject it with a Collector `resource` processor. 
+ +Legacy `airflow.cfg` keys (`otel_host`, `otel_port`, `otel_prefix`, …) still work on older +releases but are deprecated in favor of standard OTel SDK variables. + +### 2. Configure OpenTelemetry Collector + +Example pipeline: + +```yaml +receivers: + otlp: + protocols: + http: + endpoint: 0.0.0.0:4318 + grpc: + endpoint: 0.0.0.0:4317 + +processors: + batch: + +exporters: + otlp: + endpoint: oap:11800 + tls: + insecure: true + +service: + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [otlp] +``` + +Refer to [test/e2e-v2/cases/airflow/otel-collector-config.yaml](../../../../test/e2e-v2/cases/airflow/otel-collector-config.yaml) +for a minimal Collector pipeline without hard-coded service or instance labels. + +### 3. Enable SkyWalking OpenTelemetry receiver + +Ensure `airflow/*` is listed in `SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES` (enabled by default +in the distribution). + +## Entity model + +| SkyWalking entity | Airflow mapping | +|-------------------|-----------------| +| Service | `airflow::{cluster}` from OTLP resource attribute `cluster` | +| Instance | `{host.name}` — scheduler, worker, or triggerer hostname | + +### Components vs SkyWalking Instance vs Airflow Task Instance + +In OAP and MAL, the second entity is the standard SkyWalking **Instance** (see +`Layer: AIRFLOW`, `airflow-instance.yaml`). In the Horizon UI, the AIRFLOW layer uses the +display alias **Components** (Chinese: **组件**) for that tab instead of the generic label +**Instance**. + +This is intentional: + +1. **Avoid confusion with Airflow [Task Instance](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#task-instance).** + In Airflow, a *task instance* is one execution of a task within a single DAG run (for example + `daily_etl · 2026-06-01 · extract_data · try_number=1`). It is short-lived, stored in the + Airflow metadata database, and unrelated to SkyWalking's Instance entity. 
Airflow operators + already use the word *instance* heavily in that sense; labeling the scheduler / worker / + triggerer tab **Instance** in the UI would suggest task-level drill-down rather than + long-running component processes. + +2. **Match the deployment model.** Each row under **Components** is a long-running Airflow + role — scheduler, Celery worker, triggerer, and optionally webserver — identified by OTLP + resource `host.name` (pod hostname or an operator-defined name). Multiple worker replicas + appear as multiple component rows under one Service (`airflow::{cluster}`). + +3. **Align with other Horizon layers.** Flink uses **TaskManagers**, Kubernetes uses **Pods**; + AIRFLOW uses **Components** for the same pattern: a domain-specific name for what SkyWalking + stores as Instance. + +| Term | Meaning | +|------|---------| +| SkyWalking **Service** | One Airflow cluster (`airflow::{cluster}`) | +| SkyWalking **Instance** (OAP) / **Components** (UI) | One scheduler / worker / triggerer process or pod (`host.name`) | +| Airflow **Task Instance** | One run of one task in one DAG run — **not** shown on this dashboard tab | + +Service-level panels aggregate cluster-wide samples. Instance-level (component-level) panels are +scoped per `host.name`. Do not sum instance-scoped samples into service dashboards when each +component exports the same instrument independently. + +Airflow pushes OTLP metrics; SkyWalking does not pull them. The Collector only receives +push exports and forwards them to OAP. Do not hard-code service or instance names in +Collector processors — derive them from resource attributes that Airflow (or your +deployment) attaches to each export batch. 
+ +Required resource attributes: + +| Attribute | Purpose | +|-----------|---------| +| `cluster` | Names the logical Airflow cluster (`airflow::{cluster}` service) | +| `host.name` | Identifies the scheduler / worker / triggerer host (SkyWalking instance) | + +On Kubernetes, set `cluster` to your deployment name (for example via +`OTEL_RESOURCE_ATTRIBUTES=cluster=prod-airflow`) and rely on the OTel SDK's default +`host.name` (pod hostname) for instance identity. When a single Collector receives +metrics from multiple Airflow pods, each pod's push carries its own resource block, so +no per-instance relabeling is required. + +### Kubernetes sidecar deployment (recommended) + +For production Kubernetes deployments, run OpenTelemetry Collector as a **sidecar** +alongside each Airflow component (scheduler, worker, triggerer). Airflow pushes to +`localhost:4318`; the sidecar forwards to a cluster-wide Collector or directly to OAP. +This matches the push model and keeps `cluster` / `host.name` aligned with the pod that +emitted the metrics. + +Two e2e cases cover Airflow monitoring (full coverage matrix and latest verify report: +[test/e2e-v2/cases/airflow/README.md](../../../../test/e2e-v2/cases/airflow/README.md)): + +- **Mock (CI default, fast):** `test/e2e-v2/cases/airflow/e2e.yaml` replays OTLP JSON via a + Python sidecar ([`otlp_replay_server.py`](../../../../test/e2e-v2/cases/airflow/scripts/otlp_replay_server.py), + built from [`Dockerfile.mock-sender`](../../../../test/e2e-v2/cases/airflow/Dockerfile.mock-sender)) + with realistic `cluster` and `host.name` resource attributes. 
+- **Real Celery cluster (production-like integration smoke):** `test/e2e-v2/cases/airflow/e2e-cluster.yaml` + starts scheduler, two workers, and triggerer (`cluster=airflow-e2e-cluster`), seeds deferrable + and dataset DAGs plus load workload (~4 minutes), then verifies **25 integration checks** + (native scheduler / executor / triggerer OTLP plus e2e Celery sidecar pool gauges on + `airflow-worker-1`). Metrics that need synthetic OTLP or rare Airflow events + (`asset_updates`, `pool_queued_slots`, `triggers_failed`, `triggers_blocked_main_thread`) + are covered only in the mock suite. See + [e2e README](../../../../test/e2e-v2/cases/airflow/README.md). + +## Supported metrics + +MAL rule definitions live in: + +- `otel-rules/airflow/airflow-service.yaml` — cluster service metrics +- `otel-rules/airflow/airflow-instance.yaml` — per-host instance metrics + +Metric names follow Airflow's OTel export (`airflow.{stat}` with dots escaped to underscores in +MAL). See [SWIP-7](../../swip/SWIP-7.md) for the full panel list. + +## Horizon UI + +After OAP ingests OTLP metrics, open **Workflow Scheduler → Airflow** in Horizon UI. + +When Airflow runs on Kubernetes with [service hierarchy](../../concepts-and-designs/service-hierarchy.md) +(`AIRFLOW` ↔ `K8S_SERVICE`, matched by `shortName`), use the **3D Infrastructure Map** and +**Kubernetes Services** layer pages together with the AIRFLOW dashboards below. + +Screenshots include a local Kubernetes validation stack (`airflow-dev::airflow.airflow-dev` on +`Layer: K8S_SERVICE`) and a Celery cluster layout matching +[`docker-compose-cluster.yml`](../../../../test/e2e-v2/cases/airflow/docker-compose-cluster.yml). + +**3D Infrastructure Map** — Live OAP topology (`#/3d/map`): middleware tier **Airflow**; infra tier +groups **Kubernetes Services** and **Kubernetes** by namespace. 
+ +![Horizon UI — 3D Infrastructure Map with Airflow and Kubernetes tiers](images/airflow/horizon-infra-3d-map-airflow-dev.png) + +**Kubernetes Services — Service** — HTTP RPM, latency, success rate, and pod counts for +`airflow.airflow-dev`. + +![Horizon UI — K8S_SERVICE service dashboard](images/airflow/horizon-k8s-service-service.png) + +**Kubernetes Services — Instances** — Pod instances under the service. + +![Horizon UI — K8S_SERVICE instances](images/airflow/horizon-k8s-service-instances.png) + +**Kubernetes Services — Endpoints** — Per-endpoint HTTP metrics (example: `GET:/health`). + +![Horizon UI — K8S_SERVICE endpoint GET:/health](images/airflow/horizon-k8s-service-endpoints.png) + +**Kubernetes Services — Topology** — Inbound traffic chain observed by Rover (example: Unknown → +kube-dns → airflow). + +![Horizon UI — K8S_SERVICE topology](images/airflow/horizon-k8s-service-topology.png) + +**AIRFLOW — Service** — Cluster-level SWIP-7 panels (tasks, pool slots, scheduler heartbeat, DAG +queue). + +![Horizon UI — Airflow service dashboard](images/airflow/horizon-airflow-service.png) + +**AIRFLOW — Components** — Scheduler, triggerer, and workers under one Service (four-node local +Celery layout). + +![Horizon UI — Airflow components list](images/airflow/horizon-airflow-components.png) + +**AIRFLOW — Component detail** — Instance-scoped metrics for a selected host (example: +`airflow-scheduler`). + +![Horizon UI — Airflow scheduler component metrics](images/airflow/horizon-airflow-component-scheduler.png) + +More e2e coverage and verify reports: +[test/e2e-v2/cases/airflow/README.md](../../../../test/e2e-v2/cases/airflow/README.md). + +## Customization + +You can extend or override MAL rules under `otel-rules/airflow/` and add UI dashboards in the +Horizon UI bundle. Restart OAP after rule changes. 
diff --git a/docs/en/setup/backend/images/airflow/horizon-airflow-component-scheduler.png b/docs/en/setup/backend/images/airflow/horizon-airflow-component-scheduler.png new file mode 100644 index 000000000000..3ac5b7a83f75 Binary files /dev/null and b/docs/en/setup/backend/images/airflow/horizon-airflow-component-scheduler.png differ diff --git a/docs/en/setup/backend/images/airflow/horizon-airflow-components.png b/docs/en/setup/backend/images/airflow/horizon-airflow-components.png new file mode 100644 index 000000000000..d8bced54a0b0 Binary files /dev/null and b/docs/en/setup/backend/images/airflow/horizon-airflow-components.png differ diff --git a/docs/en/setup/backend/images/airflow/horizon-airflow-service.png b/docs/en/setup/backend/images/airflow/horizon-airflow-service.png new file mode 100644 index 000000000000..ff9f8e1500c6 Binary files /dev/null and b/docs/en/setup/backend/images/airflow/horizon-airflow-service.png differ diff --git a/docs/en/setup/backend/images/airflow/horizon-infra-3d-map-airflow-dev.png b/docs/en/setup/backend/images/airflow/horizon-infra-3d-map-airflow-dev.png new file mode 100644 index 000000000000..24c0f75df1c7 Binary files /dev/null and b/docs/en/setup/backend/images/airflow/horizon-infra-3d-map-airflow-dev.png differ diff --git a/docs/en/setup/backend/images/airflow/horizon-k8s-service-endpoints.png b/docs/en/setup/backend/images/airflow/horizon-k8s-service-endpoints.png new file mode 100644 index 000000000000..8a5ef9a66134 Binary files /dev/null and b/docs/en/setup/backend/images/airflow/horizon-k8s-service-endpoints.png differ diff --git a/docs/en/setup/backend/images/airflow/horizon-k8s-service-instances.png b/docs/en/setup/backend/images/airflow/horizon-k8s-service-instances.png new file mode 100644 index 000000000000..ab6c5a809e93 Binary files /dev/null and b/docs/en/setup/backend/images/airflow/horizon-k8s-service-instances.png differ diff --git a/docs/en/setup/backend/images/airflow/horizon-k8s-service-service.png 
b/docs/en/setup/backend/images/airflow/horizon-k8s-service-service.png new file mode 100644 index 000000000000..7fcb86433844 Binary files /dev/null and b/docs/en/setup/backend/images/airflow/horizon-k8s-service-service.png differ diff --git a/docs/en/setup/backend/images/airflow/horizon-k8s-service-topology.png b/docs/en/setup/backend/images/airflow/horizon-k8s-service-topology.png new file mode 100644 index 000000000000..82a235d8f0da Binary files /dev/null and b/docs/en/setup/backend/images/airflow/horizon-k8s-service-topology.png differ diff --git a/docs/en/swip/SWIP-7.md b/docs/en/swip/SWIP-7.md new file mode 100644 index 000000000000..530065a0fef8 --- /dev/null +++ b/docs/en/swip/SWIP-7.md @@ -0,0 +1,99 @@ +# Support Apache Airflow Monitoring + +## Motivation + +Apache Airflow is an open-source workflow management platform primarily used for scheduling and +monitoring workflows. It can be used to handle complex data pipelines and has been widely applied +in the fields of data engineering and data science. Airflow allows users to write workflows called +DAGs (Directed Acyclic Graphs). Each DAG contains a series of tasks that can be executed in a +specific sequence and dependency relationship. Due to its support for multitasking in complex +scenarios, monitoring the health and operational status of Airflow is crucial. Through these +metrics, it is possible to help analyze task health status, formulate optimization plans, and +design risk prevention strategies. + +## Architecture Graph + +There is no significant architecture-level change. + +```mermaid +graph LR; + AirflowOTEL("Airflow OTEL") --> OpenTelemetryCollector("OpenTelemetry Collector") --> SkyWalkingOTELReceiver("SkyWalking OTEL Receiver") --> SkyWalkingMALEngine("SkyWalking MAL Engine") --> HorizonUI("Horizon UI") +``` + +## Proposed Changes + +1. Airflow exports metrics via native OpenTelemetry (`otel_on` / `OTEL_EXPORTER_OTLP_*`). +2. 
OpenTelemetry Collector receives OTLP metrics from Airflow and forwards them to SkyWalking + OTel Receiver via the OpenTelemetry exporter. +3. The SkyWalking OAP Server parses expressions with [MAL](../concepts-and-designs/mal.md) to + filter, calculate, aggregate, and store the results. +4. Metrics are displayed via [Horizon UI](https://github.com/apache/skywalking-horizon-ui) under the + **Workflow Scheduler** menu group and can be customized on dashboards. + +SkyWalking models an Airflow deployment as `Layer: AIRFLOW`: + +- **Service** — one logical cluster (`airflow::{cluster}`), keyed by resource attribute `cluster`. +- **Instance** — scheduler / worker / triggerer host (`host.name` resource attribute). + +Horizon labels this entity **Components** rather than **Instance** so operators are not led to +confuse it with Airflow **Task Instance** (a single task execution within one DAG run). See +[Airflow monitoring setup](../setup/backend/backend-airflow-monitoring.md#components-vs-skywalking-instance-vs-airflow-task-instance) +for the full naming rationale. 
+ +### Airflow Service Supported Metrics + +| Monitoring Panel | Unit | Metric Name | Description | +|------------------|------|-------------|-------------| +| Tasks Executable | count | meter_airflow_scheduler_tasks_executable | Tasks ready for execution | +| Queued Tasks | count | meter_airflow_executor_queued_tasks | Queued tasks on executor | +| Running Tasks | count | meter_airflow_executor_running_tasks | Tasks currently running on executor | +| Open Slots | count | meter_airflow_executor_open_slots | Open executor slots | +| Pool Queued Slots | count | meter_airflow_pool_queued_slots | Queued slots in pool | +| Pool Deferred Slots | count | meter_airflow_pool_deferred_slots | Deferred slots in pool | +| Pool Scheduled Slots | count | meter_airflow_pool_scheduled_slots | Scheduled but not yet running slots in pool | +| Scheduler Heartbeat | count | meter_airflow_scheduler_heartbeat | Scheduler heartbeats per minute | +| Orphaned Tasks Cleared | count | meter_airflow_scheduler_orphaned_tasks_cleared | Orphaned tasks cleared per minute | +| Orphaned Tasks Adopted | count | meter_airflow_scheduler_orphaned_tasks_adopted | Orphaned tasks adopted per minute | +| DAG File Queue Size | count | meter_airflow_dag_file_queue_size | DAG files pending scan | +| Asset Updates | count | meter_airflow_asset_updates | Updated assets per minute | + +### Airflow Instance Supported Metrics + +| Monitoring Panel | Unit | Metric Name | Description | +|------------------|------|-------------|-------------| +| Pool Open Slots | count | meter_airflow_instance_pool_open_slots | Open slots in pool on this host | +| Pool Deferred Slots | count | meter_airflow_instance_pool_deferred_slots | Deferred slots in pool on this host | +| Pool Running Slots | count | meter_airflow_instance_pool_running_slots | Running slots in pool on this host | +| Pool Scheduled Slots | count | meter_airflow_instance_pool_scheduled_slots | Scheduled but not yet running slots on this host | +| Triggerer 
Heartbeat | count | meter_airflow_instance_triggerer_heartbeat | Triggerer heartbeats per minute | +| Triggers Blocked Main Thread | count | meter_airflow_instance_triggers_blocked_main_thread | Triggers blocking main thread | +| Triggers Failed | count | meter_airflow_instance_triggers_failed | Triggers that failed before firing | +| Triggers Succeeded | count | meter_airflow_instance_triggers_succeeded | Triggers that fired at least once | +| Tasks Executable | count | meter_airflow_instance_scheduler_tasks_executable | Tasks ready on this host | +| Orphaned Tasks Cleared | count | meter_airflow_instance_scheduler_orphaned_tasks_cleared | Orphaned tasks cleared on this host per minute | +| Orphaned Tasks Adopted | count | meter_airflow_instance_scheduler_orphaned_tasks_adopted | Orphaned tasks adopted on this host per minute | +| Queued Tasks | count | meter_airflow_instance_executor_queued_tasks | Queued tasks on this host | +| Running Tasks | count | meter_airflow_instance_executor_running_tasks | Running tasks on this host | +| Asset Updates | count | meter_airflow_instance_asset_updates | Asset updates on this host | +| Asset Orphaned | count | meter_airflow_instance_asset_orphaned | Orphaned assets on this host | +| Asset Triggered DagRuns | count | meter_airflow_instance_asset_triggered_dagruns | DagRuns triggered by assets | + +Service-level panels aggregate cluster-wide samples. Instance-level panels are scoped per +`host.name` (shown as **Components** in the UI). Do not sum instance-scoped samples into service dashboards when each component +exports the same instrument independently. + +Bundled Horizon UI dashboards chart the primary panels above; additional OAP metrics (for example +orphaned-task counters) are available via MQE even when not shown on a default widget. + +## Imported Dependencies libs and their licenses. + +No new dependency. + +## Compatibility + +No breaking changes. 
+ +## General usage docs + +See [Airflow monitoring setup](../setup/backend/backend-airflow-monitoring.md) and +[e2e coverage matrix](../../../test/e2e-v2/cases/airflow/README.md). diff --git a/docs/en/swip/readme.md b/docs/en/swip/readme.md index 952bcf8af3d7..1f06ffc48558 100644 --- a/docs/en/swip/readme.md +++ b/docs/en/swip/readme.md @@ -82,6 +82,7 @@ Next SWIP Number: 15 - [SWIP-10 Support Envoy AI Gateway Observability](SWIP-10/SWIP.md) - [SWIP-9 Support Flink Monitoring](SWIP-9.md) - [SWIP-8 Support Kong Monitoring](SWIP-8.md) +- [SWIP-7 Support Apache Airflow Monitoring](SWIP-7.md) - [SWIP-6 Support ActiveMQ Monitoring](SWIP-6.md) - [SWIP-5 Support ClickHouse Monitoring](SWIP-5.md) - [SWIP-4 Support available layers of service in the topology](SWIP-4.md) diff --git a/docs/menu.yml b/docs/menu.yml index 169644a4d57b..542640bf65a5 100644 --- a/docs/menu.yml +++ b/docs/menu.yml @@ -152,6 +152,10 @@ catalog: path: "/en/setup/backend/backend-rocketmq-monitoring" - name: "ActiveMQ" path: "/en/setup/backend/backend-activemq-monitoring" + - name: "Workflow Scheduler" + catalog: + - name: "Airflow" + path: "/en/setup/backend/backend-airflow-monitoring" - name: "Data Processing Engine" catalog: - name: "Flink" diff --git a/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-instance.data.yaml b/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-instance.data.yaml new file mode 100644 index 000000000000..2a079a2bafdb --- /dev/null +++ b/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-instance.data.yaml @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +script: oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-instance.yaml +input: + airflow_pool_open_slots: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-worker-1 + pool_name: default_pool + value: 8.0 + airflow_pool_deferred_slots: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-worker-1 + pool_name: default_pool + value: 0.0 + airflow_pool_running_slots: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-worker-1 + pool_name: default_pool + value: 2.0 + airflow_pool_scheduled_slots: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-scheduler + pool_name: default_pool + value: 1.0 + airflow_triggerer_heartbeat: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-triggerer + value: 5.0 + airflow_triggers_blocked_main_thread: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-triggerer + value: 0.0 + airflow_triggers_failed: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-triggerer + value: 0.0 + airflow_triggers_succeeded: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-triggerer + value: 3.0 + airflow_scheduler_tasks_executable: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-scheduler + value: 5.0 + 
airflow_scheduler_orphaned_tasks_cleared: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-scheduler + value: 2.0 + airflow_scheduler_orphaned_tasks_adopted: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-scheduler + value: 1.0 + airflow_executor_queued_tasks: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-scheduler + value: 2.0 + airflow_executor_running_tasks: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-scheduler + value: 1.0 + airflow_dataset_updates: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-worker-1 + value: 4.0 + airflow_dataset_orphaned: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-scheduler + value: 1.0 + airflow_dataset_triggered_dagruns: + - labels: + cluster: airflow-cluster + job_name: Airflow + host_name: airflow-scheduler + value: 2.0 diff --git a/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-service.data.yaml b/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-service.data.yaml new file mode 100644 index 000000000000..a4149515d48d --- /dev/null +++ b/oap-server/analyzer/meter-analyzer-scripts-test/src/test/resources/scripts/mal/test-otel-rules/airflow/airflow-service.data.yaml @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +script: oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-service.yaml +input: + airflow_scheduler_tasks_executable: + - labels: + cluster: airflow-cluster + job_name: Airflow + value: 5.0 + airflow_executor_queued_tasks: + - labels: + cluster: airflow-cluster + job_name: Airflow + value: 3.0 + airflow_executor_running_tasks: + - labels: + cluster: airflow-cluster + job_name: Airflow + value: 2.0 + airflow_executor_open_slots: + - labels: + cluster: airflow-cluster + job_name: Airflow + value: 16.0 + airflow_pool_queued_slots: + - labels: + cluster: airflow-cluster + job_name: Airflow + pool_name: default_pool + value: 1.0 + airflow_pool_deferred_slots: + - labels: + cluster: airflow-cluster + job_name: Airflow + pool_name: default_pool + value: 0.0 + airflow_pool_scheduled_slots: + - labels: + cluster: airflow-cluster + job_name: Airflow + pool_name: default_pool + value: 1.0 + airflow_scheduler_heartbeat: + - labels: + cluster: airflow-cluster + job_name: Airflow + value: 10.0 + airflow_scheduler_orphaned_tasks_cleared: + - labels: + cluster: airflow-cluster + job_name: Airflow + value: 2.0 + airflow_scheduler_orphaned_tasks_adopted: + - labels: + cluster: airflow-cluster + job_name: Airflow + value: 1.0 + airflow_dag_processing_file_path_queue_size: + - labels: + cluster: airflow-cluster + job_name: Airflow + value: 4.0 + airflow_dataset_updates: + - labels: + cluster: airflow-cluster + job_name: Airflow + value: 7.0 diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java index bb702d5c1ab3..0f269c973604 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java @@ -298,6 +298,9 @@ public final class Layer { /** Alipay Mini Program monitoring via mini-program-monitor SDK */ public static final Layer ALIPAY_MINI_PROGRAM = register("ALIPAY_MINI_PROGRAM", 49, true); + /** Apache Airflow workflow orchestration (native OpenTelemetry metrics via OTel Collector). */ + public static final Layer AIRFLOW = register("AIRFLOW", 50, true); + private final String name; private final int value; /** diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index 985d22bd1a67..ec2d3d7bae4f 100644 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -390,7 +390,7 @@ receiver-otel: selector: ${SW_OTEL_RECEIVER:default} default: enabledHandlers: ${SW_OTEL_RECEIVER_ENABLED_HANDLERS:"otlp-traces,otlp-metrics,otlp-logs"} - enabledOtelMetricsRules: ${SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES:"apisix,nginx/*,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*,kafka/*,pulsar/*,bookkeeper/*,rocketmq/*,clickhouse/*,activemq/*,kong/*,flink/*,banyandb/*,envoy-ai-gateway/*,ios/*,miniprogram/*"} + enabledOtelMetricsRules: 
${SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES:"apisix,nginx/*,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*,kafka/*,pulsar/*,bookkeeper/*,rocketmq/*,clickhouse/*,activemq/*,kong/*,flink/*,airflow/*,banyandb/*,envoy-ai-gateway/*,ios/*,miniprogram/*"} receiver-zipkin: selector: ${SW_RECEIVER_ZIPKIN:-} diff --git a/oap-server/server-starter/src/main/resources/hierarchy-definition.yml b/oap-server/server-starter/src/main/resources/hierarchy-definition.yml index d8540809f683..fefff6914f4a 100644 --- a/oap-server/server-starter/src/main/resources/hierarchy-definition.yml +++ b/oap-server/server-starter/src/main/resources/hierarchy-definition.yml @@ -67,6 +67,9 @@ hierarchy: KONG: K8S_SERVICE: short-name + AIRFLOW: + K8S_SERVICE: short-name + VIRTUAL_DATABASE: MYSQL: lower-short-name-with-fqdn POSTGRESQL: lower-short-name-with-fqdn @@ -118,6 +121,7 @@ layer-levels: PULSAR: 2 ACTIVEMQ: 2 KONG: 2 + AIRFLOW: 2 MESH_DP: 1 CILIUM_SERVICE: 1 diff --git a/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-instance.yaml b/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-instance.yaml new file mode 100644 index 000000000000..00518d344773 --- /dev/null +++ b/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-instance.yaml @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Airflow component-level metrics from native OpenTelemetry export (SWIP-7). +# Instance identity uses resource `host.name` (scheduler / worker / triggerer hostname). +filter: "{ tags -> tags.job_name == 'Airflow' }" +expSuffix: tag({tags -> tags.cluster = 'airflow::' + tags.cluster}).instance(['cluster'], ['host_name'], Layer.AIRFLOW) +metricPrefix: meter_airflow_instance +metricsRules: + - name: pool_open_slots + exp: airflow_pool_open_slots.sum(['cluster', 'host_name', 'pool_name']) + - name: pool_deferred_slots + exp: airflow_pool_deferred_slots.sum(['cluster', 'host_name', 'pool_name']) + - name: pool_running_slots + exp: airflow_pool_running_slots.sum(['cluster', 'host_name', 'pool_name']) + - name: pool_scheduled_slots + exp: airflow_pool_scheduled_slots.sum(['cluster', 'host_name', 'pool_name']) + - name: triggerer_heartbeat + exp: airflow_triggerer_heartbeat.sum(['cluster', 'host_name']).increase('PT1M') + - name: triggers_blocked_main_thread + exp: airflow_triggers_blocked_main_thread.sum(['cluster', 'host_name']).increase('PT1M') + - name: triggers_failed + exp: airflow_triggers_failed.sum(['cluster', 'host_name']).increase('PT1M') + - name: triggers_succeeded + exp: airflow_triggers_succeeded.sum(['cluster', 'host_name']).increase('PT1M') + - name: scheduler_tasks_executable + exp: airflow_scheduler_tasks_executable.sum(['cluster', 'host_name']) + - name: scheduler_orphaned_tasks_cleared + exp: airflow_scheduler_orphaned_tasks_cleared.sum(['cluster', 'host_name']).increase('PT1M') + - name: scheduler_orphaned_tasks_adopted + exp: 
airflow_scheduler_orphaned_tasks_adopted.sum(['cluster', 'host_name']).increase('PT1M') + - name: executor_queued_tasks + exp: airflow_executor_queued_tasks.sum(['cluster', 'host_name']) + - name: executor_running_tasks + exp: airflow_executor_running_tasks.sum(['cluster', 'host_name']) + - name: asset_updates + exp: airflow_dataset_updates.sum(['cluster', 'host_name']).increase('PT1M') + - name: asset_orphaned + exp: airflow_dataset_orphaned.sum(['cluster', 'host_name']) + - name: asset_triggered_dagruns + exp: airflow_dataset_triggered_dagruns.sum(['cluster', 'host_name']).increase('PT1M') diff --git a/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-service.yaml b/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-service.yaml new file mode 100644 index 000000000000..cfa3bdd7e611 --- /dev/null +++ b/oap-server/server-starter/src/main/resources/otel-rules/airflow/airflow-service.yaml @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Airflow cluster-level metrics from native OpenTelemetry export (SWIP-7). +# Requires resource attribute `cluster` (set via OTEL_RESOURCE_ATTRIBUTES or Collector transform). 
+# OAP maps resource `service.name` (default "Airflow") to tag `job_name`. +filter: "{ tags -> tags.job_name == 'Airflow' }" +expSuffix: tag({tags -> tags.cluster = 'airflow::' + tags.cluster}).service(['cluster'], Layer.AIRFLOW) +metricPrefix: meter_airflow +metricsRules: + - name: scheduler_tasks_executable + exp: airflow_scheduler_tasks_executable.sum(['cluster']) + - name: executor_queued_tasks + exp: airflow_executor_queued_tasks.sum(['cluster']) + - name: executor_running_tasks + exp: airflow_executor_running_tasks.sum(['cluster']) + - name: executor_open_slots + exp: airflow_executor_open_slots.sum(['cluster']) + - name: pool_queued_slots + exp: airflow_pool_queued_slots.sum(['cluster', 'pool_name']) + - name: pool_deferred_slots + exp: airflow_pool_deferred_slots.sum(['cluster', 'pool_name']) + - name: pool_scheduled_slots + exp: airflow_pool_scheduled_slots.sum(['cluster', 'pool_name']) + - name: scheduler_heartbeat + exp: airflow_scheduler_heartbeat.sum(['cluster']).increase('PT1M') + - name: scheduler_orphaned_tasks_cleared + exp: airflow_scheduler_orphaned_tasks_cleared.sum(['cluster']).increase('PT1M') + - name: scheduler_orphaned_tasks_adopted + exp: airflow_scheduler_orphaned_tasks_adopted.sum(['cluster']).increase('PT1M') + - name: dag_file_queue_size + exp: airflow_dag_processing_file_path_queue_size.sum(['cluster']) + - name: asset_updates + exp: airflow_dataset_updates.sum(['cluster']).increase('PT1M') diff --git a/test/e2e-v2/cases/airflow/.gitignore b/test/e2e-v2/cases/airflow/.gitignore new file mode 100644 index 000000000000..29cc0478f61b --- /dev/null +++ b/test/e2e-v2/cases/airflow/.gitignore @@ -0,0 +1,6 @@ +# Runtime artifacts from local cluster e2e runs +cluster/logs/** +cluster/dags/__pycache__/ +cluster-e2e-report.txt +cluster-e2e-run.log +mock-e2e-report.txt diff --git a/test/e2e-v2/cases/airflow/Dockerfile.mock-sender b/test/e2e-v2/cases/airflow/Dockerfile.mock-sender new file mode 100644 index 000000000000..126896aba82e --- 
/dev/null +++ b/test/e2e-v2/cases/airflow/Dockerfile.mock-sender @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ARG PYTHON_IMAGE=python:3.11-slim +FROM ${PYTHON_IMAGE} + +WORKDIR /app + +RUN apt-get update \ + && apt-get install -y --no-install-recommends netcat-openbsd curl \ + && rm -rf /var/lib/apt/lists/* + +COPY scripts/requirements-replay.txt . +RUN pip install --no-cache-dir -r requirements-replay.txt + +COPY scripts/otlp_replay_server.py . + +EXPOSE 9093 + +CMD ["python", "otlp_replay_server.py"] diff --git a/test/e2e-v2/cases/airflow/README.md b/test/e2e-v2/cases/airflow/README.md new file mode 100644 index 000000000000..576ffa19c889 --- /dev/null +++ b/test/e2e-v2/cases/airflow/README.md @@ -0,0 +1,288 @@ +# Airflow monitoring e2e tests (SWIP-7) + +End-to-end tests for [SWIP-7](../../../../docs/en/swip/SWIP-7.md) Airflow monitoring. 
Two suites +share the same MAL rules but split responsibilities: + +| Suite | Entry | Checks | Airflow | CI matrix | +|-------|-------|--------|---------|-----------| +| **Mock (fast, full SWIP-7)** | [`e2e.yaml`](e2e.yaml) | **30** (2 topology + 28 metrics) | OTLP JSON replay via Python sidecar [`scripts/otlp_replay_server.py`](scripts/otlp_replay_server.py) + [`Dockerfile.mock-sender`](Dockerfile.mock-sender) | `Airflow` | +| **Real Celery cluster (integration smoke)** | [`e2e-cluster.yaml`](e2e-cluster.yaml) | **25** (2 topology + 23 metrics) | Live Airflow 2.10 CeleryExecutor | `Airflow Cluster` | + +Query definitions: [`airflow-cases.yaml`](airflow-cases.yaml) (mock, full matrix) and +[`airflow-cluster-cases.yaml`](airflow-cluster-cases.yaml) (cluster smoke). MAL rules: +[`otel-rules/airflow/`](../../../../oap-server/server-starter/src/main/resources/otel-rules/airflow/). + +## Real cluster topology + +``` +Airflow (scheduler + worker-1 + worker-2 + triggerer) + │ OTLP HTTP :4318 + ▼ +OpenTelemetry Collector + │ OTLP gRPC + ▼ +SkyWalking OAP (BanyanDB) + ▼ +swctl verify (GraphQL / MQE) +``` + +| Entity | Value in cluster e2e | +|--------|----------------------| +| Service | `airflow::airflow-e2e-cluster` | +| Instances verified (`host.name`) | `airflow-scheduler`, `airflow-worker-1`, `airflow-triggerer` | +| Compose project | `skywalking_e2e` (see [`scripts/cluster-compose-env.sh`](scripts/cluster-compose-env.sh)) | +| Compose file | [`docker-compose-cluster.yml`](docker-compose-cluster.yml) | + +Compose still runs **four** Airflow processes (scheduler, worker-1, worker-2, triggerer) for a +realistic Celery layout. Verify expects **three** OTLP-exporting instances: worker-2 executes +tasks but does not run the e2e Celery sidecar and is not required in the cluster smoke matrix. +The mock suite uses the same three-host OTLP fixture with identical instance metric bindings. 
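As a rough illustration of how the entity names in the table above come about, the Python sketch below mimics what the `filter` and `expSuffix` of the MAL rules do to a sample's tags: samples whose `job_name` is not `Airflow` are dropped, the service name becomes `airflow::` plus the `cluster` tag, and the instance name is taken from `host_name`. The function name `to_entity` and the dict-based tag shape are hypothetical; OAP evaluates the real rules itself.

```python
from typing import Optional, Tuple


def to_entity(tags: dict) -> Optional[Tuple[str, Optional[str]]]:
    """Return (service, instance) for one sample, or None if it is filtered out."""
    # Mirrors filter: "{ tags -> tags.job_name == 'Airflow' }"
    if tags.get("job_name") != "Airflow":
        return None
    # Mirrors tag({tags -> tags.cluster = 'airflow::' + tags.cluster})
    service = "airflow::" + tags["cluster"]
    # Instance rules key on host_name; service-level samples may not carry it.
    return service, tags.get("host_name")


if __name__ == "__main__":
    sample = {
        "job_name": "Airflow",
        "cluster": "airflow-e2e-cluster",
        "host_name": "airflow-worker-1",
    }
    print(to_entity(sample))
```

Under these assumptions, a sample exported by worker-1 in the cluster suite maps to service `airflow::airflow-e2e-cluster` and instance `airflow-worker-1`, which matches the names the verify step queries.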
+ +### Workload seeding + +[`scripts/seed-e2e-cluster-workload.sh`](scripts/seed-e2e-cluster-workload.sh) triggers DAG runs +3 rounds (default), then waits **240 s** for OTLP export and MAL aggregation: + +- Custom: `cluster_smoke`, `cluster_load` ([`cluster/dags/`](cluster/dags/)) +- Built-in examples: `example_bash_operator`, `example_python_operator`, `example_branch_operator`, + `example_short_circuit_operator` +- `LOAD_EXAMPLES=true` loads the full Airflow example DAG set for scheduler activity + +Environment overrides: `SEED_ROUNDS`, `SEED_INTERVAL_SECONDS`, `RUN_SECONDS`. + +### OTLP metric sources (real cluster) + +| Source | Hosts | Instruments | +|--------|-------|-------------| +| **Airflow native OTel** (`AIRFLOW__METRICS__OTEL_ON`) | scheduler, triggerer, workers | Scheduler / executor / pool (scheduler) / heartbeat / orphaned-task / DAG-processing / dataset / triggers (when deferrable + dataset DAGs run) | +| **E2e-only Celery sidecar** [`worker_otel_reporter.py`](cluster/scripts/worker_otel_reporter.py) | `airflow-worker-1` only | `airflow.pool.{open,running,deferred}_slots` — Airflow 2.10 Celery workers do not emit pool gauges natively; values are derived from `celery inspect active`, not Airflow Stats | + +Dedicated e2e DAGs (native OTel, no reporter): + +| DAG | Purpose | +|-----|---------| +| [`e2e_deferrable.py`](cluster/dags/e2e_deferrable.py) | `TimeDeltaSensorAsync` → triggerer exports `triggers_*` counters | +| [`e2e_dataset.py`](cluster/dags/e2e_dataset.py) | `e2e_dataset_producer` / `e2e_dataset_consumer` — scheduler `asset_orphaned` / `asset_triggered_dagruns` in cluster smoke (`asset_updates` is mock-only) | + +`meter_airflow_instance_asset_orphaned` is verified in the cluster smoke when Airflow emits the +gauge (0 when no orphans). Metrics that need synthetic OTLP or rare failure events are **mock-only** +(see [Coverage split](#coverage-split) below). 
+ +## Coverage split + +| Concern | Mock (`airflow-cases.yaml`) | Cluster (`airflow-cluster-cases.yaml`) | +|---------|----------------------------|----------------------------------------| +| **Goal** | Full SWIP-7 MAL/MQE contract | Real Airflow → OTel → OAP integration | +| **Topology** | 3 instances | 3 OTLP-exporting instances | +| **Service metrics** | 12 | 10 (excludes `asset_updates`, `pool_queued_slots`) | +| **Instance metrics** | 16 | 13 (excludes `asset_updates`, `triggers_failed`, `triggers_blocked_main_thread`) | +| **Total checks** | **30** | **25** | + +**Mock-only metrics** (synthetic OTLP in [`mock-data/otel-airflow-metrics.json`](mock-data/otel-airflow-metrics.json)): + +| Metric | Reason cluster omits it | +|--------|-------------------------| +| `meter_airflow_asset_updates` (service + instance) | Dataset producer timing is hard to stabilize; mock sender injects `airflow.dataset.updates` | +| `meter_airflow_pool_queued_slots` (service) | Scheduler-only OTLP; default pool rarely queues under e2e load — unstable without synthetic OTLP | +| `meter_airflow_instance_triggers_failed` | Airflow may not export the counter when no triggers fail (`null`, not `0`) | +| `meter_airflow_instance_triggers_blocked_main_thread` | Same — absent unless a trigger blocks the main thread | + +Cluster still verifies `triggers_succeeded` on the triggerer (deferrable DAG smoke) plus +`asset_orphaned` / `asset_triggered_dagruns` when the scheduler emits them. + +## Test coverage matrix (full SWIP-7 — mock suite) + +Each row is one `swctl metrics exec` assertion. Expected template: +[`expected/metrics-has-value.yml`](expected/metrics-has-value.yml) (non-null numeric time series; +`0` is valid). The cluster smoke uses the same expressions except for the mock-only rows below. 
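The pass criterion behind `expected/metrics-has-value.yml` can be sketched in Python as follows. The result shape used here (a `results` list whose entries carry a `values` list of points with a `value` field) is an assumption about the parsed `swctl metrics exec` output, and `has_value` is a hypothetical helper, not part of swctl or infra-e2e.

```python
def _is_number(raw) -> bool:
    """True for ints, floats, or numeric strings; False for None, '' or bools."""
    if raw is None or isinstance(raw, bool):
        return False
    try:
        float(raw)
        return True
    except (TypeError, ValueError):
        return False


def has_value(result: dict) -> bool:
    """Pass if any point in any series holds a non-null number (0 counts)."""
    for series in result.get("results") or []:
        for point in series.get("values") or []:
            if _is_number(point.get("value")):
                return True
    return False


if __name__ == "__main__":
    parsed = {"results": [{"values": [{"id": "1", "value": None}, {"id": "2", "value": "0"}]}]}
    print(has_value(parsed))
```

Treating `0` as a pass matters for counters such as `scheduler_orphaned_tasks_cleared`, which can legitimately stay at zero during a healthy run.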
+ +### Topology (2 checks) + +| # | Query | Expected | +|---|-------|----------| +| 1 | `swctl service ly AIRFLOW` | Service `airflow::airflow-e2e-cluster`, layer `AIRFLOW` — [`expected/service-cluster.yml`](expected/service-cluster.yml) | +| 2 | `swctl instance ls --service-name=airflow::airflow-e2e-cluster` | 3 instances — [`expected/instance.yml`](expected/instance.yml) | + +Mock suite uses `airflow::airflow-cluster` — [`expected/service.yml`](expected/service.yml), +[`expected/instance.yml`](expected/instance.yml) (same 3 hosts). Cluster verify matches mock +instance bindings; worker-2 remains in compose for Celery realism but is not an e2e assertion. + +### Service metrics (12) + +| # | MQE expression | Airflow OTel instrument (MAL) | Cluster smoke | +|---|----------------|----------------------------------|---------------| +| 1 | `meter_airflow_scheduler_tasks_executable` | `airflow.scheduler.tasks_executable` | yes | +| 2 | `meter_airflow_executor_queued_tasks` | `airflow.executor.queued_tasks` | yes | +| 3 | `meter_airflow_executor_running_tasks` | `airflow.executor.running_tasks` | yes | +| 4 | `meter_airflow_executor_open_slots` | `airflow.executor.open_slots` | yes | +| 5 | `meter_airflow_pool_queued_slots` | `airflow.pool.queued_slots` | mock only | +| 6 | `meter_airflow_pool_deferred_slots` | `airflow.pool.deferred_slots` | yes | +| 7 | `meter_airflow_pool_scheduled_slots` | `airflow.pool.scheduled_slots` | yes | +| 8 | `meter_airflow_scheduler_heartbeat` | `airflow.scheduler.heartbeat` | yes | +| 9 | `meter_airflow_scheduler_orphaned_tasks_cleared` | `airflow.scheduler.orphaned_tasks_cleared` | yes | +| 10 | `meter_airflow_scheduler_orphaned_tasks_adopted` | `airflow.scheduler.orphaned_tasks_adopted` | yes | +| 11 | `meter_airflow_dag_file_queue_size` | `airflow.dag_processing.file_path_queue_size` | yes | +| 12 | `meter_airflow_asset_updates` | `airflow.dataset.updates` | mock only | + +### Instance metrics (16) + +Instance-scoped queries use 
`--instance-name={host.name}`. + +| # | MQE expression | Scoped instance | Cluster smoke | +|---|----------------|-----------------|---------------| +| 1 | `meter_airflow_instance_pool_open_slots` | `airflow-worker-1` | yes (e2e Celery sidecar) | +| 2 | `meter_airflow_instance_pool_deferred_slots` | `airflow-worker-1` | yes (e2e Celery sidecar) | +| 3 | `meter_airflow_instance_pool_running_slots` | `airflow-worker-1` | yes (e2e Celery sidecar) | +| 4 | `meter_airflow_instance_pool_scheduled_slots` | `airflow-scheduler` | yes | +| 5 | `meter_airflow_instance_triggerer_heartbeat` | `airflow-triggerer` | yes | +| 6 | `meter_airflow_instance_triggers_blocked_main_thread` | `airflow-triggerer` | mock only | +| 7 | `meter_airflow_instance_triggers_failed` | `airflow-triggerer` | mock only | +| 8 | `meter_airflow_instance_triggers_succeeded` | `airflow-triggerer` | yes | +| 9 | `meter_airflow_instance_scheduler_tasks_executable` | `airflow-scheduler` | yes | +| 10 | `meter_airflow_instance_scheduler_orphaned_tasks_cleared` | `airflow-scheduler` | yes | +| 11 | `meter_airflow_instance_scheduler_orphaned_tasks_adopted` | `airflow-scheduler` | yes | +| 12 | `meter_airflow_instance_executor_queued_tasks` | `airflow-scheduler` | yes | +| 13 | `meter_airflow_instance_executor_running_tasks` | `airflow-scheduler` | yes | +| 14 | `meter_airflow_instance_asset_updates` | `airflow-worker-1` | mock only | +| 15 | `meter_airflow_instance_asset_orphaned` | `airflow-scheduler` | yes | +| 16 | `meter_airflow_instance_asset_triggered_dagruns` | `airflow-scheduler` | yes | + +**Mock total: 30 checks** (2 topology + 28 metrics) = full SWIP-7 panel set. +**Cluster total: 25 checks** (2 topology + 23 metrics) = integration smoke. + +Both suites use the same instance bindings for triggerer (`airflow-triggerer`), dataset +orphan/triggered DagRuns (`airflow-scheduler`), and worker pool gauges (`airflow-worker-1`). 
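The totals above follow from simple counting. The Python sketch below recomputes them from the metric counts and mock-only names given in this README; the variable names are illustrative only.

```python
TOPOLOGY_CHECKS = 2
MOCK_SERVICE_METRICS = 12
MOCK_INSTANCE_METRICS = 16

# Metrics the cluster smoke skips, as listed under "Coverage split".
MOCK_ONLY_SERVICE = {"asset_updates", "pool_queued_slots"}
MOCK_ONLY_INSTANCE = {
    "asset_updates",
    "triggers_failed",
    "triggers_blocked_main_thread",
}

mock_total = TOPOLOGY_CHECKS + MOCK_SERVICE_METRICS + MOCK_INSTANCE_METRICS
cluster_service = MOCK_SERVICE_METRICS - len(MOCK_ONLY_SERVICE)
cluster_instance = MOCK_INSTANCE_METRICS - len(MOCK_ONLY_INSTANCE)
cluster_total = TOPOLOGY_CHECKS + cluster_service + cluster_instance

print(mock_total, cluster_service, cluster_instance, cluster_total)  # 30 10 13 25
```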
+ +## Running locally + +### Prerequisites + +- Docker / Docker Compose +- Git Bash on Windows (use `/usr/bin/bash`, not MSYS-only shell) +- Go (for `swctl` install via setup scripts) + +### Real cluster — one command + +```bash +cd /path/to/skywalking +export OTEL_COLLECTOR_VERSION=0.102.1 SW_AGENT_JDK_VERSION=8 +chmod +x test/e2e-v2/cases/airflow/scripts/*.sh +/usr/bin/bash test/e2e-v2/cases/airflow/scripts/run-full-cluster-e2e.sh +``` + +Steps inside: `compose up` → install tools → wait for scheduler → seed workload → verify. + +### Real cluster — incremental + +```bash +set -a && source test/e2e-v2/script/env && set +a +source test/e2e-v2/cases/airflow/scripts/cluster-compose-env.sh +dc up -d +/usr/bin/bash test/e2e-v2/cases/airflow/scripts/run-cluster-setup.sh +/usr/bin/bash test/e2e-v2/cases/airflow/scripts/verify-cluster-e2e.sh +``` + +### Mock suite (infra-e2e) + +```bash +# From repo root, with e2e CLI installed +e2e run -c test/e2e-v2/cases/airflow/e2e.yaml +``` + +### Verify tuning + +| Variable | Default | Purpose | +|----------|---------|---------| +| `VERIFY_RETRIES` | `18` | Poll attempts per check | +| `VERIFY_INTERVAL_SECONDS` | `10` | Sleep between attempts | +| `VERIFY_REPORT` | `test/e2e-v2/cases/airflow/cluster-e2e-report.txt` | Report output path | + +### Windows notes + +- Ensure `test/e2e-v2/script/env` uses **LF** line endings (CRLF breaks BanyanDB image tags). +- OAP port is **dynamic** (`12800` without host binding in compose); scripts resolve it via + `docker compose port`. +- Use `run-full-cluster-e2e.sh` + `verify-cluster-e2e.sh` instead of raw `e2e run` verify on + Windows (`${oap_12800}` substitution issue in infra-e2e). +- Setup steps do not persist `PATH` between infra-e2e steps — `run-cluster-setup.sh` merges + tool install, health wait, and workload seed into one script. + +## Verification report + +Each cluster verify run writes a line-oriented report to +`cluster-e2e-report.txt` (overwritten, gitignored). 
Full compose logs from local runs may be +captured in `cluster-e2e-run.log` (gitignored). + +### Report format + +``` +=== Airflow cluster e2e verify (integration smoke) === +time: +compose project: skywalking_e2e +OAP GraphQL: http://localhost:/graphql + + PASS|FAIL: + detail: + +=== Summary === +PASS: FAIL: TOTAL: 25 +Report: test/e2e-v2/cases/airflow/cluster-e2e-report.txt +``` + +Pass criteria per metric: `swctl metrics exec` returns `TIME_SERIES_VALUES` with at least one +point whose `value` is a non-null number (zero counts as pass). + +### Cluster smoke checklist (25 checks) + +**Topology (2)** + +- service ly AIRFLOW → `airflow::airflow-e2e-cluster` +- instances: scheduler, worker-1, triggerer + +**Service metrics (10)** — excludes `meter_airflow_asset_updates`, `meter_airflow_pool_queued_slots` + +**Instance metrics (13)** — excludes `asset_updates`, `triggers_failed`, `triggers_blocked_main_thread` + +Full SWIP-7 (30 checks) baseline is the mock suite — see `mock-e2e-report.txt` (gitignored). + +
+Historical full cluster run (30 checks, superseded by split above) + +The 2026-06-02 run passed 30/30 before the mock/cluster split, but several checks were flaky on real +Airflow without synthetic OTLP. The current cluster scope is the 25-check integration smoke. + +
+ +## File reference + +| Path | Role | +|------|------| +| [`e2e.yaml`](e2e.yaml) | Mock suite entry (CI `Airflow`) | +| [`e2e-cluster.yaml`](e2e-cluster.yaml) | Real cluster entry (CI `Airflow Cluster`, timeout 35m) | +| [`docker-compose.yml`](docker-compose.yml) | Mock stack (OAP + mock sender) | +| [`scripts/otlp_replay_server.py`](scripts/otlp_replay_server.py) | Python OTLP JSON replay sidecar (supports `increase('PT1M')` metrics; built via `Dockerfile.mock-sender`) | +| [`docker-compose-cluster.yml`](docker-compose-cluster.yml) | Real Airflow Celery stack | +| [`otel-collector-config.yaml`](otel-collector-config.yaml) | Collector → OAP pipeline | +| [`mock-data/otel-airflow-metrics.json`](mock-data/otel-airflow-metrics.json) | Mock OTLP payload | +| [`scripts/run-full-cluster-e2e.sh`](scripts/run-full-cluster-e2e.sh) | Local end-to-end driver | +| [`scripts/run-cluster-setup.sh`](scripts/run-cluster-setup.sh) | Tools + health + workload | +| [`scripts/verify-cluster-e2e.sh`](scripts/verify-cluster-e2e.sh) | Cluster integration smoke (25 swctl checks) | +| [`scripts/wait-scheduler-healthy.sh`](scripts/wait-scheduler-healthy.sh) | Scheduler health gate | +| `cluster-e2e-report.txt` | Generated verify report (gitignored) | + +## CI + +GitHub Actions matrix (`.github/workflows/skywalking.yaml`): + +- **Airflow** — `test/e2e-v2/cases/airflow/e2e.yaml` +- **Airflow Cluster** — `test/e2e-v2/cases/airflow/e2e-cluster.yaml` + +Cluster job uses infra-e2e `verify` with [`airflow-cluster-cases.yaml`](airflow-cluster-cases.yaml) +(Linux; `${oap_host}:${oap_12800}` substitution works). Local Windows runs use +[`verify-cluster-e2e.sh`](scripts/verify-cluster-e2e.sh) instead. 
+ +## Related docs + +- [Airflow monitoring setup](../../../../docs/en/setup/backend/backend-airflow-monitoring.md) +- [SWIP-7 proposal](../../../../docs/en/swip/SWIP-7.md) +- [E2E test guide](../../CLAUDE.md) diff --git a/test/e2e-v2/cases/airflow/airflow-cases.yaml b/test/e2e-v2/cases/airflow/airflow-cases.yaml new file mode 100644 index 000000000000..92959b192f89 --- /dev/null +++ b/test/e2e-v2/cases/airflow/airflow-cases.yaml @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +cases: + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ly AIRFLOW + expected: expected/service.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance ls --service-name=airflow::airflow-cluster + expected: expected/instance.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_scheduler_tasks_executable --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_executor_queued_tasks --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_executor_running_tasks --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_executor_open_slots --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_pool_queued_slots --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_pool_deferred_slots --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_pool_scheduled_slots --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_scheduler_heartbeat --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_scheduler_orphaned_tasks_cleared --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_scheduler_orphaned_tasks_adopted --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_dag_file_queue_size --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_asset_updates --service-name=airflow::airflow-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_pool_open_slots --service-name=airflow::airflow-cluster --instance-name=airflow-worker-1 + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_pool_deferred_slots --service-name=airflow::airflow-cluster --instance-name=airflow-worker-1 + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_pool_running_slots --service-name=airflow::airflow-cluster --instance-name=airflow-worker-1 + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_pool_scheduled_slots --service-name=airflow::airflow-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_triggerer_heartbeat --service-name=airflow::airflow-cluster --instance-name=airflow-triggerer + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_triggers_blocked_main_thread --service-name=airflow::airflow-cluster --instance-name=airflow-triggerer + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_triggers_failed --service-name=airflow::airflow-cluster --instance-name=airflow-triggerer + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_triggers_succeeded --service-name=airflow::airflow-cluster --instance-name=airflow-triggerer + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_scheduler_tasks_executable --service-name=airflow::airflow-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_scheduler_orphaned_tasks_cleared --service-name=airflow::airflow-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec 
--expression=meter_airflow_instance_scheduler_orphaned_tasks_adopted --service-name=airflow::airflow-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_executor_queued_tasks --service-name=airflow::airflow-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_executor_running_tasks --service-name=airflow::airflow-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_asset_updates --service-name=airflow::airflow-cluster --instance-name=airflow-worker-1 + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_asset_orphaned --service-name=airflow::airflow-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_asset_triggered_dagruns --service-name=airflow::airflow-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml diff --git a/test/e2e-v2/cases/airflow/airflow-cluster-cases.yaml b/test/e2e-v2/cases/airflow/airflow-cluster-cases.yaml new file mode 100644 index 000000000000..38dd07c14fd7 --- /dev/null +++ b/test/e2e-v2/cases/airflow/airflow-cluster-cases.yaml @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Real Celery cluster integration smoke test (service airflow-e2e-cluster, 25 checks). +# The full SWIP-7 matrix (30 checks) lives in airflow-cases.yaml (mock OTLP replay). +# The cluster run omits metrics that are flaky without synthetic OTLP: asset_updates (service and instance), +# pool_queued_slots, triggers_blocked_main_thread, triggers_failed. The instance topology uses the three +# hosts that export OTLP in this compose layout (scheduler, worker-1, triggerer).
+ +cases: + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ly AIRFLOW + expected: expected/service-cluster.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance ls --service-name=airflow::airflow-e2e-cluster + expected: expected/instance.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_scheduler_tasks_executable --service-name=airflow::airflow-e2e-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_executor_queued_tasks --service-name=airflow::airflow-e2e-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_executor_running_tasks --service-name=airflow::airflow-e2e-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_executor_open_slots --service-name=airflow::airflow-e2e-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_pool_deferred_slots --service-name=airflow::airflow-e2e-cluster + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_pool_scheduled_slots --service-name=airflow::airflow-e2e-cluster + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_scheduler_heartbeat --service-name=airflow::airflow-e2e-cluster + expected: expected/metrics-has-value.yml + - query: swctl 
--display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_scheduler_orphaned_tasks_cleared --service-name=airflow::airflow-e2e-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_scheduler_orphaned_tasks_adopted --service-name=airflow::airflow-e2e-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_dag_file_queue_size --service-name=airflow::airflow-e2e-cluster + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_pool_open_slots --service-name=airflow::airflow-e2e-cluster --instance-name=airflow-worker-1 + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_pool_deferred_slots --service-name=airflow::airflow-e2e-cluster --instance-name=airflow-worker-1 + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_pool_running_slots --service-name=airflow::airflow-e2e-cluster --instance-name=airflow-worker-1 + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_pool_scheduled_slots --service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value-label-poolname.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_triggerer_heartbeat 
--service-name=airflow::airflow-e2e-cluster --instance-name=airflow-triggerer + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_triggers_succeeded --service-name=airflow::airflow-e2e-cluster --instance-name=airflow-triggerer + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_scheduler_tasks_executable --service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_scheduler_orphaned_tasks_cleared --service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_scheduler_orphaned_tasks_adopted --service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_executor_queued_tasks --service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_executor_running_tasks --service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_asset_orphaned --service-name=airflow::airflow-e2e-cluster 
--instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics exec --expression=meter_airflow_instance_asset_triggered_dagruns --service-name=airflow::airflow-e2e-cluster --instance-name=airflow-scheduler + expected: expected/metrics-has-value.yml diff --git a/test/e2e-v2/cases/airflow/cluster/dags/cluster_load.py b/test/e2e-v2/cases/airflow/cluster/dags/cluster_load.py new file mode 100644 index 000000000000..15d5af6fc33e --- /dev/null +++ b/test/e2e-v2/cases/airflow/cluster/dags/cluster_load.py @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime + +from airflow import DAG +from airflow.operators.bash import BashOperator + +# Sustained load for real-cluster e2e (queued / running / scheduled gauges). 
+with DAG( + dag_id="cluster_load", + start_date=datetime(2024, 1, 1), + schedule=None, + catchup=False, + tags=["swip7", "e2e", "load"], + max_active_runs=3, +) as dag: + for index in range(1, 9): + BashOperator( + task_id=f"sleep_{index}", + bash_command=f"echo load-{index}-start && sleep 90 && echo load-{index}-done", + ) diff --git a/test/e2e-v2/cases/airflow/cluster/dags/cluster_smoke.py b/test/e2e-v2/cases/airflow/cluster/dags/cluster_smoke.py new file mode 100644 index 000000000000..0ff309c1ec51 --- /dev/null +++ b/test/e2e-v2/cases/airflow/cluster/dags/cluster_smoke.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.bash import BashOperator + +with DAG( + dag_id="cluster_smoke", + start_date=datetime(2024, 1, 1), + schedule=timedelta(minutes=1), + catchup=False, + tags=["swip7", "e2e"], +) as dag: + BashOperator( + task_id="ping", + bash_command="echo cluster-smoke-$(hostname)", + ) diff --git a/test/e2e-v2/cases/airflow/cluster/dags/e2e_dataset.py b/test/e2e-v2/cases/airflow/cluster/dags/e2e_dataset.py new file mode 100644 index 000000000000..7523dfaef2ec --- /dev/null +++ b/test/e2e-v2/cases/airflow/cluster/dags/e2e_dataset.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from datetime import datetime + +from airflow import DAG +from airflow.datasets import Dataset +from airflow.operators.bash import BashOperator + +E2E_DATASET = Dataset("file:///tmp/swip7-e2e-dataset") + +with DAG( + dag_id="e2e_dataset_producer", + start_date=datetime(2024, 1, 1), + schedule=None, + catchup=False, + tags=["swip7", "e2e", "dataset"], +) as producer_dag: + BashOperator( + task_id="produce", + bash_command="echo swip7-dataset-produce", + outlets=[E2E_DATASET], + ) + +with DAG( + dag_id="e2e_dataset_consumer", + start_date=datetime(2024, 1, 1), + schedule=[E2E_DATASET], + catchup=False, + tags=["swip7", "e2e", "dataset"], +) as consumer_dag: + BashOperator( + task_id="consume", + bash_command="echo swip7-dataset-consume", + ) diff --git a/test/e2e-v2/cases/airflow/cluster/dags/e2e_deferrable.py b/test/e2e-v2/cases/airflow/cluster/dags/e2e_deferrable.py new file mode 100644 index 000000000000..d03baaa7f69a --- /dev/null +++ b/test/e2e-v2/cases/airflow/cluster/dags/e2e_deferrable.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.sensors.time_delta import TimeDeltaSensorAsync + +# Deferrable sensor so that the triggerer exports native triggers_* OTel counters. +with DAG( + dag_id="e2e_deferrable", + start_date=datetime(2024, 1, 1), + schedule=None, + catchup=False, + tags=["swip7", "e2e", "deferrable"], +) as dag: + TimeDeltaSensorAsync( + task_id="defer_wait", + delta=timedelta(seconds=45), + ) diff --git a/test/e2e-v2/cases/airflow/cluster/scripts/worker_otel_reporter.py b/test/e2e-v2/cases/airflow/cluster/scripts/worker_otel_reporter.py new file mode 100644 index 000000000000..15f6f11397e8 --- /dev/null +++ b/test/e2e-v2/cases/airflow/cluster/scripts/worker_otel_reporter.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Airflow 2.10 Celery workers do not emit pool gauges via native OTel; this script approximates them from Celery +# active tasks, for e2e only (see test/e2e-v2/cases/airflow/README.md).
+import os +import socket +import subprocess +import time + +from opentelemetry import metrics +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.metrics import Observation +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource + +HOST = os.environ.get("HOSTNAME") or socket.gethostname() +CLUSTER = os.environ.get("AIRFLOW_CLUSTER", "airflow-e2e-cluster") +ENDPOINT = os.environ.get( + "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", + "http://otel-collector:4318/v1/metrics", +) +INTERVAL_MS = int(os.environ.get("OTEL_METRIC_EXPORT_INTERVAL", "30000")) +OPEN_SLOTS = float(os.environ.get("WORKER_OTEL_OPEN_SLOTS", "8")) # assumed slot capacity of this worker + + +def _celery_active_tasks() -> float: # tries the provider app path first, then the legacy one + cmds = [ + [ + "celery", + "-A", + "airflow.providers.celery.executors.celery_executor.app", + "inspect", + "active", + "-d", + f"celery@{HOST}", + ], + [ + "celery", + "-A", + "airflow.executors.celery_executor.app", + "inspect", + "active", + "-d", + f"celery@{HOST}", + ], + ] + for cmd in cmds: + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=10, + check=False, + ) + if proc.returncode != 0: + continue + count = proc.stdout.count("'name':") # heuristic: count task entries by their 'name' key + if count == 0: + count = proc.stdout.count('"name":') # same heuristic for JSON-style output + return float(count) + except (OSError, subprocess.SubprocessError): + continue + return 0.0 # neither app path answered; report zero active tasks + + +def pool_open_slots(_options): + active = _celery_active_tasks() + yield Observation(max(OPEN_SLOTS - active, 0.0), {"pool_name": "default_pool"}) + + +def pool_running_slots(_options): + yield Observation(_celery_active_tasks(), {"pool_name": "default_pool"}) + + +def pool_deferred_slots(_options): + yield Observation(0.0, {"pool_name": "default_pool"}) # placeholder: always 0 in this e2e approximation + + +def main(): + resource = Resource.create( + { + HOST_NAME: HOST, + SERVICE_NAME: "Airflow", + "cluster": CLUSTER, + } + ) + reader =
PeriodicExportingMetricReader( + OTLPMetricExporter(endpoint=ENDPOINT), + export_interval_millis=INTERVAL_MS, + ) + metrics.set_meter_provider( + MeterProvider(resource=resource, metric_readers=[reader]) + ) + meter = metrics.get_meter("airflow.worker.reporter") + meter.create_observable_gauge("airflow.pool.open_slots", callbacks=[pool_open_slots]) + meter.create_observable_gauge( + "airflow.pool.running_slots", callbacks=[pool_running_slots] + ) + meter.create_observable_gauge( + "airflow.pool.deferred_slots", callbacks=[pool_deferred_slots] + ) + while True: + time.sleep(INTERVAL_MS / 1000.0) + + +if __name__ == "__main__": + main() diff --git a/test/e2e-v2/cases/airflow/docker-compose-cluster.yml b/test/e2e-v2/cases/airflow/docker-compose-cluster.yml new file mode 100644 index 000000000000..3c127908111d --- /dev/null +++ b/test/e2e-v2/cases/airflow/docker-compose-cluster.yml @@ -0,0 +1,238 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Real Airflow Celery cluster e2e (4 metric nodes: scheduler + 2 workers + triggerer). +# Airflow -> OTel Collector -> OAP (BanyanDB). Pair with e2e-cluster.yaml. 
+ +x-airflow-otel-env: &airflow-otel-env + _PIP_ADDITIONAL_REQUIREMENTS: "apache-airflow[otel] opentelemetry-exporter-otlp-proto-http" + AIRFLOW__METRICS__OTEL_ON: "True" + AIRFLOW__METRICS__OTEL_SERVICE: "Airflow" + AIRFLOW__METRICS__OTEL_HOST: otel-collector + AIRFLOW__METRICS__OTEL_PORT: "4318" + AIRFLOW__METRICS__OTEL_SSL_ACTIVE: "False" + OTEL_SERVICE_NAME: "Airflow" + OTEL_EXPORTER_OTLP_PROTOCOL: http/protobuf + OTEL_METRICS_EXPORTER: otlp + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: http://otel-collector:4318/v1/metrics + OTEL_METRIC_EXPORT_INTERVAL: "30000" + AIRFLOW_CLUSTER: "airflow-e2e-cluster" + +x-airflow-common: &airflow-common + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.5-python3.11} + environment: &airflow-common-env + <<: *airflow-otel-env + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 + AIRFLOW__CORE__FERNET_KEY: "" + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "false" + AIRFLOW__CORE__LOAD_EXAMPLES: "true" + AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session" + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true" + volumes: + - ./cluster/dags:/opt/airflow/dags + - ./cluster/logs:/opt/airflow/logs + - ./cluster/config:/opt/airflow/config + - ./cluster/plugins:/opt/airflow/plugins + - ./cluster/scripts:/opt/airflow/scripts + user: "${AIRFLOW_UID:-50000}:0" + depends_on: &airflow-common-depends-on + redis: + condition: service_healthy + postgres: + condition: service_healthy + networks: + - e2e + +services: + oap: + extends: + file: ../../script/docker-compose/base-compose.yml + service: oap + ports: + - 12800 + environment: + SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES: "airflow/*" + networks: + - e2e + depends_on: + banyandb: + condition: service_healthy + + banyandb: + extends: + 
file: ../../script/docker-compose/base-compose.yml + service: banyandb + ports: + - 17912 + networks: + e2e: + aliases: + - banyandb + + otel-collector: + image: otel/opentelemetry-collector:${OTEL_COLLECTOR_VERSION} + command: ["--config=/etc/otel-collector-config.yaml"] + volumes: + - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro + depends_on: + oap: + condition: service_healthy + networks: + - e2e + + postgres: + image: postgres:13 + environment: + POSTGRES_USER: airflow + POSTGRES_PASSWORD: airflow + POSTGRES_DB: airflow + healthcheck: + test: ["CMD", "pg_isready", "-U", "airflow"] + interval: 10s + retries: 5 + start_period: 5s + networks: + - e2e + + redis: + image: redis:7.2-bookworm + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 30s + retries: 50 + start_period: 30s + networks: + - e2e + + airflow-init: + <<: *airflow-common + entrypoint: /bin/bash + command: + - -c + - | + mkdir -p /opt/airflow/logs /opt/airflow/dags /opt/airflow/plugins /opt/airflow/config + chown -R "${AIRFLOW_UID:-50000}:0" /opt/airflow/logs /opt/airflow/dags /opt/airflow/plugins /opt/airflow/config || true + airflow db migrate + airflow users create --username "$${_AIRFLOW_WWW_USER_USERNAME}" --password "$${_AIRFLOW_WWW_USER_PASSWORD}" --firstname Admin --lastname User --role Admin --email admin@example.com || true + exec airflow version + environment: + <<: *airflow-common-env + _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-admin} + _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-admin} + user: "0:0" + depends_on: + <<: *airflow-common-depends-on + + airflow-scheduler: + <<: *airflow-common + hostname: airflow-scheduler + command: scheduler + environment: + <<: *airflow-common-env + OTEL_RESOURCE_ATTRIBUTES: "cluster=airflow-e2e-cluster,service.name=Airflow,host.name=airflow-scheduler" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] + interval: 30s + timeout: 10s + retries: 5 + 
start_period: 90s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + otel-collector: + condition: service_started + + airflow-triggerer: + <<: *airflow-common + hostname: airflow-triggerer + command: triggerer + environment: + <<: *airflow-common-env + OTEL_RESOURCE_ATTRIBUTES: "cluster=airflow-e2e-cluster,service.name=Airflow,host.name=airflow-triggerer" + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 90s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + otel-collector: + condition: service_started + + airflow-worker-1: + <<: *airflow-common + hostname: airflow-worker-1 + command: + - bash + - -c + - | + python /opt/airflow/scripts/worker_otel_reporter.py & + exec airflow celery worker + environment: + <<: *airflow-common-env + OTEL_RESOURCE_ATTRIBUTES: "cluster=airflow-e2e-cluster,service.name=Airflow,host.name=airflow-worker-1" + DUMB_INIT_SETSID: "0" + healthcheck: + test: + - "CMD-SHELL" + - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + interval: 30s + timeout: 10s + retries: 5 + start_period: 90s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + otel-collector: + condition: service_started + + airflow-worker-2: + <<: *airflow-common + hostname: airflow-worker-2 + command: celery worker + environment: + <<: *airflow-common-env + OTEL_RESOURCE_ATTRIBUTES: "cluster=airflow-e2e-cluster,service.name=Airflow,host.name=airflow-worker-2" + DUMB_INIT_SETSID: "0" + healthcheck: + test: + - "CMD-SHELL" + - 'celery --app 
airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + interval: 30s + timeout: 10s + retries: 5 + start_period: 90s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + otel-collector: + condition: service_started + +networks: + e2e: diff --git a/test/e2e-v2/cases/airflow/docker-compose.mock-local.yml b/test/e2e-v2/cases/airflow/docker-compose.mock-local.yml new file mode 100644 index 000000000000..2d23060745c7 --- /dev/null +++ b/test/e2e-v2/cases/airflow/docker-compose.mock-local.yml @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Local mock e2e overrides: +# - BanyanDB stays internal-only (avoid host port 17912 clashes with other stacks). +# - OAP loads SWIP-7 config/libs from deploy/skywalking (CI uses a dist-built image instead). 
+ +services: + banyandb: + ports: !reset [] + oap: + volumes: + - ../../../../deploy/skywalking/config:/skywalking/config:ro + - ../../../../deploy/skywalking/oap-libs:/skywalking/oap-libs:ro + sender: + healthcheck: + test: ["CMD", "sh", "-c", "wget -qO- http://127.0.0.1:9093/otel-metrics/send >/dev/null 2>&1 || curl -sf http://127.0.0.1:9093/otel-metrics/send >/dev/null"] + interval: 5s + timeout: 60s + retries: 120 diff --git a/test/e2e-v2/cases/airflow/docker-compose.yml b/test/e2e-v2/cases/airflow/docker-compose.yml new file mode 100644 index 000000000000..d5e9400e307a --- /dev/null +++ b/test/e2e-v2/cases/airflow/docker-compose.yml @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +services: + oap: + extends: + file: ../../script/docker-compose/base-compose.yml + service: oap + ports: + - 12800 + environment: + SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES: "airflow/*" + networks: + - e2e + + banyandb: + extends: + file: ../../script/docker-compose/base-compose.yml + service: banyandb + ports: + - 17912 + + sender: + build: + context: . 
+ dockerfile: Dockerfile.mock-sender + args: + PYTHON_IMAGE: python:3.11-slim + volumes: + - ./mock-data:/data/otel-metrics:ro + environment: + OAP_HOST: oap + OAP_GRPC_PORT: 11800 + OTEL_METRICS_DATA_PATH: /data/otel-metrics + networks: + - e2e + ports: + - 9093 + healthcheck: + test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 9093"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + oap: + condition: service_healthy + +networks: + e2e: diff --git a/test/e2e-v2/cases/airflow/e2e-cluster.yaml b/test/e2e-v2/cases/airflow/e2e-cluster.yaml new file mode 100644 index 000000000000..223887761716 --- /dev/null +++ b/test/e2e-v2/cases/airflow/e2e-cluster.yaml @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Real Airflow Celery cluster (scheduler + 2 workers + triggerer) with live OTLP metrics. +# Slower than e2e.yaml (mock sender); verifies production-like OTLP integration (25 checks). +# Full SWIP-7 metric matrix (30 checks) is in e2e.yaml (mock OTLP replay). 
+ +setup: + env: compose + file: docker-compose-cluster.yml + timeout: 35m + init-system-environment: ../../script/env + steps: + - name: cluster setup (tools, workload, verify env) + command: /usr/bin/bash test/e2e-v2/cases/airflow/scripts/run-cluster-setup.sh + +verify: + retry: + count: 60 + interval: 10s + cases: + - includes: + - ./airflow-cluster-cases.yaml diff --git a/test/e2e-v2/cases/airflow/e2e.yaml b/test/e2e-v2/cases/airflow/e2e.yaml new file mode 100644 index 000000000000..e2341ffb2813 --- /dev/null +++ b/test/e2e-v2/cases/airflow/e2e.yaml @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +setup: + env: compose + file: docker-compose.yml + timeout: 20m + init-system-environment: ../../script/env + steps: + - name: set PATH + command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH + - name: install yq + command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq + - name: install swctl + command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl + +trigger: + action: http + interval: 3s + times: -1 + url: http://${sender_host}:${sender_9093}/otel-metrics/send + method: GET + +verify: + retry: + count: 20 + interval: 10s + cases: + - includes: + - ./airflow-cases.yaml diff --git a/test/e2e-v2/cases/airflow/expected/instance.yml b/test/e2e-v2/cases/airflow/expected/instance.yml new file mode 100644 index 000000000000..64d7a1434f16 --- /dev/null +++ b/test/e2e-v2/cases/airflow/expected/instance.yml @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{{- contains . 
}} +- id: {{ notEmpty .id }} + name: airflow-scheduler + instanceuuid: {{ notEmpty .instanceuuid }} + attributes: [] + language: UNKNOWN +- id: {{ notEmpty .id }} + name: airflow-worker-1 + instanceuuid: {{ notEmpty .instanceuuid }} + attributes: [] + language: UNKNOWN +- id: {{ notEmpty .id }} + name: airflow-triggerer + instanceuuid: {{ notEmpty .instanceuuid }} + attributes: [] + language: UNKNOWN +{{- end }} diff --git a/test/e2e-v2/cases/airflow/expected/metrics-has-value-label-poolname.yml b/test/e2e-v2/cases/airflow/expected/metrics-has-value-label-poolname.yml new file mode 100644 index 000000000000..ba881827ba5f --- /dev/null +++ b/test/e2e-v2/cases/airflow/expected/metrics-has-value-label-poolname.yml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +debuggingtrace: null +type: TIME_SERIES_VALUES +results: + {{- contains .results }} + - metric: + labels: + {{- contains .metric.labels }} + - key: pool_name + value: {{ notEmpty .value }} + {{- end}} + values: + {{- contains .values }} + - id: {{ notEmpty .id }} + value: {{ notEmpty .value }} + traceid: null + owner: null + - id: {{ notEmpty .id }} + value: null + traceid: null + owner: null + {{- end}} + {{- end}} +error: null diff --git a/test/e2e-v2/cases/airflow/expected/metrics-has-value.yml b/test/e2e-v2/cases/airflow/expected/metrics-has-value.yml new file mode 100644 index 000000000000..ced9ce289197 --- /dev/null +++ b/test/e2e-v2/cases/airflow/expected/metrics-has-value.yml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +debuggingtrace: null +type: TIME_SERIES_VALUES +results: + {{- contains .results }} + - metric: + labels: [] + values: + {{- contains .values }} + - id: {{ notEmpty .id }} + value: {{ notEmpty .value }} + traceid: null + owner: null + {{- end}} + {{- end}} +error: null diff --git a/test/e2e-v2/cases/airflow/expected/service-cluster.yml b/test/e2e-v2/cases/airflow/expected/service-cluster.yml new file mode 100644 index 000000000000..23bd03fc3511 --- /dev/null +++ b/test/e2e-v2/cases/airflow/expected/service-cluster.yml @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{{- containsOnce . }} +- id: {{ b64enc "airflow::airflow-e2e-cluster" }}.1 + name: airflow::airflow-e2e-cluster + group: airflow + shortname: airflow-e2e-cluster + layers: + - AIRFLOW + normal: true +{{- end }} diff --git a/test/e2e-v2/cases/airflow/expected/service.yml b/test/e2e-v2/cases/airflow/expected/service.yml new file mode 100644 index 000000000000..46998441ee2f --- /dev/null +++ b/test/e2e-v2/cases/airflow/expected/service.yml @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{{- containsOnce . }} +- id: {{ b64enc "airflow::airflow-cluster" }}.1 + name: airflow::airflow-cluster + group: airflow + shortname: airflow-cluster + layers: + - AIRFLOW + normal: true +{{- end }} diff --git a/test/e2e-v2/cases/airflow/mock-data/otel-airflow-metrics.json b/test/e2e-v2/cases/airflow/mock-data/otel-airflow-metrics.json new file mode 100644 index 000000000000..a2290ac2cc45 --- /dev/null +++ b/test/e2e-v2/cases/airflow/mock-data/otel-airflow-metrics.json @@ -0,0 +1,428 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "Airflow" + } + }, + { + "key": "cluster", + "value": { + "stringValue": "airflow-cluster" + } + }, + { + "key": "host.name", + "value": { + "stringValue": "airflow-scheduler" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "airflow.scheduler.tasks.executable", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 5.0 + } + ] + } + }, + { + "name": "airflow.executor.running_tasks", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 2.0 + } + ] + } + }, + { + "name": "airflow.pool.scheduled_slots", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + 
"asDouble": 1.0, + "attributes": [ + { + "key": "pool_name", + "value": { + "stringValue": "default_pool" + } + } + ] + } + ] + } + }, + { + "name": "airflow.executor.queued_tasks", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 3.0 + } + ] + } + }, + { + "name": "airflow.executor.open_slots", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 16.0 + } + ] + } + }, + { + "name": "airflow.pool.queued_slots", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 1.0, + "attributes": [ + { + "key": "pool_name", + "value": { + "stringValue": "default_pool" + } + } + ] + } + ] + } + }, + { + "name": "airflow.pool.deferred_slots", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 0.0, + "attributes": [ + { + "key": "pool_name", + "value": { + "stringValue": "default_pool" + } + } + ] + } + ] + } + }, + { + "name": "airflow.scheduler.orphaned_tasks.cleared", + "sum": { + "aggregationTemporality": 2, + "isMonotonic": true, + "dataPoints": [ + { + "startTimeUnixNano": "1676140244999000000", + "timeUnixNano": "1676140375004000000", + "asDouble": 2.0 + } + ] + } + }, + { + "name": "airflow.scheduler.orphaned_tasks.adopted", + "sum": { + "aggregationTemporality": 2, + "isMonotonic": true, + "dataPoints": [ + { + "startTimeUnixNano": "1676140244999000000", + "timeUnixNano": "1676140375004000000", + "asDouble": 1.0 + } + ] + } + }, + { + "name": "airflow.scheduler_heartbeat", + "sum": { + "aggregationTemporality": 2, + "isMonotonic": true, + "dataPoints": [ + { + "startTimeUnixNano": "1676140244999000000", + "timeUnixNano": "1676140375004000000", + "asDouble": 10.0 + } + ] + } + }, + { + "name": "airflow.dag_processing.file_path_queue_size", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 4.0 + } + ] + } + }, + { + "name": "airflow.dataset.updates", + "sum": { + 
"aggregationTemporality": 2, + "isMonotonic": true, + "dataPoints": [ + { + "startTimeUnixNano": "1676140244999000000", + "timeUnixNano": "1676140375004000000", + "asDouble": 7.0 + } + ] + } + }, + { + "name": "airflow.dataset.orphaned", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 1.0 + } + ] + } + }, + { + "name": "airflow.dataset.triggered_dagruns", + "sum": { + "aggregationTemporality": 2, + "isMonotonic": true, + "dataPoints": [ + { + "startTimeUnixNano": "1676140244999000000", + "timeUnixNano": "1676140375004000000", + "asDouble": 2.0 + } + ] + } + } + ] + } + ] + }, + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "Airflow" + } + }, + { + "key": "cluster", + "value": { + "stringValue": "airflow-cluster" + } + }, + { + "key": "host.name", + "value": { + "stringValue": "airflow-worker-1" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "airflow.pool.deferred_slots", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 0.0, + "attributes": [ + { + "key": "pool_name", + "value": { + "stringValue": "default_pool" + } + } + ] + } + ] + } + }, + { + "name": "airflow.pool.open_slots", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 8.0, + "attributes": [ + { + "key": "pool_name", + "value": { + "stringValue": "default_pool" + } + } + ] + } + ] + } + }, + { + "name": "airflow.pool.running_slots", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 2.0, + "attributes": [ + { + "key": "pool_name", + "value": { + "stringValue": "default_pool" + } + } + ] + } + ] + } + }, + { + "name": "airflow.dataset.updates", + "sum": { + "aggregationTemporality": 2, + "isMonotonic": true, + "dataPoints": [ + { + "startTimeUnixNano": "1676140244999000000", + "timeUnixNano": "1676140375004000000", + "asDouble": 4.0 + } + ] + } + } + ] + } + ] + 
}, + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "Airflow" + } + }, + { + "key": "cluster", + "value": { + "stringValue": "airflow-cluster" + } + }, + { + "key": "host.name", + "value": { + "stringValue": "airflow-triggerer" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "airflow.scheduler.tasks.executable", + "gauge": { + "dataPoints": [ + { + "timeUnixNano": "1676140375004000000", + "asDouble": 0.0 + } + ] + } + }, + { + "name": "airflow.triggerer_heartbeat", + "sum": { + "aggregationTemporality": 2, + "isMonotonic": true, + "dataPoints": [ + { + "startTimeUnixNano": "1676140244999000000", + "timeUnixNano": "1676140375004000000", + "asDouble": 5.0 + } + ] + } + }, + { + "name": "airflow.triggers.blocked_main_thread", + "sum": { + "aggregationTemporality": 2, + "isMonotonic": true, + "dataPoints": [ + { + "startTimeUnixNano": "1676140244999000000", + "timeUnixNano": "1676140375004000000", + "asDouble": 0.0 + } + ] + } + }, + { + "name": "airflow.triggers.failed", + "sum": { + "aggregationTemporality": 2, + "isMonotonic": true, + "dataPoints": [ + { + "startTimeUnixNano": "1676140244999000000", + "timeUnixNano": "1676140375004000000", + "asDouble": 0.0 + } + ] + } + }, + { + "name": "airflow.triggers.succeeded", + "sum": { + "aggregationTemporality": 2, + "isMonotonic": true, + "dataPoints": [ + { + "startTimeUnixNano": "1676140244999000000", + "timeUnixNano": "1676140375004000000", + "asDouble": 3.0 + } + ] + } + } + ] + } + ] + } + ] +} diff --git a/test/e2e-v2/cases/airflow/otel-collector-config.yaml b/test/e2e-v2/cases/airflow/otel-collector-config.yaml new file mode 100644 index 000000000000..1bb74a2eafa4 --- /dev/null +++ b/test/e2e-v2/cases/airflow/otel-collector-config.yaml @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +receivers: + otlp: + protocols: + http: + endpoint: 0.0.0.0:4318 + grpc: + endpoint: 0.0.0.0:4317 + +processors: + batch: + +exporters: + otlp: + endpoint: oap:11800 + tls: + insecure: true + +service: + pipelines: + metrics: + receivers: + - otlp + processors: + - batch + exporters: + - otlp diff --git a/test/e2e-v2/cases/airflow/scripts/cluster-compose-env.sh b/test/e2e-v2/cases/airflow/scripts/cluster-compose-env.sh new file mode 100644 index 000000000000..a7623191a98a --- /dev/null +++ b/test/e2e-v2/cases/airflow/scripts/cluster-compose-env.sh @@ -0,0 +1,45 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# Shared docker compose helpers for Airflow cluster e2e. +# infra-e2e uses project name {workspace}_e2e (e.g. skywalking_e2e), not the case folder name. + +COMPOSE_FILE="${COMPOSE_FILE:-test/e2e-v2/cases/airflow/docker-compose-cluster.yml}" +COMPOSE_OVERRIDE="${COMPOSE_OVERRIDE:-}" + +resolve_compose_project() { + if [[ -n "${COMPOSE_PROJECT_NAME:-}" ]]; then + echo "${COMPOSE_PROJECT_NAME}" + return + fi + local scheduler_container + scheduler_container="$(docker ps --filter 'name=-airflow-scheduler-' --format '{{.Names}}' | head -1)" + if [[ -n "${scheduler_container}" ]]; then + echo "${scheduler_container%-airflow-scheduler-*}" + return + fi + echo "skywalking_e2e" +} + +COMPOSE_PROJECT_NAME="$(resolve_compose_project)" + +dc() { + if [[ -n "${COMPOSE_OVERRIDE}" ]]; then + docker compose -p "${COMPOSE_PROJECT_NAME}" -f "${COMPOSE_FILE}" -f "${COMPOSE_OVERRIDE}" "$@" + else + docker compose -p "${COMPOSE_PROJECT_NAME}" -f "${COMPOSE_FILE}" "$@" + fi +} diff --git a/test/e2e-v2/cases/airflow/scripts/export-e2e-env.sh b/test/e2e-v2/cases/airflow/scripts/export-e2e-env.sh new file mode 100644 index 000000000000..617af8fae566 --- /dev/null +++ b/test/e2e-v2/cases/airflow/scripts/export-e2e-env.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# infra-e2e reads ~/.skywalking-infra-e2e/.env for ${oap_host} / ${oap_12800} substitution in verify. +# On Windows the runner may not populate that file; export ports from the running compose project. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=cluster-compose-env.sh +source "${SCRIPT_DIR}/cluster-compose-env.sh" + +OAP_PORT="$(dc port oap 12800 | cut -d: -f2)" +ENV_DIR="${HOME}/.skywalking-infra-e2e" +ENV_FILE="${ENV_DIR}/.env" + +mkdir -p "${ENV_DIR}" +cat > "${ENV_FILE}" < setup (tools + workload) -> verify all cases. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../../../.." 
&& pwd)" +cd "${REPO_ROOT}" + +export OTEL_COLLECTOR_VERSION="${OTEL_COLLECTOR_VERSION:-0.102.1}" +export SW_AGENT_JDK_VERSION="${SW_AGENT_JDK_VERSION:-8}" + +# shellcheck source=cluster-compose-env.sh +source "${SCRIPT_DIR}/cluster-compose-env.sh" + +echo "=== Airflow cluster full e2e (project ${COMPOSE_PROJECT_NAME}) ===" +dc down --remove-orphans 2>/dev/null || true +dc up -d + +/usr/bin/bash test/e2e-v2/cases/airflow/scripts/run-cluster-setup.sh +/usr/bin/bash test/e2e-v2/cases/airflow/scripts/verify-cluster-e2e.sh + +echo "=== Full cluster e2e PASSED ===" diff --git a/test/e2e-v2/cases/airflow/scripts/run-mock-e2e.sh b/test/e2e-v2/cases/airflow/scripts/run-mock-e2e.sh new file mode 100644 index 000000000000..0eba30f08105 --- /dev/null +++ b/test/e2e-v2/cases/airflow/scripts/run-mock-e2e.sh @@ -0,0 +1,94 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Local mock e2e (OAP + BanyanDB + Python OTLP JSON replay sidecar). +# Set MOCK_E2E_USE_LOCAL_OVERRIDE=0 to mirror CI (docker-compose.yml only). + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +CASE_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../../../.." 
&& pwd)" +cd "${REPO_ROOT}" + +export SW_BANYANDB_COMMIT="${SW_BANYANDB_COMMIT:-84b919efca3fee3d51df9e97a734a9f10ae6f1d2}" +export COMPOSE_PROJECT_NAME="${COMPOSE_PROJECT_NAME:-airflow_mock_e2e}" +export PATH="/tmp/skywalking-infra-e2e/bin:/usr/bin:/bin:${PATH}" + +COMPOSE_FILE="${CASE_DIR}/docker-compose.yml" +LOCAL_OVERRIDE="${CASE_DIR}/docker-compose.mock-local.yml" +USE_LOCAL_OVERRIDE="${MOCK_E2E_USE_LOCAL_OVERRIDE:-1}" + +dc() { + if [[ "${USE_LOCAL_OVERRIDE}" == "1" ]]; then + docker compose -f "${COMPOSE_FILE}" -f "${LOCAL_OVERRIDE}" -p "${COMPOSE_PROJECT_NAME}" "$@" + else + docker compose -f "${COMPOSE_FILE}" -p "${COMPOSE_PROJECT_NAME}" "$@" + fi +} + +echo "=== Airflow mock e2e (project ${COMPOSE_PROJECT_NAME}, local_override=${USE_LOCAL_OVERRIDE}) ===" + +dc down --remove-orphans 2>/dev/null || true +echo "Building Python OTLP replay sender image..." +BUILD_ARGS=() +if [[ -n "${MOCK_SENDER_PYTHON_IMAGE:-}" ]]; then + BUILD_ARGS+=(--build-arg "PYTHON_IMAGE=${MOCK_SENDER_PYTHON_IMAGE}") +fi +dc build "${BUILD_ARGS[@]}" sender +dc up -d + +echo "Waiting for OAP and mock sender..." +for _ in $(seq 1 90); do + if dc exec -T oap bash -c 'cat < /dev/null > /dev/tcp/127.0.0.1/11800' 2>/dev/null && + dc exec -T sender sh -c 'nc -nz 127.0.0.1 9093' 2>/dev/null; then + break + fi + sleep 5 +done + +/usr/bin/bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl +/usr/bin/bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq + +trigger_metrics() { + dc exec -T sender sh -c \ + 'wget -q -O /dev/null http://127.0.0.1:9093/otel-metrics/send 2>/dev/null || curl -sf http://127.0.0.1:9093/otel-metrics/send' +} + +# Replay like infra-e2e trigger (continuous during verify for PT1M increase metrics). +trigger_loop() { + while true; do + trigger_metrics || true + sleep 3 + done +} + +echo "Seeding OTLP metrics..." +trigger_loop & +TRIGGER_PID=$! 
+sleep 120 + +VERIFY_RETRIES=20 VERIFY_INTERVAL_SECONDS=10 MOCK_E2E_USE_LOCAL_OVERRIDE="${USE_LOCAL_OVERRIDE}" \ + /usr/bin/bash "${SCRIPT_DIR}/verify-mock-e2e.sh" || VERIFY_EXIT=$? + +kill "${TRIGGER_PID}" 2>/dev/null || true +wait "${TRIGGER_PID}" 2>/dev/null || true + +if [[ "${VERIFY_EXIT:-0}" -ne 0 ]]; then + exit "${VERIFY_EXIT}" +fi + +echo "=== Mock e2e PASSED ===" diff --git a/test/e2e-v2/cases/airflow/scripts/seed-e2e-cluster-workload.sh b/test/e2e-v2/cases/airflow/scripts/seed-e2e-cluster-workload.sh new file mode 100644 index 000000000000..bd38c7a2ef34 --- /dev/null +++ b/test/e2e-v2/cases/airflow/scripts/seed-e2e-cluster-workload.sh @@ -0,0 +1,72 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=cluster-compose-env.sh +source "${SCRIPT_DIR}/cluster-compose-env.sh" + +SCHEDULER="${AIRFLOW_SCHEDULER_SERVICE:-airflow-scheduler}" +ROUNDS="${SEED_ROUNDS:-3}" +INTERVAL="${SEED_INTERVAL_SECONDS:-20}" +RUN_SECONDS="${RUN_SECONDS:-240}" + +# Native OTel coverage: deferrable (triggerer triggers_*) and dataset (asset_*). 
+NATIVE_OTEL_DAGS=( + e2e_deferrable + e2e_dataset_producer + e2e_dataset_consumer +) + +LOAD_DAGS=( + cluster_smoke + cluster_load + example_bash_operator + example_python_operator + example_branch_operator + example_short_circuit_operator +) + +echo "=== Airflow real-cluster e2e: seed workload (project ${COMPOSE_PROJECT_NAME}, ${ROUNDS} rounds, then ${RUN_SECONDS}s) ===" + +# Scheduler health does not guarantee DagModel is populated; trigger fails silently otherwise. +echo "Syncing DAG metadata to database..." +dc exec -T "${SCHEDULER}" airflow dags reserialize >/dev/null 2>&1 || true +sleep 10 + +trigger_dags() { + local dag + for dag in "$@"; do + dc exec -T "${SCHEDULER}" airflow dags trigger "${dag}" >/dev/null 2>&1 || true + done +} + +echo "Trigger native-OTel DAGs (deferrable + dataset)..." +trigger_dags "${NATIVE_OTEL_DAGS[@]}" + +for round in $(seq 1 "${ROUNDS}"); do + echo "Trigger load round ${round}/${ROUNDS}" + trigger_dags "${LOAD_DAGS[@]}" + trigger_dags e2e_deferrable e2e_dataset_producer e2e_dataset_consumer + if [[ "${round}" -lt "${ROUNDS}" ]]; then + sleep "${INTERVAL}" + fi +done + +echo "Running ${RUN_SECONDS}s for deferrable triggers, dataset events, and MAL aggregation..." +sleep "${RUN_SECONDS}" +echo "Workload seed complete." diff --git a/test/e2e-v2/cases/airflow/scripts/verify-cluster-e2e.sh b/test/e2e-v2/cases/airflow/scripts/verify-cluster-e2e.sh new file mode 100644 index 000000000000..1f8447ae1b6b --- /dev/null +++ b/test/e2e-v2/cases/airflow/scripts/verify-cluster-e2e.sh @@ -0,0 +1,178 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Real Celery cluster integration verify (mirrors airflow-cluster-cases.yaml). +# Checks: 2 topology (1 service list, 1 instance list covering 3 OTLP-exporting instances) + 10 service metrics + 13 instance metrics = 25. +# Full SWIP-7 (30 checks) is covered by the mock suite (airflow-cases.yaml). + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=cluster-compose-env.sh +source "${SCRIPT_DIR}/cluster-compose-env.sh" + +export PATH="/tmp/skywalking-infra-e2e/bin:/usr/bin:/bin:${PATH}" + +SERVICE="airflow::airflow-e2e-cluster" +RETRIES="${VERIFY_RETRIES:-18}" +INTERVAL="${VERIFY_INTERVAL_SECONDS:-10}" +REPORT="${VERIFY_REPORT:-test/e2e-v2/cases/airflow/cluster-e2e-report.txt}" + +OAP_PORT="$(dc port oap 12800 | cut -d: -f2)" +BASE_URL="http://localhost:${OAP_PORT}/graphql" +SWCTL="${SWCTL:-swctl}" + +pass=0 +fail=0 + +log() { + echo "$@" | tee -a "${REPORT}" +} + +check_pass() { + pass=$((pass + 1)) + log " PASS: $1" +} + +check_fail() { + fail=$((fail + 1)) + log " FAIL: $1" + if [[ -n "${2:-}" ]]; then + log " detail: $2" + fi +} + +# At least one time-series point with a non-null numeric value (0 is OK). 
+metric_has_value() { + local expression="$1" + shift + local out + out="$("${SWCTL}" --display yaml --base-url="${BASE_URL}" metrics exec \ + --expression="${expression}" --service-name="${SERVICE}" "$@" 2>&1)" || return 1 + echo "${out}" | grep -q 'type: TIME_SERIES_VALUES' || return 1 + echo "${out}" | grep -qE '^[[:space:]]*- id:' || return 1 + echo "${out}" | grep -qE '^[[:space:]]*value: ("[0-9]+(\.[0-9]+)?"|[0-9]+(\.[0-9]+)?)$' +} + +verify_metric() { + local label="$1" + local expression="$2" + shift 2 + local attempt + for attempt in $(seq 1 "${RETRIES}"); do + if metric_has_value "${expression}" "$@"; then + check_pass "${label}" + return 0 + fi + if [[ "${attempt}" -lt "${RETRIES}" ]]; then + sleep "${INTERVAL}" + fi + done + check_fail "${label}" "no non-null value after ${RETRIES} attempts" + return 1 +} + +mkdir -p "$(dirname "${REPORT}")" +: > "${REPORT}" +log "=== Airflow cluster e2e verify (integration smoke) ===" +log "time: $(date -u +%Y-%m-%dT%H:%M:%SZ)" +log "compose project: ${COMPOSE_PROJECT_NAME}" +log "OAP GraphQL: ${BASE_URL}" +log "" + +# --- Topology --- +for attempt in $(seq 1 "${RETRIES}"); do + if out=$("${SWCTL}" --display yaml --base-url="${BASE_URL}" service ly AIRFLOW 2>&1) && + echo "${out}" | grep -q "name: airflow::airflow-e2e-cluster" && + echo "${out}" | grep -q "AIRFLOW"; then + check_pass "service ly AIRFLOW -> airflow::airflow-e2e-cluster" + break + fi + if [[ "${attempt}" -eq "${RETRIES}" ]]; then + check_fail "service ly AIRFLOW -> airflow::airflow-e2e-cluster" + else + sleep "${INTERVAL}" + fi +done + +for attempt in $(seq 1 "${RETRIES}"); do + if out=$("${SWCTL}" --display yaml --base-url="${BASE_URL}" instance ls --service-name="${SERVICE}" 2>&1) && + echo "${out}" | grep -q "name: airflow-scheduler" && + echo "${out}" | grep -q "name: airflow-worker-1" && + echo "${out}" | grep -q "name: airflow-triggerer"; then + check_pass "instances: scheduler, worker-1, triggerer" + break + fi + if [[ "${attempt}" -eq 
"${RETRIES}" ]]; then + check_fail "instances: scheduler, worker-1, triggerer" + else + sleep "${INTERVAL}" + fi +done + +log "" +log "--- Service metrics (10) ---" + +SERVICE_METRICS=( + meter_airflow_scheduler_tasks_executable + meter_airflow_executor_queued_tasks + meter_airflow_executor_running_tasks + meter_airflow_executor_open_slots + meter_airflow_pool_deferred_slots + meter_airflow_pool_scheduled_slots + meter_airflow_scheduler_heartbeat + meter_airflow_scheduler_orphaned_tasks_cleared + meter_airflow_scheduler_orphaned_tasks_adopted + meter_airflow_dag_file_queue_size +) + +for metric in "${SERVICE_METRICS[@]}"; do + verify_metric "${metric} (service)" "${metric}" || true +done + +log "" +log "--- Instance metrics (13) ---" + +# expression|instance-name|label suffix (13 instance-level metrics in cluster smoke) +INSTANCE_METRICS=( + "meter_airflow_instance_pool_open_slots|airflow-worker-1|pool_open_slots worker-1" + "meter_airflow_instance_pool_deferred_slots|airflow-worker-1|pool_deferred_slots worker-1" + "meter_airflow_instance_pool_running_slots|airflow-worker-1|pool_running_slots worker-1" + "meter_airflow_instance_pool_scheduled_slots|airflow-scheduler|pool_scheduled_slots scheduler" + "meter_airflow_instance_triggerer_heartbeat|airflow-triggerer|triggerer_heartbeat" + "meter_airflow_instance_triggers_succeeded|airflow-triggerer|triggers_succeeded" + "meter_airflow_instance_scheduler_tasks_executable|airflow-scheduler|scheduler_tasks_executable" + "meter_airflow_instance_scheduler_orphaned_tasks_cleared|airflow-scheduler|scheduler_orphaned_tasks_cleared" + "meter_airflow_instance_scheduler_orphaned_tasks_adopted|airflow-scheduler|scheduler_orphaned_tasks_adopted" + "meter_airflow_instance_executor_queued_tasks|airflow-scheduler|executor_queued_tasks scheduler" + "meter_airflow_instance_executor_running_tasks|airflow-scheduler|executor_running_tasks scheduler" + "meter_airflow_instance_asset_orphaned|airflow-scheduler|asset_orphaned scheduler" + 
"meter_airflow_instance_asset_triggered_dagruns|airflow-scheduler|asset_triggered_dagruns scheduler" +) + +for entry in "${INSTANCE_METRICS[@]}"; do + IFS='|' read -r expression instance label <<< "${entry}" + verify_metric "${expression} (${label})" "${expression}" --instance-name="${instance}" || true +done + +log "" +log "=== Summary ===" +log "PASS: ${pass} FAIL: ${fail} TOTAL: $((pass + fail))" +log "Report: ${REPORT}" + +if [[ "${fail}" -gt 0 ]]; then + exit 1 +fi diff --git a/test/e2e-v2/cases/airflow/scripts/verify-mock-e2e.sh b/test/e2e-v2/cases/airflow/scripts/verify-mock-e2e.sh new file mode 100644 index 000000000000..9b1486dd998f --- /dev/null +++ b/test/e2e-v2/cases/airflow/scripts/verify-mock-e2e.sh @@ -0,0 +1,193 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Full SWIP-7 verify for mock OTLP replay (mirrors airflow-cases.yaml). +# Topology: 1 service + 3 instances + 12 service metrics + 16 instance metrics = 30 checks. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +CASE_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../../../.." 
&& pwd)" +cd "${REPO_ROOT}" + +export COMPOSE_PROJECT_NAME="${COMPOSE_PROJECT_NAME:-airflow_mock_e2e}" +export PATH="/tmp/skywalking-infra-e2e/bin:/usr/bin:/bin:${PATH}" + +COMPOSE_FILE="${CASE_DIR}/docker-compose.yml" +LOCAL_OVERRIDE="${CASE_DIR}/docker-compose.mock-local.yml" +USE_LOCAL_OVERRIDE="${MOCK_E2E_USE_LOCAL_OVERRIDE:-1}" +SERVICE="airflow::airflow-cluster" +RETRIES="${VERIFY_RETRIES:-20}" +INTERVAL="${VERIFY_INTERVAL_SECONDS:-10}" +REPORT="${VERIFY_REPORT:-test/e2e-v2/cases/airflow/mock-e2e-report.txt}" + +dc() { + if [[ "${USE_LOCAL_OVERRIDE}" == "1" ]]; then + docker compose -f "${COMPOSE_FILE}" -f "${LOCAL_OVERRIDE}" -p "${COMPOSE_PROJECT_NAME}" "$@" + else + docker compose -f "${COMPOSE_FILE}" -p "${COMPOSE_PROJECT_NAME}" "$@" + fi +} + +OAP_PORT="$(dc port oap 12800 | cut -d: -f2)" +BASE_URL="http://localhost:${OAP_PORT}/graphql" +SWCTL="${SWCTL:-swctl}" + +pass=0 +fail=0 + +log() { + echo "$@" | tee -a "${REPORT}" +} + +check_pass() { + pass=$((pass + 1)) + log " PASS: $1" +} + +check_fail() { + fail=$((fail + 1)) + log " FAIL: $1" + if [[ -n "${2:-}" ]]; then + log " detail: $2" + fi +} + +metric_has_value() { + local expression="$1" + shift + local out + out="$("${SWCTL}" --display yaml --base-url="${BASE_URL}" metrics exec \ + --expression="${expression}" --service-name="${SERVICE}" "$@" 2>&1)" || return 1 + echo "${out}" | grep -q 'type: TIME_SERIES_VALUES' || return 1 + echo "${out}" | grep -qE '^[[:space:]]*- id:' || return 1 + echo "${out}" | grep -qE '^[[:space:]]*value: ("[0-9]+(\.[0-9]+)?"|[0-9]+(\.[0-9]+)?)$' +} + +verify_metric() { + local label="$1" + local expression="$2" + shift 2 + local attempt + for attempt in $(seq 1 "${RETRIES}"); do + if metric_has_value "${expression}" "$@"; then + check_pass "${label}" + return 0 + fi + if [[ "${attempt}" -lt "${RETRIES}" ]]; then + sleep "${INTERVAL}" + fi + done + check_fail "${label}" "no non-null value after ${RETRIES} attempts" + return 1 +} + +mkdir -p "$(dirname "${REPORT}")" +: > 
"${REPORT}" +log "=== Airflow mock e2e verify (full SWIP-7) ===" +log "time: $(date -u +%Y-%m-%dT%H:%M:%SZ)" +log "compose project: ${COMPOSE_PROJECT_NAME}" +log "OAP GraphQL: ${BASE_URL}" +log "" + +for attempt in $(seq 1 "${RETRIES}"); do + if out=$("${SWCTL}" --display yaml --base-url="${BASE_URL}" service ly AIRFLOW 2>&1) && + echo "${out}" | grep -q "name: airflow::airflow-cluster" && + echo "${out}" | grep -q "AIRFLOW"; then + check_pass "service ly AIRFLOW -> airflow::airflow-cluster" + break + fi + if [[ "${attempt}" -eq "${RETRIES}" ]]; then + check_fail "service ly AIRFLOW -> airflow::airflow-cluster" + else + sleep "${INTERVAL}" + fi +done + +for attempt in $(seq 1 "${RETRIES}"); do + if out=$("${SWCTL}" --display yaml --base-url="${BASE_URL}" instance ls \ + --service-name="${SERVICE}" 2>&1) && + echo "${out}" | grep -q "name: airflow-scheduler" && + echo "${out}" | grep -q "name: airflow-worker-1" && + echo "${out}" | grep -q "name: airflow-triggerer"; then + check_pass "instances: scheduler, worker-1, triggerer" + break + fi + if [[ "${attempt}" -eq "${RETRIES}" ]]; then + check_fail "instances: scheduler, worker-1, triggerer" + else + sleep "${INTERVAL}" + fi +done + +log "" +log "--- Service metrics (12) ---" + +SERVICE_METRICS=( + meter_airflow_scheduler_tasks_executable + meter_airflow_executor_queued_tasks + meter_airflow_executor_running_tasks + meter_airflow_executor_open_slots + meter_airflow_pool_queued_slots + meter_airflow_pool_deferred_slots + meter_airflow_pool_scheduled_slots + meter_airflow_scheduler_heartbeat + meter_airflow_scheduler_orphaned_tasks_cleared + meter_airflow_scheduler_orphaned_tasks_adopted + meter_airflow_dag_file_queue_size + meter_airflow_asset_updates +) + +for metric in "${SERVICE_METRICS[@]}"; do + verify_metric "${metric} (service)" "${metric}" || true +done + +log "" +log "--- Instance metrics (16) ---" + +INSTANCE_METRICS=( + "meter_airflow_instance_pool_open_slots|airflow-worker-1|pool_open_slots worker-1" + 
"meter_airflow_instance_pool_deferred_slots|airflow-worker-1|pool_deferred_slots worker-1" + "meter_airflow_instance_pool_running_slots|airflow-worker-1|pool_running_slots worker-1" + "meter_airflow_instance_pool_scheduled_slots|airflow-scheduler|pool_scheduled_slots scheduler" + "meter_airflow_instance_triggerer_heartbeat|airflow-triggerer|triggerer_heartbeat" + "meter_airflow_instance_triggers_blocked_main_thread|airflow-triggerer|triggers_blocked_main_thread" + "meter_airflow_instance_triggers_failed|airflow-triggerer|triggers_failed" + "meter_airflow_instance_triggers_succeeded|airflow-triggerer|triggers_succeeded" + "meter_airflow_instance_scheduler_tasks_executable|airflow-scheduler|scheduler_tasks_executable" + "meter_airflow_instance_scheduler_orphaned_tasks_cleared|airflow-scheduler|scheduler_orphaned_tasks_cleared" + "meter_airflow_instance_scheduler_orphaned_tasks_adopted|airflow-scheduler|scheduler_orphaned_tasks_adopted" + "meter_airflow_instance_executor_queued_tasks|airflow-scheduler|executor_queued_tasks scheduler" + "meter_airflow_instance_executor_running_tasks|airflow-scheduler|executor_running_tasks scheduler" + "meter_airflow_instance_asset_updates|airflow-worker-1|asset_updates worker-1" + "meter_airflow_instance_asset_orphaned|airflow-scheduler|asset_orphaned scheduler" + "meter_airflow_instance_asset_triggered_dagruns|airflow-scheduler|asset_triggered_dagruns scheduler" +) + +for entry in "${INSTANCE_METRICS[@]}"; do + IFS='|' read -r expression instance label <<< "${entry}" + verify_metric "${expression} (${label})" "${expression}" --instance-name="${instance}" || true +done + +log "" +log "=== Summary ===" +log "PASS: ${pass} FAIL: ${fail} TOTAL: $((pass + fail))" +log "Report: ${REPORT}" + +if [[ "${fail}" -gt 0 ]]; then + exit 1 +fi diff --git a/test/e2e-v2/cases/airflow/scripts/wait-scheduler-healthy.sh b/test/e2e-v2/cases/airflow/scripts/wait-scheduler-healthy.sh new file mode 100644 index 000000000000..b558f5c9c1ae --- /dev/null +++ 
b/test/e2e-v2/cases/airflow/scripts/wait-scheduler-healthy.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=cluster-compose-env.sh +source "${SCRIPT_DIR}/cluster-compose-env.sh" + +SCHEDULER="${AIRFLOW_SCHEDULER_SERVICE:-airflow-scheduler}" +MAX_ATTEMPTS="${SCHEDULER_HEALTH_ATTEMPTS:-90}" +SLEEP_SECONDS="${SCHEDULER_HEALTH_INTERVAL_SECONDS:-10}" + +echo "Waiting for ${SCHEDULER} (compose project ${COMPOSE_PROJECT_NAME})..." 
+ +for _ in $(seq 1 "${MAX_ATTEMPTS}"); do + if dc exec -T "${SCHEDULER}" \ + airflow jobs check --job-type SchedulerJob --hostname "${SCHEDULER}"; then + echo "Airflow scheduler healthy" + exit 0 + fi + sleep "${SLEEP_SECONDS}" +done + +echo "Airflow scheduler did not become healthy in time" +exit 1 diff --git a/test/e2e-v2/cases/storage/expected/config-dump.yml b/test/e2e-v2/cases/storage/expected/config-dump.yml index a95bda9e18f3..e2a37ee996cb 100644 --- a/test/e2e-v2/cases/storage/expected/config-dump.yml +++ b/test/e2e-v2/cases/storage/expected/config-dump.yml @@ -176,7 +176,7 @@ "receiver-log.provider": "default", "receiver-meter.provider": "default", "receiver-otel.default.enabledHandlers": "otlp-traces,otlp-metrics,otlp-logs", - "receiver-otel.default.enabledOtelMetricsRules": "apisix,nginx/*,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*,kafka/*,pulsar/*,bookkeeper/*,rocketmq/*,clickhouse/*,activemq/*,kong/*,flink/*,banyandb/*,envoy-ai-gateway/*,ios/*,miniprogram/*", + "receiver-otel.default.enabledOtelMetricsRules": "apisix,nginx/*,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*,kafka/*,pulsar/*,bookkeeper/*,rocketmq/*,clickhouse/*,activemq/*,kong/*,flink/*,airflow/*,banyandb/*,envoy-ai-gateway/*,ios/*,miniprogram/*", "receiver-otel.provider": "default", "receiver-pprof.default.memoryParserEnabled": "true", "receiver-pprof.default.pprofMaxSize": "31457280",