18
18
package org .apache .ambari .server .controller .metrics .timeline .cache ;
19
19
20
20
import java .io .IOException ;
21
+ import java .net .ConnectException ;
21
22
import java .net .SocketTimeoutException ;
22
23
import java .util .concurrent .atomic .AtomicInteger ;
23
24
24
25
import org .apache .hadoop .metrics2 .sink .timeline .TimelineMetrics ;
25
- import org .ehcache .Cache ;
26
- import org .ehcache .core .internal .statistics .DefaultStatisticsService ;
27
- import org .ehcache .core .statistics .CacheStatistics ;
28
- import org .ehcache .spi .loaderwriter .CacheLoadingException ;
29
26
import org .slf4j .Logger ;
30
27
import org .slf4j .LoggerFactory ;
31
28
32
- public class TimelineMetricCache {
33
- private final Cache <TimelineAppMetricCacheKey , TimelineMetricsCacheValue > cache ;
34
- private final DefaultStatisticsService statisticsService ;
35
- private final TimelineMetricCacheEntryFactory cacheEntryFactory ;
36
- public static final String TIMELINE_METRIC_CACHE_INSTANCE_NAME = "timelineMetricCache" ;
29
+ import net .sf .ehcache .CacheException ;
30
+ import net .sf .ehcache .Ehcache ;
31
+ import net .sf .ehcache .Element ;
32
+ import net .sf .ehcache .constructs .blocking .LockTimeoutException ;
33
+ import net .sf .ehcache .constructs .blocking .UpdatingCacheEntryFactory ;
34
+ import net .sf .ehcache .constructs .blocking .UpdatingSelfPopulatingCache ;
35
+ import net .sf .ehcache .statistics .StatisticsGateway ;
36
+
37
+ public class TimelineMetricCache extends UpdatingSelfPopulatingCache {
38
+
37
39
private final static Logger LOG = LoggerFactory .getLogger (TimelineMetricCache .class );
38
40
private static AtomicInteger printCacheStatsCounter = new AtomicInteger (0 );
39
41
40
42
/**
41
- * Creates a TimelineMetricCache .
43
+ * Creates a SelfPopulatingCache .
42
44
*
43
45
* @param cache @Cache
44
- * @param cacheEntryFactory @CacheEntryFactory
45
- * @param statisticsService @DefaultStatisticsService
46
+ * @param factory @CacheEntryFactory
46
47
*/
47
- public TimelineMetricCache (Cache <TimelineAppMetricCacheKey , TimelineMetricsCacheValue > cache , TimelineMetricCacheEntryFactory cacheEntryFactory , DefaultStatisticsService statisticsService ) {
48
- this .cache = cache ;
49
- this .cacheEntryFactory = cacheEntryFactory ;
50
- this .statisticsService = statisticsService ;
48
+ public TimelineMetricCache (Ehcache cache , UpdatingCacheEntryFactory factory ) throws CacheException {
49
+ super (cache , factory );
51
50
}
52
51
53
52
/**
@@ -64,22 +63,26 @@ public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey
64
63
// Make sure key is valid
65
64
validateKey (key );
66
65
67
- TimelineMetricsCacheValue value = null ;
66
+ Element element = null ;
68
67
try {
69
- value = cache .get (key );
70
- } catch (CacheLoadingException cle ) {
71
- Throwable t = cle .getCause ();
72
- if (t instanceof SocketTimeoutException ) {
73
- throw new SocketTimeoutException (t .getMessage ());
74
- }
75
- if (t instanceof IOException ) {
76
- throw new IOException (t .getMessage ());
68
+ element = get (key );
69
+ } catch (LockTimeoutException le ) {
70
+ // Ehcache masks the Socket Timeout to look as a LockTimeout
71
+ Throwable t = le .getCause ();
72
+ if (t instanceof CacheException ) {
73
+ t = t .getCause ();
74
+ if (t instanceof SocketTimeoutException ) {
75
+ throw new SocketTimeoutException (t .getMessage ());
76
+ }
77
+ if (t instanceof ConnectException ) {
78
+ throw new ConnectException (t .getMessage ());
79
+ }
77
80
}
78
- throw cle ;
79
81
}
80
82
81
83
TimelineMetrics timelineMetrics = new TimelineMetrics ();
82
- if (value != null ) {
84
+ if (element != null && element .getObjectValue () != null ) {
85
+ TimelineMetricsCacheValue value = (TimelineMetricsCacheValue ) element .getObjectValue ();
83
86
if (LOG .isDebugEnabled ()) {
84
87
LOG .debug ("Returning value from cache: {}" , value );
85
88
}
@@ -89,21 +92,51 @@ public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey
89
92
if (LOG .isDebugEnabled ()) {
90
93
// Print stats every 100 calls - Note: Supported in debug mode only
91
94
if (printCacheStatsCounter .getAndIncrement () == 0 ) {
92
- CacheStatistics cacheStatistics = statisticsService .getCacheStatistics (TIMELINE_METRIC_CACHE_INSTANCE_NAME );
93
- if (cacheStatistics == null ) {
94
- LOG .warn ("Cache statistics not available." );
95
- return timelineMetrics ;
96
- }
97
- LOG .debug ("Metrics cache stats => \n , Evictions = {}, Expired = {}, Hits = {}, Misses = {}, Hit ratio = {}, Puts = {}" ,
98
- cacheStatistics .getCacheEvictions (), cacheStatistics .getCacheExpirations (), cacheStatistics .getCacheHits (), cacheStatistics .getCacheMisses (), cacheStatistics .getCacheHitPercentage (), cacheStatistics .getCachePuts ()
99
- );
95
+ StatisticsGateway statistics = this .getStatistics ();
96
+ LOG .debug ("Metrics cache stats => \n , Evictions = {}, Expired = {}, Hits = {}, Misses = {}, Hit ratio = {}, Puts = {}, Size in MB = {}" ,
97
+ statistics .cacheEvictedCount (), statistics .cacheExpiredCount (), statistics .cacheHitCount (), statistics .cacheMissCount (), statistics .cacheHitRatio (),
98
+ statistics .cachePutCount (), statistics .getLocalHeapSizeInBytes () / 1048576 );
100
99
} else {
101
100
printCacheStatsCounter .compareAndSet (100 , 0 );
102
101
}
103
102
}
103
+
104
104
return timelineMetrics ;
105
105
}
106
106
107
+ /**
108
+ * Set new time bounds on the cache key so that update can use the new
109
+ * query window. We do this quietly which means regular get/update logic is
110
+ * not invoked.
111
+ */
112
+ @ Override
113
+ public Element get (Object key ) throws LockTimeoutException {
114
+ Element element = this .getQuiet (key );
115
+ if (element != null ) {
116
+ if (LOG .isTraceEnabled ()) {
117
+ LOG .trace ("key : {}" , element .getObjectKey ());
118
+ LOG .trace ("value : {}" , element .getObjectValue ());
119
+ }
120
+
121
+ // Set new time boundaries on the key
122
+ TimelineAppMetricCacheKey existingKey = (TimelineAppMetricCacheKey ) element .getObjectKey ();
123
+
124
+ LOG .debug ("Existing temporal info: {} for : {}" , existingKey .getTemporalInfo (), existingKey .getMetricNames ());
125
+
126
+ TimelineAppMetricCacheKey newKey = (TimelineAppMetricCacheKey ) key ;
127
+ existingKey .setTemporalInfo (newKey .getTemporalInfo ());
128
+
129
+ LOG .debug ("New temporal info: {} for : {}" , newKey .getTemporalInfo (), existingKey .getMetricNames ());
130
+
131
+ if (existingKey .getSpec () == null || !existingKey .getSpec ().equals (newKey .getSpec ())) {
132
+ existingKey .setSpec (newKey .getSpec ());
133
+ LOG .debug ("New spec: {} for : {}" , newKey .getSpec (), existingKey .getMetricNames ());
134
+ }
135
+ }
136
+
137
+ return super .get (key );
138
+ }
139
+
107
140
private void validateKey (TimelineAppMetricCacheKey key ) throws IllegalArgumentException {
108
141
StringBuilder msg = new StringBuilder ("Invalid metric key requested." );
109
142
boolean throwException = false ;
0 commit comments