diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java index 52b7ef76b4a..f46ba06e05b 100644 --- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java +++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java @@ -100,4 +100,13 @@ default Registration register(EventListener.ReactiveGroupEventListener groupList default Collection listRegisteredGroups() { return ImmutableList.of(); } + + default void start() { + } + + default void restart() { + } + + default void stop() { + } } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java index 98c0b036707..1510e635ce3 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java @@ -19,8 +19,6 @@ package org.apache.james.events; -import jakarta.inject.Inject; - import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.reactivestreams.Publisher; @@ -29,10 +27,9 @@ import reactor.core.publisher.Mono; public class EventBusReconnectionHandler implements SimpleConnectionPool.ReconnectionHandler { - private final RabbitMQEventBus rabbitMQEventBus; + private final EventBus rabbitMQEventBus; - @Inject - public EventBusReconnectionHandler(RabbitMQEventBus rabbitMQEventBus) { + public EventBusReconnectionHandler(EventBus rabbitMQEventBus) { this.rabbitMQEventBus = rabbitMQEventBus; } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java index 5480af0a8fe..415bc8355f4 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java @@ -63,10 +63,6 @@ class GroupRegistrationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistrationHandler.class); - public static class GroupRegistrationHandlerGroup extends Group { - - } - static final Group GROUP = new GroupRegistrationHandlerGroup(); private final NamingStrategy namingStrategy; diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java new file mode 100644 index 00000000000..77626bd3f2e --- /dev/null +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java @@ -0,0 +1,23 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.events; + +public class GroupRegistrationHandlerGroup extends Group { +} diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java index 34cf26046eb..aefa2ce7e9e 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java @@ -26,8 +26,6 @@ import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable; import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive; -import jakarta.inject.Inject; - import org.apache.james.backends.rabbitmq.QueueArguments; import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.SimpleConnectionPool; @@ -47,7 +45,6 @@ public class KeyReconnectionHandler implements SimpleConnectionPool.Reconnection private final EventBusId eventBusId; private final RabbitMQConfiguration configuration; - @Inject public KeyReconnectionHandler(NamingStrategy namingStrategy, EventBusId eventBusId, RabbitMQConfiguration configuration) { this.namingStrategy = namingStrategy; this.eventBusId = eventBusId; diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java index 84785611215..ed0e462cc35 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java @@ -36,11 +36,11 @@ public class RabbitEventBusConsumerHealthCheck implements HealthCheck { public static final String COMPONENT = "EventbusConsumers"; - private final RabbitMQEventBus eventBus; + private final EventBus eventBus; private final NamingStrategy namingStrategy; private final SimpleConnectionPool connectionPool; - public RabbitEventBusConsumerHealthCheck(RabbitMQEventBus eventBus, NamingStrategy namingStrategy, + public RabbitEventBusConsumerHealthCheck(EventBus eventBus, NamingStrategy namingStrategy, SimpleConnectionPool connectionPool) { this.eventBus = eventBus; this.namingStrategy = namingStrategy; @@ -65,7 +65,7 @@ public Mono check() { private Result check(Channel channel) { Stream groups = Stream.concat( eventBus.listRegisteredGroups().stream(), - Stream.of(new GroupRegistrationHandler.GroupRegistrationHandlerGroup())); + Stream.of(new GroupRegistrationHandlerGroup())); Optional queueWithoutConsumers = groups .map(namingStrategy::workQueue) diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java index ccd8df7baca..71e3ea757df 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java @@ -83,6 +83,7 @@ public RabbitMQEventBus(NamingStrategy namingStrategy, Sender sender, ReceiverPr this.isStopping = false; } + @Override public void start() { if (!isRunning && !isStopping) { @@ -97,6 +98,7 @@ public void start() { } } + @Override public void restart() { keyRegistrationHandler.restart(); groupRegistrationHandler.restart(); diff --git a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java index 2abeb519fed..6d0ae07e4e0 100644 --- a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java +++ b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java @@ -51,7 +51,7 @@ import org.apache.james.modules.data.CassandraUsersRepositoryModule; import org.apache.james.modules.data.CassandraVacationModule; import org.apache.james.modules.event.JMAPEventBusModule; -import org.apache.james.modules.event.RabbitMQEventBusModule; +import org.apache.james.modules.event.MailboxEventBusModule; import org.apache.james.modules.eventstore.CassandraEventStoreModule; import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule; import org.apache.james.modules.mailbox.CassandraMailboxModule; @@ -181,7 +181,7 @@ public class CassandraRabbitMQJamesServerMain implements JamesServerMain { protected static final Module MODULES = Modules.override(REQUIRE_TASK_MANAGER_MODULE, new DistributedTaskManagerModule()) .with(new RabbitMQModule(), - new RabbitMQEventBusModule(), + new MailboxEventBusModule(), new DistributedTaskSerializationModule()); public static void main(String[] args) throws Exception { diff --git a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java index 9ecac2a5fb0..c1247e8474b 100644 --- a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java +++ b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java @@ -57,7 +57,7 @@ import org.apache.james.modules.data.CassandraUsersRepositoryModule; import org.apache.james.modules.data.CassandraVacationModule; import org.apache.james.modules.event.JMAPEventBusModule; -import org.apache.james.modules.event.RabbitMQEventBusModule; +import org.apache.james.modules.event.MailboxEventBusModule; import org.apache.james.modules.eventstore.CassandraEventStoreModule; import org.apache.james.modules.mailbox.CassandraBlobStoreDependenciesModule; import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule; @@ -175,7 +175,7 @@ public class DistributedPOP3JamesServerMain implements JamesServerMain { .with(new RabbitMQModule(), new RabbitMQMailQueueModule(), new RabbitMailQueueRoutesModule(), - new RabbitMQEventBusModule(), + new MailboxEventBusModule(), new DistributedTaskSerializationModule()); public static void main(String[] args) throws Exception { diff --git a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java index 772d321f503..3d7df65509d 100644 --- a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java +++ b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java @@ -49,7 +49,7 @@ import org.apache.james.modules.data.PostgresVacationModule; import org.apache.james.modules.data.SievePostgresRepositoryModules; import org.apache.james.modules.event.JMAPEventBusModule; -import org.apache.james.modules.event.RabbitMQEventBusModule; +import org.apache.james.modules.event.MailboxEventBusModule; import org.apache.james.modules.events.PostgresDeadLetterModule; import org.apache.james.modules.mailbox.DefaultEventModule; import org.apache.james.modules.mailbox.PostgresDeletedMessageVaultModule; @@ -231,7 +231,7 @@ public static List chooseEventBusModules(PostgresJamesConfiguration conf new ActiveMQQueueModule()); case RABBITMQ: return List.of( - Modules.override(new DefaultEventModule()).with(new RabbitMQEventBusModule()), + Modules.override(new DefaultEventModule()).with(new MailboxEventBusModule()), new RabbitMQModule(), new RabbitMQMailQueueModule(), new FakeMailQueueViewModule(), diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java similarity index 90% rename from server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java rename to server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java index 37522182a13..2e8d3edb0bd 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java @@ -52,7 +52,7 @@ import reactor.rabbitmq.Sender; -public class RabbitMQEventBusModule extends AbstractModule { +public class MailboxEventBusModule extends AbstractModule { @Override protected void configure() { @@ -64,10 +64,6 @@ protected void configure() { bind(RetryBackoffConfiguration.class).toInstance(RetryBackoffConfiguration.DEFAULT); bind(EventBusId.class).toInstance(EventBusId.random()); - Multibinder reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), SimpleConnectionPool.ReconnectionHandler.class); - reconnectionHandlerMultibinder.addBinding().to(KeyReconnectionHandler.class); - reconnectionHandlerMultibinder.addBinding().to(EventBusReconnectionHandler.class); - Multibinder.newSetBinder(binder(), HealthCheck.class) .addBinding().to(RabbitMQMailboxEventBusDeadLetterQueueHealthCheck.class); } @@ -78,6 +74,16 @@ HealthCheck healthCheck(RabbitMQEventBus eventBus, NamingStrategy namingStrategy return new RabbitEventBusConsumerHealthCheck(eventBus, namingStrategy, connectionPool); } + @ProvidesIntoSet + SimpleConnectionPool.ReconnectionHandler provideReconnectionHandler(RabbitMQEventBus eventBus) { + return new EventBusReconnectionHandler(eventBus); + } + + @ProvidesIntoSet + SimpleConnectionPool.ReconnectionHandler provideReconnectionHandler(NamingStrategy namingStrategy, EventBusId eventBusId, RabbitMQConfiguration configuration) { + return new KeyReconnectionHandler(namingStrategy, eventBusId, configuration); + } + @ProvidesIntoSet InitializationOperation workQueue(RabbitMQEventBus instance) { return InitilizationOperationBuilder