diff --git a/c/driver/postgresql/copy/postgres_copy_writer_test.cc b/c/driver/postgresql/copy/postgres_copy_writer_test.cc index f38bb686c3..8ca9915a2a 100644 --- a/c/driver/postgresql/copy/postgres_copy_writer_test.cc +++ b/c/driver/postgresql/copy/postgres_copy_writer_test.cc @@ -1587,6 +1587,121 @@ TEST_F(PostgresCopyTest, PostgresCopyWriteFixedSizeListInteger) { } } +// Regression test for https://github.com/apache/arrow-adbc/issues/4319. +// When the source array has offset > 0 (a sliced parent), the list writer +// must read child offsets at (array_view->offset + index), not at index. +// Writing rows 3..5 of a 6-row source via offset/length must produce the +// same body as writing those rows as a fresh 3-row array. +TEST_P(PostgresCopyListTest, PostgresCopyWriteListSlicedMatchesDirect) { + adbc_validation::Handle schema; + adbc_validation::Handle source; + adbc_validation::Handle tail; + struct ArrowError na_error; + + ASSERT_EQ(adbc_validation::MakeSchema( + &schema.value, {adbc_validation::SchemaField::Nested( + "col", GetParam(), {{"item", NANOARROW_TYPE_INT32}})}), + ADBC_STATUS_OK); + + ASSERT_EQ( + adbc_validation::MakeBatch>( + &schema.value, &source.value, &na_error, + {std::vector{1, 2}, std::vector{3, 4, 5}, std::nullopt, + std::vector{6}, std::vector{7, 8}, std::vector{9}}), + ADBC_STATUS_OK); + + ASSERT_EQ( + adbc_validation::MakeBatch>( + &schema.value, &tail.value, &na_error, + {std::vector{6}, std::vector{7, 8}, std::vector{9}}), + ADBC_STATUS_OK); + + PostgresCopyStreamWriteTester ref_tester; + ASSERT_EQ(ref_tester.Init(&schema.value, &tail.value, *type_resolver_), NANOARROW_OK); + ASSERT_EQ(ref_tester.WriteAll(nullptr), ENODATA); + const struct ArrowBuffer ref_buf = ref_tester.WriteBuffer(); + + // Slice: hide the first 3 rows by setting offset/length on the struct + // root and on the list-typed column. nanoarrow does not propagate the + // struct's offset down to children, so both levels need updating. + source->offset = 0; + source->length = 3; + source->children[0]->offset = 3; + source->children[0]->length = 3; + + PostgresCopyStreamWriteTester sliced_tester; + ASSERT_EQ(sliced_tester.Init(&schema.value, &source.value, *type_resolver_), + NANOARROW_OK); + ASSERT_EQ(sliced_tester.WriteAll(nullptr), ENODATA); + const struct ArrowBuffer sliced_buf = sliced_tester.WriteBuffer(); + + ASSERT_EQ(sliced_buf.size_bytes, ref_buf.size_bytes); + for (int64_t i = 0; i < sliced_buf.size_bytes; i++) { + ASSERT_EQ(sliced_buf.data[i], ref_buf.data[i]) << "failure at index " << i; + } +} + +// Same regression check for FIXED_SIZE_LIST, which takes the +// IsFixedSize=true branch in PostgresCopyListFieldWriter. +TEST_F(PostgresCopyTest, PostgresCopyWriteFixedSizeListSlicedMatchesDirect) { + adbc_validation::Handle schema; + adbc_validation::Handle source; + adbc_validation::Handle tail; + struct ArrowError na_error; + + // Two FIXED_SIZE_LIST schemas of size 2 — one for the 6-row source, one + // for the 3-row reference. Both are independently allocated because + // MakeBatch consumes the schema state. + auto build_schema = [](struct ArrowSchema* out) { + ASSERT_EQ(ArrowSchemaInitFromType(out, NANOARROW_TYPE_STRUCT), NANOARROW_OK); + ASSERT_EQ(ArrowSchemaAllocateChildren(out, 1), NANOARROW_OK); + ArrowSchemaInit(out->children[0]); + ASSERT_EQ( + ArrowSchemaSetTypeFixedSize(out->children[0], NANOARROW_TYPE_FIXED_SIZE_LIST, 2), + NANOARROW_OK); + ASSERT_EQ(ArrowSchemaSetName(out->children[0], "col"), NANOARROW_OK); + ASSERT_EQ(ArrowSchemaSetType(out->children[0]->children[0], NANOARROW_TYPE_INT32), + NANOARROW_OK); + }; + + adbc_validation::Handle tail_schema; + build_schema(&schema.value); + build_schema(&tail_schema.value); + + ASSERT_EQ(adbc_validation::MakeBatch>( + &schema.value, &source.value, &na_error, + {std::vector{1, 2}, std::vector{3, 4}, std::nullopt, + std::vector{5, 6}, std::vector{7, 8}, std::nullopt}), + ADBC_STATUS_OK); + + ASSERT_EQ(adbc_validation::MakeBatch>( + &tail_schema.value, &tail.value, &na_error, + {std::vector{5, 6}, std::vector{7, 8}, std::nullopt}), + ADBC_STATUS_OK); + + PostgresCopyStreamWriteTester ref_tester; + ASSERT_EQ(ref_tester.Init(&tail_schema.value, &tail.value, *type_resolver_), + NANOARROW_OK); + ASSERT_EQ(ref_tester.WriteAll(nullptr), ENODATA); + const struct ArrowBuffer ref_buf = ref_tester.WriteBuffer(); + + source->offset = 0; + source->length = 3; + source->children[0]->offset = 3; + source->children[0]->length = 3; + + PostgresCopyStreamWriteTester sliced_tester; + ASSERT_EQ(sliced_tester.Init(&schema.value, &source.value, *type_resolver_), + NANOARROW_OK); + ASSERT_EQ(sliced_tester.WriteAll(nullptr), ENODATA); + const struct ArrowBuffer sliced_buf = sliced_tester.WriteBuffer(); + + ASSERT_EQ(sliced_buf.size_bytes, ref_buf.size_bytes); + for (int64_t i = 0; i < sliced_buf.size_bytes; i++) { + ASSERT_EQ(sliced_buf.data[i], ref_buf.data[i]) << "failure at index " << i; + } +} + TEST_F(PostgresCopyTest, PostgresCopyWriteMultiBatch) { // Regression test for https://github.com/apache/arrow-adbc/issues/1310 adbc_validation::Handle schema; diff --git a/c/driver/postgresql/copy/writer.h b/c/driver/postgresql/copy/writer.h index 512a29afee..eab744fe88 100644 --- a/c/driver/postgresql/copy/writer.h +++ b/c/driver/postgresql/copy/writer.h @@ -641,12 +641,13 @@ class PostgresCopyListFieldWriter : public PostgresCopyFieldWriter { // TODO: the LARGE_LIST should use 64 bit indexes int32_t start, end; + const int64_t logical = array_view_->offset + index; if constexpr (IsFixedSize) { - start = index * array_view_->layout.child_size_elements; + start = logical * array_view_->layout.child_size_elements; end = start + array_view_->layout.child_size_elements; } else { - start = ArrowArrayViewListChildOffset(array_view_, index); - end = ArrowArrayViewListChildOffset(array_view_, index + 1); + start = ArrowArrayViewListChildOffset(array_view_, logical); + end = ArrowArrayViewListChildOffset(array_view_, logical + 1); } const int32_t dim = end - start; diff --git a/c/driver/postgresql/postgresql_test.cc b/c/driver/postgresql/postgresql_test.cc index d457a14a00..6f6b0fd1ce 100644 --- a/c/driver/postgresql/postgresql_test.cc +++ b/c/driver/postgresql/postgresql_test.cc @@ -1892,6 +1892,103 @@ TEST_F(PostgresStatementTest, EmptyStringAndBinaryParameter) { ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); } +// Regression test for https://github.com/apache/arrow-adbc/issues/4319. +// The parameterized prepared-statement path (BindAndExecuteCurrentRow) +// constructs its field writers via the same MakeCopyFieldWriter factory +// as the COPY ingest path, so the offset bug in PostgresCopyListFieldWriter +// drifts list-typed bound parameters the same way. This exercises the bind +// path end-to-end with an INSERT … ON CONFLICT DO UPDATE upsert whose list +// parameter comes from a sliced Arrow array (parent.offset > 0). +TEST_F(PostgresStatementTest, BindUpsertWithSlicedListParameter) { + ASSERT_THAT(quirks()->DropTable(&connection, "adbc_test", &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); + + ASSERT_THAT( + AdbcStatementSetSqlQuery( + &statement, "CREATE TABLE adbc_test (id INT PRIMARY KEY, tags TEXT[] NOT NULL)", + &error), + IsOkStatus(&error)); + { + adbc_validation::StreamReader reader; + ASSERT_THAT( + AdbcStatementExecuteQuery(&statement, &reader.stream.value, nullptr, &error), + IsOkStatus(&error)); + } + + // 6-row struct(int32, list) where odd rows have 1-element tags + // and even rows have 2-element tags. The size difference makes drift + // structurally observable, not just value-different. + adbc_validation::Handle bind_schema; + adbc_validation::Handle bind; + struct ArrowError na_error; + ASSERT_EQ(adbc_validation::MakeSchema( + &bind_schema.value, + {{"id", NANOARROW_TYPE_INT32}, + adbc_validation::SchemaField::Nested( + "tags", NANOARROW_TYPE_LIST, {{"item", NANOARROW_TYPE_STRING}})}), + ADBC_STATUS_OK); + + ASSERT_EQ( + (adbc_validation::MakeBatch>( + &bind_schema.value, &bind.value, &na_error, {0, 1, 2, 3, 4, 5}, + {std::vector{"r0a", "r0b"}, std::vector{"r1x"}, + std::vector{"r2a", "r2b"}, std::vector{"r3x"}, + std::vector{"r4a", "r4b"}, std::vector{"r5x"}})), + ADBC_STATUS_OK); + + // Hide rows 0..2 by setting offset/length on the struct root and on + // each child column (nanoarrow does not propagate the struct's offset). + bind->offset = 0; + bind->length = 3; + bind->children[0]->offset = 3; + bind->children[0]->length = 3; + bind->children[1]->offset = 3; + bind->children[1]->length = 3; + + ASSERT_THAT( + AdbcStatementSetSqlQuery(&statement, + "INSERT INTO adbc_test (id, tags) VALUES ($1, $2) " + "ON CONFLICT (id) DO UPDATE SET tags = EXCLUDED.tags", + &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementBind(&statement, &bind.value, &bind_schema.value, &error), + IsOkStatus(&error)); + { + adbc_validation::StreamReader reader; + ASSERT_THAT( + AdbcStatementExecuteQuery(&statement, &reader.stream.value, nullptr, &error), + IsOkStatus(&error)); + } + + // SELECT array_length(tags, 1) per id. Expected: id=3→1, id=4→2, id=5→1. + // Under the bug, ids 3..5 would receive tags from underlying rows 0..2 + // and report lengths 2/1/2 instead. + ASSERT_THAT( + AdbcStatementSetSqlQuery( + &statement, + "SELECT id, array_length(tags, 1) AS len FROM adbc_test ORDER BY id", &error), + IsOkStatus(&error)); + adbc_validation::StreamReader reader; + ASSERT_THAT( + AdbcStatementExecuteQuery(&statement, &reader.stream.value, nullptr, &error), + IsOkStatus(&error)); + ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); + ASSERT_NO_FATAL_FAILURE(reader.Next()); + ASSERT_NE(reader.array->release, nullptr); + ASSERT_EQ(reader.array->length, 3); + + // id column (int32) + ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[0], 0), 3); + ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[0], 1), 4); + ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[0], 2), 5); + // len column (int32 — PostgreSQL array_length returns int4) + ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[1], 0), 1); + ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[1], 1), 2); + ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[1], 2), 1); + + ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); +} + TEST_F(PostgresStatementTest, SqlExecuteCopyZeroRowOutputError) { ASSERT_THAT(quirks()->DropTable(&connection, "adbc_test", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));