Skip to content

Commit

Permalink
feat: CreateDatabaseIfNotExists method for async client
Browse files Browse the repository at this point in the history
  • Loading branch information
JigarJoshi committed Apr 15, 2022
1 parent 11b8f02 commit f311172
Show file tree
Hide file tree
Showing 13 changed files with 129 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,27 @@
import com.tigrisdata.db.api.v1.grpc.Api;
import com.tigrisdata.db.api.v1.grpc.TigrisDBGrpc;
import static com.tigrisdata.db.client.ErrorMessages.CREATE_DB_FAILED;
import static com.tigrisdata.db.client.ErrorMessages.DB_ALREADY_EXISTS;
import static com.tigrisdata.db.client.ErrorMessages.DROP_DB_FAILED;
import static com.tigrisdata.db.client.ErrorMessages.LIST_DBS_FAILED;
import static com.tigrisdata.db.client.TypeConverter.toCreateDatabaseRequest;
import static com.tigrisdata.db.client.TypeConverter.toDropDatabaseRequest;
import static com.tigrisdata.db.client.TypeConverter.toListDatabasesRequest;
import com.tigrisdata.db.client.auth.AuthorizationToken;
import com.tigrisdata.db.client.config.TigrisDBConfiguration;
import com.tigrisdata.db.client.error.TigrisDBException;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

/** Async client for TigrisDB */
public class StandardTigrisDBAsyncClient extends AbstractTigrisDBClient
Expand Down Expand Up @@ -119,22 +125,22 @@ public CompletableFuture<List<TigrisAsyncDatabase>> listDatabases(
}

@Override
public CompletableFuture<TigrisDBResponse> createDatabase(
String databaseName, DatabaseOptions databaseOptions) {
public CompletableFuture<TigrisDBResponse> createDatabaseIfNotExists(String databaseName) {
ListenableFuture<Api.CreateDatabaseResponse> createDatabaseResponse =
stub.createDatabase(toCreateDatabaseRequest(databaseName, databaseOptions));
stub.createDatabase(
toCreateDatabaseRequest(databaseName, DatabaseOptions.DEFAULT_INSTANCE));
return Utilities.transformFuture(
createDatabaseResponse,
apiResponse -> new TigrisDBResponse(apiResponse.getMsg()),
executor,
CREATE_DB_FAILED);
CREATE_DB_FAILED,
Optional.of(CreateDatabaseExceptionHandler.DEFAULT_INSTANCE));
}

@Override
public CompletableFuture<TigrisDBResponse> dropDatabase(
String databaseName, DatabaseOptions databaseOptions) {
public CompletableFuture<TigrisDBResponse> dropDatabase(String databaseName) {
ListenableFuture<Api.DropDatabaseResponse> dropDatabaseResponse =
stub.dropDatabase(toDropDatabaseRequest(databaseName, databaseOptions));
stub.dropDatabase(toDropDatabaseRequest(databaseName, DatabaseOptions.DEFAULT_INSTANCE));
return Utilities.transformFuture(
dropDatabaseResponse,
apiResponse -> new TigrisDBResponse(apiResponse.getMsg()),
Expand All @@ -151,4 +157,28 @@ public void close() {
ManagedChannel getChannel() {
return channel;
}

/**
* This is the exception handler for CreateDatabase operation, here it will swallow the exception
* if the server says database already exists, it will pass the exception further otherwise.
*/
static class CreateDatabaseExceptionHandler
implements BiConsumer<CompletableFuture<TigrisDBResponse>, Throwable> {
static final CreateDatabaseExceptionHandler DEFAULT_INSTANCE =
new CreateDatabaseExceptionHandler();

@Override
public void accept(CompletableFuture<TigrisDBResponse> completableFuture, Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
if (((StatusRuntimeException) throwable).getStatus().getCode()
== Status.ALREADY_EXISTS.getCode()) {
// swallow the already exists exception
completableFuture.complete(new TigrisDBResponse(DB_ALREADY_EXISTS));
return;
}
}
// pass on the error otherwise
completableFuture.completeExceptionally(new TigrisDBException(CREATE_DB_FAILED, throwable));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ public List<TigrisDatabase> listDatabases(DatabaseOptions listDatabaseOptions)
}

@Override
public TigrisDBResponse createDatabaseIfNotExists(
String databaseName, DatabaseOptions databaseOptions) throws TigrisDBException {
public TigrisDBResponse createDatabaseIfNotExists(String databaseName) throws TigrisDBException {
Api.CreateDatabaseResponse createDatabaseResponse = null;
try {
createDatabaseResponse =
stub.createDatabase(toCreateDatabaseRequest(databaseName, databaseOptions));
stub.createDatabase(
toCreateDatabaseRequest(databaseName, DatabaseOptions.DEFAULT_INSTANCE));
return new TigrisDBResponse(createDatabaseResponse.getMsg());
} catch (StatusRuntimeException statusRuntimeException) {
// ignore the error if the database is already exists
Expand All @@ -104,11 +104,10 @@ public TigrisDBResponse createDatabaseIfNotExists(
}

@Override
public TigrisDBResponse dropDatabase(String databaseName, DatabaseOptions databaseOptions)
throws TigrisDBException {
public TigrisDBResponse dropDatabase(String databaseName) throws TigrisDBException {
try {
Api.DropDatabaseResponse dropDatabaseResponse =
stub.dropDatabase(toDropDatabaseRequest(databaseName, databaseOptions));
stub.dropDatabase(toDropDatabaseRequest(databaseName, DatabaseOptions.DEFAULT_INSTANCE));
return new TigrisDBResponse(dropDatabaseResponse.getMsg());
} catch (StatusRuntimeException statusRuntimeException) {
throw new TigrisDBException(DROP_DB_FAILED, statusRuntimeException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public List<CollectionInfo> listCollections() throws TigrisDBException {
}

@Override
public TigrisDBResponse createCollectionsInTransaction(List<URL> collectionsSchemas)
throws TigrisDBException {
public TigrisDBResponse applySchemas(List<URL> collectionsSchemas) throws TigrisDBException {
TransactionSession transactionSession = null;
try {
transactionSession = beginTransaction(new TransactionOptions());
Expand All @@ -76,7 +75,7 @@ public TigrisDBResponse createCollectionsInTransaction(List<URL> collectionsSche
transactionSession.createOrUpdateCollection(schema, CollectionOptions.DEFAULT_INSTANCE);
}
transactionSession.commit();
return new TigrisDBResponse("Collections creates successfully");
return new TigrisDBResponse("Collections created successfully");
} catch (Exception exception) {
if (transactionSession != null) {
transactionSession.rollback();
Expand All @@ -86,15 +85,14 @@ public TigrisDBResponse createCollectionsInTransaction(List<URL> collectionsSche
}

@Override
public TigrisDBResponse createCollectionsInTransaction(File schemaDirectory)
throws TigrisDBException {
public TigrisDBResponse applySchemas(File schemaDirectory) throws TigrisDBException {
List<URL> schemaURLs = new ArrayList<>();
try {
for (File file :
schemaDirectory.listFiles(file -> file.getName().toLowerCase().endsWith(".json"))) {
schemaURLs.add(file.toURI().toURL());
}
return createCollectionsInTransaction(schemaURLs);
return applySchemas(schemaURLs);
} catch (NullPointerException | MalformedURLException ex) {
throw new TigrisDBException("Failed to process schemaDirectory", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,18 @@ public interface TigrisDBAsyncClient extends Closeable {
CompletableFuture<List<TigrisAsyncDatabase>> listDatabases(DatabaseOptions listDatabaseOptions);

/**
* Creates the database
* Creates the database if not already present
*
* @param databaseName name of the database
* @param databaseOptions options
* @return a future to the {@link TigrisDBResponse}
*/
CompletableFuture<TigrisDBResponse> createDatabase(
String databaseName, DatabaseOptions databaseOptions);
CompletableFuture<TigrisDBResponse> createDatabaseIfNotExists(String databaseName);

/**
* Drops the database
*
* @param databaseName name of the database
* @param databaseOptions options
* @return a future to the {@link TigrisDBResponse}
*/
CompletableFuture<TigrisDBResponse> dropDatabase(
String databaseName, DatabaseOptions databaseOptions);
CompletableFuture<TigrisDBResponse> dropDatabase(String databaseName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,17 @@ public interface TigrisDBClient extends Closeable {
* Creates the database if the database is not already present
*
* @param databaseName name of the database
* @param databaseOptions options
* @return an instance of {@link TigrisDBResponse} from server
* @throws TigrisDBException in case of auth error or any other failure.
*/
TigrisDBResponse createDatabaseIfNotExists(String databaseName, DatabaseOptions databaseOptions)
throws TigrisDBException;
TigrisDBResponse createDatabaseIfNotExists(String databaseName) throws TigrisDBException;

/**
* Drops the database
*
* @param databaseName name of the database
* @param databaseOptions options
* @return an instance of {@link TigrisDBResponse} from server
* @throws TigrisDBException in case of auth error or any other failure.
*/
TigrisDBResponse dropDatabase(String databaseName, DatabaseOptions databaseOptions)
throws TigrisDBException;
TigrisDBResponse dropDatabase(String databaseName) throws TigrisDBException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,23 @@ public interface TigrisDatabase {
*/
List<CollectionInfo> listCollections() throws TigrisDBException;
/**
* Creates the collection in a transaction
* Creates or updates the collection schemas on the current database/
*
* @param collectionsSchemas list of URL pointing to schema files
* @return response
* @throws TigrisDBException in case of an error.
*/
TigrisDBResponse createCollectionsInTransaction(List<URL> collectionsSchemas)
throws TigrisDBException;
TigrisDBResponse applySchemas(List<URL> collectionsSchemas) throws TigrisDBException;

/**
* Reads schema files from a directory and creates the collection in a transaction
* Reads schema files from a directory and Creates or updates the collection schemas on the
* current database.
*
* @param schemaDirectory directory containing schema files
* @return response
* @throws TigrisDBException in case of an error.
*/
TigrisDBResponse createCollectionsInTransaction(File schemaDirectory) throws TigrisDBException;
TigrisDBResponse applySchemas(File schemaDirectory) throws TigrisDBException;
/**
* Drops the collection.
*
Expand Down
36 changes: 30 additions & 6 deletions client/src/main/java/com/tigrisdata/db/client/Utilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@
import com.tigrisdata.db.client.error.TigrisDBException;

import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Function;

final class Utilities {
private Utilities() {}

// TODO update this once server sends the message back
public static final String INSERT_SUCCESS_RESPONSE = "inserted";
public static final String DELETE_SUCCESS_RESPONSE = "deleted";
static final String INSERT_SUCCESS_RESPONSE = "inserted";
static final String DELETE_SUCCESS_RESPONSE = "deleted";

/**
* Converts from {@link Iterator} of Type F to {@link Iterator} of type T
Expand All @@ -39,8 +41,7 @@ private Utilities() {}
* @param <T> destination type
* @return an instance of {@link Iterator} of type T
*/
public static <F, T> Iterator<T> transformIterator(
Iterator<F> iterator, Function<F, T> converter) {
static <F, T> Iterator<T> transformIterator(Iterator<F> iterator, Function<F, T> converter) {
return new ConvertedIterator<>(iterator, converter);
}

Expand All @@ -54,11 +55,30 @@ public static <F, T> Iterator<T> transformIterator(
* @param <T> to type
* @return an instance of {@link CompletableFuture}
*/
public static <F, T> CompletableFuture<T> transformFuture(
static <F, T> CompletableFuture<T> transformFuture(
ListenableFuture<F> listenableFuture,
Function<F, T> converter,
Executor executor,
String errorMessage) {
return transformFuture(listenableFuture, converter, executor, errorMessage, Optional.empty());
}
/**
* Converts {@link ListenableFuture} of type F to {@link CompletableFuture} of type T
*
* @param listenableFuture source listenable future
* @param converter function that converts type F to type T
* @param executor executor to run callback that transforms Future when source Future is complete
* @param exceptionHandler handles exception
* @param <F> from type
* @param <T> to type
* @return an instance of {@link CompletableFuture}
*/
static <F, T> CompletableFuture<T> transformFuture(
ListenableFuture<F> listenableFuture,
Function<F, T> converter,
Executor executor,
String errorMessage,
Optional<BiConsumer<CompletableFuture<T>, Throwable>> exceptionHandler) {
CompletableFuture<T> result = new CompletableFuture<>();
Futures.addCallback(
listenableFuture,
Expand All @@ -70,7 +90,11 @@ public void onSuccess(F f) {

@Override
public void onFailure(Throwable throwable) {
result.completeExceptionally(new TigrisDBException(errorMessage, throwable));
if (exceptionHandler.isPresent()) {
exceptionHandler.get().accept(result, throwable);
} else {
result.completeExceptionally(new TigrisDBException(errorMessage, throwable));
}
}
},
executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,26 @@ public void testListDatabases() throws InterruptedException, ExecutionException
@Test
public void testCreateDatabase() throws InterruptedException, ExecutionException {
TigrisDBAsyncClient asyncClient = TestUtils.getTestAsyncClient(SERVER_NAME, grpcCleanup);
CompletableFuture<TigrisDBResponse> response =
asyncClient.createDatabase("db4", DatabaseOptions.DEFAULT_INSTANCE);
CompletableFuture<TigrisDBResponse> response = asyncClient.createDatabaseIfNotExists("db4");
Assert.assertEquals("db4 created", response.get().getMessage());
// 4th db created
Assert.assertEquals(
4, asyncClient.listDatabases(DatabaseOptions.DEFAULT_INSTANCE).get().size());
}

@Test
public void testDropDatabase() throws InterruptedException, ExecutionException {
public void testAlreadyExisingDatabaseCreation() throws InterruptedException, ExecutionException {
TigrisDBAsyncClient asyncClient = TestUtils.getTestAsyncClient(SERVER_NAME, grpcCleanup);
CompletableFuture<TigrisDBResponse> response =
asyncClient.dropDatabase("db2", DatabaseOptions.DEFAULT_INSTANCE);
asyncClient.createDatabaseIfNotExists("pre-existing-db-name");
// no exception is thrown, response with message is served
Assert.assertEquals("Database already exists", response.get().getMessage());
}

@Test
public void testDropDatabase() throws InterruptedException, ExecutionException {
TigrisDBAsyncClient asyncClient = TestUtils.getTestAsyncClient(SERVER_NAME, grpcCleanup);
CompletableFuture<TigrisDBResponse> response = asyncClient.dropDatabase("db2");
Assert.assertEquals("db2 dropped", response.get().getMessage());
// 4th db created
Assert.assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testCreateDatabase() {
String dbName = UUID.randomUUID().toString();
TigrisDBClient client = TestUtils.getTestClient(SERVER_NAME, grpcCleanup);
try {
client.createDatabaseIfNotExists(dbName, DatabaseOptions.DEFAULT_INSTANCE);
client.createDatabaseIfNotExists(dbName);
Assert.fail("This must fail");
} catch (TigrisDBException tigrisDBException) {
Assert.assertEquals(
Expand All @@ -73,7 +73,7 @@ public void testDropDatabase() {
String dbName = UUID.randomUUID().toString();
TigrisDBClient client = TestUtils.getTestClient(SERVER_NAME, grpcCleanup);
try {
client.dropDatabase(dbName, DatabaseOptions.DEFAULT_INSTANCE);
client.dropDatabase(dbName);
Assert.fail("This must fail");
} catch (TigrisDBException tigrisDBException) {
Assert.assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public void testListDatabases() throws TigrisDBException {
@Test
public void testCreateDatabase() throws TigrisDBException {
TigrisDBClient client = TestUtils.getTestClient(SERVER_NAME, grpcCleanup);
TigrisDBResponse response =
client.createDatabaseIfNotExists("db4", DatabaseOptions.DEFAULT_INSTANCE);
TigrisDBResponse response = client.createDatabaseIfNotExists("db4");
Assert.assertEquals("db4 created", response.getMessage());
// 4th db created
Assert.assertEquals(4, client.listDatabases(DatabaseOptions.DEFAULT_INSTANCE).size());
Expand All @@ -93,7 +92,7 @@ public void testCreateDatabase() throws TigrisDBException {
@Test
public void testDropDatabase() throws TigrisDBException {
TigrisDBClient client = TestUtils.getTestClient(SERVER_NAME, grpcCleanup);
TigrisDBResponse response = client.dropDatabase("db2", DatabaseOptions.DEFAULT_INSTANCE);
TigrisDBResponse response = client.dropDatabase("db2");
Assert.assertEquals("db2 dropped", response.getMessage());
// 4th db created
Assert.assertEquals(2, client.listDatabases(DatabaseOptions.DEFAULT_INSTANCE).size());
Expand Down
Loading

0 comments on commit f311172

Please sign in to comment.