diff --git a/all/pom.xml b/all/pom.xml index 06a84ec47..4088bf94b 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -282,6 +282,11 @@ sofa-rpc-metrics-micrometer ${project.version} + + com.alipay.sofa + sofa-rpc-metrics-prometheus + ${project.version} + com.alipay.sofa sofa-rpc-doc-swagger @@ -513,6 +518,7 @@ com.alipay.sofa:sofa-rpc-log-common-tools com.alipay.sofa:sofa-rpc-metrics-lookout com.alipay.sofa:sofa-rpc-metrics-micrometer + com.alipay.sofa:sofa-rpc-metrics-prometheus com.alipay.sofa:sofa-rpc-registry-consul com.alipay.sofa:sofa-rpc-registry-local com.alipay.sofa:sofa-rpc-registry-zk diff --git a/bom/pom.xml b/bom/pom.xml index c5b35941f..ca3ee0d7b 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -29,6 +29,7 @@ 1.5.18 7.0 27.0-jre + 0.16.0 3.3.13 0.9.2 @@ -544,6 +545,18 @@ ${polaris.version} test + + + io.prometheus + simpleclient + ${prometheus.client.version} + + + io.prometheus + simpleclient_httpserver + ${prometheus.client.version} + test + diff --git a/metrics/metrics-prometheus/pom.xml b/metrics/metrics-prometheus/pom.xml new file mode 100644 index 000000000..7725ca5b7 --- /dev/null +++ b/metrics/metrics-prometheus/pom.xml @@ -0,0 +1,107 @@ + + + 4.0.0 + + + sofa-rpc-metrics + com.alipay.sofa + ${revision} + + + sofa-rpc-metrics-prometheus + + + + + com.alipay.sofa + sofa-rpc-api + + + + + io.prometheus + simpleclient + + + + + io.prometheus + simpleclient_httpserver + test + + + + junit + junit + test + + + + com.alipay.sofa + sofa-rpc-log + test + + + + + src/main/java + + + src/main/resources + false + + **/** + + + + src/test/java + + + src/test/resources + false + + **/** + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${maven.compiler.source} + ${maven.compiler.target} + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-install-plugin + + ${module.install.skip} + + + + org.apache.maven.plugins + maven-deploy-plugin + + ${module.deploy.skip} + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${skipTests} + + **/*Test.java + + once + + + + + \ No newline at end of file diff --git a/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/MetricsBuilder.java b/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/MetricsBuilder.java new file mode 100644 index 000000000..ae5223e70 --- /dev/null +++ b/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/MetricsBuilder.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.metrics.prometheus; + +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.Histogram; + +public class MetricsBuilder { + public static final String BYTES = "bytes"; + + public static final String TASKS = "tasks"; + + public static final String THREADS = "threads"; + + private Histogram.Builder clientTotalBuilder = Histogram.build(); + + private Histogram.Builder clientFailBuilder = Histogram.build(); + + private Histogram.Builder serverTotalBuilder = Histogram.build(); + + private Histogram.Builder serverFailBuilder = Histogram.build(); + + private Histogram.Builder requestSizeBuilder = Histogram.build(); + + private Histogram.Builder responseSizeBuilder = Histogram.build(); + + private Counter.Builder providerCounterBuilder = Counter.build(); + + private Counter.Builder consumerCounterBuilder = Counter.build(); + + private Gauge.Builder threadPoolConfigCoreBuilder = Gauge.build(); + + private Gauge.Builder threadPoolConfigMaxBuilder = Gauge.build(); + + private Gauge.Builder threadPoolConfigQueueBuilder = Gauge.build(); + + private Gauge.Builder threadPoolActiveBuilder = Gauge.build(); + + private Gauge.Builder threadPoolIdleBuilder = Gauge.build(); + + private Gauge.Builder threadPoolQueueBuilder = Gauge.build(); + + public Histogram.Builder getClientTotalBuilder() { + return clientTotalBuilder; + } + + public Histogram.Builder getClientFailBuilder() { + return clientFailBuilder; + } + + public Histogram.Builder getServerTotalBuilder() { + return serverTotalBuilder; + } + + public Histogram.Builder getServerFailBuilder() { + return serverFailBuilder; + } + + public Histogram.Builder getRequestSizeBuilder() { + return requestSizeBuilder; + } + + public Histogram.Builder getResponseSizeBuilder() { + return responseSizeBuilder; + } + + public Counter.Builder getProviderCounterBuilder() { + return providerCounterBuilder; + } + + public Counter.Builder getConsumerCounterBuilder() { + return consumerCounterBuilder; + } + + public Gauge.Builder getThreadPoolConfigCoreBuilder() { + return threadPoolConfigCoreBuilder; + } + + public Gauge.Builder getThreadPoolConfigMaxBuilder() { + return threadPoolConfigMaxBuilder; + } + + public Gauge.Builder getThreadPoolConfigQueueBuilder() { + return threadPoolConfigQueueBuilder; + } + + public Gauge.Builder getThreadPoolActiveBuilder() { + return threadPoolActiveBuilder; + } + + public Gauge.Builder getThreadPoolIdleBuilder() { + return threadPoolIdleBuilder; + } + + public Gauge.Builder getThreadPoolQueueBuilder() { + return threadPoolQueueBuilder; + } + + Histogram buildClientTotal(String[] labelNames) { + return clientTotalBuilder + .name("sofa_client_total") + .help("sofa_client_total") + .labelNames(labelNames) + .create(); + } + + Histogram buildClientFail(String[] labelNames) { + return clientFailBuilder + .name("sofa_client_fail") + .help("sofa_client_fail") + .labelNames(labelNames) + .create(); + } + + Histogram buildServerTotal(String[] labelNames) { + return serverTotalBuilder + .name("sofa_server_total") + .help("sofa_server_total") + .labelNames(labelNames) + .create(); + } + + Histogram buildServerFail(String[] labelNames) { + return serverFailBuilder + .name("sofa_server_fail") + .help("sofa_server_fail") + .labelNames(labelNames) + .create(); + } + + Histogram buildRequestSize(String[] labelNames) { + return requestSizeBuilder + .name("sofa_request_size") + .help("sofa_request_size") + .unit(BYTES) + .labelNames(labelNames) + .create(); + } + + Histogram buildResponseSize(String[] labelNames) { + return responseSizeBuilder + .name("sofa_response_size") + .help("sofa_response_size") + .unit(BYTES) + .labelNames(labelNames) + .create(); + } + + Counter buildProviderCounter(String[] labelNames) { + return providerCounterBuilder + .name("sofa_provider") + .help("sofa_provider") + .labelNames(labelNames) + .create(); + } + + Counter buildConsumerCounter(String[] labelNames) { + return consumerCounterBuilder + .name("sofa_consumer") + .help("sofa_consumer") + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolConfigCore(String[] labelNames) { + return threadPoolConfigCoreBuilder + .name("sofa_threadpool_config_core") + .help("sofa_threadpool_config_core") + .unit(THREADS) + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolConfigMax(String[] labelNames) { + return threadPoolConfigMaxBuilder + .name("sofa_threadpool_config_max") + .help("sofa_threadpool_config_max") + .unit(THREADS) + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolConfigQueue(String[] labelNames) { + return threadPoolConfigQueueBuilder + .name("sofa_threadpool_config_queue") + .help("sofa_threadpool_config_queue") + .unit(TASKS) + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolActive(String[] labelNames) { + return threadPoolActiveBuilder + .name("sofa_threadpool_active") + .help("sofa_threadpool_active") + .unit(THREADS) + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolIdle(String[] labelNames) { + return threadPoolIdleBuilder + .name("sofa_threadpool_idle") + .help("sofa_threadpool_idle") + .unit(THREADS) + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolQueue(String[] labelNames) { + return threadPoolQueueBuilder + .name("sofa_threadpool_queue_size") + .help("sofa_threadpool_queue_size") + .unit(TASKS) + .labelNames(labelNames) + .create(); + } + + public static MetricsBuilder defaultOf() { + return new MetricsBuilder(); + } + +} diff --git a/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollector.java b/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollector.java new file mode 100644 index 000000000..4c872bbc6 --- /dev/null +++ b/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollector.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.metrics.prometheus; + +import com.alipay.sofa.rpc.common.RemotingConstants; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.event.ClientEndInvokeEvent; +import com.alipay.sofa.rpc.event.ConsumerSubEvent; +import com.alipay.sofa.rpc.event.Event; +import com.alipay.sofa.rpc.event.EventBus; +import com.alipay.sofa.rpc.event.ProviderPubEvent; +import com.alipay.sofa.rpc.event.ServerSendEvent; +import com.alipay.sofa.rpc.event.ServerStartedEvent; +import com.alipay.sofa.rpc.event.ServerStoppedEvent; +import com.alipay.sofa.rpc.event.Subscriber; +import io.prometheus.client.Collector; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.Histogram; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicReference; + +public class SofaRpcMetricsCollector extends Collector implements AutoCloseable { + + private static final String[] INVOKE_LABEL_NAMES = new String[]{"app", "service", "method", "protocol", "invoke_type", "caller_app"}; + + private String[] commonLabelNames; + private String[] commonLabelValues; + + private PrometheusSubscriber subscriber; + + private Histogram clientTotal; + + private Histogram clientFail; + + private Histogram serverTotal; + + private Histogram serverFail; + + private Histogram requestSize; + + private Histogram responseSize; + + private Counter providerCounter; + + private Counter consumerCounter; + + private Gauge threadPoolConfigCore; + + private Gauge threadPoolConfigMax; + + private Gauge threadPoolConfigQueue; + + private Gauge threadPoolActive; + + private Gauge threadPoolIdle; + + private Gauge threadPoolQueue; + + + private final AtomicReference serverConfigReference = new AtomicReference<>(); + private final AtomicReference executorReference = new AtomicReference<>(); + + public SofaRpcMetricsCollector() { + this(Collections.emptyMap(), MetricsBuilder.defaultOf()); + } + + public SofaRpcMetricsCollector(Map commonLabels) { + this(commonLabels, MetricsBuilder.defaultOf()); + } + + public SofaRpcMetricsCollector(MetricsBuilder metricsBuilder) { + this(Collections.emptyMap(), metricsBuilder); + } + + public SofaRpcMetricsCollector(Map commonLabels, MetricsBuilder metricsBuilder) { + this.commonLabelNames = commonLabels.keySet().toArray(new String[0]); + this.commonLabelValues = commonLabels.values().toArray(new String[0]); + this.subscriber = new PrometheusSubscriber(); + + String[] labelNames; + int clength = commonLabelNames.length; + if (clength == 0) { + labelNames = INVOKE_LABEL_NAMES; + } else { + int ilength = INVOKE_LABEL_NAMES.length; + labelNames = new String[clength + ilength]; + System.arraycopy(commonLabelNames, 0, labelNames, 0, clength); + System.arraycopy(INVOKE_LABEL_NAMES, 0, labelNames, clength, ilength); + } + + this.clientTotal = metricsBuilder.buildClientTotal(labelNames); + this.clientFail = metricsBuilder.buildClientFail(labelNames); + this.serverTotal = metricsBuilder.buildServerTotal(labelNames); + this.serverFail = metricsBuilder.buildServerFail(labelNames); + this.requestSize = metricsBuilder.buildRequestSize(labelNames); + this.responseSize = metricsBuilder.buildResponseSize(labelNames); + + this.providerCounter = metricsBuilder.buildProviderCounter(commonLabelNames); + this.consumerCounter = metricsBuilder.buildConsumerCounter(commonLabelNames); + this.threadPoolConfigCore = metricsBuilder.buildThreadPoolConfigCore(commonLabelNames); + this.threadPoolConfigMax = metricsBuilder.buildThreadPoolConfigMax(commonLabelNames); + this.threadPoolConfigQueue = metricsBuilder.buildThreadPoolConfigQueue(commonLabelNames); + this.threadPoolActive = metricsBuilder.buildThreadPoolActive(commonLabelNames); + this.threadPoolIdle = metricsBuilder.buildThreadPoolIdle(commonLabelNames); + this.threadPoolQueue = metricsBuilder.buildThreadPoolQueue(commonLabelNames); + + registerSubscriber(); + } + + private void registerSubscriber() { + EventBus.register(ClientEndInvokeEvent.class, subscriber); + EventBus.register(ServerSendEvent.class, subscriber); + EventBus.register(ServerStartedEvent.class, subscriber); + EventBus.register(ServerStoppedEvent.class, subscriber); + EventBus.register(ProviderPubEvent.class, subscriber); + EventBus.register(ConsumerSubEvent.class, subscriber); + } + + @Override + public List collect() { + List result = new ArrayList<>(); + result.addAll(clientTotal.collect()); + result.addAll(clientFail.collect()); + result.addAll(serverTotal.collect()); + result.addAll(serverFail.collect()); + result.addAll(requestSize.collect()); + result.addAll(responseSize.collect()); + result.addAll(providerCounter.collect()); + result.addAll(consumerCounter.collect()); + + ServerConfig serverConfig = serverConfigReference.get(); + ThreadPoolExecutor threadPoolExecutor = executorReference.get(); + if (serverConfig != null) { + threadPoolConfigCore.labels(commonLabelValues) + .set(serverConfig.getCoreThreads()); + result.addAll(threadPoolConfigCore.collect()); + + + threadPoolConfigMax.labels(commonLabelValues) + .set(serverConfig.getMaxThreads()); + result.addAll(threadPoolConfigMax.collect()); + + + threadPoolConfigQueue.labels(commonLabelValues) + .set(serverConfig.getQueues()); + result.addAll(threadPoolConfigQueue.collect()); + } + + + if (threadPoolExecutor != null) { + threadPoolActive.labels(commonLabelValues) + .set(threadPoolExecutor.getActiveCount()); + result.addAll(threadPoolActive.collect()); + + threadPoolIdle.labels(commonLabelValues) + .set(threadPoolExecutor.getPoolSize() - threadPoolExecutor.getActiveCount()); + result.addAll(threadPoolIdle.collect()); + + + threadPoolQueue.labels(commonLabelValues) + .set(threadPoolExecutor.getQueue().size()); + result.addAll(threadPoolQueue.collect()); + } + + return result; + } + + @Override + public void close() throws Exception { + EventBus.unRegister(ClientEndInvokeEvent.class, subscriber); + EventBus.unRegister(ServerSendEvent.class, subscriber); + EventBus.unRegister(ServerStartedEvent.class, subscriber); + EventBus.unRegister(ServerStoppedEvent.class, subscriber); + EventBus.unRegister(ProviderPubEvent.class, subscriber); + EventBus.unRegister(ConsumerSubEvent.class, subscriber); + } + + private static Long getLongAvoidNull(Object object) { + if (object == null) { + return null; + } + + if (object instanceof Integer) { + return Long.parseLong(object.toString()); + } + + return (Long) object; + } + + private static String getStringAvoidNull(Object object) { + if (object == null) { + return null; + } + + return (String) object; + } + + + private class PrometheusSubscriber extends Subscriber { + + @Override + public void onEvent(Event event) { + if (event instanceof ClientEndInvokeEvent) { + onEvent((ClientEndInvokeEvent) event); + } else if (event instanceof ServerSendEvent) { + onEvent((ServerSendEvent) event); + } else if (event instanceof ServerStartedEvent) { + onEvent((ServerStartedEvent) event); + } else if (event instanceof ServerStoppedEvent) { + onEvent((ServerStoppedEvent) event); + } else if (event instanceof ProviderPubEvent) { + onEvent((ProviderPubEvent) event); + } else if (event instanceof ConsumerSubEvent) { + onEvent((ConsumerSubEvent) event); + } else { + throw new IllegalArgumentException("unexpected event: " + event); + } + } + + private void onEvent(ClientEndInvokeEvent event) { + InvokeMeta meta = new InvokeMeta( + event.getRequest(), + event.getResponse(), + getLongAvoidNull(RpcInternalContext.getContext().getAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE)) + ); + long elapsed = meta.elapsed(); + String[] labelValues = meta.labelValues(commonLabelValues); + + clientTotal.labels(labelValues).observe(elapsed); + if (!meta.success()) { + clientFail.labels(labelValues).observe(elapsed); + } + + RpcInternalContext context = RpcInternalContext.getContext(); + requestSize.labels(labelValues) + .observe(getLongAvoidNull(context.getAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE))); + responseSize.labels(labelValues) + .observe(getLongAvoidNull(context.getAttachment(RpcConstants.INTERNAL_KEY_RESP_SIZE))); + } + + private void onEvent(ServerSendEvent event) { + InvokeMeta meta = new InvokeMeta( + event.getRequest(), + event.getResponse(), + getLongAvoidNull(RpcInternalContext.getContext().getAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE)) + ); + long elapsed = meta.elapsed(); + String[] labelValues = meta.labelValues(commonLabelValues); + + serverTotal.labels(labelValues).observe(elapsed); + if (!meta.success()) { + serverFail.labels(labelValues).observe(elapsed); + } + } + + private void onEvent(ServerStartedEvent event) { + serverConfigReference.set(event.getServerConfig()); + executorReference.set(event.getThreadPoolExecutor()); + } + + private void onEvent(ServerStoppedEvent event) { + serverConfigReference.set(null); + executorReference.set(null); + } + + private void onEvent(ProviderPubEvent event) { + providerCounter.labels(commonLabelValues) + .inc(); + } + + private void onEvent(ConsumerSubEvent event) { + consumerCounter.labels(commonLabelValues) + .inc(); + } + } + + private static class InvokeMeta { + + private final SofaRequest request; + private final SofaResponse response; + private final long elapsed; + + private InvokeMeta(SofaRequest request, SofaResponse response, long elapsed) { + this.request = request; + this.response = response; + this.elapsed = elapsed; + } + + public String app() { + return Optional.ofNullable(request.getTargetAppName()).orElse(""); + } + + public String callerApp() { + return Optional.ofNullable(getStringAvoidNull( + request.getRequestProp(RemotingConstants.HEAD_APP_NAME))).orElse(""); + } + + public String service() { + return Optional.ofNullable(request.getTargetServiceUniqueName()).orElse(""); + } + + public String method() { + return Optional.ofNullable(request.getMethodName()).orElse(""); + } + + public String protocol() { + return Optional.ofNullable(getStringAvoidNull( + request.getRequestProp(RemotingConstants.HEAD_PROTOCOL))).orElse(""); + } + + public String invokeType() { + return Optional.ofNullable(request.getInvokeType()).orElse(""); + } + + public long elapsed() { + return elapsed; + } + + public boolean success() { + return response != null + && !response.isError() + && response.getErrorMsg() == null + && (!(response.getAppResponse() instanceof Throwable)); + } + + public String[] labelValues(String[] commonLabelValues) { + String[] labelValues; + String[] invokeLabelValues = new String[]{app(), service(), method(), protocol(), invokeType(), callerApp()}; + int clength = commonLabelValues.length; + if (clength == 0) { + labelValues = invokeLabelValues; + } else { + int ilength = invokeLabelValues.length; + labelValues = new String[clength + ilength]; + System.arraycopy(commonLabelValues, 0, labelValues, 0, clength); + System.arraycopy(invokeLabelValues, 0, labelValues, clength, ilength); + } + return labelValues; + } + } + +} diff --git a/metrics/metrics-prometheus/src/test/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollectorTest.java b/metrics/metrics-prometheus/src/test/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollectorTest.java new file mode 100644 index 000000000..5ee0fc0e4 --- /dev/null +++ b/metrics/metrics-prometheus/src/test/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollectorTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.metrics.prometheus; + +import com.alipay.sofa.rpc.common.RemotingConstants; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.core.exception.SofaRpcException; +import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback; +import com.alipay.sofa.rpc.core.request.RequestBase; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.event.ClientEndInvokeEvent; +import com.alipay.sofa.rpc.event.ConsumerSubEvent; +import com.alipay.sofa.rpc.event.EventBus; +import com.alipay.sofa.rpc.event.ProviderPubEvent; +import com.alipay.sofa.rpc.event.ServerSendEvent; +import com.alipay.sofa.rpc.event.ServerStartedEvent; +import com.alipay.sofa.rpc.event.ServerStoppedEvent; +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class SofaRpcMetricsCollectorTest { + + @Test + public void testPrometheusMetricsCollect1() throws Exception { + try (SofaRpcMetricsCollector collector = new SofaRpcMetricsCollector()) { + CollectorRegistry registry = new CollectorRegistry(); + collector.register(registry); + + SofaRequest request = buildRequest(); + SofaResponse successResponse = buildSuccessResponse(); + SofaResponse failResponse = buildFailResponse(); + RpcInternalContext.getContext() + .setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, 100) + .setAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE, 10) + .setAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE, 3) + .setAttachment(RpcConstants.INTERNAL_KEY_RESP_SIZE, 4); + + List samplesList; + + EventBus.post(new ClientEndInvokeEvent(request, successResponse, null)); + EventBus.post(new ClientEndInvokeEvent(request, failResponse, null)); + + EventBus.post(new ServerSendEvent(request, successResponse, null)); + EventBus.post(new ServerSendEvent(request, failResponse, null)); + + EventBus.post(new ProviderPubEvent(new ProviderConfig<>())); + + EventBus.post(new ConsumerSubEvent(new ConsumerConfig<>())); + samplesList = collector.collect(); + Assert.assertEquals(samplesList.size(), 8); + + ServerConfig serverConfig = new ServerConfig(); + EventBus.post(new ServerStartedEvent(serverConfig, new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()))); + samplesList = collector.collect(); + Assert.assertEquals(samplesList.size(), 14); + + EventBus.post(new ServerStoppedEvent(serverConfig)); + samplesList = collector.collect(); + Assert.assertEquals(samplesList.size(), 8); + +// new HTTPServer(new InetSocketAddress(9000),registry); +// Thread.currentThread().join(); + } + } + + @Test + public void testPrometheusMetricsCollect2() throws Exception { + MetricsBuilder metricsBuilder = new MetricsBuilder(); + // set buckets + metricsBuilder.getClientTotalBuilder() + .exponentialBuckets(1, 2, 15); + metricsBuilder.getClientFailBuilder() + .linearBuckets(0, 5, 15); + + Map testLabels = new HashMap<>(); + testLabels.put("from", "test"); + + try (SofaRpcMetricsCollector collector = new SofaRpcMetricsCollector(testLabels, metricsBuilder)) { + CollectorRegistry registry = new CollectorRegistry(); + collector.register(registry); + + SofaRequest request = buildRequest(); + SofaResponse successResponse = buildSuccessResponse(); + SofaResponse failResponse = buildFailResponse(); + RpcInternalContext.getContext() + .setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, 100) + .setAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE, 10) + .setAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE, 3) + .setAttachment(RpcConstants.INTERNAL_KEY_RESP_SIZE, 4); + + List samplesList; + + EventBus.post(new ClientEndInvokeEvent(request, successResponse, null)); + EventBus.post(new ClientEndInvokeEvent(request, failResponse, null)); + + EventBus.post(new ProviderPubEvent(new ProviderConfig<>())); + + EventBus.post(new ConsumerSubEvent(new ConsumerConfig<>())); + + ServerConfig serverConfig = new ServerConfig(); + EventBus.post(new ServerStartedEvent(serverConfig, new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()))); + + samplesList = collector.collect(); + Assert.assertEquals(samplesList.size(), 14); + +// new HTTPServer(new InetSocketAddress(9000),registry); +// Thread.currentThread().join(); + } + } + + private SofaRequest buildRequest() throws NoSuchMethodException { + SofaRequest request = new SofaRequest(); + request.setInterfaceName(TestService.class.getName()); + request.setMethodName("echoStr"); + request.setMethod(TestService.class.getMethod("func")); + request.setMethodArgs(new Object[] {}); + request.setMethodArgSigs(new String[] {}); + request.setTargetServiceUniqueName(TestService.class.getName() + ":1.0"); + request.setTargetAppName("targetApp"); + request.setSerializeType((byte) 11); + request.setTimeout(1024); + request.setInvokeType(RpcConstants.INVOKER_TYPE_SYNC); + request.addRequestProp(RemotingConstants.HEAD_APP_NAME, "app"); + request.addRequestProp(RemotingConstants.HEAD_PROTOCOL, "bolt"); + request.setSofaResponseCallback(new SofaResponseCallback() { + @Override + public void onAppResponse(Object appResponse, String methodName, RequestBase request) { + + } + + @Override + public void onAppException(Throwable throwable, String methodName, RequestBase request) { + + } + + @Override + public void onSofaException(SofaRpcException sofaException, String methodName, RequestBase request) { + + } + }); + return request; + } + + private SofaResponse buildSuccessResponse() { + SofaResponse response = new SofaResponse(); + response.setAppResponse("123"); + return response; + } + + private SofaResponse buildFailResponse() { + SofaResponse response = new SofaResponse(); + response.setAppResponse(new RuntimeException()); + return response; + } + + private static class TestService { + + public String func() { + return null; + } + } +} diff --git a/metrics/pom.xml b/metrics/pom.xml index e4a753544..e50d83a16 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -16,6 +16,7 @@ metrics-lookout metrics-micrometer + metrics-prometheus