Skip to content

Commit 8231845

Browse files
committed
feature: HTTP task service
1 parent 4c6bb47 commit 8231845

16 files changed

Lines changed: 1418 additions & 9 deletions

icij-worker/Dockerfile

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# syntax=docker/dockerfile:1.14.0
2+
FROM python:3.11-slim-bullseye AS python-base
3+
4+
ENV HOME=/home/user
5+
WORKDIR $HOME
6+
RUN apt-get update && apt-get install -y curl
7+
8+
RUN curl -LsSf https://astral.sh/uv/0.6.7/install.sh | sh
9+
ENV PATH="$HOME/.local/bin:$PATH"
10+
ENV UV_LINK_MODE=copy
11+
ENV UV_COMPILE_BYTECODE=1
12+
13+
##### HTTP serivce
14+
FROM python-base AS http-service
15+
16+
ARG dbmate_arch
17+
WORKDIR $HOME/src/app
18+
RUN curl -fsSL -o /usr/local/bin/dbmate https://github.com/amacneil/dbmate/releases/download/v2.19.0/dbmate-linux-${dbmate_arch} \
19+
&& chmod +x /usr/local/bin/dbmate
20+
# Install deps first to optimize layer cache
21+
RUN --mount=type=cache,target=~/.cache/uv \
22+
--mount=type=bind,source=uv.lock,target=uv.lock \
23+
--mount=type=bind,source=pyproject.toml,target=pyproject.toml \
24+
uv sync -v --frozen --no-editable --no-sources --no-install-project --extra http --extra amqp --extra postgres
25+
# Then copy code
26+
ADD uv.lock pyproject.toml README.md ./
27+
ADD icij_worker ./icij_worker/
28+
# Then install service
29+
RUN uv sync -v --frozen --no-editable --no-sources --extra http --extra amqp --extra postgres
30+
RUN uv pip list
31+
RUN rm -rf ~/.cache/pip $(uv cache dir)
32+
33+
ENTRYPOINT ["uv", "run", "--frozen", "--no-editable", "--no-sources", "icij-http-server"]

icij-worker/docker-compose.yml

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
version: '3.7'
2+
3+
x-tm-amqp-config-variables: &tm-amqp-config
4+
TASK_MANAGER__RABBITMQ_HOST: rabbitmq
5+
6+
x-postgres-storage-config: &tm-postgres-storage-config
7+
TASK_MANAGER__BACKEND: amqp
8+
# Change this to a FSKeyValueStorageConfig if you don't want to use postgres
9+
TASK_MANAGER__STORAGE__HOST: postgres
10+
TASK_MANAGER__STORAGE__PORT: 5432
11+
TASK_MANAGER__STORAGE__PASSWORD: changeme
12+
13+
14+
services:
15+
rabbitmq:
16+
image: rabbitmq:3.12.0-management
17+
container_name: http-service-rabbitmq
18+
healthcheck:
19+
test: rabbitmq-diagnostics -q status
20+
interval: 5s
21+
timeout: 2s
22+
retries: 10
23+
start_period: 5s
24+
ports:
25+
- "5672:5672"
26+
- "15672:15672"
27+
28+
postgres:
29+
image: postgres
30+
container_name: http-service-postgres
31+
environment:
32+
POSTGRES_PASSWORD: changeme
33+
healthcheck:
34+
test: pg_isready
35+
interval: 2s
36+
timeout: 2s
37+
retries: 10
38+
start_period: 5s
39+
ports:
40+
- "5435:5432"
41+
42+
http-service:
43+
depends_on:
44+
rabbitmq:
45+
condition: service_healthy
46+
postgres:
47+
condition: service_healthy
48+
build:
49+
context: .
50+
target: http-service
51+
args:
52+
dbmate_arch: ${DBMATE_ARCH}
53+
environment:
54+
<<: [ *tm-amqp-config, *tm-postgres-storage-config ]
55+
PORT: 8000
56+
# Uncomment this and set it to your app path
57+
#TASK_MANAGER__APP_PATH: path.to.app_module.app_variable
58+
59+
healthcheck:
60+
test: curl -f http://localhost:8000/health
61+
interval: 5s
62+
timeout: 2s
63+
retries: 10
64+
start_period: 5s
65+
ports:
66+
- "8000:8000"

icij-worker/http_service

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#!/usr/bin/env bash
2+
3+
function _export_globals() {
4+
DBMATE_ARCH=$(dbmate_arch)
5+
export DBMATE_ARCH
6+
}
7+
8+
function _helpers() {
9+
function dbmate_arch() {
10+
local host_arch
11+
if command -v arch >/dev/null 2>&1; then
12+
host_arch=$(arch)
13+
else
14+
host_arch=$(uname -m)
15+
fi
16+
local dbmate_arch_
17+
if [ "$host_arch" == "x86_64" ] ||[ "$host_arch" == "amd64" ]; then
18+
dbmate_arch_="amd64"
19+
elif [ "$host_arch" == "aarch64" ] || [ "$host_arch" == "arm64" ]; then
20+
dbmate_arch_="arm64"
21+
elif [ "$host_arch" == "i386" ] ; then
22+
dbmate_arch_="386"
23+
else
24+
_exit_with_message "Unsupported architecture $host_arch"
25+
fi
26+
echo "$dbmate_arch_"
27+
}
28+
29+
}
30+
31+
function _main() {
32+
set -e
33+
function _exit_with_message() {
34+
echo "$1"
35+
exit "${2:-1}"
36+
}
37+
_helpers
38+
_export_globals
39+
docker compose "$@"
40+
}
41+
42+
_main "$@"

icij-worker/icij_worker/cli/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66

77
import icij_worker
88
from icij_common.logging_utils import setup_loggers
9+
from icij_worker.cli.tasks import task_app
910
from icij_worker.cli.workers import worker_app
1011

1112
cli_app = typer.Typer(context_settings={"help_option_names": ["-h", "--help"]})
1213
cli_app.add_typer(worker_app)
14+
cli_app.add_typer(task_app)
1315

1416

1517
def version_callback(value: bool):
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import asyncio
2+
import json
3+
import logging
4+
import sys
5+
from pathlib import Path
6+
from traceback import FrameSummary, StackSummary
7+
from typing import Annotated, Any
8+
9+
import typer
10+
from alive_progress import alive_bar
11+
12+
from icij_worker import TaskState
13+
from icij_worker.cli.utils import AsyncTyper, eprint
14+
from icij_worker.http_ import TaskClient
15+
from icij_worker.objects import READY_STATES, Task, TaskError
16+
17+
logger = logging.getLogger(__name__)
18+
19+
DEFAULT_SERVICE_ADDRESS = "http://localhost:8000"
20+
21+
_ARGS_HELP = "task argument as a JSON string or file path"
22+
_GROUP_HELP = "task group"
23+
_SERVICE_URL_HELP = "service URL address"
24+
_POLLING_INTERVAL_S_HELP = "task state polling interval in seconds"
25+
_NAME_HELP = "registered task name"
26+
_RESULT_HELP = "get a task result"
27+
_START_HELP = "creates a new task and start it"
28+
_TASK_ID_HELP = "task ID"
29+
_WATCH_HELP = "watch a task until it's complete"
30+
31+
TaskArgs = str
32+
33+
task_app = AsyncTyper(name="tasks")
34+
35+
36+
@task_app.command(help=_START_HELP)
37+
async def start(
38+
name: Annotated[str, typer.Argument(help=_NAME_HELP)],
39+
args: Annotated[TaskArgs, typer.Argument(help=_ARGS_HELP)] = None,
40+
group: Annotated[
41+
str | None,
42+
typer.Option("--group", "-g", help=_GROUP_HELP),
43+
] = None,
44+
service_address: Annotated[
45+
str, typer.Option("--ds-address", "-a", help=_SERVICE_URL_HELP)
46+
] = DEFAULT_SERVICE_ADDRESS,
47+
):
48+
match args:
49+
case str():
50+
as_path = Path(name)
51+
if as_path.exists():
52+
args = json.loads(as_path.read_text())
53+
else:
54+
args = json.loads(args)
55+
case None:
56+
args = dict()
57+
case _:
58+
raise TypeError(f"Invalid args {args}")
59+
client = TaskClient(service_address)
60+
async with client:
61+
task_id = await client.create_task(name, args, group=group)
62+
eprint(f"Task({task_id}) started !")
63+
eprint(f"Task({task_id}) 🛫")
64+
print(task_id)
65+
66+
67+
@task_app.command(help=_WATCH_HELP)
68+
async def watch(
69+
task_id: Annotated[str, typer.Argument(help=_TASK_ID_HELP)],
70+
service_address: Annotated[
71+
str, typer.Option("--ds-address", "-a", help=_SERVICE_URL_HELP)
72+
] = DEFAULT_SERVICE_ADDRESS,
73+
polling_interval_s: Annotated[
74+
float, typer.Option("--polling-interval-s", "-p", help=_POLLING_INTERVAL_S_HELP)
75+
] = 1.0,
76+
):
77+
client = TaskClient(service_address)
78+
async with client:
79+
task = await client.get_task(task_id)
80+
if task.state is READY_STATES:
81+
await _handle_ready(task, client, already_done=True)
82+
await _handle_alive(task, client, polling_interval_s)
83+
print(task_id)
84+
85+
86+
@task_app.command(help=_RESULT_HELP)
87+
async def result(
88+
task_id: Annotated[str, typer.Argument(help=_TASK_ID_HELP)],
89+
service_address: Annotated[
90+
str, typer.Option("--ds-address", "-a", help=_SERVICE_URL_HELP)
91+
] = DEFAULT_SERVICE_ADDRESS,
92+
) -> Any:
93+
client = TaskClient(service_address)
94+
async with client:
95+
res = await client.get_task_result(task_id)
96+
if isinstance(res, (dict, list)):
97+
res = json.dumps(res, indent=2)
98+
print(res)
99+
100+
101+
async def _handle_ready(
102+
task: Task, client: TaskClient, already_done: bool = False
103+
) -> None:
104+
match task.state:
105+
case TaskState.ERROR:
106+
await _handle_error(task, client)
107+
case TaskState.CANCELLED:
108+
await _handle_cancelled(task)
109+
case TaskState.DONE:
110+
if already_done:
111+
await _handle_already_done(task)
112+
else:
113+
await _handle_done(task)
114+
case _:
115+
raise ValueError(f"Unexpected task state {task.state}")
116+
117+
118+
async def _handle_error(task, client: TaskClient):
119+
error = await client.get_task_error(task.id)
120+
eprint(
121+
f"Task({task.id}) failed with the following"
122+
f" error:\n\n{_format_error(error)}"
123+
)
124+
eprint(f"Task({task.id}) ❌")
125+
raise typer.Exit(code=1)
126+
127+
128+
async def _handle_cancelled(task):
129+
eprint(f"Task({task.id}) was cancelled !")
130+
eprint(f"Task({task.id}) 🛑")
131+
raise typer.Exit(code=1)
132+
133+
134+
async def _handle_already_done(task):
135+
eprint(f"Task({task.id}) ✅ is already completed !")
136+
137+
138+
async def _handle_done(task):
139+
eprint(f"Task({task.id}) 🛬")
140+
eprint(f"Task({task.id}) ✅")
141+
142+
143+
async def _handle_alive(
144+
task: Task, client: TaskClient, polling_interval_s: float
145+
) -> None:
146+
title = f"Task({task.id}) 🛫"
147+
stats = "(ETA: {eta})"
148+
monitor = "{percent}"
149+
progress_bar = alive_bar(
150+
title=title, manual=True, stats=stats, monitor=monitor, file=sys.stderr
151+
)
152+
with progress_bar as bar:
153+
task_state = task.state
154+
while task_state not in READY_STATES:
155+
task = await client.get_task(task.id)
156+
task_state = task.state
157+
progress = task.progress or 0.0
158+
bar(progress) # pylint: disable=not-callable
159+
await asyncio.sleep(polling_interval_s)
160+
if task_state in READY_STATES:
161+
await _handle_ready(task, client)
162+
163+
164+
def _format_error(error: TaskError) -> str:
165+
stack = StackSummary.from_list(
166+
[FrameSummary(f.name, f.lineno, f.name) for f in error.stacktrace]
167+
)
168+
msg = f"{error.name}:\n{stack}\n{error.message}"
169+
if error.cause:
170+
msg += "\n cause by {error.cause}"
171+
return msg
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import inspect
5+
from functools import partial, wraps
6+
from typing import Any, Callable
7+
8+
from typer import Typer
9+
10+
11+
class AsyncTyper(Typer):
12+
@staticmethod
13+
def maybe_run_async(decorator: Callable, func: Callable) -> Any:
14+
if inspect.iscoroutinefunction(func):
15+
16+
@wraps(func)
17+
def runner(*args: Any, **kwargs: Any) -> Any:
18+
return asyncio.run(func(*args, **kwargs))
19+
20+
decorator(runner)
21+
else:
22+
decorator(func)
23+
return func
24+
25+
def callback(self, *args: Any, **kwargs: Any) -> Any:
26+
decorator = super().callback(*args, **kwargs)
27+
return partial(self.maybe_run_async, decorator)
28+
29+
def command(self, *args: Any, **kwargs: Any) -> Any:
30+
decorator = super().command(*args, **kwargs)
31+
return partial(self.maybe_run_async, decorator)
32+
33+
34+
def eprint(*args, **kwargs):
35+
print(*args, file=sys.stderr, **kwargs) # noqa: F821
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from .task_client import ( # pylint: disable=useless-import-alias
2+
TaskClient as TaskClient,
3+
)
4+
5+
OTHER_TAG = "Other"
6+
TASKS_TAG = "Tasks"

0 commit comments

Comments
 (0)