Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix standalone connector interface, fix example #608

Merged
merged 1 commit into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

import io.cloudevents.CloudEvent;



/**
* Consumer Interface.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.eventmesh.api.producer;

import org.apache.eventmesh.api.*;
import org.apache.eventmesh.api.LifeCycle;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.spi.EventMeshExtensionType;
import org.apache.eventmesh.spi.EventMeshSPI;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

dependencies {
compileOnly project(":eventmesh-common")
compileOnly project(":eventmesh-connector-plugin:eventmesh-connector-api")
implementation project(":eventmesh-common")
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.connector.standalone.broker;

import io.cloudevents.CloudEvent;
import io.openmessaging.api.Message;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.eventmesh.connector.standalone.broker.model.MessageEntity;
Expand Down Expand Up @@ -53,7 +54,7 @@ public static StandaloneBroker getInstance() {
* @param message message
* @throws InterruptedException
*/
public MessageEntity putMessage(String topicName, Message message) throws InterruptedException {
public MessageEntity putMessage(String topicName, CloudEvent message) throws InterruptedException {
Pair<MessageQueue, AtomicLong> pair = createTopicIfAbsent(topicName);
AtomicLong topicOffset = pair.getRight();
MessageQueue messageQueue = pair.getLeft();
Expand All @@ -70,7 +71,7 @@ public MessageEntity putMessage(String topicName, Message message) throws Interr
*
* @param topicName
*/
public Message takeMessage(String topicName) throws InterruptedException {
public CloudEvent takeMessage(String topicName) throws InterruptedException {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
return messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).take().getMessage();
}
Expand All @@ -80,7 +81,7 @@ public Message takeMessage(String topicName) throws InterruptedException {
*
* @param topicName
*/
public Message getMessage(String topicName) {
public CloudEvent getMessage(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
MessageEntity head = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).getHead();
if (head == null) {
Expand All @@ -96,7 +97,7 @@ public Message getMessage(String topicName) {
* @param offset offset
* @return
*/
public Message getMessage(String topicName, long offset) {
public CloudEvent getMessage(String topicName, long offset) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
MessageEntity messageEntity = messageContainer.computeIfAbsent(topicMetadata, k -> new MessageQueue()).getByOffset(offset);
if (messageEntity == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.connector.standalone.broker.model;

import io.cloudevents.CloudEvent;
import io.openmessaging.api.Message;

import java.io.Serializable;
Expand All @@ -25,13 +26,13 @@ public class MessageEntity implements Serializable {

private TopicMetadata topicMetadata;

private Message message;
private CloudEvent message;

private long offset;

private long createTimeMills;

public MessageEntity(TopicMetadata topicMetadata, Message message, long offset, long currentTimeMills) {
public MessageEntity(TopicMetadata topicMetadata, CloudEvent message, long offset, long currentTimeMills) {
this.topicMetadata = topicMetadata;
this.message = message;
this.offset = offset;
Expand All @@ -46,11 +47,11 @@ public void setTopicMetadata(TopicMetadata topicMetadata) {
this.topicMetadata = topicMetadata;
}

public Message getMessage() {
public CloudEvent getMessage() {
return message;
}

public void setMessage(Message message) {
public void setMessage(CloudEvent message) {
this.message = message;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.openmessaging.api.Message;
import io.cloudevents.CloudEvent;

public class SubScribeTask implements Runnable {

private String topicName;
private StandaloneBroker standaloneBroker;
private EventListener listener;
private volatile boolean isRunning;
private String topicName;
private StandaloneBroker standaloneBroker;
private EventListener listener;
private volatile boolean isRunning;

private AtomicInteger offset;

Expand All @@ -55,29 +55,31 @@ public void run() {
try {
logger.debug("execute subscribe task, topic: {}, offset: {}", topicName, offset);
if (offset == null) {
Message message = standaloneBroker.getMessage(topicName);
CloudEvent message = standaloneBroker.getMessage(topicName);
if (message != null) {
offset = new AtomicInteger((int) message.getOffset());
offset = new AtomicInteger((int) message.getExtension("offset"));
}
}
if (offset != null) {
Message message = standaloneBroker.getMessage(topicName, offset.get());
CloudEvent message = standaloneBroker.getMessage(topicName, offset.get());
if (message != null) {
EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(EventMeshAction action) {
switch (action) {
case CommitMessage:
// update offset
logger.info("message commit, topic: {}, current offset:{}", topicName, offset.get());
logger.info("message commit, topic: {}, current offset:{}", topicName,
offset.get());
break;
case ReconsumeLater:
// don't update offset
break;
case ManualAck:
// update offset
offset.incrementAndGet();
logger.info("message ack, topic: {}, current offset:{}", topicName, offset.get());
logger
.info("message ack, topic: {}, current offset:{}", topicName, offset.get());
break;
default:

Expand All @@ -89,13 +91,14 @@ public void commit(EventMeshAction action) {
}

} catch (Exception ex) {
logger.error("consumer error, topic: {}, offset: {}", topicName, offset == null ? null : offset.get(), ex);
logger.error("consumer error, topic: {}, offset: {}", topicName, offset == null ? null : offset.get(),
ex);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error("Thread is interrupted, topic: {}, offset: {} thread name: {}",
topicName, offset == null ? null : offset.get(), Thread.currentThread().getName(), e);
topicName, offset == null ? null : offset.get(), Thread.currentThread().getName(), e);
Thread.currentThread().interrupt();
}
}
Expand Down
Loading