examples/client_usage.py (78 additions, 0 deletions)
@@ -8,7 +8,10 @@

import os

import pyarrow as pa

from altertable_flightsql import Client
from altertable_flightsql.client import IngestIncrementalOptions

ALTERTABLE_HOST = os.getenv("ALTERTABLE_HOST", "flight.altertable.ai")
ALTERTABLE_PORT = int(os.getenv("ALTERTABLE_PORT", "443"))
@@ -133,6 +136,80 @@ def example_transactions():
print()


def example_bulk_ingest():
"""Bulk ingest data using Arrow Flight."""
print("=" * 60)
print("Example: Bulk Data Ingestion")
print("=" * 60)

with Client(
username=ALTERTABLE_USERNAME,
password=ALTERTABLE_PASSWORD,
**CONNECTION_SETTINGS,
) as client:
# Define schema for the data
schema = pa.schema(
[
("id", pa.int64()),
("name", pa.string()),
("created_at", pa.int64()),
]
)

# First batch
first_batch = pa.record_batch(
[
[1, 2, 3],
["Alice", "Bob", "Charlie"],
[1000, 2000, 3000],
],
schema=schema,
)

        # Second batch: updates for existing IDs 1 and 2, plus new ID 4
second_batch = pa.record_batch(
[
[1, 2, 4],
["Alice Updated", "Bob Updated", "David"],
[1500, 2500, 4000],
],
schema=schema,
)

with client.ingest(
table_name="incremental_users",
schema=schema,
incremental_options=IngestIncrementalOptions(
primary_key=["id"],
cursor_field=["created_at"],
),
) as writer:
writer.write(first_batch)

# Upsert with second batch
with client.ingest(
table_name="incremental_users",
schema=schema,
incremental_options=IngestIncrementalOptions(
primary_key=["id"],
cursor_field=["created_at"],
),
) as writer:
writer.write(second_batch)

        # Verify: expect 4 rows (IDs 1 and 2 updated, ID 3 unchanged, ID 4 new)
reader = client.query("SELECT * FROM incremental_users ORDER BY id")
result = reader.read_pandas()
print(f"\nIncremental ingestion results ({len(result)} rows):")
print(result)

        # Cleanup (only incremental_users is created in this example)
        client.execute("DROP TABLE IF EXISTS incremental_users")

print()

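
# A minimal sketch of non-incremental ingestion (not wired into the __main__
# run below): IngestTableMode.REPLACE drops and recreates the table on every
# call, which suits full-refresh loads. The table name "users_snapshot" is
# illustrative.
def example_replace_ingest():
    """Replace a table's contents on each run using IngestTableMode.REPLACE."""
    from altertable_flightsql.client import IngestTableMode

    with Client(
        username=ALTERTABLE_USERNAME,
        password=ALTERTABLE_PASSWORD,
        **CONNECTION_SETTINGS,
    ) as client:
        schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
        batch = pa.record_batch([[1, 2], ["Alice", "Bob"]], schema=schema)

        # REPLACE recreates the table, so reruns never accumulate stale rows.
        with client.ingest(
            table_name="users_snapshot",
            schema=schema,
            mode=IngestTableMode.REPLACE,
        ) as writer:
            writer.write(batch)

        client.execute("DROP TABLE IF EXISTS users_snapshot")
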

def example_metadata():
"""Query database metadata."""
print("=" * 60)
@@ -171,5 +248,6 @@ def example_metadata():
example_updates()
example_basic_query()
example_transactions()
example_bulk_ingest()
example_prepared_statement()
example_metadata()
src/altertable_flightsql/client.py (136 additions, 0 deletions)
@@ -4,7 +4,10 @@
This module provides a high-level Python client for Altertable.
"""

import json
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional, Union

import pyarrow as pa
@@ -27,6 +30,30 @@ def _unpack_command(bytes, packed):
any_msg.Unpack(packed)


class IngestTableMode(Enum):
"""Mode for ingesting data into a table."""

CREATE = "CREATE"
"""Create the table if it does not exist, fail if it does"""
APPEND = "APPEND"
"""Append to the table if it exists, fail if it does not"""
CREATE_APPEND = "CREATE_APPEND"
"""Create the table if it does not exist, append to it if it does"""
REPLACE = "REPLACE"
"""Create the table if it does not exist, recreate it if it does"""


@dataclass(frozen=True)
class IngestIncrementalOptions:
"""Options for incremental ingestion."""

    primary_key: Sequence[str]
    """Columns that uniquely identify a row in the target table."""

    cursor_field: Sequence[str]
    """Columns compared on a primary-key conflict to decide which row to keep."""


class BearerAuthMiddleware(flight.ClientMiddleware):
"""Client middleware that adds Bearer token authentication to all requests."""

@@ -239,6 +266,115 @@ def execute(

return result.record_count

def ingest(
self,
*,
table_name: str,
schema: pa.Schema,
schema_name: str = "",
catalog_name: str = "",
mode: IngestTableMode = IngestTableMode.CREATE_APPEND,
incremental_options: Optional[IngestIncrementalOptions] = None,
transaction: Optional["Transaction"] = None,
) -> flight.FlightStreamWriter:
"""
Bulk ingest data into a table using Apache Arrow Flight.

This method provides high-performance bulk data loading by streaming
Arrow record batches directly to the server. The writer can be used as
a context manager for automatic resource cleanup.

Args:
table_name: Name of the table to ingest data into.
schema: PyArrow schema defining the table structure.
schema_name: Optional schema name. If not provided, uses the client's
default schema.
catalog_name: Optional catalog name. If not provided, uses the client's
default catalog.
mode: Table creation/append mode. Options:
- CREATE: Create table, fail if it exists
- APPEND: Append to existing table, fail if it doesn't exist
- CREATE_APPEND: Create if not exists, append if exists (default)
- REPLACE: Drop and recreate table if it exists
incremental_options: Options for incremental ingestion, including:
- primary_key: Columns to use as primary key
                - cursor_field: Columns used to determine which row to keep
                  when multiple rows conflict on the primary key
transaction: Optional transaction to execute ingestion within.

Returns:
FlightStreamWriter for writing record batches to the table.
The writer should be closed after all data is written, or used
as a context manager.

Example:
>>> # Basic ingestion
>>> schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
>>> with client.ingest(table_name="users", schema=schema) as writer:
... batch = pa.record_batch([[1, 2], ["Alice", "Bob"]], schema=schema)
... writer.write(batch)

>>> # Incremental ingestion with primary key
>>> from altertable_flightsql.client import IngestIncrementalOptions
>>> opts = IngestIncrementalOptions(
... primary_key=["id"],
... cursor_field=["updated_at"]
... )
>>> with client.ingest(
... table_name="users",
... schema=schema,
... incremental_options=opts
... ) as writer:
... writer.write(batch)
"""
cmd = sql_pb2.CommandStatementIngest(
table=table_name,
table_definition_options=self._ingest_mode_to_table_definition_options(mode),
)

if catalog_name:
cmd.catalog = catalog_name

if schema_name:
cmd.schema = schema_name

if txn_id := self._get_transaction_id(transaction):
cmd.transaction_id = txn_id

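        # Incremental options travel in the command's options map as
        # JSON-encoded lists of column names (e.g. primary_key -> '["id"]').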
if incremental_options and incremental_options.primary_key:
cmd.options["primary_key"] = json.dumps(incremental_options.primary_key)

if incremental_options and incremental_options.cursor_field:
cmd.options["cursor_field"] = json.dumps(incremental_options.cursor_field)

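        # do_put opens the Flight stream for this descriptor; the metadata
        # reader is discarded and the caller drives the returned writer.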
descriptor = flight.FlightDescriptor.for_command(_pack_command(cmd))
writer, _ = self._client.do_put(descriptor, schema)

return writer

def _ingest_mode_to_table_definition_options(
self, mode: IngestTableMode
) -> sql_pb2.CommandStatementIngest.TableDefinitionOptions:
if mode == IngestTableMode.CREATE:
return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_FAIL,
)
elif mode == IngestTableMode.APPEND:
return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_FAIL,
if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND,
)
elif mode == IngestTableMode.CREATE_APPEND:
return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND,
)
        elif mode == IngestTableMode.REPLACE:
            return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
                if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_REPLACE,
            )
        else:
            raise ValueError(f"Unsupported ingest mode: {mode}")

def prepare(
self,
query: str,