Skip to content

Commit

Permalink
amqp protocol
Browse files Browse the repository at this point in the history
the FIELDS

updated Notes field when parsing of a Table fails, when no exchange, the field exchange is no longer displayed instead of none, commented the two Timestamp tests that made the Travis build failed

gofmted whole packetbeat code

empty exchange not displayed, connection.close and channel.close are now waiting for a close ok method. if the close ok method never arrives, the transaction is published in the expireTransaction function and the Notes field is updated to indicate it

Create a proper ICMP only BPF filter

Update to golang 1.5.3

Filebeat system tests adapted to new structure

Topbeat system tests updated

Move implementation of packetbeat system tests to new base

Not all code was moved to far, as there are some special implementations in packetbeat

Update winlogbeat to the newest beat system tests

Apply flake8

Fix 769: building the test binary includes all vendor package

"go test -c -covermode=atomic -coverpkg" is called with a list of packages isntead of "./..."

Add PyYAML to Windows vagrant box.

Change system test kill_and_wait method to gracefully stop processes on Windows

Fixes elastic#599

Refactor beat exit

* Introduce Signal function which is called if using CTRL-C or similar
* Run now returns an error and doesn't exist itself anymore
* Fix spooler and crawler shutdown issue
* Update mockbeat to check Run return error.

Thanks to @cyrilleverrier for his contribution here.

Fix elastic#779: libbeat/Makefile filters vendor folder

all the files inside the ./vendor/* folder are now excluded
when executing:
	make fmt
	make simplify
	make vet

Edit new/changed content added to topbeat for 1.1

Minor fix to awkward sentence

move preprocessor from libbeat and move packetbeat

- remove preprocessor worker completely from libbeat
- introduce transaction publisher in packetbeat to event processing:
  - GeoIP
  - normalize addresses
- simplify sync/async publisher client
- update new_protocol docs and changelog

Validate length in parser

Check length in pgsql parser before parsing column content in case column
length > buffer length.

changed results member of amqp struct to publish.Transactions, updated test file

handling of a message splitted in several body frames, need to test it. began to parse connection methods

handling of connection information methods, like connection.start, connection.open or channel.open, added a hide_connection_information in config set to true by default to choose to display them or not

fixed bytes in bytes out for classic methods, added basic qos method in connection information option, channel and connections methods with an error code above or equal to 300 are always published, some new functions are added to see if method reflects an error in the protocol

removed useless condition in expireTransaction

added some pcap tests

added three tests for amqp and their pcap files, corrected the fields.yml and added the amqp part in the packetbeat.yml.j2 for tests

camelcased structs, created parser file

updated the yaml with the last option, removed amqp.response field in the connectionStartOkMethod since it can hold the credentials of the client when a plain auth is selected

fixed yaml, camelcased const, some style fixes in code

added hasProperty function, added verbose in testing
  • Loading branch information
Developpement Web Thomas Paulmyer authored and Thomas Paulmyer committed Feb 9, 2016
1 parent 31c7e04 commit e973db0
Show file tree
Hide file tree
Showing 28 changed files with 3,935 additions and 5 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]

*Affecting all Beats*
- Run function to start a beat no returns an error instead of directly exiting. {pull}771[771]
- Some publisher options refactoring in libbeat {pull}684[684]
- Run function to start a beat no returns an error instead of directly exiting. {pull}771[771]
- Move event preprocessor applying GeoIP to packetbeat {pull}772[772]

*Packetbeat*
- Rename output fields in the dns package. Former flag `recursion_allowed` becomes `recursion_available`. {pull}803[803]
Expand Down Expand Up @@ -41,6 +44,10 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]
- Harden pgsql parser. {issue}565[565]

*Packetbeat*
- Fix setting direction to out and use its value to decide when dropping events if ignore_outgoing is enabled {pull}557[557]
- Allow PF_RING sniffer type to be configured using pf_ring or pfring {pull}671[671]
- Create a proper BPF filter when ICMP is the only enabled protocol {issue}757[757]
- Check column length in pgsql parser. {issue}565{565

*Topbeat*

Expand All @@ -56,6 +63,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]

*Affecting all Beats*
- Update builds to Golang version 1.5.3
- Make logstash output compression level configurable. {pull}630[630]
- Add ability to override configuration settings using environment variables {issue}114[114]
- Libbeat now always exits through a single exit method for proper cleanup and control {pull}736[736]
- Add ability to create Elasticsearch mapping on startup {pull}639[639]
Expand Down
26 changes: 26 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
FROM golang:1.5.3
MAINTAINER Nicolas Ruflin <ruflin@elastic.co>

RUN set -x && \
apt-get update && \
apt-get install -y netcat && \
apt-get clean


## Install go package dependencies
RUN set -x \
go get \
github.com/pierrre/gotestcover \
github.com/tsg/goautotest \
golang.org/x/tools/cmd/cover \
golang.org/x/tools/cmd/vet

COPY libbeat/scripts/docker-entrypoint.sh /entrypoint.sh

ENV GO15VENDOREXPERIMENT=1

RUN mkdir -p /etc/pki/tls/certs
COPY testing/environments/docker/logstash/pki/tls/certs/logstash.crt /etc/pki/tls/certs/logstash.crt

# Create a copy of the respository inside the container.
COPY . /go/src/github.com/elastic/beats/
9 changes: 9 additions & 0 deletions Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
#
# solaris
# -------------------
<<<<<<< HEAD
# - Use gmake instead of make.
#
# freebsd
# -------------------
# - Use gmake instead of make.
# - Folder syncing doesn't work well. Consider copying the files into the box or
# cloning the project inside the box.
=======
# More development boxes can be added to this file and you can run commands
# like "vagrant up solaris" or "vagrant up winxp" to start them.
>>>>>>> amqp protocol

# Provisioning for Windows PowerShell
$winPsProvision = <<SCRIPT
Expand Down Expand Up @@ -59,6 +64,10 @@ Vagrant.configure(2) do |config|

# Windows Server 2012 R2
config.vm.define "win2012", primary: true do |win2012|
<<<<<<< HEAD
=======
# Windows Server 2012 R2
>>>>>>> amqp protocol
win2012.vm.box = "https://s3.amazonaws.com/beats-files/vagrant/beats-win2012-r2-virtualbox-2016-01-20_0057.box"
win2012.vm.guest = :windows

Expand Down
129 changes: 129 additions & 0 deletions filebeat/beat/spooler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package beat

import (
"time"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/logp"
)

type Spooler struct {
Filebeat *Filebeat
exit chan struct{}
nextFlushTime time.Time
spool []*input.FileEvent
Channel chan *input.FileEvent
}

func NewSpooler(filebeat *Filebeat) *Spooler {
spooler := &Spooler{
Filebeat: filebeat,
exit: make(chan struct{}),
}

config := &spooler.Filebeat.FbConfig.Filebeat

// Set the next flush time
spooler.nextFlushTime = time.Now().Add(config.IdleTimeoutDuration)
spooler.Channel = make(chan *input.FileEvent, 16)

return spooler
}

func (spooler *Spooler) Config() error {
config := &spooler.Filebeat.FbConfig.Filebeat

// Set default pool size if value not set
if config.SpoolSize == 0 {
config.SpoolSize = cfg.DefaultSpoolSize
}

// Set default idle timeout if not set
if config.IdleTimeout == "" {
logp.Debug("spooler", "Set idleTimeoutDuration to %s", cfg.DefaultIdleTimeout)
// Set it to default
config.IdleTimeoutDuration = cfg.DefaultIdleTimeout
} else {
var err error

config.IdleTimeoutDuration, err = time.ParseDuration(config.IdleTimeout)

if err != nil {
logp.Warn("Failed to parse idle timeout duration '%s'. Error was: %v", config.IdleTimeout, err)
return err
}
}

return nil
}

// Run runs the spooler
// It heartbeats periodically. If the last flush was longer than
// 'IdleTimeoutDuration' time ago, then we'll force a flush to prevent us from
// holding on to spooled events for too long.
func (s *Spooler) Run() {

config := &s.Filebeat.FbConfig.Filebeat

// Sets up ticket channel
ticker := time.NewTicker(config.IdleTimeoutDuration / 2)

s.spool = make([]*input.FileEvent, 0, config.SpoolSize)

logp.Info("Starting spooler: spool_size: %v; idle_timeout: %s", config.SpoolSize, config.IdleTimeoutDuration)

// Loops until running is set to false
for {
select {

case <-s.exit:
break
case event, ok := <-s.Channel:
if ok {
s.spool = append(s.spool, event)

// Spooler is full -> flush
if len(s.spool) == cap(s.spool) {
logp.Debug("spooler", "Flushing spooler because spooler full. Events flushed: %v", len(s.spool))
s.flush()
}
}
case <-ticker.C:
// Flush periodically
if time.Now().After(s.nextFlushTime) {
logp.Debug("spooler", "Flushing spooler because of timeout. Events flushed: %v", len(s.spool))
s.flush()
}
}
}
}

// Stop stops the spooler. Flushes events before stopping
func (s *Spooler) Stop() {
logp.Info("Stopping spooler")
close(s.exit)

// Flush again before exiting spooler and closes channel
logp.Info("Spooler last flush spooler")
s.flush()
close(s.Channel)
}

// flush flushes all event and sends them to the publisher
func (s *Spooler) flush() {
// Checks if any new objects
if len(s.spool) > 0 {

// copy buffer
tmpCopy := make([]*input.FileEvent, len(s.spool))
copy(tmpCopy, s.spool)

// clear buffer
s.spool = s.spool[:0]

// send
s.Filebeat.publisherChan <- tmpCopy
}
s.nextFlushTime = time.Now().Add(s.Filebeat.FbConfig.Filebeat.IdleTimeoutDuration)
}
1 change: 1 addition & 0 deletions filebeat/tests/system/test_prospector.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ def test_no_paths_defined(self):

filebeat.check_kill_and_wait(exit_code=1)


def test_files_added_late(self):
"""
Tests that prospectors stay running even though no harvesters are started yet
Expand Down
25 changes: 25 additions & 0 deletions libbeat/scripts/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ build: $(GOFILES)
# Create test coverage binary
.PHONY: buildbeat.test
buildbeat.test: $(GOFILES)
<<<<<<< HEAD
go test $(RACE) -c -coverpkg ${GOPACKAGES_COMMA_SEP}
=======
go test -c -covermode=atomic -coverpkg ${GOPACKAGES_COMMA_SEP}
>>>>>>> amqp protocol

# Cross-compile beat for the OS'es specified in GOX_OS variable.
# The binaries are placed in the build/bin directory.
Expand All @@ -71,7 +75,11 @@ crosscompile: $(GOFILES)
# Checks project and source code if everything is according to standard
.PHONY: check
check:
<<<<<<< HEAD
@gofmt -l ${GOFILES_NOVENDOR} | (! grep . -q) || (echo "Code differs from gofmt's style" && false)
=======
@gofmt -l ${GOFILES_NOVENDOR} | read && echo "Code differs from gofmt's style" 1>&2 && exit 1 || true
>>>>>>> amqp protocol
go vet ${GOPACKAGES}

# Runs gofmt -w on the project's source code, modifying any files that do not
Expand Down Expand Up @@ -137,12 +145,18 @@ integration-tests-environment:

# Runs the system tests
.PHONY: system-tests
<<<<<<< HEAD
system-tests: buildbeat.test prepare-tests python-env
. ${PYTHON_ENV}/bin/activate; nosetests -w tests/system --process-timeout=$(TIMEOUT) --with-timer
=======
system-tests: buildbeat.test prepare-tests system-tests-setup
. build/system-tests/env/bin/activate; nosetests -w tests/system --process-timeout=$(TIMEOUT) --with-timer
>>>>>>> amqp protocol
python ${ES_BEATS}/dev-tools/aggregate_coverage.py -o ${COVERAGE_DIR}/system.cov ./build/system-tests/run

# Runs system tests without coverage reports and in parallel
.PHONY: fast-system-tests
<<<<<<< HEAD
fast-system-tests: buildbeat.test python-env
. ${PYTHON_ENV}/bin/activate; nosetests -w tests/system --processes=$(PROCESSES) --process-timeout=$(TIMEOUT)

Expand All @@ -155,6 +169,17 @@ python-env: ${ES_BEATS}/libbeat/tests/system/requirements.txt
else \
. ${PYTHON_ENV}/bin/activate && pip install -Ur ${ES_BEATS}/libbeat/tests/system/requirements.txt ; \
fi
=======
fast-system-tests: buildbeat.test system-tests-setup
. build/system-tests/env/bin/activate; nosetests -w tests/system --processes=$(PROCESSES) --process-timeout=$(TIMEOUT)

.PHONY: system-tests-setup
system-tests-setup: tests/system/requirements.txt
test -d env || virtualenv build/system-tests/env > /dev/null
. build/system-tests/env/bin/activate && pip install -Ur tests/system/requirements.txt > /dev/null
touch build/system-tests/env/bin/activate

>>>>>>> amqp protocol

# Run benchmark tests
.PHONY: benchmark-tests
Expand Down
2 changes: 2 additions & 0 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/amqp"
"github.com/elastic/beats/packetbeat/protos/dns"
"github.com/elastic/beats/packetbeat/protos/http"
"github.com/elastic/beats/packetbeat/protos/icmp"
Expand All @@ -33,6 +34,7 @@ import (

var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[protos.Protocol]protos.ProtocolPlugin{
protos.HttpProtocol: new(http.HTTP),
protos.AmqpProtocol: new(amqp.Amqp),
protos.MemcacheProtocol: new(memcache.Memcache),
protos.MysqlProtocol: new(mysql.Mysql),
protos.PgsqlProtocol: new(pgsql.Pgsql),
Expand Down
9 changes: 9 additions & 0 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type InterfacesConfig struct {

type Protocols struct {
Icmp Icmp
Amqp Amqp
Dns Dns
Http Http
Memcache Memcache
Expand All @@ -59,6 +60,14 @@ type Icmp struct {
TransactionTimeout *int `yaml:"transaction_timeout"`
}

type Amqp struct {
ProtocolCommon `yaml:",inline"`
ParseHeaders *bool `yaml:"parse_headers"`
ParseArguments *bool `yaml:"parse_arguments"`
MaxBodyLength *int `yaml:"max_body_length"`
HideConnectionInformation *bool `yaml:"hide_connection_information"`
}

type Dns struct {
ProtocolCommon `yaml:",inline"`
Include_authorities *bool
Expand Down
23 changes: 22 additions & 1 deletion packetbeat/etc/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@ protocols:
# Enable ICMPv4 and ICMPv6 monitoring. Default: false
enabled: true

amqp:
# Configure the ports where to listen for AMQP traffic. You can disable
# the AMQP protocol by commenting out the list of ports.
ports: [5672]
# Truncate messages that are published and avoid huge messages being
# indexed.
# Default: 1000
#max_body_length: 1000

# Hide the header fields in header frames.
# Default: false
#parse_headers: false

# Hide the additional arguments of method frames.
# Default: false
#parse_arguments: false

# Hide all methods relative to connection negociation between server and
# client.
# Default: true
#hide_connection_information: true

dns:
# Configure the ports where to listen for DNS traffic. You can disable
# the DNS protocol by commenting out the list of ports.
Expand Down Expand Up @@ -140,4 +162,3 @@ protocols:
#
# - process: app
# cmdline_grep: gunicorn

Loading

0 comments on commit e973db0

Please sign in to comment.