
[SPARK-16542][SQL][PYSPARK] Fix bugs about types that result an array of null when creating DataFrame using python #18444

Closed
wants to merge 47 commits

Conversation

zasdfgbnm
Contributor

What changes were proposed in this pull request?

This is the reopen of #14198, with merge conflicts resolved.

@ueshin Could you please take a look at my code?

Fix bugs about types that result an array of null when creating DataFrame using python.

Python's array.array has richer types than Python itself, e.g. we can have array('f', [1, 2, 3]) and array('d', [1, 2, 3]). The code in spark-sql and pyspark didn't take this into consideration, which could cause you to get an array of null values when you have an array('f') in your rows.

A simple code to reproduce this bug is:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from array import array

sc = SparkContext()
sqlContext = SQLContext(sc)

row1 = Row(floatarray=array('f', [1, 2, 3]), doublearray=array('d', [1, 2, 3]))
rows = sc.parallelize([row1])
df = sqlContext.createDataFrame(rows)
df.show()

which produces the output:

+---------------+------------------+
|    doublearray|        floatarray|
+---------------+------------------+
|[1.0, 2.0, 3.0]|[null, null, null]|
+---------------+------------------+

How was this patch tested?

A new test case was added.

zasdfgbnm added 11 commits July 8, 2016 20:58
Python's array has more types than Python itself; for example,
Python only has float while array supports 'f' (float) and 'd' (double).
Switching to array.typecode helps Spark make a better inference.

For example, for the code:

from pyspark.sql.types import _infer_type
from array import array
a = array('f',[1,2,3,4,5,6])
_infer_type(a)

We will get ArrayType(DoubleType,true) before change,
but ArrayType(FloatType,true) after change
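The idea above can be sketched as a small standalone example. Note this is illustrative only: the mapping and function names below are invented for the sketch; the real mapping (_array_type_mappings in pyspark.sql.types) is built at import time and returns Catalyst type objects rather than strings.

```python
from array import array

# Illustrative typecode-to-SQL-element-type mapping; the real PySpark
# mapping is platform-aware and returns type classes, not names.
_TYPECODE_TO_SQL = {
    'b': 'byte', 'h': 'short', 'i': 'int',
    'f': 'float', 'd': 'double',
}

def infer_array_element_type(arr):
    """Infer the SQL element type from the array's typecode, not its values."""
    if arr.typecode in _TYPECODE_TO_SQL:
        return _TYPECODE_TO_SQL[arr.typecode]
    raise TypeError("not supported type: array(%s)" % arr.typecode)

print(infer_array_element_type(array('f', [1, 2, 3])))  # float
print(infer_array_element_type(array('d', [1, 2, 3])))  # double
```

Keying on typecode rather than on the Python value type is what distinguishes array('f') from array('d'), since both hold Python floats.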
Member

@ueshin ueshin left a comment

Thank you for working on this!
I left some comments for now.

for t in int_types:
    # test positive numbers
    a = array.array(t, [1])
    while True:
Member

What is this loop for?

Contributor Author

Let me figure that out. This patch is more than a year old and I forget what was happening.

for v in obj:
    if v is not None:
        return ArrayType(_infer_type(obj[0]), True)
else:
    return ArrayType(NullType(), True)
elif isinstance(obj, array):
    if obj.typecode in _array_type_mappings:
        return ArrayType(_array_type_mappings[obj.typecode](), True)
Member

I'm not sure if we can change the element type. Doesn't this break backward-compatibility? cc @HyukjinKwon

Member

@HyukjinKwon HyukjinKwon Jun 28, 2017

Thanks for cc'ing me. If we say that ignoring the specified type in array (e.g., float becoming double) is a bug, I think this is the same kind of issue as the Pandas type related PR we helped review before.

Contributor Author

@HyukjinKwon Could you send me the link to "the Pandas type related PR we helped review before"?

Member

I meant 67c7502. I think it is okay.

@ueshin
Member

ueshin commented Jun 28, 2017

ok to test

@SparkQA

SparkQA commented Jun 28, 2017

Test build #78775 has finished for PR 18444 at commit 58b120c.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

HyukjinKwon commented Jun 28, 2017

While I am here, I just ran some tests as below:

from array import array

from pyspark.sql import Row


spark.createDataFrame([Row(floatarray=array('f',[1, 2, 3]))]).show()
spark.createDataFrame([Row(unicodearray=array('u',[u"a", u"b"]))]).show()

Before

>>> spark.createDataFrame([Row(floatarray=array('f',[1,2,3]))]).show()
+------------------+
|        floatarray|
+------------------+
|[null, null, null]|
+------------------+
>>> spark.createDataFrame([Row(unicodearray=array('u',[u"a", u"b"]))]).show()
+------------+
|unicodearray|
+------------+
|      [a, b]|
+------------+

After

>>> spark.createDataFrame([Row(floatarray=array('f',[1, 2, 3]))]).show()
+---------------+
|     floatarray|
+---------------+
|[1.0, 2.0, 3.0]|
+---------------+
>>> spark.createDataFrame([Row(unicodearray=array('u',[u"a", u"b"]))]).show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/sql/session.py", line 537, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File ".../spark/python/pyspark/sql/session.py", line 401, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "...spark/python/pyspark/sql/session.py", line 333, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
  File ".../spark/python/pyspark/sql/types.py", line 1009, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File ".../spark/python/pyspark/sql/types.py", line 981, in _infer_type
    raise TypeError("not supported type: array(%s)" % obj.typecode)
TypeError: not supported type: array(u)

I think we should not drop this support. I guess the same thing would happen to 'c' too.

# array types depend on C implementation on the machine. Therefore there
# is no machine independent correspondence between python's array types
# and Scala types.
# See: https://docs.python.org/2/library/array.html
Contributor Author

@ueshin The answer to your question is explained here, in the couple of lines of comments I just added.
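The platform dependence described in those comments can be seen directly with the standard library; a quick illustration (not part of the patch):

```python
import array
import ctypes

# Array typecodes map to C types whose widths vary by platform, so a
# typecode alone does not pin down a fixed-width Catalyst type.
for typecode, ctype in [('i', ctypes.c_int), ('l', ctypes.c_long)]:
    size = ctypes.sizeof(ctype)
    print("array(%r): %d bytes, max signed value %d"
          % (typecode, size, 2 ** (size * 8 - 1) - 1))

# array.itemsize reports the same platform-dependent width:
print(array.array('l').itemsize == ctypes.sizeof(ctypes.c_long))  # True
```

For example, 'l' is 8 bytes on LP64 Linux/macOS but 4 bytes on Windows, which is why the mapping has to be computed from ctypes at runtime rather than hard-coded.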

@zasdfgbnm
Contributor Author

@HyukjinKwon Thanks for figuring that out. I will fix those issues.

@SparkQA

SparkQA commented Jun 28, 2017

Test build #78802 has finished for PR 18444 at commit b91dd55.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zasdfgbnm
Contributor Author

@HyukjinKwon It is strange that the float array does not work and the PySpark unit tests fail. This patch was created more than a year ago and it worked well at that time. Maybe some changes in Spark's internals broke it. I haven't followed those internal changes, so I have no idea how to fix this right now. Please allow me some time to figure things out, and if you have an idea what the problem is, please let me know.

@ueshin
Member

ueshin commented Jun 28, 2017

Btw, it seems like the array module in Python 2 doesn't have the array.typecodes attribute.

Traceback (most recent call last):
  File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/sql/tests.py", line 2266, in test_array_types
    unsupported_types = set(array.typecodes) - int_types - float_types
AttributeError: 'module' object has no attribute 'typecodes'

@ueshin
Member

ueshin commented Jun 29, 2017

In my local environment, this patch looks to be working well for floatarray:

df = spark.createDataFrame([Row(floatarray=array('f',[1,2,3]), doublearray=array('d',[1,2,3]))])
df.printSchema()
df.show()
root
 |-- doublearray: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- floatarray: array (nullable = true)
 |    |-- element: float (containsNull = true)

+---------------+---------------+
|    doublearray|     floatarray|
+---------------+---------------+
|[1.0, 2.0, 3.0]|[1.0, 2.0, 3.0]|
+---------------+---------------+

@HyukjinKwon Could you check this again please?
I guess we need to recompile the EvaluatePython.scala file.

@HyukjinKwon
Member

Ahhh.. there was a Scala change here. I checked this again and you are right @ueshin. I will read carefully and make a clean build next time ... I also updated #18444 (comment).

@HyukjinKwon
Member

@zasdfgbnm, I am sorry for a false alarm.

@SparkQA

SparkQA commented Jul 1, 2017

Test build #79006 has finished for PR 18444 at commit bd8e111.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 1, 2017

Test build #79010 has finished for PR 18444 at commit 1b1c419.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 18, 2017

Test build #79705 has finished for PR 18444 at commit cafa5fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@HyukjinKwon HyukjinKwon left a comment

Okay. For me LGTM except for the comments.

if obj.typecode in _array_type_mappings:
    return ArrayType(_array_type_mappings[obj.typecode](), False)
else:
    raise TypeError("not supported type: array(%s)" % obj.typecode)
Member

How about falling back to type inference in this case?

Contributor Author

@zasdfgbnm zasdfgbnm Jul 19, 2017

Is there any reason to do so? I don't think an array with an unsupported typecode will be correctly serialized or deserialized (if it would, why not add it to the supported list?). In this case, it is better to raise a TypeError and let the user pick another type.

Member

@HyukjinKwon HyukjinKwon Jul 19, 2017

Okay. To double check, does it cover all the cases we supported before in Spark? If so for sure, I am fine with it as is.

I was trying to leave a sign-off for this reason - it fixes a case that was never reachable before (the 'c' type in a specific Python version) and a bug (assigning the correct type for array.array), and all other cases work as before.

Contributor Author

If we fall back to _infer_type here, then array('q') will be inferred as something like an array of Long, and the user will then see an error from net.razorvine.pickle saying bad array typecode q, due to SPARK-21420, which seems to me more confusing than failing explicitly here.

Contributor Author

Yes, this is worth a double check. Let me do some tests on an old Spark.

Contributor Author

@zasdfgbnm zasdfgbnm Jul 19, 2017

How about doing the following?

  1. Open a new JIRA explaining the issue about 'L' support.
  2. Make changes to the tests as you suggest, and clearly document in comments why 'L' is an exception for Python 2, referencing the new JIRA issue (and note that this exception should be removed once better support for large integers is implemented).
  3. Finish this PR.

Member

Sounds good to me.

Contributor Author

OK. Let me do this. If possible, I would like to finish this PR before I travel.


Member

Sounds good to me, too. Let's discuss it in the next pr.

'L' -> 11, 'l' -> 13, 'f' -> 15, 'd' -> 17, 'u' -> 21
)
} else {
Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
Map('B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
Member

@HyukjinKwon HyukjinKwon Jul 18, 2017

Last question to double check. So, 'c' did not ever work in a specific Python version but this PR fixes it?

Contributor Author

'c' was supported by Spark 2.2.0:

>>> row = Row(myarray=array('c', ["a"]))
>>> df = spark.createDataFrame([row])
>>> df.first()["myarray"][0]
u'a'

I think this support existed because array was inferred the same way as list. But after changing the type inference for array, we have to change this accordingly to bring it back to work.
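For context, whether the 'c' typecode exists at all depends on the Python major version, which a quick (illustrative) check makes visible:

```python
import array
import sys

# The 'c' (char) typecode exists only in Python 2; Python 3 removed it,
# which is why a mapping entry for 'c' must be version-guarded.
has_c = 'c' in array.typecodes
print("'c' typecode available:", has_c)
assert has_c == (sys.version_info[0] == 2)
```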

Member

Ah, yea. I meant to say 'c' -> 1 was not ever reachable for sure.

Contributor Author

Yes

@zasdfgbnm
Contributor Author

By the way, I'm traveling tomorrow and will be back next Tuesday. While traveling, I may not be able to answer comments, questions, etc.

@HyukjinKwon
Member

Sure, take your time @zasdfgbnm. Thanks for your quick response.

@HyukjinKwon
Member

HyukjinKwon commented Jul 19, 2017

LGTM if we have a proper conclusion in #18444 (comment).

@zasdfgbnm
Contributor Author

@HyukjinKwon Take a look at my newest commit. I think I found a better way to solve the problem; it keeps all the hack code for SPARK-21465 in a single place, making it easier to remove in the future. What do you think?

@HyukjinKwon
Member

Okay. LGTM.

@ueshin
Member

ueshin commented Jul 19, 2017

LGTM, too. pending Jenkins.

@SparkQA

SparkQA commented Jul 19, 2017

Test build #79736 has finished for PR 18444 at commit 88091ea.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zasdfgbnm
Contributor Author

Looks to be an unrelated error.

Sent from my OnePlus ONEPLUS A3000 using FastHub

@HyukjinKwon
Member

Actually, it looks related:


======================================================================
ERROR: test_array_types (pyspark.sql.tests.SQLTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/tests.py", line 2402, in test_array_types
    assertCollectSuccess(t, 2 ** (ctypes.sizeof(ctype) * 8) - 1)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/tests.py", line 2352, in assertCollectSuccess
    self.assertEqual(df.first()["myarray"][0], value)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/dataframe.py", line 994, in first
...

Py4JJavaError: An error occurred while calling o1194.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 101.0 failed 1
 times, most recent failure: Lost task 2.0 in stage 101.0 (TID 429, localhost, executor driver): 
net.razorvine.pickle.PickleException: unsupported datatype: 64-bits unsigned long
	at net.razorvine.pickle.objects.ArrayConstructor.constructLongArrayFromUInt64(ArrayConstructor.java:302)
...

@ueshin
Member

ueshin commented Jul 19, 2017

Yea, it's related to typecode 'L' in the pypy environment.
We might need to handle it the same way we did for typecode 'c', or override constructLongArrayFromUInt64().

@ueshin
Member

ueshin commented Jul 19, 2017

Btw, now I'm wondering whether Python 3 can handle an array like array('L', [9223372036854775807]).

@HyukjinKwon
Member

I just tested this with Python 3.6 to help:

from pyspark.sql import Row
import array


row = Row(myarray=array.array('L', [9223372036854775807]))
df = spark.createDataFrame([row])
df.show()

Master:

...
net.razorvine.pickle.PickleException: unsupported datatype: 64-bits unsigned long
...

After this PR:

...
TypeError: not supported type: array(L)
...

He tested other cases as well - https://github.com/apache/spark/pull/18444/files#r128131853.
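Whether array('L', [9223372036854775807]) is even constructible depends on the platform's unsigned long width; a quick illustration (not part of the patch):

```python
import array

big = 9223372036854775807  # 2 ** 63 - 1

# 'L' maps to C unsigned long, whose width is platform-dependent: it can
# hold 2**63 - 1 only where unsigned long is 8 bytes (e.g. LP64
# Linux/macOS), not where it is 4 bytes (e.g. Windows).
if array.array('L').itemsize >= 8:
    a = array.array('L', [big])
    print(a[0] == big)  # True
else:
    print("unsigned long is too narrow to hold 2**63 - 1 on this platform")
```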


# compute array typecode mappings for unsigned integer types
for _typecode in _array_unsigned_int_typecode_ctype_mappings.keys():
    # JVM does not have unsigned types, so use signed types that is at list 1
Member

nit: typo at list.
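For illustration, the unsigned-to-signed promotion the excerpt describes can be sketched with ctypes. The names below are invented for the sketch (the patch's actual code lives in pyspark.sql.types and returns Catalyst type classes):

```python
import ctypes

# Unsigned C types and their platform sizes; the JVM has no unsigned
# integers, so an unsigned N-byte value needs a signed type wider than N.
_UNSIGNED = {'B': ctypes.c_ubyte, 'H': ctypes.c_ushort, 'I': ctypes.c_uint}
_SIGNED_BY_SIZE = {1: 'byte', 2: 'short', 4: 'int', 8: 'long'}

def signed_type_for_unsigned(typecode):
    """Pick a signed type one size larger than the unsigned C type."""
    size = ctypes.sizeof(_UNSIGNED[typecode])
    return _SIGNED_BY_SIZE[size * 2]

print(signed_type_for_unsigned('B'))  # short
print(signed_type_for_unsigned('H'))  # int
```

Doubling the byte width guarantees the signed type can represent every value of the unsigned one, e.g. an unsigned 2-byte 'H' (max 65535) fits in a signed 4-byte int.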

@ueshin
Member

ueshin commented Jul 19, 2017

@HyukjinKwon Oh, I missed his test for 'L'. Thanks for testing!

@zasdfgbnm
Contributor Author

Hmm, how was 'L' supported on master with Python 2? Why was there no such error there?


@zasdfgbnm
Contributor Author

@ueshin Maybe we don't have to do the same thing. If this is the problem, then I think 'L' was never supported in the pypy environment in the first place, and there is no need to support it now.


@zasdfgbnm
Contributor Author

We could just change the dirty hack to something like:

if python2.7 and not pypy:
    do the dirty hacking
    fool the unit test



def _int_size_to_type(size):
    """
    Return the Scala type from the size of integers.
Member

nit: Scala type -> Catalyst datatype.

@SparkQA

SparkQA commented Jul 19, 2017

Test build #79758 has finished for PR 18444 at commit cdc7257.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zasdfgbnm
Contributor Author

fixed

@SparkQA

SparkQA commented Jul 19, 2017

Test build #79774 has finished for PR 18444 at commit a340745.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Jul 20, 2017

Thanks! Merging to master.

@asfgit asfgit closed this in b7a40f6 Jul 20, 2017
@zasdfgbnm zasdfgbnm deleted the fix_array_infer branch July 20, 2017 12:10
6 participants