Skip to content

Commit

Permalink
updates based on pr comments (validation, property name updates), rec…
Browse files Browse the repository at this point in the history
…overy rule updated to interrupt thread
  • Loading branch information
dev-mlb committed Nov 16, 2023
1 parent 9e11756 commit 1002975
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 9 deletions.
20 changes: 17 additions & 3 deletions src/main/java/emissary/core/sentinel/Sentinel.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import emissary.core.NamespaceException;
import emissary.core.sentinel.rules.Notify;
import emissary.core.sentinel.rules.Rule;
import emissary.directory.DirectoryPlace;
import emissary.directory.KeyManipulator;
import emissary.pool.MobileAgentFactory;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -131,17 +133,18 @@ protected void configure() {
protected void init() {
this.enabled = config.findBooleanEntry("ENABLED", false);
if (this.enabled) {
this.pollingInterval = config.findIntEntry("POLLING_INTERVAL", 5);
this.pollingInterval = config.findIntEntry("POLLING_INTERVAL_MINUTES", 5);

logger.trace("Loading rules...");
for (String ruleId : config.findEntries("RULE_ID")) {
try {
validate(ruleId);
Map<String, String> map = config.findStringMatchMap(ruleId + "_");
String rule = map.getOrDefault("RULE", Notify.class.getName());
Rule ruleImpl = (Rule) Factory.create(rule, ruleId, map.get("TIME_LIMIT"), map.get("THRESHOLD"));
Rule ruleImpl = (Rule) Factory.create(rule, ruleId, map.get("TIME_LIMIT_MINUTES"), map.get("THRESHOLD"));
this.rules.put(ruleId, ruleImpl);
} catch (Exception e) {
logger.warn("Unable to configure Sentinel for: {}", ruleId);
logger.warn("Unable to configure Sentinel for {}: {}", ruleId, e.getMessage());
}
}

Expand All @@ -153,6 +156,17 @@ protected void init() {
}
}

/**
 * Checks that a configured rule refers to a place registered in the directory.
 * <p>
 * The default rule is exempt from the check. For any other value, the entries of the first
 * {@link DirectoryPlace} found in the namespace are searched for a service class name that matches
 * {@code place}, ignoring case.
 *
 * @param place the rule id / place name to verify
 * @throws NamespaceException if the namespace lookup of the directory place fails
 * @throws IllegalStateException if no directory entry matches {@code place}
 */
protected void validate(String place) throws NamespaceException {
    // nothing to verify for the default rule
    if (DEFAULT_RULE.equalsIgnoreCase(place)) {
        return;
    }

    DirectoryPlace directory = Namespace.lookup(DirectoryPlace.class).iterator().next();
    boolean registered = directory.getEntries().stream()
            .map(entry -> KeyManipulator.getServiceClassname(entry.getFullKey()))
            .anyMatch(serviceClassname -> place.equalsIgnoreCase(serviceClassname));
    if (!registered) {
        throw new IllegalStateException("Place not found in the directory");
    }
}

/**
* Checks to see if the mobile agents are processing the same data since the last polling event
*
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/emissary/core/sentinel/rules/Exit.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ public Exit(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
}

/**
* Try to terminate the JVM
*
* @param tracker the listing of agents, places, and filenames that are currently processing
* @param placeSimpleName the place name currently processing on one or more mobile agents
* @param counter number of mobile agents stuck on the place
*/
@Override
public void action(Map<String, Sentinel.Tracker> tracker, String placeSimpleName, Integer counter) {
logger.error("Sentinel detected unrecoverable agent(s) running [{}], exiting now!!", placeSimpleName);
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/emissary/core/sentinel/rules/Kill.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ public Kill(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
}

/**
* Force a shutdown of the system
*
* @param tracker the listing of agents, places, and filenames that are currently processing
* @param placeSimpleName the place name currently processing on one or more mobile agents
* @param counter number of mobile agents stuck on the place
*/
@Override
public void action(Map<String, Sentinel.Tracker> tracker, String placeSimpleName, Integer counter) {
logger.error("Sentinel detected unrecoverable agent(s) running [{}], initiating forceful shutdown...", placeSimpleName);
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/emissary/core/sentinel/rules/Notify.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ public Notify(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
}

/**
* Log the problem agents/threads
*
* @param tracker the listing of agents, places, and filenames that are currently processing
* @param placeSimpleName the place name currently processing on one or more mobile agents
* @param counter number of mobile agents stuck on the place
*/
@Override
public void action(Map<String, Sentinel.Tracker> tracker, String placeSimpleName, Integer counter) {
logger.warn("Sentinel detected locked agent(s) running [{}]", placeSimpleName);
Expand Down
28 changes: 26 additions & 2 deletions src/main/java/emissary/core/sentinel/rules/Recover.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package emissary.core.sentinel.rules;

import emissary.core.IMobileAgent;
import emissary.core.Namespace;
import emissary.core.sentinel.Sentinel;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Recover extends Rule {
public Recover(String place, long timeLimit, double threshold) {
Expand All @@ -13,9 +17,29 @@ public Recover(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
}

/**
* Attempts to recover the mobile agents stuck on a place by looking each one up in the namespace and
* interrupting it.
*
* @param tracker the listing of agents, places, and filenames that are currently processing
* @param placeSimpleName the place name currently processing on one or more mobile agents
* @param counter number of mobile agents stuck on the place (not read by this implementation)
* @throws IllegalStateException if looking up or interrupting any of the matching agents fails
*/
@Override
public void action(Map<String, Sentinel.Tracker> tracker, String placeSimpleName, Integer counter) {
// NOTE(review): the next two statements look like the pre-change lines of this diff view, and the
// statements after them look like their replacement - confirm against the committed file.
logger.warn("Sentinel attempting recovery mode...");
throw new UnsupportedOperationException("Recovery unavailable");
logger.warn("Sentinel detected locked agent(s) running [{}], attempting recovery...", placeSimpleName);
// collect the names of the agents whose tracker entry matches the place (case-insensitive)
List<String> agentNames = tracker.values().stream()
.filter(t -> t.getPlaceSimpleName().equalsIgnoreCase(placeSimpleName))
.map(Sentinel.Tracker::getAgentName)
.collect(Collectors.toList());

for (String agentName : agentNames) {
try {
// the agent is resolved by name from the namespace and then interrupted
IMobileAgent mobileAgent = (IMobileAgent) Namespace.lookup(agentName);
logger.warn("Sentinel attempting recovery for {}", agentName);
mobileAgent.interrupt();
} catch (Exception e) {
// NOTE(review): the caught exception is not attached as the cause, and throwing here ends the
// loop, so any remaining agents are not attempted - confirm this is intended.
throw new IllegalStateException("Recovery unavailable");
}
}
}
}
7 changes: 7 additions & 0 deletions src/main/java/emissary/core/sentinel/rules/Stop.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ public Stop(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
}

/**
* Attempt a graceful shutdown of the system
*
* @param tracker the listing of agents, places, and filenames that are currently processing
* @param placeSimpleName the place name currently processing on one or more mobile agents
* @param counter number of mobile agents stuck on the place
*/
@Override
public void action(Map<String, Sentinel.Tracker> tracker, String placeSimpleName, Integer counter) {
logger.error("Sentinel detected unrecoverable agent(s) running [{}], initiating graceful shut down...", placeSimpleName);
Expand Down
8 changes: 4 additions & 4 deletions src/main/resources/emissary/core/sentinel/Sentinel.cfg
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
ENABLED = true
POLLING_INTERVAL = 1
POLLING_INTERVAL_MINUTES = 1

RULE_ID = "DEFAULT"
DEFAULT_TIME_LIMIT = "60"
DEFAULT_TIME_LIMIT_MINUTES = "60"
DEFAULT_THRESHOLD = "1.0"

# Sample rule
# RULE_ID = "DelayPlace"
# DelayPlace_RULE= "emissary.core.sentinel.rules.Kill"
# DelayPlace_TIME_LIMIT = "5"
# DelayPlace_RULE = "emissary.core.sentinel.rules.Kill"
# DelayPlace_TIME_LIMIT_MINUTES = "5"
# DelayPlace_THRESHOLD = "0.75"

0 comments on commit 1002975

Please sign in to comment.