From 3133a60e4488402bfad95fd8d13d6f2e7bd8e888 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 9 Sep 2014 17:53:33 -0700 Subject: [PATCH] fix accumulator with reused worker --- python/pyspark/tests.py | 11 +++++++++++ python/pyspark/worker.py | 1 + 2 files changed, 12 insertions(+) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index a44592e5e1e34..76a398a952fb8 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1106,6 +1106,17 @@ def test_after_jvm_exception(self): rdd = self.sc.parallelize(range(100), 1) self.assertEqual(100, rdd.map(str).count()) + def test_accumulator_when_reuse_worker(self): + from pyspark.accumulators import INT_ACCUMULATOR_PARAM + acc1 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM) + self.sc.parallelize(range(100), 20).foreach(lambda x: acc1.add(x)) + self.assertEqual(sum(range(100)), acc1.value) + + acc2 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM) + self.sc.parallelize(range(100), 20).foreach(lambda x: acc2.add(x)) + self.assertEqual(sum(range(100)), acc2.value) + self.assertEqual(sum(range(100)), acc1.value) + class TestSparkSubmit(unittest.TestCase): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 77254f599aa04..9acad54eb7e42 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -76,6 +76,7 @@ def main(infile, outfile): bid = - bid - 1 _broadcastRegistry.pop(bid, None) + _accumulatorRegistry.clear() command = pickleSer._read_with_length(infile) (func, deserializer, serializer) = command init_time = time.time()