[SPARK-6328] [Python] Python API for StreamingListener #9186

Closed
wants to merge 23 commits into from

djalova
Contributor

@djalova djalova commented Oct 20, 2015

No description provided.

@djalova djalova changed the title from "[SPARK-6328] [pyspark]" to "[SPARK-6328] [Python] [Streaming] Python API for StreamingListener" Oct 20, 2015
@djalova djalova changed the title from "[SPARK-6328] [Python] [Streaming] Python API for StreamingListener" to "[SPARK-6328] [Python] Python API for StreamingListener" Oct 20, 2015
@tdas
Contributor

tdas commented Oct 20, 2015

this is ok to test.

@tdas
Contributor

tdas commented Oct 20, 2015

@zsxwing can you take a look?

@SparkQA

SparkQA commented Oct 20, 2015

Test build #44014 has finished for PR 9186 at commit aa87c40.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      * class StreamingListener(object):

@djalova
Contributor Author

djalova commented Oct 21, 2015

The test that failed was a flaky StreamingKMeansTest.
Please retest.

@tdas
Contributor

tdas commented Oct 21, 2015

test this again

@djalova
Contributor Author

djalova commented Oct 21, 2015

@tdas it looks like Jenkins didn't retest

@zsxwing
Member

zsxwing commented Oct 22, 2015

retest this please

@SparkQA

SparkQA commented Oct 22, 2015

Test build #44115 has finished for PR 9186 at commit aa87c40.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    def onOutputOperationCompleted(self, outputOperationCompleted):
        pass

    def getEventInfo(self, event):
Member

Is this method necessary?

Contributor Author

I thought it would make it easier to see which Info is associated with each StreamingListenerEvent, but I can remove it.

Contributor Author

Would it be better to use this method to return instances of the Python-friendly classes for these Scala objects?

@zsxwing
Member

zsxwing commented Oct 22, 2015

I suggest adding Python-friendly classes for BatchInfo, ReceiverInfo, and OutputOperationInfo, and converting the Scala objects to them. Some Scala types, such as Map and Option, are hard to use from Python.
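
As a rough illustration of that suggestion, a wrapper could unpack a Scala `Option` into either a plain value or `None` before handing it to Python code. This is only a sketch under assumptions: `FakeOption`, `FakeJavaBatchInfo`, `option_to_python` and `PyBatchInfo` are hypothetical names invented here so the example runs without a JVM, and they are not the classes added in this PR.

```python
class FakeOption(object):
    """Hypothetical stand-in for a Scala Option reached through Py4J."""

    def __init__(self, value=None, defined=False):
        self._value = value
        self._defined = defined

    def isDefined(self):
        return self._defined

    def get(self):
        if not self._defined:
            raise ValueError("None.get")
        return self._value


class FakeJavaBatchInfo(object):
    """Hypothetical stand-in for the JVM-side BatchInfo object."""

    def numRecords(self):
        return 0

    def schedulingDelay(self):
        return FakeOption()  # empty: the batch has not started yet

    def processingDelay(self):
        return FakeOption(120, defined=True)


def option_to_python(option, default=None):
    """Return the wrapped value, or `default` when the Option is empty."""
    return option.get() if option.isDefined() else default


class PyBatchInfo(object):
    """Hypothetical Python-friendly view holding plain ints or None."""

    def __init__(self, java_info):
        self.numRecords = java_info.numRecords()
        self.schedulingDelay = option_to_python(java_info.schedulingDelay())
        self.processingDelay = option_to_python(java_info.processingDelay())


info = PyBatchInfo(FakeJavaBatchInfo())
print(info.numRecords, info.schedulingDelay, info.processingDelay)
```

Passing a different `default` (for example `-1`) would let callers compare delays numerically without checking for `None` first.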

@SparkQA

SparkQA commented Oct 23, 2015

Test build #44248 has finished for PR 9186 at commit 0ac3df6.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      * class StreamingListener(object):
      * class BatchInfo(object):
      * class OutputOperationInfo(object):
      * class ReceiverInfo(object):

@SparkQA

SparkQA commented Oct 23, 2015

Test build #44268 has finished for PR 9186 at commit 233104d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      * class StreamingListener(object):
      * class BatchInfo(object):
      * class OutputOperationInfo(object):
      * class ReceiverInfo(object):

self._test_func(input, func, expected)

# Test occasionally fails without a delay
time.sleep(.1)
Contributor Author

The listener sometimes receives 3 batchCompleted events instead of 4 when I run the test, even though it always gets the correct job output. However, when I add a minor delay the test passes consistently. Can I get someone's opinion on this?

Member

Take a look at PySparkStreamingTestCase.wait_for. You can use it to wait for the expected results or timeout.
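
The idea behind such a helper is to poll for the expected number of results instead of sleeping for a fixed interval. The following is a standalone sketch of that polling pattern, not the actual `PySparkStreamingTestCase.wait_for`: the signature, the `timeout` and `interval` defaults, and the `produce` thread that simulates listener callbacks are all assumptions made for illustration.

```python
import threading
import time


def wait_for(result, n, timeout=5.0, interval=0.01):
    """Poll until `result` holds at least `n` items or `timeout` seconds pass.

    Returns True if the expected number of items arrived in time.
    """
    deadline = time.time() + timeout
    while len(result) < n and time.time() < deadline:
        time.sleep(interval)
    return len(result) >= n


# Simulate a listener that receives events on another thread.
events = []


def produce():
    for i in range(4):
        time.sleep(0.02)
        events.append(i)


worker = threading.Thread(target=produce)
worker.start()
arrived = wait_for(events, 4)
worker.join()
print(arrived, len(events))
```

Compared with a fixed `time.sleep(.1)`, polling returns as soon as the events arrive and only waits the full timeout when something is actually missing.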

@SparkQA

SparkQA commented Oct 26, 2015

Test build #44377 has finished for PR 9186 at commit 79d70ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      * class StreamingListenerEvent(object):
      * class StreamingListenerBatchSubmitted(StreamingListenerEvent):
      * class StreamingListenerBatchCompleted(StreamingListenerEvent):
      * class StreamingListenerBatchStarted(StreamingListenerEvent):
      * class StreamingListenerOutputOperationStarted(StreamingListenerEvent):
      * class StreamingListenerOutputOperationCompleted(StreamingListenerEvent):
      * class StreamingListenerReceieverStarted(StreamingListenerEvent):
      * class StreamingListenerReceiverError(StreamingListenerEvent):
      * class StreamingListenerReceiverStopped(StreamingListenerEvent):
      * class StreamingListener(object):
      * class StreamingListenerAdapter(StreamingListener):
      * class BatchInfo(object):
      * class OutputOperationInfo(object):
      * class ReceiverInfo(object):

    def onBatchStarted(self, batchStarted):
        batch_info = BatchInfo(batchStarted.batchInfo())
        batch_started = StreamingListenerBatchStarted(batch_info)
        self.userStreamingListener .onBatchStarted(batch_started)
Member

nit: redundant space

@SparkQA

SparkQA commented Nov 12, 2015

Test build #45690 has finished for PR 9186 at commit 7cdaf37.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      * class StreamingListener(object):

self.assertEqual(info.schedulingDelay(), -1)
self.assertEqual(info.processingDelay(), -1)
self.assertEqual(info.totalDelay(), -1)
self.assertEqual(info.numRecords(), 0)
Member

I just meant you can access the Map here, e.g.:

for streamId in info.streamIdToInputInfo():
    streamInputInfo = info.streamIdToInputInfo()[streamId]
    # access fields of streamInputInfo

for outputOpId in info.outputOperationInfos():
    outputOperationInfo = info.outputOperationInfos()[outputOpId]
    # access fields of outputOperationInfo

@zsxwing
Member

zsxwing commented Nov 12, 2015

I just noticed that the Streaming Python unit tests cannot report failures: the run always says it passed, even if some tests fail. Investigating.

@zsxwing
Member

zsxwing commented Nov 12, 2015

We should retest this PR after merging #9669.

@tdas
Contributor

tdas commented Nov 13, 2015

#9669 has been merged. Please merge with master and test the PR again.

@zsxwing
Member

zsxwing commented Nov 13, 2015

retest this please

@SparkQA

SparkQA commented Nov 13, 2015

Test build #45874 has finished for PR 9186 at commit 7cdaf37.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      * class StreamingListener(object):

@zsxwing
Member

zsxwing commented Nov 13, 2015

@djalova could you fix the test?

[Running <class '__main__.StreamingListenerTests'>]
test_batch_info_reports (__main__.StreamingListenerTests) ... FAIL

======================================================================
FAIL: test_batch_info_reports (__main__.StreamingListenerTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/jenkins/workspace/SparkPullRequestBuilder@2/python/pyspark/streaming/tests.py", line 444, in test_batch_info_reports
    self.assertEqual(len(batchInfosSubmitted), 4)
AssertionError: 5 != 4

----------------------------------------------------------------------
Ran 1 test in 4.768s

FAILED (failures=1)

@djalova
Contributor Author

djalova commented Nov 13, 2015

Sure, I'll change it to check for at least 4 batches.
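
A lower-bound check of that kind might look like the sketch below. The test class name and the five placeholder batch entries are hypothetical; only the `batchInfosSubmitted` name and the threshold of 4 come from the failing assertion quoted above.

```python
import unittest


class BatchCountCheck(unittest.TestCase):
    def test_at_least_four_batches(self):
        # Hypothetical data: one extra batch was submitted before the stop.
        batchInfosSubmitted = ["b0", "b1", "b2", "b3", "b4"]
        # assertEqual(len(...), 4) would fail here with "5 != 4".
        self.assertGreaterEqual(len(batchInfosSubmitted), 4)


suite = unittest.TestLoader().loadTestsFromTestCase(BatchCountCheck)
result = unittest.TextTestRunner(verbosity=0).run(suite)
print(result.wasSuccessful())
```

The trade-off is that a lower bound tolerates an extra batch being scheduled before the context stops, at the cost of no longer detecting unexpected extra events.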

@SparkQA

SparkQA commented Nov 13, 2015

Test build #45880 has finished for PR 9186 at commit 64192d6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      * public class JavaGradientBoostingClassificationExample
      * public class JavaGradientBoostingRegressionExample
      * public class JavaRandomForestClassificationExample
      * public class JavaRandomForestRegressionExample
      * class StreamingListener(object):

@zsxwing
Member

zsxwing commented Nov 13, 2015

@djalova could you add the following checks?

I just meant you can access the Map here, e.g.:

for streamId in info.streamIdToInputInfo():
    streamInputInfo = info.streamIdToInputInfo()[streamId]
    # access fields of streamInputInfo

for outputOpId in info.outputOperationInfos():
    outputOperationInfo = info.outputOperationInfos()[outputOpId]
    # access fields of outputOperationInfo

@zsxwing
Member

zsxwing commented Nov 13, 2015

Sorry for my unclear comment. I just meant adding real code that accesses the fields of streamInputInfo and outputOperationInfo, such as

streamInputInfo.inputStreamId()
streamInputInfo.numRecords()
for key in streamInputInfo.metadata():
    streamInputInfo.metadata()[key]
streamInputInfo.metadataDescription()
...
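
To make the request concrete, a check that touches each of those fields might be shaped like the sketch below. `FakeStreamInputInfo`, `collect_fields` and the sample values are hypothetical stand-ins so the loop runs without a streaming context; in the real test the objects would come from `info.streamIdToInputInfo()`.

```python
class FakeStreamInputInfo(object):
    """Hypothetical stand-in for one value of info.streamIdToInputInfo()."""

    def __init__(self, stream_id, num_records, metadata, description):
        self._stream_id = stream_id
        self._num_records = num_records
        self._metadata = metadata
        self._description = description

    def inputStreamId(self):
        return self._stream_id

    def numRecords(self):
        return self._num_records

    def metadata(self):
        return self._metadata

    def metadataDescription(self):
        return self._description


def collect_fields(stream_id_to_input_info):
    """Touch every field of every entry and return what was read."""
    seen = []
    for stream_id in stream_id_to_input_info:
        stream_input_info = stream_id_to_input_info[stream_id]
        metadata = dict(
            (key, stream_input_info.metadata()[key])
            for key in stream_input_info.metadata()
        )
        seen.append((
            stream_input_info.inputStreamId(),
            stream_input_info.numRecords(),
            metadata,
            stream_input_info.metadataDescription(),
        ))
    return seen


fake_map = {0: FakeStreamInputInfo(0, 3, {"offsets": "0-3"}, "offsets: 0-3")}
fields = collect_fields(fake_map)
print(fields)
```

Reading every accessor, rather than only iterating over the keys, is what would surface a field that cannot be converted or reached from Python.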

@djalova
Contributor Author

djalova commented Nov 13, 2015

Sorry, I didn't read your comment carefully. I'll make the update.

@SparkQA

SparkQA commented Nov 14, 2015

Test build #45905 has finished for PR 9186 at commit 5349c82.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      * class StreamingListener(object):

@SparkQA

SparkQA commented Nov 14, 2015

Test build #45906 has finished for PR 9186 at commit c941c3e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      * class StreamingListener(object):

@zsxwing
Member

zsxwing commented Nov 14, 2015

Thanks @djalova LGTM

@tdas
Contributor

tdas commented Nov 16, 2015

Thanks @djalova and @zsxwing for your efforts. Merging this to master and 1.6

asfgit pushed a commit that referenced this pull request Nov 16, 2015
Author: Daniel Jalova <djalova@us.ibm.com>

Closes #9186 from djalova/SPARK-6328.

(cherry picked from commit ace0db4)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in ace0db4 Nov 16, 2015
@djalova djalova deleted the SPARK-6328 branch August 19, 2016 19:23