From 6c0958a97480b80ca891aed58f6f506a404e19e1 Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Tue, 4 Jul 2023 11:41:13 +0200 Subject: [PATCH 01/18] extend db methods for DBWorkspace --- ocrd_network/ocrd_network/database.py | 71 +++++++++++++++++-- ocrd_network/ocrd_network/models/workspace.py | 3 + 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/ocrd_network/ocrd_network/database.py b/ocrd_network/ocrd_network/database.py index 2daf71761b..21a053b63e 100644 --- a/ocrd_network/ocrd_network/database.py +++ b/ocrd_network/ocrd_network/database.py @@ -1,6 +1,6 @@ """ The database is used to store information regarding jobs and workspaces. -Jobs: for every process-request a job is inserted into the database with a uuid, status and +Jobs: for every process-request a job is inserted into the database with an uuid, status and information about the process like parameters and file groups. It is mainly used to track the status (`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished jobs are not deleted from the database. @@ -35,18 +35,77 @@ async def sync_initiate_database(db_url: str): await initiate_database(db_url) -async def db_get_workspace(workspace_id: str) -> DBWorkspace: +async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace: + workspace = None + if workspace_id: + workspace = await DBWorkspace.find_one( + DBWorkspace.workspace_id == workspace_id + ) + if workspace_mets_path: + workspace = await DBWorkspace.find_one( + DBWorkspace.workspace_mets_path == workspace_mets_path + ) + if not workspace: + raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.') + return workspace + + +@call_sync +async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace: + return await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path) + + +async def db_get_workspace_by_path(workspace_mets_path: str) -> DBWorkspace: + workspace = await DBWorkspace.find_one( + DBWorkspace.workspace_mets_path == workspace_mets_path + ) + if not workspace: + raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.') + return workspace + + +@call_sync +async def sync_db_get_workspace_by_path(workspace_mets_path: str) -> DBWorkspace: + return await db_get_workspace_by_path(workspace_mets_path=workspace_mets_path) + + +async def db_update_workspace(workspace_id: str, **kwargs): workspace = await DBWorkspace.find_one( DBWorkspace.workspace_id == workspace_id ) if not workspace: raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.') - return workspace + + job_keys = list(workspace.__dict__.keys()) + for key, value in kwargs.items(): + if key not in job_keys: + raise ValueError(f'Field "{key}" is not available.') + if key == 'workspace_id': + workspace.workspace_id = value + elif key == 'workspace_mets_path': + workspace.workspace_mets_path = value + elif key == 'ocrd_identifier': + workspace.ocrd_identifier = value + elif key == 'bagit_profile_identifier': + workspace.bagit_profile_identifier = value + elif key == 'ocrd_base_version_checksum': + workspace.ocrd_base_version_checksum = value + elif key == 'ocrd_mets': + workspace.ocrd_mets = value + elif key == 'bag_info_adds': + workspace.bag_info_adds = value + elif key == 'deleted': + workspace.deleted = value + elif key == 'being_processed': + workspace.being_processed = value + else: + raise ValueError(f'Field "{key}" is not updatable.') + await 
workspace.save() @call_sync -async def sync_db_get_workspace(workspace_id: str) -> DBWorkspace: - return await db_get_workspace(workspace_id) +async def sync_db_update_workspace(workspace_id: str, **kwargs): + await db_update_workspace(workspace_id=workspace_id, **kwargs) async def db_get_processing_job(job_id: str) -> DBProcessorJob: @@ -68,8 +127,6 @@ async def db_update_processing_job(job_id: str, **kwargs): if not job: raise ValueError(f'Processing job with id "{job_id}" not in the DB.') - # TODO: This may not be the best Pythonic way to do it. However, it works! - # There must be a shorter way with Pydantic. Suggest an improvement. job_keys = list(job.__dict__.keys()) for key, value in kwargs.items(): if key not in job_keys: diff --git a/ocrd_network/ocrd_network/models/workspace.py b/ocrd_network/ocrd_network/models/workspace.py index 2a597b15ba..bb24412e65 100644 --- a/ocrd_network/ocrd_network/models/workspace.py +++ b/ocrd_network/ocrd_network/models/workspace.py @@ -15,6 +15,8 @@ class DBWorkspace(Document): ocrd_mets Ocrd-Mets (optional) bag_info_adds bag-info.txt can also (optionally) contain additional key-value-pairs which are saved here + deleted the document is deleted if set, however, the record is still preserved + being_processed whether the workspace is currently used in a workflow execution or not """ workspace_id: str workspace_mets_path: str @@ -24,6 +26,7 @@ class DBWorkspace(Document): ocrd_mets: Optional[str] bag_info_adds: Optional[dict] deleted: bool = False + being_processed: bool = False class Settings: name = "workspace" From a74e7be9a0287130ac36bfb8401bdf9d9eb720ca Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Tue, 4 Jul 2023 11:50:07 +0200 Subject: [PATCH 02/18] make db workspace method flexible --- ocrd_network/ocrd_network/database.py | 32 ++++++++++----------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/ocrd_network/ocrd_network/database.py b/ocrd_network/ocrd_network/database.py index 21a053b63e..10bac52327 100644 --- a/ocrd_network/ocrd_network/database.py +++ b/ocrd_network/ocrd_network/database.py @@ -55,24 +55,16 @@ async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: s return await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path) -async def db_get_workspace_by_path(workspace_mets_path: str) -> DBWorkspace: - workspace = await DBWorkspace.find_one( - DBWorkspace.workspace_mets_path == workspace_mets_path - ) - if not workspace: - raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.') - return workspace - - -@call_sync -async def sync_db_get_workspace_by_path(workspace_mets_path: str) -> DBWorkspace: - return await db_get_workspace_by_path(workspace_mets_path=workspace_mets_path) - - -async def db_update_workspace(workspace_id: str, **kwargs): - workspace = await DBWorkspace.find_one( - DBWorkspace.workspace_id == workspace_id - ) +async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs): + workspace = None + if workspace_id: + workspace = await DBWorkspace.find_one( + DBWorkspace.workspace_id == workspace_id + ) + if workspace_mets_path: + workspace = await DBWorkspace.find_one( + DBWorkspace.workspace_mets_path == workspace_mets_path + ) if not workspace: raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.') @@ -104,8 +96,8 @@ async def db_update_workspace(workspace_id: str, **kwargs): @call_sync -async def sync_db_update_workspace(workspace_id: str, **kwargs): - await 
db_update_workspace(workspace_id=workspace_id, **kwargs) +async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs): + await db_update_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path, **kwargs) async def db_get_processing_job(job_id: str) -> DBProcessorJob: From 669437da53186899ae0fadda5b3e8755334e33f8 Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Tue, 4 Jul 2023 12:13:39 +0200 Subject: [PATCH 03/18] db lock/unlock workspaces --- .../ocrd_network/processing_server.py | 29 ++++++++++++++++++- .../ocrd_network/processing_worker.py | 8 ++++- ocrd_network/ocrd_network/processor_server.py | 7 +++++ 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 2a542219f1..478b73bcc8 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -11,7 +11,11 @@ from pika.exceptions import ChannelClosedByBroker from ocrd_utils import getLogger -from .database import initiate_database +from .database import ( + initiate_database, + db_get_workspace, + db_update_workspace +) from .deployer import Deployer from .models import ( DBProcessorJob, @@ -271,6 +275,29 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Unknown network agent with value: {data.agent_type}" ) + workspace_db = await db_get_workspace( + workspace_id=data.workspace_id, + workspace_mets_path=data.path_to_mets + ) + if not workspace_db: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Workspace with id: {data.workspace_id} or path: {data.path_to_mets} not found" + ) + else: + # The workspace is currently locked (being processed) + if workspace_db.being_processed: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Workspace with id: {data.workspace_id} or " + f"path: {data.path_to_mets} is currently being processed" + ) + # Lock the workspace + await db_update_workspace( + workspace_id=data.workspace_id, + workspace_mets_path=data.path_to_mets, + being_processed=True + ) job_output = None if data.agent_type == 'worker': job_output = await self.push_to_processing_queue(processor_name, data) diff --git a/ocrd_network/ocrd_network/processing_worker.py b/ocrd_network/ocrd_network/processing_worker.py index 2fd8a1b25b..cc683ad33e 100644 --- a/ocrd_network/ocrd_network/processing_worker.py +++ b/ocrd_network/ocrd_network/processing_worker.py @@ -22,6 +22,7 @@ sync_initiate_database, sync_db_get_workspace, sync_db_update_processing_job, + sync_db_update_workspace ) from .models import StateEnum from .process_helpers import invoke_processor @@ -228,7 +229,12 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: end_time=end_time, exec_time=f'{exec_duration} ms' ) - + # Unlock the workspace + sync_db_update_workspace( + workspace_id=workspace_id, + workspace_mets_path=path_to_mets, + being_processed=False + ) if result_queue_name or callback_url: result_message = OcrdResultMessage( job_id=job_id, diff --git a/ocrd_network/ocrd_network/processor_server.py b/ocrd_network/ocrd_network/processor_server.py index 785b82c61b..21e825ed3b 100644 --- a/ocrd_network/ocrd_network/processor_server.py +++ b/ocrd_network/ocrd_network/processor_server.py @@ -14,6 +14,7 @@ from .database import ( DBProcessorJob, db_update_processing_job, + 
db_update_workspace, initiate_database ) from .models import ( @@ -173,6 +174,12 @@ async def run_processor_task(self, job_id: str, job: DBProcessorJob): end_time=end_time, exec_time=f'{exec_duration} ms' ) + # Unlock the workspace + await db_update_workspace( + workspace_id=job.workspace_id, + workspace_mets_path=job.path_to_mets, + being_processed=False + ) def get_ocrd_tool(self): if self.ocrd_tool: From ce989576efb70f78807086dddd09ba0aa6bd90c9 Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Wed, 5 Jul 2023 16:48:17 +0200 Subject: [PATCH 04/18] refactoring to match new requirements --- ocrd_network/ocrd_network/models/job.py | 9 ++ .../ocrd_network/processing_server.py | 113 ++++++++++++------ .../ocrd_network/processing_worker.py | 48 ++++---- ocrd_network/ocrd_network/processor_server.py | 56 ++++++--- ocrd_network/ocrd_network/server_utils.py | 10 +- ocrd_network/ocrd_network/utils.py | 14 +++ 6 files changed, 167 insertions(+), 83 deletions(-) diff --git a/ocrd_network/ocrd_network/models/job.py b/ocrd_network/ocrd_network/models/job.py index aa50e6aad8..74f30a490c 100644 --- a/ocrd_network/ocrd_network/models/job.py +++ b/ocrd_network/ocrd_network/models/job.py @@ -7,9 +7,15 @@ class StateEnum(str, Enum): + # The processing job is cached inside the Processing Server requests cache + cached = 'CACHED' + # The processing job is queued inside the RabbitMQ queued = 'QUEUED' + # Processing job is currently running in a Worker or Processor Server running = 'RUNNING' + # Processing job finished successfully success = 'SUCCESS' + # Processing job failed failed = 'FAILED' @@ -28,6 +34,8 @@ class PYJobInput(BaseModel): # Used to toggle between sending requests to 'worker and 'server', # i.e., Processing Worker and Processor Server, respectively agent_type: Optional[str] = 'worker' + # Auto generated by the Processing Server when forwarding to the Processor Server + job_id: Optional[str] = None class Config: schema_extra = { @@ -67,6 +75,7 @@ class DBProcessorJob(Document): parameters: Optional[dict] result_queue_name: Optional[str] callback_url: Optional[str] + internal_callback_url: Optional[str] start_time: Optional[datetime] end_time: Optional[datetime] exec_time: Optional[str] diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 478b73bcc8..238d5dda5f 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -3,6 +3,7 @@ import httpx from typing import Dict, List import uvicorn +from queue import Queue from fastapi import FastAPI, status, Request, HTTPException from fastapi.exceptions import RequestValidationError @@ -23,10 +24,14 @@ PYJobOutput, StateEnum ) -from .rabbitmq_utils import RMQPublisher, OcrdProcessingMessage +from .rabbitmq_utils import ( + RMQPublisher, + OcrdProcessingMessage, + OcrdResultMessage +) from .server_utils import ( _get_processor_job, - validate_and_resolve_mets_path, + validate_and_return_mets_path, validate_job_input, ) from .utils import ( @@ -73,6 +78,11 @@ def __init__(self, config_path: str, host: str, port: int) -> None: # Gets assigned when `connect_publisher` is called on the working object self.rmq_publisher = None + # Used for buffering/caching processing requests in the Processing Server + # Key: `workspace_id` or `path_to_mets` depending on which is provided + # Value: Queue that holds PYInputJob elements + self.processing_requests_cache = {} + # Create routes self.router.add_api_route( path='/stop', @@ -106,6 +116,15 @@ def 
__init__(self, config_path: str, host: str, port: int) -> None: response_model_exclude_none=True ) + self.router.add_api_route( + path='/processor/result_callback/{job_id}', + endpoint=self.remove_from_request_cache, + methods=['POST'], + tags=['processing'], + status_code=status.HTTP_200_OK, + summary='Callback used by a worker or processor server for successful processing of a request', + ) + self.router.add_api_route( path='/processor/{processor_name}', endpoint=self.get_processor_info, @@ -270,6 +289,13 @@ def query_ocrd_tool_json_from_server(self, processor_name): return ocrd_tool, processor_server_url async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput: + if data.job_id: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Job id field is set but must not be: {data.job_id}" + ) + data.job_id = generate_id() # Generate processing job id + if data.agent_type not in ['worker', 'server']: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, @@ -284,25 +310,55 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Workspace with id: {data.workspace_id} or path: {data.path_to_mets} not found" ) - else: - # The workspace is currently locked (being processed) - if workspace_db.being_processed: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f"Workspace with id: {data.workspace_id} or " - f"path: {data.path_to_mets} is currently being processed" - ) - # Lock the workspace - await db_update_workspace( - workspace_id=data.workspace_id, - workspace_mets_path=data.path_to_mets, - being_processed=True + + # The workspace is currently locked (being processed) + # TODO: Do the proper caching here, after the refactored code is working + if workspace_db.being_processed: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Workspace with id: {data.workspace_id} or " + f"path: {data.path_to_mets} is currently being processed" ) + + workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets + # If a record queue of this workspace_id does not exist in the requests cache + if not self.processing_requests_cache.get(workspace_key, None): + self.processing_requests_cache[workspace_key] = Queue() + # Add the processing request to the internal queue + self.processing_requests_cache[workspace_key].put(data) + + data = self.processing_requests_cache[workspace_key].get() + # Lock the workspace + await db_update_workspace( + workspace_id=data.workspace_id, + workspace_mets_path=data.path_to_mets, + being_processed=True + ) + + # Since the path is not resolved yet, + # the return value is not important for the Processing Server + await validate_and_return_mets_path(self.log, data) + + # Create a DB entry + job = DBProcessorJob( + **data.dict(exclude_unset=True, exclude_none=True), + processor_name=processor_name, + internal_callback_url=f"/processor/result_callback/{data.job_id}", + state=StateEnum.queued + ) + await job.insert() + job_output = None if data.agent_type == 'worker': - job_output = await self.push_to_processing_queue(processor_name, data) + ocrd_tool = await self.get_processor_info(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, data) + processing_message = self.create_processing_message(job) + await self.push_to_processing_queue(processor_name, processing_message) + job_output = job.to_job_output() if data.agent_type 
== 'server': - job_output = await self.push_to_processor_server(processor_name, data) + ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, data) + job_output = await self.push_to_processor_server(processor_name, processor_server_url, data) if not job_output: self.log.exception('Failed to create job output') raise HTTPException( @@ -312,10 +368,7 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ return job_output # TODO: Revisit and remove duplications between push_to_* methods - async def push_to_processing_queue(self, processor_name: str, job_input: PYJobInput) -> PYJobOutput: - ocrd_tool = await self.get_processor_info(processor_name) - validate_job_input(self.log, processor_name, ocrd_tool, job_input) - job_input = await validate_and_resolve_mets_path(self.log, job_input, resolve=False) + async def push_to_processing_queue(self, processor_name: str, processing_message: OcrdProcessingMessage): if not self.rmq_publisher: raise Exception('RMQPublisher is not connected') deployed_processors = self.deployer.find_matching_processors( @@ -326,14 +379,6 @@ async def push_to_processing_queue(self, processor_name: str, job_input: PYJobIn if processor_name not in deployed_processors: self.check_if_queue_exists(processor_name) - job = DBProcessorJob( - **job_input.dict(exclude_unset=True, exclude_none=True), - job_id=generate_id(), - processor_name=processor_name, - state=StateEnum.queued - ) - await job.insert() - processing_message = self.create_processing_message(job) encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message) try: self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message) @@ -343,12 +388,8 @@ async def push_to_processing_queue(self, processor_name: str, job_input: PYJobIn status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f'RMQPublisher has failed: {error}' ) - return job.to_job_output() - async def push_to_processor_server(self, processor_name: str, job_input: PYJobInput) -> PYJobOutput: - ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name) - validate_job_input(self.log, processor_name, ocrd_tool, job_input) - job_input = await validate_and_resolve_mets_path(self.log, job_input, resolve=False) + async def push_to_processor_server(self, processor_name: str, processor_server_url: str, job_input: PYJobInput) -> PYJobOutput: try: json_data = json.dumps(job_input.dict(exclude_unset=True, exclude_none=True)) except Exception as e: @@ -384,6 +425,10 @@ async def push_to_processor_server(self, processor_name: str, job_input: PYJobIn async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutput: return await _get_processor_job(self.log, processor_name, job_id) + async def remove_from_request_cache(self, processor_name: str, job_id: str, ocrd_result: OcrdResultMessage): + # TODO: Implement, after the refactored code is working + pass + async def get_processor_info(self, processor_name) -> Dict: """ Return a processor's ocrd-tool.json """ diff --git a/ocrd_network/ocrd_network/processing_worker.py b/ocrd_network/ocrd_network/processing_worker.py index cc683ad33e..29ee6fdbf6 100644 --- a/ocrd_network/ocrd_network/processing_worker.py +++ b/ocrd_network/ocrd_network/processing_worker.py @@ -11,7 +11,6 @@ from datetime import datetime import logging from os import getpid -import requests import pika.spec import pika.adapters.blocking_connection @@ -34,6 +33,7 
@@ ) from .utils import ( calculate_execution_time, + post_to_callback_url, tf_disable_interactive_logs, verify_database_uri, verify_and_parse_mq_uri @@ -190,6 +190,7 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: page_id = processing_message.page_id if 'page_id' in pm_keys else None result_queue_name = processing_message.result_queue_name if 'result_queue_name' in pm_keys else None callback_url = processing_message.callback_url if 'callback_url' in pm_keys else None + internal_callback_url = processing_message.callback_url if 'internal_callback_url' in pm_keys else None parameters = processing_message.parameters if processing_message.parameters else {} if not path_to_mets and workspace_id: @@ -235,21 +236,25 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: workspace_mets_path=path_to_mets, being_processed=False ) - if result_queue_name or callback_url: - result_message = OcrdResultMessage( - job_id=job_id, - state=job_state.value, - path_to_mets=path_to_mets, - # May not be always available - workspace_id=workspace_id - ) - self.log.info(f'Result message: {result_message}') - # If the result_queue field is set, send the result message to a result queue - if result_queue_name: - self.publish_to_result_queue(result_queue_name, result_message) - # If the callback_url field is set, post the result message to a callback url - if callback_url: - self.post_to_callback_url(callback_url, result_message) + result_message = OcrdResultMessage( + job_id=job_id, + state=job_state.value, + path_to_mets=path_to_mets, + # May not be always available + workspace_id=workspace_id + ) + self.log.info(f'Result message: {result_message}') + # If the result_queue field is set, send the result message to a result queue + if result_queue_name: + self.publish_to_result_queue(result_queue_name, result_message) + if callback_url: + # If the callback_url field is set, + # post the result message (callback to a user defined endpoint) + post_to_callback_url(self.log, callback_url, result_message) + if internal_callback_url: + # If the internal callback_url field is set, + # post the result message (callback to Processing Server endpoint) + post_to_callback_url(self.log, internal_callback_url, result_message) def publish_to_result_queue(self, result_queue: str, result_message: OcrdResultMessage): if self.rmq_publisher is None: @@ -264,14 +269,3 @@ def publish_to_result_queue(self, result_queue: str, result_message: OcrdResultM message=encoded_result_message ) - def post_to_callback_url(self, callback_url: str, result_message: OcrdResultMessage): - self.log.info(f'Posting result message to callback_url "{callback_url}"') - headers = {"Content-Type": "application/json"} - json_data = { - "job_id": result_message.job_id, - "state": result_message.state, - "path_to_mets": result_message.path_to_mets, - "workspace_id": result_message.workspace_id - } - response = requests.post(url=callback_url, headers=headers, json=json_data) - self.log.info(f'Response from callback_url "{response}"') diff --git a/ocrd_network/ocrd_network/processor_server.py b/ocrd_network/ocrd_network/processor_server.py index 21e825ed3b..764d27bc14 100644 --- a/ocrd_network/ocrd_network/processor_server.py +++ b/ocrd_network/ocrd_network/processor_server.py @@ -4,7 +4,7 @@ from subprocess import run, PIPE import uvicorn -from fastapi import FastAPI, HTTPException, status, BackgroundTasks +from fastapi import FastAPI, HTTPException, status from ocrd_utils import ( get_ocrd_tool_json, @@ 
-24,13 +24,15 @@ StateEnum ) from .process_helpers import invoke_processor +from .rabbitmq_utils import OcrdResultMessage from .server_utils import ( _get_processor_job, - validate_and_resolve_mets_path, + validate_and_return_mets_path, validate_job_input ) from .utils import ( calculate_execution_time, + post_to_callback_url, generate_id, tf_disable_interactive_logs ) @@ -126,26 +128,30 @@ async def get_processor_info(self): # Note: The Processing server pushes to a queue, while # the Processor Server creates (pushes to) a background task - async def create_processor_task(self, job_input: PYJobInput, background_tasks: BackgroundTasks): + async def create_processor_task(self, job_input: PYJobInput): validate_job_input(self.log, self.processor_name, self.ocrd_tool, job_input) - job_input = await validate_and_resolve_mets_path(self.log, job_input, resolve=True) - - job_id = generate_id() - job = DBProcessorJob( - **job_input.dict(exclude_unset=True, exclude_none=True), - job_id=job_id, - processor_name=self.processor_name, - state=StateEnum.queued - ) - await job.insert() - await self.run_processor_task(job_id=job_id, job=job) + job_input.path_to_mets = await validate_and_return_mets_path(self.log, job_input) + + job = None + # The request is not forwarded from the Processing Server, assign a job_id + if not job_input.job_id: + job_id = generate_id() + # Create a DB entry + job = DBProcessorJob( + **job_input.dict(exclude_unset=True, exclude_none=True), + job_id=job_id, + processor_name=self.processor_name, + state=StateEnum.queued + ) + await job.insert() + await self.run_processor_task(job=job) return job.to_job_output() - async def run_processor_task(self, job_id: str, job: DBProcessorJob): + async def run_processor_task(self, job: DBProcessorJob): execution_failed = False start_time = datetime.now() await db_update_processing_job( - job_id=job_id, + job_id=job.job_id, state=StateEnum.running, start_time=start_time ) @@ -169,7 +175,7 @@ async def run_processor_task(self, job_id: str, job: DBProcessorJob): exec_duration = calculate_execution_time(start_time, end_time) job_state = StateEnum.success if not execution_failed else StateEnum.failed await db_update_processing_job( - job_id=job_id, + job_id=job.job_id, state=job_state, end_time=end_time, exec_time=f'{exec_duration} ms' @@ -180,6 +186,22 @@ async def run_processor_task(self, job_id: str, job: DBProcessorJob): workspace_mets_path=job.path_to_mets, being_processed=False ) + result_message = OcrdResultMessage( + job_id=job.job_id, + state=job_state.value, + path_to_mets=job.path_to_mets, + # May not be always available + workspace_id=job.workspace_id + ) + self.log.info(f'Result message: {result_message}') + if job.callback_url: + # If the callback_url field is set, + # post the result message (callback to a user defined endpoint) + post_to_callback_url(self.log, job.callback_url, result_message) + if job.internal_callback_url: + # If the internal callback_url field is set, + # post the result message (callback to Processing Server endpoint) + post_to_callback_url(self.log, job.internal_callback_url, result_message) def get_ocrd_tool(self): if self.ocrd_tool: diff --git a/ocrd_network/ocrd_network/server_utils.py b/ocrd_network/ocrd_network/server_utils.py index b117cb48bd..dad43b4e8d 100644 --- a/ocrd_network/ocrd_network/server_utils.py +++ b/ocrd_network/ocrd_network/server_utils.py @@ -1,4 +1,4 @@ -from fastapi import FastAPI, HTTPException, status, BackgroundTasks +from fastapi import HTTPException, status from ocrd_validators 
import ParameterValidator from .database import ( db_get_processing_job, @@ -22,21 +22,21 @@ async def _get_processor_job(logger, processor_name: str, job_id: str) -> PYJobO ) -async def validate_and_resolve_mets_path(logger, job_input: PYJobInput, resolve: bool = False) -> PYJobInput: +async def validate_and_return_mets_path(logger, job_input: PYJobInput) -> str: # This check is done to return early in case the workspace_id is provided # but the abs mets path cannot be queried from the DB if not job_input.path_to_mets and job_input.workspace_id: try: db_workspace = await db_get_workspace(job_input.workspace_id) - if resolve: - job_input.path_to_mets = db_workspace.workspace_mets_path + path_to_mets = db_workspace.workspace_mets_path except ValueError as e: logger.exception(f"Workspace with id '{job_input.workspace_id}' not existing: {e}") raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Workspace with id '{job_input.workspace_id}' not existing" ) - return job_input + return path_to_mets + return job_input.path_to_mets def validate_job_input(logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None: diff --git a/ocrd_network/ocrd_network/utils.py b/ocrd_network/ocrd_network/utils.py index d41a1b13ab..4098ef0fa0 100644 --- a/ocrd_network/ocrd_network/utils.py +++ b/ocrd_network/ocrd_network/utils.py @@ -10,6 +10,7 @@ from yaml import safe_load from ocrd_validators import ProcessingServerConfigValidator +from .rabbitmq_utils import OcrdResultMessage # Based on: https://gist.github.com/phizaz/20c36c6734878c6ec053245a477572ec @@ -108,3 +109,16 @@ def download_ocrd_all_tool_json(ocrd_all_url: str): if not response.status_code == 200: raise ValueError(f"Failed to download ocrd all tool json from: '{ocrd_all_url}'") return response.json() + + +def post_to_callback_url(logger, callback_url: str, result_message: OcrdResultMessage): + logger.info(f'Posting result message to callback_url "{callback_url}"') + headers = {"Content-Type": "application/json"} + json_data = { + "job_id": result_message.job_id, + "state": result_message.state, + "path_to_mets": result_message.path_to_mets, + "workspace_id": result_message.workspace_id + } + response = requests.post(url=callback_url, headers=headers, json=json_data) + logger.info(f'Response from callback_url "{response}"') From a273439a5ab84b28aa20ba709046a1e15e89fbc6 Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Mon, 17 Jul 2023 14:59:39 +0200 Subject: [PATCH 05/18] implement page-wise locking and internal callback --- ocrd_network/ocrd_network/database.py | 4 +- ocrd_network/ocrd_network/models/job.py | 1 + ocrd_network/ocrd_network/models/workspace.py | 11 +- .../ocrd_network/processing_server.py | 188 ++++++++++++++---- ocrd_network/ocrd_network/server_utils.py | 20 ++ 5 files changed, 183 insertions(+), 41 deletions(-) diff --git a/ocrd_network/ocrd_network/database.py b/ocrd_network/ocrd_network/database.py index 10bac52327..5f4b56f71e 100644 --- a/ocrd_network/ocrd_network/database.py +++ b/ocrd_network/ocrd_network/database.py @@ -88,8 +88,8 @@ async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str workspace.bag_info_adds = value elif key == 'deleted': workspace.deleted = value - elif key == 'being_processed': - workspace.being_processed = value + elif key == 'pages_locked': + workspace.pages_locked = value else: raise ValueError(f'Field "{key}" is not updatable.') await workspace.save() diff --git a/ocrd_network/ocrd_network/models/job.py 
b/ocrd_network/ocrd_network/models/job.py index 74f30a490c..c710b41c33 100644 --- a/ocrd_network/ocrd_network/models/job.py +++ b/ocrd_network/ocrd_network/models/job.py @@ -22,6 +22,7 @@ class StateEnum(str, Enum): class PYJobInput(BaseModel): """ Wraps the parameters required to make a run-processor-request """ + processor_name: Optional[str] = None path_to_mets: Optional[str] = None workspace_id: Optional[str] = None description: Optional[str] = None diff --git a/ocrd_network/ocrd_network/models/workspace.py b/ocrd_network/ocrd_network/models/workspace.py index bb24412e65..9e72446c79 100644 --- a/ocrd_network/ocrd_network/models/workspace.py +++ b/ocrd_network/ocrd_network/models/workspace.py @@ -1,5 +1,5 @@ from beanie import Document -from typing import Optional +from typing import Dict, Optional class DBWorkspace(Document): @@ -16,7 +16,9 @@ class DBWorkspace(Document): bag_info_adds bag-info.txt can also (optionally) contain additional key-value-pairs which are saved here deleted the document is deleted if set, however, the record is still preserved - being_processed whether the workspace is currently used in a workflow execution or not + pages_locked a data structure that holds output `fileGrp`s and their respective locked `page_id` + that are currently being processed by an OCR-D processor (server or worker). + If no `page_id` field is set, an identifier "all" will be used to represent all pages. """ workspace_id: str workspace_mets_path: str @@ -26,7 +28,10 @@ class DBWorkspace(Document): ocrd_mets: Optional[str] bag_info_adds: Optional[dict] deleted: bool = False - being_processed: bool = False + # Dictionary structure: + # Key: fileGrp + # Value: Set of `page_id`s + pages_locked: Optional[Dict] = {} class Settings: name = "workspace" diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 238d5dda5f..02fc81621c 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -1,7 +1,7 @@ import json import requests import httpx -from typing import Dict, List +from typing import Dict, List, Optional import uvicorn from queue import Queue @@ -10,12 +10,12 @@ from fastapi.responses import JSONResponse from pika.exceptions import ChannelClosedByBroker - from ocrd_utils import getLogger from .database import ( initiate_database, + db_get_processing_job, db_get_workspace, - db_update_workspace + db_update_workspace, ) from .deployer import Deployer from .models import ( @@ -26,11 +26,11 @@ ) from .rabbitmq_utils import ( RMQPublisher, - OcrdProcessingMessage, - OcrdResultMessage + OcrdProcessingMessage ) from .server_utils import ( _get_processor_job, + expand_page_ids, validate_and_return_mets_path, validate_job_input, ) @@ -117,12 +117,12 @@ def __init__(self, config_path: str, host: str, port: int) -> None: ) self.router.add_api_route( - path='/processor/result_callback/{job_id}', + path='/processor/result_callback', endpoint=self.remove_from_request_cache, methods=['POST'], tags=['processing'], status_code=status.HTTP_200_OK, - summary='Callback used by a worker or processor server for successful processing of a request', + summary='Callback used by a worker or processor server for reporting result of a processing request', ) self.router.add_api_route( @@ -294,7 +294,8 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Job id field is set but must not be: {data.job_id}" ) - 
data.job_id = generate_id() # Generate processing job id + # Generate processing job id + data.job_id = generate_id() if data.agent_type not in ['worker', 'server']: raise HTTPException( @@ -311,39 +312,71 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ detail=f"Workspace with id: {data.workspace_id} or path: {data.path_to_mets} not found" ) - # The workspace is currently locked (being processed) - # TODO: Do the proper caching here, after the refactored code is working - if workspace_db.being_processed: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=f"Workspace with id: {data.workspace_id} or " - f"path: {data.path_to_mets} is currently being processed" - ) - - workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets - # If a record queue of this workspace_id does not exist in the requests cache - if not self.processing_requests_cache.get(workspace_key, None): - self.processing_requests_cache[workspace_key] = Queue() - # Add the processing request to the internal queue - self.processing_requests_cache[workspace_key].put(data) - - data = self.processing_requests_cache[workspace_key].get() - # Lock the workspace - await db_update_workspace( - workspace_id=data.workspace_id, - workspace_mets_path=data.path_to_mets, - being_processed=True - ) - # Since the path is not resolved yet, # the return value is not important for the Processing Server await validate_and_return_mets_path(self.log, data) + page_ids = expand_page_ids(data.page_id) + + # A flag whether the current request must be cached + # This is set to true if for any output fileGrp there + # is a page_id value that have been previously locked + cache_current_request = False + + # Check if there are any locked pages for the current request + locked_ws_pages = workspace_db.pages_locked + for output_fileGrp in data.output_file_grps: + if output_fileGrp in locked_ws_pages: + if "all_pages" in locked_ws_pages[output_fileGrp]: + cache_current_request = True + break + # If there are request page ids that are already locked + if not locked_ws_pages[output_fileGrp].isdisjoint(page_ids): + cache_current_request = True + break + + if cache_current_request: + # Append the processor name to the request itself + data.processor_name = processor_name + + workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets + # If a record queue of this workspace_id does not exist in the requests cache + if not self.processing_requests_cache.get(workspace_key, None): + self.processing_requests_cache[workspace_key] = Queue() + # Add the processing request to the internal queue + self.processing_requests_cache[workspace_key].put(data) + + return PYJobOutput( + job_id=data.job_id, + processor_name=processor_name, + workspace_id=data.workspace_id, + workspace_path=data.path_to_mets, + state=StateEnum.cached + ) + else: + # Update locked pages by locking the pages in the request + for output_fileGrp in data.output_file_grps: + if output_fileGrp not in locked_ws_pages: + locked_ws_pages[output_fileGrp] = set() + # The page id list is not empty - only some pages are in the request + if page_ids: + locked_ws_pages[output_fileGrp].update(page_ids) + else: + # Lock all pages with a single value + locked_ws_pages[output_fileGrp].add("all_pages") + + # Update the locked pages dictionary in the database + await db_update_workspace( + workspace_id=data.workspace_id, + workspace_mets_path=data.path_to_mets, + pages_locked=locked_ws_pages + ) + # Create a DB entry 
job = DBProcessorJob( **data.dict(exclude_unset=True, exclude_none=True), processor_name=processor_name, - internal_callback_url=f"/processor/result_callback/{data.job_id}", + internal_callback_url=f"/processor/result_callback", state=StateEnum.queued ) await job.insert() @@ -425,9 +458,92 @@ async def push_to_processor_server(self, processor_name: str, processor_server_u async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutput: return await _get_processor_job(self.log, processor_name, job_id) - async def remove_from_request_cache(self, processor_name: str, job_id: str, ocrd_result: OcrdResultMessage): - # TODO: Implement, after the refactored code is working - pass + async def remove_from_request_cache(self, job_id: str, state: StateEnum, + workspace_id: Optional[str], path_to_mets: Optional[str]): + if state == StateEnum.failed: + # TODO: Call the callback to the Workflow server if the current processing step has failed + pass + + if state != StateEnum.success: + # TODO: Handle other potential error cases + pass + + job_db = await db_get_processing_job(job_id) + if not job_db: + self.log.exception(f"Processing job with id: {job_id} not found in DB") + job_output_file_grps = job_db.output_file_grps + job_page_ids = expand_page_ids(job_db.page_id) + + # TODO: Unlock previously locked workspace pages + workspace_db = await db_get_workspace( + workspace_id=workspace_id, + workspace_mets_path=path_to_mets + ) + if not workspace_db: + self.log.exception(f"Workspace with id: {workspace_id} or path: {path_to_mets} not found in DB") + + locked_ws_pages = workspace_db.pages_locked + + # Update locked pages by locking the pages in the request + for output_fileGrp in job_output_file_grps: + if output_fileGrp in locked_ws_pages: + if job_page_ids: + # Unlock the previously locked pages + locked_ws_pages[output_fileGrp].difference_update(set(job_page_ids)) + else: + # Remove the single variable used to indicate all pages are locked + locked_ws_pages[output_fileGrp].remove("all_pages") + + # Update the locked pages dictionary in the database + await db_update_workspace( + workspace_id=workspace_id, + workspace_mets_path=path_to_mets, + pages_locked=locked_ws_pages + ) + + # Take the next request from the cache (if any available) + workspace_key = workspace_id if workspace_id else path_to_mets + + if workspace_key not in self.processing_requests_cache: + # No internal queue available for that workspace + return + + if self.processing_requests_cache[workspace_key].empty(): + # The queue is empty - delete it + try: + del self.processing_requests_cache[workspace_key] + except KeyError as ex: + self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}") + return + + # Process the next request in the internal queue + # TODO: Refactor and optimize the duplications here + # and last lines in `push_processor_job` method + data = self.processing_requests_cache[workspace_key].get() + processor_name = data.processor_name + + # Create a DB entry + job = DBProcessorJob( + **data.dict(exclude_unset=True, exclude_none=True), + processor_name=processor_name, + internal_callback_url=f"/processor/result_callback", + state=StateEnum.queued + ) + await job.insert() + + job_output = None + if data.agent_type == 'worker': + ocrd_tool = await self.get_processor_info(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, data) + processing_message = self.create_processing_message(job) + await self.push_to_processing_queue(processor_name, 
processing_message) + job_output = job.to_job_output() + if data.agent_type == 'server': + ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, data) + job_output = await self.push_to_processor_server(processor_name, processor_server_url, data) + if not job_output: + self.log.exception(f'Failed to create job output for job input data: {data}') async def get_processor_info(self, processor_name) -> Dict: """ Return a processor's ocrd-tool.json diff --git a/ocrd_network/ocrd_network/server_utils.py b/ocrd_network/ocrd_network/server_utils.py index dad43b4e8d..568dfe6f3b 100644 --- a/ocrd_network/ocrd_network/server_utils.py +++ b/ocrd_network/ocrd_network/server_utils.py @@ -1,5 +1,11 @@ +import re from fastapi import HTTPException, status +from typing import Dict, List from ocrd_validators import ParameterValidator +from ocrd_utils import ( + generate_range, + REGEX_PREFIX +) from .database import ( db_get_processing_job, db_get_workspace, @@ -39,6 +45,20 @@ async def validate_and_return_mets_path(logger, job_input: PYJobInput) -> str: return job_input.path_to_mets +def expand_page_ids(page_id: str) -> List: + page_ids = [] + if not page_id: + return page_ids + for page_id_token in re.split(r',', page_id): + if page_id_token.startswith(REGEX_PREFIX): + page_ids.append(re.compile(page_id_token[len(REGEX_PREFIX):])) + elif '..' in page_id_token: + page_ids += generate_range(*page_id_token.split('..', 1)) + else: + page_ids += [page_id_token] + return page_ids + + def validate_job_input(logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None: if bool(job_input.path_to_mets) == bool(job_input.workspace_id): logger.exception("Either 'path' or 'workspace_id' must be provided, but not both") From c7b60468c9e880bcf73d1be1c107bf92a66d80a6 Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Mon, 17 Jul 2023 15:35:37 +0200 Subject: [PATCH 06/18] refactor comments --- ocrd_network/ocrd_network/models/workspace.py | 2 +- ocrd_network/ocrd_network/processing_server.py | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/ocrd_network/ocrd_network/models/workspace.py b/ocrd_network/ocrd_network/models/workspace.py index 9e72446c79..d05ddcf02d 100644 --- a/ocrd_network/ocrd_network/models/workspace.py +++ b/ocrd_network/ocrd_network/models/workspace.py @@ -18,7 +18,7 @@ class DBWorkspace(Document): deleted the document is deleted if set, however, the record is still preserved pages_locked a data structure that holds output `fileGrp`s and their respective locked `page_id` that are currently being processed by an OCR-D processor (server or worker). - If no `page_id` field is set, an identifier "all" will be used to represent all pages. + If no `page_id` field is set, an identifier "all_pages" will be used. 
""" workspace_id: str workspace_mets_path: str diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 02fc81621c..79afacb8b1 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -320,7 +320,7 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ # A flag whether the current request must be cached # This is set to true if for any output fileGrp there - # is a page_id value that have been previously locked + # is a page_id value that has been previously locked cache_current_request = False # Check if there are any locked pages for the current request @@ -474,7 +474,7 @@ async def remove_from_request_cache(self, job_id: str, state: StateEnum, job_output_file_grps = job_db.output_file_grps job_page_ids = expand_page_ids(job_db.page_id) - # TODO: Unlock previously locked workspace pages + # Read DB workspace entry workspace_db = await db_get_workspace( workspace_id=workspace_id, workspace_mets_path=path_to_mets @@ -482,9 +482,8 @@ async def remove_from_request_cache(self, job_id: str, state: StateEnum, if not workspace_db: self.log.exception(f"Workspace with id: {workspace_id} or path: {path_to_mets} not found in DB") + # Update locked pages by unlocking the pages in the request locked_ws_pages = workspace_db.pages_locked - - # Update locked pages by locking the pages in the request for output_fileGrp in job_output_file_grps: if output_fileGrp in locked_ws_pages: if job_page_ids: From efc9ec78eb46ab991d570763295d68f2f168e047 Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Tue, 22 Aug 2023 17:10:41 +0200 Subject: [PATCH 07/18] Fix worker/processor server access to DB --- ocrd_network/ocrd_network/database.py | 16 ++++++++++++---- ocrd_network/ocrd_network/deployer.py | 2 +- ocrd_network/ocrd_network/processing_worker.py | 7 ------- ocrd_network/ocrd_network/processor_server.py | 7 ------- 4 files changed, 13 insertions(+), 19 deletions(-) diff --git a/ocrd_network/ocrd_network/database.py b/ocrd_network/ocrd_network/database.py index 5f4b56f71e..acc3b1c58a 100644 --- a/ocrd_network/ocrd_network/database.py +++ b/ocrd_network/ocrd_network/database.py @@ -37,16 +37,20 @@ async def sync_initiate_database(db_url: str): async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace: workspace = None + if not workspace_id and not workspace_mets_path: + raise ValueError(f'Either `workspace_id` or `workspace_mets_path` field must be used as a search key') if workspace_id: workspace = await DBWorkspace.find_one( DBWorkspace.workspace_id == workspace_id ) + if not workspace: + raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.') if workspace_mets_path: workspace = await DBWorkspace.find_one( DBWorkspace.workspace_mets_path == workspace_mets_path ) - if not workspace: - raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.') + if not workspace: + raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.') return workspace @@ -57,16 +61,20 @@ async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: s async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs): workspace = None + if not workspace_id and not workspace_mets_path: + raise ValueError(f'Either `workspace_id` or `workspace_mets_path` field must be used as a search key') if workspace_id: workspace = await DBWorkspace.find_one( 
DBWorkspace.workspace_id == workspace_id ) + if not workspace: + raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.') if workspace_mets_path: workspace = await DBWorkspace.find_one( DBWorkspace.workspace_mets_path == workspace_mets_path ) - if not workspace: - raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.') + if not workspace: + raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.') job_keys = list(workspace.__dict__.keys()) for key, value in kwargs.items(): diff --git a/ocrd_network/ocrd_network/deployer.py b/ocrd_network/ocrd_network/deployer.py index f5b7f045d5..a9239c4f70 100644 --- a/ocrd_network/ocrd_network/deployer.py +++ b/ocrd_network/ocrd_network/deployer.py @@ -302,7 +302,7 @@ def deploy_mongodb( ) -> str: if self.data_mongo.skip_deployment: self.log.debug('MongoDB is externaly managed. Skipping deployment') - verify_mongodb_available(self.data_mongo.url); + verify_mongodb_available(self.data_mongo.url) return self.data_mongo.url self.log.debug(f"Trying to deploy '{image}', with modes: " diff --git a/ocrd_network/ocrd_network/processing_worker.py b/ocrd_network/ocrd_network/processing_worker.py index 29ee6fdbf6..6e72fe9f18 100644 --- a/ocrd_network/ocrd_network/processing_worker.py +++ b/ocrd_network/ocrd_network/processing_worker.py @@ -21,7 +21,6 @@ sync_initiate_database, sync_db_get_workspace, sync_db_update_processing_job, - sync_db_update_workspace ) from .models import StateEnum from .process_helpers import invoke_processor @@ -230,12 +229,6 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: end_time=end_time, exec_time=f'{exec_duration} ms' ) - # Unlock the workspace - sync_db_update_workspace( - workspace_id=workspace_id, - workspace_mets_path=path_to_mets, - being_processed=False - ) result_message = OcrdResultMessage( job_id=job_id, state=job_state.value, diff --git a/ocrd_network/ocrd_network/processor_server.py b/ocrd_network/ocrd_network/processor_server.py index 764d27bc14..b7abc6f148 100644 --- a/ocrd_network/ocrd_network/processor_server.py +++ b/ocrd_network/ocrd_network/processor_server.py @@ -14,7 +14,6 @@ from .database import ( DBProcessorJob, db_update_processing_job, - db_update_workspace, initiate_database ) from .models import ( @@ -180,12 +179,6 @@ async def run_processor_task(self, job: DBProcessorJob): end_time=end_time, exec_time=f'{exec_duration} ms' ) - # Unlock the workspace - await db_update_workspace( - workspace_id=job.workspace_id, - workspace_mets_path=job.path_to_mets, - being_processed=False - ) result_message = OcrdResultMessage( job_id=job.job_id, state=job_state.value, From 82302d7ec2874c6cbf28aa0afc2ebf94ba02f89b Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Wed, 23 Aug 2023 14:04:56 +0200 Subject: [PATCH 08/18] Limit pydantic to max version 1 --- ocrd_network/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/ocrd_network/requirements.txt b/ocrd_network/requirements.txt index 4e8986c1bd..537dca1adb 100644 --- a/ocrd_network/requirements.txt +++ b/ocrd_network/requirements.txt @@ -1,5 +1,6 @@ uvicorn>=0.17.6 fastapi>=0.78.0 +pydantic==1.* docker paramiko pika>=1.2.0 From becfe23682227adc2c2d5c73bb3e150fb0193ba9 Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Thu, 24 Aug 2023 20:48:30 +0200 Subject: [PATCH 09/18] implement depends on feature --- ocrd_network/ocrd_network/models/job.py | 3 + .../ocrd_network/processing_server.py | 86 ++++++++++++++----- .../ocrd_network/processing_worker.py | 16 +--- 
.../rabbitmq_utils/ocrd_messages.py | 8 +- .../message_processing.schema.yml | 9 +- .../ocrd_validators/message_result.schema.yml | 4 +- 6 files changed, 84 insertions(+), 42 deletions(-) diff --git a/ocrd_network/ocrd_network/models/job.py b/ocrd_network/ocrd_network/models/job.py index c710b41c33..30fca8e424 100644 --- a/ocrd_network/ocrd_network/models/job.py +++ b/ocrd_network/ocrd_network/models/job.py @@ -37,6 +37,8 @@ class PYJobInput(BaseModel): agent_type: Optional[str] = 'worker' # Auto generated by the Processing Server when forwarding to the Processor Server job_id: Optional[str] = None + # If set, specifies a list of job ids this job depends on + depends_on: Optional[List[str]] = None class Config: schema_extra = { @@ -74,6 +76,7 @@ class DBProcessorJob(Document): output_file_grps: Optional[List[str]] page_id: Optional[str] parameters: Optional[dict] + depends_on: Optional[List[str]] result_queue_name: Optional[str] callback_url: Optional[str] internal_callback_url: Optional[str] diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 79afacb8b1..56fc947a70 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -3,7 +3,6 @@ import httpx from typing import Dict, List, Optional import uvicorn -from queue import Queue from fastapi import FastAPI, status, Request, HTTPException from fastapi.exceptions import RequestValidationError @@ -22,6 +21,7 @@ DBProcessorJob, PYJobInput, PYJobOutput, + PYResultMessage, StateEnum ) from .rabbitmq_utils import ( @@ -83,6 +83,9 @@ def __init__(self, config_path: str, host: str, port: int) -> None: # Value: Queue that holds PYInputJob elements self.processing_requests_cache = {} + # Used by processing workers and/or processor servers to report back the results + self.internal_job_callback_url = f'http://{host}:{port}/processor/result_callback' + # Create routes self.router.add_api_route( path='/stop', @@ -249,6 +252,7 @@ def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage: parameters=job.parameters, result_queue_name=job.result_queue_name, callback_url=job.callback_url, + internal_callback_url=job.internal_callback_url ) return processing_message @@ -266,6 +270,34 @@ def check_if_queue_exists(self, processor_name): detail=f"Process queue with id '{processor_name}' not existing" ) + # Returns true if all dependent jobs' states are success, else false + @staticmethod + async def check_if_job_dependencies_met(dependencies: List[str]) -> bool: + # Check the states of all dependent jobs + for dependency_job_id in dependencies: + dependency_job_state = (await db_get_processing_job(dependency_job_id)).state + # Found a dependent job whose state is not success + if dependency_job_state != StateEnum.success: + return False + return True + + async def find_next_request_from_internal_queue(self, internal_queue: List[PYJobInput]) -> PYJobInput: + found_index = None + found_request = None + for i in range(0, len(internal_queue)): + current_element = internal_queue[i] + # Request has other job dependencies + if current_element.depends_on: + if not await self.check_if_job_dependencies_met(current_element.depends_on): + continue + found_index = i + break + + if found_index: + # Consume the request from the internal queue + found_request = internal_queue.pop(found_index) + return found_request + def query_ocrd_tool_json_from_server(self, processor_name): processor_server_url = 
self.deployer.resolve_processor_server_url(processor_name) if not processor_server_url: @@ -323,17 +355,24 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ # is a page_id value that has been previously locked cache_current_request = False - # Check if there are any locked pages for the current request + # Check if there are any dependencies of the current request + if data.depends_on: + if not await self.check_if_job_dependencies_met(data.depends_on): + cache_current_request = True + locked_ws_pages = workspace_db.pages_locked - for output_fileGrp in data.output_file_grps: - if output_fileGrp in locked_ws_pages: - if "all_pages" in locked_ws_pages[output_fileGrp]: - cache_current_request = True - break - # If there are request page ids that are already locked - if not locked_ws_pages[output_fileGrp].isdisjoint(page_ids): - cache_current_request = True - break + # No need for further check if the request should be cached + if not cache_current_request: + # Check if there are any locked pages for the current request + for output_fileGrp in data.output_file_grps: + if output_fileGrp in locked_ws_pages: + if "all_pages" in locked_ws_pages[output_fileGrp]: + cache_current_request = True + break + # If there are request page ids that are already locked + if not locked_ws_pages[output_fileGrp].isdisjoint(page_ids): + cache_current_request = True + break if cache_current_request: # Append the processor name to the request itself @@ -342,9 +381,9 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets # If a record queue of this workspace_id does not exist in the requests cache if not self.processing_requests_cache.get(workspace_key, None): - self.processing_requests_cache[workspace_key] = Queue() - # Add the processing request to the internal queue - self.processing_requests_cache[workspace_key].put(data) + self.processing_requests_cache[workspace_key] = [] + # Add the processing request to the end of the internal queue + self.processing_requests_cache[workspace_key].append(data) return PYJobOutput( job_id=data.job_id, @@ -376,7 +415,7 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ job = DBProcessorJob( **data.dict(exclude_unset=True, exclude_none=True), processor_name=processor_name, - internal_callback_url=f"/processor/result_callback", + internal_callback_url=self.internal_job_callback_url, state=StateEnum.queued ) await job.insert() @@ -504,10 +543,10 @@ async def remove_from_request_cache(self, job_id: str, state: StateEnum, workspace_key = workspace_id if workspace_id else path_to_mets if workspace_key not in self.processing_requests_cache: - # No internal queue available for that workspace + self.log.exception(f"No internal queue available for workspace with key: {workspace_key}") return - if self.processing_requests_cache[workspace_key].empty(): + if not len(self.processing_requests_cache[workspace_key]): # The queue is empty - delete it try: del self.processing_requests_cache[workspace_key] @@ -515,17 +554,18 @@ async def remove_from_request_cache(self, job_id: str, state: StateEnum, self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}") return - # Process the next request in the internal queue - # TODO: Refactor and optimize the duplications here - # and last lines in `push_processor_job` method - data = self.processing_requests_cache[workspace_key].get() - processor_name = 
data.processor_name + data = await self.find_next_request_from_internal_queue(self.processing_requests_cache[workspace_key]) + # Nothing was consumed from the internal queue + if not data: + self.log.exception(f"No data was consumed from the internal queue") + return + processor_name = data.processor_name # Create a DB entry job = DBProcessorJob( **data.dict(exclude_unset=True, exclude_none=True), processor_name=processor_name, - internal_callback_url=f"/processor/result_callback", + internal_callback_url=self.internal_job_callback_url, state=StateEnum.queued ) await job.insert() diff --git a/ocrd_network/ocrd_network/processing_worker.py b/ocrd_network/ocrd_network/processing_worker.py index c76dd1f67a..d8a7f01f7c 100644 --- a/ocrd_network/ocrd_network/processing_worker.py +++ b/ocrd_network/ocrd_network/processing_worker.py @@ -192,7 +192,7 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: page_id = processing_message.page_id if 'page_id' in pm_keys else None result_queue_name = processing_message.result_queue_name if 'result_queue_name' in pm_keys else None callback_url = processing_message.callback_url if 'callback_url' in pm_keys else None - internal_callback_url = processing_message.callback_url if 'internal_callback_url' in pm_keys else None + internal_callback_url = processing_message.internal_callback_url if 'internal_callback_url' in pm_keys else None parameters = processing_message.parameters if processing_message.parameters else {} if not path_to_mets and workspace_id: @@ -239,7 +239,7 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: # May not be always available workspace_id=workspace_id ) - self.log.info(f'Result message: {result_message}') + self.log.info(f'Result message: {str(result_message)}') # If the result_queue field is set, send the result message to a result queue if result_queue_name: self.publish_to_result_queue(result_queue_name, result_message) @@ -265,18 +265,6 @@ def publish_to_result_queue(self, result_queue: str, result_message: OcrdResultM message=encoded_result_message ) - def post_to_callback_url(self, callback_url: str, result_message: OcrdResultMessage): - self.log.info(f'Posting result message to callback_url "{callback_url}"') - headers = {"Content-Type": "application/json"} - json_data = { - "job_id": result_message.job_id, - "state": result_message.state, - "path_to_mets": result_message.path_to_mets, - "workspace_id": result_message.workspace_id - } - response = requests.post(url=callback_url, headers=headers, json=json_data) - self.log.info(f'Response from callback_url "{response}"') - def create_queue(self, connection_attempts=1, retry_delay=1): """Create the queue for this worker diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/ocrd_messages.py b/ocrd_network/ocrd_network/rabbitmq_utils/ocrd_messages.py index 80f5e253a9..4016cd71bb 100644 --- a/ocrd_network/ocrd_network/rabbitmq_utils/ocrd_messages.py +++ b/ocrd_network/ocrd_network/rabbitmq_utils/ocrd_messages.py @@ -18,7 +18,8 @@ def __init__( page_id: Optional[str], result_queue_name: Optional[str], callback_url: Optional[str], - parameters: Dict[str, Any] = None, + internal_callback_url: Optional[str], + parameters: Dict[str, Any] = None ) -> None: if not job_id: raise ValueError('job_id must be provided') @@ -47,6 +48,8 @@ def __init__( self.result_queue_name = result_queue_name if callback_url: self.callback_url = callback_url + if internal_callback_url: + self.internal_callback_url = internal_callback_url self.parameters 
= parameters if parameters else {} @staticmethod @@ -71,7 +74,8 @@ def decode_yml(ocrd_processing_message: bytes) -> OcrdProcessingMessage: page_id=data.get('page_id', None), parameters=data.get('parameters', None), result_queue_name=data.get('result_queue_name', None), - callback_url=data.get('callback_url', None) + callback_url=data.get('callback_url', None), + internal_callback_url=data.get('internal_callback_url', None) ) diff --git a/ocrd_validators/ocrd_validators/message_processing.schema.yml b/ocrd_validators/ocrd_validators/message_processing.schema.yml index 3a8042bf42..b4363aeba0 100644 --- a/ocrd_validators/ocrd_validators/message_processing.schema.yml +++ b/ocrd_validators/ocrd_validators/message_processing.schema.yml @@ -58,8 +58,13 @@ properties: callback_url: description: The URL where the result message will be POST-ed to type: string - format: uri, - pattern: "^https?://" + format: uri + pattern: "^http?://" + internal_callback_url: + description: The URL where the internal result message will be POST-ed to the Processing Server + type: string + format: uri + pattern: "^http?://" created_time: description: The Unix timestamp when the message was created type: integer diff --git a/ocrd_validators/ocrd_validators/message_result.schema.yml b/ocrd_validators/ocrd_validators/message_result.schema.yml index aef62821ea..d2c87ba6e4 100644 --- a/ocrd_validators/ocrd_validators/message_result.schema.yml +++ b/ocrd_validators/ocrd_validators/message_result.schema.yml @@ -20,8 +20,10 @@ properties: description: The current status of the job type: string enum: - - SUCCESS + - CACHED + - QUEUED - RUNNING + - SUCCESS - FAILED path_to_mets: description: Path to a METS file From 2b0c4ea68167b70c201ce90e118c8f6227208650 Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Thu, 24 Aug 2023 20:50:03 +0200 Subject: [PATCH 10/18] remove unnecessary import --- ocrd_network/ocrd_network/processing_server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 56fc947a70..d7d550fdf5 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -21,7 +21,6 @@ DBProcessorJob, PYJobInput, PYJobOutput, - PYResultMessage, StateEnum ) from .rabbitmq_utils import ( From 70dfcec13b5add3b3daa1a5db349e4c2cab185f3 Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Fri, 25 Aug 2023 14:32:06 +0200 Subject: [PATCH 11/18] Fix depends on and internal callback mechanism --- ocrd_network/ocrd_network/models/__init__.py | 2 + ocrd_network/ocrd_network/models/messages.py | 22 +++++++++ .../ocrd_network/processing_server.py | 49 +++++++++++-------- 3 files changed, 53 insertions(+), 20 deletions(-) create mode 100644 ocrd_network/ocrd_network/models/messages.py diff --git a/ocrd_network/ocrd_network/models/__init__.py b/ocrd_network/ocrd_network/models/__init__.py index 365e794bee..80dec8acd5 100644 --- a/ocrd_network/ocrd_network/models/__init__.py +++ b/ocrd_network/ocrd_network/models/__init__.py @@ -9,6 +9,7 @@ 'PYJobInput', 'PYJobOutput', 'PYOcrdTool', + 'PYResultMessage', 'StateEnum', ] @@ -18,5 +19,6 @@ PYJobOutput, StateEnum ) +from .messages import PYResultMessage from .ocrd_tool import PYOcrdTool from .workspace import DBWorkspace diff --git a/ocrd_network/ocrd_network/models/messages.py b/ocrd_network/ocrd_network/models/messages.py new file mode 100644 index 0000000000..062f1a9a24 --- /dev/null +++ b/ocrd_network/ocrd_network/models/messages.py @@ -0,0 +1,22 @@ 
+from pydantic import BaseModel +from typing import Optional +from .job import StateEnum + + +class PYResultMessage(BaseModel): + """ Wraps the parameters required to make a result message request + """ + job_id: str + state: StateEnum + path_to_mets: Optional[str] = None + workspace_id: Optional[str] = None + + class Config: + schema_extra = { + 'example': { + 'job_id': '123123123', + 'state': 'SUCCESS', + 'path_to_mets': '/path/to/mets.xml', + 'workspace_id': 'c7f25615-fc17-4365-a74d-ad20e1ddbd0e' + } + } diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index d7d550fdf5..5e1bd9655a 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -21,6 +21,7 @@ DBProcessorJob, PYJobInput, PYJobOutput, + PYResultMessage, StateEnum ) from .rabbitmq_utils import ( @@ -83,7 +84,7 @@ def __init__(self, config_path: str, host: str, port: int) -> None: self.processing_requests_cache = {} # Used by processing workers and/or processor servers to report back the results - self.internal_job_callback_url = f'http://{host}:{port}/processor/result_callback' + self.internal_job_callback_url = f'http://{host}:{port}/result_callback' # Create routes self.router.add_api_route( @@ -119,7 +120,7 @@ def __init__(self, config_path: str, host: str, port: int) -> None: ) self.router.add_api_route( - path='/processor/result_callback', + path='/result_callback', endpoint=self.remove_from_request_cache, methods=['POST'], tags=['processing'], @@ -270,31 +271,32 @@ def check_if_queue_exists(self, processor_name): ) # Returns true if all dependent jobs' states are success, else false - @staticmethod - async def check_if_job_dependencies_met(dependencies: List[str]) -> bool: + async def check_if_job_dependencies_met(self, dependencies: List[str]) -> bool: # Check the states of all dependent jobs for dependency_job_id in dependencies: + self.log.debug(f"dependency_job_id: {dependency_job_id}") dependency_job_state = (await db_get_processing_job(dependency_job_id)).state + self.log.debug(f"dependency_job_state: {dependency_job_state}") # Found a dependent job whose state is not success if dependency_job_state != StateEnum.success: return False return True async def find_next_request_from_internal_queue(self, internal_queue: List[PYJobInput]) -> PYJobInput: - found_index = None found_request = None - for i in range(0, len(internal_queue)): - current_element = internal_queue[i] + for i, current_element in enumerate(internal_queue): # Request has other job dependencies if current_element.depends_on: - if not await self.check_if_job_dependencies_met(current_element.depends_on): + self.log.debug(f"current_element: {current_element}") + self.log.debug(f"job dependencies: {current_element.depends_on}") + satisfied_dependencies = await self.check_if_job_dependencies_met(current_element.depends_on) + self.log.debug(f"satisfied dependencies: {satisfied_dependencies}") + if not satisfied_dependencies: continue - found_index = i - break - - if found_index: # Consume the request from the internal queue - found_request = internal_queue.pop(found_index) + found_request = internal_queue.pop(i) + self.log.debug(f"found cached request to be processed: {found_request}") + break return found_request def query_ocrd_tool_json_from_server(self, processor_name): @@ -328,6 +330,9 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ # Generate processing job id data.job_id = generate_id() + # Append the 
processor name to the request itself + data.processor_name = processor_name + if data.agent_type not in ['worker', 'server']: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, @@ -357,6 +362,7 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ # Check if there are any dependencies of the current request if data.depends_on: if not await self.check_if_job_dependencies_met(data.depends_on): + self.log.debug(f"Caching the received request due to job dependencies") cache_current_request = True locked_ws_pages = workspace_db.pages_locked @@ -366,17 +372,16 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ for output_fileGrp in data.output_file_grps: if output_fileGrp in locked_ws_pages: if "all_pages" in locked_ws_pages[output_fileGrp]: + self.log.debug(f"Caching the received request due to locked output file grp pages") cache_current_request = True break # If there are request page ids that are already locked if not locked_ws_pages[output_fileGrp].isdisjoint(page_ids): + self.log.debug(f"Caching the received request due to locked output file grp pages") cache_current_request = True break if cache_current_request: - # Append the processor name to the request itself - data.processor_name = processor_name - workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets # If a record queue of this workspace_id does not exist in the requests cache if not self.processing_requests_cache.get(workspace_key, None): @@ -413,7 +418,6 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ # Create a DB entry job = DBProcessorJob( **data.dict(exclude_unset=True, exclude_none=True), - processor_name=processor_name, internal_callback_url=self.internal_job_callback_url, state=StateEnum.queued ) @@ -496,8 +500,14 @@ async def push_to_processor_server(self, processor_name: str, processor_server_u async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutput: return await _get_processor_job(self.log, processor_name, job_id) - async def remove_from_request_cache(self, job_id: str, state: StateEnum, - workspace_id: Optional[str], path_to_mets: Optional[str]): + async def remove_from_request_cache(self, result_message: PYResultMessage): + job_id = result_message.job_id + state = result_message.state + path_to_mets = result_message.path_to_mets + workspace_id = result_message.workspace_id + + self.log.debug(f"Received result for job with id: {job_id} has state: {state}") + if state == StateEnum.failed: # TODO: Call the callback to the Workflow server if the current processing step has failed pass @@ -563,7 +573,6 @@ async def remove_from_request_cache(self, job_id: str, state: StateEnum, # Create a DB entry job = DBProcessorJob( **data.dict(exclude_unset=True, exclude_none=True), - processor_name=processor_name, internal_callback_url=self.internal_job_callback_url, state=StateEnum.queued ) From 23591ecc95a6cb6708e463717a64402f043e45fc Mon Sep 17 00:00:00 2001 From: joschrew Date: Fri, 25 Aug 2023 15:20:28 +0200 Subject: [PATCH 12/18] Add possibility to overwrite internal_callback_url --- ocrd_network/ocrd_network/deployer.py | 1 + ocrd_network/ocrd_network/processing_server.py | 8 ++++++-- .../ocrd_validators/processing_server_config.schema.yml | 3 +++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/ocrd_network/ocrd_network/deployer.py b/ocrd_network/ocrd_network/deployer.py index a9239c4f70..058fc6e293 100644 --- 
a/ocrd_network/ocrd_network/deployer.py +++ b/ocrd_network/ocrd_network/deployer.py @@ -39,6 +39,7 @@ def __init__(self, config_path: str) -> None: self.data_mongo: DataMongoDB = DataMongoDB(config['database']) self.data_queue: DataRabbitMQ = DataRabbitMQ(config['process_queue']) self.data_hosts: List[DataHost] = [] + self.internal_callback_url = config.get('internal_callback_url', None) for config_host in config['hosts']: self.data_hosts.append(DataHost(config_host)) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 5e1bd9655a..222c2011d3 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -84,7 +84,11 @@ def __init__(self, config_path: str, host: str, port: int) -> None: self.processing_requests_cache = {} # Used by processing workers and/or processor servers to report back the results - self.internal_job_callback_url = f'http://{host}:{port}/result_callback' + if self.deployer.internal_callback_url: + host = self.deployer.internal_callback_url + self.internal_job_callback_url = f'{host.rstrip("/")}/result_callback' + else: + self.internal_job_callback_url = f'http://{host}:{port}/result_callback' # Create routes self.router.add_api_route( @@ -473,7 +477,7 @@ async def push_to_processor_server(self, processor_name: str, processor_server_u status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to json dump the PYJobInput, error: {e}" ) - + # TODO: The amount of pages should come as a request input # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161 # currently, use 200 as a default diff --git a/ocrd_validators/ocrd_validators/processing_server_config.schema.yml b/ocrd_validators/ocrd_validators/processing_server_config.schema.yml index ed6fd3d1e9..17a723bf9a 100644 --- a/ocrd_validators/ocrd_validators/processing_server_config.schema.yml +++ b/ocrd_validators/ocrd_validators/processing_server_config.schema.yml @@ -6,6 +6,9 @@ additionalProperties: false required: - process_queue properties: + internal_callback_url: + description: optionally set the host for the internal_callback_url, for example "http://172.17.0.1:8080" + type: string process_queue: description: Information about the Message Queue type: object From b6d7bafa0b8779965bd5d1e76d3cb7e451a0987e Mon Sep 17 00:00:00 2001 From: joschrew Date: Mon, 28 Aug 2023 11:59:42 +0200 Subject: [PATCH 13/18] Adjust some log levels --- ocrd_network/ocrd_network/processing_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 222c2011d3..22fe323392 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -556,7 +556,7 @@ async def remove_from_request_cache(self, result_message: PYResultMessage): workspace_key = workspace_id if workspace_id else path_to_mets if workspace_key not in self.processing_requests_cache: - self.log.exception(f"No internal queue available for workspace with key: {workspace_key}") + self.log.debug(f"No internal queue available for workspace with key: {workspace_key}") return if not len(self.processing_requests_cache[workspace_key]): @@ -570,7 +570,7 @@ async def remove_from_request_cache(self, result_message: PYResultMessage): data = await self.find_next_request_from_internal_queue(self.processing_requests_cache[workspace_key]) # Nothing was consumed from the internal queue if not 
data: - self.log.exception(f"No data was consumed from the internal queue") + self.log.debug("No data was consumed from the internal queue") return processor_name = data.processor_name From afd97c819dff220d8f0738da92657b46ea80d2b9 Mon Sep 17 00:00:00 2001 From: joschrew Date: Mon, 28 Aug 2023 14:14:30 +0200 Subject: [PATCH 14/18] Fix not available job as depends_on --- ocrd_network/ocrd_network/processing_server.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 22fe323392..5fdedf868a 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -279,7 +279,11 @@ async def check_if_job_dependencies_met(self, dependencies: List[str]) -> bool: # Check the states of all dependent jobs for dependency_job_id in dependencies: self.log.debug(f"dependency_job_id: {dependency_job_id}") - dependency_job_state = (await db_get_processing_job(dependency_job_id)).state + try: + dependency_job_state = (await db_get_processing_job(dependency_job_id)).state + except ValueError: + # job_id not (yet) in db. Dependency not met + return False self.log.debug(f"dependency_job_state: {dependency_job_state}") # Found a dependent job whose state is not success if dependency_job_state != StateEnum.success: From 5e5da5c1b6069ebf67f836ca17066d21eac9db7e Mon Sep 17 00:00:00 2001 From: joschrew Date: Mon, 28 Aug 2023 14:38:58 +0200 Subject: [PATCH 15/18] Use list instead of set for locked pages Apparently mongodb cannot store/load sets --- ocrd_network/ocrd_network/processing_server.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 5fdedf868a..9b2ce65e71 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -384,7 +384,7 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ cache_current_request = True break # If there are request page ids that are already locked - if not locked_ws_pages[output_fileGrp].isdisjoint(page_ids): + if not set(locked_ws_pages[output_fileGrp]).isdisjoint(page_ids): self.log.debug(f"Caching the received request due to locked output file grp pages") cache_current_request = True break @@ -408,13 +408,13 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ # Update locked pages by locking the pages in the request for output_fileGrp in data.output_file_grps: if output_fileGrp not in locked_ws_pages: - locked_ws_pages[output_fileGrp] = set() + locked_ws_pages[output_fileGrp] = [] # The page id list is not empty - only some pages are in the request if page_ids: - locked_ws_pages[output_fileGrp].update(page_ids) + locked_ws_pages[output_fileGrp].append(page_ids) else: # Lock all pages with a single value - locked_ws_pages[output_fileGrp].add("all_pages") + locked_ws_pages[output_fileGrp].append("all_pages") # Update the locked pages dictionary in the database await db_update_workspace( @@ -544,7 +544,7 @@ async def remove_from_request_cache(self, result_message: PYResultMessage): if output_fileGrp in locked_ws_pages: if job_page_ids: # Unlock the previously locked pages - locked_ws_pages[output_fileGrp].difference_update(set(job_page_ids)) + locked_ws_pages[output_fileGrp] = [x for x in locked_ws_pages if x not in job_page_ids] else: # Remove the single variable used to 
indicate all pages are locked locked_ws_pages[output_fileGrp].remove("all_pages") From c17a596f9f0d5a839b1cd7636d0368853579a98d Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Tue, 29 Aug 2023 12:49:07 +0200 Subject: [PATCH 16/18] fix page locking --- ocrd_network/ocrd_network/processing_server.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 9b2ce65e71..40b07407db 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -393,7 +393,9 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets # If a record queue of this workspace_id does not exist in the requests cache if not self.processing_requests_cache.get(workspace_key, None): + self.log.debug(f"Creating an internal queue for workspace_key: {workspace_key}") self.processing_requests_cache[workspace_key] = [] + self.log.debug(f"Caching the processing request: {data}") # Add the processing request to the end of the internal queue self.processing_requests_cache[workspace_key].append(data) @@ -408,12 +410,15 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ # Update locked pages by locking the pages in the request for output_fileGrp in data.output_file_grps: if output_fileGrp not in locked_ws_pages: + self.log.debug(f"Creating an empty list for output file grp: {output_fileGrp}") locked_ws_pages[output_fileGrp] = [] # The page id list is not empty - only some pages are in the request if page_ids: - locked_ws_pages[output_fileGrp].append(page_ids) + self.log.debug(f"Locking pages for `{output_fileGrp}`: {page_ids}") + locked_ws_pages[output_fileGrp].extend(page_ids) else: # Lock all pages with a single value + self.log.debug(f"Locking all pages for `{output_fileGrp}`") locked_ws_pages[output_fileGrp].append("all_pages") # Update the locked pages dictionary in the database @@ -544,9 +549,12 @@ async def remove_from_request_cache(self, result_message: PYResultMessage): if output_fileGrp in locked_ws_pages: if job_page_ids: # Unlock the previously locked pages - locked_ws_pages[output_fileGrp] = [x for x in locked_ws_pages if x not in job_page_ids] + self.log.debug(f"Unlocking pages of `{output_fileGrp}`: {job_page_ids}") + locked_ws_pages[output_fileGrp] = [x for x in locked_ws_pages[output_fileGrp] if x not in job_page_ids] + self.log.debug(f"Remaining locked pages of `{output_fileGrp}`: {locked_ws_pages[output_fileGrp]}") else: # Remove the single variable used to indicate all pages are locked + self.log.debug(f"Unlocking all pages for: {output_fileGrp}") locked_ws_pages[output_fileGrp].remove("all_pages") # Update the locked pages dictionary in the database From efbb6282c712ddc37a56f2fdf23715d1d3cc2c97 Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Wed, 30 Aug 2023 13:01:27 +0200 Subject: [PATCH 17/18] multiple requests consumed from internal queue --- .../ocrd_network/processing_server.py | 59 ++++++++++--------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 40b07407db..b94e98db28 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -290,8 +290,8 @@ async def check_if_job_dependencies_met(self, dependencies: List[str]) -> bool: return 
False return True - async def find_next_request_from_internal_queue(self, internal_queue: List[PYJobInput]) -> PYJobInput: - found_request = None + async def find_next_requests_from_internal_queue(self, internal_queue: List[PYJobInput]) -> List[PYJobInput]: + found_requests = [] for i, current_element in enumerate(internal_queue): # Request has other job dependencies if current_element.depends_on: @@ -304,8 +304,8 @@ async def find_next_request_from_internal_queue(self, internal_queue: List[PYJob # Consume the request from the internal queue found_request = internal_queue.pop(i) self.log.debug(f"found cached request to be processed: {found_request}") - break - return found_request + found_requests.append(found_request) + return found_requests def query_ocrd_tool_json_from_server(self, processor_name): processor_server_url = self.deployer.resolve_processor_server_url(processor_name) @@ -579,34 +579,35 @@ async def remove_from_request_cache(self, result_message: PYResultMessage): self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}") return - data = await self.find_next_request_from_internal_queue(self.processing_requests_cache[workspace_key]) - # Nothing was consumed from the internal queue - if not data: + consumed_requests = await self.find_next_requests_from_internal_queue(self.processing_requests_cache[workspace_key]) + + if not len(consumed_requests): self.log.debug("No data was consumed from the internal queue") return - processor_name = data.processor_name - # Create a DB entry - job = DBProcessorJob( - **data.dict(exclude_unset=True, exclude_none=True), - internal_callback_url=self.internal_job_callback_url, - state=StateEnum.queued - ) - await job.insert() - - job_output = None - if data.agent_type == 'worker': - ocrd_tool = await self.get_processor_info(processor_name) - validate_job_input(self.log, processor_name, ocrd_tool, data) - processing_message = self.create_processing_message(job) - await self.push_to_processing_queue(processor_name, processing_message) - job_output = job.to_job_output() - if data.agent_type == 'server': - ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name) - validate_job_input(self.log, processor_name, ocrd_tool, data) - job_output = await self.push_to_processor_server(processor_name, processor_server_url, data) - if not job_output: - self.log.exception(f'Failed to create job output for job input data: {data}') + for data in consumed_requests: + processor_name = data.processor_name + # Create a DB entry + job = DBProcessorJob( + **data.dict(exclude_unset=True, exclude_none=True), + internal_callback_url=self.internal_job_callback_url, + state=StateEnum.queued + ) + await job.insert() + + job_output = None + if data.agent_type == 'worker': + ocrd_tool = await self.get_processor_info(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, data) + processing_message = self.create_processing_message(job) + await self.push_to_processing_queue(processor_name, processing_message) + job_output = job.to_job_output() + if data.agent_type == 'server': + ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, data) + job_output = await self.push_to_processor_server(processor_name, processor_server_url, data) + if not job_output: + self.log.exception(f'Failed to create job output for job input data: {data}') async def get_processor_info(self, processor_name) -> Dict: """ Return a 
processor's ocrd-tool.json From 79016bf94b8fb8d724d1fb23b5b435fc9dc66df8 Mon Sep 17 00:00:00 2001 From: mehmedGIT Date: Wed, 30 Aug 2023 13:40:16 +0200 Subject: [PATCH 18/18] improve code: locking/unlocking --- .../ocrd_network/processing_server.py | 116 ++++++++++++------ 1 file changed, 76 insertions(+), 40 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index b94e98db28..6a028eba88 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -1,7 +1,7 @@ import json import requests import httpx -from typing import Dict, List, Optional +from typing import Dict, List import uvicorn from fastapi import FastAPI, status, Request, HTTPException @@ -274,6 +274,52 @@ def check_if_queue_exists(self, processor_name): detail=f"Process queue with id '{processor_name}' not existing" ) + def check_if_locked_pages_for_output_file_grps( + self, + locked_ws_pages: Dict, + output_file_grps: List[str], + page_ids: List[str] + ) -> bool: + for output_fileGrp in output_file_grps: + self.log.debug(f"Checking output file group: {output_fileGrp}") + if output_fileGrp in locked_ws_pages: + self.log.debug(f"Locked workspace pages has entry for output file group: {output_fileGrp}") + if "all_pages" in locked_ws_pages[output_fileGrp]: + self.log.debug(f"Caching the received request due to locked output file grp pages") + return True + # If there are request page ids that are already locked + if not set(locked_ws_pages[output_fileGrp]).isdisjoint(page_ids): + self.log.debug(f"Caching the received request due to locked output file grp pages") + return True + + def lock_pages(self, locked_ws_pages: Dict, output_file_grps: List[str], page_ids: List[str]): + for output_fileGrp in output_file_grps: + if output_fileGrp not in locked_ws_pages: + self.log.debug(f"Creating an empty list for output file grp: {output_fileGrp}") + locked_ws_pages[output_fileGrp] = [] + # The page id list is not empty - only some pages are in the request + if page_ids: + self.log.debug(f"Locking pages for `{output_fileGrp}`: {page_ids}") + locked_ws_pages[output_fileGrp].extend(page_ids) + else: + # Lock all pages with a single value + self.log.debug(f"Locking all pages for `{output_fileGrp}`") + locked_ws_pages[output_fileGrp].append("all_pages") + + def unlock_pages(self, locked_ws_pages: Dict, output_file_grps: List[str], page_ids: List[str]): + for output_fileGrp in output_file_grps: + if output_fileGrp in locked_ws_pages: + if page_ids: + # Unlock the previously locked pages + self.log.debug(f"Unlocking pages of `{output_fileGrp}`: {page_ids}") + locked_ws_pages[output_fileGrp] = [x for x in locked_ws_pages[output_fileGrp] if + x not in page_ids] + self.log.debug(f"Remaining locked pages of `{output_fileGrp}`: {locked_ws_pages[output_fileGrp]}") + else: + # Remove the single variable used to indicate all pages are locked + self.log.debug(f"Unlocking all pages for: {output_fileGrp}") + locked_ws_pages[output_fileGrp].remove("all_pages") + # Returns true if all dependent jobs' states are success, else false async def check_if_job_dependencies_met(self, dependencies: List[str]) -> bool: # Check the states of all dependent jobs @@ -374,20 +420,16 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ cache_current_request = True locked_ws_pages = workspace_db.pages_locked - # No need for further check if the request should be cached + + # No need for further check of locked pages 
dependency + # if the request should be already cached if not cache_current_request: # Check if there are any locked pages for the current request - for output_fileGrp in data.output_file_grps: - if output_fileGrp in locked_ws_pages: - if "all_pages" in locked_ws_pages[output_fileGrp]: - self.log.debug(f"Caching the received request due to locked output file grp pages") - cache_current_request = True - break - # If there are request page ids that are already locked - if not set(locked_ws_pages[output_fileGrp]).isdisjoint(page_ids): - self.log.debug(f"Caching the received request due to locked output file grp pages") - cache_current_request = True - break + cache_current_request = self.check_if_locked_pages_for_output_file_grps( + locked_ws_pages=locked_ws_pages, + output_file_grps=data.output_file_grps, + page_ids=page_ids + ) if cache_current_request: workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets @@ -408,18 +450,11 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ ) else: # Update locked pages by locking the pages in the request - for output_fileGrp in data.output_file_grps: - if output_fileGrp not in locked_ws_pages: - self.log.debug(f"Creating an empty list for output file grp: {output_fileGrp}") - locked_ws_pages[output_fileGrp] = [] - # The page id list is not empty - only some pages are in the request - if page_ids: - self.log.debug(f"Locking pages for `{output_fileGrp}`: {page_ids}") - locked_ws_pages[output_fileGrp].extend(page_ids) - else: - # Lock all pages with a single value - self.log.debug(f"Locking all pages for `{output_fileGrp}`") - locked_ws_pages[output_fileGrp].append("all_pages") + self.lock_pages( + locked_ws_pages=locked_ws_pages, + output_file_grps=data.output_file_grps, + page_ids=page_ids + ) # Update the locked pages dictionary in the database await db_update_workspace( @@ -477,7 +512,12 @@ async def push_to_processing_queue(self, processor_name: str, processing_message detail=f'RMQPublisher has failed: {error}' ) - async def push_to_processor_server(self, processor_name: str, processor_server_url: str, job_input: PYJobInput) -> PYJobOutput: + async def push_to_processor_server( + self, + processor_name: str, + processor_server_url: str, + job_input: PYJobInput + ) -> PYJobOutput: try: json_data = json.dumps(job_input.dict(exclude_unset=True, exclude_none=True)) except Exception as e: @@ -543,19 +583,13 @@ async def remove_from_request_cache(self, result_message: PYResultMessage): if not workspace_db: self.log.exception(f"Workspace with id: {workspace_id} or path: {path_to_mets} not found in DB") - # Update locked pages by unlocking the pages in the request locked_ws_pages = workspace_db.pages_locked - for output_fileGrp in job_output_file_grps: - if output_fileGrp in locked_ws_pages: - if job_page_ids: - # Unlock the previously locked pages - self.log.debug(f"Unlocking pages of `{output_fileGrp}`: {job_page_ids}") - locked_ws_pages[output_fileGrp] = [x for x in locked_ws_pages[output_fileGrp] if x not in job_page_ids] - self.log.debug(f"Remaining locked pages of `{output_fileGrp}`: {locked_ws_pages[output_fileGrp]}") - else: - # Remove the single variable used to indicate all pages are locked - self.log.debug(f"Unlocking all pages for: {output_fileGrp}") - locked_ws_pages[output_fileGrp].remove("all_pages") + # Update locked pages by unlocking the pages in the request + self.unlock_pages( + locked_ws_pages=locked_ws_pages, + output_file_grps=job_output_file_grps, + page_ids=job_page_ids + ) # 
Update the locked pages dictionary in the database await db_update_workspace( @@ -575,11 +609,13 @@ async def remove_from_request_cache(self, result_message: PYResultMessage): # The queue is empty - delete it try: del self.processing_requests_cache[workspace_key] - except KeyError as ex: + except KeyError: self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}") return - consumed_requests = await self.find_next_requests_from_internal_queue(self.processing_requests_cache[workspace_key]) + consumed_requests = await self.find_next_requests_from_internal_queue( + internal_queue=self.processing_requests_cache[workspace_key] + ) if not len(consumed_requests): self.log.debug("No data was consumed from the internal queue")
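
The patches above add one central mechanism: a processing request whose depends_on jobs have not all reached SUCCESS is parked in a per-workspace internal queue, and the /result_callback handler later re-submits every parked request whose dependencies have since succeeded. Below is a minimal, framework-free sketch of that consumption step, assuming an in-memory job_states dict in place of the MongoDB-backed db_get_processing_job() lookups; the names JobInput, dependencies_met and consume_ready_requests are illustrative only and are not part of the ocrd_network API.

    # Sketch only: dependency-gated consumption of cached processing requests.
    # Job states live in a plain dict here; the real code is async and backed by MongoDB.
    from dataclasses import dataclass
    from enum import Enum
    from typing import Dict, List, Optional


    class StateEnum(str, Enum):
        cached = 'CACHED'
        queued = 'QUEUED'
        running = 'RUNNING'
        success = 'SUCCESS'
        failed = 'FAILED'


    @dataclass
    class JobInput:
        job_id: str
        processor_name: str
        depends_on: Optional[List[str]] = None


    def dependencies_met(job_states: Dict[str, StateEnum], dependencies: List[str]) -> bool:
        # A dependency that is unknown (not yet in the job store) also counts as unmet
        return all(job_states.get(dep) == StateEnum.success for dep in dependencies)


    def consume_ready_requests(internal_queue: List[JobInput],
                               job_states: Dict[str, StateEnum]) -> List[JobInput]:
        # Take every cached request whose dependencies are all successful,
        # preserving the order in which the requests were cached
        ready = [r for r in internal_queue
                 if not r.depends_on or dependencies_met(job_states, r.depends_on)]
        internal_queue[:] = [r for r in internal_queue if r not in ready]
        return ready


    if __name__ == '__main__':
        states = {'job-1': StateEnum.success, 'job-2': StateEnum.running}
        queue = [
            JobInput('job-3', 'ocrd-dummy', depends_on=['job-1']),           # ready
            JobInput('job-4', 'ocrd-dummy', depends_on=['job-2']),           # still blocked
            JobInput('job-5', 'ocrd-dummy', depends_on=['job-1', 'job-2']),  # still blocked
        ]
        for request in consume_ready_requests(queue, states):
            print(f'would re-submit {request.job_id} to {request.processor_name}')
        # -> only job-3 is re-submitted; job-4 and job-5 stay cached until job-2 succeeds

In a workflow, the caller only needs to set depends_on on each subsequent request to the job_id returned for the previous step; the Processing Server then decides per request whether to queue it immediately or cache it. The same storage constraint noted in commit 15 ("Apparently mongodb cannot store/load sets") is why pages_locked keeps plain lists and only converts to a set for the isdisjoint() check.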