Skip to content

Commit

Permalink
Decimal Writer Support (#90)
Browse files Browse the repository at this point in the history
Problem
=======
Need to support writing decimal types

Also closes: #87 

Solution
========

Add basic encoding support for decimals

Change summary:
---------------
* Added better decimal field errors
* Default scale to 0 per spec
* Clean up types on RLE so it only asks for what is needed
* Support decimal encoding
* Add test for write / read of decimal

Steps to Verify:
----------------
1. Generate a schema with a decimal field
2. Write rows with it and read them back
  • Loading branch information
wilwade authored Jun 23, 2023
1 parent fa1865b commit 19f3ffa
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 44 deletions.
34 changes: 21 additions & 13 deletions lib/codec/plain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ function decodeValues_BOOLEAN(cursor: Cursor, count: number) {
return values;
}

function encodeValues_INT32(values: Array<number>) {
function encodeValues_INT32(values: Array<number>, opts: Options) {
const isDecimal = opts?.originalType === 'DECIMAL' || opts?.column?.originalType === 'DECIMAL';
const scale = opts?.scale || 0;
let buf = Buffer.alloc(4 * values.length);
for (let i = 0; i < values.length; i++) {
buf.writeInt32LE(values[i], i * 4);
if (isDecimal) {
buf.writeInt32LE(values[i] * Math.pow(10, scale), i * 4);
} else {
buf.writeInt32LE(values[i], i * 4);
}
}

return buf;
Expand All @@ -55,10 +61,16 @@ function decodeValues_INT32(cursor: Cursor, count: number, opts: Options) {
return values;
}

function encodeValues_INT64(values: Array<number>) {
function encodeValues_INT64(values: Array<number>, opts: Options) {
const isDecimal = opts?.originalType === 'DECIMAL' || opts?.column?.originalType === 'DECIMAL';
const scale = opts?.scale || 0;
let buf = Buffer.alloc(8 * values.length);
for (let i = 0; i < values.length; i++) {
buf.writeBigInt64LE(BigInt(values[i]), i * 8);
if (isDecimal) {
buf.writeBigInt64LE(BigInt(Math.floor(values[i] * Math.pow(10, scale))), i * 8);
} else {
buf.writeBigInt64LE(BigInt(values[i]), i * 8);
}
}

return buf;
Expand Down Expand Up @@ -86,15 +98,11 @@ function decodeValues_INT64(cursor: Cursor, count: number, opts: Options) {
}

function decodeValues_DECIMAL(cursor: Cursor, count: number, opts: Options) {
let {
scale,
precision
} = opts;
const precision = opts.precision;
// Default scale to 0 per spec
const scale = opts.scale || 0;

const name = opts.name || undefined
if (!scale) {
throw `missing option: scale (required for DECIMAL) for column: ${name}`;
}
if (!precision) {
throw `missing option: precision (required for DECIMAL) for column: ${name}`;
}
Expand Down Expand Up @@ -283,10 +291,10 @@ export const encodeValues = function (
return encodeValues_BOOLEAN(values as Array<boolean>);

case "INT32":
return encodeValues_INT32(values as Array<number>);
return encodeValues_INT32(values as Array<number>, opts);

case "INT64":
return encodeValues_INT64(values as Array<number>);
return encodeValues_INT64(values as Array<number>, opts);

case "INT96":
return encodeValues_INT96(values as Array<number>);
Expand Down
4 changes: 2 additions & 2 deletions lib/codec/plain_dictionary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as rle from './rle'
import { Cursor, Options } from './types'

export const decodeValues = function(type: string, cursor: Cursor, count: number, opts: Options) {
opts.bitWidth = cursor.buffer.slice(cursor.offset, cursor.offset+1).readInt8(0);
const bitWidth = cursor.buffer.slice(cursor.offset, cursor.offset+1).readInt8(0);
cursor.offset += 1;
return rle.decodeValues(type, cursor, count, Object.assign({}, opts, {disableEnvelope: true}));
return rle.decodeValues(type, cursor, count, Object.assign({}, opts, { disableEnvelope: true, bitWidth }));
};
14 changes: 7 additions & 7 deletions lib/codec/rle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
// https://github.com/apache/parquet-format/blob/master/Encodings.md

import varint from 'varint'
import {Cursor, Options} from './types'
import { Cursor } from './types'

function encodeRunBitpacked(values: Array<number>, opts: Options) {
function encodeRunBitpacked(values: Array<number>, opts: { bitWidth: number }) {
for (let i = 0; i < values.length % 8; i++) {
values.push(0);
}
Expand All @@ -23,7 +23,7 @@ function encodeRunBitpacked(values: Array<number>, opts: Options) {
]);
}

function encodeRunRepeated(value: number, count: number, opts: Options) {
function encodeRunRepeated(value: number, count: number, opts: { bitWidth: number }) {
let buf = Buffer.alloc(Math.ceil(opts.bitWidth / 8));
let remainingValue = value

Expand All @@ -48,7 +48,7 @@ function unknownToParsedInt(value: string | number) {
}
}

export const encodeValues = function(type: string, values: Array<number>, opts: Options) {
export const encodeValues = function(type: string, values: Array<number>, opts: { bitWidth: number, disableEnvelope?: boolean }) {
if (!('bitWidth' in opts)) {
throw 'bitWidth is required';
}
Expand Down Expand Up @@ -108,7 +108,7 @@ export const encodeValues = function(type: string, values: Array<number>, opts:
return envelope;
};

function decodeRunBitpacked(cursor : Cursor, count: number, opts: Options) {
function decodeRunBitpacked(cursor : Cursor, count: number, opts: { bitWidth: number }) {
if (count % 8 !== 0) {
throw 'must be a multiple of 8';
}
Expand All @@ -124,7 +124,7 @@ function decodeRunBitpacked(cursor : Cursor, count: number, opts: Options) {
return values;
}

function decodeRunRepeated(cursor: Cursor, count: number, opts: Options) {
function decodeRunRepeated(cursor: Cursor, count: number, opts: { bitWidth: number }) {
var bytesNeededForFixedBitWidth = Math.ceil(opts.bitWidth / 8);
let value = 0;

Expand All @@ -139,7 +139,7 @@ function decodeRunRepeated(cursor: Cursor, count: number, opts: Options) {
return new Array(count).fill(value);
}

export const decodeValues = function(_: string, cursor: Cursor, count: number, opts: Options) {
export const decodeValues = function(_: string, cursor: Cursor, count: number, opts: { bitWidth: number, disableEnvelope?: boolean }) {
if (!('bitWidth' in opts)) {
throw 'bitWidth is required';
}
Expand Down
2 changes: 1 addition & 1 deletion lib/codec/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Statistics } from "../../gen-nodejs/parquet_types";
export interface Options {
typeLength: number,
bitWidth: number,
disableEnvelope: boolean
disableEnvelope?: boolean
primitiveType?: PrimitiveType;
originalType?: OriginalType;
encoding?: ParquetCodec;
Expand Down
10 changes: 5 additions & 5 deletions lib/reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ const {
const PARQUET_MAGIC = 'PAR1';

/**
* Parquet File Format Version
* Supported Parquet File Format Version for reading
*/
const PARQUET_VERSION = 1;
const PARQUET_VERSIONS = [1, 2];

/**
* Internal type used for repetition/definition levels
Expand Down Expand Up @@ -166,7 +166,7 @@ export class ParquetReader {
*/
constructor(metadata: FileMetaDataExt, envelopeReader: ParquetEnvelopeReader, opts?: BufferReaderOptions) {
opts = opts || {};
if (metadata.version != PARQUET_VERSION) {
if (!PARQUET_VERSIONS.includes(metadata.version)) {
throw 'invalid parquet version';
}

Expand Down Expand Up @@ -1021,8 +1021,8 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade
valuesBufCursor,
valueCountNonNull,
{
typeLength: opts.column!.typeLength!,
bitWidth: opts.column!.typeLength!
bitWidth: opts.column!.typeLength!,
...opts.column!
});

return {
Expand Down
28 changes: 23 additions & 5 deletions lib/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
}

if (typeDef.originalType === 'DECIMAL') {
// Default scale to 0 per https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
if (typeof opts.scale === "undefined") opts.scale = 0;
fieldErrors = fieldErrors.concat(errorsForDecimalOpts(typeDef.originalType, opts, nameWithPath));
}

Expand Down Expand Up @@ -219,19 +221,35 @@ function isDefined<T>(val: T | undefined): val is T {

function errorsForDecimalOpts(type: string, opts: FieldDefinition, columnName: string): string[] {
const fieldErrors = []
if(!opts.precision) {
if(opts.precision === undefined || opts.precision < 1) {
fieldErrors.push(
`invalid schema for type: ${type}, for Column: ${columnName}, precision is required`
`invalid schema for type: ${type}, for Column: ${columnName}, precision is required and must be be greater than 0`
);
}
else if (!Number.isInteger(opts.precision)) {
fieldErrors.push(
`invalid schema for type: ${type}, for Column: ${columnName}, precision must be an integer`
);
}
else if (opts.precision > 18) {
fieldErrors.push(
`invalid precision for type: ${type}, for Column: ${columnName}, can not handle precision over 18`
`invalid schema for type: ${type}, for Column: ${columnName}, can not handle precision over 18`
);
}
if (typeof opts.scale === "undefined" || opts.scale < 0) {
fieldErrors.push(
`invalid schema for type: ${type}, for Column: ${columnName}, scale is required to be 0 or greater`
);
}
else if (!Number.isInteger(opts.scale)) {
fieldErrors.push(
`invalid schema for type: ${type}, for Column: ${columnName}, scale must be an integer`
);
}
if (!opts.scale) {
// Default precision to 18 if it is undefined as that is a different error
else if (opts.scale > (opts.precision || 18)) {
fieldErrors.push(
`invalid schema for type: ${type}, for Column: ${columnName}, scale is required`
`invalid schema or precision for type: ${type}, for Column: ${columnName}, precision must be greater than or equal to scale`
);
}
return fieldErrors
Expand Down
18 changes: 13 additions & 5 deletions lib/writer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import stream from 'stream'
import parquet_thrift from '../gen-nodejs/parquet_types'
import parquet_thrift, { ConvertedType } from '../gen-nodejs/parquet_types'
import * as parquet_shredder from './shred'
import * as parquet_util from './util'
import * as parquet_codec from './codec'
Expand Down Expand Up @@ -487,8 +487,8 @@ async function encodeDataPage(column: ParquetField, values: number[], rlevels: n
column.primitiveType!,
column.encoding!,
values, {
typeLength: column.typeLength,
bitWidth: column.typeLength
bitWidth: column.typeLength,
...column
});

/* encode repetition and definition levels */
Expand Down Expand Up @@ -545,8 +545,8 @@ async function encodeDataPageV2(column: ParquetField, rowCount: number, values:
column.primitiveType!,
column.encoding!,
values, {
typeLength: column.typeLength,
bitWidth: column.typeLength
bitWidth: column.typeLength,
...column,
});

let valuesBufCompressed = await parquet_compression.deflate(
Expand Down Expand Up @@ -772,6 +772,14 @@ function encodeFooter(schema: ParquetSchema, rowCount: Int64, rowGroups: RowGrou
schemaElem.converted_type = parquet_thrift.ConvertedType[field.originalType];
}

// Support Decimal
switch(schemaElem.converted_type) {
case (ConvertedType.DECIMAL):
schemaElem.precision = field.precision;
schemaElem.scale = field.scale || 0;
break;
}

schemaElem.type_length = field.typeLength;

metadata.schema.push(schemaElem);
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 56 additions & 0 deletions test/integration.js
Original file line number Diff line number Diff line change
Expand Up @@ -576,5 +576,61 @@ describe('Parquet', function() {
);
});
});

describe('Decimal schema', function() {
  // Schema under test: four DECIMAL columns, one of which omits `scale`.
  const decimalSchema = new parquet.ParquetSchema({
    zero_column: { type: 'DECIMAL', precision: 10, scale: 0 },
    no_scale_column: { type: 'DECIMAL', precision: 10 },
    scale_64_column: { type: 'DECIMAL', precision: 10, scale: 2 },
    scale_32_column: { type: 'DECIMAL', precision: 8, scale: 2 },
  });

  // The single row appended to every file written below.
  const inputRow = {
    zero_column: 1,
    no_scale_column: 2,
    scale_64_column: 3.345678901234567,
    scale_32_column: 3.3,
  };

  // Builds a fresh copy of the row each test expects to read back.
  const expectedRow = () => ({
    zero_column: 1,
    no_scale_column: 2,
    scale_64_column: 3.34, // Scale 2
    scale_32_column: 3.3,
  });

  // Same round trip, once per data page format.
  const scenarios = [
    {
      title: 'write a test file with decimals in v1 data page and read it back',
      file: "decimal-test-v1.parquet",
      opts: { useDataPageV2: false },
    },
    {
      title: 'write a test file with decimals in v2 data page and read it back',
      file: "decimal-test-v2.parquet",
      opts: { useDataPageV2: true },
    },
  ];

  scenarios.forEach(({ title, file, opts }) => {
    it(title, async function() {
      // Write one row and finalize the file.
      const writer = await parquet.ParquetWriter.openFile(decimalSchema, file, opts);
      await writer.appendRow(inputRow);
      await writer.close();

      // Read the first row back and compare it with the expectation.
      const reader = await parquet.ParquetReader.openFile(file);
      const row = await reader.getCursor().next();
      assert.deepEqual(row, expectedRow());
    });
  });
});
});

Loading

0 comments on commit 19f3ffa

Please sign in to comment.