Skip to content

Commit

Permalink
Merge pull request #2 from opensciencegrid/new-queue
Browse files Browse the repository at this point in the history
Adding new queue impelentation that uses memory and disk
  • Loading branch information
djw8605 authored Dec 16, 2021
2 parents 6bd5c62 + d8f40ef commit 83379e3
Show file tree
Hide file tree
Showing 11 changed files with 556 additions and 236 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
strategy:
matrix:
go-version: [1.17.x]
os: [ubuntu-latest, macos-latest, windows-latest]
os: [ubuntu-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
- name: Install Go
Expand Down
114 changes: 80 additions & 34 deletions amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package main
import (
"errors"
"io/ioutil"
"math/rand"
"net/url"
"os"
"strings"
"time"

queue "github.com/opensciencegrid/xrootd-monitoring-shoveler/queue"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)

// This should run in a new go co-routine.
func StartAMQP(config *Config, msgQueue *queue.MessageQueue) {
func StartAMQP(config *Config, queue *ConfirmationQueue) {

// Get the configuration URL
amqpURL := config.AmqpURL
Expand All @@ -31,47 +31,90 @@ func StartAMQP(config *Config, msgQueue *queue.MessageQueue) {
amqpURL.User = url.UserPassword("shoveler", tokenContents)
amqpQueue := New(*amqpURL)

// Create a timer to check for changes in the token file ever 10 seconds
checkTokenFile := time.NewTicker(10 * time.Second)
// Constantly check for new messages
messagesQueue := make(chan []byte)
triggerReconnect := make(chan bool)
go readMsg(messagesQueue, queue)

go CheckTokenFile(config, tokenAge, triggerReconnect)

// Listen to the channel for messages
for {
select {
case <-checkTokenFile.C:
log.Debugln("Checking the age of the token file...")
// Recheck the age of the token file
tokenStat, err := os.Stat(config.AmqpToken)
if err != nil {
log.Fatalln("Failed to stat token file", config.AmqpToken, "error:", err)
}
newTokenAge := tokenStat.ModTime()
if newTokenAge.After(tokenAge) {
tokenAge = newTokenAge
log.Debugln("Token file was updated, recreating AMQP connection...")
// New Token, reload the connection
tokenContents, err := readToken(config.AmqpToken)
case <-triggerReconnect:
log.Debugln("Triggering reconnect")
amqpQueue.newConnection(*amqpURL)
case msg := <-messagesQueue:
// Handle a new message to put on the message queue
TryPush:
for {
err = amqpQueue.Push(config.AmqpExchange, msg)
if err != nil {
log.Fatalln("Failed to read token, cannot recover")
}

// Set the username/password
amqpURL.User = url.UserPassword("shoveler", tokenContents)
amqpQueue.newConnection(*amqpURL)
// How to handle a failure to push?
// The UnsafePush function already should have tried to reconnect
log.Errorln("Failed to push message:", err)
// Try again in 1 second
// Sleep for random amount between 1 and 5 seconds
// Watch for new token files
randSleep := rand.Intn(4000) + 1000
log.Debugln("Sleeping for", randSleep/1000, "seconds")
select {
case <-triggerReconnect:
log.Debugln("Triggering reconnect from within failure")
amqpQueue.newConnection(*amqpURL)
case <-time.After(time.Duration(randSleep) * time.Millisecond):
continue TryPush
}

}
break TryPush
}
}
}
}

case msg := <-msgQueue.Receive:
// Handle a new message to put on the message queue
err = amqpQueue.UnsafePush(config.AmqpExchange, msg)
// Listen to the channel for messages
func CheckTokenFile(config *Config, tokenAge time.Time, triggerReconnect chan<- bool) {
// Create a timer to check for changes in the token file ever 10 seconds
amqpURL := config.AmqpURL
checkTokenFile := time.NewTicker(10 * time.Second)
for {
<-checkTokenFile.C
log.Debugln("Checking the age of the token file...")
// Recheck the age of the token file
tokenStat, err := os.Stat(config.AmqpToken)
if err != nil {
log.Fatalln("Failed to stat token file", config.AmqpToken, "error:", err)
}
newTokenAge := tokenStat.ModTime()
if newTokenAge.After(tokenAge) {
tokenAge = newTokenAge
log.Debugln("Token file was updated, recreating AMQP connection...")
// New Token, reload the connection
tokenContents, err := readToken(config.AmqpToken)
if err != nil {
// How to handle a failure to push?
// The UnsafePush function already should have tried to reconnect
log.Errorln("Failed to push message:", err)
log.Fatalln("Failed to read token, cannot recover")
}

// Set the username/password
amqpURL.User = url.UserPassword("shoveler", tokenContents)
triggerReconnect <- true

}

}
}

// Read a message from the queue
func readMsg(messagesQueue chan<- []byte, queue *ConfirmationQueue) {
for {
msg, err := queue.Dequeue()
if err != nil {
log.Errorln("Failed to read from queue:", err)
continue
}
messagesQueue <- msg
}
}

// Read the token from the token location
Expand Down Expand Up @@ -131,7 +174,10 @@ func New(url url.URL) *Session {
// newConnection will close the current connection, cleaning
// up the go-routines and connections. Then attempt to reconnect
func (session *Session) newConnection(url url.URL) {
session.Close()
err := session.Close()
if err != nil {
log.Errorln("Failed to close session:", err)
}
session.url = url
go session.handleReconnect()
}
Expand Down Expand Up @@ -198,11 +244,11 @@ func (session *Session) handleReInit(conn *amqp.Connection) bool {
select {
case <-session.done:
return true
case <-session.notifyConnClose:
log.Warningln("Connection closed. Reconnecting...")
case err := <-session.notifyConnClose:
log.Warningln("Connection closed. Reconnecting...", err)
return false
case <-session.notifyChanClose:
log.Warningln("Channel closed. Re-running init...")
case err := <-session.notifyChanClose:
log.Warningln("Channel closed. Re-running init...", err)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/createtoken/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
func main() {

hoursPtr := flag.Int("hours", 1, "Number of hours the token should be valid")
exchangePtr := flag.String("exchange", "shoveled-xrd", "Exchange to set")

flag.Parse()
// Read in the private key from the command line
Expand All @@ -39,7 +40,7 @@ func main() {

// Create the Claims
claims := MyCustomClaims{
"my_rabbit_server.write:xrd-mon/shoveled-xrd",
"my_rabbit_server.write:xrd-mon/" + *exchangePtr + " my_rabbit_server.read:xrd-mon/" + *exchangePtr + " my_rabbit_server.configure:xrd-mon/" + *exchangePtr,
jwt.StandardClaims{
ExpiresAt: time.Now().Add(time.Hour * time.Duration(*hoursPtr)).Unix(),
Issuer: "test",
Expand Down
8 changes: 6 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ go 1.15

require (
github.com/golang-jwt/jwt v3.2.1+incompatible
github.com/joncrlsn/dque v0.0.0-20211108142734-c2ef48c5192a
github.com/prometheus/client_golang v1.11.0
github.com/sirupsen/logrus v1.6.0
github.com/spf13/viper v1.8.1
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/sirupsen/logrus v1.8.1
github.com/spf13/viper v1.10.0
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.7.0
golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed // indirect
)
Loading

0 comments on commit 83379e3

Please sign in to comment.