diff --git a/sdk/search/search-documents/package.json b/sdk/search/search-documents/package.json
index 77d2c1e86003..a911548c5db4 100644
--- a/sdk/search/search-documents/package.json
+++ b/sdk/search/search-documents/package.json
@@ -80,7 +80,8 @@
     "@azure/core-tracing": "1.0.0-preview.9",
     "@azure/logger": "^1.0.0",
     "@opentelemetry/api": "^0.10.2",
-    "tslib": "^2.0.0"
+    "tslib": "^2.0.0",
+    "events": "^3.0.0"
   },
   "devDependencies": {
     "@azure/dev-tool": "^1.0.0",
diff --git a/sdk/search/search-documents/review/search-documents.api.md b/sdk/search/search-documents/review/search-documents.api.md
index 5b07a287af10..68cede724ba6 100644
--- a/sdk/search/search-documents/review/search-documents.api.md
+++ b/sdk/search/search-documents/review/search-documents.api.md
@@ -9,6 +9,7 @@ import { KeyCredential } from '@azure/core-auth';
 import { OperationOptions } from '@azure/core-http';
 import { PagedAsyncIterableIterator } from '@azure/core-paging';
 import { PipelineOptions } from '@azure/core-http';
+import { RestError } from '@azure/core-http';
 
 // @public
 export interface AnalyzedTokenInfo {
@@ -229,6 +230,15 @@ export type DataChangeDetectionPolicy = HighWaterMarkChangeDetectionPolicy | Sql
 // @public
 export type DataDeletionDetectionPolicy = SoftDeleteColumnDeletionDetectionPolicy;
 
+// @public
+export const DEFAULT_BATCH_SIZE: number;
+
+// @public
+export const DEFAULT_FLUSH_WINDOW: number;
+
+// @public
+export const DEFAULT_RETRY_COUNT: number;
+
 // @public
 export interface DefaultCognitiveServicesAccount {
     description?: string;
@@ -1028,6 +1038,7 @@ export class SearchClient<T> {
     readonly endpoint: string;
     getDocument(key: string, options?: GetDocumentOptions): Promise<T>;
     getDocumentsCount(options?: CountDocumentsOptions): Promise<number>;
+    getSearchIndexingBufferedSenderInstance(options?: SearchIndexingBufferedSenderOptions): SearchIndexingBufferedSender<T>;
     indexDocuments(batch: IndexDocumentsBatch<T>, options?: IndexDocumentsOptions): Promise<IndexDocumentsResult>;
     readonly indexName: string;
     mergeDocuments(documents: T[], options?: MergeDocumentsOptions): Promise<IndexDocumentsResult>;
@@ -1222,6 +1233,51 @@ export interface SearchIndexerWarning {
     readonly name?: string;
 }
 
+// @public
+export class SearchIndexingBufferedSender<T> {
+    constructor(client: SearchClient<T>, options?: SearchIndexingBufferedSenderOptions);
+    deleteDocuments(documents: T[], options?: SearchIndexingBufferedSenderDeleteDocumentsOptions): Promise<void>;
+    dispose(): Promise<void>;
+    flush(options?: SearchIndexingBufferedSenderFlushDocumentsOptions): Promise<void>;
+    mergeDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeDocumentsOptions): Promise<void>;
+    mergeOrUploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions): Promise<void>;
+    off(event: "batchAdded", listener: (e: {
+        action: string;
+        documents: T[];
+    }) => void): void;
+    off(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
+    off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
+    off(event: "batchFailed", listener: (e: RestError) => void): void;
+    on(event: "batchAdded", listener: (e: {
+        action: string;
+        documents: T[];
+    }) => void): void;
+    on(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
+    on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
+    on(event: "batchFailed", listener: (e: RestError) => void): void;
+    uploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderUploadDocumentsOptions): Promise<void>;
+}
+
+// @public
+export type SearchIndexingBufferedSenderDeleteDocumentsOptions = OperationOptions;
+
+// @public
+export type SearchIndexingBufferedSenderFlushDocumentsOptions = OperationOptions;
+
+// @public
+export type SearchIndexingBufferedSenderMergeDocumentsOptions = OperationOptions;
+
+// @public
+export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = OperationOptions;
+
+// @public
+export interface SearchIndexingBufferedSenderOptions {
+    autoFlush?: boolean;
+}
+
+// @public
+export type SearchIndexingBufferedSenderUploadDocumentsOptions = OperationOptions;
+
 // @public
 export interface SearchIndexStatistics {
     readonly documentCount: number;
diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts
new file mode 100644
index 000000000000..12070abb9461
--- /dev/null
+++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts
@@ -0,0 +1,106 @@
+import {
+  SearchIndexingBufferedSender,
+  AzureKeyCredential,
+  SearchClient,
+  GeographyPoint,
+  SearchIndexClient
+} from "@azure/search-documents";
+import { createIndex, WAIT_TIME } from "../../utils/setup";
+import { Hotel } from "../../utils/interfaces";
+import { delay } from "@azure/core-http";
+
+/**
+ * This sample demonstrates how to use SearchIndexingBufferedSender.
+ * In this sample, autoFlush is set to true, i.e. the user does not
+ * want to call flush manually. The upload action happens automatically
+ * when the number of batched actions reaches the batch size threshold,
+ * which is 1000 by default.
+ */
+const endpoint = process.env.SEARCH_API_ENDPOINT || "";
+const apiKey = process.env.SEARCH_API_KEY || "";
+const TEST_INDEX_NAME = "hotel-live-sample-test3";
+
+function getDocumentsArray(size: number): Hotel[] {
+  const array: Hotel[] = [];
+  for (let i = 1; i <= size; i++) {
+    array.push({
+      hotelId: `${i}`,
+      description:
+        "Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " +
+        "and a really helpful concierge. The location is perfect -- right downtown, close to all " +
+        "the tourist attractions. We highly recommend this hotel.",
+      descriptionFr:
+        "Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " +
+        "à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " +
+        "centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " +
+        "cet hôtel.",
+      hotelName: "Fancy Stay",
+      category: "Luxury",
+      tags: ["pool", "view", "wifi", "concierge"],
+      parkingIncluded: false,
+      lastRenovationDate: new Date(2010, 5, 27),
+      rating: 5,
+      location: new GeographyPoint(47.678581, -122.131577)
+    });
+  }
+  return array;
+}
+
+export async function main() {
+  console.log(`Running SearchIndexingBufferedSender-uploadDocuments-With Auto Flush Sizes Sample`);
+
+  const credential = new AzureKeyCredential(apiKey);
+  const searchClient: SearchClient<Hotel> = new SearchClient<Hotel>(
+    endpoint,
+    TEST_INDEX_NAME,
+    credential
+  );
+  const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential);
+
+  await createIndex(indexClient, TEST_INDEX_NAME);
+  await delay(WAIT_TIME);
+
+  const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
+    {
+      autoFlush: true
+    }
+  );
+
+  bufferedClient.on("batchAdded", (response: any) => {
+    console.log("Batch Added Event has been received....");
+  });
+
+  bufferedClient.on("beforeDocumentSent", (response: any) => {
+    console.log("Before Document Sent Event has been received....");
+  });
+
+  bufferedClient.on("batchSucceeded", (response: any) => {
+    console.log("Batch Succeeded Event has been received....");
+    console.log(response);
+  });
+
+  bufferedClient.on("batchFailed", (response: any) => {
+    console.log("Batch Failed Event has been received....");
+    console.log(response);
+  });
+
+  const documents: Hotel[] = getDocumentsArray(1001);
+  await bufferedClient.uploadDocuments(documents);
+
+  await delay(WAIT_TIME);
+
+  let count = await searchClient.getDocumentsCount();
+  while (count !== documents.length) {
+    await delay(WAIT_TIME);
+    count = await searchClient.getDocumentsCount();
+  }
+
+  // When the autoFlush is set to true, the user
+  // has to call the dispose method to clear the
+  // timer.
+  await bufferedClient.dispose();
+  await indexClient.deleteIndex(TEST_INDEX_NAME);
+  await delay(WAIT_TIME);
+}
+
+main();
diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts
new file mode 100644
index 000000000000..b415765e5551
--- /dev/null
+++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushTimerBased.ts
@@ -0,0 +1,96 @@
+import {
+  SearchIndexingBufferedSender,
+  AzureKeyCredential,
+  SearchClient,
+  GeographyPoint,
+  SearchIndexClient,
+  DEFAULT_FLUSH_WINDOW
+} from "@azure/search-documents";
+import { createIndex, WAIT_TIME } from "../../utils/setup";
+import { Hotel } from "../../utils/interfaces";
+import { delay } from "@azure/core-http";
+
+/**
+ * This sample demonstrates how to use SearchIndexingBufferedSender.
+ * In this sample, autoFlush is set to true, i.e. the user does not want to
+ * call flush manually. The upload action happens automatically when the flush
+ * window interval elapses. The interval is set to 60000 ms by default.
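+ * That default is also exported as the DEFAULT_FLUSH_WINDOW constant imported above.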
+ */
+const endpoint = process.env.SEARCH_API_ENDPOINT || "";
+const apiKey = process.env.SEARCH_API_KEY || "";
+const TEST_INDEX_NAME = "hotel-live-sample-test2";
+
+export async function main() {
+  console.log(`Running SearchIndexingBufferedSender-uploadDocuments-With Auto Flush Timer Sample`);
+
+  const credential = new AzureKeyCredential(apiKey);
+  const searchClient: SearchClient<Hotel> = new SearchClient<Hotel>(
+    endpoint,
+    TEST_INDEX_NAME,
+    credential
+  );
+  const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential);
+
+  await createIndex(indexClient, TEST_INDEX_NAME);
+  await delay(WAIT_TIME);
+
+  const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
+    {
+      autoFlush: true
+    }
+  );
+
+  bufferedClient.on("batchAdded", (response: any) => {
+    console.log("Batch Added Event has been received....");
+  });
+
+  bufferedClient.on("beforeDocumentSent", (response: any) => {
+    console.log("Before Document Sent Event has been received....");
+  });
+
+  bufferedClient.on("batchSucceeded", (response: any) => {
+    console.log("Batch Succeeded Event has been received....");
+    console.log(response);
+  });
+
+  bufferedClient.on("batchFailed", (response: any) => {
+    console.log("Batch Failed Event has been received....");
+    console.log(response);
+  });
+
+  await bufferedClient.uploadDocuments([
+    {
+      hotelId: "1",
+      description:
+        "Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " +
+        "and a really helpful concierge. The location is perfect -- right downtown, close to all " +
+        "the tourist attractions. We highly recommend this hotel.",
+      descriptionFr:
+        "Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " +
+        "à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " +
+        "centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " +
+        "cet hôtel.",
+      hotelName: "Fancy Stay",
+      category: "Luxury",
+      tags: ["pool", "view", "wifi", "concierge"],
+      parkingIncluded: false,
+      lastRenovationDate: new Date(2010, 5, 27),
+      rating: 5,
+      location: new GeographyPoint(47.678581, -122.131577)
+    }
+  ]);
+
+  const wait_time = DEFAULT_FLUSH_WINDOW + 5000;
+  console.log(`Waiting for ${wait_time} ms to meet the flush window interval....`);
+  await delay(wait_time);
+
+  // When the autoFlush is set to true, the user
+  // has to call the dispose method to clear the
+  // timer.
+  await bufferedClient.dispose();
+  await indexClient.deleteIndex(TEST_INDEX_NAME);
+  await delay(WAIT_TIME);
+}
+
+main();
diff --git a/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts
new file mode 100644
index 000000000000..6cf95aefaa9c
--- /dev/null
+++ b/sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/manualFlush.ts
@@ -0,0 +1,87 @@
+import {
+  SearchIndexingBufferedSender,
+  AzureKeyCredential,
+  SearchClient,
+  GeographyPoint,
+  SearchIndexClient
+} from "@azure/search-documents";
+import { createIndex, WAIT_TIME } from "../../utils/setup";
+import { Hotel } from "../../utils/interfaces";
+import { delay } from "@azure/core-http";
+
+/**
+ * This sample demonstrates how to use SearchIndexingBufferedSender.
+ * In this sample, autoFlush is set to false, i.e. the user
+ * wants to call flush manually.
+ */
+const endpoint = process.env.SEARCH_API_ENDPOINT || "";
+const apiKey = process.env.SEARCH_API_KEY || "";
+const TEST_INDEX_NAME = "hotel-live-sample-test1";
+
+export async function main() {
+  console.log(`Running SearchIndexingBufferedSender-uploadDocuments-Without AutoFlush Sample`);
+
+  const credential = new AzureKeyCredential(apiKey);
+  const searchClient: SearchClient<Hotel> = new SearchClient<Hotel>(
+    endpoint,
+    TEST_INDEX_NAME,
+    credential
+  );
+  const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential);
+
+  await createIndex(indexClient, TEST_INDEX_NAME);
+  await delay(WAIT_TIME);
+
+  const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
+    {
+      autoFlush: false
+    }
+  );
+
+  bufferedClient.on("batchAdded", (response: any) => {
+    console.log("Batch Added Event has been received....");
+  });
+
+  bufferedClient.on("beforeDocumentSent", (response: any) => {
+    console.log("Before Document Sent Event has been received....");
+  });
+
+  bufferedClient.on("batchSucceeded", (response: any) => {
+    console.log("Batch Succeeded Event has been received....");
+    console.log(response);
+  });
+
+  bufferedClient.on("batchFailed", (response: any) => {
+    console.log("Batch Failed Event has been received....");
+    console.log(response);
+  });
+
+  await bufferedClient.uploadDocuments([
+    {
+      hotelId: "1",
+      description:
+        "Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " +
+        "and a really helpful concierge. The location is perfect -- right downtown, close to all " +
+        "the tourist attractions. We highly recommend this hotel.",
+      descriptionFr:
+        "Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " +
+        "à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " +
+        "centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " +
+        "cet hôtel.",
+      hotelName: "Fancy Stay",
+      category: "Luxury",
+      tags: ["pool", "view", "wifi", "concierge"],
+      parkingIncluded: false,
+      lastRenovationDate: new Date(2010, 5, 27),
+      rating: 5,
+      location: new GeographyPoint(47.678581, -122.131577)
+    }
+  ]);
+
+  await bufferedClient.flush();
+  await bufferedClient.dispose();
+  await indexClient.deleteIndex(TEST_INDEX_NAME);
+  await delay(WAIT_TIME);
+}
+
+main();
diff --git a/sdk/search/search-documents/samples/typescript/src/utils/interfaces.ts b/sdk/search/search-documents/samples/typescript/src/utils/interfaces.ts
new file mode 100644
index 000000000000..35a81e351321
--- /dev/null
+++ b/sdk/search/search-documents/samples/typescript/src/utils/interfaces.ts
@@ -0,0 +1,35 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+import { GeographyPoint } from "@azure/search-documents";
+
+export interface Hotel {
+  hotelId: string;
+  hotelName?: string | null;
+  description?: string | null;
+  descriptionFr?: string | null;
+  category?: string | null;
+  tags?: string[] | null;
+  parkingIncluded?: boolean | null;
+  smokingAllowed?: boolean | null;
+  lastRenovationDate?: Date | null;
+  rating?: number | null;
+  location?: GeographyPoint | null;
+  address?: {
+    streetAddress?: string | null;
+    city?: string | null;
+    stateProvince?: string | null;
+    postalCode?: string | null;
+    country?: string | null;
+  } | null;
+  rooms?: Array<{
+    description?: string | null;
+    descriptionFr?: string | null;
+    type?: string | null;
+    baseRate?: number | null;
+    bedOptions?: string | null;
+    sleepsCount?: number | null;
+    smokingAllowed?: boolean | null;
+    tags?: string[] | null;
+  }> | null;
+}
diff --git a/sdk/search/search-documents/samples/typescript/src/utils/setup.ts b/sdk/search/search-documents/samples/typescript/src/utils/setup.ts
new file mode 100644
index 000000000000..07898f8702dd
--- /dev/null
+++ b/sdk/search/search-documents/samples/typescript/src/utils/setup.ts
@@ -0,0 +1,218 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+import { SearchIndexClient, SearchIndex, KnownAnalyzerNames } from "@azure/search-documents";
+
+export const WAIT_TIME = 4000;
+
+// eslint-disable-next-line @azure/azure-sdk/ts-use-interface-parameters
+export async function createIndex(client: SearchIndexClient, name: string): Promise<void> {
+  const hotelIndex: SearchIndex = {
+    name,
+    fields: [
+      {
+        type: "Edm.String",
+        name: "hotelId",
+        key: true,
+        filterable: true,
+        sortable: true
+      },
+      {
+        type: "Edm.String",
+        name: "hotelName",
+        searchable: true,
+        filterable: true,
+        sortable: true
+      },
+      {
+        type: "Edm.String",
+        name: "description",
+        searchable: true,
+        analyzerName: KnownAnalyzerNames.EnLucene
+      },
+      {
+        type: "Edm.String",
+        name: "descriptionFr",
+        searchable: true,
+        analyzerName: KnownAnalyzerNames.FrLucene
+      },
+      {
+        type: "Edm.String",
+        name: "category",
+        searchable: true,
+        filterable: true,
+        sortable: true,
+        facetable: true
+      },
+      {
+        type: "Collection(Edm.String)",
+        name: "tags",
+        searchable: true,
+        filterable: true,
+        facetable: true
+      },
+      {
+        type: "Edm.Boolean",
+        name: "parkingIncluded",
+        filterable: true,
+        sortable: true,
+        facetable: true
+      },
+      {
+        type: "Edm.Boolean",
+        name: "smokingAllowed",
+        filterable: true,
+        sortable: true,
+        facetable: true
+      },
+      {
+        type: "Edm.DateTimeOffset",
+        name: "lastRenovationDate",
+        filterable: true,
+        sortable: true,
+        facetable: true
+      },
+      {
+        type: "Edm.Double",
+        name: "rating",
+        filterable: true,
+        sortable: true,
+        facetable: true
+      },
+      {
+        type: "Edm.GeographyPoint",
+        name: "location",
+        filterable: true,
+        sortable: true
+      },
+      {
+        type: "Edm.ComplexType",
+        name: "address",
+        fields: [
+          {
+            type: "Edm.String",
+            name: "streetAddress",
+            searchable: true
+          },
+          {
+            type: "Edm.String",
+            name: "city",
+            searchable: true,
+            filterable: true,
+            sortable: true,
+            facetable: true
+          },
+          {
+            type: "Edm.String",
+            name: "stateProvince",
+            searchable: true,
+            filterable: true,
+            sortable: true,
+            facetable: true
+          },
+          {
+            type: "Edm.String",
+            name: "country",
+            searchable: true,
+            filterable: true,
+            sortable: true,
+            facetable: true
+          },
+          {
+            type: "Edm.String",
+            name: "postalCode",
+            searchable: true,
+            filterable: true,
+            sortable: true,
+            facetable: true
+          }
+        ]
+      },
+      {
+        type: "Collection(Edm.ComplexType)",
+        name:
"rooms", + fields: [ + { + type: "Edm.String", + name: "description", + searchable: true, + analyzerName: KnownAnalyzerNames.EnLucene + }, + { + type: "Edm.String", + name: "descriptionFr", + searchable: true, + analyzerName: KnownAnalyzerNames.FrLucene + }, + { + type: "Edm.String", + name: "type", + searchable: true, + filterable: true, + facetable: true + }, + { + type: "Edm.Double", + name: "baseRate", + filterable: true, + facetable: true + }, + { + type: "Edm.String", + name: "bedOptions", + searchable: true, + filterable: true, + facetable: true + }, + { + type: "Edm.Int32", + name: "sleepsCount", + filterable: true, + facetable: true + }, + { + type: "Edm.Boolean", + name: "smokingAllowed", + filterable: true, + facetable: true + }, + { + type: "Collection(Edm.String)", + name: "tags", + searchable: true, + filterable: true, + facetable: true + } + ] + } + ], + suggesters: [ + { + name: "sg", + sourceFields: ["description", "hotelName"] + } + ], + scoringProfiles: [ + { + name: "nearest", + functionAggregation: "sum", + functions: [ + { + type: "distance", + fieldName: "location", + boost: 2, + parameters: { + referencePointParameter: "myloc", + boostingDistance: 100 + } + } + ] + } + ], + corsOptions: { + // for browser tests + allowedOrigins: ["*"] + } + }; + await client.createIndex(hotelIndex); +} diff --git a/sdk/search/search-documents/src/index.ts b/sdk/search/search-documents/src/index.ts index 7f50e7a83d6a..f544e6842394 100644 --- a/sdk/search/search-documents/src/index.ts +++ b/sdk/search/search-documents/src/index.ts @@ -2,6 +2,12 @@ // Licensed under the MIT license. export { SearchClient, SearchClientOptions } from "./searchClient"; +export { + SearchIndexingBufferedSender, + DEFAULT_BATCH_SIZE, + DEFAULT_FLUSH_WINDOW, + DEFAULT_RETRY_COUNT +} from "./searchIndexingBufferedSender"; export { AutocompleteRequest, AutocompleteOptions, @@ -25,7 +31,13 @@ export { SuggestOptions, MergeDocumentsOptions, MergeOrUploadDocumentsOptions, - UploadDocumentsOptions + UploadDocumentsOptions, + SearchIndexingBufferedSenderOptions, + SearchIndexingBufferedSenderDeleteDocumentsOptions, + SearchIndexingBufferedSenderFlushDocumentsOptions, + SearchIndexingBufferedSenderMergeDocumentsOptions, + SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions, + SearchIndexingBufferedSenderUploadDocumentsOptions } from "./indexModels"; export { SearchIndexClient, SearchIndexClientOptions } from "./searchIndexClient"; export { SearchIndexerClient, SearchIndexerClientOptions } from "./searchIndexerClient"; diff --git a/sdk/search/search-documents/src/indexModels.ts b/sdk/search/search-documents/src/indexModels.ts index 5387d02b97da..4d32ffd10cce 100644 --- a/sdk/search/search-documents/src/indexModels.ts +++ b/sdk/search/search-documents/src/indexModels.ts @@ -29,6 +29,37 @@ export type SearchOptions = OperationOptions & SearchRequestOptions = OperationOptions & SuggestRequest; +/** + * Options for SearchIndexingBufferedSender. + */ +export interface SearchIndexingBufferedSenderOptions { + /** + * Indicates if autoFlush is enabled. + */ + autoFlush?: boolean; +} + +/** + * Options for SearchIndexingBufferedSenderUploadDocuments. + */ +export type SearchIndexingBufferedSenderUploadDocumentsOptions = OperationOptions; +/** + * Options for SearchIndexingBufferedSenderMergeDocuments. + */ +export type SearchIndexingBufferedSenderMergeDocumentsOptions = OperationOptions; +/** + * Options for SearchIndexingBufferedSenderMergeOrUploadDocuments. 
+ */
+export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = OperationOptions;
+/**
+ * Options for SearchIndexingBufferedSenderDeleteDocuments.
+ */
+export type SearchIndexingBufferedSenderDeleteDocumentsOptions = OperationOptions;
+/**
+ * Options for SearchIndexingBufferedSenderFlushDocuments.
+ */
+export type SearchIndexingBufferedSenderFlushDocumentsOptions = OperationOptions;
+
 /**
  * Options for retrieving a single document.
  */
diff --git a/sdk/search/search-documents/src/searchClient.ts b/sdk/search/search-documents/src/searchClient.ts
index 28025c8053e4..83bb43d456ca 100644
--- a/sdk/search/search-documents/src/searchClient.ts
+++ b/sdk/search/search-documents/src/searchClient.ts
@@ -41,12 +41,14 @@ import {
   DeleteDocumentsOptions,
   SearchDocumentsPageResult,
   MergeOrUploadDocumentsOptions,
-  SearchRequest
+  SearchRequest,
+  SearchIndexingBufferedSenderOptions
 } from "./indexModels";
 import { odataMetadataPolicy } from "./odataMetadataPolicy";
 import { IndexDocumentsBatch } from "./indexDocumentsBatch";
 import { encode, decode } from "./base64";
 import * as utils from "./serviceUtils";
+import { SearchIndexingBufferedSender } from ".";
 
 /**
  * Client options used to configure Cognitive Search API requests.
@@ -616,6 +618,18 @@ export class SearchClient<T> {
     }
   }
 
+  /**
+   * Gets an instance of SearchIndexingBufferedSender.
+   *
+   * @param options SearchIndexingBufferedSender options.
+   */
+
+  public getSearchIndexingBufferedSenderInstance(
+    options: SearchIndexingBufferedSenderOptions = {}
+  ): SearchIndexingBufferedSender<T> {
+    return new SearchIndexingBufferedSender<T>(this, options);
+  }
+
   private encodeContinuationToken(
     nextLink: string | undefined,
     nextPageParameters: SearchRequest | undefined
diff --git a/sdk/search/search-documents/src/searchIndexingBufferedSender.ts b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts
new file mode 100644
index 000000000000..e2e2a887a3ed
--- /dev/null
+++ b/sdk/search/search-documents/src/searchIndexingBufferedSender.ts
@@ -0,0 +1,363 @@
+import { SearchClient } from "./searchClient";
+import { IndexDocumentsBatch } from "./indexDocumentsBatch";
+import {
+  IndexDocumentsAction,
+  SearchIndexingBufferedSenderOptions,
+  SearchIndexingBufferedSenderUploadDocumentsOptions,
+  SearchIndexingBufferedSenderMergeDocumentsOptions,
+  SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions,
+  SearchIndexingBufferedSenderDeleteDocumentsOptions,
+  SearchIndexingBufferedSenderFlushDocumentsOptions
+} from "./indexModels";
+import { IndexDocumentsResult } from "./generated/data/models";
+import { RestError, OperationOptions } from "@azure/core-http";
+import EventEmitter from "events";
+import { createSpan } from "./tracing";
+import { CanonicalCode } from "@opentelemetry/api";
+
+/**
+ * Default Batch Size
+ */
+export const DEFAULT_BATCH_SIZE: number = 1000;
+/**
+ * Default window flush interval
+ */
+export const DEFAULT_FLUSH_WINDOW: number = 60000;
+/**
+ * Default number of times to retry
+ */
+export const DEFAULT_RETRY_COUNT: number = 3;
+
+/**
+ * Class used to perform buffered operations against a search index,
+ * including adding, updating, and removing documents.
+ */
+export class SearchIndexingBufferedSender<T> {
+  /**
+   * Search Client used to call the underlying IndexBatch operations.
+   */
+  private client: SearchClient<T>;
+  /**
+   * Indicates if autoFlush is enabled.
+   */
+  private autoFlush: boolean;
+  /**
+   * Interval between flushes (in milliseconds).
+   */
+  private flushWindowInMs: number;
+  /**
+   * Size of the batch.
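+   * Defaults to DEFAULT_BATCH_SIZE (1000 actions).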
+   */
+  private batchSize: number;
+  /**
+   * Batch object used to complete the service call.
+   */
+  private batchObject: IndexDocumentsBatch<T>;
+  /**
+   * Clean up for the timer
+   */
+  private cleanupTimer?: () => void;
+  /**
+   * Event emitter/publisher used in the Buffered Sender
+   */
+  private readonly emitter = new EventEmitter();
+
+  /**
+   * Creates a new instance of SearchIndexingBufferedSender.
+   *
+   * @param client Search Client used to call the underlying IndexBatch operations.
+   * @param options Options to modify auto flush.
+   *
+   */
+  constructor(client: SearchClient<T>, options: SearchIndexingBufferedSenderOptions = {}) {
+    this.client = client;
+    this.autoFlush = options.autoFlush ?? false;
+    this.flushWindowInMs = DEFAULT_FLUSH_WINDOW;
+    this.batchSize = DEFAULT_BATCH_SIZE;
+    this.batchObject = new IndexDocumentsBatch<T>();
+    if (this.autoFlush) {
+      const interval = setInterval(() => this.flush(), this.flushWindowInMs);
+      interval?.unref();
+      this.cleanupTimer = () => {
+        clearInterval(interval);
+      };
+    }
+  }
+
+  /**
+   * Uploads the documents/Adds the documents to the upload queue.
+   *
+   * @param documents Documents to be uploaded.
+   * @param options Upload options.
+   */
+  public async uploadDocuments(
+    documents: T[],
+    options: SearchIndexingBufferedSenderUploadDocumentsOptions = {}
+  ): Promise<void> {
+    const { span, updatedOptions } = createSpan(
+      "SearchIndexingBufferedSender-uploadDocuments",
+      options
+    );
+    try {
+      this.batchObject.upload(documents);
+      this.emitter.emit("batchAdded", {
+        action: "upload",
+        documents
+      });
+      return this.internalFlush(false, updatedOptions);
+    } catch (e) {
+      span.setStatus({
+        code: CanonicalCode.UNKNOWN,
+        message: e.message
+      });
+      throw e;
+    } finally {
+      span.end();
+    }
+  }
+
+  /**
+   * Merges the documents/Adds the documents to the merge queue.
+   *
+   * @param documents Documents to be merged.
+   * @param options Merge options.
+   */
+  public async mergeDocuments(
+    documents: T[],
+    options: SearchIndexingBufferedSenderMergeDocumentsOptions = {}
+  ): Promise<void> {
+    const { span, updatedOptions } = createSpan(
+      "SearchIndexingBufferedSender-mergeDocuments",
+      options
+    );
+    try {
+      this.batchObject.merge(documents);
+      this.emitter.emit("batchAdded", {
+        action: "merge",
+        documents
+      });
+      return this.internalFlush(false, updatedOptions);
+    } catch (e) {
+      span.setStatus({
+        code: CanonicalCode.UNKNOWN,
+        message: e.message
+      });
+      throw e;
+    } finally {
+      span.end();
+    }
+  }
+
+  /**
+   * Merges/Uploads the documents/Adds the documents to the merge/upload queue.
+   *
+   * @param documents Documents to be merged/uploaded.
+   * @param options Merge or upload options.
+   */
+  public async mergeOrUploadDocuments(
+    documents: T[],
+    options: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = {}
+  ): Promise<void> {
+    const { span, updatedOptions } = createSpan(
+      "SearchIndexingBufferedSender-mergeOrUploadDocuments",
+      options
+    );
+    try {
+      this.batchObject.mergeOrUpload(documents);
+      this.emitter.emit("batchAdded", {
+        action: "mergeOrUpload",
+        documents
+      });
+      return this.internalFlush(false, updatedOptions);
+    } catch (e) {
+      span.setStatus({
+        code: CanonicalCode.UNKNOWN,
+        message: e.message
+      });
+      throw e;
+    } finally {
+      span.end();
+    }
+  }
+
+  /**
+   * Deletes the documents/Adds the documents to the delete queue.
+   *
+   * @param documents Documents to be deleted.
+   * @param options Delete options.
+   */
+  public async deleteDocuments(
+    documents: T[],
+    options: SearchIndexingBufferedSenderDeleteDocumentsOptions = {}
+  ): Promise<void> {
+    const { span, updatedOptions } = createSpan(
+      "SearchIndexingBufferedSender-deleteDocuments",
+      options
+    );
+    try {
+      this.batchObject.delete(documents);
+      this.emitter.emit("batchAdded", {
+        action: "delete",
+        documents
+      });
+      return this.internalFlush(false, updatedOptions);
+    } catch (e) {
+      span.setStatus({
+        code: CanonicalCode.UNKNOWN,
+        message: e.message
+      });
+      throw e;
+    } finally {
+      span.end();
+    }
+  }
+
+  /**
+   * Flushes the queue manually.
+   *
+   * @param options Flush options.
+   */
+  public async flush(
+    options: SearchIndexingBufferedSenderFlushDocumentsOptions = {}
+  ): Promise<void> {
+    const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-flush", options);
+    try {
+      if (this.batchObject.actions.length > 0) {
+        return this.internalFlush(true, updatedOptions);
+      }
+    } catch (e) {
+      span.setStatus({
+        code: CanonicalCode.UNKNOWN,
+        message: e.message
+      });
+      throw e;
+    } finally {
+      span.end();
+    }
+  }
+
+  /**
+   * If using autoFlush: true, call this to clean up the autoFlush timer.
+   */
+  public async dispose(): Promise<void> {
+    if (this.batchObject.actions.length > 0) {
+      await this.internalFlush(true);
+    }
+    this.cleanupTimer && this.cleanupTimer();
+  }
+
+  /**
+   * Attach Batch Added Event
+   *
+   * @param event Event to be emitted
+   * @param listener Event Listener
+   */
+  public on(event: "batchAdded", listener: (e: { action: string; documents: T[] }) => void): void;
+  /**
+   * Attach Batch Sent Event
+   *
+   * @param event Event to be emitted
+   * @param listener Event Listener
+   */
+  public on(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
+  /**
+   * Attach Batch Succeeded Event
+   *
+   * @param event Event to be emitted
+   * @param listener Event Listener
+   */
+  public on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
+  /**
+   * Attach Batch Failed Event
+   *
+   * @param event Event to be emitted
+   * @param listener Event Listener
+   */
+  public on(event: "batchFailed", listener: (e: RestError) => void): void;
+  public on(
+    event: "batchAdded" | "beforeDocumentSent" | "batchSucceeded" | "batchFailed",
+    listener: (e: any) => void
+  ): void {
+    this.emitter.on(event, listener);
+  }
+
+  /**
+   * Detach Batch Added Event
+   *
+   * @param event Event to be emitted
+   * @param listener Event Listener
+   */
+  public off(event: "batchAdded", listener: (e: { action: string; documents: T[] }) => void): void;
+  /**
+   * Detach Batch Sent Event
+   *
+   * @param event Event to be emitted
+   * @param listener Event Listener
+   */
+  public off(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
+  /**
+   * Detach Batch Succeeded Event
+   *
+   * @param event Event to be emitted
+   * @param listener Event Listener
+   */
+  public off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
+  /**
+   * Detach Batch Failed Event
+   *
+   * @param event Event to be emitted
+   * @param listener Event Listener
+   */
+  public off(event: "batchFailed", listener: (e: RestError) => void): void;
+  public off(
+    event: "batchAdded" | "beforeDocumentSent" | "batchSucceeded" | "batchFailed",
+    listener: (e: any) => void
+  ): void {
+    this.emitter.removeListener(event, listener);
+  }
+
+  private isBatchReady(): boolean {
+    return this.batchObject.actions.length >= this.batchSize;
+  }
+
+  private async internalFlush(force: boolean, options: OperationOptions = {}): Promise<void> {
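+    // Send the buffered actions only when forced (manual flush or dispose) or
+    // when autoFlush is enabled and the number of queued actions has reached the batch size.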
+    if (force || (this.autoFlush && this.isBatchReady())) {
+      // Split it
+      const actions: IndexDocumentsAction<T>[] = this.batchObject.actions;
+      this.batchObject = new IndexDocumentsBatch<T>();
+      while (actions.length > 0) {
+        const actionsToSend = actions.splice(0, this.batchSize);
+        await this.submitDocuments(actionsToSend, options);
+      }
+    }
+  }
+
+  private async submitDocuments(
+    actionsToSend: IndexDocumentsAction<T>[],
+    options: OperationOptions,
+    retryAttempt: number = 0
+  ): Promise<void> {
+    try {
+      for (const action of actionsToSend) {
+        this.emitter.emit("beforeDocumentSent", action);
+      }
+      const result = await this.client.indexDocuments(
+        new IndexDocumentsBatch<T>(actionsToSend),
+        options
+      );
+      // raise success event
+      this.emitter.emit("batchSucceeded", result);
+    } catch (e) {
+      if (this.isRetryAbleError(e) && retryAttempt < DEFAULT_RETRY_COUNT) {
+        await this.submitDocuments(actionsToSend, options, retryAttempt + 1);
+      } else {
+        this.emitter.emit("batchFailed", e);
+        throw e;
+      }
+    }
+  }
+
+  private isRetryAbleError(e: any): boolean {
+    return e.code && (e.code === "422" || e.code === "409" || e.code === "503");
+  }
+}