Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-10641][SQL] Add Skewness and Kurtosis Support #9003

Closed
wants to merge 22 commits into from

Conversation

sethah
Copy link
Contributor

@sethah sethah commented Oct 6, 2015

Implementing skewness and kurtosis support based on following algorithm:
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics

@sethah
Copy link
Contributor Author

sethah commented Oct 6, 2015

A few notes:

  • I wrote this implementation before stddev was merged, and hence I ended up with a slightly different implementation. The differences are mostly syntactical (the algorithms for computing second moment are the same), except for how the subclasses are implemented. Once we figure out the best way to implement the subclasses, I will merge the implementations to remove the overlap
  • SPARK-10953 Calls for benchmarking codegen vs scala implementations of univariate stats. The results of that may affect this PR.
  • I did not implement the skewness and kurtosis functions using the old way with AggregateFunction1, since I think that will be removed. I can implement this separately if needed.

}
}

//case class Average(child: Expression) extends CentralMomentAgg(child) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aggregate functions which depend on lower order moments can easily be implemented using the CentralMomentAgg base class. I have commented them out for now, but left them for discussion purposes.

@mengxr
Copy link
Contributor

mengxr commented Oct 19, 2015

@sethah, are you working on this? My suggestion is to implement skewness and kurtosis in a single aggregate function that implements ImperativeAggregate based on the discussion in SPARK-10953, but do not change existing implementation of standard deviation and variance. This makes the PR simpler and easier to review and merge.

@sethah
Copy link
Contributor Author

sethah commented Oct 19, 2015

@mengxr I am working on it, and I have incorporated changes from the note you posted on the Jira - thanks for that! I pushed some changes. I had already changed the existing implementation of stddev unfortunately, but I can revert those changes so that the PR is a bit smaller. I was thinking that since variance hasn't been merged yet, that I could include it in the PR, but if you think it's best to leave it out I can do that as well.

I tested the imperative versions vs the codegen versions I had before and they are significantly faster. I will work on getting the changes for stddev reverted so that it will be ready for review.

val updateValue = v match {
case d: java.lang.Number => d.doubleValue()
case _ => 0.0
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please double check if it will handle all numeric types correctly, e.g: Decimal?

@mengxr
Copy link
Contributor

mengxr commented Oct 19, 2015

@sethah Besides inline comments, we can also think about how to simplify the class inheritance. It would be better if we don't need to touch InternalRow in subclasses. Also, Variance is the same as VarianceSamp, shall we make one inherit another with a different prettyName?

@mengxr
Copy link
Contributor

mengxr commented Oct 20, 2015

add to whitelist

@sethah
Copy link
Contributor Author

sethah commented Oct 20, 2015

@mengxr I think having VarianceSamp inherit from Variance will be a fine solution. I'm not clear on why it is better not to touch InternalRow in the subclasses, but if we must avoid it, one solution could be moving to a single eval method in the base class and pattern matching on the subclass:

def eval(buffer: InternalRow): Any = this match {
      case _: VariancePop => { }
      case _: Variance => { }
      case _: Skewness => { }
      case _: Kurtosis => { }

@SparkQA
Copy link

SparkQA commented Oct 20, 2015

Test build #43961 has finished for PR 9003 at commit b180a28.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate with Serializable\n * case class Stddev(child: Expression) extends CentralMomentAgg(child)\n * case class StddevSamp(child: Expression) extends CentralMomentAgg(child)\n * case class StddevPop(child: Expression) extends CentralMomentAgg(child)\n * case class Variance(child: Expression) extends CentralMomentAgg(child)\n * case class VarianceSamp(child: Expression) extends CentralMomentAgg(child)\n * case class VariancePop(child: Expression) extends CentralMomentAgg(child)\n * case class Skewness(child: Expression) extends CentralMomentAgg(child)\n * case class Kurtosis(child: Expression) extends CentralMomentAgg(child)\n * abstract class CentralMomentAgg1(child: Expression)\n * case class Kurtosis(child: Expression) extends CentralMomentAgg1(child)\n * case class Skewness(child: Expression) extends CentralMomentAgg1(child)\n * case class Variance(child: Expression) extends CentralMomentAgg1(child)\n * case class VariancePop(child: Expression) extends CentralMomentAgg1(child)\n * case class VarianceSamp(child: Expression) extends CentralMomentAgg1(child)\n

@mengxr
Copy link
Contributor

mengxr commented Oct 20, 2015

@sethah The storage of InternalRow is an implicit contract. Exposing InternalRow to subclasses makes the code harder to understand. For example, to confirm the following code, one needs to go back to the implementation of CentralMomentAgg.

val M0 = buffer.getDouble(mutableAggBufferOffset)
val M2 = buffer.getDouble(mutableAggBufferOffset + 2)
val M4 = buffer.getDouble(mutableAggBufferOffset + 4)

I suggest adding an abstract method to CentralMomemtAgg called Double getStatistic(n: Double, mean: Double, centralMoments: Array[Double]) and implement eval in CentralMomentAgg:

override final def eval(buffer: InternalRow): Any = {
  val n = buffer.getDouble(offset)
  val mean = buffer.getDouble(offset + 1)
  val moments = Array.ofDim[Double](maxMomemt)
  moments[0] = 1.0
  moments[1] = 0.0
  // fill in other moments up to maxMoment
  getStatistic(n, mean, moments)
}

Then we can hide the storage of InternalRow from subclasses.

@SparkQA
Copy link

SparkQA commented Oct 21, 2015

Test build #44039 has finished for PR 9003 at commit 4ae234e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 22, 2015

Test build #44179 has finished for PR 9003 at commit 3ef2faa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate with Serializable\n * abstract class SecondMoment(child: Expression) extends CentralMomentAgg(child)\n * case class Variance(child: Expression, mutableAggBufferOffset: Int = 0,\n * case class VarianceSamp(child: Expression, mutableAggBufferOffset: Int = 0,\n * case class VariancePop(child: Expression, mutableAggBufferOffset: Int = 0,\n * case class Skewness(child: Expression, mutableAggBufferOffset: Int = 0,\n * case class Kurtosis(child: Expression, mutableAggBufferOffset: Int = 0,\n * case class Kurtosis(child: Expression) extends UnaryExpression with AggregateExpression\n * case class Skewness(child: Expression) extends UnaryExpression with AggregateExpression\n * case class Variance(child: Expression) extends UnaryExpression with AggregateExpression\n * case class VariancePop(child: Expression) extends UnaryExpression with AggregateExpression\n * case class VarianceSamp(child: Expression) extends UnaryExpression with AggregateExpression\n

/**
* Compute aggregate statistic from sufficient moments.
* @param centralMoments Length `momentOrder + 1` array of central moments needed to
* compute the aggregate stat.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is useful to mention that whether they are divided by n or not.

@mengxr
Copy link
Contributor

mengxr commented Oct 23, 2015

LGTM except minor inline comments. Ping @yhuai for a final pass.

@SparkQA
Copy link

SparkQA commented Oct 23, 2015

Test build #44234 has finished for PR 9003 at commit fd3f4d6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

n = n1 + n2
buffer1.setDouble(nOffset, n)
delta = mean2 - mean1
deltaN = if (n == 0.0) 0.0 else delta / n
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed divide by zero case here, which was causing problems when number of partitions > number of samples.

@SparkQA
Copy link

SparkQA commented Oct 24, 2015

Test build #44267 has finished for PR 9003 at commit cf8a14b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate with Serializable\n * case class Variance(child: Expression,\n * case class VarianceSamp(child: Expression,\n * case class VariancePop(child: Expression,\n * case class Skewness(child: Expression,\n * case class Kurtosis(child: Expression,\n * case class Kurtosis(child: Expression) extends UnaryExpression with AggregateExpression\n * case class Skewness(child: Expression) extends UnaryExpression with AggregateExpression\n * case class Variance(child: Expression) extends UnaryExpression with AggregateExpression\n * case class VariancePop(child: Expression) extends UnaryExpression with AggregateExpression\n * case class VarianceSamp(child: Expression) extends UnaryExpression with AggregateExpression\n

@SparkQA
Copy link

SparkQA commented Oct 24, 2015

Test build #44272 has finished for PR 9003 at commit b86386a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mengxr
Copy link
Contributor

mengxr commented Oct 25, 2015

@sethah It seems that VARIANCE in HIVE returns the population variance. It is the same in MySQL: https://dev.mysql.com/doc/refman/5.0/en/group-by-functions.html#function_variance. So let's keep the same behavior. Then it is a little awkward that stddev is sample variance in our implementation. HIVE doesn't have stddev, while MySQL uses population stddev for stddev. I think it is more important to be compatible with existing SQL engines. We can address the stddev in a follow-up PR.

@sethah
Copy link
Contributor Author

sethah commented Oct 26, 2015

@mengxr Corrected variance to yield the population variance. Tests should pass now.

@SparkQA
Copy link

SparkQA commented Oct 26, 2015

Test build #44366 has finished for PR 9003 at commit 3045e3b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate with Serializable\n * case class Variance(child: Expression,\n * case class VarianceSamp(child: Expression,\n * case class VariancePop(child: Expression,\n * case class Skewness(child: Expression,\n * case class Kurtosis(child: Expression,\n * case class Kurtosis(child: Expression) extends UnaryExpression with AggregateExpression\n * case class Skewness(child: Expression) extends UnaryExpression with AggregateExpression\n * case class Variance(child: Expression) extends UnaryExpression with AggregateExpression\n * case class VariancePop(child: Expression) extends UnaryExpression with AggregateExpression\n * case class VarianceSamp(child: Expression) extends UnaryExpression with AggregateExpression\n

@sethah
Copy link
Contributor Author

sethah commented Oct 26, 2015

I guess this is still failing HiveComparisonTest due to small error in the variance.

[info] !== HIVE - 1 row(s) == == CATALYST - 1 row(s) ==
[info] !500 130091 260.182 0 498 142.92680950752384 20428.072876000006 500 130091 260.182 0 498 142.92680950752384 20428.07287599999 (HiveComparisonTest.scala:433)

abs(20428.072876000006 - 20428.07287599999) = 1.4552e-11

Not sure if we need to change the test?

@mengxr
Copy link
Contributor

mengxr commented Oct 27, 2015

Had an offline discussion with @yhuai. We can remove fetch_aggregate from whitelist. We already tested all the functions there in SQL queries. We can create a new JIRA to add it back and handle small numerical differences, but it should be orthogonal to this PR.

Btw, we can remove WIP from the PR title now.

override def prettyName: String = "variance_samp"

override def toString: String = s"VAR_SAMP($child)"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will be the error message if we call this function when spark.sql.useAggregate2=false? It will be good to provide a meaning error message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yhuai This is the error I get when calling one of the new skewness on a dataframe when spark.sql.useAggregate2=false:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost): org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree:{colName}#2

I'm not exactly sure where we ought to throw the error. I'd appreciate any tips on how to do this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change with AggregateExpression to with AggregateExpression1? Then, in the newInstance method, we throw an UnsupportedOperationException to let users know that they need to set spark.sql.useAggregate2 to true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yhuai Thanks for the suggestion! I think that's working now if you want to review it :)

@sethah sethah changed the title [SPARK-10641][WIP][SQL] Add Skewness and Kurtosis Support [SPARK-10641][SQL] Add Skewness and Kurtosis Support Oct 29, 2015
@SparkQA
Copy link

SparkQA commented Oct 29, 2015

Test build #44595 has finished for PR 9003 at commit ff363cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate with Serializable\n * case class Variance(child: Expression,\n * case class VarianceSamp(child: Expression,\n * case class VariancePop(child: Expression,\n * case class Skewness(child: Expression,\n * case class Kurtosis(child: Expression,\n * case class Kurtosis(child: Expression) extends UnaryExpression with AggregateExpression\n * case class Skewness(child: Expression) extends UnaryExpression with AggregateExpression\n * case class Variance(child: Expression) extends UnaryExpression with AggregateExpression\n * case class VariancePop(child: Expression) extends UnaryExpression with AggregateExpression\n * case class VarianceSamp(child: Expression) extends UnaryExpression with AggregateExpression\n

@SparkQA
Copy link

SparkQA commented Oct 29, 2015

Test build #44611 has finished for PR 9003 at commit f49ce5c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate with Serializable\n * case class Variance(child: Expression,\n * case class VarianceSamp(child: Expression,\n * case class VariancePop(child: Expression,\n * case class Skewness(child: Expression,\n * case class Kurtosis(child: Expression,\n * case class Kurtosis(child: Expression) extends UnaryExpression with AggregateExpression1\n * case class Skewness(child: Expression) extends UnaryExpression with AggregateExpression1\n * case class Variance(child: Expression) extends UnaryExpression with AggregateExpression1\n * case class VariancePop(child: Expression) extends UnaryExpression with AggregateExpression1\n * case class VarianceSamp(child: Expression) extends UnaryExpression with AggregateExpression1\n

@asfgit asfgit closed this in a01cbf5 Oct 29, 2015
@mengxr
Copy link
Contributor

mengxr commented Oct 29, 2015

Merge into master. Thanks! @JihongMA @sethah Could you send a PR to replace Stddev implementation using CentralMomentAgg? I ran the following and the difference is significant:

val df = sqlContext.range(100000000)
df.select(var_samp("id")).show();
df.select(stddev_samp("id")).show()

@JihongMA
Copy link
Contributor

Thanks @mengxr , I will send a PR for Stddev.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants