
Commit 46376cb

Merge pull request apache-spark-on-k8s#273 from palantir/aash/resync-apache2

[NOSQUASH] Resync Apache

ash211 authored Oct 13, 2017
2 parents 7bfa380 + 373e355 commit 46376cb
Showing 164 changed files with 2,390 additions and 584 deletions.
8 changes: 4 additions & 4 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2538,7 +2538,7 @@ test_that("describe() and summary() on a DataFrame", {

stats2 <- summary(df)
expect_equal(collect(stats2)[5, "summary"], "25%")
expect_equal(collect(stats2)[5, "age"], "30")
expect_equal(collect(stats2)[5, "age"], "19")

stats3 <- summary(df, "min", "max", "55.1%")

@@ -2738,7 +2738,7 @@ test_that("sampleBy() on a DataFrame", {
})

test_that("approxQuantile() on a DataFrame", {
l <- lapply(c(0:99), function(i) { list(i, 99 - i) })
l <- lapply(c(0:100), function(i) { list(i, 100 - i) })
df <- createDataFrame(l, list("a", "b"))
quantiles <- approxQuantile(df, "a", c(0.5, 0.8), 0.0)
expect_equal(quantiles, list(50, 80))
@@ -2749,8 +2749,8 @@ test_that("approxQuantile() on a DataFrame", {
dfWithNA <- createDataFrame(data.frame(a = c(NA, 30, 19, 11, 28, 15),
b = c(-30, -19, NA, -11, -28, -15)))
quantiles3 <- approxQuantile(dfWithNA, c("a", "b"), c(0.5), 0.0)
expect_equal(quantiles3[[1]], list(28))
expect_equal(quantiles3[[2]], list(-15))
expect_equal(quantiles3[[1]], list(19))
expect_equal(quantiles3[[2]], list(-19))
})

test_that("SQL error message is returned from JVM", {
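
The approxQuantile() expectations above can be cross-checked against the Scala DataFrame API directly. A minimal, illustrative sketch (assuming a local SparkSession named spark; the R test exercises the same code path through SparkR):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("approxQuantileSketch").getOrCreate()
import spark.implicits._

// Same data as dfWithNA above; null (NA) values are ignored, matching the updated expectations.
val dfWithNA = Seq[(Option[Int], Option[Int])](
  (None, Some(-30)), (Some(30), Some(-19)), (Some(19), None),
  (Some(11), Some(-11)), (Some(28), Some(-28)), (Some(15), Some(-15))
).toDF("a", "b")

// relativeError = 0.0 requests an exact answer.
val medianA = dfWithNA.stat.approxQuantile("a", Array(0.5), 0.0).head  // 19.0
val medianB = dfWithNA.stat.approxQuantile("b", Array(0.5), 0.0).head  // -19.0
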
4 changes: 3 additions & 1 deletion bin/beeline.cmd
@@ -17,4 +17,6 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

cmd /V /E /C "%~dp0spark-class.cmd" org.apache.hive.beeline.BeeLine %*
rem The outermost quotes are used to prevent Windows command line parse error
rem when there are some quotes in parameters, see SPARK-21877.
cmd /V /E /C ""%~dp0spark-class.cmd" org.apache.hive.beeline.BeeLine %*"
4 changes: 3 additions & 1 deletion bin/pyspark.cmd
@@ -20,4 +20,6 @@ rem
rem This is the entry point for running PySpark. To avoid polluting the
rem environment, it just launches a new cmd to do the real work.

cmd /V /E /C "%~dp0pyspark2.cmd" %*
rem The outermost quotes are used to prevent Windows command line parse error
rem when there are some quotes in parameters, see SPARK-21877.
cmd /V /E /C ""%~dp0pyspark2.cmd" %*"
5 changes: 4 additions & 1 deletion bin/run-example.cmd
@@ -19,4 +19,7 @@ rem

set SPARK_HOME=%~dp0..
set _SPARK_CMD_USAGE=Usage: ./bin/run-example [options] example-class [example args]
cmd /V /E /C "%~dp0spark-submit.cmd" run-example %*

rem The outermost quotes are used to prevent Windows command line parse error
rem when there are some quotes in parameters, see SPARK-21877.
cmd /V /E /C ""%~dp0spark-submit.cmd" run-example %*"
4 changes: 3 additions & 1 deletion bin/spark-class.cmd
@@ -20,4 +20,6 @@ rem
rem This is the entry point for running a Spark class. To avoid polluting
rem the environment, it just launches a new cmd to do the real work.

cmd /V /E /C "%~dp0spark-class2.cmd" %*
rem The outermost quotes are used to prevent Windows command line parse error
rem when there are some quotes in parameters, see SPARK-21877.
cmd /V /E /C ""%~dp0spark-class2.cmd" %*"
4 changes: 3 additions & 1 deletion bin/spark-shell.cmd
@@ -20,4 +20,6 @@ rem
rem This is the entry point for running Spark shell. To avoid polluting the
rem environment, it just launches a new cmd to do the real work.

cmd /V /E /C "%~dp0spark-shell2.cmd" %*
rem The outermost quotes are used to prevent Windows command line parse error
rem when there are some quotes in parameters, see SPARK-21877.
cmd /V /E /C ""%~dp0spark-shell2.cmd" %*"
4 changes: 3 additions & 1 deletion bin/spark-submit.cmd
@@ -20,4 +20,6 @@ rem
rem This is the entry point for running Spark submit. To avoid polluting the
rem environment, it just launches a new cmd to do the real work.

cmd /V /E /C "%~dp0spark-submit2.cmd" %*
rem The outermost quotes are used to prevent Windows command line parse error
rem when there are some quotes in parameters, see SPARK-21877.
cmd /V /E /C ""%~dp0spark-submit2.cmd" %*"
4 changes: 3 additions & 1 deletion bin/sparkR.cmd
@@ -20,4 +20,6 @@ rem
rem This is the entry point for running SparkR. To avoid polluting the
rem environment, it just launches a new cmd to do the real work.

cmd /V /E /C "%~dp0sparkR2.cmd" %*
rem The outermost quotes are used to prevent Windows command line parse error
rem when there are some quotes in parameters, see SPARK-21877.
cmd /V /E /C ""%~dp0sparkR2.cmd" %*"

@@ -47,12 +47,11 @@ public ShuffleSecretManager() {
* fetching shuffle files written by other executors in this application.
*/
public void registerApp(String appId, String shuffleSecret) {
if (!shuffleSecretMap.containsKey(appId)) {
shuffleSecretMap.put(appId, shuffleSecret);
logger.info("Registered shuffle secret for application {}", appId);
} else {
logger.debug("Application {} already registered", appId);
}
// Always put the new secret information to make sure it's the most up to date.
// Otherwise we have to specifically look at the application attempt in addition
// to the applicationId since the secrets change between application attempts on yarn.
shuffleSecretMap.put(appId, shuffleSecret);
logger.info("Registered shuffle secret for application {}", appId);
}

/**
@@ -67,12 +66,8 @@ public void registerApp(String appId, ByteBuffer shuffleSecret) {
* This is called when the application terminates.
*/
public void unregisterApp(String appId) {
if (shuffleSecretMap.containsKey(appId)) {
shuffleSecretMap.remove(appId);
logger.info("Unregistered shuffle secret for application {}", appId);
} else {
logger.warn("Attempted to unregister application {} when it is not registered", appId);
}
shuffleSecretMap.remove(appId);
logger.info("Unregistered shuffle secret for application {}", appId);
}

/**
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.sasl;

import java.nio.ByteBuffer;

import org.junit.Test;
import static org.junit.Assert.*;

public class ShuffleSecretManagerSuite {
static String app1 = "app1";
static String app2 = "app2";
static String pw1 = "password1";
static String pw2 = "password2";
static String pw1update = "password1update";
static String pw2update = "password2update";

@Test
public void testMultipleRegisters() {
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp(app1, pw1);
assertEquals(pw1, secretManager.getSecretKey(app1));
secretManager.registerApp(app2, ByteBuffer.wrap(pw2.getBytes()));
assertEquals(pw2, secretManager.getSecretKey(app2));

// now update the password for the apps and make sure it takes effect
secretManager.registerApp(app1, pw1update);
assertEquals(pw1update, secretManager.getSecretKey(app1));
secretManager.registerApp(app2, ByteBuffer.wrap(pw2update.getBytes()));
assertEquals(pw2update, secretManager.getSecretKey(app2));

secretManager.unregisterApp(app1);
assertNull(secretManager.getSecretKey(app1));
assertEquals(pw2update, secretManager.getSecretKey(app2));

secretManager.unregisterApp(app2);
assertNull(secretManager.getSecretKey(app2));
assertNull(secretManager.getSecretKey(app1));
}
}

@@ -40,6 +40,13 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) {
}
}

// Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat smaller.
// Be conservative and lower the cap a little.
// Refer to "http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/ArrayList.java#l229"
// This value is word rounded. Use this value if the allocated byte arrays are used to store
// types other than bytes.
public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;

private static final boolean unaligned = Platform.unaligned();
/**
* Optimized byte array equality check for byte arrays.

@@ -17,6 +17,8 @@

package org.apache.spark.unsafe.map;

import org.apache.spark.unsafe.array.ByteArrayMethods;

/**
* Interface that defines how we can grow the size of a hash map when it is over a threshold.
*/
@@ -31,9 +33,7 @@ public interface HashMapGrowthStrategy {

class Doubling implements HashMapGrowthStrategy {

// Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
// smaller. Be conservative and lower the cap a little.
private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;

@Override
public int nextCapacity(int currentCapacity) {
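
Taken together, the two hunks above centralize the conservative array-length cap. A rough Scala sketch of the capped-doubling idea (illustrative names only, not the actual Spark classes):

object CappedDoubling {
  // Mirrors ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH: most JVMs cannot allocate arrays of
  // length Integer.MAX_VALUE, so stay a little below it and keep the value word rounded.
  val ArrayMax: Int = Int.MaxValue - 15

  // Double the capacity, but never exceed the conservative cap (no integer overflow possible).
  def nextCapacity(currentCapacity: Int): Int = {
    require(currentCapacity > 0, "capacity must be positive")
    if (currentCapacity <= ArrayMax / 2) currentCapacity * 2 else ArrayMax
  }
}
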
@@ -480,6 +480,10 @@ public UnsafeSorterIterator getSortedIterator() throws IOException {
}
}

@VisibleForTesting boolean hasSpaceForAnotherRecord() {
return inMemSorter.hasSpaceForAnotherRecord();
}

private static void spillIterator(UnsafeSorterIterator inMemIterator,
UnsafeSorterSpillWriter spillWriter) throws IOException {
while (inMemIterator.hasNext()) {

@@ -162,14 +162,25 @@ private int getUsableCapacity() {
*/
public void free() {
if (consumer != null) {
consumer.freeArray(array);
if (array != null) {
consumer.freeArray(array);
}
array = null;
}
}

public void reset() {
if (consumer != null) {
consumer.freeArray(array);
// The call to consumer.allocateArray may trigger a spill which in turn accesses this instance
// and eventually re-enters this method, trying to free the array again. By setting the array
// to null and its length to 0 we effectively make the spill code path a no-op. Setting the
// array to null also indicates that it has already been de-allocated, which prevents a double
// de-allocation in free().
array = null;
usableCapacity = 0;
pos = 0;
nullBoundaryPos = 0;
array = consumer.allocateArray(initialSize);
usableCapacity = getUsableCapacity();
}
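
The guard described in that comment can be summarized in a small, self-contained sketch (hypothetical consumer trait and buffer class, for illustration only):

// Hypothetical memory consumer: allocateArray may spill and re-enter the caller.
trait Consumer {
  def allocateArray(size: Long): Array[Long]
  def freeArray(array: Array[Long]): Unit
}

final class ResettableBuffer(consumer: Consumer, initialSize: Long) {
  private var array: Array[Long] = consumer.allocateArray(initialSize)
  private var pos: Int = 0

  def reset(): Unit = {
    consumer.freeArray(array)
    // Clear the state *before* allocating: if allocateArray spills and re-enters this instance,
    // the spill path sees an empty buffer (a no-op) and free() cannot double-free the old array.
    array = null
    pos = 0
    array = consumer.allocateArray(initialSize)
  }

  def free(): Unit = {
    if (array != null) {  // tolerate a prior reset()/spill having already released the array
      consumer.freeArray(array)
      array = null
    }
  }
}
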
40 changes: 6 additions & 34 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -439,7 +439,7 @@ class SparkContext(config: SparkConf) extends Logging {
_statusTracker = new SparkStatusTracker(this)

_progressBar =
if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
@@ -2371,41 +2371,13 @@
* (e.g. after the web UI and event logging listeners have been registered).
*/
private def setupAndStartListenerBus(): Unit = {
// Use reflection to instantiate listeners specified via `spark.extraListeners`
try {
val listenerClassNames: Seq[String] =
conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
for (className <- listenerClassNames) {
// Use reflection to find the right constructor
val constructors = {
val listenerClass = Utils.classForName(className)
listenerClass
.getConstructors
.asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]]
conf.get(EXTRA_LISTENERS).foreach { classNames =>
val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)
listeners.foreach { listener =>
listenerBus.addToSharedQueue(listener)
logInfo(s"Registered listener ${listener.getClass().getName()}")
}
val constructorTakingSparkConf = constructors.find { c =>
c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
}
lazy val zeroArgumentConstructor = constructors.find { c =>
c.getParameterTypes.isEmpty
}
val listener: SparkListenerInterface = {
if (constructorTakingSparkConf.isDefined) {
constructorTakingSparkConf.get.newInstance(conf)
} else if (zeroArgumentConstructor.isDefined) {
zeroArgumentConstructor.get.newInstance()
} else {
throw new SparkException(
s"$className did not have a zero-argument constructor or a" +
" single-argument constructor that accepts SparkConf. Note: if the class is" +
" defined inside of another Scala class, then its constructors may accept an" +
" implicit parameter that references the enclosing class; in this case, you must" +
" define the listener as a top-level class in order to prevent this extra" +
" parameter from breaking Spark's ability to find a valid constructor.")
}
}
listenerBus.addToSharedQueue(listener)
logInfo(s"Registered listener $className")
}
} catch {
case e: Exception =>
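
The inline reflection above is replaced by a call to Utils.loadExtensions. Roughly, a helper of that shape behaves as follows — a simplified sketch of the assumed behavior (the same constructor-selection logic the removed code implemented), not the exact implementation:

import org.apache.spark.SparkConf

def loadExtensionsSketch[T](extClass: Class[T], classNames: Seq[String], conf: SparkConf): Seq[T] = {
  classNames.map { name =>
    val klass = Class.forName(name)
    require(extClass.isAssignableFrom(klass), s"$name is not a subclass of ${extClass.getName}")
    val ctors = klass.getConstructors
    // Prefer a constructor taking a SparkConf; otherwise fall back to a zero-argument one.
    val withConf = ctors.find(_.getParameterTypes.toSeq == Seq(classOf[SparkConf]))
    val zeroArg = ctors.find(_.getParameterTypes.isEmpty)
    val instance = withConf.map(_.newInstance(conf))
      .orElse(zeroArg.map(_.newInstance()))
      .getOrElse(throw new IllegalArgumentException(
        s"$name needs either a SparkConf constructor or a zero-argument constructor"))
    instance.asInstanceOf[T]
  }
}
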
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -93,19 +93,19 @@ private class ClientEndpoint(
driverArgs.cores,
driverArgs.supervise,
command)
ayncSendToMasterAndForwardReply[SubmitDriverResponse](
asyncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))

case "kill" =>
val driverId = driverArgs.driverId
ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
}
}

/**
* Send the message to master and forward the reply to self asynchronously.
*/
private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
for (masterEndpoint <- masterEndpoints) {
masterEndpoint.ask[T](message).onComplete {
case Success(v) => self.send(v)
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -609,6 +609,11 @@ object SparkSubmit extends CommandLineUtils with Logging {
}
}

// In the case of shells, spark.ui.showConsoleProgress can be true by default or set by the user.
if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
sysProps(UI_SHOW_CONSOLE_PROGRESS.key) = "true"
}

// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// In Kubernetes cluster mode, the jar will be uploaded by the client separately.

@@ -170,7 +170,7 @@ private[spark] class StandaloneAppClient(

case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
logInfo("Executor added: %s on %s (%s) with %d core(s)".format(fullId, workerId, hostPort,
cores))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)


@@ -581,7 +581,13 @@ private[deploy] class Master(
* The number of cores assigned to each executor is configurable. When this is explicitly set,
* multiple executors from the same application may be launched on the same worker if the worker
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor may be launched on each worker.
* worker by default, in which case only one executor per application may be launched on each
* worker during a single schedule iteration.
* Note that when `spark.executor.cores` is not set, we may still launch multiple executors from
* the same application on the same worker. Consider appA and appB, each with one executor running
* on worker1, and appA.coresLeft > 0. If appB finishes and releases all its cores on worker1,
* then in the next schedule iteration appA launches a new executor that grabs all the free
* cores on worker1, so we end up with multiple executors from appA running on worker1.
*
* It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
* at a time). Consider the following example: cluster has 4 workers with 16 cores each.
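
A toy illustration of why cores are handed out in whole-executor chunks (assumed helper, not the Master's actual scheduling code):

// Allocate cores on a worker only in multiples of coresPerExecutor.
def coresToAssign(freeCores: Int, coresPerExecutor: Int): Int =
  (freeCores / coresPerExecutor) * coresPerExecutor

coresToAssign(freeCores = 12, coresPerExecutor = 16)  // 0: wait for a worker that can host a full executor
coresToAssign(freeCores = 16, coresPerExecutor = 16)  // 16: exactly one whole executor
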
@@ -139,7 +139,9 @@ private[rest] class StandaloneSubmitRequestServlet(
val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
val superviseDriver = sparkProperties.get("spark.driver.supervise")
val appArgs = request.appArgs
val environmentVariables = request.environmentVariables
// Filter SPARK_LOCAL_(IP|HOSTNAME) environment variables from being set on the remote system.
val environmentVariables =
request.environmentVariables.filterNot(x => x._1.matches("SPARK_LOCAL_(IP|HOSTNAME)"))

// Construct driver description
val conf = new SparkConf(false)
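
For illustration, the filter drops only the two local-address variables and keeps everything else (example values):

Map("SPARK_LOCAL_IP" -> "192.168.0.5", "SPARK_LOCAL_HOSTNAME" -> "host1", "SPARK_LOG_DIR" -> "/var/log/spark")
  .filterNot { case (k, _) => k.matches("SPARK_LOCAL_(IP|HOSTNAME)") }
// => Map(SPARK_LOG_DIR -> /var/log/spark)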