Skip to content

Commit 5504e24

Browse files
committed
support service mode
1 parent 96e2b3b commit 5504e24

2 files changed

Lines changed: 267 additions & 0 deletions

File tree

src_py/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
from .database import Database
5454
from .prepared_statement import PreparedStatement
5555
from .query_result import QueryResult
56+
from .session import RemoteQueryResult, Session
5657
from .types import Type
5758

5859

@@ -76,6 +77,8 @@ def __getattr__(name: str) -> str | int:
7677
"Database",
7778
"PreparedStatement",
7879
"QueryResult",
80+
"RemoteQueryResult",
81+
"Session",
7982
"Type",
8083
"__version__", # noqa: F822
8184
"storage_version", # noqa: F822

src_py/session.py

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
"""Remote session client for Lbug HTTP Service Mode."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
import urllib.request
7+
import urllib.error
8+
import urllib.parse
9+
from typing import TYPE_CHECKING
10+
11+
if TYPE_CHECKING:
12+
import sys
13+
from collections.abc import Iterator
14+
from types import TracebackType
15+
from typing import Any
16+
17+
if sys.version_info >= (3, 11):
18+
from typing import Self
19+
else:
20+
from typing_extensions import Self
21+
22+
23+
class RemoteQueryResult:
24+
"""Stores the result of a remote query execution over HTTP."""
25+
26+
def __init__(self, data: dict[str, Any]) -> None:
27+
self._data = data
28+
self._columns: list[str] = data.get("columns", [])
29+
self._rows: list[list[str]] = data.get("rows", [])
30+
self._num_rows: int = data.get("numRows", len(self._rows))
31+
self._compiling_time: float = data.get("compilingTime", 0.0)
32+
self._execution_time: float = data.get("executionTime", 0.0)
33+
self._error: str | None = data.get("error")
34+
self._cursor = 0
35+
self.is_closed = False
36+
37+
def __enter__(self) -> Self:
38+
return self
39+
40+
def __exit__(
41+
self,
42+
exc_type: type[BaseException] | None,
43+
exc_value: BaseException | None,
44+
exc_traceback: TracebackType | None,
45+
) -> None:
46+
self.close()
47+
48+
def __iter__(self) -> Iterator[list[str]]:
49+
return self
50+
51+
def __next__(self) -> list[str]:
52+
if self.has_next():
53+
return self.get_next()
54+
raise StopIteration
55+
56+
def is_success(self) -> bool:
57+
"""Check if the query executed successfully."""
58+
return self._error is None
59+
60+
def get_error_message(self) -> str | None:
61+
"""Get the error message if the query failed."""
62+
return self._error
63+
64+
def has_next(self) -> bool:
65+
"""Check if there are more rows to read."""
66+
self._check_closed()
67+
return self._cursor < len(self._rows)
68+
69+
def get_next(self) -> list[str]:
70+
"""Get the next row."""
71+
self._check_closed()
72+
if self._cursor >= len(self._rows):
73+
msg = "No more rows"
74+
raise StopIteration(msg)
75+
row = self._rows[self._cursor]
76+
self._cursor += 1
77+
return row
78+
79+
def get_all(self) -> list[list[str]]:
80+
"""Get all remaining rows."""
81+
return list(self)
82+
83+
def get_column_names(self) -> list[str]:
84+
"""Get column names."""
85+
return self._columns
86+
87+
def get_num_tuples(self) -> int:
88+
"""Get total number of rows."""
89+
return self._num_rows
90+
91+
def get_compiling_time(self) -> float:
92+
"""Get query compiling time in ms."""
93+
return self._compiling_time
94+
95+
def get_execution_time(self) -> float:
96+
"""Get query execution time in ms."""
97+
return self._execution_time
98+
99+
def reset_iterator(self) -> None:
100+
"""Reset the row iterator to the beginning."""
101+
self._cursor = 0
102+
103+
def get_as_df(self) -> Any:
104+
"""
105+
Get the query result as a Pandas DataFrame.
106+
107+
Returns
108+
-------
109+
pandas.DataFrame
110+
Query result as a Pandas DataFrame.
111+
"""
112+
import pandas as pd
113+
114+
self._check_closed()
115+
return pd.DataFrame(self._rows, columns=self._columns)
116+
117+
def close(self) -> None:
118+
"""Close the query result."""
119+
self.is_closed = True
120+
121+
def _check_closed(self) -> None:
122+
if self.is_closed:
123+
msg = "Query result is closed"
124+
raise RuntimeError(msg)
125+
126+
def __repr__(self) -> str:
127+
if self._error:
128+
return f"RemoteQueryResult(error={self._error!r})"
129+
return f"RemoteQueryResult(columns={self._columns}, numRows={self._num_rows})"
130+
131+
132+
class Session:
133+
"""
134+
HTTP client session for connecting to a Lbug Service Mode server.
135+
136+
Example
137+
-------
138+
>>> session = Session("http://localhost:8000")
139+
>>> result = session.execute("MATCH (n:Person) RETURN n.name, n.age")
140+
>>> while result.has_next():
141+
... print(result.get_next())
142+
>>> session.close()
143+
144+
Or as a context manager:
145+
146+
>>> with Session("http://localhost:8000") as session:
147+
... result = session.execute("MATCH (n:Person) RETURN n.name")
148+
... print(result.get_all())
149+
"""
150+
151+
def __init__(self, endpoint: str = "http://localhost:8000", timeout: float = 30.0) -> None:
152+
"""
153+
Create a session connected to a Lbug Service Mode server.
154+
155+
Parameters
156+
----------
157+
endpoint : str
158+
Base URL of the server (e.g. "http://localhost:8000").
159+
timeout : float
160+
Request timeout in seconds.
161+
"""
162+
self._endpoint = endpoint.rstrip("/")
163+
self._timeout = timeout
164+
self._closed = False
165+
# Use a no-proxy opener for localhost connections
166+
self._opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
167+
168+
# Verify connectivity
169+
try:
170+
self.health()
171+
except Exception as e:
172+
msg = f"Failed to connect to Lbug server at {self._endpoint}: {e}"
173+
raise ConnectionError(msg) from e
174+
175+
def __enter__(self) -> Self:
176+
return self
177+
178+
def __exit__(
179+
self,
180+
exc_type: type[BaseException] | None,
181+
exc_value: BaseException | None,
182+
exc_traceback: TracebackType | None,
183+
) -> None:
184+
self.close()
185+
186+
def execute(self, query: str) -> RemoteQueryResult:
187+
"""
188+
Execute a Cypher query.
189+
190+
Parameters
191+
----------
192+
query : str
193+
The Cypher query to execute.
194+
195+
Returns
196+
-------
197+
RemoteQueryResult
198+
The query result.
199+
"""
200+
self._check_closed()
201+
payload = json.dumps({"query": query}).encode("utf-8")
202+
req = urllib.request.Request(
203+
f"{self._endpoint}/cypher",
204+
data=payload,
205+
headers={"Content-Type": "application/json"},
206+
method="POST",
207+
)
208+
data = self._send(req)
209+
result = RemoteQueryResult(data)
210+
if not result.is_success():
211+
msg = result.get_error_message()
212+
raise RuntimeError(msg)
213+
return result
214+
215+
def health(self) -> dict[str, Any]:
216+
"""
217+
Check server health.
218+
219+
Returns
220+
-------
221+
dict
222+
Server health status.
223+
"""
224+
req = urllib.request.Request(f"{self._endpoint}/health")
225+
return self._send(req)
226+
227+
def schema(self) -> RemoteQueryResult:
228+
"""
229+
Get database schema.
230+
231+
Returns
232+
-------
233+
RemoteQueryResult
234+
Schema information as a query result.
235+
"""
236+
self._check_closed()
237+
req = urllib.request.Request(f"{self._endpoint}/schema")
238+
data = self._send(req)
239+
return RemoteQueryResult(data)
240+
241+
def close(self) -> None:
242+
"""Close the session."""
243+
self._closed = True
244+
245+
def _send(self, req: urllib.request.Request) -> dict[str, Any]:
246+
"""Send an HTTP request and return parsed JSON response."""
247+
try:
248+
with self._opener.open(req, timeout=self._timeout) as resp:
249+
return json.loads(resp.read().decode("utf-8"))
250+
except urllib.error.HTTPError as e:
251+
body = e.read().decode("utf-8")
252+
try:
253+
return json.loads(body)
254+
except json.JSONDecodeError:
255+
raise RuntimeError(f"HTTP {e.code}: {body}") from e
256+
257+
def _check_closed(self) -> None:
258+
if self._closed:
259+
msg = "Session is closed"
260+
raise RuntimeError(msg)
261+
262+
def __repr__(self) -> str:
263+
state = "closed" if self._closed else "connected"
264+
return f"Session(endpoint={self._endpoint!r}, {state})"

0 commit comments

Comments
 (0)