Skip to content

Commit

Permalink
add controllerMQ listener to Payment Service
Browse files Browse the repository at this point in the history
  • Loading branch information
abd0123 committed May 17, 2024
1 parent 36e78ec commit 5ddb645
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.workup.payments;

import com.workup.payments.commands.PaymentCommandMap;
import com.workup.shared.commands.controller.*;
import com.workup.shared.enums.ServiceQueueNames;
import com.workup.shared.enums.ThreadPoolSize;
import com.zaxxer.hikari.HikariDataSource;
import java.lang.reflect.Field;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.Configurator;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = "#{controllerQueue.name}", id = "#{controllerQueue.name}")
public class ControllerMQListener {
@Autowired public PaymentCommandMap commandMap;
@Autowired public ThreadPoolTaskExecutor taskExecutor;
@Autowired private ApplicationContext context;
@Autowired private RabbitListenerEndpointRegistry registry;
@Autowired private HikariDataSource hikariDataSource;

@RabbitHandler
public void receive(SetMaxThreadsRequest in) throws Exception {
try {
System.out.println("Max threads is: " + taskExecutor.getMaxPoolSize());
setThreads(in.getMaxThreads());
ThreadPoolSize.POOL_SIZE = taskExecutor.getMaxPoolSize();
System.out.println("Max threads set to: " + taskExecutor.getMaxPoolSize());
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

@RabbitHandler
public void receive(SetLoggingLevelRequest in) throws Exception {
try {
Logger logger = LogManager.getRootLogger();
Configurator.setAllLevels(logger.getName(), Level.valueOf(in.getLevel()));
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

@RabbitHandler
public void receive(FreezeRequest in) throws Exception {
try {
registry.getListenerContainer(ServiceQueueNames.JOBS).stop();
taskExecutor.shutdown();
setThreads(0);
System.out.println("Stopped all threads.");
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

@RabbitHandler
public void receive(ContinueRequest in) throws Exception {
try {
taskExecutor.start();
setThreads(ThreadPoolSize.POOL_SIZE);
registry.getListenerContainer(ServiceQueueNames.JOBS).start();
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

@RabbitHandler
public void receive(UpdateCommandRequest in) throws Exception {
try {
String className = commandMap.getCommand(in.getName()).getClass().getName();
System.out.println("Updating command: " + in.getName());
System.out.println("Class: " + className);
Class newClass = new MyClassLoader().loadClass(in.getByteCode(), className);
commandMap.replaceCommand(in.getName(), newClass);
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

@RabbitHandler
private void SetMaxDBConnections(SetMaxDBConnectionsRequest in) {
try {
if (hikariDataSource == null) {
System.out.println("HikariDataSource is null");
return;
}
hikariDataSource.setMaximumPoolSize(in.getMaxDBConnections());
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

private void setThreads(int threads) throws NoSuchFieldException, IllegalAccessException {
ThreadPoolTaskExecutor myBean = context.getBean(ThreadPoolTaskExecutor.class);
Field maxPoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("maxPoolSize");
maxPoolSize.setAccessible(true);
maxPoolSize.set(myBean, threads);
Field corePoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("corePoolSize");
corePoolSize.setAccessible(true);
corePoolSize.set(myBean, threads);
}
}

class MyClassLoader extends ClassLoader {
public Class<?> loadClass(byte[] byteCode, String className) {
System.out.println("Loading class: " + className);
return defineClass(className, byteCode, 0, byteCode.length);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package com.workup.payments;

import com.workup.shared.enums.ControllerQueueNames;
import com.workup.shared.enums.ServiceQueueNames;
import com.workup.shared.enums.ThreadPoolSize;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
Expand All @@ -9,6 +15,7 @@
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@SpringBootApplication
@ComponentScan(basePackages = "com.workup")
Expand All @@ -24,8 +31,35 @@ public Queue myQueue() {
return new Queue(ServiceQueueNames.PAYMENTS);
}

@Bean
public Queue controllerQueue() {
return new AnonymousQueue();
}

@Bean
public FanoutExchange fanout() {
return new FanoutExchange(ControllerQueueNames.PAYMENTS);
}

@Bean
public Binding fanoutBinding(FanoutExchange fanout, Queue controllerQueue) {
return BindingBuilder.bind(controllerQueue).to(fanout);
}

@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(ThreadPoolSize.POOL_SIZE);
executor.setMaxPoolSize(ThreadPoolSize.POOL_SIZE);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("payments-");
executor.initialize();
return executor;
}
}

0 comments on commit 5ddb645

Please sign in to comment.