Skip to content

Commit

Permalink
use map object instead of string representation for log labels in jso…
Browse files Browse the repository at this point in the history
…n push request
  • Loading branch information
sandeepsukhani committed Jun 19, 2023
1 parent 7b77c69 commit 44b0932
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 63 deletions.
11 changes: 3 additions & 8 deletions integration/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,27 +110,22 @@ func formatTS(ts time.Time) string {

type stream struct {
Stream map[string]string `json:"stream"`
Values [][]string `json:"values"`
Values [][]any `json:"values"`
}

// pushLogLine creates a new logline
func (c *Client) pushLogLine(line string, timestamp time.Time, logLabels labels.Labels, extraLabelList ...map[string]string) error {
apiEndpoint := fmt.Sprintf("%s/loki/api/v1/push", c.baseURL)

logLabelsJSON, err := logLabels.MarshalJSON()
if err != nil {
return err
}

s := stream{
Stream: map[string]string{
"job": "varlog",
},
Values: [][]string{
Values: [][]any{
{
formatTS(timestamp),
line,
string(logLabelsJSON),
logLabels,
},
},
}
Expand Down
121 changes: 119 additions & 2 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,122 @@ func (q *QueryResponse) UnmarshalJSON(data []byte) error {
})
}

// PushRequest models a log stream push
// PushRequest models a log stream push but is unmarshalled to proto push format.
type PushRequest struct {
Streams []*Stream `json:"streams"`
Streams []LogProtoStream `json:"streams"`
}

// LogProtoStream helps with unmarshalling of each log stream for push request.
// This might look un-necessary but without it the CPU usage in benchmarks was increasing by ~25% :shrug:
type LogProtoStream struct {
logproto.Stream
}

func (s *LogProtoStream) UnmarshalJSON(data []byte) error {
err := jsonparser.ObjectEach(data, func(key, val []byte, ty jsonparser.ValueType, _ int) error {
switch string(key) {
case "stream":
labels := make(LabelSet)
err := jsonparser.ObjectEach(val, func(key, val []byte, _ jsonparser.ValueType, _ int) error {
labels[yoloString(key)] = yoloString(val)
return nil
})
if err != nil {
return err
}
s.Labels = labels.String()
case "values":
if ty == jsonparser.Null {
return nil
}
entries, err := unmarshalHTTPToLogProtoEntries(val)
if err != nil {
return err
}
s.Entries = entries
}
return nil
})
return err
}

func unmarshalHTTPToLogProtoEntries(data []byte) ([]logproto.Entry, error) {
var (
entries []logproto.Entry
parseError error
)
_, err := jsonparser.ArrayEach(data, func(value []byte, ty jsonparser.ValueType, _ int, err error) {
if err != nil || parseError != nil {
return
}
if ty == jsonparser.Null {
return
}
e, err := unmarshalHTTPToLogProtoEntry(value)
if err != nil {
parseError = err
return
}
entries = append(entries, e)
})
if parseError != nil {
return nil, parseError
}
if err != nil {
return nil, parseError
}

return entries, nil
}

func unmarshalHTTPToLogProtoEntry(data []byte) (logproto.Entry, error) {
var (
i int
parseError error
e logproto.Entry
)
_, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
// assert that both items in array are of type string
if (i == 0 || i == 1) && t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
} else if i == 2 && t != jsonparser.Object {
parseError = jsonparser.MalformedObjectError
return
}
switch i {
case 0: // timestamp
ts, err := jsonparser.ParseInt(value)
if err != nil {
parseError = err
return
}
e.Timestamp = time.Unix(0, ts)
case 1: // value
v, err := jsonparser.ParseString(value)
if err != nil {
parseError = err
return
}
e.Line = v
case 2: // labels
labels := make(LabelSet)
err := jsonparser.ObjectEach(value, func(key, val []byte, _ jsonparser.ValueType, _ int) error {
labels[yoloString(key)] = yoloString(val)
return nil
})
if err != nil {
parseError = err
return
}
e.Labels = labels.String()
}
i++
})
if parseError != nil {
return e, parseError
}
return e, err
}

// ResultType holds the type of the result
Expand Down Expand Up @@ -377,3 +490,7 @@ func labelVolumeLimit(r *http.Request) error {

return nil
}

func yoloString(buf []byte) string {
return *((*string)(unsafe.Pointer(&buf)))
}
55 changes: 2 additions & 53 deletions pkg/util/unmarshal/unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package unmarshal

import (
"io"
"reflect"
"unsafe"

jsoniter "github.com/json-iterator/go"
Expand All @@ -19,57 +18,13 @@ func DecodePushRequest(b io.Reader, r *logproto.PushRequest) error {
return err
}

req, err := NewPushRequest(request)
if err != nil {
return err
*r = logproto.PushRequest{
Streams: *(*[]logproto.Stream)(unsafe.Pointer(&request.Streams)),
}
*r = *req

return nil
}

// NewPushRequest constructs a logproto.PushRequest from a PushRequest
func NewPushRequest(r loghttp.PushRequest) (*logproto.PushRequest, error) {
ret := logproto.PushRequest{
Streams: make([]logproto.Stream, len(r.Streams)),
}

for i, s := range r.Streams {
stream, err := NewStream(s)
if err != nil {
return nil, err
}

ret.Streams[i] = *stream
}

return &ret, nil
}

// NewStream constructs a logproto.Stream from a Stream
func NewStream(s *loghttp.Stream) (*logproto.Stream, error) {
stream := logproto.Stream{
Entries: *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries)),
Labels: s.Labels.String(),
}

for i, entry := range stream.Entries {
if entry.Labels == "" {
continue
}
// labels in v1 HTTP push endpoint are in json format({"foo":"bar"}) while for proto it is key=value format({foo="bar"})
// So here we need to convert metadata labels from json to proto format.
// ToDo(Sandeep): Find a way to either not do the conversion or efficiently do it since
// metadata labels can be attached to each log line.
labels := loghttp.LabelSet{}
if err := labels.UnmarshalJSON(yoloBytes(entry.Labels)); err != nil {
return nil, err
}
stream.Entries[i].Labels = labels.String()
}
return &stream, nil
}

// WebsocketReader knows how to read message to a websocket connection.
type WebsocketReader interface {
ReadMessage() (int, []byte, error)
Expand All @@ -83,9 +38,3 @@ func ReadTailResponseJSON(r *loghttp.TailResponse, reader WebsocketReader) error
}
return jsoniter.Unmarshal(data, r)
}

func yoloBytes(s string) (b []byte) {
*(*string)(unsafe.Pointer(&b)) = s
(*reflect.SliceHeader)(unsafe.Pointer(&b)).Cap = len(s)
return
}
26 changes: 26 additions & 0 deletions pkg/util/unmarshal/unmarshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,32 @@ var pushTests = []struct {
]
}`,
},
{
[]logproto.Stream{
{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 123456789012345),
Line: "super line",
Labels: `{foo="bar"}`,
},
},
Labels: `{test="test"}`,
},
},
`{
"streams": [
{
"stream": {
"test": "test"
},
"values":[
[ "123456789012345", "super line", {"foo":"bar"} ]
]
}
]
}`,
},
}

func Test_DecodePushRequest(t *testing.T) {
Expand Down

0 comments on commit 44b0932

Please sign in to comment.