fixed style checks
BryanCutler committed Jul 14, 2017
1 parent: c12f658 · commit: 11a7a87
Showing 5 changed files with 17 additions and 10 deletions.
9 changes: 6 additions & 3 deletions python/pyspark/serializers.py
@@ -216,17 +216,20 @@ class ArrowPandasSerializer(ArrowSerializer):
     def __init__(self):
         super(ArrowPandasSerializer, self).__init__()

-    # make an ArrowRecordBatch from a Pandas Series and serialize
     def dumps(self, series):
+        """
+        Make an ArrowRecordBatch from a Pandas Series and serialize
+        """
         import pyarrow as pa
         # TODO: iterator could be a tuple
         arr = pa.Array.from_pandas(series)
         batch = pa.RecordBatch.from_arrays([arr], ["_0"])
-        #import asdb; asdb.set_trace()
         return super(ArrowPandasSerializer, self).dumps(batch)

-    # deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
     def loads(self, obj):
+        """
+        Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
+        """
         table = super(ArrowPandasSerializer, self).loads(obj)
         return [c.to_pandas() for c in table.itercolumns()]

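For context, the round trip implemented by dumps and loads can be sketched standalone with pandas and pyarrow. This is a minimal illustration, not part of the commit; the sample data and variable names are invented, and the byte-level serialization done through the super calls to ArrowSerializer is elided:

    import pandas as pd
    import pyarrow as pa

    s = pd.Series([1.0, 2.0, 3.0])

    # dumps: pandas Series -> Arrow array -> one-column record batch named "_0"
    arr = pa.Array.from_pandas(s)
    batch = pa.RecordBatch.from_arrays([arr], ["_0"])

    # loads: record batch -> Arrow table -> list of pandas.Series, one per column
    table = pa.Table.from_batches([batch])
    series_list = [c.to_pandas() for c in table.itercolumns()]

Recent pyarrow releases keep these same calls, though itercolumns() now yields ChunkedArray objects rather than Column objects; to_pandas() works on either.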
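
The four Scala files below change no behavior. They reorder imports and adjust whitespace to satisfy Spark's scalastyle rules, which expect imports grouped into java, scala, third-party, and org.apache.spark sections, sorted within each group and separated by blank lines.
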
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream
 import java.nio.channels.Channels

 import scala.collection.JavaConverters._
+
 import io.netty.buffer.ArrowBuf
 import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
 import org.apache.arrow.vector._
@@ -30,6 +31,7 @@ import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
 import org.apache.arrow.vector.types.FloatingPointPrecision
 import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
 import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.types._

@@ -18,22 +18,23 @@
 package org.apache.spark.sql.execution.python

 import java.io.File
+
 import scala.collection.mutable.ArrayBuffer

+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.util.Utils
-import org.apache.spark.{SparkEnv, TaskContext}


 /**
- * A physical plan that evaluates a [[PythonUDF]],
- */
+ * A physical plan that evaluates a [[PythonUDF]],
+ */
 case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
   extends SparkPlan {

@@ -118,4 +119,4 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
       }
     }
   }
-}
+}

@@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.python

 import java.io.File

-import net.razorvine.pickle.{Pickler, Unpickler}
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer

+import net.razorvine.pickle.{Pickler, Unpickler}
+
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonRunner}
 import org.apache.spark.rdd.RDD

@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.python

 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
