Skip to content

Pipeline Orchestrator #3852

Pipeline Orchestrator

Pipeline Orchestrator #3852

# Pipeline Orchestrator v3
#
# Automates PR lifecycle management for agent-created (aw-labeled) PRs:
# - Dispatches implementer for unworked issues
# - Dispatches ci-fixer when CI fails
# - Re-triggers CI when the head commit has no CI result
# - Dispatches responder when review threads need addressing
# - Resolves addressed review threads
# - Requests Copilot review and dispatches the quality gate once CI passes and threads are resolved
# - Rebases PRs behind main
# - Labels stuck PRs for human intervention
#
# Related issues:
# #135 — Pipeline orchestrator (main tracking issue)
# #148 — Responder bug (safe output context)
name: "Pipeline Orchestrator"

on:
  # Event-driven triggers: run as soon as one of the listed workflows completes
  # or a pull request review is submitted.
  workflow_run:
    workflows:
      - "Review Responder"
      - "CI Fixer"
      - "CI"
      - "Quality Gate"
      - "Copilot code review"
    types:
      - completed
  pull_request_review:
    types:
      - submitted
  # Manual trigger for testing; optionally scoped to a single PR.
  workflow_dispatch:
    inputs:
      pr_number:
        description: "PR number to process (optional — defaults to all aw PRs)"
        required: false
        type: string
  # Periodic sweep at minute 0 and 30 of every hour.
  schedule:
    - cron: "0,30 * * * *"

# All runs share one concurrency group; a run that is already in progress is
# never cancelled by a newer one.
concurrency:
  group: pipeline-orchestrator
  cancel-in-progress: false

permissions:
  actions: write
  contents: write
  issues: write
  pull-requests: write
jobs:
  orchestrate:
    runs-on: ubuntu-latest
    # Every `gh` call in this job authenticates with this token.
    env:
      GH_TOKEN: ${{ secrets.GH_AW_WRITE_TOKEN }}
    # Run for manual and scheduled triggers, for workflow_run events whose
    # upstream run was not cancelled, and for pull_request_review events only
    # when the reviewed PR carries the `aw` label.
    if: >-
      github.event_name == 'workflow_dispatch' ||
      github.event_name == 'schedule' ||
      (github.event_name == 'workflow_run' && github.event.workflow_run.conclusion != 'cancelled') ||
      (github.event_name == 'pull_request_review' && contains(github.event.pull_request.labels.*.name, 'aw'))
    steps:
# Full-history clone (fetch-depth 0) using the write token; the rebase step
# later in this job fetches, rebases and force-pushes PR branches from this
# working copy.
- name: Checkout repository
  uses: actions/checkout@v6
  with:
    token: ${{ secrets.GH_AW_WRITE_TOKEN }}
    fetch-depth: 0
# Builds the list of PRs the later steps operate on.
# Outputs:
#   has_prs    - "true" when at least one PR should be processed, else "false"
#   pr_numbers - space-separated PR numbers (only set when has_prs is "true")
- name: Find aw-labeled PRs
  id: find-prs
  env:
    EVENT_NAME: ${{ github.event_name }}
    PR_NUMBER_INPUT: ${{ inputs.pr_number }}
  run: |
    set -euo pipefail
    OWNER="${GITHUB_REPOSITORY_OWNER}"
    REPO="${GITHUB_REPOSITORY#*/}"
    if [[ "$EVENT_NAME" == "workflow_dispatch" && -n "$PR_NUMBER_INPUT" ]]; then
      # The manual input is free-form text that later reaches `gh api -F pr=...`
      # (where a leading "@" means "read from file"), so accept only digits,
      # optionally several numbers separated by whitespace.
      PR_INPUT_RE='^[[:space:]]*[0-9]+([[:space:]]+[0-9]+)*[[:space:]]*$'
      if [[ ! "$PR_NUMBER_INPUT" =~ $PR_INPUT_RE ]]; then
        echo "::error::pr_number must contain only digits (one or more PR numbers)."
        exit 1
      fi
      PR_NUMBERS="$PR_NUMBER_INPUT"
    else
      # Find open aw PRs, excluding stuck ones.
      # The stuck filter runs client-side, so fetch a full page (100) and cap
      # the result at 10 *after* filtering; capping the query itself would let
      # ten stuck PRs hide every active one.
      PR_NUMBERS=$(gh api graphql -f query='
        query($owner: String!, $repo: String!) {
          repository(owner: $owner, name: $repo) {
            pullRequests(labels: ["aw"], states: OPEN, first: 100) {
              nodes {
                number
                labels(first: 100) { nodes { name } }
              }
            }
          }
        }' -f owner="$OWNER" -f repo="$REPO" \
        --jq '[.data.repository.pullRequests.nodes[] | select([.labels.nodes[].name] | any(startswith("aw-pr-stuck:")) | not) | .number] | .[:10] | .[]')
    fi
    # Collapse newlines / repeated whitespace into a single-space separated list.
    PR_LIST=$(xargs <<< "$PR_NUMBERS")
    if [[ -z "$PR_LIST" ]]; then
      echo "No active aw-labeled PRs found."
      echo "has_prs=false" >> "$GITHUB_OUTPUT"
    else
      echo "has_prs=true" >> "$GITHUB_OUTPUT"
      echo "pr_numbers=${PR_LIST}" >> "$GITHUB_OUTPUT"
      echo "Found active PRs: ${PR_LIST}"
    fi
# Runs only when no active aw PR exists: picks the oldest eligible aw issue,
# labels it aw-dispatched and dispatches the implementer workflow for it.
- name: Dispatch implementer for unworked issues
  if: steps.find-prs.outputs.has_prs == 'false'
  run: |
    set -euo pipefail
    OWNER="${GITHUB_REPOSITORY_OWNER}"
    REPO="${GITHUB_REPOSITORY#*/}"
    # Skip if an implementer is already running.
    # If the run list cannot be fetched, assume one is running ("1") so we
    # never double-dispatch on an API error.
    IMPL_RUNNING=$(gh run list --workflow=issue-implementer.lock.yml --json databaseId,status --jq '[.[] | select(.status == "in_progress" or .status == "queued" or .status == "waiting")] | length' 2>/dev/null || echo "1")
    if [[ "$IMPL_RUNNING" -gt 0 ]]; then
      echo "Implementer already in flight ($IMPL_RUNNING run(s)). Skipping dispatch."
      exit 0
    fi
    # Find oldest aw-labeled issue without aw-dispatched, agentic-workflows, or aw-protected-files.
    # The exclusion filter runs client-side, so request a full page (100):
    # a smaller page filled with already-dispatched issues would starve newer ones.
    ISSUE=$(gh api graphql -f query='
      query($owner: String!, $repo: String!) {
        repository(owner: $owner, name: $repo) {
          issues(labels: ["aw"], states: OPEN, first: 100, orderBy: {field: CREATED_AT, direction: ASC}) {
            nodes {
              number
              title
              labels(first: 100) { nodes { name } }
            }
          }
        }
      }' -f owner="$OWNER" -f repo="$REPO" \
      --jq '[.data.repository.issues.nodes[] | select(
        ([.labels.nodes[].name] | any(. == "aw-dispatched") | not)
        and ([.labels.nodes[].name] | any(. == "agentic-workflows") | not)
        and ([.labels.nodes[].name] | any(. == "aw-protected-files") | not)
      )] | .[0]')
    if [[ "$ISSUE" == "null" || -z "$ISSUE" ]]; then
      echo "No eligible issues to dispatch. Pipeline idle."
      exit 0
    fi
    ISSUE_NUM=$(echo "$ISSUE" | jq -r '.number')
    ISSUE_TITLE=$(echo "$ISSUE" | jq -r '.title')
    echo "Dispatching implementer for issue #${ISSUE_NUM}: ${ISSUE_TITLE}"
    # Label first so a concurrent reader does not pick the same issue, but roll
    # the label back if the dispatch fails; otherwise the issue would be marked
    # as dispatched without any implementer run and never be retried.
    gh issue edit "$ISSUE_NUM" --add-label "aw-dispatched"
    if ! gh workflow run issue-implementer.lock.yml -f issue_number="$ISSUE_NUM"; then
      echo "::error::Failed to dispatch implementer for issue #${ISSUE_NUM}. Removing aw-dispatched so it is retried."
      gh issue edit "$ISSUE_NUM" --remove-label "aw-dispatched" || true
      exit 1
    fi
    echo "✅ Dispatched implementer for issue #${ISSUE_NUM}"
# Per-PR state machine. For every PR from find-prs, in order:
#   1. CI failing          -> dispatch ci-fixer, or mark stuck if already attempted
#   2. CI pending/unknown  -> wait, or re-trigger CI when the commit has no result
#   3. CI passing          -> resolve addressed threads, dispatch responder for the rest
#   3.5 PR BEHIND/DIRTY    -> skip (handled by the rebase step)
#   4. Ready               -> request Copilot review / dispatch the quality gate
# Each PR takes at most one branch per run; the remaining work happens on a later run.
- name: Check CI and dispatch fixer or responder
  if: steps.find-prs.outputs.has_prs == 'true'
  env:
    PR_NUMBERS: ${{ steps.find-prs.outputs.pr_numbers }}
  run: |
    # No `-e` here (unlike the earlier steps): a failing command for one PR
    # must not abort processing of the remaining PRs.
    set -uo pipefail
    OWNER="${GITHUB_REPOSITORY_OWNER}"
    REPO="${GITHUB_REPOSITORY#*/}"
    for PR in $PR_NUMBERS; do
      echo "::group::Checking CI and reviews for PR #${PR}"
      # Query PR state: CI status, labels, review threads, reviews
      PR_DATA=$(gh api graphql -f query='
        query($owner: String!, $repo: String!, $pr: Int!) {
          repository(owner: $owner, name: $repo) {
            pullRequest(number: $pr) {
              headRefName
              headRefOid
              mergeStateStatus
              labels(first: 100) { nodes { name } }
              commits(last: 1) { nodes { commit {
                committedDate
                statusCheckRollup { state contexts(first: 30) { nodes {
                  ... on CheckRun { name conclusion status }
                }}}
              }}}
              reviewThreads(first: 100) { nodes {
                id isResolved
                comments(last: 1) { nodes { author { login } } }
              }}
              latestReviews(first: 10) { nodes {
                author { login }
                state
                commit { oid }
              }}
            }
          }
        }' -f owner="$OWNER" -f repo="$REPO" -F pr="$PR") || {
        echo " ⚠️ PR #${PR}: Failed to query. Skipping."
        echo "::endgroup::"
        continue
      }
      LABELS=$(echo "$PR_DATA" | jq -r '[.data.repository.pullRequest.labels.nodes[].name]')
      BRANCH=$(echo "$PR_DATA" | jq -r '.data.repository.pullRequest.headRefName')
      HEAD_SHA=$(echo "$PR_DATA" | jq -r '.data.repository.pullRequest.headRefOid')
      # "Valid approval" = an APPROVED review by the repository owner on the
      # current head commit. Computed once and reused for the stale-label
      # cleanup below and for the quality-gate decision in step 4.
      # The owner is passed via --arg rather than interpolated into the jq
      # program, so it is always treated as data.
      HAS_VALID_APPROVAL=$(echo "$PR_DATA" | jq -r --arg sha "$HEAD_SHA" --arg owner "$OWNER" '
        [.data.repository.pullRequest.latestReviews.nodes[]
         | select(.author.login == $owner)
         | select(.state == "APPROVED")
         | select(.commit.oid == $sha)]
        | length > 0')
      # Clean up stale quality-gate-approved label if approval is no longer valid
      HAS_QG_LABEL=$(echo "$LABELS" | jq -r 'if any(. == "aw-quality-gate-approved") then "yes" else "" end')
      if [[ -n "$HAS_QG_LABEL" && "$HAS_VALID_APPROVAL" != "true" ]]; then
        echo " 🏷️ PR #${PR}: Removing stale aw-quality-gate-approved label."
        gh api repos/"$OWNER"/"$REPO"/issues/"$PR"/labels/aw-quality-gate-approved -X DELETE --silent 2>/dev/null || true
      fi
      # Check CI status — look for the "check" job specifically.
      # Falls back to the run status when no conclusion exists yet, and to
      # NO_CI_RESULT when the job is absent or the rollup cannot be read.
      CI_CONCLUSION=$(echo "$PR_DATA" | jq -r '
        .data.repository.pullRequest.commits.nodes[0].commit.statusCheckRollup.contexts.nodes[]
        | select(.name == "check")
        | .conclusion // .status' 2>/dev/null || echo "NO_CI_RESULT")
      [ -z "$CI_CONCLUSION" ] && CI_CONCLUSION="NO_CI_RESULT"
      HAS_CI_FIX_LABEL=$(echo "$LABELS" | jq -r 'if any(. == "aw-ci-fix-attempted") then "yes" else "" end')
      HAS_RESPONSE_LABEL=$(echo "$LABELS" | jq -r 'if any(. == "aw-review-response-attempted") then "yes" else "" end')
      echo " Branch: $BRANCH | CI: $CI_CONCLUSION"
      # --- Step 1: CI failing ---
      if [[ "$CI_CONCLUSION" == "FAILURE" || "$CI_CONCLUSION" == "failure" ]]; then
        # Same in-flight guard as the responder / quality gate dispatches:
        # while a ci-fixer run is active, neither dispatch another one nor
        # declare the PR stuck. On API failure assume one is running ("1").
        FIXER_RUNNING=$(gh run list --workflow=ci-fixer.lock.yml --json databaseId,status 2>/dev/null | jq -r '[.[] | select(.status == "in_progress" or .status == "queued" or .status == "waiting")] | length' 2>/dev/null || echo "1")
        if [[ "$FIXER_RUNNING" -gt 0 ]]; then
          echo " ⏳ PR #${PR}: ci-fixer already in flight. Waiting."
        elif [[ -n "$HAS_CI_FIX_LABEL" ]]; then
          echo " ❌ PR #${PR}: CI still failing after ci-fixer. Marking stuck."
          gh pr edit "$PR" --add-label "aw-pr-stuck:ci"
          gh pr comment "$PR" --body "❌ Pipeline orchestrator: CI still failing after ci-fixer attempt. Marking as stuck for human review."
        else
          echo " 🔧 PR #${PR}: CI failing. Dispatching ci-fixer."
          gh workflow run ci-fixer.lock.yml -f pr_number="$PR"
        fi
        echo "::endgroup::"
        continue
      fi
      # --- Step 2: CI still running or unknown ---
      if [[ "$CI_CONCLUSION" != "SUCCESS" && "$CI_CONCLUSION" != "success" && "$CI_CONCLUSION" != "NEUTRAL" && "$CI_CONCLUSION" != "neutral" ]]; then
        # If CI has no result, attempt to re-trigger via workflow_dispatch
        if [[ "$CI_CONCLUSION" == "NO_CI_RESULT" ]]; then
          COMMIT_DATE=$(echo "$PR_DATA" | jq -r '.data.repository.pullRequest.commits.nodes[0].commit.committedDate // empty')
          if [[ -n "$COMMIT_DATE" ]]; then
            COMMIT_EPOCH=$(date -d "$COMMIT_DATE" +%s 2>/dev/null || echo "0")
            if [[ "$COMMIT_EPOCH" -eq 0 ]]; then
              echo " ⚠️ PR #${PR}: Could not parse commit date. Skipping re-trigger."
              echo "::endgroup::"
              continue
            fi
            NOW_EPOCH=$(date +%s)
            AGE_MINUTES=$(( (NOW_EPOCH - COMMIT_EPOCH) / 60 ))
            # Only act on commits at least 5 minutes old; younger commits fall
            # through to the generic "Waiting" message below.
            if [[ "$AGE_MINUTES" -ge 5 ]]; then
              # Count re-triggers for the current head commit specifically
              RETRIGGER_COUNT=$(gh run list --workflow=ci.yml --event workflow_dispatch --branch="$BRANCH" \
                --json headSha --jq "[.[] | select(.headSha == \"$HEAD_SHA\")] | length" 2>/dev/null || echo "0")
              if [[ "$RETRIGGER_COUNT" -ge 2 ]]; then
                echo " ❌ PR #${PR}: No CI result after $RETRIGGER_COUNT re-trigger attempts. Marking stuck."
                gh pr edit "$PR" --add-label "aw-pr-stuck:ci"
                gh pr comment "$PR" --body "❌ Pipeline orchestrator: No CI result after $RETRIGGER_COUNT re-trigger attempts. Marking as stuck for human review."
                echo "::endgroup::"
                continue
              fi
              # Check cooldown — skip if we already dispatched within last 30 min
              RECENT_CI=$(gh run list --workflow=ci.yml --event workflow_dispatch --branch="$BRANCH" \
                --limit 1 --json createdAt --jq '
                [.[] | select((now - (.createdAt | fromdateiso8601)) < 1800)] | length' 2>/dev/null || echo "0")
              if [[ "$RECENT_CI" -eq 0 ]]; then
                echo " 🔄 PR #${PR}: No CI result, commit is ${AGE_MINUTES}m old. Re-triggering CI (attempt $((RETRIGGER_COUNT + 1))/2)."
                gh workflow run ci.yml --ref "$BRANCH"
              else
                echo " ⏳ PR #${PR}: No CI result — already re-triggered recently. Waiting."
              fi
              echo "::endgroup::"
              continue
            fi
          fi
        fi
        echo " ⏳ PR #${PR}: CI status is $CI_CONCLUSION. Waiting."
        echo "::endgroup::"
        continue
      fi
      # --- Step 3: CI passed — check review threads ---
      # An unresolved thread counts as "addressed" when its last comment is by
      # anyone other than copilot-pull-request-reviewer, and "unaddressed" when
      # the reviewer bot has the last word.
      THREADS=$(echo "$PR_DATA" | jq -r '.data.repository.pullRequest.reviewThreads.nodes')
      UNRESOLVED_COUNT=$(echo "$THREADS" | jq -r '[.[] | select(.isResolved == false)] | length')
      ADDRESSED_COUNT=$(echo "$THREADS" | jq -r '[.[] | select(
        .isResolved == false
        and (.comments.nodes | length > 0)
        and (.comments.nodes[-1].author?.login // "" | . != "copilot-pull-request-reviewer")
      )] | length')
      UNADDRESSED_COUNT=$(echo "$THREADS" | jq -r '[.[] | select(
        .isResolved == false
        and (.comments.nodes | length > 0)
        and (.comments.nodes[-1].author?.login // "" | . == "copilot-pull-request-reviewer")
      )] | length')
      echo " Threads: $UNRESOLVED_COUNT unresolved ($ADDRESSED_COUNT addressed, $UNADDRESSED_COUNT unaddressed)"
      # Resolve addressed threads
      if [[ "$ADDRESSED_COUNT" -gt 0 ]]; then
        RESOLVABLE=$(echo "$THREADS" | jq -r '[.[] | select(
          .isResolved == false
          and (.comments.nodes | length > 0)
          and (.comments.nodes[-1].author?.login // "" | . != "copilot-pull-request-reviewer")
        )] | .[].id')
        # Track successes separately so the summary reflects what actually happened.
        RESOLVED_COUNT=0
        for THREAD_ID in $RESOLVABLE; do
          echo " Resolving thread: ${THREAD_ID}"
          if gh api graphql -f query='
            mutation($threadId: ID!) {
              resolveReviewThread(input: {threadId: $threadId}) {
                thread { isResolved }
              }
            }' -f threadId="$THREAD_ID" \
            --jq '.data.resolveReviewThread.thread.isResolved'; then
            RESOLVED_COUNT=$((RESOLVED_COUNT + 1))
          else
            echo " ⚠️ Failed to resolve thread ${THREAD_ID}"
          fi
        done
        echo " ✅ Resolved $RESOLVED_COUNT of $ADDRESSED_COUNT thread(s)."
      fi
      # Dispatch responder for unaddressed threads
      if [[ "$UNADDRESSED_COUNT" -gt 0 ]]; then
        # Check if a responder is already running (assume yes on API failure)
        RESP_RUNNING=$(gh run list --workflow=review-responder.lock.yml --json databaseId,status 2>/dev/null | jq -r '[.[] | select(.status == "in_progress" or .status == "queued" or .status == "waiting")] | length' 2>/dev/null || echo "1")
        if [[ "$RESP_RUNNING" -gt 0 ]]; then
          echo " ⏳ PR #${PR}: Responder already in flight. Skipping dispatch."
          echo "::endgroup::"
          continue
        fi
        if [[ -n "$HAS_RESPONSE_LABEL" ]]; then
          # Count review-response-N labels for round tracking
          ROUND=$(echo "$LABELS" | jq -r '[.[] | select(test("^aw-review-response-[0-9]+$"))] | length')
          if [[ "$ROUND" -ge 3 ]]; then
            echo " ❌ PR #${PR}: Responder hit 3 rounds. Marking stuck."
            gh pr edit "$PR" --add-label "aw-pr-stuck:review"
            gh pr comment "$PR" --body "❌ Pipeline orchestrator: review-response loop reached 3 rounds. Marking as stuck for human review."
          else
            NEXT_ROUND=$((ROUND + 1))
            echo " 🔄 PR #${PR}: Removing response label, enabling round ${NEXT_ROUND}."
            gh pr edit "$PR" --remove-label "aw-review-response-attempted"
            gh pr edit "$PR" --add-label "aw-review-response-${NEXT_ROUND}"
            echo " 📤 Dispatching responder for PR #${PR}."
            gh workflow run review-responder.lock.yml -f pr_number="$PR"
          fi
        else
          echo " 📤 PR #${PR}: $UNADDRESSED_COUNT unaddressed threads. Dispatching responder."
          gh workflow run review-responder.lock.yml -f pr_number="$PR"
        fi
        echo "::endgroup::"
        continue
      fi
      echo " ✅ PR #${PR}: All threads resolved, CI passing."
      # --- Step 3.5: Skip quality gate if PR needs rebase ---
      MERGE_STATE=$(echo "$PR_DATA" | jq -r '.data.repository.pullRequest.mergeStateStatus')
      if [[ "$MERGE_STATE" == "BEHIND" || "$MERGE_STATE" == "DIRTY" ]]; then
        echo " ⏳ PR #${PR}: PR is $MERGE_STATE — rebase needed before quality gate. Skipping."
        echo "::endgroup::"
        continue
      fi
      # --- Step 4: Dispatch quality gate if ready ---
      # Check 1: Has Copilot reviewed the current head commit?
      HAS_COPILOT_REVIEW=$(echo "$PR_DATA" | jq -r --arg sha "$HEAD_SHA" '
        [.data.repository.pullRequest.latestReviews.nodes[]
         | select(.author.login == "copilot-pull-request-reviewer")
         | select(.commit.oid == $sha)]
        | length > 0')
      if [[ "$HAS_COPILOT_REVIEW" != "true" ]]; then
        echo " ⏳ PR #${PR}: No Copilot review on current commit. Requesting review."
        if ! gh pr edit "$PR" --add-reviewer "@copilot" 2>&1; then
          echo " ⚠️ PR #${PR}: Failed to request Copilot review. Will retry next cycle."
        fi
        echo "::endgroup::"
        continue
      fi
      # Check 2: Is a Copilot code review currently in progress for this commit?
      COPILOT_REVIEW_RUNNING=$(gh run list --workflow="Copilot code review" --json headSha,status 2>/dev/null | jq -r --arg sha "$HEAD_SHA" '[.[] | select(.headSha == $sha) | select(.status == "in_progress" or .status == "queued")] | length' 2>/dev/null || echo "1")
      if [[ "$COPILOT_REVIEW_RUNNING" -gt 0 ]]; then
        echo " ⏳ PR #${PR}: Copilot review in progress. Skipping quality gate."
        echo "::endgroup::"
        continue
      fi
      # Check 3: Is there a valid (non-dismissed) approval on the current commit?
      # (HAS_VALID_APPROVAL was computed at the top of this iteration.)
      HAS_QG_EVALUATED=$(echo "$LABELS" | jq -r 'if any(. == "aw-quality-gate-evaluated") then "yes" else "" end')
      if [[ "$HAS_VALID_APPROVAL" == "true" ]]; then
        echo " ✅ PR #${PR}: Valid quality-gate approval on current commit. Auto-merge should handle it."
      elif [[ -n "$HAS_QG_EVALUATED" ]]; then
        echo " ⏸️ PR #${PR}: Quality gate already evaluated (needs human review). Skipping."
      else
        # Check 4: Is a quality gate already running? (assume yes on API failure)
        QG_RUNNING=$(gh run list --workflow=quality-gate.lock.yml --json databaseId,status 2>/dev/null | jq -r '[.[] | select(.status == "in_progress" or .status == "queued" or .status == "waiting")] | length' 2>/dev/null || echo "1")
        if [[ "$QG_RUNNING" -gt 0 ]]; then
          echo " ⏳ PR #${PR}: Quality gate already in flight. Skipping dispatch."
        else
          echo " 🔍 PR #${PR}: Dispatching quality gate."
          gh workflow run quality-gate.lock.yml -f pr_number="$PR"
        fi
      fi
      echo "::endgroup::"
    done
# For every PR from find-prs whose merge state is BEHIND or DIRTY, rebases the
# head branch onto origin/main and force-pushes it. PRs that cannot be rebased
# or pushed get a comment and the aw-pr-stuck:rebase label.
- name: Rebase PRs behind main
  if: steps.find-prs.outputs.has_prs == 'true'
  env:
    PR_NUMBERS: ${{ steps.find-prs.outputs.pr_numbers }}
  run: |
    # No `-e`: a failure on one PR must not stop the remaining PRs.
    set -uo pipefail
    OWNER="${GITHUB_REPOSITORY_OWNER}"
    REPO="${GITHUB_REPOSITORY#*/}"
    TOTAL_REBASED=0
    # Identity for any commits rewritten by the rebase.
    git config user.name "github-actions[bot]"
    git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
    for PR in $PR_NUMBERS; do
      echo "::group::Checking rebase for PR #${PR}"
      PR_DATA=$(gh api graphql -f query='
        query($owner: String!, $repo: String!, $pr: Int!) {
          repository(owner: $owner, name: $repo) {
            pullRequest(number: $pr) {
              headRefName
              mergeStateStatus
              labels(first: 100) { nodes { name } }
            }
          }
        }' -f owner="$OWNER" -f repo="$REPO" -F pr="$PR") || {
        echo " ⚠️ PR #${PR}: Failed to query. Skipping."
        echo "::endgroup::"
        continue
      }
      MERGE_STATE=$(echo "$PR_DATA" | jq -r '.data.repository.pullRequest.mergeStateStatus')
      BRANCH=$(echo "$PR_DATA" | jq -r '.data.repository.pullRequest.headRefName')
      HAS_STUCK_REBASE=$(echo "$PR_DATA" | jq -r '[.data.repository.pullRequest.labels.nodes[].name] | any(. == "aw-pr-stuck:rebase") | if . then "yes" else "" end')
      echo " Branch: $BRANCH | Merge state: $MERGE_STATE"
      if [[ "$MERGE_STATE" != "BEHIND" && "$MERGE_STATE" != "DIRTY" ]]; then
        echo " PR #${PR}: State is $MERGE_STATE — no rebase needed."
        echo "::endgroup::"
        continue
      fi
      if [[ -n "$HAS_STUCK_REBASE" ]]; then
        echo " PR #${PR}: Already has aw-pr-stuck:rebase label. Skipping."
        echo "::endgroup::"
        continue
      fi
      echo " Attempting rebase of $BRANCH onto main..."
      git fetch origin main "$BRANCH":"refs/remotes/origin/$BRANCH" || {
        echo " ⚠️ PR #${PR}: Failed to fetch branch. Skipping."
        echo "::endgroup::"
        continue
      }
      # Bail out if the checkout fails: continuing would rebase whatever HEAD
      # currently points at and could force-push a stale local branch.
      if ! git checkout -B "$BRANCH" "origin/$BRANCH"; then
        echo " ⚠️ PR #${PR}: Failed to check out $BRANCH. Skipping."
        echo "::endgroup::"
        continue
      fi
      if git rebase origin/main; then
        # --force-with-lease refuses the push if the remote branch moved since
        # the fetch above.
        if git push origin "$BRANCH" --force-with-lease; then
          echo " ✅ PR #${PR}: Rebased and pushed successfully."
          TOTAL_REBASED=$((TOTAL_REBASED + 1))
        else
          echo " ⚠️ PR #${PR}: Rebase succeeded but push failed."
          gh pr comment "$PR" --body "⚠️ Pipeline orchestrator: rebase succeeded but force-push failed. Manual intervention needed."
          gh pr edit "$PR" --add-label "aw-pr-stuck:rebase"
        fi
      else
        # Leave the working tree clean for the next PR in the loop.
        git rebase --abort 2>/dev/null || true
        echo " ❌ PR #${PR}: Rebase conflicts detected."
        gh pr comment "$PR" --body "❌ Pipeline orchestrator: rebase onto main failed due to conflicts. Manual rebase needed."
        gh pr edit "$PR" --add-label "aw-pr-stuck:rebase"
      fi
      # Detach so the next iteration's `checkout -B` starts from a neutral HEAD.
      git checkout --detach HEAD 2>/dev/null || true
      echo "::endgroup::"
    done
    echo "✅ PRs rebased: ${TOTAL_REBASED}"