diff --git a/.github/workflows/main_pr.yml b/.github/workflows/main_pr.yml index a379356a..a73701b9 100644 --- a/.github/workflows/main_pr.yml +++ b/.github/workflows/main_pr.yml @@ -25,7 +25,12 @@ jobs: spark_matrix: ${{ steps.set-matrix-values.outputs.spark_dataproc_matrix }} hive_matrix: ${{ steps.set-matrix-values.outputs.hive_dataproc_matrix }} dbt_matrix: ${{ steps.set-matrix-values.outputs.dbt_matrix }} + execution_time: ${{ steps.get-execution-time.outputs.execution_time }} steps: + - name: Get execution time + id: get-execution-time + run: echo "execution_time=$(date +'%Y%m%d%H%M')" >> $GITHUB_OUTPUT + - name: Checkout code uses: actions/checkout@v4 @@ -175,6 +180,17 @@ jobs: with: fail-for-new-failures: true + notify-maintainers: + needs: + - initialize_workflow + - collect-and-compare-reports + if: ${{ failure() && needs.initialize_workflow.outputs.any_run == 'true' }} + uses: ./.github/workflows/notify_maintainers.yml + with: + workflow-type: 'pr' + trigger-type: 'pr' + execution-time: ${{ needs.initialize_workflow.outputs.execution_time }} + generate-compatibility-tables: needs: - collect-and-compare-reports diff --git a/.github/workflows/notify_maintainers.yml b/.github/workflows/notify_maintainers.yml index fa99e640..81550861 100644 --- a/.github/workflows/notify_maintainers.yml +++ b/.github/workflows/notify_maintainers.yml @@ -10,7 +10,7 @@ on: workflow-type: type: string required: true - description: "type of workflow calling, allowed values: release, spec_change" + description: "type of workflow calling, allowed values: release, spec_change, pr" execution-time: type: string required: true @@ -33,11 +33,23 @@ jobs: id: check-report-empty run: | result=$(jq '. 
== []' reports/retention-failures-report.json) - echo "report-empty=${result}" >> $GITHUB_OUTPUT + echo "report_empty=${result}" >> $GITHUB_OUTPUT + + + - name: Generate PR summary + id: generate-pr-summary + if: ${{ inputs.workflow-type == 'pr' && steps.check-report-empty.outputs.report_empty == 'false'}} + run: | + python scripts/generate_issue.py \ + --failure_path=reports/retention-failures-report.json \ + --issue_path=generated-files/summary.md \ + --skip-maintainers + + cat generated-files/summary.md >> $GITHUB_STEP_SUMMARY - name: Run task for Collect Reports - id: collect-and-merge-reports - if: ${{ steps.check-report-empty.outputs.report-empty == 'false'}} + id: generate-issue-report + if: ${{ inputs.workflow-type != 'pr' && steps.check-report-empty.outputs.report_empty == 'false'}} run: | python scripts/generate_issue.py \ --failure_path=reports/retention-failures-report.json \ @@ -45,7 +57,7 @@ jobs: - name: Create Issue From File uses: peter-evans/create-issue-from-file@v5 - if: ${{ steps.check-report-empty.outputs.report-empty == 'false'}} + if: ${{ inputs.workflow-type != 'pr' && steps.check-report-empty.outputs.report_empty == 'false'}} with: title: new failures in report from run ${{ github.run_id }} content-filepath: generated-files/issue.md diff --git a/producer/dbt/README.md b/producer/dbt/README.md index 282a8da3..aeea03d7 100644 --- a/producer/dbt/README.md +++ b/producer/dbt/README.md @@ -102,15 +102,13 @@ The GitHub Actions workflow: ### Local Debugging (Optional) -**For development debugging, you may optionally run PostgreSQL locally. The standard test environment is GitHub Actions.** +**For development debugging, local runs should use the same PostgreSQL 15 Docker setup as CI.** If you need to debug event generation locally: -1. **Start PostgreSQL (Optional)**: - ```bash - cd producer/dbt/scenarions/csv_to_postgres/test - docker compose up - ``` +1. 
**Ensure Docker is running**: + - The scenario runner uses `producer/dbt/scenarios/csv_to_postgres/test/compose.yml`. + - If PostgreSQL is not already available on `localhost:5432`, the scenario script starts the local Docker Compose service automatically and waits until it is ready. 2. **Install Python Dependencies**: ```bash @@ -120,8 +118,12 @@ If you need to debug event generation locally: ``` 3. **Run Test Scenario**: + - The example below assumes you run the command from the repository root, so relative paths such as `./producer/dbt/output` and `./dbt_producer_report.json` resolve from that location. ```bash - ./producer/dbt/run_dbt_tests.sh --openlineage-directory + ./producer/dbt/run_dbt_tests.sh \ + --openlineage-directory \ + --producer-output-events-dir ./producer/dbt/output \ + --openlineage-release 1.45.0 ``` 4. **Inspect Generated Events**: @@ -130,10 +132,10 @@ If you need to debug event generation locally: cat ./producer/dbt/output/csv_to_postgres/event-{id}.json | jq '.' # check report - cat ./producer/dbt/dbt_producer_report.json | jq '.' + cat ./dbt_producer_report.json | jq '.' ``` -**Note**: Local debugging is entirely optional. All official validation happens in GitHub Actions with PostgreSQL service containers. The test runner (`test/run.sh`) is the same code used by CI/CD, ensuring consistency. +**Note**: Local debugging is entirely optional. All official validation happens in GitHub Actions with PostgreSQL service containers. Local runs now reuse the same PostgreSQL 15 image and readiness check as CI to reduce drift between local debugging and workflow execution. ## Important dbt Integration Notes diff --git a/producer/dbt/run_dbt_tests.sh b/producer/dbt/run_dbt_tests.sh old mode 100644 new mode 100755 index 4069afb9..e803882e --- a/producer/dbt/run_dbt_tests.sh +++ b/producer/dbt/run_dbt_tests.sh @@ -1,5 +1,26 @@ #!/bin/bash +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +DBT_DIR="$SCRIPT_DIR" +REPO_ROOT="$(cd "$DBT_DIR/../.." 
&& pwd)" +SCENARIOS_DIR="$DBT_DIR/scenarios" +RUNNER_REQUIREMENTS="$DBT_DIR/runner/requirements.txt" +SPECS_BASE_DIR="$DBT_DIR/specs" +SCRIPTS_DIR="$REPO_ROOT/scripts" + +resolve_path() { + local input_path="$1" + local base_dir="$2" + if [[ "$input_path" == ~* ]]; then + input_path="${input_path/#\~/$HOME}" + fi + if [[ "$input_path" = /* ]]; then + echo "$input_path" + else + echo "$base_dir/$input_path" + fi +} + ################################################################################ ############ dbt Producer Compatibility Test Execution Script ################ ################################################################################ @@ -10,13 +31,13 @@ usage() { echo "" echo "Options:" echo " --openlineage-directory PATH Path to openlineage repository directory (required)" - echo " --producer-output-events-dir PATH Path to producer output events directory (default: output)" - echo " --openlineage-release VERSION OpenLineage release version (default: 2-0-2)" - echo " --report-path PATH Path to report directory (default: ../dbt_producer_report.json)" + echo " --producer-output-events-dir PATH Path to producer output events directory (default: /output)" + echo " --openlineage-release VERSION OpenLineage release version (default: 1.40.1)" + echo " --report-path PATH Path to report file (default: /dbt_producer_report.json)" echo " -h, --help Show this help message and exit" echo "" echo "Example:" - echo " $0 --openlineage-directory /path/to/specs --producer-output-events-dir output --openlineage-release 2-0-2" + echo " $0 --openlineage-directory /path/to/OpenLineage --producer-output-events-dir /tmp/dbt-output --openlineage-release 1.45.0" exit 0 } @@ -24,9 +45,9 @@ usage() { OPENLINEAGE_DIRECTORY="" # Variables with default values -PRODUCER_OUTPUT_EVENTS_DIR=output +PRODUCER_OUTPUT_EVENTS_DIR="$DBT_DIR/output" OPENLINEAGE_RELEASE=1.40.1 -REPORT_PATH="./dbt_producer_report.json" +REPORT_PATH="$REPO_ROOT/dbt_producer_report.json" # If -h or --help is 
passed, print usage and exit if [[ "$1" == "-h" || "$1" == "--help" ]]; then @@ -45,6 +66,10 @@ while [[ "$#" -gt 0 ]]; do shift done +OPENLINEAGE_DIRECTORY="$(resolve_path "$OPENLINEAGE_DIRECTORY" "$PWD")" +PRODUCER_OUTPUT_EVENTS_DIR="$(resolve_path "$PRODUCER_OUTPUT_EVENTS_DIR" "$PWD")" +REPORT_PATH="$(resolve_path "$REPORT_PATH" "$PWD")" + # Check required arguments if [[ -z "$OPENLINEAGE_DIRECTORY" ]]; then echo "Error: Missing required arguments." @@ -52,7 +77,8 @@ if [[ -z "$OPENLINEAGE_DIRECTORY" ]]; then fi # fail if scenarios are not defined in scenario directory -[[ $(find scenarios | wc -l) -gt 0 ]] || { echo >&2 "NO SCENARIOS DEFINED IN scenarios"; exit 1; } +[[ -d "$SCENARIOS_DIR" ]] || { echo >&2 "Error: scenarios directory not found at $SCENARIOS_DIR"; exit 1; } +[[ $(find "$SCENARIOS_DIR" | wc -l) -gt 1 ]] || { echo >&2 "NO SCENARIOS DEFINED IN $SCENARIOS_DIR"; exit 1; } mkdir -p "$PRODUCER_OUTPUT_EVENTS_DIR" @@ -72,16 +98,16 @@ echo "========================================================================== ################################################################################ # Check if scenario directory exists -if [[ ! -d "scenarios" ]]; then - echo "Error: scenarios directory not found" +if [[ ! -d "$SCENARIOS_DIR" ]]; then + echo "Error: scenarios directory not found at $SCENARIOS_DIR" exit 1 fi #install python dependencies python -m pip install --upgrade pip -if [ -f ./runner/requirements.txt ]; then - pip install -r ./runner/requirements.txt +if [ -f "$RUNNER_REQUIREMENTS" ]; then + pip install -r "$RUNNER_REQUIREMENTS" fi ################################################################################ @@ -91,18 +117,16 @@ fi ################################################################################ echo "Running dbt producer tests..." -POSIX_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -BASE_DIR="$(cygpath -m "$POSIX_DIR")" # Run tests for each scenario echo "Discovering test scenarios..." 
-for scenario_dir in scenarios/*/; do +for scenario_dir in "$SCENARIOS_DIR"/*/; do if [[ -d "$scenario_dir" && -f "${scenario_dir}config.json" ]]; then SCENARIO_NAME=$(basename "$scenario_dir") echo "Found scenario: $SCENARIO_NAME" mkdir -p "$PRODUCER_OUTPUT_EVENTS_DIR/$SCENARIO_NAME" - "$scenario_dir"test/run.sh "$BASE_DIR/$PRODUCER_OUTPUT_EVENTS_DIR/$SCENARIO_NAME" + "$scenario_dir"test/run.sh "$PRODUCER_OUTPUT_EVENTS_DIR/$SCENARIO_NAME" echo "Scenario $SCENARIO_NAME completed" fi @@ -114,16 +138,19 @@ echo "EVENT VALIDATION FOR SPEC VERSION $OPENLINEAGE_RELEASE" REPORT_DIR=$(dirname "$REPORT_PATH") mkdir -p "$REPORT_DIR" -SPECS_BASE_DIR="./specs" DEST_DIR="$SPECS_BASE_DIR/$OPENLINEAGE_RELEASE" mkdir -p "$DEST_DIR" -if [ -d "$OPENLINEAGE_DIRECTORY"/spec ]; then - find "$OPENLINEAGE_DIRECTORY"/spec -type f \( -name '*Facet.json' -o -name 'OpenLineage.json' \) -exec cp -t "$DEST_DIR" {} + +if [ -d "$OPENLINEAGE_DIRECTORY/spec" ]; then + while IFS= read -r spec_file; do + cp "$spec_file" "$DEST_DIR/" + done < <(find "$OPENLINEAGE_DIRECTORY/spec" -type f \( -name '*Facet.json' -o -name 'OpenLineage.json' \)) fi -if [ -d "$OPENLINEAGE_DIRECTORY"/integration/common/src/openlineage ]; then - find "$OPENLINEAGE_DIRECTORY"/integration/common/src/openlineage -type f -iname '*facet.json' -exec cp -t "$DEST_DIR" {} + +if [ -d "$OPENLINEAGE_DIRECTORY/integration/common/src/openlineage" ]; then + while IFS= read -r spec_file; do + cp "$spec_file" "$DEST_DIR/" + done < <(find "$OPENLINEAGE_DIRECTORY/integration/common/src/openlineage" -type f -iname '*facet.json') fi if [ -z "$(ls -A "$DEST_DIR")" ]; then @@ -131,15 +158,15 @@ if [ -z "$(ls -A "$DEST_DIR")" ]; then exit 1 fi -pip install -r ../../scripts/requirements.txt +pip install -r "$SCRIPTS_DIR/requirements.txt" -python ../../scripts/validate_ol_events.py \ +python "$SCRIPTS_DIR/validate_ol_events.py" \ --event_base_dir="$PRODUCER_OUTPUT_EVENTS_DIR" \ --spec_base_dir="$SPECS_BASE_DIR" \ --target="$REPORT_PATH" \ 
--component="dbt" \ --component_version="1.8.0" \ ---producer_dir=.. \ +--producer_dir="$REPO_ROOT/producer" \ --openlineage_version="$OPENLINEAGE_RELEASE" echo "EVENT VALIDATION FINISHED" diff --git a/producer/dbt/scenarios/csv_to_postgres/test/compose.yml b/producer/dbt/scenarios/csv_to_postgres/test/compose.yml index b9eddf99..56238f51 100644 --- a/producer/dbt/scenarios/csv_to_postgres/test/compose.yml +++ b/producer/dbt/scenarios/csv_to_postgres/test/compose.yml @@ -1,11 +1,9 @@ -version: "3.9" name: csv_to_postgres services: postgres: image: postgres:15-alpine - container_name: postgres15 restart: always environment: POSTGRES_USER: testuser @@ -13,6 +11,11 @@ services: POSTGRES_DB: dbt_test ports: - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U testuser -d dbt_test"] + interval: 10s + timeout: 5s + retries: 5 volumes: - postgres_data:/var/lib/postgresql/data diff --git a/producer/dbt/scenarios/csv_to_postgres/test/run.sh b/producer/dbt/scenarios/csv_to_postgres/test/run.sh old mode 100644 new mode 100755 index acb9dbd3..ec8b9229 --- a/producer/dbt/scenarios/csv_to_postgres/test/run.sh +++ b/producer/dbt/scenarios/csv_to_postgres/test/run.sh @@ -1,25 +1,99 @@ #!/bin/bash -PRODUCER_OUTPUT_EVENTS_DIR=$1 +set -euo pipefail -if [ -d "$PRODUCER_OUTPUT_EVENTS_DIR" ]; then - cd "$(dirname "${BASH_SOURCE[0]}")" || exit +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +RUNNER_DIR="$(cd "$SCRIPT_DIR/../../../runner" && pwd)" +COMPOSE_FILE="$SCRIPT_DIR/compose.yml" +PRODUCER_OUTPUT_EVENTS_DIR="${1:-}" - cat < openlineage.yml +DBT_POSTGRES_HOST="${DBT_POSTGRES_HOST:-localhost}" +DBT_POSTGRES_PORT="${DBT_POSTGRES_PORT:-5432}" +DBT_POSTGRES_USER="${DBT_POSTGRES_USER:-testuser}" +DBT_POSTGRES_DB="${DBT_POSTGRES_DB:-dbt_test}" + +resolve_compose_command() { + if docker compose version >/dev/null 2>&1; then + echo "docker compose" + elif command -v docker-compose >/dev/null 2>&1; then + echo "docker-compose" + else + return 1 + fi +} + 
+postgres_is_ready() { + python - "$DBT_POSTGRES_HOST" "$DBT_POSTGRES_PORT" <<'PY' +import socket +import sys + +host = sys.argv[1] +port = int(sys.argv[2]) + +with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(1) + sys.exit(0 if sock.connect_ex((host, port)) == 0 else 1) +PY +} + +ensure_local_postgres() { + if postgres_is_ready; then + echo "Using existing Postgres instance at ${DBT_POSTGRES_HOST}:${DBT_POSTGRES_PORT}" + return 0 + fi + + local compose_command + compose_command="$(resolve_compose_command)" || { + echo "Postgres is not reachable at ${DBT_POSTGRES_HOST}:${DBT_POSTGRES_PORT} and Docker Compose is unavailable" >&2 + return 1 + } + + echo "Starting local Postgres container with Docker Compose" + $compose_command -f "$COMPOSE_FILE" up -d postgres + + for attempt in $(seq 1 30); do + if $compose_command -f "$COMPOSE_FILE" exec -T postgres pg_isready -U "$DBT_POSTGRES_USER" -d "$DBT_POSTGRES_DB" >/dev/null 2>&1; then + echo "Postgres is ready" + return 0 + fi + sleep 2 + done + + echo "Timed out waiting for Postgres to become ready" >&2 + return 1 +} + +if [[ -z "$PRODUCER_OUTPUT_EVENTS_DIR" ]]; then + echo "Usage: $0 " >&2 + exit 1 +fi + +if [[ ! -d "$PRODUCER_OUTPUT_EVENTS_DIR" ]]; then + echo "Output events directory '${PRODUCER_OUTPUT_EVENTS_DIR}' does not exist" >&2 + exit 1 +fi + +ensure_local_postgres + +cd "$SCRIPT_DIR" + +cat < openlineage.yml transport: type: file log_file_path: "${PRODUCER_OUTPUT_EVENTS_DIR}/events.jsonl" append: true EOF - dbt-ol seed --project-dir="../../../runner" --profiles-dir="../../../runner" --target=postgres --no-version-check - dbt-ol run --project-dir="../../../runner" --profiles-dir="../../../runner" --target=postgres --no-version-check +dbt-ol seed --project-dir="$RUNNER_DIR" --profiles-dir="$RUNNER_DIR" --target=postgres --no-version-check +dbt-ol run --project-dir="$RUNNER_DIR" --profiles-dir="$RUNNER_DIR" --target=postgres --no-version-check - jq -c '.' 
"${PRODUCER_OUTPUT_EVENTS_DIR}/events.jsonl" | nl -w1 -s' ' | while read -r i line; do - echo "$line" | jq '.' > "${PRODUCER_OUTPUT_EVENTS_DIR}/event-$i.json" - done - rm "${PRODUCER_OUTPUT_EVENTS_DIR}/events.jsonl" +EVENTS_FILE="${PRODUCER_OUTPUT_EVENTS_DIR}/events.jsonl" +if [[ ! -f "$EVENTS_FILE" ]]; then + echo "Expected OpenLineage events file was not created: $EVENTS_FILE" >&2 + exit 1 +fi -else - echo "Output events directory '${PRODUCER_OUTPUT_EVENTS_DIR}' does not exist" -fi \ No newline at end of file +jq -c '.' "$EVENTS_FILE" | nl -w1 -s' ' | while read -r i line; do + echo "$line" | jq '.' > "${PRODUCER_OUTPUT_EVENTS_DIR}/event-$i.json" +done +rm "$EVENTS_FILE" diff --git a/scripts/generate_issue.py b/scripts/generate_issue.py index 24f88ec9..b6f31fce 100644 --- a/scripts/generate_issue.py +++ b/scripts/generate_issue.py @@ -3,21 +3,23 @@ from os.path import isfile, join -def get_failures(components): +def get_failures(components, skip_maintainers=False): result = '' for component in components: result += f"## Component: {component['name']}\n" - component_path = get_component_path(component) - consumer_github_maintainers = [maintainer.get('github-name') for maintainer in json.load( - open(join(component_path, 'maintainers.json'), 'r')) - if maintainer.get('github-name') is not None] - result += "Maintainers for component: " + ' '.join([f"@{cgm}" for cgm in consumer_github_maintainers]) + ' \n' + if not skip_maintainers: + component_path = get_component_path(component) + consumer_github_maintainers = [maintainer.get('github-name') for maintainer in json.load( + open(join(component_path, 'maintainers.json'), 'r')) + if maintainer.get('github-name') is not None] + result += "Maintainers for component: " + ' '.join([f"@{cgm}" for cgm in consumer_github_maintainers]) + ' \n' for failed in component['scenarios']: result += f"### Scenario: {failed['name']}\n" - scenario_github_maintainers = [maintainer.get('github-name') for maintainer in - 
json.load(open(get_scenario_path(component_path, failed), 'r')) if maintainer.get('github-name') is not None] - result += "Maintainers for scenario: " + ' '.join( - [f"@{cgm}" for cgm in scenario_github_maintainers if cgm not in consumer_github_maintainers]) + ' \n' + if not skip_maintainers: + scenario_github_maintainers = [maintainer.get('github-name') for maintainer in + json.load(open(get_scenario_path(component_path, failed), 'r')) if maintainer.get('github-name') is not None] + result += "Maintainers for scenario: " + ' '.join( + [f"@{cgm}" for cgm in scenario_github_maintainers if cgm not in consumer_github_maintainers]) + ' \n' # result += f"failures in: \n" result += '\n'.join( [f"name: {t['name']}, \nvalidation_type: {t['validation_type']} \nentity_type: {t['entity_type']} \ndetails: \n\t" + @@ -41,16 +43,17 @@ def get_arguments(): parser = argparse.ArgumentParser(description="") parser.add_argument('--failure_path', type=str, help="directory containing the failures file") parser.add_argument('--issue_path', type=str, help="target directory") + parser.add_argument('--skip-maintainers', action='store_true', help="omit maintainer lines from the output") args = parser.parse_args() - return args.failure_path, args.issue_path + return args.failure_path, args.issue_path, args.skip_maintainers def main(): issue = '# Failures in automatic tests\n' - failure_path, issue_path = get_arguments() + failure_path, issue_path, skip_maintainers = get_arguments() components = json.load(open(failure_path, 'r')) - issue += get_failures(components) + issue += get_failures(components, skip_maintainers=skip_maintainers) t = open(issue_path, 'w') t.write(issue) t.close()