Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9600][Agent] Adjust the sins directory for code consistency #9601

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,22 @@

package org.apache.inlong.agent.plugin.sinks;

import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.message.PackProxyMessage;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.message.filecollect.ProxyMessageCache;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.Sink;
import org.apache.inlong.agent.plugin.file.Sink;
import org.apache.inlong.common.metric.MetricRegister;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_MESSAGE_FILTER_CLASSNAME;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
Expand All @@ -60,39 +52,26 @@ public abstract class AbstractSink implements Sink {
protected Map<String, String> dimensions;
protected static final AtomicLong METRIC_INDEX = new AtomicLong(0);

protected JobProfile jobConf;
protected InstanceProfile profile;
protected String sourceName;
protected String jobInstanceId;
protected int batchFlushInterval;
// key is stream id, value is a batch of messages belong to the same stream id
protected ConcurrentHashMap<String, PackProxyMessage> cache;

@Override
public MessageFilter initMessageFilter(JobProfile jobConf) {
if (jobConf.hasKey(AGENT_MESSAGE_FILTER_CLASSNAME)) {
try {
return (MessageFilter) Class.forName(jobConf.get(AGENT_MESSAGE_FILTER_CLASSNAME))
.getDeclaredConstructor().newInstance();
} catch (Exception e) {
LOGGER.error("init message filter error", e);
}
}
return null;
}
protected ProxyMessageCache cache;

@Override
public void setSourceName(String sourceFileName) {
this.sourceName = sourceFileName;
}

@Override
public void init(JobProfile jobConf) {
this.jobConf = jobConf;
jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
cache = new ConcurrentHashMap<>(10);
batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
public void init(InstanceProfile profile) {
this.profile = profile;
jobInstanceId = profile.getInstanceId();
inlongGroupId = profile.getInlongGroupId();
inlongStreamId = profile.getInlongStreamId();
cache = new ProxyMessageCache(this.profile, inlongGroupId, inlongStreamId);
batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);

this.dimensions = new HashMap<>();
dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

package org.apache.inlong.agent.plugin.sinks;

import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;

import java.nio.charset.StandardCharsets;

Expand All @@ -33,7 +32,7 @@ public ConsoleSink() {
}

@Override
public void write(Message message) {
public boolean write(Message message) {
if (message != null) {
System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
// increment the count of successful sinks
Expand All @@ -42,6 +41,7 @@ public void write(Message message) {
// increment the count of failed sinks
sinkMetric.sinkFailCount.incrementAndGet();
}
return true;
}

@Override
Expand All @@ -50,17 +50,17 @@ public void setSourceName(String sourceFileName) {
}

@Override
public MessageFilter initMessageFilter(JobProfile jobConf) {
return null;
public void init(InstanceProfile profile) {
super.init(profile);
}

@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
public void destroy() {

}

@Override
public void destroy() {

public boolean sinkFinish() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.PackProxyMessage;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -88,7 +84,7 @@ public class KafkaSink extends AbstractSink {
private boolean asyncSend;

@Override
public void init(JobProfile jobConf) {
public void init(InstanceProfile jobConf) {
super.init(jobConf);
int sendQueueSize = agentConf.getInt(KAFKA_SINK_SEND_QUEUE_SIZE, DEFAULT_SEND_QUEUE_SIZE);
kafkaSendQueue = new LinkedBlockingQueue<>(sendQueueSize);
Expand All @@ -103,38 +99,11 @@ public void init(JobProfile jobConf) {
kafkaSenders = new ArrayList<>();
initKafkaSender();
EXECUTOR_SERVICE.execute(sendDataThread());
EXECUTOR_SERVICE.execute(flushCache());
}

@Override
public void write(Message message) {
if (message == null || message instanceof EndMessage) {
return;
}

try {
ProxyMessage proxyMessage = new ProxyMessage(message);
// add proxy message to cache.
cache.compute(proxyMessage.getBatchKey(),
(s, packProxyMessage) -> {
if (packProxyMessage == null) {
packProxyMessage =
new PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, inlongStreamId);
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
packProxyMessage.addTopicAndDataTime(topic, System.currentTimeMillis());
}
// add message to package proxy
packProxyMessage.addProxyMessage(proxyMessage);
return packProxyMessage;
});
// increment the count of successful sinks
sinkMetric.sinkSuccessCount.incrementAndGet();
} catch (Exception e) {
sinkMetric.sinkFailCount.incrementAndGet();
LOGGER.error("write job[{}] data to cache error", jobInstanceId, e);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
}
public boolean write(Message message) {
return true;
}

@Override
Expand All @@ -153,47 +122,8 @@ public void destroy() {
}
}

private boolean sinkFinish() {
return cache.values().stream().allMatch(PackProxyMessage::isEmpty) && kafkaSendQueue.isEmpty();
}

/**
* flush cache by batch
*
* @return thread runner
*/
private Runnable flushCache() {
return () -> {
LOGGER.info("start kafka sink flush cache thread, job[{}], groupId[{}]", jobInstanceId, inlongGroupId);
while (!shutdown) {
try {
cache.forEach((batchKey, packProxyMessage) -> {
BatchProxyMessage batchProxyMessage = packProxyMessage.fetchBatch();
if (batchProxyMessage == null) {
return;
}
try {
kafkaSendQueue.put(batchProxyMessage);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"send group id {}, message key {},with message size {}, the job id is {}, "
+ "read source is {} sendTime is {}",
inlongGroupId, batchKey,
batchProxyMessage.getDataList().size(), jobInstanceId, sourceName,
batchProxyMessage.getDataTime());
}
} catch (Exception e) {
LOGGER.error("flush job[{}] data to send queue exception", jobInstanceId, e);
}
});
AgentUtils.silenceSleepInMs(batchFlushInterval);
} catch (Exception ex) {
LOGGER.error("error caught", ex);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
}
}
};
public boolean sinkFinish() {
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.sinks.filecollect;
package org.apache.inlong.agent.plugin.sinks;

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
Expand All @@ -29,6 +29,7 @@
import org.apache.inlong.agent.message.filecollect.SenderMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.sinks.filecollect.SenderManager;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.PackProxyMessage;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;

Expand Down Expand Up @@ -115,7 +111,7 @@ public class PulsarSink extends AbstractSink {
private boolean asyncSend;

@Override
public void init(JobProfile jobConf) {
public void init(InstanceProfile jobConf) {
super.init(jobConf);
// agentConf
sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE, DEFAULT_SEND_QUEUE_SIZE);
Expand Down Expand Up @@ -149,41 +145,11 @@ public void init(JobProfile jobConf) {
pulsarSenders = new ArrayList<>();
initPulsarSender();
EXECUTOR_SERVICE.execute(sendDataThread());
EXECUTOR_SERVICE.execute(flushCache());
}

@Override
public void write(Message message) {
try {
if (message != null) {
if (!(message instanceof EndMessage)) {
ProxyMessage proxyMessage = new ProxyMessage(message);
// add proxy message to cache.
cache.compute(proxyMessage.getBatchKey(),
(s, packProxyMessage) -> {
if (packProxyMessage == null) {
packProxyMessage =
new PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, inlongStreamId);
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
packProxyMessage.addTopicAndDataTime(topic, System.currentTimeMillis());
}
// add message to package proxy
packProxyMessage.addProxyMessage(proxyMessage);
return packProxyMessage;
});
// increment the count of successful sinks
sinkMetric.sinkSuccessCount.incrementAndGet();
} else {
// increment the count of failed sinks
sinkMetric.sinkFailCount.incrementAndGet();
}
}
} catch (Exception e) {
LOGGER.error("write message to Proxy sink error", e);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
}

public boolean write(Message message) {
return true;
}

@Override
Expand All @@ -203,45 +169,8 @@ public void destroy() {
}
}

private boolean sinkFinish() {
return cache.values().stream().allMatch(PackProxyMessage::isEmpty) && pulsarSendQueue.isEmpty();
}

/**
* flush cache by batch
*
* @return thread runner
*/
private Runnable flushCache() {
return () -> {
LOGGER.info("start flush cache thread for {} ProxySink", inlongGroupId);
while (!shutdown) {
try {
cache.forEach((batchKey, packProxyMessage) -> {
BatchProxyMessage batchProxyMessage = packProxyMessage.fetchBatch();
if (batchProxyMessage != null) {
try {
sendQueueSemaphore.acquire();
pulsarSendQueue.put(batchProxyMessage);
LOGGER.info("send group id {}, message key {},with message size {}, the job id is {}, "
+ "read source is {} sendTime is {}", inlongGroupId, batchKey,
batchProxyMessage.getDataList().size(), jobInstanceId, sourceName,
batchProxyMessage.getDataTime());
} catch (Exception e) {
sendQueueSemaphore.release();
LOGGER.error("flush data to send queue", e);
}
}
});
} catch (Exception ex) {
LOGGER.error("error caught", ex);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
} finally {
AgentUtils.silenceSleepInMs(batchFlushInterval);
}
}
};
public boolean sinkFinish() {
return true;
}

/**
Expand Down
Loading