From 48d85009e2146bdf5afb29097a28dcd2027bf8cf Mon Sep 17 00:00:00 2001 From: Matthew Horridge Date: Fri, 23 Jan 2026 15:02:38 -0800 Subject: [PATCH 1/4] Add RPC /api endpoint with harmonize and getJob - Implement RPC router with harmonize + getJob methods - Add in-memory job registry with row-based progress - Add progress callback support in harmonize_dataset - Add RuleRegistry.list_pairs helper - Document RPC usage in API examples --- src/harmonization_framework/api/app.py | 2 + .../api/harmonization_api_examples.md | 34 ++- .../api/routes/__init__.py | 2 + src/harmonization_framework/api/routes/rpc.py | 247 ++++++++++++++++++ src/harmonization_framework/harmonize.py | 18 +- src/harmonization_framework/rule_registry.py | 10 + 6 files changed, 310 insertions(+), 3 deletions(-) create mode 100644 src/harmonization_framework/api/routes/rpc.py diff --git a/src/harmonization_framework/api/app.py b/src/harmonization_framework/api/app.py index d5548f4..ba0f05f 100644 --- a/src/harmonization_framework/api/app.py +++ b/src/harmonization_framework/api/app.py @@ -1,7 +1,9 @@ from fastapi import FastAPI from harmonization_framework.api.routes.health import router as health_router +from harmonization_framework.api.routes.rpc import router as rpc_router app = FastAPI(title="Harmonization Framework API") app.include_router(health_router, prefix="/health") +app.include_router(rpc_router, prefix="/api") diff --git a/src/harmonization_framework/api/harmonization_api_examples.md b/src/harmonization_framework/api/harmonization_api_examples.md index bc5de40..1ad51f3 100644 --- a/src/harmonization_framework/api/harmonization_api_examples.md +++ b/src/harmonization_framework/api/harmonization_api_examples.md @@ -12,4 +12,36 @@ The following is a demo for using `curl` to interact with the Harmonization Fram ## Example Usage -This section will be updated when the Electron app exposes the relevant workflows. +### Harmonize (RPC) + +```bash +curl -X POST http://localhost:8000/api \ + -H "Content-Type: application/json" \ + -d '{ + "method": "harmonize", + "params": { + "data_path": "/absolute/path/to/input.csv", + "rules_path": "/absolute/path/to/rules.json", + "output_path": "/absolute/path/to/output.csv", + "replay_log_path": "/absolute/path/to/replay.log", + "mode": "pairs", + "pairs": [ + {"source": "col_a", "target": "col_b"} + ], + "overwrite": false + } + }' +``` + +### Get Job Status (RPC) + +```bash +curl -X POST http://localhost:8000/api \ + -H "Content-Type: application/json" \ + -d '{ + "method": "getJob", + "params": { + "job_id": "job-uuid" + } + }' +``` diff --git a/src/harmonization_framework/api/routes/__init__.py b/src/harmonization_framework/api/routes/__init__.py index 58055d8..7347021 100644 --- a/src/harmonization_framework/api/routes/__init__.py +++ b/src/harmonization_framework/api/routes/__init__.py @@ -1,7 +1,9 @@ # src/api/routes/__init__.py from .health import router as health_router +from .rpc import router as rpc_router __all__ = [ "health_router", + "rpc_router", ] diff --git a/src/harmonization_framework/api/routes/rpc.py b/src/harmonization_framework/api/routes/rpc.py new file mode 100644 index 0000000..4160b3c --- /dev/null +++ b/src/harmonization_framework/api/routes/rpc.py @@ -0,0 +1,247 @@ +import os +import threading +import uuid +from dataclasses import dataclass +from typing import Dict, List, Literal, Optional, Tuple + +import pandas as pd +from fastapi import APIRouter +from pydantic import BaseModel + +from harmonization_framework.harmonize import harmonize_dataset +from harmonization_framework.replay_log import replay_logger as rlog +from harmonization_framework.rule_registry import RuleRegistry + + +router = APIRouter() + + +class Pair(BaseModel): + source: str + target: str + + +class HarmonizeParams(BaseModel): + data_path: str + rules_path: str + output_path: str + replay_log_path: str + mode: Literal["pairs", "all"] + pairs: Optional[List[Pair]] = None + overwrite: bool = False + + +class RpcRequest(BaseModel): + method: str + params: Dict + + +class ErrorDetail(BaseModel): + code: str + message: str + details: Optional[Dict] = None + + +class RpcResponse(BaseModel): + status: str + result: Optional[Dict] = None + error: Optional[ErrorDetail] = None + job_id: Optional[str] = None + + +@dataclass +class JobInfo: + job_id: str + status: str + progress: float + output_path: str + replay_log_path: str + error: Optional[Dict] = None + result: Optional[Dict] = None + + +_jobs: Dict[str, JobInfo] = {} +_jobs_lock = threading.Lock() + + +def _error(code: str, message: str, details: Optional[Dict] = None) -> RpcResponse: + return RpcResponse( + status="error", + error=ErrorDetail(code=code, message=message, details=details), + ) + + +def _validate_paths(params: HarmonizeParams) -> Optional[RpcResponse]: + for path_name, path_value in [ + ("data_path", params.data_path), + ("rules_path", params.rules_path), + ("output_path", params.output_path), + ("replay_log_path", params.replay_log_path), + ]: + if not os.path.isabs(path_value): + return _error("INVALID_PATH", f"{path_name} must be an absolute path") + + if not os.path.exists(params.data_path): + return _error("FILE_NOT_FOUND", f"Data file not found: {params.data_path}") + if not os.path.exists(params.rules_path): + return _error("FILE_NOT_FOUND", f"Rules file not found: {params.rules_path}") + + if os.path.exists(params.output_path) and not params.overwrite: + return _error("ALREADY_EXISTS", f"Output path already exists: {params.output_path}") + + return None + + +def _load_rules(params: HarmonizeParams) -> Tuple[Optional[RuleRegistry], Optional[RpcResponse]]: + registry = RuleRegistry() + try: + registry.load(params.rules_path, clean=True) + except Exception as exc: + return None, _error("INVALID_FORMAT", f"Failed to load rules: {exc}") + return registry, None + + +def _resolve_pairs( + params: HarmonizeParams, registry: RuleRegistry +) -> Tuple[Optional[List[Tuple[str, str]]], Optional[RpcResponse]]: + if params.mode == "all": + pairs = registry.list_pairs() + if not pairs: + return None, _error("NOT_FOUND", "No rules found in rules file") + return pairs, None + + if not params.pairs: + return None, _error("MISSING_FIELD", "pairs is required when mode is 'pairs'") + + pairs = [(pair.source, pair.target) for pair in params.pairs] + for source, target in pairs: + try: + registry.query(source, target) + except Exception: + return None, _error( + "RULE_NOT_FOUND", + f"Rule not found for source={source} target={target}", + ) + return pairs, None + + +def _update_progress(job_id: str, processed: int, total: int) -> None: + with _jobs_lock: + job = _jobs.get(job_id) + if not job: + return + if total == 0: + job.progress = 1.0 + else: + job.progress = min(1.0, processed / total) + + +def _run_harmonize(job_id: str, params: HarmonizeParams) -> None: + with _jobs_lock: + job = _jobs[job_id] + job.status = "running" + job.progress = 0.0 + + validation_error = _validate_paths(params) + if validation_error: + with _jobs_lock: + job.status = "failed" + job.error = validation_error.error.model_dump() + return + + registry, error = _load_rules(params) + if error: + with _jobs_lock: + job.status = "failed" + job.error = error.error.model_dump() + return + + pairs, error = _resolve_pairs(params, registry) + if error: + with _jobs_lock: + job.status = "failed" + job.error = error.error.model_dump() + return + + os.makedirs(os.path.dirname(params.output_path), exist_ok=True) + os.makedirs(os.path.dirname(params.replay_log_path), exist_ok=True) + + dataset = pd.read_csv(params.data_path) + logger = rlog.configure_logger(3, params.replay_log_path) + + def progress_callback(processed: int, total: int) -> None: + _update_progress(job_id, processed, total) + + try: + harmonized = harmonize_dataset( + dataset=dataset, + harmonization_pairs=pairs, + rules=registry, + dataset_name=os.path.basename(params.data_path), + logger=logger, + progress_callback=progress_callback, + ) + harmonized.to_csv(params.output_path, index=False) + except Exception as exc: + with _jobs_lock: + job.status = "failed" + job.error = { + "code": "HARMONIZATION_FAILED", + "message": str(exc), + } + return + + with _jobs_lock: + job.status = "completed" + job.progress = 1.0 + job.result = { + "output_path": params.output_path, + "replay_log_path": params.replay_log_path, + } + + +@router.post("") +def rpc_call(request: RpcRequest) -> RpcResponse: + if request.method == "harmonize": + try: + params = HarmonizeParams(**request.params) + except Exception as exc: + return _error("VALIDATION_ERROR", str(exc)) + + job_id = str(uuid.uuid4()) + job = JobInfo( + job_id=job_id, + status="queued", + progress=0.0, + output_path=params.output_path, + replay_log_path=params.replay_log_path, + ) + with _jobs_lock: + _jobs[job_id] = job + + thread = threading.Thread(target=_run_harmonize, args=(job_id, params), daemon=True) + thread.start() + return RpcResponse(status="accepted", job_id=job_id) + + if request.method == "getJob": + job_id = request.params.get("job_id") + if not job_id: + return _error("MISSING_FIELD", "job_id is required") + with _jobs_lock: + job = _jobs.get(job_id) + if not job: + return _error("NOT_FOUND", f"Job not found: {job_id}") + return RpcResponse( + status="success", + result={ + "job_id": job.job_id, + "status": job.status, + "progress": job.progress, + "output_path": job.output_path, + "replay_log_path": job.replay_log_path, + "result": job.result, + "error": job.error, + }, + ) + + return _error("METHOD_NOT_FOUND", f"Unknown method: {request.method}") diff --git a/src/harmonization_framework/harmonize.py b/src/harmonization_framework/harmonize.py index d54784d..dea3db2 100644 --- a/src/harmonization_framework/harmonize.py +++ b/src/harmonization_framework/harmonize.py @@ -1,7 +1,7 @@ import os import pandas as pd -from typing import List, Optional, Tuple +from typing import Callable, List, Optional, Tuple from .rule_registry import RuleRegistry from .replay_log import replay_logger as rlog @@ -13,6 +13,7 @@ def harmonize_dataset( rules: RuleRegistry, dataset_name: str, logger=None, + progress_callback: Optional[Callable[[int, int], None]] = None, ) -> pd.DataFrame: """ Apply harmonization rules to the provided dataset and return a new dataframe. @@ -28,6 +29,7 @@ def harmonize_dataset( rules: RuleRegistry with harmonization rules keyed by source/target. dataset_name: Name used for the `source dataset` metadata column. logger: Optional replay logger for recording applied rules. + progress_callback: Optional callback invoked with (processed, total) counts. """ # make a new dataframe with the same number of rows and columns # and rename the columns @@ -37,6 +39,9 @@ def harmonize_dataset( inplace=True, ) # apply harmonization rule to each column + total_steps = len(dataset) * len(harmonization_pairs) if harmonization_pairs else 0 + processed = 0 + for source, target in harmonization_pairs: print(f"Requested rule: {source} -> {target}") rule = rules.query(source, target) @@ -44,7 +49,16 @@ def harmonize_dataset( if logger: rlog.log_operation(logger, rule, dataset_name) dataset_harmonized.rename(columns={source: target}, inplace=True) - dataset_harmonized[target] = dataset[source].apply(rule.transform) + + def transform_with_progress(value): + nonlocal processed + result = rule.transform(value) + processed += 1 + if progress_callback: + progress_callback(processed, total_steps) + return result + + dataset_harmonized[target] = dataset[source].apply(transform_with_progress) # save source dataset dataset_harmonized["source dataset"] = [dataset_name] * len(dataset) # save old ids diff --git a/src/harmonization_framework/rule_registry.py b/src/harmonization_framework/rule_registry.py index 0e1ceac..f22e09a 100644 --- a/src/harmonization_framework/rule_registry.py +++ b/src/harmonization_framework/rule_registry.py @@ -42,6 +42,16 @@ def add_rule(self, rule: HarmonizationRule): print(f"Warning: rule already exists for source {rule.source} and target {rule.target}.") self._rules[source][target] = rule + def list_pairs(self): + """ + Return a list of (source, target) pairs for all stored rules. + """ + pairs = [] + for source, targets in self._rules.items(): + for target in targets: + pairs.append((source, target)) + return pairs + def clean(self): """ Remove all rules from the store. From a21704083d066bbf6550dafc6d16ffff4e2d5c1f Mon Sep 17 00:00:00 2001 From: Matthew Horridge Date: Fri, 23 Jan 2026 18:38:52 -0800 Subject: [PATCH 2/4] Document RPC errors and add path validation tests - Expand RPC docs with error-code guidance and examples - Add detailed docstrings for validation and worker flow - Normalize method aliases and add error code enum - Add tests for path validation details --- .../api/harmonization_api_examples.md | 23 ++- src/harmonization_framework/api/routes/rpc.py | 173 ++++++++++++++++-- tests/test_rpc_paths.py | 79 ++++++++ 3 files changed, 257 insertions(+), 18 deletions(-) create mode 100644 tests/test_rpc_paths.py diff --git a/src/harmonization_framework/api/harmonization_api_examples.md b/src/harmonization_framework/api/harmonization_api_examples.md index 1ad51f3..5bfdc1e 100644 --- a/src/harmonization_framework/api/harmonization_api_examples.md +++ b/src/harmonization_framework/api/harmonization_api_examples.md @@ -39,9 +39,30 @@ curl -X POST http://localhost:8000/api \ curl -X POST http://localhost:8000/api \ -H "Content-Type: application/json" \ -d '{ - "method": "getJob", + "method": "get_job", "params": { "job_id": "job-uuid" } }' ``` + +## Error Responses + +Errors follow a stable schema: + +```json +{ + "status": "error", + "error": { + "code": "FILE_NOT_FOUND", + "message": "Rules file not found: /abs/path/to/rules.json", + "details": { + "path_type": "rules_path", + "path": "/abs/path/to/rules.json" + } + } +} +``` + +The `code` is a small, stable set of values. Use `details` to determine +which field/path caused the error. diff --git a/src/harmonization_framework/api/routes/rpc.py b/src/harmonization_framework/api/routes/rpc.py index 4160b3c..e903226 100644 --- a/src/harmonization_framework/api/routes/rpc.py +++ b/src/harmonization_framework/api/routes/rpc.py @@ -2,6 +2,7 @@ import threading import uuid from dataclasses import dataclass +from enum import Enum from typing import Dict, List, Literal, Optional, Tuple import pandas as pd @@ -16,12 +17,32 @@ router = APIRouter() +""" +RPC API router. + +Implements a single POST /api endpoint with method dispatch. Currently supported: +- harmonize: async CSV harmonization with row-based progress tracking +- get_job: retrieve status/progress/result for a job + +Method names use snake_case. The router also accepts camelCase aliases +(e.g., getJob) for convenience. + +Error handling: +- Error codes are intentionally small and stable (e.g., MISSING_FIELD, INVALID_PATH, + FILE_NOT_FOUND, INVALID_FORMAT, VALIDATION_ERROR, RULE_NOT_FOUND, JOB_NOT_FOUND). +- Callers should use the `details` object to identify which field/path caused the error. + Example: {"field": "rules_path"} or {"path_type": "rules_path", "path": "/abs/..."}. +""" + + class Pair(BaseModel): + """Source/target column pair.""" source: str target: str class HarmonizeParams(BaseModel): + """Parameters for the harmonize RPC call.""" data_path: str rules_path: str output_path: str @@ -32,25 +53,42 @@ class HarmonizeParams(BaseModel): class RpcRequest(BaseModel): + """RPC request envelope.""" method: str params: Dict class ErrorDetail(BaseModel): + """Structured error payload for RPC responses.""" code: str message: str details: Optional[Dict] = None class RpcResponse(BaseModel): + """RPC response envelope.""" status: str result: Optional[Dict] = None error: Optional[ErrorDetail] = None job_id: Optional[str] = None +class ErrorCode(str, Enum): + INVALID_PATH = "INVALID_PATH" + FILE_NOT_FOUND = "FILE_NOT_FOUND" + ALREADY_EXISTS = "ALREADY_EXISTS" + MISSING_FIELD = "MISSING_FIELD" + VALIDATION_ERROR = "VALIDATION_ERROR" + RULE_NOT_FOUND = "RULE_NOT_FOUND" + JOB_NOT_FOUND = "JOB_NOT_FOUND" + INVALID_FORMAT = "INVALID_FORMAT" + HARMONIZATION_FAILED = "HARMONIZATION_FAILED" + METHOD_NOT_FOUND = "METHOD_NOT_FOUND" + + @dataclass class JobInfo: + """In-memory job state for async operations.""" job_id: str status: str progress: float @@ -64,14 +102,36 @@ class JobInfo: _jobs_lock = threading.Lock() -def _error(code: str, message: str, details: Optional[Dict] = None) -> RpcResponse: +def _error(code: ErrorCode, message: str, details: Optional[Dict] = None) -> RpcResponse: + """Helper to build a standardized error response.""" return RpcResponse( status="error", - error=ErrorDetail(code=code, message=message, details=details), + error=ErrorDetail(code=code.value, message=message, details=details), ) +def _normalize_method(method: str) -> str: + """ + Normalize method names to snake_case and support legacy/camelCase aliases. + """ + aliases = { + "getJob": "get_job", + "harmonize": "harmonize", + "get_job": "get_job", + } + return aliases.get(method, method) + + def _validate_paths(params: HarmonizeParams) -> Optional[RpcResponse]: + """ + Validate input/output/replay paths and overwrite behavior. + + Rules: + - All paths must be absolute. + - data_path and rules_path must exist. + - output_path must not exist unless overwrite=True. + - replay_log_path may be a new file and will be created if needed. + """ for path_name, path_value in [ ("data_path", params.data_path), ("rules_path", params.rules_path), @@ -79,39 +139,65 @@ def _validate_paths(params: HarmonizeParams) -> Optional[RpcResponse]: ("replay_log_path", params.replay_log_path), ]: if not os.path.isabs(path_value): - return _error("INVALID_PATH", f"{path_name} must be an absolute path") + return _error( + ErrorCode.INVALID_PATH, + f"{path_name} must be an absolute path", + details={"path": path_value, "path_type": path_name}, + ) if not os.path.exists(params.data_path): - return _error("FILE_NOT_FOUND", f"Data file not found: {params.data_path}") + return _error( + ErrorCode.FILE_NOT_FOUND, + f"Data file not found: {params.data_path}", + details={"path": params.data_path, "path_type": "data_path"}, + ) if not os.path.exists(params.rules_path): - return _error("FILE_NOT_FOUND", f"Rules file not found: {params.rules_path}") + return _error( + ErrorCode.FILE_NOT_FOUND, + f"Rules file not found: {params.rules_path}", + details={"path": params.rules_path, "path_type": "rules_path"}, + ) if os.path.exists(params.output_path) and not params.overwrite: - return _error("ALREADY_EXISTS", f"Output path already exists: {params.output_path}") + return _error( + ErrorCode.ALREADY_EXISTS, + f"Output path already exists: {params.output_path}", + details={"path": params.output_path, "path_type": "output_path"}, + ) return None def _load_rules(params: HarmonizeParams) -> Tuple[Optional[RuleRegistry], Optional[RpcResponse]]: + """Load a RuleRegistry from a rules JSON file.""" registry = RuleRegistry() try: registry.load(params.rules_path, clean=True) except Exception as exc: - return None, _error("INVALID_FORMAT", f"Failed to load rules: {exc}") + return None, _error(ErrorCode.INVALID_FORMAT, f"Failed to load rules: {exc}") return registry, None def _resolve_pairs( params: HarmonizeParams, registry: RuleRegistry ) -> Tuple[Optional[List[Tuple[str, str]]], Optional[RpcResponse]]: + """Resolve requested (source, target) pairs based on mode and availability.""" if params.mode == "all": pairs = registry.list_pairs() if not pairs: - return None, _error("NOT_FOUND", "No rules found in rules file") + return None, _error( + ErrorCode.RULE_NOT_FOUND, + "No rules found in rules file", + details={"path": params.rules_path}, + ) return pairs, None if not params.pairs: - return None, _error("MISSING_FIELD", "pairs is required when mode is 'pairs'") + return None, _error( + ErrorCode.MISSING_FIELD, + "pairs is required when mode is 'pairs'", + details={"field": "pairs"}, + ) pairs = [(pair.source, pair.target) for pair in params.pairs] for source, target in pairs: @@ -119,13 +205,15 @@ def _resolve_pairs( registry.query(source, target) except Exception: return None, _error( - "RULE_NOT_FOUND", + ErrorCode.RULE_NOT_FOUND, f"Rule not found for source={source} target={target}", + details={"source": source, "target": target}, ) return pairs, None def _update_progress(job_id: str, processed: int, total: int) -> None: + """Update job progress in the in-memory registry.""" with _jobs_lock: job = _jobs.get(job_id) if not job: @@ -137,6 +225,19 @@ def _update_progress(job_id: str, processed: int, total: int) -> None: def _run_harmonize(job_id: str, params: HarmonizeParams) -> None: + """ + Worker that performs harmonization and updates job state. + + Workflow: + 1) Validate paths and overwrite behavior. + 2) Load rules from the registry JSON file. + 3) Resolve rule pairs based on the requested mode. + 4) Create output/log directories as needed. + 5) Read input CSV, apply harmonization with row-based progress callbacks. + 6) Write output CSV and finalize job state. + + On failure, sets job status to "failed" and records a structured error. + """ with _jobs_lock: job = _jobs[job_id] job.status = "running" @@ -186,7 +287,7 @@ def progress_callback(processed: int, total: int) -> None: with _jobs_lock: job.status = "failed" job.error = { - "code": "HARMONIZATION_FAILED", + "code": ErrorCode.HARMONIZATION_FAILED.value, "message": str(exc), } return @@ -202,11 +303,41 @@ def progress_callback(processed: int, total: int) -> None: @router.post("") def rpc_call(request: RpcRequest) -> RpcResponse: - if request.method == "harmonize": + """ + Dispatch RPC methods and return a standardized response envelope. + + Supported methods (snake_case; camelCase aliases accepted): + - harmonize: + params: + data_path: absolute path to input CSV + rules_path: absolute path to RuleRegistry JSON + output_path: absolute path to write harmonized CSV + replay_log_path: absolute path to write replay log + mode: "pairs" | "all" + pairs: optional list of {source, target} when mode="pairs" + overwrite: boolean (default false) + + response: + status: "accepted" + job_id: string + + - get_job: + params: + job_id: string + + response: + status: "success" + result: { + job_id, status, progress, output_path, replay_log_path, result, error + } + """ + method = _normalize_method(request.method) + + if method == "harmonize": try: params = HarmonizeParams(**request.params) except Exception as exc: - return _error("VALIDATION_ERROR", str(exc)) + return _error(ErrorCode.VALIDATION_ERROR, str(exc), details={"params": request.params}) job_id = str(uuid.uuid4()) job = JobInfo( @@ -223,14 +354,22 @@ def rpc_call(request: RpcRequest) -> RpcResponse: thread.start() return RpcResponse(status="accepted", job_id=job_id) - if request.method == "getJob": + if method == "get_job": job_id = request.params.get("job_id") if not job_id: - return _error("MISSING_FIELD", "job_id is required") + return _error( + ErrorCode.MISSING_FIELD, + "job_id is required", + details={"field": "job_id"}, + ) with _jobs_lock: job = _jobs.get(job_id) if not job: - return _error("NOT_FOUND", f"Job not found: {job_id}") + return _error( + ErrorCode.JOB_NOT_FOUND, + f"Job not found: {job_id}", + details={"job_id": job_id}, + ) return RpcResponse( status="success", result={ @@ -244,4 +383,4 @@ def rpc_call(request: RpcRequest) -> RpcResponse: }, ) - return _error("METHOD_NOT_FOUND", f"Unknown method: {request.method}") + return _error(ErrorCode.METHOD_NOT_FOUND, f"Unknown method: {request.method}") diff --git a/tests/test_rpc_paths.py b/tests/test_rpc_paths.py new file mode 100644 index 0000000..db2273a --- /dev/null +++ b/tests/test_rpc_paths.py @@ -0,0 +1,79 @@ +import os + +from harmonization_framework.api.routes.rpc import HarmonizeParams, _validate_paths + + +def test_validate_paths_rejects_relative_paths(tmp_path): + params = HarmonizeParams( + data_path="relative.csv", + rules_path=str(tmp_path / "rules.json"), + output_path=str(tmp_path / "out.csv"), + replay_log_path=str(tmp_path / "replay.log"), + mode="all", + overwrite=False, + ) + response = _validate_paths(params) + assert response is not None + assert response.error.code == "INVALID_PATH" + assert response.error.details["path"] == "relative.csv" + assert response.error.details["path_type"] == "data_path" + + +def test_validate_paths_missing_inputs(tmp_path): + data_path = tmp_path / "input.csv" + rules_path = tmp_path / "rules.json" + + params = HarmonizeParams( + data_path=str(data_path), + rules_path=str(rules_path), + output_path=str(tmp_path / "out.csv"), + replay_log_path=str(tmp_path / "replay.log"), + mode="all", + overwrite=False, + ) + response = _validate_paths(params) + assert response is not None + assert response.error.code == "FILE_NOT_FOUND" + assert response.error.details["path_type"] == "data_path" + + +def test_validate_paths_rejects_existing_output_without_overwrite(tmp_path): + data_path = tmp_path / "input.csv" + rules_path = tmp_path / "rules.json" + output_path = tmp_path / "out.csv" + + data_path.write_text("a,b\n1,2\n") + rules_path.write_text("{}") + output_path.write_text("already") + + params = HarmonizeParams( + data_path=str(data_path), + rules_path=str(rules_path), + output_path=str(output_path), + replay_log_path=str(tmp_path / "replay.log"), + mode="all", + overwrite=False, + ) + response = _validate_paths(params) + assert response is not None + assert response.error.code == "ALREADY_EXISTS" + assert response.error.details["path"] == str(output_path) + + +def test_validate_paths_accepts_valid_paths(tmp_path): + data_path = tmp_path / "input.csv" + rules_path = tmp_path / "rules.json" + + data_path.write_text("a,b\n1,2\n") + rules_path.write_text("{}") + + params = HarmonizeParams( + data_path=str(data_path), + rules_path=str(rules_path), + output_path=str(tmp_path / "out.csv"), + replay_log_path=str(tmp_path / "replay.log"), + mode="all", + overwrite=False, + ) + response = _validate_paths(params) + assert response is None From bd5b6968d3c1c3c4c40a3c5d5e3217b418ea157a Mon Sep 17 00:00:00 2001 From: Matthew Horridge Date: Fri, 23 Jan 2026 20:09:35 -0800 Subject: [PATCH 3/4] Refactor RPC implementation and documentation - Split RPC routing, models, errors, jobs, and handlers into dedicated modules - Add in-memory job registry helpers and harmonize worker handlers - Normalize method aliases and document error-code strategy with details - Rename request fields to *_file_path and update examples/tests - Add path validation tests and improve RPC docstrings --- .../api/harmonization_api_examples.md | 10 +- src/harmonization_framework/api/routes/rpc.py | 331 +----------------- src/harmonization_framework/api/rpc_errors.py | 50 +++ .../api/rpc_handlers.py | 242 +++++++++++++ src/harmonization_framework/api/rpc_jobs.py | 75 ++++ src/harmonization_framework/api/rpc_models.py | 56 +++ tests/test_rpc_paths.py | 37 +- 7 files changed, 458 insertions(+), 343 deletions(-) create mode 100644 src/harmonization_framework/api/rpc_errors.py create mode 100644 src/harmonization_framework/api/rpc_handlers.py create mode 100644 src/harmonization_framework/api/rpc_jobs.py create mode 100644 src/harmonization_framework/api/rpc_models.py diff --git a/src/harmonization_framework/api/harmonization_api_examples.md b/src/harmonization_framework/api/harmonization_api_examples.md index 5bfdc1e..40c70a1 100644 --- a/src/harmonization_framework/api/harmonization_api_examples.md +++ b/src/harmonization_framework/api/harmonization_api_examples.md @@ -20,10 +20,10 @@ curl -X POST http://localhost:8000/api \ -d '{ "method": "harmonize", "params": { - "data_path": "/absolute/path/to/input.csv", - "rules_path": "/absolute/path/to/rules.json", - "output_path": "/absolute/path/to/output.csv", - "replay_log_path": "/absolute/path/to/replay.log", + "data_file_path": "/absolute/path/to/input.csv", + "rules_file_path": "/absolute/path/to/rules.json", + "replay_log_file_path": "/absolute/path/to/replay.log", + "output_file_path": "/absolute/path/to/output.csv", "mode": "pairs", "pairs": [ {"source": "col_a", "target": "col_b"} @@ -57,7 +57,7 @@ Errors follow a stable schema: "code": "FILE_NOT_FOUND", "message": "Rules file not found: /abs/path/to/rules.json", "details": { - "path_type": "rules_path", + "path_type": "rules_file_path", "path": "/abs/path/to/rules.json" } } diff --git a/src/harmonization_framework/api/routes/rpc.py b/src/harmonization_framework/api/routes/rpc.py index e903226..720f4a3 100644 --- a/src/harmonization_framework/api/routes/rpc.py +++ b/src/harmonization_framework/api/routes/rpc.py @@ -1,22 +1,11 @@ -import os -import threading -import uuid -from dataclasses import dataclass -from enum import Enum -from typing import Dict, List, Literal, Optional, Tuple - -import pandas as pd from fastapi import APIRouter -from pydantic import BaseModel - -from harmonization_framework.harmonize import harmonize_dataset -from harmonization_framework.replay_log import replay_logger as rlog -from harmonization_framework.rule_registry import RuleRegistry +from harmonization_framework.api.rpc_errors import ErrorCode, build_error +from harmonization_framework.api.rpc_handlers import handle_get_job, handle_harmonize +from harmonization_framework.api.rpc_models import RpcRequest, RpcResponse router = APIRouter() - """ RPC API router. @@ -31,85 +20,10 @@ - Error codes are intentionally small and stable (e.g., MISSING_FIELD, INVALID_PATH, FILE_NOT_FOUND, INVALID_FORMAT, VALIDATION_ERROR, RULE_NOT_FOUND, JOB_NOT_FOUND). - Callers should use the `details` object to identify which field/path caused the error. - Example: {"field": "rules_path"} or {"path_type": "rules_path", "path": "/abs/..."}. + Example: {"field": "rules_file_path"} or {"path_type": "rules_file_path", "path": "/abs/..."}. """ -class Pair(BaseModel): - """Source/target column pair.""" - source: str - target: str - - -class HarmonizeParams(BaseModel): - """Parameters for the harmonize RPC call.""" - data_path: str - rules_path: str - output_path: str - replay_log_path: str - mode: Literal["pairs", "all"] - pairs: Optional[List[Pair]] = None - overwrite: bool = False - - -class RpcRequest(BaseModel): - """RPC request envelope.""" - method: str - params: Dict - - -class ErrorDetail(BaseModel): - """Structured error payload for RPC responses.""" - code: str - message: str - details: Optional[Dict] = None - - -class RpcResponse(BaseModel): - """RPC response envelope.""" - status: str - result: Optional[Dict] = None - error: Optional[ErrorDetail] = None - job_id: Optional[str] = None - - -class ErrorCode(str, Enum): - INVALID_PATH = "INVALID_PATH" - FILE_NOT_FOUND = "FILE_NOT_FOUND" - ALREADY_EXISTS = "ALREADY_EXISTS" - MISSING_FIELD = "MISSING_FIELD" - VALIDATION_ERROR = "VALIDATION_ERROR" - RULE_NOT_FOUND = "RULE_NOT_FOUND" - JOB_NOT_FOUND = "JOB_NOT_FOUND" - INVALID_FORMAT = "INVALID_FORMAT" - HARMONIZATION_FAILED = "HARMONIZATION_FAILED" - METHOD_NOT_FOUND = "METHOD_NOT_FOUND" - - -@dataclass -class JobInfo: - """In-memory job state for async operations.""" - job_id: str - status: str - progress: float - output_path: str - replay_log_path: str - error: Optional[Dict] = None - result: Optional[Dict] = None - - -_jobs: Dict[str, JobInfo] = {} -_jobs_lock = threading.Lock() - - -def _error(code: ErrorCode, message: str, details: Optional[Dict] = None) -> RpcResponse: - """Helper to build a standardized error response.""" - return RpcResponse( - status="error", - error=ErrorDetail(code=code.value, message=message, details=details), - ) - - def _normalize_method(method: str) -> str: """ Normalize method names to snake_case and support legacy/camelCase aliases. @@ -122,185 +36,6 @@ def _normalize_method(method: str) -> str: return aliases.get(method, method) -def _validate_paths(params: HarmonizeParams) -> Optional[RpcResponse]: - """ - Validate input/output/replay paths and overwrite behavior. - - Rules: - - All paths must be absolute. - - data_path and rules_path must exist. - - output_path must not exist unless overwrite=True. - - replay_log_path may be a new file and will be created if needed. - """ - for path_name, path_value in [ - ("data_path", params.data_path), - ("rules_path", params.rules_path), - ("output_path", params.output_path), - ("replay_log_path", params.replay_log_path), - ]: - if not os.path.isabs(path_value): - return _error( - ErrorCode.INVALID_PATH, - f"{path_name} must be an absolute path", - details={"path": path_value, "path_type": path_name}, - ) - - if not os.path.exists(params.data_path): - return _error( - ErrorCode.FILE_NOT_FOUND, - f"Data file not found: {params.data_path}", - details={"path": params.data_path, "path_type": "data_path"}, - ) - if not os.path.exists(params.rules_path): - return _error( - ErrorCode.FILE_NOT_FOUND, - f"Rules file not found: {params.rules_path}", - details={"path": params.rules_path, "path_type": "rules_path"}, - ) - - if os.path.exists(params.output_path) and not params.overwrite: - return _error( - ErrorCode.ALREADY_EXISTS, - f"Output path already exists: {params.output_path}", - details={"path": params.output_path, "path_type": "output_path"}, - ) - - return None - - -def _load_rules(params: HarmonizeParams) -> Tuple[Optional[RuleRegistry], Optional[RpcResponse]]: - """Load a RuleRegistry from a rules JSON file.""" - registry = RuleRegistry() - try: - registry.load(params.rules_path, clean=True) - except Exception as exc: - return None, _error(ErrorCode.INVALID_FORMAT, f"Failed to load rules: {exc}") - return registry, None - - -def _resolve_pairs( - params: HarmonizeParams, registry: RuleRegistry -) -> Tuple[Optional[List[Tuple[str, str]]], Optional[RpcResponse]]: - """Resolve requested (source, target) pairs based on mode and availability.""" - if params.mode == "all": - pairs = registry.list_pairs() - if not pairs: - return None, _error( - ErrorCode.RULE_NOT_FOUND, - "No rules found in rules file", - details={"path": params.rules_path}, - ) - return pairs, None - - if not params.pairs: - return None, _error( - ErrorCode.MISSING_FIELD, - "pairs is required when mode is 'pairs'", - details={"field": "pairs"}, - ) - - pairs = [(pair.source, pair.target) for pair in params.pairs] - for source, target in pairs: - try: - registry.query(source, target) - except Exception: - return None, _error( - ErrorCode.RULE_NOT_FOUND, - f"Rule not found for source={source} target={target}", - details={"source": source, "target": target}, - ) - return pairs, None - - -def _update_progress(job_id: str, processed: int, total: int) -> None: - """Update job progress in the in-memory registry.""" - with _jobs_lock: - job = _jobs.get(job_id) - if not job: - return - if total == 0: - job.progress = 1.0 - else: - job.progress = min(1.0, processed / total) - - -def _run_harmonize(job_id: str, params: HarmonizeParams) -> None: - """ - Worker that performs harmonization and updates job state. - - Workflow: - 1) Validate paths and overwrite behavior. - 2) Load rules from the registry JSON file. - 3) Resolve rule pairs based on the requested mode. - 4) Create output/log directories as needed. - 5) Read input CSV, apply harmonization with row-based progress callbacks. - 6) Write output CSV and finalize job state. - - On failure, sets job status to "failed" and records a structured error. - """ - with _jobs_lock: - job = _jobs[job_id] - job.status = "running" - job.progress = 0.0 - - validation_error = _validate_paths(params) - if validation_error: - with _jobs_lock: - job.status = "failed" - job.error = validation_error.error.model_dump() - return - - registry, error = _load_rules(params) - if error: - with _jobs_lock: - job.status = "failed" - job.error = error.error.model_dump() - return - - pairs, error = _resolve_pairs(params, registry) - if error: - with _jobs_lock: - job.status = "failed" - job.error = error.error.model_dump() - return - - os.makedirs(os.path.dirname(params.output_path), exist_ok=True) - os.makedirs(os.path.dirname(params.replay_log_path), exist_ok=True) - - dataset = pd.read_csv(params.data_path) - logger = rlog.configure_logger(3, params.replay_log_path) - - def progress_callback(processed: int, total: int) -> None: - _update_progress(job_id, processed, total) - - try: - harmonized = harmonize_dataset( - dataset=dataset, - harmonization_pairs=pairs, - rules=registry, - dataset_name=os.path.basename(params.data_path), - logger=logger, - progress_callback=progress_callback, - ) - harmonized.to_csv(params.output_path, index=False) - except Exception as exc: - with _jobs_lock: - job.status = "failed" - job.error = { - "code": ErrorCode.HARMONIZATION_FAILED.value, - "message": str(exc), - } - return - - with _jobs_lock: - job.status = "completed" - job.progress = 1.0 - job.result = { - "output_path": params.output_path, - "replay_log_path": params.replay_log_path, - } - - @router.post("") def rpc_call(request: RpcRequest) -> RpcResponse: """ @@ -309,10 +44,10 @@ def rpc_call(request: RpcRequest) -> RpcResponse: Supported methods (snake_case; camelCase aliases accepted): - harmonize: params: - data_path: absolute path to input CSV - rules_path: absolute path to RuleRegistry JSON - output_path: absolute path to write harmonized CSV - replay_log_path: absolute path to write replay log + data_file_path: absolute path to input CSV + rules_file_path: absolute path to RuleRegistry JSON + replay_log_file_path: absolute path to write replay log + output_file_path: absolute path to write harmonized CSV mode: "pairs" | "all" pairs: optional list of {source, target} when mode="pairs" overwrite: boolean (default false) @@ -334,53 +69,9 @@ def rpc_call(request: RpcRequest) -> RpcResponse: method = _normalize_method(request.method) if method == "harmonize": - try: - params = HarmonizeParams(**request.params) - except Exception as exc: - return _error(ErrorCode.VALIDATION_ERROR, str(exc), details={"params": request.params}) - - job_id = str(uuid.uuid4()) - job = JobInfo( - job_id=job_id, - status="queued", - progress=0.0, - output_path=params.output_path, - replay_log_path=params.replay_log_path, - ) - with _jobs_lock: - _jobs[job_id] = job - - thread = threading.Thread(target=_run_harmonize, args=(job_id, params), daemon=True) - thread.start() - return RpcResponse(status="accepted", job_id=job_id) + return handle_harmonize(request) if method == "get_job": - job_id = request.params.get("job_id") - if not job_id: - return _error( - ErrorCode.MISSING_FIELD, - "job_id is required", - details={"field": "job_id"}, - ) - with _jobs_lock: - job = _jobs.get(job_id) - if not job: - return _error( - ErrorCode.JOB_NOT_FOUND, - f"Job not found: {job_id}", - details={"job_id": job_id}, - ) - return RpcResponse( - status="success", - result={ - "job_id": job.job_id, - "status": job.status, - "progress": job.progress, - "output_path": job.output_path, - "replay_log_path": job.replay_log_path, - "result": job.result, - "error": job.error, - }, - ) + return handle_get_job(request) - return _error(ErrorCode.METHOD_NOT_FOUND, f"Unknown method: {request.method}") + return build_error(ErrorCode.METHOD_NOT_FOUND, f"Unknown method: {request.method}") diff --git a/src/harmonization_framework/api/rpc_errors.py b/src/harmonization_framework/api/rpc_errors.py new file mode 100644 index 0000000..41eb4c9 --- /dev/null +++ b/src/harmonization_framework/api/rpc_errors.py @@ -0,0 +1,50 @@ +from enum import Enum +from typing import Dict, Optional + +from pydantic import BaseModel + + +class ErrorDetail(BaseModel): + """Structured error payload for RPC responses.""" + code: str + message: str + details: Optional[Dict] = None + + +class ErrorCode(str, Enum): + """ + Stable error codes used across RPC responses. + + These codes are intentionally small in number; use `error.details` to identify + the field/path/context that caused the failure. + """ + # A required path is not absolute or is malformed. + INVALID_PATH = "INVALID_PATH" + # An expected file does not exist (data_path or rules_path). + FILE_NOT_FOUND = "FILE_NOT_FOUND" + # Output path exists and overwrite was not allowed. + ALREADY_EXISTS = "ALREADY_EXISTS" + # A required field is missing from the request params. + MISSING_FIELD = "MISSING_FIELD" + # Request params failed model validation. + VALIDATION_ERROR = "VALIDATION_ERROR" + # Requested rule pairs cannot be found in the registry. + RULE_NOT_FOUND = "RULE_NOT_FOUND" + # Job id does not correspond to a known job. + JOB_NOT_FOUND = "JOB_NOT_FOUND" + # Rules file could not be parsed or is invalid. + INVALID_FORMAT = "INVALID_FORMAT" + # Harmonization failed during execution. + HARMONIZATION_FAILED = "HARMONIZATION_FAILED" + # Method name is unknown. + METHOD_NOT_FOUND = "METHOD_NOT_FOUND" + + +def build_error(code: ErrorCode, message: str, details: Optional[Dict] = None) -> "RpcResponse": + """Helper to build a standardized error response.""" + from harmonization_framework.api.rpc_models import RpcResponse + + return RpcResponse( + status="error", + error=ErrorDetail(code=code.value, message=message, details=details), + ) diff --git a/src/harmonization_framework/api/rpc_handlers.py b/src/harmonization_framework/api/rpc_handlers.py new file mode 100644 index 0000000..f705c3a --- /dev/null +++ b/src/harmonization_framework/api/rpc_handlers.py @@ -0,0 +1,242 @@ +import os +import threading +import uuid +from typing import Dict, List, Optional, Tuple + +import pandas as pd + +from harmonization_framework.harmonize import harmonize_dataset +from harmonization_framework.replay_log import replay_logger as rlog +from harmonization_framework.rule_registry import RuleRegistry +from harmonization_framework.api.rpc_errors import ErrorCode, build_error +from harmonization_framework.api.rpc_jobs import ( + JobId, + JobInfo, + get_job, + register_job, + update_job_status, + update_progress, +) +from harmonization_framework.api.rpc_models import HarmonizeParams, RpcRequest, RpcResponse + + +def _validate_paths(params: HarmonizeParams) -> Optional[RpcResponse]: + """ + Validate input/output/replay paths and overwrite behavior. + + Rules: + - All paths must be absolute. + - data_file_path and rules_file_path must exist. + - output_file_path must not exist unless overwrite=True. + - replay_log_file_path may be a new file and will be created if needed. + """ + for path_name, path_value in [ + ("data_file_path", params.data_file_path), + ("rules_file_path", params.rules_file_path), + ("output_file_path", params.output_file_path), + ("replay_log_file_path", params.replay_log_file_path), + ]: + if not os.path.isabs(path_value): + return build_error( + ErrorCode.INVALID_PATH, + f"{path_name} must be an absolute path", + details={"path": path_value, "path_type": path_name}, + ) + + if not os.path.exists(params.data_file_path): + return build_error( + ErrorCode.FILE_NOT_FOUND, + f"Data file not found: {params.data_file_path}", + details={"path": params.data_file_path, "path_type": "data_path"}, + ) + if not os.path.exists(params.rules_file_path): + return build_error( + ErrorCode.FILE_NOT_FOUND, + f"Rules file not found: {params.rules_file_path}", + details={"path": params.rules_file_path, "path_type": "rules_path"}, + ) + + if os.path.exists(params.output_file_path) and not params.overwrite: + return build_error( + ErrorCode.ALREADY_EXISTS, + f"Output path already exists: {params.output_file_path}", + details={"path": params.output_file_path, "path_type": "output_path"}, + ) + + return None + + +def _load_rules(params: HarmonizeParams) -> Tuple[Optional[RuleRegistry], Optional[RpcResponse]]: + """Load a RuleRegistry from a rules JSON file.""" + registry = RuleRegistry() + try: + registry.load(params.rules_file_path, clean=True) + except Exception as exc: + return None, build_error(ErrorCode.INVALID_FORMAT, f"Failed to load rules: {exc}") + return registry, None + + +def _resolve_harmonization_pairs( + params: HarmonizeParams, registry: RuleRegistry +) -> Tuple[Optional[List[Tuple[str, str]]], Optional[RpcResponse]]: + """ + Resolve requested (source, target) pairs based on mode and availability. + + - mode="all": return all pairs from the rules registry (error if empty). + - mode="pairs": validate that requested pairs exist in the registry. + + Returns (pairs, error). On error, pairs is None and error is a RpcResponse. + """ + if params.mode == "all": + pairs = registry.list_pairs() + if not pairs: + return None, build_error( + ErrorCode.RULE_NOT_FOUND, + "No rules found in rules file", + details={"path": params.rules_file_path}, + ) + return pairs, None + + if not params.harmonization_pairs: + return None, build_error( + ErrorCode.MISSING_FIELD, + "pairs is required when mode is 'pairs'", + details={"field": "pairs"}, + ) + + pairs = [(pair.source, pair.target) for pair in params.harmonization_pairs] + for source, target in pairs: + try: + registry.query(source, target) + except Exception: + return None, build_error( + ErrorCode.RULE_NOT_FOUND, + f"Rule not found for source={source} target={target}", + details={"source": source, "target": target}, + ) + return pairs, None + + +def _run_harmonize(job_id: JobId, params: HarmonizeParams) -> None: + """ + Worker that performs harmonization and updates job state. + + Workflow: + 1) Validate paths and overwrite behavior. + 2) Load rules from the registry JSON file. + 3) Resolve rule pairs based on the requested mode. + 4) Create output/log directories as needed. + 5) Read input CSV, apply harmonization with row-based progress callbacks. + 6) Write output CSV and finalize job state. + + On failure, sets job status to "failed" and records a structured error. + """ + update_job_status(job_id, status="running", progress=0.0) + + validation_error = _validate_paths(params) + if validation_error: + update_job_status(job_id, status="failed", error=validation_error.error.model_dump()) + return + + registry, error = _load_rules(params) + if error: + update_job_status(job_id, status="failed", error=error.error.model_dump()) + return + + pairs, error = _resolve_harmonization_pairs(params, registry) + if error: + update_job_status(job_id, status="failed", error=error.error.model_dump()) + return + + os.makedirs(os.path.dirname(params.output_file_path), exist_ok=True) + os.makedirs(os.path.dirname(params.replay_log_file_path), exist_ok=True) + + dataset = pd.read_csv(params.data_file_path) + logger = rlog.configure_logger(3, params.replay_log_file_path) + + def progress_callback(processed: int, total: int) -> None: + update_progress(job_id, processed, total) + + try: + harmonized = harmonize_dataset( + dataset=dataset, + harmonization_pairs=pairs, + rules=registry, + dataset_name=os.path.basename(params.data_file_path), + logger=logger, + progress_callback=progress_callback, + ) + harmonized.to_csv(params.output_file_path, index=False) + except Exception as exc: + update_job_status( + job_id, + status="failed", + error={ + "code": ErrorCode.HARMONIZATION_FAILED.value, + "message": str(exc), + }, + ) + return + + update_job_status( + job_id, + status="completed", + progress=1.0, + result={ + "output_path": params.output_file_path, + "replay_log_path": params.replay_log_file_path, + }, + ) + + +def handle_harmonize(request: RpcRequest) -> RpcResponse: + """Handle the harmonize RPC method.""" + try: + params = HarmonizeParams(**request.params) + except Exception as exc: + return build_error(ErrorCode.VALIDATION_ERROR, str(exc), details={"params": request.params}) + + job_id = JobId(str(uuid.uuid4())) + job = JobInfo( + job_id=job_id, + status="queued", + progress=0.0, + output_path=params.output_file_path, + replay_log_path=params.replay_log_file_path, + ) + register_job(job) + + thread = threading.Thread(target=_run_harmonize, args=(job_id, params), daemon=True) + thread.start() + return RpcResponse(status="accepted", job_id=job_id) + + +def handle_get_job(request: RpcRequest) -> RpcResponse: + """Handle the get_job RPC method.""" + job_id_value = request.params.get("job_id") + if not job_id_value: + return build_error( + ErrorCode.MISSING_FIELD, + "job_id is required", + details={"field": "job_id"}, + ) + job_id = JobId(job_id_value) + job = get_job(job_id) + if not job: + return build_error( + ErrorCode.JOB_NOT_FOUND, + f"Job not found: {job_id}", + details={"job_id": job_id}, + ) + return RpcResponse( + status="success", + result={ + "job_id": job.job_id, + "status": job.status, + "progress": job.progress, + "output_path": job.output_path, + "replay_log_path": job.replay_log_path, + "result": job.result, + "error": job.error, + }, + ) diff --git a/src/harmonization_framework/api/rpc_jobs.py b/src/harmonization_framework/api/rpc_jobs.py new file mode 100644 index 0000000..73c6207 --- /dev/null +++ b/src/harmonization_framework/api/rpc_jobs.py @@ -0,0 +1,75 @@ +from dataclasses import dataclass +from typing import Dict, Optional, NewType +import threading + + +JobId = NewType("JobId", str) + + +@dataclass +class JobInfo: + """ + In-memory job state for async operations. + + Fields: + job_id: Unique identifier for the job. + status: One of queued|running|completed|failed. + progress: Float in [0.0, 1.0] representing completion. + output_path: Target CSV path for the harmonized output. + replay_log_path: Path where the replay log is written. + error: Optional structured error payload (matches ErrorDetail schema). + result: Optional result payload (e.g., output/replay paths). + """ + job_id: JobId + status: str + progress: float + output_path: str + replay_log_path: str + error: Optional[Dict] = None + result: Optional[Dict] = None + + +# In-memory job registry guarded by a lock for thread-safe updates. +_jobs: Dict[JobId, JobInfo] = {} +_jobs_lock = threading.Lock() + + +def register_job(job: JobInfo) -> None: + with _jobs_lock: + _jobs[job.job_id] = job + + +def get_job(job_id: JobId) -> Optional[JobInfo]: + with _jobs_lock: + return _jobs.get(job_id) + + +def update_progress(job_id: JobId, processed: int, total: int) -> None: + with _jobs_lock: + job = _jobs.get(job_id) + if not job: + return + if total == 0: + job.progress = 1.0 + else: + job.progress = min(1.0, processed / total) + + +def update_job_status( + job_id: JobId, + status: str, + progress: Optional[float] = None, + error: Optional[Dict] = None, + result: Optional[Dict] = None, +) -> None: + with _jobs_lock: + job = _jobs.get(job_id) + if not job: + return + job.status = status + if progress is not None: + job.progress = progress + if error is not None: + job.error = error + if result is not None: + job.result = result diff --git a/src/harmonization_framework/api/rpc_models.py b/src/harmonization_framework/api/rpc_models.py new file mode 100644 index 0000000..548c2ac --- /dev/null +++ b/src/harmonization_framework/api/rpc_models.py @@ -0,0 +1,56 @@ +from typing import Dict, List, Literal, Optional + +from pydantic import BaseModel, ConfigDict, Field + + +class HarmonizationPair(BaseModel): + """Source/target column pair.""" + source: str = Field(..., description="Source column name") + target: str = Field(..., description="Target column name") + + +class HarmonizeParams(BaseModel): + """ + Parameters for the harmonize RPC call. + + Required: + data_file_path: absolute path to the input CSV file. + rules_file_path: absolute path to a RuleRegistry JSON file. + replay_log_file_path: absolute path for the replay log output. + output_file_path: absolute path for the harmonized CSV output. + mode: "pairs" to apply a specific subset of rules, or "all" to apply all rules. + + Optional: + harmonization_pairs: list of source/target column mappings (required when mode="pairs"). + overwrite: when True, allows output_path to be overwritten if it already exists. + """ + data_file_path: str + rules_file_path: str + replay_log_file_path: str + output_file_path: str + mode: Literal["pairs", "all"] + harmonization_pairs: Optional[List[HarmonizationPair]] = Field( + None, + alias="pairs", + description="List of (source, target) column mappings when mode='pairs'.", + ) + overwrite: bool = False + + model_config = ConfigDict(populate_by_name=True) + + +class RpcRequest(BaseModel): + """RPC request envelope with method name and parameters payload.""" + method: str + params: Dict + + +class RpcResponse(BaseModel): + """RPC response envelope.""" + status: str + result: Optional[Dict] = None + error: Optional["ErrorDetail"] = None + job_id: Optional[str] = None + + +from harmonization_framework.api.rpc_errors import ErrorDetail # noqa: E402 diff --git a/tests/test_rpc_paths.py b/tests/test_rpc_paths.py index db2273a..c9a1ace 100644 --- a/tests/test_rpc_paths.py +++ b/tests/test_rpc_paths.py @@ -1,14 +1,15 @@ import os -from harmonization_framework.api.routes.rpc import HarmonizeParams, _validate_paths +from harmonization_framework.api.rpc_models import HarmonizeParams +from harmonization_framework.api.rpc_handlers import _validate_paths def test_validate_paths_rejects_relative_paths(tmp_path): params = HarmonizeParams( - data_path="relative.csv", - rules_path=str(tmp_path / "rules.json"), - output_path=str(tmp_path / "out.csv"), - replay_log_path=str(tmp_path / "replay.log"), + data_file_path="relative.csv", + rules_file_path=str(tmp_path / "rules.json"), + output_file_path=str(tmp_path / "out.csv"), + replay_log_file_path=str(tmp_path / "replay.log"), mode="all", overwrite=False, ) @@ -16,7 +17,7 @@ def test_validate_paths_rejects_relative_paths(tmp_path): assert response is not None assert response.error.code == "INVALID_PATH" assert response.error.details["path"] == "relative.csv" - assert response.error.details["path_type"] == "data_path" + assert response.error.details["path_type"] == "data_file_path" def test_validate_paths_missing_inputs(tmp_path): @@ -24,10 +25,10 @@ def test_validate_paths_missing_inputs(tmp_path): rules_path = tmp_path / "rules.json" params = HarmonizeParams( - data_path=str(data_path), - rules_path=str(rules_path), - output_path=str(tmp_path / "out.csv"), - replay_log_path=str(tmp_path / "replay.log"), + data_file_path=str(data_path), + rules_file_path=str(rules_path), + output_file_path=str(tmp_path / "out.csv"), + replay_log_file_path=str(tmp_path / "replay.log"), mode="all", overwrite=False, ) @@ -47,10 +48,10 @@ def test_validate_paths_rejects_existing_output_without_overwrite(tmp_path): output_path.write_text("already") params = HarmonizeParams( - data_path=str(data_path), - rules_path=str(rules_path), - output_path=str(output_path), - replay_log_path=str(tmp_path / "replay.log"), + data_file_path=str(data_path), + rules_file_path=str(rules_path), + output_file_path=str(output_path), + replay_log_file_path=str(tmp_path / "replay.log"), mode="all", overwrite=False, ) @@ -68,10 +69,10 @@ def test_validate_paths_accepts_valid_paths(tmp_path): rules_path.write_text("{}") params = HarmonizeParams( - data_path=str(data_path), - rules_path=str(rules_path), - output_path=str(tmp_path / "out.csv"), - replay_log_path=str(tmp_path / "replay.log"), + data_file_path=str(data_path), + rules_file_path=str(rules_path), + output_file_path=str(tmp_path / "out.csv"), + replay_log_file_path=str(tmp_path / "replay.log"), mode="all", overwrite=False, ) From cca8766a440d7bdf3f02242c2a6a1c4ed195841f Mon Sep 17 00:00:00 2001 From: Matthew Horridge Date: Fri, 23 Jan 2026 20:30:42 -0800 Subject: [PATCH 4/4] Add detailed RPC API README - Document request/response schema and error codes - Detail harmonize/get_job parameters and call flow - Add example responses (accepted, running, completed) --- src/harmonization_framework/api/README.md | 294 ++++++++++++++++++++++ 1 file changed, 294 insertions(+) create mode 100644 src/harmonization_framework/api/README.md diff --git a/src/harmonization_framework/api/README.md b/src/harmonization_framework/api/README.md new file mode 100644 index 0000000..7b6e405 --- /dev/null +++ b/src/harmonization_framework/api/README.md @@ -0,0 +1,294 @@ +# Harmonization Framework RPC API + +This document describes the RPC-style API used by the Electron app. It explains +the request/response schema, method semantics, and expected call order. + +## Base URL + +``` +http://localhost:8000 +``` + +## Endpoints + +### `GET /health/` + +Health check endpoint. + +**Response** +```json +{ + "status": "ok", + "message": "API is available" +} +``` + +--- + +### `POST /api` + +Single RPC endpoint. Each request specifies a `method` and a `params` object. + +**Request envelope** +```json +{ + "method": "method_name", + "params": { } +} +``` + +**Response envelope (success)** +```json +{ + "status": "success", + "result": { } +} +``` + +**Response envelope (accepted / async)** +```json +{ + "status": "accepted", + "job_id": "job-uuid" +} +``` + +**Response envelope (error)** +```json +{ + "status": "error", + "error": { + "code": "ERROR_CODE", + "message": "Human readable error message", + "details": { } + } +} +``` + +Method names use **snake_case**. The server also accepts camelCase aliases +(e.g., `getJob`) because this is often more convenient for JavaScript clients. + +## Error Codes + +Error codes are intentionally small and stable. Use `error.details` to identify +the specific field/path/context. + +| Code | Meaning | Common details | +|------|---------|----------------| +| `INVALID_PATH` | A required path is not absolute or malformed | `{ "path_type": "...", "path": "..." }` | +| `FILE_NOT_FOUND` | Data or rules file does not exist | `{ "path_type": "...", "path": "..." }` | +| `ALREADY_EXISTS` | Output path exists and overwrite is false | `{ "path_type": "output_file_path", "path": "..." }` | +| `MISSING_FIELD` | Required request field missing | `{ "field": "..." }` | +| `VALIDATION_ERROR` | Request params failed validation | `{ "params": { ... } }` | +| `RULE_NOT_FOUND` | Requested rule pair not found or rules file empty | `{ "source": "...", "target": "..." }` or `{ "path": "..." }` | +| `JOB_NOT_FOUND` | Job ID not found | `{ "job_id": "..." }` | +| `INVALID_FORMAT` | Rules file is invalid JSON/format | `{}` | +| `HARMONIZATION_FAILED` | Harmonization failed during execution | `{}` | +| `METHOD_NOT_FOUND` | Unknown RPC method | `{}` | + +## Methods + +### 1) `harmonize` + +Asynchronously harmonize a CSV dataset using rules from a RuleRegistry JSON file. + +**Request** +```json +{ + "method": "harmonize", + "params": { + "data_file_path": "/abs/input.csv", + "rules_file_path": "/abs/rules.json", + "replay_log_file_path": "/abs/replay.log", + "output_file_path": "/abs/output.csv", + "mode": "pairs", + "pairs": [ + { "source": "col_a", "target": "col_b" } + ], + "overwrite": false + } +} +``` + +**Parameters** +- `data_file_path` (string, required): Absolute path to input CSV. +- `rules_file_path` (string, required): Absolute path to RuleRegistry JSON file + produced by `RuleRegistry.save()`. +- `replay_log_file_path` (string, required): Absolute path for replay log output. +- `output_file_path` (string, required): Absolute path for harmonized CSV output. +- `mode` (string, required): `"pairs"` or `"all"`. + - `"pairs"`: apply only specified pairs. + - `"all"`: apply every rule in the registry file. +- `pairs` (array, required when `mode="pairs"`): List of `{source, target}` mappings. +- `overwrite` (boolean, optional, default `false`): Whether to overwrite existing output. + +**Response** +```json +{ + "status": "accepted", + "job_id": "job-uuid" +} +``` + +**Possible errors** + +- `INVALID_PATH` — one or more provided paths are not absolute. + - `details.path_type`: `data_file_path` | `rules_file_path` | `output_file_path` | `replay_log_file_path` + - `details.path`: the invalid path value +- `FILE_NOT_FOUND` — input file or rules file does not exist. + - `details.path_type`: `data_file_path` or `rules_file_path` + - `details.path`: missing file path +- `ALREADY_EXISTS` — output file exists and `overwrite=false`. + - `details.path_type`: `output_file_path` + - `details.path`: existing output path +- `MISSING_FIELD` — required field missing (e.g., `pairs` when `mode="pairs"`). + - `details.field`: missing field name +- `RULE_NOT_FOUND` — requested pairs not found or no rules in file. + - `details.source` / `details.target` for missing pairs, or `details.path` for empty rules file +- `INVALID_FORMAT` — rules file could not be parsed. + - `details` may be empty +- `VALIDATION_ERROR` — request params failed validation. + - `details.params`: original params object + +**Behavior** +- Paths must be absolute. +- `data_file_path` and `rules_file_path` must exist. +- `output_file_path` must not exist unless `overwrite=true`. +- `replay_log_file_path` will be created if needed. +- Work runs asynchronously; track with `get_job`. + +--- + +### 2) `get_job` + +Retrieve status and progress for an async job. + +**Request** +```json +{ + "method": "get_job", + "params": { "job_id": "job-uuid" } +} +``` + +**Response** +```json +{ + "status": "success", + "result": { + "job_id": "job-uuid", + "status": "queued|running|completed|failed", + "progress": 0.42, + "output_path": "/abs/output.csv", + "replay_log_path": "/abs/replay.log", + "result": { + "output_path": "/abs/output.csv", + "replay_log_path": "/abs/replay.log" + }, + "error": { + "code": "HARMONIZATION_FAILED", + "message": "...", + "details": {} + } + } +} +``` + +**Possible errors** + +- `MISSING_FIELD` — `job_id` is missing. + - `details.field`: `"job_id"` +- `JOB_NOT_FOUND` — job id not found. + - `details.job_id`: requested job id + +**Progress** +- Progress is **row-based** and reported as a float in `[0.0, 1.0]`. +- Computed as `(processed cells) / (rows * number_of_pairs)`. + +## Call Order + +1. Submit a `harmonize` request. +2. Poll `get_job` using the returned `job_id`. +3. When `status == "completed"`, read `output_path` and `replay_log_path`. + +## Example Call Flow + +The typical client flow is: + +1. Submit a `harmonize` request with absolute paths and the desired rule mode. +2. Receive a `job_id` immediately (the work runs asynchronously). +3. Poll `get_job` until the status is `completed` or `failed`. +4. On completion, read the output CSV and replay log from the returned paths. + +```bash +curl -X POST http://localhost:8000/api \\ + -H "Content-Type: application/json" \\ + -d '{ + "method": "harmonize", + "params": { + "data_file_path": "/abs/input.csv", + "rules_file_path": "/abs/rules.json", + "replay_log_file_path": "/abs/replay.log", + "output_file_path": "/abs/output.csv", + "mode": "all", + "overwrite": true + } + }' +``` + +The server responds with a `job_id` you can use to query progress: + +```json +{ + "status": "accepted", + "job_id": "0c4f5c44-9c2a-4d11-9a8d-1b3e71df4b4a" +} +``` + +```bash +curl -X POST http://localhost:8000/api \\ + -H "Content-Type: application/json" \\ + -d '{ + "method": "get_job", + "params": { "job_id": "job-uuid" } + }' +``` + +Example response while the job is still running: + +```json +{ + "status": "success", + "result": { + "job_id": "0c4f5c44-9c2a-4d11-9a8d-1b3e71df4b4a", + "status": "running", + "progress": 0.37, + "output_path": "/abs/output.csv", + "replay_log_path": "/abs/replay.log", + "result": null, + "error": null + } +} +``` + +When `status` becomes `completed`, the `result` object includes the final +`output_path` and `replay_log_path`. If `status` is `failed`, inspect the +`error` object for a stable error code and contextual details. + +```json +{ + "status": "success", + "result": { + "job_id": "0c4f5c44-9c2a-4d11-9a8d-1b3e71df4b4a", + "status": "completed", + "progress": 1.0, + "output_path": "/abs/output.csv", + "replay_log_path": "/abs/replay.log", + "result": { + "output_path": "/abs/output.csv", + "replay_log_path": "/abs/replay.log" + }, + "error": null + } +} +```