diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/ConfigResolver.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/ConfigResolver.java index 2cd43ddc31..b585467cb9 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/ConfigResolver.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/ConfigResolver.java @@ -1,9 +1,11 @@ package io.smallrye.reactive.messaging.pulsar; import static io.smallrye.reactive.messaging.providers.helpers.CDIUtils.getInstanceById; +import static io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging.log; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; import jakarta.enterprise.context.ApplicationScoped; @@ -11,10 +13,18 @@ import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.RedeliveryBackoff; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -30,6 +40,7 @@ import io.smallrye.reactive.messaging.ClientCustomizer; import io.smallrye.reactive.messaging.providers.helpers.ConfigUtils; +import io.smallrye.reactive.messaging.providers.helpers.Validation; import io.vertx.core.json.JsonObject; /** @@ -110,6 +121,33 @@ public ClientBuilderImpl customize(ClientBuilderImpl builder, PulsarConnectorCom return (ClientBuilderImpl) ConfigUtils.customize(cc.config(), clientConfigCustomizers, builder); } + public ClientBuilderImpl configure(PulsarConnectorCommonConfiguration cc, ClientConfigurationData conf) + throws PulsarClientException { + setAuth(conf); + return customize(new ClientBuilderImpl(conf), cc); + } + + /** + * Sets the authentication object in the given configuration object using + * `authPluginClassName` and `authParams`/`authParamMap` attributes + * This use to be done by the PulsarClientImpl + * + * @param conf client configuration + * @throws PulsarClientException + */ + private void setAuth(ClientConfigurationData conf) throws PulsarClientException { + if (Validation.isBlank(conf.getAuthPluginClassName()) + || (Validation.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) { + return; + } + + if (!Validation.isBlank(conf.getAuthParams())) { + conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams())); + } else if (conf.getAuthParamMap() != null) { + conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParamMap())); + } + } + /** * Extract the configuration map for building Pulsar consumer * @@ -129,6 +167,62 @@ public ConsumerBuilder customize(ConsumerBuilder builder, PulsarConnec return (ConsumerBuilder) ConfigUtils.customize(ic.config(), consumerConfigCustomizers, builder); } + public ConsumerBuilder configure(ConsumerBuilder builder, + PulsarConnectorIncomingConfiguration ic, + ConsumerConfigurationData conf) { + builder.loadConf(configToMap(conf)); + ic.getDeadLetterPolicyMaxRedeliverCount().ifPresent(i -> builder.deadLetterPolicy(getDeadLetterPolicy(ic, i))); + ic.getNegativeAckRedeliveryBackoff() + .ifPresent(s -> builder.negativeAckRedeliveryBackoff(parseBackoff(s, ic.getChannel()))); + ic.getAckTimeoutRedeliveryBackoff() + .ifPresent(s -> builder.ackTimeoutRedeliveryBackoff(parseBackoff(s, ic.getChannel()))); + if (conf.getConsumerEventListener() != null) { + builder.consumerEventListener(conf.getConsumerEventListener()); + } + if (conf.getPayloadProcessor() != null) { + builder.messagePayloadProcessor(conf.getPayloadProcessor()); + } + if (conf.getKeySharedPolicy() != null) { + builder.keySharedPolicy(conf.getKeySharedPolicy()); + } else if (conf.getSubscriptionType() == SubscriptionType.Key_Shared) { + builder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange()); + } + if (conf.getCryptoKeyReader() != null) { + builder.cryptoKeyReader(conf.getCryptoKeyReader()); + } + if (conf.getMessageCrypto() != null) { + builder.messageCrypto(conf.getMessageCrypto()); + } + if (ic.getBatchReceive()) { + builder.batchReceivePolicy( + Objects.requireNonNullElse(conf.getBatchReceivePolicy(), BatchReceivePolicy.DEFAULT_POLICY)); + } + return customize(builder, ic); + } + + private static DeadLetterPolicy getDeadLetterPolicy(PulsarConnectorIncomingConfiguration ic, Integer redeliverCount) { + return DeadLetterPolicy.builder() + .maxRedeliverCount(redeliverCount) + .deadLetterTopic(ic.getDeadLetterPolicyDeadLetterTopic().orElse(null)) + .retryLetterTopic(ic.getDeadLetterPolicyRetryLetterTopic().orElse(null)) + .initialSubscriptionName(ic.getDeadLetterPolicyInitialSubscriptionName().orElse(null)) + .build(); + } + + private RedeliveryBackoff parseBackoff(String backoffString, String channel) { + String[] strings = backoffString.split(","); + try { + return MultiplierRedeliveryBackoff.builder() + .minDelayMs(Long.parseLong(strings[0])) + .maxDelayMs(Long.parseLong(strings[1])) + .multiplier(Double.parseDouble(strings[2])) + .build(); + } catch (Exception e) { + log.unableToParseRedeliveryBackoff(backoffString, channel); + return null; + } + } + /** * Extract the configuration map for building Pulsar producer * @@ -148,6 +242,25 @@ public ProducerBuilder customize(ProducerBuilder builder, PulsarConnec return (ProducerBuilder) ConfigUtils.customize(oc.config(), producerConfigCustomizers, builder); } + public ProducerBuilder configure(ProducerBuilder builder, + PulsarConnectorOutgoingConfiguration oc, + ProducerConfigurationData conf) { + builder.loadConf(configToMap(conf)); + if (conf.getCustomMessageRouter() != null) { + builder.messageRouter(conf.getCustomMessageRouter()); + } + if (conf.getBatcherBuilder() != null) { + builder.batcherBuilder(conf.getBatcherBuilder()); + } + if (conf.getCryptoKeyReader() != null) { + builder.cryptoKeyReader(conf.getCryptoKeyReader()); + } + for (String encryptionKey : conf.getEncryptionKeys()) { + builder.addEncryptionKey(encryptionKey); + } + return customize(builder, oc); + } + private Map mergeMap(Map defaultConfig, Map channelConfig) { Map map = new HashMap<>(defaultConfig); map.putAll(channelConfig); diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java index 408ab39a29..2983a6ef1a 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarConnector.java @@ -21,12 +21,10 @@ import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; -import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.eclipse.microprofile.config.Config; @@ -41,7 +39,6 @@ import io.smallrye.reactive.messaging.health.HealthReporter; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; -import io.smallrye.reactive.messaging.providers.helpers.Validation; import io.vertx.mutiny.core.Vertx; @ApplicationScoped @@ -166,36 +163,14 @@ public void terminate( private PulsarClientImpl createPulsarClient(PulsarConnectorCommonConfiguration cc, ClientConfigurationData configuration) { try { - setAuth(configuration); - log.createdClientWithConfig(configuration); - ClientBuilderImpl customized = configResolver.customize(new ClientBuilderImpl(configuration), cc); - return new PulsarClientImpl(customized.getClientConfigurationData(), vertx.nettyEventLoopGroup()); + ClientConfigurationData data = configResolver.configure(cc, configuration).getClientConfigurationData(); + log.createdClientWithConfig(data); + return new PulsarClientImpl(data, vertx.nettyEventLoopGroup()); } catch (PulsarClientException e) { throw ex.illegalStateUnableToBuildClient(e); } } - /** - * Sets the authentication object in the given configuration object using - * `authPluginClassName` and `authParams`/`authParamMap` attributes - * This use to be done by the PulsarClientImpl - * - * @param conf client configuration - * @throws PulsarClientException - */ - private void setAuth(ClientConfigurationData conf) throws PulsarClientException { - if (Validation.isBlank(conf.getAuthPluginClassName()) - || (Validation.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) { - return; - } - - if (!Validation.isBlank(conf.getAuthParams())) { - conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParams())); - } else if (conf.getAuthParamMap() != null) { - conf.setAuthentication(AuthenticationFactory.create(conf.getAuthPluginClassName(), conf.getAuthParamMap())); - } - } - public PulsarClient getClient(String channel) { return clientsByChannel.get(channel); } diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java index b61f88f7b6..4121f0865c 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java @@ -13,7 +13,6 @@ import jakarta.enterprise.inject.Instance; import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; @@ -62,7 +61,6 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema schema, this.channel = ic.getChannel(); this.healthEnabled = ic.getHealthEnabled(); this.tracingEnabled = ic.getTracingEnabled(); - ConsumerBuilder builder = client.newConsumer(schema); ConsumerConfigurationData conf = configResolver.getConsumerConf(ic); if (conf.getSubscriptionName() == null) { String s = UUID.randomUUID().toString(); @@ -75,32 +73,8 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema schema, if (conf.getConsumerName() == null) { conf.setConsumerName(channel); } - builder.loadConf(configResolver.configToMap(conf)); - ic.getDeadLetterPolicyMaxRedeliverCount().ifPresent(i -> builder.deadLetterPolicy(getDeadLetterPolicy(ic, i))); - ic.getNegativeAckRedeliveryBackoff().ifPresent(s -> builder.negativeAckRedeliveryBackoff(parseBackoff(s))); - ic.getAckTimeoutRedeliveryBackoff().ifPresent(s -> builder.ackTimeoutRedeliveryBackoff(parseBackoff(s))); - if (conf.getConsumerEventListener() != null) { - builder.consumerEventListener(conf.getConsumerEventListener()); - } - if (conf.getPayloadProcessor() != null) { - builder.messagePayloadProcessor(conf.getPayloadProcessor()); - } - if (conf.getKeySharedPolicy() != null) { - builder.keySharedPolicy(conf.getKeySharedPolicy()); - } else if (conf.getSubscriptionType() == SubscriptionType.Key_Shared) { - builder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange()); - } - if (conf.getCryptoKeyReader() != null) { - builder.cryptoKeyReader(conf.getCryptoKeyReader()); - } - if (conf.getMessageCrypto() != null) { - builder.messageCrypto(conf.getMessageCrypto()); - } - if (ic.getBatchReceive() && conf.getBatchReceivePolicy() == null) { - builder.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY); - } - - this.consumer = configResolver.customize(builder, ic).subscribe(); + ConsumerBuilder builder = configResolver.configure(client.newConsumer(schema), ic, conf); + this.consumer = builder.subscribe(); log.createdConsumerWithConfig(channel, SchemaResolver.getSchemaName(schema), conf); this.ackHandler = ackHandlerFactory.create(consumer, ic); this.failureHandler = failureHandlerFactory.create(consumer, ic, this::reportFailure); @@ -196,29 +170,6 @@ private boolean isEndOfStream(PulsarClient client, Throwable throwable) { return false; } - private static DeadLetterPolicy getDeadLetterPolicy(PulsarConnectorIncomingConfiguration ic, Integer redeliverCount) { - return DeadLetterPolicy.builder() - .maxRedeliverCount(redeliverCount) - .deadLetterTopic(ic.getDeadLetterPolicyDeadLetterTopic().orElse(null)) - .retryLetterTopic(ic.getDeadLetterPolicyRetryLetterTopic().orElse(null)) - .initialSubscriptionName(ic.getDeadLetterPolicyInitialSubscriptionName().orElse(null)) - .build(); - } - - private RedeliveryBackoff parseBackoff(String backoffString) { - String[] strings = backoffString.split(","); - try { - return MultiplierRedeliveryBackoff.builder() - .minDelayMs(Long.parseLong(strings[0])) - .maxDelayMs(Long.parseLong(strings[1])) - .multiplier(Double.parseDouble(strings[2])) - .build(); - } catch (Exception e) { - log.unableToParseRedeliveryBackoff(backoffString, this.channel); - return null; - } - } - static boolean hasTopicConfig(ConsumerConfigurationData conf) { return conf.getTopicsPattern() != null || (conf.getTopicNames() != null && !conf.getTopicNames().isEmpty()); diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java index 3948a6a16f..ba245ed2d3 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.java @@ -57,19 +57,8 @@ public PulsarOutgoingChannel(PulsarClient client, Schema schema, PulsarConnec if (conf.getMaxPendingMessages() > 0 && conf.getMaxPendingMessagesAcrossPartitions() == 0) { conf.setMaxPendingMessagesAcrossPartitions(conf.getMaxPendingMessages()); } - Map producerConf = configResolver.configToMap(conf); - ProducerBuilder builder = client.newProducer(schema) - .loadConf(producerConf); - if (conf.getBatcherBuilder() != null) { - builder.batcherBuilder(conf.getBatcherBuilder()); - } - if (conf.getCryptoKeyReader() != null) { - builder.cryptoKeyReader(conf.getCryptoKeyReader()); - } - for (String encryptionKey : conf.getEncryptionKeys()) { - builder.addEncryptionKey(encryptionKey); - } - this.producer = configResolver.customize(builder, oc).create(); + ProducerBuilder builder = configResolver.configure(client.newProducer(schema), oc, conf); + this.producer = builder.create(); log.createdProducerWithConfig(channel, SchemaResolver.getSchemaName(schema), conf); long requests = getRequests(oc, conf); diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java index 1bebfae0ae..413bd23042 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java @@ -11,14 +11,18 @@ import java.util.concurrent.CopyOnWriteArrayList; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import io.smallrye.common.annotation.Identifier; import io.smallrye.reactive.messaging.pulsar.PulsarConnector; import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage; import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase; @@ -45,7 +49,7 @@ static void insertMessages() throws PulsarClientException { } @Test - void testBatchRecieveAppUsingPulsarConnector() { + void testBatchReceiveAppUsingPulsarConnector() { // Run app ConsumingApp app = runApplication(config(), ConsumingApp.class); long start = System.currentTimeMillis(); @@ -58,6 +62,35 @@ void testBatchRecieveAppUsingPulsarConnector() { assertThat(app.getResults()).containsExactlyElementsOf(expected); } + @Test + void testBatchReceiveAppWithCustomConfig() { + addBeans(BatchConfig.class); + // Run app + ConsumingApp app = runApplication(config() + .with("mp.messaging.incoming.data.consumer-configuration", "batch-config"), ConsumingApp.class); + long start = System.currentTimeMillis(); + + // Check for consumed messages in app + await().atMost(Duration.ofSeconds(30)).until(() -> app.getResults().size() == NUMBER_OF_MESSAGES); + long end = System.currentTimeMillis(); + + System.out.println("Ack - Estimate: " + (end - start) + " ms"); + assertThat(app.getResults()).containsExactlyElementsOf(expected); + } + + @ApplicationScoped + public static class BatchConfig { + @Produces + @Identifier("batch-config") + public ConsumerConfigurationData configureBatchConsumer() { + var data = new ConsumerConfigurationData<>(); + data.setBatchReceivePolicy(BatchReceivePolicy.builder() + .maxNumMessages(10) + .build()); + return data; + } + } + MapBasedConfig config() { return new MapBasedConfig() .with("mp.messaging.incoming.data.connector", PulsarConnector.CONNECTOR_NAME) @@ -77,7 +110,6 @@ public static class ConsumingApp { @Incoming("data") public CompletionStage consume(PulsarIncomingBatchMessage message) { - // System.out.println(message.getIncomingMessages().size()); results.addAll(message.getPayload()); return message.ack(); }