Move ignore_outgoing to packetbeat (#2393)
* Update documentation
* Update CHANGELOG
* Remove option from test config files as not needed
* Add option to migration script
tsg authored Aug 31, 2016
2 parents 34d5825 + 50c8384 commit 549377e
Showing 22 changed files with 74 additions and 123 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
*Packetbeat*
- Group HTTP fields under `http.request` and `http.response` {pull}2167[2167]
- Export `http.request.body` and `http.response.body` when configured under `include_body_for` {pull}2167[2167]
- Move `ignore_outgoing` config to `packetbeat.ignore_outgoing` {pull}2393[2393]

*Topbeat*

5 changes: 0 additions & 5 deletions filebeat/filebeat.full.yml
@@ -246,11 +246,6 @@ filebeat.prospectors:
# sub-dictionary. Default is false.
#fields_under_root: false

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#ignore_outgoing: true

# How often (in seconds) shippers are publishing their IPs to the topology map.
# The default is 10 seconds.
#refresh_topology_freq: 10
5 changes: 0 additions & 5 deletions filebeat/tests/system/config/filebeat.yml.j2
@@ -94,11 +94,6 @@ tags: [
{%- endif -%}
]

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
# ignore_outgoing: true

{% if geoip_paths is not none %}
geoip:
paths: [
Original file line number Diff line number Diff line change
@@ -32,23 +32,18 @@ output:



############################# Shipper #########################################

shipper:
# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
# If this options is not defined, the hostname is used.
#name:

# The tags of the shipper are included in their own field with each
# transaction published. Tags make it easy to group servers by different
# logical properties.
#tags: ["service-X", "web-tier"]

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#ignore_outgoing: true
############################# Beat #########################################

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
# If this options is not defined, the hostname is used.
#name:

# The tags of the shipper are included in their own field with each
# transaction published. Tags make it easy to group servers by different
# logical properties.
#tags: ["service-X", "web-tier"]



############################# Logging #########################################
5 changes: 0 additions & 5 deletions libbeat/_meta/config.full.yml
@@ -22,11 +22,6 @@
# sub-dictionary. Default is false.
#fields_under_root: false

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#ignore_outgoing: true

# How often (in seconds) shippers are publishing their IPs to the topology map.
# The default is 10 seconds.
#refresh_topology_freq: 10
37 changes: 0 additions & 37 deletions libbeat/docs/generalconfig.asciidoc
@@ -30,11 +30,6 @@ Here is an example configuration:
# logical properties.
tags: ["service-X", "web-tier"]
# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
ignore_outgoing: true
# How often (in seconds) shippers are publishing their IPs to the topology map.
# The default is 10 seconds.
refresh_topology_freq: 10
@@ -124,38 +119,6 @@ fields:
region: us-east-1
------------------------------------------------------------------------------

===== ignore_outgoing

If the `ignore_outgoing` option is enabled, the Beat ignores all the
transactions initiated from the server running the Beat.

This is useful when two Beats publish the same transactions. Because one Beat
sees the transaction in its outgoing queue and the other sees it in its incoming
queue, you can end up with duplicate transactions. To remove the duplicates, you
can enable the `ignore_outgoing` option on one of the servers.

For example, in the following scenario, you see a 3-server architecture
where a Beat is installed on each server. t1 is the transaction exchanged between
Server1 and Server2, and t2 is the transaction between Server2 and Server3.

image:./images/option_ignore_outgoing.png[Beats Architecture]

By default, each transaction is indexed twice because Beat2
sees both transactions. So you would see the following published transactions
(when `ignore_outgoing` is false):

- Beat1: t1
- Beat2: t1 and t2
- Beat3: t2

To avoid duplicates, you can force your Beats to send only the incoming
transactions and ignore the transactions created by the local server. So you would
see the following published transactions (when `ignore_outgoing` is true):

- Beat1: none
- Beat2: t1
- Beat3: t2

===== refresh_topology_freq

The refresh interval of the topology map in
7 changes: 0 additions & 7 deletions libbeat/publisher/publish.go
@@ -59,7 +59,6 @@ type BeatPublisher struct {
Index string
Output []*outputWorker
TopologyOutput outputs.TopologyOutputer
ignoreOutgoing bool
geoLite *libgeo.GeoIP
Processors *processors.Processors

@@ -87,7 +86,6 @@ type ShipperConfig struct {
common.EventMetadata `config:",inline"` // Fields and tags to add to each event.
Name string `config:"name"`
RefreshTopologyFreq time.Duration `config:"refresh_topology_freq"`
Ignore_outgoing bool `config:"ignore_outgoing"`
Topology_expire int `config:"topology_expire"`
Geoip common.Geoip `config:"geoip"`

@@ -145,10 +143,6 @@ func (publisher *BeatPublisher) GeoLite() *libgeo.GeoIP {
return publisher.geoLite
}

func (publisher *BeatPublisher) IgnoreOutgoing() bool {
return publisher.ignoreOutgoing
}

func (publisher *BeatPublisher) Connect() Client {
atomic.AddUint32(&publisher.numClients, 1)
return newClient(publisher)
@@ -207,7 +201,6 @@ func (publisher *BeatPublisher) init(
processors *processors.Processors,
) error {
var err error
publisher.ignoreOutgoing = shipper.Ignore_outgoing
publisher.Processors = processors

publisher.disabled = *publishDisabled
9 changes: 6 additions & 3 deletions libbeat/scripts/migrate_beat_config_1_x_to_5_0.py
@@ -7,7 +7,7 @@ def migrate_packetbeat(content):
Changes things like `interfaces:` to `packetbeat.interfaces:`
at the top level.
"""
sections = ["interfaces", "protocols", "procs", "runoptions"]
sections = ["interfaces", "protocols", "procs", "runoptions", "ignore_outgoing"]
lines = content.splitlines()
outlines = []
for line in lines:
@@ -61,8 +61,9 @@ def main():

with open(args.file, "r") as f:
content = f.read()
out = migrate_packetbeat(content)
out = migrate_shipper(out)
# Shipper must be migrated first for ignore_outgoing to be applied properly
out = migrate_shipper(content)
out = migrate_packetbeat(out)

if args.dry:
print(out)
@@ -92,6 +93,7 @@ def test_migrate_packetbeat():
ports: [53]
runoptions:
procs:
ignore_outgoing: true
"""

output = migrate_packetbeat(test)
@@ -109,6 +111,7 @@ def test_migrate_packetbeat():
ports: [53]
packetbeat.runoptions:
packetbeat.procs:
packetbeat.ignore_outgoing: true
"""
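
The reordering in `main()` can be illustrated with a minimal sketch. It assumes that `migrate_shipper` lifts the keys under `shipper:` to the top level; that function's body is not shown in this diff, so `flatten_shipper` below is a hypothetical, simplified stand-in, and `prefix_packetbeat` is a simplified stand-in for `migrate_packetbeat` that only reproduces the prefixing seen in the test above.

```python
# Simplified stand-ins for the migration steps; not the script's actual code.
PACKETBEAT_SECTIONS = ["interfaces", "protocols", "procs", "runoptions", "ignore_outgoing"]


def prefix_packetbeat(content):
    """Prefix known top-level keys with `packetbeat.`."""
    out = []
    for line in content.splitlines():
        key = line.split(":", 1)[0]
        # Indented keys keep their leading spaces, so only top-level keys match.
        if key in PACKETBEAT_SECTIONS:
            line = "packetbeat." + line
        out.append(line)
    return "\n".join(out) + "\n"


def flatten_shipper(content):
    """Hypothetical migrate_shipper: drop `shipper:` and dedent its children."""
    out = []
    in_shipper = False
    for line in content.splitlines():
        if line.rstrip() == "shipper:":
            in_shipper = True
            continue
        if in_shipper and line.startswith("  "):
            out.append(line[2:])  # move the child key to the top level
            continue
        in_shipper = False
        out.append(line)
    return "\n".join(out) + "\n"


old = "interfaces:\n  device: any\nshipper:\n  ignore_outgoing: true\n"

# Old order: ignore_outgoing is still nested when the prefix step runs.
print(flatten_shipper(prefix_packetbeat(old)))
# New order: ignore_outgoing is top-level first, so it gets the prefix.
print(prefix_packetbeat(flatten_shipper(old)))
```

Under these assumptions, only the second ordering yields `packetbeat.ignore_outgoing: true`, which is consistent with the comment added in `main()`.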


6 changes: 0 additions & 6 deletions libbeat/tests/system/config/mockbeat.yml.j2
@@ -23,12 +23,6 @@ tags: [
{%- endif -%}]


# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
# ignore_outgoing: true



############################# Output ############################################

5 changes: 0 additions & 5 deletions metricbeat/metricbeat.full.yml
@@ -206,11 +206,6 @@ metricbeat.modules:
# sub-dictionary. Default is false.
#fields_under_root: false

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#ignore_outgoing: true

# How often (in seconds) shippers are publishing their IPs to the topology map.
# The default is 10 seconds.
#refresh_topology_freq: 10
5 changes: 0 additions & 5 deletions metricbeat/tests/system/config/metricbeat.yml.j2
@@ -87,11 +87,6 @@ tags: [
{%- endif -%}
]

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
# ignore_outgoing: true

{% if geoip_paths is not none %}
geoip:
paths: [
3 changes: 2 additions & 1 deletion packetbeat/beater/packetbeat.go
@@ -111,7 +111,8 @@ func (pb *Packetbeat) init(b *beat.Beat) error {
if b.Config.Shipper.BulkQueueSize != nil {
bulkQueueSize = *b.Config.Shipper.BulkQueueSize
}
pb.Pub, err = publish.NewPublisher(b.Publisher, queueSize, bulkQueueSize)

pb.Pub, err = publish.NewPublisher(b.Publisher, queueSize, bulkQueueSize, pb.Config.IgnoreOutgoing)
if err != nil {
return fmt.Errorf("Initializing publisher failed: %v", err)
}
11 changes: 6 additions & 5 deletions packetbeat/config/config.go
@@ -9,11 +9,12 @@ import (
)

type Config struct {
Interfaces InterfacesConfig `config:"interfaces"`
Flows *Flows `config:"flows"`
Protocols map[string]*common.Config `config:"protocols"`
Procs procs.ProcsConfig `config:"procs"`
RunOptions droppriv.RunOptions
Interfaces InterfacesConfig `config:"interfaces"`
Flows *Flows `config:"flows"`
Protocols map[string]*common.Config `config:"protocols"`
Procs procs.ProcsConfig `config:"procs"`
IgnoreOutgoing bool `config:"ignore_outgoing"`
RunOptions droppriv.RunOptions
}

type InterfacesConfig struct {
Original file line number Diff line number Diff line change
@@ -169,6 +169,38 @@ NOTE: This setting disables automatic generation of the BPF filter. If
you use this setting, it's your responsibility to keep the BPF filters in sync with the
ports defined in the `protocols` section.

===== ignore_outgoing

If the `ignore_outgoing` option is enabled, Packetbeat ignores all the
transactions initiated from the server running Packetbeat.

This is useful when two Packetbeat instances publish the same transactions. Because one Packetbeat
sees the transaction in its outgoing queue and the other sees it in its incoming
queue, you can end up with duplicate transactions. To remove the duplicates, you
can enable the `packetbeat.ignore_outgoing` option on one of the servers.

For example, in the following scenario, you see a 3-server architecture
where a Beat is installed on each server. t1 is the transaction exchanged between
Server1 and Server2, and t2 is the transaction between Server2 and Server3.

image:./images/option_ignore_outgoing.png[Beats Architecture]

By default, each transaction is indexed twice because Beat2
sees both transactions. So you would see the following published transactions
(when `ignore_outgoing` is false):

- Beat1: t1
- Beat2: t1 and t2
- Beat3: t2

To avoid duplicates, you can force your Beats to send only the incoming
transactions and ignore the transactions created by the local server. So you would
see the following published transactions (when `ignore_outgoing` is true):

- Beat1: none
- Beat2: t1
- Beat3: t2
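
The two listings above can be reproduced with a small simulation. This is only an illustrative sketch, not Packetbeat's implementation: it assumes t1 is initiated by Server1 and t2 by Server2 (which is what the listed results imply), and the `Transaction`, `published`, and `report` names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Transaction:
    name: str
    client: str  # server that initiated the transaction (assumed)
    server: str  # server that received it


def published(observer, transactions, ignore_outgoing):
    """Names of the transactions a Packetbeat on `observer` would publish."""
    result = []
    for t in transactions:
        if observer not in (t.client, t.server):
            continue  # this instance never sees the traffic
        if ignore_outgoing and t.client == observer:
            continue  # initiated locally, so treated as outgoing and dropped
        result.append(t.name)
    return result


transactions = [
    Transaction("t1", client="Server1", server="Server2"),
    Transaction("t2", client="Server2", server="Server3"),
]


def report(ignore_outgoing):
    servers = ("Server1", "Server2", "Server3")
    return {s: published(s, transactions, ignore_outgoing) for s in servers}


print(report(False))  # each transaction is published by both endpoints
print(report(True))   # each transaction is published only by its receiver
```

With `ignore_outgoing` enabled on every instance, each transaction is reported exactly once, by the server that received it.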


[[configuration-flows]]
=== Flows Configuration
4 changes: 4 additions & 0 deletions packetbeat/etc/beat.full.yml
@@ -444,3 +444,7 @@ packetbeat.protocols.nfs:
# - process: app
# cmdline_grep: gunicorn

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#packetbeat.ignore_outgoing: true
9 changes: 4 additions & 5 deletions packetbeat/packetbeat.full.yml
@@ -444,6 +444,10 @@ packetbeat.protocols.nfs:
# - process: app
# cmdline_grep: gunicorn

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#packetbeat.ignore_outgoing: true

#================================ General =====================================

@@ -468,11 +472,6 @@ packetbeat.protocols.nfs:
# sub-dictionary. Default is false.
#fields_under_root: false

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#ignore_outgoing: true

# How often (in seconds) shippers are publishing their IPs to the topology map.
# The default is 10 seconds.
#refresh_topology_freq: 10
4 changes: 2 additions & 2 deletions packetbeat/publish/publish.go
@@ -45,7 +45,6 @@ type TopologyProvider interface {
IsPublisherIP(ip string) bool
GetServerName(ip string) string
GeoLite() *libgeo.GeoIP
IgnoreOutgoing() bool
}

func (t *ChanTransactions) PublishTransaction(event common.MapStr) bool {
@@ -58,6 +57,7 @@ var debugf = logp.MakeDebug("publish")
func NewPublisher(
pub publisher.Publisher,
hwm, bulkHWM int,
ignoreOutgoing bool,
) (*PacketbeatPublisher, error) {
topo, ok := pub.(TopologyProvider)
if !ok {
@@ -68,7 +68,7 @@ func NewPublisher(
pub: pub,
topo: topo,
geoLite: topo.GeoLite(),
ignoreOutgoing: topo.IgnoreOutgoing(),
ignoreOutgoing: ignoreOutgoing,
client: pub.Connect(),
done: make(chan struct{}),
trans: make(chan common.MapStr, hwm),
6 changes: 3 additions & 3 deletions packetbeat/publish/publish_test.go
@@ -63,7 +63,7 @@ func TestFilterEvent(t *testing.T) {

func TestDirectionOut(t *testing.T) {
publisher := newTestPublisher([]string{"192.145.2.4"})
ppub, _ := NewPublisher(publisher, 1000, 1)
ppub, _ := NewPublisher(publisher, 1000, 1, false)

event := common.MapStr{
"src": &common.Endpoint{
@@ -89,7 +89,7 @@ func TestDirectionOut(t *testing.T) {

func TestDirectionIn(t *testing.T) {
publisher := newTestPublisher([]string{"192.145.2.5"})
ppub, _ := NewPublisher(publisher, 1000, 1)
ppub, _ := NewPublisher(publisher, 1000, 1, false)

event := common.MapStr{
"src": &common.Endpoint{
@@ -121,7 +121,7 @@ func newTestPublisher(ips []string) *publisher.BeatPublisher {

func TestNoDirection(t *testing.T) {
publisher := newTestPublisher([]string{"192.145.2.6"})
ppub, _ := NewPublisher(publisher, 1000, 1)
ppub, _ := NewPublisher(publisher, 1000, 1, false)

event := common.MapStr{
"src": &common.Endpoint{