Skip to content

Commit

Permalink
Audit-service add codes of jdbc source
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi committed Apr 9, 2024
1 parent 5b47790 commit fb4f79f
Showing 1 changed file with 44 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@

package org.apache.inlong.audit.source;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.Data;
import org.apache.inlong.audit.channel.DataQueue;
import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.SourceConfig;
import org.apache.inlong.audit.entities.StartEndTime;
import org.apache.inlong.audit.entities.StatData;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -206,7 +208,7 @@ public void destroy() {
/**
* Stat server
*/
class StatServer implements Runnable {
class StatServer implements Runnable, AutoCloseable {

private final int statBackTimes;

Expand All @@ -228,21 +230,22 @@ public void run() {
* Stat by step
*/
public void statByStep() {
List<CompletableFuture<String>> futures = new ArrayList<>();
if (auditIds.isEmpty()) {
return;
}
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String auditId : auditIds) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
aggregate(auditId);
return "Task " + auditId + " completed";
});
futures.add(future);
}
CompletableFuture[] futureArray = futures.toArray(new CompletableFuture[futures.size()]);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureArray);
allFutures.join();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
}

/**
* Aggregate
*
* @param auditId
*/
public void aggregate(String auditId) {
Expand All @@ -261,46 +264,51 @@ public void aggregate(String auditId) {

/**
* Query
*
* @param startTime
* @param endTime
* @param auditId
*/
public void query(String startTime, String endTime, String auditId) {
try {
Connection connection = dataSource.getConnection();
if (connection == null || connection.isClosed()) {
try (Connection connection = dataSource.getConnection();
PreparedStatement pstat = connection.prepareStatement(sourceConfig.getQuerySql())) {
if (connection.isClosed()) {
createDataSource();
}
PreparedStatement pstat = connection.prepareStatement(sourceConfig.getQuerySql());
pstat.setString(1, startTime);
pstat.setString(2, endTime);
pstat.setString(3, auditId);
ResultSet resultSet = pstat.executeQuery();
while (resultSet.next()) {
String inlongGroupID = resultSet.getString(1);
String InlongStreamID = resultSet.getString(2);
String AuditId = resultSet.getString(3);
String AuditTag = resultSet.getString(4);
long count = resultSet.getLong(5);
long size = resultSet.getLong(6);
long delay = resultSet.getLong(7);
StatData data = new StatData();
data.setLogTs(startTime);
data.setInlongGroupId(inlongGroupID);
data.setInlongStreamId(InlongStreamID);
data.setAuditId(AuditId);
data.setAuditTag(AuditTag);
data.setCount(count);
data.setSize(size);
data.setDelay(delay);
dataQueue.push(data);
try (ResultSet resultSet = pstat.executeQuery()) {
while (resultSet.next()) {
String inlongGroupID = resultSet.getString(1);
String InlongStreamID = resultSet.getString(2);
String AuditId = resultSet.getString(3);
String AuditTag = resultSet.getString(4);
long count = resultSet.getLong(5);
long size = resultSet.getLong(6);
long delay = resultSet.getLong(7);
StatData data = new StatData();
data.setLogTs(startTime);
data.setInlongGroupId(inlongGroupID);
data.setInlongStreamId(InlongStreamID);
data.setAuditId(AuditId);
data.setAuditTag(AuditTag);
data.setCount(count);
data.setSize(size);
data.setDelay(delay);
dataQueue.push(data);
}
} catch (SQLException sqlException) {
LOG.error("Query has SQL exception! ", sqlException);
}
resultSet.close();
pstat.close();
connection.close();
} catch (Exception exception) {
LOG.error("Query has exception! ", exception);
}
}

@Override
public void close() throws Exception {

}
}
}

0 comments on commit fb4f79f

Please sign in to comment.