diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java index b0d2e17b54f..180d6930a74 100644 --- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/entity/AuditComponent.java @@ -19,7 +19,7 @@ public enum AuditComponent { - AGENT("Agent"), DATAPROXY("DataProxy"), SORT("Sort"), COMMON_AUDIT("Common"); + AGENT("Agent"), DATAPROXY("DataProxy"), SORT("Sort"); private final String component; /** diff --git a/inlong-audit/audit-docker/Dockerfile b/inlong-audit/audit-docker/Dockerfile index 2f842c4b9e7..e77126d047d 100644 --- a/inlong-audit/audit-docker/Dockerfile +++ b/inlong-audit/audit-docker/Dockerfile @@ -41,6 +41,9 @@ ENV AUDIT_JDBC_URL=127.0.0.1:3306 ENV AUDIT_JDBC_USERNAME=root ENV AUDIT_JDBC_PASSWORD=inlong +# Audit Proxy host +ENV AUDIT_PROXY_HOST=127.0.0.1:10081 + # jvm ENV AUDIT_JVM_HEAP_OPTS="-XX:+UseContainerSupport -XX:InitialRAMPercentage=40.0 -XX:MaxRAMPercentage=80.0 -XX:-UseAdaptiveSizePolicy" WORKDIR /opt/inlong-audit diff --git a/inlong-audit/audit-docker/audit-docker.sh b/inlong-audit/audit-docker/audit-docker.sh index 2c73b18c347..187d1b96edb 100755 --- a/inlong-audit/audit-docker/audit-docker.sh +++ b/inlong-audit/audit-docker/audit-docker.sh @@ -61,6 +61,9 @@ sed -i "s/audit.store.jdbc.password=.*$/audit.store.jdbc.password=${AUDIT_JDBC_P sed -i "s/mysql.jdbc.url=.*$/mysql.jdbc.url=jdbc:mysql:\/\/${AUDIT_JDBC_URL}\/${AUDIT_DBNAME}/g" "${service_conf_file}" sed -i "s/mysql.jdbc.username=.*$/mysql.jdbc.username=${AUDIT_JDBC_USERNAME}/g" "${service_conf_file}" sed -i "s/mysql.jdbc.password=.*$/mysql.jdbc.password=${AUDIT_JDBC_PASSWORD}/g" "${service_conf_file}" +sed -i "s/audit.proxy.host.list.agent=.*$/audit.proxy.host.list.agent = ${AUDIT_PROXY_HOST}/g" "${service_conf_file}" +sed -i "s/audit.proxy.host.list.dataproxy=.*$/audit.proxy.host.list.dataproxy = ${AUDIT_PROXY_HOST}/g" "${service_conf_file}" +sed -i "s/audit.proxy.host.list.sort=.*$/audit.proxy.host.list.sort = ${AUDIT_PROXY_HOST}/g" "${service_conf_file}" # Whether the database table exists. If it does not exist, initialize the database and skip if it exists. if [[ "${AUDIT_JDBC_URL}" =~ (.+):([0-9]+) ]]; then diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java deleted file mode 100644 index e4f23a6ed8c..00000000000 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/heartbeat/Heartbeat.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.heartbeat; - -import org.apache.inlong.audit.file.ConfigManager; -import org.apache.inlong.common.util.NetworkUtils; - -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Properties; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -import static org.apache.inlong.audit.entity.AuditComponent.COMMON_AUDIT; - -public class Heartbeat { - - private static final Logger LOGGER = LoggerFactory.getLogger(Heartbeat.class); - private final static String HEARTBEAT_PATH = "/audit/proxy/heartbeat"; - private static final String AUDIT_HEARTBEAT_INTERVAL_CONFIG_KEY = "audit.heartbeat.interval"; - private static final String AUDIT_SERVICE_HOST_CONFIG_KEY = "audit.service.host"; - private static final String AUDIT_SERVICE_PORT_CONFIG_KEY = "agent1.sources.tcp-source.port"; - private static final String AUDIT_COMPONENT_CONFIG_KEY = "audit.component"; - private String heartbeatHost; - private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(); - private final String localIp; - private final String localPort; - - public Heartbeat() { - localIp = NetworkUtils.getLocalIp(); - localPort = getLocalPort(); - } - - public void Start() { - heartbeatHost = getConfiguredValue(AUDIT_SERVICE_HOST_CONFIG_KEY); - timer.scheduleWithFixedDelay(this::heartbeat, - 1, - getConfiguredInterval(), - TimeUnit.MINUTES); - } - - private void heartbeat() { - if (heartbeatHost == null || localPort == null) { - LOGGER.info("Heartbeat is not configure, Don`t need heartbeat"); - return; - } - try (CloseableHttpClient httpClient = HttpClients.createDefault()) { - URIBuilder uriBuilder = new URIBuilder("http://" + heartbeatHost + HEARTBEAT_PATH); - uriBuilder.setParameter("component", getConfiguredComponent()); - uriBuilder.setParameter("host", localIp); - uriBuilder.setParameter("port", localPort); - - HttpGet httpGet = new HttpGet(uriBuilder.build()); - - try (CloseableHttpResponse response = httpClient.execute(httpGet)) { - String responseBody = EntityUtils.toString(response.getEntity()); - LOGGER.info("Heartbeat response: {}", responseBody); - } - } catch (Exception exception) { - LOGGER.error("Heartbeat has exception", exception); - } - } - - private int getConfiguredInterval() { - String intervalConfigValue = getConfiguredValue(AUDIT_HEARTBEAT_INTERVAL_CONFIG_KEY); - return intervalConfigValue != null ? Integer.parseInt(intervalConfigValue) : 1; - } - - private String getConfiguredComponent() { - String intervalConfigComponent = getConfiguredValue(AUDIT_COMPONENT_CONFIG_KEY); - return intervalConfigComponent != null ? intervalConfigComponent : COMMON_AUDIT.getComponent(); - } - - private String getConfiguredValue(String configKey) { - return ConfigManager.getInstance().getValue(configKey); - } - - private String getLocalPort() { - try (Stream paths = Files.walk(Paths.get("."))) { - Path selectedPath = paths.filter(Files::isRegularFile) - .filter(p -> p.toString().endsWith(".conf")) - .findFirst() - .orElse(null); - if (selectedPath != null) { - String port = loadProperties(selectedPath); - LOGGER.info("File: {} , TCP Source Port: {}", selectedPath, port); - if (port != null) { - return port; - } - } - } catch (IOException e) { - LOGGER.error("Get local port has error", e); - } - return null; - } - - private String loadProperties(Path path) { - Properties prop = new Properties(); - try { - prop.load(Files.newInputStream(path)); - return prop.getProperty(AUDIT_SERVICE_PORT_CONFIG_KEY); - } catch (IOException e) { - LOGGER.error("Load properties has error", e); - } - return null; - } -} diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java index fcd32fb4935..4ab7f29e515 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java @@ -18,7 +18,6 @@ package org.apache.inlong.audit.node; import org.apache.inlong.audit.file.ConfigManager; -import org.apache.inlong.audit.heartbeat.Heartbeat; import com.google.common.base.Throwables; import com.google.common.collect.Lists; @@ -77,8 +76,6 @@ public class Application { private MonitorService monitorServer; private final ReentrantLock lifecycleLock = new ReentrantLock(); - private static final Heartbeat heartbeat = new Heartbeat(); - public Application() { this(new ArrayList(0)); } @@ -338,8 +335,6 @@ public static void main(String[] args) { application.handleConfigurationEvent(configurationProvider.getConfiguration()); } - heartbeat.Start(); - // start application application.start(); diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java index f05525096c3..500f5b92cfc 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AuditProxyCache.java @@ -18,217 +18,90 @@ package org.apache.inlong.audit.cache; import org.apache.inlong.audit.config.Configuration; -import org.apache.inlong.audit.entities.JdbcConfig; import org.apache.inlong.audit.entity.AuditProxy; -import org.apache.inlong.audit.utils.JdbcUtils; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import org.apache.commons.dbcp.BasicDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_MINUTES; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_MINUTES; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_MAX_SIZE; -import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL; -import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL; -import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL; -import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL; -import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL; -import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL; -import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL; -import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_HOST_LIST_AGENT; +import static org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_HOST_LIST_DATAPROXY; +import static org.apache.inlong.audit.config.ProxyConstants.DEFAULT_AUDIT_PROXY_HOST_LIST_SORT; +import static org.apache.inlong.audit.config.ProxyConstants.IP_PORT_SEPARATOR; +import static org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_HOST_LIST_AGENT; +import static org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_HOST_LIST_DATAPROXY; +import static org.apache.inlong.audit.config.ProxyConstants.KEY_AUDIT_PROXY_HOST_LIST_SORT; +import static org.apache.inlong.audit.config.ProxyConstants.PROXY_SEPARATOR; +import static org.apache.inlong.audit.entity.AuditComponent.AGENT; +import static org.apache.inlong.audit.entity.AuditComponent.DATAPROXY; +import static org.apache.inlong.audit.entity.AuditComponent.SORT; public class AuditProxyCache { private static final Logger LOGGER = LoggerFactory.getLogger(AuditProxyCache.class); private static final AuditProxyCache instance = new AuditProxyCache(); - private final Cache> cache; - protected final ScheduledExecutorService monitorTimer = Executors.newSingleThreadScheduledExecutor(); - private final BasicDataSource dataSource = new BasicDataSource(); - private final String queryAllAuditProxyHostSQL; - private final String queryAuditProxyHostByComponentSQL; - - private final String queryAuditProxyHeartbeatSQL; - private final String queryAuditProxyHeartbeatByComponentSQL; + private final Map> auditProxyCache = new HashMap<>(); private AuditProxyCache() { - cache = Caffeine.newBuilder() - .maximumSize(Configuration.getInstance().get(KEY_API_CACHE_MAX_SIZE, - DEFAULT_API_CACHE_MAX_SIZE)) - .expireAfterWrite(Configuration.getInstance().get(KEY_API_CACHE_EXPIRED_MINUTES, - DEFAULT_API_CACHE_EXPIRED_MINUTES), TimeUnit.MINUTES) - .build(); - queryAuditProxyHostByComponentSQL = Configuration.getInstance().get(KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL, - DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL); - - queryAllAuditProxyHostSQL = Configuration.getInstance().get(KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL, - DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL); - queryAuditProxyHeartbeatSQL = Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL, - DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL); - queryAuditProxyHeartbeatByComponentSQL = - Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL, - DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL); - - initDataSource(); - monitorTimer.scheduleWithFixedDelay(new Runnable() { - - @Override - public void run() { - update(); - } - }, 0, 1, TimeUnit.MINUTES); } - private void initDataSource() { - JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig(); - dataSource.setDriverClassName(jdbcConfig.getDriverClass()); - dataSource.setUrl(jdbcConfig.getJdbcUrl()); - dataSource.setUsername(jdbcConfig.getUserName()); - dataSource.setPassword(jdbcConfig.getPassword()); - dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS, - DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS)); - dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS, - DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS)); - dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS, - DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS)); - dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS, - DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS)); - dataSource.setTestOnBorrow(true); - dataSource.setValidationQuery("SELECT 1"); - dataSource - .setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS, - DEFAULT_DATASOURCE_DETECT_INTERVAL_MS)); + public boolean init() { + return initializeAuditProxyCache(); } - public static AuditProxyCache getInstance() { - return instance; - } + private boolean initializeAuditProxyCache() { + AtomicBoolean isSuccess = new AtomicBoolean(true); + Map proxyConfigs = getProxyConfigs(); + proxyConfigs.forEach((component, proxyList) -> { + List auditProxies = createAuditProxySet(proxyList); + if (auditProxies.isEmpty()) { + LOGGER.error("Audit proxy config is invalid for component: {} {}", component, proxyList); + isSuccess.set(false); + } else { + auditProxyCache.put(component, auditProxies); + } + }); - public List getData(String component) { - HashSet result = cache.getIfPresent(component); - if (result != null) { - return new ArrayList<>(result); - } - result = queryAuditProxyInfo(component); - if (result.isEmpty()) { - result = queryAuditProxyHeartbeat(component); - } - if (!result.isEmpty()) { - cache.put(component, result); - } - return new ArrayList<>(result); + return isSuccess.get(); } - private void update() { - Map> auditProxyInfo = queryAllAuditProxyInfo(); - if (auditProxyInfo.isEmpty()) { - auditProxyInfo = queryAuditProxyHeartbeat(); - } - if (auditProxyInfo.isEmpty()) { - return; - } - for (Map.Entry> entry : auditProxyInfo.entrySet()) { - try { - cache.put(entry.getKey(), entry.getValue()); - } catch (Exception e) { - LOGGER.error("Put data into audit proxy cache has exception! ", e); - // Decide whether to break or continue based on your requirement - break; - } - } + private Map getProxyConfigs() { + Configuration config = Configuration.getInstance(); + Map proxyConfigs = new HashMap<>(); + proxyConfigs.put(AGENT.getComponent(), + config.get(KEY_AUDIT_PROXY_HOST_LIST_AGENT, DEFAULT_AUDIT_PROXY_HOST_LIST_AGENT)); + proxyConfigs.put(DATAPROXY.getComponent(), + config.get(KEY_AUDIT_PROXY_HOST_LIST_DATAPROXY, DEFAULT_AUDIT_PROXY_HOST_LIST_DATAPROXY)); + proxyConfigs.put(SORT.getComponent(), + config.get(KEY_AUDIT_PROXY_HOST_LIST_SORT, DEFAULT_AUDIT_PROXY_HOST_LIST_SORT)); + return proxyConfigs; } - private Map> queryAllAuditProxyInfo() { - Map> result = new HashMap<>(); - try (Connection connection = dataSource.getConnection(); - PreparedStatement statement = connection.prepareStatement(queryAllAuditProxyHostSQL); - ResultSet resultSet = statement.executeQuery()) { - while (resultSet.next()) { - String component = resultSet.getString("component"); - AuditProxy auditProxyInfo = new AuditProxy(resultSet.getString("host"), resultSet.getInt("port")); - result.computeIfAbsent(component, k -> new HashSet<>()).add(auditProxyInfo); - } - } catch (Exception exception) { - LOGGER.error("Query audit proxy info has exception! ", exception); - } - return result; + private List createAuditProxySet(String proxyList) { + return Arrays.stream(proxyList.split(PROXY_SEPARATOR)) + .map(element -> element.split(IP_PORT_SEPARATOR)) + .filter(ipPort -> ipPort.length == 2) + .map(ipPort -> new AuditProxy(ipPort[0], Integer.parseInt(ipPort[1]))) + .collect(Collectors.toList()); } - private HashSet queryAuditProxyInfo(String component) { - HashSet result = new HashSet<>(); - try (Connection connection = dataSource.getConnection(); - PreparedStatement statement = connection.prepareStatement(queryAuditProxyHostByComponentSQL)) { - statement.setString(1, component); - try (ResultSet resultSet = statement.executeQuery()) { - while (resultSet.next()) { - AuditProxy auditProxyInfo = new AuditProxy(resultSet.getString("host"), resultSet.getInt("port")); - result.add(auditProxyInfo); - } - } catch (SQLException sqlException) { - LOGGER.error("Query audit proxy info by {} has SQL exception ", component, sqlException); - } - } catch (Exception exception) { - LOGGER.error("Query audit proxy info by {} has exception ", component, exception); - } - return result; + public static AuditProxyCache getInstance() { + return instance; } - private Map> queryAuditProxyHeartbeat() { - Map> result = new HashMap<>(); - try (Connection connection = dataSource.getConnection(); - PreparedStatement statement = connection.prepareStatement(queryAuditProxyHeartbeatSQL); - ResultSet resultSet = statement.executeQuery()) { - while (resultSet.next()) { - String component = resultSet.getString("component"); - AuditProxy auditProxyInfo = new AuditProxy(resultSet.getString("host"), resultSet.getInt("port")); - result.computeIfAbsent(component, k -> new HashSet<>()).add(auditProxyInfo); - } - } catch (Exception exception) { - LOGGER.error("Query audit proxy info has exception! ", exception); + public List getData(String component) { + List result = auditProxyCache.get(component); + if (result == null) { + return new LinkedList<>(); } return result; } - private HashSet queryAuditProxyHeartbeat(String component) { - HashSet result = new HashSet<>(); - try (Connection connection = dataSource.getConnection(); - PreparedStatement statement = connection.prepareStatement(queryAuditProxyHeartbeatByComponentSQL)) { - statement.setString(1, component); - try (ResultSet resultSet = statement.executeQuery()) { - while (resultSet.next()) { - AuditProxy auditProxyInfo = new AuditProxy(resultSet.getString("host"), resultSet.getInt("port")); - result.add(auditProxyInfo); - } - } catch (SQLException sqlException) { - LOGGER.error("Query audit proxy info by {} has SQL exception ", component, sqlException); - } - } catch (Exception exception) { - LOGGER.error("Query audit proxy info by {} has exception ", component, exception); - } - return result; - } } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java index 1c22a28fad9..a727eba46bc 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java @@ -35,8 +35,6 @@ public class OpenApiConstants { public static final String DEFAULT_API_GET_IDS_PATH = "/audit/query/getIds"; public static final String KEY_API_GET_AUDIT_PROXY_PATH = "api.get.audit.proxy"; public static final String DEFAULT_API_GET_AUDIT_PROXY_PATH = "/audit/query/getAuditProxy"; - public static final String KEY_API_PROXY_HEART_BEAT_PATH = "api.proxy.heartbeat"; - public static final String DEFAULT_API_PROXY_HEART_BEAT_PATH = "/audit/proxy/heartbeat"; public static final String KEY_API_THREAD_POOL_SIZE = "api.thread.pool.size"; public static final int DEFAULT_API_THREAD_POOL_SIZE = 10; public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size"; @@ -51,9 +49,6 @@ public class OpenApiConstants { public static final String KEY_API_CACHE_EXPIRED_HOURS = "api.cache.expired.hours"; public static final int DEFAULT_API_CACHE_EXPIRED_HOURS = 12; - public static final String KEY_API_CACHE_EXPIRED_MINUTES = "api.cache.expired.minutes"; - public static final int DEFAULT_API_CACHE_EXPIRED_MINUTES = 1; - // Http config public static final String PARAMS_START_TIME = "startTime"; public static final String PARAMS_END_TIME = "endTime"; @@ -72,6 +67,4 @@ public class OpenApiConstants { public static final int DEFAULT_HTTP_SERVER_BIND_PORT = 10080; public static final int HTTP_RESPOND_CODE = 200; public static final String PARAMS_AUDIT_COMPONENT = "component"; - public static final String PARAMS_AUDIT_HOST = "host"; - public static final String PARAMS_AUDIT_PORT = "port"; } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ProxyConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ProxyConstants.java new file mode 100644 index 00000000000..96e43a87b02 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ProxyConstants.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.config; + +/** + * Proxy constants + */ +public class ProxyConstants { + + public static final String PROXY_SEPARATOR = ";"; + public static final String IP_PORT_SEPARATOR = ":"; + public static final String KEY_AUDIT_PROXY_HOST_LIST_AGENT = "audit.proxy.host.list.agent"; + public static final String DEFAULT_AUDIT_PROXY_HOST_LIST_AGENT = ""; + + public static final String KEY_AUDIT_PROXY_HOST_LIST_DATAPROXY = "audit.proxy.host.list.dataproxy"; + public static final String DEFAULT_AUDIT_PROXY_HOST_LIST_DATAPROXY = ""; + + public static final String KEY_AUDIT_PROXY_HOST_LIST_SORT = "audit.proxy.host.list.sort"; + public static final String DEFAULT_AUDIT_PROXY_HOST_LIST_SORT = ""; + +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java index 9eb3950ea52..8adaa4cbb11 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/SqlConstants.java @@ -163,68 +163,6 @@ public class SqlConstants { public static final String DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL = "replace into audit_data_temp (log_ts,inlong_group_id, inlong_stream_id, audit_id,audit_tag,count, size, delay) " + " values (?,?,?,?,?,?,?,?)"; - - public static final String KEY_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL = "audit.proxy.host.query.component.sql"; - public static final String DEFAULT_AUDIT_PROXY_HOST_QUERY_COMPONENT_SQL = - "select\n" + - " host as host,\n" + - " port as port \n" + - "from\n" + - " audit_proxy_host\n" + - "where\n" + - " LOWER(component) = LOWER(?)\n" + - " and status = 1\n" + - "group by\n" + - " host,\n" + - " port"; - - public static final String KEY_AUDIT_PROXY_HOST_QUERY_ALL_SQL = "audit.proxy.host.query.all.sql"; - public static final String DEFAULT_AUDIT_PROXY_HOST_QUERY_ALL_SQL = - "select\n" + - " component,\n" + - " host as host,\n" + - " port as port\n" + - "from\n" + - " audit_proxy_host\n" + - "where\n" + - " status = 1\n" + - "group by\n" + - " component,\n" + - " host,\n" + - " port"; - - public static final String KEY_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL = "audit.proxy.heartbeat.query.sql"; - public static final String DEFAULT_AUDIT_PROXY_HEARTBEAT_QUERY_COMPONENT_SQL = - "select\n" + - " host,\n" + - " port\n" + - "from\n" + - " audit_proxy_heartbeat\n" + - "where\n" + - " LOWER(component) = LOWER(?)\n" + - " and update_time > (NOW() - INTERVAL 2 MINUTE)\n" + - "group by\n" + - " host,\n" + - " port "; - - public static final String KEY_AUDIT_PROXY_HEARTBEAT_QUERY_ALL_SQL = "audit.proxy.heartbeat.query.all.sql"; - public static final String DEFAULT_AUDIT_HEARTBEAT_QUERY_ALL_SQL = - "select\n" + - " host,\n" + - " port\n" + - "from\n" + - " audit_proxy_heartbeat\n" + - "where\n" + - " update_time > (NOW() - INTERVAL 2 MINUTE)\n" + - "group by\n" + - " host,\n" + - " port"; - - public static final String KEY_AUDIT_PROXY_HEARTBEAT_SQL = "audit.proxy.heartbeat.sql"; - public static final String DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL = - "replace into audit_proxy_heartbeat (component, host, port)\n" + - "values (?, ?, ?)"; - public static final String KEY_AUDIT_DATA_TEMP_ADD_PARTITION_SQL = "audit.data.temp.add.partition.sql"; public static final String DEFAULT_AUDIT_DATA_TEMP_ADD_PARTITION_SQL = "ALTER TABLE audit_data_temp ADD PARTITION (PARTITION %s VALUES LESS THAN (TO_DAYS('%s')))"; diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java index 04fc9186df7..d87b7e3aa51 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/entities/ApiType.java @@ -21,5 +21,5 @@ * OpenAPI type */ public enum ApiType { - MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY, PROXY_HEARTBEAT; + MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY; } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java deleted file mode 100644 index 1a46d757f01..00000000000 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/heartbeat/ProxyHeartbeat.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.audit.heartbeat; - -import org.apache.inlong.audit.cache.AuditProxyCache; -import org.apache.inlong.audit.config.Configuration; -import org.apache.inlong.audit.entities.JdbcConfig; -import org.apache.inlong.audit.utils.JdbcUtils; - -import org.apache.commons.dbcp.BasicDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_DETECT_INTERVAL_MS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_DETECT_INTERVAL_MS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS; -import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL; -import static org.apache.inlong.audit.config.SqlConstants.KEY_AUDIT_PROXY_HEARTBEAT_SQL; - -public class ProxyHeartbeat { - - private static final Logger LOGGER = LoggerFactory.getLogger(AuditProxyCache.class); - private static final ProxyHeartbeat instance = new ProxyHeartbeat(); - private final BasicDataSource dataSource = new BasicDataSource(); - private final String heartbeatSQL; - - ProxyHeartbeat() { - heartbeatSQL = - Configuration.getInstance().get(KEY_AUDIT_PROXY_HEARTBEAT_SQL, DEFAULT_AUDIT_PROXY_HEARTBEAT_SQL); - initDataSource(); - } - - private void initDataSource() { - JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig(); - setDataSourceConfig(jdbcConfig); - dataSource.setTestOnBorrow(true); - dataSource.setValidationQuery("SELECT 1"); - dataSource.setTimeBetweenEvictionRunsMillis(Configuration.getInstance().get(KEY_DATASOURCE_DETECT_INTERVAL_MS, - DEFAULT_DATASOURCE_DETECT_INTERVAL_MS)); - } - - private void setDataSourceConfig(JdbcConfig jdbcConfig) { - dataSource.setDriverClassName(jdbcConfig.getDriverClass()); - dataSource.setUrl(jdbcConfig.getJdbcUrl()); - dataSource.setUsername(jdbcConfig.getUserName()); - dataSource.setPassword(jdbcConfig.getPassword()); - dataSource.setInitialSize(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS, - DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS)); - dataSource.setMaxActive(Configuration.getInstance().get(KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS, - DEFAULT_DATASOURCE_MAX_TOTAL_CONNECTIONS)); - dataSource.setMaxIdle(Configuration.getInstance().get(KEY_DATASOURCE_MAX_IDLE_CONNECTIONS, - DEFAULT_DATASOURCE_MAX_IDLE_CONNECTIONS)); - dataSource.setMinIdle(Configuration.getInstance().get(KEY_DATASOURCE_MIN_IDLE_CONNECTIONS, - DEFAULT_DATASOURCE_MIX_IDLE_CONNECTIONS)); - } - - public static ProxyHeartbeat getInstance() { - return instance; - } - - public void heartbeat(String component, String host, int port) { - try (Connection connection = dataSource.getConnection(); - PreparedStatement statement = connection.prepareStatement(heartbeatSQL)) { - statement.setString(1, component); - statement.setString(2, host); - statement.setInt(3, port); - statement.executeUpdate(); - } catch (SQLException exception) { - LOGGER.error("Heartbeat {} {} {} has exception ", component, host, port, exception); - } - } -} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java index 3861ff60a28..363b9d239c2 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java @@ -27,9 +27,7 @@ import org.apache.inlong.audit.entities.ApiType; import org.apache.inlong.audit.entities.AuditCycle; import org.apache.inlong.audit.entities.StatData; -import org.apache.inlong.audit.entity.AuditComponent; import org.apache.inlong.audit.entity.AuditProxy; -import org.apache.inlong.audit.heartbeat.ProxyHeartbeat; import com.google.common.util.concurrent.RateLimiter; import com.google.gson.Gson; @@ -57,7 +55,6 @@ import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IPS_PATH; import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH; import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTES_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_PROXY_HEART_BEAT_PATH; import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS; import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE; import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT; @@ -69,7 +66,6 @@ import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IPS_PATH; import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH; import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTES_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_PROXY_HEART_BEAT_PATH; import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS; import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE; import static org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA; @@ -79,9 +75,7 @@ import static org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT; import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_COMPONENT; import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_CYCLE; -import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_HOST; import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_ID; -import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_PORT; import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_TAG; import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_END_TIME; import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_GROUP_Id; @@ -96,13 +90,17 @@ import static org.apache.inlong.audit.entities.ApiType.GET_IPS; import static org.apache.inlong.audit.entities.ApiType.HOUR; import static org.apache.inlong.audit.entities.ApiType.MINUTES; -import static org.apache.inlong.audit.entities.ApiType.PROXY_HEARTBEAT; public class ApiService { private static final Logger LOGGER = LoggerFactory.getLogger(ApiService.class); public void start() { + if (!AuditProxyCache.getInstance().init()) { + LOGGER.error("Audit proxy cache init failed! exit..."); + System.exit(1); + } + initHttpServer(); } @@ -130,9 +128,6 @@ private void initHttpServer() { server.createContext( Configuration.getInstance().get(KEY_API_GET_AUDIT_PROXY_PATH, DEFAULT_API_GET_AUDIT_PROXY_PATH), new AuditHandler(GET_AUDIT_PROXY)); - server.createContext( - Configuration.getInstance().get(KEY_API_PROXY_HEART_BEAT_PATH, DEFAULT_API_PROXY_HEART_BEAT_PATH), - new AuditHandler(PROXY_HEARTBEAT)); server.start(); LOGGER.info("Init http server success. Bind port is: {}", bindPort); } catch (Exception e) { @@ -202,7 +197,6 @@ private Map parseRequestURI(String query) { } } params.putIfAbsent(PARAMS_AUDIT_TAG, DEFAULT_AUDIT_TAG); - params.putIfAbsent(PARAMS_AUDIT_COMPONENT, AuditComponent.COMMON_AUDIT.getComponent()); return params; } @@ -228,10 +222,8 @@ private boolean checkNecessaryParams(Map params) { && params.containsKey(PARAMS_END_TIME) && params.containsKey(PARAMS_AUDIT_ID) && params.containsKey(PARAMS_IP); - case PROXY_HEARTBEAT: - return params.containsKey(PARAMS_AUDIT_HOST) && params.containsKey(PARAMS_AUDIT_PORT); case GET_AUDIT_PROXY: - return true; + return params.containsKey(PARAMS_AUDIT_COMPONENT); default: return false; } @@ -294,11 +286,6 @@ private void handleLegalParams(JsonObject responseJson, Map para AuditProxyCache.getInstance().getData(params.get(PARAMS_AUDIT_COMPONENT)); responseJson.add(KEY_HTTP_BODY_ERR_DATA, gson.toJsonTree(auditProxy)); break; - case PROXY_HEARTBEAT: - ProxyHeartbeat.getInstance().heartbeat(params.get(PARAMS_AUDIT_COMPONENT), - params.get(PARAMS_AUDIT_HOST), Integer.parseInt(params.get(PARAMS_AUDIT_PORT))); - responseJson.add(KEY_HTTP_BODY_ERR_DATA, gson.toJsonTree(new LinkedList<>())); - break; default: LOGGER.error("Unsupported interface type! type is {}", apiType); responseJson.add(KEY_HTTP_BODY_ERR_DATA, gson.toJsonTree(new LinkedList<>())); diff --git a/inlong-audit/conf/audit-service.properties b/inlong-audit/conf/audit-service.properties index 4d74ff6959a..40c01135b60 100644 --- a/inlong-audit/conf/audit-service.properties +++ b/inlong-audit/conf/audit-service.properties @@ -19,4 +19,9 @@ # mysql config mysql.jdbc.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useUnicode=true&rewriteBatchedStatements=true mysql.username=root -mysql.password=inlong \ No newline at end of file +mysql.password=inlong + +# Audit Proxy config +audit.proxy.host.list.agent=127.0.0.1:10081 +audit.proxy.host.list.dataproxy=127.0.0.1:10081 +audit.proxy.host.list.sort=127.0.0.1:10081 \ No newline at end of file diff --git a/inlong-audit/sql/apache_inlong_audit_mysql.sql b/inlong-audit/sql/apache_inlong_audit_mysql.sql index 57016d95587..e9e114e2e05 100644 --- a/inlong-audit/sql/apache_inlong_audit_mysql.sql +++ b/inlong-audit/sql/apache_inlong_audit_mysql.sql @@ -58,20 +58,21 @@ DEFAULT CHARSET = UTF8 COMMENT ='Inlong audit data table'; -- You can create daily partitions or hourly partitions through the log_ts field. -- The specific partition type is determined based on the actual data volume. -- ---------------------------- -CREATE TABLE IF NOT EXISTS `audit_data_temp` -( - `log_ts` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT 'log timestamp', - `inlong_group_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target inlong group id', - `inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target inlong stream id', - `audit_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id', - `audit_tag` varchar(100) DEFAULT '' COMMENT 'Audit tag', - `count` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message count', - `size` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message size', - `delay` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message delay count', - `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time', - PRIMARY KEY (`log_ts`,`inlong_group_id`,`inlong_stream_id`,`audit_id`,`audit_tag`) +CREATE TABLE IF NOT EXISTS `audit_data_temp` ( + `log_ts` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT 'log timestamp', + `audit_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id', + `inlong_group_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target inlong group id', + `inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target inlong stream id', + `audit_tag` varchar(100) DEFAULT '' COMMENT 'Audit tag', + `count` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message count', + `size` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message size', + `delay` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message delay count', + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time', + PRIMARY KEY (`log_ts`,`audit_id`,`inlong_group_id`,`inlong_stream_id`,`audit_tag`) ) ENGINE = InnoDB -DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data temp table'; +DEFAULT CHARSET = UTF8 COMMENT ='InLong audit data temp table' +PARTITION BY RANGE (to_days(`log_ts`)) +(PARTITION pDefault VALUES LESS THAN (TO_DAYS('1970-01-01'))); -- ---------------------------- -- Table structure for audit_data_day @@ -132,30 +133,3 @@ CREATE TABLE IF NOT EXISTS `audit_source_config` PRIMARY KEY (`source_name`, `jdbc_url`) ) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT = 'Audit source config'; --- ---------------------------- --- Table structure for audit proxy heartbeat --- ---------------------------- -CREATE TABLE IF NOT EXISTS `audit_proxy_heartbeat` -( - `component` varchar(64) NOT NULL DEFAULT 'Common' COMMENT 'Component name, such as: Agent, Sort...', - `host` varchar(64) NOT NULL COMMENT 'Audit proxy IP', - `port` bigint NOT NULL COMMENT 'Audit Proxy Port', - `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time', - PRIMARY KEY (`component`, `host`, `port`) -) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT = 'Audit Proxy Heartbeat'; - - --- ---------------------------- --- Table structure for audit proxy host --- ---------------------------- -CREATE TABLE IF NOT EXISTS `audit_proxy_host` -( - `component` varchar(64) NOT NULL DEFAULT 'Common' COMMENT 'Component name, such as: Agent, Sort...', - `host` varchar(128) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name', - `port` bigint NOT NULL COMMENT 'Audit Proxy Port', - `status` int(11) DEFAULT '1' COMMENT 'Audit source config status. 0:Offline,1:Online', - `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', - `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', - PRIMARY KEY (`component`, `host`, `port`) -) ENGINE = InnoDB DEFAULT CHARSET = utf8 COMMENT = 'Audit Porxy Host'; -