Skip to content

Commit

Permalink
add post order sync
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope committed Nov 16, 2023
1 parent f1364be commit 7b953bc
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 93 deletions.
47 changes: 43 additions & 4 deletions export.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,70 @@ package iavl

import "fmt"

// TraverseOrderType is the type of the order in which the tree is traversed.
type TraverseOrderType uint8

const (
PreOrder TraverseOrderType = iota
PostOrder
)

type Exporter struct {
tree *Tree
out chan *Node
errCh chan error
}

func (tree *Tree) ExportPreOrder() *Exporter {
func (tree *Tree) Export(order TraverseOrderType) *Exporter {
exporter := &Exporter{
tree: tree,
out: make(chan *Node),
errCh: make(chan error),
}
go func() {

go func(traverseOrder TraverseOrderType) {
defer close(exporter.out)
defer close(exporter.errCh)
exporter.preOrderNext(tree.root)
}()

if traverseOrder == PostOrder {
exporter.postOrderNext(tree.root)
} else if traverseOrder == PreOrder {
exporter.preOrderNext(tree.root)
}
}(order)

return exporter
}

func (e *Exporter) postOrderNext(node *Node) {
if node.isLeaf() {
e.out <- node
return
}

left, err := node.getLeftNode(e.tree)
if err != nil {
e.errCh <- err
return
}
e.postOrderNext(left)

right, err := node.getRightNode(e.tree)
if err != nil {
e.errCh <- err
return
}
e.postOrderNext(right)

e.out <- node
}

func (e *Exporter) preOrderNext(node *Node) {
e.out <- node
if node.isLeaf() {
return
}

left, err := node.getLeftNode(e.tree)
if err != nil {
e.errCh <- err
Expand Down
251 changes: 166 additions & 85 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (sql *SqliteDb) Snapshot(ctx context.Context, tree *Tree) error {
type SnapshotOptions struct {
StoreLeafValues bool
SaveTree bool
TraverseOrder TraverseOrderType
}

func NewIngestSnapshotConnection(snapshotDbPath string) (*sqlite3.Conn, error) {
Expand Down Expand Up @@ -252,93 +253,14 @@ func (sql *SqliteDb) WriteSnapshot(
}

var (
step func() (*Node, error)
maybeFlush func() error
count int
uniqueVersions = make(map[int64]struct{})
root *Node
uniqueVersions map[int64]struct{}
)
maybeFlush = func() error {
count++
if count%snap.batchSize == 0 {
if err = snap.flush(); err != nil {
return err
}
if err = snap.prepareWrite(); err != nil {
return err
}
}
return nil
}

step = func() (*Node, error) {
snapshotNode, err := nextFn()
if err != nil {
return nil, err
}
ordinal := snap.ordinal
snap.ordinal++

node := &Node{
key: snapshotNode.Key,
subtreeHeight: snapshotNode.Height,
nodeKey: NewNodeKey(snapshotNode.Version, uint32(ordinal)),
}
if node.subtreeHeight == 0 {
node.value = snapshotNode.Value
node.size = 1
node._hash(snapshotNode.Version)
if !opts.StoreLeafValues {
node.value = nil
}
nodeBz, err := node.Bytes()
if err != nil {
return nil, err
}
if err = snap.snapshotInsert.Exec(ordinal, snapshotNode.Version, ordinal, nodeBz); err != nil {
return nil, err
}
if err = snap.leafInsert.Exec(snapshotNode.Version, ordinal, nodeBz); err != nil {
return nil, err
}
if err = maybeFlush(); err != nil {
return nil, err
}
return node, nil
}

node.leftNode, err = step()
if err != nil {
return nil, err
}
node.leftNodeKey = node.leftNode.nodeKey
node.rightNode, err = step()
if err != nil {
return nil, err
}
node.rightNodeKey = node.rightNode.nodeKey

node.size = node.leftNode.size + node.rightNode.size
node._hash(snapshotNode.Version)
node.leftNode = nil
node.rightNode = nil

nodeBz, err := node.Bytes()
if err != nil {
return nil, err
}
if err = snap.snapshotInsert.Exec(ordinal, snapshotNode.Version, ordinal, nodeBz); err != nil {
return nil, err
}
if err = snap.treeInsert.Exec(snapshotNode.Version, ordinal, nodeBz); err != nil {
return nil, err
}
uniqueVersions[snapshotNode.Version] = struct{}{}
if err = maybeFlush(); err != nil {
return nil, err
}
return node, nil
if opts.TraverseOrder == PostOrder {
root, uniqueVersions, err = snap.restorePostOrderStep(nextFn, opts.StoreLeafValues)
} else if opts.TraverseOrder == PreOrder {
root, uniqueVersions, err = snap.restorePreOrderStep(nextFn, opts.StoreLeafValues)
}
root, err := step()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -592,6 +514,165 @@ func (snap *sqliteSnapshot) prepareWrite() error {
return err
}

func (snap *sqliteSnapshot) restorePostOrderStep(nextFn func() (*SnapshotNode, error), isStoreLeafValues bool) (*Node, map[int64]struct{}, error) {
var (
snapshotNode *SnapshotNode
err error
count int
stack []*Node
uniqueVersions = make(map[int64]struct{})
)

for {
snapshotNode, err = nextFn()
if err != nil || snapshotNode == nil {
break
}

uniqueVersions[snapshotNode.Version] = struct{}{}
node := &Node{
key: snapshotNode.Key,
subtreeHeight: snapshotNode.Height,
nodeKey: NewNodeKey(snapshotNode.Version, uint32(snap.ordinal)),
}

stackSize := len(stack)
if node.isLeaf() {
node.value = snapshotNode.Value
node.size = 1
node._hash(snapshotNode.Version)
if !isStoreLeafValues {
node.value = nil
}

count++
if err := snap.writeSnapNode(node, snapshotNode.Version, count); err != nil {
return nil, nil, err
}
} else if stackSize >= 2 && stack[stackSize-1].subtreeHeight < node.subtreeHeight && stack[stackSize-2].subtreeHeight < node.subtreeHeight {
node.leftNode = stack[stackSize-2]
node.leftNodeKey = node.leftNode.nodeKey
node.rightNode = stack[stackSize-1]
node.rightNodeKey = node.rightNode.nodeKey
node.size = node.leftNode.size + node.rightNode.size
node._hash(snapshotNode.Version)
stack = stack[:stackSize-2]

node.leftNode = nil
node.rightNode = nil

count++
if err := snap.writeSnapNode(node, snapshotNode.Version, count); err != nil {
return nil, nil, err
}
}

stack = append(stack, node)
snap.ordinal++
}

if err != nil && !errors.Is(err, ErrorExportDone) {
return nil, nil, err
}

if len(stack) != 1 {
return nil, nil, fmt.Errorf("expected stack size 1, got %d", len(stack))
}

return stack[0], uniqueVersions, nil
}

func (snap *sqliteSnapshot) restorePreOrderStep(nextFn func() (*SnapshotNode, error), isStoreLeafValues bool) (*Node, map[int64]struct{}, error) {
var (
count int
step func() (*Node, error)
uniqueVersions = make(map[int64]struct{})
)

step = func() (*Node, error) {
snapshotNode, err := nextFn()
if err != nil {
return nil, err
}

node := &Node{
key: snapshotNode.Key,
subtreeHeight: snapshotNode.Height,
nodeKey: NewNodeKey(snapshotNode.Version, uint32(snap.ordinal)),
}

if node.isLeaf() {
node.value = snapshotNode.Value
node.size = 1
node._hash(snapshotNode.Version)
if !isStoreLeafValues {
node.value = nil
}
} else {
node.leftNode, err = step()
if err != nil {
return nil, err
}
node.leftNodeKey = node.leftNode.nodeKey
node.rightNode, err = step()
if err != nil {
return nil, err
}
node.rightNodeKey = node.rightNode.nodeKey

node.size = node.leftNode.size + node.rightNode.size
node._hash(snapshotNode.Version)
node.leftNode = nil
node.rightNode = nil
uniqueVersions[snapshotNode.Version] = struct{}{}
}

count++
if err := snap.writeSnapNode(node, snapshotNode.Version, count); err != nil {
return nil, err
}
snap.ordinal++

return node, nil
}

node, err := step()

return node, uniqueVersions, err
}

func (snap *sqliteSnapshot) writeSnapNode(node *Node, version int64, count int) error {
ordinal := snap.ordinal

nodeBz, err := node.Bytes()
if err != nil {
return err
}
if err = snap.snapshotInsert.Exec(ordinal, version, ordinal, nodeBz); err != nil {
return err
}
if node.isLeaf() {
if err = snap.leafInsert.Exec(version, ordinal, nodeBz); err != nil {
return err
}
} else {
if err = snap.treeInsert.Exec(version, ordinal, nodeBz); err != nil {
return err
}
}

if count%snap.batchSize == 0 {
if err := snap.flush(); err != nil {
return err
}
if err := snap.prepareWrite(); err != nil {
return err
}
}

return nil
}

func rehashTree(node *Node) {
if node.isLeaf() {
return
Expand Down
8 changes: 4 additions & 4 deletions tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ func TestTree_Hash(t *testing.T) {
opts.Until = 100
opts.UntilHash = "0101e1d6f3158dcb7221acd7ed36ce19f2ef26847ffea7ce69232e362539e5cf"

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
_, cancel := context.WithCancel(context.Background())

testStart := time.Now()
multiTree := NewMultiTree(tmpDir, TreeOptions{CheckpointInterval: 10})
Expand Down Expand Up @@ -205,13 +204,14 @@ func TestTree_Build_Load(t *testing.T) {

// export the tree at version 12,000 and import it into a sql db
ctx := context.Background()
traverseOrder := PreOrder
restoreMt := NewMultiTree(t.TempDir(), TreeOptions{CheckpointInterval: 4000})
for sk, tree := range multiTree.Trees {
require.NoError(t, restoreMt.MountTree(sk))
exporter := tree.ExportPreOrder()
exporter := tree.Export(traverseOrder)

restoreTree := restoreMt.Trees[sk]
_, err := restoreTree.sql.WriteSnapshot(ctx, tree.Version(), exporter.Next, SnapshotOptions{SaveTree: true})
_, err := restoreTree.sql.WriteSnapshot(ctx, tree.Version(), exporter.Next, SnapshotOptions{SaveTree: true, TraverseOrder: traverseOrder})
require.NoError(t, err)
require.NoError(t, restoreTree.LoadSnapshot(tree.Version()))
}
Expand Down

0 comments on commit 7b953bc

Please sign in to comment.