diff --git a/pom.xml b/pom.xml
index 9b2ace47f..34b3a9e14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -380,6 +380,11 @@
micrometer-registry-statsd
1.5.2
+
+ io.micrometer
+ micrometer-registry-prometheus
+ 1.5.2
+
diff --git a/src/main/config/secor.common.properties b/src/main/config/secor.common.properties
index 80dd0f422..ea8eeb30b 100644
--- a/src/main/config/secor.common.properties
+++ b/src/main/config/secor.common.properties
@@ -500,6 +500,7 @@ secor.monitoring.metrics.collector.class=com.pinterest.secor.monitoring.OstrichM
# secor.monitoring.metrics.collector.class=com.pinterest.secor.monitoring.MicroMeterMetricCollector
# secor.monitoring.metrics.collector.micrometer.jmx.enabled=true
# secor.monitoring.metrics.collector.micrometer.statsd.enabled=true
+# secor.monitoring.metrics.collector.micrometer.prometheus.enabled=true
# Row group size in bytes for Parquet writers. Specifies how much data will be buffered in memory before flushing a
# block to disk. Larger values allow for larger column chinks which makes it possible to do larger sequential IO.
diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java
index 443a1cf1a..279d684d7 100644
--- a/src/main/java/com/pinterest/secor/common/SecorConfig.java
+++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java
@@ -766,11 +766,19 @@ public String getMetricsCollectorClass() {
}
public boolean getMicroMeterCollectorJmxEnabled() {
- return getBoolean("secor.monitoring.metrics.collector.micrometer.jmx.enabled");
+ return getBoolean("secor.monitoring.metrics.collector.micrometer.jmx.enabled", false);
}
public boolean getMicroMeterCollectorStatsdEnabled() {
- return getBoolean("secor.monitoring.metrics.collector.micrometer.statsd.enabled");
+ return getBoolean("secor.monitoring.metrics.collector.micrometer.statsd.enabled", false);
+ }
+
+ public boolean getMicroMeterCollectorPrometheusEnabled() {
+ return getBoolean("secor.monitoring.metrics.collector.micrometer.prometheus.enabled", false);
+ }
+
+ public int getMicroMeterCacheSize() {
+ return getInt("secor.monitoring.metrics.collector.micrometer.cache.size", 500);
}
/**
diff --git a/src/main/java/com/pinterest/secor/common/OstrichAdminService.java b/src/main/java/com/pinterest/secor/common/monitoring/OstrichAdminService.java
similarity index 84%
rename from src/main/java/com/pinterest/secor/common/OstrichAdminService.java
rename to src/main/java/com/pinterest/secor/common/monitoring/OstrichAdminService.java
index 820b68e84..fa89eb7f5 100644
--- a/src/main/java/com/pinterest/secor/common/OstrichAdminService.java
+++ b/src/main/java/com/pinterest/secor/common/monitoring/OstrichAdminService.java
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package com.pinterest.secor.common;
+package com.pinterest.secor.common.monitoring;
+import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.util.StatsUtil;
import com.twitter.ostrich.admin.AdminServiceFactory;
import com.twitter.ostrich.admin.CustomHttpHandler;
@@ -30,6 +31,7 @@
import scala.collection.JavaConversions;
import scala.collection.Map$;
import scala.collection.immutable.List$;
+import scala.collection.immutable.Map;
import scala.util.matching.Regex;
import java.util.Arrays;
@@ -44,13 +46,17 @@
public class OstrichAdminService {
private static final Logger LOG = LoggerFactory.getLogger(OstrichAdminService.class);
private final int mPort;
+ private final boolean mPrometheusEnabled;
- public OstrichAdminService(int port) {
- this.mPort = port;
+ public OstrichAdminService(SecorConfig config) {
+ mPort = config.getOstrichPort();
+ mPrometheusEnabled = config.getMicroMeterCollectorPrometheusEnabled();
}
public void start() {
Duration[] defaultLatchIntervals = {Duration.apply(1, TimeUnit.MINUTES)};
+ Map handlers = mPrometheusEnabled ?
+ new Map.Map1<>("/prometheus", new PrometheusHandler()) : Map$.MODULE$.empty();
@SuppressWarnings("deprecation")
AdminServiceFactory adminServiceFactory = new AdminServiceFactory(
this.mPort,
@@ -58,7 +64,7 @@ public void start() {
List$.MODULE$.empty(),
Option.empty(),
List$.MODULE$.empty(),
- Map$.MODULE$.empty(),
+ handlers,
JavaConversions
.asScalaBuffer(Arrays.asList(defaultLatchIntervals)).toList()
);
diff --git a/src/main/java/com/pinterest/secor/common/monitoring/PrometheusHandler.java b/src/main/java/com/pinterest/secor/common/monitoring/PrometheusHandler.java
new file mode 100644
index 000000000..70b8beeae
--- /dev/null
+++ b/src/main/java/com/pinterest/secor/common/monitoring/PrometheusHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.pinterest.secor.common.monitoring;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.twitter.ostrich.admin.CustomHttpHandler;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+import org.apache.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/**
+ * Initializes Http Endpoint for Prometheus
+ *
+ * @author Paulius Dambrauskas (p.dambrauskas@gmail.com)
+ */
+public class PrometheusHandler extends CustomHttpHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(PrometheusHandler.class);
+
+ @Override
+ public void handle(HttpExchange exchange) {
+ Optional registry = Metrics.globalRegistry.getRegistries().stream()
+ .filter(meterRegistry -> meterRegistry instanceof PrometheusMeterRegistry)
+ .map(meterRegistry -> (PrometheusMeterRegistry) meterRegistry)
+ .findFirst();
+ if (registry.isPresent()) {
+ this.render(registry.get().scrape(), exchange, HttpStatus.SC_OK);
+ } else {
+ LOG.warn("Trying to scrape prometheus, while it is disabled, " +
+ "set \"secor.monitoring.metrics.collector.micrometer.prometheus.enabled\" to \"true\"");
+ this.render("Not Found", exchange, HttpStatus.SC_NOT_FOUND);
+ }
+ }
+}
diff --git a/src/main/java/com/pinterest/secor/consumer/Consumer.java b/src/main/java/com/pinterest/secor/consumer/Consumer.java
index 2a11bbef5..c5fd1f4d8 100644
--- a/src/main/java/com/pinterest/secor/consumer/Consumer.java
+++ b/src/main/java/com/pinterest/secor/consumer/Consumer.java
@@ -73,8 +73,9 @@ public class Consumer extends Thread {
private volatile boolean mShuttingDown = false;
private static volatile boolean mCallingSystemExit = false;
- public Consumer(SecorConfig config) {
+ public Consumer(SecorConfig config, MetricCollector metricCollector) {
mConfig = config;
+ mMetricCollector = metricCollector;
isLegacyConsumer = true;
}
@@ -89,8 +90,6 @@ private void init() throws Exception {
}
mKafkaMessageIterator = KafkaMessageIteratorFactory.getIterator(mConfig.getKafkaMessageIteratorClass(), mConfig);
mMessageReader = new MessageReader(mConfig, mOffsetTracker, mKafkaMessageIterator);
- mMetricCollector = ReflectionUtil.createMetricCollector(mConfig.getMetricsCollectorClass());
- mMetricCollector.initialize(mConfig);
FileRegistry fileRegistry = new FileRegistry(mConfig);
UploadManager uploadManager = ReflectionUtil.createUploadManager(mConfig.getUploadManagerClass(), mConfig);
diff --git a/src/main/java/com/pinterest/secor/main/ConsumerMain.java b/src/main/java/com/pinterest/secor/main/ConsumerMain.java
index 5ac41c4d9..dfa3aabef 100644
--- a/src/main/java/com/pinterest/secor/main/ConsumerMain.java
+++ b/src/main/java/com/pinterest/secor/main/ConsumerMain.java
@@ -18,15 +18,17 @@
*/
package com.pinterest.secor.main;
-import com.pinterest.secor.common.OstrichAdminService;
+import com.pinterest.secor.common.monitoring.OstrichAdminService;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.ShutdownHookRegistry;
import com.pinterest.secor.consumer.Consumer;
import com.pinterest.secor.io.StagingDirectoryCleaner;
+import com.pinterest.secor.monitoring.MetricCollector;
import com.pinterest.secor.tools.LogFileDeleter;
import com.pinterest.secor.util.FileUtil;
import com.pinterest.secor.util.IdUtil;
import com.pinterest.secor.util.RateLimitUtil;
+import com.pinterest.secor.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +61,11 @@ public static void main(String[] args) {
SecorConfig config = SecorConfig.load();
String stagingDirectoryPath = config.getLocalPath() + '/' + IdUtil.getLocalMessageDir();
ShutdownHookRegistry.registerHook(10, new StagingDirectoryCleaner(stagingDirectoryPath));
- OstrichAdminService ostrichService = new OstrichAdminService(config.getOstrichPort());
+
+ MetricCollector metricCollector = ReflectionUtil.createMetricCollector(config.getMetricsCollectorClass());
+ metricCollector.initialize(config);
+
+ OstrichAdminService ostrichService = new OstrichAdminService(config);
ostrichService.start();
FileUtil.configure(config);
@@ -70,7 +76,7 @@ public static void main(String[] args) {
LOG.info("starting {} consumer threads", config.getConsumerThreads());
LinkedList consumers = new LinkedList();
for (int i = 0; i < config.getConsumerThreads(); ++i) {
- Consumer consumer = new Consumer(config);
+ Consumer consumer = new Consumer(config, metricCollector);
consumers.add(consumer);
consumer.start();
}
diff --git a/src/main/java/com/pinterest/secor/monitoring/MicroMeterMetricCollector.java b/src/main/java/com/pinterest/secor/monitoring/MicroMeterMetricCollector.java
index 78884687d..f44f1eea5 100644
--- a/src/main/java/com/pinterest/secor/monitoring/MicroMeterMetricCollector.java
+++ b/src/main/java/com/pinterest/secor/monitoring/MicroMeterMetricCollector.java
@@ -19,26 +19,36 @@
package com.pinterest.secor.monitoring;
import com.pinterest.secor.common.SecorConfig;
-
-import com.google.common.util.concurrent.AtomicDouble;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.jmx.JmxConfig;
import io.micrometer.jmx.JmxMeterRegistry;
+import io.micrometer.prometheus.PrometheusConfig;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.micrometer.statsd.StatsdConfig;
import io.micrometer.statsd.StatsdMeterRegistry;
-
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* MicorMeter meters can integrate with many different metrics backend
* (StatsD/Promethus/Graphite/JMX etc, see https://micrometer.io/docs)
*/
public class MicroMeterMetricCollector implements MetricCollector {
+ private static final Logger LOG = LoggerFactory.getLogger(MicroMeterMetricCollector.class);
+
+ private final Map mGaugeCache = new HashMap<>();
+ private SecorConfig mConfig;
+
@Override
public void initialize(SecorConfig config) {
+ mConfig = config;
+
if (config.getMicroMeterCollectorStatsdEnabled()) {
MeterRegistry statsdRegistry =
new StatsdMeterRegistry(StatsdConfig.DEFAULT, Clock.SYSTEM);
@@ -49,6 +59,11 @@ public void initialize(SecorConfig config) {
MeterRegistry jmxRegistry = new JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM);
Metrics.addRegistry(jmxRegistry);
}
+
+ if (config.getMicroMeterCollectorPrometheusEnabled()) {
+ MeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
+ Metrics.addRegistry(prometheusRegistry);
+ }
}
@Override
@@ -63,13 +78,22 @@ public void increment(String label, int delta, String topic) {
@Override
public void metric(String label, double value, String topic) {
- Metrics.gauge(label, Collections.singletonList(
- Tag.of("topic", topic)), new AtomicDouble(0)).set(value);
+ gauge(label, value, topic);
}
@Override
public void gauge(String label, double value, String topic) {
+ String key = label + "_" + topic;
+ if (!mGaugeCache.containsKey(key) && mGaugeCache.size() >= mConfig.getMicroMeterCacheSize()) {
+ LOG.error("Gauge cache size reached maximum, this may result in inaccurate metrics, "
+ + "you can increase cache size by changing "
+ + "\"secor.monitoring.metrics.collector.micrometer.cache.size\" property.");
+ return;
+ }
+
+ mGaugeCache.put(key, value);
Metrics.gauge(label, Collections.singletonList(
- Tag.of("topic", topic)), new AtomicDouble(0)).set(value);
+ Tag.of("topic", topic)), mGaugeCache, g -> g.get(key));
}
+
}
diff --git a/src/test/java/com/pinterest/secor/monitoring/PrometheusTest.java b/src/test/java/com/pinterest/secor/monitoring/PrometheusTest.java
new file mode 100644
index 000000000..946504780
--- /dev/null
+++ b/src/test/java/com/pinterest/secor/monitoring/PrometheusTest.java
@@ -0,0 +1,41 @@
+package com.pinterest.secor.monitoring;
+
+import com.pinterest.secor.common.SecorConfig;
+import com.pinterest.secor.common.monitoring.PrometheusHandler;
+import com.sun.net.httpserver.HttpExchange;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class PrometheusTest {
+
+ @Test
+ public void testPrometheusIntegration() throws IOException {
+ PropertiesConfiguration properties = new PropertiesConfiguration();
+ properties.addProperty("secor.monitoring.metrics.collector.micrometer.prometheus.enabled", true);
+ SecorConfig config = new SecorConfig(properties);
+ MetricCollector collector = new MicroMeterMetricCollector();
+ collector.initialize(config);
+
+ final List responses = new ArrayList<>();
+ PrometheusHandler handler = new PrometheusHandler() {
+ @Override
+ public void render(String body, HttpExchange exchange, int code) {
+ responses.add(body);
+ }
+ };
+ HttpExchange exchange = mock(HttpExchange.class);
+
+ collector.gauge("test", 1, "topic");
+
+ handler.handle(exchange);
+ assertTrue(responses.get(0).contains("test{topic=\"topic\",} 1.0"));
+ }
+
+}
diff --git a/src/test/java/com/pinterest/secor/performance/PerformanceTest010.java b/src/test/java/com/pinterest/secor/performance/PerformanceTest010.java
index 51cbdad20..c62af9407 100644
--- a/src/test/java/com/pinterest/secor/performance/PerformanceTest010.java
+++ b/src/test/java/com/pinterest/secor/performance/PerformanceTest010.java
@@ -35,7 +35,7 @@
//import com.google.common.collect.Maps;
//import com.pinterest.secor.common.LegacyKafkaClient;
//import com.pinterest.secor.common.OffsetTracker;
-//import com.pinterest.secor.common.OstrichAdminService;
+//import com.pinterest.secor.common.monitoring.OstrichAdminService;
//import com.pinterest.secor.common.SecorConfig;
//import com.pinterest.secor.common.TopicPartition;
//import com.pinterest.secor.consumer.Consumer;
diff --git a/src/test/java/com/pinterest/secor/performance/PerformanceTest08.java b/src/test/java/com/pinterest/secor/performance/PerformanceTest08.java
index c0c76c659..dcfb52c8a 100644
--- a/src/test/java/com/pinterest/secor/performance/PerformanceTest08.java
+++ b/src/test/java/com/pinterest/secor/performance/PerformanceTest08.java
@@ -33,7 +33,7 @@
//import com.google.common.collect.Maps;
//import com.pinterest.secor.common.LegacyKafkaClient;
//import com.pinterest.secor.common.OffsetTracker;
-//import com.pinterest.secor.common.OstrichAdminService;
+//import com.pinterest.secor.common.monitoring.OstrichAdminService;
//import com.pinterest.secor.common.SecorConfig;
//import com.pinterest.secor.common.TopicPartition;
//import com.pinterest.secor.consumer.Consumer;