Skip to content

Commit

Permalink
feat: Async search client (#103)
Browse files Browse the repository at this point in the history
* refactor: User friendly Search request builder

* docs: included search example in README

* feat: Async search client
  • Loading branch information
adilansari authored Jun 27, 2022
1 parent 80bb473 commit 723dc2a
Show file tree
Hide file tree
Showing 19 changed files with 440 additions and 56 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ users.delete(
Filters.eq("id", 2)
)
);

// search - search for users with name "Jania"
users.search(
SearchRequest.newBuilder("Jania")
.withSearchFields("name")
.build()
);
```

# License
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,13 @@
*/
package com.tigrisdata.db.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.tigrisdata.db.api.v1.grpc.Api;
import com.tigrisdata.db.api.v1.grpc.TigrisGrpc;
import static com.tigrisdata.db.client.Constants.DELETE_FAILED;
import static com.tigrisdata.db.client.Constants.DESCRIBE_COLLECTION_FAILED;
import static com.tigrisdata.db.client.Constants.INSERT_FAILED;
import static com.tigrisdata.db.client.Constants.INSERT_OR_REPLACE_FAILED;
import static com.tigrisdata.db.client.Constants.JSON_SER_DE_ERROR;
import static com.tigrisdata.db.client.Constants.READ_FAILED;
import static com.tigrisdata.db.client.Constants.SEARCH_FAILED;
import static com.tigrisdata.db.client.Constants.UPDATE_FAILED;
import static com.tigrisdata.db.client.TypeConverter.extractTigrisError;
import static com.tigrisdata.db.client.TypeConverter.readOneDefaultReadRequestOptions;
Expand All @@ -33,13 +29,23 @@
import static com.tigrisdata.db.client.TypeConverter.toInsertRequest;
import static com.tigrisdata.db.client.TypeConverter.toReadRequest;
import static com.tigrisdata.db.client.TypeConverter.toReplaceRequest;
import static com.tigrisdata.db.client.TypeConverter.toSearchRequest;
import static com.tigrisdata.db.client.TypeConverter.toUpdateRequest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.tigrisdata.db.api.v1.grpc.Api;
import com.tigrisdata.db.api.v1.grpc.Api.SearchResponse;
import com.tigrisdata.db.api.v1.grpc.TigrisGrpc;
import com.tigrisdata.db.client.error.TigrisException;
import com.tigrisdata.db.client.search.SearchRequest;
import com.tigrisdata.db.client.search.SearchRequestOptions;
import com.tigrisdata.db.client.search.SearchResult;
import com.tigrisdata.db.type.TigrisCollectionType;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -55,6 +61,7 @@
*/
class StandardTigrisAsyncCollection<T extends TigrisCollectionType>
extends AbstractTigrisCollection<T> implements TigrisAsyncCollection<T> {

private final Executor executor;
private final TigrisGrpc.TigrisStub stub;
private final TigrisGrpc.TigrisFutureStub futureStub;
Expand Down Expand Up @@ -114,6 +121,21 @@ public CompletableFuture<Optional<T>> readOne(TigrisFilter filter) {
return completableFuture;
}

@Override
public void search(
SearchRequest request, SearchRequestOptions options, TigrisAsyncSearchReader<T> reader) {
Api.SearchRequest searchRequest =
toSearchRequest(databaseName, collectionName, request, objectMapper);
stub.search(
searchRequest,
new SearchResponseObserverAdapter<>(reader, collectionTypeClass, objectMapper));
}

@Override
public void search(SearchRequest request, TigrisAsyncSearchReader<T> reader) {
this.search(request, SearchRequestOptions.getDefault(), reader);
}

@Override
public CompletableFuture<InsertResponse<T>> insert(
List<T> documents, InsertRequestOptions insertRequestOptions) throws TigrisException {
Expand Down Expand Up @@ -364,6 +386,7 @@ public DeleteResponse delete(TransactionSession session, TigrisFilter filter)

static class ReadManyResponseObserverAdapter<T extends TigrisCollectionType>
implements StreamObserver<Api.ReadResponse> {

private final TigrisAsyncReader<T> reader;
private final Class<T> collectionTypeClass;
private final ObjectMapper objectMapper;
Expand Down Expand Up @@ -409,6 +432,7 @@ public void onCompleted() {

static class ReadSingleResponseObserverAdapter<T extends TigrisCollectionType>
implements StreamObserver<Api.ReadResponse> {

private final CompletableFuture<Optional<T>> completableFuture;
private final Class<T> collectionTypeClass;
private final ObjectMapper objectMapper;
Expand Down Expand Up @@ -451,4 +475,43 @@ public void onCompleted() {
// no op
}
}

static class SearchResponseObserverAdapter<T extends TigrisCollectionType>
implements StreamObserver<Api.SearchResponse> {

private final TigrisAsyncSearchReader<T> reader;
private final Class<T> collectionTypeClass;
private final ObjectMapper objectMapper;

public SearchResponseObserverAdapter(
TigrisAsyncSearchReader<T> reader,
Class<T> collectionTypeClass,
ObjectMapper objectMapper) {
this.reader = reader;
this.collectionTypeClass = collectionTypeClass;
this.objectMapper = objectMapper;
}

@Override
public void onNext(SearchResponse response) {
SearchResult<T> result = SearchResult.from(response, objectMapper, collectionTypeClass);
reader.onNext(result);
}

@Override
public void onError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
reader.onError(
new TigrisException(
SEARCH_FAILED, extractTigrisError((StatusRuntimeException) throwable), throwable));
} else {
reader.onError(new TigrisException(SEARCH_FAILED, throwable));
}
}

@Override
public void onCompleted() {
reader.onCompleted();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2022 Tigris Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.tigrisdata.db.client;

/**
* Asynchronous callback to receive messages from server
*
* @param <T> type of message
*/
public interface TigrisAsyncCallback<T> {

/**
* Receives a message from the stream.
*
* <p>Can be called many times but is never called after {@link #onError(Throwable)} or {@link
* #onCompleted()} are called.
*
* <p>If an exception is thrown by an implementation the caller is expected to terminate the
* stream by calling {@link #onError(Throwable)} with the caught exception prior to propagating
* it.
*
* @param message the value passed from the server
*/
void onNext(T message);

/**
* Receives terminating error from the stream.
*
* @param t captures the error
*/
void onError(Throwable t);

/**
* Receives a notification of successful stream completion.
*
* <p>May only be called once and if called it must be the last method called. In particular if an
* exception is thrown by an implementation of {@code onCompleted} no further calls to any method
* are allowed.
*/
void onCompleted();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
package com.tigrisdata.db.client;

import com.tigrisdata.db.client.error.TigrisException;
import com.tigrisdata.db.client.search.SearchRequest;
import com.tigrisdata.db.client.search.SearchRequestOptions;
import com.tigrisdata.db.type.TigrisCollectionType;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -66,6 +67,32 @@ void read(
*/
CompletableFuture<Optional<T>> readOne(TigrisFilter filter);

/**
* Search for documents in a collection. Easily perform sophisticated queries and refine results
* using filters with advanced features like faceting and ordering.
*
* <p>Note: Searching is expensive. If using as a primary key based lookup, use {@code read()}
* instead
*
* @param request search request to execute
* @param options search pagination options
* @param reader reader callback
* @see #search(SearchRequest, TigrisAsyncSearchReader)
*/
void search(
SearchRequest request, SearchRequestOptions options, TigrisAsyncSearchReader<T> reader);

/**
* Search for documents in a collection.
*
* <p>Wrapper around {@link #search(SearchRequest, SearchRequestOptions, TigrisAsyncSearchReader)}
* with default pagination options
*
* @param request search request to execute
* @param reader reader callback
*/
void search(SearchRequest request, TigrisAsyncSearchReader<T> reader);

/**
* Inserts documents into collection
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,8 @@
import com.tigrisdata.db.type.TigrisCollectionType;

/**
* A callback that reads the documents
* Asynchronous callback to receive collection documents from server
*
* @param <T> Collection type
*/
public interface TigrisAsyncReader<T extends TigrisCollectionType> {

/** @param document next document */
void onNext(T document);

/** @param t captures the error */
void onError(Throwable t);

/** Gets invoked when read is completed */
void onCompleted();
}
public interface TigrisAsyncReader<T extends TigrisCollectionType> extends TigrisAsyncCallback<T> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022 Tigris Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.tigrisdata.db.client;

import com.tigrisdata.db.client.search.SearchResult;
import com.tigrisdata.db.type.TigrisCollectionType;

/**
* Asynchronous callback to receive {@link SearchResult} from server
*
* @param <T> Collection type
*/
public interface TigrisAsyncSearchReader<T extends TigrisCollectionType>
extends TigrisAsyncCallback<SearchResult<T>> {}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,5 @@
*/
package com.tigrisdata.db.client;

/** A callback that reads the events */
public interface TigrisAsyncStreamer {

/** @param event next event */
void onNext(StreamEvent event);

/** @param t captures the error */
void onError(Throwable t);

/** Gets invoked when read is completed */
void onCompleted();
}
/** Asynchronous callback to receive events */
public interface TigrisAsyncStreamer extends TigrisAsyncCallback<StreamEvent> {}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Iterator<T> read(TigrisFilter filter, ReadRequestOptions readRequestOptions)
* @param options search pagination options
* @return stream of search results
* @throws TigrisException in case of error
* @see #search(SearchRequest)
*/
Iterator<SearchResult<T>> search(SearchRequest request, SearchRequestOptions options)
throws TigrisException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -36,7 +37,7 @@ private FacetFieldsQuery(Builder builder) {
}

/**
* Represents empty facet query, server will not return any facet results
* Represents empty facet query, server will not return any facets
*
* @return an empty {@link FacetFieldsQuery}
*/
Expand Down Expand Up @@ -119,7 +120,7 @@ public static final class Builder {
private Builder() {}

/**
* Sets a collection field name to include facet results in search response
* Sets a collection field name to include facets in search results
*
* <p>Uses default {@link FacetQueryOptions} options
*
Expand All @@ -134,6 +135,22 @@ public Builder withField(String field) {
return this;
}

/**
* Sets collection field names to include in facets in search results
*
* <p>Uses default {@link FacetQueryOptions} options
*
* @param fields list of field names
* @return {@link FacetFieldsQuery.Builder}
*/
public Builder withFields(Collection<String> fields) {
if (fieldMap == null) {
fieldMap = new HashMap<>();
}
fields.forEach(f -> fieldMap.put(f, FacetQueryOptions.getDefault()));
return this;
}

/**
* Sets a collection field name and query options to include facet results in search response
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@ private Builder(String queryString) {
* Constructs a {@link QueryString}
*
* @return {@link QueryString}
* @throws NullPointerException if query string is null
* @throws IllegalArgumentException if query string is null
* @see #getMatchAllQuery()
*/
public QueryString build() {
Objects.requireNonNull(this.q);
if (this.q == null) {
throw new IllegalArgumentException("Query cannot be null");
}
return new QueryString(this.q);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public Builder withField(String field) {
* @param fields list of schema field names as string
* @return {@link SearchFields.Builder}
*/
public Builder withFields(Collection<? extends String> fields) {
public Builder withFields(Collection<String> fields) {
if (this.fields == null) {
this.fields = new ArrayList<>();
}
Expand Down
Loading

0 comments on commit 723dc2a

Please sign in to comment.