Skip to content

Commit

Permalink
Fix failing file handler test
Browse files Browse the repository at this point in the history
Signed-off-by: Sruti Parthiban <partsrut@amazon.com>
  • Loading branch information
sruti1312 committed Jul 15, 2021
1 parent 9c79039 commit 0be71fc
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
eventLogFileHandler,
MetricsConfiguration.SAMPLING_INTERVAL,
QUEUE_PURGE_INTERVAL_MS,
0,
performanceAnalyzerController)
.scheduleExecutor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,6 @@
package org.opensearch.performanceanalyzer.writer;


import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.config.PluginSettings;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction;
import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.reader_writer_shared.Event;
import org.opensearch.performanceanalyzer.reader_writer_shared.EventLogFileHandler;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -49,29 +38,41 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.config.PluginSettings;
import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction;
import org.opensearch.performanceanalyzer.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.reader_writer_shared.Event;
import org.opensearch.performanceanalyzer.reader_writer_shared.EventLogFileHandler;

public class EventLogQueueProcessor {
private static final Logger LOG = LogManager.getLogger(EventLogQueueProcessor.class);

private final ScheduledExecutorService writerExecutor = Executors.newScheduledThreadPool(1);
private final int filesCleanupPeriodicityMillis = PluginSettings.instance().getMetricsDeletionInterval(); // defaults to 60seconds
private final int filesCleanupPeriodicityMillis =
PluginSettings.instance().getMetricsDeletionInterval(); // defaults to 60seconds
private final EventLogFileHandler eventLogFileHandler;
private final long initialDelayMillis;
private final long purgePeriodicityMillis;
private final PerformanceAnalyzerController controller;
private long lastCleanupTimeBucket;
private long lastTimeBucket;

public EventLogQueueProcessor(
EventLogFileHandler eventLogFileHandler,
long initialDelayMillis,
long purgePeriodicityMillis,
long lastCleanupTimeBucket,
PerformanceAnalyzerController controller) {
this.eventLogFileHandler = eventLogFileHandler;
this.initialDelayMillis = initialDelayMillis;
this.purgePeriodicityMillis = purgePeriodicityMillis;
this.lastCleanupTimeBucket = 0;
this.lastCleanupTimeBucket = lastCleanupTimeBucket;
this.lastTimeBucket = 0;
this.controller = controller;
}
Expand All @@ -83,7 +84,8 @@ public void scheduleExecutor() {
} catch (Exception ex) {
LOG.error("Unable to cleanup lingering files from previous plugin run.", ex);
}
lastCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis());
lastCleanupTimeBucket =
PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis());

ScheduledFuture<?> futureHandle =
writerExecutor.scheduleAtFixedRate(
Expand Down Expand Up @@ -187,14 +189,21 @@ public void purgeQueueAndPersist() {
}

private void cleanup() {
// Delete Event log files belonging to time bucket older than past filesCleanupPeriod(defaults to 60s)
long currCleanupTimeBucket = PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis());
// Delete Event log files belonging to time bucket older than past
// filesCleanupPeriod(defaults to 60s)
long currCleanupTimeBucket =
PerformanceAnalyzerMetrics.getTimeInterval(System.currentTimeMillis());
if (currCleanupTimeBucket - lastCleanupTimeBucket > filesCleanupPeriodicityMillis) {
// Get list of files(time buckets) for purging, considered range : [lastCleanupTimeBucket, currCleanupTimeBucket)
List<String> filesForCleanup = LongStream.range(lastCleanupTimeBucket, currCleanupTimeBucket)
.filter(timeMillis -> timeMillis % MetricsConfiguration.SAMPLING_INTERVAL == 0)
.mapToObj(String::valueOf)
.collect(Collectors.toList());
// Get list of files(time buckets) for purging, considered range :
// [lastCleanupTimeBucket, currCleanupTimeBucket)
List<String> filesForCleanup =
LongStream.range(lastCleanupTimeBucket, currCleanupTimeBucket)
.filter(
timeMillis ->
timeMillis % MetricsConfiguration.SAMPLING_INTERVAL
== 0)
.mapToObj(String::valueOf)
.collect(Collectors.toList());
eventLogFileHandler.deleteFiles(Collections.unmodifiableList(filesForCleanup));
lastCleanupTimeBucket = currCleanupTimeBucket;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void init() {
eventLogFileHandler,
MetricsConfiguration.SAMPLING_INTERVAL,
MetricsConfiguration.SAMPLING_INTERVAL,
System.currentTimeMillis(),
mockController);
System.setProperty("performanceanalyzer.metrics.log.enabled", "False");
}
Expand Down

0 comments on commit 0be71fc

Please sign in to comment.