Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 78 additions & 52 deletions backend/ai_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
from typing import Optional
import warnings
from async_lru import alru_cache
from retry_utils import exponential_backoff_retry
import logging
import json
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The json import at module level is redundant since it was already used in the original code within the try block. Consider keeping it at module level throughout for consistency.

Suggested change
import json

Copilot uses AI. Check for mistakes.

# Configure logging
logger = logging.getLogger(__name__)

# Suppress deprecation warnings from google.generativeai
warnings.filterwarnings("ignore", category=FutureWarning, module="google.generativeai")
Expand All @@ -13,78 +19,98 @@
if api_key:
genai.configure(api_key=api_key)

def _get_fallback_action_plan(issue_description: str, category: str) -> dict:
"""Generate fallback action plan when AI is unavailable."""
return {
"whatsapp": f"Hello, I would like to report a {category} issue: {issue_description}",
"email_subject": f"Complaint regarding {category}",
"email_body": f"Respected Authority,\n\nI am writing to bring to your attention a {category} issue: {issue_description}.\n\nPlease take necessary action.\n\nSincerely,\nCitizen"
}


@exponential_backoff_retry(max_retries=3, base_delay=1.0, max_delay=10.0)
async def _generate_action_plan_with_retry(issue_description: str, category: str) -> dict:
    """
    Generate a WhatsApp/email action plan via Gemini.

    Raises on any failure (API error, non-JSON output, missing keys) so the
    retry decorator can re-attempt transient problems; the caller is expected
    to catch and fall back after retries are exhausted.

    Args:
        issue_description: Free-text description of the civic issue.
        category: Issue category label used to address the right authority.

    Returns:
        dict with "whatsapp", "email_subject" and "email_body" keys.

    Raises:
        Exception: on API failure, unparseable output, or a JSON response
            that is missing any of the required keys.
    """
    model = genai.GenerativeModel('gemini-1.5-flash')

    prompt = f"""
    You are a civic action assistant. A user has reported a civic issue.
    Category: {category}
    Description: {issue_description}

    Please generate:
    1. A concise WhatsApp message (max 200 chars) that can be sent to authorities.
    2. A formal but firm email subject.
    3. A formal email body (max 150 words) addressed to the relevant authority (e.g., Municipal Commissioner, Police, etc. based on category).

    Return the response in strictly valid JSON format with keys: "whatsapp", "email_subject", "email_body".
    Do not use markdown code blocks. Just the raw JSON string.
    """

    response = await model.generate_content_async(prompt)
    text_response = response.text.strip()

    # Strip markdown code fences defensively. The previous fixed-offset
    # slicing (text[7:-3]) corrupted the payload whenever the closing
    # fence was absent or extra whitespace followed it.
    if text_response.startswith("```"):
        text_response = text_response[3:]
        if text_response.startswith("json"):
            text_response = text_response[4:]
        if text_response.endswith("```"):
            text_response = text_response[:-3]
        text_response = text_response.strip()

    plan = json.loads(text_response)

    # Validate the contract so a syntactically-valid but malformed reply
    # also triggers a retry instead of propagating a broken plan.
    missing = [k for k in ("whatsapp", "email_subject", "email_body") if k not in plan]
    if missing:
        raise ValueError(f"Gemini response missing keys: {missing}")
    return plan


async def generate_action_plan(issue_description: str, category: str, image_path: Optional[str] = None) -> dict:
"""
Generates an action plan (WhatsApp message, Email draft) using Gemini.
Includes retry logic with exponential backoff for transient failures.
"""
if not api_key:
return {
"whatsapp": f"Hello, I would like to report a {category} issue: {issue_description}",
"email_subject": f"Complaint regarding {category}",
"email_body": f"Respected Authority,\n\nI am writing to bring to your attention a {category} issue: {issue_description}.\n\nPlease take necessary action.\n\nSincerely,\nCitizen"
}
logger.warning("No API key configured, using fallback action plan")
return _get_fallback_action_plan(issue_description, category)

try:
# Use Gemini 1.5 Flash for faster response times
model = genai.GenerativeModel('gemini-1.5-flash')

prompt = f"""
You are a civic action assistant. A user has reported a civic issue.
Category: {category}
Description: {issue_description}

Please generate:
1. A concise WhatsApp message (max 200 chars) that can be sent to authorities.
2. A formal but firm email subject.
3. A formal email body (max 150 words) addressed to the relevant authority (e.g., Municipal Commissioner, Police, etc. based on category).
return await _generate_action_plan_with_retry(issue_description, category)
except Exception as e:
logger.error(f"Gemini Error after all retries: {e}", exc_info=True)
# Return fallback after all retries exhausted
return _get_fallback_action_plan(issue_description, category)

Return the response in strictly valid JSON format with keys: "whatsapp", "email_subject", "email_body".
Do not use markdown code blocks. Just the raw JSON string.
"""
@exponential_backoff_retry(max_retries=3, base_delay=1.0, max_delay=10.0)
async def _chat_with_civic_assistant_with_retry(query: str) -> str:
"""
Internal function that handles chat with retry logic.
Raises exception on failure to allow retry decorator to work.
"""
model = genai.GenerativeModel('gemini-1.5-flash')

response = await model.generate_content_async(prompt)
text_response = response.text.strip()
prompt = f"""
You are VishwaGuru, a helpful civic assistant for Indian citizens.
User Query: {query}

# Cleanup if markdown code blocks are returned
if text_response.startswith("```json"):
text_response = text_response[7:-3]
elif text_response.startswith("```"):
text_response = text_response[3:-3]
Answer the user's question about civic issues, government services, or local administration.
If they ask about specific MLAs, tell them to use the "Find My MLA" feature.
Keep answers concise and helpful.
"""

import json
return json.loads(text_response)
response = await model.generate_content_async(prompt)
return response.text.strip()

except Exception as e:
print(f"Gemini Error: {e}")
# Fallback
return {
"whatsapp": f"Hello, I would like to report a {category} issue: {issue_description}",
"email_subject": f"Complaint regarding {category}",
"email_body": f"Respected Authority,\n\nI am writing to bring to your attention a {category} issue: {issue_description}.\n\nPlease take necessary action.\n\nSincerely,\nCitizen"
}

@alru_cache(maxsize=100)
async def chat_with_civic_assistant(query: str) -> str:
"""
Chat with the civic assistant.
Includes retry logic with exponential backoff for transient failures.
"""
if not api_key:
logger.warning("No API key configured, chat assistant offline")
return "I am currently offline. Please try again later."

try:
model = genai.GenerativeModel('gemini-1.5-flash')

prompt = f"""
You are VishwaGuru, a helpful civic assistant for Indian citizens.
User Query: {query}

Answer the user's question about civic issues, government services, or local administration.
If they ask about specific MLAs, tell them to use the "Find My MLA" feature.
Keep answers concise and helpful.
"""

response = await model.generate_content_async(prompt)
return response.text.strip()
return await _chat_with_civic_assistant_with_retry(query)
except Exception as e:
print(f"Gemini Chat Error: {e}")
return "I encountered an error processing your request."
logger.error(f"Gemini Chat Error after all retries: {e}", exc_info=True)
return "I encountered an error processing your request. Please try again later."
60 changes: 40 additions & 20 deletions backend/gemini_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@
Gemini Summary Service for Maharashtra MLA Information

Uses Gemini AI to generate human-readable summaries about MLAs and their roles.
Includes retry logic with exponential backoff for handling transient failures.
"""
import os
import google.generativeai as genai
from typing import Dict, Optional
import warnings
from async_lru import alru_cache
from retry_utils import exponential_backoff_retry
import logging

# Configure logging
logger = logging.getLogger(__name__)

# Suppress deprecation warnings from google.generativeai
warnings.filterwarnings("ignore", category=FutureWarning, module="google.generativeai")
Expand Down Expand Up @@ -38,6 +44,35 @@ def _get_fallback_summary(mla_name: str, assembly_constituency: str, district: s
)


@exponential_backoff_retry(max_retries=3, base_delay=1.0, max_delay=10.0)
async def _generate_mla_summary_with_retry(
    district: str,
    assembly_constituency: str,
    mla_name: str,
    issue_category: Optional[str] = None
) -> str:
    """
    Generate a short MLA role summary via Gemini.

    Raises on failure so the retry decorator can re-attempt transient
    errors; the caller is expected to catch and fall back afterwards.

    Args:
        district: District name.
        assembly_constituency: Assembly constituency the MLA represents.
        mla_name: Name of the MLA.
        issue_category: Optional issue category to contextualise the summary.

    Returns:
        A short plain-text paragraph describing the MLA's role.

    Raises:
        Exception: on API failure or an empty model response (both of
            which should trigger a retry).
    """
    model = genai.GenerativeModel('gemini-1.5-flash')

    issue_context = f" particularly regarding {issue_category} issues" if issue_category else ""

    prompt = f"""
    You are helping an Indian citizen understand who represents them.
    In one short paragraph (max 100 words), explain that the MLA {mla_name} represents
    the assembly constituency {assembly_constituency} in district {district}, state Maharashtra{issue_context},
    and what type of local issues they typically handle.

    Do not hallucinate phone numbers or emails; only talk about roles and responsibilities.
    Keep it factual, helpful, and encouraging for civic engagement.
    """

    response = await model.generate_content_async(prompt)
    summary = response.text.strip()
    if not summary:
        # An empty summary is useless (and would be cached by the caller's
        # alru_cache); raise so the retry/fallback path handles it instead.
        raise ValueError("Gemini returned an empty MLA summary")
    return summary


@alru_cache(maxsize=100)
async def generate_mla_summary(
district: str,
Expand All @@ -47,6 +82,7 @@ async def generate_mla_summary(
) -> str:
"""
Generate a human-readable summary about an MLA using Gemini.
Includes retry logic with exponential backoff for transient failures.

Args:
district: District name
Expand All @@ -58,28 +94,12 @@ async def generate_mla_summary(
A short paragraph describing the MLA's role and responsibilities
"""
if not api_key:
logger.warning("No API key configured, using fallback MLA summary")
return _get_fallback_summary(mla_name, assembly_constituency, district)

try:
# Use Gemini 1.5 Flash for faster response times
model = genai.GenerativeModel('gemini-1.5-flash')

issue_context = f" particularly regarding {issue_category} issues" if issue_category else ""

prompt = f"""
You are helping an Indian citizen understand who represents them.
In one short paragraph (max 100 words), explain that the MLA {mla_name} represents
the assembly constituency {assembly_constituency} in district {district}, state Maharashtra{issue_context},
and what type of local issues they typically handle.

Do not hallucinate phone numbers or emails; only talk about roles and responsibilities.
Keep it factual, helpful, and encouraging for civic engagement.
"""

response = await model.generate_content_async(prompt)
return response.text.strip()

return await _generate_mla_summary_with_retry(district, assembly_constituency, mla_name, issue_category)
except Exception as e:
print(f"Gemini Summary Error: {e}")
# Fallback to simple description
logger.error(f"Gemini Summary Error after all retries: {e}", exc_info=True)
# Return fallback after all retries exhausted
return _get_fallback_summary(mla_name, assembly_constituency, district)
49 changes: 38 additions & 11 deletions backend/hf_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
import httpx
from PIL import Image
import asyncio
from retry_utils import exponential_backoff_retry
import logging
import base64
Comment on lines +6 to +8
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The base64 import at module level is unnecessary since it's only used within the _make_request_with_retry function. Moving it there (line 34) would be more appropriate, or keep it at module level and remove the duplicate import location.

Copilot uses AI. Check for mistakes.

# Configure logging
logger = logging.getLogger(__name__)

# HF_TOKEN is optional for public models but recommended for higher limits
token = os.environ.get("HF_TOKEN")
Expand All @@ -19,8 +25,12 @@ async def query_hf_api(image_bytes, labels, client=None):
async with httpx.AsyncClient() as new_client:
return await _make_request(new_client, image_bytes, labels)

async def _make_request(client, image_bytes, labels):
import base64
@exponential_backoff_retry(max_retries=3, base_delay=1.0, max_delay=10.0)
async def _make_request_with_retry(client, image_bytes, labels):
"""
Internal function that makes HF API request with retry logic.
Raises exception on failure to allow retry decorator to work.
"""
image_base64 = base64.b64encode(image_bytes).decode('utf-8')

payload = {
Expand All @@ -30,19 +40,28 @@ async def _make_request(client, image_bytes, labels):
}
}

response = await client.post(API_URL, headers=headers, json=payload, timeout=20.0)
if response.status_code != 200:
error_msg = f"HF API Error: {response.status_code} - {response.text}"
logger.error(error_msg)
raise Exception(error_msg)
return response.json()


async def _make_request(client, image_bytes, labels):
"""
Makes request to Hugging Face API with retry logic and proper error handling.
"""
try:
response = await client.post(API_URL, headers=headers, json=payload, timeout=20.0)
if response.status_code != 200:
print(f"HF API Error: {response.status_code} - {response.text}")
return []
return response.json()
return await _make_request_with_retry(client, image_bytes, labels)
except Exception as e:
print(f"HF API Request Exception: {e}")
logger.error(f"HF API Request failed after all retries: {e}", exc_info=True)
return []

async def detect_vandalism_clip(image: Image.Image, client: httpx.AsyncClient = None):
"""
Detects vandalism/graffiti using Zero-Shot Image Classification with CLIP (Async).
Includes retry logic with exponential backoff for transient failures.
"""
try:
labels = ["graffiti", "vandalism", "spray paint", "street art", "clean wall", "public property", "normal street"]
Expand All @@ -69,10 +88,14 @@ async def detect_vandalism_clip(image: Image.Image, client: httpx.AsyncClient =
})
return detected
except Exception as e:
print(f"HF Detection Error: {e}")
logger.error(f"HF Vandalism Detection Error: {e}", exc_info=True)
return []

async def detect_infrastructure_clip(image: Image.Image, client: httpx.AsyncClient = None):
"""
Detects infrastructure damage using Zero-Shot Image Classification with CLIP (Async).
Includes retry logic with exponential backoff for transient failures.
"""
try:
labels = ["broken streetlight", "damaged traffic sign", "fallen tree", "damaged fence", "pothole", "clean street", "normal infrastructure"]

Expand All @@ -97,10 +120,14 @@ async def detect_infrastructure_clip(image: Image.Image, client: httpx.AsyncClie
})
return detected
except Exception as e:
print(f"HF Detection Error: {e}")
logger.error(f"HF Infrastructure Detection Error: {e}", exc_info=True)
return []

async def detect_flooding_clip(image: Image.Image, client: httpx.AsyncClient = None):
"""
Detects flooding/waterlogging using Zero-Shot Image Classification with CLIP (Async).
Includes retry logic with exponential backoff for transient failures.
"""
try:
labels = ["flooded street", "waterlogging", "blocked drain", "heavy rain", "dry street", "normal road"]

Expand All @@ -125,5 +152,5 @@ async def detect_flooding_clip(image: Image.Image, client: httpx.AsyncClient = N
})
return detected
except Exception as e:
print(f"HF Detection Error: {e}")
logger.error(f"HF Flooding Detection Error: {e}", exc_info=True)
return []
Loading
Loading