[SPARK-18274][ML][PYSPARK] Memory leak in PySpark JavaWrapper
## What changes were proposed in this pull request?
* In `JavaWrapper`'s destructor, make the Java gateway dereference the wrapped object, using `SparkContext._active_spark_context._gateway.detach`.
* Fix the parameter-copying bug by moving the `copy` method from `JavaModel` to `JavaParams`.
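
The detach-on-destruction idea can be illustrated without Spark. The sketch below is only an analogy: `FakeGateway`, `FakeJavaObject`, `Wrapper`, and `run_fits` are hypothetical names invented here, and the registry dictionary merely stands in for the strong references the real gateway holds. It relies on CPython running `__del__` once the last reference to a wrapper goes away.

```python
import gc


class FakeGateway:
    """Hypothetical stand-in for the gateway: keeps strong references to 'Java' objects."""

    def __init__(self):
        self.attached = {}

    def attach(self, obj):
        self.attached[id(obj)] = obj
        return obj

    def detach(self, obj):
        # Drop the strong reference so the object can be collected.
        self.attached.pop(id(obj), None)


class FakeJavaObject:
    """Hypothetical placeholder for a JVM-side object."""


class Wrapper:
    """Python-side wrapper that releases its companion object when destroyed."""

    gateway = None  # plays the role of SparkContext._active_spark_context._gateway

    def __init__(self, java_obj):
        self._java_obj = java_obj

    def __del__(self):
        # Similar in spirit to the patch: only detach while a gateway is active.
        if Wrapper.gateway is not None and self._java_obj is not None:
            Wrapper.gateway.detach(self._java_obj)


def run_fits(n):
    """Create and discard n wrappers; return how many objects stay attached."""
    gateway = FakeGateway()
    Wrapper.gateway = gateway
    for _ in range(n):
        w = Wrapper(gateway.attach(FakeJavaObject()))
        del w  # last reference gone, so __del__ detaches the companion
    gc.collect()
    return len(gateway.attached)
```

Without the `__del__` method, every object created in the loop would stay in `attached`, which is the analogue of the leak described above.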

## How was this patch tested?
```python
import random, string
from pyspark.ml.feature import StringIndexer

l = [(''.join(random.choice(string.ascii_uppercase) for _ in range(10)), ) for _ in range(int(7e5))]  # 700000 random strings of 10 characters
df = spark.createDataFrame(l, ['string'])

for i in range(50):
    indexer = StringIndexer(inputCol='string', outputCol='index')
    indexer.fit(df)
```
* Before: a strong reference to each `StringIndexer` was kept, causing GC issues, and the loop halted midway.
* After: garbage collection works because the object is dereferenced, and the computation completes.
* Memory footprint tested using a profiler.
* Added a parameter-copy test that failed before this change.
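
The parameter-copy bug covered by the new test can be reproduced in miniature without Spark. This is a rough sketch under stated assumptions: `FakeJavaEvaluator`, `PyEvaluator`, `shallow_copy`, `fixed_copy`, and `java_metric_names` are hypothetical names made up for illustration, not PySpark APIs. It shows how two Python copies that share one companion object overwrite each other's parameters, and how copying the companion as well avoids that.

```python
import copy


class FakeJavaEvaluator:
    """Hypothetical stand-in for the JVM-side evaluator."""

    def __init__(self, metric_name="rmse"):
        self.metric_name = metric_name

    def copy(self):
        return FakeJavaEvaluator(self.metric_name)


class PyEvaluator:
    """Hypothetical Python wrapper that syncs its params to a companion object."""

    def __init__(self, metric_name):
        self.metric_name = metric_name
        self._java_obj = FakeJavaEvaluator()

    def _transfer_params_to_java(self):
        self._java_obj.metric_name = self.metric_name

    def evaluate(self):
        # Params are pushed to the companion object on every call.
        self._transfer_params_to_java()
        return self._java_obj.metric_name

    def shallow_copy(self, metric_name):
        # Buggy behaviour: the copy still points at the original's companion.
        that = copy.copy(self)
        that.metric_name = metric_name
        return that

    def fixed_copy(self, metric_name):
        # Fixed behaviour: copy the companion too, then sync params to it.
        that = copy.copy(self)
        that.metric_name = metric_name
        that._java_obj = self._java_obj.copy()
        that._transfer_params_to_java()
        return that


def java_metric_names(copy_method):
    """Follow the same sequence of calls as the added test, for one copy strategy."""
    evaluator = PyEvaluator("r2")
    evaluator.evaluate()
    other = getattr(evaluator, copy_method)("mae")
    evaluator.evaluate()
    other.evaluate()
    return evaluator._java_obj.metric_name, other._java_obj.metric_name
```

With `"shallow_copy"` both wrappers end up reporting the copy's metric on the shared companion object; with `"fixed_copy"` each keeps its own value.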

Author: Sandeep Singh <sandeep@techaddict.me>
Author: jkbradley <joseph.kurata.bradley@gmail.com>

Closes apache#15843 from techaddict/SPARK-18274.
techaddict authored and uzadude committed Jan 27, 2017
1 parent d949256 commit 0fd8014
Showing 2 changed files with 41 additions and 18 deletions.
18 changes: 18 additions & 0 deletions python/pyspark/ml/tests.py
@@ -390,6 +390,24 @@ def test_word2vec_param(self):
        self.assertEqual(model.getWindowSize(), 6)


class EvaluatorTests(SparkSessionTestCase):

    def test_java_params(self):
        """
        This tests a bug fixed by SPARK-18274 which causes multiple copies
        of a Params instance in Python to be linked to the same Java instance.
        """
        evaluator = RegressionEvaluator(metricName="r2")
        df = self.spark.createDataFrame([Row(label=1.0, prediction=1.1)])
        evaluator.evaluate(df)
        self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
        evaluatorCopy = evaluator.copy({evaluator.metricName: "mae"})
        evaluator.evaluate(df)
        evaluatorCopy.evaluate(df)
        self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
        self.assertEqual(evaluatorCopy._java_obj.getMetricName(), "mae")


class FeatureTests(SparkSessionTestCase):

    def test_binarizer(self):
41 changes: 23 additions & 18 deletions python/pyspark/ml/wrapper.py
@@ -71,6 +71,10 @@ class JavaParams(JavaWrapper, Params):

    __metaclass__ = ABCMeta

    def __del__(self):
        if SparkContext._active_spark_context:
            SparkContext._active_spark_context._gateway.detach(self._java_obj)

    def _make_java_param_pair(self, param, value):
        """
        Makes a Java parm pair.
@@ -180,6 +184,25 @@ def __get_class(clazz):
                % stage_name)
        return py_stage

    def copy(self, extra=None):
        """
        Creates a copy of this instance with the same uid and some
        extra params. This implementation first calls Params.copy and
        then make a copy of the companion Java pipeline component with
        extra params. So both the Python wrapper and the Java pipeline
        component get copied.
        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        that = super(JavaParams, self).copy(extra)
        if self._java_obj is not None:
            that._java_obj = self._java_obj.copy(self._empty_java_param_map())
            that._transfer_params_to_java()
        return that


@inherit_doc
class JavaEstimator(JavaParams, Estimator):
@@ -256,21 +279,3 @@ def __init__(self, java_model=None):
        super(JavaModel, self).__init__(java_model)
        if java_model is not None:
            self._resetUid(java_model.uid())

    def copy(self, extra=None):
        """
        Creates a copy of this instance with the same uid and some
        extra params. This implementation first calls Params.copy and
        then make a copy of the companion Java model with extra params.
        So both the Python wrapper and the Java model get copied.
        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        that = super(JavaModel, self).copy(extra)
        if self._java_obj is not None:
            that._java_obj = self._java_obj.copy(self._empty_java_param_map())
            that._transfer_params_to_java()
        return that
