Skip to content

Commit

Permalink
Fix standalone connector interface, fix example (#608)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun authored Nov 24, 2021
1 parent c66c8f7 commit 5f61c24
Show file tree
Hide file tree
Showing 36 changed files with 696 additions and 717 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

import io.cloudevents.CloudEvent;



/**
* Consumer Interface.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.eventmesh.api.producer;

import org.apache.eventmesh.api.*;
import org.apache.eventmesh.api.LifeCycle;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

dependencies {
compileOnly project(":eventmesh-common")
compileOnly project(":eventmesh-connector-plugin:eventmesh-connector-api")
implementation project(":eventmesh-common")
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.connector.standalone.broker;

import io.cloudevents.CloudEvent;
import io.openmessaging.api.Message;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.eventmesh.connector.standalone.broker.model.MessageEntity;
Expand Down Expand Up @@ -53,7 +54,7 @@ public static StandaloneBroker getInstance() {
* @param message message
* @throws InterruptedException
*/
public MessageEntity putMessage(String topicName, Message message) throws InterruptedException {
public MessageEntity putMessage(String topicName, CloudEvent message) throws InterruptedException {
Pair<MessageQueue, AtomicLong> pair = createTopicIfAbsent(topicName);
AtomicLong topicOffset = pair.getRight();
MessageQueue messageQueue = pair.getLeft();
Expand All @@ -70,7 +71,7 @@ public MessageEntity putMessage(String topicName, Message message) throws Interr
*
* @param topicName
*/
public Message takeMessage(String topicName) throws InterruptedException {
public CloudEvent takeMessage(String topicName) throws InterruptedException {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
return messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).take().getMessage();
}
Expand All @@ -80,7 +81,7 @@ public Message takeMessage(String topicName) throws InterruptedException {
*
* @param topicName
*/
public Message getMessage(String topicName) {
public CloudEvent getMessage(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
MessageEntity head = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).getHead();
if (head == null) {
Expand All @@ -96,7 +97,7 @@ public Message getMessage(String topicName) {
* @param offset offset
* @return
*/
public Message getMessage(String topicName, long offset) {
public CloudEvent getMessage(String topicName, long offset) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
MessageEntity messageEntity = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).getByOffset(offset);
if (messageEntity == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.connector.standalone.broker.model;

import io.cloudevents.CloudEvent;
import io.openmessaging.api.Message;

import java.io.Serializable;
Expand All @@ -25,13 +26,13 @@ public class MessageEntity implements Serializable {

private TopicMetadata topicMetadata;

private Message message;
private CloudEvent message;

private long offset;

private long createTimeMills;

public MessageEntity(TopicMetadata topicMetadata, Message message, long offset, long currentTimeMills) {
public MessageEntity(TopicMetadata topicMetadata, CloudEvent message, long offset, long currentTimeMills) {
this.topicMetadata = topicMetadata;
this.message = message;
this.offset = offset;
Expand All @@ -46,11 +47,11 @@ public void setTopicMetadata(TopicMetadata topicMetadata) {
this.topicMetadata = topicMetadata;
}

public Message getMessage() {
public CloudEvent getMessage() {
return message;
}

public void setMessage(Message message) {
public void setMessage(CloudEvent message) {
this.message = message;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.openmessaging.api.Message;
import io.cloudevents.CloudEvent;

public class SubScribeTask implements Runnable {

private String topicName;
private StandaloneBroker standaloneBroker;
private EventListener listener;
private volatile boolean isRunning;
private String topicName;
private StandaloneBroker standaloneBroker;
private EventListener listener;
private volatile boolean isRunning;

private AtomicInteger offset;

Expand All @@ -55,29 +55,31 @@ public void run() {
try {
logger.debug("execute subscribe task, topic: {}, offset: {}", topicName, offset);
if (offset == null) {
Message message = standaloneBroker.getMessage(topicName);
CloudEvent message = standaloneBroker.getMessage(topicName);
if (message != null) {
offset = new AtomicInteger((int) message.getOffset());
offset = new AtomicInteger((int) message.getExtension("offset"));
}
}
if (offset != null) {
Message message = standaloneBroker.getMessage(topicName, offset.get());
CloudEvent message = standaloneBroker.getMessage(topicName, offset.get());
if (message != null) {
EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(EventMeshAction action) {
switch (action) {
case CommitMessage:
// update offset
logger.info("message commit, topic: {}, current offset:{}", topicName, offset.get());
logger.info("message commit, topic: {}, current offset:{}", topicName,
offset.get());
break;
case ReconsumeLater:
// don't update offset
break;
case ManualAck:
// update offset
offset.incrementAndGet();
logger.info("message ack, topic: {}, current offset:{}", topicName, offset.get());
logger
.info("message ack, topic: {}, current offset:{}", topicName, offset.get());
break;
default:

Expand All @@ -89,13 +91,14 @@ public void commit(EventMeshAction action) {
}

} catch (Exception ex) {
logger.error("consumer error, topic: {}, offset: {}", topicName, offset == null ? null : offset.get(), ex);
logger.error("consumer error, topic: {}, offset: {}", topicName, offset == null ? null : offset.get(),
ex);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error("Thread is interrupted, topic: {}, offset: {} thread name: {}",
topicName, offset == null ? null : offset.get(), Thread.currentThread().getName(), e);
topicName, offset == null ? null : offset.get(), Thread.currentThread().getName(), e);
Thread.currentThread().interrupt();
}
}
Expand Down
Loading

0 comments on commit 5f61c24

Please sign in to comment.