Skip to content

Commit

Permalink
feat: topic management
Browse files Browse the repository at this point in the history
  • Loading branch information
fdelbrayelle committed Jun 11, 2020
1 parent e76ba89 commit 659d4ed
Show file tree
Hide file tree
Showing 16 changed files with 338 additions and 126 deletions.
159 changes: 79 additions & 80 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
- [x] Basic [Consumer/Producer API use](#consumer-api-and-producer-api)
- [x] Several [prompt options](#prompt-options-tree) (`polling.timeout`, `auto.offset.reset.policy`)
- [x] [AKHQ (KafkaHQ)](#akhq) support
- [x] Topic management

## 🛠 To do or doing...

You can have more details about work in progress in [issues](https://github.com/fdelbrayelle/generator-jhipster-kafka/issues):

- [ ] Topic management
- [ ] Producer API (ordered messages, high throughput...)
- [ ] Deserialization alternatives (JacksonSerde) as a prompt option
- [ ] Security (SSL protocol as a prompt option, safe mode...)
Expand Down Expand Up @@ -112,85 +112,84 @@ Choose your own adventure module!

The **END** represents the end of the prompts below, when files are written after confirmation (you can use the `--force` option with `yo jhipster-kafka` to overwrite all files).

```
.
├── Big Bang Mode (build a configuration from scratch) (default)
│ ├── Consumer
│ │ ├── No entity (will be typed String) (default)
│ │ │ └── componentPrefix
│ │ │ └── pollingTimeoutValue (default = 10000)
│ │ │ ├── earliest (automatically reset the offset to the earliest offset) (default)
│ │ │ │ └── END
│ │ │ ├── latest (automatically reset the offset to the latest offset)
│ │ │ │ └── END
│ │ │ └── none (throw exception to the consumer if no previous offset is found for the consumer group)
│ │ │ └── END
│ │ ├── FooEntity
│ │ │ └── pollingTimeoutValue (default = 10000)
│ │ │ ├── earliest (automatically reset the offset to the earliest offset) (default)
│ │ │ │ └── END
│ │ │ ├── latest (automatically reset the offset to the latest offset)
│ │ │ │ └── END
│ │ │ └── none (throw exception to the consumer if no previous offset is found for the consumer group)
│ │ │ └── END
│ │ └── BarEntity
│ │ └── pollingTimeoutValue (default = 10000)
│ │ ├── earliest (automatically reset the offset to the earliest offset) (default)
│ │ ├── latest (automatically reset the offset to the latest offset)
│ │ └── none (throw exception to the consumer if no previous offset is found for the consumer group)
│ └── Producer
│ ├── No entity (will be typed String)
│ │ └── componentPrefix
│ │ └── END
│ ├── FooEntity
│ │ └── END
│ └── BarEntity
│ └── END
└── Incremental Mode (upgrade an existing configuration)
├── No entity (will be typed String) (default)
│ └── componentPrefix
│ ├── Consumer
│ │ └── pollingTimeoutValue (default = 10000)
│ │ ├── earliest (automatically reset the offset to the earliest offset) (default)
│ │ │ ├── Continue adding consumers or producers (default = N)
│ │ │ └── END
│ │ ├── latest (automatically reset the offset to the latest offset)
│ │ │ ├── Continue adding consumers or producers (default = N)
│ │ │ └── END
│ │ └── none (throw exception to the consumer if no previous offset is found for the consumer group)
│ │ ├── Continue adding consumers or producers (default = N)
│ │ └── END
│ └── Producer
│ └── END
├── FooEntity
│ ├── Consumer
│ │ └── pollingTimeoutValue (default = 10000)
│ │ ├── earliest (automatically reset the offset to the earliest offset) (default)
│ │ │ ├── Continue adding consumers or producers (default = N)
│ │ │ └── END
│ │ ├── latest (automatically reset the offset to the latest offset)
│ │ │ ├── Continue adding consumers or producers (default = N)
│ │ │ └── END
│ │ └── none (throw exception to the consumer if no previous offset is found for the consumer group)
│ │ ├── Continue adding consumers or producers (default = N)
│ │ └── END
│ └── Producer
│ └── END
└── BarEntity
├── Consumer
│ └── pollingTimeoutValue (default = 10000)
│ ├── earliest (automatically reset the offset to the earliest offset) (default)
│ │ ├── Continue adding consumers or producers (default = N)
│ │ └── END
│ ├── latest (automatically reset the offset to the latest offset)
│ │ ├── Continue adding consumers or producers (default = N)
│ │ └── END
│ └── none (throw exception to the consumer if no previous offset is found for the consumer group)
│ ├── Continue adding consumers or producers (default = N)
│ └── END
└── Producer
└── END
```
<ul>
<li>Big Bang Mode (build a configuration from scratch) <strong>(default)</strong>
<ul>
<li>Which components would you like to generate?
<ul>
<li>Consumer</li>
<li>Producer</li>
</ul>
</li>
<li>For which entity (class name)?
<ul>
<li>No entity (will be typed String) <strong>(default)</strong>
<ul>
<li>How would you prefix your objects (no entity, for instance: [SomeEventType]Consumer|Producer...)?</li>
</ul>
</li>
<li>Foo</li>
<li>Bar</li>
<li>...</li>
</ul>
</li>
<li><em>If "Consumer" was selected:</em> What is the consumer polling timeout (in ms)? <strong>(default = 10000)</strong></li>
<li><em>If "Consumer" was selected:</em> Define the auto offset reset policy (what to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server)?
<ul>
<li>earliest (automatically reset the offset to the earliest offset) <strong>(default)</strong></li>
<li>latest (automatically reset the offset to the latest offset)</li>
<li>none (throw exception to the consumer if no previous offset is found for the consumer group)</li>
</ul>
</li>
<li><strong>END</strong></li>
</ul>
</li>
<li>Incremental Mode (upgrade an existing configuration)
<ul>
<li>For which entity (class name)?
<ul>
<li>No entity (will be typed String) <strong>(default)</strong>
<ul>
<li>How would you prefix your objects (no entity, for instance: [SomeEventType]Consumer|Producer...)?</li>
</ul>
</li>
<li>Foo</li>
<li>Bar</li>
<li>...</li>
</ul>
</li>
<li>Which components would you like to generate?
<ul>
<li>Consumer</li>
<li>Producer</li>
</ul>
</li>
<li>For which topic?
<ul>
<li>Default topic name following this convention: message_type.application_type.entity_name <strong>(default)</strong></li>
<li>Custom topic name
<ul>
<li>What is the topic name?</li>
</ul>
</li>
<li>queuing.application_name.existing_topic_name</li>
<li>...</li>
</ul>
</li>
<li><em>If "Consumer" was selected:</em> What is the consumer polling timeout (in ms)? <strong>(default = 10000)</strong></li>
<li><em>If "Consumer" was selected:</em> Define the auto offset reset policy (what to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server)?
<ul>
<li>earliest (automatically reset the offset to the earliest offset) <strong>(default)</strong></li>
<li>latest (automatically reset the offset to the latest offset)</li>
<li>none (throw exception to the consumer if no previous offset is found for the consumer group)</li>
</ul>
</li>
<li>Do you want to continue adding consumers or producers? <strong>(default = N)</strong></li>
<li><em>If "N" was selected:</em> <strong>END</strong></li>
</ul>
</li>
</ul>
</code>

## Skip prompts

Expand Down
36 changes: 20 additions & 16 deletions USE_CASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ After following the first 3 steps of the [basic usage](README.md#basic-usage) ab
3. "Which type of generation do you want?" - Incremental Mode (upgrade an existing configuration)
4. "For which entity (class name)?" - Foo (the available entities are retrieved in the `.jhipster` folder as `.json`)
5. "Which components would you like to generate?" - Consumer
6. "What is the consumer polling timeout (in ms)?" - Your answer or '10000' by default (global for all consumers)
7. "Define the auto offset reset policy?" - Your answer or 'earliest' by default (global for all consumers)
8. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
9. Overwrite all files in conflict
10. `FooConsumer` (consumes `Foo`) is available with a `FooDeserializer`
6. "For which topic?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name?")
7. "What is the consumer polling timeout (in ms)?" - Your answer or '10000' by default (global for all consumers)
8. "Define the auto offset reset policy?" - Your answer or 'earliest' by default (global for all consumers)
9. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
10. Overwrite all files in conflict
11. `FooConsumer` (consumes `Foo`) is available with a `FooDeserializer`

### Create a producer linked to an entity

Expand All @@ -66,9 +67,10 @@ After following the first 3 steps of the [basic usage](README.md#basic-usage) ab
3. "Which type of generation do you want?" - Incremental Mode (upgrade an existing configuration)
4. "For which entity (class name)?" - Foo (the available entities are retrieved in the `.jhipster` folder as `.json`)
5. "Which components would you like to generate?" - Producer
6. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
7. Overwrite all files in conflict
8. `FooProducer` (produces `Foo`) is available with a `FooSerializer` and a `FooKafkaResource` to [help testing](README.md#test-consumers-and-producers)
6. "For which topic?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name?")
7. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
8. Overwrite all files in conflict
9. `FooProducer` (produces `Foo`) is available with a `FooSerializer` and a `FooKafkaResource` to [help testing](README.md#test-consumers-and-producers)

### Create a consumer NOT linked to an entity

Expand All @@ -95,11 +97,12 @@ After following the first 3 steps of the [basic usage](README.md#basic-usage) ab
4. "For which entity (class name)?" - No entity (will be typed String)
5. "How would you prefix your objects (no entity, for instance: [SomeEventType]Consumer|Producer...)?" - someEventType
6. "Which components would you like to generate?" - Consumer
7. "What is the consumer polling timeout (in ms)?" - Your answer or '10000' by default (global for all consumers)
8. "Define the auto offset reset policy?" - Your answer or 'earliest' by default (global for all consumers)
9. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
10. Overwrite all files in conflict
11. `SomeEventTypeConsumer` (consumes `String`) is available with a `SomeEventTypeDeserializer`
7. "For which topic?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name?")
8. "What is the consumer polling timeout (in ms)?" - Your answer or '10000' by default (global for all consumers)
9. "Define the auto offset reset policy?" - Your answer or 'earliest' by default (global for all consumers)
10. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
11. Overwrite all files in conflict
12. `SomeEventTypeConsumer` (consumes `String`) is available with a `SomeEventTypeDeserializer`

### Create a producer NOT linked to an entity

Expand All @@ -124,6 +127,7 @@ After following the first 3 steps of the [basic usage](README.md#basic-usage) ab
4. "For which entity (class name)?" - No entity (will be typed String)
5. "How would you prefix your objects (no entity, for instance: [SomeEventType]Consumer|Producer...)?" - someEventType
6. "Which components would you like to generate?" - Producer
7. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
8. Overwrite all files in conflict
9. `SomeEventTypeProducer` (produces `String`) is available with a `SomeEventTypeSerializer` and a `SomeEventTypeKafkaResource` to [help testing](README.md#test-consumers-and-producers)
7. "For which topic?" - Any choice (choosing "Custom topic name" will add another question "What is the topic name?")
8. "Do you want to continue adding consumers or producers?" - Your answer or 'N' par default
9. Overwrite all files in conflict
10. `SomeEventTypeProducer` (produces `String`) is available with a `SomeEventTypeSerializer` and a `SomeEventTypeKafkaResource` to [help testing](README.md#test-consumers-and-producers)
25 changes: 17 additions & 8 deletions generators/app/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ module.exports = {
writeFiles
};

// This is a default topic naming convention which can be updated (see also application-kafka.yml.ejs)
const topicNamingFormat = (generator, entity) => `queuing.${generator.snakeCaseBaseName}.${_.snakeCase(entity)}`;

function initVariables(generator) {
const vavrVersion = '0.10.3';
generator.addMavenProperty('vavr.version', vavrVersion);
Expand Down Expand Up @@ -47,6 +44,7 @@ function initVariables(generator) {
generator.componentsPrefixes = generator.props.componentsPrefixes || [];
generator.components = generator.props.components;
generator.componentsByEntityConfig = generator.props.componentsByEntityConfig || [];
generator.topics = generator.props.topics;
generator.pollingTimeout = generator.props.pollingTimeout;
generator.autoOffsetResetPolicy = generator.props.autoOffsetResetPolicy;

Expand Down Expand Up @@ -320,6 +318,19 @@ function writeFiles(generator) {
writeProperties(kafkaPreviousConfiguration, kafkaPreviousTestConfiguration, utils.transformToJavaClassNameCase(prefix));
});

if (!kafkaPreviousConfiguration.kafka.topic) {
kafkaPreviousConfiguration.kafka.topic = {};
}

if (!kafkaPreviousTestConfiguration.kafka.topic) {
kafkaPreviousTestConfiguration.kafka.topic = {};
}

generator.topics.forEach(topic => {
kafkaPreviousConfiguration.kafka.topic[topic.key] = topic.value;
kafkaPreviousTestConfiguration.kafka.topic[topic.key] = topic.value;
});

const kafkaProperties = jsYaml.dump(utils.orderKafkaProperties(kafkaPreviousConfiguration), {
lineWidth: -1
});
Expand Down Expand Up @@ -356,7 +367,6 @@ function writeFiles(generator) {

function buildJsonConsumerConfiguration(generator, entity, enabled) {
return {
name: topicNamingFormat(generator, entity),
enabled,
'[key.deserializer]': 'org.apache.kafka.common.serialization.StringDeserializer',
'[value.deserializer]': `${generator.packageName}.service.kafka.deserializer.${entity}Deserializer`,
Expand All @@ -367,16 +377,15 @@ function buildJsonConsumerConfiguration(generator, entity, enabled) {

function buildJsonProducerConfiguration(generator, entity, enabled) {
return {
name: topicNamingFormat(generator, entity),
enabled,
'[key.serializer]': 'org.apache.kafka.common.serialization.StringSerializer',
'[value.serializer]': `${generator.packageName}.service.kafka.serializer.${entity}Serializer`
};
}
function sanitizeProperties(jsyamlGeneratedProperties) {
// related to https://github.com/nodeca/js-yaml/issues/470
// Related to: https://github.com/nodeca/js-yaml/issues/470
const patternContainingSingleQuote = /^(\s.+)(:[ ]+)('((.+:)+.*)')$/gm;
// related to https://github.com/nodeca/js-yaml/issues/478
// Related to: https://github.com/nodeca/js-yaml/issues/478
const patternNullGeneratedValue = /^(\s.+)(:)([ ]+null.*)$/gm;
return jsyamlGeneratedProperties.replace(patternContainingSingleQuote, '$1$2$4').replace(patternNullGeneratedValue, '$1$2');
}
Expand Down Expand Up @@ -407,7 +416,7 @@ function registerToEntityPostHook(generator) {
'entity',
'post',
'entity',
'A JHipster module to generate Apache Kafka consumers and producers.'
'A JHipster module that generates Apache Kafka consumers and producers and more!'
);
} catch (e) {
generator.log(`${chalk.red.bold('WARN!')} Could not register as a jhipster entity post creation hook...\n`, e);
Expand Down
5 changes: 3 additions & 2 deletions generators/app/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ module.exports = class extends BaseGenerator {
defaults: false
});

// init props
// props used for writing
this.props = {
entities: [],
components: [],
componentsByEntityConfig: [],
componentsPrefixes: []
componentsPrefixes: [],
topics: []
};

this.setupClientOptions(this);
Expand Down
Loading

0 comments on commit 659d4ed

Please sign in to comment.