-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrci_rag.py
More file actions
111 lines (93 loc) · 4.1 KB
/
rci_rag.py
File metadata and controls
111 lines (93 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from code_generation.gemini import CodeGenerator
from config import config
from vector_db_gen import load_vector_db, create_vector_db, query_vector_db
import json
import time
# Module-level CodeGenerator instance shared by the whole script:
# rci_task_iterative() calls its generate_response() and the __main__ section
# calls its write_code_to_file().
response = CodeGenerator()
def read_LLMSecEval_Py(file_path: str):
    """Return all lines of the text file at ``file_path`` as a list.

    Each element is one line exactly as the file object yields it, so line
    terminators are kept. An empty file gives an empty list.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The file's lines, in order.
    """
    with open(file_path, 'r') as handle:
        # Iterating a text file yields the same items readlines() collects.
        return list(handle)
def read_Sallms_tasks(file_path: str):
    """Load task prompts from a JSON Lines file using the ``'prompt'`` key.

    Every non-blank line is parsed as a JSON object. The value stored under
    ``'prompt'`` is collected with surrounding whitespace stripped; an object
    without that key contributes an empty string.

    Args:
        file_path: Path of the JSON Lines file.

    Returns:
        A list of prompt strings, one per non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    security_eval_tasks = []
    # JSON text is UTF-8 by specification; do not depend on the locale default.
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            # Blank lines (for example trailing newlines at the end of the
            # file) are not JSON documents; skip them instead of crashing.
            if not line.strip():
                continue
            data = json.loads(line)
            security_eval_tasks.append(data.get('prompt', '').strip())
    return security_eval_tasks
def read_SecurityEval_tasks(file_path: str):
    """Load task prompts from a JSON Lines file using the ``'Prompt'`` key.

    Every non-blank line is parsed as a JSON object. The value stored under
    ``'Prompt'`` (capitalised) is collected with surrounding whitespace
    stripped; an object without that key contributes an empty string.

    Args:
        file_path: Path of the JSON Lines file.

    Returns:
        A list of prompt strings, one per non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    security_eval_tasks = []
    # JSON text is UTF-8 by specification; do not depend on the locale default.
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            # Blank lines (for example trailing newlines at the end of the
            # file) are not JSON documents; skip them instead of crashing.
            if not line.strip():
                continue
            data = json.loads(line)
            security_eval_tasks.append(data.get('Prompt', '').strip())
    return security_eval_tasks
def prompt_template(task: str, preconditions_guidelines: list[str]):
    """Build the generation prompt for ``task`` plus numbered guidelines.

    The result is the task instruction, then a fixed sentence introducing the
    guidelines, then one ``#<n>`` entry per guideline (numbered from 1) holding
    that guideline's ``page_content``.

    Args:
        task: Description of the coding task.
        preconditions_guidelines: Retrieved guideline entries.
            NOTE(review): annotated as ``list[str]``, but every entry is read
            through its ``page_content`` attribute, so plain strings would
            fail here - confirm the intended element type.

    Returns:
        The assembled prompt string.
    """
    header = f"Generate secure Python code for the following:\n {task}"
    intro = "\nHere are some additional security guidelines that can be followed only if the coding scenario satisfies the specific preconditions:\n"
    numbered_entries = "".join(
        f"#{position}\n{entry.page_content}\n"
        for position, entry in enumerate(preconditions_guidelines, start=1)
    )
    return header + intro + numbered_entries
def rci_task_iterative(task_prompt, task_id, iterations):
    """Run a critique-and-improve loop on a prompt and log every exchange.

    The prompt is sent to the module-level ``response`` generator to obtain an
    initial answer. Then, up to ``iterations`` times, the current answer is
    sent back for a security critique, and the critique plus the answer are
    sent for an improved version. Every prompt and every reply is appended to
    ``output/sample_rci_query/<task_id>.txt``.

    Args:
        task_prompt: Full prompt text for the initial generation.
        task_id: Identifier used for the log file name and passed to every
            ``generate_response`` call.
        iterations: Maximum number of critique/improve rounds.

    Returns:
        The answer produced by the last completed improvement step (or the
        initial answer when ``iterations`` is 0). If the answer or the
        critique is falsy at the start of a step, a diagnostic string naming
        the iteration and task is returned instead. The answer from the final
        round is returned without being checked, so it may itself be falsy.

    Raises:
        OSError: If the log file cannot be opened (for example when the
            ``output/sample_rci_query`` directory does not exist).
    """
    # NOTE(review): generate_response is assumed to return text or a falsy
    # value such as None on failure - confirm against CodeGenerator.
    log_path = f"output/sample_rci_query/{task_id}.txt"
    # The context manager closes the log on the early returns too; the model
    # output may contain non-ASCII text, so the encoding is pinned to UTF-8.
    with open(log_path, "w+", encoding="utf-8") as log:
        # 1. Get initial answer
        log.write(f"{task_prompt}\n")
        model_response = response.generate_response(task_prompt, task_id)
        log.write(f"{model_response}\n")

        for iteration in range(iterations):
            # Nothing to critique: the previous generation step produced no
            # answer (at iteration 0 this is the initial answer).
            if not model_response:
                return f"improved output was None in iteration {iteration} for task {task_id}"

            # 2. Critique the response
            critique_prompt = f"Review the following answer and find security problems with it: '{model_response}'"
            log.write(f"{critique_prompt}\n")
            critique = response.generate_response(critique_prompt, task_id)
            log.write(f"{critique}\n")
            if not critique:
                return f"Critique was None in iteration {iteration} for task {task_id}"

            # 3. Improve the answer
            improve_prompt = f"Based on the critique: \n#CRITIQUE_BEGIN#\n'{critique}'\n#CRITIQUE_END#\n, write the full code by improving the security of the following code: \n#CODE_BEGIN#\n'{model_response}'\n#CODE_END#\n"
            log.write(f"{improve_prompt}\n")
            model_response = response.generate_response(improve_prompt, task_id)
            log.write(f"{model_response}\n")

    return model_response
if __name__ == "__main__":
    # Script entry point: for every task in the configured dataset, retrieve
    # matching guidelines from the vector database, build the prompt, run the
    # critique/improve loop, and store both the final answer and the prompt.
    iterations = 2  # Set the number of iterative improvements

    # Read tasks from a file
    coding_tasks = read_Sallms_tasks(config.prompt_dataset_file)

    try:
        # Try to load existing database first
        db = load_vector_db()
    except FileNotFoundError:
        # Create new database if none exists
        db = create_vector_db()

    time_in_seconds = 0
    processed_tasks = 0

    # Process each task; prompt ids are numbered from 1.
    for count, task in enumerate(coding_tasks, start=1):
        prompt_id = f"{config.prompt_id_prefix}{count}"
        # perf_counter is monotonic, so clock adjustments cannot skew timings.
        start_time = time.perf_counter()

        preconditions_guidelines = query_vector_db(task, db)
        full_prompt = prompt_template(task, preconditions_guidelines)
        final_answer = rci_task_iterative(full_prompt, prompt_id, iterations)
        response.write_code_to_file(prompt_id, final_answer)

        prompt_file = f"{config.prompt_file_dir}/{prompt_id}.txt"
        # Prompts may contain non-ASCII text; pin the encoding for portability.
        with open(prompt_file, "w+", encoding="utf-8") as file:
            file.write(full_prompt)

        time_in_seconds += time.perf_counter() - start_time
        processed_tasks = count

    print(f"Time taken: {time_in_seconds} seconds")
    # Divide by the number of tasks actually processed (the 1-based counter
    # used to be one too large here) and avoid dividing by zero when the
    # dataset is empty.
    average_time = time_in_seconds / processed_tasks if processed_tasks else 0.0
    print(f"Average time per task: {average_time} seconds")