diff --git a/docs/api.md b/docs/api.md index 95753fe8e3d10..4c90d7a394760 100644 --- a/docs/api.md +++ b/docs/api.md @@ -14,6 +14,7 @@ The HTTP API includes the following endpoints: - [`POST /loki/api/v1/push`](#post-lokiapiv1push) - [`GET /api/prom/tail`](#get-apipromtail) - [`GET /api/prom/query`](#get-apipromquery) +- [`POST /api/prom/push`](#post-apiprompush) - [`GET /ready`](#get-ready) - [`POST /flush`](#post-flush) - [`GET /metrics`](#get-metrics) @@ -445,8 +446,6 @@ Response (streamed): ## `POST /loki/api/v1/push` -Alias (DEPRECATED): `POST /api/prom/push` - `/loki/api/v1/push` is the endpoint used to send log entries to Loki. The default behavior is for the POST body to be a snappy-compressed protobuf messsage: @@ -460,12 +459,12 @@ JSON post body can be sent in the following format: { "streams": [ { - "labels": "", - "entries": [ - { - "ts": "", - "line": "" - } + "stream": { + "label": "value" + }, + "values": [ + [ "", "" ], + [ "", "" ] ] } ] @@ -482,8 +481,8 @@ In microservices mode, `/loki/api/v1/push` is exposed by the distributor. ### Examples ```bash -$ curl -H "Content-Type: application/json" -XPOST -s "https://localhost:3100/loki/api/v1/push" --data-raw \ - '{"streams": [{ "labels": "{foo=\"bar\"}", "entries": [{ "ts": "2018-12-18T08:28:06.801064-04:00", "line": "fizzbuzz" }] }]}' +$ curl -v -H "Content-Type: application/json" -XPOST -s "http://localhost:3100/loki/api/v1/push" --data-raw \ + '{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}' ``` ## `GET /api/prom/tail` @@ -535,8 +534,6 @@ and `Labels` instead of `labels` and `ts` like in the entries for the stream. As the response is streamed, the object defined by the response format above will be sent over the WebSocket multiple times. - - ## `GET /api/prom/query` > **WARNING**: `/api/prom/query` is DEPRECATED; use `/loki/api/v1/query_range` @@ -607,6 +604,50 @@ $ curl -H "Content-Type: application/json" -XPOST -s "https://localhost:3100/lok '{"streams": [{ "labels": "{foo=\"bar\"}", "entries": [{ "ts": "2018-12-18T08:28:06.801064-04:00", "line": "fizzbuzz" }] }]}' ``` +## `POST /api/prom/push` + +> **WARNING**: `/api/prom/push` is DEPRECATED; use `/loki/api/v1/push` +> instead. + +`/api/prom/push` is the endpoint used to send log entries to Loki. The default +behavior is for the POST body to be a snappy-compressed protobuf messsage: + +- [Protobuf definition](/pkg/logproto/logproto.proto) +- [Go client library](/pkg/promtail/client/client.go) + +Alternatively, if the `Content-Type` header is set to `application/json`, a +JSON post body can be sent in the following format: + +``` +{ + "streams": [ + { + "labels": "", + "entries": [ + { + "ts": "", + "line": "" + } + ] + } + ] +} +``` + +> **NOTE**: logs sent to Loki for every stream must be in timestamp-ascending +> order, meaning each log line must be more recent than the one last received. +> If logs do not follow this order, Loki will reject the log with an out of +> order error. + +In microservices mode, `/api/prom/push` is exposed by the distributor. + +### Examples + +```bash +$ curl -H "Content-Type: application/json" -XPOST -s "https://localhost:3100/api/prom/push" --data-raw \ + '{"streams": [{ "labels": "{foo=\"bar\"}", "entries": [{ "ts": "2018-12-18T08:28:06.801064-04:00", "line": "fizzbuzz" }] }]}' +``` + ## `GET /ready` `/ready` returns HTTP 200 when the Loki ingester is ready to accept traffic. If diff --git a/fluentd/fluent-plugin-grafana-loki/README.md b/fluentd/fluent-plugin-grafana-loki/README.md index 140d14bf654cd..5c2902872cf58 100644 --- a/fluentd/fluent-plugin-grafana-loki/README.md +++ b/fluentd/fluent-plugin-grafana-loki/README.md @@ -92,7 +92,7 @@ services: ## Configuration ### url -The url of the Loki server to send logs to. When sending data the publish path (`/loki/api/v1/push`) will automatically be appended. +The url of the Loki server to send logs to. When sending data the publish path (`/api/prom/push`) will automatically be appended. By default the url is set to `https://logs-us-west1.grafana.net`, the url of the Grafana Labs preview (hosted Loki)[https://grafana.com/loki] service. #### Proxy Support diff --git a/fluentd/fluent-plugin-grafana-loki/fluent-plugin-grafana-loki.gemspec b/fluentd/fluent-plugin-grafana-loki/fluent-plugin-grafana-loki.gemspec index b3b7a1caef82a..296387fdc3991 100644 --- a/fluentd/fluent-plugin-grafana-loki/fluent-plugin-grafana-loki.gemspec +++ b/fluentd/fluent-plugin-grafana-loki/fluent-plugin-grafana-loki.gemspec @@ -4,7 +4,7 @@ $LOAD_PATH.push File.expand_path('lib', __dir__) Gem::Specification.new do |spec| spec.name = 'fluent-plugin-grafana-loki' - spec.version = '1.1.0' + spec.version = '1.0.2' spec.authors = %w[woodsaj briangann] spec.email = ['awoods@grafana.com', 'brian@grafana.com'] diff --git a/fluentd/fluent-plugin-grafana-loki/lib/fluent/plugin/out_loki.rb b/fluentd/fluent-plugin-grafana-loki/lib/fluent/plugin/out_loki.rb index 661d34648f9fb..c02426684a6c1 100644 --- a/fluentd/fluent-plugin-grafana-loki/lib/fluent/plugin/out_loki.rb +++ b/fluentd/fluent-plugin-grafana-loki/lib/fluent/plugin/out_loki.rb @@ -87,7 +87,7 @@ def write(chunk) body = { 'streams': payload } # add ingest path to loki url - uri = URI.parse(url + '/loki/api/v1/push') + uri = URI.parse(url + '/api/prom/push') req = Net::HTTP::Post.new( uri.request_uri diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index d5d512c2a7b68..7d3adeca2c2b0 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -1,14 +1,16 @@ package distributor import ( - "encoding/json" "net/http" "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/util" + "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/unmarshal" + unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy" ) var contentType = http.CanonicalHeaderKey("Content-Type") @@ -21,7 +23,15 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { switch r.Header.Get(contentType) { case applicationJSON: - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + var err error + + if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 { + err = unmarshal.DecodePushRequest(r.Body, &req) + } else { + err = unmarshal_legacy.DecodePushRequest(r.Body, &req) + } + + if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } diff --git a/pkg/loghttp/query.go b/pkg/loghttp/query.go index 259df3208f53e..f69fb1470c62b 100644 --- a/pkg/loghttp/query.go +++ b/pkg/loghttp/query.go @@ -23,6 +23,11 @@ type QueryResponse struct { Data QueryResponseData `json:"data"` } +// PushRequest models a log stream push +type PushRequest struct { + Streams []*Stream `json:"streams"` +} + // ResultType holds the type of the result type ResultType string diff --git a/pkg/logql/unmarshal/legacy/unmarshal.go b/pkg/logql/unmarshal/legacy/unmarshal.go new file mode 100644 index 0000000000000..065aef0e6b1d8 --- /dev/null +++ b/pkg/logql/unmarshal/legacy/unmarshal.go @@ -0,0 +1,13 @@ +package unmarshal + +import ( + "encoding/json" + "io" + + "github.com/grafana/loki/pkg/logproto" +) + +// DecodePushRequest directly decodes json to a logproto.PushRequest +func DecodePushRequest(b io.Reader, r *logproto.PushRequest) error { + return json.NewDecoder(b).Decode(r) +} diff --git a/pkg/logql/unmarshal/legacy/unmarshal_test.go b/pkg/logql/unmarshal/legacy/unmarshal_test.go new file mode 100644 index 0000000000000..d27d28a35031e --- /dev/null +++ b/pkg/logql/unmarshal/legacy/unmarshal_test.go @@ -0,0 +1,67 @@ +package unmarshal + +import ( + "io/ioutil" + "log" + "strings" + "testing" + "time" + + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/require" +) + +// covers requests to /api/prom/push +var pushTests = []struct { + expected []*logproto.Stream + actual string +}{ + { + []*logproto.Stream{ + { + Entries: []logproto.Entry{ + { + Timestamp: mustParse(time.RFC3339Nano, "2019-09-13T18:32:22.380001319Z"), + Line: "super line", + }, + }, + Labels: `{test="test"}`, + }, + }, + `{ + "streams":[ + { + "labels":"{test=\"test\"}", + "entries":[ + { + "ts": "2019-09-13T18:32:22.380001319Z", + "line": "super line" + } + ] + } + ] + }`, + }, +} + +func Test_DecodePushRequest(t *testing.T) { + + for i, pushTest := range pushTests { + var actual logproto.PushRequest + closer := ioutil.NopCloser(strings.NewReader(pushTest.actual)) + + err := DecodePushRequest(closer, &actual) + require.NoError(t, err) + + require.Equalf(t, pushTest.expected, actual.Streams, "Push Test %d failed", i) + } +} + +func mustParse(l string, t string) time.Time { + ret, err := time.Parse(l, t) + if err != nil { + log.Fatalf("Failed to parse %s", t) + } + + return ret +} diff --git a/pkg/logql/unmarshal/unmarshal.go b/pkg/logql/unmarshal/unmarshal.go new file mode 100644 index 0000000000000..fecaa178151a0 --- /dev/null +++ b/pkg/logql/unmarshal/unmarshal.go @@ -0,0 +1,60 @@ +package unmarshal + +import ( + "encoding/json" + "io" + + "github.com/grafana/loki/pkg/loghttp" + + "github.com/grafana/loki/pkg/logproto" +) + +// DecodePushRequest directly decodes json to a logproto.PushRequest +func DecodePushRequest(b io.Reader, r *logproto.PushRequest) error { + var request loghttp.PushRequest + + err := json.NewDecoder(b).Decode(&request) + + if err != nil { + return err + } + + *r = NewPushRequest(request) + + return nil +} + +// NewPushRequest constructs a logproto.PushRequest from a PushRequest +func NewPushRequest(r loghttp.PushRequest) logproto.PushRequest { + ret := logproto.PushRequest{ + Streams: make([]*logproto.Stream, len(r.Streams)), + } + + for i, s := range r.Streams { + ret.Streams[i] = NewStream(s) + } + + return ret +} + +// NewStream constructs a logproto.Stream from a Stream +func NewStream(s *loghttp.Stream) *logproto.Stream { + ret := &logproto.Stream{ + Entries: make([]logproto.Entry, len(s.Entries)), + Labels: s.Labels.String(), + } + + for i, e := range s.Entries { + ret.Entries[i] = NewEntry(e) + } + + return ret +} + +// NewEntry constructs a logproto.Entry from a Entry +func NewEntry(e loghttp.Entry) logproto.Entry { + return logproto.Entry{ + Timestamp: e.Timestamp, + Line: e.Line, + } +} diff --git a/pkg/logql/unmarshal/unmarshal_test.go b/pkg/logql/unmarshal/unmarshal_test.go new file mode 100644 index 0000000000000..10b67a9865070 --- /dev/null +++ b/pkg/logql/unmarshal/unmarshal_test.go @@ -0,0 +1,56 @@ +package unmarshal + +import ( + "io/ioutil" + "strings" + "testing" + "time" + + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/require" +) + +// covers requests to /loki/api/v1/push +var pushTests = []struct { + expected []*logproto.Stream + actual string +}{ + { + []*logproto.Stream{ + { + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 123456789012345), + Line: "super line", + }, + }, + Labels: `{test="test"}`, + }, + }, + `{ + "streams": [ + { + "stream": { + "test": "test" + }, + "values":[ + [ "123456789012345", "super line" ] + ] + } + ] + }`, + }, +} + +func Test_DecodePushRequest(t *testing.T) { + + for i, pushTest := range pushTests { + var actual logproto.PushRequest + closer := ioutil.NopCloser(strings.NewReader(pushTest.actual)) + + err := DecodePushRequest(closer, &actual) + require.NoError(t, err) + + require.Equalf(t, pushTest.expected, actual.Streams, "Push Test %d failed", i) + } +}