-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathindex.js
67 lines (52 loc) · 1.88 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// tests/index.js
// Log uncaught exceptions to stderr. JavaScript allows throwing any value,
// so fall back to printing the raw value when it carries no stack trace
// (a string, null, undefined, a plain object, ...) instead of printing
// `undefined` or throwing a TypeError from inside the handler itself.
process.on('uncaughtException', function (err) {
  console.error(err && err.stack ? err.stack : err)
})
require('longjohn')
var kafkacat = require('../index')

/**
 * Return one randomly chosen element of `words`. When `words` is omitted
 * (or otherwise falsy) the choice is made from a built-in list of nouns.
 *
 * @param {Array} [words] - optional candidates to choose from.
 * @returns {*} a randomly selected element of the list that was used.
 */
var randWord = function (words) {
  var defaults = [ 'mouse', 'cup', 'box', 'headphones', 'keyboard', 'screen'
                 , 'paper', 'desk', 'bag', 'pillow', 'chair', 'tree', 'car'
                 , 'window', 'dog', 'van', 'brick', 'road', 'sun', 'clouds'
                 , 'music', 'pants'
                 ]
  var pool = words ? words : defaults
  var index = Math.floor(Math.random() * pool.length)
  return pool[index]
}
// Container for the test cases; the same object is exported as `tests`, so
// properties attached to it later in this file are visible to importers.
var tests = {}
exports.tests = tests
// No-op setup hook: nothing needs preparing, so it just invokes the supplied
// callback. Presumably called by the test runner before each test — confirm
// against the runner in use.
tests.setUp = function (cb) { cb() }
// No-op teardown hook: nothing needs cleaning up, so it just invokes the
// supplied callback. Presumably called by the test runner after each test —
// confirm against the runner in use.
tests.tearDown = function (cb) { cb() }
/**
 * Round-trip test: writes one random word (newline-terminated) to partition 0
 * of the 'foobar' topic through a kafkacat produce stream, and checks that
 * the consume stream, started at offset 'end', hands the same text back.
 *
 * Reads the broker list from the BROKERS environment variable and passes it
 * to both streams as their `brokers` option.
 *
 * NOTE(review): the comparison runs on whatever the first non-empty
 * 'readable' event delivers, so it assumes the whole message arrives in one
 * go — confirm that holds for the stream returned by createConsumeStream.
 *
 * @param {Object} test - assertion/context object supplied by the test
 *   runner; `equal`, `ifError` and `done` are used (nodeunit-style API).
 */
tests.txRxOneMsg = function (test) {
  var cOptions = { brokers: process.env.BROKERS
                 , topic: 'foobar'
                 , partition: 0
                 , offset: 'end'
                 , unbuffered: true // tells kafkacat to use unbuffered I/O. Buffered can present a problem since kafkacat's stdout won't emit data unless a full chunk is ready.
                 }
    , pOptions = { brokers: process.env.BROKERS
                 , topic: 'foobar'
                 , partition: 0
                 , unbuffered: true
                 }
  var consumeStream = kafkacat.createConsumeStream(cOptions)
    , produceStream = kafkacat.createProduceStream(pOptions)
    , msg = randWord() + '\n'
    , finished = false

  // Close both streams and complete the test exactly once. 'readable' can
  // fire again after the first message, and the write callback may also end
  // the test, so every exit path goes through this guard.
  function finish () {
    if (finished) { return }
    finished = true
    produceStream.end()
    consumeStream.end()
    test.done()
  }

  consumeStream.on('readable', function () {
    var chunk
      , received = ''
    if (finished) { return }
    chunk = consumeStream.read()
    // A 'readable' event with nothing to read: keep waiting for data.
    if (!chunk) { return }
    // Drain everything currently buffered. Accumulating into a string gives
    // the same text the old `new Buffer('') + chunk` coercion produced,
    // without the deprecated Buffer constructor.
    while (chunk) {
      received += chunk
      chunk = consumeStream.read()
    }
    test.equal(received, msg, 'Did not receive the same message that was transmitted\nSent: ' + msg + 'Received: ' + received)
    finish()
  })

  // Report a failed write and end the test instead of silently dropping the
  // error and waiting forever for a message that will never arrive.
  produceStream.write(msg, function (err) {
    if (!err) { return }
    test.ifError(err)
    finish()
  })
}