Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Automatic PR] SDK changes from pull request #975 #33

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/**
* The type representing node in a {@link DAGraph}.
Expand All @@ -20,6 +21,7 @@ public class DAGNode<T> extends Node<T> {
private List<String> dependentKeys;
private int toBeResolved;
private boolean isPreparer;
private ReentrantLock lock;

/**
* Creates a DAG node.
Expand All @@ -30,6 +32,14 @@ public class DAGNode<T> extends Node<T> {
public DAGNode(String key, T data) {
super(key, data);
dependentKeys = new ArrayList<>();
lock = new ReentrantLock();
}

/**
* @return the lock to be used while performing thread safe operation on this node.
*/
public ReentrantLock lock() {
return this.lock;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@

package com.microsoft.azure;

import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* Type representing a DAG (directed acyclic graph).
Expand All @@ -20,7 +19,7 @@
* @param <U> the type of the nodes in the graph
*/
public class DAGraph<T, U extends DAGNode<T>> extends Graph<T, U> {
private Queue<String> queue;
private ConcurrentLinkedQueue<String> queue;
private boolean hasParent;
private U rootNode;

Expand All @@ -31,7 +30,7 @@ public class DAGraph<T, U extends DAGNode<T>> extends Graph<T, U> {
*/
public DAGraph(U rootNode) {
this.rootNode = rootNode;
this.queue = new ArrayDeque<>();
this.queue = new ConcurrentLinkedQueue<>();
this.rootNode.setPreparer(true);
this.addNode(rootNode);
}
Expand Down Expand Up @@ -103,10 +102,14 @@ public void prepare() {
* Gets next node in the DAG which has no dependency or all of it's dependencies are resolved and
* ready to be consumed.
*
* @return next node or null if all the nodes have been explored
* @return next node or null if all the nodes have been explored or no node is available at this moment.
*/
public U getNext() {
return graph.get(queue.poll());
String nextItemKey = queue.poll();
if (nextItemKey == null) {
return null;
}
return graph.get(nextItemKey);
}

/**
Expand All @@ -129,9 +132,14 @@ public void reportedCompleted(U completed) {
String dependency = completed.key();
for (String dependentKey : graph.get(dependency).dependentKeys()) {
DAGNode<T> dependent = graph.get(dependentKey);
dependent.reportResolved(dependency);
if (dependent.hasAllResolved()) {
queue.add(dependent.key());
dependent.lock().lock();
try {
dependent.reportResolved(dependency);
if (dependent.hasAllResolved()) {
queue.add(dependent.key());
}
} finally {
dependent.lock().lock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,54 @@

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceCallback;
import com.microsoft.rest.ServiceResponse;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
/**
* An instance of this class provides access to the underlying REST service call running
* in parallel.
*
* @param <T>
*/
class ParallelServiceCall<T> extends ServiceCall {
private TaskGroupBase<T> taskGroup;

/**
* Creates a ParallelServiceCall.
*
* @param taskGroup the task group
*/
ParallelServiceCall(TaskGroupBase<T> taskGroup) {
super(null);
this.taskGroup = taskGroup;
}

/**
* Cancels all the service calls currently executing.
*/
public void cancel() {
for (ServiceCall call : this.taskGroup.calls()) {
call.cancel();
}
}

/**
* @return true if the call has been canceled; false otherwise.
*/
public boolean isCancelled() {
for (ServiceCall call : this.taskGroup.calls()) {
if (!call.isCanceled()) {
return false;
}
}
return true;
}
}

/**
* The base implementation of TaskGroup interface.
Expand All @@ -18,6 +66,8 @@
public abstract class TaskGroupBase<T>
implements TaskGroup<T, TaskItem<T>> {
private DAGraph<TaskItem<T>, DAGNode<TaskItem<T>>> dag;
private ConcurrentLinkedQueue<ServiceCall> serviceCalls = new ConcurrentLinkedQueue<>();
private ParallelServiceCall<T> parallelServiceCall;

/**
* Creates TaskGroupBase.
Expand All @@ -27,6 +77,11 @@ public abstract class TaskGroupBase<T>
*/
public TaskGroupBase(String rootTaskItemId, TaskItem<T> rootTaskItem) {
this.dag = new DAGraph<>(new DAGNode<>(rootTaskItemId, rootTaskItem));
this.parallelServiceCall = new ParallelServiceCall<>(this);
}

List<ServiceCall> calls() {
return Collections.unmodifiableList(Arrays.asList(serviceCalls.toArray(new ServiceCall[0])));
}

@Override
Expand All @@ -39,6 +94,14 @@ public boolean isPreparer() {
return dag.isPreparer();
}

/**
* @return Gets the ParallelServiceCall instance that wraps the service calls running
* in parallel.
*/
public ParallelServiceCall<T> parallelServiceCall() {
return this.parallelServiceCall;
}

@Override
public void merge(TaskGroup<T, TaskItem<T>> parentTaskGroup) {
dag.merge(parentTaskGroup.dag());
Expand All @@ -58,54 +121,43 @@ public void execute() throws Exception {
return;
}

if (dag.isRootNode(nextNode)) {
executeRootTask(nextNode.data());
} else {
nextNode.data().execute(this, nextNode);
}
nextNode.data().execute(this, nextNode);
}

@Override
public ServiceCall executeAsync(final ServiceCallback<Void> callback) {
final DAGNode<TaskItem<T>> nextNode = dag.getNext();
if (nextNode == null) {
return null;
}
ServiceCall serviceCall = null;
DAGNode<TaskItem<T>> nextNode = dag.getNext();
while (nextNode != null) {
if (dag.isRootNode(nextNode)) {
serviceCall = nextNode.data().executeAsync(this, nextNode, new ServiceCallback<Void>() {
@Override
public void failure(Throwable t) {
callback.failure(t);
}

@Override
public void success(ServiceResponse<Void> result) {
callback.success(result);
}
});
} else {
serviceCall = nextNode.data().executeAsync(this, nextNode, callback);
}

if (dag.isRootNode(nextNode)) {
return executeRootTaskAsync(nextNode.data(), callback);
} else {
return nextNode.data().executeAsync(this, nextNode, callback);
if (serviceCall != null) {
// We need to filter out the null value returned by executeAsync. This can
// happen when TaskItem::executeAsync invokes TaskGroupBase::executeAsync
// but there is no task available in the queue at the moment.
this.serviceCalls.add(serviceCall);
}
nextNode = dag.getNext();
}
return serviceCall;
}

@Override
public T taskResult(String taskId) {
return dag.getNodeData(taskId).result();
}

/**
* Executes the root task in this group.
* <p>
* This method will be invoked when all the task dependencies of the root task are finished
* executing, at this point root task can be executed by consuming the result of tasks it
* depends on.
*
* @param task the root task in this group
* @throws Exception the exception
*/
public abstract void executeRootTask(TaskItem<T> task) throws Exception;

/**
* Executes the root task in this group asynchronously.
* <p>
* This method will be invoked when all the task dependencies of the root task are finished
* executing, at this point root task can be executed by consuming the result of tasks it
* depends on.
*
* @param task the root task in this group
* @param callback the callback when the task fails or succeeds
* @return the handle to the REST call
*/
public abstract ServiceCall executeRootTaskAsync(TaskItem<T> task, ServiceCallback<Void> callback);
}