
Regression training limit #413

Merged 19 commits on Oct 8, 2019
@@ -45,6 +45,7 @@ case object ModelSelectorNames {
val HoldOutEval = "testSetEvaluationResults"
val ResampleValues = "resamplingValues"
val CuttValues = "cuttValues"
+val PreSplitterDataCount = "preSplitterDataCount"
val BestModelUid = "bestModelUID"
val BestModelName = "bestModelName"
val Positive = "positiveLabels"

@@ -338,24 +338,6 @@ trait DataBalancerParams extends Params {

def getSampleFraction: Double = $(sampleFraction)

-/**
- * Maximum size of dataset want to train on.
- * Value should be > 0.
- * Default is 5000.
- *
- * @group param
- */
-final val maxTrainingSample = new IntParam(this, "maxTrainingSample",
-  "maximum size of dataset want to train on", ParamValidators.inRange(
-    lowerBound = 0, upperBound = 1 << 30, lowerInclusive = false, upperInclusive = true
-  )
-)
-setDefault(maxTrainingSample, SplitterParamsDefault.MaxTrainingSampleDefault)
-
-def setMaxTrainingSample(value: Int): this.type = set(maxTrainingSample, value)
-
-def getMaxTrainingSample: Int = $(maxTrainingSample)

/**
* Fraction to sample minority data
* Value should be > 0.0

@@ -31,8 +31,9 @@
package com.salesforce.op.stages.impl.tuning

import com.salesforce.op.UID
+import com.salesforce.op.stages.impl.selector.ModelSelectorNames
import org.apache.spark.ml.param._
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

case object DataSplitter {
@@ -46,11 +47,13 @@ case object DataSplitter {
*/
def apply(
seed: Long = SplitterParamsDefault.seedDefault,
-reserveTestFraction: Double = SplitterParamsDefault.ReserveTestFractionDefault
+reserveTestFraction: Double = SplitterParamsDefault.ReserveTestFractionDefault,
+maxTrainingSample: Int = SplitterParamsDefault.MaxTrainingSampleDefault
): DataSplitter = {
new DataSplitter()
.setSeed(seed)
.setReserveTestFraction(reserveTestFraction)
+.setMaxTrainingSample(maxTrainingSample)
Collaborator: is this also exposed for the datacutter class?

Collaborator (Author): Yes, I added it to SplitterParams, which DataCutter has access to (e4b8a92), so the same set/get functions can be used across DataBalancer, DataCutter and DataSplitter.

}
}
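A hedged sketch of what that shared SplitterParams surface gives you (the DataCutter and DataBalancer constructor calls below are assumptions, not part of this diff): the same training limit can be configured the same way on all three splitters.

  // sketch only: setMaxTrainingSample comes from the shared SplitterParams trait
  val splitter = DataSplitter(seed = 42L, reserveTestFraction = 0.1, maxTrainingSample = 500000)
  val cutter = DataCutter(seed = 42L).setMaxTrainingSample(500000)
  val balancer = DataBalancer(seed = 42L).setMaxTrainingSample(500000)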

@@ -59,30 +62,70 @@ case object DataSplitter {
*
* @param uid
*/
-class DataSplitter(uid: String = UID[DataSplitter]) extends Splitter(uid = uid) {
+class DataSplitter(uid: String = UID[DataSplitter]) extends Splitter(uid = uid) with DataSplitterParams {

/**
- * Function to set parameters before passing into the validation step
- * eg - do data balancing or dropping based on the labels
+ * Function to set the down sampling fraction and parameters before passing into the validation step
*
* @param data
* @return Parameters set in examining data
*/
override def preValidationPrepare(data: Dataset[Row]): PrevalidationVal = {
-summary = Option(DataSplitterSummary())
+val dataSetSize = data.count()
+val sampleF = getMaxTrainingSample / dataSetSize.toDouble
+val downSampleFraction = math.min(sampleF, SplitterParamsDefault.DownSampleFractionDefault)
+summary = Option(DataSplitterSummary(dataSetSize, downSampleFraction))
+setDownSampleFraction(downSampleFraction)
PrevalidationVal(summary, None)
}

+/**
+ * Rebalance the training data within the validation step
+ *
+ * @param data to prepare for model training. first column must be the label as a double
+ * @return balanced training set and a test set
+ */
+override def validationPrepare(data: Dataset[Row]): Dataset[Row] = {
+
+  val dataPrep = super.validationPrepare(data)
+
+  // check if down sampling is needed
+  val balanced: DataFrame = if (getDownSampleFraction < 1) {
+    dataPrep.sample(false, getDownSampleFraction, getSeed)
+  } else {
+    dataPrep
+  }
+  balanced.persist()
+}
override def copy(extra: ParamMap): DataSplitter = {
val copy = new DataSplitter(uid)
copyValues(copy, extra)
}
}
+trait DataSplitterParams extends Params {
+  /**
+   * Fraction to down sample data
+   * Value should be in [0.0, 1.0]
+   *
+   * @group param
+   */
+  protected[op] final val downSampleFraction = new DoubleParam(this, "downSampleFraction",
+    "fraction to down sample data", ParamValidators.inRange(
+      lowerBound = 0.0, upperBound = 1.0, lowerInclusive = false, upperInclusive = true
+    )
+  )
+  setDefault(downSampleFraction, SplitterParamsDefault.DownSampleFractionDefault)
+
+  protected[op] def setDownSampleFraction(value: Double): this.type = set(downSampleFraction, value)
+
+  protected[op] def getDownSampleFraction: Double = $(downSampleFraction)
+}

/**
- * Empty class because no summary information for a data splitter
+ * Summary for data splitter run for storage in metadata
+ * @param downSamplingFraction down sampling fraction for training set
 */
-case class DataSplitterSummary() extends SplitterSummary {
+case class DataSplitterSummary(preSplitterDataCount: Long, downSamplingFraction: Double) extends SplitterSummary {

/**
* Converts to [[Metadata]]
@@ -94,6 +137,8 @@ case class DataSplitterSummary() extends SplitterSummary {
def toMetadata(skipUnsupported: Boolean): Metadata = {
new MetadataBuilder()
.putString(SplitterSummary.ClassName, this.getClass.getName)
+.putLong(ModelSelectorNames.PreSplitterDataCount, preSplitterDataCount)
+.putDouble(ModelSelectorNames.DownSample, downSamplingFraction)
.build()
}
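To make the numbers concrete, here is the preValidationPrepare arithmetic from above as a worked sketch (only names that appear in this diff are used): with the default limit of 1E6 rows, a 2.5M-row dataset trains on roughly 40% of its rows, while any dataset at or below the limit keeps fraction 1.0 and passes through validationPrepare unsampled.

  // worked example of the down-sample fraction computed in preValidationPrepare
  val maxTrainingSample = SplitterParamsDefault.MaxTrainingSampleDefault // 1E6.toInt
  val dataSetSize = 2500000L // pretend data.count() returned 2.5M rows
  val sampleF = maxTrainingSample / dataSetSize.toDouble // 0.4
  val downSampleFraction = math.min(sampleF, SplitterParamsDefault.DownSampleFractionDefault) // min(0.4, 1.0) = 0.4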


@@ -129,6 +129,24 @@ trait SplitterParams extends Params {
def setReserveTestFraction(value: Double): this.type = set(reserveTestFraction, value)
def getReserveTestFraction: Double = $(reserveTestFraction)

+/**
+ * Maximum size of dataset want to train on.
+ * Value should be > 0.
+ * Default is 1000000.
+ *
+ * @group param
+ */
+final val maxTrainingSample = new IntParam(this, "maxTrainingSample",
+  "maximum size of dataset want to train on", ParamValidators.inRange(
+    lowerBound = 0, upperBound = 1 << 30, lowerInclusive = false, upperInclusive = true
+  )
+)
+setDefault(maxTrainingSample, SplitterParamsDefault.MaxTrainingSampleDefault)
+
+def setMaxTrainingSample(value: Int): this.type = set(maxTrainingSample, value)
+
+def getMaxTrainingSample: Int = $(maxTrainingSample)

final val labelColumnName = new Param[String](this, "labelColumnName",
"label column name, column 0 if not specified")
private[op] def getLabelColumnName = $(labelColumnName)
@@ -143,6 +161,7 @@ object SplitterParamsDefault {
val MaxTrainingSampleDefault = 1E6.toInt
val MaxLabelCategoriesDefault = 100
val MinLabelFractionDefault = 0.0
+val DownSampleFractionDefault = 1.0
}

trait SplitterSummary extends MetadataLike
@@ -152,7 +171,10 @@ private[op] object SplitterSummary {

def fromMetadata(metadata: Metadata): Try[SplitterSummary] = Try {
metadata.getString(ClassName) match {
-case s if s == classOf[DataSplitterSummary].getName => DataSplitterSummary()
+case s if s == classOf[DataSplitterSummary].getName => DataSplitterSummary(
+  preSplitterDataCount = metadata.getLong(ModelSelectorNames.PreSplitterDataCount),
+  downSamplingFraction = metadata.getDouble(ModelSelectorNames.DownSample)
+)
Collaborator: please add the downsample fraction to the datacutter params as well...

Collaborator (Author): I added the downsample fraction into the DataCutter params as part of the multi-class classification training limit changes. I'll create the PR for it today.

case s if s == classOf[DataBalancerSummary].getName => DataBalancerSummary(
positiveLabels = metadata.getLong(ModelSelectorNames.Positive),
negativeLabels = metadata.getLong(ModelSelectorNames.Negative),
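As a hedged round-trip sketch of the DataSplitterSummary branch above (using only the toMetadata/fromMetadata signatures shown in this diff; the example values are made up):

  // serialize the splitter summary to metadata and read it back
  val summary = DataSplitterSummary(preSplitterDataCount = 2000000L, downSamplingFraction = 0.5)
  val meta = summary.toMetadata(skipUnsupported = true)
  SplitterSummary.fromMetadata(meta) // Success(DataSplitterSummary(2000000, 0.5))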

@@ -37,7 +37,7 @@ import com.salesforce.op.stages.impl.CompareParamGrid
import com.salesforce.op.stages.impl.regression.{RegressionModelsToTry => RMT}
import com.salesforce.op.stages.impl.selector.ModelSelectorNames.EstimatorType
import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorSummary}
-import com.salesforce.op.stages.impl.tuning.BestEstimator
+import com.salesforce.op.stages.impl.tuning.{BestEstimator, DataSplitter}
import com.salesforce.op.test.TestSparkContext
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.spark.RichMetadata._
@@ -62,6 +62,7 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext
with CompareParamGrid with OpXGBoostQuietLogging {
val seed = 1234L
val stageNames = "label_prediction"
+val dataCount = 200

import spark.implicits._
val rand = new Random(seed)
@@ -120,9 +121,32 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext

it should "set the data splitting params correctly" in {
val modelSelector = RegressionModelSelector()
-modelSelector.splitter.get.setReserveTestFraction(0.1).setSeed(11L)
+modelSelector.splitter.get.setReserveTestFraction(0.1).setSeed(11L).setMaxTrainingSample(1000)

modelSelector.splitter.get.getSeed shouldBe 11L
modelSelector.splitter.get.getReserveTestFraction shouldBe 0.1
+modelSelector.splitter.get.getMaxTrainingSample shouldBe 1000
}

it should "down-sample when the training set is greater than the maxTrainingSample" in {

implicit val vectorEncoder: org.apache.spark.sql.Encoder[Vector] = ExpressionEncoder()
implicit val e1 = Encoders.tuple(Encoders.scalaDouble, vectorEncoder)
val maxTrainingSample = 100
val sampleF = maxTrainingSample / dataCount.toDouble
val downSampleFraction = math.min(sampleF, 1.0)
val dataSplitter = DataSplitter(maxTrainingSample = maxTrainingSample, seed = seed, reserveTestFraction = 0.0)
val modelSelector =
RegressionModelSelector.withTrainValidationSplit(
modelTypesToUse = Seq(RMT.OpLinearRegression),
dataSplitter = Option(dataSplitter),
seed = seed)
val model = modelSelector.setInput(label, features).fit(data)
val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata())

val modelDownSampleFraction = metaData.dataPrepParameters("downSampleFraction" )

modelDownSampleFraction shouldBe downSampleFraction
}

it should "split into training and test" in {

@@ -43,6 +43,7 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma

val seed = 1234L
val dataCount = 1000
+val trainingLimitDefault = 1E6.toLong

val data =
RandomRDDs.normalVectorRDD(sc, 1000, 3, seed = seed)
@@ -56,6 +57,37 @@
train.count() shouldBe dataCount
}

it should "down-sample when the data count is above the default training limit" in {
val numRows = trainingLimitDefault * 2
val data =
RandomRDDs.normalVectorRDD(sc, numRows, 3, seed = seed)
.map(v => (1.0, Vectors.dense(v.toArray), "A")).toDF()
dataSplitter.preValidationPrepare(data)

val dataBalanced = dataSplitter.validationPrepare(data)
// validationPrepare calls the data sample method that samples the data to a target ratio but there is an epsilon
// to how precise this function is which is why we need to check around that epsilon
val samplingErrorEpsilon = (0.1 * trainingLimitDefault).toLong

dataBalanced.count() shouldBe trainingLimitDefault +- samplingErrorEpsilon
}

it should "set and get all data splitter params" in {
val maxRows = dataCount / 2
val downSampleFraction = maxRows / dataCount.toDouble

val dataSplitter = DataSplitter()
.setReserveTestFraction(0.0)
.setSeed(seed)
.setMaxTrainingSample(maxRows)
.setDownSampleFraction(downSampleFraction)

dataSplitter.getReserveTestFraction shouldBe 0.0
dataSplitter.getDownSampleFraction shouldBe downSampleFraction
dataSplitter.getSeed shouldBe seed
dataSplitter.getMaxTrainingSample shouldBe maxRows
}

Contributor: It's probably worth checking the downSampleFraction param was set here as well, for completeness, so that you have checked everything in DataSplitterParams.

Collaborator (Author): Made changes here: 433d483

it should "split the data in the appropriate proportion - 0.2" in {
val (train, test) = dataSplitter.setReserveTestFraction(0.2).split(data)
math.abs(test.count() - 200) < 30 shouldBe true
@@ -69,10 +101,13 @@
}

it should "keep the data unchanged when prepare is called" in {
+val dataCount = data.count()
val summary = dataSplitter.preValidationPrepare(data)
val train = dataSplitter.validationPrepare(data)
+val sampleF = trainingLimitDefault / dataCount.toDouble
+val downSampleFraction = math.min(sampleF, 1.0)
train.collect().zip(data.collect()).foreach { case (a, b) => a shouldBe b }
-assertDataSplitterSummary(summary.summaryOpt) { s => s shouldBe DataSplitterSummary() }
+assertDataSplitterSummary(summary.summaryOpt) { s => s shouldBe DataSplitterSummary(dataCount, downSampleFraction) }
}

}