diff --git a/ocrd_network/ocrd_network/database.py b/ocrd_network/ocrd_network/database.py index 2daf71761b..acc3b1c58a 100644 --- a/ocrd_network/ocrd_network/database.py +++ b/ocrd_network/ocrd_network/database.py @@ -1,6 +1,6 @@ """ The database is used to store information regarding jobs and workspaces. -Jobs: for every process-request a job is inserted into the database with a uuid, status and +Jobs: for every process-request a job is inserted into the database with an uuid, status and information about the process like parameters and file groups. It is mainly used to track the status (`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished jobs are not deleted from the database. @@ -35,18 +35,77 @@ async def sync_initiate_database(db_url: str): await initiate_database(db_url) -async def db_get_workspace(workspace_id: str) -> DBWorkspace: - workspace = await DBWorkspace.find_one( - DBWorkspace.workspace_id == workspace_id - ) - if not workspace: - raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.') +async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace: + workspace = None + if not workspace_id and not workspace_mets_path: + raise ValueError(f'Either `workspace_id` or `workspace_mets_path` field must be used as a search key') + if workspace_id: + workspace = await DBWorkspace.find_one( + DBWorkspace.workspace_id == workspace_id + ) + if not workspace: + raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.') + if workspace_mets_path: + workspace = await DBWorkspace.find_one( + DBWorkspace.workspace_mets_path == workspace_mets_path + ) + if not workspace: + raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.') return workspace @call_sync -async def sync_db_get_workspace(workspace_id: str) -> DBWorkspace: - return await db_get_workspace(workspace_id) +async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace: + return await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path) + + +async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs): + workspace = None + if not workspace_id and not workspace_mets_path: + raise ValueError(f'Either `workspace_id` or `workspace_mets_path` field must be used as a search key') + if workspace_id: + workspace = await DBWorkspace.find_one( + DBWorkspace.workspace_id == workspace_id + ) + if not workspace: + raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.') + if workspace_mets_path: + workspace = await DBWorkspace.find_one( + DBWorkspace.workspace_mets_path == workspace_mets_path + ) + if not workspace: + raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.') + + job_keys = list(workspace.__dict__.keys()) + for key, value in kwargs.items(): + if key not in job_keys: + raise ValueError(f'Field "{key}" is not available.') + if key == 'workspace_id': + workspace.workspace_id = value + elif key == 'workspace_mets_path': + workspace.workspace_mets_path = value + elif key == 'ocrd_identifier': + workspace.ocrd_identifier = value + elif key == 'bagit_profile_identifier': + workspace.bagit_profile_identifier = value + elif key == 'ocrd_base_version_checksum': + workspace.ocrd_base_version_checksum = value + elif key == 'ocrd_mets': + workspace.ocrd_mets = value + elif key == 'bag_info_adds': + workspace.bag_info_adds = value + elif key == 'deleted': + 
workspace.deleted = value + elif key == 'pages_locked': + workspace.pages_locked = value + else: + raise ValueError(f'Field "{key}" is not updatable.') + await workspace.save() + + +@call_sync +async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs): + await db_update_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path, **kwargs) async def db_get_processing_job(job_id: str) -> DBProcessorJob: @@ -68,8 +127,6 @@ async def db_update_processing_job(job_id: str, **kwargs): if not job: raise ValueError(f'Processing job with id "{job_id}" not in the DB.') - # TODO: This may not be the best Pythonic way to do it. However, it works! - # There must be a shorter way with Pydantic. Suggest an improvement. job_keys = list(job.__dict__.keys()) for key, value in kwargs.items(): if key not in job_keys: diff --git a/ocrd_network/ocrd_network/deployer.py b/ocrd_network/ocrd_network/deployer.py index f5b7f045d5..058fc6e293 100644 --- a/ocrd_network/ocrd_network/deployer.py +++ b/ocrd_network/ocrd_network/deployer.py @@ -39,6 +39,7 @@ def __init__(self, config_path: str) -> None: self.data_mongo: DataMongoDB = DataMongoDB(config['database']) self.data_queue: DataRabbitMQ = DataRabbitMQ(config['process_queue']) self.data_hosts: List[DataHost] = [] + self.internal_callback_url = config.get('internal_callback_url', None) for config_host in config['hosts']: self.data_hosts.append(DataHost(config_host)) @@ -302,7 +303,7 @@ def deploy_mongodb( ) -> str: if self.data_mongo.skip_deployment: self.log.debug('MongoDB is externaly managed. Skipping deployment') - verify_mongodb_available(self.data_mongo.url); + verify_mongodb_available(self.data_mongo.url) return self.data_mongo.url self.log.debug(f"Trying to deploy '{image}', with modes: " diff --git a/ocrd_network/ocrd_network/models/__init__.py b/ocrd_network/ocrd_network/models/__init__.py index 365e794bee..80dec8acd5 100644 --- a/ocrd_network/ocrd_network/models/__init__.py +++ b/ocrd_network/ocrd_network/models/__init__.py @@ -9,6 +9,7 @@ 'PYJobInput', 'PYJobOutput', 'PYOcrdTool', + 'PYResultMessage', 'StateEnum', ] @@ -18,5 +19,6 @@ PYJobOutput, StateEnum ) +from .messages import PYResultMessage from .ocrd_tool import PYOcrdTool from .workspace import DBWorkspace diff --git a/ocrd_network/ocrd_network/models/job.py b/ocrd_network/ocrd_network/models/job.py index aa50e6aad8..30fca8e424 100644 --- a/ocrd_network/ocrd_network/models/job.py +++ b/ocrd_network/ocrd_network/models/job.py @@ -7,15 +7,22 @@ class StateEnum(str, Enum): + # The processing job is cached inside the Processing Server requests cache + cached = 'CACHED' + # The processing job is queued inside the RabbitMQ queued = 'QUEUED' + # Processing job is currently running in a Worker or Processor Server running = 'RUNNING' + # Processing job finished successfully success = 'SUCCESS' + # Processing job failed failed = 'FAILED' class PYJobInput(BaseModel): """ Wraps the parameters required to make a run-processor-request """ + processor_name: Optional[str] = None path_to_mets: Optional[str] = None workspace_id: Optional[str] = None description: Optional[str] = None @@ -28,6 +35,10 @@ class PYJobInput(BaseModel): # Used to toggle between sending requests to 'worker and 'server', # i.e., Processing Worker and Processor Server, respectively agent_type: Optional[str] = 'worker' + # Auto generated by the Processing Server when forwarding to the Processor Server + job_id: Optional[str] = None + # If set, specifies a list of job ids this job 
depends on + depends_on: Optional[List[str]] = None class Config: schema_extra = { @@ -65,8 +76,10 @@ class DBProcessorJob(Document): output_file_grps: Optional[List[str]] page_id: Optional[str] parameters: Optional[dict] + depends_on: Optional[List[str]] result_queue_name: Optional[str] callback_url: Optional[str] + internal_callback_url: Optional[str] start_time: Optional[datetime] end_time: Optional[datetime] exec_time: Optional[str] diff --git a/ocrd_network/ocrd_network/models/messages.py b/ocrd_network/ocrd_network/models/messages.py new file mode 100644 index 0000000000..062f1a9a24 --- /dev/null +++ b/ocrd_network/ocrd_network/models/messages.py @@ -0,0 +1,22 @@ +from pydantic import BaseModel +from typing import Optional +from .job import StateEnum + + +class PYResultMessage(BaseModel): + """ Wraps the parameters required to make a result message request + """ + job_id: str + state: StateEnum + path_to_mets: Optional[str] = None + workspace_id: Optional[str] = None + + class Config: + schema_extra = { + 'example': { + 'job_id': '123123123', + 'state': 'SUCCESS', + 'path_to_mets': '/path/to/mets.xml', + 'workspace_id': 'c7f25615-fc17-4365-a74d-ad20e1ddbd0e' + } + } diff --git a/ocrd_network/ocrd_network/models/workspace.py b/ocrd_network/ocrd_network/models/workspace.py index 2a597b15ba..d05ddcf02d 100644 --- a/ocrd_network/ocrd_network/models/workspace.py +++ b/ocrd_network/ocrd_network/models/workspace.py @@ -1,5 +1,5 @@ from beanie import Document -from typing import Optional +from typing import Dict, Optional class DBWorkspace(Document): @@ -15,6 +15,10 @@ class DBWorkspace(Document): ocrd_mets Ocrd-Mets (optional) bag_info_adds bag-info.txt can also (optionally) contain additional key-value-pairs which are saved here + deleted the document is deleted if set, however, the record is still preserved + pages_locked a data structure that holds output `fileGrp`s and their respective locked `page_id` + that are currently being processed by an OCR-D processor (server or worker). + If no `page_id` field is set, an identifier "all_pages" will be used. 
""" workspace_id: str workspace_mets_path: str @@ -24,6 +28,10 @@ class DBWorkspace(Document): ocrd_mets: Optional[str] bag_info_adds: Optional[dict] deleted: bool = False + # Dictionary structure: + # Key: fileGrp + # Value: Set of `page_id`s + pages_locked: Optional[Dict] = {} class Settings: name = "workspace" diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 2a542219f1..6a028eba88 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -9,20 +9,29 @@ from fastapi.responses import JSONResponse from pika.exceptions import ChannelClosedByBroker - from ocrd_utils import getLogger -from .database import initiate_database +from .database import ( + initiate_database, + db_get_processing_job, + db_get_workspace, + db_update_workspace, +) from .deployer import Deployer from .models import ( DBProcessorJob, PYJobInput, PYJobOutput, + PYResultMessage, StateEnum ) -from .rabbitmq_utils import RMQPublisher, OcrdProcessingMessage +from .rabbitmq_utils import ( + RMQPublisher, + OcrdProcessingMessage +) from .server_utils import ( _get_processor_job, - validate_and_resolve_mets_path, + expand_page_ids, + validate_and_return_mets_path, validate_job_input, ) from .utils import ( @@ -69,6 +78,18 @@ def __init__(self, config_path: str, host: str, port: int) -> None: # Gets assigned when `connect_publisher` is called on the working object self.rmq_publisher = None + # Used for buffering/caching processing requests in the Processing Server + # Key: `workspace_id` or `path_to_mets` depending on which is provided + # Value: Queue that holds PYInputJob elements + self.processing_requests_cache = {} + + # Used by processing workers and/or processor servers to report back the results + if self.deployer.internal_callback_url: + host = self.deployer.internal_callback_url + self.internal_job_callback_url = f'{host.rstrip("/")}/result_callback' + else: + self.internal_job_callback_url = f'http://{host}:{port}/result_callback' + # Create routes self.router.add_api_route( path='/stop', @@ -102,6 +123,15 @@ def __init__(self, config_path: str, host: str, port: int) -> None: response_model_exclude_none=True ) + self.router.add_api_route( + path='/result_callback', + endpoint=self.remove_from_request_cache, + methods=['POST'], + tags=['processing'], + status_code=status.HTTP_200_OK, + summary='Callback used by a worker or processor server for reporting result of a processing request', + ) + self.router.add_api_route( path='/processor/{processor_name}', endpoint=self.get_processor_info, @@ -226,6 +256,7 @@ def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage: parameters=job.parameters, result_queue_name=job.result_queue_name, callback_url=job.callback_url, + internal_callback_url=job.internal_callback_url ) return processing_message @@ -243,6 +274,85 @@ def check_if_queue_exists(self, processor_name): detail=f"Process queue with id '{processor_name}' not existing" ) + def check_if_locked_pages_for_output_file_grps( + self, + locked_ws_pages: Dict, + output_file_grps: List[str], + page_ids: List[str] + ) -> bool: + for output_fileGrp in output_file_grps: + self.log.debug(f"Checking output file group: {output_fileGrp}") + if output_fileGrp in locked_ws_pages: + self.log.debug(f"Locked workspace pages has entry for output file group: {output_fileGrp}") + if "all_pages" in locked_ws_pages[output_fileGrp]: + self.log.debug(f"Caching the received request due to locked output file 
grp pages") + return True + # If there are request page ids that are already locked + if not set(locked_ws_pages[output_fileGrp]).isdisjoint(page_ids): + self.log.debug(f"Caching the received request due to locked output file grp pages") + return True + + def lock_pages(self, locked_ws_pages: Dict, output_file_grps: List[str], page_ids: List[str]): + for output_fileGrp in output_file_grps: + if output_fileGrp not in locked_ws_pages: + self.log.debug(f"Creating an empty list for output file grp: {output_fileGrp}") + locked_ws_pages[output_fileGrp] = [] + # The page id list is not empty - only some pages are in the request + if page_ids: + self.log.debug(f"Locking pages for `{output_fileGrp}`: {page_ids}") + locked_ws_pages[output_fileGrp].extend(page_ids) + else: + # Lock all pages with a single value + self.log.debug(f"Locking all pages for `{output_fileGrp}`") + locked_ws_pages[output_fileGrp].append("all_pages") + + def unlock_pages(self, locked_ws_pages: Dict, output_file_grps: List[str], page_ids: List[str]): + for output_fileGrp in output_file_grps: + if output_fileGrp in locked_ws_pages: + if page_ids: + # Unlock the previously locked pages + self.log.debug(f"Unlocking pages of `{output_fileGrp}`: {page_ids}") + locked_ws_pages[output_fileGrp] = [x for x in locked_ws_pages[output_fileGrp] if + x not in page_ids] + self.log.debug(f"Remaining locked pages of `{output_fileGrp}`: {locked_ws_pages[output_fileGrp]}") + else: + # Remove the single variable used to indicate all pages are locked + self.log.debug(f"Unlocking all pages for: {output_fileGrp}") + locked_ws_pages[output_fileGrp].remove("all_pages") + + # Returns true if all dependent jobs' states are success, else false + async def check_if_job_dependencies_met(self, dependencies: List[str]) -> bool: + # Check the states of all dependent jobs + for dependency_job_id in dependencies: + self.log.debug(f"dependency_job_id: {dependency_job_id}") + try: + dependency_job_state = (await db_get_processing_job(dependency_job_id)).state + except ValueError: + # job_id not (yet) in db. 
Dependency not met + return False + self.log.debug(f"dependency_job_state: {dependency_job_state}") + # Found a dependent job whose state is not success + if dependency_job_state != StateEnum.success: + return False + return True + + async def find_next_requests_from_internal_queue(self, internal_queue: List[PYJobInput]) -> List[PYJobInput]: + found_requests = [] + for i, current_element in enumerate(internal_queue): + # Request has other job dependencies + if current_element.depends_on: + self.log.debug(f"current_element: {current_element}") + self.log.debug(f"job dependencies: {current_element.depends_on}") + satisfied_dependencies = await self.check_if_job_dependencies_met(current_element.depends_on) + self.log.debug(f"satisfied dependencies: {satisfied_dependencies}") + if not satisfied_dependencies: + continue + # Consume the request from the internal queue + found_request = internal_queue.pop(i) + self.log.debug(f"found cached request to be processed: {found_request}") + found_requests.append(found_request) + return found_requests + def query_ocrd_tool_json_from_server(self, processor_name): processor_server_url = self.deployer.resolve_processor_server_url(processor_name) if not processor_server_url: @@ -266,16 +376,112 @@ def query_ocrd_tool_json_from_server(self, processor_name): return ocrd_tool, processor_server_url async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput: + if data.job_id: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Job id field is set but must not be: {data.job_id}" + ) + # Generate processing job id + data.job_id = generate_id() + + # Append the processor name to the request itself + data.processor_name = processor_name + if data.agent_type not in ['worker', 'server']: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Unknown network agent with value: {data.agent_type}" ) + workspace_db = await db_get_workspace( + workspace_id=data.workspace_id, + workspace_mets_path=data.path_to_mets + ) + if not workspace_db: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Workspace with id: {data.workspace_id} or path: {data.path_to_mets} not found" + ) + + # Since the path is not resolved yet, + # the return value is not important for the Processing Server + await validate_and_return_mets_path(self.log, data) + + page_ids = expand_page_ids(data.page_id) + + # A flag whether the current request must be cached + # This is set to true if for any output fileGrp there + # is a page_id value that has been previously locked + cache_current_request = False + + # Check if there are any dependencies of the current request + if data.depends_on: + if not await self.check_if_job_dependencies_met(data.depends_on): + self.log.debug(f"Caching the received request due to job dependencies") + cache_current_request = True + + locked_ws_pages = workspace_db.pages_locked + + # No need for further check of locked pages dependency + # if the request should be already cached + if not cache_current_request: + # Check if there are any locked pages for the current request + cache_current_request = self.check_if_locked_pages_for_output_file_grps( + locked_ws_pages=locked_ws_pages, + output_file_grps=data.output_file_grps, + page_ids=page_ids + ) + + if cache_current_request: + workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets + # If a record queue of this workspace_id does not exist in the requests cache + if not 
self.processing_requests_cache.get(workspace_key, None): + self.log.debug(f"Creating an internal queue for workspace_key: {workspace_key}") + self.processing_requests_cache[workspace_key] = [] + self.log.debug(f"Caching the processing request: {data}") + # Add the processing request to the end of the internal queue + self.processing_requests_cache[workspace_key].append(data) + + return PYJobOutput( + job_id=data.job_id, + processor_name=processor_name, + workspace_id=data.workspace_id, + workspace_path=data.path_to_mets, + state=StateEnum.cached + ) + else: + # Update locked pages by locking the pages in the request + self.lock_pages( + locked_ws_pages=locked_ws_pages, + output_file_grps=data.output_file_grps, + page_ids=page_ids + ) + + # Update the locked pages dictionary in the database + await db_update_workspace( + workspace_id=data.workspace_id, + workspace_mets_path=data.path_to_mets, + pages_locked=locked_ws_pages + ) + + # Create a DB entry + job = DBProcessorJob( + **data.dict(exclude_unset=True, exclude_none=True), + internal_callback_url=self.internal_job_callback_url, + state=StateEnum.queued + ) + await job.insert() + job_output = None if data.agent_type == 'worker': - job_output = await self.push_to_processing_queue(processor_name, data) + ocrd_tool = await self.get_processor_info(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, data) + processing_message = self.create_processing_message(job) + await self.push_to_processing_queue(processor_name, processing_message) + job_output = job.to_job_output() if data.agent_type == 'server': - job_output = await self.push_to_processor_server(processor_name, data) + ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, data) + job_output = await self.push_to_processor_server(processor_name, processor_server_url, data) if not job_output: self.log.exception('Failed to create job output') raise HTTPException( @@ -285,10 +491,7 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ return job_output # TODO: Revisit and remove duplications between push_to_* methods - async def push_to_processing_queue(self, processor_name: str, job_input: PYJobInput) -> PYJobOutput: - ocrd_tool = await self.get_processor_info(processor_name) - validate_job_input(self.log, processor_name, ocrd_tool, job_input) - job_input = await validate_and_resolve_mets_path(self.log, job_input, resolve=False) + async def push_to_processing_queue(self, processor_name: str, processing_message: OcrdProcessingMessage): if not self.rmq_publisher: raise Exception('RMQPublisher is not connected') deployed_processors = self.deployer.find_matching_processors( @@ -299,14 +502,6 @@ async def push_to_processing_queue(self, processor_name: str, job_input: PYJobIn if processor_name not in deployed_processors: self.check_if_queue_exists(processor_name) - job = DBProcessorJob( - **job_input.dict(exclude_unset=True, exclude_none=True), - job_id=generate_id(), - processor_name=processor_name, - state=StateEnum.queued - ) - await job.insert() - processing_message = self.create_processing_message(job) encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message) try: self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message) @@ -316,12 +511,13 @@ async def push_to_processing_queue(self, processor_name: str, job_input: PYJobIn status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f'RMQPublisher has failed: 
{error}' ) - return job.to_job_output() - async def push_to_processor_server(self, processor_name: str, job_input: PYJobInput) -> PYJobOutput: - ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name) - validate_job_input(self.log, processor_name, ocrd_tool, job_input) - job_input = await validate_and_resolve_mets_path(self.log, job_input, resolve=False) + async def push_to_processor_server( + self, + processor_name: str, + processor_server_url: str, + job_input: PYJobInput + ) -> PYJobOutput: try: json_data = json.dumps(job_input.dict(exclude_unset=True, exclude_none=True)) except Exception as e: @@ -330,7 +526,7 @@ async def push_to_processor_server(self, processor_name: str, job_input: PYJobIn status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to json dump the PYJobInput, error: {e}" ) - + # TODO: The amount of pages should come as a request input # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161 # currently, use 200 as a default @@ -357,6 +553,98 @@ async def push_to_processor_server(self, processor_name: str, job_input: PYJobIn async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutput: return await _get_processor_job(self.log, processor_name, job_id) + async def remove_from_request_cache(self, result_message: PYResultMessage): + job_id = result_message.job_id + state = result_message.state + path_to_mets = result_message.path_to_mets + workspace_id = result_message.workspace_id + + self.log.debug(f"Received result for job with id: {job_id} has state: {state}") + + if state == StateEnum.failed: + # TODO: Call the callback to the Workflow server if the current processing step has failed + pass + + if state != StateEnum.success: + # TODO: Handle other potential error cases + pass + + job_db = await db_get_processing_job(job_id) + if not job_db: + self.log.exception(f"Processing job with id: {job_id} not found in DB") + job_output_file_grps = job_db.output_file_grps + job_page_ids = expand_page_ids(job_db.page_id) + + # Read DB workspace entry + workspace_db = await db_get_workspace( + workspace_id=workspace_id, + workspace_mets_path=path_to_mets + ) + if not workspace_db: + self.log.exception(f"Workspace with id: {workspace_id} or path: {path_to_mets} not found in DB") + + locked_ws_pages = workspace_db.pages_locked + # Update locked pages by unlocking the pages in the request + self.unlock_pages( + locked_ws_pages=locked_ws_pages, + output_file_grps=job_output_file_grps, + page_ids=job_page_ids + ) + + # Update the locked pages dictionary in the database + await db_update_workspace( + workspace_id=workspace_id, + workspace_mets_path=path_to_mets, + pages_locked=locked_ws_pages + ) + + # Take the next request from the cache (if any available) + workspace_key = workspace_id if workspace_id else path_to_mets + + if workspace_key not in self.processing_requests_cache: + self.log.debug(f"No internal queue available for workspace with key: {workspace_key}") + return + + if not len(self.processing_requests_cache[workspace_key]): + # The queue is empty - delete it + try: + del self.processing_requests_cache[workspace_key] + except KeyError: + self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}") + return + + consumed_requests = await self.find_next_requests_from_internal_queue( + internal_queue=self.processing_requests_cache[workspace_key] + ) + + if not len(consumed_requests): + self.log.debug("No data was consumed from the internal queue") + return + + for data in 
consumed_requests: + processor_name = data.processor_name + # Create a DB entry + job = DBProcessorJob( + **data.dict(exclude_unset=True, exclude_none=True), + internal_callback_url=self.internal_job_callback_url, + state=StateEnum.queued + ) + await job.insert() + + job_output = None + if data.agent_type == 'worker': + ocrd_tool = await self.get_processor_info(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, data) + processing_message = self.create_processing_message(job) + await self.push_to_processing_queue(processor_name, processing_message) + job_output = job.to_job_output() + if data.agent_type == 'server': + ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name) + validate_job_input(self.log, processor_name, ocrd_tool, data) + job_output = await self.push_to_processor_server(processor_name, processor_server_url, data) + if not job_output: + self.log.exception(f'Failed to create job output for job input data: {data}') + async def get_processor_info(self, processor_name) -> Dict: """ Return a processor's ocrd-tool.json """ diff --git a/ocrd_network/ocrd_network/processing_worker.py b/ocrd_network/ocrd_network/processing_worker.py index 8708f4596f..d8a7f01f7c 100644 --- a/ocrd_network/ocrd_network/processing_worker.py +++ b/ocrd_network/ocrd_network/processing_worker.py @@ -11,7 +11,6 @@ from datetime import datetime import logging from os import getpid -import requests import pika.spec import pika.adapters.blocking_connection @@ -36,6 +35,7 @@ ) from .utils import ( calculate_execution_time, + post_to_callback_url, tf_disable_interactive_logs, verify_database_uri, verify_and_parse_mq_uri @@ -192,6 +192,7 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: page_id = processing_message.page_id if 'page_id' in pm_keys else None result_queue_name = processing_message.result_queue_name if 'result_queue_name' in pm_keys else None callback_url = processing_message.callback_url if 'callback_url' in pm_keys else None + internal_callback_url = processing_message.internal_callback_url if 'internal_callback_url' in pm_keys else None parameters = processing_message.parameters if processing_message.parameters else {} if not path_to_mets and workspace_id: @@ -231,22 +232,25 @@ def process_message(self, processing_message: OcrdProcessingMessage) -> None: end_time=end_time, exec_time=f'{exec_duration} ms' ) - - if result_queue_name or callback_url: - result_message = OcrdResultMessage( - job_id=job_id, - state=job_state.value, - path_to_mets=path_to_mets, - # May not be always available - workspace_id=workspace_id - ) - self.log.info(f'Result message: {result_message}') - # If the result_queue field is set, send the result message to a result queue - if result_queue_name: - self.publish_to_result_queue(result_queue_name, result_message) - # If the callback_url field is set, post the result message to a callback url - if callback_url: - self.post_to_callback_url(callback_url, result_message) + result_message = OcrdResultMessage( + job_id=job_id, + state=job_state.value, + path_to_mets=path_to_mets, + # May not be always available + workspace_id=workspace_id + ) + self.log.info(f'Result message: {str(result_message)}') + # If the result_queue field is set, send the result message to a result queue + if result_queue_name: + self.publish_to_result_queue(result_queue_name, result_message) + if callback_url: + # If the callback_url field is set, + # post the result message (callback to a user defined endpoint) + 
post_to_callback_url(self.log, callback_url, result_message) + if internal_callback_url: + # If the internal callback_url field is set, + # post the result message (callback to Processing Server endpoint) + post_to_callback_url(self.log, internal_callback_url, result_message) def publish_to_result_queue(self, result_queue: str, result_message: OcrdResultMessage): if self.rmq_publisher is None: @@ -261,18 +265,6 @@ def publish_to_result_queue(self, result_queue: str, result_message: OcrdResultM message=encoded_result_message ) - def post_to_callback_url(self, callback_url: str, result_message: OcrdResultMessage): - self.log.info(f'Posting result message to callback_url "{callback_url}"') - headers = {"Content-Type": "application/json"} - json_data = { - "job_id": result_message.job_id, - "state": result_message.state, - "path_to_mets": result_message.path_to_mets, - "workspace_id": result_message.workspace_id - } - response = requests.post(url=callback_url, headers=headers, json=json_data) - self.log.info(f'Response from callback_url "{response}"') - def create_queue(self, connection_attempts=1, retry_delay=1): """Create the queue for this worker diff --git a/ocrd_network/ocrd_network/processor_server.py b/ocrd_network/ocrd_network/processor_server.py index 785b82c61b..b7abc6f148 100644 --- a/ocrd_network/ocrd_network/processor_server.py +++ b/ocrd_network/ocrd_network/processor_server.py @@ -4,7 +4,7 @@ from subprocess import run, PIPE import uvicorn -from fastapi import FastAPI, HTTPException, status, BackgroundTasks +from fastapi import FastAPI, HTTPException, status from ocrd_utils import ( get_ocrd_tool_json, @@ -23,13 +23,15 @@ StateEnum ) from .process_helpers import invoke_processor +from .rabbitmq_utils import OcrdResultMessage from .server_utils import ( _get_processor_job, - validate_and_resolve_mets_path, + validate_and_return_mets_path, validate_job_input ) from .utils import ( calculate_execution_time, + post_to_callback_url, generate_id, tf_disable_interactive_logs ) @@ -125,26 +127,30 @@ async def get_processor_info(self): # Note: The Processing server pushes to a queue, while # the Processor Server creates (pushes to) a background task - async def create_processor_task(self, job_input: PYJobInput, background_tasks: BackgroundTasks): + async def create_processor_task(self, job_input: PYJobInput): validate_job_input(self.log, self.processor_name, self.ocrd_tool, job_input) - job_input = await validate_and_resolve_mets_path(self.log, job_input, resolve=True) - - job_id = generate_id() - job = DBProcessorJob( - **job_input.dict(exclude_unset=True, exclude_none=True), - job_id=job_id, - processor_name=self.processor_name, - state=StateEnum.queued - ) - await job.insert() - await self.run_processor_task(job_id=job_id, job=job) + job_input.path_to_mets = await validate_and_return_mets_path(self.log, job_input) + + job = None + # The request is not forwarded from the Processing Server, assign a job_id + if not job_input.job_id: + job_id = generate_id() + # Create a DB entry + job = DBProcessorJob( + **job_input.dict(exclude_unset=True, exclude_none=True), + job_id=job_id, + processor_name=self.processor_name, + state=StateEnum.queued + ) + await job.insert() + await self.run_processor_task(job=job) return job.to_job_output() - async def run_processor_task(self, job_id: str, job: DBProcessorJob): + async def run_processor_task(self, job: DBProcessorJob): execution_failed = False start_time = datetime.now() await db_update_processing_job( - job_id=job_id, + job_id=job.job_id, 
state=StateEnum.running, start_time=start_time ) @@ -168,11 +174,27 @@ async def run_processor_task(self, job_id: str, job: DBProcessorJob): exec_duration = calculate_execution_time(start_time, end_time) job_state = StateEnum.success if not execution_failed else StateEnum.failed await db_update_processing_job( - job_id=job_id, + job_id=job.job_id, state=job_state, end_time=end_time, exec_time=f'{exec_duration} ms' ) + result_message = OcrdResultMessage( + job_id=job.job_id, + state=job_state.value, + path_to_mets=job.path_to_mets, + # May not be always available + workspace_id=job.workspace_id + ) + self.log.info(f'Result message: {result_message}') + if job.callback_url: + # If the callback_url field is set, + # post the result message (callback to a user defined endpoint) + post_to_callback_url(self.log, job.callback_url, result_message) + if job.internal_callback_url: + # If the internal callback_url field is set, + # post the result message (callback to Processing Server endpoint) + post_to_callback_url(self.log, job.internal_callback_url, result_message) def get_ocrd_tool(self): if self.ocrd_tool: diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/ocrd_messages.py b/ocrd_network/ocrd_network/rabbitmq_utils/ocrd_messages.py index 80f5e253a9..4016cd71bb 100644 --- a/ocrd_network/ocrd_network/rabbitmq_utils/ocrd_messages.py +++ b/ocrd_network/ocrd_network/rabbitmq_utils/ocrd_messages.py @@ -18,7 +18,8 @@ def __init__( page_id: Optional[str], result_queue_name: Optional[str], callback_url: Optional[str], - parameters: Dict[str, Any] = None, + internal_callback_url: Optional[str], + parameters: Dict[str, Any] = None ) -> None: if not job_id: raise ValueError('job_id must be provided') @@ -47,6 +48,8 @@ def __init__( self.result_queue_name = result_queue_name if callback_url: self.callback_url = callback_url + if internal_callback_url: + self.internal_callback_url = internal_callback_url self.parameters = parameters if parameters else {} @staticmethod @@ -71,7 +74,8 @@ def decode_yml(ocrd_processing_message: bytes) -> OcrdProcessingMessage: page_id=data.get('page_id', None), parameters=data.get('parameters', None), result_queue_name=data.get('result_queue_name', None), - callback_url=data.get('callback_url', None) + callback_url=data.get('callback_url', None), + internal_callback_url=data.get('internal_callback_url', None) ) diff --git a/ocrd_network/ocrd_network/server_utils.py b/ocrd_network/ocrd_network/server_utils.py index fffa8e8015..b30e856301 100644 --- a/ocrd_network/ocrd_network/server_utils.py +++ b/ocrd_network/ocrd_network/server_utils.py @@ -1,5 +1,11 @@ -from fastapi import FastAPI, HTTPException, status, BackgroundTasks +import re +from fastapi import HTTPException, status +from typing import Dict, List from ocrd_validators import ParameterValidator +from ocrd_utils import ( + generate_range, + REGEX_PREFIX +) from .database import ( db_get_processing_job, db_get_workspace, @@ -22,21 +28,35 @@ async def _get_processor_job(logger, processor_name: str, job_id: str) -> PYJobO ) -async def validate_and_resolve_mets_path(logger, job_input: PYJobInput, resolve: bool = False) -> PYJobInput: +async def validate_and_return_mets_path(logger, job_input: PYJobInput) -> str: # This check is done to return early in case the workspace_id is provided # but the abs mets path cannot be queried from the DB if not job_input.path_to_mets and job_input.workspace_id: try: db_workspace = await db_get_workspace(job_input.workspace_id) - if resolve: - job_input.path_to_mets = 
db_workspace.workspace_mets_path
+            path_to_mets = db_workspace.workspace_mets_path
         except ValueError as e:
             logger.exception(f"Workspace with id '{job_input.workspace_id}' not existing: {e}")
             raise HTTPException(
                 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                 detail=f"Workspace with id '{job_input.workspace_id}' not existing"
             )
-    return job_input
+        return path_to_mets
+    return job_input.path_to_mets
+
+
+def expand_page_ids(page_id: str) -> List:
+    page_ids = []
+    if not page_id:
+        return page_ids
+    for page_id_token in re.split(r',', page_id):
+        if page_id_token.startswith(REGEX_PREFIX):
+            page_ids.append(re.compile(page_id_token[len(REGEX_PREFIX):]))
+        elif '..' in page_id_token:
+            page_ids += generate_range(*page_id_token.split('..', 1))
+        else:
+            page_ids += [page_id_token]
+    return page_ids
 
 
 def validate_job_input(logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
diff --git a/ocrd_network/ocrd_network/utils.py b/ocrd_network/ocrd_network/utils.py
index d41a1b13ab..4098ef0fa0 100644
--- a/ocrd_network/ocrd_network/utils.py
+++ b/ocrd_network/ocrd_network/utils.py
@@ -10,6 +10,7 @@
 from yaml import safe_load
 
 from ocrd_validators import ProcessingServerConfigValidator
+from .rabbitmq_utils import OcrdResultMessage
 
 
 # Based on: https://gist.github.com/phizaz/20c36c6734878c6ec053245a477572ec
@@ -108,3 +109,16 @@ def download_ocrd_all_tool_json(ocrd_all_url: str):
     if not response.status_code == 200:
         raise ValueError(f"Failed to download ocrd all tool json from: '{ocrd_all_url}'")
     return response.json()
+
+
+def post_to_callback_url(logger, callback_url: str, result_message: OcrdResultMessage):
+    logger.info(f'Posting result message to callback_url "{callback_url}"')
+    headers = {"Content-Type": "application/json"}
+    json_data = {
+        "job_id": result_message.job_id,
+        "state": result_message.state,
+        "path_to_mets": result_message.path_to_mets,
+        "workspace_id": result_message.workspace_id
+    }
+    response = requests.post(url=callback_url, headers=headers, json=json_data)
+    logger.info(f'Response from callback_url "{response}"')
diff --git a/ocrd_validators/ocrd_validators/message_processing.schema.yml b/ocrd_validators/ocrd_validators/message_processing.schema.yml
index 3a8042bf42..b4363aeba0 100644
--- a/ocrd_validators/ocrd_validators/message_processing.schema.yml
+++ b/ocrd_validators/ocrd_validators/message_processing.schema.yml
@@ -58,8 +58,13 @@ properties:
   callback_url:
     description: The URL where the result message will be POST-ed to
     type: string
-    format: uri,
-    pattern: "^https?://"
+    format: uri
+    pattern: "^https?://"
+  internal_callback_url:
+    description: The URL where the internal result message will be POST-ed to the Processing Server
+    type: string
+    format: uri
+    pattern: "^https?://"
   created_time:
     description: The Unix timestamp when the message was created
     type: integer
diff --git a/ocrd_validators/ocrd_validators/message_result.schema.yml b/ocrd_validators/ocrd_validators/message_result.schema.yml
index aef62821ea..d2c87ba6e4 100644
--- a/ocrd_validators/ocrd_validators/message_result.schema.yml
+++ b/ocrd_validators/ocrd_validators/message_result.schema.yml
@@ -20,8 +20,10 @@ properties:
     description: The current status of the job
     type: string
     enum:
-      - SUCCESS
+      - CACHED
+      - QUEUED
       - RUNNING
+      - SUCCESS
      - FAILED
   path_to_mets:
     description: Path to a METS file
diff --git a/ocrd_validators/ocrd_validators/processing_server_config.schema.yml b/ocrd_validators/ocrd_validators/processing_server_config.schema.yml
index ed6fd3d1e9..17a723bf9a 100644
--- a/ocrd_validators/ocrd_validators/processing_server_config.schema.yml
+++ b/ocrd_validators/ocrd_validators/processing_server_config.schema.yml
@@ -6,6 +6,9 @@ additionalProperties: false
 required:
   - process_queue
 properties:
+  internal_callback_url:
+    description: optionally set the host for the internal_callback_url, for example "http://172.17.0.1:8080"
+    type: string
   process_queue:
     description: Information about the Message Queue
     type: object
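The reworked `db_get_workspace` and the new `db_update_workspace` in database.py accept either `workspace_id` or `workspace_mets_path` as the lookup key, and `db_update_workspace` rejects any keyword that is not a known `DBWorkspace` field. A minimal usage sketch, assuming `initiate_database` has already been awaited against a reachable MongoDB and that the METS path below is a placeholder:

from ocrd_network.database import db_get_workspace, db_update_workspace

async def mark_workspace_deleted(mets_path: str) -> None:
    # Look the workspace up by its METS path instead of its workspace_id;
    # raises ValueError if no matching record exists in the DB
    workspace = await db_get_workspace(workspace_mets_path=mets_path)
    print(workspace.workspace_id, workspace.pages_locked)
    # Only whitelisted DBWorkspace fields (e.g. deleted, pages_locked) are accepted
    await db_update_workspace(workspace_mets_path=mets_path, deleted=True)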
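The `pages_locked` field added to `DBWorkspace`, together with `lock_pages`, `unlock_pages` and `check_if_locked_pages_for_output_file_grps` in the Processing Server, implements per-fileGrp page locking. The standalone sketch below mirrors that bookkeeping (a dict from output fileGrp to locked page IDs, with the "all_pages" sentinel when a request carries no `page_id`); the function names and sample fileGrps are illustrative, not the actual methods:

from typing import Dict, List

ALL_PAGES = "all_pages"  # sentinel used when a request does not restrict page_id

def lock_pages(locked: Dict[str, List[str]], output_file_grps: List[str], page_ids: List[str]) -> None:
    # Mirrors ProcessingServer.lock_pages: lock either the requested pages or everything
    for file_grp in output_file_grps:
        locked.setdefault(file_grp, [])
        locked[file_grp].extend(page_ids if page_ids else [ALL_PAGES])

def is_locked(locked: Dict[str, List[str]], output_file_grps: List[str], page_ids: List[str]) -> bool:
    # Mirrors check_if_locked_pages_for_output_file_grps: a request must be cached if any
    # of its output fileGrps has "all_pages" locked or overlaps with the locked page IDs
    for file_grp in output_file_grps:
        held = locked.get(file_grp, [])
        if ALL_PAGES in held or not set(held).isdisjoint(page_ids):
            return True
    return False

locked_ws_pages: Dict[str, List[str]] = {}
lock_pages(locked_ws_pages, ["OCR-D-BIN"], ["PHYS_0001", "PHYS_0002"])
assert is_locked(locked_ws_pages, ["OCR-D-BIN"], ["PHYS_0002"])      # overlap, so cache the request
assert not is_locked(locked_ws_pages, ["OCR-D-SEG"], ["PHYS_0002"])  # different fileGrp, still free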
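`expand_page_ids` in server_utils.py turns the comma-separated `page_id` request field into the per-page list used for locking, delegating range expansion (`START..END`) to `generate_range` from ocrd_utils and compiling tokens that carry the REGEX_PREFIX. Since the real helper relies on ocrd_utils, the sketch below only approximates the range expansion for numerically suffixed page IDs and should be read as an illustration of the expected output, not as the actual implementation:

from typing import List

def expand_page_ids_sketch(page_id: str) -> List[str]:
    # Illustrative stand-in for server_utils.expand_page_ids without the ocrd_utils dependency
    page_ids: List[str] = []
    if not page_id:
        return page_ids
    for token in page_id.split(','):
        if '..' in token:
            start, end = token.split('..', 1)
            prefix = start.rstrip('0123456789')  # e.g. 'PHYS_'
            width = len(start) - len(prefix)     # zero-padding width, e.g. 4
            for n in range(int(start[len(prefix):]), int(end[len(prefix):]) + 1):
                page_ids.append(f"{prefix}{n:0{width}d}")
        else:
            page_ids.append(token)
    return page_ids

print(expand_page_ids_sketch("PHYS_0001..PHYS_0003,PHYS_0010"))
# ['PHYS_0001', 'PHYS_0002', 'PHYS_0003', 'PHYS_0010']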
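The new `depends_on` and server-generated `job_id` fields on `PYJobInput` let a client chain processing requests: a request whose dependencies have not all reached SUCCESS is cached by the Processing Server instead of being queued. A hedged client-side sketch of such a chain, assuming the Processing Server's existing run-processor route at POST /processor/{processor_name}; host, METS path and processor names are made up for illustration:

import requests

PROCESSING_SERVER = 'http://localhost:8080'  # assumed address of a running Processing Server

def run_processor(processor_name: str, **job_input) -> dict:
    # POST a PYJobInput body; the Processing Server generates and returns the job_id
    response = requests.post(f'{PROCESSING_SERVER}/processor/{processor_name}', json=job_input)
    response.raise_for_status()
    return response.json()

binarize = run_processor(
    'ocrd-cis-ocropy-binarize',
    path_to_mets='/data/ws1/mets.xml',
    input_file_grps=['OCR-D-IMG'],
    output_file_grps=['OCR-D-BIN'],
    agent_type='worker',
)

# The second step depends on the first; until that job reports SUCCESS via the
# result callback, this request stays in the server-side cache (state CACHED).
segment = run_processor(
    'ocrd-cis-ocropy-segment',
    path_to_mets='/data/ws1/mets.xml',
    input_file_grps=['OCR-D-BIN'],
    output_file_grps=['OCR-D-SEG'],
    agent_type='worker',
    depends_on=[binarize['job_id']],
)
print(segment['state'])  # e.g. QUEUED or CACHED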
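The optional `internal_callback_url` key added to the Processing Server config (processing_server_config.schema.yml) overrides the host part of the callback URL that is handed to workers and Processor Servers; without it, the server falls back to its own host and port. A short sketch of the derivation as done in `ProcessingServer.__init__`, with the bridge address taken from the schema's own example and the fallback host/port assumed:

config = {
    # optional key from the Processing Server config file, e.g. a Docker bridge address
    'internal_callback_url': 'http://172.17.0.1:8080',
}

host = config.get('internal_callback_url', None)
if host:
    internal_job_callback_url = f'{host.rstrip("/")}/result_callback'
else:
    # assumed host and port the Processing Server itself was started with
    internal_job_callback_url = 'http://localhost:8080/result_callback'

print(internal_job_callback_url)  # http://172.17.0.1:8080/result_callback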
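Workers and Processor Servers report results back through `post_to_callback_url`, both to the user-supplied `callback_url` and to the Processing Server's new `/result_callback` route, whose body is validated as a `PYResultMessage`. A minimal sketch of such a callback POST, reusing the example values from the `PYResultMessage` schema; the Processing Server address is an assumption:

import requests

internal_callback_url = 'http://localhost:8080/result_callback'  # assumed Processing Server address
result_message = {
    'job_id': '123123123',
    'state': 'SUCCESS',  # any StateEnum value; FAILED also triggers unlocking of the pages
    'path_to_mets': '/path/to/mets.xml',
    'workspace_id': 'c7f25615-fc17-4365-a74d-ad20e1ddbd0e',
}
response = requests.post(
    internal_callback_url,
    headers={'Content-Type': 'application/json'},
    json=result_message,
)
print(response.status_code)  # 200 once the pages were unlocked and cached requests re-examined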