From 5349c825de7ec1483b8d9c28480ec4071f75ea01 Mon Sep 17 00:00:00 2001 From: Daniel Jalova Date: Fri, 13 Nov 2015 15:02:34 -0800 Subject: [PATCH] Added more checks to test_batch_info_reports --- python/pyspark/streaming/tests.py | 36 +++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 4d4195a4a9f3e..8a285523ada16 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -445,9 +445,15 @@ def func(dstream): for info in batchInfosSubmitted: self.assertGreaterEqual(info.batchTime().milliseconds(), 0) self.assertGreaterEqual(info.submissionTime(), 0) - self.assertTrue(info.streamIdToInputInfo().isEmpty()) - self.assertFalse(info.outputOperationInfos().isEmpty()) - self.assertIsNotNone(info.outputOperationInfos()[0]) + + for streamId in info.streamIdToInputInfo(): + self.assertIsNotNone(info.streamIdToInputInfo()[streamId]) + # access fields of streamInputInfo + + for outputOpId in info.outputOperationInfos(): + self.assertIsNotNone(info.outputOperationInfos()[outputOpId]) + # access fields of outputOperationInfo + self.assertEqual(info.schedulingDelay(), -1) self.assertEqual(info.processingDelay(), -1) self.assertEqual(info.totalDelay(), -1) @@ -457,9 +463,15 @@ def func(dstream): for info in batchInfosStarted: self.assertGreaterEqual(info.batchTime().milliseconds(), 0) self.assertGreaterEqual(info.submissionTime(), 0) - self.assertTrue(info.streamIdToInputInfo().isEmpty()) - self.assertFalse(info.outputOperationInfos().isEmpty()) - self.assertIsNotNone(info.outputOperationInfos()[0]) + + for streamId in info.streamIdToInputInfo(): + self.assertIsNotNone(info.streamIdToInputInfo()[streamId]) + # access fields of streamInputInfo + + for outputOpId in info.outputOperationInfos(): + self.assertIsNotNone(info.outputOperationInfos()[outputOpId]) + # access fields of outputOperationInfo + self.assertGreaterEqual(info.schedulingDelay(), 0) self.assertEqual(info.processingDelay(), -1) self.assertEqual(info.totalDelay(), -1) @@ -469,9 +481,15 @@ def func(dstream): for info in batchInfosCompleted: self.assertGreaterEqual(info.batchTime().milliseconds(), 0) self.assertGreaterEqual(info.submissionTime(), 0) - self.assertTrue(info.streamIdToInputInfo().isEmpty()) - self.assertFalse(info.outputOperationInfos().isEmpty()) - self.assertIsNotNone(info.outputOperationInfos()[0]) + + for streamId in info.streamIdToInputInfo(): + self.assertIsNotNone(info.streamIdToInputInfo()[streamId]) + # access fields of streamInputInfo + + for outputOpId in info.outputOperationInfos(): + self.assertIsNotNone(info.outputOperationInfos()[outputOpId]) + # access fields of outputOperationInfo + self.assertGreaterEqual(info.schedulingDelay(), 0) self.assertGreaterEqual(info.processingDelay(), 0) self.assertGreaterEqual(info.totalDelay(), 0)