This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Fix/add task perf (#195)
* Added task ID range

* Removed upload blob methods

* Removed upload blob

* Fixed trailing whitespace

* Dropped job ID from merge task name

* Added chunk logic for argsList

* Added check for args containing data sets

* Removed container name from docker run command for all tasks

* Added test for hasDataSet

* Fixed Travis YAML

* Added before_install for R

* Removed before_install; added GitHub package nycflights13
brnleehng authored Jan 10, 2018
1 parent 9d50403 commit afde92f
Showing 8 changed files with 94 additions and 35 deletions.
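Editor's note: taken together, the bullets above describe one optimization. Before this change, every task's slice of argsList was serialized and uploaded to blob storage, and the merge task depended on an explicit list of every task ID. After it, tasks get plain integer IDs, plain-argument slices are passed as start/end indices on the worker's command line, the per-task blob upload happens only when the arguments contain a data set, and the merge task depends on a single task ID range. A minimal sketch of the chunking arithmetic (chunkIndices is an editor's illustration, not a function from this patch):

chunkIndices <- function(n, chunkSize) {
  # One start/end pair per task; the last chunk may be short.
  startIndices <- seq(1, n, by = chunkSize)
  endIndices <- pmin(startIndices + chunkSize - 1, n)
  list(start = startIndices, end = endIndices)
}

idx <- chunkIndices(10, 4)
# idx$start is c(1, 5, 9); idx$end is c(4, 8, 10)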
1 change: 1 addition & 0 deletions .travis.yml
@@ -8,3 +8,4 @@ warnings_are_errors: false
 r_github_packages:
   - Azure/rAzureBatch
   - jimhester/lintr
+  - hadley/nycflights13
34 changes: 24 additions & 10 deletions R/doAzureParallel.R
@@ -224,6 +224,12 @@ setHttpTraffic <- function(value = FALSE) {
   assign("bioconductor", bioconductorPackages, .doAzureBatchGlobals)
   assign("pkgName", pkgName, .doAzureBatchGlobals)
 
+  isDataSet <- hasDataSet(argsList)
+
+  if (!isDataSet) {
+    assign("argsList", argsList, .doAzureBatchGlobals)
+  }
+
   if (!is.null(obj$options$azure$job)) {
     id <- obj$options$azure$job
   }
@@ -528,18 +534,26 @@ setHttpTraffic <- function(value = FALSE) {
   tasks <- lapply(1:length(endIndices), function(i) {
     startIndex <- startIndices[i]
     endIndex <- endIndices[i]
-    taskId <- paste0(id, "-task", i)
+    taskId <- as.character(i)
 
+    args <- NULL
+    if (isDataSet) {
+      args <- argsList[startIndex:endIndex]
+    }
+
     .addTask(
       jobId = id,
       taskId = taskId,
       rCommand = sprintf(
-        "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R > $AZ_BATCH_TASK_ID.txt"),
-      args = argsList[startIndex:endIndex],
+        "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R %i %i %i > $AZ_BATCH_TASK_ID.txt",
+        startIndex,
+        endIndex,
+        isDataSet),
       envir = .doAzureBatchGlobals,
       packages = obj$packages,
       outputFiles = obj$options$azure$outputFiles,
-      containerImage = data$containerImage
+      containerImage = data$containerImage,
+      args = args
     )
 
     cat("\r", sprintf("Submitting tasks (%s/%s)", i, length(endIndices)), sep = "")
@@ -548,14 +562,11 @@ setHttpTraffic <- function(value = FALSE) {
     return(taskId)
   })
 
-  rAzureBatch::updateJob(id)
-
   if (enableCloudCombine) {
     cat("\nSubmitting merge task")
-    mergeTaskId <- paste0(id, "-merge")
     .addTask(
       jobId = id,
-      taskId = mergeTaskId,
+      taskId = "merge",
       rCommand = sprintf(
         "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/merger.R %s %s %s > $AZ_BATCH_TASK_ID.txt",
         length(tasks),
@@ -564,14 +575,17 @@ setHttpTraffic <- function(value = FALSE) {
       ),
       envir = .doAzureBatchGlobals,
       packages = obj$packages,
-      dependsOn = tasks,
+      dependsOn = list(taskIdRanges = list(list(start = 1, end = length(tasks)))),
       cloudCombine = cloudCombine,
       outputFiles = obj$options$azure$outputFiles,
       containerImage = data$containerImage
     )
     cat(". . .")
   }
 
+  # Updating the job to terminate after all tasks are completed
+  rAzureBatch::updateJob(id)
+
   if (wait) {
     if (!is.null(obj$packages) ||
         !is.null(githubPackages) ||
@@ -588,7 +602,7 @@ setHttpTraffic <- function(value = FALSE) {
       response <-
         rAzureBatch::downloadBlob(
           id,
-          paste0("result/", id, "-merge-result.rds"),
+          paste0("result/", "merge-result.rds"),
          sasToken = sasToken,
          accountName = storageCredentials$name,
          downloadPath = tempFile,
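The dependsOn change above leans on an Azure Batch service feature: task dependencies can be expressed either as explicit taskIds or as taskIdRanges of integer IDs (which is why taskId becomes as.character(i); IDs referenced in a range must parse as integers). For a job with N tasks, the merge task's dependency payload shrinks from N entries to one. A hedged sketch of the two shapes, with illustrative values:

numTasks <- 250

# Old shape: one entry per task, built from the returned task IDs.
dependsOnOld <- list(taskIds = as.character(seq_len(numTasks)))

# New shape: a single inclusive range covering tasks "1" through "250".
dependsOnNew <- list(taskIdRanges = list(list(start = 1, end = numTasks)))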
33 changes: 15 additions & 18 deletions R/helpers.R
@@ -3,17 +3,27 @@
 
   args <- list(...)
   .doAzureBatchGlobals <- args$envir
-  argsList <- args$args
   dependsOn <- args$dependsOn
+  argsList <- args$args
   cloudCombine <- args$cloudCombine
   userOutputFiles <- args$outputFiles
   containerImage <- args$containerImage
 
   resultFile <- paste0(taskId, "-result", ".rds")
   accountName <- storageCredentials$name
 
+  resourceFiles <- NULL
+  if (!is.null(argsList)) {
+    assign("argsList", argsList, .doAzureBatchGlobals)
+    envFile <- paste0(taskId, ".rds")
+    saveRDS(argsList, file = envFile)
+    rAzureBatch::uploadBlob(jobId, file.path(getwd(), envFile))
+    file.remove(envFile)
+
+    readToken <- rAzureBatch::createSasToken("r", "c", jobId)
+    envFileUrl <-
+      rAzureBatch::createBlobUrl(storageCredentials$name, jobId, envFile, readToken)
+    resourceFiles <-
+      list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))
+  }
 
   # Only use the download command if cloudCombine is enabled
@@ -34,22 +44,9 @@
     commands <- c(downloadCommand)
   }
 
-  envFile <- paste0(taskId, ".rds")
-  saveRDS(argsList, file = envFile)
-  rAzureBatch::uploadBlob(jobId, paste0(getwd(), "/", envFile))
-  file.remove(envFile)
-
-  sasToken <- rAzureBatch::createSasToken("r", "c", jobId)
-  writeToken <- rAzureBatch::createSasToken("w", "c", jobId)
-
-  envFileUrl <-
-    rAzureBatch::createBlobUrl(storageCredentials$name, jobId, envFile, sasToken)
-  resourceFiles <-
-    list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))
-
   exitConditions <- NULL
   if (!is.null(args$dependsOn)) {
-    dependsOn <- list(taskIds = dependsOn)
+    dependsOn <- args$dependsOn
   }
   else {
     exitConditions <- list(default = list(dependencyAction = "satisfy"))
@@ -59,7 +56,7 @@
     rAzureBatch::createBlobUrl(
       storageAccount = storageCredentials$name,
       containerName = jobId,
-      sasToken = writeToken
+      sasToken = rAzureBatch::createSasToken("w", "c", jobId)
     )
 
   outputFiles <- list(
@@ -101,7 +98,7 @@
 
   commands <-
     c(commands,
-      dockerRunCommand(containerImage, rCommand, taskId))
+      dockerRunCommand(containerImage, rCommand))
 
   commands <- linuxWrapCommands(commands)
 
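Two notes on this file. First, the serialize-and-upload sequence moved inside a guard, so only runs whose arguments contain a data set pay a blob upload (plus a SAS token and resource file) per task; with plain arguments, resourceFiles stays NULL and the worker slices the shared job environment by index instead. A condensed view of the new control flow, with names taken from the diff above:

resourceFiles <- NULL
if (!is.null(argsList)) {
  # Data-set runs still ship a per-task environment blob...
  envFile <- paste0(taskId, ".rds")
  saveRDS(argsList, file = envFile)
  rAzureBatch::uploadBlob(jobId, file.path(getwd(), envFile))
  file.remove(envFile)
} # ...plain-argument runs skip the round trip entirely.

Second, dockerRunCommand no longer receives taskId as a container name. The commit message gives no rationale, but a plausible reading (an assumption, not stated in the diff) is that bare integer task IDs now repeat across jobs, so naming containers after them risks docker name collisions on a shared node, whereas unnamed containers get unique generated names.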
2 changes: 1 addition & 1 deletion R/jobUtilities.R
@@ -464,7 +464,7 @@ waitForTasksToComplete <-
       # Wait for merge task to complete
       repeat {
         # Verify that the merge cloud task didn't have any errors
-        mergeTask <- rAzureBatch::getTask(jobId, paste0(jobId, "-merge"))
+        mergeTask <- rAzureBatch::getTask(jobId, "merge")
 
         # This test needs to go first as Batch service will not return an execution info as null
         if (is.null(mergeTask$executionInfo$result)) {
13 changes: 13 additions & 0 deletions R/utility.R
@@ -243,3 +243,16 @@ readMetadataBlob <- function(jobId) {
 areShallowEqual <- function(a, b) {
   !is.null(a) && !is.null(b) && a == b
 }
+
+hasDataSet <- function(list) {
+  if (length(list) > 0) {
+    for (arg in list[[1]]) {
+      # Data frames are shown as list in the foreach iterator
+      if (typeof(arg) == "list") {
+        return(TRUE)
+      }
+    }
+  }
+
+  return(FALSE)
+}
7 changes: 4 additions & 3 deletions inst/startup/merger.R
@@ -18,6 +18,7 @@ chunkSize <- as.integer(args[2])
 errorHandling <- args[3]
 
 batchJobId <- Sys.getenv("AZ_BATCH_JOB_ID")
+batchTaskId <- Sys.getenv("AZ_BATCH_TASK_ID")
 batchJobPreparationDirectory <-
   Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
 batchTaskWorkingDirectory <- Sys.getenv("AZ_BATCH_TASK_WORKING_DIR")
@@ -75,14 +76,14 @@ if (typeof(cloudCombine) == "list" && enableCloudCombine) {
       }
 
       for (t in 1:length(task)) {
-        results[count] <- task[t]
+        results[[count]] <- task[[t]]
         count <- count + 1
       }
     }
 
     saveRDS(results, file = file.path(
       batchTaskWorkingDirectory,
-      paste0(batchJobId, "-merge-result.rds")
+      paste0(batchTaskId, "-result.rds")
     ))
   }
   else if (errorHandling == "pass") {
@@ -111,7 +112,7 @@ if (typeof(cloudCombine) == "list" && enableCloudCombine) {
 
     saveRDS(results, file = file.path(
       batchTaskWorkingDirectory,
-      paste0(batchJobId, "-merge-result.rds")
+      paste0(batchTaskId, "-result.rds")
     ))
   }
 
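The subsetting fix above is worth spelling out: on a list, single brackets return a one-element sublist while double brackets return the element itself, so results[[count]] <- task[[t]] stores each task result directly instead of relying on R's list-unwrapping assignment semantics. A quick illustration:

task <- list("a", "b")
task[2]    # list("b") -- a length-1 list containing the element
task[[2]]  # "b"       -- the element itself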
15 changes: 12 additions & 3 deletions inst/startup/worker.R
@@ -2,6 +2,10 @@
 args <- commandArgs(trailingOnly = TRUE)
 workerErrorStatus <- 0
 
+startIndex <- as.integer(args[1])
+endIndex <- as.integer(args[2])
+isDataSet <- as.logical(as.integer(args[3]))
+
 jobPrepDirectory <- Sys.getenv("AZ_BATCH_JOB_PREP_WORKING_DIR")
 .libPaths(c(
   jobPrepDirectory,
@@ -68,7 +72,12 @@ setwd(batchTaskWorkingDirectory)
 
 azbatchenv <-
   readRDS(paste0(batchJobPreparationDirectory, "/", batchJobEnvironment))
-taskArgs <- readRDS(batchTaskEnvironment)
+
+if (isDataSet) {
+  argsList <- readRDS(batchTaskEnvironment)
+} else {
+  argsList <- azbatchenv$argsList[startIndex:endIndex]
+}
 
 for (package in azbatchenv$packages) {
   library(package, character.only = TRUE)
@@ -83,7 +92,7 @@ if (!is.null(azbatchenv$inputs)) {
   options("az_config" = list(container = azbatchenv$inputs))
 }
 
-result <- lapply(taskArgs, function(args) {
+result <- lapply(argsList, function(args) {
   tryCatch({
     lapply(names(args), function(n)
       assign(n, args[[n]], pos = azbatchenv$exportenv))
@@ -99,7 +108,7 @@ result <- lapply(taskArgs, function(args) {
   })
 })
 
-if (!is.null(azbatchenv$gather) && length(taskArgs) > 1) {
+if (!is.null(azbatchenv$gather) && length(argsList) > 1) {
   result <- Reduce(azbatchenv$gather, result)
 }
 
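To tie worker.R back to the submission side: the three new positional arguments are exactly what the sprintf in R/doAzureParallel.R now interpolates, with the logical isDataSet coerced to 0 or 1 by %i. With illustrative values:

sprintf(
  "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R %i %i %i > $AZ_BATCH_TASK_ID.txt",
  1L, 50L, FALSE)
# "Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/worker.R 1 50 0 > $AZ_BATCH_TASK_ID.txt"

On the worker, as.logical(as.integer("0")) round-trips back to FALSE.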
24 changes: 24 additions & 0 deletions tests/testthat/test-hasdataset.R
@@ -0,0 +1,24 @@
+if (requireNamespace("nycflights13", quietly = TRUE)) {
+  context("hasDataSet function")
+
+  test_that("Arguments contains data set", {
+    byCarrierList <- split(nycflights13::flights, nycflights13::flights$carrier)
+    it <- iterators::iter(byCarrierList)
+    argsList <- as.list(it)
+
+    hasDataSet <- hasDataSet(argsList)
+
+    expect_equal(hasDataSet, TRUE)
+  })
+
+  test_that("Arguments does not contain data set", {
+    args <- seq(1:10)
+    it <- iterators::iter(args)
+    argsList <- as.list(it)
+
+    hasDataSet <- hasDataSet(argsList)
+
+    expect_equal(hasDataSet, FALSE)
+  })
+
+}
