Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Hierarchical Partition Key Support #23919

Merged
merged 9 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 0 additions & 61 deletions sdk/cosmosdb/cosmos/.vscode/launch.json

This file was deleted.

35 changes: 21 additions & 14 deletions sdk/cosmosdb/cosmos/review/cosmos.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export class ClientContext {
batch<T>({ body, path, partitionKey, resourceId, options, }: {
body: T;
path: string;
partitionKey: string;
partitionKey: PartitionKey;
resourceId: string;
options?: RequestOptions;
}): Promise<Response_2<any>>;
Expand Down Expand Up @@ -584,7 +584,7 @@ export interface CreateOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Create;
// (undocumented)
partitionKey?: string | number | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
// (undocumented)
resourceBody: JSONObject;
}
Expand Down Expand Up @@ -694,7 +694,7 @@ export interface DeleteOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Delete;
// (undocumented)
partitionKey?: string | number | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
}

// @public (undocumented)
Expand Down Expand Up @@ -734,8 +734,10 @@ export type ExistingKeyOperation = {
path: string;
};

// @public (undocumented)
export function extractPartitionKey(document: unknown, partitionKeyDefinition: PartitionKeyDefinition): PartitionKey[];
// Warning: (ae-forgotten-export) The symbol "PartitionKeyInternal" needs to be exported by the entry point index.d.ts
//
// @public
export function extractPartitionKey(document: unknown, partitionKeyDefinition?: PartitionKeyDefinition): PartitionKeyInternal | undefined;

// @public
export interface FeedOptions extends SharedOptions {
Expand All @@ -752,7 +754,7 @@ export interface FeedOptions extends SharedOptions {
forceQueryPlan?: boolean;
maxDegreeOfParallelism?: number;
maxItemCount?: number;
partitionKey?: any;
partitionKey?: PartitionKey;
populateQueryMetrics?: boolean;
useIncrementalFeed?: boolean;
}
Expand Down Expand Up @@ -870,7 +872,7 @@ export enum IndexKind {

// @public
export class Item {
constructor(container: Container, id: string, partitionKey: PartitionKey, clientContext: ClientContext);
constructor(container: Container, id: string, clientContext: ClientContext, partitionKey?: PartitionKey);
// (undocumented)
readonly container: Container;
delete<T extends ItemDefinition = any>(options?: RequestOptions): Promise<ItemResponse<T>>;
Expand Down Expand Up @@ -900,7 +902,7 @@ export class ItemResponse<T extends ItemDefinition> extends ResourceResponse<T &
// @public
export class Items {
constructor(container: Container, clientContext: ClientContext);
batch(operations: OperationInput[], partitionKey?: string, options?: RequestOptions): Promise<Response_2<OperationResponse[]>>;
batch(operations: OperationInput[], partitionKey?: PartitionKey, options?: RequestOptions): Promise<Response_2<OperationResponse[]>>;
bulk(operations: OperationInput[], bulkOptions?: BulkOptions, options?: RequestOptions): Promise<OperationResponse[]>;
changeFeed(partitionKey: string | number | boolean, changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<any>;
changeFeed(changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<any>;
Expand Down Expand Up @@ -1074,15 +1076,20 @@ export interface PartitionedQueryExecutionInfo {
queryRanges: QueryRange[];
}

// Warning: (ae-forgotten-export) The symbol "PrimitivePartitionKeyValue" needs to be exported by the entry point index.d.ts
//
// @public (undocumented)
export type PartitionKey = PartitionKeyDefinition | string | number | unknown;
export type PartitionKey = PrimitivePartitionKeyValue | PrimitivePartitionKeyValue[];

// @public (undocumented)
export interface PartitionKeyDefinition {
// Warning: (ae-forgotten-export) The symbol "PartitionKeyKind" needs to be exported by the entry point index.d.ts
kind?: PartitionKeyKind;
paths: string[];
// (undocumented)
systemKey?: boolean;
version?: number;
// Warning: (ae-forgotten-export) The symbol "PartitionKeyDefinitionVersion" needs to be exported by the entry point index.d.ts
version?: PartitionKeyDefinitionVersion;
}

// @public (undocumented)
Expand Down Expand Up @@ -1127,7 +1134,7 @@ export interface PatchOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Patch;
// (undocumented)
partitionKey?: string | number | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
// (undocumented)
resourceBody: PatchRequestBody;
}
Expand Down Expand Up @@ -1381,7 +1388,7 @@ export interface ReadOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Read;
// (undocumented)
partitionKey?: string | number | boolean | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
}

// @public (undocumented)
Expand All @@ -1407,7 +1414,7 @@ export interface ReplaceOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Replace;
// (undocumented)
partitionKey?: string | number | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
// (undocumented)
resourceBody: JSONObject;
}
Expand Down Expand Up @@ -2014,7 +2021,7 @@ export interface UpsertOperationInput {
// (undocumented)
operationType: typeof BulkOperationType.Upsert;
// (undocumented)
partitionKey?: string | number | null | Record<string, unknown> | undefined;
partitionKey?: PartitionKey;
// (undocumented)
resourceBody: JSONObject;
}
Expand Down
15 changes: 8 additions & 7 deletions sdk/cosmosdb/cosmos/src/ClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { Constants, HTTPMethod, OperationType, ResourceType } from "./common/con
import { getIdFromLink, getPathFromLink, parseLink } from "./common/helper";
import { StatusCodes, SubStatusCodes } from "./common/statusCodes";
import { Agent, CosmosClientOptions } from "./CosmosClientOptions";
import { ConnectionPolicy, ConsistencyLevel, DatabaseAccount, PartitionKey } from "./documents";
import { ConnectionPolicy, ConsistencyLevel, DatabaseAccount, PartitionKey, convertToInternalPartitionKey } from "./documents";
import { GlobalEndpointManager } from "./globalEndpointManager";
import { PluginConfig, PluginOn, executePlugins } from "./plugins/Plugin";
import { FetchFunctionCallback, SqlQuerySpec } from "./queryExecutionContext";
Expand Down Expand Up @@ -168,9 +168,9 @@ export class ClientContext {
this.applySessionToken(request);
logger.info(
"query " +
requestId +
" started" +
(request.partitionKeyRangeId ? " pkrid: " + request.partitionKeyRangeId : "")
requestId +
" started" +
(request.partitionKeyRangeId ? " pkrid: " + request.partitionKeyRangeId : "")
);
logger.verbose(request);
const start = Date.now();
Expand Down Expand Up @@ -600,7 +600,7 @@ export class ClientContext {
}: {
body: T;
path: string;
partitionKey: string;
partitionKey: PartitionKey;
resourceId: string;
options?: RequestOptions;
}): Promise<Response<any>> {
Expand Down Expand Up @@ -757,12 +757,13 @@ export class ClientContext {
options: requestContext.options,
partitionKeyRangeId: requestContext.partitionKeyRangeId,
useMultipleWriteLocations: this.connectionPolicy.useMultipleWriteLocations,
partitionKey: requestContext.partitionKey,
partitionKey: requestContext.partitionKey !== undefined ? convertToInternalPartitionKey(requestContext.partitionKey) : undefined //TODO: Move this check from here to PartitionKey
});
}

/**
* Returns collection of properties which are derived from the context for Request Creation
* Returns collection of properties which are derived from the context for Request Creation.
* These properties have client wide scope, as opposed to request specific scope.
* @returns
*/
private getContextDerivedPropsForRequestCreation(): {
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmosdb/cosmos/src/client/Container/Container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export class Container {
* `const {body: replacedItem} = await container.item("<item id>", "<partition key value>").replace({id: "<item id>", title: "Updated post", authorID: 5});`
*/
public item(id: string, partitionKeyValue?: PartitionKey): Item {
return new Item(this, id, partitionKeyValue, this.clientContext);
return new Item(this, id, this.clientContext, partitionKeyValue);
}

/**
Expand Down
10 changes: 5 additions & 5 deletions sdk/cosmosdb/cosmos/src/client/Item/Item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
ResourceType,
StatusCodes,
} from "../../common";
import { PartitionKey } from "../../documents";
import { PartitionKey, PartitionKeyInternal, convertToInternalPartitionKey } from "../../documents";
import { extractPartitionKey, undefinedPartitionKey } from "../../extractPartitionKey";
import { RequestOptions, Response } from "../../request";
import { PatchRequestBody } from "../../utils/patch";
Expand All @@ -24,7 +24,7 @@ import { ItemResponse } from "./ItemResponse";
* @see {@link Items} for operations on all items; see `container.items`.
*/
export class Item {
private partitionKey: PartitionKey;
private partitionKey: PartitionKeyInternal;
/**
* Returns a reference URL to the resource. Used for linking in Permissions.
*/
Expand All @@ -41,10 +41,10 @@ export class Item {
constructor(
public readonly container: Container,
public readonly id: string,
partitionKey: PartitionKey,
private readonly clientContext: ClientContext
private readonly clientContext: ClientContext,
partitionKey?: PartitionKey,
) {
this.partitionKey = partitionKey;
this.partitionKey = partitionKey === undefined ? undefined : convertToInternalPartitionKey(partitionKey);
}

/**
Expand Down
55 changes: 32 additions & 23 deletions sdk/cosmosdb/cosmos/src/client/Item/Items.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ import { ItemResponse } from "./ItemResponse";
import {
Batch,
isKeyInRange,
Operation,
getPartitionKeyToHash,
decorateOperation,
prepareOperations,
OperationResponse,
OperationInput,
BulkOptions,
decorateBatchOperation,
} from "../../utils/batch";
import { hashV1PartitionKey } from "../../utils/hashing/v1";
import { hashV2PartitionKey } from "../../utils/hashing/v2";
import { assertNotUndefined } from "../../utils/typeChecks";
import { hashPartitionKey } from "../../utils/hashing/hash";
import { PartitionKey, PartitionKeyDefinition } from "../../documents";

/**
* @hidden
Expand Down Expand Up @@ -288,8 +287,8 @@ export class Items {
const ref = new Item(
this.container,
(response.result as any).id,
this.clientContext,
partitionKey,
this.clientContext
);
return new ItemResponse(
response.result,
Expand Down Expand Up @@ -360,8 +359,8 @@ export class Items {
const ref = new Item(
this.container,
(response.result as any).id,
this.clientContext,
partitionKey,
this.clientContext
);
return new ItemResponse(
response.result,
Expand Down Expand Up @@ -408,7 +407,8 @@ export class Items {
const { resources: partitionKeyRanges } = await this.container
.readPartitionKeyRanges()
.fetchAll();
const { resource: definition } = await this.container.getPartitionKeyDefinition();
const { resource } = await this.container.readPartitionKeyDefinition();
const partitionDefinition = assertNotUndefined(resource, "PartitionKeyDefinition.");
const batches: Batch[] = partitionKeyRanges.map((keyRange: PartitionKeyRange) => {
return {
min: keyRange.minInclusive,
Expand All @@ -418,19 +418,8 @@ export class Items {
operations: [],
};
});
operations
.map((operation) => decorateOperation(operation, definition, options))
.forEach((operation: Operation, index: number) => {
const partitionProp = definition.paths[0].replace("/", "");
const isV2 = definition.version && definition.version === 2;
const toHashKey = getPartitionKeyToHash(operation, partitionProp);
const hashed = isV2 ? hashV2PartitionKey(toHashKey) : hashV1PartitionKey(toHashKey);
const batchForKey = batches.find((batch: Batch) => {
return isKeyInRange(batch.min, batch.max, hashed);
});
batchForKey.operations.push(operation);
batchForKey.indexes.push(index);
});

this.groupOperationsBasedOnPartitionKey(operations, partitionDefinition, options, batches);

const path = getPathFromLink(this.container.url, ResourceType.item);

Expand Down Expand Up @@ -460,7 +449,7 @@ export class Items {
// partition key types as well since we don't support them, so for now we throw
if (err.code === 410) {
throw new Error(
"Partition key error. Either the partitions have split or an operation has an unsupported partitionKey type"
"Partition key error. Either the partitions have split or an operation has an unsupported partitionKey type" + err.message
);
}
throw new Error(`Bulk request errored with: ${err.message}`);
Expand All @@ -470,6 +459,26 @@ export class Items {
return orderedResponses;
}

/**
* Function to create batches based of partition key Ranges.
* @param operations - operations to group
* @param partitionDefinition - PartitionKey definition of container.
* @param options - Request options for bulk request.
* @param batches - Groups to be filled with operations.
*/
private groupOperationsBasedOnPartitionKey(operations: OperationInput[], partitionDefinition: PartitionKeyDefinition, options: RequestOptions | undefined, batches: Batch[]) {
operations
.forEach((operationInput, index: number) => {
const { operation, partitionKey } = prepareOperations(operationInput, partitionDefinition, options);
const hashed = hashPartitionKey(assertNotUndefined(partitionKey, "undefined value for PartitionKey not expected during grouping of bulk operations."), partitionDefinition);
const batchForKey = assertNotUndefined(batches.find((batch: Batch) => {
return isKeyInRange(batch.min, batch.max, hashed);
}), "No suitable Batch found.");
batchForKey.operations.push(operation);
batchForKey.indexes.push(index);
});
}

/**
* Execute transactional batch operations on items.
*
Expand Down Expand Up @@ -499,7 +508,7 @@ export class Items {
*/
public async batch(
operations: OperationInput[],
partitionKey: string = "[{}]",
partitionKey?: PartitionKey,
options?: RequestOptions
): Promise<Response<OperationResponse[]>> {
operations.map((operation) => decorateBatchOperation(operation, options));
Expand Down
Loading