-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathscript.py
More file actions
executable file
·392 lines (356 loc) · 15 KB
/
script.py
File metadata and controls
executable file
·392 lines (356 loc) · 15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
###################
# Autobooga
# Copyright (C) 2023 by Sammy Fischer (autobooga@cosmic-bandito.com)
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>.
#
import os.path
import string
import requests
import json
from bs4 import BeautifulSoup
from summarizer import Summarizer
from modules import chat, shared
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import re
import gradio as gr
from PyPDF2 import PdfReader
# JSON file in which the extension settings (the ``params`` dict) are persisted.
CONFIG_FILE = "extensions/Autobooga/autobooga_config.json"
# A log file path is built as LOG_DIR + <character> + LOG_FILE (see write_log).
LOG_DIR = "logs/AB_"
LOG_FILE = "_logs.txt"
############# TRIGGER PHRASES #############
## You can add anything you like here, just be careful not to trigger
## unwanted searches or even loops.
##
## The phrases are looked up as substrings of the lower-cased text and the
## first phrase (in list order) that occurs wins, so keep them lower-case and
## put longer, more specific phrases before shorter ones they contain.
INTERNET_QUERY_PROMPTS = [
    "search the internet for information on",
    "search the internet for information about",
    "search for information about",
    "search for information on",
    "search for ",
    "i need more information on ",
    "search the internet for ",
    "can you provide me with more specific details on ",
    "can you provide me with details on ",
    "can you provide me with more details on ",
    "can you provide me with more specific details about ",
    "can you provide me with details about ",
    "can you provide me with more details about ",
    "what can you find out about ",
    "what information can you find out about ",
    "what can you find out on ",
    "what information can you find out on ",
    "what can you tell me about ",
    "what do you know about ",
    "ask the search engine on ",
    "ask the search engine about ",
]

## Phrases that announce a (quoted) file name to be read; they are matched
## case-insensitively by extract_file_name.
FILE_QUERY_PROMPTS = [
    "open the file ",
    "read the file ",
    "summarize the file ",
    "get the file ",
]
# Not referenced anywhere in the visible part of this file.
DBNAME = ""
# Label used for log file names and log lines; input_modifier and
# output_modifier overwrite it with "<character_menu>(<model name>)".
character = "unknown"
# If 'state' is True, will hijack the next chat generation
input_hijack = dict(state=False, value=["", ""])
def write_config():
    """Save the current ``params`` dictionary to CONFIG_FILE as indented JSON."""
    with open(CONFIG_FILE, 'w') as config_out:
        config_out.write(json.dumps(params, indent=4))
def write_log(char, s):
    """Append the text ``s`` to the log file belonging to ``char``.

    The file name is LOG_DIR + char + LOG_FILE; the file is opened in append
    mode, so existing log content is kept.
    """
    log_path = "".join((LOG_DIR, char, LOG_FILE))
    with open(log_path, 'a') as log:
        log.write(s)
# Load the persisted settings. A missing, unreadable or malformed config file
# simply means "use the defaults".
try:
    with open(CONFIG_FILE) as f:
        config = json.load(f)
except (OSError, ValueError):
    # ValueError covers json.JSONDecodeError and decoding errors.
    config = {}
if not isinstance(config, dict):
    # Valid JSON that is not an object (e.g. a list) cannot hold settings.
    config = {}

# Default settings; every key can be overridden by the config file.
params = {
    "searx_server":"enter the url to a searx server capable of json here.",
    "max_search_results":5,
    "max_text_length":1000,
    "upload_prompt":"Please summarize the following text, one paragraph at a time:",
    "upload_position":"before",
    "logging_enabled":1
}

# Free-text settings are taken over as they are.
for _key in ("searx_server", "upload_prompt", "upload_position"):
    if _key in config:
        params[_key] = config[_key]

# Numeric settings must convert to int; otherwise the default is kept.
# 'logging_enabled' is restored here as well so that the logging checkbox
# survives a restart.
for _key in ("max_search_results", "max_text_length", "logging_enabled"):
    if _key in config:
        try:
            params[_key] = int(config[_key])
        except (TypeError, ValueError):
            pass

# Write the merged settings back so the file exists and contains every key.
write_config()
def set_upload_prompt( x):
    """Store ``x`` as the prompt that accompanies uploaded files, then save the config."""
    params["upload_prompt"] = x
    write_config()
def set_upload_position( x):
    """Store ``x`` as the position of the upload prompt, then save the config.

    dragAndDropFile puts the prompt in front of the file content when the
    value is "before" and behind it otherwise.
    """
    params["upload_position"] = x
    write_config()
def set_searx_server( x):
    """Store ``x`` as the URL of the Searx server to query, then save the config."""
    params["searx_server"] = x
    write_config()
def set_max_search_results( x):
    """Set how many search results are read, then save the config.

    ``x`` is converted with ``int()``. A value that cannot be converted
    (for example a half-typed entry in the text box) leaves the previous
    setting in place.
    """
    try:
        params["max_search_results"] = int(x)
    except (TypeError, ValueError):
        # Only conversion failures are ignored; anything else should surface.
        pass
    write_config()
def set_max_extracted_text(x):
    """Set the maximum amount of extracted text, then save the config.

    ``x`` is converted with ``int()``. A value that cannot be converted
    leaves the previous ``max_text_length`` setting in place.
    """
    try:
        params["max_text_length"] = int(x)
    except (TypeError, ValueError):
        # Only conversion failures are ignored; anything else should surface.
        pass
    write_config()
def set_logging_enabled(x):
    """Switch dialog logging on or off, then save the config.

    ``x`` is converted with ``int()``, so ``True``/``False`` become 1/0.
    A value that cannot be converted leaves the previous setting in place.
    """
    try:
        params["logging_enabled"] = int(x)
    except (TypeError, ValueError):
        # Only conversion failures are ignored; anything else should surface.
        pass
    write_config()
def call_searx_api(query):
    """Query the configured Searx server and return the result snippets as text.

    Args:
        query: The search terms. They are passed as a request parameter, so
            characters such as ``&``, ``#`` or spaces are URL-encoded.

    Returns:
        A string starting with "An internet search returned these results:"
        followed by the ``content`` snippets of at most
        ``params['max_search_results']`` results, cut to
        ``params['max_text_length']`` characters. If the server cannot be
        reached or does not answer with JSON, an explanatory message is
        returned instead.
    """
    try:
        response = requests.get(
            params['searx_server'],
            params={"q": query, "format": "json"},
            # Without a timeout an unresponsive server would block the chat forever.
            timeout=30,
        )
    except (requests.RequestException, ValueError):
        # ValueError also covers malformed server URLs on older requests versions.
        return "An internet search returned no results as the SEARX server did not answer."

    # Load the response data into a JSON object.
    try:
        data = json.loads(response.text)
    except ValueError:
        return "An internet search returned no results as the SEARX server doesn't seem to output json."

    max_results = params['max_search_results']
    results = data.get('results') if isinstance(data, dict) else None

    snippets = []
    for result in results or []:
        # Stop as soon as enough non-empty snippets were collected.
        if len(snippets) >= max_results:
            break
        # The result URL is deliberately not used: it would take up too much
        # of the available context size.
        content = result.get('content') if isinstance(result, dict) else None
        if isinstance(content, str) and content:
            snippets.append(' ' + content + "\n")

    # Only the first 'max_text_length' characters of the snippets are returned.
    texts = ''.join(snippets)
    return "An internet search returned these results:" + texts[:params['max_text_length']]
## returns only the first URL in a prompt
def extract_url(prompt):
    """Return the first http(s) URL found in ``prompt``, or "" if there is none.

    The URL is returned exactly as written. The scheme is matched
    case-insensitively, but the text is not lower-cased, because URL paths
    and query strings are case-sensitive.
    """
    # Regular expression to match URLs
    url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    match = re.search(url_pattern, prompt, re.IGNORECASE)
    return match.group(0) if match else ""
def trim_to_x_words(prompt: str, limit: int) -> str:
    """Return the last ``limit`` space-separated words of ``prompt``.

    The text is split on single spaces only, so newlines and tabs stay
    inside the "words" and are preserved. If ``prompt`` has ``limit`` words
    or fewer it is returned unchanged.

    Args:
        prompt: The text to shorten.
        limit: Number of trailing words to keep. Values below 1 are treated
            as 1, i.e. at least the last word is always returned.

    Returns:
        The kept words joined by single spaces.
    """
    words = prompt.split(" ")
    keep = max(limit, 1)
    return " ".join(words[-keep:])
def extract_query(prompt):
    """Extract a search query and its follow-up text from ``prompt``.

    The lower-cased prompt is scanned for the first phrase of
    INTERNET_QUERY_PROMPTS (in list order) it contains. The text after that
    phrase, up to the first ``.``, ``!`` or ``?``, is the search query;
    whatever comes after that terminator is the follow-up.

    Relative dates in the query ("today", "yesterday", "this/last month",
    "this/last year") are replaced by the concrete date so the search engine
    gets something it can match.

    Returns:
        A two-element list ``[query, follow_up]``, both lower-cased and
        stripped. Both are "" when no trigger phrase is found; ``follow_up``
        is "" when nothing follows the query.
    """
    lowered = prompt.lower()
    trigger = next((p for p in INTERNET_QUERY_PROMPTS if p in lowered), "")
    if trigger == "":
        return ["", ""]

    # Everything after the first occurrence of the trigger phrase.
    remainder = lowered.partition(trigger)[2]

    # The query ends at the first sentence terminator; the rest is follow-up.
    terminator = re.search(r"[.!?]", remainder)
    if terminator is None:
        query, follow_up = remainder, ""
    else:
        query = remainder[:terminator.start()]
        follow_up = remainder[terminator.end():].strip()

    today = datetime.today()
    replacements = (
        ("this year", today.strftime("%Y")),
        ("this month", today.strftime("%B %Y")),
        ("today", today.strftime("'%B,%d %Y'")),
        ("yesterday", (today - timedelta(days=1)).strftime("'%B,%d %Y'")),
        ("last month", (today - relativedelta(months=1)).strftime("%B %Y")),
        ("last year", (today - relativedelta(years=1)).strftime("%Y")),
    )
    for phrase, value in replacements:
        # Word boundaries keep the surrounding spaces intact and also match at
        # the very start or end of the query. A function is used as the
        # replacement so the date text is never parsed for escape sequences.
        query = re.sub(rf"\b{re.escape(phrase)}\b", lambda _m, v=value: v, query)

    return [query.strip(), follow_up]
def extract_file_name( prompt):
    """Return the quoted file name that follows a file trigger phrase.

    The phrases of FILE_QUERY_PROMPTS are tried in order, case-insensitively.
    For the first one found, the rest of that line is searched for a text
    enclosed in matching single or double quotes; that text (without the
    quotes) is returned. "" is returned when no phrase or no quoted text
    is found.
    """
    for trigger in FILE_QUERY_PROMPTS:
        hit = re.search(rf'{trigger}(.*)', prompt, re.IGNORECASE)  # re.IGNORECASE makes the search case-insensitive
        if hit:
            tail = hit.group(1)
            break
    else:
        # None of the trigger phrases occurs in the prompt.
        return ""

    quoted = re.search(r"([\"'])(.*?)\1", tail)
    return "" if quoted is None else quoted.group(2)
def get_page(url, prompt):
    """Download ``url`` and return its readable text for use in a prompt.

    The text of all ``<p>`` elements is joined and shortened with
    trim_to_x_words to ``params['max_text_length']`` words. If the page has
    no paragraphs, a notice is returned instead, extended by the page's
    ``page-topic`` / ``description`` meta tags when present.

    Args:
        url: Address of the page to fetch.
        prompt: The user prompt the URL was taken from. If it consists of
            nothing but the URL, a request to summarize the page is appended.

    Returns:
        The text to put in front of the prompt, or an error message if the
        page could not be loaded.
    """
    try:
        # Without a timeout an unresponsive server would block the chat forever.
        response = requests.get(url, timeout=30)
    except (requests.RequestException, ValueError):
        return f"The page {url} could not be loaded"

    soup = BeautifulSoup(response.content, 'html.parser')
    paragraphs = soup.find_all('p')
    if paragraphs:
        body = '\n'.join(p.get_text() for p in paragraphs)
        text = f"Content of {url} : \n{trim_to_x_words(body, params['max_text_length'])}[...]\n"
    else:
        text = f"The web page at {url} doesn't seem to have any readable content."
        for meta in soup.find_all("meta"):
            # Attributes must be read with .get(): `'name' in tag` tests the
            # tag's child nodes, not its attributes.
            name = meta.get('name')
            content = meta.get('content')
            if name in ('page-topic', 'description') and content:
                text += f"It's {name} is '{content}'"

    if prompt.strip() == url:
        text += f"\nSummarize the content from this url : {url}"
    return text
def read_pdf( fname):
    """Extract the text of the PDF file ``fname``, one line per page.

    Pages are read in order until the collected text exceeds
    ``params['max_text_length']`` words (as judged by trim_to_x_words); the
    remaining pages are skipped. Newlines inside a page are removed.

    Returns:
        The extracted text, each page terminated by a newline.

    Raises:
        Whatever ``PdfReader`` raises for a missing or unreadable file.
    """
    page_parts = []

    def visitor_body(text, cm, tm, fontDict, fontSize):
        # tm[5] is used as the vertical position of the text fragment; only
        # fragments between 50 and 720 are kept.
        # NOTE(review): presumably this skips page headers and footers - confirm.
        y = tm[5]
        if 50 < y < 720:
            page_parts.append(text)

    pdf = PdfReader(fname)
    max_words = params['max_text_length']
    rs = ""
    for page in pdf.pages:
        # Start from an empty list for every page; otherwise the text of all
        # previous pages would be appended again with each new page.
        page_parts.clear()
        page.extract_text(visitor_text=visitor_body)
        rs += "".join(page_parts).replace("\n", "") + "\n"
        # Trimming changes the text only once it is longer than the limit.
        if rs != trim_to_x_words(rs, max_words):
            break
    return rs
def open_file(fname):
    """Read the file ``fname`` and return its content wrapped in a short intro.

    Files ending in ".pdf" (case-insensitive) are read with read_pdf,
    everything else as plain text. The content is shortened with
    trim_to_x_words to ``params['max_text_length']`` words.

    Returns:
        "This is the content of the file '<fname>':" followed by the content,
        or an error message if the file could not be read.
    """
    print(f"Reading {fname}")
    if fname.lower().endswith(".pdf"):
        try:
            rs = read_pdf(fname)
        except Exception:
            # The PDF library may fail in many ways on missing or broken files.
            return "The file can not be opened. Perhaps the filename is wrong?"
    else:
        try:
            # read() keeps the line breaks exactly as they are in the file.
            # Undecodable bytes are replaced rather than aborting the read.
            with open(fname, 'r', errors="replace") as f:
                rs = f.read()
        except (OSError, ValueError):
            return "The file can not be opened. Perhaps the filename is wrong?"
    rs = trim_to_x_words(rs, params['max_text_length'] )
    return f"This is the content of the file '{fname}':\n{rs}"
def output_modifier(llm_response, state):
    """Inspect the model's reply and schedule a web search if it asks for one.

    If extract_query finds a search query in ``llm_response``, the next chat
    generation is hijacked with a "search for '<query>'" input and the reply
    is replaced by a short announcement. The (possibly replaced) reply is
    written to the log when logging is enabled.

    Args:
        llm_response: The text generated by the model.
        state: The generation state; its "character_menu" entry is read.

    Returns:
        The reply to show to the user.
    """
    global character
    character = state["character_menu"]+"("+shared.model_name+")"

    # If the LLM needs more information, we call the SEARX API.
    query = extract_query(llm_response)[0]
    if query != "":
        hidden_input = "\nsearch for '" + query + "'\n"
        visible_input = f"Searching the internet for information on '{query}' ...\n"
        input_hijack.update(state=True, value=[hidden_input, visible_input])
        ## this is needed to avoid a death loop.
        llm_response = f"I'll ask the search engine on {query} ..."

    if params['logging_enabled'] == 1:
        stamp = datetime.now().strftime("%H:%M on %A %B,%d %Y")
        write_log(character, f"({stamp}){character}> {llm_response}\n")
    return llm_response
def input_modifier(prompt, state):
    """Enrich the user's prompt with file, web page or search content.

    The prompt is checked, in this order of priority, for a quoted file name
    after a file trigger phrase, a URL, and a search trigger phrase. Only the
    first one found is acted on:

    * file: the file content is put in front of the prompt,
    * URL: the page content is put in front of the prompt,
    * search: the search results and the follow-up text (or
      "Summarize the results.") are appended to the prompt.

    The resulting prompt is logged when logging is enabled.

    Args:
        prompt: The text entered by the user.
        state: The generation state; its "character_menu" entry is read.

    Returns:
        The current date and time, a newline, and the enriched prompt.
    """
    global character
    character = state["character_menu"]+"("+shared.model_name+")"
    time_line = "it is " + datetime.now().strftime("%H:%M on %A %B,%d %Y") + "."

    file_name = extract_file_name(prompt)
    page_url = extract_url(prompt)
    found = extract_query(prompt)
    print(f"Filename found : '{file_name}'\nQuery found : {found[0]}\nUrl found : {page_url}\n")

    if file_name != "":
        prompt = open_file(file_name) + prompt
    elif page_url != "":
        prompt = get_page(page_url, prompt) + prompt
    elif found[0] != "":
        search_text = call_searx_api(found[0])
        # Pass the SEARX results back to the LLM.
        if found[1] == "":
            found[1] = "Summarize the results."
        prompt = f"{prompt}\n{search_text}.{found[1]}"

    if params['logging_enabled'] == 1:
        stamp = datetime.now().strftime("%H:%M on %A %B,%d %Y")
        write_log(character, f"\n\n({stamp}) USER > {prompt}\n")
    return f"{time_line}\n{prompt}"
def dragAndDropFile(path):
    """Turn an uploaded file into the input of the next chat generation.

    The file is read once with open_file and combined with
    ``params['upload_prompt']``. The prompt comes first when
    ``params['upload_position']`` is "before", otherwise the file content
    comes first. The result is stored in ``input_hijack``; only the upload
    prompt is used as the visible part.

    Args:
        path: Path of the uploaded file.
    """
    # Read (and, for PDFs, parse) the file only once.
    content = open_file(path)
    upload_prompt = params['upload_prompt']
    if params['upload_position'] == "before":
        prompt = f"{upload_prompt}\n{content}\n"
    else:
        prompt = f"{content}\n{upload_prompt}\n"
    input_hijack.update({"state": True,
                         "value": [
                             prompt,
                             f"{upload_prompt}"]})
def upload_file(file):
    """Handle a file uploaded through the UI.

    Prints the path of the upload, hands it to dragAndDropFile and returns
    the path.

    Args:
        file: The uploaded file object; only its ``name`` attribute is read.
    """
    uploaded_path = file.name
    print(f"\nUPLOAD-PATH : {uploaded_path}\n")
    dragAndDropFile(uploaded_path)
    return uploaded_path
def ui():
    """Build the "AutoBooga" settings panel.

    The panel holds the file upload controls and one input per setting in
    ``params``. Every input is wired to its setter, so a change is stored
    and written to the config file right away. An upload first runs
    upload_file and then starts a chat generation.
    """
    with gr.Accordion("AutoBooga"):
        with gr.Row():
            file_output = gr.File()
            upload_button = gr.UploadButton("Click to Upload a PDF, TXT or CSV file.NOTE: Some text files do not work if they are, apparently, using newline/formfeed as end of line sequence instead of just newline.", file_types=[".txt", ".pdf", ".csv", ".*"], file_count="single")
            upload_event = upload_button.upload(upload_file, upload_button, file_output)
            upload_event.then(
                chat.generate_chat_reply_wrapper,
                shared.input_params,
                shared.gradio['display'],
                show_progress=False,
            )
        with gr.Row():
            fu_prompt = gr.Textbox(value=params['upload_prompt'], label='Prompt accompanying uploaded files.')
        with gr.Row():
            fu_position = gr.Dropdown(choices=["before", "after"], value=params['upload_position'], label='Position of the uploaded files prompt in respect to the files content.')
        with gr.Row():
            searx_server = gr.Textbox(value=params['searx_server'], label='Searx-NG Server capable of returning JSon')
        with gr.Row():
            max_search_results = gr.Textbox(value=params['max_search_results'], label='The amount of search results to read.')
        with gr.Row():
            max_extracted_text = gr.Textbox(value=params['max_text_length'], label='The maximum amount of words to read. Anything after that is truncated')
        with gr.Row():
            logging = gr.Checkbox(value=params['logging_enabled'], label='Log all the dialogs for posterity')

    # Each setter takes the new value as its only argument, so it can be
    # registered directly as the change callback.
    fu_prompt.change(set_upload_prompt, fu_prompt, None)
    fu_position.change(set_upload_position, fu_position, None)
    searx_server.change(set_searx_server, searx_server, None)
    max_search_results.change(set_max_search_results, max_search_results, None)
    max_extracted_text.change(set_max_extracted_text, max_extracted_text, None)
    logging.change(set_logging_enabled, logging, None)