From 51aab394e5de35d8015a440a53e394e502d7336d Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 18 Apr 2026 10:25:40 +0400 Subject: [PATCH 01/11] add partitioned ingest, postgres impl --- c/driver/postgresql/CMakeLists.txt | 2 + c/driver/postgresql/connection.h | 19 + c/driver/postgresql/ingest_partition.cc | 561 ++++++++++++++++++ c/driver/postgresql/ingest_partition.h | 61 ++ c/driver/postgresql/meson.build | 1 + .../postgresql/partitioned_ingest_test.cc | 294 +++++++++ c/driver/postgresql/postgresql.cc | 94 ++- c/include/arrow-adbc/adbc.h | 270 ++++++++- .../source/format/partitioned_bulk_ingest.rst | 326 ++++++++++ docs/source/index.rst | 1 + 10 files changed, 1625 insertions(+), 4 deletions(-) create mode 100644 c/driver/postgresql/ingest_partition.cc create mode 100644 c/driver/postgresql/ingest_partition.h create mode 100644 c/driver/postgresql/partitioned_ingest_test.cc create mode 100644 docs/source/format/partitioned_bulk_ingest.rst diff --git a/c/driver/postgresql/CMakeLists.txt b/c/driver/postgresql/CMakeLists.txt index 35f56d1d79..a36a397f1d 100644 --- a/c/driver/postgresql/CMakeLists.txt +++ b/c/driver/postgresql/CMakeLists.txt @@ -31,6 +31,7 @@ add_arrow_lib(adbc_driver_postgresql connection.cc error.cc database.cc + ingest_partition.cc postgresql.cc result_helper.cc result_reader.cc @@ -81,6 +82,7 @@ if(ADBC_BUILD_TESTS) EXTRA_LABELS driver-postgresql SOURCES + partitioned_ingest_test.cc postgres_type_test.cc postgresql_test.cc EXTRA_LINK_LIBS diff --git a/c/driver/postgresql/connection.h b/c/driver/postgresql/connection.h index 02e0c4f1bc..1a868c6426 100644 --- a/c/driver/postgresql/connection.h +++ b/c/driver/postgresql/connection.h @@ -66,6 +66,25 @@ class PostgresConnection { AdbcStatusCode Init(struct AdbcDatabase* database, struct AdbcError* error); AdbcStatusCode Release(struct AdbcError* error); AdbcStatusCode Rollback(struct AdbcError* error); + AdbcStatusCode BeginIngestPartitions(const char* target_catalog, + const char* target_db_schema, + const char* target_table, const char* mode, + struct ArrowSchema* schema, + struct AdbcIngestHandle* out_handle, + struct AdbcError* error); + AdbcStatusCode WriteIngestPartition(const uint8_t* handle, size_t handle_len, + struct ArrowArrayStream* data, + struct AdbcIngestReceipt* out_receipt, + struct AdbcError* error); + AdbcStatusCode CommitIngestPartitions(const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, + const size_t* receipt_lens, + int64_t* rows_affected, struct AdbcError* error); + AdbcStatusCode AbortIngestPartitions(const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, + const size_t* receipt_lens, + struct AdbcError* error); + AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error); AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length, struct AdbcError* error); diff --git a/c/driver/postgresql/ingest_partition.cc b/c/driver/postgresql/ingest_partition.cc new file mode 100644 index 0000000000..a7a8f48c28 --- /dev/null +++ b/c/driver/postgresql/ingest_partition.cc @@ -0,0 +1,561 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +#include "ingest_partition.h" + +#include +#include +#include + +#include + +#include "bind_stream.h" +#include "connection.h" +#include "driver/common/utils.h" +#include "error.h" +#include "postgres_type.h" +#include "result_helper.h" + +namespace adbcpq { + +namespace { + +void WriteU32(uint8_t** p, uint32_t v) { + std::memcpy(*p, &v, sizeof(v)); + *p += sizeof(v); +} + +void WriteI64(uint8_t** p, int64_t v) { + std::memcpy(*p, &v, sizeof(v)); + *p += sizeof(v); +} + +void WriteString(uint8_t** p, const std::string& s) { + WriteU32(p, static_cast(s.size())); + std::memcpy(*p, s.data(), s.size()); + *p += s.size(); +} + +bool ReadU32(const uint8_t** p, const uint8_t* end, uint32_t* out) { + if (end - *p < static_cast(sizeof(uint32_t))) return false; + std::memcpy(out, *p, sizeof(uint32_t)); + *p += sizeof(uint32_t); + return true; +} + +bool ReadI64(const uint8_t** p, const uint8_t* end, int64_t* out) { + if (end - *p < static_cast(sizeof(int64_t))) return false; + std::memcpy(out, *p, sizeof(int64_t)); + *p += sizeof(int64_t); + return true; +} + +bool ReadString(const uint8_t** p, const uint8_t* end, std::string* out) { + uint32_t n; + if (!ReadU32(p, end, &n)) return false; + if (end - *p < static_cast(n)) return false; + out->assign(reinterpret_cast(*p), n); + *p += n; + return true; +} + +std::string HexId(const std::array& id) { + static const char kHex[] = "0123456789abcdef"; + std::string s(32, '0'); + for (size_t i = 0; i < 16; i++) { + s[2 * i] = kHex[id[i] >> 4]; + s[2 * i + 1] = kHex[id[i] & 0x0F]; + } + return s; +} + +} // namespace + +void IngestHandle::GenerateId(std::array* out) { + std::random_device rd; + std::mt19937_64 gen(rd()); + uint64_t a = gen(); + uint64_t b = gen(); + std::memcpy(out->data(), &a, 8); + std::memcpy(out->data() + 8, &b, 8); +} + +std::string IngestHandle::StagingPrefix() const { + return "adbc_stg_" + HexId(ingest_id) + "_"; +} + +size_t IngestHandle::SerializedSize() const { + return kMagic.size() + ingest_id.size() + sizeof(uint32_t) * 3 + catalog.size() + + db_schema.size() + table.size(); +} + +void IngestHandle::Serialize(uint8_t* out) const { + uint8_t* p = out; + std::memcpy(p, kMagic.data(), kMagic.size()); + p += kMagic.size(); + std::memcpy(p, ingest_id.data(), ingest_id.size()); + p += ingest_id.size(); + WriteString(&p, catalog); + WriteString(&p, db_schema); + WriteString(&p, table); +} + +AdbcStatusCode IngestHandle::Parse(const uint8_t* bytes, size_t len, IngestHandle* out, + struct AdbcError* error) { + const uint8_t* p = bytes; + const uint8_t* end = bytes + len; + if (end - p < static_cast(kMagic.size() + 16)) { + InternalAdbcSetError(error, "[libpq] ingest handle truncated"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (std::memcmp(p, kMagic.data(), kMagic.size()) != 0) { + InternalAdbcSetError(error, "[libpq] ingest handle bad magic"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + p += kMagic.size(); + std::memcpy(out->ingest_id.data(), p, 16); + p += 16; + if (!ReadString(&p, end, &out->catalog) || + !ReadString(&p, end, &out->db_schema) || + !ReadString(&p, end, &out->table)) { + InternalAdbcSetError(error, "[libpq] ingest handle truncated"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + return ADBC_STATUS_OK; +} + +size_t IngestReceipt::SerializedSize() const { + return kMagic.size() + sizeof(uint32_t) * 3 + staging_schema.size() + + staging_table.size() + escaped_columns.size() + sizeof(int64_t); +} + +void IngestReceipt::Serialize(uint8_t* out) const { + uint8_t* p = out; + std::memcpy(p, kMagic.data(), kMagic.size()); + p += kMagic.size(); + WriteString(&p, staging_schema); + WriteString(&p, staging_table); + WriteString(&p, escaped_columns); + WriteI64(&p, row_count); +} + +AdbcStatusCode IngestReceipt::Parse(const uint8_t* bytes, size_t len, IngestReceipt* out, + struct AdbcError* error) { + const uint8_t* p = bytes; + const uint8_t* end = bytes + len; + if (end - p < static_cast(kMagic.size())) { + InternalAdbcSetError(error, "[libpq] ingest receipt truncated"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (std::memcmp(p, kMagic.data(), kMagic.size()) != 0) { + InternalAdbcSetError(error, "[libpq] ingest receipt bad magic"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + p += kMagic.size(); + if (!ReadString(&p, end, &out->staging_schema) || + !ReadString(&p, end, &out->staging_table) || + !ReadString(&p, end, &out->escaped_columns) || + !ReadI64(&p, end, &out->row_count)) { + InternalAdbcSetError(error, "[libpq] ingest receipt truncated"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + return ADBC_STATUS_OK; +} + +namespace { + +enum class IngestMode { kCreate, kAppend, kReplace, kCreateAppend }; + +AdbcStatusCode ParseMode(const char* mode, IngestMode* out, struct AdbcError* error) { + if (mode == nullptr) { + InternalAdbcSetError(error, "[libpq] ingest mode is required"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (std::strcmp(mode, ADBC_INGEST_OPTION_MODE_CREATE) == 0) { + *out = IngestMode::kCreate; + } else if (std::strcmp(mode, ADBC_INGEST_OPTION_MODE_APPEND) == 0) { + *out = IngestMode::kAppend; + } else if (std::strcmp(mode, ADBC_INGEST_OPTION_MODE_REPLACE) == 0) { + *out = IngestMode::kReplace; + } else if (std::strcmp(mode, ADBC_INGEST_OPTION_MODE_CREATE_APPEND) == 0) { + *out = IngestMode::kCreateAppend; + } else { + InternalAdbcSetError(error, "[libpq] unknown ingest mode: %s", mode); + return ADBC_STATUS_INVALID_ARGUMENT; + } + return ADBC_STATUS_OK; +} + +std::string EscapeIdent(PGconn* conn, const std::string& s, struct AdbcError* error, + AdbcStatusCode* status) { + char* esc = PQescapeIdentifier(conn, s.data(), s.size()); + if (esc == nullptr) { + InternalAdbcSetError(error, "[libpq] failed to escape identifier %s: %s", s.c_str(), + PQerrorMessage(conn)); + *status = ADBC_STATUS_INTERNAL; + return {}; + } + std::string out = esc; + PQfreemem(esc); + return out; +} + +AdbcStatusCode ResolveCurrentSchema(PGconn* conn, std::string* out, + struct AdbcError* error) { + PqResultHelper r(conn, "SELECT CURRENT_SCHEMA()"); + Status st = r.Execute(); + if (!st.ok()) { + return st.ToAdbc(error); + } + auto it = r.begin(); + if (it == r.end()) { + InternalAdbcSetError(error, "[libpq] CURRENT_SCHEMA returned no rows"); + return ADBC_STATUS_INTERNAL; + } + *out = (*it)[0].data; + return ADBC_STATUS_OK; +} + +AdbcStatusCode ExecSimple(PGconn* conn, const std::string& sql, struct AdbcError* error) { + PGresult* result = PQexec(conn, sql.c_str()); + if (PQresultStatus(result) != PGRES_COMMAND_OK) { + AdbcStatusCode code = + SetError(error, result, "[libpq] %s\nQuery was: %s", PQerrorMessage(conn), + sql.c_str()); + PQclear(result); + return code; + } + PQclear(result); + return ADBC_STATUS_OK; +} + +// Build CREATE TABLE statement from an Arrow schema. `escaped_qualified_table` is +// the already-escaped, schema-qualified target (e.g. `"public"."t"`). +AdbcStatusCode BuildCreateTable(PGconn* conn, const PostgresTypeResolver& resolver, + const std::string& escaped_qualified_table, + const struct ArrowSchema& schema, std::string* sql_out, + struct AdbcError* error) { + std::string sql = "CREATE TABLE " + escaped_qualified_table + " ("; + for (int64_t i = 0; i < schema.n_children; i++) { + if (i > 0) sql += ", "; + AdbcStatusCode code = ADBC_STATUS_OK; + std::string col = EscapeIdent(conn, schema.children[i]->name, error, &code); + if (code != ADBC_STATUS_OK) return code; + sql += col; + sql += " "; + + PostgresType pg_type; + struct ArrowError na_error; + int rc = + PostgresType::FromSchema(resolver, schema.children[i], &pg_type, &na_error); + if (rc != NANOARROW_OK) { + InternalAdbcSetError(error, "[libpq] cannot map column %s: %s", + schema.children[i]->name, na_error.message); + return ADBC_STATUS_INTERNAL; + } + sql += pg_type.sql_type_name(); + } + sql += ")"; + *sql_out = std::move(sql); + return ADBC_STATUS_OK; +} + +} // namespace + +AdbcStatusCode PostgresConnection::BeginIngestPartitions( + const char* target_catalog, const char* target_db_schema, const char* target_table, + const char* mode, struct ArrowSchema* schema, struct AdbcIngestHandle* out_handle, + struct AdbcError* error) { + if (target_catalog != nullptr && *target_catalog != '\0') { + InternalAdbcSetError(error, + "[libpq] target_catalog is not supported for partitioned ingest"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (target_table == nullptr || *target_table == '\0') { + InternalAdbcSetError(error, "[libpq] target_table is required"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + + IngestMode parsed_mode; + AdbcStatusCode code = ParseMode(mode, &parsed_mode, error); + if (code != ADBC_STATUS_OK) return code; + + bool needs_create = parsed_mode != IngestMode::kAppend; + if (needs_create && (schema == nullptr || schema->release == nullptr)) { + InternalAdbcSetError( + error, "[libpq] schema is required for create/replace/create_append modes"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + + IngestHandle handle; + IngestHandle::GenerateId(&handle.ingest_id); + handle.table = target_table; + if (target_db_schema != nullptr && *target_db_schema != '\0') { + handle.db_schema = target_db_schema; + } else { + code = ResolveCurrentSchema(conn_, &handle.db_schema, error); + if (code != ADBC_STATUS_OK) return code; + } + + // Build the escaped, schema-qualified target name. + std::string escaped_schema = EscapeIdent(conn_, handle.db_schema, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string escaped_table = EscapeIdent(conn_, handle.table, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string qualified = escaped_schema + "." + escaped_table; + + if (parsed_mode == IngestMode::kReplace) { + code = ExecSimple(conn_, "DROP TABLE IF EXISTS " + qualified, error); + if (code != ADBC_STATUS_OK) return code; + } + + if (needs_create) { + std::string create_sql; + code = BuildCreateTable(conn_, *type_resolver_, qualified, *schema, &create_sql, + error); + if (code != ADBC_STATUS_OK) return code; + if (parsed_mode == IngestMode::kCreateAppend) { + // Replace "CREATE TABLE " with "CREATE TABLE IF NOT EXISTS " + create_sql.insert(std::strlen("CREATE TABLE "), "IF NOT EXISTS "); + } + code = ExecSimple(conn_, create_sql, error); + if (code != ADBC_STATUS_OK) return code; + } + + auto* buf = new std::vector(handle.SerializedSize()); + handle.Serialize(buf->data()); + out_handle->length = buf->size(); + out_handle->bytes = buf->data(); + out_handle->private_data = buf; + out_handle->release = [](struct AdbcIngestHandle* self) { + delete reinterpret_cast*>(self->private_data); + self->private_data = nullptr; + self->bytes = nullptr; + self->length = 0; + self->release = nullptr; + }; + return ADBC_STATUS_OK; +} + +AdbcStatusCode PostgresConnection::WriteIngestPartition( + const uint8_t* handle_bytes, size_t handle_len, struct ArrowArrayStream* data, + struct AdbcIngestReceipt* out_receipt, struct AdbcError* error) { + IngestHandle handle; + AdbcStatusCode code = IngestHandle::Parse(handle_bytes, handle_len, &handle, error); + if (code != ADBC_STATUS_OK) return code; + + if (data == nullptr || data->release == nullptr) { + InternalAdbcSetError(error, "[libpq] data stream is required"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + + // Generate a unique staging table name scoped to the handle. + std::array suffix_bytes; + { + std::random_device rd; + std::mt19937_64 gen(rd()); + uint64_t v = gen(); + std::memcpy(suffix_bytes.data(), &v, 8); + } + static const char kHex[] = "0123456789abcdef"; + std::string suffix(16, '0'); + for (size_t i = 0; i < 8; i++) { + suffix[2 * i] = kHex[suffix_bytes[i] >> 4]; + suffix[2 * i + 1] = kHex[suffix_bytes[i] & 0x0F]; + } + std::string staging_table = handle.StagingPrefix() + suffix; + + std::string escaped_schema = EscapeIdent(conn_, handle.db_schema, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string escaped_target = EscapeIdent(conn_, handle.table, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string escaped_staging = EscapeIdent(conn_, staging_table, error, &code); + if (code != ADBC_STATUS_OK) return code; + + std::string qualified_target = escaped_schema + "." + escaped_target; + std::string qualified_staging = escaped_schema + "." + escaped_staging; + + // Initialize the bind stream so we can pull a schema for the column list. + BindStream bind_stream; + bind_stream.SetBind(data); + + std::string escaped_columns; + Status begin_st = bind_stream.Begin([&]() -> Status { + AdbcStatusCode inner = ADBC_STATUS_OK; + for (int64_t i = 0; i < bind_stream.bind_schema->n_children; i++) { + if (i > 0) escaped_columns += ", "; + std::string col = EscapeIdent(conn_, bind_stream.bind_schema->children[i]->name, + error, &inner); + if (inner != ADBC_STATUS_OK) { + return Status::Internal("[libpq] failed to escape column name"); + } + escaped_columns += col; + } + return Status::Ok(); + }); + if (!begin_st.ok()) return begin_st.ToAdbc(error); + + // CREATE UNLOGGED TABLE staging (LIKE target). Constraints/defaults are + // intentionally not copied — staging holds raw rows; the commit INSERT applies + // target's defaults via the explicit column list. + code = ExecSimple( + conn_, "CREATE UNLOGGED TABLE " + qualified_staging + " (LIKE " + qualified_target + + ")", + error); + if (code != ADBC_STATUS_OK) return code; + + // Issue COPY ... FROM STDIN, then stream data via ExecuteCopy. + std::string copy_sql = "COPY " + qualified_staging + " (" + escaped_columns + + ") FROM STDIN WITH (FORMAT binary)"; + PGresult* result = PQexec(conn_, copy_sql.c_str()); + if (PQresultStatus(result) != PGRES_COPY_IN) { + AdbcStatusCode err = SetError(error, result, "[libpq] COPY failed: %s\nQuery: %s", + PQerrorMessage(conn_), copy_sql.c_str()); + PQclear(result); + ExecSimple(conn_, "DROP TABLE IF EXISTS " + qualified_staging, nullptr); + return err; + } + PQclear(result); + + int64_t rows_written = 0; + Status copy_st = bind_stream.ExecuteCopy(conn_, *type_resolver_, &rows_written); + if (!copy_st.ok()) { + ExecSimple(conn_, "DROP TABLE IF EXISTS " + qualified_staging, nullptr); + return copy_st.ToAdbc(error); + } + + IngestReceipt receipt; + receipt.staging_schema = handle.db_schema; + receipt.staging_table = staging_table; + receipt.escaped_columns = escaped_columns; + receipt.row_count = rows_written; + + // Write does irrecoverable work; unlike Begin we do not support two-phase + // sizing. Caller must provide a reasonable buffer (a few KB is plenty). + auto* buf = new std::vector(receipt.SerializedSize()); + receipt.Serialize(buf->data()); + out_receipt->length = buf->size(); + out_receipt->bytes = buf->data(); + out_receipt->private_data = buf; + out_receipt->release = [](struct AdbcIngestReceipt* self) { + delete reinterpret_cast*>(self->private_data); + self->private_data = nullptr; + self->bytes = nullptr; + self->length = 0; + self->release = nullptr; + }; + return ADBC_STATUS_OK; +} + +AdbcStatusCode PostgresConnection::CommitIngestPartitions( + const uint8_t* handle_bytes, size_t handle_len, size_t num_receipts, + const uint8_t** receipts, const size_t* receipt_lens, int64_t* rows_affected, + struct AdbcError* error) { + IngestHandle handle; + AdbcStatusCode code = IngestHandle::Parse(handle_bytes, handle_len, &handle, error); + if (code != ADBC_STATUS_OK) return code; + + std::vector parsed(num_receipts); + for (size_t i = 0; i < num_receipts; i++) { + code = IngestReceipt::Parse(receipts[i], receipt_lens[i], &parsed[i], error); + if (code != ADBC_STATUS_OK) return code; + } + + std::string escaped_target_schema = EscapeIdent(conn_, handle.db_schema, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string escaped_target_table = EscapeIdent(conn_, handle.table, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string qualified_target = escaped_target_schema + "." + escaped_target_table; + + code = ExecSimple(conn_, "BEGIN", error); + if (code != ADBC_STATUS_OK) return code; + + int64_t total_rows = 0; + for (const auto& r : parsed) { + std::string esc_sch = EscapeIdent(conn_, r.staging_schema, error, &code); + if (code != ADBC_STATUS_OK) { + ExecSimple(conn_, "ROLLBACK", nullptr); + return code; + } + std::string esc_tbl = EscapeIdent(conn_, r.staging_table, error, &code); + if (code != ADBC_STATUS_OK) { + ExecSimple(conn_, "ROLLBACK", nullptr); + return code; + } + std::string qualified_staging = esc_sch + "." + esc_tbl; + + std::string insert = "INSERT INTO " + qualified_target + " (" + r.escaped_columns + + ") SELECT " + r.escaped_columns + " FROM " + qualified_staging; + code = ExecSimple(conn_, insert, error); + if (code != ADBC_STATUS_OK) { + ExecSimple(conn_, "ROLLBACK", nullptr); + return code; + } + code = ExecSimple(conn_, "DROP TABLE " + qualified_staging, error); + if (code != ADBC_STATUS_OK) { + ExecSimple(conn_, "ROLLBACK", nullptr); + return code; + } + total_rows += r.row_count; + } + + code = ExecSimple(conn_, "COMMIT", error); + if (code != ADBC_STATUS_OK) { + ExecSimple(conn_, "ROLLBACK", nullptr); + return code; + } + + if (rows_affected != nullptr) *rows_affected = total_rows; + return ADBC_STATUS_OK; +} + +AdbcStatusCode PostgresConnection::AbortIngestPartitions( + const uint8_t* handle_bytes, size_t handle_len, size_t num_receipts, + const uint8_t** receipts, const size_t* receipt_lens, struct AdbcError* error) { + (void)num_receipts; + (void)receipts; + (void)receipt_lens; + // Receipts are a hint. The handle is the authority for cleanup scope: + // enumerate every staging table matching the handle's prefix and drop it. + IngestHandle handle; + AdbcStatusCode code = IngestHandle::Parse(handle_bytes, handle_len, &handle, error); + if (code != ADBC_STATUS_OK) return code; + + std::string prefix = handle.StagingPrefix(); + // pg LIKE pattern: escape '%' and '_' in the prefix (not present in our prefix + // by construction, but defensive). + std::string like_pattern; + for (char c : prefix) { + if (c == '%' || c == '_' || c == '\\') like_pattern += '\\'; + like_pattern += c; + } + like_pattern += '%'; + + PqResultHelper q( + conn_, + "SELECT table_schema, table_name FROM information_schema.tables " + "WHERE table_schema = $1 AND table_name LIKE $2"); + Status st = q.Execute({handle.db_schema, like_pattern}); + if (!st.ok()) return st.ToAdbc(error); + + for (auto row : q) { + AdbcStatusCode inner = ADBC_STATUS_OK; + std::string esc_sch = EscapeIdent(conn_, row[0].data, error, &inner); + if (inner != ADBC_STATUS_OK) return inner; + std::string esc_tbl = EscapeIdent(conn_, row[1].data, error, &inner); + if (inner != ADBC_STATUS_OK) return inner; + // Best-effort: ignore individual drop failures so one orphan doesn't block others. + ExecSimple(conn_, "DROP TABLE IF EXISTS " + esc_sch + "." + esc_tbl, nullptr); + } + + return ADBC_STATUS_OK; +} + +} // namespace adbcpq diff --git a/c/driver/postgresql/ingest_partition.h b/c/driver/postgresql/ingest_partition.h new file mode 100644 index 0000000000..707fe69616 --- /dev/null +++ b/c/driver/postgresql/ingest_partition.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +#pragma once + +#include +#include +#include +#include + +#include + +namespace adbcpq { + +// Wire format for the partitioned-ingest handle. Opaque to callers; symmetric +// across coordinator and workers. +struct IngestHandle { + static constexpr std::array kMagic = {'P', 'I', 'H', '1'}; + + std::array ingest_id; + std::string catalog; + std::string db_schema; + std::string table; + + // staging table prefix shared by all writes scoped to this handle. + // Driver-internal — used by Abort to enumerate orphans. + std::string StagingPrefix() const; + + size_t SerializedSize() const; + void Serialize(uint8_t* out) const; + static AdbcStatusCode Parse(const uint8_t* bytes, size_t len, IngestHandle* out, + struct AdbcError* error); + + static void GenerateId(std::array* out); +}; + +// Wire format for a per-partition receipt. +struct IngestReceipt { + static constexpr std::array kMagic = {'P', 'I', 'R', '1'}; + + std::string staging_schema; // empty -> default schema + std::string staging_table; + // Already-escaped, comma-separated column list (e.g. `"a", "b"`). Used by + // Commit to construct INSERT INTO target (cols) SELECT cols FROM staging. + std::string escaped_columns; + int64_t row_count = 0; + + size_t SerializedSize() const; + void Serialize(uint8_t* out) const; + static AdbcStatusCode Parse(const uint8_t* bytes, size_t len, IngestReceipt* out, + struct AdbcError* error); +}; + +} // namespace adbcpq diff --git a/c/driver/postgresql/meson.build b/c/driver/postgresql/meson.build index de9a119afa..aa4a7c8425 100644 --- a/c/driver/postgresql/meson.build +++ b/c/driver/postgresql/meson.build @@ -23,6 +23,7 @@ adbc_postgres_driver_lib = library( 'connection.cc', 'error.cc', 'database.cc', + 'ingest_partition.cc', 'postgresql.cc', 'result_helper.cc', 'result_reader.cc', diff --git a/c/driver/postgresql/partitioned_ingest_test.cc b/c/driver/postgresql/partitioned_ingest_test.cc new file mode 100644 index 0000000000..058d8e9a15 --- /dev/null +++ b/c/driver/postgresql/partitioned_ingest_test.cc @@ -0,0 +1,294 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "validation/adbc_validation_util.h" + +namespace { + +const char* RequireUri() { + const char* uri = std::getenv("ADBC_POSTGRESQL_TEST_URI"); + if (!uri) { + ADD_FAILURE() << "ADBC_POSTGRESQL_TEST_URI must be set"; + } + return uri; +} + +struct ConnPair { + AdbcDatabase db{}; + AdbcConnection conn{}; +}; + +void OpenConn(ConnPair* p, AdbcError* error, const char* uri) { + ASSERT_EQ(AdbcDatabaseNew(&p->db, error), ADBC_STATUS_OK) << error->message; + ASSERT_EQ(AdbcDatabaseSetOption(&p->db, "uri", uri, error), ADBC_STATUS_OK) + << error->message; + ASSERT_EQ(AdbcDatabaseInit(&p->db, error), ADBC_STATUS_OK) << error->message; + ASSERT_EQ(AdbcConnectionNew(&p->conn, error), ADBC_STATUS_OK) << error->message; + ASSERT_EQ(AdbcConnectionInit(&p->conn, &p->db, error), ADBC_STATUS_OK) + << error->message; +} + +void CloseConn(ConnPair* p, AdbcError* error) { + AdbcConnectionRelease(&p->conn, error); + AdbcDatabaseRelease(&p->db, error); +} + +// Convenience: drop the test target table and any leftover staging tables. +void Cleanup(ConnPair* c, const std::string& table, AdbcError* error) { + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&c->conn, &stmt, error), ADBC_STATUS_OK); + std::string sql = "DROP TABLE IF EXISTS " + table; + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, sql.c_str(), error), ADBC_STATUS_OK); + AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, error); + AdbcStatementRelease(&stmt, error); +} + +// Read a count from the target table. +int64_t SelectCount(ConnPair* c, const std::string& table, AdbcError* error) { + AdbcStatement stmt{}; + EXPECT_EQ(AdbcStatementNew(&c->conn, &stmt, error), ADBC_STATUS_OK); + std::string sql = "SELECT COUNT(*) FROM " + table; + EXPECT_EQ(AdbcStatementSetSqlQuery(&stmt, sql.c_str(), error), ADBC_STATUS_OK); + + ArrowArrayStream stream{}; + int64_t rows_affected = 0; + EXPECT_EQ(AdbcStatementExecuteQuery(&stmt, &stream, &rows_affected, error), + ADBC_STATUS_OK) + << error->message; + + ArrowSchema schema{}; + ArrowArray batch{}; + stream.get_schema(&stream, &schema); + stream.get_next(&stream, &batch); + + int64_t count = 0; + if (batch.length > 0) { + nanoarrow::UniqueArrayView view; + ArrowArrayViewInitFromSchema(view.get(), &schema, nullptr); + ArrowArrayViewSetArray(view.get(), &batch, nullptr); + count = ArrowArrayViewGetIntUnsafe(view->children[0], 0); + } + + if (batch.release) batch.release(&batch); + if (schema.release) schema.release(&schema); + stream.release(&stream); + AdbcStatementRelease(&stmt, error); + return count; +} + +// Helper: build a struct schema { id int32, label utf8 }. +void MakeIngestSchema(ArrowSchema* out) { + ASSERT_EQ(adbc_validation::MakeSchema( + out, {{"id", NANOARROW_TYPE_INT32}, {"label", NANOARROW_TYPE_STRING}}), + 0); +} + +// Helper: build a one-batch stream of N rows starting at `start_id`. +void MakeBatchStream(ArrowArrayStream* stream, int32_t start_id, int32_t n) { + ArrowSchema schema{}; + MakeIngestSchema(&schema); + + ArrowArray batch{}; + std::vector> ids; + std::vector> labels; + ids.reserve(n); + labels.reserve(n); + for (int32_t i = 0; i < n; i++) { + ids.push_back(start_id + i); + labels.push_back("row-" + std::to_string(start_id + i)); + } + ArrowError na_error{}; + ASSERT_EQ(adbc_validation::MakeBatch(&schema, &batch, &na_error, ids, labels), 0); + + std::vector batches; + batches.push_back(batch); + adbc_validation::MakeStream(stream, &schema, std::move(batches)); +} + +} // namespace + +class PostgresPartitionedIngestTest : public ::testing::Test {}; + +TEST_F(PostgresPartitionedIngestTest, CreateThenWriteThenCommit) { + const char* uri = RequireUri(); + if (!uri) return; + const std::string table = "adbc_partitioned_ingest_test"; + + AdbcError error = ADBC_ERROR_INIT; + ConnPair coordinator; + OpenConn(&coordinator, &error, uri); + Cleanup(&coordinator, table, &error); + + ArrowSchema ingest_schema{}; + MakeIngestSchema(&ingest_schema); + + AdbcIngestHandle handle{}; + ASSERT_EQ(AdbcConnectionBeginIngestPartitions( + &coordinator.conn, /*catalog=*/nullptr, /*db_schema=*/nullptr, + table.c_str(), ADBC_INGEST_OPTION_MODE_CREATE, &ingest_schema, + &handle, &error), + ADBC_STATUS_OK) + << error.message; + ingest_schema.release(&ingest_schema); + + // Spawn N workers, each on its own connection, each writing a partition. + constexpr int kNumWorkers = 4; + constexpr int kRowsPerWorker = 1000; + std::vector> receipts(kNumWorkers); + std::vector workers; + std::mutex err_mu; + std::vector worker_errors; + + for (int w = 0; w < kNumWorkers; w++) { + workers.emplace_back([&, w]() { + AdbcError werr = ADBC_ERROR_INIT; + ConnPair wp; + OpenConn(&wp, &werr, uri); + + ArrowArrayStream stream{}; + MakeBatchStream(&stream, w * kRowsPerWorker, kRowsPerWorker); + + AdbcIngestReceipt rec{}; + AdbcStatusCode rc = AdbcConnectionWriteIngestPartition( + &wp.conn, handle.bytes, handle.length, &stream, &rec, &werr); + if (rc != ADBC_STATUS_OK) { + std::lock_guard g(err_mu); + worker_errors.push_back(std::string("write: ") + + (werr.message ? werr.message : "")); + } else { + // Copy receipt bytes out so we can release the driver-owned struct + // immediately. Mirrors the cross-process flow: caller serializes the + // bytes and ships them to the coordinator. + receipts[w].assign(rec.bytes, rec.bytes + rec.length); + rec.release(&rec); + } + CloseConn(&wp, &werr); + }); + } + for (auto& t : workers) t.join(); + ASSERT_TRUE(worker_errors.empty()) << worker_errors[0]; + + // Commit. + std::vector rec_ptrs(kNumWorkers); + std::vector rec_lens(kNumWorkers); + for (int i = 0; i < kNumWorkers; i++) { + rec_ptrs[i] = receipts[i].data(); + rec_lens[i] = receipts[i].size(); + } + int64_t rows_committed = 0; + ASSERT_EQ(AdbcConnectionCommitIngestPartitions(&coordinator.conn, handle.bytes, + handle.length, kNumWorkers, + rec_ptrs.data(), rec_lens.data(), + &rows_committed, &error), + ADBC_STATUS_OK) + << error.message; + EXPECT_EQ(rows_committed, kNumWorkers * kRowsPerWorker); + + // Verify target row count. + EXPECT_EQ(SelectCount(&coordinator, table, &error), kNumWorkers * kRowsPerWorker); + + handle.release(&handle); + Cleanup(&coordinator, table, &error); + CloseConn(&coordinator, &error); +} + +TEST_F(PostgresPartitionedIngestTest, AbortDropsAllStagingIncludingOrphans) { + const char* uri = RequireUri(); + if (!uri) return; + const std::string table = "adbc_partitioned_ingest_abort_test"; + + AdbcError error = ADBC_ERROR_INIT; + ConnPair c; + OpenConn(&c, &error, uri); + Cleanup(&c, table, &error); + + ArrowSchema ingest_schema{}; + MakeIngestSchema(&ingest_schema); + + AdbcIngestHandle handle{}; + ASSERT_EQ(AdbcConnectionBeginIngestPartitions( + &c.conn, nullptr, nullptr, table.c_str(), + ADBC_INGEST_OPTION_MODE_CREATE, &ingest_schema, &handle, &error), + ADBC_STATUS_OK) + << error.message; + ingest_schema.release(&ingest_schema); + + // Write three partitions but only collect two receipts (simulate one lost). + std::vector> receipts; + for (int w = 0; w < 3; w++) { + ArrowArrayStream stream{}; + MakeBatchStream(&stream, w * 10, 10); + AdbcIngestReceipt rec{}; + ASSERT_EQ(AdbcConnectionWriteIngestPartition(&c.conn, handle.bytes, handle.length, + &stream, &rec, &error), + ADBC_STATUS_OK) + << error.message; + if (w < 2) receipts.emplace_back(rec.bytes, rec.bytes + rec.length); + rec.release(&rec); + } + + // Abort with only the two known receipts. Driver must clean up the orphan too. + std::vector rec_ptrs; + std::vector rec_lens; + for (auto& r : receipts) { + rec_ptrs.push_back(r.data()); + rec_lens.push_back(r.size()); + } + ASSERT_EQ(AdbcConnectionAbortIngestPartitions(&c.conn, handle.bytes, handle.length, + rec_ptrs.size(), rec_ptrs.data(), + rec_lens.data(), &error), + ADBC_STATUS_OK) + << error.message; + + // Verify no staging tables left under the handle's prefix. + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&c.conn, &stmt, &error), ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementSetSqlQuery( + &stmt, + "SELECT COUNT(*) FROM information_schema.tables " + "WHERE table_name LIKE 'adbc_stg_%'", + &error), + ADBC_STATUS_OK); + ArrowArrayStream stream{}; + ASSERT_EQ(AdbcStatementExecuteQuery(&stmt, &stream, nullptr, &error), ADBC_STATUS_OK); + ArrowSchema schema{}; + ArrowArray batch{}; + stream.get_schema(&stream, &schema); + stream.get_next(&stream, &batch); + nanoarrow::UniqueArrayView view; + ArrowArrayViewInitFromSchema(view.get(), &schema, nullptr); + ArrowArrayViewSetArray(view.get(), &batch, nullptr); + int64_t leftover = ArrowArrayViewGetIntUnsafe(view->children[0], 0); + EXPECT_EQ(leftover, 0); + + if (batch.release) batch.release(&batch); + if (schema.release) schema.release(&schema); + stream.release(&stream); + AdbcStatementRelease(&stmt, &error); + + handle.release(&handle); + Cleanup(&c, table, &error); + CloseConn(&c, &error); +} diff --git a/c/driver/postgresql/postgresql.cc b/c/driver/postgresql/postgresql.cc index 16645d0678..a88f47cc82 100644 --- a/c/driver/postgresql/postgresql.cc +++ b/c/driver/postgresql/postgresql.cc @@ -421,6 +421,50 @@ AdbcStatusCode PostgresConnectionRollback(struct AdbcConnection* connection, return (*ptr)->Rollback(error); } +AdbcStatusCode PostgresConnectionBeginIngestPartitions( + struct AdbcConnection* connection, const char* target_catalog, + const char* target_db_schema, const char* target_table, const char* mode, + struct ArrowSchema* schema, struct AdbcIngestHandle* out_handle, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->BeginIngestPartitions(target_catalog, target_db_schema, target_table, + mode, schema, out_handle, error); +} + +AdbcStatusCode PostgresConnectionWriteIngestPartition( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + struct ArrowArrayStream* data, struct AdbcIngestReceipt* out_receipt, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->WriteIngestPartition(handle, handle_len, data, out_receipt, error); +} + +AdbcStatusCode PostgresConnectionCommitIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + int64_t* rows_affected, struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->CommitIngestPartitions(handle, handle_len, num_receipts, receipts, + receipt_lens, rows_affected, error); +} + +AdbcStatusCode PostgresConnectionAbortIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->AbortIngestPartitions(handle, handle_len, num_receipts, receipts, + receipt_lens, error); +} + AdbcStatusCode PostgresConnectionSetOption(struct AdbcConnection* connection, const char* key, const char* value, struct AdbcError* error) { @@ -562,6 +606,42 @@ AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection, serialized_length, out, error); } +AdbcStatusCode AdbcConnectionBeginIngestPartitions( + struct AdbcConnection* connection, const char* target_catalog, + const char* target_db_schema, const char* target_table, const char* mode, + struct ArrowSchema* schema, struct AdbcIngestHandle* out_handle, + struct AdbcError* error) { + return PostgresConnectionBeginIngestPartitions(connection, target_catalog, + target_db_schema, target_table, mode, + schema, out_handle, error); +} + +AdbcStatusCode AdbcConnectionWriteIngestPartition( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + struct ArrowArrayStream* data, struct AdbcIngestReceipt* out_receipt, + struct AdbcError* error) { + return PostgresConnectionWriteIngestPartition(connection, handle, handle_len, data, + out_receipt, error); +} + +AdbcStatusCode AdbcConnectionCommitIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + int64_t* rows_affected, struct AdbcError* error) { + return PostgresConnectionCommitIngestPartitions(connection, handle, handle_len, + num_receipts, receipts, receipt_lens, + rows_affected, error); +} + +AdbcStatusCode AdbcConnectionAbortIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + struct AdbcError* error) { + return PostgresConnectionAbortIngestPartitions(connection, handle, handle_len, + num_receipts, receipts, receipt_lens, + error); +} + AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection, struct AdbcError* error) { return PostgresConnectionRelease(connection, error); @@ -888,15 +968,23 @@ extern "C" { ADBC_EXPORT AdbcStatusCode AdbcDriverPostgresqlInit(int version, void* raw_driver, struct AdbcError* error) { - if (version != ADBC_VERSION_1_0_0 && version != ADBC_VERSION_1_1_0) { + if (version != ADBC_VERSION_1_0_0 && version != ADBC_VERSION_1_1_0 && + version != ADBC_VERSION_1_2_0) { return ADBC_STATUS_NOT_IMPLEMENTED; } if (!raw_driver) return ADBC_STATUS_INVALID_ARGUMENT; auto* driver = reinterpret_cast(raw_driver); - if (version >= ADBC_VERSION_1_1_0) { + if (version >= ADBC_VERSION_1_2_0) { + std::memset(driver, 0, ADBC_DRIVER_1_2_0_SIZE); + driver->ConnectionBeginIngestPartitions = PostgresConnectionBeginIngestPartitions; + driver->ConnectionWriteIngestPartition = PostgresConnectionWriteIngestPartition; + driver->ConnectionCommitIngestPartitions = PostgresConnectionCommitIngestPartitions; + driver->ConnectionAbortIngestPartitions = PostgresConnectionAbortIngestPartitions; + } else if (version >= ADBC_VERSION_1_1_0) { std::memset(driver, 0, ADBC_DRIVER_1_1_0_SIZE); - + } + if (version >= ADBC_VERSION_1_1_0) { driver->ErrorGetDetailCount = PostgresErrorGetDetailCount; driver->ErrorGetDetail = PostgresErrorGetDetail; driver->ErrorFromArrayStream = PostgresErrorFromArrayStream; diff --git a/c/include/arrow-adbc/adbc.h b/c/include/arrow-adbc/adbc.h index 57e665f84a..6363ba84ce 100644 --- a/c/include/arrow-adbc/adbc.h +++ b/c/include/arrow-adbc/adbc.h @@ -423,6 +423,14 @@ const struct AdbcError* AdbcErrorFromArrayStream(struct ArrowArrayStream* stream /// \since ADBC API revision 1.1.0 #define ADBC_VERSION_1_1_0 1001000 +/// \brief ADBC revision 1.2.0. +/// +/// When passed to an AdbcDriverInitFunc(), the driver parameter must +/// point to an AdbcDriver. +/// +/// \since ADBC API revision 1.2.0 +#define ADBC_VERSION_1_2_0 1002000 + /// \brief Canonical option value for enabling an option. /// /// For use as the value in SetOption calls. @@ -979,6 +987,9 @@ struct AdbcPartitions { /// driver and the driver manager. /// @{ +struct AdbcIngestHandle; +struct AdbcIngestReceipt; + /// \brief An instance of an initialized database driver. /// /// This provides a common interface for vendor-specific driver @@ -1135,6 +1146,30 @@ struct ADBC_EXPORT AdbcDriver { struct AdbcError*); /// @} + + /// \defgroup adbc-1.2.0 ADBC API Revision 1.2.0 + /// + /// Functions added in ADBC 1.2.0. For backwards compatibility, + /// these members must not be accessed unless the version passed to + /// the AdbcDriverInitFunc is greater than or equal to + /// ADBC_VERSION_1_2_0. + /// + /// @{ + + AdbcStatusCode (*ConnectionBeginIngestPartitions)( + struct AdbcConnection*, const char*, const char*, const char*, const char*, + struct ArrowSchema*, struct AdbcIngestHandle*, struct AdbcError*); + AdbcStatusCode (*ConnectionWriteIngestPartition)( + struct AdbcConnection*, const uint8_t*, size_t, struct ArrowArrayStream*, + struct AdbcIngestReceipt*, struct AdbcError*); + AdbcStatusCode (*ConnectionCommitIngestPartitions)( + struct AdbcConnection*, const uint8_t*, size_t, size_t, const uint8_t**, + const size_t*, int64_t*, struct AdbcError*); + AdbcStatusCode (*ConnectionAbortIngestPartitions)( + struct AdbcConnection*, const uint8_t*, size_t, size_t, const uint8_t**, + const size_t*, struct AdbcError*); + + /// @} }; /// \brief The size of the AdbcDriver structure in ADBC 1.0.0. @@ -1151,7 +1186,15 @@ struct ADBC_EXPORT AdbcDriver { /// ADBC_VERSION_1_1_0. /// /// \since ADBC API revision 1.1.0 -#define ADBC_DRIVER_1_1_0_SIZE (sizeof(struct AdbcDriver)) +#define ADBC_DRIVER_1_1_0_SIZE (offsetof(struct AdbcDriver, ConnectionBeginIngestPartitions)) + +/// \brief The size of the AdbcDriver structure in ADBC 1.2.0. +/// Drivers written for ADBC 1.2.0 and later should never touch more +/// than this portion of an AdbcDriver struct when given +/// ADBC_VERSION_1_2_0. +/// +/// \since ADBC API revision 1.2.0 +#define ADBC_DRIVER_1_2_0_SIZE (sizeof(struct AdbcDriver)) /// @} @@ -1946,6 +1989,231 @@ AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection, /// @} +/// \defgroup adbc-connection-ingest-partition Partitioned Bulk Ingest +/// @{ + +/// \brief Driver-owned bytes returned by +/// AdbcConnectionBeginIngestPartitions. +/// +/// The bytes are opaque and serializable: the caller may copy +/// `bytes[0..length)` and ship that copy to workers (other processes +/// or hosts) which can pass it to AdbcConnectionWriteIngestPartition +/// directly. +/// +/// The struct itself is owned by the driver. Call `release` exactly +/// once to free it. Releasing the handle does NOT roll back the +/// ingest — call AdbcConnectionAbortIngestPartitions for that. +/// +/// \since ADBC API revision 1.2.0 +struct AdbcIngestHandle { + /// \brief The length of `bytes`. + size_t length; + + /// \brief The serialized handle bytes (driver-owned). + const uint8_t* bytes; + + /// \brief Private driver state. + void* private_data; + + /// \brief Release the handle's memory. Sets `release` to NULL. + void (*release)(struct AdbcIngestHandle* self); +}; + +/// \brief Driver-owned bytes returned by +/// AdbcConnectionWriteIngestPartition. +/// +/// Mirror of AdbcIngestHandle: opaque, serializable, single-use +/// `release`. Releasing a receipt does NOT discard the underlying +/// write; that happens at Commit (commit it) or Abort (drop it). +/// +/// \since ADBC API revision 1.2.0 +struct AdbcIngestReceipt { + /// \brief The length of `bytes`. + size_t length; + + /// \brief The serialized receipt bytes (driver-owned). + const uint8_t* bytes; + + /// \brief Private driver state. + void* private_data; + + /// \brief Release the receipt's memory. Sets `release` to NULL. + void (*release)(struct AdbcIngestReceipt* self); +}; +/// @} + +/// \addtogroup adbc-connection-ingest-partition +/// Some drivers can accept bulk writes from a distributed writer: a +/// coordinator configures an ingest, many workers write partitions in +/// parallel (possibly from different processes or hosts), and the +/// coordinator commits or aborts atomically. +/// +/// This mirrors the read-side partitioned execution model. The +/// coordinator calls AdbcConnectionBeginIngestPartitions to obtain an +/// opaque, serializable handle. The handle is shipped to workers by +/// the caller (e.g. a Spark driver sending it to executors). Workers +/// call AdbcConnectionWriteIngestPartition on their own connections — +/// the connection does not have to be the same one that created the +/// handle. Each write returns an opaque receipt. The coordinator +/// collects receipts and calls AdbcConnectionCommitIngestPartitions +/// (or AdbcConnectionAbortIngestPartitions on failure). +/// +/// Handles and receipts are driver-defined opaque byte strings. They +/// are safe to transmit between processes and to use concurrently +/// from multiple connections. +/// +/// Drivers are not required to support partitioned ingest. +/// +/// \since ADBC API revision 1.2.0 +/// +/// @{ + +/// \brief Begin a partitioned bulk ingest. +/// +/// Uses the same semantics as the ADBC_INGEST_OPTION_* options on +/// AdbcStatement. For ADBC_INGEST_OPTION_MODE_CREATE, +/// ADBC_INGEST_OPTION_MODE_CREATE_APPEND, and +/// ADBC_INGEST_OPTION_MODE_REPLACE, `schema` is required and the +/// driver creates (or recreates) the target table at this call. For +/// ADBC_INGEST_OPTION_MODE_APPEND, `schema` is optional; if provided, +/// the driver validates it against the target and returns +/// ADBC_STATUS_ALREADY_EXISTS on mismatch. +/// +/// The returned handle is opaque, serializable, and usable from any +/// connection that can open the same database. The caller releases +/// it via `out_handle->release`; the bytes can be copied and shipped +/// to workers before release. +/// +/// \since ADBC API revision 1.2.0 +/// \param[in] connection The coordinator's connection. +/// \param[in] target_catalog Catalog of the target table, or NULL. +/// \param[in] target_db_schema Schema of the target table, or NULL. +/// \param[in] target_table Name of the target table. Required. +/// \param[in] mode One of ADBC_INGEST_OPTION_MODE_*. Required. +/// \param[in] schema Arrow schema of the data to be written. +/// Required for create/replace/create_append modes; optional for +/// append. +/// \param[out] out_handle Driver-owned handle. Must be released by +/// the caller via `out_handle->release`. +/// \param[out] error Error details, if any. +/// \return ADBC_STATUS_INVALID_ARGUMENT if mode requires a schema +/// but none was provided. +/// \return ADBC_STATUS_ALREADY_EXISTS if append mode is requested +/// and the target schema disagrees with the provided schema. +/// \return ADBC_STATUS_NOT_IMPLEMENTED if the driver does not +/// support partitioned ingest. +ADBC_EXPORT +AdbcStatusCode AdbcConnectionBeginIngestPartitions( + struct AdbcConnection* connection, const char* target_catalog, + const char* target_db_schema, const char* target_table, const char* mode, + struct ArrowSchema* schema, struct AdbcIngestHandle* out_handle, + struct AdbcError* error); + +/// \brief Write one partition of a partitioned bulk ingest. +/// +/// Called by a worker, typically on a different connection than the +/// one that created the handle. The driver reads the bound stream +/// to completion, writes its contents to driver-specific staging +/// (per-call: a unique staging table, unique object-store path, etc. +/// — never shared across concurrent writes), and returns an opaque +/// receipt. +/// +/// The stream's schema is validated against the schema recorded in +/// the handle. On mismatch, returns ADBC_STATUS_INVALID_ARGUMENT +/// and produces no receipt. +/// +/// On error of any kind, `out_receipt` is left with `release == +/// NULL` and the caller should retry the whole partition. Partial +/// receipts are never produced. Staging data the driver wrote +/// before the failure becomes orphaned and will be cleaned up by +/// Abort or by driver housekeeping. +/// +/// This call is safe to invoke concurrently from many connections +/// using the same handle. +/// +/// \since ADBC API revision 1.2.0 +/// \param[in] connection The worker's connection. +/// \param[in] handle The handle bytes from Begin. +/// \param[in] handle_len Length of handle. +/// \param[in] data Arrow stream of partition data. The driver +/// consumes the stream and releases it. +/// \param[out] out_receipt Driver-owned receipt. Must be released +/// by the caller via `out_receipt->release`. +/// \param[out] error Error details, if any. +ADBC_EXPORT +AdbcStatusCode AdbcConnectionWriteIngestPartition( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + struct ArrowArrayStream* data, struct AdbcIngestReceipt* out_receipt, + struct AdbcError* error); + +/// \brief Commit a partitioned bulk ingest. +/// +/// Atomically promotes all writes named by `receipts` into the +/// target table. Semantics of "atomic" are driver-specific: RDBMS +/// drivers typically swap staging into the target in a transaction; +/// table-format drivers (Iceberg, Delta) write a catalog or +/// transaction-log entry referencing the data files in the +/// receipts. +/// +/// After Commit returns successfully, the handle is consumed and +/// must not be used again. +/// +/// Receipts from failed writes, or writes whose receipts were never +/// observed by the coordinator, are not included in the commit. +/// Their staging data is orphaned and is cleaned up as described in +/// AdbcConnectionAbortIngestPartitions. +/// +/// \since ADBC API revision 1.2.0 +/// \param[in] connection A connection — typically the coordinator's, +/// but any connection that can open the same database works. +/// \param[in] handle The handle from Begin. +/// \param[in] handle_len Length of handle. +/// \param[in] num_receipts Number of receipts in the batch. +/// \param[in] receipts Array of receipt byte-pointers. +/// \param[in] receipt_lens Array of receipt lengths. +/// \param[out] rows_affected Number of rows committed, or -1 if +/// unknown. Pass NULL if not wanted. +/// \param[out] error Error details, if any. +ADBC_EXPORT +AdbcStatusCode AdbcConnectionCommitIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + int64_t* rows_affected, struct AdbcError* error); + +/// \brief Abort a partitioned bulk ingest. +/// +/// Discards all writes scoped to the handle and releases any +/// driver-side resources. The handle is consumed. +/// +/// Drivers must clean up every write scoped to the handle, including +/// writes whose receipts were lost or never observed — not only +/// those named in `receipts`. The handle is the authority for +/// cleanup scope; `receipts`, when provided, are a hint that allows +/// the driver to fast-path deletion of known writes. +/// +/// Abort is best-effort. If cleanup is incomplete, the driver +/// returns a warning status and orphaned storage may remain; it is +/// the driver's responsibility to provide housekeeping (e.g. TTL, +/// background GC, or documented manual cleanup). Callers may also +/// call Abort if the coordinator crashed and was restarted without +/// the original receipts. +/// +/// \since ADBC API revision 1.2.0 +/// \param[in] connection A connection. +/// \param[in] handle The handle from Begin. +/// \param[in] handle_len Length of handle. +/// \param[in] num_receipts Number of receipts, or 0. +/// \param[in] receipts Array of receipt byte-pointers, or NULL. +/// \param[in] receipt_lens Array of receipt lengths, or NULL. +/// \param[out] error Error details, if any. +ADBC_EXPORT +AdbcStatusCode AdbcConnectionAbortIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + struct AdbcError* error); + +/// @} + /// \defgroup adbc-connection-transaction Transaction Semantics /// /// Connections start out in auto-commit mode by default (if diff --git a/docs/source/format/partitioned_bulk_ingest.rst b/docs/source/format/partitioned_bulk_ingest.rst new file mode 100644 index 0000000000..99fa468a7e --- /dev/null +++ b/docs/source/format/partitioned_bulk_ingest.rst @@ -0,0 +1,326 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at +.. +.. http://www.apache.org/licenses/LICENSE-2.0 + +================================ +Proposal: Partitioned Bulk Ingest +================================ + +.. note:: + + Status: draft. Targets ADBC API revision 1.2.0. + +Motivation +========== + +Today ADBC supports two ingest shapes: + +- **Single-writer bulk ingest** — one connection, one statement, one + ``ArrowArrayStream``, one transaction. Good for loading from a single + process; useless for distributed writers. +- **Per-row binding** — slower, also single-connection. + +Two real workloads do not fit: + +1. **Distributed-writer to RDBMS.** A Spark/Flink/Beam job runs N + executors, each producing a partition of the output. Today each + executor opens its own ADBC connection and runs its own bulk + ingest, but the result is *not atomic*: there is no commit point at + which all N partitions become visible together. Workarounds + (per-job staging tables, ad-hoc swap SQL) are database-specific and + leak into application code. + +2. **Distributed-writer to table-format catalogs (Apache Iceberg, + Delta Lake).** These formats are *designed* for distributed + writes: many workers write data files in parallel, and a single + commit step writes a snapshot/manifest in the catalog or + transaction log. ADBC currently has no way to expose this shape. + A driver author who wants to write to Iceberg today has to pick + between (a) routing all writes through one process (defeats the + point) or (b) inventing a private API. + +The unifying observation is that both workloads need the same shape: +**coordinator decides what to ingest, workers write partitions in +parallel, coordinator commits or aborts atomically**. That is the +mirror image of partitioned read (``ExecutePartitions`` / +``ReadPartition``), which ADBC already supports. + +Goals +----- + +- Allow a coordinator to start an ingest, ship an opaque token to N + workers (possibly in different processes or hosts), have each + worker independently write a partition over its own connection, and + finally commit (or abort) atomically from the coordinator. +- Be implementable by both RDBMS drivers (via per-worker staging + tables) and table-format drivers (via per-worker data files + + catalog commit) without forcing either model on the other. +- Survive lost worker writes, dropped receipts, and coordinator + restarts without leaving silent data corruption. +- Keep the per-driver cost low: most of the ingest plumbing + (CREATE TABLE, COPY, schema mapping) is reused from existing + single-writer ingest. + +Non-goals +--------- + +- Schema evolution mid-ingest. Schema is fixed when ``Begin`` is + called; changing it requires starting a new ingest. +- Cross-driver atomicity (writing to two databases in one commit). +- Defining how a distributed engine (Spark, Flink) ships handles and + receipts between processes. That is the application's problem; + the API guarantees only that handles and receipts are opaque, + serializable byte strings. +- Idempotency on the coordinator side. If the coordinator + double-commits (calls ``Commit`` twice on the same handle) the + second call is undefined. + +Design overview +=============== + +Three new operations on ``AdbcConnection``, plus an ``Abort``: + +:: + + coordinator: Begin(table, mode, schema) → handle + workers: Write(handle, stream) → receipt + Write(handle, stream) → receipt + ... + coordinator: Commit(handle, [receipt, receipt, …]) → rows_affected + (or) Abort(handle, [receipts...]) + +The handle and each receipt are **opaque, serializable byte +strings**. This is the same shape as the existing partitioned-read +side, where ``AdbcStatementExecutePartitions`` returns opaque +``AdbcPartitions`` byte strings that can be shipped to workers and +passed to ``AdbcConnectionReadPartition`` over a different +connection. + +API surface +----------- + +C declarations (see ``adbc.h`` for full doc comments): + +.. code-block:: c + + struct AdbcIngestHandle { + size_t length; + const uint8_t* bytes; + void* private_data; + void (*release)(struct AdbcIngestHandle*); + }; + + struct AdbcIngestReceipt { + size_t length; + const uint8_t* bytes; + void* private_data; + void (*release)(struct AdbcIngestReceipt*); + }; + + AdbcConnectionBeginIngestPartitions( + conn, target_catalog, target_db_schema, target_table, mode, + schema, *out_handle, *error); + + AdbcConnectionWriteIngestPartition( + conn, handle_bytes, handle_len, *data_stream, + *out_receipt, *error); + + AdbcConnectionCommitIngestPartitions( + conn, handle_bytes, handle_len, num_receipts, receipts, + receipt_lens, *rows_affected, *error); + + AdbcConnectionAbortIngestPartitions( + conn, handle_bytes, handle_len, num_receipts, receipts, + receipt_lens, *error); + +The asymmetry — outputs are driver-owned structs, inputs are raw +``bytes + len`` — is deliberate and matches the read side: the bytes +are the part the caller serializes for transport, while the structs +hold driver-owned memory that callers release locally. + +Driver-side semantics +--------------------- + +- **Begin** validates options, creates the target table for + ``create``/``replace``/``create_append`` modes, and returns a handle + that encodes whatever state the driver needs to scope subsequent + writes (UUID, target catalog/schema/table, transaction id, object + store prefix, etc.). +- **Write** takes a handle and a stream, writes the partition into + driver-private staging (a per-write staging table, a per-write + object-store path), and returns a receipt encoding what was + written (staging name, file paths, row count, statistics, ...). + Each ``Write`` call must produce output that can be committed or + discarded *independently* — no shared state across concurrent + writes that would cause duplicate rows on retry. +- **Commit** atomically promotes the union of the supplied receipts + into the target. Atomic semantics are driver-specific: RDBMS + drivers swap staging into target in a transaction; table-format + drivers write a catalog or transaction-log entry referencing the + data files in the receipts. After successful commit the handle is + consumed. +- **Abort** discards all writes scoped to the handle. The driver + must clean up *every* write under the handle, not just the ones + named in the supplied receipts (see "Lost receipts" below). + +Cross-process flow +------------------ + +:: + + ┌──────────────┐ + │ coordinator │ Begin(...) ─→ handle + └──────┬───────┘ + │ copy handle.bytes; ship to workers + ▼ + ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ + │ worker 1 │ │ worker 2 │ … │ worker N │ + │ Write(...) →│ │ Write(...) →│ │ Write(...) →│ + │ receipt₁ │ │ receipt₂ │ │ receipt_N │ + └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ + │ copy receipt.bytes; ship back │ + ▼ ▼ ▼ + ┌──────────────────────────────────────────────────────┐ + │ coordinator: Commit(handle, [r₁, r₂, ..., r_N]) │ + └──────────────────────────────────────────────────────┘ + +Workers may use *different* connections than the coordinator — the +handle is self-contained. + +Key design decisions +==================== + +The decisions below were the ones with non-obvious tradeoffs. + +1. Opaque handles and receipts +------------------------------ + +Driver-defined byte strings, no schema imposed by ADBC. This lets a +Postgres driver encode "staging table prefix + UUID" while an +Iceberg driver encodes "snapshot id + data file paths + column +stats" — without ADBC having to model both. The cost is that +applications cannot inspect handles or receipts. Worth it: the only +party that ever needs to interpret them is the driver. + +2. Schema is fixed at ``Begin``, not per-``Write`` +-------------------------------------------------- + +For ``create``/``replace``/``create_append`` modes, the driver +issues ``CREATE TABLE`` (or the catalog equivalent) at ``Begin`` +time, before any worker writes. Workers cannot race to "create on +first write" because they are on different machines. Iceberg/Delta +also need the schema pinned into the transaction snapshot at start. + +For ``append`` mode, the schema parameter is optional; if supplied +it is validated against the target so a thousand workers don't all +fail independently with the same schema-mismatch error. + +3. No caller-supplied partition IDs +----------------------------------- + +Earlier drafts gave each ``Write`` a caller-supplied ``partition_id`` +for idempotent retry. Dropped: receipts are the source of truth for +what gets committed, and well-designed drivers write each ``Write`` +to a unique location (per-call staging table, per-call data file). +A retried ``Write`` produces a *new* receipt; the original write +becomes orphaned and is collected by ``Abort``. Caller-supplied IDs +only matter for drivers that share staging across writes — which they +shouldn't. + +4. Driver-owned output structs (handle, receipt) +------------------------------------------------ + +An earlier draft used the ``GetOptionBytes`` two-phase sizing +pattern: caller passes a buffer + capacity, driver reports required +length, caller retries with a larger buffer. This is correct only +for *idempotent* operations. ``Begin`` and ``Write`` produce +irrecoverable side effects (``CREATE TABLE``, ``COPY``); a +buffer-too-small failure left the side effects in place but gave the +caller no handle/receipt to pass to ``Abort`` — an unrecoverable +orphan. + +The chosen pattern (driver-owned struct with a release callback) +mirrors ``AdbcPartitions`` on the read side, eliminates the orphan +window, and gives drivers a clean place to free internal state. + +5. ``Commit`` and ``Abort`` take raw bytes, not structs +------------------------------------------------------- + +Symmetric with ``AdbcConnectionReadPartition``, which takes the raw +bytes from a ``partitions[i]`` entry rather than the +``AdbcPartitions`` struct. Receipts that traveled across processes +arrive as raw bytes; forcing the caller to wrap them in +``AdbcIngestReceipt`` structs (with bogus ``release`` callbacks) +would be friction without benefit. + +6. Lost receipts are handled by handle-scoped sweep, not by receipts +-------------------------------------------------------------------- + +If a worker writes data but its receipt is lost in transit, the +coordinator's receipt list is incomplete. ``Commit`` will not +include the orphan (correct: only acknowledged writes are +committed). ``Abort``, however, must clean it up — and ``Abort`` +cannot rely on the supplied receipts alone, because the orphan +isn't in them. + +The handle therefore must encode enough scope (UUID prefix, +transaction id, object-store path) for the driver to enumerate +*everything* written under it. Receipts passed to ``Abort`` are an +optimization (fast-path deletion of known writes); the handle is the +authority for cleanup scope. Drivers that cannot enumerate from +the handle alone cannot correctly implement partitioned ingest. + +7. Coordinator may die without calling ``Commit`` or ``Abort`` +-------------------------------------------------------------- + +The handle is opaque to the driver outside of ``Write``, so the +driver has no built-in liveness signal. Recommended (not required) +behaviors: + +- Drivers may TTL or background-GC handle-scoped writes. +- Callers may persist the handle bytes and call ``Abort`` after + restart to recover. +- Iceberg/Delta drivers can rely on existing orphan-file cleanup + tooling. + +The spec does not mandate any of these; it documents the failure +mode and leaves the policy to drivers. + +Open questions +============== + +These are intentionally unresolved in the initial revision. + +- **Single-coordinator commit only.** Two coordinators calling + ``Commit`` on the same handle concurrently is undefined. Should + drivers be required to detect and reject this, or is it the + caller's responsibility? +- **Subset writes.** Today the prototype assumes each worker writes + the same column set. Receipts encode the column list, so it is + possible to support per-worker subsets in the future, but this is + not specified yet. +- **Append-mode schema validation.** The schema parameter is + optional in ``append`` mode. Should drivers be *required* to + validate when a schema is supplied? Currently "should". +- **Streaming Commit.** Today ``Commit`` takes all receipts at + once. For very-many-partition jobs (10k+ workers) it may be + preferable to incrementally accumulate receipts. Out of scope for + v1. + +Reference implementation +======================== + +A prototype lives in the PostgreSQL driver +(``c/driver/postgresql/ingest_partition.{h,cc}``). It uses +per-worker ``UNLOGGED`` staging tables of the form +``adbc_stg__``, a single ``BEGIN``/``COMMIT`` +wrapping ``INSERT INTO target SELECT cols FROM staging`` for each +receipt, and an ``Abort`` that scans +``information_schema.tables`` for the handle's prefix. Test +coverage is in ``c/driver/postgresql/partitioned_ingest_test.cc``. diff --git a/docs/source/index.rst b/docs/source/index.rst index 5d4b8966ff..5f01e11078 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -253,6 +253,7 @@ Why ADBC? :hidden: format/specification + format/partitioned_bulk_ingest format/versioning format/comparison format/how_manager From 311edc36baa0de49662149558cfc4a6766f842ea Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 18 Apr 2026 10:44:42 +0400 Subject: [PATCH 02/11] wire driver manager, soften schema validation claim --- .../postgresql/partitioned_ingest_test.cc | 3 +- c/driver_manager/adbc_driver_manager_api.cc | 102 +++++++++++++++++- c/include/arrow-adbc/adbc.h | 8 +- 3 files changed, 107 insertions(+), 6 deletions(-) diff --git a/c/driver/postgresql/partitioned_ingest_test.cc b/c/driver/postgresql/partitioned_ingest_test.cc index 058d8e9a15..c44f12ab5a 100644 --- a/c/driver/postgresql/partitioned_ingest_test.cc +++ b/c/driver/postgresql/partitioned_ingest_test.cc @@ -268,7 +268,8 @@ TEST_F(PostgresPartitionedIngestTest, AbortDropsAllStagingIncludingOrphans) { ASSERT_EQ(AdbcStatementSetSqlQuery( &stmt, "SELECT COUNT(*) FROM information_schema.tables " - "WHERE table_name LIKE 'adbc_stg_%'", + "WHERE table_schema = current_schema() " + "AND table_name LIKE 'adbc_stg_%'", &error), ADBC_STATUS_OK); ArrowArrayStream stream{}; diff --git a/c/driver_manager/adbc_driver_manager_api.cc b/c/driver_manager/adbc_driver_manager_api.cc index 19f8e7b4c5..9622de92c8 100644 --- a/c/driver_manager/adbc_driver_manager_api.cc +++ b/c/driver_manager/adbc_driver_manager_api.cc @@ -415,6 +415,36 @@ AdbcStatusCode StatementSetSubstraitPlan(struct AdbcStatement*, const uint8_t*, SetError(error, "AdbcStatementSetSubstraitPlan not implemented"); return ADBC_STATUS_NOT_IMPLEMENTED; } + +AdbcStatusCode ConnectionBeginIngestPartitions( + struct AdbcConnection*, const char*, const char*, const char*, const char*, + struct ArrowSchema*, struct AdbcIngestHandle*, struct AdbcError* error) { + SetError(error, "AdbcConnectionBeginIngestPartitions not implemented"); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode ConnectionWriteIngestPartition(struct AdbcConnection*, const uint8_t*, + size_t, struct ArrowArrayStream*, + struct AdbcIngestReceipt*, + struct AdbcError* error) { + SetError(error, "AdbcConnectionWriteIngestPartition not implemented"); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode ConnectionCommitIngestPartitions(struct AdbcConnection*, const uint8_t*, + size_t, size_t, const uint8_t**, + const size_t*, int64_t*, + struct AdbcError* error) { + SetError(error, "AdbcConnectionCommitIngestPartitions not implemented"); + return ADBC_STATUS_NOT_IMPLEMENTED; +} + +AdbcStatusCode ConnectionAbortIngestPartitions(struct AdbcConnection*, const uint8_t*, + size_t, size_t, const uint8_t**, + const size_t*, struct AdbcError* error) { + SetError(error, "AdbcConnectionAbortIngestPartitions not implemented"); + return ADBC_STATUS_NOT_IMPLEMENTED; +} } // namespace // ============================================================================= @@ -1063,6 +1093,65 @@ AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection, out, connection); } +AdbcStatusCode AdbcConnectionBeginIngestPartitions( + struct AdbcConnection* connection, const char* target_catalog, + const char* target_db_schema, const char* target_table, const char* mode, + struct ArrowSchema* schema, struct AdbcIngestHandle* out_handle, + struct AdbcError* error) { + if (!connection->private_driver) { + SetError(error, + "AdbcConnectionBeginIngestPartitions: must call AdbcConnectionNew first"); + return ADBC_STATUS_INVALID_STATE; + } + INIT_ERROR(error, connection); + return connection->private_driver->ConnectionBeginIngestPartitions( + connection, target_catalog, target_db_schema, target_table, mode, schema, out_handle, + error); +} + +AdbcStatusCode AdbcConnectionWriteIngestPartition( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + struct ArrowArrayStream* data, struct AdbcIngestReceipt* out_receipt, + struct AdbcError* error) { + if (!connection->private_driver) { + SetError(error, + "AdbcConnectionWriteIngestPartition: must call AdbcConnectionNew first"); + return ADBC_STATUS_INVALID_STATE; + } + INIT_ERROR(error, connection); + return connection->private_driver->ConnectionWriteIngestPartition( + connection, handle, handle_len, data, out_receipt, error); +} + +AdbcStatusCode AdbcConnectionCommitIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + int64_t* rows_affected, struct AdbcError* error) { + if (!connection->private_driver) { + SetError(error, + "AdbcConnectionCommitIngestPartitions: must call AdbcConnectionNew first"); + return ADBC_STATUS_INVALID_STATE; + } + INIT_ERROR(error, connection); + return connection->private_driver->ConnectionCommitIngestPartitions( + connection, handle, handle_len, num_receipts, receipts, receipt_lens, rows_affected, + error); +} + +AdbcStatusCode AdbcConnectionAbortIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + struct AdbcError* error) { + if (!connection->private_driver) { + SetError(error, + "AdbcConnectionAbortIngestPartitions: must call AdbcConnectionNew first"); + return ADBC_STATUS_INVALID_STATE; + } + INIT_ERROR(error, connection); + return connection->private_driver->ConnectionAbortIngestPartitions( + connection, handle, handle_len, num_receipts, receipts, receipt_lens, error); +} + AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection, struct AdbcError* error) { if (!connection->private_driver) { @@ -1442,7 +1531,8 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint, AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int version, void* raw_driver, struct AdbcError* error) { - constexpr std::array kSupportedVersions = { + constexpr std::array kSupportedVersions = { + ADBC_VERSION_1_2_0, ADBC_VERSION_1_1_0, ADBC_VERSION_1_0_0, }; @@ -1455,9 +1545,10 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers switch (version) { case ADBC_VERSION_1_0_0: case ADBC_VERSION_1_1_0: + case ADBC_VERSION_1_2_0: break; default: - SetError(error, "Only ADBC 1.0.0 and 1.1.0 are supported"); + SetError(error, "Only ADBC 1.0.0, 1.1.0, and 1.2.0 are supported"); return ADBC_STATUS_NOT_IMPLEMENTED; } @@ -1550,6 +1641,13 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers FILL_DEFAULT(driver, StatementSetOptionDouble); FILL_DEFAULT(driver, StatementSetOptionInt); } + if (version >= ADBC_VERSION_1_2_0) { + auto* driver = reinterpret_cast(raw_driver); + FILL_DEFAULT(driver, ConnectionBeginIngestPartitions); + FILL_DEFAULT(driver, ConnectionWriteIngestPartition); + FILL_DEFAULT(driver, ConnectionCommitIngestPartitions); + FILL_DEFAULT(driver, ConnectionAbortIngestPartitions); + } return ADBC_STATUS_OK; diff --git a/c/include/arrow-adbc/adbc.h b/c/include/arrow-adbc/adbc.h index 6363ba84ce..abdc62dbc3 100644 --- a/c/include/arrow-adbc/adbc.h +++ b/c/include/arrow-adbc/adbc.h @@ -2118,9 +2118,11 @@ AdbcStatusCode AdbcConnectionBeginIngestPartitions( /// — never shared across concurrent writes), and returns an opaque /// receipt. /// -/// The stream's schema is validated against the schema recorded in -/// the handle. On mismatch, returns ADBC_STATUS_INVALID_ARGUMENT -/// and produces no receipt. +/// The stream's schema should be compatible with the target table's +/// schema. Drivers may validate this at any point during the write; +/// on mismatch the call fails and produces no receipt. The exact +/// validation mechanism is driver-specific (e.g., RDBMS drivers may +/// rely on the staging table DDL to enforce compatibility). /// /// On error of any kind, `out_receipt` is left with `release == /// NULL` and the caller should retry the whole partition. Partial From b29e713f28e9b3128c332c56b5678532c8a7c49f Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 18 Apr 2026 10:52:26 +0400 Subject: [PATCH 03/11] fix(postgresql): harden partitioned ingest commit and abort test - CommitIngestPartitions: detect open outer transaction via PQtransactionStatus and scope the commit with SAVEPOINT / RELEASE instead of BEGIN / COMMIT, so calling Commit does not silently close or roll back the caller's outer transaction. Reject error/unknown transaction states with INVALID_STATE. - Scope the abort test's leftover-staging query to the current ingest handle's prefix instead of matching all adbc_stg_* tables in the schema, so the test isn't flaky against prior runs or parallel tests. - Add a static_assert that the generated staging table name fits under PostgreSQL's default NAMEDATALEN, so any future widening of the id or prefix fails at compile time instead of silently truncating and causing name collisions during abort. Co-Authored-By: Claude Opus 4.7 (1M context) --- c/driver/postgresql/ingest_partition.cc | 58 ++++++++++++++++--- .../postgresql/partitioned_ingest_test.cc | 26 ++++++--- 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/c/driver/postgresql/ingest_partition.cc b/c/driver/postgresql/ingest_partition.cc index a7a8f48c28..d0689a4d4a 100644 --- a/c/driver/postgresql/ingest_partition.cc +++ b/c/driver/postgresql/ingest_partition.cc @@ -87,6 +87,19 @@ void IngestHandle::GenerateId(std::array* out) { std::memcpy(out->data() + 8, &b, 8); } +namespace { +// Staging table name is "adbc_stg_" (9) + 32-hex handle id + "_" (1) + 16-hex +// suffix. Postgres' default NAMEDATALEN is 64, giving a 63-char identifier +// limit before silent truncation — which would cause name collisions and miss +// staging tables during Abort. +constexpr size_t kStagingPrefixLen = 9 + 32 + 1; +constexpr size_t kStagingSuffixLen = 16; +constexpr size_t kStagingMaxIdentLen = 63; +static_assert(kStagingPrefixLen + kStagingSuffixLen <= kStagingMaxIdentLen, + "staging table name would exceed PostgreSQL NAMEDATALEN-1 and be " + "silently truncated"); +} // namespace + std::string IngestHandle::StagingPrefix() const { return "adbc_stg_" + HexId(ingest_id) + "_"; } @@ -474,19 +487,50 @@ AdbcStatusCode PostgresConnection::CommitIngestPartitions( if (code != ADBC_STATUS_OK) return code; std::string qualified_target = escaped_target_schema + "." + escaped_target_table; - code = ExecSimple(conn_, "BEGIN", error); + // Decide how to scope the commit to avoid silently mutating caller transaction + // state: when no outer transaction is open, use BEGIN/COMMIT; when one is + // already active, use a SAVEPOINT so we only release ingest-local work and + // leave the caller's outer transaction intact. Reject error/unknown states. + PGTransactionStatusType txn_status = PQtransactionStatus(conn_); + bool use_savepoint; + switch (txn_status) { + case PQTRANS_IDLE: + use_savepoint = false; + break; + case PQTRANS_INTRANS: + use_savepoint = true; + break; + default: + InternalAdbcSetError( + error, + "[libpq] cannot commit partitioned ingest: connection transaction state " + "is not idle or in-transaction (status=%d)", + static_cast(txn_status)); + return ADBC_STATUS_INVALID_STATE; + } + + static const char kSavepointName[] = "adbc_ingest_commit"; + const std::string open_sql = + use_savepoint ? std::string("SAVEPOINT ") + kSavepointName : "BEGIN"; + const std::string commit_sql = use_savepoint + ? std::string("RELEASE SAVEPOINT ") + kSavepointName + : "COMMIT"; + const std::string rollback_sql = + use_savepoint ? std::string("ROLLBACK TO SAVEPOINT ") + kSavepointName : "ROLLBACK"; + + code = ExecSimple(conn_, open_sql, error); if (code != ADBC_STATUS_OK) return code; int64_t total_rows = 0; for (const auto& r : parsed) { std::string esc_sch = EscapeIdent(conn_, r.staging_schema, error, &code); if (code != ADBC_STATUS_OK) { - ExecSimple(conn_, "ROLLBACK", nullptr); + ExecSimple(conn_, rollback_sql, nullptr); return code; } std::string esc_tbl = EscapeIdent(conn_, r.staging_table, error, &code); if (code != ADBC_STATUS_OK) { - ExecSimple(conn_, "ROLLBACK", nullptr); + ExecSimple(conn_, rollback_sql, nullptr); return code; } std::string qualified_staging = esc_sch + "." + esc_tbl; @@ -495,20 +539,20 @@ AdbcStatusCode PostgresConnection::CommitIngestPartitions( ") SELECT " + r.escaped_columns + " FROM " + qualified_staging; code = ExecSimple(conn_, insert, error); if (code != ADBC_STATUS_OK) { - ExecSimple(conn_, "ROLLBACK", nullptr); + ExecSimple(conn_, rollback_sql, nullptr); return code; } code = ExecSimple(conn_, "DROP TABLE " + qualified_staging, error); if (code != ADBC_STATUS_OK) { - ExecSimple(conn_, "ROLLBACK", nullptr); + ExecSimple(conn_, rollback_sql, nullptr); return code; } total_rows += r.row_count; } - code = ExecSimple(conn_, "COMMIT", error); + code = ExecSimple(conn_, commit_sql, error); if (code != ADBC_STATUS_OK) { - ExecSimple(conn_, "ROLLBACK", nullptr); + ExecSimple(conn_, rollback_sql, nullptr); return code; } diff --git a/c/driver/postgresql/partitioned_ingest_test.cc b/c/driver/postgresql/partitioned_ingest_test.cc index c44f12ab5a..93ea2accfe 100644 --- a/c/driver/postgresql/partitioned_ingest_test.cc +++ b/c/driver/postgresql/partitioned_ingest_test.cc @@ -262,16 +262,28 @@ TEST_F(PostgresPartitionedIngestTest, AbortDropsAllStagingIncludingOrphans) { ADBC_STATUS_OK) << error.message; + // Derive the handle's staging prefix from the wire format (4-byte magic + // "PIH1" + 16-byte id) so the verification query is scoped to this test's + // handle and not to every ingest ever run against the database. + ASSERT_GE(handle.length, static_cast(4 + 16)); + std::string handle_prefix = "adbc_stg_"; + static const char kHex[] = "0123456789abcdef"; + for (size_t i = 0; i < 16; i++) { + uint8_t b = handle.bytes[4 + i]; + handle_prefix += kHex[b >> 4]; + handle_prefix += kHex[b & 0x0F]; + } + handle_prefix += '_'; + // Verify no staging tables left under the handle's prefix. AdbcStatement stmt{}; ASSERT_EQ(AdbcStatementNew(&c.conn, &stmt, &error), ADBC_STATUS_OK); - ASSERT_EQ(AdbcStatementSetSqlQuery( - &stmt, - "SELECT COUNT(*) FROM information_schema.tables " - "WHERE table_schema = current_schema() " - "AND table_name LIKE 'adbc_stg_%'", - &error), - ADBC_STATUS_OK); + std::string count_sql = + "SELECT COUNT(*) FROM information_schema.tables " + "WHERE table_schema = current_schema() " + "AND table_name LIKE '" + + handle_prefix + "%'"; + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, count_sql.c_str(), &error), ADBC_STATUS_OK); ArrowArrayStream stream{}; ASSERT_EQ(AdbcStatementExecuteQuery(&stmt, &stream, nullptr, &error), ADBC_STATUS_OK); ArrowSchema schema{}; From 82d38cff08c7f17f7b8c63f33b117880cda24b1b Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 18 Apr 2026 11:01:27 +0400 Subject: [PATCH 04/11] fix(driver_manager): harden 1.2.0 ingest wrappers and cover version bridging - Guard the four new AdbcConnection*IngestPartitions wrappers against NULL connection (and Begin/Write out-params) before touching private_driver, so bad callers get ADBC_STATUS_INVALID_ARGUMENT instead of a null dereference. - Make kSupportedVersions the single source of truth for the AdbcLoadDriverFromInitFunc version check (std::find) instead of a switch that has to be kept in sync manually. - Add a gtest that loads a pre-1.2.0 driver through the manager at ADBC_VERSION_1_2_0 and asserts each new ingest entry point routes to the NOT_IMPLEMENTED default stub (plus the struct-size sanity check now tracks ADBC_DRIVER_1_2_0_SIZE). - Docstring for AdbcConnectionWriteIngestPartition now notes that a failed write may leave partial server-side state and the caller must still Abort the handle to release staging resources. Co-Authored-By: Claude Opus 4.7 (1M context) --- c/driver_manager/adbc_driver_manager_api.cc | 37 +++++++++++++---- .../adbc_version_100_compatibility_test.cc | 40 ++++++++++++++++++- c/include/arrow-adbc/adbc.h | 8 ++-- 3 files changed, 73 insertions(+), 12 deletions(-) diff --git a/c/driver_manager/adbc_driver_manager_api.cc b/c/driver_manager/adbc_driver_manager_api.cc index 9622de92c8..cfcb3ffa67 100644 --- a/c/driver_manager/adbc_driver_manager_api.cc +++ b/c/driver_manager/adbc_driver_manager_api.cc @@ -17,6 +17,7 @@ // ADBC API implementations +#include #include #include #include @@ -1098,6 +1099,14 @@ AdbcStatusCode AdbcConnectionBeginIngestPartitions( const char* target_db_schema, const char* target_table, const char* mode, struct ArrowSchema* schema, struct AdbcIngestHandle* out_handle, struct AdbcError* error) { + if (!connection) { + SetError(error, "AdbcConnectionBeginIngestPartitions: connection is NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (!out_handle) { + SetError(error, "AdbcConnectionBeginIngestPartitions: out_handle is NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } if (!connection->private_driver) { SetError(error, "AdbcConnectionBeginIngestPartitions: must call AdbcConnectionNew first"); @@ -1113,6 +1122,14 @@ AdbcStatusCode AdbcConnectionWriteIngestPartition( struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, struct ArrowArrayStream* data, struct AdbcIngestReceipt* out_receipt, struct AdbcError* error) { + if (!connection) { + SetError(error, "AdbcConnectionWriteIngestPartition: connection is NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (!out_receipt) { + SetError(error, "AdbcConnectionWriteIngestPartition: out_receipt is NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } if (!connection->private_driver) { SetError(error, "AdbcConnectionWriteIngestPartition: must call AdbcConnectionNew first"); @@ -1127,6 +1144,10 @@ AdbcStatusCode AdbcConnectionCommitIngestPartitions( struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, int64_t* rows_affected, struct AdbcError* error) { + if (!connection) { + SetError(error, "AdbcConnectionCommitIngestPartitions: connection is NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } if (!connection->private_driver) { SetError(error, "AdbcConnectionCommitIngestPartitions: must call AdbcConnectionNew first"); @@ -1142,6 +1163,10 @@ AdbcStatusCode AdbcConnectionAbortIngestPartitions( struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, struct AdbcError* error) { + if (!connection) { + SetError(error, "AdbcConnectionAbortIngestPartitions: connection is NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } if (!connection->private_driver) { SetError(error, "AdbcConnectionAbortIngestPartitions: must call AdbcConnectionNew first"); @@ -1542,14 +1567,10 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers return ADBC_STATUS_INVALID_ARGUMENT; } - switch (version) { - case ADBC_VERSION_1_0_0: - case ADBC_VERSION_1_1_0: - case ADBC_VERSION_1_2_0: - break; - default: - SetError(error, "Only ADBC 1.0.0, 1.1.0, and 1.2.0 are supported"); - return ADBC_STATUS_NOT_IMPLEMENTED; + if (std::find(kSupportedVersions.begin(), kSupportedVersions.end(), version) == + kSupportedVersions.end()) { + SetError(error, "Only ADBC 1.0.0, 1.1.0, and 1.2.0 are supported"); + return ADBC_STATUS_NOT_IMPLEMENTED; } #define FILL_DEFAULT(DRIVER, STUB) \ diff --git a/c/driver_manager/adbc_version_100_compatibility_test.cc b/c/driver_manager/adbc_version_100_compatibility_test.cc index 43079ecb3e..dc5bc44513 100644 --- a/c/driver_manager/adbc_version_100_compatibility_test.cc +++ b/c/driver_manager/adbc_version_100_compatibility_test.cc @@ -59,7 +59,7 @@ TEST_F(AdbcVersion, StructSize) { ASSERT_EQ(sizeof(AdbcError), ADBC_ERROR_1_1_0_SIZE); ASSERT_EQ(sizeof(AdbcDriverVersion100), ADBC_DRIVER_1_0_0_SIZE); - ASSERT_EQ(sizeof(AdbcDriver), ADBC_DRIVER_1_1_0_SIZE); + ASSERT_EQ(sizeof(AdbcDriver), ADBC_DRIVER_1_2_0_SIZE); } // Initialize a version 1.0.0 driver with the version 1.1.0 driver struct. @@ -105,6 +105,44 @@ TEST_F(AdbcVersion, OldDriverNewManager) { EXPECT_NE(driver.StatementSetOptionDouble, nullptr); } +// When a pre-1.2.0 driver is loaded at ADBC_VERSION_1_2_0, the new +// partitioned-ingest entry points must be populated with default stubs that +// return ADBC_STATUS_NOT_IMPLEMENTED, so the driver-manager wrappers do not +// dereference null function pointers. +TEST_F(AdbcVersion, OldDriverNewManagerPartitionedIngest) { + ASSERT_THAT(AdbcLoadDriverFromInitFunc(&Version100DriverInit, ADBC_VERSION_1_2_0, + &driver, &error), + IsOkStatus(&error)); + + ASSERT_NE(driver.ConnectionBeginIngestPartitions, nullptr); + ASSERT_NE(driver.ConnectionWriteIngestPartition, nullptr); + ASSERT_NE(driver.ConnectionCommitIngestPartitions, nullptr); + ASSERT_NE(driver.ConnectionAbortIngestPartitions, nullptr); + + struct AdbcError stub_error = {}; + + EXPECT_THAT(driver.ConnectionBeginIngestPartitions(nullptr, nullptr, nullptr, nullptr, + nullptr, nullptr, nullptr, + &stub_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &stub_error)); + if (stub_error.release) stub_error.release(&stub_error); + + EXPECT_THAT(driver.ConnectionWriteIngestPartition(nullptr, nullptr, 0, nullptr, nullptr, + &stub_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &stub_error)); + if (stub_error.release) stub_error.release(&stub_error); + + EXPECT_THAT(driver.ConnectionCommitIngestPartitions(nullptr, nullptr, 0, 0, nullptr, + nullptr, nullptr, &stub_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &stub_error)); + if (stub_error.release) stub_error.release(&stub_error); + + EXPECT_THAT(driver.ConnectionAbortIngestPartitions(nullptr, nullptr, 0, 0, nullptr, + nullptr, &stub_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &stub_error)); + if (stub_error.release) stub_error.release(&stub_error); +} + // N.B. see postgresql_test.cc for backwards compatibility test of AdbcError // N.B. see postgresql_test.cc for backwards compatibility test of AdbcDriver diff --git a/c/include/arrow-adbc/adbc.h b/c/include/arrow-adbc/adbc.h index abdc62dbc3..48ec4fc82c 100644 --- a/c/include/arrow-adbc/adbc.h +++ b/c/include/arrow-adbc/adbc.h @@ -2126,9 +2126,11 @@ AdbcStatusCode AdbcConnectionBeginIngestPartitions( /// /// On error of any kind, `out_receipt` is left with `release == /// NULL` and the caller should retry the whole partition. Partial -/// receipts are never produced. Staging data the driver wrote -/// before the failure becomes orphaned and will be cleaned up by -/// Abort or by driver housekeeping. +/// receipts are never produced. The driver may, however, leave +/// partial server-side state (for example, a per-call staging +/// table); the caller must still call `AbortIngestPartitions` for +/// the handle (with no receipt for this failed write) to release +/// any staging resources, or rely on driver housekeeping. /// /// This call is safe to invoke concurrently from many connections /// using the same handle. From 54ca3349c5402a173d5b0f2abf849213edb54f0f Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 18 Apr 2026 11:51:15 +0400 Subject: [PATCH 05/11] fix(postgresql): isolate ingest-commit savepoint from caller state Scope the CommitIngestPartitions savepoint to a per-call name derived from the handle id so it cannot alias a caller-managed savepoint, and always RELEASE the savepoint after ROLLBACK TO SAVEPOINT so the caller's savepoint stack is restored to its pre-call shape on failure. Add tests covering the SAVEPOINT branch (visibility inside the outer transaction and rollback semantics) and the aborted-transaction rejection path. Co-Authored-By: Claude Opus 4.7 (1M context) --- c/driver/postgresql/ingest_partition.cc | 37 +++-- .../postgresql/partitioned_ingest_test.cc | 148 ++++++++++++++++++ 2 files changed, 173 insertions(+), 12 deletions(-) diff --git a/c/driver/postgresql/ingest_partition.cc b/c/driver/postgresql/ingest_partition.cc index d0689a4d4a..d819143607 100644 --- a/c/driver/postgresql/ingest_partition.cc +++ b/c/driver/postgresql/ingest_partition.cc @@ -509,14 +509,27 @@ AdbcStatusCode PostgresConnection::CommitIngestPartitions( return ADBC_STATUS_INVALID_STATE; } - static const char kSavepointName[] = "adbc_ingest_commit"; + // Derive a unique savepoint name from the handle's ingest_id so the driver + // cannot collide with a caller-managed savepoint of the same name. Only used + // when use_savepoint is true, but cheap to compute unconditionally. + const std::string savepoint_name = "adbc_ingest_commit_" + HexId(handle.ingest_id); const std::string open_sql = - use_savepoint ? std::string("SAVEPOINT ") + kSavepointName : "BEGIN"; - const std::string commit_sql = use_savepoint - ? std::string("RELEASE SAVEPOINT ") + kSavepointName - : "COMMIT"; - const std::string rollback_sql = - use_savepoint ? std::string("ROLLBACK TO SAVEPOINT ") + kSavepointName : "ROLLBACK"; + use_savepoint ? std::string("SAVEPOINT ") + savepoint_name : "BEGIN"; + const std::string commit_sql = + use_savepoint ? std::string("RELEASE SAVEPOINT ") + savepoint_name : "COMMIT"; + const std::string rollback_sql = use_savepoint ? std::string("ROLLBACK TO SAVEPOINT ") + + savepoint_name + : "ROLLBACK"; + + // ROLLBACK TO SAVEPOINT leaves the savepoint on the stack in PG, so also + // RELEASE it after rollback to restore the caller's savepoint stack to its + // pre-call shape. Best-effort — errors are ignored. + auto abort_ingest = [&]() { + ExecSimple(conn_, rollback_sql, nullptr); + if (use_savepoint) { + ExecSimple(conn_, std::string("RELEASE SAVEPOINT ") + savepoint_name, nullptr); + } + }; code = ExecSimple(conn_, open_sql, error); if (code != ADBC_STATUS_OK) return code; @@ -525,12 +538,12 @@ AdbcStatusCode PostgresConnection::CommitIngestPartitions( for (const auto& r : parsed) { std::string esc_sch = EscapeIdent(conn_, r.staging_schema, error, &code); if (code != ADBC_STATUS_OK) { - ExecSimple(conn_, rollback_sql, nullptr); + abort_ingest(); return code; } std::string esc_tbl = EscapeIdent(conn_, r.staging_table, error, &code); if (code != ADBC_STATUS_OK) { - ExecSimple(conn_, rollback_sql, nullptr); + abort_ingest(); return code; } std::string qualified_staging = esc_sch + "." + esc_tbl; @@ -539,12 +552,12 @@ AdbcStatusCode PostgresConnection::CommitIngestPartitions( ") SELECT " + r.escaped_columns + " FROM " + qualified_staging; code = ExecSimple(conn_, insert, error); if (code != ADBC_STATUS_OK) { - ExecSimple(conn_, rollback_sql, nullptr); + abort_ingest(); return code; } code = ExecSimple(conn_, "DROP TABLE " + qualified_staging, error); if (code != ADBC_STATUS_OK) { - ExecSimple(conn_, rollback_sql, nullptr); + abort_ingest(); return code; } total_rows += r.row_count; @@ -552,7 +565,7 @@ AdbcStatusCode PostgresConnection::CommitIngestPartitions( code = ExecSimple(conn_, commit_sql, error); if (code != ADBC_STATUS_OK) { - ExecSimple(conn_, rollback_sql, nullptr); + abort_ingest(); return code; } diff --git a/c/driver/postgresql/partitioned_ingest_test.cc b/c/driver/postgresql/partitioned_ingest_test.cc index 93ea2accfe..ce5edd5964 100644 --- a/c/driver/postgresql/partitioned_ingest_test.cc +++ b/c/driver/postgresql/partitioned_ingest_test.cc @@ -305,3 +305,151 @@ TEST_F(PostgresPartitionedIngestTest, AbortDropsAllStagingIncludingOrphans) { Cleanup(&c, table, &error); CloseConn(&c, &error); } + +// With the coordinator connection already inside an outer transaction, +// CommitIngestPartitions must take the SAVEPOINT path: the ingest rows become +// visible to in-transaction SELECTs, and the lifetime of the ingest is tied to +// the outer transaction (persists on COMMIT, rolls back on ROLLBACK). +TEST_F(PostgresPartitionedIngestTest, CommitInsideOuterTransactionUsesSavepoint) { + const char* uri = RequireUri(); + if (!uri) return; + const std::string table = "adbc_partitioned_ingest_savepoint_test"; + + AdbcError error = ADBC_ERROR_INIT; + ConnPair coordinator; + OpenConn(&coordinator, &error, uri); + Cleanup(&coordinator, table, &error); + + ArrowSchema ingest_schema{}; + MakeIngestSchema(&ingest_schema); + + AdbcIngestHandle handle{}; + ASSERT_EQ(AdbcConnectionBeginIngestPartitions( + &coordinator.conn, nullptr, nullptr, table.c_str(), + ADBC_INGEST_OPTION_MODE_CREATE, &ingest_schema, &handle, &error), + ADBC_STATUS_OK) + << error.message; + ingest_schema.release(&ingest_schema); + + // One worker writes one partition on a separate connection. + constexpr int32_t kRows = 25; + ConnPair worker; + OpenConn(&worker, &error, uri); + ArrowArrayStream stream{}; + MakeBatchStream(&stream, /*start_id=*/0, kRows); + AdbcIngestReceipt rec{}; + ASSERT_EQ(AdbcConnectionWriteIngestPartition(&worker.conn, handle.bytes, handle.length, + &stream, &rec, &error), + ADBC_STATUS_OK) + << error.message; + std::vector receipt_bytes(rec.bytes, rec.bytes + rec.length); + rec.release(&rec); + CloseConn(&worker, &error); + + // Put the coordinator connection into an outer transaction so Commit must + // take the SAVEPOINT branch. + ASSERT_EQ(AdbcConnectionSetOption(&coordinator.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, + ADBC_OPTION_VALUE_DISABLED, &error), + ADBC_STATUS_OK) + << error.message; + + const uint8_t* rec_ptr = receipt_bytes.data(); + size_t rec_len = receipt_bytes.size(); + int64_t rows_committed = 0; + ASSERT_EQ(AdbcConnectionCommitIngestPartitions(&coordinator.conn, handle.bytes, + handle.length, 1, &rec_ptr, &rec_len, + &rows_committed, &error), + ADBC_STATUS_OK) + << error.message; + EXPECT_EQ(rows_committed, kRows); + + // (a) Rows visible to in-transaction SELECT on the same connection. + EXPECT_EQ(SelectCount(&coordinator, table, &error), kRows); + + // (b) A caller-driven ROLLBACK undoes the ingest: RELEASE SAVEPOINT merges + // the ingest work into the outer transaction, and rolling back the outer + // transaction rolls back everything in it. + ASSERT_EQ(AdbcConnectionRollback(&coordinator.conn, &error), ADBC_STATUS_OK) + << error.message; + ASSERT_EQ(AdbcConnectionSetOption(&coordinator.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, + "true", &error), + ADBC_STATUS_OK) + << error.message; + EXPECT_EQ(SelectCount(&coordinator, table, &error), 0); + + handle.release(&handle); + Cleanup(&coordinator, table, &error); + CloseConn(&coordinator, &error); +} + +// Calling CommitIngestPartitions while the connection is in an aborted +// transaction must fail cleanly with ADBC_STATUS_INVALID_STATE instead of +// issuing SQL that would further mutate caller transaction state. +TEST_F(PostgresPartitionedIngestTest, CommitRejectsAbortedTransaction) { + const char* uri = RequireUri(); + if (!uri) return; + const std::string table = "adbc_partitioned_ingest_inerror_test"; + + AdbcError error = ADBC_ERROR_INIT; + ConnPair c; + OpenConn(&c, &error, uri); + Cleanup(&c, table, &error); + + ArrowSchema ingest_schema{}; + MakeIngestSchema(&ingest_schema); + + AdbcIngestHandle handle{}; + ASSERT_EQ(AdbcConnectionBeginIngestPartitions( + &c.conn, nullptr, nullptr, table.c_str(), + ADBC_INGEST_OPTION_MODE_CREATE, &ingest_schema, &handle, &error), + ADBC_STATUS_OK) + << error.message; + ingest_schema.release(&ingest_schema); + + ArrowArrayStream stream{}; + MakeBatchStream(&stream, 0, 5); + AdbcIngestReceipt rec{}; + ASSERT_EQ(AdbcConnectionWriteIngestPartition(&c.conn, handle.bytes, handle.length, + &stream, &rec, &error), + ADBC_STATUS_OK) + << error.message; + std::vector receipt_bytes(rec.bytes, rec.bytes + rec.length); + rec.release(&rec); + + // Put the connection in an aborted-transaction state: disable autocommit, + // then issue a statement that errors (SELECT from a missing relation). + ASSERT_EQ(AdbcConnectionSetOption(&c.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, + ADBC_OPTION_VALUE_DISABLED, &error), + ADBC_STATUS_OK) + << error.message; + { + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&c.conn, &stmt, &error), ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementSetSqlQuery( + &stmt, "SELECT * FROM nonexistent_relation_adbc_x", &error), + ADBC_STATUS_OK); + AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, &error); + AdbcStatementRelease(&stmt, &error); + } + + const uint8_t* rec_ptr = receipt_bytes.data(); + size_t rec_len = receipt_bytes.size(); + AdbcError commit_err = ADBC_ERROR_INIT; + AdbcStatusCode rc = AdbcConnectionCommitIngestPartitions( + &c.conn, handle.bytes, handle.length, 1, &rec_ptr, &rec_len, + /*rows_affected=*/nullptr, &commit_err); + EXPECT_EQ(rc, ADBC_STATUS_INVALID_STATE) << (commit_err.message ? commit_err.message : ""); + if (commit_err.release) commit_err.release(&commit_err); + + // Restore autocommit so Cleanup can drop the table. + AdbcConnectionRollback(&c.conn, &error); + AdbcConnectionSetOption(&c.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, "true", &error); + + // Best-effort cleanup of staging tables left behind by the failed commit. + AdbcConnectionAbortIngestPartitions(&c.conn, handle.bytes, handle.length, 0, nullptr, + nullptr, &error); + + handle.release(&handle); + Cleanup(&c, table, &error); + CloseConn(&c, &error); +} From 9ccfb23b35dbdea1b4de01318691ff51b8e55812 Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 18 Apr 2026 11:56:03 +0400 Subject: [PATCH 06/11] fix(driver_manager): null-guard ingest wrappers and expand wrapper tests - Reject NULL handle/data in AdbcConnectionWriteIngestPartition and NULL handle plus NULL receipts/receipt_lens (when num_receipts > 0) in Commit/Abort, returning ADBC_STATUS_INVALID_ARGUMENT before any driver dispatch. - Add gtests that exercise the public AdbcConnection*IngestPartitions entry points: one case proves a NULL connection short-circuits to INVALID_ARGUMENT, another wires a real connection to a 1.2.0-loaded driver and confirms each wrapper dispatches to the FILL_DEFAULT NOT_IMPLEMENTED stub. - Add a test that AdbcLoadDriverFromInitFunc rejects an unrecognized version constant with ADBC_STATUS_NOT_IMPLEMENTED, guarding the std::find / kSupportedVersions check from silent regressions. Co-Authored-By: Claude Opus 4.7 (1M context) --- c/driver_manager/adbc_driver_manager_api.cc | 26 ++++++ .../adbc_version_100_compatibility_test.cc | 88 +++++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/c/driver_manager/adbc_driver_manager_api.cc b/c/driver_manager/adbc_driver_manager_api.cc index cfcb3ffa67..45cc63c159 100644 --- a/c/driver_manager/adbc_driver_manager_api.cc +++ b/c/driver_manager/adbc_driver_manager_api.cc @@ -1126,6 +1126,14 @@ AdbcStatusCode AdbcConnectionWriteIngestPartition( SetError(error, "AdbcConnectionWriteIngestPartition: connection is NULL"); return ADBC_STATUS_INVALID_ARGUMENT; } + if (!handle) { + SetError(error, "AdbcConnectionWriteIngestPartition: handle is NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (!data) { + SetError(error, "AdbcConnectionWriteIngestPartition: data is NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } if (!out_receipt) { SetError(error, "AdbcConnectionWriteIngestPartition: out_receipt is NULL"); return ADBC_STATUS_INVALID_ARGUMENT; @@ -1148,6 +1156,15 @@ AdbcStatusCode AdbcConnectionCommitIngestPartitions( SetError(error, "AdbcConnectionCommitIngestPartitions: connection is NULL"); return ADBC_STATUS_INVALID_ARGUMENT; } + if (!handle) { + SetError(error, "AdbcConnectionCommitIngestPartitions: handle is NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (num_receipts > 0 && (!receipts || !receipt_lens)) { + SetError(error, + "AdbcConnectionCommitIngestPartitions: receipts/receipt_lens are NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } if (!connection->private_driver) { SetError(error, "AdbcConnectionCommitIngestPartitions: must call AdbcConnectionNew first"); @@ -1167,6 +1184,15 @@ AdbcStatusCode AdbcConnectionAbortIngestPartitions( SetError(error, "AdbcConnectionAbortIngestPartitions: connection is NULL"); return ADBC_STATUS_INVALID_ARGUMENT; } + if (!handle) { + SetError(error, "AdbcConnectionAbortIngestPartitions: handle is NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (num_receipts > 0 && (!receipts || !receipt_lens)) { + SetError(error, + "AdbcConnectionAbortIngestPartitions: receipts/receipt_lens are NULL"); + return ADBC_STATUS_INVALID_ARGUMENT; + } if (!connection->private_driver) { SetError(error, "AdbcConnectionAbortIngestPartitions: must call AdbcConnectionNew first"); diff --git a/c/driver_manager/adbc_version_100_compatibility_test.cc b/c/driver_manager/adbc_version_100_compatibility_test.cc index dc5bc44513..bc82cfa3bb 100644 --- a/c/driver_manager/adbc_version_100_compatibility_test.cc +++ b/c/driver_manager/adbc_version_100_compatibility_test.cc @@ -143,6 +143,94 @@ TEST_F(AdbcVersion, OldDriverNewManagerPartitionedIngest) { if (stub_error.release) stub_error.release(&stub_error); } +// The public AdbcConnection*IngestPartitions wrappers must reject a NULL +// connection with ADBC_STATUS_INVALID_ARGUMENT instead of dereferencing it. +TEST_F(AdbcVersion, IngestPartitionsWrappersRejectNullConnection) { + uint8_t handle_bytes[1] = {0}; + struct ArrowArrayStream stream = {}; + struct ArrowSchema schema = {}; + struct AdbcIngestHandle out_handle = {}; + struct AdbcIngestReceipt out_receipt = {}; + int64_t rows_affected = 0; + struct AdbcError local_error = {}; + + EXPECT_THAT( + AdbcConnectionBeginIngestPartitions(nullptr, nullptr, nullptr, "t", + ADBC_INGEST_OPTION_MODE_CREATE, &schema, + &out_handle, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + EXPECT_THAT(AdbcConnectionWriteIngestPartition(nullptr, handle_bytes, 1, &stream, + &out_receipt, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + EXPECT_THAT(AdbcConnectionCommitIngestPartitions(nullptr, handle_bytes, 1, 0, nullptr, + nullptr, &rows_affected, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + EXPECT_THAT(AdbcConnectionAbortIngestPartitions(nullptr, handle_bytes, 1, 0, nullptr, + nullptr, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); +} + +// With a real connection wired to a 1.2.0-loaded driver, the public wrappers +// must dispatch to the FILL_DEFAULT stubs and surface NOT_IMPLEMENTED. +TEST_F(AdbcVersion, IngestPartitionsWrappersDispatchToStubs) { + ASSERT_THAT(AdbcLoadDriverFromInitFunc(&Version100DriverInit, ADBC_VERSION_1_2_0, + &driver, &error), + IsOkStatus(&error)); + + struct AdbcConnection connection = {}; + connection.private_driver = &driver; + + uint8_t handle_bytes[1] = {0}; + struct ArrowArrayStream stream = {}; + struct ArrowSchema schema = {}; + struct AdbcIngestHandle out_handle = {}; + struct AdbcIngestReceipt out_receipt = {}; + int64_t rows_affected = 0; + struct AdbcError local_error = {}; + + EXPECT_THAT(AdbcConnectionBeginIngestPartitions(&connection, nullptr, nullptr, "t", + ADBC_INGEST_OPTION_MODE_CREATE, &schema, + &out_handle, &local_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &local_error)); + if (local_error.release) local_error.release(&local_error); + + EXPECT_THAT(AdbcConnectionWriteIngestPartition(&connection, handle_bytes, 1, &stream, + &out_receipt, &local_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &local_error)); + if (local_error.release) local_error.release(&local_error); + + EXPECT_THAT( + AdbcConnectionCommitIngestPartitions(&connection, handle_bytes, 1, 0, nullptr, + nullptr, &rows_affected, &local_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &local_error)); + if (local_error.release) local_error.release(&local_error); + + EXPECT_THAT(AdbcConnectionAbortIngestPartitions(&connection, handle_bytes, 1, 0, + nullptr, nullptr, &local_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Don't let TearDown observe a borrowed driver pointer. + connection.private_driver = nullptr; +} + +// AdbcLoadDriverFromInitFunc must reject unrecognized version constants with +// ADBC_STATUS_NOT_IMPLEMENTED so a typo in kSupportedVersions cannot silently +// regress version gating. +TEST_F(AdbcVersion, LoadDriverFromInitFuncRejectsUnknownVersion) { + ASSERT_THAT(AdbcLoadDriverFromInitFunc(&Version100DriverInit, /*bogus*/ 0x12345678, + &driver, &error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &error)); + ASSERT_EQ(driver.release, nullptr); +} + // N.B. see postgresql_test.cc for backwards compatibility test of AdbcError // N.B. see postgresql_test.cc for backwards compatibility test of AdbcDriver From 4197392f8e67aa413c6d65c11bf82f2035bfe003 Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 18 Apr 2026 12:13:50 +0400 Subject: [PATCH 07/11] fix(postgresql): guard ingest-commit savepoint name length and cover savepoint abort path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a compile-time guard for the "adbc_ingest_commit_" savepoint name length against Postgres NAMEDATALEN-1, mirroring the existing guard on staging table identifiers so a future rename of the prefix cannot silently produce truncated, aliasing savepoint names. Annotate the abort_ingest lambda to document that cleanup errors are intentionally discarded in favor of preserving the first-cause message already stored in the caller's error. Harden the CommitInsideOuterTransactionUsesSavepoint test by explicitly forcing libpq into PQTRANS_INTRANS before Commit so the savepoint branch is exercised even if the driver ever defers BEGIN. Add a new test (CommitFailureInOuterTxnReleasesSavepoint) that triggers an INSERT failure mid-commit while inside an outer transaction, then verifies the outer transaction remains usable and that the driver's savepoint has been RELEASEd (not leaked onto the caller's stack) — covering the failure path of the ROLLBACK TO / RELEASE sequence that the savepoint isolation motivated. Co-Authored-By: Claude Opus 4.7 (1M context) --- c/driver/postgresql/ingest_partition.cc | 18 +- .../postgresql/partitioned_ingest_test.cc | 179 +++++++++++++++++- 2 files changed, 193 insertions(+), 4 deletions(-) diff --git a/c/driver/postgresql/ingest_partition.cc b/c/driver/postgresql/ingest_partition.cc index d819143607..05ce9b6b37 100644 --- a/c/driver/postgresql/ingest_partition.cc +++ b/c/driver/postgresql/ingest_partition.cc @@ -94,10 +94,19 @@ namespace { // staging tables during Abort. constexpr size_t kStagingPrefixLen = 9 + 32 + 1; constexpr size_t kStagingSuffixLen = 16; -constexpr size_t kStagingMaxIdentLen = 63; -static_assert(kStagingPrefixLen + kStagingSuffixLen <= kStagingMaxIdentLen, +constexpr size_t kIdentMaxLen = 63; +static_assert(kStagingPrefixLen + kStagingSuffixLen <= kIdentMaxLen, "staging table name would exceed PostgreSQL NAMEDATALEN-1 and be " "silently truncated"); + +// Commit savepoint name is "adbc_ingest_commit_" (19) + 32-hex handle id. +// Guard against truncation for the same reason: a truncated name would alias +// across concurrent ingest handles on the same connection. +constexpr size_t kCommitSavepointPrefixLen = 19; +constexpr size_t kHexIdLen = 32; +static_assert(kCommitSavepointPrefixLen + kHexIdLen <= kIdentMaxLen, + "ingest commit savepoint name would exceed PostgreSQL " + "NAMEDATALEN-1 and be silently truncated"); } // namespace std::string IngestHandle::StagingPrefix() const { @@ -523,7 +532,10 @@ AdbcStatusCode PostgresConnection::CommitIngestPartitions( // ROLLBACK TO SAVEPOINT leaves the savepoint on the stack in PG, so also // RELEASE it after rollback to restore the caller's savepoint stack to its - // pre-call shape. Best-effort — errors are ignored. + // pre-call shape. Errors from these cleanup statements are intentionally + // discarded (nullptr error sink): the caller's `error` already holds the + // first-cause message from the failing step, and surfacing a secondary + // cleanup failure would overwrite it with a less useful diagnostic. auto abort_ingest = [&]() { ExecSimple(conn_, rollback_sql, nullptr); if (use_savepoint) { diff --git a/c/driver/postgresql/partitioned_ingest_test.cc b/c/driver/postgresql/partitioned_ingest_test.cc index ce5edd5964..fdf99b6037 100644 --- a/c/driver/postgresql/partitioned_ingest_test.cc +++ b/c/driver/postgresql/partitioned_ingest_test.cc @@ -347,11 +347,26 @@ TEST_F(PostgresPartitionedIngestTest, CommitInsideOuterTransactionUsesSavepoint) CloseConn(&worker, &error); // Put the coordinator connection into an outer transaction so Commit must - // take the SAVEPOINT branch. + // take the SAVEPOINT branch. Some drivers defer BEGIN to the next statement, + // which would leave libpq in PQTRANS_IDLE and silently take the BEGIN/COMMIT + // branch instead; issue a trivial SELECT to force the transaction open. ASSERT_EQ(AdbcConnectionSetOption(&coordinator.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, ADBC_OPTION_VALUE_DISABLED, &error), ADBC_STATUS_OK) << error.message; + { + // Run a no-result statement so libpq ends in PQTRANS_INTRANS rather than + // PQTRANS_ACTIVE (which a streaming SELECT would leave behind until the + // cursor is drained). DO $$ BEGIN END $$ executes and returns no rows. + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&coordinator.conn, &stmt, &error), ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, "DO $$ BEGIN END $$", &error), + ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, &error), + ADBC_STATUS_OK) + << error.message; + AdbcStatementRelease(&stmt, &error); + } const uint8_t* rec_ptr = receipt_bytes.data(); size_t rec_len = receipt_bytes.size(); @@ -382,6 +397,168 @@ TEST_F(PostgresPartitionedIngestTest, CommitInsideOuterTransactionUsesSavepoint) CloseConn(&coordinator, &error); } +// When CommitIngestPartitions fails mid-loop while inside an outer +// transaction, the savepoint must be fully unwound: ROLLBACK TO SAVEPOINT +// followed by RELEASE SAVEPOINT. If the RELEASE is missed, the caller's +// savepoint stack leaks an entry, and if the rollback is skipped the outer +// transaction is left in PQTRANS_INERROR. Force a failure by dropping a +// staging table out-of-band before Commit runs — the INSERT FROM that +// staging table will fail — then verify the outer transaction is still +// usable (not in error state) and that the next caller SAVEPOINT of the +// same name succeeds (proving the driver's savepoint was released). +TEST_F(PostgresPartitionedIngestTest, CommitFailureInOuterTxnReleasesSavepoint) { + const char* uri = RequireUri(); + if (!uri) return; + const std::string table = "adbc_partitioned_ingest_savepoint_abort_test"; + + AdbcError error = ADBC_ERROR_INIT; + ConnPair coordinator; + OpenConn(&coordinator, &error, uri); + Cleanup(&coordinator, table, &error); + + ArrowSchema ingest_schema{}; + MakeIngestSchema(&ingest_schema); + + AdbcIngestHandle handle{}; + ASSERT_EQ(AdbcConnectionBeginIngestPartitions( + &coordinator.conn, nullptr, nullptr, table.c_str(), + ADBC_INGEST_OPTION_MODE_CREATE, &ingest_schema, &handle, &error), + ADBC_STATUS_OK) + << error.message; + ingest_schema.release(&ingest_schema); + + // Write one partition on a separate worker connection. + constexpr int32_t kRows = 10; + ConnPair worker; + OpenConn(&worker, &error, uri); + ArrowArrayStream stream{}; + MakeBatchStream(&stream, /*start_id=*/0, kRows); + AdbcIngestReceipt rec{}; + ASSERT_EQ(AdbcConnectionWriteIngestPartition(&worker.conn, handle.bytes, handle.length, + &stream, &rec, &error), + ADBC_STATUS_OK) + << error.message; + std::vector receipt_bytes(rec.bytes, rec.bytes + rec.length); + rec.release(&rec); + CloseConn(&worker, &error); + + // Derive the staging table name from the receipt wire format (4-byte + // "PIR1" magic + u32 schema len + schema + u32 table len + table + ...). + ASSERT_GE(receipt_bytes.size(), static_cast(4 + 4)); + const uint8_t* rp = receipt_bytes.data() + 4; + uint32_t schema_len = 0; + std::memcpy(&schema_len, rp, sizeof(schema_len)); + rp += 4; + std::string staging_schema(reinterpret_cast(rp), schema_len); + rp += schema_len; + uint32_t tbl_len = 0; + std::memcpy(&tbl_len, rp, sizeof(tbl_len)); + rp += 4; + std::string staging_table(reinterpret_cast(rp), tbl_len); + std::string qualified_staging = + "\"" + staging_schema + "\".\"" + staging_table + "\""; + + // Drop the staging table out-of-band so the Commit loop's INSERT will fail. + { + ConnPair saboteur; + OpenConn(&saboteur, &error, uri); + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&saboteur.conn, &stmt, &error), ADBC_STATUS_OK); + std::string drop_sql = "DROP TABLE " + qualified_staging; + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, drop_sql.c_str(), &error), + ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, &error), + ADBC_STATUS_OK) + << error.message; + AdbcStatementRelease(&stmt, &error); + CloseConn(&saboteur, &error); + } + + // Put the coordinator into an outer transaction and force libpq to + // PQTRANS_INTRANS before calling Commit. + ASSERT_EQ(AdbcConnectionSetOption(&coordinator.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, + ADBC_OPTION_VALUE_DISABLED, &error), + ADBC_STATUS_OK) + << error.message; + { + // Run a no-result statement so libpq ends in PQTRANS_INTRANS rather than + // PQTRANS_ACTIVE (which a streaming SELECT would leave behind until the + // cursor is drained). DO $$ BEGIN END $$ executes and returns no rows. + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&coordinator.conn, &stmt, &error), ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, "DO $$ BEGIN END $$", &error), + ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, &error), + ADBC_STATUS_OK) + << error.message; + AdbcStatementRelease(&stmt, &error); + } + + // Commit must fail, but the outer transaction must remain usable. + const uint8_t* rec_ptr = receipt_bytes.data(); + size_t rec_len = receipt_bytes.size(); + AdbcError commit_err = ADBC_ERROR_INIT; + AdbcStatusCode rc = AdbcConnectionCommitIngestPartitions( + &coordinator.conn, handle.bytes, handle.length, 1, &rec_ptr, &rec_len, + /*rows_affected=*/nullptr, &commit_err); + EXPECT_NE(rc, ADBC_STATUS_OK); + if (commit_err.release) commit_err.release(&commit_err); + + // Post-failure, the outer transaction must still be usable: a SELECT must + // succeed (would be rejected with "current transaction is aborted" if the + // savepoint rollback was skipped). + { + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&coordinator.conn, &stmt, &error), ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, "SELECT 2", &error), ADBC_STATUS_OK); + ArrowArrayStream post{}; + ASSERT_EQ(AdbcStatementExecuteQuery(&stmt, &post, nullptr, &error), ADBC_STATUS_OK) + << error.message; + post.release(&post); + AdbcStatementRelease(&stmt, &error); + } + + // The driver's savepoint must have been RELEASEd (not left on the stack): + // if it were still live, a caller SAVEPOINT of the same name would still + // work, so instead prove release by issuing a ROLLBACK TO on the driver's + // savepoint name and expecting it to fail ("savepoint does not exist"). + // That asserts the savepoint is no longer on the stack after abort. + { + ASSERT_GE(handle.length, static_cast(4 + 16)); + std::string driver_savepoint = "adbc_ingest_commit_"; + static const char kHex[] = "0123456789abcdef"; + for (size_t i = 0; i < 16; i++) { + uint8_t b = handle.bytes[4 + i]; + driver_savepoint += kHex[b >> 4]; + driver_savepoint += kHex[b & 0x0F]; + } + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&coordinator.conn, &stmt, &error), ADBC_STATUS_OK); + std::string sql = "ROLLBACK TO SAVEPOINT " + driver_savepoint; + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, sql.c_str(), &error), ADBC_STATUS_OK); + AdbcError probe_err = ADBC_ERROR_INIT; + AdbcStatusCode probe_rc = + AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, &probe_err); + EXPECT_NE(probe_rc, ADBC_STATUS_OK) + << "driver savepoint was not released after commit failure"; + if (probe_err.release) probe_err.release(&probe_err); + AdbcStatementRelease(&stmt, &error); + } + + // Cleanly commit the (now empty) outer transaction. + AdbcConnectionRollback(&coordinator.conn, &error); + AdbcConnectionSetOption(&coordinator.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, "true", + &error); + + // Best-effort cleanup of any remaining staging tables. + AdbcConnectionAbortIngestPartitions(&coordinator.conn, handle.bytes, handle.length, 0, + nullptr, nullptr, &error); + + handle.release(&handle); + Cleanup(&coordinator, table, &error); + CloseConn(&coordinator, &error); +} + // Calling CommitIngestPartitions while the connection is in an aborted // transaction must fail cleanly with ADBC_STATUS_INVALID_STATE instead of // issuing SQL that would further mutate caller transaction state. From 98d3699a8f6f835324323476814ba3ae7a6da522 Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 18 Apr 2026 12:17:14 +0400 Subject: [PATCH 08/11] test(driver_manager): pin ingest-wrapper NULL-arg guards with valid connection The existing dispatch test only exercised the new handle/data/receipts NULL guards via a NULL connection, so a future reorder that hides those checks behind the connection check would not regress the suite. Add a per-wrapper case that supplies a valid connection but NULL handle/data/out_handle/ out_receipt (and num_receipts > 0 with NULL receipts/receipt_lens for Commit/Abort) to pin each guard independently. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../adbc_version_100_compatibility_test.cc | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/c/driver_manager/adbc_version_100_compatibility_test.cc b/c/driver_manager/adbc_version_100_compatibility_test.cc index bc82cfa3bb..62b48f04e6 100644 --- a/c/driver_manager/adbc_version_100_compatibility_test.cc +++ b/c/driver_manager/adbc_version_100_compatibility_test.cc @@ -221,6 +221,83 @@ TEST_F(AdbcVersion, IngestPartitionsWrappersDispatchToStubs) { connection.private_driver = nullptr; } +// With a valid connection, the public wrappers must still reject NULL +// handle/data/out_handle/out_receipt and (when num_receipts > 0) NULL +// receipts/receipt_lens with ADBC_STATUS_INVALID_ARGUMENT, so a future reorder +// that hides these guards behind the connection check is caught here. +TEST_F(AdbcVersion, IngestPartitionsWrappersGuardNullArgsWithValidConnection) { + ASSERT_THAT(AdbcLoadDriverFromInitFunc(&Version100DriverInit, ADBC_VERSION_1_2_0, + &driver, &error), + IsOkStatus(&error)); + + struct AdbcConnection connection = {}; + connection.private_driver = &driver; + + uint8_t handle_bytes[1] = {0}; + struct ArrowArrayStream stream = {}; + struct ArrowSchema schema = {}; + struct AdbcIngestReceipt out_receipt = {}; + int64_t rows_affected = 0; + struct AdbcError local_error = {}; + + // Begin: NULL out_handle. + EXPECT_THAT(AdbcConnectionBeginIngestPartitions(&connection, nullptr, nullptr, "t", + ADBC_INGEST_OPTION_MODE_CREATE, &schema, + /*out_handle=*/nullptr, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Write: NULL handle. + EXPECT_THAT(AdbcConnectionWriteIngestPartition(&connection, /*handle=*/nullptr, 0, + &stream, &out_receipt, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Write: NULL data. + EXPECT_THAT(AdbcConnectionWriteIngestPartition(&connection, handle_bytes, 1, + /*data=*/nullptr, &out_receipt, + &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Write: NULL out_receipt. + EXPECT_THAT(AdbcConnectionWriteIngestPartition(&connection, handle_bytes, 1, &stream, + /*out_receipt=*/nullptr, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Commit: NULL handle. + EXPECT_THAT( + AdbcConnectionCommitIngestPartitions(&connection, /*handle=*/nullptr, 0, 0, nullptr, + nullptr, &rows_affected, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Commit: num_receipts > 0 with NULL receipts/receipt_lens. + EXPECT_THAT( + AdbcConnectionCommitIngestPartitions(&connection, handle_bytes, 1, /*num=*/1, + /*receipts=*/nullptr, /*lens=*/nullptr, + &rows_affected, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Abort: NULL handle. + EXPECT_THAT(AdbcConnectionAbortIngestPartitions(&connection, /*handle=*/nullptr, 0, 0, + nullptr, nullptr, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Abort: num_receipts > 0 with NULL receipts/receipt_lens. + EXPECT_THAT(AdbcConnectionAbortIngestPartitions(&connection, handle_bytes, 1, /*num=*/1, + /*receipts=*/nullptr, + /*lens=*/nullptr, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Don't let TearDown observe a borrowed driver pointer. + connection.private_driver = nullptr; +} + // AdbcLoadDriverFromInitFunc must reject unrecognized version constants with // ADBC_STATUS_NOT_IMPLEMENTED so a typo in kSupportedVersions cannot silently // regress version gating. From 088e11415ab37b08a573cf564bc812abddb4b8c4 Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 18 Apr 2026 12:23:51 +0400 Subject: [PATCH 09/11] refactor(postgresql): share ingest-commit savepoint name with tests Make IngestHandle::kCommitSavepointPrefix and an inline HexId16 helper the single source of truth for the commit-savepoint name, defined header-inline so tests linking against the shared driver library can reuse them. The NAMEDATALEN-1 static_assert now guards the same constant the construction site uses, so a future rename can no longer drift past the assert. Tighten the savepoint-abort regression test accordingly: it now builds the probed savepoint name from kCommitSavepointPrefix + HexId16 instead of duplicating the literal and hex encoding, bounds-checks the receipt-decoded schema/table lengths before advancing the read pointer, and explicitly recovers the outer transaction (asserting the rollback succeeds) after the ROLLBACK TO probe leaves libpq in PQTRANS_INERROR. Co-Authored-By: Claude Opus 4.7 (1M context) --- c/driver/postgresql/ingest_partition.cc | 13 +++-- c/driver/postgresql/ingest_partition.h | 27 +++++++++ .../postgresql/partitioned_ingest_test.cc | 58 +++++++++++++------ 3 files changed, 74 insertions(+), 24 deletions(-) diff --git a/c/driver/postgresql/ingest_partition.cc b/c/driver/postgresql/ingest_partition.cc index 05ce9b6b37..5ab0eeebd9 100644 --- a/c/driver/postgresql/ingest_partition.cc +++ b/c/driver/postgresql/ingest_partition.cc @@ -99,12 +99,13 @@ static_assert(kStagingPrefixLen + kStagingSuffixLen <= kIdentMaxLen, "staging table name would exceed PostgreSQL NAMEDATALEN-1 and be " "silently truncated"); -// Commit savepoint name is "adbc_ingest_commit_" (19) + 32-hex handle id. -// Guard against truncation for the same reason: a truncated name would alias -// across concurrent ingest handles on the same connection. -constexpr size_t kCommitSavepointPrefixLen = 19; +// Commit savepoint name is IngestHandle::kCommitSavepointPrefix + 32-hex +// handle id. Guard against truncation for the same reason: a truncated name +// would alias across concurrent ingest handles on the same connection. The +// assert references the same constant CommitSavepointName() builds from, so a +// future rename cannot drift past the assert. constexpr size_t kHexIdLen = 32; -static_assert(kCommitSavepointPrefixLen + kHexIdLen <= kIdentMaxLen, +static_assert(IngestHandle::kCommitSavepointPrefix.size() + kHexIdLen <= kIdentMaxLen, "ingest commit savepoint name would exceed PostgreSQL " "NAMEDATALEN-1 and be silently truncated"); } // namespace @@ -521,7 +522,7 @@ AdbcStatusCode PostgresConnection::CommitIngestPartitions( // Derive a unique savepoint name from the handle's ingest_id so the driver // cannot collide with a caller-managed savepoint of the same name. Only used // when use_savepoint is true, but cheap to compute unconditionally. - const std::string savepoint_name = "adbc_ingest_commit_" + HexId(handle.ingest_id); + const std::string savepoint_name = handle.CommitSavepointName(); const std::string open_sql = use_savepoint ? std::string("SAVEPOINT ") + savepoint_name : "BEGIN"; const std::string commit_sql = diff --git a/c/driver/postgresql/ingest_partition.h b/c/driver/postgresql/ingest_partition.h index 707fe69616..9a26d0ab5b 100644 --- a/c/driver/postgresql/ingest_partition.h +++ b/c/driver/postgresql/ingest_partition.h @@ -13,16 +13,35 @@ #include #include #include +#include #include #include namespace adbcpq { +namespace internal { +// Hex-encode a 16-byte ingest id. Header-inline so tests linking against the +// shared driver library (which hides internal symbols) can reuse it. +inline std::string HexId16(const std::array& id) { + static constexpr char kHex[] = "0123456789abcdef"; + std::string s(32, '0'); + for (size_t i = 0; i < 16; i++) { + s[2 * i] = kHex[id[i] >> 4]; + s[2 * i + 1] = kHex[id[i] & 0x0F]; + } + return s; +} +} // namespace internal + // Wire format for the partitioned-ingest handle. Opaque to callers; symmetric // across coordinator and workers. struct IngestHandle { static constexpr std::array kMagic = {'P', 'I', 'H', '1'}; + // Prefix for the savepoint Commit installs around its INSERT loop when the + // caller is already inside a transaction. Exposed so tests can reproduce the + // exact name without duplicating the literal. + static constexpr std::string_view kCommitSavepointPrefix = "adbc_ingest_commit_"; std::array ingest_id; std::string catalog; @@ -33,6 +52,14 @@ struct IngestHandle { // Driver-internal — used by Abort to enumerate orphans. std::string StagingPrefix() const; + // Savepoint name used by Commit when wrapping the INSERT loop in a + // caller-provided transaction. Single source of truth shared between the + // driver and tests; defined inline so tests don't need to link against the + // driver-internal symbol. + std::string CommitSavepointName() const { + return std::string(kCommitSavepointPrefix) + internal::HexId16(ingest_id); + } + size_t SerializedSize() const; void Serialize(uint8_t* out) const; static AdbcStatusCode Parse(const uint8_t* bytes, size_t len, IngestHandle* out, diff --git a/c/driver/postgresql/partitioned_ingest_test.cc b/c/driver/postgresql/partitioned_ingest_test.cc index fdf99b6037..a44e6e6ef4 100644 --- a/c/driver/postgresql/partitioned_ingest_test.cc +++ b/c/driver/postgresql/partitioned_ingest_test.cc @@ -23,6 +23,7 @@ #include #include +#include "postgresql/ingest_partition.h" #include "validation/adbc_validation_util.h" namespace { @@ -444,16 +445,25 @@ TEST_F(PostgresPartitionedIngestTest, CommitFailureInOuterTxnReleasesSavepoint) // Derive the staging table name from the receipt wire format (4-byte // "PIR1" magic + u32 schema len + schema + u32 table len + table + ...). + // The production wire format uses host-endian u32 (see WriteU32/ReadU32 in + // ingest_partition.cc) — match that here, with strict bounds checks so a + // malformed length cannot run rp past the end of the buffer. + auto receipt_remaining = [&](const uint8_t* rp) -> size_t { + return receipt_bytes.size() - static_cast(rp - receipt_bytes.data()); + }; ASSERT_GE(receipt_bytes.size(), static_cast(4 + 4)); const uint8_t* rp = receipt_bytes.data() + 4; uint32_t schema_len = 0; std::memcpy(&schema_len, rp, sizeof(schema_len)); - rp += 4; + rp += sizeof(schema_len); + ASSERT_LE(schema_len, receipt_remaining(rp)); std::string staging_schema(reinterpret_cast(rp), schema_len); rp += schema_len; + ASSERT_GE(receipt_remaining(rp), sizeof(uint32_t)); uint32_t tbl_len = 0; std::memcpy(&tbl_len, rp, sizeof(tbl_len)); - rp += 4; + rp += sizeof(tbl_len); + ASSERT_LE(tbl_len, receipt_remaining(rp)); std::string staging_table(reinterpret_cast(rp), tbl_len); std::string qualified_staging = "\"" + staging_schema + "\".\"" + staging_table + "\""; @@ -522,16 +532,17 @@ TEST_F(PostgresPartitionedIngestTest, CommitFailureInOuterTxnReleasesSavepoint) // if it were still live, a caller SAVEPOINT of the same name would still // work, so instead prove release by issuing a ROLLBACK TO on the driver's // savepoint name and expecting it to fail ("savepoint does not exist"). - // That asserts the savepoint is no longer on the stack after abort. + // That asserts the savepoint is no longer on the stack after abort. Build + // the name from IngestHandle::kCommitSavepointPrefix and the inline HexId16 + // helper so the probe shares the literal and the encoding with the driver + // — a future rename forces this test to update too. + ASSERT_GE(handle.length, static_cast(4 + 16)); + std::array ingest_id{}; + std::memcpy(ingest_id.data(), handle.bytes + 4, ingest_id.size()); + const std::string driver_savepoint = + std::string(adbcpq::IngestHandle::kCommitSavepointPrefix) + + adbcpq::internal::HexId16(ingest_id); { - ASSERT_GE(handle.length, static_cast(4 + 16)); - std::string driver_savepoint = "adbc_ingest_commit_"; - static const char kHex[] = "0123456789abcdef"; - for (size_t i = 0; i < 16; i++) { - uint8_t b = handle.bytes[4 + i]; - driver_savepoint += kHex[b >> 4]; - driver_savepoint += kHex[b & 0x0F]; - } AdbcStatement stmt{}; ASSERT_EQ(AdbcStatementNew(&coordinator.conn, &stmt, &error), ADBC_STATUS_OK); std::string sql = "ROLLBACK TO SAVEPOINT " + driver_savepoint; @@ -545,14 +556,25 @@ TEST_F(PostgresPartitionedIngestTest, CommitFailureInOuterTxnReleasesSavepoint) AdbcStatementRelease(&stmt, &error); } - // Cleanly commit the (now empty) outer transaction. - AdbcConnectionRollback(&coordinator.conn, &error); - AdbcConnectionSetOption(&coordinator.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, "true", - &error); + // The failed ROLLBACK TO above pushed libpq into PQTRANS_INERROR. Recover by + // explicitly rolling back the outer transaction; the rollback must succeed + // for AbortIngestPartitions below to operate on a clean connection. + ASSERT_EQ(AdbcConnectionRollback(&coordinator.conn, &error), ADBC_STATUS_OK) + << error.message; + ASSERT_EQ(AdbcConnectionSetOption(&coordinator.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, + "true", &error), + ADBC_STATUS_OK) + << error.message; - // Best-effort cleanup of any remaining staging tables. - AdbcConnectionAbortIngestPartitions(&coordinator.conn, handle.bytes, handle.length, 0, - nullptr, nullptr, &error); + // Best-effort cleanup of any remaining staging tables. The saboteur dropped + // the only staging table out-of-band, so this should find nothing — but call + // it anyway to exercise the abort path on a connection that just unwound an + // error. + ASSERT_EQ(AdbcConnectionAbortIngestPartitions(&coordinator.conn, handle.bytes, + handle.length, 0, nullptr, nullptr, + &error), + ADBC_STATUS_OK) + << error.message; handle.release(&handle); Cleanup(&coordinator, table, &error); From 97ba1848393fd6178f256150273ffc9062f8bd86 Mon Sep 17 00:00:00 2001 From: tokoko Date: Sat, 18 Apr 2026 12:33:21 +0400 Subject: [PATCH 10/11] refactor(postgresql): collapse ingest hex encoders to internal::HexId16 Replace the .cc-local HexId with the header-inline internal::HexId16 in StagingPrefix so both call sites share one implementation, and derive kHexIdLen from sizeof(IngestHandle::ingest_id) so the NAMEDATALEN assert cannot drift if the id width ever changes. Co-Authored-By: Claude Opus 4.7 (1M context) --- c/driver/postgresql/ingest_partition.cc | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/c/driver/postgresql/ingest_partition.cc b/c/driver/postgresql/ingest_partition.cc index 5ab0eeebd9..2ce6f9ea1d 100644 --- a/c/driver/postgresql/ingest_partition.cc +++ b/c/driver/postgresql/ingest_partition.cc @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -66,16 +67,6 @@ bool ReadString(const uint8_t** p, const uint8_t* end, std::string* out) { return true; } -std::string HexId(const std::array& id) { - static const char kHex[] = "0123456789abcdef"; - std::string s(32, '0'); - for (size_t i = 0; i < 16; i++) { - s[2 * i] = kHex[id[i] >> 4]; - s[2 * i + 1] = kHex[id[i] & 0x0F]; - } - return s; -} - } // namespace void IngestHandle::GenerateId(std::array* out) { @@ -99,19 +90,19 @@ static_assert(kStagingPrefixLen + kStagingSuffixLen <= kIdentMaxLen, "staging table name would exceed PostgreSQL NAMEDATALEN-1 and be " "silently truncated"); -// Commit savepoint name is IngestHandle::kCommitSavepointPrefix + 32-hex -// handle id. Guard against truncation for the same reason: a truncated name -// would alias across concurrent ingest handles on the same connection. The -// assert references the same constant CommitSavepointName() builds from, so a -// future rename cannot drift past the assert. -constexpr size_t kHexIdLen = 32; +// Commit savepoint name is IngestHandle::kCommitSavepointPrefix + a hex- +// encoded ingest_id. Guard against truncation for the same reason: a truncated +// name would alias across concurrent ingest handles on the same connection. +// The assert references the same constants CommitSavepointName() builds from, +// so a future rename or width change cannot drift past the assert. +constexpr size_t kHexIdLen = 2 * std::tuple_size_v; static_assert(IngestHandle::kCommitSavepointPrefix.size() + kHexIdLen <= kIdentMaxLen, "ingest commit savepoint name would exceed PostgreSQL " "NAMEDATALEN-1 and be silently truncated"); } // namespace std::string IngestHandle::StagingPrefix() const { - return "adbc_stg_" + HexId(ingest_id) + "_"; + return "adbc_stg_" + internal::HexId16(ingest_id) + "_"; } size_t IngestHandle::SerializedSize() const { From 95d027c9c95aff6f93e3af0ffdd53c950d93fe69 Mon Sep 17 00:00:00 2001 From: tokoko Date: Sun, 17 May 2026 11:11:19 +0400 Subject: [PATCH 11/11] partitioned ingest spec --- .../source/format/partitioned_bulk_ingest.rst | 53 ++++--------------- 1 file changed, 10 insertions(+), 43 deletions(-) diff --git a/docs/source/format/partitioned_bulk_ingest.rst b/docs/source/format/partitioned_bulk_ingest.rst index 99fa468a7e..b8a90aee44 100644 --- a/docs/source/format/partitioned_bulk_ingest.rst +++ b/docs/source/format/partitioned_bulk_ingest.rst @@ -147,11 +147,11 @@ hold driver-owned memory that callers release locally. Driver-side semantics --------------------- -- **Begin** validates options, creates the target table for - ``create``/``replace``/``create_append`` modes, and returns a handle - that encodes whatever state the driver needs to scope subsequent - writes (UUID, target catalog/schema/table, transaction id, object - store prefix, etc.). +- **Begin** validates options, performs whatever setup the driver + requires for writes to proceed (e.g., creating the target table for + ``create``/``replace``/``create_append`` modes, reserving a + transaction snapshot, allocating an object-store prefix), and returns + a handle that encodes the state needed to scope subsequent writes. - **Write** takes a handle and a stream, writes the partition into driver-private staging (a per-write staging table, a per-write object-store path), and returns a receipt encoding what was @@ -221,20 +221,8 @@ For ``append`` mode, the schema parameter is optional; if supplied it is validated against the target so a thousand workers don't all fail independently with the same schema-mismatch error. -3. No caller-supplied partition IDs ------------------------------------ - -Earlier drafts gave each ``Write`` a caller-supplied ``partition_id`` -for idempotent retry. Dropped: receipts are the source of truth for -what gets committed, and well-designed drivers write each ``Write`` -to a unique location (per-call staging table, per-call data file). -A retried ``Write`` produces a *new* receipt; the original write -becomes orphaned and is collected by ``Abort``. Caller-supplied IDs -only matter for drivers that share staging across writes — which they -shouldn't. - -4. Driver-owned output structs (handle, receipt) ------------------------------------------------- +3. Driver-owned output structs (handle, receipt) +------------------------------------------------- An earlier draft used the ``GetOptionBytes`` two-phase sizing pattern: caller passes a buffer + capacity, driver reports required @@ -249,7 +237,7 @@ The chosen pattern (driver-owned struct with a release callback) mirrors ``AdbcPartitions`` on the read side, eliminates the orphan window, and gives drivers a clean place to free internal state. -5. ``Commit`` and ``Abort`` take raw bytes, not structs +4. ``Commit`` and ``Abort`` take raw bytes, not structs ------------------------------------------------------- Symmetric with ``AdbcConnectionReadPartition``, which takes the raw @@ -259,7 +247,7 @@ arrive as raw bytes; forcing the caller to wrap them in ``AdbcIngestReceipt`` structs (with bogus ``release`` callbacks) would be friction without benefit. -6. Lost receipts are handled by handle-scoped sweep, not by receipts +5. Lost receipts are handled by handle-scoped sweep, not by receipts -------------------------------------------------------------------- If a worker writes data but its receipt is lost in transit, the @@ -276,7 +264,7 @@ optimization (fast-path deletion of known writes); the handle is the authority for cleanup scope. Drivers that cannot enumerate from the handle alone cannot correctly implement partitioned ingest. -7. Coordinator may die without calling ``Commit`` or ``Abort`` +6. Coordinator may die without calling ``Commit`` or ``Abort`` -------------------------------------------------------------- The handle is opaque to the driver outside of ``Write``, so the @@ -292,27 +280,6 @@ behaviors: The spec does not mandate any of these; it documents the failure mode and leaves the policy to drivers. -Open questions -============== - -These are intentionally unresolved in the initial revision. - -- **Single-coordinator commit only.** Two coordinators calling - ``Commit`` on the same handle concurrently is undefined. Should - drivers be required to detect and reject this, or is it the - caller's responsibility? -- **Subset writes.** Today the prototype assumes each worker writes - the same column set. Receipts encode the column list, so it is - possible to support per-worker subsets in the future, but this is - not specified yet. -- **Append-mode schema validation.** The schema parameter is - optional in ``append`` mode. Should drivers be *required* to - validate when a schema is supplied? Currently "should". -- **Streaming Commit.** Today ``Commit`` takes all receipts at - once. For very-many-partition jobs (10k+ workers) it may be - preferable to incrementally accumulate receipts. Out of scope for - v1. - Reference implementation ========================