[SPARK-2871] [PySpark] add countApproxDistinct() API
RDD.countApproxDistinct(relativeSD=0.05):

        :: Experimental ::
        Return approximate number of distinct elements in the RDD.

        The algorithm used is based on streamlib's implementation of
        "HyperLogLog in Practice: Algorithmic Engineering of a State
        of The Art Cardinality Estimation Algorithm", available
        <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.

        This supports all types of objects that Pyrolite can serialize,
        which covers nearly all built-in types.

        param relativeSD Relative accuracy. Smaller values create
                           counters that require more space.
                           It must be greater than 0.000017.

        >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
        >>> 950 < n < 1050
        True
        >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
        >>> 18 < n < 22
        True

Author: Davies Liu <davies.liu@gmail.com>

Closes #2142 from davies/countApproxDistinct and squashes the following commits:

e20da47 [Davies Liu] remove the correction in Python
c38c4e4 [Davies Liu] fix doc tests
2ab157c [Davies Liu] fix doc tests
9d2565f [Davies Liu] add comments and link for hash collision correction
d306492 [Davies Liu] change range of hash of tuple to [0, maxint]
ded624f [Davies Liu] calculate hash in Python
4cba98f [Davies Liu] add more tests
a85a8c6 [Davies Liu] Merge branch 'master' into countApproxDistinct
e97e342 [Davies Liu] add countApproxDistinct()
davies authored and JoshRosen committed Sep 2, 2014
1 parent 81b9d5b commit e2c901b
Showing 3 changed files with 51 additions and 6 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -993,7 +993,7 @@ abstract class RDD[T: ClassTag](
    */
   @Experimental
   def countApproxDistinct(p: Int, sp: Int): Long = {
-    require(p >= 4, s"p ($p) must be greater than 0")
+    require(p >= 4, s"p ($p) must be at least 4")
     require(sp <= 32, s"sp ($sp) cannot be greater than 32")
     require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
     val zeroCounter = new HyperLogLogPlus(p, sp)
39 changes: 34 additions & 5 deletions python/pyspark/rdd.py
@@ -62,7 +62,7 @@ def portable_hash(x):
     >>> portable_hash(None)
     0
-    >>> portable_hash((None, 1))
+    >>> portable_hash((None, 1)) & 0xffffffff
     219750521
     """
     if x is None:
@@ -72,7 +72,7 @@ def portable_hash(x):
         for i in x:
             h ^= portable_hash(i)
             h *= 1000003
-            h &= 0xffffffff
+            h &= sys.maxint
         h ^= len(x)
         if h == -1:
             h = -2
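
(For context, here is a sketch of the whole of portable_hash as it reads after this change. The 0x345678 seed and the final return hash(x) fallback are not shown in the hunks above and are reconstructed from CPython 2.7's tuple hash, so treat them as assumptions. This is Python 2 code; sys.maxint does not exist on Python 3.)

    import sys

    def portable_hash(x):
        # Hash that stays consistent across interpreter runs for built-in
        # types, in particular None and tuples containing None.
        if x is None:
            return 0
        if isinstance(x, tuple):
            h = 0x345678               # seed, as in CPython's tuple hash (reconstructed)
            for i in x:
                h ^= portable_hash(i)
                h *= 1000003
                h &= sys.maxint        # keep the running value in [0, sys.maxint]
            h ^= len(x)
            if h == -1:                # CPython reserves -1 as an error value
                h = -2
            return h
        return hash(x)                 # everything else: the built-in hash (reconstructed)
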
@@ -1942,7 +1942,7 @@ def _is_pickled(self):
             return True
         return False
 
-    def _to_jrdd(self):
+    def _to_java_object_rdd(self):
         """ Return a JavaRDD of Object by unpickling
 
         It will convert each Python object into Java object by Pyrolite, whenever the
@@ -1977,7 +1977,7 @@ def sumApprox(self, timeout, confidence=0.95):
         >>> (rdd.sumApprox(1000) - r) / r < 0.05
         True
         """
-        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd()
+        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_java_object_rdd()
         jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
         r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
         return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
@@ -1993,11 +1993,40 @@ def meanApprox(self, timeout, confidence=0.95):
         >>> (rdd.meanApprox(1000) - r) / r < 0.05
         True
         """
-        jrdd = self.map(float)._to_jrdd()
+        jrdd = self.map(float)._to_java_object_rdd()
         jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
         r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
         return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
 
+    def countApproxDistinct(self, relativeSD=0.05):
+        """
+        :: Experimental ::
+        Return approximate number of distinct elements in the RDD.
+
+        The algorithm used is based on streamlib's implementation of
+        "HyperLogLog in Practice: Algorithmic Engineering of a State
+        of The Art Cardinality Estimation Algorithm", available
+        <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+
+        @param relativeSD Relative accuracy. Smaller values create
+                          counters that require more space.
+                          It must be greater than 0.000017.
+
+        >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
+        >>> 950 < n < 1050
+        True
+        >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
+        >>> 18 < n < 22
+        True
+        """
+        if relativeSD < 0.000017:
+            raise ValueError("relativeSD should be greater than 0.000017")
+        if relativeSD > 0.37:
+            raise ValueError("relativeSD should be smaller than 0.37")
+        # the hash space in Java is 2^32
+        hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF)
+        return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
+
+
 class PipelinedRDD(RDD):

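A note on the two ValueError bounds in countApproxDistinct above: on the JVM side, relativeSD is translated into a HyperLogLog++ precision p, which must satisfy 4 <= p <= 32 (see the require checks in RDD.scala). A rough sketch of that translation, assumed from Spark's Scala RDD.countApproxDistinct(relativeSD) at the time and not part of this diff:

    import math

    def hll_precision(relativeSD):
        # Sketch of how relativeSD maps to the HyperLogLog++ precision p
        # (assumption; this helper is not part of the PySpark API).
        p = int(math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)))
        return max(p, 4)

    print(hll_precision(0.05))   # 9  -> 2**9  = 512 registers
    print(hll_precision(0.01))   # 14 -> 2**14 = 16384 registers: tighter estimate, more space
    # p <= 32 forces relativeSD > 1.054 / 2**16 (about 0.000016), which is where
    # the 0.000017 lower bound above comes from; 0.37 is roughly the largest
    # value that still maps to the minimum precision p = 4.
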
16 changes: 16 additions & 0 deletions python/pyspark/tests.py
@@ -404,6 +404,22 @@ def test_zip_with_different_number_of_items(self):
         self.assertEquals(a.count(), b.count())
         self.assertRaises(Exception, lambda: a.zip(b).count())
 
+    def test_count_approx_distinct(self):
+        rdd = self.sc.parallelize(range(1000))
+        self.assertTrue(950 < rdd.countApproxDistinct(0.04) < 1050)
+        self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.04) < 1050)
+        self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.04) < 1050)
+        self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct(0.04) < 1050)
+
+        rdd = self.sc.parallelize([i % 20 for i in range(1000)], 7)
+        self.assertTrue(18 < rdd.countApproxDistinct() < 22)
+        self.assertTrue(18 < rdd.map(float).countApproxDistinct() < 22)
+        self.assertTrue(18 < rdd.map(str).countApproxDistinct() < 22)
+        self.assertTrue(18 < rdd.map(lambda x: (x, -x)).countApproxDistinct() < 22)
+
+        self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.00000001))
+        self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.5))
+
     def test_histogram(self):
         # empty
         rdd = self.sc.parallelize([])
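To exercise the new API end to end, a minimal driver along these lines should work against a build with this patch applied (the app name and sample sizes here are illustrative only, not from the diff):

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "countApproxDistinct-demo")
    rdd = sc.parallelize(range(100000)).map(str)

    exact = rdd.distinct().count()
    for sd in (0.1, 0.05, 0.01):
        # smaller relativeSD -> more registers on the JVM side -> tighter estimate
        approx = rdd.countApproxDistinct(sd)
        print("relativeSD=%.2f  approx=%d  exact=%d" % (sd, approx, exact))
    sc.stop()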
