Skip to content
This repository has been archived by the owner on Feb 19, 2025. It is now read-only.

Commit

Permalink
Length-delimited protobuf #65
Browse files Browse the repository at this point in the history
  • Loading branch information
lspgn committed Mar 22, 2020
1 parent c970b07 commit edcb2b6
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 5 deletions.
2 changes: 2 additions & 0 deletions cmd/cnetflow/cnetflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
LogFmt = flag.String("logfmt", "normal", "Log formatter")

EnableKafka = flag.Bool("kafka", true, "Enable Kafka")
FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf")
MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address")
MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path")
TemplatePath = flag.String("templates.path", "/templates", "NetFlow/IPFIX templates list")
Expand Down Expand Up @@ -80,6 +81,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
kafkaState.FixedLengthProto = *FixedLength
s.Transport = kafkaState
}
log.WithFields(log.Fields{
Expand Down
2 changes: 2 additions & 0 deletions cmd/cnflegacy/cnflegacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
LogFmt = flag.String("logfmt", "normal", "Log formatter")

EnableKafka = flag.Bool("kafka", true, "Enable Kafka")
FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf")
MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address")
MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path")

Expand Down Expand Up @@ -78,6 +79,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
kafkaState.FixedLengthProto = *FixedLength
s.Transport = kafkaState
}
log.WithFields(log.Fields{
Expand Down
2 changes: 2 additions & 0 deletions cmd/csflow/csflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
LogFmt = flag.String("logfmt", "normal", "Log formatter")

EnableKafka = flag.Bool("kafka", true, "Enable Kafka")
FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf")
MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address")
MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path")

Expand Down Expand Up @@ -78,6 +79,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
kafkaState.FixedLengthProto = *FixedLength
s.Transport = kafkaState
}
log.WithFields(log.Fields{
Expand Down
3 changes: 3 additions & 0 deletions cmd/goflow/goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
LogFmt = flag.String("logfmt", "normal", "Log formatter")

EnableKafka = flag.Bool("kafka", true, "Enable Kafka")
FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf")
MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address")
MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path")

Expand Down Expand Up @@ -101,6 +102,8 @@ func main() {
if err != nil {
log.Fatal(err)
}
kafkaState.FixedLengthProto = *FixedLength

sSFlow.Transport = kafkaState
sNFL.Transport = kafkaState
sNF.Transport = kafkaState
Expand Down
18 changes: 13 additions & 5 deletions transport/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ var (
)

type KafkaState struct {
producer sarama.AsyncProducer
topic string
hashing bool
keying []string
FixedLengthProto bool
producer sarama.AsyncProducer
topic string
hashing bool
keying []string
}

// SetKafkaVersion sets the KafkaVersion that is used to set the log message format version
Expand Down Expand Up @@ -164,7 +165,14 @@ func (s KafkaState) SendKafkaFlowMessage(flowMessage *flowmessage.FlowMessage) {
keyStr := HashProto(s.keying, flowMessage)
key = sarama.StringEncoder(keyStr)
}
b, _ := proto.Marshal(flowMessage)
var b []byte
if !s.FixedLengthProto {
b, _ = proto.Marshal(flowMessage)
} else {
buf := proto.NewBuffer([]byte{})
buf.EncodeMessage(flowMessage)
b = buf.Bytes()
}
s.producer.Input() <- &sarama.ProducerMessage{
Topic: s.topic,
Key: key,
Expand Down

0 comments on commit edcb2b6

Please sign in to comment.