Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #563] SDK SUPPORT CLOUD EVENT #575

Merged
merged 9 commits into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eventmesh-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies {
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'io.netty:netty-all'

implementation "io.cloudevents:cloudevents-core"
testImplementation project(":eventmesh-sdk-java")
testImplementation project(":eventmesh-common")
testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncSubscribe implements ReceiveMsgHook {
public class AsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(AsyncSubscribe.class);

Expand Down Expand Up @@ -68,7 +68,12 @@ public static void main(String[] agrs) throws Exception {

@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody();
EventMeshMessage eventMeshMessage = convert(msg);
logger.info("receive async msg====================={}", eventMeshMessage);
}

@Override
public EventMeshMessage convert(Package pkg) {
return (EventMeshMessage) pkg.getBody();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncSubscribeBroadcast implements ReceiveMsgHook {
public class AsyncSubscribeBroadcast implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(AsyncSubscribeBroadcast.class);

Expand Down Expand Up @@ -68,7 +68,12 @@ public static void main(String[] agrs) throws Exception {

@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody();
EventMeshMessage eventMeshMessage = convert(msg);
logger.info("receive broadcast msg==============={}", eventMeshMessage);
}

@Override
public EventMeshMessage convert(Package pkg) {
return (EventMeshMessage) pkg.getBody();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SyncResponse implements ReceiveMsgHook {
public class SyncResponse implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(SyncResponse.class);

Expand Down Expand Up @@ -66,4 +67,9 @@ public void handle(Package msg, ChannelHandlerContext ctx) {
Package pkg = EventMeshTestUtils.rrResponse(msg);
ctx.writeAndFlush(pkg);
}

@Override
public EventMeshMessage convert(Package pkg) {
return null;
}
}
2 changes: 2 additions & 0 deletions eventmesh-sdk-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ dependencies {
implementation "io.netty:netty-all"
implementation "org.apache.httpcomponents:httpclient"

implementation "io.cloudevents:cloudevents-core"

testImplementation project(":eventmesh-common")
testImplementation project(":eventmesh-common")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.client.tcp;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.common.protocol.SubscriptionType;
Expand All @@ -31,6 +32,10 @@ public interface EventMeshClient {

Package publish(Package msg, long timeout) throws Exception;

Package publish(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(Package msg, long timeout) throws Exception;

void init() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.client.tcp;


import io.cloudevents.CloudEvent;
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand All @@ -39,6 +40,10 @@ public interface SimplePubClient {

Package publish(Package msg, long timeout) throws Exception;

Package publish(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(CloudEvent cloudEvent, long timeout) throws Exception;

void broadcast(Package msg, long timeout) throws Exception;

void registerBusiHandler(ReceiveMsgHook handler) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,6 @@ public class EventMeshCommon {
public static String PREFIX_SESSION_TPS_STAT_EVENTSEND = "event_send_tps_";

public static String PREFIX_SESSION_TPS_STAT_EVENTREV = "event_rev_tps_";

public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents";
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
Expand Down Expand Up @@ -76,6 +77,14 @@ public static Package asyncMessageAck(Package in) {
return msg;
}

public static Package asyncCloudEvent(CloudEvent cloudEvent) {
Package msg = new Package();
msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0,
EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME, generateRandomString(seqLength)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the package header contains Command, code, msg, seq. The msg correspond with code,so please add the new attribute to represent the protocol not use the msg.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

msg.setBody(cloudEvent);
return msg;
}

public static Package broadcastMessageAck(Package in) {
Package msg = new Package();
msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0, null, in.getHeader().getSeq()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
import io.netty.channel.ChannelHandlerContext;
import org.apache.eventmesh.common.protocol.tcp.Package;

public interface ReceiveMsgHook {
/**
* ReceiveMsgHook.
*
* @param <T> receive message type.
*/
public interface ReceiveMsgHook<T> {
void handle(Package msg, ChannelHandlerContext ctx);

T convert(Package pkg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this method just execute by itself, it's not suggested to add this in interface. This will make interface unclear.

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.eventmesh.client.tcp.impl;


import io.cloudevents.CloudEvent;
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.SimplePubClient;
import org.apache.eventmesh.client.tcp.SimpleSubClient;
Expand Down Expand Up @@ -73,10 +74,20 @@ public Package publish(Package msg, long timeout) throws Exception {
return this.pubClient.publish(msg, timeout);
}

@Override
public Package publish(CloudEvent cloudEvent, long timeout) throws Exception {
return this.pubClient.publish(cloudEvent, timeout);
}

public void broadcast(Package msg, long timeout) throws Exception {
this.pubClient.broadcast(msg, timeout);
}

@Override
public void broadcast(CloudEvent cloudEvent, long timeout) throws Exception {
this.pubClient.broadcast(cloudEvent, timeout);
}

public void init() throws Exception {
this.subClient.init();
this.pubClient.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import io.cloudevents.CloudEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
Expand Down Expand Up @@ -145,6 +146,23 @@ public Package publish(Package msg, long timeout) throws Exception {
return io(msg, timeout);
}


@Override
public Package publish(CloudEvent cloudEvent, long timeout) throws Exception {
Package msg = MessageUtils.asyncCloudEvent(cloudEvent);
logger.info("SimplePubClientImpl|{}|publish|send|type={}|msg={}", clientNo,
msg.getHeader().getCommand(), msg);
return io(MessageUtils.asyncCloudEvent(cloudEvent), timeout);
}

@Override
public void broadcast(CloudEvent cloudEvent, long timeout) throws Exception {
Package msg = MessageUtils.asyncCloudEvent(cloudEvent);
logger.info("SimplePubClientImpl|{}|publish|send|type={}|msg={}", clientNo,
msg.getHeader().getCommand(), msg);
super.send(msg);
}

/**
* Send broadcast message
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncSubscribe implements ReceiveMsgHook {
public class AsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(AsyncSubscribe.class);

Expand Down Expand Up @@ -63,7 +63,12 @@ public static void main(String[] agrs) throws Exception {

@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody();
EventMeshMessage eventMeshMessage = convert(msg);
logger.info("receive async msg====================={}", eventMeshMessage);
}

@Override
public EventMeshMessage convert(Package pkg) {
return (EventMeshMessage) pkg.getBody();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncSubscribeBroadcast implements ReceiveMsgHook {
public class AsyncSubscribeBroadcast implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(AsyncSubscribeBroadcast.class);

Expand Down Expand Up @@ -63,7 +63,12 @@ public static void main(String[] agrs) throws Exception {

@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody();
EventMeshMessage eventMeshMessage = convert(msg);
logger.info("receive broadcast msg==============={}", eventMeshMessage);
}

@Override
public EventMeshMessage convert(Package pkg) {
return (EventMeshMessage) pkg.getBody();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SyncResponse implements ReceiveMsgHook {
public class SyncResponse implements ReceiveMsgHook<EventMeshMessage> {

public static Logger logger = LoggerFactory.getLogger(SyncResponse.class);

Expand Down Expand Up @@ -66,4 +67,9 @@ public void handle(Package msg, ChannelHandlerContext ctx) {
Package pkg = EventMeshTestUtils.rrResponse(msg);
ctx.writeAndFlush(pkg);
}

@Override
public EventMeshMessage convert(Package pkg) {
return null;
}
}