From 0c6330ffcc4dbc9626cb3df28b3239d4966dbd93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Wed, 25 Mar 2026 11:20:01 +0300 Subject: [PATCH 1/5] [DOP-34706] add indirect dependencies into job hierarchy request --- .../db/repositories/job_dependency.py | 192 ++++++++++++------ data_rentgen/server/api/v1/router/job.py | 3 + data_rentgen/server/schemas/v1/job.py | 19 +- data_rentgen/server/services/job.py | 13 +- tests/test_server/fixtures/factories/job.py | 134 ++++++++++++ .../test_jobs/test_job_hierarchy.py | 69 +++++++ 6 files changed, 368 insertions(+), 62 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index 7606d9c1..eea95cb3 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -1,11 +1,28 @@ # SPDX-FileCopyrightText: 2024-present MTS PJSC # SPDX-License-Identifier: Apache-2.0 +from datetime import datetime from typing import Literal -from sqlalchemy import ARRAY, Integer, any_, bindparam, cast, func, literal, select, tuple_ -from sqlalchemy.orm import aliased +from sqlalchemy import ( + ARRAY, + CTE, + CompoundSelect, + DateTime, + Integer, + Select, + any_, + bindparam, + cast, + func, + literal, + or_, + select, + tuple_, +) +from data_rentgen.db.models.input import Input from data_rentgen.db.models.job_dependency import JobDependency +from data_rentgen.db.models.output import Output from data_rentgen.db.repositories.base import Repository from data_rentgen.dto import JobDependencyDTO @@ -27,59 +44,6 @@ JobDependency.to_job_id == bindparam("to_job_id"), ) -upstream_jobs_query_base_part = ( - select( - JobDependency, - literal(1).label("depth"), - ) - .select_from(JobDependency) - .where(JobDependency.to_job_id == any_(bindparam("job_ids"))) -) -upstream_jobs_query_cte = upstream_jobs_query_base_part.cte(name="upstream_jobs_query", recursive=True) - -upstream_jobs_query_recursive_part = ( - select( - JobDependency, - (upstream_jobs_query_cte.c.depth + 1).label("depth"), - ) - .select_from(JobDependency) - .where( - upstream_jobs_query_cte.c.depth < bindparam("depth"), - JobDependency.to_job_id == upstream_jobs_query_cte.c.from_job_id, - ) -) - - -upstream_jobs_query_cte = upstream_jobs_query_cte.union(upstream_jobs_query_recursive_part) -upstream_entities_query = select(aliased(JobDependency, upstream_jobs_query_cte)) - -downstream_jobs_query_base_part = ( - select( - JobDependency, - literal(1).label("depth"), - ) - .select_from(JobDependency) - .where(JobDependency.from_job_id == any_(bindparam("job_ids"))) -) -downstream_jobs_query_cte = downstream_jobs_query_base_part.cte(name="downstream_jobs_query", recursive=True) - -downstream_jobs_query_recursive_part = ( - select( - JobDependency, - (downstream_jobs_query_cte.c.depth + 1).label("depth"), - ) - .select_from(JobDependency) - .where( - downstream_jobs_query_cte.c.depth < bindparam("depth"), - JobDependency.from_job_id == downstream_jobs_query_cte.c.to_job_id, - ) -) - -downstream_jobs_query_cte = downstream_jobs_query_cte.union(downstream_jobs_query_recursive_part) -downstream_entities_query = select(aliased(JobDependency, downstream_jobs_query_cte)) - -both_entities_query = select(aliased(JobDependency, (upstream_entities_query.union(downstream_entities_query)).cte())) - class JobDependencyRepository(Repository[JobDependency]): async def fetch_bulk( @@ -115,18 +79,31 @@ async def get_dependencies( job_ids: list[int], direction: Literal["UPSTREAM", "DOWNSTREAM", "BOTH"], depth: int, + since: datetime | None = None, + until: datetime | None = None, + *, + include_indirect: bool = False, ) -> list[JobDependency]: + core_query = self._get_core_hierarchy_query(include_indirect=include_indirect) + query: Select | CompoundSelect match direction: case "UPSTREAM": - query = upstream_entities_query + query = self._get_upstream_hierarchy_query(core_query) case "DOWNSTREAM": - query = downstream_entities_query + query = self._get_downstream_hierarchy_query(core_query) case "BOTH": - query = both_entities_query + query = self._get_upstream_hierarchy_query(core_query).union( + self._get_downstream_hierarchy_query(core_query) + ) - result = await self._session.scalars(query, {"job_ids": job_ids, "depth": depth}) - return list(result.all()) + result = await self._session.execute( + query, {"job_ids": job_ids, "depth": depth, "since": since, "until": until} + ) + return [ + JobDependency(from_job_id=from_job_id, to_job_id=to_job_id, type=type_) + for (from_job_id, to_job_id, type_, _) in result.all() + ] async def _get(self, job_dependency: JobDependencyDTO) -> JobDependency | None: return await self._session.scalar( @@ -146,3 +123,98 @@ async def _create(self, job_dependency: JobDependencyDTO) -> JobDependency: self._session.add(result) await self._session.flush([result]) return result + + def _get_core_hierarchy_query( + self, + *, + include_indirect: bool = False, + ) -> CTE: + query: Select | CompoundSelect + query = select( + JobDependency.from_job_id, + JobDependency.to_job_id, + JobDependency.type, + ) + if include_indirect: + query = query.union( + select( + Output.job_id.label("from_job_id"), + Input.job_id.label("to_job_id"), + literal("INDIRECT_DEPENDENCY").label("type"), + ) + .join(Input, Output.operation_id == Input.operation_id) + .where( + or_( + bindparam("since", type_=DateTime(timezone=True)).is_(None), + Input.created_at >= bindparam("since"), + ), + or_( + bindparam("until", type_=DateTime(timezone=True)).is_(None), + Output.created_at <= bindparam("until"), + ), + Output.created_at >= Input.created_at, + ) + ) + return query.cte("jobs_hierarchy_core_query") + + def _get_upstream_hierarchy_query( + self, + core_query: CTE, + ) -> Select: + base_part = select( + core_query.c.from_job_id.label("from_job_id"), + core_query.c.to_job_id.label("to_job_id"), + core_query.c.type.label("type"), + literal(1).label("depth"), + ).where(core_query.c.to_job_id == any_(bindparam("job_ids"))) + + base_query_cte = base_part.cte(name="upstream_jobs_query", recursive=True) + + recursive_part = ( + select( + core_query.c.from_job_id.label("from_job_id"), + core_query.c.to_job_id.label("to_job_id"), + core_query.c.type.label("type"), + (base_query_cte.c.depth + 1).label("depth"), + ) + .join( + core_query, + core_query.c.to_job_id == base_query_cte.c.from_job_id, + ) + .where( + base_query_cte.c.depth < bindparam("depth"), + ) + ) + + return select(base_query_cte.union(recursive_part)) + + def _get_downstream_hierarchy_query( + self, + core_query: CTE, + ) -> Select: + base_part = select( + core_query.c.from_job_id.label("from_job_id"), + core_query.c.to_job_id.label("to_job_id"), + core_query.c.type.label("type"), + literal(1).label("depth"), + ).where(core_query.c.from_job_id == any_(bindparam("job_ids"))) + + base_part_cte = base_part.cte(name="downstream_jobs_query", recursive=True) + + recursive_part = ( + select( + core_query.c.from_job_id.label("from_job_id"), + core_query.c.to_job_id.label("to_job_id"), + core_query.c.type.label("type"), + (base_part_cte.c.depth + 1).label("depth"), + ) + .join( + core_query, + core_query.c.from_job_id == base_part_cte.c.to_job_id, + ) + .where( + base_part_cte.c.depth < bindparam("depth"), + ) + ) + + return select(base_part_cte.union(recursive_part)) diff --git a/data_rentgen/server/api/v1/router/job.py b/data_rentgen/server/api/v1/router/job.py index a345149b..fa13a51a 100644 --- a/data_rentgen/server/api/v1/router/job.py +++ b/data_rentgen/server/api/v1/router/job.py @@ -95,6 +95,9 @@ async def get_job_hierarchy( start_node_id=query_args.start_node_id, direction=query_args.direction, depth=query_args.depth, + include_indirect=query_args.include_indirect, + since=query_args.since, + until=query_args.until, ) return JobHierarchyResponseV1( relations=JobHierarchyRelationsV1( diff --git a/data_rentgen/server/schemas/v1/job.py b/data_rentgen/server/schemas/v1/job.py index 9d6de374..4f10f39d 100644 --- a/data_rentgen/server/schemas/v1/job.py +++ b/data_rentgen/server/schemas/v1/job.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations +from datetime import datetime from typing import Literal from pydantic import BaseModel, ConfigDict, Field @@ -118,4 +119,20 @@ class JobHierarchyQueryV1(BaseModel): examples=["DOWNSTREAM", "UPSTREAM", "BOTH"], ) depth: int = Field(description="Levels of dependencies to dive into", default=1) - model_config = ConfigDict(extra="ignore") + include_indirect: bool = Field( + default=False, + description="Include or not indirect connections between jobs", + examples=[True, False], + ) + since: datetime | None = Field( + default=None, + description="", + examples=["2008-09-15T15:53:00+05:00"], + ) + until: datetime | None = Field( + default=None, + description="", + examples=["2008-09-15T15:53:00+05:00"], + ) + + model_config = ConfigDict(extra="forbid") diff --git a/data_rentgen/server/services/job.py b/data_rentgen/server/services/job.py index 7ca931a5..f1eb95cd 100644 --- a/data_rentgen/server/services/job.py +++ b/data_rentgen/server/services/job.py @@ -3,6 +3,7 @@ import logging from collections.abc import Collection, Sequence from dataclasses import dataclass, field +from datetime import datetime from itertools import groupby from typing import Annotated, Literal @@ -112,6 +113,10 @@ async def get_jobs_hierarchy( start_node_id: int, direction: Literal["UPSTREAM", "DOWNSTREAM", "BOTH"], depth: int, + since: datetime | None = None, + until: datetime | None = None, + *, + include_indirect: bool = False, ) -> JobHierarchyResult: logger.info( "Get jobs hierarchy with start at job with id %s, direction %s, depth %s", @@ -133,6 +138,9 @@ async def get_jobs_hierarchy( job_ids=list(job_ids), direction=direction, depth=depth, + include_indirect=include_indirect, + since=since, + until=until, ) dependency_job_ids = {d.from_job_id for d in dependencies} | {d.to_job_id for d in dependencies} job_ids |= dependency_job_ids @@ -142,6 +150,9 @@ async def get_jobs_hierarchy( jobs = await self._uow.job.list_by_ids(list(job_ids)) return JobHierarchyResult( parents={(p.parent_job_id, p.child_job_id) for p in ancestor_relations + descendant_relations}, - dependencies={(d.from_job_id, d.to_job_id, d.type) for d in dependencies}, + dependencies={ + (d.from_job_id, d.to_job_id, d.type) + for d in sorted(dependencies, key=lambda x: (x.from_job_id, x.to_job_id, x.type)) + }, jobs=[JobServiceResult.from_orm(job) for job in jobs], ) diff --git a/tests/test_server/fixtures/factories/job.py b/tests/test_server/fixtures/factories/job.py index 11adeea2..a3d77b45 100644 --- a/tests/test_server/fixtures/factories/job.py +++ b/tests/test_server/fixtures/factories/job.py @@ -1,14 +1,19 @@ from __future__ import annotations +from datetime import UTC, datetime, timedelta from random import randint from typing import TYPE_CHECKING import pytest_asyncio from data_rentgen.db.models import Job, JobDependency, TagValue +from data_rentgen.utils.uuid import generate_new_uuid from tests.test_server.fixtures.factories.base import random_string +from tests.test_server.fixtures.factories.dataset import create_dataset +from tests.test_server.fixtures.factories.input import create_input from tests.test_server.fixtures.factories.job_type import create_job_type from tests.test_server.fixtures.factories.location import create_location +from tests.test_server.fixtures.factories.output import create_output from tests.test_server.utils.delete import clean_db if TYPE_CHECKING: @@ -463,3 +468,132 @@ async def job_dependency_chain( async with async_session_maker() as async_session: await clean_db(async_session) + + +@pytest_asyncio.fixture +async def job_dependency_chain_with_indirect_dependencies( + async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], + job_dependency_chain: tuple[tuple[Job, Job, Job], ...], +) -> AsyncGenerator[tuple[tuple[Job, Job, Job, Job, Job], ...], None]: + """ + Extends `job_dependency_chain` with two extra parent-child chains: + - left: left_dag -> left_task -> left_spark + - right: right_dag -> right_task -> right_spark + + The chains are connected to the central fixture on task level via IO relations: + - left_task -> task1 (indirect via input/output relation) + - task3 -> right_task (indirect via input/output relation) + """ + (dag1, dag2, dag3), (task1, task2, task3), (spark1, spark2, spark3) = job_dependency_chain + + async with async_session_maker() as async_session: + location = await create_location(async_session) + job_type_dag = await create_job_type(async_session, {"type": "AIRFLOW_DAG"}) + job_type_task = await create_job_type(async_session, {"type": "AIRFLOW_TASK"}) + job_type_spark = await create_job_type(async_session, {"type": "SPARK_APPLICATION"}) + + left_dag = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_dag.id, + job_kwargs={"name": "left_dag"}, + ) + left_task = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_task.id, + job_kwargs={"name": "left_task", "parent_job_id": left_dag.id}, + ) + left_spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "left_spark", "parent_job_id": left_task.id}, + ) + + right_dag = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_dag.id, + job_kwargs={"name": "right_dag"}, + ) + right_task = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_task.id, + job_kwargs={"name": "right_task", "parent_job_id": right_dag.id}, + ) + right_spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "right_spark", "parent_job_id": right_task.id}, + ) + + left_dataset_location = await create_location(async_session) + left_dataset = await create_dataset(async_session, location_id=left_dataset_location.id) + right_dataset_location = await create_location(async_session) + right_dataset = await create_dataset(async_session, location_id=right_dataset_location.id) + + # Connect left chain to central chain: left_task -> task1 + left_created_at = datetime.now(tz=UTC) + left_operation_id = generate_new_uuid(left_created_at) + left_output = await create_output( + async_session, + output_kwargs={ + "created_at": left_created_at, + "operation_id": left_operation_id, + "run_id": generate_new_uuid(left_created_at), + "job_id": left_task.id, + "dataset_id": left_dataset.id, + "schema_id": None, + }, + ) + await create_input( + async_session, + input_kwargs={ + "created_at": left_created_at - timedelta(seconds=1), + "operation_id": left_operation_id, + "run_id": left_output.run_id, + "job_id": task1.id, + "dataset_id": left_dataset.id, + "schema_id": None, + }, + ) + + # Connect central chain to right chain: task3 -> right_task + right_created_at = datetime.now(tz=UTC) + timedelta(seconds=10) + right_operation_id = generate_new_uuid(right_created_at) + right_output = await create_output( + async_session, + output_kwargs={ + "created_at": right_created_at, + "operation_id": right_operation_id, + "run_id": generate_new_uuid(right_created_at), + "job_id": task3.id, + "dataset_id": right_dataset.id, + "schema_id": None, + }, + ) + await create_input( + async_session, + input_kwargs={ + "created_at": right_created_at - timedelta(seconds=1), + "operation_id": right_operation_id, + "run_id": right_output.run_id, + "job_id": right_task.id, + "dataset_id": right_dataset.id, + "schema_id": None, + }, + ) + + async_session.expunge_all() + + yield ( + (left_dag, dag1, dag2, dag3, right_dag), + (left_task, task1, task2, task3, right_task), + (left_spark, spark1, spark2, spark3, right_spark), + ) + + async with async_session_maker() as async_session: + await clean_db(async_session) diff --git a/tests/test_server/test_jobs/test_job_hierarchy.py b/tests/test_server/test_jobs/test_job_hierarchy.py index 4fd2add6..e1be8494 100644 --- a/tests/test_server/test_jobs/test_job_hierarchy.py +++ b/tests/test_server/test_jobs/test_job_hierarchy.py @@ -304,3 +304,72 @@ async def test_get_job_hierarchy_with_depth_on_boundary( }, "nodes": {"jobs": jobs_to_json([expected_job])}, } + + +@pytest.mark.parametrize( + ["direction", "depth", "start_node_idx", "expected_deps"], + [ + ("UPSTREAM", 1, 1, [(0, 1, "INDIRECT_DEPENDENCY")]), + ("DOWNSTREAM", 1, 3, [(3, 4, "INDIRECT_DEPENDENCY")]), + ( + "BOTH", + 2, + 2, + [ + (1, 2, "DIRECT_DEPENDENCY"), + (2, 3, "DIRECT_DEPENDENCY"), + (3, 4, "INDIRECT_DEPENDENCY"), + (0, 1, "INDIRECT_DEPENDENCY"), + ], + ), + ], + ids=["indirect-upstream", "indirect-downstream", "indirect-both"], +) +async def test_get_job_hierarchy_with_indirect_dependencies( + test_client: AsyncClient, + async_session: AsyncSession, + job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + mocked_user: MockedUser, + direction: str, + depth: int, + start_node_idx: int, + expected_deps: list[tuple[int, int, str]], +): + dags, tasks, sparks = job_dependency_chain_with_indirect_dependencies + start_node = tasks[start_node_idx] + + expected_ids = set() + for from_idx, to_idx, _ in expected_deps: + expected_ids.add(from_idx) + expected_ids.add(to_idx) + expected_dags = [dags[idx] for idx in expected_ids] + expected_tasks = [tasks[idx] for idx in expected_ids] + expected_sparks = [sparks[start_node_idx]] + expected_nodes = await enrich_jobs(expected_dags + expected_tasks + expected_sparks, async_session) + + response = await test_client.get( + "v1/jobs/hierarchy", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={ + "start_node_id": start_node.id, + "direction": direction, + "depth": depth, + "include_indirect": True, + }, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "relations": { + "parents": jobs_ancestors_to_json(expected_nodes), + "dependencies": [ + { + "from": {"kind": "JOB", "id": str(tasks[from_idx].id)}, + "to": {"kind": "JOB", "id": str(tasks[to_idx].id)}, + "type": dep_type, + } + for from_idx, to_idx, dep_type in expected_deps + ], + }, + "nodes": {"jobs": jobs_to_json(expected_nodes)}, + } From 99a881772681e29d28b12050528417a6202aca11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Thu, 26 Mar 2026 14:19:24 +0300 Subject: [PATCH 2/5] [DOP-34706] change scheme and add since/until params --- .../db/repositories/job_dependency.py | 8 ++--- data_rentgen/server/api/v1/router/job.py | 2 +- data_rentgen/server/schemas/v1/job.py | 20 ++++++++++-- data_rentgen/server/services/job.py | 4 +-- .../test_jobs/test_job_hierarchy.py | 31 +++++++++++++++++-- 5 files changed, 53 insertions(+), 12 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index eea95cb3..c3a4ad60 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -82,9 +82,9 @@ async def get_dependencies( since: datetime | None = None, until: datetime | None = None, *, - include_indirect: bool = False, + infer_from_lineage: bool = False, ) -> list[JobDependency]: - core_query = self._get_core_hierarchy_query(include_indirect=include_indirect) + core_query = self._get_core_hierarchy_query(include_indirect=infer_from_lineage) query: Select | CompoundSelect match direction: @@ -101,8 +101,8 @@ async def get_dependencies( query, {"job_ids": job_ids, "depth": depth, "since": since, "until": until} ) return [ - JobDependency(from_job_id=from_job_id, to_job_id=to_job_id, type=type_) - for (from_job_id, to_job_id, type_, _) in result.all() + JobDependency(from_job_id=item.from_job_id, to_job_id=item.to_job_id, type=item.type) + for item in result.all() ] async def _get(self, job_dependency: JobDependencyDTO) -> JobDependency | None: diff --git a/data_rentgen/server/api/v1/router/job.py b/data_rentgen/server/api/v1/router/job.py index fa13a51a..01fe9220 100644 --- a/data_rentgen/server/api/v1/router/job.py +++ b/data_rentgen/server/api/v1/router/job.py @@ -95,7 +95,7 @@ async def get_job_hierarchy( start_node_id=query_args.start_node_id, direction=query_args.direction, depth=query_args.depth, - include_indirect=query_args.include_indirect, + infer_from_lineage=query_args.infer_from_lineage, since=query_args.since, until=query_args.until, ) diff --git a/data_rentgen/server/schemas/v1/job.py b/data_rentgen/server/schemas/v1/job.py index 4f10f39d..44bf6242 100644 --- a/data_rentgen/server/schemas/v1/job.py +++ b/data_rentgen/server/schemas/v1/job.py @@ -5,7 +5,7 @@ from datetime import datetime from typing import Literal -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator, model_validator from data_rentgen.server.schemas.v1.job_response import JobResponseV1 from data_rentgen.server.schemas.v1.pagination import PaginateQueryV1 @@ -119,7 +119,7 @@ class JobHierarchyQueryV1(BaseModel): examples=["DOWNSTREAM", "UPSTREAM", "BOTH"], ) depth: int = Field(description="Levels of dependencies to dive into", default=1) - include_indirect: bool = Field( + infer_from_lineage: bool = Field( default=False, description="Include or not indirect connections between jobs", examples=[True, False], @@ -136,3 +136,19 @@ class JobHierarchyQueryV1(BaseModel): ) model_config = ConfigDict(extra="forbid") + + @field_validator("until", mode="after") + @classmethod + def _check_until(cls, value: datetime | None, info: ValidationInfo) -> datetime | None: + since = info.data.get("since") + if since and value and since.timestamp() >= value.timestamp(): + msg = "'since' should be less than 'until'" + raise ValueError(msg) + return value + + @model_validator(mode="after") + def _check_indirect_flag(self): + if self.infer_from_lineage and not self.since: + msg = "Inferring from lineage graph only possible with 'since' param" + raise ValueError(msg) + return self diff --git a/data_rentgen/server/services/job.py b/data_rentgen/server/services/job.py index f1eb95cd..380702c8 100644 --- a/data_rentgen/server/services/job.py +++ b/data_rentgen/server/services/job.py @@ -116,7 +116,7 @@ async def get_jobs_hierarchy( since: datetime | None = None, until: datetime | None = None, *, - include_indirect: bool = False, + infer_from_lineage: bool = False, ) -> JobHierarchyResult: logger.info( "Get jobs hierarchy with start at job with id %s, direction %s, depth %s", @@ -138,7 +138,7 @@ async def get_jobs_hierarchy( job_ids=list(job_ids), direction=direction, depth=depth, - include_indirect=include_indirect, + infer_from_lineage=infer_from_lineage, since=since, until=until, ) diff --git a/tests/test_server/test_jobs/test_job_hierarchy.py b/tests/test_server/test_jobs/test_job_hierarchy.py index e1be8494..5bbc7538 100644 --- a/tests/test_server/test_jobs/test_job_hierarchy.py +++ b/tests/test_server/test_jobs/test_job_hierarchy.py @@ -1,6 +1,6 @@ # SPDX-FileCopyrightText: 2024-present MTS PJSC # SPDX-License-Identifier: Apache-2.0 - +from datetime import UTC, datetime from http import HTTPStatus import pytest @@ -310,7 +310,25 @@ async def test_get_job_hierarchy_with_depth_on_boundary( ["direction", "depth", "start_node_idx", "expected_deps"], [ ("UPSTREAM", 1, 1, [(0, 1, "INDIRECT_DEPENDENCY")]), + ( + "UPSTREAM", + 2, + 2, + [ + (1, 2, "DIRECT_DEPENDENCY"), + (0, 1, "INDIRECT_DEPENDENCY"), + ], + ), ("DOWNSTREAM", 1, 3, [(3, 4, "INDIRECT_DEPENDENCY")]), + ( + "DOWNSTREAM", + 2, + 2, + [ + (2, 3, "DIRECT_DEPENDENCY"), + (3, 4, "INDIRECT_DEPENDENCY"), + ], + ), ( "BOTH", 2, @@ -323,7 +341,13 @@ async def test_get_job_hierarchy_with_depth_on_boundary( ], ), ], - ids=["indirect-upstream", "indirect-downstream", "indirect-both"], + ids=[ + "indirect-upstream-depth-1", + "indirect-upstream-depth-2", + "indirect-downstream-depth-1", + "indirect-downstream-depth-2", + "indirect-both-depth-2", + ], ) async def test_get_job_hierarchy_with_indirect_dependencies( test_client: AsyncClient, @@ -354,7 +378,8 @@ async def test_get_job_hierarchy_with_indirect_dependencies( "start_node_id": start_node.id, "direction": direction, "depth": depth, - "include_indirect": True, + "infer_from_lineage": True, + "since": datetime.min.replace(tzinfo=UTC).isoformat(), }, ) From 0acf303c37874bdaf3318822162ca345cc2ca225 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Thu, 26 Mar 2026 16:42:34 +0300 Subject: [PATCH 3/5] [DOP-34706] add tests --- .../test_jobs/test_job_hierarchy.py | 153 +++++++++++++++++- 1 file changed, 150 insertions(+), 3 deletions(-) diff --git a/tests/test_server/test_jobs/test_job_hierarchy.py b/tests/test_server/test_jobs/test_job_hierarchy.py index 5bbc7538..3a012b2d 100644 --- a/tests/test_server/test_jobs/test_job_hierarchy.py +++ b/tests/test_server/test_jobs/test_job_hierarchy.py @@ -1,15 +1,16 @@ # SPDX-FileCopyrightText: 2024-present MTS PJSC # SPDX-License-Identifier: Apache-2.0 -from datetime import UTC, datetime +from datetime import UTC, datetime, timedelta from http import HTTPStatus import pytest from httpx import AsyncClient +from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession -from data_rentgen.db.models import Job +from data_rentgen.db.models import Input, Job, Output from tests.fixtures.mocks import MockedUser -from tests.test_server.utils.convert_to_json import jobs_ancestors_to_json, jobs_to_json +from tests.test_server.utils.convert_to_json import format_datetime, jobs_ancestors_to_json, jobs_to_json from tests.test_server.utils.enrich import enrich_jobs pytestmark = [pytest.mark.server, pytest.mark.asyncio] @@ -398,3 +399,149 @@ async def test_get_job_hierarchy_with_indirect_dependencies( }, "nodes": {"jobs": jobs_to_json(expected_nodes)}, } + + +async def test_get_job_hierarchy_with_indirect_dependencies_with_since_and_until( + test_client: AsyncClient, + async_session: AsyncSession, + job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + mocked_user: MockedUser, +): + dags, tasks, sparks = job_dependency_chain_with_indirect_dependencies + start_node = tasks[2] + + # Cover both indirect links connected to task0 and task4. + edge_task_ids = [tasks[0].id, tasks[4].id] + min_input_created_at = await async_session.scalar( + select(func.min(Input.created_at)).where(Input.job_id.in_(edge_task_ids)), + ) - timedelta(seconds=2) + max_output_created_at = await async_session.scalar( + select(func.max(Output.created_at)).where(Output.job_id.in_(edge_task_ids)), + ) + timedelta(seconds=2) + + expected_nodes = await enrich_jobs([*dags[1:4], *tasks[1:4], sparks[2]], async_session) + expected_deps = [ + (1, 2, "DIRECT_DEPENDENCY"), + (2, 3, "DIRECT_DEPENDENCY"), + ] + + response = await test_client.get( + "v1/jobs/hierarchy", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={ + "start_node_id": start_node.id, + "direction": "BOTH", + "depth": 2, + "infer_from_lineage": True, + "since": max_output_created_at.isoformat(), + "until": min_input_created_at.isoformat(), + }, + ) + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "relations": { + "parents": jobs_ancestors_to_json(expected_nodes), + "dependencies": [ + { + "from": {"kind": "JOB", "id": str(tasks[from_idx].id)}, + "to": {"kind": "JOB", "id": str(tasks[to_idx].id)}, + "type": dep_type, + } + for from_idx, to_idx, dep_type in expected_deps + ], + }, + "nodes": {"jobs": jobs_to_json(expected_nodes)}, + } + + +async def test_get_job_hierarchy_with_indirect_dependencies_without_since( + test_client: AsyncClient, + job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + mocked_user: MockedUser, +): + _, tasks, _ = job_dependency_chain_with_indirect_dependencies + start_node = tasks[2] + + response = await test_client.get( + "v1/jobs/hierarchy", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={ + "start_node_id": start_node.id, + "direction": "BOTH", + "depth": 2, + "infer_from_lineage": True, + }, + ) + + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY, response.json() + assert response.json() == { + "error": { + "code": "invalid_request", + "details": [ + { + "code": "value_error", + "context": {}, + "input": { + "depth": 2, + "direction": "BOTH", + "infer_from_lineage": True, + "since": None, + "start_node_id": start_node.id, + "until": None, + }, + "location": [], + "message": "Value error, Inferring from lineage graph only possible with 'since' param", + }, + ], + "message": "Invalid request", + }, + } + + +async def test_get_job_hierarchy_with_indirect_dependencies_since_less_then_until( + test_client: AsyncClient, + async_session: AsyncSession, + job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + mocked_user: MockedUser, +): + _, tasks, _ = job_dependency_chain_with_indirect_dependencies + start_node = tasks[2] + + edge_task_ids = [tasks[0].id, tasks[4].id] + min_input_created_at = await async_session.scalar( + select(func.min(Input.created_at)).where(Input.job_id.in_(edge_task_ids)), + ) - timedelta(seconds=2) + max_output_created_at = await async_session.scalar( + select(func.max(Output.created_at)).where(Output.job_id.in_(edge_task_ids)), + ) + timedelta(seconds=2) + + response = await test_client.get( + "v1/jobs/hierarchy", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={ + "start_node_id": start_node.id, + "direction": "BOTH", + "depth": 2, + "infer_from_lineage": True, + "since": min_input_created_at.isoformat(), + "until": max_output_created_at.isoformat(), + }, + ) + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY, response.json() + assert response.json() == { + "error": { + "code": "invalid_request", + "details": [ + { + "code": "value_error", + "context": {}, + "input": format_datetime(max_output_created_at), + "location": [ + "until", + ], + "message": "Value error, 'since' should be less than 'until'", + }, + ], + "message": "Invalid request", + }, + } From c542c3d87121e632b3f0bc160d88a5079f0f48d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Thu, 26 Mar 2026 16:47:46 +0300 Subject: [PATCH 4/5] [DOP-34706] rename type;add changelog --- data_rentgen/db/repositories/job_dependency.py | 2 +- docs/changelog/next_release/416.improvement.rst | 2 ++ docs/entities/index.rst | 2 +- tests/test_server/test_jobs/test_job_hierarchy.py | 12 ++++++------ 4 files changed, 10 insertions(+), 8 deletions(-) create mode 100644 docs/changelog/next_release/416.improvement.rst diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index c3a4ad60..63912bbf 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -140,7 +140,7 @@ def _get_core_hierarchy_query( select( Output.job_id.label("from_job_id"), Input.job_id.label("to_job_id"), - literal("INDIRECT_DEPENDENCY").label("type"), + literal("INFERRED_FROM_LINEAGE").label("type"), ) .join(Input, Output.operation_id == Input.operation_id) .where( diff --git a/docs/changelog/next_release/416.improvement.rst b/docs/changelog/next_release/416.improvement.rst new file mode 100644 index 00000000..a5726d8f --- /dev/null +++ b/docs/changelog/next_release/416.improvement.rst @@ -0,0 +1,2 @@ +Added indirect job hierarchy dependencies inferred from lineage IO relations, with optional ``since``/``until`` bounds and ``infer_from_lineage`` request flag for ``/v1/jobs/hierarchy``. + diff --git a/docs/entities/index.rst b/docs/entities/index.rst index 65198ec6..970ee047 100644 --- a/docs/entities/index.rst +++ b/docs/entities/index.rst @@ -331,7 +331,7 @@ It contains following fields: - ``from: Job | Run`` - entity which should be waited before current job/run will be started. - ``to: Job | Run`` - entity which waits. -- ``type: str`` - type of dependency, any arbitrary string provided by integration, usually something like ``DIRECT_DEPENDENCY``, ``INDIRECT_DEPENDENCY``. +- ``type: str`` - type of dependency, any arbitrary string provided by integration, usually something like ``DIRECT_DEPENDENCY``, ``INFERRED_FROM_LINEAGE``. .. image:: dependency_relation.png diff --git a/tests/test_server/test_jobs/test_job_hierarchy.py b/tests/test_server/test_jobs/test_job_hierarchy.py index 3a012b2d..ed5ddae5 100644 --- a/tests/test_server/test_jobs/test_job_hierarchy.py +++ b/tests/test_server/test_jobs/test_job_hierarchy.py @@ -310,24 +310,24 @@ async def test_get_job_hierarchy_with_depth_on_boundary( @pytest.mark.parametrize( ["direction", "depth", "start_node_idx", "expected_deps"], [ - ("UPSTREAM", 1, 1, [(0, 1, "INDIRECT_DEPENDENCY")]), + ("UPSTREAM", 1, 1, [(0, 1, "INFERRED_FROM_LINEAGE")]), ( "UPSTREAM", 2, 2, [ (1, 2, "DIRECT_DEPENDENCY"), - (0, 1, "INDIRECT_DEPENDENCY"), + (0, 1, "INFERRED_FROM_LINEAGE"), ], ), - ("DOWNSTREAM", 1, 3, [(3, 4, "INDIRECT_DEPENDENCY")]), + ("DOWNSTREAM", 1, 3, [(3, 4, "INFERRED_FROM_LINEAGE")]), ( "DOWNSTREAM", 2, 2, [ (2, 3, "DIRECT_DEPENDENCY"), - (3, 4, "INDIRECT_DEPENDENCY"), + (3, 4, "INFERRED_FROM_LINEAGE"), ], ), ( @@ -337,8 +337,8 @@ async def test_get_job_hierarchy_with_depth_on_boundary( [ (1, 2, "DIRECT_DEPENDENCY"), (2, 3, "DIRECT_DEPENDENCY"), - (3, 4, "INDIRECT_DEPENDENCY"), - (0, 1, "INDIRECT_DEPENDENCY"), + (3, 4, "INFERRED_FROM_LINEAGE"), + (0, 1, "INFERRED_FROM_LINEAGE"), ], ), ], From 2a0098b299a4fd0af0bf2d725931df3a1aa19293 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Fri, 27 Mar 2026 11:16:36 +0300 Subject: [PATCH 5/5] [DOP-34706] refactored tests params --- data_rentgen/server/schemas/v1/job.py | 14 +-- tests/test_server/fixtures/factories/job.py | 6 +- .../test_jobs/test_job_hierarchy.py | 101 ++++++++---------- 3 files changed, 58 insertions(+), 63 deletions(-) diff --git a/data_rentgen/server/schemas/v1/job.py b/data_rentgen/server/schemas/v1/job.py index 44bf6242..71e4e2f4 100644 --- a/data_rentgen/server/schemas/v1/job.py +++ b/data_rentgen/server/schemas/v1/job.py @@ -5,7 +5,7 @@ from datetime import datetime from typing import Literal -from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator, model_validator +from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator from data_rentgen.server.schemas.v1.job_response import JobResponseV1 from data_rentgen.server.schemas.v1.pagination import PaginateQueryV1 @@ -146,9 +146,11 @@ def _check_until(cls, value: datetime | None, info: ValidationInfo) -> datetime raise ValueError(msg) return value - @model_validator(mode="after") - def _check_indirect_flag(self): - if self.infer_from_lineage and not self.since: - msg = "Inferring from lineage graph only possible with 'since' param" + @field_validator("since", mode="after") + @classmethod + def _check_since(cls, value: datetime | None, info: ValidationInfo) -> datetime | None: + infer_from_lineage = info.data.get("infer_from_lineage") + if infer_from_lineage and not value: + msg = "'since' is mandatory when 'infer_from_lineage' is used" raise ValueError(msg) - return self + return value diff --git a/tests/test_server/fixtures/factories/job.py b/tests/test_server/fixtures/factories/job.py index a3d77b45..f76e755c 100644 --- a/tests/test_server/fixtures/factories/job.py +++ b/tests/test_server/fixtures/factories/job.py @@ -471,7 +471,7 @@ async def job_dependency_chain( @pytest_asyncio.fixture -async def job_dependency_chain_with_indirect_dependencies( +async def job_dependency_chain_with_lineage( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], job_dependency_chain: tuple[tuple[Job, Job, Job], ...], ) -> AsyncGenerator[tuple[tuple[Job, Job, Job, Job, Job], ...], None]: @@ -481,8 +481,8 @@ async def job_dependency_chain_with_indirect_dependencies( - right: right_dag -> right_task -> right_spark The chains are connected to the central fixture on task level via IO relations: - - left_task -> task1 (indirect via input/output relation) - - task3 -> right_task (indirect via input/output relation) + - left_task -> task1 (inferred via input/output relation) + - task3 -> right_task (inferred via input/output relation) """ (dag1, dag2, dag3), (task1, task2, task3), (spark1, spark2, spark3) = job_dependency_chain diff --git a/tests/test_server/test_jobs/test_job_hierarchy.py b/tests/test_server/test_jobs/test_job_hierarchy.py index ed5ddae5..23e22bcd 100644 --- a/tests/test_server/test_jobs/test_job_hierarchy.py +++ b/tests/test_server/test_jobs/test_job_hierarchy.py @@ -208,22 +208,25 @@ async def test_get_job_hierarchy_with_direction_downstream( @pytest.mark.parametrize( ["depth", "direction", "expected_dep_indices", "expected_job_indices"], [ - (1, "DOWNSTREAM", [(2, 3)], [2, 3]), - (2, "DOWNSTREAM", [(2, 3), (3, 4)], [2, 3, 4]), - (1, "UPSTREAM", [(1, 2)], [1, 2]), - (2, "UPSTREAM", [(0, 1), (1, 2)], [0, 1, 2]), - (1, "BOTH", [(1, 2), (2, 3)], [1, 2, 3]), - (2, "BOTH", [(0, 1), (1, 2), (2, 3), (3, 4)], [0, 1, 2, 3, 4]), - (5, "BOTH", [(0, 1), (1, 2), (2, 3), (3, 4)], [0, 1, 2, 3, 4]), - ], - ids=[ - "depth_1-downstream", - "depth_2-downstream", - "depth_1-upstream", - "depth_2-upstream", - "depth_1-both", - "depth_2-both", - "depth_5-both", + pytest.param(1, "DOWNSTREAM", [(2, 3)], [2, 3], id="depth_1-downstream"), + pytest.param(2, "DOWNSTREAM", [(2, 3), (3, 4)], [2, 3, 4], id="depth_2-downstream"), + pytest.param(1, "UPSTREAM", [(1, 2)], [1, 2], id="depth_1-upstream"), + pytest.param(2, "UPSTREAM", [(0, 1), (1, 2)], [0, 1, 2], id="depth_2-upstream"), + pytest.param(1, "BOTH", [(1, 2), (2, 3)], [1, 2, 3], id="depth_1-both"), + pytest.param( + 2, + "BOTH", + [(0, 1), (1, 2), (2, 3), (3, 4)], + [0, 1, 2, 3, 4], + id="depth_2-both", + ), + pytest.param( + 5, + "BOTH", + [(0, 1), (1, 2), (2, 3), (3, 4)], + [0, 1, 2, 3, 4], + id="depth_5-both", + ), ], ) async def test_get_job_hierarchy_with_depth( @@ -270,10 +273,9 @@ async def test_get_job_hierarchy_with_depth( @pytest.mark.parametrize( ["direction", "start_node_index"], [ - ("UPSTREAM", 0), - ("DOWNSTREAM", 4), + pytest.param("UPSTREAM", 0, id="upstream_boundary"), + pytest.param("DOWNSTREAM", 4, id="downstream_boundary"), ], - ids=["upstream_boundary", "downstream_boundary"], ) async def test_get_job_hierarchy_with_depth_on_boundary( test_client: AsyncClient, @@ -310,8 +312,8 @@ async def test_get_job_hierarchy_with_depth_on_boundary( @pytest.mark.parametrize( ["direction", "depth", "start_node_idx", "expected_deps"], [ - ("UPSTREAM", 1, 1, [(0, 1, "INFERRED_FROM_LINEAGE")]), - ( + pytest.param("UPSTREAM", 1, 1, [(0, 1, "INFERRED_FROM_LINEAGE")], id="inferred-upstream-depth-1"), + pytest.param( "UPSTREAM", 2, 2, @@ -319,9 +321,10 @@ async def test_get_job_hierarchy_with_depth_on_boundary( (1, 2, "DIRECT_DEPENDENCY"), (0, 1, "INFERRED_FROM_LINEAGE"), ], + id="inferred-upstream-depth-2", ), - ("DOWNSTREAM", 1, 3, [(3, 4, "INFERRED_FROM_LINEAGE")]), - ( + pytest.param("DOWNSTREAM", 1, 3, [(3, 4, "INFERRED_FROM_LINEAGE")], id="inferred-downstream-depth-1"), + pytest.param( "DOWNSTREAM", 2, 2, @@ -329,8 +332,9 @@ async def test_get_job_hierarchy_with_depth_on_boundary( (2, 3, "DIRECT_DEPENDENCY"), (3, 4, "INFERRED_FROM_LINEAGE"), ], + id="inferred-downstream-depth-2", ), - ( + pytest.param( "BOTH", 2, 2, @@ -340,27 +344,21 @@ async def test_get_job_hierarchy_with_depth_on_boundary( (3, 4, "INFERRED_FROM_LINEAGE"), (0, 1, "INFERRED_FROM_LINEAGE"), ], + id="inferred-both-depth-2", ), ], - ids=[ - "indirect-upstream-depth-1", - "indirect-upstream-depth-2", - "indirect-downstream-depth-1", - "indirect-downstream-depth-2", - "indirect-both-depth-2", - ], ) -async def test_get_job_hierarchy_with_indirect_dependencies( +async def test_get_job_hierarchy_with_inferred_dependencies( test_client: AsyncClient, async_session: AsyncSession, - job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + job_dependency_chain_with_lineage: tuple[tuple[Job, Job, Job, Job, Job], ...], mocked_user: MockedUser, direction: str, depth: int, start_node_idx: int, expected_deps: list[tuple[int, int, str]], ): - dags, tasks, sparks = job_dependency_chain_with_indirect_dependencies + dags, tasks, sparks = job_dependency_chain_with_lineage start_node = tasks[start_node_idx] expected_ids = set() @@ -401,16 +399,16 @@ async def test_get_job_hierarchy_with_indirect_dependencies( } -async def test_get_job_hierarchy_with_indirect_dependencies_with_since_and_until( +async def test_get_job_hierarchy_with_inferred_dependencies_with_since_and_until( test_client: AsyncClient, async_session: AsyncSession, - job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + job_dependency_chain_with_lineage: tuple[tuple[Job, Job, Job, Job, Job], ...], mocked_user: MockedUser, ): - dags, tasks, sparks = job_dependency_chain_with_indirect_dependencies + dags, tasks, sparks = job_dependency_chain_with_lineage start_node = tasks[2] - # Cover both indirect links connected to task0 and task4. + # Cover both inferred links connected to task0 and task4. edge_task_ids = [tasks[0].id, tasks[4].id] min_input_created_at = await async_session.scalar( select(func.min(Input.created_at)).where(Input.job_id.in_(edge_task_ids)), @@ -454,12 +452,12 @@ async def test_get_job_hierarchy_with_indirect_dependencies_with_since_and_until } -async def test_get_job_hierarchy_with_indirect_dependencies_without_since( +async def test_get_job_hierarchy_with_inferred_dependencies_without_since( test_client: AsyncClient, - job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + job_dependency_chain_with_lineage: tuple[tuple[Job, Job, Job, Job, Job], ...], mocked_user: MockedUser, ): - _, tasks, _ = job_dependency_chain_with_indirect_dependencies + _, tasks, _ = job_dependency_chain_with_lineage start_node = tasks[2] response = await test_client.get( @@ -481,16 +479,11 @@ async def test_get_job_hierarchy_with_indirect_dependencies_without_since( { "code": "value_error", "context": {}, - "input": { - "depth": 2, - "direction": "BOTH", - "infer_from_lineage": True, - "since": None, - "start_node_id": start_node.id, - "until": None, - }, - "location": [], - "message": "Value error, Inferring from lineage graph only possible with 'since' param", + "input": None, + "location": [ + "since", + ], + "message": "Value error, 'since' is mandatory when 'infer_from_lineage' is used", }, ], "message": "Invalid request", @@ -498,13 +491,13 @@ async def test_get_job_hierarchy_with_indirect_dependencies_without_since( } -async def test_get_job_hierarchy_with_indirect_dependencies_since_less_then_until( +async def test_get_job_hierarchy_with_inferred_dependencies_since_less_then_until( test_client: AsyncClient, async_session: AsyncSession, - job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + job_dependency_chain_with_lineage: tuple[tuple[Job, Job, Job, Job, Job], ...], mocked_user: MockedUser, ): - _, tasks, _ = job_dependency_chain_with_indirect_dependencies + _, tasks, _ = job_dependency_chain_with_lineage start_node = tasks[2] edge_task_ids = [tasks[0].id, tasks[4].id]