-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmissing_service.py
More file actions
256 lines (206 loc) · 8.42 KB
/
missing_service.py
File metadata and controls
256 lines (206 loc) · 8.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import logging
import weaviate
import os
import uuid
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
from sentence_transformers import SentenceTransformer
import datetime
# Module-level logger; the name places it under the 'la_fires_api' logger hierarchy
logger = logging.getLogger('la_fires_api.missing')
# Name of the Weaviate class that stores missing person/pet entries
MISSING_CLASS_NAME = "Missing"
# Sentence-transformers model used to embed entry text (smaller but efficient model)
MODEL_NAME = "all-MiniLM-L6-v2"
class MissingService:
    """Service for managing missing person/pet entries with vector search.

    Entries live in the Weaviate class named by ``MISSING_CLASS_NAME``. The
    class is created with ``vectorizer: none``, so every stored object and
    every similarity query carries a vector computed locally with the
    sentence-transformers model named by ``MODEL_NAME``.
    """

    # Properties requested from Weaviate for every returned entry
    _ENTRY_FIELDS = ("content", "timestamp")

    def __init__(self, weaviate_url: Optional[str] = None):
        """
        Initialize the Missing service

        Args:
            weaviate_url: URL of the Weaviate instance (default: http://localhost:8080)

        Raises:
            Exception: whatever the Weaviate client or the model loader
                raised; the error is logged and re-raised unchanged.
        """
        self.weaviate_url = weaviate_url or "http://localhost:8080"
        self.client = None
        self.model = None
        try:
            # Initialize Weaviate client
            self.client = weaviate.Client(self.weaviate_url)
            logger.info("Connected to Weaviate at %s", self.weaviate_url)
            # Initialize the sentence transformer model
            self.model = SentenceTransformer(MODEL_NAME)
            logger.info("Initialized sentence transformer model: %s", MODEL_NAME)
        except Exception as e:
            # Re-raised below, so the caller owns the traceback; log the message only
            logger.error("Failed to initialize MissingService: %s", e)
            raise

    def create_schema(self) -> bool:
        """
        Create the missing entries schema in Weaviate if it doesn't exist

        Returns:
            True when the class already exists or was created, False when
            the client is missing or any Weaviate call failed.
        """
        if not self.client:
            logger.error("Weaviate client not initialized")
            return False
        try:
            # Check if schema already exists
            schema = self.client.schema.get()
            existing_classes = {c["class"] for c in schema.get("classes", [])}
            if MISSING_CLASS_NAME in existing_classes:
                logger.info("Schema for %s already exists", MISSING_CLASS_NAME)
                return True
            # Define the missing class schema
            missing_class = {
                "class": MISSING_CLASS_NAME,
                "description": "Information about missing persons or pets",
                "vectorizer": "none",  # We'll provide vectors directly
                "properties": [
                    {
                        "name": "content",
                        "dataType": ["text"],
                        "description": "The description of the missing person or pet",
                    },
                    {
                        "name": "timestamp",
                        "dataType": ["date"],
                        "description": "When this entry was created",
                    },
                ],
            }
            # Create the schema
            self.client.schema.create_class(missing_class)
            logger.info("Created schema for %s", MISSING_CLASS_NAME)
            return True
        except Exception as e:
            # The error is swallowed here, so keep the traceback in the log
            logger.exception("Failed to create schema: %s", e)
            return False

    def vectorize_text(self, text: str) -> List[float]:
        """
        Convert text to a vector representation

        Args:
            text: The text to vectorize

        Returns:
            Vector representation as a list of floats

        Raises:
            RuntimeError: if the sentence transformer model is not initialized.
            Exception: any error raised by the model is logged and re-raised.
        """
        if not self.model:
            logger.error("Sentence transformer model not initialized")
            raise RuntimeError("Sentence transformer model not initialized")
        try:
            vector = self.model.encode(text)
            return vector.tolist()
        except Exception as e:
            logger.error("Failed to vectorize text: %s", e)
            raise

    def add_missing_entry(self, content: str) -> str:
        """
        Add a missing person/pet entry to Weaviate

        Args:
            content: Description of the missing person/pet

        Returns:
            ID of the created object, or an empty string if the client is
            missing, the content is blank, the schema could not be ensured,
            or the insert failed.
        """
        if not self.client:
            logger.error("Weaviate client not initialized")
            return ""
        # A blank description carries no information; do not store it
        if not isinstance(content, str) or not content.strip():
            logger.warning("Refusing to add an empty missing entry")
            return ""
        try:
            # Ensure schema exists; do not insert when that step failed,
            # otherwise the object would be written against an unknown schema
            if not self.create_schema():
                logger.error(
                    "Cannot add missing entry: schema for %s is unavailable",
                    MISSING_CLASS_NAME,
                )
                return ""
            # Vectorize the content
            vector = self.vectorize_text(content)
            # Prepare data for Weaviate - use RFC3339 format for timestamp
            # Format example: 2020-01-01T00:00:00Z
            current_time = datetime.datetime.now(datetime.timezone.utc)
            rfc3339_time = current_time.strftime('%Y-%m-%dT%H:%M:%SZ')
            data_object = {
                "content": content,
                "timestamp": rfc3339_time,
            }
            # Add object with vector
            result = self.client.data_object.create(
                data_object=data_object,
                class_name=MISSING_CLASS_NAME,
                vector=vector,
            )
            logger.info("Added missing entry with ID: %s", result)
            return result
        except Exception as e:
            logger.exception("Failed to add missing entry: %s", e)
            return ""

    def search_missing_entries(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """
        Search for missing entries similar to the query

        Args:
            query: The search query
            limit: Maximum number of results to return

        Returns:
            List of matching entries (dicts with "content" and "timestamp");
            empty when the client is missing or the search failed.
        """
        if not self.client:
            logger.error("Weaviate client not initialized")
            return []
        try:
            # Vectorize the query
            query_vector = self.vectorize_text(query)
            # Perform vector search
            builder = (
                self.client.query
                .get(MISSING_CLASS_NAME, list(self._ENTRY_FIELDS))
                .with_near_vector({"vector": query_vector})
                .with_limit(limit)
            )
            return self._run_query(builder)
        except Exception as e:
            logger.exception("Failed to search missing entries: %s", e)
            return []

    def get_all_missing_entries(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Get all missing entries

        Args:
            limit: Maximum number of entries to return

        Returns:
            List of entries (dicts with "content" and "timestamp"), at most
            ``limit`` of them; empty when the client is missing or the
            query failed.
        """
        if not self.client:
            logger.error("Weaviate client not initialized")
            return []
        try:
            builder = (
                self.client.query
                .get(MISSING_CLASS_NAME, list(self._ENTRY_FIELDS))
                .with_limit(limit)
            )
            return self._run_query(builder)
        except Exception as e:
            logger.exception("Failed to get all missing entries: %s", e)
            return []

    def _run_query(self, builder: Any) -> List[Dict[str, Any]]:
        """Execute a prepared Get query and return its entries as plain dicts."""
        result = builder.do()
        # A response may carry an "errors" payload instead of (or next to)
        # data; surface it rather than silently returning nothing
        if isinstance(result, dict) and result.get("errors"):
            logger.error("Weaviate query returned errors: %s", result["errors"])
        return self._entries_from_result(result, MISSING_CLASS_NAME)

    @staticmethod
    def _entries_from_result(
        result: Optional[Dict[str, Any]], class_name: str
    ) -> List[Dict[str, Any]]:
        """Pull the "content"/"timestamp" pairs for ``class_name`` out of a Get response.

        Missing or null levels ("data", "Get", the class entry) yield an
        empty list instead of raising.
        """
        if not isinstance(result, dict):
            return []
        data = result.get("data") or {}
        rows = (data.get("Get") or {}).get(class_name) or []
        return [
            {
                "content": row.get("content", ""),
                "timestamp": row.get("timestamp", ""),
            }
            for row in rows
        ]
# Process-wide MissingService instance, built on first request
_missing_service = None


def get_missing_service() -> MissingService:
    """
    Return the shared MissingService, constructing it on first use

    The Weaviate URL is read from the WEAVIATE_URL environment variable and
    falls back to http://localhost:8080 when the variable is unset.

    Returns:
        MissingService instance
    """
    global _missing_service
    # Fast path: the instance was already built by an earlier call
    if _missing_service is not None:
        return _missing_service
    url = os.getenv("WEAVIATE_URL", "http://localhost:8080")
    _missing_service = MissingService(url)
    return _missing_service