-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: consumer.js
201 lines (182 loc) · 6.59 KB
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
//for all env variables imports
require("dotenv").config();
//setting up kafka
const kafka=require("./kafka");
var eventsDataModel = require("./models/eventsData");
var listenerRouter = require("./routes/listenerroutes");
//creating a consumer
const consumer = kafka.consumer({ groupId: process.env.TOPIC, retries: Number.MAX_VALUE });
// import library to serialize Events Data
var serialize = require('serialize-javascript');
// Rebuild a JavaScript value from its serialize-javascript string form.
// NOTE(review): eval-based deserialization is the pattern the
// serialize-javascript README documents, but it executes arbitrary code —
// confirm only trusted producers can write to this Kafka topic / Redis queue.
function deserialize(serializedJavascript) {
  const wrapped = `(${serializedJavascript})`;
  return eval(wrapped);
}
var queuePopFlag = 0;
/**
 * Persist a freshly-consumed event in MongoDB with status "pending" and
 * eventType "NotSame".
 *
 * @param {string} deployHash - deploy hash the event belongs to
 * @param {string} eventName - name of the contract event
 * @param {string|number} timestamp - event timestamp as received from the queue
 * @param {string} blockHash - hash of the block containing the event
 * @param {*} eventsdata - raw event payload
 * @returns {Promise<Object>} the saved mongoose document
 */
async function saveEventInDataBase(
  deployHash,
  eventName,
  timestamp,
  blockHash,
  eventsdata
) {
  let eventResult = new eventsDataModel({
    deployHash: deployHash,
    eventName: eventName,
    timestamp: timestamp,
    block_hash: blockHash,
    status: "pending",
    eventType: "NotSame",
    eventsdata: eventsdata,
  });
  // BUGFIX: save the instantiated document directly. The original passed
  // it to Model.create(), which hydrates and saves a *second* document
  // while returning the unsaved original to the caller — callers then
  // invoke .save() on a document Mongoose never persisted itself.
  await eventResult.save();
  return eventResult;
}
/**
 * Process the event at the head of the Redis queue exactly once.
 *
 * Guarded by the module-level queuePopFlag so only one invocation works
 * the queue at a time. Dedupes against MongoDB: a "completed" event with
 * identical data is skipped; otherwise the GraphQL mutation is invoked
 * (new event, same-named-but-different event, or still-"pending" event)
 * and the head entry is popped off the queue.
 *
 * @param {Object} redis - wrapper exposing a connected node-redis client
 */
async function callMutations(redis) {
  if (queuePopFlag != 0) {
    console.log("Already, one Event is calling the mutation...");
    return;
  }
  //check redis queue length
  let redisLength = await redis.client.LLEN(process.env.GRAPHQLREDISQUEUE);
  if (redisLength <= 0) {
    console.log("There are currently no Events in the Redis queue...");
    return;
  }
  queuePopFlag = 1;
  // BUGFIX: reset the flag in finally — previously any throw from the
  // database / mutation calls left queuePopFlag stuck at 1, permanently
  // blocking all further queue processing.
  try {
    let headValue = await redis.client.LRANGE(
      process.env.GRAPHQLREDISQUEUE,
      0,
      0
    );
    // LRANGE returns an array of strings; take the head element
    // explicitly (the original relied on implicit array-to-string
    // coercion inside deserialize's eval).
    let deserializedHeadValue = deserialize(headValue[0]).obj;
    console.log("Event Read from queue's head: ", deserializedHeadValue);
    // Forward a stored document plus the queued event fields to the
    // GraphQL mutation layer (hoisted: this call appeared three times).
    const invokeMutation = (doc) =>
      listenerRouter.geteventsdata(
        doc,
        deserializedHeadValue.deployHash,
        deserializedHeadValue.timestamp,
        deserializedHeadValue.block_hash,
        deserializedHeadValue.eventName,
        deserializedHeadValue.eventsdata
      );
    //check if event is in the database
    let eventResult = await eventsDataModel.findOne({
      deployHash: deserializedHeadValue.deployHash,
      eventName: deserializedHeadValue.eventName,
      timestamp: deserializedHeadValue.timestamp,
      block_hash: deserializedHeadValue.block_hash,
    });
    if (
      eventResult != null &&
      JSON.stringify(eventResult.eventsdata) ==
        JSON.stringify(deserializedHeadValue.eventsdata) &&
      eventResult.status == "completed"
    ) {
      // Already fully processed with identical payload.
      console.log("Event is repeated, skipping mutation call...");
    } else if (eventResult == null) {
      console.log("Event is New, Calling Mutation...");
      //store new event Data
      let result = await saveEventInDataBase(
        deserializedHeadValue.deployHash,
        deserializedHeadValue.eventName,
        deserializedHeadValue.timestamp,
        deserializedHeadValue.block_hash,
        deserializedHeadValue.eventsdata
      );
      //call mutation
      await invokeMutation(result);
    } else if (
      JSON.stringify(eventResult.eventsdata) !=
      JSON.stringify(deserializedHeadValue.eventsdata)
    ) {
      // Same identifying fields but different payload.
      if (eventResult.eventType == "NotSame") {
        console.log("Event has same EventName, Calling Mutation...");
        //store new event Data
        let result = await saveEventInDataBase(
          deserializedHeadValue.deployHash,
          deserializedHeadValue.eventName,
          deserializedHeadValue.timestamp,
          deserializedHeadValue.block_hash,
          deserializedHeadValue.eventsdata
        );
        // Mark both documents so this pairing is only mutated once.
        result.eventType = "same";
        eventResult.eventType = "same";
        await result.save();
        await eventResult.save();
        //call mutation
        await invokeMutation(result);
      } else {
        console.log("Event is repeated, skipping mutation call...");
      }
    } else if (eventResult.status == "pending") {
      // Stored earlier but its mutation never completed — retry it.
      console.log("Event is Not performed Yet, Calling Mutation...");
      //call mutation
      await invokeMutation(eventResult);
    }
    // Only pop the head once it has been handled (at-least-once delivery).
    await redis.client.LPOP(process.env.GRAPHQLREDISQUEUE);
  } finally {
    queuePopFlag = 0;
  }
}
/**
 * Connect the Kafka consumer, subscribe to process.env.TOPIC, and pump
 * every consumed message through the Redis queue into callMutations.
 *
 * Offsets are resolved manually (eachBatchAutoResolve: false) only after
 * the event has been pushed to Redis and processed, giving at-least-once
 * semantics.
 *
 * @param {Object} redis - wrapper exposing a connected node-redis client
 */
async function consumeEvent(redis) {
  try {
    //connecting the consumer
    await consumer.connect();
    //subscribing the topic to consume data
    await consumer.subscribe({ topic: process.env.TOPIC });
    //consuming data
    await consumer.run({
      eachBatchAutoResolve: false,
      eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
        for (let message of batch.messages) {
          if (!isRunning() || isStale()) {
            break;
          }
          console.log(`Consumed event from topic ${batch.topic}: value = ${message.value}`);
          let _value = JSON.parse(message.value.toString());
          let serializedValue = serialize({ obj: _value });
          await redis.client.RPUSH(process.env.GRAPHQLREDISQUEUE, serializedValue);
          console.log("Event pushed to queue...");
          // Keep the consumer alive in the group while the (potentially
          // slow) mutation runs. BUGFIX: handle heartbeat rejections so
          // they do not surface as unhandled promise rejections.
          let interval = setInterval(() => {
            console.log("Heartbeat Signaled...");
            heartbeat().catch((err) => console.error("Heartbeat failed", err));
          }, 15000);
          try {
            await callMutations(redis);
          } finally {
            // BUGFIX: previously a throw from callMutations skipped
            // clearInterval, leaking a heartbeat timer for a dead batch.
            clearInterval(interval);
          }
          //committing offset
          resolveOffset(message.offset);
          await heartbeat();
          console.log("Offset Committed...");
          console.log("Heartbeat Signaled...");
        }
      }
    });
    process.on('SIGINT', async () => {
      console.log('\nDisconnecting consumer and shutting down Graphql backend ...');
      // BUGFIX: await the async disconnect so group-leave / offset flush
      // finishes before the process exits (original fired-and-forgot it).
      try {
        await consumer.disconnect();
      } finally {
        process.exit(0);
      }
    });
  }
  catch (error) {
    console.error('Error listening message', error)
  }
}
module.exports = { consumeEvent };