diff --git a/exp/lighthorizon/index/types/bitmap.go b/exp/lighthorizon/index/types/bitmap.go index 619904dec2..346ede80a7 100644 --- a/exp/lighthorizon/index/types/bitmap.go +++ b/exp/lighthorizon/index/types/bitmap.go @@ -166,7 +166,9 @@ func (i *CheckpointIndex) Merge(other *CheckpointIndex) error { return err } -// NextActive returns the next checkpoint (inclusive) where this index is active. +// NextActive returns the next checkpoint (inclusive) where this index is +// active. "Inclusive" means that if the index is active at `checkpoint`, this +// returns `checkpoint`. func (i *CheckpointIndex) NextActive(checkpoint uint32) (uint32, error) { i.mutex.RLock() defer i.mutex.RUnlock() diff --git a/exp/lighthorizon/services/cursor.go b/exp/lighthorizon/services/cursor.go new file mode 100644 index 0000000000..673a57fce5 --- /dev/null +++ b/exp/lighthorizon/services/cursor.go @@ -0,0 +1,98 @@ +package services + +import ( + "github.com/stellar/go/exp/lighthorizon/index" + "github.com/stellar/go/toid" +) + +// CursorManager describes a way to control how a cursor advances for a +// particular indexing strategy. +type CursorManager interface { + Begin(cursor int64) (int64, error) + Advance() (int64, error) +} + +type AccountActivityCursorManager struct { + AccountId string + + store index.Store + lastCursor *toid.ID +} + +func NewCursorManagerForAccountActivity(store index.Store, accountId string) *AccountActivityCursorManager { + return &AccountActivityCursorManager{AccountId: accountId, store: store} +} + +func (c *AccountActivityCursorManager) Begin(cursor int64) (int64, error) { + freq := checkpointManager.GetCheckpointFrequency() + id := toid.Parse(cursor) + lastCheckpoint := uint32(0) + if id.LedgerSequence >= int32(checkpointManager.GetCheckpointFrequency()) { + lastCheckpoint = index.GetCheckpointNumber(uint32(id.LedgerSequence)) + } + + // We shouldn't take the provided cursor for granted: instead, we should + // skip ahead to the first active ledger that's >= the given cursor. + // + // For example, someone might say ?cursor=0 but the first active checkpoint + // is actually 40M ledgers in. + firstCheckpoint, err := c.store.NextActive(c.AccountId, allTransactionsIndex, lastCheckpoint) + if err != nil { + return cursor, err + } + + nextLedger := (firstCheckpoint - 1) * freq + + // However, if the given cursor is actually *more* specific than the index + // can give us (e.g. somewhere *within* an active checkpoint range), prefer + // it rather than starting over. + if nextLedger < uint32(id.LedgerSequence) { + better := toid.Parse(cursor) + c.lastCursor = &better + return cursor, nil + } + + c.lastCursor = toid.New(int32(nextLedger), 1, 1) + return c.lastCursor.ToInt64(), nil +} + +func (c *AccountActivityCursorManager) Advance() (int64, error) { + if c.lastCursor == nil { + panic("invalid cursor, call Begin() first") + } + + // Advancing the cursor means deciding whether or not we need to query the + // index. + + lastLedger := uint32(c.lastCursor.LedgerSequence) + freq := checkpointManager.GetCheckpointFrequency() + + if checkpointManager.IsCheckpoint(lastLedger) { + // If the last cursor we looked at was a checkpoint ledger, then we need + // to jump ahead to the next checkpoint. Note that NextActive() is + // "inclusive" so if the parameter is an active checkpoint it will + // return itself. + checkpoint := index.GetCheckpointNumber(uint32(c.lastCursor.LedgerSequence)) + checkpoint, err := c.store.NextActive(c.AccountId, allTransactionsIndex, checkpoint+1) + if err != nil { + return c.lastCursor.ToInt64(), err + } + + // We add a -1 here because an active checkpoint indicates that an + // account had activity in the *previous* 64 ledgers, so we need to + // backtrack to that ledger range. + c.lastCursor = toid.New(int32((checkpoint-1)*freq), 1, 1) + } else { + // Otherwise, we can just bump the ledger number. + c.lastCursor = toid.New(int32(lastLedger+1), 1, 1) + } + + return c.lastCursor.ToInt64(), nil +} + +var _ CursorManager = (*AccountActivityCursorManager)(nil) // ensure conformity to the interface + +// getLedgerFromCursor is a helpful way to turn a cursor into a ledger number +func getLedgerFromCursor(cursor int64) uint32 { + return uint32(toid.Parse(cursor).LedgerSequence) +} diff --git a/exp/lighthorizon/services/cursor_test.go b/exp/lighthorizon/services/cursor_test.go new file mode 100644 index 0000000000..4239c5105a --- /dev/null +++ b/exp/lighthorizon/services/cursor_test.go @@ -0,0 +1,78 @@ +package services + +import ( + "io" + "testing" + + "github.com/stellar/go/exp/lighthorizon/index" + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/keypair" + "github.com/stellar/go/toid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + checkpointMgr = historyarchive.NewCheckpointManager(0) +) + +func TestAccountTransactionCursorManager(t *testing.T) { + freq := int32(checkpointMgr.GetCheckpointFrequency()) + accountId := keypair.MustRandom().Address() + + // Create an index and fill it with some checkpoint details. + store, err := index.NewFileStore(index.StoreConfig{ + Url: "file://" + t.TempDir(), + Workers: 4, + }) + require.NoError(t, err) + + for _, checkpoint := range []uint32{1, 5, 10} { + require.NoError(t, store.AddParticipantsToIndexes( + checkpoint, allTransactionsIndex, []string{accountId})) + } + + cursorMgr := NewCursorManagerForAccountActivity(store, accountId) + + cursor := toid.New(1, 1, 1) + var nextCursor int64 + + // first checkpoint works + nextCursor, err = cursorMgr.Begin(cursor.ToInt64()) + require.NoError(t, err) + assert.EqualValues(t, 1, getLedgerFromCursor(nextCursor)) + + // cursor is preserved if mid-active-range + cursor.LedgerSequence = freq / 2 + nextCursor, err = cursorMgr.Begin(cursor.ToInt64()) + require.NoError(t, err) + assert.EqualValues(t, cursor.LedgerSequence, getLedgerFromCursor(nextCursor)) + + // cursor jumps ahead if not active + cursor.LedgerSequence = 2 * freq + nextCursor, err = cursorMgr.Begin(cursor.ToInt64()) + require.NoError(t, err) + assert.EqualValues(t, 4*freq, getLedgerFromCursor(nextCursor)) + + for i := int32(1); i < freq; i++ { + nextCursor, err = cursorMgr.Advance() + require.NoError(t, err) + assert.EqualValues(t, 4*freq+i, getLedgerFromCursor(nextCursor)) + } + + // cursor jumps to next active checkpoint + nextCursor, err = cursorMgr.Advance() + require.NoError(t, err) + assert.EqualValues(t, 9*freq, getLedgerFromCursor(nextCursor)) + + // cursor increments + for i := int32(1); i < freq; i++ { + nextCursor, err = cursorMgr.Advance() + require.NoError(t, err) + assert.EqualValues(t, 9*freq+i, getLedgerFromCursor(nextCursor)) + } + + // cursor stops when no more actives + _, err = cursorMgr.Advance() + assert.ErrorIs(t, err, io.EOF) +} diff --git a/exp/lighthorizon/services/main.go b/exp/lighthorizon/services/main.go index 7d5a4bcefb..349aee0d84 100644 --- a/exp/lighthorizon/services/main.go +++ b/exp/lighthorizon/services/main.go @@ -8,7 +8,6 @@ import ( "github.com/stellar/go/exp/lighthorizon/common" "github.com/stellar/go/exp/lighthorizon/index" "github.com/stellar/go/historyarchive" - "github.com/stellar/go/toid" "github.com/stellar/go/xdr" "github.com/stellar/go/support/errors" @@ -16,7 +15,8 @@ import ( ) const ( - allIndexes = "all/all" + allTransactionsIndex = "all/all" + allPaymentsIndex = "all/payments" ) var ( @@ -52,16 +52,24 @@ type TransactionRepository interface { GetTransactionsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Transaction, error) } +// searchCallback is a generic way for any endpoint to process a transaction and +// its corresponding ledger. It should return whether or not we should stop +// processing (e.g. when a limit is reached) and any error that occurred. type searchCallback func(archive.LedgerTransaction, *xdr.LedgerHeader) (finished bool, err error) -func (os *OperationsService) GetOperationsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Operation, error) { +func (os *OperationsService) GetOperationsByAccount(ctx context.Context, + cursor int64, limit uint64, + accountId string, +) ([]common.Operation, error) { ops := []common.Operation{} + opsCallback := func(tx archive.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) { for operationOrder, op := range tx.Envelope.Operations() { - opParticipants, opParticipantErr := os.Config.Archive.GetOperationParticipants(tx, op, operationOrder) - if opParticipantErr != nil { - return false, opParticipantErr + opParticipants, err := os.Config.Archive.GetOperationParticipants(tx, op, operationOrder) + if err != nil { + return false, err } + if _, foundInOp := opParticipants[accountId]; foundInOp { ops = append(ops, common.Operation{ TransactionEnvelope: &tx.Envelope, @@ -70,11 +78,13 @@ func (os *OperationsService) GetOperationsByAccount(ctx context.Context, cursor TxIndex: int32(tx.Index), OpIndex: int32(operationOrder), }) + if uint64(len(ops)) == limit { return true, nil } } } + return false, nil } @@ -85,7 +95,10 @@ func (os *OperationsService) GetOperationsByAccount(ctx context.Context, cursor return ops, nil } -func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Transaction, error) { +func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, + cursor int64, limit uint64, + accountId string, +) ([]common.Transaction, error) { txs := []common.Transaction{} txsCallback := func(tx archive.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) { @@ -96,7 +109,8 @@ func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, cur TxIndex: int32(tx.Index), NetworkPassphrase: ts.Config.Passphrase, }) - return (uint64(len(txs)) >= limit), nil + + return uint64(len(txs)) == limit, nil } if err := searchTxByAccount(ctx, cursor, accountId, ts.Config, txsCallback); err != nil { @@ -106,18 +120,23 @@ func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, cur } func searchTxByAccount(ctx context.Context, cursor int64, accountId string, config Config, callback searchCallback) error { - nextLedger, err := getAccountNextLedgerCursor(accountId, cursor, config.IndexStore, allIndexes) + cursorMgr := NewCursorManagerForAccountActivity(config.IndexStore, accountId) + cursor, err := cursorMgr.Begin(cursor) if err == io.EOF { return nil } else if err != nil { return err } - log.Debugf("Searching index by account %v starting at cursor %v", accountId, nextLedger) + + nextLedger := getLedgerFromCursor(cursor) + log.Debugf("Searching %s for account %s starting at ledger %d", + allTransactionsIndex, accountId, nextLedger) for { - ledger, ledgerErr := config.Archive.GetLedger(ctx, uint32(nextLedger)) + ledger, ledgerErr := config.Archive.GetLedger(ctx, nextLedger) if ledgerErr != nil { - return errors.Wrapf(ledgerErr, "ledger export state is out of sync, missing ledger %v from checkpoint %v", nextLedger, getIndexCheckpointCounter(uint32(nextLedger))) + return errors.Wrapf(ledgerErr, + "ledger export state is out of sync at ledger %d", nextLedger) } reader, readerErr := config.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(config.Passphrase, ledger) @@ -140,10 +159,9 @@ func searchTxByAccount(ctx context.Context, cursor int64, accountId string, conf } if _, found := participants[accountId]; found { - if finished, callBackErr := callback(tx, &ledger.V0.LedgerHeader.Header); callBackErr != nil { + finished, callBackErr := callback(tx, &ledger.V0.LedgerHeader.Header) + if finished || callBackErr != nil { return callBackErr - } else if finished { - return nil } } @@ -151,43 +169,14 @@ func searchTxByAccount(ctx context.Context, cursor int64, accountId string, conf return ctx.Err() } } - nextCursor := toid.New(int32(nextLedger), 1, 1).ToInt64() - nextLedger, err = getAccountNextLedgerCursor(accountId, nextCursor, config.IndexStore, allIndexes) + + cursor, err = cursorMgr.Advance() if err == io.EOF { return nil } else if err != nil { return err } - } -} - -// this deals in ledgers but adapts to the index model, which is currently keyed by checkpoint for now -func getAccountNextLedgerCursor(accountId string, cursor int64, store index.Store, indexName string) (uint64, error) { - nextLedger := uint32(toid.Parse(cursor).LedgerSequence + 1) - - // done for performance reasons, skip reading the index for any requested ledger cursors - // only need to read the index when next cursor falls on checkpoint boundary - if !checkpointManager.IsCheckpoint(nextLedger) { - return uint64(nextLedger), nil - } - // the 'NextActive' index query takes a starting checkpoint, from which the index is scanned AFTER that checkpoint, non-inclusive - // use the the currrent checkpoint as the starting point since it represents up to the cursor's ledger - queryStartingCheckpoint := getIndexCheckpointCounter(nextLedger) - indexNextCheckpoint, err := store.NextActive(accountId, indexName, queryStartingCheckpoint) - - if err != nil { - return 0, err + nextLedger = getLedgerFromCursor(cursor) } - - // return the first ledger of the next index checkpoint that had account activity after cursor - // so we need to go back 64 ledgers(one checkpoint's worth) relative to next index checkpoint - // to get first ledger, since checkpoint ledgers are the last/greatest ledger in the checkpoint range - return uint64((indexNextCheckpoint - 1) * checkpointManager.GetCheckpointFrequency()), nil -} - -// derives what checkpoint this ledger would be in the index -func getIndexCheckpointCounter(ledger uint32) uint32 { - return (checkpointManager.GetCheckpoint(uint32(ledger)) / - checkpointManager.GetCheckpointFrequency()) + 1 } diff --git a/exp/lighthorizon/services/main_test.go b/exp/lighthorizon/services/main_test.go index ab472f2883..f543e8117c 100644 --- a/exp/lighthorizon/services/main_test.go +++ b/exp/lighthorizon/services/main_test.go @@ -7,178 +7,176 @@ import ( "github.com/stellar/go/exp/lighthorizon/archive" "github.com/stellar/go/exp/lighthorizon/index" + "github.com/stellar/go/toid" "github.com/stellar/go/xdr" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -func TestItGetsTransactionsByAccount(tt *testing.T) { - // l=1586045, t=1, o=1 - // cursor = 6812011404988417, checkpoint=24781 +var ( + accountId = "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX" + startLedgerSeq = 1586112 +) - cursor := int64(6812011404988417) +func TestItGetsTransactionsByAccount(t *testing.T) { ctx := context.Background() - passphrase := "White New England clam chowder" - archive, store := mockArchiveAndIndex(ctx, passphrase) - txService := TransactionsService{ - Config: Config{ - Archive: archive, - IndexStore: store, - Passphrase: passphrase, - }, - } - accountId := "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX" - // this should start at next checkpoint - txs, err := txService.GetTransactionsByAccount(ctx, cursor, 1, accountId) - require.NoError(tt, err) - require.Len(tt, txs, 1) - require.Equal(tt, txs[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) - require.Equal(tt, txs[0].TxIndex, int32(2)) -} - -func TestItGetsTransactionsByAccountAndPageLimit(tt *testing.T) { - // l=1586045, t=1, o=1 - // cursor = 6812011404988417, checkpoint=24781 - cursor := int64(6812011404988417) - ctx := context.Background() - passphrase := "White New England clam chowder" - archive, store := mockArchiveAndIndex(ctx, passphrase) - txService := TransactionsService{ - Config: Config{ - Archive: archive, - IndexStore: store, - Passphrase: passphrase, - }, - } - accountId := "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX" - // this should start at next checkpoint - txs, err := txService.GetTransactionsByAccount(ctx, cursor, 5, accountId) - require.NoError(tt, err) - require.Len(tt, txs, 2) - require.Equal(tt, txs[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) - require.Equal(tt, txs[0].TxIndex, int32(2)) - require.Equal(tt, txs[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586114)) - require.Equal(tt, txs[1].TxIndex, int32(1)) + // this is in the checkpoint range prior to the first active checkpoint + ledgerSeq := checkpointMgr.PrevCheckpoint(uint32(startLedgerSeq)) + cursor := toid.New(int32(ledgerSeq), 1, 1).ToInt64() + + t.Run("first", func(tt *testing.T) { + txService := newTransactionService(ctx) + + txs, err := txService.GetTransactionsByAccount(ctx, cursor, 1, accountId) + require.NoError(tt, err) + require.Len(tt, txs, 1) + require.Equal(tt, txs[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) + require.EqualValues(tt, txs[0].TxIndex, 2) + }) + + t.Run("without cursor", func(tt *testing.T) { + txService := newTransactionService(ctx) + + txs, err := txService.GetTransactionsByAccount(ctx, 0, 1, accountId) + require.NoError(tt, err) + require.Len(tt, txs, 1) + require.Equal(tt, txs[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) + require.EqualValues(tt, txs[0].TxIndex, 2) + }) + + t.Run("with limit", func(tt *testing.T) { + txService := newTransactionService(ctx) + + txs, err := txService.GetTransactionsByAccount(ctx, cursor, 5, accountId) + require.NoError(tt, err) + require.Len(tt, txs, 2) + require.Equal(tt, txs[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) + require.EqualValues(tt, txs[0].TxIndex, 2) + require.Equal(tt, txs[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586114)) + require.EqualValues(tt, txs[1].TxIndex, 1) + }) } -func TestItGetsOperationsByAccount(tt *testing.T) { - // l=1586045, t=1, o=1 - // cursor = 6812011404988417, checkpoint=24781 - - cursor := int64(6812011404988417) +func TestItGetsOperationsByAccount(t *testing.T) { ctx := context.Background() - passphrase := "White New England clam chowder" - archive, store := mockArchiveAndIndex(ctx, passphrase) - opsService := OperationsService{ - Config: Config{ - Archive: archive, - IndexStore: store, - Passphrase: passphrase, - }, - } - accountId := "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX" - // this should start at next checkpoint - ops, err := opsService.GetOperationsByAccount(ctx, cursor, 1, accountId) - require.NoError(tt, err) - require.Len(tt, ops, 1) - require.Equal(tt, ops[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) - require.Equal(tt, ops[0].TxIndex, int32(2)) -} - -func TestItGetsOperationsByAccountAndPageLimit(tt *testing.T) { - // l=1586045, t=1, o=1 - // cursor = 6812011404988417, checkpoint=24781 - cursor := int64(6812011404988417) - ctx := context.Background() - passphrase := "White New England clam chowder" - archive, store := mockArchiveAndIndex(ctx, passphrase) - opsService := OperationsService{ - Config: Config{ - Archive: archive, - IndexStore: store, - Passphrase: passphrase, - }, - } - accountId := "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX" - // this should start at next checkpoint - ops, err := opsService.GetOperationsByAccount(ctx, cursor, 5, accountId) - require.NoError(tt, err) - require.Len(tt, ops, 2) - require.Equal(tt, ops[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) - require.Equal(tt, ops[0].TxIndex, int32(2)) - require.Equal(tt, ops[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586114)) - require.Equal(tt, ops[1].TxIndex, int32(1)) + // this is in the checkpoint range prior to the first active checkpoint + ledgerSeq := checkpointMgr.PrevCheckpoint(uint32(startLedgerSeq)) + cursor := toid.New(int32(ledgerSeq), 1, 1).ToInt64() + + t.Run("first", func(tt *testing.T) { + opsService := newOperationService(ctx) + + // this should start at next checkpoint + ops, err := opsService.GetOperationsByAccount(ctx, cursor, 1, accountId) + require.NoError(tt, err) + require.Len(tt, ops, 1) + require.Equal(tt, ops[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) + require.Equal(tt, ops[0].TxIndex, int32(2)) + + }) + + t.Run("with limit", func(tt *testing.T) { + opsService := newOperationService(ctx) + + // this should start at next checkpoint + ops, err := opsService.GetOperationsByAccount(ctx, cursor, 5, accountId) + require.NoError(tt, err) + require.Len(tt, ops, 2) + require.Equal(tt, ops[0].LedgerHeader.LedgerSeq, xdr.Uint32(1586113)) + require.Equal(tt, ops[0].TxIndex, int32(2)) + require.Equal(tt, ops[1].LedgerHeader.LedgerSeq, xdr.Uint32(1586114)) + require.Equal(tt, ops[1].TxIndex, int32(1)) + }) } func mockArchiveAndIndex(ctx context.Context, passphrase string) (archive.Archive, index.Store) { - mockArchive := &archive.MockArchive{} mockReaderLedger1 := &archive.MockLedgerTransactionReader{} mockReaderLedger2 := &archive.MockLedgerTransactionReader{} mockReaderLedger3 := &archive.MockLedgerTransactionReader{} mockReaderLedgerTheRest := &archive.MockLedgerTransactionReader{} - expectedLedger1 := testLedger(1586112) - expectedLedger2 := testLedger(1586113) - expectedLedger3 := testLedger(1586114) + expectedLedger1 := testLedger(startLedgerSeq) + expectedLedger2 := testLedger(startLedgerSeq + 1) + expectedLedger3 := testLedger(startLedgerSeq + 2) + + // throw an irrelevant account in there to make sure it's filtered source := xdr.MustAddress("GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU") - source2 := xdr.MustAddress("GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX") + source2 := xdr.MustAddress(accountId) + // assert results iterate sequentially across ops-tx-ledgers - expectedLedger1Transaction1 := testLedgerTx(source, []int{34, 34}, 1) - expectedLedger1Transaction2 := testLedgerTx(source, []int{34}, 2) - expectedLedger2Transaction1 := testLedgerTx(source, []int{34}, 1) - expectedLedger2Transaction2 := testLedgerTx(source2, []int{34}, 2) - expectedLedger3Transaction1 := testLedgerTx(source2, []int{34}, 1) - expectedLedger3Transaction2 := testLedgerTx(source, []int{34}, 2) - - mockArchive.On("GetLedger", ctx, uint32(1586112)).Return(expectedLedger1, nil) - mockArchive.On("GetLedger", ctx, uint32(1586113)).Return(expectedLedger2, nil) - mockArchive.On("GetLedger", ctx, uint32(1586114)).Return(expectedLedger3, nil) - mockArchive.On("GetLedger", ctx, mock.Anything).Return(xdr.LedgerCloseMeta{}, nil) - - mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger1).Return(mockReaderLedger1, nil) - mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger2).Return(mockReaderLedger2, nil) - mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger3).Return(mockReaderLedger3, nil) - mockArchive.On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, mock.Anything).Return(mockReaderLedgerTheRest, nil) + expectedLedger1Tx1 := testLedgerTx(source, 1, 34, 35) + expectedLedger1Tx2 := testLedgerTx(source, 2, 34) + expectedLedger2Tx1 := testLedgerTx(source, 1, 34) + expectedLedger2Tx2 := testLedgerTx(source2, 2, 34) + expectedLedger3Tx1 := testLedgerTx(source2, 1, 34) + expectedLedger3Tx2 := testLedgerTx(source, 2, 34) + + mockArchive. + On("GetLedger", ctx, uint32(1586112)).Return(expectedLedger1, nil). + On("GetLedger", ctx, uint32(1586113)).Return(expectedLedger2, nil). + On("GetLedger", ctx, uint32(1586114)).Return(expectedLedger3, nil). + On("GetLedger", ctx, mock.Anything).Return(xdr.LedgerCloseMeta{}, nil) + + mockArchive. + On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger1).Return(mockReaderLedger1, nil). + On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger2).Return(mockReaderLedger2, nil). + On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, expectedLedger3).Return(mockReaderLedger3, nil). + On("NewLedgerTransactionReaderFromLedgerCloseMeta", passphrase, mock.Anything).Return(mockReaderLedgerTheRest, nil) partialParticipants := make(map[string]struct{}) partialParticipants[source.Address()] = struct{}{} + allParticipants := make(map[string]struct{}) allParticipants[source.Address()] = struct{}{} allParticipants[source2.Address()] = struct{}{} - mockArchive.On("GetTransactionParticipants", expectedLedger1Transaction1).Return(partialParticipants, nil) - mockArchive.On("GetTransactionParticipants", expectedLedger1Transaction2).Return(partialParticipants, nil) - mockArchive.On("GetTransactionParticipants", expectedLedger2Transaction1).Return(partialParticipants, nil) - mockArchive.On("GetTransactionParticipants", expectedLedger2Transaction2).Return(allParticipants, nil) - mockArchive.On("GetTransactionParticipants", expectedLedger3Transaction1).Return(allParticipants, nil) - mockArchive.On("GetTransactionParticipants", expectedLedger3Transaction2).Return(partialParticipants, nil) - - mockArchive.On("GetOperationParticipants", expectedLedger1Transaction1, mock.Anything, int(0)).Return(partialParticipants, nil) - mockArchive.On("GetOperationParticipants", expectedLedger1Transaction1, mock.Anything, int(1)).Return(partialParticipants, nil) - mockArchive.On("GetOperationParticipants", expectedLedger1Transaction2, mock.Anything, int(0)).Return(partialParticipants, nil) - mockArchive.On("GetOperationParticipants", expectedLedger2Transaction1, mock.Anything, int(0)).Return(partialParticipants, nil) - mockArchive.On("GetOperationParticipants", expectedLedger2Transaction2, mock.Anything, int(0)).Return(allParticipants, nil) - mockArchive.On("GetOperationParticipants", expectedLedger3Transaction1, mock.Anything, int(0)).Return(allParticipants, nil) - mockArchive.On("GetOperationParticipants", expectedLedger3Transaction2, mock.Anything, int(0)).Return(partialParticipants, nil) - - mockReaderLedger1.On("Read").Return(expectedLedger1Transaction1, nil).Once() - mockReaderLedger1.On("Read").Return(expectedLedger1Transaction2, nil).Once() - mockReaderLedger1.On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() - mockReaderLedger2.On("Read").Return(expectedLedger2Transaction1, nil).Once() - mockReaderLedger2.On("Read").Return(expectedLedger2Transaction2, nil).Once() - mockReaderLedger2.On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() - mockReaderLedger3.On("Read").Return(expectedLedger3Transaction1, nil).Once() - mockReaderLedger3.On("Read").Return(expectedLedger3Transaction2, nil).Once() - mockReaderLedger3.On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() - mockReaderLedgerTheRest.On("Read").Return(archive.LedgerTransaction{}, io.EOF) - + mockArchive. + On("GetTransactionParticipants", expectedLedger1Tx1).Return(partialParticipants, nil). + On("GetTransactionParticipants", expectedLedger1Tx2).Return(partialParticipants, nil). + On("GetTransactionParticipants", expectedLedger2Tx1).Return(partialParticipants, nil). + On("GetTransactionParticipants", expectedLedger2Tx2).Return(allParticipants, nil). + On("GetTransactionParticipants", expectedLedger3Tx1).Return(allParticipants, nil). + On("GetTransactionParticipants", expectedLedger3Tx2).Return(partialParticipants, nil) + + mockArchive. + On("GetOperationParticipants", expectedLedger1Tx1, mock.Anything, int(0)).Return(partialParticipants, nil). + On("GetOperationParticipants", expectedLedger1Tx1, mock.Anything, int(1)).Return(partialParticipants, nil). + On("GetOperationParticipants", expectedLedger1Tx2, mock.Anything, int(0)).Return(partialParticipants, nil). + On("GetOperationParticipants", expectedLedger2Tx1, mock.Anything, int(0)).Return(partialParticipants, nil). + On("GetOperationParticipants", expectedLedger2Tx2, mock.Anything, int(0)).Return(allParticipants, nil). + On("GetOperationParticipants", expectedLedger3Tx1, mock.Anything, int(0)).Return(allParticipants, nil). + On("GetOperationParticipants", expectedLedger3Tx2, mock.Anything, int(0)).Return(partialParticipants, nil) + + mockReaderLedger1. + On("Read").Return(expectedLedger1Tx1, nil).Once(). + On("Read").Return(expectedLedger1Tx2, nil).Once(). + On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() + + mockReaderLedger2. + On("Read").Return(expectedLedger2Tx1, nil).Once(). + On("Read").Return(expectedLedger2Tx2, nil).Once(). + On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() + + mockReaderLedger3. + On("Read").Return(expectedLedger3Tx1, nil).Once(). + On("Read").Return(expectedLedger3Tx2, nil).Once(). + On("Read").Return(archive.LedgerTransaction{}, io.EOF).Once() + + mockReaderLedgerTheRest. + On("Read").Return(archive.LedgerTransaction{}, io.EOF) + + // should be 24784 + firstActiveChk := uint32(index.GetCheckpointNumber(uint32(startLedgerSeq))) mockStore := &index.MockStore{} - mockStore.On("NextActive", "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX", mock.Anything, uint32(24782)).Return(uint32(24783), nil) - mockStore.On("NextActive", "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX", mock.Anything, uint32(24783)).Return(uint32(24784), nil) - mockStore.On("NextActive", "GDCXSQPVE45DVGT2ZRFFIIHSJ2EJED65W6AELGWIDRMPMWNXCEBJ4FKX", mock.Anything, uint32(24784)).Return(uint32(0), io.EOF) + mockStore. + On("NextActive", accountId, mock.Anything, uint32(0)).Return(firstActiveChk, nil). + On("NextActive", accountId, mock.Anything, firstActiveChk-1).Return(firstActiveChk, nil). + On("NextActive", accountId, mock.Anything, firstActiveChk).Return(firstActiveChk, nil). + On("NextActive", accountId, mock.Anything, firstActiveChk+1).Return(firstActiveChk+1, nil). + On("NextActive", accountId, mock.Anything, firstActiveChk+2).Return(uint32(0), io.EOF) return mockArchive, mockStore } @@ -195,8 +193,7 @@ func testLedger(seq int) xdr.LedgerCloseMeta { } } -func testLedgerTx(source xdr.AccountId, bumpTos []int, txIndex uint32) archive.LedgerTransaction { - +func testLedgerTx(source xdr.AccountId, txIndex uint32, bumpTos ...int) archive.LedgerTransaction { ops := []xdr.Operation{} for _, bumpTo := range bumpTos { ops = append(ops, xdr.Operation{ @@ -224,3 +221,27 @@ func testLedgerTx(source xdr.AccountId, bumpTos []int, txIndex uint32) archive.L return tx } + +func newTransactionService(ctx context.Context) TransactionsService { + passphrase := "White New England clam chowder" + archive, store := mockArchiveAndIndex(ctx, passphrase) + return TransactionsService{ + Config: Config{ + Archive: archive, + IndexStore: store, + Passphrase: passphrase, + }, + } +} + +func newOperationService(ctx context.Context) OperationsService { + passphrase := "White New England clam chowder" + archive, store := mockArchiveAndIndex(ctx, passphrase) + return OperationsService{ + Config: Config{ + Archive: archive, + IndexStore: store, + Passphrase: passphrase, + }, + } +}