Skip to content

Commit

Permalink
Added StreamingListenerAdapter for Python.
Browse files Browse the repository at this point in the history
  • Loading branch information
djalova committed Oct 26, 2015
1 parent 233104d commit 79d70ab
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 20 deletions.
5 changes: 3 additions & 2 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.dstream import DStream
from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer
from pyspark.streaming.listener import StreamingListener
from pyspark.streaming.listener import StreamingListenerAdapter

__all__ = ["StreamingContext"]

Expand Down Expand Up @@ -370,4 +370,5 @@ def addStreamingListener(self, streamingListener):
Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
receiving system events related to streaming.
"""
self._jssc.addStreamingListener(streamingListener)
streamingListenerAdapter = StreamingListenerAdapter(streamingListener)
self._jssc.addStreamingListener(streamingListenerAdapter)
123 changes: 108 additions & 15 deletions python/pyspark/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,68 @@
__all__ = ["StreamingListener"]


class StreamingListenerEvent(object):

def __init__(self):
pass


class StreamingListenerBatchSubmitted(StreamingListenerEvent):

def __init__(self, batchInfo):
super(StreamingListenerEvent, self).__init__()
self.batchInfo = batchInfo


class StreamingListenerBatchCompleted(StreamingListenerEvent):

def __init__(self, batchInfo):
super(StreamingListenerEvent, self).__init__()
self.batchInfo = batchInfo


class StreamingListenerBatchStarted(StreamingListenerEvent):

def __init__(self, batchInfo):
super(StreamingListenerEvent, self).__init__()
self.batchInfo = batchInfo


class StreamingListenerOutputOperationStarted(StreamingListenerEvent):

def __init__(self, outputOperationInfo):
super(StreamingListenerEvent, self).__init__()
self.outputOperationInfo = outputOperationInfo


class StreamingListenerOutputOperationCompleted(StreamingListenerEvent):

def __init__(self, outputOperationInfo):
super(StreamingListenerEvent, self).__init__()
self.outputOperationInfo = outputOperationInfo


class StreamingListenerReceieverStarted(StreamingListenerEvent):

def __init__(self, receiverInfo):
super(StreamingListenerEvent, self).__init__()
self.receiverInfo = receiverInfo


class StreamingListenerReceiverError(StreamingListenerEvent):

def __init__(self, receiverInfo):
super(StreamingListenerEvent, self).__init__()
self.receiverInfo = receiverInfo


class StreamingListenerReceiverStopped(StreamingListenerEvent):

def __init__(self, receiverInfo):
super(StreamingListenerEvent, self).__init__()
self.receiverInfo = receiverInfo


class StreamingListener(object):

def __init__(self):
Expand Down Expand Up @@ -71,24 +133,55 @@ def onOutputOperationCompleted(self, outputOperationCompleted):
"""
pass

def getEventInfo(self, event):
"""
:param event: StreamingListenerEvent
:return Returns a BatchInfo, OutputOperationInfo, or ReceiverInfo based on
event passed.
"""
event_name = event.getClass().getSimpleName()
if 'Batch' in event_name:
return BatchInfo(event.batchInfo())
class Java:
implements = ["org.apache.spark.streaming.scheduler.StreamingListener"]

elif 'Output' in event_name:
return OutputOperationInfo(event.outputOperationInfo())

elif 'Receiver' in event_name:
return ReceiverInfo(event.receiverInfo())
class StreamingListenerAdapter(StreamingListener):

class Java:
implements = ["org.apache.spark.streaming.scheduler.StreamingListener"]
def __init__(self, streamingListener):
super(StreamingListener, self).__init__()
self.userStreamingListener = streamingListener

def onReceiverStarted(self, receiverStarted):
receiver_info = ReceiverInfo(receiverStarted.receiverInfo())
receiver_started = StreamingListenerReceieverStarted(receiver_info)
self.userStreamingListener.onReceiverStarted(receiver_started)

def onReceiverError(self, receiverError):
receiver_info = ReceiverInfo(receiverError.receiverInfo())
receiver_error = StreamingListenerReceiverError(receiver_info)
self.userStreamingListener.onReceiverError(receiver_error)

def onReceiverStopped(self, receiverStopped):
receiver_info = ReceiverInfo(receiverStopped.receiverInfo())
receiver_stopped = StreamingListenerReceiverStopped(receiver_info)
self.userStreamingListener.onReceiverStopped(receiver_stopped)

def onBatchSubmitted(self, batchSubmitted):
batch_info = BatchInfo(batchSubmitted.batchInfo())
batch_submitted = StreamingListenerBatchSubmitted(batch_info)
self.userStreamingListener.onBatchSubmitted(batch_submitted)

def onBatchStarted(self, batchStarted):
batch_info = BatchInfo(batchStarted.batchInfo())
batch_started = StreamingListenerBatchStarted(batch_info)
self.userStreamingListener .onBatchStarted(batch_started)

def onBatchCompleted(self, batchCompleted):
batch_info = BatchInfo(batchCompleted.batchInfo())
batch_completed = StreamingListenerBatchCompleted(batch_info)
self.userStreamingListener.onBatchCompleted(batch_completed)

def onOutputOperationStarted(self, outputOperationStarted):
output_op_info = OutputOperationInfo(outputOperationStarted.outputOperationInfo())
output_operation_started = StreamingListenerOutputOperationStarted(output_op_info)
self.userStreamingListener.onOutputOperationStarted(output_operation_started)

def onOutputOperationCompleted(self, outputOperationCompleted):
output_op_info = OutputOperationInfo(outputOperationCompleted.outputOperationInfo())
output_operation_completed = StreamingListenerOutputOperationCompleted(output_op_info)
self.userStreamingListener.onOutputOperationCompleted(output_operation_completed)


class BatchInfo(object):
Expand Down
6 changes: 3 additions & 3 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,13 @@ def __init__(self):
self.batchInfosSubmitted = []

def onBatchSubmitted(self, batchSubmitted):
self.batchInfosSubmitted.append(self.getEventInfo(batchSubmitted))
self.batchInfosSubmitted.append(batchSubmitted.batchInfo)

def onBatchStarted(self, batchStarted):
self.batchInfosStarted.append(self.getEventInfo(batchStarted))
self.batchInfosStarted.append(batchStarted.batchInfo)

def onBatchCompleted(self, batchCompleted):
self.batchInfosCompleted.append(self.getEventInfo(batchCompleted))
self.batchInfosCompleted.append(batchCompleted.batchInfo)

def test_batch_info_reports(self):
batch_collector = self.BatchInfoCollector()
Expand Down

0 comments on commit 79d70ab

Please sign in to comment.