
Feature/getjobresultlocal (#204)
* Get job result locally

* Get job result locally

* preserve task sequence in getjobresult

* keep task result sequence

* keep task result in sequence and as a flat list

* fix lintr error

* fix typo in error message

* delete cluster after test is done

* add retry to getJobResultLocal, resolve xml2 issue

* fix typo, lintr and missing variable
zfengms authored Feb 9, 2018
1 parent 5845985 commit 7aa04f7
Showing 4 changed files with 150 additions and 39 deletions.
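
For context, the end-to-end flow this commit enables looks roughly like the sketch below. This is a hedged sketch, not code from the commit: cluster and credential setup are omitted, the iteration body is a placeholder, and the option names are the ones exercised by the new test.

# Assumes a cluster is already created and registered via
# doAzureParallel::registerDoAzureParallel(cluster).
library(doAzureParallel)
`%dopar%` <- foreach::`%dopar%`

jobId <- foreach::foreach(
  i = 1:10,
  .errorhandling = "pass",
  .options.azure = list(
    enableCloudCombine = FALSE,  # skip the cloud-side merge task
    wait = FALSE                 # return a job id instead of blocking
  )
) %dopar% {
  i  # placeholder task body
}

# getJobResult() now downloads each task's result blob and merges
# them locally via the new .getJobResultLocal() helper.
results <- getJobResult(jobId)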
143 changes: 106 additions & 37 deletions R/jobUtilities.R
@@ -171,48 +171,45 @@ getJobResult <- function(jobId) {
   metadata <- readMetadataBlob(jobId)
 
   if (!is.null(metadata)) {
-    if (metadata$enableCloudCombine == "FALSE") {
-      cat("enalbeCloudCombine is set to FALSE, no job merge result is available",
-          fill = TRUE)
-
-      return()
-    }
-
-    if (metadata$wait == "FALSE") {
-      job <- getJob(jobId, verbose = FALSE)
-
-      if (job$jobState == "active") {
-        stop(sprintf(
-          "job %s is not finished yet, please try again later",
-          job$jobId
-        ))
-      } else if (job$jobState != "completed") {
-        stop(sprintf(
-          "job %s is %s state, no job result is available",
-          job$jobId,
-          job$jobState
-        ))
-      }
-
-      # if the job has failed task
-      if (job$tasks$failed > 0) {
-        if (metadata$errorHandling == "stop") {
-          stop(
-            sprintf(
-              "job %s has failed tasks and error handling is set to 'stop', no result will be avaialble",
-              job$jobId
-            )
-          )
-        } else {
-          if (job$tasks$succeeded == 0) {
-            stop(sprintf(
-              "all tasks failed for job %s, no result will be avaialble",
-              job$jobId
-            ))
-          }
-        }
-      }
-    }
+    job <- getJob(jobId, verbose = FALSE)
+
+    if (job$jobState == "active") {
+      stop(sprintf("job %s has not finished yet, please try again later",
+                   job$jobId))
+    } else if (job$jobState != "completed") {
+      stop(sprintf(
+        "job %s is in %s state, no job result is available",
+        job$jobId,
+        job$jobState
+      ))
+    }
+
+    # if the job has failed tasks
+    if (job$tasks$failed > 0) {
+      if (metadata$errorHandling == "stop") {
+        stop(
+          sprintf(
+            "job %s has failed tasks and error handling is set to 'stop', no result will be available",
+            job$jobId
+          )
+        )
+      } else {
+        if (job$tasks$succeeded == 0) {
+          stop(sprintf(
+            "all tasks failed for job %s, no result will be available",
+            job$jobId
+          ))
+        }
+      }
+    }
+
+    if (metadata$enableCloudCombine == "FALSE") {
+      cat("enableCloudCombine is set to FALSE, we will merge job result locally",
+          fill = TRUE)
+
+      results <- .getJobResultLocal(job, metadata)
+      return(results)
+    }
   }
 
   tempFile <- tempfile("getJobResult", fileext = ".rds")
@@ -234,7 +231,7 @@ getJobResult <- function(jobId) {
 
   results <- rAzureBatch::downloadBlob(
     jobId,
-    paste0("result/", jobId, "-merge-result.rds"),
+    "result/merge-result.rds",
     downloadPath = tempFile,
     overwrite = TRUE
   )
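
This hunk renames the cloud-merged result blob from a job-prefixed name to a fixed name inside the job's own storage container. Fetching it by hand would look roughly like the following sketch (the job id is hypothetical; the downloadBlob call mirrors the one in the diff):

tempFile <- tempfile("getJobResult", fileext = ".rds")
rAzureBatch::downloadBlob(
  "job20180209123456",        # hypothetical job id; it doubles as the container name
  "result/merge-result.rds",  # fixed blob name after this change
  downloadPath = tempFile,
  overwrite = TRUE
)
merged <- readRDS(tempFile)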
@@ -249,6 +246,74 @@ getJobResult <- function(jobId) {
   }
 }
 
+.getJobResultLocal <- function(job, metadata) {
+  results <- vector("list", job$tasks$completed)
+  count <- 1
+
+  for (i in 1:job$tasks$completed) {
+    retryCounter <- 0
+    maxRetryCount <- 3
+    repeat {
+      if (retryCounter > maxRetryCount) {
+        stop(
+          sprintf("Error getting job result: Maximum number of retries (%d) reached\r\n",
+                  maxRetryCount)
+        )
+      } else {
+        retryCounter <- retryCounter + 1
+      }
+
+      tryCatch({
+        # Create a temporary file on disk
+        tempFile <- tempfile(fileext = ".rds")
+
+        # Create the temporary file's directory if it doesn't exist
+        dir.create(dirname(tempFile), showWarnings = FALSE)
+
+        # Download the blob to the temporary file
+        rAzureBatch::downloadBlob(
+          containerName = job$jobId,
+          blobName = paste0("result/", i, "-result.rds"),
+          downloadPath = tempFile,
+          overwrite = TRUE
+        )
+
+        # Read the rds as an object in memory
+        taskResult <- readRDS(tempFile)
+
+        for (t in 1:length(taskResult)) {
+          if (isError(taskResult[[t]])) {
+            if (metadata$errorHandling == "stop") {
+              stop("Error found")
+            }
+            else if (metadata$errorHandling == "pass") {
+              results[[count]] <- NA
+              count <- count + 1
+            }
+          } else {
+            results[[count]] <- taskResult[[t]]
+            count <- count + 1
+          }
+        }
+
+        # Delete the temporary file
+        file.remove(tempFile)
+
+        break
+      },
+      error = function(e) {
+        warning(sprintf(
+          "error downloading task result %s from blob, retrying...\r\n%s",
+          paste0(job$jobId, "/result/", i, "-result.rds"),
+          e
+        ))
+      })
+    }
+  }
+  # Return the merged, flattened list of task results
+  return(results)
+}
+
 #' Delete a job
 #'
 #' @param jobId A job id
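
The download in .getJobResultLocal() is wrapped in a bounded retry loop. Factored out on its own, the pattern looks like this generic sketch (not code from this commit; withRetries is a hypothetical helper):

withRetries <- function(action, maxRetryCount = 3) {
  retryCounter <- 0
  repeat {
    if (retryCounter > maxRetryCount) {
      stop(sprintf("Maximum number of retries (%d) reached", maxRetryCount))
    }
    retryCounter <- retryCounter + 1

    outcome <- tryCatch({
      # On success, capture the value so we can leave the loop.
      list(value = action(), ok = TRUE)
    },
    error = function(e) {
      warning(sprintf("attempt %d failed, retrying...\n%s",
                      retryCounter, conditionMessage(e)))
      list(value = NULL, ok = FALSE)
    })

    if (outcome$ok) {
      return(outcome$value)
    }
  }
}

# Usage: withRetries(function() readRDS("result.rds"))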
@@ -537,3 +602,7 @@ waitForJobPreparation <- function(jobId, poolId) {
 
   cat("\n")
 }
+
+isError <- function(x) {
+  inherits(x, "simpleError") || inherits(x, "try-error")
+}
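
The new isError() helper recognizes the two wrappers a failed task result can arrive in: a condition object (class "simpleError") or the result of try() (class "try-error"). A quick illustration with hypothetical values:

e1 <- simpleError("task failed")               # a condition object
e2 <- try(stop("task failed"), silent = TRUE)  # a "try-error" object
isError(e1)  # TRUE
isError(e2)  # TRUE
isError(42)  # FALSE; ordinary results pass through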
2 changes: 1 addition & 1 deletion R/stringUtilities.R
@@ -3,7 +3,7 @@ getTaskFailedErrorString <- function(...) {
     ...,
     "Error handling is set to 'stop' and has proceeded to terminate the job.",
     "The user will have to handle deleting the job.",
-    "If this is not the correct behavior, change the errorHandling property to 'pass'",
+    "If this is not the correct behavior, change the errorhandling property to 'pass'",
     " or 'remove' in the foreach object. Use the 'getJobFile' function to obtain the logs.",
     "For more information about getting job logs, follow this link:",
     paste0(
2 changes: 1 addition & 1 deletion R/utility.R
@@ -246,7 +246,7 @@ readMetadataBlob <- function(jobId) {
 
   if (is.vector(result)) {
     result <- readRDS(tempFile)
-    result <- xml2::as_xml_document(result)
+    result <- xml2::read_xml(result)
     chunkSize <- getXmlValues(result, ".//chunkSize")
     packages <- getXmlValues(result, ".//packages")
     errorHandling <- getXmlValues(result, ".//errorHandling")
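
The xml2 change reflects that the metadata blob round-trips through readRDS() as an XML string: xml2::read_xml() parses a string into a document, which is what the getXmlValues() calls expect, whereas as_xml_document() is meant for converting existing node structures. A minimal illustration with hypothetical metadata:

library(xml2)
s <- "<metadata><chunkSize>2</chunkSize><errorHandling>pass</errorHandling></metadata>"
doc <- xml2::read_xml(s)
xml2::xml_text(xml2::xml_find_first(doc, ".//chunkSize"))      # "2"
xml2::xml_text(xml2::xml_find_first(doc, ".//errorHandling"))  # "pass"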
42 changes: 42 additions & 0 deletions tests/testthat/test-local-merge.R
@@ -0,0 +1,42 @@
+# Run this test manually to make sure the local result merge feature
+# of doAzureParallel is still working
+context("merge job result locally test")
+test_that("merge job result locally test", {
+  testthat::skip("merge job result locally test")
+  testthat::skip_on_travis()
+  credentialsFileName <- "credentials.json"
+  clusterFileName <- "cluster.json"
+
+  doAzureParallel::generateCredentialsConfig(credentialsFileName)
+  doAzureParallel::generateClusterConfig(clusterFileName)
+
+  doAzureParallel::setCredentials(credentialsFileName)
+  cluster <- doAzureParallel::makeCluster(clusterFileName)
+  doAzureParallel::registerDoAzureParallel(cluster)
+
+  setChunkSize(2)
+  '%dopar%' <- foreach::'%dopar%'
+  jobId <-
+    foreach::foreach(
+      i = 1:11,
+      .errorhandling = "pass",
+      .options.azure = list(
+        enableCloudCombine = FALSE,
+        wait = FALSE
+      )
+    ) %dopar% {
+      i
+    }
+
+  res <- getJobResult(jobId)
+
+  testthat::expect_equal(length(res),
+                         11)
+
+  for (i in 1:11) {
+    testthat::expect_equal(res[[i]],
+                           i)
+  }
+
+  stopCluster(cluster)
+})
