Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csv): CSVArrowLoader #3135

Merged
merged 7 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions modules/csv/src/csv-arrow-loader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import type {LoaderWithParser, LoaderOptions} from '@loaders.gl/loader-utils';
import type {ArrowTable, ArrowTableBatch} from '@loaders.gl/schema';
import {convertTable, convertBatches} from '@loaders.gl/schema-utils';

import type {CSVLoaderOptions} from './csv-loader';
import {CSVLoader} from './csv-loader';

export type CSVArrowLoaderOptions = LoaderOptions & {
csv?: Omit<CSVLoaderOptions['csv'], 'shape'>;
};

export const CSVArrowLoader = {
...CSVLoader,

dataType: null as unknown as ArrowTable,
batchType: null as unknown as ArrowTableBatch,

parse: async (arrayBuffer: ArrayBuffer, options?: CSVLoaderOptions) =>
parseCSVToArrow(new TextDecoder().decode(arrayBuffer), options),
parseText: (text: string, options?: CSVLoaderOptions) => parseCSVToArrow(text, options),
parseInBatches: parseCSVToArrowBatches
} as const satisfies LoaderWithParser<ArrowTable, ArrowTableBatch, CSVArrowLoaderOptions>;

async function parseCSVToArrow(csvText: string, options?: CSVLoaderOptions): Promise<ArrowTable> {
// Apps can call the parse method directly, we so apply default options here
// const csvOptions = {...CSVArrowLoader.options.csv, ...options?.csv};
const table = await CSVLoader.parseText(csvText, options);
return convertTable(table, 'arrow-table');
}

function parseCSVToArrowBatches(
asyncIterator: AsyncIterable<ArrayBuffer> | Iterable<ArrayBuffer>,
options?: CSVArrowLoaderOptions
): AsyncIterable<ArrowTableBatch> {
const tableIterator = CSVLoader.parseInBatches(asyncIterator, options);
return convertBatches(tableIterator, 'arrow-table');
}
71 changes: 54 additions & 17 deletions modules/csv/src/csv-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@
// Copyright (c) vis.gl contributors

import type {LoaderWithParser, LoaderOptions} from '@loaders.gl/loader-utils';
import type {ArrayRowTable, ObjectRowTable, TableBatch} from '@loaders.gl/schema';
import type {Schema, ArrayRowTable, ObjectRowTable, TableBatch} from '@loaders.gl/schema';

import {log} from '@loaders.gl/loader-utils';
import {
AsyncQueue,
deduceTableSchema,
TableBatchBuilder,
convertToArrayRow,
convertToObjectRow
} from '@loaders.gl/schema-utils';
import Papa from './papaparse/papaparse';
import AsyncIteratorStreamer from './papaparse/async-iterator-streamer';

type ObjectField = {name: string; index: number; type: any};
type ObjectSchema = {[key: string]: ObjectField} | ObjectField[];

// __VERSION__ is injected by babel-plugin-version-inline
// @ts-ignore TS2304: Cannot find name '__VERSION__'.
const VERSION = typeof __VERSION__ !== 'undefined' ? __VERSION__ : 'latest';
Expand Down Expand Up @@ -89,7 +88,7 @@ async function parseCSV(
csvText: string,
options?: CSVLoaderOptions
): Promise<ObjectRowTable | ArrayRowTable> {
// Apps can call the parse method directly, we so apply default options here
// Apps can call the parse method directly, so we apply default options here
const csvOptions = {...CSVLoader.options.csv, ...options?.csv};

const firstRow = readFirstRow(csvText);
Expand All @@ -115,20 +114,25 @@ async function parseCSV(
const headerRow = result.meta.fields || generateHeader(csvOptions.columnPrefix, firstRow.length);

const shape = csvOptions.shape || DEFAULT_CSV_SHAPE;
let table: ArrayRowTable | ObjectRowTable;
switch (shape) {
case 'object-row-table':
return {
table = {
shape: 'object-row-table',
data: rows.map((row) => (Array.isArray(row) ? convertToObjectRow(row, headerRow) : row))
};
break;
case 'array-row-table':
return {
table = {
shape: 'array-row-table',
data: rows.map((row) => (Array.isArray(row) ? row : convertToArrayRow(row, headerRow)))
};
break;
default:
throw new Error(shape);
}
table.schema = deduceTableSchema(table!);
return table;
}

// TODO - support batch size 0 = no batching/single batch?
Expand All @@ -151,7 +155,7 @@ function parseCSVInBatches(
let isFirstRow: boolean = true;
let headerRow: string[] | null = null;
let tableBatchBuilder: TableBatchBuilder | null = null;
let schema: ObjectSchema | null = null;
let schema: Schema | null = null;

const config = {
// dynamicTyping: true, // Convert numbers and boolean values in rows from strings,
Expand Down Expand Up @@ -199,7 +203,7 @@ function parseCSVInBatches(
if (!headerRow) {
headerRow = generateHeader(csvOptions.columnPrefix, row.length);
}
schema = deduceSchema(row, headerRow);
schema = deduceCSVSchema(row, headerRow);
}

if (csvOptions.optimizeMemoryUsage) {
Expand Down Expand Up @@ -314,23 +318,56 @@ function generateHeader(columnPrefix: string, count: number = 0): string[] {
return headers;
}

function deduceSchema(row, headerRow): ObjectSchema {
const schema: ObjectSchema = headerRow ? {} : [];
function deduceCSVSchema(row, headerRow): Schema {
const fields: Schema['fields'] = [];
for (let i = 0; i < row.length; i++) {
const columnName = (headerRow && headerRow[i]) || i;
const value = row[i];
switch (typeof value) {
case 'number':
fields.push({name: String(columnName), type: 'float64', nullable: true});
break;
case 'boolean':
// TODO - booleans could be handled differently...
schema[columnName] = {name: String(columnName), index: i, type: Float32Array};
fields.push({name: String(columnName), type: 'bool', nullable: true});
break;
case 'string':
fields.push({name: String(columnName), type: 'utf8', nullable: true});
break;
default:
schema[columnName] = {name: String(columnName), index: i, type: Array};
// We currently only handle numeric rows
// TODO we could offer a function to map strings to numbers?
log.warn(`CSV: Unknown column type: ${typeof value}`)();
fields.push({name: String(columnName), type: 'utf8', nullable: true});
}
}
return schema;
return {
fields,
metadata: {
'loaders.gl#format': 'csv',
'loaders.gl#loader': 'CSVLoader'
}
};
}

// TODO - remove
// type ObjectField = {name: string; index: number; type: any};
// type ObjectSchema = {[key: string]: ObjectField} | ObjectField[];

// function deduceObjectSchema(row, headerRow): ObjectSchema {
// const schema: ObjectSchema = headerRow ? {} : [];
// for (let i = 0; i < row.length; i++) {
// const columnName = (headerRow && headerRow[i]) || i;
// const value = row[i];
// switch (typeof value) {
// case 'number':
// case 'boolean':
// // TODO - booleans could be handled differently...
// schema[columnName] = {name: String(columnName), index: i, type: Float32Array};
// break;
// case 'string':
// default:
// schema[columnName] = {name: String(columnName), index: i, type: Array};
// // We currently only handle numeric rows
// // TODO we could offer a function to map strings to numbers?
// }
// }
// return schema;
// }
3 changes: 3 additions & 0 deletions modules/csv/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ export {CSVLoader} from './csv-loader';

export type {CSVWriterOptions} from './csv-writer';
export {CSVWriter} from './csv-writer';

export type {CSVArrowLoaderOptions} from './csv-arrow-loader';
export {CSVArrowLoader} from './csv-arrow-loader';
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import test from 'tape-promise/tape';
import {loadInBatches, isIterator, isAsyncIterable} from '@loaders.gl/core';
import {CSVLoader} from '../src/csv-loader'; // from '@loaders.gl/csv';
import {CSVArrowLoader} from '@loaders.gl/csv';
import * as arrow from 'apache-arrow';

// Small CSV Sample Files
const CSV_NUMBERS_100_URL = '@loaders.gl/csv/test/data/numbers-100.csv';
const CSV_NUMBERS_10000_URL = '@loaders.gl/csv/test/data/numbers-10000.csv';
const CSV_INCIDENTS_URL_QUOTES = '@loaders.gl/csv/test/data/sf_incidents-small.csv';

// TODO -restore
test.skip('CSVLoader#loadInBatches(numbers-100.csv, arrow)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_100_URL, CSVLoader, {
csv: {
shape: 'arrow-table'
},
test('CSVArrowLoader#loadInBatches(numbers-100.csv)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_100_URL, CSVArrowLoader, {
batchSize: 40
});

Expand All @@ -29,12 +30,8 @@ test.skip('CSVLoader#loadInBatches(numbers-100.csv, arrow)', async (t) => {
t.end();
});

// TODO - restore
test.skip('CSVLoader#loadInBatches(numbers-10000.csv, arrow)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_10000_URL, CSVLoader, {
csv: {
shape: 'arrow-table'
},
test('CSVArrowLoader#loadInBatches(numbers-10000.csv)', async (t) => {
const iterator = await loadInBatches(CSV_NUMBERS_10000_URL, CSVArrowLoader, {
batchSize: 2000
});
t.ok(isIterator(iterator) || isAsyncIterable(iterator), 'loadInBatches returned iterator');
Expand All @@ -49,3 +46,18 @@ test.skip('CSVLoader#loadInBatches(numbers-10000.csv, arrow)', async (t) => {

t.end();
});

test('CSVArrowLoader#loadInBatches(incidents.csv)', async (t) => {
const iterator = await loadInBatches(CSV_INCIDENTS_URL_QUOTES, CSVArrowLoader);
t.ok(isIterator(iterator) || isAsyncIterable(iterator), 'loadInBatches returned iterator');

let batchCount = 0;
for await (const batch of iterator) {
t.ok(batch.data instanceof arrow.Table, 'returns arrow RecordBatch');
// t.comment(`BATCH: ${batch.length}`);
batchCount++;
}
t.equal(batchCount, 1, 'Correct number of batches received');

t.end();
});
4 changes: 4 additions & 0 deletions modules/csv/test/csv-loader.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

import test from 'tape-promise/tape';
import {validateLoader} from 'test/common/conformance';

Expand Down
4 changes: 4 additions & 0 deletions modules/csv/test/csv-writer-papaparse.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// loaders.gl
// SPDX-License-Identifier: MIT
// Copyright (c) vis.gl contributors

// This is a fork of papaparse under MIT License
// https://github.com/mholt/PapaParse
/* eslint-disable */
Expand Down
4 changes: 2 additions & 2 deletions modules/csv/test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ import './papaparse/papaparse.spec';
// import './csv-writer-papaparse.spec';

import './csv-loader.spec';
import './csv-loader-arrow.spec';

import './csv-writer.spec';

import './csv-arrow-loader.spec';
42 changes: 31 additions & 11 deletions modules/schema-utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,33 @@ export {
} from './lib/schema/convert-arrow-schema';
export {getDataTypeFromArray} from './lib/schema/data-type';

// Table utils
// TABLE CATEGORY UTILS

export {deduceTableSchema} from './lib/schema/deduce-table-schema';
export {makeTableFromData} from './lib/table/tables/make-table';
export {makeTableFromBatches} from './lib/table/batches/make-table-from-batches';

export {convertTable} from './lib/table/tables/convert-table';
export {convertToObjectRow, convertToArrayRow} from './lib/table/tables/row-utils';
export {convertArrowToTable, convertTableToArrow} from './lib/table/tables/convert-arrow-table';

// TABLE CATEGORY UTILS
export {TableBatchBuilder} from './lib/table/batches/table-batch-builder';
export type {TableBatchAggregator} from './lib/table/batches/table-batch-aggregator';
export {RowTableBatchAggregator} from './lib/table/batches/row-table-batch-aggregator';
export {ColumnarTableBatchAggregator} from './lib/table/batches/columnar-table-batch-aggregator';
export {
makeTableBatchIterator,
makeBatchFromTable
} from './lib/table/batches/make-table-batch-iterator';
export {
makeArrowTableBatchIterator,
makeArrowRecordBatchIterator
} from './lib/table/batches/make-arrow-batch-iterator';
export {convertBatch, convertBatches} from './lib/table/batches/convert-batches';

export {
isArrayRowTable,
isObjectRowTable,
isColumnarTable,
isGeoJSONTable,
isArrowTable
} from './lib/table/tables/table-types';

export {
isTable,
Expand All @@ -46,11 +65,12 @@ export {
makeObjectRowIterator
} from './lib/table/tables/table-accessors';

export {makeTableFromData} from './lib/table/tables/make-table';
export {makeTableFromBatches, makeBatchFromTable} from './lib/table/tables/make-table-from-batches';
export {convertTable} from './lib/table/tables/convert-table';
export {deduceTableSchema} from './lib/schema/deduce-table-schema';
export {convertToObjectRow, convertToArrayRow} from './lib/table/tables/row-utils';
// Table batch builders

export {TableBatchBuilder} from './lib/table/batch-builder/table-batch-builder';
export type {TableBatchAggregator} from './lib/table/batch-builder/table-batch-aggregator';
export {RowTableBatchAggregator} from './lib/table/batch-builder/row-table-batch-aggregator';
export {ColumnarTableBatchAggregator} from './lib/table/batch-builder/columnar-table-batch-aggregator';

export {ArrowLikeTable} from './lib/table/arrow-api/arrow-like-table';

Expand Down
Loading
Loading