diff --git a/docker/centos7-jdk8/Dockerfile b/docker/centos7-jdk8/Dockerfile new file mode 100644 index 0000000000..546d3247fc --- /dev/null +++ b/docker/centos7-jdk8/Dockerfile @@ -0,0 +1,12 @@ +FROM docker.io/centos:7 + +MAINTAINER mikexue + +RUN yum update -y && yum install net-tools -y && yum install lrzsz -y && yum install vim -y +ADD jdk-8u281-linux-x64.tar.gz /usr/local/src/ +RUN ln -s /usr/local/src/jdk1.8.0_281/ /usr/local/jdk + +ENV JAVA_HOME /usr/local/jdk +ENV JRE_HOME $JAVA_HOME/jre +ENV CLASSPATH .:$JAVA_HOME/lib/:$JRE_HOME/lib/ +ENV PATH $PATH:$JAVA_HOME/bin diff --git a/docker/eventmesh-defibus/Dockerfile b/docker/eventmesh-defibus/Dockerfile new file mode 100644 index 0000000000..1077ff07e8 --- /dev/null +++ b/docker/eventmesh-defibus/Dockerfile @@ -0,0 +1,15 @@ +FROM centos7-jdk8:latest + +MAINTAINER mikexue mike_xwm@126.com + +WORKDIR /data +RUN mkdir /data/app +ADD eventmesh-runtime_1.2.0-SNAPSHOT.tar.gz /data/app/eventmesh +COPY eventmesh-connector-defibus-1.2.0-SNAPSHOT.jar /data/app/eventmesh/apps +WORKDIR /data/app/eventmesh/bin + +EXPOSE 10000 +EXPOSE 10105 + +ENV DOCKER true +CMD sh start.sh diff --git a/docker/eventmesh-rocketmq/Dockerfile b/docker/eventmesh-rocketmq/Dockerfile new file mode 100644 index 0000000000..f18309d88c --- /dev/null +++ b/docker/eventmesh-rocketmq/Dockerfile @@ -0,0 +1,16 @@ +FROM centos7-jdk8:latest + +MAINTAINER mikexue mike_xwm@126.com + +WORKDIR /data +RUN mkdir /data/app +ADD eventmesh-runtime_1.2.0-SNAPSHOT.tar.gz /data/app/eventmesh +COPY eventmesh-connector-rocketmq-1.2.0.jar /data/app/eventmesh/apps +WORKDIR /data/app/eventmesh/bin + +EXPOSE 10000 +EXPOSE 10105 + +ENV DOCKER true + +CMD sh start.sh diff --git a/eventmesh-runtime/bin/start.sh b/eventmesh-runtime/bin/start.sh index fd2e8ec884..9effccf2ba 100644 --- a/eventmesh-runtime/bin/start.sh +++ b/eventmesh-runtime/bin/start.sh @@ -155,6 +155,11 @@ echo "using jdk[$JAVA]" >> ${PROXY_LOG_HOME}/proxy.out PROXY_MAIN=com.webank.eventmesh.runtime.boot.ProxyStartup -$JAVA $JAVA_OPT -classpath ${PROXY_HOME}/conf:${PROXY_HOME}/apps/*:${PROXY_HOME}/lib/* $PROXY_MAIN >> ${PROXY_LOG_HOME}/proxy.out 2>&1 & +if [ $DOCKER ] +then + $JAVA $JAVA_OPT -classpath ${PROXY_HOME}/conf:${PROXY_HOME}/apps/*:${PROXY_HOME}/lib/* $PROXY_MAIN >> ${PROXY_LOG_HOME}/proxy.out +else + $JAVA $JAVA_OPT -classpath ${PROXY_HOME}/conf:${PROXY_HOME}/apps/*:${PROXY_HOME}/lib/* $PROXY_MAIN >> ${PROXY_LOG_HOME}/proxy.out 2>&1 & echo $!>pid.file +fi exit 0 diff --git a/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java index b408cf1de3..e6c8b6170e 100644 --- a/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java +++ b/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java @@ -145,7 +145,15 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext } synchronized (proxyHTTPServer.localClientInfoMapping){ for (Map.Entry> groupTopicClientMapping : tmp.entrySet()) { - proxyHTTPServer.localClientInfoMapping.put(groupTopicClientMapping.getKey(), groupTopicClientMapping.getValue()); + List localClientList = proxyHTTPServer.localClientInfoMapping.get(groupTopicClientMapping.getKey()); + if (CollectionUtils.isEmpty(localClientList)){ + proxyHTTPServer.localClientInfoMapping.put(groupTopicClientMapping.getKey(), groupTopicClientMapping.getValue()); + }else { + List tmpClientList = groupTopicClientMapping.getValue(); + supplyClientInfoList(tmpClientList, localClientList); + proxyHTTPServer.localClientInfoMapping.put(groupTopicClientMapping.getKey(), localClientList); + } + } } @@ -184,6 +192,22 @@ public void onResponse(HttpCommand httpCommand) { } + private void supplyClientInfoList(List tmpClientList, List localClientList) { + for (Client tmpClient : tmpClientList){ + boolean isContains = false; + for (Client localClient : localClientList){ + if (StringUtils.equals(localClient.url, tmpClient.url)){ + isContains = true; + localClient.lastUpTime = tmpClient.lastUpTime; + break; + } + } + if (!isContains){ + localClientList.add(tmpClient); + } + } + } + @Override public boolean rejectRequest() { return false; diff --git a/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java index 45331604b1..2e183dc51e 100644 --- a/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java +++ b/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java @@ -441,6 +441,7 @@ public synchronized void initClientGroupBroadcastConsumer() throws Exception { keyValue.put("isBroadcast", "true"); keyValue.put("consumerGroup", groupName); keyValue.put("proxyIDC", accessConfiguration.proxyIDC); + keyValue.put("instanceName", ProxyUtil.buildProxyTcpClientID(sysId, dcn, "SUB", accessConfiguration.proxyCluster)); broadCastMsgConsumer.init(keyValue); // broadCastMsgConsumer.registerMessageListener(new ProxyMessageListenerConcurrently() { // @Override diff --git a/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/ProxyTcpRetryer.java b/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/ProxyTcpRetryer.java index f2d2ac4218..69c904015f 100644 --- a/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/ProxyTcpRetryer.java +++ b/eventmesh-runtime/src/main/java/com/webank/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/ProxyTcpRetryer.java @@ -162,14 +162,14 @@ private void retryHandle(DownStreamMsgContext downStreamMsgContext){ private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext){ boolean flag =false; - long ttl = Long.valueOf(downStreamMsgContext.msgExt.sysHeaders().getString(ProxyConstants.PROPERTY_MESSAGE_TTL)); + long ttl = Long.parseLong(downStreamMsgContext.msgExt.userHeaders().getString(ProxyConstants.PROPERTY_MESSAGE_TTL)); //TODO 关注是否能取到 - long storeTimestamp = Long.valueOf(downStreamMsgContext.msgExt.sysHeaders().getString(DeFiBusConstant.STORE_TIME)); - String leaveTimeStr = downStreamMsgContext.msgExt.sysHeaders().getString(DeFiBusConstant.LEAVE_TIME); - long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.valueOf(leaveTimeStr) - storeTimestamp : 0; + long storeTimestamp = Long.parseLong(downStreamMsgContext.msgExt.userHeaders().getString(DeFiBusConstant.STORE_TIME)); + String leaveTimeStr = downStreamMsgContext.msgExt.userHeaders().getString(DeFiBusConstant.LEAVE_TIME); + long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.parseLong(leaveTimeStr) - storeTimestamp : 0; - String arriveTimeStr = downStreamMsgContext.msgExt.sysHeaders().getString(DeFiBusConstant.ARRIVE_TIME); - long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.valueOf(arriveTimeStr) : 0; + String arriveTimeStr = downStreamMsgContext.msgExt.userHeaders().getString(DeFiBusConstant.ARRIVE_TIME); + long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.parseLong(arriveTimeStr) : 0; double elapseTime = brokerCost + accessCost; if (elapseTime >= ttl) { logger.warn("discard the retry because timeout, seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, ProxyUtil.getMessageBizSeq(downStreamMsgContext.msgExt));