Skip to content

Commit

Permalink
Add support for reactive PubSub consumers (#958)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyg484 authored Oct 27, 2023
1 parent 591b23a commit 160724a
Show file tree
Hide file tree
Showing 65 changed files with 2,571 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
plugins {
id "io.micronaut.build.internal.gcp-base"
id "io.micronaut.test-resources"
}

tasks.withType(Test) {
useJUnitPlatform()
}

dependencies {
annotationProcessor(mn.micronaut.inject.java)

implementation(projects.micronautGcpFunctionHttp)
implementation(projects.micronautGcpPubsub)
implementation(projects.micronautGcpSecretManager)
implementation(mn.reactor)
implementation mnSerde.micronaut.serde.jackson
implementation mnJacksonXml.micronaut.jackson.xml

compileOnly(libs.managed.functions.framework.api)

testRuntimeOnly(libs.junit.jupiter.engine)
testRuntimeOnly mnLogging.logback.classic
testRuntimeOnly mn.snakeyaml
}

micronaut {
importMicronautPlatform.set(false)
}
2 changes: 1 addition & 1 deletion gcp-function-http/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies {
testAnnotationProcessor(mnSerde.micronaut.serde.processor)
testImplementation(mnSerde.micronaut.serde.jackson)

testImplementation(mnRxjava2.micronaut.rxjava2)
testImplementation(mnRxjava3.micronaut.rxjava3)
testAnnotationProcessor(mn.micronaut.inject.java)
testImplementation(libs.managed.functions.framework.api)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Flowable;
import io.reactivex.rxjava3.core.Flowable;

@Controller("/reactive")
public class ReactiveController {
Expand Down
3 changes: 2 additions & 1 deletion gcp-pubsub/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ dependencies {

testAnnotationProcessor(mn.micronaut.inject.java)
testRuntimeOnly(mn.micronaut.discovery.core)
testImplementation(mnRxjava2.micronaut.rxjava2)
testImplementation(mnReactor.micronaut.reactor)
testImplementation(mnRxjava3.micronaut.rxjava3)
testImplementation libs.testcontainers.spock
testImplementation(mn.micronaut.http.server.netty)
testImplementation(testFixtures(project(":micronaut-gcp-common")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
package io.micronaut.gcp.pubsub.bind;

import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.convert.MutableConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
import io.micronaut.gcp.pubsub.exception.PubSubListenerException;
import io.micronaut.gcp.pubsub.serdes.PubSubMessageSerDes;
import io.micronaut.gcp.pubsub.serdes.PubSubMessageSerDesRegistry;

import io.micronaut.messaging.annotation.MessageBody;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import reactor.core.publisher.Mono;

import java.util.Optional;

/**
Expand All @@ -37,9 +42,33 @@
@Singleton
public class PubSubBodyBinder implements PubSubAnnotatedArgumentBinder<MessageBody> {

private final ConversionService conversionService;

private final PubSubMessageSerDesRegistry serDesRegistry;

/**
* Constructs a PubSub body binder instance.
*
* @deprecated An instance of {@link ConversionService} is needed for binding the full range of supported
* types (including reactive) to PubSub subscriber methods. <p>{@link #PubSubBodyBinder(ConversionService, PubSubMessageSerDesRegistry)} should be used instead.
*
* @param serDesRegistry the SerDe registry
*/
@Deprecated(since = "5.2.0", forRemoval = true)
public PubSubBodyBinder(PubSubMessageSerDesRegistry serDesRegistry) {
this.conversionService = MutableConversionService.create();
this.serDesRegistry = serDesRegistry;
}

/**
* Constructs a PubSub body binder instance.
*
* @param conversionService the conversion service
* @param serDesRegistry the SerDe registry
*/
@Inject
public PubSubBodyBinder(ConversionService conversionService, PubSubMessageSerDesRegistry serDesRegistry) {
this.conversionService = conversionService;
this.serDesRegistry = serDesRegistry;
}

Expand All @@ -50,21 +79,29 @@ public Class<MessageBody> getAnnotationType() {

@Override
public BindingResult<Object> bind(ArgumentConversionContext<Object> context, PubSubConsumerState state) {
Argument<Object> bodyType = context.getArgument();
boolean isPublisher = Publishers.isConvertibleToPublisher(context.getArgument().getType());
Argument<?> bodyType = isPublisher ?
context.getArgument().getFirstTypeVariable().orElseThrow(() -> new PubSubListenerException("Could not determine publisher's argument type for PubSub message deserialization")) :
context.getArgument();
Object result = null;
if (bodyType.getType().equals(byte[].class)) {
result = state.getPubsubMessage().getData().toByteArray();
} else if (bodyType.getType().equals(PubsubMessage.class)) {
result = state.getPubsubMessage();
} else {
if (StringUtils.isEmpty(state.getContentType()) && !state.getPubsubMessage().containsAttributes("Content-Type")) {
throw new PubSubListenerException("Could not detect Content-Type header at message and no Content-Type specified on method.");
throw new PubSubListenerException("Could not detect Content-Type header at message and no Content-Type specified on method.");
}
PubSubMessageSerDes serDes = serDesRegistry.find(state.getContentType())
.orElseThrow(() -> new PubSubListenerException("Could not locate a valid SerDes implementation for type: " + state.getContentType()));
.orElseThrow(() -> new PubSubListenerException("Could not locate a valid SerDes implementation for type: " + state.getContentType()));
result = serDes.deserialize(state.getPubsubMessage().getData().toByteArray(), bodyType);
}
Object finalResult = result;
return () -> Optional.ofNullable(finalResult);

if (isPublisher && result.getClass().isArray()) {
result = Mono.just(result);
}

Optional<Object> finalResult = conversionService.convert(result, context);
return () -> finalResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import jakarta.inject.Singleton;

/**
* Default body binder of PubSub consumers. If no @{@link io.micronaut.messaging.annotation.Body} arguments are annotated.
* Default body binder of PubSub consumers. If no @{@link io.micronaut.messaging.annotation.MessageBody} arguments are annotated.
* Delegates to {@link PubSubBodyBinder}
* @author Vinicius Carvalho
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import io.micronaut.context.BeanContext;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.util.StringUtils;
import io.micronaut.gcp.GoogleCloudConfiguration;
Expand All @@ -49,10 +51,13 @@
import jakarta.annotation.PreDestroy;
import jakarta.inject.Qualifier;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -73,6 +78,7 @@ public class PubSubConsumerAdvice implements ExecutableMethodProcessor<Subscript

private final Logger logger = LoggerFactory.getLogger(PubSubConsumerAdvice.class);
private final BeanContext beanContext;
private final ConversionService conversionService;
private final SubscriberFactory subscriberFactory;
private final GoogleCloudConfiguration googleCloudConfiguration;
private final PubSubConfigurationProperties pubSubConfigurationProperties;
Expand All @@ -89,6 +95,7 @@ public PubSubConsumerAdvice(BeanContext beanContext,
PubSubBinderRegistry binderRegistry,
PubSubMessageReceiverExceptionHandler exceptionHandler) {
this.beanContext = beanContext;
this.conversionService = conversionService;
this.subscriberFactory = subscriberFactory;
this.googleCloudConfiguration = googleCloudConfiguration;
this.pubSubConfigurationProperties = pubSubConfigurationProperties;
Expand All @@ -97,6 +104,7 @@ public PubSubConsumerAdvice(BeanContext beanContext,
}

@Override
@SuppressWarnings("unchecked")
public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
if (beanDefinition.hasDeclaredAnnotation(PubSubListener.class)) {
AnnotationValue<Subscription> subscriptionAnnotation = method.getAnnotation(Subscription.class);
Expand Down Expand Up @@ -134,39 +142,38 @@ public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> met
projectSubscriptionName, contentType);
boolean autoAcknowledge = !hasAckArg;
try {
BoundExecutable executable = null;
try {
executable = binder.bind(method, binderRegistry, consumerState);
} catch (Exception ex) {
handleException(new PubSubMessageReceiverException("Error binding message to the method", ex, bean, consumerState, autoAcknowledge));
}
executable.invoke(bean); // Discard result
if (autoAcknowledge) { // if manual ack is not specified we auto ack message after method execution
pubSubAcknowledgement.ack();
} else {
Optional<Object> boundAck = Arrays
.stream(executable.getBoundArguments())
.filter(o -> (o instanceof DefaultPubSubAcknowledgement))
.findFirst();
if (boundAck.isPresent()) {
DefaultPubSubAcknowledgement manualAck = (DefaultPubSubAcknowledgement) boundAck.get();
if (!manualAck.isClientAck()) {
logger.warn("Method {} was executed and no message acknowledge detected. Did you forget to invoke ack()/nack()?", method.getName());
}
}
}
@SuppressWarnings("rawtypes")
BoundExecutable executable = binder.bind(method, binderRegistry, consumerState);
Flux<?> resultPublisher = resultAsFlux(Objects.requireNonNull(executable).invoke(bean));
resultPublisher.subscribe(data -> { }, //no-op
ex -> handleException(new PubSubMessageReceiverException("Error handling message", ex, bean, consumerState, autoAcknowledge)),
autoAcknowledge ? pubSubAcknowledgement::ack : () -> this.verifyManualAcknowledgment(executable, method.getName()));
} catch (UnsatisfiedArgumentException e) {
handleException(new PubSubMessageReceiverException("Error binding message to the method", e, bean, consumerState, autoAcknowledge));
} catch (Exception e) {
handleException(new PubSubMessageReceiverException("Error handling message", e, bean, consumerState, autoAcknowledge));
}
};
try {
this.subscriberFactory.createSubscriber(new SubscriberFactoryConfig(projectSubscriptionName, receiver, configuration, pubSubConfigurationProperties.getSubscribingExecutor()));
} catch (Exception e) {
throw new PubSubListenerException("Failed to create subscriber", e);
throw new PubSubListenerException("Failed to create subscriber for %s with subscription method %s".formatted(beanDefinition.getBeanType(), method.getName()), e);
}
}
}
}

private void verifyManualAcknowledgment(@SuppressWarnings("rawtypes") BoundExecutable executable, String methodName) {
Optional<Object> boundAck = Arrays
.stream(executable.getBoundArguments())
.filter(o -> (o instanceof DefaultPubSubAcknowledgement))
.findFirst();
if (boundAck.isPresent()) {
DefaultPubSubAcknowledgement manualAck = (DefaultPubSubAcknowledgement) boundAck.get();
if (!manualAck.isClientAck()) {
logger.warn("Method {} was executed and no message acknowledge detected. Did you forget to invoke ack()/nack()?", methodName);
}
}
}

@PreDestroy
Expand All @@ -182,4 +189,12 @@ private void handleException(PubSubMessageReceiverException ex) {
}
}

@SuppressWarnings("unchecked")
private <T> Flux<T> resultAsFlux(T result) {
if (!Publishers.isConvertibleToPublisher(result)) {
return Flux.empty();
}
return Flux.from(Publishers.convertPublisher(conversionService, result, Publisher.class));
}

}
Loading

0 comments on commit 160724a

Please sign in to comment.