Skip to content

Commit

Permalink
feat(prometheus): update prometheus remote protocol
Browse files Browse the repository at this point in the history
Fetched up-to-date protocol from prometheus project
  • Loading branch information
foobar committed Jul 7, 2020
1 parent 3f3b7b5 commit fc6edb1
Show file tree
Hide file tree
Showing 9 changed files with 731 additions and 2,028 deletions.
20 changes: 5 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/influxdata/influxdb
go 1.13

require (
cloud.google.com/go/bigtable v1.2.0 // indirect
collectd.org v0.3.0
github.com/BurntSushi/toml v0.3.1
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db
Expand All @@ -13,8 +12,6 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd // indirect
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 // indirect
github.com/gogo/protobuf v1.3.1
github.com/golang/snappy v0.0.1
github.com/google/go-cmp v0.5.0
Expand All @@ -24,31 +21,24 @@ require (
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jsternberg/zap-logfmt v1.2.0
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef
github.com/klauspost/compress v1.4.0 // indirect
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5 // indirect
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 // indirect
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada
github.com/mattn/go-isatty v0.0.12
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae // indirect
github.com/opentracing/opentracing-go v1.1.0
github.com/paulbellamy/ratecounter v0.2.0
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f
github.com/philhofer/fwd v1.0.0 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/prometheus v0.0.0-20200609090129-a6600f564e3c
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52
github.com/segmentio/kafka-go v0.2.0 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/spf13/cast v1.3.0
github.com/tinylib/msgp v1.1.0
github.com/willf/bitset v1.1.9 // indirect
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6
go.uber.org/zap v1.14.1
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82
golang.org/x/crypto v0.0.0-20200422194213-44a606286825
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
gonum.org/v1/gonum v0.6.0 // indirect
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1
google.golang.org/grpc v1.29.1
)
669 changes: 607 additions & 62 deletions go.sum

Large diffs are not rendered by default.

34 changes: 17 additions & 17 deletions prometheus/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

"github.com/gogo/protobuf/types"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/prometheus/remote"
"github.com/influxdata/influxdb/services/storage"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/prometheus/prometheus/prompb"
)

const (
Expand Down Expand Up @@ -45,7 +45,7 @@ func (e DroppedValuesError) Error() string {

// WriteRequestToPoints converts a Prometheus remote write request of time series and their
// samples into Points that can be written into Influx
func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
func WriteRequestToPoints(req *prompb.WriteRequest) ([]models.Point, error) {
var maxPoints int
for _, ts := range req.Timeseries {
maxPoints += len(ts.Samples)
Expand Down Expand Up @@ -79,7 +79,7 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
}

// convert and append
t := time.Unix(0, s.TimestampMs*int64(time.Millisecond))
t := time.Unix(0, s.Timestamp*int64(time.Millisecond))
fields := map[string]interface{}{fieldName: s.Value}
p, err := models.NewPoint(measurement, models.NewTags(tags), fields, t)
if err != nil {
Expand All @@ -97,7 +97,7 @@ func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {

// ReadRequestToInfluxStorageRequest converts a Prometheus remote read request into one using the
// new storage API that IFQL uses.
func ReadRequestToInfluxStorageRequest(req *remote.ReadRequest, db, rp string) (*datatypes.ReadFilterRequest, error) {
func ReadRequestToInfluxStorageRequest(req *prompb.ReadRequest, db, rp string) (*datatypes.ReadFilterRequest, error) {
if len(req.Queries) != 1 {
return nil, errors.New("Prometheus read endpoint currently only supports one query at a time")
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func RemoveInfluxSystemTags(tags models.Tags) models.Tags {
// predicateFromMatchers takes Prometheus label matchers and converts them to a storage
// predicate that works with the schema that is written in, which assumes a single field
// named value
func predicateFromMatchers(matchers []*remote.LabelMatcher) (*datatypes.Predicate, error) {
func predicateFromMatchers(matchers []*prompb.LabelMatcher) (*datatypes.Predicate, error) {
left, err := nodeFromMatchers(matchers)
if err != nil {
return nil, err
Expand Down Expand Up @@ -182,7 +182,7 @@ func fieldNode() *datatypes.Node {
}
}

func nodeFromMatchers(matchers []*remote.LabelMatcher) (*datatypes.Node, error) {
func nodeFromMatchers(matchers []*prompb.LabelMatcher) (*datatypes.Node, error) {
if len(matchers) == 0 {
return nil, errors.New("expected matcher")
} else if len(matchers) == 1 {
Expand All @@ -207,16 +207,16 @@ func nodeFromMatchers(matchers []*remote.LabelMatcher) (*datatypes.Node, error)
}, nil
}

func nodeFromMatcher(m *remote.LabelMatcher) (*datatypes.Node, error) {
func nodeFromMatcher(m *prompb.LabelMatcher) (*datatypes.Node, error) {
var op datatypes.Node_Comparison
switch m.Type {
case remote.MatchType_EQUAL:
case prompb.LabelMatcher_EQ:
op = datatypes.ComparisonEqual
case remote.MatchType_NOT_EQUAL:
case prompb.LabelMatcher_NEQ:
op = datatypes.ComparisonNotEqual
case remote.MatchType_REGEX_MATCH:
case prompb.LabelMatcher_RE:
op = datatypes.ComparisonRegex
case remote.MatchType_REGEX_NO_MATCH:
case prompb.LabelMatcher_NRE:
op = datatypes.ComparisonNotRegex
default:
return nil, fmt.Errorf("unknown match type %v", m.Type)
Expand Down Expand Up @@ -261,13 +261,13 @@ func nodeFromMatcher(m *remote.LabelMatcher) (*datatypes.Node, error) {
}

// ModelTagsToLabelPairs converts models.Tags to a slice of Prometheus label pairs
func ModelTagsToLabelPairs(tags models.Tags) []*remote.LabelPair {
pairs := make([]*remote.LabelPair, 0, len(tags))
func ModelTagsToLabelPairs(tags models.Tags) []prompb.Label {
pairs := make([]prompb.Label, 0, len(tags))
for _, t := range tags {
if string(t.Value) == "" {
continue
}
pairs = append(pairs, &remote.LabelPair{
pairs = append(pairs, prompb.Label{
Name: string(t.Key),
Value: string(t.Value),
})
Expand All @@ -276,8 +276,8 @@ func ModelTagsToLabelPairs(tags models.Tags) []*remote.LabelPair {
}

// TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs
func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair {
pairs := make([]*remote.LabelPair, 0, len(tags))
func TagsToLabelPairs(tags map[string]string) []*prompb.Label {
pairs := make([]*prompb.Label, 0, len(tags))
for k, v := range tags {
if v == "" {
// If we select metrics with different sets of labels names,
Expand All @@ -288,7 +288,7 @@ func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair {
// to make the result correct.
continue
}
pairs = append(pairs, &remote.LabelPair{
pairs = append(pairs, &prompb.Label{
Name: k,
Value: v,
})
Expand Down
3 changes: 0 additions & 3 deletions prometheus/remote/generate.go

This file was deleted.

Loading

0 comments on commit fc6edb1

Please sign in to comment.