Skip to content

Commit

Permalink
Ensure Buffer objects are returned by compression functions (#88)
Browse files Browse the repository at this point in the history
Problem
=======

compression functions' return values are assumed to be Buffer objects.
However, in reality they could be non-Buffer objects and will results in
exceptions during reading/writing.
#72 

Solution
========

Detect the data type of the value from various compression libraries and
convert the value to Buffer object if necessary before returning.

Change summary:
---------------
Ensure Buffer objects are returned by compression functions

Steps to Verify:
----------------
As pointed out in the original issue, the problem can be reproduced
using the browser build of parquetjs to read a file with snappy
compression.
  • Loading branch information
JasonYeMSFT authored Jun 21, 2023
1 parent 17cb5ed commit fa1865b
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ npm-debug.log
dist
!test/test-files/*.parquet
examples/server/package-lock.json
test/browser/*.js
20 changes: 20 additions & 0 deletions esbuild.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@ const baseConfig = {
plugins: [compressionBrowserPlugin, wasmPlugin],
target: "es2020" // default
};
// configuration for generating test code in browser
const testConfig = {
bundle: true,
entryPoints: ['test/browser/main.ts'],
define: {
"process.env.NODE_DEBUG": false,
"process.env.NODE_ENV": "\"production\"",
global: "window"
},
inject: ['./esbuild-shims.js'],
minify: false,
platform: 'browser', // default
plugins: [compressionBrowserPlugin, wasmPlugin],
target: "es2020" // default
}
const targets = [
{
...baseConfig,
Expand All @@ -31,6 +46,11 @@ const targets = [
...baseConfig,
format: "cjs",
outfile: path.resolve(__dirname, "dist","browser","parquet.cjs.js"),
},
// Browser test code below
{
...testConfig,
outfile: path.resolve(__dirname, "test","browser","main.js"),
}
]
Promise.all(targets.map(esbuild.build))
Expand Down
18 changes: 14 additions & 4 deletions lib/browser/compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ async function deflate(method, value) {
}

function deflate_identity(value) {
return value;
return buffer_from_result(value);
}

function deflate_gzip(value) {
return zlib.gzipSync(value);
}

function deflate_snappy(value) {
return snappy.compress(value);
const compressedValue = snappy.compress(value);
return buffer_from_result(compressedValue);
}

/**
Expand All @@ -52,15 +53,24 @@ async function inflate(method, value) {
}

function inflate_identity(value) {
return value;
return buffer_from_result(value);
}

function inflate_gzip(value) {
return zlib.gunzipSync(value);
}

function inflate_snappy(value) {
return snappy.uncompress(value);
const uncompressedValue = snappy.uncompress(value);
return buffer_from_result(uncompressedValue);
}

function buffer_from_result(result) {
if (Buffer.isBuffer(result)) {
return result;
} else {
return Buffer.from(result);
}
}

exports.PARQUET_COMPRESSION_METHODS = PARQUET_COMPRESSION_METHODS
Expand Down
30 changes: 19 additions & 11 deletions lib/compression.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ type d_brotli = (value: Uint8Array ) => Promise<Buffer>

interface PARQUET_COMPRESSION_METHODS {
[key:string]: {
deflate: Function
inflate: Function
deflate: (value: any) => Buffer | Promise<Buffer>
inflate: (value: any) => Buffer | Promise<Buffer>
}
}
// LZO compression is disabled. See: https://github.com/LibertyDSNP/parquetjs/issues/18
Expand All @@ -36,7 +36,7 @@ export const PARQUET_COMPRESSION_METHODS: PARQUET_COMPRESSION_METHODS = {
/**
* Deflate a value using compression method `method`
*/
export async function deflate(method: string, value: unknown) {
export async function deflate(method: string, value: unknown): Promise<Buffer> {
if (!(method in PARQUET_COMPRESSION_METHODS)) {
throw 'invalid compression method: ' + method;
}
Expand All @@ -45,15 +45,16 @@ export async function deflate(method: string, value: unknown) {
}

function deflate_identity(value: ArrayBuffer | Buffer | Uint8Array) {
return value;
return buffer_from_result(value);
}

function deflate_gzip(value: ArrayBuffer | Buffer | string) {
return zlib.gzipSync(value);
}

function deflate_snappy(value: ArrayBuffer | Buffer | Uint8Array) {
return snappy.compress(value);
const compressedValue = snappy.compress(value);
return buffer_from_result(compressedValue);
}

async function deflate_brotli(value: Uint8Array) {
Expand All @@ -70,29 +71,36 @@ async function deflate_brotli(value: Uint8Array) {
/**
* Inflate a value using compression method `method`
*/
export async function inflate(method: string, value: unknown) {
export async function inflate(method: string, value: unknown): Promise<Buffer> {
if (!(method in PARQUET_COMPRESSION_METHODS)) {
throw 'invalid compression method: ' + method;
}

return await PARQUET_COMPRESSION_METHODS[method].inflate(value);
}

function inflate_identity(value: ArrayBuffer | Buffer | Uint8Array) {
return value;
async function inflate_identity(value: ArrayBuffer | Buffer | Uint8Array): Promise<Buffer> {
return buffer_from_result(value);
}

function inflate_gzip(value: Buffer | ArrayBuffer | string) {
async function inflate_gzip(value: Buffer | ArrayBuffer | string) {
return zlib.gunzipSync(value);
}

function inflate_snappy(value: ArrayBuffer | Buffer | Uint8Array) {
return snappy.uncompress(value);
const uncompressedValue = snappy.uncompress(value);
return buffer_from_result(uncompressedValue);
}

async function inflate_brotli(value: Uint8Array) {
const uncompressedContent = await brotliDecompress(value)
return Buffer.from(uncompressedContent);
}


function buffer_from_result(result: ArrayBuffer | Buffer | Uint8Array): Buffer {
if (Buffer.isBuffer(result)) {
return result;
} else {
return Buffer.from(result);
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
"build:browser": "node esbuild.js",
"type": "tsc --noEmit",
"lint": "echo 'Linting, it is on the TODO list...'",
"test": "mocha -r ts-node/register 'test/**/*.{js,ts}'",
"test": "mocha -r ts-node/register 'test/{,!(browser)/**}/*.{js,ts}'",
"test:only": "mocha -r ts-node/register",
"clean": "rm -Rf ./dist",
"prepublishOnly": "npm run clean && npm run build:node && npm run build:types && npm run build:browser",
Expand Down
22 changes: 22 additions & 0 deletions test/browser/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<!DOCTYPE html>
<html>
<head>
<title>Mocha Tests</title>
<link rel="stylesheet" href="../../node_modules/mocha/mocha.css">
</head>

<body>
<div id="mocha"></div>
<script src="../../node_modules/mocha/mocha.js"></script>
<script src="../../node_modules/chai/chai.js"></script>
<script>mocha.setup('bdd')</script>

<!-- load code you want to test here -->
<script src="./main.js"></script>
<!-- load your test files here -->

<script>
mocha.run();
</script>
</body>
</html>
24 changes: 24 additions & 0 deletions test/browser/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import * as parquetjs from "../../dist/browser/parquet.esm";
import { assert } from "chai";

const buffer = require("buffer");

describe("Browser tests", () => {
describe("reader", () => {
it("can read snappy compressed data", async () => {
// Data from test/test-files/snappy-compressed.parquet
const uint8Array = [80, 65, 82, 49, 21, 6, 21, 80, 21, 82, 92, 21, 8, 21, 0, 21, 8, 21, 0, 21, 0, 21, 0, 17, 28, 24, 5, 119, 111, 114, 108, 100, 24, 8, 49, 112, 111, 97, 52, 98, 112, 102, 22, 0, 22, 8, 24, 5, 119, 111, 114, 108, 100, 24, 8, 49, 112, 111, 97, 52, 98, 112, 102, 0, 0, 0, 40, 32, 5, 0, 0, 0, 104, 101, 108, 108, 111, 1, 9, 104, 119, 111, 114, 108, 100, 6, 0, 0, 0, 98, 97, 110, 97, 110, 97, 8, 0, 0, 0, 49, 112, 111, 97, 52, 98, 112, 102, 21, 12, 25, 37, 6, 0, 25, 24, 16, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100, 83, 116, 114, 105, 110, 103, 21, 2, 22, 8, 22, 206, 1, 22, 206, 1, 38, 8, 60, 24, 5, 119, 111, 114, 108, 100, 24, 8, 49, 112, 111, 97, 52, 98, 112, 102, 22, 0, 22, 8, 24, 5, 119, 111, 114, 108, 100, 24, 8, 49, 112, 111, 97, 52, 98, 112, 102, 0, 0, 41, 24, 8, 49, 112, 111, 97, 52, 98, 112, 102, 25, 24, 5, 119, 111, 114, 108, 100, 0, 25, 28, 22, 8, 21, 206, 1, 22, 0, 0, 0, 21, 2, 25, 44, 72, 4, 114, 111, 111, 116, 21, 2, 0, 21, 12, 37, 0, 24, 16, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100, 83, 116, 114, 105, 110, 103, 37, 0, 0, 22, 8, 25, 28, 25, 28, 38, 214, 1, 28, 21, 12, 25, 37, 6, 0, 25, 24, 16, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100, 83, 116, 114, 105, 110, 103, 21, 2, 22, 8, 22, 206, 1, 22, 206, 1, 38, 8, 60, 24, 5, 119, 111, 114, 108, 100, 24, 8, 49, 112, 111, 97, 52, 98, 112, 102, 22, 0, 22, 8, 24, 5, 119, 111, 114, 108, 100, 24, 8, 49, 112, 111, 97, 52, 98, 112, 102, 0, 0, 22, 154, 3, 21, 22, 22, 242, 2, 21, 40, 0, 22, 234, 2, 22, 8, 0, 25, 12, 24, 15, 64, 100, 115, 110, 112, 47, 112, 97, 114, 113, 117, 101, 116, 106, 115, 0, 163, 0, 0, 0, 80, 65, 82, 49];
const snappyCompressedBuffer = buffer.Buffer.from(uint8Array);
const reader = await parquetjs.ParquetReader.openBuffer(snappyCompressedBuffer);
const data: any[] = [];
for await (const record of reader) {
data.push(record);
}
assert.equal(data.length, 4);

after(async () => {
await reader.close();
})
});
});
});
Binary file added test/test-files/snappy-compressed.parquet
Binary file not shown.

0 comments on commit fa1865b

Please sign in to comment.