Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 47 additions & 22 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@ const { Transform } = require('stream');
const Iconv = require('iconv-lite');

class YADBF extends Transform {

/**
*
   * @param {object} [options]
* @param {number} [options.offset=0]
* @param {number} [options.size=Infinity]
* @param {boolean} [options.deleted=false] whether deleted records should be included on the output
* @param {string} [options.encoding="utf-8"]
   * @param {object} [options.customFieldParsers] an object whose keys are field names and their values are functions to be called for custom-parsing them. The parameters passed are the Buffer value of the field and an object describing the field
* @param {object} [options.quirks]
   * @param {boolean} [options.quirks.typeL_allowUnknownValues] treat any unknown value as unknown instead of throwing
   * @param {boolean} [options.quirks.typeM_allowLeftSpacePadding] allow M-type fields to be left-padded with spaces
   * @param {boolean} [options.quirks.ignoreUnknownEncryptionByte] ignores if the encryption flag is different from 0 or 1
   * @param {boolean} [options.quirks.allowFieldLength255] allow field lengths to be equal to 255
*/
constructor(options = {}) {
super({ readableObjectMode: true });

Expand All @@ -12,6 +27,8 @@ class YADBF extends Transform {
this.size = validateSize(options.size);
this.includeDeletedRecords = validateDeleted(options.deleted);
this.encoding = validateEncoding(options.encoding);
this.customFieldParsers = options.customFieldParsers || {};
this.quirks = options.quirks || {};

// keep track of how many records have been made readable (used for end-of-stream detection)
this.totalRecordCount = 0;
Expand All @@ -32,7 +49,7 @@ class YADBF extends Transform {

_transform(chunk, encoding, callback) {
// append the chunk to unconsumed bytes for easier bookkeeping
this.unconsumedBytes = Buffer.concat( [this.unconsumedBytes, chunk] );
this.unconsumedBytes = Buffer.concat([this.unconsumedBytes, chunk]);

// if the header hasn't been parsed yet, do so now and emit it
if (!this.header) {
Expand All @@ -44,7 +61,7 @@ class YADBF extends Transform {

// otherwise, attempt to parse the header
try {
this.header = parseHeader(this.unconsumedBytes);
this.header = parseHeader.call(this, this.unconsumedBytes);

// emit the header for outside consumption
this.emit('header', this.header);
Expand All @@ -64,7 +81,7 @@ class YADBF extends Transform {
const recordSizedChunk = this.unconsumedBytes.slice(0, this.header.numberOfBytesInRecord);

try {
const record = convertToRecord(recordSizedChunk, this.header, this.encoding);
const record = convertToRecord.call(this, recordSizedChunk, this.header, this.encoding);

        // only push if it's eligible for output and within the pagination params
if (isEligibleForOutput(record, this.includeDeletedRecords)) {
Expand All @@ -73,11 +90,11 @@ class YADBF extends Transform {
}

// increment total # of records pushed for pagination check
this.eligibleRecordCount+=1;
this.eligibleRecordCount += 1;
}

// increment total # of records consumed for end-of-stream check
this.totalRecordCount+=1;
this.totalRecordCount += 1;

// remove the slice from the unconsumed bytes
this.unconsumedBytes = this.unconsumedBytes.slice(recordSizedChunk.length);
Expand All @@ -91,7 +108,7 @@ class YADBF extends Transform {

// if all the records have been emitted, proceed with shutdown
if (allRecordsHaveBeenProcessed(this.header.numberOfRecords, this.totalRecordCount) &&
aSingleByteRemains(this.unconsumedBytes)) {
aSingleByteRemains(this.unconsumedBytes)) {
// throw an error if the last byte isn't the expected EOF marker
if (!firstByteIsEOFMarker(this.unconsumedBytes)) {
this.destroy('Last byte of file is not end-of-file marker');
Expand Down Expand Up @@ -121,24 +138,25 @@ const falseyValues = new Set(['N', 'n', 'F', 'f']);

// valid M-type value regex (10 digits or 10 spaces)
const validMTypeValueRegex = /^(\d{10}| {10})$/;
const validMTypeValuePaddedRegex = /^ {0,10}\d{0,10}$/;

// type handlers keyed by the single character type designator
const typeHandlers = {
D(value) {
return new Date(
value.substr(0, 4)
+ '-'
+ value.substr(4, 2)
+ '-'
+ value.substr(6, 2)
value.substr(0, 4)
+ '-'
+ value.substr(4, 2)
+ '-'
+ value.substr(6, 2)
);
},
L(value) {
if (truthyValues.has(value)) {
return true;
} else if (falseyValues.has(value)) {
return false;
} else if (value !== '?' && value !== ' ') { // '?' or ' ' means undefined
} else if (value !== '?' && value !== ' ' && !this.quirks.typeL_allowUnknownValues) { // '?' or ' ' means undefined
throw new Error(`Invalid L-type field value: ${value}`);
}
},
Expand All @@ -152,7 +170,8 @@ const typeHandlers = {
return value.replace(/[\u0000 ]+$/, '');
},
M(value) {
if (!validMTypeValueRegex.test(value)) {
let regex = this.quirks.typeM_allowLeftSpacePadding ? validMTypeValuePaddedRegex : validMTypeValueRegex;
if (!regex.test(value)) {
throw new Error(`Invalid M-type field value: '${value}'`);
} else {
return value;
Expand Down Expand Up @@ -214,21 +233,21 @@ function parseHeader(buffer) {
}

// there are 32 bytes per header field + 1 byte for terminator + 32 bytes for the initial header
const numberOfFields = (numberOfHeaderBytes-32-1)/32;
const numberOfFields = (numberOfHeaderBytes - 32 - 1) / 32;

const fieldBytes = buffer.slice(32, numberOfHeaderBytes);
// emit an error if the header bytes does not end with 0x0D (per spec)
if (fieldBytes.readUInt8(numberOfHeaderBytes-32-1) !== 0x0D) {
if (fieldBytes.readUInt8(numberOfHeaderBytes - 32 - 1) !== 0x0D) {
throw new Error(`Invalid field descriptor array terminator at byte ${numberOfHeaderBytes}`);
}

const encryptionByte = buffer.readUInt8(15);
// if the source is encrypted, then emit an error
if (encryptionByte === 1) {
if (encryptionByte === 1 && !this.quirks.ignoreUnknownEncryptionByte) {
throw new Error('Encryption flag is set, cannot process');
}
// valid values for the encryption byte are 0x00 and 0x01, emit an error otherwise
if (encryptionByte > 1) {
if (encryptionByte > 1 && !this.quirks.ignoreUnknownEncryptionByte) {
throw new Error(`Invalid encryption flag value: ${encryptionByte}`);
}

Expand All @@ -251,7 +270,7 @@ function parseHeader(buffer) {
numberOfBytesInRecord: buffer.readInt16LE(10),
hasProductionMDXFile: hasProductionMDXFile,
langaugeDriverId: buffer.readUInt8(29),
fields: Array.from( {length: numberOfFields }, parseHeaderField.bind(null, fieldBytes))
fields: Array.from({ length: numberOfFields }, parseHeaderField.bind(this, fieldBytes))
};

// if there are any duplicate field names, throw an error
Expand All @@ -267,11 +286,11 @@ function parseHeader(buffer) {

// parses up to 32 bytes from `fieldBytes` into a valid field definition
function parseHeaderField(fieldBytes, val, i) {
const field = fieldBytes.slice(i*32, i*32+32);
const field = fieldBytes.slice(i * 32, i * 32 + 32);

// extract the field length from the 16th byte
const length = field.readUInt8(16);
if (length === 255) {
if (length === 255 && !this.quirks.allowFieldLength255) {
throw new Error('Field length must be less than 255');
}

Expand Down Expand Up @@ -323,10 +342,16 @@ function convertToRecord(chunk, header, encoding) {

header.fields.forEach(field => {
// read the value out with given encoding
const value = Iconv.decode(chunk.slice(byteOffset, byteOffset+field.length), encoding);
const bValue = chunk.slice(byteOffset, byteOffset + field.length);

// assign the field into the record
record[field.name] = typeHandlers[field.type](value);
if (this.customFieldParsers[field.name]) {
record[field.name] = this.customFieldParsers[field.name](bValue, field);
} else {
const value = Iconv.decode(bValue, encoding);
record[field.name] = typeHandlers[field.type].call(this, value);
}


// update where the next field starts
byteOffset += field.length;
Expand Down
Loading