Merged
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -133,8 +133,11 @@ Check `output/stories.json` for the full generated content.

## SyGra Studio

**SyGra Studio** is a visual workflow builder that replaces manual YAML editing with an interactive drag-and-drop interface:
**SyGra Studio** is a visual workflow builder that replaces manual YAML editing with an interactive drag-and-drop interface. It also lets you execute a task, monitor it live during execution, and view the results along with metadata such as latency and token usage.

![SyGraStudio](https://raw.githubusercontent.com/ServiceNow/SyGra/refs/heads/main/docs/resources/videos/studio_create_new_flow.gif)

**Studio Features:**
- **Visual Graph Builder** — Drag-and-drop nodes, connect them visually, configure with forms
- **Real-time Execution** — Watch your workflow run with live node status and streaming logs
- **Rich Analytics** — Track usage, tokens, latency, and success rates across runs
108 changes: 108 additions & 0 deletions docs/concepts/nodes/lambda_node.md
@@ -29,6 +29,9 @@ lambda_function_node:
- **`node_state`**:
Optional. Node-specific state key.

- **`node_name`**:
Optional. Display name for the node in the Studio UI.
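
Putting the optional fields together, a node entry might look like this (the module path and names here are illustrative, not from a real task):

```yaml
lambda_function_node:
  node_type: lambda
  lambda: tasks.examples.my_task.task_executor.MyLambda  # illustrative path
  node_state: my_state_key   # optional
  node_name: My Lambda Node  # optional, displayed in Studio
```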

### Example Lambda Implementation

You can implement a lambda either as a class or a function:
@@ -38,6 +41,7 @@ You can implement a lambda either as a class or a function:
from sygra.core.graph.functions.lambda_function import LambdaFunction
from sygra.core.graph.sygra_state import SygraState


# Lambda function with a sync apply(); for async flows, subclass AsyncLambdaFunction instead
class TestLambda(LambdaFunction):
    @staticmethod
    def apply(lambda_node_dict: dict, state: SygraState):
        state["return_key1"] = "hello world"
@@ -56,7 +60,111 @@ def lambda_function(lambda_node_dict: dict, state: SygraState):
- Lambda nodes give you full control over data transformation, allowing you to bridge, preprocess, or postprocess state as needed.
- All keys you want accessible in the next node should be listed in `output_keys`.
- Use lambda nodes for any custom task, especially when built-in nodes do not cover your use case.
- If your lambda needs async code, subclass `AsyncLambdaFunction` instead of `LambdaFunction`. Its `apply` method is declared `async`, so you can call other coroutines with the `await` keyword.

----
### Example workflow with sync and async lambda functions

YAML configuration
```yaml
data_config:
  source:
    type: hf
    repo_id: openai/gsm8k
    config_name: main
    split: train
graph_config:
  nodes:
    lambda_1:
      node_type: lambda
      lambda: tasks.examples.lambda_test.task_executor.Lambda1Function
      node_name: Sync Node
    lambda_2:
      node_type: lambda
      lambda: tasks.examples.lambda_test.task_executor.Lambda2Function
      node_name: Async Node
  edges:
    - from: START
      to: lambda_1
    - from: lambda_1
      to: lambda_2
    - from: lambda_2
      to: END
```

Task Executor Code:
```python
"""
Task executor for lambda test workflow having sync and async implementation.
"""
import asyncio
import time
from sygra.core.graph.functions.lambda_function import LambdaFunction, AsyncLambdaFunction
from sygra.core.graph.sygra_state import SygraState
from sygra.logger.logger_config import logger

async def count_async():
print("One")
logger.info("One...")
await asyncio.sleep(1)
print("Two")
logger.info("Two...")
await asyncio.sleep(1)

def count_sync(count:int):
print("One")
logger.info("One...")
time.sleep(1)
print("Two")
logger.info("Two...")
time.sleep(1)
logger.info("Count..." + str(count))

async def wrapper_count_sync(count:int):
return count_sync(count)

# sync lambda function
class Lambda1Function(LambdaFunction):
"""Execute custom logic on workflow state."""

@staticmethod
def apply(lambda_node_dict: dict, state: SygraState) -> SygraState:
"""Implement this method to apply lambda function.

Args:
lambda_node_dict: configuration dictionary
state: current state of the graph

Returns:
SygraState: the updated state object
"""
logger.info("sync function calling.......class1...")

count_sync(2)

logger.info("task done")
return state

#async lambda function
class Lambda2Function(AsyncLambdaFunction):
"""Execute custom logic on workflow state."""

@staticmethod
async def apply(lambda_node_dict: dict, state: SygraState) -> SygraState:
"""Implement this method to apply lambda function.

Args:
lambda_node_dict: configuration dictionary
state: current state of the graph

Returns:
SygraState: the updated state object
"""
logger.info("async function calling.......class2...")
await count_async()
return state

```
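
Outside a full SyGra run, the two classes above can be sanity-checked by calling their `apply` methods directly. This condensed sketch stubs the SyGra base classes and state with plain Python so it runs standalone (the base classes here are empty stand-ins, not the real SyGra types):

```python
import asyncio

# Stand-ins for the SyGra base classes so the sketch runs without SyGra installed
class LambdaFunction:
    pass

class AsyncLambdaFunction:
    pass

class Lambda1Function(LambdaFunction):
    """Sync lambda: updates state in place and returns it."""
    @staticmethod
    def apply(lambda_node_dict: dict, state: dict) -> dict:
        state["sync_done"] = True
        return state

class Lambda2Function(AsyncLambdaFunction):
    """Async lambda: may await other coroutines before returning state."""
    @staticmethod
    async def apply(lambda_node_dict: dict, state: dict) -> dict:
        await asyncio.sleep(0)  # stand-in for real async work
        state["async_done"] = True
        return state

state: dict = {}
state = Lambda1Function.apply({}, state)               # plain call for the sync lambda
state = asyncio.run(Lambda2Function.apply({}, state))  # the async lambda must be awaited
print(state)  # {'sync_done': True, 'async_done': True}
```

In a real run the graph awaits the async `apply` for you; `asyncio.run` here only stands in for that driver.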
---

**Tip:** Keep your lambda logic modular and reusable across tasks for maximum flexibility.
1 change: 1 addition & 0 deletions docs/getting_started/create_task_ui.md
@@ -3,6 +3,7 @@
**Visual workflow builder and execution platform for SyGra synthetic data pipelines**

---
![SyGraStudio](https://raw.githubusercontent.com/ServiceNow/SyGra/refs/heads/main/docs/resources/videos/studio_create_new_flow.gif)

## Why This Exists

3 changes: 2 additions & 1 deletion sygra/core/graph/backend_factory.py
@@ -11,12 +11,13 @@ class BackendFactory(ABC):
"""

@abstractmethod
def create_lambda_runnable(self, exec_wrapper):
def create_lambda_runnable(self, exec_wrapper, async_func=True):
"""
Abstract method to create a Lambda runnable.

Args:
exec_wrapper: Async or sync function to execute
async_func: True if the function is async

Returns:
Any: backend specific runnable object like Runnable for backend=Langgraph
20 changes: 20 additions & 0 deletions sygra/core/graph/functions/lambda_function.py
@@ -21,3 +21,23 @@ def apply(lambda_node_dict: dict, state: SygraState):
SygraState: the updated state object
"""
pass


class AsyncLambdaFunction(ABC):
    """
    Abstract base class for async lambda functions.
    Implement the async apply() method, which the graph node awaits at execution time.
    """

    @staticmethod
    @abstractmethod
    async def apply(lambda_node_dict: dict, state: SygraState):
        """
        Implement this method to apply the lambda function.

        Args:
            lambda_node_dict: configuration dictionary
            state: current state of the graph

        Returns:
            SygraState: the updated state object
        """
        pass
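
The runtime can tell the two contracts apart with `asyncio.iscoroutinefunction`, since an `async def apply` is a coroutine function even when wrapped in `@staticmethod`. A standalone sketch of that detection (base classes stubbed with plain `dict` state so it runs without SyGra):

```python
import asyncio
from abc import ABC, abstractmethod

# Minimal stand-ins for the two SyGra base classes
class LambdaFunction(ABC):
    @staticmethod
    @abstractmethod
    def apply(lambda_node_dict: dict, state: dict): ...

class AsyncLambdaFunction(ABC):
    @staticmethod
    @abstractmethod
    async def apply(lambda_node_dict: dict, state: dict): ...

class SyncImpl(LambdaFunction):
    @staticmethod
    def apply(lambda_node_dict: dict, state: dict):
        return state

class AsyncImpl(AsyncLambdaFunction):
    @staticmethod
    async def apply(lambda_node_dict: dict, state: dict):
        return state

print(asyncio.iscoroutinefunction(SyncImpl.apply))   # False
print(asyncio.iscoroutinefunction(AsyncImpl.apply))  # True
```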
10 changes: 7 additions & 3 deletions sygra/core/graph/langgraph/langgraph_factory.py
@@ -22,17 +22,21 @@ class LangGraphFactory(BackendFactory):
A factory class to convert Nodes into Runnable objects which LangGraph framework can execute.
"""

def create_lambda_runnable(self, exec_wrapper):
def create_lambda_runnable(self, exec_wrapper, async_func=True):
"""
Create a Lambda runnable for the LangGraph backend.

Args:
exec_wrapper: Async function to execute
exec_wrapper: Async/sync function to execute
async_func: True if the function is async

Returns:
Any: backend specific runnable object like Runnable for backend=Langgraph
"""
return RunnableLambda(lambda x: x, afunc=exec_wrapper)
if async_func:
return RunnableLambda(lambda x: x, afunc=exec_wrapper)
else:
return RunnableLambda(func=exec_wrapper)

def create_llm_runnable(self, exec_wrapper):
"""
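The sync/async dispatch can be sketched without LangChain installed. `RunnableLambda` below is a minimal stand-in for the real class, keeping only the two constructor arguments the factory uses, and the wrapper names are illustrative:

```python
import asyncio
from typing import Any, Callable, Optional

class RunnableLambda:
    """Minimal stand-in for langchain_core's RunnableLambda."""
    def __init__(self, func: Callable, afunc: Optional[Callable] = None):
        self.func = func
        self.afunc = afunc

    def invoke(self, x: Any) -> Any:
        # Prefer the async path when one was provided, as an async flow would
        if self.afunc is not None:
            return asyncio.run(self.afunc(x))
        return self.func(x)

def create_lambda_runnable(exec_wrapper, async_func=True):
    if async_func:
        return RunnableLambda(lambda x: x, afunc=exec_wrapper)
    return RunnableLambda(func=exec_wrapper)

async def async_wrapper(state: dict) -> dict:
    return {**state, "ran": "async"}

def sync_wrapper(state: dict) -> dict:
    return {**state, "ran": "sync"}

print(create_lambda_runnable(async_wrapper).invoke({}))                   # {'ran': 'async'}
print(create_lambda_runnable(sync_wrapper, async_func=False).invoke({}))  # {'ran': 'sync'}
```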
40 changes: 38 additions & 2 deletions sygra/core/graph/nodes/lambda_node.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import time
from inspect import isclass
from typing import Any
@@ -23,8 +24,35 @@ def __init__(self, node_name: str, config: dict):
self.func_to_execute = utils.get_func_from_str(self.node_config["lambda"])
if isclass(self.func_to_execute):
self.func_to_execute = self.func_to_execute.apply
# deduce if the function type is sync or async
if asyncio.iscoroutinefunction(self.func_to_execute):
self.func_type = "async"
else:
self.func_type = "sync"

async def _exec_wrapper(self, state: dict[str, Any]) -> dict[str, Any]:
async def _async_exec_wrapper(self, state: dict[str, Any]) -> dict[str, Any]:
"""
Wrapper to track lambda node execution.

Args:
state: State of the node.

Returns:
Updated state
"""
start_time = time.time()
success = True

try:
result: dict[str, Any] = await self.func_to_execute(self.node_config, state)
return result
except Exception:
success = False
raise
finally:
self._record_execution_metadata(start_time, success)

def _sync_exec_wrapper(self, state: dict[str, Any]) -> dict[str, Any]:
"""
Wrapper to track lambda node execution.

@@ -53,7 +81,15 @@ def to_backend(self) -> Any:
Returns:
Any: platform specific runnable object like Runnable in LangGraph.
"""
return utils.backend_factory.create_lambda_runnable(self._exec_wrapper)
if self.func_type == "sync":
return utils.backend_factory.create_lambda_runnable(
self._sync_exec_wrapper, async_func=False
)
elif self.func_type == "async":
# Default to the async path to preserve the old behavior (async_func defaults to True)
return utils.backend_factory.create_lambda_runnable(self._async_exec_wrapper)
else:
raise ValueError(f"Invalid function type: {self.func_type}")

def validate_node(self):
"""