Skip to content

Commit

Permalink
add offset to SparkR
Browse files Browse the repository at this point in the history
  • Loading branch information
actuaryzhang committed Aug 3, 2017
1 parent 9f5647d commit 6ec068e
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
18 changes: 14 additions & 4 deletions R/pkg/R/mllib_regression.R
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
#' @param maxIter integer giving the maximal number of IRLS iterations.
#' @param weightCol the weight column name. If this is not set or \code{NULL}, we treat all instance
#' weights as 1.0.
#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
#' as 0.0. The feature specified as offset has a constant coefficient of 1.0.
#' @param regParam regularization parameter for L2 regularization.
#' @param var.power the power in the variance function of the Tweedie distribution which provides
#' the relationship between the variance and mean of the distribution. Only
Expand Down Expand Up @@ -125,7 +127,7 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
#' @seealso \link{glm}, \link{read.ml}
# spark.glm: fit a generalized linear model on a SparkDataFrame.
# NOTE(review): the diff rendering duplicated the pre-change argument line
# next to the post-change one; this keeps only the post-commit signature,
# which adds offsetCol (default NULL, i.e. no offset) before regParam.
setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
          function(data, formula, family = gaussian, tol = 1e-6, maxIter = 25, weightCol = NULL,
                   offsetCol = NULL, regParam = 0.0, var.power = 0.0, link.power = 1.0 - var.power,
                   stringIndexerOrderType = c("frequencyDesc", "frequencyAsc",
                                              "alphabetDesc", "alphabetAsc")) {

Expand Down Expand Up @@ -159,10 +161,16 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
weightCol <- as.character(weightCol)
}

# Normalize offsetCol before handing it to the JVM wrapper:
# an empty string is treated the same as "not set" and becomes NULL,
# while any other non-NULL value is coerced to a plain character name.
if (!is.null(offsetCol) && offsetCol == "") {
offsetCol <- NULL
} else if (!is.null(offsetCol)) {
offsetCol <- as.character(offsetCol)
}

# For known families, Gamma is upper-cased
# NOTE(review): the diff rendering duplicated the pre-change argument line
# next to the post-change one; this keeps only the post-commit call, which
# passes offsetCol to the JVM wrapper between weightCol and regParam.
jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
                    "fit", formula, data@sdf, tolower(family$family), family$link,
                    tol, as.integer(maxIter), weightCol, offsetCol, regParam,
                    as.double(var.power), as.double(link.power),
                    stringIndexerOrderType)
new("GeneralizedLinearRegressionModel", jobj = jobj)
Expand All @@ -182,6 +190,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
#' \code{poisson}, \code{Gamma}, and \code{tweedie}.
#' @param weightCol the weight column name. If this is not set or \code{NULL}, we treat all instance
#' weights as 1.0.
#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
#' as 0.0. The feature specified as offset has a constant coefficient of 1.0.
#' @param epsilon positive convergence tolerance of iterations.
#' @param maxit integer giving the maximal number of IRLS iterations.
#' @param var.power the index of the power variance function in the Tweedie family.
Expand All @@ -207,11 +217,11 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
#' @seealso \link{spark.glm}
# glm: R-compatible front end that delegates to spark.glm.
# Maps base-R argument names (epsilon, maxit) onto spark.glm's (tol, maxIter)
# and forwards offsetCol, the Tweedie powers, and the string-indexer ordering.
# NOTE(review): the diff rendering duplicated the pre-change lines next to the
# post-change ones; this keeps only the post-commit version, which adds
# offsetCol (default NULL, i.e. no offset) and forwards it to spark.glm.
setMethod("glm", signature(formula = "formula", family = "ANY", data = "SparkDataFrame"),
          function(formula, family = gaussian, data, epsilon = 1e-6, maxit = 25, weightCol = NULL,
                   offsetCol = NULL, var.power = 0.0, link.power = 1.0 - var.power,
                   stringIndexerOrderType = c("frequencyDesc", "frequencyAsc",
                                              "alphabetDesc", "alphabetAsc")) {
            spark.glm(data, formula, family, tol = epsilon, maxIter = maxit, weightCol = weightCol,
                      offsetCol = offsetCol, var.power = var.power, link.power = link.power,
                      stringIndexerOrderType = stringIndexerOrderType)
          })

Expand Down
7 changes: 7 additions & 0 deletions R/pkg/tests/fulltests/test_mllib_regression.R
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ test_that("spark.glm summary", {
expect_equal(stats$df.residual, rStats$df.residual)
expect_equal(stats$aic, rStats$aic)

# Test spark.glm works with offset
# NOTE(review): iris has no "Pedal" column -- the committed code misspelled
# Petal_Length (SparkR frame, dots renamed to underscores) and Petal.Length
# (base R iris), so both fits would fail on a missing column instead of
# exercising the offset path. Corrected below.
stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
                           family = poisson(), offsetCol = "Petal_Length"))
rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species,
                      data = iris, family = poisson(), offset = Petal.Length))
expect_true(all(abs(rStats$coefficients - stats$coefficients) < 1e-3))

# Test summary works on base GLM models
baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
baseSummary <- summary(baseModel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ private[r] object GeneralizedLinearRegressionWrapper
tol: Double,
maxIter: Int,
weightCol: String,
offsetCol: String,
regParam: Double,
variancePower: Double,
linkPower: Double,
Expand All @@ -99,6 +100,7 @@ private[r] object GeneralizedLinearRegressionWrapper
glr.setLink(link)
}
if (weightCol != null) glr.setWeightCol(weightCol)
if (offsetCol != null) glr.setOffsetCol(offsetCol)

val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, glr))
Expand Down

0 comments on commit 6ec068e

Please sign in to comment.