From 5362d646484da224f76a38b9baf3ce641469754a Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Fri, 7 Jan 2022 12:37:49 -0500 Subject: [PATCH] [Issue #417] support cloudevents in GRPC --- .../protocol/grpc/common/ProtocolKey.java | 2 + .../protocol/http/common/ProtocolKey.java | 2 + .../CloudEventsBatchPublishInstance.java | 87 +++++++++ .../CloudEventsPublishInstance.java | 85 +++++++++ .../CloudEventsRequestReplyInstance.java | 86 +++++++++ .../AsyncPublishInstance.java | 2 +- .../BatchPublishInstance.java | 2 +- .../RequestReplyInstance.java | 2 +- .../grpc/sub/CloudEventsAsyncSubscribe.java | 68 +++++++ ...ribe.java => EventmeshAsyncSubscribe.java} | 10 +- .../sub/app/controller/SubController.java | 8 +- .../grpc/sub/app/service/SubService.java | 2 +- .../build.gradle | 1 + .../CloudEventsProtocolAdaptor.java | 20 +- .../grpc/GrpcMessageProtocolResolver.java | 179 ++++++++++++++++++ .../grpc/push/WebhookPushRequest.java | 8 + .../client/grpc/EventMeshGrpcConsumer.java | 50 +++-- .../client/grpc/EventMeshGrpcProducer.java | 30 ++- .../eventmesh/client/grpc/ReceiveMsgHook.java | 7 +- .../grpc/producer/CloudEventProducer.java | 166 ++++++++++++++++ .../client/grpc/util/EventMeshClientUtil.java | 5 +- 21 files changed, 790 insertions(+), 32 deletions(-) create mode 100644 eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java create mode 100644 eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java create mode 100644 eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestReplyInstance.java rename eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/{ => eventmeshmessage}/AsyncPublishInstance.java (98%) rename eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/{ => eventmeshmessage}/BatchPublishInstance.java (98%) rename eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/{ => eventmeshmessage}/RequestReplyInstance.java (98%) create mode 100644 eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java rename eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/{AsyncSubscribe.java => EventmeshAsyncSubscribe.java} (86%) create mode 100644 eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java create mode 100644 eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/producer/CloudEventProducer.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java index 62d171bc76..0294ce1c54 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/common/ProtocolKey.java @@ -37,4 +37,6 @@ public class ProtocolKey { public static final String TTL = "ttl"; public static final String PRODUCERGROUP = "producergroup"; public static final String TAG = "tag"; + + public static final String CONTENT_TYPE = "contenttype"; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java index e032f1fe9a..1af3b75154 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java @@ -29,6 +29,8 @@ public class ProtocolKey { public static final String PROTOCOL_DESC = "protocoldesc"; + public static final String CONTENT_TYPE = "contenttype"; + public static class ClientInstanceKey { ////////////////////////////////////Protocol layer requester description/////////// public static final String ENV = "env"; diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java new file mode 100644 index 0000000000..4c3fcde8e6 --- /dev/null +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsBatchPublishInstance.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.grpc.pub.cloudevents; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer; +import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; +import org.apache.eventmesh.client.tcp.common.EventMeshCommon; +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.common.utils.RandomStringUtils; +import org.apache.eventmesh.util.Utils; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +@Slf4j +public class CloudEventsBatchPublishInstance { + + public static void main(String[] args) throws Exception { + + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final String eventMeshGrpcPort = properties.getProperty("eventmesh.grpc.port"); + + final String topic = "TEST-TOPIC-GRPC-RR"; + + EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() + .serverAddr(eventMeshIp) + .serverPort(Integer.parseInt(eventMeshGrpcPort)) + .producerGroup("EventMeshTest-producerGroup") + .env("env").idc("idc") + .sys("1234").build(); + + EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig); + + eventMeshGrpcProducer.init(); + + Map content = new HashMap<>(); + content.put("content", "testRequestReplyMessage"); + + List cloudEventList = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + CloudEvent event = CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSubject(topic) + .withSource(URI.create("/")) + .withDataContentType("application/cloudevents+json") + .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) + .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8)) + .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000)) + .build(); + + cloudEventList.add(event); + } + + eventMeshGrpcProducer.publish(cloudEventList); + Thread.sleep(10000); + try (EventMeshGrpcProducer ignore = eventMeshGrpcProducer) { + // ignore + } + } +} diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java new file mode 100644 index 0000000000..ec0dd5ed01 --- /dev/null +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsPublishInstance.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.grpc.pub.cloudevents; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer; +import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; +import org.apache.eventmesh.client.tcp.common.EventMeshCommon; +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.common.utils.RandomStringUtils; +import org.apache.eventmesh.util.Utils; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +@Slf4j +public class CloudEventsPublishInstance { + + // This messageSize is also used in SubService.java (Subscriber) + public static int messageSize = 5; + + public static void main(String[] args) throws Exception { + + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final String eventMeshGrpcPort = properties.getProperty("eventmesh.grpc.port"); + + final String topic = "TEST-TOPIC-GRPC-ASYNC"; + + EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() + .serverAddr(eventMeshIp) + .serverPort(Integer.parseInt(eventMeshGrpcPort)) + .producerGroup("EventMeshTest-producerGroup") + .env("env").idc("idc") + .sys("1234").build(); + + EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig); + + eventMeshGrpcProducer.init(); + + Map content = new HashMap<>(); + content.put("content", "testAsyncMessage"); + + for (int i = 0; i < messageSize; i++) { + CloudEvent event = CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSubject(topic) + .withSource(URI.create("/")) + .withDataContentType("application/cloudevents+json") + .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) + .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8)) + .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000)) + .build(); + eventMeshGrpcProducer.publish(event); + Thread.sleep(1000); + } + Thread.sleep(30000); + try (EventMeshGrpcProducer ignore = eventMeshGrpcProducer) { + // ignore + } + } +} diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestReplyInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestReplyInstance.java new file mode 100644 index 0000000000..dd7203beaf --- /dev/null +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/cloudevents/CloudEventsRequestReplyInstance.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.grpc.pub.cloudevents; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer; +import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; +import org.apache.eventmesh.client.tcp.common.EventMeshCommon; +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.common.utils.RandomStringUtils; +import org.apache.eventmesh.util.Utils; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +@Slf4j +public class CloudEventsRequestReplyInstance { + + // This messageSize is also used in SubService.java (Subscriber) + public static int messageSize = 5; + + public static void main(String[] args) throws Exception { + + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final String eventMeshGrpcPort = properties.getProperty("eventmesh.grpc.port"); + + final String topic = "TEST-TOPIC-GRPC-RR"; + + EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() + .serverAddr(eventMeshIp) + .serverPort(Integer.parseInt(eventMeshGrpcPort)) + .producerGroup("EventMeshTest-producerGroup") + .env("env").idc("idc") + .sys("1234").build(); + + EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig); + + eventMeshGrpcProducer.init(); + + Map content = new HashMap<>(); + content.put("content", "testRequestReplyMessage"); + + for (int i = 0; i < messageSize; i++) { + CloudEvent event = CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSubject(topic) + .withSource(URI.create("/")) + .withDataContentType("application/cloudevents+json") + .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME) + .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8)) + .withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000)) + .build(); + + eventMeshGrpcProducer.requestReply(event, 10000); + Thread.sleep(1000); + } + Thread.sleep(30000); + try (EventMeshGrpcProducer ignore = eventMeshGrpcProducer) { + // ignore + } + } +} diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java similarity index 98% rename from eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/AsyncPublishInstance.java rename to eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java index 53dd34d8b4..552273b284 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/AsyncPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/AsyncPublishInstance.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.grpc.pub; +package org.apache.eventmesh.grpc.pub.eventmeshmessage; import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer; diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/BatchPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java similarity index 98% rename from eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/BatchPublishInstance.java rename to eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java index 6f9edefa92..5f4c8fb89c 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/BatchPublishInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/BatchPublishInstance.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.grpc.pub; +package org.apache.eventmesh.grpc.pub.eventmeshmessage; import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer; diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/RequestReplyInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java similarity index 98% rename from eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/RequestReplyInstance.java rename to eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java index 2e964527ef..8273ec2d2b 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/RequestReplyInstance.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/pub/eventmeshmessage/RequestReplyInstance.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.grpc.pub; +package org.apache.eventmesh.grpc.pub.eventmeshmessage; import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer; diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java new file mode 100644 index 0000000000..8730be65ea --- /dev/null +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/CloudEventsAsyncSubscribe.java @@ -0,0 +1,68 @@ +package org.apache.eventmesh.grpc.sub; + +import io.cloudevents.CloudEvent; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.client.grpc.EventMeshGrpcConsumer; +import org.apache.eventmesh.client.grpc.ReceiveMsgHook; +import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; +import org.apache.eventmesh.client.tcp.common.EventMeshCommon; +import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; +import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem; +import org.apache.eventmesh.util.Utils; + +import java.util.Optional; +import java.util.Properties; + +@Slf4j +public class CloudEventsAsyncSubscribe implements ReceiveMsgHook { + + public static CloudEventsAsyncSubscribe handler = new CloudEventsAsyncSubscribe(); + + public static void main(String[] args) { + Properties properties = Utils.readPropertiesFile("application.properties"); + final String eventMeshIp = properties.getProperty("eventmesh.ip"); + final String eventMeshGrpcPort = properties.getProperty("eventmesh.grpc.port"); + + final String topic = "TEST-TOPIC-GRPC-ASYNC"; + + EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder() + .serverAddr(eventMeshIp) + .serverPort(Integer.parseInt(eventMeshGrpcPort)) + .consumerGroup("EventMeshTest-consumerGroup") + .env("env").idc("idc") + .sys("1234").build(); + + SubscriptionItem subscriptionItem = SubscriptionItem.newBuilder() + .setTopic(topic) + .setMode(SubscriptionItem.SubscriptionMode.CLUSTERING) + .setType(SubscriptionItem.SubscriptionType.ASYNC) + .build(); + + Subscription subscription = Subscription.newBuilder() + .addSubscriptionItems(subscriptionItem) + .build(); + + + EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig); + + eventMeshGrpcConsumer.init(); + + eventMeshGrpcConsumer.registerListener(handler); + + eventMeshGrpcConsumer.subscribeStream(subscription); + + + //eventMeshGrpcConsumer.unsubscribe(subscription); + } + + @Override + public Optional handle(CloudEvent msg) { + log.info("receive async msg====================={}", msg); + return Optional.empty(); + } + + @Override + public String getProtocolType() { + return EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME; + } +} diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java similarity index 86% rename from eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/AsyncSubscribe.java rename to eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java index addc095c6a..939969e02c 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/AsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/EventmeshAsyncSubscribe.java @@ -4,6 +4,7 @@ import org.apache.eventmesh.client.grpc.EventMeshGrpcConsumer; import org.apache.eventmesh.client.grpc.ReceiveMsgHook; import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; +import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem; @@ -13,9 +14,9 @@ import java.util.Properties; @Slf4j -public class AsyncSubscribe implements ReceiveMsgHook { +public class EventmeshAsyncSubscribe implements ReceiveMsgHook { - public static AsyncSubscribe handler = new AsyncSubscribe(); + public static EventmeshAsyncSubscribe handler = new EventmeshAsyncSubscribe(); public static void main(String[] args) { Properties properties = Utils.readPropertiesFile("application.properties"); @@ -59,4 +60,9 @@ public Optional handle(EventMeshMessage msg) { log.info("receive async msg====================={}", msg); return Optional.empty(); } + + @Override + public String getProtocolType() { + return EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME; + } } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/controller/SubController.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/controller/SubController.java index cb5b8489ed..002bcb19c1 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/controller/SubController.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/controller/SubController.java @@ -19,7 +19,6 @@ import io.cloudevents.CloudEvent; import io.cloudevents.core.provider.EventFormatProvider; -import io.cloudevents.jackson.JsonFormat; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; @@ -47,11 +46,14 @@ public class SubController { @RequestMapping(value = "/test", method = RequestMethod.POST) public String subTest(HttpServletRequest request) { + String protocolType = request.getHeader(ProtocolKey.PROTOCOL_TYPE); String content = request.getParameter("content"); log.info("=======receive message======= {}", content); Map contentMap = JsonUtils.deserialize(content, HashMap.class); - if (StringUtils.equals(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME, contentMap.get(ProtocolKey.PROTOCOL_TYPE))) { - CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE) + if (StringUtils.equals(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME, protocolType)) { + String contentType = request.getHeader(ProtocolKey.CONTENT_TYPE); + + CloudEvent event = EventFormatProvider.getInstance().resolveFormat(contentType) .deserialize(content.getBytes(StandardCharsets.UTF_8)); String data = new String(event.getData().toBytes(), StandardCharsets.UTF_8); log.info("=======receive data======= {}", data); diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java index 4bc755c4d8..e19419c0da 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/grpc/sub/app/service/SubService.java @@ -26,7 +26,7 @@ import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionMode; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem.SubscriptionType; import org.apache.eventmesh.common.utils.IPUtils; -import org.apache.eventmesh.grpc.pub.AsyncPublishInstance; +import org.apache.eventmesh.grpc.pub.eventmeshmessage.AsyncPublishInstance; import org.apache.eventmesh.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle index 7f77c3db21..ed6c77b92e 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle @@ -20,6 +20,7 @@ dependencies { implementation "io.cloudevents:cloudevents-core" implementation "com.google.guava:guava" implementation "io.cloudevents:cloudevents-json-jackson" + implementation "io.grpc:grpc-protobuf:1.15.0" testImplementation "junit:junit" } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java index c54a88e7fa..86babadc31 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java @@ -18,7 +18,11 @@ package org.apache.eventmesh.protocol.cloudevents; import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.EventMeshMessage; import org.apache.eventmesh.common.protocol.ProtocolTransportObject; +import org.apache.eventmesh.common.protocol.grpc.common.BatchMessageWrapper; +import org.apache.eventmesh.common.protocol.grpc.common.EventMeshMessageWrapper; +import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; import org.apache.eventmesh.common.protocol.http.HttpCommand; import org.apache.eventmesh.common.protocol.http.body.Body; import org.apache.eventmesh.common.protocol.http.common.RequestCode; @@ -26,6 +30,7 @@ import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; +import org.apache.eventmesh.protocol.cloudevents.resolver.grpc.GrpcMessageProtocolResolver; import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchProtocolResolver; import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchV2ProtocolResolver; import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageRequestProtocolResolver; @@ -69,6 +74,10 @@ public CloudEvent toCloudEvent(ProtocolTransportObject cloudEvent) throws Protoc String requestCode = ((HttpCommand) cloudEvent).getRequestCode(); return deserializeHttpProtocol(requestCode, header, body); + } else if (cloudEvent instanceof EventMeshMessageWrapper) { + org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage eventMeshMessage + = ((EventMeshMessageWrapper) cloudEvent).getMessage(); + return GrpcMessageProtocolResolver.buildEvent(eventMeshMessage); } else { throw new ProtocolHandleException(String.format("protocol class: %s", cloudEvent.getClass())); } @@ -98,8 +107,13 @@ private CloudEvent deserializeHttpProtocol(String requestCode, @Override public List toBatchCloudEvent(ProtocolTransportObject protocol) - throws ProtocolHandleException { - return null; + throws ProtocolHandleException { + if (protocol instanceof BatchMessageWrapper) { + BatchMessage batchMessage = ((BatchMessageWrapper) protocol).getMessage(); + return GrpcMessageProtocolResolver.buildBatchEvents(batchMessage); + } else { + throw new ProtocolHandleException(String.format("protocol class: %s", protocol.getClass())); + } } @Override @@ -130,6 +144,8 @@ public Map toMap() { String.format("DateContentType:%s is not supported", dataContentType)); pkg.setBody(eventFormat.serialize(cloudEvent)); return pkg; + } else if (StringUtils.equals("grpc", protocolDesc)){ + return GrpcMessageProtocolResolver.buildEventMeshMessage(cloudEvent); } else { throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc)); } diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java new file mode 100644 index 0000000000..3a2d3def4b --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/grpc/GrpcMessageProtocolResolver.java @@ -0,0 +1,179 @@ +package org.apache.eventmesh.protocol.cloudevents.resolver.grpc; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.protocol.grpc.common.EventMeshMessageWrapper; +import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey; +import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; +import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; +import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +public class GrpcMessageProtocolResolver { + + public static CloudEvent buildEvent(EventMeshMessage message) { + String cloudEventJson = message.getContent(); + + String contentType = message.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, "application/cloudevents+json"); + EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(contentType); + CloudEvent event = eventFormat.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8)); + + RequestHeader header = message.getHeader(); + String env = StringUtils.isEmpty(header.getEnv()) ? event.getExtension(ProtocolKey.ENV).toString() : header.getEnv(); + String idc = StringUtils.isEmpty(header.getIdc()) ? event.getExtension(ProtocolKey.IDC).toString() : header.getIdc(); + String ip = StringUtils.isEmpty(header.getIp()) ? event.getExtension(ProtocolKey.IP).toString() : header.getIp(); + String pid = StringUtils.isEmpty(header.getPid()) ? event.getExtension(ProtocolKey.PID).toString() : header.getPid(); + String sys = StringUtils.isEmpty(header.getSys()) ? event.getExtension(ProtocolKey.SYS).toString() : header.getSys(); + + String language = StringUtils.isEmpty(header.getLanguage()) + ? event.getExtension(ProtocolKey.LANGUAGE).toString() : header.getLanguage(); + + String protocolType = StringUtils.isEmpty(header.getProtocolType()) + ? event.getExtension(ProtocolKey.PROTOCOL_TYPE).toString() : header.getProtocolType(); + + String protocolDesc = StringUtils.isEmpty(header.getProtocolDesc()) + ? event.getExtension(ProtocolKey.PROTOCOL_DESC).toString() : header.getProtocolDesc(); + + String protocolVersion = StringUtils.isEmpty(header.getProtocolVersion()) + ? event.getExtension(ProtocolKey.PROTOCOL_VERSION).toString() : header.getProtocolVersion(); + + String uniqueId = StringUtils.isEmpty(message.getUniqueId()) + ? event.getExtension(ProtocolKey.UNIQUE_ID).toString() : message.getUniqueId(); + + String seqNum = StringUtils.isEmpty(message.getSeqNum()) + ? event.getExtension(ProtocolKey.SEQ_NUM).toString() : header.getProtocolVersion(); + + String username = StringUtils.isEmpty(header.getUsername()) ? event.getExtension(ProtocolKey.USERNAME).toString() : header.getUsername(); + String passwd = StringUtils.isEmpty(header.getPassword()) ? event.getExtension(ProtocolKey.PASSWD).toString() : header.getPassword(); + String ttl = StringUtils.isEmpty(message.getTtl()) ? event.getExtension(ProtocolKey.TTL).toString() : message.getTtl(); + + String producerGroup = StringUtils.isEmpty(message.getProducerGroup()) + ? event.getExtension(ProtocolKey.PRODUCERGROUP).toString() : message.getProducerGroup(); + + return CloudEventBuilder.from(event) + .withExtension(ProtocolKey.ENV, env) + .withExtension(ProtocolKey.IDC, idc) + .withExtension(ProtocolKey.IP, ip) + .withExtension(ProtocolKey.PID, pid) + .withExtension(ProtocolKey.SYS, sys) + .withExtension(ProtocolKey.USERNAME, username) + .withExtension(ProtocolKey.PASSWD, passwd) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .withExtension(ProtocolKey.SEQ_NUM, seqNum) + .withExtension(ProtocolKey.UNIQUE_ID, uniqueId) + .withExtension(ProtocolKey.PRODUCERGROUP, producerGroup) + .withExtension(ProtocolKey.TTL, ttl).build(); + } + + public static EventMeshMessageWrapper buildEventMeshMessage(CloudEvent cloudEvent) { + String env = cloudEvent.getExtension(ProtocolKey.ENV) == null ? null : cloudEvent.getExtension(ProtocolKey.ENV).toString(); + String idc = cloudEvent.getExtension(ProtocolKey.IDC) == null ? null : cloudEvent.getExtension(ProtocolKey.IDC).toString(); + String ip = cloudEvent.getExtension(ProtocolKey.IP) == null ? null : cloudEvent.getExtension(ProtocolKey.IP).toString(); + String pid = cloudEvent.getExtension(ProtocolKey.PID) == null ? null : cloudEvent.getExtension(ProtocolKey.PID).toString(); + String sys = cloudEvent.getExtension(ProtocolKey.SYS) == null ? null : cloudEvent.getExtension(ProtocolKey.SYS).toString(); + String userName = cloudEvent.getExtension(ProtocolKey.USERNAME) == null ? null : cloudEvent.getExtension(ProtocolKey.USERNAME).toString(); + String passwd = cloudEvent.getExtension(ProtocolKey.PASSWD) == null ? null : cloudEvent.getExtension(ProtocolKey.PASSWD).toString(); + String language = cloudEvent.getExtension(ProtocolKey.LANGUAGE) == null ? null : cloudEvent.getExtension(ProtocolKey.LANGUAGE).toString(); + String protocol = cloudEvent.getExtension(ProtocolKey.PROTOCOL_TYPE) == null ? null : + cloudEvent.getExtension(ProtocolKey.PROTOCOL_TYPE).toString(); + String protocolDesc = cloudEvent.getExtension(ProtocolKey.PROTOCOL_DESC) == null ? null : + cloudEvent.getExtension(ProtocolKey.PROTOCOL_DESC).toString(); + String protocolVersion = cloudEvent.getExtension(ProtocolKey.PROTOCOL_VERSION) == null ? null : + cloudEvent.getExtension(ProtocolKey.PROTOCOL_VERSION).toString(); + String seqNum = cloudEvent.getExtension(ProtocolKey.SEQ_NUM) == null ? null : cloudEvent.getExtension(ProtocolKey.SEQ_NUM).toString(); + String uniqueId = cloudEvent.getExtension(ProtocolKey.UNIQUE_ID) == null ? null : cloudEvent.getExtension(ProtocolKey.UNIQUE_ID).toString(); + String producerGroup = cloudEvent.getExtension(ProtocolKey.PRODUCERGROUP) == null ? null : + cloudEvent.getExtension(ProtocolKey.PRODUCERGROUP).toString(); + String ttl = cloudEvent.getExtension(ProtocolKey.TTL) == null ? null : cloudEvent.getExtension(ProtocolKey.TTL).toString(); + + RequestHeader header = RequestHeader.newBuilder() + .setEnv(env).setIdc(idc) + .setIp(ip).setPid(pid) + .setSys(sys).setUsername(userName).setPassword(passwd) + .setLanguage(language).setProtocolType(protocol) + .setProtocolDesc(protocolDesc).setProtocolVersion(protocolVersion) + .build(); + + String contentType = cloudEvent.getDataContentType(); + EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(contentType); + + EventMeshMessage.Builder messageBuilder = EventMeshMessage.newBuilder() + .setHeader(header) + .setContent(new String( eventFormat.serialize(cloudEvent), StandardCharsets.UTF_8)) + .setProducerGroup(producerGroup) + .setSeqNum(seqNum) + .setUniqueId(uniqueId) + .setTopic(cloudEvent.getSubject()) + .setTtl(ttl) + .putProperties(ProtocolKey.CONTENT_TYPE, contentType) ; + + for (String key : cloudEvent.getExtensionNames()) { + messageBuilder.putProperties(key, cloudEvent.getExtension(key).toString()); + } + + EventMeshMessage eventMeshMessage = messageBuilder.build(); + + return new EventMeshMessageWrapper(eventMeshMessage); + } + + public static List buildBatchEvents(BatchMessage batchMessage) { + List cloudEvents = new ArrayList<>(); + + RequestHeader header = batchMessage.getHeader(); + + for (BatchMessage.MessageItem item : batchMessage.getMessageItemList()) { + String cloudEventJson = item.getContent(); + + String contentType = item.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, "application/cloudevents+json"); + EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(contentType); + CloudEvent event = eventFormat.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8)); + + String env = StringUtils.isEmpty(header.getEnv()) ? event.getExtension(ProtocolKey.ENV).toString() : header.getEnv(); + String idc = StringUtils.isEmpty(header.getIdc()) ? event.getExtension(ProtocolKey.IDC).toString() : header.getIdc(); + String ip = StringUtils.isEmpty(header.getIp()) ? event.getExtension(ProtocolKey.IP).toString() : header.getIp(); + String pid = StringUtils.isEmpty(header.getPid()) ? event.getExtension(ProtocolKey.PID).toString() : header.getPid(); + String sys = StringUtils.isEmpty(header.getSys()) ? event.getExtension(ProtocolKey.SYS).toString() : header.getSys(); + + String language = StringUtils.isEmpty(header.getLanguage()) + ? event.getExtension(ProtocolKey.LANGUAGE).toString() : header.getLanguage(); + + String protocolType = StringUtils.isEmpty(header.getProtocolType()) + ? event.getExtension(ProtocolKey.PROTOCOL_TYPE).toString() : header.getProtocolType(); + + String protocolDesc = StringUtils.isEmpty(header.getProtocolDesc()) + ? event.getExtension(ProtocolKey.PROTOCOL_DESC).toString() : header.getProtocolDesc(); + + String protocolVersion = StringUtils.isEmpty(header.getProtocolVersion()) + ? event.getExtension(ProtocolKey.PROTOCOL_VERSION).toString() : header.getProtocolVersion(); + + String username = StringUtils.isEmpty(header.getUsername()) ? event.getExtension(ProtocolKey.USERNAME).toString() : header.getUsername(); + String passwd = StringUtils.isEmpty(header.getPassword()) ? event.getExtension(ProtocolKey.PASSWD).toString() : header.getPassword(); + + CloudEvent enhancedEvent = CloudEventBuilder.from(event) + .withExtension(ProtocolKey.ENV, env) + .withExtension(ProtocolKey.IDC, idc) + .withExtension(ProtocolKey.IP, ip) + .withExtension(ProtocolKey.PID, pid) + .withExtension(ProtocolKey.SYS, sys) + .withExtension(ProtocolKey.USERNAME, username) + .withExtension(ProtocolKey.PASSWD, passwd) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .build(); + + cloudEvents.add(enhancedEvent); + } + return cloudEvents; + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java index 46d22b0fd9..8dd613b8c6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/push/WebhookPushRequest.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; +import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody; import org.apache.eventmesh.common.protocol.http.common.ClientRetCode; import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; @@ -104,6 +105,13 @@ public void tryPushRequest() { builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshGrpcConfiguration.eventMeshEnv); builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshGrpcConfiguration.eventMeshIDC); + RequestHeader requestHeader = eventMeshMessage.getHeader(); + builder.addHeader(ProtocolKey.PROTOCOL_TYPE, requestHeader.getProtocolType()); + builder.addHeader(ProtocolKey.PROTOCOL_DESC, requestHeader.getProtocolDesc()); + builder.addHeader(ProtocolKey.PROTOCOL_VERSION, requestHeader.getProtocolVersion()); + builder.addHeader(ProtocolKey.CONTENT_TYPE, eventMeshMessage.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, + "application/cloudevents+json")); + List body = new ArrayList<>(); body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, eventMeshMessage.getContent())); body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO, eventMeshMessage.getSeqNum())); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcConsumer.java index 0af2e1229a..367cd23c2e 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcConsumer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcConsumer.java @@ -1,6 +1,9 @@ package org.apache.eventmesh.client.grpc; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.apache.commons.lang3.StringUtils; @@ -16,9 +19,12 @@ import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader; import org.apache.eventmesh.common.protocol.grpc.protos.Response; import org.apache.eventmesh.common.protocol.grpc.protos.Subscription; +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.awt.Event; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -38,7 +44,7 @@ public class EventMeshGrpcConsumer implements AutoCloseable { private ConsumerServiceBlockingStub consumerClient; private HeartbeatServiceBlockingStub heartbeatClient; - private ReceiveMsgHook listener; + private ReceiveMsgHook listener; private final List listenerThreads = new LinkedList<>(); @@ -68,7 +74,7 @@ public Response subscribe(Subscription subscription) { addSubscription(subscription); Subscription enhancedSubscription = Subscription.newBuilder(subscription) - .setHeader(EventMeshClientUtil.buildHeader(clientConfig)) + .setHeader(EventMeshClientUtil.buildHeader(clientConfig, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME)) .setConsumerGroup(clientConfig.getConsumerGroup()) .build(); try { @@ -87,7 +93,7 @@ public void subscribeStream(Subscription subscription) { addSubscription(subscription); Subscription enhancedSubscription = Subscription.newBuilder(subscription) - .setHeader(EventMeshClientUtil.buildHeader(clientConfig)) + .setHeader(EventMeshClientUtil.buildHeader(clientConfig, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME)) .setConsumerGroup(clientConfig.getConsumerGroup()) .build(); Iterator msgIterator; @@ -98,7 +104,7 @@ public void subscribeStream(Subscription subscription) { return; } - ListenerThread listenerThread = new ListenerThread(msgIterator, listener); + ListenerThread listenerThread = new ListenerThread(msgIterator, listener, listener.getProtocolType()); listenerThreads.add(listenerThread); listenerThread.start(); } @@ -122,7 +128,7 @@ public Response unsubscribe(Subscription subscription) { removeSubscription(subscription); Subscription enhancedSubscription = Subscription.newBuilder(subscription) - .setHeader(EventMeshClientUtil.buildHeader(clientConfig)) + .setHeader(EventMeshClientUtil.buildHeader(clientConfig, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME)) .setConsumerGroup(clientConfig.getConsumerGroup()) .build(); @@ -136,14 +142,14 @@ public Response unsubscribe(Subscription subscription) { } } - public void registerListener(ReceiveMsgHook listener) { + public void registerListener(ReceiveMsgHook listener) { if (this.listener == null) { this.listener = listener; } } private void heartBeat() { - RequestHeader header = EventMeshClientUtil.buildHeader(clientConfig); + RequestHeader header = EventMeshClientUtil.buildHeader(clientConfig, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME); scheduler.scheduleAtFixedRate(() -> { if (subscriptionMap.isEmpty()) { return; @@ -184,14 +190,17 @@ public void close() { scheduler.shutdown(); } - static class ListenerThread extends Thread { + static class ListenerThread extends Thread { private final Iterator msgIterator; - private final ReceiveMsgHook listener; + private final ReceiveMsgHook listener; - ListenerThread(Iterator msgIterator, ReceiveMsgHook listener) { + private String protocolType; + + ListenerThread(Iterator msgIterator, ReceiveMsgHook listener, String protocolType) { this.msgIterator = msgIterator; this.listener = listener; + this.protocolType = protocolType; } public void run() { @@ -199,11 +208,30 @@ public void run() { try { while (msgIterator.hasNext()) { logger.info("sdk received message "); - listener.handle(msgIterator.next()); + + EventMeshMessage eventMeshMessage = msgIterator.next(); + T msg = buildMessage(eventMeshMessage); + if (msg != null) { + listener.handle(msg); + } } } catch (Throwable t) { logger.warn("Error in handling message. {}", t.getMessage()); } } + + private T buildMessage(EventMeshMessage eventMeshMessage) { + try { + if (EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME.equals(protocolType)) { + String contentType = eventMeshMessage.getPropertiesOrDefault(ProtocolKey.CONTENT_TYPE, JsonFormat.CONTENT_TYPE); + return (T) EventFormatProvider.getInstance().resolveFormat(contentType) + .deserialize(eventMeshMessage.getContent().getBytes(StandardCharsets.UTF_8)); + } + return (T) eventMeshMessage; + } catch (Throwable t) { + logger.warn("Error in building message. {}", t.getMessage()); + return null; + } + } } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcProducer.java index 7025717a9e..a521fb890c 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/EventMeshGrpcProducer.java @@ -1,8 +1,10 @@ package org.apache.eventmesh.client.grpc; +import io.cloudevents.CloudEvent; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; +import org.apache.eventmesh.client.grpc.producer.CloudEventProducer; import org.apache.eventmesh.client.grpc.util.EventMeshClientUtil; import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; @@ -12,16 +14,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + public class EventMeshGrpcProducer implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(EventMeshGrpcProducer.class); + private final static String PROTOCOL_TYPE = "eventmeshmessage"; + private final EventMeshGrpcClientConfig clientConfig; private ManagedChannel channel; private PublisherServiceBlockingStub publisherClient; + private CloudEventProducer cloudEventProducer; + public EventMeshGrpcProducer(EventMeshGrpcClientConfig clientConfig) { this.clientConfig = clientConfig; } @@ -30,13 +38,27 @@ public void init() { channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort()) .usePlaintext().build(); publisherClient = PublisherServiceGrpc.newBlockingStub(channel); + + cloudEventProducer = new CloudEventProducer(clientConfig, publisherClient); + } + + public Response publish(CloudEvent cloudEvent) { + return cloudEventProducer.publish(cloudEvent); + } + + public Response publish(List cloudEventList) { + return cloudEventProducer.publish(cloudEventList); + } + + public Response requestReply(CloudEvent cloudEvent, int timeout) { + return cloudEventProducer.requestReply(cloudEvent, timeout); } public Response publish(EventMeshMessage message) { logger.info("Publish message " + message.toString()); EventMeshMessage enhancedMessage = EventMeshMessage.newBuilder(message) - .setHeader(EventMeshClientUtil.buildHeader(clientConfig)) + .setHeader(EventMeshClientUtil.buildHeader(clientConfig, PROTOCOL_TYPE)) .setProducerGroup(clientConfig.getProducerGroup()) .build(); try { @@ -53,7 +75,7 @@ public Response requestReply(EventMeshMessage message, int timeout) { logger.info("RequestReply message " + message.toString()); EventMeshMessage enhancedMessage = EventMeshMessage.newBuilder(message) - .setHeader(EventMeshClientUtil.buildHeader(clientConfig)) + .setHeader(EventMeshClientUtil.buildHeader(clientConfig, PROTOCOL_TYPE)) .setProducerGroup(clientConfig.getProducerGroup()) .setTtl(String.valueOf(timeout)) .build(); @@ -71,7 +93,7 @@ public Response publish(BatchMessage message) { logger.info("BatchPublish message " + message.toString()); BatchMessage enhancedMessage = BatchMessage.newBuilder(message) - .setHeader(EventMeshClientUtil.buildHeader(clientConfig)) + .setHeader(EventMeshClientUtil.buildHeader(clientConfig, PROTOCOL_TYPE)) .setProducerGroup(clientConfig.getProducerGroup()) .build(); try { @@ -79,7 +101,7 @@ public Response publish(BatchMessage message) { logger.info("Received response " + response.toString()); return response; } catch (Exception e) { - logger.error("Error in RequestReply message {}, error {}", message, e.getMessage()); + logger.error("Error in BatchPublish message {}, error {}", message, e.getMessage()); return null; } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/ReceiveMsgHook.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/ReceiveMsgHook.java index 86ddd278c7..507a7cb7c3 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/ReceiveMsgHook.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/ReceiveMsgHook.java @@ -19,8 +19,7 @@ import java.util.Optional; -@FunctionalInterface -public interface ReceiveMsgHook { +public interface ReceiveMsgHook { /** * Handle the received message, return the response message. @@ -28,6 +27,8 @@ public interface ReceiveMsgHook { * @param msg * @return */ - Optional handle(EventMeshMessage msg); + Optional handle(T msg); + + String getProtocolType(); } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/producer/CloudEventProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/producer/CloudEventProducer.java new file mode 100644 index 0000000000..3941d94c9f --- /dev/null +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/producer/CloudEventProducer.java @@ -0,0 +1,166 @@ +package org.apache.eventmesh.client.grpc.producer; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.provider.EventFormatProvider; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer; +import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig; +import org.apache.eventmesh.client.grpc.util.EventMeshClientUtil; +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage; +import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage.MessageItem; +import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage; +import org.apache.eventmesh.common.protocol.grpc.protos.PublisherServiceGrpc.PublisherServiceBlockingStub; +import org.apache.eventmesh.common.protocol.grpc.protos.Response; +import org.apache.eventmesh.common.protocol.grpc.common.ProtocolKey; +import org.apache.eventmesh.common.utils.IPUtils; +import org.apache.eventmesh.common.utils.RandomStringUtils; +import org.apache.eventmesh.common.utils.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.stream.Collectors; + +public class CloudEventProducer { + private static final Logger logger = LoggerFactory.getLogger(EventMeshGrpcProducer.class); + + private final static String PROTOCOL_TYPE = "cloudevents"; + + private final EventMeshGrpcClientConfig clientConfig; + + private final PublisherServiceBlockingStub publisherClient; + + public CloudEventProducer(EventMeshGrpcClientConfig clientConfig, PublisherServiceBlockingStub publisherClient) { + this.clientConfig = clientConfig; + this.publisherClient = publisherClient; + } + + public Response publish(CloudEvent cloudEvent) { + logger.info("Publish message " + cloudEvent.toString()); + CloudEvent enhanceEvent = enhanceCloudEvent(cloudEvent, null); + + EventMeshMessage enhancedMessage = buildEventMeshMessage(enhanceEvent); + + try { + Response response = publisherClient.publish(enhancedMessage); + logger.info("Received response " + response.toString()); + return response; + } catch (Exception e) { + logger.error("Error in publishing message {}, error {}", cloudEvent, e.getMessage()); + return null; + } + } + + public Response requestReply(CloudEvent cloudEvent, int timeout) { + logger.info("RequestReply message " + cloudEvent.toString()); + CloudEvent enhanceEvent = enhanceCloudEvent(cloudEvent, String.valueOf(timeout)); + + EventMeshMessage enhancedMessage = buildEventMeshMessage(enhanceEvent); + try { + Response response = publisherClient.requestReply(enhancedMessage); + logger.info("Received response " + response.toString()); + return response; + } catch (Exception e) { + logger.error("Error in RequestReply message {}, error {}", cloudEvent, e.getMessage()); + return null; + } + } + + public Response publish(List events) { + logger.info("BatchPublish message, batch size=" + events.size()); + + if (events.size() == 0) { + return null; + } + List enhancedEvents = events.stream() + .map(event -> enhanceCloudEvent(event, null)) + .collect(Collectors.toList()); + + BatchMessage enhancedMessage = buildBatchMessage(enhancedEvents); + try { + Response response = publisherClient.batchPublish(enhancedMessage); + logger.info("Received response " + response.toString()); + return response; + } catch (Exception e) { + logger.error("Error in BatchPublish message {}, error {}", events, e.getMessage()); + return null; + } + } + + private CloudEvent enhanceCloudEvent(final CloudEvent cloudEvent, String timeout) { + CloudEventBuilder builder = CloudEventBuilder.from(cloudEvent) + .withExtension(ProtocolKey.ENV, clientConfig.getEnv()) + .withExtension(ProtocolKey.IDC, clientConfig.getIdc()) + .withExtension(ProtocolKey.IP, IPUtils.getLocalAddress()) + .withExtension(ProtocolKey.PID, Long.toString(ThreadUtils.getPID())) + .withExtension(ProtocolKey.SYS, clientConfig.getSys()) + .withExtension(ProtocolKey.LANGUAGE, "JAVA") + .withExtension(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE) + .withExtension(ProtocolKey.PROTOCOL_DESC, cloudEvent.getSpecVersion().name()) + .withExtension(ProtocolKey.PROTOCOL_VERSION, cloudEvent.getSpecVersion().toString()) + .withExtension(ProtocolKey.UNIQUE_ID, RandomStringUtils.generateNum(30)) + .withExtension(ProtocolKey.SEQ_NUM, RandomStringUtils.generateNum(30)) + .withExtension(ProtocolKey.USERNAME, clientConfig.getUserName()) + .withExtension(ProtocolKey.PASSWD, clientConfig.getPassword()) + .withExtension(ProtocolKey.PRODUCERGROUP, clientConfig.getProducerGroup()); + + if (timeout != null) { + builder.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, timeout); + } + return builder.build(); + } + + private EventMeshMessage buildEventMeshMessage(CloudEvent cloudEvent) { + String contentType = StringUtils.isEmpty(cloudEvent.getDataContentType()) ? "application/cloudevents+json" + : cloudEvent.getDataContentType(); + byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(contentType) + .serialize(cloudEvent); + String content = new String(bodyByte, StandardCharsets.UTF_8); + String ttl = cloudEvent.getExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL) == null ? "4000" + : cloudEvent.getExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL).toString(); + + return EventMeshMessage.newBuilder() + .setHeader(EventMeshClientUtil.buildHeader(clientConfig, PROTOCOL_TYPE)) + .setProducerGroup(clientConfig.getProducerGroup()) + .setTopic(cloudEvent.getSubject()) + .setTtl(ttl) + .setSeqNum(cloudEvent.getExtension(ProtocolKey.SEQ_NUM).toString()) + .setUniqueId(cloudEvent.getExtension(ProtocolKey.UNIQUE_ID).toString()) + .setContent(content) + .putProperties(ProtocolKey.CONTENT_TYPE, contentType) + .build(); + } + + private BatchMessage buildBatchMessage(List events) { + BatchMessage.Builder messageBuilder = BatchMessage.newBuilder() + .setHeader(EventMeshClientUtil.buildHeader(clientConfig, PROTOCOL_TYPE)) + .setProducerGroup(clientConfig.getProducerGroup()) + .setTopic(events.get(0).getSubject()); + + for (CloudEvent event: events) { + String contentType = StringUtils.isEmpty(event.getDataContentType()) ? "application/cloudevents+json" + : event.getDataContentType(); + byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(contentType) + .serialize(event); + String content = new String(bodyByte, StandardCharsets.UTF_8); + + String ttl = event.getExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL) == null ? "4000" + : event.getExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL).toString(); + + MessageItem messageItem = MessageItem.newBuilder() + .setContent(content) + .setTtl(ttl) + .setSeqNum(event.getExtension(ProtocolKey.SEQ_NUM).toString()) + .setUniqueId(event.getExtension(ProtocolKey.UNIQUE_ID).toString()) + .putProperties(ProtocolKey.CONTENT_TYPE, contentType) + .build(); + + messageBuilder.addMessageItem(messageItem); + } + + return messageBuilder.build(); + } +} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/util/EventMeshClientUtil.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/util/EventMeshClientUtil.java index 629ff93b52..c48ed9ee72 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/util/EventMeshClientUtil.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/grpc/util/EventMeshClientUtil.java @@ -8,10 +8,9 @@ public class EventMeshClientUtil { - private final static String PROTOCOL_TYPE = "eventmeshmessage"; private final static String PROTOCOL_DESC = "grpc"; - public static RequestHeader buildHeader(EventMeshGrpcClientConfig clientConfig) { + public static RequestHeader buildHeader(EventMeshGrpcClientConfig clientConfig, String protocolType) { RequestHeader header = RequestHeader.newBuilder() .setEnv(clientConfig.getEnv()) .setIdc(clientConfig.getIdc()) @@ -21,7 +20,7 @@ public static RequestHeader buildHeader(EventMeshGrpcClientConfig clientConfig) .setLanguage(clientConfig.getLanguage()) .setUsername(clientConfig.getUserName()) .setPassword(clientConfig.getPassword()) - .setProtocolType(PROTOCOL_TYPE) + .setProtocolType(protocolType) .setProtocolDesc(PROTOCOL_DESC) // default CloudEvents version is V1 .setProtocolVersion(SpecVersion.V1.toString())