Feature: Run audits concurrently using concurrent_tasks setting #5718
Draft
airhorns wants to merge 3 commits into TobikoData:main
Adds two levels of audit concurrency:

1. Per-model (SnapshotEvaluator): audits within a single snapshot now run concurrently via concurrent_apply_to_values, controlled by concurrent_tasks. This benefits both plan/apply and audit-only runs.
2. Cross-model (Scheduler): when audit_only=True, all audit tasks across all snapshots are flattened into a single thread pool instead of following DAG ordering. Since audits are read-only SELECT queries with no side effects, DAG dependencies are irrelevant and all concurrent_tasks slots stay filled.

The SnapshotEvaluator parameter ddl_concurrent_tasks is renamed to concurrent_tasks to reflect its broader scope.

Closes TobikoData#5468

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
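The cross-model flattening described in this commit can be illustrated with a minimal sketch. It uses the standard library's ThreadPoolExecutor as a stand-in for sqlmesh's own concurrency helper, and the `Audit` type and `run_audits_flat` function are hypothetical names invented for illustration, not part of the PR:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple

# Hypothetical stand-in: an audit is a name plus a callable that returns
# the number of failing rows (0 means the audit passed).
Audit = Tuple[str, Callable[[], int]]


def run_audits_flat(
    audits_by_model: Dict[str, List[Audit]], concurrent_tasks: int
) -> Dict[Tuple[str, str], int]:
    """Run every audit of every model in one shared thread pool.

    Audits are assumed to be read-only, so no ordering between models is
    enforced: the (model, audit) pairs are flattened into a single task list.
    """
    tasks = [
        (model, name, fn)
        for model, audits in audits_by_model.items()
        for name, fn in audits
    ]
    results: Dict[Tuple[str, str], int] = {}
    with ThreadPoolExecutor(max_workers=max(1, concurrent_tasks)) as pool:
        futures = {(model, name): pool.submit(fn) for model, name, fn in tasks}
        for key, future in futures.items():
            results[key] = future.result()
    return results
```

Because the task list ignores model boundaries, a model with many audits does not hold up the audits of other models. The pool stays busy as long as any audit remains.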
- Circuit breaker: Use a shared threading.Event to cancel remaining audit tasks when the circuit breaker fires. Previously, CircuitBreakerError was collected like any other error and all tasks ran to completion.
- Nested concurrency: Pass audit_concurrent_tasks=1 from the scheduler's flat pool to the evaluator, preventing max_workers * concurrent_tasks threads from hitting the DB simultaneously. Add audit_concurrent_tasks parameter to SnapshotEvaluator.audit() for this override.
- Add tests for circuit breaker short-circuiting, blocking audit error collection (NodeAuditsErrors), and nested concurrency prevention.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
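The cancellation pattern from the circuit breaker item can be sketched roughly as follows. This is not the PR's code: `CircuitBreakerTripped` and `run_with_cancellation` are hypothetical names, and the sketch only assumes that tasks check a shared threading.Event before doing any work:

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


class CircuitBreakerTripped(Exception):
    """Hypothetical stand-in for the CircuitBreakerError named in the commit."""


def run_with_cancellation(
    tasks: List[Callable[[], None]], max_workers: int
) -> List[str]:
    """Run tasks in a pool; once one trips the breaker, skip the rest."""
    cancel = threading.Event()  # shared by every task in the pool

    def guarded(task: Callable[[], None]) -> str:
        if cancel.is_set():
            return "skipped"  # breaker already fired, do no work
        try:
            task()
            return "ran"
        except CircuitBreakerTripped:
            cancel.set()  # tell tasks that have not started yet to bail out
            return "tripped"

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in submission order.
        return list(pool.map(guarded, tasks))
```

Tasks that are already running when the event is set still finish; only tasks that have not started are skipped. On the nested concurrency item, the arithmetic is straightforward: with, say, 4 scheduler workers each calling an evaluator that itself uses 4 threads, up to 4 * 4 = 16 queries could reach the database at once, whereas passing audit_concurrent_tasks=1 keeps the ceiling at 4.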
- Apply ruff formatting to new/modified lines
- Fix mypy error in test_audit_only_no_nested_concurrency: use fully mocked evaluator instead of real evaluator with replaced method, avoiding type mismatch on call_count/call_args_list

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
I'm porting a really test-heavy dbt project over to sqlmesh and noticing that overall runtimes are WAY slower. That's because sqlmesh runs the audits serially for each model, and that serial audit run blocks downstream models from getting started. This PR extends the concurrent_tasks approach so that it applies to both models and audits during an apply, or to all the audits at once in an audit_only run.
WIP
Summary
- Run audits concurrently in `SnapshotEvaluator.audit()` using `concurrent_apply_to_values`, controlled by the existing `concurrent_tasks` connection setting
- Add cross-model audit concurrency in `Scheduler` for `audit_only=True` runs: all audit tasks are flattened into a single thread pool, since audits are read-only SELECT queries and don't need DAG ordering
- Rename `ddl_concurrent_tasks` to `concurrent_tasks` on `SnapshotEvaluator` to reflect its broader scope

Closes #5468
🤖 Generated with Claude Code