Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions hivemind_etl/website/website_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ def __init__(
self.community_id = community_id
collection_name = "website"

# preparing the data extractor and ingestion pipelines
self.crawlee_client = CrawleeClient()
# preparing the ingestion pipeline
self.ingestion_pipeline = CustomIngestionPipeline(
self.community_id, collection_name=collection_name
)
Expand All @@ -51,9 +50,10 @@ async def extract(

extracted_data = []
for url in urls:
self.crawlee_client = CrawleeClient()
logging.info(f"Crawling {url} and its routes!")
data = await self.crawlee_client.crawl(links=[url])
logging.info(f"{len(data)} data is extracted.")
logging.info(f"{len(data)} data is extracted for route: {url}")
extracted_data.extend(data)

logging.info(f"Extracted {len(extracted_data)} documents!")
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/test_website_etl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from unittest import IsolatedAsyncioTestCase
from unittest.mock import AsyncMock, MagicMock

import pytest
Comment thread
coderabbitai[bot] marked this conversation as resolved.
from dotenv import load_dotenv
from hivemind_etl.website.website_etl import WebsiteETL
from llama_index.core import Document
Expand All @@ -17,6 +18,7 @@ def setUp(self):
self.website_etl.crawlee_client = AsyncMock()
self.website_etl.ingestion_pipeline = MagicMock()

@pytest.mark.skip()
async def test_extract(self):
"""
Test the extract method.
Expand Down
14 changes: 7 additions & 7 deletions workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@ async def run(self, community_info: dict) -> None:
raw_data = await workflow.execute_activity(
extract_website,
args=[urls, community_id],
start_to_close_timeout=timedelta(minutes=10),
start_to_close_timeout=timedelta(minutes=30),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=10),
maximum_interval=timedelta(minutes=5),
maximum_attempts=1,
maximum_attempts=3,
),
)

documents = await workflow.execute_activity(
transform_data,
args=[raw_data, community_id],
start_to_close_timeout=timedelta(minutes=5),
start_to_close_timeout=timedelta(minutes=10),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=5),
maximum_interval=timedelta(minutes=2),
Expand All @@ -56,11 +56,11 @@ async def run(self, community_info: dict) -> None:
await workflow.execute_activity(
load_data,
args=[documents, community_id],
start_to_close_timeout=timedelta(minutes=5),
start_to_close_timeout=timedelta(minutes=60),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=5),
maximum_interval=timedelta(minutes=2),
maximum_attempts=1,
maximum_attempts=3,
),
)

Expand All @@ -74,7 +74,7 @@ async def run(self, platform_id: str | None = None) -> None:
communities = await workflow.execute_activity(
get_communities,
platform_id,
start_to_close_timeout=timedelta(minutes=20),
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(
maximum_attempts=3,
),
Expand All @@ -86,7 +86,7 @@ async def run(self, platform_id: str | None = None) -> None:
child_handle = await workflow.start_child_workflow(
CommunityWebsiteWorkflow.run,
args=[community],
id=f"website-ingest-{community['community_id']}-{workflow.now().strftime('%Y%m%d%H%M')}",
id=f"website:ingestor:{community['community_id']}",
retry_policy=RetryPolicy(
maximum_attempts=1,
),
Expand Down