diff --git a/Gopkg.lock b/Gopkg.lock index bc716a09b7cf4..5120c57e5bb21 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -187,7 +187,7 @@ [[projects]] branch = "lazy-load-chunks" - digest = "1:bf1fa66c54722bc8664f1465e427cd6fe7df52f2b6fd5ab996baf37601687b70" + digest = "1:a999c29b3a215dfc12d374a9aac09c94c1b72ef530f4e39d9ab3ae1468cfe8e8" name = "github.com/cortexproject/cortex" packages = [ "pkg/chunk", @@ -213,7 +213,7 @@ "pkg/util/validation", ] pruneopts = "UT" - revision = "95a3f308e95617732b76e337874e83ccf173cf14" + revision = "61b92520b0c1afdef6e42b7a27cca6c715e9f386" source = "https://github.com/grafana/cortex" [[projects]] diff --git a/pkg/logproto/logproto.pb.go b/pkg/logproto/logproto.pb.go index 8dd17f50b7bcf..c7598067f0494 100644 --- a/pkg/logproto/logproto.pb.go +++ b/pkg/logproto/logproto.pb.go @@ -3,28 +3,22 @@ package logproto -import proto "github.com/gogo/protobuf/proto" -import fmt "fmt" -import math "math" -import _ "github.com/gogo/protobuf/gogoproto" -import _ "github.com/gogo/protobuf/types" - -import time "time" - -import strconv "strconv" - -import strings "strings" -import reflect "reflect" - import ( - context "golang.org/x/net/context" + context "context" + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + _ "github.com/gogo/protobuf/types" + github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" grpc "google.golang.org/grpc" + io "io" + math "math" + reflect "reflect" + strconv "strconv" + strings "strings" + time "time" ) -import github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" - -import io "io" - // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf @@ -48,23 +42,24 @@ var Direction_name = map[int32]string{ 0: "FORWARD", 1: "BACKWARD", } + var Direction_value = map[string]int32{ "FORWARD": 0, "BACKWARD": 1, } func (Direction) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_logproto_ab9c6ba375ad5e80, []int{0} + return fileDescriptor_7a8976f235a02f79, []int{0} } type PushRequest struct { - Streams []*Stream `protobuf:"bytes,1,rep,name=streams" json:"streams"` + Streams []*Stream `protobuf:"bytes,1,rep,name=streams,proto3" json:"streams"` } func (m *PushRequest) Reset() { *m = PushRequest{} } func (*PushRequest) ProtoMessage() {} func (*PushRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_logproto_ab9c6ba375ad5e80, []int{0} + return fileDescriptor_7a8976f235a02f79, []int{0} } func (m *PushRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -81,8 +76,8 @@ func (m *PushRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } -func (dst *PushRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_PushRequest.Merge(dst, src) +func (m *PushRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PushRequest.Merge(m, src) } func (m *PushRequest) XXX_Size() int { return m.Size() @@ -106,7 +101,7 @@ type PushResponse struct { func (m *PushResponse) Reset() { *m = PushResponse{} } func (*PushResponse) ProtoMessage() {} func (*PushResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_logproto_ab9c6ba375ad5e80, []int{1} + return fileDescriptor_7a8976f235a02f79, []int{1} } func (m *PushResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -123,8 +118,8 @@ func (m *PushResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } -func (dst *PushResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_PushResponse.Merge(dst, src) +func (m *PushResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_PushResponse.Merge(m, src) } func (m *PushResponse) XXX_Size() int { return m.Size() @@ -138,8 +133,8 @@ var xxx_messageInfo_PushResponse proto.InternalMessageInfo type QueryRequest struct { Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` Limit uint32 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"` - Start time.Time `protobuf:"bytes,3,opt,name=start,stdtime" json:"start"` - End time.Time `protobuf:"bytes,4,opt,name=end,stdtime" json:"end"` + Start time.Time `protobuf:"bytes,3,opt,name=start,proto3,stdtime" json:"start"` + End time.Time `protobuf:"bytes,4,opt,name=end,proto3,stdtime" json:"end"` Direction Direction `protobuf:"varint,5,opt,name=direction,proto3,enum=logproto.Direction" json:"direction,omitempty"` Regex string `protobuf:"bytes,6,opt,name=regex,proto3" json:"regex,omitempty"` } @@ -147,7 +142,7 @@ type QueryRequest struct { func (m *QueryRequest) Reset() { *m = QueryRequest{} } func (*QueryRequest) ProtoMessage() {} func (*QueryRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_logproto_ab9c6ba375ad5e80, []int{2} + return fileDescriptor_7a8976f235a02f79, []int{2} } func (m *QueryRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -164,8 +159,8 @@ func (m *QueryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } -func (dst *QueryRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_QueryRequest.Merge(dst, src) +func (m *QueryRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryRequest.Merge(m, src) } func (m *QueryRequest) XXX_Size() int { return m.Size() @@ -219,13 +214,13 @@ func (m *QueryRequest) GetRegex() string { } type QueryResponse struct { - Streams []*Stream `protobuf:"bytes,1,rep,name=streams" json:"streams,omitempty"` + Streams []*Stream `protobuf:"bytes,1,rep,name=streams,proto3" json:"streams,omitempty"` } func (m *QueryResponse) Reset() { *m = QueryResponse{} } func (*QueryResponse) ProtoMessage() {} func (*QueryResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_logproto_ab9c6ba375ad5e80, []int{3} + return fileDescriptor_7a8976f235a02f79, []int{3} } func (m *QueryResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -242,8 +237,8 @@ func (m *QueryResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error return b[:n], nil } } -func (dst *QueryResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_QueryResponse.Merge(dst, src) +func (m *QueryResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryResponse.Merge(m, src) } func (m *QueryResponse) XXX_Size() int { return m.Size() @@ -269,7 +264,7 @@ type LabelRequest struct { func (m *LabelRequest) Reset() { *m = LabelRequest{} } func (*LabelRequest) ProtoMessage() {} func (*LabelRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_logproto_ab9c6ba375ad5e80, []int{4} + return fileDescriptor_7a8976f235a02f79, []int{4} } func (m *LabelRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -286,8 +281,8 @@ func (m *LabelRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) return b[:n], nil } } -func (dst *LabelRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_LabelRequest.Merge(dst, src) +func (m *LabelRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LabelRequest.Merge(m, src) } func (m *LabelRequest) XXX_Size() int { return m.Size() @@ -313,13 +308,13 @@ func (m *LabelRequest) GetValues() bool { } type LabelResponse struct { - Values []string `protobuf:"bytes,1,rep,name=values" json:"values,omitempty"` + Values []string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"` } func (m *LabelResponse) Reset() { *m = LabelResponse{} } func (*LabelResponse) ProtoMessage() {} func (*LabelResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_logproto_ab9c6ba375ad5e80, []int{5} + return fileDescriptor_7a8976f235a02f79, []int{5} } func (m *LabelResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -336,8 +331,8 @@ func (m *LabelResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error return b[:n], nil } } -func (dst *LabelResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_LabelResponse.Merge(dst, src) +func (m *LabelResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_LabelResponse.Merge(m, src) } func (m *LabelResponse) XXX_Size() int { return m.Size() @@ -357,13 +352,13 @@ func (m *LabelResponse) GetValues() []string { type Stream struct { Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` - Entries []Entry `protobuf:"bytes,2,rep,name=entries" json:"entries"` + Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3" json:"entries"` } func (m *Stream) Reset() { *m = Stream{} } func (*Stream) ProtoMessage() {} func (*Stream) Descriptor() ([]byte, []int) { - return fileDescriptor_logproto_ab9c6ba375ad5e80, []int{6} + return fileDescriptor_7a8976f235a02f79, []int{6} } func (m *Stream) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -380,8 +375,8 @@ func (m *Stream) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } -func (dst *Stream) XXX_Merge(src proto.Message) { - xxx_messageInfo_Stream.Merge(dst, src) +func (m *Stream) XXX_Merge(src proto.Message) { + xxx_messageInfo_Stream.Merge(m, src) } func (m *Stream) XXX_Size() int { return m.Size() @@ -407,14 +402,14 @@ func (m *Stream) GetEntries() []Entry { } type Entry struct { - Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,stdtime" json:"ts"` + Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"` Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"` } func (m *Entry) Reset() { *m = Entry{} } func (*Entry) ProtoMessage() {} func (*Entry) Descriptor() ([]byte, []int) { - return fileDescriptor_logproto_ab9c6ba375ad5e80, []int{7} + return fileDescriptor_7a8976f235a02f79, []int{7} } func (m *Entry) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -431,8 +426,8 @@ func (m *Entry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } -func (dst *Entry) XXX_Merge(src proto.Message) { - xxx_messageInfo_Entry.Merge(dst, src) +func (m *Entry) XXX_Merge(src proto.Message) { + xxx_messageInfo_Entry.Merge(m, src) } func (m *Entry) XXX_Size() int { return m.Size() @@ -458,6 +453,7 @@ func (m *Entry) GetLine() string { } func init() { + proto.RegisterEnum("logproto.Direction", Direction_name, Direction_value) proto.RegisterType((*PushRequest)(nil), "logproto.PushRequest") proto.RegisterType((*PushResponse)(nil), "logproto.PushResponse") proto.RegisterType((*QueryRequest)(nil), "logproto.QueryRequest") @@ -466,8 +462,52 @@ func init() { proto.RegisterType((*LabelResponse)(nil), "logproto.LabelResponse") proto.RegisterType((*Stream)(nil), "logproto.Stream") proto.RegisterType((*Entry)(nil), "logproto.Entry") - proto.RegisterEnum("logproto.Direction", Direction_name, Direction_value) } + +func init() { proto.RegisterFile("logproto.proto", fileDescriptor_7a8976f235a02f79) } + +var fileDescriptor_7a8976f235a02f79 = []byte{ + // 601 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x4f, 0x6f, 0xd3, 0x4e, + 0x10, 0xf5, 0xb6, 0x89, 0x13, 0x4f, 0xd2, 0xb4, 0xda, 0xdf, 0x8f, 0x62, 0x45, 0x68, 0x1d, 0xf9, + 0x00, 0x51, 0x25, 0x5c, 0x08, 0x88, 0x4a, 0x85, 0x4b, 0x4d, 0xa9, 0x90, 0x40, 0x02, 0x16, 0x24, + 0xce, 0x4e, 0xbb, 0xb8, 0x96, 0xfc, 0xa7, 0xb5, 0xd7, 0x88, 0xde, 0x90, 0xf8, 0x02, 0xfd, 0x18, + 0x7c, 0x94, 0x1e, 0x73, 0xec, 0x29, 0x10, 0xe7, 0x82, 0x72, 0xea, 0x8d, 0x2b, 0xda, 0xb5, 0x1d, + 0x1b, 0x90, 0x40, 0x5c, 0x9c, 0x79, 0xbb, 0xef, 0xcd, 0xec, 0x9b, 0x99, 0x40, 0xcf, 0x8f, 0xdc, + 0x93, 0x38, 0xe2, 0x91, 0x25, 0xbf, 0xb8, 0x5d, 0xe2, 0xbe, 0xe1, 0x46, 0x91, 0xeb, 0xb3, 0x6d, + 0x89, 0xc6, 0xe9, 0xbb, 0x6d, 0xee, 0x05, 0x2c, 0xe1, 0x4e, 0x70, 0x92, 0x53, 0xfb, 0xb7, 0x5d, + 0x8f, 0x1f, 0xa7, 0x63, 0xeb, 0x30, 0x0a, 0xb6, 0xdd, 0xc8, 0x8d, 0x2a, 0xa6, 0x40, 0x12, 0xc8, + 0x28, 0xa7, 0x9b, 0x07, 0xd0, 0x79, 0x99, 0x26, 0xc7, 0x94, 0x9d, 0xa6, 0x2c, 0xe1, 0x78, 0x07, + 0x5a, 0x09, 0x8f, 0x99, 0x13, 0x24, 0x3a, 0x1a, 0xac, 0x0e, 0x3b, 0xa3, 0x0d, 0x6b, 0xf9, 0x94, + 0xd7, 0xf2, 0xc2, 0xee, 0x2c, 0xa6, 0x46, 0x49, 0xa2, 0x65, 0x60, 0xf6, 0xa0, 0x9b, 0xe7, 0x49, + 0x4e, 0xa2, 0x30, 0x61, 0xe6, 0x77, 0x04, 0xdd, 0x57, 0x29, 0x8b, 0xcf, 0xca, 0xcc, 0xff, 0x43, + 0xf3, 0x54, 0x60, 0x1d, 0x0d, 0xd0, 0x50, 0xa3, 0x39, 0x10, 0xa7, 0xbe, 0x17, 0x78, 0x5c, 0x5f, + 0x19, 0xa0, 0xe1, 0x1a, 0xcd, 0x01, 0xde, 0x85, 0x66, 0xc2, 0x9d, 0x98, 0xeb, 0xab, 0x03, 0x34, + 0xec, 0x8c, 0xfa, 0x56, 0x6e, 0xda, 0x2a, 0xad, 0x58, 0x6f, 0x4a, 0xd3, 0x76, 0xfb, 0x62, 0x6a, + 0x28, 0xe7, 0x5f, 0x0c, 0x44, 0x73, 0x09, 0x7e, 0x00, 0xab, 0x2c, 0x3c, 0xd2, 0x1b, 0xff, 0xa0, + 0x14, 0x02, 0x7c, 0x17, 0xb4, 0x23, 0x2f, 0x66, 0x87, 0xdc, 0x8b, 0x42, 0xbd, 0x39, 0x40, 0xc3, + 0xde, 0xe8, 0xbf, 0xca, 0xfb, 0x7e, 0x79, 0x45, 0x2b, 0x96, 0x78, 0x7c, 0xcc, 0x5c, 0xf6, 0x41, + 0x57, 0x73, 0x4b, 0x12, 0x98, 0x0f, 0x61, 0xad, 0x30, 0x9e, 0xb7, 0x02, 0x6f, 0xfd, 0xb5, 0xa7, + 0x55, 0x1b, 0x77, 0xa1, 0xfb, 0xdc, 0x19, 0x33, 0xbf, 0xec, 0x1a, 0x86, 0x46, 0xe8, 0x04, 0xac, + 0x68, 0x9a, 0x8c, 0xf1, 0x26, 0xa8, 0xef, 0x1d, 0x3f, 0x65, 0x89, 0x6c, 0x5a, 0x9b, 0x16, 0xc8, + 0xbc, 0x05, 0x6b, 0x85, 0xb6, 0x28, 0x5c, 0x11, 0x45, 0x5d, 0x6d, 0x49, 0x3c, 0x06, 0x35, 0xaf, + 0x8b, 0x4d, 0x50, 0x7d, 0x21, 0x49, 0xf2, 0x02, 0x36, 0x2c, 0xa6, 0x46, 0x71, 0x42, 0x8b, 0x5f, + 0xbc, 0x0b, 0x2d, 0x16, 0xf2, 0xd8, 0x93, 0xf5, 0xc4, 0xf3, 0xd7, 0xab, 0xe7, 0x3f, 0x09, 0x79, + 0x7c, 0x66, 0xaf, 0x8b, 0x4e, 0x8a, 0xad, 0x28, 0x78, 0xb4, 0x0c, 0xcc, 0x08, 0x9a, 0x92, 0x82, + 0x9f, 0x82, 0xb6, 0x5c, 0x54, 0x59, 0xeb, 0xcf, 0xb3, 0xe9, 0x15, 0x19, 0x57, 0x78, 0x22, 0x27, + 0x54, 0x89, 0xf1, 0x0d, 0x68, 0xf8, 0x5e, 0xc8, 0xa4, 0x77, 0xcd, 0x6e, 0x2f, 0xa6, 0x86, 0xc4, + 0x54, 0x7e, 0xb7, 0x6e, 0x82, 0xb6, 0x1c, 0x15, 0xee, 0x40, 0xeb, 0xe0, 0x05, 0x7d, 0xbb, 0x47, + 0xf7, 0x37, 0x14, 0xdc, 0x85, 0xb6, 0xbd, 0xf7, 0xf8, 0x99, 0x44, 0x68, 0xb4, 0x07, 0xaa, 0x58, + 0x57, 0x16, 0xe3, 0x1d, 0x68, 0x88, 0x08, 0x5f, 0xab, 0x5c, 0xd5, 0xfe, 0x10, 0xfd, 0xcd, 0x5f, + 0x8f, 0x8b, 0xfd, 0x56, 0x46, 0x9f, 0x10, 0xb4, 0xc4, 0xa0, 0x3d, 0x16, 0xe3, 0x47, 0xd0, 0x94, + 0x33, 0xc7, 0x35, 0x7a, 0x7d, 0xfb, 0xfb, 0xd7, 0x7f, 0x3b, 0x2f, 0xf3, 0xdc, 0x41, 0x62, 0xdd, + 0xe5, 0xe0, 0xea, 0xea, 0xfa, 0x16, 0xd4, 0xd5, 0x3f, 0x4d, 0xd8, 0x54, 0xec, 0xfb, 0x93, 0x19, + 0x51, 0x2e, 0x67, 0x44, 0xb9, 0x9a, 0x11, 0xf4, 0x31, 0x23, 0xe8, 0x73, 0x46, 0xd0, 0x45, 0x46, + 0xd0, 0x24, 0x23, 0xe8, 0x6b, 0x46, 0xd0, 0xb7, 0x8c, 0x28, 0x57, 0x19, 0x41, 0xe7, 0x73, 0xa2, + 0x4c, 0xe6, 0x44, 0xb9, 0x9c, 0x13, 0x65, 0xac, 0xca, 0x64, 0xf7, 0x7e, 0x04, 0x00, 0x00, 0xff, + 0xff, 0x47, 0x69, 0x1e, 0x88, 0x68, 0x04, 0x00, 0x00, +} + func (x Direction) String() string { s, ok := Direction_name[int32(x)] if ok { @@ -1088,17 +1128,17 @@ func (m *QueryRequest) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x1a i++ i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Start))) - n1, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Start, dAtA[i:]) - if err != nil { - return 0, err + n1, err1 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Start, dAtA[i:]) + if err1 != nil { + return 0, err1 } i += n1 dAtA[i] = 0x22 i++ i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.End))) - n2, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.End, dAtA[i:]) - if err != nil { - return 0, err + n2, err2 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.End, dAtA[i:]) + if err2 != nil { + return 0, err2 } i += n2 if m.Direction != 0 { @@ -1266,9 +1306,9 @@ func (m *Entry) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp))) - n3, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:]) - if err != nil { - return 0, err + n3, err3 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:]) + if err3 != nil { + return 0, err3 } i += n3 if len(m.Line) > 0 { @@ -1437,8 +1477,13 @@ func (this *PushRequest) String() string { if this == nil { return "nil" } + repeatedStringForStreams := "[]*Stream{" + for _, f := range this.Streams { + repeatedStringForStreams += strings.Replace(f.String(), "Stream", "Stream", 1) + "," + } + repeatedStringForStreams += "}" s := strings.Join([]string{`&PushRequest{`, - `Streams:` + strings.Replace(fmt.Sprintf("%v", this.Streams), "Stream", "Stream", 1) + `,`, + `Streams:` + repeatedStringForStreams + `,`, `}`, }, "") return s @@ -1459,8 +1504,8 @@ func (this *QueryRequest) String() string { s := strings.Join([]string{`&QueryRequest{`, `Query:` + fmt.Sprintf("%v", this.Query) + `,`, `Limit:` + fmt.Sprintf("%v", this.Limit) + `,`, - `Start:` + strings.Replace(strings.Replace(this.Start.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, - `End:` + strings.Replace(strings.Replace(this.End.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `Start:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Start), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `End:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.End), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `Direction:` + fmt.Sprintf("%v", this.Direction) + `,`, `Regex:` + fmt.Sprintf("%v", this.Regex) + `,`, `}`, @@ -1471,8 +1516,13 @@ func (this *QueryResponse) String() string { if this == nil { return "nil" } + repeatedStringForStreams := "[]*Stream{" + for _, f := range this.Streams { + repeatedStringForStreams += strings.Replace(f.String(), "Stream", "Stream", 1) + "," + } + repeatedStringForStreams += "}" s := strings.Join([]string{`&QueryResponse{`, - `Streams:` + strings.Replace(fmt.Sprintf("%v", this.Streams), "Stream", "Stream", 1) + `,`, + `Streams:` + repeatedStringForStreams + `,`, `}`, }, "") return s @@ -1502,9 +1552,14 @@ func (this *Stream) String() string { if this == nil { return "nil" } + repeatedStringForEntries := "[]Entry{" + for _, f := range this.Entries { + repeatedStringForEntries += strings.Replace(strings.Replace(f.String(), "Entry", "Entry", 1), `&`, ``, 1) + "," + } + repeatedStringForEntries += "}" s := strings.Join([]string{`&Stream{`, `Labels:` + fmt.Sprintf("%v", this.Labels) + `,`, - `Entries:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Entries), "Entry", "Entry", 1), `&`, ``, 1) + `,`, + `Entries:` + repeatedStringForEntries + `,`, `}`, }, "") return s @@ -1514,7 +1569,7 @@ func (this *Entry) String() string { return "nil" } s := strings.Join([]string{`&Entry{`, - `Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `Timestamp:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Timestamp), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `Line:` + fmt.Sprintf("%v", this.Line) + `,`, `}`, }, "") @@ -1543,7 +1598,7 @@ func (m *PushRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1571,7 +1626,7 @@ func (m *PushRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1580,6 +1635,9 @@ func (m *PushRequest) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1597,6 +1655,9 @@ func (m *PushRequest) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthLogproto } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -1624,7 +1685,7 @@ func (m *PushResponse) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1647,6 +1708,9 @@ func (m *PushResponse) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthLogproto } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -1674,7 +1738,7 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1702,7 +1766,7 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1712,6 +1776,9 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1731,7 +1798,7 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Limit |= (uint32(b) & 0x7F) << shift + m.Limit |= uint32(b&0x7F) << shift if b < 0x80 { break } @@ -1750,7 +1817,7 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1759,6 +1826,9 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1780,7 +1850,7 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1789,6 +1859,9 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1810,7 +1883,7 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Direction |= (Direction(b) & 0x7F) << shift + m.Direction |= Direction(b&0x7F) << shift if b < 0x80 { break } @@ -1829,7 +1902,7 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1839,6 +1912,9 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1853,6 +1929,9 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthLogproto } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -1880,7 +1959,7 @@ func (m *QueryResponse) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1908,7 +1987,7 @@ func (m *QueryResponse) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1917,6 +1996,9 @@ func (m *QueryResponse) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1934,6 +2016,9 @@ func (m *QueryResponse) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthLogproto } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -1961,7 +2046,7 @@ func (m *LabelRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1989,7 +2074,7 @@ func (m *LabelRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1999,6 +2084,9 @@ func (m *LabelRequest) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -2018,7 +2106,7 @@ func (m *LabelRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - v |= (int(b) & 0x7F) << shift + v |= int(b&0x7F) << shift if b < 0x80 { break } @@ -2033,6 +2121,9 @@ func (m *LabelRequest) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthLogproto } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -2060,7 +2151,7 @@ func (m *LabelResponse) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -2088,7 +2179,7 @@ func (m *LabelResponse) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -2098,6 +2189,9 @@ func (m *LabelResponse) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -2112,6 +2206,9 @@ func (m *LabelResponse) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthLogproto } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -2139,7 +2236,7 @@ func (m *Stream) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -2167,7 +2264,7 @@ func (m *Stream) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -2177,6 +2274,9 @@ func (m *Stream) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -2196,7 +2296,7 @@ func (m *Stream) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -2205,6 +2305,9 @@ func (m *Stream) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -2222,6 +2325,9 @@ func (m *Stream) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthLogproto } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -2249,7 +2355,7 @@ func (m *Entry) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -2277,7 +2383,7 @@ func (m *Entry) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -2286,6 +2392,9 @@ func (m *Entry) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -2307,7 +2416,7 @@ func (m *Entry) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -2317,6 +2426,9 @@ func (m *Entry) Unmarshal(dAtA []byte) error { return ErrInvalidLengthLogproto } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -2331,6 +2443,9 @@ func (m *Entry) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthLogproto } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -2397,10 +2512,13 @@ func skipLogproto(dAtA []byte) (n int, err error) { break } } - iNdEx += length if length < 0 { return 0, ErrInvalidLengthLogproto } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthLogproto + } return iNdEx, nil case 3: for { @@ -2429,6 +2547,9 @@ func skipLogproto(dAtA []byte) (n int, err error) { return 0, err } iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthLogproto + } } return iNdEx, nil case 4: @@ -2447,47 +2568,3 @@ var ( ErrInvalidLengthLogproto = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowLogproto = fmt.Errorf("proto: integer overflow") ) - -func init() { proto.RegisterFile("logproto.proto", fileDescriptor_logproto_ab9c6ba375ad5e80) } - -var fileDescriptor_logproto_ab9c6ba375ad5e80 = []byte{ - // 601 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x4f, 0x6f, 0xd3, 0x4e, - 0x10, 0xf5, 0xb6, 0x89, 0x13, 0x4f, 0xd2, 0xb4, 0xda, 0xdf, 0x8f, 0x62, 0x45, 0x68, 0x1d, 0xf9, - 0x00, 0x51, 0x25, 0x5c, 0x08, 0x88, 0x4a, 0x85, 0x4b, 0x4d, 0xa9, 0x90, 0x40, 0x02, 0x16, 0x24, - 0xce, 0x4e, 0xbb, 0xb8, 0x96, 0xfc, 0xa7, 0xb5, 0xd7, 0x88, 0xde, 0x90, 0xf8, 0x02, 0xfd, 0x18, - 0x7c, 0x94, 0x1e, 0x73, 0xec, 0x29, 0x10, 0xe7, 0x82, 0x72, 0xea, 0x8d, 0x2b, 0xda, 0xb5, 0x1d, - 0x1b, 0x90, 0x40, 0x5c, 0x9c, 0x79, 0xbb, 0xef, 0xcd, 0xec, 0x9b, 0x99, 0x40, 0xcf, 0x8f, 0xdc, - 0x93, 0x38, 0xe2, 0x91, 0x25, 0xbf, 0xb8, 0x5d, 0xe2, 0xbe, 0xe1, 0x46, 0x91, 0xeb, 0xb3, 0x6d, - 0x89, 0xc6, 0xe9, 0xbb, 0x6d, 0xee, 0x05, 0x2c, 0xe1, 0x4e, 0x70, 0x92, 0x53, 0xfb, 0xb7, 0x5d, - 0x8f, 0x1f, 0xa7, 0x63, 0xeb, 0x30, 0x0a, 0xb6, 0xdd, 0xc8, 0x8d, 0x2a, 0xa6, 0x40, 0x12, 0xc8, - 0x28, 0xa7, 0x9b, 0x07, 0xd0, 0x79, 0x99, 0x26, 0xc7, 0x94, 0x9d, 0xa6, 0x2c, 0xe1, 0x78, 0x07, - 0x5a, 0x09, 0x8f, 0x99, 0x13, 0x24, 0x3a, 0x1a, 0xac, 0x0e, 0x3b, 0xa3, 0x0d, 0x6b, 0xf9, 0x94, - 0xd7, 0xf2, 0xc2, 0xee, 0x2c, 0xa6, 0x46, 0x49, 0xa2, 0x65, 0x60, 0xf6, 0xa0, 0x9b, 0xe7, 0x49, - 0x4e, 0xa2, 0x30, 0x61, 0xe6, 0x77, 0x04, 0xdd, 0x57, 0x29, 0x8b, 0xcf, 0xca, 0xcc, 0xff, 0x43, - 0xf3, 0x54, 0x60, 0x1d, 0x0d, 0xd0, 0x50, 0xa3, 0x39, 0x10, 0xa7, 0xbe, 0x17, 0x78, 0x5c, 0x5f, - 0x19, 0xa0, 0xe1, 0x1a, 0xcd, 0x01, 0xde, 0x85, 0x66, 0xc2, 0x9d, 0x98, 0xeb, 0xab, 0x03, 0x34, - 0xec, 0x8c, 0xfa, 0x56, 0x6e, 0xda, 0x2a, 0xad, 0x58, 0x6f, 0x4a, 0xd3, 0x76, 0xfb, 0x62, 0x6a, - 0x28, 0xe7, 0x5f, 0x0c, 0x44, 0x73, 0x09, 0x7e, 0x00, 0xab, 0x2c, 0x3c, 0xd2, 0x1b, 0xff, 0xa0, - 0x14, 0x02, 0x7c, 0x17, 0xb4, 0x23, 0x2f, 0x66, 0x87, 0xdc, 0x8b, 0x42, 0xbd, 0x39, 0x40, 0xc3, - 0xde, 0xe8, 0xbf, 0xca, 0xfb, 0x7e, 0x79, 0x45, 0x2b, 0x96, 0x78, 0x7c, 0xcc, 0x5c, 0xf6, 0x41, - 0x57, 0x73, 0x4b, 0x12, 0x98, 0x0f, 0x61, 0xad, 0x30, 0x9e, 0xb7, 0x02, 0x6f, 0xfd, 0xb5, 0xa7, - 0x55, 0x1b, 0x77, 0xa1, 0xfb, 0xdc, 0x19, 0x33, 0xbf, 0xec, 0x1a, 0x86, 0x46, 0xe8, 0x04, 0xac, - 0x68, 0x9a, 0x8c, 0xf1, 0x26, 0xa8, 0xef, 0x1d, 0x3f, 0x65, 0x89, 0x6c, 0x5a, 0x9b, 0x16, 0xc8, - 0xbc, 0x05, 0x6b, 0x85, 0xb6, 0x28, 0x5c, 0x11, 0x45, 0x5d, 0x6d, 0x49, 0x3c, 0x06, 0x35, 0xaf, - 0x8b, 0x4d, 0x50, 0x7d, 0x21, 0x49, 0xf2, 0x02, 0x36, 0x2c, 0xa6, 0x46, 0x71, 0x42, 0x8b, 0x5f, - 0xbc, 0x0b, 0x2d, 0x16, 0xf2, 0xd8, 0x93, 0xf5, 0xc4, 0xf3, 0xd7, 0xab, 0xe7, 0x3f, 0x09, 0x79, - 0x7c, 0x66, 0xaf, 0x8b, 0x4e, 0x8a, 0xad, 0x28, 0x78, 0xb4, 0x0c, 0xcc, 0x08, 0x9a, 0x92, 0x82, - 0x9f, 0x82, 0xb6, 0x5c, 0x54, 0x59, 0xeb, 0xcf, 0xb3, 0xe9, 0x15, 0x19, 0x57, 0x78, 0x22, 0x27, - 0x54, 0x89, 0xf1, 0x0d, 0x68, 0xf8, 0x5e, 0xc8, 0xa4, 0x77, 0xcd, 0x6e, 0x2f, 0xa6, 0x86, 0xc4, - 0x54, 0x7e, 0xb7, 0x6e, 0x82, 0xb6, 0x1c, 0x15, 0xee, 0x40, 0xeb, 0xe0, 0x05, 0x7d, 0xbb, 0x47, - 0xf7, 0x37, 0x14, 0xdc, 0x85, 0xb6, 0xbd, 0xf7, 0xf8, 0x99, 0x44, 0x68, 0xb4, 0x07, 0xaa, 0x58, - 0x57, 0x16, 0xe3, 0x1d, 0x68, 0x88, 0x08, 0x5f, 0xab, 0x5c, 0xd5, 0xfe, 0x10, 0xfd, 0xcd, 0x5f, - 0x8f, 0x8b, 0xfd, 0x56, 0x46, 0x9f, 0x10, 0xb4, 0xc4, 0xa0, 0x3d, 0x16, 0xe3, 0x47, 0xd0, 0x94, - 0x33, 0xc7, 0x35, 0x7a, 0x7d, 0xfb, 0xfb, 0xd7, 0x7f, 0x3b, 0x2f, 0xf3, 0xdc, 0x41, 0x62, 0xdd, - 0xe5, 0xe0, 0xea, 0xea, 0xfa, 0x16, 0xd4, 0xd5, 0x3f, 0x4d, 0xd8, 0x54, 0xec, 0xfb, 0x93, 0x19, - 0x51, 0x2e, 0x67, 0x44, 0xb9, 0x9a, 0x11, 0xf4, 0x31, 0x23, 0xe8, 0x73, 0x46, 0xd0, 0x45, 0x46, - 0xd0, 0x24, 0x23, 0xe8, 0x6b, 0x46, 0xd0, 0xb7, 0x8c, 0x28, 0x57, 0x19, 0x41, 0xe7, 0x73, 0xa2, - 0x4c, 0xe6, 0x44, 0xb9, 0x9c, 0x13, 0x65, 0xac, 0xca, 0x64, 0xf7, 0x7e, 0x04, 0x00, 0x00, 0xff, - 0xff, 0x47, 0x69, 0x1e, 0x88, 0x68, 0x04, 0x00, 0x00, -} diff --git a/production/ksonnet/loki/config.libsonnet b/production/ksonnet/loki/config.libsonnet index 80eb6328009d1..a297805e02b9d 100644 --- a/production/ksonnet/loki/config.libsonnet +++ b/production/ksonnet/loki/config.libsonnet @@ -66,6 +66,44 @@ gcs: { bucket_name: $._config.gcs_bucket_name, }, + + index_queries_cache_config: { + memcached: { + batch_size: 100, + parallelism: 100, + }, + + memcached_client: { + host: 'memcached-index-queries.%s.svc.cluster.local' % $._config.namespace, + service: 'memcached-client', + }, + }, + }, + + chunk_store_config: { + chunk_cache_config: { + memcached: { + batch_size: 100, + parallelism: 100, + }, + + memcached_client: { + host: 'memcached.%s.svc.cluster.local' % $._config.namespace, + service: 'memcached-client', + }, + }, + + write_dedupe_cache_config: { + memcached: { + batch_size: 100, + parallelism: 100, + }, + + memcached_client: { + host: 'memcached-index-writes.%s.svc.cluster.local' % $._config.namespace, + service: 'memcached-client', + }, + }, }, schema_config: { diff --git a/production/ksonnet/loki/jsonnetfile.json b/production/ksonnet/loki/jsonnetfile.json index 49ec3785ab436..7f40dd6152e7d 100644 --- a/production/ksonnet/loki/jsonnetfile.json +++ b/production/ksonnet/loki/jsonnetfile.json @@ -19,6 +19,16 @@ } }, "version": "master" + }, + { + "name": "memcached", + "source": { + "git": { + "remote": "https://github.com/grafana/jsonnet-libs", + "subdir": "memcached" + } + }, + "version": "master" } ] } diff --git a/production/ksonnet/loki/loki.libsonnet b/production/ksonnet/loki/loki.libsonnet index 59ee5bf2b20b9..beebd9f780032 100644 --- a/production/ksonnet/loki/loki.libsonnet +++ b/production/ksonnet/loki/loki.libsonnet @@ -4,8 +4,11 @@ (import 'config.libsonnet') + (import 'consul/consul.libsonnet') + -// Cortex services +// Loki services (import 'distributor.libsonnet') + (import 'ingester.libsonnet') + (import 'querier.libsonnet') + -(import 'table-manager.libsonnet') +(import 'table-manager.libsonnet') + + +// Supporting services +(import 'memcached.libsonnet') diff --git a/production/ksonnet/loki/memcached.libsonnet b/production/ksonnet/loki/memcached.libsonnet new file mode 100644 index 0000000000000..8634a398900d2 --- /dev/null +++ b/production/ksonnet/loki/memcached.libsonnet @@ -0,0 +1,21 @@ +local memcached = 'memcached/memcached.libsonnet'; + +memcached { + // Memcached instance used to cache chunks. + memcached_chunks: $.memcached { + name: 'memcached', + max_item_size: '2m', + memory_limit_mb: 4096, + }, + + // Dedicated memcached instance used to temporarily cache index lookups. + memcached_index_queries: $.memcached { + name: 'memcached-index-queries', + max_item_size: '5m', + }, + + // Dedicated memcached instance used to dedupe writes to the index. + memcached_index_writes: $.memcached { + name: 'memcached-index-writes', + }, +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go index a1dff2849329f..3356eaa9ccbcf 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/log/level" ot "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -25,6 +24,7 @@ import ( chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/spanlogger" awscommon "github.com/weaveworks/common/aws" "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/user" @@ -79,12 +79,6 @@ var ( // metric names. Buckets: prometheus.ExponentialBuckets(1, 4, 6), }) - dynamoQueryRetryCount = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "dynamo_query_retry_count", - Help: "Number of retries per DynamoDB operation.", - Buckets: prometheus.LinearBuckets(0, 1, 21), - }, []string{"operation"}) ) func init() { @@ -92,7 +86,6 @@ func init() { prometheus.MustRegister(dynamoConsumedCapacity) prometheus.MustRegister(dynamoFailures) prometheus.MustRegister(dynamoQueryPagesCount) - prometheus.MustRegister(dynamoQueryRetryCount) prometheus.MustRegister(dynamoDroppedRequests) } @@ -212,9 +205,6 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write unprocessed := dynamoDBWriteBatch{} backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) - defer func() { - dynamoQueryRetryCount.WithLabelValues("BatchWrite").Observe(float64(backoff.NumRetries())) - }() for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() { requests := dynamoDBWriteBatch{} @@ -354,9 +344,6 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery func (a dynamoDBStorageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest, hashValue string, pageCount int) (*dynamoDBReadResponse, error) { backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) - defer func() { - dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries())) - }() var err error for backoff.Ongoing() { @@ -464,9 +451,9 @@ type chunksPlusError struct { // GetChunks implements chunk.ObjectClient. func (a dynamoDBStorageClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) { - sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks.DynamoDB") - defer sp.Finish() - sp.LogFields(otlog.Int("chunks requested", len(chunks))) + log, ctx := spanlogger.New(ctx, "GetChunks.DynamoDB", ot.Tag{Key: "numChunks", Value: len(chunks)}) + defer log.Span.Finish() + level.Debug(log).Log("chunks requested", len(chunks)) dynamoDBChunks := chunks var err error @@ -499,13 +486,10 @@ func (a dynamoDBStorageClient) GetChunks(ctx context.Context, chunks []chunk.Chu } finalChunks = append(finalChunks, in.chunks...) } - sp.LogFields(otlog.Int("chunks fetched", len(finalChunks))) - if err != nil { - sp.LogFields(otlog.String("error", err.Error())) - } + level.Debug(log).Log("chunks fetched", len(finalChunks)) // Return any chunks we did receive: a partial result may be useful - return finalChunks, err + return finalChunks, log.Error(err) } // As we're re-using the DynamoDB schema from the index for the chunk tables, @@ -516,8 +500,8 @@ var placeholder = []byte{'c'} // Structure is identical to BatchWrite(), but operating on different datatypes // so cannot share implementation. If you fix a bug here fix it there too. func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) { - sp, ctx := ot.StartSpanFromContext(ctx, "getDynamoDBChunks", ot.Tag{Key: "numChunks", Value: len(chunks)}) - defer sp.Finish() + log, ctx := spanlogger.New(ctx, "getDynamoDBChunks", ot.Tag{Key: "numChunks", Value: len(chunks)}) + defer log.Span.Finish() outstanding := dynamoDBReadRequest{} chunksByKey := map[string]chunk.Chunk{} for _, chunk := range chunks { @@ -525,7 +509,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c chunksByKey[key] = chunk tableName, err := a.schemaCfg.ChunkTableFor(chunk.From) if err != nil { - return nil, err + return nil, log.Error(err) } outstanding.Add(tableName, key, placeholder) } @@ -533,9 +517,6 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c result := []chunk.Chunk{} unprocessed := dynamoDBReadRequest{} backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) - defer func() { - dynamoQueryRetryCount.WithLabelValues("getDynamoDBChunks").Observe(float64(backoff.NumRetries())) - }() for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() { requests := dynamoDBReadRequest{} @@ -570,9 +551,8 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c continue } else if ok && awsErr.Code() == validationException { // this read will never work, so the only option is to drop the offending request and continue. - level.Warn(util.Logger).Log("msg", "Error while fetching data from Dynamo", "err", awsErr) - level.Debug(util.Logger).Log("msg", "Dropped request details", "requests", requests) - util.Event().Log("msg", "ValidationException", "requests", requests) + level.Warn(log).Log("msg", "Error while fetching data from Dynamo", "err", awsErr) + level.Debug(log).Log("msg", "Dropped request details", "requests", requests) // recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context // to determine if a request was dropped (or not) for tableName := range requests { @@ -587,7 +567,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c processedChunks, err := processChunkResponse(response, chunksByKey) if err != nil { - return nil, err + return nil, log.Error(err) } result = append(result, processedChunks...) @@ -601,7 +581,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 { // Return the chunks we did fetch, because partial results may be useful - return result, fmt.Errorf("failed to query chunks, %d values remaining: %s", valuesLeft, backoff.Err()) + return result, log.Error(fmt.Errorf("failed to query chunks, %d values remaining: %s", valuesLeft, backoff.Err())) } return result, nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/background.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/background.go index 6b552047c03aa..ea589630725a1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/background.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/background.go @@ -26,8 +26,8 @@ var ( // BackgroundConfig is config for a Background Cache. type BackgroundConfig struct { - WriteBackGoroutines int - WriteBackBuffer int + WriteBackGoroutines int `yaml:"writeback_goroutines,omitempty"` + WriteBackBuffer int `yaml:"writeback_buffer,omitempty"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/cache.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/cache.go index 62ef40958da93..8dbe001be03c9 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/cache.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/cache.go @@ -15,19 +15,19 @@ type Cache interface { // Config for building Caches. type Config struct { - EnableDiskcache bool - EnableFifoCache bool + EnableDiskcache bool `yaml:"enable_diskcache,omitempty"` + EnableFifoCache bool `yaml:"enable_fifocache,omitempty"` - DefaultValidity time.Duration + DefaultValidity time.Duration `yaml:"defaul_validity,omitempty"` - background BackgroundConfig - memcache MemcachedConfig - memcacheClient MemcachedClientConfig - diskcache DiskcacheConfig - fifocache FifoCacheConfig + Background BackgroundConfig `yaml:"background,omitempty"` + Memcache MemcachedConfig `yaml:"memcached,omitempty"` + MemcacheClient MemcachedClientConfig `yaml:"memcached_client,omitempty"` + Diskcache DiskcacheConfig `yaml:"diskcache,omitempty"` + Fifocache FifoCacheConfig `yaml:"fifocache,omitempty"` // This is to name the cache metrics properly. - prefix string + Prefix string `yaml:"prefix,omitempty"` // For tests to inject specific implementations. Cache Cache @@ -35,17 +35,17 @@ type Config struct { // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) { - cfg.background.RegisterFlagsWithPrefix(prefix, description, f) - cfg.memcache.RegisterFlagsWithPrefix(prefix, description, f) - cfg.memcacheClient.RegisterFlagsWithPrefix(prefix, description, f) - cfg.diskcache.RegisterFlagsWithPrefix(prefix, description, f) - cfg.fifocache.RegisterFlagsWithPrefix(prefix, description, f) + cfg.Background.RegisterFlagsWithPrefix(prefix, description, f) + cfg.Memcache.RegisterFlagsWithPrefix(prefix, description, f) + cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f) + cfg.Diskcache.RegisterFlagsWithPrefix(prefix, description, f) + cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f) f.BoolVar(&cfg.EnableDiskcache, prefix+"cache.enable-diskcache", false, description+"Enable on-disk cache.") f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", 0, description+"The default validity of entries for caches unless overridden.") - cfg.prefix = prefix + cfg.Prefix = prefix } // New creates a new Cache using Config. @@ -57,39 +57,39 @@ func New(cfg Config) (Cache, error) { caches := []Cache{} if cfg.EnableFifoCache { - if cfg.fifocache.Validity == 0 && cfg.DefaultValidity != 0 { - cfg.fifocache.Validity = cfg.DefaultValidity + if cfg.Fifocache.Validity == 0 && cfg.DefaultValidity != 0 { + cfg.Fifocache.Validity = cfg.DefaultValidity } - cache := NewFifoCache(cfg.prefix+"fifocache", cfg.fifocache) - caches = append(caches, Instrument(cfg.prefix+"fifocache", cache)) + cache := NewFifoCache(cfg.Prefix+"fifocache", cfg.Fifocache) + caches = append(caches, Instrument(cfg.Prefix+"fifocache", cache)) } if cfg.EnableDiskcache { - cache, err := NewDiskcache(cfg.diskcache) + cache, err := NewDiskcache(cfg.Diskcache) if err != nil { return nil, err } - cacheName := cfg.prefix + "diskcache" - caches = append(caches, NewBackground(cacheName, cfg.background, Instrument(cacheName, cache))) + cacheName := cfg.Prefix + "diskcache" + caches = append(caches, NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache))) } - if cfg.memcacheClient.Host != "" { - if cfg.memcache.Expiration == 0 && cfg.DefaultValidity != 0 { - cfg.memcache.Expiration = cfg.DefaultValidity + if cfg.MemcacheClient.Host != "" { + if cfg.Memcache.Expiration == 0 && cfg.DefaultValidity != 0 { + cfg.Memcache.Expiration = cfg.DefaultValidity } - client := NewMemcachedClient(cfg.memcacheClient) - cache := NewMemcached(cfg.memcache, client, cfg.prefix) + client := NewMemcachedClient(cfg.MemcacheClient) + cache := NewMemcached(cfg.Memcache, client, cfg.Prefix) - cacheName := cfg.prefix + "memcache" - caches = append(caches, NewBackground(cacheName, cfg.background, Instrument(cacheName, cache))) + cacheName := cfg.Prefix + "memcache" + caches = append(caches, NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache))) } cache := NewTiered(caches) if len(caches) > 1 { - cache = Instrument(cfg.prefix+"tiered", cache) + cache = Instrument(cfg.Prefix+"tiered", cache) } return cache, nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/diskcache.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/diskcache.go index 6463827f73d6e..28e3ac4f72d20 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/diskcache.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/diskcache.go @@ -54,8 +54,8 @@ const ( // DiskcacheConfig for the Disk cache. type DiskcacheConfig struct { - Path string - Size int + Path string `yaml:"path,omitempty"` + Size int `yaml:"size,omitempty"` } // RegisterFlags adds the flags required to config this to the given FlagSet diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/fifo_cache.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/fifo_cache.go index 59c7d35b71f10..bd7b2be44a545 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/fifo_cache.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/fifo_cache.go @@ -56,8 +56,8 @@ var ( // FifoCacheConfig holds config for the FifoCache. type FifoCacheConfig struct { - Size int - Validity time.Duration + Size int `yaml:"size,omitempty"` + Validity time.Duration `yaml:"validity,omitempty"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached.go index 37836c77df018..dce4652018784 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached.go @@ -40,10 +40,10 @@ func (o observableVecCollector) After(method, statusCode string, start time.Time // MemcachedConfig is config to make a Memcached type MemcachedConfig struct { - Expiration time.Duration + Expiration time.Duration `yaml:"expiration,omitempty"` - BatchSize int - Parallelism int + BatchSize int `yaml:"batch_size,omitempty"` + Parallelism int `yaml:"parallelism,omitempty"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached_client.go index d43fa93713f58..43e40cbbbaaa7 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cache/memcached_client.go @@ -33,11 +33,11 @@ type memcachedClient struct { // MemcachedClientConfig defines how a MemcachedClient should be constructed. type MemcachedClientConfig struct { - Host string - Service string - Timeout time.Duration - MaxIdleConns int - UpdateInterval time.Duration + Host string `yaml:"host,omitempty"` + Service string `yaml:"service,omitempty"` + Timeout time.Duration `yaml:"timeout,omitempty"` + MaxIdleConns int `yaml:"max_idle_conns,omitempty"` + UpdateInterval time.Duration `yaml:"update_interval,omitempty"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go index 1ce3b1b9ae3e0..009fd8eb3f9e2 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go @@ -269,7 +269,7 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error { return err } c.encoded = input - return c.Data.UnmarshalFromBuf(input) + return errors.Wrap(c.Data.UnmarshalFromBuf(input), "when unmarshalling legacy chunk") } // First, calculate the checksum of the chunk and confirm it matches @@ -282,14 +282,14 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error { r := bytes.NewReader(input) var metadataLen uint32 if err := binary.Read(r, binary.BigEndian, &metadataLen); err != nil { - return err + return errors.Wrap(err, "when reading metadata length from chunk") } var tempMetadata Chunk decodeContext.reader.Reset(r) json := jsoniter.ConfigFastest err := json.NewDecoder(decodeContext.reader).Decode(&tempMetadata) if err != nil { - return err + return errors.Wrap(err, "when decoding chunk metadata") } if len(input)-r.Len() != int(metadataLen) { return ErrMetadataLength @@ -315,12 +315,12 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error { // Finally, unmarshal the actual chunk data. c.Data, err = prom_chunk.NewForEncoding(c.Encoding) if err != nil { - return err + return errors.Wrap(err, "when creating new chunk") } var dataLen uint32 if err := binary.Read(r, binary.BigEndian, &dataLen); err != nil { - return err + return errors.Wrap(err, "when reading data length from chunk") } c.encoded = input diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go index b3a82eb960ca6..836c5d4872f99 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go @@ -2,6 +2,7 @@ package chunk import ( "context" + "errors" "flag" "fmt" "net/http" @@ -10,7 +11,6 @@ import ( "time" "github.com/go-kit/kit/log/level" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -56,11 +56,11 @@ func init() { // StoreConfig specifies config for a ChunkStore type StoreConfig struct { - ChunkCacheConfig cache.Config - WriteDedupeCacheConfig cache.Config + ChunkCacheConfig cache.Config `yaml:"chunk_cache_config,omitempty"` + WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config,omitempty"` - MinChunkAge time.Duration - CacheLookupsOlderThan time.Duration + MinChunkAge time.Duration `yaml:"min_chunk_age,omitempty"` + CacheLookupsOlderThan time.Duration `yaml:"cache_lookups_older_than,omitempty"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -72,8 +72,8 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.CacheLookupsOlderThan, "store.cache-lookups-older-than", 0, "Cache index entries older than this period. 0 to disable.") // Deprecated. - flagext.DeprecatedFlag(f, "store.cardinality-cache-size", "DEPRECATED. Use store.index-cache-size.enable-fifocache and store.cardinality-cache.fifocache.size instead.") - flagext.DeprecatedFlag(f, "store.cardinality-cache-validity", "DEPRECATED. Use store.index-cache-size.enable-fifocache and store.cardinality-cache.fifocache.duration instead.") + flagext.DeprecatedFlag(f, "store.cardinality-cache-size", "DEPRECATED. Use store.index-cache-read.enable-fifocache and store.index-cache-read.fifocache.size instead.") + flagext.DeprecatedFlag(f, "store.cardinality-cache-validity", "DEPRECATED. Use store.index-cache-read.enable-fifocache and store.index-cache-read.fifocache.duration instead.") } // store implements Store @@ -121,21 +121,16 @@ func (c *store) Put(ctx context.Context, chunks []Chunk) error { // PutOne implements ChunkStore func (c *store) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error { - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return err - } - chunks := []Chunk{chunk} - err = c.storage.PutChunks(ctx, chunks) + err := c.storage.PutChunks(ctx, chunks) if err != nil { return err } c.writeBackCache(ctx, chunks) - writeReqs, err := c.calculateIndexEntries(userID, from, through, chunk) + writeReqs, err := c.calculateIndexEntries(chunk.UserID, from, through, chunk) if err != nil { return err } @@ -193,22 +188,64 @@ func (c *store) GetChunkRefs(ctx context.Context, from, through model.Time, allM return nil, nil, errors.New("not implemented") } -func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time, matchers []*labels.Matcher) (string, []*labels.Matcher, bool, error) { - log, ctx := spanlogger.New(ctx, "store.validateQuery") +// LabelValuesForMetricName retrieves all label values for a single label name and metric name. +func (c *store) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName, labelName string) ([]string, error) { + log, ctx := spanlogger.New(ctx, "ChunkStore.LabelValues") + defer log.Span.Finish() + level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "labelName", labelName) + + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + shortcut, err := c.validateQueryTimeRange(ctx, from, &through) + if err != nil { + return nil, err + } else if shortcut { + return nil, nil + } + + queries, err := c.schema.GetReadQueriesForMetricLabel(from, through, userID, model.LabelValue(metricName), model.LabelName(labelName)) + if err != nil { + return nil, err + } + + entries, err := c.lookupEntriesByQueries(ctx, queries) + if err != nil { + return nil, err + } + + var result []string + for _, entry := range entries { + _, labelValue, _, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) + if err != nil { + return nil, err + } + result = append(result, string(labelValue)) + } + + sort.Strings(result) + result = uniqueStrings(result) + return result, nil +} + +func (c *store) validateQueryTimeRange(ctx context.Context, from model.Time, through *model.Time) (bool, error) { + log, ctx := spanlogger.New(ctx, "store.validateQueryTimeRange") defer log.Span.Finish() if *through < from { - return "", nil, false, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from) + return false, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from) } userID, err := user.ExtractOrgID(ctx) if err != nil { - return "", nil, false, err + return false, err } maxQueryLength := c.limits.MaxQueryLength(userID) if maxQueryLength > 0 && (*through).Sub(from) > maxQueryLength { - return "", nil, false, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, (*through).Sub(from), maxQueryLength) + return false, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, (*through).Sub(from), maxQueryLength) } now := model.Now() @@ -216,12 +253,12 @@ func (c *store) validateQuery(ctx context.Context, from model.Time, through *mod if from.After(now) { // time-span start is in future ... regard as legal level.Error(log).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now) - return "", nil, true, nil + return true, nil } if from.After(now.Add(-c.cfg.MinChunkAge)) { // no data relevant to this query will have arrived at the store yet - return "", nil, true, nil + return true, nil } if through.After(now.Add(5 * time.Minute)) { @@ -230,6 +267,21 @@ func (c *store) validateQuery(ctx context.Context, from model.Time, through *mod *through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes } + return false, nil +} + +func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time, matchers []*labels.Matcher) (string, []*labels.Matcher, bool, error) { + log, ctx := spanlogger.New(ctx, "store.validateQuery") + defer log.Span.Finish() + + shortcut, err := c.validateQueryTimeRange(ctx, from, through) + if err != nil { + return "", nil, false, err + } + if shortcut { + return "", nil, true, nil + } + // Check there is a metric name matcher of type equal, metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(matchers) if !ok || metricNameMatcher.Type != labels.MatchEqual { @@ -257,7 +309,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Tim level.Debug(log).Log("Chunks in index", len(chunks)) // Filter out chunks that are not in the selected time range. - filtered, keys := filterChunksByTime(from, through, chunks) + filtered := filterChunksByTime(from, through, chunks) level.Debug(log).Log("Chunks post filtering", len(chunks)) maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID) @@ -268,6 +320,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Tim } // Now fetch the actual chunk data from Memcache / S3 + keys := keysFromChunks(filtered) allChunks, err := c.FetchChunks(ctx, filtered, keys) if err != nil { return nil, promql.ErrStorage{Err: err} @@ -307,7 +360,7 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode } level.Debug(log).Log("chunkIDs", len(chunkIDs)) - return c.convertChunkIDsToChunks(ctx, chunkIDs) + return c.convertChunkIDsToChunks(ctx, userID, chunkIDs) } // Otherwise get chunks which include other matchers @@ -369,7 +422,7 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode level.Debug(log).Log("msg", "post intersection", "chunkIDs", len(chunkIDs)) // Convert IndexEntry's into chunks - return c.convertChunkIDsToChunks(ctx, chunkIDs) + return c.convertChunkIDsToChunks(ctx, userID, chunkIDs) } func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { @@ -416,12 +469,7 @@ func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, mat return result, nil } -func (c *store) convertChunkIDsToChunks(ctx context.Context, chunkIDs []string) ([]Chunk, error) { - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, err - } - +func (c *store) convertChunkIDsToChunks(ctx context.Context, userID string, chunkIDs []string) ([]Chunk, error) { chunkSet := make([]Chunk, 0, len(chunkIDs)) for _, chunkID := range chunkIDs { chunk, err := ParseExternalKey(userID, chunkID) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go index b61a2eaa60545..28f323a427749 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go @@ -16,17 +16,24 @@ import ( const chunkDecodeParallelism = 16 -func filterChunksByTime(from, through model.Time, chunks []Chunk) ([]Chunk, []string) { +func filterChunksByTime(from, through model.Time, chunks []Chunk) []Chunk { filtered := make([]Chunk, 0, len(chunks)) - keys := make([]string, 0, len(chunks)) for _, chunk := range chunks { if chunk.Through < from || through < chunk.From { continue } filtered = append(filtered, chunk) - keys = append(keys, chunk.ExternalKey()) } - return filtered, keys + return filtered +} + +func keysFromChunks(chunks []Chunk) []string { + keys := make([]string, 0, len(chunks)) + for _, chk := range chunks { + keys = append(keys, chk.ExternalKey()) + } + + return keys } func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go index bd3d21612f1b8..762aa424e3625 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go @@ -15,7 +15,10 @@ type Store interface { Put(ctx context.Context, chunks []Chunk) error PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) + // GetChunkRefs returns the un-loaded chunks and the fetchers to be used to load them. You can load each slice of chunks ([]Chunk), + // using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) + LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) Stop() } @@ -89,6 +92,20 @@ func (c compositeStore) Get(ctx context.Context, from, through model.Time, match return results, err } +// LabelValuesForMetricName retrieves all label values for a single label name and metric name. +func (c compositeStore) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) { + var result []string + err := c.forStores(from, through, func(from, through model.Time, store Store) error { + labelValues, err := store.LabelValuesForMetricName(ctx, from, through, metricName, labelName) + if err != nil { + return err + } + result = append(result, labelValues...) + return nil + }) + return result, err +} + func (c compositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) { chunkIDs := [][]Chunk{} fetchers := []*Fetcher{} diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go index 05417d4650556..0dfe256d8af21 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go @@ -46,9 +46,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Instance, "bigtable.instance", "", "Bigtable instance ID.") cfg.GRPCClientConfig.RegisterFlags("bigtable", f) - - // Deprecated. - f.Int("bigtable.max-recv-msg-size", 100<<20, "DEPRECATED. Bigtable grpc max receive message size.") } // storageClientColumnKey implements chunk.storageClient for GCP. diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go index a87cd10b3dffb..99b211cdf0aca 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go @@ -33,9 +33,9 @@ var ( Name: "gcs_request_duration_seconds", Help: "Time spent doing GCS requests.", - // Bigtable latency seems to range from a few ms to a few hundred ms and is - // important. So use 6 buckets from 1ms to 1s. - Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), + // GCS latency seems to range from a few ms to a few secs and is + // important. So use 6 buckets from 5ms to 5s. + Buckets: prometheus.ExponentialBuckets(0.005, 4, 6), }, []string{"operation", "status_code"}) ) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go index ec01b8415d00e..aa522284852c0 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go @@ -195,14 +195,6 @@ func (cfg PeriodConfig) createSchema() Schema { return s } -func (cfg *PeriodConfig) tableForBucket(bucketStart int64) string { - if cfg.IndexTables.Period == 0 { - return cfg.IndexTables.Prefix - } - // TODO remove reference to time package here - return cfg.IndexTables.Prefix + strconv.Itoa(int(bucketStart/int64(cfg.IndexTables.Period/time.Second))) -} - // Load the yaml file, or build the config from legacy command-line flags func (cfg *SchemaConfig) Load() error { if len(cfg.Configs) > 0 { @@ -268,7 +260,7 @@ func (cfg *PeriodConfig) hourlyBuckets(from, through model.Time, userID string) result = append(result, Bucket{ from: uint32(relativeFrom), through: uint32(relativeThrough), - tableName: cfg.tableForBucket(i * secondsInHour), + tableName: cfg.IndexTables.TableFor(model.TimeFromUnix(i * secondsInHour)), hashKey: fmt.Sprintf("%s:%d", userID, i), }) } @@ -303,7 +295,7 @@ func (cfg *PeriodConfig) dailyBuckets(from, through model.Time, userID string) [ result = append(result, Bucket{ from: uint32(relativeFrom), through: uint32(relativeThrough), - tableName: cfg.tableForBucket(i * secondsInDay), + tableName: cfg.IndexTables.TableFor(model.TimeFromUnix(i * secondsInDay)), hashKey: fmt.Sprintf("%s:d%d", userID, i), }) } @@ -368,8 +360,7 @@ func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg Pr } for i := firstTable; i <= lastTable; i++ { table := TableDesc{ - // Name construction needs to be consistent with chunk_store.bigBuckets - Name: cfg.Prefix + strconv.Itoa(int(i)), + Name: cfg.tableForPeriod(i), ProvisionedRead: pCfg.InactiveReadThroughput, ProvisionedWrite: pCfg.InactiveWriteThroughput, UseOnDemandIOMode: pCfg.InactiveThroughputOnDemandMode, @@ -443,9 +434,13 @@ func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error) { // TableFor calculates the table shard for a given point in time. func (cfg *PeriodicTableConfig) TableFor(t model.Time) string { - var ( - periodSecs = int64(cfg.Period / time.Second) - table = t.Unix() / periodSecs - ) - return cfg.Prefix + strconv.Itoa(int(table)) + if cfg.Period == 0 { // non-periodic + return cfg.Prefix + } + periodSecs := int64(cfg.Period / time.Second) + return cfg.tableForPeriod(t.Unix() / periodSecs) +} + +func (cfg *PeriodicTableConfig) tableForPeriod(i int64) string { + return cfg.Prefix + strconv.Itoa(int(i)) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go index 4a0b742e6af28..a7d618c3854bd 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go @@ -7,7 +7,6 @@ import ( "net/http" "github.com/go-kit/kit/log/level" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -22,11 +21,19 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -var ( - // ErrCardinalityExceeded is returned when the user reads a row that - // is too large. - ErrCardinalityExceeded = errors.New("cardinality limit exceeded") +// CardinalityExceededError is returned when the user reads a row that +// is too large. +type CardinalityExceededError struct { + MetricName, LabelName string + Size, Limit int32 +} +func (e CardinalityExceededError) Error() string { + return fmt.Sprintf("cardinality limit exceeded for %s{%s}; %d entries, more than limit of %d", + e.MetricName, e.LabelName, e.Size, e.Limit) +} + +var ( indexLookupsPerQuery = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: "cortex", Name: "chunk_store_index_lookups_per_query", @@ -104,7 +111,7 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc return nil, err } - chks, _, err := c.GetChunkRefs(ctx, from, through, allMatchers...) + chks, fetchers, err := c.GetChunkRefs(ctx, from, through, allMatchers...) if err != nil { return nil, err } @@ -115,11 +122,7 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc } chunks := chks[0] - // Filter out chunks that are not in the selected time range. - filtered, keys := filterChunksByTime(from, through, chunks) - level.Debug(log).Log("chunks-post-filtering", len(chunks)) - chunksPerQuery.Observe(float64(len(filtered))) - + fetcher := fetchers[0] // Protect ourselves against OOMing. maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID) if maxChunksPerQuery > 0 && len(chunks) > maxChunksPerQuery { @@ -129,7 +132,8 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc } // Now fetch the actual chunk data from Memcache / S3 - allChunks, err := c.FetchChunks(ctx, filtered, keys) + keys := keysFromChunks(chunks) + allChunks, err := fetcher.FetchChunks(ctx, chunks, keys) if err != nil { level.Error(log).Log("msg", "FetchChunks", "err", err) return nil, err @@ -144,6 +148,11 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time log, ctx := spanlogger.New(ctx, "SeriesStore.GetChunkRefs") defer log.Span.Finish() + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, nil, err + } + // Validate the query is within reasonable bounds. metricName, matchers, shortcut, err := c.validateQuery(ctx, from, &through, allMatchers) if err != nil { @@ -171,12 +180,16 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time } level.Debug(log).Log("chunk-ids", len(chunkIDs)) - chunks, err := c.convertChunkIDsToChunks(ctx, chunkIDs) + chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs) if err != nil { level.Error(log).Log("op", "convertChunkIDsToChunks", "err", err) return nil, nil, err } + chunks = filterChunksByTime(from, through, chunks) + level.Debug(log).Log("chunks-post-filtering", len(chunks)) + chunksPerQuery.Observe(float64(len(chunks))) + return [][]Chunk{chunks}, []*Fetcher{c.store.Fetcher}, nil } @@ -215,6 +228,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from var preIntersectionCount int var lastErr error var cardinalityExceededErrors int + var cardinalityExceededError CardinalityExceededError for i := 0; i < len(matchers); i++ { select { case incoming := <-incomingIDs: @@ -229,8 +243,9 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from // series and the other returns only 10 (a few), we don't lookup the first one at all. // We just manually filter through the 10 series again using "filterChunksByMatchers", // saving us from looking up and intersecting a lot of series. - if err == ErrCardinalityExceeded { + if e, ok := err.(CardinalityExceededError); ok { cardinalityExceededErrors++ + cardinalityExceededError = e } else { lastErr = err } @@ -239,7 +254,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from // But if every single matcher returns a lot of series, then it makes sense to abort the query. if cardinalityExceededErrors == len(matchers) { - return nil, ErrCardinalityExceeded + return nil, cardinalityExceededError } else if lastErr != nil { return nil, lastErr } @@ -260,11 +275,14 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, } var queries []IndexQuery + var labelName string if matcher == nil { queries, err = c.schema.GetReadQueriesForMetric(from, through, userID, model.LabelValue(metricName)) } else if matcher.Type != labels.MatchEqual { + labelName = matcher.Name queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name)) } else { + labelName = matcher.Name queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name), model.LabelValue(matcher.Value)) } if err != nil { @@ -273,7 +291,11 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, level.Debug(log).Log("queries", len(queries)) entries, err := c.lookupEntriesByQueries(ctx, queries) - if err != nil { + if e, ok := err.(CardinalityExceededError); ok { + e.MetricName = metricName + e.LabelName = labelName + return nil, e + } else if err != nil { return nil, err } level.Debug(log).Log("entries", len(entries)) @@ -329,21 +351,16 @@ func (c *seriesStore) Put(ctx context.Context, chunks []Chunk) error { // PutOne implements ChunkStore func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error { - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return err - } - chunks := []Chunk{chunk} - err = c.storage.PutChunks(ctx, chunks) + err := c.storage.PutChunks(ctx, chunks) if err != nil { return err } c.writeBackCache(ctx, chunks) - writeReqs, keysToCache, err := c.calculateIndexEntries(userID, from, through, chunk) + writeReqs, keysToCache, err := c.calculateIndexEntries(from, through, chunk) if err != nil { return err } @@ -358,7 +375,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun } // calculateIndexEntries creates a set of batched WriteRequests for all the chunks it is given. -func (c *seriesStore) calculateIndexEntries(userID string, from, through model.Time, chunk Chunk) (WriteBatch, []string, error) { +func (c *seriesStore) calculateIndexEntries(from, through model.Time, chunk Chunk) (WriteBatch, []string, error) { seenIndexEntries := map[string]struct{}{} entries := []IndexEntry{} keysToCache := []string{} @@ -368,7 +385,7 @@ func (c *seriesStore) calculateIndexEntries(userID string, from, through model.T return nil, nil, err } - keys := c.schema.GetLabelEntryCacheKeys(from, through, userID, chunk.Metric) + keys := c.schema.GetLabelEntryCacheKeys(from, through, chunk.UserID, chunk.Metric) cacheKeys := make([]string, 0, len(keys)) // Keys which translate to the strings stored in the cache. for _, key := range keys { @@ -379,7 +396,7 @@ func (c *seriesStore) calculateIndexEntries(userID string, from, through model.T _, _, missing := c.writeDedupeCache.Fetch(context.Background(), cacheKeys) if len(missing) != 0 { - labelEntries, err := c.schema.GetLabelWriteEntries(from, through, userID, metricName, chunk.Metric, chunk.ExternalKey()) + labelEntries, err := c.schema.GetLabelWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey()) if err != nil { return nil, nil, err } @@ -388,7 +405,7 @@ func (c *seriesStore) calculateIndexEntries(userID string, from, through model.T keysToCache = missing } - chunkEntries, err := c.schema.GetChunkWriteEntries(from, through, userID, metricName, chunk.Metric, chunk.ExternalKey()) + chunkEntries, err := c.schema.GetChunkWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey()) if err != nil { return nil, nil, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_fixtures.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_fixtures.go index 1469e92d75ed5..47e90fe44e572 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_fixtures.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_fixtures.go @@ -40,5 +40,6 @@ var Fixtures = []testutils.Fixture{ func defaultLimits() (*validation.Overrides, error) { var defaults validation.Limits flagext.DefaultValues(&defaults) + defaults.CardinalityLimit = 5 return validation.NewOverrides(defaults) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go index c4df850b885b0..5513a5d6e83db 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go @@ -88,7 +88,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind batches, misses := s.cacheFetch(ctx, keys) for _, batch := range batches { if cardinalityLimit > 0 && batch.Cardinality > cardinalityLimit { - return chunk.ErrCardinalityExceeded + return chunk.CardinalityExceededError{ + Size: batch.Cardinality, + Limit: cardinalityLimit, + } } queries := queriesByKey[batch.Key] @@ -156,7 +159,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind if cardinalityLimit > 0 && cardinality > cardinalityLimit { batch.Cardinality = cardinality batch.Entries = nil - cardinalityErr = chunk.ErrCardinalityExceeded + cardinalityErr = chunk.CardinalityExceededError{ + Size: cardinality, + Limit: cardinalityLimit, + } } keys = append(keys, key) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go index a0fd1b41406ac..30d3cd1764810 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go @@ -28,11 +28,9 @@ type Config struct { BoltDBConfig local.BoltDBConfig `yaml:"boltdb"` FSConfig local.FSConfig `yaml:"filesystem"` - IndexCacheSize int IndexCacheValidity time.Duration - memcacheClient cache.MemcachedClientConfig - indexQueriesCacheConfig cache.Config + IndexQueriesCacheConfig cache.Config `yaml:"index_queries_cache_config,omitempty"` } // RegisterFlags adds the flags required to configure this flag set. @@ -44,43 +42,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.BoltDBConfig.RegisterFlags(f) cfg.FSConfig.RegisterFlags(f) - // Deprecated flags!! - f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Deprecated: Use -store.index-cache-read.*; Size of in-memory index cache, 0 to disable.") - cfg.memcacheClient.RegisterFlagsWithPrefix("index.", "Deprecated: Use -store.index-cache-read.*;", f) - - cfg.indexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading. ", f) + cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading. ", f) f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.") } // NewStore makes the storage clients based on the configuration. func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits *validation.Overrides) (chunk.Store, error) { - var err error - - // Building up from deprecated flags. - var caches []cache.Cache - if cfg.IndexCacheSize > 0 { - fifocache := cache.Instrument("fifo-index", cache.NewFifoCache("index", cache.FifoCacheConfig{Size: cfg.IndexCacheSize})) - caches = append(caches, fifocache) - } - if cfg.memcacheClient.Host != "" { - client := cache.NewMemcachedClient(cfg.memcacheClient) - memcache := cache.Instrument("memcache-index", cache.NewMemcached(cache.MemcachedConfig{ - Expiration: cfg.IndexCacheValidity, - }, client, "memcache-index")) - caches = append(caches, cache.NewBackground("memcache-index", cache.BackgroundConfig{ - WriteBackGoroutines: 10, - WriteBackBuffer: 100, - }, memcache)) - } - - var tieredCache cache.Cache - if len(caches) > 0 { - tieredCache = cache.NewTiered(caches) - } else { - tieredCache, err = cache.New(cfg.indexQueriesCacheConfig) - if err != nil { - return nil, err - } + tieredCache, err := cache.New(cfg.IndexQueriesCacheConfig) + if err != nil { + return nil, err } // Cache is shared by multiple stores, which means they will try and Stop diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go index 4c99a77fc9abc..b32034001ffee 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go @@ -110,48 +110,6 @@ func (cfg *ProvisionConfig) RegisterFlags(argPrefix string, f *flag.FlagSet) { f.Int64Var(&cfg.InactiveReadScaleLastN, argPrefix+".inactive-read-throughput.scale-last-n", 4, "Number of last inactive tables to enable read autoscale.") } -// Tags is a string-string map that implements flag.Value. -type Tags map[string]string - -// String implements flag.Value -func (ts Tags) String() string { - if ts == nil { - return "" - } - - return fmt.Sprintf("%v", map[string]string(ts)) -} - -// Set implements flag.Value -func (ts *Tags) Set(s string) error { - if *ts == nil { - *ts = map[string]string{} - } - - parts := strings.SplitN(s, "=", 2) - if len(parts) != 2 { - return fmt.Errorf("tag must of the format key=value") - } - (*ts)[parts[0]] = parts[1] - return nil -} - -// Equals returns true is other matches ts. -func (ts Tags) Equals(other Tags) bool { - if len(ts) != len(other) { - return false - } - - for k, v1 := range ts { - v2, ok := other[k] - if !ok || v1 != v2 { - return false - } - } - - return true -} - // TableManager creates and manages the provisioned throughput on DynamoDB tables type TableManager struct { client TableClient @@ -333,8 +291,12 @@ func (m *TableManager) partitionTables(ctx context.Context, descriptions []Table // Ensure we only delete tables which have a prefix managed by Cortex. tablePrefixes := map[string]struct{}{} for _, cfg := range m.schemaCfg.Configs { - tablePrefixes[cfg.IndexTables.Prefix] = struct{}{} - tablePrefixes[cfg.ChunkTables.Prefix] = struct{}{} + if cfg.IndexTables.Prefix != "" { + tablePrefixes[cfg.IndexTables.Prefix] = struct{}{} + } + if cfg.ChunkTables.Prefix != "" { + tablePrefixes[cfg.ChunkTables.Prefix] = struct{}{} + } } for existingTable := range existingTables { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/tags.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/tags.go new file mode 100644 index 0000000000000..b51849ceb2782 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/tags.go @@ -0,0 +1,58 @@ +package chunk + +import ( + "fmt" + "strings" +) + +// Tags is a string-string map that implements flag.Value. +type Tags map[string]string + +// String implements flag.Value +func (ts Tags) String() string { + if ts == nil { + return "" + } + + return fmt.Sprintf("%v", map[string]string(ts)) +} + +// Set implements flag.Value +func (ts *Tags) Set(s string) error { + if *ts == nil { + *ts = map[string]string{} + } + + parts := strings.SplitN(s, "=", 2) + if len(parts) != 2 { + return fmt.Errorf("tag must of the format key=value") + } + (*ts)[parts[0]] = parts[1] + return nil +} + +// UnmarshalYAML implements yaml.Unmarshaler. +func (ts *Tags) UnmarshalYAML(unmarshal func(interface{}) error) error { + var m map[string]string + if err := unmarshal(&m); err != nil { + return err + } + *ts = Tags(m) + return nil +} + +// Equals returns true is other matches ts. +func (ts Tags) Equals(other Tags) bool { + if len(ts) != len(other) { + return false + } + + for k, v1 := range ts { + v2, ok := other[k] + if !ok || v1 != v2 { + return false + } + } + + return true +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go index 54679185f1c54..dc30f7ff540e6 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go @@ -75,9 +75,4 @@ type Config struct { // RegisterFlags registers configuration settings used by the ingester client config. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.GRPCClientConfig.RegisterFlags("ingester.client", f) - - // Deprecated. - f.Int("ingester.client.max-recv-message-size", 64*1024*1024, "DEPRECATED. Maximum message size, in bytes, this client will receive.") - f.Bool("ingester.client.compress-to-ingester", false, "DEPRECATED. Compress data in calls to ingesters.") - f.Bool("distributor.compress-to-ingester", false, "DEPRECATED. Compress data in calls to ingesters. (DEPRECATED: use ingester.client.compress-to-ingester instead") } diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/flagext/url.go b/vendor/github.com/cortexproject/cortex/pkg/util/flagext/url.go index 9d470b5051dcb..5d798612ce4c1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/flagext/url.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/flagext/url.go @@ -31,11 +31,5 @@ func (v *URLValue) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := unmarshal(&s); err != nil { return err } - - u, err := url.Parse(s) - if err != nil { - return err - } - v.URL = u - return nil + return v.Set(s) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/spanlogger/spanlogger.go b/vendor/github.com/cortexproject/cortex/pkg/util/spanlogger/spanlogger.go index 12326db9b2d83..61d825f8a1bf8 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/spanlogger/spanlogger.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/spanlogger/spanlogger.go @@ -6,6 +6,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" otlog "github.com/opentracing/opentracing-go/log" "github.com/cortexproject/cortex/pkg/util" @@ -41,3 +42,13 @@ func (s *SpanLogger) Log(kvps ...interface{}) error { s.Span.LogFields(fields...) return nil } + +// Error sets error flag and logs the error, if non-nil. Returns the err passed in. +func (s *SpanLogger) Error(err error) error { + if err == nil { + return nil + } + ext.Error.Set(s.Span, true) + s.Span.LogFields(otlog.Error(err)) + return err +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go b/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go index a2553f7de813a..7969aed12fe53 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go @@ -18,6 +18,10 @@ var overridesReloadSuccess = promauto.NewGauge(prometheus.GaugeOpts{ Help: "Whether the last overrides reload attempt was successful.", }) +func init() { + overridesReloadSuccess.Set(1) // Default to 1 +} + // When we load YAML from disk, we want the various per-customer limits // to default to any values specified on the command line, not default // command line values. This global contains those values. I (Tom) cannot