Skip to content

Commit

Permalink
Merge pull request cloudflare#53 from netsampler/bugfix/optiontemplates
Browse files Browse the repository at this point in the history
Bugfix: decoding OptionsTemplateSet
  • Loading branch information
lspgn authored Nov 14, 2021
2 parents bf66556 + 43cf8b5 commit de5e751
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 91 deletions.
112 changes: 35 additions & 77 deletions decoders/netflow/netflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ func DecodeNFv9OptionsTemplateSet(payload *bytes.Buffer) ([]NFv9OptionsTemplateR
sizeScope := int(optsTemplateRecord.ScopeLength) / 4
sizeOptions := int(optsTemplateRecord.OptionLength) / 4
if sizeScope < 0 || sizeOptions < 0 {
return records, NewErrorDecodingNetFlow("Error decoding OptionsTemplateSet: negative length.")
return records, fmt.Errorf("Error decoding OptionsTemplateSet: negative length.")
}

fields := make([]Field, sizeScope)
for i := 0; i < sizeScope; i++ {
field := Field{}
err := utils.BinaryDecoder(payload, &field.Type, &field.Length)
if err != nil {
if err := DecodeField(payload, &field, false); err != nil {
return records, err
}
fields[i] = field
Expand All @@ -46,8 +45,7 @@ func DecodeNFv9OptionsTemplateSet(payload *bytes.Buffer) ([]NFv9OptionsTemplateR
fields = make([]Field, sizeOptions)
for i := 0; i < sizeOptions; i++ {
field := Field{}
err := utils.BinaryDecoder(payload, &field.Type, &field.Length)
if err != nil {
if err := DecodeField(payload, &field, false); err != nil {
return records, err
}
fields[i] = field
Expand All @@ -60,6 +58,15 @@ func DecodeNFv9OptionsTemplateSet(payload *bytes.Buffer) ([]NFv9OptionsTemplateR
return records, err
}

func DecodeField(payload *bytes.Buffer, field *Field, pen bool) error {
err := utils.BinaryDecoder(payload, &field.Type, &field.Length)
if pen && err == nil && field.Type&0x8000 != 0 {
field.PenProvided = true
err = utils.BinaryDecoder(payload, &field.Pen)
}
return err
}

func DecodeIPFIXOptionsTemplateSet(payload *bytes.Buffer) ([]IPFIXOptionsTemplateRecord, error) {
records := make([]IPFIXOptionsTemplateRecord, 0)
var err error
Expand All @@ -73,31 +80,22 @@ func DecodeIPFIXOptionsTemplateSet(payload *bytes.Buffer) ([]IPFIXOptionsTemplat
fields := make([]Field, int(optsTemplateRecord.ScopeFieldCount))
for i := 0; i < int(optsTemplateRecord.ScopeFieldCount); i++ {
field := Field{}
if field.Type&0x8000 != 0 {
field.PenProvided = true
err := utils.BinaryDecoder(payload, &field.Pen)
if err != nil {
return records, err
}
if err := DecodeField(payload, &field, true); err != nil {
return records, err
}
fields[i] = field
}
optsTemplateRecord.Scopes = fields

optionsSize := int(optsTemplateRecord.FieldCount) - int(optsTemplateRecord.ScopeFieldCount)
if optionsSize < 0 {
return records, NewErrorDecodingNetFlow("Error decoding OptionsTemplateSet: negative length.")
return records, fmt.Errorf("Error decoding OptionsTemplateSet: negative length.")
}
fields = make([]Field, optionsSize)
for i := 0; i < optionsSize; i++ {
field := Field{}
err := utils.BinaryDecoder(payload, &field.Type, &field.Length)
if err == nil && field.Type&0x8000 != 0 {
field.PenProvided = true
err = utils.BinaryDecoder(payload, &field.Pen)
}
if err != nil {
return records, nil
if err := DecodeField(payload, &field, true); err != nil {
return records, err
}
fields[i] = field
}
Expand All @@ -120,7 +118,7 @@ func DecodeTemplateSet(version uint16, payload *bytes.Buffer) ([]TemplateRecord,
}

if int(templateRecord.FieldCount) < 0 {
return records, NewErrorDecodingNetFlow("Error decoding TemplateSet: zero count.")
return records, fmt.Errorf("Error decoding TemplateSet: zero count.")
}

fields := make([]Field, int(templateRecord.FieldCount))
Expand Down Expand Up @@ -215,48 +213,6 @@ func (e *ErrorTemplateNotFound) Error() string {
return fmt.Sprintf("No %v template %v found for and domain id %v", e.typeTemplate, e.templateId, e.obsDomainId)
}

type ErrorVersion struct {
version uint16
}

func NewErrorVersion(version uint16) *ErrorVersion {
return &ErrorVersion{
version: version,
}
}

func (e *ErrorVersion) Error() string {
return fmt.Sprintf("Unknown NetFlow version %v (only decodes v9 and v10/IPFIX)", e.version)
}

type ErrorFlowId struct {
id uint16
}

func NewErrorFlowId(id uint16) *ErrorFlowId {
return &ErrorFlowId{
id: id,
}
}

func (e *ErrorFlowId) Error() string {
return fmt.Sprintf("Unknown flow id %v (templates < 256, data >= 256)", e.id)
}

type ErrorDecodingNetFlow struct {
msg string
}

func NewErrorDecodingNetFlow(msg string) *ErrorDecodingNetFlow {
return &ErrorDecodingNetFlow{
msg: msg,
}
}

func (e *ErrorDecodingNetFlow) Error() string {
return fmt.Sprintf("Error decoding NetFlow: %v", e.msg)
}

func DecodeOptionsDataSet(version uint16, payload *bytes.Buffer, listFieldsScopes, listFieldsOption []Field) ([]OptionsDataRecord, error) {
records := make([]OptionsDataRecord, 0)

Expand Down Expand Up @@ -360,12 +316,14 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte

var version uint16
var obsDomainId uint32
binary.Read(payload, binary.BigEndian, &version)
if err := binary.Read(payload, binary.BigEndian, &version); err != nil {
return nil, fmt.Errorf("Error decoding version: %v", err)
}

if version == 9 {
err := utils.BinaryDecoder(payload, &packetNFv9.Count, &packetNFv9.SystemUptime, &packetNFv9.UnixSeconds, &packetNFv9.SequenceNumber, &packetNFv9.SourceId)
if err != nil {
return nil, err
return nil, fmt.Errorf("Error decoding NetFlow v9 header: %v", err)
}
size = packetNFv9.Count
packetNFv9.Version = version
Expand All @@ -374,25 +332,25 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
} else if version == 10 {
err := utils.BinaryDecoder(payload, &packetIPFIX.Length, &packetIPFIX.ExportTime, &packetIPFIX.SequenceNumber, &packetIPFIX.ObservationDomainId)
if err != nil {
return nil, err
return nil, fmt.Errorf("Error decoding IPFIX header: %v", err)
}
size = packetIPFIX.Length
packetIPFIX.Version = version
returnItem = *(&packetIPFIX)
obsDomainId = packetIPFIX.ObservationDomainId
} else {
return nil, NewErrorVersion(version)
return nil, fmt.Errorf("NetFlow/IPFIX version error: %d", version)
}

for i := 0; ((i < int(size) && version == 9) || version == 10) && payload.Len() > 0; i++ {
fsheader := FlowSetHeader{}
if err := utils.BinaryDecoder(payload, &fsheader); err != nil {
return returnItem, err
return returnItem, fmt.Errorf("Error decoding FlowSet header: %v", err)
}

nextrelpos := int(fsheader.Length) - binary.Size(fsheader)
if nextrelpos < 0 {
return returnItem, NewErrorDecodingNetFlow("Error decoding packet: non-terminated stream.")
return returnItem, fmt.Errorf("Error decoding packet: non-terminated stream")
}

var flowSet interface{}
Expand All @@ -401,7 +359,7 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
records, err := DecodeTemplateSet(version, templateReader)
if err != nil {
return returnItem, err
return returnItem, fmt.Errorf("Error decoding FlowSet header: %v", err)
}
templatefs := TemplateFlowSet{
FlowSetHeader: fsheader,
Expand All @@ -420,7 +378,7 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
records, err := DecodeNFv9OptionsTemplateSet(templateReader)
if err != nil {
return returnItem, err
return returnItem, fmt.Errorf("Error decoding NetFlow OptionsTemplateSet: %v", err)
}
optsTemplatefs := NFv9OptionsTemplateFlowSet{
FlowSetHeader: fsheader,
Expand All @@ -438,7 +396,7 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
records, err := DecodeTemplateSet(version, templateReader)
if err != nil {
return returnItem, err
return returnItem, fmt.Errorf("Error decoding IPFIX TemplateSet: %v", err)
}
templatefs := TemplateFlowSet{
FlowSetHeader: fsheader,
Expand All @@ -456,7 +414,7 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
records, err := DecodeIPFIXOptionsTemplateSet(templateReader)
if err != nil {
return returnItem, err
return returnItem, fmt.Errorf("Error decoding IPFIX OptionsTemplateSet: %v", err)
}
optsTemplatefs := IPFIXOptionsTemplateFlowSet{
FlowSetHeader: fsheader,
Expand Down Expand Up @@ -484,7 +442,7 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
case TemplateRecord:
records, err := DecodeDataSet(version, dataReader, templatec.Fields)
if err != nil {
return returnItem, err
return returnItem, fmt.Errorf("Error decoding DataSet: %v", err)
}
datafs := DataFlowSet{
FlowSetHeader: fsheader,
Expand All @@ -494,7 +452,7 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
case IPFIXOptionsTemplateRecord:
records, err := DecodeOptionsDataSet(version, dataReader, templatec.Scopes, templatec.Options)
if err != nil {
return returnItem, err
return returnItem, fmt.Errorf("Error decoding DataSet: %v", err)
}

datafs := OptionsDataFlowSet{
Expand All @@ -505,7 +463,7 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
case NFv9OptionsTemplateRecord:
records, err := DecodeOptionsDataSet(version, dataReader, templatec.Scopes, templatec.Options)
if err != nil {
return returnItem, err
return returnItem, fmt.Errorf("Error decoding OptionDataSet: %v", err)
}

datafs := OptionsDataFlowSet{
Expand All @@ -518,7 +476,7 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
return returnItem, err
}
} else {
return returnItem, NewErrorFlowId(fsheader.Id)
return returnItem, fmt.Errorf("Error with ID %d", fsheader.Id)
}

if version == 9 && flowSet != nil {
Expand All @@ -533,6 +491,6 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
} else if version == 10 {
return packetIPFIX, nil
} else {
return returnItem, NewErrorVersion(version)
return returnItem, fmt.Errorf("Unknown version: %d", version)
}
}
14 changes: 0 additions & 14 deletions utils/netflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,6 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error {
msgDec, err := netflow.DecodeMessage(buf, templates)
if err != nil {
switch err.(type) {
case *netflow.ErrorVersion:
NetFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_version",
}).
Inc()
case *netflow.ErrorFlowId:
NetFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_flow_id",
}).
Inc()
case *netflow.ErrorTemplateNotFound:
NetFlowErrors.With(
prometheus.Labels{
Expand Down

0 comments on commit de5e751

Please sign in to comment.