Skip to content

Commit

Permalink
V3 store consistency layer (Azure#194)
Browse files Browse the repository at this point in the history
* inital commit for V3 port from vsts to github

* Hash v2 for SDK v3

bumped version to 3.0.0-beta-3 for release

* Modified Pom file to copy jar files to required location

* updated sdk-version property

* removed additions to pom

* Removing unused code
Changing version to 3.0.0-SNAPSHOT

* Initial check in with reactor netty library

* Initial commit for Non partitioned collection support for V3 async Java
SDK

* updated git ignore

* updated git ignore

* removed conflict markers in comments

* removed unnecessary files

* Removed public constructor that took PartitionKeyInternal

* fixed javadoc errors

* Work In Progress

* Work in progress

* Work In Progress, Added Reactor Netty Client and other interfaces for Reactor HTTP Client

* Reverted test configuration:

* Fixed Gateway Store Model Test

* Removed unused classes, added headers to new classes. Added configuration to Reactor Netty Http Client

* Implemented Reactor HTTP Client for HttpTransportClient

* Fixed HttpTransportClient store response subscribe

* Removed unused code, removed some rxnetty usages

* Fixed Proxy Tests with correct log level for reactor netty client

* Fixed Gateaway Address Cache Tests

* Fixed test cases, optimized imports, and other refactoring changes

* Removed propogating error, instead just throw the error

* Storing Connection provider to close resources later

* Fixed empty body issue with Flux not allowing null values

* Disposing just connection provider on http client shutdown

* increased used memory limit for document leak resource test. Fix it later

* Incorporated port in Http Request

* Replacing read and write time out handlers with idle state handlers

* Removing experimental feature of pooled connection timeout

* initial commit for fixing tetss (will break)

* Removed timeout feature

* Calling http client shutdown in HttpTransportClient

* Fixed Document resource leak test by closing the channel when done

* Experimenting changes

* Commenting out old ssl provider

* Setting ssl timeout explicitly

* Trying out default ssl

* Trying out default ssl

* Trying out ssl config

* Trying out default ssl

* Using vanila ssl

* fixed testsuitebase

* Improving the configuration for http client

* Merging v3_reactor_netty_client_experiment changes

* Reverting Test Configurations change

* Implemented code review suggestions as well as some optimizations and refactoring

* Handling delete operation

* Fixing Delete operation

* Refactoring CosmosClient adding a Builder
Removed cosmos configuration
Adding getters for scripts
Adding CosmosUser
Changing Options to composition

* WIP: Work in progress

* - Added CosmosUser, get APIs for scripts
- Removed CosmosConfiguration and moved builder to CosmosClient

* Added retry utility for Backoff retry utility

* - Ported Cosmos item changes

* fixed more tests

* Minor refactoring
Adding listUsers and queryUsers

* Compilation work. Work in progress

* Refactoring in Reactor Netty Client:

* fixed database tests

* Fixed RxGatewayStoreModelTests

* Fixed Store reader, consistency reader, writer, address selector test cases

* fixed more tests

* Replaced Func1 with java Function

* - Fixed more tests
- rx.internal tests run using asyncDocumentClient

* Updated store and consistency reader writers, Collection caches, partition caches, helpers, address resolvers, global endpoint managers, retry policies with reactor-core changes

* Updated query contexts, document service requests, cosmos client changes

* Fixed bechmarks, and some tests in commons

* fixed more tests

* fixed more tests. commented out some tests for compilation

* Implementing PR comments

* Fixed Direct Implementation test cases

* Refactoring internal tests to Reactor

* Work in progress, fixing location cache test

* Fixing Async benchmark after Mo's changes

* Fixed Examples source code and test cases

* Fixing javadoc warnings

* Refactoring internal query tests to Reactor

* Refactoring gateway tests to Reactor

* Removed unused code, refactored to use internal test suite base

* Fixed concurrency issue with internal test suite base, imported internal test suite base in rx.internal tests

* Minor changes, removed invalid usage of TransportClient

* Fixed gateway tests, removed verbose test subscriber, using basic test subscriber for now

* Handling IO Exception for create  attachment and media. Fixing Consistency and Retry Create Document tests

* Fixing typo in doc

* Work in progress

* reverted changes to examples utils

* Added paginator prototype code

* Merging V3

* Code-complete draft port to reactor. The Direct TCP-related code compiles and is not yet testable because other significant portions of the code do not currently compile.

* Code-complete draft port to reactor. The Direct TCP-related code compiles and is not yet testable because other significant portions of the code do not currently compile.

* Renamed Reactor Transport Client to Transport Client, removed rx.Observable and rx.Single usages completely

* fixed more tests
made shared resources private

* Addressed some error handling ommissions in RntbdRequestManager and verified locally on the V2 branch that the long test suite passes.

* Fixed more tests to use getSharedDatabase and similar methods

* Addressed some error handling ommissions in RntbdRequestManager and verified locally on the V2 branch that the long test suite passes.

* Fixed buffer size issue

* Added Cosmos Request options

* Renamed COSMOS.PROTOCOL as cosmos.directModeProtocol and move ConfigsTests to the package that Configs is defined

* Fixing tests and refactored the way to get values

* Icreased Ssl handshake timeout for reactor netty client

* Ensured that our dependency on org.hamcreate:hamcrest-all:1.3 is respected by mockito which brings in hacrest-all:1.1. Also: sorted dependency version properties and dependency lists.

* Addressed RntbdTransportClient test faiures

* Minor test fixes

* We no longer skip the DocumentClientResourceLeakTest when Direct TCP is in use

* We no longer permit VeryLargeDocumentQueryTest to fail when Direct TCP is in use. Also improved logger.info messages in DocumentClientResourceLeakTest

* We no longer permit DCDocumentCrudTest.crossPartitionQuery to fail when Direct TCP is in use.

* We no longer permit BackPressureCrossPartition.query to fail when Direct TCP is in use.

* skipped tcp tests

* removed unused imports

* Resolves a long-standing TODO: Ensure that the Configs instance created by DirectHttpsClientUnderTest specifies Protocol.Https (now that Direct TCP is the default connection mode)

* changes to ConsistencyLevel

* TestSuiteBase.beforeMethod correctly logs the connection mode: Direct Tcp, Direct Https, or Gateway Https

* Merged enable TCP default protocol changes

* Fixed Address Resolver Tests

* Disabling connection pool exhausted tests

* Fixing Config Tests

* removed changes to DocumentCrudTests
edited manner in which ConsistencyLevel.toString is implemented

* removed unused import

* reverted changes to documentcrud tests

* fixed failing test

* made more ConsistencyLevel.name -> toString changes

* changed line endings

* Removed rx Composite Exception and used reactor Exceptions utility class to handle multiple exceptions

* Made more changes to enums
removed cosmos tests

* more enum fixes

* more enum changes

* Test tweaks

* Made more enum changes

* fixed line endings in includepathtest

* Fixed pagination and order by utils bugs

* Swapped rx Http Client with Reactor Http Client in Cosmos Partition Key Tests

* Removed rx java dependency

* Fixing tests

* Removed throws Exception and unused imports

* Optimized imports in Microsoft style

* Added parallel run code for bulk insert blocking method in Test Suite Base

* Minor test fixes

* Trying default Http Client

* re-using https client

* Disabling document resource leak test

* Fixing up schedulers

* fixed CosmosPartitionKeyTests and ConsistencyTests null pointer issues

* Added logging to netty client and fixed logger category

* Fixing spy wire test

* fixed failing tests

* Removing unused imports

* Minor test changes

* Removed usages of bodyAsString method from http response as its not working correctly, implemented toString to convert byteBuf flux to Mono string

* Increased timeout

* Replaced flux with mono across consistency layer to avoid confusion

* Reverting test configurations changes

* Fixed Parallel Document Query Context

* Fixed Protocol tests

* Replaced exceptions by Mono / Flux Error

* Incresaed wait to capture requests

* Added block instead of subscribe to capture response

* Fixed ReadMyWrites test with emitting just one value

* Modified the order of client builders and reverted flux / mono change for consistency stack

* Fixed Read my writes workflow

* Removed unnecessary stack trace

* Fixed Benchmarks to use BaseSubscriber

* Adding conflict api (Azure#192)

Fixing Permission tests
Making id method of Resource package private and adding id on
individual resource classes to enable fluent composition

* Removed unused ConfigsTests.java class

* Fixed Read latency and write latency benchmarks

* Fixed metrics version in parent pom.xml

* Revised DocumentCrudTest and TestSuiteBase to address test breaks

* Using Step Verifier, removed RetryAnalyzer

* POM + test tweaks.

* Eliminating some tests to test fast integration tests

* Reverting the experiment

* Fixed test break (see changes to CosmosPartitionKeyTests), tidied some test code (see changes to StoredProcedureCrudTest) and deleted two extraneous files

* Removed unnecessary call

* Code review comments
  • Loading branch information
kushagraThapar authored and Christopher Anderson committed Jun 21, 2019
1 parent b5d2dc1 commit 7b0dc3c
Show file tree
Hide file tree
Showing 248 changed files with 6,673 additions and 6,600 deletions.
11 changes: 6 additions & 5 deletions benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,6 @@
<version>${netty-tcnative.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-guava</artifactId>
<version>${rxjava-guava.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
Expand All @@ -181,6 +176,12 @@
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
/*
* The MIT License (MIT)
* Copyright (c) 2018 Microsoft Corporation
*
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
Expand Down Expand Up @@ -41,10 +41,11 @@
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import org.apache.commons.lang3.RandomStringUtils;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

import java.net.InetSocketAddress;
import java.time.Duration;
Expand Down Expand Up @@ -74,11 +75,11 @@ abstract class AsyncBenchmark<T> {

AsyncBenchmark(Configuration cfg) {
client = new AsyncDocumentClient.Builder()
.withServiceEndpoint(cfg.getServiceEndpoint())
.withMasterKeyOrResourceToken(cfg.getMasterKey())
.withConnectionPolicy(cfg.getConnectionPolicy())
.withConsistencyLevel(cfg.getConsistencyLevel())
.build();
.withServiceEndpoint(cfg.getServiceEndpoint())
.withMasterKeyOrResourceToken(cfg.getMasterKey())
.withConnectionPolicy(cfg.getConnectionPolicy())
.withConsistencyLevel(cfg.getConsistencyLevel())
.build();

logger = LoggerFactory.getLogger(this.getClass());

Expand All @@ -89,7 +90,7 @@ abstract class AsyncBenchmark<T> {
concurrencyControlSemaphore = new Semaphore(cfg.getConcurrency());
configuration = cfg;

ArrayList<Observable<Document>> createDocumentObservables = new ArrayList<>();
ArrayList<Flux<Document>> createDocumentObservables = new ArrayList<>();

if (configuration.getOperationType() != Operation.WriteLatency
&& configuration.getOperationType() != Operation.WriteThroughput
Expand All @@ -105,13 +106,13 @@ abstract class AsyncBenchmark<T> {
newDoc.set("dataField3", dataFieldValue);
newDoc.set("dataField4", dataFieldValue);
newDoc.set("dataField5", dataFieldValue);
Observable<Document> obs = client.createDocument(collection.selfLink(), newDoc, null, false)
.map(ResourceResponse::getResource);
Flux<Document> obs = client.createDocument(collection.selfLink(), newDoc, null, false)
.map(ResourceResponse::getResource);
createDocumentObservables.add(obs);
}
}

docsToRead = Observable.merge(createDocumentObservables, 100).toList().toBlocking().single();
docsToRead = Flux.merge(Flux.fromIterable(createDocumentObservables), 100).collectList().block();
init();

if (configuration.isEnableJvmStats()) {
Expand All @@ -123,14 +124,14 @@ abstract class AsyncBenchmark<T> {
if (configuration.getGraphiteEndpoint() != null) {
final Graphite graphite = new Graphite(new InetSocketAddress(configuration.getGraphiteEndpoint(), configuration.getGraphiteEndpointPort()));
reporter = GraphiteReporter.forRegistry(metricsRegistry)
.prefixedWith(configuration.getOperationType().name())
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(MetricFilter.ALL)
.build(graphite);
.prefixedWith(configuration.getOperationType().name())
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(MetricFilter.ALL)
.build(graphite);
} else {
reporter = ConsoleReporter.forRegistry(metricsRegistry).convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS).build();
.convertDurationsTo(TimeUnit.MILLISECONDS).build();
}
}

Expand Down Expand Up @@ -163,7 +164,7 @@ protected String getDocumentLink(Document doc) {
}
}

protected abstract void performWorkload(Subscriber<T> subs, long i) throws Exception;
protected abstract void performWorkload(BaseSubscriber<T> baseSubscriber, long i) throws Exception;

private boolean shouldContinue(long startTimeMillis, long iterationCount) {
Duration maxDurationTime = configuration.getMaxRunningTimeDuration();
Expand Down Expand Up @@ -199,14 +200,19 @@ void run() throws Exception {
long i;
for ( i = 0; shouldContinue(startTime, i); i++) {

Subscriber<T> subs = new Subscriber<T>() {
BaseSubscriber<T> baseSubscriber = new BaseSubscriber<T>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
super.hookOnSubscribe(subscription);
}

@Override
public void onStart() {
protected void hookOnNext(T value) {

}

@Override
public void onCompleted() {
protected void hookOnComplete() {
successMeter.mark();
concurrencyControlSemaphore.release();
AsyncBenchmark.this.onSuccess();
Expand All @@ -218,25 +224,21 @@ public void onCompleted() {
}

@Override
public void onError(Throwable e) {
protected void hookOnError(Throwable throwable) {
failureMeter.mark();
logger.error("Encountered failure {} on thread {}" ,
e.getMessage(), Thread.currentThread().getName(), e);
throwable.getMessage(), Thread.currentThread().getName(), throwable);
concurrencyControlSemaphore.release();
AsyncBenchmark.this.onError(e);
AsyncBenchmark.this.onError(throwable);

synchronized (count) {
count.incrementAndGet();
count.notify();
}
}

@Override
public void onNext(T value) {
}
};

performWorkload(subs, i);
performWorkload(baseSubscriber, i);
}

synchronized (count) {
Expand All @@ -247,7 +249,7 @@ public void onNext(T value) {

long endTime = System.currentTimeMillis();
logger.info("[{}] operations performed in [{}] seconds.",
configuration.getNumberOfOperations(), (int) ((endTime - startTime) / 1000));
configuration.getNumberOfOperations(), (int) ((endTime - startTime) / 1000));

reporter.report();
reporter.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import com.azure.data.cosmos.RequestOptions;
import com.azure.data.cosmos.ResourceResponse;
import org.apache.commons.lang3.RandomStringUtils;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.Random;
import java.util.UUID;
Expand All @@ -50,8 +50,8 @@ class AsyncMixedBenchmark extends AsyncBenchmark<Document> {
}

@Override
protected void performWorkload(Subscriber<Document> subs, long i) throws InterruptedException {
Observable<Document> obs;
protected void performWorkload(BaseSubscriber<Document> documentBaseSubscriber, long i) throws InterruptedException {
Flux<Document> obs;
if (i % 10 == 0 && i % 100 != 0) {

String idString = uuid + i;
Expand Down Expand Up @@ -86,6 +86,6 @@ protected void performWorkload(Subscriber<Document> subs, long i) throws Interru

concurrencyControlSemaphore.acquire();

obs.subscribeOn(Schedulers.computation()).subscribe(subs);
obs.subscribeOn(Schedulers.parallel()).subscribe(documentBaseSubscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@
import com.azure.data.cosmos.FeedOptions;
import com.azure.data.cosmos.FeedResponse;
import com.azure.data.cosmos.PartitionKey;
import com.azure.data.cosmos.benchmark.Configuration.Operation;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.Random;

Expand All @@ -54,49 +53,49 @@ protected void onSuccess() {
}

@Override
protected void performWorkload(Subscriber<FeedResponse<Document>> subs, long i) throws InterruptedException {
protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscriber, long i) throws InterruptedException {

Observable<FeedResponse<Document>> obs;
Flux<FeedResponse<Document>> obs;
Random r = new Random();
FeedOptions options = new FeedOptions();

if (configuration.getOperationType() == Operation.QueryCross) {
if (configuration.getOperationType() == Configuration.Operation.QueryCross) {

int index = r.nextInt(1000);
options.enableCrossPartitionQuery(true);
String sqlQuery = "Select * from c where c._rid = \"" + docsToRead.get(index).resourceId() + "\"";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Operation.QuerySingle) {
} else if (configuration.getOperationType() == Configuration.Operation.QuerySingle) {

int index = r.nextInt(1000);
String pk = docsToRead.get(index).getString("pk");
options.partitionKey(new PartitionKey(pk));
String sqlQuery = "Select * from c where c.pk = \"" + pk + "\"";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Operation.QueryParallel) {
} else if (configuration.getOperationType() == Configuration.Operation.QueryParallel) {

options.maxItemCount(10);
options.enableCrossPartitionQuery(true);
String sqlQuery = "Select * from c";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Operation.QueryOrderby) {
} else if (configuration.getOperationType() == Configuration.Operation.QueryOrderby) {

options.maxItemCount(10);
options.enableCrossPartitionQuery(true);
String sqlQuery = "Select * from c order by c._ts";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Operation.QueryAggregate) {
} else if (configuration.getOperationType() == Configuration.Operation.QueryAggregate) {

options.maxItemCount(10);
options.enableCrossPartitionQuery(true);
String sqlQuery = "Select value max(c._ts) from c";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Operation.QueryAggregateTopOrderby) {
} else if (configuration.getOperationType() == Configuration.Operation.QueryAggregateTopOrderby) {

options.enableCrossPartitionQuery(true);
String sqlQuery = "Select top 1 value count(c) from c order by c._ts";
obs = client.queryDocuments(getCollectionLink(), sqlQuery, options);
} else if (configuration.getOperationType() == Operation.QueryTopOrderby) {
} else if (configuration.getOperationType() == Configuration.Operation.QueryTopOrderby) {

options.enableCrossPartitionQuery(true);
String sqlQuery = "Select top 1000 * from c order by c._ts";
Expand All @@ -106,6 +105,6 @@ protected void performWorkload(Subscriber<FeedResponse<Document>> subs, long i)
}
concurrencyControlSemaphore.acquire();

obs.subscribeOn(Schedulers.computation()).subscribe(subs);
obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import com.azure.data.cosmos.FeedOptions;
import com.azure.data.cosmos.FeedResponse;
import com.azure.data.cosmos.PartitionKey;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class AsyncQuerySinglePartitionMultiple extends AsyncBenchmark<FeedResponse<Document>> {

Expand All @@ -56,11 +56,11 @@ protected void onSuccess() {
}

@Override
protected void performWorkload(Subscriber<FeedResponse<Document>> subs, long i) throws InterruptedException {
Observable<FeedResponse<Document>> obs = client.queryDocuments(getCollectionLink(), SQL_QUERY, options);
protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscriber, long i) throws InterruptedException {
Flux<FeedResponse<Document>> obs = client.queryDocuments(getCollectionLink(), SQL_QUERY, options);

concurrencyControlSemaphore.acquire();

obs.subscribeOn(Schedulers.computation()).subscribe(subs);
obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
}
}
Loading

0 comments on commit 7b0dc3c

Please sign in to comment.