-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdocparser.py
More file actions
167 lines (136 loc) · 4.87 KB
/
docparser.py
File metadata and controls
167 lines (136 loc) · 4.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import os
import re
from glob import glob
import nltk
import pdfplumber
import weaviate
from dotenv import dotenv_values
from tqdm import tqdm
from weaviate.classes.config import DataType, Property
from weaviate.util import generate_uuid5
from utils import compute_sha256, get_embedding
# Fetch the Punkt model required by nltk.sent_tokenize (used when chunking).
nltk.download("punkt")
# Settings (at least COLLECTION_NAME) come from the local .env file.
config = dotenv_values(".env")
# Connects to a Weaviate instance on localhost with default ports.
client = weaviate.connect_to_local()
# First-run bootstrap: create the collection if it does not exist yet.
# doc_hash/file_name are TEXT_ARRAYs because an identical text chunk can
# originate from several source documents (see the duplicate handling below).
if not client.collections.exists(config["COLLECTION_NAME"]):
    print(
        f'Collection does not exist, creating collection {config["COLLECTION_NAME"]}.'
    )
    client.collections.create(
        name=config["COLLECTION_NAME"],
        properties=[
            Property(name="content", data_type=DataType.TEXT),
            Property(name="doc_hash", data_type=DataType.TEXT_ARRAY),
            Property(name="file_name", data_type=DataType.TEXT_ARRAY),
        ],
    )
    print("Done")
collection = client.collections.get(name=config["COLLECTION_NAME"])
def clean_text(text):
    """Normalize raw PDF-extracted text for chunking.

    Collapses every run of whitespace (spaces, tabs, newlines) to a single
    space, collapses ellipsis-style dot runs ("..", "....") to a single
    period, and strips leading/trailing whitespace.

    Note: the original code also ran ``re.sub(r"\\n+", "\\n", ...)`` after
    the whitespace pass, but that substitution was dead code — ``\\s+``
    matches newlines, so none remained by then. It has been removed;
    output is unchanged.
    """
    text = re.sub(r"\s+", " ", text)  # all whitespace runs -> single space
    text = re.sub(r"\.{2,}", ".", text)  # dot runs -> single period
    return text.strip()
def convert_to_dict_format(table):
    """Fold a pdfplumber table (list of rows) into a key/value dict.

    The first cell of a multi-cell row becomes the key and the remaining
    cells are joined with ":" (None cells rendered as ""). A multi-cell row
    whose first cell is empty reuses the most recent key, overwriting its
    value. A single-cell row with a truthy cell maps that cell to "".
    """
    result = {}
    current_key = None
    for row in table:
        if len(row) == 1 and row[0]:  # single-element row: key with no value
            current_key = row[0]
            result[current_key] = ""
            continue
        if len(row) <= 1:
            continue  # empty row (or falsy single cell): nothing to record
        if row[0]:
            current_key = row[0]
        result[current_key] = ":".join(
            "" if cell is None else str(cell) for cell in row[1:]
        )
    return result
def format_dict_as_string(dictionary):
    """Render *dictionary* as newline-separated "'key': 'value'" pairs."""
    pairs = []
    for key, value in dictionary.items():
        pairs.append(f"'{key}': '{value}'")
    return "\n".join(pairs)
def table_to_string(table):
    """Convert a raw extracted table into the flat string stored with page text."""
    return format_dict_as_string(convert_to_dict_format(table))
def extract_pdf_content(pdf_path):
    """Extract text and stringified tables from every page of a PDF.

    For each page: the extracted text (empty string if extraction yields
    nothing), followed by each table rendered via table_to_string wrapped
    in blank lines; pages are separated by blank lines.
    """
    parts = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text(x_tolerance=2, y_tolerance=2) or ""
            for table in page.extract_tables():
                page_text += "\n\n" + table_to_string(table) + "\n\n"
            parts.append(page_text + "\n\n")
    return "".join(parts)
def word_count_chunk_sentences(text, chunk_size=150):
    """Split documents into ~chunk_size-word chunks on sentence boundaries.

    Parameters
    ----------
    text : list of [doc_text, doc_hash, file_name] triples.
    chunk_size : int
        Soft word limit per chunk.

    Returns a list of [chunk_text, doc_hash, file_name] triples. When a
    chunk overflows, the next chunk is seeded with the previous sentence as
    a one-sentence overlap. A flushed chunk is labeled with the metadata of
    the sentence that triggered the flush, and the final chunk with the
    metadata of the last sentence — so a chunk spanning two documents is
    attributed to only one of them (unchanged from the original behavior).

    Bug fixed: when the very first sentence already exceeds chunk_size,
    the original seeded the overlap from ``sentences[i - 1]`` with i == 0,
    which wraps to sentences[-1] — the LAST sentence of the corpus. The
    overlap is now empty in that case.
    """
    sentences = [
        [sentence.strip(), doc_hash, file_name]
        for doc_text, doc_hash, file_name in text
        for sentence in nltk.sent_tokenize(doc_text)
    ]
    chunks = []
    current_chunk = []
    word_count = 0
    for i, (sent, doc_hash, file_name) in enumerate(sentences):
        words = sent.split()
        if word_count + len(words) <= chunk_size:
            current_chunk.extend(words)
            word_count += len(words)
        else:
            # Flush the full chunk, then start the next one with a
            # one-sentence overlap (guarding against i == 0 wraparound).
            chunks.append([" ".join(current_chunk), doc_hash, file_name])
            overlap = sentences[i - 1][0].split() if i > 0 else []
            current_chunk = overlap + words
            word_count = len(current_chunk)
    if current_chunk:
        chunks.append([" ".join(current_chunk), sentences[-1][1], sentences[-1][2]])
    return chunks
# Directory scanned (recursively) for source PDFs.
PDF_DIRECTORY = "./Guides"
# PDF_DIRECTORY = "./PDF"
pdf_paths = glob(PDF_DIRECTORY + "/**/*.pdf", recursive=True)
# One [cleaned_text, sha256_hash, file_name] triple per PDF.
all_texts = []
for pdf_path in tqdm(pdf_paths, desc="Chunking Documents ", colour="#bd4ced"):
    file_name = os.path.basename(pdf_path)
    # Hash of the file contents, stored so chunks can be traced to sources.
    filehash = compute_sha256(pdf_path)
    extracted_text = extract_pdf_content(pdf_path)
    y = clean_text(extracted_text)
    all_texts.append([y, filehash, file_name])
# Sentence-chunk all documents together into ~150-word pieces.
documents = word_count_chunk_sentences(all_texts)
# Insert every chunk into Weaviate. Identical chunk text maps to the same
# deterministic UUID, so a duplicate insert raises and we instead record the
# extra source document on the existing object.
fails = 0
duplicates = 0
for document in tqdm(documents, desc="Parsing Chunks ", colour="#bd4ced"):
    chunk_text, doc_hash, file_name = document
    embedding = get_embedding(chunk_text)
    data_object = {
        "content": chunk_text,
        "doc_hash": [doc_hash],
        "file_name": [file_name],
    }
    # UUIDv5 of the content: identical chunks collide on purpose.
    content_uuid = generate_uuid5(data_object["content"])
    try:
        collection.data.insert(
            properties=data_object,
            uuid=content_uuid,
            vector=embedding,
        )
    except Exception as e:
        # The client signals a UUID collision with an "... already exists"
        # message; anything else is a genuine failure.
        if "already exists" in str(e):
            duplicates += 1
            existing = collection.query.fetch_object_by_id(content_uuid)
            item = existing.properties
            # Record this source on the existing chunk — but only once.
            # Without this guard, the original appended the same
            # (hash, file) pair to the arrays on every re-run of the script.
            if (doc_hash, file_name) not in zip(item["doc_hash"], item["file_name"]):
                item["doc_hash"].append(doc_hash)
                item["file_name"].append(file_name)
                collection.data.update(
                    uuid=content_uuid,
                    properties=item,
                )
        else:
            fails += 1
            print("An unexpected error occurred.")
            print(e)
client.close()
print(f"{duplicates} duplicate chunks have not been added.")
print(f"Failed to add {fails} chunks.")