Simplify the usage of FileChannel
Change-Id: Ibb0036d0ac88c01310cba817da0bb40535c12351
Author: jerryshao
Committed: Jul 20, 2017
Commit: 6d7224c (1 parent: f2d534a)

Showing 5 changed files with 9 additions and 17 deletions.
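The change is mechanical across all five files: java.nio.channels.FileChannel has two open overloads, open(Path, Set<? extends OpenOption>, FileAttribute<?>...) and the varargs convenience open(Path, OpenOption...), and the varargs form collects its arguments into a set internally. Building a Guava ImmutableSet at every call site is therefore redundant, and dropping it also removes an import from each touched file. A minimal standalone sketch (not part of the commit) showing the two equivalent call sites:

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.EnumSet;

import static java.nio.file.StandardOpenOption.READ;

public class OpenOptionDemo {
  public static void main(String[] args) throws IOException {
    Path path = Paths.get(args[0]);  // any readable file

    // Before: the Set-based overload, shown here with JDK EnumSet instead
    // of Guava's ImmutableSet; both satisfy Set<? extends OpenOption>.
    try (FileChannel c1 = FileChannel.open(path, EnumSet.of(READ))) {
      System.out.println("set overload:     " + c1.size() + " bytes");
    }

    // After: the varargs overload wraps the options in a set internally,
    // so no extra collection (or third-party dependency) is needed.
    try (FileChannel c2 = FileChannel.open(path, READ)) {
      System.out.println("varargs overload: " + c2.size() + " bytes");
    }
  }
}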
common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -27,7 +27,6 @@
 import java.nio.file.StandardOpenOption;
 
 import com.google.common.base.Objects;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.io.ByteStreams;
 import io.netty.channel.DefaultFileRegion;
 
@@ -134,8 +133,7 @@ public Object convertToNetty() throws IOException {
     if (conf.lazyFileDescriptor()) {
       return new DefaultFileRegion(file, offset, length);
     } else {
-      FileChannel fileChannel = FileChannel.open(file.toPath(),
-        ImmutableSet.of(StandardOpenOption.READ));
+      FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
       return new DefaultFileRegion(fileChannel, offset, length);
     }
   }
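For context, this hunk sits in convertToNetty(), which hands a file segment to Netty either lazily (Netty opens the file descriptor at transfer time) or eagerly (the channel is opened up front). A sketch of that pattern, with hypothetical names (toFileRegion, lazyFd) rather than Spark's API:

import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

import io.netty.channel.DefaultFileRegion;

public class FileRegionSketch {
  // Mirrors the branch above: either let Netty open the file lazily at
  // transfer time, or open the channel eagerly and hand it over. Netty
  // takes ownership of the region and releases the descriptor with it.
  static DefaultFileRegion toFileRegion(File file, long offset, long length,
                                        boolean lazyFd) throws IOException {
    if (lazyFd) {
      // File-based constructor: the descriptor is opened on first transfer.
      return new DefaultFileRegion(file, offset, length);
    } else {
      // Channel-based constructor: the descriptor is opened immediately.
      FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
      return new DefaultFileRegion(channel, offset, length);
    }
  }
}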
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -30,7 +30,6 @@
 import scala.collection.Iterator;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.io.Closeables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +75,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
 
   private final int fileBufferSize;
-  private final boolean transferToEnabled;
   private final int numPartitions;
   private final BlockManager blockManager;
   private final Partitioner partitioner;
@@ -108,7 +106,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       SparkConf conf) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
-    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
     this.blockManager = blockManager;
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
@@ -189,15 +186,16 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
       return lengths;
     }
 
-    final FileChannel out = FileChannel.open(outputFile.toPath(),
-      ImmutableSet.of(WRITE, APPEND, CREATE));
+    // This file needs to opened in append mode in order to work around a Linux kernel bug that
+    // affects transferTo; see SPARK-3948 for more details.
+    final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE);
     final long writeStartTime = System.nanoTime();
     boolean threwException = true;
     try {
       for (int i = 0; i < numPartitions; i++) {
         final File file = partitionWriterSegments[i].file();
         if (file.exists()) {
-          final FileChannel in = FileChannel.open(file.toPath(), ImmutableSet.of(READ));
+          final FileChannel in = FileChannel.open(file.toPath(), READ);
           boolean copyThrewException = true;
           try {
             long size = in.size();
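The surrounding method concatenates the per-partition files into one output: the output channel is opened with APPEND (the SPARK-3948 workaround noted in the added comment) and each input channel is copied with FileChannel.transferTo. A simplified, self-contained sketch of that copy loop, under the assumption that the input files already exist (the real writer also records write metrics and cleans up on failure):

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;

public class ConcatSketch {
  // Append each input file to `out` with FileChannel.transferTo, the same
  // channel-to-channel copy the writer above performs per partition.
  static void concat(Path out, Path... inputs) throws IOException {
    // APPEND works around the transferTo kernel bug noted above (SPARK-3948).
    try (FileChannel outChannel = FileChannel.open(out, WRITE, APPEND, CREATE)) {
      for (Path input : inputs) {
        try (FileChannel inChannel = FileChannel.open(input, READ)) {
          long size = inChannel.size();
          long pos = 0;
          while (pos < size) {
            // transferTo may copy fewer bytes than requested, so loop.
            pos += inChannel.transferTo(pos, size - pos, outChannel);
          }
        }
      }
    }
  }
}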
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -30,7 +30,6 @@
 import scala.reflect.ClassTag$;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
@@ -444,12 +443,11 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
     boolean threwException = true;
     try {
       for (int i = 0; i < spills.length; i++) {
-        spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), ImmutableSet.of(READ));
+        spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ);
       }
       // This file needs to opened in append mode in order to work around a Linux kernel bug that
       // affects transferTo; see SPARK-3948 for more details.
-      mergedFileOutputChannel = FileChannel.open(outputFile.toPath(),
-        ImmutableSet.of(WRITE, CREATE, APPEND));
+      mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND);
 
       long bytesWrittenToMergedFile = 0;
       for (int partition = 0; partition < numPartitions; partition++) {
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -26,7 +26,6 @@ import scala.collection.BufferedIterator
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import com.google.common.collect.ImmutableSet
 import com.google.common.io.ByteStreams
 
 import org.apache.spark.{SparkEnv, TaskContext}
@@ -486,7 +485,7 @@ class ExternalAppendOnlyMap[K, V, C](
       }
 
       val start = batchOffsets(batchIndex)
-      fileChannel = FileChannel.open(file.toPath, ImmutableSet.of(StandardOpenOption.READ))
+      fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ)
       fileChannel.position(start)
       batchIndex += 1
 
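Here batchOffsets records where each serialized batch starts inside the spill file, so positioning the freshly opened channel at batchOffsets(batchIndex) and bounding the read at the next offset yields exactly one batch. A sketch of that positioned-read pattern, with illustrative names rather than Spark's internals:

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import com.google.common.io.ByteStreams;

public class BatchReadSketch {
  // batchOffsets[i] is the byte offset where batch i starts; the channel is
  // positioned there and the stream is capped at the batch's length.
  static InputStream openBatch(Path file, long[] batchOffsets, int batchIndex)
      throws IOException {
    long start = batchOffsets[batchIndex];
    long end = batchOffsets[batchIndex + 1];
    FileChannel channel = FileChannel.open(file, StandardOpenOption.READ);
    channel.position(start);
    // Guava's ByteStreams.limit bounds the stream to a single batch, which
    // is presumably what the ByteStreams import in this file is for.
    return ByteStreams.limit(Channels.newInputStream(channel), end - start);
  }
}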
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -25,7 +25,6 @@ import java.util.Comparator
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import com.google.common.collect.ImmutableSet
 import com.google.common.io.ByteStreams
 
 import org.apache.spark._
@@ -514,7 +513,7 @@ private[spark] class ExternalSorter[K, V, C](
       }
 
      val start = batchOffsets(batchId)
-      fileChannel = FileChannel.open(spill.file.toPath, ImmutableSet.of(StandardOpenOption.READ))
+      fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ)
       fileChannel.position(start)
       batchId += 1
 
