diff --git a/compose.override.yaml b/compose.override.yaml
index 9902dd7a..b70f434b 100644
--- a/compose.override.yaml
+++ b/compose.override.yaml
@@ -1,4 +1,3 @@
-
version: '3.7'
services:
service_jobs:
diff --git a/compose.yaml b/compose.yaml
index cb11db81..0700244d 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -30,6 +30,8 @@ services:
service_mq:
image: rabbitmq:3.13-management
+ ports:
+ - "5672:5672" # hacky method.. dont ever do this :(
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
@@ -38,7 +40,7 @@ services:
networks:
- default
- frontend
-
+
service_redis:
image: redis:7.2.4
healthcheck:
@@ -101,7 +103,7 @@ services:
POSTGRES_USER: payments_user
POSTGRES_DB: payments_database
healthcheck:
- test: ["CMD", "pg_isready"]
+ test: [ "CMD", "pg_isready" ]
interval: 20s
timeout: 10s
retries: 10
diff --git a/controller/.env b/controller/.env
new file mode 100644
index 00000000..3fedf8df
--- /dev/null
+++ b/controller/.env
@@ -0,0 +1,4 @@
+JOBS_MQ_URL=service_mq
+JOBS_MQ_USER=guest
+JOBS_MQ_PASSWORD=guest
+JOBS_MQ_PORT=guest
\ No newline at end of file
diff --git a/controller/Dockerfile b/controller/Dockerfile
new file mode 100644
index 00000000..6730ed34
--- /dev/null
+++ b/controller/Dockerfile
@@ -0,0 +1,5 @@
+FROM eclipse-temurin:21-jre-alpine
+VOLUME /tmp
+ARG JAR_FILE=target/*.jar
+COPY ${JAR_FILE} controller.jar
+ENTRYPOINT ["tail", "-f", "/dev/null"]
\ No newline at end of file
diff --git a/controller/pom.xml b/controller/pom.xml
index c2035f0c..b458f126 100644
--- a/controller/pom.xml
+++ b/controller/pom.xml
@@ -20,16 +20,53 @@
org.springframework.boot
spring-boot-starter
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
+
+
+ com.workup
+ jobs
+ ${project.version}
+
+
+ org.testcontainers
+ cassandra
+
+
+ org.springframework.boot
+ spring-boot-starter-data-cassandra
+
+
-
org.springframework.boot
spring-boot-starter-test
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
test
org.springframework.boot
spring-boot-starter-amqp
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
+
+
+ javassist
+ javassist
+ 3.12.1.GA
com.workup
@@ -41,6 +78,10 @@
cliche
110413
+
+ org.springframework.boot
+ spring-boot-starter-log4j2
+
diff --git a/controller/src/main/java/com/workup/controller/CLIHandler.java b/controller/src/main/java/com/workup/controller/CLIHandler.java
index 69be21e8..9e0200fe 100644
--- a/controller/src/main/java/com/workup/controller/CLIHandler.java
+++ b/controller/src/main/java/com/workup/controller/CLIHandler.java
@@ -2,10 +2,15 @@
import asg.cliche.CLIException;
import asg.cliche.Command;
-import com.workup.shared.commands.controller.SetMaxThreadsRequest;
+import com.workup.shared.commands.controller.*;
+import com.workup.shared.commands.jobs.requests.CreateJobRequest;
import com.workup.shared.enums.ControllerQueueNames;
+import com.workup.shared.enums.ServiceQueueNames;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import javassist.*;
+import org.apache.logging.log4j.Level;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
@@ -21,7 +26,7 @@ public class CLIHandler {
}
@Command(description = "Set the maximum number of threads for a specific app")
- public String maxthreads(String app, int maxThreads) throws CLIException {
+ public String maxThreads(String app, int maxThreads) throws CLIException {
app = app.toLowerCase();
if (!appQueueMap.containsKey(app)) {
return "Error: app can only be jobs, users, contracts or payments!";
@@ -33,22 +38,44 @@ public String maxthreads(String app, int maxThreads) throws CLIException {
appQueueMap.get(app),
"",
SetMaxThreadsRequest.builder().withMaxThreads(maxThreads).build());
- return "MaxThreads";
+ return "Command sent";
}
@Command(description = "Set the maximum number of DB connections for a specific app")
- public String maxdb(String app, int appNum, String maxDBConn) {
- return "maxdb";
+ public String maxdb(String app, int maxDBConn) {
+ app = app.toLowerCase();
+ if (!appQueueMap.containsKey(app)) {
+ return "Error: app can only be jobs, users, contracts or payments!";
+ }
+ if (maxDBConn > 100 || maxDBConn < 1) {
+ return "Error: Max threads must have a value between 1 and 100";
+ }
+ rabbitTemplate.convertAndSend(
+ appQueueMap.get(app),
+ "",
+ SetMaxDBConnectionsRequest.builder().withMaxDBConnections(maxDBConn).build());
+ return "Command Sent!";
}
@Command(description = "starts a specific app")
- public String start(String app, int appNum) {
- return "start";
+ public String start(String app) {
+ app = app.toLowerCase();
+ if (!appQueueMap.containsKey(app)) {
+ return "Error: app can only be jobs, users, contracts or payments!";
+ }
+
+ rabbitTemplate.convertAndSend(appQueueMap.get(app), "", ContinueRequest.builder().build());
+ return "Command sent";
}
@Command(description = "stops a specific app")
- public String freeze(String app, int appNum) {
- return "freeze";
+ public String freeze(String app) {
+ app = app.toLowerCase();
+ if (!appQueueMap.containsKey(app)) {
+ return "Error: app can only be jobs, users, contracts or payments!";
+ }
+ rabbitTemplate.convertAndSend(appQueueMap.get(app), "", FreezeRequest.builder().build());
+ return "Command sent";
}
@Command(description = "stops a specific app")
@@ -57,8 +84,22 @@ public String setmq(String app, int appNum) {
}
@Command(description = "stops a specific app")
- public String setErrorReportingLevel(String app, int appNum) {
- return "error level";
+ public String setLoggingLevel(String app, String level) {
+ app = app.toLowerCase();
+ if (!appQueueMap.containsKey(app)) {
+ return "Error: app can only be jobs, users, contracts or payments!";
+ }
+ // To throw an error in case an invalid level is provided :)
+ Level.valueOf(level);
+ rabbitTemplate.convertAndSend(
+ appQueueMap.get(app), "", SetLoggingLevelRequest.builder().withLevel(level).build());
+ return "Command sent!!";
+ }
+
+ @Command(description = "test")
+ public void test() {
+ CreateJobRequest request = CreateJobRequest.builder().withTitle("Ziko").build();
+ rabbitTemplate.convertSendAndReceive(ServiceQueueNames.JOBS, request);
}
@Command(description = "Creates a new command")
@@ -67,8 +108,37 @@ public String addcommand(String app, String commandName, String className) {
}
@Command(description = "Updates an existing command")
- public String updatecommand(String app, String commandName, String className) {
- return "Update Command";
+ public String updateCommand(String app, String commandName, String className) throws Exception {
+ app = app.toLowerCase();
+ if (!appQueueMap.containsKey(app)) {
+ return "Error: app can only be jobs, users, contracts or payments!";
+ }
+ try {
+ byte[] byteArray = getByteCode(commandName, className);
+ rabbitTemplate.convertAndSend(
+ appQueueMap.get(app),
+ "",
+ UpdateCommandRequest.builder()
+ .withCommandName(commandName)
+ .withByteCode(byteArray)
+ .build());
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ return "Command sent!!";
+ }
+
+ private byte[] getByteCode(String commandName, String className)
+ throws InstantiationException,
+ IllegalAccessException,
+ NotFoundException,
+ IOException,
+ CannotCompileException {
+ ClassPool pool = ClassPool.getDefault();
+ pool.insertClassPath(new ClassClassPath(ControllerApplication.class));
+ CtClass ctClass = pool.get(className);
+ // That's the compiled class byte code
+ return ctClass.toBytecode();
}
@Command(description = "Deletes an existing command")
diff --git a/pom.xml b/pom.xml
index 750662ae..c85af379 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,6 @@
shared
- controller
services/jobs
services/payments
services/users
@@ -61,18 +60,18 @@
git-build-hook-maven-plugin
3.5.0
-
- true
-
+
+ true
+
-
-
- configure
-
-
+
+
+ configure
+
+
-
+
diff --git a/services/jobs/pom.xml b/services/jobs/pom.xml
index 53281e1e..d6d51d0f 100644
--- a/services/jobs/pom.xml
+++ b/services/jobs/pom.xml
@@ -48,19 +48,26 @@
shared
${project.version}
-
- javassist
- javassist
- 3.12.1.GA
-
org.springframework.boot
spring-boot-starter-test
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
test
org.springframework.boot
spring-boot-testcontainers
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
test
@@ -94,6 +101,12 @@
org.springframework.boot
spring-boot-starter-data-cassandra
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
org.projectlombok
diff --git a/services/jobs/src/main/java/com/workup/jobs/ControllerMQListener.java b/services/jobs/src/main/java/com/workup/jobs/ControllerMQListener.java
index 1a63839a..8ef28ae6 100644
--- a/services/jobs/src/main/java/com/workup/jobs/ControllerMQListener.java
+++ b/services/jobs/src/main/java/com/workup/jobs/ControllerMQListener.java
@@ -1,38 +1,123 @@
package com.workup.jobs;
+import com.workup.jobs.commands.JobCommand;
import com.workup.jobs.commands.JobCommandMap;
-import com.workup.shared.commands.controller.SetMaxThreadsRequest;
+import com.workup.shared.commands.Command;
+import com.workup.shared.commands.CommandRequest;
+import com.workup.shared.commands.CommandResponse;
+import com.workup.shared.commands.controller.*;
+import com.workup.shared.enums.ServiceQueueNames;
+import com.workup.shared.enums.ThreadPoolSize;
import java.lang.reflect.Field;
-import javassist.*;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.Configurator;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
@Service
-@RabbitListener(queues = "#{controllerQueue.name}")
+@RabbitListener(queues = "#{controllerQueue.name}", id = "#{controllerQueue.name}")
public class ControllerMQListener {
-
@Autowired public JobCommandMap commandMap;
@Autowired public ThreadPoolTaskExecutor taskExecutor;
-
@Autowired private ApplicationContext context;
+ @Autowired private RabbitListenerEndpointRegistry registry;
@RabbitHandler
public void receive(SetMaxThreadsRequest in) throws Exception {
try {
- ThreadPoolTaskExecutor myBean = context.getBean(ThreadPoolTaskExecutor.class);
- Field maxPoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("maxPoolSize");
- maxPoolSize.setAccessible(true);
- maxPoolSize.set(myBean, in.getMaxThreads());
- Field corePoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("corePoolSize");
- corePoolSize.setAccessible(true);
- corePoolSize.set(myBean, in.getMaxThreads());
+ System.out.println("Max threads is: " + taskExecutor.getMaxPoolSize());
+ setThreads(in.getMaxThreads());
+ ThreadPoolSize.POOL_SIZE = taskExecutor.getMaxPoolSize();
+ System.out.println("Max threads set to: " + taskExecutor.getMaxPoolSize());
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ @RabbitHandler
+ public void receive(SetLoggingLevelRequest in) throws Exception {
+ try {
+ Logger logger = LogManager.getRootLogger();
+ Configurator.setAllLevels(logger.getName(), Level.valueOf(in.getLevel()));
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ @RabbitHandler
+ public void receive(FreezeRequest in) throws Exception {
+ try {
+ registry.getListenerContainer(ServiceQueueNames.JOBS).stop();
+ taskExecutor.shutdown();
+ setThreads(0);
+ System.out.println("Stopped all threads.");
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}
+
+ @RabbitHandler
+ public void receive(ContinueRequest in) throws Exception {
+ try {
+ taskExecutor.start();
+ setThreads(ThreadPoolSize.POOL_SIZE);
+ registry.getListenerContainer(ServiceQueueNames.JOBS).start();
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @RabbitHandler
+ public void receive(UpdateCommandRequest in) throws Exception {
+ try {
+ String className = commandMap.getCommand(in.getCommandName()).getClass().getName();
+ byte[] byteArray = in.getByteCode();
+ Class> clazz =
+ (Class>)
+ (new MyClassLoader(this.getClass().getClassLoader()).loadClass(byteArray, className));
+
+ commandMap.replaceCommand(
+ in.getCommandName(),
+ (Class extends JobCommand extends CommandRequest, ? extends CommandResponse>>)
+ ((Command, ?>) clazz.newInstance()).getClass());
+
+ System.out.println("Updated command: " + in.getCommandName());
+ // clazz.newInstance().Run(null);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ static class MyClassLoader extends ClassLoader {
+ public MyClassLoader(ClassLoader classLoader) {
+ super(classLoader);
+ }
+
+ public Class> loadClass(byte[] byteCode, String className) {
+ return defineClass(className, byteCode, 0, byteCode.length);
+ }
+ }
+
+ private void setThreads(int threads) throws NoSuchFieldException, IllegalAccessException {
+ ThreadPoolTaskExecutor myBean = context.getBean(ThreadPoolTaskExecutor.class);
+ Field maxPoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("maxPoolSize");
+ maxPoolSize.setAccessible(true);
+ maxPoolSize.set(myBean, threads);
+ Field corePoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("corePoolSize");
+ corePoolSize.setAccessible(true);
+ corePoolSize.set(myBean, threads);
+ }
}
diff --git a/services/jobs/src/main/java/com/workup/jobs/JobsApplication.java b/services/jobs/src/main/java/com/workup/jobs/JobsApplication.java
index 01eb15b9..4ae506ef 100644
--- a/services/jobs/src/main/java/com/workup/jobs/JobsApplication.java
+++ b/services/jobs/src/main/java/com/workup/jobs/JobsApplication.java
@@ -2,6 +2,7 @@
import com.workup.shared.enums.ControllerQueueNames;
import com.workup.shared.enums.ServiceQueueNames;
+import com.workup.shared.enums.ThreadPoolSize;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
@@ -56,8 +57,9 @@ public MessageConverter messageConverter() {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(50);
- executor.setMaxPoolSize(50);
+ executor.setCorePoolSize(ThreadPoolSize.POOL_SIZE);
+ executor.setMaxPoolSize(ThreadPoolSize.POOL_SIZE);
+ executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("jobs-");
executor.initialize();
diff --git a/services/jobs/src/main/java/com/workup/jobs/RabbitMQListener.java b/services/jobs/src/main/java/com/workup/jobs/RabbitMQListener.java
index 847f20c3..76e5a1b7 100644
--- a/services/jobs/src/main/java/com/workup/jobs/RabbitMQListener.java
+++ b/services/jobs/src/main/java/com/workup/jobs/RabbitMQListener.java
@@ -1,7 +1,6 @@
package com.workup.jobs;
import com.workup.jobs.commands.AcceptProposalCommand;
-import com.workup.jobs.commands.CreateJobCommand;
import com.workup.jobs.commands.CreateProposalCommand;
import com.workup.jobs.commands.GetJobByIdCommand;
import com.workup.jobs.commands.GetMyJobsCommand;
@@ -9,6 +8,8 @@
import com.workup.jobs.commands.GetProposalsByJobIdCommand;
import com.workup.jobs.commands.JobCommandMap;
import com.workup.jobs.commands.SearchJobsCommand;
+import com.workup.shared.commands.Command;
+import com.workup.shared.commands.CommandRequest;
import com.workup.shared.commands.jobs.proposals.requests.AcceptProposalRequest;
import com.workup.shared.commands.jobs.proposals.requests.CreateProposalRequest;
import com.workup.shared.commands.jobs.proposals.requests.GetMyProposalsRequest;
@@ -34,7 +35,7 @@
import org.springframework.stereotype.Service;
@Service
-@RabbitListener(queues = ServiceQueueNames.JOBS)
+@RabbitListener(queues = ServiceQueueNames.JOBS, id = ServiceQueueNames.JOBS)
public class RabbitMQListener {
@Autowired public JobCommandMap commandMap;
@@ -42,8 +43,10 @@ public class RabbitMQListener {
@RabbitHandler
@Async
public CompletableFuture receive(CreateJobRequest in) throws Exception {
- CreateJobResponse response = ((CreateJobCommand) commandMap.getCommand("CreateJob")).Run(in);
- return CompletableFuture.completedFuture(response);
+ CreateJobResponse resp =
+ (CreateJobResponse)
+ ((Command) commandMap.getCommand("CreateJob")).Run(in);
+ return CompletableFuture.completedFuture(resp);
}
@RabbitHandler
diff --git a/services/jobs/src/main/java/com/workup/jobs/commands/JobCommand.java b/services/jobs/src/main/java/com/workup/jobs/commands/JobCommand.java
index de96129e..1b46a9f7 100644
--- a/services/jobs/src/main/java/com/workup/jobs/commands/JobCommand.java
+++ b/services/jobs/src/main/java/com/workup/jobs/commands/JobCommand.java
@@ -11,9 +11,9 @@
public abstract class JobCommand
implements Command {
- @Setter JobRepository jobRepository;
+ @Setter public JobRepository jobRepository;
- @Setter ProposalRepository proposalRepository;
+ @Setter public ProposalRepository proposalRepository;
- @Setter AmqpTemplate rabbitTemplate;
+ @Setter public AmqpTemplate rabbitTemplate;
}
diff --git a/services/jobs/src/main/java/com/workup/jobs/commands/JobCommandMap.java b/services/jobs/src/main/java/com/workup/jobs/commands/JobCommandMap.java
index 8bc9456a..3de55692 100644
--- a/services/jobs/src/main/java/com/workup/jobs/commands/JobCommandMap.java
+++ b/services/jobs/src/main/java/com/workup/jobs/commands/JobCommandMap.java
@@ -13,11 +13,11 @@
public class JobCommandMap
extends CommandMap> {
- @Autowired JobRepository jobRepository;
+ @Autowired public JobRepository jobRepository;
- @Autowired ProposalRepository proposalRepository;
+ @Autowired public ProposalRepository proposalRepository;
- @Autowired AmqpTemplate rabbitTemplate;
+ @Autowired public AmqpTemplate rabbitTemplate;
public void registerCommands() {
commands.put("CreateJob", CreateJobCommand.class);
diff --git a/services/jobs/src/main/resources/application.properties b/services/jobs/src/main/resources/application.properties
index 61221848..b5b92468 100644
--- a/services/jobs/src/main/resources/application.properties
+++ b/services/jobs/src/main/resources/application.properties
@@ -5,4 +5,5 @@ spring.rabbitmq.password=guest
spring.cassandra.local-datacenter=datacenter1
spring.cassandra.keyspace-name=jobs_data
spring.cassandra.contact-points=${JOBS_DB_URL}
-spring.cassandra.schema-action=CREATE_IF_NOT_EXISTS
\ No newline at end of file
+spring.cassandra.schema-action=CREATE_IF_NOT_EXISTS
+logging.level.root=INFO
diff --git a/services/payments/src/main/java/com/workup/payments/ControllerMQListener.java b/services/payments/src/main/java/com/workup/payments/ControllerMQListener.java
index 706bf74d..17526e5f 100644
--- a/services/payments/src/main/java/com/workup/payments/ControllerMQListener.java
+++ b/services/payments/src/main/java/com/workup/payments/ControllerMQListener.java
@@ -1,38 +1,125 @@
-package com.workup.payments;
-
-import com.workup.shared.commands.controller.SetMaxThreadsRequest;
-import java.lang.reflect.Field;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.springframework.amqp.rabbit.annotation.RabbitHandler;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Service;
-
-@Service
-@RabbitListener(queues = "#{controllerQueue.name}")
-public class ControllerMQListener {
-
- private static final Logger logger = LogManager.getLogger(ControllerMQListener.class);
-
- @Autowired public ThreadPoolTaskExecutor taskExecutor;
-
- @Autowired private ApplicationContext context;
-
- @RabbitHandler
- public void receive(SetMaxThreadsRequest in) throws Exception {
- try {
- ThreadPoolTaskExecutor myBean = context.getBean(ThreadPoolTaskExecutor.class);
- Field maxPoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("maxPoolSize");
- maxPoolSize.setAccessible(true);
- maxPoolSize.set(myBean, in.getMaxThreads());
- Field corePoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("corePoolSize");
- corePoolSize.setAccessible(true);
- corePoolSize.set(myBean, in.getMaxThreads());
- } catch (Exception e) {
- logger.error("Error setting max threads", e.getMessage());
- }
- }
-}
+package com.workup.payments;
+
+import com.workup.payments.commands.PaymentCommandMap;
+import com.workup.shared.commands.controller.*;
+import com.workup.shared.enums.ServiceQueueNames;
+import com.workup.shared.enums.ThreadPoolSize;
+import com.zaxxer.hikari.HikariDataSource;
+import java.lang.reflect.Field;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Service;
+
+@Service
+@RabbitListener(queues = "#{controllerQueue.name}", id = "#{controllerQueue.name}")
+public class ControllerMQListener {
+ @Autowired public PaymentCommandMap commandMap;
+ @Autowired public ThreadPoolTaskExecutor taskExecutor;
+ @Autowired private ApplicationContext context;
+ @Autowired private RabbitListenerEndpointRegistry registry;
+ @Autowired private HikariDataSource hikariDataSource;
+
+ @RabbitHandler
+ public void receive(SetMaxThreadsRequest in) throws Exception {
+ try {
+ System.out.println("Max threads is: " + taskExecutor.getMaxPoolSize());
+ setThreads(in.getMaxThreads());
+ ThreadPoolSize.POOL_SIZE = taskExecutor.getMaxPoolSize();
+ System.out.println("Max threads set to: " + taskExecutor.getMaxPoolSize());
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ @RabbitHandler
+ public void receive(SetLoggingLevelRequest in) throws Exception {
+ try {
+ Logger logger = LogManager.getRootLogger();
+ Configurator.setAllLevels(logger.getName(), Level.valueOf(in.getLevel()));
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ @RabbitHandler
+ public void receive(FreezeRequest in) throws Exception {
+ try {
+ registry.getListenerContainer(ServiceQueueNames.JOBS).stop();
+ taskExecutor.shutdown();
+ setThreads(0);
+ System.out.println("Stopped all threads.");
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ @RabbitHandler
+ public void receive(ContinueRequest in) throws Exception {
+ try {
+ taskExecutor.start();
+ setThreads(ThreadPoolSize.POOL_SIZE);
+ registry.getListenerContainer(ServiceQueueNames.JOBS).start();
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ @RabbitHandler
+ public void receive(UpdateCommandRequest in) throws Exception {
+ // try {
+ // String className = commandMap.getCommand(in.getName()).getClass().getName();
+ // System.out.println("Updating command: " + in.getName());
+ // System.out.println("Class: " + className);
+ // Class newClass = new MyClassLoader().loadClass(in.getByteCode(), className);
+ // commandMap.replaceCommand(in.getName(), newClass);
+ // } catch (Exception e) {
+ // System.out.println(e.getMessage());
+ // e.printStackTrace();
+ // }
+ }
+
+ @RabbitHandler
+ private void SetMaxDBConnections(SetMaxDBConnectionsRequest in) {
+ try {
+ if (hikariDataSource == null) {
+ System.out.println("HikariDataSource is null");
+ return;
+ }
+ System.out.println("Max DB connections is: " + hikariDataSource.getMaximumPoolSize());
+ hikariDataSource.setMaximumPoolSize(in.getMaxDBConnections());
+ System.out.println("Max DB connections set to: " + hikariDataSource.getMaximumPoolSize());
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ private void setThreads(int threads) throws NoSuchFieldException, IllegalAccessException {
+ ThreadPoolTaskExecutor myBean = context.getBean(ThreadPoolTaskExecutor.class);
+ Field maxPoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("maxPoolSize");
+ maxPoolSize.setAccessible(true);
+ maxPoolSize.set(myBean, threads);
+ Field corePoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("corePoolSize");
+ corePoolSize.setAccessible(true);
+ corePoolSize.set(myBean, threads);
+ }
+}
+
+class MyClassLoader extends ClassLoader {
+ public Class> loadClass(byte[] byteCode, String className) {
+ System.out.println("Loading class: " + className);
+ return defineClass(className, byteCode, 0, byteCode.length);
+ }
+}
diff --git a/services/payments/src/main/java/com/workup/payments/PaymentsApplication.java b/services/payments/src/main/java/com/workup/payments/PaymentsApplication.java
index fee10a68..819e50fc 100644
--- a/services/payments/src/main/java/com/workup/payments/PaymentsApplication.java
+++ b/services/payments/src/main/java/com/workup/payments/PaymentsApplication.java
@@ -2,6 +2,8 @@
import com.workup.shared.enums.ControllerQueueNames;
import com.workup.shared.enums.ServiceQueueNames;
+import com.workup.shared.enums.ThreadPoolSize;
+
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
@@ -32,11 +34,6 @@ public Queue myQueue() {
return new Queue(ServiceQueueNames.PAYMENTS);
}
- @Bean
- public MessageConverter messageConverter() {
- return new Jackson2JsonMessageConverter();
- }
-
@Bean
public Queue controllerQueue() {
return new AnonymousQueue();
@@ -52,11 +49,17 @@ public Binding fanoutBinding(FanoutExchange fanout, Queue controllerQueue) {
return BindingBuilder.bind(controllerQueue).to(fanout);
}
+ @Bean
+ public MessageConverter messageConverter() {
+ return new Jackson2JsonMessageConverter();
+ }
+
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(50);
- executor.setMaxPoolSize(50);
+ executor.setCorePoolSize(ThreadPoolSize.POOL_SIZE);
+ executor.setMaxPoolSize(ThreadPoolSize.POOL_SIZE);
+ executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("payments-");
executor.initialize();
diff --git a/shared/pom.xml b/shared/pom.xml
index 17c02a8c..cec90cd1 100644
--- a/shared/pom.xml
+++ b/shared/pom.xml
@@ -1,42 +1,53 @@
- 4.0.0
- com.workup
- shared
- 0.0.1-SNAPSHOT
- shared
-
- 21
- 21
-
-
- com.workup
- main
- 1.0-SNAPSHOT
-
-
-
- com.fasterxml.jackson.dataformat
- jackson-dataformat-xml
- 2.16.0
-
-
- org.springframework.boot
- spring-boot-starter-data-redis
- 3.1.2
-
-
- redis.clients
- jedis
- 3.7.0
-
-
- org.projectlombok
- lombok
- 1.18.30
- provided
-
-
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ 4.0.0
+ com.workup
+ shared
+ 0.0.1-SNAPSHOT
+ shared
+
+ 21
+ 21
+
+
+ com.workup
+ main
+ 1.0-SNAPSHOT
+
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-xml
+ 2.16.0
+
+
+ org.springframework.boot
+ spring-boot-starter-data-redis
+ 3.1.2
+
+
+ org.springframework.boot
+ spring-boot-starter-logging
+
+
+
+
+ redis.clients
+ jedis
+ 3.7.0
+
+
+ org.projectlombok
+ lombok
+ 1.18.30
+ provided
+
+
+ org.springframework.boot
+ spring-boot-starter-log4j2
+ 3.2.5
+
+
\ No newline at end of file
diff --git a/shared/src/main/java/com/workup/shared/commands/CommandMap.java b/shared/src/main/java/com/workup/shared/commands/CommandMap.java
index 8d34facd..ece18e29 100644
--- a/shared/src/main/java/com/workup/shared/commands/CommandMap.java
+++ b/shared/src/main/java/com/workup/shared/commands/CommandMap.java
@@ -24,4 +24,8 @@ public R getCommand(String command) throws Exception {
setupCommand(commandInstance);
return commandInstance;
}
+
+ public void replaceCommand(String command, Class extends R> newCommand) {
+ commands.put(command, newCommand);
+ }
}
diff --git a/shared/src/main/java/com/workup/shared/commands/controller/ContinueRequest.java b/shared/src/main/java/com/workup/shared/commands/controller/ContinueRequest.java
index 947802a6..99f68a77 100644
--- a/shared/src/main/java/com/workup/shared/commands/controller/ContinueRequest.java
+++ b/shared/src/main/java/com/workup/shared/commands/controller/ContinueRequest.java
@@ -1,6 +1,11 @@
package com.workup.shared.commands.controller;
+import lombok.Builder;
+import lombok.extern.jackson.Jacksonized;
+
/** Makes a service start accepting requests and acquire resources again. */
+@Builder
+@Jacksonized
public class ContinueRequest {
// No fields are required?
}
diff --git a/shared/src/main/java/com/workup/shared/commands/controller/FreezeRequest.java b/shared/src/main/java/com/workup/shared/commands/controller/FreezeRequest.java
index 563c88c2..3f841e89 100644
--- a/shared/src/main/java/com/workup/shared/commands/controller/FreezeRequest.java
+++ b/shared/src/main/java/com/workup/shared/commands/controller/FreezeRequest.java
@@ -1,6 +1,11 @@
package com.workup.shared.commands.controller;
+import lombok.Builder;
+import lombok.extern.jackson.Jacksonized;
+
/** Makes a service stop accepting requests and release resources. */
+@Builder
+@Jacksonized
public class FreezeRequest {
// No fields are required?
}
diff --git a/shared/src/main/java/com/workup/shared/commands/controller/SetErrorReportingLevelRequest.java b/shared/src/main/java/com/workup/shared/commands/controller/SetLoggingLevelRequest.java
similarity index 68%
rename from shared/src/main/java/com/workup/shared/commands/controller/SetErrorReportingLevelRequest.java
rename to shared/src/main/java/com/workup/shared/commands/controller/SetLoggingLevelRequest.java
index f7b15132..e035135a 100644
--- a/shared/src/main/java/com/workup/shared/commands/controller/SetErrorReportingLevelRequest.java
+++ b/shared/src/main/java/com/workup/shared/commands/controller/SetLoggingLevelRequest.java
@@ -1,6 +1,5 @@
package com.workup.shared.commands.controller;
-import com.workup.shared.enums.ErrorLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.jackson.Jacksonized;
@@ -9,6 +8,6 @@
@Getter
@Builder(setterPrefix = "with")
@Jacksonized
-public class SetErrorReportingLevelRequest {
- ErrorLevel errorLevel;
+public class SetLoggingLevelRequest {
+ String level;
}
diff --git a/shared/src/main/java/com/workup/shared/commands/controller/UpdateCommandRequest.java b/shared/src/main/java/com/workup/shared/commands/controller/UpdateCommandRequest.java
index 1d2fbafa..b8c01ed0 100644
--- a/shared/src/main/java/com/workup/shared/commands/controller/UpdateCommandRequest.java
+++ b/shared/src/main/java/com/workup/shared/commands/controller/UpdateCommandRequest.java
@@ -9,5 +9,6 @@
@Builder(setterPrefix = "with")
@Jacksonized
public class UpdateCommandRequest {
- String name;
+ String commandName;
+ byte[] byteCode;
}
diff --git a/shared/src/main/java/com/workup/shared/enums/ErrorLevel.java b/shared/src/main/java/com/workup/shared/enums/ErrorLevel.java
deleted file mode 100644
index 444baebe..00000000
--- a/shared/src/main/java/com/workup/shared/enums/ErrorLevel.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.workup.shared.enums;
-
-public enum ErrorLevel {
- INFO,
- WARNING,
- ERROR,
- FATAL
-}
diff --git a/shared/src/main/java/com/workup/shared/enums/ThreadPoolSize.java b/shared/src/main/java/com/workup/shared/enums/ThreadPoolSize.java
new file mode 100644
index 00000000..4a6df8bc
--- /dev/null
+++ b/shared/src/main/java/com/workup/shared/enums/ThreadPoolSize.java
@@ -0,0 +1,5 @@
+package com.workup.shared.enums;
+
+public class ThreadPoolSize {
+ public static int POOL_SIZE = 50;
+}