Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Orcha Workflows
# Orcha

Backend service for Orcha AI extraction, built with **FastAPI**, **Temporal**, and **PostgreSQL**.

Expand Down
35 changes: 30 additions & 5 deletions app/activities/extract_pdf_content.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import httpx
from fastapi import HTTPException
from pydantic import BaseModel
from temporalio import activity
from temporalio.exceptions import ApplicationError

from app.config import get_settings
from app.extractors import get_extractor
from app.extractors.errors import InvalidPageSelectionError

settings = get_settings()


class ExtractPdfContentRequest(BaseModel):
"""Request to extract PDF content from a URL."""
Expand All @@ -27,11 +31,32 @@ class ExtractPdfContentResponse(BaseModel):
async def extract_pdf_text(
request: ExtractPdfContentRequest,
) -> ExtractPdfContentResponse:
"""Download PDF from a URL and extract its content using the specified extractor."""
async with httpx.AsyncClient() as client:
response = await client.get(request.url)
response.raise_for_status()
pdf_bytes = response.content
"""Read a file and extract its text content using the specified extractor."""
if settings.env in {"local", "dev"}:
try:
with open(request.url, "rb") as f:
pdf_bytes = f.read()
except FileNotFoundError as e:
raise ApplicationError(
str(e),
non_retryable=True,
) from e
else:
async with httpx.AsyncClient() as client:
try:
response = await client.get(request.url)
response.raise_for_status()
pdf_bytes = response.content
except httpx.TimeoutException:
raise HTTPException(
status_code=504,
detail="Request to fetch file timed out.",
)
except httpx.HTTPError as e:
raise HTTPException(
status_code=502,
detail=f"Failed to fetch file from URL: {e.response.text}.",
)

# Extract content using the extraction module
try:
Expand Down
3 changes: 3 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class Settings(BaseSettings):
ollama_base_url: str = "http://localhost:11434/v1"
ollama_api_key: str | None = None

# Environment
env: str = "local"

@property
def database_url(self) -> str:
"""Build the PostgreSQL connection string."""
Expand Down
9 changes: 9 additions & 0 deletions app/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Exception classes."""


class WorkflowEventError(Exception):
"""Exception raised when generating SSE events is failed."""

def __init__(self, error_code: str):
"""Initialize exception."""
self.error_code = error_code
52 changes: 39 additions & 13 deletions app/routers/workflows.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
"""Workflow API routes with tenant-scoped access control."""

import asyncio
import json

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.exc import NoResultFound
from sqlmodel import Session, select
from sse_starlette import EventSourceResponse, ServerSentEvent

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: I see from the FastAPI SSE docs, you can import these from fastapi.sse. Does this maybe imply also that sse-starlette is already bundled with FastAPI (or comes via an extra?)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fastapi.sse was introduced in version 0.135.0 and Orcha is on version 0.129.0, which I guess we could bump without issues :) From what I've seen, the implementation doesn't use sse-starlette for the SSE support

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yashlamba fyi, maybe we can look into using a newer version of fastapi

from temporalio.client import Client

from app.auth import AuthContext, decode_access_token
from app.database.models import Workflow, WorkflowStatus
from app.database.session import get_db_session
from app.dependencies import get_current_user
from app.errors import WorkflowEventError
from app.workflows.extract_metadata_workflow import (
ExtractMetadata,
ExtractMetadataWorkflowRequest,
Expand Down Expand Up @@ -158,19 +161,42 @@ async def workflow_event(request: Request, workflow_id: str):
if await request.is_disconnected():
break

with Session(request.app.state.db_engine) as session:
try:
workflow = session.exec(
select(Workflow).where(Workflow.public_id == workflow_id)
).one()
if workflow.status.name == "ERROR" or workflow.status.name == "SUCCESS":
yield workflow.status.name
try:
with Session(request.app.state.db_engine) as session:
try:
workflow = session.exec(
select(Workflow).where(Workflow.public_id == workflow_id)
).one()
except NoResultFound:
raise WorkflowEventError(error_code="WORKFLOW_NOT_FOUND")

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: still need to handle error codes on the ui side

except SQLAlchemyError as e:
print("Error in fetching from database (stream_workflow)", e)
raise WorkflowEventError(error_code="DB_FETCH_FAILED")

status = workflow.status

match status:
case WorkflowStatus.SUCCESS:
yield ServerSentEvent(
data=json.dumps(workflow.result), event="metadata"
)
yield ServerSentEvent(data="done", event="end")
break
case WorkflowStatus.ERROR:
yield ServerSentEvent(
data=json.dumps({"error_code": "WORKFLOW_FAILED"}),
event="error",
)
yield ServerSentEvent(data="done", event="end")
break

Comment thread
mairasalazar marked this conversation as resolved.
yield workflow.status.name
except SQLAlchemyError as e:
print("Error(stream)", e)
raise HTTPException(status_code=500)
except WorkflowEventError as e:
yield ServerSentEvent(
data=json.dumps({"error_code": e.error_code}),
event="error",
)
yield ServerSentEvent(data="done", event="end")
break

await asyncio.sleep(STREAM_DELAY)

Expand Down Expand Up @@ -211,6 +237,6 @@ async def stream(

verify_tenant_owns_workflow(auth, workflow)

return StreamingResponse(
return EventSourceResponse(
workflow_event(request, workflow_id), media_type="text/event-stream"
)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies = [
"temporalio>=1.7.0",
"PyJWT>=2.0.0",
"cryptography>=44.0.0",
"sse-starlette>=3.2.0",
]

[project.scripts]
Expand Down
6 changes: 4 additions & 2 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading