From 4e1af90ec83d73953d840716f469a547597123c8 Mon Sep 17 00:00:00 2001 From: Maira Salazar Date: Tue, 10 Mar 2026 11:19:27 +0100 Subject: [PATCH 1/7] feat(stream): add SSE support --- app/routers/workflows.py | 30 ++++++++++++++++++++++-------- pyproject.toml | 1 + uv.lock | 6 ++++-- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/app/routers/workflows.py b/app/routers/workflows.py index 175ede5..19fbadf 100644 --- a/app/routers/workflows.py +++ b/app/routers/workflows.py @@ -3,10 +3,10 @@ import asyncio from fastapi import APIRouter, Depends, HTTPException, Request -from fastapi.responses import StreamingResponse from pydantic import BaseModel from sqlalchemy.exc import SQLAlchemyError from sqlmodel import Session, select +from sse_starlette import EventSourceResponse, ServerSentEvent from temporalio.client import Client from app.auth import AuthContext, decode_access_token @@ -163,14 +163,28 @@ async def workflow_event(request: Request, workflow_id: str): workflow = session.exec( select(Workflow).where(Workflow.public_id == workflow_id) ).one() - if workflow.status.name == "ERROR" or workflow.status.name == "SUCCESS": - yield workflow.status.name - break - yield workflow.status.name + status = workflow.status.name + yield ServerSentEvent(data=status) + + if status in {"ERROR", "SUCCESS"}: + yield ServerSentEvent(data="{}", event="done") + return + except SQLAlchemyError as e: - print("Error(stream)", e) - raise HTTPException(status_code=500) + print("Error in fetching from database (stream_workflow)", e) + yield ServerSentEvent( + data="Failed to read workflow status.", + event="error", + ) + break + + except Exception as e: + print("Error(stream_workflow)", e) + yield ServerSentEvent( + data="An unexpected error occurred while streaming workflow results.", # noqa: E501 + event="error", + ) await asyncio.sleep(STREAM_DELAY) @@ -211,6 +225,6 @@ async def stream( verify_tenant_owns_workflow(auth, workflow) - return StreamingResponse( + return EventSourceResponse( workflow_event(request, workflow_id), media_type="text/event-stream" ) diff --git a/pyproject.toml b/pyproject.toml index c3b5c8f..636cc37 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ "temporalio>=1.7.0", "PyJWT>=2.0.0", "cryptography>=44.0.0", + "sse-starlette>=3.2.0", ] [project.scripts] diff --git a/uv.lock b/uv.lock index b5a52f4..0847ed1 100644 --- a/uv.lock +++ b/uv.lock @@ -117,6 +117,7 @@ dependencies = [ { name = "pymupdf4llm" }, { name = "python-multipart" }, { name = "sqlalchemy" }, { name = "sqlmodel" }, + { name = "sse-starlette" }, { name = "temporalio" }, { name = "typer" }, ] @@ -145,6 +146,7 @@ requires-dist = [ { name = "pymupdf4llm", specifier = "==0.2.4" }, { name = "python-multipart", specifier = ">=0.0.9" }, { name = "sqlalchemy", specifier = ">=2.0.46" }, { name = "sqlmodel", specifier = ">=0.0.37" }, + { name = "sse-starlette", specifier = ">=3.2.0" }, { name = "temporalio", specifier = ">=1.7.0" }, { name = "typer", specifier = ">=0.15.0" }, ] @@ -290,6 +292,8 @@ sdist = { url = "https://files.pythonhosted.org/packages/92/88/b8527e1b00c1811db wheels = [ { url = "https://files.pythonhosted.org/packages/69/ca/a08fdc7efdcc24e6a6131a93c85be1f204d41c58f474c42b0670af8c016b/caio-0.9.25-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:fab6078b9348e883c80a5e14b382e6ad6aabbc4429ca034e76e730cf464269db", size = 36978, upload-time = "2025-12-26T15:21:41.055Z" }, { url = "https://files.pythonhosted.org/packages/5e/6c/d4d24f65e690213c097174d26eda6831f45f4734d9d036d81790a27e7b78/caio-0.9.25-cp314-cp314-manylinux2010_x86_64.manylinux2014_x86_64.manylinux_2_12_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:44a6b58e52d488c75cfaa5ecaa404b2b41cc965e6c417e03251e868ecd5b6d77", size = 81832, upload-time = "2025-12-26T15:22:22.757Z" }, + { url = "https://files.pythonhosted.org/packages/87/a4/e534cf7d2d0e8d880e25dd61e8d921ffcfe15bd696734589826f5a2df727/caio-0.9.25-cp314-cp314-manylinux_2_34_aarch64.whl", hash = "sha256:628a630eb7fb22381dd8e3c8ab7f59e854b9c806639811fc3f4310c6bd711d79", size = 81565, upload-time = "2026-03-04T22:08:27.483Z" }, + { url = "https://files.pythonhosted.org/packages/3f/ed/bf81aeac1d290017e5e5ac3e880fd56ee15e50a6d0353986799d1bc5cfd5/caio-0.9.25-cp314-cp314-manylinux_2_34_x86_64.whl", hash = "sha256:0ba16aa605ccb174665357fc729cf500679c2d94d5f1458a6f0d5ca48f2060a7", size = 80071, upload-time = "2026-03-04T22:08:28.751Z" }, { url = "https://files.pythonhosted.org/packages/86/93/1f76c8d1bafe3b0614e06b2195784a3765bbf7b0a067661af9e2dd47fc33/caio-0.9.25-py3-none-any.whl", hash = "sha256:06c0bb02d6b929119b1cfbe1ca403c768b2013a369e2db46bfa2a5761cf82e40", size = 19087, upload-time = "2025-12-26T15:22:00.221Z" }, ] @@ -845,7 +849,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ae/fb/011c7c717213182caf78084a9bea51c8590b0afda98001f69d9f853a495b/greenlet-3.3.1-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:bd59acd8529b372775cd0fcbc5f420ae20681c5b045ce25bd453ed8455ab99b5", size = 275737, upload-time = "2026-01-23T15:32:16.889Z" }, { url = "https://files.pythonhosted.org/packages/41/2e/a3a417d620363fdbb08a48b1dd582956a46a61bf8fd27ee8164f9dfe87c2/greenlet-3.3.1-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b31c05dd84ef6871dd47120386aed35323c944d86c3d91a17c4b8d23df62f15b", size = 646422, upload-time = "2026-01-23T16:01:00.354Z" }, { url = "https://files.pythonhosted.org/packages/b4/09/c6c4a0db47defafd2d6bab8ddfe47ad19963b4e30f5bed84d75328059f8c/greenlet-3.3.1-cp314-cp314-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:02925a0bfffc41e542c70aa14c7eda3593e4d7e274bfcccca1827e6c0875902e", size = 658219, upload-time = "2026-01-23T16:05:30.956Z" }, - { url = "https://files.pythonhosted.org/packages/e2/89/b95f2ddcc5f3c2bc09c8ee8d77be312df7f9e7175703ab780f2014a0e781/greenlet-3.3.1-cp314-cp314-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:3e0f3878ca3a3ff63ab4ea478585942b53df66ddde327b59ecb191b19dbbd62d", size = 671455, upload-time = "2026-01-23T16:15:57.232Z" }, { url = "https://files.pythonhosted.org/packages/80/38/9d42d60dffb04b45f03dbab9430898352dba277758640751dc5cc316c521/greenlet-3.3.1-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:34a729e2e4e4ffe9ae2408d5ecaf12f944853f40ad724929b7585bca808a9d6f", size = 660237, upload-time = "2026-01-23T15:32:53.967Z" }, { url = "https://files.pythonhosted.org/packages/96/61/373c30b7197f9e756e4c81ae90a8d55dc3598c17673f91f4d31c3c689c3f/greenlet-3.3.1-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:aec9ab04e82918e623415947921dea15851b152b822661cce3f8e4393c3df683", size = 1615261, upload-time = "2026-01-23T16:04:25.066Z" }, { url = "https://files.pythonhosted.org/packages/fd/d3/ca534310343f5945316f9451e953dcd89b36fe7a19de652a1dc5a0eeef3f/greenlet-3.3.1-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:71c767cf281a80d02b6c1bdc41c9468e1f5a494fb11bc8688c360524e273d7b1", size = 1683719, upload-time = "2026-01-23T15:33:50.61Z" }, @@ -854,7 +857,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/28/24/cbbec49bacdcc9ec652a81d3efef7b59f326697e7edf6ed775a5e08e54c2/greenlet-3.3.1-cp314-cp314t-macosx_11_0_universal2.whl", hash = "sha256:3e63252943c921b90abb035ebe9de832c436401d9c45f262d80e2d06cc659242", size = 282706, upload-time = "2026-01-23T15:33:05.525Z" }, { url = "https://files.pythonhosted.org/packages/86/2e/4f2b9323c144c4fe8842a4e0d92121465485c3c2c5b9e9b30a52e80f523f/greenlet-3.3.1-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:76e39058e68eb125de10c92524573924e827927df5d3891fbc97bd55764a8774", size = 651209, upload-time = "2026-01-23T16:01:01.517Z" }, { url = "https://files.pythonhosted.org/packages/d9/87/50ca60e515f5bb55a2fbc5f0c9b5b156de7d2fc51a0a69abc9d23914a237/greenlet-3.3.1-cp314-cp314t-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:c9f9d5e7a9310b7a2f416dd13d2e3fd8b42d803968ea580b7c0f322ccb389b97", size = 654300, upload-time = "2026-01-23T16:05:32.199Z" }, - { url = "https://files.pythonhosted.org/packages/7c/25/c51a63f3f463171e09cb586eb64db0861eb06667ab01a7968371a24c4f3b/greenlet-3.3.1-cp314-cp314t-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:4b9721549a95db96689458a1e0ae32412ca18776ed004463df3a9299c1b257ab", size = 662574, upload-time = "2026-01-23T16:15:58.364Z" }, { url = "https://files.pythonhosted.org/packages/1d/94/74310866dfa2b73dd08659a3d18762f83985ad3281901ba0ee9a815194fb/greenlet-3.3.1-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:92497c78adf3ac703b57f1e3813c2d874f27f71a178f9ea5887855da413cd6d2", size = 653842, upload-time = "2026-01-23T15:32:55.671Z" }, { url = "https://files.pythonhosted.org/packages/97/43/8bf0ffa3d498eeee4c58c212a3905dd6146c01c8dc0b0a046481ca29b18c/greenlet-3.3.1-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:ed6b402bc74d6557a705e197d47f9063733091ed6357b3de33619d8a8d93ac53", size = 1614917, upload-time = "2026-01-23T16:04:26.276Z" }, { url = "https://files.pythonhosted.org/packages/89/90/a3be7a5f378fc6e84abe4dcfb2ba32b07786861172e502388b4c90000d1b/greenlet-3.3.1-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:59913f1e5ada20fde795ba906916aea25d442abcc0593fba7e26c92b7ad76249", size = 1676092, upload-time = "2026-01-23T15:33:52.176Z" }, From 9399e2334ab7807d7f9a1f5bebe70d5860fa12e2 Mon Sep 17 00:00:00 2001 From: Maira Salazar Date: Thu, 12 Mar 2026 17:03:09 +0100 Subject: [PATCH 2/7] fix: accept both url and uri file paths --- app/activities/extract_pdf_content.py | 17 ++++++++++++----- app/config.py | 13 +++++++++++++ 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/app/activities/extract_pdf_content.py b/app/activities/extract_pdf_content.py index 5950ab0..945a4e8 100644 --- a/app/activities/extract_pdf_content.py +++ b/app/activities/extract_pdf_content.py @@ -3,9 +3,12 @@ from temporalio import activity from temporalio.exceptions import ApplicationError +from app.config import Environment, get_settings from app.extractors import get_extractor from app.extractors.errors import InvalidPageSelectionError +settings = get_settings() + class ExtractPdfContentRequest(BaseModel): """Request to extract PDF content from a URL.""" @@ -27,11 +30,15 @@ class ExtractPdfContentResponse(BaseModel): async def extract_pdf_text( request: ExtractPdfContentRequest, ) -> ExtractPdfContentResponse: - """Download PDF from a URL and extract its content using the specified extractor.""" - async with httpx.AsyncClient() as client: - response = await client.get(request.url) - response.raise_for_status() - pdf_bytes = response.content + """Read a file and extract its text content using the specified extractor.""" + if settings.orcha_env in [Environment.LOCAL, Environment.DEV]: + with open(request.url, "rb") as f: + pdf_bytes = f.read() + else: + async with httpx.AsyncClient() as client: + response = await client.get(request.url) + response.raise_for_status() + pdf_bytes = response.content # Extract content using the extraction module try: diff --git a/app/config.py b/app/config.py index ffceaa0..a7c22e6 100644 --- a/app/config.py +++ b/app/config.py @@ -1,10 +1,20 @@ """Centralized application settings using Pydantic Settings.""" +from enum import Enum from functools import lru_cache from pydantic_settings import BaseSettings +class Environment(str, Enum): + """Application environment.""" + + LOCAL = "local" + DEV = "dev" + QA = "qa" + PROD = "prod" + + class Settings(BaseSettings): """Application configuration loaded from environment variables.""" @@ -35,6 +45,9 @@ class Settings(BaseSettings): ollama_base_url: str = "http://localhost:11434/v1" ollama_api_key: str | None = None + # Environment + orcha_env: Environment = Environment.LOCAL + @property def database_url(self) -> str: """Build the PostgreSQL connection string.""" From 9ad42138cf782d63c5ae94e6d08c8e57873169f3 Mon Sep 17 00:00:00 2001 From: Maira Salazar Date: Mon, 16 Mar 2026 17:36:49 +0100 Subject: [PATCH 3/7] refactor: manage errors in open file --- app/activities/extract_pdf_content.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/app/activities/extract_pdf_content.py b/app/activities/extract_pdf_content.py index 945a4e8..0749046 100644 --- a/app/activities/extract_pdf_content.py +++ b/app/activities/extract_pdf_content.py @@ -31,9 +31,15 @@ async def extract_pdf_text( request: ExtractPdfContentRequest, ) -> ExtractPdfContentResponse: """Read a file and extract its text content using the specified extractor.""" - if settings.orcha_env in [Environment.LOCAL, Environment.DEV]: - with open(request.url, "rb") as f: - pdf_bytes = f.read() + if settings.orcha_env in {Environment.LOCAL, Environment.DEV}: + try: + with open(request.url, "rb") as f: + pdf_bytes = f.read() + except FileNotFoundError as e: + raise ApplicationError( + str(e), + non_retryable=True, + ) from e else: async with httpx.AsyncClient() as client: response = await client.get(request.url) From 6e9a647f1947c089d48e71abccd497fff3d541dd Mon Sep 17 00:00:00 2001 From: Maira Salazar Date: Mon, 16 Mar 2026 23:22:09 +0100 Subject: [PATCH 4/7] feature(stream): improve streaming of results --- app/routers/workflows.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/app/routers/workflows.py b/app/routers/workflows.py index 19fbadf..4bf606b 100644 --- a/app/routers/workflows.py +++ b/app/routers/workflows.py @@ -1,6 +1,7 @@ """Workflow API routes with tenant-scoped access control.""" import asyncio +import json from fastapi import APIRouter, Depends, HTTPException, Request from pydantic import BaseModel @@ -164,12 +165,23 @@ async def workflow_event(request: Request, workflow_id: str): select(Workflow).where(Workflow.public_id == workflow_id) ).one() - status = workflow.status.name - yield ServerSentEvent(data=status) + status = workflow.status - if status in {"ERROR", "SUCCESS"}: - yield ServerSentEvent(data="{}", event="done") - return + if status == WorkflowStatus.SUCCESS: + yield ServerSentEvent( + data=json.dumps(workflow.result), event="metadata" + ) + yield ServerSentEvent(data="done", event="end") + break + + if status == WorkflowStatus.ERROR: + yield ServerSentEvent( + # TODO: improve it with a better error message for end users + data="The Temporal Workflow failed", + event="error", + ) + yield ServerSentEvent(data="done", event="end") + break except SQLAlchemyError as e: print("Error in fetching from database (stream_workflow)", e) @@ -177,6 +189,7 @@ async def workflow_event(request: Request, workflow_id: str): data="Failed to read workflow status.", event="error", ) + yield ServerSentEvent(data="done", event="end") break except Exception as e: @@ -185,6 +198,8 @@ async def workflow_event(request: Request, workflow_id: str): data="An unexpected error occurred while streaming workflow results.", # noqa: E501 event="error", ) + yield ServerSentEvent(data="done", event="end") + break await asyncio.sleep(STREAM_DELAY) From 21d3e79f49e18bd8b3351a5eefa17ee67f300ecc Mon Sep 17 00:00:00 2001 From: Maira Salazar Date: Tue, 17 Mar 2026 14:20:35 +0100 Subject: [PATCH 5/7] refactor: change README header for clarity --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c912cf4..12eef94 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Orcha Workflows +# Orcha Backend service for Orcha AI extraction, built with **FastAPI**, **Temporal**, and **PostgreSQL**. From 2a7497815c4ac197ec6cd25338785890c7f51867 Mon Sep 17 00:00:00 2001 From: Maira Salazar Date: Fri, 20 Mar 2026 11:15:03 +0100 Subject: [PATCH 6/7] fix: better error handling --- app/activities/extract_pdf_content.py | 22 +++++++--- app/config.py | 12 +----- app/errors.py | 9 ++++ app/routers/workflows.py | 59 +++++++++++++-------------- 4 files changed, 55 insertions(+), 47 deletions(-) create mode 100644 app/errors.py diff --git a/app/activities/extract_pdf_content.py b/app/activities/extract_pdf_content.py index 0749046..cf53cf4 100644 --- a/app/activities/extract_pdf_content.py +++ b/app/activities/extract_pdf_content.py @@ -1,9 +1,10 @@ import httpx +from fastapi import HTTPException from pydantic import BaseModel from temporalio import activity from temporalio.exceptions import ApplicationError -from app.config import Environment, get_settings +from app.config import get_settings from app.extractors import get_extractor from app.extractors.errors import InvalidPageSelectionError @@ -31,7 +32,7 @@ async def extract_pdf_text( request: ExtractPdfContentRequest, ) -> ExtractPdfContentResponse: """Read a file and extract its text content using the specified extractor.""" - if settings.orcha_env in {Environment.LOCAL, Environment.DEV}: + if settings.orcha_env in {"local", "dev"}: try: with open(request.url, "rb") as f: pdf_bytes = f.read() @@ -42,9 +43,20 @@ async def extract_pdf_text( ) from e else: async with httpx.AsyncClient() as client: - response = await client.get(request.url) - response.raise_for_status() - pdf_bytes = response.content + try: + response = await client.get(request.url) + response.raise_for_status() + pdf_bytes = response.content + except httpx.TimeoutException: + raise HTTPException( + status_code=504, + detail="Request to fetch file timed out.", + ) + except httpx.HTTPError as e: + raise HTTPException( + status_code=502, + detail=f"Failed to fetch file from URL: {e.response.text}.", + ) # Extract content using the extraction module try: diff --git a/app/config.py b/app/config.py index a7c22e6..f458321 100644 --- a/app/config.py +++ b/app/config.py @@ -1,20 +1,10 @@ """Centralized application settings using Pydantic Settings.""" -from enum import Enum from functools import lru_cache from pydantic_settings import BaseSettings -class Environment(str, Enum): - """Application environment.""" - - LOCAL = "local" - DEV = "dev" - QA = "qa" - PROD = "prod" - - class Settings(BaseSettings): """Application configuration loaded from environment variables.""" @@ -46,7 +36,7 @@ class Settings(BaseSettings): ollama_api_key: str | None = None # Environment - orcha_env: Environment = Environment.LOCAL + orcha_env: str = "local" @property def database_url(self) -> str: diff --git a/app/errors.py b/app/errors.py new file mode 100644 index 0000000..50c7f14 --- /dev/null +++ b/app/errors.py @@ -0,0 +1,9 @@ +"""Exception classes.""" + + +class WorkflowEventError(Exception): + """Exception raised when generating SSE events is failed.""" + + def __init__(self, error_code: str): + """Initialize exception.""" + self.error_code = error_code diff --git a/app/routers/workflows.py b/app/routers/workflows.py index 4bf606b..eb2e79e 100644 --- a/app/routers/workflows.py +++ b/app/routers/workflows.py @@ -6,6 +6,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request from pydantic import BaseModel from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm.exc import NoResultFound from sqlmodel import Session, select from sse_starlette import EventSourceResponse, ServerSentEvent from temporalio.client import Client @@ -14,6 +15,7 @@ from app.database.models import Workflow, WorkflowStatus from app.database.session import get_db_session from app.dependencies import get_current_user +from app.errors import WorkflowEventError from app.workflows.extract_metadata_workflow import ( ExtractMetadata, ExtractMetadataWorkflowRequest, @@ -159,48 +161,43 @@ async def workflow_event(request: Request, workflow_id: str): if await request.is_disconnected(): break - with Session(request.app.state.db_engine) as session: - try: - workflow = session.exec( - select(Workflow).where(Workflow.public_id == workflow_id) - ).one() - - status = workflow.status - - if status == WorkflowStatus.SUCCESS: - yield ServerSentEvent( - data=json.dumps(workflow.result), event="metadata" - ) - yield ServerSentEvent(data="done", event="end") - break - - if status == WorkflowStatus.ERROR: - yield ServerSentEvent( - # TODO: improve it with a better error message for end users - data="The Temporal Workflow failed", - event="error", - ) - yield ServerSentEvent(data="done", event="end") - break - - except SQLAlchemyError as e: - print("Error in fetching from database (stream_workflow)", e) + try: + with Session(request.app.state.db_engine) as session: + try: + workflow = session.exec( + select(Workflow).where(Workflow.public_id == workflow_id) + ).one() + except NoResultFound: + raise WorkflowEventError(error_code="WORKFLOW_NOT_FOUND") + except SQLAlchemyError as e: + print("Error in fetching from database (stream_workflow)", e) + raise WorkflowEventError(error_code="DB_FETCH_FAILED") + + status = workflow.status + + if status == WorkflowStatus.SUCCESS: yield ServerSentEvent( - data="Failed to read workflow status.", - event="error", + data=json.dumps(workflow.result), event="metadata" ) yield ServerSentEvent(data="done", event="end") break - except Exception as e: - print("Error(stream_workflow)", e) + if status == WorkflowStatus.ERROR: yield ServerSentEvent( - data="An unexpected error occurred while streaming workflow results.", # noqa: E501 + data=json.dumps({"error_code": "WORKFLOW_FAILED"}), event="error", ) yield ServerSentEvent(data="done", event="end") break + except WorkflowEventError as e: + yield ServerSentEvent( + data=json.dumps({"error_code": e.error_code}), + event="error", + ) + yield ServerSentEvent(data="done", event="end") + break + await asyncio.sleep(STREAM_DELAY) From e970fcee2f0d7ed761063aa90cc20d329d947051 Mon Sep 17 00:00:00 2001 From: Maira Salazar Date: Tue, 28 Apr 2026 15:50:20 +0200 Subject: [PATCH 7/7] refactor: use match/case --- app/activities/extract_pdf_content.py | 2 +- app/config.py | 2 +- app/routers/workflows.py | 28 +++++++++++++-------------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/app/activities/extract_pdf_content.py b/app/activities/extract_pdf_content.py index cf53cf4..4d1136b 100644 --- a/app/activities/extract_pdf_content.py +++ b/app/activities/extract_pdf_content.py @@ -32,7 +32,7 @@ async def extract_pdf_text( request: ExtractPdfContentRequest, ) -> ExtractPdfContentResponse: """Read a file and extract its text content using the specified extractor.""" - if settings.orcha_env in {"local", "dev"}: + if settings.env in {"local", "dev"}: try: with open(request.url, "rb") as f: pdf_bytes = f.read() diff --git a/app/config.py b/app/config.py index f458321..684111d 100644 --- a/app/config.py +++ b/app/config.py @@ -36,7 +36,7 @@ class Settings(BaseSettings): ollama_api_key: str | None = None # Environment - orcha_env: str = "local" + env: str = "local" @property def database_url(self) -> str: diff --git a/app/routers/workflows.py b/app/routers/workflows.py index eb2e79e..a16e038 100644 --- a/app/routers/workflows.py +++ b/app/routers/workflows.py @@ -175,20 +175,20 @@ async def workflow_event(request: Request, workflow_id: str): status = workflow.status - if status == WorkflowStatus.SUCCESS: - yield ServerSentEvent( - data=json.dumps(workflow.result), event="metadata" - ) - yield ServerSentEvent(data="done", event="end") - break - - if status == WorkflowStatus.ERROR: - yield ServerSentEvent( - data=json.dumps({"error_code": "WORKFLOW_FAILED"}), - event="error", - ) - yield ServerSentEvent(data="done", event="end") - break + match status: + case WorkflowStatus.SUCCESS: + yield ServerSentEvent( + data=json.dumps(workflow.result), event="metadata" + ) + yield ServerSentEvent(data="done", event="end") + break + case WorkflowStatus.ERROR: + yield ServerSentEvent( + data=json.dumps({"error_code": "WORKFLOW_FAILED"}), + event="error", + ) + yield ServerSentEvent(data="done", event="end") + break except WorkflowEventError as e: yield ServerSentEvent(