diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 58f2375449b9c..e947103ffedbb 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -601,7 +601,8 @@ public void apply(Settings value, Settings current, Settings previous) { ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS, NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING, NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, - NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING, + FsHealthService.HEALTHY_TIMEOUT_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index 1b5a7e0bbd242..a1f526a3a1bd8 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -57,6 +57,8 @@ import java.nio.file.StandardOpenOption; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -78,6 +80,9 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH private final NodeEnvironment nodeEnv; private final LongSupplier currentTimeMillisSupplier; private volatile Scheduler.Cancellable scheduledFuture; + private volatile TimeValue healthyTimeoutThreshold; + private final AtomicLong lastRunStartTimeMillis = new AtomicLong(Long.MIN_VALUE); + private final AtomicBoolean checkInProgress = new AtomicBoolean(); @Nullable private volatile Set unhealthyPaths; @@ -90,7 +95,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH ); public static final Setting REFRESH_INTERVAL_SETTING = Setting.timeSetting( "monitor.fs.health.refresh_interval", - TimeValue.timeValueSeconds(120), + TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(1), Setting.Property.NodeScope ); @@ -101,6 +106,13 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH Setting.Property.NodeScope, Setting.Property.Dynamic ); + public static final Setting HEALTHY_TIMEOUT_SETTING = Setting.timeSetting( + "monitor.fs.health.healthy_timeout_threshold", + TimeValue.timeValueSeconds(60), + TimeValue.timeValueMillis(1), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); public FsHealthService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv) { this.threadPool = threadPool; @@ -108,8 +120,10 @@ public FsHealthService(Settings settings, ClusterSettings clusterSettings, Threa this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings); this.slowPathLoggingThreshold = SLOW_PATH_LOGGING_THRESHOLD_SETTING.get(settings); this.currentTimeMillisSupplier = threadPool::relativeTimeInMillis; + this.healthyTimeoutThreshold = HEALTHY_TIMEOUT_SETTING.get(settings); this.nodeEnv = nodeEnv; clusterSettings.addSettingsUpdateConsumer(SLOW_PATH_LOGGING_THRESHOLD_SETTING, this::setSlowPathLoggingThreshold); + clusterSettings.addSettingsUpdateConsumer(HEALTHY_TIMEOUT_SETTING, this::setHealthyTimeoutThreshold); clusterSettings.addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled); } @@ -134,6 +148,10 @@ public void setSlowPathLoggingThreshold(TimeValue slowPathLoggingThreshold) { this.slowPathLoggingThreshold = slowPathLoggingThreshold; } + public void setHealthyTimeoutThreshold(TimeValue healthyTimeoutThreshold) { + this.healthyTimeoutThreshold = healthyTimeoutThreshold; + } + @Override public StatusInfo getHealth() { StatusInfo statusInfo; @@ -142,14 +160,17 @@ public StatusInfo getHealth() { statusInfo = new StatusInfo(HEALTHY, "health check disabled"); } else if (brokenLock) { statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock"); - } else if (unhealthyPaths == null) { - statusInfo = new StatusInfo(HEALTHY, "health check passed"); - } else { - String info = "health check failed on [" - + unhealthyPaths.stream().map(k -> k.toString()).collect(Collectors.joining(",")) - + "]"; - statusInfo = new StatusInfo(UNHEALTHY, info); - } + } else if (checkInProgress.get() + && currentTimeMillisSupplier.getAsLong() - lastRunStartTimeMillis.get() > healthyTimeoutThreshold.millis()) { + statusInfo = new StatusInfo(UNHEALTHY, "healthy threshold breached"); + } else if (unhealthyPaths == null) { + statusInfo = new StatusInfo(HEALTHY, "health check passed"); + } else { + String info = "health check failed on [" + + unhealthyPaths.stream().map(k -> k.toString()).collect(Collectors.joining(",")) + + "]"; + statusInfo = new StatusInfo(UNHEALTHY, info); + } return statusInfo; } @@ -164,13 +185,22 @@ class FsHealthMonitor implements Runnable { @Override public void run() { + boolean checkEnabled = enabled; try { - if (enabled) { + if (checkEnabled) { + setLastRunStartTimeMillis(); + boolean started = checkInProgress.compareAndSet(false, true); + assert started; monitorFSHealth(); logger.debug("health check succeeded"); } } catch (Exception e) { logger.error("health check failed", e); + } finally { + if (checkEnabled) { + boolean completed = checkInProgress.compareAndSet(true, false); + assert completed; + } } } @@ -205,6 +235,18 @@ private void monitorFSHealth() { slowPathLoggingThreshold ); } + if (elapsedTime > healthyTimeoutThreshold.millis()) { + logger.error( + "health check of [{}] failed, took [{}ms] which is above the healthy threshold of [{}]", + path, + elapsedTime, + healthyTimeoutThreshold + ); + if (currentUnhealthyPaths == null) { + currentUnhealthyPaths = new HashSet<>(1); + } + currentUnhealthyPaths.add(path); + } } } catch (Exception ex) { logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex); @@ -218,4 +260,8 @@ private void monitorFSHealth() { brokenLock = false; } } + + private void setLastRunStartTimeMillis() { + lastRunStartTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong())); + } } diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java index ae2434be36f9a..7517e24d555db 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java @@ -42,6 +42,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.env.NodeEnvironment; +import org.opensearch.monitor.StatusInfo; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.MockLogAppender; import org.opensearch.test.junit.annotations.TestLogging; @@ -61,6 +62,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.Matchers.equalTo; import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.opensearch.node.Node.NODE_NAME_SETTING; @@ -172,7 +174,7 @@ public void testLoggingOnHungIO() throws Exception { } // disrupt file system - disruptFileSystemProvider.injectIOException.set(true); + disruptFileSystemProvider.injectIODelay.set(true); fsHealthService.new FsHealthMonitor().run(); assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount()); assertBusy(mockAppender::assertAllExpectationsMatched); @@ -182,6 +184,66 @@ public void testLoggingOnHungIO() throws Exception { } } + public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { + long healthyTimeoutThreshold = randomLongBetween(500, 1000); + long refreshInterval = randomLongBetween(500, 1000); + long slowLogThreshold = randomLongBetween(100, 200); + long delayBetweenChecks = 100; + final Settings settings = Settings.builder() + .put(FsHealthService.HEALTHY_TIMEOUT_SETTING.getKey(), healthyTimeoutThreshold + "ms") + .put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms") + .put(FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING.getKey(), slowLogThreshold + "ms") + .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0)// we need to verify exact time + .build(); + FileSystem fileSystem = PathUtils.getDefaultFileSystem(); + TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); + FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(fileSystem, testThreadPool); + fileSystem = disruptFileSystemProvider.getFileSystem(null); + PathUtilsForTesting.installMock(fileSystem); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + try (NodeEnvironment env = newNodeEnvironment()) { + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + logger.info("--> Initial health status prior to the first monitor run"); + StatusInfo fsHealth = fsHealthService.getHealth(); + assertEquals(HEALTHY, fsHealth.getStatus()); + assertEquals("health check passed", fsHealth.getInfo()); + logger.info("--> First monitor run"); + fsHealthService.new FsHealthMonitor().run(); + fsHealth = fsHealthService.getHealth(); + assertEquals(HEALTHY, fsHealth.getStatus()); + assertEquals("health check passed", fsHealth.getInfo()); + logger.info("--> Disrupt file system"); + disruptFileSystemProvider.injectIODelay.set(true); + final FsHealthService fsHealthSrvc = new FsHealthService(settings, clusterSettings, testThreadPool, env); + fsHealthSrvc.doStart(); + waitUntil( + () -> fsHealthSrvc.getHealth().getStatus() == UNHEALTHY, + healthyTimeoutThreshold + (2 * refreshInterval), + TimeUnit.MILLISECONDS + ); + fsHealth = fsHealthSrvc.getHealth(); + assertEquals(UNHEALTHY, fsHealth.getStatus()); + assertEquals("healthy threshold breached", fsHealth.getInfo()); + int disruptedPathCount = disruptFileSystemProvider.getInjectedPathCount(); + assertThat(disruptedPathCount, equalTo(1)); + logger.info("--> Fix file system disruption"); + disruptFileSystemProvider.injectIODelay.set(false); + waitUntil( + () -> fsHealthSrvc.getHealth().getStatus() == HEALTHY, + delayBetweenChecks + (4 * refreshInterval), + TimeUnit.MILLISECONDS + ); + fsHealth = fsHealthSrvc.getHealth(); + assertEquals(HEALTHY, fsHealth.getStatus()); + assertEquals("health check passed", fsHealth.getInfo()); + assertEquals(disruptedPathCount, disruptFileSystemProvider.getInjectedPathCount()); + fsHealthSrvc.doStop(); + } finally { + PathUtilsForTesting.teardown(); + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + public void testFailsHealthOnSinglePathFsyncFailure() throws IOException { FileSystem fileSystem = PathUtils.getDefaultFileSystem(); FileSystemFsyncIOExceptionProvider disruptFsyncFileSystemProvider = new FileSystemFsyncIOExceptionProvider(fileSystem); @@ -349,11 +411,12 @@ public void force(boolean metaData) throws IOException { private static class FileSystemFsyncHungProvider extends FilterFileSystemProvider { - AtomicBoolean injectIOException = new AtomicBoolean(); + AtomicBoolean injectIODelay = new AtomicBoolean(); AtomicInteger injectedPaths = new AtomicInteger(); private final long delay; private final ThreadPool threadPool; + private static final long AWAIT_BUSY_THRESHOLD = 100L; FileSystemFsyncHungProvider(FileSystem inner, long delay, ThreadPool threadPool) { super("disrupt_fs_health://", inner); @@ -361,6 +424,12 @@ private static class FileSystemFsyncHungProvider extends FilterFileSystemProvide this.threadPool = threadPool; } + FileSystemFsyncHungProvider(FileSystem inner, ThreadPool threadPool) { + super("disrupt_fs_health://", inner); + this.threadPool = threadPool; + this.delay = Long.MAX_VALUE; + } + public int getInjectedPathCount() { return injectedPaths.get(); } @@ -370,17 +439,21 @@ public FileChannel newFileChannel(Path path, Set options, return new FilterFileChannel(super.newFileChannel(path, options, attrs)) { @Override public void force(boolean metaData) throws IOException { - if (injectIOException.get()) { + if (injectIODelay.get()) { if (path.getFileName().toString().equals(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) { injectedPaths.incrementAndGet(); final long startTimeMillis = threadPool.relativeTimeInMillis(); + long timeInMillis = 1; + long maxWaitTimeMillis = startTimeMillis + delay >= 0 ? startTimeMillis + delay : Long.MAX_VALUE;// long + // overflow do { try { - Thread.sleep(delay); + Thread.sleep(timeInMillis); } catch (InterruptedException e) { throw new AssertionError(e); } - } while (threadPool.relativeTimeInMillis() <= startTimeMillis + delay); + timeInMillis = Math.min(AWAIT_BUSY_THRESHOLD, timeInMillis * 2); + } while (threadPool.relativeTimeInMillis() <= maxWaitTimeMillis && injectIODelay.get()); } } super.force(metaData);