From 4e94071ad7b07ee5c4cad93218f3a9b54466a8f0 Mon Sep 17 00:00:00 2001 From: Steven Barth Date: Fri, 6 Sep 2019 16:07:54 +0200 Subject: [PATCH] Rework NX-OS support, add support for embedded tags --- plugins/inputs/cisco_telemetry_mdt/README.md | 6 +- .../cisco_telemetry_mdt.go | 294 +++++++++++------- .../cisco_telemetry_mdt_test.go | 244 ++++++++++++++- 3 files changed, 405 insertions(+), 139 deletions(-) diff --git a/plugins/inputs/cisco_telemetry_mdt/README.md b/plugins/inputs/cisco_telemetry_mdt/README.md index 7fb049ec9f8de..498cfd9c819cd 100644 --- a/plugins/inputs/cisco_telemetry_mdt/README.md +++ b/plugins/inputs/cisco_telemetry_mdt/README.md @@ -25,9 +25,6 @@ The TCP dialout transport is supported on IOS XR (32-bit and 64-bit) 6.1.x and l # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" - ## Enable support for decoding NX-OS platform-specific telemetry extensions (disable for IOS XR and IOS XE) - # decode_nxos = true - ## Enable TLS client authentication and define allowed CA certificates; grpc ## transport only. # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] @@ -35,6 +32,9 @@ The TCP dialout transport is supported on IOS XR (32-bit and 64-bit) 6.1.x and l ## Define aliases to map telemetry encoding paths to simple measurement names [inputs.cisco_telemetry_mdt.aliases] ifstats = "ietf-interfaces:interfaces-state/interface/statistics" + + ## Define (for certain nested telemetry measurements with embedded tags) which fields are tags + # enbedded_tags = ["Cisco-IOS-XR-qos-ma-oper:qos/interface-table/interface/input/service-policy-names/service-policy-instance/statistics/class-stats/class-name"] ``` ### Example Output: diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go index 908df91c0c227..c794e1359760a 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt.go @@ -7,10 +7,13 @@ import ( "io" "log" "net" + "strconv" "strings" "sync" "time" + "github.com/influxdata/telegraf/metric" + dialout "github.com/cisco-ie/nx-telemetry-proto/mdt_dialout" telemetry "github.com/cisco-ie/nx-telemetry-proto/telemetry_bis" "github.com/golang/protobuf/proto" @@ -35,9 +38,9 @@ type CiscoTelemetryMDT struct { // Common configuration Transport string ServiceAddress string `toml:"service_address"` - DecodeNXOS bool `toml:"decode_nxos"` MaxMsgSize int `toml:"max_msg_size"` Aliases map[string]string `toml:"aliases"` + EmbeddedTags []string `toml:"embedded_tags"` // GRPC TLS settings internaltls.ServerConfig @@ -47,9 +50,12 @@ type CiscoTelemetryMDT struct { listener net.Listener // Internal state - aliases map[string]string - acc telegraf.Accumulator - wg sync.WaitGroup + aliases map[string]string + warned map[string]struct{} + extraTags map[string]map[string]struct{} + mutex sync.Mutex + acc telegraf.Accumulator + wg sync.WaitGroup } // Start the Cisco MDT service @@ -62,11 +68,24 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error { } // Invert aliases list + c.warned = make(map[string]struct{}) c.aliases = make(map[string]string, len(c.Aliases)) for alias, path := range c.Aliases { c.aliases[path] = alias } + // Fill extra tags + c.extraTags = make(map[string]map[string]struct{}) + for _, tag := range c.EmbeddedTags { + i := strings.LastIndexByte(tag, '/') + if i >= 0 { + if _, hasKey := c.extraTags[tag[0:i]]; !hasKey { + c.extraTags[tag[0:i]] = make(map[string]struct{}) + } + c.extraTags[tag[0:i]][tag[i+1:]] = struct{}{} + } + } + switch c.Transport { case "tcp": // TCP dialout server accept routine @@ -242,23 +261,22 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS // Handle telemetry packet from any transport, decode and add as measurement func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) { - telemetry := &telemetry.Telemetry{} - err := proto.Unmarshal(data, telemetry) + msg := &telemetry.Telemetry{} + err := proto.Unmarshal(data, msg) if err != nil { c.acc.AddError(fmt.Errorf("Cisco MDT failed to decode: %v", err)) return } - for _, gpbkv := range telemetry.DataGpbkv { - var fields map[string]interface{} - + grouper := metric.NewSeriesGrouper() + for _, gpbkv := range msg.DataGpbkv { // Produce metadata tags var tags map[string]string // Top-level field may have measurement timestamp, if not use message timestamp measured := gpbkv.Timestamp if measured == 0 { - measured = telemetry.MsgTimestamp + measured = msg.MsgTimestamp } timestamp := time.Unix(int64(measured/1000), int64(measured%1000)*1000000) @@ -267,164 +285,202 @@ func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) { for _, field := range gpbkv.Fields { switch field.Name { case "keys": - tags = make(map[string]string, len(field.Fields)+2) - tags["source"] = telemetry.GetNodeIdStr() - tags["subscription"] = telemetry.GetSubscriptionIdStr() + tags = make(map[string]string, len(field.Fields)+3) + tags["source"] = msg.GetNodeIdStr() + tags["subscription"] = msg.GetSubscriptionIdStr() + tags["path"] = msg.GetEncodingPath() + for _, subfield := range field.Fields { - c.parseGPBKVField(subfield, "", telemetry.EncodingPath, timestamp, tags, nil) + c.parseKeyField(tags, subfield, "") } case "content": - fields = make(map[string]interface{}, len(field.Fields)) - for _, subfield := range field.Fields { - c.parseGPBKVField(subfield, "", telemetry.EncodingPath, timestamp, tags, fields) + if tags != nil { + for _, subfield := range field.Fields { + c.parseContentField(grouper, subfield, "", msg.EncodingPath, tags, timestamp) + } } default: log.Printf("I! [inputs.cisco_telemetry_mdt]: Unexpected top-level MDT field: %s", field.Name) } } - - // Find best alias for encoding path and emit measurement - if len(fields) > 0 && len(tags) > 0 && len(telemetry.EncodingPath) > 0 { - c.addFieldsWithAlias(telemetry.EncodingPath, fields, tags, timestamp) - } else if !c.DecodeNXOS { - c.acc.AddError(fmt.Errorf("empty encoding path or measurement")) - } } -} -// Add fields doing alias replacement -func (c *CiscoTelemetryMDT) addFieldsWithAlias(path string, fields map[string]interface{}, - tags map[string]string, timestamp time.Time) { - name := path - if alias, ok := c.aliases[name]; ok { - tags["path"] = name - name = alias - } else { - log.Printf("D! [inputs.cisco_telemetry_mdt]: No measurement alias for encoding path: %s", name) + for _, metric := range grouper.Metrics() { + c.acc.AddMetric(metric) } - c.acc.AddFields(name, fields, tags, timestamp) } -// Recursively parse GPBKV field structure into fields or tags -func (c *CiscoTelemetryMDT) parseGPBKVField(field *telemetry.TelemetryField, prefix string, - path string, timestamp time.Time, tags map[string]string, fields map[string]interface{}) { - localname := strings.Replace(field.Name, "-", "_", -1) - name := localname - if len(name) == 0 { - name = prefix - } else if len(prefix) > 0 { - name = prefix + "/" + localname +func decodeValue(field *telemetry.TelemetryField) interface{} { + switch val := field.ValueByType.(type) { + case *telemetry.TelemetryField_BytesValue: + return val.BytesValue + case *telemetry.TelemetryField_StringValue: + if len(val.StringValue) > 0 { + return val.StringValue + } + case *telemetry.TelemetryField_BoolValue: + return val.BoolValue + case *telemetry.TelemetryField_Uint32Value: + return val.Uint32Value + case *telemetry.TelemetryField_Uint64Value: + return val.Uint64Value + case *telemetry.TelemetryField_Sint32Value: + return val.Sint32Value + case *telemetry.TelemetryField_Sint64Value: + return val.Sint64Value + case *telemetry.TelemetryField_DoubleValue: + return val.DoubleValue + case *telemetry.TelemetryField_FloatValue: + return val.FloatValue } + return nil +} - // Decode Telemetry field value if set - var value interface{} +func decodeTag(field *telemetry.TelemetryField) string { switch val := field.ValueByType.(type) { case *telemetry.TelemetryField_BytesValue: - value = val.BytesValue + return string(val.BytesValue) case *telemetry.TelemetryField_StringValue: - value = val.StringValue + return val.StringValue case *telemetry.TelemetryField_BoolValue: - value = val.BoolValue + if val.BoolValue { + return "true" + } + return "false" case *telemetry.TelemetryField_Uint32Value: - value = val.Uint32Value + return strconv.FormatUint(uint64(val.Uint32Value), 10) case *telemetry.TelemetryField_Uint64Value: - value = val.Uint64Value + return strconv.FormatUint(val.Uint64Value, 10) case *telemetry.TelemetryField_Sint32Value: - value = val.Sint32Value + return strconv.FormatInt(int64(val.Sint32Value), 10) case *telemetry.TelemetryField_Sint64Value: - value = val.Sint64Value + return strconv.FormatInt(val.Sint64Value, 10) case *telemetry.TelemetryField_DoubleValue: - value = val.DoubleValue + return strconv.FormatFloat(val.DoubleValue, 'f', -1, 64) case *telemetry.TelemetryField_FloatValue: - value = val.FloatValue + return strconv.FormatFloat(float64(val.FloatValue), 'f', -1, 32) + default: + return "" } +} - if value != nil { - // Distinguish between tags (keys) and fields (data) to write to - if fields != nil { - fields[name] = value +// Recursively parse tag fields +func (c *CiscoTelemetryMDT) parseKeyField(tags map[string]string, field *telemetry.TelemetryField, prefix string) { + localname := strings.Replace(field.Name, "-", "_", -1) + name := localname + if len(name) == 0 { + name = prefix + } else if len(prefix) > 0 { + name = prefix + "/" + localname + } + + if tag := decodeTag(field); len(tag) > 0 { + localname := strings.Replace(field.Name, "-", "_", -1) + if _, exists := tags[localname]; !exists { // Use short keys whenever possible + tags[localname] = tag } else { - if _, exists := tags[localname]; !exists { // Use short keys whenever possible - tags[localname] = fmt.Sprint(value) - } else { - tags[name] = fmt.Sprint(value) + tags[name] = tag + } + } + + for _, subfield := range field.Fields { + c.parseKeyField(tags, subfield, name) + } +} + +func (c *CiscoTelemetryMDT) parseContentField(grouper *metric.SeriesGrouper, field *telemetry.TelemetryField, prefix string, + path string, tags map[string]string, timestamp time.Time) { + name := strings.Replace(field.Name, "-", "_", -1) + if len(name) == 0 { + name = prefix + } else if len(prefix) > 0 { + name = prefix + "/" + name + } + + extraTags := c.extraTags[path+"/"+name] + + if value := decodeValue(field); value != nil { + // Do alias lookup, to shorten measurement names + measurement := path + if alias, ok := c.aliases[path]; ok { + measurement = alias + } else { + c.mutex.Lock() + if _, haveWarned := c.warned[path]; !haveWarned { + log.Printf("D! [inputs.cisco_telemetry_mdt]: No measurement alias for encoding path: %s", path) + c.warned[path] = struct{}{} } + c.mutex.Unlock() } + + grouper.Add(measurement, tags, timestamp, name, value) } - if fields == nil || !c.DecodeNXOS { + + if len(extraTags) > 0 { for _, subfield := range field.Fields { - c.parseGPBKVField(subfield, name, path, timestamp, tags, fields) + if _, isExtraTag := extraTags[subfield.Name]; isExtraTag { + tags[name+"/"+subfield.Name] = decodeTag(subfield) + } } - } else if c.DecodeNXOS && len(field.Fields) > 0 { // NX-OS extended decoding logic - c.parseNXOSTelemetryStructure(field, prefix, name, path, timestamp, tags, fields) } -} -// Parse extended structure of NX-OS platform telemetry -func (c *CiscoTelemetryMDT) parseNXOSTelemetryStructure(field *telemetry.TelemetryField, prefix string, - name string, path string, timestamp time.Time, tags map[string]string, fields map[string]interface{}) { - var attributes, children, rows *telemetry.TelemetryField - - // NX-OS uses certain fieldnames to indicate the structure following + var nxAttributes, nxChildren, nxRows *telemetry.TelemetryField + isNXOS := !strings.ContainsRune(path, ':') // IOS-XR and IOS-XE have a colon in their encoding path, NX-OS does not for _, subfield := range field.Fields { - if subfield.Name == "attributes" && len(subfield.Fields) > 0 { - attributes = subfield - } else if subfield.Name == "children" && len(subfield.Fields) > 0 { - children = subfield - } else if strings.HasPrefix(subfield.Name, "ROW_") { - rows = subfield - } else { // Fallback to regular telemetry decoding - c.parseGPBKVField(subfield, name, path, timestamp, tags, fields) + if isNXOS && subfield.Name == "attributes" && len(subfield.Fields) > 0 { + nxAttributes = subfield.Fields[0] + } else if isNXOS && subfield.Name == "children" && len(subfield.Fields) > 0 { + nxChildren = subfield + } else if isNXOS && strings.HasPrefix(subfield.Name, "ROW_") { + nxRows = subfield + } else if _, isExtraTag := extraTags[subfield.Name]; !isExtraTag { // Regular telemetry decoding + c.parseContentField(grouper, subfield, name, path, tags, timestamp) } } - if attributes != nil { + if nxAttributes != nil { // DME structure: https://developer.cisco.com/site/nxapi-dme-model-reference-api/ - values := make(map[string]interface{}) - for _, subfield := range attributes.Fields { - c.parseGPBKVField(subfield, "", path, timestamp, tags, values) - } - if rn, hasRN := values["rn"]; hasRN { - // Promote the relative name of the entry from a value to a key - tags[prefix] = fmt.Sprint(rn) - delete(values, "rn") - for key, value := range values { - // Work around an issue where a field is returned of type string when empty - // and as a number otherwise causing type confusion, thus remove empty strings - if str, isStr := value.(string); isStr && len(str) == 0 { - delete(values, key) - } + rn := "" + dn := false + + for _, subfield := range nxAttributes.Fields { + if subfield.Name == "rn" { + rn = decodeTag(subfield) + } else if subfield.Name == "dn" { + dn = true } - c.addFieldsWithAlias(path+"/"+prefix, values, tags, timestamp) - } else if _, hasDN := values["dn"]; !hasDN { // Check for distinguished name being present + } + + if len(rn) > 0 { + tags[prefix] = rn + } else if !dn { // Check for distinguished name being present c.acc.AddError(fmt.Errorf("NX-OS decoding failed: missing dn field")) } - if children != nil { + + for _, subfield := range nxAttributes.Fields { + if subfield.Name != "rn" { + c.parseContentField(grouper, subfield, "", path, tags, timestamp) + } + } + + if nxChildren != nil { // This is a nested structure, children will inherit relative name keys of parent - for _, subfield := range children.Fields { - c.parseGPBKVField(subfield, prefix, path, timestamp, tags, fields) + for _, subfield := range nxChildren.Fields { + c.parseContentField(grouper, subfield, prefix, path, tags, timestamp) } } delete(tags, prefix) - } else if rows != nil { + } else if nxRows != nil { // NXAPI structure: https://developer.cisco.com/docs/cisco-nexus-9000-series-nx-api-cli-reference-release-9-2x/ - for _, row := range rows.Fields { - values := make(map[string]interface{}) + for _, row := range nxRows.Fields { for i, subfield := range row.Fields { - c.parseGPBKVField(subfield, "", path, timestamp, tags, values) if i == 0 { // First subfield contains the index, promote it from value to tag - tags[prefix] = fmt.Sprint(values[subfield.Name]) - delete(values, subfield.Name) - } - } - for key, value := range values { - // Work around an issue where a field is returned of type string when empty - // and as a number otherwise causing type confusion, thus remove empty strings - if str, isStr := value.(string); isStr && len(str) == 0 { - delete(values, key) + tags[prefix] = decodeTag(subfield) + } else { + c.parseContentField(grouper, subfield, "", path, tags, timestamp) } } - c.addFieldsWithAlias(path+"/"+prefix, values, tags, timestamp) + delete(tags, prefix) } } } @@ -449,9 +505,6 @@ const sampleConfig = ` ## Address and port to host telemetry listener service_address = ":57000" - ## Enable support for decoding NX-OS platform-specific telemetry extensions (disable for IOS XR and IOS XE) - # decode_nxos = true - ## Enable TLS; grpc transport only. # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" @@ -463,6 +516,9 @@ const sampleConfig = ` ## Define aliases to map telemetry encoding paths to simple measurement names [inputs.cisco_telemetry_mdt.aliases] ifstats = "ietf-interfaces:interfaces-state/interface/statistics" + + ## Define (for certain nested telemetry measurements with embedded tags) which fields are tags + # enbedded_tags = ["Cisco-IOS-XR-qos-ma-oper:qos/interface-table/interface/input/service-policy-names/service-policy-instance/statistics/class-stats/class-name"] ` // SampleConfig of plugin diff --git a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go index d2c686c69d86b..3736a85318e86 100644 --- a/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go +++ b/plugins/inputs/cisco_telemetry_mdt/cisco_telemetry_mdt_test.go @@ -17,23 +17,6 @@ import ( "google.golang.org/grpc" ) -func TestHandleTelemetryEmpty(t *testing.T) { - c := &CiscoTelemetryMDT{Transport: "dummy"} - acc := &testutil.Accumulator{} - c.Start(acc) - - telemetry := &telemetry.Telemetry{ - DataGpbkv: []*telemetry.TelemetryField{ - {}, - }, - } - data, _ := proto.Marshal(telemetry) - - c.handleTelemetry(data) - assert.Contains(t, acc.Errors, errors.New("empty encoding path or measurement")) - assert.Empty(t, acc.Metrics) -} - func TestHandleTelemetryTwoSimple(t *testing.T) { c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"alias": "type:model/some/path"}} acc := &testutil.Accumulator{} @@ -174,6 +157,233 @@ func TestHandleTelemetrySingleNested(t *testing.T) { acc.AssertContainsTaggedFields(t, "nested", fields, tags) } +func TestHandleEmbeddedTags(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"extra": "type:model/extra"}, EmbeddedTags: []string{"type:model/extra/list/name"}} + acc := &testutil.Accumulator{} + c.Start(acc) + + telemetry := &telemetry.Telemetry{ + MsgTimestamp: 1543236572000, + EncodingPath: "type:model/extra", + NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "keys", + Fields: []*telemetry.TelemetryField{ + { + Name: "foo", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + }, + }, + }, + { + Name: "content", + Fields: []*telemetry.TelemetryField{ + { + Name: "list", + Fields: []*telemetry.TelemetryField{ + { + Name: "name", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "entry1"}, + }, + { + Name: "test", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"}, + }, + }, + }, + { + Name: "list", + Fields: []*telemetry.TelemetryField{ + { + Name: "name", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "entry2"}, + }, + { + Name: "test", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + }, + }, + }, + }, + }, + }, + }, + }, + } + data, _ := proto.Marshal(telemetry) + + c.handleTelemetry(data) + assert.Empty(t, acc.Errors) + + tags1 := map[string]string{"path": "type:model/extra", "foo": "bar", "source": "hostname", "subscription": "subscription", "list/name": "entry1"} + fields1 := map[string]interface{}{"list/test": "foo"} + tags2 := map[string]string{"path": "type:model/extra", "foo": "bar", "source": "hostname", "subscription": "subscription", "list/name": "entry2"} + fields2 := map[string]interface{}{"list/test": "bar"} + acc.AssertContainsTaggedFields(t, "extra", fields1, tags1) + acc.AssertContainsTaggedFields(t, "extra", fields2, tags2) +} + +func TestHandleNXAPI(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"nxapi": "show nxapi"}} + acc := &testutil.Accumulator{} + c.Start(acc) + + telemetry := &telemetry.Telemetry{ + MsgTimestamp: 1543236572000, + EncodingPath: "show nxapi", + NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "keys", + Fields: []*telemetry.TelemetryField{ + { + Name: "foo", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + }, + }, + }, + { + Name: "content", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "TABLE_nxapi", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "ROW_nxapi", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "index", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "i1"}, + }, + { + Name: "value", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"}, + }, + }, + }, + { + Fields: []*telemetry.TelemetryField{ + { + Name: "index", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "i2"}, + }, + { + Name: "value", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + data, _ := proto.Marshal(telemetry) + + c.handleTelemetry(data) + assert.Empty(t, acc.Errors) + + tags1 := map[string]string{"path": "show nxapi", "foo": "bar", "TABLE_nxapi": "i1", "source": "hostname", "subscription": "subscription"} + fields1 := map[string]interface{}{"value": "foo"} + tags2 := map[string]string{"path": "show nxapi", "foo": "bar", "TABLE_nxapi": "i2", "source": "hostname", "subscription": "subscription"} + fields2 := map[string]interface{}{"value": "bar"} + acc.AssertContainsTaggedFields(t, "nxapi", fields1, tags1) + acc.AssertContainsTaggedFields(t, "nxapi", fields2, tags2) +} + +func TestHandleNXDME(t *testing.T) { + c := &CiscoTelemetryMDT{Transport: "dummy", Aliases: map[string]string{"dme": "sys/dme"}} + acc := &testutil.Accumulator{} + c.Start(acc) + + telemetry := &telemetry.Telemetry{ + MsgTimestamp: 1543236572000, + EncodingPath: "sys/dme", + NodeId: &telemetry.Telemetry_NodeIdStr{NodeIdStr: "hostname"}, + Subscription: &telemetry.Telemetry_SubscriptionIdStr{SubscriptionIdStr: "subscription"}, + DataGpbkv: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "keys", + Fields: []*telemetry.TelemetryField{ + { + Name: "foo", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "bar"}, + }, + }, + }, + { + Name: "content", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "fooEntity", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "attributes", + Fields: []*telemetry.TelemetryField{ + { + Fields: []*telemetry.TelemetryField{ + { + Name: "rn", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "some-rn"}, + }, + { + Name: "value", + ValueByType: &telemetry.TelemetryField_StringValue{StringValue: "foo"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + data, _ := proto.Marshal(telemetry) + + c.handleTelemetry(data) + assert.Empty(t, acc.Errors) + + tags1 := map[string]string{"path": "sys/dme", "foo": "bar", "fooEntity": "some-rn", "source": "hostname", "subscription": "subscription"} + fields1 := map[string]interface{}{"value": "foo"} + acc.AssertContainsTaggedFields(t, "dme", fields1, tags1) +} + func TestTCPDialoutOverflow(t *testing.T) { c := &CiscoTelemetryMDT{Transport: "tcp", ServiceAddress: "127.0.0.1:57000"} acc := &testutil.Accumulator{}