Skip to content

Commit

Permalink
[INLONG-8318][DataProxy] Change notification synchronization through …
Browse files Browse the repository at this point in the history
…condition variables and locks (#8320)
  • Loading branch information
gosonzhang authored Jun 26, 2023
1 parent 297e2d6 commit 017e400
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class StatConstants {
public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_MISSING = "default.topic.empty";
public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_USED = "default.topic.used";
public static final java.lang.String EVENT_SINK_PRODUCER_NULL = "sink.producer.null";
public static final java.lang.String EVENT_SINK_CLUSTER_EMPTY = "sink.cluster.empty";
public static final java.lang.String EVENT_SINK_CLUSTER_UNMATCHED = "sink.cluster.unmatched";
public static final java.lang.String EVENT_SINK_CPRODUCER_NULL = "sink.cluster.producer.null";
public static final java.lang.String EVENT_SINK_SEND_EXCEPTION = "sink.send.exception";

public static final java.lang.String EVENT_SINK_FAILRETRY = "sink.retry";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.consts.StatConstants;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,8 +31,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
*
Expand All @@ -46,7 +45,6 @@ public class MessageQueueZoneProducer {
private final CacheClusterSelector cacheClusterSelector;

private final AtomicInteger clusterIndex = new AtomicInteger(0);
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private List<String> currentClusterNames = new ArrayList<>();
private final ConcurrentHashMap<String, Long> usingTimeMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, MessageQueueClusterProducer> usingClusterMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -150,24 +148,29 @@ public void clearExpiredProducers() {
*/
public boolean send(PackProfile profile) {
String clusterName;
List<String> tmpClusters;
MessageQueueClusterProducer clusterProducer;
readWriteLock.readLock().lock();
try {
do {
clusterName = currentClusterNames.get(
Math.abs(clusterIndex.getAndIncrement()) % currentClusterNames.size());
if (clusterName == null) {
continue;
}
clusterProducer = usingClusterMap.get(clusterName);
if (clusterProducer == null) {
continue;
}
return clusterProducer.send(profile);
} while (true);
} finally {
readWriteLock.readLock().unlock();
}
do {
tmpClusters = currentClusterNames;
if (tmpClusters == null || tmpClusters.isEmpty()) {
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_CLUSTER_EMPTY);
sleepSomeTime(100);
continue;
}
clusterName = tmpClusters.get(Math.abs(clusterIndex.getAndIncrement()) % tmpClusters.size());
if (clusterName == null) {
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_CLUSTER_UNMATCHED);
sleepSomeTime(100);
continue;
}
clusterProducer = usingClusterMap.get(clusterName);
if (clusterProducer == null) {
context.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_CPRODUCER_NULL, clusterName);
sleepSomeTime(100);
continue;
}
return clusterProducer.send(profile);
} while (true);
}

private void checkAndReloadClusterInfo() {
Expand Down Expand Up @@ -225,14 +228,9 @@ private void checkAndReloadClusterInfo() {
}
}
// replace cluster names
readWriteLock.writeLock().lock();
try {
if (!lastClusterNames.equals(currentClusterNames)) {
changed = true;
currentClusterNames = lastClusterNames;
}
} finally {
readWriteLock.writeLock().unlock();
if (!lastClusterNames.equals(currentClusterNames)) {
currentClusterNames = lastClusterNames;
changed = true;
}
// filter removed records
Set<String> needRmvs = new HashSet<>();
Expand Down Expand Up @@ -293,4 +291,12 @@ private void checkAndPublishTopics() {
clusterProducer.publishTopic(curTopicSet);
}
}

private void sleepSomeTime(long millis) {
try {
Thread.sleep(millis);
} catch (Throwable e) {
//
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* MessageQueueZoneSink
Expand All @@ -72,7 +74,8 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,

private MessageQueueZoneProducer zoneProducer;
// configure change notify
private final Object syncLock = new Object();
private final ReentrantLock reentrantLock = new ReentrantLock();
private final Condition condition = reentrantLock.newCondition();
private final AtomicLong lastNotifyTime = new AtomicLong(0);
// changeListerThread
private Thread configListener;
Expand Down Expand Up @@ -295,8 +298,13 @@ public void update() {
if (zoneProducer == null) {
return;
}
lastNotifyTime.set(System.currentTimeMillis());
syncLock.notifyAll();
reentrantLock.lock();
try {
lastNotifyTime.set(System.currentTimeMillis());
condition.signal();
} finally {
reentrantLock.unlock();
}
}

/**
Expand All @@ -311,14 +319,16 @@ private class ConfigChangeProcessor implements Runnable {
@Override
public void run() {
long lastCheckTime;
logger.info("{} config-change processor start!", getName());
while (!isShutdown) {
reentrantLock.lock();
try {
syncLock.wait();
} catch (InterruptedException e) {
logger.error("{} config-change processor meet interrupt, exit!", getName());
condition.await();
} catch (InterruptedException e1) {
logger.info("{} config-change processor meet interrupt, break!", getName());
break;
} catch (Throwable e2) {
//
} finally {
reentrantLock.unlock();
}
if (zoneProducer == null) {
continue;
Expand All @@ -328,6 +338,7 @@ public void run() {
zoneProducer.reloadMetaConfig();
} while (lastCheckTime != lastNotifyTime.get());
}
logger.info("{} config-change processor exit!", getName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public Event encEventPackage(BaseSource source, Channel channel) {
- BIN_MSG_ATTRLEN_SIZE - BIN_MSG_MAGIC_SIZE - origAttr.length(), (short) origAttr.length());
if (origAttr.length() > 0) {
System.arraycopy(origAttr.getBytes(StandardCharsets.UTF_8), 0, dataBuf.array(),
totalPkgLength - BIN_MSG_MAGIC_SIZE - origAttr.length(), bodyData.length);
totalPkgLength - BIN_MSG_MAGIC_SIZE - origAttr.length(), origAttr.length());
}
dataBuf.putShort(totalPkgLength - BIN_MSG_MAGIC_SIZE, (short) BIN_MSG_MAGIC);
// build InLong message
Expand Down

0 comments on commit 017e400

Please sign in to comment.