Skip to content

Commit

Permalink
feat: ordered messages production
Browse files Browse the repository at this point in the history
  • Loading branch information
fdelbrayelle committed Jun 19, 2020
1 parent c46090b commit 10b8cf0
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 18 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ The **END** represents the end of the prompts below, when files are written afte
<li>Producer</li>
</ul>
</li>
<li>For which topic?
<li>Which topic for (entity/prefix)?
<ul>
<li>Default topic name following this convention: message_type.application_type.entity_name <strong>(default)</strong></li>
<li>Custom topic name
<ul>
<li>What is the topic name?</li>
<li>What is the topic name for (entity/prefix)?</li>
</ul>
</li>
<li>queuing.application_name.existing_topic_name</li>
Expand All @@ -157,6 +157,7 @@ The **END** represents the end of the prompts below, when files are written afte
<li>none (throw exception to the consumer if no previous offset is found for the consumer group)</li>
</ul>
</li>
<li><em>If "Producer" was selected:</em> Do you want to send ordered messages for (entity/prefix) production? <strong>(default = Y)</strong></li>
<li>Do you want to continue adding consumers or producers? <strong>(default = N)</strong></li>
<li><em>If "N" was selected:</em> <strong>END</strong></li>
</ul>
Expand Down
22 changes: 12 additions & 10 deletions USE_CASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ After following the first 3 steps of the [basic usage](README.md#basic-usage) ab
3. "Do you want to clean up your current Kafka configuration?" - Your answer or 'n' by default
4. "For which entity (class name)?" - Foo (the available entities are retrieved in the `.jhipster` folder as `.json`)
5. "Which components would you like to generate?" - Consumer
6. "For which topic?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name?")
6. "Which topic for Foo?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name for Foo?")
7. "What is the consumer polling timeout (in ms)?" - Your answer or '10000' by default (global for all consumers)
8. "Define the auto offset reset policy?" - Your answer or 'earliest' by default (global for all consumers)
9. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
Expand All @@ -39,10 +39,11 @@ After following the first 3 steps of the [basic usage](README.md#basic-usage) ab
3. "Do you want to clean up your current Kafka configuration?" - Your answer or 'n' by default
4. "For which entity (class name)?" - Foo (the available entities are retrieved in the `.jhipster` folder as `.json`)
5. "Which components would you like to generate?" - Producer
6. "For which topic?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name?")
7. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
8. Overwrite all files in conflict
9. `FooProducer` (produces `Foo`) is available with a `FooSerializer` and a `FooKafkaResource` to [help testing](README.md#test-consumers-and-producers)
6. "Which topic for Foo?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name for Foo?")
7. "Do you want to send ordered messages for (entity/prefix) production?" - Your answer or 'y' by default
8. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
9. Overwrite all files in conflict
10. `FooProducer` (produces `Foo`) is available with a `FooSerializer` and a `FooKafkaResource` to [help testing](README.md#test-consumers-and-producers)

### Create a consumer NOT linked to an entity

Expand All @@ -54,7 +55,7 @@ After following the first 3 steps of the [basic usage](README.md#basic-usage) ab
4. "For which entity (class name)?" - No entity (will be typed String)
5. "How would you prefix your objects (no entity, for instance: [SomeEventType]Consumer|Producer...)?" - someEventType
6. "Which components would you like to generate?" - Consumer
7. "For which topic?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name?")
7. "Which topic for someEventType?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name for someEventType?")
8. "What is the consumer polling timeout (in ms)?" - Your answer or '10000' by default (global for all consumers)
9. "Define the auto offset reset policy?" - Your answer or 'earliest' by default (global for all consumers)
10. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
Expand All @@ -71,7 +72,8 @@ After following the first 3 steps of the [basic usage](README.md#basic-usage) ab
4. "For which entity (class name)?" - No entity (will be typed String)
5. "How would you prefix your objects (no entity, for instance: [SomeEventType]Consumer|Producer...)?" - someEventType
6. "Which components would you like to generate?" - Producer
7. "For which topic?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name?")
8. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
9. Overwrite all files in conflict
10. `SomeEventTypeProducer` (produces `String`) is available with a `SomeEventTypeSerializer` and a `SomeEventTypeKafkaResource` to [help testing](README.md#test-consumers-and-producers)
7. "Which topic for someEventType?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name for someEventType?")
8. "Do you want to send ordered messages for (entity/prefix) production?" - Your answer or 'y' by default
9. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
10. Overwrite all files in conflict
11. `SomeEventTypeProducer` (produces `String`) is available with a `SomeEventTypeSerializer` and a `SomeEventTypeKafkaResource` to [help testing](README.md#test-consumers-and-producers)
3 changes: 3 additions & 0 deletions generators/app/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ function initVariables(generator) {
generator.topics = generator.props.topics;
generator.pollingTimeout = generator.props.pollingTimeout;
generator.autoOffsetResetPolicy = generator.props.autoOffsetResetPolicy;
generator.entitiesOrder = generator.props.entitiesOrder;

// show all variables
generator.log('\n--- some config read from config ---');
Expand Down Expand Up @@ -152,6 +153,8 @@ function writeFiles(generator) {
generator.entityClass = entity;
generator.camelCaseEntityClass = _.camelCase(entity);
generator.type = useEntityAsType ? entity : 'String';
generator.entityOrdered = generator.props.entitiesOrder[entity];

generateSerdeFiles(generator, entity, useEntityAsType);

if (haveComponentForEntity(constants.CONSUMER_COMPONENT, entity)) {
Expand Down
3 changes: 2 additions & 1 deletion generators/app/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ module.exports = class extends BaseGenerator {
componentsByEntityConfig: [],
componentsPrefixes: [],
topics: [],
cleanup: false || this.options['skip-prompts']
cleanup: false || this.options['skip-prompts'],
entitiesOrder: []
};

this.setupClientOptions(this);
Expand Down
37 changes: 33 additions & 4 deletions generators/app/prompts.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ function askForEntityComponentsOperations(generator, done) {
return availableComponents;
};

let name = generator.props.currentEntity;
if (generator.props.currentEntity === constants.NO_ENTITY) {
name = generator.props.currentPrefix;
}

const unitaryEntityPrompt = [
{
when: generator.props.currentEntity,
Expand All @@ -271,15 +276,15 @@ function askForEntityComponentsOperations(generator, done) {
response.currentEntityComponents.includes(constants.PRODUCER_COMPONENT),
type: 'list',
name: 'topic',
message: 'For which topic?',
message: `Which topic for ${name}?`,
choices: topicsChoices(generator, previousConfiguration(generator, generator.props.cleanup)),
default: constants.DEFAULT_TOPIC
},
{
when: response => response.topic === constants.CUSTOM_TOPIC,
type: 'input',
name: 'topicName',
message: 'What is the topic name?',
message: `What is the topic name for ${name}?`,
validate: input => validateTopic(input)
},
{
Expand All @@ -304,6 +309,13 @@ function askForEntityComponentsOperations(generator, done) {
choices: offsetChoices(),
default: constants.EARLIEST_OFFSET
},
{
when: response => response.currentEntityComponents.includes(constants.PRODUCER_COMPONENT),
type: 'confirm',
name: 'sendOrderedMessages',
message: `Do you want to send ordered messages for ${name} production?`,
default: true
},
{
type: 'confirm',
name: 'continueAddingEntitiesComponents',
Expand All @@ -322,8 +334,6 @@ function askForEntityComponentsOperations(generator, done) {
generator.props.topics = [];
}

pushTopicName(generator, answers.topic, answers.topicName);

if (answers.currentEntityComponents && answers.currentEntityComponents.length > 0) {
if (generator.props.currentEntity === constants.NO_ENTITY) {
pushComponentsByEntity(
Expand All @@ -336,6 +346,8 @@ function askForEntityComponentsOperations(generator, done) {
}
}

pushTopicName(generator, answers.topic, answers.topicName);

if (answers.pollingTimeout) {
generator.props.pollingTimeout = +answers.pollingTimeout; // force conversion to int
}
Expand All @@ -344,6 +356,18 @@ function askForEntityComponentsOperations(generator, done) {
generator.props.autoOffsetResetPolicy = answers.autoOffsetResetPolicy;
}

if (answers.sendOrderedMessages) {
if (generator.props.currentEntity === constants.NO_ENTITY) {
pushEntitiesOrder(
generator,
utils.transformToJavaClassNameCase(generator.props.currentPrefix),
answers.sendOrderedMessages
);
} else {
pushEntitiesOrder(generator, generator.props.currentEntity, answers.sendOrderedMessages);
}
}

if (answers.continueAddingEntitiesComponents) {
askForEntityOperations(generator, done);
} else {
Expand All @@ -355,6 +379,11 @@ function askForEntityComponentsOperations(generator, done) {
});
}

function pushEntitiesOrder(generator, name, sendOrderedMessages) {
generator.props.entitiesOrder.push(name);
generator.props.entitiesOrder[name] = sendOrderedMessages;
}

function pushComponentsByEntity(generator, currentEntityComponents, entity) {
generator.props.componentsByEntityConfig.push(entity);
if (generator.props.componentsByEntityConfig[entity]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ import <%= packageName %>.domain.<%= entityClass %>;
@Service
public class <%= entityClass %>Producer {

<%_ if (entityOrdered) { _%>
enum <%= entityClass %>ProducerKey {
<%= _.toUpper(entityClass) %>_CREATE, <%= _.toUpper(entityClass) %>_READ, <%= _.toUpper(entityClass) %>_UPDATE, <%= _.toUpper(entityClass) %>_DELETE
}
<%_ } _%>
private final Logger log = LoggerFactory.getLogger(<%= entityClass %>Producer.class);

private final KafkaProducer<String, <%= type %>> kafkaProducer;
Expand All @@ -51,7 +57,11 @@ public class <%= entityClass %>Producer {
}

public void send(final <%= type %> message) {
<%_ if (entityOrdered) { _%>
final ProducerRecord<String, <%= type %>> record = new ProducerRecord<>(topicName, <%= entityClass %>ProducerKey.<%= _.toUpper(entityClass) %>_READ.toString(), message);
<%_ } else { _%>
final ProducerRecord<String, <%= type %>> record = new ProducerRecord<>(topicName, message);
<%_ } _%>
try {
log.info("Sending asynchronously a <%= type %> record to topic: '" + topicName + "'");
kafkaProducer.send(record);
Expand Down
34 changes: 33 additions & 1 deletion test/app.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ describe('JHipster generator kafka', () => {
});
});

describe('with only a producer for a single entity', () => {
describe('with only a producer for a single entity (ordered)', () => {
before(done => {
helpers
.run(path.join(__dirname, '../generators/app'))
Expand All @@ -84,6 +84,7 @@ describe('JHipster generator kafka', () => {
.withPrompts({
currentEntity: FOO_ENTITY,
currentEntityComponents: [constants.PRODUCER_COMPONENT],
sendOrderedMessages: true,
continueAddingEntitiesComponents: false
})
.on('end', done);
Expand Down Expand Up @@ -129,6 +130,37 @@ describe('JHipster generator kafka', () => {
const entityTestYmlConsumerBlock = testApplicationYml.kafka.consumer;
assert.strictEqual(entityTestYmlConsumerBlock, undefined);
});

it('should send ordered messages', () => {
assert.fileContent(
`${jhipsterConstants.SERVER_MAIN_SRC_DIR}com/mycompany/myapp/service/kafka/producer/${FOO_ENTITY}Producer.java`,
new RegExp(`enum ${FOO_ENTITY}ProducerKey`, 'g')
);
});
});

describe('with only a producer for a single entity (not ordered)', () => {
before(done => {
helpers
.run(path.join(__dirname, '../generators/app'))
.inTmpDir(dir => {
fse.copySync(path.join(__dirname, '../test/templates/message-broker-with-entities-1st-call'), dir);
})
.withPrompts({
currentEntity: FOO_ENTITY,
currentEntityComponents: [constants.PRODUCER_COMPONENT],
sendOrderedMessages: false,
continueAddingEntitiesComponents: false
})
.on('end', done);
});

it('should not send ordered messages', () => {
assert.noFileContent(
`${jhipsterConstants.SERVER_MAIN_SRC_DIR}com/mycompany/myapp/service/kafka/producer/${FOO_ENTITY}Producer.java`,
new RegExp(`enum ${FOO_ENTITY}ProducerKey`, 'g')
);
});
});

describe('with only a consumer for a single entity', () => {
Expand Down

0 comments on commit 10b8cf0

Please sign in to comment.