Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Commit for Advanced Batching #11544

Merged
merged 9 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/search/search-documents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@
"@azure/core-tracing": "1.0.0-preview.9",
"@azure/logger": "^1.0.0",
"@opentelemetry/api": "^0.10.2",
"tslib": "^2.0.0"
"tslib": "^2.0.0",
"events": "^3.0.0"
},
"devDependencies": {
"@azure/dev-tool": "^1.0.0",
Expand Down
54 changes: 54 additions & 0 deletions sdk/search/search-documents/review/search-documents.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { KeyCredential } from '@azure/core-auth';
import { OperationOptions } from '@azure/core-http';
import { PagedAsyncIterableIterator } from '@azure/core-paging';
import { PipelineOptions } from '@azure/core-http';
import { RestError } from '@azure/core-http';

// @public
export interface AnalyzedTokenInfo {
Expand Down Expand Up @@ -1028,6 +1029,8 @@ export class SearchClient<T> {
readonly endpoint: string;
getDocument<Fields extends keyof T>(key: string, options?: GetDocumentOptions<Fields>): Promise<T>;
getDocumentsCount(options?: CountDocumentsOptions): Promise<number>;
// (undocumented)
getSearchIndexingBufferedSenderInstance(options?: SearchIndexingBufferedSenderOptions): SearchIndexingBufferedSender<T>;
indexDocuments(batch: IndexDocumentsBatch<T>, options?: IndexDocumentsOptions): Promise<IndexDocumentsResult>;
readonly indexName: string;
mergeDocuments(documents: T[], options?: MergeDocumentsOptions): Promise<IndexDocumentsResult>;
Expand Down Expand Up @@ -1222,6 +1225,57 @@ export interface SearchIndexerWarning {
readonly name?: string;
}

// @public
export class SearchIndexingBufferedSender<T> {
constructor(client: SearchClient<T>, options?: SearchIndexingBufferedSenderOptions);
// (undocumented)
deleteDocuments(documents: T[], options?: SearchIndexingBufferedSenderDeleteDocumentsOptions): Promise<void>;
dispose(): void;
// (undocumented)
flush(options?: SearchIndexingBufferedSenderFlushDocumentsOptions): Promise<void>;
// (undocumented)
mergeDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeDocumentsOptions): Promise<void>;
// (undocumented)
mergeOrUploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions): Promise<void>;
// (undocumented)
off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
// (undocumented)
off(event: "batchFailed", listener: (e: RestError) => void): void;
// (undocumented)
on(event: "batchAdded", listener: (e: IndexDocumentsResult) => void): void;
// (undocumented)
on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
// (undocumented)
on(event: "batchFailed", listener: (e: RestError) => void): void;
// (undocumented)
uploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderUploadDocumentsOptions): Promise<void>;
}

// @public (undocumented)
export type SearchIndexingBufferedSenderDeleteDocumentsOptions = OperationOptions;

// @public (undocumented)
export type SearchIndexingBufferedSenderFlushDocumentsOptions = OperationOptions;

// @public (undocumented)
export type SearchIndexingBufferedSenderMergeDocumentsOptions = OperationOptions;

// @public (undocumented)
export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = OperationOptions;

// @public (undocumented)
export interface SearchIndexingBufferedSenderOptions {
// (undocumented)
autoFlush?: boolean;
// (undocumented)
batchSize?: number;
// (undocumented)
flushWindowInMs?: number;
}

// @public (undocumented)
export type SearchIndexingBufferedSenderUploadDocumentsOptions = OperationOptions;

// @public
export interface SearchIndexStatistics {
readonly documentCount: number;
Expand Down
9 changes: 8 additions & 1 deletion sdk/search/search-documents/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

export { SearchClient, SearchClientOptions } from "./searchClient";
export {SearchIndexingBufferedSender} from "./searchIndexingBufferedSender";
export {
AutocompleteRequest,
AutocompleteOptions,
Expand All @@ -25,7 +26,13 @@ export {
SuggestOptions,
MergeDocumentsOptions,
MergeOrUploadDocumentsOptions,
UploadDocumentsOptions
UploadDocumentsOptions,
SearchIndexingBufferedSenderOptions,
SearchIndexingBufferedSenderDeleteDocumentsOptions,
SearchIndexingBufferedSenderFlushDocumentsOptions,
SearchIndexingBufferedSenderMergeDocumentsOptions,
SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions,
SearchIndexingBufferedSenderUploadDocumentsOptions
} from "./indexModels";
export { SearchIndexClient, SearchIndexClientOptions } from "./searchIndexClient";
export { SearchIndexerClient, SearchIndexerClientOptions } from "./searchIndexerClient";
Expand Down
12 changes: 12 additions & 0 deletions sdk/search/search-documents/src/indexModels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ export type SearchOptions<Fields> = OperationOptions & SearchRequestOptions<Fiel
*/
export type SuggestOptions<Fields> = OperationOptions & SuggestRequest<Fields>;

export interface SearchIndexingBufferedSenderOptions {
autoFlush?:boolean;
flushWindowInMs?:number;
batchSize?:number;
}

export type SearchIndexingBufferedSenderUploadDocumentsOptions = OperationOptions;
export type SearchIndexingBufferedSenderMergeDocumentsOptions = OperationOptions;
export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = OperationOptions;
export type SearchIndexingBufferedSenderDeleteDocumentsOptions = OperationOptions;
export type SearchIndexingBufferedSenderFlushDocumentsOptions = OperationOptions;

/**
* Options for retrieving a single document.
*/
Expand Down
8 changes: 7 additions & 1 deletion sdk/search/search-documents/src/searchClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ import {
DeleteDocumentsOptions,
SearchDocumentsPageResult,
MergeOrUploadDocumentsOptions,
SearchRequest
SearchRequest,
SearchIndexingBufferedSenderOptions
} from "./indexModels";
import { odataMetadataPolicy } from "./odataMetadataPolicy";
import { IndexDocumentsBatch } from "./indexDocumentsBatch";
import { encode, decode } from "./base64";
import * as utils from "./serviceUtils";
import {SearchIndexingBufferedSender} from ".";

/**
* Client options used to configure Cognitive Search API requests.
Expand Down Expand Up @@ -616,6 +618,10 @@ export class SearchClient<T> {
}
}

public getSearchIndexingBufferedSenderInstance(options: SearchIndexingBufferedSenderOptions = {}): SearchIndexingBufferedSender<T> {
return new SearchIndexingBufferedSender(this, options);
}

private encodeContinuationToken(
nextLink: string | undefined,
nextPageParameters: SearchRequest | undefined
Expand Down
193 changes: 193 additions & 0 deletions sdk/search/search-documents/src/searchIndexingBufferedSender.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import {SearchClient} from "./searchClient";
import {IndexDocumentsBatch} from "./indexDocumentsBatch";
import {IndexDocumentsAction, SearchIndexingBufferedSenderOptions, SearchIndexingBufferedSenderUploadDocumentsOptions, SearchIndexingBufferedSenderMergeDocumentsOptions, SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions, SearchIndexingBufferedSenderDeleteDocumentsOptions, SearchIndexingBufferedSenderFlushDocumentsOptions} from "./indexModels";
import {IndexDocumentsResult} from "./generated/data/models";
import {RestError, OperationOptions} from "@azure/core-http";
import EventEmitter from 'events';
import {createSpan} from "./tracing";
import {CanonicalCode} from "@opentelemetry/api";

const DEFAULT_BATCH_SIZE:number = 1000;
const DEFAULT_FLUSH_WINDOW:number = 60000;
const RETRY_COUNT:number = 3;

/**
* Class used to perform buffered operations against a search index,
* including adding, updating, and removing them.
*/
export class SearchIndexingBufferedSender<T> {
private client:SearchClient<T>;
private autoFlush:boolean;
private flushWindowInMs:number;
private batchSize:number;
private batchObject: IndexDocumentsBatch<T>;
private cleanupTimer?: () => void;
private readonly emitter = new EventEmitter();

constructor(client: SearchClient<T>, options: SearchIndexingBufferedSenderOptions = {}) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this release, I am keeping the constructor just like Python

this.client = client;
this.autoFlush = options.autoFlush ?? false;
this.flushWindowInMs = options.flushWindowInMs?? DEFAULT_FLUSH_WINDOW;
this.batchSize = options.batchSize?? DEFAULT_BATCH_SIZE;
this.batchObject = new IndexDocumentsBatch<T>();
if(this.autoFlush) {
const interval = setInterval(() => this.flush(), this.flushWindowInMs);
interval?.unref();
this.cleanupTimer = () => {
clearInterval(interval);
}
}
}

public async uploadDocuments(documents: T[], options: SearchIndexingBufferedSenderUploadDocumentsOptions = {}): Promise<void> {
const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-uploadDocuments", options);
try {
this.batchObject.upload(documents);
this.emitter.emit("batchAdded", {
action: "upload",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}

public async mergeDocuments(documents: T[], options: SearchIndexingBufferedSenderMergeDocumentsOptions = {}): Promise<void> {
const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-mergeDocuments", options);
try {
this.batchObject.merge(documents);
this.emitter.emit("batchAdded", {
action: "merge",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}

public async mergeOrUploadDocuments(documents: T[], options: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = {}): Promise<void> {
const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-mergeOrUploadDocuments", options);
try {
this.batchObject.mergeOrUpload(documents);
this.emitter.emit("batchAdded", {
action: "mergeOrUpload",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}

public async deleteDocuments(documents: T[], options: SearchIndexingBufferedSenderDeleteDocumentsOptions = {}): Promise<void> {
const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-deleteDocuments", options);
try {
this.batchObject.delete(documents);
this.emitter.emit("batchAdded", {
action: "delete",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}

public async flush(options: SearchIndexingBufferedSenderFlushDocumentsOptions = {}): Promise<void> {
const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-deleteDocuments", options);
try {
if (this.batchObject.actions.length > 0) {
return this.internalFlush(true, updatedOptions);
}
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}

/**
* If using autoFlush: true, call this to cleanup the autoflush timer.
*/
public dispose(): void {
this.cleanupTimer && this.cleanupTimer();
}

public on(event: "batchAdded", listener: (e: IndexDocumentsResult) => void): void;
public on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
public on(event: "batchFailed", listener: (e: RestError) => void): void;
public on(event: "batchAdded" | "batchSucceeded" | "batchFailed", listener: (e: any) => void): void {
this.emitter.on(event, listener);
}

public off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
public off(event: "batchFailed", listener: (e: RestError) => void): void;
public off(event: "batchSucceeded" | "batchFailed", listener: (e: any) => void): void {
this.emitter.removeListener(event, listener);
}

private isBatchReady(): boolean {
return (this.batchObject.actions.length >= this.batchSize);
}

private async internalFlush(force: boolean, options: OperationOptions = {}): Promise<void> {
if(force || (this.autoFlush && this.isBatchReady())) {
// Split it
const actions:IndexDocumentsAction<T>[] = this.batchObject.actions;
this.batchObject = new IndexDocumentsBatch<T>();
while(actions.length > 0) {
const actionsToSend = actions.splice(0, this.batchSize);
await this.submitDocuments(actionsToSend, options);
}
}
}

private async submitDocuments(actionsToSend: IndexDocumentsAction<T>[], options:OperationOptions, retryAttempt:number = 0): Promise<void> {
try {
const result = await this.client.indexDocuments(new IndexDocumentsBatch<T>(actionsToSend), options);
// raise success event
this.emitter.emit("batchSucceeded", result);
} catch(e) {
if(this.isRetryAbleError(e) && retryAttempt < RETRY_COUNT) {
this.submitDocuments(actionsToSend, options, retryAttempt + 1);
} else {
this.emitter.emit("batchFailed", e);
throw(e);
}
}
}

private isRetryAbleError(e: any): boolean {
return (e.code && (e.code === "422" || e.code === "409" || e.code === "503"));
}
}