Skip to content

Commit

Permalink
✨ add support for syslog tcp and tcp+tls protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
linkdd committed Oct 7, 2024
1 parent 1b47d36 commit 8cf601f
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 18 deletions.
4 changes: 3 additions & 1 deletion cmd/flowg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ var exitCode int = 0

var (
defaultHttpBindAddress = getEnv("FLOWG_HTTP_BIND_ADDRESS", ":5080")
defaultSyslogBindAddr = getEnv("FLOWG_SYSLOG_BIND_ADDRESS", ":5514")

defaultSyslogProtocol = getEnv("FLOWG_SYSLOG_PROTOCOL", "udp")
defaultSyslogBindAddr = getEnv("FLOWG_SYSLOG_BIND_ADDRESS", ":5514")

defaultAuthDir = getEnv("FLOWG_AUTH_DIR", "./data/auth")
defaultConfigDir = getEnv("FLOWG_CONFIG_DIR", "./data/config")
Expand Down
84 changes: 81 additions & 3 deletions cmd/flowg/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ type serveCommandOpts struct {
httpTlsCert string
httpTlsCertKey string

syslogBindAddr string
syslogProtocol string
syslogBindAddr string
syslogTlsEnabled bool
syslogTlsCert string
syslogTlsCertKey string
syslogTlsAuthEnabled bool

authDir string
logDir string
Expand All @@ -41,12 +46,15 @@ func NewServeCommand() *cobra.Command {
metrics.Setup()
},
Run: func(cmd *cobra.Command, args []string) {
var httpTlsConfig *tls.Config
var (
httpTlsConfig *tls.Config
syslogTlsConfig *tls.Config
)

if opts.httpTlsEnabled {
cert, err := tls.LoadX509KeyPair(opts.httpTlsCert, opts.httpTlsCertKey)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to load TLS certificate: %v", err)
fmt.Fprintf(os.Stderr, "Failed to load TLS certificate: %v\n", err)
exitCode = 1
return
}
Expand All @@ -56,11 +64,46 @@ func NewServeCommand() *cobra.Command {
}
}

if opts.syslogProtocol != "tcp" && opts.syslogProtocol != "udp" {
cmd.Usage()
fmt.Fprintf(os.Stderr, "\nERROR: Invalid syslog protocol: %s\n", opts.syslogProtocol)
exitCode = 1
return
}

if opts.syslogTlsEnabled && opts.syslogProtocol == "udp" {
cmd.Usage()
fmt.Fprintf(os.Stderr, "\nERROR: TLS is not supported for Syslog UDP protocol\n")
exitCode = 1
return
}

if opts.syslogTlsEnabled {
cert, err := tls.LoadX509KeyPair(opts.syslogTlsCert, opts.syslogTlsCertKey)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to load Syslog TLS certificate: %v\n", err)
exitCode = 1
return
}

clientAuth := tls.VerifyClientCertIfGiven
if opts.syslogTlsAuthEnabled {
clientAuth = tls.RequireAndVerifyClientCert
}

syslogTlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: clientAuth,
}
}

srv := server.NewServer(server.Options{
HttpBindAddress: opts.httpBindAddress,
HttpTlsConfig: httpTlsConfig,

SyslogTCP: opts.syslogProtocol == "tcp",
SyslogBindAddress: opts.syslogBindAddr,
SyslogTlsConfig: syslogTlsConfig,

ConfigStorageDir: opts.configDir,
AuthStorageDir: opts.authDir,
Expand Down Expand Up @@ -116,13 +159,48 @@ func NewServeCommand() *cobra.Command {
"Path to the certificate key file for the HTTPS server",
)

cmd.Flags().StringVar(
&opts.syslogProtocol,
"syslog-proto",
defaultSyslogProtocol,
"Protocol to use for the Syslog server (one of \"tcp\" or \"udp\")",
)

cmd.Flags().StringVar(
&opts.syslogBindAddr,
"syslog-bind",
defaultSyslogBindAddr,
"Address to bind the Syslog server to",
)

cmd.Flags().BoolVar(
&opts.syslogTlsEnabled,
"syslog-tls",
false,
"Enable TLS for the Syslog server (requires protocol to be \"tcp\")",
)

cmd.Flags().StringVar(
&opts.syslogTlsCert,
"syslog-tls-cert",
"",
"Path to the certificate file for the Syslog server",
)

cmd.Flags().StringVar(
&opts.syslogTlsCertKey,
"syslog-tls-key",
"",
"Path to the certificate key file for the Syslog server",
)

cmd.Flags().BoolVar(
&opts.syslogTlsAuthEnabled,
"syslog-tls-auth",
false,
"Require clients to authenticate against the Syslog server with a client certificate",
)

cmd.Flags().StringVar(
&opts.authDir,
"auth-dir",
Expand Down
4 changes: 4 additions & 0 deletions internal/app/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ type Options struct {
HttpBindAddress string
HttpTlsConfig *tls.Config

SyslogTCP bool
SyslogBindAddress string
SyslogTlsConfig *tls.Config

AuthStorageDir string
ConfigStorageDir string
Expand All @@ -37,7 +39,9 @@ func NewServer(opts Options) *Server {
opts.HttpBindAddress,
opts.HttpTlsConfig,

opts.SyslogTCP,
opts.SyslogBindAddress,
opts.SyslogTlsConfig,

storageLayer,
engineLayer,
Expand Down
5 changes: 5 additions & 0 deletions internal/app/server/service_layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ func newServiceLayer(
httpBindAddress string,
httpTlsConfig *tls.Config,

syslogTCP bool,
syslogBindAddress string,
syslogTlsConfig *tls.Config,

storageLayer *storageLayer,
engineLayer *engineLayer,
Expand All @@ -38,7 +40,10 @@ func newServiceLayer(
)

syslogServer := syslog.NewServer(
syslogTCP,
syslogBindAddress,
syslogTlsConfig,

storageLayer.configStorage,
engineLayer.pipelineRunner,
)
Expand Down
11 changes: 10 additions & 1 deletion internal/services/syslog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package syslog
import (
"log/slog"

"crypto/tls"

"github.com/vladopajic/go-actor/actor"

"link-society.com/flowg/internal/utils/sync"
Expand All @@ -17,7 +19,10 @@ type Server struct {
}

func NewServer(
isTCP bool,
bindAddress string,
tlsConfig *tls.Config,

configStorage *config.Storage,
pipelineRunner *pipelines.Runner,
) *Server {
Expand All @@ -27,7 +32,11 @@ func NewServer(
configStorage: configStorage,
pipelineRunner: pipelineRunner,

state: &workerStarting{bindAddress: bindAddress},
state: &workerStarting{
isTCP: isTCP,
bindAddress: bindAddress,
tlsConfig: tlsConfig,
},

startCond: sync.NewCondValue[error](),
stopCond: sync.NewCondValue[error](),
Expand Down
76 changes: 63 additions & 13 deletions internal/services/syslog/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package syslog

import (
"log/slog"

"crypto/tls"
"sync"

"github.com/vladopajic/go-actor/actor"
Expand All @@ -16,7 +18,9 @@ type workerState interface {
}

type workerStarting struct {
isTCP bool
bindAddress string
tlsConfig *tls.Config
}

type workerRunning struct {
Expand All @@ -37,33 +41,79 @@ func (s *workerStarting) DoWork(ctx actor.Context, worker *worker) workerState {
server.SetFormat(gosyslog.Automatic)
server.SetHandler(handler)

proto := "udp"
if s.isTCP {
proto = "tcp"
}

worker.logger.InfoContext(
ctx,
"Starting Syslog server",
slog.Group("udp",
slog.Group("syslog",
slog.String("proto", proto),
slog.String("bind", s.bindAddress),
slog.Bool("tls", s.tlsConfig != nil),
),
)

if err := server.ListenUDP(s.bindAddress); err != nil {
worker.logger.ErrorContext(
ctx,
"Failed to listen on UDP",
slog.Group("udp",
slog.String("bind", s.bindAddress),
),
slog.String("error", err.Error()),
)
worker.startCond.Broadcast(err)
return nil
switch {
case s.isTCP && s.tlsConfig != nil:
if err := server.ListenTCPTLS(s.bindAddress, s.tlsConfig); err != nil {
worker.logger.ErrorContext(
ctx,
"Failed to listen on TCP+TLS",
slog.Group("syslog",
slog.String("proto", proto),
slog.String("bind", s.bindAddress),
slog.Bool("tls", s.tlsConfig != nil),
),
slog.String("error", err.Error()),
)
worker.startCond.Broadcast(err)
return nil
}

case s.isTCP && s.tlsConfig == nil:
if err := server.ListenTCP(s.bindAddress); err != nil {
worker.logger.ErrorContext(
ctx,
"Failed to listen on TCP",
slog.Group("syslog",
slog.String("proto", proto),
slog.String("bind", s.bindAddress),
slog.Bool("tls", s.tlsConfig != nil),
),
slog.String("error", err.Error()),
)
worker.startCond.Broadcast(err)
return nil
}

case !s.isTCP:
if err := server.ListenUDP(s.bindAddress); err != nil {
worker.logger.ErrorContext(
ctx,
"Failed to listen on UDP",
slog.Group("syslog",
slog.String("proto", proto),
slog.String("bind", s.bindAddress),
slog.Bool("tls", s.tlsConfig != nil),
),
slog.String("error", err.Error()),
)
worker.startCond.Broadcast(err)
return nil
}
}

if err := server.Boot(); err != nil {
worker.logger.ErrorContext(
ctx,
"Failed to boot server",
slog.Group("udp",
slog.Group("syslog",
slog.String("proto", proto),
slog.String("bind", s.bindAddress),
slog.Bool("tls", s.tlsConfig != nil),
),
slog.String("error", err.Error()),
)
Expand Down

0 comments on commit 8cf601f

Please sign in to comment.