Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add trigger support #196

Merged
merged 3 commits into from
Jul 23, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
<field name="closuredelay" type="integer">
<bind-xml name="closuredelay" node="attribute" />
</field>

<field name="trigger" type="string">
<bind-xml name="trigger" node="attribute" />
</field>

<field name="auth" type="com.intel.cosbench.config.Auth">
<bind-xml name="auth" node="element" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
<field name="description" type="string">
<bind-xml name="description" node="attribute" />
</field>

<field name="trigger" type="string">
<bind-xml name="trigger" node="attribute" />
</field>

<field name="auth" type="com.intel.cosbench.config.Auth">
<bind-xml name="auth" node="element" />
Expand Down
11 changes: 10 additions & 1 deletion dev/cosbench-config/src/com/intel/cosbench/config/Stage.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public class Stage implements Iterable<Work> {

private String name;
private int closuredelay;
private Auth auth;
private String trigger=null;
private Auth auth;
private Storage storage;
private List<Work> works;

Expand All @@ -59,6 +60,14 @@ public void setName(String name) {
public int getClosuredelay() {
return closuredelay;
}

public String getTrigger() {
return trigger;
}

public void setTrigger(String trigger) {
this.trigger = trigger;
}

public void setClosuredelay(int closuredelay) {
if (closuredelay < 0)
Expand Down
19 changes: 13 additions & 6 deletions dev/cosbench-config/src/com/intel/cosbench/config/Work.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class Work implements Iterable<Operation> {
private int runtime = 0;
private int rampup = 0;
private int rampdown = 0;
private int afr = 200000; /* acceptable failure ratio, the unit is samples per one million,
private int afr = -1; /* acceptable failure ratio, the unit is samples per one million,
* default is 200000 for normal work, and 0 for init/prepare/cleanup/dispose/delay work */
private int totalOps = 0;
private long totalBytes = 0;
Expand Down Expand Up @@ -254,7 +254,7 @@ private void toPrepareWork() {
name = "prepare";
setDivision("object");
setRuntime(0);
setAfr(0);
setDefaultAfr(0);
setTotalBytes(0);
setTotalOps(getWorkers());
Operation op = new Operation();
Expand All @@ -274,7 +274,7 @@ private void toCleanupWork() {
name = "cleanup";
setDivision("object");
setRuntime(0);
setAfr(0);
setDefaultAfr(0);
setTotalBytes(0);
setTotalOps(getWorkers());
Operation op = new Operation();
Expand All @@ -294,7 +294,7 @@ private void toInitWork() {
name = "init";
setDivision("container");
setRuntime(0);
setAfr(0);
setDefaultAfr(0);
setTotalBytes(0);
setTotalOps(getWorkers());
Operation op = new Operation();
Expand All @@ -310,7 +310,7 @@ private void toDisposeWork() {
name = "dispose";
setDivision("container");
setRuntime(0);
setAfr(0);
setDefaultAfr(0);
setTotalBytes(0);
setTotalOps(getWorkers());
Operation op = new Operation();
Expand All @@ -326,7 +326,7 @@ public void toDelayWork() {
name = "delay";
setDivision("none");
setRuntime(0);
setAfr(0);
setDefaultAfr(0);
setTotalBytes(0);
setWorkers(1);
setTotalOps(getWorkers());
Expand All @@ -336,6 +336,11 @@ public void toDelayWork() {
op.setConfig("");
setOperations(Collections.singletonList(op));
}

private void setDefaultAfr(int def) {
if (afr < 0)
setAfr(def);
}

public void validate() {
if (type.equals("prepare"))
Expand All @@ -348,6 +353,8 @@ else if (type.equals("dispose"))
toDisposeWork();
else if (type.equals("delay"))
toDelayWork();
else
setDefaultAfr(200000);
setName(getName());
setWorkers(getWorkers());
if (runtime == 0 && totalOps == 0 && totalBytes == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class Workload {

private String name;
private String description;
private String trigger=null;
private Auth auth = DEFAULT_AUTH;
private Storage storage = DEFAULT_STORAGE;
private Workflow workflow;
Expand All @@ -59,6 +60,14 @@ public void setDescription(String description) {
/* description might be empty */
this.description = description;
}

public String getTrigger() {
return trigger;
}

public void setTrigger(String trigger) {
this.trigger = trigger;
}

public Auth getAuth() {
return auth;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
<field name="closuredelay" type="integer">
<bind-xml name="closuredelay" node="attribute" />
</field>

<field name="trigger" type="string">
<bind-xml name="trigger" node="attribute" />
</field>

<field name="auth" type="com.intel.cosbench.config.Auth">
<bind-xml name="auth" node="element" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
<field name="description" type="string">
<bind-xml name="description" node="attribute" />
</field>

<field name="trigger" type="string">
<bind-xml name="trigger" node="attribute" />
</field>

<field name="auth" type="com.intel.cosbench.config.Auth">
<bind-xml name="auth" node="element" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package com.intel.cosbench.controller.model;

import java.util.HashMap;
import java.util.Map;

import com.intel.cosbench.model.DriverInfo;
import com.intel.cosbench.utils.MapRegistry;

Expand All @@ -31,8 +34,9 @@ public class DriverContext implements DriverInfo, MapRegistry.Item {
private String name;
private String url;
private boolean aliveState;
private Map<String, String> pIDMap = new HashMap<String, String>();

public DriverContext() {
public DriverContext() {
/* empty */
}

Expand Down Expand Up @@ -64,4 +68,16 @@ public boolean getAliveState(){
return aliveState;
}


public String getPIDMap(String scriptName) {
String pid = pIDMap.remove(scriptName);
return (pid == null) ? "0" : pid;
}

public void putPIDMap(String scriptName, String pid) {
if (pid == null)
pIDMap.put(scriptName, "0");
pIDMap.put(scriptName, pid);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.intel.cosbench.controller.service;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.intel.cosbench.controller.model.DriverRegistry;
import com.intel.cosbench.controller.tasklet.Tasklet;
import com.intel.cosbench.controller.tasklet.Tasklets;
import com.intel.cosbench.log.LogFactory;
import com.intel.cosbench.log.Logger;

public class TriggerRunner {
DriverRegistry registry;
private ExecutorService executor;
private static final Logger LOGGER = LogFactory.getSystemLogger();

public TriggerRunner(DriverRegistry registry) {
this.registry = registry;
createExecutor();
}

public void runTrigger(boolean option, String trigger, String wid) {
List<Tasklet> tasklets = Tasklets.newTriggers(trigger, registry, option, wid);
executeTasklets(tasklets, option);
dispose();//shutdown too quick?
}

private void createExecutor() {
int taskCount = registry.getSize();
executor = Executors.newFixedThreadPool(taskCount);
}

public void dispose() {
if (executor != null)
executor.shutdown();
executor = null;
}

private void executeTasklets(List<Tasklet> tasklets, boolean option) {
int num = tasklets.size();
LOGGER.debug("begin to execute {}-trigger tasklets, {} in total",
option ? "enable" : "kill", num);
try {
executor.invokeAll(tasklets);
} catch (InterruptedException e) {
LOGGER.debug("{}-trigger tasklets have interrupted",
option ? "enable" : "kill", num);
return; //no return is ok?
}
LOGGER.debug("all {} trigger tasklets have finished execution", num);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,14 @@ private void processWorkload() throws InterruptedException {
workloadContext.setState(PROCESSING);
workloadContext.setStartDate(new Date());
Iterator<StageContext> iter = queue.iterator();
String trigger = workloadContext.getWorkload().getTrigger();
executeTrigger(trigger, true);
while (iter.hasNext()) {
StageContext stageContext = iter.next();
iter.remove();
runStage(stageContext);
}
executeTrigger(trigger, false);
workloadContext.setStopDate(new Date());
workloadContext.setCurrentStage(null);
for (StageContext stageContext : workloadContext.getStageRegistry()
Expand Down Expand Up @@ -204,7 +207,6 @@ private void runStage(StageContext stageContext) throws InterruptedException {
if(closuredelay > 0)
executeDelay(stageContext, closuredelay);
}

LOGGER.info("successfully ran stage {}", id);
}

Expand All @@ -222,12 +224,16 @@ private void executeStage(StageContext stageContext) {
StageRunner runner = createStageRunner(stageContext);
StageChecker checker = createStageChecker(stageContext);
StageCallable[] callables = new StageCallable[] { runner, checker };
String trigger = stageContext.getStage().getTrigger();
executeTrigger(trigger, true);
try {
executor.invokeAll(Arrays.asList(callables));
} catch (InterruptedException ie) {
executeTrigger(trigger, false);
throw new CancelledException(); // workload cancelled
}
runner.dispose(); // early dispose runner
executeTrigger(trigger, false);
if (!stageContext.getState().equals(StageState.TERMINATED))
return;
String id = stageContext.getId();
Expand All @@ -254,10 +260,18 @@ private void terminateWorkload() {
LOGGER.info("begin to terminate workload {}", id);
for (StageContext stageContext : queue)
stageContext.setState(StageState.ABORTED);
executeTrigger(workloadContext.getWorkload().getTrigger(), false);
workloadContext.setStopDate(new Date());
workloadContext.setState(TERMINATED);
LOGGER.info("successfully terminated workload {}", id);
}

private void executeTrigger(String trigger, boolean isEnable) {
if (trigger == null || trigger.isEmpty())
return;
TriggerRunner runner = new TriggerRunner(controllerContext.getDriverRegistry());
runner.runTrigger(isEnable, trigger, workloadContext.getId());
}

public void cancel() {
String id = workloadContext.getId();
Expand Down Expand Up @@ -310,6 +324,7 @@ private void cancelWorkload() {
*/
for (StageContext stageContext : queue)
stageContext.setState(StageState.CANCELLED);
executeTrigger(workloadContext.getWorkload().getTrigger(), false);
workloadContext.setStopDate(new Date());
workloadContext.setState(CANCELLED);
LOGGER.info("successfully cancelled workload {}", id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,13 @@ public static List<Tasklet> newAborters(TaskRegistry tasks) {
}
return result;
}

public static List<Tasklet> newTriggers(String trigger, DriverRegistry registry, boolean option, String wid) {
List<Tasklet> result = new ArrayList<Tasklet>();
for (DriverContext driver : registry) {
result.add(new Trigger(driver, trigger, option, wid));
}
return result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.intel.cosbench.controller.tasklet;

import org.apache.commons.lang.StringUtils;

import com.intel.cosbench.controller.model.DriverContext;
import com.intel.cosbench.protocol.TriggerResponse;


public class Trigger extends TriggerHttpTasklet {
public Trigger(DriverContext driver, String trigger, boolean option, String wid) {
super(driver, trigger, option, wid);
}

@Override
public void execute() {
initHttpClient();
initObjectMapper();
String content = getCmdLine();
if (content == null || content.isEmpty())
return;
issueCommand("trigger", content);
try {
closeHttpClient();
} catch (Exception e) {
LOGGER.error("unexpected exception", e);
}
}

private String getCmdLine() {
trigger.replace(" ", "");
int idxLeft = StringUtils.indexOf(trigger, '(');
int idxRight = StringUtils.indexOf(trigger, ')');
if (idxLeft < 3 || ( idxRight != trigger.length()-1)){
LOGGER.error("can't enable trigger, the format is illegal!");
return null;
}
scriptName = StringUtils.left(trigger, idxLeft);
String argStr = StringUtils.substring(trigger, idxLeft+1, idxRight);
return isEnable ? ("enableTrigger," + scriptName + "," + argStr + "," + wID)
: ("killTrigger," + driver.getPIDMap(scriptName) + "," + scriptName + "," + wID);
}

@Override
protected void handleResponse(TriggerResponse response) {
if (!isEnable) {
driver.putPIDMap(scriptName, "0");
return;
}
driver.putPIDMap(scriptName, response.getPID());
}

}
Loading