diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala index e990c1da6730f..37b9939f90bf8 100644 --- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala +++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala @@ -17,11 +17,13 @@ package org.apache.spark.network -import java.io.{FileInputStream, RandomAccessFile, File, InputStream} +import java.io._ import java.nio.ByteBuffer import java.nio.channels.FileChannel import java.nio.channels.FileChannel.MapMode +import scala.util.Try + import com.google.common.io.ByteStreams import io.netty.buffer.{ByteBufInputStream, ByteBuf} @@ -71,6 +73,14 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt try { channel = new RandomAccessFile(file, "r").getChannel channel.map(MapMode.READ_ONLY, offset, length) + } catch { + case e: IOException => + Try(channel.size).toOption match { + case Some(fileLen) => + throw new IOException(s"Error in reading $this (actual file length $fileLen)", e) + case None => + throw new IOException(s"Error in opening $this", e) + } } finally { if (channel != null) { channel.close() @@ -79,10 +89,31 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt } override def inputStream(): InputStream = { - val is = new FileInputStream(file) - is.skip(offset) - ByteStreams.limit(is, length) + var is: FileInputStream = null + try { + is = new FileInputStream(file) + is.skip(offset) + ByteStreams.limit(is, length) + } catch { + case e: IOException => + if (is != null) { + is.close() + } + Try(file.length).toOption match { + case Some(fileLen) => + throw new IOException(s"Error in reading $this (actual file length $fileLen)", e) + case None => + throw new IOException(s"Error in opening $this", e) + } + case e: Throwable => + if (is != null) { + is.close() + } + throw e + } } + + override def toString: String = s"${getClass.getName}($file, $offset, $length)" }