From 19f3ffa810ff373b3afd12bdef694e72cd51ceab Mon Sep 17 00:00:00 2001
From: Wil Wade
Date: Fri, 23 Jun 2023 08:38:03 -0400
Subject: [PATCH] Decimal Writer Support (#90)

Problem
=======

Need to support writing decimal types.

Also closes: #87

Solution
========

Add basic encoding support for decimals.

Change summary:
---------------
* Added better decimal field errors
* Default scale to 0 per spec
* Clean up types on RLE so it only asks for what is needed
* Support decimal encoding
* Add test for write / read of decimal

Steps to Verify:
----------------
1. Generate a schema with a decimal field
2. Use it!
---
 lib/codec/plain.ts            | 34 +++++++++++++--------
 lib/codec/plain_dictionary.ts |  4 +--
 lib/codec/rle.ts              | 14 ++++-----
 lib/codec/types.ts            |  2 +-
 lib/reader.ts                 | 10 +++----
 lib/schema.ts                 | 28 ++++++++++++++----
 lib/writer.ts                 | 18 +++++++----
 package-lock.json             |  2 +-
 test/integration.js           | 56 +++++++++++++++++++++++++++++++++++
 test/schema.js                | 50 +++++++++++++++++++++++++++----
 10 files changed, 174 insertions(+), 44 deletions(-)

diff --git a/lib/codec/plain.ts b/lib/codec/plain.ts
index 7e5f9af7..cc191a17 100644
--- a/lib/codec/plain.ts
+++ b/lib/codec/plain.ts
@@ -26,10 +26,16 @@ function decodeValues_BOOLEAN(cursor: Cursor, count: number) {
   return values;
 }
 
-function encodeValues_INT32(values: Array<number>) {
+function encodeValues_INT32(values: Array<number>, opts: Options) {
+  const isDecimal = opts?.originalType === 'DECIMAL' || opts?.column?.originalType === 'DECIMAL';
+  const scale = opts?.scale || 0;
   let buf = Buffer.alloc(4 * values.length);
   for (let i = 0; i < values.length; i++) {
-    buf.writeInt32LE(values[i], i * 4);
+    if (isDecimal) {
+      buf.writeInt32LE(values[i] * Math.pow(10, scale), i * 4);
+    } else {
+      buf.writeInt32LE(values[i], i * 4);
+    }
   }
 
   return buf;
@@ -55,10 +61,16 @@ function decodeValues_INT32(cursor: Cursor, count: number, opts: Options) {
   return values;
 }
 
-function encodeValues_INT64(values: Array<number>) {
+function encodeValues_INT64(values: Array<number>, opts: Options) {
+  const isDecimal = opts?.originalType === 'DECIMAL' || opts?.column?.originalType === 'DECIMAL';
+  const scale = opts?.scale || 0;
   let buf = Buffer.alloc(8 * values.length);
   for (let i = 0; i < values.length; i++) {
-    buf.writeBigInt64LE(BigInt(values[i]), i * 8);
+    if (isDecimal) {
+      buf.writeBigInt64LE(BigInt(Math.floor(values[i] * Math.pow(10, scale))), i * 8);
+    } else {
+      buf.writeBigInt64LE(BigInt(values[i]), i * 8);
+    }
   }
 
   return buf;
@@ -86,15 +98,11 @@ function decodeValues_INT64(cursor: Cursor, count: number, opts: Options) {
 }
 
 function decodeValues_DECIMAL(cursor: Cursor, count: number, opts: Options) {
-  let {
-    scale,
-    precision
-  } = opts;
+  const precision = opts.precision;
+  // Default scale to 0 per spec
+  const scale = opts.scale || 0;
 
   const name = opts.name || undefined
-  if (!scale) {
-    throw `missing option: scale (required for DECIMAL) for column: ${name}`;
-  }
   if (!precision) {
     throw `missing option: precision (required for DECIMAL) for column: ${name}`;
   }
@@ -283,10 +291,10 @@ export const encodeValues = function (
       return encodeValues_BOOLEAN(values as Array<boolean>);
 
     case "INT32":
-      return encodeValues_INT32(values as Array<number>);
+      return encodeValues_INT32(values as Array<number>, opts);
 
     case "INT64":
-      return encodeValues_INT64(values as Array<number>);
+      return encodeValues_INT64(values as Array<number>, opts);
 
     case "INT96":
       return encodeValues_INT96(values as Array<number>);
diff --git a/lib/codec/plain_dictionary.ts b/lib/codec/plain_dictionary.ts
index 3e5c3ea0..70121462 100644
--- a/lib/codec/plain_dictionary.ts
+++ b/lib/codec/plain_dictionary.ts
@@ -2,7 +2,7 @@ import * as rle from './rle'
 import { Cursor, Options } from './types'
 
 export const decodeValues = function(type: string, cursor: Cursor, count: number, opts: Options) {
-  opts.bitWidth = cursor.buffer.slice(cursor.offset, cursor.offset+1).readInt8(0);
+  const bitWidth = cursor.buffer.slice(cursor.offset, cursor.offset+1).readInt8(0);
   cursor.offset += 1;
-  return rle.decodeValues(type, cursor, count, Object.assign({}, opts, {disableEnvelope: true}));
+  return rle.decodeValues(type, cursor, count, Object.assign({}, opts, { disableEnvelope: true, bitWidth }));
 };
diff --git a/lib/codec/rle.ts b/lib/codec/rle.ts
index ba022e6a..0e519587 100644
--- a/lib/codec/rle.ts
+++ b/lib/codec/rle.ts
@@ -3,9 +3,9 @@
 // https://github.com/apache/parquet-format/blob/master/Encodings.md
 
 import varint from 'varint'
-import {Cursor, Options} from './types'
+import { Cursor } from './types'
 
-function encodeRunBitpacked(values: Array<number>, opts: Options) {
+function encodeRunBitpacked(values: Array<number>, opts: { bitWidth: number }) {
   for (let i = 0; i < values.length % 8; i++) {
     values.push(0);
   }
@@ -23,7 +23,7 @@
   ]);
 }
 
-function encodeRunRepeated(value: number, count: number, opts: Options) {
+function encodeRunRepeated(value: number, count: number, opts: { bitWidth: number }) {
   let buf = Buffer.alloc(Math.ceil(opts.bitWidth / 8));
   let remainingValue = value
 
@@ -48,7 +48,7 @@ function unknownToParsedInt(value: string | number) {
   }
 }
 
-export const encodeValues = function(type: string, values: Array<number>, opts: Options) {
+export const encodeValues = function(type: string, values: Array<number>, opts: { bitWidth: number, disableEnvelope?: boolean }) {
   if (!('bitWidth' in opts)) {
     throw 'bitWidth is required';
   }
@@ -108,7 +108,7 @@ export const encodeValues = function(type: string, values: Array<number>, opts:
   return envelope;
 };
 
-function decodeRunBitpacked(cursor : Cursor, count: number, opts: Options) {
+function decodeRunBitpacked(cursor : Cursor, count: number, opts: { bitWidth: number }) {
   if (count % 8 !== 0) {
     throw 'must be a multiple of 8';
   }
@@ -124,7 +124,7 @@ function decodeRunBitpacked(cursor : Cursor, count: number, opts: Options) {
   return values;
 }
 
-function decodeRunRepeated(cursor: Cursor, count: number, opts: Options) {
+function decodeRunRepeated(cursor: Cursor, count: number, opts: { bitWidth: number }) {
   var bytesNeededForFixedBitWidth = Math.ceil(opts.bitWidth / 8);
 
   let value = 0;
@@ -139,7 +139,7 @@ function decodeRunRepeated(cursor: Cursor, count: number, opts: Options) {
   return new Array(count).fill(value);
 }
 
-export const decodeValues = function(_: string, cursor: Cursor, count: number, opts: Options) {
+export const decodeValues = function(_: string, cursor: Cursor, count: number, opts: { bitWidth: number, disableEnvelope?: boolean }) {
   if (!('bitWidth' in opts)) {
     throw 'bitWidth is required';
   }
diff --git a/lib/codec/types.ts b/lib/codec/types.ts
index d98620ab..c4b0ce1c 100644
--- a/lib/codec/types.ts
+++ b/lib/codec/types.ts
@@ -5,7 +5,7 @@ import { Statistics } from "../../gen-nodejs/parquet_types";
 export interface Options {
   typeLength: number,
   bitWidth: number,
-  disableEnvelope: boolean
+  disableEnvelope?: boolean
   primitiveType?: PrimitiveType;
   originalType?: OriginalType;
   encoding?: ParquetCodec;
diff --git a/lib/reader.ts b/lib/reader.ts
index 869a1cbd..1ddc0e78 100644
--- a/lib/reader.ts
+++ b/lib/reader.ts
@@ -22,9 +22,9 @@ const {
 const PARQUET_MAGIC = 'PAR1';
 /**
- * Parquet File Format Version
+ * Supported Parquet File Format Version for reading
 */
-const PARQUET_VERSION = 1;
+const PARQUET_VERSIONS = [1, 2];
 
 /**
 * Internal type used for repetition/definition levels
 */
@@ -166,7 +166,7 @@ export class ParquetReader {
   */
  constructor(metadata: FileMetaDataExt, envelopeReader: ParquetEnvelopeReader, opts?: BufferReaderOptions) {
    opts = opts || {};
-    if (metadata.version != PARQUET_VERSION) {
+    if (!PARQUET_VERSIONS.includes(metadata.version)) {
      throw 'invalid parquet version';
    }
 
@@ -1021,8 +1021,8 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade
       valuesBufCursor,
       valueCountNonNull,
       {
-        typeLength: opts.column!.typeLength!,
-        bitWidth: opts.column!.typeLength!
+        bitWidth: opts.column!.typeLength!,
+        ...opts.column!
       });
 
   return {
diff --git a/lib/schema.ts b/lib/schema.ts
index c31160d8..ac950d75 100644
--- a/lib/schema.ts
+++ b/lib/schema.ts
@@ -170,6 +170,8 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
     }
 
     if (typeDef.originalType === 'DECIMAL') {
+      // Default scale to 0 per https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
+      if (typeof opts.scale === "undefined") opts.scale = 0;
       fieldErrors = fieldErrors.concat(errorsForDecimalOpts(typeDef.originalType, opts, nameWithPath));
     }
 
@@ -219,19 +221,35 @@ function isDefined<T>(val: T | undefined): val is T {
 
 function errorsForDecimalOpts(type: string, opts: FieldDefinition, columnName: string): string[] {
   const fieldErrors = []
-  if(!opts.precision) {
+  if(opts.precision === undefined || opts.precision < 1) {
     fieldErrors.push(
-      `invalid schema for type: ${type}, for Column: ${columnName}, precision is required`
+      `invalid schema for type: ${type}, for Column: ${columnName}, precision is required and must be greater than 0`
+    );
+  }
+  else if (!Number.isInteger(opts.precision)) {
+    fieldErrors.push(
+      `invalid schema for type: ${type}, for Column: ${columnName}, precision must be an integer`
     );
   }
   else if (opts.precision > 18) {
     fieldErrors.push(
-      `invalid precision for type: ${type}, for Column: ${columnName}, can not handle precision over 18`
+      `invalid schema for type: ${type}, for Column: ${columnName}, can not handle precision over 18`
+    );
+  }
+  if (typeof opts.scale === "undefined" || opts.scale < 0) {
+    fieldErrors.push(
+      `invalid schema for type: ${type}, for Column: ${columnName}, scale is required to be 0 or greater`
+    );
+  }
+  else if (!Number.isInteger(opts.scale)) {
+    fieldErrors.push(
+      `invalid schema for type: ${type}, for Column: ${columnName}, scale must be an integer`
     );
   }
-  if (!opts.scale) {
+  // Default precision to 18 if it is undefined as that is a different error
+  else if (opts.scale > (opts.precision || 18)) {
     fieldErrors.push(
-      `invalid schema for type: ${type}, for Column: ${columnName}, scale is required`
+      `invalid schema or precision for type: ${type}, for Column: ${columnName}, precision must be greater than or equal to scale`
     );
   }
   return fieldErrors
diff --git a/lib/writer.ts b/lib/writer.ts
index 8ea73908..1aa535cc 100644
--- a/lib/writer.ts
+++ b/lib/writer.ts
@@ -1,5 +1,5 @@
 import stream from 'stream'
-import parquet_thrift from '../gen-nodejs/parquet_types'
+import parquet_thrift, { ConvertedType } from '../gen-nodejs/parquet_types'
 import * as parquet_shredder from './shred'
 import * as parquet_util from './util'
 import * as parquet_codec from './codec'
@@ -487,8 +487,8 @@ async function encodeDataPage(column: ParquetField, values: number[], rlevels: n
       column.primitiveType!,
       column.encoding!,
       values, {
-        typeLength: column.typeLength,
-        bitWidth: column.typeLength
+        bitWidth: column.typeLength,
+        ...column
       });
 
   /* encode repetition and definition levels */
@@ -545,8 +545,8 @@ async function encodeDataPageV2(column: ParquetField, rowCount: number, values:
       column.primitiveType!,
       column.encoding!,
       values, {
-        typeLength: column.typeLength,
-        bitWidth: column.typeLength
+        bitWidth: column.typeLength,
+        ...column,
       });
 
   let valuesBufCompressed = await parquet_compression.deflate(
@@ -772,6 +772,14 @@ function encodeFooter(schema: ParquetSchema, rowCount: Int64, rowGroups: RowGrou
       schemaElem.converted_type = parquet_thrift.ConvertedType[field.originalType];
     }
 
+    // Support Decimal
+    switch(schemaElem.converted_type) {
+      case (ConvertedType.DECIMAL):
+        schemaElem.precision = field.precision;
+        schemaElem.scale = field.scale || 0;
+        break;
+    }
+
     schemaElem.type_length = field.typeLength;
 
     metadata.schema.push(schemaElem);
diff --git a/package-lock.json b/package-lock.json
index 094b3d1f..fcde414f 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -26,7 +26,7 @@
     "devDependencies": {
       "@types/bson": "^4.2.0",
       "@types/chai": "^4.3.5",
-      "@types/json-schema": "",
+      "@types/json-schema": "^7.0.11",
      "@types/mocha": "^10.0.1",
       "@types/node": "^16.18.32",
       "@types/sinon": "^10.0.15",
diff --git a/test/integration.js b/test/integration.js
index 4cf8ebc9..7dc3b8c0 100644
--- a/test/integration.js
+++ b/test/integration.js
@@ -576,5 +576,61 @@ describe('Parquet', function() {
         );
       });
   });
+
+  describe('Decimal schema', function() {
+    const schema = new parquet.ParquetSchema({
+      zero_column: { type: 'DECIMAL', precision: 10, scale: 0 },
+      no_scale_column: { type: 'DECIMAL', precision: 10 },
+      scale_64_column: { type: 'DECIMAL', precision: 10, scale: 2 },
+      scale_32_column: { type: 'DECIMAL', precision: 8, scale: 2 },
+    });
+
+    const rowData = {
+      zero_column: 1,
+      no_scale_column: 2,
+      scale_64_column: 3.345678901234567,
+      scale_32_column: 3.3,
+    };
+
+    it('write a test file with decimals in v1 data page and read it back', async function() {
+      const file = "decimal-test-v1.parquet";
+      const opts = { useDataPageV2: false };
+      const writer = await parquet.ParquetWriter.openFile(schema, file, opts);
+
+      await writer.appendRow(rowData);
+      await writer.close();
+
+      const reader = await parquet.ParquetReader.openFile(file);
+
+      const cursor = reader.getCursor();
+      const row = await cursor.next();
+      assert.deepEqual(row, {
+        zero_column: 1,
+        no_scale_column: 2,
+        scale_64_column: 3.34, // Scale 2
+        scale_32_column: 3.3,
+      })
+    });
+
+    it('write a test file with decimals in v2 data page and read it back', async function() {
+      const file = "decimal-test-v2.parquet";
+      const opts = { useDataPageV2: true };
+      const writer = await parquet.ParquetWriter.openFile(schema, file, opts);
+
+      await writer.appendRow(rowData);
+      await writer.close();
+
+      const reader = await parquet.ParquetReader.openFile(file);
+
+      const cursor = reader.getCursor();
+      const row = await cursor.next();
+      assert.deepEqual(row, {
+        zero_column: 1,
+        no_scale_column: 2,
+        scale_64_column: 3.34, // Scale 2
+        scale_32_column: 3.3,
+      })
+    });
+  });
 });
diff --git a/test/schema.js b/test/schema.js
index 7d6a74b0..0729c646 100644
--- a/test/schema.js
+++ b/test/schema.js
@@ -529,15 +529,23 @@ describe('ParquetSchema', function() {
         new parquet.ParquetSchema({
           test_decimal_col: {type: 'DECIMAL', scale: 4},
         })
-      }, 'invalid schema for type: DECIMAL, for Column: test_decimal_col, precision is required');
+      }, 'invalid schema for type: DECIMAL, for Column: test_decimal_col, precision is required and must be greater than 0');
     });
 
-    it('should throw error given decimal with no scale', function() {
-      assert.throws(() => {
+    it('should NOT throw error given decimal with no scale', function() {
+      assert.doesNotThrow(() => {
         new parquet.ParquetSchema({
           test_decimal_col: {type: 'DECIMAL', precision: 4},
         })
-      }, 'invalid schema for type: DECIMAL, for Column: test_decimal_col, scale is required');
+      });
+    });
+
+    it('should throw error given decimal with negative precision', function() {
+      assert.throws(() => {
+        new parquet.ParquetSchema({
+          decimal_column: {type: 'DECIMAL', precision: -1, scale: 0},
+        })
+      }, 'invalid schema for type: DECIMAL, for Column: decimal_column, precision is required and must be greater than 0');
     });
 
     it('should throw error given decimal with over 18 precision', function() {
@@ -545,7 +553,39 @@
       assert.throws(() => {
         new parquet.ParquetSchema({
           decimal_column: {type: 'DECIMAL', precision: 19, scale: 5},
         })
-      }, 'invalid precision for type: DECIMAL, for Column: decimal_column, can not handle precision over 18');
+      }, 'invalid schema for type: DECIMAL, for Column: decimal_column, can not handle precision over 18');
     });
+
+    it('should throw error given decimal with a non-integer precision', function() {
+      assert.throws(() => {
+        new parquet.ParquetSchema({
+          decimal_column: {type: 'DECIMAL', precision: 6.1, scale: 5},
+        })
+      }, 'invalid schema for type: DECIMAL, for Column: decimal_column, precision must be an integer');
+    });
+
+    it('should throw error given decimal with a non-integer scale', function() {
+      assert.throws(() => {
+        new parquet.ParquetSchema({
+          decimal_column: {type: 'DECIMAL', precision: 6, scale: 5.1},
+        })
+      }, 'invalid schema for type: DECIMAL, for Column: decimal_column, scale must be an integer');
+    });
+
+    it('should throw error given decimal with negative scale', function() {
+      assert.throws(() => {
+        new parquet.ParquetSchema({
+          decimal_column: {type: 'DECIMAL', precision: 6, scale: -1},
+        })
+      }, 'invalid schema for type: DECIMAL, for Column: decimal_column, scale is required to be 0 or greater');
+    });
+
+    it('should throw error given decimal with scale > precision', function() {
+      assert.throws(() => {
+        new parquet.ParquetSchema({
+          decimal_column: {type: 'DECIMAL', precision: 5, scale: 6},
+        })
+      }, 'invalid schema or precision for type: DECIMAL, for Column: decimal_column, precision must be greater than or equal to scale');
+    });
 });
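
For reference, a minimal TypeScript sketch of the "Steps to Verify" above, condensed from the new integration test. This is not part of the patch; the package import path and the output file name are assumptions to adapt to your setup.

    import parquet from '@dsnp/parquetjs'; // assumption: package entry point; the repo's own tests use require('../parquet')

    async function verifyDecimalSupport() {
      // DECIMAL needs an integer precision (1..18); scale now defaults to 0.
      const schema = new parquet.ParquetSchema({
        price: { type: 'DECIMAL', precision: 10, scale: 2 },
        count: { type: 'DECIMAL', precision: 8 }, // scale defaults to 0
      });

      // Write one row to a hypothetical file, using the v1 data page path as in the test.
      const writer = await parquet.ParquetWriter.openFile(schema, 'decimal-verify.parquet', { useDataPageV2: false });
      await writer.appendRow({ price: 3.345, count: 4 });
      await writer.close();

      // Read it back: with scale 2 the value comes back truncated to two decimal places.
      const reader = await parquet.ParquetReader.openFile('decimal-verify.parquet');
      const cursor = reader.getCursor();
      const row = await cursor.next();
      console.log(row); // { price: 3.34, count: 4 }
      await reader.close();
    }

    verifyDecimalSupport().catch(console.error);

On the 64-bit encode path touched in lib/codec/plain.ts (which the integration test's scale_64_column, precision 10, exercises), the writer scales by 10^scale and floors (Math.floor(3.345 * 100) = 334), so the value reads back as 3.34, matching the test's assertion; scale_32_column (precision 8) exercises the INT32 path in the same way.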