Skip to content

Commit

Permalink
feat(csv): CSVArrowLoader
Browse files Browse the repository at this point in the history
  • Loading branch information
ibgreen committed Oct 16, 2024
1 parent bfaabeb commit 451975d
Show file tree
Hide file tree
Showing 22 changed files with 406 additions and 105 deletions.
41 changes: 41 additions & 0 deletions modules/csv/src/csv-arrow-loader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import type {LoaderWithParser, LoaderOptions} from '@loaders.gl/loader-utils';
import type {ArrowTable, ArrowTableBatch} from '@loaders.gl/schema';
import {convertTable, convertBatches} from '@loaders.gl/schema-utils';

import type {CSVLoaderOptions} from './csv-loader';
import {CSVLoader} from './csv-loader';

export type CSVArrowLoaderOptions = LoaderOptions & {
csv?: Omit<CSVLoaderOptions['csv'], 'shape'>;
};

export const CSVArrowLoader = {
...CSVLoader,

dataType: null as unknown as ArrowTable,
batchType: null as unknown as ArrowTableBatch,

parse: async (arrayBuffer: ArrayBuffer, options?: CSVLoaderOptions) =>
parseCSVToArrow(new TextDecoder().decode(arrayBuffer), options),
parseText: (text: string, options?: CSVLoaderOptions) => parseCSVToArrow(text, options),
parseInBatches: parseCSVToArrowBatches
} as const satisfies LoaderWithParser<ArrowTable, ArrowTableBatch, CSVArrowLoaderOptions>;

async function parseCSVToArrow(csvText: string, options?: CSVLoaderOptions): Promise<ArrowTable> {
// Apps can call the parse method directly, we so apply default options here
// const csvOptions = {...CSVArrowLoader.options.csv, ...options?.csv};
const table = await CSVLoader.parseText(csvText, options);
return convertTable(table, 'arrow-table');
}

function parseCSVToArrowBatches(
asyncIterator: AsyncIterable<ArrayBuffer> | Iterable<ArrayBuffer>,
options?: CSVArrowLoaderOptions
): AsyncIterable<ArrowTableBatch> {
const tableIterator = CSVLoader.parseInBatches(asyncIterator, options);
return convertBatches(tableIterator, 'arrow-table');
}
3 changes: 3 additions & 0 deletions modules/csv/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ export {CSVLoader} from './csv-loader';

export type {CSVWriterOptions} from './csv-writer';
export {CSVWriter} from './csv-writer';

export type {CSVArrowLoaderOptions} from './csv-arrow-loader';
export {CSVArrowLoader} from './csv-arrow-loader';
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import test from 'tape-promise/tape';
import {loadInBatches, isIterator, isAsyncIterable} from '@loaders.gl/core';
import {CSVLoader} from '../src/csv-loader'; // from '@loaders.gl/csv';
import {CSVArrowLoader} from '@loaders.gl/csv';
import * as arrow from 'apache-arrow';

// Small CSV Sample Files
const CSV_NUMBERS_100_URL = '@loaders.gl/csv/test/data/numbers-100.csv';
const CSV_NUMBERS_10000_URL = '@loaders.gl/csv/test/data/numbers-10000.csv';

// TODO -restore
test.skip('CSVLoader#loadInBatches(numbers-100.csv, arrow)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_100_URL, CSVLoader, {
test('CSVArrowLoader#loadInBatches(numbers-100.csv, arrow)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_100_URL, CSVArrowLoader, {
csv: {
shape: 'arrow-table'
},
Expand All @@ -29,9 +32,8 @@ test.skip('CSVLoader#loadInBatches(numbers-100.csv, arrow)', async (t) => {
t.end();
});

// TODO - restore
test.skip('CSVLoader#loadInBatches(numbers-10000.csv, arrow)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_10000_URL, CSVLoader, {
test('CSVArrowLoader#loadInBatches(numbers-10000.csv, arrow)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_10000_URL, CSVArrowLoader, {
csv: {
shape: 'arrow-table'
},
Expand Down
4 changes: 4 additions & 0 deletions modules/csv/test/csv-loader.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import test from 'tape-promise/tape';
import {validateLoader} from 'test/common/conformance';

Expand Down
4 changes: 4 additions & 0 deletions modules/csv/test/csv-writer-papaparse.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

// This is a fork of papaparse under MIT License
// https://github.com/mholt/PapaParse
/* eslint-disable */
Expand Down
4 changes: 2 additions & 2 deletions modules/csv/test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ import './papaparse/papaparse.spec';
// import './csv-writer-papaparse.spec';

import './csv-loader.spec';
import './csv-loader-arrow.spec';

import './csv-writer.spec';

import './csv-arrow-loader.spec';
42 changes: 31 additions & 11 deletions modules/schema-utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,33 @@ export {
} from './lib/schema/convert-arrow-schema';
export {getDataTypeFromArray} from './lib/schema/data-type';

// Table utils
// TABLE CATEGORY UTILS

export {deduceTableSchema} from './lib/schema/deduce-table-schema';
export {makeTableFromData} from './lib/table/tables/make-table';
export {makeTableFromBatches} from './lib/table/batches/make-table-from-batches';

export {convertTable} from './lib/table/tables/convert-table';
export {convertToObjectRow, convertToArrayRow} from './lib/table/tables/row-utils';
export {convertArrowToTable, convertTableToArrow} from './lib/table/tables/convert-arrow-table';

// TABLE CATEGORY UTILS
export {TableBatchBuilder} from './lib/table/batches/table-batch-builder';
export type {TableBatchAggregator} from './lib/table/batches/table-batch-aggregator';
export {RowTableBatchAggregator} from './lib/table/batches/row-table-batch-aggregator';
export {ColumnarTableBatchAggregator} from './lib/table/batches/columnar-table-batch-aggregator';
export {
makeTableBatchIterator,
makeBatchFromTable
} from './lib/table/batches/make-table-batch-iterator';
export {
makeArrowTableBatchIterator,
makeArrowRecordBatchIterator
} from './lib/table/batches/make-arrow-batch-iterator';
export {convertBatch, convertBatches} from './lib/table/batches/convert-batches';

export {
isArrayRowTable,
isObjectRowTable,
isColumnarTable,
isGeoJSONTable,
isArrowTable
} from './lib/table/tables/table-types';

export {
isTable,
Expand All @@ -46,11 +65,12 @@ export {
makeObjectRowIterator
} from './lib/table/tables/table-accessors';

export {makeTableFromData} from './lib/table/tables/make-table';
export {makeTableFromBatches, makeBatchFromTable} from './lib/table/tables/make-table-from-batches';
export {convertTable} from './lib/table/tables/convert-table';
export {deduceTableSchema} from './lib/schema/deduce-table-schema';
export {convertToObjectRow, convertToArrayRow} from './lib/table/tables/row-utils';
// Table batch builders

export {TableBatchBuilder} from './lib/table/batch-builder/table-batch-builder';
export type {TableBatchAggregator} from './lib/table/batch-builder/table-batch-aggregator';
export {RowTableBatchAggregator} from './lib/table/batch-builder/row-table-batch-aggregator';
export {ColumnarTableBatchAggregator} from './lib/table/batch-builder/columnar-table-batch-aggregator';

export {ArrowLikeTable} from './lib/table/arrow-api/arrow-like-table';

Expand Down
83 changes: 83 additions & 0 deletions modules/schema-utils/src/lib/table/batches/convert-batches.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import type {
TableBatch,
ArrayRowTableBatch,
ObjectRowTableBatch,
ColumnarTableBatch,
ArrowTableBatch
} from '@loaders.gl/schema';
import {convertTable} from '../tables/convert-table';

export function convertBatch(batches: TableBatch, shape: 'object-row-table'): ObjectRowTableBatch;
export function convertBatch(batches: TableBatch, shape: 'array-row-table'): ArrayRowTableBatch;
export function convertBatch(batches: TableBatch, shape: 'columnar-table'): ColumnarTableBatch;
export function convertBatch(batches: TableBatch, shape: 'arrow-table'): ArrowTableBatch;

/** Convert a table batch to a different shape */
export function convertBatch(
batch: TableBatch,
shape: 'object-row-table' | 'array-row-table' | 'columnar-table' | 'arrow-table'
): TableBatch {
switch (batch.shape) {
case 'object-row-table':
return {...batch, ...convertTable(batch, 'object-row-table')};
case 'array-row-table':
return {...batch, ...convertTable(batch, 'array-row-table')};
case 'columnar-table':
return {...batch, ...convertTable(batch, 'columnar-table')};
case 'arrow-table':
return {...batch, ...convertTable(batch, 'arrow-table')};
default:
throw new Error(shape);
}
}

export function convertBatches(
batches: Iterable<TableBatch> | AsyncIterable<TableBatch>,
shape: 'object-row-table'
): AsyncIterableIterator<ObjectRowTableBatch>;
export function convertBatches(
batches: Iterable<TableBatch> | AsyncIterable<TableBatch>,
shape: 'array-row-table'
): AsyncIterableIterator<ArrayRowTableBatch>;
export function convertBatches(
batches: Iterable<TableBatch> | AsyncIterable<TableBatch>,
shape: 'columnar-table'
): AsyncIterableIterator<ColumnarTableBatch>;
export function convertBatches(
batches: Iterable<TableBatch> | AsyncIterable<TableBatch>,
shape: 'arrow-table'
): AsyncIterableIterator<ArrowTableBatch>;

/**
* Convert batches to a different shape
* @param table
* @param shape
* @returns
*/
export async function* convertBatches(
batches: Iterable<TableBatch> | AsyncIterable<TableBatch>,
shape: 'object-row-table' | 'array-row-table' | 'columnar-table' | 'arrow-table'
): AsyncIterableIterator<TableBatch> {
for await (const batch of batches) {
switch (shape) {
case 'object-row-table':
yield convertBatch(batch, 'object-row-table');
break;
case 'array-row-table':
yield convertBatch(batch, 'array-row-table');
break;
case 'columnar-table':
yield convertBatch(batch, 'columnar-table');
break;
case 'arrow-table':
yield convertBatch(batch, 'arrow-table');
break;
default:
throw new Error(shape);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import * as arrow from 'apache-arrow';
import type {Table, ArrowTableBatch} from '@loaders.gl/schema';

import {convertSchemaToArrow} from '../../schema/convert-arrow-schema';
import {getTableLength, getTableNumCols, getTableCellAt} from '../tables/table-accessors';

/**
* Returns an iterator that yields a single table as a sequence of ArrowTable batches.
* @note All batches will have the same shape and schema as the original table.
*/
export function* makeArrowTableBatchIterator(
table: Table,
options?: {batchSize?: number}
): IterableIterator<ArrowTableBatch> {
for (const batch of makeArrowRecordBatchIterator(table, options)) {
const arrowTable = new arrow.Table([batch]);
yield {
...batch,
shape: 'arrow-table',
schema: table.schema,
batchType: 'data',
length: arrowTable.numRows,
data: arrowTable
};
}
}

/**
* Returns an iterator that yields a single table as a sequence of arrow.RecordBatch batches.
* @note All batches will have the same shape and schema as the original table.
*/
export function* makeArrowRecordBatchIterator(
table: Table,
options?: {batchSize?: number}
): IterableIterator<arrow.RecordBatch> {
const arrowSchema = convertSchemaToArrow(table.schema!);

const length = getTableLength(table);
const numColumns = getTableNumCols(table);
const batchSize = options?.batchSize || length;

const builders = arrowSchema?.fields.map((arrowField) => arrow.makeBuilder(arrowField));
const structField = new arrow.Struct(arrowSchema.fields);

let batchLength = 0;
for (let rowIndex = 0; rowIndex < length; rowIndex++) {
for (let columnIndex = 0; columnIndex < numColumns; ++columnIndex) {
const value = getTableCellAt(table, rowIndex, columnIndex);

const builder = builders[columnIndex];
builder.append(value);
batchLength++;

if (batchLength >= batchSize) {
const datas = builders.map((builder) => builder.flush());
const structData = new arrow.Data(structField, 0, batchLength, 0, undefined, datas);
yield new arrow.RecordBatch(arrowSchema, structData);
batchLength = 0;
}
}
}

if (batchLength > 0) {
const datas = builders.map((builder) => builder.flush());
const structData = new arrow.Data(structField, 0, batchLength, 0, undefined, datas);
yield new arrow.RecordBatch(arrowSchema, structData);
batchLength = 0;
}

builders.map((builder) => builder.finish());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import type {TableBatch, Table} from '@loaders.gl/schema';
import {getTableLength} from '../tables/table-accessors';

/**
* Returns an iterator that yields the contents of a table as a sequence of batches.
* @todo Currently only a single batch is yielded.
* @note All batches will have the same shape and schema as the original table.
* @returns
*/
export function* makeTableBatchIterator(table: Table): IterableIterator<TableBatch> {
yield makeBatchFromTable(table);
}

/**
* Returns a table packaged as a single table batch
* @note The batch will have the same shape and schema as the original table.
* @returns `null` if no batches are yielded by the async iterator
*/
export function makeBatchFromTable(table: Table): TableBatch {
return {...table, length: getTableLength(table), batchType: 'data'};
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,7 @@ import type {
ArrayRowTable,
Feature
} from '@loaders.gl/schema';
import {getTableLength} from './table-accessors';

/**
* Returns an iterator that yields a single table as a sequence of batches.
* @note Currently only a single batch is yielded.
* @note All batches will have the same shape and schema as the original table.
* @returns
*/
export function* makeBatchesFromTable(table: Table): IterableIterator<TableBatch> {
yield makeBatchFromTable(table);
}

/**
* Returns a table packaged as a single table batch
* @note The batch will have the same shape and schema as the original table.
* @returns `null` if no batches are yielded by the async iterator
*/
export function makeBatchFromTable(table: Table): TableBatch {
return {...table, length: getTableLength(table), batchType: 'data'};
}
import {getTableLength} from '../tables/table-accessors';

/**
* Assembles all batches from an async iterator into a single table.
Expand Down
Loading

0 comments on commit 451975d

Please sign in to comment.