-
Notifications
You must be signed in to change notification settings - Fork 3.2k
How do I call the save_csv_artifact function and give agents access to the artifact for code execution? #633
Description
I am trying to build a multi-agent system to perform data analysis tasks. I am using 4 agents which will be run sequentially. I am using a planner agent which takes the natural language query from a business user and converts it into a JSON plan to guide Python code generation. The code generation agent takes the JSON plan and generates Python code using Pandas for data analysis. The code executor agent executes the Python code using built_in_code_execution tool and returns the result. The interpreter agent understands the result and presents results in non technical way to the business user so they can understand their data better.
I looked at the examples in the documentation and I am confused about how exactly to use ToolContext, how to call the save_csv_artifact function in the code and how to give agents access to the csv artifact to code generation and execution happens without any errors.
I am new to using google adk and If you think there are mistakes in my code or I think about restructuring the system, do give me suggestions and improvements
`APP_NAME = "data_analysis_app"
USER_ID = str(uuid.uuid4())
SESSION_ID = str(uuid.uuid4())
GEMINI_2_FLASH = "gemini-2.0-flash"
OPENAI_GPT_4O = LiteLlm(model="openai/gpt-4o")
df = pd.read_csv("./data/supply_chain_data.csv")
print("Data loaded")
def save_csv_artifact(context: ToolContext):
"""Saves CSV data as an artifact."""
with open('./data/supply_chain_data.csv', 'rb') as f:
csv_bytes = f.read()
print("CSV read and converted to bytes")
csv_part = types.Part.from_data(
data=csv_bytes,
mime_type="text/csv"
)
filename = "user:supply_chain_data.csv" # 'user:' prefix scopes it to the user
try:
version = context.save_artifact(filename=filename, artifact=csv_part)
print(f"Successfully saved artifact '{filename}' as version {version}.")
except ValueError as e:
print(f"Error saving artifact: {e}. Is ArtifactService configured?")
except Exception as e:
print(f"An unexpected error occurred during artifact save: {e}")
# Initialize the session service
session_service = InMemorySessionService()
artifact_service = InMemoryArtifactService()
# Create a session with the artifact filename in the state
session = session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID,
state={
'columns': df.columns.tolist(),
'data_types': df.dtypes.astype(str).tolist()
}
)
print(f"Initial session state: {session.state}")
agent = LlmAgent(
name="DataAgent",
model="gemini-2.0-flash",
instruction="Use the CSV data for analysis.",
before_tool_callback=save_csv_artifact
)
# Agent 1: Planner
planner_agent = LlmAgent(
name="PlannerAgent",
model=OPENAI_GPT_4O,
instruction="""
You are a data analysis planner. The user has asked a question about a pandas DataFrame.
Here are the column names and the data types of the DataFrame:
{columns}
{data_types}
Based on the user's question and the DataFrame structure, generate a structured JSON plan to guide Python code generation.
The JSON must clearly specify the analysis task, relevant columns, and the aggregation or filtering logic if applicable.
Only output:
1. A list of column names (as a Python list).
2. A single valid JSON object (no markdown or extra commentary).
""",
output_key="analysis_plan"
)
# Agent 2: Code Generator
code_gen_agent = LlmAgent(
name="CodeGenAgent",
model=OPENAI_GPT_4O,
instruction="""
You are an expert Python coder. Read the structured JSON plan provided in the session state with key 'analysis_plan' to write an executable Python script to perform the task.
Use the `context.load_artifact` method to load the CSV data from the artifact named 'user:supply_chain_data.csv'. Here's how you can do it:
```python
import pandas as pd
from io import BytesIO
# Load the artifact
artifact = context.load_artifact(filename="user:supply_chain_data.csv")
if artifact and artifact.inline_data:
csv_bytes = artifact.inline_data.data
data = pd.read_csv(BytesIO(csv_bytes))
else:
raise FileNotFoundError("Artifact 'user:supply_chain_data.csv' not found.")
Proceed to perform the analysis as per the plan.
Only output valid Python code.
""",
output_key="generated_code"
)
code_executor_agent = LlmAgent(
name="CodeExecAgent",
model=GEMINI_2_FLASH,
instruction=f"""
You are a Python code executor. Execute the Python code provided in the session state under 'generated_code'.
Use the 'built_in_code_execution' tool to run the code.
After execution, return the output as plain text.
""",
tools = [built_in_code_execution],
output_key="code_output"
)
# Agent 3: Interpreter
interpreter_agent = LlmAgent(
name="InterpreterAgent",
model=OPENAI_GPT_4O,
instruction="""
You are a Business Analyst. Your task is to interpret the technical output provided in the session state key 'code_output' and explain it in simple, clear language to a non-technical business user.
The user wants to understand the results of their data analysis. Focus on the key findings and insights, avoiding jargon.
""",
output_key="final_answer"
)
# Sequential workflow
analysis_pipeline = SequentialAgent(
name="AnalysisPipeline",
sub_agents=[agent, planner_agent, code_gen_agent, code_executor_agent, interpreter_agent],
description="Executes a sequence of planning, code generation, code execution, and interpreting"
)
root_agent = analysis_pipeline
runner = Runner(
app_name=APP_NAME,
agent=root_agent,
session_service=session_service,
artifact_service=InMemoryArtifactService()
)
async def call_agent_async(query):
content = types.Content(role='user', parts=[types.Part(text=query)])
print(f"\n--- Running Query: {query} ---")
final_response_text = "No final text response captured."
try:
# Use run_async
async for event in runner.run_async(user_id=USER_ID, session_id=SESSION_ID, new_message=content):
print(f"Event ID: {event.id}, Author: {event.author}")
# --- Check for specific parts FIRST ---
has_specific_part = False
if event.content and event.content.parts:
for part in event.content.parts: # Iterate through all parts
if part.executable_code:
# Access the actual code string via .code
print(f" Debug: Agent generated code:\n```python\n{part.executable_code.code}\n```")
has_specific_part = True
elif part.code_execution_result:
# Access outcome and output correctly
print(f" Debug: Code Execution Result: {part.code_execution_result.outcome} - Output:\n{part.code_execution_result.output}")
has_specific_part = True
# Also print any text parts found in any event for debugging
elif part.text and not part.text.isspace():
print(f" Text: '{part.text.strip()}'")
# Do not set has_specific_part=True here, as we want the final response logic below
# --- Check for final response AFTER specific parts ---
# Only consider it final if it doesn't have the specific code parts we just handled
if not has_specific_part and event.is_final_response():
if event.content and event.content.parts and event.content.parts[0].text:
final_response_text = event.content.parts[0].text.strip()
print(f"==> Final Agent Response: {final_response_text}")
else:
print("==> Final Agent Response: [No text content in final event]")
except Exception as e:
print(f"ERROR during agent run: {e}")
print("-" * 30)
async def main():
await call_agent_async("Which SKUs have high sales but low profit margins or vice versa?")
try:
asyncio.run(main())
except RuntimeError as e:
# Handle specific error when running asyncio.run in an already running loop (like Jupyter/Colab)
if "cannot be called from a running event loop" in str(e):
print("\nRunning in an existing event loop (like Colab/Jupyter).")
print("Please run `await main()` in a notebook cell instead.")
# If in an interactive environment like a notebook, you might need to run:
# await main()
else:
raise e # Re-raise other runtime errors
`