Skip to content

Commit

Permalink
[INLONG-4777][SortStandalone] Change ack policy from checking message…
Browse files Browse the repository at this point in the history
… count to checking every message (apache#4778)
  • Loading branch information
luchunliang authored and vernedeng committed Jul 4, 2022
1 parent 395a4b9 commit 9a136a6
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.standalone.config.holder;

/**
* AckPolicy
*
*/
public enum AckPolicy {

COUNT(0), TOKEN(1);

private final int value;

/**
* Constructor
* @param value
*/
private AckPolicy(int value) {
this.value = value;
}

/**
* getValue
* @return int
*/
public int getValue() {
return value;
}

/**
* getAckPolicy
* @param value
* @return AckPolicy
*/
public static AckPolicy getAckPolicy(int value) {
switch (value) {
case 0 :
return COUNT;
case 1 :
return TOKEN;
default :
return COUNT;
}
}

/**
* getAckPolicy
* @param name
* @return AckPolicy
*/
public static AckPolicy getAckPolicy(String name) {
if (AckPolicy.COUNT.name().equalsIgnoreCase(name)) {
return AckPolicy.COUNT;
} else if (AckPolicy.TOKEN.name().equalsIgnoreCase(name)) {
return AckPolicy.TOKEN;
} else {
return AckPolicy.COUNT;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.inlong.sort.standalone.config.holder;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Context;
Expand All @@ -28,6 +25,9 @@
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
*
* CommonPropertiesHolder
Expand All @@ -38,11 +38,13 @@ public class CommonPropertiesHolder {
public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
public static final String KEY_COMMON_PROPERTIES = "common_properties_loader";
public static final String KEY_CLUSTER_ID = "clusterId";
public static final String KEY_SORT_SOURCE_ACKPOLICY = "sortSource.ackPolicy";

private static Map<String, String> props;
private static Context context;

private static long auditFormatInterval = 60000L;
private static AckPolicy ackPolicy;

/**
* init
Expand All @@ -60,8 +62,11 @@ private static void init() {
CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
props.putAll(loader.load());
LOG.info("loaderClass:{},properties:{}", loaderClassName, props);
auditFormatInterval = NumberUtils
CommonPropertiesHolder.auditFormatInterval = NumberUtils
.toLong(CommonPropertiesHolder.getString("auditFormatInterval"), 60000L);
String strAckPolicy = CommonPropertiesHolder.getString(KEY_SORT_SOURCE_ACKPOLICY,
AckPolicy.COUNT.name());
CommonPropertiesHolder.ackPolicy = AckPolicy.getAckPolicy(strAckPolicy);
}
} catch (Throwable t) {
LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
Expand Down Expand Up @@ -205,4 +210,12 @@ public static long getAuditFormatInterval() {
return auditFormatInterval;
}

/**
* get ackPolicy
* @return the ackPolicy
*/
public static AckPolicy getAckPolicy() {
return ackPolicy;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void put(Event event) throws ChannelException {
ProfileEvent profile = (ProfileEvent) event;
transaction.doPut(profile);
} else {
ProfileEvent profile = new ProfileEvent(event.getBody(), event.getHeaders(), null);
ProfileEvent profile = new ProfileEvent(event.getHeaders(), event.getBody());
transaction.doPut(profile);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

import org.apache.inlong.sdk.sort.api.SortClient;
import org.apache.inlong.sdk.sort.entity.MessageRecord;
import org.apache.inlong.sort.standalone.config.holder.AckPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -35,24 +38,56 @@ public class CacheMessageRecord {
private final String msgKey;
private final String offset;
private final AtomicInteger ackCount;
private final AckPolicy ackPolicy;
private Set<Integer> tokenSet;

/**
* Constructor
*
* @param msgRecord
* @param client
*/
public CacheMessageRecord(MessageRecord msgRecord, SortClient client) {
public CacheMessageRecord(MessageRecord msgRecord, SortClient client, AckPolicy ackPolicy) {
this.msgKey = msgRecord.getMsgKey();
this.offset = msgRecord.getOffset();
this.ackCount = new AtomicInteger(msgRecord.getMsgs().size());
this.client = client;
this.ackPolicy = ackPolicy;
if (AckPolicy.TOKEN.equals(ackPolicy)) {
this.tokenSet = new HashSet<>();
for (int i = 0; i < msgRecord.getMsgs().size(); i++) {
this.tokenSet.add(i);
}
}
}

/**
* getToken
* @return
*/
public Integer getToken() {
if (AckPolicy.TOKEN.equals(ackPolicy)) {
return this.ackCount.decrementAndGet();
}
return 0;
}

/**
* ackMessage
* @param ackToken ackToken
*/
public void ackMessage(int ackToken) {
if (AckPolicy.TOKEN.equals(ackPolicy)) {
this.ackMessageByToken(ackToken);
return;
}
this.ackMessageByCount();
}

/**
* ackMessageByCount
*/
public void ackMessage() {
private void ackMessageByCount() {
int result = this.ackCount.decrementAndGet();
if (result == 0 && client != null) {
try {
Expand All @@ -62,4 +97,20 @@ public void ackMessage() {
}
}
}

/**
* ackMessageByToken
* @param ackToken ackToken
*/
private void ackMessageByToken(int ackToken) {
this.tokenSet.remove(ackToken);
int result = this.tokenSet.size();
if (result == 0 && client != null) {
try {
client.ack(msgKey, offset);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.event.SimpleEvent;
import org.apache.inlong.sdk.sort.entity.InLongMessage;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.utils.Constants;

Expand All @@ -35,25 +36,45 @@ public class ProfileEvent extends SimpleEvent {
private final String uid;

private final long rawLogTime;
private final String sourceIp;
private final long fetchTime;
private final CacheMessageRecord cacheRecord;
private CacheMessageRecord cacheRecord;
private final int ackToken;

/**
* Constructor
*
* @param body
* @param headers
* @param cacheRecord
* @param body
*/
public ProfileEvent(byte[] body, Map<String, String> headers, CacheMessageRecord cacheRecord) {
super.setBody(body);
public ProfileEvent(Map<String, String> headers, byte[] body) {
super.setHeaders(headers);
this.cacheRecord = cacheRecord;
super.setBody(body);
this.inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
this.inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
this.fetchTime = System.currentTimeMillis();
this.rawLogTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_MSG_TIME), fetchTime);
this.sourceIp = headers.get(Constants.HEADER_KEY_SOURCE_IP);
this.ackToken = 0;
}

/**
* Constructor
*
* @param sdkMessage
* @param cacheRecord
*/
public ProfileEvent(InLongMessage sdkMessage, CacheMessageRecord cacheRecord) {
super.setHeaders(sdkMessage.getParams());
super.setBody(sdkMessage.getBody());
this.inlongGroupId = sdkMessage.getInlongGroupId();
this.inlongStreamId = sdkMessage.getInlongStreamId();
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
this.rawLogTime = sdkMessage.getMsgTime();
this.sourceIp = sdkMessage.getSourceIp();
this.cacheRecord = cacheRecord;
this.fetchTime = System.currentTimeMillis();
this.ackToken = cacheRecord.getToken();
}

/**
Expand Down Expand Up @@ -83,6 +104,14 @@ public long getRawLogTime() {
return rawLogTime;
}

/**
* get sourceIp
* @return the sourceIp
*/
public String getSourceIp() {
return sourceIp;
}

/**
* get fetchTime
*
Expand Down Expand Up @@ -115,7 +144,7 @@ public CacheMessageRecord getCacheRecord() {
*/
public void ack() {
if (cacheRecord != null) {
cacheRecord.ackMessage();
cacheRecord.ackMessage(ackToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.tencentcloudapi.cls.producer.util.NetworkUtils;

import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
Expand Down Expand Up @@ -268,25 +267,22 @@ private void removeExpireClient(String secretId) {
public void addSendResultMetric(ProfileEvent currentRecord, String bid, boolean result, long sendTime) {
Map<String, String> dimensions = this.getDimensions(currentRecord, bid);
SortMetricItem metricItem = this.getMetricItemSet().findMetricItem(dimensions);
long count = 1;
long size = currentRecord.getBody().length;
if (result) {
metricItem.sendSuccessCount.addAndGet(count);
metricItem.sendSuccessSize.addAndGet(size);
metricItem.sendSuccessCount.incrementAndGet();
metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
AuditUtils.add(AuditUtils.AUDIT_ID_SEND_SUCCESS, currentRecord);
if (sendTime > 0) {
long currentTime = System.currentTimeMillis();
final long currentTime = System.currentTimeMillis();
long sinkDuration = currentTime - sendTime;
long nodeDuration = currentTime
- NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, currentRecord.getRawLogTime());
long nodeDuration = currentTime - currentRecord.getFetchTime();
long wholeDuration = currentTime - currentRecord.getRawLogTime();
metricItem.sinkDuration.addAndGet(sinkDuration * count);
metricItem.nodeDuration.addAndGet(nodeDuration * count);
metricItem.wholeDuration.addAndGet(wholeDuration * count);
metricItem.sinkDuration.addAndGet(sinkDuration);
metricItem.nodeDuration.addAndGet(nodeDuration);
metricItem.wholeDuration.addAndGet(wholeDuration);
}
} else {
metricItem.sendFailCount.addAndGet(count);
metricItem.sendFailSize.addAndGet(size);
metricItem.sendFailCount.incrementAndGet();
metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
}
}

Expand Down
Loading

0 comments on commit 9a136a6

Please sign in to comment.