Skip to content

Commit 034fde7

Browse files
author
Dylan Huang
committed
Refactor Elasticsearch integration to use dedicated client
- Replaced direct HTTP requests with an ElasticsearchClient for handling interactions with Elasticsearch in ElasticSearchDirectHttpHandler and ElasticsearchIndexManager. - Updated configuration handling to encapsulate Elasticsearch settings within a single config object. - Simplified index management and document indexing processes, improving code maintainability and readability. - Adjusted tests to utilize the new client for searching and verifying log entries, enhancing test reliability.
1 parent 4c27734 commit 034fde7

File tree

4 files changed

+375
-258
lines changed

4 files changed

+375
-258
lines changed
Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
"""
2+
Centralized Elasticsearch client for all Elasticsearch API operations.
3+
4+
This module provides a unified interface for all Elasticsearch operations
5+
used throughout the codebase, including index management, document operations,
6+
and search functionality.
7+
"""
8+
9+
import json
10+
import requests
11+
from typing import Any, Dict, List, Optional, Union
12+
from urllib.parse import urlparse
13+
from dataclasses import dataclass
14+
15+
16+
@dataclass
17+
class ElasticsearchConfig:
18+
"""Configuration for Elasticsearch client."""
19+
20+
url: str
21+
api_key: str
22+
index_name: str
23+
verify_ssl: bool = True
24+
25+
def __post_init__(self):
26+
"""Parse URL to determine SSL verification setting."""
27+
parsed_url = urlparse(self.url)
28+
self.verify_ssl = parsed_url.scheme == "https"
29+
30+
31+
class ElasticsearchClient:
32+
"""Centralized client for all Elasticsearch operations."""
33+
34+
def __init__(self, config: ElasticsearchConfig):
35+
"""Initialize the Elasticsearch client.
36+
37+
Args:
38+
config: Elasticsearch configuration
39+
"""
40+
self.config = config
41+
self.base_url = config.url.rstrip("/")
42+
self.index_url = f"{self.base_url}/{config.index_name}"
43+
self._headers = {"Content-Type": "application/json", "Authorization": f"ApiKey {config.api_key}"}
44+
45+
def _make_request(
46+
self,
47+
method: str,
48+
url: str,
49+
json_data: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
50+
params: Optional[Dict[str, Any]] = None,
51+
timeout: int = 30,
52+
) -> requests.Response:
53+
"""Make an HTTP request to Elasticsearch.
54+
55+
Args:
56+
method: HTTP method (GET, POST, PUT, DELETE, HEAD)
57+
url: Full URL for the request
58+
json_data: JSON data to send in request body
59+
params: Query parameters
60+
timeout: Request timeout in seconds
61+
62+
Returns:
63+
requests.Response object
64+
65+
Raises:
66+
requests.RequestException: If the request fails
67+
"""
68+
return requests.request(
69+
method=method,
70+
url=url,
71+
headers=self._headers,
72+
json=json_data,
73+
params=params,
74+
verify=self.config.verify_ssl,
75+
timeout=timeout,
76+
)
77+
78+
# Index Management Operations
79+
80+
def create_index(self, mapping: Dict[str, Any]) -> bool:
81+
"""Create an index with the specified mapping.
82+
83+
Args:
84+
mapping: Index mapping configuration
85+
86+
Returns:
87+
bool: True if successful, False otherwise
88+
"""
89+
try:
90+
response = self._make_request("PUT", self.index_url, json_data=mapping)
91+
return response.status_code in [200, 201]
92+
except Exception:
93+
return False
94+
95+
def index_exists(self) -> bool:
96+
"""Check if the index exists.
97+
98+
Returns:
99+
bool: True if index exists, False otherwise
100+
"""
101+
try:
102+
response = self._make_request("HEAD", self.index_url)
103+
return response.status_code == 200
104+
except Exception:
105+
return False
106+
107+
def delete_index(self) -> bool:
108+
"""Delete the index.
109+
110+
Returns:
111+
bool: True if successful, False otherwise
112+
"""
113+
try:
114+
response = self._make_request("DELETE", self.index_url)
115+
return response.status_code in [200, 404] # 404 means index doesn't exist
116+
except Exception:
117+
return False
118+
119+
def get_mapping(self) -> Optional[Dict[str, Any]]:
120+
"""Get the index mapping.
121+
122+
Returns:
123+
Dict containing mapping data, or None if failed
124+
"""
125+
try:
126+
response = self._make_request("GET", f"{self.index_url}/_mapping")
127+
if response.status_code == 200:
128+
return response.json()
129+
return None
130+
except Exception:
131+
return None
132+
133+
def get_index_stats(self) -> Optional[Dict[str, Any]]:
134+
"""Get index statistics.
135+
136+
Returns:
137+
Dict containing index statistics, or None if failed
138+
"""
139+
try:
140+
response = self._make_request("GET", f"{self.index_url}/_stats")
141+
if response.status_code == 200:
142+
return response.json()
143+
return None
144+
except Exception:
145+
return None
146+
147+
# Document Operations
148+
149+
def index_document(self, document: Dict[str, Any], doc_id: Optional[str] = None) -> bool:
150+
"""Index a document.
151+
152+
Args:
153+
document: Document to index
154+
doc_id: Optional document ID
155+
156+
Returns:
157+
bool: True if successful, False otherwise
158+
"""
159+
try:
160+
if doc_id:
161+
url = f"{self.index_url}/_doc/{doc_id}"
162+
else:
163+
url = f"{self.index_url}/_doc"
164+
165+
response = self._make_request("POST", url, json_data=document)
166+
return response.status_code in [200, 201]
167+
except Exception:
168+
return False
169+
170+
def bulk_index_documents(self, documents: List[Dict[str, Any]]) -> bool:
171+
"""Bulk index multiple documents.
172+
173+
Args:
174+
documents: List of documents to index
175+
176+
Returns:
177+
bool: True if successful, False otherwise
178+
"""
179+
try:
180+
# Prepare bulk request body
181+
bulk_body = []
182+
for doc in documents:
183+
bulk_body.append({"index": {}})
184+
bulk_body.append(doc)
185+
186+
response = self._make_request("POST", f"{self.index_url}/_bulk", json_data=bulk_body)
187+
return response.status_code == 200
188+
except Exception:
189+
return False
190+
191+
# Search Operations
192+
193+
def search(
194+
self, query: Dict[str, Any], size: int = 10, from_: int = 0, sort: Optional[List[Dict[str, Any]]] = None
195+
) -> Optional[Dict[str, Any]]:
196+
"""Search documents in the index.
197+
198+
Args:
199+
query: Elasticsearch query
200+
size: Number of results to return
201+
from_: Starting offset
202+
sort: Sort specification
203+
204+
Returns:
205+
Dict containing search results, or None if failed
206+
"""
207+
try:
208+
search_body = {"query": query, "size": size, "from": from_}
209+
210+
if sort:
211+
search_body["sort"] = sort
212+
213+
response = self._make_request("POST", f"{self.index_url}/_search", json_data=search_body)
214+
215+
if response.status_code == 200:
216+
return response.json()
217+
return None
218+
except Exception:
219+
return None
220+
221+
def search_by_term(self, field: str, value: Any, size: int = 10) -> Optional[Dict[str, Any]]:
222+
"""Search documents by exact term match.
223+
224+
Args:
225+
field: Field name to search
226+
value: Value to match
227+
size: Number of results to return
228+
229+
Returns:
230+
Dict containing search results, or None if failed
231+
"""
232+
query = {"term": {field: value}}
233+
return self.search(query, size=size)
234+
235+
def search_by_match(self, field: str, value: str, size: int = 10) -> Optional[Dict[str, Any]]:
236+
"""Search documents by text match.
237+
238+
Args:
239+
field: Field name to search
240+
value: Text to match
241+
size: Number of results to return
242+
243+
Returns:
244+
Dict containing search results, or None if failed
245+
"""
246+
query = {"match": {field: value}}
247+
return self.search(query, size=size)
248+
249+
def search_by_match_phrase_prefix(self, field: str, value: str, size: int = 10) -> Optional[Dict[str, Any]]:
250+
"""Search documents by phrase prefix match.
251+
252+
Args:
253+
field: Field name to search
254+
value: Phrase prefix to match
255+
size: Number of results to return
256+
257+
Returns:
258+
Dict containing search results, or None if failed
259+
"""
260+
query = {"match_phrase_prefix": {field: value}}
261+
return self.search(query, size=size)
262+
263+
def search_all(self, size: int = 10) -> Optional[Dict[str, Any]]:
264+
"""Search all documents in the index.
265+
266+
Args:
267+
size: Number of results to return
268+
269+
Returns:
270+
Dict containing search results, or None if failed
271+
"""
272+
query = {"match_all": {}}
273+
return self.search(query, size=size)
274+
275+
# Health and Status Operations
276+
277+
def health_check(self) -> bool:
278+
"""Check if Elasticsearch is healthy.
279+
280+
Returns:
281+
bool: True if healthy, False otherwise
282+
"""
283+
try:
284+
response = self._make_request("GET", f"{self.base_url}/_cluster/health")
285+
return response.status_code == 200
286+
except Exception:
287+
return False
288+
289+
def get_cluster_info(self) -> Optional[Dict[str, Any]]:
290+
"""Get cluster information.
291+
292+
Returns:
293+
Dict containing cluster info, or None if failed
294+
"""
295+
try:
296+
response = self._make_request("GET", f"{self.base_url}/_cluster/health")
297+
if response.status_code == 200:
298+
return response.json()
299+
return None
300+
except Exception:
301+
return None

eval_protocol/logging/elasticsearch_direct_http_handler.py

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,23 @@
66
from concurrent.futures import ThreadPoolExecutor
77
from typing import Optional, Tuple, Any, Dict
88
from datetime import datetime
9-
from urllib.parse import urlparse
10-
import requests
119

1210
from eval_protocol.types.remote_rollout_processor import ElasticSearchConfig
11+
from .elasticsearch_client import ElasticsearchClient, ElasticsearchConfig as ESConfig
1312

1413

1514
class ElasticSearchDirectHttpHandler(logging.Handler):
1615
def __init__(self, elasticsearch_config: ElasticSearchConfig) -> None:
1716
super().__init__()
18-
self.base_url: str = elasticsearch_config.url.rstrip("/")
19-
self.index_name: str = elasticsearch_config.index_name
20-
self.api_key: str = elasticsearch_config.api_key
21-
self.url: str = f"{self.base_url}/{self.index_name}/_doc"
17+
self.config = ESConfig(
18+
url=elasticsearch_config.url,
19+
api_key=elasticsearch_config.api_key,
20+
index_name=elasticsearch_config.index_name,
21+
)
22+
self.client = ElasticsearchClient(self.config)
2223
self.formatter: logging.Formatter = logging.Formatter()
2324
self._executor = None
2425

25-
# Parse URL to determine if we should verify SSL
26-
parsed_url = urlparse(elasticsearch_config.url)
27-
self.verify_ssl = parsed_url.scheme == "https"
28-
2926
def emit(self, record: logging.LogRecord) -> None:
3027
"""Emit a log record by scheduling it for async transmission."""
3128
try:
@@ -104,13 +101,9 @@ def _schedule_async_send(self, data: Dict[str, Any], record: logging.LogRecord)
104101
def _send_to_elasticsearch(self, data: Dict[str, Any], record: logging.LogRecord) -> None:
105102
"""Send data to Elasticsearch (runs in thread pool)."""
106103
try:
107-
response: requests.Response = requests.post(
108-
self.url,
109-
headers={"Content-Type": "application/json", "Authorization": f"ApiKey {self.api_key}"},
110-
data=json.dumps(data),
111-
verify=self.verify_ssl, # If using HTTPS, verify SSL certificate
112-
)
113-
response.raise_for_status() # Raise an exception for HTTP errors
104+
success = self.client.index_document(data)
105+
if not success:
106+
raise Exception("Failed to index document to Elasticsearch")
114107
except Exception as e:
115108
# Re-raise to be handled by the callback
116109
raise e

0 commit comments

Comments
 (0)