
Commit

HA: fixed a bug on the distributed re-locking after the coordinator crashes
lvca committed Dec 28, 2016
1 parent c6a3842 commit 01574e2
Showing 4 changed files with 64 additions and 35 deletions.

File 1 of 4

@@ -710,7 +710,7 @@ public void reassignClustersOwnership(final String iNode, final String databaseN
     // REASSIGN CLUSTERS WITHOUT AN OWNER, AVOIDING TO REBALANCE EXISTENT
     final ODatabaseDocumentTx database = serverInstance.openDatabase(databaseName, "internal", "internal", null, true);
     try {
-      executeInDistributedDatabaseLock(databaseName, 5000, cfg, new OCallable<Boolean, OModifiableDistributedConfiguration>() {
+      executeInDistributedDatabaseLock(databaseName, 15000, cfg, new OCallable<Boolean, OModifiableDistributedConfiguration>() {
         @Override
         public Boolean call(final OModifiableDistributedConfiguration cfg) {
           rebalanceClusterOwnership(iNode, database, cfg);
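
The only change above is the lock timeout passed to executeInDistributedDatabaseLock, raised from 5 to 15 seconds so that cluster-ownership reassignment can outlast the longer re-locking path introduced in the next file. As a rough local analogy of the same "acquire with a bounded wait, run a callback, always release" shape (plain java.util.concurrent, not OrientDB's API; the helper and names are illustrative):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class LockedCallbackExample {
  private static final ReentrantLock LOCK = new ReentrantLock();

  // Runs the callback only if the lock is acquired within timeoutMs, otherwise fails fast.
  static <T> T executeUnderLock(long timeoutMs, Supplier<T> callback) throws InterruptedException {
    if (!LOCK.tryLock(timeoutMs, TimeUnit.MILLISECONDS))
      throw new IllegalStateException("Cannot acquire lock within " + timeoutMs + " ms");
    try {
      return callback.get();
    } finally {
      LOCK.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Boolean done = executeUnderLock(15000, () -> {
      // Rebalance work would happen here while the lock is held.
      return Boolean.TRUE;
    });
    System.out.println("Rebalanced: " + done);
  }
}
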
File 2 of 4

@@ -23,6 +23,7 @@
 import com.orientechnologies.orient.server.OSystemDatabase;
 import com.orientechnologies.orient.server.distributed.*;
 import com.orientechnologies.orient.server.distributed.impl.task.ODistributedLockTask;
+import com.orientechnologies.orient.server.distributed.task.ODistributedOperationException;
 
 import java.util.*;
 
@@ -43,32 +44,53 @@ public ODistributedLockManagerRequester(final ODistributedServerManager manager)
 
   @Override
   public void acquireExclusiveLock(final String resource, final String nodeSource, final long timeout) {
-    final String coordinator = getCoordinatorServer();
-    if (coordinator == null || coordinator.equals(manager.getLocalNodeName()))
-      // NO MASTERS, USE LOCAL SERVER
-      manager.getLockManagerExecutor().acquireExclusiveLock(resource, manager.getLocalNodeName(), timeout);
-    else {
-      // SEND A DISTRIBUTED MSG TO THE COORDINATOR SERVER
-      final Set<String> servers = new HashSet<String>();
-      servers.add(coordinator);
-
-      ODistributedServerLog.debug(this, manager.getLocalNodeName(), coordinator, ODistributedServerLog.DIRECTION.OUT,
-          "Server '%s' is acquiring distributed lock on resource '%s'...", nodeSource, resource);
-
-      final ODistributedResponse dResponse = manager
-          .sendRequest(OSystemDatabase.SYSTEM_DB_NAME, null, servers, new ODistributedLockTask(resource, timeout, true),
-              manager.getNextMessageIdCounter(), ODistributedRequest.EXECUTION_MODE.RESPONSE, null, null);
-
-      if (dResponse == null) {
-        ODistributedServerLog.warn(this, manager.getLocalNodeName(), coordinator, ODistributedServerLog.DIRECTION.OUT,
-            "Server '%s' cannot acquire distributed lock on resource '%s' (timeout=%d)...", nodeSource, resource, timeout);
-        throw new OLockException(
-            "Server '" + nodeSource + "' cannot acquire exclusive lock on resource '" + resource + "' (timeout=" + timeout + ")");
-      }
-
-      final Object result = dResponse.getPayload();
-      if (result instanceof RuntimeException)
-        throw (RuntimeException) result;
+    String coordinator = getCoordinatorServer();
+    while (true) {
+      if (coordinator == null || coordinator.equals(manager.getLocalNodeName())) {
+        // NO MASTERS, USE LOCAL SERVER
+        manager.getLockManagerExecutor().acquireExclusiveLock(resource, manager.getLocalNodeName(), timeout);
+        break;
+      } else {
+        // SEND A DISTRIBUTED MSG TO THE COORDINATOR SERVER
+        final Set<String> servers = new HashSet<String>();
+        servers.add(coordinator);
+
+        ODistributedServerLog.debug(this, manager.getLocalNodeName(), coordinator, ODistributedServerLog.DIRECTION.OUT,
+            "Server '%s' is acquiring distributed lock on resource '%s'...", nodeSource, resource);
+
+        final ODistributedResponse dResponse = manager
+            .sendRequest(OSystemDatabase.SYSTEM_DB_NAME, null, servers, new ODistributedLockTask(resource, timeout, true),
+                manager.getNextMessageIdCounter(), ODistributedRequest.EXECUTION_MODE.RESPONSE, null, null);
+
+        if (dResponse == null) {
+          ODistributedServerLog.warn(this, manager.getLocalNodeName(), coordinator, ODistributedServerLog.DIRECTION.OUT,
+              "Server '%s' cannot acquire distributed lock on resource '%s' (timeout=%d)...", nodeSource, resource, timeout);
+
+          throw new OLockException(
+              "Server '" + nodeSource + "' cannot acquire exclusive lock on resource '" + resource + "' (timeout=" + timeout + ")");
+        }
+
+        final Object result = dResponse.getPayload();
+
+        if (result instanceof ODistributedOperationException) {
+          if (manager.getActiveServers().contains(coordinator))
+            // WAIT ONLY IN THE CASE THE COORDINATOR IS STILL ONLINE
+            try {
+              Thread.sleep(500);
+            } catch (InterruptedException e) {
+              // IGNORE IT
+            }
+
+          if (!manager.getActiveServers().contains(coordinator)) {
+            // THE COORDINATOR WAS DOWN DURING THE REQUEST, RETRY WITH ANOTHER COORDINATOR
+            coordinator = getCoordinatorServer();
+            continue;
+          }
+        } else if (result instanceof RuntimeException)
+          throw (RuntimeException) result;
+
+        break;
+      }
     }
 
     ODistributedServerLog.debug(this, manager.getLocalNodeName(), coordinator, ODistributedServerLog.DIRECTION.OUT,
@@ -149,6 +171,7 @@ public void handleUnreachableServer(final String nodeLeftName) {
   /**
    * Returns the coordinator server name. If it's not available, uses the next available always starting from 0.
    */
+
   protected String getCoordinatorServer() {
     if (coordinatorName == null) {
       final List<String> sortedServers = new ArrayList<String>(manager.getActiveServers());
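
The core of the fix is in acquireExclusiveLock above: instead of sending a single lock request and giving up, the requester now loops. When the coordinator answers with an ODistributedOperationException, the requester waits briefly while that coordinator still appears in the active-server set; if the coordinator has dropped out in the meantime, it re-resolves the coordinator with getCoordinatorServer() and re-sends the lock request, and any other RuntimeException is rethrown as before. A stripped-down sketch of that retry shape (plain Java stand-ins, not OrientDB classes; the types and names are illustrative):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

public class RelockRetryExample {
  // Stand-ins for the cluster state and the remote call; everything here is illustrative.
  private final Set<String> activeServers = ConcurrentHashMap.newKeySet();
  private final Supplier<String> coordinatorResolver;
  private final Function<String, Object> lockRequest; // returns a payload or an exception object

  RelockRetryExample(Supplier<String> coordinatorResolver, Function<String, Object> lockRequest) {
    this.coordinatorResolver = coordinatorResolver;
    this.lockRequest = lockRequest;
  }

  void acquire(String resource) {
    String coordinator = coordinatorResolver.get();
    while (true) {
      Object result = lockRequest.apply(coordinator);

      if (result instanceof IllegalStateException) { // stand-in for ODistributedOperationException
        if (activeServers.contains(coordinator)) {
          // Coordinator still looks alive: give it a moment before re-checking.
          try {
            Thread.sleep(500);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
        if (!activeServers.contains(coordinator)) {
          // Coordinator died while we were waiting: pick the new one and retry the lock.
          coordinator = coordinatorResolver.get();
          continue;
        }
      } else if (result instanceof RuntimeException)
        throw (RuntimeException) result;

      break; // done: the lock was granted or the failure was not retriable here
    }
  }
}

The sleep-then-recheck ordering matters: the brief pause gives the membership view time to reflect a coordinator crash, so the retry picks a live server instead of hammering a dead one.
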
File 3 of 4

@@ -1248,7 +1248,11 @@ public Boolean call(OModifiableDistributedConfiguration cfg) {
         }
       });
 
-      setDatabaseStatus(nodeLeftName, databaseName, statusOffline ? DB_STATUS.OFFLINE : DB_STATUS.NOT_AVAILABLE);
+      final DB_STATUS nodeLeftStatus = getDatabaseStatus(nodeLeftName, databaseName);
+      if (statusOffline && nodeLeftStatus != DB_STATUS.OFFLINE)
+        setDatabaseStatus(nodeLeftName, databaseName, DB_STATUS.OFFLINE);
+      else if (!statusOffline && nodeLeftStatus != DB_STATUS.NOT_AVAILABLE)
+        setDatabaseStatus(nodeLeftName, databaseName, DB_STATUS.NOT_AVAILABLE);
 
       return found;
     }
@@ -1258,6 +1262,10 @@ public synchronized void removeServer(final String nodeLeftName, final boolean r
     if (nodeLeftName == null)
       return;
 
+    final Member member = activeNodes.remove(nodeLeftName);
+    if (member == null)
+      return;
+
     ODistributedServerLog
         .debug(this, nodeName, nodeLeftName, DIRECTION.NONE, "Distributed server '%s' is unreachable", nodeLeftName);
 
@@ -1282,12 +1290,9 @@ public synchronized void removeServer(final String nodeLeftName, final boolean r
       messageService.getDatabase(dbName).handleUnreachableNode(nodeLeftName);
     }
 
-    final Member member = activeNodes.remove(nodeLeftName);
-    if (member != null) {
-      if (member.getUuid() != null)
-        activeNodesNamesByUuid.remove(member.getUuid());
-      activeNodesUuidByName.remove(nodeLeftName);
-    }
+    if (member.getUuid() != null)
+      activeNodesNamesByUuid.remove(member.getUuid());
+    activeNodesUuidByName.remove(nodeLeftName);
 
     if (hazelcastInstance == null || !hazelcastInstance.getLifecycleService().isRunning())
       return;
@@ -1319,7 +1324,8 @@ public void run() {
         }
 
         for (String databaseName : getManagedDatabases()) {
-          if (getDatabaseStatus(nodeLeftName, databaseName) != DB_STATUS.OFFLINE)
+          final DB_STATUS nodeLeftStatus = getDatabaseStatus(nodeLeftName, databaseName);
+          if (nodeLeftStatus != DB_STATUS.OFFLINE && nodeLeftStatus != DB_STATUS.NOT_AVAILABLE)
             configurationMap.put(CONFIG_DBSTATUS_PREFIX + nodeLeftName + "." + databaseName, DB_STATUS.NOT_AVAILABLE);
         }
 
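
The removeServer changes above pull the activeNodes.remove(...) to the top of the method and return early when the node was already gone, so a duplicate "node left" notification is processed only once; the database-status writes are likewise made conditional, so the shared configuration map is only touched when the status actually changes. A minimal sketch of both ideas with local maps (hypothetical names, not the OrientDB plugin itself):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NodeRemovalExample {
  enum DbStatus { ONLINE, NOT_AVAILABLE, OFFLINE }

  private final Map<String, String> activeNodes = new ConcurrentHashMap<>();   // name -> uuid
  private final Map<String, DbStatus> statusMap = new ConcurrentHashMap<>();   // "node.db" -> status

  // Removing the entry first and bailing out when it is already gone makes the
  // handler idempotent: a second "node left" event for the same server is a no-op.
  void removeServer(String nodeLeftName, String databaseName, boolean markOffline) {
    final String uuid = activeNodes.remove(nodeLeftName);
    if (uuid == null)
      return; // already handled by a previous event

    // Only write the new status when it actually changes, avoiding redundant
    // updates to the shared (distributed) map.
    final String key = nodeLeftName + "." + databaseName;
    final DbStatus target = markOffline ? DbStatus.OFFLINE : DbStatus.NOT_AVAILABLE;
    if (statusMap.get(key) != target)
      statusMap.put(key, target);
  }

  public static void main(String[] args) {
    NodeRemovalExample example = new NodeRemovalExample();
    example.activeNodes.put("node2", "uuid-2");
    example.removeServer("node2", "testdb", false); // first event: processed
    example.removeServer("node2", "testdb", false); // duplicate event: ignored
    System.out.println(example.statusMap);
  }
}
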
File 4 of 4

@@ -126,10 +126,10 @@ public void run() {
         try {
           boolean isRunning = true;
           for (int j = 0; j < 1000000; j++) {
-            String sql = "SELECT FROM " + className;
-            // String sql = "SELECT FROM " + className;
             try {
-              // Iterable<OrientVertex> result = graph.command(new OCommandSQL(sql)).execute();
+              String sql = "SELECT FROM " + className;
+              // String sql = "DELETE FROM " + className;
+              // Iterable<OrientVertex> result = graph.command(new OCommandSQL(sql)).execute();
               Iterable<Vertex> vtxs = graph.command(new OCommandSQL(sql)).execute();
               for (Vertex vtx : vtxs) {
                 try {
