Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 137 additions & 8 deletions src/nanoarrow/ipc/decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ struct ArrowIpcField {
struct ArrowArray* array;
// The cumulative number of buffers preceding this node.
int64_t buffer_offset;
// Dictionary information for dictionary-encoded fields
struct ArrowIpcDictionary* dictionary;
};

// Internal data specific to the read/decode process
Expand Down Expand Up @@ -95,6 +97,8 @@ struct ArrowIpcDecoderPrivate {
int64_t n_union_fields;
// A pointer to the last flatbuffers message.
const void* last_message;
// Storage for a DictionaryBatch
struct ArrowIpcDictionaryBatch dictionary;
// Storage for a Footer
struct ArrowIpcFooter footer;
// Decompressor for compression support
Expand Down Expand Up @@ -824,17 +828,56 @@ static int ArrowIpcDecoderSetType(struct ArrowSchema* schema, ns(Field_table_t)
}
}

static int ArrowIpcSetDictionaryEncoding(
struct ArrowSchema* schema, ns(DictionaryEncoding_table_t dictionary_encoding),
struct ArrowError* error) {
switch (
org_apache_arrow_flatbuf_DictionaryEncoding_dictionaryKind(dictionary_encoding)) {
case ns(DictionaryKind_DenseArray):
break;
default:
ArrowErrorSet(error, "Uexpected value for DictionaryKind");
return EINVAL;
}

struct ArrowSchema tmp;
ArrowSchemaMove(schema, &tmp);

ArrowSchemaInit(schema);
int result = ArrowSchemaAllocateDictionary(schema);
if (result != NANOARROW_OK) {
ArrowSchemaRelease(&tmp);
ArrowErrorSet(error, "ArrowSchemaAllocateDictionary() failed");
return result;
}

ArrowSchemaMove(&tmp, schema->dictionary);

NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowSchemaSetName(schema, schema->dictionary->name),
error);
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowSchemaSetName(schema->dictionary, ""), error);

NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetTypeInt(
schema, ns(DictionaryEncoding_indexType_get(dictionary_encoding)), error));

if (ns(DictionaryEncoding_isOrdered_get(dictionary_encoding))) {
schema->flags |= ARROW_FLAG_DICTIONARY_ORDERED;
}

// TODO: should the non extension-type field metadata get moved back to the
// parent field?

// TODO: Record the dictionary id in Schema metadata? (Or keep track of it
// some other way?)

return NANOARROW_OK;
}

static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema, ns(Field_vec_t) fields,
struct ArrowError* error);

static int ArrowIpcDecoderSetField(struct ArrowSchema* schema, ns(Field_table_t) field,
struct ArrowError* error) {
// No dictionary support yet
if (ns(Field_dictionary_is_present(field))) {
ArrowErrorSet(error, "Schema message field with DictionaryEncoding not supported");
return ENOTSUP;
}

int result;
if (ns(Field_name_is_present(field))) {
result = ArrowSchemaSetName(schema, ns(Field_name_get(field)));
Expand Down Expand Up @@ -875,7 +918,16 @@ static int ArrowIpcDecoderSetField(struct ArrowSchema* schema, ns(Field_table_t)
}

NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetChildren(schema, children, error));
return ArrowIpcDecoderSetMetadata(schema, ns(Field_custom_metadata(field)), error);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderSetMetadata(schema, ns(Field_custom_metadata(field)), error));

// If this is a dictionary encoded field, set the dictionary encoding
if (ns(Field_dictionary_is_present(field))) {
NANOARROW_RETURN_NOT_OK(
ArrowIpcSetDictionaryEncoding(schema, ns(Field_dictionary(field)), error));
}

return NANOARROW_OK;
}

static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema, ns(Field_vec_t) fields,
Expand Down Expand Up @@ -931,6 +983,19 @@ static int ArrowIpcDecoderDecodeSchemaHeader(struct ArrowIpcDecoder* decoder,
return NANOARROW_OK;
}

static int ArrowIpcDecoderDecodeDictionaryBatchHeader(
struct ArrowIpcDecoder* decoder, flatbuffers_generic_t message_header) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;

ns(DictionaryBatch_table_t) dictionary = (ns(DictionaryBatch_table_t))message_header;
private_data->dictionary.id = ns(DictionaryBatch_id(dictionary));
private_data->dictionary.is_delta = ns(DictionaryBatch_isDelta(dictionary));

decoder->dictionary = &private_data->dictionary;
return NANOARROW_OK;
}

static int ArrowIpcDecoderDecodeRecordBatchHeader(struct ArrowIpcDecoder* decoder,
flatbuffers_generic_t message_header,
struct ArrowError* error) {
Expand Down Expand Up @@ -1001,6 +1066,8 @@ static inline void ArrowIpcDecoderResetHeaderInfo(struct ArrowIpcDecoder* decode
decoder->codec = 0;
decoder->header_size_bytes = 0;
decoder->body_size_bytes = 0;
decoder->dictionary = NULL;
memset(&private_data->dictionary, 0, sizeof(struct ArrowIpcDictionaryBatch));
decoder->footer = NULL;
ArrowIpcFooterReset(&private_data->footer);
private_data->last_message = NULL;
Expand Down Expand Up @@ -1243,11 +1310,14 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderDecodeSchemaHeader(decoder, message_header, error));
break;
case ns(MessageHeader_DictionaryBatch):
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderDecodeDictionaryBatchHeader(decoder, message_header));
break;
case ns(MessageHeader_RecordBatch):
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderDecodeRecordBatchHeader(decoder, message_header, error));
break;
case ns(MessageHeader_DictionaryBatch):
case ns(MessageHeader_Tensor):
case ns(MessageHeader_SparseTensor):
ArrowErrorSet(error, "Unsupported message type: '%s'",
Expand Down Expand Up @@ -1359,6 +1429,7 @@ static void ArrowIpcDecoderInitFields(struct ArrowIpcField* fields,
field->array_view = array_view;
field->array = array;
field->buffer_offset = *n_buffers;
field->dictionary = NULL;

for (int i = 0; i < NANOARROW_MAX_FIXED_BUFFERS; i++) {
*n_buffers += array_view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE;
Expand Down Expand Up @@ -2049,3 +2120,61 @@ ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(
ArrowArrayMove(&temp, out);
return NANOARROW_OK;
}

void ArrowIpcDictionaryInit(struct ArrowIpcDictionary* dictionary) {
NANOARROW_DCHECK(dictionary != NULL);

memset(dictionary, 0, sizeof(struct ArrowIpcDictionary));
}

void ArrowIpcDictionaryReset(struct ArrowIpcDictionary* dictionary) {
NANOARROW_DCHECK(dictionary != NULL);

if (dictionary->schema.release != NULL) {
ArrowSchemaRelease(&dictionary->schema);
}

if (dictionary->array.release != NULL) {
ArrowArrayRelease(&dictionary->array);
}
}

void ArrowIpcDictionariesInit(struct ArrowIpcDictionaries* dictionaries) {
NANOARROW_DCHECK(dictionaries != NULL);
memset(dictionaries, 0, sizeof(struct ArrowIpcDictionaries));
}

void ArrowIpcDictionariesReset(struct ArrowIpcDictionaries* dictionaries) {
NANOARROW_DCHECK(dictionaries != NULL);

for (int64_t i = 0; i < dictionaries->n_dictionaries; i++) {
ArrowIpcDictionaryReset(dictionaries->dictionaries + i);
}

if (dictionaries->dictionaries != NULL) {
ArrowFree(dictionaries->dictionaries);
}
}

ArrowErrorCode ArrowIpcDictionariesAppend(struct ArrowIpcDictionaries* dictionaries,
int64_t id, struct ArrowSchema* schema) {
if ((dictionaries->n_dictionaries + 1) > dictionaries->capacity) {
int64_t new_capacity =
1 ? dictionaries->n_dictionaries == 0 : dictionaries->n_dictionaries * 2;
struct ArrowIpcDictionaries* new_dictionaries = realloc(
dictionaries->dictionaries, new_capacity * sizeof(struct ArrowIpcDictionary));
if (new_dictionaries == NULL) {
return ENOMEM;
}

dictionaries->capacity = new_capacity;
}

struct ArrowIpcDictionary* dictionary =
dictionaries->dictionaries + dictionaries->n_dictionaries;
ArrowIpcDictionaryInit(dictionary);
dictionary->id = id;
ArrowSchemaMove(schema, &dictionary->schema);
dictionaries->n_dictionaries++;
return NANOARROW_OK;
}
127 changes: 127 additions & 0 deletions src/nanoarrow/ipc/decoder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct ArrowIpcField {
struct ArrowArrayView* array_view;
struct ArrowArray* array;
int64_t buffer_offset;
int64_t dictionary_id;
};

struct ArrowIpcDecoderPrivate {
Expand All @@ -53,6 +54,7 @@ struct ArrowIpcDecoderPrivate {
struct ArrowIpcField* fields;
int64_t n_buffers;
const void* last_message;
struct ArrowIpcDictionaryBatch dictionary;
struct ArrowIpcFooter footer;
struct ArrowIpcDecompressor decompressor;
};
Expand Down Expand Up @@ -141,6 +143,64 @@ static uint8_t kSimpleRecordBatchUncompressible[] = {
0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00};

static uint8_t kDictionarySchema[] = {
0xff, 0xff, 0xff, 0xff, 0x50, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00,
0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00,
0x00, 0x01, 0x04, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x0c, 0x00,
0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0xb0, 0x00, 0x00, 0x00,
0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00,
0x0c, 0x00, 0x04, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x8c, 0x00, 0x00, 0x00,
0x04, 0x00, 0x00, 0x00, 0x7e, 0x00, 0x00, 0x00, 0x41, 0x0a, 0x33, 0x0a, 0x32, 0x36,
0x33, 0x31, 0x37, 0x30, 0x0a, 0x31, 0x39, 0x37, 0x38, 0x38, 0x38, 0x0a, 0x35, 0x0a,
0x55, 0x54, 0x46, 0x2d, 0x38, 0x0a, 0x35, 0x33, 0x31, 0x0a, 0x31, 0x0a, 0x35, 0x33,
0x31, 0x0a, 0x31, 0x0a, 0x32, 0x35, 0x34, 0x0a, 0x31, 0x30, 0x32, 0x36, 0x0a, 0x31,
0x0a, 0x32, 0x36, 0x32, 0x31, 0x35, 0x33, 0x0a, 0x35, 0x0a, 0x6e, 0x61, 0x6d, 0x65,
0x73, 0x0a, 0x31, 0x36, 0x0a, 0x31, 0x0a, 0x32, 0x36, 0x32, 0x31, 0x35, 0x33, 0x0a,
0x38, 0x0a, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x0a, 0x32, 0x35, 0x34,
0x0a, 0x31, 0x30, 0x32, 0x36, 0x0a, 0x35, 0x31, 0x31, 0x0a, 0x31, 0x36, 0x0a, 0x31,
0x0a, 0x32, 0x36, 0x32, 0x31, 0x35, 0x33, 0x0a, 0x37, 0x0a, 0x63, 0x6f, 0x6c, 0x75,
0x6d, 0x6e, 0x73, 0x0a, 0x32, 0x35, 0x34, 0x0a, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
0x72, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x10, 0x00,
0x18, 0x00, 0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x10, 0x00, 0x14, 0x00,
0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x05, 0x14, 0x00, 0x00, 0x00, 0x48, 0x00,
0x00, 0x00, 0x24, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x08, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x00, 0x00,
0x00, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x00, 0x00,
0x0c, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00, 0x08, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x08, 0x00, 0x00, 0x00, 0x04, 0x00, 0x04, 0x00,
0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};

static uint8_t kDictionaryBatch[] = {
0xff, 0xff, 0xff, 0xff, 0xa8, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x0c, 0x00, 0x14, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00,
0x0c, 0x00, 0x00, 0x00, 0x00, 0x02, 0x04, 0x00, 0x14, 0x00, 0x00, 0x00, 0x20, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x04, 0x00,
0x08, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00,
0x0c, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x4c, 0x00, 0x00, 0x00,
0x10, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00,
0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x7a, 0x65, 0x72, 0x6f,
0x6f, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
};

static uint8_t kDictionaryRecordBatch[] = {
0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00,
0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00, 0x08, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00, 0x0c, 0x00,
0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 0x10, 0x00,
0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00};

TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
struct ArrowIpcDecoder decoder;
struct ArrowError error;
Expand Down Expand Up @@ -505,6 +565,73 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchErrors) {
ArrowIpcDecoderReset(&decoder);
}

TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionarySchema) {
struct ArrowIpcDecoder decoder;
struct ArrowError error;
struct ArrowSchema schema;

struct ArrowBufferView data;
data.data.as_uint8 = kDictionarySchema;
data.size_bytes = sizeof(kDictionarySchema);

ASSERT_EQ(ArrowIpcDecoderInit(&decoder), NANOARROW_OK);

EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), NANOARROW_OK);
ASSERT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA);

ASSERT_EQ(ArrowIpcDecoderDecodeSchema(&decoder, &schema, &error), NANOARROW_OK);
ASSERT_EQ(schema.n_children, 1);
EXPECT_STREQ(schema.children[0]->name, "some_col");
EXPECT_EQ(schema.children[0]->flags, ARROW_FLAG_NULLABLE);
EXPECT_STREQ(schema.children[0]->format, "c");

ASSERT_NE(schema.children[0]->dictionary, nullptr);
EXPECT_STREQ(schema.children[0]->dictionary->format, "u");

ArrowSchemaRelease(&schema);
ArrowIpcDecoderReset(&decoder);
}

TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryBatch) {
struct ArrowIpcDecoder decoder;
struct ArrowError error;

struct ArrowBufferView data;
data.data.as_uint8 = kDictionaryBatch;
data.size_bytes = sizeof(kDictionaryBatch);

ASSERT_EQ(ArrowIpcDecoderInit(&decoder), NANOARROW_OK);

EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), NANOARROW_OK);
ASSERT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_DICTIONARY_BATCH);

ASSERT_NE(decoder.dictionary, nullptr);
EXPECT_EQ(decoder.dictionary->id, 0);
EXPECT_FALSE(decoder.dictionary->is_delta);

// TODO: Access RecordBatch content

ArrowIpcDecoderReset(&decoder);
}

TEST(NanoarrowIpcTest, NanoarrowIpcDecodeDictionaryRecordBatch) {
struct ArrowIpcDecoder decoder;
struct ArrowError error;

struct ArrowBufferView data;
data.data.as_uint8 = kDictionaryRecordBatch;
data.size_bytes = sizeof(kDictionaryRecordBatch);

ASSERT_EQ(ArrowIpcDecoderInit(&decoder), NANOARROW_OK);

EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), NANOARROW_OK);
ASSERT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);

// TODO: Decode RecordBatch content populating dictionary array member

ArrowIpcDecoderReset(&decoder);
}

TEST(NanoarrowIpcTest, NanoarrowIpcSetSchema) {
struct ArrowIpcDecoder decoder;
struct ArrowSchema schema;
Expand Down
Loading
Loading