Skip to content

Commit

Permalink
Initial Commit for Advanced Batching (#11544)
Browse files Browse the repository at this point in the history
* Initial Commit for Advanced Batching

* Formatting changes

* Additional Changes

* Formatting changes

* Additional Changes

* Formatting changes

* Response to PR Comments

* Formatting changes

* Update Message
  • Loading branch information
sarangan12 authored Oct 2, 2020
1 parent 7c069d6 commit b6b8037
Show file tree
Hide file tree
Showing 11 changed files with 1,022 additions and 3 deletions.
3 changes: 2 additions & 1 deletion sdk/search/search-documents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@
"@azure/core-tracing": "1.0.0-preview.9",
"@azure/logger": "^1.0.0",
"@opentelemetry/api": "^0.10.2",
"tslib": "^2.0.0"
"tslib": "^2.0.0",
"events": "^3.0.0"
},
"devDependencies": {
"@azure/dev-tool": "^1.0.0",
Expand Down
56 changes: 56 additions & 0 deletions sdk/search/search-documents/review/search-documents.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { KeyCredential } from '@azure/core-auth';
import { OperationOptions } from '@azure/core-http';
import { PagedAsyncIterableIterator } from '@azure/core-paging';
import { PipelineOptions } from '@azure/core-http';
import { RestError } from '@azure/core-http';

// @public
export interface AnalyzedTokenInfo {
Expand Down Expand Up @@ -229,6 +230,15 @@ export type DataChangeDetectionPolicy = HighWaterMarkChangeDetectionPolicy | Sql
// @public
export type DataDeletionDetectionPolicy = SoftDeleteColumnDeletionDetectionPolicy;

// @public
export const DEFAULT_BATCH_SIZE: number;

// @public
export const DEFAULT_FLUSH_WINDOW: number;

// @public
export const DEFAULT_RETRY_COUNT: number;

// @public
export interface DefaultCognitiveServicesAccount {
description?: string;
Expand Down Expand Up @@ -1028,6 +1038,7 @@ export class SearchClient<T> {
readonly endpoint: string;
getDocument<Fields extends keyof T>(key: string, options?: GetDocumentOptions<Fields>): Promise<T>;
getDocumentsCount(options?: CountDocumentsOptions): Promise<number>;
getSearchIndexingBufferedSenderInstance(options?: SearchIndexingBufferedSenderOptions): SearchIndexingBufferedSender<T>;
indexDocuments(batch: IndexDocumentsBatch<T>, options?: IndexDocumentsOptions): Promise<IndexDocumentsResult>;
readonly indexName: string;
mergeDocuments(documents: T[], options?: MergeDocumentsOptions): Promise<IndexDocumentsResult>;
Expand Down Expand Up @@ -1222,6 +1233,51 @@ export interface SearchIndexerWarning {
readonly name?: string;
}

// @public
export class SearchIndexingBufferedSender<T> {
constructor(client: SearchClient<T>, options?: SearchIndexingBufferedSenderOptions);
deleteDocuments(documents: T[], options?: SearchIndexingBufferedSenderDeleteDocumentsOptions): Promise<void>;
dispose(): Promise<void>;
flush(options?: SearchIndexingBufferedSenderFlushDocumentsOptions): Promise<void>;
mergeDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeDocumentsOptions): Promise<void>;
mergeOrUploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions): Promise<void>;
off(event: "batchAdded", listener: (e: {
action: string;
documents: T[];
}) => void): void;
off(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
off(event: "batchFailed", listener: (e: RestError) => void): void;
on(event: "batchAdded", listener: (e: {
action: string;
documents: T[];
}) => void): void;
on(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
on(event: "batchFailed", listener: (e: RestError) => void): void;
uploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderUploadDocumentsOptions): Promise<void>;
}

// @public
export type SearchIndexingBufferedSenderDeleteDocumentsOptions = OperationOptions;

// @public
export type SearchIndexingBufferedSenderFlushDocumentsOptions = OperationOptions;

// @public
export type SearchIndexingBufferedSenderMergeDocumentsOptions = OperationOptions;

// @public
export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = OperationOptions;

// @public
export interface SearchIndexingBufferedSenderOptions {
autoFlush?: boolean;
}

// @public
export type SearchIndexingBufferedSenderUploadDocumentsOptions = OperationOptions;

// @public
export interface SearchIndexStatistics {
readonly documentCount: number;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import {
SearchIndexingBufferedSender,
AzureKeyCredential,
SearchClient,
GeographyPoint,
SearchIndexClient
} from "@azure/search-documents";
import { createIndex, WAIT_TIME } from "../../utils/setup";
import { Hotel } from "../../utils/interfaces";
import { delay } from "@azure/core-http";

/**
* This sample is to demonstrate the use of SearchIndexingBufferedSender.
* In this sample, the autoFlush is set to true. i.e. the user does not
* want to call the flush manually. The upload action happen automatically
* when the size of the batch is greater than threshold (which is set to 1000)
* by default.
*/
const endpoint = process.env.SEARCH_API_ENDPOINT || "";
const apiKey = process.env.SEARCH_API_KEY || "";
const TEST_INDEX_NAME = "hotel-live-sample-test3";

function getDocumentsArray(size: number): Hotel[] {
const array: Hotel[] = [];
for (let i = 1; i <= size; i++) {
array.push({
hotelId: `${i}`,
description:
"Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " +
"and a really helpful concierge. The location is perfect -- right downtown, close to all " +
"the tourist attractions. We highly recommend this hotel.",
descriptionFr:
"Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " +
"à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " +
"centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " +
"cet hôtel.",
hotelName: "Fancy Stay",
category: "Luxury",
tags: ["pool", "view", "wifi", "concierge"],
parkingIncluded: false,
lastRenovationDate: new Date(2010, 5, 27),
rating: 5,
location: new GeographyPoint(47.678581, -122.131577)
});
}
return array;
}

export async function main() {
console.log(`Running SearchIndexingBufferedSender-uploadDocuments-With Auto Flush Sizes Sample`);

const credential = new AzureKeyCredential(apiKey);
const searchClient: SearchClient<Hotel> = new SearchClient<Hotel>(
endpoint,
TEST_INDEX_NAME,
credential
);
const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential);

await createIndex(indexClient, TEST_INDEX_NAME);
await delay(WAIT_TIME);

const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
{
autoFlush: true
}
);

bufferedClient.on("batchAdded", (response: any) => {
console.log("Batch Added Event has been receieved....");
});

bufferedClient.on("beforeDocumentSent", (response: any) => {
console.log("Before Document Sent Event has been receieved....");
});

bufferedClient.on("batchSucceeded", (response: any) => {
console.log("Batch Succeeded Event has been receieved....");
console.log(response);
});

bufferedClient.on("batchFailed", (response: any) => {
console.log("Batch Failed Event has been receieved....");
console.log(response);
});

const documents: Hotel[] = getDocumentsArray(1001);
bufferedClient.uploadDocuments(documents);

await WAIT_TIME;

let count = await searchClient.getDocumentsCount();
while (count !== documents.length) {
await delay(WAIT_TIME);
count = await searchClient.getDocumentsCount();
}

// When the autoFlush is set to true, the user
// has to call the dispose method to clear the
// timer.
bufferedClient.dispose();
await indexClient.deleteIndex(TEST_INDEX_NAME);
await delay(WAIT_TIME);
}

main();
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import {
SearchIndexingBufferedSender,
AzureKeyCredential,
SearchClient,
GeographyPoint,
SearchIndexClient,
DEFAULT_FLUSH_WINDOW
} from "@azure/search-documents";
import { createIndex, WAIT_TIME } from "../../utils/setup";
import { Hotel } from "../../utils/interfaces";
import { delay } from "@azure/core-http";

/**
* This sample is to demonstrate the use of SearchIndexingBufferedSender.
* In this sample, the autoFlush is set to true. i.e. the user does not
* want to call the flush manually. The upload action happen automatically
* when the time interval is met. The time interval is set to 60000ms
* by default.
*/
const endpoint = process.env.SEARCH_API_ENDPOINT || "";
const apiKey = process.env.SEARCH_API_KEY || "";
const TEST_INDEX_NAME = "hotel-live-sample-test2";

export async function main() {
console.log(`Running SearchIndexingBufferedSender-uploadDocuments-With Auto Flush Timer Sample`);

const credential = new AzureKeyCredential(apiKey);
const searchClient: SearchClient<Hotel> = new SearchClient<Hotel>(
endpoint,
TEST_INDEX_NAME,
credential
);
const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential);

await createIndex(indexClient, TEST_INDEX_NAME);
await delay(WAIT_TIME);

const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
{
autoFlush: true
}
);

bufferedClient.on("batchAdded", (response: any) => {
console.log("Batch Added Event has been receieved....");
});

bufferedClient.on("beforeDocumentSent", (response: any) => {
console.log("Before Document Sent Event has been receieved....");
});

bufferedClient.on("batchSucceeded", (response: any) => {
console.log("Batch Succeeded Event has been receieved....");
console.log(response);
});

bufferedClient.on("batchFailed", (response: any) => {
console.log("Batch Failed Event has been receieved....");
console.log(response);
});

bufferedClient.uploadDocuments([
{
hotelId: "1",
description:
"Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " +
"and a really helpful concierge. The location is perfect -- right downtown, close to all " +
"the tourist attractions. We highly recommend this hotel.",
descriptionFr:
"Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " +
"à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " +
"centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " +
"cet hôtel.",
hotelName: "Fancy Stay",
category: "Luxury",
tags: ["pool", "view", "wifi", "concierge"],
parkingIncluded: false,
lastRenovationDate: new Date(2010, 5, 27),
rating: 5,
location: new GeographyPoint(47.678581, -122.131577)
}
]);

const wait_time = DEFAULT_FLUSH_WINDOW + 5000;
console.log(`Waiting for ${wait_time} ms to meet the flush window interval....`);
await delay(wait_time);

// When the autoFlush is set to true, the user
// has to call the dispose method to clear the
// timer.
bufferedClient.dispose();
await indexClient.deleteIndex(TEST_INDEX_NAME);
await delay(WAIT_TIME);
}

main();
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import {
SearchIndexingBufferedSender,
AzureKeyCredential,
SearchClient,
GeographyPoint,
SearchIndexClient
} from "@azure/search-documents";
import { createIndex, WAIT_TIME } from "../../utils/setup";
import { Hotel } from "../../utils/interfaces";
import { delay } from "@azure/core-http";

/**
* This sample is to demonstrate the use of SearchIndexingBufferedSender.
* In this sample, the autoFlush is set to false. i.e. the user
* wants to call the flush manually.
*/
const endpoint = process.env.SEARCH_API_ENDPOINT || "";
const apiKey = process.env.SEARCH_API_KEY || "";
const TEST_INDEX_NAME = "hotel-live-sample-test1";

export async function main() {
console.log(`Running SearchIndexingBufferedSender-uploadDocuments-Without AutoFlush Sample`);

const credential = new AzureKeyCredential(apiKey);
const searchClient: SearchClient<Hotel> = new SearchClient<Hotel>(
endpoint,
TEST_INDEX_NAME,
credential
);
const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential);

await createIndex(indexClient, TEST_INDEX_NAME);
await delay(WAIT_TIME);

const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
{
autoFlush: false
}
);

bufferedClient.on("batchAdded", (response: any) => {
console.log("Batch Added Event has been receieved....");
});

bufferedClient.on("beforeDocumentSent", (response: any) => {
console.log("Before Document Sent Event has been receieved....");
});

bufferedClient.on("batchSucceeded", (response: any) => {
console.log("Batch Succeeded Event has been receieved....");
console.log(response);
});

bufferedClient.on("batchFailed", (response: any) => {
console.log("Batch Failed Event has been receieved....");
console.log(response);
});

bufferedClient.uploadDocuments([
{
hotelId: "1",
description:
"Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " +
"and a really helpful concierge. The location is perfect -- right downtown, close to all " +
"the tourist attractions. We highly recommend this hotel.",
descriptionFr:
"Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " +
"à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " +
"centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " +
"cet hôtel.",
hotelName: "Fancy Stay",
category: "Luxury",
tags: ["pool", "view", "wifi", "concierge"],
parkingIncluded: false,
lastRenovationDate: new Date(2010, 5, 27),
rating: 5,
location: new GeographyPoint(47.678581, -122.131577)
}
]);

await bufferedClient.flush();
bufferedClient.dispose();
await indexClient.deleteIndex(TEST_INDEX_NAME);
await delay(WAIT_TIME);
}

main();
Loading

0 comments on commit b6b8037

Please sign in to comment.