diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 5e74eb7965571..bee57a1a232fb 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -23,6 +23,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" _ "github.com/influxdata/telegraf/plugins/outputs/riemann" _ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy" + _ "github.com/influxdata/telegraf/plugins/outputs/rotatingfile" _ "github.com/influxdata/telegraf/plugins/outputs/socket_writer" _ "github.com/influxdata/telegraf/plugins/outputs/wavefront" ) diff --git a/plugins/outputs/rotatingfile/README.md b/plugins/outputs/rotatingfile/README.md new file mode 100644 index 0000000000000..4b08b56563216 --- /dev/null +++ b/plugins/outputs/rotatingfile/README.md @@ -0,0 +1,17 @@ +# rotatingfile Output Plugin +This plugin works exactly the same as the file output plugin, but the file is rotated. This practical if you for example use something that grabs those files and moves them across a network boundary or similar. + +# Configuration +``` + [[outputs.rotating_file]] + ## Files to write to, "stdout" is a specially handled file. + root = "/tmp" + filename_prefix = "metrics" + max_age = "1m" + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.m + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +``` diff --git a/plugins/outputs/rotatingfile/rotate.go b/plugins/outputs/rotatingfile/rotate.go new file mode 100644 index 0000000000000..9fae40d00ae41 --- /dev/null +++ b/plugins/outputs/rotatingfile/rotate.go @@ -0,0 +1,123 @@ +package rotatingfile + +// Rotating things +import ( + "errors" + "fmt" + "os" + "path" + "sync" + "time" +) + +// RootPerm defines the permissions that Writer will use if it +// needs to create the root directory. +var RootPerm = os.FileMode(0755) + +// FilePerm defines the permissions that Writer will use for all +// the files it creates. +var FilePerm = os.FileMode(0644) + +// Writer implements the io.Writer interface and writes to the +// "current" file in the root directory. When current file age +// exceeds max, it is renamed and a new file is created. +type Writer struct { + root string + prefix string + current *os.File + expireTime time.Time + max time.Duration + sync.Mutex +} + +// New creates a new Writer. The files will be created in the +// root directory. root will be created if necessary. The +// filenames will start with prefix. +func NewRotatingWriter(root, prefix string, maxAgeInput string) (*Writer, error) { + maxAge, err := time.ParseDuration(maxAgeInput) + if err != nil { + return nil, err + } + l := &Writer{root: root, prefix: prefix, max: maxAge} + if err := l.setup(); err != nil { + return nil, err + } + return l, nil +} + +// Write writes p to the current file, then checks to see if +// rotation is necessary. +func (r *Writer) Write(p []byte) (n int, err error) { + r.Lock() + defer r.Unlock() + n, err = r.current.Write(p) + if err != nil { + return n, err + } + if time.Now().After(r.expireTime) { + if err := r.rotate(); err != nil { + return n, err + } + } + return n, nil +} + +// Close closes the current file. Writer is unusable after this +// is called. +func (r *Writer) Close() error { + r.Lock() + defer r.Unlock() + + // Rotate before closing + if err := r.rotate(); err != nil { + return err + } + + if err := r.current.Close(); err != nil { + return err + } + r.current = nil + return nil +} + +// setup creates the root directory if necessary, then opens the +// current file. +func (r *Writer) setup() error { + fi, err := os.Stat(r.root) + if err != nil && os.IsNotExist(err) { + err := os.MkdirAll(r.root, RootPerm) + if err != nil { + return err + } + } else if err != nil { + return err + } else if !fi.IsDir() { + return errors.New("root must be a directory") + } + + // root exists, and it is a directory + + return r.openCurrent() +} + +func (r *Writer) openCurrent() error { + cp := path.Join(r.root, fmt.Sprintf("%s-current", r.prefix)) // It should be safe to use Sprintf here since path.Join() uses path.Clean() on the path afterwards + var err error + r.current, err = os.OpenFile(cp, os.O_RDWR|os.O_CREATE|os.O_APPEND, FilePerm) + r.expireTime = time.Now().Add(r.max) + if err != nil { + return err + } + return nil +} + +func (r *Writer) rotate() error { + if err := r.current.Close(); err != nil { + return err + } + filename := fmt.Sprintf("%s-%d", r.prefix, time.Now().UnixNano()) // UnixNano should be unique enough for this (up until a point) + if err := os.Rename(path.Join(r.root, fmt.Sprintf("%s-current", r.prefix)), path.Join(r.root, filename)); err != nil { + return err + } + return r.openCurrent() +} diff --git a/plugins/outputs/rotatingfile/rotatingfile.go b/plugins/outputs/rotatingfile/rotatingfile.go new file mode 100644 index 0000000000000..9336add048b4c --- /dev/null +++ b/plugins/outputs/rotatingfile/rotatingfile.go @@ -0,0 +1,85 @@ +package rotatingfile + +import ( + "errors" + "fmt" + "io" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" +) + +type File struct { + Root string + FilenamePrefix string + MaxAge string + + writer io.WriteCloser + serializer serializers.Serializer +} + +var sampleConfig = ` + ## Path to write files into. + root = "/tmp" + filename_prefix = "metrics" + max_age = "1m" + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +` + +func (f *File) SetSerializer(serializer serializers.Serializer) { + f.serializer = serializer +} + +func (f *File) Connect() error { + if len(f.Root) == 0 { + return errors.New("we need a root path") + } + + var err error + f.writer, err = NewRotatingWriter(f.Root, f.FilenamePrefix, f.MaxAge) + if err != nil { + return err + } + return nil +} + +func (f *File) Close() error { + return f.writer.Close() +} + +func (f *File) SampleConfig() string { + return sampleConfig +} + +func (f *File) Description() string { + return "Send telegraf metrics to a rotating file" +} +func (f *File) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + for _, metric := range metrics { + b, err := f.serializer.Serialize(metric) + if err != nil { + return fmt.Errorf("failed to serialize message: %s", err) + } + _, err = f.writer.Write(b) + if err != nil { + return fmt.Errorf("failed to write message: %s, %s", b, err) + } + } + return nil +} + +func init() { + outputs.Add("rotating_file", func() telegraf.Output { + return &File{} + }) +}