Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add support for Syslog tcp and tcp+tls protocols #150

Merged
merged 4 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ tasks:
dir: ./website
cmds:
- npm i
- npm start
- npm start

release:
desc: "Update version number"
cmds:
- echo {{.CLI_ARGS}} > VERSION.txt
- yq eval '.version = "{{.CLI_ARGS}}"' -i k8s/charts/flowg/Chart.yaml
- yq eval '.appVersion = "{{.CLI_ARGS}}"' -i k8s/charts/flowg/Chart.yaml
2 changes: 1 addition & 1 deletion VERSION.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.18.0
v0.19.0
4 changes: 3 additions & 1 deletion cmd/flowg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ var exitCode int = 0

var (
defaultHttpBindAddress = getEnv("FLOWG_HTTP_BIND_ADDRESS", ":5080")
defaultSyslogBindAddr = getEnv("FLOWG_SYSLOG_BIND_ADDRESS", ":5514")

defaultSyslogProtocol = getEnv("FLOWG_SYSLOG_PROTOCOL", "udp")
defaultSyslogBindAddr = getEnv("FLOWG_SYSLOG_BIND_ADDRESS", ":5514")

defaultAuthDir = getEnv("FLOWG_AUTH_DIR", "./data/auth")
defaultConfigDir = getEnv("FLOWG_CONFIG_DIR", "./data/config")
Expand Down
84 changes: 81 additions & 3 deletions cmd/flowg/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ type serveCommandOpts struct {
httpTlsCert string
httpTlsCertKey string

syslogBindAddr string
syslogProtocol string
syslogBindAddr string
syslogTlsEnabled bool
syslogTlsCert string
syslogTlsCertKey string
syslogTlsAuthEnabled bool

authDir string
logDir string
Expand All @@ -41,12 +46,15 @@ func NewServeCommand() *cobra.Command {
metrics.Setup()
},
Run: func(cmd *cobra.Command, args []string) {
var httpTlsConfig *tls.Config
var (
httpTlsConfig *tls.Config
syslogTlsConfig *tls.Config
)

if opts.httpTlsEnabled {
cert, err := tls.LoadX509KeyPair(opts.httpTlsCert, opts.httpTlsCertKey)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to load TLS certificate: %v", err)
fmt.Fprintf(os.Stderr, "Failed to load TLS certificate: %v\n", err)
exitCode = 1
return
}
Expand All @@ -56,11 +64,46 @@ func NewServeCommand() *cobra.Command {
}
}

if opts.syslogProtocol != "tcp" && opts.syslogProtocol != "udp" {
cmd.Usage()
fmt.Fprintf(os.Stderr, "\nERROR: Invalid syslog protocol: %s\n", opts.syslogProtocol)
exitCode = 1
return
}

if opts.syslogTlsEnabled && opts.syslogProtocol == "udp" {
cmd.Usage()
fmt.Fprintf(os.Stderr, "\nERROR: TLS is not supported for Syslog UDP protocol\n")
exitCode = 1
return
}

if opts.syslogTlsEnabled {
cert, err := tls.LoadX509KeyPair(opts.syslogTlsCert, opts.syslogTlsCertKey)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to load Syslog TLS certificate: %v\n", err)
exitCode = 1
return
}

clientAuth := tls.VerifyClientCertIfGiven
if opts.syslogTlsAuthEnabled {
clientAuth = tls.RequireAndVerifyClientCert
}

syslogTlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: clientAuth,
}
}

srv := server.NewServer(server.Options{
HttpBindAddress: opts.httpBindAddress,
HttpTlsConfig: httpTlsConfig,

SyslogTCP: opts.syslogProtocol == "tcp",
SyslogBindAddress: opts.syslogBindAddr,
SyslogTlsConfig: syslogTlsConfig,

ConfigStorageDir: opts.configDir,
AuthStorageDir: opts.authDir,
Expand Down Expand Up @@ -116,13 +159,48 @@ func NewServeCommand() *cobra.Command {
"Path to the certificate key file for the HTTPS server",
)

cmd.Flags().StringVar(
&opts.syslogProtocol,
"syslog-proto",
defaultSyslogProtocol,
"Protocol to use for the Syslog server (one of \"tcp\" or \"udp\")",
)

cmd.Flags().StringVar(
&opts.syslogBindAddr,
"syslog-bind",
defaultSyslogBindAddr,
"Address to bind the Syslog server to",
)

cmd.Flags().BoolVar(
&opts.syslogTlsEnabled,
"syslog-tls",
false,
"Enable TLS for the Syslog server (requires protocol to be \"tcp\")",
)

cmd.Flags().StringVar(
&opts.syslogTlsCert,
"syslog-tls-cert",
"",
"Path to the certificate file for the Syslog server",
)

cmd.Flags().StringVar(
&opts.syslogTlsCertKey,
"syslog-tls-key",
"",
"Path to the certificate key file for the Syslog server",
)

cmd.Flags().BoolVar(
&opts.syslogTlsAuthEnabled,
"syslog-tls-auth",
false,
"Require clients to authenticate against the Syslog server with a client certificate",
)

cmd.Flags().StringVar(
&opts.authDir,
"auth-dir",
Expand Down
25 changes: 15 additions & 10 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,21 @@ Usage:
flowg serve [flags]

Flags:
--auth-dir string Path to the auth database directory (default "./data/auth")
--config-dir string Path to the config directory (default "./data/config")
-h, --help help for serve
--http-bind string Address to bind the HTTP server to (default ":5080")
--http-tls Enable TLS for the HTTP server
--http-tls-cert string Path to the certificate file for the HTTPS server
--http-tls-key string Path to the certificate key file for the HTTPS server
--log-dir string Path to the log database directory (default "./data/logs")
--syslog-bind string Address to bind the Syslog server to (default ":5514")
--verbose Enable verbose logging
--auth-dir string Path to the auth database directory (default "./data/auth")
--config-dir string Path to the config directory (default "./data/config")
-h, --help help for serve
--http-bind string Address to bind the HTTP server to (default ":5080")
--http-tls Enable TLS for the HTTP server
--http-tls-cert string Path to the certificate file for the HTTPS server
--http-tls-key string Path to the certificate key file for the HTTPS server
--log-dir string Path to the log database directory (default "./data/logs")
--syslog-bind string Address to bind the Syslog server to (default ":5514")
--syslog-proto string Protocol to use for the Syslog server (one of "tcp" or "udp") (default "udp")
--syslog-tls Enable TLS for the Syslog server (requires protocol to be "tcp")
--syslog-tls-auth Require clients to authenticate against the Syslog server with a client certificate
--syslog-tls-cert string Path to the certificate file for the Syslog server
--syslog-tls-key string Path to the certificate key file for the Syslog server
--verbose Enable verbose logging
```

## 2. `flowg admin`
Expand Down
4 changes: 4 additions & 0 deletions internal/app/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ type Options struct {
HttpBindAddress string
HttpTlsConfig *tls.Config

SyslogTCP bool
SyslogBindAddress string
SyslogTlsConfig *tls.Config

AuthStorageDir string
ConfigStorageDir string
Expand All @@ -37,7 +39,9 @@ func NewServer(opts Options) *Server {
opts.HttpBindAddress,
opts.HttpTlsConfig,

opts.SyslogTCP,
opts.SyslogBindAddress,
opts.SyslogTlsConfig,

storageLayer,
engineLayer,
Expand Down
5 changes: 5 additions & 0 deletions internal/app/server/service_layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ func newServiceLayer(
httpBindAddress string,
httpTlsConfig *tls.Config,

syslogTCP bool,
syslogBindAddress string,
syslogTlsConfig *tls.Config,

storageLayer *storageLayer,
engineLayer *engineLayer,
Expand All @@ -38,7 +40,10 @@ func newServiceLayer(
)

syslogServer := syslog.NewServer(
syslogTCP,
syslogBindAddress,
syslogTlsConfig,

storageLayer.configStorage,
engineLayer.pipelineRunner,
)
Expand Down
11 changes: 10 additions & 1 deletion internal/services/syslog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package syslog
import (
"log/slog"

"crypto/tls"

"github.com/vladopajic/go-actor/actor"

"link-society.com/flowg/internal/utils/sync"
Expand All @@ -17,7 +19,10 @@ type Server struct {
}

func NewServer(
isTCP bool,
bindAddress string,
tlsConfig *tls.Config,

configStorage *config.Storage,
pipelineRunner *pipelines.Runner,
) *Server {
Expand All @@ -27,7 +32,11 @@ func NewServer(
configStorage: configStorage,
pipelineRunner: pipelineRunner,

state: &workerStarting{bindAddress: bindAddress},
state: &workerStarting{
isTCP: isTCP,
bindAddress: bindAddress,
tlsConfig: tlsConfig,
},

startCond: sync.NewCondValue[error](),
stopCond: sync.NewCondValue[error](),
Expand Down
76 changes: 63 additions & 13 deletions internal/services/syslog/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package syslog

import (
"log/slog"

"crypto/tls"
"sync"

"github.com/vladopajic/go-actor/actor"
Expand All @@ -16,7 +18,9 @@ type workerState interface {
}

type workerStarting struct {
isTCP bool
bindAddress string
tlsConfig *tls.Config
}

type workerRunning struct {
Expand All @@ -37,33 +41,79 @@ func (s *workerStarting) DoWork(ctx actor.Context, worker *worker) workerState {
server.SetFormat(gosyslog.Automatic)
server.SetHandler(handler)

proto := "udp"
if s.isTCP {
proto = "tcp"
}

worker.logger.InfoContext(
ctx,
"Starting Syslog server",
slog.Group("udp",
slog.Group("syslog",
slog.String("proto", proto),
slog.String("bind", s.bindAddress),
slog.Bool("tls", s.tlsConfig != nil),
),
)

if err := server.ListenUDP(s.bindAddress); err != nil {
worker.logger.ErrorContext(
ctx,
"Failed to listen on UDP",
slog.Group("udp",
slog.String("bind", s.bindAddress),
),
slog.String("error", err.Error()),
)
worker.startCond.Broadcast(err)
return nil
switch {
case s.isTCP && s.tlsConfig != nil:
if err := server.ListenTCPTLS(s.bindAddress, s.tlsConfig); err != nil {
worker.logger.ErrorContext(
ctx,
"Failed to listen on TCP+TLS",
slog.Group("syslog",
slog.String("proto", proto),
slog.String("bind", s.bindAddress),
slog.Bool("tls", s.tlsConfig != nil),
),
slog.String("error", err.Error()),
)
worker.startCond.Broadcast(err)
return nil
}

case s.isTCP && s.tlsConfig == nil:
if err := server.ListenTCP(s.bindAddress); err != nil {
worker.logger.ErrorContext(
ctx,
"Failed to listen on TCP",
slog.Group("syslog",
slog.String("proto", proto),
slog.String("bind", s.bindAddress),
slog.Bool("tls", s.tlsConfig != nil),
),
slog.String("error", err.Error()),
)
worker.startCond.Broadcast(err)
return nil
}

case !s.isTCP:
if err := server.ListenUDP(s.bindAddress); err != nil {
worker.logger.ErrorContext(
ctx,
"Failed to listen on UDP",
slog.Group("syslog",
slog.String("proto", proto),
slog.String("bind", s.bindAddress),
slog.Bool("tls", s.tlsConfig != nil),
),
slog.String("error", err.Error()),
)
worker.startCond.Broadcast(err)
return nil
}
}

if err := server.Boot(); err != nil {
worker.logger.ErrorContext(
ctx,
"Failed to boot server",
slog.Group("udp",
slog.Group("syslog",
slog.String("proto", proto),
slog.String("bind", s.bindAddress),
slog.Bool("tls", s.tlsConfig != nil),
),
slog.String("error", err.Error()),
)
Expand Down
4 changes: 2 additions & 2 deletions k8s/charts/flowg/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
---
apiVersion: v2
name: flowg
version: "v0.18.0"
version: "v0.19.0"
type: application
appVersion: "v0.18.0"
appVersion: "v0.19.0"
kubeVersion: ">=1.30.0-0"
description: Flowg is a low-code log processing platform.
home: https://github.com/link-society/flowg
Expand Down