Skip to content

Commit

Permalink
Enhance the robustness of Ratis linearizable reads for node offline s…
Browse files Browse the repository at this point in the history
…cenarios (apache#11954)

Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org>
  • Loading branch information
OneSizeFitsQuorum authored and SzyWilliam committed Nov 22, 2024
1 parent a65ec71 commit 7b33e7a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
/** RaftServer is unable to serve linearizable read requests. */
public class RatisReadUnavailableException extends ConsensusException {

public static final String RATIS_READ_UNAVAILABLE =
"Raft Server cannot serve read requests now (leader is unknown or under recovery). "
+ "Please try read later: ";

public RatisReadUnavailableException(Throwable cause) {
super(
"Raft Server cannot serve read requests now (leader is unknown or under recovery). "
+ "Please try read later: ",
cause);
super(RATIS_READ_UNAVAILABLE, cause);
}

public RatisReadUnavailableException(String cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,12 @@ public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry)
this.ratisMetricSet = new RatisMetricSet();
this.readRetryPolicy =
RetryPolicy.<RaftClientReply>newBuilder()
.setRetryHandler(c -> !c.isSuccess() && c.getException() instanceof ReadIndexException)
.setRetryHandler(
c ->
!c.isSuccess()
&& (c.getException() instanceof ReadIndexException
|| c.getException() instanceof ReadException
|| c.getException() instanceof NotLeaderException))
.setMaxAttempts(this.config.getImpl().getRetryTimesMax())
.setWaitTime(
TimeDuration.valueOf(
Expand Down Expand Up @@ -347,7 +352,7 @@ public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
if (canServeStaleRead != null && isLinearizableRead) {
canServeStaleRead.get(groupId).set(true);
}
} catch (ReadException | ReadIndexException e) {
} catch (ReadException | ReadIndexException | NotLeaderException e) {
if (isLinearizableRead) {
// linearizable read failed. the RaftServer is recovering from Raft Log and cannot serve
// read requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
Expand Down Expand Up @@ -299,9 +300,14 @@ private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
client.sendFragmentInstance(sendFragmentInstanceReq);
if (!sendFragmentInstanceResp.accepted) {
logger.warn(sendFragmentInstanceResp.message);
throw new FragmentInstanceDispatchException(
RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message));
if (sendFragmentInstanceResp.message.contains(
RatisReadUnavailableException.RATIS_READ_UNAVAILABLE)) {
throw new RatisReadUnavailableException(sendFragmentInstanceResp.message);
} else {
throw new FragmentInstanceDispatchException(
RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message));
}
}
break;
case WRITE:
Expand Down Expand Up @@ -342,9 +348,9 @@ private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
TSStatusCode.EXECUTE_STATEMENT_ERROR,
String.format("unknown read type [%s]", instance.getType())));
}
} catch (ClientManagerException | TException e) {
} catch (ClientManagerException | TException | RatisReadUnavailableException e) {
logger.warn(
"can't connect to node {}, error msg is {}.",
"can't execute request on node {}, error msg is {}.",
endPoint,
ExceptionUtils.getRootCause(e).toString());
TSStatus status = new TSStatus();
Expand Down

0 comments on commit 7b33e7a

Please sign in to comment.