diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java index 859665fe1a3c..50d4efd93f90 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java @@ -651,7 +651,7 @@ public static class Builder { private int segmentCacheNumMax = 2; private SizeInBytes segmentCacheSizeMax = SizeInBytes.valueOf("200MB"); private SizeInBytes preallocatedSize = SizeInBytes.valueOf("4MB"); - private SizeInBytes writeBufferSize = SizeInBytes.valueOf("64KB"); + private SizeInBytes writeBufferSize = SizeInBytes.valueOf("8MB"); private int forceSyncNum = 128; private boolean unsafeFlushEnabled = true; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 07f3236735fe..b616b694248b 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -81,6 +81,7 @@ import org.apache.ratis.server.DivisionInfo; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedSupplier; @@ -187,6 +188,7 @@ public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry) RaftServer.newBuilder() .setServerId(myself.getId()) .setProperties(properties) + .setOption(RaftStorage.StartupOption.RECOVER) .setStateMachineRegistry( raftGroupId -> new ApplicationStateMachineProxy( @@ -428,7 +430,7 @@ public void createLocalPeer(ConsensusGroupId groupId, List peers) group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), myself) : group; try (RatisClient client = getRaftClient(clientGroup)) { RaftClientReply reply = - client.getRaftClient().getGroupManagementApi(myself.getId()).add(group); + client.getRaftClient().getGroupManagementApi(myself.getId()).add(group, true); if (!reply.isSuccess()) { throw new RatisRequestFailedException(reply.getException()); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java index 29de306ba840..3b17aba9c9ca 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java @@ -41,6 +41,7 @@ import org.apache.ratis.thirdparty.com.google.common.cache.Cache; import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; @@ -300,7 +301,9 @@ public static void initRatisConfig(RaftProperties properties, RatisConfig config RaftServerConfigKeys.Log.setSegmentCacheSizeMax( properties, config.getLog().getSegmentCacheSizeMax()); RaftServerConfigKeys.Log.setPreallocatedSize(properties, config.getLog().getPreallocatedSize()); - RaftServerConfigKeys.Log.setWriteBufferSize(properties, config.getLog().getWriteBufferSize()); + final SizeInBytes writeBufferSize = + SizeInBytes.valueOf(config.getLeaderLogAppender().getBufferByteLimit().getSizeInt() + 8); + RaftServerConfigKeys.Log.setWriteBufferSize(properties, writeBufferSize); RaftServerConfigKeys.Log.setForceSyncNum(properties, config.getLog().getForceSyncNum()); RaftServerConfigKeys.Log.setUnsafeFlushEnabled( properties, config.getLog().isUnsafeFlushEnabled()); @@ -315,6 +318,7 @@ public static void initRatisConfig(RaftProperties properties, RatisConfig config properties, config.getLeaderLogAppender().isInstallSnapshotEnabled()); GrpcConfigKeys.Server.setHeartbeatChannel(properties, true); + GrpcConfigKeys.Server.setLogMessageBatchDuration(properties, TimeDuration.ONE_MINUTE); RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin( properties, config.getRpc().getFirstElectionTimeoutMin()); RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax( diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java index be5a436fcf9f..ee2529f41f66 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java @@ -131,14 +131,14 @@ public void addMemberToGroup() throws Exception { miniCluster.waitUntilActiveLeaderElectedAndReady(); + doConsensus(0, 10, 20); + for (int i = 0; i < 3; i++) { if (servers.get(i).isLeaderReady(gid)) { Assert.assertEquals( 3, ((TestUtils.IntegerCounter) stateMachines.get(i)).getConfiguration().size()); } } - - doConsensus(0, 10, 20); } @Test diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java index 8dbd880f0dcd..f7a03339c915 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java @@ -233,7 +233,7 @@ public void consistentReadTimeout() throws Exception { // wait until active leader to serve read index requests miniCluster.waitUntilActiveLeaderElected(); - // query during redo: get exception that ratis is under recovery - Assert.assertThrows(RatisReadUnavailableException.class, () -> miniCluster.readThrough(0)); + // query, the result will return since it will be queued and handled by leader once ready + Assert.assertEquals(50, miniCluster.mustRead(0)); } } diff --git a/pom.xml b/pom.xml index 2f62d1808d4a..5cc90c2a37ab 100644 --- a/pom.xml +++ b/pom.xml @@ -151,7 +151,7 @@ is for ensuring the SNAPSHOT will stay available. We should however have the Ratis folks do a new release soon, as releasing with this version is more than sub-ideal. --> - 3.0.0 + 3.0.1 1.0.4 1.1.13 3.5.10