Skip to content

Commit

Permalink
feat(loki): extended tailing (#764)
Browse files Browse the repository at this point in the history
* refactor(querier/ingester): TailRequest Lookback window

Moves the specifications of the Lookback Window out of the
logproto.QueryRequest into it's own type logproto.Lookback.
This is required, because the Lookback Window will be used in the TailRequest
as well.

* feat(querier): parse Lookback from HTTP Request

* feat(logcli): send Lookback Window spec with tail request

* feat(querier): include historic entries in tail mode

Extends tailing by sending a configurable amount of historic entries with
before the live entries. This enables a behaviour that is closer to kubectl logs
-f and docker logs -f.

It is implemented by running a regular Query before subscribing to the ingesters.

* fix: adapt tests to Lookback change

* feat(querier): check all errors to make the linter happy

* fix(logproto): flatten Lookback window spec

Flattens the Lookback window spec into the individual queries

* fix(ingester): adapt test to Lookback flatten
  • Loading branch information
sh0rez authored Jul 19, 2019
1 parent da6a133 commit 38cb093
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 119 deletions.
20 changes: 16 additions & 4 deletions cmd/logcli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ const (
queryPath = "/api/prom/query?query=%s&limit=%d&start=%d&end=%d&direction=%s&regexp=%s"
labelsPath = "/api/prom/label"
labelValuesPath = "/api/prom/label/%s/values"
tailPath = "/api/prom/tail?query=%s&regexp=%s&delay_for=%d"
tailPath = "/api/prom/tail?query=%s&regexp=%s&delay_for=%d&limit=%d&start=%d"
)

func query(from, through time.Time, direction logproto.Direction) (*logproto.QueryResponse, error) {
path := fmt.Sprintf(queryPath, url.QueryEscape(*queryStr), *limit, from.UnixNano(),
through.UnixNano(), direction.String(), url.QueryEscape(*regexpStr))
path := fmt.Sprintf(queryPath,
url.QueryEscape(*queryStr), // query
*limit, // limit
from.UnixNano(), // start
through.UnixNano(), // end
direction.String(), // direction
url.QueryEscape(*regexpStr), // regexp
)

var resp logproto.QueryResponse
if err := doRequest(path, &resp); err != nil {
Expand Down Expand Up @@ -105,7 +111,13 @@ func doRequest(path string, out interface{}) error {
}

func liveTailQueryConn() (*websocket.Conn, error) {
path := fmt.Sprintf(tailPath, url.QueryEscape(*queryStr), url.QueryEscape(*regexpStr), *delayFor)
path := fmt.Sprintf(tailPath,
url.QueryEscape(*queryStr), // query
url.QueryEscape(*regexpStr), // regexp
*delayFor, // delay_for
*limit, // limit
getStart(time.Now()).UnixNano(), // start
)
return wsConnect(path)
}

Expand Down
21 changes: 13 additions & 8 deletions cmd/logcli/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ import (
"github.com/grafana/loki/pkg/logproto"
)

func getStart(end time.Time) time.Time {
start := end.Add(-*since)
if *from != "" {
var err error
start, err = time.Parse(time.RFC3339Nano, *from)
if err != nil {
log.Fatalf("error parsing date '%s': %s", *from, err)
}
}
return start
}

func doQuery() {
if *tail {
tailQuery()
Expand All @@ -24,14 +36,7 @@ func doQuery() {
)

end := time.Now()
start := end.Add(-*since)
if *from != "" {
var err error
start, err = time.Parse(time.RFC3339Nano, *from)
if err != nil {
log.Fatalf("error parsing --from date '%s': %s", *from, err)
}
}
start := getStart(end)

if *to != "" {
var err error
Expand Down
Loading

0 comments on commit 38cb093

Please sign in to comment.