Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ChainIndexer: improve sqllite query handling #12485

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ var ddls = []string{
// the preparedStatements struct.
func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
return map[**sql.Stmt]string{
&ps.getNonRevertedMsgInfoStmt: "SELECT tipset_key_cid, height FROM tipset_message WHERE message_cid = ? AND reverted = 0",
&ps.getMsgCidFromEthHashStmt: "SELECT message_cid FROM eth_tx_hash WHERE tx_hash = ?",
&ps.getNonRevertedMsgInfoStmt: "SELECT tipset_key_cid, height FROM tipset_message WHERE message_cid = ? AND reverted = 0 LIMIT 1",
&ps.getMsgCidFromEthHashStmt: "SELECT message_cid FROM eth_tx_hash WHERE tx_hash = ? LIMIT 1",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tx_hash is the primary key LIMIT 1 is not required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dosent hurt to have it there.

&ps.insertEthTxHashStmt: "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP",
&ps.insertTipsetMessageStmt: "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0",
&ps.hasTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)",
Expand All @@ -75,7 +75,7 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.hasNonRevertedTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.updateEventsToRevertedStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.updateEventsToNonRevertedStmt: "UPDATE event SET reverted = 0 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.getMsgIdForMsgCidAndTipsetStmt: "SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0",
&ps.getMsgIdForMsgCidAndTipsetStmt: "SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0 LIMIT 1",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think tipset_message table has unique constraint on (tipset_key_cid, message_cid), it will only return one row, so we can remove LIMIT 1 here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just there for redundancy.

&ps.insertEventStmt: "INSERT INTO event (message_id, event_index, emitter_addr, reverted) VALUES (?, ?, ?, ?)",
Copy link
Contributor

@akaladarshi akaladarshi Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add On Conflict in InsertEventStmt?.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done in #12450.

&ps.insertEventEntryStmt: "INSERT INTO event_entry (event_id, indexed, flags, key, codec, value) VALUES (?, ?, ?, ?, ?, ?)",
}
Expand Down
6 changes: 3 additions & 3 deletions chain/index/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (si *SqliteIndexer) indexEvents(ctx context.Context, tx *sql.Tx, msgTs *typ

// read message id for this message cid and tipset key cid
var messageID int64
if err := tx.Stmt(si.stmts.getMsgIdForMsgCidAndTipsetStmt).QueryRow(msgTsKeyCidBytes, msgCidBytes).Scan(&messageID); err != nil {
if err := tx.Stmt(si.stmts.getMsgIdForMsgCidAndTipsetStmt).QueryRowContext(ctx, msgTsKeyCidBytes, msgCidBytes).Scan(&messageID); err != nil {
return xerrors.Errorf("failed to get message id for message cid and tipset key cid: %w", err)
}
if messageID == 0 {
Expand All @@ -92,7 +92,7 @@ func (si *SqliteIndexer) indexEvents(ctx context.Context, tx *sql.Tx, msgTs *typ
}

// Insert event into events table
eventResult, err := tx.Stmt(si.stmts.insertEventStmt).Exec(messageID, eventCount, addr.Bytes(), 0)
eventResult, err := tx.Stmt(si.stmts.insertEventStmt).ExecContext(ctx, messageID, eventCount, addr.Bytes(), 0)
if err != nil {
return xerrors.Errorf("failed to insert event: %w", err)
}
Expand All @@ -105,7 +105,7 @@ func (si *SqliteIndexer) indexEvents(ctx context.Context, tx *sql.Tx, msgTs *typ

// Insert event entries
for _, entry := range event.Entries {
_, err := tx.Stmt(si.stmts.insertEventEntryStmt).Exec(
_, err := tx.Stmt(si.stmts.insertEventEntryStmt).ExecContext(ctx,
eventID,
isIndexedValue(entry.Flags),
[]byte{entry.Flags},
Expand Down
2 changes: 1 addition & 1 deletion chain/index/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (si *SqliteIndexer) gc(ctx context.Context) {
}

log.Infof("gc'ing eth hashes older than %d days", gcRetentionDays)
res, err = si.stmts.removeEthHashesOlderThanStmt.Exec("-" + strconv.Itoa(int(gcRetentionDays)) + " day")
res, err = si.stmts.removeEthHashesOlderThanStmt.ExecContext(ctx, "-"+strconv.Itoa(int(gcRetentionDays))+" day")
if err != nil {
log.Errorf("failed to gc eth hashes older than %d days: %w", gcRetentionDays, err)
return
Expand Down
42 changes: 26 additions & 16 deletions chain/index/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,9 @@ func (si *SqliteIndexer) ReconcileWithChain(ctx context.Context, head *types.Tip
return si.backfillIndex(ctx, tx, head, 0)
}

// Find the minimum applied tipset in the index; this will mark the absolute min height of the reconciliation walk
var reconciliationEpochInIndex sql.NullInt64
var reconciliationEpoch abi.ChainEpoch

row := tx.StmtContext(ctx, si.stmts.getMinNonRevertedHeightStmt).QueryRowContext(ctx)
if err := row.Scan(&reconciliationEpochInIndex); err != nil {
if err != sql.ErrNoRows {
return xerrors.Errorf("failed to scan minimum non-reverted height: %w", err)
}
log.Warn("index only contains reverted tipsets; setting reconciliation epoch to 0")
reconciliationEpoch = 0
} else if !reconciliationEpochInIndex.Valid {
log.Warn("index only contains reverted tipsets; setting reconciliation epoch to 0")
reconciliationEpoch = 0
} else {
reconciliationEpoch = abi.ChainEpoch(reconciliationEpochInIndex.Int64)
reconciliationEpoch, err := si.getReconciliationEpoch(ctx, tx)
if err != nil {
return xerrors.Errorf("failed to get reconciliation epoch: %w", err)
}

currTs := head
Expand Down Expand Up @@ -161,6 +148,29 @@ func (si *SqliteIndexer) ReconcileWithChain(ctx context.Context, head *types.Tip
})
}

func (si *SqliteIndexer) getReconciliationEpoch(ctx context.Context, tx *sql.Tx) (abi.ChainEpoch, error) {
var reconciliationEpochInIndex sql.NullInt64

err := tx.StmtContext(ctx, si.stmts.getMinNonRevertedHeightStmt).
QueryRowContext(ctx).
Scan(&reconciliationEpochInIndex)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove the extra space

if err != nil {
if err == sql.ErrNoRows {
log.Warn("index only contains reverted tipsets; setting reconciliation epoch to 0")
return 0, nil
}
return 0, xerrors.Errorf("failed to scan minimum non-reverted height: %w", err)
}

if !reconciliationEpochInIndex.Valid {
log.Warn("index only contains reverted tipsets; setting reconciliation epoch to 0")
return 0, nil
}

return abi.ChainEpoch(reconciliationEpochInIndex.Int64), nil
}

// backfillIndex backfills the chain index with missing tipsets starting from the given head tipset
// and stopping after the specified stopAfter epoch (inclusive).
//
Expand Down
Loading