Skip to content

Commit

Permalink
Audit-service add codes of source
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi committed Apr 8, 2024
1 parent da856a4 commit 313da9a
Show file tree
Hide file tree
Showing 93 changed files with 680 additions and 451 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
| [INLONG-9467](https://github.com/apache/inlong/issues/9467) | [Improve][Agent] Improve code exception detection to ensure task and instance state transitions |
| [INLONG-9454](https://github.com/apache/inlong/issues/9454) | [Improve][Agent] Increase exit conditions to prevent dead loops |
| [INLONG-9556](https://github.com/apache/inlong/issues/9556) | [Improve][Agent] Prevent thread freeze caused by deleting data sources when the backend cannot send out |
| [INLONG-9572](https://github.com/apache/inlong/issues/9572) | [Improve][Agent] Set data time of message cache by sink data time |
| [INLONG-9572](https://github.com/apache/inlong/issues/9572) | [Improve][Agent] Set data time of message channel by sink data time |
| [INLONG-9548](https://github.com/apache/inlong/issues/9548) | [Improve][Agent] Supports HTTPS and can determine whether to enable it through configuration |
| [INLONG-9600](https://github.com/apache/inlong/issues/9600) | [Improve][Agent]Adjust the sinks directory for code consistency |

Expand Down
6 changes: 6 additions & 0 deletions inlong-audit/audit-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${HikariCP.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
37 changes: 0 additions & 37 deletions inlong-audit/audit-service/src/main/java/config/SqlConstants.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,25 @@
* limitations under the License.
*/

package channel;
package org.apache.inlong.audit.channel;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.inlong.audit.entities.StatData;

import entities.StatData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package config;
package org.apache.inlong.audit.config;

/**
* Config constants
Expand Down Expand Up @@ -112,6 +112,15 @@ public class ConfigConstants {
public static final String PREP_STMT_CACHE_SIZE = "prepStmtCacheSize";
public static final String PREP_STMT_CACHE_SQL_LIMIT = "prepStmtCacheSqlLimit";

public static final String KEY_CACHE_PREP_STMTS = "cache.prep.stmts";
public static final boolean DEFAULT_CACHE_PREP_STMTS = true;

public static final String KEY_PREP_STMT_CACHE_SIZE = "prep.stmt.cache.size";
public static final int DEFAULT_PREP_STMT_CACHE_SIZE = 250;

public static final String KEY_PREP_STMT_CACHE_SQL_LIMIT = "prep.stmt.cache.sql.limit";
public static final int DEFAULT_PREP_STMT_CACHE_SQL_LIMIT = 2048;

public static final int MAX_INIT_COUNT = 2;
public static final int RANDOM_BOUND = 10;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package config;
package org.apache.inlong.audit.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -72,6 +72,11 @@ public String get(String key, String defaultValue) {
return value == null ? defaultValue : value.toString();
}

public boolean get(String key, boolean defaultValue) {
Object value = properties.get(key);
return value == null ? defaultValue : (Boolean) value;
}

/**
* @param key
* @param defaultValue
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.config;

/**
* Sql constants
*/
public class SqlConstants {

// HA selector sql
public static final String SELECTOR_SQL =
"insert ignore into {0} (service_id, leader_id, last_seen_active) values (''{1}'', ''{2}'', now()) on duplicate key update leader_id = if(last_seen_active < now() - interval # second, values(leader_id), leader_id),last_seen_active = if(leader_id = values(leader_id), values(last_seen_active), last_seen_active)";
public static final String REPLACE_LEADER_SQL =
"replace into {0} ( service_id, leader_id, last_seen_active ) values (''{1}'', ''#'', now())";
public static final String RELEASE_SQL = "delete from {0} where service_id=''{1}'' and leader_id= ''{2}''";
public static final String IS_LEADER_SQL =
"select count(*) as is_leader from {0} where service_id=''{1}'' and leader_id=''{2}''";
public static final String SEARCH_CURRENT_LEADER_SQL =
"select leader_id as leader from {0} where service_id=''{1}''";
public static final String SELECT_TEST_SQL = "SELECT 1 ";

// ClickHouse query sql
public static final String KEY_CLICKHOUSE_SOURCE_QUERY_SQL = "clickhouse.source.query.sql";
public static final String DEFAULT_CLICKHOUSE_SOURCE_QUERY_SQL =
"SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" +
" , sum(cnt) AS cnt, sum(size) AS size\n" +
" , sum(delay) AS delay\n" +
"FROM (\n" +
" SELECT max(audit_version), ip, docker_id, thread_id\n" +
" , inlong_group_id, inlong_stream_id, audit_id, audit_tag, cnt\n" +
" , size, delay\n" +
" FROM (\n" +
" SELECT audit_version, ip, docker_id, thread_id, inlong_group_id\n" +
" , inlong_stream_id, audit_id, audit_tag, sum(count) AS cnt\n" +
" , sum(size) AS size, sum(delay) AS delay\n" +
" FROM (\n" +
" SELECT audit_version, docker_id, thread_id, sdk_ts, packet_id\n" +
" , log_ts, ip, inlong_group_id, inlong_stream_id, audit_id\n" +
" , audit_tag, count, size, delay\n" +
" FROM audit_data \n" +
" WHERE log_ts BETWEEN ? AND ? \n" +
" AND audit_id = ? \n" +
" GROUP BY audit_version, docker_id, thread_id, sdk_ts, packet_id, log_ts, ip, inlong_group_id, inlong_stream_id, audit_id, audit_tag, count, size, delay\n"
+
" ) t1\n" +
" GROUP BY audit_version, ip, docker_id, thread_id, inlong_group_id, inlong_stream_id, audit_id, audit_tag\n"
+
" ) t2\n" +
" GROUP BY ip, docker_id, thread_id, inlong_group_id, inlong_stream_id, audit_id, audit_tag, cnt, size, delay\n"
+
") t3\n" +
"GROUP BY inlong_group_id, inlong_stream_id, audit_id, audit_tag";

// Mysql query sql
public static final String KEY_MYSQL_SOURCE_QUERY_TEMP_SQL = "mysql.query.temp.sql";
public static final String DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL =
"SELECT inlong_group_id, inlong_stream_id, audit_id, audit_tag\n" +
", sum(count) AS cnt, sum(size) AS size\n" +
", sum(delay) AS delay\n" +
"FROM audit_data_temp\n" +
"WHERE log_ts BETWEEN ? AND ? \n" +
"AND audit_id = ? \n" +
"GROUP BY inlong_group_id, inlong_stream_id, audit_id, audit_tag";

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
* limitations under the License.
*/

package elector;
package org.apache.inlong.audit.elector;

import org.apache.inlong.audit.elector.api.SelectorChangeListener;

import elector.api.SelectorChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package elector.api;
package org.apache.inlong.audit.elector.api;

/**
* Selector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package elector.api;
package org.apache.inlong.audit.elector.api;

/**
* Selector change listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package elector.api;
package org.apache.inlong.audit.elector.api;

import lombok.Data;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package elector.api;
package org.apache.inlong.audit.elector.api;

import elector.impl.SelectorImpl;
import org.apache.inlong.audit.elector.impl.SelectorImpl;

/**
* Selector factory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
* limitations under the License.
*/

package elector.impl;
package org.apache.inlong.audit.elector.impl;

import org.apache.inlong.audit.config.ConfigConstants;
import org.apache.inlong.audit.config.SqlConstants;
import org.apache.inlong.audit.elector.api.SelectorConfig;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import elector.api.SelectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,28 +32,17 @@
import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicInteger;

import static config.ConfigConstants.CACHE_PREP_STMTS;
import static config.ConfigConstants.MAX_INIT_COUNT;
import static config.ConfigConstants.PREP_STMT_CACHE_SIZE;
import static config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
import static config.SqlConstants.IS_LEADER_SQL;
import static config.SqlConstants.RELEASE_SQL;
import static config.SqlConstants.REPLACE_LEADER_SQL;
import static config.SqlConstants.SEARCH_CURRENT_LEADER_SQL;
import static config.SqlConstants.SELECTOR_SQL;
import static config.SqlConstants.SELECT_TEST_SQL;

/**
* DB data source
*/
public class DBDataSource {

private static final Logger logger = LoggerFactory.getLogger(DBDataSource.class);
private String selectorSql = SELECTOR_SQL;
private String replaceLeaderSql = REPLACE_LEADER_SQL;
private String reLeaseSql = RELEASE_SQL;
private String isLeaderSql = IS_LEADER_SQL;
private String searchCurrentLeaderSql = SEARCH_CURRENT_LEADER_SQL;
private String selectorSql = SqlConstants.SELECTOR_SQL;
private String replaceLeaderSql = SqlConstants.REPLACE_LEADER_SQL;
private String reLeaseSql = SqlConstants.RELEASE_SQL;
private String isLeaderSql = SqlConstants.IS_LEADER_SQL;
private String searchCurrentLeaderSql = SqlConstants.SEARCH_CURRENT_LEADER_SQL;
private final SelectorConfig selectorConfig;
private HikariDataSource datasource;
public AtomicInteger getConnectionFailTimes;
Expand Down Expand Up @@ -90,7 +82,7 @@ public void initDataSource() throws Exception {
boolean initSucc = false;
int initCount = 0;

while (!initSucc && initCount < MAX_INIT_COUNT) {
while (!initSucc && initCount < ConfigConstants.MAX_INIT_COUNT) {
try {
++initCount;
if (datasource == null || datasource.isClosed()) {
Expand All @@ -104,11 +96,12 @@ public void initDataSource() throws Exception {
config.setAutoCommit(true);
config.setConnectionTimeout((long) selectorConfig.getConnectionTimeout());
config.setMaxLifetime((long) selectorConfig.getMaxLifetime());
config.addDataSourceProperty(CACHE_PREP_STMTS, selectorConfig.getCachePrepStmts());
config.addDataSourceProperty(PREP_STMT_CACHE_SIZE, selectorConfig.getPrepStmtCacheSize());
config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
config.addDataSourceProperty(ConfigConstants.CACHE_PREP_STMTS, selectorConfig.getCachePrepStmts());
config.addDataSourceProperty(ConfigConstants.PREP_STMT_CACHE_SIZE,
selectorConfig.getPrepStmtCacheSize());
config.addDataSourceProperty(ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT,
selectorConfig.getPrepStmtCacheSqlLimit());
config.setConnectionTestQuery(SELECT_TEST_SQL);
config.setConnectionTestQuery(SqlConstants.SELECT_TEST_SQL);
datasource = new HikariDataSource(config);
}

Expand Down
Loading

0 comments on commit 313da9a

Please sign in to comment.