Skip to content

Commit

Permalink
[INLONG-10085][Audit] Optimize the performance of the audit-service (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi authored Apr 26, 2024
1 parent 7012648 commit a2e0f46
Show file tree
Hide file tree
Showing 15 changed files with 379 additions and 229 deletions.
6 changes: 6 additions & 0 deletions inlong-audit/audit-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>audit-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,23 @@
import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.AuditCycle;
import org.apache.inlong.audit.entities.StatData;
import org.apache.inlong.audit.utils.CacheUtils;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT;
import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE;
import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_HOURS;
Expand All @@ -50,6 +54,9 @@ public class AbstractCache {
protected AuditCycle auditCycle;
private static final int DEFAULT_MONITOR_INTERVAL = 1;

// According to the startTime and endTime of the request parameters, the maximum number of cache keys generated.
private static final int MAX_CACHE_KEY_SIZE = 1440;

protected AbstractCache(AuditCycle auditCycle) {
cache = Caffeine.newBuilder()
.maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE,
Expand Down Expand Up @@ -77,18 +84,55 @@ public Cache<String, StatData> getCache() {
}

/**
* Get data
*
* @param key
* @param startTime
* @param endTime
* @param inlongGroupId
* @param inlongStreamId
* @param auditId
* @param auditTag
* @return
*/
public List<StatData> getData(String key) {
StatData statData = cache.getIfPresent(key);
if (null == statData) {
// Compatible with scenarios where the auditTag openapi parameter can be empty.
statData = cache.getIfPresent(key + DEFAULT_AUDIT_TAG);
public List<StatData> getData(String startTime, String endTime, String inlongGroupId,
String inlongStreamId, String auditId, String auditTag) {
List<StatData> result = new LinkedList<>();
List<String> keyList = buildCacheKeyList(startTime, endTime, inlongGroupId,
inlongStreamId, auditId, auditTag);
for (String cacheKey : keyList) {
StatData statData = cache.getIfPresent(cacheKey);
if (null == statData) {
// Compatible with scenarios where the auditTag openapi parameter can be empty.
statData = cache.getIfPresent(cacheKey + DEFAULT_AUDIT_TAG);
}
if (null != statData) {
result.add(statData);
}
}
return result;
}

private List<String> buildCacheKeyList(String startTime, String endTime, String inlongGroupId,
String inlongStreamId, String auditId, String auditTag) {
List<String> keyList = new LinkedList<>();
try {
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
Date startDate = dateFormat.parse(startTime);
Date endDate = dateFormat.parse(endTime);
for (int index = 0; index < MAX_CACHE_KEY_SIZE; index++) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(startDate);
calendar.add(Calendar.MINUTE, index * auditCycle.getValue());
calendar.set(Calendar.SECOND, 0);
if (calendar.getTime().compareTo(endDate) > 0) {
break;
}
String time = dateFormat.format(calendar.getTime());
keyList.add(CacheUtils.buildCacheKey(time, inlongGroupId, inlongStreamId, auditId, auditTag));
}
} catch (Exception exception) {
LOGGER.error("It has exception when build cache key list!", exception);
}
return statData == null ? new LinkedList<>() : Collections.singletonList(statData);
return keyList;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.inlong.audit.entities.JdbcConfig;
import org.apache.inlong.audit.entities.StatData;
import org.apache.inlong.audit.service.ConfigService;
import org.apache.inlong.audit.utils.CacheUtils;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.dbcp.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,22 +33,21 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT;
import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL;
import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL;
Expand All @@ -64,7 +63,7 @@ public class RealTimeQuery {
private static final Logger LOGGER = LoggerFactory.getLogger(RealTimeQuery.class);
private static volatile RealTimeQuery realTimeQuery = null;

private final List<DataSource> dataSourceList = new LinkedList<>();
private final List<BasicDataSource> dataSourceList = new LinkedList<>();

private final String queryLogTsSql;
private final String queryIdsByIpSql;
Expand All @@ -73,7 +72,26 @@ public class RealTimeQuery {
private RealTimeQuery() {
List<JdbcConfig> jdbcConfigList = ConfigService.getInstance().getAllAuditSource();
for (JdbcConfig jdbcConfig : jdbcConfigList) {
dataSourceList.add(createDataSource(jdbcConfig));
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName(jdbcConfig.getDriverClass());
dataSource.setUrl(jdbcConfig.getJdbcUrl());
dataSource.setUsername(jdbcConfig.getUserName());
dataSource.setPassword(jdbcConfig.getPassword());
dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS,
DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS));
dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS,
DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS));
dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS,
DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS));
dataSource.setTestOnBorrow(true);
dataSource.setValidationQuery("SELECT 1");
dataSource
.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS,
DEFAULT_DATASOURCE_DETECT_INTERVAL_MS));

dataSourceList.add(dataSource);
}

queryLogTsSql = Configuration.getInstance().get(KEY_SOURCE_QUERY_MINUTE_SQL,
Expand All @@ -95,29 +113,6 @@ public static RealTimeQuery getInstance() {
return realTimeQuery;
}

/**
* Create data source.
*/
private DataSource createDataSource(JdbcConfig jdbcConfig) {
HikariConfig config = new HikariConfig();
config.setDriverClassName(jdbcConfig.getDriverClass());
config.setJdbcUrl(jdbcConfig.getJdbcUrl());
config.setUsername(jdbcConfig.getUserName());
config.setPassword(jdbcConfig.getPassword());
config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT,
DEFAULT_CONNECTION_TIMEOUT));
config.addDataSourceProperty(CACHE_PREP_STMTS,
Configuration.getInstance().get(KEY_CACHE_PREP_STMTS, DEFAULT_CACHE_PREP_STMTS));
config.addDataSourceProperty(PREP_STMT_CACHE_SIZE,
Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE, DEFAULT_PREP_STMT_CACHE_SIZE));
config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT,
Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT, DEFAULT_PREP_STMT_CACHE_SQL_LIMIT));
config.setMaximumPoolSize(
Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE,
DEFAULT_DATASOURCE_POOL_SIZE));
return new HikariDataSource(config);
}

/**
* Query the audit data of log time.
*
Expand All @@ -130,17 +125,57 @@ private DataSource createDataSource(JdbcConfig jdbcConfig) {
*/
public List<StatData> queryLogTs(String startTime, String endTime, String inlongGroupId,
String inlongStreamId, String auditId) {
long currentTime = System.currentTimeMillis();
List<StatData> statDataList = new LinkedList<>();
if (dataSourceList.isEmpty()) {
return statDataList;
}
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (DataSource dataSource : dataSourceList) {
statDataList =
doQueryLogTs(dataSource, startTime, endTime, inlongGroupId, inlongStreamId, auditId);
if (!statDataList.isEmpty()) {
break;
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
List<StatData> statDataListTemp =
doQueryLogTs(dataSource, startTime, endTime, inlongGroupId, inlongStreamId, auditId);
statDataList.addAll(statDataListTemp);
});
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
LOGGER.info("Query log ts by params: {} {} {} {} {}, cost {} ms", startTime, endTime, inlongGroupId,
inlongStreamId, auditId, System.currentTimeMillis() - currentTime);
return filterMaxAuditVersion(statDataList);
}

/**
* @param allStatData
* @return
*/
public List<StatData> filterMaxAuditVersion(List<StatData> allStatData) {
HashMap<String, List<StatData>> allData = new HashMap<>();
for (StatData statData : allStatData) {
String dataKey = CacheUtils.buildCacheKey(
statData.getLogTs(),
statData.getInlongGroupId(),
statData.getInlongStreamId(),
statData.getAuditId(),
statData.getAuditTag());
List<StatData> statDataList = allData.computeIfAbsent(dataKey, k -> new LinkedList<>());
statDataList.add(statData);
}
List<StatData> result = new LinkedList<>();
for (Map.Entry<String, List<StatData>> entry : allData.entrySet()) {
long maxAuditVersion = Long.MIN_VALUE;
for (StatData maxData : entry.getValue()) {
maxAuditVersion =
maxData.getAuditVersion() > maxAuditVersion ? maxData.getAuditVersion() : maxAuditVersion;
}
for (StatData statData : entry.getValue()) {
if (statData.getAuditVersion() == maxAuditVersion) {
result.add(statData);
break;
}
}
LOGGER.info("Change another audit source to query data! Params is: {} {} {} {} {}",
startTime, endTime, inlongGroupId, inlongStreamId, auditId);
}
return statDataList;
return result;
}

/**
Expand Down Expand Up @@ -175,6 +210,7 @@ private List<StatData> doQueryLogTs(DataSource dataSource, String startTime, Str
data.setCount(resultSet.getLong(6));
data.setSize(resultSet.getLong(7));
data.setDelay(resultSet.getLong(8));
data.setAuditVersion(resultSet.getLong(9));
result.add(data);
}
} catch (SQLException sqlException) {
Expand Down Expand Up @@ -203,6 +239,8 @@ public List<StatData> queryIdsByIp(String startTime, String endTime, String ip,
break;
}
}
LOGGER.info("Query ids by params:{} {} {} {}, result size:{} ", startTime,
endTime, ip, auditId, statDataList.size());
return statDataList;
}

Expand Down Expand Up @@ -265,6 +303,8 @@ public List<StatData> queryIpsById(String startTime, String endTime, String inlo
break;
}
}
LOGGER.info("Query ips by params:{} {} {} {} {}, result size:{} ",
startTime, endTime, inlongGroupId, inlongStreamId, auditId, statDataList.size());
return statDataList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,24 @@ public class ConfigConstants {
public static final String KEY_MYSQL_USERNAME = "mysql.username";
public static final String KEY_MYSQL_PASSWORD = "mysql.password";

public static final String KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS = "datasource.max.total.connections";
public static final int DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS = 10;

public static final String KEY_DATASOURCE_MAX_IDLE_CONNECTIONS = "datasource.max.idle.connections";
public static final int DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS = 2;

public static final String KEY_DATASOURCE_MIN_IDLE_CONNECTIONS = "datasource.min.idle.connections";
public static final int DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS = 1;

public static final String KEY_DATASOURCE_DETECT_INTERVAL_MS = "datasource.detect.interval.ms";
public static final int DEFAULT_DATASOURCE_DETECT_INTERVAL_MS = 60000;

// Time config
public static final String KEY_DATASOURCE_CONNECTION_TIMEOUT = "datasource.connection.timeout.ms";
public static final int DEFAULT_CONNECTION_TIMEOUT = 1000 * 60 * 5;
public static final String KEY_QUEUE_PULL_TIMEOUT = "queue.pull.timeout.ms";
public static final int DEFAULT_QUEUE_PULL_TIMEOUT = 1000;
public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

// Interval config
public static final String KEY_SOURCE_DB_STAT_INTERVAL = "source.db.stat.interval.minute";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class OpenApiConstants {
public static final String DEFAULT_API_GET_IPS_PATH = "/audit/query/getIps";
public static final String KEY_API_GET_IDS_PATH = "api.get.ids.path";
public static final String DEFAULT_API_GET_IDS_PATH = "/audit/query/getIds";
public static final String KEY_API_POOL_SIZE = "api.pool.size";
public static final int DEFAULT_POOL_SIZE = 10;
public static final String KEY_API_THREAD_POOL_SIZE = "api.thread.pool.size";
public static final int DEFAULT_API_THREAD_POOL_SIZE = 10;
public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
public static final int DEFAULT_API_BACKLOG_SIZE = 100;
public static final String KEY_API_REAL_LIMITER_QPS = "api.real.limiter.qps";
Expand All @@ -61,7 +61,8 @@ public class OpenApiConstants {
public static final String KEY_HTTP_BODY_ERR_DATA = "data";
public static final String KEY_HTTP_HEADER_CONTENT_TYPE = "Content-Type";
public static final String VALUE_HTTP_HEADER_CONTENT_TYPE = "application/json;charset=utf-8";
public static final int BIND_PORT = 80;
public static final String KEY_HTTP_SERVER_BIND_PORT = "api.http.server.bind.port";
public static final int DEFAULT_HTTP_SERVER_BIND_PORT = 10080;
public static final int HTTP_RESPOND_CODE = 200;
public static final String DEFAULT_PARAMS_AUDIT_TAG = "";
}
Loading

0 comments on commit a2e0f46

Please sign in to comment.