Add note about how to wrap functions from Pyspark (#440)
See #435
srowen authored Mar 2, 2020
1 parent 3a7701a commit bb4b9ff
Showing 1 changed file: README.md (29 additions, 1 deletion).
@@ -107,7 +107,7 @@ val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
```

-- This can converts arrays of strings containing XML to arrays of parsed structs. Use `schema_of_xml_array` instead
+- This can convert arrays of strings containing XML to arrays of parsed structs. Use `schema_of_xml_array` instead
- `com.databricks.spark.xml.from_xml_string` is an alternative that operates on a String directly instead of a column,
for use in UDFs
- If you use `DROPMALFORMED` mode with `from_xml`, then XML values that do not parse correctly will result in a
@@ -116,6 +116,34 @@ val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
instead default to `DROPMALFORMED`.
If, however, you include a column in the schema for `from_xml` that matches the `columnNameOfCorruptRecord`, then
`PERMISSIVE` mode will still output malformed records to that column in the resulting struct, as sketched below.
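For illustration, here is a minimal sketch of this corrupt-record handling from PySpark, using the `ext_from_xml` helper defined in the PySpark notes below. The `df` DataFrame with a string `payload` column, the `payload_schema` variable, and the `_malformed` column name are assumptions for the example, not part of the library's API:

```python
from pyspark.sql.types import StructField, StringType

# Assumptions for this sketch: `payload_schema` is the struct schema inferred
# for the XML payload, and `_malformed` is the chosen corrupt-record column.
options = {"mode": "PERMISSIVE", "columnNameOfCorruptRecord": "_malformed"}

# The corrupt-record column must also appear in the schema passed to from_xml.
schema_with_corrupt = payload_schema.add(StructField("_malformed", StringType(), True))

parsed = df.withColumn(
    "parsed", ext_from_xml(df["payload"], schema_with_corrupt, options))
```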

#### PySpark notes

The functions above are currently exposed only in the Scala API, as there is no separate Python package
for `spark-xml`. They can be accessed from PySpark by declaring helper functions that call
into the JVM-based API from Python. For example:

```python
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string

def ext_from_xml(xml_column, schema, options={}):
    # Assumes an active SparkSession named `spark` in scope.
    # Cast to string and unwrap the underlying Java Column.
    java_column = _to_java_column(xml_column.cast('string'))
    # Convert the PySpark schema to its JVM representation via its JSON form.
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    # Convert the Python dict of options to a Scala immutable Map.
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    # Invoke the Scala from_xml and wrap the result as a Python Column.
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    # Schema inference expects a DataFrame with exactly one string column.
    assert len(df.columns) == 1

    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    # Look up the Scala package object, where schema_of_xml_df is defined.
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    # Convert the returned Java StructType back to a PySpark StructType.
    return _parse_datatype_json_string(java_schema.json())
```
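With these helpers in scope, usage mirrors the Scala example above. A minimal sketch, assuming a DataFrame `df` with a string column `payload`:

```python
# Infer the payload's schema, then parse the XML into a struct column.
payload_schema = ext_schema_of_xml_df(df.select("payload"))
parsed = df.withColumn("parsed", ext_from_xml(df["payload"], payload_schema))
```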

## Structure Conversion

