diff --git a/cmd/flowg/main.go b/cmd/flowg/main.go index 4ebfffe7..bde00532 100644 --- a/cmd/flowg/main.go +++ b/cmd/flowg/main.go @@ -15,7 +15,9 @@ var exitCode int = 0 var ( defaultHttpBindAddress = getEnv("FLOWG_HTTP_BIND_ADDRESS", ":5080") - defaultSyslogBindAddr = getEnv("FLOWG_SYSLOG_BIND_ADDRESS", ":5514") + + defaultSyslogProtocol = getEnv("FLOWG_SYSLOG_PROTOCOL", "udp") + defaultSyslogBindAddr = getEnv("FLOWG_SYSLOG_BIND_ADDRESS", ":5514") defaultAuthDir = getEnv("FLOWG_AUTH_DIR", "./data/auth") defaultConfigDir = getEnv("FLOWG_CONFIG_DIR", "./data/config") diff --git a/cmd/flowg/serve.go b/cmd/flowg/serve.go index bb8a70a2..92858e77 100644 --- a/cmd/flowg/serve.go +++ b/cmd/flowg/serve.go @@ -22,7 +22,12 @@ type serveCommandOpts struct { httpTlsCert string httpTlsCertKey string - syslogBindAddr string + syslogProtocol string + syslogBindAddr string + syslogTlsEnabled bool + syslogTlsCert string + syslogTlsCertKey string + syslogTlsAuthEnabled bool authDir string logDir string @@ -41,12 +46,15 @@ func NewServeCommand() *cobra.Command { metrics.Setup() }, Run: func(cmd *cobra.Command, args []string) { - var httpTlsConfig *tls.Config + var ( + httpTlsConfig *tls.Config + syslogTlsConfig *tls.Config + ) if opts.httpTlsEnabled { cert, err := tls.LoadX509KeyPair(opts.httpTlsCert, opts.httpTlsCertKey) if err != nil { - fmt.Fprintf(os.Stderr, "Failed to load TLS certificate: %v", err) + fmt.Fprintf(os.Stderr, "Failed to load TLS certificate: %v\n", err) exitCode = 1 return } @@ -56,11 +64,46 @@ func NewServeCommand() *cobra.Command { } } + if opts.syslogProtocol != "tcp" && opts.syslogProtocol != "udp" { + cmd.Usage() + fmt.Fprintf(os.Stderr, "\nERROR: Invalid syslog protocol: %s\n", opts.syslogProtocol) + exitCode = 1 + return + } + + if opts.syslogTlsEnabled && opts.syslogProtocol == "udp" { + cmd.Usage() + fmt.Fprintf(os.Stderr, "\nERROR: TLS is not supported for Syslog UDP protocol\n") + exitCode = 1 + return + } + + if opts.syslogTlsEnabled { + cert, err := tls.LoadX509KeyPair(opts.syslogTlsCert, opts.syslogTlsCertKey) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to load Syslog TLS certificate: %v\n", err) + exitCode = 1 + return + } + + clientAuth := tls.VerifyClientCertIfGiven + if opts.syslogTlsAuthEnabled { + clientAuth = tls.RequireAndVerifyClientCert + } + + syslogTlsConfig = &tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientAuth: clientAuth, + } + } + srv := server.NewServer(server.Options{ HttpBindAddress: opts.httpBindAddress, HttpTlsConfig: httpTlsConfig, + SyslogTCP: opts.syslogProtocol == "tcp", SyslogBindAddress: opts.syslogBindAddr, + SyslogTlsConfig: syslogTlsConfig, ConfigStorageDir: opts.configDir, AuthStorageDir: opts.authDir, @@ -116,6 +159,13 @@ func NewServeCommand() *cobra.Command { "Path to the certificate key file for the HTTPS server", ) + cmd.Flags().StringVar( + &opts.syslogProtocol, + "syslog-proto", + defaultSyslogProtocol, + "Protocol to use for the Syslog server (one of \"tcp\" or \"udp\")", + ) + cmd.Flags().StringVar( &opts.syslogBindAddr, "syslog-bind", @@ -123,6 +173,34 @@ func NewServeCommand() *cobra.Command { "Address to bind the Syslog server to", ) + cmd.Flags().BoolVar( + &opts.syslogTlsEnabled, + "syslog-tls", + false, + "Enable TLS for the Syslog server (requires protocol to be \"tcp\")", + ) + + cmd.Flags().StringVar( + &opts.syslogTlsCert, + "syslog-tls-cert", + "", + "Path to the certificate file for the Syslog server", + ) + + cmd.Flags().StringVar( + &opts.syslogTlsCertKey, + "syslog-tls-key", + "", + "Path to the certificate key file for the Syslog server", + ) + + cmd.Flags().BoolVar( + &opts.syslogTlsAuthEnabled, + "syslog-tls-auth", + false, + "Require clients to authenticate against the Syslog server with a client certificate", + ) + cmd.Flags().StringVar( &opts.authDir, "auth-dir", diff --git a/internal/app/server/main.go b/internal/app/server/main.go index 5221d7c6..27c1ca2d 100644 --- a/internal/app/server/main.go +++ b/internal/app/server/main.go @@ -12,7 +12,9 @@ type Options struct { HttpBindAddress string HttpTlsConfig *tls.Config + SyslogTCP bool SyslogBindAddress string + SyslogTlsConfig *tls.Config AuthStorageDir string ConfigStorageDir string @@ -37,7 +39,9 @@ func NewServer(opts Options) *Server { opts.HttpBindAddress, opts.HttpTlsConfig, + opts.SyslogTCP, opts.SyslogBindAddress, + opts.SyslogTlsConfig, storageLayer, engineLayer, diff --git a/internal/app/server/service_layer.go b/internal/app/server/service_layer.go index 7c4262bd..f0ff26d9 100644 --- a/internal/app/server/service_layer.go +++ b/internal/app/server/service_layer.go @@ -22,7 +22,9 @@ func newServiceLayer( httpBindAddress string, httpTlsConfig *tls.Config, + syslogTCP bool, syslogBindAddress string, + syslogTlsConfig *tls.Config, storageLayer *storageLayer, engineLayer *engineLayer, @@ -38,7 +40,10 @@ func newServiceLayer( ) syslogServer := syslog.NewServer( + syslogTCP, syslogBindAddress, + syslogTlsConfig, + storageLayer.configStorage, engineLayer.pipelineRunner, ) diff --git a/internal/services/syslog/main.go b/internal/services/syslog/main.go index c12be272..33b5de25 100644 --- a/internal/services/syslog/main.go +++ b/internal/services/syslog/main.go @@ -3,6 +3,8 @@ package syslog import ( "log/slog" + "crypto/tls" + "github.com/vladopajic/go-actor/actor" "link-society.com/flowg/internal/utils/sync" @@ -17,7 +19,10 @@ type Server struct { } func NewServer( + isTCP bool, bindAddress string, + tlsConfig *tls.Config, + configStorage *config.Storage, pipelineRunner *pipelines.Runner, ) *Server { @@ -27,7 +32,11 @@ func NewServer( configStorage: configStorage, pipelineRunner: pipelineRunner, - state: &workerStarting{bindAddress: bindAddress}, + state: &workerStarting{ + isTCP: isTCP, + bindAddress: bindAddress, + tlsConfig: tlsConfig, + }, startCond: sync.NewCondValue[error](), stopCond: sync.NewCondValue[error](), diff --git a/internal/services/syslog/states.go b/internal/services/syslog/states.go index c7c2c2d0..314ba6d2 100644 --- a/internal/services/syslog/states.go +++ b/internal/services/syslog/states.go @@ -2,6 +2,8 @@ package syslog import ( "log/slog" + + "crypto/tls" "sync" "github.com/vladopajic/go-actor/actor" @@ -16,7 +18,9 @@ type workerState interface { } type workerStarting struct { + isTCP bool bindAddress string + tlsConfig *tls.Config } type workerRunning struct { @@ -37,33 +41,79 @@ func (s *workerStarting) DoWork(ctx actor.Context, worker *worker) workerState { server.SetFormat(gosyslog.Automatic) server.SetHandler(handler) + proto := "udp" + if s.isTCP { + proto = "tcp" + } + worker.logger.InfoContext( ctx, "Starting Syslog server", - slog.Group("udp", + slog.Group("syslog", + slog.String("proto", proto), slog.String("bind", s.bindAddress), + slog.Bool("tls", s.tlsConfig != nil), ), ) - if err := server.ListenUDP(s.bindAddress); err != nil { - worker.logger.ErrorContext( - ctx, - "Failed to listen on UDP", - slog.Group("udp", - slog.String("bind", s.bindAddress), - ), - slog.String("error", err.Error()), - ) - worker.startCond.Broadcast(err) - return nil + switch { + case s.isTCP && s.tlsConfig != nil: + if err := server.ListenTCPTLS(s.bindAddress, s.tlsConfig); err != nil { + worker.logger.ErrorContext( + ctx, + "Failed to listen on TCP+TLS", + slog.Group("syslog", + slog.String("proto", proto), + slog.String("bind", s.bindAddress), + slog.Bool("tls", s.tlsConfig != nil), + ), + slog.String("error", err.Error()), + ) + worker.startCond.Broadcast(err) + return nil + } + + case s.isTCP && s.tlsConfig == nil: + if err := server.ListenTCP(s.bindAddress); err != nil { + worker.logger.ErrorContext( + ctx, + "Failed to listen on TCP", + slog.Group("syslog", + slog.String("proto", proto), + slog.String("bind", s.bindAddress), + slog.Bool("tls", s.tlsConfig != nil), + ), + slog.String("error", err.Error()), + ) + worker.startCond.Broadcast(err) + return nil + } + + case !s.isTCP: + if err := server.ListenUDP(s.bindAddress); err != nil { + worker.logger.ErrorContext( + ctx, + "Failed to listen on UDP", + slog.Group("syslog", + slog.String("proto", proto), + slog.String("bind", s.bindAddress), + slog.Bool("tls", s.tlsConfig != nil), + ), + slog.String("error", err.Error()), + ) + worker.startCond.Broadcast(err) + return nil + } } if err := server.Boot(); err != nil { worker.logger.ErrorContext( ctx, "Failed to boot server", - slog.Group("udp", + slog.Group("syslog", + slog.String("proto", proto), slog.String("bind", s.bindAddress), + slog.Bool("tls", s.tlsConfig != nil), ), slog.String("error", err.Error()), )