-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsales_analysis_agent.py
More file actions
144 lines (122 loc) · 5.05 KB
/
sales_analysis_agent.py
File metadata and controls
144 lines (122 loc) · 5.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import logging
import os
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
from llama_index.core import (
Document,
StorageContext,
VectorStoreIndex,
load_index_from_storage,
)
from llama_index.core.agent import ReActAgent
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.tools import FunctionTool, QueryEngineTool
from llama_index.embeddings.gemini import GeminiEmbedding
from llama_index.llms.gemini import Gemini
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
# Initialize Gemini LLM and embeddings
llm = Gemini(model_name="models/gemini-1.5-flash", api_key=GOOGLE_API_KEY)
embed_model = GeminiEmbedding(model_name="models/embedding-001", api_key=GOOGLE_API_KEY)
# Define index storage path
INDEX_STORAGE_DIR = "index_storage"
# Custom analytics tool for statistical calculations
def compute_analytics(metric: str, column: str, filter_condition: str = None) -> float:
"""Compute statistical metrics (e.g., sum, average) on sales data with optional filtering."""
df = pd.read_csv("sales_data.csv")
if filter_condition:
df = df.query(filter_condition)
if metric == "sum":
return df[column].sum()
elif metric == "average":
return df[column].mean()
return 0.0
analytics_tool = FunctionTool.from_defaults(
fn=compute_analytics,
name="analytics_tool",
description="Computes statistical metrics (sum, average) on sales data with optional filters.",
)
# Check if index exists on disk
index_storage_path = Path(INDEX_STORAGE_DIR)
if index_storage_path.exists():
logger.info("Loading existing index from disk...")
storage_context = StorageContext.from_defaults(persist_dir=INDEX_STORAGE_DIR)
index = load_index_from_storage(storage_context, embed_model=embed_model)
else:
logger.info("Indexing sales data...")
# Read sales data from CSV
sales_df = pd.read_csv("sales_data.csv")
# Convert DataFrame to list of Documents
documents = []
for _, row in sales_df.iterrows():
text = (
f"OrderID: {row['OrderID']}, Date: {row['Date']}, Region: {row['Region']}, "
f"Product: {row['Product']}, Category: {row['Category']}, Quantity: {row['Quantity']}, "
f"UnitPrice: {row['UnitPrice']}, TotalSale: {row['TotalSale']}"
)
documents.append(Document(text=text))
# index.insert_nodes(documents) # Insert documents into the index
# index.insert_nodes is used to add documents to the index
# on the opposite, VectorStoreIndex.from_documents is used to create a new index from documents
# Create index with sentence splitter
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=50)
index = VectorStoreIndex.from_documents(
documents, embed_model=embed_model, transformations=[splitter]
)
# Persist index to disk
index.storage_context.persist(persist_dir=INDEX_STORAGE_DIR)
logger.info("Indexing complete and saved to disk.")
# Create query engine
query_engine = index.as_query_engine(llm=llm, similarity_top_k=5)
# Define query engine tool
sales_tool = QueryEngineTool.from_defaults(
query_engine=query_engine,
name="sales_data_tool",
description="Provides insights from sales data including total sales, regional performance, and product analysis.",
)
# Create ReAct agent with multiple tools
agent = ReActAgent.from_tools([sales_tool, analytics_tool], llm=llm, verbose=True)
# Function to query sales data using the agent
def analyze_sales(query, query_history):
response = agent.chat(query)
query_history.append((query, str(response.response)))
return response.response
# Interactive query loop with query history
if __name__ == "__main__":
query_history = []
print("Welcome to InsightPulse: Your AI-Powered Sales Report Analysis Tool!")
print(
"Enter your query (e.g., 'What is the total sales for Laptops in South in 2024?')"
)
print("Type 'history' to view recent queries, 'exit' to quit.")
while True:
user_query = input("\nYour query: ").strip()
if user_query.lower() == "exit":
print("Exiting InsightPulse. Goodbye!")
break
if user_query.lower() == "history":
if query_history:
print("\nRecent Queries:")
for i, (q, r) in enumerate(
query_history[-5:], 1
): # Show last 5 queries
print(
f"{i}. Query: {q}\n Response: {r[:100]}..."
) # Truncate long responses
else:
print("No query history yet.")
continue
if not user_query:
print("Please enter a valid query.")
continue
print(f"\nProcessing query: {user_query}")
try:
response = analyze_sales(user_query, query_history)
print(f"Response: {response}")
except Exception as e:
print(f"Error processing query: {e}")