Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Moved all rAzureBatch logic to doAzureParallel
Browse files Browse the repository at this point in the history
  • Loading branch information
brnleehng committed Mar 23, 2017
1 parent cb4cd93 commit 34f3e6b
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 13 deletions.
12 changes: 4 additions & 8 deletions R/cluster.R
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ generateClusterConfig <- function(fileName, ...){
#' @return The request to the Batch service was successful.
#' @examples
#' cluster <- makeCluster("cluster_config.json", fullName = TRUE, wait = TRUE)
makeCluster <- function(fileName = "az_config.json", fullName = FALSE, wait = TRUE){
makeCluster <- function(fileName = "az_config.json", fullName = FALSE, wait = TRUE, timeout = 60000){
setPoolOption(fileName, fullName)
config <- getOption("az_config")
pool <- config$batchAccount$pool
Expand All @@ -92,12 +92,8 @@ makeCluster <- function(fileName = "az_config.json", fullName = FALSE, wait = TR
}
}

response <- addPool(
pool$name,
pool$vmSize,
autoscaleFormula = getAutoscaleFormula(pool$poolSize$autoscaleFormula, pool$poolSize$minNodes, pool$poolSize$maxNodes),
maxTasksPerNode = pool$maxTasksPerNode,
raw = TRUE,
response <- .addPool(
pool = pool,
packages = packages)

pool <- getPool(pool$name)
Expand All @@ -111,7 +107,7 @@ makeCluster <- function(fileName = "az_config.json", fullName = FALSE, wait = TR
}
else{
if(wait){
waitForNodesToComplete(pool$id, 60000, targetDedicated = pool$targetDedicated)
waitForNodesToComplete(pool$id, timeout, targetDedicated = pool$targetDedicated)
}
}

Expand Down
14 changes: 9 additions & 5 deletions R/doAzureParallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,19 @@ getparentenv <- function(pkgname) {
resourceFiles <- obj$options$azure$resourcefiles
}

sasToken <- constructSas("2016-11-30", "r", "c", id, storageCredentials$key)
sasToken <- constructSas("r", "c", id, storageCredentials$key)
requiredJobResourceFiles <- list(generateResourceFile(storageCredentials$name, id, "splitter.R", sasToken),
generateResourceFile(storageCredentials$name, id, "worker.R", sasToken),
generateResourceFile(storageCredentials$name, id, "merger.R", sasToken))

# We need to merge any files passed by the calling lib with the resource files specified here
resourceFiles <- append(resourceFiles, requiredJobResourceFiles)

response <- addJob(id, config = data$config, packages = obj$packages, resourceFiles = resourceFiles, raw = TRUE)
response <- .addJob(jobId = id,
poolId = data$config$batchAccount$pool$name,
resourceFiles = resourceFiles,
packages = obj$packages)

if(grepl("ActiveJobAndScheduleQuotaReached", response)){
jobquotaReachedResponse <- grepl("ActiveJobAndScheduleQuotaReached", response)
}
Expand Down Expand Up @@ -244,7 +248,7 @@ getparentenv <- function(pkgname) {
inputs <- FALSE
if(!is.null(obj$options$azure$inputs)){
storageCredentials <- getStorageCredentials()
sasToken <- constructSas("2016-11-30", "r", "c", inputs, storageCredentials$key)
sasToken <- constructSas("r", "c", inputs, storageCredentials$key)

assign("inputs", list(name = storageCredentials$name,
sasToken = sasToken),
Expand Down Expand Up @@ -276,7 +280,7 @@ getparentenv <- function(pkgname) {
startIndex <- startIndices[i]
endIndex <- endIndices[i]

addTask(id,
.addTask(id,
taskId = paste0(id, "-task", i),
args = argsList[startIndex:endIndex],
envir = .doAzureBatchGlobals,
Expand All @@ -288,7 +292,7 @@ getparentenv <- function(pkgname) {
updateJob(id)

i <- length(tasks) + 1
r <- addTaskMerge(id,
r <- .addTaskMerge(id,
taskId = paste0(id, "-merge"),
index = i,
envir = .doAzureBatchGlobals,
Expand Down
177 changes: 177 additions & 0 deletions R/helpers.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
#' Create an Azure Batch task that runs worker.R against a serialized
#' environment uploaded to blob storage.
#'
#' @param jobId Id of the job; also used as the blob container name.
#' @param taskId Id of the task to create.
#' @param ... Named extras: \code{envir} (environment serialized for the
#'   worker), \code{args} (chunk of the foreach argument list, stored into
#'   \code{envir} as \code{argsList}), \code{packages} (packages to install
#'   before the task runs), \code{dependsOn} (unused here; see
#'   \code{.addTaskMerge}).
#' @return The response from \code{addTask}.
.addTask <- function(jobId, taskId, ...){
  storageCredentials <- getStorageCredentials()

  args <- list(...)
  .doAzureBatchGlobals <- args$envir
  argsList <- args$args
  packages <- args$packages
  dependsOn <- args$dependsOn

  if(!is.null(argsList)){
    assign('argsList', argsList, .doAzureBatchGlobals)
  }

  # Serialize the task environment, upload it to the job's container, then
  # remove the local copy.
  envFile <- paste0(taskId, ".rds")
  saveRDS(.doAzureBatchGlobals, file = envFile)
  uploadBlob(jobId, paste0(getwd(), "/", envFile))
  file.remove(envFile)

  taskPrep <- getInstallationCommand(packages)
  rCommand <- sprintf("Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/%s %s %s > %s.txt", "worker.R", "$AZ_BATCH_TASK_WORKING_DIR", envFile, taskId)

  resultFile <- paste0(taskId, "-result", ".rds")
  # blobxfer staging: pull previous results down, push this task's stdout log
  # and result file back up.
  downloadCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include result/*.rds", storageCredentials$name, jobId, "$AZ_BATCH_TASK_WORKING_DIR")
  logsCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --upload --saskey $BLOBXFER_SASKEY --remoteresource logs/%s", storageCredentials$name, jobId, paste0(taskId, ".txt"), paste0(taskId, ".txt"))
  autoUploadCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --upload --saskey $BLOBXFER_SASKEY --remoteresource result/%s", storageCredentials$name, jobId, resultFile, resultFile)

  commands <- c("export PATH=/anaconda/envs/py35/bin:$PATH", downloadCommand, rCommand, logsCommand, autoUploadCommand)
  if(taskPrep != ""){
    commands <- c(taskPrep, commands)
  }

  # One read/write/create/list SAS token serves both blobxfer (exposed as
  # BLOBXFER_SASKEY) and the resource-file download. The earlier read-only
  # token was dead code and has been removed.
  sasToken <- constructSas("rwcl", "c", jobId, storageCredentials$key)
  sasQuery <- generateSasUrl(sasToken)

  setting <- list(name = "BLOBXFER_SASKEY",
                  value = sasQuery)

  resourceFiles <- list(generateResourceFile(storageCredentials$name, jobId, envFile, sasToken))

  addTask(jobId,
          taskId,
          environmentSettings = list(setting),
          resourceFiles = resourceFiles,
          commandLine = linuxWrapCommands(commands))
}

#' Create the merge task for a job: runs merger.R after its dependency tasks
#' complete and combines their result files.
#'
#' @param jobId Id of the job; also used as the blob container name.
#' @param taskId Id of the merge task to create.
#' @param ... Named extras: \code{envir} (environment serialized for the
#'   merger), \code{args} (stored into \code{envir} as \code{argsList}),
#'   \code{packages} (packages to install before the task runs),
#'   \code{numOfTasks} (total task count passed to merger.R),
#'   \code{dependsOn} (task ids this merge task waits on).
#' @return The response from \code{addTask}.
.addTaskMerge <- function(jobId, taskId, ...){
  storageCredentials <- getStorageCredentials()

  args <- list(...)
  .doAzureBatchGlobals <- args$envir
  argsList <- args$args
  packages <- args$packages
  numOfTasks <- args$numOfTasks
  dependsOn <- args$dependsOn

  if(!is.null(argsList)){
    assign('argsList', argsList, .doAzureBatchGlobals)
  }

  # Serialize the task environment, upload it to the job's container, then
  # remove the local copy.
  envFile <- paste0(taskId, ".rds")
  saveRDS(.doAzureBatchGlobals, file = envFile)
  uploadBlob(jobId, paste0(getwd(), "/", envFile))
  file.remove(envFile)

  taskPrep <- getInstallationCommand(packages)
  rCommand <- sprintf("Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/%s %s %s %s %s %s > %s.txt", "merger.R", "$AZ_BATCH_TASK_WORKING_DIR", envFile, length(dependsOn), jobId, numOfTasks, taskId)

  resultFile <- paste0(taskId, "-result", ".rds")
  # blobxfer staging: pull the per-task results down, push this task's stdout
  # log and merged result back up.
  downloadCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include result/*.rds", storageCredentials$name, jobId, "$AZ_BATCH_TASK_WORKING_DIR")
  logsCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --upload --saskey $BLOBXFER_SASKEY --remoteresource logs/%s", storageCredentials$name, jobId, paste0(taskId, ".txt"), paste0(taskId, ".txt"))
  autoUploadCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --upload --saskey $BLOBXFER_SASKEY --remoteresource result/%s", storageCredentials$name, jobId, resultFile, resultFile)

  commands <- c("export PATH=/anaconda/envs/py35/bin:$PATH", downloadCommand, rCommand, logsCommand, autoUploadCommand)
  if(taskPrep != ""){
    commands <- c(taskPrep, commands)
  }

  # One read/write/create/list SAS token serves both blobxfer (exposed as
  # BLOBXFER_SASKEY) and the resource-file download. The earlier read-only
  # token was dead code and has been removed.
  sasToken <- constructSas("rwcl", "c", jobId, storageCredentials$key)
  sasQuery <- generateSasUrl(sasToken)

  setting <- list(name = "BLOBXFER_SASKEY",
                  value = sasQuery)

  resourceFiles <- list(generateResourceFile(storageCredentials$name, jobId, envFile, sasToken))

  addTask(jobId,
          taskId,
          environmentSettings = list(setting),
          resourceFiles = resourceFiles,
          commandLine = linuxWrapCommands(commands),
          dependsOn = list(taskIds = dependsOn))
}

#' Create an Azure Batch job with task dependencies enabled and a
#' job-preparation task that stages resource files and installs packages.
#'
#' @param jobId Id of the job to create.
#' @param poolId Id of the pool the job runs on.
#' @param resourceFiles Resource files attached to the preparation task.
#' @param ... Named extras: \code{packages} (optional packages installed by
#'   the preparation task).
#' @return The raw response from \code{addJob}.
.addJob <- function(jobId,
                    poolId,
                    resourceFiles,
                    ...){
  optionalArgs <- list(...)
  packages <- optionalArgs$packages

  # Preparation command line: a wrapped no-op listing, with package
  # installation appended when the caller supplied packages.
  prepCommand <- linuxWrapCommands(c("ls"))
  if(!is.null(packages)){
    prepCommand <- paste0(prepCommand, ";", getInstallationCommand(packages))
  }

  prepTask <- list(
    commandLine = prepCommand,
    userIdentity = list(
      autoUser = list(
        scope = "task",
        elevationLevel = "admin"
      )
    ),
    waitForSuccess = TRUE,
    resourceFiles = resourceFiles,
    constraints = list(
      maxTaskRetryCount = 2
    )
  )

  addJob(jobId,
         poolInfo = list("poolId" = poolId),
         jobPreparationTask = prepTask,
         usesTaskDependencies = TRUE,
         raw = TRUE)
}

#' Create an autoscaling Azure Batch pool of Linux Data Science VMs.
#'
#' @param pool Pool configuration list: \code{name}, \code{vmSize},
#'   \code{maxTasksPerNode}, and \code{poolSize} (with
#'   \code{autoscaleFormula}, \code{minNodes}, \code{maxNodes}).
#' @param packages Installation command string appended to the node start task.
#' @return The raw response from \code{addPool}.
.addPool <- function(pool, packages){
  # Node bootstrap: relax the sudoers tty requirement, put the Anaconda py35
  # environment on PATH, and install blobxfer for blob transfers.
  bootstrapCommands <- c(
    "sed -i -e 's/Defaults requiretty.*/ #Defaults requiretty/g' /etc/sudoers",
    "export PATH=/anaconda/envs/py35/bin:$PATH",
    "sudo env PATH=$PATH pip install --no-dependencies blobxfer")

  startTask <- list(
    commandLine = paste0(linuxWrapCommands(bootstrapCommands), ";", packages),
    userIdentity = list(
      autoUser = list(
        scope = "task",
        elevationLevel = "admin"
      )
    ),
    waitForSuccess = TRUE
  )

  vmConfig <- list(
    imageReference = list(publisher = "microsoft-ads",
                          offer = "linux-data-science-vm",
                          sku = "linuxdsvm",
                          version = "latest"),
    nodeAgentSKUId = "batch.node.centos 7")

  addPool(pool$name,
          pool$vmSize,
          startTask = startTask,
          virtualMachineConfiguration = vmConfig,
          enableAutoScale = TRUE,
          autoscaleFormula = getAutoscaleFormula(pool$poolSize$autoscaleFormula,
                                                pool$poolSize$minNodes,
                                                pool$poolSize$maxNodes),
          autoScaleEvaluationInterval = "PT5M",
          maxTasksPerNode = pool$maxTasksPerNode,
          raw = TRUE)
}
4 changes: 4 additions & 0 deletions R/utility.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ getGithubInstallationCommand <- function(packages){

installation <- substr(installation, 1, nchar(installation) - 1)
}

#' Wrap a vector of shell commands into a single fail-fast bash invocation.
#'
#' @param commands Character vector of shell commands.
#' @return A single string of the form
#'   \code{/bin/bash -c "set -e; set -o pipefail; <cmd1>; <cmd2>; ... wait"}.
linuxWrapCommands <- function(commands = c()){
  # Join with "; " and terminate the sequence. Guard the empty case: the
  # previous implementation emitted a stray "; ;" (a bash syntax error) when
  # called with no commands.
  joined <- if (length(commands) > 0) {
    paste0(paste(commands, collapse = "; "), "; ")
  } else {
    ""
  }
  sprintf("/bin/bash -c \"set -e; set -o pipefail; %swait\"", joined)
}

0 comments on commit 34f3e6b

Please sign in to comment.