diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index cb928cf892ff9..ef0980e902f8a 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -28,7 +28,7 @@ from pyspark.storagelevel import StorageLevel from pyspark.streaming.dstream import DStream from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer -from pyspark.streaming.listener import StreamingListener +from pyspark.streaming.listener import StreamingListenerAdapter __all__ = ["StreamingContext"] @@ -370,4 +370,5 @@ def addStreamingListener(self, streamingListener): Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for receiving system events related to streaming. """ - self._jssc.addStreamingListener(streamingListener) + streamingListenerAdapter = StreamingListenerAdapter(streamingListener) + self._jssc.addStreamingListener(streamingListenerAdapter) diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index 221767bf85a79..80d138ecf1c05 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -18,6 +18,68 @@ __all__ = ["StreamingListener"] +class StreamingListenerEvent(object): + + def __init__(self): + pass + + +class StreamingListenerBatchSubmitted(StreamingListenerEvent): + + def __init__(self, batchInfo): + super(StreamingListenerEvent, self).__init__() + self.batchInfo = batchInfo + + +class StreamingListenerBatchCompleted(StreamingListenerEvent): + + def __init__(self, batchInfo): + super(StreamingListenerEvent, self).__init__() + self.batchInfo = batchInfo + + +class StreamingListenerBatchStarted(StreamingListenerEvent): + + def __init__(self, batchInfo): + super(StreamingListenerEvent, self).__init__() + self.batchInfo = batchInfo + + +class StreamingListenerOutputOperationStarted(StreamingListenerEvent): + + def __init__(self, outputOperationInfo): + super(StreamingListenerEvent, self).__init__() + self.outputOperationInfo = outputOperationInfo + + +class StreamingListenerOutputOperationCompleted(StreamingListenerEvent): + + def __init__(self, outputOperationInfo): + super(StreamingListenerEvent, self).__init__() + self.outputOperationInfo = outputOperationInfo + + +class StreamingListenerReceieverStarted(StreamingListenerEvent): + + def __init__(self, receiverInfo): + super(StreamingListenerEvent, self).__init__() + self.receiverInfo = receiverInfo + + +class StreamingListenerReceiverError(StreamingListenerEvent): + + def __init__(self, receiverInfo): + super(StreamingListenerEvent, self).__init__() + self.receiverInfo = receiverInfo + + +class StreamingListenerReceiverStopped(StreamingListenerEvent): + + def __init__(self, receiverInfo): + super(StreamingListenerEvent, self).__init__() + self.receiverInfo = receiverInfo + + class StreamingListener(object): def __init__(self): @@ -71,24 +133,55 @@ def onOutputOperationCompleted(self, outputOperationCompleted): """ pass - def getEventInfo(self, event): - """ - :param event: StreamingListenerEvent - :return Returns a BatchInfo, OutputOperationInfo, or ReceiverInfo based on - event passed. - """ - event_name = event.getClass().getSimpleName() - if 'Batch' in event_name: - return BatchInfo(event.batchInfo()) + class Java: + implements = ["org.apache.spark.streaming.scheduler.StreamingListener"] - elif 'Output' in event_name: - return OutputOperationInfo(event.outputOperationInfo()) - elif 'Receiver' in event_name: - return ReceiverInfo(event.receiverInfo()) +class StreamingListenerAdapter(StreamingListener): - class Java: - implements = ["org.apache.spark.streaming.scheduler.StreamingListener"] + def __init__(self, streamingListener): + super(StreamingListener, self).__init__() + self.userStreamingListener = streamingListener + + def onReceiverStarted(self, receiverStarted): + receiver_info = ReceiverInfo(receiverStarted.receiverInfo()) + receiver_started = StreamingListenerReceieverStarted(receiver_info) + self.userStreamingListener.onReceiverStarted(receiver_started) + + def onReceiverError(self, receiverError): + receiver_info = ReceiverInfo(receiverError.receiverInfo()) + receiver_error = StreamingListenerReceiverError(receiver_info) + self.userStreamingListener.onReceiverError(receiver_error) + + def onReceiverStopped(self, receiverStopped): + receiver_info = ReceiverInfo(receiverStopped.receiverInfo()) + receiver_stopped = StreamingListenerReceiverStopped(receiver_info) + self.userStreamingListener.onReceiverStopped(receiver_stopped) + + def onBatchSubmitted(self, batchSubmitted): + batch_info = BatchInfo(batchSubmitted.batchInfo()) + batch_submitted = StreamingListenerBatchSubmitted(batch_info) + self.userStreamingListener.onBatchSubmitted(batch_submitted) + + def onBatchStarted(self, batchStarted): + batch_info = BatchInfo(batchStarted.batchInfo()) + batch_started = StreamingListenerBatchStarted(batch_info) + self.userStreamingListener .onBatchStarted(batch_started) + + def onBatchCompleted(self, batchCompleted): + batch_info = BatchInfo(batchCompleted.batchInfo()) + batch_completed = StreamingListenerBatchCompleted(batch_info) + self.userStreamingListener.onBatchCompleted(batch_completed) + + def onOutputOperationStarted(self, outputOperationStarted): + output_op_info = OutputOperationInfo(outputOperationStarted.outputOperationInfo()) + output_operation_started = StreamingListenerOutputOperationStarted(output_op_info) + self.userStreamingListener.onOutputOperationStarted(output_operation_started) + + def onOutputOperationCompleted(self, outputOperationCompleted): + output_op_info = OutputOperationInfo(outputOperationCompleted.outputOperationInfo()) + output_operation_completed = StreamingListenerOutputOperationCompleted(output_op_info) + self.userStreamingListener.onOutputOperationCompleted(output_operation_completed) class BatchInfo(object): diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index fc76c6ea2300f..223984f10ad8e 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -412,13 +412,13 @@ def __init__(self): self.batchInfosSubmitted = [] def onBatchSubmitted(self, batchSubmitted): - self.batchInfosSubmitted.append(self.getEventInfo(batchSubmitted)) + self.batchInfosSubmitted.append(batchSubmitted.batchInfo) def onBatchStarted(self, batchStarted): - self.batchInfosStarted.append(self.getEventInfo(batchStarted)) + self.batchInfosStarted.append(batchStarted.batchInfo) def onBatchCompleted(self, batchCompleted): - self.batchInfosCompleted.append(self.getEventInfo(batchCompleted)) + self.batchInfosCompleted.append(batchCompleted.batchInfo) def test_batch_info_reports(self): batch_collector = self.BatchInfoCollector()