diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 556a8a776b29d..2d92f6a42b308 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -47,7 +47,7 @@ private[spark] class PythonRDD( pythonIncludes: JList[String], preservePartitoning: Boolean, pythonExec: String, - pythonVer: Int, + pythonVer: String, broadcastVars: JList[Broadcast[PythonBroadcast]], accumulator: Accumulator[JList[Array[Byte]]]) extends RDD[Array[Byte]](parent) { @@ -212,7 +212,7 @@ private[spark] class PythonRDD( // Partition index dataOut.writeInt(split.index) // Python version of driver - dataOut.writeInt(pythonVer) + PythonRDD.writeUTF(pythonVer, dataOut) // sparkFilesDir PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut) // Python includes (*.zip and *.egg files) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index c35ce2ca64d1f..d25ee855235be 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -173,7 +173,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self._jvm.PythonAccumulatorParam(host, port)) self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') - self.pythonVer = sys.version_info.major * 10 + sys.version_info.minor + self.pythonVer = "%d.%d" % sys.version_info[:2] # Broadcast's __reduce__ method stores Broadcast instances here. # This allows other code to determine which Broadcast instances have diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index aeb5abb753aae..93df9002be377 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -57,12 +57,11 @@ def main(infile, outfile): if split_index == -1: # for unit tests exit(-1) - version = read_int(infile) - version = (version // 10, version % 10) - if version != sys.version_info[:2]: + version = utf8_deserializer.loads(infile) + if version != "%d.%d" % sys.version_info[:2]: raise Exception(("Python in worker has different version %s than that in " + "driver %s, PySpark cannot run with different minor versions") % - (sys.version_info[:2], version)) + ("%d.%d" % sys.version_info[:2], version)) # initialize global state shuffle.MemoryBytesSpilled = 0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index ef6928a649723..3cc5c2441d8a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -46,7 +46,7 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging { envVars: JMap[String, String], pythonIncludes: JList[String], pythonExec: String, - pythonVer: Int, + pythonVer: String, broadcastVars: JList[Broadcast[PythonBroadcast]], accumulator: Accumulator[JList[Array[Byte]]], stringDataType: String): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala index 8626ed92596c2..a02e202d2eebc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala @@ -58,7 +58,7 @@ private[sql] case class UserDefinedPythonFunction( envVars: JMap[String, String], pythonIncludes: JList[String], pythonExec: String, - pythonVer: Int, + pythonVer: String, broadcastVars: JList[Broadcast[PythonBroadcast]], accumulator: Accumulator[JList[Array[Byte]]], dataType: DataType) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 6feb8001beb2e..11b2897f76786 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -46,7 +46,7 @@ private[spark] case class PythonUDF( envVars: JMap[String, String], pythonIncludes: JList[String], pythonExec: String, - pythonVer: Int, + pythonVer: String, broadcastVars: JList[Broadcast[PythonBroadcast]], accumulator: Accumulator[JList[Array[Byte]]], dataType: DataType,