From 7b33e7a089b9ea0bb2965416c70fbf0886897a33 Mon Sep 17 00:00:00 2001 From: Potato Date: Tue, 23 Jan 2024 15:54:57 +0800 Subject: [PATCH] Enhance the robustness of Ratis linearizable reads for node offline scenarios (#11954) Signed-off-by: OneSizeFitQuorum --- .../exception/RatisReadUnavailableException.java | 13 +++++++++---- .../iotdb/consensus/ratis/RatisConsensus.java | 9 +++++++-- .../FragmentInstanceDispatcherImpl.java | 16 +++++++++++----- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java index d5ac76b93fe1..f9505495d516 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisReadUnavailableException.java @@ -22,10 +22,15 @@ /** RaftServer is unable to serve linearizable read requests. */ public class RatisReadUnavailableException extends ConsensusException { + public static final String RATIS_READ_UNAVAILABLE = + "Raft Server cannot serve read requests now (leader is unknown or under recovery). " + + "Please try read later: "; + public RatisReadUnavailableException(Throwable cause) { - super( - "Raft Server cannot serve read requests now (leader is unknown or under recovery). " - + "Please try read later: ", - cause); + super(RATIS_READ_UNAVAILABLE, cause); + } + + public RatisReadUnavailableException(String cause) { + super(cause); } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 950af3658e1d..c627692b72d6 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -157,7 +157,12 @@ public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry) this.ratisMetricSet = new RatisMetricSet(); this.readRetryPolicy = RetryPolicy.newBuilder() - .setRetryHandler(c -> !c.isSuccess() && c.getException() instanceof ReadIndexException) + .setRetryHandler( + c -> + !c.isSuccess() + && (c.getException() instanceof ReadIndexException + || c.getException() instanceof ReadException + || c.getException() instanceof NotLeaderException)) .setMaxAttempts(this.config.getImpl().getRetryTimesMax()) .setWaitTime( TimeDuration.valueOf( @@ -347,7 +352,7 @@ public DataSet read(ConsensusGroupId groupId, IConsensusRequest request) if (canServeStaleRead != null && isLinearizableRead) { canServeStaleRead.get(groupId).set(true); } - } catch (ReadException | ReadIndexException e) { + } catch (ReadException | ReadIndexException | NotLeaderException e) { if (isLinearizableRead) { // linearizable read failed. the RaftServer is recovering from Raft Log and cannot serve // read requests. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 08498bcb1413..b48e8201629b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.consensus.exception.RatisReadUnavailableException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -299,9 +300,14 @@ private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint) client.sendFragmentInstance(sendFragmentInstanceReq); if (!sendFragmentInstanceResp.accepted) { logger.warn(sendFragmentInstanceResp.message); - throw new FragmentInstanceDispatchException( - RpcUtils.getStatus( - TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message)); + if (sendFragmentInstanceResp.message.contains( + RatisReadUnavailableException.RATIS_READ_UNAVAILABLE)) { + throw new RatisReadUnavailableException(sendFragmentInstanceResp.message); + } else { + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message)); + } } break; case WRITE: @@ -342,9 +348,9 @@ private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint) TSStatusCode.EXECUTE_STATEMENT_ERROR, String.format("unknown read type [%s]", instance.getType()))); } - } catch (ClientManagerException | TException e) { + } catch (ClientManagerException | TException | RatisReadUnavailableException e) { logger.warn( - "can't connect to node {}, error msg is {}.", + "can't execute request on node {}, error msg is {}.", endPoint, ExceptionUtils.getRootCause(e).toString()); TSStatus status = new TSStatus();