From b4d144fa293cce32f7816ef42403b46ad0165ea1 Mon Sep 17 00:00:00 2001 From: Mani Varshith Date: Mon, 25 May 2026 21:25:06 +0530 Subject: [PATCH 1/3] feat(cli): add langship pipelines validate command Local-only validation that checks pipeline definitions before hitting the server. Validates: - valid JSON/YAML parsing - nodes is a non-empty array - each node has id, name, type (non-empty strings) - no duplicate node names or ids - exactly one trigger node (sync'd with pkg/engine/parser.go types) - connections reference valid node names - position is an array of 2 numbers --- .../src/langship/commands/pipelines.py | 157 +++++++++++++++++- 1 file changed, 156 insertions(+), 1 deletion(-) diff --git a/langship-cli/src/langship/commands/pipelines.py b/langship-cli/src/langship/commands/pipelines.py index 2dfdfb1..50a9465 100644 --- a/langship-cli/src/langship/commands/pipelines.py +++ b/langship-cli/src/langship/commands/pipelines.py @@ -1,15 +1,18 @@ -"""pipelines list / get / push / delete. +"""pipelines list / get / push / delete / validate. `push` takes a local file containing the pipeline definition (n8n-format JSON, or YAML if PyYAML is installed). If the file (or the top-level object) carries an `id`, the pipeline is updated; otherwise a new one is created and its id printed. The file is the source of truth — GitOps. + +`validate` checks the definition locally before hitting the server. """ from __future__ import annotations import json from pathlib import Path +from typing import Any import typer @@ -18,6 +21,36 @@ app = typer.Typer(no_args_is_help=True) +# n8n trigger types that map to flow-nodes-base.trigger — kept in sync with +# pkg/engine/parser.go so local validation matches server behaviour. +_TRIGGER_TYPES = frozenset({ + "n8n-nodes-base.webhook", + "n8n-nodes-base.scheduleTrigger", + "n8n-nodes-base.manualTrigger", + "n8n-nodes-base.formTrigger", + "n8n-nodes-base.activationTrigger", + "n8n-nodes-base.errorTrigger", + "n8n-nodes-base.executeWorkflowTrigger", + "n8n-nodes-base.n8nTrigger", + "n8n-nodes-base.sseTrigger", + "n8n-nodes-base.workflowTrigger", + "n8n-nodes-base.localFileTrigger", + "n8n-nodes-base.emailReadImap", + "n8n-nodes-base.rssFeedReadTrigger", + "n8n-nodes-base.evaluationTrigger", + "n8n-nodes-base.start", + "n8n-nodes-base.cron", + "n8n-nodes-base.interval", + "n8n-nodes-langchain.chatTrigger", + "n8n-nodes-langchain.mcpTrigger", + "flow-nodes-base.trigger", +}) + + +def _is_trigger_type(node_type: str) -> bool: + """Return True if the node type is treated as a trigger.""" + return node_type in _TRIGGER_TYPES + def _load_file(path: Path) -> dict: text = path.read_text() @@ -31,6 +64,128 @@ def _load_file(path: Path) -> dict: return json.loads(text) +def _validate_definition(doc: dict) -> list[str]: + """Validate a parsed pipeline definition, returning a list of errors. + + Checks match the server-side validations in pkg/engine/parser.go plus + common-sense structural rules. + """ + errors: list[str] = [] + + definition = doc.get("definition", doc) + + nodes = definition.get("nodes") + if not isinstance(nodes, list): + errors.append("'nodes' must be a non-empty array") + return errors + if len(nodes) == 0: + errors.append("'nodes' array is empty") + return errors + + seen_ids: set[str] = set() + seen_names: set[str] = set() + trigger_count = 0 + + for i, node in enumerate(nodes): + prefix = f"nodes[{i}]" + if not isinstance(node, dict): + errors.append(f"{prefix}: expected an object, got {type(node).__name__}") + continue + + node_id = node.get("id") + if not node_id or not isinstance(node_id, str): + errors.append(f"{prefix}: missing or invalid 'id' (must be a non-empty string)") + elif node_id in seen_ids: + errors.append(f"{prefix}: duplicate node id '{node_id}'") + else: + seen_ids.add(node_id) + + node_name = node.get("name") + if not node_name or not isinstance(node_name, str): + errors.append(f"{prefix}: missing or invalid 'name' (must be a non-empty string)") + elif node_name in seen_names: + errors.append(f"{prefix}: duplicate node name '{node_name}'") + else: + seen_names.add(node_name) + + node_type = node.get("type") + if not node_type or not isinstance(node_type, str): + errors.append(f"{prefix}: missing or invalid 'type' (must be a non-empty string)") + elif _is_trigger_type(node_type): + trigger_count += 1 + + pos = node.get("position") + if pos is not None and (not isinstance(pos, (list, tuple)) or len(pos) != 2): + errors.append(f"{prefix}: 'position' must be an array of 2 numbers") + + if trigger_count == 0: + errors.append("workflow must have exactly one trigger node (found 0)") + elif trigger_count > 1: + errors.append(f"workflow must have exactly one trigger node (found {trigger_count})") + + connections = definition.get("connections") + if connections is not None: + if not isinstance(connections, dict): + errors.append("'connections' must be an object") + else: + for source_name, conn_targets in connections.items(): + if source_name not in seen_names: + errors.append(f"connections: source node '{source_name}' not found in nodes") + continue + if not isinstance(conn_targets, dict): + errors.append(f"connections['{source_name}']: expected an object with 'main' key") + continue + main = conn_targets.get("main") + if not isinstance(main, list): + errors.append(f"connections['{source_name}']: 'main' must be an array") + continue + for output_idx, output_group in enumerate(main): + if not isinstance(output_group, list): + errors.append(f"connections['{source_name}'].main[{output_idx}]: expected an array of targets") + continue + for target_idx, target in enumerate(output_group): + if not isinstance(target, dict): + errors.append(f"connections['{source_name}'].main[{output_idx}][{target_idx}]: expected an object with 'node' field") + continue + target_node = target.get("node") + if not target_node or not isinstance(target_node, str): + errors.append(f"connections['{source_name}'].main[{output_idx}][{target_idx}]: missing or invalid 'node'") + elif target_node not in seen_names: + errors.append(f"connections['{source_name}'].main[{output_idx}][{target_idx}]: references unknown node '{target_node}'") + + return errors + + +@app.command("validate") +def validate_pipeline( + file: Path = typer.Argument(..., exists=True, readable=True, help="JSON/YAML file with the pipeline definition."), +) -> None: + """Validate a pipeline definition locally without contacting the server. + + Checks structure, required fields, duplicate names/ids, exactly one + trigger node, and that connections reference valid nodes. + """ + from ..utils import err_console + + try: + doc = _load_file(file) + except (json.JSONDecodeError, ValueError) as e: + die(f"invalid {file.suffix} file: {e}") + + if not isinstance(doc, dict): + die("file must contain a JSON/YAML object") + + errors = _validate_definition(doc) + if not errors: + console.print(f"[green]✓[/green] [bold]{file}[/bold] is valid") + return + + err_console.print(f"[red]✗[/red] [bold]{file}[/bold] has {len(errors)} error{'s' if len(errors) > 1 else ''}:") + for err in errors: + err_console.print(f" [red]·[/red] {err}") + raise typer.Exit(code=1) + + @app.command("list") def list_pipelines(output: str = typer.Option("table", "--output", "-o", help="table | json | yaml")) -> None: """List pipeline definitions.""" From f765aba313f3dfa2da9bbd2e5ecbbac4cb030c71 Mon Sep 17 00:00:00 2001 From: Mani Varshith Date: Mon, 25 May 2026 21:25:24 +0530 Subject: [PATCH 2/3] chore(cli): update pipelines help text to mention validate --- langship-cli/src/langship/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/langship-cli/src/langship/cli.py b/langship-cli/src/langship/cli.py index 2458405..a5b78f7 100644 --- a/langship-cli/src/langship/cli.py +++ b/langship-cli/src/langship/cli.py @@ -30,7 +30,7 @@ app.add_typer(agents_cmd.app, name="agents", help="Manage agents (repos + env subscriptions).") app.add_typer(envs_cmd.app, name="envs", help="Manage environments (named, ordered pipeline lists).") -app.add_typer(pipelines_cmd.app, name="pipelines", help="Manage pipeline definitions (push from file, dump).") +app.add_typer(pipelines_cmd.app, name="pipelines", help="Manage pipeline definitions (push, validate, list, get, delete).") app.add_typer(creds_cmd.app, name="creds", help="Manage the global credential pool (aws / gcp / kv).") app.add_typer(runs_cmd.app, name="runs", help="Inspect executions and stream logs.") From 253ad6313bed0f0506bcb01a7c1c1b1eb8506699 Mon Sep 17 00:00:00 2001 From: Mani Varshith Date: Mon, 25 May 2026 21:25:39 +0530 Subject: [PATCH 3/3] docs(cli): add validate to pipelines command listing --- langship-cli/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/langship-cli/README.md b/langship-cli/README.md index 6b76c2e..dab818a 100644 --- a/langship-cli/README.md +++ b/langship-cli/README.md @@ -91,6 +91,7 @@ langship envs reorder ... langship pipelines list [-o ...] langship pipelines get [-o json|yaml|table] langship pipelines push [--id ] [--name ...] +langship pipelines validate langship pipelines delete [-y] langship creds list [-o ...]