Skip to content

Commit

Permalink
Audit-service add local cache for openapi to improve the concurrency …
Browse files Browse the repository at this point in the history
…of service
  • Loading branch information
doleyzi committed Apr 10, 2024
1 parent a8d2027 commit 75a16f9
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_API_CACHE_EXPIRED_HOURS;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PI_CACHE_MAX_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_API_CACHE_MAX_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_API_CACHE_EXPIRED_HOURS;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_API_CACHE_MAX_SIZE;

Expand All @@ -52,7 +52,7 @@ public class AbstractCache {
protected AbstractCache(AuditCycle auditCycle) {
cache = Caffeine.newBuilder()
.maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE,
DEFAULT_PI_CACHE_MAX_SIZE))
DEFAULT_API_CACHE_MAX_SIZE))
.expireAfterWrite(Configuration.getInstance().get(KEY_API_CACHE_EXPIRED_HOURS,
DEFAULT_API_CACHE_EXPIRED_HOURS), TimeUnit.HOURS)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@
/**
* Cache Of day ,for day openapi
*/
public class CacheOfDay implements AutoCloseable {
public class DayCache implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(CacheOfDay.class);
private static volatile CacheOfDay cacheOfDay = null;
private static final Logger LOG = LoggerFactory.getLogger(DayCache.class);
private static volatile DayCache dayCache = null;
private DataSource dataSource;

private final String querySql;

private CacheOfDay() {
private DayCache() {
createDataSource();
querySql = Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_DAY_SQL,
DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL);
Expand All @@ -77,15 +77,15 @@ private CacheOfDay() {
* Get instance
* @return
*/
public static CacheOfDay getInstance() {
if (cacheOfDay == null) {
public static DayCache getInstance() {
if (dayCache == null) {
synchronized (Configuration.class) {
if (cacheOfDay == null) {
cacheOfDay = new CacheOfDay();
if (dayCache == null) {
dayCache = new DayCache();
}
}
}
return cacheOfDay;
return dayCache;
}

/**
Expand Down Expand Up @@ -113,22 +113,15 @@ public List<StatData> getData(String startTime, String endTime, String inlongGro
pstat.setString(5, auditId);
try (ResultSet resultSet = pstat.executeQuery()) {
while (resultSet.next()) {
String inlongGroupID = resultSet.getString(1);
String InlongStreamID = resultSet.getString(2);
String AuditId = resultSet.getString(3);
String AuditTag = resultSet.getString(4);
long count = resultSet.getLong(5);
long size = resultSet.getLong(6);
long delay = resultSet.getLong(7);
StatData data = new StatData();
data.setLogTs(startTime);
data.setInlongGroupId(inlongGroupID);
data.setInlongStreamId(InlongStreamID);
data.setAuditId(AuditId);
data.setAuditTag(AuditTag);
data.setCount(count);
data.setSize(size);
data.setDelay(delay);
data.setInlongGroupId(resultSet.getString(1));
data.setInlongStreamId(resultSet.getString(2));
data.setAuditId(resultSet.getString(3));
data.setAuditTag(resultSet.getString(4));
data.setCount(resultSet.getLong(5));
data.setSize(resultSet.getLong(6));
data.setDelay(resultSet.getLong(7));
result.add(data);
}
} catch (SQLException sqlException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@
/**
* Cache Of minute 30 ,for minute 30 openapi
*/
public class CacheOfMinute30 extends AbstractCache {
public class HalfHourCache extends AbstractCache {

private static volatile CacheOfMinute30 cacheOfMinute30 = null;
private static volatile HalfHourCache halfHourCache = null;

private CacheOfMinute30() {
private HalfHourCache() {
super(AuditCycle.MINUTE_30);
}

public static CacheOfMinute30 getInstance() {
if (cacheOfMinute30 == null) {
public static HalfHourCache getInstance() {
if (halfHourCache == null) {
synchronized (Configuration.class) {
if (cacheOfMinute30 == null) {
cacheOfMinute30 = new CacheOfMinute30();
if (halfHourCache == null) {
halfHourCache = new HalfHourCache();
}
}
}
return cacheOfMinute30;
return halfHourCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@
/**
* Cache Of hour ,for hour openapi
*/
public class CacheOfHour extends AbstractCache {
public class HourCache extends AbstractCache {

private static volatile CacheOfHour cacheOfHour = null;
private CacheOfHour() {
private static volatile HourCache hourCache = null;
private HourCache() {
super(AuditCycle.HOUR);
}
public static CacheOfHour getInstance() {
if (cacheOfHour == null) {
public static HourCache getInstance() {
if (hourCache == null) {
synchronized (Configuration.class) {
if (cacheOfHour == null) {
cacheOfHour = new CacheOfHour();
if (hourCache == null) {
hourCache = new HourCache();
}
}
}
return cacheOfHour;
return hourCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@
/**
* Cache Of minute 10 ,for minute 10 openapi
*/
public class CacheOfMinute10 extends AbstractCache {
public class TenMinutesCache extends AbstractCache {

private static volatile CacheOfMinute10 cacheOfMinute10 = null;
private static volatile TenMinutesCache tenMinutesCache = null;

private CacheOfMinute10() {
private TenMinutesCache() {
super(AuditCycle.MINUTE_10);
}
public static CacheOfMinute10 getInstance() {
if (cacheOfMinute10 == null) {
public static TenMinutesCache getInstance() {
if (tenMinutesCache == null) {
synchronized (Configuration.class) {
if (cacheOfMinute10 == null) {
cacheOfMinute10 = new CacheOfMinute10();
if (tenMinutesCache == null) {
tenMinutesCache = new TenMinutesCache();
}
}
}
return cacheOfMinute10;
return tenMinutesCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ public class ConfigConstants {
public static final int RANDOM_BOUND = 10;

// Cache config
public static final String KEY_API_CACHE_MAX_SIZE = "api.cache.size";
public static final int DEFAULT_PI_CACHE_MAX_SIZE = 50000000;
public static final String KEY_API_CACHE_MAX_SIZE = "api.cache.max.size";
public static final int DEFAULT_API_CACHE_MAX_SIZE = 50000000;

public static final String KEY_API_CACHE_EXPIRED_HOURS = "api.cache.expired.hours";
public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.inlong.audit.service;

import org.apache.inlong.audit.cache.CacheOfHour;
import org.apache.inlong.audit.cache.CacheOfMinute10;
import org.apache.inlong.audit.cache.CacheOfMinute30;
import org.apache.inlong.audit.cache.HalfHourCache;
import org.apache.inlong.audit.cache.HourCache;
import org.apache.inlong.audit.cache.TenMinutesCache;
import org.apache.inlong.audit.channel.DataQueue;
import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.AuditCycle;
Expand Down Expand Up @@ -66,15 +66,15 @@ public class EtlService {

private static final Logger LOG = LoggerFactory.getLogger(JdbcSource.class);
private JdbcSource mysqlSourceOfTemp;
private JdbcSource mysqlSourceOfCacheMinute10;
private JdbcSource mysqlSourceOfCacheMinute30;
private JdbcSource mysqlSourceOfCacheHour;
private JdbcSource mysqlSourceOfTenMinutesCache;
private JdbcSource mysqlSourceOfHalfHourCache;
private JdbcSource mysqlSourceOfHourCache;
private JdbcSink mysqlSinkOfDay;
private JdbcSource clickhouseSource;
private JdbcSink mysqlSinkOfTemp;
private CacheSink cacheSinkOfMinute10;
private CacheSink cacheSinkOfMinute30;
private CacheSink cacheSinkOfHour;
private CacheSink cacheSinkOfTenMinutesCache;
private CacheSink cacheSinkOfHalfHourCache;
private CacheSink cacheSinkOfHourCache;
private final int queueSize;
private final int statBackTimes;

Expand All @@ -91,9 +91,9 @@ public EtlService() {
public void start() {
clickhouseToMysql();
mysqlToMysqlOfDay();
mysqlToCacheOfMinute10();
mysqlToCacheOfMinute30();
mysqlToCacheOfHour();
mysqlToTenMinutesCache();
mysqlToHalfHourCache();
mysqlToHourCache();
}

/**
Expand All @@ -117,39 +117,39 @@ private void mysqlToMysqlOfDay() {
/**
* Aggregate data from mysql data source and store in local cache for openapi.
*/
private void mysqlToCacheOfMinute10() {
private void mysqlToTenMinutesCache() {
DataQueue dataQueue = new DataQueue(queueSize);
mysqlSourceOfCacheMinute10 =
mysqlSourceOfTenMinutesCache =
new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.MINUTE_10, statBackTimes));
mysqlSourceOfCacheMinute10.start();
mysqlSourceOfTenMinutesCache.start();

cacheSinkOfMinute10 = new CacheSink(dataQueue, CacheOfMinute10.getInstance().getCache());
cacheSinkOfMinute10.start();
cacheSinkOfTenMinutesCache = new CacheSink(dataQueue, TenMinutesCache.getInstance().getCache());
cacheSinkOfTenMinutesCache.start();
}

/**
* Aggregate data from mysql data source and store in local cache for openapi.
*/
private void mysqlToCacheOfMinute30() {
private void mysqlToHalfHourCache() {
DataQueue dataQueue = new DataQueue(queueSize);
mysqlSourceOfCacheMinute30 =
mysqlSourceOfHalfHourCache =
new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.MINUTE_30, statBackTimes));
mysqlSourceOfCacheMinute30.start();
mysqlSourceOfHalfHourCache.start();

cacheSinkOfMinute30 = new CacheSink(dataQueue, CacheOfMinute30.getInstance().getCache());
cacheSinkOfMinute30.start();
cacheSinkOfHalfHourCache = new CacheSink(dataQueue, HalfHourCache.getInstance().getCache());
cacheSinkOfHalfHourCache.start();
}

/**
* Aggregate data from mysql data source and store in local cache for openapi.
*/
private void mysqlToCacheOfHour() {
private void mysqlToHourCache() {
DataQueue dataQueue = new DataQueue(queueSize);
mysqlSourceOfCacheHour = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.HOUR, statBackTimes));
mysqlSourceOfCacheHour.start();
mysqlSourceOfHourCache = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.HOUR, statBackTimes));
mysqlSourceOfHourCache.start();

cacheSinkOfHour = new CacheSink(dataQueue, CacheOfHour.getInstance().getCache());
cacheSinkOfHour.start();
cacheSinkOfHourCache = new CacheSink(dataQueue, HourCache.getInstance().getCache());
cacheSinkOfHourCache.start();
}

/**
Expand Down Expand Up @@ -252,12 +252,12 @@ public void stop() {
clickhouseSource.destroy();
mysqlSinkOfTemp.destroy();

mysqlSourceOfCacheMinute10.destroy();
mysqlSourceOfCacheMinute30.destroy();
mysqlSourceOfCacheHour.destroy();
mysqlSourceOfTenMinutesCache.destroy();
mysqlSourceOfHalfHourCache.destroy();
mysqlSourceOfHourCache.destroy();

cacheSinkOfMinute10.destroy();
cacheSinkOfMinute30.destroy();
cacheSinkOfHour.destroy();
cacheSinkOfTenMinutesCache.destroy();
cacheSinkOfHalfHourCache.destroy();
cacheSinkOfHourCache.destroy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ private void process() {
cache.put(cacheKey, data);
data = dataQueue.pull(pullTimeOut, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
LOG.error("Process exception! {}", e.getMessage());
} catch (Exception exception) {
LOG.error("Process exception! ", exception);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void start() {
ScheduledExecutorService timer =
statTimers.computeIfAbsent(statBackTime, k -> Executors.newSingleThreadScheduledExecutor());
timer.scheduleWithFixedDelay(new StatServer(statBackTime),
statBackTime,
0,
statInterval, TimeUnit.MINUTES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ public class CacheUtils {

public static String buildCacheKey(String logTs, String inlongGroupId, String inlongStreamId,
String auditId, String auditTag) {
StringBuilder keyBuilder = new StringBuilder();
return keyBuilder
return new StringBuilder()
.append(logTs)
.append(inlongGroupId)
.append(inlongStreamId)
Expand Down

0 comments on commit 75a16f9

Please sign in to comment.