-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebhook_api.py
More file actions
1940 lines (1677 loc) · 86.9 KB
/
webhook_api.py
File metadata and controls
1940 lines (1677 loc) · 86.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# webhook_api.py
import os
import json
import uuid
import requests
import logging
import datetime
import time
import re
import asyncio
from contextlib import contextmanager
from io import BytesIO
from typing import List, Dict, Any, Optional, Tuple
from urllib.parse import urlparse
# --- FastAPI Imports ---
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, HttpUrl
import uvicorn
# --- ML/RAG Imports from ImprovedSemanticChunker ---
import torch
import numpy as np
import google.generativeai as genai
from groq import Groq
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
# llama_parse / llama_index imports removed — now using LlamaParse v2 REST API directly
import tiktoken
from rank_bm25 import BM25Okapi
from dotenv import load_dotenv
# --- Web parsing/search imports ---
from bs4 import BeautifulSoup
try:
from duckduckgo_search import DDGS
except Exception:
DDGS = None
# --- Language detection and OCR imports ---
try:
from langdetect import detect, DetectorFactory, LangDetectException
DetectorFactory.seed = 0
except Exception:
detect = None
LangDetectException = Exception
import pytesseract
from PIL import Image
import fitz # PyMuPDF
# --- Load Environment Variables ---
load_dotenv()
# ==============================================================================
# Simple Web Tooling (fetch + search)
# ==============================================================================
class WebTool:
    """Lightweight web tool for HTTP fetch and optional web search.

    One ``requests.Session`` is created per instance and reused for every
    fetch, together with a fixed set of browser-like default headers.
    """

    # Media-type fragments (lower-case) whose bodies are returned as text.
    _TEXT_MEDIA_TYPES = ("text/html", "text/plain", "application/xml")

    def __init__(self) -> None:
        self.session = requests.Session()
        self.default_headers = {
            "User-Agent": (
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/124.0 Safari/537.36"
            ),
            "Accept": "*/*",
            "Accept-Language": "en-US,en;q=0.9",
            "Connection": "keep-alive",
        }

    def fetch_url(self, url: str, timeout_seconds: int = 30) -> Dict[str, Any]:
        """Fetch a URL and return response metadata and content.

        Args:
            url: Address to request with HTTP GET (redirects are followed).
            timeout_seconds: Timeout handed to the session's ``get`` call.

        Returns:
            A dict with ``status_code``, ``url`` (final URL), ``headers``,
            ``content_type`` (header value exactly as received), and three
            payload slots: ``text`` for JSON/HTML/plain/XML bodies, ``json``
            for a successfully parsed JSON body, and ``content`` (raw bytes)
            for every other media type. Unused slots are ``None``.

        Raises:
            Whatever the underlying session raises (timeouts, connection
            errors); nothing is caught here.
        """
        resp = self.session.get(
            url,
            headers=self.default_headers,
            timeout=timeout_seconds,
            allow_redirects=True,
        )
        content_type = resp.headers.get("content-type", "")
        # Media types are case-insensitive, so match on a lower-cased copy
        # while still reporting the header value unchanged to the caller.
        media_type = content_type.lower()

        text: Optional[str] = None
        json_data: Optional[Any] = None
        binary_bytes: Optional[bytes] = None

        # Try JSON first when content-type hints it
        if "application/json" in media_type:
            try:
                json_data = resp.json()
                text = json.dumps(json_data, indent=2, ensure_ascii=False)
            except Exception:
                # Body claimed to be JSON but did not parse: keep the raw text.
                text = resp.text
        elif any(ct in media_type for ct in self._TEXT_MEDIA_TYPES):
            text = resp.text
        else:
            # Unknown/binary content
            binary_bytes = resp.content

        return {
            "status_code": resp.status_code,
            "url": resp.url,
            "headers": dict(resp.headers),
            "content_type": content_type,
            "text": text,
            "json": json_data,
            "content": binary_bytes,
        }

    def search(self, query: str, max_results: int = 5) -> List[Dict[str, Any]]:
        """Basic web search using DuckDuckGo, if available.

        Returns:
            A list of ``{"rank", "title", "href", "snippet"}`` dicts, ranked
            from 1. The list is empty when the search package could not be
            imported or when the search itself fails.
        """
        if DDGS is None:
            return []
        try:
            results: List[Dict[str, Any]] = []
            with DDGS() as ddgs:
                for rank, hit in enumerate(ddgs.text(query, max_results=max_results), start=1):
                    # hit keys typically: title, href, body
                    results.append({
                        "rank": rank,
                        "title": hit.get("title"),
                        "href": hit.get("href"),
                        "snippet": hit.get("body"),
                    })
            return results
        except Exception as exc:
            # Search stays best-effort, but the failure is no longer invisible.
            logging.getLogger(__name__).warning("Web search failed for query %r: %s", query, exc)
            return []

    @staticmethod
    def html_to_text(html: str) -> str:
        """Strip markup from ``html`` and return whitespace-normalized text."""
        soup = BeautifulSoup(html, "html.parser")
        # Remove script/style
        for tag in soup(["script", "style", "noscript"]):
            tag.extract()
        text = soup.get_text(" ")
        # Normalize whitespace
        text = re.sub(r"\s+", " ", text).strip()
        return text
# ==============================================================================
# IMPROVED SEMANTIC CHUNKER CLASS (Your core logic)
# ==============================================================================
class ImprovedSemanticChunker:
"""
A self-contained class to handle the entire RAG pipeline:
document fetching, text extraction, chunking, embedding, and answer generation.
"""
def __init__(self):
    """Set up logging, LLM clients, the embedding model, storage, and helper state.

    Side effects performed here: creates the ``logs``, ``transaction_logs``,
    ``vector_store`` and ``document_cache`` directories, opens a timestamped
    log file, loads the embedding model, and loads and prunes the on-disk
    cache index.

    Raises:
        ValueError: If Gemini has to act as the primary LLM (no usable Groq
            client) but ``GOOGLE_API_KEY`` is missing or Gemini fails to
            initialize.
    """
    # Configure logging for application status
    app_log_directory = "logs"
    os.makedirs(app_log_directory, exist_ok=True)
    log_filename = f"rag_query_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    log_path = os.path.join(app_log_directory, log_filename)

    # Logger writing both to a per-instance log file and to the console
    self.logger = logging.getLogger("ImprovedSemanticChunker")
    self.logger.setLevel(logging.INFO)
    # Detach AND close pre-existing handlers (to avoid duplicates on reload).
    # Merely clearing the list would leave the previous log file open.
    for stale_handler in list(self.logger.handlers):
        self.logger.removeHandler(stale_handler)
        stale_handler.close()

    # File handler
    file_handler = logging.FileHandler(log_path, encoding='utf-8')
    file_handler.setLevel(logging.INFO)
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    # Formatter
    fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(fmt)
    console_handler.setFormatter(fmt)
    self.logger.addHandler(file_handler)
    self.logger.addHandler(console_handler)

    # Create directory for detailed transaction logs
    os.makedirs("transaction_logs", exist_ok=True)

    # Configure Groq LLM (Primary)
    self.groq_api_key = os.getenv('GROQ_API_KEY')
    self.groq_client = None
    self.groq_model = "openai/gpt-oss-120b"
    # Configure Google Gemini (Fallback)
    self.google_api_key = os.getenv('GOOGLE_API_KEY')

    # Initialize Groq client if API key is available
    if self.groq_api_key:
        try:
            self.groq_client = Groq(api_key=self.groq_api_key)
            self.logger.info(f"Groq LLM initialized successfully with model: {self.groq_model}")
            self.primary_llm = "groq"
        except Exception as e:
            self.logger.warning(f"Failed to initialize Groq LLM: {e}")
            self.groq_client = None
            self.primary_llm = "gemini"
    else:
        self.logger.warning("GROQ_API_KEY not found, using Gemini as primary LLM")
        self.primary_llm = "gemini"

    # Initialize Gemini (fallback or primary if Groq fails)
    if self.google_api_key:
        try:
            genai.configure(api_key=self.google_api_key)
            self.llm_model_lite = genai.GenerativeModel('gemini-2.5-flash-lite')
            self.llm_model_full = genai.GenerativeModel('gemini-2.5-flash')
            self.current_llm_model = self.llm_model_lite
            self.logger.info("Gemini LLM initialized successfully (fallback)")
        except Exception as e:
            self.logger.error(f"Failed to initialize Gemini LLM: {e}")
            # A Gemini failure is fatal only when no Groq client is available.
            if self.primary_llm == "gemini":
                raise ValueError("Both Groq and Gemini LLM initialization failed") from e
    else:
        if self.primary_llm == "gemini":
            raise ValueError("GOOGLE_API_KEY environment variable is required when Groq is not available")

    # Initialize memory-efficient embedding model
    self.logger.info("Loading BGE-large-EN embedding model (memory optimized)...")
    # Use GPU if available, otherwise CPU
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    self.embedding_model = SentenceTransformer('BAAI/bge-large-en-v1.5', device=device)

    # Initialize ChromaDB with persistent storage
    os.makedirs("vector_store", exist_ok=True)
    self.chroma_client = chromadb.PersistentClient(path="vector_store")

    # Initialize tokenizer for token-based chunking
    self.tokenizer = tiktoken.get_encoding("cl100k_base")
    # Chunking parameters
    self.chunk_size_tokens = 300
    self.overlap_tokens = 50

    # Document cache for processed chunks
    self.cache_dir = "document_cache"
    os.makedirs(self.cache_dir, exist_ok=True)
    self.document_cache: Dict[str, str] = {}  # cache key -> cache file name
    self._load_cache_index()
    # Clean cache if it gets too large
    self._clean_cache(max_entries=50)  # Keep max 50 cached documents

    # ZIP file error message
    self.ZIP_ERROR_MESSAGE = "ZIP file is not allowed, please upload a valid file"
    # Initialize BM25 for keyword search (hybrid search component)
    self.bm25_index = None
    self.bm25_documents = []  # Store documents for BM25 indexing
    self.document_chunks = []  # Store chunk texts for retrieval
    # BIN file error message
    self.BIN_ERROR_MESSAGE = "BIN file is not allowed, please upload a valid file"
    # Archive file error message
    self.ARCHIVE_ERROR_MESSAGE = "Archive files (RAR, 7Z) are not allowed, please upload a valid file"

    # Track language of the active collection for query translation
    self.collection_language: Optional[str] = None

    # --- Web tools ---
    self.web_tool = WebTool()
    # Set to None to allow all hosts for agentic HTTP GET actions
    self.allowed_action_hosts: Optional[set[str]] = None
    # Track current document URL for per-request overrides
    self.current_doc_url: Optional[str] = None

    # OCR availability
    try:
        _ = pytesseract.get_tesseract_version()
        self.tesseract_available = True
        # Optional: log languages if obtainable
        try:
            langs = pytesseract.get_languages(config='')
            self.logger.info(f"Tesseract available. Languages: {', '.join(langs)}")
        except Exception:
            self.logger.info("Tesseract available.")
    except Exception:
        self.tesseract_available = False
        self.logger.warning("Tesseract not available on PATH; OCR will be skipped.")
# --------------------------- Document Caching System ---------------------------
def _get_cache_key(self, doc_url: str) -> str:
    """Return the hex MD5 digest of ``doc_url``, used as a filename-safe cache key."""
    from hashlib import md5

    url_bytes = doc_url.encode()
    digest = md5(url_bytes)
    return digest.hexdigest()
def _load_cache_index(self):
    """Load the cache index (``cache_index.json`` in ``self.cache_dir``) into memory.

    ``self.document_cache`` is always left as a ``dict`` mapping cache keys to
    cache file names. A missing, unreadable, or malformed index results in an
    empty cache; entries whose value is not a string are dropped, because
    later lookups join the value onto the cache directory path.
    """
    cache_index_path = os.path.join(self.cache_dir, "cache_index.json")
    try:
        with open(cache_index_path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        self.document_cache = {}
        self.logger.info("No existing cache index found, starting fresh")
        return
    except Exception as e:
        self.logger.warning(f"Failed to load cache index: {e}")
        self.document_cache = {}
        return

    if not isinstance(loaded, dict):
        # Valid JSON but the wrong shape (e.g. a list): treat as corrupt.
        self.logger.warning(
            f"Failed to load cache index: expected a JSON object, got {type(loaded).__name__}"
        )
        self.document_cache = {}
        return

    usable = {key: name for key, name in loaded.items() if isinstance(name, str)}
    if len(usable) != len(loaded):
        self.logger.warning(
            f"Ignored {len(loaded) - len(usable)} malformed cache index entries"
        )
    self.document_cache = usable
    self.logger.info(f"Loaded document cache index with {len(self.document_cache)} entries")
def _save_cache_index(self):
    """Write ``self.document_cache`` to ``cache_index.json`` inside the cache directory.

    Failures are logged as warnings and not propagated.
    """
    index_file = os.path.join(self.cache_dir, "cache_index.json")
    try:
        with open(index_file, 'w', encoding='utf-8') as handle:
            json.dump(
                self.document_cache,
                handle,
                indent=2,
                ensure_ascii=False,
            )
        self.logger.debug("Cache index saved successfully")
    except Exception as err:
        self.logger.warning(f"Failed to save cache index: {err}")
def _get_cached_chunks(self, doc_url: str) -> Optional[List[Dict[str, Any]]]:
    """Return the chunks cached for ``doc_url``, or ``None`` when there is no usable entry.

    An index entry is removed (and the index saved) when its file is missing
    or when reading it raises. A file that parses but lacks the expected
    ``chunks`` key only yields ``None``.
    """
    key = self._get_cache_key(doc_url)
    if key not in self.document_cache:
        return None

    entry_path = os.path.join(self.cache_dir, self.document_cache[key])
    try:
        if os.path.exists(entry_path):
            with open(entry_path, 'r', encoding='utf-8') as handle:
                payload = json.load(handle)

            # Validate cache structure
            if isinstance(payload, dict) and 'chunks' in payload:
                chunks = payload['chunks']
                cached_time = payload.get('cached_time', 'unknown')
                self.logger.info(f"✅ Retrieved {len(chunks)} cached chunks for document (cached: {cached_time})")
                return chunks

            self.logger.warning(f"Invalid cache structure for {doc_url}")
            return None

        # Cache file missing, remove from index
        del self.document_cache[key]
        self._save_cache_index()
        return None
    except Exception as err:
        self.logger.warning(f"Failed to load cached chunks for {doc_url}: {err}")
        # Remove corrupted cache entry
        if key in self.document_cache:
            del self.document_cache[key]
            self._save_cache_index()
        return None
def _cache_chunks(self, doc_url: str, chunks: List[Dict[str, Any]]):
    """Write ``chunks`` for ``doc_url`` to a JSON cache file and register it in the index.

    The file is named ``<cache key>.json``. Any failure is logged as a
    warning; nothing is raised to the caller.
    """
    key = self._get_cache_key(doc_url)
    file_name = f"{key}.json"
    target_path = os.path.join(self.cache_dir, file_name)
    try:
        combined_size = sum(chunk.get('size', 0) for chunk in chunks)
        payload = {
            'url': doc_url,
            'chunks': chunks,
            'cached_time': datetime.datetime.now().isoformat(),
            'chunk_count': len(chunks),
            'total_size': combined_size,
        }
        with open(target_path, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, indent=2, ensure_ascii=False)

        # Update cache index
        self.document_cache[key] = file_name
        self._save_cache_index()
        self.logger.info(f"💾 Cached {len(chunks)} chunks for document")
    except Exception as err:
        self.logger.warning(f"Failed to cache chunks for {doc_url}: {err}")
def _clean_cache(self, max_entries: int = 100):
    """Prune the document cache when the index holds more than ``max_entries`` entries.

    Index entries whose cache file no longer exists are dropped first. If
    more than ``max_entries`` files remain, the files with the oldest
    modification time are deleted until the limit is met. The index is saved
    afterwards and the number of entries actually removed is logged.

    Args:
        max_entries: Maximum number of cached documents to keep.
    """
    if len(self.document_cache) <= max_entries:
        return
    try:
        existing = []    # (mtime, cache_key, cache_path) for files on disk
        stale_keys = []  # index entries pointing at files that are gone
        for cache_key, filename in self.document_cache.items():
            cache_path = os.path.join(self.cache_dir, filename)
            if os.path.exists(cache_path):
                existing.append((os.path.getmtime(cache_path), cache_key, cache_path))
            else:
                stale_keys.append(cache_key)

        # Entries without a file can never be served; counting them while
        # skipping them during eviction kept the index above the limit.
        for cache_key in stale_keys:
            del self.document_cache[cache_key]

        # Sort by modification time (oldest first)
        existing.sort(key=lambda item: item[0])
        # Clamp at zero: dropping stale entries may already satisfy the limit.
        excess = max(len(existing) - max(max_entries, 0), 0)

        removed = len(stale_keys)
        for _, cache_key, cache_path in existing[:excess]:
            try:
                os.remove(cache_path)
                del self.document_cache[cache_key]
                removed += 1
            except Exception as e:
                self.logger.warning(f"Failed to remove cache file {os.path.basename(cache_path)}: {e}")

        self._save_cache_index()
        self.logger.info(f"🧹 Cleaned {removed} old cache entries")
    except Exception as e:
        self.logger.warning(f"Failed to clean cache: {e}")
def clear_cache(self):
    """Delete the cache directory with everything in it, recreate it empty, and reset the index.

    Errors are logged as warnings instead of being raised.
    """
    try:
        from shutil import rmtree

        cache_root = self.cache_dir
        if os.path.exists(cache_root):
            rmtree(cache_root)
        os.makedirs(cache_root, exist_ok=True)

        self.document_cache = {}
        self._save_cache_index()
        self.logger.info("🗑️ Cache cleared successfully")
    except Exception as err:
        self.logger.warning(f"Failed to clear cache: {err}")
def get_cache_stats(self) -> Dict[str, Any]:
    """Summarize the document cache.

    Returns:
        A dict with the number of indexed documents, the combined size of the
        cache files that exist on disk (in bytes and in MB rounded to two
        decimals), and the cache directory. On failure, ``{'error': <message>}``.
    """
    try:
        indexed_count = len(self.document_cache)
        candidate_paths = (
            os.path.join(self.cache_dir, name)
            for name in self.document_cache.values()
        )
        size_in_bytes = sum(
            os.path.getsize(path) for path in candidate_paths if os.path.exists(path)
        )
        size_in_mb = round(size_in_bytes / (1024 * 1024), 2)
        return {
            'cached_documents': indexed_count,
            'total_size_bytes': size_in_bytes,
            'total_size_mb': size_in_mb,
            'cache_directory': self.cache_dir
        }
    except Exception as err:
        self.logger.warning(f"Failed to get cache stats: {err}")
        return {'error': str(err)}
def timeout_context(self, seconds):
    """Return ``seconds`` unchanged.

    Despite the name this is not a context manager: no timeout is enforced
    here, the value is simply handed back to the caller.
    """
    # We'll use this in a different way - see parse_and_chunk_with_llamaparse
    timeout_value = seconds
    return timeout_value
def is_zip_file(self, file_url: str) -> bool:
    """Tell whether the path component of ``file_url`` ends in ``.zip`` (case-insensitive).

    Query strings and fragments are ignored; any parsing error yields ``False``.
    """
    try:
        return urlparse(file_url).path.lower().endswith('.zip')
    except Exception:
        return False
def is_bin_file(self, file_url: str) -> bool:
    """Tell whether the path component of ``file_url`` ends in ``.bin`` (case-insensitive).

    Query strings and fragments are ignored; any parsing error yields ``False``.
    """
    try:
        return urlparse(file_url).path.lower().endswith('.bin')
    except Exception:
        return False
def is_archive_file(self, file_url: str) -> bool:
    """Tell whether the path component of ``file_url`` ends in ``.rar`` or ``.7z`` (case-insensitive).

    Query strings and fragments are ignored; any parsing error yields ``False``.
    """
    try:
        url_path = urlparse(file_url).path.lower()
        return any(url_path.endswith(suffix) for suffix in ('.rar', '.7z'))
    except Exception:
        return False
def is_pdf_url(self, file_url: str) -> bool:
    """Tell whether the path component of ``file_url`` ends in ``.pdf`` (case-insensitive).

    Query strings and fragments are ignored; any parsing error yields ``False``.
    """
    try:
        components = urlparse(file_url)
        lowered_path = components.path.lower()
        return lowered_path.endswith('.pdf')
    except Exception:
        return False
def is_unsupported_file(self, file_url: str) -> tuple[bool, str]:
    """Classify ``file_url`` against the rejected file types.

    Returns:
        ``(True, message)`` for ZIP, BIN, or RAR/7Z URLs (checked in that
        order), where ``message`` is the matching error text stored on the
        instance; ``(False, "")`` otherwise.
    """
    # Messages are looked up by attribute name only once a check has matched.
    checks = (
        (self.is_zip_file, "ZIP_ERROR_MESSAGE"),
        (self.is_bin_file, "BIN_ERROR_MESSAGE"),
        (self.is_archive_file, "ARCHIVE_ERROR_MESSAGE"),
    )
    for matches, message_attr in checks:
        if matches(file_url):
            return True, getattr(self, message_attr)
    return False, ""
def select_llm_model(self, doc_url: str) -> None:
    """Select the Gemini model to use for ``doc_url``.

    Uses gemini-2.5-flash when ``doc_url`` equals ``self.PINCODE_DATA_URL``
    and gemini-2.5-flash-lite for everything else.

    The lookups are defensive: when ``PINCODE_DATA_URL`` is not defined on
    the instance, every URL is treated as standard; when the required Gemini
    model was never initialized (e.g. a Groq-only setup), the current
    selection is left untouched and a warning is logged instead of raising
    ``AttributeError``.
    """
    pincode_url = getattr(self, "PINCODE_DATA_URL", None)
    use_full_model = pincode_url is not None and doc_url == pincode_url

    model_attr = "llm_model_full" if use_full_model else "llm_model_lite"
    model = getattr(self, model_attr, None)
    if model is None:
        self.logger.warning(
            f"Gemini model '{model_attr}' is not initialized; keeping current LLM selection"
        )
        return

    self.current_llm_model = model
    if use_full_model:
        self.logger.info("Using gemini-2.5-flash for Pincode data URL")
    else:
        self.logger.info("Using gemini-2.5-flash-lite for standard processing")
# --------------------------------------------------------------------------
# Agentic tool-use helpers
# --------------------------------------------------------------------------
@staticmethod
def _normalize_whitespace(text: str) -> str:
    """Collapse each whitespace run in ``text`` to one space and trim both ends.

    A falsy ``text`` (``None`` or ``""``) yields the empty string.
    """
    source = text or ""
    collapsed = re.sub(r"\s+", " ", source)
    return collapsed.strip()
def decide_use_web_tool(self, doc_url: str, questions: List[str]) -> Dict[str, Any]:
    """Ask the LLM to decide whether to use the web tool, and how.

    Only the first three questions are shown to the LLM. Its reply is parsed
    as JSON, tolerating markdown code fences or prose around the JSON object.
    If no JSON object can be recovered, a keyword heuristic over all
    questions decides instead.

    Args:
        doc_url: The document URL supplied with the request.
        questions: The user questions.

    Returns:
        A dict that always carries ``use_tool``, ``tool`` and ``reason``.
        ``target_url`` defaults to ``doc_url`` when the tool is
        ``"fetch_url"``. Any unexpected error yields the "use normal RAG"
        decision.
    """
    try:
        sample_questions = "\n".join(f"- {q}" for q in questions[:3])
        prompt = f"""
You are a tool-use planner for a RAG API. Decide if answering the questions requires using a web tool INSTEAD OF processing the document.
Tools available:
- fetch_url: fetch the exact document_url and use its returned content.
- web_search: search the public web for answers to the questions.
Constraints:
- Prefer fetch_url if the provided document_url appears to be an HTTP(S) endpoint returning HTML/JSON/text (e.g., api links, webpages) or the question says to open that link.
- Prefer web_search if the questions cannot be answered from the provided pdf document_url and require general web context.
- Otherwise, return none to use the normal RAG flow (parse document + potentially use agentic tools based on document content).
Document URL: {doc_url}
Questions:
{sample_questions}
Return ONLY a compact JSON object with keys: use_tool (true/false), tool ("fetch_url"|"web_search"|"none"), reason, target_url (optional), search_query (optional).
"""
        raw = self.generate_llm_response(prompt)
        raw_text = raw if isinstance(raw, str) else ""

        # Try the reply as-is first, then the outermost {...} span, which
        # recovers JSON wrapped in ```json fences or surrounded by prose.
        candidates = [raw_text]
        first_brace, last_brace = raw_text.find("{"), raw_text.rfind("}")
        if 0 <= first_brace < last_brace:
            candidates.append(raw_text[first_brace:last_brace + 1])

        decision: Optional[Dict[str, Any]] = None
        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except ValueError:
                continue
            # Only a JSON object is a usable decision (not a list or scalar).
            if isinstance(parsed, dict):
                decision = parsed
                break

        if decision is None:
            # Heuristic fallback. "api" must stand alone so that words such
            # as "capital" or "rapid" do not trigger a fetch.
            lowered = (" ".join(questions)).lower()
            wants_fetch = re.search(
                r"go to the link|\bvisit|open the link|fetch from url|(?<![a-z])apis?(?![a-z])",
                lowered,
            )
            if wants_fetch:
                decision = {"use_tool": True, "tool": "fetch_url", "reason": "Instruction explicitly asks to open the provided link.", "target_url": doc_url}
            else:
                decision = {"use_tool": False, "tool": "none", "reason": "Use normal RAG flow."}

        # Defensive defaults
        decision.setdefault("use_tool", False)
        decision.setdefault("tool", "none")
        decision.setdefault("reason", "")
        if decision.get("tool") == "fetch_url" and not decision.get("target_url"):
            decision["target_url"] = doc_url
        return decision
    except Exception as e:
        self.logger.warning(f"Tool decision failed: {e}")
        return {"use_tool": False, "tool": "none", "reason": "Decision error; default to RAG."}
def decide_agentic_from_context(self, question: str, retrieved_chunks: List[Dict[str, Any]]) -> bool:
    """Decide whether to trigger agentic multi-step actions from question/context cues.

    The question and the ``text`` of every retrieved chunk are lower-cased
    and scanned for generic API/endpoint cues.

    Args:
        question: The user question.
        retrieved_chunks: Chunk dicts; only their ``text`` value is read. A
            missing or ``None`` text counts as empty.

    Returns:
        ``True`` if any cue is present, ``False`` otherwise or on error.
    """
    try:
        combined_texts = "\n".join(str(c.get('text') or '') for c in retrieved_chunks)
        combined_lower = (question + "\n" + combined_texts).lower()

        # Generic cues suggesting API/tool interaction ("http" also covers "https").
        substring_cues = (
            "endpoint", "http", "call this", "make a get", "perform get",
            "token", "teams/public", "/utils/", "/flights/",
        )
        has_generic_signal = any(cue in combined_lower for cue in substring_cues)
        if not has_generic_signal:
            # "api" must not sit inside a longer word: a plain substring test
            # fires on "capital", "rapid", "therapist", ...
            has_generic_signal = re.search(r"(?<![a-z])apis?(?![a-z])", combined_lower) is not None

        if has_generic_signal:
            self.logger.info("🧭 Agentic trigger detected via generic API/endpoint cues")
            return True
        return False
    except Exception as e:
        self.logger.warning(f"Agentic decision failed: {e}")
        return False
def build_chunks_from_text(self, text: str) -> List[Dict[str, Any]]:
    """Turn raw text into chunk dicts.

    The text is whitespace-normalized first. Empty text gives ``[]``; text
    shorter than 2000 characters becomes one chunk with id ``chunk_0``;
    anything longer is handed to ``token_based_chunking``.
    """
    normalized = self._normalize_whitespace(text)
    if not normalized:
        return []

    length = len(normalized)
    if length >= 2000:
        # Else token-based chunking
        return self.token_based_chunking(normalized)

    # If small, keep single chunk
    single_chunk = {"id": "chunk_0", "text": normalized, "size": length}
    return [single_chunk]
def fetch_url_as_chunks(self, url: str) -> List[Dict[str, Any]]:
    """Fetch ``url`` through the web tool and convert the response into text chunks.

    HTML bodies are stripped of markup before chunking; other text is chunked
    as received. A response with no text but a parsed JSON payload is chunked
    from its pretty-printed JSON. Binary-only responses and any failure
    produce ``[]``.
    """
    try:
        fetched = self.web_tool.fetch_url(url)

        if fetched.get("text"):
            is_html = "text/html" in fetched.get("content_type", "")
            if is_html:
                body = self.web_tool.html_to_text(fetched["text"])  # strip markup
            else:
                body = fetched["text"]
            return self.build_chunks_from_text(body)

        # If binary but JSON is present
        json_payload = fetched.get("json")
        if json_payload is not None:
            rendered = json.dumps(fetched["json"], indent=2, ensure_ascii=False)
            return self.build_chunks_from_text(rendered)

        # If binary only, cannot chunk
        return []
    except Exception as err:
        self.logger.error(f"Failed to fetch URL as chunks: {err}")
        return []
# --------------------------- Language detection & OCR for non-English PDFs ---------------------------
def is_english_text(self, text: str) -> bool:
    """Guess whether ``text`` is English.

    Decision order: empty text counts as English; text with at least 20
    letters of which fewer than 40% are ASCII is non-English; otherwise
    ``langdetect`` decides when it is available, and the ASCII-letter ratio
    (> 0.6 means English) decides when it is unavailable or fails.
    """
    snippet = (text or "").strip()
    if not snippet:
        self.logger.debug("🔍 Language detection: Empty text, assuming English")
        return True

    # Quick ASCII heuristic
    alphabetic = [ch for ch in snippet if ch.isalpha()]
    letters = len(alphabetic)
    ascii_letters = sum(1 for ch in alphabetic if ord(ch) < 128)
    if letters > 0:
        ascii_ratio = ascii_letters / letters
    else:
        ascii_ratio = 1.0
    self.logger.info(f"🔍 Language detection stats: {letters} total letters, {ascii_letters} ASCII letters, ratio: {ascii_ratio:.3f}")

    if letters >= 20 and ascii_ratio < 0.4:
        self.logger.info(f"🔍 Non-English detected by ASCII heuristic: ratio {ascii_ratio:.3f} < 0.4")
        return False

    # langdetect when available
    if detect is not None:
        try:
            lang = detect(snippet)
            result = lang == 'en'
            self.logger.info(f"🔍 langdetect result: '{lang}' -> English: {result}")
            return result
        except LangDetectException as e:
            result = ascii_ratio > 0.6
            self.logger.info(f"🔍 langdetect failed ({e}), using ASCII heuristic: {result} (ratio {ascii_ratio:.3f})")
            return result

    result = ascii_ratio > 0.6
    self.logger.info(f"🔍 langdetect unavailable, using ASCII heuristic: {result} (ratio {ascii_ratio:.3f})")
    return result
def ocr_pdf_to_text(self, file_url: str, dpi: int = 200, max_pages: Optional[int] = None) -> str:
    """Download a PDF and extract its text by running Tesseract OCR on each rendered page.

    Args:
        file_url: URL of the PDF to download.
        dpi: Rendering resolution; pages are scaled by ``dpi / 72``.
        max_pages: Maximum number of leading pages to OCR; ``None`` means all.

    Returns:
        The whitespace-normalized text of all pages that produced output, or
        ``""`` when Tesseract is unavailable or any step of the pipeline
        fails (failures are logged, not raised).
    """
    try:
        if not getattr(self, 'tesseract_available', False):
            self.logger.warning("OCR requested but Tesseract is not available; skipping OCR and returning empty text.")
            return ""

        self.logger.info(f"🖼 Starting OCR for PDF: {file_url}")
        resp = requests.get(file_url, timeout=30)
        resp.raise_for_status()
        pdf_bytes = resp.content

        # Allow configuring OCR languages via env; default to English + Malayalam
        ocr_langs = os.getenv("TESSERACT_LANGS", "eng+mal")
        texts: List[str] = []

        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            num_pages = doc.page_count
            pages_to_process = range(num_pages if max_pages is None else min(num_pages, max_pages))
            # The zoom matrix depends only on dpi, so build it once.
            mat = fitz.Matrix(dpi / 72, dpi / 72)

            for page_index in pages_to_process:
                page = doc.load_page(page_index)
                pix = page.get_pixmap(matrix=mat, alpha=False)
                img_bytes = pix.tobytes("png")
                # Context manager releases the decoded image after each page.
                with Image.open(BytesIO(img_bytes)) as img:
                    page_text = ""
                    try:
                        page_text = pytesseract.image_to_string(img, lang=ocr_langs)
                    except Exception as e:
                        self.logger.warning(f"Tesseract OCR failed on page {page_index} with langs='{ocr_langs}': {e}; falling back to 'eng'")
                        try:
                            page_text = pytesseract.image_to_string(img, lang="eng")
                        except Exception as e2:
                            self.logger.warning(f"Tesseract OCR fallback to 'eng' failed on page {page_index}: {e2}")
                            page_text = ""
                if page_text:
                    texts.append(page_text)
        finally:
            # Close the document even when rendering a page raises; before,
            # close() was skipped on any error inside the loop.
            doc.close()

        full_text = self._normalize_whitespace("\n\n".join(texts))
        self.logger.info(f"🖼 OCR completed, extracted {len(full_text)} characters")
        return full_text
    except Exception as e:
        self.logger.error(f"OCR pipeline failed: {e}")
        return ""
async def process_questions_with_fixed_context(self, questions: List[str], log_dir_for_request: str, fixed_chunks: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Compatibility entry point for the deprecated sequential fixed-context path.

    Forwards all arguments to ``process_questions_with_fixed_context_parallel``
    and returns its result unchanged.
    """
    delegate = self.process_questions_with_fixed_context_parallel
    outcome = await delegate(questions, log_dir_for_request, fixed_chunks)
    return outcome
async def process_questions_with_fixed_context_parallel(self, questions: List[str], log_dir_for_request: str, fixed_chunks: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Answer every question concurrently against one fixed list of context chunks.

    For each question the ranked chunks are written to
    ``query_<n>_chunks.json`` in ``log_dir_for_request`` and
    ``generate_improved_answer`` is run in the default executor.

    Returns:
        ``(results, answers)``: per-question result dicts ordered by question
        index, and the list of their ``answer`` strings. A failed question
        gets an ``"Error processing question: ..."`` answer rather than
        aborting the batch.
    """
    self.logger.info(f"Using fixed full-text context for {len(questions)} questions (parallel)")
    loop = asyncio.get_running_loop()

    # Precompute ranked chunks once
    ranked_chunks_template = [
        dict(chunk, rank=position)
        for position, chunk in enumerate(fixed_chunks, start=1)
    ]

    def failure_record(index: int, question: str, error: Exception) -> Dict[str, Any]:
        # Shape shared by in-task failures and exceptions surfaced by gather().
        return {
            'question': question,
            'answer': f"Error processing question: {str(error)}",
            'retrieved_chunks_file': None,
            'index': index,
            'success': False,
            'error': str(error)
        }

    async def answer_one(index: int, question: str) -> Dict[str, Any]:
        try:
            chunks_log_path = os.path.join(log_dir_for_request, f"query_{index + 1}_chunks.json")
            with open(chunks_log_path, 'w', encoding='utf-8') as chunk_file:
                json.dump(ranked_chunks_template, chunk_file, indent=2, ensure_ascii=False)
            answer = await loop.run_in_executor(
                None, self.generate_improved_answer, question, ranked_chunks_template
            )
        except Exception as err:
            self.logger.error(f"Fixed-context processing failed for question {index + 1}: {err}")
            return failure_record(index, question, err)
        return {
            'question': question,
            'answer': answer,
            'retrieved_chunks_file': chunks_log_path,
            'index': index,
            'success': True
        }

    pending = [answer_one(index, question) for index, question in enumerate(questions)]
    gathered = await asyncio.gather(*pending, return_exceptions=True)

    processed_results: List[Dict[str, Any]] = [
        failure_record(index, questions[index], outcome) if isinstance(outcome, Exception) else outcome
        for index, outcome in enumerate(gathered)
    ]
    processed_results.sort(key=lambda record: record['index'])
    final_answers = [record['answer'] for record in processed_results]
    return processed_results, final_answers
# --------------------------- Language utilities ---------------------------
def detect_language(self, text: str) -> str:
    """Return the detected language code for ``text``, defaulting to ``'en'``.

    Empty text and detection errors give ``'en'``. When ``langdetect`` is not
    available, the share of ASCII letters decides between ``'en'`` (above
    0.6) and ``'ml'``.
    """
    snippet = (text or "").strip()
    if not snippet:
        return "en"

    if detect is not None:
        try:
            detected = detect(snippet)
            return detected or "en"
        except Exception:
            return "en"

    # No detector installed: fall back to the ASCII-letter ratio.
    letters = sum(ch.isalpha() for ch in snippet)
    ascii_letters = sum((ch.isalpha() and ord(ch) < 128) for ch in snippet)
    mostly_ascii = ascii_letters / max(letters, 1) > 0.6
    return "en" if mostly_ascii else "ml"
def translate_text(self, text: str, target_lang_code: str) -> str:
    """Translate ``text`` via the LLM and return the cleaned translation.

    ``'en'`` requests English; every other code requests Malayalam. An empty
    LLM reply falls back to ``text``; surrounding whitespace, double quotes
    and backticks are stripped from the result. If the LLM call raises,
    ``text`` is returned unchanged.
    """
    try:
        if target_lang_code == "en":
            lang_name = "English"
        else:
            lang_name = "Malayalam"

        instruction = f"Translate the following text to {lang_name} and return ONLY the translation without quotes, preface, or explanation.\n"
        prompt = instruction + f"Text:\n{text}"
        out = self.generate_llm_response(prompt)

        # Best-effort cleanup of extra wrappers
        cleaned = (out or text).strip()
        cleaned = cleaned.strip('"')
        cleaned = cleaned.strip("`")
        return cleaned
    except Exception:
        return text
def define_tools_for_groq(self):
    """Return the function-calling tool schemas offered to Groq.

    The list holds a single ``http_get`` function taking a required ``url``
    string and an optional integer ``timeout`` (schema default 20).
    """
    url_property = {
        "type": "string",
        "description": "The URL to make a GET request to"
    }
    timeout_property = {
        "type": "integer",
        "description": "Timeout in seconds for the request",
        "default": 20
    }
    parameters_schema = {
        "type": "object",
        "properties": {
            "url": url_property,
            "timeout": timeout_property
        },
        "required": ["url"]
    }
    http_get_tool = {
        "type": "function",
        "function": {
            "name": "http_get",
            "description": "Perform an HTTP GET request to any URL to fetch data or interact with APIs",
            "parameters": parameters_schema
        }
    }
    return [http_get_tool]
def execute_tool_call(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a tool call and return the result.

    Only ``http_get`` is supported. The URL comes from the LLM, so it is
    validated before anything is fetched: it must be an http(s) URL with a
    host, and when ``self.allowed_action_hosts`` is set (not ``None``) the
    host must be in that set.

    Args:
        tool_name: Name of the tool to run.
        arguments: Tool arguments. ``url`` is required; ``timeout`` must be a
            positive number, otherwise 20 seconds is used.

    Returns:
        On success a dict with ``status_code``, ``url``, ``content_type``,
        ``data`` and ``type`` (``json``, ``html_text``, ``text`` or
        ``binary``); text payloads are cut to 4000 characters. On any problem
        a dict with a single ``error`` key. This method never raises.
    """
    try:
        if tool_name != "http_get":
            return {"error": f"Unknown tool: {tool_name}"}

        arguments = arguments or {}
        url = arguments.get("url")
        if not url:
            return {"error": "URL is required for http_get"}

        parsed = urlparse(str(url))
        host = (parsed.hostname or "").lower()
        if parsed.scheme.lower() not in ("http", "https") or not host:
            return {"error": f"Unsupported URL for http_get: {url}"}

        # None means "allow all hosts"; a set restricts agentic GETs to it.
        allowed_hosts = getattr(self, "allowed_action_hosts", None)
        if allowed_hosts is not None and host not in {str(h).lower() for h in allowed_hosts}:
            return {"error": f"Host not allowed for http_get: {host}"}

        timeout = arguments.get("timeout", 20)
        # Model-supplied values may be strings, null, or non-positive.
        if isinstance(timeout, bool) or not isinstance(timeout, (int, float)) or timeout <= 0:
            timeout = 20

        fetched = self.web_tool.fetch_url(url, timeout_seconds=timeout)

        # Return structured response
        result = {
            "status_code": fetched.get("status_code"),
            "url": fetched.get("url"),
            "content_type": fetched.get("content_type"),
        }
        if fetched.get("json") is not None:
            result["data"] = fetched["json"]
            result["type"] = "json"
        elif fetched.get("text"):
            if "text/html" in (fetched.get("content_type") or ""):
                result["data"] = self.web_tool.html_to_text(fetched["text"])[:4000]
                result["type"] = "html_text"
            else:
                result["data"] = fetched["text"][:4000]
                result["type"] = "text"
        else:
            result["data"] = f"Binary content ({len(fetched.get('content') or b'')} bytes)"
            result["type"] = "binary"
        return result
    except Exception as e:
        return {"error": f"Tool execution failed: {str(e)}"}
def resolve_question_with_tools(self, question: str, retrieved_chunks: List[Dict[str, Any]], log_dir_for_request: str) -> Tuple[bool, str]:
    """Use Groq's tool calling to resolve questions that require API interactions.

    The model is given the retrieved context plus the tools from
    ``define_tools_for_groq`` and is queried for up to five rounds. Each
    requested tool call is run through ``execute_tool_call`` and its result is
    fed back into the conversation. The first reply without tool calls is
    taken as the final answer.

    Args:
        question: The user question to answer.
        retrieved_chunks: Context chunks; each chunk's ``text`` value is
            joined into the prompt.
        log_dir_for_request: Directory in which ``tool_calls_log.json`` is
            written.

    Returns:
        ``(True, answer)`` when the model produced a final answer.
        ``(False, "")`` when ``decide_agentic_from_context`` declines, no
        Groq client is configured, the iteration budget is exhausted, or an
        exception occurs during the exchange.
    """
    if not self.decide_agentic_from_context(question, retrieved_chunks):
        return False, ""
    if not self.groq_client:
        self.logger.warning("Groq client not available for tool calling")
        return False, ""

    self.logger.info("🤖 Starting tool-based resolution with Groq")
    context_text = "\n\n".join(c.get('text', '') for c in retrieved_chunks)
    tools = self.define_tools_for_groq()
    tool_calls_log = []
    max_iterations = 5

    def write_tool_calls_log(payload: Dict[str, Any]) -> None:
        """Dump the tool-call trace; a logging failure must not lose the answer."""
        try:
            agent_log_path = os.path.join(log_dir_for_request, "tool_calls_log.json")
            with open(agent_log_path, 'w', encoding='utf-8') as f_log:
                json.dump(payload, f_log, indent=2, ensure_ascii=False)
        except (OSError, TypeError, ValueError) as log_error:
            self.logger.warning(f"Could not write tool calls log: {log_error}")

    prompt = (
        f'Based on the following context, answer this question: "{question}"\n'
        "Context:\n"
        f"{context_text}\n"
        "Instructions:\n"
        "- If the context contains API endpoints or instructions to call specific URLs, use the http_get tool to fetch the required data\n"
        "- Follow any step-by-step instructions mentioned in the context\n"
        "- If you need to chain multiple API calls, make them one by one\n"
        "- Once you have all the necessary information, provide a concise final answer\n"
        "- If the question asks for a specific value, return only that value\n"
    )

    try:
        messages = [{"role": "user", "content": prompt}]
        for _ in range(max_iterations):
            response = self.groq_client.chat.completions.create(
                model=self.groq_model,
                messages=messages,
                tools=tools,
                tool_choice="auto",
                temperature=0.1,
                max_tokens=2048
            )
            message = response.choices[0].message

            # No tool calls means the model has settled on its final answer.
            if not message.tool_calls:
                answer = message.content.strip() if message.content else "No answer provided"
                write_tool_calls_log({"tool_calls": tool_calls_log, "final_answer": answer})
                return True, answer

            # The assistant turn must precede the tool results it requested.
            messages.append({
                "role": "assistant",
                "content": message.content,
                "tool_calls": message.tool_calls
            })

            for tool_call in message.tool_calls:
                tool_name = tool_call.function.name
                try:
                    arguments = json.loads(tool_call.function.arguments)
                except (json.JSONDecodeError, TypeError):
                    # Malformed or missing arguments: let the tool report the problem.
                    arguments = {}
                self.logger.info(f"🔧 Executing tool: {tool_name} with args: {arguments}")
                result = self.execute_tool_call(tool_name, arguments)
                tool_calls_log.append({
                    "tool": tool_name,
                    "arguments": arguments,
                    "result": result
                })
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": json.dumps(result, ensure_ascii=False)
                })

        # Iteration budget exhausted without a tool-free reply.
        write_tool_calls_log({
            "tool_calls": tool_calls_log,
            "final_answer": None,
            "status": "max_iterations_reached"
        })
        return False, ""
    except Exception as e:
        self.logger.error(f"Tool-based resolution failed: {e}")
        return False, ""
async def process_questions_with_web_search(self, questions: List[str], log_dir_for_request: str) -> Tuple[List[Dict[str, Any]], List[str]]:
"""Process questions using web search results as context (agentic path) in parallel."""
self.logger.info(f"Using agentic web search for {len(questions)} questions")
loop = asyncio.get_running_loop()
async def handle_question(i: int, question: str) -> Dict[str, Any]:
try:
def search_and_fetch() -> List[Dict[str, Any]]:
search_results = self.web_tool.search(question, max_results=5)
retrieved: List[Dict[str, Any]] = []
for r in search_results[:2]:
href = r.get("href")
page_text = ""
if href:
try:
fetched = self.web_tool.fetch_url(href, timeout_seconds=15)
if fetched.get("text"):
if "text/html" in fetched.get("content_type", ""):
page_text = self.web_tool.html_to_text(fetched["text"])[:4000]
else:
page_text = (fetched.get("text") or "")[:4000]
except Exception:
page_text = r.get("snippet") or ""
combined_text = self._normalize_whitespace((page_text or r.get("snippet") or r.get("title") or "")[:4000])
if combined_text:
retrieved.append({
"rank": len(retrieved) + 1,
"text": combined_text,
"similarity_score": 0.0,
"search_type": "web_search",
"source": href or ""
})
return retrieved
retrieved_chunks = await loop.run_in_executor(None, search_and_fetch)
chunks_log_path = os.path.join(log_dir_for_request, f"query_{i + 1}_chunks.json")
with open(chunks_log_path, 'w', encoding='utf-8') as f_chunks:
json.dump(retrieved_chunks, f_chunks, indent=2, ensure_ascii=False)
answer = await loop.run_in_executor(None, self.generate_improved_answer, question, retrieved_chunks)
return {
'question': question,
'answer': answer,
'retrieved_chunks_file': chunks_log_path,
'index': i,
'success': True
}
except Exception as e:
self.logger.error(f"Web search failed for question {i + 1}: {e}")
return {
'question': question,
'answer': f"Error processing question: {str(e)}",
'retrieved_chunks_file': None,
'index': i,
'success': False,
'error': str(e)
}
tasks = [handle_question(i, q) for i, q in enumerate(questions)]
results = await asyncio.gather(*tasks, return_exceptions=True)
processed_results: List[Dict[str, Any]] = []
for i, r in enumerate(results):
if isinstance(r, Exception):
self.logger.error(f"Exception in web search question {i + 1}: {r}")
processed_results.append({
'question': questions[i],
'answer': f"Error processing question: {str(r)}",
'retrieved_chunks_file': None,
'index': i,
'success': False,
'error': str(r)
})
else:
processed_results.append(r)