Skip to content

Commit

Permalink
Added more checks to test_batch_info_reports
Browse files Browse the repository at this point in the history
  • Loading branch information
djalova committed Nov 13, 2015
1 parent 64192d6 commit 5349c82
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,15 @@ def func(dstream):
for info in batchInfosSubmitted:
self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
self.assertGreaterEqual(info.submissionTime(), 0)
self.assertTrue(info.streamIdToInputInfo().isEmpty())
self.assertFalse(info.outputOperationInfos().isEmpty())
self.assertIsNotNone(info.outputOperationInfos()[0])

for streamId in info.streamIdToInputInfo():
self.assertIsNotNone(info.streamIdToInputInfo()[streamId])
# access fields of streamInputInfo

for outputOpId in info.outputOperationInfos():
self.assertIsNotNone(info.outputOperationInfos()[outputOpId])
# access fields of outputOperationInfo

self.assertEqual(info.schedulingDelay(), -1)
self.assertEqual(info.processingDelay(), -1)
self.assertEqual(info.totalDelay(), -1)
Expand All @@ -457,9 +463,15 @@ def func(dstream):
for info in batchInfosStarted:
self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
self.assertGreaterEqual(info.submissionTime(), 0)
self.assertTrue(info.streamIdToInputInfo().isEmpty())
self.assertFalse(info.outputOperationInfos().isEmpty())
self.assertIsNotNone(info.outputOperationInfos()[0])

for streamId in info.streamIdToInputInfo():
self.assertIsNotNone(info.streamIdToInputInfo()[streamId])
# access fields of streamInputInfo

for outputOpId in info.outputOperationInfos():
self.assertIsNotNone(info.outputOperationInfos()[outputOpId])
# access fields of outputOperationInfo

self.assertGreaterEqual(info.schedulingDelay(), 0)
self.assertEqual(info.processingDelay(), -1)
self.assertEqual(info.totalDelay(), -1)
Expand All @@ -469,9 +481,15 @@ def func(dstream):
for info in batchInfosCompleted:
self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
self.assertGreaterEqual(info.submissionTime(), 0)
self.assertTrue(info.streamIdToInputInfo().isEmpty())
self.assertFalse(info.outputOperationInfos().isEmpty())
self.assertIsNotNone(info.outputOperationInfos()[0])

for streamId in info.streamIdToInputInfo():
self.assertIsNotNone(info.streamIdToInputInfo()[streamId])
# access fields of streamInputInfo

for outputOpId in info.outputOperationInfos():
self.assertIsNotNone(info.outputOperationInfos()[outputOpId])
# access fields of outputOperationInfo

self.assertGreaterEqual(info.schedulingDelay(), 0)
self.assertGreaterEqual(info.processingDelay(), 0)
self.assertGreaterEqual(info.totalDelay(), 0)
Expand Down

0 comments on commit 5349c82

Please sign in to comment.