Skip to content

Commit

Permalink
Audit-store support the feature of audit-version
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi committed Apr 15, 2024
1 parent 0fee4a1 commit fc098b9
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,14 @@ public void setAuditConfig(AuditConfig config) {
* Add audit data
*/
public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size) {
add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, logTime, count, size);
add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, logTime, count, size, DEFAULT_AUDIT_VERSION);
}

public void add(int auditID, String auditTag, String inlongGroupID, String inlongStreamID, Long logTime,
long count, long size) {
long count, long size, long auditVersion) {
long delayTime = System.currentTimeMillis() - logTime;
add(auditID, auditTag, inlongGroupID, inlongStreamID, logTime, count, size,
delayTime * count, DEFAULT_AUDIT_VERSION);
delayTime * count, auditVersion);
}

public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ public int getDataMapSize() {
* processing return package
*/
public void onMessageReceived(ChannelHandlerContext ctx, byte[] msg) {
if (null == msg) {
return;
}
try {
// Analyze abnormal events
AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.parseFrom(msg);
Expand All @@ -276,16 +279,16 @@ public void onMessageReceived(ChannelHandlerContext ctx, byte[] msg) {
}
return;
}
// check resp
LOG.debug("Audit-proxy response code: {}", baseCommand.getAuditReply().getRspCode());
// Check audit-proxy response code
LOG.info("Audit-proxy response code: {}", baseCommand.getAuditReply().getRspCode());
if (AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(baseCommand.getAuditReply().getRspCode())) {
this.dataMap.remove(requestId);
return;
}
LOG.error("Audit-proxy response code: {}", baseCommand.getAuditReply().getRspCode());
LOG.error("Audit-proxy has error response! code={},message={}",
baseCommand.getAuditReply().getRspCode(), baseCommand.getAuditReply().getMessage());

int resendTimes = data.increaseResendTimes();
if (resendTimes < SenderGroup.MAX_SEND_TIMES) {
if (data.increaseResendTimes() < SenderGroup.MAX_SEND_TIMES) {
this.sendData(data.getDataByte());
}
} catch (Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class AuditDataPo {
private String inlongStreamId;
private String auditId;
private String auditTag;
private long auditVersion;
private Long count;
private Long size;
private Long delay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ClickHouseDataPo {
private String inlongStreamId;
private String auditId;
private String auditTag;
private long auditVersion;
private Long count;
private Long size;
private Long delay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ESDataPo {
private String inlongStreamId;
private String auditId;
private String auditTag;
private long auditVersion;
private long count;
private long size;
private long delay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public class ClickHouseService implements InsertData, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseService.class);
public static final String INSERT_SQL = "insert into audit_data (ip, docker_id, thread_id,\r\n"
+ " sdk_ts, packet_id, log_ts,\r\n"
+ " inlong_group_id, inlong_stream_id, audit_id,audit_tag,\r\n"
+ " inlong_group_id, inlong_stream_id, audit_id,audit_tag,audit_version, \r\n"
+ " count, size, delay, \r\n"
+ " update_time)\r\n"
+ " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

private ClickHouseConfig chConfig;

Expand Down Expand Up @@ -111,10 +111,11 @@ private void processOutput() {
pstat.setString(8, data.getInlongStreamId());
pstat.setString(9, data.getAuditId());
pstat.setString(10, data.getAuditTag());
pstat.setLong(11, data.getCount());
pstat.setLong(12, data.getSize());
pstat.setLong(13, data.getDelay());
pstat.setTimestamp(14, data.getUpdateTime());
pstat.setLong(11, data.getAuditVersion());
pstat.setLong(12, data.getCount());
pstat.setLong(13, data.getSize());
pstat.setLong(14, data.getDelay());
pstat.setTimestamp(15, data.getUpdateTime());
pstat.addBatch();
this.batchCounter.decrementAndGet();
if (++counter >= chConfig.getBatchThreshold()) {
Expand Down Expand Up @@ -175,6 +176,7 @@ public void insert(AuditData msgBody) {
data.setLogTs(new Timestamp(msgBody.getLogTs()));
data.setAuditId(msgBody.getAuditId());
data.setAuditTag(msgBody.getAuditTag());
data.setAuditVersion(msgBody.getAuditVersion());
data.setCount(msgBody.getCount());
data.setDelay(msgBody.getDelay());
data.setInlongGroupId(msgBody.getInlongGroupId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ protected XContentBuilder generateBuilder() throws IOException {
}
builder.endObject();
}
{
builder.startObject("audit_version");
{
builder.field("type", "keyword");
}
builder.endObject();
}
{
builder.startObject("inlong_group_id");
{
Expand Down Expand Up @@ -338,6 +345,7 @@ public void insert(AuditData msgBody) {
esPo.setLogTs(new Date(msgBody.getLogTs()));
esPo.setAuditId(msgBody.getAuditId());
esPo.setAuditTag(msgBody.getAuditTag());
esPo.setAuditVersion(msgBody.getAuditVersion());
esPo.setCount(msgBody.getCount());
esPo.setDelay(msgBody.getDelay());
esPo.setInlongGroupId(msgBody.getInlongGroupId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public void insert(AuditData msgBody) {
po.setLogTs(new Date(msgBody.getLogTs()));
po.setAuditId(msgBody.getAuditId());
po.setAuditTag(msgBody.getAuditTag());
po.setAuditVersion(msgBody.getAuditVersion());
po.setCount(msgBody.getCount());
po.setDelay(msgBody.getDelay());
po.setInlongGroupId(msgBody.getInlongGroupId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
<result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
<result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
<result column="audit_id" jdbcType="VARCHAR" property="auditId"/>
<result column="audit_tag" jdbcType="VARCHAR" property="auditTag"/>
<result column="audit_version" jdbcType="BIGINT" property="auditVersion"/>
<result column="count" jdbcType="BIGINT" property="count"/>
<result column="size" jdbcType="BIGINT" property="size"/>
<result column="delay" jdbcType="BIGINT" property="delay"/>
Expand All @@ -38,11 +40,12 @@
<insert id="insert" parameterType="org.apache.inlong.audit.db.entities.AuditDataPo">
insert into audit_data (ip, docker_id, thread_id,
sdk_ts, packet_id, log_ts,
inlong_group_id, inlong_stream_id, audit_id,
inlong_group_id, inlong_stream_id, audit_id,audit_tag,audit_version,
`count`, size, delay)
values (#{ip,jdbcType=VARCHAR}, #{dockerId,jdbcType=VARCHAR}, #{threadId,jdbcType=VARCHAR},
#{sdkTs,jdbcType=TIMESTAMP}, #{packetId,jdbcType=BIGINT}, #{logTs,jdbcType=TIMESTAMP},
#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, #{auditId,jdbcType=VARCHAR},
#{auditTag,jdbcType=VARCHAR},#{audit_version,jdbcType=BIGINT},
#{count,jdbcType=BIGINT}, #{size,jdbcType=BIGINT}, #{delay,jdbcType=BIGINT})
</insert>
</mapper>

This file was deleted.

This file was deleted.

24 changes: 24 additions & 0 deletions inlong-audit/sql/audit-store/clickhouse-changes-1.12.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- When upgrading to version 1.12.0, please execute those SQLs in the clickhouse which audit used.

USE `apache_inlong_audit`;

ALTER TABLE audit_data ADD COLUMN audit_version Int64 DEFAULT -1 COMMENT 'Audit version' after `audit_tag`;


24 changes: 24 additions & 0 deletions inlong-audit/sql/audit-store/mysql-changes-1.12.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- When upgrading to version 1.12.0, please execute those SQLs in the DB (such as MySQL) which audit used.

USE `apache_inlong_audit`;

ALTER TABLE audit_data ADD COLUMN audit_version BIGINT DEFAULT -1 COMMENT 'Audit version' after `audit_tag`;


0 comments on commit fc098b9

Please sign in to comment.