Skip to content

Commit

Permalink
add auto type casting
Browse files Browse the repository at this point in the history
  • Loading branch information
invisal committed Oct 28, 2024
1 parent 9fca126 commit 707dd84
Showing 1 changed file with 140 additions and 1 deletion.
141 changes: 140 additions & 1 deletion src/connections/bigquery.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { QueryType } from '../query-params';
import { Query } from '../query';
import { QueryResult } from './index';
import { ConnectionSelectOptions, QueryResult } from './index';
import { Database, Schema, Table, TableColumn } from '../models/database';
import { BigQueryDialect } from '../query-builder/dialects/bigquery';
import { BigQuery } from '@google-cloud/bigquery';
Expand All @@ -10,9 +10,18 @@ import {
} from './../utils/transformer';
import { SqlConnection } from './sql-base';

const NUMERIC_TYPE = [
'INT64',
'FLOAT64',
'INTEGER',
'FLOAT',
'NUMERIC',
'BIGNUMERIC',
];
export class BigQueryConnection extends SqlConnection {
bigQuery: BigQuery;
dialect = new BigQueryDialect();
cacheFields: Record<string, Record<string, string>> = {};

constructor(bigQuery: any) {
super();
Expand Down Expand Up @@ -41,6 +50,136 @@ export class BigQueryConnection extends SqlConnection {
return super.createTable(schemaName, tableName, tempColumns);
}

async getFields(
schemaName: string,
tableName: string
): Promise<Record<string, string>> {
if (this.cacheFields[schemaName]) return this.cacheFields[schemaName];

if (!schemaName)
throw new Error('Schema name is required for BigQuery');

const [metadata] = await this.bigQuery
.dataset(schemaName)
.table(tableName)
.getMetadata();

const fields: { name: string; type: string }[] = metadata.schema.fields;
const fieldsType: Record<string, string> = fields.reduce(
(acc, field) => {
acc[field.name] = field.type;
return acc;
},
{} as Record<string, string>
);

this.cacheFields[schemaName] = fieldsType;
return fieldsType;
}

transformTypedValue(type: string, value: unknown) {
if (value === null) return value;

if (NUMERIC_TYPE.includes(type)) {
return Number(value);
}

return value;
}

async autoCastingType(
schemaName: string | undefined,
tableName: string,
data: Record<string, unknown>
): Promise<Record<string, unknown>> {
const tmp = structuredClone(data);

if (!schemaName)
throw new Error('Schema name is required for BigQuery');

const fieldsType: Record<string, string> = await this.getFields(
schemaName,
tableName
);

for (const key in tmp) {
const type = fieldsType[key];
if (!type) continue;
tmp[key] = this.transformTypedValue(type, tmp[key]);
}

return tmp;
}

async insert(
schemaName: string | undefined,
tableName: string,
data: Record<string, unknown>
): Promise<QueryResult> {
return super.insert(
schemaName,
tableName,
await this.autoCastingType(schemaName, tableName, data)
);
}

async insertMany(
schemaName: string | undefined,
tableName: string,
data: Record<string, unknown>[]
): Promise<QueryResult> {
const newData: Record<string, unknown>[] = [];

for (const item of data) {
newData.push(
await this.autoCastingType(schemaName, tableName, item)
);
}

return super.insertMany(schemaName, tableName, newData);
}

async update(
schemaName: string | undefined,
tableName: string,
data: Record<string, unknown>,
where: Record<string, unknown>
): Promise<QueryResult> {
return super.update(
schemaName,
tableName,
await this.autoCastingType(schemaName, tableName, data),
await this.autoCastingType(schemaName, tableName, where)
);
}

async select(
schemaName: string,
tableName: string,
options: ConnectionSelectOptions
): Promise<QueryResult> {
// Auto casting the where
let where = options.where;

if (where && where.length > 0) {
const fields = await this.getFields(schemaName, tableName);
where = where.map((t) => {
const type = fields[t.name];
if (!type) return t;

return {
...t,
value: this.transformTypedValue(type, t.value),
};
});
}

return super.select(schemaName, tableName, {
...options,
where,
});
}

/**
* Triggers a query action on the current Connection object.
*
Expand Down

0 comments on commit 707dd84

Please sign in to comment.