diff --git a/README.md b/README.md index 315b4e76..aeabc762 100644 --- a/README.md +++ b/README.md @@ -133,8 +133,11 @@ Check `output/stories.json` for the full generated content. ## SyGra Studio -**SyGra Studio** is a visual workflow builder that replaces manual YAML editing with an interactive drag-and-drop interface: +**SyGra Studio** is a visual workflow builder that replaces manual YAML editing with an interactive drag-and-drop interface. It also allows you to execute a task, monitor it during execution, and view the result along with metadata such as latency, token usage, etc. +![SyGraStudio](https://raw.githubusercontent.com/ServiceNow/SyGra/refs/heads/main/docs/resources/videos/studio_create_new_flow.gif) + +**Studio Features:** - **Visual Graph Builder** — Drag-and-drop nodes, connect them visually, configure with forms - **Real-time Execution** — Watch your workflow run with live node status and streaming logs - **Rich Analytics** — Track usage, tokens, latency, and success rates across runs diff --git a/docs/concepts/nodes/lambda_node.md b/docs/concepts/nodes/lambda_node.md index 59daf553..3cfc24f0 100644 --- a/docs/concepts/nodes/lambda_node.md +++ b/docs/concepts/nodes/lambda_node.md @@ -29,6 +29,9 @@ lambda_function_node: - **`node_state`**: Optional. Node-specific state key. +- **`node_name`**: + Optional. Node name used in the studio UI. 
+ ### Example Lambda Implementation You can implement a lambda either as a class or a function: @@ -38,6 +41,7 @@ You can implement a lambda either as a class or a function: from sygra.core.graph.functions.lambda_function import LambdaFunction from sygra.core.graph.sygra_state import SygraState +# Lambda function with a sync apply(); to use an async flow, use AsyncLambdaFunction instead class TestLambda(LambdaFunction): def apply(lambda_node_dict: dict, state: SygraState): state["return_key1"] = "hello world" @@ -56,7 +60,111 @@ def lambda_function(lambda_node_dict: dict, state: SygraState): - Lambda nodes give you full control over data transformation, allowing you to bridge, preprocess, or postprocess state as needed. - All keys you want accessible in the next node should be listed in `output_keys`. - Use lambda nodes for any custom task, especially when built-in nodes do not cover your use case. +- If your lambda function uses async programming, use `AsyncLambdaFunction` instead of `LambdaFunction`. In this case, the `apply` method is asynchronous, and you can call async functions with the `await` keyword. + +---- +### Example workflow with sync and async lambda functions: + +YAML configuration +```yaml +data_config: + source: + type: hf + repo_id: openai/gsm8k + config_name: main + split: train +graph_config: + nodes: + lambda_1: + node_type: lambda + lambda: tasks.examples.lambda_test.task_executor.Lambda1Function + node_name: Sync Node + lambda_2: + node_type: lambda + lambda: tasks.examples.lambda_test.task_executor.Lambda2Function + node_name: Async Node + edges: + - from: START + to: lambda_1 + - from: lambda_1 + to: lambda_2 + - from: lambda_2 + to: END +``` +Task Executor Code: +```python +""" +Task executor for the lambda test workflow, with sync and async implementations. 
+""" +import asyncio +import time +from sygra.core.graph.functions.lambda_function import LambdaFunction, AsyncLambdaFunction +from sygra.core.graph.sygra_state import SygraState +from sygra.logger.logger_config import logger + +async def count_async(): + print("One") + logger.info("One...") + await asyncio.sleep(1) + print("Two") + logger.info("Two...") + await asyncio.sleep(1) + +def count_sync(count:int): + print("One") + logger.info("One...") + time.sleep(1) + print("Two") + logger.info("Two...") + time.sleep(1) + logger.info("Count..." + str(count)) + +async def wrapper_count_sync(count:int): + return count_sync(count) + +# sync lambda function +class Lambda1Function(LambdaFunction): + """Execute custom logic on workflow state.""" + + @staticmethod + def apply(lambda_node_dict: dict, state: SygraState) -> SygraState: + """Implement this method to apply lambda function. + + Args: + lambda_node_dict: configuration dictionary + state: current state of the graph + + Returns: + SygraState: the updated state object + """ + logger.info("sync function calling.......class1...") + + count_sync(2) + + logger.info("task done") + return state + +#async lambda function +class Lambda2Function(AsyncLambdaFunction): + """Execute custom logic on workflow state.""" + + @staticmethod + async def apply(lambda_node_dict: dict, state: SygraState) -> SygraState: + """Implement this method to apply lambda function. + + Args: + lambda_node_dict: configuration dictionary + state: current state of the graph + + Returns: + SygraState: the updated state object + """ + logger.info("async function calling.......class2...") + await count_async() + return state + +``` --- **Tip:** Keep your lambda logic modular and reusable across tasks for maximum flexibility. 
diff --git a/docs/getting_started/create_task_ui.md b/docs/getting_started/create_task_ui.md index 15e21eb9..ebbfddce 100644 --- a/docs/getting_started/create_task_ui.md +++ b/docs/getting_started/create_task_ui.md @@ -3,6 +3,7 @@ **Visual workflow builder and execution platform for SyGra synthetic data pipelines** --- +![SyGraStudio](https://raw.githubusercontent.com/ServiceNow/SyGra/refs/heads/main/docs/resources/videos/studio_create_new_flow.gif) ## Why This Exists diff --git a/docs/resources/videos/studio_create_new_flow.gif b/docs/resources/videos/studio_create_new_flow.gif new file mode 100644 index 00000000..56caf4fd Binary files /dev/null and b/docs/resources/videos/studio_create_new_flow.gif differ diff --git a/sygra/core/graph/backend_factory.py b/sygra/core/graph/backend_factory.py index da9d3f19..cb9d56d3 100644 --- a/sygra/core/graph/backend_factory.py +++ b/sygra/core/graph/backend_factory.py @@ -11,12 +11,13 @@ class BackendFactory(ABC): """ @abstractmethod - def create_lambda_runnable(self, exec_wrapper): + def create_lambda_runnable(self, exec_wrapper, async_func=True): """ Abstract method to create a Lambda runnable. Args: exec_wrapper: Async function to execute + async_func: True if the function is async Returns: Any: backend specific runnable object like Runnable for backend=Langgraph diff --git a/sygra/core/graph/functions/lambda_function.py b/sygra/core/graph/functions/lambda_function.py index cfa884ad..dea359ec 100644 --- a/sygra/core/graph/functions/lambda_function.py +++ b/sygra/core/graph/functions/lambda_function.py @@ -21,3 +21,23 @@ def apply(lambda_node_dict: dict, state: SygraState): SygraState: the updated state object """ pass + + +class AsyncLambdaFunction(ABC): + """ + This class represents an async lambda function. + Implement the async apply() method, which the graph node will call. 
+ """ + + @staticmethod + @abstractmethod + async def apply(lambda_node_dict: dict, state: SygraState): + """ + Implement this method to apply lambda function + Args: + lambda_node_dict: configuration dictionary + state: current state of the graph + Returns: + SygraState: the updated state object + """ + pass diff --git a/sygra/core/graph/langgraph/langgraph_factory.py b/sygra/core/graph/langgraph/langgraph_factory.py index 9dd70e51..ba50dbc0 100644 --- a/sygra/core/graph/langgraph/langgraph_factory.py +++ b/sygra/core/graph/langgraph/langgraph_factory.py @@ -22,17 +22,21 @@ class LangGraphFactory(BackendFactory): A factory class to convert Nodes into Runnable objects which LangGraph framework can execute. """ - def create_lambda_runnable(self, exec_wrapper): + def create_lambda_runnable(self, exec_wrapper, async_func=True): """ Abstract method to create a Lambda runnable. Args: - exec_wrapper: Async function to execute + exec_wrapper: Async/sync function to execute + async_func: True if the function is async Returns: Any: backend specific runnable object like Runnable for backend=Langgraph """ - return RunnableLambda(lambda x: x, afunc=exec_wrapper) + if async_func: + return RunnableLambda(lambda x: x, afunc=exec_wrapper) + else: + return RunnableLambda(func=exec_wrapper) def create_llm_runnable(self, exec_wrapper): """ diff --git a/sygra/core/graph/nodes/lambda_node.py b/sygra/core/graph/nodes/lambda_node.py index 0709c564..67152a54 100644 --- a/sygra/core/graph/nodes/lambda_node.py +++ b/sygra/core/graph/nodes/lambda_node.py @@ -1,3 +1,4 @@ +import asyncio import time from inspect import isclass from typing import Any @@ -23,8 +24,35 @@ def __init__(self, node_name: str, config: dict): self.func_to_execute = utils.get_func_from_str(self.node_config["lambda"]) if isclass(self.func_to_execute): self.func_to_execute = self.func_to_execute.apply + # deduce if the function type is sync or async + if asyncio.iscoroutinefunction(self.func_to_execute): + self.func_type 
= "async" + else: + self.func_type = "sync" - async def _exec_wrapper(self, state: dict[str, Any]) -> dict[str, Any]: + async def _async_exec_wrapper(self, state: dict[str, Any]) -> dict[str, Any]: + """ + Wrapper to track lambda node execution. + + Args: + state: State of the node. + + Returns: + Updated state + """ + start_time = time.time() + success = True + + try: + result: dict[str, Any] = await self.func_to_execute(self.node_config, state) + return result + except Exception: + success = False + raise + finally: + self._record_execution_metadata(start_time, success) + + def _sync_exec_wrapper(self, state: dict[str, Any]) -> dict[str, Any]: """ Wrapper to track lambda node execution. @@ -53,7 +81,15 @@ def to_backend(self) -> Any: Returns: Any: platform specific runnable object like Runnable in LangGraph. """ - return utils.backend_factory.create_lambda_runnable(self._exec_wrapper) + if self.func_type == "sync": + return utils.backend_factory.create_lambda_runnable( + self._sync_exec_wrapper, async_func=False + ) + elif self.func_type == "async": + # default to async function as old behavior(default async_func is True) + return utils.backend_factory.create_lambda_runnable(self._async_exec_wrapper) + else: + raise Exception("Invalid function type") def validate_node(self): """