[Minor] Comment improvements in ExternalSorter. #5620

Closed
wants to merge 1 commit into from
@@ -53,7 +53,18 @@ import org.apache.spark.storage.{BlockObjectWriter, BlockId}
  * probably want to pass None as the ordering to avoid extra sorting. On the other hand, if you do
  * want to do combining, having an Ordering is more efficient than not having it.
  *
- * At a high level, this class works as follows:
+ * Users interact with this class in the following way:
+ *
+ * 1. Instantiate an ExternalSorter.
+ *
+ * 2. Call insertAll() with a set of records.
+ *
+ * 3. Request an iterator() back to traverse sorted/aggregated records.
+ *    - or -
+ *    Invoke writePartitionedFile() to create a file containing sorted/aggregated outputs
+ *    that can be used in Spark's sort shuffle.
+ *
+ * At a high level, this class works internally as follows:
  *
  * - We repeatedly fill up buffers of in-memory data, using either a SizeTrackingAppendOnlyMap if
  * we want to combine by key, or a simple SizeTrackingBuffer if we don't. Inside these buffers,
@@ -65,11 +76,11 @@ import org.apache.spark.storage.{BlockObjectWriter, BlockId}
  * aggregation. For each file, we track how many objects were in each partition in memory, so we
  * don't have to write out the partition ID for every element.
  *
- * - When the user requests an iterator, the spilled files are merged, along with any remaining
- * in-memory data, using the same sort order defined above (unless both sorting and aggregation
- * are disabled). If we need to aggregate by key, we either use a total ordering from the
- * ordering parameter, or read the keys with the same hash code and compare them with each other
- * for equality to merge values.
+ * - When the user requests an iterator or file output, the spilled files are merged, along with
+ * any remaining in-memory data, using the same sort order defined above (unless both sorting
+ * and aggregation are disabled). If we need to aggregate by key, we either use a total ordering
+ * from the ordering parameter, or read the keys with the same hash code and compare them with
+ * each other for equality to merge values.
  *
  * - Users are expected to call stop() at the end to delete all the intermediate files.
  *
@@ -259,8 +270,8 @@ private[spark] class ExternalSorter[K, V, C](
  * Spill our in-memory collection to a sorted file that we can merge later (normal code path).
  * We add this file into spilledFiles to find it later.
  *
- * Alternatively, if bypassMergeSort is true, we spill to separate files for each partition.
- * See spillToPartitionedFiles() for that code path.
+ * This should not be invoked if bypassMergeSort is true. In that case, spillToPartitionedFiles()
+ * is used to write files for each partition.
  *
  * @param collection whichever collection we're using (map or buffer)
  */