Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use epochs to gc eth tx hashes from chain indexer #12516

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?",
&ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?",
&ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?",
&ps.removeEthHashesOlderThanStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?)",
&ps.removeEthHashesBeforeTimeStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < ?",
&ps.updateTipsetsToRevertedFromHeightStmt: "UPDATE tipset_message SET reverted = 1 WHERE height >= ?",
&ps.updateEventsToRevertedFromHeightStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE height >= ?)",
&ps.isIndexEmptyStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message LIMIT 1)",
Expand Down
44 changes: 0 additions & 44 deletions chain/index/ddls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,47 +662,3 @@ func verifyNonRevertedEventEntriesCount(t *testing.T, s *SqliteIndexer, tipsetKe
require.NoError(t, err)
require.Equal(t, expectedCount, count)
}

// insertTipsetMessage executes the indexer's prepared insertTipsetMessageStmt
// with the fields of ts and returns the row ID reported for the insert.
//
// The calling test fails immediately if the statement errors, if it does not
// affect exactly one row, or if the reported ID is zero. The row is then read
// back through verifyTipsetMessage and compared against ts.
func insertTipsetMessage(t *testing.T, s *SqliteIndexer, ts tipsetMessage) int64 {
	// Mark as a helper so assertion failures are attributed to the calling test.
	t.Helper()

	res, err := s.stmts.insertTipsetMessageStmt.Exec(ts.tipsetKeyCid, ts.height, ts.reverted, ts.messageCid, ts.messageIndex)
	require.NoError(t, err)

	rowsAffected, err := res.RowsAffected()
	require.NoError(t, err)
	require.Equal(t, int64(1), rowsAffected)

	messageID, err := res.LastInsertId()
	require.NoError(t, err)
	require.NotEqual(t, int64(0), messageID)

	// read back the message to verify it was inserted correctly
	verifyTipsetMessage(t, s, messageID, ts)

	return messageID
}

// insertEvent executes the indexer's prepared insertEventStmt with the fields
// of e and returns the row ID reported for the insert.
//
// The calling test fails immediately if the statement errors, if it does not
// affect exactly one row, or if the reported ID is zero. The row is then read
// back through verifyEvent and compared against e.
func insertEvent(t *testing.T, s *SqliteIndexer, e event) int64 {
	// Mark as a helper so assertion failures are attributed to the calling test.
	t.Helper()

	res, err := s.stmts.insertEventStmt.Exec(e.messageID, e.eventIndex, e.emitterAddr, e.reverted)
	require.NoError(t, err)

	rowsAffected, err := res.RowsAffected()
	require.NoError(t, err)
	require.Equal(t, int64(1), rowsAffected)

	eventID, err := res.LastInsertId()
	require.NoError(t, err)
	require.NotEqual(t, int64(0), eventID)

	// read back the event to verify it was inserted correctly
	verifyEvent(t, s, eventID, e)

	return eventID
}

// insertEventEntry executes the indexer's prepared insertEventEntryStmt with
// the fields of ee.
//
// The calling test fails immediately if the statement errors or if it does not
// affect exactly one row. Unlike the other insert helpers, nothing is returned
// and the row is not read back.
func insertEventEntry(t *testing.T, s *SqliteIndexer, ee eventEntry) {
	// Mark as a helper so assertion failures are attributed to the calling test.
	t.Helper()

	res, err := s.stmts.insertEventEntryStmt.Exec(ee.eventID, ee.indexed, ee.flags, ee.key, ee.codec, ee.value)
	require.NoError(t, err)

	rowsAffected, err := res.RowsAffected()
	require.NoError(t, err)
	require.Equal(t, int64(1), rowsAffected)
}
21 changes: 10 additions & 11 deletions chain/index/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package index

import (
"context"
"strconv"
"time"

logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -75,17 +74,17 @@ func (si *SqliteIndexer) gc(ctx context.Context) {
// -------------------------------------------------------------------------------------------------
// Also GC eth hashes

// Convert gcRetentionEpochs to number of days
gcRetentionDays := si.gcRetentionEpochs / (builtin.EpochsInDay)
if gcRetentionDays < 1 {
log.Infof("skipping gc of eth hashes as retention days is less than 1")
return
}
currHeadTime := time.Unix(int64(head.MinTimestamp()), 0)
retentionDuration := time.Duration(si.gcRetentionEpochs*builtin.EpochDurationSeconds) * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment explaining this calculation here?

Why do we need time.Duration(si.gcRetentionEpochs*builtin.EpochDurationSeconds) * time.Second?


// gcTime is the time that is gcRetentionEpochs before currHeadTime
gcTime := currHeadTime.Add(-retentionDuration)

log.Infof("gc'ing eth hashes before time %s", gcTime.UTC().String())

log.Infof("gc'ing eth hashes older than %d days", gcRetentionDays)
res, err = si.stmts.removeEthHashesOlderThanStmt.ExecContext(ctx, "-"+strconv.Itoa(int(gcRetentionDays))+" day")
res, err = si.stmts.removeEthHashesBeforeTimeStmt.ExecContext(ctx, gcTime.Unix())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to call .Unix() here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to provide as accurate a time as possible, so I chose a Unix timestamp.

if err != nil {
log.Errorf("failed to gc eth hashes older than %d days: %w", gcRetentionDays, err)
log.Errorf("failed to gc eth hashes before time %s: %w", gcTime.String(), err)
return
}

Expand All @@ -95,5 +94,5 @@ func (si *SqliteIndexer) gc(ctx context.Context) {
return
}

log.Infof("gc'd %d eth hashes older than %d days", rows, gcRetentionDays)
log.Infof("gc'd %d eth hashes before time %s", rows, gcTime.String())
}
67 changes: 38 additions & 29 deletions chain/index/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,57 +11,66 @@ import (

func TestGC(t *testing.T) {
ctx := context.Background()
rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano()))
genesisTime := time.Now()
rng := pseudo.New(pseudo.NewSource(genesisTime.UnixNano()))

// head at height 60
// insert tipsets at height 1, 10, 50.
// retention epochs is 20
// tipset at height 60 will be in the future
si, _, _ := setupWithHeadIndexed(t, 60, rng)
si.gcRetentionEpochs = 20
defer func() { _ = si.Close() }()

tsCid1 := randomCid(t, rng)
tsCid10 := randomCid(t, rng)
tsCid50 := randomCid(t, rng)

insertTipsetMessage(t, si, tipsetMessage{
tipsetKeyCid: tsCid1.Bytes(),
height: 1,
reverted: false,
messageCid: randomCid(t, rng).Bytes(),
messageIndex: 0,
})

insertTipsetMessage(t, si, tipsetMessage{
tipsetKeyCid: tsCid10.Bytes(),
height: 10,
reverted: false,
messageCid: randomCid(t, rng).Bytes(),
messageIndex: 0,
})

insertTipsetMessage(t, si, tipsetMessage{
tipsetKeyCid: tsCid50.Bytes(),
height: 50,
reverted: false,
messageCid: randomCid(t, rng).Bytes(),
messageIndex: 0,
})
// all tipsets will be in future
tsKeyCid1, _, _ := insertRandomTipsetAtHeight(t, si, 1, false, genesisTime)
tsKeyCid2, _, _ := insertRandomTipsetAtHeight(t, si, 10, false, genesisTime)
tsKeyCid3, _, _ := insertRandomTipsetAtHeight(t, si, 50, false, genesisTime)

si.gc(ctx)

// tipset at height 1 and 10 should be removed
// tipset at height 1 data should be removed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this comment. What is "height 1 data"?

var count int
err := si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 1").Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)

err = si.stmts.getNonRevertedTipsetEventCountStmt.QueryRow(tsKeyCid1.Bytes()).Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)

err = si.stmts.getNonRevertedTipsetEventEntriesCountStmt.QueryRow(tsKeyCid1.Bytes()).Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)

// tipset at height 10 data should be removed
err = si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 10").Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)

err = si.stmts.getNonRevertedTipsetEventCountStmt.QueryRow(tsKeyCid2.Bytes()).Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)

err = si.stmts.getNonRevertedTipsetEventEntriesCountStmt.QueryRow(tsKeyCid2.Bytes()).Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)

// tipset at height 50 should not be removed
err = si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 50").Scan(&count)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot of duplication here. Can we wrap this and the two SQL statements below into a function and reuse it everywhere?

require.NoError(t, err)
require.Equal(t, 1, count)

err = si.stmts.getNonRevertedTipsetEventCountStmt.QueryRow(tsKeyCid3.Bytes()).Scan(&count)
require.NoError(t, err)
require.Equal(t, 1, count)

err = si.stmts.getNonRevertedTipsetEventEntriesCountStmt.QueryRow(tsKeyCid3.Bytes()).Scan(&count)
require.NoError(t, err)
require.Equal(t, 1, count)

err = si.db.QueryRow("SELECT COUNT(*) FROM eth_tx_hash").Scan(&count)
require.NoError(t, err)
// eth_tx_hash for tipset at height 50 timestamp should not be removed
require.Equal(t, 1, count)
}
Loading
Loading