From 7e8a11ce99bdb32bc86de96497036b731bd41e47 Mon Sep 17 00:00:00 2001 From: Matthew Farrellee Date: Sun, 7 Sep 2014 11:26:55 -0400 Subject: [PATCH 1/2] [SPARK-927] detect numpy at time of use it is possible for numpy to be installed on the driver node but not on worker nodes. in such a case, using the rddsampler's constructor to detect numpy leads to failures on workers as they cannot import numpy (see below). the solution here is to detect numpy right before it is used on the workers. example code & error - yum install -y numpy pyspark >>> sc.parallelize(range(10000)).sample(False, .1).collect() ... 14/09/07 10:50:01 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4, node4): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/home/test/spark/dist/python/pyspark/worker.py", line 79, in main serializer.dump_stream(func(split_index, iterator), outfile) File "/home/test/spark/dist/python/pyspark/serializers.py", line 196, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/home/test/spark/dist/python/pyspark/serializers.py", line 127, in dump_stream for obj in iterator: File "/home/test/spark/dist/python/pyspark/serializers.py", line 185, in _batched for item in iterator: File "/home/test/spark/dist/python/pyspark/rddsampler.py", line 115, in func if self.getUniformSample(split) <= self._fraction: File "/home/test/spark/dist/python/pyspark/rddsampler.py", line 57, in getUniformSample self.initRandomGenerator(split) File "/home/test/spark/dist/python/pyspark/rddsampler.py", line 42, in initRandomGenerator import numpy ImportError: No module named numpy org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124) org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:154) org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) --- python/pyspark/rddsampler.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 55e247da0e4dc..3751f44320de1 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -22,24 +22,26 @@ class RDDSamplerBase(object): def __init__(self, withReplacement, seed=None): - try: - import numpy - self._use_numpy = True - except ImportError: - print >> sys.stderr, ( - "NumPy does not appear to be installed. " - "Falling back to default random generator for sampling.") - self._use_numpy = False - self._seed = seed if seed is not None else random.randint(0, sys.maxint) self._withReplacement = withReplacement self._random = None self._split = None self._rand_initialized = False + self._tried_numpy = False def initRandomGenerator(self, split): + if not self._tried_numpy: + try: + import numpy + self._use_numpy = True + except ImportError: + print >> sys.stderr, ( + "NumPy does not appear to be installed. " + "Falling back to default random generator for sampling.") + self._use_numpy = False + self._tried_numpy = True + if self._use_numpy: - import numpy self._random = numpy.random.RandomState(self._seed) else: self._random = random.Random(self._seed) From 1b7af9364d3151f0d1ac812cdd536fae989d52ab Mon Sep 17 00:00:00 2001 From: Matthew Farrellee Date: Mon, 15 Sep 2014 15:06:21 -0400 Subject: [PATCH 2/2] Maintain expectation of consistent generator, emit warning when violated driver w/, worker w/ - numpy used, no message emitted driver w/, worker w/o - numpy used on driver, not used on workers, warning emitted driver w/o, worker w/ - numpy not used on driver nor worker, no message emitted driver w/o, worker w/o - numpy not used on driver nor worker, no message emitted --- python/pyspark/rddsampler.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 3751f44320de1..cf7a80acea88d 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -28,17 +28,23 @@ def __init__(self, withReplacement, seed=None): self._split = None self._rand_initialized = False self._tried_numpy = False + try: + import numpy + self._driver_has_numpy = True + except ImportError: + self._driver_has_numpy = False def initRandomGenerator(self, split): if not self._tried_numpy: - try: - import numpy - self._use_numpy = True - except ImportError: - print >> sys.stderr, ( - "NumPy does not appear to be installed. " - "Falling back to default random generator for sampling.") - self._use_numpy = False + self._use_numpy = False + if self._driver_has_numpy: + try: + import numpy + self._use_numpy = True + except ImportError: + print >> sys.stderr, ( + "NumPy does not appear to be installed. " + "Falling back to default random generator for sampling.") self._tried_numpy = True if self._use_numpy: