diff --git a/ddd-core/pom.xml b/ddd-core/pom.xml index 9189e2d..8ffbbd3 100644 --- a/ddd-core/pom.xml +++ b/ddd-core/pom.xml @@ -22,6 +22,11 @@ spring-tx provided + + org.springframework + spring-messaging + provided + io.swagger.core.v3 swagger-annotations diff --git a/ddd-core/src/main/java/org/netcorepal/cap4j/ddd/domain/event/DomainEventMessageInterceptor.java b/ddd-core/src/main/java/org/netcorepal/cap4j/ddd/domain/event/DomainEventMessageInterceptor.java new file mode 100644 index 0000000..6219ca6 --- /dev/null +++ b/ddd-core/src/main/java/org/netcorepal/cap4j/ddd/domain/event/DomainEventMessageInterceptor.java @@ -0,0 +1,71 @@ +package org.netcorepal.cap4j.ddd.domain.event; + +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; + +import java.util.Map; +import java.util.UUID; + +/** + * 领域事件消息拦截器 + * + * @author binking338 + * @date 2024/8/8 + */ +public interface DomainEventMessageInterceptor { + + /** + * 发送前 + * + * @param message + * @return + */ + Message beforePublish(Message message); + + /** + * 消费前 + * + * @param message + * @return + */ + Message beforeSubscribe(Message message); + + /** + * 可变更消息头 + */ + public static class ModifiableMessageHeaders extends MessageHeaders { + + public ModifiableMessageHeaders(@Nullable Map headers) { + this( + headers, + headers.containsKey(ID) ? UUID.fromString(headers.get(ID).toString()) : null, + headers.containsKey(TIMESTAMP) ? Long.parseLong(headers.get(TIMESTAMP).toString()) : null + ); + } + + public ModifiableMessageHeaders(@Nullable Map headers, @Nullable UUID id, @Nullable Long timestamp) { + super(headers, id, timestamp); + } + + @Override + public void putAll(Map map) { + this.getRawHeaders().putAll(map); + } + + @Override + public Object put(String key, Object value) { + return this.getRawHeaders().put(key, value); + } + + @Override + public Object remove(Object key) { + return this.getRawHeaders().remove(key); + } + + @Override + public void clear() { + this.getRawHeaders().clear(); + } + } +} diff --git a/ddd-core/src/main/java/org/netcorepal/cap4j/ddd/domain/event/EventRecord.java b/ddd-core/src/main/java/org/netcorepal/cap4j/ddd/domain/event/EventRecord.java index cf0d156..37442d3 100644 --- a/ddd-core/src/main/java/org/netcorepal/cap4j/ddd/domain/event/EventRecord.java +++ b/ddd-core/src/main/java/org/netcorepal/cap4j/ddd/domain/event/EventRecord.java @@ -20,6 +20,12 @@ public interface EventRecord { */ void init(Object payload, String svcName, LocalDateTime now, Duration expireAfter, int retryTimes); + /** + * 获取事件ID + * @return + */ + String getId(); + /** * 获取事件主题 * @return diff --git a/ddd-core/src/main/java/org/netcorepal/cap4j/ddd/share/Constants.java b/ddd-core/src/main/java/org/netcorepal/cap4j/ddd/share/Constants.java index 4c76272..e18e4ce 100644 --- a/ddd-core/src/main/java/org/netcorepal/cap4j/ddd/share/Constants.java +++ b/ddd-core/src/main/java/org/netcorepal/cap4j/ddd/share/Constants.java @@ -26,5 +26,6 @@ public class Constants { public static final String CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ARCHIVE_EXPIREDAYS = "${ddd.distributed.event.schedule.archive.expireDays:7}"; public static final String CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ARCHIVE_MAXLOCKSECONDS = "${ddd.distributed.event.schedule.archive.maxLockSeconds:172800}"; public static final String CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ARCHIVE_CRON = "${ddd.distributed.event.schedule.archive.cron:0 0 2 * * ?}"; + public static final String CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ADDPARTITION_ENABLE = "${ddd.domain.event.schedule.addpartition.enable:true}"; public static final String CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ADDPARTITION_CRON = "${ddd.domain.event.schedule.addpartition.cron:0 0 0 * * ?}"; } diff --git a/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/EventRecordImpl.java b/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/EventRecordImpl.java index 9509b40..29991bc 100644 --- a/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/EventRecordImpl.java +++ b/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/EventRecordImpl.java @@ -39,6 +39,11 @@ public void init(Object payload, String svcName, LocalDateTime now, Duration exp event.init(payload, svcName, now, expireAfter, retryTimes); } + @Override + public String getId(){ + return event.getEventUuid(); + } + @Override public String getEventTopic() { return event.getEventType(); diff --git a/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/JpaEventScheduleService.java b/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/JpaEventScheduleService.java index 0cc0ad0..5011199 100644 --- a/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/JpaEventScheduleService.java +++ b/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/JpaEventScheduleService.java @@ -27,8 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.netcorepal.cap4j.ddd.share.Constants.CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_THREADPOOLSIIZE; -import static org.netcorepal.cap4j.ddd.share.Constants.CONFIG_KEY_4_SVC_NAME; +import static org.netcorepal.cap4j.ddd.share.Constants.*; /** * 事件调度服务 @@ -62,6 +61,7 @@ private String getSvcName() { @PostConstruct public void init() { executor = new ThreadPoolExecutor(threadPoolsize, threadPoolsize, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + addPartition(); } @Value(KEY_COMPENSATION_LOCKER) @@ -209,7 +209,13 @@ public void migrate(List events, List archivedEvents) { eventRepository.deleteInBatch(events); } + @Value(CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ADDPARTITION_ENABLE) + private boolean enableAddPartition = true; + public void addPartition() { + if(!enableAddPartition){ + return; + } Date now = new Date(); addPartition("__event", DateUtils.addMonths(now, 1)); addPartition("__archived_event", DateUtils.addMonths(now, 1)); diff --git a/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/persistence/Event.java b/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/persistence/Event.java index 301f78d..a3c0227 100644 --- a/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/persistence/Event.java +++ b/ddd-domain-event-jpa/src/main/java/org/netcorepal/cap4j/ddd/domain/event/persistence/Event.java @@ -2,6 +2,7 @@ import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.parser.Feature; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; @@ -78,7 +79,7 @@ public Object getPayload() { } catch (ClassNotFoundException e) { log.error("事件类型解析错误", e); } - this.payload = JSON.parseObject(data, dataClass); + this.payload = JSON.parseObject(data, dataClass, Feature.SupportNonPublicField); } return this.payload; } diff --git a/ddd-domain-event-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqDomainEventPublisher.java b/ddd-domain-event-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqDomainEventPublisher.java index 39cc5fd..04c1e39 100644 --- a/ddd-domain-event-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqDomainEventPublisher.java +++ b/ddd-domain-event-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqDomainEventPublisher.java @@ -9,11 +9,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.env.Environment; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import java.time.Duration; import java.time.LocalDateTime; +import java.util.UUID; import static org.netcorepal.cap4j.ddd.share.Constants.CONFIG_KEY_4_SVC_NAME; @@ -28,6 +31,7 @@ public class RocketMqDomainEventPublisher implements DomainEventPublisher { private final RocketMqDomainEventSubscriberManager rocketMqDomainEventSubscriberManager; private final RocketMQTemplate rocketMQTemplate; private final EventRecordRepository eventRecordRepository; + @Value(CONFIG_KEY_4_SVC_NAME) private String svcName; private Duration defaultExpireAfter = Duration.ofDays(1); @@ -36,6 +40,9 @@ public class RocketMqDomainEventPublisher implements DomainEventPublisher { @Autowired Environment environment; + @Autowired(required = false) + private DomainEventMessageInterceptor domainEventMessageInterceptor; + /** * 如下配置需配置好,保障RocketMqTemplate被初始化 * ## rocketmq @@ -59,6 +66,7 @@ public RocketMqDomainEventPublisher( /** * 发布事件 + * * @param eventPayload */ @Transactional(propagation = Propagation.REQUIRES_NEW) @@ -77,7 +85,13 @@ public void publish(Object eventPayload) { destination = environment.resolvePlaceholders(destination); if (destination != null && !destination.isEmpty()) { // MQ消息 - rocketMQTemplate.asyncSend(destination, event.getPayload(), new DomainEventSendCallback(event, eventRecordRepository)); + Message message = null; + message = new GenericMessage(event.getPayload(), new DomainEventMessageInterceptor.ModifiableMessageHeaders(null, UUID.fromString(event.getId()), null)); + + if (domainEventMessageInterceptor != null) { + message = domainEventMessageInterceptor.beforePublish(message); + } + rocketMQTemplate.asyncSend(destination, message, new DomainEventSendCallback(event, eventRecordRepository)); } else { // 进程内消息 rocketMqDomainEventSubscriberManager.trigger(event.getPayload()); @@ -88,6 +102,7 @@ public void publish(Object eventPayload) { log.error(String.format("集成事件发布失败: %s", event.toString()), ex); } } + @Slf4j public static class DomainEventSendCallback implements SendCallback { private EventRecord event; diff --git a/ddd-domain-event-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqDomainEventSubscriberAdapter.java b/ddd-domain-event-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqDomainEventSubscriberAdapter.java index f520ae1..23a638b 100644 --- a/ddd-domain-event-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqDomainEventSubscriberAdapter.java +++ b/ddd-domain-event-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqDomainEventSubscriberAdapter.java @@ -1,6 +1,7 @@ package org.netcorepal.cap4j.ddd.domain.event; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.parser.Feature; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -16,12 +17,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.env.Environment; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; import javax.annotation.PostConstruct; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Set; +import java.util.*; import static org.netcorepal.cap4j.ddd.share.Constants.CONFIG_KEY_4_DOMAIN_EVENT_SUB_PACKAGE; import static org.netcorepal.cap4j.ddd.share.Constants.CONFIG_KEY_4_SVC_NAME; @@ -36,6 +36,7 @@ @RequiredArgsConstructor public class RocketMqDomainEventSubscriberAdapter { private static final String CONFIG_KEY_4_ROCKETMQ_NAMESVC = "${rocketmq.name-server:}"; + private static final String CONFIG_KEY_4_ROCKETMQ_MSGCHARTSET = "${rocketmq.msg-charset:UTF-8}"; private final RocketMqDomainEventSubscriberManager rocketMqDomainEventSubscriberManager; List mqPushConsumers = new ArrayList<>(); @@ -43,6 +44,8 @@ public class RocketMqDomainEventSubscriberAdapter { String applicationName = null; @Value(CONFIG_KEY_4_ROCKETMQ_NAMESVC) String defaultNameSrv = null; + @Value(CONFIG_KEY_4_ROCKETMQ_MSGCHARTSET) + String msgCharset = null; @Value(CONFIG_KEY_4_DOMAIN_EVENT_SUB_PACKAGE) String scanPath = null; @Autowired @@ -51,6 +54,9 @@ public class RocketMqDomainEventSubscriberAdapter { @Autowired(required = false) MQConsumerConfigure mqConsumerConfigure; + @Autowired(required = false) + private DomainEventMessageInterceptor domainEventMessageInterceptor; + @PostConstruct public void init() { Set> classes = ScanUtils.scanClass(scanPath, true); @@ -117,8 +123,15 @@ public DefaultMQPushConsumer createDefaultConsumer(Class domainEventClass) { try { for (MessageExt msg : msgs) { - String strMsg = new String(msg.getBody(), "UTF-8"); - Object event = JSON.parseObject(strMsg, domainEventClass); + String strMsg = new String(msg.getBody(), msgCharset); + Object event = JSON.parseObject(strMsg, domainEventClass, Feature.SupportNonPublicField); + if (domainEventMessageInterceptor != null) { + Map headers = new HashMap<>(); + msg.getProperties().forEach((k,v) -> headers.put(k, v)); + Message message = new GenericMessage(event, new DomainEventMessageInterceptor.ModifiableMessageHeaders(headers)); + message = domainEventMessageInterceptor.beforeSubscribe(message); + event = message.getPayload(); + } rocketMqDomainEventSubscriberManager.trigger(event); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; diff --git a/starter/ddd-starter-jpa-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqEventAutoConfiguration.java b/starter/ddd-starter-jpa-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqEventAutoConfiguration.java index b5e1c2c..cea6b02 100644 --- a/starter/ddd-starter-jpa-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqEventAutoConfiguration.java +++ b/starter/ddd-starter-jpa-rocketmq/src/main/java/org/netcorepal/cap4j/ddd/domain/event/RocketMqEventAutoConfiguration.java @@ -76,7 +76,6 @@ public RocketMqDomainEventSubscriberAdapter rocketMqDomainEventSubscriberAdapter @Bean public JpaEventScheduleService eventScheduleService(DomainEventPublisher domainEventPublisher) { scheduleService = new JpaEventScheduleService(locker, domainEventPublisher, eventRepository, archivedEventJpaRepository, jdbcTemplate); - scheduleService.addPartition(); return scheduleService; }