diff --git a/integration/client/client.go b/integration/client/client.go index 558c5d6c47077..114c99f27929f 100644 --- a/integration/client/client.go +++ b/integration/client/client.go @@ -110,27 +110,22 @@ func formatTS(ts time.Time) string { type stream struct { Stream map[string]string `json:"stream"` - Values [][]string `json:"values"` + Values [][]any `json:"values"` } // pushLogLine creates a new logline func (c *Client) pushLogLine(line string, timestamp time.Time, logLabels labels.Labels, extraLabelList ...map[string]string) error { apiEndpoint := fmt.Sprintf("%s/loki/api/v1/push", c.baseURL) - logLabelsJSON, err := logLabels.MarshalJSON() - if err != nil { - return err - } - s := stream{ Stream: map[string]string{ "job": "varlog", }, - Values: [][]string{ + Values: [][]any{ { formatTS(timestamp), line, - string(logLabelsJSON), + logLabels, }, }, } diff --git a/pkg/loghttp/query.go b/pkg/loghttp/query.go index af8ebbe9689fc..ddc66193138f4 100644 --- a/pkg/loghttp/query.go +++ b/pkg/loghttp/query.go @@ -55,9 +55,122 @@ func (q *QueryResponse) UnmarshalJSON(data []byte) error { }) } -// PushRequest models a log stream push +// PushRequest models a log stream push but is unmarshalled to proto push format. type PushRequest struct { - Streams []*Stream `json:"streams"` + Streams []LogProtoStream `json:"streams"` +} + +// LogProtoStream helps with unmarshalling of each log stream for push request. +// This might look un-necessary but without it the CPU usage in benchmarks was increasing by ~25% :shrug: +type LogProtoStream struct { + logproto.Stream +} + +func (s *LogProtoStream) UnmarshalJSON(data []byte) error { + err := jsonparser.ObjectEach(data, func(key, val []byte, ty jsonparser.ValueType, _ int) error { + switch string(key) { + case "stream": + labels := make(LabelSet) + err := jsonparser.ObjectEach(val, func(key, val []byte, _ jsonparser.ValueType, _ int) error { + labels[yoloString(key)] = yoloString(val) + return nil + }) + if err != nil { + return err + } + s.Labels = labels.String() + case "values": + if ty == jsonparser.Null { + return nil + } + entries, err := unmarshalHTTPToLogProtoEntries(val) + if err != nil { + return err + } + s.Entries = entries + } + return nil + }) + return err +} + +func unmarshalHTTPToLogProtoEntries(data []byte) ([]logproto.Entry, error) { + var ( + entries []logproto.Entry + parseError error + ) + _, err := jsonparser.ArrayEach(data, func(value []byte, ty jsonparser.ValueType, _ int, err error) { + if err != nil || parseError != nil { + return + } + if ty == jsonparser.Null { + return + } + e, err := unmarshalHTTPToLogProtoEntry(value) + if err != nil { + parseError = err + return + } + entries = append(entries, e) + }) + if parseError != nil { + return nil, parseError + } + if err != nil { + return nil, parseError + } + + return entries, nil +} + +func unmarshalHTTPToLogProtoEntry(data []byte) (logproto.Entry, error) { + var ( + i int + parseError error + e logproto.Entry + ) + _, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) { + // assert that both items in array are of type string + if (i == 0 || i == 1) && t != jsonparser.String { + parseError = jsonparser.MalformedStringError + return + } else if i == 2 && t != jsonparser.Object { + parseError = jsonparser.MalformedObjectError + return + } + switch i { + case 0: // timestamp + ts, err := jsonparser.ParseInt(value) + if err != nil { + parseError = err + return + } + e.Timestamp = time.Unix(0, ts) + case 1: // value + v, err := jsonparser.ParseString(value) + if err != nil { + parseError = err + return + } + e.Line = v + case 2: // labels + labels := make(LabelSet) + err := jsonparser.ObjectEach(value, func(key, val []byte, _ jsonparser.ValueType, _ int) error { + labels[yoloString(key)] = yoloString(val) + return nil + }) + if err != nil { + parseError = err + return + } + e.Labels = labels.String() + } + i++ + }) + if parseError != nil { + return e, parseError + } + return e, err } // ResultType holds the type of the result @@ -377,3 +490,7 @@ func labelVolumeLimit(r *http.Request) error { return nil } + +func yoloString(buf []byte) string { + return *((*string)(unsafe.Pointer(&buf))) +} diff --git a/pkg/util/unmarshal/unmarshal.go b/pkg/util/unmarshal/unmarshal.go index 3250b5a06c2e4..51e7d1108d9d9 100644 --- a/pkg/util/unmarshal/unmarshal.go +++ b/pkg/util/unmarshal/unmarshal.go @@ -2,7 +2,6 @@ package unmarshal import ( "io" - "reflect" "unsafe" jsoniter "github.com/json-iterator/go" @@ -19,57 +18,13 @@ func DecodePushRequest(b io.Reader, r *logproto.PushRequest) error { return err } - req, err := NewPushRequest(request) - if err != nil { - return err + *r = logproto.PushRequest{ + Streams: *(*[]logproto.Stream)(unsafe.Pointer(&request.Streams)), } - *r = *req return nil } -// NewPushRequest constructs a logproto.PushRequest from a PushRequest -func NewPushRequest(r loghttp.PushRequest) (*logproto.PushRequest, error) { - ret := logproto.PushRequest{ - Streams: make([]logproto.Stream, len(r.Streams)), - } - - for i, s := range r.Streams { - stream, err := NewStream(s) - if err != nil { - return nil, err - } - - ret.Streams[i] = *stream - } - - return &ret, nil -} - -// NewStream constructs a logproto.Stream from a Stream -func NewStream(s *loghttp.Stream) (*logproto.Stream, error) { - stream := logproto.Stream{ - Entries: *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries)), - Labels: s.Labels.String(), - } - - for i, entry := range stream.Entries { - if entry.Labels == "" { - continue - } - // labels in v1 HTTP push endpoint are in json format({"foo":"bar"}) while for proto it is key=value format({foo="bar"}) - // So here we need to convert metadata labels from json to proto format. - // ToDo(Sandeep): Find a way to either not do the conversion or efficiently do it since - // metadata labels can be attached to each log line. - labels := loghttp.LabelSet{} - if err := labels.UnmarshalJSON(yoloBytes(entry.Labels)); err != nil { - return nil, err - } - stream.Entries[i].Labels = labels.String() - } - return &stream, nil -} - // WebsocketReader knows how to read message to a websocket connection. type WebsocketReader interface { ReadMessage() (int, []byte, error) @@ -83,9 +38,3 @@ func ReadTailResponseJSON(r *loghttp.TailResponse, reader WebsocketReader) error } return jsoniter.Unmarshal(data, r) } - -func yoloBytes(s string) (b []byte) { - *(*string)(unsafe.Pointer(&b)) = s - (*reflect.SliceHeader)(unsafe.Pointer(&b)).Cap = len(s) - return -} diff --git a/pkg/util/unmarshal/unmarshal_test.go b/pkg/util/unmarshal/unmarshal_test.go index 51e515e0d1df0..da8f8640f4d46 100644 --- a/pkg/util/unmarshal/unmarshal_test.go +++ b/pkg/util/unmarshal/unmarshal_test.go @@ -45,6 +45,32 @@ var pushTests = []struct { ] }`, }, + { + []logproto.Stream{ + { + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 123456789012345), + Line: "super line", + Labels: `{foo="bar"}`, + }, + }, + Labels: `{test="test"}`, + }, + }, + `{ + "streams": [ + { + "stream": { + "test": "test" + }, + "values":[ + [ "123456789012345", "super line", {"foo":"bar"} ] + ] + } + ] + }`, + }, } func Test_DecodePushRequest(t *testing.T) {