Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ venv
build
__pycache__
*.db
src/harmonization_framework.egg-info
src/harmonization_framework.egg-info
node_modules
replay.log
61 changes: 61 additions & 0 deletions demo/client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# RPC Client Demo (TypeScript + fetch)

This folder contains a minimal client that calls the Harmonization Framework RPC API
and polls for completion. It uses **plain Node + fetch** (no extra dependencies required
if you are on Node 18+).

> Note: This is a demo client intended for learning and quick validation, not a production-ready SDK.

## Prerequisites
- The harmonization API server is running:

```bash
./scripts/run_api.sh
```

- Node 18+ (for built-in `fetch`)
- TypeScript users may need Node type definitions:
`npm i --save-dev @types/node`

If you want a quick setup in this folder, run:

```bash
cd demo/client
npm install
```

## Run the demo
From the `demo/client` directory:

```bash
node harmonize_example_client.ts
```

If you prefer to run with ts-node:

```bash
npx ts-node harmonize_example_client.ts
```

## What it does
- Sends a `harmonize` RPC request using the same input and rules as
`demo/harmonize_example/`.
- Polls `get_job` until the job completes or fails.
- Prints progress and output paths.
- RPC response shapes are documented in `rpc_types.ts`.
- Error responses include an error code, message, and optional details.

## Reusable client
`rpc_client.ts` contains a small `RpcClient` class that can be reused by other
scripts or integrations. It wraps the `/api` RPC calls, handles timeouts, and
raises rich errors when the server returns an error payload.

## Outputs
The demo writes the harmonized CSV and replay log to:

- `demo/harmonize_example/output.csv`
- `demo/harmonize_example/replay.log`

## Notes
- The client uses absolute paths resolved from the script location (`demo/client`).
- If the output files already exist, the demo sets `overwrite: true`.
115 changes: 115 additions & 0 deletions demo/client/harmonize_example_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
Minimal RPC client example (Node + fetch).

This script shows how to call the Harmonization Framework RPC API from a
simple Node/TypeScript client and poll for completion.

Assumptions:
- The harmonization server is already running on http://localhost:8000
- You run this script from the repo root so relative paths resolve

Run (Node 18+):
node demo/client/harmonize_example_client.ts
or with ts-node:
npx ts-node demo/client/harmonize_example_client.ts
*/

import * as path from "node:path";
import { fileURLToPath } from "node:url";
import { GetJobResponse, HarmonizeRequest, HarmonizeResponse, JOB_STATUS } from "./rpc_types.js";
import { RpcClient } from "./rpc_client.js";

// The single RPC endpoint exposed by the server.
const API_URL = "http://localhost:8000/api";
const HARMONIZE_TIMEOUT_MS = 20_000;
const GET_JOB_TIMEOUT_MS = 5_000;

// Simple delay helper used for polling.
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

// Build an absolute path from this script's directory.
// This lets the script run from demo/client without relying on process.cwd().
const clientDir = path.dirname(fileURLToPath(import.meta.url));
function buildAbsPath(...parts: string[]): string {
return path.resolve(clientDir, ...parts);
}

// Generic RPC caller. All methods are POSTs to /api with { method, params }.
async function main() {
const client = new RpcClient(API_URL);
// Use the demo inputs/rules from demo/harmonize_example.
// This script lives in demo/client, so go up one level to demo/.
const dataFilePath = buildAbsPath("..", "harmonize_example", "input.csv");
const rulesFilePath = buildAbsPath("..", "harmonize_example", "rules.json");
const replayLogFilePath = buildAbsPath("..", "harmonize_example", "replay.log");
const outputFilePath = buildAbsPath("..", "harmonize_example", "output.csv");

// Start harmonization. The server returns a job_id you can poll.
const harmonizeParams: HarmonizeRequest = {
data_file_path: dataFilePath,
rules_file_path: rulesFilePath,
replay_log_file_path: replayLogFilePath,
output_file_path: outputFilePath,
mode: "pairs",
pairs: [
{ source: "age", target: "age_years" },
{ source: "weight_lbs", target: "weight_kg" },
{ source: "name", target: "given_name" },
{ source: "name", target: "family_name" },
{ source: "visit_type_code", target: "visit_type_label" },
],
overwrite: true,
};

const startResponse = await client.call<HarmonizeResponse>({
method: "harmonize",
params: harmonizeParams,
timeoutMs: HARMONIZE_TIMEOUT_MS,
});

const jobId = startResponse.job_id;
console.log(`Harmonization started. job_id=${jobId}`);

// Poll until completion.
while (true) {
const statusResponse = await client.call<GetJobResponse>({
method: "get_job",
params: { job_id: jobId },
timeoutMs: GET_JOB_TIMEOUT_MS,
});

const job = statusResponse.result;
const percent = Math.round(job.progress * 100);
console.log(`Progress: ${percent}%`);

if (job.status === JOB_STATUS.COMPLETED) {
console.log("Harmonization completed.");
console.log(`Output: ${job.output_path}`);
console.log(`Replay log: ${job.replay_log_path}`);
break;
}

if (job.status === JOB_STATUS.FAILED) {
console.error("Harmonization failed.");
console.error(job.error ?? "Unknown error");
break;
}

await sleep(500);
}
}

main().catch((err) => {
if (err && typeof err === "object" && "message" in err) {
console.error((err as Error).message);
const details = (err as Error & { details?: unknown }).details;
if (details) {
console.error("Details:", details);
}
} else {
console.error(err);
}
process.exit(1);
});
30 changes: 30 additions & 0 deletions demo/client/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions demo/client/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name": "harmonization-client-demo",
"private": true,
"type": "module",
"devDependencies": {
"@types/node": "^20.11.30"
}
}
58 changes: 58 additions & 0 deletions demo/client/rpc_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { RpcError } from "./rpc_types.js";

export interface RpcCallOptions {
method: string;
// JSON-serializable params object for the RPC method.
params: object;
timeoutMs?: number;
}

const DEFAULT_TIMEOUT_MS = 10_000;

export class RpcClient {
private apiUrl: string;

constructor(apiUrl: string) {
this.apiUrl = apiUrl;
}

async call<T>(options: RpcCallOptions): Promise<T> {
// Extract request options and apply a default timeout if not provided.
const { method, params, timeoutMs = DEFAULT_TIMEOUT_MS } = options;

// Use AbortController so we can time out the request cleanly.
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

// Send the RPC request. All methods are POSTed to /api with { method, params }.
const res = await fetch(this.apiUrl, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ method, params }),
signal: controller.signal,
});

// Clear the timeout once the request completes.
clearTimeout(timeoutId);

// Parse the JSON response body.
const data = await res.json();

// If the HTTP response is not OK, or the RPC response indicates an error,
// convert it into a thrown Error with a stable code and optional details.
if (!res.ok || data.status === "error") {
const err: RpcError | undefined = data?.error;
const code = err?.code ?? "RPC_ERROR";
const message = err?.message ?? `RPC call failed (${res.status})`;
const details = err?.details;
const error = new Error(`${code}: ${message}`);
if (details) {
(error as Error & { details?: unknown }).details = details;
}
throw error;
}

// Successful response: return the parsed data as the expected type.
return data as T;
}
}
73 changes: 73 additions & 0 deletions demo/client/rpc_types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Status values returned by the get_job endpoint for an in-flight or finished job.
export const JOB_STATUS = {
QUEUED: "queued",
RUNNING: "running",
COMPLETED: "completed",
FAILED: "failed",
} as const;

export type JobStatus = (typeof JOB_STATUS)[keyof typeof JOB_STATUS];



// Params accepted by the harmonize RPC method.
export interface HarmonizeRequest {
// Absolute path to the input CSV file.
data_file_path: string;
// Absolute path to the rules JSON file (RuleRegistry.save()).
rules_file_path: string;
// Absolute path where the replay log should be written.
replay_log_file_path: string;
// Absolute path where the output CSV should be written.
output_file_path: string;
// "pairs" applies only the specified pairs; "all" uses all rules in the registry.
mode: "pairs" | "all";
// Explicit list of (source, target) pairs to harmonize.
pairs?: Array<{ source: string; target: string }>;
// Overwrite output and replay log files if they already exist.
overwrite?: boolean;
}

// Response returned by the harmonize RPC method.
export interface HarmonizeResponse {
// Always "accepted" for a successful request.
status: string;
// Opaque job identifier used to poll progress and results.
job_id: string;
}

// Details for a harmonization job returned by get_job.
export interface JobInfo {
// The same job_id returned by the harmonize call.
job_id: string;
// Job lifecycle status.
status: JobStatus;
// Progress as a fraction from 0.0 to 1.0.
progress: number;
// Path to the output CSV on success.
output_path?: string;
// Path to the replay log on success.
replay_log_path?: string;
// Optional result payload (if provided by the server).
result?: unknown;
// Error payload (if the job failed).
error?: unknown;
}

// Response returned by the get_job RPC method.
export interface GetJobResponse {
// Always "success" for a successful request.
status: string;
// Job details payload.
result: JobInfo;
}

// Standard error payload returned by the RPC API.
export interface RpcError {
// Error code string (e.g., FILE_NOT_FOUND, JOB_NOT_FOUND).
code: string;
// Human-readable error message.
message: string;
// Optional error details (field/path/job_id/etc).
details?: Record<string, unknown> | null;
}
10 changes: 10 additions & 0 deletions demo/client/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"compilerOptions": {
"target": "ES2020",
"lib": ["ES2020", "DOM"],
"module": "ESNext",
"moduleResolution": "Node",
"types": ["node"],
"strict": false
}
}
Loading