Skip to content

Commit

Permalink
fix: ETH getLogs: fix slowness at head and ignore null blocks (#12207)
Browse files Browse the repository at this point in the history
* fix get logs slowness and handling of null blocks
  • Loading branch information
aarshkshah1992 authored Jul 11, 2024
1 parent 7ea0e6c commit 9741571
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 22 deletions.
11 changes: 6 additions & 5 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,11 +698,12 @@ func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) {
return maxHeight, err
}

func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (bool, error) {
row := ei.stmtIsHeightProcessed.QueryRowContext(ctx, height)
var exists bool
err := row.Scan(&exists)
return exists, err
func (ei *EventIndex) IsHeightPast(ctx context.Context, height uint64) (bool, error) {
maxHeight, err := ei.GetMaxHeightInIndex(ctx)
if err != nil {
return false, err
}
return height <= maxHeight, nil
}

func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) {
Expand Down
8 changes: 4 additions & 4 deletions chain/events/filter/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,17 @@ func TestEventIndexPrefillFilter(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(14000), mh)

b, err := ei.IsHeightProcessed(context.Background(), 14000)
b, err := ei.IsHeightPast(context.Background(), 14000)
require.NoError(t, err)
require.True(t, b)

b, err = ei.IsHeightProcessed(context.Background(), 14001)
b, err = ei.IsHeightPast(context.Background(), 14001)
require.NoError(t, err)
require.False(t, b)

b, err = ei.IsHeightProcessed(context.Background(), 13000)
b, err = ei.IsHeightPast(context.Background(), 13000)
require.NoError(t, err)
require.False(t, b)
require.True(t, b)

tsKey := events14000.msgTs.Key()
tsKeyCid, err := tsKey.Cid()
Expand Down
35 changes: 22 additions & 13 deletions node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ func (a *EthModule) EthGetBlockByNumber(ctx context.Context, blkParam string, fu

func (a *EthModule) EthGetTransactionByHash(ctx context.Context, txHash *ethtypes.EthHash) (*ethtypes.EthTx, error) {
return a.EthGetTransactionByHashLimited(ctx, txHash, api.LookbackNoLimit)

}

func (a *EthModule) EthGetTransactionByHashLimited(ctx context.Context, txHash *ethtypes.EthHash, limit abi.ChainEpoch) (*ethtypes.EthTx, error) {
Expand Down Expand Up @@ -1276,23 +1275,31 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E
if pf.tipsetCid == cid.Undef {
maxHeight := pf.maxHeight
if maxHeight == -1 {
maxHeight = e.Chain.GetHeaviestTipSet().Height()
// heaviest tipset doesn't have events because its messages haven't been executed yet
maxHeight = e.Chain.GetHeaviestTipSet().Height() - 1
}

if maxHeight < 0 {
return nil, xerrors.Errorf("maxHeight requested is less than 0")
}
if maxHeight > e.Chain.GetHeaviestTipSet().Height() {

// we can't return events for the heaviest tipset as the transactions in that tipset will be executed
// in the next non null tipset (because of Filecoin's "deferred execution" model)
if maxHeight > e.Chain.GetHeaviestTipSet().Height()-1 {
return nil, xerrors.Errorf("maxHeight requested is greater than the heaviest tipset")
}

err := e.waitForHeightProcessed(ctx, maxHeight)
if err != nil {
return nil, err
}

// should also have the minHeight in the filter indexed
if b, err := e.EventFilterManager.EventIndex.IsHeightProcessed(ctx, uint64(pf.minHeight)); err != nil {
return nil, xerrors.Errorf("failed to check if event index has events for the minHeight: %w", err)
} else if !b {
return nil, xerrors.Errorf("event index does not have event for epoch %d", pf.minHeight)
}
// TODO: Ideally we should also check that events for the epoch at `pf.minheight` have been indexed
// However, it is currently tricky to check/guarantee this for two reasons:
// a) Event Index is not aware of null-blocks. This means that the Event Index wont be able to say whether the block at
// `pf.minheight` is a null block or whether it has no events
// b) There can be holes in the index where events at certain epoch simply haven't been indexed because of edge cases around
// node restarts while indexing. This needs a long term "auto-repair"/"automated-backfilling" implementation in the index
// So, for now, the best we can do is ensure that the event index has evenets for events at height >= `pf.maxHeight`
} else {
ts, err := e.Chain.GetTipSetByCid(ctx, pf.tipsetCid)
if err != nil {
Expand Down Expand Up @@ -1324,6 +1331,8 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E
return ethFilterResultFromEvents(ctx, ces, e.SubManager.StateAPI)
}

// note that we can have null blocks at the given height and the event Index is not null block aware
// so, what we do here is wait till we see the event index contain a block at a height greater than the given height
func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi.ChainEpoch) error {
ei := e.EventFilterManager.EventIndex
if height > e.Chain.GetHeaviestTipSet().Height() {
Expand All @@ -1334,7 +1343,7 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi
defer cancel()

// if the height we're interested in has already been indexed -> there's nothing to do here
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil {
if b, err := ei.IsHeightPast(ctx, uint64(height)); err != nil {
return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b {
return nil
Expand All @@ -1345,7 +1354,7 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi
defer unSubscribeF()

// it could be that the event index was update while the subscription was being processed -> check if index has what we need now
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil {
if b, err := ei.IsHeightPast(ctx, uint64(height)); err != nil {
return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b {
return nil
Expand All @@ -1354,7 +1363,7 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi
for {
select {
case <-subCh:
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil {
if b, err := ei.IsHeightPast(ctx, uint64(height)); err != nil {
return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b {
return nil
Expand Down

0 comments on commit 9741571

Please sign in to comment.