forked from andrewmccalip/Scribe
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathface_db.py
More file actions
294 lines (245 loc) · 9.78 KB
/
face_db.py
File metadata and controls
294 lines (245 loc) · 9.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
"""
Geometry-based face metadata database.
Each face gets a fingerprint from its geometry (surface type, centroid, area,
bounding box dimensions, edge/vertex counts). Metadata is stored in SQLite.
Matching uses TWO strategies:
1. Exact hash match (fast, covers same-kernel round-trips)
2. Fuzzy match with tolerance (covers cross-kernel round-trips like SolidWorks)
Topology must match exactly (surface type, edge count, vertex count).
Continuous values (centroid, area, dimensions) use configurable tolerance.
"""
import hashlib
import json
import math
import os
import sqlite3
from datetime import datetime, timezone
from OCP.BRepAdaptor import BRepAdaptor_Surface
from OCP.BRepBndLib import BRepBndLib
from OCP.BRepGProp import BRepGProp
from OCP.Bnd import Bnd_Box
from OCP.GProp import GProp_GProps
from OCP.TopAbs import TopAbs_EDGE, TopAbs_VERTEX
from OCP.TopExp import TopExp_Explorer
# ── Config ───────────────────────────────────────────────────────────────────
# SQLite file lives next to this module, so the DB travels with the code.
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "stepviewer.db")
# Tolerance for fuzzy matching (absolute, in model units — typically mm).
# Used by lookup when the exact fingerprint hash misses (cross-kernel round-trips).
FUZZY_TOL_POSITION = 0.01 # centroid: 10 microns
FUZZY_TOL_AREA = 0.1 # area: 0.1 mm^2
FUZZY_TOL_DIM = 0.01 # bbox dimensions: 10 microns
def _get_conn() -> sqlite3.Connection:
    """Open a connection to the metadata database with WAL journaling on."""
    connection = sqlite3.connect(DB_PATH)
    # WAL allows concurrent readers while a writer is active.
    connection.execute("PRAGMA journal_mode=WAL")
    return connection
def init_db():
    """Create the face_meta table and its topology index if they are missing."""
    schema = """
    CREATE TABLE IF NOT EXISTS face_meta (
    face_hash TEXT PRIMARY KEY,
    meta_json TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    -- Raw fingerprint values for fuzzy matching
    surf_type INTEGER,
    cx REAL,
    cy REAL,
    cz REAL,
    area REAL,
    dx REAL,
    dy REAL,
    dz REAL,
    n_edges INTEGER,
    n_verts INTEGER
    );
    -- Index for fuzzy lookups (filter by topology first, then range-check numerics)
    CREATE INDEX IF NOT EXISTS idx_face_topo
    ON face_meta (surf_type, n_edges, n_verts);
    """
    conn = _get_conn()
    conn.executescript(schema)
    conn.close()
# ── Face fingerprinting ─────────────────────────────────────────────────────
def _norm(val):
"""Normalize -0.0 to 0.0 for consistent hashing."""
return val + 0.0
def face_fingerprint_raw(face) -> dict:
    """
    Compute the raw geometric fingerprint of a TopoDS_Face.

    Returns a dict with surface type, centroid, area, sorted bounding-box
    dimensions, and edge/vertex counts.
    """
    def _count(kind):
        # Tally sub-shapes of the given topology type on this face.
        walker = TopExp_Explorer(face, kind)
        total = 0
        while walker.More():
            total += 1
            walker.Next()
        return total

    # Surface type (enum value from the BRep adaptor)
    surf_type = int(BRepAdaptor_Surface(face).GetType())

    # Area + centroid via surface mass properties
    props = GProp_GProps()
    BRepGProp.SurfaceProperties_s(face, props)
    area = _norm(props.Mass())
    centroid = props.CentreOfMass()
    cx, cy, cz = _norm(centroid.X()), _norm(centroid.Y()), _norm(centroid.Z())

    # Bounding box dimensions, sorted so the result is orientation-independent
    box = Bnd_Box()
    BRepBndLib.Add_s(face, box)
    xmin, ymin, zmin, xmax, ymax, zmax = box.Get()
    dims = sorted(
        _norm(abs(hi - lo))
        for hi, lo in ((xmax, xmin), (ymax, ymin), (zmax, zmin))
    )

    return {
        "surf_type": surf_type,
        "cx": cx, "cy": cy, "cz": cz,
        "area": area,
        "dx": dims[0], "dy": dims[1], "dz": dims[2],
        "n_edges": _count(TopAbs_EDGE),
        "n_verts": _count(TopAbs_VERTEX),
    }
def face_fingerprint(face) -> str:
    """
    Compute a 16-char hex hash from face geometry.
    Uses 3-decimal rounding for the hash (exact match within same kernel).
    """
    raw = face_fingerprint_raw(face)

    def r3(key):
        # Round a continuous fingerprint component to 3 decimals for hashing.
        return round(raw[key], 3)

    canonical = "|".join([
        f"T{raw['surf_type']}",
        f"C{r3('cx')},{r3('cy')},{r3('cz')}",
        f"A{r3('area')}",
        f"D{r3('dx')},{r3('dy')},{r3('dz')}",
        f"E{raw['n_edges']}V{raw['n_verts']}",
    ])
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]
# ── Database operations ──────────────────────────────────────────────────────
def save_face_meta(face_hash: str, meta: dict, raw: dict = None):
    """Upsert metadata + raw fingerprint values for a face hash.

    Args:
        face_hash: 16-char hex fingerprint hash (primary key).
        meta: arbitrary JSON-serializable metadata dict.
        raw: optional raw fingerprint dict (as from face_fingerprint_raw);
             when given, the numeric columns used for fuzzy matching are
             stored as well.
    """
    conn = _get_conn()
    # try/finally ensures the connection is closed even if execute/commit
    # raises (the original leaked the connection on exception).
    try:
        now = datetime.now(timezone.utc).isoformat()
        if raw:
            conn.execute(
                """INSERT OR REPLACE INTO face_meta
                (face_hash, meta_json, updated_at,
                surf_type, cx, cy, cz, area, dx, dy, dz, n_edges, n_verts)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (face_hash, json.dumps(meta), now,
                 raw["surf_type"], raw["cx"], raw["cy"], raw["cz"], raw["area"],
                 raw["dx"], raw["dy"], raw["dz"], raw["n_edges"], raw["n_verts"]),
            )
        else:
            # No raw fingerprint: numeric columns stay NULL, so this row can
            # only be found by exact hash match, never by fuzzy match.
            conn.execute(
                "INSERT OR REPLACE INTO face_meta (face_hash, meta_json, updated_at) VALUES (?,?,?)",
                (face_hash, json.dumps(meta), now),
            )
        conn.commit()
    finally:
        conn.close()
def lookup_faces_batch(face_hashes: list[str],
                       face_raws: list[dict],
                       tol_pos=FUZZY_TOL_POSITION,
                       tol_area=FUZZY_TOL_AREA,
                       tol_dim=FUZZY_TOL_DIM) -> dict[str, dict]:
    """
    Batch lookup using temp table JOIN + indexed fuzzy queries.
    Returns {hash: meta_dict} for all faces that found a match.

    Strategy 1: Exact hash match via temp table JOIN (no IN clauses).
    Strategy 2: Fuzzy match via indexed SQL per unique topology signature.

    Args:
        face_hashes: fingerprint hashes, parallel to face_raws.
        face_raws: raw fingerprint dicts (entries may be None to skip fuzzy).
        tol_pos/tol_area/tol_dim: absolute tolerances for the fuzzy pass.
    """
    if not face_hashes:
        return {}
    conn = _get_conn()
    result = {}
    # try/finally: the original leaked the connection (and its temp table)
    # if any query raised; closing the connection also discards temp tables.
    try:
        # ── 1. Exact hash match (temp table + JOIN) ─────────────────────────
        conn.execute("CREATE TEMP TABLE _lookup_hashes (h TEXT PRIMARY KEY)")
        conn.executemany(
            "INSERT OR IGNORE INTO _lookup_hashes (h) VALUES (?)",
            [(h,) for h in face_hashes]
        )
        rows = conn.execute(
            """SELECT fm.face_hash, fm.meta_json
            FROM face_meta fm
            JOIN _lookup_hashes lh ON fm.face_hash = lh.h"""
        ).fetchall()
        conn.execute("DROP TABLE _lookup_hashes")
        for fh, mj in rows:
            result[fh] = json.loads(mj)

        # ── 2. Fuzzy match for misses (SQL with topology index) ─────────────
        # Group unmatched faces by topology signature to minimize queries.
        topo_groups = {}  # (surf_type, n_edges, n_verts) -> [(hash, raw), ...]
        for h, raw in zip(face_hashes, face_raws):
            if h in result or raw is None:
                continue
            key = (raw["surf_type"], raw["n_edges"], raw["n_verts"])
            topo_groups.setdefault(key, []).append((h, raw))

        for (surf_type, n_edges, n_verts), faces in topo_groups.items():
            # One SQL query per topology signature — uses idx_face_topo index
            candidates = conn.execute(
                """SELECT face_hash, meta_json, cx, cy, cz, area, dx, dy, dz
                FROM face_meta
                WHERE surf_type = ? AND n_edges = ? AND n_verts = ?""",
                (surf_type, n_edges, n_verts)
            ).fetchall()
            if not candidates:
                continue
            for h, raw in faces:
                # Among candidates inside all tolerance gates, keep the one
                # closest in (centroid, area) space.
                best = None
                best_dist = float("inf")
                for fh, mj, cx, cy, cz, area, dx, dy, dz in candidates:
                    if (abs(cx - raw["cx"]) > tol_pos or
                            abs(cy - raw["cy"]) > tol_pos or
                            abs(cz - raw["cz"]) > tol_pos):
                        continue
                    if abs(area - raw["area"]) > tol_area:
                        continue
                    if (abs(dx - raw["dx"]) > tol_dim or
                            abs(dy - raw["dy"]) > tol_dim or
                            abs(dz - raw["dz"]) > tol_dim):
                        continue
                    dist = math.sqrt(
                        (cx - raw["cx"])**2 + (cy - raw["cy"])**2 + (cz - raw["cz"])**2
                        + (area - raw["area"])**2
                    )
                    if dist < best_dist:
                        best_dist = dist
                        best = (fh, mj)
                if best:
                    result[h] = json.loads(best[1])
    finally:
        conn.close()
    return result
def delete_face_meta(face_hash: str):
    """Remove the metadata row stored under *face_hash*, if any."""
    db = _get_conn()
    db.execute("DELETE FROM face_meta WHERE face_hash = ?", (face_hash,))
    db.commit()
    db.close()
def delete_faces(face_hashes: list[str]):
    """Delete metadata for a list of face hashes."""
    if not face_hashes:
        return
    db = _get_conn()
    # Stage the hashes in a temp table so the DELETE needs no dynamic IN list.
    db.execute("CREATE TEMP TABLE _del_hashes (h TEXT PRIMARY KEY)")
    staged = ((h,) for h in face_hashes)
    db.executemany("INSERT OR IGNORE INTO _del_hashes (h) VALUES (?)", staged)
    db.execute(
        "DELETE FROM face_meta WHERE face_hash IN (SELECT h FROM _del_hashes)"
    )
    db.execute("DROP TABLE _del_hashes")
    db.commit()
    db.close()
def clear_database():
    """Wipe every row from face_meta (the "Global Nuke" operation)."""
    db = _get_conn()
    db.execute("DELETE FROM face_meta")
    db.commit()
    db.close()
def get_db_stats() -> dict:
    """Return summary stats: total stored faces and the DB file path."""
    db = _get_conn()
    (total,) = db.execute("SELECT COUNT(*) FROM face_meta").fetchone()
    db.close()
    return {"total_faces": total, "db_path": DB_PATH}
# Initialize on import
# NOTE: importing this module has a side effect — it creates/migrates the
# SQLite schema on disk at DB_PATH.
init_db()