Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use epochs to gc eth tx hashes from chain indexer #12516

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?",
&ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?",
&ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?",
&ps.removeEthHashesOlderThanStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?)",
&ps.removeEthHashesBeforeTimeStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < ?",
&ps.updateTipsetsToRevertedFromHeightStmt: "UPDATE tipset_message SET reverted = 1 WHERE height >= ?",
&ps.updateEventsToRevertedFromHeightStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE height >= ?)",
&ps.isIndexEmptyStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message LIMIT 1)",
Expand Down
44 changes: 0 additions & 44 deletions chain/index/ddls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,47 +662,3 @@ func verifyNonRevertedEventEntriesCount(t *testing.T, s *SqliteIndexer, tipsetKe
require.NoError(t, err)
require.Equal(t, expectedCount, count)
}

// insertTipsetMessage writes ts into the index using the indexer's prepared
// insert statement and returns the row ID reported by the database.
//
// The test is failed immediately if the statement errors, if anything other
// than exactly one row is affected, or if the reported row ID is zero. The
// stored row is then checked against ts via verifyTipsetMessage.
func insertTipsetMessage(t *testing.T, s *SqliteIndexer, ts tipsetMessage) int64 {
	result, execErr := s.stmts.insertTipsetMessageStmt.Exec(
		ts.tipsetKeyCid,
		ts.height,
		ts.reverted,
		ts.messageCid,
		ts.messageIndex,
	)
	require.NoError(t, execErr)

	// Exactly one row must have been written.
	affected, affectedErr := result.RowsAffected()
	require.NoError(t, affectedErr)
	require.Equal(t, int64(1), affected)

	// A zero row ID is treated as a failed insert.
	id, idErr := result.LastInsertId()
	require.NoError(t, idErr)
	require.NotEqual(t, int64(0), id)

	// Round-trip check: what was stored must match what was passed in.
	verifyTipsetMessage(t, s, id, ts)
	return id
}

// insertEvent writes e into the index using the indexer's prepared
// insert-event statement and returns the row ID reported by the database.
//
// The test is failed immediately if the statement errors, if anything other
// than exactly one row is affected, or if the reported row ID is zero. The
// stored row is then checked against e via verifyEvent.
func insertEvent(t *testing.T, s *SqliteIndexer, e event) int64 {
	result, execErr := s.stmts.insertEventStmt.Exec(
		e.messageID,
		e.eventIndex,
		e.emitterAddr,
		e.reverted,
	)
	require.NoError(t, execErr)

	// Exactly one row must have been written.
	affected, affectedErr := result.RowsAffected()
	require.NoError(t, affectedErr)
	require.Equal(t, int64(1), affected)

	// A zero row ID is treated as a failed insert.
	id, idErr := result.LastInsertId()
	require.NoError(t, idErr)
	require.NotEqual(t, int64(0), id)

	// Round-trip check: what was stored must match what was passed in.
	verifyEvent(t, s, id, e)
	return id
}

// insertEventEntry writes ee into the index using the indexer's prepared
// insert-event-entry statement.
//
// The test is failed immediately if the statement errors or if anything other
// than exactly one row is affected. Nothing is returned.
func insertEventEntry(t *testing.T, s *SqliteIndexer, ee eventEntry) {
	result, execErr := s.stmts.insertEventEntryStmt.Exec(
		ee.eventID,
		ee.indexed,
		ee.flags,
		ee.key,
		ee.codec,
		ee.value,
	)
	require.NoError(t, execErr)

	// Exactly one row must have been written.
	affected, affectedErr := result.RowsAffected()
	require.NoError(t, affectedErr)
	require.Equal(t, int64(1), affected)
}
32 changes: 20 additions & 12 deletions chain/index/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package index

import (
"context"
"strconv"
"time"

logging "github.com/ipfs/go-log/v2"
Expand All @@ -15,6 +14,8 @@ var (
cleanupInterval = time.Duration(4) * time.Hour
)

const graceEpochs = 10

func (si *SqliteIndexer) gcLoop() {
defer si.wg.Done()

Expand Down Expand Up @@ -50,7 +51,7 @@ func (si *SqliteIndexer) gc(ctx context.Context) {

head := si.cs.GetHeaviestTipSet()

removalEpoch := int64(head.Height()) - si.gcRetentionEpochs - 10 // 10 is for some grace period
removalEpoch := int64(head.Height()) - si.gcRetentionEpochs - graceEpochs
if removalEpoch <= 0 {
log.Info("no tipsets to gc")
return
Expand All @@ -75,17 +76,24 @@ func (si *SqliteIndexer) gc(ctx context.Context) {
// -------------------------------------------------------------------------------------------------
// Also GC eth hashes

// Convert gcRetentionEpochs to number of days
gcRetentionDays := si.gcRetentionEpochs / (builtin.EpochsInDay)
if gcRetentionDays < 1 {
log.Infof("skipping gc of eth hashes as retention days is less than 1")
return
}
// Calculate the retention duration based on the number of epochs to retain.
// retentionDuration represents the total duration (in nano seconds) for which data should be retained before considering it for garbage collection.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is it in nanoseconds ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be seconds 👍🏾 (I will update it)
So what is happening here is that we are converting the retention period in seconds (epochs × EpochDurationSeconds, which is 30) into a time.Duration by multiplying it by time.Second.

Copy link
Contributor Author

@akaladarshi akaladarshi Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aarshkshah1992 So time.Duration takes data in nanoseconds (that's why I added nano seconds)
So if I just write time.Duration(retentionEpochs * EpochDurationSeconds), it will give 600 (retentionEpochs = 20, EpochDurationSeconds = 30), but that value will be interpreted as nanoseconds; to get the proper duration in seconds we have to multiply by time.Second.

// graceDuration represents the additional duration (in nano seconds) to retain data after the retention duration.
// Since time.Duration expects a nanosecond value, we multiply the total seconds by time.Second to convert it to nanoseconds.
retentionDuration := time.Duration(si.gcRetentionEpochs*builtin.EpochDurationSeconds) * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment explaining this calculation here ?

Why do we need time.Duration(si.gcRetentionEpochs*builtin.EpochDurationSeconds) * time.Second ?

graceDuration := time.Duration(graceEpochs*builtin.EpochDurationSeconds) * time.Second

// Calculate the total duration to retain data.
totalRetentionDuration := retentionDuration + graceDuration
currHeadTime := time.Unix(int64(head.MinTimestamp()), 0)
// gcTime is the time that is (gcRetentionEpochs + graceEpochs) in nano seconds before currHeadTime
gcTime := currHeadTime.Add(-totalRetentionDuration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is less than or equal to 0, return without doing anything. Also, can we please add a test for the case where gcTime <= 0?


log.Infof("gc'ing eth hashes before time %s", gcTime.UTC().String())

log.Infof("gc'ing eth hashes older than %d days", gcRetentionDays)
res, err = si.stmts.removeEthHashesOlderThanStmt.ExecContext(ctx, "-"+strconv.Itoa(int(gcRetentionDays))+" day")
res, err = si.stmts.removeEthHashesBeforeTimeStmt.ExecContext(ctx, gcTime.Unix())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to do .Unix() here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to provide as accurate a time as possible, so I chose Unix time.

if err != nil {
log.Errorf("failed to gc eth hashes older than %d days: %w", gcRetentionDays, err)
log.Errorf("failed to gc eth hashes before time %s: %w", gcTime.String(), err)
return
}

Expand All @@ -95,5 +103,5 @@ func (si *SqliteIndexer) gc(ctx context.Context) {
return
}

log.Infof("gc'd %d eth hashes older than %d days", rows, gcRetentionDays)
log.Infof("gc'd %d eth hashes before time %s", rows, gcTime.String())
}
195 changes: 141 additions & 54 deletions chain/index/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,62 +6,149 @@ import (
"testing"
"time"

"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-state-types/abi"
)

// Fixture values shared by the TestGC table cases.
const (
// Heights at which test tipsets are inserted.
epochOne = 1
epochTen = 10
epochFifty = 50
// Height of the chain head the indexer is set up with.
headEpoch = 60

// Values assigned to the indexer's gcRetentionEpochs in the individual cases.
validRetentionEpochs = 20
highRetentionEpochs = 100
lowRetentionEpochs = 1
)

// TestGC exercises SqliteIndexer.gc: it indexes tipsets at chosen heights,
// runs one gc pass and checks which rows are left in the index.
//
// NOTE(review): this text was taken from a rendered diff, and the function
// appears to hold two bodies back to back — a single fixed scenario followed
// by a table-driven version of the same checks. Confirm against the real file
// which of the two is meant to be kept.
func TestGC(t *testing.T) {
ctx := context.Background()
rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano()))

// head at height 60
// insert tipsets at height 1,10,50.
// retention epochs is 20
si, _, _ := setupWithHeadIndexed(t, 60, rng)
si.gcRetentionEpochs = 20
defer func() { _ = si.Close() }()

tsCid1 := randomCid(t, rng)
tsCid10 := randomCid(t, rng)
tsCid50 := randomCid(t, rng)

insertTipsetMessage(t, si, tipsetMessage{
tipsetKeyCid: tsCid1.Bytes(),
height: 1,
reverted: false,
messageCid: randomCid(t, rng).Bytes(),
messageIndex: 0,
})

insertTipsetMessage(t, si, tipsetMessage{
tipsetKeyCid: tsCid10.Bytes(),
height: 10,
reverted: false,
messageCid: randomCid(t, rng).Bytes(),
messageIndex: 0,
})

insertTipsetMessage(t, si, tipsetMessage{
tipsetKeyCid: tsCid50.Bytes(),
height: 50,
reverted: false,
messageCid: randomCid(t, rng).Bytes(),
messageIndex: 0,
})

si.gc(ctx)

// tipset at height 1 and 10 should be removed
var count int
err := si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 1").Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)

err = si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 10").Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)

// tipset at height 50 should not be removed
err = si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 50").Scan(&count)
require.NoError(t, err)
require.Equal(t, 1, count)
// tipsetData describes one tipset to index before gc runs.
type tipsetData struct {
height abi.ChainEpoch
reverted bool
}

// Each case sets the head height and retention, lists the tipsets to insert,
// and states what must be left in the index after a single gc pass.
tests := []struct {
name string
headHeight abi.ChainEpoch
gcRetentionEpochs int64
tipsets []tipsetData
expectedEpochTipsetDataCounts map[abi.ChainEpoch]int // expected data count(tipsetMsg, event, eventEntry), for each epoch
expectedEthTxHashCount int // expected eth tx hash count after gc
}{
{
name: "Basic GC with some tipsets removed",
headHeight: headEpoch,
gcRetentionEpochs: validRetentionEpochs,
tipsets: []tipsetData{
{height: epochOne, reverted: false},
{height: epochTen, reverted: false},
{height: epochFifty, reverted: false},
},
expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{
epochOne: 0, // Should be removed
epochTen: 0, // Should be removed
epochFifty: 1, // Should remain
},
expectedEthTxHashCount: 1, // Only the entry for height 50 should remain
},
{
name: "No GC when retention epochs is high",
headHeight: headEpoch,
gcRetentionEpochs: highRetentionEpochs,
tipsets: []tipsetData{
{height: epochOne, reverted: false},
{height: epochTen, reverted: false},
{height: epochFifty, reverted: false},
},
expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{
epochOne: 1, // Should remain
epochTen: 1, // Should remain
epochFifty: 1, // Should remain
},
expectedEthTxHashCount: 3, // All entries should remain
},
{
name: "No GC when gcRetentionEpochs is zero",
headHeight: headEpoch,
gcRetentionEpochs: 0,
tipsets: []tipsetData{
{height: epochOne, reverted: false},
{height: epochTen, reverted: false},
{height: epochFifty, reverted: false},
},
expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{
epochOne: 1, // Should remain
epochTen: 1, // Should remain
epochFifty: 1, // Should remain
},
expectedEthTxHashCount: 3, // All entries should remain
},
{
name: "GC should remove tipsets that are older than gcRetentionEpochs + gracEpochs",
headHeight: headEpoch,
gcRetentionEpochs: lowRetentionEpochs, // headHeight - gcRetentionEpochs - graceEpochs = 60 - 1 - 10 = 49 (removalEpoch)
tipsets: []tipsetData{
{height: 1, reverted: false},
{height: 10, reverted: false},
{height: 50, reverted: false},
},
expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{
epochOne: 0, // Should be removed
epochTen: 0, // Should be removed
epochFifty: 1, // Should remain
},
expectedEthTxHashCount: 1, // Only the entry for height 50 should remain
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
// genesisTime seeds the RNG and is also handed to insertRandomTipsetAtHeight below.
genesisTime := time.Now()
rng := pseudo.New(pseudo.NewSource(genesisTime.UnixNano()))

// Every sub-test builds its own indexer.
si, _, _ := setupWithHeadIndexed(t, tt.headHeight, rng)
si.gcRetentionEpochs = tt.gcRetentionEpochs
defer func() { _ = si.Close() }()

// Remember each inserted tipset's key CID so the event queries below can look it up by height.
tipsetKeyCids := make(map[abi.ChainEpoch]cid.Cid)

for _, tsData := range tt.tipsets {
t.Logf("inserting tipset at height %d", tsData.height)

tsKeyCid, _, _ := insertRandomTipsetAtHeight(t, si, uint64(tsData.height), tsData.reverted, genesisTime)
tipsetKeyCids[tsData.height] = tsKeyCid
}

si.gc(ctx)

// The same expected count is asserted for tipset_message rows, events and event entries at each height.
for height, expectedCount := range tt.expectedEpochTipsetDataCounts {
var count int

err := si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = ?", height).Scan(&count)
require.NoError(t, err)
require.Equal(t, expectedCount, count, "Unexpected tipset_message count for height %d", height)

tsKeyCid := tipsetKeyCids[height]
err = si.stmts.getNonRevertedTipsetEventCountStmt.QueryRow(tsKeyCid.Bytes()).Scan(&count)
require.NoError(t, err)
require.Equal(t, expectedCount, count, "Unexpected events count for height %d", height)

err = si.stmts.getNonRevertedTipsetEventEntriesCountStmt.QueryRow(tsKeyCid.Bytes()).Scan(&count)
require.NoError(t, err)
require.Equal(t, expectedCount, count, "Unexpected event_entries count for height %d", height)
}

// eth_tx_hash is counted across the whole table rather than per height.
var ethTxHashCount int
err := si.db.QueryRow("SELECT COUNT(*) FROM eth_tx_hash").Scan(&ethTxHashCount)
require.NoError(t, err)
require.Equal(t, tt.expectedEthTxHashCount, ethTxHashCount, "Unexpected eth_tx_hash count")

t.Cleanup(func() {
cleanup(t, si)
})
})
}
}
Loading
Loading