diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java index 67d5183ce58..5a945838aea 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java @@ -22,7 +22,6 @@ */ public class ConfigConstants { - public static final String AUDIT_PROXY_SERVER_NAME = "audit-proxy"; public static final String KEY_PROMETHEUS_PORT = "audit.proxy.prometheus.port"; public static final int DEFAULT_PROMETHEUS_PORT = 10082; public static final String KEY_PROXY_METRIC_CLASSNAME = "audit.proxy.metric.classname"; diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java index f27920159b6..b91daf60a23 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java @@ -28,9 +28,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT; import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROXY_METRIC_CLASSNAME; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_PROMETHEUS_PORT; import static org.apache.inlong.audit.config.ConfigConstants.KEY_PROXY_METRIC_CLASSNAME; public class MetricsManager { @@ -44,16 +42,15 @@ private static class Holder { private AbstractMetric metric; - public void init(String metricName) { + public void init() { try { ConfigManager configManager = ConfigManager.getInstance(); String metricClassName = configManager.getValue(KEY_PROXY_METRIC_CLASSNAME, DEFAULT_PROXY_METRIC_CLASSNAME); LOGGER.info("Metric class name: {}", metricClassName); Constructor constructor = Class.forName(metricClassName) - .getDeclaredConstructor(String.class, MetricItem.class, int.class); + .getDeclaredConstructor(MetricItem.class); constructor.setAccessible(true); - metric = (AbstractMetric) constructor.newInstance(metricName, metricItem, - configManager.getValue(KEY_PROMETHEUS_PORT, DEFAULT_PROMETHEUS_PORT)); + metric = (AbstractMetric) constructor.newInstance(metricItem); timer.scheduleWithFixedDelay(() -> { metric.report(); @@ -89,9 +86,11 @@ public void addReceiveSuccess(long count, long pack, long size) { public void addSendSuccess(long count) { metricItem.getSendCountSuccess().addAndGet(count); } + public void addSendFailed(long count) { metricItem.getSendCountFailed().addAndGet(count); } + public void shutdown() { timer.shutdown(); metric.stop(); diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java index 0871a613b35..68b22c84b10 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java @@ -17,6 +17,7 @@ package org.apache.inlong.audit.metric.prometheus; +import org.apache.inlong.audit.file.ConfigManager; import org.apache.inlong.audit.metric.AbstractMetric; import org.apache.inlong.audit.metric.MetricDimension; import org.apache.inlong.audit.metric.MetricItem; @@ -31,23 +32,25 @@ import java.util.Collections; import java.util.List; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_PROMETHEUS_PORT; + /** * PrometheusMetric */ public class ProxyPrometheusMetric extends Collector implements AbstractMetric { private static final Logger LOGGER = LoggerFactory.getLogger(ProxyPrometheusMetric.class); - private static final String HELP_DESCRIPTION = "help"; + private static final String HELP_DESCRIPTION = "Audit Proxy metrics help description"; + private static final String AUDIT_PROXY_SERVER_NAME = "audit-proxy"; private final MetricItem metricItem; - private final String metricName; private HTTPServer server; - public ProxyPrometheusMetric(String metricName, MetricItem metricItem, int prometheusPort) { - this.metricName = metricName; + public ProxyPrometheusMetric(MetricItem metricItem) { this.metricItem = metricItem; try { - server = new HTTPServer(prometheusPort); + server = new HTTPServer(ConfigManager.getInstance().getValue(KEY_PROMETHEUS_PORT, DEFAULT_PROMETHEUS_PORT)); this.register(); } catch (IOException e) { LOGGER.error("Construct proxy prometheus metric has IOException", e); @@ -66,23 +69,30 @@ public List collect() { createSample(MetricDimension.SEND_COUNT_FAILED, metricItem.getSendCountFailed().doubleValue())); MetricFamilySamples metricFamilySamples = - new MetricFamilySamples(metricName, Type.GAUGE, HELP_DESCRIPTION, samples); + new MetricFamilySamples(AUDIT_PROXY_SERVER_NAME, Type.GAUGE, HELP_DESCRIPTION, samples); return Collections.singletonList(metricFamilySamples); } private MetricFamilySamples.Sample createSample(MetricDimension key, double value) { - return new MetricFamilySamples.Sample(metricName, Collections.singletonList(MetricItem.K_DIMENSION_KEY), + return new MetricFamilySamples.Sample(AUDIT_PROXY_SERVER_NAME, + Collections.singletonList(MetricItem.K_DIMENSION_KEY), Collections.singletonList(key.getKey()), value); } @Override public void report() { - LOGGER.info("Report proxy prometheus metric: {} ", metricItem.toString()); + if (metricItem != null) { + LOGGER.info("Report proxy Prometheus metric: {}", metricItem); + } else { + LOGGER.warn("MetricItem is null, nothing to report."); + } } @Override public void stop() { - server.close(); + if (server != null) { + server.close(); + } } } \ No newline at end of file diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java index 1aa7f6b7d06..92df71e7017 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java @@ -59,8 +59,6 @@ import java.util.Set; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.inlong.audit.config.ConfigConstants.AUDIT_PROXY_SERVER_NAME; - /** * Application */ @@ -351,7 +349,7 @@ public void run() { } }); - MetricsManager.getInstance().init(AUDIT_PROXY_SERVER_NAME); + MetricsManager.getInstance().init(); } catch (Exception e) { logger.error("A fatal error occurred while running. Exception follows.", e); diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java index 8463501ffdf..8454884db90 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java @@ -15,13 +15,15 @@ * limitations under the License. */ -package org.apache.inlong.audit.cache; +package org.apache.inlong.audit.service.cache; -import org.apache.inlong.audit.config.Configuration; -import org.apache.inlong.audit.entities.AuditCycle; -import org.apache.inlong.audit.entities.CacheKeyEntity; -import org.apache.inlong.audit.entities.StatData; -import org.apache.inlong.audit.utils.CacheUtils; +import org.apache.inlong.audit.service.config.ConfigConstants; +import org.apache.inlong.audit.service.config.Configuration; +import org.apache.inlong.audit.service.entities.AuditCycle; +import org.apache.inlong.audit.service.entities.CacheKeyEntity; +import org.apache.inlong.audit.service.entities.StatData; +import org.apache.inlong.audit.service.metric.MetricsManager; +import org.apache.inlong.audit.service.utils.CacheUtils; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -36,7 +38,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT; import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG; import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS; import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE; @@ -57,7 +58,7 @@ public class AbstractCache { // According to the startTime and endTime of the request parameters, the maximum number of cache keys generated. private static final int MAX_CACHE_KEY_SIZE = 1440; - private final DateTimeFormatter FORMATTER_YYMMDDHHMMSS = DateTimeFormatter.ofPattern(DATE_FORMAT); + private final DateTimeFormatter FORMATTER_YYMMDDHHMMSS = DateTimeFormatter.ofPattern(ConfigConstants.DATE_FORMAT); protected AbstractCache(AuditCycle auditCycle) { cache = Caffeine.newBuilder() @@ -108,10 +109,16 @@ public List getData(String startTime, String endTime, String inlongGro if (null != statData) { result.add(statData); } else { + long currentTimeMillis = System.currentTimeMillis(); + statData = fetchDataFromAuditStorage(cacheKey.getStartTime(), cacheKey.getEndTime(), inlongGroupId, inlongStreamId, auditId, auditTag); result.add(statData); + + MetricsManager.getInstance().addApiMetricNoCache(auditCycle, + System.currentTimeMillis() - currentTimeMillis); + } } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java index d74e162f782..67a7fdf1ff6 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java @@ -113,4 +113,10 @@ public class ConfigConstants { public static final String KEY_ENABLE_STAT_AUDIT_DAY = "enable.stat.audit.day"; public static final boolean DEFAULT_ENABLE_STAT_AUDIT_DAY = true; + public static final String KEY_AUDIT_SERVICE_PROMETHEUS_PORT = "audit.service.prometheus.port"; + public static final int DEFAULT_AUDIT_SERVICE_PROMETHEUS_PORT = 10084; + public static final String KEY_AUDIT_SERVICE_METRIC_CLASSNAME = "audit.service.metric.classname"; + public static final String DEFAULT_AUDIT_SERVICE_METRIC_CLASSNAME = + "org.apache.inlong.audit.service.metric.prometheus.ServicePrometheusMetric"; + } diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java index 7df068473a1..67b91c4e95c 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/main/Application.java @@ -15,18 +15,15 @@ * limitations under the License. */ -package org.apache.inlong.audit.main; - -import org.apache.inlong.audit.config.Configuration; -import org.apache.inlong.audit.entities.JdbcConfig; -import org.apache.inlong.audit.selector.api.Selector; -import org.apache.inlong.audit.selector.api.SelectorConfig; -import org.apache.inlong.audit.selector.api.SelectorFactory; -import org.apache.inlong.audit.service.ApiService; -import org.apache.inlong.audit.service.ConfigService; -import org.apache.inlong.audit.service.EtlService; -import org.apache.inlong.audit.service.PartitionManager; -import org.apache.inlong.audit.utils.JdbcUtils; +package org.apache.inlong.audit.service.node; + +import org.apache.inlong.audit.service.config.Configuration; +import org.apache.inlong.audit.service.entities.JdbcConfig; +import org.apache.inlong.audit.service.metric.MetricsManager; +import org.apache.inlong.audit.service.selector.api.Selector; +import org.apache.inlong.audit.service.selector.api.SelectorConfig; +import org.apache.inlong.audit.service.selector.api.SelectorFactory; +import org.apache.inlong.audit.service.utils.JdbcUtils; import org.apache.inlong.common.util.NetworkUtils; import org.slf4j.Logger; @@ -34,10 +31,10 @@ import java.util.UUID; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS; -import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS; -import static org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID; +import static org.apache.inlong.audit.service.config.ConfigConstants.DEFAULT_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS; +import static org.apache.inlong.audit.service.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID; +import static org.apache.inlong.audit.service.config.ConfigConstants.KEY_SELECTOR_FOLLOWER_LISTEN_CYCLE_MS; +import static org.apache.inlong.audit.service.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID; public class Application { @@ -54,6 +51,8 @@ public static void main(String[] args) { PartitionManager.getInstance().start(); + MetricsManager.getInstance().init(); + // Etl service aggregate the data from the data source and store the aggregated data to the target storage etlService.start(); @@ -77,6 +76,7 @@ private static void stopIfKilled() { etlService.stop(); apiService.stop(); selector.close(); + MetricsManager.getInstance().shutdown(); LOGGER.info("Stopping gracefully"); } catch (Exception ex) { LOGGER.error("Stop error: ", ex); diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java index 3fc095f6439..f4b61f2443f 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java @@ -15,19 +15,20 @@ * limitations under the License. */ -package org.apache.inlong.audit.service; +package org.apache.inlong.audit.service.node; -import org.apache.inlong.audit.cache.AuditProxyCache; -import org.apache.inlong.audit.cache.DayCache; -import org.apache.inlong.audit.cache.HalfHourCache; -import org.apache.inlong.audit.cache.HourCache; -import org.apache.inlong.audit.cache.RealTimeQuery; -import org.apache.inlong.audit.cache.TenMinutesCache; -import org.apache.inlong.audit.config.Configuration; -import org.apache.inlong.audit.entities.ApiType; -import org.apache.inlong.audit.entities.AuditCycle; -import org.apache.inlong.audit.entities.StatData; import org.apache.inlong.audit.entity.AuditProxy; +import org.apache.inlong.audit.service.cache.AuditProxyCache; +import org.apache.inlong.audit.service.cache.DayCache; +import org.apache.inlong.audit.service.cache.HalfHourCache; +import org.apache.inlong.audit.service.cache.HourCache; +import org.apache.inlong.audit.service.cache.RealTimeQuery; +import org.apache.inlong.audit.service.cache.TenMinutesCache; +import org.apache.inlong.audit.service.config.Configuration; +import org.apache.inlong.audit.service.entities.ApiType; +import org.apache.inlong.audit.service.entities.AuditCycle; +import org.apache.inlong.audit.service.entities.StatData; +import org.apache.inlong.audit.service.metric.MetricsManager; import com.google.common.util.concurrent.RateLimiter; import com.google.gson.Gson; @@ -84,12 +85,12 @@ import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_IP; import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_START_TIME; import static org.apache.inlong.audit.consts.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE; -import static org.apache.inlong.audit.entities.ApiType.DAY; -import static org.apache.inlong.audit.entities.ApiType.GET_AUDIT_PROXY; -import static org.apache.inlong.audit.entities.ApiType.GET_IDS; -import static org.apache.inlong.audit.entities.ApiType.GET_IPS; -import static org.apache.inlong.audit.entities.ApiType.HOUR; -import static org.apache.inlong.audit.entities.ApiType.MINUTES; +import static org.apache.inlong.audit.service.entities.ApiType.DAY; +import static org.apache.inlong.audit.service.entities.ApiType.GET_AUDIT_PROXY; +import static org.apache.inlong.audit.service.entities.ApiType.GET_IDS; +import static org.apache.inlong.audit.service.entities.ApiType.GET_IPS; +import static org.apache.inlong.audit.service.entities.ApiType.HOUR; +import static org.apache.inlong.audit.service.entities.ApiType.MINUTES; public class ApiService { @@ -152,6 +153,9 @@ public AuditHandler(ApiType apiType) { @Override public void handle(HttpExchange exchange) { LOGGER.info("handle {}", exchange.getRequestURI().toString()); + + long currentTimeMillis = System.currentTimeMillis(); + if (null != limiter) { limiter.acquire(); } @@ -181,6 +185,8 @@ public void run() { } } }); + + MetricsManager.getInstance().addApiMetric(apiType, System.currentTimeMillis() - currentTimeMillis); } private Map parseRequestURI(String query) { diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricDimension.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricDimension.java new file mode 100644 index 00000000000..8fc005b7dda --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricDimension.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.service.metric; + +public enum MetricDimension { + + COUNT("count"), + DURATION("duration"); + + private final String key; + + MetricDimension(String key) { + this.key = key; + } + + public String getKey() { + return key; + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricItem.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricItem.java new file mode 100644 index 00000000000..91515e5f953 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricItem.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.service.metric; + +import lombok.Data; + +import java.util.Map; +import java.util.StringJoiner; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +@Data +public class MetricItem { + + private static final String METRIC_ITEM_SPLITERATOR = ";"; + private AtomicLong inLongGroupIdNum = new AtomicLong(0); + private AtomicLong inLongStreamIdNum = new AtomicLong(0); + private ConcurrentHashMap metricStatMap = new ConcurrentHashMap<>(); + + public void resetAllMetrics() { + for (MetricStat entry : metricStatMap.values()) { + entry.getCount().set(0); + entry.getDuration().set(0); + } + } + + public MetricStat getMetricStat(String metricName) { + return metricStatMap.computeIfAbsent(metricName, k -> new MetricStat(new AtomicLong(), new AtomicLong())); + } + + public ConcurrentHashMap getMetricStatMap() { + return metricStatMap; + } + + @Override + public String toString() { + StringJoiner joiner = new StringJoiner(METRIC_ITEM_SPLITERATOR); + for (Map.Entry entry : metricStatMap.entrySet()) { + String stat = entry.getKey() + "[" + entry.getValue() + "]"; + joiner.add(stat); + } + + return joiner.toString(); + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricStat.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricStat.java new file mode 100644 index 00000000000..b3d963931cc --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricStat.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.service.metric; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.util.concurrent.atomic.AtomicLong; + +@Data +@AllArgsConstructor +public class MetricStat { + + private AtomicLong count; + private AtomicLong duration; +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricsManager.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricsManager.java new file mode 100644 index 00000000000..f34738d543f --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/MetricsManager.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.service.metric; + +import org.apache.inlong.audit.metric.AbstractMetric; +import org.apache.inlong.audit.service.config.Configuration; +import org.apache.inlong.audit.service.entities.ApiType; +import org.apache.inlong.audit.service.entities.AuditCycle; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.audit.service.config.ConfigConstants.DEFAULT_AUDIT_SERVICE_METRIC_CLASSNAME; +import static org.apache.inlong.audit.service.config.ConfigConstants.KEY_AUDIT_SERVICE_METRIC_CLASSNAME; + +public class MetricsManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsManager.class); + + private static class Holder { + + private static final MetricsManager INSTANCE = new MetricsManager(); + } + + private AbstractMetric metric; + + public void init() { + try { + String metricClassName = + Configuration.getInstance().get(KEY_AUDIT_SERVICE_METRIC_CLASSNAME, + DEFAULT_AUDIT_SERVICE_METRIC_CLASSNAME); + LOGGER.info("Metric class name: {}", metricClassName); + Constructor constructor = Class.forName(metricClassName) + .getDeclaredConstructor(MetricItem.class); + constructor.setAccessible(true); + metric = (AbstractMetric) constructor.newInstance(metricItem); + + timer.scheduleWithFixedDelay(() -> { + metric.report(); + metricItem.resetAllMetrics(); + }, 0, 1, TimeUnit.MINUTES); + } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException + | InvocationTargetException exception) { + LOGGER.error("Init metrics manager has exception: ", exception); + } + } + + public static MetricsManager getInstance() { + return Holder.INSTANCE; + } + + private final MetricItem metricItem = new MetricItem(); + protected final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(); + + public void addApiMetric(ApiType apiType, long duration) { + MetricStat metricStat = metricItem.getMetricStat(apiType.name()); + metricStat.getCount().addAndGet(1); + metricStat.getDuration().addAndGet(duration); + } + + public void addApiMetricNoCache(AuditCycle auditCycle, long duration) { + MetricStat metricStat = metricItem.getMetricStat(String.valueOf(auditCycle.getValue())); + metricStat.getCount().addAndGet(1); + metricStat.getDuration().addAndGet(duration); + } + + public void shutdown() { + timer.shutdown(); + metric.stop(); + } +} diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/prometheus/ServicePrometheusMetric.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/prometheus/ServicePrometheusMetric.java new file mode 100644 index 00000000000..592d3efb65a --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/metric/prometheus/ServicePrometheusMetric.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.service.metric.prometheus; + +import org.apache.inlong.audit.metric.AbstractMetric; +import org.apache.inlong.audit.service.config.Configuration; +import org.apache.inlong.audit.service.metric.MetricDimension; +import org.apache.inlong.audit.service.metric.MetricItem; +import org.apache.inlong.audit.service.metric.MetricStat; + +import io.prometheus.client.Collector; +import io.prometheus.client.exporter.HTTPServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.inlong.audit.service.config.ConfigConstants.DEFAULT_AUDIT_SERVICE_PROMETHEUS_PORT; +import static org.apache.inlong.audit.service.config.ConfigConstants.KEY_AUDIT_SERVICE_PROMETHEUS_PORT; +import static org.apache.inlong.audit.service.metric.MetricDimension.COUNT; +import static org.apache.inlong.audit.service.metric.MetricDimension.DURATION; + +/** + * PrometheusMetric + */ +public class ServicePrometheusMetric extends Collector implements AbstractMetric { + + private static final Logger LOGGER = LoggerFactory.getLogger(ServicePrometheusMetric.class); + private static final String HELP_DESCRIPTION = "Audit service metrics help description"; + public static final String AUDIT_SERVICE_SERVER_NAME = "audit-service"; + private static final String METRIC_API_TYPE = "apiType"; + private static final String METRIC_DIMENSION = "dimension"; + + private final MetricItem metricItem; + private HTTPServer server; + + public ServicePrometheusMetric(MetricItem metricItem) { + this.metricItem = metricItem; + try { + server = new HTTPServer(Configuration.getInstance().get(KEY_AUDIT_SERVICE_PROMETHEUS_PORT, + DEFAULT_AUDIT_SERVICE_PROMETHEUS_PORT)); + this.register(); + } catch (IOException e) { + LOGGER.error("Construct store prometheus metric has IOException", e); + } + } + + @Override + public List collect() { + List samples = new ArrayList<>(); + for (Map.Entry entry : metricItem.getMetricStatMap().entrySet()) { + String apiType = entry.getKey(); + MetricStat stat = entry.getValue(); + samples.add(createSample(COUNT, apiType, stat.getCount().get())); + samples.add(createSample(DURATION, apiType, stat.getDuration().get())); + } + + MetricFamilySamples metricFamilySamples = new MetricFamilySamples( + AUDIT_SERVICE_SERVER_NAME, Type.GAUGE, HELP_DESCRIPTION, samples); + + return Collections.singletonList(metricFamilySamples); + } + + private MetricFamilySamples.Sample createSample(MetricDimension metricDimension, String apiType, long statValue) { + return new MetricFamilySamples.Sample( + AUDIT_SERVICE_SERVER_NAME, + Arrays.asList(METRIC_API_TYPE, METRIC_DIMENSION), + Arrays.asList(apiType, metricDimension.getKey()), + statValue); + } + + @Override + public void report() { + if (metricItem != null) { + LOGGER.info("Report Service Prometheus metric: {}", metricItem); + } else { + LOGGER.warn("MetricItem is null, nothing to report."); + } + } + + @Override + public void stop() { + if (server != null) { + server.close(); + LOGGER.info("Prometheus HTTP server stopped successfully."); + } else { + LOGGER.warn("Prometheus HTTP server is not running."); + } + } + +} \ No newline at end of file diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java index 6aa60feb6c7..bb72b7a1780 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java @@ -80,12 +80,17 @@ private MetricFamilySamples.Sample createSample(MetricDimension key, double valu @Override public void report() { - LOGGER.info("Report store prometheus metric: {} ", metricItem.toString()); + if (metricItem != null) { + LOGGER.info("Report store Prometheus metric: {}", metricItem); + } else { + LOGGER.warn("MetricItem is null, nothing to report."); + } } @Override public void stop() { - server.close(); + if (server != null) { + server.close(); + } } - } \ No newline at end of file