diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index c5391b5239ff..23230265c179 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -47,6 +47,8 @@ https://github.com/elastic/beats/compare/v6.8.0...6.8.1[Check the HEAD diff]
*Packetbeat*
+- Limit memory usage of Redis replication sessions. {issue}12657[12657]
+
*Winlogbeat*
*Functionbeat*
diff --git a/packetbeat/_meta/beat.reference.yml b/packetbeat/_meta/beat.reference.yml
index f91827b33ba3..dac12dddbb23 100644
--- a/packetbeat/_meta/beat.reference.yml
+++ b/packetbeat/_meta/beat.reference.yml
@@ -329,6 +329,15 @@ packetbeat.protocols:
# incoming responses, but sent to Elasticsearch immediately.
#transaction_timeout: 10s
+ # Max size for per-session message queue. This places a limit on the memory
+ # that can be used to buffer requests and responses for correlation.
+ #queue_max_bytes: 1048576
+
+ # Max number of messages for per-session message queue. This limits the number
+ # of requests or responses that can be buffered for correlation. Set a value
+ # large enough to allow for pipelining.
+ #queue_max_messages: 20000
+
- type: thrift
# Enable thrift monitoring. Default: true
#enabled: true
diff --git a/packetbeat/docs/packetbeat-options.asciidoc b/packetbeat/docs/packetbeat-options.asciidoc
index 071bbed5ae18..6f013f5c39a0 100644
--- a/packetbeat/docs/packetbeat-options.asciidoc
+++ b/packetbeat/docs/packetbeat-options.asciidoc
@@ -1273,6 +1273,39 @@ include the raw certificate encoded in PEM format as a `raw` field.
If `send_certificates` is false, this setting is ignored. The default is false.
+[[packetbeat-redis-options]]
+=== Capture Redis traffic
+
+++++
+Redis
+++++
+
+The Redis protocol has several specific configuration options. Here is a
+sample configuration for the `redis` section of the +{beatname_lc}.yml+ config file:
+
+[source,yaml]
+------------------------------------------------------------------------------
+packetbeat.protocols:
+- type: redis
+ ports: [6379]
+ queue_max_bytes: 1048576
+ queue_max_messages: 20000
+------------------------------------------------------------------------------
+
+==== Configuration options
+
+Also see <>.
+
+===== `queue_max_bytes` and `queue_max_messages`
+
+In order for request/response correlation to work, {beatname_uc} needs to
+store requests in memory until a response is received. These settings impose
+a limit on the number of bytes (`queue_max_bytes`) and number of requests
+(`queue_max_messages`) that can be stored. These limits are per-connection.
+The default is to queue up to 1MB or 20.000 requests per connection, which
+allows to use request pipelining while at the same time limiting the amount
+of memory consumed by replication sessions.
+
[[configuration-processes]]
== Specify which processes to monitor
diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml
index 2258cbb38155..8258d5fd6e6e 100644
--- a/packetbeat/packetbeat.reference.yml
+++ b/packetbeat/packetbeat.reference.yml
@@ -329,6 +329,15 @@ packetbeat.protocols:
# incoming responses, but sent to Elasticsearch immediately.
#transaction_timeout: 10s
+ # Max size for per-session message queue. This places a limit on the memory
+ # that can be used to buffer requests and responses for correlation.
+ #queue_max_bytes: 1048576
+
+ # Max number of messages for per-session message queue. This limits the number
+ # of requests or responses that can be buffered for correlation. Set a value
+ # large enough to allow for pipelining.
+ #queue_max_messages: 20000
+
- type: thrift
# Enable thrift monitoring. Default: true
#enabled: true
diff --git a/packetbeat/protos/redis/config.go b/packetbeat/protos/redis/config.go
index b71f59c31dda..0450923ca898 100644
--- a/packetbeat/protos/redis/config.go
+++ b/packetbeat/protos/redis/config.go
@@ -24,6 +24,7 @@ import (
type redisConfig struct {
config.ProtocolCommon `config:",inline"`
+ QueueLimits MessageQueueConfig `config:",inline"`
}
var (
@@ -31,5 +32,9 @@ var (
ProtocolCommon: config.ProtocolCommon{
TransactionTimeout: protos.DefaultTransactionExpiration,
},
+ QueueLimits: MessageQueueConfig{
+ MaxBytes: 1024 * 1024,
+ MaxMessages: 20000,
+ },
}
)
diff --git a/packetbeat/protos/redis/messagequeue.go b/packetbeat/protos/redis/messagequeue.go
new file mode 100644
index 000000000000..167ec5bd7f8c
--- /dev/null
+++ b/packetbeat/protos/redis/messagequeue.go
@@ -0,0 +1,117 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package redis
+
+import (
+ "math"
+)
+
+// Message interface needs to be implemented by types in order to be stored
+// in a MessageQueue.
+type Message interface {
+ // Size returns the size of the current element.
+ Size() int
+}
+
+type listEntry struct {
+ item Message
+ next *listEntry
+}
+
+// MessageQueue defines a queue that automatically evicts messages based on
+// the total size or number of elements contained.
+type MessageQueue struct {
+ head, tail *listEntry
+ bytesAvail int64
+ slotsAvail int32
+}
+
+// MessageQueueConfig represents the configuration for a MessageQueue.
+// Setting any limit to zero disables the limit.
+type MessageQueueConfig struct {
+ // MaxBytes is the maximum number of bytes that can be stored in the queue.
+ MaxBytes int64 `config:"queue_max_bytes"`
+
+ // MaxMessages sets a limit on the number of messages that the queue can hold.
+ MaxMessages int32 `config:"queue_max_messages"`
+}
+
+// NewMessageQueue creates a new MessageQueue with the given configuration.
+func NewMessageQueue(c MessageQueueConfig) (queue MessageQueue) {
+ queue.bytesAvail = c.MaxBytes
+ if queue.bytesAvail <= 0 {
+ queue.bytesAvail = math.MaxInt64
+ }
+ queue.slotsAvail = c.MaxMessages
+ if queue.slotsAvail <= 0 {
+ queue.slotsAvail = math.MaxInt32
+ }
+ return queue
+}
+
+// Append appends a new message into the queue, returning the number of
+// messages evicted to make room, if any.
+func (ml *MessageQueue) Append(msg Message) (evicted int) {
+ size := int64(msg.Size())
+ evicted = ml.adjust(size)
+ ml.slotsAvail--
+ ml.bytesAvail -= size
+ entry := &listEntry{
+ item: msg,
+ }
+ if ml.tail == nil {
+ ml.head = entry
+ } else {
+ ml.tail.next = entry
+ }
+ ml.tail = entry
+ return evicted
+}
+
+// IsEmpty returns if the MessageQueue is empty.
+func (ml *MessageQueue) IsEmpty() bool {
+ return ml.head == nil
+}
+
+// Pop returns the oldest message in the queue, if any.
+func (ml *MessageQueue) Pop() Message {
+ if ml.head == nil {
+ return nil
+ }
+
+ msg := ml.head
+ ml.head = msg.next
+ if ml.head == nil {
+ ml.tail = nil
+ }
+ ml.slotsAvail++
+ ml.bytesAvail += int64(msg.item.Size())
+ return msg.item
+}
+
+func (ml *MessageQueue) adjust(msgSize int64) (evicted int) {
+ if ml.slotsAvail == 0 {
+ ml.Pop()
+ evicted++
+ }
+ for ml.bytesAvail < msgSize && !ml.IsEmpty() {
+ ml.Pop()
+ evicted++
+ }
+ return evicted
+}
diff --git a/packetbeat/protos/redis/messagequeue_test.go b/packetbeat/protos/redis/messagequeue_test.go
new file mode 100644
index 000000000000..9909c7cfad5a
--- /dev/null
+++ b/packetbeat/protos/redis/messagequeue_test.go
@@ -0,0 +1,128 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package redis
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+type testMessage int
+
+func (t testMessage) Size() int {
+ return int(t)
+}
+
+func TestMessageList_Append(t *testing.T) {
+ for _, test := range []struct {
+ title string
+ maxBytes int64
+ maxCount int32
+ input []int
+ expected []int
+ }{
+ {
+ title: "unbounded queue",
+ maxBytes: 0,
+ maxCount: 0,
+ input: []int{1, 2, 3, 4, 5},
+ expected: []int{1, 2, 3, 4, 5},
+ },
+ {
+ title: "count limited",
+ maxBytes: 0,
+ maxCount: 3,
+ input: []int{1, 2, 3, 4, 5},
+ expected: []int{3, 4, 5},
+ },
+ {
+ title: "count limit boundary",
+ maxBytes: 0,
+ maxCount: 3,
+ input: []int{1, 2, 3},
+ expected: []int{1, 2, 3},
+ },
+ {
+ title: "size limited",
+ maxBytes: 10,
+ maxCount: 0,
+ input: []int{1, 2, 3, 4, 5},
+ expected: []int{4, 5},
+ },
+ {
+ title: "size limited boundary",
+ maxBytes: 10,
+ maxCount: 0,
+ input: []int{1, 2, 3, 4},
+ expected: []int{1, 2, 3, 4},
+ },
+ {
+ title: "excess size",
+ maxBytes: 10,
+ maxCount: 0,
+ input: []int{1, 2, 3, 100},
+ expected: []int{100},
+ },
+ {
+ title: "excess size 2",
+ maxBytes: 10,
+ maxCount: 0,
+ input: []int{100, 1},
+ expected: []int{1},
+ },
+ {
+ title: "excess size 3",
+ maxBytes: 10,
+ maxCount: 0,
+ input: []int{1, 2, 3, 4, 5, 5},
+ expected: []int{5, 5},
+ },
+ {
+ title: "both",
+ maxBytes: 10,
+ maxCount: 3,
+ input: []int{3, 4, 2, 1},
+ expected: []int{4, 2, 1},
+ },
+ } {
+ t.Run(test.title, func(t *testing.T) {
+ conf := MessageQueueConfig{
+ MaxBytes: test.maxBytes,
+ MaxMessages: test.maxCount,
+ }
+ q := NewMessageQueue(conf)
+ for _, elem := range test.input {
+ q.Append(testMessage(elem))
+ }
+ var result []int
+ for !q.IsEmpty() {
+ msg := q.Pop()
+ if !assert.NotNil(t, msg) {
+ t.FailNow()
+ }
+ value, ok := msg.(testMessage)
+ if !assert.True(t, ok) {
+ t.FailNow()
+ }
+ result = append(result, value.Size())
+ }
+ assert.Equal(t, test.expected, result)
+ })
+ }
+}
diff --git a/packetbeat/protos/redis/redis.go b/packetbeat/protos/redis/redis.go
index 4e51dbb2e525..f9556250bdf5 100644
--- a/packetbeat/protos/redis/redis.go
+++ b/packetbeat/protos/redis/redis.go
@@ -40,22 +40,18 @@ type stream struct {
type redisConnectionData struct {
streams [2]*stream
- requests messageList
- responses messageList
-}
-
-type messageList struct {
- head, tail *redisMessage
+ requests MessageQueue
+ responses MessageQueue
}
// Redis protocol plugin
type redisPlugin struct {
// config
- ports []int
- sendRequest bool
- sendResponse bool
-
+ ports []int
+ sendRequest bool
+ sendResponse bool
transactionTimeout time.Duration
+ queueConfig MessageQueueConfig
results protos.Reporter
}
@@ -67,6 +63,7 @@ var (
var (
unmatchedResponses = monitoring.NewInt(nil, "redis.unmatched_responses")
+ unmatchedRequests = monitoring.NewInt(nil, "redis.unmatched_requests")
)
func init() {
@@ -106,6 +103,7 @@ func (redis *redisPlugin) setFromConfig(config *redisConfig) {
redis.sendRequest = config.SendRequest
redis.sendResponse = config.SendResponse
redis.transactionTimeout = config.TransactionTimeout
+ redis.queueConfig = config.QueueLimits
}
func (redis *redisPlugin) GetPorts() []int {
@@ -130,27 +128,33 @@ func (redis *redisPlugin) Parse(
) protos.ProtocolData {
defer logp.Recover("ParseRedis exception")
- conn := ensureRedisConnection(private)
+ conn := redis.ensureRedisConnection(private)
conn = redis.doParse(conn, pkt, tcptuple, dir)
if conn == nil {
return nil
}
return conn
}
+func (redis *redisPlugin) newConnectionData() *redisConnectionData {
+ return &redisConnectionData{
+ requests: NewMessageQueue(redis.queueConfig),
+ responses: NewMessageQueue(redis.queueConfig),
+ }
+}
-func ensureRedisConnection(private protos.ProtocolData) *redisConnectionData {
+func (redis *redisPlugin) ensureRedisConnection(private protos.ProtocolData) *redisConnectionData {
if private == nil {
- return &redisConnectionData{}
+ return redis.newConnectionData()
}
priv, ok := private.(*redisConnectionData)
if !ok {
logp.Warn("redis connection data type error, create new one")
- return &redisConnectionData{}
+ return redis.newConnectionData()
}
if priv == nil {
logp.Warn("Unexpected: redis connection data not set, create new one")
- return &redisConnectionData{}
+ return redis.newConnectionData()
}
return priv
@@ -244,29 +248,37 @@ func (redis *redisPlugin) handleRedis(
m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort())
if m.isRequest {
- conn.requests.append(m) // wait for response
+ // wait for response
+ if evicted := conn.requests.Append(m); evicted > 0 {
+ unmatchedRequests.Add(int64(evicted))
+ }
} else {
- conn.responses.append(m)
+ if evicted := conn.responses.Append(m); evicted > 0 {
+ unmatchedResponses.Add(int64(evicted))
+ }
redis.correlate(conn)
}
}
func (redis *redisPlugin) correlate(conn *redisConnectionData) {
// drop responses with missing requests
- if conn.requests.empty() {
- for !conn.responses.empty() {
+ if conn.requests.IsEmpty() {
+ for !conn.responses.IsEmpty() {
debugf("Response from unknown transaction. Ignoring")
unmatchedResponses.Add(1)
- conn.responses.pop()
+ conn.responses.Pop()
}
return
}
// merge requests with responses into transactions
- for !conn.responses.empty() && !conn.requests.empty() {
- requ := conn.requests.pop()
- resp := conn.responses.pop()
-
+ for !conn.responses.IsEmpty() && !conn.requests.IsEmpty() {
+ requ, okReq := conn.requests.Pop().(*redisMessage)
+ resp, okResp := conn.responses.Pop().(*redisMessage)
+ if !okReq || !okResp {
+ logp.Err("invalid type found in message queue")
+ continue
+ }
if redis.results != nil {
event := redis.newTransaction(requ, resp)
redis.results(event)
@@ -342,34 +354,3 @@ func (redis *redisPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8,
return private
}
-
-func (ml *messageList) append(msg *redisMessage) {
- if ml.tail == nil {
- ml.head = msg
- } else {
- ml.tail.next = msg
- }
- msg.next = nil
- ml.tail = msg
-}
-
-func (ml *messageList) empty() bool {
- return ml.head == nil
-}
-
-func (ml *messageList) pop() *redisMessage {
- if ml.head == nil {
- return nil
- }
-
- msg := ml.head
- ml.head = ml.head.next
- if ml.head == nil {
- ml.tail = nil
- }
- return msg
-}
-
-func (ml *messageList) last() *redisMessage {
- return ml.tail
-}
diff --git a/packetbeat/protos/redis/redis_parse.go b/packetbeat/protos/redis/redis_parse.go
index e023bcee38e0..783be5cb579e 100644
--- a/packetbeat/protos/redis/redis_parse.go
+++ b/packetbeat/protos/redis/redis_parse.go
@@ -44,15 +44,11 @@ type redisMessage struct {
message common.NetString
method common.NetString
path common.NetString
-
- next *redisMessage
}
-const (
- start = iota
- bulkArray
- simpleMessage
-)
+func (msg *redisMessage) Size() int {
+ return len(msg.message)
+}
var (
empty = common.NetString("")