Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catchpoints: Add onlineaccounts and onlineroundparamstail tables to snapshot files #6177

Merged
merged 25 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d775659
Add onlineaccounts and onlineroundparamstail to catchpoint files
cce Nov 22, 2024
39f25db
Add test demonstrating in-memory sqlite DB retries causing merkle tri…
cce Dec 4, 2024
9b0f9e9
Add retry clear callback
cce Dec 4, 2024
1e736fe
change names
cce Dec 4, 2024
7cc08cc
fmt
cce Dec 4, 2024
a101d07
add dualdriver implementation of TransactionContextWithRetryClearFn
cce Dec 4, 2024
ae67ab8
Update mockDB in trackerdb testsuite
cce Dec 5, 2024
865d018
Update pebbledriver.go comment
cce Dec 5, 2024
33af474
fix race by removing deadlock detection disable
cce Dec 5, 2024
c2c3e4f
Merge remote-tracking branch 'origin/retry-rollback-ledger' into catc…
cce Dec 5, 2024
abf5853
move ledgerTracker advanced error handling methods to ledgerTrackerEx…
cce Dec 6, 2024
b196586
make TestCatchpointAfterStakeLookupTxns work
cce Dec 11, 2024
c11197b
Merge remote-tracking branch 'upstream/master' into catchpoint-online…
cce Dec 11, 2024
49c3067
Merge remote-tracking branch 'origin/catchpoint-onlineaccts-separateh…
cce Dec 11, 2024
4baad21
Update ledger/catchupaccessor.go
cce Dec 17, 2024
48d76af
add more assertions to unit test
cce Dec 12, 2024
8adddbc
update TestCatchpointFastUpdates to give one more kick if DB commits …
cce Dec 19, 2024
330c344
use future
cce Dec 19, 2024
8c81163
use future in state proof catchpoint tests
cce Dec 19, 2024
544b0b0
add TODO in TestFullCatchpointWriterOverflowAccounts and testNewLedge…
cce Dec 19, 2024
5fb737c
code review
cce Dec 19, 2024
998f2fd
Merge remote-tracking branch 'upstream/master' into catchpoint-online…
cce Dec 19, 2024
1fa610d
update TestCatchpointAfterStakeLookupTxns for TEAL R-320
cce Dec 19, 2024
5c9bc2d
Update ledger/catchpointtracker_test.go
cce Dec 19, 2024
767e369
Merge remote-tracking branch 'upstream/master' into catchpoint-online…
cce Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions catchup/catchpointService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func (m *catchpointCatchupAccessorMock) Ledger() (l ledger.CatchupAccessorClient
}

// GetVerifyData returns the balances hash, spver hash and totals used by VerifyCatchpoint
func (m *catchpointCatchupAccessorMock) GetVerifyData(ctx context.Context) (balancesHash crypto.Digest, spverHash crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil
func (m *catchpointCatchupAccessorMock) GetVerifyData(ctx context.Context) (balancesHash, spverHash, onlineAccountsHash, onlineRoundParamsHash crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil
}

// TestCatchpointServicePeerRank ensures CatchpointService does not crash when a block fetched
Expand Down
7 changes: 4 additions & 3 deletions cmd/catchpointdump/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,13 @@
if err != nil {
return fileHeader, err
}
var balanceHash, spverHash crypto.Digest
balanceHash, spverHash, _, err = catchupAccessor.GetVerifyData(ctx)
var balanceHash, spverHash, onlineAccountsHash, onlineRoundParamsHash crypto.Digest
balanceHash, spverHash, onlineAccountsHash, onlineRoundParamsHash, _, err = catchupAccessor.GetVerifyData(ctx)

Check warning on line 218 in cmd/catchpointdump/file.go

View check run for this annotation

Codecov / codecov/patch

cmd/catchpointdump/file.go#L217-L218

Added lines #L217 - L218 were not covered by tests
if err != nil {
return fileHeader, err
}
fmt.Printf("accounts digest=%s, spver digest=%s\n\n", balanceHash, spverHash)
fmt.Printf("accounts digest=%s, spver digest=%s, onlineaccounts digest=%s onlineroundparams digest=%s\n\n",
balanceHash, spverHash, onlineAccountsHash, onlineRoundParamsHash)

Check warning on line 223 in cmd/catchpointdump/file.go

View check run for this annotation

Codecov / codecov/patch

cmd/catchpointdump/file.go#L222-L223

Added lines #L222 - L223 were not covered by tests
}
return fileHeader, nil
}
Expand Down
4 changes: 2 additions & 2 deletions components/mocks/mockCatchpointCatchupAccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@
}

// GetVerifyData returns the balances hash, spver hash and totals used by VerifyCatchpoint
func (m *MockCatchpointCatchupAccessor) GetVerifyData(ctx context.Context) (balancesHash crypto.Digest, spverHash crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil
func (m *MockCatchpointCatchupAccessor) GetVerifyData(ctx context.Context) (balancesHash, spverHash, onlineAccountsHash, onlineRoundParams crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil

Check warning on line 75 in components/mocks/mockCatchpointCatchupAccessor.go

View check run for this annotation

Codecov / codecov/patch

components/mocks/mockCatchpointCatchupAccessor.go#L74-L75

Added lines #L74 - L75 were not covered by tests
}

// VerifyCatchpoint verifies that the catchpoint is valid by reconstructing the label.
Expand Down
6 changes: 6 additions & 0 deletions config/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,10 @@ type ConsensusParams struct {
// Version 7 includes state proof verification contexts
EnableCatchpointsWithSPContexts bool

// EnableCatchpointsWithOnlineAccounts specifies when to enable version 8 catchpoints.
// Version 8 includes onlineaccounts and onlineroundparams amounts, for historical stake lookups.
EnableCatchpointsWithOnlineAccounts bool
cce marked this conversation as resolved.
Show resolved Hide resolved

// AppForbidLowResources enforces a rule that prevents apps from accessing
// asas and apps below 256, in an effort to decrease the ambiguity of
// opcodes that accept IDs or slot indexes. Simultaneously, the first ID
Expand Down Expand Up @@ -1532,6 +1536,8 @@ func initConsensusProtocols() {
// 2.9 sec rounds gives about 10.8M rounds per year.
vFuture.Bonus.DecayInterval = 250_000 // .99^(10.8/0.25) ~ .648. So 35% decay per year

vFuture.EnableCatchpointsWithOnlineAccounts = true

Consensus[protocol.ConsensusFuture] = vFuture

// vAlphaX versions are an separate series of consensus parameters and versions for alphanet
Expand Down
20 changes: 11 additions & 9 deletions ledger/catchpointfileheader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
type CatchpointFileHeader struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`

Version uint64 `codec:"version"`
BalancesRound basics.Round `codec:"balancesRound"`
BlocksRound basics.Round `codec:"blocksRound"`
Totals ledgercore.AccountTotals `codec:"accountTotals"`
TotalAccounts uint64 `codec:"accountsCount"`
TotalChunks uint64 `codec:"chunksCount"`
TotalKVs uint64 `codec:"kvsCount"`
Catchpoint string `codec:"catchpoint"`
BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"`
Version uint64 `codec:"version"`
BalancesRound basics.Round `codec:"balancesRound"`
BlocksRound basics.Round `codec:"blocksRound"`
Totals ledgercore.AccountTotals `codec:"accountTotals"`
TotalAccounts uint64 `codec:"accountsCount"`
TotalChunks uint64 `codec:"chunksCount"`
TotalKVs uint64 `codec:"kvsCount"`
TotalOnlineAccounts uint64 `codec:"onlineAccountsCount"`
TotalOnlineRoundParams uint64 `codec:"onlineRoundParamsCount"`
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
Catchpoint string `codec:"catchpoint"`
BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"`
}
197 changes: 141 additions & 56 deletions ledger/catchpointfilewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,33 +51,29 @@
// the writing is complete. It might take multiple steps until the operation is over, and the caller
// has the option of throttling the CPU utilization in between the calls.
type catchpointFileWriter struct {
ctx context.Context
tx trackerdb.SnapshotScope
filePath string
totalAccounts uint64
totalKVs uint64
file *os.File
tar *tar.Writer
compressor io.WriteCloser
chunk catchpointFileChunkV6
chunkNum uint64
writtenBytes int64
biggestChunkLen uint64
accountsIterator accountsBatchIter
maxResourcesPerChunk int
accountsDone bool
kvRows kvIter
}

type kvIter interface {
Next() bool
KeyValue() ([]byte, []byte, error)
Close()
}

type accountsBatchIter interface {
Next(ctx context.Context, accountCount int, resourceCount int) ([]encoded.BalanceRecordV6, uint64, error)
Close()
ctx context.Context
tx trackerdb.SnapshotScope
filePath string
totalAccounts uint64
totalKVs uint64
totalOnlineAccounts uint64
cce marked this conversation as resolved.
Show resolved Hide resolved
totalOnlineRoundParams uint64
file *os.File
tar *tar.Writer
compressor io.WriteCloser
chunk catchpointFileChunkV6
chunkNum uint64
writtenBytes int64
biggestChunkLen uint64
accountsIterator trackerdb.EncodedAccountsBatchIter
maxResourcesPerChunk int
accountsDone bool
kvRows trackerdb.KVsIter
kvDone bool
onlineAccountRows trackerdb.TableIterator[*encoded.OnlineAccountRecordV6]
onlineAccountsDone bool
onlineRoundParamsRows trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6]
onlineRoundParamsDone bool
}

type catchpointFileBalancesChunkV5 struct {
Expand All @@ -88,13 +84,15 @@
type catchpointFileChunkV6 struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`

Balances []encoded.BalanceRecordV6 `codec:"bl,allocbound=BalancesPerCatchpointFileChunk"`
numAccounts uint64
KVs []encoded.KVRecordV6 `codec:"kv,allocbound=BalancesPerCatchpointFileChunk"`
Balances []encoded.BalanceRecordV6 `codec:"bl,allocbound=BalancesPerCatchpointFileChunk"`
numAccounts uint64
KVs []encoded.KVRecordV6 `codec:"kv,allocbound=BalancesPerCatchpointFileChunk"`
OnlineAccounts []encoded.OnlineAccountRecordV6 `codec:"oa,allocbound=BalancesPerCatchpointFileChunk"`
cce marked this conversation as resolved.
Show resolved Hide resolved
OnlineRoundParams []encoded.OnlineRoundParamsRecordV6 `codec:"orp,allocbound=BalancesPerCatchpointFileChunk"`
}

func (chunk catchpointFileChunkV6) empty() bool {
return len(chunk.Balances) == 0 && len(chunk.KVs) == 0
return len(chunk.Balances) == 0 && len(chunk.KVs) == 0 && len(chunk.OnlineAccounts) == 0 && len(chunk.OnlineRoundParams) == 0
cce marked this conversation as resolved.
Show resolved Hide resolved
}

type catchpointStateProofVerificationContext struct {
Expand Down Expand Up @@ -122,6 +120,16 @@
return nil, err
}

totalOnlineAccounts, err := aw.TotalOnlineAccountRows(ctx)
if err != nil {
return nil, err

Check warning on line 125 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L125

Added line #L125 was not covered by tests
}

totalOnlineRoundParams, err := aw.TotalOnlineRoundParams(ctx)
if err != nil {
return nil, err

Check warning on line 130 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L130

Added line #L130 was not covered by tests
}

err = os.MkdirAll(filepath.Dir(filePath), 0700)
if err != nil {
return nil, err
Expand All @@ -137,16 +145,18 @@
tar := tar.NewWriter(compressor)

res := &catchpointFileWriter{
ctx: ctx,
tx: tx,
filePath: filePath,
totalAccounts: totalAccounts,
totalKVs: totalKVs,
file: file,
compressor: compressor,
tar: tar,
accountsIterator: tx.MakeEncodedAccoutsBatchIter(),
maxResourcesPerChunk: maxResourcesPerChunk,
ctx: ctx,
tx: tx,
filePath: filePath,
totalAccounts: totalAccounts,
totalKVs: totalKVs,
totalOnlineAccounts: totalOnlineAccounts,
totalOnlineRoundParams: totalOnlineRoundParams,
file: file,
compressor: compressor,
tar: tar,
accountsIterator: tx.MakeEncodedAccountsBatchIter(),
maxResourcesPerChunk: maxResourcesPerChunk,
}
return res, nil
}
Expand Down Expand Up @@ -233,6 +243,14 @@
cw.kvRows.Close()
cw.kvRows = nil
}
if cw.onlineAccountRows != nil {
cw.onlineAccountRows.Close()
cw.onlineAccountRows = nil
}
if cw.onlineRoundParamsRows != nil {
cw.onlineRoundParamsRows.Close()
cw.onlineRoundParamsRows = nil
}
}
}()

Expand Down Expand Up @@ -323,27 +341,94 @@
cw.accountsDone = true
}

// Create the *Rows iterator JIT
if cw.kvRows == nil {
rows, err := cw.tx.MakeKVsIter(ctx)
if err != nil {
return err
// Create the kvRows iterator JIT
if !cw.kvDone {
if cw.kvRows == nil {
rows, err := cw.tx.MakeKVsIter(ctx)
if err != nil {
return err

Check warning on line 349 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L349

Added line #L349 was not covered by tests
}
cw.kvRows = rows
}

kvrs := make([]encoded.KVRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.kvRows.Next() {
k, v, err := cw.kvRows.KeyValue()
if err != nil {
return err

Check warning on line 358 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L358

Added line #L358 was not covered by tests
}
kvrs = append(kvrs, encoded.KVRecordV6{Key: k, Value: v})
if len(kvrs) == BalancesPerCatchpointFileChunk {
break

Check warning on line 362 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L362

Added line #L362 was not covered by tests
}
}
if len(kvrs) > 0 {
cw.chunk = catchpointFileChunkV6{KVs: kvrs}
return nil
}
cw.kvRows = rows
// Do not close kvRows here, or it will start over on the next iteration
cw.kvDone = true
}

kvrs := make([]encoded.KVRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.kvRows.Next() {
k, v, err := cw.kvRows.KeyValue()
if err != nil {
return err
if !cw.onlineAccountsDone {
// Create the OnlineAccounts iterator JIT
if cw.onlineAccountRows == nil {
rows, err := cw.tx.MakeOnlineAccountsIter(ctx)
if err != nil {
return err

Check warning on line 378 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L378

Added line #L378 was not covered by tests
}
cw.onlineAccountRows = rows
}
kvrs = append(kvrs, encoded.KVRecordV6{Key: k, Value: v})
if len(kvrs) == BalancesPerCatchpointFileChunk {
break

onlineAccts := make([]encoded.OnlineAccountRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.onlineAccountRows.Next() {
oa, err := cw.onlineAccountRows.GetItem()
if err != nil {
return err

Check warning on line 387 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L387

Added line #L387 was not covered by tests
}
onlineAccts = append(onlineAccts, *oa)
if len(onlineAccts) == BalancesPerCatchpointFileChunk {
break

Check warning on line 391 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L391

Added line #L391 was not covered by tests
}
}
if len(onlineAccts) > 0 {
cw.chunk = catchpointFileChunkV6{OnlineAccounts: onlineAccts}
return nil
}
// Do not close onlineAccountRows here, or it will start over on the next iteration
cw.onlineAccountsDone = true
}

if !cw.onlineRoundParamsDone {
// Create the OnlineRoundParams iterator JIT
if cw.onlineRoundParamsRows == nil {
rows, err := cw.tx.MakeOnlineRoundParamsIter(ctx)
if err != nil {
return err

Check warning on line 407 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L407

Added line #L407 was not covered by tests
}
cw.onlineRoundParamsRows = rows
}

onlineRndParams := make([]encoded.OnlineRoundParamsRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.onlineRoundParamsRows.Next() {
or, err := cw.onlineRoundParamsRows.GetItem()
if err != nil {
return err

Check warning on line 416 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L416

Added line #L416 was not covered by tests
}
onlineRndParams = append(onlineRndParams, *or)
if len(onlineRndParams) == BalancesPerCatchpointFileChunk {
break
}
}
if len(onlineRndParams) > 0 {
cw.chunk = catchpointFileChunkV6{OnlineRoundParams: onlineRndParams}
return nil
}
// Do not close onlineRndParamsRows here, or it will start over on the next iteration
cw.onlineRoundParamsDone = true
}
cw.chunk = catchpointFileChunkV6{KVs: kvrs}

// Finished the last chunk
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions ledger/catchpointfilewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ func TestExactAccountChunk(t *testing.T) {
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz")

cph := testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
require.EqualValues(t, cph.TotalChunks, 1)
require.EqualValues(t, cph.TotalChunks, 2)
cce marked this conversation as resolved.
Show resolved Hide resolved

l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath)
defer l.Close()
Expand Down Expand Up @@ -906,7 +906,7 @@ func TestCatchpointAfterTxns(t *testing.T) {
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz")

cph := testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
require.EqualValues(t, 2, cph.TotalChunks)
require.EqualValues(t, 3, cph.TotalChunks)

l := testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath)
defer l.Close()
Expand All @@ -922,7 +922,7 @@ func TestCatchpointAfterTxns(t *testing.T) {

// Write and read back in, and ensure even the last effect exists.
cph = testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
require.EqualValues(t, cph.TotalChunks, 2) // Still only 2 chunks, as last was in a recent block
require.EqualValues(t, cph.TotalChunks, 3) // Still only 3 chunks, as last was in a recent block

// Drive home the point that `last` is _not_ included in the catchpoint by inspecting balance read from catchpoint.
{
Expand All @@ -938,7 +938,7 @@ func TestCatchpointAfterTxns(t *testing.T) {
}

cph = testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
require.EqualValues(t, cph.TotalChunks, 3)
require.EqualValues(t, cph.TotalChunks, 4)

l = testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath)
defer l.Close()
Expand Down Expand Up @@ -1028,7 +1028,7 @@ func TestCatchpointAfterBoxTxns(t *testing.T) {
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz")

cph := testWriteCatchpoint(t, dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
require.EqualValues(t, 2, cph.TotalChunks)
require.EqualValues(t, 3, cph.TotalChunks)

l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath)
defer l.Close()
Expand Down
Loading
Loading