Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-15480][UI][Streaming]show missed InputInfo in streaming UI #13259

Closed
wants to merge 3 commits into from

Conversation

mwws
Copy link

@mwws mwws commented May 23, 2016

What changes were proposed in this pull request?

It's a bug in Streaming UI. If BatchDuration is changed by window operation, InputInfo on time with no output ops will not be shown in streaming UI.

Here is a simple example to reproduce:

val inputDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topic)
val matchedDStream = inputDStream.window(batchInteval*2, batchInteval*2)
matchedDStream.foreachRDD(rdd => {
    val count = rdd.count()
    println(s"2 count number: ${count}")
})

How was this patch tested?

manually tested

I manually input 6 records from Kafka. According to output, there are indeed 6 records processed
output

but according to web UI, input records show 0.
original

And here is the Screenshot after my change:
newresult

@SparkQA
Copy link

SparkQA commented May 23, 2016

Test build #59124 has finished for PR 13259 at commit 83351b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mwws
Copy link
Author

mwws commented May 24, 2016

@chenghao-intel
Copy link
Contributor

cc @zsxwing

inputInfoMissedTimes.foreach (time => {
val streamIdToInputInfos = inputInfoTracker.getInfo(time)
val fakeJobSet = JobSet(time, Seq(), streamIdToInputInfos)
listenerBus.post(StreamingListenerBatchCompleted(fakeJobSet.toBatchInfo))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this change the semantics if posing fake StreamingListenerBatchCompleted, if user's code rely on this, will this break their assumptions?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, it would be a breaking change in that case. Give the fact that some information is indeed missing now, we need either send more events or add additional fields in current event. The later might be better and more correct in semantics, although it can still break current implementation of user's custom listener. And I noticed that listener interface is annotated as DeveloperAPI, so it's expected that API might be changed. How do you think?

@zsxwing
Copy link
Member

zsxwing commented May 24, 2016

I suggest we change InputInfoTracker.getInfo(batchTime: Time) to InputInfoTracker.getInfo(batchTime: Time, batchDuration: Long) and let InputInfoTracker return the aggregated input infos between batchTime - batchDuration and batchTime.

@mwws
Copy link
Author

mwws commented May 25, 2016

@zsxwing thank you for the suggestion, But I have two concerns:

  1. There could be multiple output ops on one BatchTime. And these ops could have different batchDuration. Which one should I use?
  2. The input rate graph is generated by the input size column of batch table. If we do the aggregation, the meaning of input rate graph will be changed from "number of input events from source" to "number of processed events".

@zsxwing
Copy link
Member

zsxwing commented May 26, 2016

@zsxwing thank you for the suggestion, But I have two concerns:

  1. There could be multiple output ops on one BatchTime. And these ops could have different batchDuration. Which one should I use?
  2. The input rate graph is generated by the input size column of batch table. If we do the aggregation, the meaning of input rate graph will be changed from "number of input events from source" to "number of processed events".

How about adding a field lastBatchTime in JobScheduler and storing the last submitted batch? Then just use the aggregated input infos between the last batch and the current batch? We need to aggregate them since we only have batch infos that have jobs.

1. revert perivous change
2. add batchTimesWithNoJob Set to record the batch with no job
3. add aggrate method of InputInfo
@mwws
Copy link
Author

mwws commented Jun 1, 2016

@zsxwing Thanks for your suggestion, I have re-implemented this PR according to that. But instead of maintaining state lastBatchTime in JobScheduler, I would prefer record such info in JobGenerator, so we can have central place to resolve inputInfo.

I according to my manual test the new code works. Could you help review again?

@SparkQA
Copy link

SparkQA commented Jun 1, 2016

Test build #59730 has finished for PR 13259 at commit a3d05e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Copy link
Member

zsxwing commented Jun 6, 2016

@mwws sorry for the delay. Could you add a unit test, please?

@mwws
Copy link
Author

mwws commented Jun 7, 2016

@zsxwing Yes sir, a unit test has been added.

@SparkQA
Copy link

SparkQA commented Jun 7, 2016

Test build #60095 has finished for PR 13259 at commit ffd1787.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor

tdas commented Oct 25, 2016

@zsxwing Could you take a look at this PR. Would be good to merge this.

@mwws
Copy link
Author

mwws commented Nov 7, 2016

@zsxwing This patch has been pended for a long time, could you help to review?

@maropu maropu mentioned this pull request Apr 23, 2017
maropu added a commit to maropu/spark that referenced this pull request Apr 23, 2017
@asfgit asfgit closed this in e9f9715 Apr 24, 2017
peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
This pr proposed to close stale PRs. Currently, we have 400+ open PRs and there are some stale PRs whose JIRA tickets have been already closed and whose JIRA tickets does not exist (also, they seem not to be minor issues).

// Open PRs whose JIRA tickets have been already closed
Closes apache#11785
Closes apache#13027
Closes apache#13614
Closes apache#13761
Closes apache#15197
Closes apache#14006
Closes apache#12576
Closes apache#15447
Closes apache#13259
Closes apache#15616
Closes apache#14473
Closes apache#16638
Closes apache#16146
Closes apache#17269
Closes apache#17313
Closes apache#17418
Closes apache#17485
Closes apache#17551
Closes apache#17463
Closes apache#17625

// Open PRs whose JIRA tickets does not exist and they are not minor issues
Closes apache#10739
Closes apache#15193
Closes apache#15344
Closes apache#14804
Closes apache#16993
Closes apache#17040
Closes apache#15180
Closes apache#17238

N/A

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes apache#17734 from maropu/resolved_pr.

Change-Id: Id2e590aa7283fe5ac01424d30a40df06da6098b5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants