Skip to content

Commit

Permalink
Java sdk update (apache#615)
Browse files Browse the repository at this point in the history
* update java sdk

* fix compile error

* fix sdk error

1.remove the openmessage from connector-api
2.fix the standalone connector

* 1.fix the standalone connector
2.fix the sdk
  • Loading branch information
xwm1992 committed Dec 27, 2021
1 parent af05181 commit 4a0cc25
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ public class Constants {

public static final String HTTPS_PROTOCOL_PREFIX = "https://";

public static final String PROTOCOL_TYPE = "protocol_type";
public static final String PROTOCOL_TYPE = "protocoltype";

public static final String PROTOCOL_VERSION = "protocol_version";
public static final String PROTOCOL_VERSION = "protocolversion";

public static final String PROTOCOL_DESC = "protocol_desc";
public static final String PROTOCOL_DESC = "protocoldesc";

public static final int DEFAULT_HTTP_TIME_OUT = 3000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,42 +40,42 @@ public StandaloneConsumerAdaptor() {

@Override
public boolean isStarted() {
return false;
return consumer.isStarted();
}

@Override
public boolean isClosed() {
return false;
return consumer.isClosed();
}

@Override
public void start() {

consumer.start();
}

@Override
public void shutdown() {

consumer.shutdown();
}

@Override
public void init(Properties keyValue) throws Exception {

consumer = new StandaloneConsumer(keyValue);
}

@Override
public void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context) {

consumer.updateOffset(cloudEvents, context);
}

@Override
public void subscribe(String topic, EventListener listener) throws Exception {

consumer.subscribe(topic, listener);
}

@Override
public void unsubscribe(String topic) {

consumer.unsubscribe(topic);
}

// @Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public static void main(String[] agrs) throws Exception {

Thread.sleep(1000);
}
client.listen();
Thread.sleep(2000);
} catch (Exception e) {
logger.warn("AsyncPublish failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPClient;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
Expand All @@ -39,6 +40,8 @@ public class AsyncSubscribe implements ReceiveMsgHook<EventMeshMessage> {

public static AsyncSubscribe handler = new AsyncSubscribe();

private static EventMeshTCPClient client;

public static void main(String[] agrs) throws Exception {
Properties properties = Utils.readPropertiesFile("application.properties");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
Expand All @@ -49,8 +52,8 @@ public static void main(String[] agrs) throws Exception {
.port(eventMeshTcpPort)
.userAgent(userAgent)
.build();
try (EventMeshTCPClient<EventMeshMessage> client = EventMeshTCPClientFactory.createEventMeshTCPClient(
eventMeshTcpClientConfig, EventMeshMessage.class)) {
try {
client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class);
client.init();
client.heartbeat();

Expand All @@ -59,6 +62,11 @@ public static void main(String[] agrs) throws Exception {

client.listen();

//client.unsubscribe();

// release resource and close client
// client.close();

} catch (Exception e) {
log.warn("AsyncSubscribe failed", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.eventmesh.client.tcp.common.RequestContext;
import org.apache.eventmesh.client.tcp.common.TcpClient;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Package;
Expand Down Expand Up @@ -128,7 +129,7 @@ public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshExce
Package msg = MessageUtils.buildPackage(cloudEvent, Command.ASYNC_MESSAGE_TO_SERVER);
log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}",
clientNo, msg.getHeader().getCommand(),
msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg);
msg.getHeader().getProperty(Constants.PROTOCOL_TYPE), msg);
return io(msg, timeout);
} catch (Exception ex) {
throw new EventMeshException("publish error", ex);
Expand All @@ -141,7 +142,7 @@ public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshExcep
// todo: transform EventMeshMessage to Package
Package msg = MessageUtils.buildPackage(cloudEvent, Command.BROADCAST_MESSAGE_TO_SERVER);
log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(),
msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg);
msg.getHeader().getProperty(Constants.PROTOCOL_TYPE), msg);
super.send(msg);
} catch (Exception ex) {
throw new EventMeshException("Broadcast message error", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.eventmesh.client.tcp.common.RequestContext;
import org.apache.eventmesh.client.tcp.common.TcpClient;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
Expand Down Expand Up @@ -72,8 +73,8 @@ public void init() throws EventMeshException {

@Override
public void heartbeat() throws EventMeshException {
if (task != null) {
synchronized (EventMeshMessageTCPPubClient.class) {
// if (task != null) {
// synchronized (EventMeshMessageTCPPubClient.class) {
task = scheduler.scheduleAtFixedRate(() -> {
try {
if (!isActive()) {
Expand All @@ -86,8 +87,8 @@ public void heartbeat() throws EventMeshException {
}
}, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS);
}
}
}
// }
// }

@Override
public void reconnect() throws EventMeshException {
Expand Down Expand Up @@ -128,9 +129,9 @@ public Package publish(EventMeshMessage eventMeshMessage, long timeout) throws E
try {
// todo: transform EventMeshMessage to Package
Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.ASYNC_MESSAGE_TO_SERVER);
log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}",
log.info("SimplePubClientImpl em message|{}|publish|send|type={}|protocol={}|msg={}",
clientNo, msg.getHeader().getCommand(),
msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg);
msg.getHeader().getProperty(Constants.PROTOCOL_TYPE), msg);
return io(msg, timeout);
} catch (Exception ex) {
throw new EventMeshException("publish error", ex);
Expand All @@ -143,7 +144,7 @@ public void broadcast(EventMeshMessage eventMeshMessage, long timeout) throws Ev
// todo: transform EventMeshMessage to Package
Package msg = MessageUtils.buildPackage(eventMeshMessage, Command.BROADCAST_MESSAGE_TO_SERVER);
log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(),
msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg);
msg.getHeader().getProperty(Constants.PROTOCOL_TYPE), msg);
super.send(msg);
} catch (Exception ex) {
throw new EventMeshException("Broadcast message error", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public void init() throws EventMeshException {

@Override
public void heartbeat() throws EventMeshException {
if (task == null) {
synchronized (EventMeshMessageTCPSubClient.class) {
// if (task == null) {
// synchronized (EventMeshMessageTCPSubClient.class) {
task = scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Expand All @@ -88,8 +88,8 @@ public void run() {
}
}, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS);
}
}
}
// }
// }

@Override
public void reconnect() throws EventMeshException {
Expand Down

0 comments on commit 4a0cc25

Please sign in to comment.