diff --git a/src/app.js b/src/app.js index 8fbc206..ba748b9 100644 --- a/src/app.js +++ b/src/app.js @@ -5,7 +5,6 @@ require('./bootstrap') const AWSXRay = require('aws-xray-sdk') -const ns = AWSXRay.getNamespace(); const _ = require('lodash') const config = require('config') @@ -28,91 +27,95 @@ const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions()) * this function will be invoked */ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, async (m) => { - const segment = new AWSXRay.Segment('legacy-challenge-processor'); - AWSXRay.setSegment(segment); - - const message = m.message.value.toString('utf8') - logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${m.offset}; Message: ${message}.`) - - let messageJSON - try { - messageJSON = JSON.parse(message) - } catch (e) { - logger.error('Invalid message JSON.') - logger.logFullError(e) - - // commit the message and ignore it - await consumer.commitOffset({ topic, partition, offset: m.offset }) - return - } + const ns = AWSXRay.getNamespace(); + + ns.run(async () => { + const segment = new AWSXRay.Segment('legacy-challenge-processor'); + AWSXRay.setSegment(segment); + + const message = m.message.value.toString('utf8') + logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${m.offset}; Message: ${message}.`) + + let messageJSON + try { + messageJSON = JSON.parse(message) + } catch (e) { + logger.error('Invalid message JSON.') + logger.logFullError(e) + + // commit the message and ignore it + await consumer.commitOffset({ topic, partition, offset: m.offset }) + return + } - if (messageJSON.topic !== topic) { - logger.error(`The message topic ${messageJSON.topic} doesn't match the Kafka topic ${topic}. Message: ${JSON.stringify(messageJSON)}`) + if (messageJSON.topic !== topic) { + logger.error(`The message topic ${messageJSON.topic} doesn't match the Kafka topic ${topic}. Message: ${JSON.stringify(messageJSON)}`) - // commit the message and ignore it - await consumer.commitOffset({ topic, partition, offset: m.offset }) - return - } + // commit the message and ignore it + await consumer.commitOffset({ topic, partition, offset: m.offset }) + return + } - if (_.includes(config.IGNORED_ORIGINATORS, messageJSON.originator)) { - logger.error(`The message originator is in the ignored list. Originator: ${messageJSON.originator}`) + if (_.includes(config.IGNORED_ORIGINATORS, messageJSON.originator)) { + logger.error(`The message originator is in the ignored list. Originator: ${messageJSON.originator}`) - // commit the message and ignore it - await consumer.commitOffset({ topic, partition, offset: m.offset }) - return - } + // commit the message and ignore it + await consumer.commitOffset({ topic, partition, offset: m.offset }) + return + } - const { traceInformation: { - traceId, - parentSegmentId, - } = { - traceId: null, - parentSegmentId: null - } } = messageJSON.payload; + const { traceInformation: { + traceId, + parentSegmentId, + } = { + traceId: null, + parentSegmentId: null + } } = messageJSON.payload; - console.log('tracing information', traceId, parentSegmentId); + console.log('tracing information', traceId, parentSegmentId); - if (traceId) { - segment.trace_id = traceId; - segment.id = parentSegmentId; - } + if (traceId) { + segment.trace_id = traceId; + segment.id = parentSegmentId; + } - // do not trust the message payload - // the message.payload will be replaced with the data from the API - try { - console.log('Fetch challenge details'); - const challengeUuid = _.get(messageJSON, 'payload.id') - if (_.isEmpty(challengeUuid)) { - segment.close(); + // do not trust the message payload + // the message.payload will be replaced with the data from the API + try { + console.log('Fetch challenge details'); + const challengeUuid = _.get(messageJSON, 'payload.id') + if (_.isEmpty(challengeUuid)) { + segment.close(); + segment.addError(new Error(err)); + throw new Error('Invalid payload') + } + const m2mToken = await helper.getM2MToken() + const v5Challenge = await helper.getRequest(`${config.V5_CHALLENGE_API_URL}/${challengeUuid}`, m2mToken) + // TODO : Cleanup. Pulling the billingAccountId from the payload, it's not part of the challenge object + messageJSON.payload = { billingAccountId: messageJSON.payload.billingAccountId, ...v5Challenge.body } + } catch (err) { segment.addError(new Error(err)); - throw new Error('Invalid payload') + logger.debug('Failed to fetch challenge information') + logger.logFullError(err) } - const m2mToken = await helper.getM2MToken() - const v5Challenge = await helper.getRequest(`${config.V5_CHALLENGE_API_URL}/${challengeUuid}`, m2mToken) - // TODO : Cleanup. Pulling the billingAccountId from the payload, it's not part of the challenge object - messageJSON.payload = { billingAccountId: messageJSON.payload.billingAccountId, ...v5Challenge.body } - } catch (err) { - segment.addError(new Error(err)); - logger.debug('Failed to fetch challenge information') - logger.logFullError(err) - } - try { - console.log('Process challenge') - await ProcessorService.processMessage(messageJSON) - - // logger.debug('Successfully processed message') - } catch (err) { - segment.addError(new Error(err)); - logger.error(`Error processing message ${JSON.stringify(messageJSON)}`) - logger.logFullError(err) - } finally { - // Commit offset regardless of error - await consumer.commitOffset({ topic, partition, offset: m.offset }) - } + try { + console.log('Process challenge') + await ProcessorService.processMessage(messageJSON) - segment.close(); + // logger.debug('Successfully processed message') + } catch (err) { + segment.addError(new Error(err)); + logger.error(`Error processing message ${JSON.stringify(messageJSON)}`) + logger.logFullError(err) + } finally { + // Commit offset regardless of error + await consumer.commitOffset({ topic, partition, offset: m.offset }) + } + + segment.close(); + }); }) // check if there is kafka connection alive @@ -128,14 +131,12 @@ const check = () => { return connected } -const topics = [config.CREATE_CHALLENGE_TOPIC, config.UPDATE_CHALLENGE_TOPIC] +const topics = [config.CREATE_CHALLENGE_TOPIC, config.UPDATE_CHALLENGE_TOPIC]; -(() => { - ns.run(() => { - consumer - .init([{ +consumer + .init([{ subscriptions: topics, - handler: dataHandler + handler: dataHandler }]) // consume configured topics .then(() => { @@ -146,8 +147,6 @@ const topics = [config.CREATE_CHALLENGE_TOPIC, config.UPDATE_CHALLENGE_TOPIC] logger.info('Kick Start.......') }) .catch((err) => logger.error(err)) - }) -})(); if (process.env.NODE_ENV === 'test') { module.exports = consumer