Skip to content

Commit

Permalink
Add support for reading from UNIX datagram sockets (#22699)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR adds support for reading from UNIX datagram sockets both from the `unix` input and the `syslog` input. A new option is added to select the type of the socket named `socket_type`. Available options are: `stream` and `datagram`.

## Why is it important?

A few applications which send logs over Unix sockets, use datagrams not streams. From now on, Filebeat can accept input from these applications as well.

Closes #18632
  • Loading branch information
kvch authored Nov 30, 2020
1 parent 50c1745 commit 3a1d1ae
Show file tree
Hide file tree
Showing 27 changed files with 880 additions and 392 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ from being added to events by default. {pull}18159[18159]
- Added `event.ingested` field to data from the Netflow module. {pull}22412[22412]
- Improve panw ECS url fields mapping. {pull}22481[22481]
- Improve Nats filebeat dashboard. {pull}22726[22726]
- Add support for UNIX datagram sockets in `unix` input. {issues}18632[18632] {pull}22699[22699]

*Heartbeat*

Expand Down
9 changes: 8 additions & 1 deletion filebeat/docs/inputs/input-common-unix-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ The maximum size of the message received over the socket. The default is `20MiB`
[id="{beatname_lc}-input-{type}-unix-path"]
==== `path`

The path to the Unix socket that will receive event streams.
The path to the Unix socket that will receive events.

[float]
[id="{beatname_lc}-input-{type}-unix-socket-type"]
==== `socket_type`

The type to of the Unix socket that will receive events. Valid values
are `stream` and `datagram`. The default is `stream`.

[float]
[id="{beatname_lc}-input-{type}-unix-group"]
Expand Down
33 changes: 14 additions & 19 deletions filebeat/input/syslog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/inputsource"
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/filebeat/inputsource/common/streaming"
"github.com/elastic/beats/v7/filebeat/inputsource/tcp"
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
Expand Down Expand Up @@ -59,16 +59,17 @@ var defaultTCP = syslogTCP{
}

type syslogUnix struct {
unix.Config `config:",inline"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
unix.Config `config:",inline"`
}

var defaultUnix = syslogUnix{
Config: unix.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
},
LineDelimiter: "\n",
func defaultUnix() syslogUnix {
return syslogUnix{
Config: unix.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
LineDelimiter: "\n",
},
}
}

var defaultUDP = udp.Config{
Expand All @@ -89,32 +90,26 @@ func factory(
return nil, err
}

splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
splitFunc := streaming.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
}

logger := logp.NewLogger("input.syslog.tcp").With("address", config.Config.Host)
factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logger, tcp.MetadataCallback, nf, splitFunc)
factory := streaming.SplitHandlerFactory(inputsource.FamilyTCP, logger, tcp.MetadataCallback, nf, splitFunc)

return tcp.New(&config.Config, factory)
case unix.Name:
cfgwarn.Beta("Syslog Unix socket support is beta.")

config := defaultUnix
config := defaultUnix()
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
}

logger := logp.NewLogger("input.syslog.unix").With("path", config.Config.Path)
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logger, unix.MetadataCallback, nf, splitFunc)

return unix.New(&config.Config, factory)
return unix.New(logger, &config.Config, nf)

case udp.Name:
config := defaultUDP
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/inputsource"
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/filebeat/inputsource/common/streaming"
"github.com/elastic/beats/v7/filebeat/inputsource/tcp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -75,13 +75,13 @@ func NewInput(
forwarder.Send(event)
}

splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
splitFunc := streaming.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
}

logger := logp.NewLogger("input.tcp").With("address", config.Config.Host)
factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logger, tcp.MetadataCallback, cb, splitFunc)
factory := streaming.SplitHandlerFactory(inputsource.FamilyTCP, logger, tcp.MetadataCallback, cb, splitFunc)

server, err := tcp.New(&config.Config, factory)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/unix/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ import (
)

type config struct {
unix.Config `config:",inline"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
unix.Config `config:",inline"`
}

func defaultConfig() config {
return config{
Config: unix.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
SocketType: unix.StreamSocket,
LineDelimiter: "\n",
},
LineDelimiter: "\n",
}
}
22 changes: 7 additions & 15 deletions filebeat/input/unix/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@
package unix

import (
"bufio"
"fmt"
"net"
"time"

input "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/filebeat/inputsource"
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -35,8 +32,8 @@ import (
)

type server struct {
unix.Server
config
splitFunc bufio.SplitFunc
}

func Plugin() input.Plugin {
Expand All @@ -59,12 +56,7 @@ func configure(cfg *common.Config) (stateless.Input, error) {
}

func newServer(config config) (*server, error) {
splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
}

return &server{config: config, splitFunc: splitFunc}, nil
return &server{config: config}, nil
}

func (s *server) Name() string { return "unix" }
Expand All @@ -83,17 +75,17 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
log.Info("Starting Unix socket input")
defer log.Info("Unix socket input stopped")

cb := func(data []byte, metadata inputsource.NetworkMetadata) {
cb := inputsource.NetworkFunc(func(data []byte, metadata inputsource.NetworkMetadata) {
event := createEvent(data, metadata)
publisher.Publish(event)
}
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, log, unix.MetadataCallback, cb, s.splitFunc)
server, err := unix.New(&s.config.Config, factory)
})

server, err := unix.New(log, &s.config.Config, cb)
if err != nil {
return err
}

log.Debugf("TCP Input '%v' initialized", ctx.ID)
log.Debugf("%s Input '%v' initialized", s.config.Config.SocketType, ctx.ID)

err = server.Run(ctxtool.FromCanceller(ctx.Cancelation))

Expand Down
102 changes: 102 additions & 0 deletions filebeat/inputsource/common/dgram/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package dgram

import (
"context"
"fmt"
"net"
"runtime"
"strings"

"github.com/elastic/beats/v7/filebeat/inputsource"
"github.com/elastic/beats/v7/libbeat/logp"
)

// HandlerFactory returns a ConnectionHandler func
type HandlerFactory func(config ListenerConfig) ConnectionHandler

// ConnectionHandler is able to read from incoming connections.
type ConnectionHandler func(context.Context, net.PacketConn) error

// MetadataFunc defines callback executed when a line is read from the split handler.
type MetadataFunc func(net.Conn) inputsource.NetworkMetadata

// DatagramReaderFactory allows creation of a handler which can read packets from connections.
func DatagramReaderFactory(
family inputsource.Family,
logger *logp.Logger,
callback inputsource.NetworkFunc,
) HandlerFactory {
return func(config ListenerConfig) ConnectionHandler {
return ConnectionHandler(func(ctx context.Context, conn net.PacketConn) error {
for ctx.Err() == nil {

buffer := make([]byte, config.MaxMessageSize)
//conn.SetDeadline(time.Now().Add(config.Timeout))

// If you are using Windows and you are using a fixed buffer and you get a datagram which
// is bigger than the specified size of the buffer, it will return an `err` and the buffer will
// contains a subset of the data.
//
// On Unix based system, the buffer will be truncated but no error will be returned.
length, addr, err := conn.ReadFrom(buffer)
if err != nil {
if family == inputsource.FamilyUnix {
fmt.Println("connection handler error", err)
}
// don't log any deadline events.
e, ok := err.(net.Error)
if ok && e.Timeout() {
continue
}

// Closed network error string will never change in Go 1.X
// https://github.com/golang/go/issues/4373
opErr, ok := err.(*net.OpError)
if ok && strings.Contains(opErr.Err.Error(), "use of closed network connection") {
logger.Info("Connection has been closed")
return nil
}

logger.Errorf("Error reading from the socket %s", err)

// On Windows send the current buffer and mark it as truncated.
// The buffer will have content but length will return 0, addr will be nil.
if family == inputsource.FamilyUDP && isLargerThanBuffer(err) {
callback(buffer, inputsource.NetworkMetadata{RemoteAddr: addr, Truncated: true})
continue
}
}

if length > 0 {
callback(buffer[:length], inputsource.NetworkMetadata{RemoteAddr: addr})
}
}
fmt.Println("end of connection handling")
return nil
})
}
}

func isLargerThanBuffer(err error) bool {
if runtime.GOOS != "windows" {
return false
}
return strings.Contains(err.Error(), windowErrBuffer)
}
Loading

0 comments on commit 3a1d1ae

Please sign in to comment.