
[SPARK-22125][PYSPARK][SQL] Enable Arrow Stream format for vectorized UDF. #19349

Closed
wants to merge 15 commits

Conversation

@ueshin (Member) commented Sep 26, 2017

What changes were proposed in this pull request?

Currently we use the Arrow File format to communicate with the Python worker when invoking a vectorized UDF, but we could use the Arrow Stream format instead.

This PR replaces the Arrow File format with the Arrow Stream format.
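For context, a minimal pyarrow sketch of the difference between the two IPC formats (a sketch only; the API names are from recent pyarrow releases, not necessarily the 0.4.x used in the benchmark below):

```python
import pyarrow as pa

batch = pa.RecordBatch.from_arrays([pa.array([1.0, 2.0, 3.0])], ["p"])

# File format: seekable, with a footer written on close, so a reader
# generally needs the complete output before it can start.
with pa.OSFile("/tmp/batches.arrow", "wb") as sink:
    with pa.RecordBatchFileWriter(sink, batch.schema) as writer:
        writer.write_batch(batch)

# Stream format: batches are framed one after another, so a reader
# (here, the Python worker) can consume them while they are still
# being written.
with pa.OSFile("/tmp/batches.arrows", "wb") as sink:
    with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
        writer.write_batch(batch)
```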

How was this patch tested?

Existing tests.

@ueshin (Member, Author) commented Sep 26, 2017

@SparkQA commented Sep 26, 2017

Test build #82175 has finished for PR 19349 at commit e62d619.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ExtractPythonUDFs(conf: SQLConf) extends Rule[SparkPlan] with PredicateHelper

@ueshin (Member, Author) commented Sep 26, 2017

The performance test I ran locally, based on @BryanCutler's (#18659 (comment)), is as follows:

```python
from pyspark.sql.functions import *
from pyspark.sql.types import *

@udf(DoubleType())
def my_udf(p1, p2):
    from math import log, exp
    return exp(log(p1) + log(p2) - log(0.5))

@pandas_udf(DoubleType())
def my_pandas_udf(p1, p2):
    from numpy import log, exp
    return exp(log(p1) + log(p2) - log(0.5))

df = spark.range(1 << 24, numPartitions=16).toDF("id") \
    .withColumn("p1", rand()).withColumn("p2", rand())
df_udf = df.withColumn("p", my_udf(col("p1"), col("p2")))
df_pandas_udf = df.withColumn("p", my_pandas_udf(col("p1"), col("p2")))
```

Normal UDF:

%timeit -n2 df_udf.select(sum(col('p'))).collect()

12.2 s ± 456 ms per loop (mean ± std. dev. of 7 runs, 2 loops each)

Vectorized UDF before this patch:

%timeit -n2 df_pandas_udf.select(sum(col('p'))).collect()

1.91 s ± 195 ms per loop (mean ± std. dev. of 7 runs, 2 loops each)

Vectorized UDF after this patch:

%timeit -n2 df_pandas_udf.select(sum(col('p'))).collect()

1.67 s ± 223 ms per loop (mean ± std. dev. of 7 runs, 2 loops each)

Environment:

  • Intel(R) Core(TM) i7-6820HQ CPU @ 2.70GHz
  • Java HotSpot(TM) 64-Bit Server VM 1.8.0_144-b01 on Mac OS X 10.12.6
  • Python 3.6.1 64bit [GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)]
    • pandas 0.20.1
    • pyarrow 0.4.1

I updated the commands above because the configuration to enable the Arrow stream format was removed.

@SparkQA commented Sep 26, 2017

Test build #82180 has finished for PR 19349 at commit 14aa3b6.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member):

retest this please

@SparkQA commented Sep 26, 2017

Test build #82183 has finished for PR 19349 at commit 14aa3b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 26, 2017

Test build #82184 has finished for PR 19349 at commit 14aa3b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

for series in iterator:
    batch = _create_batch(series)
    if writer is None:
        write_int(0, stream)
Contributor:

Shall we add a new entry in SpecialLengths and use it here instead of a bare 0?
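For reference, the existing sentinel values in pyspark/serializers.py look like this; the suggestion is to add a named entry for the bare 0 written above (the new name below is hypothetical):

```python
# Existing special length codes in pyspark/serializers.py.
class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5
    START_ARROW_STREAM = -6  # hypothetical new entry replacing the bare 0
```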

.internal()
.doc("When using Apache Arrow, use Arrow stream protocol if possible.")
.booleanConf
.createWithDefault(false)
Contributor:

Are there any known problems? I think we should enable it by default; otherwise most users can't benefit from it.

private var closed = false

context.addTaskCompletionListener { _ =>
// todo: we need something like `read.end()`, which release all the resources, but leave
Contributor:

cc @BryanCutler for Arrow-side issues.

Member:

I think that ArrowStreamReader.close() should not close the input stream. I filed https://issues.apache.org/jira/browse/ARROW-1613 to fix this.
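Until that fix lands, one generic workaround is to hand the reader a wrapper that forwards reads but swallows close() (a sketch, not this PR's code; NonClosingStream is a hypothetical name):

```python
class NonClosingStream:
    """Forwards reads to a wrapped stream but ignores close()."""

    def __init__(self, raw):
        self._raw = raw

    def read(self, n=-1):
        return self._raw.read(n)

    def close(self):
        pass  # deliberately leave the underlying stream open
```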

@cloud-fan (Contributor):

LGTM

@BryanCutler (Member):

Nice job on refactoring PythonRunner! I think we should just replace the Arrow file format with the stream format for pandas UDFs instead of having a new conf to enable it, as long as all the issues are worked out. Along with being a little faster, it's also easier on memory usage. I'd like to do the same for toPandas() too, but that can be a follow-up. Is it possible to do away with the SQLConf and maybe rename some of these classes to be more general, e.g. ArrowStreamPythonUDFRunner -> ArrowPythonRunner?
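For reference, the Arrow-backed toPandas() path mentioned here is gated by its own flag in this era of Spark (a usage sketch, reusing the df from the benchmark above):

```python
# Enable the Arrow-based collection path for DataFrame.toPandas().
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf = df.toPandas()
```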

arrs = [pa.Array.from_pandas(cast_series(s, t), mask=s.isnull(), type=t) for s, t in series]
return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in xrange(len(arrs))])


class ArrowPandasSerializer(ArrowSerializer):
Member:

Do we need to keep ArrowPandasSerializer? I don't see it used anywhere other than in pandas UDFs.

Member (Author):

Thanks! I'll remove it.

batch.setNumRows(root.getRowCount)
batch
} else {
read()
@viirya (Member) commented Sep 27, 2017:

Is loadNextBatch a blocking action that returns false only when there are no more batches? It looks like we call read() again if no batch is loaded, so is loadNextBatch an async action that can return false when the batch is not ready yet? If it takes too long for a batch to become ready, can the recursive read be an issue?

@kiszk (Member):

I also wonder whether the recursive read may cause a StackOverflowException. Can we implement this as a loop? Or can we ensure it does not cause that exception?

Member (Author):

I believe loadNextBatch is a blocking action. Here's the single-line comment from the method's source:

Returns true if a batch was read, false on EOS

cc @BryanCutler Could you confirm this?
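For what it's worth, the pyarrow reader has the analogous contract, which illustrates the blocking-until-EOS semantics (a sketch using current pyarrow API names):

```python
import pyarrow as pa

def read_all_batches(stream):
    reader = pa.ipc.open_stream(stream)
    while True:
        try:
            # Blocks until a complete batch is available...
            batch = reader.read_next_batch()
        except StopIteration:
            break  # ...and signals end-of-stream rather than returning early
        yield batch
```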

Member:

Oh, it may not incur a StackOverflowException, since batchLoaded is false at that point and we won't enter the if at line 153.

Member (Author):

@kiszk I might be missing something, but I don't think a StackOverflowException can happen, given the protocol for communicating with the Python worker.

@viirya (Member):

@ueshin Do you mind adding a comment like:

} else {
  // Reach end of stream. Call `read()` again to read control data.
  read()
}

Member (Author):

@viirya Sure, I'll add the comment. Thanks!

table = pa.Table.from_batches([batch])
yield [c.to_pandas() for c in table.itercolumns()]

def dump_stream(self, iterator, stream):
Member:

Maybe add a few comments for dump_stream and load_stream, like ArrowPandasSerializer has.

Member (Author):

Sure, I'll add comments.
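Something along these lines (a sketch of the serializer with the requested comments; it mirrors the diff hunks above, uses current pyarrow names, and elides error handling; Serializer, _create_batch, and write_int are the surrounding pyspark.serializers helpers):

```python
import pyarrow as pa

class ArrowStreamPandasSerializer(Serializer):

    def dump_stream(self, iterator, stream):
        """Make an ArrowRecordBatch from each group of pandas.Series and
        write the batches to the stream using the Arrow stream format."""
        writer = None
        try:
            for series in iterator:
                batch = _create_batch(series)
                if writer is None:
                    # Signal the start of the Arrow stream; see the
                    # SpecialLengths discussion above.
                    write_int(0, stream)
                    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
                writer.write_batch(batch)
        finally:
            if writer is not None:
                writer.close()

    def load_stream(self, stream):
        """Deserialize ArrowRecordBatches from the stream, yielding a list
        of pandas.Series per batch."""
        reader = pa.ipc.open_stream(stream)
        for batch in reader:
            table = pa.Table.from_batches([batch])
            yield [c.to_pandas() for c in table.itercolumns()]
```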

context: TaskContext): WriterThread = {
new WriterThread(env, worker, inputIterator, partitionIndex, context) {

override def writeCommand(dataOut: DataOutputStream): Unit = {
Member:

Looks like this implementation is no different from the writeCommand in ArrowPythonRunner? If so, I think we don't need to duplicate it.

Member (Author):

Sure, I'll try to avoid the duplication.

root.close()
allocator.close()
closed = true
}
Member:

Don't we need to write out END_OF_DATA_SECTION after all the data is written out?

Member:

nvm. ArrowStreamPandasSerializer is not a FramedSerializer.

@HyukjinKwon (Member) left a comment:

Looks pretty good to me. The few other pending comments from me were basically a subset of @viirya's.

}

def writeCommand(dataOut: DataOutputStream): Unit
def writeIteratorToStream(dataOut: DataOutputStream): Unit
Member:

I'd leave a few comments on the methods that should be implemented here.

Member (Author):

Sure, I'll add comments.

@SparkQA commented Sep 27, 2017

Test build #82214 has finished for PR 19349 at commit 7f6e43f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Sep 27, 2017

LGTM

@SparkQA commented Sep 27, 2017

Test build #82221 has finished for PR 19349 at commit aa3fa70.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member):

LGTM too. Should be good to go.

@SparkQA commented Sep 27, 2017

Test build #82223 has finished for PR 19349 at commit 416bd10.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 27, 2017

Test build #82231 has finished for PR 19349 at commit 7cd78b2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member):

Merged to master.

@asfgit closed this in 09cbf3d on Sep 27, 2017.