[SPARK-22789] Map-only continuous processing execution #19984

Status: Closed. Wants to merge 50 commits.

Commits (50)
b1ad771
Refactor StreamExecution into a parent class so continuous processing…
jose-torres Dec 7, 2017
4058c65
address fmt
jose-torres Dec 12, 2017
2af3920
slight changes
jose-torres Dec 13, 2017
cd3d28a
rm spurious space
jose-torres Dec 13, 2017
fdc404d
fix compile
jose-torres Dec 13, 2017
7d97c22
harness
jose-torres Dec 11, 2017
bd20abd
awaitEpoch impl
jose-torres Dec 11, 2017
b930c2a
move local[10] to only continuous suite
jose-torres Dec 11, 2017
5c2d1b2
repeatedly restart
jose-torres Dec 11, 2017
38e989b
fix some simple TODOs
jose-torres Dec 11, 2017
9e031f5
use runId instead of queryId for endpoint name
jose-torres Dec 11, 2017
690eadc
more simple todos
jose-torres Dec 11, 2017
4df4f04
remove old state
jose-torres Dec 11, 2017
6701310
remove clean shutdown workaround in StreamTest
jose-torres Dec 11, 2017
1cab58f
update ContinuousExecution docs
jose-torres Dec 11, 2017
1c11b7c
add comments to EpochCoordinator
jose-torres Dec 11, 2017
c6a580c
change offset semantic to end of previous epoch
jose-torres Dec 12, 2017
a76987a
document EpochCoordinator
jose-torres Dec 12, 2017
1d87302
simplify epoch handling
jose-torres Dec 12, 2017
89b9ee6
stress tests
jose-torres Dec 12, 2017
bdafb15
add minBatchesToRetain
jose-torres Dec 12, 2017
5ad34a1
add confs
jose-torres Dec 12, 2017
1da0559
latency suite not meaningful here
jose-torres Dec 12, 2017
7c5b438
more stress::q
jose-torres Dec 13, 2017
bedd7b3
use temp dir
jose-torres Dec 13, 2017
e5bf024
fix against rebase
jose-torres Dec 14, 2017
d16410f
fix ser/deser
jose-torres Dec 14, 2017
6eaba32
fix rebase compile
jose-torres Dec 14, 2017
b48c8b0
stop using ProcessingTime in executor
jose-torres Dec 14, 2017
2536290
add tests for supported ops
jose-torres Dec 14, 2017
2eb048d
unsupported operation check for unsupported continuous mode ops
jose-torres Dec 15, 2017
d175cc9
current timestamp test
jose-torres Dec 15, 2017
2abfd0e
address trigger comments
jose-torres Dec 15, 2017
2d3fb96
update mima excludes for private[sql] method change
jose-torres Dec 15, 2017
24fed79
remove check microbatch is in registry
jose-torres Dec 15, 2017
c651206
fix computeStats test
jose-torres Dec 15, 2017
6c5870b
fail task if subthreads fail
jose-torres Dec 19, 2017
12f2955
fix race conditions
jose-torres Dec 19, 2017
c00b099
make sure each op can advance epoch in test
jose-torres Dec 19, 2017
3330ae4
handle data reader failure when queue is empty
jose-torres Dec 20, 2017
2f902f2
set failure reason before flag
jose-torres Dec 20, 2017
047d48b
don't throw in data reader thread
jose-torres Dec 20, 2017
a04978e
set thread name
jose-torres Dec 20, 2017
b672370
only check InterruptedException
jose-torres Dec 20, 2017
e4d6e9d
move import
jose-torres Dec 20, 2017
35a72c7
case object instead of case class
jose-torres Dec 20, 2017
004f865
fulfil contract for iterator
jose-torres Dec 20, 2017
825d437
fix compile after rebase
jose-torres Dec 21, 2017
07a9e06
address review comments
jose-torres Dec 21, 2017
b4f7976
remove unnecessary queryId
jose-torres Dec 23, 2017
5 changes: 5 additions & 0 deletions project/MimaExcludes.scala
@@ -36,6 +36,11 @@ object MimaExcludes {

// Exclude rules for 2.3.x
lazy val v23excludes = v22excludes ++ Seq(
// SPARK-22789: Map-only continuous processing execution
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$8"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$6"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$9"),

// SPARK-22372: Make cluster submission use SparkApplication.
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getSecretKeyFromUserCredentials"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.isYarnMode"),
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, MonotonicallyIncreasingID}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, CurrentDate, CurrentTimestamp, MonotonicallyIncreasingID}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
@@ -339,6 +339,29 @@ object UnsupportedOperationChecker {
}
}

  def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = {
    checkForStreaming(plan, outputMode)

    plan.foreachUp { implicit subPlan =>
      subPlan match {
        case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
            _: DeserializeToObject | _: SerializeFromObject) =>
        case node if node.nodeName == "StreamingRelationV2" =>
        case node =>
          throwError(s"Continuous processing does not support ${node.nodeName} operations.")
      }

      subPlan.expressions.foreach { e =>
        if (e.collectLeaves().exists {
          case (_: CurrentTimestamp | _: CurrentDate) => true
          case _ => false
        }) {
          throwError(s"Continuous processing does not support current time operations.")
        }
      }
    }
  }

  private def throwErrorIf(
      condition: Boolean,
      msg: String)(implicit operator: LogicalPlan): Unit = {
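To make the effect of checkForContinuous concrete, here is a rough user-level sketch (not part of this PR): only map-like operators (Project, Filter, MapElements, MapPartitions and the object serialization nodes) plus StreamingRelationV2 pass, and current-time expressions are rejected. The source format name and the "value" column below are hypothetical.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").appName("continuous-check-sketch").getOrCreate()

// "myContinuousSource" is a made-up format name standing in for any continuous-capable
// DataSourceV2 source; assume it produces a column named "value".
val df = spark.readStream.format("myContinuousSource").load()

// Map-only plans (projections, filters, per-record maps) pass checkForContinuous.
val mapped = df.selectExpr("value AS v").where("v % 2 = 0")

// An aggregation would fail analysis with
// "Continuous processing does not support Aggregate operations.":
// df.groupBy("value").count()

// Current-time expressions are rejected as well:
// df.select(org.apache.spark.sql.functions.current_timestamp())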
@@ -1044,6 +1044,22 @@ object SQLConf {
"When this conf is not set, the value from `spark.redaction.string.regex` is used.")
.fallbackConf(org.apache.spark.internal.config.STRING_REDACTION_PATTERN)

val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
buildConf("spark.sql.streaming.continuous.executorQueueSize")
.internal()
.doc("The size (measured in number of rows) of the queue used in continuous execution to" +
" buffer the results of a ContinuousDataReader.")
.intConf

[Review comment from a Member, on the .intConf line] longConf?

[Reply from the author] Should it be? I can't imagine anything close to MAX_INT being a reasonable value here. Will it be hard to migrate to a long if we later discover it's needed?

.createWithDefault(1024)

val CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS =
buildConf("spark.sql.streaming.continuous.executorPollIntervalMs")
.internal()
.doc("The interval at which continuous execution readers will poll to check whether" +
" the epoch has advanced on the driver.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(100)

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -1357,6 +1373,11 @@ class SQLConf extends Serializable with Logging {

def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER)

def continuousStreamingExecutorQueueSize: Int = getConf(CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE)

def continuousStreamingExecutorPollIntervalMs: Long =
getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
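The two knobs above are exposed on SQLConf as continuousStreamingExecutorQueueSize and continuousStreamingExecutorPollIntervalMs. A minimal sketch of overriding the defaults (1024 rows, 100 ms), assuming an existing SparkSession named spark; the values are arbitrary examples, not recommendations.

// Both confs are marked internal(); they tune the executor-side buffer between the
// continuous data reader thread and the processing thread.
spark.conf.set("spark.sql.streaming.continuous.executorQueueSize", "4096")
spark.conf.set("spark.sql.streaming.continuous.executorPollIntervalMs", "10")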
@@ -65,4 +65,10 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reade
default boolean needsReconfiguration() {
return false;
}

/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
void commit(Offset end);
}
@@ -61,4 +61,10 @@ public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSourc
* @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
*/
Offset deserializeOffset(String json);

/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
void commit(Offset end);
}
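The commit(end) method added to both reader interfaces gives a source a point at which it may discard anything it was retaining for replay at or below end. The class below is a self-contained illustration of that bookkeeping, deliberately decoupled from the DataSourceV2 interfaces; OffsetBuffer and its numeric offsets are assumptions made for the sketch.

import scala.collection.immutable.SortedMap

// Toy per-partition buffer a source implementation might keep for replay.
class OffsetBuffer[T] {
  private var buffered = SortedMap.empty[Long, T]

  def add(offset: Long, record: T): Unit = buffered += (offset -> record)

  // After commit(end), Spark promises to only request offsets greater than `end`,
  // so everything at or below it can safely be dropped.
  def commit(end: Long): Unit = buffered = buffered.filter { case (off, _) => off > end }

  def pending: Seq[(Long, T)] = buffered.toSeq
}

A real source would call something like this from its own commit(end: Offset) implementation after mapping the Offset back to its internal position.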
54 changes: 54 additions & 0 deletions sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -22,6 +22,7 @@
import scala.concurrent.duration.Duration;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;

/**
@@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) {
public static Trigger Once() {
return OneTimeTrigger$.MODULE$;
}

/**
* A trigger that continuously processes streaming data, asynchronously checkpointing at
* the specified interval.
*
* @since 2.3.0
*/
public static Trigger Continuous(long intervalMs) {
return ContinuousTrigger.apply(intervalMs);
}

/**
* A trigger that continuously processes streaming data, asynchronously checkpointing at
* the specified interval.
*
* {{{
* import java.util.concurrent.TimeUnit
* df.writeStream.trigger(Trigger.Continuous(10, TimeUnit.SECONDS))
* }}}
*
* @since 2.3.0
*/
public static Trigger Continuous(long interval, TimeUnit timeUnit) {
return ContinuousTrigger.create(interval, timeUnit);
}

/**
* (Scala-friendly)
* A trigger that continuously processes streaming data, asynchronously checkpointing at
* the specified interval.
*
* {{{
* import scala.concurrent.duration._
* df.writeStream.trigger(Trigger.Continuous(10.seconds))
* }}}
* @since 2.3.0
*/
public static Trigger Continuous(Duration interval) {
return ContinuousTrigger.apply(interval);
}

/**
* A trigger that continuously processes streaming data, asynchronously checkpointing at
* the specified interval.
*
* {{{
* df.writeStream.trigger(Trigger.Continuous("10 seconds"))
* }}}
* @since 2.3.0
*/
public static Trigger Continuous(String interval) {
return ContinuousTrigger.apply(interval);
}
}
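A usage sketch of the new trigger from Scala, assuming a streaming Dataset df backed by a continuous-capable V2 source. The memory sink name matches the MemorySinkV2/MemoryPlanV2 additions elsewhere in this PR, though whether a given format resolves to its V2 implementation here is an assumption of the sketch.

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

// Epochs are checkpointed asynchronously every second.
val query = df.writeStream
  .format("memory")
  .queryName("continuousSketch")
  .trigger(Trigger.Continuous(1.second))
  .start()

// Equivalent overloads:
//   Trigger.Continuous(1000)                  // interval in milliseconds
//   Trigger.Continuous(1, TimeUnit.SECONDS)   // needs java.util.concurrent.TimeUnit
//   Trigger.Continuous("1 second")            // interval as a string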
@@ -31,8 +31,10 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.MemoryPlanV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

/**
* Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting
@@ -374,6 +376,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
StreamingRelationExec(s.sourceName, s.output) :: Nil
case s: StreamingExecutionRelation =>
StreamingRelationExec(s.toString, s.output) :: Nil
case s: StreamingRelationV2 =>
StreamingRelationExec(s.sourceName, s.output) :: Nil
case _ => Nil
}
}
@@ -404,6 +408,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case MemoryPlan(sink, output) =>
val encoder = RowEncoder(sink.schema)
LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil
case MemoryPlanV2(sink, output) =>
val encoder = RowEncoder(StructType.fromAttributes(output))
LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil

case logical.Distinct(child) =>
throw new IllegalStateException(
@@ -26,6 +26,8 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousDataSourceRDD, ContinuousExecution, EpochCoordinatorRef, SetReaderPartitions}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.types.StructType

@@ -52,10 +54,20 @@ case class DataSourceV2ScanExec(
}.asJava
}

val inputRDD = new DataSourceRDD(sparkContext, readTasks)
.asInstanceOf[RDD[InternalRow]]
val inputRDD = reader match {
case _: ContinuousReader =>
EpochCoordinatorRef.get(
sparkContext.getLocalProperty(ContinuousExecution.RUN_ID_KEY), sparkContext.env)
.askSync[Unit](SetReaderPartitions(readTasks.size()))

new ContinuousDataSourceRDD(sparkContext, sqlContext, readTasks)

case _ =>
new DataSourceRDD(sparkContext, readTasks)
}

val numOutputRows = longMetric("numOutputRows")
inputRDD.map { r =>
inputRDD.asInstanceOf[RDD[InternalRow]].map { r =>
numOutputRows += 1
r
}
@@ -73,7 +85,7 @@
}
}

class RowToUnsafeDataReader(rowReader: DataReader[Row], encoder: ExpressionEncoder[Row])
class RowToUnsafeDataReader(val rowReader: DataReader[Row], encoder: ExpressionEncoder[Row])
extends DataReader[UnsafeRow] {

override def next: Boolean = rowReader.next
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -26,6 +26,8 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -58,10 +60,22 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan)
s"The input RDD has ${messages.length} partitions.")

try {
val runTask = writer match {
case w: ContinuousWriter =>
EpochCoordinatorRef.get(
sparkContext.getLocalProperty(ContinuousExecution.RUN_ID_KEY), sparkContext.env)
.askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))

(context: TaskContext, iter: Iterator[InternalRow]) =>
DataWritingSparkTask.runContinuous(writeTask, context, iter)
case _ =>
(context: TaskContext, iter: Iterator[InternalRow]) =>
DataWritingSparkTask.run(writeTask, context, iter)
}

sparkContext.runJob(
rdd,
(context: TaskContext, iter: Iterator[InternalRow]) =>
DataWritingSparkTask.run(writeTask, context, iter),
runTask,
rdd.partitions.indices,
(index, message: WriterCommitMessage) => messages(index) = message
)
@@ -70,6 +84,8 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan)
writer.commit(messages)
logInfo(s"Data source writer $writer committed.")
} catch {
case _: InterruptedException if writer.isInstanceOf[ContinuousWriter] =>
// Interruption is how continuous queries are ended, so accept and ignore the exception.
case cause: Throwable =>
logError(s"Data source writer $writer is aborting.")
try {
@@ -109,6 +125,44 @@ object DataWritingSparkTask extends Logging {
logError(s"Writer for partition ${context.partitionId()} aborted.")
})
}

  def runContinuous(
      writeTask: DataWriterFactory[InternalRow],
      context: TaskContext,
      iter: Iterator[InternalRow]): WriterCommitMessage = {
    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    val epochCoordinator = EpochCoordinatorRef.get(
      context.getLocalProperty(ContinuousExecution.RUN_ID_KEY),
      SparkEnv.get)
    val currentMsg: WriterCommitMessage = null
    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong

    do {
      // write the data and commit this writer.
      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
        try {
          iter.foreach(dataWriter.write)
          logInfo(s"Writer for partition ${context.partitionId()} is committing.")
          val msg = dataWriter.commit()
          logInfo(s"Writer for partition ${context.partitionId()} committed.")
          epochCoordinator.send(
            CommitPartitionEpoch(context.partitionId(), currentEpoch, msg)
          )
          currentEpoch += 1
        } catch {
          case _: InterruptedException =>
            // Continuous shutdown always involves an interrupt. Just finish the task.
        }
      })(catchBlock = {
        // If there is an error, abort this writer
        logError(s"Writer for partition ${context.partitionId()} is aborting.")
        dataWriter.abort()
        logError(s"Writer for partition ${context.partitionId()} aborted.")
      })
    } while (!context.isInterrupted())

    currentMsg
  }
}

class InternalRowDataWriterFactory(
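runContinuous sends one CommitPartitionEpoch message per epoch per partition; broadly, the driver-side EpochCoordinator introduced by this PR commits an epoch to the sink once every writer partition has reported it. The class below is not the PR's coordinator, only a minimal sketch of that bookkeeping under the assumption of a fixed writer partition count.

import scala.collection.mutable

// Illustrative tracker: an epoch becomes committable once all writer partitions report it.
class EpochTracker(numWriterPartitions: Int) {
  private val reported = mutable.Map.empty[Long, Set[Int]].withDefaultValue(Set.empty[Int])

  /** Records a partition's commit; returns true once every partition has reported `epoch`. */
  def commitPartitionEpoch(partitionId: Int, epoch: Long): Boolean = {
    reported(epoch) += partitionId
    reported(epoch).size == numWriterPartitions
  }
}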
@@ -17,21 +17,13 @@

package org.apache.spark.sql.execution.streaming;

import org.apache.spark.sql.sources.v2.reader.Offset;

/**
* The shared interface between V1 streaming sources and V2 streaming readers.
*
* This is a temporary interface for compatibility during migration. It should not be implemented
* directly, and will be removed in future versions.
*/
public interface BaseStreamingSource {
/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
void commit(Offset end);

/** Stop this source and free any resources it has allocated. */
void stop();
}
@@ -266,6 +266,20 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
}

  /**
   * Removes all log entries later than thresholdBatchId (exclusive).
   */
  def purgeAfter(thresholdBatchId: Long): Unit = {
    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
      .map(f => pathToBatchId(f.getPath))

    for (batchId <- batchIds if batchId > thresholdBatchId) {
      val path = batchIdToPath(batchId)
      fileManager.delete(path)
      logTrace(s"Removed metadata log file: $path")
    }
  }

  private def createFileManager(): FileManager = {
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    try {
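purgeAfter complements the existing purge (which removes entries older than a threshold), presumably so a restarted continuous query can drop offset-log entries written ahead of the last committed epoch. A rough usage sketch against this internal API, assuming a SparkSession named spark, a scratch checkpoint path, and that a plain String round-trips through the log's JSON serialization.

import org.apache.spark.sql.execution.streaming.HDFSMetadataLog

val log = new HDFSMetadataLog[String](spark, "/tmp/continuous-sketch/offsets")
log.add(0, "epoch-0-offsets")
log.add(1, "epoch-1-offsets")
log.add(2, "epoch-2-offsets")

log.purgeAfter(1)   // deletes the batch 2 file, keeps batches 0 and 1
assert(log.getLatest().map(_._1).contains(1L))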