diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index f0269c37..40966960 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -269,6 +269,7 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa eventLogFileHandler, MetricsConfiguration.SAMPLING_INTERVAL, QUEUE_PURGE_INTERVAL_MS, + 0, performanceAnalyzerController) .scheduleExecutor(); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/writer/EventLogQueueProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/writer/EventLogQueueProcessor.java index 139c5889..fc951d3b 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/writer/EventLogQueueProcessor.java +++ b/src/main/java/org/opensearch/performanceanalyzer/writer/EventLogQueueProcessor.java @@ -27,17 +27,6 @@ package org.opensearch.performanceanalyzer.writer; -import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp; -import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; -import org.opensearch.performanceanalyzer.config.PluginSettings; -import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction; -import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration; -import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; -import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics; -import org.opensearch.performanceanalyzer.reader_writer_shared.Event; -import org.opensearch.performanceanalyzer.reader_writer_shared.EventLogFileHandler; -import org.apache.logging.log4j.Logger; - import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -49,29 +38,41 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.LongStream; - import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.config.PluginSettings; +import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction; +import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; +import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics; +import org.opensearch.performanceanalyzer.reader_writer_shared.Event; +import org.opensearch.performanceanalyzer.reader_writer_shared.EventLogFileHandler; public class EventLogQueueProcessor { private static final Logger LOG = LogManager.getLogger(EventLogQueueProcessor.class); private final ScheduledExecutorService writerExecutor = Executors.newScheduledThreadPool(1); - private final int filesCleanupPeriodicityMillis = PluginSettings.instance().getMetricsDeletionInterval(); // defaults to 60seconds + private final int filesCleanupPeriodicityMillis = + PluginSettings.instance().getMetricsDeletionInterval(); // defaults to 60seconds private final EventLogFileHandler eventLogFileHandler; private final long initialDelayMillis; private final long purgePeriodicityMillis; private final PerformanceAnalyzerController controller; private long lastCleanupTimeBucket; private long lastTimeBucket; + public EventLogQueueProcessor( EventLogFileHandler eventLogFileHandler, long initialDelayMillis, long purgePeriodicityMillis, + long lastCleanupTimeBucket, PerformanceAnalyzerController controller) { this.eventLogFileHandler = eventLogFileHandler; this.initialDelayMillis = initialDelayMillis; this.purgePeriodicityMillis = purgePeriodicityMillis; - this.lastCleanupTimeBucket = 0; + this.lastCleanupTimeBucket = lastCleanupTimeBucket; this.lastTimeBucket = 0; this.controller = controller; } @@ -83,7 +84,8 @@ public void scheduleExecutor() { } catch (Exception ex) { LOG.error("Unable to cleanup lingering files from previous plugin run.", ex); } - lastCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis()); + lastCleanupTimeBucket = + PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis()); ScheduledFuture futureHandle = writerExecutor.scheduleAtFixedRate( @@ -187,14 +189,21 @@ public void purgeQueueAndPersist() { } private void cleanup() { - // Delete Event log files belonging to time bucket older than past filesCleanupPeriod(defaults to 60s) - long currCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis()); + // Delete Event log files belonging to time bucket older than past + // filesCleanupPeriod(defaults to 60s) + long currCleanupTimeBucket = + PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis()); if (currCleanupTimeBucket - lastCleanupTimeBucket > filesCleanupPeriodicityMillis) { - // Get list of files(time buckets) for purging, considered range : [lastCleanupTimeBucket, currCleanupTimeBucket) - List filesForCleanup = LongStream.range(lastCleanupTimeBucket, currCleanupTimeBucket) - .filter(timeMillis -> timeMillis % MetricsConfiguration.SAMPLING_INTERVAL == 0) - .mapToObj(String::valueOf) - .collect(Collectors.toList()); + // Get list of files(time buckets) for purging, considered range : + // [lastCleanupTimeBucket, currCleanupTimeBucket) + List filesForCleanup = + LongStream.range(lastCleanupTimeBucket, currCleanupTimeBucket) + .filter( + timeMillis -> + timeMillis % MetricsConfiguration.SAMPLING_INTERVAL + == 0) + .mapToObj(String::valueOf) + .collect(Collectors.toList()); eventLogFileHandler.deleteFiles(Collections.unmodifiableList(filesForCleanup)); lastCleanupTimeBucket = currCleanupTimeBucket; } diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader_writer_shared/EventLogFileHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/reader_writer_shared/EventLogFileHandlerTests.java index 96249aef..d9b7cfe5 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/reader_writer_shared/EventLogFileHandlerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/reader_writer_shared/EventLogFileHandlerTests.java @@ -71,6 +71,7 @@ public void init() { eventLogFileHandler, MetricsConfiguration.SAMPLING_INTERVAL, MetricsConfiguration.SAMPLING_INTERVAL, + System.currentTimeMillis(), mockController); System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); }