[SPARK-51316][PYTHON][FOLLOW-UP] Revert unrelated changes and mark mapInPandas/mapInArrow batched in byte size

### What changes were proposed in this pull request?

This PR is a follow-up of #50096 that reverts unrelated changes and marks mapInPandas/mapInArrow as batched by byte size.

### Why are the changes needed?

To make the original change self-contained, and to mark mapInPandas/mapInArrow as batched by byte size for consistency.
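
For context, both batch caps are session confs. A minimal sketch of setting them follows; the record-count key is a documented Spark conf, while the byte-size key is an assumption inferred from the `arrowMaxBytesPerBatch` field in the diff below and should be verified against your Spark version.

```scala
import org.apache.spark.sql.SparkSession

object ArrowBatchConfDemo extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()

  // Documented Spark conf: caps the number of rows per Arrow record batch.
  spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")

  // ASSUMPTION: key inferred from the SQLConf field arrowMaxBytesPerBatch
  // seen in the diff; verify the exact key for your Spark version.
  spark.conf.set("spark.sql.execution.arrow.maxBytesPerBatch",
    (64L * 1024 * 1024).toString)

  spark.stop()
}
```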

### Does this PR introduce _any_ user-facing change?

No, the main change has not been released yet.

### How was this patch tested?

Manually.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50111 from HyukjinKwon/SPARK-51316-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon committed Feb 28, 2025
1 parent 208a7ee commit 5b45671
Showing 3 changed files with 11 additions and 3 deletions.
```diff
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.ReferenceAllColumns
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, FunctionUtils, LogicalGroupState}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.execution.python.BatchIterator
 import org.apache.spark.sql.execution.r.ArrowRRunner
 import org.apache.spark.sql.execution.streaming.GroupStateImpl
 import org.apache.spark.sql.internal.SQLConf
@@ -218,13 +219,17 @@ case class MapPartitionsInRWithArrowExec(
     child: SparkPlan) extends UnaryExecNode {
   override def producedAttributes: AttributeSet = AttributeSet(output)
 
+  private val batchSize = conf.arrowMaxRecordsPerBatch
+
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsInternal { inputIter =>
       val outputTypes = schema.map(_.dataType)
 
-      val batchIter = Iterator(inputIter)
+      // DO NOT use iter.grouped(). See BatchIterator.
+      val batchIter =
+        if (batchSize > 0) new BatchIterator(inputIter, batchSize) else Iterator(inputIter)
 
       val runner = new ArrowRRunner(func, packageNames, broadcastVars, inputSchema,
         SQLConf.get.sessionLocalTimeZone, RRunnerModes.DATAFRAME_DAPPLY)
```
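
Two details in this hunk are worth spelling out. When `batchSize <= 0`, the whole partition is fed through as one batch (`Iterator(inputIter)`); otherwise `BatchIterator` splits it lazily. The "DO NOT use iter.grouped()" warning exists because `Iterator.grouped(n)` materializes each group into a `Seq` before handing it over, while a `BatchIterator`-style wrapper yields lazy sub-iterators over the same underlying row iterator. A minimal sketch of that lazy-batching shape (not Spark's actual `BatchIterator`, which lives in `org.apache.spark.sql.execution.python`):

```scala
// A minimal sketch, NOT Spark's actual BatchIterator: lazy, size-capped
// batching over one underlying iterator. Unlike Iterator.grouped(n), which
// materializes each batch into a Seq, each inner iterator here streams rows
// on demand. The caller must fully drain a batch before requesting the next.
class LazyBatchIterator[T](underlying: Iterator[T], batchSize: Int)
  extends Iterator[Iterator[T]] {
  require(batchSize > 0, "batchSize must be positive")

  override def hasNext: Boolean = underlying.hasNext

  override def next(): Iterator[T] = new Iterator[T] {
    private var consumed = 0
    override def hasNext: Boolean = consumed < batchSize && underlying.hasNext
    override def next(): T = { consumed += 1; underlying.next() }
  }
}

object LazyBatchDemo extends App {
  val batches = new LazyBatchIterator((1 to 7).iterator, 3)
  // Prints List(1, 2, 3), List(4, 5, 6), List(7); each batch drained in turn.
  batches.foreach(b => println(b.toList))
}
```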
```diff
@@ -69,7 +69,7 @@ class MapInBatchEvaluatorFactory(
         pythonRunnerConf,
         pythonMetrics,
         jobArtifactUUID,
-        None)
+        None) with BatchedPythonArrowInput
       val columnarBatchIter = pyRunner.compute(batchIter, context.partitionId(), context)
 
       val unsafeProj = UnsafeProjection.create(output, output)
```
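
The `with BatchedPythonArrowInput` suffix uses Scala's per-instance mixin: the trait is layered onto this single runner instance at construction time, switching its Arrow input path to batched writing without declaring a named subclass. A generic sketch of the idiom, with illustrative names only:

```scala
// Illustrative names only; not Spark's classes.
class Runner(val name: String) {
  def describe(): String = s"runner $name"
}

trait Batched {
  def maxRowsPerBatch: Int = 10000
}

object MixinDemo extends App {
  // The trait is mixed into this one instance at construction time.
  val runner = new Runner("map-in-arrow") with Batched
  println(runner.describe())      // runner map-in-arrow
  println(runner.maxRowsPerBatch) // 10000
}
```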
```diff
@@ -145,7 +145,10 @@ private[python] trait BasicPythonArrowInput extends PythonArrowInput[Iterator[InternalRow]] {
 
 private[python] trait BatchedPythonArrowInput extends BasicPythonArrowInput {
   self: BasePythonRunner[Iterator[InternalRow], _] =>
-  private val arrowMaxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+  private val arrowMaxRecordsPerBatch = {
+    val v = SQLConf.get.arrowMaxRecordsPerBatch
+    if (v > 0) v else Int.MaxValue
+  }
 
   private val maxBytesPerBatch = SQLConf.get.arrowMaxBytesPerBatch
 
```
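
The new guard normalizes a non-positive `arrowMaxRecordsPerBatch` (meaning "unlimited") to `Int.MaxValue`, so the batching loop can keep a single count comparison instead of special-casing the unlimited mode; the byte-size cap `maxBytesPerBatch` applies independently. A small sketch of the pattern, with hypothetical names:

```scala
object UnlimitedBatchDemo extends App {
  // A non-positive configured value means "no record limit"; normalizing it
  // to Int.MaxValue lets a batching loop use one `rowCount < maxRecords`
  // check instead of branching on the unlimited mode.
  def effectiveMaxRecords(configured: Int): Int =
    if (configured > 0) configured else Int.MaxValue

  assert(effectiveMaxRecords(10000) == 10000)
  assert(effectiveMaxRecords(0) == Int.MaxValue)  // 0 => unlimited
  assert(effectiveMaxRecords(-1) == Int.MaxValue) // negative => unlimited
}
```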
