Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

full node streaming - reusing subscription ids #2518

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type FullNodeStreamingManagerImpl struct {

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
orderbookSubscriptions map[uint32]*OrderbookSubscription
nextSubscriptionId uint32
activeSubscriptionIds map[uint32]bool

// stream will batch and flush out messages every 10 ms.
ticker *time.Ticker
Expand Down Expand Up @@ -106,7 +106,7 @@ func NewFullNodeStreamingManager(
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
logger: logger,
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,
activeSubscriptionIds: make(map[uint32]bool),

ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
Expand Down Expand Up @@ -170,6 +170,16 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
}
}

// getNextAvailableSubscriptionId returns next available subscription id. Assumes the
// lock has been acquired.
func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 {
id := uint32(0)
for _, inUse := sm.activeSubscriptionIds[id]; inUse; _, inUse = sm.activeSubscriptionIds[id] {
id = id + uint32(1)
}
return id
}
Comment on lines +175 to +181
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix the incorrect for loop in getNextAvailableSubscriptionId

The getNextAvailableSubscriptionId function contains a for loop with incorrect syntax and logic, which may lead to runtime errors or unintended behavior.

The current for loop:

func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 {
    id := uint32(0)
    for _, inUse := sm.activeSubscriptionIds[id]; inUse; _, inUse = sm.activeSubscriptionIds[id] {
        id = id + uint32(1)
    }
    return id
}

Issues with the loop:

  • The for loop syntax is unconventional and may not compile or behave as expected.
  • The assignment and condition checks within the loop are not correctly structured.

Proposed Fix:

Refactor the for loop to correctly iterate until an unused subscriptionId is found.

Apply this diff to fix the loop:

 func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 {
     id := uint32(0)
-    for _, inUse := sm.activeSubscriptionIds[id]; inUse; _, inUse = sm.activeSubscriptionIds[id] {
-        id = id + uint32(1)
+    for {
+        if _, inUse := sm.activeSubscriptionIds[id]; !inUse {
+            break
+        }
+        id++
     }
     return id
 }


// Subscribe subscribes to the orderbook updates stream.
func (sm *FullNodeStreamingManagerImpl) Subscribe(
clobPairIds []uint32,
Expand All @@ -188,8 +198,11 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
for i, subaccountId := range subaccountIds {
sIds[i] = *subaccountId
}

subscriptionId := sm.getNextAvailableSubscriptionId()

subscription := &OrderbookSubscription{
subscriptionId: sm.nextSubscriptionId,
subscriptionId: subscriptionId,
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: sIds,
Expand All @@ -204,7 +217,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append(
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
sm.nextSubscriptionId,
subscription.subscriptionId,
)
}
for _, subaccountId := range sIds {
Expand All @@ -215,7 +228,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
sm.subaccountIdToSubscriptionIdMapping[subaccountId] = append(
sm.subaccountIdToSubscriptionIdMapping[subaccountId],
sm.nextSubscriptionId,
subscription.subscriptionId,
)
}

Expand All @@ -228,7 +241,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
),
)
sm.orderbookSubscriptions[subscription.subscriptionId] = subscription
sm.nextSubscriptionId++
sm.activeSubscriptionIds[subscription.subscriptionId] = true
sm.EmitMetrics()
sm.Unlock()

Expand Down Expand Up @@ -280,6 +293,7 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription(
}
close(subscription.updatesChannel)
delete(sm.orderbookSubscriptions, subscriptionIdToRemove)
delete(sm.activeSubscriptionIds, subscriptionIdToRemove)

// Iterate over the clobPairIdToSubscriptionIdMapping to remove the subscriptionIdToRemove
for pairId, subscriptionIds := range sm.clobPairIdToSubscriptionIdMapping {
Expand Down
Loading