diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c06066ef3aa..faebba02aaa 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -175,6 +175,7 @@ field. You can revert this change by configuring tags for the module and omittin - Fix seccomp policy for calls to `chmod` and `chown`. {pull}20054[20054] - Remove unnecessary restarts of metricsets while using Node autodiscover {pull}19974[19974] - Output errors when Kibana index pattern setup fails. {pull}20121[20121] +- Fix issue in autodiscover that kept inputs stopped after config updates. {pull}20305[20305] *Auditbeat* diff --git a/filebeat/input/errors.go b/filebeat/input/errors.go new file mode 100644 index 00000000000..098156abf91 --- /dev/null +++ b/filebeat/input/errors.go @@ -0,0 +1,32 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input + +import ( + "fmt" +) + +// ErrInputNotFinished struct for reporting errors related to not finished inputs +type ErrInputNotFinished struct { + State string +} + +// Error method of ErrInputNotFinished +func (e *ErrInputNotFinished) Error() string { + return fmt.Sprintf("Can only start an input when all related states are finished: %+v", e.State) +} diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index 18fcfee583b..560daf8a7bc 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -18,6 +18,7 @@ package file import ( + "fmt" "os" "time" @@ -66,3 +67,19 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin func (s *State) IsEqual(c *State) bool { return s.Id == c.Id } + +// String returns string representation of the struct +func (s *State) String() string { + return fmt.Sprintf( + "{Id: %v, Finished: %v, Fileinfo: %v, Source: %v, Offset: %v, Timestamp: %v, TTL: %v, Type: %v, Meta: %v, FileStateOS: %v}", + s.Id, + s.Finished, + s.Fileinfo, + s.Source, + s.Offset, + s.Timestamp, + s.TTL, + s.Type, + s.Meta, + s.FileStateOS) +} diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index d7bd4f18017..a1837bbd471 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -175,7 +175,7 @@ func (p *Input) loadStates(states []file.State) error { // In case a input is tried to be started with an unfinished state matching the glob pattern if !state.Finished { - return fmt.Errorf("Can only start an input when all related states are finished: %+v", state) + return &input.ErrInputNotFinished{State: state.String()} } // Convert state to current identifier if different diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index 179057c4373..afd67d4e08a 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -59,5 +59,9 @@ func (r *RunnerFactory) Create( func (r *RunnerFactory) CheckConfig(cfg *common.Config) error { _, err := r.Create(pipeline.NewNilPipeline(), cfg) + if _, ok := err.(*ErrInputNotFinished); ok { + // error is related to state, and hence config can be considered valid + return nil + } return err } diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 39df134b809..033146a84d4 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -160,6 +160,7 @@ func (p *pod) OnUpdate(obj interface{}) { } } time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) + return } p.logger.Debugf("Watcher Pod update: %+v", obj)