Skip to content

Commit

Permalink
UDP input operator - async mode - separate between readers & processo…
Browse files Browse the repository at this point in the history
…rs (open-telemetry#27805)

**Description:** adding a feature - when async mode is enabled in the
UDP receiver (udp input operator), separating reading from processing
operations.
This is important to reduce data-loss in high scale UDP scenarios. See
original issue for more details.
The async config block is changed now. Instead of readers field
(determining the concurrency level of how many threads the udp receiver
is running, all reading from the UDP port, processing, and sending
downstream), it will now have 2 fields:
- readers - determines the concurrency level of threads only reading
from UDP port and pushing the packets to a channel.
- processors - determines the concurrency level of threads reading from
the channel, processing the packets, and sending downstream.
- max_queue_length - determines the max size of the channel between the
readers & the processors. Setting it high enough, allows to prevent
data-loss in cases of downstream temporary latency. Once channel is
full, the readers thread will stop until there's room in the queue (so
to prevent unlimited memory usage).
This improves performance and reduces UDP packet loss in high-scale
scenarios.
Note that async mode only supports this separation of readers from
processors. If async config block isn't included, the default state

**Link to tracking Issue:** 27613

**Testing:** Local stress tests ran all types of async config (no
'async', with 'async', etc.). Updating existing udp test accordingly.
Also, ran scale tests and saw improvement in data-loss.

**Documentation:** Updated md file for both udplogreceiver & stanza
udp_input operator with the new flags.

---------

Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
  • Loading branch information
hovavza and djaglowski authored Oct 18, 2023
1 parent bf0a3f4 commit c2f343b
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: When async is enabled for udp receiver, separate logic into readers (only read logs from udp port and push to channel), and processors (read logs from channel and process; decode, split, add attributes, and push downstream), allowing to change concurrency level for both readers and processors separately.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27613]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
6 changes: 4 additions & 2 deletions pkg/stanza/docs/operators/udp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ for other encodings available.

If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronsouly and concurrently.

**note** If `async` is not set at all, a single thread will read lines synchronously.
**note** If `async` is not set at all, a single thread will read & process lines synchronously.

| Field | Default | Description |
| --- | --- | --- |
| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). |
| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port and push to channel (to be handled by processors). |
| `processors` | 1 | Concurrency level - Determines how many go routines read from channel (pushed by readers) and process logs before sending downstream. |
| `max_queue_length` | 100 | Determines max number of messages which may be waiting for a processor. While the queue is full, the readers will wait until there's room (readers will not drop messages, but they will not read additional incoming messages during that period). |

### Example Configurations

Expand Down
7 changes: 5 additions & 2 deletions pkg/stanza/operator/input/udp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ func TestUnmarshal(t *testing.T) {
cfg.Encoding = "utf-8"
cfg.SplitConfig.LineStartPattern = "ABC"
cfg.SplitConfig.LineEndPattern = ""
cfg.AsyncConfig = NewAsyncConfig()
cfg.AsyncConfig.Readers = 2
cfg.AsyncConfig = &AsyncConfig{
Readers: 2,
Processors: 2,
MaxQueueLength: 100,
}
return cfg
}(),
},
Expand Down
4 changes: 3 additions & 1 deletion pkg/stanza/operator/input/udp/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ all_with_async:
line_start_pattern: ABC
line_end_pattern: ""
async:
readers: 2
readers: 2
processors: 2
max_queue_length: 100
156 changes: 117 additions & 39 deletions pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const (

// Maximum UDP packet size
MaxUDPSize = 64 * 1024

defaultReaders = 1
defaultProcessors = 1
defaultMaxQueueLength = 100
)

func init() {
Expand Down Expand Up @@ -59,14 +63,9 @@ type Config struct {
}

type AsyncConfig struct {
Readers int `mapstructure:"readers,omitempty"`
}

// NewAsyncConfig creates a new AsyncConfig with default values.
func NewAsyncConfig() *AsyncConfig {
return &AsyncConfig{
Readers: 1,
}
Readers int `mapstructure:"readers,omitempty"`
Processors int `mapstructure:"processors,omitempty"`
MaxQueueLength int `mapstructure:"max_queue_length,omitempty"`
}

// BaseConfig is the details configuration of a udp input operator.
Expand Down Expand Up @@ -113,12 +112,16 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
resolver = helper.NewIPResolver()
}

if c.AsyncConfig == nil {
c.AsyncConfig = NewAsyncConfig()
}

if c.AsyncConfig.Readers <= 0 {
return nil, fmt.Errorf("async readers must be greater than 0")
if c.AsyncConfig != nil {
if c.AsyncConfig.Readers <= 0 {
c.AsyncConfig.Readers = defaultReaders
}
if c.AsyncConfig.Processors <= 0 {
c.AsyncConfig.Processors = defaultProcessors
}
if c.AsyncConfig.MaxQueueLength <= 0 {
c.AsyncConfig.MaxQueueLength = defaultMaxQueueLength
}
}

udpInput := &Input{
Expand All @@ -132,6 +135,10 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
OneLogPerPacket: c.OneLogPerPacket,
AsyncConfig: c.AsyncConfig,
}

if c.AsyncConfig != nil {
udpInput.messageQueue = make(chan messageAndAddress, c.AsyncConfig.MaxQueueLength)
}
return udpInput, nil
}

Expand All @@ -151,6 +158,14 @@ type Input struct {
encoding encoding.Encoding
splitFunc bufio.SplitFunc
resolver *helper.IPResolver

messageQueue chan messageAndAddress
stopOnce sync.Once
}

type messageAndAddress struct {
Message []byte
RemoteAddr net.Addr
}

// Start will start listening for messages on a socket.
Expand All @@ -170,9 +185,20 @@ func (u *Input) Start(_ operator.Persister) error {

// goHandleMessages will handle messages from a udp connection.
func (u *Input) goHandleMessages(ctx context.Context) {
for i := 0; i < u.AsyncConfig.Readers; i++ {
if u.AsyncConfig == nil {
u.wg.Add(1)
go u.readAndProcessMessages(ctx)
return
}

for i := 0; i < u.AsyncConfig.Readers; i++ {
u.wg.Add(1)
go u.readMessagesAsync(ctx)
}

for i := 0; i < u.AsyncConfig.Processors; i++ {
u.wg.Add(1)
go u.processMessagesAsync(ctx)
}
}

Expand All @@ -193,23 +219,69 @@ func (u *Input) readAndProcessMessages(ctx context.Context) {
break
}

if u.OneLogPerPacket {
log := truncateMaxLog(message)
u.handleMessage(ctx, remoteAddr, dec, log)
continue
}
u.processMessage(ctx, message, remoteAddr, dec, buf)
}
}

scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)
func (u *Input) processMessage(ctx context.Context, message []byte, remoteAddr net.Addr, dec *decode.Decoder, buf []byte) {
if u.OneLogPerPacket {
log := truncateMaxLog(message)
u.handleMessage(ctx, remoteAddr, dec, log)
return
}

scanner := bufio.NewScanner(bytes.NewReader(message))
scanner.Buffer(buf, MaxUDPSize)

scanner.Split(u.splitFunc)

scanner.Split(u.splitFunc)
for scanner.Scan() {
u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes())
}
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))
}
}

for scanner.Scan() {
u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes())
func (u *Input) readMessagesAsync(ctx context.Context) {
defer u.wg.Done()

for {
message, remoteAddr, err := u.readMessage()
if err != nil {
select {
case <-ctx.Done():
return
default:
u.Errorw("Failed reading messages", zap.Error(err))
}
break
}
if err := scanner.Err(); err != nil {
u.Errorw("Scanner error", zap.Error(err))

messageAndAddr := messageAndAddress{
Message: message,
RemoteAddr: remoteAddr,
}

// Send the message to the message queue for processing
u.messageQueue <- messageAndAddr
}
}

func (u *Input) processMessagesAsync(ctx context.Context) {
defer u.wg.Done()

dec := decode.New(u.encoding)
buf := make([]byte, 0, MaxUDPSize)

for {
// Read a message from the message queue.
messageAndAddr, ok := <-u.messageQueue
if !ok {
return // Channel closed, exit the goroutine.
}

u.processMessage(ctx, messageAndAddr.Message, messageAndAddr.RemoteAddr, dec, buf)
}
}

Expand Down Expand Up @@ -274,18 +346,24 @@ func (u *Input) readMessage() ([]byte, net.Addr, error) {

// Stop will stop listening for udp messages.
func (u *Input) Stop() error {
if u.cancel == nil {
return nil
}
u.cancel()
if u.connection != nil {
if err := u.connection.Close(); err != nil {
u.Errorf("failed to close UDP connection: %s", err)
u.stopOnce.Do(func() {
if u.AsyncConfig != nil {
close(u.messageQueue)
}
}
u.wg.Wait()
if u.resolver != nil {
u.resolver.Stop()
}

if u.cancel == nil {
return
}
u.cancel()
if u.connection != nil {
if err := u.connection.Close(); err != nil {
u.Errorf("failed to close UDP connection: %s", err)
}
}
u.wg.Wait()
if u.resolver != nil {
u.resolver.Stop()
}
})
return nil
}
7 changes: 5 additions & 2 deletions pkg/stanza/operator/input/udp/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,11 @@ func TestInput(t *testing.T) {
t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}, cfg))
t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}, cfg))

cfg.AsyncConfig = NewAsyncConfig()
cfg.AsyncConfig.Readers = 2
cfg.AsyncConfig = &AsyncConfig{
Readers: 2,
Processors: 2,
MaxQueueLength: 100,
}
t.Run("SimpleAsync", udpInputTest([]byte("message1"), []string{"message1"}, cfg))
}

Expand Down
6 changes: 4 additions & 2 deletions receiver/udplogreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Receives logs over UDP.
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][hhttps://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes] |
| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes] |
| `multiline` | | A `multiline` configuration block. See below for details |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details |
Expand Down Expand Up @@ -78,7 +78,9 @@ If set, the `async` configuration block instructs the `udp_input` operator to re

| Field | Default | Description |
| --- | --- | --- |
| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). |
| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port and push to channel (to be handled by processors). |
| `processors` | 1 | Concurrency level - Determines how many go routines read from channel (pushed by readers) and process logs before sending downstream. |
| `max_queue_length` | 100 | Determines max length of channel being used by async reader routines. When channel reaches max number, reader routine will block until channel has room. |

## Example Configurations

Expand Down
7 changes: 6 additions & 1 deletion receiver/udplogreceiver/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ func TestUdp(t *testing.T) {
func TestUdpAsync(t *testing.T) {
listenAddress := "127.0.0.1:29019"
cfg := testdataConfigYaml(listenAddress)
cfg.InputConfig.AsyncConfig = udp.NewAsyncConfig()
cfg.InputConfig.AsyncConfig = &udp.AsyncConfig{
Readers: 2,
Processors: 2,
MaxQueueLength: 100,
}

cfg.InputConfig.AsyncConfig.Readers = 2
testUDP(t, testdataConfigYaml(listenAddress), listenAddress)
}
Expand Down

0 comments on commit c2f343b

Please sign in to comment.