-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscript.py
More file actions
234 lines (214 loc) · 9.9 KB
/
script.py
File metadata and controls
234 lines (214 loc) · 9.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import functools
import locale
import os
import re
import shutil
import time
import unicodedata
import warnings

import cv2
import easyocr
import fitz
import numpy as np
import pandas as pd
import torch
# Ignore every warning of category UserWarning for the whole process.
# NOTE(review): presumably meant to hide noisy warnings from the OCR/torch
# stack — confirm which library emits them.
warnings.filterwarnings("ignore", category=UserWarning)
class NameFinder:
    """Sort the PDFs of a folder by whether they mention a reference name.

    On construction the instance picks the UI language, locates a ``.txt``
    reference file and a sub-folder containing PDFs inside the current
    working directory, and creates an EasyOCR reader (on the GPU when CUDA
    is available).  ``run()`` then copies every PDF into ``NAMES_FIND`` or
    ``NAMES_NOT_FIND`` and prints a summary report.
    """

    # Whitespace-separated tokens that mark a reference line as a company
    # rather than a person.
    CORPORATE_TERMS = frozenset({
        'LTDA', 'S.A.', 'S/A', 'EIRELI', 'MEI', 'EPP', 'GROUP', 'BRASIL', 'MEDICINA',
    })
    # Only the first pages of a PDF are rendered and sent to OCR.
    OCR_MAX_PAGES = 2
    # Zoom factor applied on both axes when rendering a page for OCR.
    OCR_ZOOM = 2.0

    def __init__(self):
        """Detect language, input resources and hardware, then start EasyOCR.

        Raises:
            FileNotFoundError: if no reference ``.txt`` file or no folder
                containing PDF files is found in the working directory.
        """
        # 1. Detect the language and load the matching messages.
        self.lang = self._get_system_language()
        self.msgs = self._get_translations()

        # Output folders are defined before auto-detection so the source
        # folder search can skip them without repeating their names.
        self.found_dir = "NAMES_FIND"
        self.not_found_dir = "NAMES_NOT_FIND"

        # 2. Detect the input resources.
        self.reference_path = self._auto_detect_reference()
        self.source_folder = self._auto_detect_source_folder()

        self.allowed_names = set()
        self.total_estao = 0      # PDFs in which a reference name was found
        self.total_nao_estao = 0  # PDFs in which no reference name was found

        # Detect the hardware.
        self.device_ready = torch.cuda.is_available()
        self.device_name = torch.cuda.get_device_name(0) if self.device_ready else "CPU"

        print(f"\n🚀 {self.msgs['hw_detected']}: {self.device_name}")
        print(f"📂 {self.msgs['ref_file']}: {self.reference_path}")
        print(f"📂 {self.msgs['src_folder']}: {self.source_folder}")
        print(f"\n🤖 {self.msgs['ai_start']} {'GPU' if self.device_ready else 'CPU'}...")
        self.reader = easyocr.Reader(['pt'], gpu=self.device_ready)

    def _get_system_language(self) -> str:
        """Detect the OS language. Returns ``'pt'`` or ``'en'``."""
        sys_lang = None
        try:
            # getdefaultlocale() is deprecated; silence its DeprecationWarning
            # and tolerate its absence instead of swallowing every exception.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                # Returns a (language, encoding) tuple, e.g. ('pt_BR', 'UTF-8').
                sys_lang = locale.getdefaultlocale()[0]
        except (AttributeError, ValueError):
            sys_lang = None
        if not sys_lang:
            try:
                sys_lang = locale.getlocale()[0]
            except ValueError:
                sys_lang = None
        if sys_lang and sys_lang.lower().startswith('pt'):
            return 'pt'
        return 'en'

    def _get_translations(self) -> dict:
        """Return the message table for ``self.lang`` (English if unknown)."""
        translations = {
            'en': {
                'hw_detected': 'Hardware detected',
                'ref_file': 'Reference file',
                'src_folder': 'Source folder',
                'ai_start': 'Starting AI engine (EasyOCR) in',
                'cleaning': 'Cleaning workspace',
                'old_removed': 'Old folder removed',
                'new_created': 'New folder created',
                'base_loaded': 'Base loaded with {n} valid names',
                'processing': 'Processing',
                'analyzing': 'AI analyzing',
                'found': 'Found',
                'not_found': 'Not found',
                'report': 'REPORT',
                'ref_base': 'Reference base',
                'total_time': 'Total time',
                'avg_time': 'AVG time by file',
                'processed_by': 'AI processing by',
                'error_file': 'Error in file',
                'fatal_error': 'Fatal Error',
                'no_txt': 'No .txt reference file found',
                'no_pdf': 'No folder containing PDF files found',
            },
            'pt': {
                'hw_detected': 'Hardware detectado',
                'ref_file': 'Arquivo de referência',
                'src_folder': 'Pasta de origem',
                'ai_start': 'Iniciando motor de IA (EasyOCR) em',
                'cleaning': 'Limpando área de trabalho',
                'old_removed': 'Pasta antiga removida',
                'new_created': 'Nova pasta criada',
                'base_loaded': 'Base carregada com {n} nomes válidos',
                'processing': 'Processando',
                'analyzing': 'IA analisando',
                'found': 'Encontrado',
                'not_found': 'Não encontrado',
                'report': 'RELATÓRIO',
                'ref_base': 'Base de referência',
                'total_time': 'Tempo total',
                'avg_time': 'Tempo médio por arquivo',
                'processed_by': 'IA processada por',
                'error_file': 'Erro no arquivo',
                'fatal_error': 'Erro Fatal',
                'no_txt': 'Nenhum arquivo .txt de referência encontrado',
                'no_pdf': 'Nenhuma pasta com PDFs encontrada',
            },
        }
        return translations.get(self.lang, translations['en'])

    def _auto_detect_reference(self) -> str:
        """Return the name of a ``.txt`` file in the working directory.

        Candidates are sorted so the choice is reproducible when several
        ``.txt`` files exist; the extension check ignores case, like the
        PDF checks elsewhere in this class.

        Raises:
            FileNotFoundError: if the working directory has no ``.txt`` file.
        """
        txt_files = sorted(
            name for name in os.listdir('.')
            if name.lower().endswith('.txt') and os.path.isfile(name)
        )
        if not txt_files:
            raise FileNotFoundError(f"❌ {self.msgs['no_txt']}")
        return txt_files[0]

    def _auto_detect_source_folder(self) -> str:
        """Return the path of the first sub-folder that contains a PDF.

        The two output folders are skipped. Folders are visited in name
        order, and folders that cannot be listed are ignored.

        Raises:
            FileNotFoundError: if no sub-folder contains a PDF file.
        """
        excluded = {self.found_dir, self.not_found_dir}
        with os.scandir('.') as entries:
            candidates = sorted(
                (entry.name, entry.path) for entry in entries
                if entry.is_dir() and entry.name not in excluded
            )
        for _, folder_path in candidates:
            try:
                names = os.listdir(folder_path)
            except OSError:
                continue
            if any(name.lower().endswith('.pdf') for name in names):
                return folder_path
        raise FileNotFoundError(f"❌ {self.msgs['no_pdf']}")

    def setup_directories(self):
        """Delete and recreate both output folders so each run starts empty."""
        print(f"\n🧹 {self.msgs['cleaning']}...")
        for folder in (self.found_dir, self.not_found_dir):
            if os.path.exists(folder):
                shutil.rmtree(folder)
                print(f" 🗑️ {self.msgs['old_removed']}: '{folder}'")
            os.makedirs(folder)
            print(f" 📁 {self.msgs['new_created']}: '{folder}'")

    def is_valid_person_name(self, text) -> bool:
        """Tell whether ``text`` looks like a person's full name.

        A valid name has at least 5 characters after stripping, contains a
        space, has no digits, has at least two distinct words and none of
        its words is a corporate term. ``None`` and empty input are invalid.
        """
        if not text:
            return False
        name = text.strip()
        if len(name) < 5 or ' ' not in name:
            return False
        if any(char.isdigit() for char in name):
            return False
        name_parts = set(name.upper().split())
        return len(name_parts) >= 2 and name_parts.isdisjoint(self.CORPORATE_TERMS)

    @staticmethod
    def _normalize_text(text) -> str:
        """Upper-case ``text``, strip accents and keep only A-Z words."""
        if not text:
            return ""
        # Canonical decomposition separates each base letter from its
        # diacritics, so every accented Latin letter is handled, not only
        # the ones listed in a hand-written table.
        decomposed = unicodedata.normalize('NFD', text.upper())
        unaccented = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
        return " ".join(re.sub(r'[^A-Z\s]', ' ', unaccented).split())

    @staticmethod
    @functools.lru_cache(maxsize=4096)
    def _name_pattern(full_name):
        """Return the compiled search pattern for ``full_name``, or ``None``.

        Words of one or two letters are dropped; ``None`` is returned when
        fewer than two words remain. Cached because the same names are
        checked against every PDF.
        """
        tokens = [t for t in NameFinder._normalize_text(full_name).split() if len(t) > 2]
        if len(tokens) < 2:
            return None
        # The words must appear in order, each as a whole word, with at
        # most 30 characters between consecutive words.
        return re.compile(
            r'\b' + r'\b.{0,30}\b'.join(re.escape(t) for t in tokens) + r'\b'
        )

    def normalize(self, text) -> str:
        """Return ``text`` upper-cased, without accents, as space-separated A-Z words.

        Empty or ``None`` input yields an empty string.
        """
        return self._normalize_text(text)

    def check_match(self, text_to_search, name_set):
        """Search ``text_to_search`` for any name in ``name_set``.

        Returns:
            ``(True, name)`` for the first name found (the original,
            un-normalized entry of ``name_set``), otherwise ``(False, None)``.
        """
        pdf_content = self.normalize(text_to_search)
        if not pdf_content:
            return False, None
        for full_name in name_set:
            pattern = self._name_pattern(full_name)
            if pattern is not None and pattern.search(pdf_content):
                return True, full_name
        return False, None

    def get_text_via_ai(self, file_path) -> str:
        """OCR the first pages of a PDF and return the recognised text.

        Returns an empty string (after printing the error) on any failure.
        """
        try:
            page_texts = []
            # The context manager closes the document even if rendering or
            # OCR raises.
            with fitz.open(file_path) as doc:
                for page_index in range(min(len(doc), self.OCR_MAX_PAGES)):
                    page = doc[page_index]
                    pix = page.get_pixmap(matrix=fitz.Matrix(self.OCR_ZOOM, self.OCR_ZOOM))
                    img_np = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
                        pix.h, pix.w, pix.n
                    )
                    # Convert 4-channel and 1-channel renders to 3-channel RGB.
                    if pix.n == 4:
                        img_np = cv2.cvtColor(img_np, cv2.COLOR_RGBA2RGB)
                    elif pix.n == 1:
                        img_np = cv2.cvtColor(img_np, cv2.COLOR_GRAY2RGB)
                    results = self.reader.readtext(img_np, detail=0)
                    page_texts.append(" ".join(results))
            return "".join(page_text + " " for page_text in page_texts)
        except Exception as e:
            print(f" ⚠️ AI error: {e}")
            return ""

    def load_reference_data(self):
        """Load the valid person names from the reference file.

        The file is read as UTF-8 with an optional BOM, so a byte-order
        mark does not end up glued to the first name. Read errors are
        printed and leave ``allowed_names`` unchanged.
        """
        try:
            with open(self.reference_path, 'r', encoding='utf-8-sig') as f:
                lines = f.read().splitlines()
        except (OSError, UnicodeError) as e:
            print(f"\n⚠️ Error: {e}")
            return
        self.allowed_names = {n.strip() for n in lines if self.is_valid_person_name(n)}
        print(f"✅ {self.msgs['base_loaded'].format(n=len(self.allowed_names))}")

    def process_validation(self):
        """Classify every PDF of the source folder and copy it to its output folder.

        The embedded text is checked first; OCR is used only when that
        gives no match. A failure on one file is printed and does not stop
        the run; such a file is counted in neither total.
        """
        pdf_files = sorted(
            f for f in os.listdir(self.source_folder) if f.lower().endswith('.pdf')
        )
        total = len(pdf_files)
        for idx, pdf_file in enumerate(pdf_files, 1):
            file_path = os.path.join(self.source_folder, pdf_file)
            print(f"\n[{idx}/{total}] {self.msgs['processing']}: {pdf_file}")
            try:
                with fitz.open(file_path) as doc:
                    digital_text = " ".join(page.get_text() for page in doc)
                match, name = self.check_match(digital_text, self.allowed_names)
                if not match:
                    print(f"🔍 {self.msgs['analyzing']}...")
                    ai_text = self.get_text_via_ai(file_path)
                    match, name = self.check_match(ai_text, self.allowed_names)
                dest = self.found_dir if match else self.not_found_dir
                shutil.copy(file_path, os.path.join(dest, pdf_file))
                icon = "✅" if match else "❌"
                status = name if match else self.msgs['not_found']
                print(f" └──{icon} {status}")
                if match:
                    self.total_estao += 1
                else:
                    self.total_nao_estao += 1
            except Exception as e:
                print(f"⚠️ {self.msgs['error_file']} {pdf_file}: {e}")

    def run(self):
        """Run the whole pipeline and print the final report."""
        start_time = time.time()
        self.setup_directories()
        self.load_reference_data()
        self.process_validation()
        duration = time.time() - start_time
        total_proc = self.total_estao + self.total_nao_estao
        avg = duration / total_proc if total_proc > 0 else 0
        print("\n" + "="*50)
        print(f"📊 {self.msgs['report']}")
        print(f"\n📚 {self.msgs['ref_base']}: {len(self.allowed_names)}")
        print(f"⏱️ {self.msgs['total_time']}: {duration:.2f}s | {duration / 60:.2f}m")
        print(f"⚡ {self.msgs['avg_time']}: {avg:.2f}s")
        print(f"🖥️ {self.msgs['processed_by']}: {self.device_name}")
        print(f"📂 {self.msgs['found']}: {self.total_estao} | {self.msgs['not_found']}: {self.total_nao_estao}")
        print("="*50 + "\n\n")
if __name__ == "__main__":
    app = None
    try:
        app = NameFinder()
        app.run()
    except Exception as e:
        # Use the translated label when the instance (and so its message
        # table) already exists; otherwise fall back to the English text.
        label = app.msgs['fatal_error'] if app is not None else "Fatal Error"
        print(f"🚨 {label}: {e}")
        # Report the failure to the calling shell with a non-zero status.
        raise SystemExit(1)