Skip to content

Commit

Permalink
update execute nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
息羽 committed Dec 1, 2015
1 parent bb4c5be commit 773b9e2
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/main/java/darks/grid/balance/GridBalance.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
*/
package darks.grid.balance;

import java.util.List;

import darks.grid.beans.GridNode;

public abstract class GridBalance
{

public abstract GridNode getBalanceNode();

public abstract List<GridNode> getTargetNodes();
}
7 changes: 7 additions & 0 deletions src/main/java/darks/grid/balance/RollPolingBalance.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,11 @@ private synchronized boolean initNodes()
}
return nodesList != null;
}

@Override
public synchronized List<GridNode> getTargetNodes()
{
return nodesList;
}

}
1 change: 0 additions & 1 deletion src/main/java/darks/grid/executor/job/JobExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public String toSimpleString()
.append(getStatusType()).append('\t')
.append(System.currentTimeMillis() - getTimestamp()).append('/')
.append(job.getTimeout()).append(' ')
.append(System.currentTimeMillis() - getTimestamp()).append(' ')
.append(getDelay());
if (isCanncel())
buf.append(' ').append("CANCELED");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;

import darks.grid.GridRuntime;
import darks.grid.balance.GridBalance;
import darks.grid.beans.GridMessage;
import darks.grid.beans.GridNode;
import darks.grid.executor.ExecuteConfig;
Expand Down Expand Up @@ -55,8 +56,10 @@ public MapReduceExecutor(MapReduceTask<T, R> task,
protected R execute()
{
ExecuteConfig config = getConfig();
List<GridNode> nodesList = GridRuntime.nodes().getSnapshotNodes();
task.initialize(nodesList, config.getBalance());
GridBalance balance = config.getBalance();
List<GridNode> nodesList = balance.getTargetNodes() != null ?
balance.getTargetNodes() : GridRuntime.nodes().getSnapshotNodes();
task.initialize(nodesList, balance);
int jobCount = nodesList.size();
if (config.getCallType() == CallType.SINGLE)
jobCount = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void handler(GridSession session, GridMessage msg) throws Exception
node.setHeartAliveTime(System.currentTimeMillis());
node.context().setMachineInfo(meta.context().getMachineInfo());
}
node.setPingDelay(arriveTime - meta.getTimestamp());
node.setPingDelay(Math.abs(arriveTime - meta.getTimestamp()));
//HEART ALIVE REPLY
if (msg.getType() == GridMessage.MSG_HEART_ALIVE)
GridRuntime.events().publish(EventsChannel.SYSTEM_CHANNEL, GridEvent.HEART_ALIVE_REPLY, msg);
Expand Down

0 comments on commit 773b9e2

Please sign in to comment.