Skip to content

Commit

Permalink
Support Stream as return type to datastore queries (#551)
Browse files Browse the repository at this point in the history
fixes #452.
Fixed code in GqlDatastoreQuery and PartTreeDatastoreQuery to support for Stream as a return type and added corresponding unit tests and integration tests.
  • Loading branch information
zhumin8 authored Jul 28, 2021
1 parent 32f4731 commit b90ed85
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 4 deletions.
6 changes: 6 additions & 0 deletions docs/src/main/asciidoc/datastore.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ In addition to retrieving entities by their IDs, you can also submit queries.
----

These methods, respectively, allow querying for:

* entities mapped by a given entity class using all the same mapping and converting features
* arbitrary types produced by a given mapping function
* only the Cloud Datastore keys of the entities found by the query
Expand Down Expand Up @@ -949,6 +950,8 @@ public interface TradeRepository extends DatastoreRepository<Trade, String[]> {
Slice<TestEntity> findBySymbol(String symbol, Pageable pageable);
List<TestEntity> findBySymbol(String symbol, Sort sort);
Stream<TestEntity> findBySymbol(String symbol);
}
----

Expand Down Expand Up @@ -1039,6 +1042,9 @@ public interface TraderRepository extends DatastoreRepository<Trader, String> {
@Query("SELECT * FROM traders WHERE name = @trader_name")
List<Trader> tradersByName(@Param("trader_name") String traderName);
@Query("SELECT * FROM traders WHERE name = @trader_name")
Stream<Trader> tradersStreamByName(@Param("trader_name") String traderName);
@Query("SELECT * FROM test_entities_ci WHERE name = @trader_name")
TestEntity getOneTestEntity(@Param("trader_name") String traderName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public Object execute(Object[] parameters) {
if (isPageQuery() || isSliceQuery()) {
result = buildPageOrSlice(parameters, parsedQueryWithTagsAndValues, found);
}
else if (this.queryMethod.isCollectionQuery()) {
else if (this.queryMethod.isCollectionQuery() || this.queryMethod.isStreamQuery()) {
result = convertCollectionResult(returnedItemType, found);
}
else {
Expand Down Expand Up @@ -199,6 +199,9 @@ private Page buildPage(Pageable pageableParam, ParsedQueryWithTagsAndValues pars
}

private Object convertCollectionResult(Class returnedItemType, Iterable rawResult) {
if (this.queryMethod.isStreamQuery()) {
return StreamSupport.stream(rawResult.spliterator(), false);
}
Object result = this.datastoreOperations.getDatastoreEntityConverter()
.getConversions().convertOnRead(
rawResult, this.queryMethod.getCollectionReturnType(), returnedItemType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,19 @@ public Object execute(Object[] parameters) {
}

private Object runQuery(Object[] parameters, Class returnedElementType, Class<?> collectionType, boolean requiresCount) {
ExecutionOptions options = new ExecutionOptions(returnedElementType, collectionType, requiresCount);
ExecutionOptions options = new ExecutionOptions(returnedElementType, collectionType, requiresCount,
getQueryMethod().isStreamQuery());

DatastoreResultsIterable rawResults = getDatastoreOperations()
.queryKeysOrEntities(
applyQueryBody(parameters, options.getQueryBuilder(),
requiresCount, options.isSingularResult(), null),
this.entityType);

if (getQueryMethod().isStreamQuery()) {
return StreamSupport.stream(rawResults.spliterator(), false);
}

Object result = StreamSupport.stream(rawResults.spliterator(), false)
.map(options.isReturnedTypeIsNumber() ? Function.identity() : this::processRawObjectForProjection)
.collect(options.getResultsCollector());
Expand Down Expand Up @@ -422,7 +427,7 @@ private class ExecutionOptions {

private boolean singularResult;

ExecutionOptions(Class returnedElementType, Class<?> collectionType, boolean requiresCount) {
ExecutionOptions(Class returnedElementType, Class<?> collectionType, boolean requiresCount, boolean isStreamQuery) {

returnedTypeIsNumber = Number.class.isAssignableFrom(returnedElementType)
|| returnedElementType == int.class || returnedElementType == long.class;
Expand All @@ -439,7 +444,13 @@ private class ExecutionOptions {

structuredQueryBuilder.setKind(PartTreeDatastoreQuery.this.datastorePersistentEntity.kindName());

singularResult = (!isCountingQuery && collectionType == null) && !PartTreeDatastoreQuery.this.tree.isDelete();
if (isCountingQuery || collectionType != null || isStreamQuery
|| PartTreeDatastoreQuery.this.tree.isDelete()) {
singularResult = false;
}
else {
singularResult = true;
}
}

boolean isReturnedTypeIsNumber() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import com.google.cloud.datastore.Blob;
import com.google.cloud.datastore.DatastoreException;
Expand Down Expand Up @@ -1037,6 +1038,20 @@ public void newFieldTest() {
assertThat(companyWithBooleanPrimitive.name).isEqualTo(company.name);
assertThat(companyWithBooleanPrimitive.active).isFalse();
}

@Test
public void returnStreamPartTreeTest() {
this.testEntityRepository.saveAll(this.allTestEntities);
Stream<TestEntity> resultStream = this.testEntityRepository.findPartTreeStreamByColor("red");
assertThat(resultStream).hasSize(3).contains(testEntityA, testEntityC, testEntityD);
}

@Test
public void returnStreamGqlTest() {
this.testEntityRepository.saveAll(this.allTestEntities);
Stream<TestEntity> resultStream = this.testEntityRepository.findGqlStreamByColor("red");
assertThat(resultStream).hasSize(3).contains(testEntityA, testEntityC, testEntityD);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -134,6 +135,11 @@ public interface TestEntityRepository extends DatastoreRepository<TestEntity, Lo

Optional<TestEntity> findFirstByColor(String color);

Stream<TestEntity> findPartTreeStreamByColor(String color);

@Query("select * from test_entities_ci where color = @color")
Stream<TestEntity> findGqlStreamByColor(@Param("color") String color);

@Nullable
TestEntity getByColor(String color);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import com.google.cloud.datastore.Cursor;
import com.google.cloud.datastore.DoubleValue;
Expand Down Expand Up @@ -447,6 +448,35 @@ public void pageableTestPageCursor() {
.queryKeysOrEntities(any(), eq(Trade.class));
}

@Test
public void streamResultTest() {
Mockito.<Class>when(this.queryMethod.getReturnedObjectType()).thenReturn(Trade.class);
Parameters parameters = mock(Parameters.class);
when(this.queryMethod.getParameters()).thenReturn(parameters);
when(parameters.getNumberOfParameters()).thenReturn(0);
when(this.queryMethod.isStreamQuery()).thenReturn(true);

Trade tradeA = new Trade();
tradeA.id = "a";
Trade tradeB = new Trade();
tradeB.id = "b";
doAnswer(invocation -> {
GqlQuery statement = invocation.getArgument(0);
assertThat(statement.getQueryString()).isEqualTo("unusedGqlString");

Cursor cursor = Cursor.copyFrom("abc".getBytes());
DatastoreResultsIterable datastoreResultsIterable = new DatastoreResultsIterable(
Arrays.asList(tradeA, tradeB), cursor);
return datastoreResultsIterable;
}).when(this.datastoreTemplate).queryKeysOrEntities(any(), eq(Trade.class));

GqlDatastoreQuery gqlDatastoreQuery = createQuery("unusedGqlString", false, false);

Object result = gqlDatastoreQuery.execute(new Parameters[0]);
assertThat(result).isInstanceOf(Stream.class);
assertThat((Stream) result).hasSize(2).containsExactly(tradeA, tradeB);
}

private Parameters buildParameters(Object[] params, String[] paramNames) {
Parameters parameters = mock(Parameters.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

import com.google.cloud.datastore.Cursor;
import com.google.cloud.datastore.EntityQuery;
Expand Down Expand Up @@ -846,6 +847,21 @@ public void nonCollectionReturnTypeNoResultsOptional() throws NoSuchMethodExcept
assertThat((Optional) this.partTreeDatastoreQuery.execute(params)).isNotPresent();
}

@Test
public void streamResultTest() throws NoSuchMethodException {
Trade tradeA = new Trade();
tradeA.id = "a";
Trade tradeB = new Trade();
tradeB.id = "b";
queryWithMockResult("findStreamByAction", Arrays.asList(tradeA, tradeB),
getClass().getMethod("findStreamByAction", String.class));
when(this.queryMethod.isStreamQuery()).thenReturn(true);
Object[] params = new Object[] { "BUY", };
Object result = this.partTreeDatastoreQuery.execute(params);
assertThat(result).isInstanceOf(Stream.class);
assertThat((Stream) result).hasSize(2).contains(tradeA, tradeB);
}

private void queryWithMockResult(String queryName, List results, Method m,
ProjectionInformation projectionInformation) {
queryWithMockResult(queryName, results, m, false, projectionInformation);
Expand Down Expand Up @@ -884,6 +900,10 @@ public Trade findByAction(String action) {
return null;
}

public Stream<Trade> findStreamByAction(String action) {
return null;
}

@Nullable
public Trade findByActionNullable(String action) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
Expand Down Expand Up @@ -99,6 +100,10 @@ private void retrieveAndPrintSingers() {
.findAllById(Arrays.asList("singer1", "singer2", "singer3"))
.forEach(x -> System.out.println("retrieved singer: " + x));

System.out.println("Query results can also be returned as Stream: ");
Stream<Singer> streamResult = singerRepository.findSingersByLastName("Doe");
streamResult.forEach(System.out::println);

//Query by example: find all singers with the last name "Doe"
Iterable<Singer> singers = this.singerRepository.findAll(
Example.of(new Singer(null, null, "Doe", null), ExampleMatcher.matching().withIgnorePaths("singerId")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.example;

import java.util.List;
import java.util.stream.Stream;

import com.google.cloud.spring.data.datastore.repository.DatastoreRepository;
import com.google.cloud.spring.data.datastore.repository.query.Query;
Expand All @@ -39,4 +40,6 @@ public interface SingerRepository extends DatastoreRepository<Singer, String> {
List<Singer> findSingersByFirstBand(@Param("band") Band band);

List<Singer> findByFirstBand(Band band);

Stream<Singer> findSingersByLastName(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -240,4 +241,11 @@ private List<Singer> getSingers(String url) throws java.io.IOException {
(String) som.get("firstName"), (String) som.get("lastName"), null))
.collect(Collectors.toList());
}

@Test
public void testQueryReturnStream() {
Stream<Singer> streamResult = singerRepository.findSingersByLastName("Doe");
assertThat(streamResult).isInstanceOf(Stream.class);
streamResult.map(Singer::getLastName).forEach(x -> assertThat(x).isEqualTo("Doe"));
}
}

0 comments on commit b90ed85

Please sign in to comment.