Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 98 additions & 2 deletions c/driver/postgresql/copy/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,70 @@ class PostgresCopyNumericFieldReader : public PostgresCopyFieldReader {
static const uint16_t kNumericNinf = 0xF000;
};

// Field reader for PostgreSQL TIME values arriving in COPY binary format.
//
// The wire payload is an int64 count of microseconds since midnight (see
// https://www.postgresql.org/docs/current/datatype-datetime.html). The value
// is rescaled to the Arrow time unit TU and appended to the data buffer as a
// 4-byte int32_t when OutT is int32_t, or as the 8-byte int64_t otherwise.
template <enum ArrowTimeUnit TU, typename OutT>
class PostgresCopyTimeOfDayFieldReader : public PostgresCopyFieldReader {
 public:
  // Number of microseconds in a 24-hour day.
  static inline constexpr int64_t kUsecsPerDay = 86400LL * 1000000LL;
  // Number of nanoseconds in a 24-hour day.
  static inline constexpr int64_t kNsecsPerDay = 86400LL * 1000000000LL;

  // Consumes one field from `data` and appends the converted value to `array`.
  //
  // A non-positive `field_size_bytes` appends one null element. Returns EINVAL
  // (with `error` populated) when the field is not exactly 8 bytes wide or the
  // decoded value lies outside [0, kUsecsPerDay].
  ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array,
                      ArrowError* error) override {
    if (field_size_bytes <= 0) {
      return ArrowArrayAppendNull(array, 1);
    }

    constexpr int32_t kExpectedSize = static_cast<int32_t>(sizeof(int64_t));
    if (field_size_bytes != kExpectedSize) {
      ArrowErrorSet(error, "Expected field with %d bytes but found field with %d bytes",
                    static_cast<int>(sizeof(int64_t)),
                    static_cast<int>(field_size_bytes));  // NOLINT(runtime/int)
      return EINVAL;
    }

    const int64_t usec_since_midnight = ReadUnsafe<int64_t>(data);

    // Mirror the bounds PostgreSQL's time_recv enforces (0..USECS_PER_DAY, both
    // ends inclusive) so that no nonsensical Arrow value is emitted.
    const bool in_range =
        usec_since_midnight >= 0 && usec_since_midnight <= kUsecsPerDay;
    if (!in_range) {
      ArrowErrorSet(error,
                    "[libpq] TIME value %" PRId64
                    " usec is out of range [0, %" PRId64 "]",
                    usec_since_midnight, kUsecsPerDay);
      return EINVAL;
    }

    // Rescale microseconds to the unit requested by the Arrow schema. Integer
    // division drops any sub-unit remainder for the second/milli units. An
    // unrecognized TU leaves the value at 0.
    int64_t converted = 0;
    if constexpr (TU == NANOARROW_TIME_UNIT_SECOND) {
      converted = usec_since_midnight / 1000000LL;
    } else if constexpr (TU == NANOARROW_TIME_UNIT_MILLI) {
      converted = usec_since_midnight / 1000LL;
    } else if constexpr (TU == NANOARROW_TIME_UNIT_MICRO) {
      converted = usec_since_midnight;
    } else if constexpr (TU == NANOARROW_TIME_UNIT_NANO) {
      converted = usec_since_midnight * 1000LL;
    }

    if constexpr (std::is_same<OutT, int32_t>::value) {
      // No runtime check is performed here: after the range validation above, a
      // second count is at most 86400 and a millisecond count at most
      // 86400000, both of which fit in int32_t.
      const int32_t narrowed = static_cast<int32_t>(converted);
      NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, &narrowed, sizeof(narrowed)));
    } else {
      NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, &converted, sizeof(converted)));
    }

    return AppendValid(array);
  }
};

// Reader for Pg->Arrow conversions whose Arrow representation is simply the
// bytes of the field representation. This can be used with binary and string
// Arrow types and any Postgres type.
Expand Down Expand Up @@ -935,11 +999,43 @@ static inline ArrowErrorCode MakeCopyFieldReader(
return NANOARROW_OK;
}

case NANOARROW_TYPE_TIME32: {
switch (pg_type.type_id()) {
case PostgresTypeId::kTime:
switch (schema_view.time_unit) {
case NANOARROW_TIME_UNIT_SECOND:
*out = std::make_unique<
PostgresCopyTimeOfDayFieldReader<NANOARROW_TIME_UNIT_SECOND, int32_t>>();
return NANOARROW_OK;
case NANOARROW_TIME_UNIT_MILLI:
*out = std::make_unique<
PostgresCopyTimeOfDayFieldReader<NANOARROW_TIME_UNIT_MILLI, int32_t>>();
return NANOARROW_OK;
default:
// TIME32 only supports second/milli in Arrow. See https://arrow.apache.org/docs/cpp/api/datatype.html
return ErrorCantConvert(error, pg_type, schema_view);
}
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
}

case NANOARROW_TYPE_TIME64: {
switch (pg_type.type_id()) {
case PostgresTypeId::kTime:
*out = std::make_unique<PostgresCopyNetworkEndianFieldReader<int64_t>>();
return NANOARROW_OK;
switch (schema_view.time_unit) {
case NANOARROW_TIME_UNIT_MICRO:
*out = std::make_unique<
PostgresCopyTimeOfDayFieldReader<NANOARROW_TIME_UNIT_MICRO, int64_t>>();
return NANOARROW_OK;
case NANOARROW_TIME_UNIT_NANO:
*out = std::make_unique<
PostgresCopyTimeOfDayFieldReader<NANOARROW_TIME_UNIT_NANO, int64_t>>();
return NANOARROW_OK;
default:
// TIME64 only supports micro/nano in Arrow. See https://arrow.apache.org/docs/cpp/api/datatype.html
return ErrorCantConvert(error, pg_type, schema_view);
}
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
Expand Down
91 changes: 89 additions & 2 deletions c/driver/postgresql/copy/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,71 @@ class PostgresCopyTimestampFieldWriter : public PostgresCopyFieldWriter {
}
};


// Field writer that emits an Arrow time value (unit TU) as a PostgreSQL TIME
// field in COPY binary format: a 4-byte length followed by an int64 count of
// microseconds since midnight.
template <enum ArrowTimeUnit TU>
class PostgresCopyTimeFieldWriter : public PostgresCopyFieldWriter {
public:
// Microseconds per day (24h)
static inline constexpr int64_t kUsecsPerDay = 86400LL * 1000000LL;

// Writes the element at `index` of array_view_ into `buffer`.
// Returns ADBC_STATUS_INVALID_ARGUMENT (with `error` set) when scaling to
// microseconds is flagged as unsafe or when the result is outside
// [0, kUsecsPerDay]; returns ADBC_STATUS_OK on success.
// Note that the length prefix is written before the value is validated.
ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
// PostgreSQL TIME binary format is an int64 microseconds-since-midnight
// and the COPY binary field length must be 8 bytes. https://www.postgresql.org/docs/current/datatype-datetime.html
constexpr int32_t field_size_bytes = sizeof(int64_t);
NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));

const int64_t raw_value = ArrowArrayViewGetIntUnsafe(array_view_, index);
int64_t micros = 0;

// Convert raw_value to microseconds. The kMaxSafe*/kMinSafe* bounds used
// below are declared outside this class (not visible in this view);
// presumably they are the largest magnitudes that can be multiplied without
// int64 overflow -- TODO confirm against their definitions.
bool overflow_safe = true;
switch (TU) {
case NANOARROW_TIME_UNIT_SECOND:
overflow_safe =
raw_value <= kMaxSafeSecondsToMicros &&
raw_value >= kMinSafeSecondsToMicros;
if (overflow_safe) {
micros = raw_value * 1000000LL;
}
break;
case NANOARROW_TIME_UNIT_MILLI:
overflow_safe =
raw_value <= kMaxSafeMillisToMicros &&
raw_value >= kMinSafeMillisToMicros;
if (overflow_safe) {
micros = raw_value * 1000LL;
}
break;
case NANOARROW_TIME_UNIT_MICRO:
// Already in microseconds; no scaling needed.
micros = raw_value;
break;
case NANOARROW_TIME_UNIT_NANO:
// Integer division truncates toward zero, so sub-microsecond precision is
// discarded (negative inputs in (-1000, 0) become 0).
micros = raw_value / 1000LL;
break;
}

if (!overflow_safe) {
ArrowErrorSet(
error,
"[libpq] Row %" PRId64 " time value %" PRId64
" with unit %d would overflow",
index, raw_value, TU);
return ADBC_STATUS_INVALID_ARGUMENT;
}

// Reject values outside a single day; both bounds are inclusive.
if (micros < 0 || micros > kUsecsPerDay) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. If we assume the Arrow data isn't necessarily valid, don't we have to watch for overflow when we do the multiplication above? Or if we do assume the data is valid, then this can't happen, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reused the overflow validation logic of Duration or Timestamp

ArrowErrorSet(error,
"[libpq] Row %" PRId64
" time value %" PRId64 " (unit %d) -> %" PRId64
" microseconds is out of range [0, %" PRId64 "]",
index, raw_value, TU, micros, kUsecsPerDay);
return ADBC_STATUS_INVALID_ARGUMENT;
}

// Emit the 8-byte microsecond payload that follows the length prefix.
NANOARROW_RETURN_NOT_OK(WriteChecked<int64_t>(buffer, micros, error));
return ADBC_STATUS_OK;
}
};

static inline ArrowErrorCode MakeCopyFieldWriter(
struct ArrowSchema* schema, struct ArrowArrayView* array_view,
const PostgresTypeResolver& type_resolver,
Expand Down Expand Up @@ -773,12 +838,34 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
*out = T::Create<T>(array_view);
return NANOARROW_OK;
}
case NANOARROW_TYPE_TIME32: {
switch (schema_view.time_unit) {
case NANOARROW_TIME_UNIT_SECOND: {
using T = PostgresCopyTimeFieldWriter<NANOARROW_TIME_UNIT_SECOND>;
*out = T::Create<T>(array_view);
return NANOARROW_OK;
}
case NANOARROW_TIME_UNIT_MILLI: {
using T = PostgresCopyTimeFieldWriter<NANOARROW_TIME_UNIT_MILLI>;
*out = T::Create<T>(array_view);
return NANOARROW_OK;
}
default:
return ADBC_STATUS_NOT_IMPLEMENTED;
}
}
case NANOARROW_TYPE_TIME64: {
switch (schema_view.time_unit) {
case NANOARROW_TIME_UNIT_MICRO:
using T = PostgresCopyNetworkEndianFieldWriter<int64_t>;
case NANOARROW_TIME_UNIT_MICRO: {
using T = PostgresCopyTimeFieldWriter<NANOARROW_TIME_UNIT_MICRO>;
*out = T::Create<T>(array_view);
return NANOARROW_OK;
}
case NANOARROW_TIME_UNIT_NANO: {
using T = PostgresCopyTimeFieldWriter<NANOARROW_TIME_UNIT_NANO>;
*out = T::Create<T>(array_view);
return NANOARROW_OK;
}
default:
return ADBC_STATUS_NOT_IMPLEMENTED;
}
Expand Down
27 changes: 25 additions & 2 deletions c/driver/postgresql/validation/queries/ingest/time_ms.txtcase
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,29 @@
// under the License.


// part: metadata
// part: expected_schema

skip = "COPY Writer not implemented"
{
"format": "+s",
"children": [
{
"name": "idx",
"format": "l",
"flags": ["nullable"]
},
{
"name": "value",
"format": "ttm",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"format": "ttm",
"format": "ttu",

We always read microseconds, so let's expect microseconds (also the values below need to be adjusted)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"flags": ["nullable"]
}
]
}

// part: expected

{"idx": 0, "value": null}
{"idx": 1, "value": 0}
{"idx": 2, "value": 1}
{"idx": 3, "value": 3723123}
{"idx": 4, "value": 86399999}
{"idx": 5, "value": 86400000}
27 changes: 25 additions & 2 deletions c/driver/postgresql/validation/queries/ingest/time_ns.txtcase
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,29 @@
// under the License.


// part: metadata
// part: expected_schema

skip = "COPY Writer not implemented"
{
"format": "+s",
"children": [
{
"name": "idx",
"format": "l",
"flags": ["nullable"]
},
{
"name": "value",
"format": "ttn",
"flags": ["nullable"]
}
]
}

// part: expected

{"idx": 0, "value": null}
{"idx": 1, "value": 0}
{"idx": 2, "value": 1}
{"idx": 3, "value": 3723123456000}
{"idx": 4, "value": 86399999999999}
{"idx": 5, "value": 86400000000000}
27 changes: 25 additions & 2 deletions c/driver/postgresql/validation/queries/ingest/time_s.txtcase
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,29 @@
// under the License.


// part: metadata
// part: expected_schema

skip = "COPY Writer not implemented"
{
"format": "+s",
"children": [
{
"name": "idx",
"format": "l",
"flags": ["nullable"]
},
{
"name": "value",
"format": "tts",
"flags": ["nullable"]
}
]
}

// part: expected

{"idx": 0, "value": null}
{"idx": 1, "value": 0}
{"idx": 2, "value": 1}
{"idx": 3, "value": 3723}
{"idx": 4, "value": 86399}
{"idx": 5, "value": 86400}
44 changes: 44 additions & 0 deletions c/driver/postgresql/validation/queries/ingest/time_us.txtcase
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


// part: expected_schema

{
"format": "+s",
"children": [
{
"name": "idx",
"format": "l",
"flags": ["nullable"]
},
{
"name": "value",
"format": "ttu",
"flags": ["nullable"]
}
]
}

// part: expected

{"idx": 0, "value": null}
{"idx": 1, "value": 0}
{"idx": 2, "value": 1}
{"idx": 3, "value": 3723123456}
{"idx": 4, "value": 86399999999}
{"idx": 5, "value": 86400000000}
Comment on lines +37 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears these values do not line up with what's expected (https://github.com/adbc-drivers/validation/blob/main/adbc_drivers_validation/queries/ingest/time_us.txtcase)

Frankly there should be no need to override this case?

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,24 @@
// under the License.


// part: metadata
// part: expected_schema

skip = "COPY Writer not implemented"
{
"format": "+s",
"children": [
{
"name": "value",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field name should be "res"

"format": "ttm",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect you will need microseconds here and below

"flags": ["nullable"]
}
]
}

// part: expected

{"value": null}
{"value": 0}
{"value": 0}
{"value": 3723123}
{"value": 86399999}
{"value": 86400000}
Loading
Loading