Skip to content

Commit

Permalink
Close the AMQP connection and create a new one when token changes.
Browse files Browse the repository at this point in the history
In the previous version, the shoveler would simple re-initialize the
connection when the token changes.  That lead to the possibility of a
panic when it closed an already closed connection.
  • Loading branch information
djw8605 committed May 3, 2022
1 parent 70e44f6 commit 5331585
Showing 1 changed file with 20 additions and 2 deletions.
22 changes: 20 additions & 2 deletions amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ func StartAMQP(config *Config, queue *ConfirmationQueue) {
select {
case <-triggerReconnect:
log.Debugln("Triggering reconnect")
amqpQueue.newConnection(*amqpURL)
amqpQueue, err = reconnectAmqp(amqpURL, amqpQueue)
if err != nil {
log.Errorln("Failed to reconnect to AMQP:", err)
}
case msg := <-messagesQueue:
// Handle a new message to put on the message queue
TryPush:
Expand All @@ -61,7 +64,10 @@ func StartAMQP(config *Config, queue *ConfirmationQueue) {
select {
case <-triggerReconnect:
log.Debugln("Triggering reconnect from within failure")
amqpQueue.newConnection(*amqpURL)
amqpQueue, err = reconnectAmqp(amqpURL, amqpQueue)
if err != nil {
log.Errorln("Failed to reconnect to AMQP:", err)
}
case <-time.After(time.Duration(randSleep) * time.Millisecond):
continue TryPush
}
Expand All @@ -73,6 +79,18 @@ func StartAMQP(config *Config, queue *ConfirmationQueue) {
}
}

// reconnectAmqp reconnects to AMQP if something fails or if the token changes.
// This is safer than just reconnecting, as it will ensure that
// resources from the previous connection are cleaned up.
func reconnectAmqp(amqpURL *url.URL, curSession *Session) (*Session, error) {
// close the current session
curSession.Close()

// Create a new session and return it
newSession := New(*amqpURL)
return newSession, nil
}

// Listen to the channel for messages
func CheckTokenFile(config *Config, tokenAge time.Time, triggerReconnect chan<- bool) {
// Create a timer to check for changes in the token file ever 10 seconds
Expand Down

0 comments on commit 5331585

Please sign in to comment.