Skip to content

Commit

Permalink
[Issue apache#744] Fix Data models in Grpc Request-Reply API
Browse files Browse the repository at this point in the history
  • Loading branch information
jinrongluo committed Feb 2, 2022
1 parent 0450346 commit 7d05921
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,44 +59,33 @@ public static CloudEvent buildEvent(SimpleMessage message) {
String producerGroup = StringUtils.isEmpty(message.getProducerGroup())
? event.getExtension(ProtocolKey.PRODUCERGROUP).toString() : message.getProducerGroup();

CloudEventBuilder eventBuilder;
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
return CloudEventBuilder.v1(event)
.withSubject(topic)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(ProtocolKey.SEQ_NUM, seqNum)
.withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
.withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
.withExtension(ProtocolKey.TTL, ttl).build();
eventBuilder = CloudEventBuilder.v1(event);
} else {
return CloudEventBuilder.v03(event)
.withSubject(topic)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(ProtocolKey.SEQ_NUM, seqNum)
.withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
.withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
.withExtension(ProtocolKey.TTL, ttl).build();
eventBuilder = CloudEventBuilder.v03(event);
}

eventBuilder.withSubject(topic)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(ProtocolKey.SEQ_NUM, seqNum)
.withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
.withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
.withExtension(ProtocolKey.TTL, ttl);

message.getPropertiesMap().forEach((k, v) -> eventBuilder.withExtension(k, v));

return eventBuilder.build();
}

public static SimpleMessageWrapper buildSimpleMessage(CloudEvent cloudEvent) {
Expand Down Expand Up @@ -133,13 +122,13 @@ public static SimpleMessageWrapper buildSimpleMessage(CloudEvent cloudEvent) {

SimpleMessage.Builder messageBuilder = SimpleMessage.newBuilder()
.setHeader(header)
.setContent(new String( eventFormat.serialize(cloudEvent), StandardCharsets.UTF_8))
.setContent(new String(eventFormat.serialize(cloudEvent), StandardCharsets.UTF_8))
.setProducerGroup(producerGroup)
.setSeqNum(seqNum)
.setUniqueId(uniqueId)
.setTopic(cloudEvent.getSubject())
.setTtl(ttl)
.putProperties(ProtocolKey.CONTENT_TYPE, contentType) ;
.putProperties(ProtocolKey.CONTENT_TYPE, contentType);

for (String key : cloudEvent.getExtensionNames()) {
messageBuilder.putProperties(key, cloudEvent.getExtension(key).toString());
Expand Down Expand Up @@ -192,48 +181,33 @@ public static List<CloudEvent> buildBatchEvents(BatchMessage batchMessage) {
event.getExtension(ProtocolKey.PRODUCERGROUP).toString() : batchMessage.getProducerGroup();
String ttl = StringUtils.isEmpty(item.getTtl()) ? event.getExtension(ProtocolKey.TTL).toString() : item.getTtl();

CloudEventBuilder eventBuilder;
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
CloudEvent enhancedEvent = CloudEventBuilder.v1(event)
.withSubject(topic)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(ProtocolKey.SEQ_NUM, seqNum)
.withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
.withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
.withExtension(ProtocolKey.TTL, ttl)
.build();
cloudEvents.add(enhancedEvent);
eventBuilder = CloudEventBuilder.v1(event);
} else {
CloudEvent enhancedEvent = CloudEventBuilder.v03(event)
.withSubject(topic)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(ProtocolKey.SEQ_NUM, seqNum)
.withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
.withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
.withExtension(ProtocolKey.TTL, ttl)
.build();
cloudEvents.add(enhancedEvent);
eventBuilder = CloudEventBuilder.v03(event);
}

eventBuilder.withSubject(topic)
.withExtension(ProtocolKey.ENV, env)
.withExtension(ProtocolKey.IDC, idc)
.withExtension(ProtocolKey.IP, ip)
.withExtension(ProtocolKey.PID, pid)
.withExtension(ProtocolKey.SYS, sys)
.withExtension(ProtocolKey.USERNAME, username)
.withExtension(ProtocolKey.PASSWD, passwd)
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(ProtocolKey.SEQ_NUM, seqNum)
.withExtension(ProtocolKey.UNIQUE_ID, uniqueId)
.withExtension(ProtocolKey.PRODUCERGROUP, producerGroup)
.withExtension(ProtocolKey.TTL, ttl);

item.getPropertiesMap().forEach((k, v) -> eventBuilder.withExtension(k, v));

cloudEvents.add(eventBuilder.build());
}
return cloudEvents;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,25 +87,20 @@ public void onCompleted() {
private Subscription buildReplyMessage(SimpleMessage reqMessage, T replyMessage) {
SimpleMessage simpleMessage = EventMeshClientUtil.buildSimpleMessage(replyMessage, clientConfig, listener.getProtocolType());

// set the producerGroup
simpleMessage = SimpleMessage.newBuilder(simpleMessage)
Subscription.Reply reply = Subscription.Reply.newBuilder()
.setProducerGroup(clientConfig.getConsumerGroup())
.build();

Subscription.Reply.Builder replyBuilder = Subscription.Reply.newBuilder()
.setProducerGroup(simpleMessage.getProducerGroup())
.setTopic(simpleMessage.getTopic())
.setContent(simpleMessage.getContent())
.setSeqNum(simpleMessage.getSeqNum())
.setUniqueId(simpleMessage.getUniqueId())
.setTtl(simpleMessage.getTtl())
.putAllProperties(reqMessage.getPropertiesMap())
.putAllProperties(simpleMessage.getPropertiesMap());

.putAllProperties(simpleMessage.getPropertiesMap())
.build();

return Subscription.newBuilder()
.setHeader(simpleMessage.getHeader())
.setReply(replyBuilder.build()).build();
.setReply(reply).build();
}

public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.apache.eventmesh.client.grpc.util.EventMeshClientUtil;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.EventMeshMessage;
import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage;
import org.apache.eventmesh.common.protocol.grpc.protos.PublisherServiceGrpc.PublisherServiceBlockingStub;
import org.apache.eventmesh.common.protocol.grpc.protos.Response;
Expand Down Expand Up @@ -103,7 +102,7 @@ private CloudEvent enhanceCloudEvent(final CloudEvent cloudEvent, String timeout
.withExtension(ProtocolKey.SYS, clientConfig.getSys())
.withExtension(ProtocolKey.LANGUAGE, "JAVA")
.withExtension(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE)
.withExtension(ProtocolKey.PROTOCOL_DESC, cloudEvent.getSpecVersion().name())
.withExtension(ProtocolKey.PROTOCOL_DESC, "grpc")
.withExtension(ProtocolKey.PROTOCOL_VERSION, cloudEvent.getSpecVersion().toString())
.withExtension(ProtocolKey.UNIQUE_ID, RandomStringUtils.generateNum(30))
.withExtension(ProtocolKey.SEQ_NUM, RandomStringUtils.generateNum(30))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -16,6 +17,7 @@
import org.apache.eventmesh.common.protocol.grpc.protos.SimpleMessage;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -63,7 +65,15 @@ public static <T> T buildMessage(SimpleMessage message, String protocolType) {
try {
CloudEvent cloudEvent = EventFormatProvider.getInstance().resolveFormat(contentType)
.deserialize(content.getBytes(StandardCharsets.UTF_8));
return (T) cloudEvent;

CloudEventBuilder cloudEventBuilder = CloudEventBuilder.from(cloudEvent)
.withSubject(message.getTopic())
.withExtension(ProtocolKey.SEQ_NUM, message.getSeqNum())
.withExtension(ProtocolKey.UNIQUE_ID, message.getUniqueId());

message.getPropertiesMap().forEach((k, v) -> cloudEventBuilder.withExtension(k, v));

return (T) cloudEventBuilder.build();
} catch (Throwable t) {
logger.warn("Error in building message. {}", t.getMessage());
return null;
Expand Down Expand Up @@ -92,30 +102,47 @@ public static <T> SimpleMessage buildSimpleMessage(T message, EventMeshGrpcClien
String ttl = cloudEvent.getExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL) == null ? "4000"
: cloudEvent.getExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL).toString();

return SimpleMessage.newBuilder()
String seqNum = cloudEvent.getExtension(ProtocolKey.SEQ_NUM) == null ? RandomStringUtils.generateNum(30)
: cloudEvent.getExtension(ProtocolKey.SEQ_NUM).toString();

String uniqueId = cloudEvent.getExtension(ProtocolKey.UNIQUE_ID) == null ? RandomStringUtils.generateNum(30)
: cloudEvent.getExtension(ProtocolKey.UNIQUE_ID).toString();

SimpleMessage.Builder builder = SimpleMessage.newBuilder()
.setHeader(EventMeshClientUtil.buildHeader(clientConfig, protocolType))
.setProducerGroup(clientConfig.getProducerGroup())
.setTopic(cloudEvent.getSubject())
.setTtl(ttl)
.setSeqNum(cloudEvent.getExtension(ProtocolKey.SEQ_NUM).toString())
.setUniqueId(cloudEvent.getExtension(ProtocolKey.UNIQUE_ID).toString())
.setSeqNum(seqNum)
.setUniqueId(uniqueId)
.setContent(content)
.putProperties(ProtocolKey.CONTENT_TYPE, contentType)
.build();
.putProperties(ProtocolKey.CONTENT_TYPE, contentType);

for (String extName: cloudEvent.getExtensionNames()) {
builder.putProperties(extName, cloudEvent.getExtension(extName).toString());
}

return builder.build();
} else {
EventMeshMessage eventMeshMessage = (EventMeshMessage) message;

String ttl = eventMeshMessage.getProp(Constants.EVENTMESH_MESSAGE_CONST_TTL) == null ? "4000"
: eventMeshMessage.getProp(Constants.EVENTMESH_MESSAGE_CONST_TTL);
Map<String, String> props = eventMeshMessage.getProp() == null ? new HashMap<>() : eventMeshMessage.getProp();

String seqNum = eventMeshMessage.getBizSeqNo() == null ? RandomStringUtils.generateNum(30)
: eventMeshMessage.getBizSeqNo();

String uniqueId = eventMeshMessage.getUniqueId() == null ? RandomStringUtils.generateNum(30)
: eventMeshMessage.getUniqueId();

return SimpleMessage.newBuilder()
.setHeader(EventMeshClientUtil.buildHeader(clientConfig, protocolType))
.setProducerGroup(clientConfig.getProducerGroup())
.setTopic(eventMeshMessage.getTopic())
.setContent(eventMeshMessage.getContent())
.setSeqNum(eventMeshMessage.getBizSeqNo())
.setUniqueId(eventMeshMessage.getUniqueId())
.setSeqNum(seqNum)
.setUniqueId(uniqueId)
.setTtl(ttl)
.putAllProperties(props)
.build();
Expand Down

0 comments on commit 7d05921

Please sign in to comment.