diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index ce1c9a43c2..74b179b198 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -115,6 +115,10 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private readonly bool IsLocalQuorumConsistency = false; private readonly bool isReplicaAddressValidationEnabled; + // Thin Client + private readonly bool isLiteClientEnabled; + private readonly string liteClientEndpoint; + //Fault Injection private readonly IChaosInterceptorFactory chaosInterceptorFactory; private readonly IChaosInterceptor chaosInterceptor; @@ -163,6 +167,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private IStoreClientFactory storeClientFactory; internal CosmosHttpClient httpClient { get; private set; } + internal CosmosHttpClient liteModeHttpClient { get; private set; } + // Flag that indicates whether store client factory must be disposed whenever client is disposed. // Setting this flag to false will result in store client factory not being disposed when client is disposed. // This flag is used to allow shared store client factory survive disposition of a document client while other clients continue using it. @@ -194,6 +200,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private event EventHandler sendingRequest; private event EventHandler receivedResponse; private Func transportClientHandlerFactory; + private string liteClientTestEndpoint = "https://cdb-ms-stage-eastus2-fe2.eastus2.cloudapp.azure.com:10650"; /// /// Initializes a new instance of the class using the @@ -241,6 +248,12 @@ public DocumentClient(Uri serviceEndpoint, this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel); this.initTaskCache = new AsyncCacheNonBlocking(cancellationToken: this.cancellationTokenSource.Token); this.isReplicaAddressValidationEnabled = ConfigurationManager.IsReplicaAddressValidationEnabled(connectionPolicy); + this.isLiteClientEnabled = ConfigurationManager.IsLiteClientEnabled(defaultValue: true); + + if (this.isLiteClientEnabled) + { + this.liteClientEndpoint = ConfigurationManager.GetLiteClientEndpoint(defaultValue: this.liteClientTestEndpoint); + } } /// @@ -493,6 +506,12 @@ internal DocumentClient(Uri serviceEndpoint, this.initTaskCache = new AsyncCacheNonBlocking(cancellationToken: this.cancellationTokenSource.Token); this.chaosInterceptorFactory = chaosInterceptorFactory; this.chaosInterceptor = chaosInterceptorFactory?.CreateInterceptor(this); + this.isLiteClientEnabled = ConfigurationManager.IsLiteClientEnabled(defaultValue: true); + + if (this.isLiteClientEnabled) + { + this.liteClientEndpoint = ConfigurationManager.GetLiteClientEndpoint(defaultValue: this.liteClientTestEndpoint); + } this.Initialize( serviceEndpoint: serviceEndpoint, @@ -504,7 +523,8 @@ internal DocumentClient(Uri serviceEndpoint, storeClientFactory: storeClientFactory, cosmosClientId: cosmosClientId, remoteCertificateValidationCallback: remoteCertificateValidationCallback, - cosmosClientTelemetryOptions: cosmosClientTelemetryOptions); + cosmosClientTelemetryOptions: cosmosClientTelemetryOptions, + enableLiteClientMode: this.isLiteClientEnabled); } /// @@ -688,7 +708,8 @@ internal virtual void Initialize(Uri serviceEndpoint, TokenCredential tokenCredential = null, string cosmosClientId = null, RemoteCertificateValidationCallback remoteCertificateValidationCallback = null, - CosmosClientTelemetryOptions cosmosClientTelemetryOptions = null) + CosmosClientTelemetryOptions cosmosClientTelemetryOptions = null, + bool enableLiteClientMode = false) { if (serviceEndpoint == null) { @@ -947,6 +968,17 @@ internal virtual void Initialize(Uri serviceEndpoint, this.sendingRequest, this.receivedResponse); + if (enableLiteClientMode) + { + this.liteModeHttpClient = CosmosHttpClientCore.CreateWithConnectionPolicy( + this.ApiType, + DocumentClientEventSource.Instance, + this.ConnectionPolicy, + null, + this.sendingRequest, + this.receivedResponse); + } + // Loading VM Information (non blocking call and initialization won't fail if this call fails) VmMetadataApiHandler.TryInitialize(this.httpClient); @@ -1052,6 +1084,23 @@ private async Task GetInitializationTaskAsync(IStoreClientFactory storeCli { this.StoreModel = this.GatewayStoreModel; } + // Change it to this.ConnectionPolicy.ConnectionMode == ConnectionMode.LiteClient when LiteClient is supported. + else if (this.isLiteClientEnabled) + { + ThinClientStoreModel thinClientStoreModel = new ( + endpointManager: this.GlobalEndpointManager, + this.sessionContainer, + (Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel, + this.eventSource, + this.serializerSettings, + this.liteModeHttpClient, + new Uri(this.liteClientEndpoint), + this.accountServiceConfiguration.AccountProperties.Id); + + thinClientStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache); + + this.StoreModel = thinClientStoreModel; + } else { this.InitializeDirectConnectivity(storeClientFactory); @@ -6528,6 +6577,13 @@ internal IStoreModel GetStoreProxy(DocumentServiceRequest request) return this.GatewayStoreModel; } + if (this.isLiteClientEnabled + && operationType == OperationType.Read + && resourceType == ResourceType.Database) + { + return this.GatewayStoreModel; + } + if (operationType == OperationType.Create || operationType == OperationType.Upsert) { diff --git a/Microsoft.Azure.Cosmos/src/HttpClient/CosmosHttpClientCore.cs b/Microsoft.Azure.Cosmos/src/HttpClient/CosmosHttpClientCore.cs index 0e7cc30c9e..3b8389fcbc 100644 --- a/Microsoft.Azure.Cosmos/src/HttpClient/CosmosHttpClientCore.cs +++ b/Microsoft.Azure.Cosmos/src/HttpClient/CosmosHttpClientCore.cs @@ -441,7 +441,7 @@ private static bool IsOutOfRetries( return !timeoutEnumerator.MoveNext(); // No more retries are configured } - private async Task ExecuteHttpHelperAsync( + public async Task ExecuteHttpHelperAsync( HttpRequestMessage requestMessage, ResourceType resourceType, CancellationToken cancellationToken) diff --git a/Microsoft.Azure.Cosmos/src/ProxyStoreClient.cs b/Microsoft.Azure.Cosmos/src/ProxyStoreClient.cs new file mode 100644 index 0000000000..998748d933 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ProxyStoreClient.cs @@ -0,0 +1,433 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Diagnostics; + using System.Diagnostics.CodeAnalysis; + using System.IO; + using System.Net.Http; + using System.Net.Http.Headers; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Cosmos.Tracing.TraceData; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Collections; + using Newtonsoft.Json; + using static Microsoft.Azure.Cosmos.ThinClientTransportSerializer; + + internal class ProxyStoreClient : TransportClient + { + private readonly ICommunicationEventSource eventSource; + private readonly CosmosHttpClient httpClient; + private readonly Uri proxyEndpoint; + private readonly JsonSerializerSettings SerializerSettings; + private static readonly HttpMethod httpPatchMethod = new HttpMethod(HttpConstants.HttpMethods.Patch); + private readonly ObjectPool bufferProviderWrapperPool; + private readonly string globalDatabaseAccountName; + + public ProxyStoreClient( + CosmosHttpClient httpClient, + ICommunicationEventSource eventSource, + Uri proxyEndpoint, + string globalDatabaseAccountName, + JsonSerializerSettings serializerSettings = null) + { + this.proxyEndpoint = proxyEndpoint; + this.httpClient = httpClient; + this.SerializerSettings = serializerSettings; + this.eventSource = eventSource; + this.globalDatabaseAccountName = globalDatabaseAccountName; + this.bufferProviderWrapperPool = new ObjectPool(() => new BufferProviderWrapper()); + } + + public async Task InvokeAsync( + DocumentServiceRequest request, + ResourceType resourceType, + Uri physicalAddress, + CancellationToken cancellationToken) + { + using (HttpResponseMessage responseMessage = await this.InvokeClientAsync(request, resourceType, physicalAddress, cancellationToken)) + { + HttpResponseMessage proxyResponse = await ThinClientTransportSerializer.ConvertProxyResponseAsync(responseMessage); + return await ProxyStoreClient.ParseResponseAsync(proxyResponse, request.SerializerSettings ?? this.SerializerSettings, request); + } + } + + public static bool IsFeedRequest(OperationType requestOperationType) + { + return requestOperationType == OperationType.Create || + requestOperationType == OperationType.Upsert || + requestOperationType == OperationType.ReadFeed || + requestOperationType == OperationType.Query || + requestOperationType == OperationType.SqlQuery || + requestOperationType == OperationType.QueryPlan || + requestOperationType == OperationType.Batch; + } + + internal override async Task InvokeStoreAsync(Uri baseAddress, ResourceOperation resourceOperation, DocumentServiceRequest request) + { + Uri physicalAddress = ProxyStoreClient.IsFeedRequest(request.OperationType) ? + HttpTransportClient.GetResourceFeedUri(resourceOperation.resourceType, baseAddress, request) : + HttpTransportClient.GetResourceEntryUri(resourceOperation.resourceType, baseAddress, request); + + using (HttpResponseMessage responseMessage = await this.InvokeClientAsync(request, resourceOperation.resourceType, physicalAddress, default)) + { + return await HttpTransportClient.ProcessHttpResponse(request.ResourceAddress, string.Empty, responseMessage, physicalAddress, request); + } + } + + [SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope", Justification = "Disposable object returned by method")] + internal Task SendHttpAsync( + Func> requestMessage, + ResourceType resourceType, + HttpTimeoutPolicy timeoutPolicy, + IClientSideRequestStatistics clientSideRequestStatistics, + CancellationToken cancellationToken = default) + { + return this.httpClient.SendHttpAsync( + createRequestMessageAsync: requestMessage, + resourceType: resourceType, + timeoutPolicy: timeoutPolicy, + clientSideRequestStatistics: clientSideRequestStatistics, + cancellationToken: cancellationToken); + } + + internal static async Task ParseResponseAsync(HttpResponseMessage responseMessage, JsonSerializerSettings serializerSettings = null, DocumentServiceRequest request = null) + { + using (responseMessage) + { + IClientSideRequestStatistics requestStatistics = request?.RequestContext?.ClientRequestStatistics; + if ((int)responseMessage.StatusCode < 400) + { + INameValueCollection headers = ProxyStoreClient.ExtractResponseHeaders(responseMessage); + Stream contentStream = await ProxyStoreClient.BufferContentIfAvailableAsync(responseMessage); + return new DocumentServiceResponse( + body: contentStream, + headers: headers, + statusCode: responseMessage.StatusCode, + clientSideRequestStatistics: requestStatistics, + serializerSettings: serializerSettings); + } + else if (request != null + && request.IsValidStatusCodeForExceptionlessRetry((int)responseMessage.StatusCode)) + { + INameValueCollection headers = ProxyStoreClient.ExtractResponseHeaders(responseMessage); + Stream contentStream = await ProxyStoreClient.BufferContentIfAvailableAsync(responseMessage); + return new DocumentServiceResponse( + body: contentStream, + headers: headers, + statusCode: responseMessage.StatusCode, + clientSideRequestStatistics: requestStatistics, + serializerSettings: serializerSettings); + } + else + { + throw await ProxyStoreClient.CreateDocumentClientExceptionAsync(responseMessage, requestStatistics); + } + } + } + + internal static INameValueCollection ExtractResponseHeaders(HttpResponseMessage responseMessage) + { + INameValueCollection headers = new HttpResponseHeadersWrapper( + responseMessage.Headers, + responseMessage.Content?.Headers); + + return headers; + } + + /// + /// Creating a new DocumentClientException using the Gateway response message. + /// + /// + /// + internal static async Task CreateDocumentClientExceptionAsync( + HttpResponseMessage responseMessage, + IClientSideRequestStatistics requestStatistics) + { + if (!PathsHelper.TryParsePathSegments( + resourceUrl: responseMessage.RequestMessage.RequestUri.LocalPath, + isFeed: out _, + resourcePath: out _, + resourceIdOrFullName: out string resourceIdOrFullName, + isNameBased: out _)) + { + // if resourceLink is invalid - we will not set resourceAddress in exception. + } + + // If service rejects the initial payload like header is to large it will return an HTML error instead of JSON. + if (string.Equals(responseMessage.Content?.Headers?.ContentType?.MediaType, "application/json", StringComparison.OrdinalIgnoreCase) && + responseMessage.Content?.Headers.ContentLength > 0) + { + try + { + Stream contentAsStream = await responseMessage.Content.ReadAsStreamAsync(); + Error error = JsonSerializable.LoadFrom(stream: contentAsStream); + + return new DocumentClientException( + errorResource: error, + responseHeaders: responseMessage.Headers, + statusCode: responseMessage.StatusCode) + { + StatusDescription = responseMessage.ReasonPhrase, + ResourceAddress = resourceIdOrFullName, + RequestStatistics = requestStatistics + }; + } + catch + { + } + } + + StringBuilder contextBuilder = new StringBuilder(); + contextBuilder.AppendLine(await responseMessage.Content.ReadAsStringAsync()); + + HttpRequestMessage requestMessage = responseMessage.RequestMessage; + + if (requestMessage != null) + { + contextBuilder.AppendLine($"RequestUri: {requestMessage.RequestUri};"); + contextBuilder.AppendLine($"RequestMethod: {requestMessage.Method.Method};"); + + if (requestMessage.Headers != null) + { + foreach (KeyValuePair> header in requestMessage.Headers) + { + contextBuilder.AppendLine($"Header: {header.Key} Length: {string.Join(",", header.Value).Length};"); + } + } + } + + return new DocumentClientException( + message: contextBuilder.ToString(), + innerException: null, + responseHeaders: responseMessage.Headers, + statusCode: responseMessage.StatusCode, + requestUri: responseMessage.RequestMessage.RequestUri) + { + StatusDescription = responseMessage.ReasonPhrase, + ResourceAddress = resourceIdOrFullName, + RequestStatistics = requestStatistics + }; + } + + internal static bool IsAllowedRequestHeader(string headerName) + { + if (!headerName.StartsWith("x-ms", StringComparison.OrdinalIgnoreCase)) + { +#pragma warning disable IDE0066 // Convert switch statement to expression + switch (headerName) + { + //Just flow the header which are settable at RequestMessage level and the one we care. + case HttpConstants.HttpHeaders.Authorization: + case HttpConstants.HttpHeaders.Accept: + case HttpConstants.HttpHeaders.ContentType: + case HttpConstants.HttpHeaders.Host: + case HttpConstants.HttpHeaders.IfMatch: + case HttpConstants.HttpHeaders.IfModifiedSince: + case HttpConstants.HttpHeaders.IfNoneMatch: + case HttpConstants.HttpHeaders.IfRange: + case HttpConstants.HttpHeaders.IfUnmodifiedSince: + case HttpConstants.HttpHeaders.UserAgent: + case HttpConstants.HttpHeaders.Prefer: + case HttpConstants.HttpHeaders.Query: + case HttpConstants.HttpHeaders.A_IM: + return true; + + default: + return false; + } +#pragma warning restore IDE0066 // Convert switch statement to expression + } + return true; + } + + private static async Task BufferContentIfAvailableAsync(HttpResponseMessage responseMessage) + { + if (responseMessage.Content == null) + { + return null; + } + + MemoryStream bufferedStream = new MemoryStream(); + await responseMessage.Content.CopyToAsync(bufferedStream); + bufferedStream.Position = 0; + return bufferedStream; + } + + [SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope", Justification = "Disposable object returned by method")] + private async ValueTask PrepareRequestMessageAsync( + DocumentServiceRequest request, + Uri physicalAddress) + { + HttpMethod httpMethod = HttpMethod.Head; + if (request.OperationType == OperationType.Create || + request.OperationType == OperationType.Upsert || + request.OperationType == OperationType.Query || + request.OperationType == OperationType.SqlQuery || + request.OperationType == OperationType.Batch || + request.OperationType == OperationType.ExecuteJavaScript || + request.OperationType == OperationType.QueryPlan || + (request.ResourceType == ResourceType.PartitionKey && request.OperationType == OperationType.Delete)) + { + httpMethod = HttpMethod.Post; + } + else if (ChangeFeedHelper.IsChangeFeedWithQueryRequest(request.OperationType, request.Body != null)) + { + // ChangeFeed with payload is a CF with query support and will + // be a query POST request. + httpMethod = HttpMethod.Post; + } + else if (request.OperationType == OperationType.Read + || request.OperationType == OperationType.ReadFeed) + { + httpMethod = HttpMethod.Get; + } + else if ((request.OperationType == OperationType.Replace) + || (request.OperationType == OperationType.CollectionTruncate)) + { + httpMethod = HttpMethod.Put; + } + else if (request.OperationType == OperationType.Delete) + { + httpMethod = HttpMethod.Delete; + } + else if (request.OperationType == OperationType.Patch) + { + // There isn't support for PATCH method in .NetStandard 2.0 + httpMethod = httpPatchMethod; + } + else + { + throw new NotImplementedException(); + } + + HttpRequestMessage requestMessage = new (httpMethod, physicalAddress) + { + Version = new Version(2, 0), + }; + + // The StreamContent created below will own and dispose its underlying stream, but we may need to reuse the stream on the + // DocumentServiceRequest for future requests. Hence we need to clone without incurring copy cost, so that when + // HttpRequestMessage -> StreamContent -> MemoryStream all get disposed, the original stream will be left open. + if (request.Body != null) + { + await request.EnsureBufferedBodyAsync(); + MemoryStream clonedStream = new MemoryStream(); + // WriteTo doesn't use and update Position of source stream. No point in setting/restoring it. + request.CloneableBody.WriteTo(clonedStream); + clonedStream.Position = 0; + + requestMessage.Content = new StreamContent(clonedStream); + } + + if (request.Headers != null) + { + foreach (string key in request.Headers) + { + if (GatewayStoreClient.IsAllowedRequestHeader(key)) + { + if (key.Equals(HttpConstants.HttpHeaders.ContentType, StringComparison.OrdinalIgnoreCase)) + { + requestMessage.Content.Headers.ContentType = new MediaTypeHeaderValue(request.Headers[key]); + } + else + { + requestMessage.Headers.TryAddWithoutValidation(key, request.Headers[key]); + } + } + } + } + + if (request.Properties != null) + { + foreach (KeyValuePair property in request.Properties) + { + requestMessage.Properties.Add(property); + } + } + + // add activityId + Guid activityId = System.Diagnostics.Trace.CorrelationManager.ActivityId; + Debug.Assert(activityId != Guid.Empty); + requestMessage.Headers.Add(HttpConstants.HttpHeaders.ActivityId, activityId.ToString()); + + string regionName = request?.RequestContext?.RegionName; + if (regionName != null) + { + requestMessage.Properties.Add(ClientSideRequestStatisticsTraceDatum.HttpRequestRegionNameProperty, regionName); + } + + BufferProviderWrapper bufferProviderWrapper = this.bufferProviderWrapperPool.Get(); + try + { + requestMessage.Headers.TryAddWithoutValidation("x-ms-thinclient-proxy-operation-type", request.OperationType.ToOperationTypeString()); + requestMessage.Headers.TryAddWithoutValidation("x-ms-thinclient-proxy-resource-type", request.ResourceType.ToResourceTypeString()); + Stream contentStream = await ThinClientTransportSerializer.SerializeProxyRequestAsync(bufferProviderWrapper, this.globalDatabaseAccountName, requestMessage); + + // force Http2, post and route to the thin client endpoint. + requestMessage.Content = new StreamContent(contentStream); + requestMessage.Content.Headers.ContentLength = contentStream.Length; + requestMessage.Headers.Clear(); + + // Force Http 2.0 on the request + // this.forceHttp20Action.Invoke(request); + + requestMessage.RequestUri = this.proxyEndpoint; + requestMessage.Method = HttpMethod.Post; + + return requestMessage; + } + finally + { + this.bufferProviderWrapperPool.Return(bufferProviderWrapper); + } + } + + [SuppressMessage("Microsoft.Reliability", "CA2000:DisposeObjectsBeforeLosingScope", Justification = "Disposable object returned by method")] + private Task InvokeClientAsync( + DocumentServiceRequest request, + ResourceType resourceType, + Uri physicalAddress, + CancellationToken cancellationToken) + { + DefaultTrace.TraceInformation("In {0}, OperationType: {1}, ResourceType: {2}", nameof(ProxyStoreClient), request.OperationType, request.ResourceType); + + return this.httpClient.SendHttpAsync( + () => this.PrepareRequestMessageAsync(request, physicalAddress), + resourceType, + HttpTimeoutPolicy.GetTimeoutPolicy(request), + request.RequestContext.ClientRequestStatistics, + cancellationToken); + } + + public class ObjectPool + { + private readonly ConcurrentBag Objects; + private readonly Func ObjectGenerator; + + public ObjectPool(Func objectGenerator) + { + this.ObjectGenerator = objectGenerator ?? throw new ArgumentNullException(nameof(objectGenerator)); + this.Objects = new ConcurrentBag(); + } + + public T Get() + { + return this.Objects.TryTake(out T item) ? item : this.ObjectGenerator(); + } + + public void Return(T item) + { + this.Objects.Add(item); + } + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/ThinClientHttpMessageHandler.cs b/Microsoft.Azure.Cosmos/src/ThinClientHttpMessageHandler.cs new file mode 100644 index 0000000000..6881c4cc01 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ThinClientHttpMessageHandler.cs @@ -0,0 +1,95 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +/*namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Linq; + using System.Net; + using System.Net.Http; + using System.Net.Http.Headers; + using System.Net.Security; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Common; + using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Cosmos.Routing; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Collections; + using Newtonsoft.Json; + + /// + /// blabla + /// + internal class ThinClientHttpMessageHandler : DelegatingHandler + { + private readonly string accountName; + private readonly Uri gatewayEndpoint; + private readonly Uri thinClientEndpoint; + private readonly BufferProviderWrapper bufferProvider; + private readonly string customUserAgent; + + public ThinClientHttpMessageHandler(Uri gatewayEndpoint, Uri thinClientEndpoint, string accountName) + : this(gatewayEndpoint, thinClientEndpoint, accountName, serverValidator: null) + { + } + + public ThinClientHttpMessageHandler(Uri gatewayEndpoint, Uri thinClientEndpoint, string accountName, RemoteCertificateValidationCallback serverValidator, string customUserAgent = "") + { + this.gatewayEndpoint = gatewayEndpoint; + this.thinClientEndpoint = thinClientEndpoint; + this.accountName = accountName ?? throw new ArgumentNullException(nameof(accountName)); + this.customUserAgent = customUserAgent; + this.bufferProvider = new(); + SocketsHttpHandler clientHandler = new SocketsHttpHandler(); + clientHandler.EnableMultipleHttp2Connections = true; + if (serverValidator != null) + { + clientHandler.SslOptions.RemoteCertificateValidationCallback = serverValidator; + } + + this.InnerHandler = clientHandler; + } + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + if (request.Headers.Contains(ProxyExtensions.RoutedViaProxy)) + { + return this.SendViaProxyAsync(request, cancellationToken); + } + else + { + request.RequestUri = new Uri(this.gatewayEndpoint, request.RequestUri.PathAndQuery); + return base.SendAsync(request, cancellationToken); + } + } + + private async Task SendViaProxyAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + Stream contentStream = await ThinClientTransportSerializer.SerializeProxyRequestAsync(this.bufferProvider, this.accountName, request); + + // force Http2, post and route to the thin client endpoint. + request.Content = new StreamContent(contentStream); + request.Content.Headers.ContentLength = contentStream.Length; + request.Headers.Clear(); + + request.Version = new Version("2.0"); + request.VersionPolicy = HttpVersionPolicy.RequestVersionExact; + request.RequestUri = this.thinClientEndpoint; + request.Method = HttpMethod.Post; + + if (!string.IsNullOrEmpty(this.customUserAgent)) + { + request.Headers.UserAgent.ParseAdd(this.customUserAgent); + } + + using HttpResponseMessage responseMessage = await base.SendAsync(request, cancellationToken); + return await ProxyExtensions.ConvertProxyResponseAsync(responseMessage); + } + } +}*/ diff --git a/Microsoft.Azure.Cosmos/src/ThinClientStoreModel.cs b/Microsoft.Azure.Cosmos/src/ThinClientStoreModel.cs new file mode 100644 index 0000000000..4a086c043b --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ThinClientStoreModel.cs @@ -0,0 +1,426 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Net; + using System.Net.Http; + using System.Net.Http.Headers; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Common; + using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Cosmos.Routing; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Collections; + using Newtonsoft.Json; + + // Marking it as non-sealed in order to unit test it using Moq framework + internal class ThinClientStoreModel : IStoreModelExtension, IDisposable + { + private static readonly string sessionConsistencyAsString = ConsistencyLevel.Session.ToString(); + + private readonly GlobalEndpointManager endpointManager; + private readonly DocumentClientEventSource eventSource; + private readonly ISessionContainer sessionContainer; + private readonly ConsistencyLevel defaultConsistencyLevel; + + private ProxyStoreClient proxyStoreClient; + + // Caches to resolve the PartitionKeyRange from request. For Session Token Optimization. + private ClientCollectionCache clientCollectionCache; + private PartitionKeyRangeCache partitionKeyRangeCache; + + public ThinClientStoreModel( + GlobalEndpointManager endpointManager, + ISessionContainer sessionContainer, + ConsistencyLevel defaultConsistencyLevel, + DocumentClientEventSource eventSource, + JsonSerializerSettings serializerSettings, + CosmosHttpClient httpClient, + Uri proxyEndpoint, + string globalDatabaseAccountName) + { + this.endpointManager = endpointManager; + this.sessionContainer = sessionContainer; + this.defaultConsistencyLevel = defaultConsistencyLevel; + this.eventSource = eventSource; + this.proxyStoreClient = new ProxyStoreClient( + httpClient, + this.eventSource, + proxyEndpoint, + globalDatabaseAccountName, + serializerSettings); + } + + public virtual async Task ProcessMessageAsync(DocumentServiceRequest request, CancellationToken cancellationToken = default) + { + DefaultTrace.TraceInformation("In {0}, OperationType: {1}, ResourceType: {2}", nameof(ThinClientStoreModel), request.OperationType, request.ResourceType); + + await ThinClientStoreModel.ApplySessionTokenAsync( + request, + this.defaultConsistencyLevel, + this.sessionContainer, + this.partitionKeyRangeCache, + this.clientCollectionCache, + this.endpointManager); + + DocumentServiceResponse response; + try + { + Uri physicalAddress = ProxyStoreClient.IsFeedRequest(request.OperationType) ? this.GetFeedUri(request) : this.GetEntityUri(request); + // Collect region name only for document resources + if (request.ResourceType.Equals(ResourceType.Document) && this.endpointManager.TryGetLocationForGatewayDiagnostics(request.RequestContext.LocationEndpointToRoute, out string regionName)) + { + request.RequestContext.RegionName = regionName; + } + response = await this.proxyStoreClient.InvokeAsync(request, request.ResourceType, physicalAddress, cancellationToken); + } + catch (DocumentClientException exception) + { + if ((!ReplicatedResourceClient.IsMasterResource(request.ResourceType)) && + (exception.StatusCode == HttpStatusCode.PreconditionFailed || exception.StatusCode == HttpStatusCode.Conflict + || (exception.StatusCode == HttpStatusCode.NotFound && exception.GetSubStatus() != SubStatusCodes.ReadSessionNotAvailable))) + { + await this.CaptureSessionTokenAndHandleSplitAsync(exception.StatusCode, exception.GetSubStatus(), request, exception.Headers); + } + + throw; + } + + await this.CaptureSessionTokenAndHandleSplitAsync(response.StatusCode, response.SubStatusCode, request, response.Headers); + return response; + } + + public void SetCaches(PartitionKeyRangeCache partitionKeyRangeCache, + ClientCollectionCache clientCollectionCache) + { + this.clientCollectionCache = clientCollectionCache; + this.partitionKeyRangeCache = partitionKeyRangeCache; + } + + public void Dispose() + { + this.Dispose(true); + } + + private async Task CaptureSessionTokenAndHandleSplitAsync( + HttpStatusCode? statusCode, + SubStatusCodes subStatusCode, + DocumentServiceRequest request, + INameValueCollection responseHeaders) + { + // Exceptionless can try to capture session token from CompleteResponse + if (request.IsValidStatusCodeForExceptionlessRetry((int)statusCode, subStatusCode)) + { + // Not capturing on master resources + if (ReplicatedResourceClient.IsMasterResource(request.ResourceType)) + { + return; + } + + // Only capturing on 409, 412, 404 && !1002 + if (statusCode != HttpStatusCode.PreconditionFailed + && statusCode != HttpStatusCode.Conflict + && (statusCode != HttpStatusCode.NotFound || subStatusCode == SubStatusCodes.ReadSessionNotAvailable)) + { + return; + } + } + + if (request.ResourceType == ResourceType.Collection && request.OperationType == OperationType.Delete) + { + string resourceId; + + if (request.IsNameBased) + { + resourceId = responseHeaders[HttpConstants.HttpHeaders.OwnerId]; + } + else + { + resourceId = request.ResourceId; + } + + this.sessionContainer.ClearTokenByResourceId(resourceId); + } + else + { + this.sessionContainer.SetSessionToken(request, responseHeaders); + PartitionKeyRange detectedPartitionKeyRange = request.RequestContext.ResolvedPartitionKeyRange; + string partitionKeyRangeInResponse = responseHeaders[HttpConstants.HttpHeaders.PartitionKeyRangeId]; + if (detectedPartitionKeyRange != null + && !string.IsNullOrEmpty(partitionKeyRangeInResponse) + && !string.IsNullOrEmpty(request.RequestContext.ResolvedCollectionRid) + && !partitionKeyRangeInResponse.Equals(detectedPartitionKeyRange.Id, StringComparison.OrdinalIgnoreCase)) + { + // The request ended up being on a different partition unknown to the client, so we better refresh the caches + await this.partitionKeyRangeCache.TryGetPartitionKeyRangeByIdAsync( + request.RequestContext.ResolvedCollectionRid, + partitionKeyRangeInResponse, + NoOpTrace.Singleton, + forceRefresh: true); + } + } + } + + internal static async Task ApplySessionTokenAsync( + DocumentServiceRequest request, + ConsistencyLevel defaultConsistencyLevel, + ISessionContainer sessionContainer, + PartitionKeyRangeCache partitionKeyRangeCache, + CollectionCache clientCollectionCache, + IGlobalEndpointManager globalEndpointManager) + { + if (request.Headers == null) + { + Debug.Fail("DocumentServiceRequest does not have headers."); + return; + } + + // Master resource operations don't require session token. + if (ThinClientStoreModel.IsMasterOperation(request.ResourceType, request.OperationType)) + { + if (!string.IsNullOrEmpty(request.Headers[HttpConstants.HttpHeaders.SessionToken])) + { + request.Headers.Remove(HttpConstants.HttpHeaders.SessionToken); + } + + return; + } + + if (!string.IsNullOrEmpty(request.Headers[HttpConstants.HttpHeaders.SessionToken])) + { + return; // User is explicitly controlling the session. + } + + string requestConsistencyLevel = request.Headers[HttpConstants.HttpHeaders.ConsistencyLevel]; + bool isReadOrBatchRequest = request.IsReadOnlyRequest || request.OperationType == OperationType.Batch; + bool requestHasConsistencySet = !string.IsNullOrEmpty(requestConsistencyLevel) && isReadOrBatchRequest; // Only read requests can have their consistency modified + + bool sessionConsistencyApplies = + (!requestHasConsistencySet && defaultConsistencyLevel == ConsistencyLevel.Session) || + (requestHasConsistencySet + && string.Equals(requestConsistencyLevel, ThinClientStoreModel.sessionConsistencyAsString, StringComparison.OrdinalIgnoreCase)); + + bool isMultiMasterEnabledForRequest = globalEndpointManager.CanUseMultipleWriteLocations(request); + + if (!sessionConsistencyApplies + || (!isReadOrBatchRequest + && !isMultiMasterEnabledForRequest)) + { + return; // Only apply the session token in case of session consistency and the request is read only or read/write on multimaster + } + + (bool isSuccess, string sessionToken) = await ThinClientStoreModel.TryResolveSessionTokenAsync( + request, + sessionContainer, + partitionKeyRangeCache, + clientCollectionCache); + + if (isSuccess && !string.IsNullOrEmpty(sessionToken)) + { + request.Headers[HttpConstants.HttpHeaders.SessionToken] = sessionToken; + } + } + + internal static async Task> TryResolveSessionTokenAsync( + DocumentServiceRequest request, + ISessionContainer sessionContainer, + PartitionKeyRangeCache partitionKeyRangeCache, + CollectionCache clientCollectionCache) + { + if (request == null) + { + throw new ArgumentNullException(nameof(request)); + } + + if (sessionContainer == null) + { + throw new ArgumentNullException(nameof(sessionContainer)); + } + + if (partitionKeyRangeCache == null) + { + throw new ArgumentNullException(nameof(partitionKeyRangeCache)); + } + + if (clientCollectionCache == null) + { + throw new ArgumentNullException(nameof(clientCollectionCache)); + } + + if (request.ResourceType.IsPartitioned()) + { + (bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync( + request: request, + sessionContainer: sessionContainer, + partitionKeyRangeCache: partitionKeyRangeCache, + clientCollectionCache: clientCollectionCache, + refreshCache: false); + + if (isSuccess && sessionContainer is SessionContainer gatewaySessionContainer) + { + request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange; + string localSessionToken = gatewaySessionContainer.ResolvePartitionLocalSessionTokenForGateway(request, partitionKeyRange.Id); + if (!string.IsNullOrEmpty(localSessionToken)) + { + return new Tuple(true, localSessionToken); + } + } + } + + return new Tuple(false, null); + } + + private static async Task> TryResolvePartitionKeyRangeAsync( + DocumentServiceRequest request, + ISessionContainer sessionContainer, + PartitionKeyRangeCache partitionKeyRangeCache, + CollectionCache clientCollectionCache, + bool refreshCache) + { + if (refreshCache) + { + request.ForceMasterRefresh = true; + request.ForceNameCacheRefresh = true; + } + + PartitionKeyRange partitonKeyRange = null; + ContainerProperties collection = await clientCollectionCache.ResolveCollectionAsync( + request, + CancellationToken.None, + NoOpTrace.Singleton); + + string partitionKeyString = request.Headers[HttpConstants.HttpHeaders.PartitionKey]; + if (partitionKeyString != null) + { + CollectionRoutingMap collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync( + collectionRid: collection.ResourceId, + previousValue: null, + request: request, + NoOpTrace.Singleton); + + if (refreshCache && collectionRoutingMap != null) + { + collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync( + collectionRid: collection.ResourceId, + previousValue: collectionRoutingMap, + request: request, + NoOpTrace.Singleton); + } + + if (collectionRoutingMap != null) + { + partitonKeyRange = AddressResolver.TryResolveServerPartitionByPartitionKey( + request: request, + partitionKeyString: partitionKeyString, + collectionCacheUptoDate: false, + collection: collection, + routingMap: collectionRoutingMap); + } + } + else if (request.PartitionKeyRangeIdentity != null) + { + PartitionKeyRangeIdentity partitionKeyRangeId = request.PartitionKeyRangeIdentity; + partitonKeyRange = await partitionKeyRangeCache.TryGetPartitionKeyRangeByIdAsync( + collection.ResourceId, + partitionKeyRangeId.PartitionKeyRangeId, + NoOpTrace.Singleton, + refreshCache); + } + else if (request.RequestContext.ResolvedPartitionKeyRange != null) + { + partitonKeyRange = request.RequestContext.ResolvedPartitionKeyRange; + } + + if (partitonKeyRange == null) + { + if (refreshCache) + { + return new Tuple(false, null); + } + + // need to refresh cache. Maybe split happened. + return await ThinClientStoreModel.TryResolvePartitionKeyRangeAsync( + request: request, + sessionContainer: sessionContainer, + partitionKeyRangeCache: partitionKeyRangeCache, + clientCollectionCache: clientCollectionCache, + refreshCache: true); + } + + return new Tuple(true, partitonKeyRange); + } + + // DEVNOTE: This can be replace with ReplicatedResourceClient.IsMasterOperation on next Direct sync + internal static bool IsMasterOperation( + ResourceType resourceType, + OperationType operationType) + { + // Stored procedures, trigger, and user defined functions CRUD operations are done on + // master so they do not require the session token. Stored procedures execute is not a master operation + return ReplicatedResourceClient.IsMasterResource(resourceType) || + ThinClientStoreModel.IsStoredProcedureCrudOperation(resourceType, operationType) || + resourceType == ResourceType.Trigger || + resourceType == ResourceType.UserDefinedFunction || + operationType == OperationType.QueryPlan; + } + + internal static bool IsStoredProcedureCrudOperation( + ResourceType resourceType, + OperationType operationType) + { + return resourceType == ResourceType.StoredProcedure && + operationType != Documents.OperationType.ExecuteJavaScript; + } + + private void Dispose(bool disposing) + { + if (disposing) + { + if (this.proxyStoreClient != null) + { + try + { + this.proxyStoreClient.Dispose(); + } + catch (Exception exception) + { + DefaultTrace.TraceWarning("Exception {0} thrown during dispose of HttpClient, this could happen if there are inflight request during the dispose of client", + exception); + } + + this.proxyStoreClient = null; + } + } + } + + private Uri GetEntityUri(DocumentServiceRequest entity) + { + string contentLocation = entity.Headers[HttpConstants.HttpHeaders.ContentLocation]; + + if (!string.IsNullOrEmpty(contentLocation)) + { + return new Uri(this.endpointManager.ResolveServiceEndpoint(entity), new Uri(contentLocation).AbsolutePath); + } + + return new Uri(this.endpointManager.ResolveServiceEndpoint(entity), PathsHelper.GeneratePath(entity.ResourceType, entity, false)); + } + + private Uri GetFeedUri(DocumentServiceRequest request) + { + return new Uri(this.endpointManager.ResolveServiceEndpoint(request), PathsHelper.GeneratePath(request.ResourceType, request, true)); + } + + public Task OpenConnectionsToAllReplicasAsync(string databaseName, string containerLinkUri, CancellationToken cancellationToken = default) + { + return Task.CompletedTask; + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs b/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs new file mode 100644 index 0000000000..e354938347 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ThinClientTransportSerializer.cs @@ -0,0 +1,285 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Buffers; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Net; + using System.Net.Http; + using System.Threading.Tasks; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Collections; + + /// + /// blabla + /// + internal static class ThinClientTransportSerializer + { + public const string RoutedViaProxy = "x-ms-thinclient-route-via-proxy"; + public const string ProxyStartEpk = "x-ms-thinclient-range-min"; + public const string ProxyEndEpk = "x-ms-thinclient-range-max"; + + public const string ProxyOperationType = "x-ms-thinclient-proxy-operation-type"; + public const string ProxyResourceType = "x-ms-thinclient-proxy-resource-type"; + + private static readonly PartitionKeyDefinition HashV2SinglePath; + + static ThinClientTransportSerializer() + { + HashV2SinglePath = new PartitionKeyDefinition + { + Kind = PartitionKind.Hash, + Version = Documents.PartitionKeyDefinitionVersion.V2, + }; + HashV2SinglePath.Paths.Add("/id"); + } + + /// + /// Wrapper to expose a public bufferprovider for the RNTBD stack. + /// +#pragma warning disable CA1034 // Nested types should not be visible + public sealed class BufferProviderWrapper +#pragma warning restore CA1034 // Nested types should not be visible + { + internal BufferProvider Provider { get; set; } = new (); + } + + /// + /// Serialize the Proxy request to the RNTBD protocol format. + /// Today this takes the HttprequestMessage and reconstructs the DSR. + /// If the SDK can push properties to the HttpRequestMessage then the handler above having + /// the DSR can allow us to push that directly to the serialization. + /// + public static async Task SerializeProxyRequestAsync( + BufferProviderWrapper bufferProvider, + string accountName, + HttpRequestMessage requestMessage) + { + // Skip this and use the original DSR. + OperationType operationType = (OperationType)Enum.Parse(typeof(OperationType), requestMessage.Headers.GetValues(ProxyOperationType).First()); + ResourceType resourceType = (ResourceType)Enum.Parse(typeof(ResourceType), requestMessage.Headers.GetValues(ProxyResourceType).First()); + + Guid activityId = Guid.Parse(requestMessage.Headers.GetValues(HttpConstants.HttpHeaders.ActivityId).First()); + + Stream requestStream = null; + if (requestMessage.Content != null) + { + requestStream = await requestMessage.Content.ReadAsStreamAsync(); + } + + RequestNameValueCollection dictionaryCollection = new RequestNameValueCollection(); + foreach (KeyValuePair> header in requestMessage.Headers) + { + dictionaryCollection.Set(header.Key, string.Join(",", header.Value)); + } + + using DocumentServiceRequest request = new (operationType, resourceType, requestMessage.RequestUri.PathAndQuery, + requestStream, AuthorizationTokenType.PrimaryMasterKey, + dictionaryCollection); + + if (operationType.IsPointOperation()) + { + string partitionKey = request.Headers.Get(HttpConstants.HttpHeaders.PartitionKey); + + if (string.IsNullOrEmpty(partitionKey)) + { + throw new InternalServerErrorException(); + } + + string epk = GetEffectivePartitionKeyHash(partitionKey); + + request.Properties = new Dictionary + { + { "x-ms-effective-partition-key", HexStringUtility.HexStringToBytes(epk) } + }; + } + else if (request.Headers[ProxyStartEpk] != null) + { + // Re-add EPK headers removed by RequestInvokerHandler through Properties + request.Properties = new Dictionary + { + { WFConstants.BackendHeaders.StartEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ProxyStartEpk]) }, + { WFConstants.BackendHeaders.EndEpkHash, HexStringUtility.HexStringToBytes(request.Headers[ProxyEndEpk]) } + }; + + request.Headers.Add(HttpConstants.HttpHeaders.ReadFeedKeyType, RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString()); + request.Headers.Add(HttpConstants.HttpHeaders.StartEpk, request.Headers[ProxyStartEpk]); + request.Headers.Add(HttpConstants.HttpHeaders.EndEpk, request.Headers[ProxyEndEpk]); + } + + await request.EnsureBufferedBodyAsync(); + + using Documents.Rntbd.TransportSerialization.SerializedRequest serializedRequest = + Documents.Rntbd.TransportSerialization.BuildRequestForProxy(request, + new ResourceOperation(operationType, resourceType), + activityId, + bufferProvider.Provider, + accountName, + out _, + out _); + + // TODO: consider using the SerializedRequest directly. + MemoryStream memoryStream = new MemoryStream(serializedRequest.RequestSize); + await serializedRequest.CopyToStreamAsync(memoryStream); + memoryStream.Position = 0; + return memoryStream; + } + + public static string GetEffectivePartitionKeyHash(string partitionJson) + { + return Documents.PartitionKey.FromJsonString(partitionJson).InternalKey.GetEffectivePartitionKeyString(HashV2SinglePath); + } + + /// + /// Deserialize the Proxy Response from the RNTBD protocol format to the Http format needed by the caller. + /// Today this takes the HttpResponseMessage and reconstructs the modified Http response. + /// + public static async Task ConvertProxyResponseAsync(HttpResponseMessage responseMessage) + { + using Stream responseStream = await responseMessage.Content.ReadAsStreamAsync(); + + (StatusCodes status, byte[] metadata) = await ThinClientTransportSerializer.ReadHeaderAndMetadataAsync(responseStream); + + if (responseMessage.StatusCode != (HttpStatusCode)status) + { + throw new InternalServerErrorException("Status code mismatch"); + } + + Rntbd.BytesDeserializer bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length); + if (!Documents.Rntbd.HeadersTransportSerialization.TryParseMandatoryResponseHeaders(ref bytesDeserializer, out bool payloadPresent, out _)) + { + throw new InternalServerErrorException("Length mismatch"); + } + + MemoryStream bodyStream = null; + if (payloadPresent) + { + int length = await ThinClientTransportSerializer.ReadBodyLengthAsync(responseStream); + bodyStream = new MemoryStream(length); + await responseStream.CopyToAsync(bodyStream); + bodyStream.Position = 0; + } + + // TODO(Perf): Clean this up. + bytesDeserializer = new Rntbd.BytesDeserializer(metadata, metadata.Length); + StoreResponse storeResponse = Documents.Rntbd.TransportSerialization.MakeStoreResponse( + status, + Guid.NewGuid(), + bodyStream, + HttpConstants.Versions.CurrentVersion, + ref bytesDeserializer); + + HttpResponseMessage response = new HttpResponseMessage((HttpStatusCode)storeResponse.StatusCode) + { + RequestMessage = responseMessage.RequestMessage + }; + + if (bodyStream != null) + { + response.Content = new StreamContent(bodyStream); + } + + foreach (string header in storeResponse.Headers.Keys()) + { + if (header == HttpConstants.HttpHeaders.SessionToken) + { + string newSessionToken = storeResponse.PartitionKeyRangeId + ":" + storeResponse.Headers.Get(header); + response.Headers.TryAddWithoutValidation(header, newSessionToken); + } + else + { + response.Headers.TryAddWithoutValidation(header, storeResponse.Headers.Get(header)); + } + } + + response.Headers.TryAddWithoutValidation(RoutedViaProxy, "1"); + return response; + } + + private static async Task<(StatusCodes, byte[] metadata)> ReadHeaderAndMetadataAsync(Stream stream) + { + byte[] header = ArrayPool.Shared.Rent(24); + const int headerLength = 24; + try + { + int headerRead = 0; + while (headerRead < headerLength) + { + int read = 0; + read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + headerRead += read; + } + + uint totalLength = BitConverter.ToUInt32(header, 0); + StatusCodes status = (StatusCodes)BitConverter.ToUInt32(header, 4); + + if (totalLength < headerLength) + { + throw new InternalServerErrorException("Length mismatch"); + } + + int metadataLength = (int)totalLength - headerLength; + byte[] metadata = new byte[metadataLength]; + int responseMetadataRead = 0; + while (responseMetadataRead < metadataLength) + { + int read = 0; + read = await stream.ReadAsync(metadata, responseMetadataRead, metadataLength - responseMetadataRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + responseMetadataRead += read; + } + + return (status, metadata); + } + finally + { + ArrayPool.Shared.Return(header); + } + } + + private static async Task ReadBodyLengthAsync(Stream stream) + { + byte[] header = ArrayPool.Shared.Rent(4); + const int headerLength = 4; + try + { + int headerRead = 0; + while (headerRead < headerLength) + { + int read = 0; + read = await stream.ReadAsync(header, headerRead, headerLength - headerRead); + + if (read == 0) + { + throw new DocumentClientException("Unexpected read empty bytes", HttpStatusCode.Gone, SubStatusCodes.Unknown); + } + + headerRead += read; + } + + return BitConverter.ToInt32(header, 0); + } + finally + { + ArrayPool.Shared.Return(header); + } + + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs b/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs index bffb8b93f9..812b17807a 100644 --- a/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs +++ b/Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs @@ -35,7 +35,17 @@ internal static class ConfigurationManager /// /// Environment variable name to enable distributed query gateway mode. /// - internal static readonly string DistributedQueryGatewayModeEnabled = "AZURE_COSMOS_DISTRIBUTED_QUERY_GATEWAY_ENABLED"; + internal static readonly string DistributedQueryGatewayModeEnabled = "AZURE_COSMOS_DISTRIBUTED_QUERY_GATEWAY_ENABLED"; + + /// + /// Environment variable name to enable lite client mode. + /// + internal static readonly string LiteClientModeEnabled = "AZURE_COSMOS_LITE_CLIENT_ENABLED"; + + /// + /// Environment variable name to get lite client endpoint. + /// + internal static readonly string LiteClientEndpoint = "AZURE_COSMOS_LITE_CLIENT_ENDPOINT"; public static T GetEnvironmentVariable(string variable, T defaultValue) { @@ -124,6 +134,36 @@ public static bool IsDistributedQueryGatewayModeEnabled( .GetEnvironmentVariable( variable: DistributedQueryGatewayModeEnabled, defaultValue: defaultValue); + } + + /// + /// Gets the boolean value of the partition level failover environment variable. Note that, partition level failover + /// is disabled by default for both preview and GA releases. The user can set the respective environment variable + /// 'AZURE_COSMOS_PARTITION_LEVEL_FAILOVER_ENABLED' to override the value for both preview and GA. The method will + /// eventually be removed, once partition level failover is enabled by default for both preview and GA. + /// + /// A boolean field containing the default value for partition level failover. + /// A boolean flag indicating if partition level failover is enabled. + public static bool IsLiteClientEnabled( + bool defaultValue) + { + Console.WriteLine(defaultValue); + return true; + } + + /// + /// Gets the boolean value of the partition level failover environment variable. Note that, partition level failover + /// is disabled by default for both preview and GA releases. The user can set the respective environment variable + /// 'AZURE_COSMOS_PARTITION_LEVEL_FAILOVER_ENABLED' to override the value for both preview and GA. The method will + /// eventually be removed, once partition level failover is enabled by default for both preview and GA. + /// + /// A boolean field containing the default value for partition level failover. + /// A boolean flag indicating if partition level failover is enabled. + public static string GetLiteClientEndpoint( + string defaultValue) + { + Console.WriteLine(defaultValue); + return "https://cdb-ms-stage-eastus2-fe2-sql.eastus2.cloudapp.azure.com:10650"; } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientCreateAndInitializeTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientCreateAndInitializeTest.cs index 1813700e05..ab34544787 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientCreateAndInitializeTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientCreateAndInitializeTest.cs @@ -6,13 +6,16 @@ using System.Linq; using System.Net; using System.Net.Http; + using System.Net.Security; using System.Reflection; + using System.Security.Cryptography.X509Certificates; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Fluent; using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; using Moq.Protected; + using static Microsoft.Azure.Cosmos.SDK.EmulatorTests.ClientCreateAndInitializeTest; [TestClass] public class ClientCreateAndInitializeTest : BaseCosmosClientHelper @@ -20,7 +23,7 @@ public class ClientCreateAndInitializeTest : BaseCosmosClientHelper private ContainerInternal Container = null; private const string PartitionKey = "/pk"; - [TestInitialize] + // [TestInitialize] public async Task TestInitialize() { await this.TestInit(); @@ -364,5 +367,80 @@ public async Task CreateAndInitializeAsync_WithValidDatabaseAndInvalidContainer_ Assert.IsNotNull(ce); Assert.AreEqual(HttpStatusCode.NotFound, ce.StatusCode); } + + [TestMethod] + [Owner("dkunda")] + public async Task CreateAndInitializeAsync_LocalTest() + { + string databaseName = "thin-client-test-db"; + string containerName = "thin-client-test-container-1"; + CosmosClientOptions clientOptions = new () + { + ApplicationPreferredRegions = new List() + { + Regions.WestUS2, + }, + EnablePartitionLevelFailover = false, + ConnectionMode = ConnectionMode.Direct, + RequestTimeout = TimeSpan.FromSeconds(500), + OpenTcpConnectionTimeout = TimeSpan.FromSeconds(5), + DisableServerCertificateValidation = true, + ServerCertificateCustomValidationCallback = (X509Certificate2 cerf, X509Chain chain, SslPolicyErrors error) => true + }; + + CosmosClient client = new CosmosClient( + "account", + "key", + clientOptions + ); + + DatabaseResponse dbResponse = await client.CreateDatabaseIfNotExistsAsync(databaseName); + + ContainerProperties properties = new ContainerProperties(id: containerName, partitionKeyPath: PartitionKey); ; + + await dbResponse.Database.CreateContainerIfNotExistsAsync(properties); + + Cosmos.Database database = client.GetDatabase(databaseName); + Container container = database.GetContainer(containerName); + Random random = new (); + + for (int i = 0; i < 5; i++) + { + Comment comment = new Comment(Guid.NewGuid().ToString(), "pk", random.Next().ToString(), "abc@def.com", "blabla"); + try + { + ItemResponse writeResponse = await container.CreateItemAsync( + item: comment, + partitionKey: new Cosmos.PartitionKey(comment.pk) + ); + + Console.WriteLine("Comment ID: " + comment.id); + Console.WriteLine(writeResponse.Diagnostics); + + ItemResponse readResponse = await container.ReadItemAsync( + id: comment.id, + partitionKey: new Cosmos.PartitionKey(comment.pk) + ); + + Console.WriteLine(readResponse.Diagnostics); + } + catch (CosmosException ce) + { + Console.WriteLine(ce.Diagnostics); + } + catch (Exception ex) + { + Console.WriteLine("Exception Occurred: " + ex.Message); + } + } + } + + public record Comment( + string id, + string pk, + string name, + string email, + string body + ); } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Benchmarks/ThinClientIntegrationBenchmark.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Benchmarks/ThinClientIntegrationBenchmark.cs new file mode 100644 index 0000000000..b065e91469 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Benchmarks/ThinClientIntegrationBenchmark.cs @@ -0,0 +1,404 @@ +namespace Microsoft.Azure.Cosmos.Performance.Tests +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Net; + using System.Text; + using System.Threading.Tasks; + using BenchmarkDotNet.Attributes; + using BenchmarkDotNet.Configs; + using BenchmarkDotNet.Columns; + using BenchmarkDotNet.Exporters; + using BenchmarkDotNet.Exporters.Csv; + using BenchmarkDotNet.Jobs; + using Microsoft.Azure.Cosmos; + using Newtonsoft.Json; + using BenchmarkDotNet.Diagnosers; + + [MemoryDiagnoser] + [BenchmarkCategory("NewGateBenchmark")] + [Config(typeof(CustomBenchmarkConfig))] + public class ThinClientIntegrationBenchmark + { + private readonly string databaseName = "MyTestDatabase"; + private readonly string containerName = "MyTestContainer"; + private readonly string partitionKeyPath = "/pk"; + private readonly List readItems = new(); + private readonly List upsertItems = new(); + private readonly List replaceItems = new(); + private readonly List deleteItems = new(); + private readonly List deleteStreamItems = new(); + private readonly Random random = new(); + private CosmosClient client; + private Database database; + private Container container; + + private const int DeleteItemsCount = 100; + + [GlobalSetup(Targets = new[] { nameof(CreateItemAsync), nameof(CreateItemStreamAsync) })] + public async Task GlobalSetupCreate() + { + await this.InitializeDatabaseAndContainers(); + } + + [GlobalSetup(Targets = new[] { nameof(ReadItemAsync), nameof(ReadItemStreamAsync) })] + public async Task GlobalSetupRead() + { + await this.InitializeDatabaseAndContainers(); + await this.InitializeContainerWithPreCreatedItemsAsync(0, 100, this.readItems); + Console.WriteLine("Inserted documents for read benchmark."); + } + + [GlobalSetup(Targets = new[] { nameof(UpsertItemAsync), nameof(UpsertItemStreamAsync) })] + public async Task GlobalSetupUpsert() + { + await this.InitializeDatabaseAndContainers(); + await this.InitializeContainerWithPreCreatedItemsAsync(0, 100, this.upsertItems); + Console.WriteLine("Inserted documents for upsert benchmark."); + } + + [GlobalSetup(Targets = new[] { nameof(ReplaceItemAsync), nameof(ReplaceItemStreamAsync) })] + public async Task GlobalSetupReplace() + { + await this.InitializeDatabaseAndContainers(); + await this.InitializeContainerWithPreCreatedItemsAsync(0, 100, this.replaceItems); + Console.WriteLine("Inserted documents for replace benchmark."); + } + + [GlobalSetup(Targets = new[] { nameof(DeleteItemAsync) })] + public async Task GlobalSetupDelete() + { + await this.InitializeDatabaseAndContainers(); + Console.WriteLine("Database and container ready for delete benchmark."); + } + + [GlobalSetup(Targets = new[] { nameof(DeleteItemStreamAsync) })] + public async Task GlobalSetupDeleteStream() + { + await this.InitializeDatabaseAndContainers(); + Console.WriteLine("Database and container ready for delete stream benchmark."); + } + + // Must be synchronous (void) for IterationSetup + [IterationSetup(Target = nameof(DeleteItemAsync))] + public void IterationSetupDelete() + { + this.deleteItems.Clear(); + // Blocking async call + this.InitializeContainerWithPreCreatedItemsAsync(0, DeleteItemsCount, this.deleteItems) + .GetAwaiter().GetResult(); + } + + [IterationSetup(Target = nameof(DeleteItemStreamAsync))] + public void IterationSetupDeleteStream() + { + this.deleteStreamItems.Clear(); + // Blocking async call + this.InitializeContainerWithPreCreatedItemsAsync(0, DeleteItemsCount, this.deleteStreamItems) + .GetAwaiter().GetResult(); + } + + private async Task InitializeDatabaseAndContainers() + { + Console.WriteLine("Creating Database and Containers."); + + CosmosClientOptions clientOptions = new CosmosClientOptions + { + ConnectionMode = ConnectionMode.Gateway + }; + + this.client = new CosmosClient( + "accountenpoint", + "key", + clientOptions); + + this.database = await this.client.CreateDatabaseIfNotExistsAsync(this.databaseName); + + ContainerResponse containerResponse = await this.database.CreateContainerIfNotExistsAsync( + id: this.containerName, + partitionKeyPath: this.partitionKeyPath, + throughput: 10000); + + this.database = this.client.GetDatabase(this.databaseName); + this.container = containerResponse.Container; + + Console.WriteLine("Successfully created Database and Containers with status: " + containerResponse.StatusCode); + } + + private async Task InitializeContainerWithPreCreatedItemsAsync(int start, int count, List items) + { + for (int i = start; i < count; i++) + { + Comment comment = this.GetRandomCommentItem(); + ItemResponse writeResponse = await this.container.CreateItemAsync( + item: comment, + partitionKey: new PartitionKey(comment.pk) + ); + + if (writeResponse.StatusCode == HttpStatusCode.Created) + { + items.Add(comment); + } + } + } + + [GlobalCleanup] + public async Task CleanupAsync() + { + Console.WriteLine("Cleaning up resources..."); + await this.container.DeleteContainerAsync(); + await this.database.DeleteAsync(); + this.client.Dispose(); + } + + [Benchmark] + public async Task CreateItemAsync() + { + Comment comment = this.GetRandomCommentItem(); + + ItemResponse itemResponse = await this.container.CreateItemAsync( + item: comment, + partitionKey: new PartitionKey(comment.pk) + ); + + if (itemResponse.StatusCode != HttpStatusCode.Created) + { + Console.WriteLine($"Error: Item {comment.id} was not created."); + } + } + + [Benchmark] + public async Task CreateItemStreamAsync() + { + Comment comment = this.GetRandomCommentItem(); + + using Stream stream = ToStream(comment); + ResponseMessage itemResponse = await this.container.CreateItemStreamAsync(stream, new PartitionKey(comment.pk)); + + if (itemResponse.StatusCode != HttpStatusCode.Created) + { + Console.WriteLine($"Error: Item {comment.id} was not created stream."); + } + } + + [Benchmark] + public async Task ReadItemAsync() + { + int index = this.random.Next(this.readItems.Count); + Comment comment = this.readItems[index]; + + ItemResponse itemResponse = await this.container.ReadItemAsync( + id: comment.id, + partitionKey: new PartitionKey(comment.pk) + ); + + if (itemResponse.StatusCode != HttpStatusCode.OK) + { + Console.WriteLine($"Error: Item {comment.id} was not read."); + } + } + + [Benchmark] + public async Task ReadItemStreamAsync() + { + int index = this.random.Next(this.readItems.Count); + Comment comment = this.readItems[index]; + + ResponseMessage itemResponse = await this.container.ReadItemStreamAsync( + id: comment.id, + partitionKey: new PartitionKey(comment.pk) + ); + + if (itemResponse.StatusCode != HttpStatusCode.OK) + { + Console.WriteLine($"Error: Item {comment.id} was not read stream."); + } + } + + [Benchmark] + public async Task UpsertItemAsync() + { + int index = this.random.Next(this.upsertItems.Count); + Comment comment = this.upsertItems[index]; + comment.Name = "UpdatedName"; + + ItemResponse itemResponse = await this.container.UpsertItemAsync( + item: comment, + partitionKey: new PartitionKey(comment.pk) + ); + + if (itemResponse.StatusCode != HttpStatusCode.OK) + { + Console.WriteLine($"Error: Item {comment.id} was not upserted."); + } + } + + [Benchmark] + public async Task UpsertItemStreamAsync() + { + int index = this.random.Next(this.upsertItems.Count); + Comment comment = this.upsertItems[index]; + comment.Name = "UpdatedNameStream"; + + using Stream stream = ToStream(comment); + + ResponseMessage itemResponse = await this.container.UpsertItemStreamAsync( + stream, + new PartitionKey(comment.pk) + ); + + if (itemResponse.StatusCode != HttpStatusCode.OK) + { + Console.WriteLine($"Error: Item {comment.id} was not upserted stream."); + } + } + + [Benchmark] + public async Task ReplaceItemAsync() + { + int index = this.random.Next(this.replaceItems.Count); + Comment comment = this.replaceItems[index]; + comment.Name = "ReplacedName"; + + ItemResponse itemResponse = await this.container.ReplaceItemAsync( + comment, + comment.id, + new PartitionKey(comment.pk) + ); + + if (itemResponse.StatusCode != HttpStatusCode.OK) + { + Console.WriteLine($"Error: Item {comment.id} was not replaced."); + } + } + + [Benchmark] + public async Task ReplaceItemStreamAsync() + { + int index = this.random.Next(this.replaceItems.Count); + Comment comment = this.replaceItems[index]; + comment.Name = "ReplacedNameStream"; + + using Stream stream = ToStream(comment); + ResponseMessage itemResponse = await this.container.ReplaceItemStreamAsync( + stream, + comment.id, + new PartitionKey(comment.pk) + ); + + if (itemResponse.StatusCode != HttpStatusCode.OK) + { + Console.WriteLine($"Error: Item {comment.id} was not replaced stream."); + } + } + + [Benchmark] + public async Task DeleteItemAsync() + { + // Items are populated in IterationSetupDelete + int index = this.random.Next(this.deleteItems.Count); + Comment comment = this.deleteItems[index]; + + ItemResponse itemResponse = await this.container.DeleteItemAsync( + comment.id, + new PartitionKey(comment.pk) + ); + + if (itemResponse.StatusCode == HttpStatusCode.NotFound) + { + Console.WriteLine($"Error: Item {comment.id} was not deleted."); + } + else + { + this.deleteItems.RemoveAt(index); + } + } + + [Benchmark] + public async Task DeleteItemStreamAsync() + { + // Items are populated in IterationSetupDeleteStream + int index = this.random.Next(this.deleteStreamItems.Count); + Comment comment = this.deleteStreamItems[index]; + + ResponseMessage itemResponse = await this.container.DeleteItemStreamAsync( + comment.id, + new PartitionKey(comment.pk) + ); + + if (itemResponse.StatusCode == HttpStatusCode.NotFound) + { + Console.WriteLine($"Error: Item {comment.id} was not deleted stream."); + } + else + { + this.deleteStreamItems.RemoveAt(index); + } + } + + private static Stream ToStream(T input) + { + string json = JsonConvert.SerializeObject(input); + return new MemoryStream(Encoding.UTF8.GetBytes(json)); + } + + private class CustomBenchmarkConfig : ManualConfig + { + public CustomBenchmarkConfig() + { + this.AddColumn(StatisticColumn.OperationsPerSecond); + this.AddColumn(StatisticColumn.Q3); + this.AddColumn(StatisticColumn.P80); + this.AddColumn(StatisticColumn.P85); + this.AddColumn(StatisticColumn.P90); + this.AddColumn(StatisticColumn.P95); + this.AddColumn(StatisticColumn.P100); + + this.AddDiagnoser(new IDiagnoser[] { MemoryDiagnoser.Default, ThreadingDiagnoser.Default }); + this.AddColumnProvider(DefaultConfig.Instance.GetColumnProviders().ToArray()); + + // Minimal run to reduce time + this.AddJob(Job.ShortRun + .WithStrategy(BenchmarkDotNet.Engines.RunStrategy.Throughput)); + + this.AddExporter(HtmlExporter.Default); + this.AddExporter(CsvExporter.Default); + } + } + + public class Comment + { + public string id; + public string pk; + public string Name; + public string Email; + public string Body; + + public Comment( + string id, + string pk, + string name, + string email, + string body) + { + this.id = id; + this.pk = pk; + this.Name = name; + this.Email = email; + this.Body = body; + } + } + + private Comment GetRandomCommentItem() + { + return new( + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + this.random.Next().ToString(), + "dkunda@test.com", + "This document is intended for binary encoding perf testing."); + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AsyncCacheTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AsyncCacheTest.cs index 586da4f4f3..23378b0eaf 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AsyncCacheTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/AsyncCacheTest.cs @@ -13,6 +13,8 @@ namespace Microsoft.Azure.Cosmos.Tests using Microsoft.Azure.Cosmos.Common; using Microsoft.VisualStudio.TestTools.UnitTesting; using Microsoft.Azure.Documents; + using System.Net.Http; + using static Microsoft.Azure.Cosmos.GatewayAccountReaderTests; [TestClass] public class AsyncCacheTest @@ -314,6 +316,56 @@ await Task.Factory.StartNew(() => ); } + [TestMethod] + public async Task LocalHttp2Testing() + { + HttpMessageHandler messageHandler = new CustomMessageHandler(); + ConnectionPolicy connectionPolicy = new ConnectionPolicy() + { + }; + + CosmosHttpClientCore cosmosHttpClient = (CosmosHttpClientCore)CosmosHttpClientCore.CreateWithConnectionPolicy( + apiType: ApiType.None, + eventSource: DocumentClientEventSource.Instance, + connectionPolicy: connectionPolicy, + httpMessageHandler: null, + sendingRequestEventArgs: null, + receivedResponseEventArgs: null); + + using (cosmosHttpClient) + { + // Create an HttpRequestMessage + HttpRequestMessage request = new HttpRequestMessage() + { + RequestUri = new Uri("https://jsonplaceholder.typicode.com/posts"), + Method = HttpMethod.Get, + Version = new Version(1, 1) // Set HTTP/2 here + }; + + try + { + // Send the request + HttpResponseMessage response = await cosmosHttpClient.ExecuteHttpHelperAsync(request, ResourceType.Address, default); + + // Ensure we got a successful response + if (!response.IsSuccessStatusCode) + { + Console.WriteLine($"Error: {response.StatusCode}"); + } + else + { + // Read the response content + string content = await response.Content.ReadAsStringAsync(); + Console.WriteLine(content); + } + } + catch (HttpRequestException e) + { + Console.WriteLine($"Request exception: {e.Message}"); + } + } + } + private int GenerateIntFuncThatThrows() { throw new InvalidOperationException();