Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
fix/debug panic from kfc integration test (#184)
Browse files Browse the repository at this point in the history
* fix/debug panic from kfc integration test

* expect duplicate messages from kafka

* ~

* ~

* fail on too many duplicates

* swap checks
  • Loading branch information
Kiran RG authored May 1, 2017
1 parent ba68fc2 commit 691dffd
Showing 1 changed file with 53 additions and 7 deletions.
60 changes: 53 additions & 7 deletions test/integration/kfc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type kafkaMsg struct {
part int32
offs int64
seq int
count int
}

func (t *kafkaMsg) Equals(m *kafkaMsg) bool {
Expand All @@ -76,7 +77,8 @@ func (t *kafkaMsg) Equals(m *kafkaMsg) bool {
}

func (t *kafkaMsg) String() string {
return fmt.Sprintf("[%d] (topic:%v key:%v val:%d bytes) => (part:%d, offs:%d)", t.seq, t.topic, t.key, len(t.val), t.part, t.offs)
return fmt.Sprintf("[%d] (topic:%v key:%v val:%d bytes) => (part:%d, offs:%d) <count=%d>",
t.seq, t.topic, t.key, len(t.val), t.part, t.offs, t.count)
}

func (s *NetIntegrationSuiteParallelE) TestKafkaForCherami() {
Expand Down Expand Up @@ -140,10 +142,13 @@ func (s *NetIntegrationSuiteParallelE) TestKafkaForCherami() {
config.Producer.Return.Successes = true

kafkaProducer, err := sarama.NewSyncProducer([]string{kafkaBroker}, config)
if err != nil {
fmt.Printf("sarama.NewSyncProducer error=%v\n", err)
}
s.NoError(err)
defer kafkaProducer.Close()

msgs := make(map[string]*kafkaMsg)
sentMsgs := make(map[string]*kafkaMsg)

// publish messages to kafka
for i := 0; i < numMsgs; i++ {
Expand All @@ -169,12 +174,21 @@ func (s *NetIntegrationSuiteParallelE) TestKafkaForCherami() {
continue
}

msgs[key] = &kafkaMsg{topic: topic, key: key, val: val, part: part, offs: offs, seq: i}
sentMsgs[key] = &kafkaMsg{
topic: topic,
key: key,
val: val,
part: part,
offs: offs,
seq: i,
}
}

recvMsgs := make(map[string]*kafkaMsg)

// consume messages from cherami
loop:
for i := 0; i < numMsgs; i++ {
for i := 0; (len(recvMsgs) < numMsgs) && (i < 2*numMsgs); i++ {

select {
case cmsg := <-cheramiMsgsCh:
Expand All @@ -185,22 +199,52 @@ loop:
part, _ := strconv.Atoi(uc[`partition`])
offs, _ := strconv.Atoi(uc[`offset`])

// check for duplicates
if _, ok := recvMsgs[key]; ok {

if recvMsgs[key].count++; recvMsgs[key].count > 3 {
s.Fail("received message too many times")
}

continue loop
}

msg := &kafkaMsg{
topic: topic,
key: key,
val: payload.GetData(),
part: int32(part),
offs: int64(offs),
seq: i,
count: 1,
}

// validate that message is as expected
if !msg.Equals(msgs[key]) {
fmt.Printf("received=%v (expected=%v)\n", msg, msgs[key])
if sentMsgs[key] == nil || !msg.Equals(sentMsgs[key]) {

fmt.Printf("received[%d]=%v\n", i, msg)

if sentMsgs[key] == nil {
fmt.Printf("expected=<MISSING>\n")
} else {
fmt.Printf("expected=%v\n", sentMsgs[key])
}

fmt.Printf("\nreceived:\n")
for _, m := range recvMsgs {
fmt.Printf("%v\n", m)
}

fmt.Printf("\nsent (and not received):\n")
for _, m := range sentMsgs {
fmt.Printf("%v\n", m)
}

s.Fail("message validation failed")
}

delete(msgs, key) // ensure we don't see duplicates
// if we have seen all the messages, break out
recvMsgs[key] = msg

cmsg.Ack()

Expand All @@ -210,5 +254,7 @@ loop:
}
}

s.Equal(numMsgs, len(recvMsgs)) // we should have received all the messages

return
}

0 comments on commit 691dffd

Please sign in to comment.