From 5a7726d9b0f8d771bd35606f4c339c43ee74e6ac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?L=C3=A9o=20Ercolanelli?=
Date: Tue, 25 Nov 2025 17:54:58 +0100
Subject: [PATCH 1/3] feat: implement `ingest` support

---
 src/altertable_flightsql/client.py |  83 +++++++++++++++
 tests/test_ingest.py               | 158 +++++++++++++++++++++++++++++
 2 files changed, 241 insertions(+)
 create mode 100644 tests/test_ingest.py

diff --git a/src/altertable_flightsql/client.py b/src/altertable_flightsql/client.py
index 6d58e32..e68c1d4 100644
--- a/src/altertable_flightsql/client.py
+++ b/src/altertable_flightsql/client.py
@@ -5,6 +5,9 @@
 """
 
 from collections.abc import Mapping, Sequence
+from dataclasses import dataclass
+from enum import Enum
+import json
 from typing import Any, Optional, Union
 
 import pyarrow as pa
@@ -27,6 +30,28 @@ def _unpack_command(bytes, packed):
     any_msg.Unpack(packed)
 
 
+class IngestTableMode(Enum):
+    """Mode for ingesting data into a table."""
+    CREATE = "CREATE"
+    """Create the table if it does not exist, fail if it does"""
+    APPEND = "APPEND"
+    """Append to the table if it exists, fail if it does not"""
+    CREATE_APPEND = "CREATE_APPEND"
+    """Create the table if it does not exist, append to it if it does"""
+    REPLACE = "REPLACE"
+    """Create the table if it does not exist, recreate it if it does"""
+
+
+@dataclass(frozen=True)
+class IngestIncrementalOptions:
+    """Options for incremental ingestion."""
+    primary_key: Sequence[str]
+    """Primary key for the table."""
+
+    cursor_field: Sequence[str]
+    """Cursor field for the table."""
+
+
 class BearerAuthMiddleware(flight.ClientMiddleware):
     """Client middleware that adds Bearer token authentication to all requests."""
 
@@ -239,6 +264,64 @@ def execute(
 
         return result.record_count
 
+    def ingest(
+        self,
+        *,
+        table_name: str,
+        schema: pa.Schema,
+        schema_name: str = "",
+        catalog_name: str = "",
+        mode: IngestTableMode = IngestTableMode.CREATE_APPEND,
+        incremental_options: Optional[IngestIncrementalOptions] = None,
+        transaction: Optional["Transaction"] = None,
+    ) -> flight.FlightStreamWriter:
+        cmd = sql_pb2.CommandStatementIngest(
+            table=table_name,
+            table_definition_options=self._ingest_mode_to_table_definition_options(mode),
+        )
+
+        if catalog_name:
+            cmd.catalog = catalog_name
+
+        if schema_name:
+            cmd.schema = schema_name
+
+        if txn_id := self._get_transaction_id(transaction):
+            cmd.transaction_id = txn_id
+
+        if incremental_options and incremental_options.primary_key:
+            cmd.options["primary_key"] = json.dumps(incremental_options.primary_key)
+
+        if incremental_options and incremental_options.cursor_field:
+            cmd.options["cursor_field"] = json.dumps(incremental_options.cursor_field)
+
+        descriptor = flight.FlightDescriptor.for_command(_pack_command(cmd))
+        writer, _ = self._client.do_put(descriptor, schema)
+
+        return writer
+
+    def _ingest_mode_to_table_definition_options(self, mode: IngestTableMode) -> sql_pb2.CommandStatementIngest.TableDefinitionOptions:
+        if mode == IngestTableMode.CREATE:
+            return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
+                if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_FAIL
+            )
+        elif mode == IngestTableMode.APPEND:
+            return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
+                if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_FAIL,
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND
+            )
+        elif mode == IngestTableMode.CREATE_APPEND:
+            return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
+                if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND
+            )
+        elif mode == IngestTableMode.REPLACE:
+            return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
+                if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_REPLACE
+            )
+
     def prepare(
         self,
         query: str,
diff --git a/tests/test_ingest.py b/tests/test_ingest.py
new file mode 100644
index 0000000..6d9dc86
--- /dev/null
+++ b/tests/test_ingest.py
@@ -0,0 +1,158 @@
+"""
+Integration tests for data ingestion.
+
+Tests the ingest method for bulk data loading.
+"""
+
+import pyarrow as pa
+import pytest
+
+from altertable_flightsql import Client
+from altertable_flightsql.client import IngestIncrementalOptions
+from tests.conftest import SchemaInfo
+
+
+class TestBasicIngest:
+    """Test basic ingest functionality."""
+
+    def test_ingest_simple_table(self, altertable_client: Client, test_schema: SchemaInfo):
+        """Test ingesting data into a new table."""
+        import uuid
+
+        table_name = f"test_ingest_{uuid.uuid4().hex[:8]}"
+        fully_qualified_table = f"{test_schema.full_name}.{table_name}"
+
+        # Define schema
+        schema = pa.schema([
+            ("id", pa.int64()),
+            ("name", pa.string()),
+            ("value", pa.float64()),
+        ])
+
+        # Create test data
+        data = pa.record_batch([
+            [1, 2, 3, 4, 5],
+            ["Alice", "Bob", "Charlie", "David", "Eve"],
+            [100.5, 200.0, 300.75, 400.25, 500.5],
+        ], schema=schema)
+
+        try:
+            # Ingest data
+            with altertable_client.ingest(
+                table_name=table_name,
+                schema=schema,
+                schema_name=test_schema.schema,
+                catalog_name=test_schema.catalog,
+            ) as writer:
+                writer.write(data)
+
+            reader = altertable_client.query(f"SELECT * FROM {fully_qualified_table} ORDER BY id")
+            result = reader.read_all()
+
+            assert result.num_rows == 5
+            result_df = result.to_pandas()
+            assert list(result_df["id"]) == [1, 2, 3, 4, 5]
+            assert list(result_df["name"]) == ["Alice", "Bob", "Charlie", "David", "Eve"]
+
+        finally:
+            # Cleanup
+            try:
+                altertable_client.execute(f"DROP TABLE IF EXISTS {fully_qualified_table}")
+            except Exception as e:
+                print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
+
+    def test_ingest_multiple_batches(self, altertable_client: Client, test_schema: SchemaInfo):
+        """Test ingesting multiple batches of data."""
+        import uuid
+
+        table_name = f"test_ingest_{uuid.uuid4().hex[:8]}"
+        fully_qualified_table = f"{test_schema.full_name}.{table_name}"
+
+        # Define schema
+        schema = pa.schema([
+            ("id", pa.int64()),
+            ("name", pa.string()),
+        ])
+
+        try:
+            # Ingest data
+            with altertable_client.ingest(
+                table_name=table_name,
+                schema=schema,
+                schema_name=test_schema.schema,
+                catalog_name=test_schema.catalog,
+            ) as writer:
+                # Write multiple batches
+                batch1 = pa.record_batch([[1, 2], ["Alice", "Bob"]], schema=schema)
+                batch2 = pa.record_batch([[3, 4], ["Charlie", "David"]], schema=schema)
+                batch3 = pa.record_batch([[5], ["Eve"]], schema=schema)
+
+                writer.write(batch1)
+                writer.write(batch2)
+                writer.write(batch3)
+
+            reader = altertable_client.query(f"SELECT * FROM {fully_qualified_table} ORDER BY id")
+            result = reader.read_all()
+
+            assert result.num_rows == 5
+            result_df = result.to_pandas()
+            assert list(result_df["id"]) == [1, 2, 3, 4, 5]
+            assert list(result_df["name"]) == ["Alice", "Bob", "Charlie", "David", "Eve"]
+
+        finally:
+            # Cleanup
+            try:
+                altertable_client.execute(f"DROP TABLE IF EXISTS {fully_qualified_table}")
+            except Exception as e:
+                print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
+
+class TestIngestWithPrimaryKey:
+    """Test ingest with primary key specification."""
+
+    def test_ingest_with_primary_key(self, altertable_client: Client, test_schema: SchemaInfo):
+        """Test ingesting data with primary key constraint."""
+        import uuid
+
+        table_name = f"test_ingest_{uuid.uuid4().hex[:8]}"
+        fully_qualified_table = f"{test_schema.full_name}.{table_name}"
+
+        # Define schema
+        schema = pa.schema([
+            ("id", pa.int64()),
+            ("email", pa.string()),
+            ("name", pa.string()),
+            ("created_at", pa.int64()),
+        ])
+
+        try:
+            # Ingest data with primary key
+            with altertable_client.ingest(
+                table_name=table_name,
+                schema=schema,
+                schema_name=test_schema.schema,
+                catalog_name=test_schema.catalog,
+                incremental_options=IngestIncrementalOptions(primary_key=["id"], cursor_field=["created_at"]),
+            ) as writer:
+                writer.write(pa.record_batch([
+                    [1, 2, 3, 1],
+                    ["alice@example.com", "bob@example.com", "charlie@example.com", "alice+1@example.com"],
+                    ["Alice", "Bob", "Charlie", "Alice"],
+                    [1, 2, 3, 4],
+                ], schema=schema))
+
+            # Verify data was ingested
+            reader = altertable_client.query(f"SELECT * FROM {fully_qualified_table} ORDER BY id")
+            result = reader.read_all()
+
+            assert result.num_rows == 3
+            result_df = result.to_pandas()
+            assert list(result_df["id"]) == [1, 2, 3]
+            assert list(result_df["email"]) == ["alice+1@example.com", "bob@example.com", "charlie@example.com"]
+            assert list(result_df["name"]) == ["Alice", "Bob", "Charlie"]
+
+        finally:
+            # Cleanup
+            try:
+                altertable_client.execute(f"DROP TABLE IF EXISTS {fully_qualified_table}")
+            except Exception as e:
+                print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
\ No newline at end of file

From de5f65e9b714e4fcfce5670947a482afbe9f5869 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?L=C3=A9o=20Ercolanelli?=
Date: Tue, 25 Nov 2025 18:02:32 +0100
Subject: [PATCH 2/3] docs: add example and docstring to ingest

---
 examples/client_usage.py           | 70 ++++++++++++++++++++++++++++++
 src/altertable_flightsql/client.py | 49 +++++++++++++++++++++
 2 files changed, 119 insertions(+)

diff --git a/examples/client_usage.py b/examples/client_usage.py
index 38b82a4..ebff226 100644
--- a/examples/client_usage.py
+++ b/examples/client_usage.py
@@ -8,7 +8,10 @@
 
 import os
 
+import pyarrow as pa
+
 from altertable_flightsql import Client
+from altertable_flightsql.client import IngestIncrementalOptions
 
 ALTERTABLE_HOST = os.getenv("ALTERTABLE_HOST", "flight.altertable.ai")
 ALTERTABLE_PORT = int(os.getenv("ALTERTABLE_PORT", "443"))
@@ -133,6 +136,72 @@ def example_transactions():
     print()
 
 
+def example_bulk_ingest():
+    """Bulk ingest data using Arrow Flight."""
+    print("=" * 60)
+    print("Example: Bulk Data Ingestion")
+    print("=" * 60)
+
+    with Client(
+        username=ALTERTABLE_USERNAME,
+        password=ALTERTABLE_PASSWORD,
+        **CONNECTION_SETTINGS,
+    ) as client:
+        # Define schema for the data
+        schema = pa.schema([
+            ("id", pa.int64()),
+            ("name", pa.string()),
+            ("created_at", pa.int64()),
+        ])
+
+        # First batch
+        first_batch = pa.record_batch([
+            [1, 2, 3],
+            ["Alice", "Bob", "Charlie"],
+            [1000, 2000, 3000],
+        ], schema=schema)
+
+        # Second batch with updated data (same IDs 1,2 and new ID 4)
+        second_batch = pa.record_batch([
+            [1, 2, 4],
+            ["Alice Updated", "Bob Updated", "David"],
+            [1500, 2500, 4000],
+        ], schema=schema)
+
+        with client.ingest(
+            table_name="incremental_users",
+            schema=schema,
+            incremental_options=IngestIncrementalOptions(
+                primary_key=["id"],
+                cursor_field=["created_at"],
+            ),
+        ) as writer:
+            writer.write(first_batch)
+
+        # Upsert with second batch
+        with client.ingest(
+            table_name="incremental_users",
+            schema=schema,
+            incremental_options=IngestIncrementalOptions(
+                primary_key=["id"],
+                cursor_field=["created_at"],
+            ),
+        ) as writer:
+            writer.write(second_batch)
+
+        # Verify - should have 4 rows: ids 1-3 from the first batch, with 1 and 2 updated and id 4 added
+        reader = client.query("SELECT * FROM incremental_users ORDER BY id")
+        result = reader.read_pandas()
+        print(f"\nIncremental ingestion results ({len(result)} rows):")
+        print(result)
+
+        # Cleanup
+        client.execute("DROP TABLE IF EXISTS bulk_users")
+        client.execute("DROP TABLE IF EXISTS incremental_users")
+
+    print()
+
+
 def example_metadata():
     """Query database metadata."""
     print("=" * 60)
@@ -171,5 +240,6 @@ def example_metadata():
     example_updates()
     example_basic_query()
     example_transactions()
+    example_bulk_ingest()
     example_prepared_statement()
     example_metadata()
diff --git a/src/altertable_flightsql/client.py b/src/altertable_flightsql/client.py
index e68c1d4..70474d4 100644
--- a/src/altertable_flightsql/client.py
+++ b/src/altertable_flightsql/client.py
@@ -275,6 +275,55 @@ def ingest(
         incremental_options: Optional[IngestIncrementalOptions] = None,
         transaction: Optional["Transaction"] = None,
     ) -> flight.FlightStreamWriter:
+        """
+        Bulk ingest data into a table using Apache Arrow Flight.
+
+        This method provides high-performance bulk data loading by streaming
+        Arrow record batches directly to the server. The writer can be used as
+        a context manager for automatic resource cleanup.
+
+        Args:
+            table_name: Name of the table to ingest data into.
+            schema: PyArrow schema defining the table structure.
+            schema_name: Optional schema name. If not provided, uses the client's
+                default schema.
+            catalog_name: Optional catalog name. If not provided, uses the client's
+                default catalog.
+            mode: Table creation/append mode. Options:
+                - CREATE: Create table, fail if it exists
+                - APPEND: Append to existing table, fail if it doesn't exist
+                - CREATE_APPEND: Create if not exists, append if exists (default)
+                - REPLACE: Drop and recreate table if it exists
+            incremental_options: Options for incremental ingestion, including:
+                - primary_key: Columns to use as primary key
+                - cursor_field: Columns used to determine which row to keep in case of a conflict on the primary key
+            transaction: Optional transaction to execute ingestion within.
+
+        Returns:
+            FlightStreamWriter for writing record batches to the table.
+            The writer should be closed after all data is written, or used
+            as a context manager.
+
+        Example:
+            >>> # Basic ingestion
+            >>> schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
+            >>> with client.ingest(table_name="users", schema=schema) as writer:
+            ...     batch = pa.record_batch([[1, 2], ["Alice", "Bob"]], schema=schema)
+            ...     writer.write(batch)

+            >>> # Incremental ingestion with primary key
+            >>> from altertable_flightsql.client import IngestIncrementalOptions
+            >>> opts = IngestIncrementalOptions(
+            ...     primary_key=["id"],
+            ...     cursor_field=["updated_at"]
+            ... )
+            >>> with client.ingest(
+            ...     table_name="users",
+            ...     schema=schema,
+            ...     incremental_options=opts
+            ... ) as writer:
+            ...     writer.write(batch)
+        """
         cmd = sql_pb2.CommandStatementIngest(
             table=table_name,
             table_definition_options=self._ingest_mode_to_table_definition_options(mode),

From cf91a946d31ff99fcbc54f901aeaa8456af96432 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?L=C3=A9o=20Ercolanelli?=
Date: Tue, 25 Nov 2025 18:05:35 +0100
Subject: [PATCH 3/3] style: make linters happy

---
 examples/client_usage.py           | 38 +++++++------
 src/altertable_flightsql/client.py | 16 +++---
 tests/test_ingest.py               | 85 +++++++++++++++++++-----------
 3 files changed, 88 insertions(+), 51 deletions(-)

diff --git a/examples/client_usage.py b/examples/client_usage.py
index ebff226..9e29046 100644
--- a/examples/client_usage.py
+++ b/examples/client_usage.py
@@ -148,25 +148,33 @@ def example_bulk_ingest():
         **CONNECTION_SETTINGS,
     ) as client:
         # Define schema for the data
-        schema = pa.schema([
-            ("id", pa.int64()),
-            ("name", pa.string()),
-            ("created_at", pa.int64()),
-        ])
+        schema = pa.schema(
+            [
+                ("id", pa.int64()),
+                ("name", pa.string()),
+                ("created_at", pa.int64()),
+            ]
+        )
 
         # First batch
-        first_batch = pa.record_batch([
-            [1, 2, 3],
-            ["Alice", "Bob", "Charlie"],
-            [1000, 2000, 3000],
-        ], schema=schema)
+        first_batch = pa.record_batch(
+            [
+                [1, 2, 3],
+                ["Alice", "Bob", "Charlie"],
+                [1000, 2000, 3000],
+            ],
+            schema=schema,
+        )
 
         # Second batch with updated data (same IDs 1,2 and new ID 4)
-        second_batch = pa.record_batch([
-            [1, 2, 4],
-            ["Alice Updated", "Bob Updated", "David"],
-            [1500, 2500, 4000],
-        ], schema=schema)
+        second_batch = pa.record_batch(
+            [
+                [1, 2, 4],
+                ["Alice Updated", "Bob Updated", "David"],
+                [1500, 2500, 4000],
+            ],
+            schema=schema,
+        )
 
         with client.ingest(
             table_name="incremental_users",
diff --git a/src/altertable_flightsql/client.py b/src/altertable_flightsql/client.py
index 70474d4..40dc617 100644
--- a/src/altertable_flightsql/client.py
+++ b/src/altertable_flightsql/client.py
@@ -4,10 +4,10 @@
 This module provides a high-level Python client for Altertable.
 """
 
+import json
 from collections.abc import Mapping, Sequence
 from dataclasses import dataclass
 from enum import Enum
-import json
 from typing import Any, Optional, Union
 
 import pyarrow as pa
@@ -32,6 +32,7 @@ def _unpack_command(bytes, packed):
 
 class IngestTableMode(Enum):
     """Mode for ingesting data into a table."""
+
     CREATE = "CREATE"
     """Create the table if it does not exist, fail if it does"""
     APPEND = "APPEND"
@@ -45,6 +46,7 @@ class IngestTableMode(Enum):
 @dataclass(frozen=True)
 class IngestIncrementalOptions:
     """Options for incremental ingestion."""
+
     primary_key: Sequence[str]
     """Primary key for the table."""
 
@@ -349,26 +351,28 @@ def ingest(
 
         return writer
 
-    def _ingest_mode_to_table_definition_options(self, mode: IngestTableMode) -> sql_pb2.CommandStatementIngest.TableDefinitionOptions:
+    def _ingest_mode_to_table_definition_options(
+        self, mode: IngestTableMode
+    ) -> sql_pb2.CommandStatementIngest.TableDefinitionOptions:
         if mode == IngestTableMode.CREATE:
             return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
                 if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
-                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_FAIL
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_FAIL,
             )
         elif mode == IngestTableMode.APPEND:
             return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
                 if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_FAIL,
-                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND,
             )
         elif mode == IngestTableMode.CREATE_APPEND:
             return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
                 if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
-                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND,
             )
         elif mode == IngestTableMode.REPLACE:
             return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
                 if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
-                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_REPLACE
+                if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_REPLACE,
             )
 
     def prepare(
diff --git a/tests/test_ingest.py b/tests/test_ingest.py
index 6d9dc86..523bb88 100644
--- a/tests/test_ingest.py
+++ b/tests/test_ingest.py
@@ -5,7 +5,6 @@
 """
 
 import pyarrow as pa
-import pytest
 
 from altertable_flightsql import Client
 from altertable_flightsql.client import IngestIncrementalOptions
@@ -23,18 +22,23 @@ def test_ingest_simple_table(self, altertable_client: Client, test_schema: Schem
         fully_qualified_table = f"{test_schema.full_name}.{table_name}"
 
         # Define schema
-        schema = pa.schema([
-            ("id", pa.int64()),
-            ("name", pa.string()),
-            ("value", pa.float64()),
-        ])
+        schema = pa.schema(
+            [
+                ("id", pa.int64()),
+                ("name", pa.string()),
+                ("value", pa.float64()),
+            ]
+        )
 
         # Create test data
-        data = pa.record_batch([
-            [1, 2, 3, 4, 5],
-            ["Alice", "Bob", "Charlie", "David", "Eve"],
-            [100.5, 200.0, 300.75, 400.25, 500.5],
-        ], schema=schema)
+        data = pa.record_batch(
+            [
+                [1, 2, 3, 4, 5],
+                ["Alice", "Bob", "Charlie", "David", "Eve"],
+                [100.5, 200.0, 300.75, 400.25, 500.5],
+            ],
+            schema=schema,
+        )
 
         try:
             # Ingest data
@@ -69,10 +73,12 @@ def test_ingest_multiple_batches(self, altertable_client: Client, test_schema: S
         fully_qualified_table = f"{test_schema.full_name}.{table_name}"
 
         # Define schema
-        schema = pa.schema([
-            ("id", pa.int64()),
-            ("name", pa.string()),
-        ])
+        schema = pa.schema(
+            [
+                ("id", pa.int64()),
+                ("name", pa.string()),
+            ]
+        )
 
         try:
             # Ingest data
@@ -106,6 +112,7 @@ def test_ingest_multiple_batches(self, altertable_client: Client, test_schema: S
             except Exception as e:
                 print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
 
+
 class TestIngestWithPrimaryKey:
     """Test ingest with primary key specification."""
 
@@ -117,12 +124,14 @@ def test_ingest_with_primary_key(self, altertable_client: Client, test_schema: S
         fully_qualified_table = f"{test_schema.full_name}.{table_name}"
 
         # Define schema
-        schema = pa.schema([
-            ("id", pa.int64()),
-            ("email", pa.string()),
-            ("name", pa.string()),
-            ("created_at", pa.int64()),
-        ])
+        schema = pa.schema(
+            [
+                ("id", pa.int64()),
+                ("email", pa.string()),
+                ("name", pa.string()),
+                ("created_at", pa.int64()),
+            ]
+        )
 
         try:
             # Ingest data with primary key
@@ -131,14 +140,26 @@ def test_ingest_with_primary_key(self, altertable_client: Client, test_schema: S
                 schema=schema,
                 schema_name=test_schema.schema,
                 catalog_name=test_schema.catalog,
-                incremental_options=IngestIncrementalOptions(primary_key=["id"], cursor_field=["created_at"]),
+                incremental_options=IngestIncrementalOptions(
+                    primary_key=["id"], cursor_field=["created_at"]
+                ),
             ) as writer:
-                writer.write(pa.record_batch([
-                    [1, 2, 3, 1],
-                    ["alice@example.com", "bob@example.com", "charlie@example.com", "alice+1@example.com"],
-                    ["Alice", "Bob", "Charlie", "Alice"],
-                    [1, 2, 3, 4],
-                ], schema=schema))
+                writer.write(
+                    pa.record_batch(
+                        [
+                            [1, 2, 3, 1],
+                            [
+                                "alice@example.com",
+                                "bob@example.com",
+                                "charlie@example.com",
+                                "alice+1@example.com",
+                            ],
+                            ["Alice", "Bob", "Charlie", "Alice"],
+                            [1, 2, 3, 4],
+                        ],
+                        schema=schema,
+                    )
+                )
 
             # Verify data was ingested
             reader = altertable_client.query(f"SELECT * FROM {fully_qualified_table} ORDER BY id")
@@ -147,7 +168,11 @@ def test_ingest_with_primary_key(self, altertable_client: Client, test_schema: S
             assert result.num_rows == 3
             result_df = result.to_pandas()
             assert list(result_df["id"]) == [1, 2, 3]
-            assert list(result_df["email"]) == ["alice+1@example.com", "bob@example.com", "charlie@example.com"]
+            assert list(result_df["email"]) == [
+                "alice+1@example.com",
+                "bob@example.com",
+                "charlie@example.com",
+            ]
             assert list(result_df["name"]) == ["Alice", "Bob", "Charlie"]
 
         finally:
@@ -155,4 +180,4 @@ def test_ingest_with_primary_key(self, altertable_client: Client, test_schema: S
             try:
                 altertable_client.execute(f"DROP TABLE IF EXISTS {fully_qualified_table}")
             except Exception as e:
-                print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
\ No newline at end of file
+                print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
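
For reviewers who want to try the series end to end, here is a minimal usage sketch distilled from examples/client_usage.py. It is illustrative only: it assumes credentials are set in the environment, assumes Client accepts host/port keywords (the example file hides its connection arguments behind CONNECTION_SETTINGS), and uses a placeholder demo_events table name.

import os

import pyarrow as pa

from altertable_flightsql import Client
from altertable_flightsql.client import IngestIncrementalOptions, IngestTableMode

schema = pa.schema([("id", pa.int64()), ("updated_at", pa.int64())])
batch = pa.record_batch([[1, 2], [100, 200]], schema=schema)

with Client(
    username=os.environ["ALTERTABLE_USERNAME"],
    password=os.environ["ALTERTABLE_PASSWORD"],
    host=os.getenv("ALTERTABLE_HOST", "flight.altertable.ai"),  # assumed keyword
    port=int(os.getenv("ALTERTABLE_PORT", "443")),  # assumed keyword
) as client:
    # CREATE_APPEND (the default mode) creates the table if it is missing
    # and appends to it otherwise.
    with client.ingest(
        table_name="demo_events",  # placeholder table name
        schema=schema,
        mode=IngestTableMode.CREATE_APPEND,
        incremental_options=IngestIncrementalOptions(
            primary_key=["id"],           # rows that collide on id are deduplicated
            cursor_field=["updated_at"],  # the row with the newest cursor value wins
        ),
    ) as writer:
        writer.write(batch)

    print(client.query("SELECT * FROM demo_events ORDER BY id").read_pandas())
    client.execute("DROP TABLE IF EXISTS demo_events")

With primary_key set, rows that conflict on id are collapsed server-side and the row with the greatest cursor_field value is kept, which is exactly the behavior TestIngestWithPrimaryKey asserts (four written rows collapse to three, keeping alice+1@example.com).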