Skip to content

Commit

Permalink
Merge pull request #2690 from dilraj-vyas/master
Browse files Browse the repository at this point in the history
[ISSUE #2678] [Enhancement] Method appears to call the same method on the same object [redundantly[AsyncHTTPPushRequest]
  • Loading branch information
xwm1992 authored Dec 29, 2022
2 parents 2b5c7f2 + b2e6a43 commit cee43df
Showing 1 changed file with 51 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,31 +100,30 @@ public void tryHTTPRequest() {
HttpPost builder = new HttpPost(currPushUrl);

String requestCode = "";

if (SubscriptionType.SYNC == handleMsgContext.getSubscriptionItem().getType()) {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode());
} else {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode());
}

String localAddress = IPUtils.getLocalAddress();
builder.addHeader(ProtocolKey.REQUEST_CODE, requestCode);
builder.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
handleMsgContext.getEventMeshHTTPServer()
.getEventMeshHttpConfiguration().getEventMeshCluster());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress());
handleMsgContext.getEventMeshHTTPServer()
.getEventMeshHttpConfiguration().getEventMeshCluster());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, localAddress);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshEnv());
handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshEnv());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshIDC());
handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshIDC());

CloudEvent event = CloudEventBuilder.from(handleMsgContext.getEvent())
.withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.RSP_URL, currPushUrl)
.withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup())
.build();
.withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.RSP_URL, currPushUrl)
.withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup())
.build();
handleMsgContext.setEvent(event);

String content = "";
Expand All @@ -134,7 +133,7 @@ public void tryHTTPRequest() {
ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);

ProtocolTransportObject protocolTransportObject =
protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
if (protocolTransportObject instanceof HttpCommand) {
content = ((HttpCommand) protocolTransportObject).getBody().toMap().get("content").toString();
} else {
Expand All @@ -158,36 +157,36 @@ public void tryHTTPRequest() {
body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, content));
if (StringUtils.isBlank(handleMsgContext.getBizSeqNo())) {
body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
RandomStringUtils.generateNum(20)));
RandomStringUtils.generateNum(20)));
} else {
body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
handleMsgContext.getBizSeqNo()));
handleMsgContext.getBizSeqNo()));
}
if (StringUtils.isBlank(handleMsgContext.getUniqueId())) {
body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
RandomStringUtils.generateNum(20)));
RandomStringUtils.generateNum(20)));
} else {
body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
handleMsgContext.getUniqueId()));
handleMsgContext.getUniqueId()));
}

body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO,
handleMsgContext.getMsgRandomNo()));
handleMsgContext.getMsgRandomNo()));
body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic()));

body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS,
JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));

HttpEntity httpEntity = new UrlEncodedFormEntity(body, StandardCharsets.UTF_8);

builder.setEntity(httpEntity);

// for CloudEvents Webhook spec
String urlAuthType = handleMsgContext.getConsumerGroupConfig().getConsumerGroupTopicConf()
.get(handleMsgContext.getTopic()).getHttpAuthTypeMap().get(currPushUrl);
.get(handleMsgContext.getTopic()).getHttpAuthTypeMap().get(currPushUrl);

WebhookUtil.setWebhookHeaders(builder, httpEntity.getContentType().getValue(),
eventMeshHttpConfiguration.getEventMeshWebhookOrigin(),
eventMeshHttpConfiguration.getEventMeshWebhookOrigin(),
urlAuthType);


Expand All @@ -199,7 +198,7 @@ public void tryHTTPRequest() {

if (CMD_LOGGER.isInfoEnabled()) {
CMD_LOGGER.info("cmd={}|eventMesh2client|from={}|to={}", requestCode,
IPUtils.getLocalAddress(), currPushUrl);
localAddress, currPushUrl);
}

try {
Expand All @@ -215,18 +214,18 @@ public Object handleResponse(HttpResponse response) {
String res = "";
try {
res = EntityUtils.toString(response.getEntity(),
Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
} catch (IOException e) {
handleMsgContext.finish();
return new Object();
}
ClientRetCode result = processResponseContent(res);
if (MESSAGE_LOGGER.isInfoEnabled()) {
MESSAGE_LOGGER.info(
"message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
+ "|uniqueId={}|cost={}",
result, currPushUrl, handleMsgContext.getTopic(),
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
"message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
+ "|uniqueId={}|cost={}",
result, currPushUrl, handleMsgContext.getTopic(),
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
}
if (result == ClientRetCode.OK || result == ClientRetCode.REMOTE_OK) {
complete();
Expand All @@ -253,9 +252,9 @@ public Object handleResponse(HttpResponse response) {
eventMeshHTTPServer.metrics.getSummaryMetrics().recordHttpPushMsgFailed();
if (MESSAGE_LOGGER.isInfoEnabled()) {
MESSAGE_LOGGER.info(
"message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
+ "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
"message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
+ "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
}

if (isComplete()) {
Expand All @@ -268,14 +267,14 @@ public Object handleResponse(HttpResponse response) {

if (MESSAGE_LOGGER.isDebugEnabled()) {
MESSAGE_LOGGER.debug("message|eventMesh2client|url={}|topic={}|event={}", currPushUrl,
handleMsgContext.getTopic(),
handleMsgContext.getEvent());
handleMsgContext.getTopic(),
handleMsgContext.getEvent());
} else {
if (MESSAGE_LOGGER.isInfoEnabled()) {
MESSAGE_LOGGER
.info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
currPushUrl, handleMsgContext.getTopic(),
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
.info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
currPushUrl, handleMsgContext.getTopic(),
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
}
}
} catch (IOException e) {
Expand All @@ -292,22 +291,22 @@ public Object handleResponse(HttpResponse response) {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("asyncPushRequest={")
.append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
.append(",startIdx=").append(startIdx)
.append(",retryTimes=").append(retryTimes)
.append(",uniqueId=").append(handleMsgContext.getUniqueId())
.append(",executeTime=")
.append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
.append(",lastPushTime=")
.append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
.append(",createTime=")
.append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
.append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
.append(",startIdx=").append(startIdx)
.append(",retryTimes=").append(retryTimes)
.append(",uniqueId=").append(handleMsgContext.getUniqueId())
.append(",executeTime=")
.append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
.append(",lastPushTime=")
.append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
.append(",createTime=")
.append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
return sb.toString();
}

boolean processResponseStatus(int httpStatus, HttpResponse httpResponse) {
if (httpStatus == HttpStatus.SC_OK || httpStatus == HttpStatus.SC_CREATED
|| httpStatus == HttpStatus.SC_NO_CONTENT || httpStatus == HttpStatus.SC_ACCEPTED) {
|| httpStatus == HttpStatus.SC_NO_CONTENT || httpStatus == HttpStatus.SC_ACCEPTED) {
// success http response
return true;
} else if (httpStatus == 429) {
Expand Down Expand Up @@ -337,8 +336,8 @@ ClientRetCode processResponseContent(String content) {

try {
Map<String, Object> ret =
JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
});
JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
});
Integer retCode = (Integer) ret.get("retCode");
if (retCode != null && ClientRetCode.contains(retCode)) {
return ClientRetCode.get(retCode);
Expand All @@ -348,19 +347,19 @@ ClientRetCode processResponseContent(String content) {
} catch (NumberFormatException e) {
if (MESSAGE_LOGGER.isWarnEnabled()) {
MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
}
return ClientRetCode.FAIL;
} catch (JsonException e) {
if (MESSAGE_LOGGER.isWarnEnabled()) {
MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
}
return ClientRetCode.FAIL;
} catch (Throwable t) {
if (MESSAGE_LOGGER.isWarnEnabled()) {
MESSAGE_LOGGER.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
}
return ClientRetCode.FAIL;
}
Expand All @@ -372,7 +371,7 @@ private void addToWaitingMap(AsyncHTTPPushRequest request) {
return;
}
waitingRequests
.put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
.put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request);
}

Expand Down

0 comments on commit cee43df

Please sign in to comment.