diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index e0d7c1bc9..af722494f 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -103,7 +103,9 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitionin To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set, then any shuffle operations that cannot be supported in this mode will fall back to Spark. -## Metrics +## Metrics + +### Spark SQL Metrics Some Comet metrics are not directly comparable to Spark metrics in some cases: @@ -111,10 +113,17 @@ Some Comet metrics are not directly comparable to Spark metrics in some cases: milliseconds _per batch_ which can result in a large loss of precision, making it difficult to compare scan times between Spark and Comet. -Comet also adds some custom metrics: +### Native Metrics + +Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are +logged for each native plan (and there is one plan per task, so this is very verbose). + +Here is a guide to some of the native metrics. -### ShuffleWriterExec +### ScanExec -| Metric | Description | -| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `jvm_fetch_time` | Measure the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. | +| Metric | Description | +| ----------------- | --------------------------------------------------------------------------------------------------- | +| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. | +| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. | +| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. | diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index c79eeeb4a..7587ff06d 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -139,7 +139,6 @@ impl ExecutionPlan for ShuffleWriterExec { ) -> Result { let input = self.input.execute(partition, Arc::clone(&context))?; let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0); - let jvm_fetch_time = MetricBuilder::new(&self.metrics).subset_time("jvm_fetch_time", 0); Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), @@ -152,7 +151,6 @@ impl ExecutionPlan for ShuffleWriterExec { self.partitioning.clone(), metrics, context, - jvm_fetch_time, ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -1085,7 +1083,6 @@ impl Debug for ShuffleRepartitioner { } } -#[allow(clippy::too_many_arguments)] async fn external_shuffle( mut input: SendableRecordBatchStream, partition_id: usize, @@ -1094,7 +1091,6 @@ async fn external_shuffle( partitioning: Partitioning, metrics: ShuffleRepartitionerMetrics, context: Arc, - jvm_fetch_time: Time, ) -> Result { let schema = input.schema(); let mut repartitioner = ShuffleRepartitioner::new( @@ -1108,23 +1104,13 @@ async fn external_shuffle( context.session_config().batch_size(), ); - loop { - let mut timer = jvm_fetch_time.timer(); - let b = input.next().await; - timer.stop(); - - match b { - Some(batch_result) => { - // Block on the repartitioner to insert the batch and shuffle the rows - // into the corresponding partition buffer. - // Otherwise, pull the next batch from the input stream might overwrite the - // current batch in the repartitioner. - block_on(repartitioner.insert_batch(batch_result?))?; - } - _ => break, - } + while let Some(batch) = input.next().await { + // Block on the repartitioner to insert the batch and shuffle the rows + // into the corresponding partition buffer. + // Otherwise, pull the next batch from the input stream might overwrite the + // current batch in the repartitioner. + block_on(repartitioner.insert_batch(batch?))?; } - repartitioner.shuffle_write().await } diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 2cb8a84d9..a97caf0db 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -77,6 +77,10 @@ pub struct ScanExec { metrics: ExecutionPlanMetricsSet, /// Baseline metrics baseline_metrics: BaselineMetrics, + /// Time waiting for JVM input plan to execute and return batches + jvm_fetch_time: Time, + /// Time spent in FFI + arrow_ffi_time: Time, } impl ScanExec { @@ -88,6 +92,8 @@ impl ScanExec { ) -> Result { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); + let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0); + let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0); // Scan's schema is determined by the input batch, so we need to set it before execution. // Note that we determine if arrays are dictionary-encoded based on the @@ -97,8 +103,13 @@ impl ScanExec { // Dictionary-encoded primitive arrays are always unpacked. let first_batch = if let Some(input_source) = input_source.as_ref() { let mut timer = baseline_metrics.elapsed_compute().timer(); - let batch = - ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?; + let batch = ScanExec::get_next( + exec_context_id, + input_source.as_obj(), + data_types.len(), + &jvm_fetch_time, + &arrow_ffi_time, + )?; timer.stop(); batch } else { @@ -124,6 +135,8 @@ impl ScanExec { cache, metrics: metrics_set, baseline_metrics, + jvm_fetch_time, + arrow_ffi_time, schema, }) } @@ -171,6 +184,8 @@ impl ScanExec { self.exec_context_id, self.input_source.as_ref().unwrap().as_obj(), self.data_types.len(), + &self.jvm_fetch_time, + &self.arrow_ffi_time, )?; *current_batch = Some(next_batch); } @@ -185,6 +200,8 @@ impl ScanExec { exec_context_id: i64, iter: &JObject, num_cols: usize, + jvm_fetch_time: &Time, + arrow_ffi_time: &Time, ) -> Result { if exec_context_id == TEST_EXEC_CONTEXT_ID { // This is a unit test. We don't need to call JNI. @@ -200,6 +217,21 @@ impl ScanExec { let mut env = JVMClasses::get_env()?; + let mut timer = jvm_fetch_time.timer(); + + let num_rows: i32 = unsafe { + jni_call!(&mut env, + comet_batch_iterator(iter).has_next() -> i32)? + }; + + timer.stop(); + + if num_rows == -1 { + return Ok(InputBatch::EOF); + } + + let mut timer = arrow_ffi_time.timer(); + let mut array_addrs = Vec::with_capacity(num_cols); let mut schema_addrs = Vec::with_capacity(num_cols); @@ -233,9 +265,9 @@ impl ScanExec { comet_batch_iterator(iter).next(array_obj, schema_obj) -> i32)? }; - if num_rows == -1 { - return Ok(InputBatch::EOF); - } + // we already checked for end of results on call to has_next() so should always + // have a valid row count when calling next() + assert!(num_rows != -1); let mut inputs: Vec = Vec::with_capacity(num_cols); @@ -255,6 +287,8 @@ impl ScanExec { } } + timer.stop(); + Ok(InputBatch::new(inputs, Some(num_rows as usize))) } } diff --git a/native/core/src/jvm_bridge/batch_iterator.rs b/native/core/src/jvm_bridge/batch_iterator.rs index 4870624d2..45b10cf20 100644 --- a/native/core/src/jvm_bridge/batch_iterator.rs +++ b/native/core/src/jvm_bridge/batch_iterator.rs @@ -26,6 +26,8 @@ use jni::{ /// A struct that holds all the JNI methods and fields for JVM `CometBatchIterator` class. pub struct CometBatchIterator<'a> { pub class: JClass<'a>, + pub method_has_next: JMethodID, + pub method_has_next_ret: ReturnType, pub method_next: JMethodID, pub method_next_ret: ReturnType, } @@ -38,6 +40,8 @@ impl<'a> CometBatchIterator<'a> { Ok(CometBatchIterator { class, + method_has_next: env.get_method_id(Self::JVM_CLASS, "hasNext", "()I")?, + method_has_next_ret: ReturnType::Primitive(Primitive::Int), method_next: env.get_method_id(Self::JVM_CLASS, "next", "([J[J)I")?, method_next_ret: ReturnType::Primitive(Primitive::Int), }) diff --git a/spark/src/main/java/org/apache/comet/CometBatchIterator.java b/spark/src/main/java/org/apache/comet/CometBatchIterator.java index accd57c20..e05bea1df 100644 --- a/spark/src/main/java/org/apache/comet/CometBatchIterator.java +++ b/spark/src/main/java/org/apache/comet/CometBatchIterator.java @@ -33,12 +33,31 @@ public class CometBatchIterator { final Iterator input; final NativeUtil nativeUtil; + private ColumnarBatch currentBatch = null; CometBatchIterator(Iterator input, NativeUtil nativeUtil) { this.input = input; this.nativeUtil = nativeUtil; } + /** + * Fetch the next input batch. + * + * @return Number of rows in next batch or -1 if no batches left. + */ + public int hasNext() { + if (currentBatch == null) { + if (input.hasNext()) { + currentBatch = input.next(); + } + } + if (currentBatch == null) { + return -1; + } else { + return currentBatch.numRows(); + } + } + /** * Get the next batches of Arrow arrays. * @@ -47,12 +66,11 @@ public class CometBatchIterator { * @return the number of rows of the current batch. -1 if there is no more batch. */ public int next(long[] arrayAddrs, long[] schemaAddrs) { - boolean hasBatch = input.hasNext(); - - if (!hasBatch) { + if (currentBatch == null) { return -1; } - - return nativeUtil.exportBatch(arrayAddrs, schemaAddrs, input.next()); + int numRows = nativeUtil.exportBatch(arrayAddrs, schemaAddrs, currentBatch); + currentBatch = null; + return numRows; } } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 2bb467af5..b33f6b5a6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2909,7 +2909,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim case op if isCometSink(op) && op.output.forall(a => supportedDataType(a.dataType, true)) => // These operators are source of Comet native execution chain val scanBuilder = OperatorOuterClass.Scan.newBuilder() - scanBuilder.setSource(op.simpleStringWithNodeId()) + val source = op.simpleStringWithNodeId() + if (source.isEmpty) { + scanBuilder.setSource(op.getClass.getSimpleName) + } else { + scanBuilder.setSource(source) + } val scanTypes = op.output.flatten { attr => serializeDataType(attr.dataType) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala index 9698dc98b..2fc73bb7c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala @@ -88,7 +88,7 @@ object CometExecUtils { * child partition */ def getLimitNativePlan(outputAttributes: Seq[Attribute], limit: Int): Option[Operator] = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder() + val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("LimitInput") val scanOpBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr => @@ -118,7 +118,7 @@ object CometExecUtils { sortOrder: Seq[SortOrder], child: SparkPlan, limit: Int): Option[Operator] = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder() + val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("TopKInput") val scanOpBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr => diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index a7a33c40d..b1dd9ac83 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -77,9 +77,6 @@ case class CometShuffleExchangeExec( SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), - "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric( - sparkContext, - "time fetching batches from JVM"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, "number of partitions")) ++ readMetrics ++ writeMetrics @@ -485,14 +482,7 @@ class CometShuffleWriteProcessor( "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), "elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) - - val nativeMetrics = if (metrics.contains("jvm_fetch_time")) { - CometMetricNode( - nativeSQLMetrics ++ Map("jvm_fetch_time" -> - metrics("jvm_fetch_time"))) - } else { - CometMetricNode(nativeSQLMetrics) - } + val nativeMetrics = CometMetricNode(nativeSQLMetrics) // Getting rid of the fake partitionId val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2) @@ -538,7 +528,7 @@ class CometShuffleWriteProcessor( } def getNativePlan(dataFile: String, indexFile: String): Operator = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder() + val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("ShuffleWriterInput") val opBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr =>