Skip to content

Commit

Permalink
Max reader algo (#1352)
Browse files Browse the repository at this point in the history
For date and id slicer/reader
two new feature flags for experimentation:

- `total_optimization`: defaults to false. If set to true, it will
change the track_total_hits parameter inside the query so that it does
not find the exact total count for a given query. This is useful for
dealing with slices and reads of significant data density. When this is
set to true, you gain very little benefit from the `recurse_optimization`
flag, so it is advised not to use them both together.
- `recurse_optimization`: defaults to false. If set to true, it attempts
to lower the number of times it has to recount a query to make a slice
by making smarter chunk sizes based on the total count instead of using
its simple splitting logic. It is only really useful if
`total_optimization` is set to false.

For spaces slicer/reader
The two new feature flags for experimentation are the same, except that
`total_optimization` defaults to true, because the spaces reader already
skips exact total counts by default.
  • Loading branch information
jsnoble authored Feb 6, 2025
1 parent 185e2d5 commit 4570745
Show file tree
Hide file tree
Showing 34 changed files with 1,494 additions and 1,114 deletions.
2 changes: 1 addition & 1 deletion asset/asset.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "elasticsearch",
"version": "4.1.0",
"version": "4.2.0",
"minimum_teraslice_version": "2.0.0"
}
4 changes: 2 additions & 2 deletions asset/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "asset",
"displayName": "Asset",
"version": "4.1.0",
"version": "4.2.0",
"private": true,
"description": "",
"license": "MIT",
Expand All @@ -20,7 +20,7 @@
"dependencies": {
"@terascope/data-mate": "~1.7.4",
"@terascope/elasticsearch-api": "~4.8.1",
"@terascope/elasticsearch-asset-apis": "~1.1.0",
"@terascope/elasticsearch-asset-apis": "~1.2.0",
"@terascope/job-components": "~1.9.3",
"@terascope/teraslice-state-storage": "~1.8.1",
"@terascope/utils": "~1.7.3",
Expand Down
1 change: 1 addition & 0 deletions asset/src/__lib/ReaderAPIFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export class ReaderAPIFetcher extends Fetcher<ESDateConfig> {
await super.initialize();

const { context, api, opConfig } = this;

if (isPromAvailable(context)) {
await this.context.apis.foundation.promMetrics.addGauge(
'elasticsearch_records_read',
Expand Down
2 changes: 1 addition & 1 deletion asset/src/elasticsearch_bulk/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { OpConfig } from '@terascope/job-components';
import { ElasticsearchAPISenderConfig } from '../elasticsearch_sender_api/interfaces.js';

export interface ElasticsearchBulkConfig extends ElasticsearchAPISenderConfig, OpConfig {
/**
 * Configuration shape for the elasticsearch_bulk operation.
 *
 * Combines the sender API settings with the standard operation config, leaving
 * out OpConfig's own `connection` member so it can be redeclared below.
 */
export interface ElasticsearchBulkConfig extends ElasticsearchAPISenderConfig, Omit<OpConfig, 'connection'> {
// name of the API this operation refers to (presumably the sender API — confirm against usage)
api_name: string;
// connection identifier, declared here as a plain string
connection: string;
}
10 changes: 10 additions & 0 deletions asset/src/elasticsearch_reader_api/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ export const schema = {
doc: 'Use the original fetch algorithm, that sets query size to windowSize without extra retry logic',
default: false,
format: Boolean
},
total_optimization: {
doc: 'Setting to true will disable tracking total hits for fetches, and only track up to the config size for counts, but will track totals if recurse_optimization is set to true for slicers.',
default: false,
format: Boolean
},
recurse_optimization: {
doc: 'Setting to true will change the recursive behavior of slicers to better match ratios of counts and size instead of just splitting in half or into individual keys',
default: false,
format: Boolean
}
};

Expand Down
2 changes: 2 additions & 0 deletions asset/src/id_reader/slicer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export default class ESIDSlicer extends ParallelSlicer<ESIDReaderConfig> {
this.api = await apiManager.create(apiName, {});

const apiConfig = apiManager.getConfig(apiName);

if (!apiConfig) throw new Error(`Could not find api config for api_name ${apiName}`);
this.config = apiConfig;

Expand All @@ -30,6 +31,7 @@ export default class ESIDSlicer extends ParallelSlicer<ESIDReaderConfig> {
this.slicerRanges = await this.api.makeIDSlicerRanges({
numOfSlicers: this.executionConfig.slicers,
});

await super.initialize(recoveryData);
}

Expand Down
17 changes: 8 additions & 9 deletions asset/src/spaces_reader_api/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,15 @@ const apiSchema = {
default: undefined,
format: String
},
includeTotals: {
doc: 'By default, data fetching is optimized by disabling total count calculation to achieve '
+ 'faster query execution. If you require total counts in your queries set this value to true '
+ `Some endpoints support setting to a fixed integer to limit the count up to that number then `
+ `stop... set to 'number' to count up to the query or slice size then stop. `,
total_optimization: {
doc: 'Setting to true will optimize tracking total hits by turning off for fetches, and only track up to the config size for counts.',
default: true,
format: Boolean
},
recurse_optimization: {
doc: 'Setting to true will change the recursive behavior of slicers to better match ratios of counts and size instead of just splitting in half or into individual keys',
default: false,
format(val: unknown) {
if (val === 'number' || typeof val !== 'number') return;
throw new Error(`Invalid parameter includeTotals, must be a boolean or string 'number', got ${getTypeOf(val)}`);
}
format: Boolean
}
};

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "elasticsearch-assets",
"displayName": "Elasticsearch Assets",
"version": "4.1.0",
"version": "4.2.0",
"private": true,
"description": "bundle of processors for teraslice",
"homepage": "https://github.com/terascope/elasticsearch-assets#readme",
Expand Down Expand Up @@ -46,7 +46,7 @@
"devDependencies": {
"@terascope/data-types": "~1.7.3",
"@terascope/elasticsearch-api": "~4.8.1",
"@terascope/elasticsearch-asset-apis": "~1.1.0",
"@terascope/elasticsearch-asset-apis": "~1.2.0",
"@terascope/eslint-config": "~1.1.5",
"@terascope/job-components": "~1.9.3",
"@terascope/scripts": "~1.10.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/elasticsearch-asset-apis/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/elasticsearch-asset-apis",
"displayName": "Elasticsearch Asset Apis",
"version": "1.1.0",
"version": "1.2.0",
"description": "Elasticsearch reader and sender apis",
"homepage": "https://github.com/terascope/elasticsearch-assets",
"repository": "git@github.com:terascope/elasticsearch-assets.git",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
dateOptions, processInterval, dateFormat,
dateFormatSeconds, parseDate, delayedStreamSegment,
determineIDSlicerRanges, determineDateSlicerRanges,
idSlicerOptimized
} from './algorithms/index.js';
import {
ESReaderOptions, DateSegments, InputDateSegments,
Expand Down Expand Up @@ -82,6 +83,7 @@ export class ElasticsearchReaderAPI {

/**
 * Returns the client's count for the given slice parameters.
 *
 * The query is built from the reader config and a copy of `queryParams`
 * whose `count` field is forced to 0.
 */
async count(queryParams: ReaderSlice = {}): Promise<number> {
    const countParams = { ...queryParams, count: 0 };
    const countQuery = buildQuery(this.config, countParams);
    // TODO: change this to search probably
    return this.client.count(countQuery as any);
}

Expand Down Expand Up @@ -407,7 +409,8 @@ export class ElasticsearchReaderAPI {
baseKeyArray,
startingKeyDepth: this.config.starting_key_depth,
countFn: this.count,
size
size,
keyType: this.config.key_type
};

if (recoveryData && recoveryData.length > 0) {
Expand All @@ -426,6 +429,10 @@ export class ElasticsearchReaderAPI {
slicerConfig.retryData = parsedRetry[slicerID];
}

if (this.config.recurse_optimization) {
return idSlicerOptimized(slicerConfig);
}

return idSlicer(slicerConfig);
}

Expand Down Expand Up @@ -595,7 +602,8 @@ export class ElasticsearchReaderAPI {
subslice_key_threshold: subsliceKeyThreshold,
key_type: keyType,
id_field_name: idFieldName,
starting_key_depth: startingKeyDepth
starting_key_depth: startingKeyDepth,
recurse_optimization = false
} = this.config;

if (!this.windowSize) await this.setWindowSize();
Expand All @@ -613,7 +621,8 @@ export class ElasticsearchReaderAPI {
subsliceKeyThreshold,
keyType,
idFieldName,
startingKeyDepth
startingKeyDepth,
recurse_optimization
};

if (isPersistent) {
Expand Down Expand Up @@ -668,7 +677,7 @@ export class ElasticsearchReaderAPI {
if (date) return parseDate(date);

// we are in auto, so we determine each part
const query: AnyObject = {
const query: ClientParams.SearchParams = {
index: this.config.index,
size: 1,
body: {
Expand All @@ -677,7 +686,8 @@ export class ElasticsearchReaderAPI {
order: order === 'start' ? 'asc' : 'desc'
}
}]
}
},
track_total_hits: false
};

if (this.config.query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import tls from 'tls';
import {
Logger, TSError, get, isNil,
AnyObject, withoutNil, DataEntity,
isBoolean, isKey,
isKey,
} from '@terascope/utils';
import { ClientParams, ClientResponse } from '@terascope/types';
import { DataTypeConfig } from '@terascope/data-types';
Expand Down Expand Up @@ -148,7 +148,7 @@ export class SpacesReaderClient implements ReaderClient {
const fieldsQuery = fields ? { fields: fields.join(',') } : {};
const mustQuery = get(queryConfig, 'body.query.bool.must', null);

function parseQueryConfig(mustArray: null | any[], trackTotalHits?: any): AnyObject {
function parseQueryConfig(mustArray: null | any[]): AnyObject {
const queryOptions: Record<string, (op: any) => string> = {
query_string: _parseEsQ,
range: _parseDate,
Expand Down Expand Up @@ -194,7 +194,7 @@ export class SpacesReaderClient implements ReaderClient {
token: config.token,
q: luceneQuery,
size,
track_total_hits: trackTotalHits
track_total_hits: queryConfig.track_total_hits
});
}

Expand Down Expand Up @@ -258,28 +258,16 @@ export class SpacesReaderClient implements ReaderClient {
return `(${terms.join(' OR ')})`;
}

let trackTotalHits: boolean | number = false;

if (isBoolean(config.includeTotals)) {
trackTotalHits = config.includeTotals;
}
if (config.includeTotals === 'number') {
trackTotalHits = size + 1;
}
if (size === 0) {
// in case client uses a search API instead of count API
trackTotalHits = true;
}

return parseQueryConfig(mustQuery, trackTotalHits);
return parseQueryConfig(mustQuery);
}

async getDataType(): Promise<DataTypeConfig> {
const query = {
token: this.config.token,
q: '_exists_:_key',
size: 0,
include_type_config: true
include_type_config: true,
track_total_hits: false
};

const spaceResults = await this.makeRequest(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,25 @@ export async function determineDateSlicerRanges(
determineDateSlicerRange(config, id)
)));
}

/**
 * Computes how far to advance from `start` when a date range has to be narrowed.
 *
 * The span between `start` and `end` is scaled by `ratio` (rounded down to a
 * whole millisecond) and then capped so that `start` plus the result never
 * goes past `limit`.
 *
 * @param start - beginning of the range being split
 * @param end - current end of the range being split
 * @param limit - upper bound that the new end must not exceed
 * @param timeResolution - `'ms'` returns milliseconds; any other value returns whole seconds
 * @param ratio - fraction of the span to keep; defaults to 0.5 (split in half)
 * @returns the size of the narrowed range, in the unit selected by `timeResolution`
 */
export function splitTime(
    start: moment.Moment,
    end: moment.Moment,
    limit: moment.Moment,
    timeResolution: string,
    ratio = 0.5
): number {
    let diff = Math.floor(end.diff(start) * ratio);

    // never let the new end run beyond the limit; work on a copy so `start` is untouched
    if (moment.utc(start).add(diff, 'ms').isAfter(limit)) {
        diff = moment.utc(limit).diff(start);
    }

    if (timeResolution === 'ms') {
        return diff;
    }

    // coarser resolution: truncate the millisecond difference to whole seconds
    return Math.floor(diff / 1000);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
dateFormatSeconds,
dateOptions,
determineDateSlicerRange,
splitTime
} from './date-helpers.js';
import { getKeyArray } from './id-helpers.js';

Expand All @@ -36,27 +37,6 @@ interface DateParams {
size: number;
}

/**
 * Halves the span between `start` and `end`, capping the result so that
 * `start` plus the returned amount does not go past `limit`.
 *
 * Returns milliseconds when `timeResolution` is `'ms'`, otherwise whole seconds.
 */
function splitTime(
    start: moment.Moment,
    end: moment.Moment,
    limit: moment.Moment,
    timeResolution: string
) {
    const halfSpan = Math.floor(end.diff(start) / 2);
    const candidateEnd = moment.utc(start).add(halfSpan, 'ms');

    // fall back to the distance to the limit when half the span overshoots it
    const cappedMs = candidateEnd.isAfter(limit)
        ? moment.utc(limit).diff(start)
        : halfSpan;

    return timeResolution === 'ms' ? cappedMs : Math.floor(cappedMs / 1000);
}

export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
const {
events,
Expand All @@ -75,7 +55,8 @@ export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
subsliceKeyThreshold,
idFieldName = null,
startingKeyDepth = 0,
keyType = IDType.base64url
keyType = IDType.base64url,
recurse_optimization
} = args;

if (!args.interval) {
Expand Down Expand Up @@ -123,12 +104,16 @@ export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
}

if (count > size) {
// old way splits in half, new way create a ratio to try to get closer
// and minimize the amount of slicing
const ratio = recurse_optimization ? size / count : 0.5;

// if size is to big after increasing slice, use alternative division behavior
if (isExpandedSlice) {
// recurse down to the appropriate size
const newStart = moment.utc(dateParams.prevEnd);
// get diff from new start
const diff = splitTime(newStart, end, limit, timeResolution);
const diff = splitTime(newStart, end, limit, timeResolution, ratio);
const newEnd = moment.utc(newStart).add(diff, timeResolution);

if (!newEnd.isValid()) {
Expand Down Expand Up @@ -159,7 +144,7 @@ export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
}

// find difference in milliseconds and divide in half
const diff = splitTime(start, end, limit, timeResolution);
const diff = splitTime(start, end, limit, timeResolution, ratio);
const newEnd = moment.utc(start).add(diff, timeResolution);
// prevent recursive call if difference is one millisecond
if (diff <= 0) {
Expand Down Expand Up @@ -188,6 +173,7 @@ export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
dateParams.prevEnd = moment.utc(end);

let newEnd = moment.utc(dateParams.end).add(step, unit);

if (newEnd.isSameOrAfter(dateParams.limit)) {
// set to limit
makeLimitQuery = true;
Expand Down Expand Up @@ -261,7 +247,8 @@ export function dateSlicer(args: SlicerArgs): () => Promise<DateSlicerResults> {
baseKeyArray: keyArray,
countFn,
size: querySize,
startingKeyDepth
startingKeyDepth,
keyType
};

const idSlicers = idSlicer(idSlicerArs);
Expand Down
Loading

0 comments on commit 4570745

Please sign in to comment.