diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java index 1d88285e4c..accd22ebbc 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/source/JdbcSource.java @@ -17,18 +17,20 @@ package org.apache.inlong.audit.source; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; -import lombok.Data; import org.apache.inlong.audit.channel.DataQueue; import org.apache.inlong.audit.config.Configuration; import org.apache.inlong.audit.entities.SourceConfig; import org.apache.inlong.audit.entities.StartEndTime; import org.apache.inlong.audit.entities.StatData; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import lombok.Data; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.sql.DataSource; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -206,7 +208,7 @@ public void destroy() { /** * Stat server */ - class StatServer implements Runnable { + class StatServer implements Runnable, AutoCloseable { private final int statBackTimes; @@ -228,21 +230,22 @@ public void run() { * Stat by step */ public void statByStep() { - List> futures = new ArrayList<>(); + if (auditIds.isEmpty()) { + return; + } + List> futures = new ArrayList<>(); for (String auditId : auditIds) { - CompletableFuture future = CompletableFuture.supplyAsync(() -> { + CompletableFuture future = CompletableFuture.runAsync(() -> { aggregate(auditId); - return "Task " + auditId + " completed"; }); futures.add(future); } - CompletableFuture[] futureArray = futures.toArray(new CompletableFuture[futures.size()]); - CompletableFuture allFutures = CompletableFuture.allOf(futureArray); - allFutures.join(); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join(); } /** * Aggregate + * * @param auditId */ public void aggregate(String auditId) { @@ -261,46 +264,51 @@ public void aggregate(String auditId) { /** * Query + * * @param startTime * @param endTime * @param auditId */ public void query(String startTime, String endTime, String auditId) { - try { - Connection connection = dataSource.getConnection(); - if (connection == null || connection.isClosed()) { + try (Connection connection = dataSource.getConnection(); + PreparedStatement pstat = connection.prepareStatement(sourceConfig.getQuerySql())) { + if (connection.isClosed()) { createDataSource(); } - PreparedStatement pstat = connection.prepareStatement(sourceConfig.getQuerySql()); pstat.setString(1, startTime); pstat.setString(2, endTime); pstat.setString(3, auditId); - ResultSet resultSet = pstat.executeQuery(); - while (resultSet.next()) { - String inlongGroupID = resultSet.getString(1); - String InlongStreamID = resultSet.getString(2); - String AuditId = resultSet.getString(3); - String AuditTag = resultSet.getString(4); - long count = resultSet.getLong(5); - long size = resultSet.getLong(6); - long delay = resultSet.getLong(7); - StatData data = new StatData(); - data.setLogTs(startTime); - data.setInlongGroupId(inlongGroupID); - data.setInlongStreamId(InlongStreamID); - data.setAuditId(AuditId); - data.setAuditTag(AuditTag); - data.setCount(count); - data.setSize(size); - data.setDelay(delay); - dataQueue.push(data); + try (ResultSet resultSet = pstat.executeQuery()) { + while (resultSet.next()) { + String inlongGroupID = resultSet.getString(1); + String InlongStreamID = resultSet.getString(2); + String AuditId = resultSet.getString(3); + String AuditTag = resultSet.getString(4); + long count = resultSet.getLong(5); + long size = resultSet.getLong(6); + long delay = resultSet.getLong(7); + StatData data = new StatData(); + data.setLogTs(startTime); + data.setInlongGroupId(inlongGroupID); + data.setInlongStreamId(InlongStreamID); + data.setAuditId(AuditId); + data.setAuditTag(AuditTag); + data.setCount(count); + data.setSize(size); + data.setDelay(delay); + dataQueue.push(data); + } + } catch (SQLException sqlException) { + LOG.error("Query has SQL exception! ", sqlException); } - resultSet.close(); - pstat.close(); - connection.close(); } catch (Exception exception) { LOG.error("Query has exception! ", exception); } } + + @Override + public void close() throws Exception { + + } } }