diff --git a/src/main/java/darks/grid/balance/GridBalance.java b/src/main/java/darks/grid/balance/GridBalance.java index 57bd991..9ed4b37 100644 --- a/src/main/java/darks/grid/balance/GridBalance.java +++ b/src/main/java/darks/grid/balance/GridBalance.java @@ -16,6 +16,8 @@ */ package darks.grid.balance; +import java.util.List; + import darks.grid.beans.GridNode; public abstract class GridBalance @@ -23,4 +25,5 @@ public abstract class GridBalance public abstract GridNode getBalanceNode(); + public abstract List getTargetNodes(); } diff --git a/src/main/java/darks/grid/balance/RollPolingBalance.java b/src/main/java/darks/grid/balance/RollPolingBalance.java index 9296363..ab0487f 100644 --- a/src/main/java/darks/grid/balance/RollPolingBalance.java +++ b/src/main/java/darks/grid/balance/RollPolingBalance.java @@ -66,4 +66,11 @@ private synchronized boolean initNodes() } return nodesList != null; } + + @Override + public synchronized List getTargetNodes() + { + return nodesList; + } + } diff --git a/src/main/java/darks/grid/executor/job/JobExecutor.java b/src/main/java/darks/grid/executor/job/JobExecutor.java index e067adb..49f1ad9 100644 --- a/src/main/java/darks/grid/executor/job/JobExecutor.java +++ b/src/main/java/darks/grid/executor/job/JobExecutor.java @@ -168,7 +168,6 @@ public String toSimpleString() .append(getStatusType()).append('\t') .append(System.currentTimeMillis() - getTimestamp()).append('/') .append(job.getTimeout()).append(' ') - .append(System.currentTimeMillis() - getTimestamp()).append(' ') .append(getDelay()); if (isCanncel()) buf.append(' ').append("CANCELED"); diff --git a/src/main/java/darks/grid/executor/task/mapred/MapReduceExecutor.java b/src/main/java/darks/grid/executor/task/mapred/MapReduceExecutor.java index 5253e10..197ab02 100644 --- a/src/main/java/darks/grid/executor/task/mapred/MapReduceExecutor.java +++ b/src/main/java/darks/grid/executor/task/mapred/MapReduceExecutor.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import darks.grid.GridRuntime; +import darks.grid.balance.GridBalance; import darks.grid.beans.GridMessage; import darks.grid.beans.GridNode; import darks.grid.executor.ExecuteConfig; @@ -55,8 +56,10 @@ public MapReduceExecutor(MapReduceTask task, protected R execute() { ExecuteConfig config = getConfig(); - List nodesList = GridRuntime.nodes().getSnapshotNodes(); - task.initialize(nodesList, config.getBalance()); + GridBalance balance = config.getBalance(); + List nodesList = balance.getTargetNodes() != null ? + balance.getTargetNodes() : GridRuntime.nodes().getSnapshotNodes(); + task.initialize(nodesList, balance); int jobCount = nodesList.size(); if (config.getCallType() == CallType.SINGLE) jobCount = 1; diff --git a/src/main/java/darks/grid/network/handler/msg/HEART_ALIVE.java b/src/main/java/darks/grid/network/handler/msg/HEART_ALIVE.java index a45d6a8..4b1ead1 100644 --- a/src/main/java/darks/grid/network/handler/msg/HEART_ALIVE.java +++ b/src/main/java/darks/grid/network/handler/msg/HEART_ALIVE.java @@ -46,7 +46,7 @@ public void handler(GridSession session, GridMessage msg) throws Exception node.setHeartAliveTime(System.currentTimeMillis()); node.context().setMachineInfo(meta.context().getMachineInfo()); } - node.setPingDelay(arriveTime - meta.getTimestamp()); + node.setPingDelay(Math.abs(arriveTime - meta.getTimestamp())); //HEART ALIVE REPLY if (msg.getType() == GridMessage.MSG_HEART_ALIVE) GridRuntime.events().publish(EventsChannel.SYSTEM_CHANNEL, GridEvent.HEART_ALIVE_REPLY, msg);