-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
pkg/promtail: IETF Syslog (RFC5424) Support (#1275)
* IETF syslog (rfc5254) support * Upgrade go-syslog to correctly released v2.0.1 * Incorporate feedback from new linter * Update documentation with relevant RFC sections. * Incorporate feedback from review * Use context to shutdown server. * Use util.Backoff from cortex * Do not embed mutex into TestLabeledClient * Use strings.HasPrefix * Better naming of connectionsClosed -> openConnections * Incorporate further feedback from review * Callback instead of channel in ParseStream. Removes one running Goroutine per connection. * Improve parse error log level. * Move mutex above field it protects. * Use backoff.Ongoing() * Switch to "nontransparent" parser * Further improvements to the documentation
- Loading branch information
1 parent
9420fb1
commit f0f6f24
Showing
47 changed files
with
25,350 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package syslogparser | ||
|
||
import ( | ||
"bufio" | ||
"fmt" | ||
"io" | ||
|
||
"github.com/influxdata/go-syslog/v2" | ||
"github.com/influxdata/go-syslog/v2/nontransparent" | ||
"github.com/influxdata/go-syslog/v2/octetcounting" | ||
) | ||
|
||
// ParseStream parses a rfc5424 syslog stream from the given Reader, calling | ||
// the callback function with the parsed messages. The parser automatically | ||
// detects octet counting. | ||
// The function returns on EOF or unrecoverable errors. | ||
func ParseStream(r io.Reader, callback func(res *syslog.Result)) error { | ||
buf := bufio.NewReader(r) | ||
|
||
firstByte, err := buf.Peek(1) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
b := firstByte[0] | ||
if b == '<' { | ||
nontransparent.NewParser(syslog.WithListener(callback)).Parse(buf) | ||
} else if b >= '0' && b <= '9' { | ||
octetcounting.NewParser(syslog.WithListener(callback)).Parse(buf) | ||
} else { | ||
return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", firstByte) | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package syslogparser_test | ||
|
||
import ( | ||
"io" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/grafana/loki/pkg/promtail/targets/syslogparser" | ||
"github.com/influxdata/go-syslog/v2" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestParseStream_OctetCounting(t *testing.T) { | ||
r := strings.NewReader("23 <13>1 - - - - - - First24 <13>1 - - - - - - Second") | ||
|
||
results := make([]*syslog.Result, 0) | ||
cb := func(res *syslog.Result) { | ||
results = append(results, res) | ||
} | ||
|
||
err := syslogparser.ParseStream(r, cb) | ||
require.NoError(t, err) | ||
|
||
require.Equal(t, 2, len(results)) | ||
require.NoError(t, results[0].Error) | ||
require.Equal(t, "First", *results[0].Message.Message()) | ||
require.NoError(t, results[1].Error) | ||
require.Equal(t, "Second", *results[1].Message.Message()) | ||
} | ||
|
||
func TestParseStream_NewlineSeparated(t *testing.T) { | ||
r := strings.NewReader("<13>1 - - - - - - First\n<13>1 - - - - - - Second\n") | ||
|
||
results := make([]*syslog.Result, 0) | ||
cb := func(res *syslog.Result) { | ||
results = append(results, res) | ||
} | ||
|
||
err := syslogparser.ParseStream(r, cb) | ||
require.NoError(t, err) | ||
|
||
require.Equal(t, 2, len(results)) | ||
require.NoError(t, results[0].Error) | ||
require.Equal(t, "First", *results[0].Message.Message()) | ||
require.NoError(t, results[1].Error) | ||
require.Equal(t, "Second", *results[1].Message.Message()) | ||
} | ||
|
||
func TestParseStream_InvalidStream(t *testing.T) { | ||
r := strings.NewReader("invalid") | ||
|
||
err := syslogparser.ParseStream(r, func(res *syslog.Result) {}) | ||
require.EqualError(t, err, "invalid or unsupported framing. first byte: 'i'") | ||
} | ||
|
||
func TestParseStream_EmptyStream(t *testing.T) { | ||
r := strings.NewReader("") | ||
|
||
err := syslogparser.ParseStream(r, func(res *syslog.Result) {}) | ||
require.Equal(t, err, io.EOF) | ||
} |
Oops, something went wrong.