diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 6b6cc653dfa6f..a1a9be1eae439 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -21,6 +21,7 @@
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
 from pyspark.context import SparkContext
 from pyspark.streaming.dstream import DStream
+from pyspark.streaming.duration import Duration
 
 from py4j.java_collections import ListConverter
 
@@ -107,6 +108,20 @@ def awaitTermination(self, timeout=None):
         else:
             self._jssc.awaitTermination(timeout)
 
+    def remember(self, duration):
+        """
+        Set each DStream in this context to remember RDDs it generated in the last given duration.
+        DStreams remember RDDs only for a limited duration of time and release them for garbage
+        collection. This method allows the developer to specify how long to remember the RDDs
+        (if the developer wishes to query old data outside the DStream computation).
+        @param duration: pyspark.streaming.duration.Duration object.
+               Minimum duration that each DStream should remember its RDDs.
+        """
+        if not isinstance(duration, Duration):
+            raise TypeError("Input should be pyspark.streaming.duration.Duration object")
+
+        self._jssc.remember(duration._jduration)
+
     # TODO: add storageLevel
     def socketTextStream(self, hostname, port):
         """
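
For context, a minimal usage sketch of the new method. Only `remember` itself and the `pyspark.streaming.duration.Duration` import are taken from this diff; the `StreamingContext(sc, Seconds(1))` constructor form and the `Seconds` helper are assumptions modeled on the Scala streaming API, and the interval values are illustrative:

```python
# Usage sketch only: `remember` and the Duration type come from this diff;
# the constructor form and the Seconds helper are assumed, mirroring Scala.
from pyspark.context import SparkContext
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Duration, Seconds  # Seconds assumed

sc = SparkContext(appName="RememberExample")
ssc = StreamingContext(sc, Seconds(1))  # assumed: 1-second batch interval

# Keep each DStream's generated RDDs for at least 60 seconds so old data
# can be queried outside the normal DStream computation (e.g. ad-hoc reads).
ssc.remember(Seconds(60))

# Anything that is not a Duration is rejected on the Python side:
try:
    ssc.remember(60)
except TypeError as e:
    print(e)  # Input should be pyspark.streaming.duration.Duration object
```

Note that the `isinstance` check rejects bad arguments in Python before the call crosses Py4J into the JVM, so the user sees a clear `TypeError` rather than a Java-side exception.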