diff --git a/README.md b/README.md
index f33ae2e..e63c1e7 100644
--- a/README.md
+++ b/README.md
@@ -46,37 +46,12 @@ make test
 
 `make dev` installs the project, syncs development dependencies, and sets up
 [`prek`](https://prek.j178.dev/) Git hooks.
 
-## Run the Example Workflow
+## Examples
 
-The repository includes a minimal runnable example at
-[`examples/graphon_openai_slim`](examples/graphon_openai_slim).
+Runnable examples live under [`examples/`](examples/).
 
-It builds and executes this workflow:
-
-```text
-start -> llm -> output
-```
-
-To run it:
-
-```bash
-make dev
-source .venv/bin/activate
-cd examples/graphon_openai_slim
-cp .env.example .env
-python3 workflow.py "Explain Graphon in one short sentence."
-```
-
-Before running the example, fill in the required values in `.env`.
-
-The example currently expects:
-
-- an `OPENAI_API_KEY`
-- a `SLIM_PLUGIN_ID`
-- a local `dify-plugin-daemon-slim` setup or equivalent Slim runtime
-
-For the exact environment variables and runtime notes, see
-[examples/graphon_openai_slim/README.md](examples/graphon_openai_slim/README.md).
+Each example is self-contained in its own subdirectory and includes its own
+setup instructions, environment template, and `workflow.py` entrypoint.
 
 ## How Graphon Fits Together
@@ -88,8 +63,8 @@ At a high level, Graphon usage looks like this:
 4. Run `GraphEngine` and consume emitted graph events.
 5. Read final outputs from runtime state.
 
-The bundled example follows exactly that path. The execution loop is centered
-around `GraphEngine.run()`:
+The examples under [`examples/`](examples/) follow exactly that path. The
+execution loop is centered around `GraphEngine.run()`:
 
 ```python
 engine = GraphEngine(
@@ -103,10 +78,8 @@ for event in engine.run():
     ...
 ```
 
-See
-[examples/graphon_openai_slim/workflow.py](examples/graphon_openai_slim/workflow.py)
-for the full example, including `SlimRuntime`, `SlimPreparedLLM`, graph
-construction, input seeding, and streamed output handling.
+See [`examples/`](examples/) for the current runnable workflows and their
+example-specific setup notes.
 
 ## Project Layout
 
@@ -126,8 +99,7 @@ construction, input seeding, and streamed output handling.
 ## Internal Docs
 
 - [CONTRIBUTING.md](CONTRIBUTING.md): contributor workflow, CI, commit/PR rules
-- [examples/graphon_openai_slim/README.md](examples/graphon_openai_slim/README.md):
-  runnable example setup
+- [examples/](examples/): runnable examples and per-example setup notes
 - [src/graphon/model_runtime/README.md](src/graphon/model_runtime/README.md):
   model runtime overview
 - [src/graphon/graph_engine/layers/README.md](src/graphon/graph_engine/layers/README.md):
diff --git a/examples/graphon_openai_slim/README.md b/examples/graphon_openai_slim/README.md
deleted file mode 100644
index 7c1a22d..0000000
--- a/examples/graphon_openai_slim/README.md
+++ /dev/null
@@ -1,63 +0,0 @@
-# Graphon OpenAI Slim Example
-
-This example runs a minimal Graphon workflow:
-
-`start -> llm -> output`
-
-It uses:
-
-- Graphon as the Python package import surface
-- `dify-plugin-daemon-slim` as the local model runtime bridge
-- the Dify OpenAI plugin package
-- the `gpt-5.4` model
-
-## Files
-
-- `workflow.py`: runnable example script
-- `.env.example`: template configuration
-- `.env`: local configuration file for this example only
-
-## Run
-
-1. Change into this directory:
-
-```bash
-cd examples/graphon_openai_slim
-```
-
-2. Copy the template:
-
-```bash
-cp .env.example .env
-```
-
-3. Fill in the required values in `.env`.
-
-4. Run the example:
-
-```bash
-python3 workflow.py
-```
-
-The CLI streams LLM text to stdout as chunks arrive.
-
-You can also pass a custom prompt:
-
-```bash
-python3 workflow.py "Explain graph sparsity in one sentence."
-```
-
-## Notes
-
-- `workflow.py` first tries to import an installed `graphon` package.
-- If `graphon` is not installed, it falls back to the local repository `src/`
-  directory automatically. That lets you run the example directly from this
-  checkout without setting `PYTHONPATH`.
-- If your current interpreter is missing runtime dependencies but the repository
-  `.venv` exists, `workflow.py` will re-exec itself with that local virtualenv
-  interpreter automatically.
-- Path-like variables in `.env` are resolved relative to this example
-  directory, not relative to your shell's current working directory.
-- By default, `SLIM_PLUGIN_FOLDER` resolves to the repository-root
-  `.slim/plugins` cache. That keeps generated plugin files out of this example
-  directory while still letting you run `python3 workflow.py` from here.
diff --git a/examples/graphon_openai_slim/__init__.py b/examples/graphon_openai_slim/__init__.py
deleted file mode 100644
index 8f0f62d..0000000
--- a/examples/graphon_openai_slim/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"""OpenAI Slim workflow example for Graphon."""
diff --git a/examples/graphon_openai_slim/workflow.py b/examples/graphon_openai_slim/workflow.py
deleted file mode 100644
index d9146d8..0000000
--- a/examples/graphon_openai_slim/workflow.py
+++ /dev/null
@@ -1,427 +0,0 @@
-"""Minimal Graphon `start -> LLM -> output` workflow example for Slim.
-
-Run from this directory:
-
-    python3 workflow.py "Explain Graphon in one short sentence."
-
-The script automatically loads `examples/graphon_openai_slim/.env`.
-Existing environment variables take precedence over `.env` values.
-
-Required environment variables:
-- `OPENAI_API_KEY`
-- `SLIM_PLUGIN_ID`
-
-Optional environment variables:
-- `SLIM_BINARY_PATH` points at a custom `dify-plugin-daemon-slim` binary
-- `SLIM_PROVIDER` defaults to `openai`
-- `SLIM_PLUGIN_FOLDER` defaults to the repository `.slim/plugins` cache
-- `SLIM_PLUGIN_ROOT` points at an already unpacked local plugin directory
-"""
-
-from __future__ import annotations
-
-import argparse
-import importlib.util
-import os
-import sys
-import time
-from collections.abc import Sequence
-from pathlib import Path
-from typing import IO
-
-EXAMPLE_DIR = Path(__file__).resolve().parent
-REPO_ROOT = EXAMPLE_DIR.parents[1]
-LOCAL_SRC_DIR = REPO_ROOT / "src"
-LOCAL_VENV_PYTHON = REPO_ROOT / ".venv" / "bin" / "python"
-DEFAULT_ENV_FILE = EXAMPLE_DIR / ".env"
-BOOTSTRAP_ENV_VAR = "GRAPHON_EXAMPLE_BOOTSTRAPPED"
-RUNTIME_MODULES = ("pydantic", "httpx", "yaml")
-MIN_QUOTED_VALUE_LENGTH = 2
-
-
-def bootstrap_local_python() -> None:
-    if os.environ.get(BOOTSTRAP_ENV_VAR) == "1":
-        return
-    if all(importlib.util.find_spec(module) is not None for module in RUNTIME_MODULES):
-        return
-    if not LOCAL_VENV_PYTHON.is_file():
-        return
-
-    env = dict(os.environ)
-    env[BOOTSTRAP_ENV_VAR] = "1"
-    os.execve(  # noqa: S606
-        str(LOCAL_VENV_PYTHON),
-        [str(LOCAL_VENV_PYTHON), str(Path(__file__).resolve()), *sys.argv[1:]],
-        env,
-    )
-
-
-bootstrap_local_python()
-
-if importlib.util.find_spec("graphon") is None and str(LOCAL_SRC_DIR) not in sys.path:
-    sys.path.insert(0, str(LOCAL_SRC_DIR))
-
-# ruff: noqa: E402
-from graphon.entities.graph_init_params import GraphInitParams
-from graphon.file.enums import FileType
-from graphon.file.models import File
-from graphon.graph.graph import Graph
-from graphon.graph_engine.command_channels import InMemoryChannel
-from graphon.graph_engine.graph_engine import GraphEngine
-from graphon.graph_events.node import NodeRunStreamChunkEvent
-from graphon.model_runtime.entities.llm_entities import LLMMode
-from graphon.model_runtime.entities.message_entities import (
-    PromptMessage,
-    PromptMessageRole,
-)
-from graphon.model_runtime.slim import (
-    SlimConfig,
-    SlimLocalSettings,
-    SlimPreparedLLM,
-    SlimProviderBinding,
-    SlimRuntime,
-)
-from graphon.nodes.answer.answer_node import AnswerNode
-from graphon.nodes.answer.entities import AnswerNodeData
-from graphon.nodes.llm import (
-    LLMNode,
-    LLMNodeChatModelMessage,
-    LLMNodeData,
-    ModelConfig,
-)
-from graphon.nodes.llm.entities import ContextConfig
-from graphon.nodes.start import StartNode
-from graphon.nodes.start.entities import StartNodeData
-from graphon.runtime.graph_runtime_state import GraphRuntimeState
-from graphon.runtime.variable_pool import VariablePool
-from graphon.variables.input_entities import VariableEntity, VariableEntityType
-
-ALLOWED_ENV_VARS: dict[str, str] = {
-    "OPENAI_API_KEY": "",
-    "SLIM_PLUGIN_ID": "",
-    "SLIM_BINARY_PATH": "",
-    "SLIM_PROVIDER": "openai",
-    "SLIM_PLUGIN_FOLDER": "../../.slim/plugins",
-    "SLIM_PLUGIN_ROOT": "",
-}
-PATH_ENV_VARS = {
-    "SLIM_BINARY_PATH",
-    "SLIM_PLUGIN_FOLDER",
-    "SLIM_PLUGIN_ROOT",
-}
-STREAM_SELECTOR = ("llm", "text")
-
-
-def load_default_env_file() -> None:
-    if DEFAULT_ENV_FILE.is_file():
-        load_env_file(DEFAULT_ENV_FILE)
-
-
-def load_env_file(path: Path) -> None:
-    env_dir = path.resolve().parent
-    for line_number, raw_line in enumerate(
-        path.read_text(encoding="utf-8").splitlines(),
-        start=1,
-    ):
-        line = raw_line.strip()
-        if not line or line.startswith("#"):
-            continue
-        if line.startswith("export "):
-            line = line.removeprefix("export ").strip()
-        if "=" not in line:
-            msg = f"Invalid .env line {line_number} in {path}: {raw_line}"
-            raise ValueError(msg)
-
-        key, value = line.split("=", 1)
-        key = key.strip()
-        if not key:
-            msg = f"Invalid .env key on line {line_number} in {path}"
-            raise ValueError(msg)
-        if key not in ALLOWED_ENV_VARS:
-            msg = f"Unsupported .env key {key!r} on line {line_number} in {path}"
-            raise ValueError(msg)
-
-        os.environ.setdefault(
-            key,
-            normalize_env_value(
-                key,
-                strip_optional_quotes(value.strip()),
-                base_dir=env_dir,
-            ),
-        )
-
-
-def strip_optional_quotes(value: str) -> str:
-    if (
-        len(value) >= MIN_QUOTED_VALUE_LENGTH
-        and value[0] == value[-1]
-        and value[0] in {'"', "'"}
-    ):
-        return value[1:-1]
-    return value
-
-
-def normalize_env_value(name: str, value: str, *, base_dir: Path) -> str:
-    if name not in PATH_ENV_VARS or not value:
-        return value
-
-    path_value = Path(value).expanduser()
-    if not path_value.is_absolute():
-        path_value = (base_dir / path_value).resolve()
-    else:
-        path_value = path_value.resolve()
-    return str(path_value)
-
-
-class PassthroughPromptMessageSerializer:
-    def serialize(
-        self,
-        *,
-        model_mode: LLMMode,
-        prompt_messages: Sequence[PromptMessage],
-    ) -> object:
-        _ = model_mode
-        return list(prompt_messages)
-
-
-class TextOnlyFileSaver:
-    def save_binary_string(
-        self,
-        data: bytes,
-        mime_type: str,
-        file_type: FileType,
-        extension_override: str | None = None,
-    ) -> File:
-        _ = data, mime_type, file_type, extension_override
-        msg = "This example only supports text responses."
-        raise RuntimeError(msg)
-
-    def save_remote_url(self, url: str, file_type: FileType) -> File:
-        _ = url, file_type
-        msg = "This example only supports text responses."
-        raise RuntimeError(msg)
-
-
-def require_env(name: str) -> str:
-    value = env_value(name)
-    if value:
-        return value
-    msg = f"{name} is required."
-    raise ValueError(msg)
-
-
-def env_value(name: str) -> str:
-    raw_value = os.environ.get(name)
-    if raw_value is not None:
-        return raw_value.strip()
-    return normalize_env_value(
-        name,
-        ALLOWED_ENV_VARS[name],
-        base_dir=EXAMPLE_DIR,
-    ).strip()
-
-
-def optional_path(name: str) -> Path | None:
-    value = env_value(name)
-    return Path(value).expanduser() if value else None
-
-
-def build_runtime() -> tuple[SlimRuntime, str]:
-    provider = env_value("SLIM_PROVIDER")
-    plugin_folder = Path(env_value("SLIM_PLUGIN_FOLDER")).expanduser()
-    plugin_root = optional_path("SLIM_PLUGIN_ROOT")
-
-    runtime = SlimRuntime(
-        SlimConfig(
-            bindings=[
-                SlimProviderBinding(
-                    plugin_id=require_env("SLIM_PLUGIN_ID"),
-                    provider=provider,
-                    plugin_root=plugin_root,
-                ),
-            ],
-            local=SlimLocalSettings(folder=plugin_folder),
-        ),
-    )
-    return runtime, provider
-
-
-def build_graph(
-    *,
-    provider: str,
-    prepared_llm: SlimPreparedLLM,
-    graph_init_params: GraphInitParams,
-    graph_runtime_state: GraphRuntimeState,
-) -> Graph:
-    start_node = StartNode(
-        node_id="start",
-        config={
-            "id": "start",
-            "data": StartNodeData(
-                title="Start",
-                variables=[
-                    VariableEntity(
-                        variable="query",
-                        label="Query",
-                        type=VariableEntityType.PARAGRAPH,
-                        required=True,
-                    ),
-                ],
-            ),
-        },
-        graph_init_params=graph_init_params,
-        graph_runtime_state=graph_runtime_state,
-    )
-
-    llm_node = LLMNode(
-        node_id="llm",
-        config={
-            "id": "llm",
-            "data": LLMNodeData(
-                title="LLM",
-                model=ModelConfig(
-                    provider=provider,
-                    name="gpt-5.4",
-                    mode=LLMMode.CHAT,
-                ),
-                prompt_template=[
-                    LLMNodeChatModelMessage(
-                        role=PromptMessageRole.SYSTEM,
-                        text="You are a concise assistant.",
-                    ),
-                    LLMNodeChatModelMessage(
-                        role=PromptMessageRole.USER,
-                        text="{{#start.query#}}",
-                    ),
-                ],
-                context=ContextConfig(enabled=False),
-            ),
-        },
-        graph_init_params=graph_init_params,
-        graph_runtime_state=graph_runtime_state,
-        model_instance=prepared_llm,
-        llm_file_saver=TextOnlyFileSaver(),
-        prompt_message_serializer=PassthroughPromptMessageSerializer(),
-    )
-
-    output_node = AnswerNode(
-        node_id="output",
-        config={
-            "id": "output",
-            "data": AnswerNodeData(
-                title="Output",
-                answer="{{#llm.text#}}",
-            ),
-        },
-        graph_init_params=graph_init_params,
-        graph_runtime_state=graph_runtime_state,
-    )
-
-    return (
-        Graph
-        .new()
-        .add_root(start_node)
-        .add_node(llm_node)
-        .add_node(output_node)
-        .build()
-    )
-
-
-def write_stream_chunk(event: object, *, stream_output: IO[str]) -> bool:
-    if not isinstance(event, NodeRunStreamChunkEvent):
-        return False
-    if tuple(event.selector) != STREAM_SELECTOR or not event.chunk:
-        return False
-
-    stream_output.write(event.chunk)
-    stream_output.flush()
-    return True
-
-
-def _execute_workflow(
-    query: str,
-    *,
-    stream_output: IO[str] | None = None,
-) -> tuple[str, bool]:
-    load_default_env_file()
-    runtime, provider = build_runtime()
-    workflow_id = "example-start-llm-output"
-    graph_init_params = GraphInitParams(
-        workflow_id=workflow_id,
-        graph_config={"nodes": [], "edges": []},
-        run_context={},
-        call_depth=0,
-    )
-    graph_runtime_state = GraphRuntimeState(
-        variable_pool=VariablePool(),
-        start_at=time.time(),
-    )
-    graph_runtime_state.variable_pool.add(("start", "query"), query)
-
-    prepared_llm = SlimPreparedLLM(
-        runtime=runtime,
-        provider=provider,
-        model_name="gpt-5.4",
-        credentials={"openai_api_key": require_env("OPENAI_API_KEY")},
-        parameters={},
-    )
-    graph = build_graph(
-        provider=provider,
-        prepared_llm=prepared_llm,
-        graph_init_params=graph_init_params,
-        graph_runtime_state=graph_runtime_state,
-    )
-    engine = GraphEngine(
-        workflow_id=workflow_id,
-        graph=graph,
-        graph_runtime_state=graph_runtime_state,
-        command_channel=InMemoryChannel(),
-    )
-
-    streamed = False
-    for event in engine.run():
-        if stream_output is not None and write_stream_chunk(
-            event,
-            stream_output=stream_output,
-        ):
-            streamed = True
-
-    answer = graph_runtime_state.get_output("answer")
-    if not isinstance(answer, str):
-        msg = "Workflow did not produce a text answer."
-        raise TypeError(msg)
-    if stream_output is not None and streamed and not answer.endswith("\n"):
-        stream_output.write("\n")
-        stream_output.flush()
-    return answer, streamed
-
-
-def run_workflow(query: str) -> str:
-    answer, _ = _execute_workflow(query)
-    return answer
-
-
-def parse_args() -> argparse.Namespace:
-    parser = argparse.ArgumentParser(
-        description="Run a minimal start -> LLM -> output workflow with Slim.",
-    )
-    parser.add_argument(
-        "query",
-        nargs="?",
-        default="Explain Graphon in one short sentence.",
-        help="User input passed into the Start node.",
-    )
-    return parser.parse_args()
-
-
-def main() -> int:
-    args = parse_args()
-    answer, streamed = _execute_workflow(args.query, stream_output=sys.stdout)
-    if not streamed:
-        sys.stdout.write(f"{answer}\n")
-        sys.stdout.flush()
-    return 0
-
-
-if __name__ == "__main__":
-    raise SystemExit(main())
diff --git a/examples/graphon_openai_slim/.env.example b/examples/openai_slim_minimal/.env.example
similarity index 64%
rename from examples/graphon_openai_slim/.env.example
rename to examples/openai_slim_minimal/.env.example
index 8d7719e..8cfe5c6 100644
--- a/examples/graphon_openai_slim/.env.example
+++ b/examples/openai_slim_minimal/.env.example
@@ -1,7 +1,7 @@
-# Example configuration for `examples/graphon_openai_slim/workflow.py`.
+# Example configuration for `examples/openai_slim_minimal/workflow.py`.
 #
-# The example loads `examples/graphon_openai_slim/.env` automatically. Copy this file to `.env`
-# in the same directory and fill in the required values.
+# The example loads `.env` from this directory automatically. Copy this file to
+# `.env` in the same directory and fill in the required values.
 
 # Required: OpenAI API key used by the OpenAI Slim plugin.
 OPENAI_API_KEY=
@@ -13,17 +13,16 @@ OPENAI_API_KEY=
 SLIM_PLUGIN_ID=langgenius/openai:0.3.0@99770a45f77910fe0f64c985524f4fe2294fc6ea25cbf1053ba6bddd7604d850
 
 # Optional: path to the local `dify-plugin-daemon-slim` binary.
-# If empty, Graphon will look for `dify-plugin-daemon-slim` in `PATH`.
-SLIM_BINARY_PATH=
+# Recommended Unix default: a user-local install under `~/.local/bin`.
+SLIM_BINARY_PATH=~/.local/bin/dify-plugin-daemon-slim
 
 # Optional: provider name inside the plugin package.
 # For this example we only support OpenAI, so this should stay `openai`.
 SLIM_PROVIDER=openai
 
 # Optional: local folder where Slim stores downloaded/extracted plugins.
-# The default points at the repository-root `.slim/plugins` cache so this
-# example directory does not accumulate generated plugin code.
-SLIM_PLUGIN_FOLDER=../../.slim/plugins
+# Recommended Unix default: a user-local plugin cache under `~/.local/share`.
+SLIM_PLUGIN_FOLDER=~/.local/share/graphon/slim/plugins
 
 # Optional: path to an already unpacked local plugin directory.
 # If set, Slim uses this directory directly and skips marketplace download.
diff --git a/examples/openai_slim_minimal/README.md b/examples/openai_slim_minimal/README.md
new file mode 100644
index 0000000..51c8cf4
--- /dev/null
+++ b/examples/openai_slim_minimal/README.md
@@ -0,0 +1,29 @@
+# OpenAI Slim Minimal Example
+
+A tiny Graphon workflow:
+
+`start -> llm -> output`
+
+## What You Need
+
+- `workflow.py`: runnable example
+- `.env.example`: template settings
+- `.env`: your local copy of the template
+
+## Run
+
+```bash
+cd examples/openai_slim_minimal
+cp .env.example .env
+python3 workflow.py
+```
+
+Fill in `.env` before running. The script reads `.env` from this directory.
+
+## Custom Prompt
+
+```bash
+python3 workflow.py "Explain graph sparsity in one sentence."
+```
+
+The example streams text to stdout as it arrives. If nothing is streamed, it prints the final answer at the end.
diff --git a/examples/openai_slim_minimal/__init__.py b/examples/openai_slim_minimal/__init__.py
new file mode 100644
index 0000000..3c2c522
--- /dev/null
+++ b/examples/openai_slim_minimal/__init__.py
@@ -0,0 +1 @@
+"""Minimal OpenAI Slim workflow example for Graphon."""
diff --git a/examples/openai_slim_minimal/workflow.py b/examples/openai_slim_minimal/workflow.py
new file mode 100644
index 0000000..64011e2
--- /dev/null
+++ b/examples/openai_slim_minimal/workflow.py
@@ -0,0 +1,190 @@
+"""Minimal Graphon `start -> LLM -> output` workflow example for Slim.
+
+Run from this directory:
+
+    python3 workflow.py "Explain Graphon in one short sentence."
+
+The script automatically loads `.env` from this example directory.
+Existing environment variables take precedence over `.env` values.
+"""
+
+from __future__ import annotations
+
+import argparse
+import sys
+import time
+from pathlib import Path
+from typing import IO
+
+EXAMPLE_FILE = Path(__file__).resolve()
+EXAMPLE_DIR = EXAMPLE_FILE.parent
+REPO_ROOT = EXAMPLE_FILE.parents[2]
+
+if str(REPO_ROOT) not in sys.path:
+    sys.path.insert(0, str(REPO_ROOT))
+
+from examples import openai_slim_support
+
+openai_slim_support.prepare_example_imports(EXAMPLE_FILE)
+
+# ruff: noqa: E402
+from graphon.graph_engine.command_channels import InMemoryChannel
+from graphon.graph_engine.graph_engine import GraphEngine
+from graphon.graph_events.node import NodeRunStreamChunkEvent
+from graphon.model_runtime.entities.llm_entities import LLMMode
+from graphon.model_runtime.slim import SlimPreparedLLM, SlimRuntime
+from graphon.nodes.answer.entities import AnswerNodeData
+from graphon.nodes.llm import LLMNodeData, ModelConfig
+from graphon.nodes.start.entities import StartNodeData
+from graphon.runtime.graph_runtime_state import GraphRuntimeState
+from graphon.runtime.variable_pool import VariablePool
+from graphon.workflow_builder import (
+    WorkflowBuilder,
+    WorkflowRuntime,
+    WorkflowSpec,
+    paragraph_input,
+    system,
+    template,
+    user,
+)
+
+STREAM_SELECTOR = ("llm", "text")
+
+
+def build_runtime() -> tuple[SlimRuntime, str]:
+    runtime, provider = openai_slim_support.build_runtime(example_dir=EXAMPLE_DIR)
+    return runtime, provider
+
+
+def build_workflow(*, provider: str) -> WorkflowSpec:
+    workflow = WorkflowBuilder()
+    start = workflow.root(
+        "start",
+        StartNodeData(
+            title="Start",
+            variables=[paragraph_input("query", required=True)],
+        ),
+    )
+    llm = start.then(
+        "llm",
+        LLMNodeData(
+            title="LLM",
+            model=ModelConfig(
+                provider=provider,
+                name="gpt-5.4",
+                mode=LLMMode.CHAT,
+            ),
+            prompt_template=[
+                system("You are a concise assistant."),
+                user(start.ref("query")),
+            ],
+        ),
+    )
+    llm.then(
+        "output",
+        AnswerNodeData(
+            title="Output",
+            answer=template(llm.ref("text")),
+        ),
+    )
+    return workflow.build()
+
+
+def write_stream_chunk(event: object, *, stream_output: IO[str]) -> bool:
+    if not isinstance(event, NodeRunStreamChunkEvent):
+        return False
+    if tuple(event.selector) != STREAM_SELECTOR or not event.chunk:
+        return False
+
+    stream_output.write(event.chunk)
+    stream_output.flush()
+    return True
+
+
+def _execute_workflow(
+    query: str,
+    *,
+    stream_output: IO[str] | None = None,
+) -> tuple[str, bool]:
+    openai_slim_support.load_default_env_file(EXAMPLE_DIR)
+    runtime, provider = build_runtime()
+    workflow_id = "example-start-llm-output"
+    graph_runtime_state = GraphRuntimeState(
+        variable_pool=VariablePool(),
+        start_at=time.time(),
+    )
+    graph_runtime_state.variable_pool.add(("start", "query"), query)
+
+    prepared_llm = SlimPreparedLLM(
+        runtime=runtime,
+        provider=provider,
+        model_name="gpt-5.4",
+        credentials={
+            "openai_api_key": openai_slim_support.require_env(
+                "OPENAI_API_KEY",
+                example_dir=EXAMPLE_DIR,
+            ),
+        },
+        parameters={},
+    )
+    graph = build_workflow(provider=provider).materialize(
+        WorkflowRuntime(
+            workflow_id=workflow_id,
+            graph_runtime_state=graph_runtime_state,
+            prepared_llm=prepared_llm,
+        ),
+    )
+    engine = GraphEngine(
+        workflow_id=workflow_id,
+        graph=graph,
+        graph_runtime_state=graph_runtime_state,
+        command_channel=InMemoryChannel(),
+    )
+
+    streamed = False
+    for event in engine.run():
+        if stream_output is not None and write_stream_chunk(
+            event,
+            stream_output=stream_output,
+        ):
+            streamed = True
+
+    answer = graph_runtime_state.get_output("answer")
+    if not isinstance(answer, str):
+        msg = "Workflow did not produce a text answer."
+        raise TypeError(msg)
+    if stream_output is not None and streamed and not answer.endswith("\n"):
+        stream_output.write("\n")
+        stream_output.flush()
+    return answer, streamed
+
+
+def run_workflow(query: str) -> str:
+    answer, _ = _execute_workflow(query)
+    return answer
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description="Run a minimal start -> LLM -> output workflow with Slim.",
+    )
+    parser.add_argument(
+        "query",
+        nargs="?",
+        default="Explain Graphon in one short sentence.",
+        help="User input passed into the Start node.",
+    )
+    return parser.parse_args()
+
+
+def main() -> int:
+    args = parse_args()
+    answer, streamed = _execute_workflow(args.query, stream_output=sys.stdout)
+    if not streamed:
+        sys.stdout.write(f"{answer}\n")
+        sys.stdout.flush()
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/examples/openai_slim_parallel_translation/.env.example b/examples/openai_slim_parallel_translation/.env.example
new file mode 100644
index 0000000..cea2c58
--- /dev/null
+++ b/examples/openai_slim_parallel_translation/.env.example
@@ -0,0 +1,31 @@
+# Example configuration for
+# `examples/openai_slim_parallel_translation/workflow.py`.
+#
+# The example loads `.env` from this directory automatically. Copy this file to
+# `.env` in the same directory and fill in the required values.
+
+# Required: OpenAI API key used by the OpenAI Slim plugin.
+OPENAI_API_KEY=
+
+# Required: Dify marketplace plugin unique identifier.
+# This must match the exact OpenAI plugin package/version you want Slim to use.
+# Example format:
+#   publisher/plugin:version@digest
+SLIM_PLUGIN_ID=langgenius/openai:0.3.0@99770a45f77910fe0f64c985524f4fe2294fc6ea25cbf1053ba6bddd7604d850
+
+# Optional: path to the local `dify-plugin-daemon-slim` binary.
+# Recommended Unix default: a user-local install under `~/.local/bin`.
+SLIM_BINARY_PATH=~/.local/bin/dify-plugin-daemon-slim
+
+# Optional: provider name inside the plugin package.
+# For this example we only support OpenAI, so this should stay `openai`.
+SLIM_PROVIDER=openai
+
+# Optional: local folder where Slim stores downloaded/extracted plugins.
+# Recommended Unix default: a user-local plugin cache under `~/.local/share`.
+SLIM_PLUGIN_FOLDER=~/.local/share/graphon/slim/plugins
+
+# Optional: path to an already unpacked local plugin directory.
+# If set, Slim uses this directory directly and skips marketplace download.
+# Leave empty in the normal case.
+SLIM_PLUGIN_ROOT=
diff --git a/examples/openai_slim_parallel_translation/README.md b/examples/openai_slim_parallel_translation/README.md
new file mode 100644
index 0000000..f9b3a3b
--- /dev/null
+++ b/examples/openai_slim_parallel_translation/README.md
@@ -0,0 +1,34 @@
+# OpenAI Slim Parallel Translation Example
+
+A fan-out / fan-in workflow:
+
+`start -> 3 llm -> end`
+
+The `start` node takes `content`. Three LLM nodes translate it into Chinese,
+English, and Japanese in parallel. The `end` node returns all three
+translations.
+
+## What You Need
+
+- `workflow.py`: runnable example
+- `.env.example`: template settings
+- `.env`: your local copy of the template
+
+## Run
+
+```bash
+cd examples/openai_slim_parallel_translation
+cp .env.example .env
+python3 workflow.py "Graph execution is a coordination problem."
+```
+
+Fill in `.env` before running. The script reads `.env` from this directory.
+
+## Useful Flags
+
+```bash
+python3 workflow.py --no-stream "Graph execution is a coordination problem."
+```
+
+By default, the example streams each translation as it becomes available, then
+prints the final structured outputs.
diff --git a/examples/openai_slim_parallel_translation/__init__.py b/examples/openai_slim_parallel_translation/__init__.py
new file mode 100644
index 0000000..5fa2b7c
--- /dev/null
+++ b/examples/openai_slim_parallel_translation/__init__.py
@@ -0,0 +1 @@
+"""Parallel translation OpenAI Slim workflow example for Graphon."""
diff --git a/examples/openai_slim_parallel_translation/workflow.py b/examples/openai_slim_parallel_translation/workflow.py
new file mode 100644
index 0000000..2f129c9
--- /dev/null
+++ b/examples/openai_slim_parallel_translation/workflow.py
@@ -0,0 +1,278 @@
+"""Parallel translation workflow built with the sequential WorkflowBuilder API.
+
+Run from this directory:
+
+    python3 workflow.py \
+        "Graph execution is a coordination problem."
+
+The script automatically loads `.env` from this example directory.
+"""
+
+from __future__ import annotations
+
+import argparse
+import sys
+import time
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import IO
+
+EXAMPLE_FILE = Path(__file__).resolve()
+EXAMPLE_DIR = EXAMPLE_FILE.parent
+REPO_ROOT = EXAMPLE_FILE.parents[2]
+
+if str(REPO_ROOT) not in sys.path:
+    sys.path.insert(0, str(REPO_ROOT))
+
+from examples import openai_slim_support
+
+openai_slim_support.prepare_example_imports(EXAMPLE_FILE)
+
+# ruff: noqa: E402
+from graphon.graph.graph import Graph
+from graphon.graph_engine.command_channels import InMemoryChannel
+from graphon.graph_engine.graph_engine import GraphEngine
+from graphon.graph_events.node import NodeRunStreamChunkEvent
+from graphon.model_runtime.entities.llm_entities import LLMMode
+from graphon.model_runtime.slim import SlimPreparedLLM
+from graphon.nodes.end.entities import EndNodeData
+from graphon.nodes.llm import LLMNodeData, ModelConfig
+from graphon.nodes.start.entities import StartNodeData
+from graphon.runtime.graph_runtime_state import GraphRuntimeState
+from graphon.runtime.variable_pool import VariablePool
+from graphon.workflow_builder import (
+    WorkflowBuilder,
+    WorkflowRuntime,
+    WorkflowSpec,
+    paragraph_input,
+    system,
+    user,
+)
+
+TARGET_LANGUAGES: tuple[tuple[str, str, str], ...] = (
+    ("translate_zh", "Chinese", "chinese"),
+    ("translate_en", "English", "english"),
+    ("translate_ja", "Japanese", "japanese"),
+)
+STREAM_LABEL_BY_SELECTOR = {
+    (node_id, "text"): language_name
+    for node_id, language_name, _output_name in TARGET_LANGUAGES
+}
+
+
+@dataclass(slots=True)
+class TranslationStreamWriter:
+    stream_output: IO[str]
+    seen_selectors: set[tuple[str, str]] = field(default_factory=set)
+    active_selector: tuple[str, str] | None = None
+
+    def write_event(self, event: object) -> bool:
+        if not isinstance(event, NodeRunStreamChunkEvent):
+            return False
+
+        selector = tuple(event.selector)
+        label = STREAM_LABEL_BY_SELECTOR.get(selector)
+        if label is None:
+            return False
+
+        if selector not in self.seen_selectors:
+            self.stream_output.write(f"{label}: ")
+            self.seen_selectors.add(selector)
+            self.active_selector = selector
+        elif self.active_selector is None:
+            self.active_selector = selector
+
+        if event.chunk:
+            self.stream_output.write(event.chunk)
+        if event.is_final:
+            self.stream_output.write("\n")
+            self.active_selector = None
+
+        self.stream_output.flush()
+        return bool(event.chunk) or event.is_final
+
+
+def build_graph(
+    *,
+    provider: str,
+    prepared_llm: SlimPreparedLLM,
+    workflow_id: str,
+    graph_runtime_state: GraphRuntimeState,
+) -> Graph:
+    workflow = build_workflow(provider=provider)
+    return workflow.materialize(
+        WorkflowRuntime(
+            workflow_id=workflow_id,
+            graph_runtime_state=graph_runtime_state,
+            prepared_llm=prepared_llm,
+        ),
+    )
+
+
+def build_workflow(*, provider: str) -> WorkflowSpec:
+    workflow = WorkflowBuilder()
+
+    start = workflow.root(
+        "start",
+        StartNodeData(
+            title="Start",
+            variables=[paragraph_input("content", required=True)],
+        ),
+    )
+
+    model = ModelConfig(
+        provider=provider,
+        name="gpt-5.4",
+        mode=LLMMode.CHAT,
+    )
+
+    translation_nodes = []
+    for node_id, language_name, _output_name in TARGET_LANGUAGES:
+        translation_nodes.append(
+            start.then(
+                node_id,
+                LLMNodeData(
+                    title=f"Translate to {language_name}",
+                    model=model,
+                    prompt_template=[
+                        system(
+                            "Translate the following content to ",
+                            language_name,
+                            ". Return only the translated text.",
+                        ),
+                        user(start.ref("content")),
+                    ],
+                ),
+            ),
+        )
+
+    output = translation_nodes[0].then(
+        "output",
+        EndNodeData(
+            title="Output",
+            outputs=[
+                node.ref("text").output(output_name)
+                for node, (_, _, output_name) in zip(
+                    translation_nodes,
+                    TARGET_LANGUAGES,
+                    strict=True,
+                )
+            ],
+        ),
+    )
+
+    for node in translation_nodes[1:]:
+        node.connect(output)
+
+    return workflow.build()
+
+
+def write_stream_chunk(
+    event: object,
+    *,
+    stream_writer: TranslationStreamWriter,
+) -> bool:
+    return stream_writer.write_event(event)
+
+
+def _execute_workflow(
+    content: str,
+    *,
+    stream_output: IO[str] | None = None,
+) -> tuple[dict[str, str], bool]:
+    openai_slim_support.load_default_env_file(EXAMPLE_DIR)
+    runtime, provider = openai_slim_support.build_runtime(example_dir=EXAMPLE_DIR)
+    workflow_id = "parallel-translation-workflow"
+    graph_runtime_state = GraphRuntimeState(
+        variable_pool=VariablePool(),
+        start_at=time.time(),
+    )
+    graph_runtime_state.variable_pool.add(("start", "content"), content)
+
+    prepared_llm = SlimPreparedLLM(
+        runtime=runtime,
+        provider=provider,
+        model_name="gpt-5.4",
+        credentials={
+            "openai_api_key": openai_slim_support.require_env(
+                "OPENAI_API_KEY",
+                example_dir=EXAMPLE_DIR,
+            ),
+        },
+        parameters={},
+    )
+
+    graph = build_graph(
+        provider=provider,
+        prepared_llm=prepared_llm,
+        workflow_id=workflow_id,
+        graph_runtime_state=graph_runtime_state,
+    )
+    engine = GraphEngine(
+        workflow_id=workflow_id,
+        graph=graph,
+        graph_runtime_state=graph_runtime_state,
+        command_channel=InMemoryChannel(),
+    )
+
+    streamed = False
+    stream_writer = (
+        TranslationStreamWriter(stream_output=stream_output)
+        if stream_output is not None
+        else None
+    )
+    for event in engine.run():
+        if stream_writer is not None and write_stream_chunk(
+            event,
+            stream_writer=stream_writer,
+        ):
+            streamed = True
+
+    outputs: dict[str, str] = {}
+    for _node_id, _language_name, output_name in TARGET_LANGUAGES:
+        value = graph_runtime_state.get_output(output_name)
+        if not isinstance(value, str):
+            msg = f"Workflow did not produce output {output_name!r}."
+            raise TypeError(msg)
+        outputs[output_name] = value
+
+    return outputs, streamed
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description="Run a parallel translation workflow built with WorkflowBuilder.",
+    )
+    parser.add_argument(
+        "content",
+        nargs="?",
+        default="Graph execution is a coordination problem.",
+        help="Input content to translate.",
+    )
+    parser.add_argument(
+        "--stream",
+        action=argparse.BooleanOptionalAction,
+        default=True,
+        help="Stream translations as they are produced.",
+    )
+    return parser.parse_args()
+
+
+def main() -> int:
+    args = parse_args()
+    stream_output = sys.stdout if args.stream else None
+    if args.stream:
+        sys.stdout.write("Streaming translations:\n")
+        sys.stdout.flush()
+    outputs, streamed = _execute_workflow(args.content, stream_output=stream_output)
+    if args.stream and streamed:
+        sys.stdout.write("\n")
+    sys.stdout.write("Structured outputs:\n")
+    for _node_id, language_name, output_name in TARGET_LANGUAGES:
+        sys.stdout.write(f"- {language_name}: {outputs[output_name]}\n")
+    sys.stdout.flush()
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/examples/openai_slim_support.py b/examples/openai_slim_support.py
new file mode 100644
index 0000000..69f7c8c
--- /dev/null
+++ b/examples/openai_slim_support.py
@@ -0,0 +1,227 @@
+"""Shared bootstrap,
environment, and Slim runtime helpers for examples.""" + +from __future__ import annotations + +import importlib +import importlib.util +import os +import sys +from collections.abc import Sequence +from pathlib import Path + +ALLOWED_ENV_VARS: dict[str, str] = { + "OPENAI_API_KEY": "", + "SLIM_PLUGIN_ID": "", + "SLIM_BINARY_PATH": "", + "SLIM_PROVIDER": "openai", + "SLIM_PLUGIN_FOLDER": "../../.slim/plugins", + "SLIM_PLUGIN_ROOT": "", +} +PATH_ENV_VARS = { + "SLIM_BINARY_PATH", + "SLIM_PLUGIN_FOLDER", + "SLIM_PLUGIN_ROOT", +} +BOOTSTRAP_ENV_VAR = "GRAPHON_EXAMPLE_BOOTSTRAPPED" +RUNTIME_MODULES = ("pydantic", "httpx", "yaml") +MIN_QUOTED_VALUE_LENGTH = 2 + + +def repo_root_for(example_file: Path) -> Path: + return example_file.resolve().parents[2] + + +def local_src_dir_for(example_file: Path) -> Path: + return repo_root_for(example_file) / "src" + + +def local_venv_python_for(example_file: Path) -> Path: + return repo_root_for(example_file) / ".venv" / "bin" / "python" + + +def prepare_example_imports( + example_file: Path, + *, + argv: Sequence[str] | None = None, +) -> None: + bootstrap_local_python(example_file, argv=argv) + ensure_local_src_on_path(example_file) + + +def bootstrap_local_python( + example_file: Path, + *, + argv: Sequence[str] | None = None, +) -> None: + if os.environ.get(BOOTSTRAP_ENV_VAR) == "1": + return + if all(importlib.util.find_spec(module) is not None for module in RUNTIME_MODULES): + return + + local_python = local_venv_python_for(example_file) + if not local_python.is_file(): + return + + env = dict(os.environ) + env[BOOTSTRAP_ENV_VAR] = "1" + os.execve( # noqa: S606 + str(local_python), + [ + str(local_python), + str(example_file.resolve()), + *(argv if argv is not None else sys.argv[1:]), + ], + env, + ) + + +def ensure_local_src_on_path(example_file: Path) -> None: + local_src_dir = local_src_dir_for(example_file) + if ( + importlib.util.find_spec("graphon") is None + and str(local_src_dir) not in sys.path + ): + 
sys.path.insert(0, str(local_src_dir)) + + +def load_default_env_file(example_dir: Path) -> None: + env_file = example_dir / ".env" + if env_file.is_file(): + load_env_file(env_file) + + +def load_env_file(path: Path) -> None: + env_dir = path.resolve().parent + for line_number, raw_line in enumerate( + path.read_text(encoding="utf-8").splitlines(), + start=1, + ): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + if line.startswith("export "): + line = line.removeprefix("export ").strip() + if "=" not in line: + msg = f"Invalid .env line {line_number} in {path}: {raw_line}" + raise ValueError(msg) + + key, value = line.split("=", 1) + key = key.strip() + if not key: + msg = f"Invalid .env key on line {line_number} in {path}" + raise ValueError(msg) + if key not in ALLOWED_ENV_VARS: + msg = f"Unsupported .env key {key!r} on line {line_number} in {path}" + raise ValueError(msg) + + os.environ.setdefault( + key, + normalize_env_value( + key, + strip_optional_quotes(value.strip()), + base_dir=env_dir, + ), + ) + + +def strip_optional_quotes(value: str) -> str: + if ( + len(value) >= MIN_QUOTED_VALUE_LENGTH + and value[0] == value[-1] + and value[0] in {'"', "'"} + ): + return value[1:-1] + return value + + +def normalize_env_value(name: str, value: str, *, base_dir: Path) -> str: + if name not in PATH_ENV_VARS or not value: + return value + + path_value = Path(value).expanduser() + if not path_value.is_absolute(): + path_value = (base_dir / path_value).resolve() + else: + path_value = path_value.resolve() + return str(path_value) + + +def env_value(name: str, *, example_dir: Path) -> str: + raw_value = os.environ.get(name) + if raw_value is not None: + return raw_value.strip() + return normalize_env_value( + name, + ALLOWED_ENV_VARS[name], + base_dir=example_dir, + ).strip() + + +def require_env(name: str, *, example_dir: Path) -> str: + value = env_value(name, example_dir=example_dir) + if value: + return value + msg = f"{name} is 
required." + raise ValueError(msg) + + +def optional_path(name: str, *, example_dir: Path) -> Path | None: + value = env_value(name, example_dir=example_dir) + return Path(value).expanduser() if value else None + + +def build_runtime(*, example_dir: Path) -> tuple[object, str]: + slim_module = importlib.import_module("graphon.model_runtime.slim") + slim_config = slim_module.SlimConfig + slim_local_settings = slim_module.SlimLocalSettings + slim_provider_binding = slim_module.SlimProviderBinding + slim_runtime = slim_module.SlimRuntime + + provider = env_value("SLIM_PROVIDER", example_dir=example_dir) + plugin_folder = Path( + env_value("SLIM_PLUGIN_FOLDER", example_dir=example_dir), + ).expanduser() + plugin_root = optional_path("SLIM_PLUGIN_ROOT", example_dir=example_dir) + + runtime = slim_runtime( + slim_config( + bindings=[ + slim_provider_binding( + plugin_id=require_env("SLIM_PLUGIN_ID", example_dir=example_dir), + provider=provider, + plugin_root=plugin_root, + ), + ], + local=slim_local_settings(folder=plugin_folder), + ), + ) + return runtime, provider + + +class PassthroughPromptMessageSerializer: + def serialize( + self, + *, + model_mode: object, + prompt_messages: Sequence[object], + ) -> object: + _ = model_mode + return list(prompt_messages) + + +class TextOnlyFileSaver: + def save_binary_string( + self, + data: bytes, + mime_type: str, + file_type: object, + extension_override: str | None = None, + ) -> object: + _ = data, mime_type, file_type, extension_override + msg = "This example only supports text responses." + raise RuntimeError(msg) + + def save_remote_url(self, url: str, file_type: object) -> object: + _ = url, file_type + msg = "This example only supports text responses." 
+ raise RuntimeError(msg) diff --git a/src/graphon/nodes/llm/entities.py b/src/graphon/nodes/llm/entities.py index f0fc6a2..473c171 100644 --- a/src/graphon/nodes/llm/entities.py +++ b/src/graphon/nodes/llm/entities.py @@ -23,7 +23,7 @@ class ModelConfig(BaseModel): class ContextConfig(BaseModel): - enabled: bool + enabled: bool = False variable_selector: list[str] | None = None @@ -72,7 +72,7 @@ class LLMNodeData(BaseNodeData): ) prompt_config: PromptConfig = Field(default_factory=PromptConfig) memory: MemoryConfig | None = None - context: ContextConfig + context: ContextConfig = Field(default_factory=ContextConfig) vision: VisionConfig = Field(default_factory=VisionConfig) structured_output: Mapping[str, Any] | None = None # We used 'structured_output_enabled' in the past, but it's not a good name. diff --git a/src/graphon/workflow_builder.py b/src/graphon/workflow_builder.py new file mode 100644 index 0000000..11316e3 --- /dev/null +++ b/src/graphon/workflow_builder.py @@ -0,0 +1,630 @@ +from __future__ import annotations + +import importlib +import inspect +import pkgutil +from collections.abc import Callable, Mapping, Sequence +from dataclasses import dataclass, field +from typing import Any, Final, cast, final + +import graphon.nodes +from graphon.entities.base_node_data import BaseNodeData +from graphon.entities.graph_config import NodeConfigDict +from graphon.entities.graph_init_params import GraphInitParams +from graphon.file.enums import FileTransferMethod, FileType +from graphon.file.models import File +from graphon.graph.graph import Graph +from graphon.model_runtime.entities.llm_entities import LLMMode +from graphon.model_runtime.entities.message_entities import ( + PromptMessage, + PromptMessageRole, +) +from graphon.nodes.base.entities import OutputVariableEntity, OutputVariableType +from graphon.nodes.base.node import Node +from graphon.nodes.llm import ( + LLMNodeChatModelMessage, + LLMNodeCompletionModelPromptTemplate, +) +from 
graphon.nodes.llm.file_saver import LLMFileSaver +from graphon.nodes.llm.runtime_protocols import ( + PreparedLLMProtocol, + PromptMessageSerializerProtocol, +) +from graphon.runtime.graph_runtime_state import GraphRuntimeState +from graphon.template_rendering import Jinja2TemplateRenderer +from graphon.variables.input_entities import VariableEntity, VariableEntityType + +_NODE_INIT_BASE_PARAMS: Final[frozenset[str]] = frozenset( + { + "self", + "node_id", + "config", + "graph_init_params", + "graph_runtime_state", + }, +) + +_BUILTIN_NODE_MODULES_LOADED = False + + +@dataclass(frozen=True, slots=True) +class WorkflowRuntime: + workflow_id: str + graph_runtime_state: GraphRuntimeState + run_context: Mapping[str, Any] = field(default_factory=dict) + call_depth: int = 0 + prepared_llm: PreparedLLMProtocol | None = None + llm_file_saver: LLMFileSaver | None = None + prompt_message_serializer: PromptMessageSerializerProtocol | None = None + jinja2_template_renderer: Jinja2TemplateRenderer | None = None + node_kwargs_factory: NodeKwargsFactory | None = None + + @classmethod + def from_graph_init_params( + cls, + graph_init_params: GraphInitParams, + *, + graph_runtime_state: GraphRuntimeState, + prepared_llm: PreparedLLMProtocol | None = None, + llm_file_saver: LLMFileSaver | None = None, + prompt_message_serializer: PromptMessageSerializerProtocol | None = None, + jinja2_template_renderer: Jinja2TemplateRenderer | None = None, + node_kwargs_factory: NodeKwargsFactory | None = None, + ) -> WorkflowRuntime: + return cls( + workflow_id=graph_init_params.workflow_id, + graph_runtime_state=graph_runtime_state, + run_context=graph_init_params.run_context, + call_depth=graph_init_params.call_depth, + prepared_llm=prepared_llm, + llm_file_saver=llm_file_saver, + prompt_message_serializer=prompt_message_serializer, + jinja2_template_renderer=jinja2_template_renderer, + node_kwargs_factory=node_kwargs_factory, + ) + + def create_graph_init_params( + self, + *, + graph_config: 
Mapping[str, Any], + ) -> GraphInitParams: + return GraphInitParams( + workflow_id=self.workflow_id, + graph_config=graph_config, + run_context=dict(self.run_context), + call_depth=self.call_depth, + ) + + +@dataclass(frozen=True, slots=True) +class NodeMaterializationContext[NodeDataT: BaseNodeData]: + node_id: str + data: NodeDataT + runtime: WorkflowRuntime + graph_init_params: GraphInitParams + graph_runtime_state: GraphRuntimeState + + +type NodeKwargsFactory = Callable[ + [NodeMaterializationContext[BaseNodeData], type[Node]], + Mapping[str, object], +] + + +@dataclass(frozen=True, slots=True) +class NodeOutputRef: + node_id: str + output_name: str + + @property + def selector(self) -> tuple[str, str]: + return (self.node_id, self.output_name) + + def as_template(self) -> str: + return "{{#" + ".".join(self.selector) + "#}}" + + def output( + self, + variable: str | None = None, + *, + value_type: OutputVariableType = OutputVariableType.ANY, + ) -> OutputBinding: + return OutputBinding.from_ref( + self, + variable=variable, + value_type=value_type, + ) + + def __str__(self) -> str: + return self.as_template() + + +class OutputBinding(OutputVariableEntity): + @classmethod + def from_ref( + cls, + ref: NodeOutputRef, + *, + variable: str | None = None, + value_type: OutputVariableType = OutputVariableType.ANY, + ) -> OutputBinding: + return cls( + variable=variable or ref.output_name, + value_type=value_type, + value_selector=ref.selector, + ) + + @property + def selector(self) -> tuple[str, ...]: + return tuple(self.value_selector) + + +@dataclass(frozen=True, slots=True) +class WorkflowNodeSpec[NodeDataT: BaseNodeData]: + node_id: str + data: NodeDataT + + def as_node_config(self) -> NodeConfigDict: + return {"id": self.node_id, "data": self.data} + + +@dataclass(frozen=True, slots=True) +class WorkflowEdgeSpec: + tail: str + head: str + source_handle: str = "source" + + def as_edge_config(self) -> dict[str, str]: + return { + "source": self.tail, + "target": 
self.head, + "sourceHandle": self.source_handle, + } + + +@dataclass(frozen=True, slots=True) +class WorkflowSpec: + root_node_id: str + nodes: tuple[WorkflowNodeSpec[BaseNodeData], ...] + edges: tuple[WorkflowEdgeSpec, ...] + + @property + def graph_config(self) -> dict[str, list[object]]: + return { + "nodes": [node.as_node_config() for node in self.nodes], + "edges": [edge.as_edge_config() for edge in self.edges], + } + + def materialize(self, runtime: WorkflowRuntime) -> Graph: + return _WorkflowMaterializer(runtime=runtime).materialize(self) + + +@dataclass(frozen=True, slots=True) +class NodeHandle: + builder: WorkflowBuilder + node_id: str + + def then( + self, + node_id: str, + data: BaseNodeData, + *, + source_handle: str = "source", + ) -> NodeHandle: + return self.builder.add_node( + node_id=node_id, + data=data, + from_node_id=self.node_id, + source_handle=source_handle, + ) + + def connect( + self, + target: NodeHandle, + *, + source_handle: str = "source", + ) -> NodeHandle: + return self.builder.connect( + tail=self, + head=target, + source_handle=source_handle, + ) + + def ref(self, output_name: str) -> NodeOutputRef: + return NodeOutputRef(node_id=self.node_id, output_name=output_name) + + +@final +class _PassthroughPromptMessageSerializer: + def serialize( + self, + *, + model_mode: LLMMode, + prompt_messages: Sequence[PromptMessage], + ) -> object: + _ = model_mode + return list(prompt_messages) + + +@final +class _TextOnlyFileSaver: + def save_binary_string( + self, + data: bytes, + mime_type: str, + file_type: FileType, + extension_override: str | None = None, + ) -> File: + _ = data, mime_type, file_type, extension_override + msg = "WorkflowBuilder default saver only supports text outputs." + raise RuntimeError(msg) + + def save_remote_url(self, url: str, file_type: FileType) -> File: + _ = url, file_type + msg = "WorkflowBuilder default saver only supports text outputs." 
+ raise RuntimeError(msg) + + +class WorkflowBuilder: + def __init__(self) -> None: + self._node_order: list[str] = [] + self._node_specs: dict[str, WorkflowNodeSpec[BaseNodeData]] = {} + self._edges: list[WorkflowEdgeSpec] = [] + self._handles: dict[str, NodeHandle] = {} + self._root_node_id: str | None = None + + def root(self, node_id: str, data: BaseNodeData) -> NodeHandle: + if self._root_node_id is not None: + msg = f"Root node has already been set to {self._root_node_id!r}." + raise ValueError(msg) + self._store_node(node_id=node_id, data=data) + self._root_node_id = node_id + return self._remember_handle(node_id) + + def add_node( + self, + *, + node_id: str, + data: BaseNodeData, + from_node_id: str, + source_handle: str = "source", + ) -> NodeHandle: + if from_node_id not in self._node_specs: + msg = f"Predecessor node {from_node_id!r} is not registered." + raise ValueError(msg) + self._store_node(node_id=node_id, data=data) + self._edges.append( + WorkflowEdgeSpec( + tail=from_node_id, + head=node_id, + source_handle=source_handle, + ), + ) + return self._remember_handle(node_id) + + def connect( + self, + *, + tail: NodeHandle, + head: NodeHandle, + source_handle: str = "source", + ) -> NodeHandle: + self._ensure_owned_handle(tail) + self._ensure_owned_handle(head) + self._edges.append( + WorkflowEdgeSpec( + tail=tail.node_id, + head=head.node_id, + source_handle=source_handle, + ), + ) + return head + + def handle(self, node_id: str) -> NodeHandle: + try: + return self._handles[node_id] + except KeyError as error: + msg = f"Unknown node id {node_id!r}." + raise KeyError(msg) from error + + def build(self) -> WorkflowSpec: + if self._root_node_id is None: + msg = "WorkflowBuilder requires a root node before build()." 
+ raise ValueError(msg) + return WorkflowSpec( + root_node_id=self._root_node_id, + nodes=tuple(self._node_specs[node_id] for node_id in self._node_order), + edges=tuple(self._edges), + ) + + def materialize(self, runtime: WorkflowRuntime) -> Graph: + return self.build().materialize(runtime) + + def _remember_handle(self, node_id: str) -> NodeHandle: + handle = NodeHandle(builder=self, node_id=node_id) + self._handles[node_id] = handle + return handle + + def _store_node(self, *, node_id: str, data: BaseNodeData) -> None: + if node_id in self._node_specs: + msg = f"Node id {node_id!r} is already registered." + raise ValueError(msg) + self._node_order.append(node_id) + self._node_specs[node_id] = WorkflowNodeSpec(node_id=node_id, data=data) + + def _ensure_owned_handle(self, handle: NodeHandle) -> None: + if handle.builder is not self: + msg = "NodeHandle belongs to a different WorkflowBuilder instance." + raise ValueError(msg) + + +@final +class _WorkflowNodeFactory: + def __init__( + self, + *, + runtime: WorkflowRuntime, + graph_init_params: GraphInitParams, + ) -> None: + self._runtime = runtime + self._graph_init_params = graph_init_params + self._llm_file_saver = runtime.llm_file_saver or _TextOnlyFileSaver() + self._prompt_message_serializer = ( + runtime.prompt_message_serializer or _PassthroughPromptMessageSerializer() + ) + + def create_node(self, node_config: NodeConfigDict) -> Node: + node_id = node_config["id"] + node_data = node_config["data"] + node_cls = _resolve_node_class(node_data) + typed_node_data = cast( + "BaseNodeData", + node_cls.validate_node_data(node_data), + ) + context = NodeMaterializationContext( + node_id=node_id, + data=typed_node_data, + runtime=self._runtime, + graph_init_params=self._graph_init_params, + graph_runtime_state=self._runtime.graph_runtime_state, + ) + return node_cls( + **self._base_node_kwargs(context), + **self._build_extra_node_kwargs(node_cls, context), + ) + + def _build_extra_node_kwargs( + self, + node_cls: 
type[Node], + context: NodeMaterializationContext[BaseNodeData], + ) -> dict[str, object]: + init_parameters = inspect.signature(node_cls.__init__).parameters + extra_kwargs: dict[str, object] = {} + + if ( + "model_instance" in init_parameters + and context.runtime.prepared_llm is not None + ): + extra_kwargs["model_instance"] = context.runtime.prepared_llm + if "llm_file_saver" in init_parameters: + extra_kwargs["llm_file_saver"] = self._llm_file_saver + if "prompt_message_serializer" in init_parameters: + extra_kwargs["prompt_message_serializer"] = self._prompt_message_serializer + if ( + "jinja2_template_renderer" in init_parameters + and context.runtime.jinja2_template_renderer is not None + ): + extra_kwargs["jinja2_template_renderer"] = ( + context.runtime.jinja2_template_renderer + ) + if ( + "template_renderer" in init_parameters + and context.runtime.jinja2_template_renderer is not None + ): + extra_kwargs["template_renderer"] = context.runtime.jinja2_template_renderer + + if context.runtime.node_kwargs_factory is not None: + extra_kwargs.update( + dict(context.runtime.node_kwargs_factory(context, node_cls)), + ) + + missing_kwargs = [ + name + for name, parameter in init_parameters.items() + if name not in _NODE_INIT_BASE_PARAMS + and name not in extra_kwargs + and parameter.kind + in { + inspect.Parameter.POSITIONAL_OR_KEYWORD, + inspect.Parameter.KEYWORD_ONLY, + } + and parameter.default is inspect.Parameter.empty + ] + if missing_kwargs: + missing_args = ", ".join(sorted(missing_kwargs)) + msg = ( + f"{node_cls.__name__} requires additional runtime arguments: " + f"{missing_args}. " + "Provide them through WorkflowRuntime or `node_kwargs_factory`." 
+ ) + raise ValueError(msg) + + return extra_kwargs + + @staticmethod + def _base_node_kwargs( + context: NodeMaterializationContext[BaseNodeData], + ) -> dict[str, object]: + return { + "node_id": context.node_id, + "config": {"id": context.node_id, "data": context.data}, + "graph_init_params": context.graph_init_params, + "graph_runtime_state": context.graph_runtime_state, + } + + +@final +class _WorkflowMaterializer: + def __init__(self, *, runtime: WorkflowRuntime) -> None: + self._runtime = runtime + + def materialize(self, workflow: WorkflowSpec) -> Graph: + graph_config = workflow.graph_config + graph_init_params = self._runtime.create_graph_init_params( + graph_config=graph_config, + ) + return Graph.init( + graph_config=graph_config, + node_factory=_WorkflowNodeFactory( + runtime=self._runtime, + graph_init_params=graph_init_params, + ), + root_node_id=workflow.root_node_id, + ) + + +def _resolve_node_class(node_data: BaseNodeData) -> type[Node]: + _bootstrap_builtin_node_registry() + version_mapping = Node.get_node_type_classes_mapping().get(node_data.type) + if version_mapping is None: + msg = ( + f"No node class registered for node type {node_data.type!r}. " + "Ensure the node module has been imported before materialization." + ) + raise ValueError(msg) + + node_cls = version_mapping.get(node_data.version) + if node_cls is None: + versions = ", ".join(sorted(key for key in version_mapping if key != "latest")) + msg = ( + f"No node class registered for node type {node_data.type!r} " + f"with version {node_data.version!r}. " + f"Available versions: {versions or ''}." 
+ ) + raise ValueError(msg) + + return node_cls + + +def _bootstrap_builtin_node_registry() -> None: + global _BUILTIN_NODE_MODULES_LOADED # noqa: PLW0603 + if _BUILTIN_NODE_MODULES_LOADED: + return + + for module_info in pkgutil.walk_packages( + graphon.nodes.__path__, + graphon.nodes.__name__ + ".", + ): + module_leaf = module_info.name.rsplit(".", maxsplit=1)[-1] + if module_leaf == "node" or module_leaf.endswith("_node"): + importlib.import_module(module_info.name) + + _BUILTIN_NODE_MODULES_LOADED = True + + +def template(*parts: str | NodeOutputRef) -> str: + return "".join( + part.as_template() if isinstance(part, NodeOutputRef) else part + for part in parts + ) + + +def chat_message( + role: PromptMessageRole, + *parts: str | NodeOutputRef, +) -> LLMNodeChatModelMessage: + return LLMNodeChatModelMessage(role=role, text=template(*parts)) + + +def system(*parts: str | NodeOutputRef) -> LLMNodeChatModelMessage: + return chat_message(PromptMessageRole.SYSTEM, *parts) + + +def user(*parts: str | NodeOutputRef) -> LLMNodeChatModelMessage: + return chat_message(PromptMessageRole.USER, *parts) + + +def assistant(*parts: str | NodeOutputRef) -> LLMNodeChatModelMessage: + return chat_message(PromptMessageRole.ASSISTANT, *parts) + + +def completion_prompt( + *parts: str | NodeOutputRef, +) -> LLMNodeCompletionModelPromptTemplate: + return LLMNodeCompletionModelPromptTemplate(text=template(*parts)) + + +def input_variable( + variable: str, + *, + variable_type: VariableEntityType, + label: str | None = None, + description: str = "", + required: bool = False, + hide: bool = False, + default: object | None = None, + max_length: int | None = None, + options: Sequence[str] = (), + allowed_file_types: Sequence[FileType] | None = (), + allowed_file_extensions: Sequence[str] | None = (), + allowed_file_upload_methods: Sequence[FileTransferMethod] | None = (), + json_schema: Mapping[str, Any] | None = None, +) -> VariableEntity: + return VariableEntity( + variable=variable, + 
label=label or variable.replace("_", " ").title(), + description=description, + type=variable_type, + required=required, + hide=hide, + default=default, + max_length=max_length, + options=list(options), + allowed_file_types=list(allowed_file_types or []), + allowed_file_extensions=list(allowed_file_extensions or []), + allowed_file_upload_methods=list(allowed_file_upload_methods or []), + json_schema=dict(json_schema) if json_schema is not None else None, + ) + + +def paragraph_input( + variable: str, + *, + label: str | None = None, + description: str = "", + required: bool = False, + hide: bool = False, + default: str | None = None, + max_length: int | None = None, +) -> VariableEntity: + return input_variable( + variable, + variable_type=VariableEntityType.PARAGRAPH, + label=label, + description=description, + required=required, + hide=hide, + default=default, + max_length=max_length, + ) + + +__all__ = [ + "NodeHandle", + "NodeMaterializationContext", + "NodeOutputRef", + "OutputBinding", + "WorkflowBuilder", + "WorkflowEdgeSpec", + "WorkflowNodeSpec", + "WorkflowRuntime", + "WorkflowSpec", + "assistant", + "chat_message", + "completion_prompt", + "input_variable", + "paragraph_input", + "system", + "template", + "user", +] diff --git a/tests/examples/test_graphon_openai_slim.py b/tests/examples/test_openai_slim_examples.py similarity index 76% rename from tests/examples/test_graphon_openai_slim.py rename to tests/examples/test_openai_slim_examples.py index 4eb925c..b6048bc 100644 --- a/tests/examples/test_graphon_openai_slim.py +++ b/tests/examples/test_openai_slim_examples.py @@ -6,11 +6,8 @@ import pytest -from examples.graphon_openai_slim.workflow import ( - ALLOWED_ENV_VARS, - load_env_file, - write_stream_chunk, -) +from examples.openai_slim_minimal.workflow import write_stream_chunk +from examples.openai_slim_support import ALLOWED_ENV_VARS, load_env_file from graphon.enums import BuiltinNodeTypes from graphon.graph_events.node import 
NodeRunStreamChunkEvent @@ -72,11 +69,18 @@ def test_load_env_file_rejects_unknown_key(tmp_path: Path) -> None: load_env_file(env_file) -def test_env_example_matches_allowed_env_vars() -> None: +@pytest.mark.parametrize( + "example_dir_name", + [ + "openai_slim_minimal", + "openai_slim_parallel_translation", + ], +) +def test_env_example_matches_allowed_env_vars(example_dir_name: str) -> None: env_example = ( Path(__file__).resolve().parents[2] / "examples" - / "graphon_openai_slim" + / example_dir_name / ".env.example" ) keys = { @@ -88,11 +92,30 @@ def test_env_example_matches_allowed_env_vars() -> None: assert keys == set(ALLOWED_ENV_VARS) -def test_env_example_leaves_slim_binary_path_empty() -> None: +@pytest.mark.parametrize( + ("example_dir_name", "expected_binary_path", "expected_plugin_folder"), + [ + ( + "openai_slim_minimal", + "~/.local/bin/dify-plugin-daemon-slim", + "~/.local/share/graphon/slim/plugins", + ), + ( + "openai_slim_parallel_translation", + "~/.local/bin/dify-plugin-daemon-slim", + "~/.local/share/graphon/slim/plugins", + ), + ], +) +def test_env_example_sets_recommended_unix_defaults( + example_dir_name: str, + expected_binary_path: str, + expected_plugin_folder: str, +) -> None: env_example = ( Path(__file__).resolve().parents[2] / "examples" - / "graphon_openai_slim" + / example_dir_name / ".env.example" ) values = { @@ -102,7 +125,8 @@ def test_env_example_leaves_slim_binary_path_empty() -> None: for key, value in [line.removeprefix("export ").split("=", 1)] } - assert not values["SLIM_BINARY_PATH"] + assert values["SLIM_BINARY_PATH"] == expected_binary_path + assert values["SLIM_PLUGIN_FOLDER"] == expected_plugin_folder def test_write_stream_chunk_writes_llm_text_chunks() -> None: diff --git a/tests/examples/test_parallel_translation_workflow.py b/tests/examples/test_parallel_translation_workflow.py new file mode 100644 index 0000000..8e63922 --- /dev/null +++ b/tests/examples/test_parallel_translation_workflow.py @@ -0,0 +1,71 @@ 
+from __future__ import annotations + +from io import StringIO + +from examples.openai_slim_parallel_translation.workflow import ( + TranslationStreamWriter, + write_stream_chunk, +) +from graphon.enums import BuiltinNodeTypes +from graphon.graph_events.node import NodeRunStreamChunkEvent + + +def test_write_stream_chunk_formats_translation_sections() -> None: + output = StringIO() + writer = TranslationStreamWriter(stream_output=output) + + zh_chunk = NodeRunStreamChunkEvent( + id="event-1", + node_id="translate_zh", + node_type=BuiltinNodeTypes.LLM, + selector=["translate_zh", "text"], + chunk="你好", + is_final=False, + ) + zh_final = NodeRunStreamChunkEvent( + id="event-2", + node_id="translate_zh", + node_type=BuiltinNodeTypes.LLM, + selector=["translate_zh", "text"], + chunk="", + is_final=True, + ) + en_chunk = NodeRunStreamChunkEvent( + id="event-3", + node_id="translate_en", + node_type=BuiltinNodeTypes.LLM, + selector=["translate_en", "text"], + chunk="hello", + is_final=False, + ) + en_final = NodeRunStreamChunkEvent( + id="event-4", + node_id="translate_en", + node_type=BuiltinNodeTypes.LLM, + selector=["translate_en", "text"], + chunk="", + is_final=True, + ) + + assert write_stream_chunk(zh_chunk, stream_writer=writer) is True + assert write_stream_chunk(zh_final, stream_writer=writer) is True + assert write_stream_chunk(en_chunk, stream_writer=writer) is True + assert write_stream_chunk(en_final, stream_writer=writer) is True + + assert output.getvalue() == "Chinese: 你好\nEnglish: hello\n" + + +def test_write_stream_chunk_ignores_non_translation_selectors() -> None: + output = StringIO() + writer = TranslationStreamWriter(stream_output=output) + event = NodeRunStreamChunkEvent( + id="event-5", + node_id="output", + node_type=BuiltinNodeTypes.END, + selector=["output", "answer"], + chunk="\n", + is_final=False, + ) + + assert write_stream_chunk(event, stream_writer=writer) is False + assert not output.getvalue() diff --git 
diff --git a/tests/test_workflow_builder.py b/tests/test_workflow_builder.py
new file mode 100644
index 0000000..b65b823
--- /dev/null
+++ b/tests/test_workflow_builder.py
@@ -0,0 +1,261 @@
+from __future__ import annotations
+
+import time
+from collections.abc import Mapping
+from typing import cast
+
+from graphon.entities.base_node_data import BaseNodeData
+from graphon.model_runtime.entities.llm_entities import LLMMode
+from graphon.nodes.base.entities import OutputVariableType, VariableSelector
+from graphon.nodes.base.node import Node
+from graphon.nodes.end.end_node import EndNode
+from graphon.nodes.end.entities import EndNodeData
+from graphon.nodes.llm import LLMNodeData, ModelConfig
+from graphon.nodes.llm.runtime_protocols import PreparedLLMProtocol
+from graphon.nodes.start.entities import StartNodeData
+from graphon.nodes.template_transform.entities import TemplateTransformNodeData
+from graphon.nodes.template_transform.template_transform_node import (
+    TemplateTransformNode,
+)
+from graphon.nodes.variable_aggregator.entities import VariableAggregatorNodeData
+from graphon.nodes.variable_aggregator.variable_aggregator_node import (
+    VariableAggregatorNode,
+)
+from graphon.runtime.graph_runtime_state import GraphRuntimeState
+from graphon.runtime.variable_pool import VariablePool
+from graphon.template_rendering import Jinja2TemplateRenderer
+from graphon.variables.input_entities import VariableEntityType
+from graphon.workflow_builder import (
+    NodeMaterializationContext,
+    NodeOutputRef,
+    WorkflowBuilder,
+    WorkflowRuntime,
+    completion_prompt,
+    input_variable,
+    paragraph_input,
+    system,
+    template,
+    user,
+)
+
+
+class _EchoTemplateRenderer(Jinja2TemplateRenderer):
+    def render_template(self, template: str, variables: Mapping[str, object]) -> str:
+        return template.replace("{{ content }}", str(variables["content"]))
+
+
+def test_llm_node_data_defaults_context_to_disabled() -> None:
+    node_data = LLMNodeData(
+        model=ModelConfig(
+            provider="mock",
+            name="mock-chat",
+            mode=LLMMode.CHAT,
+        ),
+        prompt_template=[system("Translate this text.")],
+    )
+
+    assert node_data.context.enabled is False
+    assert node_data.context.variable_selector is None
+
+
+def test_workflow_builder_builds_parallel_translation_workflow() -> None:
+    graph_runtime_state = GraphRuntimeState(
+        variable_pool=VariablePool(),
+        start_at=time.time(),
+    )
+    builder = WorkflowBuilder()
+
+    start = builder.root(
+        "start",
+        StartNodeData(
+            variables=[paragraph_input("content", required=True)],
+        ),
+    )
+
+    translation_model = ModelConfig(
+        provider="mock",
+        name="mock-chat",
+        mode=LLMMode.CHAT,
+    )
+
+    chinese = start.then(
+        "translate_zh",
+        LLMNodeData(
+            model=translation_model,
+            prompt_template=[
+                system("Translate the following text to Chinese."),
+                user(start.ref("content")),
+            ],
+        ),
+    )
+    english = start.then(
+        "translate_en",
+        LLMNodeData(
+            model=translation_model,
+            prompt_template=[
+                system("Translate the following text to English."),
+                user(start.ref("content")),
+            ],
+        ),
+    )
+    japanese = start.then(
+        "translate_ja",
+        LLMNodeData(
+            model=translation_model,
+            prompt_template=[
+                system("Translate the following text to Japanese."),
+                user(start.ref("content")),
+            ],
+        ),
+    )
+
+    output = chinese.then(
+        "output",
+        EndNodeData(
+            outputs=[
+                chinese.ref("text").output("chinese"),
+                english.ref("text").output("english"),
+                japanese.ref("text").output("japanese"),
+            ],
+        ),
+    )
+    english.connect(output)
+    japanese.connect(output)
+
+    workflow = builder.build()
+    graph = workflow.materialize(
+        WorkflowRuntime(
+            workflow_id="parallel-translation",
+            graph_runtime_state=graph_runtime_state,
+            prepared_llm=cast(PreparedLLMProtocol, object()),
+        ),
+    )
+
+    assert graph.root_node.id == "start"
+    assert isinstance(graph.nodes["output"], EndNode)
+    assert sorted((edge.tail, edge.head) for edge in graph.edges.values()) == [
+        ("start", "translate_en"),
+        ("start", "translate_ja"),
+        ("start", "translate_zh"),
+        ("translate_en", "output"),
+        ("translate_ja", "output"),
+        ("translate_zh", "output"),
+    ]
+
+    output_node = cast(EndNode, graph.nodes["output"])
+    assert [item.variable for item in output_node.node_data.outputs] == [
+        "chinese",
+        "english",
+        "japanese",
+    ]
+    assert [tuple(item.value_selector) for item in output_node.node_data.outputs] == [
+        ("translate_zh", "text"),
+        ("translate_en", "text"),
+        ("translate_ja", "text"),
+    ]
+
+
+def test_workflow_builder_helpers_produce_typed_authoring_values() -> None:
+    ref = NodeOutputRef(node_id="llm", output_name="text")
+    prompt = completion_prompt("Answer in one sentence: ", ref)
+    binding = ref.output("answer", value_type=OutputVariableType.STRING)
+    text = input_variable(
+        "question",
+        variable_type=VariableEntityType.TEXT_INPUT,
+        required=True,
+        max_length=512,
+    )
+
+    assert template("Result: ", ref) == "Result: {{#llm.text#}}"
+    assert prompt.text == "Answer in one sentence: {{#llm.text#}}"
+    assert binding.variable == "answer"
+    assert tuple(binding.value_selector) == ("llm", "text")
+    assert binding.value_type is OutputVariableType.STRING
+    assert text.variable == "question"
+    assert text.type is VariableEntityType.TEXT_INPUT
+    assert text.max_length == 512
+
+
+def test_workflow_builder_materializes_non_example_builtin_node() -> None:
+    graph_runtime_state = GraphRuntimeState(
+        variable_pool=VariablePool(),
+        start_at=time.time(),
+    )
+    builder = WorkflowBuilder()
+
+    start = builder.root(
+        "start",
+        StartNodeData(
+            variables=[paragraph_input("content", required=True)],
+        ),
+    )
+    aggregate = start.then(
+        "aggregate",
+        VariableAggregatorNodeData(
+            output_type="string",
+            variables=[["start", "content"]],
+        ),
+    )
+    aggregate.then(
+        "output",
+        EndNodeData(outputs=[]),
+    )
+
+    graph = builder.build().materialize(
+        WorkflowRuntime(
+            workflow_id="aggregate",
+            graph_runtime_state=graph_runtime_state,
+        ),
+    )
+
+    assert isinstance(graph.nodes["aggregate"], VariableAggregatorNode)
+
+
+def test_workflow_builder_supports_runtime_kwargs_for_service_nodes() -> None:
+    graph_runtime_state = GraphRuntimeState(
+        variable_pool=VariablePool(),
+        start_at=time.time(),
+    )
+    builder = WorkflowBuilder()
+
+    start = builder.root(
+        "start",
+        StartNodeData(
+            variables=[paragraph_input("content", required=True)],
+        ),
+    )
+    transform = start.then(
+        "transform",
+        TemplateTransformNodeData(
+            variables=[
+                VariableSelector(
+                    variable="content",
+                    value_selector=("start", "content"),
+                ),
+            ],
+            template="{{ content }}",
+        ),
+    )
+    transform.then(
+        "output",
+        EndNodeData(outputs=[]),
+    )
+
+    def node_kwargs_factory(
+        context: NodeMaterializationContext[BaseNodeData],
+        node_cls: type[Node],
+    ) -> Mapping[str, object]:
+        _ = context
+        if node_cls is TemplateTransformNode:
+            return {"jinja2_template_renderer": _EchoTemplateRenderer()}
+        return {}
+
+    graph = builder.build().materialize(
+        WorkflowRuntime(
+            workflow_id="transform",
+            graph_runtime_state=graph_runtime_state,
+            node_kwargs_factory=node_kwargs_factory,
+        ),
+    )
+
+    assert isinstance(graph.nodes["transform"], TemplateTransformNode)
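
A reviewer's note on the `{{#node.output#}}` placeholder convention that `test_workflow_builder_helpers_produce_typed_authoring_values` asserts: the behavior can be sketched self-containedly. The `NodeOutputRef.placeholder` method and the `template` body below are hypothetical stand-ins written for this note, not the real `graphon.workflow_builder` internals; only the asserted output strings (e.g. `"Result: {{#llm.text#}}"`) come from the tests above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NodeOutputRef:
    """Minimal stand-in for graphon's NodeOutputRef (illustrative only)."""

    node_id: str
    output_name: str

    def placeholder(self) -> str:
        # Render the "{{#node.output#}}" selector syntax the tests assert.
        return f"{{{{#{self.node_id}.{self.output_name}#}}}}"


def template(*parts: "str | NodeOutputRef") -> str:
    """Join literal text and output refs into one template string."""
    return "".join(
        part.placeholder() if isinstance(part, NodeOutputRef) else part
        for part in parts
    )


ref = NodeOutputRef(node_id="llm", output_name="text")
print(template("Result: ", ref))  # → Result: {{#llm.text#}}
```

Under this reading, `completion_prompt("Answer in one sentence: ", ref)` is the same interpolation wrapped in a prompt object, which is why its `.text` carries the identical `{{#llm.text#}}` placeholder.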