24
24
import java .util .concurrent .TimeUnit ;
25
25
import java .util .concurrent .locks .Lock ;
26
26
import java .util .concurrent .locks .ReentrantLock ;
27
+ import java .util .EnumSet ;
28
+ import java .util .Iterator ;
27
29
28
30
import org .apache .commons .logging .Log ;
29
31
import org .apache .commons .logging .LogFactory ;
30
32
import org .apache .hadoop .metrics2 .sink .timeline .TimelineMetric ;
31
33
import org .apache .hadoop .metrics2 .sink .timeline .TimelineMetrics ;
32
-
33
- import net .sf .ehcache .Cache ;
34
- import net .sf .ehcache .CacheException ;
35
- import net .sf .ehcache .CacheManager ;
36
- import net .sf .ehcache .Ehcache ;
37
- import net .sf .ehcache .Element ;
38
- import net .sf .ehcache .config .CacheConfiguration ;
39
- import net .sf .ehcache .config .PersistenceConfiguration ;
40
- import net .sf .ehcache .config .SizeOfPolicyConfiguration ;
41
- import net .sf .ehcache .event .CacheEventListener ;
42
- import net .sf .ehcache .store .MemoryStoreEvictionPolicy ;
34
+ import org .ehcache .Cache ;
35
+ import org .ehcache .CacheManager ;
36
+ import org .ehcache .config .CacheConfiguration ;
37
+ import org .ehcache .config .builders .CacheConfigurationBuilder ;
38
+ import org .ehcache .config .builders .CacheManagerBuilder ;
39
+ import org .ehcache .config .builders .ResourcePoolsBuilder ;
40
+ import org .ehcache .config .units .EntryUnit ;
41
+ import org .ehcache .event .CacheEvent ;
42
+ import org .ehcache .event .CacheEventListener ;
43
+ import org .ehcache .event .EventType ;
44
+ import org .ehcache .event .EventFiring ;
45
+ import org .ehcache .event .EventOrdering ;
46
+ import org .ehcache .expiry .Expirations ;
43
47
44
48
public class InternalMetricsCache {
45
49
private static final Log LOG = LogFactory .getLog (InternalMetricsCache .class );
46
50
private final String instanceName ;
47
- private final String maxHeapPercent ;
51
+ private final Integer internalCacheEntryCount ;
48
52
private volatile boolean isCacheInitialized = false ;
49
- private Cache cache ;
50
- static final String TIMELINE_METRIC_CACHE_MANAGER_NAME = "internalMetricsCacheManager" ;
53
+ private Cache <InternalMetricCacheKey , InternalMetricCacheValue > cache ;
51
54
private final Lock lock = new ReentrantLock ();
52
55
private static final int LOCK_TIMEOUT_SECONDS = 2 ;
53
56
54
- public InternalMetricsCache (String instanceName , String maxHeapPercent ) {
57
+ public InternalMetricsCache (String instanceName , Integer internalCacheEntryCount ) {
55
58
this .instanceName = instanceName ;
56
- this .maxHeapPercent = maxHeapPercent ;
59
+ this .internalCacheEntryCount = internalCacheEntryCount ;
57
60
initialize ();
58
61
}
59
62
@@ -63,71 +66,49 @@ private void initialize() {
63
66
throw new RuntimeException ("Cannot initialize internal cache twice" );
64
67
}
65
68
66
- System .setProperty ("net.sf.ehcache.skipUpdateCheck" , "true" );
67
- System .setProperty ("net.sf.ehcache.sizeofengine." + TIMELINE_METRIC_CACHE_MANAGER_NAME ,
68
- "org.apache.ambari.metrics.core.timeline.source.cache.InternalMetricsCacheSizeOfEngine" );
69
-
70
- net .sf .ehcache .config .Configuration managerConfig =
71
- new net .sf .ehcache .config .Configuration ();
72
- managerConfig .setName (TIMELINE_METRIC_CACHE_MANAGER_NAME );
73
-
74
- // Set max heap available to the cache manager
75
- managerConfig .setMaxBytesLocalHeap (maxHeapPercent );
76
-
77
- //Create a singleton CacheManager using defaults
78
- CacheManager manager = CacheManager .create (managerConfig );
79
-
80
- LOG .info ("Creating Metrics Cache with maxHeapPercent => " + maxHeapPercent );
69
+ CacheManager manager = CacheManagerBuilder .newCacheManagerBuilder ()
70
+ .build (true );
81
71
82
72
// Create a Cache specifying its configuration.
83
- CacheConfiguration cacheConfiguration = new CacheConfiguration ()
84
- .name (instanceName )
85
- .memoryStoreEvictionPolicy (MemoryStoreEvictionPolicy .LRU )
86
- .sizeOfPolicy (new SizeOfPolicyConfiguration () // Set sizeOf policy to continue on max depth reached - avoid OOM
87
- .maxDepth (10000 )
88
- .maxDepthExceededBehavior (SizeOfPolicyConfiguration .MaxDepthExceededBehavior .CONTINUE ))
89
- .eternal (true ) // infinite time until eviction
90
- .persistence (new PersistenceConfiguration ()
91
- .strategy (PersistenceConfiguration .Strategy .NONE .name ()));
92
-
93
- cache = new Cache (cacheConfiguration );
94
- cache .getCacheEventNotificationService ().registerListener (new InternalCacheEvictionListener ());
73
+ CacheConfiguration <InternalMetricCacheKey , InternalMetricCacheValue > cacheConfig =
74
+ CacheConfigurationBuilder .newCacheConfigurationBuilder (
75
+ InternalMetricCacheKey .class , InternalMetricCacheValue .class ,
76
+ ResourcePoolsBuilder .newResourcePoolsBuilder ().heap (internalCacheEntryCount , EntryUnit .ENTRIES )
77
+ ).withExpiry (Expirations .noExpiration ()).build ();
95
78
96
- LOG . info ( "Registering internal metrics cache with provider: name = " +
97
- cache .getName () + ", guid: " + cache . getGuid ( ));
79
+ cache = manager . createCache ( instanceName , cacheConfig );
80
+ cache .getRuntimeConfiguration (). registerCacheEventListener ( new InternalCacheEvictionListener (), EventOrdering . ORDERED , EventFiring . SYNCHRONOUS , EnumSet . of ( EventType . EVICTED ));
98
81
99
- manager .addCache (cache );
82
+ LOG .info ("Registering internal metrics cache with provider: name = " +
83
+ instanceName );
100
84
101
85
isCacheInitialized = true ;
102
86
}
103
87
104
88
public InternalMetricCacheValue getInternalMetricCacheValue (InternalMetricCacheKey key ) {
105
- Element ele = cache .get (key );
106
- if (ele != null ) {
107
- return (InternalMetricCacheValue ) ele .getObjectValue ();
108
- }
109
- return null ;
89
+ return cache .get (key );
110
90
}
111
91
112
92
public Collection <TimelineMetrics > evictAll () {
113
93
TimelineMetrics metrics = new TimelineMetrics ();
114
94
try {
115
95
if (lock .tryLock (LOCK_TIMEOUT_SECONDS , TimeUnit .SECONDS )) {
116
- try {
117
- List keys = cache .getKeys ();
118
- for (Object obj : keys ) {
96
+ try {
97
+ Iterator <Cache .Entry <InternalMetricCacheKey , InternalMetricCacheValue >> iterator = cache .iterator ();
98
+ while (iterator .hasNext ()) {
99
+ Cache .Entry <InternalMetricCacheKey , InternalMetricCacheValue > entry = iterator .next ();
119
100
TimelineMetric metric = new TimelineMetric ();
120
- InternalMetricCacheKey key = ( InternalMetricCacheKey ) obj ;
101
+ InternalMetricCacheKey key = entry . getKey () ;
121
102
metric .setMetricName (key .getMetricName ());
122
103
metric .setAppId (key .getAppId ());
123
104
metric .setInstanceId (key .getInstanceId ());
124
105
metric .setHostName (key .getHostname ());
125
106
metric .setStartTime (key .getStartTime ());
126
- Element ele = cache .get (key );
127
- metric .setMetricValues ((( InternalMetricCacheValue ) ele . getObjectValue ()) .getMetricValues ());
107
+ InternalMetricCacheValue value = cache .get (key );
108
+ metric .setMetricValues (value .getMetricValues ());
128
109
metrics .getMetrics ().add (metric );
110
+ iterator .remove ();
129
111
}
130
- cache .removeAll ();
131
112
} finally {
132
113
lock .unlock ();
133
114
}
@@ -157,14 +138,13 @@ public void putAll(Collection<TimelineMetrics> metrics) {
157
138
timelineMetric .getStartTime ()
158
139
);
159
140
160
- Element ele = cache .get (key );
161
- if (ele != null ) {
162
- InternalMetricCacheValue value = (InternalMetricCacheValue ) ele .getObjectValue ();
141
+ InternalMetricCacheValue value = cache .get (key );
142
+ if (value != null ) {
163
143
value .addMetricValues (timelineMetric .getMetricValues ());
164
144
} else {
165
- InternalMetricCacheValue value = new InternalMetricCacheValue ();
145
+ value = new InternalMetricCacheValue ();
166
146
value .setMetricValues (timelineMetric .getMetricValues ());
167
- cache .put (new Element ( key , value ) );
147
+ cache .put (key , value );
168
148
}
169
149
}
170
150
}
@@ -181,49 +161,14 @@ public void putAll(Collection<TimelineMetrics> metrics) {
181
161
}
182
162
}
183
163
184
- class InternalCacheEvictionListener implements CacheEventListener {
185
-
186
- @ Override
187
- public void notifyElementRemoved (Ehcache cache , Element element ) throws CacheException {
188
- // expected
189
- }
190
-
191
- @ Override
192
- public void notifyElementPut (Ehcache cache , Element element ) throws CacheException {
193
- // do nothing
194
- }
195
-
164
+ class InternalCacheEvictionListener implements CacheEventListener <InternalMetricCacheKey , InternalMetricCacheValue > {
196
165
@ Override
197
- public void notifyElementUpdated (Ehcache cache , Element element ) throws CacheException {
198
- // do nothing
199
- }
200
-
201
- @ Override
202
- public void notifyElementExpired (Ehcache cache , Element element ) {
203
- // do nothing
204
- }
205
-
206
- @ Override
207
- public void notifyElementEvicted (Ehcache cache , Element element ) {
208
- // Bad - Remote endpoint cannot keep up resulting in flooding
209
- InternalMetricCacheKey key = (InternalMetricCacheKey ) element .getObjectKey ();
210
- LOG .warn ("Evicting element from internal metrics cache, metric => " + key
211
- .getMetricName () + ", startTime = " + new Date (key .getStartTime ()));
212
- }
213
-
214
- @ Override
215
- public void notifyRemoveAll (Ehcache cache ) {
216
- // expected
217
- }
218
-
219
- @ Override
220
- public Object clone () throws CloneNotSupportedException {
221
- return null ;
222
- }
223
-
224
- @ Override
225
- public void dispose () {
226
- // do nothing
166
+ public void onEvent (CacheEvent <? extends InternalMetricCacheKey , ? extends InternalMetricCacheValue > event ) {
167
+ if (event .getType () == EventType .EVICTED ) {
168
+ InternalMetricCacheKey key = event .getKey ();
169
+ LOG .warn ("Evicting element from internal metrics cache, metric => " + key
170
+ .getMetricName () + ", startTime = " + new Date (key .getStartTime ()));
171
+ }
227
172
}
228
173
}
229
174
}
0 commit comments