
Commit

use string for version
Davies Liu committed May 16, 2015
1 parent 47c6278 commit 6ce5096
Showing 6 changed files with 9 additions and 10 deletions.
@@ -47,7 +47,7 @@ private[spark] class PythonRDD(
     pythonIncludes: JList[String],
     preservePartitoning: Boolean,
     pythonExec: String,
-    pythonVer: Int,
+    pythonVer: String,
     broadcastVars: JList[Broadcast[PythonBroadcast]],
     accumulator: Accumulator[JList[Array[Byte]]])
   extends RDD[Array[Byte]](parent) {
@@ -212,7 +212,7 @@ private[spark] class PythonRDD(
         // Partition index
         dataOut.writeInt(split.index)
         // Python version of driver
-        dataOut.writeInt(pythonVer)
+        PythonRDD.writeUTF(pythonVer, dataOut)
         // sparkFilesDir
         PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
         // Python includes (*.zip and *.egg files)
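For context, a minimal Python sketch of the worker-side read after this change, assuming PythonRDD.writeUTF frames the version as a 4-byte big-endian length followed by UTF-8 bytes (the framing PySpark's UTF8Deserializer expects); read_utf here is a hypothetical helper, not PySpark API:

import io
import struct

def read_utf(stream):
    # Hypothetical helper: read a 4-byte big-endian length, then that many
    # UTF-8 bytes, mirroring what writeUTF is assumed to put on the stream.
    (length,) = struct.unpack(">i", stream.read(4))
    return stream.read(length).decode("utf-8")

# Simulate the driver writing the version string "3.4" for a Python 3.4 driver.
payload = struct.pack(">i", 3) + "3.4".encode("utf-8")
print(read_utf(io.BytesIO(payload)))  # prints: 3.4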
python/pyspark/context.py (2 changes: 1 addition & 1 deletion)
@@ -173,7 +173,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
             self._jvm.PythonAccumulatorParam(host, port))

         self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
-        self.pythonVer = sys.version_info.major * 10 + sys.version_info.minor
+        self.pythonVer = "%d.%d" % sys.version_info[:2]

         # Broadcast's __reduce__ method stores Broadcast instances here.
         # This allows other code to determine which Broadcast instances have
python/pyspark/worker.py (7 changes: 3 additions & 4 deletions)
@@ -57,12 +57,11 @@ def main(infile, outfile):
     if split_index == -1:  # for unit tests
         exit(-1)

-    version = read_int(infile)
-    version = (version // 10, version % 10)
-    if version != sys.version_info[:2]:
+    version = utf8_deserializer.loads(infile)
+    if version != "%d.%d" % sys.version_info[:2]:
         raise Exception(("Python in worker has different version %s than that in " +
                          "driver %s, PySpark cannot run with different minor versions") %
-                        (sys.version_info[:2], version))
+                        ("%d.%d" % sys.version_info[:2], version))

     # initialize global state
     shuffle.MemoryBytesSpilled = 0
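As a usage sketch, the check in worker.py now compares "major.minor" strings directly; a standalone rendering under that assumption (check_worker_version is a hypothetical helper, not part of PySpark):

import sys

def check_worker_version(driver_version):
    # Compare the driver's "major.minor" string against this interpreter's
    # version, mirroring the string comparison introduced in worker.py.
    worker_version = "%d.%d" % sys.version_info[:2]
    if driver_version != worker_version:
        raise Exception(("Python in worker has different version %s than that in "
                         "driver %s, PySpark cannot run with different minor versions")
                        % (worker_version, driver_version))

check_worker_version("%d.%d" % sys.version_info[:2])  # same version: no error
try:
    check_worker_version("2.6")
except Exception as e:
    print(e)  # mismatch message unless this interpreter is Python 2.6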
@@ -46,7 +46,7 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
      envVars: JMap[String, String],
      pythonIncludes: JList[String],
      pythonExec: String,
-     pythonVer: Int,
+     pythonVer: String,
      broadcastVars: JList[Broadcast[PythonBroadcast]],
      accumulator: Accumulator[JList[Array[Byte]]],
      stringDataType: String): Unit = {
@@ -58,7 +58,7 @@ private[sql] case class UserDefinedPythonFunction(
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
     pythonExec: String,
-    pythonVer: Int,
+    pythonVer: String,
     broadcastVars: JList[Broadcast[PythonBroadcast]],
     accumulator: Accumulator[JList[Array[Byte]]],
     dataType: DataType) {
@@ -46,7 +46,7 @@ private[spark] case class PythonUDF(
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
     pythonExec: String,
-    pythonVer: Int,
+    pythonVer: String,
     broadcastVars: JList[Broadcast[PythonBroadcast]],
     accumulator: Accumulator[JList[Array[Byte]]],
     dataType: DataType,
