Skip to content

Commit

Permalink
[PAN-3249] Use Bloombits for Logs queries (hyperledger#127)
Browse files Browse the repository at this point in the history
Use the bloombits for logs queries, so we only have to walk headers
and not every receipt on a large query.

Signed-off-by: Danno Ferrin <danno.ferrin@gmail.com>
Signed-off-by: edwardmack <ed@edwardmack.com>
  • Loading branch information
shemnon authored and edwardmack committed Nov 4, 2019
1 parent 9f40cab commit ab28ce2
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,16 @@ public Optional<TransactionReceiptWithMetadata> transactionReceiptByTransactionH
*/
public List<LogWithMetadata> matchingLogs(
final long fromBlockNumber, final long toBlockNumber, final LogsQuery query) {
// rangeClosed handles the inverted from/to situations automatically with zero results.
return LongStream.rangeClosed(fromBlockNumber, toBlockNumber)
.mapToObj(blockchain::getBlockHashByNumber)
.mapToObj(blockchain::getBlockHeader)
// Use takeWhile instead of clamping on toBlockNumber/headBlockNumber because it may get an
// extra block or two for a query that has a toBlockNumber past chain head. Similarly this
// handles the case when fromBlockNumber is past chain head.
.takeWhile(Optional::isPresent)
.flatMap(Optional::stream)
.flatMap(hash -> matchingLogs(hash, query).stream())
.map(Optional::get)
.filter(header -> query.couldMatch(header.getLogsBloom()))
.flatMap(header -> matchingLogs(header.getHash(), query).stream())
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,46 @@
import org.hyperledger.besu.ethereum.core.Address;
import org.hyperledger.besu.ethereum.core.Log;
import org.hyperledger.besu.ethereum.core.LogTopic;
import org.hyperledger.besu.ethereum.core.LogsBloomFilter;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;

public class LogsQuery {

private final List<Address> queryAddresses;
private final List<List<LogTopic>> queryTopics;
private final List<LogsBloomFilter> addressBlooms;
private final List<List<LogsBloomFilter>> topicsBlooms;

private LogsQuery(final List<Address> queryAddresses, final List<List<LogTopic>> queryTopics) {
this.queryAddresses = queryAddresses;
this.queryTopics = queryTopics;
addressBlooms =
this.queryAddresses.stream()
.map(LogsBloomFilter::computeBytes)
.collect(Collectors.toList());
topicsBlooms =
this.queryTopics.stream()
.map(
topics ->
topics.stream()
.filter(Objects::nonNull)
.map(LogsBloomFilter::computeBytes)
.collect(Collectors.toList()))
.collect(Collectors.toList());
}

private LogsQuery(final List<Address> addresses, final List<List<LogTopic>> topics) {
this.queryAddresses = addresses;
this.queryTopics = topics;
public boolean couldMatch(final LogsBloomFilter bloom) {
return (addressBlooms.isEmpty() || addressBlooms.stream().anyMatch(bloom::couldContain))
&& (topicsBlooms.isEmpty()
|| topicsBlooms.stream()
.allMatch(
topics -> topics.isEmpty() || topics.stream().anyMatch(bloom::couldContain)));
}

public boolean matches(final Log log) {
Expand Down Expand Up @@ -62,6 +88,20 @@ private boolean matchesTopic(final LogTopic topic, final List<LogTopic> matchCri
return matchCriteria.contains(null) || matchCriteria.contains(topic);
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final LogsQuery logsQuery = (LogsQuery) o;
return Objects.equals(queryAddresses, logsQuery.queryAddresses)
&& Objects.equals(queryTopics, logsQuery.queryTopics);
}

@Override
public int hashCode() {
return Objects.hash(queryAddresses, queryTopics);
}

public static class Builder {
private final List<Address> queryAddresses = Lists.newArrayList();
private final List<List<LogTopic>> queryTopics = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -86,7 +85,7 @@ public void shouldCheckMatchingLogsWhenRecordedNewBlockEvent() {
filterManager.installLogFilter(latest(), latest(), logsQuery());
recordNewBlockEvent();

verify(blockchainQueries).matchingLogs(eq(100L), eq(100L), refEq(logsQuery()));
verify(blockchainQueries).matchingLogs(eq(100L), eq(100L), eq(logsQuery()));
}

@Test
Expand All @@ -96,14 +95,14 @@ public void shouldUseHeadBlockAsFromBlockNumberWhenCheckingLogsForChanges() {
filterManager.installLogFilter(blockNum(1L), blockNum(10L), logsQuery());
recordNewBlockEvent();

verify(blockchainQueries).matchingLogs(eq(3L), eq(10L), refEq(logsQuery()));
verify(blockchainQueries).matchingLogs(eq(3L), eq(10L), eq(logsQuery()));
}

@Test
public void shouldReturnLogWhenLogFilterMatches() {
final LogWithMetadata log = logWithMetadata();
when(blockchainQueries.headBlockNumber()).thenReturn(100L);
when(blockchainQueries.matchingLogs(eq(100L), eq(100L), refEq(logsQuery())))
when(blockchainQueries.matchingLogs(eq(100L), eq(100L), eq(logsQuery())))
.thenReturn(Lists.newArrayList(log));

final String filterId = filterManager.installLogFilter(latest(), latest(), logsQuery());
Expand Down Expand Up @@ -163,13 +162,13 @@ public void getLogsForAbsentFilterReturnsNull() {
public void getLogsForExistingFilterReturnsResults() {
final LogWithMetadata log = logWithMetadata();
when(blockchainQueries.headBlockNumber()).thenReturn(100L);
when(blockchainQueries.matchingLogs(eq(100L), eq(100L), refEq(logsQuery())))
when(blockchainQueries.matchingLogs(eq(100L), eq(100L), eq(logsQuery())))
.thenReturn(Lists.newArrayList(log));

final String filterId = filterManager.installLogFilter(latest(), latest(), logsQuery());
final List<LogWithMetadata> retrievedLogs = filterManager.logs(filterId);

assertThat(retrievedLogs).isEqualToComparingFieldByFieldRecursively(Lists.newArrayList(log));
assertThat(retrievedLogs).usingRecursiveComparison().isEqualTo(Lists.newArrayList(log));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hyperledger.besu.ethereum.core.Address;
import org.hyperledger.besu.ethereum.core.Log;
import org.hyperledger.besu.ethereum.core.LogTopic;
import org.hyperledger.besu.ethereum.core.LogsBloomFilter;
import org.hyperledger.besu.util.bytes.BytesValue;

import java.util.ArrayList;
Expand All @@ -41,6 +42,7 @@ public void wildcardQueryAddressTopicReturnTrue() {
final List<LogTopic> topics = new ArrayList<>();
final Log log = new Log(address, data, topics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand All @@ -52,6 +54,7 @@ public void univariateAddressMatchReturnsTrue() {
final List<LogTopic> topics = new ArrayList<>();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), topics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand All @@ -77,6 +80,7 @@ public void multivariateAddressQueryMatchReturnsTrue() {
final List<LogTopic> topics = new ArrayList<>();
final Log log = new Log(address1, BytesValue.fromHexString("0x0102"), topics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -129,6 +133,7 @@ public void univariateTopicQueryMatchReturnTrue() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(topicsQuery).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), Lists.newArrayList(topic));

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -214,6 +219,7 @@ public void multivariateSurplusTopicMatchMultivariateNullQueryReturnTrue() {
new LogsQuery.Builder().address(address1).topics(queryParameter).build();
final Log log = new Log(address1, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -243,6 +249,7 @@ public void multivariateSurplusTopicMatchMultivariateQueryReturnTrue_00() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -275,6 +282,7 @@ public void multivariateSurplusTopicMatchMultivariateQueryReturnTrue_01() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -308,6 +316,7 @@ public void multivariateSurplusTopicMatchMultivariateQueryReturnTrue_02() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -344,6 +353,7 @@ public void redundantUnivariateTopicMatchMultivariateQueryReturnTrue() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down Expand Up @@ -417,6 +427,7 @@ public void multivariateTopicMatchMultivariateQueryReturnTrue() {
final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build();
final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics);

assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue();
assertThat(query.matches(log)).isTrue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -85,14 +86,14 @@ public void newFilterWithoutAddressAndTopicsParamsInstallsEmptyLogFilter() {
final JsonRpcResponse expectedResponse = new JsonRpcSuccessResponse(request.getId(), "0x1");

final LogsQuery expectedLogsQuery = new LogsQuery.Builder().build();
when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1");
when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1");

final JsonRpcResponse actualResponse = method.response(request);

assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse);
verify(filterManager)
.installLogFilter(
refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery));
refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery));
}

@Test
Expand All @@ -104,14 +105,14 @@ public void newFilterWithTopicsOnlyParamInstallsExpectedLogFilter() {

final LogsQuery expectedLogsQuery =
new LogsQuery.Builder().topics(new TopicsParameter(topics)).build();
when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1");
when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1");

final JsonRpcResponse actualResponse = method.response(request);

assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse);
verify(filterManager)
.installLogFilter(
refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery));
refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery));
}

@Test
Expand All @@ -122,14 +123,14 @@ public void newFilterWithAddressOnlyParamInstallsExpectedLogFilter() {
final JsonRpcResponse expectedResponse = new JsonRpcSuccessResponse(request.getId(), "0x1");

final LogsQuery expectedLogsQuery = new LogsQuery.Builder().address(address).build();
when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1");
when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1");

final JsonRpcResponse actualResponse = method.response(request);

assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse);
verify(filterManager)
.installLogFilter(
refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery));
refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery));
}

@Test
Expand All @@ -142,14 +143,14 @@ public void newFilterWithAddressAndTopicsParamInstallsExpectedLogFilter() {

final LogsQuery expectedLogsQuery =
new LogsQuery.Builder().address(address).topics(new TopicsParameter(topics)).build();
when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1");
when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1");

final JsonRpcResponse actualResponse = method.response(request);

assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse);
verify(filterManager)
.installLogFilter(
refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery));
refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery));
}

private List<List<String>> topics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,32 @@
"jsonrpc": "2.0",
"method": "eth_getLogs",
"params": [{
"fromBlock": "0x17",
"toBlock": "0x17",
"fromBlock": "0x20",
"toBlock": "0x20",
"address": [],
"topics": [["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null]]
"topics": [null, ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b"]]
}]
},
"response": {
"jsonrpc": "2.0",
"id": 406,
"result" : [{
"logIndex" : "0x0",
"removed": false,
"blockNumber" : "0x17",
"blockHash" : "0x3c419f39b340a4c35cc27b8f7880b779dc1abb9814ad13a2a5a55b885cc8ec2d",
"transactionHash" : "0x97a385bf570ced7821c6495b3877ddd2afd5c452f350f0d4876e98d9161389c6",
"transactionIndex" : "0x0",
"address" : "0x6295ee1b4f6dd65047762f924ecd367c17eabf8f",
"data" : "0x000000000000000000000000000000000000000000000000000000000000002a",
"topics" : ["0x65c9ac8011e286e89d02a269890f41d67ca2cc597b2c76c7c69321ff492be580"]
}]
"result": [
{
"logIndex": "0x0",
"removed": false,
"blockNumber": "0x20",
"blockHash": "0x71d59849ddd98543bdfbe8548f5eed559b07b8aaf196369f39134500eab68e53",
"transactionHash": "0xcef53f2311d7c80e9086d661e69ac11a5f3d081e28e02a9ba9b66749407ac310",
"transactionIndex": "0x0",
"address": "0x6295ee1b4f6dd65047762f924ecd367c17eabf8f",
"data": "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffe9000000000000000000000000000000000000000000000000000000000000002a",
"topics": [
"0x0000000000000000000000000000000000000000000000000000000000000001",
"0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b",
"0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
]
}
]
},
"statusCode": 200
}
Loading

0 comments on commit ab28ce2

Please sign in to comment.