|
1 | 1 | /*
|
2 |
| - * Copyright 2017 the original author or authors. |
| 2 | + * Copyright 2017-2018 the original author or authors. |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
|
16 | 16 |
|
17 | 17 | package org.springframework.cloud.dataflow.metrics.collector;
|
18 | 18 |
|
| 19 | +import java.io.IOException; |
| 20 | +import java.util.List; |
| 21 | +import java.util.stream.Collectors; |
| 22 | + |
| 23 | +import com.fasterxml.jackson.core.type.TypeReference; |
| 24 | +import com.fasterxml.jackson.databind.ObjectMapper; |
19 | 25 | import org.slf4j.Logger;
|
20 | 26 | import org.slf4j.LoggerFactory;
|
21 | 27 |
|
22 | 28 | import org.springframework.cloud.dataflow.metrics.collector.model.ApplicationMetrics;
|
| 29 | +import org.springframework.cloud.dataflow.metrics.collector.model.Metric; |
| 30 | +import org.springframework.cloud.dataflow.metrics.collector.model.MicrometerMetric; |
23 | 31 | import org.springframework.cloud.dataflow.metrics.collector.services.ApplicationMetricsService;
|
24 | 32 | import org.springframework.cloud.stream.annotation.StreamListener;
|
25 | 33 | import org.springframework.cloud.stream.messaging.Sink;
|
26 | 34 | import org.springframework.stereotype.Component;
|
| 35 | +import org.springframework.util.StringUtils; |
27 | 36 |
|
28 | 37 | /**
|
29 |
| - * Adds the incoming {@link ApplicationMetrics} payload into the backend store |
| 38 | + * Adds the incoming {@link ApplicationMetrics} payload into the in memory cache. |
| 39 | + * Supports metrics sent from Spring Cloud Stream 1.x and 2.x applications |
| 40 | + * |
30 | 41 | * @author Vinicius Carvalho
|
| 42 | + * @author Christian Tzolov |
31 | 43 | */
|
32 | 44 | @Component
|
33 | 45 | public class MetricsAggregator {
|
| 46 | + private Logger logger = LoggerFactory.getLogger(MetricsAggregator.class); |
34 | 47 |
|
| 48 | + private ObjectMapper mapper; |
35 | 49 | private ApplicationMetricsService service;
|
36 | 50 |
|
37 |
| - private Logger logger = LoggerFactory.getLogger(MetricsAggregator.class); |
38 | 51 |
|
39 | 52 | public MetricsAggregator(ApplicationMetricsService service) {
|
40 | 53 | this.service = service;
|
| 54 | + this.mapper = new ObjectMapper(); |
41 | 55 | }
|
42 | 56 |
|
| 57 | + private final static class Metric1TypeReference extends TypeReference<ApplicationMetrics<Metric<Number>>> {} |
| 58 | + |
| 59 | + private final static class Metric2TypeReference extends TypeReference<ApplicationMetrics<MicrometerMetric<Number>>> {} |
| 60 | + |
43 | 61 | @StreamListener(Sink.INPUT)
|
44 |
| - public void receive(ApplicationMetrics metrics) { |
| 62 | + public void receive(String metrics) { |
| 63 | + |
| 64 | + ApplicationMetrics<Metric<Double>> applicationMetrics; |
| 65 | + try { |
| 66 | + // Use the "spring.integration.send" metric name as a version discriminator for old and new metrics |
| 67 | + if (StringUtils.hasText(metrics) && metrics.contains("spring.integration.send")) { |
| 68 | + ApplicationMetrics<MicrometerMetric<Number>> applicationMetrics2 = mapper.readValue(metrics, new Metric2TypeReference()); |
| 69 | + applicationMetrics = convertMetric2ToMetric(applicationMetrics2); |
| 70 | + applicationMetrics.getProperties().put(ApplicationMetrics.STREAM_METRICS_VERSION, ApplicationMetrics.METRICS_VERSION_2); |
| 71 | + } |
| 72 | + else { |
| 73 | + applicationMetrics = mapper.readValue(metrics, new Metric1TypeReference()); |
| 74 | + applicationMetrics.getProperties().put(ApplicationMetrics.STREAM_METRICS_VERSION, ApplicationMetrics.METRICS_VERSION_1); |
| 75 | + } |
| 76 | + |
| 77 | + this.processApplicationMetrics(applicationMetrics); |
| 78 | + } |
| 79 | + catch (IOException e) { |
| 80 | + logger.warn("Invalid metrics Json", e); |
| 81 | + } |
| 82 | + |
| 83 | + } |
| 84 | + |
| 85 | + /** |
| 86 | + * Converts the new Micrometer metrics into the previous {@link Metric} |
| 87 | + * format (e.g. to the Spring Boot 1.x actuator metrics) |
| 88 | + * Only the successful Spring Integration channel metrics are filtered in. All other metrics are discarded! |
| 89 | + * @param applicationMetrics2 {@link ApplicationMetrics} with Micrometer Metrics collection |
| 90 | + * @return Returns {@link ApplicationMetrics} with SpringBoot1.5's Metric collection. |
| 91 | + */ |
| 92 | + private ApplicationMetrics<Metric<Double>> convertMetric2ToMetric(ApplicationMetrics<MicrometerMetric<Number>> applicationMetrics2) { |
| 93 | + List<Metric<Double>> metrics = applicationMetrics2.getMetrics().stream() |
| 94 | + .filter(metric -> metric.getId().getName().matches("spring\\.integration\\.send")) |
| 95 | + .filter(metric -> metric.getId().getTag("type").equals("channel")) |
| 96 | + .filter(metric -> metric.getId().getTag("result").equals("success")) |
| 97 | + .map(m2 -> new Metric<>( |
| 98 | + generateOldMetricName(m2), |
| 99 | + m2.getCount().doubleValue() / (applicationMetrics2.getInterval() / 1000), // normalize rate |
| 100 | + m2.getTimestamp())) |
| 101 | + .collect(Collectors.toList()); |
| 102 | + ApplicationMetrics<Metric<Double>> applicationMetrics = new ApplicationMetrics(applicationMetrics2.getName(), metrics); |
| 103 | + applicationMetrics.setCreatedTime(applicationMetrics2.getCreatedTime()); |
| 104 | + applicationMetrics.setProperties(applicationMetrics2.getProperties()); |
| 105 | + return applicationMetrics; |
| 106 | + } |
| 107 | + |
| 108 | + /** |
| 109 | + * Build an hierarchical Metric Ver.1 name from the the multi-dimension (e.g. multi-tag) micrometer Metric |
| 110 | + * @param metric2 Micrometer based metrics |
| 111 | + * @return Returns an hierarchical name compatible with the Metric ver.1 name convention. |
| 112 | + */ |
| 113 | + private String generateOldMetricName(MicrometerMetric<Number> metric2) { |
| 114 | + String oldMetricName = metric2.getId().getName(); |
| 115 | + if (metric2.getId().getName().startsWith("spring.integration.")) { |
| 116 | + String channelName = metric2.getId().getTag("name"); |
| 117 | + String metricResult = metric2.getId().getTag("result"); |
| 118 | + String successSuffix = "success".equals(metricResult) ? "" : "." + metricResult; |
| 119 | + oldMetricName = "integration.channel." + channelName + ".send.mean" + successSuffix; |
| 120 | + } |
| 121 | + return oldMetricName; |
| 122 | + } |
| 123 | + |
| 124 | + private void processApplicationMetrics(ApplicationMetrics<Metric<Double>> metrics) { |
45 | 125 | if (metrics.getProperties().get(ApplicationMetrics.APPLICATION_GUID) != null
|
46 | 126 | && metrics.getProperties().get(ApplicationMetrics.APPLICATION_NAME) != null
|
47 | 127 | && metrics.getProperties().get(ApplicationMetrics.STREAM_NAME) != null) {
|
48 | 128 | this.service.add(metrics);
|
49 |
| - }else{ |
50 |
| - if(logger.isDebugEnabled()){ |
51 |
| - logger.debug("Metric : {} is missing key properties and will not be consumed by the collector",metrics.getName()); |
| 129 | + } |
| 130 | + else { |
| 131 | + if (logger.isDebugEnabled()) { |
| 132 | + logger.debug("Metric : {} is missing key properties and will not be consumed by the collector", metrics.getName()); |
52 | 133 | }
|
53 | 134 | }
|
54 | 135 | }
|
55 |
| - |
56 | 136 | }
|
0 commit comments