
[Spark] Implement Hilbert clustering #2314

Closed
weiluo-db wants to merge 5 commits

Conversation

weiluo-db (Contributor)

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR is part of #1874.

This PR implements a new data clustering algorithm based on the Hilbert curve. No code uses this new implementation yet; incremental clustering using ZCubes will be implemented in follow-up PRs.

Design can be found at: https://docs.google.com/document/d/1FWR3odjOw4v4-hjFy_hVaNdxHVs4WuK1asfB6M6XEMw/edit#heading=h.uubbjjd24plb.
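As a language-agnostic illustration of what a Hilbert mapping computes (a sketch in Python, not code from this PR; `xy2d` and its parameter names are my own), the classic two-dimensional point-to-index function looks like this:

```python
def xy2d(side: int, x: int, y: int) -> int:
    """Map a point (x, y) on a side*side grid (side a power of two)
    to its distance along the Hilbert curve."""
    d = 0
    s = side // 2
    while s > 0:
        rx = 1 if (x & s) > 0 else 0
        ry = 1 if (y & s) > 0 else 0
        d += s * s * ((3 * rx) ^ ry)
        # Rotate/reflect the quadrant so the sub-curve lines up.
        if ry == 0:
            if rx == 1:
                x = s - 1 - x
                y = s - 1 - y
            x, y = y, x
        s //= 2
    return d
```

Consecutive curve positions always land on adjacent cells; that locality property is what makes Hilbert ordering attractive for clustering files by multiple columns.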

How was this patch tested?

Unit tests.

Does this PR introduce any user-facing changes?

No.

@imback82 (Contributor) left a comment:

few nits, but overall lgtm

@weiluo-db weiluo-db requested a review from imback82 November 21, 2023 22:25
@imback82 (Contributor) left a comment:

LGTM

@imback82 (Contributor)

cc @bart-samwel FYI

/**
* Transforms the provided integer columns into their corresponding position in the hilbert
* curve for the given dimension.
* @see http://www.dcs.bbk.ac.uk/~jkl/thesis.pdf
@weiluo-db (Contributor, Author) replied:
Thanks! Fixed.


/**
* Represents a hilbert index built from the provided columns.
* The columns are expected to all be Ints and to have at most numBits.

at most numBits individually, or collectively?

@weiluo-db (Contributor, Author) replied on Dec 11, 2023:

Individually. The Scaladoc from the caller side mentions this as well:

* @param numBits The number of bits to consider in each column.


/**
* The following code is based on this paper:
* http://www.dcs.bbk.ac.uk/~jkl/thesis.pdf


/**
* This will construct an x2-gray-codes sequence of order n as described in
* http://www.dcs.bbk.ac.uk/~jkl/thesis.pdf

link rot

    return HilbertIndex9.STATE_LIST;
  default:
    throw new SparkException(
      "Cannot perform hilbert clustering on more than 9 dimensions");

This will catch <2 as well, maybe update this error, or add another specific one?

@weiluo-db (Contributor, Author) replied:

Thanks! Updated the error. FWIW, the only caller, HilbertClustering, always ensures that n is greater than 1:

assert(cols.size > 1, "Cannot do Hilbert clustering by zero or one column!")

* The columns are expected to all be Ints and to have at most numBits.
* The points along the hilbert curve are represented by Longs.
*/
private[sql] case class HilbertLongIndex(numBits: Int, children: Seq[Expression])

Is there a maximum practical value of numBits and/or a maximum practical number of children given we're targeting a 64-bit space of output values?

@weiluo-db (Contributor, Author) replied:

Number of children is the number of clustering columns, and it's usually a good idea to keep it <=4 in practice, as you would for zorder-by columns. numBits is a tradeoff between better granularity and computation cost, as explained by the config's doc:

val MDC_NUM_RANGE_IDS =
  SQLConf.buildConf("spark.databricks.io.skipping.mdc.rangeId.max")
    .internal()
    .doc("This controls the domain of rangeId values to be interleaved. The bigger, the better " +
      "granularity, but at the expense of performance (more data gets sampled).")
    .intConf
    .checkValue(_ > 1, "'spark.databricks.io.skipping.mdc.rangeId.max' must be greater than 1")
    .createWithDefault(1000)
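To put rough numbers on that tradeoff (a back-of-the-envelope sketch of my own, not code from the PR): the config bounds the domain of each column's rangeId, so with the default of 1000 a rangeId fits in ceil(log2(1000)) = 10 bits, and four clustering columns would consume 40 of the 64 bits available to a Long-valued index:

```python
import math

def index_bits(num_columns: int, range_id_max: int = 1000) -> int:
    """Bits consumed by interleaving `num_columns` rangeIds,
    each drawn from [0, range_id_max). Illustrative helper, not PR code."""
    bits_per_column = math.ceil(math.log2(range_id_max))
    return num_columns * bits_per_column

# Default rangeId.max of 1000 -> 10 bits per column,
# so 4 columns use 40 bits and even 6 columns fit in a 64-bit Long.
```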

val partCount = new File(tempDir, "source").listFiles(new FilenameFilter {
  override def accept(dir: File, name: String): Boolean = {
    name.startsWith("part-0000")

Seq("zorder", "hilbert").foreach{ curve =>

nit: space before { seems like the house style here :)

@Orpheuz commented Nov 30, 2023:

Hey @weiluo-db, I don't know if this is the right place to ask, but I was curious whether there was any particular reason to prefer the state-tables approach over John Skilling's iterative approach, described in https://pubs.aip.org/aip/acp/article-abstract/707/1/381/719611/Programming-the-Hilbert-curve?redirectedFrom=PDF

@weiluo-db (Contributor, Author) replied:

@Orpheuz Thanks a lot for the pointer! It does appear to be a promising alternative. That said, there wasn't any particular reason why we implemented the Hilbert curve using state tables per se; the iterative approach you pointed out simply wasn't on our radar. Evaluating and possibly adopting it could certainly be a good follow-up.
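For readers curious about the comparison: Skilling's method computes the index iteratively with O(bits × dims) bit operations and no precomputed tables. Below is a Python transcription of the AxesToTranspose transform from the linked paper, plus a bit-interleaving helper; the function names and the `transpose_to_index` helper are my own, so treat this as an illustrative sketch rather than anything from this PR:

```python
def axes_to_transpose(point, bits):
    """Skilling's transform: point coordinates -> Hilbert index in
    'transposed' form (one bits-wide word per dimension)."""
    x = list(point)
    n = len(x)
    m = 1 << (bits - 1)
    # Inverse undo excess work
    q = m
    while q > 1:
        p = q - 1
        for i in range(n):
            if x[i] & q:
                x[0] ^= p          # invert low bits of x[0]
            else:
                t = (x[0] ^ x[i]) & p
                x[0] ^= t          # exchange low bits of x[0] and x[i]
                x[i] ^= t
        q >>= 1
    # Gray encode
    for i in range(1, n):
        x[i] ^= x[i - 1]
    t = 0
    q = m
    while q > 1:
        if x[n - 1] & q:
            t ^= q - 1
        q >>= 1
    for i in range(n):
        x[i] ^= t
    return x

def transpose_to_index(x, bits):
    """Interleave the transposed words, most significant bit first."""
    h = 0
    for j in range(bits - 1, -1, -1):
        for i in range(len(x)):
            h = (h << 1) | ((x[i] >> j) & 1)
    return h

def hilbert_index(point, bits):
    return transpose_to_index(axes_to_transpose(point, bits), bits)
```

Unlike the state-table approach, nothing here is precomputed per dimension count, which is part of why it looks attractive as a follow-up.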

result
}

private[this] case class Row(y: Int, x1: Int, x2: (Int, Int), dy: Int, m: HilbertMatrix)

This comment was marked as resolved.

@weiluo-db (Contributor, Author) replied on Dec 12, 2023:

Row is used only by getStateGenerator(n: Int):

def getStateGenerator(n: Int): GeneratorTable = {
  val x2s = getX2GrayCodes(n)
  val len = 1 << n
  val rows = (0 until len).map { i =>
    val x21 = x2s(i << 1)
    val x22 = x2s((i << 1) + 1)
    val dy = x21 ^ x22
    Row(
      y = i,
      x1 = i ^ (i >>> 1),
      x2 = (x21, x22),
      dy = dy,
      m = HilbertMatrix(n, x21, getSetColumn(n, dy))
    )
  }
  new GeneratorTable(n, rows)
}

Because n can only be between 2 and 9, there can be fewer than 2^10 such Rows in memory in total, which should be negligible in practice.

OTOH, it just came to me that x2 and dy are not really needed for later calculation, so I just decided to remove them from Row altogether.
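The `x1 = i ^ (i >>> 1)` in the snippet above is the standard binary-reflected Gray code. A quick sketch of the property the generator relies on, in Python for illustration (not code from this PR): consecutive codes differ in exactly one bit, and the sequence wraps around cyclically.

```python
def gray(i: int) -> int:
    # Binary-reflected Gray code; the Scala equivalent is `i ^ (i >>> 1)`.
    return i ^ (i >> 1)

# Order-4 sequence: a permutation of 0..15 in which neighboring
# codes (including last -> first) differ in exactly one bit.
codes = [gray(i) for i in range(16)]
```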

@weiluo-db weiluo-db requested a review from acruise December 12, 2023 16:47
@@ -0,0 +1,405 @@
/*
* Copyright (2021) The Delta Lake Project Authors.

nit: copyright years may need updating

@weiluo-db (Contributor, Author) replied:

It looks like every file under spark/ always uses 2021.

@weiluo-db weiluo-db requested a review from acruise December 13, 2023 03:50
@imback82 (Contributor) left a comment:

LGTM!

@acruise left a comment:

Thanks for indulging my questions! :)

andreaschat-db pushed a commit to andreaschat-db/delta that referenced this pull request Jan 5, 2024
Closes delta-io#2314

GitOrigin-RevId: abafaa717ba8f7d8809114858c0fd2a25861fcb8