[PECO-953] Optimize CloudFetchResultHandler memory consumption #204

Merged 15 commits on Dec 4, 2023
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@ node_modules
 .nyc_output
 coverage_e2e
 coverage_unit
+.clinic
 *.code-workspace
 dist
 *.DS_Store
1 change: 1 addition & 0 deletions .prettierignore
@@ -5,6 +5,7 @@ node_modules
 .nyc_output
 coverage_e2e
 coverage_unit
+.clinic
 
 dist
 thrift
13 changes: 11 additions & 2 deletions lib/DBSQLOperation/index.ts
@@ -23,6 +23,7 @@ import RowSetProvider from '../result/RowSetProvider';
 import JsonResultHandler from '../result/JsonResultHandler';
 import ArrowResultHandler from '../result/ArrowResultHandler';
 import CloudFetchResultHandler from '../result/CloudFetchResultHandler';
+import ArrowResultConverter from '../result/ArrowResultConverter';
 import ResultSlicer from '../result/ResultSlicer';
 import { definedOrError } from '../utils';
 import HiveDriverError from '../errors/HiveDriverError';
@@ -377,10 +378,18 @@ export default class DBSQLOperation implements IOperation {
         resultSource = new JsonResultHandler(this.context, this._data, metadata.schema);
         break;
       case TSparkRowSetType.ARROW_BASED_SET:
-        resultSource = new ArrowResultHandler(this.context, this._data, metadata.schema, metadata.arrowSchema);
+        resultSource = new ArrowResultConverter(
+          this.context,
+          new ArrowResultHandler(this.context, this._data, metadata.arrowSchema),
+          metadata.schema,
+        );
         break;
       case TSparkRowSetType.URL_BASED_SET:
-        resultSource = new CloudFetchResultHandler(this.context, this._data, metadata.schema);
+        resultSource = new ArrowResultConverter(
+          this.context,
+          new CloudFetchResultHandler(this.context, this._data),
+          metadata.schema,
+        );
         break;
       // no default
     }
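Worth noting for reviewers: both Arrow paths now share one pipeline shape — a source stage that yields raw Arrow IPC buffers, wrapped by a converter stage that turns them into rows. A minimal sketch of that composition, with simplified types and toy stand-in stages (not the driver's actual classes):

```typescript
// Simplified version of the driver's IResultsProvider contract (assumed shape).
interface IResultsProvider<T> {
  hasMore(): Promise<boolean>;
  fetchNext(options: { limit: number }): Promise<T>;
}

// Stage 1: hands out opaque chunks, as ArrowResultHandler does with IPC buffers.
class ChunkProvider implements IResultsProvider<Array<string>> {
  private chunks: Array<Array<string>> = [['a', 'b'], ['c']];

  async hasMore() {
    return this.chunks.length > 0;
  }

  async fetchNext(_options: { limit: number }) {
    return this.chunks.shift() ?? [];
  }
}

// Stage 2: converts one chunk at a time, as ArrowResultConverter does.
class RowConverter implements IResultsProvider<Array<{ value: string }>> {
  constructor(private readonly source: IResultsProvider<Array<string>>) {}

  async hasMore() {
    return this.source.hasMore();
  }

  async fetchNext(options: { limit: number }) {
    const chunk = await this.source.fetchNext(options);
    return chunk.map((value) => ({ value }));
  }
}

// The switch above builds exactly this shape:
const resultSource = new RowConverter(new ChunkProvider());
```

Because conversion happens per fetched chunk, no stage ever has to hold the whole result set in converted form.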
182 changes: 182 additions & 0 deletions lib/result/ArrowResultConverter.ts
@@ -0,0 +1,182 @@
import { Buffer } from 'buffer';
import {
  Table,
  Schema,
  Field,
  TypeMap,
  DataType,
  Type,
  StructRow,
  MapRow,
  Vector,
  RecordBatch,
  RecordBatchReader,
  util as arrowUtils,
} from 'apache-arrow';
import { TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { getSchemaColumns, convertThriftValue } from './utils';

const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;

type ArrowSchema = Schema<TypeMap>;
type ArrowSchemaField = Field<DataType<Type, TypeMap>>;

export default class ArrowResultConverter implements IResultsProvider<Array<any>> {
  protected readonly context: IClientContext;

  private readonly source: IResultsProvider<Array<Buffer>>;

  private readonly schema: Array<TColumnDesc>;

  private reader?: IterableIterator<RecordBatch<TypeMap>>;

  private pendingRecordBatch?: RecordBatch<TypeMap>;

  constructor(context: IClientContext, source: IResultsProvider<Array<Buffer>>, schema?: TTableSchema) {
    this.context = context;
    this.source = source;
    this.schema = getSchemaColumns(schema);
  }

  public async hasMore() {
    if (this.schema.length === 0) {
      return false;
    }
    if (this.pendingRecordBatch) {
      return true;
    }
    return this.source.hasMore();
  }

  public async fetchNext(options: ResultsProviderFetchNextOptions) {
    if (this.schema.length === 0) {
      return [];
    }

    // eslint-disable-next-line no-constant-condition
    while (true) {
      // It's not possible to know whether an iterator has more items until
      // trying to get the next item, but we need to know that right after
      // getting an item. Therefore, after creating the iterator, we immediately
      // fetch one item and store it in `pendingRecordBatch`. On each call we
      // return the stored item and prefetch the next one. The prefetched item
      // is the next item we are going to return, so it also tells us whether
      // we can return anything next time.
      const recordBatch = this.pendingRecordBatch;
      this.pendingRecordBatch = this.prefetch();

      if (recordBatch) {
        const table = new Table(recordBatch);
        return this.getRows(table.schema, table.toArray());
      }

      // eslint-disable-next-line no-await-in-loop
      const batches = await this.source.fetchNext(options);
      if (batches.length === 0) {
        this.reader = undefined;
        break;
      }

      const reader = RecordBatchReader.from<TypeMap>(batches);
      this.reader = reader[Symbol.iterator]();
      this.pendingRecordBatch = this.prefetch();
    }

    return [];
  }

  private prefetch(): RecordBatch<TypeMap> | undefined {
    const item = this.reader?.next() ?? { done: true, value: undefined };

    if (item.done || item.value === undefined) {
      this.reader = undefined;
      return undefined;
    }

    return item.value;
  }

  private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {
    return rows.map((row) => {
      // First, convert native Arrow values to the corresponding plain JS objects
      const record = this.convertArrowTypes(row, undefined, schema.fields);
      // Second, cast all the values to the original Thrift types
      return this.convertThriftTypes(record);
    });
  }

  private convertArrowTypes(value: any, valueType: DataType | undefined, fields: Array<ArrowSchemaField> = []): any {
    if (value === null) {
      return value;
    }

    const fieldsMap: Record<string, ArrowSchemaField> = {};
    for (const field of fields) {
      fieldsMap[field.name] = field;
    }

    // Convert structures to plain JS objects and process all their fields recursively
    if (value instanceof StructRow) {
      const result = value.toJSON();
      for (const key of Object.keys(result)) {
        const field: ArrowSchemaField | undefined = fieldsMap[key];
        result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
      }
      return result;
    }
    if (value instanceof MapRow) {
      const result = value.toJSON();
      // A Map type consists of a key type and a value type. We need only the value type here; the key is cast to string anyway
      const field = fieldsMap.entries?.type.children.find((item) => item.name === 'value');
      for (const key of Object.keys(result)) {
        result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
      }
      return result;
    }

    // Convert lists to JS arrays and process items recursively
    if (value instanceof Vector) {
      const result = value.toJSON();
      // A list type contains a single child which defines the type of each array element
      const field = fieldsMap.element;
      return result.map((item) => this.convertArrowTypes(item, field?.type, field?.type.children || []));
    }

    if (DataType.isTimestamp(valueType)) {
      return new Date(value);
    }

    // Convert big number values to BigInt
    // Decimals are also represented as big numbers in Arrow, so additionally process them (convert to float)
    if (value instanceof Object && value[isArrowBigNumSymbol]) {
      const result = bigNumToBigInt(value);
      if (DataType.isDecimal(valueType)) {
        return Number(result) / 10 ** valueType.scale;
      }
      return result;
    }

    // Convert binary data to Buffer
    if (value instanceof Uint8Array) {
      return Buffer.from(value);
    }

    // Return other values as-is
    return typeof value === 'bigint' ? Number(value) : value;
  }

  private convertThriftTypes(record: Record<string, any>): any {
    const result: Record<string, any> = {};

    this.schema.forEach((column) => {
      const typeDescriptor = column.typeDesc.types[0]?.primitiveEntry;
      const field = column.columnName;
      const value = record[field];
      result[field] = value === null ? null : convertThriftValue(typeDescriptor, value);
    });

    return result;
  }
}
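The `pendingRecordBatch` trick above is a general one-item-lookahead pattern: a JS iterator cannot tell you whether another item exists without consuming it, so the converter always holds the next batch in reserve. An isolated sketch of the same pattern over a plain number iterator (`LookaheadIterator` is an illustrative name, not driver code):

```typescript
class LookaheadIterator {
  private iterator?: Iterator<number>;

  private pending?: number;

  constructor(iterator: Iterator<number>) {
    this.iterator = iterator;
    // Prefetch one item so hasMore() can answer without consuming anything extra.
    this.pending = this.prefetch();
  }

  public hasMore(): boolean {
    return this.pending !== undefined;
  }

  // Hand out the buffered item and immediately buffer its successor.
  public next(): number | undefined {
    const current = this.pending;
    this.pending = this.prefetch();
    return current;
  }

  private prefetch(): number | undefined {
    const item = this.iterator?.next() ?? { done: true, value: undefined };
    if (item.done || item.value === undefined) {
      this.iterator = undefined; // exhausted — drop the reference
      return undefined;
    }
    return item.value;
  }
}

// Usage: hasMore() is accurate even before the first next() call.
const it = new LookaheadIterator([1, 2, 3][Symbol.iterator]());
while (it.hasMore()) {
  console.log(it.next()); // 1, 2, 3
}
```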
138 changes: 13 additions & 125 deletions lib/result/ArrowResultHandler.ts
@@ -1,158 +1,46 @@
 import { Buffer } from 'buffer';
-import {
-  tableFromIPC,
-  Schema,
-  Field,
-  TypeMap,
-  DataType,
-  Type,
-  StructRow,
-  MapRow,
-  Vector,
-  util as arrowUtils,
-} from 'apache-arrow';
-import { TRowSet, TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types';
+import { TRowSet } from '../../thrift/TCLIService_types';
 import IClientContext from '../contracts/IClientContext';
 import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
-import { getSchemaColumns, convertThriftValue } from './utils';
 
-const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;
-
-type ArrowSchema = Schema<TypeMap>;
-type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
-
-export default class ArrowResultHandler implements IResultsProvider<Array<any>> {
+export default class ArrowResultHandler implements IResultsProvider<Array<Buffer>> {
   protected readonly context: IClientContext;
 
   private readonly source: IResultsProvider<TRowSet | undefined>;
 
-  private readonly schema: Array<TColumnDesc>;
-
   private readonly arrowSchema?: Buffer;
 
-  constructor(
-    context: IClientContext,
-    source: IResultsProvider<TRowSet | undefined>,
-    schema?: TTableSchema,
-    arrowSchema?: Buffer,
-  ) {
+  constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, arrowSchema?: Buffer) {
    this.context = context;
     this.source = source;
-    this.schema = getSchemaColumns(schema);
     this.arrowSchema = arrowSchema;
   }
 
   public async hasMore() {
     if (!this.arrowSchema) {
       return false;
     }
     return this.source.hasMore();
   }
 
   public async fetchNext(options: ResultsProviderFetchNextOptions) {
-    if (this.schema.length === 0 || !this.arrowSchema) {
-      return [];
-    }
-
-    const data = await this.source.fetchNext(options);
-
-    const batches = await this.getBatches(data);
-    if (batches.length === 0) {
-      return [];
-    }
+    if (!this.arrowSchema) {
+      return [];
+    }
 
-    const table = tableFromIPC<TypeMap>([this.arrowSchema, ...batches]);
-    return this.getRows(table.schema, table.toArray());
-  }
-
-  protected async getBatches(rowSet?: TRowSet): Promise<Array<Buffer>> {
-    const result: Array<Buffer> = [];
-
+    const rowSet = await this.source.fetchNext(options);
+
+    const batches: Array<Buffer> = [];
     rowSet?.arrowBatches?.forEach((arrowBatch) => {
       if (arrowBatch.batch) {
-        result.push(arrowBatch.batch);
+        batches.push(arrowBatch.batch);
       }
     });
 
-    return result;
-  }
-
-  private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {
-    return rows.map((row) => {
-      // First, convert native Arrow values to corresponding plain JS objects
-      const record = this.convertArrowTypes(row, undefined, schema.fields);
-      // Second, cast all the values to original Thrift types
-      return this.convertThriftTypes(record);
-    });
-  }
-
-  private convertArrowTypes(value: any, valueType: DataType | undefined, fields: Array<ArrowSchemaField> = []): any {
-    if (value === null) {
-      return value;
-    }
-
-    const fieldsMap: Record<string, ArrowSchemaField> = {};
-    for (const field of fields) {
-      fieldsMap[field.name] = field;
-    }
-
-    // Convert structures to plain JS object and process all its fields recursively
-    if (value instanceof StructRow) {
-      const result = value.toJSON();
-      for (const key of Object.keys(result)) {
-        const field: ArrowSchemaField | undefined = fieldsMap[key];
-        result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
-      }
-      return result;
-    }
-    if (value instanceof MapRow) {
-      const result = value.toJSON();
-      // Map type consists of its key and value types. We need only value type here, key will be cast to string anyway
-      const field = fieldsMap.entries?.type.children.find((item) => item.name === 'value');
-      for (const key of Object.keys(result)) {
-        result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
-      }
-      return result;
-    }
-
-    // Convert lists to JS array and process items recursively
-    if (value instanceof Vector) {
-      const result = value.toJSON();
-      // Array type contains the only child which defines a type of each array's element
-      const field = fieldsMap.element;
-      return result.map((item) => this.convertArrowTypes(item, field?.type, field?.type.children || []));
-    }
-
-    if (DataType.isTimestamp(valueType)) {
-      return new Date(value);
-    }
-
-    // Convert big number values to BigInt
-    // Decimals are also represented as big numbers in Arrow, so additionally process them (convert to float)
-    if (value instanceof Object && value[isArrowBigNumSymbol]) {
-      const result = bigNumToBigInt(value);
-      if (DataType.isDecimal(valueType)) {
-        return Number(result) / 10 ** valueType.scale;
-      }
-      return result;
-    }
-
-    // Convert binary data to Buffer
-    if (value instanceof Uint8Array) {
-      return Buffer.from(value);
-    }
-
-    // Return other values as is
-    return typeof value === 'bigint' ? Number(value) : value;
-  }
-
-  private convertThriftTypes(record: Record<string, any>): any {
-    const result: Record<string, any> = {};
-
-    this.schema.forEach((column) => {
-      const typeDescriptor = column.typeDesc.types[0]?.primitiveEntry;
-      const field = column.columnName;
-      const value = record[field];
-      result[field] = value === null ? null : convertThriftValue(typeDescriptor, value);
-    });
+    if (batches.length === 0) {
+      return [];
+    }
 
-    return result;
+    return [this.arrowSchema, ...batches];
   }
 }
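Taken together with ArrowResultConverter, this is where the memory saving comes from, as I read it: previously the handler fed all fetched batches to tableFromIPC and materialized every row at once, whereas now raw IPC buffers flow downstream and only one RecordBatch is turned into JS rows per fetchNext call. A standalone sketch of the two approaches against apache-arrow, using toy data in place of server responses:

```typescript
import { RecordBatchReader, Table, tableFromArrays, tableFromIPC, tableToIPC } from 'apache-arrow';

// Toy IPC buffer standing in for downloaded Arrow data.
const ipc = tableToIPC(tableFromArrays({ id: [1, 2, 3, 4] }));

// Before: the whole stream materialized into rows in one step.
const allRows = tableFromIPC(ipc).toArray();
console.log(allRows.length); // 4

// After: iterate the same stream batch by batch; only the current
// RecordBatch is ever converted into JS rows at a time.
for (const batch of RecordBatchReader.from(ipc)) {
  const rows = new Table(batch).toArray();
  console.log(rows.length);
}
```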