Skip to content

Commit

Permalink
feature:DomainEventSupervisor支持基于实体附加事件。只有实体进入持久化上下文(unitOfWork.persi…
Browse files Browse the repository at this point in the history
…st 或 unitOfWork.remove),附加在实体上的领域事件才会触发。
  • Loading branch information
binking338 committed Aug 21, 2024
1 parent e526a25 commit deedeb0
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 56 deletions.
40 changes: 26 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
[![Maven Central Version](https://img.shields.io/maven-central/v/io.github.netcorepal/cap4j)](https://central.sonatype.com/artifact/io.github.netcorepal/cap4j)
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/netcorepal/cap4j/blob/main/LICENSE)

本项目是 [CAP](https://github.com/dotnetcore/CAP) 项目的 Java 实现,其基于 Outbox 模式的事件总线可用于解决微服务架构中的分布式事务问题
本项目是 [CAP](https://github.com/dotnetcore/CAP) 项目的 Java 实现,基于[整洁架构](https://blog.cleancoder.com/uncle-bob/2012/08/13/the-clean-architecture.html)、领域模型、Outbox模式、CQS模式以及UoW等理念,cap4j期望解决如何实现领域驱动设计的问题

同时,基于[整洁架构](https://blog.cleancoder.com/uncle-bob/2012/08/13/the-clean-architecture.html)、领域模型以及CQS模式,cap4j期望解决如何实现领域驱动设计的问题。

如果对整洁架构、领域模型和CQS模式有充分了解,那么cap4j的使用将会非常顺手。另一方面,通过cap4j来构建你的服务,将会对你展示一种实现领域驱动设计的完整落地方法。
如果对以上架构理念有充分了解,那么cap4j的使用将会非常顺手。另一方面,通过cap4j来构建你的服务,你将学会一种实现领域驱动设计的完整落地方法。

## 快速开始

Expand Down Expand Up @@ -880,17 +878,31 @@ public class OrderPlacedDomainEvent {

```java
import org.netcorepal.cap4j.ddd.domain.event.impl.DefaultDomainEventSupervisor;
import java.time.LocalDateTime;

// 省略

// 代码省略...
public class Order {
// 省略
DefaultDomainEventSupervisor.instance.attach(OrderPlacedDomainEvent.builder()
.orderNo(this.orderNo)
.amount(this.amount)
.orderTime(LocalDateTime.now())
.build());
// 省略
// 代码省略...
public class Order {

// 【行为方法开始】

/**
* 下单初始化
* @param items
*/
public void init(List<OrderItem> items){
// 代码省略...
DefaultDomainEventSupervisor.instance.attach(OrderPlacedDomainEvent.builder()
.orderNo(this.orderNo)
.amount(this.amount)
.orderTime(LocalDateTime.now())
.build(), this);
}

// 【行为方法结束】
// 代码省略...
}
}
```

Expand All @@ -909,7 +921,7 @@ import org.springframework.stereotype.Service;
public class OrderPlacedDomainEventSubscriber{
@EventListener(DeliveryReceivedDomainEvent.class)
public void onEvent(DeliveryReceivedDomainEvent event){
// 处理事件
// 事件处理逻辑
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;

/**
* 领域事件管理器
Expand Down Expand Up @@ -31,11 +32,40 @@ public interface DomainEventSupervisor {
*/
void attach(Object eventPayload, LocalDateTime schedule);

/**
* 附加事件
* @param eventPayload
* @param entity 绑定实体,该实体对象进入持久化上下文才会触发事件分发
*/
void attach(Object eventPayload, Object entity);

/**
* 附加事件
* @param eventPayload
* @param entity 绑定实体,该实体对象进入持久化上下文才会触发事件分发
* @param delay 延迟发送
*/
void attach(Object eventPayload, Object entity, Duration delay);

/**
* 附加事件
* @param eventPayload
* @param entity 绑定实体,该实体对象进入持久化上下文才会触发事件分发
* @param schedule 指定时间发送
*/
void attach(Object eventPayload, Object entity, LocalDateTime schedule);

/**
* 剥离事件
* @param eventPayload
*/
void detach(Object eventPayload);
/**
* 剥离事件
* @param eventPayload
* @param entity
*/
void detach(Object eventPayload, Object entity);
/**
* 重置事件
*/
Expand All @@ -45,12 +75,19 @@ public interface DomainEventSupervisor {
* 获取事件列表
* @return
*/
List<Object> getEvents();
Set<Object> getEvents();

/**
* 获取实体绑定的事件列表
* @param entity
* @return
*/
public Set<Object> getEvents(Object entity);

/**
* 获取发送事件
* @param eventPlayload
* @param eventPayload
* @return
*/
LocalDateTime getDeliverTime(Object eventPlayload);
LocalDateTime getDeliverTime(Object eventPayload);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

/**
* 默认领域事件管理器
Expand All @@ -17,62 +14,114 @@
*/
public class DefaultDomainEventSupervisor implements DomainEventSupervisor {
public static DomainEventSupervisor instance = new DefaultDomainEventSupervisor();
private static final ThreadLocal<List<Object>> TL_EVENT_PAYLOADS = new ThreadLocal<List<Object>>();
private static final ThreadLocal<Set<Object>> TL_EVENT_PAYLOADS = new ThreadLocal<Set<Object>>();
private static final ThreadLocal<Map<Object, Set<Object>>> TL_ENTITY_EVENT_PAYLOADS = new ThreadLocal<Map<Object, Set<Object>>>();
private static final ThreadLocal<Map<Object, LocalDateTime>> TL_EVENT_SCHEDULE_MAP = new ThreadLocal<Map<Object, LocalDateTime>>();
private static final List<Object> EMPTY_EVENT_PAYLOADS = Collections.emptyList();
private static final Set<Object> EMPTY_EVENT_PAYLOADS = Collections.emptySet();

@Override
public void attach(Object eventPayload) {
attach(eventPayload, LocalDateTime.now());
}

@Override
public void attach(Object eventPayload, Duration delay){
public void attach(Object eventPayload, Duration delay) {
attach(eventPayload, LocalDateTime.now().plus(delay));
}

@Override
public void attach(Object eventPayload, LocalDateTime schedule){
List<Object> eventPayloads = TL_EVENT_PAYLOADS.get();
if(eventPayloads == null){
eventPayloads = new java.util.ArrayList<Object>();
public void attach(Object eventPayload, LocalDateTime schedule) {
Set<Object> eventPayloads = TL_EVENT_PAYLOADS.get();
if (eventPayloads == null) {
eventPayloads = new HashSet<>();
TL_EVENT_PAYLOADS.set(eventPayloads);
}
eventPayloads.add(eventPayload);

Map<Object, LocalDateTime> eventScheduleMap = TL_EVENT_SCHEDULE_MAP.get();
if(eventScheduleMap == null){
eventScheduleMap = new HashMap<>();
TL_EVENT_SCHEDULE_MAP.set(eventScheduleMap);
putDeliverTime(eventPayload, schedule);
}

@Override
public void attach(Object eventPayload, Object entity) {
attach(eventPayload, entity, LocalDateTime.now());
}

@Override
public void attach(Object eventPayload, Object entity, Duration delay) {
attach(eventPayload, entity, LocalDateTime.now().plus(delay));
}

@Override
public void attach(Object eventPayload, Object entity, LocalDateTime schedule) {
Map<Object, Set<Object>> entityEventPayloads = TL_ENTITY_EVENT_PAYLOADS.get();
if (entityEventPayloads == null) {
entityEventPayloads = new HashMap<>();
TL_ENTITY_EVENT_PAYLOADS.set(entityEventPayloads);
}
eventScheduleMap.put(eventPayload, schedule);
if (!entityEventPayloads.containsKey(entity)) {
entityEventPayloads.put(entity, new HashSet<>());
}
entityEventPayloads.get(entity).add(eventPayload);

putDeliverTime(eventPayload, schedule);
}

@Override
public void detach(Object eventPayload) {
List<Object> eventPayloads = TL_EVENT_PAYLOADS.get();
if(eventPayloads != null){
eventPayloads.remove(eventPayload);
Set<Object> eventPayloads = TL_EVENT_PAYLOADS.get();
if (eventPayloads == null) {
return;
}
eventPayloads.remove(eventPayload);
}

@Override
public void detach(Object eventPayload, Object entity) {
Map<Object, Set<Object>> entityEventPayloads = TL_ENTITY_EVENT_PAYLOADS.get();
if (entityEventPayloads == null) {
return;
}
Set<Object> eventPayloads = entityEventPayloads.containsKey(entity) ? entityEventPayloads.get(entity) : null;
if (eventPayloads == null) {
return;
}

eventPayloads.remove(eventPayload);
}

@Override
public void reset() {
TL_EVENT_PAYLOADS.remove();
TL_EVENT_SCHEDULE_MAP.remove();;
TL_ENTITY_EVENT_PAYLOADS.remove();
TL_EVENT_SCHEDULE_MAP.remove();
}

@Override
public List<Object> getEvents() {
List<Object> eventPayloads = TL_EVENT_PAYLOADS.get();
public Set<Object> getEvents() {
Set<Object> eventPayloads = TL_EVENT_PAYLOADS.get();
return eventPayloads != null ? eventPayloads : EMPTY_EVENT_PAYLOADS;
}

@Override
public LocalDateTime getDeliverTime(Object eventPlayload) {
public Set<Object> getEvents(Object entity) {
Map<Object, Set<Object>> entityEventPayloads = TL_ENTITY_EVENT_PAYLOADS.get();
return entityEventPayloads != null && entityEventPayloads.containsKey(entity) ? entityEventPayloads.get(entity) : EMPTY_EVENT_PAYLOADS;
}

protected void putDeliverTime(Object eventPayload, LocalDateTime schedule) {
Map<Object, LocalDateTime> eventScheduleMap = TL_EVENT_SCHEDULE_MAP.get();
if (eventScheduleMap == null) {
eventScheduleMap = new HashMap<>();
TL_EVENT_SCHEDULE_MAP.set(eventScheduleMap);
}
eventScheduleMap.put(eventPayload, schedule);
}

@Override
public LocalDateTime getDeliverTime(Object eventPayload) {
Map<Object, LocalDateTime> eventScheduleMap = TL_EVENT_SCHEDULE_MAP.get();
if(eventScheduleMap != null && eventScheduleMap.containsKey(eventPlayload)){
return eventScheduleMap.get(eventPlayload);
if (eventScheduleMap != null && eventScheduleMap.containsKey(eventPayload)) {
return eventScheduleMap.get(eventPayload);
} else {
return LocalDateTime.now();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void publish(Message message, EventRecord event) {
Duration delay = Duration.ZERO;
if (message.getHeaders().containsKey(HEADER_KEY_CAP4J_SCHEDULE)) {
LocalDateTime scheduleAt = (LocalDateTime) message.getHeaders().get(HEADER_KEY_CAP4J_SCHEDULE);
if(scheduleAt!=null) {
if (scheduleAt != null) {
delay = Duration.between(LocalDateTime.now(), scheduleAt);
}
}
Expand All @@ -85,11 +85,12 @@ public void publish(Message message, EventRecord event) {
}

private void internalPublish(Message message, EventRecord event) {
String destination = event.getEventTopic();
destination = TextUtils.resolvePlaceholderWithCache(destination, environment);
boolean isIntergrationEvent = destination != null && !destination.isEmpty();
try {
String destination = event.getEventTopic();
destination = TextUtils.resolvePlaceholderWithCache(destination, environment);
// MQ消息
if (destination != null && !destination.isEmpty()) {
if (isIntergrationEvent) {
rocketMQTemplate.asyncSend(destination, message, new DomainEventSendCallback(event, eventRecordRepository, environment));
} else {
// 进程内消息
Expand All @@ -98,7 +99,7 @@ private void internalPublish(Message message, EventRecord event) {
eventRecordRepository.save(event);
}
} catch (Exception ex) {
log.error(String.format("集成事件发布失败: %s", event.toString()), ex);
log.error(String.format("%s发布失败: %s", isIntergrationEvent ? "集成事件" : "领域事件", event.toString()), ex);
}
}

Expand Down
Loading

0 comments on commit deedeb0

Please sign in to comment.