This repository has been archived by the owner on Jan 23, 2025. It is now read-only.

Commit ac5170f: wip: tracing
rakibansary committed Oct 27, 2022
1 parent 2a58a44 commit ac5170f
Showing 1 changed file with 80 additions and 81 deletions.
src/app.js: 161 changes (80 additions & 81 deletions)
@@ -5,7 +5,6 @@
 require('./bootstrap')
 
 const AWSXRay = require('aws-xray-sdk')
-const ns = AWSXRay.getNamespace();
 
 const _ = require('lodash')
 const config = require('config')
@@ -28,91 +27,95 @@ const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())
  * this function will be invoked
  */
 const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, async (m) => {
-  const segment = new AWSXRay.Segment('legacy-challenge-processor');
-  AWSXRay.setSegment(segment);
-
-  const message = m.message.value.toString('utf8')
-  logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${m.offset}; Message: ${message}.`)
-
-  let messageJSON
-  try {
-    messageJSON = JSON.parse(message)
-  } catch (e) {
-    logger.error('Invalid message JSON.')
-    logger.logFullError(e)
-
-    // commit the message and ignore it
-    await consumer.commitOffset({ topic, partition, offset: m.offset })
-    return
-  }
+  const ns = AWSXRay.getNamespace();
+
+  ns.run(async () => {
+    const segment = new AWSXRay.Segment('legacy-challenge-processor');
+    AWSXRay.setSegment(segment);
+
+    const message = m.message.value.toString('utf8')
+    logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${m.offset}; Message: ${message}.`)
+
+    let messageJSON
+    try {
+      messageJSON = JSON.parse(message)
+    } catch (e) {
+      logger.error('Invalid message JSON.')
+      logger.logFullError(e)
+
+      // commit the message and ignore it
+      await consumer.commitOffset({ topic, partition, offset: m.offset })
+      return
+    }
 
-  if (messageJSON.topic !== topic) {
-    logger.error(`The message topic ${messageJSON.topic} doesn't match the Kafka topic ${topic}. Message: ${JSON.stringify(messageJSON)}`)
+    if (messageJSON.topic !== topic) {
+      logger.error(`The message topic ${messageJSON.topic} doesn't match the Kafka topic ${topic}. Message: ${JSON.stringify(messageJSON)}`)
 
-    // commit the message and ignore it
-    await consumer.commitOffset({ topic, partition, offset: m.offset })
-    return
-  }
+      // commit the message and ignore it
+      await consumer.commitOffset({ topic, partition, offset: m.offset })
+      return
+    }
 
-  if (_.includes(config.IGNORED_ORIGINATORS, messageJSON.originator)) {
-    logger.error(`The message originator is in the ignored list. Originator: ${messageJSON.originator}`)
+    if (_.includes(config.IGNORED_ORIGINATORS, messageJSON.originator)) {
+      logger.error(`The message originator is in the ignored list. Originator: ${messageJSON.originator}`)
 
-    // commit the message and ignore it
-    await consumer.commitOffset({ topic, partition, offset: m.offset })
-    return
-  }
+      // commit the message and ignore it
+      await consumer.commitOffset({ topic, partition, offset: m.offset })
+      return
+    }
 
-  const { traceInformation: {
-    traceId,
-    parentSegmentId,
-  } = {
-    traceId: null,
-    parentSegmentId: null
-  } } = messageJSON.payload;
+    const { traceInformation: {
+      traceId,
+      parentSegmentId,
+    } = {
+      traceId: null,
+      parentSegmentId: null
+    } } = messageJSON.payload;
 
-  console.log('tracing information', traceId, parentSegmentId);
+    console.log('tracing information', traceId, parentSegmentId);
 
-  if (traceId) {
-    segment.trace_id = traceId;
-    segment.id = parentSegmentId;
-  }
+    if (traceId) {
+      segment.trace_id = traceId;
+      segment.id = parentSegmentId;
+    }
 
 
-  // do not trust the message payload
-  // the message.payload will be replaced with the data from the API
-  try {
-    console.log('Fetch challenge details');
-    const challengeUuid = _.get(messageJSON, 'payload.id')
-    if (_.isEmpty(challengeUuid)) {
-      segment.close();
-    }
-    const m2mToken = await helper.getM2MToken()
-    const v5Challenge = await helper.getRequest(`${config.V5_CHALLENGE_API_URL}/${challengeUuid}`, m2mToken)
-    // TODO : Cleanup. Pulling the billingAccountId from the payload, it's not part of the challenge object
-    messageJSON.payload = { billingAccountId: messageJSON.payload.billingAccountId, ...v5Challenge.body }
-  } catch (err) {
-    segment.addError(new Error(err));
-    throw new Error('Invalid payload')
-    logger.debug('Failed to fetch challenge information')
-    logger.logFullError(err)
-  }
+    // do not trust the message payload
+    // the message.payload will be replaced with the data from the API
+    try {
+      console.log('Fetch challenge details');
+      const challengeUuid = _.get(messageJSON, 'payload.id')
+      if (_.isEmpty(challengeUuid)) {
+        segment.close();
+        segment.addError(new Error(err));
+        throw new Error('Invalid payload')
+      }
+      const m2mToken = await helper.getM2MToken()
+      const v5Challenge = await helper.getRequest(`${config.V5_CHALLENGE_API_URL}/${challengeUuid}`, m2mToken)
+      // TODO : Cleanup. Pulling the billingAccountId from the payload, it's not part of the challenge object
+      messageJSON.payload = { billingAccountId: messageJSON.payload.billingAccountId, ...v5Challenge.body }
+    } catch (err) {
+      segment.addError(new Error(err));
+      logger.debug('Failed to fetch challenge information')
+      logger.logFullError(err)
+    }
 
-  try {
-    console.log('Process challenge')
-    await ProcessorService.processMessage(messageJSON)
-
-    // logger.debug('Successfully processed message')
-  } catch (err) {
-    segment.addError(new Error(err));
-    logger.error(`Error processing message ${JSON.stringify(messageJSON)}`)
-    logger.logFullError(err)
-  } finally {
-    // Commit offset regardless of error
-    await consumer.commitOffset({ topic, partition, offset: m.offset })
-  }
+    try {
+      console.log('Process challenge')
+      await ProcessorService.processMessage(messageJSON)
+
+      segment.close();
+      // logger.debug('Successfully processed message')
+    } catch (err) {
+      segment.addError(new Error(err));
+      logger.error(`Error processing message ${JSON.stringify(messageJSON)}`)
+      logger.logFullError(err)
+    } finally {
+      // Commit offset regardless of error
+      await consumer.commitOffset({ topic, partition, offset: m.offset })
+    }
 
+    segment.close();
+  });
 })
 
 // check if there is kafka connection alive
@@ -128,14 +131,12 @@ const check = () => {
   return connected
 }
 
-const topics = [config.CREATE_CHALLENGE_TOPIC, config.UPDATE_CHALLENGE_TOPIC]
+const topics = [config.CREATE_CHALLENGE_TOPIC, config.UPDATE_CHALLENGE_TOPIC];
 
-(() => {
-  ns.run(() => {
-    consumer
-      .init([{
+consumer
+  .init([{
     subscriptions: topics,
-        handler: dataHandler
+    handler: dataHandler
   }])
   // consume configured topics
   .then(() => {
@@ -146,8 +147,6 @@ const topics = [config.CREATE_CHALLENGE_TOPIC, config.UPDATE_CHALLENGE_TOPIC]
     logger.info('Kick Start.......')
   })
   .catch((err) => logger.error(err))
-  })
-})();
 
 if (process.env.NODE_ENV === 'test') {
   module.exports = consumer

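Note on the pattern the diff applies: aws-xray-sdk keeps the active segment in a CLS (continuation-local storage) namespace, so AWSXRay.setSegment() has to run inside a context opened with ns.run(); that appears to be why this commit moves getNamespace()/ns.run() from the consumer bootstrap into dataHandler itself. Below is a minimal sketch of that per-message flow, assuming only the stock aws-xray-sdk APIs (getNamespace, Segment, setSegment, addError, close); the handleWithTracing wrapper and the work callback are illustrative, not code from this repository.

const AWSXRay = require('aws-xray-sdk')

// Illustrative wrapper (not part of the repo): run one unit of work inside its own
// X-Ray context, optionally continuing a trace handed over by an upstream service.
function handleWithTracing (traceId, parentSegmentId, work) {
  const ns = AWSXRay.getNamespace()

  ns.run(async () => {
    const segment = new AWSXRay.Segment('legacy-challenge-processor')
    AWSXRay.setSegment(segment) // needs the ns.run() context to have somewhere to store the segment

    // Continue the upstream trace when the producer passed one along
    if (traceId) {
      segment.trace_id = traceId
      segment.id = parentSegmentId
    }

    try {
      await work() // e.g. parse the Kafka message and call the V5 challenge API
    } catch (err) {
      segment.addError(err) // record the failure on the segment
    } finally {
      segment.close() // segments that are never closed are never sent to the X-Ray daemon
    }
  })
  // Caveat: ns.run() does not await an async callback, so the caller cannot tell when
  // `work` has finished; cls-hooked's runPromise is the usual alternative for that.
}

Opening a fresh context per message keeps segments from leaking between Kafka messages that the same consumer processes concurrently.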
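The traceId / parentSegmentId pair that dataHandler destructures from messageJSON.payload.traceInformation has to be attached by the producing service. A hypothetical producer-side sketch follows; only the traceInformation shape and the topic/originator/payload envelope fields are taken from this diff, while the concrete topic and originator values and the buildChallengeEvent helper are assumptions.

const AWSXRay = require('aws-xray-sdk')

// Hypothetical producer side: attach the current X-Ray trace to the event payload
// so legacy-challenge-processor can continue it.
function buildChallengeEvent (challenge) {
  const segment = AWSXRay.getSegment() // current segment, if the producer is itself traced

  return {
    topic: 'challenge.notification.update', // assumed topic name
    originator: 'challenge-api',            // assumed originator
    payload: {
      ...challenge,
      traceInformation: segment
        ? { traceId: segment.trace_id, parentSegmentId: segment.id }
        : { traceId: null, parentSegmentId: null }
    }
  }
}

// The returned object would then be JSON.stringified and published to Kafka
// (producer code not shown); dataHandler JSON.parses it on the other end.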