Commits (62)
5bf5d4c  Separate Session related functionality from Connection class (#571)  (varun-edachali-dbx, May 28, 2025)
400a8bd  Introduce Backend Interface (DatabricksClient) (#573)  (varun-edachali-dbx, May 30, 2025)
3c78ed7  Implement ResultSet Abstraction (backend interfaces for fetch phase) …  (varun-edachali-dbx, Jun 3, 2025)
9625229  Introduce Sea HTTP Client and test script (#583)  (varun-edachali-dbx, Jun 4, 2025)
0887bc1  Introduce `SeaDatabricksClient` (Session Implementation) (#582)  (varun-edachali-dbx, Jun 9, 2025)
6d63df0  Normalise Execution Response (clean backend interfaces) (#587)  (varun-edachali-dbx, Jun 11, 2025)
ba8d9fd  Introduce models for `SeaDatabricksClient` (#595)  (varun-edachali-dbx, Jun 12, 2025)
bb3f15a  Introduce preliminary SEA Result Set (#588)  (varun-edachali-dbx, Jun 12, 2025)
19f1fae  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jun 17, 2025)
6c5ba6d  remove invalid ExecuteResponse import  (varun-edachali-dbx, Jun 17, 2025)
5e5147b  Separate Session related functionality from Connection class (#571)  (varun-edachali-dbx, May 28, 2025)
57370b3  Introduce Backend Interface (DatabricksClient) (#573)  (varun-edachali-dbx, May 30, 2025)
75752bf  Implement ResultSet Abstraction (backend interfaces for fetch phase) …  (varun-edachali-dbx, Jun 3, 2025)
450b80d  remove un-necessary initialisation assertions  (varun-edachali-dbx, Jun 18, 2025)
a926f02  remove un-necessary line break s  (varun-edachali-dbx, Jun 18, 2025)
55ad001  more un-necessary line breaks  (varun-edachali-dbx, Jun 18, 2025)
fa15730  constrain diff of test_closing_connection_closes_commands  (varun-edachali-dbx, Jun 18, 2025)
019c7fb  reduce diff of test_closing_connection_closes_commands  (varun-edachali-dbx, Jun 18, 2025)
726abe7  use pytest-like assertions for test_closing_connection_closes_commands  (varun-edachali-dbx, Jun 18, 2025)
bf6d41c  ensure command_id is not None  (varun-edachali-dbx, Jun 18, 2025)
5afa733  line breaks after multi-line pyfocs  (varun-edachali-dbx, Jun 18, 2025)
e3dfd36  ensure non null operationHandle for commandId creation  (varun-edachali-dbx, Jun 18, 2025)
63360b3  use command_id methods instead of explicit guid_to_hex_id conversion  (varun-edachali-dbx, Jun 18, 2025)
13ffb8d  remove un-necessary artifacts in test_session, add back assertion  (varun-edachali-dbx, Jun 18, 2025)
a74d279  Implement SeaDatabricksClient (Complete Execution Spec) (#590)  (varun-edachali-dbx, Jun 18, 2025)
d759050  add from __future__ import annotations to remove string literals arou…  (varun-edachali-dbx, Jun 19, 2025)
1e21434  move docstring of DatabricksClient within class  (varun-edachali-dbx, Jun 24, 2025)
cd4015b  move ThriftResultSet import to top of file  (varun-edachali-dbx, Jun 24, 2025)
ed8b610  make backend/utils __init__ file empty  (varun-edachali-dbx, Jun 24, 2025)
94d951e  use from __future__ import annotations to remove string literals arou…  (varun-edachali-dbx, Jun 24, 2025)
c20058e  use lazy logging  (varun-edachali-dbx, Jun 24, 2025)
fe3acb1  replace getters with property tag  (varun-edachali-dbx, Jun 24, 2025)
9fb6a76  Merge branch 'main' into backend-refactors  (varun-edachali-dbx, Jun 24, 2025)
61dfc4d  set active_command_id to None, not active_op_handle  (varun-edachali-dbx, Jun 24, 2025)
64fb9b2  align test_session with pytest instead of unittest  (varun-edachali-dbx, Jun 24, 2025)
cbf63f9  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jun 26, 2025)
59b4825  remove duplicate test, correct active_command_id attribute  (varun-edachali-dbx, Jun 26, 2025)
e380654  SeaDatabricksClient: Add Metadata Commands (#593)  (varun-edachali-dbx, Jun 26, 2025)
677a7b0  SEA volume operations fix: assign `manifest.is_volume_operation` to `…  (varun-edachali-dbx, Jun 26, 2025)
45585d4  Introduce manual SEA test scripts for Exec Phase (#589)  (varun-edachali-dbx, Jun 27, 2025)
70c7dc8  Complete Fetch Phase (for `INLINE` disposition and `JSON_ARRAY` forma…  (varun-edachali-dbx, Jul 2, 2025)
abf9aab  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jul 3, 2025)
9b4b606  Merge branch 'main' into backend-refactors  (varun-edachali-dbx, Jul 3, 2025)
4f11ff0  Introduce `row_limit` param (#607)  (varun-edachali-dbx, Jul 7, 2025)
45f5c26  Merge branch 'main' into backend-refactors  (varun-edachali-dbx, Jul 10, 2025)
2c9368a  formatting (black)  (varun-edachali-dbx, Jul 10, 2025)
9b1b1f5  remove repetition from Session.__init__  (varun-edachali-dbx, Jul 10, 2025)
77e23d3  Merge branch 'backend-refactors' into sea-migration  (varun-edachali-dbx, Jul 11, 2025)
3bd3aef  fix merge artifacts  (varun-edachali-dbx, Jul 11, 2025)
6d4701f  correct patch paths  (varun-edachali-dbx, Jul 11, 2025)
dc1cb6d  fix type issues  (varun-edachali-dbx, Jul 14, 2025)
5d04cd0  Merge branch 'main' into sea-migration  (varun-edachali-dbx, Jul 15, 2025)
922c448  explicitly close result queue  (varun-edachali-dbx, Jul 15, 2025)
1a0575a  Complete Fetch Phase (`EXTERNAL_LINKS` disposition and `ARROW` format…  (varun-edachali-dbx, Jul 16, 2025)
c07beb1  SEA Session Configuration Fix: Explicitly convert values to `str` (#…  (varun-edachali-dbx, Jul 16, 2025)
640cc82  SEA: add support for `Hybrid` disposition (#631)  (varun-edachali-dbx, Jul 17, 2025)
8fbca9d  SEA: Reduce network calls for synchronous commands (#633)  (varun-edachali-dbx, Jul 19, 2025)
806e5f5  SEA: Decouple Link Fetching (#632)  (varun-edachali-dbx, Jul 21, 2025)
b57c3f3  Chunk download latency (#634)  (saishreeeee, Jul 21, 2025)
ef5836b  acquire lock before notif + formatting (black)  (varun-edachali-dbx, Jul 21, 2025)
24332c8  Revert "acquire lock before notif + formatting (black)"  (varun-edachali-dbx, Jul 22, 2025)
1266863  Revert "SEA: Decouple Link Fetching (#632)"  (varun-edachali-dbx, Jul 22, 2025)
121 changes: 121 additions & 0 deletions examples/experimental/sea_connector_test.py
@@ -0,0 +1,121 @@
"""
Main script to run all SEA connector tests.

This script runs all the individual test modules and displays
a summary of test results with visual indicators.

In order to run the script, the following environment variables need to be set:
- DATABRICKS_SERVER_HOSTNAME: The hostname of the Databricks server
- DATABRICKS_HTTP_PATH: The HTTP path of the Databricks server
- DATABRICKS_TOKEN: The token to use for authentication
"""

import os
import sys
import logging
import subprocess
from typing import List, Tuple

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

TEST_MODULES = [
"test_sea_session",
"test_sea_sync_query",
"test_sea_async_query",
"test_sea_metadata",
]


def run_test_module(module_name: str) -> bool:
"""Run a test module and return success status."""
module_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "tests", f"{module_name}.py"
)

# Simply run the module as a script - each module handles its own test execution
result = subprocess.run(
[sys.executable, module_path], capture_output=True, text=True
)

# Log the output from the test module
if result.stdout:
for line in result.stdout.strip().split("\n"):
logger.info(line)

if result.stderr:
for line in result.stderr.strip().split("\n"):
logger.error(line)

return result.returncode == 0


def run_tests() -> List[Tuple[str, bool]]:
"""Run all tests and return results."""
results = []

for module_name in TEST_MODULES:
try:
logger.info(f"\n{'=' * 50}")
logger.info(f"Running test: {module_name}")
logger.info(f"{'-' * 50}")

success = run_test_module(module_name)
results.append((module_name, success))

status = "✅ PASSED" if success else "❌ FAILED"
logger.info(f"Test {module_name}: {status}")

except Exception as e:
logger.error(f"Error loading or running test {module_name}: {str(e)}")
import traceback

logger.error(traceback.format_exc())
results.append((module_name, False))

return results


def print_summary(results: List[Tuple[str, bool]]) -> None:
"""Print a summary of test results."""
logger.info(f"\n{'=' * 50}")
logger.info("TEST SUMMARY")
logger.info(f"{'-' * 50}")

passed = sum(1 for _, success in results if success)
total = len(results)

for module_name, success in results:
status = "✅ PASSED" if success else "❌ FAILED"
logger.info(f"{status} - {module_name}")

logger.info(f"{'-' * 50}")
logger.info(f"Total: {total} | Passed: {passed} | Failed: {total - passed}")
logger.info(f"{'=' * 50}")


if __name__ == "__main__":
# Check if required environment variables are set
required_vars = [
"DATABRICKS_SERVER_HOSTNAME",
"DATABRICKS_HTTP_PATH",
"DATABRICKS_TOKEN",
]
missing_vars = [var for var in required_vars if not os.environ.get(var)]

if missing_vars:
logger.error(
f"Missing required environment variables: {', '.join(missing_vars)}"
)
logger.error("Please set these variables before running the tests.")
sys.exit(1)

# Run all tests
results = run_tests()

# Print summary
print_summary(results)

# Exit with appropriate status code
all_passed = all(success for _, success in results)
sys.exit(0 if all_passed else 1)
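
Usage note (not part of the diff): a minimal sketch of driving this runner locally. The environment variable values are placeholders, and the repo-root-relative script path is an assumption about where you invoke it from.

# Hypothetical local driver for the runner above; the env var values below are
# placeholders, not real credentials.
import os
import subprocess
import sys

env = dict(os.environ)
env.update(
    {
        "DATABRICKS_SERVER_HOSTNAME": "my-workspace.cloud.databricks.com",  # placeholder
        "DATABRICKS_HTTP_PATH": "/sql/1.0/warehouses/abc123",  # placeholder
        "DATABRICKS_TOKEN": "dapi-placeholder-token",  # placeholder
    }
)

# Run the suite the same way CI might: one subprocess, exit code 0 on success.
result = subprocess.run(
    [sys.executable, "examples/experimental/sea_connector_test.py"], env=env
)
sys.exit(result.returncode)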
Empty file.
241 changes: 241 additions & 0 deletions examples/experimental/tests/test_sea_async_query.py
@@ -0,0 +1,241 @@
"""
Test for SEA asynchronous query execution functionality.
"""
import os
import sys
import logging
import time
from databricks.sql.client import Connection
from databricks.sql.backend.types import CommandState

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def test_sea_async_query_with_cloud_fetch():
"""
Test executing a query asynchronously using the SEA backend with cloud fetch enabled.

This function connects to a Databricks SQL endpoint using the SEA backend,
executes a simple query asynchronously with cloud fetch enabled, and verifies that execution completes successfully.
"""
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
access_token = os.environ.get("DATABRICKS_TOKEN")
catalog = os.environ.get("DATABRICKS_CATALOG")

if not all([server_hostname, http_path, access_token]):
logger.error("Missing required environment variables.")
logger.error(
"Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
)
return False

try:
# Create connection with cloud fetch enabled
logger.info(
"Creating connection for asynchronous query execution with cloud fetch enabled"
)
connection = Connection(
server_hostname=server_hostname,
http_path=http_path,
access_token=access_token,
catalog=catalog,
schema="default",
use_sea=True,
user_agent_entry="SEA-Test-Client",
use_cloud_fetch=True,
enable_query_result_lz4_compression=False,
)

logger.info(
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
)

# Execute a query that generates large rows to force multiple chunks
requested_row_count = 5000
cursor = connection.cursor()
query = f"""
SELECT
id,
concat('value_', repeat('a', 10000)) as test_value
FROM range(1, {requested_row_count} + 1) AS t(id)
"""

logger.info(
f"Executing asynchronous query with cloud fetch to generate {requested_row_count} rows"
)
cursor.execute_async(query)
logger.info(
"Asynchronous query submitted successfully with cloud fetch enabled"
)

# Check query state
logger.info("Checking query state...")
while cursor.is_query_pending():
logger.info("Query is still pending, waiting...")
time.sleep(1)

logger.info("Query is no longer pending, getting results...")
cursor.get_async_execution_result()

results = [cursor.fetchone()]
results.extend(cursor.fetchmany(10))
results.extend(cursor.fetchall())
actual_row_count = len(results)

logger.info(
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
)

# Verify total row count
if actual_row_count != requested_row_count:
logger.error(
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
)
return False

logger.info(
"PASS: Received correct number of rows with cloud fetch and all fetch methods work correctly"
)

# Close resources
cursor.close()
connection.close()
logger.info("Successfully closed SEA session")

return True

except Exception as e:
logger.error(
f"Error during SEA asynchronous query execution test with cloud fetch: {str(e)}"
)
import traceback

logger.error(traceback.format_exc())
return False


def test_sea_async_query_without_cloud_fetch():
"""
Test executing a query asynchronously using the SEA backend with cloud fetch disabled.

This function connects to a Databricks SQL endpoint using the SEA backend,
executes a simple query asynchronously with cloud fetch disabled, and verifies that execution completes successfully.
"""
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
access_token = os.environ.get("DATABRICKS_TOKEN")
catalog = os.environ.get("DATABRICKS_CATALOG")

if not all([server_hostname, http_path, access_token]):
logger.error("Missing required environment variables.")
logger.error(
"Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
)
return False

try:
# Create connection with cloud fetch disabled
logger.info(
"Creating connection for asynchronous query execution with cloud fetch disabled"
)
connection = Connection(
server_hostname=server_hostname,
http_path=http_path,
access_token=access_token,
catalog=catalog,
schema="default",
use_sea=True,
user_agent_entry="SEA-Test-Client",
use_cloud_fetch=False,
enable_query_result_lz4_compression=False,
)

logger.info(
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
)

# For non-cloud fetch, use a smaller row count to avoid exceeding inline limits
requested_row_count = 100
cursor = connection.cursor()
query = f"""
SELECT
id,
concat('value_', repeat('a', 100)) as test_value
FROM range(1, {requested_row_count} + 1) AS t(id)
"""

logger.info(
f"Executing asynchronous query without cloud fetch to generate {requested_row_count} rows"
)
cursor.execute_async(query)
logger.info(
"Asynchronous query submitted successfully with cloud fetch disabled"
)

# Check query state
logger.info("Checking query state...")
while cursor.is_query_pending():
logger.info("Query is still pending, waiting...")
time.sleep(1)

logger.info("Query is no longer pending, getting results...")
cursor.get_async_execution_result()
results = [cursor.fetchone()]
results.extend(cursor.fetchmany(10))
results.extend(cursor.fetchall())
actual_row_count = len(results)

logger.info(
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
)

# Verify total row count
if actual_row_count != requested_row_count:
logger.error(
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
)
return False

logger.info(
"PASS: Received correct number of rows without cloud fetch and all fetch methods work correctly"
)

# Close resources
cursor.close()
connection.close()
logger.info("Successfully closed SEA session")

return True

except Exception as e:
logger.error(
f"Error during SEA asynchronous query execution test without cloud fetch: {str(e)}"
)
import traceback

logger.error(traceback.format_exc())
return False


def test_sea_async_query_exec():
"""
Run both asynchronous query tests and return overall success.
"""
with_cloud_fetch_success = test_sea_async_query_with_cloud_fetch()
logger.info(
f"Asynchronous query with cloud fetch: {'✅ PASSED' if with_cloud_fetch_success else '❌ FAILED'}"
)

without_cloud_fetch_success = test_sea_async_query_without_cloud_fetch()
logger.info(
f"Asynchronous query without cloud fetch: {'✅ PASSED' if without_cloud_fetch_success else '❌ FAILED'}"
)

return with_cloud_fetch_success and without_cloud_fetch_success


if __name__ == "__main__":
success = test_sea_async_query_exec()
sys.exit(0 if success else 1)
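
Usage note (not part of the diff): both tests above follow the same submit-poll-fetch pattern. The sketch below distills it into a helper, using only the cursor API exercised in this file (execute_async, is_query_pending, get_async_execution_result) and the same environment variables; the helper name and parameters are illustrative, not part of the PR.

# Sketch: the execute -> poll -> fetch pattern shared by both tests above.
# Assumes the same environment variables as the tests; illustrative only.
import os
import time

from databricks.sql.client import Connection


def run_async_query(query: str, poll_interval: float = 1.0):
    """Submit `query` via the SEA backend, poll until it is no longer
    pending, then attach to the result and return all rows."""
    connection = Connection(
        server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],
        http_path=os.environ["DATABRICKS_HTTP_PATH"],
        access_token=os.environ["DATABRICKS_TOKEN"],
        use_sea=True,
    )
    cursor = connection.cursor()
    try:
        cursor.execute_async(query)
        while cursor.is_query_pending():
            time.sleep(poll_interval)  # wait between server-side status checks
        cursor.get_async_execution_result()
        return cursor.fetchall()
    finally:
        cursor.close()
        connection.close()


if __name__ == "__main__":
    rows = run_async_query("SELECT id FROM range(1, 11) AS t(id)")
    print(f"fetched {len(rows)} rows")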