-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathke_client.py
More file actions
121 lines (100 loc) · 4.23 KB
/
ke_client.py
File metadata and controls
121 lines (100 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""Read-only client for the Knowledge Engine database.

Consumers (other projects) should use this library instead of touching
knowledge.db directly. It opens SQLite in read-only mode and reads from
stable views, so internal table changes don't break downstream code.

Usage:
    from ke_client import KEClient

    ke = KEClient()  # or KEClient("/path/to/knowledge.db")
    docs = ke.search_documents("gold panning", limit=10)
    props = ke.search_properties("cabin", max_price=20000)
"""
import os
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional
# Default database location: knowledge.db in the same directory as this module.
DEFAULT_DB = os.path.join(os.path.dirname(__file__), "knowledge.db")


class KEClient:
    """Read-only client over the Knowledge Engine SQLite database.

    The connection is opened with ``mode=ro``, so statements issued through
    it cannot modify the database. Query methods return plain ``dict``
    objects keyed by column name. Instances can be used as context managers;
    leaving the ``with`` block closes the connection.
    """

    def __init__(self, db_path: str = DEFAULT_DB):
        """Open *db_path* in read-only mode.

        Args:
            db_path: Filesystem path of the SQLite database. Defaults to
                ``knowledge.db`` next to this module.

        Raises:
            FileNotFoundError: If nothing exists at *db_path*.
        """
        if not os.path.exists(db_path):
            raise FileNotFoundError(f"Knowledge DB not found: {db_path}")
        self.db_path = db_path
        # Build the URI through pathlib so characters that are special in a
        # SQLite URI ('?', '#', '%') are percent-encoded and Windows drive
        # paths get the required leading slash. Pasting the raw path into
        # "file:...?mode=ro" would truncate or mis-decode such paths.
        uri = Path(os.path.abspath(db_path)).as_uri() + "?mode=ro"
        self._conn = sqlite3.connect(uri, uri=True)
        self._conn.row_factory = sqlite3.Row

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self._conn.close()

    def __enter__(self) -> "KEClient":
        return self

    def __exit__(self, *_) -> None:
        # Returns None, so an exception raised inside the with-block
        # propagates after the connection has been closed.
        self.close()

    # ---------- Internal helpers ----------

    def _fetch_all(self, sql: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Run *sql* with *params* and return every row as a dict."""
        return [dict(row) for row in self._conn.execute(sql, params).fetchall()]

    def _fetch_one(self, sql: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
        """Run *sql* with *params* and return the first row as a dict, or None."""
        row = self._conn.execute(sql, params).fetchone()
        return dict(row) if row is not None else None

    # ---------- Documents ----------

    def search_documents(self, query: str, limit: int = 20) -> List[Dict[str, Any]]:
        """Full-text search over documents.

        Args:
            query: FTS5 MATCH expression, passed to SQLite as a bound
                parameter. SQLite raises ``sqlite3.OperationalError`` for
                expressions it cannot parse.
            limit: Maximum number of rows to return.

        Returns:
            ``v_documents`` rows as dicts, each with an extra ``bm25_score``
            key. Rows are sorted ascending by
            ``bm25 * (0.5 + relevance_score)``; FTS5's ``bm25()`` yields
            smaller values for better matches, so the best match comes first.
        """
        # The three numbers are per-column bm25 weights for documents_fts,
        # applied in the FTS table's column order.
        return self._fetch_all(
            """
            SELECT v.*, bm25(documents_fts, 5.0, 1.0, 3.0) AS bm25_score
            FROM documents_fts fts
            JOIN v_documents v ON v.id = fts.rowid
            WHERE documents_fts MATCH ?
            ORDER BY (bm25(documents_fts, 5.0, 1.0, 3.0) * (0.5 + v.relevance_score))
            LIMIT ?
            """,
            (query, limit),
        )

    def get_document(self, doc_id: int) -> Optional[Dict[str, Any]]:
        """Return the ``v_documents`` row whose id is *doc_id*, or None."""
        return self._fetch_one(
            "SELECT * FROM v_documents WHERE id = ?", (doc_id,)
        )

    def list_documents(
        self, source_name: Optional[str] = None, limit: int = 100
    ) -> List[Dict[str, Any]]:
        """List documents, newest ``created_at`` first.

        Args:
            source_name: When non-empty, only rows with this ``source_name``
                are returned; ``None`` or ``""`` disables the filter.
            limit: Maximum number of rows to return.
        """
        if source_name:
            return self._fetch_all(
                "SELECT * FROM v_documents WHERE source_name = ? "
                "ORDER BY created_at DESC LIMIT ?",
                (source_name, limit),
            )
        return self._fetch_all(
            "SELECT * FROM v_documents ORDER BY created_at DESC LIMIT ?",
            (limit,),
        )

    # ---------- Properties ----------

    def search_properties(
        self,
        query: Optional[str] = None,
        max_price: Optional[int] = 30000,
        region: Optional[str] = None,
        ptype: Optional[str] = None,
        limit: int = 50,
    ) -> List[Dict[str, Any]]:
        """Search properties, optionally by full text and by filters.

        Args:
            query: FTS5 MATCH expression run against ``properties_fts``.
                ``None`` or ``""`` skips the full-text step and filters
                ``v_properties`` directly.
            max_price: Upper price bound (inclusive). Rows whose price is
                NULL always pass. ``None`` disables the price cap.
            region: Exact ``region`` to match; ``None`` disables the filter.
            ptype: Exact ``property_type`` to match; ``None`` disables the
                filter.
            limit: Maximum number of rows to return.

        Returns:
            ``v_properties`` rows as dicts, highest ``score`` first, ties
            broken by lowest ``price``.
        """
        # Only the row source differs between the two modes; the filter,
        # ordering and limit tail is shared so the branches cannot drift.
        if query:
            source = (
                "FROM properties_fts fts "
                "JOIN v_properties v ON v.id = fts.rowid "
                "WHERE properties_fts MATCH ? AND "
            )
            params = [query]
        else:
            source = "FROM v_properties v WHERE "
            params = []

        # Each optional filter binds its value twice: once for the
        # "? IS NULL" switch and once for the comparison itself.
        sql = (
            "SELECT v.* "
            + source
            + "(? IS NULL OR v.price IS NULL OR v.price <= ?) "
            "AND (? IS NULL OR v.region = ?) "
            "AND (? IS NULL OR v.property_type = ?) "
            "ORDER BY v.score DESC, v.price ASC "
            "LIMIT ?"
        )
        params.extend([max_price, max_price, region, region, ptype, ptype, limit])
        return self._fetch_all(sql, tuple(params))

    def get_property(self, prop_id: int) -> Optional[Dict[str, Any]]:
        """Return the ``v_properties`` row whose id is *prop_id*, or None."""
        return self._fetch_one(
            "SELECT * FROM v_properties WHERE id = ?", (prop_id,)
        )

    # ---------- Meta ----------

    def sources(self) -> List[Dict[str, Any]]:
        """Return every row of ``v_sources``."""
        return self._fetch_all("SELECT * FROM v_sources")

    def schema_version(self) -> List[Dict[str, Any]]:
        """Return the rows of ``schema_version`` ordered by ``version``.

        Each dict has the keys ``version``, ``name`` and ``applied_at``.
        """
        return self._fetch_all(
            "SELECT version, name, applied_at FROM schema_version ORDER BY version"
        )
if __name__ == "__main__":
    # Manual smoke check against the default database: print the recorded
    # schema versions, then the number of known sources.
    with KEClient() as client:
        applied_versions = client.schema_version()
        print("Schema:", applied_versions)
        source_rows = client.sources()
        print("Sources:", len(source_rows))