
[SPARK-22112][PYSPARK] Supports RDD of strings as input in spark.read.csv in PySpark #19339

Closed · wants to merge 10 commits

Conversation

goldmedal
Contributor

What changes were proposed in this pull request?

We added a method to the Scala API for creating a DataFrame from Dataset[String] storing CSV in SPARK-15463, but PySpark doesn't have Dataset, so it can't use that API directly. Therefore, I add an API to create a DataFrame from RDD[String] storing CSV; this is also consistent with PySpark's spark.read.json.

For example:

>>> rdd = sc.textFile('python/test_support/sql/ages.csv')
>>> df2 = spark.read.csv(rdd)
>>> df2.dtypes
[('_c0', 'string'), ('_c1', 'string')]
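
An explicit schema can also be passed together with the RDD, the same as when reading from file paths (the column names below are illustrative and assume the two CSV columns hold a name and an age):

>>> from pyspark.sql.types import StructType, StructField, StringType, IntegerType
>>> schema = StructType([
...     StructField("name", StringType(), True),
...     StructField("age", IntegerType(), True)])
>>> spark.read.csv(rdd, schema=schema).dtypes
[('name', 'string'), ('age', 'int')]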

How was this patch tested?

Added unit test cases.
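
A rough sketch of the kind of test case added (the method name and data are illustrative, not the exact test in the patch):

    def test_csv_from_rdd_of_strings(self):
        # With no schema given, default column names _c0, _c1, ... are assigned.
        rdd = self.sc.parallelize(["Joe,20", "Tom,30"])
        df = self.spark.read.csv(rdd)
        self.assertEqual(df.columns, ['_c0', '_c1'])
        self.assertEqual(df.count(), 2)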

@goldmedal
Contributor Author

@HyukjinKwon @viirya Could you review this PR? Thanks! :)

* @since 2.2.0
*/
@deprecated("Use csv(Dataset[String]) instead.", "2.2.0")
def csv(csvRDD: RDD[String]): DataFrame = {
Member

@HyukjinKwon HyukjinKwon Sep 25, 2017

Wait ... I think we shouldn't introduce an RDD API on the Scala side. I was thinking of doing this within the Python side, or maybe adding a private wrapper on the Scala side if required. Will take a closer look tomorrow (KST).

Contributor Author

Thanks for your review :)
Umm.. I followed spark.read.json's way to add them. Although json(jsonRDD: RDD[String]) has been deprecated, PySpark still uses it to create a DataFrame. I think adding a private wrapper in Scala may be better, because not only PySpark but also SparkR may need it.

Member

Yep. +1 for @HyukjinKwon's advice. We cannot add a deprecated method which doesn't exist in 2.2.0 at all.

Member

Yeah, it's weird to add a deprecated method. :) We should either add a special wrapper for this purpose or do this on the Python side, if possible and not complicated.

@HyukjinKwon
Member

ok to test

@SparkQA

SparkQA commented Sep 25, 2017

Test build #82148 has finished for PR 19339 at commit 7525b48.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

yield x
keyed = path.mapPartitions(func)
keyed._bypass_serializer = True
jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
Member

I tried a way within Python and it seems to work:

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1ed452d895b..0f54065b3ee 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -438,7 +438,10 @@ class DataFrameReader(OptionUtils):
             keyed = path.mapPartitions(func)
             keyed._bypass_serializer = True
             jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
-            return self._df(self._jreader.csv(jrdd))
+            jdataset = self._spark._jsqlContext.createDataset(
+                jrdd.rdd(),
+                self._spark._sc._jvm.Encoders.STRING())
+            return self._df(self._jreader.csv(jdataset))
         else:
             raise TypeError("path can be only string, list or RDD")

Member

@goldmedal, it'd be great if you could double-check whether this really works and whether it can be shortened or made cleaner. This was just a rough attempt to reach the goal, so I am not sure it is the best way.

Contributor Author

@goldmedal goldmedal Sep 26, 2017

OK, this way looks good. I'll try it. Thanks for your suggestion.

@goldmedal
Contributor Author

@HyukjinKwon I think your way works fine after fixing a variable name bug (_jsqlContext >> _ssql_ctx). Should we also modify the json part to be consistent with the csv part?

@viirya
Member

viirya commented Sep 26, 2017

As it relies on a deprecated API, I think it is also worth changing PySpark's json to use Dataset. But that is better done in another PR.

@HyukjinKwon
Member

Yea, let's do it separately.

@goldmedal
Contributor Author

OK, so should I create another JIRA for this?

@SparkQA

SparkQA commented Sep 26, 2017

Test build #82195 has finished for PR 19339 at commit 4040103.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Hm, I'd actually leave it for now. I am not sure we should fix it here, since we could sweep it out later when we remove the deprecated ones together, and in its current state it doesn't cause any problems (e.g., build warnings), if I understood correctly. I won't stand against it, but I don't support it either. Let's go ahead with this one first.

@goldmedal
Contributor Author

This is so weird. It runs fine for me with Python 3.5.2, but it seems to have some problem with Python 3.4. Let me try Python 3.4 locally.

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM

keyed = path.mapPartitions(func)
keyed._bypass_serializer = True
jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
jdataset = self._spark._ssql_ctx.createDataset(
Member

Let's add a small comment here to explain why we need to create the dataset (which could look a bit weird in PySpark, I believe).

jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
jdataset = self._spark._ssql_ctx.createDataset(
jrdd.rdd(),
self._spark._sc._jvm.Encoders.STRING())
Member

Could we replace _spark._sc._jvm with _spark._jvm?

Contributor Author

Yes, it works. I'll modify it.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Sep 26, 2017

Test build #82198 has finished for PR 19339 at commit 4040103.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

keyed._bypass_serializer = True
jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
# [SPARK-22112]
# There aren't any jvm api for creating a dataframe from rdd storing csv.
Member

Just a personal preference: "SPARK-22112: ..." or "see SPARK-22112", if you wouldn't mind.

Contributor Author

OK, let me fix it. Thanks :)

Member

Yeah, the usual style.

@@ -336,6 +336,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
``inferSchema`` option or specify the schema explicitly using ``schema``.

:param path: string, or list of strings, for input path(s).
Member

nit: . -> ,

Contributor Author

ok thanks :)

@viirya
Member

viirya commented Sep 26, 2017

LGTM

@SparkQA

SparkQA commented Sep 26, 2017

Test build #82201 has finished for PR 19339 at commit f542967.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 26, 2017

Test build #82202 has finished for PR 19339 at commit 5988336.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 26, 2017

Test build #82203 has finished for PR 19339 at commit 032b0c8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@goldmedal
Contributor Author

goldmedal commented Sep 26, 2017

Umm.. it always runs fine with Python 3.4 on my local machine. I'm not sure why it sometimes fails on Jenkins... :(

@HyukjinKwon
Member

At a quick look, both test failures are:

  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/readwriter.py", line 303, in parquet
    return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

They don't look related to the current PR (my rough guess is that it's Py4J instability).

@HyukjinKwon
Member

@goldmedal, are you online now? How about changing the PR title to something like "Supports RDD of strings as input in spark.read.csv in PySpark"?

@goldmedal changed the title from "[SPARK-22112][PYSPARK] Add an API to create a DataFrame from RDD[String] storing CSV" to "[SPARK-22112][PYSPARK] Supports RDD of strings as input in spark.read.csv in PySpark" on Sep 27, 2017
@goldmedal
Contributor Author

@HyukjinKwon I have updated the title. Thanks!

@HyukjinKwon
Member

Thanks @goldmedal.

@HyukjinKwon
Member

Merged to master.

@viirya
Member

viirya commented Sep 27, 2017

I've tested a few times locally and can't reproduce the same failure.

@asfgit asfgit closed this in 1fdfe69 Sep 27, 2017
keyed._bypass_serializer = True
jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
# see SPARK-22112
# There aren't any jvm api for creating a dataframe from rdd storing csv.
Member

Let's fix these comments like,

SPARK-22112: There aren't any jvm api for creating a dataframe from rdd storing csv.
...

or

There aren't any jvm api ...
...
for creating a dataframe from dataset storing csv. See SPARK-22112.

when we happen to fix some code around here, or review other PRs touching this area, in the future.

Contributor Author

OK, thanks.
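
For reference, a rough sketch of what the RDD branch of DataFrameReader.csv converges to once the feedback above is applied (reconstructed from the snippets quoted in this thread; the exact names and comment wording may differ slightly from the merged commit):

    elif isinstance(path, RDD):
        def func(iterator):
            for x in iterator:
                # ship each line to the JVM as UTF-8 bytes; BytesToString
                # decodes them back into java.lang.Strings
                # (readwriter.py aliases unicode to str on Python 3)
                if isinstance(x, unicode):
                    x = x.encode("utf-8")
                yield x
        keyed = path.mapPartitions(func)
        keyed._bypass_serializer = True
        jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
        # see SPARK-22112: there is no JVM API for creating a DataFrame from
        # an RDD of CSV strings, so wrap the RDD in a JVM Dataset[String]
        # first and use the JVM API that reads CSV from a Dataset.
        jdataset = self._spark._ssql_ctx.createDataset(
            jrdd.rdd(),
            self._spark._jvm.Encoders.STRING())
        return self._df(self._jreader.csv(jdataset))
    else:
        raise TypeError("path can be only string, list or RDD")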

@goldmedal
Contributor Author

@HyukjinKwon @viirya Thanks for your reviews.
