Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement input buffering #82

Merged
merged 6 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ Flags:
--heartbeat-enable Forward HTTP heartbeat event
--heartbeat-times strings Times of day to send heartbeat (list of 24h HH:MM strings)
-h, --help help for run
--in-buffer-length uint input buffer length (counted in EVE objects) (default 500000)
--in-buffer-drop drop incoming events on FEVER side instead of blocking the input socket (default true)
-r, --in-redis string Redis input server (assumes "suricata" list key, no pwd)
--in-redis-nopipe do not use Redis pipelining
-i, --in-socket string filename of input socket (accepts EVE JSON) (default "/tmp/suri.sock")
Expand Down
29 changes: 21 additions & 8 deletions cmd/fever/cmds/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,21 +534,30 @@ func mainfunc(cmd *cobra.Command, args []string) {
}()

// create input
inputChan := make(chan types.Entry)
inputBufferLen := viper.GetUint("input.buffer-length")
inputChan := make(chan types.Entry, inputBufferLen)
var sinput input.Input
inputRedis := viper.GetString("input.redis.server")
noUseRedisPipeline := viper.GetBool("input.redis.nopipe")
if len(inputRedis) > 0 {
sinput, err = input.MakeRedisInput(inputRedis, inputChan, int(chunkSize))
sinput.(*input.RedisInput).UsePipelining = !noUseRedisPipeline
sinput.(*input.RedisInput).SubmitStats(pse)
in, err := input.MakeRedisInput(inputRedis, inputChan, int(chunkSize))
if err != nil {
log.Fatal(err)
}
in.UsePipelining = !noUseRedisPipeline
in.SubmitStats(pse)
sinput = in
} else {
inputSocket := viper.GetString("input.socket")
sinput, err = input.MakeSocketInput(inputSocket, inputChan)
}
if err != nil {
log.Fatal(err)
bufDrop := viper.GetBool("input.buffer-drop")
in, err := input.MakeSocketInput(inputSocket, inputChan, bufDrop)
if err != nil {
log.Fatal(err)
}
in.SubmitStats(pse)
sinput = in
}

log.WithFields(log.Fields{
"input": sinput.GetName(),
}).Info("selected input driver")
Expand Down Expand Up @@ -583,6 +592,10 @@ func init() {
viper.BindPFlag("input.redis.server", runCmd.PersistentFlags().Lookup("in-redis"))
runCmd.PersistentFlags().BoolP("in-redis-nopipe", "", false, "do not use Redis pipelining")
viper.BindPFlag("input.redis.nopipe", runCmd.PersistentFlags().Lookup("in-redis-nopipe"))
runCmd.PersistentFlags().UintP("in-buffer-length", "", 500000, "input buffer length (counted in EVE objects)")
viper.BindPFlag("input.buffer-length", runCmd.PersistentFlags().Lookup("in-buffer-length"))
runCmd.PersistentFlags().BoolP("in-buffer-drop", "", true, "drop incoming events on FEVER side instead of blocking the input socket")
viper.BindPFlag("input.buffer-drop", runCmd.PersistentFlags().Lookup("in-buffer-drop"))

// Output options
runCmd.PersistentFlags().StringP("out-socket", "o", "/tmp/suri-forward.sock", "path to output socket (to forwarder), empty string disables forwarding")
Expand Down
16 changes: 14 additions & 2 deletions fever.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,24 @@ database:

# Configuration for input (from Suricata side). Only one of 'socket'
# or 'redis' is supported at the same time, comment/uncomment to choose.
# The 'nopipe' option disables Redis pipelining. For Redis, we assume the
# 'suricata' list as a source.
input:
# Path to the socket that Suricata writes to.
socket: /tmp/suri.sock
# Buffer length for EVE items parsed from input socket. Useful to help FEVER
# keep up with input from Suricata in case the processing pipeline is
# temporarily slow.
# Will track current buffer size in the `input_queue_length` metric.
buffer: 500000
# Rather drop items from a full buffer than causing writes to the input
# socket to block.
# This avoids congestion effects in Suricata (up to packet drops) if FEVER
# or its forwarding receiver remains slow for a longer period of time.
# Will count the number of dropped items in the `input_queue_dropped` metric.
buffer-drop: true
#redis:
# # Redis server hostname. We assume the 'suricata' list as a source.
# server: localhost
# # Disables Redis pipelining.
# nopipe: true

# Definition what event types to forward. Set 'all' to true to forward
Expand Down
71 changes: 60 additions & 11 deletions input/input_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,24 @@ import (
log "github.com/sirupsen/logrus"
)

// SocketInputPerfStats contains performance stats written to InfluxDB
// for monitoring.
type SocketInputPerfStats struct {
SocketQueueLength uint64 `influx:"input_queue_length"`
SocketQueueDropped uint64 `influx:"input_queue_dropped"`
}

// SocketInput is an Input reading JSON EVE input from a Unix socket.
type SocketInput struct {
EventChan chan types.Entry
Verbose bool
Running bool
InputListener net.Listener
StopChan chan bool
StoppedChan chan bool
EventChan chan types.Entry
Verbose bool
Running bool
InputListener net.Listener
StopChan chan bool
StoppedChan chan bool
DropIfChannelFull bool
PerfStats SocketInputPerfStats
StatsEncoder *util.PerformanceStatsEncoder
}

// GetName returns a printable name for the input
Expand Down Expand Up @@ -68,7 +78,16 @@ func (si *SocketInput) handleServerConnection() {
log.Warn(err, string(json[:]))
continue
}
si.EventChan <- e
if si.DropIfChannelFull {
select {
case si.EventChan <- e:
// pass
default:
si.PerfStats.SocketQueueDropped++
}
} else {
si.EventChan <- e
}
}
}
errRead := scanner.Err()
Expand All @@ -94,16 +113,40 @@ func (si *SocketInput) handleServerConnection() {
}
}

func (si *SocketInput) sendPerfStats() {
start := time.Now()
for {
select {
case <-si.StopChan:
return
default:
// We briefly wake up once a second to check whether we are asked
// to stop or whether it's time to submit stats. This is neglegible
// in overhead but massively improves shutdown time, as a simple
// time.Sleep() is non-interruptible by the stop channel.
if time.Since(start) > perfStatsSendInterval {
satta marked this conversation as resolved.
Show resolved Hide resolved
if si.StatsEncoder != nil {
si.PerfStats.SocketQueueLength = uint64(len(si.EventChan))
si.StatsEncoder.Submit(si.PerfStats)
}
start = time.Now()
}
time.Sleep(1 * time.Second)
}
}
}

// MakeSocketInput returns a new SocketInput reading from the Unix socket
// inputSocket and writing parsed events to outChan. If no such socket could be
// created for listening, the error returned is set accordingly.
func MakeSocketInput(inputSocket string,
outChan chan types.Entry) (*SocketInput, error) {
outChan chan types.Entry, bufDrop bool) (*SocketInput, error) {
var err error
si := &SocketInput{
EventChan: outChan,
Verbose: false,
StopChan: make(chan bool),
EventChan: outChan,
Verbose: false,
StopChan: make(chan bool),
DropIfChannelFull: bufDrop,
}
si.InputListener, err = net.Listen("unix", inputSocket)
if err != nil {
Expand All @@ -112,12 +155,18 @@ func MakeSocketInput(inputSocket string,
return si, err
}

// SubmitStats registers a PerformanceStatsEncoder for runtime stats submission.
func (si *SocketInput) SubmitStats(sc *util.PerformanceStatsEncoder) {
si.StatsEncoder = sc
}

// Run starts the SocketInput
func (si *SocketInput) Run() {
if !si.Running {
si.Running = true
si.StopChan = make(chan bool)
go si.handleServerConnection()
go si.sendPerfStats()
}
}

Expand Down
2 changes: 1 addition & 1 deletion input/input_socket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestSocketInput(t *testing.T) {
evChan := make(chan types.Entry)
events := make([]string, 1000)

is, err := MakeSocketInput(tmpfn, evChan)
is, err := MakeSocketInput(tmpfn, evChan, false)
if err != nil {
t.Fatal(err)
}
Expand Down