
[SPARK-36553][ML] KMeans avoid compute auxiliary statistics for large K #35457

Closed
wants to merge 3 commits

Conversation

zhengruifeng
Contributor

What changes were proposed in this pull request?

SPARK-31007 introduced auxiliary statistics to speed up computation in KMeans.

However, it needs an array of size k * (k + 1) / 2, which may cause integer overflow or OOM when k is too large.

So we should skip this optimization in that case.
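As a rough sketch of the approach (the object, method names, and threshold here are illustrative, not Spark's actual code), the guard computes the packed-array size in Long to avoid Int overflow, and skips the optimization when it is too large:

```scala
// Illustrative sketch of the size guard; real Spark names and thresholds differ.
object KMeansStatsGuard {
  // The packed upper-triangular statistics array needs k * (k + 1) / 2 entries.
  // Compute the size in Long first so large k does not overflow Int.
  def packedSize(k: Int): Long = k.toLong * (k + 1) / 2

  // Skip the auxiliary-statistics optimization when the packed array would be
  // too large to allocate and broadcast (this threshold is hypothetical).
  val MaxEntries: Long = 1000000L

  def shouldComputeStatistics(k: Int): Boolean = packedSize(k) <= MaxEntries
}
```

For k = 50,000, packedSize returns 1,250,025,000 entries, so under a guard like this the optimization would be skipped and the plain distance search used instead.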

Why are the changes needed?

Avoid overflow or OOM when k is too large (e.g. 50,000).

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing test suites.

@github-actions github-actions bot added the MLLIB label Feb 9, 2022
init

init
@anders-rydbirk

@zhengruifeng Thanks for picking this one up!

@zhengruifeng
Contributor Author

cc @srowen

Member

@srowen srowen left a comment

I don't love the two code paths but maybe this is the easiest fix. It un-optimizes large k though. Is the idea to un-pack that triangular array not viable?

@@ -117,6 +117,17 @@ private[spark] abstract class DistanceMeasure extends Serializable {
packedValues
}

def findClosest(
Member:

Is this overload used?

Contributor Author:

It is used in both training and prediction; the statistics argument is optional in it.

Member:

OK this is a new method but I don't see it called, maybe I'm missing something

@zhengruifeng
Contributor Author

zhengruifeng commented Feb 10, 2022

Since the matrix is symmetric, if we un-pack it, then we will get an even bigger matrix of size k * k.

#27758 (comment)

@srowen
Member

srowen commented Feb 10, 2022

Sorry, I guess I mean make it into an array of arrays, not one big array.

@zhengruifeng
Contributor Author

I think I made it too complex.

According to your description in the ticket, @anders-rydbirk:

Possible workaround:

    Roll back to Spark 3.0.0 since a KMeansModel generated with 3.0.0 cannot be loaded in 3.1.1.
    Reduce K. Currently trying with 45000.

maybe we just need to change k * (k + 1) / 2 to (k.toLong * (k + 1) / 2).toInt?

scala> val k = 50000
val k: Int = 50000

scala> k * (k + 1) / 2
val res8: Int = -897458648

scala> (k.toLong * (k + 1) / 2).toInt
val res9: Int = 1250025000

scala> val k = 45000
val k: Int = 45000

scala> k * (k + 1) / 2
val res10: Int = 1012522500

scala> (k.toLong * (k + 1) / 2).toInt
val res11: Int = 1012522500

Sorry, I guess I mean make it into an array of arrays, not one big array.

@srowen yes, using arrays of sizes (1, 2, ..., k) is another choice
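That arrays-of-arrays alternative could be sketched like this (purely illustrative; this is not what the PR ends up implementing):

```scala
// Illustrative sketch: store the lower triangle as an array of row arrays of
// lengths 1, 2, ..., k, instead of one huge flat allocation.
def buildTriangular(k: Int): Array[Array[Double]] =
  Array.tabulate(k)(i => new Array[Double](i + 1))

// Entry (i, j) with j <= i lives at tri(i)(j); total entries are still
// k * (k + 1) / 2, just split across k smaller allocations.
val tri = buildTriangular(4)
// tri.map(_.length) is Array(1, 2, 3, 4); sum is 10 = 4 * 5 / 2
```

This sidesteps the single-array Int.MaxValue length limit, but as noted below it does not reduce the total memory needed.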

@srowen
Member

srowen commented Feb 10, 2022

Array sizes can't be Long, so if it doesn't fit in an Int it won't work.

@zhengruifeng
Contributor Author

There are two limits:

1. the array size must be less than Int.MaxValue;

2. the array must fit in memory for initialization and broadcasting.

With --driver-memory=8G, I cannot create an array of 1,250,025,000 doubles. If we switch to arrays of arrays, I am afraid it is still prone to OOM for large K.
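A back-of-the-envelope estimate shows why that allocation fails on an 8G driver:

```scala
// Rough memory cost of the flat packed statistics array for k = 50,000.
val k = 50000
val entries = k.toLong * (k + 1) / 2   // 1,250,025,000 entries
val bytes   = entries * 8L             // 8 bytes per Double => 10,000,200,000 bytes
val gib     = bytes.toDouble / (1L << 30)
// gib is about 9.3 GiB for the raw doubles alone, more than an 8G driver heap.
```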

@zhengruifeng
Contributor Author

@srowen

I can switch to Array[Array[Double]] if you prefer it; I am neutral on it.

My main concern is that this optional statistics array may be too large. In this case (k=50,000), it is much larger than the cluster centers themselves (dim=3).

@srowen
Member

srowen commented Feb 15, 2022

Your current design is fine, I trust your judgment

} else {
findClosest(centers, point)
}
}
Member:

Shall we add the function description like the other existing def findClosest functions?

Contributor Author:

good point. I will update this PR

val k = clusterCenters.length
val numFeatures = clusterCenters.head.size
if (DistanceMeasure.shouldComputeStatistics(k) &&
DistanceMeasure.shouldComputeStatisticsLocally(k, numFeatures)) {
Member:

indentation?

@zhengruifeng
Contributor Author

I think this should also be back-ported to 3.1/3.2

Member

@srowen srowen left a comment

It's adding complexity but I think for a reasonable reason, to fix a perf regression

@@ -117,6 +117,17 @@ private[spark] abstract class DistanceMeasure extends Serializable {
packedValues
}

def findClosest(
Member:

OK this is a new method but I don't see it called, maybe I'm missing something

@zhengruifeng
Contributor Author


@srowen It is used in both training (on the .ml side) and prediction (on the .mllib side); the switch is done by just changing the type of stats in distanceMeasureInstance.findClosest(centers, stats, point) from Array[Double] to Option[Array[Double]].

@srowen
Member

srowen commented Feb 21, 2022

Do existing call sites bind to the new method? I can't see how a new method is called when nothing new calls it, but if you understand it and it works, never mind.

@zhengruifeng
Contributor Author

Do existing call sites bind to the new method?

No.

The existing two methods are used in DistanceMeasure and DistanceMeasureSuite;

but def findClosest(centers: Array[VectorWithNorm], point: VectorWithNorm) is also used in the KMeans initialization algorithm initKMeansParallel and in BisectingKMeans.


// Execute iterations of Lloyd's algorithm until converged
while (iteration < maxIterations && !converged) {
val bcCenters = sc.broadcast(centers)
val stats = if (shouldDistributed) {
Contributor Author:

Previously, stats was an Array[Double].


// Execute iterations of Lloyd's algorithm until converged
while (iteration < maxIterations && !converged) {
val bcCenters = sc.broadcast(centers)
val stats = if (shouldDistributed) {
distanceMeasureInstance.computeStatisticsDistributedly(sc, bcCenters)
val stats = if (shouldComputeStats) {
Contributor Author:

Now it is an Option[Array[Double]].

Contributor Author:

So the following val (bestCenter, cost) = distanceMeasureInstance.findClosest(centers, stats, point) will call this new method, without any code change at the call sites.
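The compile-time dispatch being described can be illustrated with a toy example (hypothetical signatures, not Spark's actual ones): changing the declared type of stats is enough to make the unchanged call expression bind to the new overload.

```scala
// Toy illustration of Scala overload resolution on the `stats` parameter type.
object Dist {
  def findClosest(stats: Array[Double]): String =
    "old overload"

  def findClosest(stats: Option[Array[Double]]): String = stats match {
    case Some(_) => "new overload, with statistics"
    case None    => "new overload, plain search"
  }
}

val stats: Option[Array[Double]] = None
// The call-site text is unchanged; the static type of `stats` selects the overload.
Dist.findClosest(stats)  // "new overload, plain search"
```

Because overloads are resolved statically, no caller needs to be edited: once stats is declared as Option[Array[Double]], every existing findClosest(centers, stats, point) expression recompiles against the new signature.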

@huaxingao
Contributor

I will merge this tomorrow if there are no further comments.

@huaxingao huaxingao closed this in ad5427e Mar 2, 2022
huaxingao pushed a commit that referenced this pull request Mar 2, 2022
### What changes were proposed in this pull request?

SPARK-31007 introduced auxiliary statistics to speed up computation in KMeans.

However, it needs an array of size `k * (k + 1) / 2`, which may cause integer overflow or OOM when k is too large.

So we should skip this optimization in that case.

### Why are the changes needed?

Avoid overflow or OOM when k is too large (e.g. 50,000).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing test suites.

Closes #35457 from zhengruifeng/kmean_k_limit.

Authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Signed-off-by: huaxingao <huaxin_gao@apple.com>
(cherry picked from commit ad5427e)
Signed-off-by: huaxingao <huaxin_gao@apple.com>
huaxingao pushed a commit that referenced this pull request Mar 2, 2022
(same commit message as above, cherry picked from commit ad5427e)
Signed-off-by: huaxingao <huaxin_gao@apple.com>
@huaxingao
Contributor

Merged to master/3.2/3.1. Thanks!

@zhengruifeng zhengruifeng deleted the kmean_k_limit branch March 3, 2022 03:39
@zhengruifeng
Contributor Author

@huaxingao @srowen @dongjoon-hyun Thanks for reviewing!

kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
(same commit message as above, cherry picked from commits ad5427e and d5e90cf)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>