Skip to content

Commit

Permalink
feat: topic management
Browse files Browse the repository at this point in the history
  • Loading branch information
fdelbrayelle committed Jun 10, 2020
1 parent e76ba89 commit b1461a6
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 27 deletions.
23 changes: 16 additions & 7 deletions generators/app/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ module.exports = {
writeFiles
};

// This is a default topic naming convention which can be updated (see also application-kafka.yml.ejs)
const topicNamingFormat = (generator, entity) => `queuing.${generator.snakeCaseBaseName}.${_.snakeCase(entity)}`;

function initVariables(generator) {
const vavrVersion = '0.10.3';
generator.addMavenProperty('vavr.version', vavrVersion);
Expand Down Expand Up @@ -47,6 +44,7 @@ function initVariables(generator) {
generator.componentsPrefixes = generator.props.componentsPrefixes || [];
generator.components = generator.props.components;
generator.componentsByEntityConfig = generator.props.componentsByEntityConfig || [];
generator.topics = generator.props.topics;
generator.pollingTimeout = generator.props.pollingTimeout;
generator.autoOffsetResetPolicy = generator.props.autoOffsetResetPolicy;

Expand Down Expand Up @@ -320,6 +318,19 @@ function writeFiles(generator) {
writeProperties(kafkaPreviousConfiguration, kafkaPreviousTestConfiguration, utils.transformToJavaClassNameCase(prefix));
});

if (!kafkaPreviousConfiguration.kafka.topic) {
kafkaPreviousConfiguration.kafka.topic = {};
}

if (!kafkaPreviousTestConfiguration.kafka.topic) {
kafkaPreviousTestConfiguration.kafka.topic = {};
}

generator.topics.forEach(topic => {
kafkaPreviousConfiguration.kafka.topic[topic.key] = topic.value;
kafkaPreviousTestConfiguration.kafka.topic[topic.key] = topic.value;
});

const kafkaProperties = jsYaml.dump(utils.orderKafkaProperties(kafkaPreviousConfiguration), {
lineWidth: -1
});
Expand Down Expand Up @@ -356,7 +367,6 @@ function writeFiles(generator) {

function buildJsonConsumerConfiguration(generator, entity, enabled) {
return {
name: topicNamingFormat(generator, entity),
enabled,
'[key.deserializer]': 'org.apache.kafka.common.serialization.StringDeserializer',
'[value.deserializer]': `${generator.packageName}.service.kafka.deserializer.${entity}Deserializer`,
Expand All @@ -367,16 +377,15 @@ function buildJsonConsumerConfiguration(generator, entity, enabled) {

function buildJsonProducerConfiguration(generator, entity, enabled) {
return {
name: topicNamingFormat(generator, entity),
enabled,
'[key.serializer]': 'org.apache.kafka.common.serialization.StringSerializer',
'[value.serializer]': `${generator.packageName}.service.kafka.serializer.${entity}Serializer`
};
}
function sanitizeProperties(jsyamlGeneratedProperties) {
// related to https://github.com/nodeca/js-yaml/issues/470
// Related to: https://github.com/nodeca/js-yaml/issues/470
const patternContainingSingleQuote = /^(\s.+)(:[ ]+)('((.+:)+.*)')$/gm;
// related to https://github.com/nodeca/js-yaml/issues/478
// Related to: https://github.com/nodeca/js-yaml/issues/478
const patternNullGeneratedValue = /^(\s.+)(:)([ ]+null.*)$/gm;
return jsyamlGeneratedProperties.replace(patternContainingSingleQuote, '$1$2$4').replace(patternNullGeneratedValue, '$1$2');
}
Expand Down
5 changes: 3 additions & 2 deletions generators/app/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ module.exports = class extends BaseGenerator {
defaults: false
});

// init props
// props used for writing
this.props = {
entities: [],
components: [],
componentsByEntityConfig: [],
componentsPrefixes: []
componentsPrefixes: [],
topics: []
};

this.setupClientOptions(this);
Expand Down
98 changes: 92 additions & 6 deletions generators/app/prompts.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ function offsetChoices() {
];
}

function componentChoices() {
function componentsChoices() {
return [
{
name: 'Consumer',
Expand All @@ -56,6 +56,34 @@ function componentChoices() {
];
}

function topicsChoices(generator, previousConfiguration) {
const topicsChoices = [];
topicsChoices.push({
name: 'Default topic name following this convention: message_type.application_type.entity_name',
value: constants.DEFAULT_TOPIC
});
topicsChoices.push({
name: 'Custom topic name',
value: constants.CUSTOM_TOPIC
});

if (previousConfiguration && previousConfiguration.topic) {
Object.entries(previousConfiguration.topic).forEach(topic => {
if (!topicsChoices.some(t => t.name === topic[1])) {
topicsChoices.push({ name: topic[1], value: topic[1] });
}
});
}

generator.props.topics.forEach(topic => {
if (!topic.existingTopicName) {
topicsChoices.push({ name: topic.value, value: topic.value });
}
});

return topicsChoices;
}

/**
* Retrieve from .jhipster metadata, the list of all project entities.
*
Expand Down Expand Up @@ -120,7 +148,7 @@ function askForBigBangOperations(generator, done) {
type: 'checkbox',
name: 'components',
message: 'Which components would you like to generate?',
choices: componentChoices(),
choices: componentsChoices(),
default: [],
validate: input => (_.isEmpty(input) ? 'You have to choose at least one component' : true)
},
Expand Down Expand Up @@ -247,7 +275,7 @@ function askForIncrementalOperations(generator, done) {

function getAvailableComponentsWithoutEntity(generator, previousConfiguration, prefix) {
const availableComponents = [];
const allComponentChoices = componentChoices();
const allComponentChoices = componentsChoices();
const entitiesComponents = utils.extractEntitiesComponents(previousConfiguration);
const prefixJavaClassName = utils.transformToJavaClassNameCase(prefix);
if (generator.props.componentsByEntityConfig) {
Expand All @@ -271,7 +299,7 @@ function getAvailableComponentsWithoutEntity(generator, previousConfiguration, p
function askForUnitaryEntityOperations(generator, done) {
const getConcernedComponents = (previousConfiguration, entityName, currentPrefix) => {
const availableComponents = [];
const allComponentChoices = componentChoices();
const allComponentChoices = componentsChoices();
const entitiesComponents = utils.extractEntitiesComponents(previousConfiguration);

if (entityName === constants.NO_ENTITY) {
Expand Down Expand Up @@ -308,10 +336,40 @@ function askForUnitaryEntityOperations(generator, done) {
when: generator.props.currentEntity,
type: 'checkbox',
name: 'currentEntityComponents',
validate: input => (_.isEmpty(input) ? 'You have to choose at least one component' : true),
message: 'Which components would you like to generate?',
choices: getConcernedComponents(previousConfiguration(generator), generator.props.currentEntity, generator.props.currentPrefix),
default: []
default: [],
validate: input => (_.isEmpty(input) ? 'You have to choose at least one component' : true)
},
{
when: response =>
response.currentEntityComponents.includes(constants.CONSUMER_COMPONENT) ||
response.currentEntityComponents.includes(constants.PRODUCER_COMPONENT),
type: 'list',
name: 'topic',
message: 'For which topic?',
choices: topicsChoices(generator, previousConfiguration(generator)),
default: constants.DEFAULT_TOPIC
},
{
when: response => response.topic === constants.CUSTOM_TOPIC,
type: 'input',
name: 'topicName',
message: 'What is the topic name?',
validate: input => {
if (_.isEmpty(input)) return 'You have to choose a topic name';

const legalChars = /^[A-Za-z0-9._]+$/gm;
if (!input.match(new RegExp(legalChars))) {
return 'You can only use alphanumeric characters, dots and underscores';
}

if (input.length > constants.TOPIC_NAME_MAX_SIZE) {
return `Your topic name cannot exceed ${constants.TOPIC_NAME_MAX_SIZE} characters`;
}

return true;
}
},
{
when: response =>
Expand Down Expand Up @@ -348,19 +406,29 @@ function askForUnitaryEntityOperations(generator, done) {
if (!generator.props.componentsByEntityConfig) {
generator.props.componentsByEntityConfig = [];
}

if (!generator.props.topics) {
generator.props.topics = [];
}

pushTopicName(generator, answers);

if (answers.currentEntityComponents && answers.currentEntityComponents.length > 0) {
if (generator.props.currentEntity === constants.NO_ENTITY) {
pushComponentsByEntity(generator, answers, utils.transformToJavaClassNameCase(generator.props.currentPrefix));
} else {
pushComponentsByEntity(generator, answers, generator.props.currentEntity);
}
}

if (answers.pollingTimeout) {
generator.props.pollingTimeout = +answers.pollingTimeout; // force conversion to int
}

if (answers.autoOffsetResetPolicy) {
generator.props.autoOffsetResetPolicy = answers.autoOffsetResetPolicy;
}

if (answers.continueAddingEntitiesComponents) {
askForIncrementalOperations(generator, done);
} else {
Expand All @@ -380,3 +448,21 @@ function pushComponentsByEntity(generator, answers, entity) {
generator.props.componentsByEntityConfig[entity] = [...answers.currentEntityComponents];
}
}

function pushTopicName(generator, answers) {
const name = generator.props.currentEntity === constants.NO_ENTITY ? generator.props.currentPrefix : generator.props.currentEntity;

if (answers.topic) {
if (answers.topic === constants.CUSTOM_TOPIC) {
generator.props.topics.push({ key: _.camelCase(name), value: answers.topicName, existingTopicName: false });
} else if (answers.topic === constants.DEFAULT_TOPIC) {
generator.props.topics.push({
key: _.camelCase(name),
value: utils.topicNamingFormat(_.snakeCase(generator.jhipsterAppConfig.baseName), _.snakeCase(name)),
existingTopicName: false
});
} else {
generator.props.topics.push({ key: _.camelCase(name), value: answers.topic, existingTopicName: true });
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class <%= entityClass %>Consumer extends GenericConsumer<<%= type %>> {

private final Logger log = LoggerFactory.getLogger(<%= entityClass %>Consumer.class);

public <%= entityClass %>Consumer(@Value("${kafka.consumer.<%= camelCaseEntityClass %>.name}") final String topicName, final KafkaProperties kafkaProperties) {
public <%= entityClass %>Consumer(@Value("${kafka.topic.<%= camelCaseEntityClass %>}") final String topicName, final KafkaProperties kafkaProperties) {
super(topicName, kafkaProperties.getConsumer().get("<%= camelCaseEntityClass %>"), kafkaProperties.getPollingTimeout());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class <%= entityClass %>Producer {

private final String topicName;

public <%= entityClass %>Producer(@Value("${kafka.producer.<%= camelCaseEntityClass %>.name}") final String topicName, final KafkaProperties kafkaProperties) {
public <%= entityClass %>Producer(@Value("${kafka.topic.<%= camelCaseEntityClass %>}") final String topicName, final KafkaProperties kafkaProperties) {
this.topicName = topicName;
this.kafkaProducer = new KafkaProducer<>(kafkaProperties.getProducer().get("<%= camelCaseEntityClass %>"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ kafka:
}
_%>
<%= _.camelCase(entity) _%>:
name: queuing.<%= snakeCaseBaseName %>.<%= _.snakeCase(entity) %>
enabled: <%= enabled %>
'[key.deserializer]': org.apache.kafka.common.serialization.StringDeserializer
'[value.deserializer]': <%= packageName %>.service.kafka.deserializer.<%= _.upperFirst(_.camelCase(entity)) %>Deserializer
Expand All @@ -31,10 +30,19 @@ _%>
}
_%>
<%= _.camelCase(entity) _%>:
name: queuing.<%= snakeCaseBaseName %>.<%= _.snakeCase(entity) %>
enabled: <%= enabled %>
'[key.serializer]': org.apache.kafka.common.serialization.StringSerializer
'[value.serializer]': <%= packageName %>.service.kafka.serializer.<%= _.upperFirst(_.camelCase(entity)) %>Serializer
<%_ } _%>
<%_ } _%>
<%_ if (components.includes('consumer') || components.includes('producer')) { _%>
topic:
<%_ for (entity of entities) {
if (entity === 'no_entity' && componentsPrefixes.length > 0 && componentsPrefixes[0]) {
entity = componentsPrefixes[0];
}
_%>
<%= _.camelCase(entity) _%>: queuing.<%= snakeCaseBaseName %>.<%= _.snakeCase(entity) %>
<%_ } _%>
<%_ } _%>

8 changes: 7 additions & 1 deletion generators/app/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ module.exports = {
getPreviousKafkaConfiguration,
extractEntitiesComponents,
orderKafkaProperties,
transformToJavaClassNameCase
transformToJavaClassNameCase,
topicNamingFormat
};

// This is a default topic naming convention which can be updated (see also application-kafka.yml.ejs)
function topicNamingFormat(baseName, name) {
return `queuing.${baseName}.${name}`;
}

function transformToJavaClassNameCase(entityName) {
return _.upperFirst(_.camelCase(entityName));
}
Expand Down
10 changes: 9 additions & 1 deletion generators/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ const NONE_OFFSET = 'none';
const DEFAULT_POLLING_TIMEOUT = '10000';
const JSON_EXTENSION = '.json';
const EMPTY_STRING = '';
const DEFAULT_TOPIC = 'default_topic';
const CUSTOM_TOPIC = 'custom_topic';

// Related to: https://github.com/apache/kafka/commit/ad3dfc6ab25c3f80d2425e24e72ae732b850dc60
const TOPIC_NAME_MAX_SIZE = 249;

const constants = {
JHIPSTER_CONFIG_DIR,
Expand All @@ -46,7 +51,10 @@ const constants = {
NONE_OFFSET,
DEFAULT_POLLING_TIMEOUT,
JSON_EXTENSION,
EMPTY_STRING
EMPTY_STRING,
DEFAULT_TOPIC,
CUSTOM_TOPIC,
TOPIC_NAME_MAX_SIZE
};

module.exports = constants;
13 changes: 11 additions & 2 deletions test/app.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -846,10 +846,19 @@ function assertMinimalProperties(applicationYml, testApplicationYml, entityName,
entityTestYmlBlock = testApplicationYml.kafka.producer[`${_.camelCase(entityName)}`];
}

assert.textEqual(entityYmlBlock.name, `queuing.message_broker_with_entities.${_.snakeCase(entityName)}`);
assert.textEqual(entityYmlBlock.enabled.toString(), 'true');
assert.textEqual(entityTestYmlBlock.name, `queuing.message_broker_with_entities.${_.snakeCase(entityName)}`);
assert.textEqual(entityTestYmlBlock.enabled.toString(), 'false');

Object.keys(applicationYml.kafka.topic).forEach(key => {
if (key === _.camelCase(entityName)) {
assert.textEqual(applicationYml.kafka.topic[key], `queuing.message_broker_with_entities.${_.snakeCase(entityName)}`);
}
});
Object.keys(testApplicationYml.kafka.topic).forEach(key => {
if (key === _.camelCase(entityName)) {
assert.textEqual(testApplicationYml.kafka.topic[key], `queuing.message_broker_with_entities.${_.snakeCase(entityName)}`);
}
});
}

function assertThatKafkaPropertiesAreOrdered(applicationYml) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class FooConsumer extends GenericConsumer<Foo> {

private final Logger log = LoggerFactory.getLogger(FooConsumer.class);

public FooConsumer(@Value("${kafka.consumer.foo.name}") final String topicName, final KafkaProperties kafkaProperties) {
public FooConsumer(@Value("${kafka.topic.foo}") final String topicName, final KafkaProperties kafkaProperties) {
super(topicName, kafkaProperties.getConsumer().get("foo"), kafkaProperties.getPollingTimeout());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class FooProducer {

private final String topicName;

public FooProducer(@Value("${kafka.producer.foo.name}") final String topicName, final KafkaProperties kafkaProperties) {
public FooProducer(@Value("${kafka.topic.foo}") final String topicName, final KafkaProperties kafkaProperties) {
this.topicName = topicName;
this.kafkaProducer = new KafkaProducer<>(kafkaProperties.getProducer().get("foo"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,19 @@ kafka:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
producer:
foo:
name: queuing.message_broker_with_entities.foo
enabled: true
'[key.serializer]': org.apache.kafka.common.serialization.StringSerializer
'[value.serializer]': com.mycompany.myapp.service.kafka.serializer.FooSerializer
polling.timeout: 10000
consumer:
foo:
name: queuing.message_broker_with_entities.foo
enabled: true
'[key.deserializer]': org.apache.kafka.common.serialization.StringDeserializer
'[value.deserializer]': com.mycompany.myapp.service.kafka.deserializer.FooDeserializer
'[group.id]': message-broker-with-entities
'[auto.offset.reset]': earliest
topic:
foo: queuing.message_broker_with_entities.foo
# ===================================================================
# Application specific properties
# Add your own application properties here, see the ApplicationProperties class
Expand Down

0 comments on commit b1461a6

Please sign in to comment.