diff --git a/composer/tools/README.md b/composer/tools/README.md
index ad686ca7c8b..8e8f8a3eab7 100644
--- a/composer/tools/README.md
+++ b/composer/tools/README.md
@@ -1,3 +1,4 @@
# Composer Tools
* [Composer DAGs Pausing/Unpausing script](composer_dags.md)
+* [Composer DAGs Parsing Profiler tool](parsing_profiler/README.md)
\ No newline at end of file
diff --git a/composer/tools/parsing_profiler/README.md b/composer/tools/parsing_profiler/README.md
new file mode 100644
index 00000000000..78fa0840c51
--- /dev/null
+++ b/composer/tools/parsing_profiler/README.md
@@ -0,0 +1,48 @@
+# đ Composer DAG Linter & Parsing Profiler
+
+## Overview
+This Airflow DAG provides an **on-demand performance profiling** tool for your DAG parsing logic, designed to help you optimize your Google Cloud Composer environment.
+
+When triggered, it executes a **one-off** analysis inside an isolated Kubernetes Pod. This mimics the standard parsing process but runs independently, ensuring that **parsing latency** and resource-heavy **top-level code** can be identified without interrupting your environment's normal operations. Additionally, it validates DAG integrity and detects syntax errors.
+
+## đ Key Features
+* **Isolated Execution:** Executes the diagnostic parsing logic in a dedicated Pod to ensure no resource contention with your live environment.
+* **Top-Level Code Profiling:** Detects DAGs that exceed a configurable parse-time threshold and generates a `cProfile` report to identify the specific calls causing delays (e.g., database connections, heavy imports).
+* **Smart Image Detection:** Automatically detects the correct worker image for environments with **extra PyPI packages**, ensuring accurate replication of dependencies.
+    * *Note:* If a "Vanilla" (default) environment is detected, the task **skips** gracefully and asks you to set the worker image manually.
+* **Parallel Processing:** Leverages multiprocessing to analyze the entire DAG repository efficiently.
+* **Cross-Environment Diagnostics:** Capable of scanning unstable or crashing environments by running this tool from a separate, stable Composer instance.
+
+---
+
+## âī¸ Quick Setup
+
+### 1. Installation
+Upload both files to your Composer environment's `dags/` folder:
+* `dag_linter_kubernetes_pod.py` (The Orchestrator)
+* `linter_core.py` (The Logic Script)
+
+> **Note:** The orchestrator expects `linter_core.py` to be in the **same directory** as the DAG file.
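+
+If you prefer to script the upload, here is a minimal sketch using the `google-cloud-storage` client; the bucket name is a placeholder for your environment's DAGs bucket:
+
+```python
+from google.cloud import storage
+
+BUCKET_NAME = "your-composer-bucket"  # placeholder: your environment's bucket
+
+bucket = storage.Client().bucket(BUCKET_NAME)
+# Place both files next to your other DAGs.
+for filename in ("dag_linter_kubernetes_pod.py", "linter_core.py"):
+    bucket.blob(f"dags/{filename}").upload_from_filename(filename)
+```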
+
+### 2. Configuration
+Open `dag_linter_kubernetes_pod.py`. The tool automatically detects your bucket and image, but you can configure limits:
+
+| Variable | Description |
+| :--- | :--- |
+| `_CONFIG_GCS_BUCKET_NAME` | The bucket containing your DAGs/Plugins. Set to `None` for auto-detection. |
+| `_CONFIG_POD_IMAGE` | **CRITICAL:** Path to your Composer Worker image. Set to `None` for auto-detection. <br> **Manual Retrieval Options (for Vanilla or Cross-Environment setups):** <br> **Option 1: Cloud Build Logs (Composer 2 & 3).** Check the logs of the most recent successful build in Cloud Build. <br> **Option 2: GKE Workloads (Composer 2 only).** Navigate to the environment's GKE cluster > Workloads > `airflow-worker` > YAML tab and look for `image:` under `spec.containers`. <br> **Option 3: Google Cloud Support.** Customers with a valid support package can contact Google Cloud Support for assistance. |
+| `_CONFIG_POD_DISK_SIZE` | Ephemeral storage size for the Pod (ensure this fits your repo size). |
+| `_CONFIG_PARSE_TIME_THRESHOLD_SECONDS` | Time limit (in seconds) before a DAG is flagged as "slow". |
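+
+For illustration, the configuration block in `dag_linter_kubernetes_pod.py` looks like this (the values below mirror the shipped defaults; adjust only what you need):
+
+```python
+_CONFIG_GCS_BUCKET_NAME = None                # auto-detect the current environment's bucket
+_CONFIG_POD_IMAGE = None                      # auto-detect the worker image (custom-PyPI environments)
+_CONFIG_POD_DISK_SIZE = "10Gi"                # increase only if dags/, plugins/ and data/ need it
+_CONFIG_PARSE_TIME_THRESHOLD_SECONDS = 1.0    # flag DAGs that take longer than this (seconds) to parse
+```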
+
+#### â ī¸ Cross-Environment Diagnostics (Advanced)
+If you are using this tool in a **Stable** environment to debug a **different** environment, you must set `_CONFIG_GCS_BUCKET_NAME` and `_CONFIG_POD_IMAGE` manually.
+
+**Strict Prerequisites:**
+1. **Same Service Project:** Both environments must reside in the same Service Project.
+2. **Same Service Account:** The Stable environment must run as the **same Service Account** as the target environment to ensure correct IAM permissions for GCP resources.
+3. **Same Major Version:** You must troubleshoot Composer 2 with Composer 2, or Composer 3 with Composer 3.
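+
+With the prerequisites above satisfied, the manual overrides might look like the following sketch (the bucket name and image URI are placeholders; retrieve the real image path using the options listed in the Configuration table):
+
+```python
+# Cross-environment example (placeholder values only)
+_CONFIG_GCS_BUCKET_NAME = "us-central1-target-env-1a2b3c4d-bucket"
+_CONFIG_POD_IMAGE = "us-central1-docker.pkg.dev/target-tenant-project/composer-images/0f9e8d7c"
+```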
+
+### 3. Execution
+Trigger the DAG **`composer_dag_parser_profile`** manually from the Airflow UI.
+
+Check the Task Logs for the **`profile_and_check_linter`** task to view the integrity report and performance profiles.
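+
+An abridged, illustrative excerpt of the report (file name, timings, and profile lines are placeholders):
+
+```text
+########## [START] Processing: example_etl.py ##########
+  â ī¸ Status: WARNING
+    Details: Slow parse time: 2.4815s.
+    đ Performance Profile (Top 10 slowest calls by tottime):
+    |          12345 function calls in 2.481 seconds
+    |    Ordered by: internal time
+    |    ...
+########## [END] Processing: example_etl.py ##########
+```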
\ No newline at end of file
diff --git a/composer/tools/parsing_profiler/dag_linter_kubernetes_pod.py b/composer/tools/parsing_profiler/dag_linter_kubernetes_pod.py
new file mode 100644
index 00000000000..125e2f94c26
--- /dev/null
+++ b/composer/tools/parsing_profiler/dag_linter_kubernetes_pod.py
@@ -0,0 +1,433 @@
+#!/usr/bin/env python
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START composer_dag_parsing_profiler_dag]
+"""
+Orchestration DAG for the Composer Parsing Profiler.
+
+This script defines the Airflow DAG that provisions and launches the isolated
+profiling environment. It handles:
+1. Configuration resolution (auto-detecting buckets and worker images).
+2. Resource provisioning (ephemeral storage volumes).
+3. Execution of the core analysis logic within a KubernetesPodOperator.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import pendulum
+import requests
+
+import google.auth
+from google.auth.transport.requests import Request
+
+from airflow.models.dag import DAG
+from airflow.operators.python import PythonOperator
+from airflow.exceptions import AirflowSkipException
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+from kubernetes.client import models as k8s
+
+# ==============================================================================
+# âī¸ CONFIGURATION SECTION
+# ==============================================================================
+
+# --- 1. Feature Flags ---
+# Enable to run cProfile on DAGs that exceed the parse time threshold.
+_CONFIG_PROFILE_SLOW_DAGS = True
+
+# Enable to download 'data/' and 'plugins/' folders to the pod.
+# Essential if your DAGs rely on custom plugins or local data files during import.
+_CONFIG_FETCH_DATA_AND_PLUGINS = True
+
+# Profiling Sorting Strategy:
+# 'tottime' (Default): Sort by time spent INSIDE the function itself.
+# Best for finding specific bottlenecks (math, regex, sleeps).
+# 'cumtime': Sort by cumulative time (function + sub-calls).
+# Best for finding heavy high-level imports.
+_CONFIG_PROFILE_SORT_KEY = "tottime"
+
+# The maximum acceptable time (in seconds) for a DAG file to be parsed.
+# Files exceeding this threshold trigger a WARNING and performance profiling.
+#
+# CALIBRATION HINT: This default is arbitrary. Parsing duration within this
+# isolated Pod differs from the actual Scheduler parsing time. It is recommended
+# to use the parse time of the simple, built-in 'airflow_monitoring' DAG as a
+# reference baseline.
+_CONFIG_PARSE_TIME_THRESHOLD_SECONDS = 1.0
+
+# --- 2. Infrastructure Configuration ---
+
+# OPTIONAL: Manual Override for the GCS Bucket.
+# Leave as None to automatically detect the environment's default bucket.
+# Set to a specific string only if you need to target a different environment's
+# bucket (e.g., for cross-environment troubleshooting).
+# Format: "region-envName-hash-bucket"
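+# Example (illustrative only): "us-central1-my-environment-0a1b2c3d-bucket"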
+_CONFIG_GCS_BUCKET_NAME = None
+
+# OPTIONAL: Manual Override for the Worker Image.
+# Leave as None to automatically detect the correct image for the current environment.
+# Set to a specific string URI only if you need to replicate a remote environment
+# (see Cross-Environment Troubleshooting Guide below).
+_CONFIG_POD_IMAGE = None
+
+# --- 3. Source Folder Paths (Defaults) ---
+# These variables define the specific source folders within the GCS bucket.
+# They allow developers to point the linter at non-standard directories (e.g.,
+# a test copy like 'dags_staging/') while the core script ensures the contents
+# are mapped to the correct Airflow local directory name (dags, plugins, etc.)
+# for validation.
+_CONFIG_GCS_DAGS_SOURCE_FOLDER = "dags/"
+_CONFIG_GCS_PLUGINS_SOURCE_FOLDER = "plugins/"
+_CONFIG_GCS_DATA_SOURCE_FOLDER = "data/"
+
+# --- 4. Kubernetes Resources ---
+# The disk size is crucial for parallel processing, especially when downloading
+# large supporting files. This configures ephemeral storage for the pod.
+#
+# CAUTION: If setting this value above 10Gi, ensure you are running on
+# Composer 3+ environments (which support up to 100Gi). Large values often
+# indicate unnecessary files; consider cleaning up the source folders before
+# increasing this limit.
+_CONFIG_POD_DISK_SIZE = "10Gi" # Default set to 10Gi (typical Composer 2 maximum).
+
+# Treat these values as the minimum. Smaller values can crash the Pod because of the
+# heavy parallel processing and the many DAG imports happening simultaneously.
+_CONFIG_POD_NAMESPACE = "composer-user-workloads"
+_CONFIG_POD_RESOURCES = k8s.V1ResourceRequirements(
+ requests={"cpu": "4000m", "memory": "16Gi"},
+ limits={"cpu": "4000m", "memory": "16Gi"},
+)
+
+# ==============================================================================
+# đ§° CROSS-ENVIRONMENT TROUBLESHOOTING GUIDE
+# ==============================================================================
+# For unstable environments (e.g., crashing schedulers), run this DAG from a
+# stable Composer environment that meets the following criteria:
+#
+# REQUIREMENTS:
+# 1. Location: Must be in the SAME Service Project as the target environment.
+# 2. Identity: Must use the SAME Service Account as the target environment.
+# (This ensures identical IAM permissions for accessing GCP resources).
+# 3. Version: Must match the major Composer version of the target environment
+# (e.g., troubleshoot a Composer 3 environment using another Composer 3
+# environment). This ensures infrastructure and image compatibility.
+#
+# CONFIGURATION:
+# - Set _CONFIG_GCS_BUCKET_NAME to the target environment's bucket.
+# - Set _CONFIG_POD_IMAGE manually to the exact image path of the target environment.
+#
+# Manual Image Retrieval Options:
+# Option 1: Cloud Build Logs (Composer 2 & 3)
+# Check the logs of the most recent successful build in Cloud Build.
+# The image URI is typically listed in the build steps or artifacts.
+#
+# Option 2: GKE Workloads (Composer 2 Only)
+# Go to GKE > Workloads > airflow-worker > YAML tab.
+# Look for the 'image:' field under the 'spec.containers' section.
+#
+# Option 3: Google Cloud Support
+# Customers with a valid support package can contact Google Cloud Support
+# for assistance in locating the correct worker image path.
+
+
+# ==============================================================================
+# đĩī¸ IMAGE DETECTION LOGIC (ARTIFACT REGISTRY CHECK)
+# ==============================================================================
+def _verify_docker_image_v2(image_uri: str) -> bool:
+ """
+ Verifies if an image exists by checking its manifest via the Docker Registry V2 API.
+ This mimics a 'docker pull' check and works even if 'list' permissions are restricted.
+ """
+    print(f" đ Verifying Manifest for: {image_uri}")
+
+ try:
+ # 1. Parse the URI
+ parts = image_uri.split('/')
+ domain = parts[0]
+ repo_path = "/".join(parts[1:])
+
+ # 2. Construct V2 API URL
+ api_url = f"https://{domain}/v2/{repo_path}/manifests/latest"
+
+ # 3. Authenticate
+ credentials, _ = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
+ credentials.refresh(Request())
+
+ # 4. Request Manifest
+ response = requests.get(
+ api_url,
+ headers={'Authorization': f'Bearer {credentials.token}'},
+ timeout=10
+ )
+
+ if response.status_code == 200:
+            print(" â
 Manifest found (HTTP 200). Valid Custom Image.")
+            return True
+
+        if response.status_code == 404:
+            print(" âšī¸ Manifest not found (HTTP 404). Confirmed Vanilla Environment.")
+            return False
+
+        print(f" â ī¸ Access Forbidden (HTTP {response.status_code}). Assuming Vanilla.")
+ return False
+
+ except requests.RequestException as e:
+        print(f" â ī¸ Verification failed with error: {e}. Assuming Vanilla.")
+ return False
+
+
+def _get_c3_image_string() -> str | None:
+ """Constructs and verifies the C3 image URL."""
+ fingerprint = os.getenv('COMPOSER_OPERATION_FINGERPRINT')
+ project_id = os.getenv('GCP_TENANT_PROJECT')
+ location = os.getenv('COMPOSER_LOCATION')
+
+ if not all([fingerprint, location, project_id]):
+ return None
+
+ image_uuid = fingerprint.split('@')[0]
+ registry_domain = f"{location}-docker.pkg.dev"
+
+ c3_image = f"{registry_domain}/{project_id}/composer-images/{image_uuid}"
+
+ # Validate existence via Docker V2 API
+ if _verify_docker_image_v2(c3_image):
+        print(f" â
 Using C3 Image Path: {c3_image}")
+ return c3_image
+
+ return None
+
+
+def _get_c2_image_api() -> str | None:
+ """Queries the Google Artifact Registry API to find the latest Docker image (C2)."""
+ project_id = os.getenv('GCP_PROJECT')
+ location = os.getenv('COMPOSER_LOCATION')
+ gke_name = os.getenv('COMPOSER_GKE_NAME')
+
+ if not all([project_id, location, gke_name]):
+ return None
+
+ repo_name = f"composer-images-{gke_name}"
+    print(f" đ Querying Repo: projects/{project_id}/locations/{location}/repositories/{repo_name}")
+
+ try:
+ credentials, _ = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
+ credentials.refresh(Request())
+ except google.auth.exceptions.DefaultCredentialsError as e:
+        print(f" â ī¸ Authentication failed: {e}")
+ return None
+
+ api_url = (
+ f"https://artifactregistry.googleapis.com/v1/"
+ f"projects/{project_id}/locations/{location}/repositories/{repo_name}/dockerImages"
+ )
+
+ try:
+ response = requests.get(
+ api_url,
+ params={'pageSize': 1, 'orderBy': 'update_time desc'},
+ headers={'Authorization': f'Bearer {credentials.token}'},
+ timeout=10
+ )
+
+ if response.status_code == 404:
+            print(" âšī¸ Repository not found (Likely a Vanilla Environment).")
+ return None
+
+ response.raise_for_status()
+ data = response.json()
+
+ if 'dockerImages' in data and len(data['dockerImages']) > 0:
+ image_uri = data['dockerImages'][0]['uri']
+ clean_uri = image_uri.split('@')[0]
+            print(f" â
 Found Custom Image: {clean_uri}")
+            return clean_uri
+        else:
+            print(" âšī¸ Repository exists but is empty (Vanilla Environment).")
+ return None
+
+ except requests.RequestException as e:
+        print(f" â ī¸ API Request failed: {e}")
+ return None
+
+
+def detect_worker_image(**context) -> str:
+ """Determines the correct worker image to use for the profiler Pod.
+
+ Logic:
+ 1. Manual: Checks _CONFIG_POD_IMAGE variable.
+ 2. C3 (Auto): Constructs path from Fingerprint and verifies manifest via V2 API.
+ 3. C2 (Auto): Queries Artifact Registry API.
+
+ Returns:
+ str: The full URI of the worker image to use.
+
+ Raises:
+ AirflowSkipException: If a custom image cannot be detected (Vanilla environment).
+ """
+ # 1. Check for Manual Override
+ if _CONFIG_POD_IMAGE:
+        print(f"â
 Using manually configured image: {_CONFIG_POD_IMAGE}")
+ return _CONFIG_POD_IMAGE
+
+    print("đš Manual image not configured. Attempting to detect CUSTOM image...")
+
+ c3_fingerprint = os.getenv('COMPOSER_OPERATION_FINGERPRINT')
+
+ if c3_fingerprint:
+ print(" Detected: Composer 3 Environment")
+ c3_image = _get_c3_image_string()
+ if c3_image:
+ return c3_image
+ else:
+ print(" Detected: Composer 2 Environment")
+ c2_image = _get_c2_image_api()
+ if c2_image:
+ return c2_image
+
+ # 4. Graceful Skip for Vanilla Environments
+    print(" â ī¸ No Custom Image found. This environment appears to be 'Vanilla'.")
+    print(" â ī¸ Skipping Linter execution as default images cannot be auto-detected.")
+
+ raise AirflowSkipException(
+ "Skipping: Vanilla Environment detected. To profile this environment, "
+ "please retrieve the image manually (see 'Cross-Environment Guide' in code) "
+ "and set the '_CONFIG_POD_IMAGE' variable."
+ )
+
+
+# ==============================================================================
+# đ LINTER SCRIPT GENERATION (EXECUTION TIME)
+# ==============================================================================
+def generate_linter_command(**context):
+ """
+ Reads the core script, resolves configuration, and builds the python command.
+ """
+ # 1. Resolve the GCS Bucket Name (Auto-detect or Manual)
+ target_bucket = _CONFIG_GCS_BUCKET_NAME if _CONFIG_GCS_BUCKET_NAME else os.environ.get("GCS_BUCKET")
+
+ if not target_bucket:
+ raise ValueError(
+            "đ¨ FATAL: Could not auto-detect GCS Bucket. "
+ "Please configure '_CONFIG_GCS_BUCKET_NAME' manually in the DAG file."
+ )
+
+ # 2. Read the content of the external script.
+ # We use the location of the current file to find linter_core.py
+ current_dir = os.path.dirname(os.path.abspath(__file__))
+ core_script_path = os.path.join(current_dir, 'linter_core.py')
+
+ try:
+ with open(core_script_path, 'r') as f:
+ linter_script_content = f.read()
+ except FileNotFoundError:
+ raise FileNotFoundError(
+ f"Could not find 'linter_core.py' at {core_script_path}. "
+ "Ensure both files are in the DAGs folder."
+ )
+
+ # 3. Build the configuration dictionary
+ linter_config_payload = {
+ "GCS_BUCKET": target_bucket,
+ "BASE_WORK_DIR": "/mnt/data/airflow_content",
+ "FETCH_EXTRAS": _CONFIG_FETCH_DATA_AND_PLUGINS,
+ "PROFILE_SLOW": _CONFIG_PROFILE_SLOW_DAGS,
+ "PROFILE_SORT_KEY": _CONFIG_PROFILE_SORT_KEY,
+ "PARSE_TIME_THRESHOLD_SECONDS": _CONFIG_PARSE_TIME_THRESHOLD_SECONDS,
+ "GCS_DAGS_SOURCE_FOLDER": _CONFIG_GCS_DAGS_SOURCE_FOLDER,
+ "GCS_PLUGINS_SOURCE_FOLDER": _CONFIG_GCS_PLUGINS_SOURCE_FOLDER,
+ "GCS_DATA_SOURCE_FOLDER": _CONFIG_GCS_DATA_SOURCE_FOLDER,
+ }
+
+ # 4. Serialize the config dictionary to a JSON string
+ config_json_arg = json.dumps(linter_config_payload)
+
+ # 5. Construct the full Python command string
+ # Use string concatenation (+) to prevent f-string corruption of the script content.
+ final_command_string = (
+ linter_script_content +
+ f"\nmain('{config_json_arg}')"
+ )
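+    # The resulting string is the full linter_core.py source followed by a trailing
+    # call such as: main('{"GCS_BUCKET": "...", ...}'). The Pod later executes this
+    # string via `python -c`, so no extra file needs to exist inside the image.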
+
+ # Push to XCom manually to avoid printing the huge command string in the logs
+ context['task_instance'].xcom_push(key='linter_command', value=final_command_string)
+
+
+# ==============================================================================
+# đ¨ DAG DEFINITION
+# ==============================================================================
+with DAG(
+ dag_id="composer_dag_parser_profile",
+ start_date=pendulum.datetime(2025, 8, 6, tz="UTC"),
+ schedule=None, # Triggered manually/on-demand
+ catchup=False,
+ tags=["profiler", "troubleshooting", "gcp-composer"],
+ doc_md=__doc__, # Use the module docstring as DAG documentation
+) as dag:
+
+ # Define the Volume and Volume Mount for guaranteed storage
+ storage_volume = k8s.V1Volume(
+ name="ephemeral-storage",
+ # Use EmptyDir with size_limit to guarantee the requested ephemeral storage
+ empty_dir=k8s.V1EmptyDirVolumeSource(size_limit=_CONFIG_POD_DISK_SIZE),
+ )
+
+ storage_volume_mount = k8s.V1VolumeMount(
+ name="ephemeral-storage",
+ mount_path="/mnt/data", # Mount point used by BASE_WORK_DIR in injected script
+ )
+
+ # Task 1: Detect the correct worker image (Auto or Manual)
+ detect_image_task = PythonOperator(
+ task_id="detect_worker_image",
+ python_callable=detect_worker_image,
+ retries=0, # FAIL FAST: If image detection fails (Vanilla), stop DAG immediately.
+ )
+
+ # Task 2: Prepare the Linter Command (Reads file, builds config)
+ prepare_script_task = PythonOperator(
+ task_id="prepare_linter_script",
+ python_callable=generate_linter_command,
+ )
+
+ # Task 3: Execute the Linter Pod
+ profile_and_check_linter = KubernetesPodOperator(
+ task_id="profile_and_check_linter",
+ name="dag-linter-pod",
+ namespace=_CONFIG_POD_NAMESPACE,
+
+ # Dynamically pull the image and command from previous tasks (using custom XCom key for command)
+ image="{{ task_instance.xcom_pull(task_ids='detect_worker_image') }}",
+ arguments=["-c", "{{ task_instance.xcom_pull(task_ids='prepare_linter_script', key='linter_command') }}"],
+ cmds=["python"],
+
+ container_resources=_CONFIG_POD_RESOURCES,
+ do_xcom_push=False,
+ get_logs=True,
+ log_events_on_failure=True,
+ startup_timeout_seconds=300,
+
+ # Kubernetes connection parameters (Required for permissions in some environments)
+ config_file="/home/airflow/composer_kube_config",
+ kubernetes_conn_id="kubernetes_default",
+
+ # Attach Ephemeral Storage Volume
+ volumes=[storage_volume],
+ volume_mounts=[storage_volume_mount],
+ )
+
+ detect_image_task >> prepare_script_task >> profile_and_check_linter
+# [END composer_dag_parsing_profiler_dag]
\ No newline at end of file
diff --git a/composer/tools/parsing_profiler/linter_core.py b/composer/tools/parsing_profiler/linter_core.py
new file mode 100644
index 00000000000..3ffda79ab14
--- /dev/null
+++ b/composer/tools/parsing_profiler/linter_core.py
@@ -0,0 +1,293 @@
+#!/usr/bin/env python
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START composer_dag_linter_core]
+"""
+Core logic for the Composer DAG Linter & Parsing Profiler.
+
+This script is executed inside an isolated Kubernetes Pod. It handles GCS downloads,
+parallel parsing/profiling of DAG files, and final report generation. Its primary
+goal is to detect parsing latency issues and identify heavy top-level code execution.
+"""
+
+import cProfile
+import importlib.util
+import io
+import json
+import logging
+import os
+import pstats
+import sys
+import time
+from contextlib import redirect_stderr, redirect_stdout
+from multiprocessing import Pool, cpu_count
+
+from airflow.models.dag import DAG
+from google.cloud import storage
+
+
+class Ansi:
+ """ANSI escape codes for colored terminal output."""
+ # pylint: disable=too-few-public-methods
+ BOLD = '\033[1m'
+ RESET = '\033[0m'
+
+
+def download_folder(bucket: storage.Bucket, prefix: str, local_subdir: str, config: dict, report_lines: list[str]) -> int:
+ """Downloads a directory from GCS to a local path, mirroring the structure.
+
+ Args:
+ bucket: The google.cloud.storage Bucket object.
+ prefix: The GCS prefix (folder) to download from.
+ local_subdir: The subdirectory inside BASE_WORK_DIR to save files to.
+ config: Dictionary containing BASE_WORK_DIR configuration.
+ report_lines: A list of strings to append logs to (for final reporting).
+
+ Returns:
+ int: The number of files successfully downloaded.
+ """
+ base_work_dir = config['BASE_WORK_DIR']
+ blobs = bucket.list_blobs(prefix=prefix)
+ target_dir = os.path.join(base_work_dir, local_subdir)
+ os.makedirs(target_dir, exist_ok=True)
+
+ count = 0
+ for blob in blobs:
+ if blob.name.endswith('/'):
+ continue
+
+ # Calculate relative path (e.g., 'plugins/hooks/my_hook.py' -> 'hooks/my_hook.py')
+ rel_path = os.path.relpath(blob.name, prefix)
+ dest_path = os.path.join(target_dir, rel_path)
+
+ os.makedirs(os.path.dirname(dest_path), exist_ok=True)
+ blob.download_to_filename(dest_path)
+ count += 1
+
+ report_lines.append(f"Downloaded {count} files from '{prefix}' to '{target_dir}'")
+ return count
+
+
+def parse_file(filepath: str, config: dict) -> dict:
+ """Parses a single Python file to check for DAG integrity and import performance.
+
+ This function is designed to be run in parallel. It captures stdout/stderr
+ to prevent console interleaving and optionally profiles execution time.
+
+ Args:
+ filepath: The absolute path to the Python file to test.
+ config: Dictionary containing configuration for profiling and logging.
+
+ Returns:
+ dict: A dictionary containing status (SUCCESS/WARNING/ERROR), messages,
+ captured stdout/stderr, and optional profile data.
+ """
+ stdout_buffer = io.StringIO()
+ stderr_buffer = io.StringIO()
+ result = {}
+ profiler = cProfile.Profile()
+
+ # Load configuration (Snake case for local variables to satisfy pylint)
+ profile_slow = config['PROFILE_SLOW']
+ profile_sort_key = config['PROFILE_SORT_KEY']
+ parse_threshold = config['PARSE_TIME_THRESHOLD_SECONDS']
+
+ # Note: Temporarily disable logging to prevent plugins from printing to the console
+ # during import, which would interleave with the linter's report output.
+ previous_log_level = logging.root.manager.disable
+ logging.disable(logging.CRITICAL)
+
+ try:
+ start_time = time.monotonic()
+
+ # Capture standard print() statements
+ with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer):
+ module_name = f"dag_linter_test.{os.path.basename(filepath).replace('.py', '')}"
+ spec = importlib.util.spec_from_file_location(module_name, filepath)
+ if spec is None:
+ raise ImportError("Could not create module spec.")
+
+ module = importlib.util.module_from_spec(spec)
+
+ # Execute module import inside profiler
+ profiler.runctx(
+ 'spec.loader.exec_module(module)',
+ globals(),
+ {"spec": spec, "module": module}
+ )
+
+ duration = time.monotonic() - start_time
+ captured_stderr = stderr_buffer.getvalue()
+ is_slow = duration > parse_threshold
+
+ # Determine Status
+ if captured_stderr:
+ result = {'status': 'ERROR', 'message': "DAG generated error messages during parsing (see stderr)."}
+ elif is_slow:
+ result = {'status': 'WARNING', 'duration': duration, 'message': f"Slow parse time: {duration:.4f}s."}
+ elif not any(isinstance(var, DAG) for var in vars(module).values()):
+ result = {'status': 'WARNING', 'message': "Parsed, but no DAG objects were found."}
+ else:
+ result = {'status': 'SUCCESS', 'duration': duration, 'message': f"Parsed in {duration:.4f}s."}
+
+ # Attach Profile Data if Slow
+ if is_slow and profile_slow:
+ profile_stream = io.StringIO()
+ stats = pstats.Stats(profiler, stream=profile_stream).sort_stats(profile_sort_key)
+ stats.print_stats(10) # Limit to Top 10 offenders
+ result['profile_output'] = profile_stream.getvalue()
+
+ # pylint: disable=broad-except
+ except Exception as e:
+ result = {'status': 'ERROR', 'message': f"Failed to import file. Full error: {e}"}
+ finally:
+ # Restore logging configuration immediately
+ logging.disable(previous_log_level)
+
+ result['stdout'] = stdout_buffer.getvalue()
+ result['stderr'] = stderr_buffer.getvalue()
+ return result
+
+
+def main(config_json: str):
+ """Main execution flow for the Linter Pod.
+
+ Args:
+ config_json: A JSON string containing all necessary runtime configuration.
+ """
+ config = json.loads(config_json)
+
+ report_lines = ["--- Starting DAG Linter ---"]
+
+ # Extract config variables
+ gcs_bucket = config['GCS_BUCKET']
+ base_work_dir = config['BASE_WORK_DIR']
+ fetch_extras = config['FETCH_EXTRAS']
+
+ dags_source_folder = config['GCS_DAGS_SOURCE_FOLDER']
+ plugins_source_folder = config['GCS_PLUGINS_SOURCE_FOLDER']
+ data_source_folder = config['GCS_DATA_SOURCE_FOLDER']
+
+ # Determine Parallelism based on available CPU resources
+ parallelism = max(cpu_count() - 1, 1)
+
+ # 1. Configure System Paths
+ dags_dir = os.path.join(base_work_dir, "dags")
+ plugins_dir = os.path.join(base_work_dir, "plugins")
+
+ sys.path.append(dags_dir)
+ sys.path.append(plugins_dir)
+
+ # 2. Download Content from GCS
+ try:
+ storage_client = storage.Client()
+ bucket = storage_client.bucket(gcs_bucket)
+
+ # Essential: Download DAGs
+ download_folder(bucket, dags_source_folder, "dags", config, report_lines)
+
+ # Optional: Download supporting folders
+ if fetch_extras:
+ report_lines.append("Flag enabled: Fetching data/ and plugins/ folders...")
+ download_folder(bucket, data_source_folder, "data", config, report_lines)
+ download_folder(bucket, plugins_source_folder, "plugins", config, report_lines)
+
+ # pylint: disable=broad-except
+ except Exception as e:
+ report_lines.append(f"FATAL: GCS Download failed. Error: {e}")
+ print("\n".join(report_lines))
+ return
+
+ # 3. Identify Target Files
+ files_to_process = []
+ if os.path.exists(dags_dir):
+ for root, _, files in os.walk(dags_dir):
+ for file in files:
+ if not file.endswith(".py"):
+ continue
+
+ # Exclude the orchestrator DAG and the Linter Core script
+ # to prevent recursion loops or self-parsing errors.
+ if file in ['dag_linter_kubernetes_pod.py', 'linter_core.py']:
+ continue
+
+ files_to_process.append(os.path.join(root, file))
+
+ if not files_to_process:
+ report_lines.append("WARNING: No DAG files found.")
+ print("\n".join(report_lines))
+ return
+
+ report_lines.append(f"Analyzing {len(files_to_process)} Python files using {parallelism} processes...")
+
+ # 4. Execute Parallel Linting
+ with Pool(processes=parallelism) as pool:
+ results = pool.starmap(parse_file, [(f, config) for f in files_to_process])
+
+ # 5. Generate Final Report
+ report_lines.append("\n" + "#" * 80)
+ report_lines.append("##### Linter Report #####".center(80, ' '))
+ report_lines.append("#" * 80)
+ has_issues = False
+
+ for i, res in enumerate(results):
+ filepath = files_to_process[i]
+ filename = os.path.basename(filepath)
+
+ report_lines.append(f"\n{Ansi.BOLD}########## [START] Processing: {filename} ##########{Ansi.RESET}")
+
+ status = res.get('status', 'ERROR')
+ message = res.get('message', 'Unknown error')
+
+ if status == 'SUCCESS':
+            report_lines.append(" â
 Status: SUCCESS")
+        elif status == 'WARNING':
+            report_lines.append(" â ī¸ Status: WARNING")
+            has_issues = True
+        elif status == 'ERROR':
+            report_lines.append(" đ¨ Status: FAILED")
+ has_issues = True
+
+ report_lines.append(f" Details: {message}")
+
+ if res.get('stdout'):
+            report_lines.append(" đ Captured Output (stdout):")
+ for line in res['stdout'].strip().split('\n'):
+ report_lines.append(f" | {line}")
+ if res.get('stderr'):
+            report_lines.append(" đ Captured Errors (stderr):")
+ for line in res['stderr'].strip().split('\n'):
+ report_lines.append(f" | {line}")
+ if res.get('profile_output'):
+            report_lines.append(f" đ Performance Profile (Top 10 slowest calls by {config['PROFILE_SORT_KEY']}):")
+ for line in res['profile_output'].strip().split('\n'):
+ report_lines.append(f" | {line}")
+
+ report_lines.append(f"{Ansi.BOLD}########## [END] Processing: {filename} ##########{Ansi.RESET}")
+
+ report_lines.append("\n" + "#" * 80)
+ if has_issues:
+        report_lines.append("Linter finished and found one or more issues. See the details above.")
+ else:
+        report_lines.append("â
 Linting complete. All files parsed successfully.")
+
+ print("\n".join(report_lines))
+
+
+if __name__ == "__main__":
+ # The KubernetesPodOperator passes the JSON configuration as sys.argv[1]
+ if len(sys.argv) > 1:
+ main(sys.argv[1])
+# [END composer_dag_linter_core]
\ No newline at end of file