Skip to content

Commit

Permalink
--dry-run Promtail. (#1652)
Browse files Browse the repository at this point in the history
* --dry-run Promtail.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Remove changed on gomod

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes flaky test

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Ensure position file is readonly when doing dry-run

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add information about dry run mode in the troubleshooting section.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add more spaces.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes journaltarget refactor.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes journaltarget refactor.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes journaltarget refactor.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Feb 13, 2020
1 parent 636ed15 commit 4210fce
Show file tree
Hide file tree
Showing 15 changed files with 245 additions and 49 deletions.
3 changes: 2 additions & 1 deletion cmd/promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func init() {

func main() {
printVersion := flag.Bool("version", false, "Print this builds version information")
dryRun := flag.Bool("dry-run", false, "Start Promtail but print entries instead of sending them to Loki.")

// Load config, merging config file and CLI flags
var config config.Config
Expand Down Expand Up @@ -57,7 +58,7 @@ func main() {
stages.Debug = true
}

p, err := promtail.New(config)
p, err := promtail.New(config, *dryRun)
if err != nil {
level.Error(util.Logger).Log("msg", "error creating promtail", "error", err)
os.Exit(1)
Expand Down
13 changes: 13 additions & 0 deletions docs/clients/promtail/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@
This document describes known failure modes of `promtail` on edge cases and the
adopted trade-offs.

## Dry running

Promtail can be configured to print log stream entries instead of sending them to Loki.
This can be used in combination with [piping data](#pipe-data-to-promtail) to debug or troubleshoot promtail log parsing.

In dry run mode, Promtail still support reading from a [positions](configuration.md#position_config) file however no update will be made to the targeted file, this is to ensure you can easily retry the same set of lines.

To start Promtail in dry run mode use the flag `--dry-run` as shown in the example below:

```bash
cat my.log | promtail --dry-run --client.url http://127.0.0.1:3100/loki/api/v1/push
```

## Pipe data to Promtail

Promtail supports piping data for sending logs to Loki. This is a very useful way to troubleshooting your configuration.
Expand Down
56 changes: 56 additions & 0 deletions pkg/promtail/client/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package client

import (
"fmt"
"os"
"sync"
"text/tabwriter"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/fatih/color"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
)

type logger struct {
*tabwriter.Writer
sync.Mutex
}

// NewLogger creates a new client logger that logs entries instead of sending them.
func NewLogger(cfgs ...Config) (Client, error) {
// make sure the clients config is valid
c, err := NewMulti(util.Logger, cfgs...)
if err != nil {
return nil, err
}
c.Stop()
fmt.Println(color.YellowString("Clients configured:"))
for _, cfg := range cfgs {
yaml, err := yaml.Marshal(cfg)
if err != nil {
return nil, err
}
fmt.Println("----------------------")
fmt.Println(string(yaml))
}
return &logger{
Writer: tabwriter.NewWriter(os.Stdout, 0, 8, 0, '\t', 0),
}, nil
}

func (*logger) Stop() {}

func (l *logger) Handle(labels model.LabelSet, time time.Time, entry string) error {
l.Lock()
defer l.Unlock()
fmt.Fprint(l.Writer, color.BlueString(time.Format("2006-01-02T15:04:05")))
fmt.Fprint(l.Writer, "\t")
fmt.Fprint(l.Writer, color.YellowString(labels.String()))
fmt.Fprint(l.Writer, "\t")
fmt.Fprint(l.Writer, entry)
fmt.Fprint(l.Writer, "\n")
l.Flush()
return nil
}
21 changes: 21 additions & 0 deletions pkg/promtail/client/logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package client

import (
"net/url"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

func TestNewLogger(t *testing.T) {
_, err := NewLogger([]Config{}...)
require.Error(t, err)

l, err := NewLogger([]Config{{URL: flagext.URLValue{URL: &url.URL{Host: "string"}}}}...)
require.NoError(t, err)
err = l.Handle(model.LabelSet{"foo": "bar"}, time.Now(), "entry")
require.NoError(t, err)
}
72 changes: 43 additions & 29 deletions pkg/promtail/positions/positions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
SyncPeriod time.Duration `yaml:"sync_period"`
PositionsFile string `yaml:"filename"`
IgnoreInvalidYaml bool `yaml:"ignore_invalid_yaml"`
ReadOnly bool `yaml:"-"`
}

// RegisterFlags register flags.
Expand All @@ -33,7 +34,7 @@ func (cfg *Config) RegisterFlags(flags *flag.FlagSet) {
}

// Positions tracks how far through each file we've read.
type Positions struct {
type positions struct {
logger log.Logger
cfg Config
mtx sync.Mutex
Expand All @@ -47,17 +48,40 @@ type File struct {
Positions map[string]string `yaml:"positions"`
}

type Positions interface {
// GetString returns how far we've through a file as a string.
// JournalTarget writes a journal cursor to the positions file, while
// FileTarget writes an integer offset. Use Get to read the integer
// offset.
GetString(path string) string
// Get returns how far we've read through a file. Returns an error
// if the value stored for the file is not an integer.
Get(path string) (int64, error)
// PutString records (asynchronsouly) how far we've read through a file.
// Unlike Put, it records a string offset and is only useful for
// JournalTargets which doesn't have integer offsets.
PutString(path string, pos string)
// Put records (asynchronously) how far we've read through a file.
Put(path string, pos int64)
// Remove removes the position tracking for a filepath
Remove(path string)
// SyncPeriod returns how often the positions file gets resynced
SyncPeriod() time.Duration
// Stop the Position tracker.
Stop()
}

// New makes a new Positions.
func New(logger log.Logger, cfg Config) (*Positions, error) {
positions, err := readPositionsFile(cfg, logger)
func New(logger log.Logger, cfg Config) (Positions, error) {
positionData, err := readPositionsFile(cfg, logger)
if err != nil {
return nil, err
}

p := &Positions{
p := &positions{
logger: logger,
cfg: cfg,
positions: positions,
positions: positionData,
quit: make(chan struct{}),
done: make(chan struct{}),
}
Expand All @@ -66,39 +90,28 @@ func New(logger log.Logger, cfg Config) (*Positions, error) {
return p, nil
}

// Stop the Position tracker.
func (p *Positions) Stop() {
func (p *positions) Stop() {
close(p.quit)
<-p.done
}

// PutString records (asynchronsouly) how far we've read through a file.
// Unlike Put, it records a string offset and is only useful for
// JournalTargets which doesn't have integer offsets.
func (p *Positions) PutString(path string, pos string) {
func (p *positions) PutString(path string, pos string) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.positions[path] = pos
}

// Put records (asynchronously) how far we've read through a file.
func (p *Positions) Put(path string, pos int64) {
func (p *positions) Put(path string, pos int64) {
p.PutString(path, strconv.FormatInt(pos, 10))
}

// GetString returns how far we've through a file as a string.
// JournalTarget writes a journal cursor to the positions file, while
// FileTarget writes an integer offset. Use Get to read the integer
// offset.
func (p *Positions) GetString(path string) string {
func (p *positions) GetString(path string) string {
p.mtx.Lock()
defer p.mtx.Unlock()
return p.positions[path]
}

// Get returns how far we've read through a file. Returns an error
// if the value stored for the file is not an integer.
func (p *Positions) Get(path string) (int64, error) {
func (p *positions) Get(path string) (int64, error) {
p.mtx.Lock()
defer p.mtx.Unlock()
pos, ok := p.positions[path]
Expand All @@ -108,23 +121,21 @@ func (p *Positions) Get(path string) (int64, error) {
return strconv.ParseInt(pos, 10, 64)
}

// Remove removes the position tracking for a filepath
func (p *Positions) Remove(path string) {
func (p *positions) Remove(path string) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.remove(path)
}

func (p *Positions) remove(path string) {
func (p *positions) remove(path string) {
delete(p.positions, path)
}

// SyncPeriod returns how often the positions file gets resynced
func (p *Positions) SyncPeriod() time.Duration {
func (p *positions) SyncPeriod() time.Duration {
return p.cfg.SyncPeriod
}

func (p *Positions) run() {
func (p *positions) run() {
defer func() {
p.save()
level.Debug(p.logger).Log("msg", "positions saved")
Expand All @@ -143,7 +154,10 @@ func (p *Positions) run() {
}
}

func (p *Positions) save() {
func (p *positions) save() {
if p.cfg.ReadOnly {
return
}
p.mtx.Lock()
positions := make(map[string]string, len(p.positions))
for k, v := range p.positions {
Expand All @@ -156,7 +170,7 @@ func (p *Positions) save() {
}
}

func (p *Positions) cleanup() {
func (p *positions) cleanup() {
p.mtx.Lock()
defer p.mtx.Unlock()
toRemove := []string{}
Expand Down
44 changes: 44 additions & 0 deletions pkg/promtail/positions/positions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -136,3 +138,45 @@ func TestReadPositionsFromBadYamlIgnoreCorruption(t *testing.T) {
require.NoError(t, err)
require.Equal(t, map[string]string{}, out)
}

func Test_ReadOnly(t *testing.T) {
temp := tempFilename(t)
defer func() {
_ = os.Remove(temp)
}()
yaml := []byte(`positions:
/tmp/random.log: "17623"
`)
err := ioutil.WriteFile(temp, yaml, 0644)
if err != nil {
t.Fatal(err)
}
p, err := New(util.Logger, Config{
SyncPeriod: 20 * time.Nanosecond,
PositionsFile: temp,
ReadOnly: true,
})
if err != nil {
t.Fatal(err)
}
defer p.Stop()
p.Put("/foo/bar/f", 12132132)
p.PutString("/foo/f", "100")
pos, err := p.Get("/tmp/random.log")
if err != nil {
t.Fatal(err)
}
require.Equal(t, int64(17623), pos)
p.(*positions).save()
out, err := readPositionsFile(Config{
PositionsFile: temp,
IgnoreInvalidYaml: true,
ReadOnly: true,
}, log.NewNopLogger())

require.NoError(t, err)
require.Equal(t, map[string]string{
"/tmp/random.log": "17623",
}, out)

}
22 changes: 16 additions & 6 deletions pkg/promtail/promtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,33 @@ type Promtail struct {
}

// New makes a new Promtail.
func New(cfg config.Config) (*Promtail, error) {
func New(cfg config.Config, dryRun bool) (*Promtail, error) {

if cfg.ClientConfig.URL.URL != nil {
// if a single client config is used we add it to the multiple client config for backward compatibility
cfg.ClientConfigs = append(cfg.ClientConfigs, cfg.ClientConfig)
}

client, err := client.NewMulti(util.Logger, cfg.ClientConfigs...)
if err != nil {
return nil, err
var err error
var cl client.Client
if dryRun {
cl, err = client.NewLogger(cfg.ClientConfigs...)
if err != nil {
return nil, err
}
cfg.PositionsConfig.ReadOnly = true
} else {
cl, err = client.NewMulti(util.Logger, cfg.ClientConfigs...)
if err != nil {
return nil, err
}
}

promtail := &Promtail{
client: client,
client: cl,
}

tms, err := targets.NewTargetManagers(promtail, util.Logger, cfg.PositionsConfig, client, cfg.ScrapeConfig, &cfg.TargetConfig)
tms, err := targets.NewTargetManagers(promtail, util.Logger, cfg.PositionsConfig, cl, cfg.ScrapeConfig, &cfg.TargetConfig)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 4210fce

Please sign in to comment.