[SPARK-21190][PYSPARK] Python Vectorized UDFs #18659
Conversation
The following was used to test performance locally:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, rand, udf
    from pyspark.sql.types import DoubleType

    spark = SparkSession.builder.appName("vectorized_udfs").getOrCreate()

    vectorize = True
    if vectorize:
        from numpy import log, exp
    else:
        from math import log, exp

    def my_func(p1, p2):
        w = 0.5
        return exp(log(p1) + log(p2) - log(w))

    df = spark.range(1 << 24, numPartitions=16).toDF("id") \
        .withColumn("p1", rand()).withColumn("p2", rand())

    my_udf = udf(my_func, DoubleType(), vectorized=vectorize)
    df.withColumn("p", my_udf(col("p1"), col("p2")))

** Updated with using
Some comments on the performance above
Test build #79680 has finished for PR 18659 at commit
Test build #79682 has finished for PR 18659 at commit
    val genericRowData = fields.map { field =>
      field.getAccessor.getObject(_index)
    }.toArray[Any]
How about using SpecificInternalRow to improve performance? I think that it could eliminate some boxing/unboxing. The following is a snippet for this usage.
    val fieldTypes = fields.map {
      case _: NullableIntVector => IntegerType
      case _: NullableFloat8Vector => DoubleType
      ...
    }
    val row = new SpecificInternalRow(fieldTypes)
    fields.zipWithIndex.foreach { case (field, i) =>
      field match {
        case v: NullableIntVector =>
          row.setInt(i, v.getAccessor.get(_index))
        case v: NullableFloat8Vector =>
          row.setDouble(i, v.getAccessor.get(_index))
        ...
      }
    }
Thanks @kiszk , I'll give that a shot and see if it helps!
I have implemented arrow -> unsafe row conversions in:
icexelloss@8f38c15#diff-52cca47e7a940849b28d476ddf99d65eR575
This reuses the row object and doesn't do boxing. Hopefully it's useful to you as well?
@BryanCutler As @cloud-fan suggested here, it would be good to create a ColumnarBatch with ArrowColumnVector and get an iterator from it. It looks like a simpler implementation.
cc: @ueshin
The following is a code snippet.
    new Iterator[InternalRow] {
      private val _allocator = new RootAllocator(Long.MaxValue)
      private var _reader: ArrowFileReader = _
      private var _root: VectorSchemaRoot = _
      private var _index = 0
      private var _iterator: java.util.Iterator[ColumnarBatch.Row] = _

      loadNextBatch()

      override def hasNext: Boolean =
        _root != null && _index < _root.getRowCount && _iterator.hasNext

      override def next(): InternalRow = {
        // return the current row before possibly advancing to the next batch
        val row = _iterator.next()
        _index += 1
        if (_index >= _root.getRowCount) {
          _index = 0
          loadNextBatch()
          if (!hasNext) {
            close()
          }
        }
        row
      }

      ...

      private def loadNextBatch(): Unit = {
        closeReader()
        if (iter.hasNext) {
          val in = new ByteArrayReadableSeekableByteChannel(iter.next().asPythonSerializable)
          _reader = new ArrowFileReader(in, _allocator)
          _root = _reader.getVectorSchemaRoot // throws IOException
          _reader.loadNextBatch() // throws IOException
          val columnarBatch = ColumnarBatch.allocateArrow(
            _root.getFieldVectors.asInstanceOf[java.util.List[ValueVector]],
            ArrowUtils.fromArrowSchema(_root.getSchema), _root.getRowCount)
          _iterator = columnarBatch.rowIterator
        }
      }
    }
    public final class ColumnarBatch {
      ...
      public static ColumnarBatch allocateArrow(List<ValueVector> vectors, StructType schema, int maxRows) {
        // need to implement the following constructor for ArrowColumnVector
        return new ColumnarBatch(vectors, schema, maxRows);
      }
      ...
    }
Thanks @kiszk , I'm giving it a try!
      return columns;
    }

    public static ColumnarBatch createReadOnly(
@ueshin I made some changes here to allow for use with ArrowColumnVectors. I was thinking of putting these in a separate JIRA because they can be used regardless of what is done with vectorized UDFs. What do you think?
@BryanCutler I agree with you, let's separate it from this PR.
OK, will do. I created https://issues.apache.org/jira/browse/SPARK-21583 for this.
Test build #80030 has finished for PR 18659 at commit
46e4112 to 912143e (compare)
Test build #80264 has finished for PR 18659 at commit
Test build #80265 has finished for PR 18659 at commit
a01a2d3 to 38474d8 (compare)
Test build #81138 has finished for PR 18659 at commit
38474d8 to cc7ed5a (compare)
Test build #81321 has finished for PR 18659 at commit
Test build #81478 has finished for PR 18659 at commit
1503fa0 to fdea603 (compare)
fdea603 to 4f6c950 (compare)
python/pyspark/sql/functions.py (outdated)

    @@ -2112,7 +2113,7 @@ def wrapper(*args):

     @since(1.3)
    -def udf(f=None, returnType=StringType()):
    +def udf(f=None, returnType=StringType(), vectorized=False):
@felixcheung does this fit your idea for a more generic decorator? Not exclusively labeled as pandas_udf, just enable vectorization with a flag, e.g. @udf(DoubleType(), vectorized=True)
I think @pandas_udf(DoubleType()) is better than @udf(DoubleType(), vectorized=True); it is more concise.
As we discussed in the email, we should also accept the data type as a string.
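For illustration, a minimal sketch (not code from this PR) of what accepting the return type as a string might look like; the decorator form and the DDL-style name "double" are assumptions here:

```python
from pyspark.sql.functions import pandas_udf

# Hypothetical: return type given as a string instead of DoubleType()
@pandas_udf("double")
def plus_one(v):
    # v would be a pandas.Series; the result is a Series of the same length
    return v + 1
```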
And also **kwargs to bring the size information.
It seems like the consensus is for pandas_udf and I'm fine with that too. I'll make that change and the others brought up here.
Cool!
    val outputRowIterator = ArrowConverters.fromPayloadIterator(
      outputIterator.map(new ArrowPayload(_)), context)

    assert(schemaOut.equals(outputRowIterator.schema))
@felixcheung, I think you had also brought up checking that the return type matches what was defined in the UDF. This is done here.
Test build #81479 has finished for PR 18659 at commit
python/pyspark/serializers.py (outdated)

        series = [series]
    series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
    arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) for s in series]
    batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
I'd use xrange.
python/pyspark/serializers.py (outdated)

    if not isinstance(series, (list, tuple)) or \
            (len(series) == 2 and isinstance(series[1], pa.DataType)):
        series = [series]
    series = [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]
I'd use generator comprehension.
That would work, but does it help much since series will already be a list or tuple?
Yea, it actually affects the performance because we can avoid an extra loop:

    def im_map(x):
        print("I am map %s" % x)
        return x

    def im_gen(x):
        print("I am gen %s" % x)
        return x

    def im_list(x):
        print("I am list %s" % x)
        return x

    items = list(range(3))
    map(im_map, [im_list(item) for item in items])
    map(im_map, (im_gen(item) for item in items))
And this actually affects the performance, to my knowledge:

    import time

    items = list(xrange(int(1e8)))
    for _ in xrange(10):
        s = time.time()
        _ = map(lambda x: x, [item for item in items])
        print "I am list comprehension with a list: %s" % (time.time() - s)

        s = time.time()
        _ = map(lambda x: x, (item for item in items))
        print "I am generator expression with a list: %s" % (time.time() - s)
This gives me ~13% improvement in Python 2
This might not be a big deal, but I usually use a generator if it is iterated once and then discarded. It should consume less memory too, since a list comprehension is fully evaluated first, as far as I know.
Thanks @HyukjinKwon , I suppose if there are more than a few series then it might make some difference. In that case, every little bit helps so sounds good to me!
python/pyspark/serializers.py (outdated)

    reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
    batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
    # NOTE: a 0-parameter pandas_udf will produce an empty batch that can have num_rows set
    num_rows = sum([batch.num_rows for batch in batches])
I'd use generator comprehension here too.
I guess this makes sense because it's a summation; there's no sense in making a list and then adding it all up.
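A minimal sketch of that suggestion applied to the line under review (names taken from the snippet above):

```python
# Generator expression: no intermediate list is built before summing
num_rows = sum(batch.num_rows for batch in batches)
```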
python/pyspark/serializers.py (outdated)

    """
    import pyarrow as pa
    reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
    batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
And xrange here too.
What if users installed an older version of pyarrow? Shall we throw an exception and ask them to upgrade, or work around the type casting issue?
Test build #81945 has finished for PR 18659 at commit
Thanks for the reviews @ueshin @viirya and @HyukjinKwon! I updated with your comments.
@cloud-fan, with regard to handling problems that might come up when using different versions of Arrow, I think we should first decide on a minimum supported version, then maybe we could put that version of pyarrow as a requirement for PySpark. If we decide on 0.4.1, which we currently use, then we should probably work around the type casting issue and make sure this PR works with that version.
Test build #81955 has finished for PR 18659 at commit
OK, let's work around the type casting issue and discuss the Arrow upgrade later.
    * \ /
    * \ socket (input of UDF)
    * \ /
    * upstream (from child)
Is Upstream better?
I think upstream is fine.
Maybe it's just that seeing Downstream capitalized above made me uncomfortable; forgive me.
That's fine, but either looks fine and it's not a big deal.
@ueshin I haven't had much luck with the casting workaround: it appears that it forces a copy for floating point -> integer and then checks for NaNs, so I get the error.
@BryanCutler Hmm, I'm not exactly sure why it doesn't work (or why mine works), but I guess we can use:
Thanks @ueshin, that works to allow the tests to pass. I do worry that it might cause some other issues and I would much prefer we upgrade Arrow to handle this, but I'll push this and we can discuss.
Test build #82042 has finished for PR 18659 at commit
Test build #82053 has finished for PR 18659 at commit
""" | ||
|
||
def __init__(self): | ||
super(ArrowPandasSerializer, self).__init__() |
Do we need this?
No, that was leftover; I'll remove it in a follow-up.
LGTM, merging to master! We can address the remaining minor comments in follow-ups, and have new PRs to remove the 0-parameter UDF and use the Arrow streaming protocol.
Thanks @cloud-fan @ueshin and others who reviewed! I'll make follow-ups to disable 0-parameter UDFs and complete the docs for this.
What changes were proposed in this pull request?
This PR adds vectorized UDFs to the Python API.
Proposed API
Introduce a flag to turn on vectorization for a defined UDF. Usage is the same as for normal UDFs; for example, see the sketch below.
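A minimal sketch of the proposed API (not the description's original snippet), reusing my_func and df from the performance script quoted near the top of this conversation; the pandas_udf import path reflects the reviewer consensus above and is an assumption here:

```python
from pyspark.sql.functions import col, pandas_udf, udf
from pyspark.sql.types import DoubleType

# Flag-based form originally proposed in this PR:
my_vec_udf = udf(my_func, DoubleType(), vectorized=True)

# or the pandas_udf spelling that reviewers preferred:
my_vec_udf = pandas_udf(my_func, DoubleType())

# Applied like any other UDF:
df.withColumn("p", my_vec_udf(col("p1"), col("p2")))
```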
0-parameter UDFs
pandas_udf functions can declare an optional **kwargs argument; when the UDF is evaluated, it will contain a key "size" that gives the required length of the output, for example as sketched below.
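A minimal sketch of a 0-parameter UDF (not the description's original snippet); the "size" key comes from the sentence above, everything else is illustrative:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

def zeros(**kwargs):
    # kwargs["size"] gives the required length of the output Series
    return pd.Series([0.0] * kwargs["size"])

zeros_udf = pandas_udf(zeros, DoubleType())
df.withColumn("zeros", zeros_udf())  # df as defined earlier
```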
How was this patch tested?
Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are available.
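For illustration, a hypothetical sketch (not the PR's actual test code) of how such tests are typically gated on the optional pyarrow and pandas dependencies:

```python
import unittest

try:
    import pandas   # noqa: F401
    import pyarrow  # noqa: F401
    _have_arrow_and_pandas = True
except ImportError:
    _have_arrow_and_pandas = False

@unittest.skipIf(not _have_arrow_and_pandas, "pyarrow or pandas not installed")
class VectorizedUDFTests(unittest.TestCase):
    def test_basic(self):
        # hypothetical placeholder: compare a pandas_udf result against the
        # equivalent row-at-a-time UDF result
        pass
```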
TODO