Skip to content

Commit 2f6b0f3

Browse files
committed
Added a tool to start the ingestion pipeline
1 parent a9f1e7f commit 2f6b0f3

2 files changed

Lines changed: 25 additions & 2 deletions

File tree

packages/fetchcraft-ingestion-admin/src/fetchcraft/ingestion/admin/ingestion.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from fetchcraft.embeddings import OpenAIEmbeddings
1111
from fetchcraft.index.vector_index import VectorIndex
1212
from fetchcraft.ingestion.base import ConnectorSource, IngestionPipeline, Record, Sink
13-
from fetchcraft.ingestion.pipeline.transformation import ChunkingTransformation
13+
from fetchcraft.ingestion.pipeline.transformation import ChunkingTransformation, DocumentSummarization
1414
from fetchcraft.ingestion.sqlite_backend import AsyncSQLiteQueue
1515
from fetchcraft.node import DocumentNode, Node
1616
from fetchcraft.node_parser import HierarchicalNodeParser
@@ -149,7 +149,7 @@ def filter_fn(file: LocalFile) -> bool:
149149
"application/pdf": RemoteDoclingParser(docling_url=DOCLING_SERVER)
150150
}
151151
))
152-
# .add_transformation(DocumentSummarization(max_sentences=2, simulate_latency=0.2), deferred=True)
152+
# .add_transformation(DocumentSummarization(max_sentences=2), deferred=True)
153153
# .add_transformation(ExtractKeywords())
154154
.add_transformation(ChunkingTransformation(chunker=chunker))
155155
.add_sink(DocumentStoreSink(doc_store=doc_store))

packages/fetchcraft-ingestion-admin/src/fetchcraft/ingestion/admin/server.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,29 @@ async def list_queues() -> dict:
583583
raise RuntimeError(f"Error listing queues: {str(e)}")
584584

585585

586+
def _run_ingestion_in_subprocess() -> None:
    """Entry point executed inside the child process: run the async ingestion to completion.

    This lives at module level (rather than nested inside the tool) so that it
    can be pickled by ``multiprocessing`` when the start method is ``spawn`` or
    ``forkserver``; locally defined functions cannot be used as a ``Process``
    target under those start methods.
    """
    # The child process has no running event loop, so create one for this call.
    asyncio.run(run_ingestion())


@mcp.tool()
async def execute_ingestion() -> str:
    """
    Run the ingestion pipeline.

    This tool will run the ingestion pipeline to refresh the knowledgebase.
    The pipeline is launched in a separate OS process and this tool returns
    immediately without waiting for it to finish.

    Returns:
        A message confirming that ingestion was started, including the PID of
        the child process.
    """
    from multiprocessing import Process

    # NOTE(review): if this server is served over a stdio transport, writing to
    # stdout may interfere with the protocol stream - confirm and consider logging.
    print("Starting ingestion in a separate process...")
    p = Process(target=_run_ingestion_in_subprocess, name="ingestion-process")
    p.start()

    # The child is intentionally not joined: it keeps running after this returns.
    return f"Ingestion started in process {p.pid}. The process will run independently."
607+
608+
586609
# ============================================================================
587610
# Main Entry Point
588611
# ============================================================================

0 commit comments

Comments
 (0)