diff --git a/R/cluster.R b/R/cluster.R
index c784eadd..c052ccf2 100644
--- a/R/cluster.R
+++ b/R/cluster.R
@@ -73,7 +73,7 @@ generateClusterConfig <- function(fileName, ...){
 #' @return The request to the Batch service was successful.
 #' @examples
 #' cluster <- makeCluster("cluster_config.json", fullName = TRUE, wait = TRUE)
-makeCluster <- function(fileName = "az_config.json", fullName = FALSE, wait = TRUE){
+makeCluster <- function(fileName = "az_config.json", fullName = FALSE, wait = TRUE, timeout = 60000){
   setPoolOption(fileName, fullName)
   config <- getOption("az_config")
   pool <- config$batchAccount$pool
@@ -92,12 +92,8 @@ makeCluster <- function(fileName = "az_config.json", fullName = FALSE, wait = TR
     }
   }
 
-  response <- addPool(
-    pool$name,
-    pool$vmSize,
-    autoscaleFormula = getAutoscaleFormula(pool$poolSize$autoscaleFormula, pool$poolSize$minNodes, pool$poolSize$maxNodes),
-    maxTasksPerNode = pool$maxTasksPerNode,
-    raw = TRUE,
+  response <- .addPool(
+    pool = pool,
     packages = packages)
 
   pool <- getPool(pool$name)
@@ -111,7 +107,7 @@ makeCluster <- function(fileName = "az_config.json", fullName = FALSE, wait = TR
   }
   else{
     if(wait){
-      waitForNodesToComplete(pool$id, 60000, targetDedicated = pool$targetDedicated)
+      waitForNodesToComplete(pool$id, timeout, targetDedicated = pool$targetDedicated)
     }
   }
 
diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R
index b8508bb9..04b2d8af 100644
--- a/R/doAzureParallel.R
+++ b/R/doAzureParallel.R
@@ -186,7 +186,7 @@ getparentenv <- function(pkgname) {
       resourceFiles <- obj$options$azure$resourcefiles
     }
 
-    sasToken <- constructSas("2016-11-30", "r", "c", id, storageCredentials$key)
+    sasToken <- constructSas("r", "c", id, storageCredentials$key)
     requiredJobResourceFiles <- list(generateResourceFile(storageCredentials$name, id, "splitter.R", sasToken),
                                      generateResourceFile(storageCredentials$name, id, "worker.R", sasToken),
                                      generateResourceFile(storageCredentials$name, id, "merger.R", sasToken))
@@ -194,7 +194,11 @@ getparentenv <- function(pkgname) {
     # We need to merge any files passed by the calling lib with the resource files specified here
     resourceFiles <- append(resourceFiles, requiredJobResourceFiles)
 
-    response <- addJob(id, config = data$config, packages = obj$packages, resourceFiles = resourceFiles, raw = TRUE)
+    response <- .addJob(jobId = id,
+                        poolId = data$config$batchAccount$pool$name,
+                        resourceFiles = resourceFiles,
+                        packages = obj$packages)
+
     if(grepl("ActiveJobAndScheduleQuotaReached", response)){
       jobquotaReachedResponse <- grepl("ActiveJobAndScheduleQuotaReached", response)
     }
@@ -244,7 +248,7 @@ getparentenv <- function(pkgname) {
     inputs <- FALSE
     if(!is.null(obj$options$azure$inputs)){
       storageCredentials <- getStorageCredentials()
-      sasToken <- constructSas("2016-11-30", "r", "c", inputs, storageCredentials$key)
+      sasToken <- constructSas("r", "c", inputs, storageCredentials$key)
       assign("inputs", list(name = storageCredentials$name,
                             sasToken = sasToken),
@@ -276,7 +280,7 @@ getparentenv <- function(pkgname) {
       startIndex <- startIndices[i]
       endIndex <- endIndices[i]
 
-      addTask(id,
+      .addTask(id,
               taskId = paste0(id, "-task", i),
               args = argsList[startIndex:endIndex],
               envir = .doAzureBatchGlobals,
@@ -288,7 +292,7 @@ getparentenv <- function(pkgname) {
     updateJob(id)
 
     i <- length(tasks) + 1
-    r <- addTaskMerge(id,
+    r <- .addTaskMerge(id,
                       taskId = paste0(id, "-merge"),
                       index = i,
                       envir = .doAzureBatchGlobals,
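
Note on the makeCluster change above: the new timeout argument defaults to the
previously hard-coded 60000, so existing callers are unaffected, and it is only
consulted when wait = TRUE. A minimal sketch of overriding it (the config file
name and timeout value here are placeholders):

    # Wait for the pool's nodes, but give up sooner than the 60000 default.
    cluster <- makeCluster("cluster_config.json", wait = TRUE, timeout = 30000)
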
diff --git a/R/helpers.R b/R/helpers.R
new file mode 100644
index 00000000..070984d5
--- /dev/null
+++ b/R/helpers.R
@@ -0,0 +1,177 @@
+# Stages the serialized task environment to blob storage, then submits a
+# Batch task that downloads prior results, runs worker.R over its slice of
+# the arguments, and uploads its log and result blobs.
+.addTask <- function(jobId, taskId, ...){
+  storageCredentials <- getStorageCredentials()
+
+  args <- list(...)
+  .doAzureBatchGlobals <- args$envir
+  argsList <- args$args
+  packages <- args$packages
+  dependsOn <- args$dependsOn
+
+  if(!is.null(argsList)){
+    assign('argsList', argsList, .doAzureBatchGlobals)
+  }
+
+  envFile <- paste0(taskId, ".rds")
+  saveRDS(.doAzureBatchGlobals, file = envFile)
+  uploadBlob(jobId, paste0(getwd(), "/", envFile))
+  file.remove(envFile)
+
+  sasToken <- constructSas("r", "c", jobId, storageCredentials$key)
+
+  taskPrep <- getInstallationCommand(packages)
+  rCommand <- sprintf("Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/%s %s %s > %s.txt", "worker.R", "$AZ_BATCH_TASK_WORKING_DIR", envFile, taskId)
+
+  resultFile <- paste0(taskId, "-result", ".rds")
+  logsCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --upload --saskey $BLOBXFER_SASKEY --remoteresource logs/%s", storageCredentials$name, jobId, paste0(taskId, ".txt"), paste0(taskId, ".txt"))
+  autoUploadCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --upload --saskey $BLOBXFER_SASKEY --remoteresource result/%s", storageCredentials$name, jobId, resultFile, resultFile)
+  downloadCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include result/*.rds", storageCredentials$name, jobId, "$AZ_BATCH_TASK_WORKING_DIR")
+
+  commands <- c("export PATH=/anaconda/envs/py35/bin:$PATH", downloadCommand, rCommand, logsCommand, autoUploadCommand)
+  if(taskPrep != ""){
+    commands <- c(taskPrep, commands)
+  }
+
+  sasToken <- constructSas("rwcl", "c", jobId, storageCredentials$key)
+  sasQuery <- generateSasUrl(sasToken)
+
+  setting <- list(name = "BLOBXFER_SASKEY",
+                  value = sasQuery)
+
+  resourceFiles <- list(generateResourceFile(storageCredentials$name, jobId, envFile, sasToken))
+
+  addTask(jobId,
+          taskId,
+          environmentSettings = list(setting),
+          resourceFiles = resourceFiles,
+          commandLine = linuxWrapCommands(commands))
+}
+
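
The commands vector assembled in .addTask is collapsed into a single shell
invocation by linuxWrapCommands (added to R/utility.R at the end of this
diff). Tracing that implementation with two placeholder commands gives:

    linuxWrapCommands(c("echo prep", "echo run"))
    # "/bin/bash -c \"set -e; set -o pipefail; echo prep; echo run; wait\""
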
+# Mirrors .addTask, but runs merger.R to combine the worker results and
+# defers execution until the tasks named in dependsOn have completed.
+.addTaskMerge <- function(jobId, taskId, ...){
+  storageCredentials <- getStorageCredentials()
+
+  args <- list(...)
+  .doAzureBatchGlobals <- args$envir
+  argsList <- args$args
+  packages <- args$packages
+  numOfTasks <- args$numOfTasks
+  dependsOn <- args$dependsOn
+
+  if(!is.null(argsList)){
+    assign('argsList', argsList, .doAzureBatchGlobals)
+  }
+
+  envFile <- paste0(taskId, ".rds")
+  saveRDS(.doAzureBatchGlobals, file = envFile)
+  uploadBlob(jobId, paste0(getwd(), "/", envFile))
+  file.remove(envFile)
+
+  sasToken <- constructSas("r", "c", jobId, storageCredentials$key)
+
+  taskPrep <- getInstallationCommand(packages)
+  rCommand <- sprintf("Rscript --vanilla --verbose $AZ_BATCH_JOB_PREP_WORKING_DIR/%s %s %s %s %s %s > %s.txt", "merger.R", "$AZ_BATCH_TASK_WORKING_DIR", envFile, length(dependsOn), jobId, numOfTasks, taskId)
+
+  resultFile <- paste0(taskId, "-result", ".rds")
+  logsCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --upload --saskey $BLOBXFER_SASKEY --remoteresource logs/%s", storageCredentials$name, jobId, paste0(taskId, ".txt"), paste0(taskId, ".txt"))
+  autoUploadCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --upload --saskey $BLOBXFER_SASKEY --remoteresource result/%s", storageCredentials$name, jobId, resultFile, resultFile)
+  downloadCommand <- sprintf("env PATH=$PATH blobxfer %s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include result/*.rds", storageCredentials$name, jobId, "$AZ_BATCH_TASK_WORKING_DIR")
+
+  commands <- c("export PATH=/anaconda/envs/py35/bin:$PATH", downloadCommand, rCommand, logsCommand, autoUploadCommand)
+  if(taskPrep != ""){
+    commands <- c(taskPrep, commands)
+  }
+
+  sasToken <- constructSas("rwcl", "c", jobId, storageCredentials$key)
+  sasQuery <- generateSasUrl(sasToken)
+
+  setting <- list(name = "BLOBXFER_SASKEY",
+                  value = sasQuery)
+
+  resourceFiles <- list(generateResourceFile(storageCredentials$name, jobId, envFile, sasToken))
+
+  addTask(jobId,
+          taskId,
+          environmentSettings = list(setting),
+          resourceFiles = resourceFiles,
+          commandLine = linuxWrapCommands(commands),
+          dependsOn = list(taskIds = dependsOn))
+}
+
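
.addTaskMerge mirrors .addTask except that it runs merger.R rather than
worker.R and forwards dependsOn to addTask, so the Batch service holds the
merge task until every listed worker task has finished. A hypothetical call,
with made-up job and task ids:

    .addTaskMerge("job20170601",
                  taskId = "job20170601-merge",
                  envir = .doAzureBatchGlobals,
                  packages = obj$packages,
                  numOfTasks = 2,
                  dependsOn = c("job20170601-task1", "job20170601-task2"))
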
+# Creates the job with a preparation task that stages splitter.R, worker.R,
+# and merger.R on each node and installs any packages the job requires.
+.addJob <- function(jobId,
+                    poolId,
+                    resourceFiles,
+                    ...){
+  args <- list(...)
+  packages <- args$packages
+
+  poolInfo <- list("poolId" = poolId)
+
+  commands <- linuxWrapCommands(c("ls"))
+  if(!is.null(packages)){
+    commands <- paste0(commands, ";", getInstallationCommand(packages))
+  }
+
+  jobPreparationTask <- list(
+    commandLine = commands,
+    userIdentity = list(
+      autoUser = list(
+        scope = "task",
+        elevationLevel = "admin"
+      )
+    ),
+    waitForSuccess = TRUE,
+    resourceFiles = resourceFiles,
+    constraints = list(
+      maxTaskRetryCount = 2
+    )
+  )
+
+  usesTaskDependencies <- TRUE
+
+  response <- addJob(jobId,
+                     poolInfo = poolInfo,
+                     jobPreparationTask = jobPreparationTask,
+                     usesTaskDependencies = usesTaskDependencies,
+                     raw = TRUE)
+
+  return(response)
+}
+
+# Creates the pool on the Linux Data Science VM image with autoscaling
+# enabled; the start task installs blobxfer on every node.
+.addPool <- function(pool, packages){
+  commands <- c("sed -i -e 's/Defaults requiretty.*/ #Defaults requiretty/g' /etc/sudoers",
+                "export PATH=/anaconda/envs/py35/bin:$PATH",
+                "sudo env PATH=$PATH pip install --no-dependencies blobxfer")
+
+  commands <- paste0(linuxWrapCommands(commands), ";", packages)
+
+  startTask <- list(
+    commandLine = commands,
+    userIdentity = list(
+      autoUser = list(
+        scope = "task",
+        elevationLevel = "admin"
+      )
+    ),
+    waitForSuccess = TRUE
+  )
+
+  virtualMachineConfiguration <- list(
+    imageReference = list(publisher = "microsoft-ads",
+                          offer = "linux-data-science-vm",
+                          sku = "linuxdsvm",
+                          version = "latest"),
+    nodeAgentSKUId = "batch.node.centos 7")
+
+  response <- addPool(pool$name,
+                      pool$vmSize,
+                      startTask = startTask,
+                      virtualMachineConfiguration = virtualMachineConfiguration,
+                      enableAutoScale = TRUE,
+                      autoscaleFormula = getAutoscaleFormula(pool$poolSize$autoscaleFormula, pool$poolSize$minNodes, pool$poolSize$maxNodes),
+                      autoScaleEvaluationInterval = "PT5M",
+                      maxTasksPerNode = pool$maxTasksPerNode,
+                      raw = TRUE)
+
+  return(response)
+}
diff --git a/R/utility.R b/R/utility.R
index e10195c8..030ffdbc 100644
--- a/R/utility.R
+++ b/R/utility.R
@@ -26,3 +26,7 @@ getGithubInstallationCommand <- function(packages){
 
   installation <- substr(installation, 1, nchar(installation) - 1)
 }
+
+# Joins the commands with "; " and wraps them in a single fail-fast
+# /bin/bash -c invocation suitable for a Batch commandLine.
+linuxWrapCommands <- function(commands = c()){
+  commandLine <- sprintf("/bin/bash -c \"set -e; set -o pipefail; %s wait\"", paste0(paste(commands, sep = " ", collapse = "; "), "; "))
+  return(commandLine)
+}
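
For reference, .addPool reads only pool$name, pool$vmSize,
pool$maxTasksPerNode, and the pool$poolSize autoscale fields, all of which
come from the cluster JSON parsed in makeCluster. A minimal sketch with
made-up values (the autoscale formula name depends on what
getAutoscaleFormula accepts):

    pool <- list(name = "mypool",
                 vmSize = "Standard_D2_v2",
                 maxTasksPerNode = 1,
                 poolSize = list(autoscaleFormula = "QUEUE",
                                 minNodes = 0,
                                 maxNodes = 3))
    response <- .addPool(pool = pool, packages = "")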