Skip to content

Commit

Permalink
[Issue apache#417] support cloudevents in GRPC
Browse files Browse the repository at this point in the history
  • Loading branch information
jinrongluo committed Jan 11, 2022
1 parent 11ab652 commit 5362d64
Show file tree
Hide file tree
Showing 21 changed files with 790 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ public class ProtocolKey {
public static final String TTL = "ttl";
public static final String PRODUCERGROUP = "producergroup";
public static final String TAG = "tag";

public static final String CONTENT_TYPE = "contenttype";
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class ProtocolKey {

public static final String PROTOCOL_DESC = "protocoldesc";

public static final String CONTENT_TYPE = "contenttype";

public static class ClientInstanceKey {
////////////////////////////////////Protocol layer requester description///////////
public static final String ENV = "env";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.grpc.pub.cloudevents;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer;
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.grpc.protos.BatchMessage;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.util.Utils;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

@Slf4j
public class CloudEventsBatchPublishInstance {

public static void main(String[] args) throws Exception {

Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final String eventMeshGrpcPort = properties.getProperty("eventmesh.grpc.port");

final String topic = "TEST-TOPIC-GRPC-RR";

EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
.producerGroup("EventMeshTest-producerGroup")
.env("env").idc("idc")
.sys("1234").build();

EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig);

eventMeshGrpcProducer.init();

Map<String, String> content = new HashMap<>();
content.put("content", "testRequestReplyMessage");

List<CloudEvent> cloudEventList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(topic)
.withSource(URI.create("/"))
.withDataContentType("application/cloudevents+json")
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();

cloudEventList.add(event);
}

eventMeshGrpcProducer.publish(cloudEventList);
Thread.sleep(10000);
try (EventMeshGrpcProducer ignore = eventMeshGrpcProducer) {
// ignore
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.grpc.pub.cloudevents;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer;
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.util.Utils;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

@Slf4j
public class CloudEventsPublishInstance {

// This messageSize is also used in SubService.java (Subscriber)
public static int messageSize = 5;

public static void main(String[] args) throws Exception {

Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final String eventMeshGrpcPort = properties.getProperty("eventmesh.grpc.port");

final String topic = "TEST-TOPIC-GRPC-ASYNC";

EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
.producerGroup("EventMeshTest-producerGroup")
.env("env").idc("idc")
.sys("1234").build();

EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig);

eventMeshGrpcProducer.init();

Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");

for (int i = 0; i < messageSize; i++) {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(topic)
.withSource(URI.create("/"))
.withDataContentType("application/cloudevents+json")
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();
eventMeshGrpcProducer.publish(event);
Thread.sleep(1000);
}
Thread.sleep(30000);
try (EventMeshGrpcProducer ignore = eventMeshGrpcProducer) {
// ignore
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.grpc.pub.cloudevents;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer;
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.grpc.protos.EventMeshMessage;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
import org.apache.eventmesh.util.Utils;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

@Slf4j
public class CloudEventsRequestReplyInstance {

// This messageSize is also used in SubService.java (Subscriber)
public static int messageSize = 5;

public static void main(String[] args) throws Exception {

Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final String eventMeshGrpcPort = properties.getProperty("eventmesh.grpc.port");

final String topic = "TEST-TOPIC-GRPC-RR";

EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
.producerGroup("EventMeshTest-producerGroup")
.env("env").idc("idc")
.sys("1234").build();

EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig);

eventMeshGrpcProducer.init();

Map<String, String> content = new HashMap<>();
content.put("content", "testRequestReplyMessage");

for (int i = 0; i < messageSize; i++) {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(topic)
.withSource(URI.create("/"))
.withDataContentType("application/cloudevents+json")
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();

eventMeshGrpcProducer.requestReply(event, 10000);
Thread.sleep(1000);
}
Thread.sleep(30000);
try (EventMeshGrpcProducer ignore = eventMeshGrpcProducer) {
// ignore
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.grpc.pub;
package org.apache.eventmesh.grpc.pub.eventmeshmessage;

import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.grpc.pub;
package org.apache.eventmesh.grpc.pub.eventmeshmessage;

import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.eventmesh.grpc.pub;
package org.apache.eventmesh.grpc.pub.eventmeshmessage;

import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.client.grpc.EventMeshGrpcProducer;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.apache.eventmesh.grpc.sub;

import io.cloudevents.CloudEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.client.grpc.EventMeshGrpcConsumer;
import org.apache.eventmesh.client.grpc.ReceiveMsgHook;
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.grpc.protos.Subscription;
import org.apache.eventmesh.common.protocol.grpc.protos.Subscription.SubscriptionItem;
import org.apache.eventmesh.util.Utils;

import java.util.Optional;
import java.util.Properties;

@Slf4j
public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {

public static CloudEventsAsyncSubscribe handler = new CloudEventsAsyncSubscribe();

public static void main(String[] args) {
Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
final String eventMeshGrpcPort = properties.getProperty("eventmesh.grpc.port");

final String topic = "TEST-TOPIC-GRPC-ASYNC";

EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr(eventMeshIp)
.serverPort(Integer.parseInt(eventMeshGrpcPort))
.consumerGroup("EventMeshTest-consumerGroup")
.env("env").idc("idc")
.sys("1234").build();

SubscriptionItem subscriptionItem = SubscriptionItem.newBuilder()
.setTopic(topic)
.setMode(SubscriptionItem.SubscriptionMode.CLUSTERING)
.setType(SubscriptionItem.SubscriptionType.ASYNC)
.build();

Subscription subscription = Subscription.newBuilder()
.addSubscriptionItems(subscriptionItem)
.build();


EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);

eventMeshGrpcConsumer.init();

eventMeshGrpcConsumer.registerListener(handler);

eventMeshGrpcConsumer.subscribeStream(subscription);


//eventMeshGrpcConsumer.unsubscribe(subscription);
}

@Override
public Optional<CloudEvent> handle(CloudEvent msg) {
log.info("receive async msg====================={}", msg);
return Optional.empty();
}

@Override
public String getProtocolType() {
return EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME;
}
}
Loading

0 comments on commit 5362d64

Please sign in to comment.