Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Fields and Tag to monitor config #4141

Merged
merged 1 commit into from
Jun 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@ https://github.com/elastic/beats/compare/v5.2.2...v5.3.0[View commits]
- Improve handling of different path variants on Windows. {pull}3781[3781]
- Add multiline.flush_pattern option, for specifying the 'end' of a multiline pattern {pull}4019[4019]

*Heartbeat*

- Add `tags`, `fields` and `fields_under_root` in monitors configuration. {pull}3623[3623]

*Metricbeat*

Expand Down
20 changes: 18 additions & 2 deletions heartbeat/_meta/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ heartbeat.monitors:
# Waiting duration until another ICMP Echo Request is emitted.
wait: 1s

# The tags of the monitors are included in their own field with each
# transaction published. Tags make it easy to group servers by different
# logical properties.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# monitor output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# If this option is set to true, the custom fields are stored as top-level
# fields in the output document instead of being grouped under a fields
# sub-dictionary. Default is false.
#fields_under_root: false

- type: tcp # monitor type `tcp`. Connect via TCP and optionally verify endpoint
# by sending/receiving a custom payload

Expand All @@ -64,8 +80,8 @@ heartbeat.monitors:
# Requires ports configs to be checked. If ssl is configured,
# a SSL/TLS based connection will be established. Otherwise plain tcp connection
# will be established
# - hostname + port like `localhost:12345`:
# Connect to port on given host. If ssl is configured,
# - hostname + port like `localhost:12345`:
# Connect to port on given host. If ssl is configured,
# a SSL/TLS based connection will be established. Otherwise plain tcp connection
# will be established
# - full url syntax. `scheme://<host>:[port]`. The `<scheme>` can be one of
Expand Down
66 changes: 34 additions & 32 deletions heartbeat/beater/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,22 @@ type Monitor struct {
factory monitors.Factory
config *common.Config

active map[string]MonitorTask
active map[string]monitorTask
}

type MonitorTask struct {
name, typ string
type monitorTask struct {
job monitors.Job
cancel JobCanceller

job monitors.Job
schedule scheduler.Schedule
cancel JobCanceller
config monitorTaskConfig
}

// monitorTaskConfig holds the per-monitor settings common to all monitor
// types (icmp, tcp, http), unpacked from the merged monitor configuration.
type monitorTaskConfig struct {
	// Name is an optional human-readable monitor name; presumably it is
	// meant to fall back to Type when empty (as the pre-refactor code did) —
	// verify against prepareSchedulerJob.
	Name string `config:"name"`
	// Type identifies the monitor type the job was created from.
	Type string `config:"type"`
	// Schedule determines when the monitor job runs; required.
	Schedule *schedule.Schedule `config:"schedule" validate:"required"`
	// Fields and tags to add to monitor.
	EventMetadata common.EventMetadata `config:",inline"`
}

type JobControl interface {
Expand Down Expand Up @@ -109,7 +116,7 @@ func newMonitorManager(
name: info.Name,
factory: factory,
config: config,
active: map[string]MonitorTask{},
active: map[string]monitorTask{},
})
}

Expand Down Expand Up @@ -146,19 +153,15 @@ func newMonitorManager(
}

func (m *Monitor) Update(configs []*common.Config) error {
all := map[string]MonitorTask{}
all := map[string]monitorTask{}
for i, upd := range configs {
config, err := common.MergeConfigs(m.config, upd)
if err != nil {
logp.Err("Failed merging monitor config with updates: %v", err)
return err
}

shared := struct {
Name string `config:"name"`
Type string `config:"type"`
Schedule *schedule.Schedule `config:"schedule" validate:"required"`
}{}
shared := monitorTaskConfig{}
if err := config.Unpack(&shared); err != nil {
logp.Err("Failed parsing job schedule: ", err)
return err
Expand All @@ -170,17 +173,10 @@ func (m *Monitor) Update(configs []*common.Config) error {
return err
}

name := shared.Name
if name == "" {
name = shared.Type
}

for _, job := range jobs {
all[job.Name()] = MonitorTask{
name: name,
typ: shared.Type,
job: job,
schedule: shared.Schedule,
all[job.Name()] = monitorTask{
job: job,
config: shared,
}
}
}
Expand All @@ -189,12 +185,12 @@ func (m *Monitor) Update(configs []*common.Config) error {
for _, job := range m.active {
job.cancel()
}
m.active = map[string]MonitorTask{}
m.active = map[string]monitorTask{}

// start new and reconfigured tasks
for id, t := range all {
job := createJob(m.manager.client, t.job, t.name, t.typ)
t.cancel = m.manager.jobControl.Add(t.schedule, id, job)
job := createJob(m.manager.client, t)
t.cancel = m.manager.jobControl.Add(t.config.Schedule, id, job)
m.active[id] = t
}

Expand Down Expand Up @@ -234,13 +230,17 @@ func createWatchUpdater(monitor *Monitor) func(content []byte) {
}
}

func createJob(client publisher.Client, r monitors.Job, name, typ string) scheduler.TaskFunc {
return createJobTask(client, r, name, typ)
func createJob(client publisher.Client, task monitorTask) scheduler.TaskFunc {
return task.prepareSchedulerJob(client, task.job)
}

func createJobTask(client publisher.Client, r monitors.TaskRunner, name, typ string) scheduler.TaskFunc {
func (m *monitorTask) prepareSchedulerJob(client publisher.Client, r monitors.TaskRunner) scheduler.TaskFunc {
	name := m.config.Name
	if name == "" {
		name = m.config.Type
	}
return func() []scheduler.TaskFunc {
event, next, err := r.Run()
event, next, err := m.job.Run()
if err != nil {
logp.Err("Job %v failed with: ", err)
}
Expand All @@ -249,13 +249,15 @@ func createJobTask(client publisher.Client, r monitors.TaskRunner, name, typ str
event.DeepUpdate(common.MapStr{
"monitor": common.MapStr{
"name": name,
"type": typ,
"type": m.config.Type,
},
})

if _, exists := event["type"]; !exists {
event["type"] = defaultEventType
}
common.MergeFields(event, m.config.EventMetadata.Fields, m.config.EventMetadata.FieldsUnderRoot)
common.AddTags(event, m.config.EventMetadata.Tags)
client.PublishEvent(event)
}

Expand All @@ -265,7 +267,7 @@ func createJobTask(client publisher.Client, r monitors.TaskRunner, name, typ str

cont := make([]scheduler.TaskFunc, len(next))
for i, n := range next {
cont[i] = createJobTask(client, n, name, typ)
cont[i] = m.prepareSchedulerJob(client, n)
}
return cont
}
Expand Down
20 changes: 18 additions & 2 deletions heartbeat/heartbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ heartbeat.monitors:
# Waiting duration until another ICMP Echo Request is emitted.
wait: 1s

# The tags of the monitors are included in their own field with each
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be added to heartbeat/_meta/beat.full.yml and then you have to run make update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'm reverting those modifications.

# transaction published. Tags make it easy to group servers by different
# logical properties.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# monitor output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# If this option is set to true, the custom fields are stored as top-level
# fields in the output document instead of being grouped under a fields
# sub-dictionary. Default is false.
#fields_under_root: false

- type: tcp # monitor type `tcp`. Connect via TCP and optionally verify endpoint
# by sending/receiving a custom payload

Expand All @@ -64,8 +80,8 @@ heartbeat.monitors:
# Requires ports configs to be checked. If ssl is configured,
# a SSL/TLS based connection will be established. Otherwise plain tcp connection
# will be established
# - hostname + port like `localhost:12345`:
# Connect to port on given host. If ssl is configured,
# - hostname + port like `localhost:12345`:
# Connect to port on given host. If ssl is configured,
# a SSL/TLS based connection will be established. Otherwise plain tcp connection
# will be established
# - full url syntax. `scheme://<host>:[port]`. The `<scheme>` can be one of
Expand Down
10 changes: 7 additions & 3 deletions heartbeat/tests/system/config/heartbeat.yml.j2
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
heartbeat.monitors:
- type: icmp
hosts: ["localhost"]
schedule: '@every 10s'
- type: http
urls: ["http://localhost:9200"]
schedule: '@every 1s'
tags: ["http_monitor_tags"]
fields_under_root: true
fields:
hello: world

output.file:
path: {{ output_file_path|default(beat.working_dir + "/output") }}
Expand Down
17 changes: 17 additions & 0 deletions heartbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,20 @@ def test_base(self):
heartbeat_proc = self.start_beat()
self.wait_until(lambda: self.log_contains("heartbeat is running"))
heartbeat_proc.check_kill_and_wait()

def test_monitor_config(self):
    """
    Checks that `tags`, `fields`, and `fields_under_root` configured on a
    monitor show up in the published events.
    """
    config_path = os.path.abspath(self.working_dir) + "/*"
    self.render_config_template(path=config_path)

    proc = self.start_beat()
    self.wait_until(lambda: self.output_lines() > 0)
    proc.check_kill_and_wait()

    # With fields_under_root enabled, custom fields land at the top level
    # and no "fields" sub-dictionary is created.
    event = self.read_output()[0]
    assert event["hello"] == "world"
    assert event["tags"] == ["http_monitor_tags"]
    assert "fields" not in event