diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java index d5afb1306e5..d74e162f782 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java @@ -57,9 +57,6 @@ public class ConfigConstants { public static final int DEFAULT_SOURCE_DB_SINK_BATCH = 1000; public static final String KEY_CONFIG_UPDATE_INTERVAL_SECONDS = "config.update.interval.seconds"; public static final int DEFAULT_CONFIG_UPDATE_INTERVAL_SECONDS = 60; - - public static final String KEY_ENABLE_MANAGE_PARTITIONS = "enable.manage.partitions"; - public static final boolean DEFAULT_ENABLE_MANAGE_PARTITIONS = true; public static final String KEY_CHECK_PARTITION_INTERVAL_HOURS = "check.partition.interval.hours"; public static final int DEFAULT_CHECK_PARTITION_INTERVAL_HOURS = 6; @@ -113,4 +110,7 @@ public class ConfigConstants { public static final int MAX_INIT_COUNT = 2; public static final int RANDOM_BOUND = 10; + public static final String KEY_ENABLE_STAT_AUDIT_DAY = "enable.stat.audit.day"; + public static final boolean DEFAULT_ENABLE_STAT_AUDIT_DAY = true; + } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java index b48368b9219..04fee349b5f 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java @@ -170,8 +170,12 @@ public class SqlConstants { public static final String KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL = "audit.data.temp.delete.partition.sql"; public static final String DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL = "ALTER TABLE audit_data_temp DROP PARTITION %s"; - - public static final String KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL = "audit.data.temp.check.partition.sql"; - public static final String DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL = - "SELECT COUNT(*) AS count FROM INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_NAME = 'audit_data_temp' and PARTITION_NAME = ?"; + public static final String KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL = "audit.data.check.partition.sql"; + public static final String DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL = + "SELECT COUNT(*) AS count FROM INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_NAME = '%s' and PARTITION_NAME = '%s'"; + public static final String KEY_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL = "audit.data.day.add.partition.sql"; + public static final String DEFAULT_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL = + "ALTER TABLE audit_data_day ADD PARTITION (PARTITION %s VALUES LESS THAN (TO_DAYS('%s')))"; + public static final String TABLE_AUDIT_DATA_DAY = "audit_data_day"; + public static final String TABLE_AUDIT_DATA_TEMP = "audit_data_temp"; } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/PartitionEntity.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/PartitionEntity.java new file mode 100644 index 00000000000..4704ea1031d --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/PartitionEntity.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.entities; + +import org.apache.inlong.audit.config.Configuration; + +import lombok.Data; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; + +import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL; +import static org.apache.inlong.audit.config.SqlConstants.KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL; + +@Data +public class PartitionEntity { + + private final String tableName; + private final String addPartitionStatement; + private final String deletePartitionStatement; + private final DateTimeFormatter FORMATTER_YYMMDDHH = DateTimeFormatter.ofPattern("yyyyMMdd"); + private final DateTimeFormatter FORMATTER_YY_MM_DD_HH = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + private String formatPartitionName(LocalDate date) { + return "p" + date.format(FORMATTER_YYMMDDHH); + } + + public PartitionEntity(String tableName, String addPartitionStatement, String deletePartitionStatement) { + this.tableName = tableName; + this.addPartitionStatement = addPartitionStatement; + this.deletePartitionStatement = deletePartitionStatement; + } + + public String getAddPartitionSql(long daysToAdd) { + String partitionValue = LocalDate.now().plusDays(daysToAdd + 1).format(FORMATTER_YY_MM_DD_HH); + return String.format(addPartitionStatement, getAddPartitionName(daysToAdd), partitionValue); + } + + public String getDeletePartitionSql(long daysToDelete) { + return String.format(deletePartitionStatement, getDeletePartitionName(daysToDelete)); + } + + public String getCheckPartitionSql(long partitionDay, boolean isDelete) { + String partitionName = isDelete ? getDeletePartitionName(partitionDay) : getAddPartitionName(partitionDay); + return String.format(Configuration.getInstance().get(KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL, + DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL), tableName, partitionName); + } + + public String getAddPartitionName(long daysToAdd) { + return formatPartitionName(LocalDate.now().plusDays(daysToAdd)); + } + + public String getDeletePartitionName(long daysToDelete) { + return formatPartitionName(LocalDate.now().minusDays(daysToDelete)); + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java index 88730a203a6..b43b72c0520 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/SourceConfig.java @@ -32,7 +32,7 @@ public class SourceConfig { private int statBackTimes; private final String driverClassName; private final String jdbcUrl; - private final String username; + private final String userName; private final String password; private boolean needJoin = false; @@ -48,7 +48,7 @@ public SourceConfig(AuditCycle auditCycle, this.statBackTimes = statBackTimes; this.driverClassName = driverClassName; this.jdbcUrl = jdbcUrl; - this.username = username; + this.userName = username; this.password = password; } } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java index 067667133de..7df068473a1 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java @@ -25,6 +25,7 @@ import org.apache.inlong.audit.service.ApiService; import org.apache.inlong.audit.service.ConfigService; import org.apache.inlong.audit.service.EtlService; +import org.apache.inlong.audit.service.PartitionManager; import org.apache.inlong.audit.utils.JdbcUtils; import org.apache.inlong.common.util.NetworkUtils; @@ -51,6 +52,8 @@ public static void main(String[] args) { // Periodically obtain audit id and audit course configuration from DB ConfigService.getInstance().start(); + PartitionManager.getInstance().start(); + // Etl service aggregate the data from the data source and store the aggregated data to the target storage etlService.start(); diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java index 95e1cddd75c..a5eb286a081 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/EtlService.java @@ -26,11 +26,14 @@ import org.apache.inlong.audit.entities.JdbcConfig; import org.apache.inlong.audit.entities.SinkConfig; import org.apache.inlong.audit.entities.SourceConfig; +import org.apache.inlong.audit.entities.StatData; +import org.apache.inlong.audit.sink.AuditSink; import org.apache.inlong.audit.sink.CacheSink; import org.apache.inlong.audit.sink.JdbcSink; import org.apache.inlong.audit.source.JdbcSource; import org.apache.inlong.audit.utils.JdbcUtils; +import com.github.benmanes.caffeine.cache.Cache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,10 +41,12 @@ import java.util.List; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATA_QUEUE_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_ENABLE_STAT_AUDIT_DAY; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES; import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATA_QUEUE_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_ENABLE_STAT_AUDIT_DAY; import static org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID; import static org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_DAILY_STAT_BACK_TIMES; import static org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_REALTIME_STAT_BACK_TIMES; @@ -60,113 +65,70 @@ public class EtlService { private static final Logger LOGGER = LoggerFactory.getLogger(EtlService.class); - private JdbcSource mysqlSourceOfTemp; - private JdbcSource mysqlSourceOfTenMinutesCache; - private JdbcSource mysqlSourceOfHalfHourCache; - private JdbcSource mysqlSourceOfHourCache; - private JdbcSink mysqlSinkOfDay; - private final List auditJdbcSources = new LinkedList<>(); - private JdbcSink mysqlSinkOfTemp; - private CacheSink cacheSinkOfTenMinutesCache; - private CacheSink cacheSinkOfHalfHourCache; - private CacheSink cacheSinkOfHourCache; + + // Statistics of original audit data + private final List originalSources = new LinkedList<>(); private final int queueSize; - private final int statBackTimes; private final String serviceId; + private final Configuration configuration; + + private final List dataFlowSources = new LinkedList<>(); + private final List dataFlowSinks = new LinkedList<>(); public EtlService() { - queueSize = Configuration.getInstance().get(KEY_DATA_QUEUE_SIZE, + configuration = Configuration.getInstance(); + queueSize = configuration.get(KEY_DATA_QUEUE_SIZE, DEFAULT_DATA_QUEUE_SIZE); - statBackTimes = Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES, - DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES); - serviceId = Configuration.getInstance().get(KEY_SELECTOR_SERVICE_ID, DEFAULT_SELECTOR_SERVICE_ID); + serviceId = configuration.get(KEY_SELECTOR_SERVICE_ID, DEFAULT_SELECTOR_SERVICE_ID); } - /** - * Start the etl service. - */ public void start() { - mysqlToMysqlOfDay(); - mysqlToTenMinutesCache(); - mysqlToHalfHourCache(); - mysqlToHourCache(); - } - - /** - * Aggregate data from mysql data source and store the aggregated data in the target mysql table. - * The audit data cycle is days,and stored in table of day. - */ - private void mysqlToMysqlOfDay() { - DataQueue dataQueue = new DataQueue(queueSize); - - mysqlSourceOfTemp = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.DAY, - Configuration.getInstance().get(KEY_SUMMARY_DAILY_STAT_BACK_TIMES, - DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES))); - mysqlSourceOfTemp.start(); - - SinkConfig sinkConfig = buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_DAY_SQL, - DEFAULT_MYSQL_SINK_INSERT_DAY_SQL)); - mysqlSinkOfDay = new JdbcSink(dataQueue, sinkConfig); - mysqlSinkOfDay.start(); - } - - /** - * Aggregate data from mysql data source and store in local cache for openapi. - */ - private void mysqlToTenMinutesCache() { - DataQueue dataQueue = new DataQueue(queueSize); - mysqlSourceOfTenMinutesCache = - new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.MINUTE_10, statBackTimes)); - mysqlSourceOfTenMinutesCache.start(); - - cacheSinkOfTenMinutesCache = new CacheSink(dataQueue, TenMinutesCache.getInstance().getCache()); - cacheSinkOfTenMinutesCache.start(); - } + int statBackTimes = configuration.get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES, + DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES); - /** - * Aggregate data from mysql data source and store in local cache for openapi. - */ - private void mysqlToHalfHourCache() { - DataQueue dataQueue = new DataQueue(queueSize); - mysqlSourceOfHalfHourCache = - new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.MINUTE_30, statBackTimes)); - mysqlSourceOfHalfHourCache.start(); + startDataFlow(AuditCycle.MINUTE_10, statBackTimes, TenMinutesCache.getInstance().getCache()); + startDataFlow(AuditCycle.MINUTE_30, statBackTimes, HalfHourCache.getInstance().getCache()); + startDataFlow(AuditCycle.HOUR, statBackTimes, HourCache.getInstance().getCache()); - cacheSinkOfHalfHourCache = new CacheSink(dataQueue, HalfHourCache.getInstance().getCache()); - cacheSinkOfHalfHourCache.start(); + if (configuration.get(KEY_ENABLE_STAT_AUDIT_DAY, DEFAULT_ENABLE_STAT_AUDIT_DAY)) { + statBackTimes = configuration.get(KEY_SUMMARY_DAILY_STAT_BACK_TIMES, DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES); + startDataFlow(AuditCycle.DAY, statBackTimes, null); + } } - /** - * Aggregate data from mysql data source and store in local cache for openapi. - */ - private void mysqlToHourCache() { + private void startDataFlow(AuditCycle cycle, int backTimes, Cache cache) { DataQueue dataQueue = new DataQueue(queueSize); - mysqlSourceOfHourCache = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.HOUR, statBackTimes)); - mysqlSourceOfHourCache.start(); - - cacheSinkOfHourCache = new CacheSink(dataQueue, HourCache.getInstance().getCache()); - cacheSinkOfHourCache.start(); + JdbcSource source = new JdbcSource(dataQueue, buildMysqlSourceConfig(cycle, backTimes)); + source.start(); + dataFlowSources.add(source); + + AuditSink sink; + if (cache != null) { + sink = new CacheSink(dataQueue, cache); + } else { + SinkConfig sinkConfig = buildMysqlSinkConfig(configuration.get(KEY_MYSQL_SINK_INSERT_DAY_SQL, + DEFAULT_MYSQL_SINK_INSERT_DAY_SQL)); + sink = new JdbcSink(dataQueue, sinkConfig); + } + sink.start(); + dataFlowSinks.add(sink); } - /** - * Aggregate data from clickhouse data source and store the aggregated data in the target mysql table. - * The default audit data cycle is 5 minutes,and stored in a temporary table. - * Support multiple audit source clusters. - */ public void auditSourceToMysql() { DataQueue dataQueue = new DataQueue(queueSize); List sourceList = ConfigService.getInstance().getAuditSourceByServiceId(serviceId); for (JdbcConfig jdbcConfig : sourceList) { JdbcSource jdbcSource = new JdbcSource(dataQueue, buildAuditJdbcSourceConfig(jdbcConfig)); jdbcSource.start(); - auditJdbcSources.add(jdbcSource); + originalSources.add(jdbcSource); LOGGER.info("Audit source to mysql jdbc config:{}", jdbcConfig); } - SinkConfig sinkConfig = buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_TEMP_SQL, + SinkConfig sinkConfig = buildMysqlSinkConfig(configuration.get(KEY_MYSQL_SINK_INSERT_TEMP_SQL, DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL)); - mysqlSinkOfTemp = new JdbcSink(dataQueue, sinkConfig); - mysqlSinkOfTemp.start(); + JdbcSink sink = new JdbcSink(dataQueue, sinkConfig); + sink.start(); + dataFlowSinks.add(sink); } /** @@ -193,7 +155,7 @@ private SinkConfig buildMysqlSinkConfig(String insertSql) { private SourceConfig buildMysqlSourceConfig(AuditCycle auditCycle, int statBackTimes) { JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig(); return new SourceConfig(auditCycle, - Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL, + configuration.get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL, DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL), statBackTimes, jdbcConfig.getDriverClass(), @@ -209,9 +171,9 @@ private SourceConfig buildMysqlSourceConfig(AuditCycle auditCycle, int statBackT */ private SourceConfig buildAuditJdbcSourceConfig(JdbcConfig jdbcConfig) { return new SourceConfig(AuditCycle.MINUTE_5, - Configuration.getInstance().get(KEY_SOURCE_STAT_SQL, + configuration.get(KEY_SOURCE_STAT_SQL, DEFAULT_SOURCE_STAT_SQL), - Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES, + configuration.get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES, DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES), jdbcConfig.getDriverClass(), jdbcConfig.getJdbcUrl(), @@ -224,21 +186,14 @@ private SourceConfig buildAuditJdbcSourceConfig(JdbcConfig jdbcConfig) { * Stop the etl service,and destroy related resources. */ public void stop() { - mysqlSourceOfTemp.destroy(); - mysqlSinkOfDay.destroy(); - - for (JdbcSource source : auditJdbcSources) { + for (JdbcSource source : originalSources) { source.destroy(); } - if (null != mysqlSinkOfTemp) - mysqlSinkOfTemp.destroy(); - - mysqlSourceOfTenMinutesCache.destroy(); - mysqlSourceOfHalfHourCache.destroy(); - mysqlSourceOfHourCache.destroy(); - - cacheSinkOfTenMinutesCache.destroy(); - cacheSinkOfHalfHourCache.destroy(); - cacheSinkOfHourCache.destroy(); + for (JdbcSource source : dataFlowSources) { + source.destroy(); + } + for (AuditSink sink : dataFlowSinks) { + sink.destroy(); + } } } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/PartitionManager.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/PartitionManager.java new file mode 100644 index 00000000000..5238bc1d05a --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/PartitionManager.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.service; + +import org.apache.inlong.audit.config.Configuration; +import org.apache.inlong.audit.entities.JdbcConfig; +import org.apache.inlong.audit.entities.PartitionEntity; +import org.apache.inlong.audit.utils.JdbcUtils; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CHECK_PARTITION_INTERVAL_HOURS; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_DATA_TEMP_STORAGE_DAYS; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_CHECK_PARTITION_INTERVAL_HOURS; +import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL; +import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL; +import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL; +import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL; +import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL; +import static org.apache.inlong.audit.config.SqlConstants.KEY_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL; +import static org.apache.inlong.audit.config.SqlConstants.TABLE_AUDIT_DATA_DAY; +import static org.apache.inlong.audit.config.SqlConstants.TABLE_AUDIT_DATA_TEMP; + +public class PartitionManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class); + private static volatile PartitionManager partitionManager = null; + private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(); + private DataSource dataSource; + private final PartitionEntity auditDayTable; + private final PartitionEntity auditTempTable; + private final Configuration configuration; + + public static PartitionManager getInstance() { + if (partitionManager == null) { + synchronized (PartitionManager.class) { + if (partitionManager == null) { + partitionManager = new PartitionManager(); + } + } + } + return partitionManager; + } + + private PartitionManager() { + configuration = Configuration.getInstance(); + createDataSource(); + auditDayTable = createAndAddPartition(TABLE_AUDIT_DATA_DAY, + KEY_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL, + DEFAULT_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL, + null, + null); + auditTempTable = createAndAddPartition(TABLE_AUDIT_DATA_TEMP, + KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL, + DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL, + KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL, + DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL); + } + + private PartitionEntity createAndAddPartition(String tableName, + String addPartitionKey, + String defaultAddPartitionSql, + String deletePartitionKey, + String defaultDeletePartitionSql) { + String addPartitionSql = configuration.get(addPartitionKey, defaultAddPartitionSql); + String deletePartitionSql = + deletePartitionKey != null ? configuration.get(deletePartitionKey, defaultDeletePartitionSql) : null; + PartitionEntity partitionEntity = new PartitionEntity(tableName, addPartitionSql, deletePartitionSql); + addPartition(partitionEntity, 0); + return partitionEntity; + } + + public void start() { + long intervalHours = + configuration.get(KEY_CHECK_PARTITION_INTERVAL_HOURS, DEFAULT_CHECK_PARTITION_INTERVAL_HOURS); + timer.scheduleWithFixedDelay(this::executePartitionManagement, 0, intervalHours, TimeUnit.HOURS); + } + + private void executePartitionManagement() { + try { + managePartition(auditDayTable, false); + managePartition(auditTempTable, true); + } catch (Exception e) { + LOGGER.error("Error occurred while managing partitions", e); + } + } + + private void managePartition(PartitionEntity partitionEntity, boolean delete) { + addPartition(partitionEntity, 1); + if (delete) { + long storageDays = + configuration.get(KEY_AUDIT_DATA_TEMP_STORAGE_DAYS, DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS); + deletePartition(partitionEntity, storageDays); + } + } + + private void addPartition(PartitionEntity partitionEntity, long daysToAdd) { + String partitionName = partitionEntity.getAddPartitionName(daysToAdd); + if (isPartitionExist(partitionEntity.getCheckPartitionSql(daysToAdd, false))) { + LOGGER.info("Partition [{}] of [{}] already exists. Don`t need to add.", partitionName, + partitionEntity.getTableName()); + return; + } + executeUpdate(partitionEntity.getAddPartitionSql(daysToAdd)); + } + + private void deletePartition(PartitionEntity partitionEntity, long daysToDelete) { + String partitionName = partitionEntity.getDeletePartitionName(daysToDelete); + if (!isPartitionExist(partitionEntity.getCheckPartitionSql(daysToDelete, true))) { + LOGGER.info("Partition [{}] of [{}] does not exist. Don`t need to delete.", partitionName, + partitionEntity.getTableName()); + return; + } + executeUpdate(partitionEntity.getDeletePartitionSql(daysToDelete)); + } + + private boolean isPartitionExist(String querySql) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(querySql)) { + return isPartitionInResultSet(statement); + } catch (SQLException exception) { + LOGGER.error("An exception occurred while checking partition [{}]:", querySql, exception); + } + return false; + } + + private boolean isPartitionInResultSet(PreparedStatement statement) { + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + return resultSet.getInt("count") > 0; + } + } catch (SQLException sqlException) { + LOGGER.error("An error occurred while processing the result set:", sqlException); + } + return false; + } + + private void executeUpdate(String sql) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(sql)) { + statement.executeUpdate(); + LOGGER.info("Success to manage partition, execute SQL: {}", sql); + } catch (SQLException e) { + LOGGER.error("Failed to execute update: {}", sql, e); + } + } + + private void createDataSource() { + JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig(); + HikariConfig hikariConfig = JdbcUtils.buildHikariConfig( + jdbcConfig.getDriverClass(), + jdbcConfig.getJdbcUrl(), + jdbcConfig.getUserName(), + jdbcConfig.getPassword()); + dataSource = new HikariDataSource(hikariConfig); + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/AuditSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/AuditSink.java new file mode 100644 index 00000000000..be7250e2dc4 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/AuditSink.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.sink; + +public interface AuditSink { + + void start(); + void destroy(); +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java index 6f5f809a3dd..a695d5d9b52 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/CacheSink.java @@ -38,7 +38,7 @@ /** * Cache sink */ -public class CacheSink { +public class CacheSink implements AuditSink { private static final Logger LOGGER = LoggerFactory.getLogger(CacheSink.class); private final ScheduledExecutorService sinkTimer = Executors.newSingleThreadScheduledExecutor(); diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java index ac9afc50d4e..e802984da98 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/sink/JdbcSink.java @@ -21,6 +21,7 @@ import org.apache.inlong.audit.config.Configuration; import org.apache.inlong.audit.entities.SinkConfig; import org.apache.inlong.audit.entities.StatData; +import org.apache.inlong.audit.utils.JdbcUtils; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; @@ -31,75 +32,41 @@ import java.sql.Connection; import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CHECK_PARTITION_INTERVAL_HOURS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_ENABLE_MANAGE_PARTITIONS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_QUEUE_PULL_TIMEOUT; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_BATCH; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_SINK_INTERVAL; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_AUDIT_DATA_TEMP_STORAGE_DAYS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_CHECK_PARTITION_INTERVAL_HOURS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_ENABLE_MANAGE_PARTITIONS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT; import static org.apache.inlong.audit.config.ConfigConstants.KEY_QUEUE_PULL_TIMEOUT; import static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_BATCH; import static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL; -import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT; -import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL; -import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL; -import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL; -import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL; -import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL; -import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL; /** * Jdbc sink */ -public class JdbcSink implements AutoCloseable { +public class JdbcSink implements AutoCloseable, AuditSink { private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSink.class); private final ScheduledExecutorService sinkTimer = Executors.newSingleThreadScheduledExecutor(); - private final ScheduledExecutorService partitionManagerTimer = Executors.newSingleThreadScheduledExecutor(); private final DataQueue dataQueue; private final int insertBatch; private final int pullTimeOut; private final SinkConfig sinkConfig; private DataSource dataSource; - - private final DateTimeFormatter FORMATTER_YYMMDDHH = DateTimeFormatter.ofPattern("yyyyMMdd"); - private final DateTimeFormatter FORMATTER_YY_MM_DD_HH = DateTimeFormatter.ofPattern("yyyy-MM-dd"); - private final String checkPartitionSql; + private final Configuration configuration; public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) { + configuration = Configuration.getInstance(); this.dataQueue = dataQueue; this.sinkConfig = sinkConfig; - insertBatch = Configuration.getInstance().get(KEY_SOURCE_DB_SINK_BATCH, + insertBatch = configuration.get(KEY_SOURCE_DB_SINK_BATCH, DEFAULT_SOURCE_DB_SINK_BATCH); - pullTimeOut = Configuration.getInstance().get(KEY_QUEUE_PULL_TIMEOUT, + pullTimeOut = configuration.get(KEY_QUEUE_PULL_TIMEOUT, DEFAULT_QUEUE_PULL_TIMEOUT); - checkPartitionSql = Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL, - DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL); } @@ -108,23 +75,11 @@ public JdbcSink(DataQueue dataQueue, SinkConfig sinkConfig) { */ public void start() { createDataSource(); - sinkTimer.scheduleWithFixedDelay(this::process, 0, - Configuration.getInstance().get(KEY_SOURCE_DB_SINK_INTERVAL, + configuration.get(KEY_SOURCE_DB_SINK_INTERVAL, DEFAULT_SOURCE_DB_SINK_INTERVAL), TimeUnit.MILLISECONDS); - if (Configuration.getInstance().get(KEY_ENABLE_MANAGE_PARTITIONS, - DEFAULT_ENABLE_MANAGE_PARTITIONS)) { - // Create MySQL data partition of today - addPartition(0); - - partitionManagerTimer.scheduleWithFixedDelay(this::managePartitions, - 0, - Configuration.getInstance().get(KEY_CHECK_PARTITION_INTERVAL_HOURS, - DEFAULT_CHECK_PARTITION_INTERVAL_HOURS), - TimeUnit.HOURS); - } } /** @@ -169,91 +124,12 @@ private void process() { * Create data source */ protected void createDataSource() { - HikariConfig config = new HikariConfig(); - config.setDriverClassName(sinkConfig.getDriverClassName()); - config.setJdbcUrl(sinkConfig.getJdbcUrl()); - config.setUsername(sinkConfig.getUserName()); - config.setPassword(sinkConfig.getPassword()); - config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT, - DEFAULT_CONNECTION_TIMEOUT)); - config.addDataSourceProperty(CACHE_PREP_STMTS, - Configuration.getInstance().get(KEY_CACHE_PREP_STMTS, DEFAULT_CACHE_PREP_STMTS)); - config.addDataSourceProperty(PREP_STMT_CACHE_SIZE, - Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE, DEFAULT_PREP_STMT_CACHE_SIZE)); - config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT, - Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT, DEFAULT_PREP_STMT_CACHE_SQL_LIMIT)); - config.setMaximumPoolSize( - Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE, - DEFAULT_DATASOURCE_POOL_SIZE)); - dataSource = new HikariDataSource(config); - } - - private void managePartitions() { - // Create MySQL data partition of tomorrow - addPartition(1); - - deletePartition(); - } - - private String formatPartitionName(LocalDate date) { - return "p" + date.format(FORMATTER_YYMMDDHH); - } - - private void addPartition(long daysToAdd) { - String partitionName = formatPartitionName(LocalDate.now().plusDays(daysToAdd)); - if (isPartitionExists(partitionName)) { - LOGGER.info("Partition [{}] is exist, dont`t need add this partition.", partitionName); - return; - } - String partitionValue = LocalDate.now().plusDays(daysToAdd + 1).format(FORMATTER_YY_MM_DD_HH); - String addPartitionSQL = String.format( - Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL, - DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL), - partitionName, partitionValue); - executeUpdate(addPartitionSQL); - } - - private void deletePartition() { - int daysToSubtract = Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_STORAGE_DAYS, - DEFAULT_AUDIT_DATA_TEMP_STORAGE_DAYS); - String partitionName = formatPartitionName(LocalDate.now().minusDays(daysToSubtract)); - if (!isPartitionExists(partitionName)) { - LOGGER.info("Partition [{}] is not exist, dont`t need delete this partition.", partitionName); - return; - } - String deletePartitionSQL = String.format( - Configuration.getInstance().get(KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL, - DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL), - partitionName); - executeUpdate(deletePartitionSQL); - } - - private void executeUpdate(String updateSQL) { - try (Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(updateSQL)) { - preparedStatement.executeUpdate(); - LOGGER.info("Execute update [{}] success!", updateSQL); - } catch (Exception exception) { - LOGGER.error("Execute update [{}] has exception!", updateSQL, exception); - } - } - - private boolean isPartitionExists(String partitionName) { - try (Connection connection = dataSource.getConnection(); - PreparedStatement statement = connection.prepareStatement(checkPartitionSql)) { - statement.setString(1, partitionName); - - try (ResultSet resultSet = statement.executeQuery()) { - if (resultSet.next()) { - return resultSet.getInt("count") > 0; - } - } catch (SQLException sqlException) { - LOGGER.error("An error occurred while checking partition [{}] existence:", partitionName, sqlException); - } - } catch (Exception exception) { - LOGGER.error("An exception occurred while checking partition [{}]existence:", partitionName, exception); - } - return false; + HikariConfig hikariConfig = JdbcUtils.buildHikariConfig( + sinkConfig.getDriverClassName(), + sinkConfig.getJdbcUrl(), + sinkConfig.getUserName(), + sinkConfig.getPassword()); + dataSource = new HikariDataSource(hikariConfig); } public void destroy() { @@ -261,7 +137,7 @@ public void destroy() { } @Override - public void close() throws Exception { + public void close() { } } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java index 511efc1f97c..da009ac62ed 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java @@ -24,6 +24,7 @@ import org.apache.inlong.audit.entities.StatData; import org.apache.inlong.audit.service.ConfigService; import org.apache.inlong.audit.utils.CacheUtils; +import org.apache.inlong.audit.utils.JdbcUtils; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; @@ -51,26 +52,13 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS; import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SOURCE_DB_STAT_INTERVAL; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_STAT_BACK_INITIAL_OFFSET; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_STAT_THREAD_POOL_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT; import static org.apache.inlong.audit.config.ConfigConstants.KEY_SOURCE_DB_STAT_INTERVAL; import static org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_BACK_INITIAL_OFFSET; import static org.apache.inlong.audit.config.ConfigConstants.KEY_STAT_THREAD_POOL_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE; -import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT; import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG; import static org.apache.inlong.audit.entities.AuditCycle.DAY; import static org.apache.inlong.audit.entities.AuditCycle.HOUR; @@ -181,23 +169,12 @@ public List getStatCycleOfDay(int daysAgo) { * Create data source */ protected void createDataSource() { - HikariConfig config = new HikariConfig(); - config.setDriverClassName(sourceConfig.getDriverClassName()); - config.setJdbcUrl(sourceConfig.getJdbcUrl()); - config.setUsername(sourceConfig.getUsername()); - config.setPassword(sourceConfig.getPassword()); - config.setConnectionTimeout(Configuration.getInstance().get(KEY_DATASOURCE_CONNECTION_TIMEOUT, - DEFAULT_CONNECTION_TIMEOUT)); - config.addDataSourceProperty(CACHE_PREP_STMTS, - Configuration.getInstance().get(KEY_CACHE_PREP_STMTS, DEFAULT_CACHE_PREP_STMTS)); - config.addDataSourceProperty(PREP_STMT_CACHE_SIZE, - Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SIZE, DEFAULT_PREP_STMT_CACHE_SIZE)); - config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT, - Configuration.getInstance().get(KEY_PREP_STMT_CACHE_SQL_LIMIT, DEFAULT_PREP_STMT_CACHE_SQL_LIMIT)); - config.setMaximumPoolSize( - Configuration.getInstance().get(KEY_DATASOURCE_POOL_SIZE, - DEFAULT_DATASOURCE_POOL_SIZE)); - dataSource = new HikariDataSource(config); + HikariConfig hikariConfig = JdbcUtils.buildHikariConfig( + sourceConfig.getDriverClassName(), + sourceConfig.getJdbcUrl(), + sourceConfig.getUserName(), + sourceConfig.getPassword()); + dataSource = new HikariDataSource(hikariConfig); } /** diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java index fa629a725cf..07f40e2e6b8 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/utils/JdbcUtils.java @@ -20,13 +20,28 @@ import org.apache.inlong.audit.config.Configuration; import org.apache.inlong.audit.entities.JdbcConfig; +import com.zaxxer.hikari.HikariConfig; + import java.util.Objects; +import static org.apache.inlong.audit.config.ConfigConstants.CACHE_PREP_STMTS; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CACHE_PREP_STMTS; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_CONNECTION_TIMEOUT; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_POOL_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PREP_STMT_CACHE_SQL_LIMIT; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_CACHE_PREP_STMTS; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_CONNECTION_TIMEOUT; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_POOL_SIZE; import static org.apache.inlong.audit.config.ConfigConstants.KEY_DEFAULT_MYSQL_DRIVER; import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_DRIVER; import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_JDBC_URL; import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_PASSWORD; import static org.apache.inlong.audit.config.ConfigConstants.KEY_MYSQL_USERNAME; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_PREP_STMT_CACHE_SQL_LIMIT; +import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SIZE; +import static org.apache.inlong.audit.config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT; /** * Jdbc utils @@ -64,4 +79,26 @@ private static JdbcConfig doBuild(String driverClass, String jdbcUrl, String use userName, password); } + + public static HikariConfig buildHikariConfig(String driverClassName, String jdbcUrl, String userName, + String passWord) { + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setDriverClassName(driverClassName); + hikariConfig.setJdbcUrl(jdbcUrl); + hikariConfig.setUsername(userName); + hikariConfig.setPassword(passWord); + Configuration configuration = Configuration.getInstance(); + hikariConfig.setConnectionTimeout(configuration.get(KEY_DATASOURCE_CONNECTION_TIMEOUT, + DEFAULT_CONNECTION_TIMEOUT)); + hikariConfig.addDataSourceProperty(CACHE_PREP_STMTS, + configuration.get(KEY_CACHE_PREP_STMTS, DEFAULT_CACHE_PREP_STMTS)); + hikariConfig.addDataSourceProperty(PREP_STMT_CACHE_SIZE, + configuration.get(KEY_PREP_STMT_CACHE_SIZE, DEFAULT_PREP_STMT_CACHE_SIZE)); + hikariConfig.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT, + configuration.get(KEY_PREP_STMT_CACHE_SQL_LIMIT, DEFAULT_PREP_STMT_CACHE_SQL_LIMIT)); + hikariConfig.setMaximumPoolSize( + configuration.get(KEY_DATASOURCE_POOL_SIZE, + DEFAULT_DATASOURCE_POOL_SIZE)); + return hikariConfig; + } } diff --git a/inlong-audit/sql/apache_inlong_audit_mysql.sql b/inlong-audit/sql/apache_inlong_audit_mysql.sql index e9e114e2e05..6fb07021f50 100644 --- a/inlong-audit/sql/apache_inlong_audit_mysql.sql +++ b/inlong-audit/sql/apache_inlong_audit_mysql.sql @@ -92,7 +92,9 @@ CREATE TABLE IF NOT EXISTS `audit_data_day` `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time', PRIMARY KEY (`log_ts`,`inlong_group_id`,`inlong_stream_id`,`audit_id`,`audit_tag`) ) ENGINE = InnoDB -DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data day table'; +DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data day table' +PARTITION BY RANGE (to_days(`log_ts`)) +(PARTITION pDefault VALUES LESS THAN (TO_DAYS('1970-01-01'))); -- ---------------------------- -- Table structure for selector