diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java index 7e5b234b43..2b2ad88d56 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.java @@ -91,27 +91,14 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest // build sys header requestWrapper.buildSysHeaderForClient(); - Map responseHeaderMap = new HashMap<>(); - responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, - IPUtils.getLocalAddress()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC()); + Map responseHeaderMap = builderResponseHeaderMap(requestWrapper); Map sysHeaderMap = requestWrapper.getSysHeaderMap(); Map responseBodyMap = new HashMap<>(); //validate header - if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString()) - || StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString()) - || !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString()) - || StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) { - + if (validateSysHeader(sysHeaderMap)) { handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap, responseBodyMap, null); return; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java index 454fea3aa9..4bdc7f70c5 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java @@ -86,27 +86,14 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest // build sys header requestWrapper.buildSysHeaderForClient(); - Map responseHeaderMap = new HashMap<>(); - responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, - IPUtils.getLocalAddress()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC()); + Map responseHeaderMap = builderResponseHeaderMap(requestWrapper); Map sysHeaderMap = requestWrapper.getSysHeaderMap(); Map responseBodyMap = new HashMap<>(); //validate header - if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString()) - || StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString()) - || !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString()) - || StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) { - + if (validateSysHeader(sysHeaderMap)) { handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap, responseBodyMap, null); return; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java index 952daef88c..1dc1ce0435 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteSubscribeEventProcessor.java @@ -95,27 +95,15 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest // build sys header requestWrapper.buildSysHeaderForClient(); - Map responseHeaderMap = new HashMap<>(); - responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI()); - responseHeaderMap - .put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC()); + + Map responseHeaderMap = builderResponseHeaderMap(requestWrapper); Map sysHeaderMap = requestWrapper.getSysHeaderMap(); Map responseBodyMap = new HashMap<>(); //validate header - if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString()) - || StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString()) - || !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString()) - || StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) { - + if (validateSysHeader(sysHeaderMap)) { handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap, responseBodyMap, null); return; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java index 3d484af7bc..992030d12a 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/RemoteUnSubscribeEventProcessor.java @@ -94,26 +94,15 @@ public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest // build sys header requestWrapper.buildSysHeaderForClient(); - Map responseHeaderMap = new HashMap<>(); - responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI()); - responseHeaderMap - .put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv()); - responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, - eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC()); + + Map responseHeaderMap = builderResponseHeaderMap(requestWrapper); Map sysHeaderMap = requestWrapper.getSysHeaderMap(); Map responseBodyMap = new HashMap<>(); //validate header - if (StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString()) - || StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString()) - || !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString()) - || StringUtils.isBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString())) { + if (validateSysHeader(sysHeaderMap)) { handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap, responseBodyMap, null); return; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java index 9a11dd0dd5..02e022214f 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/AbstractEventProcessor.java @@ -20,10 +20,14 @@ import org.apache.eventmesh.api.registry.dto.EventMeshDataInfo; import org.apache.eventmesh.common.config.CommonConfiguration; import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.http.HttpEventWrapper; +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; import org.apache.eventmesh.common.utils.ConfigurationContextUtil; +import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.registry.nacos.constant.NacosConstant; import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer; +import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupMetadata; import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf; @@ -38,18 +42,16 @@ import java.util.Map; import java.util.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; + /** * EventProcessor */ +@Slf4j public abstract class AbstractEventProcessor implements AsyncHttpProcessor { - private static final Logger LOGGER = LoggerFactory.getLogger("AbstractEventProcessor"); - protected transient EventMeshHTTPServer eventMeshHTTPServer; public AbstractEventProcessor(EventMeshHTTPServer eventMeshHTTPServer) { @@ -97,7 +99,7 @@ protected void updateMetadata() { eventMeshHTTPServer.getRegistry().registerMetadata(metadata); } catch (Exception e) { - LOGGER.error("[LocalSubscribeEventProcessor] update eventmesh metadata error", e); + log.error("[LocalSubscribeEventProcessor] update eventmesh metadata error", e); } } @@ -148,4 +150,36 @@ protected String getTargetMesh(String consumerGroup, List subs } return targetMesh; } + + /** + * builder response header map + * @param requestWrapper requestWrapper + * @return Map + */ + protected Map builderResponseHeaderMap(HttpEventWrapper requestWrapper) { + Map responseHeaderMap = new HashMap<>(); + EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration(); + responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI()); + responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, + eventMeshHttpConfiguration.getEventMeshCluster()); + responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, + IPUtils.getLocalAddress()); + responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, + eventMeshHttpConfiguration.getEventMeshEnv()); + responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, + eventMeshHttpConfiguration.getEventMeshIDC()); + return responseHeaderMap; + } + + /** + * validation sysHeaderMap is null + * @param sysHeaderMap sysHeaderMap + * @return Returns true if any is empty + */ + protected boolean validateSysHeader(Map sysHeaderMap) { + return StringUtils.isAnyBlank(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.IDC).toString(), + sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString(), + sysHeaderMap.get(ProtocolKey.ClientInstanceKey.SYS).toString()) + || !StringUtils.isNumeric(sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID).toString()); + } }