Skip to content

Commit 7390899

Browse files
authored
Merge pull request #2700 from weihubeats/AbstractEventProcessor
[ISSUE #2699] AbstractEventProcessor Enhancement
2 parents efef5a3 + 8d8450c commit 7390899

File tree

5 files changed

+50
-65
lines changed

5 files changed

+50
-65
lines changed

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java

+2-15
Original file line numberDiff line numberDiff line change
@@ -91,27 +91,14 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest
9191
// build sys header
9292
requestWrapper.buildSysHeaderForClient();
9393

94-
Map<String, Object> responseHeaderMap = new HashMap<>();
95-
responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
96-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
97-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
98-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP,
99-
IPUtils.getLocalAddress());
100-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
101-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
102-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
103-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
94+
Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);
10495

10596
Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
10697

10798
Map<String, Object> responseBodyMap = new HashMap<>();
10899

109100
//validate header
110-
if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString())
111-
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
112-
|| !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
113-
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) {
114-
101+
if (validateSysHeader(sysHeaderMap)) {
115102
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
116103
responseBodyMap, null);
117104
return;

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java

+2-15
Original file line numberDiff line numberDiff line change
@@ -86,27 +86,14 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest
8686
// build sys header
8787
requestWrapper.buildSysHeaderForClient();
8888

89-
Map<String, Object> responseHeaderMap = new HashMap<>();
90-
responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
91-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
92-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
93-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP,
94-
IPUtils.getLocalAddress());
95-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
96-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
97-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
98-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
89+
Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);
9990

10091
Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
10192

10293
Map<String, Object> responseBodyMap = new HashMap<>();
10394

10495
//validate header
105-
if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString())
106-
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
107-
|| !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
108-
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) {
109-
96+
if (validateSysHeader(sysHeaderMap)) {
11097
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
11198
responseBodyMap, null);
11299
return;

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java

+3-15
Original file line numberDiff line numberDiff line change
@@ -95,27 +95,15 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest
9595
// build sys header
9696
requestWrapper.buildSysHeaderForClient();
9797

98-
Map<String, Object> responseHeaderMap = new HashMap<>();
99-
responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
100-
responseHeaderMap
101-
.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
102-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
103-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress());
104-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
105-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
106-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
107-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
98+
99+
Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);
108100

109101
Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
110102

111103
Map<String, Object> responseBodyMap = new HashMap<>();
112104

113105
//validate header
114-
if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString())
115-
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
116-
|| !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
117-
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) {
118-
106+
if (validateSysHeader(sysHeaderMap)) {
119107
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
120108
responseBodyMap, null);
121109
return;

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java

+3-14
Original file line numberDiff line numberDiff line change
@@ -94,26 +94,15 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest
9494
// build sys header
9595
requestWrapper.buildSysHeaderForClient();
9696

97-
Map<String, Object> responseHeaderMap = new HashMap<>();
98-
responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
99-
responseHeaderMap
100-
.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
101-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
102-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress());
103-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
104-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
105-
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
106-
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
97+
98+
Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);
10799

108100
Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
109101

110102
Map<String, Object> responseBodyMap = new HashMap<>();
111103

112104
//validate header
113-
if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString())
114-
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
115-
|| !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
116-
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) {
105+
if (validateSysHeader(sysHeaderMap)) {
117106
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
118107
responseBodyMap, null);
119108
return;

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java

+40-6
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,14 @@
2020
import org.apache.eventmesh.api.registry.dto.EventMeshDataInfo;
2121
import org.apache.eventmesh.common.config.CommonConfiguration;
2222
import org.apache.eventmesh.common.protocol.SubscriptionItem;
23+
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
24+
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
2325
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
26+
import org.apache.eventmesh.common.utils.IPUtils;
2427
import org.apache.eventmesh.common.utils.JsonUtils;
2528
import org.apache.eventmesh.registry.nacos.constant.NacosConstant;
2629
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
30+
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
2731
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
2832
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupMetadata;
2933
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
@@ -38,18 +42,16 @@
3842
import java.util.Map;
3943
import java.util.Optional;
4044

41-
import org.slf4j.Logger;
42-
import org.slf4j.LoggerFactory;
43-
4445
import com.google.common.collect.Maps;
4546

47+
import lombok.extern.slf4j.Slf4j;
48+
4649
/**
4750
* EventProcessor
4851
*/
52+
@Slf4j
4953
public abstract class AbstractEventProcessor implements AsyncHttpProcessor {
5054

51-
private static final Logger LOGGER = LoggerFactory.getLogger("AbstractEventProcessor");
52-
5355
protected transient EventMeshHTTPServer eventMeshHTTPServer;
5456

5557
public AbstractEventProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -97,7 +99,7 @@ protected void updateMetadata() {
9799
eventMeshHTTPServer.getRegistry().registerMetadata(metadata);
98100

99101
} catch (Exception e) {
100-
LOGGER.error("[LocalSubscribeEventProcessor] update eventmesh metadata error", e);
102+
log.error("[LocalSubscribeEventProcessor] update eventmesh metadata error", e);
101103
}
102104
}
103105

@@ -148,4 +150,36 @@ protected String getTargetMesh(String consumerGroup, List<SubscriptionItem> subs
148150
}
149151
return targetMesh;
150152
}
153+
154+
/**
155+
* builder response header map
156+
* @param requestWrapper requestWrapper
157+
* @return Map
158+
*/
159+
protected Map<String, Object> builderResponseHeaderMap(HttpEventWrapper requestWrapper) {
160+
Map<String, Object> responseHeaderMap = new HashMap<>();
161+
EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
162+
responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
163+
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
164+
eventMeshHttpConfiguration.getEventMeshCluster());
165+
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP,
166+
IPUtils.getLocalAddress());
167+
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
168+
eventMeshHttpConfiguration.getEventMeshEnv());
169+
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
170+
eventMeshHttpConfiguration.getEventMeshIDC());
171+
return responseHeaderMap;
172+
}
173+
174+
/**
175+
* validation sysHeaderMap is null
176+
* @param sysHeaderMap sysHeaderMap
177+
* @return Returns true if any is empty
178+
*/
179+
protected boolean validateSysHeader(Map<String, Object> sysHeaderMap) {
180+
return StringUtils.isAnyBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString(),
181+
sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString(),
182+
sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())
183+
|| !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
184+
}
151185
}

0 commit comments

Comments
 (0)