Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/infrahub/trigger/catalogue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from infrahub.schema.triggers import TRIGGER_SCHEMA_UPDATED
from infrahub.trigger.models import TriggerDefinition
from infrahub.trigger.system import TRIGGER_CRASH_ZOMBIE_FLOWS
from infrahub.webhook.triggers import TRIGGER_WEBHOOK_CONFIGURE
from infrahub.webhook.triggers import TRIGGER_KEYVALUE_WEBHOOK_INVALIDATE, TRIGGER_WEBHOOK_CONFIGURE

builtin_triggers: list[TriggerDefinition] = [
TRIGGER_ACTION_RULE_UPDATE,
Expand All @@ -20,6 +20,7 @@
TRIGGER_CRASH_ZOMBIE_FLOWS,
TRIGGER_DISPLAY_LABELS_ALL_SCHEMA,
TRIGGER_HFID_ALL_SCHEMA,
TRIGGER_KEYVALUE_WEBHOOK_INVALIDATE,
TRIGGER_PROFILE_REFRESH_SETUP,
TRIGGER_SCHEMA_UPDATED,
TRIGGER_WEBHOOK_CONFIGURE,
Expand Down
27 changes: 18 additions & 9 deletions backend/infrahub/webhook/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import json
import logging
import os
from typing import TYPE_CHECKING, Any, Literal
from enum import StrEnum
from typing import TYPE_CHECKING, Any, assert_never
from uuid import UUID, uuid4

from pydantic import BaseModel, ConfigDict, Field, computed_field
Expand Down Expand Up @@ -117,27 +118,35 @@ def from_event(cls, event_id: str, event_type: str, event_occured_at: str, event
)


class HeaderKind(StrEnum):
STATIC = "static"
ENVIRONMENT = "environment"


class WebhookHeaderResolutionError(Exception):
pass


class WebhookHeader(BaseModel):
key: str
value: str
kind: Literal["static", "environment"]
kind: HeaderKind

def resolve(self) -> str:
"""Resolve the header value based on its kind.

Raises WebhookHeaderResolutionError if the value cannot be resolved.
"""
if self.kind == "static":
return self.value

resolved = os.environ.get(self.value)
if resolved is None:
raise WebhookHeaderResolutionError(f"Environment variable '{self.value}' not found")
return resolved
match self.kind:
case HeaderKind.STATIC:
return self.value
case HeaderKind.ENVIRONMENT:
resolved = os.environ.get(self.value)
if resolved is None:
raise WebhookHeaderResolutionError(f"Environment variable '{self.value}' not found")
return resolved
case _:
assert_never(self.kind)


class Webhook(BaseModel):
Expand Down
22 changes: 22 additions & 0 deletions backend/infrahub/webhook/tasks/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from prefect import task
from prefect.cache_policies import NONE
from prefect.logging import get_run_logger

from infrahub.workers.dependencies import get_cache

if TYPE_CHECKING:
from collections.abc import Set as AbstractSet


@task(name="webhook-invalidate-cache", task_run_name="Invalidate webhook cache", cache_policy=NONE)
async def invalidate_webhook_cache(webhook_ids: AbstractSet[str]) -> None:
"""Delete cached webhook data for the given webhook IDs."""
cache = await get_cache()
log = get_run_logger()
for wid in webhook_ids:
await cache.delete(key=f"webhook:{wid}")
log.info(f"Invalidated cache for {len(webhook_ids)} webhook(s)")
12 changes: 5 additions & 7 deletions backend/infrahub/webhook/tasks/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

from infrahub.trigger.models import ExecuteWorkflow, TriggerType
from infrahub.trigger.setup import gather_all_automations, setup_triggers_specific
from infrahub.workers.dependencies import get_cache, get_client, get_database
from infrahub.workers.dependencies import get_client, get_database

from ..constants import EVENT_TO_ACTION, WebhookAction
from ..gather import gather_trigger_webhook
from ..models import WebhookTriggerDefinition
from .cache import invalidate_webhook_cache

if TYPE_CHECKING:
from prefect import Flow, State
Expand Down Expand Up @@ -133,8 +134,7 @@ async def _configure_one(
else:
log.info(f"Webhook {webhook_name} is disabled, no automation to delete")

cache = await get_cache()
await cache.delete(key=f"webhook:{webhook.id}")
await invalidate_webhook_cache(webhook_ids={webhook.id})
return

# Query the deployment associated with the trigger to have its ID
Expand All @@ -156,8 +156,7 @@ async def _configure_one(
await prefect_client.create_automation(automation=automation)
log.info(f"Automation {trigger.generate_name()} created")

cache = await get_cache()
await cache.delete(key=f"webhook:{webhook.id}")
await invalidate_webhook_cache(webhook_ids={webhook.id})


async def _delete_automation(
Expand All @@ -177,8 +176,7 @@ async def _delete_automation(
await prefect_client.delete_automation(automation_id=existing_automation.id)
log.info(f"Automation {automation_name} deleted")

cache = await get_cache()
await cache.delete(key=f"webhook:{webhook_id}")
await invalidate_webhook_cache(webhook_ids={webhook_id})


async def _reconcile_all() -> None:
Expand Down
69 changes: 69 additions & 0 deletions backend/infrahub/webhook/tasks/invalidate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from prefect import flow
from prefect.logging import get_run_logger
from prefect.runtime import flow_run

from infrahub.core.constants import InfrahubKind
from infrahub.core.manager import NodeManager
from infrahub.workers.dependencies import get_database

from .cache import invalidate_webhook_cache

if TYPE_CHECKING:
from prefect import Flow, State
from prefect.client.schemas.objects import FlowRun


def _invalidate_webhook_headers_run_name() -> str:
params = flow_run.parameters
event_data = params.get("event_data")
keyvalue_id = event_data["node_id"] if event_data else "unknown"
return f"Invalidate webhook headers (KeyValue {keyvalue_id})"


async def _invalidate_webhook_headers_on_failure(flow: Flow, flow_run: FlowRun, state: State) -> None: # noqa: ARG001
log = get_run_logger()
event_data = flow_run.parameters.get("event_data")
keyvalue_id = event_data.get("node_id") if event_data else None
log.error(
"Webhook header invalidation failed: keyvalue_id=%s state_message=%s",
keyvalue_id,
state.message,
)


@flow(
name="webhook-invalidate-headers",
flow_run_name=_invalidate_webhook_headers_run_name,
on_failure=[_invalidate_webhook_headers_on_failure],
)
async def invalidate_webhook_headers(
event_type: str | None = None, # noqa: ARG001
event_data: dict | None = None,
) -> None:
"""Resolve webhooks referencing a KeyValue node and invalidate their cache."""
log = get_run_logger()

keyvalue_id = event_data["node_id"] if event_data else None
if not keyvalue_id:
log.warning("No KeyValue ID provided, skipping")
return

database = await get_database()

async with database.start_session(read_only=True) as db:
webhooks = await NodeManager.query(
Comment thread
polmichel marked this conversation as resolved.
db=db,
schema=InfrahubKind.WEBHOOK,
filters={"headers__ids": [keyvalue_id]},
branch_agnostic=True,
)
webhook_uuids = frozenset(w.id for w in webhooks)

if webhook_uuids:
await invalidate_webhook_cache(webhook_ids=webhook_uuids)
else:
log.info(f"No webhooks reference KeyValue {keyvalue_id}")
11 changes: 6 additions & 5 deletions backend/infrahub/webhook/tasks/process.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING

import ujson
from infrahub_sdk import InfrahubClient # noqa: TC002 needed for prefect flow
Expand All @@ -9,11 +9,12 @@
from prefect.cache_policies import NONE
from prefect.logging import get_run_logger

from infrahub.core.constants import InfrahubKind
from infrahub.message_bus.types import KVTTL
from infrahub.workers.dependencies import get_cache, get_client, get_http
from infrahub.workflows.utils import add_tags

from ..models import CustomWebhook, EventContext, StandardWebhook, TransformWebhook, Webhook, WebhookHeader
from ..models import CustomWebhook, EventContext, HeaderKind, StandardWebhook, TransformWebhook, Webhook, WebhookHeader

if TYPE_CHECKING:
from httpx import Response
Expand All @@ -36,9 +37,9 @@ async def webhook_send(webhook: Webhook, context: EventContext, event_data: dict
return response


KIND_MAP: dict[str, Literal["static", "environment"]] = {
"CoreStaticKeyValue": "static",
"CoreEnvKeyValue": "environment",
KIND_MAP: dict[str, HeaderKind] = {
InfrahubKind.STATICKEYVALUE: HeaderKind.STATIC,
InfrahubKind.ENVKEYVALUE: HeaderKind.ENVIRONMENT,
}


Expand Down
30 changes: 28 additions & 2 deletions backend/infrahub/webhook/triggers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from infrahub.core.constants import InfrahubKind
from infrahub.events.node_action import NodeCreatedEvent, NodeDeletedEvent, NodeUpdatedEvent
from infrahub.trigger.models import BuiltinTriggerDefinition, EventTrigger, ExecuteWorkflow
from infrahub.workflows.catalogue import WEBHOOK_CONFIGURE
from infrahub.workflows.catalogue import WEBHOOK_CONFIGURE, WEBHOOK_INVALIDATE_HEADERS

TRIGGER_WEBHOOK_CONFIGURE = BuiltinTriggerDefinition(
name="webhook-configure",
trigger=EventTrigger(
events={"infrahub.node.created", "infrahub.node.updated", "infrahub.node.deleted"},
events={NodeCreatedEvent.event_name, NodeUpdatedEvent.event_name, NodeDeletedEvent.event_name},
match={
"infrahub.node.kind": [InfrahubKind.CUSTOMWEBHOOK, InfrahubKind.STANDARDWEBHOOK],
},
Expand All @@ -23,3 +24,28 @@
),
],
)

TRIGGER_KEYVALUE_WEBHOOK_INVALIDATE = BuiltinTriggerDefinition(
name="webhook-keyvalue-invalidate",
trigger=EventTrigger(
events={NodeUpdatedEvent.event_name},
match={
"infrahub.node.kind": [
InfrahubKind.STATICKEYVALUE,
InfrahubKind.ENVKEYVALUE,
],
},
),
actions=[
ExecuteWorkflow(
workflow=WEBHOOK_INVALIDATE_HEADERS,
parameters={
"event_type": "{{ event.event }}",
"event_data": {
"__prefect_kind": "json",
"value": {"__prefect_kind": "jinja", "template": "{{ event.payload['data'] | tojson }}"},
},
},
),
],
)
8 changes: 8 additions & 0 deletions backend/infrahub/workflows/catalogue.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,13 @@
function="webhook_process",
)

WEBHOOK_INVALIDATE_HEADERS = WorkflowDefinition(
name="webhook-invalidate-headers",
type=WorkflowType.INTERNAL,
module="infrahub.webhook.tasks.invalidate",
function="invalidate_webhook_headers",
)

WEBHOOK_CONFIGURE = WorkflowDefinition(
name="webhook-configure",
type=WorkflowType.INTERNAL,
Expand Down Expand Up @@ -732,6 +739,7 @@
TRIGGER_UPDATE_PYTHON_COMPUTED_ATTRIBUTES,
VALIDATE_SCHEMA_NUMBER_POOLS,
WEBHOOK_CONFIGURE,
WEBHOOK_INVALIDATE_HEADERS,
WEBHOOK_PROCESS,
]

Expand Down
Empty file.
Loading
Loading