Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ETH getLogs: fix slowness at head and ignore null blocks #12207

Merged
merged 2 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,11 +698,12 @@ func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) {
return maxHeight, err
}

func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (bool, error) {
row := ei.stmtIsHeightProcessed.QueryRowContext(ctx, height)
var exists bool
err := row.Scan(&exists)
return exists, err
func (ei *EventIndex) IsHeightPast(ctx context.Context, height uint64) (bool, error) {
rvagg marked this conversation as resolved.
Show resolved Hide resolved
maxHeight, err := ei.GetMaxHeightInIndex(ctx)
if err != nil {
return false, err
}
return height <= maxHeight, nil
}

func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) {
Expand Down
8 changes: 4 additions & 4 deletions chain/events/filter/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,17 @@ func TestEventIndexPrefillFilter(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(14000), mh)

b, err := ei.IsHeightProcessed(context.Background(), 14000)
b, err := ei.IsHeightPast(context.Background(), 14000)
require.NoError(t, err)
require.True(t, b)

b, err = ei.IsHeightProcessed(context.Background(), 14001)
b, err = ei.IsHeightPast(context.Background(), 14001)
require.NoError(t, err)
require.False(t, b)

b, err = ei.IsHeightProcessed(context.Background(), 13000)
b, err = ei.IsHeightPast(context.Background(), 13000)
require.NoError(t, err)
require.False(t, b)
require.True(t, b)

tsKey := events14000.msgTs.Key()
tsKeyCid, err := tsKey.Cid()
Expand Down
35 changes: 22 additions & 13 deletions node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ func (a *EthModule) EthGetBlockByNumber(ctx context.Context, blkParam string, fu

func (a *EthModule) EthGetTransactionByHash(ctx context.Context, txHash *ethtypes.EthHash) (*ethtypes.EthTx, error) {
return a.EthGetTransactionByHashLimited(ctx, txHash, api.LookbackNoLimit)

}

func (a *EthModule) EthGetTransactionByHashLimited(ctx context.Context, txHash *ethtypes.EthHash, limit abi.ChainEpoch) (*ethtypes.EthTx, error) {
Expand Down Expand Up @@ -1276,23 +1275,31 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E
if pf.tipsetCid == cid.Undef {
maxHeight := pf.maxHeight
if maxHeight == -1 {
maxHeight = e.Chain.GetHeaviestTipSet().Height()
// heaviest tipset doesn't have events because its messages haven't been executed yet
maxHeight = e.Chain.GetHeaviestTipSet().Height() - 1
}

if maxHeight < 0 {
return nil, xerrors.Errorf("maxHeight requested is less than 0")
}
if maxHeight > e.Chain.GetHeaviestTipSet().Height() {

// we can't return events for the heaviest tipset as the transactions in that tipset will be executed
// in the next non null tipset (because of Filecoin's "deferred execution" model)
if maxHeight > e.Chain.GetHeaviestTipSet().Height()-1 {
return nil, xerrors.Errorf("maxHeight requested is greater than the heaviest tipset")
}

err := e.waitForHeightProcessed(ctx, maxHeight)
if err != nil {
return nil, err
}

// should also have the minHeight in the filter indexed
if b, err := e.EventFilterManager.EventIndex.IsHeightProcessed(ctx, uint64(pf.minHeight)); err != nil {
return nil, xerrors.Errorf("failed to check if event index has events for the minHeight: %w", err)
} else if !b {
return nil, xerrors.Errorf("event index does not have event for epoch %d", pf.minHeight)
rvagg marked this conversation as resolved.
Show resolved Hide resolved
}
// TODO: Ideally we should also check that events for the epoch at `pf.minheight` have been indexed
// However, it is currently tricky to check/guarantee this for two reasons:
// a) Event Index is not aware of null-blocks. This means that the Event Index wont be able to say whether the block at
// `pf.minheight` is a null block or whether it has no events
// b) There can be holes in the index where events at certain epoch simply haven't been indexed because of edge cases around
// node restarts while indexing. This needs a long term "auto-repair"/"automated-backfilling" implementation in the index
// So, for now, the best we can do is ensure that the event index has evenets for events at height >= `pf.maxHeight`
} else {
ts, err := e.Chain.GetTipSetByCid(ctx, pf.tipsetCid)
if err != nil {
Expand Down Expand Up @@ -1324,6 +1331,8 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E
return ethFilterResultFromEvents(ctx, ces, e.SubManager.StateAPI)
}

// note that we can have null blocks at the given height and the event Index is not null block aware
// so, what we do here is wait till we see the event index contain a block at a height greater than the given height
func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi.ChainEpoch) error {
ei := e.EventFilterManager.EventIndex
if height > e.Chain.GetHeaviestTipSet().Height() {
Expand All @@ -1334,7 +1343,7 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi
defer cancel()

// if the height we're interested in has already been indexed -> there's nothing to do here
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil {
if b, err := ei.IsHeightPast(ctx, uint64(height)); err != nil {
return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b {
return nil
Expand All @@ -1345,7 +1354,7 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi
defer unSubscribeF()

// it could be that the event index was update while the subscription was being processed -> check if index has what we need now
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil {
if b, err := ei.IsHeightPast(ctx, uint64(height)); err != nil {
rvagg marked this conversation as resolved.
Show resolved Hide resolved
return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b {
return nil
Expand All @@ -1354,7 +1363,7 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi
for {
select {
case <-subCh:
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil {
if b, err := ei.IsHeightPast(ctx, uint64(height)); err != nil {
return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b {
return nil
Expand Down
Loading