Skip to content

Commit

Permalink
Fix race condition when reconnect in leader election client. (#2814)
Browse files Browse the repository at this point in the history
Fix race condition when reconnect in leader election client.
  • Loading branch information
xyuanlu authored Jun 26, 2024
1 parent 3055f26 commit 49c29dc
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -167,6 +166,7 @@ private void createParticipantInfo(String leaderPath, LeaderInfo participantInfo
// try to create participant info entry, assuming leader election group node is already there
_metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant, participantInfo,
MetaClientInterface.EntryMode.EPHEMERAL);
LOG.info("Participant {} joined leader group {}.", _participant, leaderPath);
} catch (MetaClientNodeExistsException ex) {
throw new ConcurrentModificationException("Already joined leader election group. ", ex);
} catch (MetaClientNoNodeException ex) {
Expand Down Expand Up @@ -261,6 +261,7 @@ private void relinquishLeaderHelper(String leaderPath, Boolean exitLeaderElectio
// deleting ZNode. So that handler in ReElectListener won't recreate the leader node.
if (exitLeaderElectionParticipantPool) {
_leaderGroups.remove(leaderPath + LEADER_ENTRY_KEY);
LOG.info("Leaving leader election pool {}.", leaderPath);
_metaClient.delete(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant);
}
// check if current participant is the leader
Expand All @@ -272,12 +273,13 @@ private void relinquishLeaderHelper(String leaderPath, Boolean exitLeaderElectio
List<Op> ops = Arrays.asList(Op.check(key, expectedVersion), Op.delete(key, expectedVersion));
//Execute transactional support on operations
List<OpResult> opResults = _metaClient.transactionOP(ops);
LOG.info("Try relinquish leader {}.", leaderPath);
if (opResults.get(0).getType() == ERRORRESULT) {
if (isLeader(leaderPath)) {
// Participant re-elected as leader.
throw new ConcurrentModificationException("Concurrent operation, please retry");
} else {
LOG.info("Someone else is already leader");
LOG.info("Someone else is already leader when relinquishing leadership for path {}.", leaderPath);
}
}
}
Expand Down Expand Up @@ -366,7 +368,7 @@ public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListen

@Override
public void close() throws Exception {

LOG.info("Closing leader election client.");
_metaClient.unsubscribeConnectStateChanges(_connectStateListener);

// exit all previous joined leader election groups
Expand Down Expand Up @@ -406,14 +408,20 @@ class ConnectStateListener implements ConnectStateChangeListener {
@Override
public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState,
MetaClientInterface.ConnectState currentState) throws Exception {
if (prevState == MetaClientInterface.ConnectState.EXPIRED
&& currentState == MetaClientInterface.ConnectState.CONNECTED) {
LOG.info("Connect state changed from {} to {}", prevState, currentState);
if (currentState == MetaClientInterface.ConnectState.CONNECTED) {
// when reconnected, we try to recreate the ephemeral node for participant
for (String leaderPath : _participantInfos.keySet()) {
_metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant, _participantInfos.get(leaderPath),
MetaClientInterface.EntryMode.EPHEMERAL);
try {
LOG.info("Recreate participant node for leaderPath {}.", leaderPath);
_metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + _participant, _participantInfos.get(leaderPath),
MetaClientInterface.EntryMode.EPHEMERAL);
} catch (MetaClientNodeExistsException ex) {
// If reconnected before expire, the ephemeral node is still there.
LOG.info("Participant {} already in leader group {}.", _participant, leaderPath);
}
}
} else if (prevState == MetaClientInterface.ConnectState.DISCONNECTED
&& currentState == MetaClientInterface.ConnectState.CONNECTED) {
// touch leader node to renew session ID
touchLeaderNode();
}
}
Expand All @@ -431,6 +439,7 @@ private void touchLeaderNode() {
if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
int expectedVersion = tup.right.getVersion();
try {
LOG.info("Try touch leader node for path {}", _leaderGroups);
_metaClient.set(key, tup.left, expectedVersion);
} catch (MetaClientNoNodeException ex) {
LOG.info("leaderPath {} gone when retouch leader node.", key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,22 @@ public static void simulateZkStateReconnected(ZkMetaClient client) throws Interr
zkClient.process(event);
}

/**
* Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
* This need to be done in a separate thread to simulate ZkClient eventThread.
*/
public static void simulateZkStateClosedAndReconnect(ZkMetaClient client) throws InterruptedException {
final ZkClient zkClient = client.getZkClient();
WatchedEvent event =
new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Closed,
null);
zkClient.process(event);

Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);

event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected,
null);
zkClient.process(event);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.impl.zk.TestUtil;
import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
Expand Down Expand Up @@ -41,6 +40,7 @@ public void cleanUp() {
}
}


// Test that calling isLeader before client joins LeaderElectionParticipantPool returns false and does not throw NPE
@Test
public void testIsLeaderBeforeJoiningParticipantPool() throws Exception {
Expand Down Expand Up @@ -299,7 +299,7 @@ public void testSessionExpire() throws Exception {
System.out.println("END TestLeaderElection.testSessionExpire");
}

@Test(dependsOnMethods = "testSessionExpire")
@Test (dependsOnMethods = "testSessionExpire")
public void testClientDisconnectAndReconnectBeforeExpire() throws Exception {
System.out.println("START TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire");
String leaderPath = LEADER_PATH + "/testClientDisconnectAndReconnectBeforeExpire";
Expand Down Expand Up @@ -354,6 +354,34 @@ public void onLeadershipChange(String leaderPath, ChangeType type, String curLea
System.out.println("END TestLeaderElection.testClientDisconnectAndReconnectBeforeExpire");
}

@Test(dependsOnMethods = "testClientDisconnectAndReconnectBeforeExpire")
public void testClientClosedAndReconnectAfterExpire() throws Exception {
System.out.println("START TestLeaderElection.testClientClosedAndReconnectAfterExpire");
String leaderPath = LEADER_PATH + "/testClientClosedAndReconnectAfterExpire";
LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
participantInfo.setSimpleField("Key1", "value1");
LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
participantInfo2.setSimpleField("Key2", "value2");
LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);

clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);

// session expire and reconnect
expireSession((ZkMetaClient) clt1.getMetaClient());
// clt1 closed and reconnected
simulateZkStateClosedAndReconnect((ZkMetaClient) clt1.getMetaClient());

// when session recreated, participant info node should maintain
Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");

((ZkMetaClient<?>) clt1.getMetaClient()).close();
clt2.close();
System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire");
}

private void joinPoolTestHelper(String leaderPath, LeaderElectionClient clt1, LeaderElectionClient clt2)
throws Exception {
clt1.joinLeaderElectionParticipantPool(leaderPath);
Expand Down

0 comments on commit 49c29dc

Please sign in to comment.