diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java index 0ea494c8dbc64..bd26bb53a0b59 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxGatewayStoreModelTest.java @@ -274,7 +274,7 @@ public void validateApiType() throws Exception { ResourceType.Document); try { - storeModel.performRequest(dsr, HttpMethod.POST).block(); + storeModel.performRequest(dsr).block(); fail("Request should fail"); } catch (Exception e) { //no-op diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java new file mode 100644 index 0000000000000..772be371b0c89 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java @@ -0,0 +1,69 @@ +package com.azure.cosmos.implementation; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.implementation.circuitBreaker.GlobalPartitionEndpointManagerForCircuitBreaker; +import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; +import com.azure.cosmos.implementation.http.Http2ConnectionConfig; +import com.azure.cosmos.implementation.http.HttpClient; +import com.azure.cosmos.implementation.http.HttpClientConfig; +import com.azure.cosmos.implementation.http.HttpHeaders; +import com.azure.cosmos.implementation.http.HttpRequest; +import com.azure.cosmos.implementation.http.ReactorNettyClient; +import io.netty.channel.ConnectTimeoutException; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.testng.annotations.Test; +import reactor.core.publisher.Mono; + +import java.net.URI; + +import static org.assertj.core.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; + +public class ThinClientStoreModelTest { + @Test(groups = "unit") + public void testThinClientStoreModel() throws Exception { + DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class); + Mockito.doReturn(new DiagnosticsClientContext.DiagnosticsClientConfig()).when(clientContext).getConfig(); + Mockito + .doReturn(ImplementationBridgeHelpers + .CosmosDiagnosticsHelper + .getCosmosDiagnosticsAccessor() + .create(clientContext, 1d)) + .when(clientContext).createDiagnostics(); + + String sdkGlobalSessionToken = "1#100#1=20#2=5#3=30"; + ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class); + Mockito.doReturn(sdkGlobalSessionToken).when(sessionContainer).resolveGlobalSessionToken(any()); + + GlobalEndpointManager globalEndpointManager = Mockito.mock(GlobalEndpointManager.class); + + Mockito.doReturn(new URI("https://localhost")) + .when(globalEndpointManager).resolveServiceEndpoint(any()); + + // mocking with HTTP/1.1 client, just using this test as basic store model validation. e2e request flow + // with HTTP/2 will be tested in future PR once the wiring is all connected + HttpClient httpClient = Mockito.mock(HttpClient.class); + Mockito.when(httpClient.send(any(), any())).thenReturn(Mono.error(new ConnectTimeoutException())); + + ThinClientStoreModel storeModel = new ThinClientStoreModel( + clientContext, + sessionContainer, + ConsistencyLevel.SESSION, + new UserAgentContainer(), + globalEndpointManager, + httpClient); + + RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName( + clientContext, + OperationType.Read, + "/fakeResourceFullName", + ResourceType.Document); + + try { + storeModel.performRequest(dsr).block(); + } catch (Exception e) { + //no-op + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index a04313a3d34c3..c6d167e877782 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -282,6 +282,10 @@ public static class HttpHeaders { // Priority Level for throttling public static final String PRIORITY_LEVEL = "x-ms-cosmos-priority-level"; + + // Thinclient headers + public static final String THINCLIENT_PROXY_OPERATION_TYPE = "x-ms-thinclient-proxy-operation-type"; + public static final String THINCLIENT_PROXY_RESOURCE_TYPE = "x-ms-thinclient-proxy-resource-type"; } public static class A_IMHeaderValues { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index a0b7daf4b351b..6e19d5770ef51 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -9,6 +9,7 @@ import com.azure.cosmos.implementation.directconnectivity.WFConstants; import com.azure.cosmos.implementation.faultinjection.FaultInjectionRequestContext; import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; +import com.azure.cosmos.implementation.http.HttpTransportSerializer; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import com.azure.cosmos.implementation.routing.Range; @@ -29,6 +30,9 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** * This is core Transport/Connection agnostic request to the Azure Cosmos DB database service. @@ -89,6 +93,8 @@ public class RxDocumentServiceRequest implements Cloneable { private volatile boolean hasFeedRangeFilteringBeenApplied = false; + private final AtomicReference httpTransportSerializer = new AtomicReference<>(null); + public boolean isReadOnlyRequest() { return this.operationType.isReadOnlyOperation(); } @@ -1233,4 +1239,28 @@ public String getEffectivePartitionKey() { public void setEffectivePartitionKey(String effectivePartitionKey) { this.effectivePartitionKey = effectivePartitionKey; } + + public void setThinclientHeaders(String operationType, String resourceType) { + this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_OPERATION_TYPE, operationType); + this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_RESOURCE_TYPE, resourceType); + } + + public RxDocumentServiceRequest setHttpTransportSerializer(HttpTransportSerializer transportSerializer) { + this.httpTransportSerializer.set(transportSerializer); + + return this; + } + + public HttpTransportSerializer getEffectiveHttpTransportSerializer( + HttpTransportSerializer defaultTransportSerializer) { + + checkNotNull(defaultTransportSerializer, "Argument 'defaultTransportSerializer' must not be null."); + + HttpTransportSerializer snapshot = this.httpTransportSerializer.get(); + if (snapshot != null) { + return snapshot; + } + + return defaultTransportSerializer; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 4ff506b711658..0ef03f72a061e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -12,7 +12,9 @@ import com.azure.cosmos.implementation.directconnectivity.GatewayServiceConfigurationReader; import com.azure.cosmos.implementation.directconnectivity.HttpUtils; import com.azure.cosmos.implementation.directconnectivity.RequestHelper; +import com.azure.cosmos.implementation.directconnectivity.ResourceOperation; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import com.azure.cosmos.implementation.directconnectivity.Uri; import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility; import com.azure.cosmos.implementation.faultinjection.GatewayServerErrorInjector; import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider; @@ -20,6 +22,7 @@ import com.azure.cosmos.implementation.http.HttpHeaders; import com.azure.cosmos.implementation.http.HttpRequest; import com.azure.cosmos.implementation.http.HttpResponse; +import com.azure.cosmos.implementation.http.HttpTransportSerializer; import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; @@ -47,6 +50,7 @@ import java.util.concurrent.Callable; import static com.azure.cosmos.implementation.HttpConstants.HttpHeaders.INTENDED_COLLECTION_RID_HEADER; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** * While this class is public, but it is not part of our published public APIs. @@ -54,7 +58,7 @@ * * Used internally to provide functionality to communicate and process response from GATEWAY in the Azure Cosmos DB database service. */ -public class RxGatewayStoreModel implements RxStoreModel { +public class RxGatewayStoreModel implements RxStoreModel, HttpTransportSerializer { private static final boolean HTTP_CONNECTION_WITHOUT_TLS_ALLOWED = Configs.isHttpConnectionWithoutTLSAllowed(); private final DiagnosticsClientContext clientContext; @@ -83,29 +87,12 @@ public RxGatewayStoreModel( ApiType apiType) { this.clientContext = clientContext; - this.defaultHeaders = new HashMap<>(); - this.defaultHeaders.put(HttpConstants.HttpHeaders.CACHE_CONTROL, - "no-cache"); - this.defaultHeaders.put(HttpConstants.HttpHeaders.VERSION, - HttpConstants.Versions.CURRENT_VERSION); - this.defaultHeaders.put( - HttpConstants.HttpHeaders.SDK_SUPPORTED_CAPABILITIES, - HttpConstants.SDKSupportedCapabilities.SUPPORTED_CAPABILITIES); - - if (apiType != null) { - this.defaultHeaders.put(HttpConstants.HttpHeaders.API_TYPE, apiType.toString()); - } if (userAgentContainer == null) { userAgentContainer = new UserAgentContainer(); } - this.defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); - - if (defaultConsistencyLevel != null) { - this.defaultHeaders.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, - defaultConsistencyLevel.toString()); - } + this.defaultHeaders = this.getDefaultHeaders(apiType, userAgentContainer, defaultConsistencyLevel); this.defaultConsistencyLevel = defaultConsistencyLevel; this.globalEndpointManager = globalEndpointManager; @@ -126,6 +113,40 @@ public RxGatewayStoreModel(RxGatewayStoreModel inner) { this.sessionContainer = inner.sessionContainer; } + protected Map getDefaultHeaders( + ApiType apiType, + UserAgentContainer userAgentContainer, + ConsistencyLevel clientDefaultConsistencyLevel) { + + checkNotNull(userAgentContainer, "Argument 'userAGentContainer' must not be null."); + + Map defaultHeaders = new HashMap<>(); + defaultHeaders.put(HttpConstants.HttpHeaders.CACHE_CONTROL, + "no-cache"); + defaultHeaders.put(HttpConstants.HttpHeaders.VERSION, + HttpConstants.Versions.CURRENT_VERSION); + defaultHeaders.put( + HttpConstants.HttpHeaders.SDK_SUPPORTED_CAPABILITIES, + HttpConstants.SDKSupportedCapabilities.SUPPORTED_CAPABILITIES); + + if (apiType != null) { + defaultHeaders.put(HttpConstants.HttpHeaders.API_TYPE, apiType.toString()); + } + + if (userAgentContainer == null) { + userAgentContainer = new UserAgentContainer(); + } + + defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); + + if (clientDefaultConsistencyLevel != null) { + defaultHeaders.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, + clientDefaultConsistencyLevel.toString()); + } + + return defaultHeaders; + } + void setGatewayServiceConfigurationReader(GatewayServiceConfigurationReader gatewayServiceConfigurationReader) { this.gatewayServiceConfigurationReader = gatewayServiceConfigurationReader; } @@ -162,40 +183,46 @@ public void setCollectionCache(RxClientCollectionCache collectionCache) { this.collectionCache = collectionCache; } - private Mono create(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); - } - - private Mono patch(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.PATCH); - } - - private Mono upsert(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); - } - - private Mono read(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.GET); - } - - private Mono replace(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.PUT); - } - - private Mono delete(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.DELETE); - } + @Override + public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception { + HttpMethod method = getHttpMethod(request); + HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders()); - private Mono deleteByPartitionKey(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); + Flux contentAsByteArray = request.getContentAsByteArrayFlux(); + return new HttpRequest(method, + requestUri, + requestUri.getPort(), + httpHeaders, + contentAsByteArray); } - private Mono execute(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.POST); - } + @Override + public StoreResponse unwrapToStoreResponse( + RxDocumentServiceRequest request, + int statusCode, + HttpHeaders headers, + ByteBuf content) { + + checkNotNull(headers, "Argument 'headers' must not be null."); + checkNotNull( + content, + "Argument 'content' must not be null - use empty ByteBuf when theres is no payload."); + + // If there is any error in the header response this throws exception + validateOrThrow(request, HttpResponseStatus.valueOf(statusCode), headers, content); + + int size; + if ((size = content.readableBytes()) > 0) { + return new StoreResponse(statusCode, + HttpUtils.unescape(headers.toMap()), + new ByteBufInputStream(content, true), + size); + } - private Mono readFeed(RxDocumentServiceRequest request) { - return this.performRequest(request, HttpMethod.GET); + return new StoreResponse(statusCode, + HttpUtils.unescape(headers.toMap()), + null, + 0); } private Mono query(RxDocumentServiceRequest request) { @@ -215,10 +242,10 @@ private Mono query(RxDocumentServiceRequest request) RuntimeConstants.MediaTypes.QUERY_JSON); break; } - return this.performRequest(request, HttpMethod.POST); + return this.performRequest(request); } - public Mono performRequest(RxDocumentServiceRequest request, HttpMethod method) { + public Mono performRequest(RxDocumentServiceRequest request) { try { if (request.requestContext.cosmosDiagnostics == null) { request.requestContext.cosmosDiagnostics = clientContext.createDiagnostics(); @@ -228,10 +255,10 @@ public Mono performRequest(RxDocumentServiceRequest r request.requestContext.resourcePhysicalAddress = uri.toString(); if (this.throughputControlStore != null) { - return this.throughputControlStore.processRequest(request, Mono.defer(() -> this.performRequestInternal(request, method, uri))); + return this.throughputControlStore.processRequest(request, Mono.defer(() -> this.performRequestInternal(request, uri))); } - return this.performRequestInternal(request, method, uri); + return this.performRequestInternal(request, uri); } catch (Exception e) { return Mono.error(e); } @@ -241,23 +268,15 @@ public Mono performRequest(RxDocumentServiceRequest r * Given the request it creates an flux which upon subscription issues HTTP call and emits one RxDocumentServiceResponse. * * @param request - * @param method * @param requestUri * @return Flux */ - public Mono performRequestInternal(RxDocumentServiceRequest request, HttpMethod method, URI requestUri) { + public Mono performRequestInternal(RxDocumentServiceRequest request, URI requestUri) { try { - - HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders()); - - Flux contentAsByteArray = request.getContentAsByteArrayFlux(); - - HttpRequest httpRequest = new HttpRequest(method, - requestUri, - requestUri.getPort(), - httpHeaders, - contentAsByteArray); + HttpRequest httpRequest = request + .getEffectiveHttpTransportSerializer(this) + .wrapInHttpRequest(request, requestUri); Mono httpResponseMono = this.httpClient.send(httpRequest, request.getResponseTimeout()); @@ -371,23 +390,9 @@ private Mono toDocumentServiceResponse(Mono 0) { - rsp = new StoreResponse(httpResponseStatus, - HttpUtils.unescape(httpResponseHeaders.toMap()), - new ByteBufInputStream(content, true), - size); - } else { - rsp = new StoreResponse(httpResponseStatus, - HttpUtils.unescape(httpResponseHeaders.toMap()), - null, - 0); - } + StoreResponse rsp = request + .getEffectiveHttpTransportSerializer(this) + .unwrapToStoreResponse(request, httpResponseStatus, httpResponseHeaders, content); if (reactorNettyRequestRecord != null) { rsp.setRequestTimeline(reactorNettyRequestRecord.takeTimelineSnapshot()); @@ -518,28 +523,47 @@ private void validateOrThrow(RxDocumentServiceRequest request, } } - private Mono invokeAsyncInternal(RxDocumentServiceRequest request) { + private static HttpMethod getHttpMethod(RxDocumentServiceRequest request) { switch (request.getOperationType()) { case Create: case Batch: - return this.create(request); - case Patch: - return this.patch(request); case Upsert: - return this.upsert(request); + case ExecuteJavaScript: + case SqlQuery: + case Query: + case QueryPlan: + return HttpMethod.POST; + case Patch: + return HttpMethod.PATCH; case Delete: if (request.getResourceType() == ResourceType.PartitionKey) { - return this.deleteByPartitionKey(request); + return HttpMethod.POST; } - return this.delete(request); + return HttpMethod.DELETE; + case Read: + case ReadFeed: + return HttpMethod.GET; + case Replace: + return HttpMethod.PUT; + default: + throw new IllegalStateException( + "Operation type " + request.getOperationType() + " cannot be processed in RxGatewayStoreModel."); + } + } + + private Mono invokeAsyncInternal(RxDocumentServiceRequest request) { + switch (request.getOperationType()) { + case Create: + case Batch: + case Patch: + case Upsert: + case Delete: case ExecuteJavaScript: - return this.execute(request); case Read: - return this.read(request); case ReadFeed: - return this.readFeed(request); case Replace: - return this.replace(request); + return this.performRequest(request); + case SqlQuery: case Query: case QueryPlan: @@ -624,6 +648,10 @@ public void recordOpenConnectionsAndInitCachesStarted(List getDefaultHeaders() { + return this.defaultHeaders; + } + private void captureSessionToken(RxDocumentServiceRequest request, Map responseHeaders) { if (request.getResourceType() == ResourceType.DocumentCollection && request.getOperationType() == OperationType.Delete) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java new file mode 100644 index 0000000000000..747faa9effb57 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -0,0 +1,123 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequest; +import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestArgs; +import com.azure.cosmos.implementation.http.HttpClient; +import com.azure.cosmos.implementation.http.HttpHeaders; +import com.azure.cosmos.implementation.http.HttpRequest; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpMethod; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +/** + * While this class is public, but it is not part of our published public APIs. + * This is meant to be internally used only by our sdk. + * + * Used internally to provide functionality to communicate and process response from THINCLIENT in the Azure Cosmos DB database service. + */ +public class ThinClientStoreModel extends RxGatewayStoreModel { + + public ThinClientStoreModel( + DiagnosticsClientContext clientContext, + ISessionContainer sessionContainer, + ConsistencyLevel defaultConsistencyLevel, + UserAgentContainer userAgentContainer, + GlobalEndpointManager globalEndpointManager, + HttpClient httpClient) { + super( + clientContext, + sessionContainer, + defaultConsistencyLevel, + QueryCompatibilityMode.Default, + userAgentContainer, + globalEndpointManager, + httpClient, + ApiType.SQL); + } + + public ThinClientStoreModel(ThinClientStoreModel inner) { + super(inner); + } + + @Override + public Mono processMessage(RxDocumentServiceRequest request) { + return super.processMessage(request); + } + + @Override + protected Map getDefaultHeaders( + ApiType apiType, + UserAgentContainer userAgentContainer, + ConsistencyLevel clientDefaultConsistencyLevel) { + + checkNotNull(userAgentContainer, "Argument 'userAGentContainer' must not be null."); + + Map defaultHeaders = new HashMap<>(); + // For ThinClient http/2 used for framing only + // All operation-level headers are only added to the rntbd-encoded message + // the thin client proxy will parse the rntbd headers (not the content!) and substitute any + // missing headers for routing (like partitionId or replicaId) + // Since the Thin client proxy also needs to set the user-agent header to a different value + // it is not added to the rntbd headers - just http-headers in the SDK + defaultHeaders.put(HttpConstants.HttpHeaders.USER_AGENT, userAgentContainer.getUserAgent()); + + return defaultHeaders; + } + + @Override + public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception { + + // todo - neharao1 - validate b/w name() v/s toString() + request.setThinclientHeaders(request.getOperationType().name(), request.getResourceType().name()); + + // todo - neharao1: no concept of a replica / service endpoint that can be passed + RntbdRequestArgs rntbdRequestArgs = new RntbdRequestArgs(request); + + // todo - neharao1: validate what HTTP headers are needed - for now have put default ThinClient HTTP headers + // todo - based on fabianm comment - thinClient also takes op type and resource type headers as HTTP headers + HttpHeaders headers = this.getHttpHeaders(); + + RntbdRequest rntbdRequest = RntbdRequest.from(rntbdRequestArgs); + + // todo: neharao1 - validate whether Java heap buffer is okay v/s Direct buffer + // todo: eventually need to use pooled buffer + ByteBuf byteBuf = Unpooled.buffer(); + + // todo: comment can be removed - RntbdRequestEncoder does the same - a type of ChannelHandler in ChannelPipeline (a Netty concept) + // todo: lifting the logic from there to encode the RntbdRequest instance into a ByteBuf (ByteBuf is a network compatible format) + // todo: double-check with fabianm to see if RntbdRequest across RNTBD over TCP (Direct connectivity mode) is same as that when using ThinClient proxy + // todo: need to conditionally add some headers (userAgent, replicaId/endpoint, etc) + rntbdRequest.encode(byteBuf); + + return new HttpRequest( + HttpMethod.POST, + requestUri, + requestUri.getPort(), + headers, + Flux.just(byteBuf.array())); + } + + private HttpHeaders getHttpHeaders() { + HttpHeaders httpHeaders = new HttpHeaders(); + // todo: select only required headers from defaults + Map defaultHeaders = this.getDefaultHeaders(); + + for (Map.Entry header : defaultHeaders.entrySet()) { + httpHeaders.set(header.getKey(), header.getValue()); + } + + // todo: add thin client resourcetype/operationtype headers + return httpHeaders; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequest.java index daaa93ea82fc7..ef01f5d15106f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequest.java @@ -76,7 +76,7 @@ public static RntbdRequest decode(final ByteBuf in) { return new RntbdRequest(header, metadata, payload); } - void encode(final ByteBuf out) { + public void encode(final ByteBuf out) { final int expectedLength = RntbdRequestFrame.LENGTH + this.headers.computeLength(); final int start = out.writerIndex(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java index 32869a5f6460a..e8d9069d1ed29 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestArgs.java @@ -56,6 +56,10 @@ public RntbdRequestArgs(final RxDocumentServiceRequest serviceRequest, final Uri this.transportRequestId = instanceCount.incrementAndGet(); } + public RntbdRequestArgs(final RxDocumentServiceRequest serviceRequest) { + this(serviceRequest, Uri.create("")); + } + // region Accessors @JsonProperty diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java new file mode 100644 index 0000000000000..975ae4d4ca77a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java @@ -0,0 +1,17 @@ +package com.azure.cosmos.implementation.http; + +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import io.netty.buffer.ByteBuf; + +import java.net.URI; + +public interface HttpTransportSerializer { + HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception; + + StoreResponse unwrapToStoreResponse( + RxDocumentServiceRequest request, + int statusCode, + HttpHeaders headers, + ByteBuf content); +}