Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CreateDatabaseIfNotExists method for async client #56

Merged
merged 2 commits into from
Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/badges/branches.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion .github/badges/coverage-summary.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"branches": 90.08620689655173, "coverage": 92.81062776764783}
{"branches": 89.49579831932773, "coverage": 92.6923076923077}
2 changes: 1 addition & 1 deletion .github/badges/jacoco.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,27 @@
import com.tigrisdata.db.api.v1.grpc.Api;
import com.tigrisdata.db.api.v1.grpc.TigrisDBGrpc;
import static com.tigrisdata.db.client.ErrorMessages.CREATE_DB_FAILED;
import static com.tigrisdata.db.client.ErrorMessages.DB_ALREADY_EXISTS;
import static com.tigrisdata.db.client.ErrorMessages.DROP_DB_FAILED;
import static com.tigrisdata.db.client.ErrorMessages.LIST_DBS_FAILED;
import static com.tigrisdata.db.client.TypeConverter.toCreateDatabaseRequest;
import static com.tigrisdata.db.client.TypeConverter.toDropDatabaseRequest;
import static com.tigrisdata.db.client.TypeConverter.toListDatabasesRequest;
import com.tigrisdata.db.client.auth.AuthorizationToken;
import com.tigrisdata.db.client.config.TigrisDBConfiguration;
import com.tigrisdata.db.client.error.TigrisDBException;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

/** Async client for TigrisDB */
public class StandardTigrisDBAsyncClient extends AbstractTigrisDBClient
Expand Down Expand Up @@ -119,22 +125,22 @@ public CompletableFuture<List<TigrisAsyncDatabase>> listDatabases(
}

@Override
public CompletableFuture<TigrisDBResponse> createDatabase(
String databaseName, DatabaseOptions databaseOptions) {
public CompletableFuture<TigrisDBResponse> createDatabaseIfNotExists(String databaseName) {
ListenableFuture<Api.CreateDatabaseResponse> createDatabaseResponse =
stub.createDatabase(toCreateDatabaseRequest(databaseName, databaseOptions));
stub.createDatabase(
toCreateDatabaseRequest(databaseName, DatabaseOptions.DEFAULT_INSTANCE));
return Utilities.transformFuture(
createDatabaseResponse,
apiResponse -> new TigrisDBResponse(apiResponse.getMsg()),
executor,
CREATE_DB_FAILED);
CREATE_DB_FAILED,
Optional.of(CreateDatabaseExceptionHandler.DEFAULT_INSTANCE));
}

@Override
public CompletableFuture<TigrisDBResponse> dropDatabase(
String databaseName, DatabaseOptions databaseOptions) {
public CompletableFuture<TigrisDBResponse> dropDatabase(String databaseName) {
ListenableFuture<Api.DropDatabaseResponse> dropDatabaseResponse =
stub.dropDatabase(toDropDatabaseRequest(databaseName, databaseOptions));
stub.dropDatabase(toDropDatabaseRequest(databaseName, DatabaseOptions.DEFAULT_INSTANCE));
return Utilities.transformFuture(
dropDatabaseResponse,
apiResponse -> new TigrisDBResponse(apiResponse.getMsg()),
Expand All @@ -151,4 +157,28 @@ public void close() {
ManagedChannel getChannel() {
return channel;
}

/**
* This is the exception handler for CreateDatabase operation, here it will swallow the exception
* if the server says database already exists, it will pass the exception further otherwise.
*/
static class CreateDatabaseExceptionHandler
implements BiConsumer<CompletableFuture<TigrisDBResponse>, Throwable> {
static final CreateDatabaseExceptionHandler DEFAULT_INSTANCE =
new CreateDatabaseExceptionHandler();

@Override
public void accept(CompletableFuture<TigrisDBResponse> completableFuture, Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
if (((StatusRuntimeException) throwable).getStatus().getCode()
== Status.ALREADY_EXISTS.getCode()) {
// swallow the already exists exception
completableFuture.complete(new TigrisDBResponse(DB_ALREADY_EXISTS));
return;
}
}
// pass on the error otherwise
completableFuture.completeExceptionally(new TigrisDBException(CREATE_DB_FAILED, throwable));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ public List<TigrisDatabase> listDatabases(DatabaseOptions listDatabaseOptions)
}

@Override
public TigrisDBResponse createDatabaseIfNotExists(
String databaseName, DatabaseOptions databaseOptions) throws TigrisDBException {
public TigrisDBResponse createDatabaseIfNotExists(String databaseName) throws TigrisDBException {
Api.CreateDatabaseResponse createDatabaseResponse = null;
try {
createDatabaseResponse =
stub.createDatabase(toCreateDatabaseRequest(databaseName, databaseOptions));
stub.createDatabase(
toCreateDatabaseRequest(databaseName, DatabaseOptions.DEFAULT_INSTANCE));
return new TigrisDBResponse(createDatabaseResponse.getMsg());
} catch (StatusRuntimeException statusRuntimeException) {
// ignore the error if the database is already exists
Expand All @@ -104,11 +104,10 @@ public TigrisDBResponse createDatabaseIfNotExists(
}

@Override
public TigrisDBResponse dropDatabase(String databaseName, DatabaseOptions databaseOptions)
throws TigrisDBException {
public TigrisDBResponse dropDatabase(String databaseName) throws TigrisDBException {
try {
Api.DropDatabaseResponse dropDatabaseResponse =
stub.dropDatabase(toDropDatabaseRequest(databaseName, databaseOptions));
stub.dropDatabase(toDropDatabaseRequest(databaseName, DatabaseOptions.DEFAULT_INSTANCE));
return new TigrisDBResponse(dropDatabaseResponse.getMsg());
} catch (StatusRuntimeException statusRuntimeException) {
throw new TigrisDBException(DROP_DB_FAILED, statusRuntimeException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public List<CollectionInfo> listCollections() throws TigrisDBException {
}

@Override
public TigrisDBResponse createCollectionsInTransaction(List<URL> collectionsSchemas)
throws TigrisDBException {
public TigrisDBResponse applySchemas(List<URL> collectionsSchemas) throws TigrisDBException {
TransactionSession transactionSession = null;
try {
transactionSession = beginTransaction(new TransactionOptions());
Expand All @@ -76,7 +75,7 @@ public TigrisDBResponse createCollectionsInTransaction(List<URL> collectionsSche
transactionSession.createOrUpdateCollection(schema, CollectionOptions.DEFAULT_INSTANCE);
}
transactionSession.commit();
return new TigrisDBResponse("Collections creates successfully");
return new TigrisDBResponse("Collections created successfully");
} catch (Exception exception) {
if (transactionSession != null) {
transactionSession.rollback();
Expand All @@ -86,15 +85,14 @@ public TigrisDBResponse createCollectionsInTransaction(List<URL> collectionsSche
}

@Override
public TigrisDBResponse createCollectionsInTransaction(File schemaDirectory)
throws TigrisDBException {
public TigrisDBResponse applySchemas(File schemaDirectory) throws TigrisDBException {
List<URL> schemaURLs = new ArrayList<>();
try {
for (File file :
schemaDirectory.listFiles(file -> file.getName().toLowerCase().endsWith(".json"))) {
schemaURLs.add(file.toURI().toURL());
}
return createCollectionsInTransaction(schemaURLs);
return applySchemas(schemaURLs);
} catch (NullPointerException | MalformedURLException ex) {
throw new TigrisDBException("Failed to process schemaDirectory", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,18 @@ public interface TigrisDBAsyncClient extends Closeable {
CompletableFuture<List<TigrisAsyncDatabase>> listDatabases(DatabaseOptions listDatabaseOptions);

/**
* Creates the database
* Creates the database if not already present
*
* @param databaseName name of the database
* @param databaseOptions options
* @return a future to the {@link TigrisDBResponse}
*/
CompletableFuture<TigrisDBResponse> createDatabase(
String databaseName, DatabaseOptions databaseOptions);
CompletableFuture<TigrisDBResponse> createDatabaseIfNotExists(String databaseName);

/**
* Drops the database
*
* @param databaseName name of the database
* @param databaseOptions options
* @return a future to the {@link TigrisDBResponse}
*/
CompletableFuture<TigrisDBResponse> dropDatabase(
String databaseName, DatabaseOptions databaseOptions);
CompletableFuture<TigrisDBResponse> dropDatabase(String databaseName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,17 @@ public interface TigrisDBClient extends Closeable {
* Creates the database if the database is not already present
*
* @param databaseName name of the database
* @param databaseOptions options
* @return an instance of {@link TigrisDBResponse} from server
* @throws TigrisDBException in case of auth error or any other failure.
*/
TigrisDBResponse createDatabaseIfNotExists(String databaseName, DatabaseOptions databaseOptions)
throws TigrisDBException;
TigrisDBResponse createDatabaseIfNotExists(String databaseName) throws TigrisDBException;

/**
* Drops the database
*
* @param databaseName name of the database
* @param databaseOptions options
* @return an instance of {@link TigrisDBResponse} from server
* @throws TigrisDBException in case of auth error or any other failure.
*/
TigrisDBResponse dropDatabase(String databaseName, DatabaseOptions databaseOptions)
throws TigrisDBException;
TigrisDBResponse dropDatabase(String databaseName) throws TigrisDBException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,23 @@ public interface TigrisDatabase {
*/
List<CollectionInfo> listCollections() throws TigrisDBException;
/**
* Creates the collection in a transaction
* Creates or updates the collection schemas on the current database/
*
* @param collectionsSchemas list of URL pointing to schema files
* @return response
* @throws TigrisDBException in case of an error.
*/
TigrisDBResponse createCollectionsInTransaction(List<URL> collectionsSchemas)
throws TigrisDBException;
TigrisDBResponse applySchemas(List<URL> collectionsSchemas) throws TigrisDBException;

/**
* Reads schema files from a directory and creates the collection in a transaction
* Reads schema files from a directory and Creates or updates the collection schemas on the
* current database.
*
* @param schemaDirectory directory containing schema files
* @return response
* @throws TigrisDBException in case of an error.
*/
TigrisDBResponse createCollectionsInTransaction(File schemaDirectory) throws TigrisDBException;
TigrisDBResponse applySchemas(File schemaDirectory) throws TigrisDBException;
/**
* Drops the collection.
*
Expand Down
36 changes: 30 additions & 6 deletions client/src/main/java/com/tigrisdata/db/client/Utilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@
import com.tigrisdata.db.client.error.TigrisDBException;

import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Function;

final class Utilities {
private Utilities() {}

// TODO update this once server sends the message back
public static final String INSERT_SUCCESS_RESPONSE = "inserted";
public static final String DELETE_SUCCESS_RESPONSE = "deleted";
static final String INSERT_SUCCESS_RESPONSE = "inserted";
static final String DELETE_SUCCESS_RESPONSE = "deleted";

/**
* Converts from {@link Iterator} of Type F to {@link Iterator} of type T
Expand All @@ -39,8 +41,7 @@ private Utilities() {}
* @param <T> destination type
* @return an instance of {@link Iterator} of type T
*/
public static <F, T> Iterator<T> transformIterator(
Iterator<F> iterator, Function<F, T> converter) {
static <F, T> Iterator<T> transformIterator(Iterator<F> iterator, Function<F, T> converter) {
return new ConvertedIterator<>(iterator, converter);
}

Expand All @@ -54,11 +55,30 @@ public static <F, T> Iterator<T> transformIterator(
* @param <T> to type
* @return an instance of {@link CompletableFuture}
*/
public static <F, T> CompletableFuture<T> transformFuture(
static <F, T> CompletableFuture<T> transformFuture(
ListenableFuture<F> listenableFuture,
Function<F, T> converter,
Executor executor,
String errorMessage) {
return transformFuture(listenableFuture, converter, executor, errorMessage, Optional.empty());
}
/**
* Converts {@link ListenableFuture} of type F to {@link CompletableFuture} of type T
*
* @param listenableFuture source listenable future
* @param converter function that converts type F to type T
* @param executor executor to run callback that transforms Future when source Future is complete
* @param exceptionHandler handles exception
* @param <F> from type
* @param <T> to type
* @return an instance of {@link CompletableFuture}
*/
static <F, T> CompletableFuture<T> transformFuture(
ListenableFuture<F> listenableFuture,
Function<F, T> converter,
Executor executor,
String errorMessage,
Optional<BiConsumer<CompletableFuture<T>, Throwable>> exceptionHandler) {
CompletableFuture<T> result = new CompletableFuture<>();
Futures.addCallback(
listenableFuture,
Expand All @@ -70,7 +90,11 @@ public void onSuccess(F f) {

@Override
public void onFailure(Throwable throwable) {
result.completeExceptionally(new TigrisDBException(errorMessage, throwable));
if (exceptionHandler.isPresent()) {
exceptionHandler.get().accept(result, throwable);
} else {
result.completeExceptionally(new TigrisDBException(errorMessage, throwable));
}
}
},
executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,26 @@ public void testListDatabases() throws InterruptedException, ExecutionException
@Test
public void testCreateDatabase() throws InterruptedException, ExecutionException {
TigrisDBAsyncClient asyncClient = TestUtils.getTestAsyncClient(SERVER_NAME, grpcCleanup);
CompletableFuture<TigrisDBResponse> response =
asyncClient.createDatabase("db4", DatabaseOptions.DEFAULT_INSTANCE);
CompletableFuture<TigrisDBResponse> response = asyncClient.createDatabaseIfNotExists("db4");
Assert.assertEquals("db4 created", response.get().getMessage());
// 4th db created
Assert.assertEquals(
4, asyncClient.listDatabases(DatabaseOptions.DEFAULT_INSTANCE).get().size());
}

@Test
public void testDropDatabase() throws InterruptedException, ExecutionException {
public void testAlreadyExisingDatabaseCreation() throws InterruptedException, ExecutionException {
TigrisDBAsyncClient asyncClient = TestUtils.getTestAsyncClient(SERVER_NAME, grpcCleanup);
CompletableFuture<TigrisDBResponse> response =
asyncClient.dropDatabase("db2", DatabaseOptions.DEFAULT_INSTANCE);
asyncClient.createDatabaseIfNotExists("pre-existing-db-name");
// no exception is thrown, response with message is served
Assert.assertEquals("Database already exists", response.get().getMessage());
}

@Test
public void testDropDatabase() throws InterruptedException, ExecutionException {
TigrisDBAsyncClient asyncClient = TestUtils.getTestAsyncClient(SERVER_NAME, grpcCleanup);
CompletableFuture<TigrisDBResponse> response = asyncClient.dropDatabase("db2");
Assert.assertEquals("db2 dropped", response.get().getMessage());
// 4th db created
Assert.assertEquals(
Expand Down
Loading