Skip to content

Commit

Permalink
修复非公开字段json反序列化问题
Browse files Browse the repository at this point in the history
增加领域事件消息拦截处理机制
  • Loading branch information
binking338 committed Aug 8, 2024
1 parent 6106f59 commit bf4be03
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 11 deletions.
5 changes: 5 additions & 0 deletions ddd-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
<artifactId>spring-tx</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.netcorepal.cap4j.ddd.domain.event;

import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

import java.util.Map;
import java.util.UUID;

/**
* 领域事件消息拦截器
*
* @author binking338
* @date 2024/8/8
*/
public interface DomainEventMessageInterceptor {

/**
* 发送前
*
* @param message
* @return
*/
Message beforePublish(Message message);

/**
* 消费前
*
* @param message
* @return
*/
Message beforeSubscribe(Message message);

/**
* 可变更消息头
*/
public static class ModifiableMessageHeaders extends MessageHeaders {

public ModifiableMessageHeaders(@Nullable Map<String, Object> headers) {
this(
headers,
headers.containsKey(ID) ? UUID.fromString(headers.get(ID).toString()) : null,
headers.containsKey(TIMESTAMP) ? Long.parseLong(headers.get(TIMESTAMP).toString()) : null
);
}

public ModifiableMessageHeaders(@Nullable Map<String, Object> headers, @Nullable UUID id, @Nullable Long timestamp) {
super(headers, id, timestamp);
}

@Override
public void putAll(Map<? extends String, ?> map) {
this.getRawHeaders().putAll(map);
}

@Override
public Object put(String key, Object value) {
return this.getRawHeaders().put(key, value);
}

@Override
public Object remove(Object key) {
return this.getRawHeaders().remove(key);
}

@Override
public void clear() {
this.getRawHeaders().clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ public interface EventRecord {
*/
void init(Object payload, String svcName, LocalDateTime now, Duration expireAfter, int retryTimes);

/**
* 获取事件ID
* @return
*/
String getId();

/**
* 获取事件主题
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ public class Constants {
public static final String CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ARCHIVE_EXPIREDAYS = "${ddd.distributed.event.schedule.archive.expireDays:7}";
public static final String CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ARCHIVE_MAXLOCKSECONDS = "${ddd.distributed.event.schedule.archive.maxLockSeconds:172800}";
public static final String CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ARCHIVE_CRON = "${ddd.distributed.event.schedule.archive.cron:0 0 2 * * ?}";
public static final String CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ADDPARTITION_ENABLE = "${ddd.domain.event.schedule.addpartition.enable:true}";
public static final String CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ADDPARTITION_CRON = "${ddd.domain.event.schedule.addpartition.cron:0 0 0 * * ?}";
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public void init(Object payload, String svcName, LocalDateTime now, Duration exp
event.init(payload, svcName, now, expireAfter, retryTimes);
}

@Override
public String getId(){
return event.getEventUuid();
}

@Override
public String getEventTopic() {
return event.getEventType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.netcorepal.cap4j.ddd.share.Constants.CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_THREADPOOLSIIZE;
import static org.netcorepal.cap4j.ddd.share.Constants.CONFIG_KEY_4_SVC_NAME;
import static org.netcorepal.cap4j.ddd.share.Constants.*;

/**
* 事件调度服务
Expand Down Expand Up @@ -62,6 +61,7 @@ private String getSvcName() {
@PostConstruct
public void init() {
executor = new ThreadPoolExecutor(threadPoolsize, threadPoolsize, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
addPartition();
}

@Value(KEY_COMPENSATION_LOCKER)
Expand Down Expand Up @@ -209,7 +209,13 @@ public void migrate(List<Event> events, List<ArchivedEvent> archivedEvents) {
eventRepository.deleteInBatch(events);
}

@Value(CONFIG_KEY_4_DISTRIBUTED_EVENT_SCHEDULE_ADDPARTITION_ENABLE)
private boolean enableAddPartition = true;

public void addPartition() {
if(!enableAddPartition){
return;
}
Date now = new Date();
addPartition("__event", DateUtils.addMonths(now, 1));
addPartition("__archived_event", DateUtils.addMonths(now, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
Expand Down Expand Up @@ -78,7 +79,7 @@ public Object getPayload() {
} catch (ClassNotFoundException e) {
log.error("事件类型解析错误", e);
}
this.payload = JSON.parseObject(data, dataClass);
this.payload = JSON.parseObject(data, dataClass, Feature.SupportNonPublicField);
}
return this.payload;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.UUID;

import static org.netcorepal.cap4j.ddd.share.Constants.CONFIG_KEY_4_SVC_NAME;

Expand All @@ -28,6 +31,7 @@ public class RocketMqDomainEventPublisher implements DomainEventPublisher {
private final RocketMqDomainEventSubscriberManager rocketMqDomainEventSubscriberManager;
private final RocketMQTemplate rocketMQTemplate;
private final EventRecordRepository eventRecordRepository;

@Value(CONFIG_KEY_4_SVC_NAME)
private String svcName;
private Duration defaultExpireAfter = Duration.ofDays(1);
Expand All @@ -36,6 +40,9 @@ public class RocketMqDomainEventPublisher implements DomainEventPublisher {
@Autowired
Environment environment;

@Autowired(required = false)
private DomainEventMessageInterceptor domainEventMessageInterceptor;

/**
* 如下配置需配置好,保障RocketMqTemplate被初始化
* ## rocketmq
Expand All @@ -59,6 +66,7 @@ public RocketMqDomainEventPublisher(

/**
* 发布事件
*
* @param eventPayload
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
Expand All @@ -77,7 +85,13 @@ public void publish(Object eventPayload) {
destination = environment.resolvePlaceholders(destination);
if (destination != null && !destination.isEmpty()) {
// MQ消息
rocketMQTemplate.asyncSend(destination, event.getPayload(), new DomainEventSendCallback(event, eventRecordRepository));
Message message = null;
message = new GenericMessage(event.getPayload(), new DomainEventMessageInterceptor.ModifiableMessageHeaders(null, UUID.fromString(event.getId()), null));

if (domainEventMessageInterceptor != null) {
message = domainEventMessageInterceptor.beforePublish(message);
}
rocketMQTemplate.asyncSend(destination, message, new DomainEventSendCallback(event, eventRecordRepository));
} else {
// 进程内消息
rocketMqDomainEventSubscriberManager.trigger(event.getPayload());
Expand All @@ -88,6 +102,7 @@ public void publish(Object eventPayload) {
log.error(String.format("集成事件发布失败: %s", event.toString()), ex);
}
}

@Slf4j
public static class DomainEventSendCallback implements SendCallback {
private EventRecord event;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.netcorepal.cap4j.ddd.domain.event;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -16,12 +17,11 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.*;

import static org.netcorepal.cap4j.ddd.share.Constants.CONFIG_KEY_4_DOMAIN_EVENT_SUB_PACKAGE;
import static org.netcorepal.cap4j.ddd.share.Constants.CONFIG_KEY_4_SVC_NAME;
Expand All @@ -36,13 +36,16 @@
@RequiredArgsConstructor
public class RocketMqDomainEventSubscriberAdapter {
private static final String CONFIG_KEY_4_ROCKETMQ_NAMESVC = "${rocketmq.name-server:}";
private static final String CONFIG_KEY_4_ROCKETMQ_MSGCHARTSET = "${rocketmq.msg-charset:UTF-8}";
private final RocketMqDomainEventSubscriberManager rocketMqDomainEventSubscriberManager;

List<MQPushConsumer> mqPushConsumers = new ArrayList<>();
@Value(CONFIG_KEY_4_SVC_NAME)
String applicationName = null;
@Value(CONFIG_KEY_4_ROCKETMQ_NAMESVC)
String defaultNameSrv = null;
@Value(CONFIG_KEY_4_ROCKETMQ_MSGCHARTSET)
String msgCharset = null;
@Value(CONFIG_KEY_4_DOMAIN_EVENT_SUB_PACKAGE)
String scanPath = null;
@Autowired
Expand All @@ -51,6 +54,9 @@ public class RocketMqDomainEventSubscriberAdapter {
@Autowired(required = false)
MQConsumerConfigure mqConsumerConfigure;

@Autowired(required = false)
private DomainEventMessageInterceptor domainEventMessageInterceptor;

@PostConstruct
public void init() {
Set<Class<?>> classes = ScanUtils.scanClass(scanPath, true);
Expand Down Expand Up @@ -117,8 +123,15 @@ public DefaultMQPushConsumer createDefaultConsumer(Class domainEventClass) {
try {
for (MessageExt msg :
msgs) {
String strMsg = new String(msg.getBody(), "UTF-8");
Object event = JSON.parseObject(strMsg, domainEventClass);
String strMsg = new String(msg.getBody(), msgCharset);
Object event = JSON.parseObject(strMsg, domainEventClass, Feature.SupportNonPublicField);
if (domainEventMessageInterceptor != null) {
Map<String, Object> headers = new HashMap<>();
msg.getProperties().forEach((k,v) -> headers.put(k, v));
Message message = new GenericMessage(event, new DomainEventMessageInterceptor.ModifiableMessageHeaders(headers));
message = domainEventMessageInterceptor.beforeSubscribe(message);
event = message.getPayload();
}
rocketMqDomainEventSubscriberManager.trigger(event);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public RocketMqDomainEventSubscriberAdapter rocketMqDomainEventSubscriberAdapter
@Bean
public JpaEventScheduleService eventScheduleService(DomainEventPublisher domainEventPublisher) {
scheduleService = new JpaEventScheduleService(locker, domainEventPublisher, eventRepository, archivedEventJpaRepository, jdbcTemplate);
scheduleService.addPartition();
return scheduleService;
}

Expand Down

0 comments on commit bf4be03

Please sign in to comment.