Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions src/lib/pipeline/pipeline-manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { describe, it, expect } from 'vitest';
import { PipelineManager, handlePipelineError, PipelineNode, PipelineContext } from './pipeline-manager';

describe('handlePipelineError', () => {
it('returns error message from string', () => {
const result = handlePipelineError('Something went wrong');
expect(result).toEqual({ success: false, error: 'Something went wrong' });
});
it('returns error message from Error object', () => {
const err = new Error('Failure');
const result = handlePipelineError(err);
expect(result).toEqual({ success: false, error: 'Failure' });
});
it('returns default message for unknown input', () => {
const result = handlePipelineError(42);
expect(result).toEqual({ success: false, error: 'Pipeline operation failed.' });
});
it('can include context (no-op)', () => {
const result = handlePipelineError('msg', { foo: 'bar' });
expect(result).toEqual({ success: false, error: 'msg' });
});
it('handles null error', () => {
const result = handlePipelineError(null);
expect(result).toEqual({ success: false, error: 'Pipeline operation failed.' });
});
});

describe('PipelineManager', () => {
it('executes set_variable node', async () => {
const nodes: PipelineNode[] = [
{ id: 'n1', type: 'set_variable', data: { key: 'x', value: 42 } }
];
const mgr = new PipelineManager(nodes);
const result = await mgr.execute({ variables: {} });
expect(result).toEqual({ success: true, data: { x: 42 } });
});
it('returns error on unknown node type', async () => {
const nodes: PipelineNode[] = [
{ id: 'n1', type: 'unknown_type' }
];
const mgr = new PipelineManager(nodes);
const result = await mgr.execute({ variables: {} });
expect(result.success).toBe(false);
expect(typeof result.error).toBe('string');
});
it('updates context for multiple nodes', async () => {
const nodes: PipelineNode[] = [
{ id: 'a', type: 'set_variable', data: { key: 'x', value: 1 } },
{ id: 'b', type: 'set_variable', data: { key: 'y', value: 2 } }
];
const mgr = new PipelineManager(nodes);
const result = await mgr.execute({ variables: { z: 3 } });
expect(result).toEqual({ success: true, data: { z: 3, x: 1, y: 2 } });
});
it('handles empty pipeline', async () => {
const mgr = new PipelineManager([]);
const result = await mgr.execute({ variables: { a: 1 } });
expect(result).toEqual({ success: true, data: { a: 1 } });
});
it('handles node with missing data', async () => {
const nodes: PipelineNode[] = [
{ id: 'n1', type: 'set_variable' }
];
const mgr = new PipelineManager(nodes);
const result = await mgr.execute({ variables: {} });
expect(result.success).toBe(false);
expect(typeof result.error).toBe('string');
});
});
89 changes: 89 additions & 0 deletions src/lib/pipeline/pipeline-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
export interface PipelineContext {
variables: Record<string, unknown>;
}

export interface PipelineNode {
id: string;
type: string;
data?: Record<string, unknown>;
}

export interface PipelineResult {
success: boolean;
data?: unknown;
error?: string;
}

/**
* Helper function for consistent error handling in pipeline operations.
* @param error - Error object or string
* @param context - Optional extra info for logging
* @returns PipelineResult with success: false and error message
*/
export function handlePipelineError(error: unknown, context?: Record<string, unknown>): PipelineResult {
let message = 'Pipeline operation failed.';
if (typeof error === 'string') message = error;
else if (error instanceof Error) message = error.message;
// Here you would log the error with context if a logger was available
// logger.error('Pipeline error', { error, ...context });
return { success: false, error: message };
}

/**
* Manages execution of a pipeline of nodes with provided context.
*/
export class PipelineManager {
/**
* Create a PipelineManager.
* @param nodes - Array of pipeline nodes
*/
constructor(private nodes: PipelineNode[]) {}

/**
* Executes the pipeline starting from the first node, updating context as it proceeds.
* Returns the final PipelineResult.
* @param context - Initial pipeline context
*/
async execute(context: PipelineContext): Promise<PipelineResult> {
let currentContext = { ...context };
try {
for (const node of this.nodes) {
const result = await this.executeNode(node, currentContext);
if (!result.success) return result;
currentContext = {
...currentContext,
variables: {
...currentContext.variables,
...(typeof result.data === 'object' && result.data !== null ? result.data : {})
}
};
}
return { success: true, data: currentContext.variables };
} catch (error) {
return handlePipelineError(error);
}
}

/**
* Executes a single pipeline node. Override to implement node-specific logic.
* @param node - Pipeline node to execute
* @param context - Current pipeline context
*/
async executeNode(node: PipelineNode, context: PipelineContext): Promise<PipelineResult> {
try {
// Example logic: set a variable based on node data
if (node.type === 'set_variable' && node.data && typeof node.data.key === 'string') {
const key = node.data.key;
const value = node.data.value;
return {
success: true,
data: { [key]: value }
};
}
// Unknown node type
return handlePipelineError(`Unknown node type: ${node.type}`, { node });
} catch (error) {
return handlePipelineError(error, { node });
}
}
}