feat: iavl v2 #872
Conversation
is this ready for review?
- commit ref: 34d92ce
name: "osmo like many", | ||
versions: []int64{1, 51, 101, 151, 201, 251, 301, 351, 401}, | ||
find: 38, | ||
next: 51, | ||
prev: 1, | ||
}, | ||
{ | ||
name: "osmo like many", | ||
versions: []int64{1, 51, 101, 151, 201, 251, 301, 351, 401}, | ||
find: 408, | ||
next: -1, | ||
prev: 401, | ||
}, |
Having two test cases with the same name "osmo like many" can lead to confusion or ambiguous results in test reporting tools. Consider renaming one of the test cases so each is uniquely identified in test output.
- name: "osmo like many",
+ name: "osmo like many high",
type pruneSignal struct {
	pruneVersion int64
	checkpoints  VersionRange
}

type saveSignal struct {
	batch          *sqliteBatch
	root           *Node
	version        int64
	wantCheckpoint bool
}

type saveResult struct {
	n   int64
	err error
}

type sqlWriter struct {
	sql    *SqliteDb
	logger zerolog.Logger

	treePruneCh chan *pruneSignal
	treeCh      chan *saveSignal
	treeResult  chan *saveResult

	leafPruneCh chan *pruneSignal
	leafCh      chan *saveSignal
	leafResult  chan *saveResult
}
The sqlWriter struct is well-structured, encapsulating channels for prune and save signals for both tree and leaf nodes, which facilitates a clear separation of concerns. However, consider documenting the purpose of each channel and how they interact within the sqlWriter's operations to improve code readability and maintainability.
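For example, a minimal sketch of what such documentation could look like (the comment wording below is a guess at intent, not a description of guaranteed behavior):

type sqlWriter struct {
	sql    *SqliteDb
	logger zerolog.Logger

	// treePruneCh receives requests to prune branch (tree) nodes up to a version.
	treePruneCh chan *pruneSignal
	// treeCh receives batches of tree nodes to persist; treeResult reports the outcome of each save.
	treeCh     chan *saveSignal
	treeResult chan *saveResult

	// leafPruneCh, leafCh and leafResult mirror the tree channels, but for leaf nodes.
	leafPruneCh chan *pruneSignal
	leafCh      chan *saveSignal
	leafResult  chan *saveResult
}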
func (w *sqlWriter) start(ctx context.Context) {
	go func() {
		err := w.treeLoop(ctx)
		if err != nil {
			w.logger.Fatal().Err(err).Msg("tree loop failed")
		}
	}()
	go func() {
		err := w.leafLoop(ctx)
		if err != nil {
			w.logger.Fatal().Err(err).Msg("leaf loop failed")
		}
	}()
}
The use of goroutines in the start method for treeLoop and leafLoop is a good approach for parallelizing tree and leaf operations. However, the use of logger.Fatal() within these goroutines on error conditions is concerning: it will terminate the entire application, which might not be desirable in all cases. Consider using a less severe logging level and returning the error to a higher level for more graceful error handling.
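A minimal sketch of one alternative, assuming it is acceptable to add golang.org/x/sync/errgroup and to let the caller decide how fatal the error is:

func (w *sqlWriter) start(ctx context.Context) error {
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error { return w.treeLoop(ctx) })
	g.Go(func() error { return w.leafLoop(ctx) })
	// Wait blocks, so the caller would run start in its own goroutine (or start
	// could expose the group) and decide itself whether the error is fatal.
	return g.Wait()
}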
v2/sqlite_writer.go (outdated)
			return err
		}

		return nil
	}
	stepPruning := func() error {
		hasRow, err := orphanQuery.Step()
		if err != nil {
			return fmt.Errorf("failed to step leaf orphan query; %w", err)
		}
		if hasRow {
			pruneCount++
			var (
				version  int64
				sequence int
				rowID    int64
			)
			err = orphanQuery.Scan(&version, &sequence, &rowID)
			if err != nil {
				return err
			}
			if err = deleteLeaf.Exec(version, sequence); err != nil {
				return err
			}
			if err = deleteOrphan.Exec(rowID); err != nil {
				return err
			}
			if pruneCount%pruneBatchSize == 0 {
				if err = commitOrphaned(); err != nil {
					return err
				}
				if err = resetOrphanQuery(pruneVersion); err != nil {
					return err
				}
			}
		} else {
			if err = commitOrphaned(); err != nil {
				return err
			}
			if nextPruneVersion != 0 {
				if err = startPrune(nextPruneVersion); err != nil {
					return err
				}
				nextPruneVersion = 0
			} else {
				pruneVersion = 0
			}
		}

		return nil
	}
	saveLeaves := func(sig *saveSignal) {
		res := &saveResult{}
		res.n, res.err = sig.batch.saveLeaves()
		if sig.batch.isCheckpoint() {
			if err = w.sql.leafWrite.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
				w.logger.Err(err).Msg("failed leaf wal_checkpoint")
			}
		}
		w.leafResult <- res
	}
	for {
		if pruneVersion != 0 {
			select {
			case sig := <-w.leafCh:
				if err = commitOrphaned(); err != nil {
					return fmt.Errorf("interrupt leaf prune failed in commit; %w", err)
				}
				saveLeaves(sig)
				if err = startPrune(pruneVersion); err != nil {
					return fmt.Errorf("restart leaf prune failed in startPrune; %w", err)
				}
			case sig := <-w.leafPruneCh:
				w.logger.Warn().Msgf("leaf prune signal received while pruning version=%d next=%d", pruneVersion, sig.pruneVersion)
				checkpoints = sig.checkpoints
				nextPruneVersion = sig.pruneVersion
			case <-ctx.Done():
				return nil
			default:
				err = stepPruning()
				if err != nil {
					return fmt.Errorf("failed to step pruning; %w", err)
				}
			}
		} else {
			select {
			case sig := <-w.leafCh:
				saveLeaves(sig)
			case sig := <-w.leafPruneCh:
				checkpoints = sig.checkpoints
				err = startPrune(sig.pruneVersion)
				if err != nil {
					return fmt.Errorf("failed to start leaf prune; %w", err)
				}
			case <-ctx.Done():
				return nil
			}
		}
	}
}
The leafLoop function is complex and handles multiple responsibilities, including pruning and saving leaves. While the logic appears sound, the complexity and length of this function could hinder maintainability. Consider refactoring to break this function down into smaller, more manageable pieces, such as separating the pruning logic and the saving logic into distinct methods. This would improve readability and make the code easier to maintain and test.
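As a rough sketch of the shape this could take (all names below are hypothetical), the select loop would only route signals while the actual work lives in named methods:

func (w *sqlWriter) leafLoop(ctx context.Context) error {
	state := &leafPruneState{} // hypothetical: holds pruneVersion, nextPruneVersion, checkpoints and prepared queries
	for {
		if state.active() {
			// one iteration of pruning, or handling of an interrupting save/prune signal
			if err := w.leafPruneTick(ctx, state); err != nil {
				return err
			}
			continue
		}
		select {
		case sig := <-w.leafCh:
			w.saveLeaves(sig)
		case sig := <-w.leafPruneCh:
			if err := w.startLeafPrune(state, sig); err != nil {
				return err
			}
		case <-ctx.Done():
			return nil
		}
	}
}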
	}
}

const pruneBatchSize = 500_000
The constant pruneBatchSize is defined outside of the sqlWriter struct. While this is not inherently problematic, consider whether this value should be configurable or might vary between different instances of sqlWriter. If so, moving it inside the struct or providing a way to set it during initialization could offer more flexibility.
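For example, a sketch of a per-writer override that falls back to the current package default (the option type and field names are hypothetical):

type sqlWriterOptions struct {
	PruneBatchSize int64
}

func (o sqlWriterOptions) pruneBatchSize() int64 {
	if o.PruneBatchSize > 0 {
		return o.PruneBatchSize
	}
	return pruneBatchSize // package-level default, currently 500_000
}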
func (mt *MultiTree) MountTree(storeKey string) error {
	opts := defaultSqliteDbOptions(SqliteDbOptions{
		Path: mt.rootPath + "/" + storeKey,
	})
	sql, err := NewSqliteDb(mt.pool, opts)
	if err != nil {
		return err
	}
	tree := NewTree(sql, mt.pool, mt.treeOpts)
	mt.Trees[storeKey] = tree
	return nil
}
The MountTree method is straightforward and correctly mounts a single tree. However, error handling could be improved by providing more context in the error messages, especially when database operations fail.
if err != nil {
- return err
+ return fmt.Errorf("failed to mount tree '%s': %w", storeKey, err)
}
func (mt *MultiTree) MountTrees() error {
	paths, err := FindDbsInPath(mt.rootPath)
	if err != nil {
		return err
	}
	for _, dbPath := range paths {
		prefix := filepath.Base(dbPath)
		sqlOpts := defaultSqliteDbOptions(SqliteDbOptions{})
		sqlOpts.Path = dbPath
		log.Info().Msgf("mounting %s; opts %v", prefix, sqlOpts)
		sql, err := NewSqliteDb(mt.pool, sqlOpts)
		if err != nil {
			return err
		}
		tree := NewTree(sql, mt.pool, mt.treeOpts)
		mt.Trees[prefix] = tree
	}
	return nil
The MountTrees method efficiently mounts all trees found in the root path. Similar to MountTree, consider enhancing the error messages for better debugging and maintenance.
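For instance, mirroring the suggestion above (sketch only):

sql, err := NewSqliteDb(mt.pool, sqlOpts)
if err != nil {
	return fmt.Errorf("failed to mount tree at %q: %w", dbPath, err)
}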
func (mt *MultiTree) SaveVersionConcurrently() ([]byte, int64, error) {
	treeCount := 0
	var workingSize atomic.Int64
	var workingBytes atomic.Uint64
	for _, tree := range mt.Trees {
		treeCount++
		go func(t *Tree) {
			t.shouldCheckpoint = mt.shouldCheckpoint
			h, v, err := t.SaveVersion()
			workingSize.Add(t.workingSize)
			workingBytes.Add(t.workingBytes)
			if err != nil {
				mt.errorCh <- err
			}
			mt.doneCh <- saveVersionResult{version: v, hash: h}
		}(tree)
	}

	var (
		errs    []error
		version = int64(-1)
	)
	for i := 0; i < treeCount; i++ {
		select {
		case err := <-mt.errorCh:
			log.Error().Err(err).Msg("failed to save version")
			errs = append(errs, err)
		case result := <-mt.doneCh:
			if version != -1 && version != result.version {
				errs = append(errs, fmt.Errorf("unexpected; trees are at different versions: %d != %d",
					version, result.version))
			}
			version = result.version
		}
	}
	mt.shouldCheckpoint = false

	if mt.treeOpts.MetricsProxy != nil {
		bz := workingBytes.Load()
		sz := workingSize.Load()
		fmt.Printf("version=%d work-bytes=%s work-size=%s mem-ceiling=%s\n",
			version, humanize.IBytes(bz), humanize.Comma(sz), humanize.IBytes(mt.treeOpts.CheckpointMemory))
		mt.treeOpts.MetricsProxy.SetGauge(float32(workingBytes.Load()), "iavl_v2", "working_bytes")
		mt.treeOpts.MetricsProxy.SetGauge(float32(workingSize.Load()), "iavl_v2", "working_size")
	}

	if mt.treeOpts.CheckpointMemory > 0 && workingBytes.Load() >= mt.treeOpts.CheckpointMemory {
		mt.shouldCheckpoint = true
	}

	return mt.Hash(), version, errors.Join(errs...)
The SaveVersionConcurrently method introduces concurrent saving of tree versions, which can improve performance. However, the handling of errors from concurrent operations could be improved. Currently, all errors are collected, but the method returns only after all operations are attempted; when an early operation fails, it might be more efficient to abort subsequent operations to save resources and time. Consider adding a mechanism to abort early in case of errors, possibly using context cancellation or a similar approach.
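A sketch of what this could look like with errgroup (an assumption, not part of this PR; the hash/version collection is omitted for brevity, and SaveVersion itself would also need to observe the context for in-flight saves to stop early):

g, gctx := errgroup.WithContext(context.Background())
for _, tree := range mt.Trees {
	t := tree
	g.Go(func() error {
		select {
		case <-gctx.Done():
			return gctx.Err() // another tree already failed; skip this save
		default:
		}
		t.shouldCheckpoint = mt.shouldCheckpoint
		_, _, err := t.SaveVersion()
		return err
	})
}
if err := g.Wait(); err != nil {
	return nil, 0, err
}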
func (mt *MultiTree) SnapshotConcurrently() error {
	treeCount := 0
	for _, tree := range mt.Trees {
		treeCount++
		go func(t *Tree) {
			if err := t.SaveSnapshot(); err != nil {
				mt.errorCh <- err
			} else {
				mt.doneCh <- saveVersionResult{}
			}
		}(tree)
	}

	var errs []error
	for i := 0; i < treeCount; i++ {
		select {
		case err := <-mt.errorCh:
			log.Error().Err(err).Msg("failed to snapshot")
			errs = append(errs, err)
		case <-mt.doneCh:
		}
	}
	return errors.Join(errs...)
The SnapshotConcurrently method for concurrent snapshotting is well-structured. However, similar to SaveVersionConcurrently, consider the potential for early termination in case of errors to optimize resource usage.
func (mt *MultiTree) WarmLeaves() error {
	var cnt int
	for _, tree := range mt.Trees {
		cnt++
		go func(t *Tree) {
			if err := t.sql.WarmLeaves(); err != nil {
				mt.errorCh <- err
			} else {
				mt.doneCh <- saveVersionResult{}
			}
		}(tree)
	}
	for i := 0; i < cnt; i++ {
		select {
		case err := <-mt.errorCh:
			log.Error().Err(err).Msg("failed to warm leaves")
			return err
		case <-mt.doneCh:
		}
	}
	return nil
}
The WarmLeaves method introduces concurrency for warming up the leaves of all trees. As with other concurrent methods, consider the impact of uncontrolled concurrency on system resources and the potential for early termination in case of errors.
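A sketch of bounding the fan-out with a semaphore channel (the limit here is arbitrary, not a tuned value):

sem := make(chan struct{}, runtime.NumCPU()) // cap the number of concurrent warms
for _, tree := range mt.Trees {
	cnt++
	go func(t *Tree) {
		sem <- struct{}{}
		defer func() { <-sem }()
		if err := t.sql.WarmLeaves(); err != nil {
			mt.errorCh <- err
		} else {
			mt.doneCh <- saveVersionResult{}
		}
	}(tree)
}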
	errCh chan error
}

func (tree *Tree) Export(order TraverseOrderType) *Exporter {
Please add go doc comments here.
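For example (the wording is only a suggestion):

// Export returns an Exporter that traverses the tree in the given TraverseOrderType
// and streams its nodes to the caller.
func (tree *Tree) Export(order TraverseOrderType) *Exporter {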
	},
}

// decodeBytes decodes a varint length-prefixed byte slice, returning it along with the number
- // decodeBytes decodes a varint length-prefixed byte slice, returning it along with the number
+ // DecodeBytes decodes a varint length-prefixed byte slice, returning it along with the number
The rollback feature is missing (DeleteVersionsFrom in v1).
how do we test v2?
Thank you @kocubinski, I just saw this change and here is a first pass review.
itr, err := getChangesetIterator(typ)
if err != nil {
	return err
}
Missing defer itr.Close()
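i.e. (assuming the iterator exposes a Close method):

itr, err := getChangesetIterator(typ)
if err != nil {
	return err
}
defer itr.Close()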
}

var wg sync.WaitGroup
wg.Add(1)
Please add a defer to close the channel as well as wait on the wait group, e.g.:
defer func() {
close(ch)
wg.Wait()
}()
go func() {
	stats, err := stream.Compact()
	if err != nil {
		log.Fatal().Err(err).Msg("failed to compact")
It's not recommended to invoke log.Fatal in a goroutine; it doesn't give other goroutines time to clean up.
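One way to avoid it, as a sketch, is to hand the error back to the main goroutine and let it decide how to shut down (the channel name is hypothetical, and what is done with stats is a placeholder):

compactErrCh := make(chan error, 1)
go func() {
	stats, err := stream.Compact()
	if err != nil {
		compactErrCh <- fmt.Errorf("failed to compact: %w", err)
		return
	}
	log.Info().Msgf("compaction stats: %+v", stats) // placeholder for whatever the original does with stats
	compactErrCh <- nil
}()
// ...later, on the main goroutine:
if err := <-compactErrCh; err != nil {
	return err
}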
var lastVersion int64
start := time.Now()
for ; itr.Valid(); err = itr.Next() {
	if err != nil {
We need to catch the end of the iteration, but this code just returns on any generic error
}

changeset := itr.Nodes()
for ; changeset.Valid(); err = changeset.Next() {
How does this code differentiate between the end of the iterator and another error? I don't see any special casing to break out of the loop
sql, err := iavl.NewSqliteDb(pool, sqlOpts)
if err != nil {
	return err
}
Missing a sql.Close() before the function returns.
f, err := os.CreateTemp("", "iavl-v2-probe.sqlite")
if err != nil {
	return err
}
Missing defer f.Close()
if err = conn.Close(); err != nil {
	return err
}
if err = os.Remove(f.Name()); err != nil {
Without f.Close(), this code is non-deterministic: the data might not have been flushed, and on some systems deleting an open file can be refused by the security policy.
Also, you can simply return the result of the final call instead of special-casing err != nil versus a nil error.
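Putting both points together, a sketch:

if err = f.Close(); err != nil {
	return err
}
if err = conn.Close(); err != nil {
	return err
}
// return the result of the final call directly rather than branching on err != nil
return os.Remove(f.Name())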
for _, path := range paths {
	cnt++
	sqlOpts := iavl.SqliteDbOptions{Path: path}
	sql, err := iavl.NewSqliteDb(pool, sqlOpts)
This handle is never closed on unrelated errors. Please close it after use, perhaps with a defer, or in each error path.
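Since this happens inside a loop, a small closure keeps the defer scoped per iteration; a sketch (probeDb is a hypothetical stand-in for the rest of the loop body, and the Close error is ignored here for brevity):

for _, path := range paths {
	cnt++
	if err := func() error {
		sql, err := iavl.NewSqliteDb(pool, iavl.SqliteDbOptions{Path: path})
		if err != nil {
			return err
		}
		defer sql.Close() // released at the end of each iteration, on every path
		return probeDb(sql)
	}(); err != nil {
		return err
	}
}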
func (tree *Tree) Export(order TraverseOrderType) *Exporter {
	exporter := &Exporter{
		tree: tree,
		out:  make(chan *Node),
To speed this code up, please make these buffered channels with a capacity chosen by some heuristic, so that traversal isn't constrained and blocked by the speed at which the nodes are read out.
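For example (the capacity below is an arbitrary starting point, not a tuned value):

exporter := &Exporter{
	tree:  tree,
	out:   make(chan *Node, 64),
	errCh: make(chan error, 1),
}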
gonna merge this as john is working on iavl v2 cleanup
See also: https://hackmd.io/rXq7coSyTFmhWIa4mifBYQ
For a complete end-to-end test, this version of the code is running on an osmosis node syncing on mainnet; some metrics are viewable at this Grafana. Sync started from block 13946655 after an export of application.db to an iavl v2 SQLite database with the code in ./migrate in this PR.
Sync rates as fast as ~10 blocks per second are observed. Most of the spikiness in commit rate is time spent waiting on blocks from consensus, but I don't have a way to show this in telemetry. AFAIK this is about 2x iavl v1 with osmosis, and I don't see evidence that the osmosis build is I/O bound on disk either, as benchmarks put max throughput at 10-20x iavl v1.
This configuration is using 45G of disk storage, down from 177G.
Checkpoints are taken every 5k blocks, with a WAL maintained on each commit.
Pruning occurs every 900 blocks with the last 5k blocks kept, therefore the last checkpoint always contains a fully materialized tree.