Initial Commit for Advanced Batching #11544

Merged
merged 9 commits into from
Oct 2, 2020
3 changes: 2 additions & 1 deletion sdk/search/search-documents/package.json
@@ -80,7 +80,8 @@
"@azure/core-tracing": "1.0.0-preview.9",
"@azure/logger": "^1.0.0",
"@opentelemetry/api": "^0.10.2",
"tslib": "^2.0.0"
"tslib": "^2.0.0",
"events": "^3.0.0"
},
"devDependencies": {
"@azure/dev-tool": "^1.0.0",
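The new "events" dependency is presumably what backs the sender's on/off event surface shown in the API review below. A minimal, hypothetical sketch of that pattern (not the PR's actual implementation), assuming Node's EventEmitter and an invented MiniBufferedSender class for illustration:

// Hypothetical sketch only: an event-emitting sender built on the "events"
// package (the dependency added above). Names are illustrative.
import { EventEmitter } from "events";

type SenderEvent = "batchAdded" | "beforeDocumentSent" | "batchSucceeded" | "batchFailed";

class MiniBufferedSender<T> {
  private readonly emitter = new EventEmitter();
  private readonly batch: T[] = [];

  on(event: SenderEvent, listener: (e: unknown) => void): void {
    this.emitter.on(event, listener);
  }

  off(event: SenderEvent, listener: (e: unknown) => void): void {
    this.emitter.removeListener(event, listener);
  }

  uploadDocuments(documents: T[]): void {
    // Buffer the documents and notify subscribers that a batch was added.
    this.batch.push(...documents);
    this.emitter.emit("batchAdded", { action: "upload", documents });
  }
}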
56 changes: 56 additions & 0 deletions sdk/search/search-documents/review/search-documents.api.md
@@ -9,6 +9,7 @@ import { KeyCredential } from '@azure/core-auth';
import { OperationOptions } from '@azure/core-http';
import { PagedAsyncIterableIterator } from '@azure/core-paging';
import { PipelineOptions } from '@azure/core-http';
import { RestError } from '@azure/core-http';

// @public
export interface AnalyzedTokenInfo {
@@ -229,6 +230,15 @@ export type DataChangeDetectionPolicy = HighWaterMarkChangeDetectionPolicy | Sql
// @public
export type DataDeletionDetectionPolicy = SoftDeleteColumnDeletionDetectionPolicy;

// @public
export const DEFAULT_BATCH_SIZE: number;

// @public
export const DEFAULT_FLUSH_WINDOW: number;

// @public
export const DEFAULT_RETRY_COUNT: number;

// @public
export interface DefaultCognitiveServicesAccount {
description?: string;
@@ -1028,6 +1038,7 @@ export class SearchClient<T> {
readonly endpoint: string;
getDocument<Fields extends keyof T>(key: string, options?: GetDocumentOptions<Fields>): Promise<T>;
getDocumentsCount(options?: CountDocumentsOptions): Promise<number>;
getSearchIndexingBufferedSenderInstance(options?: SearchIndexingBufferedSenderOptions): SearchIndexingBufferedSender<T>;
indexDocuments(batch: IndexDocumentsBatch<T>, options?: IndexDocumentsOptions): Promise<IndexDocumentsResult>;
readonly indexName: string;
mergeDocuments(documents: T[], options?: MergeDocumentsOptions): Promise<IndexDocumentsResult>;
@@ -1222,6 +1233,51 @@ export interface SearchIndexerWarning {
readonly name?: string;
}

// @public
export class SearchIndexingBufferedSender<T> {
constructor(client: SearchClient<T>, options?: SearchIndexingBufferedSenderOptions);
deleteDocuments(documents: T[], options?: SearchIndexingBufferedSenderDeleteDocumentsOptions): Promise<void>;
dispose(): Promise<void>;
flush(options?: SearchIndexingBufferedSenderFlushDocumentsOptions): Promise<void>;
mergeDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeDocumentsOptions): Promise<void>;
mergeOrUploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions): Promise<void>;
off(event: "batchAdded", listener: (e: {
action: string;
documents: T[];
}) => void): void;
off(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
off(event: "batchFailed", listener: (e: RestError) => void): void;
on(event: "batchAdded", listener: (e: {
action: string;
documents: T[];
}) => void): void;
on(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
on(event: "batchFailed", listener: (e: RestError) => void): void;
uploadDocuments(documents: T[], options?: SearchIndexingBufferedSenderUploadDocumentsOptions): Promise<void>;
}

// @public
export type SearchIndexingBufferedSenderDeleteDocumentsOptions = OperationOptions;

// @public
export type SearchIndexingBufferedSenderFlushDocumentsOptions = OperationOptions;

// @public
export type SearchIndexingBufferedSenderMergeDocumentsOptions = OperationOptions;

// @public
export type SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = OperationOptions;

// @public
export interface SearchIndexingBufferedSenderOptions {
autoFlush?: boolean;
}

// @public
export type SearchIndexingBufferedSenderUploadDocumentsOptions = OperationOptions;

// @public
export interface SearchIndexStatistics {
readonly documentCount: number;
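A rough usage sketch inferred from the API surface above; the endpoint, key, index name, and Hotel shape are placeholders, and the complete samples added by this PR follow below.

import { AzureKeyCredential, SearchClient } from "@azure/search-documents";

interface Hotel {
  hotelId: string;
  hotelName?: string;
}

async function sketch(): Promise<void> {
  const client = new SearchClient<Hotel>(
    "https://<service>.search.windows.net", // placeholder endpoint
    "<index-name>", // placeholder index name
    new AzureKeyCredential("<api-key>") // placeholder key
  );

  // New in this PR: obtain a buffered sender from the SearchClient.
  const sender = client.getSearchIndexingBufferedSenderInstance({ autoFlush: true });

  sender.on("batchSucceeded", (result) => console.log("batch succeeded", result));
  sender.on("batchFailed", (err) => console.error("batch failed", err));

  await sender.uploadDocuments([{ hotelId: "1", hotelName: "Fancy Stay" }]);
  await sender.flush(); // send anything still buffered
  await sender.dispose(); // clear the auto-flush timer
}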
@@ -0,0 +1,106 @@
import {
SearchIndexingBufferedSender,
AzureKeyCredential,
SearchClient,
GeographyPoint,
SearchIndexClient
} from "@azure/search-documents";
import { createIndex, WAIT_TIME } from "../../utils/setup";
import { Hotel } from "../../utils/interfaces";
import { delay } from "@azure/core-http";

/**
* This sample demonstrates the use of SearchIndexingBufferedSender.
* Here autoFlush is set to true, i.e. the user does not call flush
* manually. The upload happens automatically when the size of the
* batch exceeds the threshold (1000 by default).
*/
const endpoint = process.env.SEARCH_API_ENDPOINT || "";
const apiKey = process.env.SEARCH_API_KEY || "";
const TEST_INDEX_NAME = "hotel-live-sample-test3";

function getDocumentsArray(size: number): Hotel[] {
const array: Hotel[] = [];
for (let i = 1; i <= size; i++) {
array.push({
hotelId: `${i}`,
description:
"Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " +
"and a really helpful concierge. The location is perfect -- right downtown, close to all " +
"the tourist attractions. We highly recommend this hotel.",
descriptionFr:
"Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " +
"à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " +
"centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " +
"cet hôtel.",
hotelName: "Fancy Stay",
category: "Luxury",
tags: ["pool", "view", "wifi", "concierge"],
parkingIncluded: false,
lastRenovationDate: new Date(2010, 5, 27),
rating: 5,
location: new GeographyPoint(47.678581, -122.131577)
});
}
return array;
}

export async function main() {
console.log(`Running SearchIndexingBufferedSender-uploadDocuments-With Auto Flush Sizes Sample`);

const credential = new AzureKeyCredential(apiKey);
const searchClient: SearchClient<Hotel> = new SearchClient<Hotel>(
endpoint,
TEST_INDEX_NAME,
credential
);
const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential);

await createIndex(indexClient, TEST_INDEX_NAME);
await delay(WAIT_TIME);

const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
{
autoFlush: true
}
);

bufferedClient.on("batchAdded", (response: any) => {
console.log("Batch Added Event has been receieved....");
});

bufferedClient.on("beforeDocumentSent", (response: any) => {
console.log("Before Document Sent Event has been receieved....");
});

bufferedClient.on("batchSucceeded", (response: any) => {
console.log("Batch Succeeded Event has been receieved....");
console.log(response);
});

bufferedClient.on("batchFailed", (response: any) => {
console.log("Batch Failed Event has been receieved....");
console.log(response);
});

// 1001 documents exceed the default batch size (1000), so an automatic flush is triggered.
const documents: Hotel[] = getDocumentsArray(1001);
await bufferedClient.uploadDocuments(documents);

await delay(WAIT_TIME);

// Poll until all the uploaded documents are searchable in the index.
let count = await searchClient.getDocumentsCount();
while (count !== documents.length) {
await delay(WAIT_TIME);
count = await searchClient.getDocumentsCount();
}

// When the autoFlush is set to true, the user
// has to call the dispose method to clear the
// timer.
await bufferedClient.dispose();
await indexClient.deleteIndex(TEST_INDEX_NAME);
await delay(WAIT_TIME);
}

main().catch((err) => console.error(err));
@@ -0,0 +1,96 @@
import {
SearchIndexingBufferedSender,
AzureKeyCredential,
SearchClient,
GeographyPoint,
SearchIndexClient,
DEFAULT_FLUSH_WINDOW
} from "@azure/search-documents";
import { createIndex, WAIT_TIME } from "../../utils/setup";
import { Hotel } from "../../utils/interfaces";
import { delay } from "@azure/core-http";

/**
* This sample demonstrates the use of SearchIndexingBufferedSender.
* Here autoFlush is set to true, i.e. the user does not call flush
* manually. The upload happens automatically when the flush window
* interval elapses (60000 ms by default).
*/
const endpoint = process.env.SEARCH_API_ENDPOINT || "";
const apiKey = process.env.SEARCH_API_KEY || "";
const TEST_INDEX_NAME = "hotel-live-sample-test2";

export async function main() {
console.log(`Running SearchIndexingBufferedSender-uploadDocuments-With Auto Flush Timer Sample`);

const credential = new AzureKeyCredential(apiKey);
const searchClient: SearchClient<Hotel> = new SearchClient<Hotel>(
endpoint,
TEST_INDEX_NAME,
credential
);
const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential);

await createIndex(indexClient, TEST_INDEX_NAME);
await delay(WAIT_TIME);

const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
{
autoFlush: true
}
);

bufferedClient.on("batchAdded", (response: any) => {
console.log("Batch Added Event has been receieved....");
});

bufferedClient.on("beforeDocumentSent", (response: any) => {
console.log("Before Document Sent Event has been receieved....");
});

bufferedClient.on("batchSucceeded", (response: any) => {
console.log("Batch Succeeded Event has been receieved....");
console.log(response);
});

bufferedClient.on("batchFailed", (response: any) => {
console.log("Batch Failed Event has been receieved....");
console.log(response);
});

await bufferedClient.uploadDocuments([
{
hotelId: "1",
description:
"Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " +
"and a really helpful concierge. The location is perfect -- right downtown, close to all " +
"the tourist attractions. We highly recommend this hotel.",
descriptionFr:
"Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " +
"à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " +
"centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " +
"cet hôtel.",
hotelName: "Fancy Stay",
category: "Luxury",
tags: ["pool", "view", "wifi", "concierge"],
parkingIncluded: false,
lastRenovationDate: new Date(2010, 5, 27),
rating: 5,
location: new GeographyPoint(47.678581, -122.131577)
}
]);

const waitTime = DEFAULT_FLUSH_WINDOW + 5000;
console.log(`Waiting for ${waitTime} ms to meet the flush window interval....`);
await delay(waitTime);

// When the autoFlush is set to true, the user
// has to call the dispose method to clear the
// timer.
await bufferedClient.dispose();
await indexClient.deleteIndex(TEST_INDEX_NAME);
await delay(WAIT_TIME);
}

main().catch((err) => console.error(err));
@@ -0,0 +1,87 @@
import {
SearchIndexingBufferedSender,
AzureKeyCredential,
SearchClient,
GeographyPoint,
SearchIndexClient
} from "@azure/search-documents";
import { createIndex, WAIT_TIME } from "../../utils/setup";
import { Hotel } from "../../utils/interfaces";
import { delay } from "@azure/core-http";

/**
* This sample demonstrates the use of SearchIndexingBufferedSender.
* Here autoFlush is set to false, i.e. the user calls flush manually.
*/
const endpoint = process.env.SEARCH_API_ENDPOINT || "";
const apiKey = process.env.SEARCH_API_KEY || "";
const TEST_INDEX_NAME = "hotel-live-sample-test1";

export async function main() {
console.log(`Running SearchIndexingBufferedSender-uploadDocuments-Without AutoFlush Sample`);

const credential = new AzureKeyCredential(apiKey);
const searchClient: SearchClient<Hotel> = new SearchClient<Hotel>(
endpoint,
TEST_INDEX_NAME,
credential
);
const indexClient: SearchIndexClient = new SearchIndexClient(endpoint, credential);

await createIndex(indexClient, TEST_INDEX_NAME);
await delay(WAIT_TIME);

const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
{
autoFlush: false
}
);

bufferedClient.on("batchAdded", (response: any) => {
console.log("Batch Added Event has been receieved....");
});

bufferedClient.on("beforeDocumentSent", (response: any) => {
console.log("Before Document Sent Event has been receieved....");
});

bufferedClient.on("batchSucceeded", (response: any) => {
console.log("Batch Succeeded Event has been receieved....");
console.log(response);
});

bufferedClient.on("batchFailed", (response: any) => {
console.log("Batch Failed Event has been receieved....");
console.log(response);
});

await bufferedClient.uploadDocuments([
{
hotelId: "1",
description:
"Best hotel in town if you like luxury hotels. They have an amazing infinity pool, a spa, " +
"and a really helpful concierge. The location is perfect -- right downtown, close to all " +
"the tourist attractions. We highly recommend this hotel.",
descriptionFr:
"Meilleur hôtel en ville si vous aimez les hôtels de luxe. Ils ont une magnifique piscine " +
"à débordement, un spa et un concierge très utile. L'emplacement est parfait – en plein " +
"centre, à proximité de toutes les attractions touristiques. Nous recommandons fortement " +
"cet hôtel.",
hotelName: "Fancy Stay",
category: "Luxury",
tags: ["pool", "view", "wifi", "concierge"],
parkingIncluded: false,
lastRenovationDate: new Date(2010, 5, 27),
rating: 5,
location: new GeographyPoint(47.678581, -122.131577)
}
]);

// autoFlush is false, so flush the buffered actions manually.
await bufferedClient.flush();
await bufferedClient.dispose();
await indexClient.deleteIndex(TEST_INDEX_NAME);
await delay(WAIT_TIME);
}

main().catch((err) => console.error(err));