Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Add link_urls_redirect_url table

Revision ID: 571ada5b81b9
Revises: 99eceed6e614
Create Date: 2025-08-03 18:00:06.345733

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

from src.util.alembic_helpers import id_column, created_at_column, updated_at_column

# revision identifiers, used by Alembic.
revision: str = '571ada5b81b9'
down_revision: Union[str, None] = '99eceed6e614'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

URLS_TABLE = 'urls'
LINK_URLS_REDIRECT_URL_TABLE = 'link_urls_redirect_url'

SOURCE_ENUM = sa.Enum(
'collector',
'data_sources_app',
'redirect',
'root_url',
'manual',
name='url_source'
)

def upgrade() -> None:
    """Apply the migration.

    Creates the ``link_urls_redirect_url`` link table and adds the
    ``source`` enum column to the ``urls`` table (backfilling existing rows).
    """
    _create_link_urls_redirect_url_table()
    _add_source_column_to_urls_table()



def downgrade() -> None:
    """Revert the migration.

    Drops the ``link_urls_redirect_url`` table, then removes the ``source``
    column (and its enum type) from the ``urls`` table.
    """
    _drop_link_urls_redirect_url_table()
    _drop_source_column_from_urls_table()


def _create_link_urls_redirect_url_table():
    """Create the link table mapping source URLs to their redirect destinations."""
    # Both FK columns point at the primary key of the urls table.
    fk_target = f"{URLS_TABLE}.id"
    op.create_table(
        LINK_URLS_REDIRECT_URL_TABLE,
        id_column(),
        sa.Column('source_url_id', sa.Integer(), nullable=False),
        sa.Column('destination_url_id', sa.Integer(), nullable=False),
        created_at_column(),
        updated_at_column(),
        sa.ForeignKeyConstraint(['source_url_id'], [fk_target]),
        sa.ForeignKeyConstraint(['destination_url_id'], [fk_target]),
        # At most one link row per (source, destination) pair.
        sa.UniqueConstraint(
            'source_url_id',
            'destination_url_id',
            name='link_urls_redirect_url_uq_source_url_id_destination_url_id'
        ),
    )


def _add_source_column_to_urls_table():
    """Add the ``source`` enum column to ``urls`` and backfill existing rows."""
    # Create the url_source enum type in the database before any column uses it.
    SOURCE_ENUM.create(op.get_bind(), checkfirst=True)
    # Added nullable first so existing rows don't violate NOT NULL;
    # tightened to NOT NULL at the end after the backfill.
    op.add_column(
        URLS_TABLE,
        sa.Column(
            'source',
            SOURCE_ENUM,
            nullable=True,
            comment='The source of the URL.'
        )
    )
    # Add sources to existing URLs.
    # Backfill order matters: each later UPDATE overwrites the earlier ones
    # for the rows it matches.
    # 1) Default every existing row to 'collector'.
    op.execute(
        f"""UPDATE {URLS_TABLE}
        SET source = 'collector'::url_source
        """
    )
    # 2) Rows linked to a data source become 'data_sources_app'.
    op.execute(
        f"""UPDATE {URLS_TABLE}
        SET source = 'data_sources_app'::url_source
        FROM url_data_sources WHERE url_data_sources.url_id = {URLS_TABLE}.id
        AND url_data_sources.data_source_id IS NOT NULL;
        """
    )
    # 3) Rows linked to a collector batch are set back to 'collector' —
    # NOTE(review): this reverts step 2 for URLs that are in BOTH a batch and
    # a data source; presumably batch membership is meant to take precedence —
    # confirm intent.
    op.execute(
        f"""UPDATE {URLS_TABLE}
        SET source = 'collector'::url_source
        FROM link_batch_urls WHERE link_batch_urls.url_id = {URLS_TABLE}.id
        AND link_batch_urls.batch_id IS NOT NULL;
        """
    )

    # Make source required now that every row has a value.
    op.alter_column(
        URLS_TABLE,
        'source',
        nullable=False
    )


def _drop_link_urls_redirect_url_table():
    """Remove the link table created by this migration's upgrade step."""
    table_name = LINK_URLS_REDIRECT_URL_TABLE
    op.drop_table(table_name)


def _drop_source_column_from_urls_table():
    """Remove the ``source`` column from ``urls`` and drop its enum type."""
    # The column must go before the enum type it depends on can be dropped.
    op.drop_column(URLS_TABLE, 'source')
    bind = op.get_bind()
    SOURCE_ENUM.drop(bind, checkfirst=True)
2 changes: 2 additions & 0 deletions src/api/endpoints/collector/manual/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from src.core.enums import BatchStatus
from src.db.models.instantiations.batch.sqlalchemy import Batch
from src.db.models.instantiations.link.batch_url import LinkBatchURL
from src.db.models.instantiations.url.core.enums import URLSource
from src.db.models.instantiations.url.core.sqlalchemy import URL
from src.db.models.instantiations.url.optional_data_source_metadata import URLOptionalDataSourceMetadata
from src.db.queries.base.builder import QueryBuilderBase
Expand Down Expand Up @@ -48,6 +49,7 @@ async def run(self, session: AsyncSession) -> ManualBatchResponseDTO:
collector_metadata=entry.collector_metadata,
outcome=URLStatus.PENDING.value,
record_type=entry.record_type.value if entry.record_type is not None else None,
source=URLSource.MANUAL
)

async with session.begin_nested():
Expand Down
2 changes: 2 additions & 0 deletions src/core/preprocessors/autogoogler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List

from src.core.preprocessors.base import PreprocessorBase
from src.db.models.instantiations.url.core.enums import URLSource
from src.db.models.instantiations.url.core.pydantic.info import URLInfo


Expand All @@ -18,6 +19,7 @@ def preprocess_entry(self, entry: dict) -> list[URLInfo]:
"snippet": qr["snippet"],
"title": qr["title"]
},
source=URLSource.COLLECTOR
))

return url_infos
Expand Down
2 changes: 2 additions & 0 deletions src/core/preprocessors/common_crawler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List

from src.core.preprocessors.base import PreprocessorBase
from src.db.models.instantiations.url.core.enums import URLSource
from src.db.models.instantiations.url.core.pydantic.info import URLInfo


Expand All @@ -12,6 +13,7 @@ def preprocess(self, data: dict) -> List[URLInfo]:
for url in data["urls"]:
url_info = URLInfo(
url=url,
source=URLSource.COLLECTOR
)
url_infos.append(url_info)

Expand Down
2 changes: 2 additions & 0 deletions src/core/preprocessors/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from src.collectors.source_collectors.example.dtos.output import ExampleOutputDTO
from src.core.preprocessors.base import PreprocessorBase
from src.db.models.instantiations.url.core.enums import URLSource
from src.db.models.instantiations.url.core.pydantic.info import URLInfo


Expand All @@ -12,6 +13,7 @@ def preprocess(self, data: ExampleOutputDTO) -> List[URLInfo]:
for url in data.urls:
url_info = URLInfo(
url=url,
source=URLSource.COLLECTOR
)
url_infos.append(url_info)

Expand Down
2 changes: 2 additions & 0 deletions src/core/preprocessors/muckrock.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List

from src.core.preprocessors.base import PreprocessorBase
from src.db.models.instantiations.url.core.enums import URLSource
from src.db.models.instantiations.url.core.pydantic.info import URLInfo


Expand All @@ -12,6 +13,7 @@ def preprocess(self, data: dict) -> List[URLInfo]:
url_info = URLInfo(
url=entry["url"],
collector_metadata=entry["metadata"],
source=URLSource.COLLECTOR
)
url_infos.append(url_info)

Expand Down
11 changes: 11 additions & 0 deletions src/core/tasks/scheduled/convert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from src.core.tasks.scheduled.enums import IntervalEnum

Check warning on line 1 in src/core/tasks/scheduled/convert.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/convert.py#L1 <100>

Missing docstring in public module
Raw output
./src/core/tasks/scheduled/convert.py:1:1: D100 Missing docstring in public module


def convert_interval_enum_to_hours(interval: IntervalEnum) -> int:

Check warning on line 4 in src/core/tasks/scheduled/convert.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/convert.py#L4 <103>

Missing docstring in public function
Raw output
./src/core/tasks/scheduled/convert.py:4:1: D103 Missing docstring in public function
match interval:
case IntervalEnum.DAILY:
return 24
case IntervalEnum.HOURLY:
return 1
case _:
raise ValueError(f"Invalid interval: {interval}")

Check warning on line 11 in src/core/tasks/scheduled/convert.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/convert.py#L11 <292>

no newline at end of file
Raw output
./src/core/tasks/scheduled/convert.py:11:62: W292 no newline at end of file
6 changes: 6 additions & 0 deletions src/core/tasks/scheduled/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from enum import Enum

Check warning on line 1 in src/core/tasks/scheduled/enums.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/enums.py#L1 <100>

Missing docstring in public module
Raw output
./src/core/tasks/scheduled/enums.py:1:1: D100 Missing docstring in public module


class IntervalEnum(Enum):
    """Named scheduling intervals for recurring tasks."""

    DAILY = "DAILY"
    HOURLY = "HOURLY"

Check warning on line 6 in src/core/tasks/scheduled/enums.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/enums.py#L6 <292>

no newline at end of file
Raw output
./src/core/tasks/scheduled/enums.py:6:22: W292 no newline at end of file
119 changes: 62 additions & 57 deletions src/core/tasks/scheduled/manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from datetime import datetime, timedelta

from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from src.core.core import AsyncCore
from src.core.tasks.base.run_info import TaskOperatorRunInfo
from src.core.tasks.handler import TaskHandler
from src.core.tasks.scheduled.convert import convert_interval_enum_to_hours
from src.core.tasks.scheduled.enums import IntervalEnum
from src.core.tasks.scheduled.loader import ScheduledTaskOperatorLoader
from src.core.tasks.scheduled.models.entry import ScheduledTaskEntry
from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase


Expand All @@ -26,71 +30,72 @@ def __init__(
self.scheduler = AsyncIOScheduler()

# Jobs
self.run_cycles_job = None
self.delete_logs_job = None
self.populate_backlog_snapshot_job = None
self.sync_agencies_job = None
self.sync_data_sources_job = None
self.push_to_hugging_face_job = None
self._jobs: dict[str, Job] = {}


async def setup(self):
self.scheduler.start()
await self.add_scheduled_tasks()

async def add_scheduled_tasks(self):
self.run_cycles_job = self.scheduler.add_job(
self.async_core.run_tasks,
trigger=IntervalTrigger(
hours=1,
start_date=datetime.now() + timedelta(minutes=1)
async def _get_entries(self) -> list[ScheduledTaskEntry]:
return [
ScheduledTaskEntry(
name="Run Task Cycles",
function=self.async_core.run_tasks,
interval=IntervalEnum.HOURLY
),
misfire_grace_time=60
)
self.delete_logs_job = self.scheduler.add_job(
self.async_core.adb_client.delete_old_logs,
trigger=IntervalTrigger(
days=1,
start_date=datetime.now() + timedelta(minutes=10)
)
)
self.populate_backlog_snapshot_job = self.scheduler.add_job(
self.async_core.adb_client.populate_backlog_snapshot,
trigger=IntervalTrigger(
days=1,
start_date=datetime.now() + timedelta(minutes=20)
)
)
self.sync_agencies_job = self.scheduler.add_job(
self.run_task,
trigger=IntervalTrigger(
days=1,
start_date=datetime.now() + timedelta(minutes=2)
ScheduledTaskEntry(
name="Delete Old Logs",
function=self.async_core.adb_client.delete_old_logs,
interval=IntervalEnum.DAILY
),
ScheduledTaskEntry(
name="Populate Backlog Snapshot",
function=self.async_core.adb_client.populate_backlog_snapshot,
interval=IntervalEnum.DAILY
),
kwargs={
"operator": await self.loader.get_sync_agencies_task_operator()
}
)
self.sync_data_sources_job = self.scheduler.add_job(
self.run_task,
trigger=IntervalTrigger(
days=1,
start_date=datetime.now() + timedelta(minutes=3)
ScheduledTaskEntry(
name="Sync Agencies",
function=self.run_task,
interval=IntervalEnum.DAILY,
kwargs={
"operator": await self.loader.get_sync_agencies_task_operator()
}
),
kwargs={
"operator": await self.loader.get_sync_data_sources_task_operator()
}
)
# TODO: enable once more URLs with HTML have been added to the database.
# self.push_to_hugging_face_job = self.scheduler.add_job(
# self.run_task,
# trigger=IntervalTrigger(
# days=1,
# start_date=datetime.now() + timedelta(minutes=4)
# ),
# kwargs={
# "operator": await self.loader.get_push_to_hugging_face_task_operator()
# }
# )
ScheduledTaskEntry(
name="Sync Data Sources",
function=self.run_task,
interval=IntervalEnum.DAILY,
kwargs={
"operator": await self.loader.get_sync_data_sources_task_operator()
}
),
# ScheduledTaskEntry(
# name="Push to Hugging Face",
# function=self.run_task,
# interval=IntervalEnum.DAILY,
# kwargs={
# "operator": await self.loader.get_push_to_hugging_face_task_operator()
# }
# )
]

async def add_scheduled_tasks(self):
"""
Modifies:
self._jobs
"""
entries: list[ScheduledTaskEntry] = await self._get_entries()
for idx, entry in enumerate(entries):
self._jobs[entry.name] = self.scheduler.add_job(
entry.function,
trigger=IntervalTrigger(
hours=convert_interval_enum_to_hours(entry.interval),
start_date=datetime.now() + timedelta(minutes=idx + 1)
),
misfire_grace_time=60,
kwargs=entry.kwargs
)

def shutdown(self):
if self.scheduler.running:
Expand Down
16 changes: 16 additions & 0 deletions src/core/tasks/scheduled/models/entry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Any

Check warning on line 1 in src/core/tasks/scheduled/models/entry.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/models/entry.py#L1 <100>

Missing docstring in public module
Raw output
./src/core/tasks/scheduled/models/entry.py:1:1: D100 Missing docstring in public module

from pydantic import BaseModel

from src.core.tasks.scheduled.enums import IntervalEnum


class ScheduledTaskEntry(BaseModel):
    """Declarative description of one recurring job to register with the scheduler."""

    class Config:
        """Pydantic config; permits non-pydantic types (e.g. bound methods) in ``function``."""

        arbitrary_types_allowed = True

    # Human-readable job name; used as the job's key by the scheduler manager.
    name: str
    # Callable invoked on each trigger; typed Any because pydantic cannot model it.
    function: Any
    # How often the job runs.
    interval: IntervalEnum
    # Keyword arguments forwarded to `function` on each run.
    kwargs: dict[str, Any] = {}

Check warning on line 16 in src/core/tasks/scheduled/models/entry.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/models/entry.py#L16 <292>

no newline at end of file
Raw output
./src/core/tasks/scheduled/models/entry.py:16:32: W292 no newline at end of file
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from src.collectors.enums import URLStatus
from src.core.enums import RecordType
from src.db.models.instantiations.url.core.enums import URLSource
from src.db.models.instantiations.url.core.sqlalchemy import URL
from src.db.templates.markers.bulk.insert import BulkInsertableModel

Expand All @@ -10,6 +11,7 @@ class InsertURLForDataSourcesSyncParams(BulkInsertableModel):
description: str | None
outcome: URLStatus
record_type: RecordType
source: URLSource = URLSource.DATA_SOURCES

@classmethod
def sa_model(cls) -> type[URL]:
Expand Down
Loading