From 2a1e420f4718aa9c087a7a2ffb060e7e6953c705 Mon Sep 17 00:00:00 2001 From: hello vyas Date: Tue, 27 Dec 2022 11:03:08 +0530 Subject: [PATCH 1/3] #2678 --- .../core/protocol/http/push/AsyncHTTPPushRequest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index 0cf7dcdd78..cf7e7ba594 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -90,7 +90,7 @@ public AsyncHTTPPushRequest(HandleMsgContext handleMsgContext, @Override public void tryHTTPRequest() { - + String localAddress= IPUtils.getLocalAddress(); currPushUrl = getUrl(); if (StringUtils.isBlank(currPushUrl)) { @@ -113,7 +113,7 @@ public void tryHTTPRequest() { builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, handleMsgContext.getEventMeshHTTPServer() .getEventMeshHttpConfiguration().getEventMeshCluster()); - builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress()); + builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, localAddress); builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshEnv()); builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, @@ -199,7 +199,7 @@ public void tryHTTPRequest() { if (CMD_LOGGER.isInfoEnabled()) { CMD_LOGGER.info("cmd={}|eventMesh2client|from={}|to={}", requestCode, - IPUtils.getLocalAddress(), currPushUrl); + localAddress, currPushUrl); } try { From 49de1dcbf47b53cc077deedafcfbee110ed44a7d Mon Sep 17 00:00:00 2001 From: hello vyas Date: Wed, 28 Dec 2022 13:49:45 +0530 Subject: [PATCH 2/3] checkStyle Formatting --- .../runtime/core/protocol/http/push/AsyncHTTPPushRequest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index cf7e7ba594..3fa71e6f39 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -90,7 +90,7 @@ public AsyncHTTPPushRequest(HandleMsgContext handleMsgContext, @Override public void tryHTTPRequest() { - String localAddress= IPUtils.getLocalAddress(); + String localAddress = IPUtils.getLocalAddress(); currPushUrl = getUrl(); if (StringUtils.isBlank(currPushUrl)) { @@ -188,7 +188,7 @@ public void tryHTTPRequest() { WebhookUtil.setWebhookHeaders(builder, httpEntity.getContentType().getValue(), eventMeshHttpConfiguration.getEventMeshWebhookOrigin(), - urlAuthType); + urlAuthType); eventMeshHTTPServer.metrics.getSummaryMetrics().recordPushMsg(); From b2e6a43341fee3bbe700e2f220bf698b120d78b2 Mon Sep 17 00:00:00 2001 From: hello vyas Date: Thu, 29 Dec 2022 12:43:11 +0530 Subject: [PATCH 3/3] resolve checkstyle --- .../http/push/AsyncHTTPPushRequest.java | 105 +++++++++--------- 1 file changed, 52 insertions(+), 53 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java index 3fa71e6f39..ffd31963f5 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java @@ -90,7 +90,7 @@ public AsyncHTTPPushRequest(HandleMsgContext handleMsgContext, @Override public void tryHTTPRequest() { - String localAddress = IPUtils.getLocalAddress(); + currPushUrl = getUrl(); if (StringUtils.isBlank(currPushUrl)) { @@ -100,31 +100,30 @@ public void tryHTTPRequest() { HttpPost builder = new HttpPost(currPushUrl); String requestCode = ""; - if (SubscriptionType.SYNC == handleMsgContext.getSubscriptionItem().getType()) { requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode()); } else { requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode()); } - + String localAddress = IPUtils.getLocalAddress(); builder.addHeader(ProtocolKey.REQUEST_CODE, requestCode); builder.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA); builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion()); builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, - handleMsgContext.getEventMeshHTTPServer() - .getEventMeshHttpConfiguration().getEventMeshCluster()); + handleMsgContext.getEventMeshHTTPServer() + .getEventMeshHttpConfiguration().getEventMeshCluster()); builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, localAddress); builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, - handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshEnv()); + handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshEnv()); builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, - handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshIDC()); + handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshIDC()); CloudEvent event = CloudEventBuilder.from(handleMsgContext.getEvent()) - .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, - String.valueOf(System.currentTimeMillis())) - .withExtension(EventMeshConstants.RSP_URL, currPushUrl) - .withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup()) - .build(); + .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, + String.valueOf(System.currentTimeMillis())) + .withExtension(EventMeshConstants.RSP_URL, currPushUrl) + .withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup()) + .build(); handleMsgContext.setEvent(event); String content = ""; @@ -134,7 +133,7 @@ public void tryHTTPRequest() { ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType); ProtocolTransportObject protocolTransportObject = - protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent()); + protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent()); if (protocolTransportObject instanceof HttpCommand) { content = ((HttpCommand) protocolTransportObject).getBody().toMap().get("content").toString(); } else { @@ -158,25 +157,25 @@ public void tryHTTPRequest() { body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, content)); if (StringUtils.isBlank(handleMsgContext.getBizSeqNo())) { body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO, - RandomStringUtils.generateNum(20))); + RandomStringUtils.generateNum(20))); } else { body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO, - handleMsgContext.getBizSeqNo())); + handleMsgContext.getBizSeqNo())); } if (StringUtils.isBlank(handleMsgContext.getUniqueId())) { body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID, - RandomStringUtils.generateNum(20))); + RandomStringUtils.generateNum(20))); } else { body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID, - handleMsgContext.getUniqueId())); + handleMsgContext.getUniqueId())); } body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO, - handleMsgContext.getMsgRandomNo())); + handleMsgContext.getMsgRandomNo())); body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic())); body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS, - JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent())))); + JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent())))); HttpEntity httpEntity = new UrlEncodedFormEntity(body, StandardCharsets.UTF_8); @@ -184,11 +183,11 @@ public void tryHTTPRequest() { // for CloudEvents Webhook spec String urlAuthType = handleMsgContext.getConsumerGroupConfig().getConsumerGroupTopicConf() - .get(handleMsgContext.getTopic()).getHttpAuthTypeMap().get(currPushUrl); + .get(handleMsgContext.getTopic()).getHttpAuthTypeMap().get(currPushUrl); WebhookUtil.setWebhookHeaders(builder, httpEntity.getContentType().getValue(), - eventMeshHttpConfiguration.getEventMeshWebhookOrigin(), - urlAuthType); + eventMeshHttpConfiguration.getEventMeshWebhookOrigin(), + urlAuthType); eventMeshHTTPServer.metrics.getSummaryMetrics().recordPushMsg(); @@ -199,7 +198,7 @@ public void tryHTTPRequest() { if (CMD_LOGGER.isInfoEnabled()) { CMD_LOGGER.info("cmd={}|eventMesh2client|from={}|to={}", requestCode, - localAddress, currPushUrl); + localAddress, currPushUrl); } try { @@ -215,7 +214,7 @@ public Object handleResponse(HttpResponse response) { String res = ""; try { res = EntityUtils.toString(response.getEntity(), - Charset.forName(EventMeshConstants.DEFAULT_CHARSET)); + Charset.forName(EventMeshConstants.DEFAULT_CHARSET)); } catch (IOException e) { handleMsgContext.finish(); return new Object(); @@ -223,10 +222,10 @@ public Object handleResponse(HttpResponse response) { ClientRetCode result = processResponseContent(res); if (MESSAGE_LOGGER.isInfoEnabled()) { MESSAGE_LOGGER.info( - "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}" - + "|uniqueId={}|cost={}", - result, currPushUrl, handleMsgContext.getTopic(), - handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost); + "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}" + + "|uniqueId={}|cost={}", + result, currPushUrl, handleMsgContext.getTopic(), + handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost); } if (result == ClientRetCode.OK || result == ClientRetCode.REMOTE_OK) { complete(); @@ -253,9 +252,9 @@ public Object handleResponse(HttpResponse response) { eventMeshHTTPServer.metrics.getSummaryMetrics().recordHttpPushMsgFailed(); if (MESSAGE_LOGGER.isInfoEnabled()) { MESSAGE_LOGGER.info( - "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}" - + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(), - handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost); + "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}" + + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(), + handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost); } if (isComplete()) { @@ -268,14 +267,14 @@ public Object handleResponse(HttpResponse response) { if (MESSAGE_LOGGER.isDebugEnabled()) { MESSAGE_LOGGER.debug("message|eventMesh2client|url={}|topic={}|event={}", currPushUrl, - handleMsgContext.getTopic(), - handleMsgContext.getEvent()); + handleMsgContext.getTopic(), + handleMsgContext.getEvent()); } else { if (MESSAGE_LOGGER.isInfoEnabled()) { MESSAGE_LOGGER - .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}", - currPushUrl, handleMsgContext.getTopic(), - handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId()); + .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}", + currPushUrl, handleMsgContext.getTopic(), + handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId()); } } } catch (IOException e) { @@ -292,22 +291,22 @@ public Object handleResponse(HttpResponse response) { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("asyncPushRequest={") - .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo()) - .append(",startIdx=").append(startIdx) - .append(",retryTimes=").append(retryTimes) - .append(",uniqueId=").append(handleMsgContext.getUniqueId()) - .append(",executeTime=") - .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT)) - .append(",lastPushTime=") - .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT)) - .append(",createTime=") - .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}"); + .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo()) + .append(",startIdx=").append(startIdx) + .append(",retryTimes=").append(retryTimes) + .append(",uniqueId=").append(handleMsgContext.getUniqueId()) + .append(",executeTime=") + .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT)) + .append(",lastPushTime=") + .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT)) + .append(",createTime=") + .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}"); return sb.toString(); } boolean processResponseStatus(int httpStatus, HttpResponse httpResponse) { if (httpStatus == HttpStatus.SC_OK || httpStatus == HttpStatus.SC_CREATED - || httpStatus == HttpStatus.SC_NO_CONTENT || httpStatus == HttpStatus.SC_ACCEPTED) { + || httpStatus == HttpStatus.SC_NO_CONTENT || httpStatus == HttpStatus.SC_ACCEPTED) { // success http response return true; } else if (httpStatus == 429) { @@ -337,8 +336,8 @@ ClientRetCode processResponseContent(String content) { try { Map ret = - JsonUtils.deserialize(content, new TypeReference>() { - }); + JsonUtils.deserialize(content, new TypeReference>() { + }); Integer retCode = (Integer) ret.get("retCode"); if (retCode != null && ClientRetCode.contains(retCode)) { return ClientRetCode.get(retCode); @@ -348,19 +347,19 @@ ClientRetCode processResponseContent(String content) { } catch (NumberFormatException e) { if (MESSAGE_LOGGER.isWarnEnabled()) { MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl, - handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content); + handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content); } return ClientRetCode.FAIL; } catch (JsonException e) { if (MESSAGE_LOGGER.isWarnEnabled()) { MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl, - handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content); + handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content); } return ClientRetCode.FAIL; } catch (Throwable t) { if (MESSAGE_LOGGER.isWarnEnabled()) { MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl, - handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content); + handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content); } return ClientRetCode.FAIL; } @@ -372,7 +371,7 @@ private void addToWaitingMap(AsyncHTTPPushRequest request) { return; } waitingRequests - .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet()); + .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet()); waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request); }