Skip to content

Commit

Permalink
Merge pull request #6 from onflow/janez/add-blocktimestamp-to-blockevent
Browse files Browse the repository at this point in the history
Added BlockTimestamp to access/GetBlockEvents
  • Loading branch information
Kay-Zee authored Sep 29, 2020
2 parents d00b412 + 9c57fb2 commit 5afe528
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 57 deletions.
55 changes: 39 additions & 16 deletions engine/access/rpc/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,19 @@ func (b *backendEvents) GetEventsForHeightRange(
endHeight = head.Height
}

// find the block IDs for all the blocks between min and max height (inclusive)
blockIDs := make([]flow.Identifier, 0)
// find the block headers for all the blocks between min and max height (inclusive)
blockHeaders := make([]*flow.Header, 0)

for i := startHeight; i <= endHeight; i++ {
block, err := b.blocks.ByHeight(i)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get events: %v", err)
}

blockIDs = append(blockIDs, block.ID())
blockHeaders = append(blockHeaders, block.Header)
}

return b.getBlockEventsFromExecutionNode(ctx, blockIDs, eventType)
return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType)
}

// GetEventsForBlockIDs retrieves events for all the specified block IDs that have the given type
Expand All @@ -66,17 +66,33 @@ func (b *backendEvents) GetEventsForBlockIDs(
eventType string,
blockIDs []flow.Identifier,
) ([]flow.BlockEvents, error) {

// find the block headers for all the block IDs
blockHeaders := make([]*flow.Header, 0)
for _, blockID := range blockIDs {
block, err := b.blocks.ByID(blockID)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get events: %v", err)
}

blockHeaders = append(blockHeaders, block.Header)
}

// forward the request to the execution node
return b.getBlockEventsFromExecutionNode(ctx, blockIDs, eventType)
return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType)
}

func (b *backendEvents) getBlockEventsFromExecutionNode(
ctx context.Context,
blockIDs []flow.Identifier,
blockHeaders []*flow.Header,
eventType string,
) ([]flow.BlockEvents, error) {

// create an execution API request for events at block ID
blockIDs := make([]flow.Identifier, len(blockHeaders))
for i := range blockIDs {
blockIDs[i] = blockHeaders[i].ID()
}
req := execproto.GetEventsForBlockIDsRequest{
Type: eventType,
BlockIds: convert.IdentifiersToMessages(blockIDs),
Expand All @@ -90,7 +106,7 @@ func (b *backendEvents) getBlockEventsFromExecutionNode(
}

// convert execution node api result to access node api result
results, err := verifyAndConvertToAccessEvents(resp.GetResults(), blockIDs)
results, err := verifyAndConvertToAccessEvents(resp.GetResults(), blockHeaders)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to verify retrieved events from execution node: %v", err)
}
Expand All @@ -100,27 +116,34 @@ func (b *backendEvents) getBlockEventsFromExecutionNode(

// verifyAndConvertToAccessEvents converts execution node api result to access node api result, and verifies that the results contains
// results from each block that was requested
func verifyAndConvertToAccessEvents(execEvents []*execproto.GetEventsForBlockIDsResponse_Result, requestedBlockIDs []flow.Identifier) ([]flow.BlockEvents, error) {
if len(execEvents) != len(requestedBlockIDs) {
func verifyAndConvertToAccessEvents(execEvents []*execproto.GetEventsForBlockIDsResponse_Result, requestedBlockHeaders []*flow.Header) ([]flow.BlockEvents, error) {
if len(execEvents) != len(requestedBlockHeaders) {
return nil, errors.New("number of results does not match number of blocks requested")
}

blockIDSet := map[string]bool{}
for _, blockID := range requestedBlockIDs {
blockIDSet[blockID.String()] = true
reqestedBlockHeaderSet := map[string]*flow.Header{}
for _, header := range requestedBlockHeaders {
reqestedBlockHeaderSet[header.ID().String()] = header
}

results := make([]flow.BlockEvents, len(execEvents))

for i, result := range execEvents {
if !blockIDSet[hex.EncodeToString(result.GetBlockId())] {
header, expected := reqestedBlockHeaderSet[hex.EncodeToString(result.GetBlockId())]
if !expected {
return nil, fmt.Errorf("unexpected blockID from exe node %x", result.GetBlockId())
}
if result.GetBlockHeight() != header.Height {
return nil, fmt.Errorf("unexpected block height %d for block %x from exe node",
result.GetBlockHeight(),
result.GetBlockId())
}

results[i] = flow.BlockEvents{
BlockID: convert.MessageToIdentifier(result.GetBlockId()),
BlockHeight: result.GetBlockHeight(),
Events: convert.MessagesToEvents(result.GetEvents()),
BlockID: header.ID(),
BlockHeight: header.Height,
BlockTimestamp: header.Timestamp,
Events: convert.MessagesToEvents(result.GetEvents()),
}
}

Expand Down
91 changes: 53 additions & 38 deletions engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,28 +417,40 @@ func (suite *Suite) TestGetLatestFinalizedBlock() {
}

func (suite *Suite) TestGetEventsForBlockIDs() {

blockIDs := getIDs(5)
events := getEvents(10)

setupStorage := func(n int) []*flow.Header {
headers := make([]*flow.Header, n)
for i := 0; i < n; i++ {
b := unittest.BlockFixture()
suite.blocks.
On("ByID", b.ID()).
Return(&b, nil).Once()

headers[i] = b.Header
}
return headers
}
blockHeaders := setupStorage(5)

// create the expected results from execution node and access node
exeResults := make([]*execproto.GetEventsForBlockIDsResponse_Result, len(blockIDs))
exeResults := make([]*execproto.GetEventsForBlockIDsResponse_Result, len(blockHeaders))

for i := 0; i < len(blockIDs); i++ {
for i := 0; i < len(blockHeaders); i++ {
exeResults[i] = &execproto.GetEventsForBlockIDsResponse_Result{
BlockId: convert.IdentifierToMessage(blockIDs[i]),
BlockHeight: uint64(i),
BlockId: convert.IdentifierToMessage(blockHeaders[i].ID()),
BlockHeight: blockHeaders[i].Height,
Events: convert.EventsToMessages(events),
}
}

expected := make([]flow.BlockEvents, len(blockIDs))

for i := 0; i < len(blockIDs); i++ {
expected := make([]flow.BlockEvents, len(blockHeaders))
for i := 0; i < len(blockHeaders); i++ {
expected[i] = flow.BlockEvents{
BlockID: blockIDs[i],
BlockHeight: uint64(i),
Events: events,
BlockID: blockHeaders[i].ID(),
BlockHeight: blockHeaders[i].Height,
BlockTimestamp: blockHeaders[i].Timestamp,
Events: events,
}
}

Expand All @@ -449,6 +461,10 @@ func (suite *Suite) TestGetEventsForBlockIDs() {

ctx := context.Background()

blockIDs := make([]flow.Identifier, len(blockHeaders))
for i, header := range blockHeaders {
blockIDs[i] = header.ID()
}
exeReq := &execproto.GetEventsForBlockIDsRequest{
BlockIds: convert.IdentifiersToMessages(blockIDs),
Type: string(flow.EventAccountCreated),
Expand All @@ -464,7 +480,9 @@ func (suite *Suite) TestGetEventsForBlockIDs() {
backend := New(
suite.state,
suite.execClient,
nil, nil, nil, nil, nil,
nil,
suite.blocks,
nil, nil, nil,
suite.chainID,
metrics.NewNoopCollector(),
0,
Expand All @@ -485,16 +503,16 @@ func (suite *Suite) TestGetEventsForHeightRange() {
var minHeight uint64 = 5
var maxHeight uint64 = 10
var headHeight uint64
var expBlockIDs []flow.Identifier
var blockHeaders []*flow.Header

setupHeadHeight := func(height uint64) {
header := unittest.BlockHeaderFixture() // create a mock header
header.Height = height // set the header height
suite.snapshot.On("Head").Return(&header, nil).Once()
}

setupStorage := func(min uint64, max uint64) []flow.Identifier {
ids := make([]flow.Identifier, 0)
setupStorage := func(min uint64, max uint64) []*flow.Header {
headers := make([]*flow.Header, 0)

for i := min; i <= max; i++ {
b := unittest.BlockFixture()
Expand All @@ -503,34 +521,39 @@ func (suite *Suite) TestGetEventsForHeightRange() {
On("ByHeight", i).
Return(&b, nil).Once()

ids = append(ids, b.ID())
headers = append(headers, b.Header)
}

return ids
return headers
}

setupExecClient := func() []flow.BlockEvents {
blockIDs := make([]flow.Identifier, len(blockHeaders))
for i, header := range blockHeaders {
blockIDs[i] = header.ID()
}
execReq := &execproto.GetEventsForBlockIDsRequest{
BlockIds: convert.IdentifiersToMessages(expBlockIDs),
BlockIds: convert.IdentifiersToMessages(blockIDs),
Type: string(flow.EventAccountCreated),
}

results := make([]flow.BlockEvents, len(expBlockIDs))
exeResults := make([]*execproto.GetEventsForBlockIDsResponse_Result, len(expBlockIDs))
results := make([]flow.BlockEvents, len(blockHeaders))
exeResults := make([]*execproto.GetEventsForBlockIDsResponse_Result, len(blockHeaders))

for i, id := range expBlockIDs {
for i, header := range blockHeaders {
events := getEvents(1)
height := uint64(5) // an arbitrary height
height := header.Height

results[i] = flow.BlockEvents{
BlockID: id,
BlockHeight: height,
Events: events,
BlockID: header.ID(),
BlockHeight: height,
BlockTimestamp: header.Timestamp,
Events: events,
}

exeResults[i] = &execproto.GetEventsForBlockIDsResponse_Result{
BlockId: convert.IdentifierToMessage(id),
BlockHeight: height,
BlockId: convert.IdentifierToMessage(header.ID()),
BlockHeight: header.Height,
Events: convert.EventsToMessages(events),
}
}
Expand Down Expand Up @@ -570,7 +593,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {

// setup mocks
setupHeadHeight(headHeight)
expBlockIDs = setupStorage(minHeight, maxHeight)
blockHeaders = setupStorage(minHeight, maxHeight)
expectedResp := setupExecClient()

// create handler
Expand Down Expand Up @@ -600,7 +623,7 @@ func (suite *Suite) TestGetEventsForHeightRange() {
suite.Run("valid request with max_height > last_sealed_block_height", func() {
headHeight = maxHeight - 1
setupHeadHeight(headHeight)
expBlockIDs = setupStorage(minHeight, headHeight)
blockHeaders = setupStorage(minHeight, headHeight)
expectedResp := setupExecClient()

backend := New(
Expand Down Expand Up @@ -779,14 +802,6 @@ func (suite *Suite) checkResponse(resp interface{}, err error) {
suite.Require().NotNil(resp)
}

func getIDs(n int) []flow.Identifier {
ids := make([]flow.Identifier, n)
for i := range ids {
ids[i] = unittest.IdentifierFixture()
}
return ids
}

func getEvents(n int) []flow.Event {
events := make([]flow.Event, n)
for i := range events {
Expand Down
8 changes: 5 additions & 3 deletions model/flow/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package flow

import (
"fmt"
"time"

"github.com/onflow/flow-go/model/encoding"
"github.com/onflow/flow-go/model/fingerprint"
Expand Down Expand Up @@ -74,7 +75,8 @@ func wrapEvent(e Event) eventWrapper {

// BlockEvents contains events emitted in a single block.
type BlockEvents struct {
BlockID Identifier
BlockHeight uint64
Events []Event
BlockID Identifier
BlockHeight uint64
BlockTimestamp time.Time
Events []Event
}

0 comments on commit 5afe528

Please sign in to comment.