merged branch-2.3 #253

Merged
merged 28 commits on Aug 27, 2018
Changes from all commits
28 commits
8080c93
[PYSPARK] Updates to Accumulators
LucaCanali Jul 18, 2018
14b50d7
[SPARK-24987][SS] - Fix Kafka consumer leak when no new offsets for T…
Aug 4, 2018
136588e
[SPARK-25015][BUILD] Update Hadoop 2.7 to 2.7.7
srowen Aug 4, 2018
597a77e
Merge branch 'branch-2.3' of github.com:apache/spark into branch-2.3-…
markhamstra Aug 7, 2018
9fb70f4
[SPARK-24948][SHS][BACKPORT-2.3] Delegate check access permissions to…
mgaido91 Aug 8, 2018
7d465d8
[MINOR][BUILD] Update Jetty to 9.3.24.v20180605
srowen Aug 9, 2018
9bfc55b
[SPARK-25076][SQL] SQLConf should not be retrieved from a stopped Spa…
cloud-fan Aug 9, 2018
b426ec5
[SPARK-24950][SQL] DateTimeUtilsSuite daysToMillis and millisToDays f…
d80tb7 Jul 28, 2018
6930f48
Preparing Spark release v2.3.2-rc4
jerryshao Aug 10, 2018
e66f3f9
Preparing development version 2.3.3-SNAPSHOT
jerryshao Aug 10, 2018
7306ac7
[MINOR][BUILD] Add ECCN notice required by http://www.apache.org/dev/…
srowen Aug 10, 2018
04c6520
[SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not …
zsxwing Aug 10, 2018
a0a7e41
[SPARK-24908][R][STYLE] removing spaces to make lintr happy
shaneknapp Jul 24, 2018
b9b35b9
[SPARK-25084][SQL][BACKPORT-2.3] "distribute by" on multiple columns (…
LantaoJin Aug 13, 2018
787790b
[SPARK-25028][SQL] Avoid NPE when analyzing partition with NULL values
mgaido91 Aug 13, 2018
4dc8225
Preparing Spark release v2.3.2-rc5
jerryshao Aug 14, 2018
29a0403
Preparing development version 2.3.3-SNAPSHOT
jerryshao Aug 14, 2018
0856b82
[MINOR][SQL][DOC] Fix `to_json` example in function description and doc
dongjoon-hyun Aug 14, 2018
34191e6
[SPARK-25051][SQL] FixNullability should not stop on AnalysisBarrier
mgaido91 Aug 14, 2018
032f6d9
[MINOR][DOC][SQL] use one line for annotation arg value
mengxr Aug 18, 2018
ea01e36
[SPARK-25144][SQL][TEST][BRANCH-2.3] Free aggregate map when task ends
cloud-fan Aug 20, 2018
9702bb6
[DOCS] Fixed NDCG formula issues
yueguoguo Aug 20, 2018
8bde467
[SPARK-25114][CORE] Fix RecordBinaryComparator when subtraction betwe…
jiangxb1987 Aug 21, 2018
9cb9d72
[SPARK-25114][2.3][CORE][FOLLOWUP] Fix RecordBinaryComparatorSuite bu…
jiangxb1987 Aug 21, 2018
fcc9bd6
[SPARK-25205][CORE] Fix typo in spark.network.crypto.keyFactoryIterat…
squito Aug 24, 2018
42c1fdd
[SPARK-25234][SPARKR] avoid integer overflow in parallelize
mengxr Aug 24, 2018
f598382
[SPARK-25124][ML] VectorSizeHint setSize and getSize don't return val…
huaxingao Aug 24, 2018
67ff50d
Merge branch 'branch-2.3' of github.com:apache/spark into branch-2.3-…
markhamstra Aug 27, 2018
25 changes: 25 additions & 0 deletions NOTICE
@@ -5,6 +5,31 @@ This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).


Export Control Notice
---------------------

This distribution includes cryptographic software. The country in which you currently reside may have
restrictions on the import, possession, use, and/or re-export to another country, of encryption software.
BEFORE using any encryption software, please check your country's laws, regulations and policies concerning
the import, possession, or use, and re-export of encryption software, to see if this is permitted. See
<http://www.wassenaar.org/> for more information.

The U.S. Government Department of Commerce, Bureau of Industry and Security (BIS), has classified this
software as Export Commodity Control Number (ECCN) 5D002.C.1, which includes information security software
using or performing cryptographic functions with asymmetric algorithms. The form and manner of this Apache
Software Foundation distribution makes it eligible for export under the License Exception ENC Technology
Software Unrestricted (TSU) exception (see the BIS Export Administration Regulations, Section 740.13) for
both object code and source code.

The following provides more details on the included cryptographic software:

This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to
support authentication, and encryption and decryption of data sent across the network between
services.

This software includes Bouncy Castle (http://bouncycastle.org/) to support the jets3t library.


========================================================================
Common Development and Distribution License 1.0
========================================================================
9 changes: 4 additions & 5 deletions R/pkg/R/context.R
@@ -138,11 +138,10 @@ parallelize <- function(sc, coll, numSlices = 1) {

sizeLimit <- getMaxAllocationLimit(sc)
objectSize <- object.size(coll)
len <- length(coll)

# For large objects we make sure the size of each slice is also smaller than sizeLimit
numSerializedSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
if (numSerializedSlices > length(coll))
numSerializedSlices <- length(coll)
numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / sizeLimit)))

# Generate the slice ids to put each row
# For instance, for numSerializedSlices of 22, length of 50
@@ -153,8 +152,8 @@ parallelize <- function(sc, coll, numSlices = 1) {
splits <- if (numSerializedSlices > 0) {
unlist(lapply(0: (numSerializedSlices - 1), function(x) {
# nolint start
start <- trunc((x * length(coll)) / numSerializedSlices)
end <- trunc(((x + 1) * length(coll)) / numSerializedSlices)
start <- trunc((as.numeric(x) * len) / numSerializedSlices)
end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices)
# nolint end
rep(start, end - start)
}))
8 changes: 4 additions & 4 deletions R/pkg/inst/tests/testthat/test_basic.R
@@ -18,9 +18,9 @@
context("basic tests for CRAN")

test_that("create DataFrame from list or data.frame", {
tryCatch( checkJavaVersion(),
tryCatch(checkJavaVersion(),
error = function(e) { skip("error on Java check") },
warning = function(e) { skip("warning on Java check") } )
warning = function(e) { skip("warning on Java check") })

sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
sparkConfig = sparkRTestConfig)
@@ -54,9 +54,9 @@ test_that("create DataFrame from list or data.frame", {
})

test_that("spark.glm and predict", {
tryCatch( checkJavaVersion(),
tryCatch(checkJavaVersion(),
error = function(e) { skip("error on Java check") },
warning = function(e) { skip("warning on Java check") } )
warning = function(e) { skip("warning on Java check") })

sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE,
sparkConfig = sparkRTestConfig)
7 changes: 7 additions & 0 deletions R/pkg/tests/fulltests/test_context.R
@@ -240,3 +240,10 @@ test_that("add and get file to be downloaded with Spark job on every node", {
unlink(path, recursive = TRUE)
sparkR.session.stop()
})

test_that("SPARK-25234: parallelize should not have integer overflow", {
sc <- sparkR.sparkContext(master = sparkRTestMaster)
# 47000 * 47000 exceeds integer range
parallelize(sc, 1:47000, 47000)
sparkR.session.stop()
})
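
The new test pins down SPARK-25234: with 47000 slices over a vector of length 47000, the slice-boundary product x * length(coll) in context.R exceeds R's 32-bit integer range, which is why the fix above widens the index with as.numeric() before multiplying. The standalone Scala sketch below is illustrative only (the object and method names are not part of Spark) and shows the same boundary arithmetic with and without widening:

// Minimal sketch, not Spark code: slice boundaries as computed in R's parallelize,
// once with 32-bit Int arithmetic and once widened to Long before multiplying.
object SliceBoundaries {
  def overflowing(x: Int, len: Int, slices: Int): Int =
    (x * len) / slices                    // Int product can wrap past 2^31 - 1

  def widened(x: Int, len: Int, slices: Int): Long =
    (x.toLong * len) / slices             // widen first, as as.numeric() does in the R fix

  def main(args: Array[String]): Unit = {
    val len = 47000
    val slices = 47000
    val x = 46999                         // last slice index
    println(overflowing(x, len, slices))  // negative garbage: 46999 * 47000 > Int.MaxValue
    println(widened(x, len, slices))      // 46999, as intended
  }
}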
2 changes: 1 addition & 1 deletion assembly/README
@@ -9,4 +9,4 @@ This module is off by default. To activate it specify the profile in the command

If you need to build an assembly for a different version of Hadoop the
hadoop-version system property needs to be set as in this example:
-Dhadoop.version=2.7.3
-Dhadoop.version=2.7.7
@@ -209,7 +209,7 @@ public String keyFactoryAlgorithm() {
* (128 bits by default), which is not generally the case with user passwords.
*/
public int keyFactoryIterations() {
return conf.getInt("spark.networy.crypto.keyFactoryIterations", 1024);
return conf.getInt("spark.network.crypto.keyFactoryIterations", 1024);
}

/**
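
The one-character fix above (SPARK-25205) matters because the option was read under a misspelled key, so setting the documented spark.network.crypto.keyFactoryIterations had no effect and the PBKDF2 iteration count silently stayed at the 1024 default. As a sketch only (for example in spark-shell), the documented key is now the one that takes effect when network encryption is configured:

import org.apache.spark.SparkConf

// Sketch: after SPARK-25205, TransportConf reads the documented key, so this override
// is honored instead of being ignored in favor of the 1024 default.
val conf = new SparkConf()
  .set("spark.network.crypto.enabled", "true")
  .set("spark.network.crypto.keyFactoryIterations", "4096")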
@@ -66,7 +66,7 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
*/
private int usableCapacity = 0;

private int initialSize;
private final int initialSize;

ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
this.consumer = consumer;
@@ -95,12 +95,20 @@ public int numRecords() {
}

public void reset() {
// Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
pos = 0;
if (consumer != null) {
consumer.freeArray(array);
// As `array` has been released, we should set it to `null` to avoid accessing it before
// `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any codes writing
// data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in
// ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access
// `ShuffleInMemorySorter` when `allocateArray` throws SparkOutOfMemoryError).
array = null;
usableCapacity = 0;
array = consumer.allocateArray(initialSize);
usableCapacity = getUsableCapacity();
}
pos = 0;
}

public void expandPointerArray(LongArray newArray) {
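
The reordering in reset() above is the point of SPARK-25081: consumer.allocateArray can itself trigger a spill or throw SparkOutOfMemoryError, so the cursor is cleared and the freed array is nulled out before reallocation, leaving no stale state for the spill path to observe. A simplified standalone Scala sketch of the same pattern (this is not the real Java class; allocate and free stand in for the MemoryConsumer calls):

// Simplified sketch of the reset-before-reallocate pattern used by ShuffleInMemorySorter.
class InMemorySorterSketch(initialSize: Int,
                           allocate: Int => Array[Long],
                           free: Array[Long] => Unit) {
  private var array: Array[Long] = allocate(initialSize)
  private var usableCapacity: Int = array.length
  private var pos: Int = 0

  def insertRecord(recordPointer: Long): Unit = {
    array(pos) = recordPointer
    pos += 1
  }

  def reset(): Unit = {
    pos = 0                       // a spill triggered by allocate() below becomes a no-op
    free(array)
    array = null                  // never keep a reference to a freed buffer
    usableCapacity = 0            // callers see "no room" rather than writing through null
    array = allocate(initialSize) // may spill or throw; the state above is already safe
    usableCapacity = array.length
  }
}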
12 changes: 9 additions & 3 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -586,8 +586,9 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By
*/
private[spark] class PythonAccumulatorV2(
@transient private val serverHost: String,
private val serverPort: Int)
extends CollectionAccumulator[Array[Byte]] {
private val serverPort: Int,
private val secretToken: String)
extends CollectionAccumulator[Array[Byte]] with Logging{

Utils.checkHost(serverHost)

@@ -602,12 +603,17 @@ private[spark] class PythonAccumulatorV2(
private def openSocket(): Socket = synchronized {
if (socket == null || socket.isClosed) {
socket = new Socket(serverHost, serverPort)
logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort")
// send the secret just for the initial authentication when opening a new connection
socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8))
}
socket
}

// Need to override so the types match with PythonFunction
override def copyAndReset(): PythonAccumulatorV2 = new PythonAccumulatorV2(serverHost, serverPort)
override def copyAndReset(): PythonAccumulatorV2 = {
new PythonAccumulatorV2(serverHost, serverPort, secretToken)
}

override def merge(other: AccumulatorV2[Array[Byte], JList[Array[Byte]]]): Unit = synchronized {
val otherPythonAccumulator = other.asInstanceOf[PythonAccumulatorV2]
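
The added secretToken is written once, immediately after the socket to the Python accumulator server is opened, so only the initial connection carries the credential and later updates reuse the already-authenticated socket; copyAndReset has to pass the token along so that copies of the accumulator can still authenticate. A minimal standalone Scala sketch of that connect-then-authenticate step (names are illustrative; the matching server-side check lives in PySpark):

import java.net.Socket
import java.nio.charset.StandardCharsets

// Sketch of the connect-then-authenticate handshake used by the accumulator update.
object AccumulatorClientSketch {
  def openAuthenticatedSocket(host: String, port: Int, secretToken: String): Socket = {
    val socket = new Socket(host, port)
    val out = socket.getOutputStream
    // The secret is sent exactly once, when the connection is first established.
    out.write(secretToken.getBytes(StandardCharsets.UTF_8))
    out.flush()
    socket
  }
}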
23 changes: 0 additions & 23 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -31,7 +31,6 @@ import scala.util.control.NonFatal
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -378,28 +377,6 @@ class SparkHadoopUtil extends Logging {
buffer.toString
}

private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
val perm = status.getPermission
val ugi = UserGroupInformation.getCurrentUser

if (ugi.getShortUserName == status.getOwner) {
if (perm.getUserAction.implies(mode)) {
return true
}
} else if (ugi.getGroupNames.contains(status.getGroup)) {
if (perm.getGroupAction.implies(mode)) {
return true
}
} else if (perm.getOtherAction.implies(mode)) {
return true
}

logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
s"${if (status.isDirectory) "d" else "-"}$perm")
false
}

def serialize(creds: Credentials): Array[Byte] = {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
@@ -19,19 +19,19 @@ package org.apache.spark.deploy.history

import java.io.{File, FileNotFoundException, IOException}
import java.util.{Date, ServiceLoader, UUID}
import java.util.concurrent.{ExecutorService, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.ExecutionException
import scala.util.Try
import scala.xml.Node

import com.fasterxml.jackson.annotation.JsonIgnore
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.MoreExecutors
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
@@ -111,7 +111,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
"; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private val fs = new Path(logDir).getFileSystem(hadoopConf)
// Visible for testing
private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)

// Used by check event thread and clean log thread.
// Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -155,6 +156,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
new HistoryServerDiskManager(conf, path, listing, clock)
}

private val blacklist = new ConcurrentHashMap[String, Long]

// Visible for testing
private[history] def isBlacklisted(path: Path): Boolean = {
blacklist.containsKey(path.getName)
}

private def blacklist(path: Path): Unit = {
blacklist.put(path.getName, clock.getTimeMillis())
}

/**
* Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
*/
private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
}

private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()

/**
@@ -412,7 +432,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// reading a garbage file is safe, but we would log an error which can be scary to
// the end-user.
!entry.getPath().getName().startsWith(".") &&
SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
!isBlacklisted(entry.getPath)
}
.filter { entry =>
try {
@@ -446,32 +466,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
}

val tasks = updated.map { entry =>
val tasks = updated.flatMap { entry =>
try {
replayExecutor.submit(new Runnable {
val task: Future[Unit] = replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
})
}, Unit)
Some(task -> entry.getPath)
} catch {
// let the iteration over the updated entries break, since an exception on
// replayExecutor.submit (..) indicates the ExecutorService is unable
// to take any more submissions at this time
case e: Exception =>
logError(s"Exception while submitting event log for replay", e)
null
None
}
}.filter(_ != null)
}

pendingReplayTasksCount.addAndGet(tasks.size)

// Wait for all tasks to finish. This makes sure that checkForLogs
// is not scheduled again while some tasks are already running in
// the replayExecutor.
tasks.foreach { task =>
tasks.foreach { case (task, path) =>
try {
task.get()
} catch {
case e: InterruptedException =>
throw e
case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
// We don't have read permissions on the log file
logWarning(s"Unable to read log $path", e.getCause)
blacklist(path)
case e: Exception =>
logError("Exception while merging application listings", e)
} finally {
@@ -694,6 +719,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
listing.delete(classOf[LogInfo], log.logPath)
}
}
// Clean the blacklist from the expired entries.
clearBlacklist(CLEAN_INTERVAL_S)
}

/**
@@ -871,13 +898,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

private def deleteLog(log: Path): Unit = {
try {
fs.delete(log, true)
} catch {
case _: AccessControlException =>
logInfo(s"No permission to delete $log, ignoring.")
case ioe: IOException =>
logError(s"IOException in cleaning $log", ioe)
if (isBlacklisted(log)) {
logDebug(s"Skipping deleting $log as we don't have permissions on it.")
} else {
try {
fs.delete(log, true)
} catch {
case _: AccessControlException =>
logInfo(s"No permission to delete $log, ignoring.")
case ioe: IOException =>
logError(s"IOException in cleaning $log", ioe)
}
}
}

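
Together with the SparkHadoopUtil change above, this replaces the up-front permission check with a reactive blacklist: the provider simply attempts to read each event log, records paths that fail with AccessControlException, skips them on later scans and during deletion, and expires entries after CLEAN_INTERVAL_S so a permission fix is eventually picked up. A minimal standalone Scala sketch of such a time-stamped blacklist (ExpiringBlacklist is illustrative; the real code keys on Hadoop Path names and uses the provider's Clock):

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Sketch of a time-stamped blacklist; entries older than the expiry window are dropped.
class ExpiringBlacklist {
  private val entries = new ConcurrentHashMap[String, Long]()

  def add(name: String): Unit = entries.put(name, System.currentTimeMillis())

  def contains(name: String): Boolean = entries.containsKey(name)

  // Mirrors clearBlacklist above: drop entries older than expireTimeInSeconds.
  def clear(expireTimeInSeconds: Long): Unit = {
    val threshold = System.currentTimeMillis() - expireTimeInSeconds * 1000
    entries.asScala.retain((_, added) => added >= threshold)
  }
}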
@@ -19,6 +19,8 @@

import java.io.IOException;

import org.apache.spark.unsafe.memory.MemoryBlock;

public class TestMemoryConsumer extends MemoryConsumer {
public TestMemoryConsumer(TaskMemoryManager memoryManager, MemoryMode mode) {
super(memoryManager, 1024L, mode);
@@ -43,6 +45,11 @@ void free(long size) {
used -= size;
taskMemoryManager.releaseExecutionMemory(size, this);
}

public void freePage(MemoryBlock page) {
used -= page.size();
taskMemoryManager.freePage(page, this);
}
}

