Skip to content

Commit

Permalink
Merge pull request #2700 from weihubeats/AbstractEventProcessor
Browse files Browse the repository at this point in the history
[ISSUE #2699] AbstractEventProcessor Enhancement
  • Loading branch information
xwm1992 authored Dec 29, 2022
2 parents 0b4596b + 2d54c87 commit 2b5c7f2
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,14 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest
// build sys header
requestWrapper.buildSysHeaderForClient();

Map<String, Object> responseHeaderMap = new HashMap<>();
responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP,
IPUtils.getLocalAddress());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);

Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();

Map<String, Object> responseBodyMap = new HashMap<>();

//validate header
if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString())
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
|| !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) {

if (validateSysHeader(sysHeaderMap)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
responseBodyMap, null);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,14 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest
// build sys header
requestWrapper.buildSysHeaderForClient();

Map<String, Object> responseHeaderMap = new HashMap<>();
responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP,
IPUtils.getLocalAddress());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);

Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();

Map<String, Object> responseBodyMap = new HashMap<>();

//validate header
if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString())
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
|| !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) {

if (validateSysHeader(sysHeaderMap)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
responseBodyMap, null);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,27 +95,15 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest
// build sys header
requestWrapper.buildSysHeaderForClient();

Map<String, Object> responseHeaderMap = new HashMap<>();
responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
responseHeaderMap
.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());

Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);

Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();

Map<String, Object> responseBodyMap = new HashMap<>();

//validate header
if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString())
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
|| !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) {

if (validateSysHeader(sysHeaderMap)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
responseBodyMap, null);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,15 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest
// build sys header
requestWrapper.buildSysHeaderForClient();

Map<String, Object> responseHeaderMap = new HashMap<>();
responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
responseHeaderMap
.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());

Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);

Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();

Map<String, Object> responseBodyMap = new HashMap<>();

//validate header
if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString())
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
|| !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString())
|| StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) {
if (validateSysHeader(sysHeaderMap)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
responseBodyMap, null);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
import org.apache.eventmesh.api.registry.dto.EventMeshDataInfo;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.registry.nacos.constant.NacosConstant;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupMetadata;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
Expand All @@ -38,18 +42,16 @@
import java.util.Map;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;

import lombok.extern.slf4j.Slf4j;

/**
* EventProcessor
*/
@Slf4j
public abstract class AbstractEventProcessor implements AsyncHttpProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger("AbstractEventProcessor");

protected transient EventMeshHTTPServer eventMeshHTTPServer;

public AbstractEventProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
Expand Down Expand Up @@ -97,7 +99,7 @@ protected void updateMetadata() {
eventMeshHTTPServer.getRegistry().registerMetadata(metadata);

} catch (Exception e) {
LOGGER.error("[LocalSubscribeEventProcessor] update eventmesh metadata error", e);
log.error("[LocalSubscribeEventProcessor] update eventmesh metadata error", e);
}
}

Expand Down Expand Up @@ -148,4 +150,36 @@ protected String getTargetMesh(String consumerGroup, List<SubscriptionItem> subs
}
return targetMesh;
}

/**
* builder response header map
* @param requestWrapper requestWrapper
* @return Map
*/
protected Map<String, Object> builderResponseHeaderMap(HttpEventWrapper requestWrapper) {
Map<String, Object> responseHeaderMap = new HashMap<>();
EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
eventMeshHttpConfiguration.getEventMeshCluster());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP,
IPUtils.getLocalAddress());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
eventMeshHttpConfiguration.getEventMeshEnv());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
eventMeshHttpConfiguration.getEventMeshIDC());
return responseHeaderMap;
}

/**
* validation sysHeaderMap is null
* @param sysHeaderMap sysHeaderMap
* @return Returns true if any is empty
*/
protected boolean validateSysHeader(Map<String, Object> sysHeaderMap) {
return StringUtils.isAnyBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString(),
sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString(),
sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())
|| !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString());
}
}

0 comments on commit 2b5c7f2

Please sign in to comment.