Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: renamed stream to events and made it collection specific #104

Merged
merged 2 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/src/main/java/com/tigrisdata/db/client/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ private Constants() {}
public static final String DELETE_FAILED = "Failed to delete";
public static final String READ_FAILED = "Failed to read";
public static final String SEARCH_FAILED = "Failed to search";
public static final String STREAM_FAILED = "Failed to stream events";
public static final String STREAM_CONVERT_FAILED = "Failed to convert event data";
public static final String EVENTS_FAILED = "Failed to stream events";
public static final String EVENTS_CONVERT_FAILED = "Failed to convert event data";
public static final String DESCRIBE_COLLECTION_FAILED = "Failed to describe collection";

// JSON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import static com.tigrisdata.db.client.Constants.JSON_SER_DE_ERROR;
import static com.tigrisdata.db.client.Constants.READ_FAILED;
import static com.tigrisdata.db.client.Constants.SEARCH_FAILED;
import static com.tigrisdata.db.client.Constants.EVENTS_CONVERT_FAILED;
import static com.tigrisdata.db.client.Constants.EVENTS_FAILED;
import static com.tigrisdata.db.client.Constants.UPDATE_FAILED;
import static com.tigrisdata.db.client.TypeConverter.extractTigrisError;
import static com.tigrisdata.db.client.TypeConverter.readOneDefaultReadRequestOptions;
Expand All @@ -46,6 +48,7 @@
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -285,6 +288,42 @@ public CompletableFuture<CollectionDescription> describe(CollectionOptions colle
DESCRIBE_COLLECTION_FAILED);
}

@Override
public void events(TigrisAsyncEventer eventer) {
Api.EventsRequest streamRequest =
Api.EventsRequest.newBuilder().setDb(databaseName).setCollection(collectionName).build();
stub.events(
streamRequest,
new StreamObserver<Api.EventsResponse>() {
@Override
public void onNext(Api.EventsResponse streamResponse) {
try {
eventer.onNext(StreamEvent.from(streamResponse.getEvent(), objectMapper));
} catch (IOException e) {
eventer.onError(new TigrisException(EVENTS_CONVERT_FAILED, e));
}
}

@Override
public void onError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
eventer.onError(
new TigrisException(
EVENTS_FAILED,
TypeConverter.extractTigrisError((StatusRuntimeException) throwable),
throwable));
} else {
eventer.onError(new TigrisException(EVENTS_FAILED, throwable));
}
}

@Override
public void onCompleted() {
eventer.onCompleted();
}
});
}

@Override
public String name() {
return collectionName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,14 @@
import static com.tigrisdata.db.client.Constants.DESCRIBE_DB_FAILED;
import static com.tigrisdata.db.client.Constants.DROP_COLLECTION_FAILED;
import static com.tigrisdata.db.client.Constants.LIST_COLLECTION_FAILED;
import static com.tigrisdata.db.client.Constants.STREAM_CONVERT_FAILED;
import static com.tigrisdata.db.client.Constants.STREAM_FAILED;
import static com.tigrisdata.db.client.TypeConverter.toBeginTransactionRequest;
import static com.tigrisdata.db.client.TypeConverter.toDatabaseDescription;
import static com.tigrisdata.db.client.TypeConverter.toDropCollectionRequest;
import com.tigrisdata.db.client.error.TigrisException;
import com.tigrisdata.db.type.TigrisCollectionType;
import com.tigrisdata.tools.schema.core.ModelToJsonSchema;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -178,41 +173,6 @@ public CompletableFuture<DatabaseDescription> describe() throws TigrisException
DESCRIBE_DB_FAILED);
}

@Override
public void stream(TigrisAsyncStreamer streamer) {
Api.EventsRequest streamRequest = Api.EventsRequest.newBuilder().setDb(db).build();
stub.events(
streamRequest,
new StreamObserver<Api.EventsResponse>() {
@Override
public void onNext(Api.EventsResponse streamResponse) {
try {
streamer.onNext(StreamEvent.from(streamResponse.getEvent(), objectMapper));
} catch (IOException e) {
streamer.onError(new TigrisException(STREAM_CONVERT_FAILED, e));
}
}

@Override
public void onError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
streamer.onError(
new TigrisException(
STREAM_FAILED,
TypeConverter.extractTigrisError((StatusRuntimeException) throwable),
throwable));
} else {
streamer.onError(new TigrisException(STREAM_FAILED, throwable));
}
}

@Override
public void onCompleted() {
streamer.onCompleted();
}
});
}

@Override
public String name() {
return db;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.tigrisdata.db.client.Constants.DESCRIBE_COLLECTION_FAILED;
import static com.tigrisdata.db.client.Constants.READ_FAILED;
import static com.tigrisdata.db.client.Constants.SEARCH_FAILED;
import static com.tigrisdata.db.client.Constants.EVENTS_FAILED;
import static com.tigrisdata.db.client.TypeConverter.toCollectionDescription;
import static com.tigrisdata.db.client.TypeConverter.toCollectionOptions;
import com.tigrisdata.db.client.error.TigrisException;
Expand All @@ -29,6 +30,7 @@
import com.tigrisdata.db.type.TigrisCollectionType;
import io.grpc.StatusRuntimeException;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -279,6 +281,29 @@ public CollectionDescription describe(CollectionOptions options) throws TigrisEx
}
}

@Override
public Iterator<StreamEvent> events() throws TigrisException {
try {
Api.EventsRequest streamRequest =
Api.EventsRequest.newBuilder().setDb(databaseName).setCollection(collectionName).build();
Iterator<Api.EventsResponse> streamResponseIterator = blockingStub.events(streamRequest);
Function<Api.EventsResponse, StreamEvent> converter =
streamResponse -> {
try {
return StreamEvent.from(streamResponse.getEvent(), objectMapper);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to convert event data to JSON", e);
}
};
return Utilities.transformIterator(streamResponseIterator, converter);
} catch (StatusRuntimeException statusRuntimeException) {
throw new TigrisException(
EVENTS_FAILED,
TypeConverter.extractTigrisError(statusRuntimeException),
statusRuntimeException);
}
}

@Override
public String name() {
return collectionName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@
import static com.tigrisdata.db.client.Constants.DESCRIBE_DB_FAILED;
import static com.tigrisdata.db.client.Constants.DROP_COLLECTION_FAILED;
import static com.tigrisdata.db.client.Constants.LIST_COLLECTION_FAILED;
import static com.tigrisdata.db.client.Constants.STREAM_FAILED;
import static com.tigrisdata.db.client.Constants.TRANSACTION_FAILED;
import com.tigrisdata.db.client.error.TigrisException;
import com.tigrisdata.db.type.TigrisCollectionType;
import com.tigrisdata.tools.schema.core.ModelToJsonSchema;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -176,28 +172,6 @@ public DatabaseDescription describe() throws TigrisException {
}
}

@Override
public Iterator<StreamEvent> stream() throws TigrisException {
try {
Api.EventsRequest streamRequest = Api.EventsRequest.newBuilder().setDb(db).build();
Iterator<Api.EventsResponse> streamResponseIterator = blockingStub.events(streamRequest);
Function<Api.EventsResponse, StreamEvent> converter =
streamResponse -> {
try {
return StreamEvent.from(streamResponse.getEvent(), objectMapper);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to convert event data to JSON", e);
}
};
return Utilities.transformIterator(streamResponseIterator, converter);
} catch (StatusRuntimeException statusRuntimeException) {
throw new TigrisException(
STREAM_FAILED,
TypeConverter.extractTigrisError(statusRuntimeException),
statusRuntimeException);
}
}

@Override
public String name() {
return db;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ CompletableFuture<DeleteResponse> delete(
CompletableFuture<CollectionDescription> describe(CollectionOptions collectionOptions)
throws TigrisException;

/** @param eventer streamer callback */
void events(TigrisAsyncEventer eventer);

/** @return Name of the collection */
String name();
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ <T extends TigrisCollectionType> CompletableFuture<DropCollectionResponse> dropC
<C extends TigrisCollectionType> TigrisAsyncCollection<C> getCollection(
Class<C> collectionTypeClass);

/** @param streamer streamer callback */
void stream(TigrisAsyncStreamer streamer);

/**
* Begins the transaction on current database
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,14 @@
package com.tigrisdata.db.client;

/** Asynchronous callback to receive events */
public interface TigrisAsyncStreamer extends TigrisAsyncCallback<StreamEvent> {}
public interface TigrisAsyncEventer extends TigrisAsyncCallback<StreamEvent> {

/** @param event next event */
void onNext(StreamEvent event);

/** @param t captures the error */
void onError(Throwable t);

/** Gets invoked when read is completed */
void onCompleted();
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,12 @@ DeleteResponse delete(TigrisFilter filter, DeleteRequestOptions deleteRequestOpt
*/
CollectionDescription describe(CollectionOptions collectionOptions) throws TigrisException;

/**
* @return stream of events.
* @throws TigrisException in case of an error.
*/
Iterator<StreamEvent> events() throws TigrisException;

/** @return Name of the collection */
String name();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.tigrisdata.db.client.error.TigrisException;
import com.tigrisdata.db.type.TigrisCollectionType;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
Expand Down Expand Up @@ -92,12 +91,6 @@ <T extends TigrisCollectionType> DropCollectionResponse dropCollection(Class<T>
*/
DatabaseDescription describe() throws TigrisException;

/**
* @return stream of events.
* @throws TigrisException in case of an error.
*/
Iterator<StreamEvent> stream() throws TigrisException;

/** @return name of the current database */
String name();

Expand Down