diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index 7606d9c1..63912bbf 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -1,11 +1,28 @@ # SPDX-FileCopyrightText: 2024-present MTS PJSC # SPDX-License-Identifier: Apache-2.0 +from datetime import datetime from typing import Literal -from sqlalchemy import ARRAY, Integer, any_, bindparam, cast, func, literal, select, tuple_ -from sqlalchemy.orm import aliased +from sqlalchemy import ( + ARRAY, + CTE, + CompoundSelect, + DateTime, + Integer, + Select, + any_, + bindparam, + cast, + func, + literal, + or_, + select, + tuple_, +) +from data_rentgen.db.models.input import Input from data_rentgen.db.models.job_dependency import JobDependency +from data_rentgen.db.models.output import Output from data_rentgen.db.repositories.base import Repository from data_rentgen.dto import JobDependencyDTO @@ -27,59 +44,6 @@ JobDependency.to_job_id == bindparam("to_job_id"), ) -upstream_jobs_query_base_part = ( - select( - JobDependency, - literal(1).label("depth"), - ) - .select_from(JobDependency) - .where(JobDependency.to_job_id == any_(bindparam("job_ids"))) -) -upstream_jobs_query_cte = upstream_jobs_query_base_part.cte(name="upstream_jobs_query", recursive=True) - -upstream_jobs_query_recursive_part = ( - select( - JobDependency, - (upstream_jobs_query_cte.c.depth + 1).label("depth"), - ) - .select_from(JobDependency) - .where( - upstream_jobs_query_cte.c.depth < bindparam("depth"), - JobDependency.to_job_id == upstream_jobs_query_cte.c.from_job_id, - ) -) - - -upstream_jobs_query_cte = upstream_jobs_query_cte.union(upstream_jobs_query_recursive_part) -upstream_entities_query = select(aliased(JobDependency, upstream_jobs_query_cte)) - -downstream_jobs_query_base_part = ( - select( - JobDependency, - literal(1).label("depth"), - ) - .select_from(JobDependency) - .where(JobDependency.from_job_id == 
any_(bindparam("job_ids"))) -) -downstream_jobs_query_cte = downstream_jobs_query_base_part.cte(name="downstream_jobs_query", recursive=True) - -downstream_jobs_query_recursive_part = ( - select( - JobDependency, - (downstream_jobs_query_cte.c.depth + 1).label("depth"), - ) - .select_from(JobDependency) - .where( - downstream_jobs_query_cte.c.depth < bindparam("depth"), - JobDependency.from_job_id == downstream_jobs_query_cte.c.to_job_id, - ) -) - -downstream_jobs_query_cte = downstream_jobs_query_cte.union(downstream_jobs_query_recursive_part) -downstream_entities_query = select(aliased(JobDependency, downstream_jobs_query_cte)) - -both_entities_query = select(aliased(JobDependency, (upstream_entities_query.union(downstream_entities_query)).cte())) - class JobDependencyRepository(Repository[JobDependency]): async def fetch_bulk( @@ -115,18 +79,31 @@ async def get_dependencies( job_ids: list[int], direction: Literal["UPSTREAM", "DOWNSTREAM", "BOTH"], depth: int, + since: datetime | None = None, + until: datetime | None = None, + *, + infer_from_lineage: bool = False, ) -> list[JobDependency]: + core_query = self._get_core_hierarchy_query(include_indirect=infer_from_lineage) + query: Select | CompoundSelect match direction: case "UPSTREAM": - query = upstream_entities_query + query = self._get_upstream_hierarchy_query(core_query) case "DOWNSTREAM": - query = downstream_entities_query + query = self._get_downstream_hierarchy_query(core_query) case "BOTH": - query = both_entities_query + query = self._get_upstream_hierarchy_query(core_query).union( + self._get_downstream_hierarchy_query(core_query) + ) - result = await self._session.scalars(query, {"job_ids": job_ids, "depth": depth}) - return list(result.all()) + result = await self._session.execute( + query, {"job_ids": job_ids, "depth": depth, "since": since, "until": until} + ) + return [ + JobDependency(from_job_id=item.from_job_id, to_job_id=item.to_job_id, type=item.type) + for item in result.all() + ] async 
def _get(self, job_dependency: JobDependencyDTO) -> JobDependency | None: return await self._session.scalar( @@ -146,3 +123,98 @@ async def _create(self, job_dependency: JobDependencyDTO) -> JobDependency: self._session.add(result) await self._session.flush([result]) return result + + def _get_core_hierarchy_query( + self, + *, + include_indirect: bool = False, + ) -> CTE: + query: Select | CompoundSelect + query = select( + JobDependency.from_job_id, + JobDependency.to_job_id, + JobDependency.type, + ) + if include_indirect: + query = query.union( + select( + Output.job_id.label("from_job_id"), + Input.job_id.label("to_job_id"), + literal("INFERRED_FROM_LINEAGE").label("type"), + ) + .join(Input, Output.operation_id == Input.operation_id) + .where( + or_( + bindparam("since", type_=DateTime(timezone=True)).is_(None), + Input.created_at >= bindparam("since"), + ), + or_( + bindparam("until", type_=DateTime(timezone=True)).is_(None), + Output.created_at <= bindparam("until"), + ), + Output.created_at >= Input.created_at, + ) + ) + return query.cte("jobs_hierarchy_core_query") + + def _get_upstream_hierarchy_query( + self, + core_query: CTE, + ) -> Select: + base_part = select( + core_query.c.from_job_id.label("from_job_id"), + core_query.c.to_job_id.label("to_job_id"), + core_query.c.type.label("type"), + literal(1).label("depth"), + ).where(core_query.c.to_job_id == any_(bindparam("job_ids"))) + + base_query_cte = base_part.cte(name="upstream_jobs_query", recursive=True) + + recursive_part = ( + select( + core_query.c.from_job_id.label("from_job_id"), + core_query.c.to_job_id.label("to_job_id"), + core_query.c.type.label("type"), + (base_query_cte.c.depth + 1).label("depth"), + ) + .join( + core_query, + core_query.c.to_job_id == base_query_cte.c.from_job_id, + ) + .where( + base_query_cte.c.depth < bindparam("depth"), + ) + ) + + return select(base_query_cte.union(recursive_part)) + + def _get_downstream_hierarchy_query( + self, + core_query: CTE, + ) -> Select: + 
base_part = select( + core_query.c.from_job_id.label("from_job_id"), + core_query.c.to_job_id.label("to_job_id"), + core_query.c.type.label("type"), + literal(1).label("depth"), + ).where(core_query.c.from_job_id == any_(bindparam("job_ids"))) + + base_part_cte = base_part.cte(name="downstream_jobs_query", recursive=True) + + recursive_part = ( + select( + core_query.c.from_job_id.label("from_job_id"), + core_query.c.to_job_id.label("to_job_id"), + core_query.c.type.label("type"), + (base_part_cte.c.depth + 1).label("depth"), + ) + .join( + core_query, + core_query.c.from_job_id == base_part_cte.c.to_job_id, + ) + .where( + base_part_cte.c.depth < bindparam("depth"), + ) + ) + + return select(base_part_cte.union(recursive_part)) diff --git a/data_rentgen/server/api/v1/router/job.py b/data_rentgen/server/api/v1/router/job.py index a345149b..01fe9220 100644 --- a/data_rentgen/server/api/v1/router/job.py +++ b/data_rentgen/server/api/v1/router/job.py @@ -95,6 +95,9 @@ async def get_job_hierarchy( start_node_id=query_args.start_node_id, direction=query_args.direction, depth=query_args.depth, + infer_from_lineage=query_args.infer_from_lineage, + since=query_args.since, + until=query_args.until, ) return JobHierarchyResponseV1( relations=JobHierarchyRelationsV1( diff --git a/data_rentgen/server/schemas/v1/job.py b/data_rentgen/server/schemas/v1/job.py index 9d6de374..44bf6242 100644 --- a/data_rentgen/server/schemas/v1/job.py +++ b/data_rentgen/server/schemas/v1/job.py @@ -2,9 +2,10 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations +from datetime import datetime from typing import Literal -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validator, model_validator from data_rentgen.server.schemas.v1.job_response import JobResponseV1 from data_rentgen.server.schemas.v1.pagination import PaginateQueryV1 @@ -118,4 +119,36 @@ class JobHierarchyQueryV1(BaseModel): 
examples=["DOWNSTREAM", "UPSTREAM", "BOTH"], ) depth: int = Field(description="Levels of dependencies to dive into", default=1) - model_config = ConfigDict(extra="ignore") + infer_from_lineage: bool = Field( + default=False, + description="Whether to include indirect job dependencies inferred from lineage input/output relations", + examples=[True, False], + ) + since: datetime | None = Field( + default=None, + description="Lower time bound for lineage relations used to infer indirect dependencies", + examples=["2008-09-15T15:53:00+05:00"], + ) + until: datetime | None = Field( + default=None, + description="Upper time bound for lineage relations used to infer indirect dependencies", + examples=["2008-09-15T15:53:00+05:00"], + ) + + model_config = ConfigDict(extra="forbid") + + @field_validator("until", mode="after") + @classmethod + def _check_until(cls, value: datetime | None, info: ValidationInfo) -> datetime | None: + since = info.data.get("since") + if since and value and since.timestamp() >= value.timestamp(): + msg = "'since' should be less than 'until'" + raise ValueError(msg) + return value + + @model_validator(mode="after") + def _check_indirect_flag(self): + if self.infer_from_lineage and not self.since: + msg = "Inferring from lineage graph only possible with 'since' param" + raise ValueError(msg) + return self diff --git a/data_rentgen/server/services/job.py b/data_rentgen/server/services/job.py index 7ca931a5..380702c8 100644 --- a/data_rentgen/server/services/job.py +++ b/data_rentgen/server/services/job.py @@ -3,6 +3,7 @@ import logging from collections.abc import Collection, Sequence from dataclasses import dataclass, field +from datetime import datetime from itertools import groupby from typing import Annotated, Literal @@ -112,6 +113,10 @@ async def get_jobs_hierarchy( start_node_id: int, direction: Literal["UPSTREAM", "DOWNSTREAM", "BOTH"], depth: int, + since: datetime | None = None, + until: datetime | None = None, + *, + infer_from_lineage: bool = False, ) -> JobHierarchyResult: logger.info( "Get jobs hierarchy with start at job with id %s, direction %s, depth %s", start_node_id, direction, depth, ) @@ -133,6 +138,9 @@ 
job_ids=list(job_ids), direction=direction, depth=depth, + infer_from_lineage=infer_from_lineage, + since=since, + until=until, ) dependency_job_ids = {d.from_job_id for d in dependencies} | {d.to_job_id for d in dependencies} job_ids |= dependency_job_ids @@ -142,6 +150,9 @@ async def get_jobs_hierarchy( jobs = await self._uow.job.list_by_ids(list(job_ids)) return JobHierarchyResult( parents={(p.parent_job_id, p.child_job_id) for p in ancestor_relations + descendant_relations}, - dependencies={(d.from_job_id, d.to_job_id, d.type) for d in dependencies}, + dependencies={ + (d.from_job_id, d.to_job_id, d.type) + for d in sorted(dependencies, key=lambda x: (x.from_job_id, x.to_job_id, x.type)) + }, jobs=[JobServiceResult.from_orm(job) for job in jobs], ) diff --git a/docs/changelog/next_release/416.improvement.rst b/docs/changelog/next_release/416.improvement.rst new file mode 100644 index 00000000..a5726d8f --- /dev/null +++ b/docs/changelog/next_release/416.improvement.rst @@ -0,0 +1,2 @@ +Added indirect job hierarchy dependencies inferred from lineage IO relations, with optional ``since``/``until`` bounds and ``infer_from_lineage`` request flag for ``/v1/jobs/hierarchy``. + diff --git a/docs/entities/index.rst b/docs/entities/index.rst index 65198ec6..970ee047 100644 --- a/docs/entities/index.rst +++ b/docs/entities/index.rst @@ -331,7 +331,7 @@ It contains following fields: - ``from: Job | Run`` - entity which should be waited before current job/run will be started. - ``to: Job | Run`` - entity which waits. -- ``type: str`` - type of dependency, any arbitrary string provided by integration, usually something like ``DIRECT_DEPENDENCY``, ``INDIRECT_DEPENDENCY``. +- ``type: str`` - type of dependency, any arbitrary string provided by integration, usually something like ``DIRECT_DEPENDENCY``, ``INFERRED_FROM_LINEAGE``. .. 
image:: dependency_relation.png diff --git a/tests/test_server/fixtures/factories/job.py b/tests/test_server/fixtures/factories/job.py index 11adeea2..a3d77b45 100644 --- a/tests/test_server/fixtures/factories/job.py +++ b/tests/test_server/fixtures/factories/job.py @@ -1,14 +1,19 @@ from __future__ import annotations +from datetime import UTC, datetime, timedelta from random import randint from typing import TYPE_CHECKING import pytest_asyncio from data_rentgen.db.models import Job, JobDependency, TagValue +from data_rentgen.utils.uuid import generate_new_uuid from tests.test_server.fixtures.factories.base import random_string +from tests.test_server.fixtures.factories.dataset import create_dataset +from tests.test_server.fixtures.factories.input import create_input from tests.test_server.fixtures.factories.job_type import create_job_type from tests.test_server.fixtures.factories.location import create_location +from tests.test_server.fixtures.factories.output import create_output from tests.test_server.utils.delete import clean_db if TYPE_CHECKING: @@ -463,3 +468,132 @@ async def job_dependency_chain( async with async_session_maker() as async_session: await clean_db(async_session) + + +@pytest_asyncio.fixture +async def job_dependency_chain_with_indirect_dependencies( + async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], + job_dependency_chain: tuple[tuple[Job, Job, Job], ...], +) -> AsyncGenerator[tuple[tuple[Job, Job, Job, Job, Job], ...], None]: + """ + Extends `job_dependency_chain` with two extra parent-child chains: + - left: left_dag -> left_task -> left_spark + - right: right_dag -> right_task -> right_spark + + The chains are connected to the central fixture on task level via IO relations: + - left_task -> task1 (indirect via input/output relation) + - task3 -> right_task (indirect via input/output relation) + """ + (dag1, dag2, dag3), (task1, task2, task3), (spark1, spark2, spark3) = job_dependency_chain + + async with 
async_session_maker() as async_session: + location = await create_location(async_session) + job_type_dag = await create_job_type(async_session, {"type": "AIRFLOW_DAG"}) + job_type_task = await create_job_type(async_session, {"type": "AIRFLOW_TASK"}) + job_type_spark = await create_job_type(async_session, {"type": "SPARK_APPLICATION"}) + + left_dag = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_dag.id, + job_kwargs={"name": "left_dag"}, + ) + left_task = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_task.id, + job_kwargs={"name": "left_task", "parent_job_id": left_dag.id}, + ) + left_spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "left_spark", "parent_job_id": left_task.id}, + ) + + right_dag = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_dag.id, + job_kwargs={"name": "right_dag"}, + ) + right_task = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_task.id, + job_kwargs={"name": "right_task", "parent_job_id": right_dag.id}, + ) + right_spark = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type_spark.id, + job_kwargs={"name": "right_spark", "parent_job_id": right_task.id}, + ) + + left_dataset_location = await create_location(async_session) + left_dataset = await create_dataset(async_session, location_id=left_dataset_location.id) + right_dataset_location = await create_location(async_session) + right_dataset = await create_dataset(async_session, location_id=right_dataset_location.id) + + # Connect left chain to central chain: left_task -> task1 + left_created_at = datetime.now(tz=UTC) + left_operation_id = generate_new_uuid(left_created_at) + left_output = await create_output( + async_session, + output_kwargs={ + "created_at": left_created_at, + "operation_id": left_operation_id, + "run_id": 
generate_new_uuid(left_created_at), + "job_id": left_task.id, + "dataset_id": left_dataset.id, + "schema_id": None, + }, + ) + await create_input( + async_session, + input_kwargs={ + "created_at": left_created_at - timedelta(seconds=1), + "operation_id": left_operation_id, + "run_id": left_output.run_id, + "job_id": task1.id, + "dataset_id": left_dataset.id, + "schema_id": None, + }, + ) + + # Connect central chain to right chain: task3 -> right_task + right_created_at = datetime.now(tz=UTC) + timedelta(seconds=10) + right_operation_id = generate_new_uuid(right_created_at) + right_output = await create_output( + async_session, + output_kwargs={ + "created_at": right_created_at, + "operation_id": right_operation_id, + "run_id": generate_new_uuid(right_created_at), + "job_id": task3.id, + "dataset_id": right_dataset.id, + "schema_id": None, + }, + ) + await create_input( + async_session, + input_kwargs={ + "created_at": right_created_at - timedelta(seconds=1), + "operation_id": right_operation_id, + "run_id": right_output.run_id, + "job_id": right_task.id, + "dataset_id": right_dataset.id, + "schema_id": None, + }, + ) + + async_session.expunge_all() + + yield ( + (left_dag, dag1, dag2, dag3, right_dag), + (left_task, task1, task2, task3, right_task), + (left_spark, spark1, spark2, spark3, right_spark), + ) + + async with async_session_maker() as async_session: + await clean_db(async_session) diff --git a/tests/test_server/test_jobs/test_job_hierarchy.py b/tests/test_server/test_jobs/test_job_hierarchy.py index 4fd2add6..ed5ddae5 100644 --- a/tests/test_server/test_jobs/test_job_hierarchy.py +++ b/tests/test_server/test_jobs/test_job_hierarchy.py @@ -1,15 +1,16 @@ # SPDX-FileCopyrightText: 2024-present MTS PJSC # SPDX-License-Identifier: Apache-2.0 - +from datetime import UTC, datetime, timedelta from http import HTTPStatus import pytest from httpx import AsyncClient +from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession -from 
data_rentgen.db.models import Job +from data_rentgen.db.models import Input, Job, Output from tests.fixtures.mocks import MockedUser -from tests.test_server.utils.convert_to_json import jobs_ancestors_to_json, jobs_to_json +from tests.test_server.utils.convert_to_json import format_datetime, jobs_ancestors_to_json, jobs_to_json from tests.test_server.utils.enrich import enrich_jobs pytestmark = [pytest.mark.server, pytest.mark.asyncio] @@ -304,3 +305,243 @@ async def test_get_job_hierarchy_with_depth_on_boundary( }, "nodes": {"jobs": jobs_to_json([expected_job])}, } + + +@pytest.mark.parametrize( + ["direction", "depth", "start_node_idx", "expected_deps"], + [ + ("UPSTREAM", 1, 1, [(0, 1, "INFERRED_FROM_LINEAGE")]), + ( + "UPSTREAM", + 2, + 2, + [ + (1, 2, "DIRECT_DEPENDENCY"), + (0, 1, "INFERRED_FROM_LINEAGE"), + ], + ), + ("DOWNSTREAM", 1, 3, [(3, 4, "INFERRED_FROM_LINEAGE")]), + ( + "DOWNSTREAM", + 2, + 2, + [ + (2, 3, "DIRECT_DEPENDENCY"), + (3, 4, "INFERRED_FROM_LINEAGE"), + ], + ), + ( + "BOTH", + 2, + 2, + [ + (1, 2, "DIRECT_DEPENDENCY"), + (2, 3, "DIRECT_DEPENDENCY"), + (3, 4, "INFERRED_FROM_LINEAGE"), + (0, 1, "INFERRED_FROM_LINEAGE"), + ], + ), + ], + ids=[ + "indirect-upstream-depth-1", + "indirect-upstream-depth-2", + "indirect-downstream-depth-1", + "indirect-downstream-depth-2", + "indirect-both-depth-2", + ], +) +async def test_get_job_hierarchy_with_indirect_dependencies( + test_client: AsyncClient, + async_session: AsyncSession, + job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + mocked_user: MockedUser, + direction: str, + depth: int, + start_node_idx: int, + expected_deps: list[tuple[int, int, str]], +): + dags, tasks, sparks = job_dependency_chain_with_indirect_dependencies + start_node = tasks[start_node_idx] + + expected_ids = set() + for from_idx, to_idx, _ in expected_deps: + expected_ids.add(from_idx) + expected_ids.add(to_idx) + expected_dags = [dags[idx] for idx in expected_ids] + 
expected_tasks = [tasks[idx] for idx in expected_ids] + expected_sparks = [sparks[start_node_idx]] + expected_nodes = await enrich_jobs(expected_dags + expected_tasks + expected_sparks, async_session) + + response = await test_client.get( + "v1/jobs/hierarchy", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={ + "start_node_id": start_node.id, + "direction": direction, + "depth": depth, + "infer_from_lineage": True, + "since": datetime.min.replace(tzinfo=UTC).isoformat(), + }, + ) + + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "relations": { + "parents": jobs_ancestors_to_json(expected_nodes), + "dependencies": [ + { + "from": {"kind": "JOB", "id": str(tasks[from_idx].id)}, + "to": {"kind": "JOB", "id": str(tasks[to_idx].id)}, + "type": dep_type, + } + for from_idx, to_idx, dep_type in expected_deps + ], + }, + "nodes": {"jobs": jobs_to_json(expected_nodes)}, + } + + +async def test_get_job_hierarchy_with_indirect_dependencies_with_since_and_until( + test_client: AsyncClient, + async_session: AsyncSession, + job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + mocked_user: MockedUser, +): + dags, tasks, sparks = job_dependency_chain_with_indirect_dependencies + start_node = tasks[2] + + # Cover both indirect links connected to task0 and task4. 
+ edge_task_ids = [tasks[0].id, tasks[4].id] + min_input_created_at = await async_session.scalar( + select(func.min(Input.created_at)).where(Input.job_id.in_(edge_task_ids)), + ) - timedelta(seconds=2) + max_output_created_at = await async_session.scalar( + select(func.max(Output.created_at)).where(Output.job_id.in_(edge_task_ids)), + ) + timedelta(seconds=2) + + expected_nodes = await enrich_jobs([*dags[1:4], *tasks[1:4], sparks[2]], async_session) + expected_deps = [ + (1, 2, "DIRECT_DEPENDENCY"), + (2, 3, "DIRECT_DEPENDENCY"), + ] + + response = await test_client.get( + "v1/jobs/hierarchy", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={ + "start_node_id": start_node.id, + "direction": "BOTH", + "depth": 2, + "infer_from_lineage": True, + "since": max_output_created_at.isoformat(), + "until": min_input_created_at.isoformat(), + }, + ) + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "relations": { + "parents": jobs_ancestors_to_json(expected_nodes), + "dependencies": [ + { + "from": {"kind": "JOB", "id": str(tasks[from_idx].id)}, + "to": {"kind": "JOB", "id": str(tasks[to_idx].id)}, + "type": dep_type, + } + for from_idx, to_idx, dep_type in expected_deps + ], + }, + "nodes": {"jobs": jobs_to_json(expected_nodes)}, + } + + +async def test_get_job_hierarchy_with_indirect_dependencies_without_since( + test_client: AsyncClient, + job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + mocked_user: MockedUser, +): + _, tasks, _ = job_dependency_chain_with_indirect_dependencies + start_node = tasks[2] + + response = await test_client.get( + "v1/jobs/hierarchy", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={ + "start_node_id": start_node.id, + "direction": "BOTH", + "depth": 2, + "infer_from_lineage": True, + }, + ) + + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY, response.json() + assert response.json() 
== { + "error": { + "code": "invalid_request", + "details": [ + { + "code": "value_error", + "context": {}, + "input": { + "depth": 2, + "direction": "BOTH", + "infer_from_lineage": True, + "since": None, + "start_node_id": start_node.id, + "until": None, + }, + "location": [], + "message": "Value error, Inferring from lineage graph only possible with 'since' param", + }, + ], + "message": "Invalid request", + }, + } + + +async def test_get_job_hierarchy_with_indirect_dependencies_since_less_then_until( + test_client: AsyncClient, + async_session: AsyncSession, + job_dependency_chain_with_indirect_dependencies: tuple[tuple[Job, Job, Job, Job, Job], ...], + mocked_user: MockedUser, +): + _, tasks, _ = job_dependency_chain_with_indirect_dependencies + start_node = tasks[2] + + edge_task_ids = [tasks[0].id, tasks[4].id] + min_input_created_at = await async_session.scalar( + select(func.min(Input.created_at)).where(Input.job_id.in_(edge_task_ids)), + ) - timedelta(seconds=2) + max_output_created_at = await async_session.scalar( + select(func.max(Output.created_at)).where(Output.job_id.in_(edge_task_ids)), + ) + timedelta(seconds=2) + + response = await test_client.get( + "v1/jobs/hierarchy", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={ + "start_node_id": start_node.id, + "direction": "BOTH", + "depth": 2, + "infer_from_lineage": True, + "since": min_input_created_at.isoformat(), + "until": max_output_created_at.isoformat(), + }, + ) + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY, response.json() + assert response.json() == { + "error": { + "code": "invalid_request", + "details": [ + { + "code": "value_error", + "context": {}, + "input": format_datetime(max_output_created_at), + "location": [ + "until", + ], + "message": "Value error, 'since' should be less than 'until'", + }, + ], + "message": "Invalid request", + }, + }