diff --git a/examples/client_usage.py b/examples/client_usage.py
index 38b82a4..9e29046 100644
--- a/examples/client_usage.py
+++ b/examples/client_usage.py
@@ -8,7 +8,10 @@
 import os
 
+import pyarrow as pa
+
 from altertable_flightsql import Client
+from altertable_flightsql.client import IngestIncrementalOptions
 
 ALTERTABLE_HOST = os.getenv("ALTERTABLE_HOST", "flight.altertable.ai")
 ALTERTABLE_PORT = int(os.getenv("ALTERTABLE_PORT", "443"))
@@ -133,6 +136,80 @@ def example_transactions():
     print()
 
 
+def example_bulk_ingest():
+    """Bulk ingest data using Arrow Flight with incremental (upsert) options."""
+    print("=" * 60)
+    print("Example: Bulk Data Ingestion")
+    print("=" * 60)
+
+    with Client(
+        username=ALTERTABLE_USERNAME,
+        password=ALTERTABLE_PASSWORD,
+        **CONNECTION_SETTINGS,
+    ) as client:
+        # Define schema for the data
+        schema = pa.schema(
+            [
+                ("id", pa.int64()),
+                ("name", pa.string()),
+                ("created_at", pa.int64()),
+            ]
+        )
+
+        # First batch
+        first_batch = pa.record_batch(
+            [
+                [1, 2, 3],
+                ["Alice", "Bob", "Charlie"],
+                [1000, 2000, 3000],
+            ],
+            schema=schema,
+        )
+
+        # Second batch with updated data (same IDs 1 and 2, plus new ID 4)
+        second_batch = pa.record_batch(
+            [
+                [1, 2, 4],
+                ["Alice Updated", "Bob Updated", "David"],
+                [1500, 2500, 4000],
+            ],
+            schema=schema,
+        )
+
+        with client.ingest(
+            table_name="incremental_users",
+            schema=schema,
+            incremental_options=IngestIncrementalOptions(
+                primary_key=["id"],
+                cursor_field=["created_at"],
+            ),
+        ) as writer:
+            writer.write(first_batch)
+
+        # Upsert with second batch
+        with client.ingest(
+            table_name="incremental_users",
+            schema=schema,
+            incremental_options=IngestIncrementalOptions(
+                primary_key=["id"],
+                cursor_field=["created_at"],
+            ),
+        ) as writer:
+            writer.write(second_batch)
+
+        # Verify - should have 4 rows (IDs 1 and 2 updated, ID 3 unchanged, ID 4 new)
+        reader = client.query("SELECT * FROM incremental_users ORDER BY id")
+        result = reader.read_pandas()
+        print(f"\nIncremental ingestion results ({len(result)} rows):")
+        print(result)
+
+        # Cleanup
+        client.execute("DROP TABLE IF EXISTS bulk_users")
+        client.execute("DROP TABLE IF EXISTS incremental_users")
+
+    print()
+
+
 def example_metadata():
     """Query database metadata."""
     print("=" * 60)
@@ -171,5 +248,6 @@
     example_updates()
     example_basic_query()
     example_transactions()
+    example_bulk_ingest()
     example_prepared_statement()
     example_metadata()
diff --git a/src/altertable_flightsql/client.py b/src/altertable_flightsql/client.py
index 6d58e32..40dc617 100644
--- a/src/altertable_flightsql/client.py
+++ b/src/altertable_flightsql/client.py
@@ -4,7 +4,10 @@
 This module provides a high-level Python client for Altertable.
 """
 
+import json
 from collections.abc import Mapping, Sequence
+from dataclasses import dataclass
+from enum import Enum
 from typing import Any, Optional, Union
 
 import pyarrow as pa
@@ -27,6 +30,30 @@ def _unpack_command(bytes, packed):
     any_msg.Unpack(packed)
 
 
+class IngestTableMode(Enum):
+    """Mode for ingesting data into a table."""
+
+    CREATE = "CREATE"
+    """Create the table if it does not exist, fail if it does."""
+    APPEND = "APPEND"
+    """Append to the table if it exists, fail if it does not."""
+    CREATE_APPEND = "CREATE_APPEND"
+    """Create the table if it does not exist, append to it if it does."""
+    REPLACE = "REPLACE"
+    """Create the table if it does not exist, recreate it if it does."""
+
+
+@dataclass(frozen=True)
+class IngestIncrementalOptions:
+    """Options for incremental ingestion."""
+
+    primary_key: Sequence[str]
+    """Primary key columns for the table."""
+
+    cursor_field: Sequence[str]
+    """Cursor columns used to pick the winning row on a primary-key conflict."""
+
+
 class BearerAuthMiddleware(flight.ClientMiddleware):
     """Client middleware that adds Bearer token authentication to all requests."""
 
@@ -239,6 +266,115 @@ def execute(
 
         return result.record_count
 
+    def ingest(
+        self,
+        *,
+        table_name: str,
+        schema: pa.Schema,
+        schema_name: str = "",
+        catalog_name: str = "",
+        mode: IngestTableMode = IngestTableMode.CREATE_APPEND,
+        incremental_options: Optional[IngestIncrementalOptions] = None,
+        transaction: Optional["Transaction"] = None,
+    ) -> flight.FlightStreamWriter:
+        """
+        Bulk ingest data into a table using Apache Arrow Flight.
+
+        This method provides high-performance bulk data loading by streaming
+        Arrow record batches directly to the server. The writer can be used as
+        a context manager for automatic resource cleanup.
+
+        Args:
+            table_name: Name of the table to ingest data into.
+            schema: PyArrow schema defining the table structure.
+            schema_name: Optional schema name. If not provided, uses the client's
+                default schema.
+            catalog_name: Optional catalog name. If not provided, uses the client's
+                default catalog.
+            mode: Table creation/append mode. Options:
+                - CREATE: Create the table, fail if it exists
+                - APPEND: Append to an existing table, fail if it doesn't exist
+                - CREATE_APPEND: Create if not exists, append if it exists (default)
+                - REPLACE: Drop and recreate the table if it exists
+            incremental_options: Options for incremental ingestion, including:
+                - primary_key: Columns to use as the primary key
+                - cursor_field: Columns used to determine which row to keep in
+                  case of a conflict on the primary key
+            transaction: Optional transaction to execute the ingestion within.
+
+        Returns:
+            FlightStreamWriter for writing record batches to the table.
+            The writer should be closed after all data is written, or used
+            as a context manager.
+
+        Example:
+            >>> # Basic ingestion
+            >>> schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
+            >>> with client.ingest(table_name="users", schema=schema) as writer:
+            ...     batch = pa.record_batch([[1, 2], ["Alice", "Bob"]], schema=schema)
+            ...     writer.write(batch)
+
+            >>> # Incremental ingestion with primary key
+            >>> from altertable_flightsql.client import IngestIncrementalOptions
+            >>> opts = IngestIncrementalOptions(
+            ...     primary_key=["id"],
+            ...     cursor_field=["updated_at"],
+            ... )
+            >>> with client.ingest(
+            ...     table_name="users",
+            ...     schema=schema,
+            ...     incremental_options=opts,
+            ... ) as writer:
+            ...     writer.write(batch)
+        """
+        cmd = sql_pb2.CommandStatementIngest(
+            table=table_name,
+            table_definition_options=self._ingest_mode_to_table_definition_options(mode),
+        )
+
+        if catalog_name:
+            cmd.catalog = catalog_name
+
+        if schema_name:
+            cmd.schema = schema_name
+
+        if txn_id := self._get_transaction_id(transaction):
+            cmd.transaction_id = txn_id
+
+        if incremental_options and incremental_options.primary_key:
+            cmd.options["primary_key"] = json.dumps(incremental_options.primary_key)
+
+        if incremental_options and incremental_options.cursor_field:
+            cmd.options["cursor_field"] = json.dumps(incremental_options.cursor_field)
+
+        descriptor = flight.FlightDescriptor.for_command(_pack_command(cmd))
+        writer, _ = self._client.do_put(descriptor, schema)
+
+        return writer
+
+    def _ingest_mode_to_table_definition_options(
+        self, mode: IngestTableMode
+    ) -> sql_pb2.CommandStatementIngest.TableDefinitionOptions:
+        if mode == IngestTableMode.CREATE:
+            return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
+                if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_FAIL,
+            )
+        elif mode == IngestTableMode.APPEND:
+            return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
+                if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_FAIL,
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND,
+            )
+        elif mode == IngestTableMode.CREATE_APPEND:
+            return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
+                if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND,
+            )
+        elif mode == IngestTableMode.REPLACE:
+            return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
+                if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_REPLACE,
+            )
+
     def prepare(
         self,
         query: str,
diff --git a/tests/test_ingest.py b/tests/test_ingest.py
new file mode 100644
index 0000000..523bb88
--- /dev/null
+++ b/tests/test_ingest.py
@@ -0,0 +1,183 @@
+"""
+Integration tests for data ingestion.
+
+Tests the ingest method for bulk data loading.
+"""
+
+import pyarrow as pa
+
+from altertable_flightsql import Client
+from altertable_flightsql.client import IngestIncrementalOptions
+from tests.conftest import SchemaInfo
+
+
+class TestBasicIngest:
+    """Test basic ingest functionality."""
+
+    def test_ingest_simple_table(self, altertable_client: Client, test_schema: SchemaInfo):
+        """Test ingesting data into a new table."""
+        import uuid
+
+        table_name = f"test_ingest_{uuid.uuid4().hex[:8]}"
+        fully_qualified_table = f"{test_schema.full_name}.{table_name}"
+
+        # Define schema
+        schema = pa.schema(
+            [
+                ("id", pa.int64()),
+                ("name", pa.string()),
+                ("value", pa.float64()),
+            ]
+        )
+
+        # Create test data
+        data = pa.record_batch(
+            [
+                [1, 2, 3, 4, 5],
+                ["Alice", "Bob", "Charlie", "David", "Eve"],
+                [100.5, 200.0, 300.75, 400.25, 500.5],
+            ],
+            schema=schema,
+        )
+
+        try:
+            # Ingest data
+            with altertable_client.ingest(
+                table_name=table_name,
+                schema=schema,
+                schema_name=test_schema.schema,
+                catalog_name=test_schema.catalog,
+            ) as writer:
+                writer.write(data)
+
+            reader = altertable_client.query(f"SELECT * FROM {fully_qualified_table} ORDER BY id")
+            result = reader.read_all()
+
+            assert result.num_rows == 5
+            result_df = result.to_pandas()
+            assert list(result_df["id"]) == [1, 2, 3, 4, 5]
+            assert list(result_df["name"]) == ["Alice", "Bob", "Charlie", "David", "Eve"]
+
+        finally:
+            # Cleanup
+            try:
+                altertable_client.execute(f"DROP TABLE IF EXISTS {fully_qualified_table}")
+            except Exception as e:
+                print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
+
+    def test_ingest_multiple_batches(self, altertable_client: Client, test_schema: SchemaInfo):
+        """Test ingesting multiple batches of data."""
+        import uuid
+
+        table_name = f"test_ingest_{uuid.uuid4().hex[:8]}"
+        fully_qualified_table = f"{test_schema.full_name}.{table_name}"
+
+        # Define schema
+        schema = pa.schema(
+            [
+                ("id", pa.int64()),
+                ("name", pa.string()),
+            ]
+        )
+
+        try:
+            # Ingest data
+            with altertable_client.ingest(
+                table_name=table_name,
+                schema=schema,
+                schema_name=test_schema.schema,
+                catalog_name=test_schema.catalog,
+            ) as writer:
+                # Write multiple batches
+                batch1 = pa.record_batch([[1, 2], ["Alice", "Bob"]], schema=schema)
+                batch2 = pa.record_batch([[3, 4], ["Charlie", "David"]], schema=schema)
+                batch3 = pa.record_batch([[5], ["Eve"]], schema=schema)
+
+                writer.write(batch1)
+                writer.write(batch2)
+                writer.write(batch3)
+
+            reader = altertable_client.query(f"SELECT * FROM {fully_qualified_table} ORDER BY id")
+            result = reader.read_all()
+
+            assert result.num_rows == 5
+            result_df = result.to_pandas()
+            assert list(result_df["id"]) == [1, 2, 3, 4, 5]
+            assert list(result_df["name"]) == ["Alice", "Bob", "Charlie", "David", "Eve"]
+
+        finally:
+            # Cleanup
+            try:
+                altertable_client.execute(f"DROP TABLE IF EXISTS {fully_qualified_table}")
+            except Exception as e:
+                print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
+
+
+class TestIngestWithPrimaryKey:
+    """Test ingest with primary key specification."""
+
+    def test_ingest_with_primary_key(self, altertable_client: Client, test_schema: SchemaInfo):
+        """Test ingesting data with a primary key constraint."""
+        import uuid
+
+        table_name = f"test_ingest_{uuid.uuid4().hex[:8]}"
+        fully_qualified_table = f"{test_schema.full_name}.{table_name}"
+
+        # Define schema
+        schema = pa.schema(
+            [
+                ("id", pa.int64()),
+                ("email", pa.string()),
+                ("name", pa.string()),
+                ("created_at", pa.int64()),
+            ]
+        )
+
+        try:
+            # Ingest data with primary key (duplicate id=1: the row with the higher created_at should win)
+            with altertable_client.ingest(
+                table_name=table_name,
+                schema=schema,
+                schema_name=test_schema.schema,
+                catalog_name=test_schema.catalog,
+                incremental_options=IngestIncrementalOptions(
+                    primary_key=["id"], cursor_field=["created_at"]
+                ),
+            ) as writer:
+                writer.write(
+                    pa.record_batch(
+                        [
+                            [1, 2, 3, 1],
+                            [
+                                "alice@example.com",
+                                "bob@example.com",
+                                "charlie@example.com",
+                                "alice+1@example.com",
+                            ],
+                            ["Alice", "Bob", "Charlie", "Alice"],
+                            [1, 2, 3, 4],
+                        ],
+                        schema=schema,
+                    )
+                )
+
+            # Verify data was ingested and deduplicated on the primary key
+            reader = altertable_client.query(f"SELECT * FROM {fully_qualified_table} ORDER BY id")
+            result = reader.read_all()
+
+            assert result.num_rows == 3
+            result_df = result.to_pandas()
+            assert list(result_df["id"]) == [1, 2, 3]
+            assert list(result_df["email"]) == [
+                "alice+1@example.com",
+                "bob@example.com",
+                "charlie@example.com",
+            ]
+            assert list(result_df["name"]) == ["Alice", "Bob", "Charlie"]
+
+        finally:
+            # Cleanup
+            try:
+                altertable_client.execute(f"DROP TABLE IF EXISTS {fully_qualified_table}")
+            except Exception as e:
+                print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
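
A possible usage sketch building on this patch: Client.ingest() accepts any stream of Arrow record batches, so a thin wrapper can bulk-load a pandas DataFrame in chunks. The helper name load_dataframe, the chunk size, and the fallback of cursor_field to primary_key are illustrative assumptions; only Client.ingest, IngestIncrementalOptions, and writer.write() come from the patch above.

# Usage sketch only -- load_dataframe is a hypothetical helper, not part of the patch.
from collections.abc import Sequence
from typing import Optional

import pandas as pd
import pyarrow as pa

from altertable_flightsql import Client
from altertable_flightsql.client import IngestIncrementalOptions


def load_dataframe(
    client: Client,
    table_name: str,
    df: pd.DataFrame,
    primary_key: Optional[Sequence[str]] = None,
    cursor_field: Optional[Sequence[str]] = None,
    chunk_size: int = 10_000,
) -> None:
    """Ingest a pandas DataFrame via Arrow Flight, optionally as an upsert."""
    table = pa.Table.from_pandas(df, preserve_index=False)

    options = None
    if primary_key:
        # Assumption: fall back to the primary key as the cursor when none is given.
        options = IngestIncrementalOptions(
            primary_key=list(primary_key),
            cursor_field=list(cursor_field or primary_key),
        )

    with client.ingest(
        table_name=table_name,
        schema=table.schema,
        incremental_options=options,
    ) as writer:
        # Stream bounded record batches instead of one oversized batch.
        for batch in table.to_batches(max_chunksize=chunk_size):
            writer.write(batch)

For instance, load_dataframe(client, "users", df, primary_key=["id"], cursor_field=["updated_at"]) would upsert df into users through the same options map the new ingest() method populates.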