Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update /loki/api/v1/push to use the v1 json format #1145

Merged
merged 6 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,12 @@ JSON post body can be sent in the following format:
{
"streams": [
{
"labels": "<LogQL label key-value pairs>",
"entries": [
{
"ts": "<RFC3339Nano string>",
"line": "<log line>"
}
"stream": {
"label": "value"
},
"values": [
[ "<unix epoch in nanoseconds>", "<log line>" ],
[ "<unix epoch in nanoseconds>", "<log line>" ]
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
]
}
]
Expand All @@ -482,8 +482,8 @@ In microservices mode, `/loki/api/v1/push` is exposed by the distributor.
### Examples

```bash
$ curl -H "Content-Type: application/json" -XPOST -s "https://localhost:3100/loki/api/v1/push" --data-raw \
'{"streams": [{ "labels": "{foo=\"bar\"}", "entries": [{ "ts": "2018-12-18T08:28:06.801064-04:00", "line": "fizzbuzz" }] }]}'
$ curl -v -H "Content-Type: application/json" -XPOST -s "http://localhost:3100/loki/api/v1/push" --data-raw \
'{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}'
```

## `GET /api/prom/tail`
Expand Down
2 changes: 1 addition & 1 deletion fluentd/fluent-plugin-grafana-loki/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ services:
## Configuration

### url
The url of the Loki server to send logs to. When sending data the publish path (`/loki/api/v1/push`) will automatically be appended.
The url of the Loki server to send logs to. When sending data the publish path (`/api/prom/push`) will automatically be appended.
By default the url is set to `https://logs-us-west1.grafana.net`, the url of the Grafana Labs preview (hosted Loki)[https://grafana.com/loki] service.

#### Proxy Support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ $LOAD_PATH.push File.expand_path('lib', __dir__)

Gem::Specification.new do |spec|
spec.name = 'fluent-plugin-grafana-loki'
spec.version = '1.1.0'
spec.version = '1.0.2'
spec.authors = %w[woodsaj briangann]
spec.email = ['awoods@grafana.com', 'brian@grafana.com']

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def write(chunk)
body = { 'streams': payload }

# add ingest path to loki url
uri = URI.parse(url + '/loki/api/v1/push')
uri = URI.parse(url + '/api/prom/push')

req = Net::HTTP::Post.new(
uri.request_uri
Expand Down
14 changes: 12 additions & 2 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package distributor

import (
"encoding/json"
"net/http"

"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/util"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/unmarshal"
unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy"
)

var contentType = http.CanonicalHeaderKey("Content-Type")
Expand All @@ -21,7 +23,15 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {

switch r.Header.Get(contentType) {
case applicationJSON:
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
var err error

if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
err = unmarshal.DecodePushRequest(r.Body, &req)
} else {
err = unmarshal_legacy.DecodePushRequest(r.Body, &req)
}

if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type QueryResponse struct {
Data QueryResponseData `json:"data"`
}

// PushRequest models a log stream push
type PushRequest struct {
Streams []*Stream `json:"streams"`
}

// ResultType holds the type of the result
type ResultType string

Expand Down
13 changes: 13 additions & 0 deletions pkg/logql/unmarshal/legacy/unmarshal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package unmarshal

import (
"encoding/json"
"io"

"github.com/grafana/loki/pkg/logproto"
)

// DecodePushRequest directly decodes json to a logproto.PushRequest
func DecodePushRequest(b io.ReadCloser, r *logproto.PushRequest) error {
return json.NewDecoder(b).Decode(r)
}
67 changes: 67 additions & 0 deletions pkg/logql/unmarshal/legacy/unmarshal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package unmarshal

import (
"io/ioutil"
"log"
"strings"
"testing"
"time"

"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)

// covers requests to /api/prom/push
var pushTests = []struct {
expected []*logproto.Stream
actual string
}{
{
[]*logproto.Stream{
&logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: mustParse(time.RFC3339Nano, "2019-09-13T18:32:22.380001319Z"),
Line: "super line",
},
},
Labels: `{test="test"}`,
},
},
`{
"streams":[
{
"labels":"{test=\"test\"}",
"entries":[
{
"ts": "2019-09-13T18:32:22.380001319Z",
"line": "super line"
}
]
}
]
}`,
},
}

func Test_DecodePushRequest(t *testing.T) {

for i, pushTest := range pushTests {
var actual logproto.PushRequest
closer := ioutil.NopCloser(strings.NewReader(pushTest.actual))

err := DecodePushRequest(closer, &actual)
require.NoError(t, err)

require.Equalf(t, pushTest.expected, actual.Streams, "Push Test %d failed", i)
}
}

func mustParse(l string, t string) time.Time {
ret, err := time.Parse(l, t)
if err != nil {
log.Fatalf("Failed to parse %s", t)
}

return ret
}
60 changes: 60 additions & 0 deletions pkg/logql/unmarshal/unmarshal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package unmarshal

import (
"encoding/json"
"io"

"github.com/grafana/loki/pkg/loghttp"

"github.com/grafana/loki/pkg/logproto"
)

// DecodePushRequest directly decodes json to a logproto.PushRequest
func DecodePushRequest(b io.ReadCloser, r *logproto.PushRequest) error {
var request loghttp.PushRequest

err := json.NewDecoder(b).Decode(&request)

if err != nil {
return err
}

*r = NewPushRequest(request)

return nil
}

// NewPushRequest constructs a logproto.PushRequest from a PushRequest
func NewPushRequest(r loghttp.PushRequest) logproto.PushRequest {
ret := logproto.PushRequest{
Streams: make([]*logproto.Stream, len(r.Streams)),
}

for i, s := range r.Streams {
ret.Streams[i] = NewStream(s)
}

return ret
}

// NewStream constructs a logproto.Stream from a Stream
func NewStream(s *loghttp.Stream) *logproto.Stream {
ret := &logproto.Stream{
Entries: make([]logproto.Entry, len(s.Entries)),
Labels: s.Labels.String(),
}

for i, e := range s.Entries {
ret.Entries[i] = NewEntry(e)
}

return ret
}

// NewEntry constructs a logproto.Entry from a Entry
func NewEntry(e loghttp.Entry) logproto.Entry {
return logproto.Entry{
Timestamp: e.Timestamp,
Line: e.Line,
}
}
56 changes: 56 additions & 0 deletions pkg/logql/unmarshal/unmarshal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package unmarshal

import (
"io/ioutil"
"strings"
"testing"
"time"

"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)

// covers requests to /loki/api/v1/push
var pushTests = []struct {
expected []*logproto.Stream
actual string
}{
{
[]*logproto.Stream{
&logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 123456789012345),
Line: "super line",
},
},
Labels: `{test="test"}`,
},
},
`{
"streams": [
{
"stream": {
"test": "test"
},
"values":[
[ "123456789012345", "super line" ]
]
}
]
}`,
},
}

func Test_DecodePushRequest(t *testing.T) {

for i, pushTest := range pushTests {
var actual logproto.PushRequest
closer := ioutil.NopCloser(strings.NewReader(pushTest.actual))

err := DecodePushRequest(closer, &actual)
require.NoError(t, err)

require.Equalf(t, pushTest.expected, actual.Streams, "Push Test %d failed", i)
}
}