-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
139 lines (114 loc) · 3.79 KB
/
utils.py
File metadata and controls
139 lines (114 loc) · 3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
Utility functions for progress tracking, logging, and common operations.
"""
import json
import logging
from datetime import datetime, timezone
from typing import Optional
# Configure structured logger
logger = logging.getLogger(__name__)
def get_progress_message(
    stage: str,
    current: int,
    total: int,
    extra: str = ""
) -> str:
    """
    Build a human-readable progress message for a processing stage.

    Args:
        stage: Processing stage (extracting, chunking, analyzing, tree)
        current: Current progress count
        total: Total items to process
        extra: Optional extra context (e.g., token counts)

    Returns:
        Human-readable progress message; unknown stages fall back to
        "<stage>: <current>/<total>".
    """
    # Parenthesized extra context is only appended for stages that use it.
    suffix = f" ({extra})" if extra else ""
    if stage == "extracting":
        return f"Extracting text... {current}/{total} pages"
    if stage == "chunking":
        return f"Creating chunks... {current}/{total} chars processed"
    if stage == "analyzing":
        return f"AI analysis... {current}/{total} chunks{suffix}"
    if stage == "tree":
        return f"Building memory tree... {current}/{total} chunks"
    if stage == "processing":
        return f"Processing... {current}/{total}{suffix}"
    return f"{stage}: {current}/{total}"
def log_chunk_analysis(
    chunk_idx: int,
    tokens_in: int,
    tokens_out: int,
    success: bool,
    cost: Optional[float] = None,
    error: Optional[str] = None
) -> None:
    """
    Log chunk analysis in structured JSON format for easy parsing.

    Args:
        chunk_idx: Index of the chunk
        tokens_in: Input tokens used
        tokens_out: Output tokens generated
        success: Whether analysis succeeded
        cost: Optional cost in USD
        error: Optional error message if failed
    """
    log_data = {
        "event": "chunk_analyzed",
        "chunk_idx": chunk_idx,
        "tokens_in": tokens_in,
        "tokens_out": tokens_out,
        "success": success,
        # datetime.utcnow() is deprecated (Python 3.12) and yields a naive
        # datetime whose isoformat() has no UTC offset; use an explicit
        # timezone-aware timestamp so log entries are unambiguous.
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    # Optional fields are only emitted when present, keeping log lines compact.
    if cost is not None:
        log_data["cost_usd"] = cost
    if error:
        log_data["error"] = error
    logger.info(json.dumps(log_data))
def log_source_processing(
    source_id: str,
    stage: str,
    status: str,
    chunks_processed: Optional[int] = None,
    total_chunks: Optional[int] = None,
    error: Optional[str] = None
) -> None:
    """
    Log source processing events in structured format.

    Args:
        source_id: ID of the source being processed
        stage: Processing stage (extraction, analysis, tree)
        status: Status (started, in_progress, completed, failed)
        chunks_processed: Number of chunks processed
        total_chunks: Total chunks in source
        error: Optional error message
    """
    log_data = {
        "event": "source_processing",
        "source_id": source_id,
        "stage": stage,
        "status": status,
        # datetime.utcnow() is deprecated (Python 3.12) and yields a naive
        # datetime whose isoformat() has no UTC offset; use an explicit
        # timezone-aware timestamp so log entries are unambiguous.
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    if chunks_processed is not None:
        log_data["chunks_processed"] = chunks_processed
    if total_chunks is not None:
        log_data["total_chunks"] = total_chunks
    if error:
        log_data["error"] = error
    # Failures are logged at ERROR level so they surface in alerting.
    if status == "failed":
        logger.error(json.dumps(log_data))
    else:
        logger.info(json.dumps(log_data))
def calculate_progress_percent(current: int, total: int, min_percent: int = 0, max_percent: int = 100) -> int:
    """
    Calculate progress percentage with bounds.

    Args:
        current: Current progress
        total: Total items
        min_percent: Minimum percentage to return
        max_percent: Maximum percentage to return

    Returns:
        Progress percentage clamped to [min_percent, max_percent]
    """
    # An empty workload counts as fully done.
    if not total:
        return max_percent
    raw = int((current / total) * 100)
    return min(max_percent, max(min_percent, raw))