From 3144298ac0dc9c7eee4043b1f685a35a079824bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=90=E7=B2=92=20Yanli?= Date: Thu, 11 Dec 2025 18:05:07 +0800 Subject: [PATCH 1/9] run ruff and fix lint (#385) --- .github/workflows/update_version.py | 16 +++++++++++----- docs/7_orm_example.py | 4 +++- example.py | 1 + scripts/add_apache_headers.py | 1 - src/nebulagraph_python/decoder/size_constant.py | 4 ++-- src/nebulagraph_python/orm/model.py | 2 +- src/nebulagraph_python/py_data_types.py | 5 ++++- src/nebulagraph_python/result_set.py | 2 +- src/nebulagraph_python/value_wrapper.py | 3 +++ 9 files changed, 26 insertions(+), 12 deletions(-) diff --git a/.github/workflows/update_version.py b/.github/workflows/update_version.py index d8c0b89f..1d346624 100644 --- a/.github/workflows/update_version.py +++ b/.github/workflows/update_version.py @@ -7,7 +7,9 @@ from typing import Optional -def update_version(version_type: str = "dev", custom_suffix: Optional[str] = None) -> str: +def update_version( + version_type: str = "dev", custom_suffix: Optional[str] = None +) -> str: """ Update the `version` field in `pyproject.toml` for supported manual build types. @@ -25,17 +27,21 @@ def update_version(version_type: str = "dev", custom_suffix: Optional[str] = Non content: str = pyproject_path.read_text() # Extract current version - version_match: Optional[re.Match[str]] = re.search(r'^version\s*=\s*"([^"]+)"', content, re.MULTILINE) + version_match: Optional[re.Match[str]] = re.search( + r'^version\s*=\s*"([^"]+)"', content, re.MULTILINE + ) if not version_match: raise ValueError("Could not find version in pyproject.toml") current_version: str = version_match.group(1) - + # Parse the base version (remove any existing suffixes) - base_version_match: Optional[re.Match[str]] = re.match(r'^(\d+\.\d+\.\d+)', current_version) + base_version_match: Optional[re.Match[str]] = re.match( + r"^(\d+\.\d+\.\d+)", current_version + ) if not base_version_match: raise ValueError(f"Invalid version format: {current_version}") - + base_version: str = base_version_match.group(1) # Only dev/custom builds mutate the version; all others keep current version diff --git a/docs/7_orm_example.py b/docs/7_orm_example.py index 58ed9065..1c40ddec 100644 --- a/docs/7_orm_example.py +++ b/docs/7_orm_example.py @@ -117,7 +117,9 @@ class Edge1( if input("Execute the DDL? (y/N)") == "y": client.execute_py(graph_type.to_gql()) - client.execute_py("CREATE GRAPH IF NOT EXISTS define_type_test define_type_test_type") + client.execute_py( + "CREATE GRAPH IF NOT EXISTS define_type_test define_type_test_type" + ) q = upsert_gql( diff --git a/example.py b/example.py index 7d55897e..ee1e8165 100644 --- a/example.py +++ b/example.py @@ -187,6 +187,7 @@ def sync_session_pool_example(): if __name__ == "__main__": import asyncio import logging + logging.basicConfig(level=logging.DEBUG) logging.getLogger("nebulagraph_python").setLevel(logging.DEBUG) diff --git a/scripts/add_apache_headers.py b/scripts/add_apache_headers.py index 8f4357a0..5aa13dd2 100644 --- a/scripts/add_apache_headers.py +++ b/scripts/add_apache_headers.py @@ -203,4 +203,3 @@ def main() -> None: if __name__ == "__main__": main() - diff --git a/src/nebulagraph_python/decoder/size_constant.py b/src/nebulagraph_python/decoder/size_constant.py index 00769564..0760782c 100644 --- a/src/nebulagraph_python/decoder/size_constant.py +++ b/src/nebulagraph_python/decoder/size_constant.py @@ -25,8 +25,8 @@ BOOL_SIZE = 1 # Vector element size EMBEDDING_VECTOR_DIM_SIZE = 4 # Size of vector dimension in row type (int32) -ELEMENT_NUMBER_SIZE_FOR_VECTOR_VALUE=2 -EMBEDDING_VECTOR_FLOAT_VALUE_SIZE=4 +ELEMENT_NUMBER_SIZE_FOR_VECTOR_VALUE = 2 +EMBEDDING_VECTOR_FLOAT_VALUE_SIZE = 4 FLOAT32_SIZE = 4 # Size of each float32 element in vector # String size: 4 byte string value length + 4 byte prefix string diff --git a/src/nebulagraph_python/orm/model.py b/src/nebulagraph_python/orm/model.py index 58820c48..1b2068bc 100644 --- a/src/nebulagraph_python/orm/model.py +++ b/src/nebulagraph_python/orm/model.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from collections.abc import Sequence as ABCSequence from dataclasses import dataclass from datetime import date from types import UnionType from typing import Iterable -from collections.abc import Sequence as ABCSequence from pydantic import ( AfterValidator, diff --git a/src/nebulagraph_python/py_data_types.py b/src/nebulagraph_python/py_data_types.py index 840fadeb..11e15070 100644 --- a/src/nebulagraph_python/py_data_types.py +++ b/src/nebulagraph_python/py_data_types.py @@ -902,7 +902,7 @@ def __eq__(self, other) -> bool: logger.warning("Expected Vector, got %s", type(other)) return False return self.dimension == other.dimension and all( - abs(a - b) < 1e-7 for a, b in zip(self.values, other.values) + abs(a - b) < 1e-7 for a, b in zip(self.values, other.values, strict=True) ) def __str__(self) -> str: @@ -913,6 +913,9 @@ def __repr__(self) -> str: """Return detailed string representation of the vector.""" return f"NVector({self.values})" + def __hash__(self) -> int: + return hash(str(self)) + BasicTargetType = Union[ None, diff --git a/src/nebulagraph_python/result_set.py b/src/nebulagraph_python/result_set.py index 2c0b0f28..26e1fca0 100644 --- a/src/nebulagraph_python/result_set.py +++ b/src/nebulagraph_python/result_set.py @@ -392,7 +392,7 @@ def as_ascii_table( row_num = 1 for record in self.records(): console.print(f"\n[bold blue]Row {row_num}[/bold blue]") - for col, val in zip(self.column_names, record.values()): + for col, val in zip(self.column_names, record.values(), strict=True): console.print(f" [cyan]{col}:[/cyan] {val.cast_primitive()}") row_num += 1 diff --git a/src/nebulagraph_python/value_wrapper.py b/src/nebulagraph_python/value_wrapper.py index ee15d660..78000c21 100644 --- a/src/nebulagraph_python/value_wrapper.py +++ b/src/nebulagraph_python/value_wrapper.py @@ -357,6 +357,9 @@ def __eq__(self, other): return False return self.value == other.value and self.data_type == other.data_type + def __hash__(self) -> int: + return hash((self.value, self.data_type)) + class Row: def __init__(self, values: Optional[List[ValueWrapper]] = None): From 71b4d2b662db7bae7b9da061aa908af6ef5357d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=90=E7=B2=92=20Yanli?= Date: Fri, 12 Dec 2025 16:25:22 +0800 Subject: [PATCH 2/9] [Feature] Support borrow session for contextual execution (#386) * support borrow session on client * add doc * run lint --- docs/2_concurrency.md | 20 +++ src/nebulagraph_python/client/_connection.py | 8 + src/nebulagraph_python/client/_session.py | 28 ++-- .../client/_session_pool.py | 16 +- src/nebulagraph_python/client/client.py | 69 ++++---- tests/test_session_pool.py | 152 +++++++++--------- 6 files changed, 162 insertions(+), 131 deletions(-) diff --git a/docs/2_concurrency.md b/docs/2_concurrency.md index d9e48f4f..eb063b36 100644 --- a/docs/2_concurrency.md +++ b/docs/2_concurrency.md @@ -62,6 +62,26 @@ async def concurrent_example(): asyncio.run(concurrent_example()) ``` +## Contextual Execution + +By default, statements run on a random session from the pool. When you need to run several queries on the same session, call `borrow` to obtain and reuse a specific session. + +```python +async def contextual_example(): + async with await NebulaAsyncClient.connect( + hosts=["127.0.0.1:9669"], + username="root", + password="NebulaGraph01", + session_pool_config=SessionPoolConfig(), + ) as client: + print("Connected to the server...") + async with client.borrow() as session: + await session.execute("SESSION SET GRAPH movie") + res = await session.execute("MATCH (v:Movie) RETURN count(v)") + res.print() +``` + + ## Understanding Timeout Values The client uses three different timeouts that apply at different stages: diff --git a/src/nebulagraph_python/client/_connection.py b/src/nebulagraph_python/client/_connection.py index 3dc320dc..a2f02630 100644 --- a/src/nebulagraph_python/client/_connection.py +++ b/src/nebulagraph_python/client/_connection.py @@ -96,6 +96,8 @@ class Connection: # Config config: ConnectionConfig + # Track which host was successfully connected for session routing + connected: HostAddress | None = field(default=None, init=False) # Owned Resources _stub: Optional[graph_pb2_grpc.GraphServiceStub] = field(default=None, init=False) @@ -152,6 +154,8 @@ def connect(self): logger.info( f"Successfully connected to {host_addr.host}:{host_addr.port}." ) + # Remember which host we actually connected to + self.connected = host_addr return except Exception as e: logger.warning( @@ -174,6 +178,7 @@ def close(self): self._channel.close() self._channel = None self._stub = None + self.connected = None except Exception: logger.exception("Failed to close connection") @@ -303,6 +308,7 @@ class AsyncConnection: """ config: ConnectionConfig + connected: HostAddress | None = None _stub: Optional[graph_pb2_grpc.GraphServiceStub] = field(default=None, init=False) _channel: Optional[grpc.aio.Channel] = field( default=None, init=False @@ -358,6 +364,7 @@ async def connect(self): logger.info( f"Successfully connected to {host_addr.host}:{host_addr.port} asynchronously." ) + self.connected = host_addr return except Exception as e: logger.warning( @@ -380,6 +387,7 @@ async def close(self): await self._channel.close() self._channel = None self._stub = None + self.connected = None except BaseException: logger.exception("Failed to close async connection") diff --git a/src/nebulagraph_python/client/_session.py b/src/nebulagraph_python/client/_session.py index 09184d50..83bd322b 100644 --- a/src/nebulagraph_python/client/_session.py +++ b/src/nebulagraph_python/client/_session.py @@ -25,7 +25,7 @@ from nebulagraph_python.error import ExecutingError -@dataclass +@dataclass(kw_only=True, frozen=True) class SessionConfig: schema: Optional[str] = None graph: Optional[str] = None @@ -47,31 +47,32 @@ class SessionBase: @dataclass class Session(SessionBase): - conn: "Connection" + _conn: "Connection" def execute( self, statement: str, *, timeout: Optional[float] = None, do_ping: bool = False ): - res = self.conn.execute( + res = self._conn.execute( self._session, statement, timeout=timeout, do_ping=do_ping ) # Retry for only one time if res.status_code == ErrorCode.SESSION_NOT_FOUND.value: - self._session = self.conn.authenticate( + self._session = self._conn.authenticate( self.username, self.password, session_config=self.session_config, auth_options=self.auth_options, ) - res = self.conn.execute( + res = self._conn.execute( self._session, statement, timeout=timeout, do_ping=do_ping ) + res.raise_on_error() return res - def close(self): + def _close(self): """Close session""" try: - self.conn.execute(self._session, "SESSION CLOSE") + self._conn.execute(self._session, "SESSION CLOSE") except Exception: logger.exception("Failed to close session") @@ -84,30 +85,31 @@ def __eq__(self, other): @dataclass class AsyncSession(SessionBase): - conn: "AsyncConnection" + _conn: "AsyncConnection" async def execute( self, statement: str, *, timeout: Optional[float] = None, do_ping: bool = False ): - res = await self.conn.execute( + res = await self._conn.execute( self._session, statement, timeout=timeout, do_ping=do_ping ) # Retry for only one time if res.status_code == ErrorCode.SESSION_NOT_FOUND.value: - self._session = await self.conn.authenticate( + self._session = await self._conn.authenticate( self.username, self.password, session_config=self.session_config, auth_options=self.auth_options, ) - res = await self.conn.execute( + res = await self._conn.execute( self._session, statement, timeout=timeout, do_ping=do_ping ) + res.raise_on_error() return res - async def close(self): + async def _close(self): try: - await self.conn.execute(self._session, "SESSION CLOSE") + await self._conn.execute(self._session, "SESSION CLOSE") except Exception: logger.exception("Failed to close async session") diff --git a/src/nebulagraph_python/client/_session_pool.py b/src/nebulagraph_python/client/_session_pool.py index d08f930b..2a085a33 100644 --- a/src/nebulagraph_python/client/_session_pool.py +++ b/src/nebulagraph_python/client/_session_pool.py @@ -91,7 +91,7 @@ async def connect( except Exception: # Clean up any sessions that were successfully created for session in sessions: - await session.close() + await session._close() raise def __init__( @@ -157,20 +157,20 @@ async def borrow(self): self.busy_sessions_queue.remove(got_session) self.queue_count.release() - async def close(self): + async def _close(self): # Acquire all semaphore permits to prevent new borrows for _ in range(self.config.size): await self.queue_count.acquire() async with self.queue_lock: # Close all free sessions for session in self.free_sessions_queue: - await session.close() + await session._close() # Close all busy sessions (if any remain) for session in self.busy_sessions_queue: logger.error( "Busy sessions remain after acquire all semaphore permits, which indicates a bug in the AsyncSessionPool" ) - await session.close() + await session._close() class SessionPool: @@ -209,7 +209,7 @@ def connect( except Exception: # Clean up any sessions that were successfully created for session in sessions: - session.close() + session._close() raise def __init__( @@ -273,17 +273,17 @@ def borrow(self): self.busy_sessions_queue.remove(got_session) self.queue_count.release() - def close(self): + def _close(self): # Acquire all semaphore permits to prevent new borrows for _ in range(self.config.size): self.queue_count.acquire() with self.queue_lock: # Close all free sessions for session in self.free_sessions_queue: - session.close() + session._close() # Close all busy sessions (if any remain) for session in self.busy_sessions_queue: logger.error( "Busy sessions remain after acquire all semaphore permits, which indicates a bug in the SessionPool" ) - session.close() + session._close() diff --git a/src/nebulagraph_python/client/client.py b/src/nebulagraph_python/client/client.py index 57be1d7b..1b57fa81 100644 --- a/src/nebulagraph_python/client/client.py +++ b/src/nebulagraph_python/client/client.py @@ -13,6 +13,8 @@ # limitations under the License. import logging +from collections.abc import AsyncGenerator, Generator +from contextlib import asynccontextmanager, contextmanager from typing import Any, Dict, List, Literal, Optional, Union from nebulagraph_python.client._connection import ( @@ -120,7 +122,7 @@ async def connect( ) else: self._sessions[host_addr] = AsyncSession( - conn=conn, + _conn=conn, username=username, password=password, session_config=session_config or SessionConfig(), @@ -134,40 +136,31 @@ async def connect( async def execute( self, statement: str, *, timeout: Optional[float] = None, do_ping: bool = False ) -> ResultSet: + async with self.borrow() as session: + return await session.execute(statement, timeout=timeout, do_ping=do_ping) + + @asynccontextmanager + async def borrow(self) -> AsyncGenerator[AsyncSession, None]: if isinstance(self._conn, AsyncConnectionPool): - addr, _conn = await self._conn.next_connection() + addr, conn = await self._conn.next_connection() else: - addr = self._conn.config.hosts[0] + conn = self._conn + addr = conn.connected + if addr is None: + raise ValueError("Connection not connected") + _session = self._sessions[addr] if isinstance(_session, AsyncSessionPool): async with _session.borrow() as session: - return ( - await session.execute(statement, timeout=timeout, do_ping=do_ping) - ).raise_on_error() + yield session else: - return ( - await _session.execute(statement, timeout=timeout, do_ping=do_ping) - ).raise_on_error() - - async def ping(self, timeout: Optional[float] = None) -> bool: - try: - res = ( - (await self.execute(statement="RETURN 1", timeout=timeout)) - .one() - .as_primitive() - ) - if not res == {"1": 1}: - raise ValueError(f"Unexpected result from ping: {res}") - return True - except Exception: - logger.exception("Failed to ping NebulaGraph") - return False + yield _session async def close(self): """Close the client connection and session. No Exception will be raised but an error will be logged.""" for session in self._sessions.values(): - await session.close() + await session._close() await self._conn.close() async def __aenter__(self): @@ -245,7 +238,7 @@ def __init__( ) else: self._sessions[host_addr] = Session( - conn=conn, + _conn=conn, username=username, password=password, session_config=session_config or SessionConfig(), @@ -258,21 +251,29 @@ def __init__( def execute( self, statement: str, *, timeout: Optional[float] = None, do_ping: bool = False ) -> ResultSet: + """Execute a statement using a borrowed session, raising on errors.""" + with self.borrow() as session: + return session.execute(statement, timeout=timeout, do_ping=do_ping) + + @contextmanager + def borrow(self) -> Generator[Session, None, None]: + """Yield a session bound to the selected connection.""" if isinstance(self._conn, ConnectionPool): - addr, _conn = self._conn.next_connection() + addr, conn = self._conn.next_connection() else: - addr = self._conn.config.hosts[0] + conn = self._conn + addr = conn.connected + if addr is None: + raise ValueError("Connection not connected") + + # Route to the correct session (pool or single session) _session = self._sessions[addr] if isinstance(_session, SessionPool): with _session.borrow() as session: - return session.execute( - statement, timeout=timeout, do_ping=do_ping - ).raise_on_error() + yield session else: - return _session.execute( - statement, timeout=timeout, do_ping=do_ping - ).raise_on_error() + yield _session def ping(self, timeout: Optional[float] = None) -> bool: try: @@ -291,7 +292,7 @@ def ping(self, timeout: Optional[float] = None) -> bool: def close(self): """Close the client connection and session. No Exception will be raised but an error will be logged.""" for session in self._sessions.values(): - session.close() + session._close() self._conn.close() def __enter__(self): diff --git a/tests/test_session_pool.py b/tests/test_session_pool.py index 7cc5d38f..05b40872 100644 --- a/tests/test_session_pool.py +++ b/tests/test_session_pool.py @@ -26,9 +26,9 @@ def test_init_basic(self): """Test basic initialization""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), } config = SessionPoolConfig(size=3) pool = SessionPool(copy(sessions), config) @@ -42,8 +42,8 @@ def test_init_with_config(self): """Test initialization with custom config""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), } config = SessionPoolConfig(size=2, wait_timeout=10.0) pool = SessionPool(copy(sessions), config) @@ -55,9 +55,9 @@ def test_init_with_all_config_params(self): """Test initialization with all configuration parameters""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), } config = SessionPoolConfig( size=3, @@ -73,9 +73,9 @@ def test_borrow_single_session(self): """Test borrowing a single session""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), } pool = SessionPool(copy(sessions), SessionPoolConfig(size=3)) @@ -95,8 +95,8 @@ def test_borrow_all_sessions(self): """Test borrowing all available sessions""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), } pool = SessionPool(copy(sessions), SessionPoolConfig(size=2)) @@ -110,7 +110,7 @@ def test_borrow_timeout_exceeded(self): """Test borrowing when timeout is exceeded""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), } config = SessionPoolConfig(size=1, wait_timeout=0.2) pool = SessionPool(copy(sessions), config) @@ -125,7 +125,7 @@ def test_borrow_infinite_wait_with_release(self): """Test borrowing with infinite wait that succeeds when session becomes available""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), } config = SessionPoolConfig(size=1, wait_timeout=None) pool = SessionPool(copy(sessions), config) @@ -158,7 +158,7 @@ def test_concurrent_borrowing(self): """Test concurrent borrowing from multiple threads""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username=f"user{i}", password=f"pass{i}", session_config=None, auth_options=None) + Session(_conn=mock_conn, username=f"user{i}", password=f"pass{i}", session_config=None, auth_options=None) for i in range(5) } config = SessionPoolConfig(size=5) @@ -196,8 +196,8 @@ def test_semaphore_consistency(self): """Test that semaphore behavior stays consistent with actual session availability""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), } config = SessionPoolConfig(size=2) pool = SessionPool(copy(sessions), config) @@ -225,49 +225,49 @@ def test_close_all_free_sessions(self): """Test closing pool with all sessions free""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), } config = SessionPoolConfig(size=3) pool = SessionPool(copy(sessions), config) # Mock the close_session method for all sessions for session in sessions: - session.close = Mock() + session._close = Mock() - pool.close() + pool._close() # Should close all sessions for session in sessions: - session.close.assert_called_once() + session._close.assert_called_once() @patch('nebulagraph_python.client._session_pool.logger') def test_close_with_busy_sessions(self, mock_logger): """Test closing pool with some busy sessions""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), } config = SessionPoolConfig(size=3) pool = SessionPool(copy(sessions), config) # Mock the close_session method for all sessions for session in sessions: - session.close = Mock() + session._close = Mock() # Manually move a session to busy state busy_session = list(sessions)[1] # Get the second session pool.free_sessions_queue.remove(busy_session) pool.busy_sessions_queue.add(busy_session) - pool.close() + pool._close() # Should close all sessions for session in sessions: - session.close.assert_called_once() + session._close.assert_called_once() # Should log error about busy sessions mock_logger.error.assert_called_once() assert "Busy sessions remain" in mock_logger.error.call_args[0][0] @@ -337,8 +337,8 @@ def test_multiple_borrow_release_cycles(self): """Test multiple borrow-release cycles work correctly""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), } config = SessionPoolConfig(size=2) pool = SessionPool(copy(sessions), config) @@ -370,9 +370,9 @@ async def test_init_basic(self): """Test basic initialization""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), } config = SessionPoolConfig(size=3) pool = AsyncSessionPool(copy(sessions), config) @@ -389,8 +389,8 @@ async def test_init_with_config(self): """Test initialization with custom config""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), } config = SessionPoolConfig(size=2, wait_timeout=10.0) pool = AsyncSessionPool(copy(sessions), config) @@ -403,9 +403,9 @@ async def test_init_with_all_config_params(self): """Test initialization with all configuration parameters""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), } config = SessionPoolConfig( size=3, @@ -422,9 +422,9 @@ async def test_borrow_single_session(self): """Test borrowing a single session""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), } config = SessionPoolConfig(size=3) pool = AsyncSessionPool(copy(sessions), config) @@ -446,8 +446,8 @@ async def test_borrow_all_sessions(self): """Test borrowing all available sessions""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), } config = SessionPoolConfig(size=2) pool = AsyncSessionPool(copy(sessions), config) @@ -463,7 +463,7 @@ async def test_borrow_timeout_exceeded(self): """Test borrowing when timeout is exceeded""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), } config = SessionPoolConfig(size=1, wait_timeout=0.2) pool = AsyncSessionPool(copy(sessions), config) @@ -479,7 +479,7 @@ async def test_borrow_infinite_wait_with_release(self): """Test borrowing with infinite wait that succeeds when session becomes available""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), } config = SessionPoolConfig(size=1, wait_timeout=None) pool = AsyncSessionPool(copy(sessions), config) @@ -507,7 +507,7 @@ async def test_concurrent_borrowing(self): """Test concurrent borrowing from multiple coroutines""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username=f"user{i}", password=f"pass{i}", session_config=None, auth_options=None) + AsyncSession(_conn=mock_conn, username=f"user{i}", password=f"pass{i}", session_config=None, auth_options=None) for i in range(5) } config = SessionPoolConfig(size=5) @@ -539,8 +539,8 @@ async def test_semaphore_consistency(self): """Test that semaphore behavior stays consistent with actual session availability""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), } config = SessionPoolConfig(size=2) pool = AsyncSessionPool(copy(sessions), config) @@ -581,22 +581,22 @@ async def test_close_all_free_sessions(self): """Test closing pool with all sessions free""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), } config = SessionPoolConfig(size=3) pool = AsyncSessionPool(copy(sessions), config) # Mock the close_session method for all sessions for session in sessions: - session.close = AsyncMock() + session._close = AsyncMock() - await pool.close() + await pool._close() # Should close all sessions for session in sessions: - session.close.assert_called_once() + session._close.assert_called_once() @pytest.mark.asyncio @patch('nebulagraph_python.client._session_pool.logger') @@ -604,26 +604,26 @@ async def test_close_with_busy_sessions(self, mock_logger): """Test closing pool with some busy sessions""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user3", password="pass3", session_config=None, auth_options=None), } pool = AsyncSessionPool(copy(sessions), config=SessionPoolConfig(size=3)) # Mock the close_session method for all sessions for session in sessions: - session.close = AsyncMock() + session._close = AsyncMock() # Manually move a session to busy state busy_session = list(sessions)[1] # Get the second session pool.free_sessions_queue.remove(busy_session) pool.busy_sessions_queue.add(busy_session) - await pool.close() + await pool._close() # Should close all sessions for session in sessions: - session.close.assert_called_once() + session._close.assert_called_once() # Should log error about busy sessions mock_logger.error.assert_called_once() assert "Busy sessions remain" in mock_logger.error.call_args[0][0] @@ -696,8 +696,8 @@ async def test_multiple_borrow_release_cycles(self): """Test multiple borrow-release cycles work correctly""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), } config = SessionPoolConfig(size=2) pool = AsyncSessionPool(copy(sessions), config) @@ -728,7 +728,7 @@ def test_sync_pool_exception_in_context(self): """Test that sessions are properly returned even when exceptions occur in sync pool""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), } config = SessionPoolConfig(size=1) pool = SessionPool(copy(sessions), config) @@ -747,7 +747,7 @@ async def test_async_pool_exception_in_context(self): """Test that sessions are properly returned even when exceptions occur in async pool""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), } config = SessionPoolConfig(size=1) pool = AsyncSessionPool(copy(sessions), config) @@ -765,8 +765,8 @@ def test_sync_multiple_exceptions_in_context(self): """Test multiple exceptions in sync pool context managers""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - Session(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), } config = SessionPoolConfig(size=2) pool = SessionPool(copy(sessions), config) @@ -786,8 +786,8 @@ async def test_async_multiple_exceptions_in_context(self): """Test multiple exceptions in async pool context managers""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), - AsyncSession(conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user2", password="pass2", session_config=None, auth_options=None), } config = SessionPoolConfig(size=2) pool = AsyncSessionPool(copy(sessions), config) @@ -821,7 +821,7 @@ def test_sync_zero_timeout(self): mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), } pool = SessionPool(copy(sessions), config) @@ -838,7 +838,7 @@ async def test_async_zero_timeout(self): mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), } pool = AsyncSessionPool(copy(sessions), config) @@ -888,7 +888,7 @@ def test_sync_pool_with_custom_retry_interval(self): """Test sync pool behavior with custom retry interval""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + Session(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), } config = SessionPoolConfig(size=1, wait_timeout=0.3) pool = SessionPool(copy(sessions), config) @@ -907,7 +907,7 @@ async def test_async_pool_with_custom_retry_interval(self): """Test async pool behavior with custom retry interval""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), + AsyncSession(_conn=mock_conn, username="user1", password="pass1", session_config=None, auth_options=None), } config = SessionPoolConfig(size=1, wait_timeout=0.3) pool = AsyncSessionPool(copy(sessions), config) @@ -929,7 +929,7 @@ def test_sync_high_concurrency_stress(self): """Test sync pool under high concurrency stress""" mock_conn = Mock() sessions = { - Session(conn=mock_conn, username=f"user{i}", password=f"pass{i}", session_config=None, auth_options=None) + Session(_conn=mock_conn, username=f"user{i}", password=f"pass{i}", session_config=None, auth_options=None) for i in range(10) } config = SessionPoolConfig(size=10) @@ -969,7 +969,7 @@ async def test_async_high_concurrency_stress(self): """Test async pool under high concurrency stress""" mock_conn = AsyncMock() sessions = { - AsyncSession(conn=mock_conn, username=f"user{i}", password=f"pass{i}", session_config=None, auth_options=None) + AsyncSession(_conn=mock_conn, username=f"user{i}", password=f"pass{i}", session_config=None, auth_options=None) for i in range(10) } config = SessionPoolConfig(size=10) From 4c94eaa43275d3560dfca6eb2f4b9ef7e1c5861b Mon Sep 17 00:00:00 2001 From: Anqi <16240361+Nicole00@users.noreply.github.com> Date: Thu, 18 Dec 2025 15:35:19 +0800 Subject: [PATCH 3/9] fix typo (#387) --- .gitignore | 3 +-- src/nebulagraph_python/client/_connection.py | 2 +- src/nebulagraph_python/client/client.py | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index ff6b0b57..11b02249 100644 --- a/.gitignore +++ b/.gitignore @@ -161,7 +161,6 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ -.idea +.idea/ .DS_Store docs/.DS_Store diff --git a/src/nebulagraph_python/client/_connection.py b/src/nebulagraph_python/client/_connection.py index a2f02630..ced1580f 100644 --- a/src/nebulagraph_python/client/_connection.py +++ b/src/nebulagraph_python/client/_connection.py @@ -66,7 +66,7 @@ class ConnectionConfig: ping_before_execute: bool = False @classmethod - def from_defults( + def from_defaults( cls, hosts: Union[str, List[str], List[HostAddress]], ssl_param: Union[SSLParam, Literal[True], None] = None, diff --git a/src/nebulagraph_python/client/client.py b/src/nebulagraph_python/client/client.py index 1b57fa81..4b6abc06 100644 --- a/src/nebulagraph_python/client/client.py +++ b/src/nebulagraph_python/client/client.py @@ -91,7 +91,7 @@ async def connect( session_config: Session configuration. """ self = super().__new__(cls) - conn_conf = conn_config or ConnectionConfig.from_defults(hosts, ssl_param) + conn_conf = conn_config or ConnectionConfig.from_defaults(hosts, ssl_param) hosts = conn_conf.hosts self._sessions = {} if len(hosts) == 1: @@ -207,7 +207,7 @@ def __init__( session_config: Session configuration. session_pool_config: Session pool configuration. If provided, a session pool will be created. """ - conn_conf = conn_config or ConnectionConfig.from_defults(hosts, ssl_param) + conn_conf = conn_config or ConnectionConfig.from_defaults(hosts, ssl_param) hosts = conn_conf.hosts self._sessions = {} if len(hosts) == 1: From e71030ac6373dc0090c24d688bd9e4e335657a0e Mon Sep 17 00:00:00 2001 From: Anqi <16240361+Nicole00@users.noreply.github.com> Date: Tue, 6 Jan 2026 11:56:12 +0800 Subject: [PATCH 4/9] add example for NebulaPool (#390) --- example/NebulaPoolExample.py | 66 ++++++++++++++++++++++++++++++++++++ run_example.sh | 6 ++++ 2 files changed, 72 insertions(+) create mode 100755 example/NebulaPoolExample.py create mode 100755 run_example.sh diff --git a/example/NebulaPoolExample.py b/example/NebulaPoolExample.py new file mode 100755 index 00000000..575eaf77 --- /dev/null +++ b/example/NebulaPoolExample.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from typing import Optional, Dict +from nebulagraph_python.client.pool import NebulaPool +from nebulagraph_python.data import HostAddress +from dataclasses import dataclass, field + +@dataclass +class SessionConfig: + schema: Optional[str] = None + graph: Optional[str] = None + timezone: Optional[str] = None + values: Dict[str, str] = field(default_factory=dict) + configs: Dict[str, str] = field(default_factory=dict) + +graph_name = "test_graph" + +def main(): + # config the connect information + hosts = ["127.0.0.1:9669"] + username = "root" + password = "NebulaGraph01" + + # create NebulaPool + pool = NebulaPool( + hosts=hosts, + username=username, + password=password, + session_config=SessionConfig(graph=graph_name) + ) + + try: + print("use execute_py to execute `SHOW GRAPHS` ...") + result = pool.execute_py("SHOW GRAPHS") + + # 打印结果 + print("\n query result:") + print("-" * 50) + result.print(style="table") + + print("\n\nuse execute to execute `SHOW GRAPHS`:") + print("-" * 50) + result2 = pool.execute("SHOW GRAPHS") + result2.print(style="table") + + # get the query result + print("-" * 50) + if result.size > 0: + for row in result: + print(f"Row: {row}") + else: + print("Empty") + + except Exception as e: + print(f"\nerror: {e}") + import traceback + traceback.print_exc() + finally: + print("\nclose the pool...") + pool.close() + print("closed") + + +if __name__ == "__main__": + main() diff --git a/run_example.sh b/run_example.sh new file mode 100755 index 00000000..312ee8a7 --- /dev/null +++ b/run_example.sh @@ -0,0 +1,6 @@ +#!/bin/bash +# run example program, using local source code and dependencies + +export PYTHONPATH="${PYTHONPATH}:$(pwd)/src:$(pwd)/deps" + +python3 example/NebulaPoolExample.py From e01315e3ebdf288e66ec0bb55cd04a26d96cfa90 Mon Sep 17 00:00:00 2001 From: Anqi <16240361+Nicole00@users.noreply.github.com> Date: Wed, 7 Jan 2026 10:13:18 +0800 Subject: [PATCH 5/9] fix time overflow for hour 25 (#391) --- src/nebulagraph_python/decoder/value_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nebulagraph_python/decoder/value_parser.py b/src/nebulagraph_python/decoder/value_parser.py index c931a37a..d4cb869f 100644 --- a/src/nebulagraph_python/decoder/value_parser.py +++ b/src/nebulagraph_python/decoder/value_parser.py @@ -727,7 +727,7 @@ def bytes_to_zoned_time(self, data: bytes) -> datetime.time: ) # Create base time and add timezone offset minutes - base_time = datetime.time(hour, minute, second, microsecond) + base_time = datetime.time(hour % 24, minute, second, microsecond) adjusted_time = ( datetime.datetime.combine(datetime.date.today(), base_time) + datetime.timedelta(minutes=current_offset) From eecca11f4699542ef9a7c6c2a41d900411626583 Mon Sep 17 00:00:00 2001 From: Anqi <16240361+Nicole00@users.noreply.github.com> Date: Wed, 7 Jan 2026 10:51:07 +0800 Subject: [PATCH 6/9] fix None list value (#392) --- src/nebulagraph_python/decoder/value_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nebulagraph_python/decoder/value_parser.py b/src/nebulagraph_python/decoder/value_parser.py index d4cb869f..0a4fbca9 100644 --- a/src/nebulagraph_python/decoder/value_parser.py +++ b/src/nebulagraph_python/decoder/value_parser.py @@ -1003,7 +1003,7 @@ def _decode_composite_value( values = [] for i in range(list_size): if (null_bit_bytes[i // 8] & (1 << (i % 8))) == 0: - values.append(None) + values.append(ValueWrapper(None, ColumnType.NULL)) else: value = self._decode_composite_value(reader, ele_type) values.append(ValueWrapper(value, ele_type)) From 47e0457cfccd478b92f60a2801fd03649e891c4b Mon Sep 17 00:00:00 2001 From: Anqi <16240361+Nicole00@users.noreply.github.com> Date: Thu, 8 Jan 2026 10:21:50 +0800 Subject: [PATCH 7/9] fix duration type bug (#395) --- .../decoder/value_parser.py | 21 ++++++++++---- src/nebulagraph_python/py_data_types.py | 28 ++++++------------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/src/nebulagraph_python/decoder/value_parser.py b/src/nebulagraph_python/decoder/value_parser.py index 0a4fbca9..0565c69f 100644 --- a/src/nebulagraph_python/decoder/value_parser.py +++ b/src/nebulagraph_python/decoder/value_parser.py @@ -77,6 +77,8 @@ INT64_SIZE, LIST_HEADER_SIZE, LOCAL_TIME_SIZE, + MICRO_SECONDS_OF_DAY, + MICRO_SECONDS_OF_HOUR, MICRO_SECONDS_OF_MINUTE, MICRO_SECONDS_OF_SECOND, MONTH_SIZE, @@ -783,21 +785,28 @@ def bytes_to_duration(self, data: bytes) -> "NDuration": duration_value = qword >> 1 # Initialize all fields - month, second, micro_sec = 0, 0, 0 + year, month, day, hour, minute, second, micro_sec = 0, 0, 0, 0, 0, 0, 0 if is_month_based: # For month-based duration + year = int(duration_value / 12) month = int(duration_value % 12) else: # For time-based duration - second = int( - (duration_value % MICRO_SECONDS_OF_MINUTE) // MICRO_SECONDS_OF_SECOND, - ) - micro_sec = int(duration_value % MICRO_SECONDS_OF_SECOND) + day = int (duration_value / MICRO_SECONDS_OF_DAY) + hour = int (duration_value % MICRO_SECONDS_OF_DAY / MICRO_SECONDS_OF_HOUR) + minute = int (duration_value % MICRO_SECONDS_OF_HOUR / MICRO_SECONDS_OF_MINUTE) + second = int ((duration_value % MICRO_SECONDS_OF_MINUTE) / MICRO_SECONDS_OF_SECOND) + micro_sec = int (duration_value % MICRO_SECONDS_OF_SECOND) return NDuration( + is_month_based=is_month_based, + year=year, + month=month, + day=day, + hour=hour, + minute=minute, seconds=second, microseconds=micro_sec, - months=month, ) def bytes_to_any( diff --git a/src/nebulagraph_python/py_data_types.py b/src/nebulagraph_python/py_data_types.py index 11e15070..574f6381 100644 --- a/src/nebulagraph_python/py_data_types.py +++ b/src/nebulagraph_python/py_data_types.py @@ -741,25 +741,15 @@ def __hash__(self): class NDuration(BaseDataObject): - def __init__(self, seconds: int, microseconds: int, months: int): - self.is_month_based = months != 0 - - # Convert seconds and microseconds to time components - total_seconds = abs(seconds) - hours = total_seconds // 3600 - minutes = (total_seconds % 3600) // 60 - secs = total_seconds % 60 - - # Convert months to year/month - years = months // 12 - remaining_months = months % 12 - - self.year = years - self.month = remaining_months - self.day = 0 # Not month based - self.hour = hours - self.minute = minutes - self.second = secs + def __init__(self, is_month_based: bool, year: int, month: int, day: int, + hour: int, minute: int, seconds: int, microseconds: int): + self.is_month_based = is_month_based + self.year = year + self.month = month + self.day = day + self.hour = hour + self.minute = minute + self.second = seconds self.microsec = microseconds def get_year(self) -> int: From ed029020a899b7a4087e75ce49b1d5b678f2fbab Mon Sep 17 00:00:00 2001 From: Anqi <16240361+Nicole00@users.noreply.github.com> Date: Thu, 8 Jan 2026 11:21:08 +0800 Subject: [PATCH 8/9] update doc for special types (#396) * upadte doc for special types * update start desc --- docs/1_started.md | 8 +++++--- docs/5_vector_and_special_types.md | 6 +++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/1_started.md b/docs/1_started.md index 4ef54d72..a26a45d0 100644 --- a/docs/1_started.md +++ b/docs/1_started.md @@ -1,15 +1,17 @@ # NebulaGraph Python Client Getting Started ## Installation +from pypi ```bash -pip install ng-python # not yet published +pip install nebula5_python==5.2.0 ``` from source ```bash -cd python +git clone -b https://github.com/vesoft-inc/nebula-python.git +cd nebula-python pip install -e . ``` @@ -152,4 +154,4 @@ Run `ngcli --help` to get the help message. An example to connect to NebulaGraph ```bash ngcli -h 127.0.0.1:9669 -u root -p NebulaGraph01 -``` \ No newline at end of file +``` diff --git a/docs/5_vector_and_special_types.md b/docs/5_vector_and_special_types.md index b054f3f5..e95b2b48 100644 --- a/docs/5_vector_and_special_types.md +++ b/docs/5_vector_and_special_types.md @@ -53,15 +53,15 @@ cli.close() ### NDuration `NDuration` represents a duration with support for both month-based and time-based forms. -- If `months != 0`, the instance is month-based (`is_month_based = True`), and `years`/`months` are derived from the `months` argument. -- If `months == 0`, the instance is time-based (days default to 0 in current implementation) and uses the `seconds` and `microseconds` arguments to derive `hour`, `minute`, `second`, `microsec`. +- If `is_month_based = True`, the instance uses `year` and `month` fields for duration representation. +- If `is_month_based = False`, the instance uses `day`, `hour`, `minute`, `second`, and `microsecond` fields for duration representation. The `__str__` produces an ISO-8601–like string: - Month-based examples: `P1Y2M`, `P0M` - Time-based examples: `PT0S`, `PT1H2M3S`, `PT3.5S`, `PT-0.000123S` API: -- Constructor: `NDuration(seconds: int, microseconds: int, months: int)` +- Constructor: `NDuration(is_month_based: bool, year: int, month: int, day: int, hour: int, minute: int, seconds: int, microseconds: int)` - Properties/Methods: - `is_month_based: bool` - `get_year() -> int`, `get_month() -> int`, `get_day() -> int` From 446c375d6f0662bed72a7d2d75fc7edc388a4665 Mon Sep 17 00:00:00 2001 From: Anqi <16240361+Nicole00@users.noreply.github.com> Date: Thu, 8 Jan 2026 11:37:01 +0800 Subject: [PATCH 9/9] update doc (#397) --- docs/1_started.md | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/docs/1_started.md b/docs/1_started.md index a26a45d0..d7eb7074 100644 --- a/docs/1_started.md +++ b/docs/1_started.md @@ -10,7 +10,7 @@ pip install nebula5_python==5.2.0 from source ```bash -git clone -b https://github.com/vesoft-inc/nebula-python.git +git clone -b dev https://github.com/vesoft-inc/nebula-python.git cd nebula-python pip install -e . ``` @@ -127,6 +127,23 @@ with NebulaClient( If you prefer manual lifecycle control, you can explicitly open and close clients. +- Sync version: + +```python +from nebulagraph_python import NebulaClient + +client = NebulaClient( + hosts=["127.0.0.1:9669"], + username="root", + password="NebulaGraph01", +) +try: + result = client.execute("RETURN 1 AS a, 2 AS b") + result.print() +finally: + client.close() +``` + - Async version: ```python