diff --git a/.github/ISSUE_TEMPLATE/bug-report.md b/.github/ISSUE_TEMPLATE/bug-report.md
new file mode 100644
index 0000000..4c4e172
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/bug-report.md
@@ -0,0 +1,20 @@
+---
+name: Bug
+about: "Report confirmed bugs. For unconfirmed bugs, please visit https://discuss.elastic.co/c/elastic-stack/elastic-agent"
+
+---
+
+Please post all questions and issues on https://discuss.elastic.co/c/elastic-stack/elastic-agent
+before opening a GitHub issue. Your questions will reach a wider audience there,
+and if we confirm that there is a bug, then you can open a new issue.
+
+For security vulnerabilities, please only send reports to security@elastic.co.
+See https://www.elastic.co/community/security for more information.
+
+Please include configurations and logs if available.
+
+For confirmed bugs, please report:
+- Version:
+- Operating System:
+- Discuss Forum URL:
+- Steps to Reproduce:
diff --git a/.github/ISSUE_TEMPLATE/feature-request.md b/.github/ISSUE_TEMPLATE/feature-request.md
new file mode 100644
index 0000000..da67181
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/feature-request.md
@@ -0,0 +1,10 @@
+---
+name: Enhancement request
+about: The Elastic agent data shipper can't do all the things, but maybe it can do your things.
+
+---
+
+**Describe the enhancement:**
+
+**Describe a specific use case for the enhancement or feature:**
+
diff --git a/.github/ISSUE_TEMPLATE/flaky-test.md b/.github/ISSUE_TEMPLATE/flaky-test.md
new file mode 100644
index 0000000..3a87af9
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/flaky-test.md
@@ -0,0 +1,19 @@
+---
+name: Flaky Test
+about: Report a flaky test (one that doesn't pass consistently)
+labels: flaky-test
+---
+
+## Flaky Test
+
+* **Test Name:** Name of the failing test.
+* **Link:** Link to the file/line number in GitHub.
+* **Branch:** Git branch the test was seen in. If a PR, the branch the PR was based off.
+* **Artifact Link:** If available, attach the generated zip artifact associated with the stack trace for this failure.
+* **Notes:** Additional details about the test, e.g. a theory as to the cause of the failure.
+
+### Stack Trace
+
+```
+paste stack trace here
+```
diff --git a/.github/ISSUE_TEMPLATE/question.md b/.github/ISSUE_TEMPLATE/question.md
new file mode 100644
index 0000000..b020108
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/question.md
@@ -0,0 +1,23 @@
+---
+name: Question
+about: Who, what, when, where, and how?
+
+---
+
+Hey, stop right there!
+
+We use GitHub to track feature requests and bug reports. Please do not
+submit issues for questions about how to use features of the Elastic agent, how to
+set the Elastic agent up, best practices, or development-related help.
+
+However, we do want to help! Head on over to our official Elastic agent forums and ask
+your questions there. In addition to awesome, knowledgeable community
+contributors, core Elastic agent developers are on the forums every single day to help
+you out.
+
+The forums are here: https://discuss.elastic.co/c/elastic-stack/elastic-agent
+
+We can't stop you from opening an issue here, but it will likely
+linger without a response for days or weeks before it is closed, and we
+ask you to join us on the forums instead. Save yourself the time, and
+ask on the forums today.
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
new file mode 100644
index 0000000..cadd55c
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,81 @@
+
+
+## What does this PR do?
+
+
+
+## Why is it important?
+
+
+
+## Checklist
+
+
+
+- [ ] My code follows the style guidelines of this project
+- [ ] I have commented my code, particularly in hard-to-understand areas
+- [ ] I have made corresponding changes to the documentation
+- [ ] I have made corresponding changes to the default configuration files
+- [ ] I have added tests that prove my fix is effective or that my feature works
+- [ ] I have added an entry in `CHANGELOG.md` or `CHANGELOG-developer.md`.
+
+## Author's Checklist
+
+
+- [ ]
+
+## How to test this PR locally
+
+
+
+## Related issues
+
+
+-
+
+## Use cases
+
+
+
+## Screenshots
+
+
+
+## Logs
+
+
diff --git a/README.md b/README.md
index 04cb851..575db53 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,66 @@
 # elastic-agent-shipper
-Data shipper for the Elastic Agent - single, unified way to add monitoring for logs, metrics, and other types of data to a host.
+
+> :warning: The Elastic agent data shipper is under active development, and will be available as an opt-in beta feature in an upcoming Elastic agent release.
+
+Data shipper for the Elastic Agent - a single, unified way to add monitoring for logs, metrics, and
+other types of data to a host.
+
+The data shipper is a new process in the Elastic agent system designed to centralize local data
+processing, queueing, and publishing to the target output (Elasticsearch, Logstash, etc.).
+
+The data shipper is part of a larger effort to rearchitect the Elastic agent. In the initial Elastic
+agent architecture, each underlying data collector (e.g. Filebeat) was required to implement its own
+processing, queueing, and output connection(s) for each supported output type. The data shipper
+simplifies this architecture by allowing data collectors to implement a single gRPC client to
+process, queue, and publish data. The initial design goals of the data shipper are to:
+
+- Remove the need for processing, queueing, and output protocols to be reimplemented in each input.
+- Minimize the number of output connections required in Elastic agent deployments.
+- Simplify configuration and performance tuning of Elastic agent data pipelines.
+- Make Elastic agent data pipelines more observable and easier to debug.
+- Improve on or maintain the performance of the existing Beats outputs.
+- Define the event publishing interface that all current and future Elastic agent data inputs will use.
+
+Each output in an agent policy will map to a separate instance of the shipper process:
+![Elastic Agent Data Shipper](docs/elastic-agent-shipper-arch.png)
+
+## Client Development
+
+Data shipper clients must implement the shipper [gRPC API](https://github.com/elastic/elastic-agent-shipper-client/tree/main/api).
+The reference client is the [Beats shipper output](https://github.com/elastic/beats/tree/main/libbeat/outputs/shipper), which is
+used by Beats like Filebeat and Metricbeat when they are started by Elastic agent integrations.
+
+Data shipper support in the Elastic Agent is under active development. The shipper currently depends on a running Elastic agent instance to
+start, but it will soon be possible to run the shipper in standalone mode for development: https://github.com/elastic/elastic-agent-shipper/issues/83
+
+The [reference shipper configuration file](https://github.com/elastic/elastic-agent-shipper/blob/main/elastic-agent-shipper.yml) defines the available
+configuration options.
+
+## Contributing
+
+The process for contributing to any of the Elastic repositories is similar.
+
+1. Please make sure you have signed our [Contributor License Agreement](https://www.elastic.co/contributor-agreement/).
+We are not asking you to assign copyright to us, but to give us the right to distribute your code
+without restriction. We ask this of all contributors in order to assure our users of the origin and
+continuing existence of the code. You only need to sign the CLA once.
+
+2. Send a pull request! Push your changes to your fork of the repository and [submit a pull
+request](https://help.github.com/articles/using-pull-requests). New PRs go to the main branch. The
+development team will backport your PR to previous release branches if necessary. In the pull request, describe what
+your changes do and mention any related bugs or issues.
+
+### Developing
+The data shipper is developed in [Go](http://golang.org/), so install the [version](https://github.com/elastic/elastic-agent-shipper/blob/main/.go-version)
+that is being used for shipper development. A reliable way to install the correct Go version for working on the shipper is to use the
+[GVM](https://github.com/andrewkroh/gvm) Go version manager.
+
+The data shipper primarily uses the [mage](https://magefile.org/) build system. The list of supported mage commands can be obtained
+by running `mage -l` from the root of the repository. The most commonly used commands are:
+
+* `mage build` to build the data shipper binary.
+* `mage check` to check license files and dependencies.
+* `mage lint` to lint the source code using [golangci-lint](https://golangci-lint.run/).
+* `go test ./...` to run all tests.
+
+Run mage commands with the `-v` flag for more detailed output, for example `mage -v check`.
diff --git a/config/config.go b/config/config.go
index f58f576..aa7c27d 100644
--- a/config/config.go
+++ b/config/config.go
@@ -23,7 +23,6 @@ import (
 
 const (
 	defaultConfigName = "elastic-agent-shipper.yml"
-	defaultPort       = 50051
 )
 
 var (
@@ -41,10 +40,6 @@ func init() {
 //ShipperConfig defines the options present in the config file
 type ShipperConfig struct {
 	Log     logp.Config       `config:"logging"`
-	TLS     bool              `config:"tls"`
-	Cert    string            `config:"cert"` //TLS cert file, if TLS is enabled
-	Key     string            `config:"key"`  //TLS Keyfile, if specified
-	Port    int               `config:"port"` //Port to listen on
 	Monitor monitoring.Config `config:"monitoring"` //Queue monitoring settings
 	Queue   queue.Config      `config:"queue"`      //Queue settings
 	Server  server.Config     `config:"server"`     //gRPC Server settings
@@ -65,7 +60,6 @@ func ReadConfig() (ShipperConfig, error) {
 	}
 	// systemd environment will send us to stdout environment, which we want
 	config := ShipperConfig{
-		Port:    defaultPort,
 		Log:     logp.DefaultConfig(logp.SystemdEnvironment),
 		Monitor: monitoring.DefaultConfig(),
 		Queue:   queue.DefaultConfig(),
@@ -101,7 +95,6 @@ func ReadConfigFromJSON(raw string) (ShipperConfig, error) {
 		return ShipperConfig{}, fmt.Errorf("error parsing string config: %w", err)
 	}
 	shipperConfig := ShipperConfig{
-		Port:    defaultPort,
 		Log:     logp.DefaultConfig(logp.SystemdEnvironment),
 		Monitor: monitoring.DefaultConfig(),
 		Queue:   queue.DefaultConfig(),
diff --git a/controller/run.go b/controller/run.go
index 90de2e8..c94b904 100644
--- a/controller/run.go
+++ b/controller/run.go
@@ -84,7 +84,7 @@ func (c *clientHandler) Run(cfg config.ShipperConfig, unit *client.Unit) error {
 	out := output.NewConsole(queue)
 	out.Start()
 
-	lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.Port))
+	lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.Server.Port))
 	if err != nil {
 		return fmt.Errorf("failed to listen: %w", err)
 	}
@@ -97,8 +97,8 @@ func (c *clientHandler) Run(cfg config.ShipperConfig, unit *client.Unit) error {
 	_ = unit.UpdateState(client.UnitStateConfiguring, "starting shipper server", nil)
 
 	var opts []grpc.ServerOption
-	if cfg.TLS {
-		creds, err := credentials.NewServerTLSFromFile(cfg.Cert, cfg.Key)
+	if cfg.Server.TLS {
+		creds, err := credentials.NewServerTLSFromFile(cfg.Server.Cert, cfg.Server.Key)
 		if err != nil {
 			return fmt.Errorf("failed to generate credentials %w", err)
 		}
@@ -122,7 +122,7 @@ func (c *clientHandler) Run(cfg config.ShipperConfig, unit *client.Unit) error {
 		shipperServer.Close()
 	}
 	handleShutdown(shutdownFunc, c.shutdownInit)
-	log.Debugf("gRPC server is listening on port %d", cfg.Port)
+	log.Debugf("gRPC server is listening on port %d", cfg.Server.Port)
 	_ = unit.UpdateState(client.UnitStateHealthy, "Shipper Running", nil)
 
 	// This will get sent after the server has shutdown, signaling to the runloop that it can stop.
diff --git a/docs/elastic-agent-shipper-arch.png b/docs/elastic-agent-shipper-arch.png
new file mode 100644
index 0000000..34bce08
Binary files /dev/null and b/docs/elastic-agent-shipper-arch.png differ
diff --git a/elastic-agent-shipper.yml b/elastic-agent-shipper.yml
index 3d6c911..f3836dd 100644
--- a/elastic-agent-shipper.yml
+++ b/elastic-agent-shipper.yml
@@ -1,29 +1,111 @@
-###################### Shipper Configuration Example #########################
+# ========================= Shipper Configuration File ========================
 
-# This file is an example configuration file highlighting only the most common
-# options.
+# ================================== General ==================================
 
-# The gRPC port that the shipper will listen on
-port: 50052
-tls: false
-#cert: # path to TLS cert
-#key: # path to TLS keyfile
+server:
+  # StrictMode means that every incoming event will be validated against the list of required
+  # fields. This introduces some additional overhead but can be really handy for client developers
+  # during the debugging stage. Normally, it should be disabled during production use and enabled for
+  # testing. In production it is preferable to send events to the output if at all possible.
+  strict_mode: false
+  # The gRPC port that the shipper will listen on.
+  port: 50052
+  # Whether to use TLS for gRPC communication.
+  tls: false
+  # A path to a TLS certificate.
+  # cert:
+  # A path to a TLS keyfile.
+  # key:
 
-#log level
-logging.level: debug
-logging.selectors: ["*"]
-logging.to_stderr: true
+# ================================== Logging ==================================
 
-queue:
-  test: #There is no actual "test" queue type, remove this later.
-    events: 512
+logging:
+  # Sets the log level. The default log level is info.
+  # Available log levels are: error, warning, info, debug
+  level: debug
+  # Enable debug output for selected components. To enable all selectors use ["*"]
+  # Other available selectors are "beat", "publisher", "service"
+  # Multiple selectors can be chained.
+  selectors: ["*"]
+  # Send all logging output to stderr. The default is false.
+  to_stderr: true
+
+# ================================== Monitoring ==================================
 
 monitoring:
+  # Whether metrics reporting is enabled.
   enabled: true
+  # Metrics reporting interval.
   interval: 5s
+  # Whether to report metrics periodically in the logs.
   log: true
+  # Exposes metrics over HTTP.
   http:
+    # Enables the HTTP endpoint.
     enabled: true
+    # The HTTP endpoint will bind to this hostname, IP address, unix socket or named pipe.
+    # When using IP addresses, it is recommended to only use localhost.
     host: "localhost"
+    # Port on which the HTTP endpoint will bind. The default is 0, which means the feature is disabled.
     port: 8282
+  # Name of the reported metrics object.
   name: "queue"
+
+# ================================== Queue ====================================
+
+# Internal queue configuration for buffering events to be published.
+#queue:
  # Queue type by name (default 'mem')
  # The memory queue will present all available events to the output the moment the output is ready
  # to serve another batch of events.
  #mem:
    # Max number of events the queue can buffer.
    #events: 4096

    # Hints the minimum number of events stored in the queue,
    # before providing a batch of events to the outputs.
    # The default value is set to 2048.
    # A value of 0 ensures events are immediately available
    # to be sent to the outputs.
    #flush.min_events: 2048

    # Maximum duration after which events are available to the outputs,
    # if the number of events stored in the queue is < `flush.min_events`.
    #flush.timeout: 1s

  # The disk queue stores incoming events on disk until the output is
  # ready for them. This allows a higher event limit than the memory-only
  # queue and lets pending events persist through a restart.
  #disk:
    # The directory path to store the queue's data.
    #path: "path/to/diskqueue"

    # The maximum space the queue should occupy on disk. Depending on
    # input settings, events that exceed this limit are delayed (if
    # persistent, like a log file) or discarded (if ephemeral, like a
    # UDP packet).
    #max_size: 10GB

    # The maximum size of a single queue data file. Data in the queue is
    # stored in smaller segments that are deleted after all their events
    # have been processed.
    #segment_size: 1GB

    # The number of events to read from disk to memory while waiting for
    # the output to request them.
    #read_ahead: 512

    # The number of events to accept from inputs while waiting for them
    # to be written to disk. If event data arrives faster than it
    # can be written to disk, this setting prevents it from overflowing
    # main memory.
    #write_ahead: 2048

    # The duration to wait before retrying when the queue encounters a disk
    # write error.
    #retry_interval: 1s

    # The maximum length of time to wait before retrying on a disk write
    # error. If the queue encounters repeated errors, it will double the
    # length of its retry interval each time, up to this maximum.
    #max_retry_interval: 30s
diff --git a/queue/config.go b/queue/config.go
index 39e2e60..c909d50 100644
--- a/queue/config.go
+++ b/queue/config.go
@@ -13,16 +13,18 @@ import (
 )
 
 type Config struct {
-	MemSettings  *memqueue.Settings  `config:"memqueue"`
-	DiskSettings *diskqueue.Settings `config:"diskqueue"`
+	MemSettings  *memqueue.Settings  `config:"mem"`
+	DiskSettings *diskqueue.Settings `config:"disk"`
 }
 
 func DefaultConfig() Config {
+	// Use the same default memory queue configuration that Beats does:
+	// https://github.com/elastic/beats/blob/7449e5c4b944c661299de8099d5423bafd458ee2/libbeat/publisher/queue/memqueue/config.go#L32
 	return Config{
 		MemSettings: &memqueue.Settings{
-			Events:         1024,
-			FlushMinEvents: 256,
-			FlushTimeout:   5 * time.Millisecond,
+			Events:         4096,
+			FlushMinEvents: 2048,
+			FlushTimeout:   1 * time.Second,
 		}, //memqueue should have a DefaultSettings()
 		DiskSettings: nil,
 	}
diff --git a/server/config.go b/server/config.go
index 9f1c777..59cb488 100644
--- a/server/config.go
+++ b/server/config.go
@@ -4,6 +4,10 @@
 
 package server
 
+const (
+	defaultPort = 50051
+)
+
 type Config struct {
 	// StrictMode means that every incoming event will be validated against the
 	// list of required fields. This introduces some additional overhead but can
@@ -11,11 +15,21 @@
 	// Normally, it should be disabled during production use and enabled for testing.
 	// In production it is preferable to send events to the output if at all possible.
 	StrictMode bool `config:"strict_mode"`
+	// Whether to use TLS for the gRPC connection.
+	TLS bool `config:"tls"`
+	// TLS cert file, if TLS is enabled
+	Cert string `config:"cert"`
+	// TLS Keyfile, if specified
+	Key string `config:"key"`
+	// Port to listen on
+	Port int `config:"port"`
 }
 
 // DefaultConfig returns default configuration for the gRPC server
 func DefaultConfig() Config {
 	return Config{
 		StrictMode: false,
+		TLS:        false,
+		Port:       defaultPort,
 	}
 }
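
As a rough illustration of how a client developer might connect to a shipper configured with the `server.port` and `server.tls` settings introduced above, here is a minimal Go sketch. The address, the certificate path, and the overall flow are assumptions made for illustration; the actual event-publishing RPCs are defined by the shipper gRPC API linked in the README and are not shown here.

```go
package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Assumption: the shipper is listening on the default port from server/config.go.
	addr := "localhost:50051"

	// Plaintext transport, matching `server.tls: false` in elastic-agent-shipper.yml.
	opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}

	// If the shipper were started with `server.tls: true`, load the matching certificate
	// instead. The path below is a placeholder, not a file shipped with this repository.
	useTLS := false
	if useTLS {
		creds, err := credentials.NewClientTLSFromFile("path/to/cert.pem", "")
		if err != nil {
			log.Fatalf("failed to load TLS credentials: %v", err)
		}
		opts = []grpc.DialOption{grpc.WithTransportCredentials(creds)}
	}

	conn, err := grpc.Dial(addr, opts...)
	if err != nil {
		log.Fatalf("failed to dial shipper: %v", err)
	}
	defer conn.Close()

	// From here a real client would wrap conn with the generated gRPC client from
	// github.com/elastic/elastic-agent-shipper-client and call its publish RPCs.
	log.Printf("connected to shipper at %s", addr)
}
```

A client built this way mirrors the server side of this change: the transport options are derived from the same `server.tls`, `server.cert`, and `server.key` settings that `controller/run.go` now reads from `cfg.Server`.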