Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
v0.3.0 Release (#20)
Browse files Browse the repository at this point in the history
* Added set chunk size

* Adding resource files on pool creation

* renaming generate file functions

* Moved worker/merger scripts to doAzureParallel and created common job env

* Added stdout and stderr logs in uploads

* added to docs / README

* Switched params for cluster and added examples

* setCreds, resizeCluster, job management

* cred generator update

* Added samples, moved autoscale, and low-pri/output files

* Added documentation on methods for ??R feature

* Added export for makeCluster

* Namespace missing export

* clusterSetting param name

* cluster id param name

* NumOfNodes param for wait nodes completion fix

* Added proper naming for registerDoAzureParallel

* readme update'

* typo readme

* low pri in readme

* monte carlo simulation

* Added new sample for sas resource files

* caret + annotation on montecarlo sim

* samples readme.md

* samples readme

* Fixed the resource files to use proper storage account for example

* Update README.md

* Update 11-autoscale.md

* Fixed autoscale formula for task queue to take maxTaskPerNode

* Added named args to createSasToken

* Update resource-files-example.R

* Update 21-distributing-data.md

* Renamed samples files to underscore format

* Update 21-distributing-data.md

* Update README.md

* Update README.md

* Update README.md

* Edited changelog file

* Update plyr_example.R

* Update README.md
  • Loading branch information
brnleehng authored May 23, 2017
1 parent 96ad39a commit 02c5eac
Show file tree
Hide file tree
Showing 31 changed files with 1,480 additions and 191 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
0.3.0
- [BREAKING CHANGE] Two configuration files for easier debugging - credentials and cluster settings
- [BREAKING CHANGE] Added low priority virtual machine support for additional cost saving
- Added external method for setting chunk size (SetChunkSize)
- Added getJobList function to check the status of user's jobs
- Added resizeCluster function to allow users to change their autoscale formulas on the fly
17 changes: 11 additions & 6 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
Package: doAzureParallel
Type: Package
Title: doAzureParallel
Version: 0.2.2
Version: 0.3.0
Author: Brian Hoang
Maintainer: Who to complain to <yourfault@somewhere.net>
Description: More about what it does (maybe more than one line)
License: What license is it under?
Maintainer: Brian Hoang <brhoan@microsoft.com>
Description: The project is for data experts who use R at scale. The project
comes together as an R package that will allow users to run their R code in
parallel across a cluster hosted on Azure. The cluster will be created and
maintained by Azure Batch and, for the initial version, will be a public/
communal pool. The orchestration for each job that needs to be parallelized in
the cluster will be done by a middle layer that schedules each request.
License: Microsoft Corporation
LazyData: TRUE
Depends:
foreach (>= 1.4.3),
iterators (>= 1.0.8),
rAzureBatch (>= 0.1.0)
rAzureBatch (>= 0.2.4)
Suggests:
testthat
testthat, caret, plyr
RoxygenNote: 5.0.1
13 changes: 12 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
# Generated by roxygen2: do not edit by hand
exportPattern("^[^\\.]")

export(generateClusterConfig)
export(generateCredentialsConfig)
export(getJobList)
export(getJobResult)
export(makeCluster)
export(registerDoAzureParallel)
export(setChunkSize)
export(setCredentials)
export(setVerbose)
export(stopCluster)
export(waitForNodesToComplete)
79 changes: 79 additions & 0 deletions R/autoscale.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Autoscale formula template keyed on weekday working hours.
#
# The formula reads the current time, treats hours 8 (inclusive) to 18
# (exclusive) on weekdays 1 through 5 as "working weekday hours", and sets
# $TargetDedicatedNodes from two sprintf placeholders: the first %s is used
# while inside working weekday hours, the second %s otherwise.
AUTOSCALE_WORKDAY_FORMULA <- paste0(
  c(
    "$curTime = time();",
    "$workHours = $curTime.hour >= 8 && $curTime.hour < 18;",
    "$isWeekday = $curTime.weekday >= 1 && $curTime.weekday <= 5;",
    "$isWorkingWeekdayHour = $workHours && $isWeekday;",
    "$TargetDedicatedNodes = $isWorkingWeekdayHour ? %s:%s;"
  ),
  collapse = ""
)

# Autoscale formula template keyed on the weekend.
#
# Treats weekday values 6 through 7 as the weekend and sets
# $TargetDedicatedNodes from two sprintf placeholders: the first %s is used
# on the weekend, the second %s otherwise.
#
# The formula must assign $curTime itself before reading $curTime.weekday;
# each formula is submitted on its own, so it cannot rely on another
# template having defined the variable.
AUTOSCALE_WEEKEND_FORMULA <- paste0(
  "$curTime = time();",
  "$isWeekend = $curTime.weekday >= 6 && $curTime.weekday <= 7;",
  "$TargetDedicatedNodes = $isWeekend ? %s:%s;"
)

# Autoscale formula template driven by CPU usage.
#
# As written in the formula text:
#   * when the minimum $CPUPercent sample over the last 10 minutes is above
#     0.7, $totalNodes becomes $CurrentDedicated * 1.1, otherwise it stays at
#     $CurrentDedicated;
#   * when the average $CPUPercent sample over the last 60 minutes is below
#     0.2, $totalNodes becomes $CurrentDedicated * 0.9;
#   * $TargetDedicatedNodes is min(%s, $totalNodes), so the single sprintf
#     placeholder acts as an upper cap on the node count.
#
# This is one multi-line string literal: the line breaks below are part of
# the formula text, so do not re-wrap or re-indent it.
#
# NOTE(review): the formula reads $CurrentDedicated but writes
# $TargetDedicatedNodes - confirm both variable names are valid for the Batch
# API version in use.
AUTOSCALE_MAX_CPU_FORMULA <- "$totalNodes =
(min($CPUPercent.GetSample(TimeInterval_Minute * 10)) > 0.7) ?
($CurrentDedicated * 1.1) : $CurrentDedicated; $totalNodes =
(avg($CPUPercent.GetSample(TimeInterval_Minute * 60)) < 0.2) ?
($CurrentDedicated * 0.9) : $totalNodes;
$TargetDedicatedNodes = min(%s, $totalNodes)"

# Autoscale formula template driven by the number of active (queued) tasks.
#
# sprintf placeholders, in order:
#   1. value assigned to $maxTasksPerNode
#   2. lower bound for $TargetDedicatedNodes
#   3. upper bound for $TargetDedicatedNodes
#   4. lower bound for $TargetLowPriorityNodes
#   5. upper bound for $TargetLowPriorityNodes
#
# The formula derives $targetVMs from the active task count divided by
# $maxTasksPerNode (rounded up via $round), clamps both node targets to their
# bounds, and sets $NodeDeallocationOption to taskcompletion.
AUTOSCALE_QUEUE_FORMULA <- local({
  clauses <- c(
    "$samples = $ActiveTasks.GetSamplePercent(TimeInterval_Minute * 15);",
    "$tasks = $samples < 70 ? max(0,$ActiveTasks.GetSample(1)) : max( $ActiveTasks.GetSample(1), avg($ActiveTasks.GetSample(TimeInterval_Minute * 15)));",
    "$maxTasksPerNode = %s;",
    "$round = $maxTasksPerNode - 1;",
    "$targetVMs = $tasks > 0? (($tasks + $round)/ $maxTasksPerNode) : max(0, $TargetDedicated/2) + 0.5;",
    "$TargetDedicatedNodes = max(%s, min($targetVMs, %s));",
    "$TargetLowPriorityNodes = max(%s, min($targetVMs, %s));",
    "$NodeDeallocationOption = taskcompletion;"
  )

  paste0(clauses, collapse = "")
})

# Registry of the built-in autoscale formula templates, keyed by the name a
# caller passes to select one. The element order is kept as WEEKEND, WORKDAY,
# MAX_CPU, QUEUE because code may look the names up by position.
AUTOSCALE_FORMULA <- list(
  WEEKEND = AUTOSCALE_WEEKEND_FORMULA,
  WORKDAY = AUTOSCALE_WORKDAY_FORMULA,
  MAX_CPU = AUTOSCALE_MAX_CPU_FORMULA,
  QUEUE = AUTOSCALE_QUEUE_FORMULA
)

# Build a ready-to-submit autoscale formula from one of the built-in templates.
#
# Arguments:
#   formulaName     Template to use: "QUEUE", "MAX_CPU", "WEEKEND" or "WORKDAY".
#   dedicatedMin    Minimum number of dedicated nodes.
#   dedicatedMax    Maximum number of dedicated nodes.
#   lowPriorityMin  Minimum number of low priority nodes (QUEUE only).
#   lowPriorityMax  Maximum number of low priority nodes (QUEUE only).
#   maxTasksPerNode Tasks a single node runs concurrently (QUEUE only).
#
# Returns the template with its placeholders filled in via sprintf.
# Stops with "Incorrect autoscale formula: ..." when formulaName is not a
# single, non-missing, known template name.
getAutoscaleFormula <- function(formulaName,
                                dedicatedMin,
                                dedicatedMax,
                                lowPriorityMin,
                                lowPriorityMax,
                                maxTasksPerNode = 1) {
  invalidFormula <-
    "Incorrect autoscale formula: QUEUE, MAX_CPU, WEEKEND, WORKDAY"

  # Reject NULL, NA and vectors up front so the caller sees the intended
  # message rather than a low-level error about the condition's length.
  if (length(formulaName) != 1L || is.na(formulaName)) {
    stop(invalidFormula)
  }

  # Dispatch on the name itself instead of on the position of the name inside
  # AUTOSCALE_FORMULA, so reordering that list cannot silently swap formulas.
  switch(
    as.character(formulaName),
    WEEKEND = sprintf(AUTOSCALE_WEEKEND_FORMULA, dedicatedMin, dedicatedMax),
    WORKDAY = sprintf(AUTOSCALE_WORKDAY_FORMULA, dedicatedMin, dedicatedMax),
    # The MAX_CPU template ends in min(%s, $totalNodes): its placeholder is an
    # upper cap, so it must receive the maximum. Passing the minimum would
    # prevent the pool from ever growing beyond dedicatedMin.
    MAX_CPU = sprintf(AUTOSCALE_MAX_CPU_FORMULA, dedicatedMax),
    QUEUE = sprintf(
      AUTOSCALE_QUEUE_FORMULA,
      maxTasksPerNode,
      dedicatedMin,
      dedicatedMax,
      lowPriorityMin,
      lowPriorityMax
    ),
    stop(invalidFormula)
  )
}

#' Resize an Azure cloud-enabled cluster.
#'
#' Looks up the cluster's pool to read its \code{maxTasksPerNode}, builds the
#' autoscale formula for the chosen algorithm and applies it to the pool.
#'
#' @param cluster Cluster object that was referenced in \code{makeCluster}
#' @param dedicatedMin The minimum number of dedicated nodes
#' @param dedicatedMax The maximum number of dedicated nodes
#' @param lowPriorityMin The minimum number of low priority nodes
#' @param lowPriorityMax The maximum number of low priority nodes
#' @param algorithm Current built-in autoscale formulas: QUEUE, MAX_CPU, WEEKEND, WORKDAY
#' @param timeInterval Interval passed to the pool resize request as
#'   \code{autoscaleInterval}, e.g. "PT5M" (presumably an ISO 8601 duration;
#'   confirm against the Batch service documentation)
#'
#' @examples
#' \dontrun{
#' resizeCluster(cluster, dedicatedMin = 2, dedicatedMax = 6, lowPriorityMin = 2, lowPriorityMax = 6, algorithm = "QUEUE", timeInterval = "PT10M")
#' }
#' @export
resizeCluster <- function(cluster,
                          dedicatedMin,
                          dedicatedMax,
                          lowPriorityMin,
                          lowPriorityMax,
                          algorithm = "QUEUE",
                          timeInterval = "PT5M") {
  # The pool is fetched only for its maxTasksPerNode setting, which the QUEUE
  # formula needs to convert a task count into a node count.
  pool <- getPool(cluster$poolId)

  # Build the formula before issuing the resize so that an invalid algorithm
  # name fails here, ahead of any request that changes the pool.
  autoscaleFormula <- getAutoscaleFormula(
    algorithm,
    dedicatedMin,
    dedicatedMax,
    lowPriorityMin,
    lowPriorityMax,
    maxTasksPerNode = pool$maxTasksPerNode
  )

  resizePool(
    cluster$poolId,
    autoscaleFormula = autoscaleFormula,
    autoscaleInterval = timeInterval
  )
}
136 changes: 96 additions & 40 deletions R/cluster.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#' Creates a configuration file for the user's cluster setup.
#' Creates a credentials file for rAzureBatch package authentication
#'
#' @param fileName Cluster configuration's file name.
#' @param fileName Credentials file name
#' @param ... Further named parameters
#' \itemize{
#' \item{"batchAccount"}: {A list of files that the Batch service will download to the compute node before running the command line.}
Expand All @@ -11,10 +11,11 @@
#'}
#' @return The request to the Batch service was successful.
#' @examples {
#' generateClusterConfig("test_config.json")
#' generateClusterConfig("test_config.json", batchAccount = "testbatchaccount", batchKey = "test_batch_account_key", batchUrl = "http://testbatchaccount.azure.com", storageAccount = "teststorageaccount", storageKey = "test_storage_account_key")
#' generateCredentialsConfig("test_config.json")
#' generateCredentialsConfig("test_config.json", batchAccount = "testbatchaccount", batchKey = "test_batch_account_key", batchUrl = "http://testbatchaccount.azure.com", storageAccount = "teststorageaccount", storageKey = "test_storage_account_key")
#' }
generateClusterConfig <- function(fileName, ...){
#' @export
generateCredentialsConfig <- function(fileName, ...){
args <- list(...)

batchAccount <- ifelse(is.null(args$batchAccount), "batch_account_name", args$batchAccount)
Expand All @@ -31,35 +32,64 @@ generateClusterConfig <- function(fileName, ...){
batchAccount = list(
name = batchAccount,
key = batchKey,
url = batchUrl,
pool = list(
name = "myPoolName",
vmSize = "Standard_D2_v2",
maxTasksPerNode = 1,
poolSize = list(
minNodes = 3,
maxNodes = 10,
autoscaleFormula = "QUEUE"
)
),
rPackages = list(
cran = vector(),
github = vector()
)
url = batchUrl
),
storageAccount = list(
name = storageName,
key = storageKey
),
settings = list(
verbose = FALSE
)
)

configJson <- jsonlite::toJSON(config, auto_unbox = TRUE, pretty = TRUE)
write(configJson, file=paste0(getwd(), "/", fileName))

print(sprintf("A config file has been generated %s. Please enter your Batch credentials.", paste0(getwd(), "/", fileName)))
}
}

#' Creates a configuration file for the user's cluster setup.
#'
#' Writes a JSON template with default cluster settings (pool name, VM size,
#' \code{maxTasksPerNode}, dedicated and low priority node ranges, autoscale
#' formula name and empty CRAN/GitHub package lists) into the current working
#' directory, then prints where the file was written. The template is only
#' generated when no such file exists yet.
#'
#' @param fileName Cluster settings file name
#' @param ... Currently unused; accepted so existing calls that pass extra
#'   named arguments keep working
#' @return Called for its side effects (writing the file and printing
#'   guidance); the return value is not meaningful.
#' @examples {
#' generateClusterConfig("test_config.json")
#' }
#'
#' @export
generateClusterConfig <- function(fileName, ...) {
  # The template is always written relative to the working directory.
  filePath <- paste0(getwd(), "/", fileName)

  if (!file.exists(fileName) || !file.exists(filePath)) {
    config <- list(
      pool = list(
        name = "myPoolName",
        vmSize = "Standard_D2_v2",
        maxTasksPerNode = 1,
        poolSize = list(
          dedicatedNodes = list(
            min = 3,
            max = 3
          ),
          lowPriorityNodes = list(
            min = 3,
            max = 3
          ),
          autoscaleFormula = "QUEUE"
        )
      ),
      rPackages = list(
        # Zero-length vectors serialise as empty JSON arrays for the user to
        # fill in.
        cran = vector(),
        github = vector()
      )
    )

    configJson <- jsonlite::toJSON(config, auto_unbox = TRUE, pretty = TRUE)
    write(configJson, file = filePath)

    print(sprintf("A cluster settings has been generated %s. Please enter your cluster specification.", filePath))
    print("Note: To maximize all CPU cores, set the maxTasksPerNode property up to 4x the number of cores for the VM size.")
  }
}
Expand All @@ -69,35 +99,48 @@ generateClusterConfig <- function(fileName, ...){
#' @param fileName Cluster configuration's file name
#' @param fullName A boolean flag for checking the file full name
#' @param wait A boolean flag to wait for all nodes to boot up
#' @param resourceFiles A list of files that Batch will download to the compute node before running the command line
#'
#' @return The request to the Batch service was successful.
#' @examples
#' cluster <- makeCluster("cluster_config.json", fullName = TRUE, wait = TRUE)
makeCluster <- function(fileName = "az_config.json", fullName = FALSE, wait = TRUE, resourceFiles = list()){
setPoolOption(fileName, fullName)
#' @export
makeCluster <- function(clusterSetting = "cluster_settings.json", fullName = FALSE, wait = TRUE, resourceFiles = list()){
if(fullName){
pool <- rjson::fromJSON(file=paste0(clusterSetting))
}
else{
pool <- rjson::fromJSON(file=paste0(getwd(), "/", clusterSetting))
}

config <- getOption("az_config")
pool <- config$batchAccount$pool
if(is.null(config)){
stop("Credentials were not set.")
}

config$poolId = pool$pool$name
options("az_config" = config)

packages <- NULL
if(!is.null(config$batchAccount$rPackages) && !is.null(config$batchAccount$rPackages$cran) && length(config$batchAccount$rPackages$cran) > 0){
packages <- getInstallationCommand(config$batchAccount$rPackages$cran)
if(!is.null(pool$rPackages) && !is.null(pool$rPackages$cran) && length(pool$rPackages$cran) > 0){
packages <- getInstallationCommand(pool$rPackages$cran)
}

if(!is.null(config$batchAccount$rPackages) && !is.null(config$batchAccount$rPackages$github) && length(config$batchAccount$rPackages$github) > 0){
if(!is.null(pool$rPackages) && !is.null(pool$rPackages$github) && length(pool$rPackages$github) > 0){
if(is.null(packages)){
packages <- getGithubInstallationCommand(config$batchAccount$rPackages$github)
packages <- getGithubInstallationCommand(pool$rPackages$github)
}
else{
packages <- paste0(packages, ";", getGithubInstallationCommand(config$batchAccount$rPackages$github))
packages <- paste0(packages, ";", getGithubInstallationCommand(pool$rPackages$github))
}
}

response <- .addPool(
pool = pool,
pool = pool$pool,
packages = packages,
resourceFiles = resourceFiles)

pool <- getPool(pool$name)
pool <- getPool(pool$pool$name)

if(grepl("AuthenticationFailed", response)){
stop("Check your credentials and try again.");
Expand All @@ -108,36 +151,49 @@ makeCluster <- function(fileName = "az_config.json", fullName = FALSE, wait = TR
}
else{
if(wait){
waitForNodesToComplete(pool$id, 60000, targetDedicated = pool$targetDedicated)
waitForNodesToComplete(pool$id, 60000)
}
}

print("Your pool has been registered.")
print(sprintf("Node Count: %i", pool$targetDedicated))
print(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes))
print(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes))
return(getOption("az_config"))
}

#' Deletes the cluster from your Azure account.
#'
#' @param cluster The cluster configuration that was created in \code{makeCluster}
#'
#' @return The request to the Batch service was successful.
#' @examples
#' clusterConfiguration <- makeCluster("pool_configuration.json")
#' clusterConfiguration <- makeCluster("cluster_settings.json")
#' stopCluster(clusterConfiguration)
#' @export
stopCluster <- function(cluster){
deletePool(pool$batchAccount$pool$name)
deletePool(cluster$poolId)

print(sprintf("Your %s cluster has been destroyed.", cluster$poolId))
}

setPoolOption <- function(fileName = "az_config.json", fullName = FALSE){
if(fullName){
#' Loads Azure credentials from a JSON file and stores them for the session.
#'
#' @param fileName Credentials file name (see \code{generateCredentialsConfig}); when no file exists at that path it is looked up inside the current working directory
#'
#' @return Called for its side effect of setting the \code{az_config} option.
#' @examples
#' setCredentials("az_config.json")
#' @export
setCredentials <- function(fileName = "az_config.json"){
if(file.exists(fileName)){
config <- rjson::fromJSON(file=paste0(fileName))
}
else{
config <- rjson::fromJSON(file=paste0(getwd(), "/", fileName))
}

options("az_config" = config)
print("Your azure credentials have been set.")
}

getPoolWorkers <- function(poolId, ...){
Expand Down
Loading

0 comments on commit 02c5eac

Please sign in to comment.