Skip to content

Commit

Permalink
add countApproxDistinct()
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Aug 26, 2014
1 parent b21ae5b commit e97e342
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ abstract class RDD[T: ClassTag](
*/
@Experimental
def countApproxDistinct(p: Int, sp: Int): Long = {
require(p >= 4, s"p ($p) must be greater than 0")
require(p >= 4, s"p ($p) must be at least 4")
require(sp <= 32, s"sp ($sp) cannot be greater than 32")
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
val zeroCounter = new HyperLogLogPlus(p, sp)
Expand Down
33 changes: 30 additions & 3 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1864,7 +1864,7 @@ def _is_pickled(self):
return True
return False

def _to_jrdd(self):
def _to_java_object_rdd(self):
""" Return an JavaRDD of Object by unpickling
It will convert each Python object into Java object by Pyrolite, whenever the
Expand Down Expand Up @@ -1899,7 +1899,7 @@ def sumApprox(self, timeout, confidence=0.95):
>>> (rdd.sumApprox(1000) - r) / r < 0.05
True
"""
jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd()
jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_java_object_rdd()
jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
Expand All @@ -1915,11 +1915,38 @@ def meanApprox(self, timeout, confidence=0.95):
>>> (rdd.meanApprox(1000) - r) / r < 0.05
True
"""
jrdd = self.map(float)._to_jrdd()
jrdd = self.map(float)._to_java_object_rdd()
jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())

def countApproxDistinct(self, relativeSD=0.05):
"""
:: Experimental ::
Return approximate number of distinct elements in the RDD.
The algorithm used is based on streamlib's implementation of
"HyperLogLog in Practice: Algorithmic Engineering of a State
of The Art Cardinality Estimation Algorithm", available
<a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
This support all the types of objects, which is supported by
Pyrolite, nearly all builtin types.
@param relativeSD Relative accuracy. Smaller values create
counters that require more space.
It must be greater than 0.000017.
>>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
>>> 950 < n < 1050
True
"""
if relativeSD < 0.000017:
raise ValueError("relativeSD should be greater than 0.000017")
if relativeSD > 0.37:
raise ValueError("relativeSD should be smaller than 0.37")
return self._to_java_object_rdd().countApproxDistinct(relativeSD)


class PipelinedRDD(RDD):

Expand Down
9 changes: 9 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,15 @@ def test_zip_with_different_number_of_items(self):
self.assertEquals(a.count(), b.count())
self.assertRaises(Exception, lambda: a.zip(b).count())

def test_count_approx_distinct(self):
rdd = self.sc.parallelize(range(1000))
self.assertTrue(950 < rdd.countApproxDistinct(0.04) < 1050)
self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.04) < 1050)
self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.04) < 1050)
self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct(0.04) < 1050)
self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.00000001))
self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.5))


class TestIO(PySparkTestCase):

Expand Down

0 comments on commit e97e342

Please sign in to comment.