diff --git a/ENV.md b/ENV.md index b9d08ed1..4e3cf7ec 100644 --- a/ENV.md +++ b/ENV.md @@ -32,24 +32,25 @@ Task flags are used to enable/disable certain tasks. They are set to `1` to enab The following flags are available: -| Flag | Description | -|---------------------------------------|-------------------------------------------------------| -| `SCHEDULED_TASKS_FLAG` | All scheduled tasks. | -| `URL_HTML_TASK_FLAG` | URL HTML scraping task. | -| `URL_RECORD_TYPE_TASK_FLAG` | Automatically assigns Record Types to URLs. | -| `URL_AGENCY_IDENTIFICATION_TASK_FLAG` | Automatically assigns and suggests Agencies for URLs. | -| `URL_SUBMIT_APPROVED_TASK_FLAG` | Submits approved URLs to the Data Sources App. | -| `URL_MISC_METADATA_TASK_FLAG` | Adds misc metadata to URLs. | -| `URL_404_PROBE_TASK_FLAG` | Probes URLs for 404 errors. | -| `URL_AUTO_RELEVANCE_TASK_FLAG` | Automatically assigns Relevances to URLs. | -| `URL_PROBE_TASK_FLAG` | Probes URLs for web metadata. | -| `URL_ROOT_URL_TASK_FLAG` | Extracts and links Root URLs to URLs. | -| `SYNC_AGENCIES_TASK_FLAG` | Synchonize agencies from Data Sources App. | -| `SYNC_DATA_SOURCES_TASK_FLAG` | Synchonize data sources from Data Sources App. | -| `PUSH_TO_HUGGING_FACE_TASK_FLAG` | Pushes data to HuggingFace. | -| `POPULATE_BACKLOG_SNAPSHOT_TASK_FLAG` | Populates the backlog snapshot. | -| `DELETE_OLD_LOGS_TASK_FLAG` | Deletes old logs. | -| `RUN_URL_TASKS_TASK_FLAG` | Runs URL tasks. | +| Flag | Description | +|---------------------------------------|--------------------------------------------------------| +| `SCHEDULED_TASKS_FLAG` | All scheduled tasks. | +| `URL_HTML_TASK_FLAG` | URL HTML scraping task. | +| `URL_RECORD_TYPE_TASK_FLAG` | Automatically assigns Record Types to URLs. | +| `URL_AGENCY_IDENTIFICATION_TASK_FLAG` | Automatically assigns and suggests Agencies for URLs. | +| `URL_SUBMIT_APPROVED_TASK_FLAG` | Submits approved URLs to the Data Sources App. 
| +| `URL_MISC_METADATA_TASK_FLAG` | Adds misc metadata to URLs. | +| `URL_404_PROBE_TASK_FLAG` | Probes URLs for 404 errors. | +| `URL_AUTO_RELEVANCE_TASK_FLAG` | Automatically assigns Relevances to URLs. | +| `URL_PROBE_TASK_FLAG` | Probes URLs for web metadata. | +| `URL_ROOT_URL_TASK_FLAG` | Extracts and links Root URLs to URLs. | +| `SYNC_AGENCIES_TASK_FLAG` | Synchronize agencies from Data Sources App. | +| `SYNC_DATA_SOURCES_TASK_FLAG` | Synchronize data sources from Data Sources App. | +| `PUSH_TO_HUGGING_FACE_TASK_FLAG` | Pushes data to HuggingFace. | +| `POPULATE_BACKLOG_SNAPSHOT_TASK_FLAG` | Populates the backlog snapshot. | +| `DELETE_OLD_LOGS_TASK_FLAG` | Deletes old logs. | +| `RUN_URL_TASKS_TASK_FLAG` | Runs URL tasks. | +| `IA_PROBE_TASK_FLAG` | Extracts and links Internet Archives metadata to URLs. | ## Foreign Data Wrapper (FDW) diff --git a/alembic/versions/2025_08_14_0722-2a7192657354_add_internet_archive_tables.py b/alembic/versions/2025_08_14_0722-2a7192657354_add_internet_archive_tables.py new file mode 100644 index 00000000..afdaecbe --- /dev/null +++ b/alembic/versions/2025_08_14_0722-2a7192657354_add_internet_archive_tables.py @@ -0,0 +1,108 @@ +"""Add Internet Archive Tables + +Revision ID: 2a7192657354 +Revises: 49fd9f295b8d +Create Date: 2025-08-14 07:22:15.308210 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +from src.util.alembic_helpers import url_id_column, created_at_column, id_column, updated_at_column, switch_enum_type + +# revision identifiers, used by Alembic. 
+revision: str = '2a7192657354' +down_revision: Union[str, None] = '49fd9f295b8d' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +IA_METADATA_TABLE_NAME = "urls_internet_archive_metadata" +IA_FLAGS_TABLE_NAME = "flag_url_checked_for_internet_archive" + +def upgrade() -> None: + _create_metadata_table() + _create_flags_table() + _add_internet_archives_task_enum() + +def downgrade() -> None: + op.drop_table(IA_METADATA_TABLE_NAME) + op.drop_table(IA_FLAGS_TABLE_NAME) + _remove_internet_archives_task_enum() + + +def _create_metadata_table(): + op.create_table( + IA_METADATA_TABLE_NAME, + id_column(), + url_id_column(), + sa.Column('archive_url', sa.String(), nullable=False), + sa.Column('digest', sa.String(), nullable=False), + sa.Column('length', sa.Integer(), nullable=False), + created_at_column(), + updated_at_column(), + sa.UniqueConstraint('url_id', name='uq_url_id_internet_archive_metadata') + ) + +def _add_internet_archives_task_enum(): + switch_enum_type( + table_name='tasks', + column_name='task_type', + enum_name='task_type', + new_enum_values=[ + 'HTML', + 'Relevancy', + 'Record Type', + 'Agency Identification', + 'Misc Metadata', + 'Submit Approved URLs', + 'Duplicate Detection', + '404 Probe', + 'Sync Agencies', + 'Sync Data Sources', + 'Push to Hugging Face', + 'URL Probe', + 'Populate Backlog Snapshot', + 'Delete Old Logs', + 'Run URL Task Cycles', + 'Root URL', + 'Internet Archives Probe', + 'Internet Archives Archive' + ] + ) + +def _remove_internet_archives_task_enum(): + switch_enum_type( + table_name='tasks', + column_name='task_type', + enum_name='task_type', + new_enum_values=[ + 'HTML', + 'Relevancy', + 'Record Type', + 'Agency Identification', + 'Misc Metadata', + 'Submit Approved URLs', + 'Duplicate Detection', + '404 Probe', + 'Sync Agencies', + 'Sync Data Sources', + 'Push to Hugging Face', + 'URL Probe', + 'Populate Backlog Snapshot', + 'Delete Old Logs', + 'Run URL Task Cycles', 
+ 'Root URL', + ] + ) + +def _create_flags_table(): + op.create_table( + IA_FLAGS_TABLE_NAME, + url_id_column(), + sa.Column('success', sa.Boolean(), nullable=False), + created_at_column(), + sa.PrimaryKeyConstraint('url_id') + ) + diff --git a/pyproject.toml b/pyproject.toml index 15e3c8ea..3eb1446d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,6 +4,7 @@ version = "0.1.0" requires-python = ">=3.11" dependencies = [ "aiohttp~=3.11.11", + "aiolimiter>=1.2.1", "alembic~=1.14.0", "apscheduler~=3.11.0", "asyncpg~=0.30.0", diff --git a/src/api/main.py b/src/api/main.py index 384cb680..b6679827 100644 --- a/src/api/main.py +++ b/src/api/main.py @@ -32,6 +32,7 @@ from src.db.client.sync import DatabaseClient from src.external.huggingface.hub.client import HuggingFaceHubClient from src.external.huggingface.inference.client import HuggingFaceInferenceClient +from src.external.internet_archives.client import InternetArchivesClient from src.external.pdap.client import PDAPClient from src.external.url_request.core import URLRequestInterface @@ -81,7 +82,7 @@ async def lifespan(app: FastAPI): hf_inference_client=HuggingFaceInferenceClient( session=session, token=env_var_manager.hf_inference_api_key - ) + ), ), ) async_collector_manager = AsyncCollectorManager( @@ -104,6 +105,9 @@ async def lifespan(app: FastAPI): token=env_var_manager.hf_hub_token ), async_core=async_core, + ia_client=InternetArchivesClient( + session=session + ) ), registry=ScheduledJobRegistry() ) diff --git a/src/core/exceptions.py b/src/core/exceptions.py index e3e93e55..d4f9c4a8 100644 --- a/src/core/exceptions.py +++ b/src/core/exceptions.py @@ -3,10 +3,6 @@ from fastapi import HTTPException -class InvalidPreprocessorError(Exception): - pass - - class MuckrockAPIError(Exception): pass @@ -17,4 +13,4 @@ class MatchAgencyError(Exception): class FailedValidationException(HTTPException): def __init__(self, detail: str): - super().__init__(status_code=HTTPStatus.BAD_REQUEST, detail=detail) \ No 
newline at end of file + super().__init__(status_code=HTTPStatus.BAD_REQUEST, detail=detail) diff --git a/src/core/tasks/base/operator.py b/src/core/tasks/base/operator.py index ce0ee860..25f3fc5d 100644 --- a/src/core/tasks/base/operator.py +++ b/src/core/tasks/base/operator.py @@ -1,6 +1,7 @@ import traceback from abc import ABC, abstractmethod +from src.core.enums import BatchStatus from src.core.tasks.base.run_info import TaskOperatorRunInfo from src.core.tasks.url.enums import TaskOperatorOutcome from src.db.client.async_ import AsyncDatabaseClient @@ -9,8 +10,18 @@ class TaskOperatorBase(ABC): def __init__(self, adb_client: AsyncDatabaseClient): - self.adb_client = adb_client - self.task_id = None + self._adb_client = adb_client + self._task_id: int | None = None + + @property + def task_id(self) -> int: + if self._task_id is None: + raise AttributeError("Task id is not set. Call initiate_task_in_db() first.") + return self._task_id + + @property + def adb_client(self) -> AsyncDatabaseClient: + return self._adb_client @property @abstractmethod @@ -27,8 +38,8 @@ async def initiate_task_in_db(self) -> int: async def conclude_task(self): raise NotImplementedError - async def run_task(self, task_id: int) -> TaskOperatorRunInfo: - self.task_id = task_id + async def run_task(self) -> TaskOperatorRunInfo: + self._task_id = await self.initiate_task_in_db() try: await self.inner_task_logic() return await self.conclude_task() diff --git a/src/core/tasks/dtos/run_info.py b/src/core/tasks/dtos/run_info.py deleted file mode 100644 index 2296f65b..00000000 --- a/src/core/tasks/dtos/run_info.py +++ /dev/null @@ -1,10 +0,0 @@ -from typing import Optional - -from pydantic import BaseModel - -from src.core.tasks.base.run_info import TaskOperatorRunInfo -from src.core.tasks.url.enums import TaskOperatorOutcome - - -class URLTaskOperatorRunInfo(TaskOperatorRunInfo): - linked_url_ids: list[int] diff --git a/src/core/tasks/handler.py b/src/core/tasks/handler.py index 
3e3aca77..7f488594 100644 --- a/src/core/tasks/handler.py +++ b/src/core/tasks/handler.py @@ -4,7 +4,6 @@ from src.core.enums import BatchStatus from src.core.tasks.base.run_info import TaskOperatorRunInfo -from src.core.tasks.dtos.run_info import URLTaskOperatorRunInfo from src.core.tasks.url.enums import TaskOperatorOutcome from src.db.client.async_ import AsyncDatabaseClient from src.db.enums import TaskType diff --git a/src/core/tasks/dtos/__init__.py b/src/core/tasks/mixins/__init__.py similarity index 100% rename from src/core/tasks/dtos/__init__.py rename to src/core/tasks/mixins/__init__.py diff --git a/src/core/tasks/mixins/link_urls.py b/src/core/tasks/mixins/link_urls.py new file mode 100644 index 00000000..f58a3dff --- /dev/null +++ b/src/core/tasks/mixins/link_urls.py @@ -0,0 +1,43 @@ +from abc import abstractmethod + +from src.db.client.async_ import AsyncDatabaseClient + + +class LinkURLsMixin: + + def __init__( + self, + *args, + **kwargs + ): + super().__init__(*args, **kwargs) + self._urls_linked = False + self._linked_url_ids = [] + + @property + def urls_linked(self) -> bool: + return self._urls_linked + + @property + def linked_url_ids(self) -> list[int]: + return self._linked_url_ids + + @property + @abstractmethod + def adb_client(self) -> AsyncDatabaseClient: + raise NotImplementedError + + @property + @abstractmethod + def task_id(self) -> int: + raise NotImplementedError + + async def link_urls_to_task(self, url_ids: list[int]): + self._linked_url_ids = url_ids + if not hasattr(self, "linked_url_ids"): + raise AttributeError("Class does not have linked_url_ids attribute") + await self.adb_client.link_urls_to_task( + task_id=self.task_id, + url_ids=url_ids + ) + self._urls_linked = True \ No newline at end of file diff --git a/src/core/tasks/mixins/prereq.py b/src/core/tasks/mixins/prereq.py new file mode 100644 index 00000000..dcfec66b --- /dev/null +++ b/src/core/tasks/mixins/prereq.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod 
+ + +class HasPrerequisitesMixin(ABC): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + @abstractmethod + async def meets_task_prerequisites(self) -> bool: + """ + A task should not be initiated unless certain + conditions are met + """ + raise NotImplementedError \ No newline at end of file diff --git a/src/core/tasks/scheduled/enums.py b/src/core/tasks/scheduled/enums.py index 27d03be6..e011ab6e 100644 --- a/src/core/tasks/scheduled/enums.py +++ b/src/core/tasks/scheduled/enums.py @@ -2,5 +2,6 @@ class IntervalEnum(Enum): - DAILY = "DAILY" - HOURLY = "HOURLY" \ No newline at end of file + DAILY = 60 * 24 + HOURLY = 60 + TEN_MINUTES = 10 \ No newline at end of file diff --git a/src/core/tasks/scheduled/impl/internet_archives/__init__.py b/src/core/tasks/scheduled/impl/internet_archives/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/scheduled/impl/internet_archives/archive/__init__.py b/src/core/tasks/scheduled/impl/internet_archives/archive/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/scheduled/impl/internet_archives/archive/models/__init__.py b/src/core/tasks/scheduled/impl/internet_archives/archive/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/scheduled/impl/internet_archives/archive/operator.py b/src/core/tasks/scheduled/impl/internet_archives/archive/operator.py new file mode 100644 index 00000000..1d823a34 --- /dev/null +++ b/src/core/tasks/scheduled/impl/internet_archives/archive/operator.py @@ -0,0 +1,31 @@ +from src.core.tasks.mixins.link_urls import LinkURLsMixin +from src.core.tasks.mixins.prereq import HasPrerequisitesMixin +from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase +from src.db.client.async_ import AsyncDatabaseClient +from src.db.enums import TaskType +from src.external.internet_archives.client import InternetArchivesClient + + +class 
InternetArchivesArchiveTaskOperator( + ScheduledTaskOperatorBase, + HasPrerequisitesMixin, + LinkURLsMixin +): + + def __init__( + self, + adb_client: AsyncDatabaseClient, + ia_client: InternetArchivesClient + ): + super().__init__(adb_client) + self.ia_client = ia_client + + async def meets_task_prerequisites(self) -> bool: + raise NotImplementedError + + @property + def task_type(self) -> TaskType: + return TaskType.IA_ARCHIVE + + async def inner_task_logic(self) -> None: + raise NotImplementedError diff --git a/src/core/tasks/scheduled/impl/internet_archives/archive/queries/__init__.py b/src/core/tasks/scheduled/impl/internet_archives/archive/queries/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/__init__.py b/src/core/tasks/scheduled/impl/internet_archives/probe/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/convert.py b/src/core/tasks/scheduled/impl/internet_archives/probe/convert.py new file mode 100644 index 00000000..aa0c03b6 --- /dev/null +++ b/src/core/tasks/scheduled/impl/internet_archives/probe/convert.py @@ -0,0 +1,17 @@ +from src.external.internet_archives.models.ia_url_mapping import InternetArchivesURLMapping +from src.db.models.impl.flag.checked_for_ia.pydantic import FlagURLCheckedForInternetArchivesPydantic +from src.db.models.impl.url.ia_metadata.pydantic import URLInternetArchiveMetadataPydantic +from src.util.url_mapper import URLMapper + + +def convert_ia_url_mapping_to_ia_metadata( + url_mapper: URLMapper, + ia_mapping: InternetArchivesURLMapping +) -> URLInternetArchiveMetadataPydantic: + iam = ia_mapping.ia_metadata + return URLInternetArchiveMetadataPydantic( + url_id=url_mapper.get_id(ia_mapping.url), + archive_url=iam.archive_url, + digest=iam.digest, + length=iam.length + ) diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/filter.py 
b/src/core/tasks/scheduled/impl/internet_archives/probe/filter.py new file mode 100644 index 00000000..2713b080 --- /dev/null +++ b/src/core/tasks/scheduled/impl/internet_archives/probe/filter.py @@ -0,0 +1,16 @@ +from src.external.internet_archives.models.ia_url_mapping import InternetArchivesURLMapping +from src.core.tasks.scheduled.impl.internet_archives.probe.models.subset import IAURLMappingSubsets + + +def filter_into_subsets( + ia_mappings: list[InternetArchivesURLMapping] +) -> IAURLMappingSubsets: + subsets = IAURLMappingSubsets() + for ia_mapping in ia_mappings: + if ia_mapping.has_error: + subsets.error.append(ia_mapping) + + if ia_mapping.has_metadata: + subsets.has_metadata.append(ia_mapping) + + return subsets diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/models/__init__.py b/src/core/tasks/scheduled/impl/internet_archives/probe/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/models/subset.py b/src/core/tasks/scheduled/impl/internet_archives/probe/models/subset.py new file mode 100644 index 00000000..b01fd317 --- /dev/null +++ b/src/core/tasks/scheduled/impl/internet_archives/probe/models/subset.py @@ -0,0 +1,8 @@ +from pydantic import BaseModel + +from src.external.internet_archives.models.ia_url_mapping import InternetArchivesURLMapping + + +class IAURLMappingSubsets(BaseModel): + error: list[InternetArchivesURLMapping] = [] + has_metadata: list[InternetArchivesURLMapping] = [] \ No newline at end of file diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/operator.py b/src/core/tasks/scheduled/impl/internet_archives/probe/operator.py new file mode 100644 index 00000000..1c280b39 --- /dev/null +++ b/src/core/tasks/scheduled/impl/internet_archives/probe/operator.py @@ -0,0 +1,112 @@ +from tqdm.asyncio import tqdm_asyncio + +from src.core.tasks.mixins.link_urls import LinkURLsMixin +from src.core.tasks.mixins.prereq import 
HasPrerequisitesMixin +from src.core.tasks.scheduled.impl.internet_archives.probe.queries.prereq import \ + CheckURLInternetArchivesTaskPrerequisitesQueryBuilder +from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase +from src.core.tasks.scheduled.impl.internet_archives.probe.convert import convert_ia_url_mapping_to_ia_metadata +from src.core.tasks.scheduled.impl.internet_archives.probe.filter import filter_into_subsets +from src.core.tasks.scheduled.impl.internet_archives.probe.models.subset import IAURLMappingSubsets +from src.core.tasks.scheduled.impl.internet_archives.probe.queries.get import GetURLsForInternetArchivesTaskQueryBuilder +from src.db.client.async_ import AsyncDatabaseClient +from src.db.dtos.url.mapping import URLMapping +from src.db.enums import TaskType +from src.db.models.impl.flag.checked_for_ia.pydantic import FlagURLCheckedForInternetArchivesPydantic +from src.db.models.impl.url.error_info.pydantic import URLErrorPydanticInfo +from src.db.models.impl.url.ia_metadata.pydantic import URLInternetArchiveMetadataPydantic +from src.external.internet_archives.client import InternetArchivesClient +from src.external.internet_archives.models.ia_url_mapping import InternetArchivesURLMapping +from src.util.url_mapper import URLMapper + + +class InternetArchivesProbeTaskOperator( + ScheduledTaskOperatorBase, + HasPrerequisitesMixin, + LinkURLsMixin +): + + def __init__( + self, + adb_client: AsyncDatabaseClient, + ia_client: InternetArchivesClient + ): + super().__init__(adb_client) + self.ia_client = ia_client + + @property + def task_type(self) -> TaskType: + return TaskType.IA_PROBE + + async def meets_task_prerequisites(self) -> bool: + return await self.adb_client.run_query_builder( + CheckURLInternetArchivesTaskPrerequisitesQueryBuilder() + ) + + async def inner_task_logic(self) -> None: + url_mappings: list[URLMapping] = await self._get_url_mappings() + if len(url_mappings) == 0: + return + mapper = 
URLMapper(url_mappings) + + await self.link_urls_to_task(mapper.get_all_ids()) + + ia_mappings: list[InternetArchivesURLMapping] = await self._search_for_internet_archive_links(mapper.get_all_urls()) + await self._add_ia_flags_to_db(mapper, ia_mappings=ia_mappings) + + subsets: IAURLMappingSubsets = filter_into_subsets(ia_mappings) + await self._add_errors_to_db(mapper, ia_mappings=subsets.error) + await self._add_ia_metadata_to_db(mapper, ia_mappings=subsets.has_metadata) + + async def _add_errors_to_db(self, mapper: URLMapper, ia_mappings: list[InternetArchivesURLMapping]) -> None: + url_error_info_list: list[URLErrorPydanticInfo] = [] + for ia_mapping in ia_mappings: + url_id = mapper.get_id(ia_mapping.url) + url_error_info = URLErrorPydanticInfo( + url_id=url_id, + error=ia_mapping.error, + task_id=self.task_id + ) + url_error_info_list.append(url_error_info) + await self.adb_client.bulk_insert(url_error_info_list) + + async def _get_url_mappings(self) -> list[URLMapping]: + return await self.adb_client.run_query_builder( + GetURLsForInternetArchivesTaskQueryBuilder() + ) + + async def _search_for_internet_archive_links(self, urls: list[str]) -> list[InternetArchivesURLMapping]: + return await tqdm_asyncio.gather( + *[ + self.ia_client.search_for_url_snapshot(url) + for url in urls + ], + timeout=60 * 10 # 10 minutes + ) + + async def _add_ia_metadata_to_db( + self, + url_mapper: URLMapper, + ia_mappings: list[InternetArchivesURLMapping], + ) -> None: + insert_objects: list[URLInternetArchiveMetadataPydantic] = [ + convert_ia_url_mapping_to_ia_metadata( + url_mapper=url_mapper, + ia_mapping=ia_mapping + ) + for ia_mapping in ia_mappings + ] + await self.adb_client.bulk_insert(insert_objects) + + async def _add_ia_flags_to_db( + self, mapper: URLMapper, ia_mappings: list[InternetArchivesURLMapping]) -> None: + flags: list[FlagURLCheckedForInternetArchivesPydantic] = [] + for ia_mapping in ia_mappings: + url_id = mapper.get_id(ia_mapping.url) + flag = 
FlagURLCheckedForInternetArchivesPydantic( + url_id=url_id, + success=not ia_mapping.has_error + ) + flags.append(flag) + await self.adb_client.bulk_insert(flags) + diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/__init__.py b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/get.py b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/get.py new file mode 100644 index 00000000..94f2ad5e --- /dev/null +++ b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/get.py @@ -0,0 +1,33 @@ +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.db.dtos.url.mapping import URLMapping +from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives +from src.db.models.impl.url.core.sqlalchemy import URL +from src.db.queries.base.builder import QueryBuilderBase + +from src.db.helpers.session import session_helper as sh + +class GetURLsForInternetArchivesTaskQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> list[URLMapping]: + query = ( + select( + URL.id, + URL.url + ) + .outerjoin( + FlagURLCheckedForInternetArchives, + URL.id == FlagURLCheckedForInternetArchives.url_id + ) + .where(FlagURLCheckedForInternetArchives.url_id.is_(None)) + .limit(100) + ) + + db_mappings = await sh.mappings(session, query=query) + return [ + URLMapping( + url_id=mapping["id"], + url=mapping["url"] + ) for mapping in db_mappings + ] diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/prereq.py b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/prereq.py new file mode 100644 index 00000000..a74dc0a6 --- /dev/null +++ b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/prereq.py @@ -0,0 +1,23 @@ +from sqlalchemy import select +from sqlalchemy.ext.asyncio import 
AsyncSession + +from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives +from src.db.models.impl.url.core.sqlalchemy import URL +from src.db.queries.base.builder import QueryBuilderBase + +from src.db.helpers.session import session_helper as sh + +class CheckURLInternetArchivesTaskPrerequisitesQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> bool: + query = ( + select(URL) + .outerjoin( + FlagURLCheckedForInternetArchives, + URL.id == FlagURLCheckedForInternetArchives.url_id + ) + .where(FlagURLCheckedForInternetArchives.url_id.is_(None)) + .limit(1) + ) + result = await sh.one_or_none(session, query=query) + return result is not None diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/upsert.py b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/upsert.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/scheduled/loader.py b/src/core/tasks/scheduled/loader.py index 2d0cfd1a..cb98dff0 100644 --- a/src/core/tasks/scheduled/loader.py +++ b/src/core/tasks/scheduled/loader.py @@ -5,12 +5,14 @@ from src.core.tasks.scheduled.impl.backlog.operator import PopulateBacklogSnapshotTaskOperator from src.core.tasks.scheduled.impl.delete_logs.operator import DeleteOldLogsTaskOperator from src.core.tasks.scheduled.impl.huggingface.operator import PushToHuggingFaceTaskOperator +from src.core.tasks.scheduled.impl.internet_archives.probe.operator import InternetArchivesProbeTaskOperator from src.core.tasks.scheduled.impl.run_url_tasks.operator import RunURLTasksTaskOperator from src.core.tasks.scheduled.impl.sync.agency.operator import SyncAgenciesTaskOperator from src.core.tasks.scheduled.impl.sync.data_sources.operator import SyncDataSourcesTaskOperator from src.core.tasks.scheduled.models.entry import ScheduledTaskEntry from src.db.client.async_ import AsyncDatabaseClient from src.external.huggingface.hub.client import HuggingFaceHubClient +from 
src.external.internet_archives.client import InternetArchivesClient from src.external.pdap.client import PDAPClient @@ -21,13 +23,17 @@ def __init__( async_core: AsyncCore, adb_client: AsyncDatabaseClient, pdap_client: PDAPClient, - hf_client: HuggingFaceHubClient + hf_client: HuggingFaceHubClient, + ia_client: InternetArchivesClient ): # Dependencies self.async_core = async_core self.adb_client = adb_client self.pdap_client = pdap_client + + # External Interfaces self.hf_client = hf_client + self.ia_client = ia_client self.env = Env() self.env.read_env() @@ -42,13 +48,21 @@ async def load_entries(self) -> list[ScheduledTaskEntry]: return [ ScheduledTaskEntry( - operator=DeleteOldLogsTaskOperator(adb_client=self.async_core.adb_client), + operator=InternetArchivesProbeTaskOperator( + adb_client=self.adb_client, + ia_client=self.ia_client + ), + interval=IntervalEnum.TEN_MINUTES, + enabled=self.env.bool("IA_PROBE_TASK_FLAG", default=True), + ), + ScheduledTaskEntry( + operator=DeleteOldLogsTaskOperator(adb_client=self.adb_client), interval=IntervalEnum.DAILY, enabled=self.env.bool("DELETE_OLD_LOGS_TASK_FLAG", default=True) ), ScheduledTaskEntry( operator=SyncDataSourcesTaskOperator( - adb_client=self.async_core.adb_client, + adb_client=self.adb_client, pdap_client=self.pdap_client ), interval=IntervalEnum.DAILY, diff --git a/src/core/tasks/scheduled/manager.py b/src/core/tasks/scheduled/manager.py index 0006af41..e97e0f8e 100644 --- a/src/core/tasks/scheduled/manager.py +++ b/src/core/tasks/scheduled/manager.py @@ -3,6 +3,8 @@ from src.core.tasks.base.run_info import TaskOperatorRunInfo from src.core.tasks.handler import TaskHandler +from src.core.tasks.mixins.link_urls import LinkURLsMixin +from src.core.tasks.mixins.prereq import HasPrerequisitesMixin from src.core.tasks.scheduled.loader import ScheduledTaskOperatorLoader from src.core.tasks.scheduled.models.entry import ScheduledTaskEntry from src.core.tasks.scheduled.registry.core import ScheduledJobRegistry @@ 
-53,6 +55,16 @@ def shutdown(self): async def run_task(self, operator: ScheduledTaskOperatorBase): print(f"Running {operator.task_type.value} Task") - task_id = await self._handler.initiate_task_in_db(task_type=operator.task_type) - run_info: TaskOperatorRunInfo = await operator.run_task(task_id) + if issubclass(operator.__class__, HasPrerequisitesMixin): + operator: HasPrerequisitesMixin + if not await operator.meets_task_prerequisites(): + operator: ScheduledTaskOperatorBase + print(f"Prerequisites not met for {operator.task_type.value} Task. Skipping.") + return + run_info: TaskOperatorRunInfo = await operator.run_task() + if issubclass(operator.__class__, LinkURLsMixin): + operator: LinkURLsMixin + if not operator.urls_linked: + operator: ScheduledTaskOperatorBase + raise Exception(f"Task {operator.task_type.value} has not been linked to any URLs but is designated as a link task") await self._handler.handle_outcome(run_info) diff --git a/src/core/tasks/scheduled/models/entry.py b/src/core/tasks/scheduled/models/entry.py index e3d647d0..22430a42 100644 --- a/src/core/tasks/scheduled/models/entry.py +++ b/src/core/tasks/scheduled/models/entry.py @@ -1,5 +1,3 @@ -from typing import Any - from pydantic import BaseModel from src.core.tasks.scheduled.enums import IntervalEnum diff --git a/src/core/tasks/scheduled/registry/convert.py b/src/core/tasks/scheduled/registry/convert.py deleted file mode 100644 index 866e536a..00000000 --- a/src/core/tasks/scheduled/registry/convert.py +++ /dev/null @@ -1,11 +0,0 @@ -from src.core.tasks.scheduled.enums import IntervalEnum - - -def convert_interval_enum_to_hours(interval: IntervalEnum) -> int: - match interval: - case IntervalEnum.DAILY: - return 24 - case IntervalEnum.HOURLY: - return 1 - case _: - raise ValueError(f"Invalid interval: {interval}") \ No newline at end of file diff --git a/src/core/tasks/scheduled/registry/core.py b/src/core/tasks/scheduled/registry/core.py index a7af830f..a1928504 100644 --- 
a/src/core/tasks/scheduled/registry/core.py +++ b/src/core/tasks/scheduled/registry/core.py @@ -1,11 +1,10 @@ from datetime import datetime, timedelta -from typing import Awaitable, Callable +from typing import Callable from apscheduler.job import Job from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger -from src.core.tasks.scheduled.registry.convert import convert_interval_enum_to_hours from src.core.tasks.scheduled.models.entry import ScheduledTaskEntry from src.db.enums import TaskType @@ -33,7 +32,7 @@ async def add_job( self._jobs[entry.operator.task_type] = self.scheduler.add_job( func, trigger=IntervalTrigger( - hours=convert_interval_enum_to_hours(entry.interval), + minutes=entry.interval.value, start_date=datetime.now() + timedelta(minutes=minute_lag) ), misfire_grace_time=60, diff --git a/src/core/tasks/url/loader.py b/src/core/tasks/url/loader.py index 2203674d..45f750af 100644 --- a/src/core/tasks/url/loader.py +++ b/src/core/tasks/url/loader.py @@ -20,6 +20,7 @@ from src.core.tasks.url.operators.submit_approved.core import SubmitApprovedURLTaskOperator from src.db.client.async_ import AsyncDatabaseClient from src.external.huggingface.inference.client import HuggingFaceInferenceClient +from src.external.internet_archives.client import InternetArchivesClient from src.external.pdap.client import PDAPClient from src.external.url_request.core import URLRequestInterface @@ -33,7 +34,7 @@ def __init__( html_parser: HTMLResponseParser, pdap_client: PDAPClient, muckrock_api_interface: MuckrockAPIInterface, - hf_inference_client: HuggingFaceInferenceClient + hf_inference_client: HuggingFaceInferenceClient, ): # Dependencies self.adb_client = adb_client @@ -165,6 +166,7 @@ async def _get_url_root_url_task_operator(self) -> URLTaskEntry: ) ) + async def load_entries(self) -> list[URLTaskEntry]: return [ await self._get_url_root_url_task_operator(), diff --git a/src/core/tasks/url/manager.py 
b/src/core/tasks/url/manager.py index 8d4973a1..399da5b0 100644 --- a/src/core/tasks/url/manager.py +++ b/src/core/tasks/url/manager.py @@ -1,10 +1,10 @@ import logging +from src.core.tasks.base.run_info import TaskOperatorRunInfo from src.core.tasks.handler import TaskHandler from src.core.tasks.url.loader import URLTaskOperatorLoader from src.core.tasks.url.models.entry import URLTaskEntry from src.db.enums import TaskType -from src.core.tasks.dtos.run_info import URLTaskOperatorRunInfo from src.core.tasks.url.enums import TaskOperatorOutcome from src.core.function_trigger import FunctionTrigger @@ -57,7 +57,7 @@ async def _run_task(self, entry: URLTaskEntry) -> None: await self.handler.post_to_discord(message=message) break task_id = await self.handler.initiate_task_in_db(task_type=operator.task_type) - run_info: URLTaskOperatorRunInfo = await operator.run_task(task_id) + run_info: TaskOperatorRunInfo = await operator.run_task(task_id) await self.conclude_task(run_info) if run_info.outcome == TaskOperatorOutcome.ERROR: break @@ -68,11 +68,7 @@ async def trigger_task_run(self) -> None: await self.task_trigger.trigger_or_rerun() - async def conclude_task(self, run_info: URLTaskOperatorRunInfo) -> None: - await self.handler.link_urls_to_task( - task_id=run_info.task_id, - url_ids=run_info.linked_url_ids - ) + async def conclude_task(self, run_info: TaskOperatorRunInfo) -> None: await self.handler.handle_outcome(run_info) diff --git a/src/core/tasks/url/operators/auto_relevant/core.py b/src/core/tasks/url/operators/auto_relevant/core.py index 53ff101f..386b4be7 100644 --- a/src/core/tasks/url/operators/auto_relevant/core.py +++ b/src/core/tasks/url/operators/auto_relevant/core.py @@ -21,16 +21,16 @@ def __init__( self.hf_client = hf_client @property - def task_type(self): + def task_type(self) -> TaskType: return TaskType.RELEVANCY - async def meets_task_prerequisites(self): + async def meets_task_prerequisites(self) -> bool: return await 
self.adb_client.has_urls_with_html_data_and_without_auto_relevant_suggestion() async def get_tdos(self) -> list[URLRelevantTDO]: return await self.adb_client.get_tdos_for_auto_relevancy() - async def inner_task_logic(self): + async def inner_task_logic(self) -> None: tdos = await self.get_tdos() url_ids = [tdo.url_id for tdo in tdos] await self.link_urls_to_task(url_ids=url_ids) @@ -41,7 +41,12 @@ async def inner_task_logic(self): await self.put_results_into_database(subsets.success) await self.update_errors_in_database(subsets.error) - async def get_ml_classifications(self, tdos: list[URLRelevantTDO]): + async def get_ml_classifications(self, tdos: list[URLRelevantTDO]) -> None: + """ + Modifies: + tdo.annotation + tdo.error + """ for tdo in tdos: try: input_ = BasicInput( @@ -59,7 +64,7 @@ async def get_ml_classifications(self, tdos: list[URLRelevantTDO]): ) tdo.annotation = annotation_info - async def put_results_into_database(self, tdos: list[URLRelevantTDO]): + async def put_results_into_database(self, tdos: list[URLRelevantTDO]) -> None: inputs = [] for tdo in tdos: input_ = AutoRelevancyAnnotationInput( @@ -71,7 +76,7 @@ async def put_results_into_database(self, tdos: list[URLRelevantTDO]): inputs.append(input_) await self.adb_client.add_user_relevant_suggestions(inputs) - async def update_errors_in_database(self, tdos: list[URLRelevantTDO]): + async def update_errors_in_database(self, tdos: list[URLRelevantTDO]) -> None: error_infos = [] for tdo in tdos: error_info = URLErrorPydanticInfo( diff --git a/src/core/tasks/url/operators/base.py b/src/core/tasks/url/operators/base.py index d4d1667e..e1d70d5e 100644 --- a/src/core/tasks/url/operators/base.py +++ b/src/core/tasks/url/operators/base.py @@ -1,61 +1,36 @@ -import traceback -from abc import ABC, abstractmethod - from src.core.tasks.base.operator import TaskOperatorBase -from src.db.client.async_ import AsyncDatabaseClient -from src.db.enums import TaskType -from src.core.tasks.dtos.run_info import 
URLTaskOperatorRunInfo +from src.core.tasks.base.run_info import TaskOperatorRunInfo +from src.core.tasks.mixins.link_urls import LinkURLsMixin +from src.core.tasks.mixins.prereq import HasPrerequisitesMixin from src.core.tasks.url.enums import TaskOperatorOutcome -from src.core.enums import BatchStatus +from src.db.client.async_ import AsyncDatabaseClient -class URLTaskOperatorBase(TaskOperatorBase): +class URLTaskOperatorBase( + TaskOperatorBase, + LinkURLsMixin, + HasPrerequisitesMixin, +): def __init__(self, adb_client: AsyncDatabaseClient): super().__init__(adb_client) - self.tasks_linked = False - self.linked_url_ids = [] - - @abstractmethod - async def meets_task_prerequisites(self) -> bool: - """ - A task should not be initiated unless certain - conditions are met - """ - raise NotImplementedError - - async def link_urls_to_task(self, url_ids: list[int]): - self.linked_url_ids = url_ids async def conclude_task(self): - if not self.linked_url_ids: + if not self.urls_linked: raise Exception("Task has not been linked to any URLs") return await self.run_info( outcome=TaskOperatorOutcome.SUCCESS, message="Task completed successfully" ) - async def run_task(self, task_id: int) -> URLTaskOperatorRunInfo: - self.task_id = task_id - try: - await self.inner_task_logic() - return await self.conclude_task() - except Exception as e: - stack_trace = traceback.format_exc() - return await self.run_info( - outcome=TaskOperatorOutcome.ERROR, - message=str(e) + "\n" + stack_trace - ) - async def run_info( self, outcome: TaskOperatorOutcome, message: str - ) -> URLTaskOperatorRunInfo: - return URLTaskOperatorRunInfo( + ) -> TaskOperatorRunInfo: + return TaskOperatorRunInfo( task_id=self.task_id, task_type=self.task_type, - linked_url_ids=self.linked_url_ids, outcome=outcome, message=message ) diff --git a/src/db/client/async_.py b/src/db/client/async_.py index cd2f7c02..3b994f86 100644 --- a/src/db/client/async_.py +++ b/src/db/client/async_.py @@ -604,7 +604,7 @@ async def 
get_all( self, session, model: Base, - order_by_attribute: Optional[str] = None + order_by_attribute: str | None = None ) -> list[Base]: """Get all records of a model. Used primarily in testing.""" return await sh.get_all(session=session, model=model, order_by_attribute=order_by_attribute) diff --git a/src/db/enums.py b/src/db/enums.py index dee42c2e..b8d6792d 100644 --- a/src/db/enums.py +++ b/src/db/enums.py @@ -45,6 +45,8 @@ class TaskType(PyEnum): PROBE_404 = "404 Probe" PROBE_URL = "URL Probe" ROOT_URL = "Root URL" + IA_PROBE = "Internet Archives Probe" + IA_ARCHIVE = "Internet Archives Archive" # Scheduled Tasks PUSH_TO_HUGGINGFACE = "Push to Hugging Face" diff --git a/src/db/models/impl/flag/checked_for_ia/__init__.py b/src/db/models/impl/flag/checked_for_ia/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/db/models/impl/flag/checked_for_ia/pydantic.py b/src/db/models/impl/flag/checked_for_ia/pydantic.py new file mode 100644 index 00000000..5b801f6d --- /dev/null +++ b/src/db/models/impl/flag/checked_for_ia/pydantic.py @@ -0,0 +1,11 @@ +from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives +from src.db.templates.markers.bulk.insert import BulkInsertableModel + + +class FlagURLCheckedForInternetArchivesPydantic(BulkInsertableModel): + url_id: int + success: bool + + @classmethod + def sa_model(cls) -> type[FlagURLCheckedForInternetArchives]: + return FlagURLCheckedForInternetArchives \ No newline at end of file diff --git a/src/db/models/impl/flag/checked_for_ia/sqlalchemy.py b/src/db/models/impl/flag/checked_for_ia/sqlalchemy.py new file mode 100644 index 00000000..87914eb2 --- /dev/null +++ b/src/db/models/impl/flag/checked_for_ia/sqlalchemy.py @@ -0,0 +1,21 @@ +from sqlalchemy import PrimaryKeyConstraint +from sqlalchemy.orm import Mapped + +from src.db.models.mixins import URLDependentMixin +from src.db.models.templates_.base import Base +from src.db.models.templates_.with_id import 
WithIDBase + + +class FlagURLCheckedForInternetArchives( + URLDependentMixin, + Base +): + + success: Mapped[bool] + + __tablename__ = 'flag_url_checked_for_internet_archive' + __table_args__ = ( + PrimaryKeyConstraint( + 'url_id', + ), + ) diff --git a/src/db/models/impl/url/ia_metadata/__init__.py b/src/db/models/impl/url/ia_metadata/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/db/models/impl/url/ia_metadata/pydantic.py b/src/db/models/impl/url/ia_metadata/pydantic.py new file mode 100644 index 00000000..ed98b057 --- /dev/null +++ b/src/db/models/impl/url/ia_metadata/pydantic.py @@ -0,0 +1,14 @@ +from src.db.models.impl.url.ia_metadata.sqlalchemy import URLInternetArchivesMetadata +from src.db.templates.markers.bulk.insert import BulkInsertableModel + + +class URLInternetArchiveMetadataPydantic(BulkInsertableModel): + + url_id: int + archive_url: str + digest: str + length: int + + @classmethod + def sa_model(cls) -> type[URLInternetArchivesMetadata]: + return URLInternetArchivesMetadata diff --git a/src/db/models/impl/url/ia_metadata/sqlalchemy.py b/src/db/models/impl/url/ia_metadata/sqlalchemy.py new file mode 100644 index 00000000..d89c0b8b --- /dev/null +++ b/src/db/models/impl/url/ia_metadata/sqlalchemy.py @@ -0,0 +1,15 @@ +from sqlalchemy.orm import Mapped + +from src.db.models.mixins import URLDependentMixin +from src.db.models.templates_.standard import StandardBase + + +class URLInternetArchivesMetadata( + StandardBase, + URLDependentMixin +): + __tablename__ = 'urls_internet_archive_metadata' + + archive_url: Mapped[str] + digest: Mapped[str] + length: Mapped[int] \ No newline at end of file diff --git a/src/external/internet_archives/__init__.py b/src/external/internet_archives/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/external/internet_archives/client.py b/src/external/internet_archives/client.py new file mode 100644 index 00000000..48458711 --- /dev/null +++ 
b/src/external/internet_archives/client.py @@ -0,0 +1,71 @@ +import asyncio +from asyncio import Semaphore + +from aiolimiter import AsyncLimiter +from aiohttp import ClientSession + +from src.external.internet_archives.convert import convert_capture_to_archive_metadata +from src.external.internet_archives.models.capture import IACapture +from src.external.internet_archives.models.ia_url_mapping import InternetArchivesURLMapping + +limiter = AsyncLimiter( + max_rate=50, + time_period=50 +) +sem = Semaphore(10) + +class InternetArchivesClient: + + def __init__( + self, + session: ClientSession + ): + self.session = session + + async def _get_url_snapshot(self, url: str) -> IACapture | None: + params = { + "url": url, + "output": "json", + "limit": "1", + "gzip": "false", + "filter": "statuscode:200", + "fl": "timestamp,original,length,digest" + } + async with sem: + async with limiter: + async with self.session.get( + f"http://web.archive.org/cdx/search/cdx", + params=params + ) as response: + raw_data = await response.json() + if len(raw_data) == 0: + return None + fields = raw_data[0] + values = raw_data[1] + d = dict(zip(fields, values)) + + return IACapture(**d) + + async def search_for_url_snapshot(self, url: str) -> InternetArchivesURLMapping: + try: + capture: IACapture | None = await self._get_url_snapshot(url) + except Exception as e: + return InternetArchivesURLMapping( + url=url, + ia_metadata=None, + error=f"{e.__class__.__name__}: {e}" + ) + + if capture is None: + return InternetArchivesURLMapping( + url=url, + ia_metadata=None, + error=None + ) + + metadata = convert_capture_to_archive_metadata(capture) + return InternetArchivesURLMapping( + url=url, + ia_metadata=metadata, + error=None + ) diff --git a/src/external/internet_archives/constants.py b/src/external/internet_archives/constants.py new file mode 100644 index 00000000..9ddc48bf --- /dev/null +++ b/src/external/internet_archives/constants.py @@ -0,0 +1,3 @@ + + +MAX_CONCURRENT_REQUESTS = 10 \ 
No newline at end of file diff --git a/src/external/internet_archives/convert.py b/src/external/internet_archives/convert.py new file mode 100644 index 00000000..df7079ab --- /dev/null +++ b/src/external/internet_archives/convert.py @@ -0,0 +1,11 @@ +from src.external.internet_archives.models.archive_metadata import IAArchiveMetadata +from src.external.internet_archives.models.capture import IACapture + + +def convert_capture_to_archive_metadata(capture: IACapture) -> IAArchiveMetadata: + archive_url = f"https://web.archive.org/web/{capture.timestamp}/{capture.original}" + return IAArchiveMetadata( + archive_url=archive_url, + length=capture.length, + digest=capture.digest + ) \ No newline at end of file diff --git a/src/external/internet_archives/models/__init__.py b/src/external/internet_archives/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/external/internet_archives/models/archive_metadata.py b/src/external/internet_archives/models/archive_metadata.py new file mode 100644 index 00000000..2093377c --- /dev/null +++ b/src/external/internet_archives/models/archive_metadata.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel + + +class IAArchiveMetadata(BaseModel): + archive_url: str + length: int + digest: str \ No newline at end of file diff --git a/src/external/internet_archives/models/capture.py b/src/external/internet_archives/models/capture.py new file mode 100644 index 00000000..839c8ed0 --- /dev/null +++ b/src/external/internet_archives/models/capture.py @@ -0,0 +1,8 @@ +from pydantic import BaseModel + + +class IACapture(BaseModel): + timestamp: int + original: str + length: int + digest: str \ No newline at end of file diff --git a/src/external/internet_archives/models/ia_url_mapping.py b/src/external/internet_archives/models/ia_url_mapping.py new file mode 100644 index 00000000..21650b0c --- /dev/null +++ b/src/external/internet_archives/models/ia_url_mapping.py @@ -0,0 +1,17 @@ +from pydantic import BaseModel + +from 
src.external.internet_archives.models.archive_metadata import IAArchiveMetadata + + +class InternetArchivesURLMapping(BaseModel): + url: str + ia_metadata: IAArchiveMetadata | None + error: str | None + + @property + def has_error(self) -> bool: + return self.error is not None + + @property + def has_metadata(self) -> bool: + return self.ia_metadata is not None diff --git a/src/util/url_mapper.py b/src/util/url_mapper.py index 17ddb3e6..3a399d77 100644 --- a/src/util/url_mapper.py +++ b/src/util/url_mapper.py @@ -22,6 +22,12 @@ def get_ids(self, urls: list[str]) -> list[int]: for url in urls ] + def get_all_ids(self) -> list[int]: + return list(self._url_to_id.values()) + + def get_all_urls(self) -> list[str]: + return list(self._url_to_id.keys()) + def get_url(self, url_id: int) -> str: return self._id_to_url[url_id] diff --git a/tests/automated/integration/api/conftest.py b/tests/automated/integration/api/conftest.py index d07e92d5..2943c76c 100644 --- a/tests/automated/integration/api/conftest.py +++ b/tests/automated/integration/api/conftest.py @@ -1,3 +1,5 @@ +import os +from contextlib import contextmanager from typing import Generator, Any, AsyncGenerator from unittest.mock import AsyncMock @@ -36,23 +38,45 @@ def override_access_info() -> AccessInfo: ] ) +@contextmanager +def set_env_vars(env_vars: dict[str, str]): + """Temporarily set multiple environment variables, restoring afterwards.""" + originals = {} + try: + # Save originals and set new values + for key, value in env_vars.items(): + originals[key] = os.environ.get(key) + os.environ[key] = value + yield + finally: + # Restore originals + for key, original in originals.items(): + if original is None: + os.environ.pop(key, None) + else: + os.environ[key] = original + @pytest.fixture(scope="session") def client() -> Generator[TestClient, None, None]: # Mock environment - with TestClient(app) as c: - app.dependency_overrides[get_access_info] = override_access_info - 
app.dependency_overrides[requires_final_review_permission] = override_access_info - async_core: AsyncCore = c.app.state.async_core + with set_env_vars({ + "SCHEDULED_TASKS_FLAG": "0", + "RUN_URL_TASKS_TASK_FLAG": "0", + }): + with TestClient(app) as c: + app.dependency_overrides[get_access_info] = override_access_info + app.dependency_overrides[requires_final_review_permission] = override_access_info + async_core: AsyncCore = c.app.state.async_core - # Interfaces to the web should be mocked - task_manager = async_core.task_manager - task_manager.url_request_interface = AsyncMock() - task_manager.discord_poster = AsyncMock() - # Disable Logger - task_manager.logger.disabled = True - # Set trigger to fail immediately if called, to force it to be manually specified in tests - task_manager.task_trigger._func = fail_task_trigger - yield c + # Interfaces to the web should be mocked + task_manager = async_core.task_manager + task_manager.url_request_interface = AsyncMock() + task_manager.discord_poster = AsyncMock() + # Disable Logger + task_manager.logger.disabled = True + # Set trigger to fail immediately if called, to force it to be manually specified in tests + task_manager.task_trigger._func = fail_task_trigger + yield c # Reset environment variables back to original state diff --git a/tests/automated/integration/core/async_/conclude_task/helpers.py b/tests/automated/integration/core/async_/conclude_task/helpers.py index 35e106c8..923b3cc9 100644 --- a/tests/automated/integration/core/async_/conclude_task/helpers.py +++ b/tests/automated/integration/core/async_/conclude_task/helpers.py @@ -1,4 +1,4 @@ -from src.core.tasks.dtos.run_info import URLTaskOperatorRunInfo +from src.core.tasks.base.run_info import TaskOperatorRunInfo from src.core.tasks.url.enums import TaskOperatorOutcome from src.db.enums import TaskType from tests.automated.integration.core.async_.conclude_task.setup_info import TestAsyncCoreSetupInfo @@ -9,10 +9,9 @@ def setup_run_info( outcome: 
TaskOperatorOutcome, message: str = "" ): - run_info = URLTaskOperatorRunInfo( + run_info = TaskOperatorRunInfo( task_id=setup_info.task_id, task_type=TaskType.HTML, - linked_url_ids=setup_info.url_ids, outcome=outcome, message=message, ) diff --git a/tests/automated/integration/core/async_/conclude_task/test_error.py b/tests/automated/integration/core/async_/conclude_task/test_error.py index 2b8c1996..9507c9ed 100644 --- a/tests/automated/integration/core/async_/conclude_task/test_error.py +++ b/tests/automated/integration/core/async_/conclude_task/test_error.py @@ -27,4 +27,3 @@ async def test_conclude_task_error( assert task_info.task_status == BatchStatus.ERROR assert task_info.error_info == "test error" - assert len(task_info.urls) == 3 diff --git a/tests/automated/integration/core/async_/conclude_task/test_success.py b/tests/automated/integration/core/async_/conclude_task/test_success.py index 54de38f1..d9ba649e 100644 --- a/tests/automated/integration/core/async_/conclude_task/test_success.py +++ b/tests/automated/integration/core/async_/conclude_task/test_success.py @@ -26,4 +26,3 @@ async def test_conclude_task_success( task_info = await ddc.adb_client.get_task_info(task_id=setup.task_id) assert task_info.task_status == BatchStatus.READY_TO_LABEL - assert len(task_info.urls) == 3 diff --git a/tests/automated/integration/core/async_/run_task/test_break_loop.py b/tests/automated/integration/core/async_/run_task/test_break_loop.py index 17ce5e51..0d8a9bc2 100644 --- a/tests/automated/integration/core/async_/run_task/test_break_loop.py +++ b/tests/automated/integration/core/async_/run_task/test_break_loop.py @@ -3,10 +3,10 @@ import pytest +from src.core.tasks.base.run_info import TaskOperatorRunInfo from src.core.tasks.url.models.entry import URLTaskEntry from src.core.tasks.url.operators.base import URLTaskOperatorBase from src.db.enums import TaskType -from src.core.tasks.dtos.run_info import URLTaskOperatorRunInfo from src.core.tasks.url.enums import 
TaskOperatorOutcome from tests.automated.integration.core.async_.helpers import setup_async_core from tests.helpers.data_creator.core import DBDataCreator @@ -21,11 +21,10 @@ async def test_run_task_break_loop(db_data_creator: DBDataCreator): and an alert should be sent to discord """ - async def run_task(self, task_id: int) -> URLTaskOperatorRunInfo: - return URLTaskOperatorRunInfo( + async def run_task(self, task_id: int) -> TaskOperatorRunInfo: + return TaskOperatorRunInfo( task_id=task_id, outcome=TaskOperatorOutcome.SUCCESS, - linked_url_ids=[1, 2, 3], task_type=TaskType.HTML ) diff --git a/tests/automated/integration/core/async_/run_task/test_prereq_met.py b/tests/automated/integration/core/async_/run_task/test_prereq_met.py index 03e3e74c..a7724a45 100644 --- a/tests/automated/integration/core/async_/run_task/test_prereq_met.py +++ b/tests/automated/integration/core/async_/run_task/test_prereq_met.py @@ -4,7 +4,7 @@ import pytest from src.core.enums import BatchStatus -from src.core.tasks.dtos.run_info import URLTaskOperatorRunInfo +from src.core.tasks.base.run_info import TaskOperatorRunInfo from src.core.tasks.url.enums import TaskOperatorOutcome from src.core.tasks.url.models.entry import URLTaskEntry from src.core.tasks.url.operators.base import URLTaskOperatorBase @@ -21,12 +21,11 @@ async def test_run_task_prereq_met(db_data_creator: DBDataCreator): And a task entry should be created in the database """ - async def run_task(self, task_id: int) -> URLTaskOperatorRunInfo: - return URLTaskOperatorRunInfo( + async def run_task(self, task_id: int) -> TaskOperatorRunInfo: + return TaskOperatorRunInfo( task_id=task_id, task_type=TaskType.HTML, outcome=TaskOperatorOutcome.SUCCESS, - linked_url_ids=[1, 2, 3] ) core = setup_async_core(db_data_creator.adb_client) diff --git a/tests/automated/integration/tasks/scheduled/impl/huggingface/test_happy_path.py b/tests/automated/integration/tasks/scheduled/impl/huggingface/test_happy_path.py index ddb85104..d3c3e056 
100644 --- a/tests/automated/integration/tasks/scheduled/impl/huggingface/test_happy_path.py +++ b/tests/automated/integration/tasks/scheduled/impl/huggingface/test_happy_path.py @@ -18,7 +18,7 @@ async def test_happy_path( push_function: AsyncMock = hf_client.push_data_sources_raw_to_hub # Check, prior to adding URLs, that task does not run - task_info = await operator.run_task(1) + task_info = await operator.run_task() assert_task_ran_without_error(task_info) push_function.assert_not_called() @@ -27,7 +27,7 @@ async def test_happy_path( await manager.setup() # Run task - task_info = await operator.run_task(2) + task_info = await operator.run_task() assert_task_ran_without_error(task_info) push_function.assert_called_once() @@ -37,6 +37,6 @@ async def test_happy_path( manager.check_results(call_args) # Test that after update, running again yields no results - task_info = await operator.run_task(3) + task_info = await operator.run_task() assert_task_ran_without_error(task_info) push_function.assert_called_once() \ No newline at end of file diff --git a/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_happy_path.py b/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_happy_path.py index 9fadf6ca..d783b5cb 100644 --- a/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_happy_path.py +++ b/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_happy_path.py @@ -21,7 +21,7 @@ async def test_agency_sync_happy_path( db_client = operator.adb_client with patch_sync_agencies(AGENCIES_SYNC_RESPONSES): - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_run_success(run_info) mock_func: MagicMock = operator.pdap_client.sync_agencies diff --git a/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_interruption.py b/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_interruption.py index db7f74b5..bf4ff81e 100644 --- 
a/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_interruption.py +++ b/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_interruption.py @@ -27,7 +27,7 @@ async def test_agency_sync_interruption( with patch_sync_agencies( [FIRST_CALL_RESPONSE, ValueError("test error")] ): - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert run_info.outcome == TaskOperatorOutcome.ERROR, run_info.message @@ -58,7 +58,7 @@ async def test_agency_sync_interruption( assert sync_state_results.current_cutoff_date is None with patch_sync_agencies([SECOND_CALL_RESPONSE, THIRD_CALL_RESPONSE]): - await operator.run_task(2) + await operator.run_task() await check_sync_concluded(db_client) diff --git a/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_no_new_results.py b/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_no_new_results.py index 68225a51..0db01723 100644 --- a/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_no_new_results.py +++ b/tests/automated/integration/tasks/scheduled/impl/sync/agency/test_no_new_results.py @@ -31,7 +31,7 @@ async def test_agency_sync_task_no_new_results( ) with patch_sync_agencies([THIRD_CALL_RESPONSE]): - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_run_success(run_info) mock_func: AsyncMock = operator.pdap_client.sync_agencies mock_func.assert_called_once_with( diff --git a/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_happy_path.py b/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_happy_path.py index d1042e66..41f38b2a 100644 --- a/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_happy_path.py +++ b/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_happy_path.py @@ -28,7 +28,7 @@ async def test_data_sources_sync_happy_path( with patch_sync_data_sources( await 
manager.get_data_sources_sync_responses([order for order in SyncResponseOrder]) ): - run_info = await test_operator.run_task(1) + run_info = await test_operator.run_task() assert_task_run_success(run_info) mock_func: MagicMock = test_operator.pdap_client.sync_data_sources diff --git a/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_interruption.py b/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_interruption.py index 997859b5..0441a102 100644 --- a/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_interruption.py +++ b/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_interruption.py @@ -34,7 +34,7 @@ async def test_data_sources_sync_interruption( first_response + [ValueError("test error")] ): - run_info = await test_operator.run_task(1) + run_info = await test_operator.run_task() assert run_info.outcome == TaskOperatorOutcome.ERROR, run_info.message await manager.check_via_sync_response_order(SyncResponseOrder.FIRST) @@ -57,7 +57,7 @@ async def test_data_sources_sync_interruption( [SyncResponseOrder.SECOND, SyncResponseOrder.THIRD] ) with patch_sync_data_sources(second_response): - await test_operator.run_task(2) + await test_operator.run_task() await check_sync_concluded(adb_client) diff --git a/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_no_new_results.py b/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_no_new_results.py index fe69cc57..ebcbe856 100644 --- a/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_no_new_results.py +++ b/tests/automated/integration/tasks/scheduled/impl/sync/data_sources/test_no_new_results.py @@ -41,7 +41,7 @@ async def test_data_sources_sync_no_new_results( ) with patch_sync_data_sources(first_response): - run_info = await test_operator.run_task(1) + run_info = await test_operator.run_task() assert_task_run_success(run_info) mock_func: MagicMock = 
test_operator.pdap_client.sync_data_sources diff --git a/tests/automated/integration/tasks/scheduled/loader/conftest.py b/tests/automated/integration/tasks/scheduled/loader/conftest.py index 67f18283..30d8962e 100644 --- a/tests/automated/integration/tasks/scheduled/loader/conftest.py +++ b/tests/automated/integration/tasks/scheduled/loader/conftest.py @@ -6,6 +6,7 @@ from src.core.tasks.scheduled.loader import ScheduledTaskOperatorLoader from src.db.client.async_ import AsyncDatabaseClient from src.external.huggingface.hub.client import HuggingFaceHubClient +from src.external.internet_archives.client import InternetArchivesClient from src.external.pdap.client import PDAPClient @@ -16,5 +17,6 @@ def loader() -> ScheduledTaskOperatorLoader: async_core=create_autospec(AsyncCore, instance=True), adb_client=AsyncMock(spec=AsyncDatabaseClient), pdap_client=AsyncMock(spec=PDAPClient), - hf_client=AsyncMock(spec=HuggingFaceHubClient) + hf_client=AsyncMock(spec=HuggingFaceHubClient), + ia_client=AsyncMock(spec=InternetArchivesClient) ) \ No newline at end of file diff --git a/tests/automated/integration/tasks/scheduled/loader/test_flags.py b/tests/automated/integration/tasks/scheduled/loader/test_flags.py index 8176dc11..216210fe 100644 --- a/tests/automated/integration/tasks/scheduled/loader/test_flags.py +++ b/tests/automated/integration/tasks/scheduled/loader/test_flags.py @@ -4,6 +4,7 @@ from src.core.tasks.scheduled.impl.backlog.operator import PopulateBacklogSnapshotTaskOperator from src.core.tasks.scheduled.impl.delete_logs.operator import DeleteOldLogsTaskOperator from src.core.tasks.scheduled.impl.huggingface.operator import PushToHuggingFaceTaskOperator +from src.core.tasks.scheduled.impl.internet_archives.probe.operator import InternetArchivesProbeTaskOperator from src.core.tasks.scheduled.impl.run_url_tasks.operator import RunURLTasksTaskOperator from src.core.tasks.scheduled.impl.sync.agency.operator import SyncAgenciesTaskOperator from 
src.core.tasks.scheduled.impl.sync.data_sources.operator import SyncDataSourcesTaskOperator @@ -44,7 +45,11 @@ class Config: FlagTestParams( env_var="RUN_URL_TASKS_TASK_FLAG", operator=RunURLTasksTaskOperator - ) + ), + FlagTestParams( + env_var="IA_PROBE_TASK_FLAG", + operator=InternetArchivesProbeTaskOperator + ), ] diff --git a/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py b/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py index 1fbf24a7..e5cc6d32 100644 --- a/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py +++ b/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py @@ -2,14 +2,16 @@ from src.core.tasks.scheduled.loader import ScheduledTaskOperatorLoader -NUMBER_OF_ENTRIES = 6 +NUMBER_OF_ENTRIES = 7 @pytest.mark.asyncio async def test_happy_path( - loader: ScheduledTaskOperatorLoader + loader: ScheduledTaskOperatorLoader, + monkeypatch ): """ Under normal circumstances, all task operators should be returned """ + monkeypatch.setenv("SCHEDULED_TASKS_FLAG", "1") entries = await loader.load_entries() assert len(entries) == NUMBER_OF_ENTRIES \ No newline at end of file diff --git a/tests/automated/integration/tasks/url/impl/agency_identification/happy_path/test_happy_path.py b/tests/automated/integration/tasks/url/impl/agency_identification/happy_path/test_happy_path.py index 57c62fc3..caeb333a 100644 --- a/tests/automated/integration/tasks/url/impl/agency_identification/happy_path/test_happy_path.py +++ b/tests/automated/integration/tasks/url/impl/agency_identification/happy_path/test_happy_path.py @@ -81,7 +81,7 @@ async def test_agency_identification_task( # Confirm meets prerequisites assert await operator.meets_task_prerequisites() # Run task - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert run_info.outcome == TaskOperatorOutcome.SUCCESS, run_info.message # Confirm tasks are piped into the correct subtasks diff --git 
a/tests/automated/integration/tasks/url/impl/asserts.py b/tests/automated/integration/tasks/url/impl/asserts.py index fa69d4a1..4187d7ef 100644 --- a/tests/automated/integration/tasks/url/impl/asserts.py +++ b/tests/automated/integration/tasks/url/impl/asserts.py @@ -1,5 +1,4 @@ from src.core.tasks.base.run_info import TaskOperatorRunInfo -from src.core.tasks.dtos.run_info import URLTaskOperatorRunInfo from src.core.tasks.url.enums import TaskOperatorOutcome @@ -14,6 +13,3 @@ async def assert_prereqs_met(operator): def assert_task_ran_without_error(run_info: TaskOperatorRunInfo): assert run_info.outcome == TaskOperatorOutcome.SUCCESS, run_info.message -def assert_url_task_has_expected_run_info(run_info: URLTaskOperatorRunInfo, url_ids: list[int]): - assert run_info.outcome == TaskOperatorOutcome.SUCCESS, run_info.message - assert run_info.linked_url_ids == url_ids diff --git a/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py b/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py index 0bd891c9..81b03070 100644 --- a/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py +++ b/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py @@ -3,13 +3,12 @@ import pytest from src.collectors.enums import URLStatus -from src.db.enums import TaskType from src.db.models.impl.url.core.sqlalchemy import URL from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo from src.db.models.impl.url.suggestion.relevant.auto.sqlalchemy import AutoRelevantSuggestion -from tests.automated.integration.tasks.url.impl.asserts import assert_prereqs_not_met, assert_url_task_has_expected_run_info, \ - assert_prereqs_met +from tests.automated.integration.tasks.url.impl.asserts import assert_prereqs_not_met, assert_prereqs_met from tests.automated.integration.tasks.url.impl.auto_relevant.setup import setup_operator, setup_urls +from tests.helpers.asserts import assert_task_run_success @pytest.mark.asyncio @@ -21,11 +20,9 
@@ async def test_url_auto_relevant_task(db_data_creator): url_ids = await setup_urls(db_data_creator) await assert_prereqs_met(operator) - task_id = await db_data_creator.adb_client.initiate_task(task_type=TaskType.RELEVANCY) + run_info = await operator.run_task() - run_info = await operator.run_task(task_id) - - assert_url_task_has_expected_run_info(run_info, url_ids) + assert_task_run_success(run_info) assert not await operator.meets_task_prerequisites() diff --git a/tests/automated/integration/tasks/url/impl/html/test_task.py b/tests/automated/integration/tasks/url/impl/html/test_task.py index 8d4de418..e7462e65 100644 --- a/tests/automated/integration/tasks/url/impl/html/test_task.py +++ b/tests/automated/integration/tasks/url/impl/html/test_task.py @@ -21,8 +21,7 @@ async def test_url_html_task(adb_client_test: AsyncDatabaseClient): records = await setup.setup() await assert_prereqs_met(operator) - task_id = await adb_client_test.initiate_task(task_type=TaskType.HTML) - run_info = await operator.run_task(task_id) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) checker = TestURLHTMLTaskCheckManager( diff --git a/tests/automated/integration/tasks/url/impl/ia_metadata/__init__.py b/tests/automated/integration/tasks/url/impl/ia_metadata/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/automated/integration/tasks/url/impl/ia_metadata/conftest.py b/tests/automated/integration/tasks/url/impl/ia_metadata/conftest.py new file mode 100644 index 00000000..9fc586e4 --- /dev/null +++ b/tests/automated/integration/tasks/url/impl/ia_metadata/conftest.py @@ -0,0 +1,22 @@ +from unittest.mock import create_autospec, AsyncMock + +import pytest + +from src.core.tasks.scheduled.impl.internet_archives.probe.operator import InternetArchivesProbeTaskOperator +from src.db.client.async_ import AsyncDatabaseClient +from src.external.internet_archives.client import InternetArchivesClient + + +@pytest.fixture +def 
operator(adb_client_test: AsyncDatabaseClient) -> InternetArchivesProbeTaskOperator: + ia_client = InternetArchivesClient( + session=AsyncMock() + ) + ia_client._get_url_snapshot = create_autospec( + ia_client._get_url_snapshot, + ) + + return InternetArchivesProbeTaskOperator( + adb_client=adb_client_test, + ia_client=ia_client + ) \ No newline at end of file diff --git a/tests/automated/integration/tasks/url/impl/ia_metadata/constants.py b/tests/automated/integration/tasks/url/impl/ia_metadata/constants.py new file mode 100644 index 00000000..d41ffb48 --- /dev/null +++ b/tests/automated/integration/tasks/url/impl/ia_metadata/constants.py @@ -0,0 +1,4 @@ + + +TEST_URL_1 = "https://test-ia-metadata.com/1" +TEST_URL_2 = "https://test-ia-metadata.com/2" \ No newline at end of file diff --git a/tests/automated/integration/tasks/url/impl/ia_metadata/setup.py b/tests/automated/integration/tasks/url/impl/ia_metadata/setup.py new file mode 100644 index 00000000..0a60ccc7 --- /dev/null +++ b/tests/automated/integration/tasks/url/impl/ia_metadata/setup.py @@ -0,0 +1,28 @@ +from unittest.mock import AsyncMock + +from src.db.client.async_ import AsyncDatabaseClient +from src.db.models.impl.url.core.enums import URLSource +from src.db.models.impl.url.core.pydantic.insert import URLInsertModel +from tests.automated.integration.tasks.url.impl.ia_metadata.constants import TEST_URL_1, TEST_URL_2 + + +async def add_urls(dbc: AsyncDatabaseClient) -> list[int]: + """Adds two URLs to the database.""" + insert_models: list[URLInsertModel] = [ + URLInsertModel( + url=TEST_URL_1, + source=URLSource.COLLECTOR + ), + URLInsertModel( + url=TEST_URL_2, + source=URLSource.COLLECTOR + ) + ] + return await dbc.bulk_insert(insert_models, return_ids=True) + +async def add_mock_response(mock_ia_client: AsyncMock, results: list) -> None: + """ + Modifies: + mock_ia_client.search_for_url_snapshot + """ + mock_ia_client.search_for_url_snapshot.side_effect = results \ No newline at end of file diff 
--git a/tests/automated/integration/tasks/url/impl/ia_metadata/test_entry_not_found.py b/tests/automated/integration/tasks/url/impl/ia_metadata/test_entry_not_found.py new file mode 100644 index 00000000..f451f131 --- /dev/null +++ b/tests/automated/integration/tasks/url/impl/ia_metadata/test_entry_not_found.py @@ -0,0 +1,54 @@ +import pytest + +from src.core.tasks.scheduled.impl.internet_archives.probe.operator import InternetArchivesProbeTaskOperator +from src.db.client.async_ import AsyncDatabaseClient +from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives +from src.db.models.impl.url.ia_metadata.sqlalchemy import URLInternetArchivesMetadata +from tests.automated.integration.tasks.url.impl.asserts import assert_task_ran_without_error +from tests.automated.integration.tasks.url.impl.ia_metadata.setup import add_urls + + +@pytest.mark.asyncio +async def test_entry_not_found(operator: InternetArchivesProbeTaskOperator) -> None: + """ + If URLs are present in the database and have not been processed yet, + They should be processed, and flagged as checked for + If the client finds no archive metadata for the URL, + the internet archive metadata should not be added + """ + adb_client: AsyncDatabaseClient = operator.adb_client + + # Confirm operator does not yet meet prerequisites + assert not await operator.meets_task_prerequisites() + + # Add URLs to database + url_ids: list[int] = await add_urls(adb_client) + + # Confirm operator now meets prerequisites + assert await operator.meets_task_prerequisites() + + # Set IA Client to return None + operator.ia_client._get_url_snapshot.side_effect = [ + None, + None + ] + + # Run task + run_info = await operator.run_task() + + # Confirm task ran without error + assert_task_ran_without_error(run_info) + + # Confirm operator no longer meets prerequisites + assert not await operator.meets_task_prerequisites() + + # Confirm URLs have been marked as checked, with success = True + flags: 
list[FlagURLCheckedForInternetArchives] = await adb_client.get_all(FlagURLCheckedForInternetArchives) + assert len(flags) == 2 + assert {flag.url_id for flag in flags} == set(url_ids) + assert all(flag.success for flag in flags) + + + # Confirm IA metadata has not been added + metadata_list: list[URLInternetArchivesMetadata] = await adb_client.get_all(URLInternetArchivesMetadata) + assert len(metadata_list) == 0 diff --git a/tests/automated/integration/tasks/url/impl/ia_metadata/test_error.py b/tests/automated/integration/tasks/url/impl/ia_metadata/test_error.py new file mode 100644 index 00000000..3d5315cc --- /dev/null +++ b/tests/automated/integration/tasks/url/impl/ia_metadata/test_error.py @@ -0,0 +1,64 @@ +import pytest + +from src.core.tasks.scheduled.impl.internet_archives.probe.operator import InternetArchivesProbeTaskOperator +from src.db.client.async_ import AsyncDatabaseClient +from src.db.enums import TaskType +from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives +from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo +from src.db.models.impl.url.ia_metadata.sqlalchemy import URLInternetArchivesMetadata +from tests.automated.integration.tasks.url.impl.asserts import assert_task_ran_without_error +from tests.automated.integration.tasks.url.impl.ia_metadata.setup import add_urls + + +@pytest.mark.asyncio +async def test_error(operator: InternetArchivesProbeTaskOperator) -> None: + """ + If URLs are present in the database and have not been processed yet, + they should be processed and flagged as checked for Internet Archives. + If the client raises an error, + the internet archive metadata should not be added, + and the error should be recorded in URL error info + """ + adb_client: AsyncDatabaseClient = operator.adb_client + + # Confirm operator does not yet meet prerequisites + assert not await operator.meets_task_prerequisites() + + # Add URLs to database + url_ids: list[int] = await add_urls(adb_client) + + # Confirm operator now meets prerequisites + assert await
operator.meets_task_prerequisites() + + # Set IA Client to raise error on request + operator.ia_client._get_url_snapshot.side_effect = [ + RuntimeError("Something went wrong"), + ValueError("Something else went wrong"), + ] + + # Run task + run_info = await operator.run_task() + + # Confirm task ran without error + assert_task_ran_without_error(run_info) + + # Confirm operator no longer meets prerequisites + assert not await operator.meets_task_prerequisites() + + # Confirm URLs have been marked as checked, with success = False + flags: list[FlagURLCheckedForInternetArchives] = await adb_client.get_all(FlagURLCheckedForInternetArchives) + assert len(flags) == 2 + assert {flag.url_id for flag in flags} == set(url_ids) + assert all(not flag.success for flag in flags) + + # Confirm IA metadata has not been added + metadata_list: list[URLInternetArchivesMetadata] = await adb_client.get_all(URLInternetArchivesMetadata) + assert len(metadata_list) == 0 + + # Confirm presence of URL Error Info + url_error_info_list: list[URLErrorInfo] = await adb_client.get_all(URLErrorInfo) + assert len(url_error_info_list) == 2 + assert {url_error_info.url_id for url_error_info in url_error_info_list} == set(url_ids) + assert {url_error_info.error for url_error_info in url_error_info_list} == { + "ValueError: Something else went wrong", "RuntimeError: Something went wrong" + } + diff --git a/tests/automated/integration/tasks/url/impl/ia_metadata/test_happy_path.py b/tests/automated/integration/tasks/url/impl/ia_metadata/test_happy_path.py new file mode 100644 index 00000000..8336158c --- /dev/null +++ b/tests/automated/integration/tasks/url/impl/ia_metadata/test_happy_path.py @@ -0,0 +1,80 @@ +import pytest + +from src.core.tasks.base.run_info import TaskOperatorRunInfo +from src.core.tasks.scheduled.impl.internet_archives.probe.operator import InternetArchivesProbeTaskOperator +from src.db.client.async_ import AsyncDatabaseClient +from src.db.models.impl.flag.checked_for_ia.sqlalchemy 
import FlagURLCheckedForInternetArchives +from src.db.models.impl.url.ia_metadata.sqlalchemy import URLInternetArchivesMetadata +from src.external.internet_archives.models.capture import IACapture +from tests.automated.integration.tasks.url.impl.asserts import assert_task_ran_without_error +from tests.automated.integration.tasks.url.impl.ia_metadata.constants import TEST_URL_1, TEST_URL_2 +from tests.automated.integration.tasks.url.impl.ia_metadata.setup import add_urls + + +@pytest.mark.asyncio +async def test_happy_path(operator: InternetArchivesProbeTaskOperator) -> None: + """ + If URLs are present in the database and have not been processed yet, + They should be processed, and flagged as checked for + If the client returns a valid response, + the internet archive metadata should be added + """ + # TODO: Figure out how to change the check for task pre-requisites to something different, + # like checking that the next time it runs, it cancels immediately? + # Or perhaps add `meets_task_prerequisites` and have it only be required for some operators + # set it up in a configuration + # Maybe make a URLScheduledTask Operator Base? + # Or make both into mixins? 
+ + adb_client: AsyncDatabaseClient = operator.adb_client + + # Confirm operator does not yet meet prerequisites + assert not await operator.meets_task_prerequisites() + + # Add URLs to database + url_ids: list[int] = await add_urls(adb_client) + + # Confirm operator now meets prerequisites + assert await operator.meets_task_prerequisites() + + # Set IA Client to return valid response + operator.ia_client._get_url_snapshot.side_effect = [ + IACapture( + timestamp=1045890000, + original=TEST_URL_1, + length=1000, + digest="a4kf189" + ), + IACapture( + timestamp=1045890001, + original=TEST_URL_2, + length=2000, + digest="g19f189" + ) + ] + + # Run task + run_info: TaskOperatorRunInfo = await operator.run_task() + + # Confirm task ran without error + assert_task_ran_without_error(run_info) + + # Confirm operator no longer meets prerequisites + assert not await operator.meets_task_prerequisites() + + # Confirm URLs have been marked as checked, with success = True + flags: list[FlagURLCheckedForInternetArchives] = await adb_client.get_all(FlagURLCheckedForInternetArchives) + assert len(flags) == 2 + assert {flag.url_id for flag in flags} == set(url_ids) + assert all(flag.success for flag in flags) + + # Confirm IA metadata has been added + metadata_list: list[URLInternetArchivesMetadata] = await adb_client.get_all(URLInternetArchivesMetadata) + assert len(metadata_list) == 2 + assert {metadata.url_id for metadata in metadata_list} == set(url_ids) + assert {metadata.archive_url for metadata in metadata_list} == { + f"https://web.archive.org/web/1045890000/{TEST_URL_1}", + f"https://web.archive.org/web/1045890001/{TEST_URL_2}" + } + assert {metadata.digest for metadata in metadata_list} == {"a4kf189", "g19f189"} + assert {metadata.length for metadata in metadata_list} == {1000, 2000} \ No newline at end of file diff --git a/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_error.py 
b/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_error.py index 924efb5c..404f00e1 100644 --- a/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_error.py +++ b/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_error.py @@ -30,7 +30,7 @@ async def test_url_probe_task_error( assert not await operator.meets_task_prerequisites() url_id = await setup_manager.setup_url(URLStatus.SUBMITTED) assert await operator.meets_task_prerequisites() - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) assert not await operator.meets_task_prerequisites() await check_manager.check_url( diff --git a/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_not_found.py b/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_not_found.py index 400cf3d1..97937c15 100644 --- a/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_not_found.py +++ b/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_not_found.py @@ -31,7 +31,7 @@ async def test_url_probe_task_not_found( assert not await operator.meets_task_prerequisites() url_id = await setup_manager.setup_url(URLStatus.NOT_RELEVANT) assert await operator.meets_task_prerequisites() - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) assert not await operator.meets_task_prerequisites() await check_manager.check_url( diff --git a/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_ok.py b/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_ok.py index 2d0dd641..a02f1ba4 100644 --- a/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_ok.py +++ b/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_ok.py @@ -30,7 +30,7 @@ async def test_url_probe_task_no_redirect_ok( assert not await operator.meets_task_prerequisites() url_id = await 
setup_manager.setup_url(URLStatus.PENDING) assert await operator.meets_task_prerequisites() - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) assert not await operator.meets_task_prerequisites() await check_manager.check_url( diff --git a/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_two_urls.py b/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_two_urls.py index 75595ed4..0c1da5fd 100644 --- a/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_two_urls.py +++ b/tests/automated/integration/tasks/url/impl/probe/no_redirect/test_two_urls.py @@ -34,7 +34,7 @@ async def test_two_urls( url_id_1 = await setup_manager.setup_url(URLStatus.PENDING, url=url_1) url_id_2 = await setup_manager.setup_url(URLStatus.NOT_RELEVANT, url=url_2) assert await operator.meets_task_prerequisites() - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) assert not await operator.meets_task_prerequisites() diff --git a/tests/automated/integration/tasks/url/impl/probe/redirect/dest_new/test_dest_ok.py b/tests/automated/integration/tasks/url/impl/probe/redirect/dest_new/test_dest_ok.py index 7c589bd7..88098b16 100644 --- a/tests/automated/integration/tasks/url/impl/probe/redirect/dest_new/test_dest_ok.py +++ b/tests/automated/integration/tasks/url/impl/probe/redirect/dest_new/test_dest_ok.py @@ -29,7 +29,7 @@ async def test_url_probe_task_redirect_dest_new_ok( ) ) source_url_id = await setup_manager.setup_url(URLStatus.PENDING) - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) await check_manager.check_url( url_id=source_url_id, diff --git a/tests/automated/integration/tasks/url/impl/probe/redirect/test_dest_exists_in_db.py b/tests/automated/integration/tasks/url/impl/probe/redirect/test_dest_exists_in_db.py index 75847c4a..0744f3b9 100644 --- 
a/tests/automated/integration/tasks/url/impl/probe/redirect/test_dest_exists_in_db.py +++ b/tests/automated/integration/tasks/url/impl/probe/redirect/test_dest_exists_in_db.py @@ -40,7 +40,7 @@ async def test_url_probe_task_redirect_dest_exists_in_db( accessed=True ) await setup_manager.adb_client.bulk_insert([web_metadata]) - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) await check_manager.check_url( url_id=source_url_id, diff --git a/tests/automated/integration/tasks/url/impl/probe/redirect/test_redirect_infinite.py b/tests/automated/integration/tasks/url/impl/probe/redirect/test_redirect_infinite.py index c6ef468f..ed9c38ac 100644 --- a/tests/automated/integration/tasks/url/impl/probe/redirect/test_redirect_infinite.py +++ b/tests/automated/integration/tasks/url/impl/probe/redirect/test_redirect_infinite.py @@ -28,7 +28,7 @@ async def test_url_probe_task_redirect_infinite( ) ) url_id = await setup_manager.setup_url(URLStatus.PENDING) - run_info = await operator.run_task(1) + run_info = await operator.run_task() await check_manager.check_url( url_id=url_id, expected_status=URLStatus.PENDING diff --git a/tests/automated/integration/tasks/url/impl/probe/redirect/test_two_urls_same_dest.py b/tests/automated/integration/tasks/url/impl/probe/redirect/test_two_urls_same_dest.py index 47d2ae34..267d9015 100644 --- a/tests/automated/integration/tasks/url/impl/probe/redirect/test_two_urls_same_dest.py +++ b/tests/automated/integration/tasks/url/impl/probe/redirect/test_two_urls_same_dest.py @@ -36,7 +36,7 @@ async def test_url_probe_task_redirect_two_urls_same_dest( ) source_url_id_1 = await setup_manager.setup_url(URLStatus.PENDING) source_url_id_2 = await setup_manager.setup_url(URLStatus.PENDING, url="https://example.com/2") - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) await check_manager.check_url( url_id=source_url_id_1, 
diff --git a/tests/automated/integration/tasks/url/impl/root_url/test_branch_root_url_in_db.py b/tests/automated/integration/tasks/url/impl/root_url/test_branch_root_url_in_db.py index aa26154d..7e8af066 100644 --- a/tests/automated/integration/tasks/url/impl/root_url/test_branch_root_url_in_db.py +++ b/tests/automated/integration/tasks/url/impl/root_url/test_branch_root_url_in_db.py @@ -44,7 +44,7 @@ async def test_branch_root_url_in_db( assert await operator.meets_task_prerequisites() # Run task - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) # Check task prerequisites no longer met diff --git a/tests/automated/integration/tasks/url/impl/root_url/test_branch_root_url_not_in_db.py b/tests/automated/integration/tasks/url/impl/root_url/test_branch_root_url_not_in_db.py index 845190ad..6c00f8f9 100644 --- a/tests/automated/integration/tasks/url/impl/root_url/test_branch_root_url_not_in_db.py +++ b/tests/automated/integration/tasks/url/impl/root_url/test_branch_root_url_not_in_db.py @@ -34,7 +34,7 @@ async def test_branch_root_url_not_in_db( assert await operator.meets_task_prerequisites() # Run task - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) # Check task prerequisites no longer met diff --git a/tests/automated/integration/tasks/url/impl/root_url/test_is_root_url.py b/tests/automated/integration/tasks/url/impl/root_url/test_is_root_url.py index e815f564..a6a56c7c 100644 --- a/tests/automated/integration/tasks/url/impl/root_url/test_is_root_url.py +++ b/tests/automated/integration/tasks/url/impl/root_url/test_is_root_url.py @@ -31,7 +31,7 @@ async def test_is_root_url( assert await operator.meets_task_prerequisites() # Run task - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) # Check task prerequisites no longer met diff --git 
a/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_in_db.py b/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_in_db.py index 141ae93b..be67d23e 100644 --- a/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_in_db.py +++ b/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_in_db.py @@ -48,7 +48,7 @@ async def test_two_branches_one_root_in_db( assert await operator.meets_task_prerequisites() # Run task - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) # Check task prerequisites no longer met diff --git a/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_in_db_not_flagged.py b/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_in_db_not_flagged.py index 88f65596..614796e9 100644 --- a/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_in_db_not_flagged.py +++ b/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_in_db_not_flagged.py @@ -47,7 +47,7 @@ async def test_two_branches_one_root_in_db_not_flagged( assert await operator.meets_task_prerequisites() # Run task - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) # Check task prerequisites no longer met diff --git a/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_not_in_db.py b/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_not_in_db.py index 8bfb8534..f68786b9 100644 --- a/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_not_in_db.py +++ b/tests/automated/integration/tasks/url/impl/root_url/test_two_branches_one_root_not_in_db.py @@ -37,7 +37,7 @@ async def test_two_branches_one_root_in_db_not_flagged( assert await operator.meets_task_prerequisites() # Run task - 
run_info = await operator.run_task(1) + run_info = await operator.run_task() assert_task_ran_without_error(run_info) # Check task prerequisites no longer met diff --git a/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py b/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py index 8df14a8f..7d56ddcf 100644 --- a/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py +++ b/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py @@ -44,10 +44,7 @@ async def test_submit_approved_url_task( assert await operator.meets_task_prerequisites() # Run Task - task_id = await db_data_creator.adb_client.initiate_task( - task_type=TaskType.SUBMIT_APPROVED - ) - run_info = await operator.run_task(task_id=task_id) + run_info = await operator.run_task() # Check Task has been marked as completed assert run_info.outcome == TaskOperatorOutcome.SUCCESS, run_info.message diff --git a/tests/automated/integration/tasks/url/impl/test_example_task.py b/tests/automated/integration/tasks/url/impl/test_example_task.py index 06678658..00ec7c34 100644 --- a/tests/automated/integration/tasks/url/impl/test_example_task.py +++ b/tests/automated/integration/tasks/url/impl/test_example_task.py @@ -5,9 +5,12 @@ from src.db.enums import TaskType from src.core.tasks.url.enums import TaskOperatorOutcome from src.core.tasks.url.operators.base import URLTaskOperatorBase +from src.db.models.impl.link.task_url import LinkTaskURL from tests.helpers.data_creator.core import DBDataCreator -class ExampleTaskOperator(URLTaskOperatorBase): +class ExampleTaskOperator( + URLTaskOperatorBase, +): @property def task_type(self) -> TaskType: @@ -31,14 +34,16 @@ async def test_example_task_success(db_data_creator: DBDataCreator): async def mock_inner_task_logic(self): # Add link to 3 urls - self.linked_url_ids = url_ids + await self.link_urls_to_task(url_ids) operator = 
ExampleTaskOperator(adb_client=db_data_creator.adb_client) operator.inner_task_logic = types.MethodType(mock_inner_task_logic, operator) - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert run_info.outcome == TaskOperatorOutcome.SUCCESS - assert run_info.linked_url_ids == url_ids + links: list[LinkTaskURL] = await db_data_creator.adb_client.get_all(LinkTaskURL) + assert len(links) == 3 + assert all(link.url_id in url_ids for link in links) @pytest.mark.asyncio @@ -49,7 +54,7 @@ def mock_inner_task_logic(self): raise ValueError("test error") operator.inner_task_logic = types.MethodType(mock_inner_task_logic, operator) - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert run_info.outcome == TaskOperatorOutcome.ERROR diff --git a/tests/automated/integration/tasks/url/impl/test_url_404_probe.py b/tests/automated/integration/tasks/url/impl/test_url_404_probe.py index 698c9c59..630f7f4e 100644 --- a/tests/automated/integration/tasks/url/impl/test_url_404_probe.py +++ b/tests/automated/integration/tasks/url/impl/test_url_404_probe.py @@ -100,7 +100,7 @@ async def mock_make_simple_requests(self, urls: list[str]) -> list[URLResponseIn assert meets_prereqs # Run task and validate results - run_info = await operator.run_task(task_id=1) + run_info = await operator.run_task() assert run_info.outcome == TaskOperatorOutcome.SUCCESS, run_info.message @@ -149,7 +149,7 @@ def find_url(url_id: int) -> URL: assert meets_prereqs # Run the task and Ensure all but the URL previously marked as 404 have been checked again - run_info = await operator.run_task(task_id=2) + run_info = await operator.run_task() assert run_info.outcome == TaskOperatorOutcome.SUCCESS, run_info.message probed_for_404_objects: list[URLProbedFor404] = await db_data_creator.adb_client.get_all(URLProbedFor404) diff --git a/tests/automated/integration/tasks/url/impl/test_url_miscellaneous_metadata_task.py 
b/tests/automated/integration/tasks/url/impl/test_url_miscellaneous_metadata_task.py index 5c6e32ac..0af83bff 100644 --- a/tests/automated/integration/tasks/url/impl/test_url_miscellaneous_metadata_task.py +++ b/tests/automated/integration/tasks/url/impl/test_url_miscellaneous_metadata_task.py @@ -94,7 +94,7 @@ async def test_url_miscellaneous_metadata_task(db_data_creator: DBDataCreator): assert meets_prereqs # Run task - run_info = await operator.run_task(1) + run_info = await operator.run_task() assert run_info.outcome == TaskOperatorOutcome.SUCCESS # Check that each URL has the expected name/description and optional metadata diff --git a/tests/automated/integration/tasks/url/impl/test_url_record_type_task.py b/tests/automated/integration/tasks/url/impl/test_url_record_type_task.py index 1259441e..1373f3fa 100644 --- a/tests/automated/integration/tasks/url/impl/test_url_record_type_task.py +++ b/tests/automated/integration/tasks/url/impl/test_url_record_type_task.py @@ -32,9 +32,8 @@ async def test_url_record_type_task(db_data_creator: DBDataCreator): await db_data_creator.html_data(url_ids) assert await operator.meets_task_prerequisites() - task_id = await db_data_creator.adb_client.initiate_task(task_type=TaskType.RECORD_TYPE) - run_info = await operator.run_task(task_id) + run_info = await operator.run_task() assert run_info.outcome == TaskOperatorOutcome.SUCCESS # Task should have been created @@ -46,7 +45,6 @@ async def test_url_record_type_task(db_data_creator: DBDataCreator): assert len(tasks) == 1 task = tasks[0] assert task.type == TaskType.RECORD_TYPE - assert run_info.linked_url_ids == url_ids assert task.url_error_count == 1 # Get metadata diff --git a/tests/automated/integration/tasks/url/loader/conftest.py b/tests/automated/integration/tasks/url/loader/conftest.py index 814dd48a..045236f9 100644 --- a/tests/automated/integration/tasks/url/loader/conftest.py +++ b/tests/automated/integration/tasks/url/loader/conftest.py @@ -7,6 +7,7 @@ from 
src.core.tasks.url.operators.html.scraper.parser.core import HTMLResponseParser from src.db.client.async_ import AsyncDatabaseClient from src.external.huggingface.inference.client import HuggingFaceInferenceClient +from src.external.internet_archives.client import InternetArchivesClient from src.external.pdap.client import PDAPClient from src.external.url_request.core import URLRequestInterface @@ -20,5 +21,5 @@ def loader() -> URLTaskOperatorLoader: html_parser=AsyncMock(spec=HTMLResponseParser), pdap_client=AsyncMock(spec=PDAPClient), muckrock_api_interface=AsyncMock(spec=MuckrockAPIInterface), - hf_inference_client=AsyncMock(spec=HuggingFaceInferenceClient) + hf_inference_client=AsyncMock(spec=HuggingFaceInferenceClient), ) \ No newline at end of file diff --git a/tests/manual/core/tasks/url/test_url_html_task_operator.py b/tests/manual/core/tasks/url/test_url_html_task_operator.py index e0a409e3..280d108d 100644 --- a/tests/manual/core/tasks/url/test_url_html_task_operator.py +++ b/tests/manual/core/tasks/url/test_url_html_task_operator.py @@ -36,5 +36,5 @@ async def test_url_html_task_operator( url_request_interface=URLRequestInterface(), html_parser=parser ) - run_info = await operator.run_task(1) + run_info = await operator.run_task() pass \ No newline at end of file diff --git a/tests/manual/external/internet_archive/__init__.py b/tests/manual/external/internet_archive/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/manual/external/internet_archive/test_basic.py b/tests/manual/external/internet_archive/test_basic.py new file mode 100644 index 00000000..a25fa5df --- /dev/null +++ b/tests/manual/external/internet_archive/test_basic.py @@ -0,0 +1,18 @@ +import pytest +from aiohttp import ClientSession + +from src.external.internet_archives.client import InternetArchivesClient +from src.external.internet_archives.models.capture import IACapture + +# BASE_URL = 
"nola.gov/getattachment/NOPD/Policies/Chapter-12-1-Department-Operations-Manual-EFFECTIVE-1-14-18.pdf/" +BASE_URL = "example.com" +# BASE_URL = "hk45jk" + +@pytest.mark.asyncio +async def test_basic(): + """Test basic requests to the Internet Archive.""" + + async with ClientSession() as session: + client = InternetArchivesClient(session) + response = await client.search_for_url_snapshot(BASE_URL) + print(response) \ No newline at end of file diff --git a/uv.lock b/uv.lock index 70d4fd96..c97b9828 100644 --- a/uv.lock +++ b/uv.lock @@ -81,6 +81,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1e/3c/143831b32cd23b5263a995b2a1794e10aa42f8a895aae5074c20fda36c07/aiohttp-3.11.18-cp313-cp313-win_amd64.whl", hash = "sha256:bdd619c27e44382cf642223f11cfd4d795161362a5a1fc1fa3940397bc89db01", size = 437658, upload_time = "2025-04-21T09:42:29.209Z" }, ] +[[package]] +name = "aiolimiter" +version = "1.2.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f1/23/b52debf471f7a1e42e362d959a3982bdcb4fe13a5d46e63d28868807a79c/aiolimiter-1.2.1.tar.gz", hash = "sha256:e02a37ea1a855d9e832252a105420ad4d15011505512a1a1d814647451b5cca9", size = 7185, upload_time = "2024-12-08T15:31:51.496Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f3/ba/df6e8e1045aebc4778d19b8a3a9bc1808adb1619ba94ca354d9ba17d86c3/aiolimiter-1.2.1-py3-none-any.whl", hash = "sha256:d3f249e9059a20badcb56b61601a83556133655c11d1eb3dd3e04ff069e5f3c7", size = 6711, upload_time = "2024-12-08T15:31:49.874Z" }, +] + [[package]] name = "aiosignal" version = "1.3.2" @@ -381,6 +390,7 @@ version = "0.1.0" source = { virtual = "." 
} dependencies = [ { name = "aiohttp" }, + { name = "aiolimiter" }, { name = "alembic" }, { name = "apscheduler" }, { name = "asyncpg" }, @@ -428,6 +438,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "aiohttp", specifier = "~=3.11.11" }, + { name = "aiolimiter", specifier = ">=1.2.1" }, { name = "alembic", specifier = "~=1.14.0" }, { name = "apscheduler", specifier = "~=3.11.0" }, { name = "asyncpg", specifier = "~=0.30.0" },