Skip to content

Commit

Permalink
Prioritize local-only actions over dynamically executed actions for l…
Browse files Browse the repository at this point in the history
…ocal resources.

Also prioritizes workers over non-worker within dynamic execution actions for resources, and awards resources LIFO so as to start the dynamic local actions that are most likely to win races.

PiperOrigin-RevId: 405652726
  • Loading branch information
larsrc-google authored and copybara-github committed Oct 26, 2021
1 parent 8d66a41 commit 8fa3ccb
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import com.google.devtools.build.lib.util.OS;
import com.google.devtools.build.lib.util.Pair;
import java.io.IOException;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
Expand Down Expand Up @@ -62,8 +62,8 @@
public class ResourceManager {

/**
* A handle returned by {@link #acquireResources(ActionExecutionMetadata, ResourceSet)} that must
* be closed in order to free the resources again.
* A handle returned by {@link #acquireResources(ActionExecutionMetadata, ResourceSet,
* ResourcePriority)} that must be closed in order to free the resources again.
*/
public static class ResourceHandle implements AutoCloseable {
final ResourceManager rm;
Expand Down Expand Up @@ -94,9 +94,16 @@ protected Boolean initialValue() {
};

/**
* Singleton reference defined in a separate class to ensure thread-safe lazy
* initialization.
* Defines the possible priorities of resources. The earlier elements in this enum will get first
* chance at grabbing resources.
*/
public enum ResourcePriority {
LOCAL(), // Local execution not under dynamic execution
DYNAMIC_WORKER(),
DYNAMIC_STANDALONE();
}

/** Singleton reference defined in a separate class to ensure thread-safe lazy initialization. */
private static class Singleton {
static ResourceManager instance = new ResourceManager();
}
Expand All @@ -117,9 +124,19 @@ public static ResourceManager instance() {
private static final double MIN_NECESSARY_CPU_RATIO = 0.6;
private static final double MIN_NECESSARY_RAM_RATIO = 1.0;

// List of blocked threads. Associated CountDownLatch object will always
// Lists of blocked threads. Associated CountDownLatch object will always
// be initialized to 1 during creation in the acquire() method.
private final List<Pair<ResourceSet, CountDownLatch>> requestList;
// We use LinkedList because we will need to remove elements from the middle frequently in the
// middle of iterating through the list.
@SuppressWarnings("JdkObsolete")
private final Deque<Pair<ResourceSet, CountDownLatch>> localRequests = new LinkedList<>();

@SuppressWarnings("JdkObsolete")
private final Deque<Pair<ResourceSet, CountDownLatch>> dynamicWorkerRequests = new LinkedList<>();

@SuppressWarnings("JdkObsolete")
private final Deque<Pair<ResourceSet, CountDownLatch>> dynamicStandaloneRequests =
new LinkedList<>();

// The total amount of resources on the local host. Must be set by
// an explicit call to setAvailableResources(), often using
Expand All @@ -143,8 +160,10 @@ public static ResourceManager instance() {
// Determines if local memory estimates are used.
private boolean localMemoryEstimate = false;

/** If set, local-only actions are given priority over dynamically run actions. */
private boolean prioritizeLocalActions;

private ResourceManager() {
requestList = new LinkedList<>();
}

@VisibleForTesting public static ResourceManager instanceForTestingOnly() {
Expand All @@ -159,11 +178,18 @@ public synchronized void resetResourceUsage() {
usedCpu = 0;
usedRam = 0;
usedLocalTestCount = 0;
for (Pair<ResourceSet, CountDownLatch> request : requestList) {
// CountDownLatch can be set only to 0 or 1.
for (Pair<ResourceSet, CountDownLatch> request : localRequests) {
request.second.countDown();
}
for (Pair<ResourceSet, CountDownLatch> request : dynamicWorkerRequests) {
request.second.countDown();
}
for (Pair<ResourceSet, CountDownLatch> request : dynamicStandaloneRequests) {
request.second.countDown();
}
requestList.clear();
localRequests.clear();
dynamicWorkerRequests.clear();
dynamicStandaloneRequests.clear();
}

/**
Expand All @@ -178,7 +204,9 @@ public synchronized void setAvailableResources(ResourceSet resources) {
staticResources.getMemoryMb(),
staticResources.getCpuUsage(),
staticResources.getLocalTestCount());
processWaitingThreads();
processWaitingThreads(localRequests);
processWaitingThreads(dynamicWorkerRequests);
processWaitingThreads(dynamicStandaloneRequests);
}

/**
Expand All @@ -189,22 +217,28 @@ public void setUseLocalMemoryEstimate(boolean value) {
localMemoryEstimate = value;
}

/** Sets whether to prioritize local-only actions in resource allocation. */
public void setPrioritizeLocalActions(boolean prioritizeLocalActions) {
this.prioritizeLocalActions = prioritizeLocalActions;
}

/**
* Acquires requested resource set. Will block if resource is not available.
* NB! This method must be thread-safe!
* Acquires requested resource set. Will block if resource is not available. NB! This method must
* be thread-safe!
*/
public ResourceHandle acquireResources(ActionExecutionMetadata owner, ResourceSet resources)
public ResourceHandle acquireResources(
ActionExecutionMetadata owner, ResourceSet resources, ResourcePriority priority)
throws InterruptedException {
Preconditions.checkNotNull(
resources, "acquireResources called with resources == NULL during %s", owner);
Preconditions.checkState(
!threadHasResources(), "acquireResources with existing resource lock during %s", owner);

AutoProfiler p =
profiled("Aquiring resources for: " + owner.describe(), ProfilerTask.ACTION_LOCK);
profiled("Acquiring resources for: " + owner.describe(), ProfilerTask.ACTION_LOCK);
CountDownLatch latch = null;
try {
latch = acquire(resources);
latch = acquire(resources, priority);
if (latch != null) {
latch.await();
}
Expand Down Expand Up @@ -272,7 +306,12 @@ private void incrementResources(ResourceSet resources) {
* Return true if any resources have been claimed through this manager.
*/
public synchronized boolean inUse() {
return usedCpu != 0.0 || usedRam != 0.0 || usedLocalTestCount != 0 || !requestList.isEmpty();
return usedCpu != 0.0
|| usedRam != 0.0
|| usedLocalTestCount != 0
|| !localRequests.isEmpty()
|| !dynamicWorkerRequests.isEmpty()
|| !dynamicStandaloneRequests.isEmpty();
}


Expand Down Expand Up @@ -309,14 +348,32 @@ void releaseResources(ActionExecutionMetadata owner, ResourceSet resources) {
}
}

private synchronized CountDownLatch acquire(ResourceSet resources) {
private synchronized CountDownLatch acquire(ResourceSet resources, ResourcePriority priority) {
if (areResourcesAvailable(resources)) {
incrementResources(resources);
return null;
}
Pair<ResourceSet, CountDownLatch> request =
new Pair<>(resources, new CountDownLatch(1));
requestList.add(request);
if (this.prioritizeLocalActions) {
switch (priority) {
case LOCAL:
localRequests.addLast(request);
break;
case DYNAMIC_WORKER:
// Dynamic requests should be LIFO, because we are more likely to win the race on newer
// actions.
dynamicWorkerRequests.addFirst(request);
break;
case DYNAMIC_STANDALONE:
// Dynamic requests should be LIFO, because we are more likely to win the race on newer
// actions.
dynamicStandaloneRequests.addFirst(request);
break;
}
} else {
localRequests.addLast(request);
}
return request.second;
}

Expand All @@ -334,18 +391,24 @@ private synchronized boolean release(ResourceSet resources) {
if (usedRam < epsilon) {
usedRam = 0;
}
if (!requestList.isEmpty()) {
processWaitingThreads();
return true;
boolean anyProcessed = false;
if (!localRequests.isEmpty()) {
processWaitingThreads(localRequests);
anyProcessed = true;
}
return false;
if (!dynamicWorkerRequests.isEmpty()) {
processWaitingThreads(dynamicWorkerRequests);
anyProcessed = true;
}
if (!dynamicStandaloneRequests.isEmpty()) {
processWaitingThreads(dynamicStandaloneRequests);
anyProcessed = true;
}
return anyProcessed;
}

/**
* Tries to unblock one or more waiting threads if there are sufficient resources available.
*/
private synchronized void processWaitingThreads() {
Iterator<Pair<ResourceSet, CountDownLatch>> iterator = requestList.iterator();
private void processWaitingThreads(Deque<Pair<ResourceSet, CountDownLatch>> requests) {
Iterator<Pair<ResourceSet, CountDownLatch>> iterator = requests.iterator();
while (iterator.hasNext()) {
Pair<ResourceSet, CountDownLatch> request = iterator.next();
if (request.second.getCount() != 0) {
Expand Down Expand Up @@ -417,7 +480,7 @@ private boolean areResourcesAvailable(ResourceSet resources) {

@VisibleForTesting
synchronized int getWaitCount() {
return requestList.size();
return localRequests.size() + dynamicStandaloneRequests.size() + dynamicWorkerRequests.size();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,17 +879,13 @@ private Builder createBuilder(
@VisibleForTesting
public static void configureResourceManager(ResourceManager resourceMgr, BuildRequest request) {
ExecutionOptions options = request.getOptions(ExecutionOptions.class);
ResourceSet resources;
resources = ResourceSet.createWithRamCpu(options.localRamResources, options.localCpuResources);
resourceMgr.setPrioritizeLocalActions(options.prioritizeLocalActions);
resourceMgr.setUseLocalMemoryEstimate(options.localMemoryEstimate);

resourceMgr.setAvailableResources(
ResourceSet.create(
resources.getMemoryMb(),
resources.getCpuUsage(),
request.getExecutionOptions().usingLocalTestJobs()
? request.getExecutionOptions().localTestJobs
: Integer.MAX_VALUE));
options.localRamResources,
options.localCpuResources,
options.usingLocalTestJobs() ? options.localTestJobs : Integer.MAX_VALUE));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,22 @@ public boolean usingLocalTestJobs() {
}

@Option(
name = "debug_print_action_contexts",
defaultValue = "false",
documentationCategory = OptionDocumentationCategory.UNDOCUMENTED,
effectTags = {OptionEffectTag.UNKNOWN},
help = "Print the contents of the SpawnActionContext and ContextProviders maps."
)
name = "experimental_prioritize_local_actions",
defaultValue = "true",
documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
effectTags = {OptionEffectTag.EXECUTION},
help =
"If set, actions that can only run locally are given first chance at acquiring resources,"
+ " dynamically run workers get second chance, and dynamically-run standalone actions"
+ " come last.")
public boolean prioritizeLocalActions;

@Option(
name = "debug_print_action_contexts",
defaultValue = "false",
documentationCategory = OptionDocumentationCategory.UNDOCUMENTED,
effectTags = {OptionEffectTag.UNKNOWN},
help = "Print the contents of the SpawnActionContext and ContextProviders maps.")
public boolean debugPrintActionContexts;

@Option(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.devtools.build.lib.actions.ForbiddenActionInputException;
import com.google.devtools.build.lib.actions.ResourceManager;
import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandle;
import com.google.devtools.build.lib.actions.ResourceManager.ResourcePriority;
import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.SpawnResult;
import com.google.devtools.build.lib.actions.SpawnResult.Status;
Expand Down Expand Up @@ -136,7 +137,12 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context)
ActionExecutionMetadata owner = spawn.getResourceOwner();
context.report(SpawnSchedulingEvent.create(getName()));
try (ResourceHandle handle =
resourceManager.acquireResources(owner, spawn.getLocalResources())) {
resourceManager.acquireResources(
owner,
spawn.getLocalResources(),
context.speculating()
? ResourcePriority.DYNAMIC_STANDALONE
: ResourcePriority.LOCAL)) {
context.report(SpawnExecutingEvent.create(getName()));
if (!localExecutionOptions.localLockfreeOutput) {
context.lockOutputFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.devtools.build.lib.actions.ForbiddenActionInputException;
import com.google.devtools.build.lib.actions.ResourceManager;
import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandle;
import com.google.devtools.build.lib.actions.ResourceManager.ResourcePriority;
import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.SpawnResult;
import com.google.devtools.build.lib.actions.SpawnResult.Status;
Expand Down Expand Up @@ -85,7 +86,10 @@ public final SpawnResult exec(Spawn spawn, SpawnExecutionContext context)
ActionExecutionMetadata owner = spawn.getResourceOwner();
context.report(SpawnSchedulingEvent.create(getName()));
try (ResourceHandle ignored =
resourceManager.acquireResources(owner, spawn.getLocalResources())) {
resourceManager.acquireResources(
owner,
spawn.getLocalResources(),
context.speculating() ? ResourcePriority.DYNAMIC_STANDALONE : ResourcePriority.LOCAL)) {
context.report(SpawnExecutingEvent.create(getName()));
SandboxedSpawn sandbox = prepareSpawn(spawn, context);
return runSpawn(spawn, sandbox, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.devtools.build.lib.actions.MetadataProvider;
import com.google.devtools.build.lib.actions.ResourceManager;
import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandle;
import com.google.devtools.build.lib.actions.ResourceManager.ResourcePriority;
import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.SpawnExecutedEvent;
import com.google.devtools.build.lib.actions.SpawnMetrics;
Expand Down Expand Up @@ -387,7 +388,10 @@ WorkResponse execInWorker(
}

try (ResourceHandle handle =
resourceManager.acquireResources(owner, spawn.getLocalResources())) {
resourceManager.acquireResources(
owner,
spawn.getLocalResources(),
context.speculating() ? ResourcePriority.DYNAMIC_WORKER : ResourcePriority.LOCAL)) {
// We acquired a worker and resources -- mark that as queuing time.
spawnMetrics.setQueueTime(queueStopwatch.elapsed());

Expand Down
Loading

0 comments on commit 8fa3ccb

Please sign in to comment.