pyspark binding issue #435

Closed
grusin opened this issue Feb 11, 2020 · 29 comments

@grusin commented Feb 11, 2020

Hey,

I have problems binding to the functions schema_of_xml, schema_of_xml_array, and from_xml_string in https://github.com/databricks/spark-xml/blob/ef3af6aa5b29763dbfe72cb23c7755d2bfe4d5a7/src/main/scala/com/databricks/spark/xml/package.scala from pyspark (yes, Python). The py4j wrapper complains because the functions are defined directly in the package object.

I do not have this problem with the from_xml function inside the object https://github.com/databricks/spark-xml/blob/ef3af6aa5b29763dbfe72cb23c7755d2bfe4d5a7/src/main/scala/com/databricks/spark/xml/functions.scala. It is understood by py4j without a problem and can be used by providing the right py4j wrapper code around it.

I would like to ask for wrappers for the functions from the first file to be added to the functions object (from the second link), so that they can be used without too much hassle in pyspark.

Thx
G
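
For context, a minimal sketch of the kind of access that fails (assuming a SparkSession named spark with the spark-xml jar on the classpath; the exact behavior may differ slightly across py4j versions):

# schema_of_xml lives in the Scala package object, so the plain Py4J path does
# not resolve to a class; py4j typically hands back a JavaPackage instead.
xml_pkg = spark._jvm.com.databricks.spark.xml
print(type(xml_pkg.schema_of_xml))  # py4j.java_gateway.JavaPackage, not a callable
# Calling it therefore fails with: TypeError: 'JavaPackage' object is not callable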

@srowen (Collaborator) commented Feb 11, 2020

It's a fine request, for sure. These functions aren't usable in Python without some manual wrapping. The reason we hadn't done this before is that there weren't functions like this before, and there's some overhead and bother in deploying a bit of Python code just to wrap a few functions. Still, it may become important (I just don't know what's involved; others may find it easy).

@HyukjinKwon do you know why the package bit may make the usual workaround hard here? I wouldn't have thought it would matter much.

@grusin how are you trying to wrap these? maybe we can spot a small change that makes it work.

@HyukjinKwon (Member) commented:

The problem with the Scala package object is that it compiles to a Java class with $ characters in its name (package$, with a MODULE$ singleton field), and that matters when we access it through Py4J.

For example, this is how to access org.apache.spark.SPARK_VERSION in PySpark; SPARK_VERSION is defined in the spark package object.

>>> getattr(getattr(sc._jvm.org.apache.spark, "package$"), "MODULE$").SPARK_VERSION()
'3.0.0-SNAPSHOT'

@grusin, are you able to work around it via #430 (comment)?
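
As a minimal sketch, the same pattern applied to spark-xml would look roughly like this (assuming a SparkSession named spark, the spark-xml jar on the classpath, and an existing JVM Dataset[String] called jds; the comments below in this thread show the full wrappers):

# Reach the package object through its compiled name (package$ / MODULE$),
# then call schema_of_xml with a Java Dataset[String] and a Scala options map.
xml_module = getattr(
    getattr(spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
scala_options = spark._jvm.PythonUtils.toScalaMap({})
java_schema = xml_module.schema_of_xml(jds, scala_options)  # jds: assumed Dataset[String]
print(java_schema.json())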

@grusin (Author) commented Feb 12, 2020

Thanks, that rather less-than-obvious getattr chain solved the problem ;-) I can access all the needed functions now.

Having resolved that... I still seem to have another problem, now with converting a pyspark dataframe whose 'body' column contains the XML string into a Scala Dataset[String], which is required to call schema_of_xml. The example from #430 does not work, as I get cast exceptions saying that GenericRowWithSchema cannot be converted to String.

I don't have the code at hand; I will post code tomorrow showing the issue and two less-than-perfect solutions:

  • I was able to execute schema_of_xml, but I had to build an array of XML messages and pass it along, which could easily overload the driver.
  • Patch the Scala code to do the conversion for me, which is more elegant, but still requires a code modification :)

Once again, thx for help! :)

@srowen (Collaborator) commented Feb 12, 2020

Ah I see; we could add a signature with RDD[String] or DataFrame (of one string column); would one of those work better?

Would you be willing to share how you got it to work otherwise? might be the template for a small python package later.

@grusin (Author) commented Feb 13, 2020

Current code (no changes required to spark-xml)

The wrapper code for the from_xml function works flawlessly; the implementation is pretty much like the one from #430:

from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import Row, _create_row, _parse_datatype_json_string
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

def ext_from_xml(xmlColumn, schema, options={}):
  java_column = _to_java_column(xmlColumn.cast('string'))
  java_schema = spark._jsparkSession.parseDataType(schema.json())
  scala_map = sc._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
  
  jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(java_column, java_schema, scala_map)
  
  return Column(jc)

This is the wrapper code for schema_of_xml. It works, but it's far from perfect, as it needs to pull the data into the driver to create the array of strings and only then pass it over to the schema_of_xml Scala function. I am not sure how to make this work without this ugliness and without making code changes on the Scala side:

def ext_schema_of_xml_slow(df, options={}):
  scala_options = spark._jvm.PythonUtils.toScalaMap(options)
  enc = spark._jvm.Encoders.STRING()
    
  jdataset = spark._wrapped._ssql_ctx.createDataset(
    df.rdd.map(lambda x: x[0]).collect(), #this is UGLY, as it collects data and serializes that... so it's pointless
    enc
  ) 
  java_schema = getattr(getattr(spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$").schema_of_xml(jdataset, scala_options)
    
  return _parse_datatype_json_string(java_schema.json())  

Test code (tested on Spark 2.4.4 on a Databricks 6.2 cluster, with the 0.8 spark-xml jar loaded):

avroFiles = [ '/mnt/path/to/data/1/2020/01/08/20/00/38.avro' ]

avroDf = spark.read.format("avro") \
  .load(avroFiles) 
 
schema = ext_schema_of_xml_slow(avroDf.select(col('Body').cast('string')))

nestedDf = avroDf.withColumn('Payload', ext_from_xml(col('Body'), schema))
display(nestedDf)

Better Code (requires changes to spark-xml)

A better version of schema_of_xml requires this function to be added to the spark-xml library to handle casting the dataframe column to a string. I bet this is also not the most elegant code (Scala is still an emerging skill for me), but it does the trick ;-)

  def schema_of_xml_df(df: DataFrame,
                       xmlColumn: Column,
                       options: Map[String, String]): StructType = {
    val ds = df.select(xmlColumn.cast("string")).as[String](Encoders.STRING);
    schema_of_xml(ds, options);
  }

And the wrapper code from pyspark that uses it. With the nice wrapper on the Scala side, the pyspark code keeps the logic to a minimum, which makes the wrapper very elegant:

def ext_schema_of_xml_df(df, xmlColumn, options={}):
  java_df = df._jdf
  java_column = _to_java_column(xmlColumn)
  scala_map = sc._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
  
  java_schema = spark._jvm.com.databricks.spark.xml.functions.schema_of_xml_df(java_df, java_column, scala_map)

  return _parse_datatype_json_string(java_schema.json())

@HyukjinKwon (Member) commented:

Thanks for sharing the full code. I see, so the problem is here:

def ext_schema_of_xml_slow(df, options={}):
  scala_options = spark._jvm.PythonUtils.toScalaMap(options)
  enc = spark._jvm.Encoders.STRING()
    
  jdataset = spark._wrapped._ssql_ctx.createDataset(
    df.rdd.map(lambda x: x[0]).collect(), #this is UGLY, as it collects data and serializes that... so it's pointless
    enc
  ) 
  java_schema = getattr(getattr(spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$").schema_of_xml(jdataset, scala_options)
    
  return _parse_datatype_json_string(java_schema.json())  

I think you're almost there. Can you try this version?

def ext_schema_of_xml(df, options={}):
    assert len(df.columns) == 1
    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    # 'as' is a Python keyword, so Dataset.as(Encoders.STRING()) has to go through getattr
    enc = spark._jvm.Encoders.STRING()
    jdataset = getattr(df._jdf, "as")(enc)
    java_schema = getattr(
        getattr(spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$").schema_of_xml(jdataset, scala_options)
    return _parse_datatype_json_string(java_schema.json())

@HyukjinKwon (Member) commented:

This is a messy workaround, but I hope it at least works :-).

@grusin (Author) commented Feb 13, 2020

it works flawlessly!!! thank you!!!

@grusin (Author) commented Feb 13, 2020

One note: it only works on standard Spark and standard Databricks clusters. The code above fails (luckily only the schema inference code) on a high concurrency cluster, so maybe a wrapper on the Scala side is needed in the long run after all.

schema = ext_schema_of_xml(avroDf.select(col('Body').cast('string')))

py4j.security.Py4JSecurityException: Method public static org.apache.spark.sql.Encoder org.apache.spark.sql.Encoders.STRING() is not whitelisted on class class org.apache.spark.sql.Encoders

@srowen (Collaborator) commented Feb 13, 2020

I can add the extra Scala signature; that part is easy.
(Yes, what you're referring to above is just because third-party packages are disabled in the JVM on such a cluster, but that is Databricks-specific.)

@grusin (Author) commented Feb 13, 2020

that would be awesome to have. I would be happy to test once it's added :)

@srowen (Collaborator) commented Feb 13, 2020

See #438 - what about accepting just a DataFrame that must have a single string column?

@grusin (Author) commented Feb 14, 2020

Hi, thanks for the changes. I will not be able to test them today due to workload; I will come back to you by Monday EOD CET.

@grusin (Author) commented Feb 17, 2020

I can confirm the code from master works! The pyspark glue code is now limited to the usual wrappers:

def ext_schema_of_xml_df(df, options={}):
  assert len(df.columns) == 1
  
  scala_options = spark._jvm.PythonUtils.toScalaMap(options)
  java_schema = getattr(getattr(spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$").schema_of_xml_df(df._jdf, scala_options)
    
  return _parse_datatype_json_string(java_schema.json())
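
For completeness, a short usage sketch that ties the wrappers together (reusing ext_from_xml from earlier in this thread; avroDf and the 'Body' column are placeholders from the earlier example):

# Hypothetical end-to-end usage of the wrappers above.
from pyspark.sql.functions import col

xml_df = avroDf.select(col('Body').cast('string'))  # single string column, as asserted
schema = ext_schema_of_xml_df(xml_df)               # infer the schema on the JVM side
parsed = avroDf.withColumn('Payload', ext_from_xml(col('Body'), schema))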

@grusin (Author) commented Feb 17, 2020

should we close the issue?

@srowen (Collaborator) commented Feb 17, 2020

Let's leave it open for others to find more readily, and as a placeholder for considering adding pyspark bindings.

@grusin (Author) commented Feb 26, 2020

Is there an ETA for the 0.9.0 release? I am really keen on having this new Scala helper code in an official release :)

@srowen (Collaborator) commented Feb 26, 2020

I hadn't planned on a release soon, but it's easy to do. Maybe I can document the content of this thread and cut a minor release.

@srowen (Collaborator) commented Mar 2, 2020

@grusin I made the 0.9.0 release just now

@3mlabs commented May 15, 2020

We have been trying to typecast the XML string. It seems you have used the above functions successfully. We have tried the steps below.

Create the dataframe and schema:
data = '48673 2014-09-302014-09-30T06:20:14USD'

rdd1 = spark.sparkContext.parallelize([(123,data)])
df = rdd1.toDF(['id','value'])

schema = eval('StructType([StructField("BusinessDayDate",StringType(),True),StructField("CurrencyCode",StringType(),True),StructField("EndDateTime",StringType(),True),StructField("RetailStoreID",LongType(),True),StructField("SequenceNumber",LongType(),True),StructField("WorkstationID",LongType(),True)])')

The from_xml function:
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string
from pyspark.sql.types import *

def from_xml(col, schema, options={}):
    scala_datatype = spark._jsparkSession.parseDataType(schema.json())
    scala_options = sc._jvm.PythonUtils.toScalaMap(options)
    jc = sc._jvm.com.databricks.spark.xml.functions.from_xml(col._jc if isinstance(col, Column) else col, scala_datatype, scala_options)
    return Column(jc)

Finally, we applied from_xml to the dataframe:

new_df = df.select(from_xml(df['value'], schema, {"rowTag":"Transaction"}))

The code gives a blank dataframe like the one below:

new_df.show()
+------------------------+
|xmldatatocatalyst(value)|
+------------------------+
| [,,,,,]|
+------------------------+

We have been stuck here for quite some time now. Appreciate if you guys can help us in any way.

Thanks,
3m Labs

@HyukjinKwon (Member) commented:

It would have been easier to read if you used code blocks via ```...```. It seems like your input isn't XML but the string "48673 2014-09-302014-09-30T06:20:14USD".

@3mlabs commented May 16, 2020

Hi HyukjinKwon,

Thank you for your response.

We are trying to cast a string XML dataframe column into a column typed according to a schema definition. Our current scenario: we receive an XML string from users via Kafka/Kinesis streams and consume it with Spark Streaming. After the consumption step we have a dataframe column named 'value' that contains the XML string (as StringType()).

We are looking for a Spark function that can convert the XML string into a typed XML column. This is why we created the sample XML dataframe and tried to run it through the from_xml function.

We are not sure what we are doing wrong, or whether there is another way to parse an XML string into the schema format. We would appreciate any help in finding a better way to meet this requirement.

Thanks,
3M Labs.

@srowen (Collaborator) commented May 16, 2020

You aren't parsing XML here, that seems to be the fundamental problem. There is none in your example.

@3mlabs commented May 16, 2020

You aren't parsing XML here, that seems to be the fundamental problem. There is none in your example.

Hi Srowen,

Sorry, we just missed adding the XML. Please find the code snippet attached to this reply (sample_xml_parser.txt).

The issue is that we are not getting any output once we apply the code (in the new_df dataframe).

Thanks,
VM

@srowen (Collaborator) commented May 16, 2020

I think the problem is that you are passing rowTag = Transaction, but your XML has an extra enclosing element.

@3mlabs commented May 16, 2020

I think the problem is that you are passing rowTag = Transaction, but your XML has an extra enclosing element.

Thank you for your suggestion. Based on your input I modified the code and added the additional parameter; however, it is still not returning the actual XML values. Please review the attached code.
sample_xml_parser.txt

@grusin (Author) commented May 16, 2020 via email

@3mlabs commented May 16, 2020

The root tag is wrong. You have it on POSLog, but the StructField has the fields from Transaction. Either fix that or, in my opinion, to get the best behavior, do not use a root tag and have proper nesting in the struct field.


Hi Grusin,

Thank you for your suggestion.

As per your input, I tried to load the XML file into a dataframe using spark.read:

df = spark.read.format("com.databricks.spark.xml").option("rowTag", "Transaction").load("")

The above code gives me the schema below:

df.schema
StructType(List(StructField(BusinessDayDate,StringType,true),StructField(CurrencyCode,StringType,true),StructField(EndDateTime,StringType,true),StructField(RetailStoreID,LongType,true),StructField(SequenceNumber,LongType,true),StructField(WorkstationID,LongType,true)))

This schema does not change even if I add rootTag:

df = spark.read.format("com.databricks.spark.xml").option("rowTag", "Transaction").option("rootTag","POSLog").load("")

The above code gives me the schema below:

df.schema
StructType(List(StructField(BusinessDayDate,StringType,true),StructField(CurrencyCode,StringType,true),StructField(EndDateTime,StringType,true),StructField(RetailStoreID,LongType,true),StructField(SequenceNumber,LongType,true),StructField(WorkstationID,LongType,true)))

Both code snippets give me the same output:

df.show()
+---------------+------------+-------------------+-------------+--------------+-------------+
|BusinessDayDate|CurrencyCode| EndDateTime|RetailStoreID|SequenceNumber|WorkstationID|
+---------------+------------+-------------------+-------------+--------------+-------------+
| 2014-09-30| USD|2014-09-30T06:20:14| 48| 73| 6|
+---------------+------------+-------------------+-------------+--------------+-------------+

We created our schema on the basis of this inferred schema format. Please let us know if we are missing any step.

Thanks,
3m Labs
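
For readers following along, a hedged illustration of the nesting point grusin raises above: from_xml appears to match the schema fields against the children of the string's root element (which would explain the empty [,,,,,] row earlier), so when the outermost element is POSLog, the schema passed to from_xml needs to describe that nesting rather than list Transaction's fields directly. The tags and values below are made up for illustration and are not the contents of the attached file; ext_from_xml is the wrapper from earlier in this thread.

from pyspark.sql.types import StructType, StructField, LongType

# Illustrative only: a hypothetical XML string with an enclosing POSLog element.
xml = "<POSLog><Transaction><RetailStoreID>48</RetailStoreID></Transaction></POSLog>"
df = spark.createDataFrame([(1, xml)], ["id", "value"])

# The schema mirrors the nesting: a Transaction struct inside the top-level record.
nested_schema = StructType([
    StructField("Transaction", StructType([
        StructField("RetailStoreID", LongType(), True),
    ]), True),
])

parsed = df.select(ext_from_xml(df["value"], nested_schema))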

@grusin (Author) commented May 16, 2020 via email

@srowen closed this as completed Oct 21, 2020