Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions .github/workflows/update_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from typing import Optional


def update_version(version_type: str = "dev", custom_suffix: Optional[str] = None) -> str:
def update_version(
version_type: str = "dev", custom_suffix: Optional[str] = None
) -> str:
"""
Update the `version` field in `pyproject.toml` for supported manual build types.

Expand All @@ -25,17 +27,21 @@ def update_version(version_type: str = "dev", custom_suffix: Optional[str] = Non
content: str = pyproject_path.read_text()

# Extract current version
version_match: Optional[re.Match[str]] = re.search(r'^version\s*=\s*"([^"]+)"', content, re.MULTILINE)
version_match: Optional[re.Match[str]] = re.search(
r'^version\s*=\s*"([^"]+)"', content, re.MULTILINE
)
if not version_match:
raise ValueError("Could not find version in pyproject.toml")

current_version: str = version_match.group(1)

# Parse the base version (remove any existing suffixes)
base_version_match: Optional[re.Match[str]] = re.match(r'^(\d+\.\d+\.\d+)', current_version)
base_version_match: Optional[re.Match[str]] = re.match(
r"^(\d+\.\d+\.\d+)", current_version
)
if not base_version_match:
raise ValueError(f"Invalid version format: {current_version}")

base_version: str = base_version_match.group(1)

# Only dev/custom builds mutate the version; all others keep current version
Expand Down
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.idea
.idea/
.DS_Store
docs/.DS_Store
25 changes: 22 additions & 3 deletions docs/1_started.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
# NebulaGraph Python Client Getting Started

## Installation
from pypi

```bash
pip install ng-python # not yet published
pip install nebula5_python==5.2.0
```

from source

```bash
cd python
git clone -b dev https://github.com/vesoft-inc/nebula-python.git
cd nebula-python
pip install -e .
```

Expand Down Expand Up @@ -125,6 +127,23 @@ with NebulaClient(

If you prefer manual lifecycle control, you can explicitly open and close clients.

- Sync version:

```python
from nebulagraph_python import NebulaClient

client = NebulaClient(
hosts=["127.0.0.1:9669"],
username="root",
password="NebulaGraph01",
)
try:
result = client.execute("RETURN 1 AS a, 2 AS b")
result.print()
finally:
client.close()
```

- Async version:

```python
Expand Down Expand Up @@ -152,4 +171,4 @@ Run `ngcli --help` to get the help message. An example to connect to NebulaGraph

```bash
ngcli -h 127.0.0.1:9669 -u root -p NebulaGraph01
```
```
20 changes: 20 additions & 0 deletions docs/2_concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,26 @@ async def concurrent_example():
asyncio.run(concurrent_example())
```

## Contextual Execution

By default, statements run on a random session from the pool. When you need to run several queries on the same session, call `borrow` to obtain and reuse a specific session.

```python
async def contextual_example():
async with await NebulaAsyncClient.connect(
hosts=["127.0.0.1:9669"],
username="root",
password="NebulaGraph01",
session_pool_config=SessionPoolConfig(),
) as client:
print("Connected to the server...")
async with client.borrow() as session:
await session.execute("SESSION SET GRAPH movie")
res = await session.execute("MATCH (v:Movie) RETURN count(v)")
res.print()
```


## Understanding Timeout Values

The client uses three different timeouts that apply at different stages:
Expand Down
6 changes: 3 additions & 3 deletions docs/5_vector_and_special_types.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ cli.close()
### NDuration

`NDuration` represents a duration with support for both month-based and time-based forms.
- If `months != 0`, the instance is month-based (`is_month_based = True`), and `years`/`months` are derived from the `months` argument.
- If `months == 0`, the instance is time-based (days default to 0 in current implementation) and uses the `seconds` and `microseconds` arguments to derive `hour`, `minute`, `second`, `microsec`.
- If `is_month_based = True`, the instance uses `year` and `month` fields for duration representation.
- If `is_month_based = False`, the instance uses `day`, `hour`, `minute`, `second`, and `microsecond` fields for duration representation.

The `__str__` produces an ISO-8601–like string:
- Month-based examples: `P1Y2M`, `P0M`
- Time-based examples: `PT0S`, `PT1H2M3S`, `PT3.5S`, `PT-0.000123S`

API:
- Constructor: `NDuration(seconds: int, microseconds: int, months: int)`
- Constructor: `NDuration(is_month_based: bool, year: int, month: int, day: int, hour: int, minute: int, seconds: int, microseconds: int)`
- Properties/Methods:
- `is_month_based: bool`
- `get_year() -> int`, `get_month() -> int`, `get_day() -> int`
Expand Down
4 changes: 3 additions & 1 deletion docs/7_orm_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ class Edge1(

if input("Execute the DDL? (y/N)") == "y":
client.execute_py(graph_type.to_gql())
client.execute_py("CREATE GRAPH IF NOT EXISTS define_type_test define_type_test_type")
client.execute_py(
"CREATE GRAPH IF NOT EXISTS define_type_test define_type_test_type"
)


q = upsert_gql(
Expand Down
1 change: 1 addition & 0 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def sync_session_pool_example():
if __name__ == "__main__":
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("nebulagraph_python").setLevel(logging.DEBUG)

Expand Down
66 changes: 66 additions & 0 deletions example/NebulaPoolExample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Optional, Dict
from nebulagraph_python.client.pool import NebulaPool
from nebulagraph_python.data import HostAddress
from dataclasses import dataclass, field

@dataclass
class SessionConfig:
schema: Optional[str] = None
graph: Optional[str] = None
timezone: Optional[str] = None
values: Dict[str, str] = field(default_factory=dict)
configs: Dict[str, str] = field(default_factory=dict)

graph_name = "test_graph"

def main():
# config the connect information
hosts = ["127.0.0.1:9669"]
username = "root"
password = "NebulaGraph01"

# create NebulaPool
pool = NebulaPool(
hosts=hosts,
username=username,
password=password,
session_config=SessionConfig(graph=graph_name)
)

try:
print("use execute_py to execute `SHOW GRAPHS` ...")
result = pool.execute_py("SHOW GRAPHS")

# 打印结果
print("\n query result:")
print("-" * 50)
result.print(style="table")

print("\n\nuse execute to execute `SHOW GRAPHS`:")
print("-" * 50)
result2 = pool.execute("SHOW GRAPHS")
result2.print(style="table")

# get the query result
print("-" * 50)
if result.size > 0:
for row in result:
print(f"Row: {row}")
else:
print("Empty")

except Exception as e:
print(f"\nerror: {e}")
import traceback
traceback.print_exc()
finally:
print("\nclose the pool...")
pool.close()
print("closed")


if __name__ == "__main__":
main()
6 changes: 6 additions & 0 deletions run_example.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash
# run example program, using local source code and dependencies

export PYTHONPATH="${PYTHONPATH}:$(pwd)/src:$(pwd)/deps"

python3 example/NebulaPoolExample.py
1 change: 0 additions & 1 deletion scripts/add_apache_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,3 @@ def main() -> None:

if __name__ == "__main__":
main()

10 changes: 9 additions & 1 deletion src/nebulagraph_python/client/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ConnectionConfig:
ping_before_execute: bool = False

@classmethod
def from_defults(
def from_defaults(
cls,
hosts: Union[str, List[str], List[HostAddress]],
ssl_param: Union[SSLParam, Literal[True], None] = None,
Expand Down Expand Up @@ -96,6 +96,8 @@ class Connection:

# Config
config: ConnectionConfig
# Track which host was successfully connected for session routing
connected: HostAddress | None = field(default=None, init=False)

# Owned Resources
_stub: Optional[graph_pb2_grpc.GraphServiceStub] = field(default=None, init=False)
Expand Down Expand Up @@ -152,6 +154,8 @@ def connect(self):
logger.info(
f"Successfully connected to {host_addr.host}:{host_addr.port}."
)
# Remember which host we actually connected to
self.connected = host_addr
return
except Exception as e:
logger.warning(
Expand All @@ -174,6 +178,7 @@ def close(self):
self._channel.close()
self._channel = None
self._stub = None
self.connected = None
except Exception:
logger.exception("Failed to close connection")

Expand Down Expand Up @@ -303,6 +308,7 @@ class AsyncConnection:
"""

config: ConnectionConfig
connected: HostAddress | None = None
_stub: Optional[graph_pb2_grpc.GraphServiceStub] = field(default=None, init=False)
_channel: Optional[grpc.aio.Channel] = field(
default=None, init=False
Expand Down Expand Up @@ -358,6 +364,7 @@ async def connect(self):
logger.info(
f"Successfully connected to {host_addr.host}:{host_addr.port} asynchronously."
)
self.connected = host_addr
return
except Exception as e:
logger.warning(
Expand All @@ -380,6 +387,7 @@ async def close(self):
await self._channel.close()
self._channel = None
self._stub = None
self.connected = None
except BaseException:
logger.exception("Failed to close async connection")

Expand Down
28 changes: 15 additions & 13 deletions src/nebulagraph_python/client/_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from nebulagraph_python.error import ExecutingError


@dataclass
@dataclass(kw_only=True, frozen=True)
class SessionConfig:
schema: Optional[str] = None
graph: Optional[str] = None
Expand All @@ -47,31 +47,32 @@ class SessionBase:

@dataclass
class Session(SessionBase):
conn: "Connection"
_conn: "Connection"

def execute(
self, statement: str, *, timeout: Optional[float] = None, do_ping: bool = False
):
res = self.conn.execute(
res = self._conn.execute(
self._session, statement, timeout=timeout, do_ping=do_ping
)
# Retry for only one time
if res.status_code == ErrorCode.SESSION_NOT_FOUND.value:
self._session = self.conn.authenticate(
self._session = self._conn.authenticate(
self.username,
self.password,
session_config=self.session_config,
auth_options=self.auth_options,
)
res = self.conn.execute(
res = self._conn.execute(
self._session, statement, timeout=timeout, do_ping=do_ping
)
res.raise_on_error()
return res

def close(self):
def _close(self):
"""Close session"""
try:
self.conn.execute(self._session, "SESSION CLOSE")
self._conn.execute(self._session, "SESSION CLOSE")
except Exception:
logger.exception("Failed to close session")

Expand All @@ -84,30 +85,31 @@ def __eq__(self, other):

@dataclass
class AsyncSession(SessionBase):
conn: "AsyncConnection"
_conn: "AsyncConnection"

async def execute(
self, statement: str, *, timeout: Optional[float] = None, do_ping: bool = False
):
res = await self.conn.execute(
res = await self._conn.execute(
self._session, statement, timeout=timeout, do_ping=do_ping
)
# Retry for only one time
if res.status_code == ErrorCode.SESSION_NOT_FOUND.value:
self._session = await self.conn.authenticate(
self._session = await self._conn.authenticate(
self.username,
self.password,
session_config=self.session_config,
auth_options=self.auth_options,
)
res = await self.conn.execute(
res = await self._conn.execute(
self._session, statement, timeout=timeout, do_ping=do_ping
)
res.raise_on_error()
return res

async def close(self):
async def _close(self):
try:
await self.conn.execute(self._session, "SESSION CLOSE")
await self._conn.execute(self._session, "SESSION CLOSE")
except Exception:
logger.exception("Failed to close async session")

Expand Down
Loading