Skip to content

Commit

Permalink
HA: fixed synchronisation problems and improved tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Dec 15, 2016
1 parent 8df50fb commit 4899db1
Show file tree
Hide file tree
Showing 35 changed files with 794 additions and 664 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,10 @@ public void change(final Object iCurrentValue, final Object iNewValue) {
// DISTRIBUTED

DISTRIBUTED_CRUD_TASK_SYNCH_TIMEOUT("distributed.crudTaskTimeout", "Maximum timeout (in ms) to wait for CRUD remote tasks",
Long.class, 3000l, true),
Long.class, 10000l, true),

DISTRIBUTED_MAX_STARTUP_DELAY("distributed.maxStartupDelay", "Maximum delay time (in ms) to wait for a server to start",
Long.class, 10000l, true),

DISTRIBUTED_COMMAND_TASK_SYNCH_TIMEOUT("distributed.commandTaskTimeout",
"Maximum timeout (in ms) to wait for command distributed tasks", Long.class, 2 * 60 * 1000l, true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,11 @@ public static <T> Object executeAsDefault(final Callable<T> iCallback) {
public void setRunMode(final RUN_MODE value) {
final RunContext context = get();
context.runMode = value;
super.set(context);
}

public void setInDatabaseLock(final boolean value) {
final RunContext context = get();
context.inDatabaseLock = value;
super.set(context);
}

public RUN_MODE getRunMode() {
Expand All @@ -123,10 +121,7 @@ public boolean isInDatabaseLock() {
}

@Override
public RunContext get() {
RunContext result = super.get();
if (result == null)
result = new RunContext();
return result;
protected RunContext initialValue() {
return new RunContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,9 @@ public void create() {
@Override
public void close() {
classes.clear();
clustersToClasses.clear();
blobClusters.clear();
properties.clear();
document.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public Object execute(final Map<Object, Object> iArgs) {
}
cls.set(attribute, value);

return null;
return Boolean.TRUE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private void oldParsing(OCommandRequestText iRequest) {
}

@Override public long getDistributedTimeout() {
if (className != null)
if (className != null && getDatabase().getMetadata().getSchema().existsClass(className))
return OGlobalConfiguration.DISTRIBUTED_COMMAND_QUICK_TASK_SYNCH_TIMEOUT.getValueAsLong() + (2 * getDatabase()
.countClass(className));

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.server.distributed.ODistributedRequest;
import com.orientechnologies.orient.server.distributed.ODistributedResponse;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
import com.orientechnologies.orient.server.distributed.*;
import com.orientechnologies.orient.server.distributed.impl.task.OHeartbeatTask;
import com.orientechnologies.orient.server.distributed.task.ODatabaseIsOldException;
import com.orientechnologies.orient.server.distributed.task.ODistributedOperationException;
Expand Down Expand Up @@ -131,16 +128,17 @@ private void checkServerStatus() {
"Trying to recover current server for database '%s'...", dbName);

try {
final boolean result = manager.installDatabase(true, dbName,
((ODistributedStorage) manager.getStorage(dbName)).getDistributedConfiguration().getDocument(), false, true);

if (result)
ODistributedServerLog.info(this, manager.getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE,
"Recover complete for database '%s'...", dbName);
else
ODistributedServerLog.info(this, manager.getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE,
"Recover cannot be completed for database '%s'...", dbName);

final ODistributedConfiguration dCfg = ((ODistributedStorage) manager.getStorage(dbName)).getDistributedConfiguration();
if (dCfg != null) {
final boolean result = manager.installDatabase(true, dbName, dCfg.getDocument(), false, true);

if (result)
ODistributedServerLog.info(this, manager.getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE,
"Recover complete for database '%s'...", dbName);
else
ODistributedServerLog.info(this, manager.getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE,
"Recover cannot be completed for database '%s'...", dbName);
}
} catch (ODatabaseIsOldException e) {
// CURRENT DATABASE IS NEWER, SET ALL OTHER DATABASES AS NOT_AVAILABLE TO FORCE THEM TO ASK FOR THE CURRENT DATABASE
manager.setDatabaseStatus(manager.getLocalNodeName(), dbName, ODistributedServerManager.DB_STATUS.ONLINE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,75 +902,98 @@ public synchronized boolean installDatabase(final boolean iStartup, final String
// OFFLINE: AVOID TO INSTALL IT
return false;

final ODistributedConfiguration cfg = new ODistributedConfiguration(config);

// GET ALL THE OTHER SERVERS
final Collection<String> nodes = cfg.getServers(null, nodeName);
getAvailableNodes(nodes, databaseName);
if (nodes.size() == 0) {
ODistributedServerLog.error(this, nodeName, null, DIRECTION.NONE,
"Cannot install database '%s' on local node, because no servers are available", databaseName);
return false;
}

ODistributedServerLog.info(this, nodeName, null, DIRECTION.NONE, "Current node is a %s for database '%s'",
cfg.getServerRole(nodeName), databaseName);

final Set<String> configuredDatabases = serverInstance.getAvailableStorageNames().keySet();
if (!iStartup && configuredDatabases.contains(databaseName))
return false;

// INIT STORAGE + UPDATE LOCAL FILE ONLY
removeStorage(databaseName);
getStorage(databaseName);
updateCachedDatabaseConfiguration(databaseName, config, true, false);

final ODistributedDatabaseImpl distrDatabase = messageService.registerDatabase(databaseName);

// DISCARD MESSAGES DURING THE REQUEST OF DATABASE INSTALLATION
distrDatabase.setParsing(false);
return executeInDistributedDatabaseLock(databaseName, 0, new OCallable<Boolean, ODistributedConfiguration>() {
@Override
public Boolean call(ODistributedConfiguration iArgument) {
final ODistributedConfiguration cfg = new ODistributedConfiguration(config);

// GET ALL THE OTHER SERVERS
final Collection<String> nodes = cfg.getServers(null, nodeName);
getAvailableNodes(nodes, databaseName);
if (nodes.size() == 0) {
ODistributedServerLog.error(this, nodeName, null, DIRECTION.NONE,
"Cannot install database '%s' on local node, because no servers are available", databaseName);
return false;
}

final Boolean deploy = forceDeployment ? Boolean.TRUE : (Boolean) config.field("autoDeploy");
ODistributedServerLog.info(this, nodeName, null, DIRECTION.NONE, "Current node is a %s for database '%s'",
cfg.getServerRole(nodeName), databaseName);

boolean databaseInstalled;
final Set<String> configuredDatabases = serverInstance.getAvailableStorageNames().keySet();
if (!iStartup && configuredDatabases.contains(databaseName))
return false;

// CREATE THE DISTRIBUTED QUEUE
if (!distrDatabase.exists() || distrDatabase.getSyncConfiguration().getMomentum().isEmpty()) {
// INIT STORAGE + UPDATE LOCAL FILE ONLY
removeStorage(databaseName);
getStorage(databaseName);
updateCachedDatabaseConfiguration(databaseName, config, true, false);

if (deploy == null || !deploy) {
// NO AUTO DEPLOY
setDatabaseStatus(nodeName, databaseName, DB_STATUS.ONLINE);
return false;
}
final ODistributedDatabaseImpl distrDatabase = messageService.registerDatabase(databaseName);

// FIRST TIME, ASK FOR FULL REPLICA
databaseInstalled = requestFullDatabase(distrDatabase, databaseName, iStartup);
// DISCARD MESSAGES DURING THE REQUEST OF DATABASE INSTALLATION
distrDatabase.setParsing(false);

} else {
if (tryWithDeltaFirst) {
try {
final Boolean deploy = forceDeployment ? Boolean.TRUE : (Boolean) config.field("autoDeploy");

// TRY WITH DELTA SYNC
databaseInstalled = requestDatabaseDelta(distrDatabase, databaseName);
boolean databaseInstalled;

} catch (ODistributedDatabaseDeltaSyncException e) {
// FALL BACK TO FULL BACKUP
removeStorage(databaseName);
// CREATE THE DISTRIBUTED QUEUE
if (!distrDatabase.exists() || distrDatabase.getSyncConfiguration().getMomentum().isEmpty()) {

if (deploy == null || !deploy) {
// NO AUTO DEPLOY
setDatabaseStatus(nodeName, databaseName, DB_STATUS.ONLINE);
distrDatabase.setParsing(true);
return false;
}

// FIRST TIME, ASK FOR FULL REPLICA
databaseInstalled = requestFullDatabase(distrDatabase, databaseName, iStartup);

} else {
if (tryWithDeltaFirst) {
try {

// TRY WITH DELTA SYNC
databaseInstalled = requestDatabaseDelta(distrDatabase, databaseName);

} catch (ODistributedDatabaseDeltaSyncException e) {
// FALL BACK TO FULL BACKUP
removeStorage(databaseName);

if (deploy == null || !deploy) {
// NO AUTO DEPLOY
setDatabaseStatus(nodeName, databaseName, DB_STATUS.ONLINE);
distrDatabase.setParsing(true);
return false;
}

databaseInstalled = requestFullDatabase(distrDatabase, databaseName, iStartup);
}
} else
// SKIP DELTA AND EXECUTE FULL BACKUP
databaseInstalled = requestFullDatabase(distrDatabase, databaseName, iStartup);
}

if (databaseInstalled) {
// OVERWRITE THE LSN
final ODatabaseDocumentTx db = distrDatabase.getDatabaseInstance();
try {
try {
distrDatabase.getSyncConfiguration().setLastLSN(nodeName,
((OLocalPaginatedStorage) db.getStorage().getUnderlying()).getLSN(), true);
} catch (IOException e) {
ODistributedServerLog.error(this, nodeName, null, DIRECTION.NONE,
"Error on setting LSN after the installation of database '%s'", databaseName);
}
} finally {
db.close();
}
}
} else
// SKIP DELTA AND EXECUTE FULL BACKUP
databaseInstalled = requestFullDatabase(distrDatabase, databaseName, iStartup);
}

return databaseInstalled;
return databaseInstalled;
}
});
}

protected boolean requestFullDatabase(final ODistributedDatabaseImpl distrDatabase, final String databaseName,
Expand Down Expand Up @@ -1066,6 +1089,9 @@ public boolean requestDatabaseDelta(final ODistributedDatabaseImpl distrDatabase
ODistributedServerLog.error(this, nodeName, server, DIRECTION.IN, "Error on installing database delta %s in %s (%s)",
value, databaseName, dbPath, value);

setDatabaseStatus(nodeName, databaseName, DB_STATUS.NOT_AVAILABLE);
return false;

} else if (value instanceof ODistributedDatabaseChunk) {
distrDatabase.filterBeforeThisMomentum(((ODistributedDatabaseChunk) value).getMomentum());
distrDatabase.setParsing(true);
Expand Down Expand Up @@ -1143,7 +1169,9 @@ protected boolean requestDatabaseFullSync(final ODistributedDatabaseImpl distrDa
ODistributedServerLog.warn(this, nodeName, selectedNodes.toString(), DIRECTION.OUT,
"Requesting deploy of database '%s' on local server...", databaseName);

final OAbstractReplicatedTask deployTask = new OSyncDatabaseTask(
final OLogSequenceNumber lastLSN = distrDatabase.getSyncConfiguration().getLastLSN(getLocalNodeName());

final OAbstractReplicatedTask deployTask = new OSyncDatabaseTask(lastLSN,
distrDatabase.getSyncConfiguration().getLastOperationTimestamp());

final Map<String, Object> results = (Map<String, Object>) sendRequest(databaseName, null, selectedNodes, deployTask,
Expand All @@ -1163,6 +1191,8 @@ else if (value instanceof Throwable) {
ODistributedServerLog.error(this, nodeName, r.getKey(), DIRECTION.IN, "Error on installing database '%s' in %s",
(Exception) value, databaseName, dbPath);

setDatabaseStatus(nodeName, databaseName, DB_STATUS.NOT_AVAILABLE);

if (value instanceof ODistributedException)
throw (ODistributedException) value;

Expand Down Expand Up @@ -1390,7 +1420,8 @@ else if (result instanceof Exception) {
// OVERWRITE THE MOMENTUM FROM THE ORIGINAL SERVER AND ADD LAST LOCAL LSN
try {
distrDatabase.getSyncConfiguration().load();
distrDatabase.getSyncConfiguration().setLastLSN(localNodeName, ((OLocalPaginatedStorage) db.getStorage().getUnderlying()).getLSN(), true);
distrDatabase.getSyncConfiguration().setLastLSN(localNodeName,
((OLocalPaginatedStorage) db.getStorage().getUnderlying()).getLSN(), false);
} catch (IOException e) {
ODistributedServerLog.error(this, nodeName, null, DIRECTION.NONE, "Error on loading %s file for database '%s'", e,
DISTRIBUTED_SYNC_JSON_FILENAME, databaseName);
Expand All @@ -1405,12 +1436,11 @@ public Void call(final ODistributedConfiguration cfg) {
getServerInstance().openDatabase(db);

db.reload();
db.getMetadata().reload();

distrDatabase.setOnline();

rebalanceClusterOwnership(nodeName, db, cfg, new HashSet<String>(), true);

distrDatabase.setOnline();

return null;
}
});
Expand Down Expand Up @@ -1527,8 +1557,6 @@ public <T> T executeInDistributedDatabaseLock(final String databaseName, final l
if (lastCfg.getVersion() > cfgVersion)
// CONFIGURATION CHANGED, UPDATE IT ON THE CLUSTER AND DISK
updateCachedDatabaseConfiguration(databaseName, lastCfg.getDocument(), true, true);

OScenarioThreadLocal.INSTANCE.setInDatabaseLock(false);
}
}

Expand All @@ -1553,6 +1581,9 @@ public <T> T executeInDistributedDatabaseLock(final String databaseName, final l
try {
OScenarioThreadLocal.INSTANCE.setInDatabaseLock(true);

ODistributedServerLog.debug(this, nodeName, null, DIRECTION.NONE, "Acquired distributed lock for database '%s'",
databaseName);

// ASSURE TO GET LAST VERSION. IN THIS WAY THERE ARE NO SYNCHRONIZATION PROBLEM
final ODistributedConfiguration lastCfg = getLastDatabaseConfiguration(databaseName);

Expand All @@ -1577,6 +1608,9 @@ public <T> T executeInDistributedDatabaseLock(final String databaseName, final l
throw new RuntimeException(e);

} finally {
ODistributedServerLog.debug(this, nodeName, null, DIRECTION.NONE, "Released distributed lock for database '%s'",
databaseName);

lock.unlock();
}
}
Expand Down
Loading

0 comments on commit 4899db1

Please sign in to comment.