Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Rename methods and functions in submission system #4298

Merged
merged 1 commit into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@ import (
"time"
)

// Queue manages the submission queue for a single source account. The
// AccountTxSubmissionQueue manages the submission queue for a single source account. The
// transaction system uses Push to enqueue submissions for given sequence
// numbers.
//
// Queue maintains a priority queue of pending submissions, and when updated
// (via the Update() method) with the current sequence number of the account
// AccountTxSubmissionQueue maintains a priority queue of pending submissions, and when updated
// (via the NotifyLastAccountSequence() method) with the current sequence number of the account
// being managed, queued submissions that can be acted upon will be unblocked.
//
type Queue struct {
type AccountTxSubmissionQueue struct {
lastActiveAt time.Time
timeout time.Duration
nextSequence uint64
queue pqueue
}

// NewQueue creates a new *Queue
func NewQueue() *Queue {
result := &Queue{
// NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue
func NewAccountTxSubmissionQueue() *AccountTxSubmissionQueue {
result := &AccountTxSubmissionQueue{
lastActiveAt: time.Now(),
timeout: 10 * time.Second,
queue: nil,
Expand All @@ -34,7 +34,7 @@ func NewQueue() *Queue {
}

// Size returns the count of currently buffered submissions in the queue.
func (q *Queue) Size() int {
func (q *AccountTxSubmissionQueue) Size() int {
return len(q.queue)
}

Expand All @@ -43,26 +43,26 @@ func (q *Queue) Size() int {
// to do so.
//
// Push does not perform any triggering (which
// occurs in Update(), even if the current sequence number for this queue is
// occurs in NotifyLastAccountSequence(), even if the current sequence number for this queue is
// the same as the provided sequence, to keep internal complexity much lower.
// Given that, the recommended usage pattern is:
//
// 1. Push the submission onto the queue
// 2. Load the current sequence number for the source account from the DB
// 3. Call Update() with the result from step 2 to trigger the submission if
// 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if
// possible
func (q *Queue) Push(sequence uint64) <-chan error {
func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error {
ch := make(chan error, 1)
heap.Push(&q.queue, item{sequence, ch})
return ch
}

// Update notifies the queue that the provided sequence number is the latest
// NotifyLastAccountSequence notifies the queue that the provided sequence number is the latest
// seen value for the account that this queue manages submissions for.
//
// This function is monotonic... calling it with a sequence number lower than
// the latest seen sequence number is a noop.
func (q *Queue) Update(sequence uint64) {
func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) {
if q.nextSequence <= sequence {
q.nextSequence = sequence + 1
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func (q *Queue) Update(sequence uint64) {
}

// helper function for interacting with the priority queue
func (q *Queue) head() (chan error, uint64) {
func (q *AccountTxSubmissionQueue) head() (chan error, uint64) {
if len(q.queue) == 0 {
return nil, uint64(0)
}
Expand All @@ -122,7 +122,7 @@ func (q *Queue) head() (chan error, uint64) {
}

// helper function for interacting with the priority queue
func (q *Queue) pop() (chan error, uint64) {
func (q *AccountTxSubmissionQueue) pop() (chan error, uint64) {
i := heap.Pop(&q.queue).(item)

return i.Chan, i.Sequence
Expand All @@ -134,7 +134,7 @@ type item struct {
Chan chan error
}

// pqueue is a priority queue used by Queue to manage buffered submissions. It
// pqueue is a priority queue used by AccountTxSubmissionQueue to manage buffered submissions. It
// implements heap.Interface.
type pqueue []item

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
type QueueTestSuite struct {
suite.Suite
ctx context.Context
queue *Queue
queue *AccountTxSubmissionQueue
}

func (suite *QueueTestSuite) SetupTest() {
suite.ctx = test.Context()
suite.queue = NewQueue()
suite.queue = NewAccountTxSubmissionQueue()
}

//Push adds the provided channel on to the priority queue
Expand All @@ -43,35 +43,35 @@ func (suite *QueueTestSuite) TestQueue_Push() {

// Tests the update method
func (suite *QueueTestSuite) TestQueue_Update() {
// Update removes sequences that are submittable or in the past
// NotifyLastAccountSequence removes sequences that are submittable or in the past
results := []<-chan error{
suite.queue.Push(1),
suite.queue.Push(2),
suite.queue.Push(3),
suite.queue.Push(4),
}

suite.queue.Update(2)
suite.queue.NotifyLastAccountSequence(2)

// the update above signifies that 2 is the accounts current sequence,
// meaning that 3 is submittable, and so only 4 should still be queued
assert.Equal(suite.T(), 1, suite.queue.Size())
_, s := suite.queue.head()
assert.Equal(suite.T(), uint64(4), s)

suite.queue.Update(4)
suite.queue.NotifyLastAccountSequence(4)
assert.Equal(suite.T(), 0, suite.queue.Size())

assert.Equal(suite.T(), ErrBadSequence, <-results[0])
assert.Equal(suite.T(), ErrBadSequence, <-results[1])
assert.Equal(suite.T(), nil, <-results[2])
assert.Equal(suite.T(), ErrBadSequence, <-results[3])

// Update clears the queue if the head has not been released within the time limit
// NotifyLastAccountSequence clears the queue if the head has not been released within the time limit
suite.queue.timeout = 1 * time.Millisecond
result := suite.queue.Push(2)
<-time.After(10 * time.Millisecond)
suite.queue.Update(0)
suite.queue.NotifyLastAccountSequence(0)

assert.Equal(suite.T(), 0, suite.queue.Size())
assert.Equal(suite.T(), ErrBadSequence, <-result)
Expand Down
10 changes: 5 additions & 5 deletions services/horizon/internal/txsub/sequence/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
type Manager struct {
mutex sync.Mutex
MaxSize int
queues map[string]*Queue
queues map[string]*AccountTxSubmissionQueue
}

// NewManager returns a new manager
func NewManager() *Manager {
return &Manager{
MaxSize: 1024, //TODO: make MaxSize configurable
queues: map[string]*Queue{},
queues: map[string]*AccountTxSubmissionQueue{},
}
}

Expand Down Expand Up @@ -70,15 +70,15 @@ func (m *Manager) Push(address string, sequence uint64) <-chan error {

aq, ok := m.queues[address]
if !ok {
aq = NewQueue()
aq = NewAccountTxSubmissionQueue()
m.queues[address] = aq
}

return aq.Push(sequence)
}

// Update notifies the manager of newly loaded account sequence information. The manager uses this information
// to notify requests to submit that they should proceed. See Queue#Update for the actual meat of the logic.
// to notify requests to submit that they should proceed. See AccountTxSubmissionQueue#NotifyLastAccountSequence for the actual meat of the logic.
func (m *Manager) Update(updates map[string]uint64) {
m.mutex.Lock()
defer m.mutex.Unlock()
Expand All @@ -89,7 +89,7 @@ func (m *Manager) Update(updates map[string]uint64) {
continue
}

queue.Update(seq)
queue.NotifyLastAccountSequence(seq)
if queue.Size() == 0 {
delete(m.queues, address)
}
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/txsub/sequence/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestManager_Push(t *testing.T) {
assert.Equal(t, 1, mgr.queues["2"].Size())
}

// Test the Update method
// Test the NotifyLastAccountSequence method
func TestManager_Update(t *testing.T) {
mgr := NewManager()
results := []<-chan error{
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/txsub/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (sub *submitter) Submit(ctx context.Context, env string) (result Submission
return
}

// interpet response
// interpret response
if cresp.IsException() {
result.Err = errors.Errorf("stellar-core exception: %s", cresp.Exception)
return
Expand Down