Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 132 additions & 60 deletions data_rentgen/db/repositories/job_dependency.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
# SPDX-FileCopyrightText: 2024-present MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from datetime import datetime
from typing import Literal

from sqlalchemy import ARRAY, Integer, any_, bindparam, cast, func, literal, select, tuple_
from sqlalchemy.orm import aliased
from sqlalchemy import (
ARRAY,
CTE,
CompoundSelect,
DateTime,
Integer,
Select,
any_,
bindparam,
cast,
func,
literal,
or_,
select,
tuple_,
)

from data_rentgen.db.models.input import Input
from data_rentgen.db.models.job_dependency import JobDependency
from data_rentgen.db.models.output import Output
from data_rentgen.db.repositories.base import Repository
from data_rentgen.dto import JobDependencyDTO

Expand All @@ -27,59 +44,6 @@
JobDependency.to_job_id == bindparam("to_job_id"),
)

# Recursive CTE walking JobDependency edges "upstream" (towards prerequisites),
# seeded from the jobs bound to the :job_ids parameter.
upstream_jobs_query_base_part = (
    select(
        JobDependency,
        literal(1).label("depth"),  # anchor rows start at depth 1
    )
    .select_from(JobDependency)
    .where(JobDependency.to_job_id == any_(bindparam("job_ids")))
)
upstream_jobs_query_cte = upstream_jobs_query_base_part.cte(name="upstream_jobs_query", recursive=True)

# Recursive member: follow from_job_id of edges found so far, capped by the :depth parameter.
upstream_jobs_query_recursive_part = (
    select(
        JobDependency,
        (upstream_jobs_query_cte.c.depth + 1).label("depth"),
    )
    .select_from(JobDependency)
    .where(
        upstream_jobs_query_cte.c.depth < bindparam("depth"),
        JobDependency.to_job_id == upstream_jobs_query_cte.c.from_job_id,
    )
)


upstream_jobs_query_cte = upstream_jobs_query_cte.union(upstream_jobs_query_recursive_part)
# Map CTE rows back onto the JobDependency entity so results load as ORM objects.
upstream_entities_query = select(aliased(JobDependency, upstream_jobs_query_cte))

# Mirror of the upstream traversal: walk edges "downstream" (towards dependents).
downstream_jobs_query_base_part = (
    select(
        JobDependency,
        literal(1).label("depth"),  # anchor rows start at depth 1
    )
    .select_from(JobDependency)
    .where(JobDependency.from_job_id == any_(bindparam("job_ids")))
)
downstream_jobs_query_cte = downstream_jobs_query_base_part.cte(name="downstream_jobs_query", recursive=True)

# Recursive member: follow to_job_id of edges found so far, capped by the :depth parameter.
downstream_jobs_query_recursive_part = (
    select(
        JobDependency,
        (downstream_jobs_query_cte.c.depth + 1).label("depth"),
    )
    .select_from(JobDependency)
    .where(
        downstream_jobs_query_cte.c.depth < bindparam("depth"),
        JobDependency.from_job_id == downstream_jobs_query_cte.c.to_job_id,
    )
)

downstream_jobs_query_cte = downstream_jobs_query_cte.union(downstream_jobs_query_recursive_part)
downstream_entities_query = select(aliased(JobDependency, downstream_jobs_query_cte))

# Union of both traversal directions, used when direction == "BOTH".
both_entities_query = select(aliased(JobDependency, (upstream_entities_query.union(downstream_entities_query)).cte()))


class JobDependencyRepository(Repository[JobDependency]):
async def fetch_bulk(
Expand Down Expand Up @@ -115,18 +79,31 @@ async def get_dependencies(
job_ids: list[int],
direction: Literal["UPSTREAM", "DOWNSTREAM", "BOTH"],
depth: int,
since: datetime | None = None,
until: datetime | None = None,
*,
infer_from_lineage: bool = False,
) -> list[JobDependency]:
core_query = self._get_core_hierarchy_query(include_indirect=infer_from_lineage)

query: Select | CompoundSelect
match direction:
case "UPSTREAM":
query = upstream_entities_query
query = self._get_upstream_hierarchy_query(core_query)
case "DOWNSTREAM":
query = downstream_entities_query
query = self._get_downstream_hierarchy_query(core_query)
case "BOTH":
query = both_entities_query
query = self._get_upstream_hierarchy_query(core_query).union(
self._get_downstream_hierarchy_query(core_query)
)

result = await self._session.scalars(query, {"job_ids": job_ids, "depth": depth})
return list(result.all())
result = await self._session.execute(
query, {"job_ids": job_ids, "depth": depth, "since": since, "until": until}
)
return [
JobDependency(from_job_id=item.from_job_id, to_job_id=item.to_job_id, type=item.type)
for item in result.all()
]

async def _get(self, job_dependency: JobDependencyDTO) -> JobDependency | None:
return await self._session.scalar(
Expand All @@ -146,3 +123,98 @@ async def _create(self, job_dependency: JobDependencyDTO) -> JobDependency:
self._session.add(result)
await self._session.flush([result])
return result

def _get_core_hierarchy_query(
    self,
    *,
    include_indirect: bool = False,
) -> CTE:
    """Build the CTE of job-to-job edges that hierarchy traversal runs over.

    Always contains the explicitly registered ``JobDependency`` rows. With
    ``include_indirect`` set, edges inferred from lineage are unioned in:
    an ``Output`` and an ``Input`` sharing the same operation produce an
    edge typed ``INFERRED_FROM_LINEAGE``, filtered by the ``since``/``until``
    bind parameters (each filter is a no-op when its parameter is NULL).
    """
    direct_edges = select(
        JobDependency.from_job_id,
        JobDependency.to_job_id,
        JobDependency.type,
    )
    if not include_indirect:
        return direct_edges.cte("jobs_hierarchy_core_query")

    since_param = bindparam("since", type_=DateTime(timezone=True))
    until_param = bindparam("until", type_=DateTime(timezone=True))
    inferred_edges = (
        select(
            Output.job_id.label("from_job_id"),
            Input.job_id.label("to_job_id"),
            literal("INFERRED_FROM_LINEAGE").label("type"),
        )
        .join(Input, Output.operation_id == Input.operation_id)
        .where(
            # NULL-safe bounds: skip the comparison when the bind param is absent.
            or_(since_param.is_(None), Input.created_at >= bindparam("since")),
            or_(until_param.is_(None), Output.created_at <= bindparam("until")),
            # NOTE(review): keeps only pairs where the output was created at or
            # after the input — presumably the causal ordering; confirm intent.
            Output.created_at >= Input.created_at,
        )
    )
    return direct_edges.union(inferred_edges).cte("jobs_hierarchy_core_query")

def _get_upstream_hierarchy_query(
    self,
    core_query: CTE,
) -> Select:
    """Recursive traversal over ``core_query`` edges towards upstream jobs.

    Anchored on edges whose ``to_job_id`` is in the ``job_ids`` bind
    parameter; each recursion step follows ``from_job_id`` further up,
    stopping once the ``depth`` bind parameter is reached.
    """
    seed = select(
        core_query.c.from_job_id.label("from_job_id"),
        core_query.c.to_job_id.label("to_job_id"),
        core_query.c.type.label("type"),
        literal(1).label("depth"),  # anchor rows start at depth 1
    ).where(core_query.c.to_job_id == any_(bindparam("job_ids")))

    traversal = seed.cte(name="upstream_jobs_query", recursive=True)

    step = (
        select(
            core_query.c.from_job_id.label("from_job_id"),
            core_query.c.to_job_id.label("to_job_id"),
            core_query.c.type.label("type"),
            (traversal.c.depth + 1).label("depth"),
        )
        .join(core_query, core_query.c.to_job_id == traversal.c.from_job_id)
        .where(traversal.c.depth < bindparam("depth"))
    )

    return select(traversal.union(step))

def _get_downstream_hierarchy_query(
    self,
    core_query: CTE,
) -> Select:
    """Recursive traversal over ``core_query`` edges towards downstream jobs.

    Anchored on edges whose ``from_job_id`` is in the ``job_ids`` bind
    parameter; each recursion step follows ``to_job_id`` further down,
    stopping once the ``depth`` bind parameter is reached.
    """
    seed = select(
        core_query.c.from_job_id.label("from_job_id"),
        core_query.c.to_job_id.label("to_job_id"),
        core_query.c.type.label("type"),
        literal(1).label("depth"),  # anchor rows start at depth 1
    ).where(core_query.c.from_job_id == any_(bindparam("job_ids")))

    traversal = seed.cte(name="downstream_jobs_query", recursive=True)

    step = (
        select(
            core_query.c.from_job_id.label("from_job_id"),
            core_query.c.to_job_id.label("to_job_id"),
            core_query.c.type.label("type"),
            (traversal.c.depth + 1).label("depth"),
        )
        .join(core_query, core_query.c.from_job_id == traversal.c.to_job_id)
        .where(traversal.c.depth < bindparam("depth"))
    )

    return select(traversal.union(step))
3 changes: 3 additions & 0 deletions data_rentgen/server/api/v1/router/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ async def get_job_hierarchy(
start_node_id=query_args.start_node_id,
direction=query_args.direction,
depth=query_args.depth,
infer_from_lineage=query_args.infer_from_lineage,
since=query_args.since,
until=query_args.until,
)
return JobHierarchyResponseV1(
relations=JobHierarchyRelationsV1(
Expand Down
37 changes: 35 additions & 2 deletions data_rentgen/server/schemas/v1/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from datetime import datetime
from typing import Literal

from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator, model_validator

from data_rentgen.server.schemas.v1.job_response import JobResponseV1
from data_rentgen.server.schemas.v1.pagination import PaginateQueryV1
Expand Down Expand Up @@ -118,4 +119,36 @@ class JobHierarchyQueryV1(BaseModel):
examples=["DOWNSTREAM", "UPSTREAM", "BOTH"],
)
depth: int = Field(description="Levels of dependencies to dive into", default=1)
model_config = ConfigDict(extra="ignore")
infer_from_lineage: bool = Field(
default=False,
description="Include or not indirect connections between jobs",
examples=[True, False],
)
since: datetime | None = Field(
default=None,
description="",
examples=["2008-09-15T15:53:00+05:00"],
)
until: datetime | None = Field(
default=None,
description="",
examples=["2008-09-15T15:53:00+05:00"],
)

model_config = ConfigDict(extra="forbid")

@field_validator("until", mode="after")
@classmethod
def _check_until(cls, value: datetime | None, info: ValidationInfo) -> datetime | None:
    """Reject ranges where ``since`` does not strictly precede ``until``."""
    if value is None:
        return value
    since = info.data.get("since")
    if since is None:
        return value
    # Compare via timestamp() so naive and aware datetimes don't raise TypeError.
    if value.timestamp() <= since.timestamp():
        msg = "'since' should be less than 'until'"
        raise ValueError(msg)
    return value

@model_validator(mode="after")
def _check_indirect_flag(self):
    """``infer_from_lineage`` is only meaningful with a lower time bound."""
    if not self.infer_from_lineage:
        return self
    if not self.since:
        msg = "Inferring from lineage graph only possible with 'since' param"
        raise ValueError(msg)
    return self
Comment on lines +149 to +154
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@model_validator(mode="after")
def _check_indirect_flag(self):
if self.infer_from_lineage and not self.since:
msg = "Inferring from lineage graph only possible with 'since' param"
raise ValueError(msg)
return self
@field_validator("since", mode="after")
@classmethod
def _check_since(cls, value: datetime | None, info: ValidationInfo) -> datetime | None:
infer_from_lineage = info.data.get("infer_from_lineage")
if infer_from_lineage and not value:
msg = "'since' is mandatory when 'infer_from_lineage' is used"
raise ValueError(msg)
return value

13 changes: 12 additions & 1 deletion data_rentgen/server/services/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from collections.abc import Collection, Sequence
from dataclasses import dataclass, field
from datetime import datetime
from itertools import groupby
from typing import Annotated, Literal

Expand Down Expand Up @@ -112,6 +113,10 @@ async def get_jobs_hierarchy(
start_node_id: int,
direction: Literal["UPSTREAM", "DOWNSTREAM", "BOTH"],
depth: int,
since: datetime | None = None,
until: datetime | None = None,
*,
infer_from_lineage: bool = False,
) -> JobHierarchyResult:
logger.info(
"Get jobs hierarchy with start at job with id %s, direction %s, depth %s",
Expand All @@ -133,6 +138,9 @@ async def get_jobs_hierarchy(
job_ids=list(job_ids),
direction=direction,
depth=depth,
infer_from_lineage=infer_from_lineage,
since=since,
until=until,
)
dependency_job_ids = {d.from_job_id for d in dependencies} | {d.to_job_id for d in dependencies}
job_ids |= dependency_job_ids
Expand All @@ -142,6 +150,9 @@ async def get_jobs_hierarchy(
jobs = await self._uow.job.list_by_ids(list(job_ids))
return JobHierarchyResult(
parents={(p.parent_job_id, p.child_job_id) for p in ancestor_relations + descendant_relations},
dependencies={(d.from_job_id, d.to_job_id, d.type) for d in dependencies},
dependencies={
(d.from_job_id, d.to_job_id, d.type)
for d in sorted(dependencies, key=lambda x: (x.from_job_id, x.to_job_id, x.type))
},
jobs=[JobServiceResult.from_orm(job) for job in jobs],
)
2 changes: 2 additions & 0 deletions docs/changelog/next_release/416.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added indirect job hierarchy dependencies inferred from lineage IO relations, with optional ``since``/``until`` bounds and ``infer_from_lineage`` request flag for ``/v1/jobs/hierarchy``.

2 changes: 1 addition & 1 deletion docs/entities/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ It contains following fields:

- ``from: Job | Run`` - entity which should be waited before current job/run will be started.
- ``to: Job | Run`` - entity which waits.
- ``type: str`` - type of dependency, any arbitrary string provided by integration, usually something like ``DIRECT_DEPENDENCY``, ``INDIRECT_DEPENDENCY``.
- ``type: str`` - type of dependency, any arbitrary string provided by integration, usually something like ``DIRECT_DEPENDENCY``, ``INFERRED_FROM_LINEAGE``.

.. image:: dependency_relation.png

Expand Down
Loading
Loading