Fixing Event Log file cleanup issue #30

Merged
7 commits merged on Jul 13, 2021

Changes from all commits

This file was deleted.

@@ -28,16 +28,19 @@

public enum StatExceptionCode {
TOTAL_ERROR("TotalError"),
METRICS_WRITE_ERROR("MetricsWriteError"),
METRICS_REMOVE_ERROR("MetricsRemoveError"),

// Tracks the number of VM attach/dataDump or detach failures.
JVM_ATTACH_ERROR("JvmAttachErrror"),

// This error is thrown if the java_pid file is missing.
JVM_ATTACH_ERROR_JAVA_PID_FILE_MISSING("JvmAttachErrorJavaPidFileMissing"),

// The lock could not be acquired within the timeout.
JVM_ATTACH_LOCK_ACQUISITION_FAILED("JvmAttachLockAcquisitionFailed"),

// ThreadState could not be found for an OpenSearch thread in the critical OpenSearch path.
NO_THREAD_STATE_INFO("NoThreadStateInfo"),

// This metric indicates that we successfully completed a thread-dump. Likewise,
// an omission of this should indicate that the thread taking the dump got stuck.
JVM_THREAD_DUMP_SUCCESSFUL("JvmThreadDumpSuccessful"),
@@ -32,12 +32,10 @@
import org.opensearch.performanceanalyzer.collectors.DisksCollector;
import org.opensearch.performanceanalyzer.collectors.GCInfoCollector;
import org.opensearch.performanceanalyzer.collectors.HeapMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.MetricsPurgeActivity;
import org.opensearch.performanceanalyzer.collectors.MountedPartitionMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.NetworkE2ECollector;
import org.opensearch.performanceanalyzer.collectors.NetworkInterfaceCollector;
import org.opensearch.performanceanalyzer.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.config.PluginSettings;
import org.opensearch.performanceanalyzer.jvm.GCMetrics;
import org.opensearch.performanceanalyzer.jvm.HeapMetrics;
import org.opensearch.performanceanalyzer.jvm.ThreadList;
@@ -50,26 +48,22 @@ public class MetricsConfiguration {
public static final int SAMPLING_INTERVAL = 5000;
public static final int ROTATION_INTERVAL = 30000;
public static final int STATS_ROTATION_INTERVAL = 60000;
public static final int DELETION_INTERVAL =
PluginSettings.instance().getMetricsDeletionInterval();

public static class MetricConfig {
public int samplingInterval;
public int rotationInterval;
public int deletionInterval;

public MetricConfig(int samplingInterval, int rotationInterval, int deletionInterval) {
public MetricConfig(int samplingInterval, int rotationInterval) {
this.samplingInterval = samplingInterval;
this.rotationInterval = rotationInterval;
this.deletionInterval = deletionInterval;
}
}

public static final Map<Class, MetricConfig> CONFIG_MAP = new HashMap<>();
public static final MetricConfig cdefault;

static {
cdefault = new MetricConfig(SAMPLING_INTERVAL, 0, 0);
cdefault = new MetricConfig(SAMPLING_INTERVAL, 0);

CONFIG_MAP.put(ThreadCPU.class, cdefault);
CONFIG_MAP.put(ThreadDiskIO.class, cdefault);
@@ -80,11 +74,8 @@ public MetricConfig(int samplingInterval, int rotationInterval, int deletionInterval) {
CONFIG_MAP.put(NetworkE2ECollector.class, cdefault);
CONFIG_MAP.put(NetworkInterfaceCollector.class, cdefault);
CONFIG_MAP.put(OSGlobals.class, cdefault);
CONFIG_MAP.put(PerformanceAnalyzerMetrics.class, new MetricConfig(0, ROTATION_INTERVAL, 0));
CONFIG_MAP.put(
MetricsPurgeActivity.class,
new MetricConfig(ROTATION_INTERVAL, 0, DELETION_INTERVAL));
CONFIG_MAP.put(StatsCollector.class, new MetricConfig(STATS_ROTATION_INTERVAL, 0, 0));
CONFIG_MAP.put(PerformanceAnalyzerMetrics.class, new MetricConfig(0, ROTATION_INTERVAL));
CONFIG_MAP.put(StatsCollector.class, new MetricConfig(STATS_ROTATION_INTERVAL, 0));
CONFIG_MAP.put(DisksCollector.class, cdefault);
CONFIG_MAP.put(HeapMetricsCollector.class, cdefault);
CONFIG_MAP.put(GCInfoCollector.class, cdefault);
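
As a quick illustration of how the slimmed-down configuration is consumed (a hedged sketch, not part of this diff; the caller code shown is an assumption), a collector's intervals are looked up from the static map, and the deletion interval is no longer part of that lookup:

// Hypothetical caller; after this change MetricConfig only carries sampling and
// rotation intervals, and event file deletion is handled by EventLogFileHandler instead.
MetricsConfiguration.MetricConfig conf =
        MetricsConfiguration.CONFIG_MAP.get(StatsCollector.class);
int samplingMs = conf.samplingInterval; // 60000 (STATS_ROTATION_INTERVAL)
int rotationMs = conf.rotationInterval; // 0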
@@ -33,13 +33,13 @@
import java.nio.file.Paths;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.opensearch.performanceanalyzer.collectors.StatExceptionCode;
import org.opensearch.performanceanalyzer.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.config.PluginSettings;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.reader_writer_shared.Event;

@SuppressWarnings("checkstyle:constantname")
@@ -139,7 +139,8 @@ public static void addMetricEntry(StringBuilder value, String metricKey, long me

private static void emitMetric(BlockingQueue<Event> q, Event entry) {
if (!q.offer(entry)) {
// TODO: Emit a metric here.
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.METRICS_WRITE_ERROR, entry.key, 1);
LOG.debug("Could not enter metric {}", entry);
}
}
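
For reference, here is a minimal, self-contained sketch (hypothetical values, not code from this PR) of the non-blocking offer() behavior relied on above: when the bounded queue is full, offer() returns false immediately instead of blocking, and that is the point at which the writer now records a MetricsWriteError named counter.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OfferSketch {
    public static void main(String[] args) {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(2);
        q.offer("event-1");
        q.offer("event-2");
        // The queue is at capacity, so this offer() fails fast and returns false;
        // the caller can count the drop (as emitMetric does) rather than block.
        boolean accepted = q.offer("event-3");
        System.out.println("accepted=" + accepted); // prints accepted=false
    }
}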
@@ -199,15 +200,16 @@ public static void removeMetrics(File keyPathFile) {
LOG.debug("Purge Could not delete file {}", keyPathFile);
}
} catch (Exception ex) {
StatsCollector.instance().logException(StatExceptionCode.METRICS_REMOVE_ERROR);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.METRICS_REMOVE_ERROR, "", 1);
LOG.debug(
(Supplier<?>)
() ->
new ParameterizedMessage(
"Error in deleting file: {} for keyPath:{} with ExceptionCode: {}",
ex.toString(),
keyPathFile.getAbsolutePath(),
StatExceptionCode.METRICS_REMOVE_ERROR.toString()),
WriterMetrics.METRICS_REMOVE_ERROR.toString()),
ex);
}
}
@@ -34,6 +34,15 @@
import org.opensearch.performanceanalyzer.rca.stats.measurements.MeasurementSet;

public enum WriterMetrics implements MeasurementSet {
/** Measures the time spent in deleting the event log files */
EVENT_LOG_FILES_DELETION_TIME(
"EventLogFilesDeletionTime",
"millis",
Arrays.asList(Statistics.MAX, Statistics.MEAN, Statistics.SUM)),
/** Measures the count of event log files deleted */
EVENT_LOG_FILES_DELETED(
"EventLogFilesDeleted", "count", Arrays.asList(Statistics.MAX, Statistics.SUM)),

SHARD_STATE_COLLECTOR_EXECUTION_TIME(
"ShardStateCollectorExecutionTime",
"millis",
@@ -145,6 +154,15 @@ public enum WriterMetrics implements MeasurementSet {
Statistics.SUM)),

STALE_METRICS("StaleMetrics", "count", Arrays.asList(Statistics.COUNT)),

METRICS_WRITE_ERROR(
"MetricsWriteError",
"namedCount",
Collections.singletonList(Statistics.NAMED_COUNTERS)),

METRICS_REMOVE_ERROR("MetricsRemoveError", "count", Arrays.asList(Statistics.COUNT)),

METRICS_REMOVE_FAILURE("MetricsRemoveFailure", "count", Arrays.asList(Statistics.COUNT)),
;

/** What we want to appear as the metric name. */
@@ -39,10 +39,14 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.core.Util;
import org.opensearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import org.opensearch.performanceanalyzer.reader.EventDispatcher;

public class EventLogFileHandler {
@@ -78,7 +82,7 @@ public void writeTmpFile(List<Event> dataEntries, long epoch) {
* data.
*
* <p>If any of the above steps fail, then the tmp file is not deleted from the filesystem. This
* is fine as the MetricsPurgeActivity, will eventually clean it. The copies are atomic and
is fine as {@link deleteFiles()} will eventually clean it up. The copies are atomic and
therefore the reader never reads an incompletely written file.
*
* @param dataEntries The metrics to be written to file.
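
As an aside, here is a minimal, self-contained sketch (hypothetical names and paths; the handler's real implementation details may differ) of the write-then-atomic-move pattern the javadoc above describes: data is first written to a ".tmp" file and then published under its final name with ATOMIC_MOVE, so a reader never observes a partially written event log file.

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicWriteSketch {
    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("pa-metrics");
        Path tmp = dir.resolve("1626134400000.tmp");  // hypothetical epoch-bucket file name
        Path dest = dir.resolve("1626134400000");

        // Step 1: write the serialized events to the tmp file.
        Files.write(tmp, "metric-payload".getBytes(StandardCharsets.UTF_8));

        // Step 2: atomically publish the file under its final name (same directory,
        // so ATOMIC_MOVE is supported on common filesystems).
        Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE);
    }
}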
@@ -166,4 +170,34 @@ private void readInternal(Path pathToFile, int bufferSize, EventDispatcher proce
LOG.error("Error reading file", ex);
}
}

public void deleteAllFiles() {
LOG.debug("Cleaning up any leftover files.");
File root = new File(metricsLocation);
// Filter out '.tmp' files; we do not want to delete currBucket .tmp files.
String[] filesToDelete = root.list((dir, name) -> !name.endsWith(TMP_FILE_EXT));
// root.list() returns null when the directory does not exist or cannot be read;
// bail out early so Arrays.asList() below is never handed a null array.
if (filesToDelete == null) {
return;
}
deleteFiles(Arrays.asList(filesToDelete));
}

public void deleteFiles(List<String> filesToDelete) {
LOG.debug("Starting to delete old writer files");
long startTime = System.currentTimeMillis();

if (filesToDelete == null) {
return;
}
int filesDeletedCount = 0;
File root = new File(metricsLocation);
for (String fileToDelete : filesToDelete) {
File file = new File(root, fileToDelete);
PerformanceAnalyzerMetrics.removeMetrics(file);
filesDeletedCount += 1;
}
long duration = System.currentTimeMillis() - startTime;
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.EVENT_LOG_FILES_DELETION_TIME, "", duration);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.EVENT_LOG_FILES_DELETED, "", filesDeletedCount);
LOG.debug("'{}' Old writer files cleaned up.", filesDeletedCount);
}
}
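
Finally, a hedged usage sketch of the new cleanup entry points (the handler construction and call sites below are assumptions for illustration, not code from this PR): the writer purges leftover event files from a previous run and deletes specific buckets once they have been consumed.

// Hypothetical call sites; constructor arguments are assumed for illustration.
EventLogFileHandler handler = new EventLogFileHandler(eventLog, metricsLocation);
handler.deleteAllFiles(); // on startup: remove leftover files, keeping current ".tmp" buckets
handler.deleteFiles(Arrays.asList("1626134400000", "1626134405000")); // delete consumed buckets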