Skip to content

Commit

Permalink
HA: dump of statistics on SIGTRAP (kill -5 <pid>)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Apr 4, 2017
1 parent 4917af4 commit 41ac19c
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,36 @@

package com.orientechnologies.orient.core;

import java.util.Hashtable;
import java.util.Map.Entry;

import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;

import sun.misc.Signal;
import sun.misc.SignalHandler;

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map.Entry;

@SuppressWarnings("restriction")
public class OSignalHandler implements SignalHandler {
private Hashtable<Signal, SignalHandler> redefinedHandlers = new Hashtable(4);
private List<OSignalListener> listeners = new ArrayList<OSignalListener>();

/**
 * Callback for components that want to be told about operating-system signals received by the enclosing handler. Instances are
 * added with {@code registerListener} and removed with {@code unregisterListener}.
 */
public interface OSignalListener {
/**
 * Called with the signal that was received.
 *
 * @param signal
 *          the signal passed to the enclosing handler
 */
void onSignal(Signal signal);
}

/**
 * Creates the handler. Nothing is initialized here: the fields are set by their own initializers.
 */
public OSignalHandler() {
}

/**
 * Appends a listener to the list of callbacks notified of received signals.
 * <p>
 * NOTE(review): the backing list is a plain {@code ArrayList} and is iterated while signals are dispatched; confirm that
 * registration can never race with signal delivery.
 *
 * @param listener
 *          the callback to add
 */
public void registerListener(final OSignalListener listener) {
  this.listeners.add(listener);
}

/**
 * Removes a previously registered listener. Passing a listener that is not registered leaves the list unchanged.
 *
 * @param listener
 *          the callback to remove
 */
public void unregisterListener(final OSignalListener listener) {
  this.listeners.remove(listener);
}

public void listenTo(final String name, final SignalHandler iListener) {
Signal signal = new Signal(name);
SignalHandler redefinedHandler = Signal.handle(signal, iListener);
Expand All @@ -44,13 +58,13 @@ public void listenTo(final String name, final SignalHandler iListener) {
}
}

public void handle(Signal signal) {
public void handle(final Signal signal) {
OLogManager.instance().warn(this, "Received signal: %s", signal);

final String s = signal.toString().trim();

if (Orient.instance().isSelfManagedShutdown()
&& (s.equals("SIGKILL") || s.equals("SIGHUP") || s.equals("SIGINT") || s.equals("SIGTERM"))) {
if (Orient.instance().isSelfManagedShutdown() && (s.equals("SIGKILL") || s.equals("SIGHUP") || s.equals("SIGINT") || s
.equals("SIGTERM"))) {
Orient.instance().shutdown();
System.exit(1);
} else if (s.equals("SIGTRAP")) {
Expand All @@ -66,6 +80,9 @@ public void handle(Signal signal) {
redefinedHandler.handle(signal);
}
}

for (OSignalListener l : listeners)
l.onSignal(signal);
}

public void installDefaultSignals() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,10 @@ public void removeShutdownHook() {
}
}

/**
 * Returns the signal handler currently held by this instance.
 * <p>
 * NOTE(review): {@code removeSignalHandler()} null-checks the same field, so callers should presumably be prepared to receive
 * {@code null} - confirm.
 *
 * @return the value of the {@code signalHandler} field
 */
public OSignalHandler getSignalHandler() {
  return this.signalHandler;
}

public void removeSignalHandler() {
if (signalHandler != null) {
signalHandler.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ public void change(final Object iCurrentValue, final Object iNewValue) {
 * @since 2.2.18
*/
DISTRIBUTED_DUMP_STATS_EVERY("distributed.dumpStatsEvery", "Time in ms to dump the cluster stats. Set to 0 to disable such dump",
Long.class, 60000l, true),
Long.class, 0, true),

DISTRIBUTED_CRUD_TASK_SYNCH_TIMEOUT("distributed.crudTaskTimeout", "Maximum timeout (in ms) to wait for CRUD remote tasks",
Long.class, 10000l, true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,11 +1071,23 @@ public void resume() {
}

@Override
public void dumpLocks() {
OLogManager.instance().info(this, "Current locks database '%s' server '%s'", databaseName, manager.getLocalNodeName());
for (Map.Entry<ORID, ODistributedLock> entry : lockManager.entrySet()) {
OLogManager.instance()
.info(this, "- %s = %s (count=%d)", entry.getKey(), entry.getValue().reqId, entry.getValue().lock.getCount());
/**
 * Builds a textual snapshot of this distributed database: a header with the database and local server names, the lock table as
 * rendered by {@code ODistributedOutput.formatLocks}, and, for each worker thread, the request it is executing followed by the
 * requests waiting in its local queue.
 *
 * @return the formatted dump, ready to be logged
 */
public String dump() {
  final StringBuilder buffer = new StringBuilder(1024);

  buffer.append("\n\nDATABASE '").append(databaseName).append("' ON SERVER '").append(manager.getLocalNodeName()).append("'");

  buffer.append("\n- ").append(ODistributedOutput.formatLocks(manager, databaseName));

  buffer.append("\n- MESSAGES IN QUEUES:");

  for (ODistributedWorker t : workerThreads) {
    buffer.append("\n - QUEUE ").append(t.id).append(" EXECUTING: ").append(t.getProcessing());

    // POSITION OF THE REQUEST INSIDE THE WORKER QUEUE
    int i = 0;
    for (ODistributedRequest m : t.localQueue) {
      if (m != null)
        buffer.append("\n - ").append(i).append(" = ").append(m);

      // ADVANCE FOR EVERY ELEMENT: WITHOUT THIS EVERY QUEUED REQUEST WAS REPORTED WITH INDEX 0
      i++;
    }
  }
  return buffer.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ public static Object formatLocks(final ODistributedAbstractPlugin manager, final
.getDatabase(db).lockManager;

final StringBuilder buffer = new StringBuilder();
buffer.append("\nHA LOCKS FOR DATABASE '" + db + "'");
buffer.append("HA LOCKS FOR DATABASE '" + db + "'");
final OTableFormatter table = new OTableFormatter(new OTableFormatter.OTableOutput() {
@Override
public void onMessage(final String text, final Object... args) {
Expand Down Expand Up @@ -674,6 +674,7 @@ public void onMessage(final String text, final Object... args) {
row.field("server", manager.getNodeNameById(lock.reqId.getNodeId()));
row.field("acquiredOn", dateFormat.format(new Date(lock.acquiredOn)));
row.field("reqId", lock.reqId);
row.field("threadCount", lock.lock.getCount());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class ODistributedWorker extends Thread {
private AtomicBoolean waitingForNextRequest = new AtomicBoolean(true);

private static final long MAX_SHUTDOWN_TIMEOUT = 5000l;
private volatile ODistributedRequest currentExecuting;

public ODistributedWorker(final ODistributedDatabaseImpl iDistributed, final String iDatabaseName, final int i) {
id = i;
Expand Down Expand Up @@ -95,12 +96,16 @@ public void run() {
try {
message = readRequest();

currentExecuting = message;

if (message != null) {
message.getId();
reqId = message.getId();
onMessage(message);
}

currentExecuting = null;

} catch (InterruptedException e) {
// EXIT CURRENT THREAD
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -444,4 +449,8 @@ public void sendShutdown() {
running = false;
this.interrupt();
}

/**
 * Returns the request most recently read by this worker's loop and not yet cleared, or {@code null} when no request is recorded.
 * The backing field is {@code volatile}, so the value can be read from another thread.
 * <p>
 * NOTE(review): the field is reset to {@code null} only on the normal path after the message is handled; confirm what is
 * reported if handling throws.
 *
 * @return the value of the {@code currentExecuting} field
 */
public ODistributedRequest getProcessing() {
  return this.currentExecuting;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.orientechnologies.common.util.OCallable;
import com.orientechnologies.common.util.OCallableNoParamNoReturn;
import com.orientechnologies.common.util.OCallableUtils;
import com.orientechnologies.orient.core.OSignalHandler;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
Expand All @@ -57,6 +58,7 @@
import com.orientechnologies.orient.server.distributed.task.ODistributedOperationException;
import com.orientechnologies.orient.server.network.OServerNetworkListener;
import com.orientechnologies.orient.server.network.protocol.OBeforeDatabaseOpenNetworkEventListener;
import sun.misc.Signal;

import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -85,7 +87,8 @@ public class OHazelcastPlugin extends ODistributedAbstractPlugin
protected volatile HazelcastInstance hazelcastInstance;

// THIS MAP IS BACKED BY HAZELCAST EVENTS. IN THIS WAY WE AVOID USING THE HZ MAP DIRECTLY
protected OHazelcastDistributedMap configurationMap;
protected OHazelcastDistributedMap configurationMap;
private OSignalHandler.OSignalListener signalListener;

public OHazelcastPlugin() {
}
Expand Down Expand Up @@ -268,7 +271,7 @@ public void run() {
haStatsTask = new TimerTask() {
@Override
public void run() {
printStats();
dumpStats();
}
};
Orient.instance().scheduleTask(haStatsTask, statsDelay, statsDelay);
Expand All @@ -286,6 +289,15 @@ public void run() {
// WAIT ALL THE MESSAGES IN QUEUE ARE PROCESSED OR MAX 10 SECONDS
waitStartupIsCompleted();

signalListener = new OSignalHandler.OSignalListener() {
@Override
public void onSignal(final Signal signal) {
if (signal.toString().trim().equalsIgnoreCase("SIGTRAP"))
dumpStats();
}
};
Orient.instance().getSignalHandler().registerListener(signalListener);

} catch (Exception e) {
ODistributedServerLog.error(this, localNodeName, null, DIRECTION.NONE, "Error on starting distributed plugin", e);
throw OException.wrapException(new ODistributedStartupException("Error on starting distributed plugin"), e);
Expand Down Expand Up @@ -432,7 +444,7 @@ protected void publishLocalNodeConfiguration() {
}
}

protected void printStats() {
protected void dumpStats() {
try {
final ODocument clusterCfg = getClusterConfiguration();

Expand All @@ -443,11 +455,13 @@ protected void printStats() {
buffer.append(ODistributedOutput.formatLatency(this, clusterCfg));
buffer.append(ODistributedOutput.formatMessages(this, clusterCfg));

for (String db : dbs)
buffer.append(ODistributedOutput.formatLocks(this, db));
OLogManager.instance().flush();
for (String db : dbs) {
buffer.append(messageService.getDatabase(db).dump());
}

// DUMP HA STATS
ODistributedServerLog.debug(this, getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE, "%s", buffer);
ODistributedServerLog.info(this, getLocalNodeName(), null, ODistributedServerLog.DIRECTION.NONE, "%s", buffer);

} catch (Throwable t) {
ODistributedServerLog.error(this, nodeName, null, DIRECTION.NONE, "Error on printing HA stats");
Expand Down Expand Up @@ -483,6 +497,8 @@ public void shutdown() {
if (!enabled)
return;

Orient.instance().getSignalHandler().unregisterListener(signalListener);

for (OServerNetworkListener nl : serverInstance.getNetworkListeners())
nl.unregisterBeforeConnectNetworkEventListener(this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ ODistributedResponse send2Nodes(ODistributedRequest iRequest, Collection<String>
*/
void unlockRecord(OIdentifiable iRecord, ODistributedRequestId requestId);

void dumpLocks();
String dump();

void unlockResourcesOfServer(ODatabaseDocumentInternal database, String serverName);

Expand Down

0 comments on commit 41ac19c

Please sign in to comment.