Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Querys(Enum):
flow_run(
where: {
flow_id: { _eq: $flow_id }
state: { _in: ["Failed"] }
state: { _in: ["Success", "Failed"] }
}
order_by: { start_time: desc }
limit: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@ def validate(self) -> bool:
next_last = self.runs[1] if len(self.runs) == 2 else None

failed = Constants.FLOW_FAILED_STATE.value

dbt_failed_after_created = (
last_run.task_runs.task.name in Constants.TASKS_NAME_DISABLE.value
last_run.state == failed
and last_run.task_runs.task.name in Constants.TASKS_NAME_DISABLE.value
and last_run.start_time >= self.created
and last_run.state == failed
and last_run.task_runs.state_message not in Constants.STATE_MESSAGE_IGNORE.value
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,38 +67,17 @@ def set_flow_schedule(self, flow_id: str, active: bool):

return self.client.query.execute(gql(query), variable_values=variables)

def disable_unhealthy_flow_schedules(self, dry_run: bool = False) -> None:
def disable_unhealthy_flow_schedules(self) -> None:
flows_data = self.flows_failed_last_week()

flows = [FlowDisable(**flow, service=self) for flow in flows_data]

flows_to_disable = [flow for flow in flows if flow.validate()]

logger.info("Flows para ficar em alerta:")

for flow in flows:
logger.info(
Constants.TEXT_FLOW_FORMAT.value.format(
task_name=flow.runs[0].task_runs.task.name,
run_name=flow.runs[0].name,
link=Constants.PREFECT_URL_FLOW.value + flow.id,
)
)

if flows_to_disable and not dry_run:
logger.info("Flows desativados:")

if flows_to_disable:
for flow in flows_to_disable:
self.set_flow_schedule(flow_id=flow.id, active=False)

logger.info(
Constants.TEXT_FLOW_FORMAT.value.format(
task_name=flow.runs[0].task_runs.task.name,
run_name=flow.runs[0].name,
link=Constants.PREFECT_URL_FLOW.value + flow.id,
)
)

message_parts = [
self.format_flows("🚨 Flows em alerta", flows),
self.format_flows(
Expand All @@ -117,11 +96,13 @@ def format_flows(title: str, flows: list) -> str:
lines = [f"**{title}**"]
for flow in flows:
link = Constants.PREFECT_URL_FLOW.value + flow.id
lines.append(
Constants.TEXT_FLOW_FORMAT.value.format(
task_name=flow.runs[0].task_runs.task.name,
run_name=flow.runs[0].name,
link=link,
last_run = flow.runs[0]
if last_run.task_runs:
lines.append(
Constants.TEXT_FLOW_FORMAT.value.format(
task_name=last_run.task_runs.task.name,
run_name=last_run.name,
link=link,
)
)
)
return "\n".join(lines)
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,5 @@
class Command(BaseCommand):
help = "Disable unhealthy flow schedules"

def add_arguments(self, parser):
parser.add_argument(
"--dry-run",
action="store_true",
help="Log flows that would be disabled without disabling them",
)

def handle(self, *args, **options):
FlowService().disable_unhealthy_flow_schedules(dry_run=options["dry_run"])
FlowService().disable_unhealthy_flow_schedules()
Loading