diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/AckPolicy.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/AckPolicy.java new file mode 100644 index 00000000000..6dc0127f8bf --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/AckPolicy.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.config.holder; + +/** + * AckPolicy + * + */ +public enum AckPolicy { + + COUNT(0), TOKEN(1); + + private final int value; + + /** + * Constructor + * @param value + */ + private AckPolicy(int value) { + this.value = value; + } + + /** + * getValue + * @return int + */ + public int getValue() { + return value; + } + + /** + * getAckPolicy + * @param value + * @return AckPolicy + */ + public static AckPolicy getAckPolicy(int value) { + switch (value) { + case 0 : + return COUNT; + case 1 : + return TOKEN; + default : + return COUNT; + } + } + + /** + * getAckPolicy + * @param name + * @return AckPolicy + */ + public static AckPolicy getAckPolicy(String name) { + if (AckPolicy.COUNT.name().equalsIgnoreCase(name)) { + return AckPolicy.COUNT; + } else if (AckPolicy.TOKEN.name().equalsIgnoreCase(name)) { + return AckPolicy.TOKEN; + } else { + return AckPolicy.COUNT; + } + } + +} diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java index 05100fe6873..129ce24f0b4 100644 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java @@ -17,9 +17,6 @@ package org.apache.inlong.sort.standalone.config.holder; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.Context; @@ -28,6 +25,9 @@ import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; import org.slf4j.Logger; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** * * CommonPropertiesHolder @@ -38,11 +38,13 @@ public class CommonPropertiesHolder { public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName(); public static final String KEY_COMMON_PROPERTIES = "common_properties_loader"; public static final String KEY_CLUSTER_ID = "clusterId"; + public static final String KEY_SORT_SOURCE_ACKPOLICY = "sortSource.ackPolicy"; private static Map props; private static Context context; private static long auditFormatInterval = 60000L; + private static AckPolicy ackPolicy; /** * init @@ -60,8 +62,11 @@ private static void init() { CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject; props.putAll(loader.load()); LOG.info("loaderClass:{},properties:{}", loaderClassName, props); - auditFormatInterval = NumberUtils + CommonPropertiesHolder.auditFormatInterval = NumberUtils .toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L); + String strAckPolicy = CommonPropertiesHolder.getString(KEY_SORT_SOURCE_ACKPOLICY, + AckPolicy.COUNT.name()); + CommonPropertiesHolder.ackPolicy = AckPolicy.getAckPolicy(strAckPolicy); } } catch (Throwable t) { LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}", @@ -205,4 +210,12 @@ public static long getAuditFormatInterval() { return auditFormatInterval; } + /** + * get ackPolicy + * @return the ackPolicy + */ + public static AckPolicy getAckPolicy() { + return ackPolicy; + } + } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java index 52f2643d34d..57ad61a8114 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java @@ -81,7 +81,7 @@ public void put(Event event) throws ChannelException { ProfileEvent profile = (ProfileEvent) event; transaction.doPut(profile); } else { - ProfileEvent profile = new ProfileEvent(event.getBody(), event.getHeaders(), null); + ProfileEvent profile = new ProfileEvent(event.getHeaders(), event.getBody()); transaction.doPut(profile); } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java index 63dd517503b..bda5f246eb0 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java @@ -19,9 +19,12 @@ import org.apache.inlong.sdk.sort.api.SortClient; import org.apache.inlong.sdk.sort.entity.MessageRecord; +import org.apache.inlong.sort.standalone.config.holder.AckPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; /** @@ -35,6 +38,8 @@ public class CacheMessageRecord { private final String msgKey; private final String offset; private final AtomicInteger ackCount; + private final AckPolicy ackPolicy; + private Set tokenSet; /** * Constructor @@ -42,17 +47,47 @@ public class CacheMessageRecord { * @param msgRecord * @param client */ - public CacheMessageRecord(MessageRecord msgRecord, SortClient client) { + public CacheMessageRecord(MessageRecord msgRecord, SortClient client, AckPolicy ackPolicy) { this.msgKey = msgRecord.getMsgKey(); this.offset = msgRecord.getOffset(); this.ackCount = new AtomicInteger(msgRecord.getMsgs().size()); this.client = client; + this.ackPolicy = ackPolicy; + if (AckPolicy.TOKEN.equals(ackPolicy)) { + this.tokenSet = new HashSet<>(); + for (int i = 0; i < msgRecord.getMsgs().size(); i++) { + this.tokenSet.add(i); + } + } + } + + /** + * getToken + * @return + */ + public Integer getToken() { + if (AckPolicy.TOKEN.equals(ackPolicy)) { + return this.ackCount.decrementAndGet(); + } + return 0; } /** * ackMessage + * @param ackToken ackToken + */ + public void ackMessage(int ackToken) { + if (AckPolicy.TOKEN.equals(ackPolicy)) { + this.ackMessageByToken(ackToken); + return; + } + this.ackMessageByCount(); + } + + /** + * ackMessageByCount */ - public void ackMessage() { + private void ackMessageByCount() { int result = this.ackCount.decrementAndGet(); if (result == 0 && client != null) { try { @@ -62,4 +97,20 @@ public void ackMessage() { } } } + + /** + * ackMessageByToken + * @param ackToken ackToken + */ + private void ackMessageByToken(int ackToken) { + this.tokenSet.remove(ackToken); + int result = this.tokenSet.size(); + if (result == 0 && client != null) { + try { + client.ack(msgKey, offset); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java index 27f7962bf82..2dfc3dd734f 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java @@ -19,6 +19,7 @@ import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.event.SimpleEvent; +import org.apache.inlong.sdk.sort.entity.InLongMessage; import org.apache.inlong.sort.standalone.config.pojo.InlongId; import org.apache.inlong.sort.standalone.utils.Constants; @@ -35,25 +36,45 @@ public class ProfileEvent extends SimpleEvent { private final String uid; private final long rawLogTime; + private final String sourceIp; private final long fetchTime; - private final CacheMessageRecord cacheRecord; + private CacheMessageRecord cacheRecord; + private final int ackToken; /** * Constructor - * - * @param body * @param headers - * @param cacheRecord + * @param body */ - public ProfileEvent(byte[] body, Map headers, CacheMessageRecord cacheRecord) { - super.setBody(body); + public ProfileEvent(Map headers, byte[] body) { super.setHeaders(headers); - this.cacheRecord = cacheRecord; + super.setBody(body); this.inlongGroupId = headers.get(Constants.INLONG_GROUP_ID); this.inlongStreamId = headers.get(Constants.INLONG_STREAM_ID); this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId); this.fetchTime = System.currentTimeMillis(); this.rawLogTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_MSG_TIME), fetchTime); + this.sourceIp = headers.get(Constants.HEADER_KEY_SOURCE_IP); + this.ackToken = 0; + } + + /** + * Constructor + * + * @param sdkMessage + * @param cacheRecord + */ + public ProfileEvent(InLongMessage sdkMessage, CacheMessageRecord cacheRecord) { + super.setHeaders(sdkMessage.getParams()); + super.setBody(sdkMessage.getBody()); + this.inlongGroupId = sdkMessage.getInlongGroupId(); + this.inlongStreamId = sdkMessage.getInlongStreamId(); + this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId); + this.rawLogTime = sdkMessage.getMsgTime(); + this.sourceIp = sdkMessage.getSourceIp(); + this.cacheRecord = cacheRecord; + this.fetchTime = System.currentTimeMillis(); + this.ackToken = cacheRecord.getToken(); } /** @@ -83,6 +104,14 @@ public long getRawLogTime() { return rawLogTime; } + /** + * get sourceIp + * @return the sourceIp + */ + public String getSourceIp() { + return sourceIp; + } + /** * get fetchTime * @@ -115,7 +144,7 @@ public CacheMessageRecord getCacheRecord() { */ public void ack() { if (cacheRecord != null) { - cacheRecord.ackMessage(); + cacheRecord.ackMessage(ackToken); } } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java index 244a4a42660..65ce2cb93e5 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java @@ -25,7 +25,6 @@ import com.tencentcloudapi.cls.producer.util.NetworkUtils; import org.apache.commons.lang3.ClassUtils; -import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; @@ -268,25 +267,22 @@ private void removeExpireClient(String secretId) { public void addSendResultMetric(ProfileEvent currentRecord, String bid, boolean result, long sendTime) { Map dimensions = this.getDimensions(currentRecord, bid); SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions); - long count = 1; - long size = currentRecord.getBody().length; if (result) { - metricItem.sendSuccessCount.addAndGet(count); - metricItem.sendSuccessSize.addAndGet(size); + metricItem.sendSuccessCount.incrementAndGet(); + metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length); AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, currentRecord); if (sendTime > 0) { - long currentTime = System.currentTimeMillis(); + final long currentTime = System.currentTimeMillis(); long sinkDuration = currentTime - sendTime; - long nodeDuration = currentTime - - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, currentRecord.getRawLogTime()); + long nodeDuration = currentTime - currentRecord.getFetchTime(); long wholeDuration = currentTime - currentRecord.getRawLogTime(); - metricItem.sinkDuration.addAndGet(sinkDuration * count); - metricItem.nodeDuration.addAndGet(nodeDuration * count); - metricItem.wholeDuration.addAndGet(wholeDuration * count); + metricItem.sinkDuration.addAndGet(sinkDuration); + metricItem.nodeDuration.addAndGet(nodeDuration); + metricItem.wholeDuration.addAndGet(wholeDuration); } } else { - metricItem.sendFailCount.addAndGet(count); - metricItem.sendFailSize.addAndGet(size); + metricItem.sendFailCount.incrementAndGet(); + metricItem.sendFailSize.addAndGet(currentRecord.getBody().length); } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java index 3f6abbd4ab1..6f22a6876cb 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java @@ -228,28 +228,26 @@ public void addSendResultMetric(ProfileEvent currentRecord, String bid, boolean fillInlongId(currentRecord, dimensions); dimensions.put(SortMetricItem.KEY_SINK_ID, this.getSinkName()); dimensions.put(SortMetricItem.KEY_SINK_DATA_ID, bid); + final long currentTime = System.currentTimeMillis(); long msgTime = currentRecord.getRawLogTime(); long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval(); dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime)); SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions); - long count = 1; - long size = currentRecord.getBody().length; if (result) { - metricItem.sendSuccessCount.addAndGet(count); - metricItem.sendSuccessSize.addAndGet(size); + metricItem.sendSuccessCount.incrementAndGet(); + metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length); AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, currentRecord); if (sendTime > 0) { - long currentTime = System.currentTimeMillis(); long sinkDuration = currentTime - sendTime; - long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime); - long wholeDuration = currentTime - msgTime; - metricItem.sinkDuration.addAndGet(sinkDuration * count); - metricItem.nodeDuration.addAndGet(nodeDuration * count); - metricItem.wholeDuration.addAndGet(wholeDuration * count); + long nodeDuration = currentTime - currentRecord.getFetchTime(); + long wholeDuration = currentTime - currentRecord.getRawLogTime(); + metricItem.sinkDuration.addAndGet(sinkDuration); + metricItem.nodeDuration.addAndGet(nodeDuration); + metricItem.wholeDuration.addAndGet(wholeDuration); } } else { - metricItem.sendFailCount.addAndGet(count); - metricItem.sendFailSize.addAndGet(size); + metricItem.sendFailCount.incrementAndGet(); + metricItem.sendFailSize.addAndGet(currentRecord.getBody().length); } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java index 458a22f4ed5..af09e1e9a22 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java @@ -18,9 +18,7 @@ package org.apache.inlong.sort.standalone.source.sortsdk; import com.google.common.base.Preconditions; -import java.util.List; -import javax.validation.constraints.NotBlank; -import javax.validation.constraints.NotNull; + import org.apache.flume.channel.ChannelProcessor; import org.apache.inlong.sdk.sort.api.ReadCallback; import org.apache.inlong.sdk.sort.api.SortClient; @@ -28,21 +26,18 @@ import org.apache.inlong.sdk.sort.entity.MessageRecord; import org.apache.inlong.sort.standalone.channel.CacheMessageRecord; import org.apache.inlong.sort.standalone.channel.ProfileEvent; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; + /** * Implementation of {@link ReadCallback}. * - * TODO: Sort sdk should deliver one object which is held by {@link ProfileEvent} and used to ack upstream data store - * The code should be like : - * - * public void onFinished(final MessageRecord messageRecord, ACKer acker) { - * doSomething(); - * final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(), acker); - * channelProcessor.processEvent(profileEvent); - * } - * * The ACKer will be used to ACK upstream after that the downstream ACKed sort-standalone. * This process seems like transaction of the whole sort-standalone, and which * ensure At Least One semantics. @@ -103,14 +98,10 @@ public void setClient(@NotNull SortClient client) { public void onFinished(final MessageRecord messageRecord) { try { Preconditions.checkState(messageRecord != null, "Fetched msg is null."); - CacheMessageRecord cacheRecord = new CacheMessageRecord(messageRecord, client); + CacheMessageRecord cacheRecord = new CacheMessageRecord(messageRecord, client, + CommonPropertiesHolder.getAckPolicy()); for (InLongMessage inLongMessage : messageRecord.getMsgs()) { - final SubscribeFetchResult result = SubscribeFetchResult.Factory - .create(sortTaskName, messageRecord.getMsgKey(), messageRecord.getOffset(), - inLongMessage.getParams(), messageRecord.getRecTime(), - inLongMessage.getBody()); - final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(), - cacheRecord); + final ProfileEvent profileEvent = new ProfileEvent(inLongMessage, cacheRecord); channelProcessor.processEvent(profileEvent); context.reportToMetric(profileEvent, sortTaskName, "-", SortSdkSourceContext.FetchResult.SUCCESS); } diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java index 9e8a6979244..278fe10e038 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java +++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/cls/TestDefaultEvent2LogItemHandler.java @@ -18,6 +18,7 @@ package org.apache.inlong.sort.standalone.sink.cls; import com.tencentcloudapi.cls.producer.common.LogItem; + import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.utils.Constants; import org.junit.Assert; @@ -85,7 +86,7 @@ private ProfileEvent prepareEvent() { headers.put(Constants.INLONG_GROUP_ID, "testGroup"); headers.put(Constants.INLONG_STREAM_ID, "testStream"); headers.put(Constants.HEADER_KEY_MSG_TIME, "1234456"); - return new ProfileEvent(body, headers, null); + return new ProfileEvent(headers, body); } } \ No newline at end of file diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java index 732f78484ea..7e76dd9f016 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java @@ -86,7 +86,7 @@ public static ProfileEvent mockProfileEvent(String inlongGroupId, String inlongS headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(System.currentTimeMillis())); headers.put(Constants.HEADER_KEY_SOURCE_IP, "127.0.0.1"); byte[] body = content.getBytes(Charset.defaultCharset()); - return new ProfileEvent(body, headers, null); + return new ProfileEvent(headers, body); } /**