-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnerdprompt.py
More file actions
483 lines (429 loc) · 20.9 KB
/
nerdprompt.py
File metadata and controls
483 lines (429 loc) · 20.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
from pathlib import Path
import logging
import traceback
from pygments import highlight
from pygments.lexers import guess_lexer, get_lexer_by_name
from pygments.formatters import Terminal256Formatter
from cerberus import Validator
from dotenv import load_dotenv
import pygments
from openai import OpenAI
import yaml
import os
import string
import sys
import re
import argparse
import time
# ANSI SGR escape sequences used to style terminal output.
# Keys are referenced by name from config.yaml (header_N, dividers_color,
# code_dividers_color, ...), so renaming a key is a config-visible change.
ANSI_CODES = {
    'bold': '\033[1m',
    'italic': '\033[3m',
    'bold_italic': '\033[1;3m',
    'underline': '\033[4m',
    'green': '\033[32m',
    'blue': '\033[34m',
    'red': '\033[31m',
    'yellow': '\033[33m',
    'magenta': '\033[35m',
    'cyan': '\033[36m',
    'white': '\033[37m',
    'black': '\033[30m',
    'bg_red': '\033[41m',
    'bg_green': '\033[42m',
    'bg_yellow': '\033[43m',
    'bg_blue': '\033[44m',
    'bg_magenta': '\033[45m',
    'bg_cyan': '\033[46m',
    'bg_white': '\033[47m',
    'bg_black': '\033[40m',
    'strikethrough': '\033[9m',
    'reverse': '\033[7m',
    'conceal': '\033[8m',
    # Resets all attributes; appended after every styled span.
    'reset': '\033[0m',
}
class PerplexityWrapper:
    """Conversation wrapper for an OpenAI-compatible chat endpoint.

    Holds the running message history (system + user + assistant turns)
    and provides the markdown -> ANSI rendering helpers applied to replies.
    """

    def __init__(self, config, api_key, prompt_type='default'):
        # Seed the conversation with the system prompt chosen from config.
        # Raises KeyError if prompt_type is not declared in system_content.
        self.api_key = api_key
        system_prompt = config['system_content'][prompt_type]
        self.messages = [
            {
                "role": "system",
                "content": system_prompt
            }
        ]

    def ask(self, config):
        """Send the accumulated messages to the LLM.

        Returns:
            (response, api_time): the raw completion response and the
            wall-clock seconds spent in the API call.
        """
        client = OpenAI(api_key=self.api_key, base_url=f"{config['llm_url']}")
        # chat completion without streaming
        start_time = time.perf_counter()
        response = client.chat.completions.create(
            model=f"{config['llm_model']}",
            messages=self.messages,
            # stream=True,  # Streaming disabled
        )
        api_time = time.perf_counter() - start_time
        return response, api_time

    def message_appender(self, role, content):
        """Append one turn to the history (keeps conversation context)."""
        self.messages.append({
            "role": role,
            "content": content
        })
        return self.messages

    def clear_history(self):
        """Reset the conversation while keeping the system message."""
        self.messages = [self.messages[0]]

    def markdown_to_ansi(self, ANSI_CODES, config, markdown_text):
        """Convert a markdown subset (headers, bold/italic, ---, bullets)
        into ANSI escape sequences for terminal display."""
        if '#' in markdown_text:
            # Pre-build (prefix, suffix) ANSI pairs for all six header levels.
            header_ansi = {}
            for level in range(1, 7):
                codes = config[f"header_{level}"]
                if codes:
                    prefix = ''.join(ANSI_CODES[code] for code in codes)
                    header_ansi[level] = (prefix, ANSI_CODES['reset'])
                else:
                    header_ansi[level] = ('', '')

            def replace_header(match):
                # Header level == number of leading '#' characters.
                level = len(match.group(1))
                if level in header_ansi:
                    prefix, suffix = header_ansi[level]
                    return f"{prefix}{match.group(2)}{suffix}"
                return match.group(0)

            # One regex pass catches ALL header levels at once.
            markdown_text = re.sub(r'(?m)^(#{1,6}) (.+)$', replace_header, markdown_text)
        if '*' in markdown_text:
            reset = ANSI_CODES['reset']
            # ***text*** must be handled before ** and * to avoid conflicts.
            markdown_text = re.sub(r'\*\*\*(.+?)\*\*\*', f"{ANSI_CODES['bold_italic']}\\1{reset}", markdown_text)
            markdown_text = re.sub(r'\*\*(.+?)\*\*', f"{ANSI_CODES['bold']}\\1{reset}", markdown_text)
            markdown_text = re.sub(r'\*(.+?)\*', f"{ANSI_CODES['italic']}\\1{reset}", markdown_text)
        # Divider: configurable ASCII art, optionally colorized.
        ascii_divider = config["ascii_dividers"][config['ascii_divider_choice']]
        if config["dividers_color"]:
            # Fix: emit color + divider + reset; the color code used to be
            # duplicated (color + divider + color + reset).
            ascii_divider = ANSI_CODES[config['dividers_color']] + ascii_divider + ANSI_CODES['reset']
        if '---' in markdown_text:
            # Lambda replacement so backslashes in divider art are literal,
            # not regex group references.
            markdown_text = re.sub(r'^---$', lambda _m: ascii_divider, markdown_text, flags=re.MULTILINE)
        if '\n-' in markdown_text or markdown_text.startswith('-'):
            markdown_text = re.sub(r'^\s*-\s+', f" {config['bullet_point_unicode']} ", markdown_text, flags=re.MULTILINE)
        return markdown_text

    def remove_citations(self, dirty_response):
        """Strip Perplexity citation markers (e.g. [1], [23]) from text."""
        return re.sub(r'\[\d+\]', '', dirty_response)

    def code_extractor(self, text):
        """Pull ``` fenced code blocks out of the text.

        Leaves <CODE__REMOVED__i> placeholders so markdown styling cannot
        mangle code content (e.g. comments becoming headers).

        Returns:
            {'text': text_with_placeholders, 'code_blocks': [block, ...]}
        """
        code_blocks = []
        if '```' not in text:
            return {"text": text, "code_blocks": code_blocks}
        matches = list(re.finditer(r'```[\s\S]*?```', text))
        # Replace from the end so earlier match positions stay valid.
        for i, match in enumerate(reversed(matches)):
            code_blocks.insert(0, match.group(0))
            placeholder = f'<CODE__REMOVED__{len(matches) - 1 - i}>'
            text = text[:match.start()] + placeholder + text[match.end():]
        return {"text": text, "code_blocks": code_blocks}

    def code_injector(self, doc_and_code_blocks):
        """Substitute (possibly rebuilt) code blocks back into the ANSI text.

        Must run AFTER header styling, or code comments become headers.
        """
        result = doc_and_code_blocks['ansi_converted_text']
        for index, code_block in enumerate(doc_and_code_blocks['code_blocks']):
            result = result.replace(f'<CODE__REMOVED__{index}>', code_block)
        return result
class CodeProcesser:
    """Splits fenced code blocks apart, syntax-highlights them with Pygments,
    and reassembles them for terminal display."""

    def extract_code_type_and_syntax(self, input_string):
        """Split a fenced block into its language tag and body.

        Returns:
            {'code_type': tag, 'code_syntax': body} or None when the input
            contains no complete ``` fenced block.
        """
        # Fix: the char class was '[\s\S.]' -- the '.' inside a class is a
        # redundant literal dot, not a wildcard.
        pattern = r'```([A-Za-z]*)([\s\S]*)```'
        match = re.search(pattern, input_string, re.DOTALL)
        if match:
            return {
                'code_type': match.group(1),
                'code_syntax': match.group(2)
            }
        return None

    def syntax_highlighter(self, config, code_type_and_syntax):
        """Highlight the code body, preferring the explicit language tag and
        falling back to Pygments' lexer guesser. Adds 'highlighted_code'."""
        try:
            lexer = get_lexer_by_name(code_type_and_syntax['code_type'])
        # Fix: was a bare 'except:' (caught KeyboardInterrupt/SystemExit too);
        # pygments raises ClassNotFound for unknown/empty tags.
        except Exception:
            lexer = guess_lexer(code_type_and_syntax['code_syntax'])
        # Fix: this used to live in a 'finally' block, so a failure in
        # guess_lexer above produced a confusing NameError on 'lexer'.
        formatter = Terminal256Formatter(style=f"{config['code_syntax_theme']}")
        code_type_and_syntax['highlighted_code'] = highlight(
            code_type_and_syntax['code_syntax'], lexer, formatter)
        return code_type_and_syntax

    def rebuild_code_type_and_syntax(self, ANSI_CODES, config, extracted_code):
        """Wrap highlighted code with labeled, configurable dividers.

        Uses config.yaml 'code_dividers' so code framing is independent of
        the '---' markdown divider styling.
        """
        code_type = extracted_code['code_type'].capitalize()
        code_syntax = extracted_code['highlighted_code']
        code_divider = config["code_dividers"][config["code_divider_choice"]]
        if config["code_dividers_color"]:
            # Fix: emit color + divider + reset; the color code used to be
            # duplicated (color + divider + color + reset).
            code_divider = ANSI_CODES[config['code_dividers_color']] + code_divider + ANSI_CODES['reset']
        return f"""\n{code_divider}\n\n{code_type} Code:\n{code_syntax}\n{code_divider}\n"""
class ConfigEater:
    """Loads config.yaml (located next to this script) and validates it."""

    def parse_config(self):
        """Read and parse config.yaml from the script's own directory."""
        config_path = Path(__file__).absolute().parent / 'config.yaml'
        with open(config_path, 'r') as f:
            return yaml.safe_load(f)

    def check_config(self, ANSI_CODES, config_dict):
        """Validate the config against a Cerberus schema.

        Raises:
            ValueError: listing every validation failure at once.
        """
        ansi_list = list(ANSI_CODES.keys())
        # Integer-keyed string maps (divider galleries) share one rule shape.
        divider_map = {
            'type': 'dict',
            'keysrules': {'type': 'integer'},
            'valuesrules': {'type': 'string'},
            'required': True
        }
        schema = {
            'llm_url': {'type': 'string', 'required': True},
            'llm_model': {'type': 'string', 'required': True},
            'remove_perplexity_citations': {'type': 'boolean', 'required': True},
            'dividers_color': {'type': 'string', 'required': True},
            'code_dividers_color': {'type': 'string', 'required': True},
            'code_syntax_theme': {'type': 'string', 'required': True},
            'system_content': {'type': 'dict', 'required': True},
            'bullet_point_unicode': {'type': 'string', 'required': True},
            'ascii_divider_position': {'type': 'string', 'allowed': ['left', 'center', 'right'], 'required': True},
            'ascii_divider_choice': {'type': 'integer', 'required': True},
            'ascii_dividers': dict(divider_map),
            'code_divider_choice': {'type': 'integer', 'required': True},
            'code_dividers': dict(divider_map),
        }
        # All six header_N keys share one rule: list of known ANSI code names.
        for level in range(1, 7):
            schema[f'header_{level}'] = {
                'type': 'list',
                'schema': {'type': 'string'},
                'required': True,
                'allowed': ansi_list
            }
        validator = Validator(schema)
        if not validator.validate(config_dict):
            raise ValueError(f"Config validation error: {validator.errors}")
def test_256_term_colors():
    """Print all 256 terminal background colors, 16 per row, for eyeballing
    theme choices against the current terminal."""
    for i in range(256):
        # Print each color as a labeled background swatch.
        print(f"\033[48;5;{i}m {i:3d} \033[0m", end=' ')
        # Fix: was 'i % 16 == 0', which broke the line after color 0 and
        # produced a 1-swatch first row. Newline after every 16th swatch.
        if (i + 1) % 16 == 0:
            print()
    print()  # Final newline
def parse_arguments():
    """Define and parse the command-line interface for the chat client."""
    usage_examples = '''Examples:
  nerdprompt.py "What is Docker?"                       # Formatted output for humans
  nerdprompt.py --raw "Explain Python decorators"       # Raw markdown for AI agents
  nerdprompt.py -p concise "tell me about the weather"  # Use concise system prompt
  nerdprompt.py --paste                                 # Paste mode - read multiline input
  cat code.py | nerdprompt.py --paste --raw             # Pipe content in paste mode
  nerdprompt.py -m sonar-pro "complex query"            # Use sonar-pro model
  nerdprompt.py -m sonar --raw "simple query"           # Use sonar model with raw output

For AI agents: Use --raw flag. For multiline input: Use --paste flag.'''
    parser = argparse.ArgumentParser(
        description='AI-enhanced terminal chat client with fancy customer terminal formatting',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    # Positional question is optional; interactive prompt covers its absence.
    parser.add_argument(
        'question', nargs='?',
        help='Question to ask the AI (optional, will prompt if not provided)')
    parser.add_argument(
        '-r', '--raw', action='store_true',
        help='output raw markdown (recommended for AI agents)')
    parser.add_argument(
        '-p', '--prompt', default='default',
        help='system prompt to use (default: default)')
    parser.add_argument(
        '-m', '--model', default=None,
        help='LLM model to use (options: sonar, sonar-pro, sonar-reasoning, sonar-research-pro, sonar-deep-research). Overrides config.yaml setting')
    parser.add_argument(
        '-n', '--nothread', action='store_true',
        help='no thread mode, return only the last answer')
    parser.add_argument(
        '--paste', action='store_true',
        help='paste mode - read multiline input until EOF (Ctrl+D)')
    return parser.parse_args()
def load_api_key():
    """Return the API key from the environment (.env files supported).

    Exits the process with status 1 when API_KEY is not configured.
    """
    load_dotenv()
    api_key = os.getenv("API_KEY")
    if api_key:
        return api_key
    logging.error("Error: API_KEY is missing from environment variables. Ensure `.env` is configured.")
    sys.exit(1)
def get_question(initial_question=None, paste_mode=False):
    """Resolve the question text for the next request.

    Args:
        initial_question: question supplied up-front (e.g. CLI positional);
            returned as-is when non-empty.
        paste_mode: read multiline input (piped or interactive) until EOF.

    Returns:
        The question/content string; interactive paths never return empty.

    May call sys.exit(1) when piped input is empty.
    """
    if initial_question:
        return initial_question
    elif paste_mode:
        # Paste mode - read multiline input until EOF
        if not sys.stdin.isatty():
            # Content is being piped, read all of stdin at once.
            content = sys.stdin.read()
            if not content.strip():
                print("No content provided via pipe.")
                sys.exit(1)
            # Reopen stdin on the controlling terminal so follow-up
            # questions can still be typed after consuming the pipe.
            # Fix: dropped the redundant function-local 'import os'
            # (os is already imported at module level).
            try:
                tty_fd = os.open('/dev/tty', os.O_RDONLY)
                sys.stdin = os.fdopen(tty_fd, 'r')
            except (OSError, FileNotFoundError):
                # No terminal available (e.g. CI); flag the piped input so
                # main() can explain why the session cannot continue.
                # NOTE(review): relies on sys.stdin accepting ad-hoc
                # attributes -- confirm on non-CPython stdin replacements.
                sys.stdin.piped_input = True
            return content
        else:
            # Interactive paste mode: accumulate lines until Ctrl+D (EOF).
            print("📋 Paste mode - enter your content (press Ctrl+D when done):")
            try:
                lines = []
                while True:
                    lines.append(input())
            except EOFError:
                content = '\n'.join(lines)
                if not content.strip():
                    print("No content provided.")
                    # Retry from scratch on empty paste.
                    return get_question(paste_mode=paste_mode)
                return content
    else:
        # Plain interactive prompt; insist on a non-empty question.
        question = input("Please enter your follow up question: ").strip()
        while not question:
            print("You must enter a question.")
            question = input("Please enter your follow up question: ").strip()
        return question
def formatted_output(perplexity_client, config, raw_content):
    """Render one LLM reply through the full formatting pipeline and print it.

    Pipeline: extract fenced code -> markdown-to-ANSI on the prose ->
    syntax-highlight each code block -> re-inject code -> strip citations.

    Returns:
        Seconds spent formatting (excludes the final print).
    """
    started = time.perf_counter()
    extracted = perplexity_client.code_extractor(raw_content)
    # Style the prose with the code removed, so code content is untouched.
    extracted['ansi_converted_text'] = perplexity_client.markdown_to_ansi(
        ANSI_CODES, config, extracted['text'])
    # Only run the highlighting pass when fenced code was actually found.
    if extracted['code_blocks']:
        processor = CodeProcesser()
        rebuilt_blocks = []
        for block in extracted['code_blocks']:
            # Take the fence apart, highlight the body, frame it with dividers.
            pieces = processor.extract_code_type_and_syntax(block)
            pieces = processor.syntax_highlighter(config, pieces)
            rebuilt_blocks.append(
                processor.rebuild_code_type_and_syntax(ANSI_CODES, config, pieces))
        extracted['code_blocks'] = rebuilt_blocks
    merged = perplexity_client.code_injector(extracted)
    final_text = perplexity_client.remove_citations(merged)
    elapsed = time.perf_counter() - started
    print(final_text)
    return elapsed
def main():
    """Entry point: parse CLI args, load/validate config, then run the
    interactive ask/answer loop until the user quits."""
    args = parse_arguments()
    # Follow-up holders: new_question continues the current thread;
    # new_question_new_context is used after the history has been cleared.
    new_question = ""
    new_question_new_context = ""
    config_eater = ConfigEater()
    config = config_eater.parse_config()
    config_eater.check_config( ANSI_CODES, config)
    # Override model from command line if specified
    if args.model:
        config['llm_model'] = args.model
    try:
        perplexity_client = PerplexityWrapper(config, load_api_key(), args.prompt)
    except Exception as e:
        # Config/prompt lookup or key loading failed: report and bail out.
        print(f"An error occurred: {e}")
        traceback.print_exc()
        return
    while True:
        try:
            # Pick this turn's content: a pending follow-up wins, otherwise
            # prompt (or consume the one-shot CLI question).
            if new_question:
                content = new_question
            elif new_question_new_context:
                content = new_question_new_context
            else:
                content = get_question(args.question, args.paste)
                args.question = None # Clear after first use to allow follow-ups
            perplexity_client.message_appender("user", content)
            print("🔍 Researching...", end='', flush=True)
            response, api_time = perplexity_client.ask(config)
            print(f"\r🔍 Researching... done")
            raw_content = response.choices[0].message.content
            # Record the assistant reply so the next turn keeps context.
            perplexity_client.message_appender("assistant", raw_content)
            if args.raw:
                # Raw output mode - output exactly response.choices[0].message.content
                processing_start = time.perf_counter()
                print(raw_content)
                processing_time = time.perf_counter() - processing_start
                total_time = api_time + processing_time
                processing_ms = processing_time * 1000
                print(f"\nAPI responded in {api_time:.2f}s | Processing took {processing_ms:.2f}ms | Total: {total_time:.2f}s")
                # Exit immediately if no-thread mode is also specified
                if args.nothread:
                    sys.exit(0)
            elif args.nothread:
                processing_time = formatted_output(perplexity_client, config, raw_content)
                total_time = api_time + processing_time
                processing_ms = processing_time * 1000
                print(f"\nAPI responded in {api_time:.2f}s | Processing took {processing_ms:.2f}ms | Total: {total_time:.2f}s")
                # Exit immediately after showing response in no-thread mode
                sys.exit(0)
            else:
                # Formatted thread mode: show the styled answer and timings.
                processing_time = formatted_output(perplexity_client, config, raw_content)
                total_time = api_time + processing_time
                processing_ms = processing_time * 1000
                print(f"\nAPI responded in {api_time:.2f}s | Processing took {processing_ms:.2f}ms | Total: {total_time:.2f}s")
            try:
                follow_up_question = input("y=continue thread; keep context | n=stop; exit | c=clear context; next search starts fresh: ").strip()
            except EOFError:
                # Handle case where stdin is not available for follow-up input
                if hasattr(sys.stdin, 'piped_input'):
                    print("\nPiped input processed. Cannot continue interactive session.")
                else:
                    print("\nNo content provided via pipe.")
                sys.exit(0)
            if follow_up_question == "n":
                # NOTE(review): exits with status 1 on a normal "no" -- confirm
                # a non-zero code is intended here.
                sys.exit(1)
            elif follow_up_question == "y":
                new_question_new_context = None
                if args.paste:
                    new_question = get_question(paste_mode=True)
                else:
                    new_question = input("Please enter your follow up question: ").strip()
            elif follow_up_question == "c":
                perplexity_client.clear_history()
                new_question = None
                if args.paste:
                    new_question_new_context = get_question(paste_mode=True)
                else:
                    new_question_new_context = input("Please enter a question on a new topic: ").strip()
            # NOTE(review): any other reply loops with the previous follow-up
            # holders unchanged -- looks like it could re-send the last
            # question; confirm intended behavior.
        except Exception as e:
            # Keep the session alive on per-turn failures (network, parsing).
            print(f"An error occurred: {e}")
            traceback.print_exc()

if __name__ == "__main__":
    main()