Skip to content

Commit

Permalink
Rename methods and functions in submission system (#4298)
Browse files Browse the repository at this point in the history
It's the first time I get into this code and the naming has made it
hard to understand. So, I decided to make the naming more
understandable.
  • Loading branch information
2opremio authored Mar 21, 2022
1 parent 105173d commit 7df8601
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@ import (
"time"
)

// Queue manages the submission queue for a single source account. The
// AccountTxSubmissionQueue manages the submission queue for a single source account. The
// transaction system uses Push to enqueue submissions for given sequence
// numbers.
//
// Queue maintains a priority queue of pending submissions, and when updated
// (via the Update() method) with the current sequence number of the account
// AccountTxSubmissionQueue maintains a priority queue of pending submissions, and when updated
// (via the NotifyLastAccountSequence() method) with the current sequence number of the account
// being managed, queued submissions that can be acted upon will be unblocked.
//
type Queue struct {
type AccountTxSubmissionQueue struct {
lastActiveAt time.Time
timeout time.Duration
nextSequence uint64
queue pqueue
}

// NewQueue creates a new *Queue
func NewQueue() *Queue {
result := &Queue{
// NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue
func NewAccountTxSubmissionQueue() *AccountTxSubmissionQueue {
result := &AccountTxSubmissionQueue{
lastActiveAt: time.Now(),
timeout: 10 * time.Second,
queue: nil,
Expand All @@ -34,7 +34,7 @@ func NewQueue() *Queue {
}

// Size returns the count of currently buffered submissions in the queue.
func (q *Queue) Size() int {
func (q *AccountTxSubmissionQueue) Size() int {
return len(q.queue)
}

Expand All @@ -43,26 +43,26 @@ func (q *Queue) Size() int {
// to do so.
//
// Push does not perform any triggering (which
// occurs in Update(), even if the current sequence number for this queue is
// occurs in NotifyLastAccountSequence(), even if the current sequence number for this queue is
// the same as the provided sequence, to keep internal complexity much lower.
// Given that, the recommended usage pattern is:
//
// 1. Push the submission onto the queue
// 2. Load the current sequence number for the source account from the DB
// 3. Call Update() with the result from step 2 to trigger the submission if
// 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if
// possible
func (q *Queue) Push(sequence uint64) <-chan error {
func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error {
ch := make(chan error, 1)
heap.Push(&q.queue, item{sequence, ch})
return ch
}

// Update notifies the queue that the provided sequence number is the latest
// NotifyLastAccountSequence notifies the queue that the provided sequence number is the latest
// seen value for the account that this queue manages submissions for.
//
// This function is monotonic... calling it with a sequence number lower than
// the latest seen sequence number is a noop.
func (q *Queue) Update(sequence uint64) {
func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) {
if q.nextSequence <= sequence {
q.nextSequence = sequence + 1
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func (q *Queue) Update(sequence uint64) {
}

// helper function for interacting with the priority queue
func (q *Queue) head() (chan error, uint64) {
func (q *AccountTxSubmissionQueue) head() (chan error, uint64) {
if len(q.queue) == 0 {
return nil, uint64(0)
}
Expand All @@ -122,7 +122,7 @@ func (q *Queue) head() (chan error, uint64) {
}

// helper function for interacting with the priority queue
func (q *Queue) pop() (chan error, uint64) {
func (q *AccountTxSubmissionQueue) pop() (chan error, uint64) {
i := heap.Pop(&q.queue).(item)

return i.Chan, i.Sequence
Expand All @@ -134,7 +134,7 @@ type item struct {
Chan chan error
}

// pqueue is a priority queue used by Queue to manage buffered submissions. It
// pqueue is a priority queue used by AccountTxSubmissionQueue to manage buffered submissions. It
// implements heap.Interface.
type pqueue []item

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
type QueueTestSuite struct {
suite.Suite
ctx context.Context
queue *Queue
queue *AccountTxSubmissionQueue
}

func (suite *QueueTestSuite) SetupTest() {
suite.ctx = test.Context()
suite.queue = NewQueue()
suite.queue = NewAccountTxSubmissionQueue()
}

//Push adds the provided channel on to the priority queue
Expand All @@ -43,35 +43,35 @@ func (suite *QueueTestSuite) TestQueue_Push() {

// Tests the update method
func (suite *QueueTestSuite) TestQueue_Update() {
// Update removes sequences that are submittable or in the past
// NotifyLastAccountSequence removes sequences that are submittable or in the past
results := []<-chan error{
suite.queue.Push(1),
suite.queue.Push(2),
suite.queue.Push(3),
suite.queue.Push(4),
}

suite.queue.Update(2)
suite.queue.NotifyLastAccountSequence(2)

// the update above signifies that 2 is the accounts current sequence,
// meaning that 3 is submittable, and so only 4 should still be queued
assert.Equal(suite.T(), 1, suite.queue.Size())
_, s := suite.queue.head()
assert.Equal(suite.T(), uint64(4), s)

suite.queue.Update(4)
suite.queue.NotifyLastAccountSequence(4)
assert.Equal(suite.T(), 0, suite.queue.Size())

assert.Equal(suite.T(), ErrBadSequence, <-results[0])
assert.Equal(suite.T(), ErrBadSequence, <-results[1])
assert.Equal(suite.T(), nil, <-results[2])
assert.Equal(suite.T(), ErrBadSequence, <-results[3])

// Update clears the queue if the head has not been released within the time limit
// NotifyLastAccountSequence clears the queue if the head has not been released within the time limit
suite.queue.timeout = 1 * time.Millisecond
result := suite.queue.Push(2)
<-time.After(10 * time.Millisecond)
suite.queue.Update(0)
suite.queue.NotifyLastAccountSequence(0)

assert.Equal(suite.T(), 0, suite.queue.Size())
assert.Equal(suite.T(), ErrBadSequence, <-result)
Expand Down
10 changes: 5 additions & 5 deletions services/horizon/internal/txsub/sequence/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
type Manager struct {
mutex sync.Mutex
MaxSize int
queues map[string]*Queue
queues map[string]*AccountTxSubmissionQueue
}

// NewManager returns a new manager
func NewManager() *Manager {
return &Manager{
MaxSize: 1024, //TODO: make MaxSize configurable
queues: map[string]*Queue{},
queues: map[string]*AccountTxSubmissionQueue{},
}
}

Expand Down Expand Up @@ -70,15 +70,15 @@ func (m *Manager) Push(address string, sequence uint64) <-chan error {

aq, ok := m.queues[address]
if !ok {
aq = NewQueue()
aq = NewAccountTxSubmissionQueue()
m.queues[address] = aq
}

return aq.Push(sequence)
}

// Update notifies the manager of newly loaded account sequence information. The manager uses this information
// to notify requests to submit that they should proceed. See Queue#Update for the actual meat of the logic.
// to notify requests to submit that they should proceed. See AccountTxSubmissionQueue#NotifyLastAccountSequence for the actual meat of the logic.
func (m *Manager) Update(updates map[string]uint64) {
m.mutex.Lock()
defer m.mutex.Unlock()
Expand All @@ -89,7 +89,7 @@ func (m *Manager) Update(updates map[string]uint64) {
continue
}

queue.Update(seq)
queue.NotifyLastAccountSequence(seq)
if queue.Size() == 0 {
delete(m.queues, address)
}
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/txsub/sequence/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestManager_Push(t *testing.T) {
assert.Equal(t, 1, mgr.queues["2"].Size())
}

// Test the Update method
// Test the NotifyLastAccountSequence method
func TestManager_Update(t *testing.T) {
mgr := NewManager()
results := []<-chan error{
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/txsub/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (sub *submitter) Submit(ctx context.Context, env string) (result Submission
return
}

// interpet response
// interpret response
if cresp.IsException() {
result.Err = errors.Errorf("stellar-core exception: %s", cresp.Exception)
return
Expand Down

0 comments on commit 7df8601

Please sign in to comment.