[SPARK-17219][ML] enhanced NaN value handling in Bucketizer #15428
Conversation
Test build #66731 has finished for PR 15428 at commit
Force-pushed from a3e4308 to cd8113c
@@ -270,10 +270,10 @@ private[ml] trait HasFitIntercept extends Params {
private[ml] trait HasHandleInvalid extends Params {

  /**
-  * Param for how to handle invalid entries. Options are skip (which will filter out rows with bad values), or error (which will throw an error). More options may be added later.
+  * Param for how to handle invalid entries. Options are skip (which will filter out rows with bad values), or error (which will throw an error), or keep (which will keep the bad values in certain way). More options may be added later.
I'm neutral on the complexity that this adds, but not against it. It gets a little funny to say "keep invalid data" but I think we discussed that on the JIRA
Yeah, usually we treat NaN as a type of invalid value, but we know there are cases where NaN values prove useful, so it's a bit of a dilemma here, as is the API naming.
val bucketizer: UserDefinedFunction = udf { (feature: Double, flag: String) =>
  Bucketizer.binarySearchForBuckets($(splits), feature, flag)
}
val filteredDataset = {
Doesn't this need to try to handle "error"?
val filteredDataSet = getHandleInvalid match {
case "skip" => dataset.na.drop
case "keep" => dataset
case "error" =>
if (...dataset contains NaN...) {
throw new IllegalArgumentException(...)
} else {
dataset
}
}
Nope, actually, NaN will trigger an error later in binarySearchForBuckets as an invalid feature value if no special handling is made.
I don't see that the method handles NaN below. What binarySearch returns is undefined. One place or the other, I think this has to be explicitly handled.
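For concreteness, here is a minimal sketch (in Python, not the actual Spark implementation) of what an explicitly NaN-aware bucket lookup could look like; the function name and the extra-bucket convention mirror the Scala code under review, but the details are illustrative assumptions only:

```python
import bisect
import math

def binary_search_for_buckets(splits, feature, keep_invalid):
    """Map a feature to a bucket index, handling NaN explicitly.

    With n+1 split points there are n regular buckets (0..n-1).
    When keep_invalid is True, NaN lands in an extra bucket n;
    otherwise NaN raises instead of returning an undefined result.
    """
    if math.isnan(feature):
        if keep_invalid:
            return float(len(splits) - 1)  # the reserved extra bucket
        raise ValueError("NaN feature; choose 'keep' or 'skip' handling")
    if feature == splits[-1]:
        return float(len(splits) - 2)  # last regular bucket is right-inclusive
    idx = bisect.bisect_right(splits, feature) - 1
    if idx < 0 or idx >= len(splits) - 1:
        raise ValueError(f"Feature {feature} out of bucket range")
    return float(idx)
```

The point of the sketch is that NaN is dispatched before the binary search ever runs, so the search itself never sees an unordered comparison.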
-private[feature] def binarySearchForBuckets(splits: Array[Double], feature: Double): Double = {
+private[feature] def binarySearchForBuckets
+    (splits: Array[Double], feature: Double, flag: String): Double = {
+  if (feature.isNaN) {
Nit: I think the convention is to leave the open paren on the previous line
Doesn't this need to handle "skip" and "error"? throw an exception on NaN if "error" or ignore it if "skip"?
The logic is: if the user chooses "skip", we filter out all NaN values from the dataset up front, so no further special NaN handling is needed. Otherwise, the original dataset is passed to the binary search that follows; there, if the user chose "keep", an extra bucket is reserved for NaN values, and if not, an error is raised.
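The three-way dispatch described above can be sketched as follows (a hedged Python illustration of the control flow, not Spark code; the function name is made up):

```python
import math

def prepare_rows(rows, handle_invalid):
    """Apply the skip/keep/error policy before bucketizing.

    'skip' drops NaN rows up front, 'keep' passes them through so a
    later extra bucket can absorb them, and 'error' fails fast.
    """
    if handle_invalid == "skip":
        return [r for r in rows if not math.isnan(r)]
    elif handle_invalid == "keep":
        return rows
    elif handle_invalid == "error":
        if any(math.isnan(r) for r in rows):
            raise ValueError("dataset contains NaN and policy is 'error'")
        return rows
    raise ValueError(f"unknown option: {handle_invalid!r}")
```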
Force-pushed from cd8113c to 5cd58b7
Test build #66733 has finished for PR 15428 at commit
Test build #66735 has finished for PR 15428 at commit
I'll let @thunterdb or @jkbradley mostly review from here to see if this is what they intended
Thanks! I'll take a look. Could you please fix the typo in the title? "enchanced" -> "enhanced"
Typo corrected. Thank you all. @srowen @jkbradley
Done with review. Thanks for the PR!
Other comments:
- Add unit test for DataFrameStatFunctions.approxQuantile to check for handling of NaN values.
- Create JIRA for adding handleInvalid to Python API.
private[feature] def binarySearchForBuckets(splits: Array[Double], feature: Double): Double = {
  if (feature.isNaN) {
private[feature] def binarySearchForBuckets(
    splits: Array[Double],
fix indentation
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
  transformSchema(dataset.schema)
  val bucketizer = udf { feature: Double =>
    Bucketizer.binarySearchForBuckets($(splits), feature)
  val bucketizer: UserDefinedFunction = udf { (feature: Double, flag: String) =>
Use Boolean flag internally to avoid the string comparison on each call.
Also, at this point, you already know the value for flag, so you can just use it here, rather than making the UDF take an extra argument.
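The reviewer's point can be shown in miniature (a Python sketch with hypothetical names, not the Spark API): resolve the string option to a boolean once when the function is built, so the per-row closure never repeats the string comparison and needs no extra argument.

```python
import bisect
import math

def make_bucketize(splits, handle_invalid):
    keep_invalid = handle_invalid == "keep"  # string compared once, here

    def bucketize(feature):
        # per-row work sees only the captured boolean and splits
        if math.isnan(feature):
            if keep_invalid:
                return float(len(splits) - 1)  # reserved extra bucket
            raise ValueError("NaN feature with 'error' handling")
        if feature == splits[-1]:
            return float(len(splits) - 2)
        return float(bisect.bisect_right(splits, feature) - 1)

    return bucketize
```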
 * @group param
 */
final val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", "how to handle invalid entries. Options are skip (which will filter out rows with bad values), or error (which will throw an error). More options may be added later", ParamValidators.inArray(Array("skip", "error")))
final val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", "how to handle invalid entries. Options are skip (which will filter out rows with bad values), or error (which will throw an error), or keep (which will keep the bad values in certain way). More options may be added later", ParamValidators.inArray(Array("skip", "keep", "error")))
Since this is used by StringIndexer, which will not support "keep," we should not modify HasHandleInvalid. I recommend copying out the shared param and just putting it in Bucketizer and QuantileDiscretizer, rather than trying to reuse HasHandleInvalid. That will let you specialize the documentation too so that you can be more specific.
HasHandleInvalid may not be a good shared Param yet, but perhaps in the future..
distinct values of the input to create enough distinct quantiles. Note also that NaN values are
handled specially and placed into their own bucket. For example, if 4 buckets are used, then
non-NaN data will be put into buckets[0-3], but NaNs will be counted in a special bucket[4].
distinct values of the input to create enough distinct quantiles. Note also that QuantileDiscretizer
(same as below) Is "possible that the number of buckets used will be less than this value" true? It was true before this used Dataset.approxQuantiles, but I don't think it is true any longer.
In cases where the number of buckets requested by the user is greater than the number of distinct splits generated by the Bucketizer, the returned number of buckets will be less than requested.
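A small worked example of that effect (illustrative Python, not the approxQuantile implementation): when quantile boundaries repeat on skewed data, deduplicating them leaves fewer buckets than requested.

```python
def distinct_splits(candidates):
    """Collapse repeated quantile boundaries into distinct split points."""
    out = []
    for s in candidates:
        if not out or s != out[-1]:
            out.append(s)
    return out

# 4 buckets requested means 5 boundaries, but heavy ties collapse them,
# leaving len(splits) - 1 = 2 usable buckets:
splits = distinct_splits([0.0, 1.0, 1.0, 1.0, 5.0])
```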
Yep, you're right
if ("skip" == getHandleInvalid) {
  dataset.na.drop
} else {
  dataset
I'd put dataset.toDF() here and not import existentials.
 * too few distinct values of the input to create enough distinct quantiles. Note also that
 * QuantileDiscretizer will raise an error when it finds NaN value in the dataset, but user can
 * also choose to either keep or remove NaN values within the dataset by calling setHandleInvalid.
 * If user chooses to keep NaN values, they will be handled specially and placed into their own
This documentation should go in the handleInvalid Param doc string.
@@ -114,6 +115,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
  .setInputCol("feature")
Just set splits here; there's no need to set the other Params for this test.
val observedNumBuckets = result.select("result").distinct.count
// Reserve extra one bucket for NaN values
discretizer.setHandleInvalid("keep")
var expectedNumBuckets = discretizer.fit(df).getSplits.length - 1
I'd also recommend directly testing the values of the splits. The current test makes sure that handleInvalid is passed to the bucketizer correctly, which is important but separate.
Also, please use vals (not vars) for clarity. I'd recommend making a helper method for lines 87-93, which can be reused for the test of handleInvalid = "skip"
// Remove NaN values
discretizer.setHandleInvalid("skip")
expectedNumBuckets = discretizer.fit(df).getSplits.length - 2
Same here; I'd also recommend directly testing the values of the splits.
@@ -1157,9 +1157,11 @@ class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadab
categorical features. The number of bins can be set using the :py:attr:`numBuckets` parameter.
It is possible that the number of buckets used will be less than this value, for example, if
Here too: no longer the case
same as the comment above.
Force-pushed from 5cd58b7 to c350e9f
Test build #67231 has finished for PR 15428 at commit
Force-pushed from c350e9f to 0fb8d38
Thanks for your valuable suggestions. @jkbradley @srowen
Force-pushed from 0fb8d38 to d1dd840
Test build #67232 has finished for PR 15428 at commit
Test build #67233 has finished for PR 15428 at commit
val bucketizer = udf { feature: Double =>
  Bucketizer.binarySearchForBuckets($(splits), feature)

val bucketizer: UserDefinedFunction = udf { (feature: Double) =>
Ah, sorry, one more comment. I'm not quite sure how closure capture behaves currently, but it might be good to define local vals for $(splits) and getHandleNaN.isDefined && getHandleNaN.get. Since these reference methods in the Bucketizer class, I believe the UDF may capture the whole Bucketizer class instead of just those vals. After you define them in local vals here, you can use those vals in this UDF.
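The capture concern generalizes beyond Scala. As a rough Python analogue (hypothetical class, for illustration only): a closure that reads an attribute through self keeps the whole object alive, while copying the value into a local first captures only that value.

```python
class Discretizer:
    def __init__(self, splits):
        self.splits = splits
        self.big_payload = list(range(100_000))  # would be dragged along

    def udf_via_self(self):
        # closure references self, so serializing it would ship the object
        return lambda x: len(self.splits)

    def udf_via_local(self):
        local_splits = self.splits  # copy the reference into a local
        return lambda x: len(local_splits)  # closure holds only the list
```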
Thanks for the updates! A few more comments.
Also, I see you renamed the param to handleNaN. Do you think we will ever want to handle null values too? It might be good to stick with "handleInvalid" in case we add null support.
for (handleNaN <- Array("keep", "skip")) {
  discretizer.setHandleNaN(handleNaN)
  val expectedNumBuckets = {
This is kind of complex logic just to get the expected number of splits. I'd recommend just putting the expected bucket values in the original DataFrame:
val df = sc.parallelize(Seq(
(1.0, /*expected value for option "keep"*/, /*expected value for option "skip"*/),
...
)).toDF("input", "expectedKeep", "expectedSkip")
Then you can compare with the actual values. That'll be an easier test to understand IMO.
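A toy version of that table-driven test shape (a Python sketch; the mini bucketizer and column layout are made up for illustration, with None standing in for a dropped "skip" row):

```python
import bisect
import math

def bucketize(x, splits, keep_invalid):
    """Tiny bucketizer; None stands in for a dropped ('skip') row."""
    if math.isnan(x):
        return float(len(splits) - 1) if keep_invalid else None
    if x == splits[-1]:
        return float(len(splits) - 2)
    return float(bisect.bisect_right(splits, x) - 1)

# each row carries its input plus the expected value per option
rows = [
    (0.5, 0.0, 0.0),            # (input, expectedKeep, expectedSkip)
    (1.5, 1.0, 1.0),
    (float("nan"), 2.0, None),  # NaN: extra bucket vs dropped
]
splits = [0.0, 1.0, 2.0]
for x, expected_keep, expected_skip in rows:
    assert bucketize(x, splits, keep_invalid=True) == expected_keep
    assert bucketize(x, splits, keep_invalid=False) == expected_skip
```

Keeping the expected outputs next to each input makes the test self-explaining, which is the reviewer's point.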
that NaN values are handled specially and placed into their own bucket. For example, if 4
buckets are used, then non-NaN data will be put into buckets(0-3), but NaNs will be counted in
a special bucket(4).
that QuantileDiscretizer will raise an error when it finds NaN value in the dataset, but user
Actually no need to update Python API until it is updated to include handleNaN
This isn't available in Python yet, so can you please revert this change to feature.py?
 * @group param
 */
@Since("2.1.0")
val handleNaN: Param[String] = new Param[String](this, "handleNaNs", "how to handle NaN" +
Add getHandleNaN
 * Param for how to handle NaN entries. Options are skip (which will filter out rows with
 * NaN values), or error (which will throw an error), or keep (which will make the NaN
 * values an extra bucket). More options may be added later.
 *
State default
/**
 * Param for how to handle NaN entries. Options are skip (which will filter out rows with
 * NaN values), or error (which will throw an error), or keep (which will make the NaN
 * values an extra bucket). More options may be added later.
Remove "More options may be added later."
Also state default.
val handleNaN: Param[String] = new Param[String](this, "handleNaNs", "how to handle NaN" +
  "entries. Options are skip (which will filter out rows with NaN values), or error" +
  "(which will throw an error), or keep (which will make the NaN values an extra bucket)." +
  "More options may be added later", ParamValidators.inArray(Array("skip", "error", "keep")))
Remove "More options may be added later."
Force-pushed from d1dd840 to 47aad24
Test build #67386 has finished for PR 15428 at commit
Force-pushed from 47aad24 to 70cee57
This PR is an enhancement of the PR with commit ID 57dc326bd00cf0a49da971e9c573c48ae28acaa2. We provide the user with 3 options when dealing with NaN values in the dataset: reserve an extra bucket for NaN values, remove the NaN values, or report an error, by setting handleInvalid to "keep", "skip", or "error" (default) respectively.

Before:

    val bucketizer: Bucketizer = new Bucketizer()
      .setInputCol("feature")
      .setOutputCol("result")
      .setSplits(splits)

After:

    val bucketizer: Bucketizer = new Bucketizer()
      .setInputCol("feature")
      .setOutputCol("result")
      .setSplits(splits)
      .setHandleNaN("skip")

Signed-off-by: VinceShieh <vincent.xie@intel.com>
Force-pushed from 70cee57 to b14fbab
Test build #67430 has finished for PR 15428 at commit
Test build #67429 has finished for PR 15428 at commit
Thanks for the updates.
Also, when you push further updates, could you please just push a new commit? Rebasing makes it harder to review since it makes it impossible to connect your most recent changes with the recent reviewer comments. Thanks!
/**
 * Param for how to handle invalid entries. Options are skip (which will filter out rows with
 * invalid values), or error (which will throw an error), or keep (which will keep the invalid
 * values in certain way). Default behaviour is to report an error for invalid entries.
I'd just write: default: "error"
Rewording as "report" instead of "throw" could confuse people.
/** @group getParam */
@Since("2.1.0")
def gethandleInvalid: Option[Boolean] = $(handleInvalid) match {
This should just return $(handleInvalid), just like any other Param getter method.
/**
 * Param for how to handle invalid entries. Options are skip (which will filter out rows with
 * invalid values), or error (which will throw an error), or keep (which will keep the invalid
 * values in certain way). Default behaviour is to report an error for invalid entries.
I'd just write: default: "error"
Rewording as "report" instead of "throw" could confuse people.
Test build #67493 has finished for PR 15428 at commit
Thanks, you still need to remove the change in feature.py, but other than that, this should be ready.
Signed-off-by: VinceShieh <vincent.xie@intel.com>
Sorry, I must have forgotten to commit the changes.
Test build #67547 has finished for PR 15428 at commit
…naming of set/get for handleInvalid
Thanks for the update! On a last glance through, I spotted a few things to fix up. Since some had to do with wording, I thought it'd be easier just to send a PR to your PR. Can you please review it and merge it if it looks OK to you? Thanks! Here it is: https://github.com/VinceShieh/pull/2
Test build #67611 has finished for PR 15428 at commit
Thanks for merging that! This LGTM
## What changes were proposed in this pull request?

This PR is an enhancement of the PR with commit ID 57dc326bd00cf0a49da971e9c573c48ae28acaa2. NaN is a special type of value that is commonly seen as invalid, but there are certain cases where NaN values are also valuable and thus need special handling. We provide the user with 3 options when dealing with NaN values: reserve an extra bucket for NaN values, remove the NaN values, or report an error, by setting handleNaN to "keep", "skip", or "error" (default) respectively.

Before:

    val bucketizer: Bucketizer = new Bucketizer()
      .setInputCol("feature")
      .setOutputCol("result")
      .setSplits(splits)

After:

    val bucketizer: Bucketizer = new Bucketizer()
      .setInputCol("feature")
      .setOutputCol("result")
      .setSplits(splits)
      .setHandleNaN("keep")

## How was this patch tested?

Tests added in QuantileDiscretizerSuite, BucketizerSuite and DataFrameStatSuite

Signed-off-by: VinceShieh <vincent.xie@intel.com>
Author: VinceShieh <vincent.xie@intel.com>
Author: Vincent Xie <vincent.xie@intel.com>
Author: Joseph K. Bradley <joseph@databricks.com>

Closes apache#15428 from VinceShieh/spark-17219_followup.
…r in pyspark This PR is to document the change on QuantileDiscretizer in pyspark for PR: apache#15428 Signed-off-by: VinceShieh <vincent.xie@intel.com>
…r in pyspark

## What changes were proposed in this pull request?

This PR is to document the changes on QuantileDiscretizer in pyspark for PR: apache#15428

## How was this patch tested?

No test needed

Signed-off-by: VinceShieh <vincent.xie@intel.com>
Author: VinceShieh <vincent.xie@intel.com>

Closes apache#16922 from VinceShieh/spark-19590.
What changes were proposed in this pull request?
This PR is an enhancement of the PR with commit ID 57dc326bd00cf0a49da971e9c573c48ae28acaa2.
NaN is a special type of value that is commonly seen as invalid, but we find that there are certain cases where NaN values are also valuable and thus need special handling. We provide the user with 3 options when dealing with NaN values: reserve an extra bucket for NaN values, remove the NaN values, or report an error, by setting handleNaN to "keep", "skip", or "error" (default) respectively.
Before:

    val bucketizer: Bucketizer = new Bucketizer()
      .setInputCol("feature")
      .setOutputCol("result")
      .setSplits(splits)

After:

    val bucketizer: Bucketizer = new Bucketizer()
      .setInputCol("feature")
      .setOutputCol("result")
      .setSplits(splits)
      .setHandleNaN("keep")
How was this patch tested?
Tests added in QuantileDiscretizerSuite, BucketizerSuite and DataFrameStatSuite
Signed-off-by: VinceShieh <vincent.xie@intel.com>