diff --git a/src/harmonization_framework/api/README.md b/src/harmonization_framework/api/README.md new file mode 100644 index 0000000..7b6e405 --- /dev/null +++ b/src/harmonization_framework/api/README.md @@ -0,0 +1,294 @@ +# Harmonization Framework RPC API + +This document describes the RPC-style API used by the Electron app. It explains +the request/response schema, method semantics, and expected call order. + +## Base URL + +``` +http://localhost:8000 +``` + +## Endpoints + +### `GET /health/` + +Health check endpoint. + +**Response** +```json +{ + "status": "ok", + "message": "API is available" +} +``` + +--- + +### `POST /api` + +Single RPC endpoint. Each request specifies a `method` and a `params` object. + +**Request envelope** +```json +{ + "method": "method_name", + "params": { } +} +``` + +**Response envelope (success)** +```json +{ + "status": "success", + "result": { } +} +``` + +**Response envelope (accepted / async)** +```json +{ + "status": "accepted", + "job_id": "job-uuid" +} +``` + +**Response envelope (error)** +```json +{ + "status": "error", + "error": { + "code": "ERROR_CODE", + "message": "Human readable error message", + "details": { } + } +} +``` + +Method names use **snake_case**. The server also accepts camelCase aliases +(e.g., `getJob`) because this is often more convenient for JavaScript clients. + +## Error Codes + +Error codes are intentionally small and stable. Use `error.details` to identify +the specific field/path/context. + +| Code | Meaning | Common details | +|------|---------|----------------| +| `INVALID_PATH` | A required path is not absolute or malformed | `{ "path_type": "...", "path": "..." }` | +| `FILE_NOT_FOUND` | Data or rules file does not exist | `{ "path_type": "...", "path": "..." }` | +| `ALREADY_EXISTS` | Output path exists and overwrite is false | `{ "path_type": "output_file_path", "path": "..." }` | +| `MISSING_FIELD` | Required request field missing | `{ "field": "..." 
}` | +| `VALIDATION_ERROR` | Request params failed validation | `{ "params": { ... } }` | +| `RULE_NOT_FOUND` | Requested rule pair not found or rules file empty | `{ "source": "...", "target": "..." }` or `{ "path": "..." }` | +| `JOB_NOT_FOUND` | Job ID not found | `{ "job_id": "..." }` | +| `INVALID_FORMAT` | Rules file is invalid JSON/format | `{}` | +| `HARMONIZATION_FAILED` | Harmonization failed during execution | `{}` | +| `METHOD_NOT_FOUND` | Unknown RPC method | `{}` | + +## Methods + +### 1) `harmonize` + +Asynchronously harmonize a CSV dataset using rules from a RuleRegistry JSON file. + +**Request** +```json +{ + "method": "harmonize", + "params": { + "data_file_path": "/abs/input.csv", + "rules_file_path": "/abs/rules.json", + "replay_log_file_path": "/abs/replay.log", + "output_file_path": "/abs/output.csv", + "mode": "pairs", + "pairs": [ + { "source": "col_a", "target": "col_b" } + ], + "overwrite": false + } +} +``` + +**Parameters** +- `data_file_path` (string, required): Absolute path to input CSV. +- `rules_file_path` (string, required): Absolute path to RuleRegistry JSON file + produced by `RuleRegistry.save()`. +- `replay_log_file_path` (string, required): Absolute path for replay log output. +- `output_file_path` (string, required): Absolute path for harmonized CSV output. +- `mode` (string, required): `"pairs"` or `"all"`. + - `"pairs"`: apply only specified pairs. + - `"all"`: apply every rule in the registry file. +- `pairs` (array, required when `mode="pairs"`): List of `{source, target}` mappings. +- `overwrite` (boolean, optional, default `false`): Whether to overwrite existing output. + +**Response** +```json +{ + "status": "accepted", + "job_id": "job-uuid" +} +``` + +**Possible errors** + +- `INVALID_PATH` — one or more provided paths are not absolute. 
+ - `details.path_type`: `data_file_path` | `rules_file_path` | `output_file_path` | `replay_log_file_path` + - `details.path`: the invalid path value +- `FILE_NOT_FOUND` — input file or rules file does not exist. + - `details.path_type`: `data_file_path` or `rules_file_path` + - `details.path`: missing file path +- `ALREADY_EXISTS` — output file exists and `overwrite=false`. + - `details.path_type`: `output_file_path` + - `details.path`: existing output path +- `MISSING_FIELD` — required field missing (e.g., `pairs` when `mode="pairs"`). + - `details.field`: missing field name +- `RULE_NOT_FOUND` — requested pairs not found or no rules in file. + - `details.source` / `details.target` for missing pairs, or `details.path` for empty rules file +- `INVALID_FORMAT` — rules file could not be parsed. + - `details` may be empty +- `VALIDATION_ERROR` — request params failed validation. + - `details.params`: original params object + +**Behavior** +- Paths must be absolute. +- `data_file_path` and `rules_file_path` must exist. +- `output_file_path` must not exist unless `overwrite=true`. +- `replay_log_file_path` will be created if needed. +- Work runs asynchronously; track with `get_job`. Only `VALIDATION_ERROR` is returned in the `harmonize` response itself; the other errors above are detected by the background worker and reported in the `error` field of `get_job` with job status `failed`. + +--- + +### 2) `get_job` + +Retrieve status and progress for an async job. + +**Request** +```json +{ + "method": "get_job", + "params": { "job_id": "job-uuid" } +} +``` + +**Response** +```json +{ + "status": "success", + "result": { + "job_id": "job-uuid", + "status": "queued|running|completed|failed", + "progress": 0.42, + "output_path": "/abs/output.csv", + "replay_log_path": "/abs/replay.log", + "result": { + "output_path": "/abs/output.csv", + "replay_log_path": "/abs/replay.log" + }, + "error": { + "code": "HARMONIZATION_FAILED", + "message": "...", + "details": {} + } + } +} +``` + +**Possible errors** + +- `MISSING_FIELD` — `job_id` is missing. + - `details.field`: `"job_id"` +- `JOB_NOT_FOUND` — job id not found. 
+ - `details.job_id`: requested job id + +**Progress** +- Progress is **row-based** and reported as a float in `[0.0, 1.0]`. +- Computed as `processed_steps / (rows * number_of_pairs)`, where one step is one value transformed for one pair. + +## Call Order + +1. Submit a `harmonize` request. +2. Poll `get_job` using the returned `job_id`. +3. When `status == "completed"`, read `output_path` and `replay_log_path`. + +## Example Call Flow + +The typical client flow is: + +1. Submit a `harmonize` request with absolute paths and the desired rule mode. +2. Receive a `job_id` immediately (the work runs asynchronously). +3. Poll `get_job` until the status is `completed` or `failed`. +4. On completion, read the output CSV and replay log from the returned paths. + +```bash +curl -X POST http://localhost:8000/api \ + -H "Content-Type: application/json" \ + -d '{ + "method": "harmonize", + "params": { + "data_file_path": "/abs/input.csv", + "rules_file_path": "/abs/rules.json", + "replay_log_file_path": "/abs/replay.log", + "output_file_path": "/abs/output.csv", + "mode": "all", + "overwrite": true + } + }' +``` + +The server responds with a `job_id` you can use to query progress: + +```json +{ + "status": "accepted", + "job_id": "0c4f5c44-9c2a-4d11-9a8d-1b3e71df4b4a" +} +``` + +```bash +curl -X POST http://localhost:8000/api \ + -H "Content-Type: application/json" \ + -d '{ + "method": "get_job", + "params": { "job_id": "0c4f5c44-9c2a-4d11-9a8d-1b3e71df4b4a" } + }' +``` + +Example response while the job is still running: + +```json +{ + "status": "success", + "result": { + "job_id": "0c4f5c44-9c2a-4d11-9a8d-1b3e71df4b4a", + "status": "running", + "progress": 0.37, + "output_path": "/abs/output.csv", + "replay_log_path": "/abs/replay.log", + "result": null, + "error": null + } +} +``` + +When `status` becomes `completed`, the `result` object includes the final +`output_path` and `replay_log_path`. If `status` is `failed`, inspect the +`error` object for a stable error code and contextual details. 
+ +```json +{ + "status": "success", + "result": { + "job_id": "0c4f5c44-9c2a-4d11-9a8d-1b3e71df4b4a", + "status": "completed", + "progress": 1.0, + "output_path": "/abs/output.csv", + "replay_log_path": "/abs/replay.log", + "result": { + "output_path": "/abs/output.csv", + "replay_log_path": "/abs/replay.log" + }, + "error": null + } +} +``` diff --git a/src/harmonization_framework/api/app.py b/src/harmonization_framework/api/app.py index d5548f4..ba0f05f 100644 --- a/src/harmonization_framework/api/app.py +++ b/src/harmonization_framework/api/app.py @@ -1,7 +1,9 @@ from fastapi import FastAPI from harmonization_framework.api.routes.health import router as health_router +from harmonization_framework.api.routes.rpc import router as rpc_router app = FastAPI(title="Harmonization Framework API") app.include_router(health_router, prefix="/health") +app.include_router(rpc_router, prefix="/api") diff --git a/src/harmonization_framework/api/harmonization_api_examples.md b/src/harmonization_framework/api/harmonization_api_examples.md index bc5de40..40c70a1 100644 --- a/src/harmonization_framework/api/harmonization_api_examples.md +++ b/src/harmonization_framework/api/harmonization_api_examples.md @@ -12,4 +12,57 @@ The following is a demo for using `curl` to interact with the Harmonization Fram ## Example Usage -This section will be updated when the Electron app exposes the relevant workflows. 
+### Harmonize (RPC) + +```bash +curl -X POST http://localhost:8000/api \ + -H "Content-Type: application/json" \ + -d '{ + "method": "harmonize", + "params": { + "data_file_path": "/absolute/path/to/input.csv", + "rules_file_path": "/absolute/path/to/rules.json", + "replay_log_file_path": "/absolute/path/to/replay.log", + "output_file_path": "/absolute/path/to/output.csv", + "mode": "pairs", + "pairs": [ + {"source": "col_a", "target": "col_b"} + ], + "overwrite": false + } + }' +``` + +### Get Job Status (RPC) + +```bash +curl -X POST http://localhost:8000/api \ + -H "Content-Type: application/json" \ + -d '{ + "method": "get_job", + "params": { + "job_id": "job-uuid" + } + }' +``` + +## Error Responses + +Errors follow a stable schema: + +```json +{ + "status": "error", + "error": { + "code": "FILE_NOT_FOUND", + "message": "Rules file not found: /abs/path/to/rules.json", + "details": { + "path_type": "rules_file_path", + "path": "/abs/path/to/rules.json" + } + } +} +``` + +The `code` is a small, stable set of values. Use `details` to determine +which field/path caused the error. 
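## Polling From Python (Sketch)

The `curl` calls above can also be driven from a script. The following is a minimal sketch using only the standard library, not part of the framework itself: `build_request`, `post_rpc`, and `wait_for_job` are hypothetical helper names, and the polling interval and poll limit are assumed values. It assumes the server is reachable at the URL used in the `curl` examples and that responses follow the envelopes shown in this document.

```python
import json
import time
import urllib.request
from typing import Any, Callable, Dict

# Assumed to match the curl examples; adjust for your deployment.
API_URL = "http://localhost:8000/api"


def build_request(method: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap a method name and params in the RPC request envelope."""
    return {"method": method, "params": params}


def post_rpc(envelope: Dict[str, Any], url: str = API_URL) -> Dict[str, Any]:
    """POST one envelope as JSON and decode the JSON response body."""
    request = urllib.request.Request(
        url,
        data=json.dumps(envelope).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def wait_for_job(
    job_id: str,
    call: Callable[[Dict[str, Any]], Dict[str, Any]] = post_rpc,
    interval: float = 0.5,   # hypothetical default, in seconds
    max_polls: int = 120,    # hypothetical default
) -> Dict[str, Any]:
    """Poll get_job until the job is completed or failed, then return it."""
    for _ in range(max_polls):
        reply = call(build_request("get_job", {"job_id": job_id}))
        if reply["status"] == "error":
            # e.g. JOB_NOT_FOUND or MISSING_FIELD from the error envelope
            raise RuntimeError(reply["error"]["code"])
        job = reply["result"]
        if job["status"] in ("completed", "failed"):
            return job
        time.sleep(interval)
    raise TimeoutError(f"job {job_id} did not finish after {max_polls} polls")
```

A caller would first send `post_rpc(build_request("harmonize", params))`, read `job_id` from the accepted response, and pass it to `wait_for_job`. The `call` parameter exists so the loop can be exercised with a stub instead of a live server.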
diff --git a/src/harmonization_framework/api/routes/__init__.py b/src/harmonization_framework/api/routes/__init__.py index 58055d8..7347021 100644 --- a/src/harmonization_framework/api/routes/__init__.py +++ b/src/harmonization_framework/api/routes/__init__.py @@ -1,7 +1,9 @@ # src/api/routes/__init__.py from .health import router as health_router +from .rpc import router as rpc_router __all__ = [ "health_router", + "rpc_router", ] diff --git a/src/harmonization_framework/api/routes/rpc.py b/src/harmonization_framework/api/routes/rpc.py new file mode 100644 index 0000000..720f4a3 --- /dev/null +++ b/src/harmonization_framework/api/routes/rpc.py @@ -0,0 +1,77 @@ +from fastapi import APIRouter + +from harmonization_framework.api.rpc_errors import ErrorCode, build_error +from harmonization_framework.api.rpc_handlers import handle_get_job, handle_harmonize +from harmonization_framework.api.rpc_models import RpcRequest, RpcResponse + +router = APIRouter() + +""" +RPC API router. + +Implements a single POST /api endpoint with method dispatch. Currently supported: +- harmonize: async CSV harmonization with row-based progress tracking +- get_job: retrieve status/progress/result for a job + +Method names use snake_case. The router also accepts camelCase aliases +(e.g., getJob) for convenience. + +Error handling: +- Error codes are intentionally small and stable (e.g., MISSING_FIELD, INVALID_PATH, + FILE_NOT_FOUND, INVALID_FORMAT, VALIDATION_ERROR, RULE_NOT_FOUND, JOB_NOT_FOUND). +- Callers should use the `details` object to identify which field/path caused the error. + Example: {"field": "rules_file_path"} or {"path_type": "rules_file_path", "path": "/abs/..."}. +""" + + +def _normalize_method(method: str) -> str: + """ + Normalize method names to snake_case and support legacy/camelCase aliases. 
+ """ + aliases = { + "getJob": "get_job", + "harmonize": "harmonize", + "get_job": "get_job", + } + return aliases.get(method, method) + + +@router.post("") +def rpc_call(request: RpcRequest) -> RpcResponse: + """ + Dispatch RPC methods and return a standardized response envelope. + + Supported methods (snake_case; camelCase aliases accepted): + - harmonize: + params: + data_file_path: absolute path to input CSV + rules_file_path: absolute path to RuleRegistry JSON + replay_log_file_path: absolute path to write replay log + output_file_path: absolute path to write harmonized CSV + mode: "pairs" | "all" + pairs: optional list of {source, target} when mode="pairs" + overwrite: boolean (default false) + + response: + status: "accepted" + job_id: string + + - get_job: + params: + job_id: string + + response: + status: "success" + result: { + job_id, status, progress, output_path, replay_log_path, result, error + } + """ + method = _normalize_method(request.method) + + if method == "harmonize": + return handle_harmonize(request) + + if method == "get_job": + return handle_get_job(request) + + return build_error(ErrorCode.METHOD_NOT_FOUND, f"Unknown method: {request.method}") diff --git a/src/harmonization_framework/api/rpc_errors.py b/src/harmonization_framework/api/rpc_errors.py new file mode 100644 index 0000000..41eb4c9 --- /dev/null +++ b/src/harmonization_framework/api/rpc_errors.py @@ -0,0 +1,50 @@ +from enum import Enum +from typing import Dict, Optional + +from pydantic import BaseModel + + +class ErrorDetail(BaseModel): + """Structured error payload for RPC responses.""" + code: str + message: str + details: Optional[Dict] = None + + +class ErrorCode(str, Enum): + """ + Stable error codes used across RPC responses. + + These codes are intentionally small in number; use `error.details` to identify + the field/path/context that caused the failure. + """ + # A required path is not absolute or is malformed. 
+ INVALID_PATH = "INVALID_PATH" + # An expected file does not exist (data_path or rules_path). + FILE_NOT_FOUND = "FILE_NOT_FOUND" + # Output path exists and overwrite was not allowed. + ALREADY_EXISTS = "ALREADY_EXISTS" + # A required field is missing from the request params. + MISSING_FIELD = "MISSING_FIELD" + # Request params failed model validation. + VALIDATION_ERROR = "VALIDATION_ERROR" + # Requested rule pairs cannot be found in the registry. + RULE_NOT_FOUND = "RULE_NOT_FOUND" + # Job id does not correspond to a known job. + JOB_NOT_FOUND = "JOB_NOT_FOUND" + # Rules file could not be parsed or is invalid. + INVALID_FORMAT = "INVALID_FORMAT" + # Harmonization failed during execution. + HARMONIZATION_FAILED = "HARMONIZATION_FAILED" + # Method name is unknown. + METHOD_NOT_FOUND = "METHOD_NOT_FOUND" + + +def build_error(code: ErrorCode, message: str, details: Optional[Dict] = None) -> "RpcResponse": + """Helper to build a standardized error response.""" + from harmonization_framework.api.rpc_models import RpcResponse + + return RpcResponse( + status="error", + error=ErrorDetail(code=code.value, message=message, details=details), + ) diff --git a/src/harmonization_framework/api/rpc_handlers.py b/src/harmonization_framework/api/rpc_handlers.py new file mode 100644 index 0000000..f705c3a --- /dev/null +++ b/src/harmonization_framework/api/rpc_handlers.py @@ -0,0 +1,242 @@ +import os +import threading +import uuid +from typing import Dict, List, Optional, Tuple + +import pandas as pd + +from harmonization_framework.harmonize import harmonize_dataset +from harmonization_framework.replay_log import replay_logger as rlog +from harmonization_framework.rule_registry import RuleRegistry +from harmonization_framework.api.rpc_errors import ErrorCode, build_error +from harmonization_framework.api.rpc_jobs import ( + JobId, + JobInfo, + get_job, + register_job, + update_job_status, + update_progress, +) +from harmonization_framework.api.rpc_models import HarmonizeParams, 
RpcRequest, RpcResponse + + +def _validate_paths(params: HarmonizeParams) -> Optional[RpcResponse]: + """ + Validate input/output/replay paths and overwrite behavior. + + Rules: + - All paths must be absolute. + - data_file_path and rules_file_path must exist. + - output_file_path must not exist unless overwrite=True. + - replay_log_file_path may be a new file and will be created if needed. + """ + for path_name, path_value in [ + ("data_file_path", params.data_file_path), + ("rules_file_path", params.rules_file_path), + ("output_file_path", params.output_file_path), + ("replay_log_file_path", params.replay_log_file_path), + ]: + if not os.path.isabs(path_value): + return build_error( + ErrorCode.INVALID_PATH, + f"{path_name} must be an absolute path", + details={"path": path_value, "path_type": path_name}, + ) + + if not os.path.exists(params.data_file_path): + return build_error( + ErrorCode.FILE_NOT_FOUND, + f"Data file not found: {params.data_file_path}", + details={"path": params.data_file_path, "path_type": "data_path"}, + ) + if not os.path.exists(params.rules_file_path): + return build_error( + ErrorCode.FILE_NOT_FOUND, + f"Rules file not found: {params.rules_file_path}", + details={"path": params.rules_file_path, "path_type": "rules_path"}, + ) + + if os.path.exists(params.output_file_path) and not params.overwrite: + return build_error( + ErrorCode.ALREADY_EXISTS, + f"Output path already exists: {params.output_file_path}", + details={"path": params.output_file_path, "path_type": "output_path"}, + ) + + return None + + +def _load_rules(params: HarmonizeParams) -> Tuple[Optional[RuleRegistry], Optional[RpcResponse]]: + """Load a RuleRegistry from a rules JSON file.""" + registry = RuleRegistry() + try: + registry.load(params.rules_file_path, clean=True) + except Exception as exc: + return None, build_error(ErrorCode.INVALID_FORMAT, f"Failed to load rules: {exc}") + return registry, None + + +def _resolve_harmonization_pairs( + params: HarmonizeParams, 
registry: RuleRegistry +) -> Tuple[Optional[List[Tuple[str, str]]], Optional[RpcResponse]]: + """ + Resolve requested (source, target) pairs based on mode and availability. + + - mode="all": return all pairs from the rules registry (error if empty). + - mode="pairs": validate that requested pairs exist in the registry. + + Returns (pairs, error). On error, pairs is None and error is a RpcResponse. + """ + if params.mode == "all": + pairs = registry.list_pairs() + if not pairs: + return None, build_error( + ErrorCode.RULE_NOT_FOUND, + "No rules found in rules file", + details={"path": params.rules_file_path}, + ) + return pairs, None + + if not params.harmonization_pairs: + return None, build_error( + ErrorCode.MISSING_FIELD, + "pairs is required when mode is 'pairs'", + details={"field": "pairs"}, + ) + + pairs = [(pair.source, pair.target) for pair in params.harmonization_pairs] + for source, target in pairs: + try: + registry.query(source, target) + except Exception: + return None, build_error( + ErrorCode.RULE_NOT_FOUND, + f"Rule not found for source={source} target={target}", + details={"source": source, "target": target}, + ) + return pairs, None + + +def _run_harmonize(job_id: JobId, params: HarmonizeParams) -> None: + """ + Worker that performs harmonization and updates job state. + + Workflow: + 1) Validate paths and overwrite behavior. + 2) Load rules from the registry JSON file. + 3) Resolve rule pairs based on the requested mode. + 4) Create output/log directories as needed. + 5) Read input CSV, apply harmonization with row-based progress callbacks. + 6) Write output CSV and finalize job state. + + On failure, sets job status to "failed" and records a structured error. 
+ """ + update_job_status(job_id, status="running", progress=0.0) + + validation_error = _validate_paths(params) + if validation_error: + update_job_status(job_id, status="failed", error=validation_error.error.model_dump()) + return + + registry, error = _load_rules(params) + if error: + update_job_status(job_id, status="failed", error=error.error.model_dump()) + return + + pairs, error = _resolve_harmonization_pairs(params, registry) + if error: + update_job_status(job_id, status="failed", error=error.error.model_dump()) + return + + def progress_callback(processed: int, total: int) -> None: + update_progress(job_id, processed, total) + + try: + os.makedirs(os.path.dirname(params.output_file_path), exist_ok=True) + os.makedirs(os.path.dirname(params.replay_log_file_path), exist_ok=True) + + dataset = pd.read_csv(params.data_file_path) + logger = rlog.configure_logger(3, params.replay_log_file_path) + + harmonized = harmonize_dataset( + dataset=dataset, + harmonization_pairs=pairs, + rules=registry, + dataset_name=os.path.basename(params.data_file_path), + logger=logger, + progress_callback=progress_callback, + ) + harmonized.to_csv(params.output_file_path, index=False) + except Exception as exc: + update_job_status( + job_id, + status="failed", + error={ + "code": ErrorCode.HARMONIZATION_FAILED.value, + "message": str(exc), + }, + ) + return + + update_job_status( + job_id, + status="completed", + progress=1.0, + result={ + "output_path": params.output_file_path, + "replay_log_path": params.replay_log_file_path, + }, + ) + + +def handle_harmonize(request: RpcRequest) -> RpcResponse: + """Handle the harmonize RPC method.""" + try: + params = HarmonizeParams(**request.params) + except Exception as exc: + return build_error(ErrorCode.VALIDATION_ERROR, str(exc), details={"params": request.params}) + + job_id = JobId(str(uuid.uuid4())) + job = JobInfo( + job_id=job_id, + status="queued", + progress=0.0, + output_path=params.output_file_path, + 
replay_log_path=params.replay_log_file_path, + ) + register_job(job) + + thread = threading.Thread(target=_run_harmonize, args=(job_id, params), daemon=True) + thread.start() + return RpcResponse(status="accepted", job_id=job_id) + + +def handle_get_job(request: RpcRequest) -> RpcResponse: + """Handle the get_job RPC method.""" + job_id_value = request.params.get("job_id") + if not job_id_value: + return build_error( + ErrorCode.MISSING_FIELD, + "job_id is required", + details={"field": "job_id"}, + ) + job_id = JobId(job_id_value) + job = get_job(job_id) + if not job: + return build_error( + ErrorCode.JOB_NOT_FOUND, + f"Job not found: {job_id}", + details={"job_id": job_id}, + ) + return RpcResponse( + status="success", + result={ + "job_id": job.job_id, + "status": job.status, + "progress": job.progress, + "output_path": job.output_path, + "replay_log_path": job.replay_log_path, + "result": job.result, + "error": job.error, + }, + ) diff --git a/src/harmonization_framework/api/rpc_jobs.py b/src/harmonization_framework/api/rpc_jobs.py new file mode 100644 index 0000000..73c6207 --- /dev/null +++ b/src/harmonization_framework/api/rpc_jobs.py @@ -0,0 +1,75 @@ +from dataclasses import dataclass +from typing import Dict, Optional, NewType +import threading + + +JobId = NewType("JobId", str) + + +@dataclass +class JobInfo: + """ + In-memory job state for async operations. + + Fields: + job_id: Unique identifier for the job. + status: One of queued|running|completed|failed. + progress: Float in [0.0, 1.0] representing completion. + output_path: Target CSV path for the harmonized output. + replay_log_path: Path where the replay log is written. + error: Optional structured error payload (matches ErrorDetail schema). + result: Optional result payload (e.g., output/replay paths). 
+ """ + job_id: JobId + status: str + progress: float + output_path: str + replay_log_path: str + error: Optional[Dict] = None + result: Optional[Dict] = None + + +# In-memory job registry guarded by a lock for thread-safe updates. +_jobs: Dict[JobId, JobInfo] = {} +_jobs_lock = threading.Lock() + + +def register_job(job: JobInfo) -> None: + with _jobs_lock: + _jobs[job.job_id] = job + + +def get_job(job_id: JobId) -> Optional[JobInfo]: + with _jobs_lock: + return _jobs.get(job_id) + + +def update_progress(job_id: JobId, processed: int, total: int) -> None: + with _jobs_lock: + job = _jobs.get(job_id) + if not job: + return + if total == 0: + job.progress = 1.0 + else: + job.progress = min(1.0, processed / total) + + +def update_job_status( + job_id: JobId, + status: str, + progress: Optional[float] = None, + error: Optional[Dict] = None, + result: Optional[Dict] = None, +) -> None: + with _jobs_lock: + job = _jobs.get(job_id) + if not job: + return + job.status = status + if progress is not None: + job.progress = progress + if error is not None: + job.error = error + if result is not None: + job.result = result diff --git a/src/harmonization_framework/api/rpc_models.py b/src/harmonization_framework/api/rpc_models.py new file mode 100644 index 0000000..548c2ac --- /dev/null +++ b/src/harmonization_framework/api/rpc_models.py @@ -0,0 +1,56 @@ +from typing import Dict, List, Literal, Optional + +from pydantic import BaseModel, ConfigDict, Field + + +class HarmonizationPair(BaseModel): + """Source/target column pair.""" + source: str = Field(..., description="Source column name") + target: str = Field(..., description="Target column name") + + +class HarmonizeParams(BaseModel): + """ + Parameters for the harmonize RPC call. + + Required: + data_file_path: absolute path to the input CSV file. + rules_file_path: absolute path to a RuleRegistry JSON file. + replay_log_file_path: absolute path for the replay log output. 
+ output_file_path: absolute path for the harmonized CSV output. + mode: "pairs" to apply a specific subset of rules, or "all" to apply all rules. + + Optional: + harmonization_pairs: list of source/target column mappings (required when mode="pairs"). + overwrite: when True, allows output_path to be overwritten if it already exists. + """ + data_file_path: str + rules_file_path: str + replay_log_file_path: str + output_file_path: str + mode: Literal["pairs", "all"] + harmonization_pairs: Optional[List[HarmonizationPair]] = Field( + None, + alias="pairs", + description="List of (source, target) column mappings when mode='pairs'.", + ) + overwrite: bool = False + + model_config = ConfigDict(populate_by_name=True) + + +class RpcRequest(BaseModel): + """RPC request envelope with method name and parameters payload.""" + method: str + params: Dict + + +class RpcResponse(BaseModel): + """RPC response envelope.""" + status: str + result: Optional[Dict] = None + error: Optional["ErrorDetail"] = None + job_id: Optional[str] = None + + +from harmonization_framework.api.rpc_errors import ErrorDetail # noqa: E402 diff --git a/src/harmonization_framework/harmonize.py b/src/harmonization_framework/harmonize.py index d54784d..dea3db2 100644 --- a/src/harmonization_framework/harmonize.py +++ b/src/harmonization_framework/harmonize.py @@ -1,7 +1,7 @@ import os import pandas as pd -from typing import List, Optional, Tuple +from typing import Callable, List, Optional, Tuple from .rule_registry import RuleRegistry from .replay_log import replay_logger as rlog @@ -13,6 +13,7 @@ def harmonize_dataset( rules: RuleRegistry, dataset_name: str, logger=None, + progress_callback: Optional[Callable[[int, int], None]] = None, ) -> pd.DataFrame: """ Apply harmonization rules to the provided dataset and return a new dataframe. @@ -28,6 +29,7 @@ def harmonize_dataset( rules: RuleRegistry with harmonization rules keyed by source/target. 
dataset_name: Name used for the `source dataset` metadata column. logger: Optional replay logger for recording applied rules. + progress_callback: Optional callback invoked with (processed, total) counts. """ # make a new dataframe with the same number of rows and columns # and rename the columns @@ -37,6 +39,9 @@ def harmonize_dataset( inplace=True, ) # apply harmonization rule to each column + total_steps = len(dataset) * len(harmonization_pairs) if harmonization_pairs else 0 + processed = 0 + for source, target in harmonization_pairs: print(f"Requested rule: {source} -> {target}") rule = rules.query(source, target) @@ -44,7 +49,16 @@ def harmonize_dataset( if logger: rlog.log_operation(logger, rule, dataset_name) dataset_harmonized.rename(columns={source: target}, inplace=True) - dataset_harmonized[target] = dataset[source].apply(rule.transform) + + def transform_with_progress(value): + nonlocal processed + result = rule.transform(value) + processed += 1 + if progress_callback: + progress_callback(processed, total_steps) + return result + + dataset_harmonized[target] = dataset[source].apply(transform_with_progress) # save source dataset dataset_harmonized["source dataset"] = [dataset_name] * len(dataset) # save old ids diff --git a/src/harmonization_framework/rule_registry.py b/src/harmonization_framework/rule_registry.py index 0e1ceac..f22e09a 100644 --- a/src/harmonization_framework/rule_registry.py +++ b/src/harmonization_framework/rule_registry.py @@ -42,6 +42,16 @@ def add_rule(self, rule: HarmonizationRule): print(f"Warning: rule already exists for source {rule.source} and target {rule.target}.") self._rules[source][target] = rule + def list_pairs(self): + """ + Return a list of (source, target) pairs for all stored rules. + """ + pairs = [] + for source, targets in self._rules.items(): + for target in targets: + pairs.append((source, target)) + return pairs + def clean(self): """ Remove all rules from the store. 
diff --git a/tests/test_rpc_paths.py b/tests/test_rpc_paths.py new file mode 100644 index 0000000..c9a1ace --- /dev/null +++ b/tests/test_rpc_paths.py @@ -0,0 +1,80 @@ +import os + +from harmonization_framework.api.rpc_models import HarmonizeParams +from harmonization_framework.api.rpc_handlers import _validate_paths + + +def test_validate_paths_rejects_relative_paths(tmp_path): + params = HarmonizeParams( + data_file_path="relative.csv", + rules_file_path=str(tmp_path / "rules.json"), + output_file_path=str(tmp_path / "out.csv"), + replay_log_file_path=str(tmp_path / "replay.log"), + mode="all", + overwrite=False, + ) + response = _validate_paths(params) + assert response is not None + assert response.error.code == "INVALID_PATH" + assert response.error.details["path"] == "relative.csv" + assert response.error.details["path_type"] == "data_file_path" + + +def test_validate_paths_missing_inputs(tmp_path): + data_path = tmp_path / "input.csv" + rules_path = tmp_path / "rules.json" + + params = HarmonizeParams( + data_file_path=str(data_path), + rules_file_path=str(rules_path), + output_file_path=str(tmp_path / "out.csv"), + replay_log_file_path=str(tmp_path / "replay.log"), + mode="all", + overwrite=False, + ) + response = _validate_paths(params) + assert response is not None + assert response.error.code == "FILE_NOT_FOUND" + assert response.error.details["path_type"] == "data_path" + + +def test_validate_paths_rejects_existing_output_without_overwrite(tmp_path): + data_path = tmp_path / "input.csv" + rules_path = tmp_path / "rules.json" + output_path = tmp_path / "out.csv" + + data_path.write_text("a,b\n1,2\n") + rules_path.write_text("{}") + output_path.write_text("already") + + params = HarmonizeParams( + data_file_path=str(data_path), + rules_file_path=str(rules_path), + output_file_path=str(output_path), + replay_log_file_path=str(tmp_path / "replay.log"), + mode="all", + overwrite=False, + ) + response = _validate_paths(params) + assert response is not None + 
assert response.error.code == "ALREADY_EXISTS" + assert response.error.details["path"] == str(output_path) + + +def test_validate_paths_accepts_valid_paths(tmp_path): + data_path = tmp_path / "input.csv" + rules_path = tmp_path / "rules.json" + + data_path.write_text("a,b\n1,2\n") + rules_path.write_text("{}") + + params = HarmonizeParams( + data_file_path=str(data_path), + rules_file_path=str(rules_path), + output_file_path=str(tmp_path / "out.csv"), + replay_log_file_path=str(tmp_path / "replay.log"), + mode="all", + overwrite=False, + ) + response = _validate_paths(params) + assert response is None
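The tests above cover path validation only. The progress rule described in the README (`processed / (rows * number_of_pairs)`, capped at `1.0`, with an empty workload reported as done) could be checked in the same style. The following is a minimal sketch under stated assumptions: `total_steps` and `progress_fraction` are hypothetical helpers that re-implement the rule locally so the example runs without importing the package; they are not functions from this change.

```python
def total_steps(rows: int, number_of_pairs: int) -> int:
    """Number of value transformations for a dataset (one per row per pair)."""
    return rows * number_of_pairs


def progress_fraction(processed: int, total: int) -> float:
    """Local mirror of the clamping rule: float in [0.0, 1.0], 1.0 when total is 0."""
    if total == 0:
        return 1.0
    return min(1.0, processed / total)


def test_progress_fraction_is_clamped():
    total = total_steps(rows=50, number_of_pairs=2)
    assert total == 100
    assert progress_fraction(37, total) == 0.37
    # Over-reporting never exceeds 1.0, and an empty workload counts as done.
    assert progress_fraction(150, total) == 1.0
    assert progress_fraction(0, 0) == 1.0
```

Testing against the real `update_progress` would additionally require registering a `JobInfo` first, since that function looks the job up by id before updating it.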