Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions c/driver/postgresql/copy/postgres_copy_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,121 @@ TEST_F(PostgresCopyTest, PostgresCopyWriteFixedSizeListInteger) {
}
}

// Regression test for https://github.com/apache/arrow-adbc/issues/4319.
// When the source array has offset > 0 (a sliced parent), the list writer
// must read child offsets at (array_view->offset + index), not at index.
// Writing rows 3..5 of a 6-row source via offset/length must produce the
// same body as writing those rows as a fresh 3-row array.
TEST_P(PostgresCopyListTest, PostgresCopyWriteListSlicedMatchesDirect) {
adbc_validation::Handle<struct ArrowSchema> schema;
adbc_validation::Handle<struct ArrowArray> source;
adbc_validation::Handle<struct ArrowArray> tail;
struct ArrowError na_error;

ASSERT_EQ(adbc_validation::MakeSchema(
&schema.value, {adbc_validation::SchemaField::Nested(
"col", GetParam(), {{"item", NANOARROW_TYPE_INT32}})}),
ADBC_STATUS_OK);

ASSERT_EQ(
adbc_validation::MakeBatch<std::vector<int32_t>>(
&schema.value, &source.value, &na_error,
{std::vector<int32_t>{1, 2}, std::vector<int32_t>{3, 4, 5}, std::nullopt,
std::vector<int32_t>{6}, std::vector<int32_t>{7, 8}, std::vector<int32_t>{9}}),
ADBC_STATUS_OK);

ASSERT_EQ(
adbc_validation::MakeBatch<std::vector<int32_t>>(
&schema.value, &tail.value, &na_error,
{std::vector<int32_t>{6}, std::vector<int32_t>{7, 8}, std::vector<int32_t>{9}}),
ADBC_STATUS_OK);

PostgresCopyStreamWriteTester ref_tester;
ASSERT_EQ(ref_tester.Init(&schema.value, &tail.value, *type_resolver_), NANOARROW_OK);
ASSERT_EQ(ref_tester.WriteAll(nullptr), ENODATA);
const struct ArrowBuffer ref_buf = ref_tester.WriteBuffer();

// Slice: hide the first 3 rows by setting offset/length on the struct
// root and on the list-typed column. nanoarrow does not propagate the
// struct's offset down to children, so both levels need updating.
source->offset = 0;
source->length = 3;
source->children[0]->offset = 3;
source->children[0]->length = 3;

PostgresCopyStreamWriteTester sliced_tester;
ASSERT_EQ(sliced_tester.Init(&schema.value, &source.value, *type_resolver_),
NANOARROW_OK);
ASSERT_EQ(sliced_tester.WriteAll(nullptr), ENODATA);
const struct ArrowBuffer sliced_buf = sliced_tester.WriteBuffer();

ASSERT_EQ(sliced_buf.size_bytes, ref_buf.size_bytes);
for (int64_t i = 0; i < sliced_buf.size_bytes; i++) {
ASSERT_EQ(sliced_buf.data[i], ref_buf.data[i]) << "failure at index " << i;
}
}

// Same regression check for FIXED_SIZE_LIST, which takes the
// IsFixedSize=true branch in PostgresCopyListFieldWriter.
TEST_F(PostgresCopyTest, PostgresCopyWriteFixedSizeListSlicedMatchesDirect) {
adbc_validation::Handle<struct ArrowSchema> schema;
adbc_validation::Handle<struct ArrowArray> source;
adbc_validation::Handle<struct ArrowArray> tail;
struct ArrowError na_error;

// Two FIXED_SIZE_LIST schemas of size 2 — one for the 6-row source, one
// for the 3-row reference. Both are independently allocated because
// MakeBatch consumes the schema state.
auto build_schema = [](struct ArrowSchema* out) {
ASSERT_EQ(ArrowSchemaInitFromType(out, NANOARROW_TYPE_STRUCT), NANOARROW_OK);
ASSERT_EQ(ArrowSchemaAllocateChildren(out, 1), NANOARROW_OK);
ArrowSchemaInit(out->children[0]);
ASSERT_EQ(
ArrowSchemaSetTypeFixedSize(out->children[0], NANOARROW_TYPE_FIXED_SIZE_LIST, 2),
NANOARROW_OK);
ASSERT_EQ(ArrowSchemaSetName(out->children[0], "col"), NANOARROW_OK);
ASSERT_EQ(ArrowSchemaSetType(out->children[0]->children[0], NANOARROW_TYPE_INT32),
NANOARROW_OK);
};

adbc_validation::Handle<struct ArrowSchema> tail_schema;
build_schema(&schema.value);
build_schema(&tail_schema.value);

ASSERT_EQ(adbc_validation::MakeBatch<std::vector<int32_t>>(
&schema.value, &source.value, &na_error,
{std::vector<int32_t>{1, 2}, std::vector<int32_t>{3, 4}, std::nullopt,
std::vector<int32_t>{5, 6}, std::vector<int32_t>{7, 8}, std::nullopt}),
ADBC_STATUS_OK);

ASSERT_EQ(adbc_validation::MakeBatch<std::vector<int32_t>>(
&tail_schema.value, &tail.value, &na_error,
{std::vector<int32_t>{5, 6}, std::vector<int32_t>{7, 8}, std::nullopt}),
ADBC_STATUS_OK);

PostgresCopyStreamWriteTester ref_tester;
ASSERT_EQ(ref_tester.Init(&tail_schema.value, &tail.value, *type_resolver_),
NANOARROW_OK);
ASSERT_EQ(ref_tester.WriteAll(nullptr), ENODATA);
const struct ArrowBuffer ref_buf = ref_tester.WriteBuffer();

source->offset = 0;
source->length = 3;
source->children[0]->offset = 3;
source->children[0]->length = 3;

PostgresCopyStreamWriteTester sliced_tester;
ASSERT_EQ(sliced_tester.Init(&schema.value, &source.value, *type_resolver_),
NANOARROW_OK);
ASSERT_EQ(sliced_tester.WriteAll(nullptr), ENODATA);
const struct ArrowBuffer sliced_buf = sliced_tester.WriteBuffer();

ASSERT_EQ(sliced_buf.size_bytes, ref_buf.size_bytes);
for (int64_t i = 0; i < sliced_buf.size_bytes; i++) {
ASSERT_EQ(sliced_buf.data[i], ref_buf.data[i]) << "failure at index " << i;
}
}

TEST_F(PostgresCopyTest, PostgresCopyWriteMultiBatch) {
// Regression test for https://github.com/apache/arrow-adbc/issues/1310
adbc_validation::Handle<struct ArrowSchema> schema;
Expand Down
7 changes: 4 additions & 3 deletions c/driver/postgresql/copy/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -641,12 +641,13 @@ class PostgresCopyListFieldWriter : public PostgresCopyFieldWriter {

// TODO: the LARGE_LIST should use 64 bit indexes
int32_t start, end;
const int64_t logical = array_view_->offset + index;
if constexpr (IsFixedSize) {
start = index * array_view_->layout.child_size_elements;
start = logical * array_view_->layout.child_size_elements;
end = start + array_view_->layout.child_size_elements;
} else {
start = ArrowArrayViewListChildOffset(array_view_, index);
end = ArrowArrayViewListChildOffset(array_view_, index + 1);
start = ArrowArrayViewListChildOffset(array_view_, logical);
end = ArrowArrayViewListChildOffset(array_view_, logical + 1);
}

const int32_t dim = end - start;
Expand Down
97 changes: 97 additions & 0 deletions c/driver/postgresql/postgresql_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1892,6 +1892,103 @@ TEST_F(PostgresStatementTest, EmptyStringAndBinaryParameter) {
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}

// Regression test for https://github.com/apache/arrow-adbc/issues/4319.
// The parameterized prepared-statement path (BindAndExecuteCurrentRow)
// constructs its field writers via the same MakeCopyFieldWriter factory
// as the COPY ingest path, so the offset bug in PostgresCopyListFieldWriter
// drifts list-typed bound parameters the same way. This exercises the bind
// path end-to-end with an INSERT … ON CONFLICT DO UPDATE upsert whose list
// parameter comes from a sliced Arrow array (parent.offset > 0).
TEST_F(PostgresStatementTest, BindUpsertWithSlicedListParameter) {
ASSERT_THAT(quirks()->DropTable(&connection, "adbc_test", &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));

ASSERT_THAT(
AdbcStatementSetSqlQuery(
&statement, "CREATE TABLE adbc_test (id INT PRIMARY KEY, tags TEXT[] NOT NULL)",
&error),
IsOkStatus(&error));
{
adbc_validation::StreamReader reader;
ASSERT_THAT(
AdbcStatementExecuteQuery(&statement, &reader.stream.value, nullptr, &error),
IsOkStatus(&error));
}

// 6-row struct(int32, list<string>) where odd rows have 1-element tags
// and even rows have 2-element tags. The size difference makes drift
// structurally observable, not just value-different.
adbc_validation::Handle<struct ArrowSchema> bind_schema;
adbc_validation::Handle<struct ArrowArray> bind;
struct ArrowError na_error;
ASSERT_EQ(adbc_validation::MakeSchema(
&bind_schema.value,
{{"id", NANOARROW_TYPE_INT32},
adbc_validation::SchemaField::Nested(
"tags", NANOARROW_TYPE_LIST, {{"item", NANOARROW_TYPE_STRING}})}),
ADBC_STATUS_OK);

ASSERT_EQ(
(adbc_validation::MakeBatch<int32_t, std::vector<std::string>>(
&bind_schema.value, &bind.value, &na_error, {0, 1, 2, 3, 4, 5},
{std::vector<std::string>{"r0a", "r0b"}, std::vector<std::string>{"r1x"},
std::vector<std::string>{"r2a", "r2b"}, std::vector<std::string>{"r3x"},
std::vector<std::string>{"r4a", "r4b"}, std::vector<std::string>{"r5x"}})),
ADBC_STATUS_OK);

// Hide rows 0..2 by setting offset/length on the struct root and on
// each child column (nanoarrow does not propagate the struct's offset).
bind->offset = 0;
bind->length = 3;
bind->children[0]->offset = 3;
bind->children[0]->length = 3;
bind->children[1]->offset = 3;
bind->children[1]->length = 3;

ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement,
"INSERT INTO adbc_test (id, tags) VALUES ($1, $2) "
"ON CONFLICT (id) DO UPDATE SET tags = EXCLUDED.tags",
&error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement, &bind.value, &bind_schema.value, &error),
IsOkStatus(&error));
{
adbc_validation::StreamReader reader;
ASSERT_THAT(
AdbcStatementExecuteQuery(&statement, &reader.stream.value, nullptr, &error),
IsOkStatus(&error));
}

// SELECT array_length(tags, 1) per id. Expected: id=3→1, id=4→2, id=5→1.
// Under the bug, ids 3..5 would receive tags from underlying rows 0..2
// and report lengths 2/1/2 instead.
ASSERT_THAT(
AdbcStatementSetSqlQuery(
&statement,
"SELECT id, array_length(tags, 1) AS len FROM adbc_test ORDER BY id", &error),
IsOkStatus(&error));
adbc_validation::StreamReader reader;
ASSERT_THAT(
AdbcStatementExecuteQuery(&statement, &reader.stream.value, nullptr, &error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(reader.array->release, nullptr);
ASSERT_EQ(reader.array->length, 3);

// id column (int32)
ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[0], 0), 3);
ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[0], 1), 4);
ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[0], 2), 5);
// len column (int32 — PostgreSQL array_length returns int4)
ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[1], 0), 1);
ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[1], 1), 2);
ASSERT_EQ(ArrowArrayViewGetIntUnsafe(reader.array_view->children[1], 2), 1);

ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}

TEST_F(PostgresStatementTest, SqlExecuteCopyZeroRowOutputError) {
ASSERT_THAT(quirks()->DropTable(&connection, "adbc_test", &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
Expand Down