Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Max reader algo #1352

Merged
merged 17 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion asset/asset.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "elasticsearch",
"version": "4.1.0",
"version": "4.2.0",
"minimum_teraslice_version": "2.0.0"
}
4 changes: 2 additions & 2 deletions asset/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "asset",
"displayName": "Asset",
"version": "4.1.0",
"version": "4.2.0",
"private": true,
"description": "",
"license": "MIT",
Expand All @@ -20,7 +20,7 @@
"dependencies": {
"@terascope/data-mate": "~1.7.4",
"@terascope/elasticsearch-api": "~4.8.1",
"@terascope/elasticsearch-asset-apis": "~1.1.0",
"@terascope/elasticsearch-asset-apis": "~1.2.0",
"@terascope/job-components": "~1.9.3",
"@terascope/teraslice-state-storage": "~1.8.1",
"@terascope/utils": "~1.7.3",
Expand Down
1 change: 1 addition & 0 deletions asset/src/__lib/ReaderAPIFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export class ReaderAPIFetcher extends Fetcher<ESDateConfig> {
await super.initialize();

const { context, api, opConfig } = this;

if (isPromAvailable(context)) {
await this.context.apis.foundation.promMetrics.addGauge(
'elasticsearch_records_read',
Expand Down
2 changes: 1 addition & 1 deletion asset/src/elasticsearch_bulk/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { OpConfig } from '@terascope/job-components';
import { ElasticsearchAPISenderConfig } from '../elasticsearch_sender_api/interfaces.js';

export interface ElasticsearchBulkConfig extends ElasticsearchAPISenderConfig, OpConfig {
export interface ElasticsearchBulkConfig extends ElasticsearchAPISenderConfig, Omit<OpConfig, 'connection'> {
api_name: string;
connection: string;
}
10 changes: 10 additions & 0 deletions asset/src/elasticsearch_reader_api/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ export const schema = {
doc: 'Use the original fetch algorithm, that sets query size to windowSize without extra retry logic',
default: false,
format: Boolean
},
total_optimization: {
doc: 'Setting to true will disable tracking total hits for fetches, and only track up to the config size for counts, but will track totals if recurse_optimization is set to true for slicers.',
default: false,
format: Boolean
},
recurse_optimization: {
doc: 'Setting to true will change the recursive behavior of slicers to better match ratios of counts and size instead of just splitting in half or into individual keys',
default: false,
format: Boolean
}
};

Expand Down
2 changes: 2 additions & 0 deletions asset/src/id_reader/slicer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export default class ESIDSlicer extends ParallelSlicer<ESIDReaderConfig> {
this.api = await apiManager.create(apiName, {});

const apiConfig = apiManager.getConfig(apiName);

if (!apiConfig) throw new Error(`Could not find api config for api_name ${apiName}`);
this.config = apiConfig;

Expand All @@ -30,6 +31,7 @@ export default class ESIDSlicer extends ParallelSlicer<ESIDReaderConfig> {
this.slicerRanges = await this.api.makeIDSlicerRanges({
numOfSlicers: this.executionConfig.slicers,
});

await super.initialize(recoveryData);
}

Expand Down
17 changes: 8 additions & 9 deletions asset/src/spaces_reader_api/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,15 @@ const apiSchema = {
default: undefined,
format: String
},
includeTotals: {
doc: 'By default, data fetching is optimized by disabling total count calculation to achieve '
+ 'faster query execution. If you require total counts in your queries set this value to true '
+ `Some endpoints support setting to a fixed integer to limit the count up to that number then `
+ `stop... set to 'number' to count up to the query or slice size then stop. `,
total_optimization: {
doc: 'Setting to true will optimize tracking total hits by turning off for fetches, and only track up to the config size for counts.',
default: true,
format: Boolean
},
recurse_optimization: {
doc: 'Setting to true will change the recursive behavior of slicers to better match ratios of counts and size instead of just splitting in half or into individual keys',
default: false,
format(val: unknown) {
if (val === 'number' || typeof val !== 'number') return;
throw new Error(`Invalid parameter includeTotals, must be a boolean or string 'number', got ${getTypeOf(val)}`);
}
format: Boolean
}
};

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "elasticsearch-assets",
"displayName": "Elasticsearch Assets",
"version": "4.1.0",
"version": "4.2.0",
"private": true,
"description": "bundle of processors for teraslice",
"homepage": "https://github.com/terascope/elasticsearch-assets#readme",
Expand Down Expand Up @@ -46,7 +46,7 @@
"devDependencies": {
"@terascope/data-types": "~1.7.3",
"@terascope/elasticsearch-api": "~4.8.1",
"@terascope/elasticsearch-asset-apis": "~1.1.0",
"@terascope/elasticsearch-asset-apis": "~1.2.0",
"@terascope/eslint-config": "~1.1.5",
"@terascope/job-components": "~1.9.3",
"@terascope/scripts": "~1.10.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/elasticsearch-asset-apis/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/elasticsearch-asset-apis",
"displayName": "Elasticsearch Asset Apis",
"version": "1.1.0",
"version": "1.2.0",
"description": "Elasticsearch reader and sender apis",
"homepage": "https://github.com/terascope/elasticsearch-assets",
"repository": "git@github.com:terascope/elasticsearch-assets.git",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
dateOptions, processInterval, dateFormat,
dateFormatSeconds, parseDate, delayedStreamSegment,
determineIDSlicerRanges, determineDateSlicerRanges,
idSlicerOptimized
} from './algorithms/index.js';
import {
ESReaderOptions, DateSegments, InputDateSegments,
Expand Down Expand Up @@ -82,6 +83,7 @@ export class ElasticsearchReaderAPI {

async count(queryParams: ReaderSlice = {}): Promise<number> {
const query = buildQuery(this.config, { ...queryParams, count: 0 });
// TODO: change this to search probably
return this.client.count(query as any);
}

Expand Down Expand Up @@ -407,7 +409,8 @@ export class ElasticsearchReaderAPI {
baseKeyArray,
startingKeyDepth: this.config.starting_key_depth,
countFn: this.count,
size
size,
keyType: this.config.key_type
};

if (recoveryData && recoveryData.length > 0) {
Expand All @@ -426,6 +429,10 @@ export class ElasticsearchReaderAPI {
slicerConfig.retryData = parsedRetry[slicerID];
}

if (this.config.recurse_optimization) {
return idSlicerOptimized(slicerConfig);
}

return idSlicer(slicerConfig);
}

Expand Down Expand Up @@ -595,7 +602,8 @@ export class ElasticsearchReaderAPI {
subslice_key_threshold: subsliceKeyThreshold,
key_type: keyType,
id_field_name: idFieldName,
starting_key_depth: startingKeyDepth
starting_key_depth: startingKeyDepth,
recurse_optimization = false
} = this.config;

if (!this.windowSize) await this.setWindowSize();
Expand All @@ -613,7 +621,8 @@ export class ElasticsearchReaderAPI {
subsliceKeyThreshold,
keyType,
idFieldName,
startingKeyDepth
startingKeyDepth,
recurse_optimization
};

if (isPersistent) {
Expand Down Expand Up @@ -668,7 +677,7 @@ export class ElasticsearchReaderAPI {
if (date) return parseDate(date);

// we are in auto, so we determine each part
const query: AnyObject = {
const query: ClientParams.SearchParams = {
index: this.config.index,
size: 1,
body: {
Expand All @@ -677,7 +686,8 @@ export class ElasticsearchReaderAPI {
order: order === 'start' ? 'asc' : 'desc'
}
}]
}
},
track_total_hits: false
};

if (this.config.query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import tls from 'tls';
import {
Logger, TSError, get, isNil,
AnyObject, withoutNil, DataEntity,
isBoolean, isKey,
isKey,
} from '@terascope/utils';
import { ClientParams, ClientResponse } from '@terascope/types';
import { DataTypeConfig } from '@terascope/data-types';
Expand Down Expand Up @@ -148,7 +148,7 @@ export class SpacesReaderClient implements ReaderClient {
const fieldsQuery = fields ? { fields: fields.join(',') } : {};
const mustQuery = get(queryConfig, 'body.query.bool.must', null);

function parseQueryConfig(mustArray: null | any[], trackTotalHits?: any): AnyObject {
function parseQueryConfig(mustArray: null | any[]): AnyObject {
const queryOptions: Record<string, (op: any) => string> = {
query_string: _parseEsQ,
range: _parseDate,
Expand Down Expand Up @@ -194,7 +194,7 @@ export class SpacesReaderClient implements ReaderClient {
token: config.token,
q: luceneQuery,
size,
track_total_hits: trackTotalHits
track_total_hits: queryConfig.track_total_hits
});
}

Expand Down Expand Up @@ -258,28 +258,16 @@ export class SpacesReaderClient implements ReaderClient {
return `(${terms.join(' OR ')})`;
}

let trackTotalHits: boolean | number = false;

if (isBoolean(config.includeTotals)) {
trackTotalHits = config.includeTotals;
}
if (config.includeTotals === 'number') {
trackTotalHits = size + 1;
}
if (size === 0) {
// in case client uses a search API instead of count API
trackTotalHits = true;
}

return parseQueryConfig(mustQuery, trackTotalHits);
return parseQueryConfig(mustQuery);
}

async getDataType(): Promise<DataTypeConfig> {
const query = {
token: this.config.token,
q: '_exists_:_key',
size: 0,
include_type_config: true
include_type_config: true,
track_total_hits: false
};

const spaceResults = await this.makeRequest(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,25 @@ export async function determineDateSlicerRanges(
determineDateSlicerRange(config, id)
)));
}

export function splitTime(
start: moment.Moment,
end: moment.Moment,
limit: moment.Moment,
timeResolution: string,
ratio: number
) {
let diff = Math.floor(end.diff(start) * ratio);

if (moment.utc(start).add(diff, 'ms')
.isAfter(limit)) {
diff = moment.utc(limit).diff(start);
}

if (timeResolution === 'ms') {
return diff;
}

const secondDiff = Math.floor(diff / 1000);
return secondDiff;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
dateFormatSeconds,
dateOptions,
determineDateSlicerRange,
splitTime
} from './date-helpers.js';
import { getKeyArray } from './id-helpers.js';

Expand All @@ -36,27 +37,6 @@ interface DateParams {
size: number;
}

function splitTime(
start: moment.Moment,
end: moment.Moment,
limit: moment.Moment,
timeResolution: string
) {
let diff = Math.floor(end.diff(start) / 2);

if (moment.utc(start).add(diff, 'ms')
.isAfter(limit)) {
diff = moment.utc(limit).diff(start);
}

if (timeResolution === 'ms') {
return diff;
}

const secondDiff = Math.floor(diff / 1000);
return secondDiff;
}

export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
const {
events,
Expand All @@ -75,7 +55,8 @@ export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
subsliceKeyThreshold,
idFieldName = null,
startingKeyDepth = 0,
keyType = IDType.base64url
keyType = IDType.base64url,
recurse_optimization
} = args;

if (!args.interval) {
Expand Down Expand Up @@ -123,12 +104,16 @@ export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
}

if (count > size) {
// old way splits in half, new way create a ratio to try to get closer
// and minimize the amount of slicing
const ratio = recurse_optimization ? size / count : 0.5;

// if size is to big after increasing slice, use alternative division behavior
if (isExpandedSlice) {
// recurse down to the appropriate size
const newStart = moment.utc(dateParams.prevEnd);
// get diff from new start
const diff = splitTime(newStart, end, limit, timeResolution);
const diff = splitTime(newStart, end, limit, timeResolution, ratio);
const newEnd = moment.utc(newStart).add(diff, timeResolution);

if (!newEnd.isValid()) {
Expand Down Expand Up @@ -159,7 +144,7 @@ export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
}

// find difference in milliseconds and divide in half
const diff = splitTime(start, end, limit, timeResolution);
const diff = splitTime(start, end, limit, timeResolution, ratio);
const newEnd = moment.utc(start).add(diff, timeResolution);
// prevent recursive call if difference is one millisecond
if (diff <= 0) {
Expand Down Expand Up @@ -188,6 +173,7 @@ export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
dateParams.prevEnd = moment.utc(end);

let newEnd = moment.utc(dateParams.end).add(step, unit);

if (newEnd.isSameOrAfter(dateParams.limit)) {
// set to limit
makeLimitQuery = true;
Expand Down Expand Up @@ -261,7 +247,8 @@ export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
baseKeyArray: keyArray,
countFn,
size: querySize,
startingKeyDepth
startingKeyDepth,
keyType
};

const idSlicers = idSlicer(idSlicerArs);
Expand Down
Loading