Skip to content

Commit

Permalink
add code of testing template
Browse files Browse the repository at this point in the history
  • Loading branch information
monxxi committed Jun 15, 2020
1 parent 1138e71 commit fe71c17
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 147 deletions.
2 changes: 1 addition & 1 deletion generators/app/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ function writeFiles(generator) {
}
};

if (generator.props.cleanup) {
if (isCleanup(generator)) {
shelljs.rm('-rf', `${generator.javaDir}service/kafka/`, `${generator.javaDir}web/rest/kafka/`);
}

Expand Down
133 changes: 3 additions & 130 deletions generators/app/prompts.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ function askForOperations(generator) {
name: 'cleanup',
message: 'Do you want to clean up your current Kafka configuration?',
default: false,
validate: input => (_.lowerCase(input) !== 'o' && _.lowerCase(input) !== 'n' ? 'Please enter Y or N' : true)
validate: input => (_.lowerCase(input) !== 'y' && _.lowerCase(input) !== 'n' ? 'Please enter Y or N' : true)
}
];

Expand All @@ -125,134 +125,7 @@ function askForOperations(generator) {
done();
}
}
/*
function askForBigBangOperations(generator, done) {
const bigbangPrompt = [
{
when: !generator.options['skip-prompts'],
type: 'checkbox',
name: 'components',
message: 'Which components would you like to generate?',
choices: componentsChoices(),
default: [],
validate: input => (_.isEmpty(input) ? 'You have to choose at least one component' : true)
},
{
when: response =>
response.components.includes(constants.CONSUMER_COMPONENT) || response.components.includes(constants.PRODUCER_COMPONENT),
type: 'checkbox',
name: 'entities',
message: 'For which entity (class name)?',
choices: entitiesChoices(generator),
default: constants.NO_ENTITY,
validate: input => (_.isEmpty(input) ? 'You have to choose at least one option' : true)
},
{
when: response => response.entities.includes(constants.NO_ENTITY),
type: 'input',
name: 'componentPrefix',
message: 'How would you prefix your objects (no entity, for instance: [SomeEventType]Consumer|Producer...)?',
validate: input => {
if (_.isEmpty(input)) return 'Please enter a value';
if (entitiesChoices(generator).find(entity => entity.name === utils.transformToJavaClassNameCase(input))) {
return 'This name is already taken by an entity generated with JHipster';
}
return true;
}
},
{
when: response => response.components.includes(constants.CONSUMER_COMPONENT),
type: 'number',
name: 'pollingTimeout',
message: 'What is the consumer polling timeout (in ms)?',
default: constants.DEFAULT_POLLING_TIMEOUT,
validate: input => (isNaN(input) ? 'Please enter a number' : true)
},
{
when: response => response.components.includes(constants.CONSUMER_COMPONENT),
type: 'list',
name: 'autoOffsetResetPolicy',
message:
'Define the auto offset reset policy (what to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server)?',
choices: offsetChoices(),
default: constants.EARLIEST_OFFSET
}
];
if (generator.options['skip-prompts']) {
generator.props = _.merge(generator.props, bigbangPrompt.map(prompt => prompt.default));
done();
return;
}
generator.prompt(bigbangPrompt).then(answers => {
if (answers.componentPrefix) {
generator.props.componentsPrefixes.push(answers.componentPrefix);
}
generator.props = _.merge(generator.props, answers);
askForBigBangEntityOperations(generator, answers, done);
});
}
function askForBigBangEntityOperations(generator, answers, done, entityIndex = 0) {
let name = answers.entities[entityIndex];
if (answers.entities[entityIndex] === constants.NO_ENTITY) {
name = answers.componentPrefix;
}
const bigbangEntityPrompt = [
{
when: answers.components.includes(constants.CONSUMER_COMPONENT) || answers.components.includes(constants.PRODUCER_COMPONENT),
type: 'list',
name: 'topic',
message: `Which topic for ${name}?`,
choices: topicsChoices(generator, null),
default: constants.DEFAULT_TOPIC
},
{
when: response => response.topic === constants.CUSTOM_TOPIC,
type: 'input',
name: 'topicName',
message: `What is the topic name for ${name}?`,
validate: input => validateTopic(input)
},
{
when: entityIndex < answers.entities.length - 1,
type: 'confirm',
name: 'confirmBigBangEntityOperations',
message: 'Do you want to continue to the next entity/prefix or exit?',
default: true
}
];
generator.prompt(bigbangEntityPrompt).then(subAnswers => {
if (!generator.props.topics) {
generator.props.topics = [];
}
generator.props.currentEntity = answers.entities[entityIndex];
if (generator.props.currentEntity === constants.NO_ENTITY) {
generator.props.currentPrefix = answers.componentPrefix;
}
pushTopicName(generator, subAnswers.topic, subAnswers.topicName);
generator.props = _.merge(generator.props, subAnswers);
if (entityIndex === answers.entities.length - 1) {
done();
}
if (subAnswers.confirmBigBangEntityOperations) {
askForBigBangEntityOperations(generator, answers, done, ++entityIndex);
}
});
}

*/
function askForEntityOperations(generator, done) {
const getConcernedEntities = previousConfiguration => {
const allEntities = entitiesChoices(generator);
Expand Down Expand Up @@ -314,7 +187,7 @@ function askForEntityOperations(generator, done) {
if (answers.currentPrefix && !generator.props.componentsPrefixes.includes(answers.currentPrefix)) {
generator.props.componentsPrefixes.push(answers.currentPrefix);
}
askForComponentsEntityOperation(generator, done);
askForEntityComponentsOperations(generator, done);
} else {
done();
}
Expand Down Expand Up @@ -344,7 +217,7 @@ function getAvailableComponentsWithoutEntity(generator, previousConfiguration, p
return availableComponents;
}

function askForComponentsEntityOperation(generator, done) {
function askForEntityComponentsOperations(generator, done) {
const getConcernedComponents = (previousConfiguration, entityName, currentPrefix) => {
const availableComponents = [];
const allComponentChoices = componentsChoices();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ public class KafkaProperties {
@PostConstruct
public void init() {

for (String consumerKey: consumer.keySet()) {
for (String consumerKey : consumer.keySet()) {
final Map<String, Object> properties = consumer.get(consumerKey);
if (! properties.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
if (!properties.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}
}

for (String consumerKey: producer.keySet()) {
for (String consumerKey : producer.keySet()) {
final Map<String, Object> properties = producer.get(consumerKey);
if (! properties.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
if (!properties.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public void init() {
public void send(final Foo message) {
final ProducerRecord<String, Foo> record = new ProducerRecord<>(topicName, message);
try {
log.info("Sending asynchronously a Foo record to topic: '" + topicName + "'");
kafkaProducer.send(record);
} catch (final Exception e) {
log.error(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mycompany.myapp.domain.Foo;

import java.io.IOException;

import com.mycompany.myapp.domain.Foo;

public class FooDeserializer implements Deserializer<Either<DeserializationError, Foo>> {

private final Logger log = LoggerFactory.getLogger(FooDeserializer.class);

private final ObjectMapper objectMapper;

private String encoding = "UTF8";

public FooDeserializer() {
this.objectMapper =
new ObjectMapper()
Expand All @@ -38,7 +40,8 @@ public FooDeserializer() {
@Override
public Either<DeserializationError, Foo> deserialize(final String topicName, final byte[] data) {
try {
return Either.right(objectMapper.readValue(data, Foo.class));
final Foo value = objectMapper.readValue(data, Foo.class);
return Either.right(value);
} catch (final IOException e) {
return Either.left(new DeserializationError(data, e));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.mycompany.myapp.service.kafka.serde;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

import com.mycompany.myapp.service.kafka.serde.FooSerializer;
import com.mycompany.myapp.service.kafka.serde.FooDeserializer;

public class FooSerde<Foo> implements Serde<Foo> {
private final Serializer serializer = new FooSerializer();
private final Deserializer deserializer = new FooDeserializer();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.serializer.configure(configs, isKey);
this.deserializer.configure(configs, isKey);
}

@Override
public void close() {
this.serializer.close();
this.deserializer.close();
}

@Override
public Serializer<Foo> serializer() {
return serializer;
}

@Override
public Deserializer<Foo> deserializer() {
return deserializer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mycompany.myapp.domain.Foo;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import com.mycompany.myapp.domain.Foo;

public class FooSerializer implements Serializer<Foo> {

private final Logger log = LoggerFactory.getLogger(FooSerializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,21 @@ jhipster:

kafka:
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
producer:
foo:
enabled: false
'[key.serializer]': org.apache.kafka.common.serialization.StringSerializer
'[value.serializer]': com.mycompany.myapp.service.kafka.serde.FooSerializer
polling.timeout: 10000
consumer:
foo:
name: queuing.message_broker_with_entities.foo
enabled: false
'[key.deserializer]': org.apache.kafka.common.serialization.StringDeserializer
'[value.deserializer]': com.mycompany.myapp.service.kafka.serde.FooDeserializer
'[group.id]': message-broker-with-entities
'[auto.offset.reset]': earliest
producer:
foo:
name: queuing.message_broker_with_entities.foo
enabled: false
'[key.serializer]': org.apache.kafka.common.serialization.StringSerializer
'[value.serializer]': com.mycompany.myapp.service.kafka.serde.FooSerializer
topic:
foo: queuing.message_broker_with_entities.foo
# ===================================================================
# Application specific properties
# Add your own application properties here, see the ApplicationProperties class
Expand Down

0 comments on commit fe71c17

Please sign in to comment.