From 88d1d65a0d8ad5cd15a9c401ee4290925756e963 Mon Sep 17 00:00:00 2001
From: Bukhtawar Khan
Date: Mon, 7 Feb 2022 23:18:03 +0530
Subject: [PATCH] [Backport] Introduce FS Health HEALTHY threshold to fail stuck node (#1269)

* Introduce FS Health HEALTHY threshold to fail stuck node (#1167)

This will cause a leader stuck on IO during publication to step down and
eventually trigger a leader election.

Issue Description
---
The publication of cluster state is time bound to 30s by the
cluster.publish.timeout setting. If this time is reached before the new
cluster state is committed, the cluster state change is rejected and the
leader considers itself to have failed. It stands down and starts trying
to elect a new master.

There is a bug in the leader: when it tries to publish the new cluster
state, it first has to acquire a mutex in order to flush the new state to
disk. The same mutex is used to cancel the publication on timeout. Below
is the state of the timeout scheduler meant to cancel the publication. So
if flushing the cluster state is stuck on IO, the cancellation of the
publication is stuck as well, since both share the same mutex. The leader
therefore does not step down and effectively blocks the cluster from
making progress.

Signed-off-by: Bukhtawar Khan

* Fix up settings

Signed-off-by: Bukhtawar Khan

* Fix up tests

Signed-off-by: Bukhtawar Khan

* Fix up tests

Signed-off-by: Bukhtawar Khan

* Fix up tests

Signed-off-by: Bukhtawar Khan
---
 .../common/settings/ClusterSettings.java |  3 +-
 .../monitor/fs/FsHealthService.java      | 66 ++++++++++++---
 .../monitor/fs/FsHealthServiceTests.java | 83 +++++++++++++++++--
 3 files changed, 136 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 58f2375449b9c..e947103ffedbb 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -601,7 +601,8 @@ public void apply(Settings value, Settings current, Settings previous) {
                 ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
                 NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
                 NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING,
-                NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING
+                NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
+                FsHealthService.HEALTHY_TIMEOUT_SETTING
             )
         )
     );
diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java
index 1b5a7e0bbd242..a1f526a3a1bd8 100644
--- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java
+++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java
@@ -57,6 +57,8 @@
 import java.nio.file.StandardOpenOption;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
@@ -78,6 +80,9 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
     private final NodeEnvironment nodeEnv;
     private final LongSupplier currentTimeMillisSupplier;
     private volatile Scheduler.Cancellable scheduledFuture;
+    private volatile TimeValue healthyTimeoutThreshold;
+    private final AtomicLong lastRunStartTimeMillis = new AtomicLong(Long.MIN_VALUE);
+    private final AtomicBoolean checkInProgress = new AtomicBoolean();
 
     @Nullable
     private volatile Set<Path> unhealthyPaths;
@@ -90,7 +95,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
     );
     public static final Setting<TimeValue> REFRESH_INTERVAL_SETTING = Setting.timeSetting(
         "monitor.fs.health.refresh_interval",
-        TimeValue.timeValueSeconds(120),
+        TimeValue.timeValueSeconds(60),
         TimeValue.timeValueMillis(1),
         Setting.Property.NodeScope
     );
@@ -101,6 +106,13 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
         Setting.Property.NodeScope,
         Setting.Property.Dynamic
     );
+    public static final Setting<TimeValue> HEALTHY_TIMEOUT_SETTING = Setting.timeSetting(
+        "monitor.fs.health.healthy_timeout_threshold",
+        TimeValue.timeValueSeconds(60),
+        TimeValue.timeValueMillis(1),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
 
     public FsHealthService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv) {
         this.threadPool = threadPool;
@@ -108,8 +120,10 @@ public FsHealthService(Settings settings, ClusterSettings clusterSettings, Threa
         this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
         this.slowPathLoggingThreshold = SLOW_PATH_LOGGING_THRESHOLD_SETTING.get(settings);
         this.currentTimeMillisSupplier = threadPool::relativeTimeInMillis;
+        this.healthyTimeoutThreshold = HEALTHY_TIMEOUT_SETTING.get(settings);
         this.nodeEnv = nodeEnv;
         clusterSettings.addSettingsUpdateConsumer(SLOW_PATH_LOGGING_THRESHOLD_SETTING, this::setSlowPathLoggingThreshold);
+        clusterSettings.addSettingsUpdateConsumer(HEALTHY_TIMEOUT_SETTING, this::setHealthyTimeoutThreshold);
         clusterSettings.addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled);
     }
 
@@ -134,6 +148,10 @@ public void setSlowPathLoggingThreshold(TimeValue slowPathLoggingThreshold) {
         this.slowPathLoggingThreshold = slowPathLoggingThreshold;
     }
 
+    public void setHealthyTimeoutThreshold(TimeValue healthyTimeoutThreshold) {
+        this.healthyTimeoutThreshold = healthyTimeoutThreshold;
+    }
+
     @Override
     public StatusInfo getHealth() {
         StatusInfo statusInfo;
@@ -142,14 +160,17 @@ public StatusInfo getHealth() {
             statusInfo = new StatusInfo(HEALTHY, "health check disabled");
         } else if (brokenLock) {
             statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock");
-        } else if (unhealthyPaths == null) {
-            statusInfo = new StatusInfo(HEALTHY, "health check passed");
-        } else {
-            String info = "health check failed on ["
-                + unhealthyPaths.stream().map(k -> k.toString()).collect(Collectors.joining(","))
-                + "]";
-            statusInfo = new StatusInfo(UNHEALTHY, info);
-        }
+        } else if (checkInProgress.get()
+            && currentTimeMillisSupplier.getAsLong() - lastRunStartTimeMillis.get() > healthyTimeoutThreshold.millis()) {
+            statusInfo = new StatusInfo(UNHEALTHY, "healthy threshold breached");
+        } else if (unhealthyPaths == null) {
+            statusInfo = new StatusInfo(HEALTHY, "health check passed");
+        } else {
+            String info = "health check failed on ["
+                + unhealthyPaths.stream().map(k -> k.toString()).collect(Collectors.joining(","))
+                + "]";
+            statusInfo = new StatusInfo(UNHEALTHY, info);
+        }
         return statusInfo;
     }
 
@@ -164,13 +185,22 @@ class FsHealthMonitor implements Runnable {
 
         @Override
         public void run() {
+            boolean checkEnabled = enabled;
             try {
-                if (enabled) {
+                if (checkEnabled) {
+                    setLastRunStartTimeMillis();
+                    boolean started = checkInProgress.compareAndSet(false, true);
+                    assert started;
                     monitorFSHealth();
                     logger.debug("health check succeeded");
                 }
             } catch (Exception e) {
                 logger.error("health check failed", e);
+            } finally {
+                if (checkEnabled) {
+                    boolean completed = checkInProgress.compareAndSet(true, false);
+                    assert completed;
+                }
             }
         }
 
@@ -205,6 +235,18 @@ private void monitorFSHealth() {
                             slowPathLoggingThreshold
                         );
                     }
+                    if (elapsedTime > healthyTimeoutThreshold.millis()) {
+                        logger.error(
+                            "health check of [{}] failed, took [{}ms] which is above the healthy threshold of [{}]",
+                            path,
+                            elapsedTime,
+                            healthyTimeoutThreshold
+                        );
+                        if (currentUnhealthyPaths == null) {
+                            currentUnhealthyPaths = new HashSet<>(1);
+                        }
+                        currentUnhealthyPaths.add(path);
+                    }
                 }
             } catch (Exception ex) {
                 logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex);
@@ -218,4 +260,8 @@ private void monitorFSHealth() {
             brokenLock = false;
         }
     }
+
+    private void setLastRunStartTimeMillis() {
+        lastRunStartTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
+    }
 }
diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java
index ae2434be36f9a..7517e24d555db 100644
--- a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java
+++ b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java
@@ -42,6 +42,7 @@
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.env.NodeEnvironment;
+import org.opensearch.monitor.StatusInfo;
 import org.opensearch.test.OpenSearchTestCase;
 import org.opensearch.test.MockLogAppender;
 import org.opensearch.test.junit.annotations.TestLogging;
@@ -61,6 +62,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.opensearch.monitor.StatusInfo.Status.HEALTHY;
 import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
 import static org.opensearch.node.Node.NODE_NAME_SETTING;
@@ -172,7 +174,7 @@ public void testLoggingOnHungIO() throws Exception {
             }
 
             // disrupt file system
-            disruptFileSystemProvider.injectIOException.set(true);
+            disruptFileSystemProvider.injectIODelay.set(true);
             fsHealthService.new FsHealthMonitor().run();
             assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount());
             assertBusy(mockAppender::assertAllExpectationsMatched);
@@ -182,6 +184,66 @@ public void testLoggingOnHungIO() throws Exception {
         }
     }
 
+    public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception {
+        long healthyTimeoutThreshold = randomLongBetween(500, 1000);
+        long refreshInterval = randomLongBetween(500, 1000);
+        long slowLogThreshold = randomLongBetween(100, 200);
+        long delayBetweenChecks = 100;
+        final Settings settings = Settings.builder()
+            .put(FsHealthService.HEALTHY_TIMEOUT_SETTING.getKey(), healthyTimeoutThreshold + "ms")
+            .put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms")
+            .put(FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING.getKey(), slowLogThreshold + "ms")
+            .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0)// we need to verify exact time
+            .build();
+        FileSystem fileSystem = PathUtils.getDefaultFileSystem();
+        TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
+        FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(fileSystem, testThreadPool);
+        fileSystem = disruptFileSystemProvider.getFileSystem(null);
+        PathUtilsForTesting.installMock(fileSystem);
+        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        try (NodeEnvironment env = newNodeEnvironment()) {
+            FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
+            logger.info("--> Initial health status prior to the first monitor run");
+            StatusInfo fsHealth = fsHealthService.getHealth();
+            assertEquals(HEALTHY, fsHealth.getStatus());
+            assertEquals("health check passed", fsHealth.getInfo());
+            logger.info("--> First monitor run");
+            fsHealthService.new FsHealthMonitor().run();
+            fsHealth = fsHealthService.getHealth();
+            assertEquals(HEALTHY, fsHealth.getStatus());
+            assertEquals("health check passed", fsHealth.getInfo());
+            logger.info("--> Disrupt file system");
+            disruptFileSystemProvider.injectIODelay.set(true);
+            final FsHealthService fsHealthSrvc = new FsHealthService(settings, clusterSettings, testThreadPool, env);
+            fsHealthSrvc.doStart();
+            waitUntil(
+                () -> fsHealthSrvc.getHealth().getStatus() == UNHEALTHY,
+                healthyTimeoutThreshold + (2 * refreshInterval),
+                TimeUnit.MILLISECONDS
+            );
+            fsHealth = fsHealthSrvc.getHealth();
+            assertEquals(UNHEALTHY, fsHealth.getStatus());
+            assertEquals("healthy threshold breached", fsHealth.getInfo());
+            int disruptedPathCount = disruptFileSystemProvider.getInjectedPathCount();
+            assertThat(disruptedPathCount, equalTo(1));
+            logger.info("--> Fix file system disruption");
+            disruptFileSystemProvider.injectIODelay.set(false);
+            waitUntil(
+                () -> fsHealthSrvc.getHealth().getStatus() == HEALTHY,
+                delayBetweenChecks + (4 * refreshInterval),
+                TimeUnit.MILLISECONDS
+            );
+            fsHealth = fsHealthSrvc.getHealth();
+            assertEquals(HEALTHY, fsHealth.getStatus());
+            assertEquals("health check passed", fsHealth.getInfo());
+            assertEquals(disruptedPathCount, disruptFileSystemProvider.getInjectedPathCount());
+            fsHealthSrvc.doStop();
+        } finally {
+            PathUtilsForTesting.teardown();
+            ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
+        }
+    }
+
     public void testFailsHealthOnSinglePathFsyncFailure() throws IOException {
         FileSystem fileSystem = PathUtils.getDefaultFileSystem();
         FileSystemFsyncIOExceptionProvider disruptFsyncFileSystemProvider = new FileSystemFsyncIOExceptionProvider(fileSystem);
@@ -349,11 +411,12 @@ public void force(boolean metaData) throws IOException {
 
     private static class FileSystemFsyncHungProvider extends FilterFileSystemProvider {
 
-        AtomicBoolean injectIOException = new AtomicBoolean();
+        AtomicBoolean injectIODelay = new AtomicBoolean();
         AtomicInteger injectedPaths = new AtomicInteger();
 
         private final long delay;
         private final ThreadPool threadPool;
+        private static final long AWAIT_BUSY_THRESHOLD = 100L;
 
         FileSystemFsyncHungProvider(FileSystem inner, long delay, ThreadPool threadPool) {
             super("disrupt_fs_health://", inner);
@@ -361,6 +424,12 @@ private static class FileSystemFsyncHungProvider extends FilterFileSystemProvide
             this.threadPool = threadPool;
         }
 
+        FileSystemFsyncHungProvider(FileSystem inner, ThreadPool threadPool) {
+            super("disrupt_fs_health://", inner);
+            this.threadPool = threadPool;
+            this.delay = Long.MAX_VALUE;
+        }
+
         public int getInjectedPathCount() {
             return injectedPaths.get();
         }
@@ -370,17 +439,21 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options,
             return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
                 @Override
                 public void force(boolean metaData) throws IOException {
-                    if (injectIOException.get()) {
+                    if (injectIODelay.get()) {
                         if (path.getFileName().toString().equals(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
                             injectedPaths.incrementAndGet();
                             final long startTimeMillis = threadPool.relativeTimeInMillis();
+                            long timeInMillis = 1;
+                            long maxWaitTimeMillis = startTimeMillis + delay >= 0 ? startTimeMillis + delay : Long.MAX_VALUE;// long
+                            // overflow
                             do {
                                 try {
-                                    Thread.sleep(delay);
+                                    Thread.sleep(timeInMillis);
                                 } catch (InterruptedException e) {
                                     throw new AssertionError(e);
                                 }
-                            } while (threadPool.relativeTimeInMillis() <= startTimeMillis + delay);
+                                timeInMillis = Math.min(AWAIT_BUSY_THRESHOLD, timeInMillis * 2);
+                            } while (threadPool.relativeTimeInMillis() <= maxWaitTimeMillis && injectIODelay.get());
                         }
                     }
                     super.force(metaData);
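
Note (not part of the patch): the pattern the change introduces can be summarized in a standalone sketch. The names below (StallAwareHealthCheck, runCheck, isHealthy) are made up for illustration and are not OpenSearch APIs; the sketch only mirrors what FsHealthService now does with lastRunStartTimeMillis, checkInProgress and monitor.fs.health.healthy_timeout_threshold: record when a check starts, mark it in progress, and let readers of the health status observe a breach without blocking on the stuck IO.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Illustrative sketch only; class and method names are hypothetical, not OpenSearch APIs.
class StallAwareHealthCheck {
    private final long healthyTimeoutMillis; // plays the role of monitor.fs.health.healthy_timeout_threshold
    private final LongSupplier clock;        // plays the role of threadPool::relativeTimeInMillis
    private final AtomicLong lastRunStartMillis = new AtomicLong(Long.MIN_VALUE);
    private final AtomicBoolean checkInProgress = new AtomicBoolean();

    StallAwareHealthCheck(long healthyTimeoutMillis, LongSupplier clock) {
        this.healthyTimeoutMillis = healthyTimeoutMillis;
        this.clock = clock;
    }

    // Invoked periodically by a scheduler; the supplied IO may block indefinitely.
    void runCheck(Runnable io) {
        lastRunStartMillis.set(clock.getAsLong());
        checkInProgress.set(true);
        try {
            io.run();
        } finally {
            checkInProgress.set(false);
        }
    }

    // Invoked from other threads; reads two atomics and never touches the possibly stuck IO.
    boolean isHealthy() {
        return !(checkInProgress.get() && clock.getAsLong() - lastRunStartMillis.get() > healthyTimeoutMillis);
    }
}

Because the status read path shares no lock with the check itself, a node whose fsync hangs past the threshold reports UNHEALTHY immediately, which is what lets the stuck leader described in the commit message be detected and voted out instead of holding up the cluster.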