Skip to content

Commit

Permalink
Add STOMP protocol first implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
kazeborja committed Jan 19, 2022
1 parent ee1494d commit 08d1536
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 35 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,27 @@ docker or kubernetes. By default, the config is stored in `/etc/xrootd-monitori

Environment variables:

* SHOVELER_MQ
* SHOVELER_AMQP_TOKEN_LOCATION
* SHOVELER_AMQP_URL
* SHOVELER_AMQP_EXCHANGE
* SHOVELER_LISTEN_PORT
* SHOVELER_LISTEN_IP
* SHOVELER_VERIFY
* SHOVELER_QUEUE_DIRECTORY
* SHOVELER_STOMP_USER
* SHOVELER_STOMP_PASSWORD
* SHOVELER_STOMP_URL
* SHOVELER_STOMP_TOPIC

Message Bus Credentials
-----------------------

The shoveler uses a [JWT](https://jwt.io/) to authorize with the message bus. The token will be issued by an
When running using AMQP as the protocol to connect the shoveler uses a [JWT](https://jwt.io/) to authorize with the message bus. The token will be issued by an
automated process, but for now, long lived tokens are issued to sites.

On the other hand, if STOMP is the selected protocol user and password will need to be provided when configuring the shoveler.

Running the Shoveler
--------------------

Expand Down
69 changes: 47 additions & 22 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@ import (
)

type Config struct {
AmqpURL *url.URL // AMQP URL (password comes from the token)
AmqpExchange string // Exchange to shovel messages
AmqpToken string // File location of the token
ListenPort int
ListenIp string
DestUdp []string
Debug bool
Verify bool
MQ string // Which technology to use for the MQ connection
AmqpURL *url.URL // AMQP URL (password comes from the token)
AmqpExchange string // Exchange to shovel messages
AmqpToken string // File location of the token
ListenPort int
ListenIp string
DestUdp []string
Debug bool
Verify bool
StompUser string
StompPassword string
StompURL *url.URL
StompTopic string
}

func (c *Config) ReadConfig() {
Expand All @@ -38,24 +43,44 @@ func (c *Config) ReadConfig() {
// Look for environment variables with underscores
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

viper.SetDefault("amqp.exchange", "shoveled-xrd")
viper.SetDefault("amqp.token_location", "/etc/xrootd-monitoring-shoveler/token")
viper.SetDefault("mq", "amqp")
c.MQ = viper.GetString("mq")

// Get the AMQP URL
c.AmqpURL, err = url.Parse(viper.GetString("amqp.url"))
if err != nil {
panic(fmt.Errorf("Fatal error parsing AMQP URL: %s \n", err))
}
log.Debugln("AMQP URL:", c.AmqpURL.String())
if c.MQ == "amqp" {
viper.SetDefault("amqp.exchange", "shoveled-xrd")
viper.SetDefault("amqp.token_location", "/etc/xrootd-monitoring-shoveler/token")

// Get the AMQP URL
c.AmqpURL, err = url.Parse(viper.GetString("amqp.url"))
if err != nil {
panic(fmt.Errorf("Fatal error parsing AMQP URL: %s \n", err))
}
log.Debugln("AMQP URL:", c.AmqpURL.String())

// Get the AMQP Exchange
c.AmqpExchange = viper.GetString("amqp.exchange")
log.Debugln("AMQP Exchange:", c.AmqpExchange)

// Get the AMQP Exchange
c.AmqpExchange = viper.GetString("amqp.exchange")
log.Debugln("AMQP Exchange:", c.AmqpExchange)
// Get the Token location
c.AmqpToken = viper.GetString("amqp.token_location")
log.Debugln("AMQP Token location:", c.AmqpToken)
} else {
viper.SetDefault("stomp.topic", "xrootd.shoveler")

// Get the Token location
c.AmqpToken = viper.GetString("amqp.token_location")
log.Debugln("AMQP Token location:", c.AmqpToken)
c.StompUser = viper.GetString("stomp.user")
log.Debugln("STOMP User:", c.StompUser)
c.StompPassword = viper.GetString("stomp.password")

// Get the STOMP URL
c.StompURL, err = url.Parse(viper.GetString("stomp.url"))
if err != nil {
panic(fmt.Errorf("Fatal error parsing STOMP URL: %s \n", err))
}
log.Debugln("STOMP URL:", c.StompURL.String())

c.StompTopic = viper.GetString("stomp.topic")
log.Debugln("STOMP Topic:", c.StompTopic)
}
// Get the UDP listening parameters
viper.SetDefault("listen.port", 9993)
c.ListenPort = viper.GetInt("listen.port")
Expand Down
9 changes: 7 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ func main() {
// Start the message queue
cq := NewConfirmationQueue()

// Start the AMQP go func
go StartAMQP(&config, cq)
if config.MQ == "amqp" {
// Start the AMQP go func
go StartAMQP(&config, cq)
} else {
// Start the STOMP go func
go StartStomp(&config, cq)
}

// Start the metrics
StartMetrics()
Expand Down
47 changes: 37 additions & 10 deletions stomp.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package main

import (
stomp "github.com/go-stomp/stomp/v3"
"net/url"

stomp "github.com/go-stomp/stomp/v3"
log "github.com/sirupsen/logrus"
)

func StartStomp(config *Config, queue *ConfirmationQueue) error {

// TODO: Get the username, password, server, topic from the config
stompSession := NewStompConnection()
stompUser := config.StompUser
stompPassword := config.StompPassword
stompUrl := config.StompURL
stompTopic := config.StompTopic

stompSession := NewStompConnection(stompUser, stompPassword,
*stompUrl, stompTopic)

// Message loop, constantly be dequeing and sending the message
// No fancy stuff needed
Expand All @@ -22,32 +30,51 @@ func StartStomp(config *Config, queue *ConfirmationQueue) error {
return err
}
}
return nil

}

type StompSession struct {
Username string
Password string
username string
password string
stompUrl url.URL
Topic string
topic string
conn *stomp.Conn
}

func NewStompConnection() *StompSession {
return &StompSession{}
func NewStompConnection(username string, password string,
stompUrl url.URL, topic string) *StompSession {
session := StompSession{
username: username,
password: password,
stompUrl: stompUrl,
topic: topic,
}
go session.handleReconnect()
return &session
}

// handleReconnect reconnects to the stomp server
func (session *StompSession) handleReconnect() {
// Close the current session
session.conn.Disconnect()

// Start a new session
conn, err := stomp.Dial("tcp", session.stompUrl.String())
conn, err := stomp.Dial("tcp", session.stompUrl.String(),
stomp.ConnOpt.Login(session.username, session.password))
if err != nil {
log.Errorln("Failed to connect. Retrying:", err.Error)
}

session.conn = conn
}

// publish will send the message to the stomp message bus
// It will also handle any error in sending by calling handleReconnect
func (session *StompSession) publish(msg []byte) error {
err := session.conn.Send(
session.topic,
"text/plain",
msg)

return nil
return err
}

0 comments on commit 08d1536

Please sign in to comment.