Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
from enum import Enum


class Querys(Enum):
    """GraphQL query documents used by the flow service."""

    # Selects flows whose schedule is not null, whose schedule is active and
    # which are not archived, filtered on flow_runs with state "Failed" and
    # start_time >= $since. Returns id, created and name for each flow.
    FLOWS_FAILED_LAST_WEEK = """
query($since: timestamptz!) {
flow(
where: {
schedule: { _is_null: false }
is_schedule_active: {_eq: true}
archived: {_eq: false}
flow_runs: {
state: { _eq: "Failed" }
start_time: { _gte: $since }
}
}
) {
id
created
name
}
}
"""

    # For the flow identified by $flow_id, selects at most two flow runs in
    # state "Success" or "Failed", ordered by start_time descending. Each run
    # carries at most one task run in state "Failed" (also ordered by
    # start_time descending) together with that task's id and name.
    LAST_COMPLETED_RUNS_TASKS = """
query LastTwoCompletedRunsWithTasks($flow_id: uuid!) {
flow_run(
where: {
flow_id: { _eq: $flow_id }
state: { _in: ["Success", "Failed"] }
}
order_by: { start_time: desc }
limit: 2
) {
id
name
start_time
state
task_runs(
where: {
state: { _in: ["Failed"] }
}
order_by: { start_time: desc }
limit: 1) {
id
state
end_time
state_message
task {
id
name
}
}
}
}
"""


class Constants(Enum):
    """Fixed values used by the unhealthy-flow schedule check."""

    # Task names checked against the last failed task run of a flow.
    TASKS_NAME_DISABLE = ("run_dbt",)

    # Flow-run state names as they appear in API responses.
    FLOW_SUCCESS_STATE = "Success"
    FLOW_FAILED_STATE = "Failed"

    # Base URL of the Prefect deployment and the URLs derived from it.
    PREFECT_URL = "https://prefect.basedosdados.org/"
    PREFECT_URL_FLOW = f"{PREFECT_URL}flow/"
    PREFECT_URL_API = f"{PREFECT_URL}api"

    # Discord role id interpolated into the notification as a role mention.
    DISCORD_ROLE_DADOS = "865034571469160458"

    # Template for one flow line; expects run_name, task_name and link.
    TEXT_FLOW_FORMAT = "- {run_name} | Last failure `{task_name}` | {link}"

    # Task-run state messages that are excluded from the task-name rule.
    STATE_MESSAGE_IGNORE = (
        "No heartbeat detected from the remote task; marking the run as failed.",
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta


def parse_datetime(value: str) -> datetime:
    """Parse an ISO 8601 timestamp string into a ``datetime``.

    A trailing ``Z`` (the UTC designator) is rewritten to ``+00:00`` before
    parsing, because ``datetime.fromisoformat`` only accepts ``Z`` from
    Python 3.11 onwards.

    Args:
        value: ISO 8601 timestamp, with or without a UTC offset.

    Returns:
        The parsed ``datetime``; timezone-aware when ``value`` carries an
        offset (or ``Z``), naive otherwise.

    Raises:
        ValueError: if ``value`` is not a valid ISO 8601 timestamp.
    """
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


def one_week_ago() -> str:
    """Return the instant seven days before now, in UTC, as a string.

    The clock is read explicitly in UTC so that the literal ``Z`` suffix in
    the output is always accurate, regardless of framework timezone settings.

    Returns:
        Timestamp formatted as ``YYYY-MM-DDTHH:MM:SSZ``.
    """
    from datetime import timezone

    week_ago = datetime.now(timezone.utc) - timedelta(days=7)
    return week_ago.strftime("%Y-%m-%dT%H:%M:%SZ")
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
from typing import TYPE_CHECKING, Optional

from .constants import Constants
from .datetime_utils import parse_datetime

if TYPE_CHECKING:
from .service import FlowService


class Task:
    """Identity of a task: its ``id`` and its ``name``."""

    def __init__(self, id: str, name: str):
        # Both values are stored exactly as received.
        self.id, self.name = id, name


class TaskRun:
    """A single task run, with its nested task wrapped in a ``Task``."""

    def __init__(
        self,
        id: str,
        state: str,
        end_time: Optional[str],
        state_message: str,
        task: dict,
    ):
        # Scalar fields are kept as received (end_time stays a raw string).
        self.id, self.state = id, state
        self.end_time, self.state_message = end_time, state_message
        # The keys of ``task`` become the keyword arguments of ``Task``.
        task_obj = Task(**task)
        self.task = task_obj


class FlowRun:
    """A flow run with a parsed start time and at most one task run.

    Despite the plural name, ``task_runs`` holds a single ``TaskRun`` built
    from the first entry of the incoming list, or ``None`` when that list is
    empty.
    """

    def __init__(
        self,
        id: str,
        name: str,
        start_time: str,
        state: str,
        task_runs: list,
    ):
        self.id = id
        self.name = name
        self.start_time = parse_datetime(start_time)
        self.state = state
        # Only the first task run is kept; an empty list yields None.
        if task_runs:
            self.task_runs = TaskRun(**task_runs[0])
        else:
            self.task_runs = None


class FlowDisable:
    """A flow that is a candidate for having its schedule disabled.

    On construction the flow's most recent completed runs are fetched through
    ``service`` and stored in ``runs`` (newest first, as returned by the
    service).
    """

    def __init__(self, id: str, created: str, service: "FlowService"):
        self.id = id
        self.created = parse_datetime(created)
        self.service = service
        self.runs = self.get_runs()

    def get_runs(self):
        """Fetch this flow's runs from the service and wrap each in ``FlowRun``."""
        response = self.service.last_completed_runs_tasks(self.id)
        return [FlowRun(**run) for run in response["flow_run"]]

    def validate(self) -> bool:
        """Return True when the flow's schedule should be disabled.

        The latest run must be in the failed state, and one of these must hold:

        * its recorded failed task run belongs to a task listed in
          ``Constants.TASKS_NAME_DISABLE``, the run started at or after the
          flow was created, and the task run's state message is not listed in
          ``Constants.STATE_MESSAGE_IGNORE``; or
        * the run before it also failed and the later of the two start times
          is at or after the flow's creation time.

        A flow without runs, or whose latest run has no failed task run, is
        handled without raising.
        """
        if not self.runs:
            # Nothing to evaluate; avoids IndexError on an empty result.
            return False

        failed = Constants.FLOW_FAILED_STATE.value
        last_run = self.runs[0]
        if last_run.state != failed:
            # Both rules below require the latest run to have failed.
            return False

        next_last = self.runs[1] if len(self.runs) > 1 else None

        # A failed flow run may carry no failed task run at all, in which case
        # ``task_runs`` is None and the task-name rule cannot apply.
        failed_task_run = last_run.task_runs
        dbt_failed_after_created = (
            failed_task_run is not None
            and failed_task_run.task.name in Constants.TASKS_NAME_DISABLE.value
            and last_run.start_time >= self.created
            and failed_task_run.state_message not in Constants.STATE_MESSAGE_IGNORE.value
        )

        consecutive_failed_after_created = (
            next_last is not None
            and next_last.state == failed
            and max(last_run.start_time, next_last.start_time) >= self.created
        )

        return bool(dbt_failed_after_created or consecutive_failed_after_created)
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-
import os
from typing import Dict

from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport
from loguru import logger

from backend.custom.client import send_discord_message

from .constants import Constants, Querys
from .datetime_utils import one_week_ago
from .models import FlowDisable

logger = logger.bind(module="core")


class MakeClient:
    """Builds and holds a gql ``Client`` for the configured GraphQL URL.

    The client is exposed as ``query`` and sends a bearer token read from the
    ``API_KEY_PREFECT`` environment variable.
    """

    def __init__(self):
        self.graphql_url = Constants.PREFECT_URL_API.value
        auth_headers = {"Authorization": f"Bearer {os.getenv('API_KEY_PREFECT')}"}
        self.query = self.make_client(auth_headers)

    def make_client(self, headers: Dict[str, str] = None) -> Client:
        """Return a ``Client`` over an HTTP transport that sends ``headers``."""
        return Client(
            transport=RequestsHTTPTransport(
                url=self.graphql_url,
                headers=headers,
                use_json=True,
            ),
            fetch_schema_from_transport=False,
        )


class FlowService:
    """Queries flows and runs over GraphQL, disables unhealthy schedules and
    reports the outcome to Discord."""

    def __init__(self):
        self.client = MakeClient()

    def flows_failed_last_week(self) -> list:
        """Return ``{"id", "created"}`` dicts for flows matched by the
        ``FLOWS_FAILED_LAST_WEEK`` query, using ``one_week_ago()`` as ``since``."""
        since = one_week_ago()
        document = gql(Querys.FLOWS_FAILED_LAST_WEEK.value)
        response = self.client.query.execute(document, variable_values={"since": since})

        failed_flows = []
        for item in response["flow"]:
            failed_flows.append({"id": item["id"], "created": item["created"]})
        return failed_flows

    def last_completed_runs_tasks(self, flow_id: str):
        """Execute ``LAST_COMPLETED_RUNS_TASKS`` for ``flow_id`` and return the
        raw response."""
        document = gql(Querys.LAST_COMPLETED_RUNS_TASKS.value)
        return self.client.query.execute(document, variable_values={"flow_id": flow_id})

    def set_flow_schedule(self, flow_id: str, active: bool):
        """Run the mutation that activates or deactivates the schedule of
        ``flow_id`` and return the raw response."""
        if active:
            mutation_name = "set_schedule_active"
        else:
            mutation_name = "set_schedule_inactive"

        # Doubled braces are literal braces in the f-string; only the mutation
        # name is interpolated, the flow id travels as a GraphQL variable.
        query = f"""
mutation SetFlowSchedule($flow_id: UUID!) {{
{mutation_name}(
input: {{
flow_id: $flow_id
}}
) {{
success
}}
}}
"""

        return self.client.query.execute(gql(query), variable_values={"flow_id": flow_id})

    def disable_unhealthy_flow_schedules(self) -> None:
        """Deactivate the schedule of every flow whose ``validate()`` is true
        and send a Discord summary of alerted and disabled flows."""
        flows = [FlowDisable(**flow, service=self) for flow in self.flows_failed_last_week()]
        flows_to_disable = [flow for flow in flows if flow.validate()]

        # NOTE(review): the notification is assumed to be sent only when at
        # least one flow is disabled -- confirm against the intended nesting.
        if not flows_to_disable:
            return

        for flow in flows_to_disable:
            self.set_flow_schedule(flow_id=flow.id, active=False)

        alert_section = self.format_flows("🚨 Flows em alerta", flows)
        disabled_section = self.format_flows(
            f"⛔ Flows desativados <@&{Constants.DISCORD_ROLE_DADOS.value}>",
            flows_to_disable,
        )

        send_discord_message("\n\n".join([alert_section, disabled_section]))

    @staticmethod
    def format_flows(title: str, flows: list) -> str:
        """Render a bold title followed by one line per flow.

        Flows whose latest run has no task run are left out; an empty
        ``flows`` list renders the title with a "none" placeholder line.
        """
        if not flows:
            return f"**{title}**\n_(nenhum)_"

        lines = [f"**{title}**"]
        for flow in flows:
            link = Constants.PREFECT_URL_FLOW.value + flow.id
            latest = flow.runs[0]
            failed_task_run = latest.task_runs
            if not failed_task_run:
                continue
            entry = Constants.TEXT_FLOW_FORMAT.value.format(
                task_name=failed_task_run.task.name,
                run_name=latest.name,
                link=link,
            )
            lines.append(entry)
        return "\n".join(lines)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
from django.core.management.base import BaseCommand

from ._disable_unhealthy_flow_schedules.service import FlowService


class Command(BaseCommand):
    """Management command that runs the unhealthy-flow schedule check."""

    help = "Disable unhealthy flow schedules"

    def handle(self, *args, **options):
        """Build a ``FlowService`` and run its disable routine."""
        service = FlowService()
        service.disable_unhealthy_flow_schedules()
7 changes: 7 additions & 0 deletions backend/apps/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,10 @@ def sync_database_with_prod():
call_command("fetch_metabase")
call_command("populate")
return "Sincronização concluída com sucesso"


@db_periodic_task(crontab(minute="*/20"))
@production_task
def disable_unhealthy_flow_schedules():
    """Invoke the ``disable_unhealthy_flow_schedules`` management command.

    Registered as a periodic task with a ``*/20`` minute crontab.
    """
    command_name = "disable_unhealthy_flow_schedules"
    call_command(command_name)
Loading
Loading