diff --git a/.lintr b/.lintr
new file mode 100644
index 00000000..74ebe133
--- /dev/null
+++ b/.lintr
@@ -0,0 +1 @@
+exclusions: list("R/validators.R")
diff --git a/CHANGELOG.md b/CHANGELOG.md
index efce86ec..a3ff1801 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,14 @@
 # Change Log
+## [0.5.0] 2017-09-28
+### Added
+- Support for users to get a job and its results for long-running jobs
+### Changed
+- [BREAKING CHANGE] Updated getJobList to take a state filter and return job statuses in a data frame
+
 ## [0.4.3] 2017-09-28
 ### Fixed
 - Allow merge task to run on task failures
+
 ## [0.4.2] 2017-09-08
 ### Added
 - Support for users to get files from nodes and tasks
diff --git a/DESCRIPTION b/DESCRIPTION
index 53bd6657..7e843fb2 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: doAzureParallel
 Type: Package
 Title: doAzureParallel
-Version: 0.4.3
+Version: 0.5.0
 Author: Brian Hoang
 Maintainer: Brian Hoang
 Description: The project is for data experts who use R at scale. The project
@@ -19,7 +19,8 @@ Imports:
     rAzureBatch (>= 0.5.1),
     jsonlite,
     rjson,
-    xml2
+    xml2,
+    R6
 Suggests:
     testthat,
     caret,
diff --git a/NAMESPACE b/NAMESPACE
index 7bea6c23..f10cbb06 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -6,6 +6,7 @@ export(deleteStorageFile)
 export(generateClusterConfig)
 export(generateCredentialsConfig)
 export(getClusterFile)
+export(getJob)
 export(getJobFile)
 export(getJobList)
 export(getJobResult)
diff --git a/R/cluster.R b/R/cluster.R
index ac38e940..bc2e54ea 100644
--- a/R/cluster.R
+++ b/R/cluster.R
@@ -196,6 +196,14 @@ makeCluster <-
       validateClusterConfig(clusterSetting)
     }
 
+    tryCatch({
+      `Validators`$isValidPoolName(poolConfig$name)
+    },
+    error = function(e){
+      stop(paste("Invalid pool name: \n",
+                 e))
+    })
+
     response <- .addPool(
       pool = poolConfig,
       packages = packages,
@@ -253,7 +261,7 @@ makeCluster <-
 
     clusterNodeMismatchWarning <-
       paste(
-        "There is a mismatched between the projected cluster %s",
+        "There is a mismatch between the requested cluster %s",
         "nodes min/max '%s'/'%s' and the existing cluster %s nodes '%s'.",
         "Use the 'resizeCluster' function to get the correct amount",
         "of workers."
diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R
index b42e719a..90d1a200 100644
--- a/R/doAzureParallel.R
+++ b/R/doAzureParallel.R
@@ -195,14 +195,22 @@ setHttpTraffic <- function(value = FALSE) {
   assign("packages", obj$packages, .doAzureBatchGlobals)
   assign("pkgName", pkgName, .doAzureBatchGlobals)
 
-  time <- format(Sys.time(), "%Y%m%d%H%M%S", tz = "GMT")
-  id <- sprintf("%s%s",
-                "job",
-                time)
-
   if (!is.null(obj$options$azure$job)) {
     id <- obj$options$azure$job
   }
+  else {
+    time <- format(Sys.time(), "%Y%m%d%H%M%S", tz = "GMT")
+    id <- sprintf("%s%s", "job", time)
+  }
+
+  tryCatch({
+    `Validators`$isValidStorageContainerName(id)
+    `Validators`$isValidJobName(id)
+  },
+  error = function(e){
+    stop(paste("Invalid job name: \n",
+               e))
+  })
 
   wait <- TRUE
   if (!is.null(obj$options$azure$wait)) {
@@ -321,13 +329,49 @@ setHttpTraffic <- function(value = FALSE) {
   )
 
   # We need to merge any files passed by the calling lib with the resource files specified here
+  resourceFiles <- append(resourceFiles, requiredJobResourceFiles)
+
+  enableCloudCombineKeyValuePair <-
+    list(name = "enableCloudCombine", value = as.character(enableCloudCombine))
+
+  chunkSize <- 1
+
+  if (!is.null(obj$options$azure$chunkSize)) {
+    chunkSize <- obj$options$azure$chunkSize
+  }
+
+  if (!is.null(obj$options$azure$chunksize)) {
+    chunkSize <- obj$options$azure$chunksize
+  }
+
+  if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
+    chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
+  }
+
+  chunkSizeKeyValuePair <-
+    list(name = "chunkSize", value = as.character(chunkSize))
+
+  if (is.null(obj$packages)) {
+    metadata <-
+      list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
+  } else {
+    packagesKeyValuePair <-
+      list(name = "packages",
+           value = paste(obj$packages, collapse = ";"))
+
+    metadata <-
+      list(enableCloudCombineKeyValuePair,
+           chunkSizeKeyValuePair,
+           packagesKeyValuePair)
+  }
+
   response <- .addJob(
     jobId = id,
     poolId = data$poolId,
     resourceFiles = resourceFiles,
+    metadata = metadata,
     packages = obj$packages
   )
 
@@ -376,20 +420,6 @@ setHttpTraffic <- function(value = FALSE) {
     job <- rAzureBatch::getJob(id)
     cat(sprintf("Id: %s", job$id), fill = TRUE)
 
-    chunkSize <- 1
-
-    if (!is.null(obj$options$azure$chunkSize)) {
-      chunkSize <- obj$options$azure$chunkSize
-    }
-
-    if (!is.null(obj$options$azure$chunksize)) {
-      chunkSize <- obj$options$azure$chunksize
-    }
-
-    if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
-      chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
-    }
-
     ntasks <- length(argsList)
     startIndices <- seq(1, length(argsList), chunkSize)
diff --git a/R/helpers.R b/R/helpers.R
index 5655ed45..4db44122 100644
--- a/R/helpers.R
+++ b/R/helpers.R
@@ -138,6 +138,7 @@
 .addJob <- function(jobId,
                     poolId,
                     resourceFiles,
+                    metadata,
                     ...) {
   args <- list(...)
   packages <- args$packages
@@ -168,7 +169,8 @@
     poolInfo = poolInfo,
     jobPreparationTask = jobPreparationTask,
     usesTaskDependencies = usesTaskDependencies,
-    content = "text"
+    content = "text",
+    metadata = metadata
   )
 
   return(response)
diff --git a/R/utility.R b/R/utility.R
index 42948d81..7a106578 100644
--- a/R/utility.R
+++ b/R/utility.R
@@ -49,64 +49,150 @@ linuxWrapCommands <- function(commands = c()) {
   commandLine
 }
 
-#' Get a list of job statuses from the given job ids
+#' Get a list of job statuses from the given filter
 #'
-#' @param jobIds A character vector of job ids
+#' @param filter A named list containing the job states to filter on,
+#'   for example list(state = c("active", "completed"))
 #'
 #' @examples
 #' \dontrun{
-#' getJobList(c("job-001", "job-002"))
+#' getJobList()
 #' }
 #' @export
-getJobList <- function(jobIds = c()) {
-  filter <- ""
+getJobList <- function(filter = NULL) {
+  filterClause <- ""
+
+  if (!is.null(filter)) {
+    if (!is.null(filter$state)) {
+      for (i in 1:length(filter$state)) {
+        filterClause <-
+          paste0(filterClause,
+                 sprintf("state eq '%s'", filter$state[i]),
+                 " or ")
+      }
 
-  if (length(jobIds) > 1) {
-    for (i in 1:length(jobIds)) {
-      filter <- paste0(filter, sprintf("id eq '%s'", jobIds[i]), " or ")
+      filterClause <-
+        substr(filterClause, 1, nchar(filterClause) - 3)
     }
-
-    filter <- substr(filter, 1, nchar(filter) - 3)
   }
 
   jobs <-
-    rAzureBatch::listJobs(query = list("$filter" = filter, "$select" = "id,state"))
-  print("Job List: ")
-
-  for (j in 1:length(jobs$value)) {
-    tasks <- rAzureBatch::listTask(jobs$value[[j]]$id)
-    count <- 0
-    if (length(tasks$value) > 0) {
-      taskStates <-
-        lapply(tasks$value, function(x)
-          x$state == "completed")
-
-      for (i in 1:length(taskStates)) {
-        if (taskStates[[i]] == TRUE) {
-          count <- count + 1
-        }
-      }
+    rAzureBatch::listJobs(query = list("$filter" = filterClause, "$select" = "id,state"))
 
+  id <- character(length(jobs$value))
+  state <- character(length(jobs$value))
+  status <- character(length(jobs$value))
+  failedTasks <- integer(length(jobs$value))
+  totalTasks <- integer(length(jobs$value))
+
+  if (length(jobs$value) > 0) {
+    if (is.null(jobs$value[[1]]$id)) {
+      stop(jobs$value)
     }
-      summary <-
-        sprintf(
-          "[ id: %s, state: %s, status: %d",
-          jobs$value[[j]]$id,
-          jobs$value[[j]]$state,
-          ceiling(count / length(tasks$value) * 100)
-        )
-      print(paste0(summary, "% ]"))
     }
-    else {
-      print(
-        sprintf(
-          "[ id: %s, state: %s, status: %s ]",
-          jobs$value[[j]]$id,
-          jobs$value[[j]]$state,
-          "No tasks were run."
-        )
-      )
+    for (j in 1:length(jobs$value)) {
+      id[j] <- jobs$value[[j]]$id
+      state[j] <- jobs$value[[j]]$state
+      taskCounts <-
+        rAzureBatch::getJobTaskCounts(jobId = jobs$value[[j]]$id)
+      failedTasks[j] <-
+        as.integer(taskCounts$failed)
+      totalTasks[j] <-
+        as.integer(taskCounts$active + taskCounts$running + taskCounts$completed)
+
+      completed <- as.integer(taskCounts$completed)
+
+      if (totalTasks[j] > 0) {
+        status[j] <-
+          sprintf("%s %%", ceiling(completed / totalTasks[j] * 100))
+      }
+      else {
+        status[j] <- "No tasks in the job"
+      }
+    }
   }
+
+  return (
+    data.frame(
+      Id = id,
+      State = state,
+      Status = status,
+      FailedTasks = failedTasks,
+      TotalTasks = totalTasks
+    )
+  )
+}
+
+#' Get a job for the given job id
+#'
+#' @param jobId A job id
+#' @param verbose Show verbose log output
+#'
+#' @examples
+#' \dontrun{
+#' getJob("job-001", FALSE)
+#' }
+#' @export
+getJob <- function(jobId, verbose = TRUE) {
+  if (is.null(jobId)) {
+    stop("must specify the jobId parameter")
+  }
+
+  job <- rAzureBatch::getJob(jobId = jobId)
+
+  metadata <-
+    list(
+      chunkSize = 1,
+      enableCloudCombine = "TRUE",
+      packages = ""
+    )
+
+  if (!is.null(job$metadata)) {
+    for (i in 1:length(job$metadata)) {
+      metadata[[job$metadata[[i]]$name]] <- job$metadata[[i]]$value
+    }
+  }
+
+  if (verbose == TRUE) {
+    cat(sprintf("Job Id: %s", job$id), fill = TRUE)
+    cat("\njob metadata:", fill = TRUE)
+    cat(sprintf("\tchunkSize: %s", metadata$chunkSize),
+        fill = TRUE)
+    cat(sprintf("\tenableCloudCombine: %s", metadata$enableCloudCombine),
+        fill = TRUE)
+    cat(sprintf("\tpackages: %s", metadata$packages),
+        fill = TRUE)
+  }
+
+  taskCounts <- rAzureBatch::getJobTaskCounts(jobId = jobId)
+
+  tasks <- list(
+    active = taskCounts$active,
+    running = taskCounts$running,
+    completed = taskCounts$completed,
+    succeeded = taskCounts$succeeded,
+    failed = taskCounts$failed
+  )
+
+  if (verbose == TRUE) {
+    cat("\ntasks:", fill = TRUE)
+    cat(sprintf("\tactive: %s", taskCounts$active), fill = TRUE)
+    cat(sprintf("\trunning: %s", taskCounts$running), fill = TRUE)
+    cat(sprintf("\tcompleted: %s", taskCounts$completed), fill = TRUE)
+    cat(sprintf("\t\tsucceeded: %s", taskCounts$succeeded), fill = TRUE)
+    cat(sprintf("\t\tfailed: %s", taskCounts$failed), fill = TRUE)
+    cat(
+      sprintf(
+        "\ttotal: %s",
+        taskCounts$active + taskCounts$running + taskCounts$completed
+      ),
+      fill = TRUE
+    )
+  }
+
+  jobObj <- list(jobId = job$id,
+                 metadata = metadata,
+                 tasks = tasks)
+
+  return(jobObj)
 }
 
 #' Polling method to check status of cluster boot up
@@ -240,10 +326,6 @@ waitForNodesToComplete <- function(poolId, timeout = 86400) {
 }
 
 #' Download the results of the job
-#' @param ... Further named parameters
-#' \itemize{
-#'  \item{"container"}: {The container to download from.}
-#' }
 #' @param jobId The jobId to download from
 #'
 #' @return The results from the job.
@@ -252,142 +334,27 @@
 #' getJobResult(jobId = "job-001")
 #' }
 #' @export
-getJobResult <- function(jobId = "", ...) {
-  args <- list(...)
+getJobResult <- function(jobId) {
+  cat("Getting job results...", fill = TRUE)
 
-  if (!is.null(args$container)) {
-    results <-
-      rAzureBatch::downloadBlob(args$container,
-                                paste0("result/", jobId, "-merge-result.rds"))
-  }
-  else{
-    results <-
-      rAzureBatch::downloadBlob(jobId, paste0("result/", jobId, "-merge-result.rds"))
-  }
-
-  return(results)
-}
-
-validateClusterConfig <- function(clusterFilePath) {
-  if (file.exists(clusterFilePath)) {
-    pool <- rjson::fromJSON(file = clusterFilePath)
-  }
-  else{
-    pool <- rjson::fromJSON(file = file.path(getwd(), clusterFilePath))
+  if (nchar(jobId) < 3) {
+    stop("jobId must contain at least 3 characters.")
   }
 
-  if (is.null(pool$poolSize)) {
-    stop("Missing poolSize entry")
-  }
-
-  if (is.null(pool$poolSize$dedicatedNodes)) {
-    stop("Missing dedicatedNodes entry")
-  }
+  tempFile <- tempfile("getJobResult", fileext = ".rds")
 
-  if (is.null(pool$poolSize$lowPriorityNodes)) {
-    stop("Missing lowPriorityNodes entry")
-  }
-
-  if (is.null(pool$poolSize$autoscaleFormula)) {
-    stop("Missing autoscaleFormula entry")
-  }
-
-  if (is.null(pool$poolSize$dedicatedNodes$min)) {
-    stop("Missing dedicatedNodes$min entry")
-  }
-
-  if (is.null(pool$poolSize$dedicatedNodes$max)) {
-    stop("Missing dedicatedNodes$max entry")
-  }
-
-  if (is.null(pool$poolSize$lowPriorityNodes$min)) {
-    stop("Missing lowPriorityNodes$min entry")
-  }
-
-  if (is.null(pool$poolSize$lowPriorityNodes$max)) {
-    stop("Missing lowPriorityNodes$max entry")
-  }
-
-  stopifnot(is.character(pool$name))
-  stopifnot(is.character(pool$vmSize))
-  stopifnot(is.character(pool$poolSize$autoscaleFormula))
-  stopifnot(pool$poolSize$autoscaleFormula %in% names(autoscaleFormula))
-
-  stopifnot(pool$poolSize$dedicatedNodes$min <= pool$poolSize$dedicatedNodes$max)
-  stopifnot(pool$poolSize$lowPriorityNodes$min <= pool$poolSize$lowPriorityNodes$max)
-  stopifnot(pool$maxTasksPerNode >= 1)
-
-  stopifnot(is.double(pool$poolSize$dedicatedNodes$min))
-  stopifnot(is.double(pool$poolSize$dedicatedNodes$max))
-  stopifnot(is.double(pool$poolSize$lowPriorityNodes$min))
-  stopifnot(is.double(pool$poolSize$lowPriorityNodes$max))
-  stopifnot(is.double(pool$maxTasksPerNode))
-
-  TRUE
-}
-
-# Validating cluster configuration files below doAzureParallel version 0.3.2
-validateDeprecatedClusterConfig <- function(clusterFilePath) {
-  if (file.exists(clusterFilePath)) {
-    poolConfig <- rjson::fromJSON(file = clusterFilePath)
-  }
-  else{
-    poolConfig <-
-      rjson::fromJSON(file = file.path(getwd(), clusterFilePath))
-  }
-
-  if (is.null(poolConfig$pool$poolSize)) {
-    stop("Missing poolSize entry")
-  }
-
-  if (is.null(poolConfig$pool$poolSize$dedicatedNodes)) {
-    stop("Missing dedicatedNodes entry")
-  }
-
-  if (is.null(poolConfig$pool$poolSize$lowPriorityNodes)) {
-    stop("Missing lowPriorityNodes entry")
-  }
-
-  if (is.null(poolConfig$pool$poolSize$autoscaleFormula)) {
-    stop("Missing autoscaleFormula entry")
-  }
-
-  if (is.null(poolConfig$pool$poolSize$dedicatedNodes$min)) {
-    stop("Missing dedicatedNodes$min entry")
-  }
-
-  if (is.null(poolConfig$pool$poolSize$dedicatedNodes$max)) {
-    stop("Missing dedicatedNodes$max entry")
-  }
-
-  if (is.null(poolConfig$pool$poolSize$lowPriorityNodes$min)) {
-    stop("Missing lowPriorityNodes$min entry")
-  }
-
-  if (is.null(poolConfig$pool$poolSize$lowPriorityNodes$max)) {
-    stop("Missing lowPriorityNodes$max entry")
-  }
-
-  stopifnot(is.character(poolConfig$pool$name))
-  stopifnot(is.character(poolConfig$pool$vmSize))
-  stopifnot(is.character(poolConfig$pool$poolSize$autoscaleFormula))
-  stopifnot(poolConfig$pool$poolSize$autoscaleFormula %in% names(autoscaleFormula))
-
-  stopifnot(
-    poolConfig$pool$poolSize$dedicatedNodes$min <= poolConfig$pool$poolSize$dedicatedNodes$max
+  results <- rAzureBatch::downloadBlob(
+    jobId,
+    paste0("result/", jobId, "-merge-result.rds"),
+    downloadPath = tempFile,
+    overwrite = TRUE
   )
-  stopifnot(
-    poolConfig$pool$poolSize$lowPriorityNodes$min <= poolConfig$pool$poolSize$lowPriorityNodes$max
-  )
-  stopifnot(poolConfig$pool$maxTasksPerNode >= 1)
+
-  stopifnot(is.double(poolConfig$pool$poolSize$dedicatedNodes$min))
-  stopifnot(is.double(poolConfig$pool$poolSize$dedicatedNodes$max))
-  stopifnot(is.double(poolConfig$pool$poolSize$lowPriorityNodes$min))
-  stopifnot(is.double(poolConfig$pool$poolSize$lowPriorityNodes$max))
-  stopifnot(is.double(poolConfig$pool$maxTasksPerNode))
+  if (is.vector(results)) {
+    results <- readRDS(tempFile)
+  }
 
-  TRUE
+  return(results)
 }
 
 #' Utility function for creating an output file
@@ -438,7 +405,6 @@ createOutputFile <- function(filePattern, url) {
   output
 }
 
-
 #' Wait for current tasks to complete
 #'
 #' @export
@@ -499,10 +465,9 @@ waitForTasksToComplete <-
 
     tasksFailureWarningLabel <-
       sprintf(paste("%i task(s) failed while running the job.",
-                    "This caused the job to terminate automatically.",
-                    "To disable this behavior and continue on failure, set .errorHandling='remove | pass'",
-                    "in the foreach loop\n"), taskCounts$failed)
-
+                   "This caused the job to terminate automatically.",
+                   "To disable this behavior and continue on failure, set .errorHandling='remove | pass'",
+                   "in the foreach loop\n"), taskCounts$failed)
 
     for (i in 1:length(failedTasks$value)) {
       if (failedTasks$value[[i]]$executionInfo$result == "Failure") {
@@ -520,11 +485,11 @@ waitForTasksToComplete <-
 
         stop(sprintf(
           paste("Errors have occurred while running the job '%s'.",
-                "Error handling is set to 'stop' and has proceeded to terminate the job.",
-                "The user will have to handle deleting the job.",
-                "If this is not the correct behavior, change the errorHandling property to 'pass'",
-                " or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs.",
-                "For more information about getting job logs, follow this link:",
+               "Error handling is set to 'stop' and has proceeded to terminate the job.",
+               "The user will have to handle deleting the job.",
+               "If this is not the correct behavior, change the errorHandling property to 'pass'",
+               " or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs.",
+               "For more information about getting job logs, follow this link:",
                 paste0("https://github.com/Azure/doAzureParallel/blob/master/docs/",
                        "40-troubleshooting.md#viewing-files-directly-from-compute-node")),
           jobId
@@ -533,11 +498,11 @@ waitForTasksToComplete <-
 
       if (Sys.time() > timeToTimeout) {
         stop(sprintf(paste("Timeout has occurred while waiting for tasks to complete.",
-                           "Users will have to manually track the job '%s' and get the results.",
-                           "Use the getJobResults function to obtain the results and getJobList for",
-                           "tracking job status. To change the timeout, set 'timeout' property in the",
+                          "Users will have to manually track the job '%s' and get the results.",
+                          "Use the getJobResults function to obtain the results and getJobList for",
+                          "tracking job status. To change the timeout, set 'timeout' property in the",
                           "foreach's options.azure.")),
-             jobId)
+            jobId)
       }
 
       if (taskCounts$completed >= totalTasks &&
diff --git a/R/validationUtilities.R b/R/validationUtilities.R
new file mode 100644
index 00000000..cd7714fa
--- /dev/null
+++ b/R/validationUtilities.R
@@ -0,0 +1,121 @@
+validateClusterConfig <- function(clusterFilePath) {
+  if (file.exists(clusterFilePath)) {
+    pool <- rjson::fromJSON(file = clusterFilePath)
+  }
+  else{
+    pool <- rjson::fromJSON(file = file.path(getwd(), clusterFilePath))
+  }
+
+  if (is.null(pool$poolSize)) {
+    stop("Missing poolSize entry")
+  }
+
+  if (is.null(pool$poolSize$dedicatedNodes)) {
+    stop("Missing dedicatedNodes entry")
+  }
+
+  if (is.null(pool$poolSize$lowPriorityNodes)) {
+    stop("Missing lowPriorityNodes entry")
+  }
+
+  if (is.null(pool$poolSize$autoscaleFormula)) {
+    stop("Missing autoscaleFormula entry")
+  }
+
+  if (is.null(pool$poolSize$dedicatedNodes$min)) {
+    stop("Missing dedicatedNodes$min entry")
+  }
+
+  if (is.null(pool$poolSize$dedicatedNodes$max)) {
+    stop("Missing dedicatedNodes$max entry")
+  }
+
+  if (is.null(pool$poolSize$lowPriorityNodes$min)) {
+    stop("Missing lowPriorityNodes$min entry")
+  }
+
+  if (is.null(pool$poolSize$lowPriorityNodes$max)) {
+    stop("Missing lowPriorityNodes$max entry")
+  }
+
+  stopifnot(is.character(pool$name))
+  stopifnot(is.character(pool$vmSize))
+  stopifnot(is.character(pool$poolSize$autoscaleFormula))
+  stopifnot(pool$poolSize$autoscaleFormula %in% names(autoscaleFormula))
+
+  stopifnot(pool$poolSize$dedicatedNodes$min <= pool$poolSize$dedicatedNodes$max)
+  stopifnot(pool$poolSize$lowPriorityNodes$min <= pool$poolSize$lowPriorityNodes$max)
+  stopifnot(pool$maxTasksPerNode >= 1)
+
+  stopifnot(is.double(pool$poolSize$dedicatedNodes$min))
+  stopifnot(is.double(pool$poolSize$dedicatedNodes$max))
+  stopifnot(is.double(pool$poolSize$lowPriorityNodes$min))
+  stopifnot(is.double(pool$poolSize$lowPriorityNodes$max))
+  stopifnot(is.double(pool$maxTasksPerNode))
+
+  TRUE
+}
+
+# Validating cluster configuration files below doAzureParallel version 0.3.2
+validateDeprecatedClusterConfig <- function(clusterFilePath) {
+  if (file.exists(clusterFilePath)) {
+    poolConfig <- rjson::fromJSON(file = clusterFilePath)
+  }
+  else{
+    poolConfig <-
+      rjson::fromJSON(file = file.path(getwd(), clusterFilePath))
+  }
+
+  if (is.null(poolConfig$pool$poolSize)) {
+    stop("Missing poolSize entry")
+  }
+
+  if (is.null(poolConfig$pool$poolSize$dedicatedNodes)) {
+    stop("Missing dedicatedNodes entry")
+  }
+
+  if (is.null(poolConfig$pool$poolSize$lowPriorityNodes)) {
+    stop("Missing lowPriorityNodes entry")
+  }
+
+  if (is.null(poolConfig$pool$poolSize$autoscaleFormula)) {
+    stop("Missing autoscaleFormula entry")
+  }
+
+  if (is.null(poolConfig$pool$poolSize$dedicatedNodes$min)) {
+    stop("Missing dedicatedNodes$min entry")
+  }
+
+  if (is.null(poolConfig$pool$poolSize$dedicatedNodes$max)) {
+    stop("Missing dedicatedNodes$max entry")
+  }
+
+  if (is.null(poolConfig$pool$poolSize$lowPriorityNodes$min)) {
+    stop("Missing lowPriorityNodes$min entry")
+  }
+
+  if (is.null(poolConfig$pool$poolSize$lowPriorityNodes$max)) {
+    stop("Missing lowPriorityNodes$max entry")
+  }
+
+  stopifnot(is.character(poolConfig$pool$name))
+  stopifnot(is.character(poolConfig$pool$vmSize))
+  stopifnot(is.character(poolConfig$pool$poolSize$autoscaleFormula))
+  stopifnot(poolConfig$pool$poolSize$autoscaleFormula %in% names(autoscaleFormula))
+
+  stopifnot(
+    poolConfig$pool$poolSize$dedicatedNodes$min <= poolConfig$pool$poolSize$dedicatedNodes$max
+  )
+  stopifnot(
+    poolConfig$pool$poolSize$lowPriorityNodes$min <= poolConfig$pool$poolSize$lowPriorityNodes$max
+  )
+  stopifnot(poolConfig$pool$maxTasksPerNode >= 1)
+
+  stopifnot(is.double(poolConfig$pool$poolSize$dedicatedNodes$min))
+  stopifnot(is.double(poolConfig$pool$poolSize$dedicatedNodes$max))
+  stopifnot(is.double(poolConfig$pool$poolSize$lowPriorityNodes$min))
+  stopifnot(is.double(poolConfig$pool$poolSize$lowPriorityNodes$max))
+  stopifnot(is.double(poolConfig$pool$maxTasksPerNode))
+
+  TRUE
+}
diff --git a/R/validators.R b/R/validators.R
new file mode 100644
index 00000000..fccea662
--- /dev/null
+++ b/R/validators.R
@@ -0,0 +1,28 @@
+Validators <- R6::R6Class(
+  "Validators",
+  lock_objects = TRUE,
+  public = list(
+    isValidStorageContainerName = function(storageContainerName) {
+      if (!grepl("^([a-z]|[0-9]|[-]){3,64}$", storageContainerName)) {
+        stop(paste("Storage Container names can contain only lowercase letters, numbers,",
+                   "and the dash (-) character. Names must be 3 through 64 characters long."))
+      }
+    },
+    isValidPoolName = function(poolName) {
+      if (!grepl("^([a-zA-Z0-9]|[-]|[_]){1,64}$", poolName)) {
+        stop(paste("The pool name can contain any combination of alphanumeric characters",
+                   "including hyphens and underscores, and cannot contain more",
+                   "than 64 characters."))
+      }
+    },
+    isValidJobName = function(jobName) {
+      if (!grepl("^([a-zA-Z0-9]|[-]|[_]){1,64}$", jobName)) {
+        stop(paste("The job name can contain any combination of alphanumeric characters",
+                   "including hyphens and underscores, and cannot contain more",
+                   "than 64 characters."))
+      }
+    }
+  )
+)
+
+`Validators` <- Validators$new()
diff --git a/README.md b/README.md
index e939ac6d..71898ad6 100644
--- a/README.md
+++ b/README.md
@@ -1,386 +1,391 @@
-[![Build Status](https://travis-ci.org/Azure/doAzureParallel.svg?branch=master)](https://travis-ci.org/Azure/doAzureParallel)
-# doAzureParallel
-
-```R
-# set your credentials
-setCredentials("credentials.json")
-
-# setup your cluster with a simple config file
-cluster<- makeCluster("cluster.json")
-
-# register the cluster as your parallel backend
-registerDoAzureParallel(cluster)
-
-# run your foreach loop on a distributed cluster in Azure
-number_of_iterations <- 10
-results <- foreach(i = 1:number_of_iterations) %dopar% {
-  myParallelAlgorithm()
-}
-```
-
-## Introduction
-
-The *doAzureParallel* package is a parallel backend for the widely popular *foreach* package. With *doAzureParallel*, each iteration of the *foreach* loop runs in parallel on an Azure Virtual Machine (VM), allowing users to scale up their R jobs to tens or hundreds of machines.
-
-*doAzureParallel* is built to support the *foreach* parallel computing package. The *foreach* package supports parallel execution - it can execute multiple processes across some parallel backend. With just a few lines of code, the *doAzureParallel* package helps create a cluster in Azure, register it as a parallel backend, and seamlessly connects to the *foreach* package.
-
-NOTE: The terms *pool* and *cluster* are used interchangably throughout this document.
-
-## Dependencies
-
-- R (>= 3.3.1)
-- httr (>= 1.2.1)
-- rjson (>= 0.2.15)
-- RCurl (>= 1.95-4.8)
-- digest (>= 0.6.9)
-- foreach (>= 1.4.3)
-- iterators (>= 1.0.8)
-- bitops (>= 1.0.5)
-
-## Installation
-
-Install doAzureParallel directly from Github.
-
-```R
-# install the package devtools
-install.packages("devtools")
-library(devtools)
-
-# install the doAzureParallel and rAzureBatch package
-install_github(c("Azure/rAzureBatch", "Azure/doAzureParallel"))
-```
-
-## Azure Requirements
-
-To run your R code across a cluster in Azure, we'll need to get keys and account information.
-
-### Setup Azure Account
-First, set up your Azure Account ([Get started for free!](https://azure.microsoft.com/en-us/free/))
-
-Once you have an Azure account, you'll need to create the following two services in the Azure portal:
-- Azure Batch Account ([Create an Azure Batch Account in the Portal](https://docs.microsoft.com/en-us/azure/Batch/batch-account-create-portal))
-- Azure Storage Account (this can be created with the Batch Account)
-
-### Get Keys and Account Information
-For your Azure Batch Account, we need to get:
-- Batch Account Name
-- Batch Account URL
-- Batch Account Access Key
-
-This information can be found in the Azure Portal inside your Batch Account:
-
-![Azure Batch Acccount in the Portal](./vignettes/doAzureParallel-azurebatch-instructions.PNG "Azure Batch Acccount in the Portal")
-
-For your Azure Storage Account, we need to get:
-- Storage Account Name
-- Storage Account Access Key
-
-This information can be found in the Azure Portal inside your Azure Storage Account:
-
-![Azure Storage Acccount in the Portal](./vignettes/doAzureParallel-azurestorage-instructions.PNG "Azure Storage Acccount in the Portal")
-
-Keep track of the above keys and account information as it will be used to connect your R session with Azure.
-
-## Getting Started
-
-Import the package
-```R
-library(doAzureParallel)
-```
-
-Set up your parallel backend with Azure. This is your set of Azure VMs.
-```R
-# 1. Generate your credential and cluster configuration files.
-generateClusterConfig("cluster.json")
-generateCredentialsConfig("credentials.json")
-
-# 2. Fill out your credential config and cluster config files.
-# Enter your Azure Batch Account & Azure Storage keys/account-info into your credential config ("credentials.json") and configure your cluster in your cluster config ("cluster.json")
-
-# 3. Set your credentials - you need to give the R session your credentials to interact with Azure
-setCredentials("credentials.json")
-
-# 4. Register the pool. This will create a new pool if your pool hasn't already been provisioned.
-cluster <- makeCluster("cluster.json")
-
-# 5. Register the pool as your parallel backend
-registerDoAzureParallel(cluster)
-
-# 6. Check that your parallel backend has been registered
-getDoParWorkers()
-```
-
-Run your parallel *foreach* loop with the *%dopar%* keyword. The *foreach* function will return the results of your parallel code.
-
-```R
-number_of_iterations <- 10
-results <- foreach(i = 1:number_of_iterations) %dopar% {
-  # This code is executed, in parallel, across your cluster.
-  myAlgorithm()
-}
-```
-
-After you finish running your R code in Azure, you may want to shut down your cluster of VMs to make sure that you are not being charged anymore.
-
-```R
-# shut down your pool
-stopCluster(cluster)
-```
-
-### Configuration JSON files
-
-#### Credentials
-Use your credential config JSON file to enter your credentials.
-
-```javascript
-{
-  "batchAccount": {
-    "name": ,
-    "key": ,
-    "url":
-  },
-  "storageAccount": {
-    "name": ,
-    "key":
-  }
-}
-```
-Learn more:
- - [Batch account / Storage account](./README.md#azure-requirements)
-
-
-#### Cluster Settings
-Use your pool configuration JSON file to define your pool in Azure.
-
-```javascript
-{
-  "name": ,      // example: "myazurecluster"
-  "vmSize": ,    // example: "Standard_F2"
-  "maxTasksPerNode": ,   // example: "2"
-  "poolSize": {
-    "dedicatedNodes": {  // dedicated vms
-      "min": 2,
-      "max": 2
-    },
-    "lowPriorityNodes": {  // low priority vms
-      "min": 1,
-      "max": 10
-    },
-    "autoscaleFormula": "QUEUE"
-  },
-  "rPackages": {
-    "cran": ["some_cran_package", "some_other_cran_package"],
-    "github": ["username/some_github_package", "another_username/some_other_github_package"],
-    "githubAuthenticationToken": {}
-  },
-  "commandLine": []
-}
-```
-NOTE: If you do **not** want your cluster to autoscale, simply set the number of min nodes equal to max nodes for low-priority and dedicated.
-
-Learn more:
- - [Choosing VM size](./docs/10-vm-sizes.md#vm-size-table)
- - [MaxTasksPerNode](./docs/22-parallelizing-cores.md)
- - [LowPriorityNodes](#low-priority-vms)
- - [Autoscale](./docs/11-autoscale.md)
- - [PoolSize Limitations](./docs/12-quota-limitations.md)
- - [rPackages](./docs/20-package-management.md)
-
-### Low Priority VMs
-Low-priority VMs are a way to obtain and consume Azure compute at a much lower price using Azure Batch. Since doAzureParallel is built on top of Azure Batch, this package is able to take advantage of low-priority VMs and allocate compute resources from Azure's surplus capacity at up to **80% discount**.
-
-Low-priority VMs come with the understanding that when you request it, there is the possibility that we'll need to take some or all of it back. Hence the name *low-priority* - VMs may not be allocated or may be preempted due to higher priority allocations, which equate to full-priced VMs that have an SLA.
-
-And as the name suggests, this significant cost reduction is ideal for *low priority* workloads that do not have a strict performance requirement.
-
-With Azure Batch's first-class support for low-priority VMs, you can use them in conjunction with normal on-demand VMs (*dedicated VMs*) and enable job cost to be balanced with job execution flexibility:
-
- * Batch pools can contain both on-demand nodes and low-priority nodes. The two types can be independently scaled, either explicitly with the resize operation or automatically using auto-scale. Different configurations can be used, such as maximizing cost savings by always using low-priority nodes or spinning up on-demand nodes at full price, to maintain capacity by replacing any preempted low-priority nodes.
- * If any low-priority nodes are preempted, then Batch will automatically attempt to replace the lost capacity, continually seeking to maintain the target amount of low-priority capacity in the pool.
- * If tasks are interrupted when the node on which it is running is preempted, then the tasks are automatically re-queued to be re-run.
-
-For more information about low-priority VMs, please visit the [documentation](https://docs.microsoft.com/en-us/azure/batch/batch-low-pri-vms).
-
-You can also check out information on low-priority pricing [here](https://azure.microsoft.com/en-us/pricing/details/batch/).
-
-### Distributing Data
-When developing at scale, you may also want to chunk up your data and distribute the data across your nodes. Learn more about that [here](./docs/21-distributing-data.md#chunking-data)
-
-### Using %do% vs %dopar%
-When developing at scale, it is always recommended that you test and debug your code locally first. Switch between *%dopar%* and *%do%* to toggle between running in parallel on Azure and running in sequence on your local machine.
-
-```R
-# run your code sequentially on your local machine
-results <- foreach(i = 1:number_of_iterations) %do% { ... }
-
-# use the doAzureParallel backend to run your code in parallel across your Azure cluster
-results <- foreach(i = 1:number_of_iterations) %dopar% { ... }
-```
-
-### Error Handling
-The errorhandling option specifies how failed tasks should be evaluated. By default, the error handling is 'stop' to ensure users' can have reproducible results. If a combine function is assigned, it must be able to handle error objects.
-
-Error Handling Type | Description
---- | ---
-stop | The execution of the foreach will stop if an error occurs
-pass | The error object of the task is included the results
-remove | The result of a failed task will not be returned
-
-```R
-# Remove R error objects from the results
-res <- foreach::foreach(i = 1:4, .errorhandling = "remove") %dopar% {
-  if (i == 2 || i == 4) {
-    randomObject
-  }
-
-  mean(1:3)
-}
-
-#> res
-#[[1]]
-#[1] 2
-#
-#[[2]]
-#[1] 2
-```
-
-```R
-# Passing R error objects into the results
-res <- foreach::foreach(i = 1:4, .errorhandling = "pass") %dopar% {
-  if (i == 2|| i == 4) {
-    randomObject
-  }
-
-  sum(i, 1)
-}
-
-#> res
-#[[1]]
-#[1] 2
-#
-#[[2]]
-#
-#
-#[[3]]
-#[1] 4
-#
-#[[4]]
-#
-```
-
-### Long-running Jobs + Job Management
-
-doAzureParallel also helps you manage your jobs so that you can run many jobs at once while managing it through a few simple methods.
-
-
-```R
-# List your jobs:
-getJobList()
-```
-
-This will also let you run *long running jobs* easily.
-
-With long running jobs, you will need to keep track of your jobs as well as set your job to a non-blocking state. You can do this with the *.options.azure* options:
-
-```R
-# set the .options.azure option in the foreach loop
-opt <- list(job = 'unique_job_id', wait = FALSE)
-
-# NOTE - if the option wait = FALSE, foreach will return your unique job id
-job_id <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar % { ... }
-
-# get back your job results with your unique job id
-results <- getJobResult(job_id)
-```
-
-Finally, you may also want to track the status of jobs that you've name:
-
-```R
-# List specific jobs:
-getJobList(c('unique_job_id', 'another_job_id'))
-```
-
-You can learn more about how to execute long-running jobs [here](./docs/23-persistent-storage.md).
-
-With long-running jobs, you can take advantage of Azure's autoscaling capabilities to save time and/or money. Learn more about autoscale [here](./docs/11-autoscale.md).
-
-### Using the 'chunkSize' option
-
-doAzureParallel also supports custom chunk sizes. This option allows you to group iterations of the foreach loop together and execute them in a single R session.
-
-```R
-# set the chunkSize option
-opt <- list(chunkSize = 3)
-results <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... }
-```
-
-You should consider using the chunkSize if each iteration in the loop executes very quickly.
-
-If you have a static cluster and want to have a single chunk for each worker, you can compute the chunkSize as follows:
-
-```R
-# compute the chunk size
-cs <- ceiling(number_of_iterations / getDoParWorkers())
-
-# run the foreach loop with chunkSize optimized
-opt <- list(chunkSize = cs)
-results <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... }
-```
-
-### Resizing Your Cluster
-
-At some point, you may also want to resize your cluster manually. You can do this simply with the command *resizeCluster*.
-
-```R
-cluster <- makeCluster("cluster.json")
-
-# resize so that we have a min of 10 dedicated nodes and a max of 20 dedicated nodes
-# AND a min of 10 low priority nodes and a max of 20 low priority nodes
-resizeCluster(
-  cluster,
-  dedicatedMin = 10,
-  dedicatedMax = 20,
-  lowPriorityMin = 10,
-  lowPriorityMax = 20,
-  algorithm = 'QUEUE',
-  timeInterval = '5m' )
-```
-
-If your cluster is using autoscale but you want to set it to a static size of 10, you can also use this method:
-
-```R
-# resize to a static cluster of 10
-resizeCluster(cluster,
-              dedicatedMin = 10,
-              dedicatedMax = 10,
-              lowPriorityMin = 0,
-              lowPriorityMax = 0)
-```
-
-### Setting Verbose Mode to Debug
-
-To debug your doAzureParallel jobs, you can set the package to operate on *verbose* mode:
-
-```R
-# turn on verbose mode
-setVerbose(TRUE)
-
-# turn off verbose mode
-setVerbose(FALSE)
-```
-### Bypassing merge task
-
-Skipping the merge task is useful when the tasks results don't need to be merged into a list. To bypass the merge task, you can pass the *enableMerge* flag to the foreach object:
-
-```R
-# Enable merge task
-foreach(i = 1:3, .options.azure = list(enableMerge = TRUE))
-
-# Disable merge task
-foreach(i = 1:3, .options.azure = list(enableMerge = FALSE))
-```
-Note: User defined functions for the merge task is on our list of features that we are planning on doing.
-
-## Next Steps
-
-For more information, please visit [our documentation](./docs/README.md).
\ No newline at end of file
+[![Build Status](https://travis-ci.org/Azure/doAzureParallel.svg?branch=master)](https://travis-ci.org/Azure/doAzureParallel)
+# doAzureParallel
+
+```R
+# set your credentials
+setCredentials("credentials.json")
+
+# setup your cluster with a simple config file
+cluster <- makeCluster("cluster.json")
+
+# register the cluster as your parallel backend
+registerDoAzureParallel(cluster)
+
+# run your foreach loop on a distributed cluster in Azure
+number_of_iterations <- 10
+results <- foreach(i = 1:number_of_iterations) %dopar% {
+  myParallelAlgorithm()
+}
+```
+
+## Introduction
+
+The *doAzureParallel* package is a parallel backend for the widely popular *foreach* package. With *doAzureParallel*, each iteration of the *foreach* loop runs in parallel on an Azure Virtual Machine (VM), allowing users to scale up their R jobs to tens or hundreds of machines.
+
+*doAzureParallel* is built to support the *foreach* parallel computing package. The *foreach* package supports parallel execution - it can execute multiple processes across some parallel backend. With just a few lines of code, the *doAzureParallel* package helps create a cluster in Azure, register it as a parallel backend, and seamlessly connect to the *foreach* package.
+
+NOTE: The terms *pool* and *cluster* are used interchangeably throughout this document.
+
+## Dependencies
+
+- R (>= 3.3.1)
+- httr (>= 1.2.1)
+- rjson (>= 0.2.15)
+- RCurl (>= 1.95-4.8)
+- digest (>= 0.6.9)
+- foreach (>= 1.4.3)
+- iterators (>= 1.0.8)
+- bitops (>= 1.0.5)
+
+## Installation
+
+Install doAzureParallel directly from Github.
+
+```R
+# install the package devtools
+install.packages("devtools")
+library(devtools)
+
+# install the doAzureParallel and rAzureBatch package
+install_github(c("Azure/rAzureBatch", "Azure/doAzureParallel"))
+```
+
+## Azure Requirements
+
+To run your R code across a cluster in Azure, we'll need to get keys and account information.
+
+### Setup Azure Account
+First, set up your Azure Account ([Get started for free!](https://azure.microsoft.com/en-us/free/))
+
+Once you have an Azure account, you'll need to create the following two services in the Azure portal:
+- Azure Batch Account ([Create an Azure Batch Account in the Portal](https://docs.microsoft.com/en-us/azure/Batch/batch-account-create-portal))
+- Azure Storage Account (this can be created with the Batch Account)
+
+### Get Keys and Account Information
+For your Azure Batch Account, we need to get:
+- Batch Account Name
+- Batch Account URL
+- Batch Account Access Key
+
+This information can be found in the Azure Portal inside your Batch Account:
+
+![Azure Batch Account in the Portal](./vignettes/doAzureParallel-azurebatch-instructions.PNG "Azure Batch Account in the Portal")
+
+For your Azure Storage Account, we need to get:
+- Storage Account Name
+- Storage Account Access Key
+
+This information can be found in the Azure Portal inside your Azure Storage Account:
+
+![Azure Storage Account in the Portal](./vignettes/doAzureParallel-azurestorage-instructions.PNG "Azure Storage Account in the Portal")
+
+Keep track of the above keys and account information, as they will be used to connect your R session with Azure.
+
+## Getting Started
+
+Import the package
+```R
+library(doAzureParallel)
+```
+
+Set up your parallel backend with Azure. This is your set of Azure VMs.
+```R
+# 1. Generate your credential and cluster configuration files.
+generateClusterConfig("cluster.json")
+generateCredentialsConfig("credentials.json")
+
+# 2. Fill out your credential config and cluster config files.
+# Enter your Azure Batch Account & Azure Storage keys/account-info into your credential config ("credentials.json") and configure your cluster in your cluster config ("cluster.json")
+
+# 3. Set your credentials - you need to give the R session your credentials to interact with Azure
+setCredentials("credentials.json")
+
+# 4. Register the pool. This will create a new pool if your pool hasn't already been provisioned.
+cluster <- makeCluster("cluster.json")
+
+# 5. Register the pool as your parallel backend
+registerDoAzureParallel(cluster)
+
+# 6. Check that your parallel backend has been registered
+getDoParWorkers()
+```
+
+Run your parallel *foreach* loop with the *%dopar%* keyword. The *foreach* function will return the results of your parallel code.
+
+```R
+number_of_iterations <- 10
+results <- foreach(i = 1:number_of_iterations) %dopar% {
+  # This code is executed, in parallel, across your cluster.
+  myAlgorithm()
+}
+```
+
+After you finish running your R code in Azure, you may want to shut down your cluster of VMs to make sure that you are not being charged anymore.
+
+```R
+# shut down your pool
+stopCluster(cluster)
+```
+
+### Configuration JSON files
+
+#### Credentials
+Use your credential config JSON file to enter your credentials.
+
+```javascript
+{
+  "batchAccount": {
+    "name": ,
+    "key": ,
+    "url":
+  },
+  "storageAccount": {
+    "name": ,
+    "key":
+  }
+}
+```
+Learn more:
+ - [Batch account / Storage account](./README.md#azure-requirements)
+
+
+#### Cluster Settings
+Use your pool configuration JSON file to define your pool in Azure.
+
+```javascript
+{
+  "name": ,      // example: "myazurecluster"
+  "vmSize": ,    // example: "Standard_F2"
+  "maxTasksPerNode": ,   // example: "2"
+  "poolSize": {
+    "dedicatedNodes": {  // dedicated vms
+      "min": 2,
+      "max": 2
+    },
+    "lowPriorityNodes": {  // low priority vms
+      "min": 1,
+      "max": 10
+    },
+    "autoscaleFormula": "QUEUE"
+  },
+  "rPackages": {
+    "cran": ["some_cran_package", "some_other_cran_package"],
+    "github": ["username/some_github_package", "another_username/some_other_github_package"],
+    "githubAuthenticationToken": {}
+  },
+  "commandLine": []
+}
+```
+NOTE: If you do **not** want your cluster to autoscale, simply set the number of min nodes equal to max nodes for low-priority and dedicated.
+
+Learn more:
+ - [Choosing VM size](./docs/10-vm-sizes.md#vm-size-table)
+ - [MaxTasksPerNode](./docs/22-parallelizing-cores.md)
+ - [LowPriorityNodes](#low-priority-vms)
+ - [Autoscale](./docs/11-autoscale.md)
+ - [PoolSize Limitations](./docs/12-quota-limitations.md)
+ - [rPackages](./docs/20-package-management.md)
+
+### Low Priority VMs
+Low-priority VMs are a way to obtain and consume Azure compute at a much lower price using Azure Batch. Since doAzureParallel is built on top of Azure Batch, this package is able to take advantage of low-priority VMs and allocate compute resources from Azure's surplus capacity at up to **80% discount**.
+
+Low-priority VMs come with the understanding that when you request them, there is the possibility that we'll need to take some or all of them back. Hence the name *low-priority* - VMs may not be allocated or may be preempted due to higher priority allocations, which equate to full-priced VMs that have an SLA.
+
+And as the name suggests, this significant cost reduction is ideal for *low priority* workloads that do not have a strict performance requirement.
+
+With Azure Batch's first-class support for low-priority VMs, you can use them in conjunction with normal on-demand VMs (*dedicated VMs*) and enable job cost to be balanced with job execution flexibility:
+
+ * Batch pools can contain both on-demand nodes and low-priority nodes. The two types can be independently scaled, either explicitly with the resize operation or automatically using auto-scale. Different configurations can be used, such as maximizing cost savings by always using low-priority nodes or spinning up on-demand nodes at full price, to maintain capacity by replacing any preempted low-priority nodes.
+ * If any low-priority nodes are preempted, then Batch will automatically attempt to replace the lost capacity, continually seeking to maintain the target amount of low-priority capacity in the pool.
+ * If tasks are interrupted when the node on which they are running is preempted, then the tasks are automatically re-queued to be re-run.
+
+For more information about low-priority VMs, please visit the [documentation](https://docs.microsoft.com/en-us/azure/batch/batch-low-pri-vms).
+
+You can also check out information on low-priority pricing [here](https://azure.microsoft.com/en-us/pricing/details/batch/).
+
+### Distributing Data
+When developing at scale, you may also want to chunk up your data and distribute the data across your nodes. Learn more about that [here](./docs/21-distributing-data.md#chunking-data)
+
+### Using %do% vs %dopar%
+When developing at scale, it is always recommended that you test and debug your code locally first. Switch between *%dopar%* and *%do%* to toggle between running in parallel on Azure and running in sequence on your local machine.
+
+```R
+# run your code sequentially on your local machine
+results <- foreach(i = 1:number_of_iterations) %do% { ... }
+
+# use the doAzureParallel backend to run your code in parallel across your Azure cluster
+results <- foreach(i = 1:number_of_iterations) %dopar% { ... }
+```
+
+### Error Handling
+The errorhandling option specifies how failed tasks should be evaluated. By default, the error handling is 'stop' to ensure users can have reproducible results. If a combine function is assigned, it must be able to handle error objects.
+
+Error Handling Type | Description
+--- | ---
+stop | The execution of the foreach will stop if an error occurs
+pass | The error object of the task is included in the results
+remove | The result of a failed task will not be returned
+
+```R
+# Remove R error objects from the results
+res <- foreach::foreach(i = 1:4, .errorhandling = "remove") %dopar% {
+  if (i == 2 || i == 4) {
+    randomObject
+  }
+
+  mean(1:3)
+}
+
+#> res
+#[[1]]
+#[1] 2
+#
+#[[2]]
+#[1] 2
+```
+
+```R
+# Passing R error objects into the results
+res <- foreach::foreach(i = 1:4, .errorhandling = "pass") %dopar% {
+  if (i == 2 || i == 4) {
+    randomObject
+  }
+
+  sum(i, 1)
+}
+
+#> res
+#[[1]]
+#[1] 2
+#
+#[[2]]
+#
+#
+#[[3]]
+#[1] 4
+#
+#[[4]]
+#
+```
+
+### Long-running Jobs + Job Management
+
+doAzureParallel also helps you manage your jobs so that you can run many jobs at once while managing them through a few simple methods.
+
+
+```R
+# List your jobs:
+getJobList()
+# Get your job by job id:
+getJob(jobId = 'unique_job_id', verbose = TRUE)
+```
+
+This will also let you run *long running jobs* easily.
+
+With long running jobs, you will need to keep track of your jobs as well as set your job to a non-blocking state. You can do this with the *.options.azure* options:
+
+```R
+# set the .options.azure option in the foreach loop
+opt <- list(job = 'unique_job_id', wait = FALSE)
+
+# NOTE - if the option wait = FALSE, foreach will return your unique job id
+job_id <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... }
+
+# get back your job results with your unique job id
+results <- getJobResult(job_id)
+```
+
+Finally, you may also want to track the status of jobs by state (active, completed, etc.):
+
+```R
+# List jobs in the active and completed states:
+filter <- list()
+filter$state <- c("active", "completed")
+jobList <- getJobList(filter)
+View(jobList)
+```
+
+You can learn more about how to execute long-running jobs [here](./docs/23-persistent-storage.md).
+
+With long-running jobs, you can take advantage of Azure's autoscaling capabilities to save time and/or money. Learn more about autoscale [here](./docs/11-autoscale.md).
+
+### Using the 'chunkSize' option
+
+doAzureParallel also supports custom chunk sizes. This option allows you to group iterations of the foreach loop together and execute them in a single R session.
+
+```R
+# set the chunkSize option
+opt <- list(chunkSize = 3)
+results <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... }
+```
+
+You should consider using the chunkSize option if each iteration in the loop executes very quickly.
+
+If you have a static cluster and want to have a single chunk for each worker, you can compute the chunkSize as follows:
+
+```R
+# compute the chunk size
+cs <- ceiling(number_of_iterations / getDoParWorkers())
+
+# run the foreach loop with chunkSize optimized
+opt <- list(chunkSize = cs)
+results <- foreach(i = 1:number_of_iterations, .options.azure = opt) %dopar% { ... }
+```
+
+### Resizing Your Cluster
+
+At some point, you may also want to resize your cluster manually. You can do this simply with the command *resizeCluster*.
+
+```R
+cluster <- makeCluster("cluster.json")
+
+# resize so that we have a min of 10 dedicated nodes and a max of 20 dedicated nodes
+# AND a min of 10 low priority nodes and a max of 20 low priority nodes
+resizeCluster(
+  cluster,
+  dedicatedMin = 10,
+  dedicatedMax = 20,
+  lowPriorityMin = 10,
+  lowPriorityMax = 20,
+  algorithm = 'QUEUE',
+  timeInterval = '5m' )
+```
+
+If your cluster is using autoscale but you want to set it to a static size of 10, you can also use this method:
+
+```R
+# resize to a static cluster of 10
+resizeCluster(cluster,
+              dedicatedMin = 10,
+              dedicatedMax = 10,
+              lowPriorityMin = 0,
+              lowPriorityMax = 0)
+```
+
+### Setting Verbose Mode to Debug
+
+To debug your doAzureParallel jobs, you can set the package to operate in *verbose* mode:
+
+```R
+# turn on verbose mode
+setVerbose(TRUE)
+
+# turn off verbose mode
+setVerbose(FALSE)
+```
+### Bypassing merge task
+
+Skipping the merge task is useful when the task results don't need to be merged into a list. To bypass the merge task, you can pass the *enableMerge* flag to the foreach object:
+
+```R
+# Enable merge task
+foreach(i = 1:3, .options.azure = list(enableMerge = TRUE))
+
+# Disable merge task
+foreach(i = 1:3, .options.azure = list(enableMerge = FALSE))
+```
+Note: User-defined functions for the merge task are on our list of planned features.
+
+## Next Steps
+
+For more information, please visit [our documentation](./docs/README.md).
diff --git a/docs/README.md b/docs/README.md
index 7c679108..fec31aeb 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -41,4 +41,3 @@ This section will provide information about how Azure works, how best to take ad
 
 Take a look at our [**Troubleshooting Guide**](./40-troubleshooting.md) for information on how to diagnose common issues.
 Read our [**FAQ**](./42-faq.md) for known issues and common questions.
-
\ No newline at end of file
diff --git a/man/getJob.Rd b/man/getJob.Rd
new file mode 100644
index 00000000..b42af61d
--- /dev/null
+++ b/man/getJob.Rd
@@ -0,0 +1,21 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/utility.R
+\name{getJob}
+\alias{getJob}
+\title{Get a job for the given job id}
+\usage{
+getJob(jobId, verbose = TRUE)
+}
+\arguments{
+\item{jobId}{A job id}
+
+\item{verbose}{Show verbose log output}
+}
+\description{
+Get a job for the given job id
+}
+\examples{
+\dontrun{
+getJob("job-001", FALSE)
+}
+}
diff --git a/man/getJobList.Rd b/man/getJobList.Rd
index 1d951a10..75e7ea6b 100644
--- a/man/getJobList.Rd
+++ b/man/getJobList.Rd
@@ -2,18 +2,18 @@
 % Please edit documentation in R/utility.R
 \name{getJobList}
 \alias{getJobList}
-\title{Get a list of job statuses from the given job ids}
+\title{Get a list of job statuses from the given filter}
 \usage{
-getJobList(jobIds = c())
+getJobList(filter = NULL)
 }
 \arguments{
-\item{jobIds}{A character vector of job ids}
+\item{filter}{A named list containing the job states to filter on}
 }
 \description{
-Get a list of job statuses from the given job ids
+Get a list of job statuses from the given filter
 }
 \examples{
 \dontrun{
-getJobList(c("job-001", "job-002"))
+getJobList()
 }
 }
diff --git a/man/getJobResult.Rd b/man/getJobResult.Rd
index 1abbdb5b..1e001f9e 100644
--- a/man/getJobResult.Rd
+++ b/man/getJobResult.Rd
@@ -4,15 +4,10 @@
 \alias{getJobResult}
 \title{Download the results of the job}
 \usage{
-getJobResult(jobId = "", ...)
+getJobResult(jobId)
 }
 \arguments{
 \item{jobId}{The jobId to download from}
-
-\item{...}{Further named parameters
-\itemize{
- \item{"container"}: {The container to download from.}
-}}
 }
 \value{
 The results from the job.
diff --git a/man/waitForTasksToComplete.Rd b/man/waitForTasksToComplete.Rd
index d41c5455..d2207324 100644
--- a/man/waitForTasksToComplete.Rd
+++ b/man/waitForTasksToComplete.Rd
@@ -4,7 +4,7 @@
 \alias{waitForTasksToComplete}
 \title{Wait for current tasks to complete}
 \usage{
-waitForTasksToComplete(jobId, timeout, errorhandling = "stop")
+waitForTasksToComplete(jobId, timeout, errorHandling = "stop")
 }
 \description{
 Wait for current tasks to complete
diff --git a/tests/testthat/test-long-running-job.R b/tests/testthat/test-long-running-job.R
new file mode 100644
index 00000000..da2d9d2a
--- /dev/null
+++ b/tests/testthat/test-long-running-job.R
@@ -0,0 +1,55 @@
+# Run this test to make sure the long running job feature
+# of doAzureParallel is still working
+context("long running job scenario test")
+test_that("Long Running Job Test", {
+  testthat::skip_on_travis()
+  credentialsFileName <- "credentials.json"
+  clusterFileName <- "cluster.json"
+
+  doAzureParallel::generateCredentialsConfig(credentialsFileName)
+  doAzureParallel::generateClusterConfig(clusterFileName)
+
+  # set your credentials
+  doAzureParallel::setCredentials(credentialsFileName)
+  cluster <- doAzureParallel::makeCluster(clusterFileName)
+  doAzureParallel::registerDoAzureParallel(cluster)
+
+  opt <- list(wait = FALSE)
+  '%dopar%' <- foreach::'%dopar%'
+  res <-
+    foreach::foreach(
+      i = 1:4,
+      .packages = c('httr'),
+      .options.azure = opt
+    ) %dopar% {
+      mean(1:3)
+    }
+
+  job <- getJob(res)
+
+  # get the active/completed job list
+  filter <- list()
+  filter$state <- c("active", "completed")
+  getJobList(filter)
+
+  # get job list for all jobs
+  getJobList()
+
+  # wait 2 minutes for job to finish
+  Sys.sleep(120)
+
+  # get job result
+  jobResult <- getJobResult(res)
+
+  doAzureParallel::stopCluster(cluster)
+
+  # verify the job result is correct
+  testthat::expect_equal(length(jobResult),
+                         4)
+
+  testthat::expect_equal(jobResult,
+                         list(2, 2, 2, 2))
+
+  # delete the job
+  rAzureBatch::deleteJob(res)
+})
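
Usage sketch for the job-management API this change introduces (getJobList with a state filter, getJob, getJobResult). This is a minimal illustration rather than part of the patch: it assumes credentials have already been set via setCredentials, and the job id shown is a hypothetical placeholder.

```R
library(doAzureParallel)

# getJobList() now returns a data frame with Id, State, Status,
# FailedTasks, and TotalTasks columns; with no filter it lists all jobs.
allJobs <- getJobList()

# Narrow the listing to active and completed jobs.
jobs <- getJobList(filter = list(state = c("active", "completed")))

# Inspect a single job; "job20170928120000" is a placeholder id.
# The returned list carries the job's metadata and task counts.
job <- getJob("job20170928120000", verbose = FALSE)
job$tasks$failed

# Once the tasks have completed, download the merged result.
results <- getJobResult("job20170928120000")
```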
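
A second sketch of the non-blocking flow that the new job metadata enables: options recorded at submission time are recovered later through getJob. Assumptions here: a cluster.json already exists, the illustrative job name satisfies the new Validators rules (the job id doubles as a storage container name, so lowercase letters, digits, and dashes only), and metadata values come back as strings because they are stored with as.character.

```R
library(doAzureParallel)
library(foreach)

setCredentials("credentials.json")
cluster <- makeCluster("cluster.json")
registerDoAzureParallel(cluster)

# With wait = FALSE, foreach returns the job id instead of blocking.
# "simulation-2017" is an illustrative name that passes both
# isValidJobName and isValidStorageContainerName.
opt <- list(job = "simulation-2017", wait = FALSE, chunkSize = 2)
jobId <- foreach(i = 1:8, .options.azure = opt) %dopar% {
  sqrt(i)
}

# chunkSize was attached as job metadata at submission, so a fresh
# R session can recover it straight from the Batch service.
job <- getJob(jobId, verbose = FALSE)
stopifnot(job$metadata$chunkSize == "2")

# When the job completes, fetch the merged results.
results <- getJobResult(jobId)
```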