Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 127 additions & 100 deletions backend/infrahub/core/branch/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import TYPE_CHECKING, Any, Sequence
from uuid import uuid4

import pydantic
Expand All @@ -14,7 +14,7 @@
from infrahub.core.branch import Branch
from infrahub.core.branch.enums import BranchStatus
from infrahub.core.changelog.diff import DiffChangelogCollector, MigrationTracker
from infrahub.core.constants import MutationAction
from infrahub.core.constants import DiffAction, MutationAction
from infrahub.core.diff.coordinator import DiffCoordinator
from infrahub.core.diff.diff_locker import DiffLocker
from infrahub.core.diff.ipam_diff_parser import IpamDiffParser
Expand All @@ -24,6 +24,7 @@
from infrahub.core.diff.repository.repository import DiffRepository
from infrahub.core.graph import GRAPH_VERSION
from infrahub.core.merge import BranchMerger
from infrahub.core.merge.merge_locker import MergeLocker
from infrahub.core.migrations.exceptions import MigrationFailureError
from infrahub.core.migrations.runner import MigrationRunner
from infrahub.core.schema.update_coordinator import MigrationExecutor, SchemaUpdateCoordinator
Expand Down Expand Up @@ -57,6 +58,12 @@
)
from infrahub.workflows.utils import add_tags

if TYPE_CHECKING:
from logging import Logger, LoggerAdapter

from infrahub.core.changelog.models import NodeChangelog
from infrahub.database import InfrahubDatabase


@flow(name="branch-migrate", flow_run_name="Apply migrations to branch {branch}")
async def migrate_branch(branch: str, context: InfrahubContext, send_events: bool = True) -> None:
Expand Down Expand Up @@ -273,115 +280,26 @@ async def merge_branch(branch: str, context: InfrahubContext, proposed_change_id

obj = await Branch.get_by_name(db=db, name=branch)
default_branch = await registry.get_branch(db=db, branch=registry.default_branch)
component_registry = get_component_registry()
merge_event = BranchMergedEvent(
branch_name=obj.name,
branch_id=str(obj.get_uuid()),
proposed_change_id=proposed_change_id,
meta=EventMeta.from_context(context=context, branch=registry.get_global_branch()),
)

merger: BranchMerger | None = None
workflow = get_workflow()
merge_at = Timestamp()
pre_merge_schema = registry.schema.get_schema_branch(name=registry.default_branch).duplicate()
async with lock.registry.global_graph_lock():
diff_repository = await component_registry.get_component(DiffRepository, db=db, branch=obj)
diff_coordinator = await component_registry.get_component(DiffCoordinator, db=db, branch=obj)
diff_merger = await component_registry.get_component(DiffMerger, db=db, branch=obj)
merger = BranchMerger(
db=db,
diff_coordinator=diff_coordinator,
diff_merger=diff_merger,
diff_repository=diff_repository,
source_branch=obj,
diff_locker=DiffLocker(),
workflow=workflow,
)
branch_diff = await merger.merge(at=merge_at)
merge_locker = MergeLocker()
async with merge_locker.acquire_global_lock():
obj = await Branch.get_by_name(db=db, name=branch)
if obj.status != BranchStatus.OPEN:
log.info(f"Branch '{branch}' is not open (status={obj.status}), skipping merge")
return

changelog_collector = DiffChangelogCollector(diff=branch_diff, branch=obj, db=db)
node_events = changelog_collector.collect_changelogs()

# Handle schema updates and migrations after merge
if merger and await merger.has_schema_changes():
# Load the updated schema from DB after merge
log.info("Loading updated schema")
updated_schema = await registry.schema.load_schema_from_db(
node_events = await _do_merge_branch(
db=db,
branch=merger.destination_branch,
)
log.info("Calculating migrations")
migrations = await merger.calculate_migrations(target_schema=updated_schema)

# Use coordinator to update registry and run migrations with rollback on failure
log.info("Running migrations")
coordinator = SchemaUpdateCoordinator(
db=db,
branch=merger.destination_branch,
schema_manager=registry.schema,
origin_schema=pre_merge_schema,
workflow=workflow,
context=context,
migration_executor=MigrationExecutor.WORKFLOW,
logger=log,
)
await coordinator.execute(
candidate_schema=updated_schema,
at=merge_at,
migrations=migrations,
update_db=False, # Schema nodes already written by merge
update_registry=True,
user_id=context.account.account_id,
)
log.info("Migrations completed")
# -------------------------------------------------------------
# Trigger the reconciliation of IPAM data after the merge
# -------------------------------------------------------------
diff_parser = await component_registry.get_component(IpamDiffParser, db=db, branch=obj)
ipam_node_details = await diff_parser.get_changed_ipam_node_details(
source_branch_name=obj.name,
target_branch_name=registry.default_branch,
)
if ipam_node_details:
await get_workflow().submit_workflow(
workflow=IPAM_RECONCILIATION,
log=log,
obj=obj,
context=context,
parameters={"branch": registry.default_branch, "ipam_node_details": ipam_node_details},
)
# -------------------------------------------------------------
# remove tracking ID from the diff because there is no diff after the merge
# -------------------------------------------------------------
diff_repository = await component_registry.get_component(DiffRepository, db=db, branch=obj)
await diff_repository.mark_tracking_ids_merged(tracking_ids=[BranchTrackingId(name=obj.name)])
await diff_repository.freeze_diffs_for_branch(branch_name=obj.name)

# -------------------------------------------------------------
# Set branch status to MERGED to make it read-only
# -------------------------------------------------------------
obj.status = BranchStatus.MERGED
await obj.save(db=db)
registry.branch[obj.name] = obj

# -------------------------------------------------------------
# Cancel any remaining open proposed changes for this merged branch
# -------------------------------------------------------------
await get_workflow().submit_workflow(
workflow=BRANCH_CANCEL_PROPOSED_CHANGES,
context=context,
parameters={"branch_name": obj.name},
)

# -------------------------------------------------------------
# Generate an event to indicate that a branch has been merged
# NOTE: we still need to convert this event and potentially pull
# some tasks currently executed based on the event into this workflow
# -------------------------------------------------------------
await get_workflow().submit_workflow(
workflow=BRANCH_MERGE_POST_PROCESS,
context=context,
parameters={"source_branch": obj.name, "target_branch": registry.default_branch},
)

events: list[InfrahubEvent] = [merge_event]

Expand All @@ -402,6 +320,115 @@ async def merge_branch(branch: str, context: InfrahubContext, proposed_change_id
await event_service.send(event=event)


async def _do_merge_branch(
db: InfrahubDatabase, log: Logger | LoggerAdapter, obj: Branch, context: InfrahubContext
) -> Sequence[tuple[DiffAction, NodeChangelog]]:
component_registry = get_component_registry()

merger: BranchMerger | None = None
workflow = get_workflow()
merge_at = Timestamp()
pre_merge_schema = registry.schema.get_schema_branch(name=registry.default_branch).duplicate()
async with lock.registry.global_graph_lock():
diff_repository = await component_registry.get_component(DiffRepository, db=db, branch=obj)
diff_coordinator = await component_registry.get_component(DiffCoordinator, db=db, branch=obj)
diff_merger = await component_registry.get_component(DiffMerger, db=db, branch=obj)
merger = BranchMerger(
db=db,
diff_coordinator=diff_coordinator,
diff_merger=diff_merger,
diff_repository=diff_repository,
source_branch=obj,
diff_locker=DiffLocker(),
workflow=workflow,
)
branch_diff = await merger.merge(at=merge_at)

changelog_collector = DiffChangelogCollector(diff=branch_diff, branch=obj, db=db)
node_events = changelog_collector.collect_changelogs()

# Handle schema updates and migrations after merge
if merger and await merger.has_schema_changes():
# Load the updated schema from DB after merge
log.info("Loading updated schema")
updated_schema = await registry.schema.load_schema_from_db(
db=db,
branch=merger.destination_branch,
)
log.info("Calculating migrations")
migrations = await merger.calculate_migrations(target_schema=updated_schema)

# Use coordinator to update registry and run migrations with rollback on failure
log.info("Running migrations")
coordinator = SchemaUpdateCoordinator(
db=db,
branch=merger.destination_branch,
schema_manager=registry.schema,
origin_schema=pre_merge_schema,
workflow=workflow,
context=context,
migration_executor=MigrationExecutor.WORKFLOW,
logger=log,
)
await coordinator.execute(
candidate_schema=updated_schema,
at=merge_at,
migrations=migrations,
update_db=False, # Schema nodes already written by merge
update_registry=True,
user_id=context.account.account_id,
)
log.info("Migrations completed")
# -------------------------------------------------------------
# Trigger the reconciliation of IPAM data after the merge
# -------------------------------------------------------------
diff_parser = await component_registry.get_component(IpamDiffParser, db=db, branch=obj)
ipam_node_details = await diff_parser.get_changed_ipam_node_details(
source_branch_name=obj.name,
target_branch_name=registry.default_branch,
)
if ipam_node_details:
await workflow.submit_workflow(
workflow=IPAM_RECONCILIATION,
context=context,
parameters={"branch": registry.default_branch, "ipam_node_details": ipam_node_details},
)
# -------------------------------------------------------------
# remove tracking ID from the diff because there is no diff after the merge
# -------------------------------------------------------------
await diff_repository.mark_tracking_ids_merged(tracking_ids=[BranchTrackingId(name=obj.name)])
await diff_repository.freeze_diffs_for_branch(branch_name=obj.name)

# -------------------------------------------------------------
# Set branch status to MERGED to make it read-only
# -------------------------------------------------------------
obj.status = BranchStatus.MERGED
await obj.save(db=db)
registry.branch[obj.name] = obj

# -------------------------------------------------------------
# Cancel any remaining open proposed changes for this merged branch
# -------------------------------------------------------------
await workflow.submit_workflow(
workflow=BRANCH_CANCEL_PROPOSED_CHANGES,
context=context,
parameters={"branch_name": obj.name},
)

# -------------------------------------------------------------
# Generate an event to indicate that a branch has been merged
# NOTE: we still need to convert this event and potentially pull
# some tasks currently executed based on the event into this workflow
# -------------------------------------------------------------
await workflow.submit_workflow(
workflow=BRANCH_MERGE_POST_PROCESS,
context=context,
parameters={"source_branch": obj.name, "target_branch": registry.default_branch},
)

return node_events


@flow(name="branch-delete", flow_run_name="Delete branch {branch}")
async def delete_branch(branch: str, context: InfrahubContext) -> None:
await add_tags(branches=[branch])
Expand Down
3 changes: 3 additions & 0 deletions backend/infrahub/core/merge/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from infrahub.core.merge.branch_merger import BranchMerger

__all__ = ["BranchMerger"]
Comment on lines +1 to +3
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this __init__py be empty or did you add it to avoid changing the existing imports?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the second one. I added it for backwards compatibility with existing imports

Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
from infrahub.core.registry import registry
from infrahub.core.timestamp import Timestamp
from infrahub.exceptions import MergeFailedError, ValidationError
from infrahub.git.models import GitRepositoryMerge
from infrahub.log import get_logger

from ..git.models import GitRepositoryMerge
from ..workflows.catalogue import GIT_REPOSITORIES_MERGE
from infrahub.workflows.catalogue import GIT_REPOSITORIES_MERGE

if TYPE_CHECKING:
from infrahub.core.branch import Branch
Expand Down Expand Up @@ -155,24 +154,24 @@ async def merge(
"""Merge the current branch into main."""
if self.source_branch.name == registry.default_branch:
raise ValidationError(f"Unable to merge the branch '{self.source_branch.name}' into itself")

log.info("Updating diff for merge")
await self.diff_coordinator.update_branch_diff(
base_branch=self.destination_branch, diff_branch=self.source_branch
)
log.info("Diff updated for merge")

log.info("Acquiring lock for merge")
log.info("Acquiring diff lock for merge")
async with self.diff_locker.acquire_lock(
target_branch_name=self.destination_branch.name,
source_branch_name=self.source_branch.name,
is_incremental=False,
):
log.info("Lock acquired for merge")
log.info("Diff lock acquired for merge")
try:
errors: list[str] = []
async for conflict_path, conflict in self.diff_repository.get_all_conflicts_for_diff(
diff_branch_name=self.source_branch.name, tracking_id=BranchTrackingId(name=self.source_branch.name)
diff_branch_name=self.source_branch.name,
tracking_id=BranchTrackingId(name=self.source_branch.name),
):
if conflict.selected_branch is None or conflict.resolvable is False:
errors.append(conflict_path)
Expand Down
11 changes: 11 additions & 0 deletions backend/infrahub/core/merge/merge_locker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from infrahub import lock


class MergeLocker:
lock_namespace = "merge"

def __init__(self) -> None:
self.lock_registry = lock.registry

def acquire_global_lock(self) -> lock.InfrahubLock:
return self.lock_registry.get(name="all_branches", namespace=self.lock_namespace)
Loading