-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathutils.py
More file actions
366 lines (302 loc) · 11.3 KB
/
utils.py
File metadata and controls
366 lines (302 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
"""
Utility functions for file saving and text processing
"""
import os
import re
from datetime import datetime
from typing import Dict, List, Optional, Tuple
def save_prompts_to_file(
    positive_prompt: str,
    negative_prompt: str,
    breakdown: str,
    metadata: Dict,
    filename_base: str,
    output_dir: str = "output/video_prompts"
) -> Dict:
    """Write the generated prompts plus metadata to a timestamped text file.

    Args:
        positive_prompt: Enhanced positive prompt.
        negative_prompt: Generated negative prompt.
        breakdown: Structured breakdown text.
        metadata: Generation metadata passed through to the file body.
        filename_base: Base filename; a timestamp suffix is appended.
        output_dir: Destination directory (created if missing).

    Returns:
        Dict with keys "success", "filepath", "error". On failure,
        "filepath" is None and "error" carries the exception text.
    """
    try:
        # Make sure the destination exists before building the path.
        os.makedirs(output_dir, exist_ok=True)

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        target = os.path.join(
            output_dir,
            f"{sanitize_filename(filename_base)}_{stamp}.txt",
        )

        body = format_prompt_file(
            positive_prompt, negative_prompt, breakdown, metadata
        )
        with open(target, 'w', encoding='utf-8') as handle:
            handle.write(body)
    except Exception as exc:
        # Best-effort save: report the failure to the caller instead of raising.
        return {
            "success": False,
            "filepath": None,
            "error": f"File save error: {str(exc)}"
        }
    return {"success": True, "filepath": target, "error": None}
def format_prompt_file(
    positive_prompt: str,
    negative_prompt: str,
    breakdown: str,
    metadata: Dict
) -> str:
    """Format the prompt file content.

    Builds a human-readable report: fixed prompt/metadata sections, then
    optional sections (reference guidance, brainstorm note, main-LLM error)
    when present in *metadata*, then a trace of LLM instructions/responses
    when the metadata recorded any.

    Args:
        positive_prompt: Enhanced positive prompt (may be empty/None).
        negative_prompt: Generated negative prompt (may be empty/None).
        breakdown: Structured breakdown text (may be empty/None).
        metadata: Generation metadata; missing keys render as 'N/A'.

    Returns:
        The complete file content, terminated by a single newline.
    """
    separator = "=" * 70
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    parts: List[str] = [
        separator,
        "AI VIDEO PROMPT EXPANDER - GENERATED PROMPTS",
        f"Generated: {timestamp}",
        separator,
        "",
        "POSITIVE PROMPT:",
        positive_prompt or "",
        "",
        separator,
        "",
        "NEGATIVE PROMPT:",
        negative_prompt or "",
        "",
        separator,
        "",
        "BREAKDOWN:",
        breakdown or "",
        "",
        separator,
        "",
        "METADATA:",
        f"Preset: {metadata.get('preset', 'N/A')}",
        f"Expansion Tier: {metadata.get('tier', 'N/A')}",
        f"Mode: {metadata.get('mode', 'N/A')}",
        f"LLM Backend: {metadata.get('backend', 'N/A')}",
        f"Model: {metadata.get('model', 'N/A')}",
        f"Temperature: {metadata.get('temperature', 'N/A')}",
        f"Variation: {metadata.get('variation_num', 'N/A')}",
        "",
        separator,
        "",
        "ORIGINAL INPUT:",
        metadata.get('original_prompt', 'N/A') or "",
        "",
        separator
    ]

    # Optional sections, appended only when the metadata carries them.
    reference_guidance = metadata.get("reference_guidance")
    if reference_guidance:
        parts.extend(_tail_section(
            "REFERENCE GUIDANCE PROVIDED TO MAIN LLM:", reference_guidance, separator))

    creative_brainstorm = metadata.get("creative_brainstorm")
    if creative_brainstorm:
        parts.extend(_tail_section(
            "CREATIVE BRAINSTORM NOTE:", creative_brainstorm, separator))

    # Only surface the error when the main call actually failed.
    main_llm_error = metadata.get("main_llm_error")
    if main_llm_error and not metadata.get("main_llm_success", True):
        parts.extend(_tail_section(
            "MAIN LLM ERROR DETAILS:", main_llm_error, separator))

    # Detailed LLM instruction/response trace.
    trace_lines = _main_llm_trace(metadata) + _reference_trace(metadata)
    if trace_lines:
        parts.extend([
            "",
            "LLM INSTRUCTIONS & RESPONSES:",
            "",
            *trace_lines,
            separator
        ])
    return "\n".join(parts) + "\n"


def _tail_section(title: str, body, separator: str) -> List[str]:
    """Blank line, section title, stringified body, blank line, separator."""
    return ["", title, str(body), "", separator]


def _indented(text, prefix: str) -> List[str]:
    """Prefix every line of str(text) with *prefix* (for trace payloads)."""
    return [prefix + line for line in str(text).splitlines()]


def _main_llm_trace(metadata: Dict) -> List[str]:
    """Trace lines for the main prompt LLM call, or [] when none recorded."""
    system_prompt = metadata.get("system_prompt")
    user_prompt = metadata.get("user_prompt")
    raw_llm_output = metadata.get("raw_llm_output")
    if not (system_prompt or user_prompt or raw_llm_output):
        return []
    lines = ["MAIN PROMPT LLM:"]
    if system_prompt:
        lines.append(" System Prompt:")
        lines.extend(_indented(system_prompt, " "))
    if user_prompt:
        lines.append(" User Prompt:")
        lines.extend(_indented(user_prompt, " "))
    if raw_llm_output:
        lines.append(" Response:")
        lines.extend(_indented(raw_llm_output, " "))
    else:
        # A prompt was sent but no response captured — record that explicitly.
        lines.append(" Response: [no response recorded]")
    lines.append("")
    return lines


def _reference_trace(metadata: Dict) -> List[str]:
    """Trace lines for reference-analysis LLM calls, or [] when none logged."""
    reference_logs = metadata.get("reference_llm_logs") or []
    if not reference_logs:
        return []
    lines = ["REFERENCE ANALYSIS CALLS:"]
    for idx, log in enumerate(reference_logs, start=1):
        label = log.get("label", f"Reference {idx}")
        directive = log.get("directive", "N/A")
        mode = log.get("mode", "N/A")
        outcome = "success" if log.get("success") else "fallback"
        lines.append(f" {idx}. {label} [{directive} | {mode}] → {outcome}")
        sys_prompt = log.get("system_prompt")
        if sys_prompt:
            lines.append(" System Prompt:")
            lines.extend(_indented(sys_prompt, " "))
        user_payload = log.get("user_prompt")
        if user_payload:
            lines.append(" User Prompt:")
            lines.extend(_indented(user_payload, " "))
        raw_response = log.get("raw_response")
        if raw_response:
            lines.append(" Response:")
            lines.extend(_indented(raw_response, " "))
        else:
            lines.append(" Response: [no response recorded]")
        lines.append("")
    return lines
def sanitize_filename(filename: str) -> str:
    """Reduce *filename* to a safe, portable file name.

    Removes characters invalid on common filesystems — including ASCII
    control characters — replaces spaces with underscores, caps the length
    at 50 characters, and strips trailing dots (names ending in a dot are
    invalid on Windows). Falls back to "prompt" when nothing usable remains.
    """
    # Drop characters rejected by Windows/POSIX filesystems, plus control chars.
    filename = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', filename)
    # Replace spaces with underscores
    filename = filename.replace(' ', '_')
    # Limit length
    filename = filename[:50]
    # Windows rejects filenames that end with a dot.
    filename = filename.rstrip('.')
    # Ensure not empty
    return filename or "prompt"
def parse_keywords(keyword_string: str) -> List[str]:
    """Split a comma-separated keyword string into a clean list.

    None, empty, or whitespace-only input yields []; blank entries
    between commas are dropped and each keyword is stripped.
    """
    if not keyword_string or not keyword_string.strip():
        return []
    return [token.strip() for token in keyword_string.split(',') if token.strip()]
def detect_complexity(prompt: str) -> str:
    """Analyze prompt complexity and suggest an appropriate expansion tier.

    Heuristic based on word count and the presence of cinematography
    vocabulary (case-insensitive substring match).

    Returns:
        'basic', 'enhanced', 'advanced', or 'cinematic'.
    """
    word_count = len(prompt.split())
    # Technical cinematography terms. NOTE: substring matching means short
    # terms like 'pan' can also hit words such as 'panorama' — acceptable
    # for a rough heuristic.
    technical_terms = (
        'shot', 'camera', 'lens', 'lighting', 'composition', 'angle',
        'close-up', 'wide shot', 'medium shot', 'tracking', 'dolly',
        'pan', 'tilt', 'bokeh', 'depth of field', 'frame', 'focal length',
        'aperture', 'exposure', 'soft light', 'hard light', 'backlight',
        'rim light', 'key light', 'fill light', 'practical', 'motivated'
    )
    prompt_lower = prompt.lower()
    has_technical_terms = any(term in prompt_lower for term in technical_terms)
    # (A former adjective-counting regex was computed here but never used in
    # the tier decision; removed as dead code.)

    # Tier thresholds: short & plain → basic; medium & plain → enhanced;
    # moderately long, or technical but not sprawling → advanced; else cinematic.
    if word_count < 10 and not has_technical_terms:
        return "basic"
    if word_count < 25 and not has_technical_terms:
        return "enhanced"
    if word_count < 50 or (has_technical_terms and word_count < 80):
        return "advanced"
    return "cinematic"
def format_breakdown(breakdown_dict: Dict) -> str:
    """Render a breakdown dictionary as labelled lines in a fixed order."""
    field_labels = (
        ("subject", "Subject"),
        ("scene", "Scene"),
        ("motion", "Motion"),
        ("aesthetic_control", "Aesthetic Control"),
        ("camera", "Camera"),
        ("lighting", "Lighting"),
        ("style", "Style"),
    )
    rendered = [
        f"{label}: {breakdown_dict[key]}"
        for key, label in field_labels
        if key in breakdown_dict
    ]
    # Tier/preset trailer; the tier line opens a new paragraph.
    if "detected_tier" in breakdown_dict:
        rendered.append(f"\nDetected Tier: {breakdown_dict['detected_tier']}")
    if "applied_preset" in breakdown_dict:
        rendered.append(f"Applied Preset: {breakdown_dict['applied_preset']}")
    return "\n".join(rendered)
def truncate_text(text: str, max_length: int = 500, add_ellipsis: bool = True) -> str:
    """Clip *text* to at most *max_length* characters, optionally adding '...'."""
    if len(text) <= max_length:
        return text
    clipped = text[:max_length]
    return clipped + "..." if add_ellipsis else clipped
def validate_positive_keywords(keywords: List[str], prompt: str) -> Tuple[bool, List[str]]:
    """Check which keywords are absent from the prompt (case-insensitive).

    Returns:
        (all_present, missing) — substring match; the missing list keeps
        each keyword's original casing.
    """
    haystack = prompt.lower()
    missing = [kw for kw in keywords if kw.lower() not in haystack]
    return not missing, missing
def clean_llm_output(text: str) -> str:
    """Normalize raw LLM text: strip code fences, collapse blank runs, trim."""
    # Remove markdown code fences (``` or ```lang) and their trailing newline.
    without_fences = re.sub(r'```[\w]*\n?', '', text)
    # Collapse three-or-more consecutive newlines down to a blank line.
    collapsed = re.sub(r'\n{3,}', '\n\n', without_fences)
    return collapsed.strip()
def extract_prompt_from_response(response: str) -> str:
    """Pull just the prompt text out of an LLM response.

    Scans for known lead-in markers (e.g. "enhanced prompt:"); when one is
    found, returns the text after it up to the first blank line. With no
    marker, falls back to cleaning the whole response.
    """
    markers = (
        "enhanced prompt:",
        "expanded prompt:",
        "final prompt:",
        "prompt:",
        "here's the prompt:",
        "here is the prompt:",
    )
    lowered = response.lower()
    for marker in markers:
        position = lowered.find(marker)
        if position == -1:
            continue
        candidate = response[position + len(marker):].strip()
        # The prompt ends at the first blank line; anything after is commentary.
        return candidate.split('\n\n', 1)[0]
    # No marker found — return the whole response cleaned.
    return clean_llm_output(response)