From 4e26d081dbb8d43fe60ee3221fb0ad8411e076e8 Mon Sep 17 00:00:00 2001 From: Sarangan Rajamanickam Date: Tue, 29 Sep 2020 00:10:36 -0700 Subject: [PATCH 1/9] Initial Commit for Advanced Batching --- sdk/search/search-documents/package.json | 3 +- .../review/search-documents.api.md | 54 +++++ sdk/search/search-documents/src/index.ts | 9 +- .../search-documents/src/indexModels.ts | 12 ++ .../search-documents/src/searchClient.ts | 8 +- .../src/searchIndexingBufferedSender.ts | 193 ++++++++++++++++++ 6 files changed, 276 insertions(+), 3 deletions(-) create mode 100644 sdk/search/search-documents/src/searchIndexingBufferedSender.ts diff --git a/sdk/search/search-documents/package.json b/sdk/search/search-documents/package.json index 69e13f37362b..fd102f5d2f4e 100644 --- a/sdk/search/search-documents/package.json +++ b/sdk/search/search-documents/package.json @@ -80,7 +80,8 @@ "@azure/core-tracing": "1.0.0-preview.9", "@azure/logger": "^1.0.0", "@opentelemetry/api": "^0.10.2", - "tslib": "^2.0.0" + "tslib": "^2.0.0", + "events": "^3.0.0" }, "devDependencies": { "@azure/dev-tool": "^1.0.0", diff --git a/sdk/search/search-documents/review/search-documents.api.md b/sdk/search/search-documents/review/search-documents.api.md index 5b07a287af10..4fc66ba959ee 100644 --- a/sdk/search/search-documents/review/search-documents.api.md +++ b/sdk/search/search-documents/review/search-documents.api.md @@ -9,6 +9,7 @@ import { KeyCredential } from '@azure/core-auth'; import { OperationOptions } from '@azure/core-http'; import { PagedAsyncIterableIterator } from '@azure/core-paging'; import { PipelineOptions } from '@azure/core-http'; +import { RestError } from '@azure/core-http'; // @public export interface AnalyzedTokenInfo { @@ -1028,6 +1029,8 @@ export class SearchClient { readonly endpoint: string; getDocument(key: string, options?: GetDocumentOptions): Promise; getDocumentsCount(options?: CountDocumentsOptions): Promise; + // (undocumented) + getSearchIndexingBufferedSenderInstance(options?: SearchIndexingBufferedSenderOptions): SearchIndexingBufferedSender; indexDocuments(batch: IndexDocumentsBatch, options?: IndexDocumentsOptions): Promise; readonly indexName: string; mergeDocuments(documents: T[], options?: MergeDocumentsOptions): Promise; @@ -1222,6 +1225,57 @@ export interface SearchIndexerWarning { readonly name?: string; } +// @public +export class SearchIndexingBufferedSender { + constructor(client: SearchClient, options?: SearchIndexingBufferedSenderOptions); + // (undocumented) + deleteDocuments(documents: T[], options?: SearchIndexingBufferedSenderDeleteDocumentsOptions): Promise; + dispose(): void; + // (undocumented) + flush(options?: SearchIndexingBufferedSenderFlushDocumentsOptions): Promise; + // (undocumented) + mergeDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeDocumentsOptions): Promise; + // (undocumented) + mergeOrUploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions): Promise; + // (undocumented) + off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; + // (undocumented) + off(event: "batchFailed", listener: (e: RestError) => void): void; + // (undocumented) + on(event: "batchAdded", listener: (e: IndexDocumentsResult) => void): void; + // (undocumented) + on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; + // (undocumented) + on(event: "batchFailed", listener: (e: RestError) => void): void; + // (undocumented) + uploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderUploadDocumentsOptions): Promise; +} + +// @public (undocumented) +export type SearchIndexingBufferedSenderDeleteDocumentsOptions = OperationOptions; + +// @public (undocumented) +export type SearchIndexingBufferedSenderFlushDocumentsOptions = OperationOptions; + +// @public (undocumented) +export type SearchIndexingBufferedSenderMergeDocumentsOptions = OperationOptions; + +// @public (undocumented) +export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = OperationOptions; + +// @public (undocumented) +export interface SearchIndexingBufferedSenderOptions { + // (undocumented) + autoFlush?: boolean; + // (undocumented) + batchSize?: number; + // (undocumented) + flushWindowInMs?: number; +} + +// @public (undocumented) +export type SearchIndexingBufferedSenderUploadDocumentsOptions = OperationOptions; + // @public export interface SearchIndexStatistics { readonly documentCount: number; diff --git a/sdk/search/search-documents/src/index.ts b/sdk/search/search-documents/src/index.ts index 7f50e7a83d6a..86e2a918e3fb 100644 --- a/sdk/search/search-documents/src/index.ts +++ b/sdk/search/search-documents/src/index.ts @@ -2,6 +2,7 @@ // Licensed under the MIT license. export { SearchClient, SearchClientOptions } from "./searchClient"; +export {SearchIndexingBufferedSender} from "./searchIndexingBufferedSender"; export { AutocompleteRequest, AutocompleteOptions, @@ -25,7 +26,13 @@ export { SuggestOptions, MergeDocumentsOptions, MergeOrUploadDocumentsOptions, - UploadDocumentsOptions + UploadDocumentsOptions, + SearchIndexingBufferedSenderOptions, + SearchIndexingBufferedSenderDeleteDocumentsOptions, + SearchIndexingBufferedSenderFlushDocumentsOptions, + SearchIndexingBufferedSenderMergeDocumentsOptions, + SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions, + SearchIndexingBufferedSenderUploadDocumentsOptions } from "./indexModels"; export { SearchIndexClient, SearchIndexClientOptions } from "./searchIndexClient"; export { SearchIndexerClient, SearchIndexerClientOptions } from "./searchIndexerClient"; diff --git a/sdk/search/search-documents/src/indexModels.ts b/sdk/search/search-documents/src/indexModels.ts index 5387d02b97da..bc25ed47fe8f 100644 --- a/sdk/search/search-documents/src/indexModels.ts +++ b/sdk/search/search-documents/src/indexModels.ts @@ -29,6 +29,18 @@ export type SearchOptions = OperationOptions & SearchRequestOptions = OperationOptions & SuggestRequest; +export interface SearchIndexingBufferedSenderOptions { + autoFlush?:boolean; + flushWindowInMs?:number; + batchSize?:number; +} + +export type SearchIndexingBufferedSenderUploadDocumentsOptions = OperationOptions; +export type SearchIndexingBufferedSenderMergeDocumentsOptions = OperationOptions; +export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = OperationOptions; +export type SearchIndexingBufferedSenderDeleteDocumentsOptions = OperationOptions; +export type SearchIndexingBufferedSenderFlushDocumentsOptions = OperationOptions; + /** * Options for retrieving a single document. */ diff --git a/sdk/search/search-documents/src/searchClient.ts b/sdk/search/search-documents/src/searchClient.ts index 28025c8053e4..3d18badf6d9d 100644 --- a/sdk/search/search-documents/src/searchClient.ts +++ b/sdk/search/search-documents/src/searchClient.ts @@ -41,12 +41,14 @@ import { DeleteDocumentsOptions, SearchDocumentsPageResult, MergeOrUploadDocumentsOptions, - SearchRequest + SearchRequest, + SearchIndexingBufferedSenderOptions } from "./indexModels"; import { odataMetadataPolicy } from "./odataMetadataPolicy"; import { IndexDocumentsBatch } from "./indexDocumentsBatch"; import { encode, decode } from "./base64"; import * as utils from "./serviceUtils"; +import {SearchIndexingBufferedSender} from "."; /** * Client options used to configure Cognitive Search API requests. @@ -616,6 +618,10 @@ export class SearchClient { } } + public getSearchIndexingBufferedSenderInstance(options: SearchIndexingBufferedSenderOptions = {}): SearchIndexingBufferedSender { + return new SearchIndexingBufferedSender(this, options); + } + private encodeContinuationToken( nextLink: string | undefined, nextPageParameters: SearchRequest | undefined diff --git a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts new file mode 100644 index 000000000000..af54f1a2dfcb --- /dev/null +++ b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts @@ -0,0 +1,193 @@ +import {SearchClient} from "./searchClient"; +import {IndexDocumentsBatch} from "./indexDocumentsBatch"; +import {IndexDocumentsAction, SearchIndexingBufferedSenderOptions, SearchIndexingBufferedSenderUploadDocumentsOptions, SearchIndexingBufferedSenderMergeDocumentsOptions, SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions, SearchIndexingBufferedSenderDeleteDocumentsOptions, SearchIndexingBufferedSenderFlushDocumentsOptions} from "./indexModels"; +import {IndexDocumentsResult} from "./generated/data/models"; +import {RestError, OperationOptions} from "@azure/core-http"; +import EventEmitter from 'events'; +import {createSpan} from "./tracing"; +import {CanonicalCode} from "@opentelemetry/api"; + +const DEFAULT_BATCH_SIZE:number = 1000; +const DEFAULT_FLUSH_WINDOW:number = 60000; +const RETRY_COUNT:number = 3; + +/** + * Class used to perform buffered operations against a search index, + * including adding, updating, and removing them. + */ +export class SearchIndexingBufferedSender { + private client:SearchClient; + private autoFlush:boolean; + private flushWindowInMs:number; + private batchSize:number; + private batchObject: IndexDocumentsBatch; + private cleanupTimer?: () => void; + private readonly emitter = new EventEmitter(); + + constructor(client: SearchClient, options: SearchIndexingBufferedSenderOptions = {}) { + this.client = client; + this.autoFlush = options.autoFlush ?? false; + this.flushWindowInMs = options.flushWindowInMs?? DEFAULT_FLUSH_WINDOW; + this.batchSize = options.batchSize?? DEFAULT_BATCH_SIZE; + this.batchObject = new IndexDocumentsBatch(); + if(this.autoFlush) { + const interval = setInterval(() => this.flush(), this.flushWindowInMs); + interval?.unref(); + this.cleanupTimer = () => { + clearInterval(interval); + } + } + } + + public async uploadDocuments(documents: T[], options: SearchIndexingBufferedSenderUploadDocumentsOptions = {}): Promise { + const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-uploadDocuments", options); + try { + this.batchObject.upload(documents); + this.emitter.emit("batchAdded", { + action: "upload", + documents + }); + return this.internalFlush(false, updatedOptions); + } catch (e) { + span.setStatus({ + code: CanonicalCode.UNKNOWN, + message: e.message + }); + throw e; + } finally { + span.end(); + } + } + + public async mergeDocuments(documents: T[], options: SearchIndexingBufferedSenderMergeDocumentsOptions = {}): Promise { + const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-mergeDocuments", options); + try { + this.batchObject.merge(documents); + this.emitter.emit("batchAdded", { + action: "merge", + documents + }); + return this.internalFlush(false, updatedOptions); + } catch (e) { + span.setStatus({ + code: CanonicalCode.UNKNOWN, + message: e.message + }); + throw e; + } finally { + span.end(); + } + } + + public async mergeOrUploadDocuments(documents: T[], options: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = {}): Promise { + const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-mergeOrUploadDocuments", options); + try { + this.batchObject.mergeOrUpload(documents); + this.emitter.emit("batchAdded", { + action: "mergeOrUpload", + documents + }); + return this.internalFlush(false, updatedOptions); + } catch (e) { + span.setStatus({ + code: CanonicalCode.UNKNOWN, + message: e.message + }); + throw e; + } finally { + span.end(); + } + } + + public async deleteDocuments(documents: T[], options: SearchIndexingBufferedSenderDeleteDocumentsOptions = {}): Promise { + const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-deleteDocuments", options); + try { + this.batchObject.delete(documents); + this.emitter.emit("batchAdded", { + action: "delete", + documents + }); + return this.internalFlush(false, updatedOptions); + } catch (e) { + span.setStatus({ + code: CanonicalCode.UNKNOWN, + message: e.message + }); + throw e; + } finally { + span.end(); + } + } + + public async flush(options: SearchIndexingBufferedSenderFlushDocumentsOptions = {}): Promise { + const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-deleteDocuments", options); + try { + if (this.batchObject.actions.length > 0) { + return this.internalFlush(true, updatedOptions); + } + } catch (e) { + span.setStatus({ + code: CanonicalCode.UNKNOWN, + message: e.message + }); + throw e; + } finally { + span.end(); + } + } + + /** + * If using autoFlush: true, call this to cleanup the autoflush timer. + */ + public dispose(): void { + this.cleanupTimer && this.cleanupTimer(); + } + + public on(event: "batchAdded", listener: (e: IndexDocumentsResult) => void): void; + public on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; + public on(event: "batchFailed", listener: (e: RestError) => void): void; + public on(event: "batchAdded" | "batchSucceeded" | "batchFailed", listener: (e: any) => void): void { + this.emitter.on(event, listener); + } + + public off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; + public off(event: "batchFailed", listener: (e: RestError) => void): void; + public off(event: "batchSucceeded" | "batchFailed", listener: (e: any) => void): void { + this.emitter.removeListener(event, listener); + } + + private isBatchReady(): boolean { + return (this.batchObject.actions.length >= this.batchSize); + } + + private async internalFlush(force: boolean, options: OperationOptions = {}): Promise { + if(force || (this.autoFlush && this.isBatchReady())) { + // Split it + const actions:IndexDocumentsAction[] = this.batchObject.actions; + this.batchObject = new IndexDocumentsBatch(); + while(actions.length > 0) { + const actionsToSend = actions.splice(0, this.batchSize); + await this.submitDocuments(actionsToSend, options); + } + } + } + + private async submitDocuments(actionsToSend: IndexDocumentsAction[], options:OperationOptions, retryAttempt:number = 0): Promise { + try { + const result = await this.client.indexDocuments(new IndexDocumentsBatch(actionsToSend), options); + // raise success event + this.emitter.emit("batchSucceeded", result); + } catch(e) { + if(this.isRetryAbleError(e) && retryAttempt < RETRY_COUNT) { + this.submitDocuments(actionsToSend, options, retryAttempt + 1); + } else { + this.emitter.emit("batchFailed", e); + throw(e); + } + } + } + + private isRetryAbleError(e: any): boolean { + return (e.code && (e.code === "422" || e.code === "409" || e.code === "503")); + } +} From df72e410aca0aa32f2fa2a6d81f93d24a9a95d08 Mon Sep 17 00:00:00 2001 From: Sarangan Rajamanickam Date: Tue, 29 Sep 2020 00:32:34 -0700 Subject: [PATCH 2/9] Formatting changes --- sdk/search/search-documents/src/index.ts | 2 +- .../search-documents/src/indexModels.ts | 6 +- .../search-documents/src/searchClient.ts | 6 +- .../src/searchIndexingBufferedSender.ts | 131 ++++++++++++------ 4 files changed, 97 insertions(+), 48 deletions(-) diff --git a/sdk/search/search-documents/src/index.ts b/sdk/search/search-documents/src/index.ts index 86e2a918e3fb..10474cb3876e 100644 --- a/sdk/search/search-documents/src/index.ts +++ b/sdk/search/search-documents/src/index.ts @@ -2,7 +2,7 @@ // Licensed under the MIT license. export { SearchClient, SearchClientOptions } from "./searchClient"; -export {SearchIndexingBufferedSender} from "./searchIndexingBufferedSender"; +export { SearchIndexingBufferedSender } from "./searchIndexingBufferedSender"; export { AutocompleteRequest, AutocompleteOptions, diff --git a/sdk/search/search-documents/src/indexModels.ts b/sdk/search/search-documents/src/indexModels.ts index bc25ed47fe8f..61e10d1811d0 100644 --- a/sdk/search/search-documents/src/indexModels.ts +++ b/sdk/search/search-documents/src/indexModels.ts @@ -30,9 +30,9 @@ export type SearchOptions = OperationOptions & SearchRequestOptions = OperationOptions & SuggestRequest; export interface SearchIndexingBufferedSenderOptions { - autoFlush?:boolean; - flushWindowInMs?:number; - batchSize?:number; + autoFlush?: boolean; + flushWindowInMs?: number; + batchSize?: number; } export type SearchIndexingBufferedSenderUploadDocumentsOptions = OperationOptions; diff --git a/sdk/search/search-documents/src/searchClient.ts b/sdk/search/search-documents/src/searchClient.ts index 3d18badf6d9d..999144f72d10 100644 --- a/sdk/search/search-documents/src/searchClient.ts +++ b/sdk/search/search-documents/src/searchClient.ts @@ -48,7 +48,7 @@ import { odataMetadataPolicy } from "./odataMetadataPolicy"; import { IndexDocumentsBatch } from "./indexDocumentsBatch"; import { encode, decode } from "./base64"; import * as utils from "./serviceUtils"; -import {SearchIndexingBufferedSender} from "."; +import { SearchIndexingBufferedSender } from "."; /** * Client options used to configure Cognitive Search API requests. @@ -618,7 +618,9 @@ export class SearchClient { } } - public getSearchIndexingBufferedSenderInstance(options: SearchIndexingBufferedSenderOptions = {}): SearchIndexingBufferedSender { + public getSearchIndexingBufferedSenderInstance( + options: SearchIndexingBufferedSenderOptions = {} + ): SearchIndexingBufferedSender { return new SearchIndexingBufferedSender(this, options); } diff --git a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts index af54f1a2dfcb..ce69e08d7ef9 100644 --- a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts +++ b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts @@ -1,25 +1,33 @@ -import {SearchClient} from "./searchClient"; -import {IndexDocumentsBatch} from "./indexDocumentsBatch"; -import {IndexDocumentsAction, SearchIndexingBufferedSenderOptions, SearchIndexingBufferedSenderUploadDocumentsOptions, SearchIndexingBufferedSenderMergeDocumentsOptions, SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions, SearchIndexingBufferedSenderDeleteDocumentsOptions, SearchIndexingBufferedSenderFlushDocumentsOptions} from "./indexModels"; -import {IndexDocumentsResult} from "./generated/data/models"; -import {RestError, OperationOptions} from "@azure/core-http"; -import EventEmitter from 'events'; -import {createSpan} from "./tracing"; -import {CanonicalCode} from "@opentelemetry/api"; +import { SearchClient } from "./searchClient"; +import { IndexDocumentsBatch } from "./indexDocumentsBatch"; +import { + IndexDocumentsAction, + SearchIndexingBufferedSenderOptions, + SearchIndexingBufferedSenderUploadDocumentsOptions, + SearchIndexingBufferedSenderMergeDocumentsOptions, + SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions, + SearchIndexingBufferedSenderDeleteDocumentsOptions, + SearchIndexingBufferedSenderFlushDocumentsOptions +} from "./indexModels"; +import { IndexDocumentsResult } from "./generated/data/models"; +import { RestError, OperationOptions } from "@azure/core-http"; +import EventEmitter from "events"; +import { createSpan } from "./tracing"; +import { CanonicalCode } from "@opentelemetry/api"; -const DEFAULT_BATCH_SIZE:number = 1000; -const DEFAULT_FLUSH_WINDOW:number = 60000; -const RETRY_COUNT:number = 3; +const DEFAULT_BATCH_SIZE: number = 1000; +const DEFAULT_FLUSH_WINDOW: number = 60000; +const RETRY_COUNT: number = 3; /** * Class used to perform buffered operations against a search index, * including adding, updating, and removing them. */ export class SearchIndexingBufferedSender { - private client:SearchClient; - private autoFlush:boolean; - private flushWindowInMs:number; - private batchSize:number; + private client: SearchClient; + private autoFlush: boolean; + private flushWindowInMs: number; + private batchSize: number; private batchObject: IndexDocumentsBatch; private cleanupTimer?: () => void; private readonly emitter = new EventEmitter(); @@ -27,20 +35,26 @@ export class SearchIndexingBufferedSender { constructor(client: SearchClient, options: SearchIndexingBufferedSenderOptions = {}) { this.client = client; this.autoFlush = options.autoFlush ?? false; - this.flushWindowInMs = options.flushWindowInMs?? DEFAULT_FLUSH_WINDOW; - this.batchSize = options.batchSize?? DEFAULT_BATCH_SIZE; + this.flushWindowInMs = options.flushWindowInMs ?? DEFAULT_FLUSH_WINDOW; + this.batchSize = options.batchSize ?? DEFAULT_BATCH_SIZE; this.batchObject = new IndexDocumentsBatch(); - if(this.autoFlush) { + if (this.autoFlush) { const interval = setInterval(() => this.flush(), this.flushWindowInMs); interval?.unref(); this.cleanupTimer = () => { clearInterval(interval); - } + }; } } - public async uploadDocuments(documents: T[], options: SearchIndexingBufferedSenderUploadDocumentsOptions = {}): Promise { - const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-uploadDocuments", options); + public async uploadDocuments( + documents: T[], + options: SearchIndexingBufferedSenderUploadDocumentsOptions = {} + ): Promise { + const { span, updatedOptions } = createSpan( + "SearchIndexingBufferedSender-uploadDocuments", + options + ); try { this.batchObject.upload(documents); this.emitter.emit("batchAdded", { @@ -59,8 +73,14 @@ export class SearchIndexingBufferedSender { } } - public async mergeDocuments(documents: T[], options: SearchIndexingBufferedSenderMergeDocumentsOptions = {}): Promise { - const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-mergeDocuments", options); + public async mergeDocuments( + documents: T[], + options: SearchIndexingBufferedSenderMergeDocumentsOptions = {} + ): Promise { + const { span, updatedOptions } = createSpan( + "SearchIndexingBufferedSender-mergeDocuments", + options + ); try { this.batchObject.merge(documents); this.emitter.emit("batchAdded", { @@ -79,8 +99,14 @@ export class SearchIndexingBufferedSender { } } - public async mergeOrUploadDocuments(documents: T[], options: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = {}): Promise { - const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-mergeOrUploadDocuments", options); + public async mergeOrUploadDocuments( + documents: T[], + options: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = {} + ): Promise { + const { span, updatedOptions } = createSpan( + "SearchIndexingBufferedSender-mergeOrUploadDocuments", + options + ); try { this.batchObject.mergeOrUpload(documents); this.emitter.emit("batchAdded", { @@ -97,10 +123,16 @@ export class SearchIndexingBufferedSender { } finally { span.end(); } - } + } - public async deleteDocuments(documents: T[], options: SearchIndexingBufferedSenderDeleteDocumentsOptions = {}): Promise { - const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-deleteDocuments", options); + public async deleteDocuments( + documents: T[], + options: SearchIndexingBufferedSenderDeleteDocumentsOptions = {} + ): Promise { + const { span, updatedOptions } = createSpan( + "SearchIndexingBufferedSender-deleteDocuments", + options + ); try { this.batchObject.delete(documents); this.emitter.emit("batchAdded", { @@ -119,8 +151,13 @@ export class SearchIndexingBufferedSender { } } - public async flush(options: SearchIndexingBufferedSenderFlushDocumentsOptions = {}): Promise { - const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-deleteDocuments", options); + public async flush( + options: SearchIndexingBufferedSenderFlushDocumentsOptions = {} + ): Promise { + const { span, updatedOptions } = createSpan( + "SearchIndexingBufferedSender-deleteDocuments", + options + ); try { if (this.batchObject.actions.length > 0) { return this.internalFlush(true, updatedOptions); @@ -146,7 +183,10 @@ export class SearchIndexingBufferedSender { public on(event: "batchAdded", listener: (e: IndexDocumentsResult) => void): void; public on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; public on(event: "batchFailed", listener: (e: RestError) => void): void; - public on(event: "batchAdded" | "batchSucceeded" | "batchFailed", listener: (e: any) => void): void { + public on( + event: "batchAdded" | "batchSucceeded" | "batchFailed", + listener: (e: any) => void + ): void { this.emitter.on(event, listener); } @@ -157,37 +197,44 @@ export class SearchIndexingBufferedSender { } private isBatchReady(): boolean { - return (this.batchObject.actions.length >= this.batchSize); + return this.batchObject.actions.length >= this.batchSize; } private async internalFlush(force: boolean, options: OperationOptions = {}): Promise { - if(force || (this.autoFlush && this.isBatchReady())) { + if (force || (this.autoFlush && this.isBatchReady())) { // Split it - const actions:IndexDocumentsAction[] = this.batchObject.actions; + const actions: IndexDocumentsAction[] = this.batchObject.actions; this.batchObject = new IndexDocumentsBatch(); - while(actions.length > 0) { + while (actions.length > 0) { const actionsToSend = actions.splice(0, this.batchSize); await this.submitDocuments(actionsToSend, options); } } } - private async submitDocuments(actionsToSend: IndexDocumentsAction[], options:OperationOptions, retryAttempt:number = 0): Promise { + private async submitDocuments( + actionsToSend: IndexDocumentsAction[], + options: OperationOptions, + retryAttempt: number = 0 + ): Promise { try { - const result = await this.client.indexDocuments(new IndexDocumentsBatch(actionsToSend), options); + const result = await this.client.indexDocuments( + new IndexDocumentsBatch(actionsToSend), + options + ); // raise success event this.emitter.emit("batchSucceeded", result); - } catch(e) { - if(this.isRetryAbleError(e) && retryAttempt < RETRY_COUNT) { + } catch (e) { + if (this.isRetryAbleError(e) && retryAttempt < RETRY_COUNT) { this.submitDocuments(actionsToSend, options, retryAttempt + 1); } else { this.emitter.emit("batchFailed", e); - throw(e); - } + throw e; + } } } private isRetryAbleError(e: any): boolean { - return (e.code && (e.code === "422" || e.code === "409" || e.code === "503")); + return e.code && (e.code === "422" || e.code === "409" || e.code === "503"); } } From 1d72786eaadc36cfc3c984baffabfe362193f3d4 Mon Sep 17 00:00:00 2001 From: Sarangan Rajamanickam Date: Tue, 29 Sep 2020 23:57:24 -0700 Subject: [PATCH 3/9] Additional Changes --- .../review/search-documents.api.md | 37 +++--- .../search-documents/src/indexModels.ts | 27 ++++ .../search-documents/src/searchClient.ts | 6 + .../src/searchIndexingBufferedSender.ts | 117 +++++++++++++++++- 4 files changed, 162 insertions(+), 25 deletions(-) diff --git a/sdk/search/search-documents/review/search-documents.api.md b/sdk/search/search-documents/review/search-documents.api.md index 4fc66ba959ee..eeb9fdf13f6a 100644 --- a/sdk/search/search-documents/review/search-documents.api.md +++ b/sdk/search/search-documents/review/search-documents.api.md @@ -1029,7 +1029,6 @@ export class SearchClient { readonly endpoint: string; getDocument(key: string, options?: GetDocumentOptions): Promise; getDocumentsCount(options?: CountDocumentsOptions): Promise; - // (undocumented) getSearchIndexingBufferedSenderInstance(options?: SearchIndexingBufferedSenderOptions): SearchIndexingBufferedSender; indexDocuments(batch: IndexDocumentsBatch, options?: IndexDocumentsOptions): Promise; readonly indexName: string; @@ -1228,52 +1227,48 @@ export interface SearchIndexerWarning { // @public export class SearchIndexingBufferedSender { constructor(client: SearchClient, options?: SearchIndexingBufferedSenderOptions); - // (undocumented) deleteDocuments(documents: T[], options?: SearchIndexingBufferedSenderDeleteDocumentsOptions): Promise; dispose(): void; - // (undocumented) flush(options?: SearchIndexingBufferedSenderFlushDocumentsOptions): Promise; - // (undocumented) mergeDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeDocumentsOptions): Promise; - // (undocumented) mergeOrUploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions): Promise; - // (undocumented) + off(event: "batchAdded", listener: (e: { + action: string; + documents: T[]; + }) => void): void; + off(event: "batchSent", listener: (e: IndexDocumentsAction[]) => void): void; off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; - // (undocumented) off(event: "batchFailed", listener: (e: RestError) => void): void; - // (undocumented) - on(event: "batchAdded", listener: (e: IndexDocumentsResult) => void): void; - // (undocumented) + on(event: "batchAdded", listener: (e: { + action: string; + documents: T[]; + }) => void): void; + on(event: "batchSent", listener: (e: IndexDocumentsAction[]) => void): void; on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; - // (undocumented) on(event: "batchFailed", listener: (e: RestError) => void): void; - // (undocumented) uploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderUploadDocumentsOptions): Promise; } -// @public (undocumented) +// @public export type SearchIndexingBufferedSenderDeleteDocumentsOptions = OperationOptions; -// @public (undocumented) +// @public export type SearchIndexingBufferedSenderFlushDocumentsOptions = OperationOptions; -// @public (undocumented) +// @public export type SearchIndexingBufferedSenderMergeDocumentsOptions = OperationOptions; -// @public (undocumented) +// @public export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = OperationOptions; -// @public (undocumented) +// @public export interface SearchIndexingBufferedSenderOptions { - // (undocumented) autoFlush?: boolean; - // (undocumented) batchSize?: number; - // (undocumented) flushWindowInMs?: number; } -// @public (undocumented) +// @public export type SearchIndexingBufferedSenderUploadDocumentsOptions = OperationOptions; // @public diff --git a/sdk/search/search-documents/src/indexModels.ts b/sdk/search/search-documents/src/indexModels.ts index 61e10d1811d0..6a0f47658bd5 100644 --- a/sdk/search/search-documents/src/indexModels.ts +++ b/sdk/search/search-documents/src/indexModels.ts @@ -29,16 +29,43 @@ export type SearchOptions = OperationOptions & SearchRequestOptions = OperationOptions & SuggestRequest; +/** + * Options for SearchIndexingBufferedSender. + */ export interface SearchIndexingBufferedSenderOptions { + /** + * Indicates if autoFlush is enabled. + */ autoFlush?: boolean; + /** + * Interval between flushes (in milliseconds). + */ flushWindowInMs?: number; + /** + * Size of the batch. + */ batchSize?: number; } +/** + * Options for SearchIndexingBufferedSenderUploadDocuments. + */ export type SearchIndexingBufferedSenderUploadDocumentsOptions = OperationOptions; +/** + * Options for SearchIndexingBufferedSenderMergeDocuments. + */ export type SearchIndexingBufferedSenderMergeDocumentsOptions = OperationOptions; +/** + * Options for SearchIndexingBufferedSenderMergeOrUploadDocuments. + */ export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = OperationOptions; +/** + * Options for SearchIndexingBufferedSenderDeleteDocuments. + */ export type SearchIndexingBufferedSenderDeleteDocumentsOptions = OperationOptions; +/** + * Options for SearchIndexingBufferedSenderFlushDocuments. + */ export type SearchIndexingBufferedSenderFlushDocumentsOptions = OperationOptions; /** diff --git a/sdk/search/search-documents/src/searchClient.ts b/sdk/search/search-documents/src/searchClient.ts index 999144f72d10..bd8ab2bd44bc 100644 --- a/sdk/search/search-documents/src/searchClient.ts +++ b/sdk/search/search-documents/src/searchClient.ts @@ -618,6 +618,12 @@ export class SearchClient { } } + /** + * Gets an instance of SearchIndexingBufferedSender. + * + * @param options SearchIndexingBufferedSender Options + */ + public getSearchIndexingBufferedSenderInstance( options: SearchIndexingBufferedSenderOptions = {} ): SearchIndexingBufferedSender { diff --git a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts index ce69e08d7ef9..4a8e8032453e 100644 --- a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts +++ b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts @@ -24,14 +24,42 @@ const RETRY_COUNT: number = 3; * including adding, updating, and removing them. */ export class SearchIndexingBufferedSender { + /** + * Search Client used to call the underlying IndexBatch operations. + */ private client: SearchClient; + /** + * Indicates if autoFlush is enabled. + */ private autoFlush: boolean; + /** + * Interval between flushes (in milliseconds). + */ private flushWindowInMs: number; + /** + * Size of the batch. + */ private batchSize: number; + /** + * Batch object used to complete the service call. + */ private batchObject: IndexDocumentsBatch; + /** + * Clean up for the timer + */ private cleanupTimer?: () => void; + /** + * Event emitter/publisher used in the Buffered Sender + */ private readonly emitter = new EventEmitter(); + /** + * Creates a new instance of SearchIndexingBufferedSender. + * + * @param client Search Client used to call the underlying IndexBatch operations. + * @param options Options to modify batch size, auto flush and flush window. + * + */ constructor(client: SearchClient, options: SearchIndexingBufferedSenderOptions = {}) { this.client = client; this.autoFlush = options.autoFlush ?? false; @@ -47,6 +75,12 @@ export class SearchIndexingBufferedSender { } } + /** + * Uploads the documents/Adds the documents to the upload queue. + * + * @param documents Documents to be uploaded. + * @param options Upload options. + */ public async uploadDocuments( documents: T[], options: SearchIndexingBufferedSenderUploadDocumentsOptions = {} @@ -73,6 +107,12 @@ export class SearchIndexingBufferedSender { } } + /** + * Merges the documents/Adds the documents to the merge queue. + * + * @param documents Documents to be merged. + * @param options Upload options. + */ public async mergeDocuments( documents: T[], options: SearchIndexingBufferedSenderMergeDocumentsOptions = {} @@ -99,6 +139,12 @@ export class SearchIndexingBufferedSender { } } + /** + * Merges/Uploads the documents/Adds the documents to the merge/upload queue. + * + * @param documents Documents to be merged/uploaded. + * @param options Upload options. + */ public async mergeOrUploadDocuments( documents: T[], options: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = {} @@ -125,6 +171,12 @@ export class SearchIndexingBufferedSender { } } + /** + * Deletes the documents/Adds the documents to the delete queue. + * + * @param documents Documents to be deleted. + * @param options Upload options. + */ public async deleteDocuments( documents: T[], options: SearchIndexingBufferedSenderDeleteDocumentsOptions = {} @@ -151,11 +203,16 @@ export class SearchIndexingBufferedSender { } } + /** + * Flushes the queue manually. + * + * @param options Flush options. + */ public async flush( options: SearchIndexingBufferedSenderFlushDocumentsOptions = {} ): Promise { const { span, updatedOptions } = createSpan( - "SearchIndexingBufferedSender-deleteDocuments", + "SearchIndexingBufferedSender-flush", options ); try { @@ -180,19 +237,70 @@ export class SearchIndexingBufferedSender { this.cleanupTimer && this.cleanupTimer(); } - public on(event: "batchAdded", listener: (e: IndexDocumentsResult) => void): void; + /** + * Attach Batch Added Event + * + * @param event Event to be emitted + * @param listener Event Listener + */ + public on(event: "batchAdded", listener: (e: {action: string, documents: T[]}) => void): void; + /** + * Attach Batch Sent Event + * + * @param event Event to be emitted + * @param listener Event Listener + */ + public on(event: "batchSent", listener: (e: IndexDocumentsAction[]) => void): void; + /** + * Attach Batch Succeeded Event + * + * @param event Event to be emitted + * @param listener Event Listener + */ public on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; + /** + * Attach Batch Failed Event + * + * @param event Event to be emitted + * @param listener Event Listener + */ public on(event: "batchFailed", listener: (e: RestError) => void): void; public on( - event: "batchAdded" | "batchSucceeded" | "batchFailed", + event: "batchAdded" | "batchSent" | "batchSucceeded" | "batchFailed", listener: (e: any) => void ): void { this.emitter.on(event, listener); } + /** + * Detach Batch Added Event + * + * @param event Event to be emitted + * @param listener Event Listener + */ + public off(event: "batchAdded", listener: (e: {action: string, documents: T[]}) => void): void; + /** + * Detach Batch Sent Event + * + * @param event Event to be emitted + * @param listener Event Listener + */ + public off(event: "batchSent", listener: (e: IndexDocumentsAction[]) => void): void; + /** + * Detach Batch Succeeded Event + * + * @param event Event to be emitted + * @param listener Event Listener + */ public off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; + /** + * Detach Batch Failed Event + * + * @param event Event to be emitted + * @param listener Event Listener + */ public off(event: "batchFailed", listener: (e: RestError) => void): void; - public off(event: "batchSucceeded" | "batchFailed", listener: (e: any) => void): void { + public off(event: "batchAdded" | "batchSent" | "batchSucceeded" | "batchFailed", listener: (e: any) => void): void { this.emitter.removeListener(event, listener); } @@ -218,6 +326,7 @@ export class SearchIndexingBufferedSender { retryAttempt: number = 0 ): Promise { try { + this.emitter.emit("batchSent", actionsToSend); const result = await this.client.indexDocuments( new IndexDocumentsBatch(actionsToSend), options From 89f664288a436ebaf689c9c385914639a61ff65d Mon Sep 17 00:00:00 2001 From: Sarangan Rajamanickam Date: Tue, 29 Sep 2020 23:59:00 -0700 Subject: [PATCH 4/9] Formatting changes --- .../search-documents/src/searchClient.ts | 2 +- .../src/searchIndexingBufferedSender.ts | 46 +++++++++---------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/sdk/search/search-documents/src/searchClient.ts b/sdk/search/search-documents/src/searchClient.ts index bd8ab2bd44bc..83bb43d456ca 100644 --- a/sdk/search/search-documents/src/searchClient.ts +++ b/sdk/search/search-documents/src/searchClient.ts @@ -620,7 +620,7 @@ export class SearchClient { /** * Gets an instance of SearchIndexingBufferedSender. - * + * * @param options SearchIndexingBufferedSender Options */ diff --git a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts index 4a8e8032453e..a134aeecb4e9 100644 --- a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts +++ b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts @@ -45,7 +45,7 @@ export class SearchIndexingBufferedSender { */ private batchObject: IndexDocumentsBatch; /** - * Clean up for the timer + * Clean up for the timer */ private cleanupTimer?: () => void; /** @@ -55,10 +55,10 @@ export class SearchIndexingBufferedSender { /** * Creates a new instance of SearchIndexingBufferedSender. - * + * * @param client Search Client used to call the underlying IndexBatch operations. * @param options Options to modify batch size, auto flush and flush window. - * + * */ constructor(client: SearchClient, options: SearchIndexingBufferedSenderOptions = {}) { this.client = client; @@ -77,7 +77,7 @@ export class SearchIndexingBufferedSender { /** * Uploads the documents/Adds the documents to the upload queue. - * + * * @param documents Documents to be uploaded. * @param options Upload options. */ @@ -109,7 +109,7 @@ export class SearchIndexingBufferedSender { /** * Merges the documents/Adds the documents to the merge queue. - * + * * @param documents Documents to be merged. * @param options Upload options. */ @@ -141,7 +141,7 @@ export class SearchIndexingBufferedSender { /** * Merges/Uploads the documents/Adds the documents to the merge/upload queue. - * + * * @param documents Documents to be merged/uploaded. * @param options Upload options. */ @@ -173,7 +173,7 @@ export class SearchIndexingBufferedSender { /** * Deletes the documents/Adds the documents to the delete queue. - * + * * @param documents Documents to be deleted. * @param options Upload options. */ @@ -205,16 +205,13 @@ export class SearchIndexingBufferedSender { /** * Flushes the queue manually. - * + * * @param options Flush options. */ public async flush( options: SearchIndexingBufferedSenderFlushDocumentsOptions = {} ): Promise { - const { span, updatedOptions } = createSpan( - "SearchIndexingBufferedSender-flush", - options - ); + const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-flush", options); try { if (this.batchObject.actions.length > 0) { return this.internalFlush(true, updatedOptions); @@ -239,28 +236,28 @@ export class SearchIndexingBufferedSender { /** * Attach Batch Added Event - * + * * @param event Event to be emitted * @param listener Event Listener */ - public on(event: "batchAdded", listener: (e: {action: string, documents: T[]}) => void): void; + public on(event: "batchAdded", listener: (e: { action: string; documents: T[] }) => void): void; /** * Attach Batch Sent Event - * + * * @param event Event to be emitted * @param listener Event Listener */ public on(event: "batchSent", listener: (e: IndexDocumentsAction[]) => void): void; /** * Attach Batch Succeeded Event - * + * * @param event Event to be emitted * @param listener Event Listener */ public on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; /** * Attach Batch Failed Event - * + * * @param event Event to be emitted * @param listener Event Listener */ @@ -274,33 +271,36 @@ export class SearchIndexingBufferedSender { /** * Detach Batch Added Event - * + * * @param event Event to be emitted * @param listener Event Listener */ - public off(event: "batchAdded", listener: (e: {action: string, documents: T[]}) => void): void; + public off(event: "batchAdded", listener: (e: { action: string; documents: T[] }) => void): void; /** * Detach Batch Sent Event - * + * * @param event Event to be emitted * @param listener Event Listener */ public off(event: "batchSent", listener: (e: IndexDocumentsAction[]) => void): void; /** * Detach Batch Succeeded Event - * + * * @param event Event to be emitted * @param listener Event Listener */ public off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; /** * Detach Batch Failed Event - * + * * @param event Event to be emitted * @param listener Event Listener */ public off(event: "batchFailed", listener: (e: RestError) => void): void; - public off(event: "batchAdded" | "batchSent" | "batchSucceeded" | "batchFailed", listener: (e: any) => void): void { + public off( + event: "batchAdded" | "batchSent" | "batchSucceeded" | "batchFailed", + listener: (e: any) => void + ): void { this.emitter.removeListener(event, listener); } From 87d312413ad21868d5820e3831fb3335227b4ca1 Mon Sep 17 00:00:00 2001 From: Sarangan Rajamanickam Date: Wed, 30 Sep 2020 21:59:33 -0700 Subject: [PATCH 5/9] Additional Changes --- .../review/search-documents.api.md | 6 +- .../uploadDocuments/autoFlushSizeBased.ts | 101 ++++++++ .../uploadDocuments/autoFlushTimerBased.ts | 88 +++++++ .../uploadDocuments/manualFlush.ts | 81 +++++++ .../typescript/src/utils/interfaces.ts | 35 +++ .../samples/typescript/src/utils/setup.ts | 222 ++++++++++++++++++ .../search-documents/src/indexModels.ts | 8 - .../src/searchIndexingBufferedSender.ts | 12 +- 8 files changed, 536 insertions(+), 17 deletions(-) create mode 100644 sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts create mode 100644 sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts create mode 100644 sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts create mode 100644 sdk/search/search-documents/samples/typescript/src/utils/interfaces.ts create mode 100644 sdk/search/search-documents/samples/typescript/src/utils/setup.ts diff --git a/sdk/search/search-documents/review/search-documents.api.md b/sdk/search/search-documents/review/search-documents.api.md index eeb9fdf13f6a..09762cb1f5f7 100644 --- a/sdk/search/search-documents/review/search-documents.api.md +++ b/sdk/search/search-documents/review/search-documents.api.md @@ -1236,14 +1236,14 @@ export class SearchIndexingBufferedSender { action: string; documents: T[]; }) => void): void; - off(event: "batchSent", listener: (e: IndexDocumentsAction[]) => void): void; + off(event: "batchSent", listener: (e: IndexDocumentsAction) => void): void; off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; off(event: "batchFailed", listener: (e: RestError) => void): void; on(event: "batchAdded", listener: (e: { action: string; documents: T[]; }) => void): void; - on(event: "batchSent", listener: (e: IndexDocumentsAction[]) => void): void; + on(event: "batchSent", listener: (e: IndexDocumentsAction) => void): void; on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; on(event: "batchFailed", listener: (e: RestError) => void): void; uploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderUploadDocumentsOptions): Promise; @@ -1264,8 +1264,6 @@ export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = Operatio // @public export interface SearchIndexingBufferedSenderOptions { autoFlush?: boolean; - batchSize?: number; - flushWindowInMs?: number; } // @public diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts new file mode 100644 index 000000000000..efe9051bae05 --- /dev/null +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts @@ -0,0 +1,101 @@ +import { + SearchIndexingBufferedSender, + AzureKeyCredential, + SearchClient, + GeographyPoint, + SearchIndexClient +} from "@azure/search-documents"; +import {createIndex, WAIT_TIME} from "../../utils/setup"; +import {Hotel} from "../../utils/interfaces"; +import {delay} from "@azure/core-http"; + +/** + * This sample is to demonstrate the use of SearchIndexingBufferedSender. + * In this sample, the autoFlush is set to true. i.e. the user does not + * want to call the flush manually. The upload action happen automatically + * when the size of the batch is greater than threshold (which is set to 1000) + * by default. + */ +const endpoint = process.env.SEARCH_API_ENDPOINT || ""; +const apiKey = process.env.SEARCH_API_KEY || ""; +const TEST_INDEX_NAME = "hotel-live-sample-test3"; + +function getDocumentsArray(size: number): Hotel[] { + const array:Hotel[] = []; + for(let i = 1;i <= size;i++) { + array.push({ + hotelId: `${i}`, + description: + "Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " + + "and a really helpful concierge. The location is perfect -- right downtown, close to all " + + "the tourist attractions. We highly recommend this hotel.", + descriptionFr: + "Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " + + "à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " + + "centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " + + "cet hôtel.", + hotelName: "Fancy Stay", + category: "Luxury", + tags: ["pool", "view", "wifi", "concierge"], + parkingIncluded: false, + lastRenovationDate: new Date(2010, 5, 27), + rating: 5, + location: new GeographyPoint(47.678581, -122.131577) + }); + } + return array; +} + +export async function main() { + console.log(`Running SearchIndexingBufferedSender-uploadDocuments-With Auto Flush Sizes Sample`); + + const credential = new AzureKeyCredential(apiKey); + const searchClient: SearchClient = new SearchClient(endpoint, TEST_INDEX_NAME, credential); + const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential); + + await createIndex(indexClient, TEST_INDEX_NAME); + await delay(WAIT_TIME); + + const bufferedClient:SearchIndexingBufferedSender = searchClient.getSearchIndexingBufferedSenderInstance({ + autoFlush: true + }); + + bufferedClient.on("batchAdded", (response:any) => { + console.log("Batch Added Event has been receieved...."); + }); + + bufferedClient.on("batchSent", (response: any) => { + console.log("Batch Sent Event has been receieved...."); + }); + + bufferedClient.on("batchSucceeded", (response:any) => { + console.log("Batch Succeeded Event has been receieved...."); + console.log(response); + }); + + bufferedClient.on("batchFailed", (response:any) => { + console.log("Batch Failed Event has been receieved...."); + console.log(response); + }); + + const documents:Hotel[] = getDocumentsArray(1001); + bufferedClient.uploadDocuments(documents); + + await (WAIT_TIME); + + let count = await searchClient.getDocumentsCount(); + while (count !== documents.length) { + await delay(WAIT_TIME); + count = await searchClient.getDocumentsCount(); + } + + // When the autoFlush is set to true, the user + // has to call the dispose method to clear the + // timer. + bufferedClient.dispose(); + await indexClient.deleteIndex(TEST_INDEX_NAME); + await delay(WAIT_TIME); +} + + +main(); diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts new file mode 100644 index 000000000000..f5c3a5ff47c4 --- /dev/null +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts @@ -0,0 +1,88 @@ +import { + SearchIndexingBufferedSender, + AzureKeyCredential, + SearchClient, + GeographyPoint, + SearchIndexClient +} from "@azure/search-documents"; +import {createIndex, WAIT_TIME} from "../../utils/setup"; +import {Hotel} from "../../utils/interfaces"; +import {delay} from "@azure/core-http"; + +/** + * This sample is to demonstrate the use of SearchIndexingBufferedSender. + * In this sample, the autoFlush is set to true. i.e. the user does not + * want to call the flush manually. The upload action happen automatically + * when the time interval is met. The time interval is set to 60000ms + * by default. + */ +const endpoint = process.env.SEARCH_API_ENDPOINT || ""; +const apiKey = process.env.SEARCH_API_KEY || ""; +const TEST_INDEX_NAME = "hotel-live-sample-test2"; + +export async function main() { + console.log(`Running SearchIndexingBufferedSender-uploadDocuments-With Auto Flush Timer Sample`); + + const credential = new AzureKeyCredential(apiKey); + const searchClient: SearchClient = new SearchClient(endpoint, TEST_INDEX_NAME, credential); + const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential); + + await createIndex(indexClient, TEST_INDEX_NAME); + await delay(WAIT_TIME); + + const bufferedClient:SearchIndexingBufferedSender = searchClient.getSearchIndexingBufferedSenderInstance({ + autoFlush: true + }); + + bufferedClient.on("batchAdded", (response:any) => { + console.log("Batch Added Event has been receieved...."); + }); + + bufferedClient.on("batchSent", (response: any) => { + console.log("Batch Sent Event has been receieved...."); + }); + + bufferedClient.on("batchSucceeded", (response:any) => { + console.log("Batch Succeeded Event has been receieved...."); + console.log(response); + }); + + bufferedClient.on("batchFailed", (response:any) => { + console.log("Batch Failed Event has been receieved...."); + console.log(response); + }); + + bufferedClient.uploadDocuments([ + { + hotelId: "1", + description: + "Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " + + "and a really helpful concierge. The location is perfect -- right downtown, close to all " + + "the tourist attractions. We highly recommend this hotel.", + descriptionFr: + "Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " + + "à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " + + "centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " + + "cet hôtel.", + hotelName: "Fancy Stay", + category: "Luxury", + tags: ["pool", "view", "wifi", "concierge"], + parkingIncluded: false, + lastRenovationDate: new Date(2010, 5, 27), + rating: 5, + location: new GeographyPoint(47.678581, -122.131577) + }]); + + console.log("Waiting for 65000 ms to meet the flush window interval...."); + await delay(65000); + + // When the autoFlush is set to true, the user + // has to call the dispose method to clear the + // timer. + bufferedClient.dispose(); + await indexClient.deleteIndex(TEST_INDEX_NAME); + await delay(WAIT_TIME); +} + + +main(); diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts new file mode 100644 index 000000000000..7afa9cdabb93 --- /dev/null +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts @@ -0,0 +1,81 @@ +import { + SearchIndexingBufferedSender, + AzureKeyCredential, + SearchClient, + GeographyPoint, + SearchIndexClient +} from "@azure/search-documents"; +import {createIndex, WAIT_TIME} from "../../utils/setup"; +import {Hotel} from "../../utils/interfaces"; +import {delay} from "@azure/core-http"; + +/** + * This sample is to demonstrate the use of SearchIndexingBufferedSender. + * In this sample, the autoFlush is set to false. i.e. the user + * wants to call the flush manually. + */ +const endpoint = process.env.SEARCH_API_ENDPOINT || ""; +const apiKey = process.env.SEARCH_API_KEY || ""; +const TEST_INDEX_NAME = "hotel-live-sample-test1"; + +export async function main() { + console.log(`Running SearchIndexingBufferedSender-uploadDocuments-Without AutoFlush Sample`); + + const credential = new AzureKeyCredential(apiKey); + const searchClient: SearchClient = new SearchClient(endpoint, TEST_INDEX_NAME, credential); + const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential); + + await createIndex(indexClient, TEST_INDEX_NAME); + await delay(WAIT_TIME); + + const bufferedClient:SearchIndexingBufferedSender = searchClient.getSearchIndexingBufferedSenderInstance({ + autoFlush: false + }); + + bufferedClient.on("batchAdded", (response:any) => { + console.log("Batch Added Event has been receieved...."); + }); + + bufferedClient.on("batchSent", (response: any) => { + console.log("Batch Sent Event has been receieved...."); + }); + + bufferedClient.on("batchSucceeded", (response:any) => { + console.log("Batch Succeeded Event has been receieved...."); + console.log(response); + }); + + bufferedClient.on("batchFailed", (response:any) => { + console.log("Batch Failed Event has been receieved...."); + console.log(response); + }) + + bufferedClient.uploadDocuments([ + { + hotelId: "1", + description: + "Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " + + "and a really helpful concierge. The location is perfect -- right downtown, close to all " + + "the tourist attractions. We highly recommend this hotel.", + descriptionFr: + "Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " + + "à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " + + "centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " + + "cet hôtel.", + hotelName: "Fancy Stay", + category: "Luxury", + tags: ["pool", "view", "wifi", "concierge"], + parkingIncluded: false, + lastRenovationDate: new Date(2010, 5, 27), + rating: 5, + location: new GeographyPoint(47.678581, -122.131577) + }]); + + await bufferedClient.flush(); + bufferedClient.dispose(); + await indexClient.deleteIndex(TEST_INDEX_NAME); + await delay(WAIT_TIME); +} + + +main(); diff --git a/sdk/search/search-documents/samples/typescript/src/utils/interfaces.ts b/sdk/search/search-documents/samples/typescript/src/utils/interfaces.ts new file mode 100644 index 000000000000..35a81e351321 --- /dev/null +++ b/sdk/search/search-documents/samples/typescript/src/utils/interfaces.ts @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { GeographyPoint } from "@azure/search-documents"; + +export interface Hotel { + hotelId: string; + hotelName?: string | null; + description?: string | null; + descriptionFr?: string | null; + category?: string | null; + tags?: string[] | null; + parkingIncluded?: boolean | null; + smokingAllowed?: boolean | null; + lastRenovationDate?: Date | null; + rating?: number | null; + location?: GeographyPoint | null; + address?: { + streetAddress?: string | null; + city?: string | null; + stateProvince?: string | null; + postalCode?: string | null; + country?: string | null; + } | null; + rooms?: Array<{ + description?: string | null; + descriptionFr?: string | null; + type?: string | null; + baseRate?: number | null; + bedOptions?: string | null; + sleepsCount?: number | null; + smokingAllowed?: boolean | null; + tags?: string[] | null; + }> | null; +} diff --git a/sdk/search/search-documents/samples/typescript/src/utils/setup.ts b/sdk/search/search-documents/samples/typescript/src/utils/setup.ts new file mode 100644 index 000000000000..f9e9542ba64a --- /dev/null +++ b/sdk/search/search-documents/samples/typescript/src/utils/setup.ts @@ -0,0 +1,222 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { + SearchIndexClient, + SearchIndex, + KnownAnalyzerNames +} from "@azure/search-documents" + +export const WAIT_TIME = 4000; + +// eslint-disable-next-line @azure/azure-sdk/ts-use-interface-parameters +export async function createIndex(client: SearchIndexClient, name: string): Promise { + const hotelIndex: SearchIndex = { + name, + fields: [ + { + type: "Edm.String", + name: "hotelId", + key: true, + filterable: true, + sortable: true + }, + { + type: "Edm.String", + name: "hotelName", + searchable: true, + filterable: true, + sortable: true + }, + { + type: "Edm.String", + name: "description", + searchable: true, + analyzerName: KnownAnalyzerNames.EnLucene + }, + { + type: "Edm.String", + name: "descriptionFr", + searchable: true, + analyzerName: KnownAnalyzerNames.FrLucene + }, + { + type: "Edm.String", + name: "category", + searchable: true, + filterable: true, + sortable: true, + facetable: true + }, + { + type: "Collection(Edm.String)", + name: "tags", + searchable: true, + filterable: true, + facetable: true + }, + { + type: "Edm.Boolean", + name: "parkingIncluded", + filterable: true, + sortable: true, + facetable: true + }, + { + type: "Edm.Boolean", + name: "smokingAllowed", + filterable: true, + sortable: true, + facetable: true + }, + { + type: "Edm.DateTimeOffset", + name: "lastRenovationDate", + filterable: true, + sortable: true, + facetable: true + }, + { + type: "Edm.Double", + name: "rating", + filterable: true, + sortable: true, + facetable: true + }, + { + type: "Edm.GeographyPoint", + name: "location", + filterable: true, + sortable: true + }, + { + type: "Edm.ComplexType", + name: "address", + fields: [ + { + type: "Edm.String", + name: "streetAddress", + searchable: true + }, + { + type: "Edm.String", + name: "city", + searchable: true, + filterable: true, + sortable: true, + facetable: true + }, + { + type: "Edm.String", + name: "stateProvince", + searchable: true, + filterable: true, + sortable: true, + facetable: true + }, + { + type: "Edm.String", + name: "country", + searchable: true, + filterable: true, + sortable: true, + facetable: true + }, + { + type: "Edm.String", + name: "postalCode", + searchable: true, + filterable: true, + sortable: true, + facetable: true + } + ] + }, + { + type: "Collection(Edm.ComplexType)", + name: "rooms", + fields: [ + { + type: "Edm.String", + name: "description", + searchable: true, + analyzerName: KnownAnalyzerNames.EnLucene + }, + { + type: "Edm.String", + name: "descriptionFr", + searchable: true, + analyzerName: KnownAnalyzerNames.FrLucene + }, + { + type: "Edm.String", + name: "type", + searchable: true, + filterable: true, + facetable: true + }, + { + type: "Edm.Double", + name: "baseRate", + filterable: true, + facetable: true + }, + { + type: "Edm.String", + name: "bedOptions", + searchable: true, + filterable: true, + facetable: true + }, + { + type: "Edm.Int32", + name: "sleepsCount", + filterable: true, + facetable: true + }, + { + type: "Edm.Boolean", + name: "smokingAllowed", + filterable: true, + facetable: true + }, + { + type: "Collection(Edm.String)", + name: "tags", + searchable: true, + filterable: true, + facetable: true + } + ] + } + ], + suggesters: [ + { + name: "sg", + sourceFields: ["description", "hotelName"] + } + ], + scoringProfiles: [ + { + name: "nearest", + functionAggregation: "sum", + functions: [ + { + type: "distance", + fieldName: "location", + boost: 2, + parameters: { + referencePointParameter: "myloc", + boostingDistance: 100 + } + } + ] + } + ], + corsOptions: { + // for browser tests + allowedOrigins: ["*"] + } + }; + await client.createIndex(hotelIndex); +} diff --git a/sdk/search/search-documents/src/indexModels.ts b/sdk/search/search-documents/src/indexModels.ts index 6a0f47658bd5..4d32ffd10cce 100644 --- a/sdk/search/search-documents/src/indexModels.ts +++ b/sdk/search/search-documents/src/indexModels.ts @@ -37,14 +37,6 @@ export interface SearchIndexingBufferedSenderOptions { * Indicates if autoFlush is enabled. */ autoFlush?: boolean; - /** - * Interval between flushes (in milliseconds). - */ - flushWindowInMs?: number; - /** - * Size of the batch. - */ - batchSize?: number; } /** diff --git a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts index a134aeecb4e9..a7da6b5e104e 100644 --- a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts +++ b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts @@ -63,8 +63,8 @@ export class SearchIndexingBufferedSender { constructor(client: SearchClient, options: SearchIndexingBufferedSenderOptions = {}) { this.client = client; this.autoFlush = options.autoFlush ?? false; - this.flushWindowInMs = options.flushWindowInMs ?? DEFAULT_FLUSH_WINDOW; - this.batchSize = options.batchSize ?? DEFAULT_BATCH_SIZE; + this.flushWindowInMs = DEFAULT_FLUSH_WINDOW; + this.batchSize = DEFAULT_BATCH_SIZE; this.batchObject = new IndexDocumentsBatch(); if (this.autoFlush) { const interval = setInterval(() => this.flush(), this.flushWindowInMs); @@ -247,7 +247,7 @@ export class SearchIndexingBufferedSender { * @param event Event to be emitted * @param listener Event Listener */ - public on(event: "batchSent", listener: (e: IndexDocumentsAction[]) => void): void; + public on(event: "batchSent", listener: (e: IndexDocumentsAction) => void): void; /** * Attach Batch Succeeded Event * @@ -282,7 +282,7 @@ export class SearchIndexingBufferedSender { * @param event Event to be emitted * @param listener Event Listener */ - public off(event: "batchSent", listener: (e: IndexDocumentsAction[]) => void): void; + public off(event: "batchSent", listener: (e: IndexDocumentsAction) => void): void; /** * Detach Batch Succeeded Event * @@ -326,7 +326,9 @@ export class SearchIndexingBufferedSender { retryAttempt: number = 0 ): Promise { try { - this.emitter.emit("batchSent", actionsToSend); + for(let i = 0; i < actionsToSend.length; i++) { + this.emitter.emit("batchSent", actionsToSend[i]); + } const result = await this.client.indexDocuments( new IndexDocumentsBatch(actionsToSend), options From d8316296ed32a60192c44b70000677240c36c41a Mon Sep 17 00:00:00 2001 From: Sarangan Rajamanickam Date: Wed, 30 Sep 2020 22:00:03 -0700 Subject: [PATCH 6/9] Formatting changes --- .../uploadDocuments/autoFlushSizeBased.ts | 45 ++++++++++--------- .../uploadDocuments/autoFlushTimerBased.ts | 40 ++++++++++------- .../uploadDocuments/manualFlush.ts | 40 ++++++++++------- .../samples/typescript/src/utils/setup.ts | 6 +-- .../src/searchIndexingBufferedSender.ts | 4 +- 5 files changed, 74 insertions(+), 61 deletions(-) diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts index efe9051bae05..bf883f4cc422 100644 --- a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts @@ -5,9 +5,9 @@ import { GeographyPoint, SearchIndexClient } from "@azure/search-documents"; -import {createIndex, WAIT_TIME} from "../../utils/setup"; -import {Hotel} from "../../utils/interfaces"; -import {delay} from "@azure/core-http"; +import { createIndex, WAIT_TIME } from "../../utils/setup"; +import { Hotel } from "../../utils/interfaces"; +import { delay } from "@azure/core-http"; /** * This sample is to demonstrate the use of SearchIndexingBufferedSender. @@ -21,8 +21,8 @@ const apiKey = process.env.SEARCH_API_KEY || ""; const TEST_INDEX_NAME = "hotel-live-sample-test3"; function getDocumentsArray(size: number): Hotel[] { - const array:Hotel[] = []; - for(let i = 1;i <= size;i++) { + const array: Hotel[] = []; + for (let i = 1; i <= size; i++) { array.push({ hotelId: `${i}`, description: @@ -48,19 +48,25 @@ function getDocumentsArray(size: number): Hotel[] { export async function main() { console.log(`Running SearchIndexingBufferedSender-uploadDocuments-With Auto Flush Sizes Sample`); - + const credential = new AzureKeyCredential(apiKey); - const searchClient: SearchClient = new SearchClient(endpoint, TEST_INDEX_NAME, credential); + const searchClient: SearchClient = new SearchClient( + endpoint, + TEST_INDEX_NAME, + credential + ); const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential); await createIndex(indexClient, TEST_INDEX_NAME); await delay(WAIT_TIME); - - const bufferedClient:SearchIndexingBufferedSender = searchClient.getSearchIndexingBufferedSenderInstance({ - autoFlush: true - }); - bufferedClient.on("batchAdded", (response:any) => { + const bufferedClient: SearchIndexingBufferedSender = searchClient.getSearchIndexingBufferedSenderInstance( + { + autoFlush: true + } + ); + + bufferedClient.on("batchAdded", (response: any) => { console.log("Batch Added Event has been receieved...."); }); @@ -68,34 +74,33 @@ export async function main() { console.log("Batch Sent Event has been receieved...."); }); - bufferedClient.on("batchSucceeded", (response:any) => { + bufferedClient.on("batchSucceeded", (response: any) => { console.log("Batch Succeeded Event has been receieved...."); console.log(response); }); - bufferedClient.on("batchFailed", (response:any) => { + bufferedClient.on("batchFailed", (response: any) => { console.log("Batch Failed Event has been receieved...."); console.log(response); }); - const documents:Hotel[] = getDocumentsArray(1001); + const documents: Hotel[] = getDocumentsArray(1001); bufferedClient.uploadDocuments(documents); - await (WAIT_TIME); + await WAIT_TIME; let count = await searchClient.getDocumentsCount(); while (count !== documents.length) { await delay(WAIT_TIME); count = await searchClient.getDocumentsCount(); } - + // When the autoFlush is set to true, the user - // has to call the dispose method to clear the - // timer. + // has to call the dispose method to clear the + // timer. bufferedClient.dispose(); await indexClient.deleteIndex(TEST_INDEX_NAME); await delay(WAIT_TIME); } - main(); diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts index f5c3a5ff47c4..5f4e84095dc7 100644 --- a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts @@ -5,9 +5,9 @@ import { GeographyPoint, SearchIndexClient } from "@azure/search-documents"; -import {createIndex, WAIT_TIME} from "../../utils/setup"; -import {Hotel} from "../../utils/interfaces"; -import {delay} from "@azure/core-http"; +import { createIndex, WAIT_TIME } from "../../utils/setup"; +import { Hotel } from "../../utils/interfaces"; +import { delay } from "@azure/core-http"; /** * This sample is to demonstrate the use of SearchIndexingBufferedSender. @@ -22,19 +22,25 @@ const TEST_INDEX_NAME = "hotel-live-sample-test2"; export async function main() { console.log(`Running SearchIndexingBufferedSender-uploadDocuments-With Auto Flush Timer Sample`); - + const credential = new AzureKeyCredential(apiKey); - const searchClient: SearchClient = new SearchClient(endpoint, TEST_INDEX_NAME, credential); + const searchClient: SearchClient = new SearchClient( + endpoint, + TEST_INDEX_NAME, + credential + ); const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential); await createIndex(indexClient, TEST_INDEX_NAME); await delay(WAIT_TIME); - - const bufferedClient:SearchIndexingBufferedSender = searchClient.getSearchIndexingBufferedSenderInstance({ - autoFlush: true - }); - bufferedClient.on("batchAdded", (response:any) => { + const bufferedClient: SearchIndexingBufferedSender = searchClient.getSearchIndexingBufferedSenderInstance( + { + autoFlush: true + } + ); + + bufferedClient.on("batchAdded", (response: any) => { console.log("Batch Added Event has been receieved...."); }); @@ -42,17 +48,17 @@ export async function main() { console.log("Batch Sent Event has been receieved...."); }); - bufferedClient.on("batchSucceeded", (response:any) => { + bufferedClient.on("batchSucceeded", (response: any) => { console.log("Batch Succeeded Event has been receieved...."); console.log(response); }); - bufferedClient.on("batchFailed", (response:any) => { + bufferedClient.on("batchFailed", (response: any) => { console.log("Batch Failed Event has been receieved...."); console.log(response); }); - bufferedClient.uploadDocuments([ + bufferedClient.uploadDocuments([ { hotelId: "1", description: @@ -71,18 +77,18 @@ export async function main() { lastRenovationDate: new Date(2010, 5, 27), rating: 5, location: new GeographyPoint(47.678581, -122.131577) - }]); + } + ]); console.log("Waiting for 65000 ms to meet the flush window interval...."); await delay(65000); - + // When the autoFlush is set to true, the user - // has to call the dispose method to clear the + // has to call the dispose method to clear the // timer. bufferedClient.dispose(); await indexClient.deleteIndex(TEST_INDEX_NAME); await delay(WAIT_TIME); } - main(); diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts index 7afa9cdabb93..0a03f806f8ff 100644 --- a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts @@ -5,13 +5,13 @@ import { GeographyPoint, SearchIndexClient } from "@azure/search-documents"; -import {createIndex, WAIT_TIME} from "../../utils/setup"; -import {Hotel} from "../../utils/interfaces"; -import {delay} from "@azure/core-http"; +import { createIndex, WAIT_TIME } from "../../utils/setup"; +import { Hotel } from "../../utils/interfaces"; +import { delay } from "@azure/core-http"; /** * This sample is to demonstrate the use of SearchIndexingBufferedSender. - * In this sample, the autoFlush is set to false. i.e. the user + * In this sample, the autoFlush is set to false. i.e. the user * wants to call the flush manually. */ const endpoint = process.env.SEARCH_API_ENDPOINT || ""; @@ -20,19 +20,25 @@ const TEST_INDEX_NAME = "hotel-live-sample-test1"; export async function main() { console.log(`Running SearchIndexingBufferedSender-uploadDocuments-Without AutoFlush Sample`); - + const credential = new AzureKeyCredential(apiKey); - const searchClient: SearchClient = new SearchClient(endpoint, TEST_INDEX_NAME, credential); + const searchClient: SearchClient = new SearchClient( + endpoint, + TEST_INDEX_NAME, + credential + ); const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential); await createIndex(indexClient, TEST_INDEX_NAME); await delay(WAIT_TIME); - - const bufferedClient:SearchIndexingBufferedSender = searchClient.getSearchIndexingBufferedSenderInstance({ - autoFlush: false - }); - bufferedClient.on("batchAdded", (response:any) => { + const bufferedClient: SearchIndexingBufferedSender = searchClient.getSearchIndexingBufferedSenderInstance( + { + autoFlush: false + } + ); + + bufferedClient.on("batchAdded", (response: any) => { console.log("Batch Added Event has been receieved...."); }); @@ -40,17 +46,17 @@ export async function main() { console.log("Batch Sent Event has been receieved...."); }); - bufferedClient.on("batchSucceeded", (response:any) => { + bufferedClient.on("batchSucceeded", (response: any) => { console.log("Batch Succeeded Event has been receieved...."); console.log(response); }); - bufferedClient.on("batchFailed", (response:any) => { + bufferedClient.on("batchFailed", (response: any) => { console.log("Batch Failed Event has been receieved...."); console.log(response); - }) + }); - bufferedClient.uploadDocuments([ + bufferedClient.uploadDocuments([ { hotelId: "1", description: @@ -69,7 +75,8 @@ export async function main() { lastRenovationDate: new Date(2010, 5, 27), rating: 5, location: new GeographyPoint(47.678581, -122.131577) - }]); + } + ]); await bufferedClient.flush(); bufferedClient.dispose(); @@ -77,5 +84,4 @@ export async function main() { await delay(WAIT_TIME); } - main(); diff --git a/sdk/search/search-documents/samples/typescript/src/utils/setup.ts b/sdk/search/search-documents/samples/typescript/src/utils/setup.ts index f9e9542ba64a..07898f8702dd 100644 --- a/sdk/search/search-documents/samples/typescript/src/utils/setup.ts +++ b/sdk/search/search-documents/samples/typescript/src/utils/setup.ts @@ -1,11 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { - SearchIndexClient, - SearchIndex, - KnownAnalyzerNames -} from "@azure/search-documents" +import { SearchIndexClient, SearchIndex, KnownAnalyzerNames } from "@azure/search-documents"; export const WAIT_TIME = 4000; diff --git a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts index a7da6b5e104e..7e2cf4894918 100644 --- a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts +++ b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts @@ -326,9 +326,9 @@ export class SearchIndexingBufferedSender { retryAttempt: number = 0 ): Promise { try { - for(let i = 0; i < actionsToSend.length; i++) { + for (let i = 0; i < actionsToSend.length; i++) { this.emitter.emit("batchSent", actionsToSend[i]); - } + } const result = await this.client.indexDocuments( new IndexDocumentsBatch(actionsToSend), options From 187630cbe414af45ccc91a8b8f175365730e5270 Mon Sep 17 00:00:00 2001 From: Sarangan Rajamanickam Date: Thu, 1 Oct 2020 14:52:24 -0700 Subject: [PATCH 7/9] Response to PR Comments --- .../review/search-documents.api.md | 15 ++++++-- .../uploadDocuments/autoFlushSizeBased.ts | 2 +- .../uploadDocuments/autoFlushTimerBased.ts | 10 +++--- .../uploadDocuments/manualFlush.ts | 2 +- sdk/search/search-documents/src/index.ts | 2 +- .../src/searchIndexingBufferedSender.ts | 36 ++++++++++++------- 6 files changed, 45 insertions(+), 22 deletions(-) diff --git a/sdk/search/search-documents/review/search-documents.api.md b/sdk/search/search-documents/review/search-documents.api.md index 09762cb1f5f7..68cede724ba6 100644 --- a/sdk/search/search-documents/review/search-documents.api.md +++ b/sdk/search/search-documents/review/search-documents.api.md @@ -230,6 +230,15 @@ export type DataChangeDetectionPolicy = HighWaterMarkChangeDetectionPolicy | Sql // @public export type DataDeletionDetectionPolicy = SoftDeleteColumnDeletionDetectionPolicy; +// @public +export const DEFAULT_BATCH_SIZE: number; + +// @public +export const DEFAULT_FLUSH_WINDOW: number; + +// @public +export const DEFAULT_RETRY_COUNT: number; + // @public export interface DefaultCognitiveServicesAccount { description?: string; @@ -1228,7 +1237,7 @@ export interface SearchIndexerWarning { export class SearchIndexingBufferedSender { constructor(client: SearchClient, options?: SearchIndexingBufferedSenderOptions); deleteDocuments(documents: T[], options?: SearchIndexingBufferedSenderDeleteDocumentsOptions): Promise; - dispose(): void; + dispose(): Promise; flush(options?: SearchIndexingBufferedSenderFlushDocumentsOptions): Promise; mergeDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeDocumentsOptions): Promise; mergeOrUploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions): Promise; @@ -1236,14 +1245,14 @@ export class SearchIndexingBufferedSender { action: string; documents: T[]; }) => void): void; - off(event: "batchSent", listener: (e: IndexDocumentsAction) => void): void; + off(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction) => void): void; off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; off(event: "batchFailed", listener: (e: RestError) => void): void; on(event: "batchAdded", listener: (e: { action: string; documents: T[]; }) => void): void; - on(event: "batchSent", listener: (e: IndexDocumentsAction) => void): void; + on(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction) => void): void; on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void; on(event: "batchFailed", listener: (e: RestError) => void): void; uploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderUploadDocumentsOptions): Promise; diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts index bf883f4cc422..da6fd72ddda5 100644 --- a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts @@ -70,7 +70,7 @@ export async function main() { console.log("Batch Added Event has been receieved...."); }); - bufferedClient.on("batchSent", (response: any) => { + bufferedClient.on("beforeDocumentSent", (response: any) => { console.log("Batch Sent Event has been receieved...."); }); diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts index 5f4e84095dc7..6f686626a226 100644 --- a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts @@ -3,7 +3,8 @@ import { AzureKeyCredential, SearchClient, GeographyPoint, - SearchIndexClient + SearchIndexClient, + DEFAULT_FLUSH_WINDOW } from "@azure/search-documents"; import { createIndex, WAIT_TIME } from "../../utils/setup"; import { Hotel } from "../../utils/interfaces"; @@ -44,7 +45,7 @@ export async function main() { console.log("Batch Added Event has been receieved...."); }); - bufferedClient.on("batchSent", (response: any) => { + bufferedClient.on("beforeDocumentSent", (response: any) => { console.log("Batch Sent Event has been receieved...."); }); @@ -80,8 +81,9 @@ export async function main() { } ]); - console.log("Waiting for 65000 ms to meet the flush window interval...."); - await delay(65000); + const wait_time = DEFAULT_FLUSH_WINDOW + 5000; + console.log(`Waiting for ${wait_time} ms to meet the flush window interval....`); + await delay(wait_time); // When the autoFlush is set to true, the user // has to call the dispose method to clear the diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts index 0a03f806f8ff..a4645a7bec46 100644 --- a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts @@ -42,7 +42,7 @@ export async function main() { console.log("Batch Added Event has been receieved...."); }); - bufferedClient.on("batchSent", (response: any) => { + bufferedClient.on("beforeDocumentSent", (response: any) => { console.log("Batch Sent Event has been receieved...."); }); diff --git a/sdk/search/search-documents/src/index.ts b/sdk/search/search-documents/src/index.ts index 10474cb3876e..5c21060a8549 100644 --- a/sdk/search/search-documents/src/index.ts +++ b/sdk/search/search-documents/src/index.ts @@ -2,7 +2,7 @@ // Licensed under the MIT license. export { SearchClient, SearchClientOptions } from "./searchClient"; -export { SearchIndexingBufferedSender } from "./searchIndexingBufferedSender"; +export { SearchIndexingBufferedSender, DEFAULT_BATCH_SIZE, DEFAULT_FLUSH_WINDOW, DEFAULT_RETRY_COUNT } from "./searchIndexingBufferedSender"; export { AutocompleteRequest, AutocompleteOptions, diff --git a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts index 7e2cf4894918..9925355db88e 100644 --- a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts +++ b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts @@ -15,9 +15,18 @@ import EventEmitter from "events"; import { createSpan } from "./tracing"; import { CanonicalCode } from "@opentelemetry/api"; -const DEFAULT_BATCH_SIZE: number = 1000; -const DEFAULT_FLUSH_WINDOW: number = 60000; -const RETRY_COUNT: number = 3; +/** + * Default Batch Size + */ +export const DEFAULT_BATCH_SIZE: number = 1000; +/** + * Default window flush interval + */ +export const DEFAULT_FLUSH_WINDOW: number = 60000; +/** + * Default number of times to retry + */ +export const DEFAULT_RETRY_COUNT: number = 3; /** * Class used to perform buffered operations against a search index, @@ -57,7 +66,7 @@ export class SearchIndexingBufferedSender { * Creates a new instance of SearchIndexingBufferedSender. * * @param client Search Client used to call the underlying IndexBatch operations. - * @param options Options to modify batch size, auto flush and flush window. + * @param options Options to modify auto flush. * */ constructor(client: SearchClient, options: SearchIndexingBufferedSenderOptions = {}) { @@ -230,7 +239,10 @@ export class SearchIndexingBufferedSender { /** * If using autoFlush: true, call this to cleanup the autoflush timer. */ - public dispose(): void { + public async dispose(): Promise { + if (this.batchObject.actions.length > 0) { + await this.internalFlush(true); + } this.cleanupTimer && this.cleanupTimer(); } @@ -247,7 +259,7 @@ export class SearchIndexingBufferedSender { * @param event Event to be emitted * @param listener Event Listener */ - public on(event: "batchSent", listener: (e: IndexDocumentsAction) => void): void; + public on(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction) => void): void; /** * Attach Batch Succeeded Event * @@ -263,7 +275,7 @@ export class SearchIndexingBufferedSender { */ public on(event: "batchFailed", listener: (e: RestError) => void): void; public on( - event: "batchAdded" | "batchSent" | "batchSucceeded" | "batchFailed", + event: "batchAdded" | "beforeDocumentSent" | "batchSucceeded" | "batchFailed", listener: (e: any) => void ): void { this.emitter.on(event, listener); @@ -282,7 +294,7 @@ export class SearchIndexingBufferedSender { * @param event Event to be emitted * @param listener Event Listener */ - public off(event: "batchSent", listener: (e: IndexDocumentsAction) => void): void; + public off(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction) => void): void; /** * Detach Batch Succeeded Event * @@ -298,7 +310,7 @@ export class SearchIndexingBufferedSender { */ public off(event: "batchFailed", listener: (e: RestError) => void): void; public off( - event: "batchAdded" | "batchSent" | "batchSucceeded" | "batchFailed", + event: "batchAdded" | "beforeDocumentSent" | "batchSucceeded" | "batchFailed", listener: (e: any) => void ): void { this.emitter.removeListener(event, listener); @@ -326,8 +338,8 @@ export class SearchIndexingBufferedSender { retryAttempt: number = 0 ): Promise { try { - for (let i = 0; i < actionsToSend.length; i++) { - this.emitter.emit("batchSent", actionsToSend[i]); + for(const action of actionsToSend) { + this.emitter.emit("beforeDocumentSent", action); } const result = await this.client.indexDocuments( new IndexDocumentsBatch(actionsToSend), @@ -336,7 +348,7 @@ export class SearchIndexingBufferedSender { // raise success event this.emitter.emit("batchSucceeded", result); } catch (e) { - if (this.isRetryAbleError(e) && retryAttempt < RETRY_COUNT) { + if (this.isRetryAbleError(e) && retryAttempt < DEFAULT_RETRY_COUNT) { this.submitDocuments(actionsToSend, options, retryAttempt + 1); } else { this.emitter.emit("batchFailed", e); From 1e41067d1e72c6064154c737635911da9f4b5aa7 Mon Sep 17 00:00:00 2001 From: Sarangan Rajamanickam Date: Thu, 1 Oct 2020 14:52:48 -0700 Subject: [PATCH 8/9] Formatting changes --- sdk/search/search-documents/src/index.ts | 7 ++++++- .../search-documents/src/searchIndexingBufferedSender.ts | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/search/search-documents/src/index.ts b/sdk/search/search-documents/src/index.ts index 5c21060a8549..f544e6842394 100644 --- a/sdk/search/search-documents/src/index.ts +++ b/sdk/search/search-documents/src/index.ts @@ -2,7 +2,12 @@ // Licensed under the MIT license. export { SearchClient, SearchClientOptions } from "./searchClient"; -export { SearchIndexingBufferedSender, DEFAULT_BATCH_SIZE, DEFAULT_FLUSH_WINDOW, DEFAULT_RETRY_COUNT } from "./searchIndexingBufferedSender"; +export { + SearchIndexingBufferedSender, + DEFAULT_BATCH_SIZE, + DEFAULT_FLUSH_WINDOW, + DEFAULT_RETRY_COUNT +} from "./searchIndexingBufferedSender"; export { AutocompleteRequest, AutocompleteOptions, diff --git a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts index 9925355db88e..e2e2a887a3ed 100644 --- a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts +++ b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts @@ -338,7 +338,7 @@ export class SearchIndexingBufferedSender { retryAttempt: number = 0 ): Promise { try { - for(const action of actionsToSend) { + for (const action of actionsToSend) { this.emitter.emit("beforeDocumentSent", action); } const result = await this.client.indexDocuments( From 767f5278045c15cdd203eb2af343412809f2bc30 Mon Sep 17 00:00:00 2001 From: Sarangan Rajamanickam Date: Thu, 1 Oct 2020 16:02:01 -0700 Subject: [PATCH 9/9] Update Message --- .../src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts | 2 +- .../src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts | 2 +- .../src/bufferedSender/uploadDocuments/manualFlush.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts index da6fd72ddda5..12070abb9461 100644 --- a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts @@ -71,7 +71,7 @@ export async function main() { }); bufferedClient.on("beforeDocumentSent", (response: any) => { - console.log("Batch Sent Event has been receieved...."); + console.log("Before Document Sent Event has been receieved...."); }); bufferedClient.on("batchSucceeded", (response: any) => { diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts index 6f686626a226..b415765e5551 100644 --- a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts @@ -46,7 +46,7 @@ export async function main() { }); bufferedClient.on("beforeDocumentSent", (response: any) => { - console.log("Batch Sent Event has been receieved...."); + console.log("Before Document Sent Event has been receieved...."); }); bufferedClient.on("batchSucceeded", (response: any) => { diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts index a4645a7bec46..6cf95aefaa9c 100644 --- a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts +++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts @@ -43,7 +43,7 @@ export async function main() { }); bufferedClient.on("beforeDocumentSent", (response: any) => { - console.log("Batch Sent Event has been receieved...."); + console.log("Before Document Sent Event has been receieved...."); }); bufferedClient.on("batchSucceeded", (response: any) => {