Skip to content

Commit

Permalink
feat: topic management
Browse files Browse the repository at this point in the history
  • Loading branch information
fdelbrayelle committed Jun 8, 2020
1 parent 48054b6 commit e57ab8c
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 21 deletions.
14 changes: 9 additions & 5 deletions generators/app/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ module.exports = {
writeFiles
};

// This is a default topic naming convention which can be updated (see also application-kafka.yml.ejs)
const topicNamingFormat = (generator, entity) => `queuing.${generator.snakeCaseBaseName}.${_.snakeCase(entity)}`;

function initVariables(generator) {
const vavrVersion = '0.10.3';
generator.addMavenProperty('vavr.version', vavrVersion);
Expand Down Expand Up @@ -47,6 +44,7 @@ function initVariables(generator) {
generator.componentsPrefixes = generator.props.componentsPrefixes || [];
generator.components = generator.props.components;
generator.componentsByEntityConfig = generator.props.componentsByEntityConfig || [];
generator.topics = generator.props.topics;
generator.pollingTimeout = generator.props.pollingTimeout;
generator.autoOffsetResetPolicy = generator.props.autoOffsetResetPolicy;

Expand Down Expand Up @@ -312,6 +310,14 @@ function writeFiles(generator) {
kafkaPreviousTestConfiguration.kafka['polling.timeout'] = generator.pollingTimeout;
}

if (!kafkaPreviousConfiguration.kafka.topic) {
kafkaPreviousConfiguration.kafka.topic = [];
}

generator.topics.forEach(topic => {
kafkaPreviousConfiguration.kafka.topic[topic.key] = topic.value;
});

generator.entities.forEach(entity => {
writeProperties(kafkaPreviousConfiguration, kafkaPreviousTestConfiguration, entity);
});
Expand Down Expand Up @@ -356,7 +362,6 @@ function writeFiles(generator) {

function buildJsonConsumerConfiguration(generator, entity, enabled) {
return {
name: topicNamingFormat(generator, entity),
enabled,
'[key.deserializer]': 'org.apache.kafka.common.serialization.StringDeserializer',
'[value.deserializer]': `${generator.packageName}.service.kafka.deserializer.${entity}Deserializer`,
Expand All @@ -367,7 +372,6 @@ function buildJsonConsumerConfiguration(generator, entity, enabled) {

function buildJsonProducerConfiguration(generator, entity, enabled) {
return {
name: topicNamingFormat(generator, entity),
enabled,
'[key.serializer]': 'org.apache.kafka.common.serialization.StringSerializer',
'[value.serializer]': `${generator.packageName}.service.kafka.serializer.${entity}Serializer`
Expand Down
5 changes: 3 additions & 2 deletions generators/app/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ module.exports = class extends BaseGenerator {
defaults: false
});

// init props
// props used for writing
this.props = {
entities: [],
components: [],
componentsByEntityConfig: [],
componentsPrefixes: []
componentsPrefixes: [],
topics: []
};

this.setupClientOptions(this);
Expand Down
73 changes: 67 additions & 6 deletions generators/app/prompts.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ function offsetChoices() {
];
}

function componentChoices() {
function componentsChoices() {
return [
{
name: 'Consumer',
Expand All @@ -56,6 +56,26 @@ function componentChoices() {
];
}

function topicsChoices(generator, mode, previousConfiguration) {
const topicsChoices = [];
topicsChoices.push({
name: 'Default topic name following this convention: message_type.application_type.entity_name',
value: constants.DEFAULT_TOPIC
});
topicsChoices.push({
name: 'Custom topic name',
value: constants.CUSTOM_TOPIC
});

if (previousConfiguration && previousConfiguration.topics) {
Object.entries(previousConfiguration.topics).forEach(topic => {
topicsChoices.push({ name: topic[1], value: topic[0] });
});
}

return topicsChoices;
}

/**
* Retrieve from .jhipster metadata, the list of all project entities.
*
Expand Down Expand Up @@ -120,7 +140,7 @@ function askForBigBangOperations(generator, done) {
type: 'checkbox',
name: 'components',
message: 'Which components would you like to generate?',
choices: componentChoices(),
choices: componentsChoices(),
default: [],
validate: input => (_.isEmpty(input) ? 'You have to choose at least one component' : true)
},
Expand Down Expand Up @@ -247,7 +267,7 @@ function askForIncrementalOperations(generator, done) {

function getAvailableComponentsWithoutEntity(generator, previousConfiguration, prefix) {
const availableComponents = [];
const allComponentChoices = componentChoices();
const allComponentChoices = componentsChoices();
const entitiesComponents = utils.extractEntitiesComponents(previousConfiguration);
const prefixJavaClassName = utils.transformToJavaClassNameCase(prefix);
if (generator.props.componentsByEntityConfig) {
Expand All @@ -271,7 +291,7 @@ function getAvailableComponentsWithoutEntity(generator, previousConfiguration, p
function askForUnitaryEntityOperations(generator, done) {
const getConcernedComponents = (previousConfiguration, entityName, currentPrefix) => {
const availableComponents = [];
const allComponentChoices = componentChoices();
const allComponentChoices = componentsChoices();
const entitiesComponents = utils.extractEntitiesComponents(previousConfiguration);

if (entityName === constants.NO_ENTITY) {
Expand Down Expand Up @@ -308,10 +328,27 @@ function askForUnitaryEntityOperations(generator, done) {
when: generator.props.currentEntity,
type: 'checkbox',
name: 'currentEntityComponents',
validate: input => (_.isEmpty(input) ? 'You have to choose at least one component' : true),
message: 'Which components would you like to generate?',
choices: getConcernedComponents(previousConfiguration(generator), generator.props.currentEntity, generator.props.currentPrefix),
default: []
default: [],
validate: input => (_.isEmpty(input) ? 'You have to choose at least one component' : true)
},
{
when: response =>
response.currentEntityComponents.includes(constants.CONSUMER_COMPONENT) ||
response.currentEntityComponents.includes(constants.PRODUCER_COMPONENT),
type: 'list',
name: 'topic',
message: 'For which topic?',
choices: topicsChoices(generator, previousConfiguration(generator)),
default: constants.DEFAULT_TOPIC
},
{
when: response => response.topic === constants.CUSTOM_TOPIC,
type: 'input',
name: 'topicName',
message: 'What is the topic name?',
validate: input => (_.isEmpty(input) ? 'You have to choose a topic name' : true)
},
{
when: response =>
Expand Down Expand Up @@ -348,19 +385,29 @@ function askForUnitaryEntityOperations(generator, done) {
if (!generator.props.componentsByEntityConfig) {
generator.props.componentsByEntityConfig = [];
}

if (!generator.props.topics) {
generator.props.topics = [];
}

pushTopicName(generator, answers);

if (answers.currentEntityComponents && answers.currentEntityComponents.length > 0) {
if (generator.props.currentEntity === constants.NO_ENTITY) {
pushComponentsByEntity(generator, answers, utils.transformToJavaClassNameCase(generator.props.currentPrefix));
} else {
pushComponentsByEntity(generator, answers, generator.props.currentEntity);
}
}

if (answers.pollingTimeout) {
generator.props.pollingTimeout = +answers.pollingTimeout; // force conversion to int
}

if (answers.autoOffsetResetPolicy) {
generator.props.autoOffsetResetPolicy = answers.autoOffsetResetPolicy;
}

if (answers.continueAddingEntitiesComponents) {
askForIncrementalOperations(generator, done);
} else {
Expand All @@ -380,3 +427,17 @@ function pushComponentsByEntity(generator, answers, entity) {
generator.props.componentsByEntityConfig[entity] = [...answers.currentEntityComponents];
}
}

function pushTopicName(generator, answers) {
const name = generator.props.currentEntity === constants.NO_ENTITY ? generator.props.currentPrefix : generator.props.currentEntity;

if (answers.topic) {
if (answers.topic === constants.CUSTOM_TOPIC) {
generator.props.topics.push({ key: name, value: answers.topicName });
} else if (answers.topic === constants.DEFAULT_TOPIC) {
generator.props.topics.push({ key: name, value: utils.topicNamingFormat(generator, name) });
} else {
generator.props.topics.push({ key: name, value: answers.topic });
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class <%= entityClass %>Consumer extends GenericConsumer<<%= type %>> {

private final Logger log = LoggerFactory.getLogger(<%= entityClass %>Consumer.class);

public <%= entityClass %>Consumer(@Value("${kafka.consumer.<%= camelCaseEntityClass %>.name}") final String topicName, final KafkaProperties kafkaProperties) {
public <%= entityClass %>Consumer(@Value("${kafka.topic.<%= camelCaseEntityClass %>}") final String topicName, final KafkaProperties kafkaProperties) {
super(topicName, kafkaProperties.getConsumer().get("<%= camelCaseEntityClass %>"), kafkaProperties.getPollingTimeout());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class <%= entityClass %>Producer {

private final String topicName;

public <%= entityClass %>Producer(@Value("${kafka.producer.<%= camelCaseEntityClass %>.name}") final String topicName, final KafkaProperties kafkaProperties) {
public <%= entityClass %>Producer(@Value("${kafka.topic.<%= camelCaseEntityClass %>}") final String topicName, final KafkaProperties kafkaProperties) {
this.topicName = topicName;
this.kafkaProducer = new KafkaProducer<>(kafkaProperties.getProducer().get("<%= camelCaseEntityClass %>"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ kafka:
}
_%>
<%= _.camelCase(entity) _%>:
name: queuing.<%= snakeCaseBaseName %>.<%= _.snakeCase(entity) %>
enabled: <%= enabled %>
'[key.deserializer]': org.apache.kafka.common.serialization.StringDeserializer
'[value.deserializer]': <%= packageName %>.service.kafka.deserializer.<%= _.upperFirst(_.camelCase(entity)) %>Deserializer
Expand All @@ -31,10 +30,19 @@ _%>
}
_%>
<%= _.camelCase(entity) _%>:
name: queuing.<%= snakeCaseBaseName %>.<%= _.snakeCase(entity) %>
enabled: <%= enabled %>
'[key.serializer]': org.apache.kafka.common.serialization.StringSerializer
'[value.serializer]': <%= packageName %>.service.kafka.serializer.<%= _.upperFirst(_.camelCase(entity)) %>Serializer
<%_ } _%>
<%_ } _%>
<%_ if (components.includes('consumer') || components.includes('producer')) { _%>
topic:
<%_ for (entity of entities) {
if (entity === 'no_entity' && componentsPrefixes.length > 0 && componentsPrefixes[0]) {
entity = componentsPrefixes[0];
}
_%>
<%= _.camelCase(entity) _%>: queuing.<%= snakeCaseBaseName %>.<%= _.snakeCase(entity) %>
<%_ } _%>
<%_ } _%>

8 changes: 7 additions & 1 deletion generators/app/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ module.exports = {
getPreviousKafkaConfiguration,
extractEntitiesComponents,
orderKafkaProperties,
transformToJavaClassNameCase
transformToJavaClassNameCase,
topicNamingFormat
};

// This is a default topic naming convention which can be updated (see also application-kafka.yml.ejs)
function topicNamingFormat(generator, entity) {
return `queuing.${generator.snakeCaseBaseName}.${_.snakeCase(entity)}`;
}

function transformToJavaClassNameCase(entityName) {
return _.upperFirst(_.camelCase(entityName));
}
Expand Down
6 changes: 5 additions & 1 deletion generators/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const NONE_OFFSET = 'none';
const DEFAULT_POLLING_TIMEOUT = '10000';
const JSON_EXTENSION = '.json';
const EMPTY_STRING = '';
const DEFAULT_TOPIC = 'default_topic';
const CUSTOM_TOPIC = 'custom_topic';

const constants = {
JHIPSTER_CONFIG_DIR,
Expand All @@ -46,7 +48,9 @@ const constants = {
NONE_OFFSET,
DEFAULT_POLLING_TIMEOUT,
JSON_EXTENSION,
EMPTY_STRING
EMPTY_STRING,
DEFAULT_TOPIC,
CUSTOM_TOPIC
};

module.exports = constants;
2 changes: 0 additions & 2 deletions test/app.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -846,9 +846,7 @@ function assertMinimalProperties(applicationYml, testApplicationYml, entityName,
entityTestYmlBlock = testApplicationYml.kafka.producer[`${_.camelCase(entityName)}`];
}

assert.textEqual(entityYmlBlock.name, `queuing.message_broker_with_entities.${_.snakeCase(entityName)}`);
assert.textEqual(entityYmlBlock.enabled.toString(), 'true');
assert.textEqual(entityTestYmlBlock.name, `queuing.message_broker_with_entities.${_.snakeCase(entityName)}`);
assert.textEqual(entityTestYmlBlock.enabled.toString(), 'false');
}

Expand Down

0 comments on commit e57ab8c

Please sign in to comment.