Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 30 additions & 34 deletions lib/codec/rle.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const varint = require('varint')

function encodeRunBitpacked(values, opts) {
if (values.length % 8 !== 0) {
throw 'must be a multiple of 8';
for (let i = 0; i < values.length % 8; i++) {
values.push(0);
}

let buf = Buffer.alloc(Math.ceil(opts.bitWidth * (values.length / 8)));
Expand Down Expand Up @@ -50,39 +50,35 @@ exports.encodeValues = function(type, values, opts) {
}

let buf = Buffer.alloc(0);
let runs = [];
for (let cur = 0; cur < values.length; cur += 8) {
let repeating = true;
for (let i = 1; i < 8; ++i) {
if (values[cur + i] !== values[cur]) {
repeating = false;
let run = [];
let repeats = 0;

for (let i = 0; i < values.length; i++) {
// If we are at the beginning of a run and the next value is same we start
// collecting repeated values
if ( repeats === 0 && run.length % 8 === 0 && values[i] === values[i+1]) {
// If we have any data in runs we need to encode them
if (run.length) {
buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]);
run = [];
}
repeats = 1;
} else if (repeats > 0 && values[i] === values[i-1]) {
repeats += 1;
} else {
// If values changes we need to post any previous repeated values
if (repeats) {
buf = Buffer.concat([buf, encodeRunRepeated(values[i-1], repeats, opts)]);
repeats = 0;
}
run.push(values[i]);
}

const append =
runs.length > 0 &&
(runs[runs.length - 1][1] !== null) === repeating &&
(!repeating || runs[runs.length - 1][1] === values[cur]);

if (!append) {
runs.push([cur, repeating ? values[cur] : null]);
}
}

for (let i = values.length - (values.length % 8); i < values.length; ++i) {
runs.push([i, values[i]]);
}

for (let i = 0; i < runs.length; ++i) {
const begin = runs[i][0];
const end = i < runs.length - 1 ? runs[i + 1][0] : values.length;
const rep = runs[i][1];

if (rep === null) {
buf = Buffer.concat([buf, encodeRunBitpacked(values.slice(begin, end), opts)]);
} else {
buf = Buffer.concat([buf, encodeRunRepeated(rep, end - begin, opts)]);
}
if (repeats) {
buf = Buffer.concat([buf, encodeRunRepeated(values[values.length-1], repeats, opts)]);
} else if (run.length) {
buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]);
}

if (opts.disableEnvelope) {
Expand All @@ -94,7 +90,7 @@ exports.encodeValues = function(type, values, opts) {
buf.copy(envelope, 4);

return envelope;
}
};

function decodeRunBitpacked(cursor, count, opts) {
if (count % 8 !== 0) {
Expand Down Expand Up @@ -144,11 +140,11 @@ exports.decodeValues = function(type, cursor, count, opts) {
values.push(...decodeRunRepeated(cursor, count, opts));
}
}
values = values.slice(0,count);

if (values.length !== count) {
throw "invalid RLE encoding";
}

return values;
}

};
4 changes: 2 additions & 2 deletions lib/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ function decodeDataPages(buffer, opts) {

while (cursor.offset < cursor.size) {
const pageHeader = new parquet_thrift.PageHeader();
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer);
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset));

const pageType = parquet_util.getThriftEnum(
parquet_thrift.PageType,
Expand Down Expand Up @@ -407,7 +407,7 @@ function decodeDataPage(cursor, header, opts) {
function decodeDataPageV2(cursor, header, opts) {
const cursorEnd = cursor.offset + header.compressed_page_size;

const valueCount = header.data_page_header_v2.num_values;
const valueCount = header.data_page_header_v2.num_rows;
const valueCountNonNull = valueCount - header.data_page_header_v2.num_nulls;
const valueEncoding = parquet_util.getThriftEnum(
parquet_thrift.Encoding,
Expand Down
4 changes: 4 additions & 0 deletions lib/shred.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ exports.shredRecord = function(schema, record, buffer) {
/* if no error during shredding, add the shredded record to the buffer */
if (!('columnData' in buffer) || !('rowCount' in buffer)) {
buffer.rowCount = 0;
buffer.pageRowCount = 0;
buffer.columnData = {};
buffer.pages = {};

for (let field of schema.fieldList) {
buffer.columnData[field.path] = {
Expand All @@ -51,10 +53,12 @@ exports.shredRecord = function(schema, record, buffer) {
values: [],
count: 0
};
buffer.pages[field.path] = [];
}
}

buffer.rowCount += 1;
buffer.pageRowCount += 1;
for (let field of schema.fieldList) {
Array.prototype.push.apply(
buffer.columnData[field.path].rlevels,
Expand Down
115 changes: 78 additions & 37 deletions lib/writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,67 @@ class ParquetWriter {

parquet_shredder.shredRecord(this.schema, row, this.rowBuffer);

if (this.rowBuffer.pageRowCount > this.envelopeWriter.pageSize) {
this.encodePages();
}

if (this.rowBuffer.rowCount >= this.rowGroupSize) {
await this.envelopeWriter.writeRowGroup(this.rowBuffer);
this.rowBuffer = {};
this.encodePages();
await this.writeRowGroup();
}
}

/**
 * Flush the values buffered in `this.rowBuffer` into encoded data pages,
 * one page per leaf column, appending each to
 * `this.rowBuffer.pages[field.path]` and resetting the per-column
 * accumulators. No-op when nothing was buffered since the last flush.
 */
encodePages() {
// Nothing buffered since the last flush — avoid emitting empty pages.
if (!this.rowBuffer.pageRowCount) {
return;
}

for (let field of this.schema.fieldList) {
// Nested (group) fields carry no values of their own; only leaf
// columns produce data pages.
if (field.isNested) {
continue;
}

let page;
const values = this.rowBuffer.columnData[field.path];

if (this.envelopeWriter.useDataPageV2) {
// V2 pages additionally record the number of rows in the page.
page = encodeDataPageV2(
field,
values.count,
this.rowBuffer.pageRowCount,
values.values,
values.rlevels,
values.dlevels);
} else {
page = encodeDataPage(
field,
values.count,
values.values,
values.rlevels,
values.dlevels);
}

// Keep the per-page value count alongside the encoded buffer so the
// column-chunk encoder can total num_values without re-decoding pages.
this.rowBuffer.pages[field.path].push({page, count: values.values.length });

// Reset the per-column accumulators for the next page.
values.values = [];
values.rlevels = [];
values.dlevels = [];
values.count = 0;
}

this.rowBuffer.pageRowCount = 0;
}

async writeRowGroup() {
const rowBuffer = this.rowBuffer;
this.rowBuffer = {};
return this.envelopeWriter.writeRowGroup(rowBuffer);
}

async writeFooter() {
return this.envelopeWriter.writeFooter(this.userMetadata);
}

/**
* Finish writing the parquet file and commit the footer to disk. This method
* MUST be called after you are finished adding rows. You must not call this
Expand All @@ -113,11 +168,12 @@ class ParquetWriter {
this.closed = true;

if (this.rowBuffer.rowCount > 0 || this.rowBuffer.rowCount >= this.rowGroupSize) {
await this.envelopeWriter.writeRowGroup(this.rowBuffer);
this.encodePages();
await this.writeRowGroup();
this.rowBuffer = {};
}

await this.envelopeWriter.writeFooter(this.userMetadata);
await this.envelopeWriter.writeFooter();
await this.envelopeWriter.close();
this.envelopeWriter = null;

Expand Down Expand Up @@ -177,7 +233,7 @@ class ParquetEnvelopeWriter {
this.offset = fileOffset;
this.rowCount = 0;
this.rowGroups = [];
this.pageSize = PARQUET_DEFAULT_PAGE_SIZE;
this.pageSize = opts.pageSize || PARQUET_DEFAULT_PAGE_SIZE;
this.useDataPageV2 = ("useDataPageV2" in opts) ? opts.useDataPageV2 : true;
}

Expand All @@ -198,6 +254,10 @@ class ParquetEnvelopeWriter {
* shredRecord method
*/
writeRowGroup(records) {
if (!records.length) {
return;
}

let rgroup = encodeRowGroup(
this.schema,
records,
Expand Down Expand Up @@ -287,6 +347,8 @@ function encodeValues(type, encoding, values, opts) {
return parquet_codec[encoding].encodeValues(type, values, opts);
}



/**
* Encode a parquet data page
*/
Expand Down Expand Up @@ -326,7 +388,7 @@ function encodeDataPage(column, valueCount, values, rlevels, dlevels) {
pageHeader.uncompressed_page_size = pageBody.length;
pageHeader.compressed_page_size = pageBody.length;
pageHeader.data_page_header = new parquet_thrift.DataPageHeader();
pageHeader.data_page_header.num_values = valueCount;
pageHeader.data_page_header.num_values = rlevels.length;

pageHeader.data_page_header.encoding = parquet_thrift.Encoding[column.encoding];
pageHeader.data_page_header.definition_level_encoding =
Expand Down Expand Up @@ -382,9 +444,9 @@ function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels
let pageHeader = new parquet_thrift.PageHeader();
pageHeader.type = parquet_thrift.PageType['DATA_PAGE_V2'];
pageHeader.data_page_header_v2 = new parquet_thrift.DataPageHeaderV2();
pageHeader.data_page_header_v2.num_values = valueCount;
pageHeader.data_page_header_v2.num_values = values.length;
pageHeader.data_page_header_v2.num_nulls = valueCount - values.length;
pageHeader.data_page_header_v2.num_rows = rowCount;
pageHeader.data_page_header_v2.num_rows = valueCount;

pageHeader.uncompressed_page_size =
rLevelsBuf.length + dLevelsBuf.length + valuesBuf.length;
Expand All @@ -407,41 +469,20 @@ function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels
valuesBufCompressed]);
}

/**
* Encode an array of values into a parquet column chunk
*/
function encodeColumnChunk(values, opts) {
/* encode data page(s) */
let pages = [];

{
let dataPage;
if (opts.useDataPageV2) {
dataPage = encodeDataPageV2(
opts.column,
values.count,
opts.rowCount,
values.values,
values.rlevels,
values.dlevels);
} else {
dataPage = encodeDataPage(
opts.column,
values.count,
values.values,
values.rlevels,
values.dlevels);
}

pages.push(dataPage);
}

let pagesBuf = Buffer.concat(pages);
/**
* Encode an array of values into a parquet column chunk
*/
function encodeColumnChunk(pages, opts) {
let pagesBuf = Buffer.concat(pages.map(d => d.page));
let count = pages.reduce((p,d) => p + d.count, 0);

/* prepare metadata header */
let metadata = new parquet_thrift.ColumnMetaData();
metadata.path_in_schema = opts.column.path;
metadata.num_values = values.count;
metadata.num_values = count;
metadata.data_page_offset = opts.baseOffset;
metadata.encodings = [];
metadata.total_uncompressed_size = pagesBuf.length;
Expand Down Expand Up @@ -481,7 +522,7 @@ function encodeRowGroup(schema, data, opts) {
}

let cchunkData = encodeColumnChunk(
data.columnData[field.path],
data.pages[field.path],
{
column: field,
baseOffset: opts.baseOffset + body.length,
Expand Down
30 changes: 30 additions & 0 deletions test/codec_rle.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,36 @@ describe('ParquetCodec::RLE', function() {
assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7]);
});

// Round-trip coverage for the RLE codec when the value count is not a
// multiple of 8: the encoder zero-pads the final bit-packed run, and the
// decoder must trim the padding back off so exactly `count` values return.
describe('number of values not a multiple of 8', function() {
  it('should encode bitpacked values', function() {
    let buf = parquet_codec_rle.encodeValues(
        'INT32',
        [0, 1, 2, 3, 4, 5, 6, 7, 6, 5],
        {
          disableEnvelope: true,
          bitWidth: 3
        });

    // Buffer.from replaces the deprecated new Buffer() constructor (DEP0005).
    assert.deepEqual(buf, Buffer.from([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00]));
  });

  it('should decode bitpacked values', function() {
    let vals = parquet_codec_rle.decodeValues(
        'INT32',
        {
          buffer: Buffer.from([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00]),
          offset: 0,
        },
        10,
        {
          disableEnvelope: true,
          bitWidth: 3
        });

    assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7, 6, 5]);
  });
});

it('should encode repeated values', function() {
let buf = parquet_codec_rle.encodeValues(
'INT32',
Expand Down
Loading