From 364bdc5a2417126a30e39d7de8dd38d7532e0c31 Mon Sep 17 00:00:00 2001 From: Goson Zhang <4675739@qq.com> Date: Tue, 30 Apr 2024 09:06:40 +0800 Subject: [PATCH] [INLONG-10111][DataProxy] Add auditVersion field processing (#10112) Co-authored-by: gosonzhang --- .../dataproxy/metrics/audit/AuditUtils.java | 32 ++++++++++++++++--- .../source/httpMsg/HttpMessageHandler.java | 8 +++++ .../dataproxy/source/v0msg/AbsV0MsgCodec.java | 2 ++ .../dataproxy/source/v0msg/CodecBinMsg.java | 3 ++ .../dataproxy/source/v0msg/CodecTextMsg.java | 3 ++ 5 files changed, 44 insertions(+), 4 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java index 371a4de0c99..899aca38d4b 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java @@ -26,11 +26,14 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.utils.Constants; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.Event; import java.util.Map; +import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG; + /** * Audit utils */ @@ -72,15 +75,17 @@ public static void add(int auditID, Event event) { if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) { msgCount = Long.parseLong(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY)); } - AuditOperator.getInstance().add(auditID, inlongGroupId, - inlongStreamId, logTime, msgCount, event.getBody().length); + long auditVersion = getAuditVersion(headers); + AuditOperator.getInstance().add(auditID, DEFAULT_AUDIT_TAG, + inlongGroupId, inlongStreamId, logTime, msgCount, event.getBody().length, auditVersion); } else { String groupId = headers.get(AttributeConstants.GROUP_ID); String streamId = headers.get(AttributeConstants.STREAM_ID); long dataTime = NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME)); long msgCount = NumberUtils.toLong(headers.get(ConfigConstants.MSG_COUNTER_KEY)); - AuditOperator.getInstance().add(auditID, groupId, - streamId, dataTime, msgCount, event.getBody().length); + long auditVersion = getAuditVersion(headers); + AuditOperator.getInstance().add(auditID, DEFAULT_AUDIT_TAG, + groupId, streamId, dataTime, msgCount, event.getBody().length, auditVersion); } } @@ -119,6 +124,25 @@ public static long getAuditFormatTime(long msgTime) { return msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs(); } + /** + * Get Audit version + * + * @param headers the message headers + * + * @return audit version + */ + public static long getAuditVersion(Map headers) { + String strAuditVersion = headers.get(AttributeConstants.AUDIT_VERSION); + if (StringUtils.isNotBlank(strAuditVersion)) { + try { + return Long.parseLong(strAuditVersion); + } catch (Throwable ex) { + // + } + } + return -1L; + } + /** * Send audit data */ diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java index 89c8e048571..3510b6df788 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java @@ -26,6 +26,7 @@ import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.consts.HttpAttrConst; import org.apache.inlong.dataproxy.consts.StatConstants; +import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.source.BaseSource; import org.apache.inlong.dataproxy.utils.AddressUtils; import org.apache.inlong.sdk.commons.protocol.EventConstants; @@ -335,6 +336,8 @@ private void processMessage(ChannelHandlerContext ctx, Map reqAt // get message count int intMsgCnt = NumberUtils.toInt(reqAttrs.get(HttpAttrConst.KEY_MESSAGE_COUNT), 1); String strMsgCount = String.valueOf(intMsgCnt); + // get audit version + long auditVersion = AuditUtils.getAuditVersion(reqAttrs); // build message attributes InLongMsg inLongMsg = InLongMsg.newInLongMsg(source.isCompressed()); strBuff.append("groupId=").append(groupId) @@ -345,6 +348,10 @@ private void processMessage(ChannelHandlerContext ctx, Map reqAt .append("&rt=").append(msgRcvTime) .append(AttributeConstants.SEPARATOR).append(AttributeConstants.MSG_RPT_TIME) .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime); + if (auditVersion != -1L) { + strBuff.append(AttributeConstants.SEPARATOR).append(AttributeConstants.AUDIT_VERSION) + .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(auditVersion); + } inLongMsg.addMsg(strBuff.toString(), body.getBytes(HttpAttrConst.VAL_DEF_CHARSET)); byte[] inlongMsgData = inLongMsg.buildArray(); long pkgTime = inLongMsg.getCreatetime(); @@ -365,6 +372,7 @@ private void processMessage(ChannelHandlerContext ctx, Map reqAt MessageWrapType.INLONG_MSG_V0.getStrId()); eventHeaders.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime)); eventHeaders.put(ConfigConstants.PKG_TIME_KEY, String.valueOf(pkgTime)); + eventHeaders.put(AttributeConstants.AUDIT_VERSION, String.valueOf(auditVersion)); Event event = EventBuilder.withBody(inlongMsgData, eventHeaders); try { source.getCachedChProcessor().processEvent(event); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java index 68e05d4dc2c..b6b5d1fdf66 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java @@ -68,6 +68,7 @@ public abstract class AbsV0MsgCodec { protected String msgProcType = "b2b"; protected boolean needResp = true; protected long msgPkgTime; + protected long auditVersion = -1L; public AbsV0MsgCodec(int totalDataLen, int msgTypeValue, long msgRcvTime, String strRemoteIP) { @@ -246,6 +247,7 @@ protected Map buildEventHeaders(BaseSource source) { headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime)); headers.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq)); headers.put(ConfigConstants.PKG_TIME_KEY, String.valueOf(msgPkgTime)); + headers.put(AttributeConstants.AUDIT_VERSION, String.valueOf(auditVersion)); // add extra key-value information if (!needResp) { headers.put(AttributeConstants.MESSAGE_IS_ACK, "false"); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java index 71014d3058e..71ade878722 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java @@ -24,6 +24,7 @@ import org.apache.inlong.dataproxy.base.SinkRspEvent; import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.StatConstants; +import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.source.BaseSource; import io.netty.buffer.ByteBuf; @@ -157,6 +158,8 @@ public boolean validAndFillFields(BaseSource source, StringBuilder strBuff) { .append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime); attrMap.put(AttributeConstants.MSG_RPT_TIME, String.valueOf(msgRcvTime)); } + // get audit version + this.auditVersion = AuditUtils.getAuditVersion(this.attrMap); // get trace requirement if (this.needTraceMsg) { if (strBuff.length() > 0) { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java index 1b06d7e2f05..fc83805900b 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java @@ -23,6 +23,7 @@ import org.apache.inlong.common.msg.MsgType; import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.StatConstants; +import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.source.BaseSource; import io.netty.buffer.ByteBuf; @@ -188,6 +189,8 @@ public boolean validAndFillFields(BaseSource source, StringBuilder strBuff) { attrMap.put(AttributeConstants.DATA_TIME, String.valueOf(this.dataTimeMs)); } } + // get audit version + this.auditVersion = AuditUtils.getAuditVersion(this.attrMap); // process sequence id String sequenceId = attrMap.get(AttributeConstants.SEQUENCE_ID); if (StringUtils.isNotBlank(sequenceId)) {