Skip to content

Commit

Permalink
[INLONG-10132][Audit] Clean up ElasticSearch and ClickHouse related c…
Browse files Browse the repository at this point in the history
…ode of audit-store (apache#10134)
  • Loading branch information
doleyzi authored May 8, 2024
1 parent 3c703dd commit aa674dc
Show file tree
Hide file tree
Showing 28 changed files with 64 additions and 1,572 deletions.
27 changes: 5 additions & 22 deletions inlong-audit/audit-docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,11 @@ ENV TUBE_AUDIT_TOPIC="inlong-audit"
ENV AUDIT_DBNAME="apache_inlong_audit"
# proxy/store/all, start audit module individually, or all
ENV START_MODE="all"
# mysql / clickhouse / elasticsearch / starrocks
ENV STORE_MODE=mysql
# mysql
ENV JDBC_URL=127.0.0.1:3306
ENV USERNAME=root
ENV PASSWORD=inlong
# clickhouse
ENV STORE_CK_URL=127.0.0.1:8123
ENV STORE_CK_USERNAME=default
ENV STORE_CK_PASSWD=default
ENV STORE_CK_DBNAME="apache_inlong_audit"
# elasticsearch
ENV STORE_ES_HOST=127.0.0.1
ENV STORE_ES_PORT=9200
ENV STORE_ES_AUTHENABLE=false
ENV STORE_ES_USERNAME=elastic
ENV STORE_ES_PASSWD=inlong
# starrocks
ENV STORE_SR_URL=127.0.0.1:9030
ENV STORE_SR_USERNAME=default
ENV STORE_SR_PASSWD=default
ENV STORE_SR_DBNAME="apache_inlong_audit"
# MySQL / StarRocks
ENV AUDIT_JDBC_URL=127.0.0.1:3306
ENV AUDIT_JDBC_USERNAME=root
ENV AUDIT_JDBC_PASSWORD=inlong

# jvm
ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport -XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0 -XX:-UseAdaptiveSizePolicy"
WORKDIR /opt/inlong-audit
Expand Down
61 changes: 21 additions & 40 deletions inlong-audit/audit-docker/audit-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@
#

file_path=$(cd "$(dirname "$0")"/../;pwd)
# store config
store_conf_file=${file_path}/conf/application.properties

#SQL file
sql_mysql_file="${file_path}"/sql/apache_inlong_audit_mysql.sql

# proxy config
proxy_conf_file=${file_path}/conf/audit-proxy-${MQ_TYPE}.conf
sql_mysql_file="${file_path}"/sql/apache_inlong_audit_mysql.sql
sql_ck_file="${file_path}"/sql/apache_inlong_audit_clickhouse.sql
sql_sr_file="${file_path}"/sql/apache_inlong_audit_starrocks.sql

# store config
store_conf_file=${file_path}/conf/application.properties

# audit-service config
service_conf_file=${file_path}/conf/audit-service.properties

# replace the configuration for audit proxy
# replace the configuration for audit-proxy
sed -i "s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g" "${store_conf_file}"
sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${store_conf_file}"
if [ "${MQ_TYPE}" = "pulsar" ]; then
Expand All @@ -47,51 +49,29 @@ if [ "${MQ_TYPE}" = "tubemq" ]; then
sed -i "s/agent1.sinks.tube-sink-msg2.topic = .*$/agent1.sinks.tube-sink-msg2.topic = ${TUBE_AUDIT_TOPIC}/g" "${proxy_conf_file}"
fi

# replace the configuration for audit store
if [ -n "${STORE_MODE}" ]; then
sed -i "s/audit.config.store.mode=.*$/audit.config.store.mode=${STORE_MODE}/g" "${store_conf_file}"
fi
# DB
sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}"
sed -i "s/spring.datasource.druid.username=.*$/spring.datasource.druid.username=${USERNAME}/g" "${store_conf_file}"
sed -i "s/spring.datasource.druid.password=.*$/spring.datasource.druid.password=${PASSWORD}/g" "${store_conf_file}"
# mysql file for audit
# replace the audit db name for audit sql file
sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_mysql_file}"
# clickhouse
sed -i "s/clickhouse.url=.*$/clickhouse.url=jdbc:clickhouse:\/\/${STORE_CK_URL}\/${STORE_CK_DBNAME}/g" "${store_conf_file}"
sed -i "s/clickhouse.username=.*$/clickhouse.username=${STORE_CK_USERNAME}/g" "${store_conf_file}"
sed -i "s/clickhouse.password=.*$/clickhouse.password=${STORE_CK_PASSWD}/g" "${store_conf_file}"
# mysql file for clickhouse
sed -i "s/apache_inlong_audit/${STORE_CK_DBNAME}/g" "${sql_ck_file}"
# elasticsearch
sed -i "s/elasticsearch.host=.*$/elasticsearch.host=${STORE_ES_HOST}/g" "${store_conf_file}"
sed -i "s/elasticsearch.port=.*$/elasticsearch.port=${STORE_ES_PORT}/g" "${store_conf_file}"
sed -i "s/elasticsearch.authEnable=.*$/elasticsearch.authEnable=${STORE_ES_AUTHENABLE}/g" "${store_conf_file}"
sed -i "s/elasticsearch.username=.*$/elasticsearch.username=${STORE_ES_USERNAME}/g" "${store_conf_file}"
sed -i "s/elasticsearch.password=.*$/elasticsearch.password=${STORE_ES_PASSWD}/g" "${store_conf_file}"

# StarRocks SQL file for audit
sed -i "s/apache_inlong_audit/${AUDIT_DBNAME}/g" "${sql_sr_file}"
# StarRocks
sed -i "s/jdbc.url=.*$/jdbc.url=jdbc:mysql:\/\/${STORE_SR_URL}\/${STORE_SR_DBNAME}/g" "${store_conf_file}"
sed -i "s/jdbc.username=.*$/jdbc.username=${STORE_SR_USERNAME}/g" "${store_conf_file}"
sed -i "s/jdbc.password=.*$/jdbc.password=${STORE_SR_PASSWD}/g" "${store_conf_file}"
# replace the configuration for audit-store
sed -i "s/127.0.0.1:3306\/apache_inlong_audit/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g" "${store_conf_file}"
sed -i "s/jdbc.username=.*$/jdbc.username=${AUDIT_JDBC_USERNAME}/g" "${store_conf_file}"
sed -i "s/jdbc.password=.*$/jdbc.password=${AUDIT_JDBC_PASSWORD}/g" "${store_conf_file}"

# audit-service config
sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}"
sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${USERNAME}/g" "${service_conf_file}"
sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${PASSWORD}/g" "${service_conf_file}"
# replace the configuration for audit-service
sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}"
sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_JDBC_USERNAME}/g" "${service_conf_file}"
sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_JDBC_PASSWORD}/g" "${service_conf_file}"

# Whether the database table exists. If it does not exist, initialize the database and skip if it exists.
if [[ "${JDBC_URL}" =~ (.+):([0-9]+) ]]; then
if [[ "${AUDIT_JDBC_URL}" =~ (.+):([0-9]+) ]]; then
datasource_hostname=${BASH_REMATCH[1]}
datasource_port=${BASH_REMATCH[2]}

select_db_sql="SELECT COUNT(*) FROM information_schema.TABLES WHERE table_schema = 'apache_inlong_audit'"
inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${USERNAME} -p${PASSWORD} -e "${select_db_sql}")
inlong_audit_count=$(mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_JDBC_USERNAME} -p${AUDIT_JDBC_PASSWORD} -e "${select_db_sql}")
inlong_num=$(echo "$inlong_audit_count" | tr -cd "[0-9]")
if [ "${inlong_num}" = 0 ]; then
mysql -h${datasource_hostname} -P${datasource_port} -u${USERNAME} -p${PASSWORD} < sql/apache_inlong_audit_mysql.sql
mysql -h${datasource_hostname} -P${datasource_port} -u${AUDIT_JDBC_USERNAME} -p${AUDIT_JDBC_PASSWORD} < sql/apache_inlong_audit_mysql.sql
fi
fi

Expand All @@ -108,6 +88,7 @@ if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "proxy" ]; then
bash +x ./bin/proxy-start.sh tubemq
fi
fi

# start store
if [ "${START_MODE}" = "all" ] || [ "${START_MODE}" = "store" ]; then
bash +x ./bin/store-start.sh
Expand Down
6 changes: 0 additions & 6 deletions inlong-audit/audit-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>audit-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
Expand All @@ -48,6 +51,8 @@
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL;
Expand All @@ -68,8 +73,12 @@ public class RealTimeQuery {
private final String queryLogTsSql;
private final String queryIdsByIpSql;
private final String queryReportIpsSql;
private final ExecutorService executor =
Executors.newFixedThreadPool(
Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE, DEFAULT_API_THREAD_POOL_SIZE));

private RealTimeQuery() {

List<JdbcConfig> jdbcConfigList = ConfigService.getInstance().getAllAuditSource();
for (JdbcConfig jdbcConfig : jdbcConfigList) {
BasicDataSource dataSource = new BasicDataSource();
Expand Down Expand Up @@ -126,7 +135,7 @@ public static RealTimeQuery getInstance() {
public List<StatData> queryLogTs(String startTime, String endTime, String inlongGroupId,
String inlongStreamId, String auditId) {
long currentTime = System.currentTimeMillis();
List<StatData> statDataList = new LinkedList<>();
List<StatData> statDataList = new CopyOnWriteArrayList<>();
if (dataSourceList.isEmpty()) {
return statDataList;
}
Expand All @@ -136,11 +145,11 @@ public List<StatData> queryLogTs(String startTime, String endTime, String inlong
List<StatData> statDataListTemp =
doQueryLogTs(dataSource, startTime, endTime, inlongGroupId, inlongStreamId, auditId);
statDataList.addAll(statDataListTemp);
});
}, executor);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms", startTime, endTime, inlongGroupId,
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
LOGGER.info("Query log ts by params: {} {} {} {} {}, total cost {} ms", startTime, endTime, inlongGroupId,
inlongStreamId, auditId, System.currentTimeMillis() - currentTime);
return filterMaxAuditVersion(statDataList);
}
Expand All @@ -165,8 +174,7 @@ public List<StatData> filterMaxAuditVersion(List<StatData> allStatData) {
for (Map.Entry<String, List<StatData>> entry : allData.entrySet()) {
long maxAuditVersion = Long.MIN_VALUE;
for (StatData maxData : entry.getValue()) {
maxAuditVersion =
maxData.getAuditVersion() > maxAuditVersion ? maxData.getAuditVersion() : maxAuditVersion;
maxAuditVersion = Math.max(maxData.getAuditVersion(), maxAuditVersion);
}
for (StatData statData : entry.getValue()) {
if (statData.getAuditVersion() == maxAuditVersion) {
Expand All @@ -191,6 +199,7 @@ public List<StatData> filterMaxAuditVersion(List<StatData> allStatData) {
*/
private List<StatData> doQueryLogTs(DataSource dataSource, String startTime, String endTime, String inlongGroupId,
String inlongStreamId, String auditId) {
long currentTime = System.currentTimeMillis();
List<StatData> result = new LinkedList<>();
try (Connection connection = dataSource.getConnection();
PreparedStatement pstat = connection.prepareStatement(queryLogTsSql)) {
Expand Down Expand Up @@ -219,6 +228,8 @@ private List<StatData> doQueryLogTs(DataSource dataSource, String startTime, Str
} catch (Exception exception) {
LOGGER.error("Query log time has exception!, datasource={} ", dataSource, exception);
}
LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms", startTime, endTime, inlongGroupId,
inlongStreamId, auditId, System.currentTimeMillis() - currentTime);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class OpenApiConstants {
public static final int DEFAULT_API_CACHE_MAX_SIZE = 50000000;

public static final String KEY_API_CACHE_EXPIRED_HOURS = "api.cache.expired.hours";
public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 18;

// Http config
public static final String PARAMS_START_TIME = "startTime";
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit aa674dc

Please sign in to comment.