Skip to content

Commit a331cf1

Browse files
authored
Chore: Refactor context run and _run (#3328)
1 parent c866f83 commit a331cf1

File tree

1 file changed

+80
-87
lines changed

1 file changed

+80
-87
lines changed

sqlmesh/core/context.py

Lines changed: 80 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,8 @@ def run(
617617
True if the run was successful, False otherwise.
618618
"""
619619
environment = environment or self.config.default_target_environment
620+
if not skip_janitor and environment.lower() == c.PROD:
621+
self._run_janitor()
620622

621623
self.notification_target_manager.notify(
622624
NotificationEvent.RUN_START, environment=environment
@@ -627,30 +629,76 @@ def run(
627629
)
628630
self._load_materializations_and_signals()
629631

632+
env_check_attempts_num = max(
633+
1,
634+
self.config.run.environment_check_max_wait
635+
// self.config.run.environment_check_interval,
636+
)
637+
638+
def _block_until_finalized() -> str:
639+
for _ in range(env_check_attempts_num):
640+
assert environment is not None # mypy
641+
environment_state = self.state_sync.get_environment(environment)
642+
if not environment_state:
643+
raise SQLMeshError(f"Environment '{environment}' was not found.")
644+
if environment_state.finalized_ts:
645+
return environment_state.plan_id
646+
logger.warning(
647+
"Environment '%s' is being updated by plan '%s'. Retrying in %s seconds...",
648+
environment,
649+
environment_state.plan_id,
650+
self.config.run.environment_check_interval,
651+
)
652+
time.sleep(self.config.run.environment_check_interval)
653+
raise SQLMeshError(
654+
f"Exceeded the maximum wait time for environment '{environment}' to be ready. "
655+
"This means that the environment either failed to update or the update is taking longer than expected. "
656+
"See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#run to adjust the timeout settings."
657+
)
658+
630659
success = False
631660
interrupted = False
632-
try:
633-
success = self._run(
634-
environment=environment,
635-
start=start,
636-
end=end,
637-
execution_time=execution_time,
638-
skip_janitor=skip_janitor,
639-
ignore_cron=ignore_cron,
640-
select_models=select_models,
641-
exit_on_env_update=exit_on_env_update is not None,
642-
)
643-
except CircuitBreakerError:
644-
interrupted = True
645-
except Exception as e:
646-
self.notification_target_manager.notify(
647-
NotificationEvent.RUN_FAILURE, traceback.format_exc()
648-
)
649-
logger.error(f"Run Failure: {traceback.format_exc()}")
650-
analytics.collector.on_run_end(
651-
run_id=analytics_run_id, succeeded=False, interrupted=False, error=e
652-
)
653-
raise e
661+
done = False
662+
while not done:
663+
plan_id_at_start = _block_until_finalized()
664+
665+
def _has_environment_changed() -> bool:
666+
assert environment is not None # mypy
667+
current_environment_state = self.state_sync.get_environment(environment)
668+
return (
669+
not current_environment_state
670+
or current_environment_state.plan_id != plan_id_at_start
671+
or not current_environment_state.finalized_ts
672+
)
673+
674+
try:
675+
success = self._run(
676+
environment,
677+
start=start,
678+
end=end,
679+
execution_time=execution_time,
680+
ignore_cron=ignore_cron,
681+
select_models=select_models,
682+
circuit_breaker=_has_environment_changed,
683+
)
684+
done = True
685+
except CircuitBreakerError:
686+
logger.warning(
687+
"Environment '%s' has been modified while running. Restarting the run...",
688+
environment,
689+
)
690+
if exit_on_env_update:
691+
interrupted = True
692+
done = True
693+
except Exception as e:
694+
self.notification_target_manager.notify(
695+
NotificationEvent.RUN_FAILURE, traceback.format_exc()
696+
)
697+
logger.error(f"Run Failure: {traceback.format_exc()}")
698+
analytics.collector.on_run_end(
699+
run_id=analytics_run_id, succeeded=False, interrupted=False, error=e
700+
)
701+
raise e
654702

655703
if success or interrupted:
656704
self.notification_target_manager.notify(
@@ -1822,41 +1870,10 @@ def _run(
18221870
start: t.Optional[TimeLike],
18231871
end: t.Optional[TimeLike],
18241872
execution_time: t.Optional[TimeLike],
1825-
skip_janitor: bool,
18261873
ignore_cron: bool,
18271874
select_models: t.Optional[t.Collection[str]],
1828-
exit_on_env_update: bool,
1875+
circuit_breaker: t.Optional[t.Callable[[], bool]],
18291876
) -> bool:
1830-
if not skip_janitor and environment.lower() == c.PROD:
1831-
self._run_janitor()
1832-
1833-
env_check_attempts_num = max(
1834-
1,
1835-
self.config.run.environment_check_max_wait
1836-
// self.config.run.environment_check_interval,
1837-
)
1838-
1839-
def _block_until_finalized() -> str:
1840-
for _ in range(env_check_attempts_num):
1841-
assert environment is not None # mypy
1842-
environment_state = self.state_sync.get_environment(environment)
1843-
if not environment_state:
1844-
raise SQLMeshError(f"Environment '{environment}' was not found.")
1845-
if environment_state.finalized_ts:
1846-
return environment_state.plan_id
1847-
logger.warning(
1848-
"Environment '%s' is being updated by plan '%s'. Retrying in %s seconds...",
1849-
environment,
1850-
environment_state.plan_id,
1851-
self.config.run.environment_check_interval,
1852-
)
1853-
time.sleep(self.config.run.environment_check_interval)
1854-
raise SQLMeshError(
1855-
f"Exceeded the maximum wait time for environment '{environment}' to be ready. "
1856-
"This means that the environment either failed to update or the update is taking longer than expected. "
1857-
"See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#run to adjust the timeout settings."
1858-
)
1859-
18601877
scheduler = self.scheduler(environment=environment)
18611878
snapshots = scheduler.snapshots
18621879

@@ -1870,39 +1887,15 @@ def _block_until_finalized() -> str:
18701887
model_selector = self._new_selector(models=models, dag=dag)
18711888
select_models = set(dag.subdag(*model_selector.expand_model_selections(select_models)))
18721889

1873-
done = False
1874-
while not done:
1875-
plan_id_at_start = _block_until_finalized()
1876-
1877-
def _has_environment_changed() -> bool:
1878-
assert environment is not None # mypy
1879-
current_environment_state = self.state_sync.get_environment(environment)
1880-
return (
1881-
not current_environment_state
1882-
or current_environment_state.plan_id != plan_id_at_start
1883-
or not current_environment_state.finalized_ts
1884-
)
1885-
1886-
try:
1887-
success = scheduler.run(
1888-
environment,
1889-
start=start,
1890-
end=end,
1891-
execution_time=execution_time,
1892-
ignore_cron=ignore_cron,
1893-
circuit_breaker=_has_environment_changed,
1894-
selected_snapshots=select_models,
1895-
)
1896-
done = True
1897-
except CircuitBreakerError:
1898-
logger.warning(
1899-
"Environment '%s' has been modified while running. Restarting the run...",
1900-
environment,
1901-
)
1902-
if exit_on_env_update:
1903-
raise
1904-
1905-
return success
1890+
return scheduler.run(
1891+
environment,
1892+
start=start,
1893+
end=end,
1894+
execution_time=execution_time,
1895+
ignore_cron=ignore_cron,
1896+
circuit_breaker=circuit_breaker,
1897+
selected_snapshots=select_models,
1898+
)
19061899

19071900
def _apply(self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]]) -> None:
19081901
self._scheduler.create_plan_evaluator(self).evaluate(

0 commit comments

Comments
 (0)