[SPARK-22789] Map-only continuous processing execution #19984
Conversation
Test build #84937 has finished for PR 19984 at commit
.internal()
.doc("The interval at which continuous execution readers will poll to check whether" +
  " the epoch has advanced on the driver.")
.intConf

timeConf?
.internal()
.doc("The size (measured in number of rows) of the queue used in continuous execution to" +
  " buffer the results of a ContinuousDataReader.")
.intConf

longConf?
Should it be? I can't imagine anything close to MAX_INT being a reasonable value here. Will it be hard to migrate to a long if we later discover it's needed?
* import scala.concurrent.duration._
* df.writeStream.trigger(Trigger.Continuous(10.seconds))
* }}}
* @since 2.2.0

2.3.0?
* {{{
* df.writeStream.trigger(Trigger.Continuous("10 seconds"))
* }}}
* @since 2.2.0

2.3.0?
*
* {{{
* import java.util.concurrent.TimeUnit
* df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))

Trigger.Continuous(10, TimeUnit.SECONDS) instead of ProcessingTime.create(10, TimeUnit.SECONDS)?
Test build #84973 has finished for PR 19984 at commit
Test build #84975 has finished for PR 19984 at commit
Test build #84976 has finished for PR 19984 at commit
Test build #84981 has finished for PR 19984 at commit
Test build #85079 has finished for PR 19984 at commit
Test build #85089 has finished for PR 19984 at commit
Test build #85092 has finished for PR 19984 at commit
Half-finished the review; just posted the comments so far, as I'm leaving now.
/**
 * Atomically increment the current epoch and get the new value.
 */
private[sql] case class IncrementAndGetEpoch() extends EpochCoordinatorMessage

nit: case class -> case object
/**
 * Get the current epoch.
 */
private[sql] case class GetCurrentEpoch() extends EpochCoordinatorMessage

nit: case class -> case object
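To illustrate the nit: a parameterless message is better modeled as a `case object`, which is a singleton, than as a zero-field `case class`, which allocates a fresh (though structurally equal) instance on every `apply()`. A minimal sketch with hypothetical message names standing in for the PR's `EpochCoordinatorMessage` hierarchy:

```scala
// Hypothetical message hierarchy mirroring the pattern under review.
sealed trait CoordinatorMessage

// A zero-field case class allocates a new instance per apply() call,
// though all instances compare equal structurally.
case class GetEpochClass() extends CoordinatorMessage

// A case object is a singleton: one allocation, and reference equality
// (`eq`) holds between any two references to it.
case object GetEpochObject extends CoordinatorMessage

object CaseObjectDemo {
  def main(args: Array[String]): Unit = {
    val a = GetEpochClass()
    val b = GetEpochClass()
    println(a == b)                           // structural equality holds
    println(a eq b)                           // but these are distinct allocations
    println(GetEpochObject eq GetEpochObject) // the object is a single instance
  }
}
```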
@@ -303,7 +299,7 @@ abstract class StreamExecution(
  e,
  committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
  availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString)
logError(s"Query $prettyIdString terminated with error", e)
// logError(s"Query $prettyIdString terminated with error", e)

nit: restore logError
.map(f => pathToBatchId(f.getPath))

for (batchId <- batchIds if batchId > thresholdBatchId) {
  print(s"AAAAA purging\n")

nit: remove this
* @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
*/
private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
  import scala.collection.JavaConverters._

nit: move this to the beginning of this file
case t: Throwable =>
  failedFlag.set(true)
  failureReason = t

ditto
throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
}

queue.take() match {

What if dataReaderFailed is set while a thread is blocking here? Seems the task will block forever.
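One standard way to address the concern above, sketched here with plain JDK types rather than the PR's actual classes: replace the bare `queue.take()` with a `poll`-with-timeout loop that re-checks the failure flag on every timeout, so a recorded failure is surfaced instead of the consumer blocking forever. The names (`nextOrFail`, the `String` element type) are illustrative only.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object PollLoopDemo {
  // Hypothetical stand-ins for the PR's row queue and data-reader failure flag.
  val queue = new ArrayBlockingQueue[String](16)
  val dataReaderFailed = new AtomicBoolean(false)

  /** Block for the next element, but wake up periodically to check the flag.
    * Returns None when the producer has failed, instead of blocking forever. */
  def nextOrFail(): Option[String] = {
    while (true) {
      if (dataReaderFailed.get()) {
        return None // surface the failure to the caller
      }
      // Unlike take(), poll() returns null after the timeout, letting us
      // re-check the flag on each iteration.
      val row = queue.poll(100, TimeUnit.MILLISECONDS)
      if (row != null) return Some(row)
    }
    None // unreachable; satisfies the result type
  }

  def main(args: Array[String]): Unit = {
    queue.put("row-0")
    println(nextOrFail()) // the queued element is returned promptly
    dataReaderFailed.set(true)
    println(nextOrFail()) // the flag is observed within one poll interval
  }
}
```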
reader: DataReader[UnsafeRow],
queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
context: TaskContext,
failedFlag: AtomicBoolean) extends Thread {

Set a proper thread name for this thread.
case t: Throwable =>
  failedFlag.set(true)
  failureReason = t
  throw t

We cannot throw t here, as it will kill the executor. See org.apache.spark.util.SparkUncaughtExceptionHandler.
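The fix the comment asks for can be sketched as follows, JDK-only and with a hypothetical runnable standing in for the PR's reader thread: record the failure and let the thread exit normally, so the task thread can observe the flag, rather than letting the Throwable escape to the JVM's uncaught-exception handler (which, in Spark executors, exits the process).

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical stand-in for the PR's background reader thread.
class RecordingRunnable(work: () => Unit,
                        failedFlag: AtomicBoolean,
                        failureReason: AtomicReference[Throwable]) extends Runnable {
  override def run(): Unit = {
    try {
      work()
    } catch {
      case t: Throwable =>
        // Record the failure for the task thread to pick up; do NOT rethrow,
        // since an escaping Throwable would reach the uncaught-exception handler.
        failureReason.set(t)
        failedFlag.set(true)
    }
  }
}

object RecordingDemo {
  def main(args: Array[String]): Unit = {
    val failed = new AtomicBoolean(false)
    val reason = new AtomicReference[Throwable]()
    val t = new Thread(new RecordingRunnable(
      () => throw new RuntimeException("boom"), failed, reason))
    t.start()
    t.join() // join() guarantees the flag writes are visible to this thread
    println(failed.get())
    println(reason.get().getMessage)
  }
}
```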
case t: Throwable =>
  failedFlag.set(true)
  failureReason = t
  throw t

ditto
c00e104 to 825d437
Test build #85216 has finished for PR 19984 at commit
Test build #85218 has finished for PR 19984 at commit
The result says it fails Spark unit tests, but clicking through shows a count of 0.
retest this please
Test build #85230 has finished for PR 19984 at commit
Overall looks good. Left some minor comments.
logInfo(s"Writer for partition ${context.partitionId()} is committing.")
val msg = dataWriter.commit()
logInfo(s"Writer for partition ${context.partitionId()} committed.")
EpochCoordinatorRef.get(runId, SparkEnv.get).send(

nit: EpochCoordinatorRef.get is not cheap. You can store it outside the loop.
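The hoisting the comment suggests is the standard pattern below, shown with a hypothetical `lookup()` standing in for the expensive `EpochCoordinatorRef.get`; a counter makes the saving visible.

```scala
import java.util.concurrent.atomic.AtomicInteger

object HoistDemo {
  val lookups = new AtomicInteger(0)

  // Hypothetical expensive lookup standing in for EpochCoordinatorRef.get;
  // returns an "endpoint" that here just discards its message.
  def lookup(): String => Unit = {
    lookups.incrementAndGet()
    msg => ()
  }

  /** Resolve the endpoint inside the loop: one lookup per iteration. */
  def insideLoop(n: Int): Int = {
    lookups.set(0)
    for (_ <- 0 until n) lookup()("commit")
    lookups.get()
  }

  /** Resolve the endpoint once, before the loop: a single lookup total. */
  def hoisted(n: Int): Int = {
    lookups.set(0)
    val endpoint = lookup()
    for (_ <- 0 until n) endpoint("commit")
    lookups.get()
  }

  def main(args: Array[String]): Unit = {
    println(insideLoop(5)) // n lookups
    println(hoisted(5))    // 1 lookup
  }
}
```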
@@ -19,6 +19,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger;

nit: move this below import org.apache.spark.annotation.InterfaceStability;
assert(queryExecutionThread eq Thread.currentThread,
  "logicalPlan must be initialized in StreamExecutionThread " +
  s"but the current thread was ${Thread.currentThread}")
var nextSourceId = 0L

nit: unused
EpochCoordinatorRef.create(
  writer.get(), reader, currentBatchId,
  id.toString, runId.toString, sparkSession, SparkEnv.get)
val epochUpdateThread = new Thread(new Runnable {

nit: new Thread(new Runnable { -> new Thread(s"epoch update thread for $prettyIdString", new Runnable {
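Giving a housekeeping thread a descriptive name, as suggested above, makes thread dumps and logs self-describing. A JDK-only sketch (note that the standard JDK constructor order is `Thread(Runnable, name)`; the demo thread name is illustrative):

```scala
object NamedThreadDemo {
  /** Run `body` on a freshly created, named thread and return the name
    * that thread observed for itself. */
  def runNamed(name: String)(body: => Unit): String = {
    var observed = ""
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        observed = Thread.currentThread().getName
        body
      }
    }, name) // JDK constructor order: (Runnable, name)
    t.start()
    t.join() // join() makes the write to `observed` visible here
    observed
  }

  def main(args: Array[String]): Unit = {
    // A name like this shows up verbatim in jstack output and log lines.
    println(runNamed("epoch-update-thread-demo") { () })
  }
}
```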
if (reader.needsReconfiguration()) {
  stopSources()
  state.set(RECONFIGURING)

nit: move this above stopSources(). Otherwise, the stream thread may see the exception before setting state to RECONFIGURING.
I think that order is fine too, but I don't think stopSources can cause an exception in the query execution thread. queryExecutionThread.interrupt() is the line which passes control flow; until that runs, the query execution thread should be sitting there waiting for the long-running Spark task.
Stopping a source may cause an exception, e.g., it closes a socket while queryExecutionThread is reading from it.
if (thisEpochCommits.size == numWriterPartitions &&
    nextEpochOffsets.size == numReaderPartitions) {
  logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
  val query = session.streams.get(queryId).asInstanceOf[StreamingQueryWrapper]

Why not pass the query into EpochCoordinator's constructor? Getting a query from StreamingQueryManager may have a race condition, because the query can fail before we process CommitPartitionEpoch messages. If so, session.streams.get(queryId) will return null.
@jose-torres forgot to update this? Also, you don't need queryId any more.
assert(continuousSources.length == 1, "only one continuous source supported currently")
assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
synchronized {
  commitLog.add(epoch)

Since this is called in the RPC thread, we should also check whether this query is still alive here.
AwaitEpoch(0),
Execute(waitForRateSourceTriggers(_, 2)),
IncrementEpoch(),
CheckAnswer(scala.Range(0, 10): _*),

Let's not do an exact-match check here.
Test build #85287 has finished for PR 19984 at commit
One more comment. Otherwise LGTM.
Test build #85332 has finished for PR 19984 at commit
Thanks! Merging to master!
@@ -75,6 +76,52 @@ case class StreamingExecutionRelation(
)
}

// We have to pack in the V1 data source as a shim, for the case when a source implements
// continuous processing (which is always V2) but only has V1 microbatch support. We don't
// know at read time whether the query is continuous or not, so we need to be able to

Can't we know it from the specified trigger?

The trigger isn't specified at the point where the dataframe is created.
What changes were proposed in this pull request?
Basic continuous execution, supporting map/flatMap/filter, with commits and advancement through RPC.
How was this patch tested?
New unit-ish tests, exercising execution end to end.