-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathindex.js
90 lines (77 loc) · 2.8 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/* eslint-disable dot-notation */
"use strict";
const kafka = require("node-rdkafka");
const debuglog = require("util").debuglog("exp-kafka-listener");
const {EventEmitter} = require("events");
function calculateLag(stats, topicName) {
const topic = stats.topics[topicName] || {};
const partitions = topic.partitions || {};
return Object.values(partitions)
.map((p) => Math.max(p.hi_offset - p.committed_offset, 0))
.reduce((a, b) => a + b, 0);
}
function listen(kafkaConfig, groupId, topics) {
const api = new EventEmitter();
// TODO: Throw error if unknown options are provided
const consumerConf = {
"metadata.broker.list": kafkaConfig.host,
"client.id": `exp-kafka-listener-${getProductName()}`,
"enable.auto.commit": !!kafkaConfig.autocommit,
"statistics.interval.ms": kafkaConfig.statsInterval || 30000,
"group.id": groupId
};
if (process.env.NODE_DEBUG && process.env.NODE_DEBUG.includes("exp-kafka-listener")) {
consumerConf.debug = "consumer"
}
if (kafkaConfig.username) {
consumerConf["security.protocol"] = "sasl_plaintext";
consumerConf["sasl.mechanism"] = "PLAIN";
consumerConf["sasl.username"] = kafkaConfig.username;
consumerConf["sasl.password"] = kafkaConfig.password;
}
const topicConfig = { "auto.offset.reset": kafkaConfig.fromOffset || "earliest" };
debuglog("Starting Kafka listener using conf: ", kafkaConfig);
const kafkaReader = kafka.KafkaConsumer.createReadStream(consumerConf, topicConfig, {
topics: topics,
fetchSize: kafkaConfig.fetchSize || 500
});
kafkaReader.consumer.on("ready", () => api.emit("ready"));
kafkaReader.consumer.on("event.error", (e) => api.emit("error", e));
kafkaReader.consumer.on("event.log", (e) => debuglog("rdkafka log", e));
kafkaReader.consumer.on("event", (e) => debuglog("rdkafka event", e));
const stats = {};
function statsHandler({ message }) {
try {
const statsData = JSON.parse(message);
const lag = calculateLag(statsData, topics);
Object.assign(stats, {
time: Date.now(),
error: null,
lag,
messageRate: stats.time && (1000 * (stats.lag - lag) / (Date.now() - stats.time))
});
} catch (lagError) {
debuglog("Error calculating lag:", lagError);
stats.error = lagError.message || lagError;
}
api.emit("stats", stats);
}
kafkaReader.consumer.on("event.stats", statsHandler);
return Object.assign(api, {
readStream: kafkaReader,
commit: (msg) => kafkaReader.consumer.commitMessage(msg)
});
}
function getProductName() {
try {
const pkg = require("./package.json");
const nodeEnv = (process.env.NODE_ENV || "development");
return `${pkg.name}-${nodeEnv}`;
} catch (e) {
debuglog("Failed to get product name", e);
return "unknown";
}
}
module.exports = {
listen
};