Merge remote-tracking branch 'origin-apache/master' into SPARK-22465

sujithjay committed Dec 16, 2017
2 parents 176270b + 0c8fca4 commit be391a7
Showing 118 changed files with 6,136 additions and 664 deletions.
@@ -139,7 +139,7 @@ private[spark] class OptionalConfigEntry[T](
s => Some(rawValueConverter(s)),
v => v.map(rawStringConverter).orNull, doc, isPublic) {

- override def defaultValueString: String = "<undefined>"
+ override def defaultValueString: String = ConfigEntry.UNDEFINED

override def readFrom(reader: ConfigReader): Option[T] = {
readString(reader).map(rawValueConverter)
@@ -149,12 +149,12 @@ private[spark] class OptionalConfigEntry[T](
/**
* A config entry whose default value is defined by another config entry.
*/
- private class FallbackConfigEntry[T] (
+ private[spark] class FallbackConfigEntry[T] (
key: String,
alternatives: List[String],
doc: String,
isPublic: Boolean,
- private[config] val fallback: ConfigEntry[T])
+ val fallback: ConfigEntry[T])
extends ConfigEntry[T](key, alternatives,
fallback.valueConverter, fallback.stringConverter, doc, isPublic) {

@@ -167,6 +167,8 @@ private class FallbackConfigEntry[T] (

private[spark] object ConfigEntry {

+ val UNDEFINED = "<undefined>"

private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()

def registerEntry(entry: ConfigEntry[_]): Unit = {
@@ -488,11 +488,11 @@ private[spark] class TaskSetManager(
abort(s"$msg Exception during serialization: $e")
throw new TaskNotSerializableException(e)
}
- if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
+ if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
logWarning(s"Stage ${task.stageId} contains a task of very large size " +
s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
}
addRunningTask(taskId)
@@ -502,7 +502,7 @@ private[spark] class TaskSetManager(
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")

sched.dagScheduler.taskStarted(task, info)
new TaskDescription(
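
The limit → limit() change above is purely syntactic; what gets compared against TaskSetManager.TASK_SIZE_TO_WARN_KB is still the ByteBuffer's limit, i.e. the serialized task size in bytes. A minimal sketch of that relationship (illustrative only, not part of this commit; the 100 KB threshold is an assumption):

import java.nio.ByteBuffer

// Wrapping a byte array sets the buffer's limit to the array length,
// so limit() reports the serialized size in bytes.
val serializedTask: ByteBuffer = ByteBuffer.wrap(new Array[Byte](200 * 1024))
val sizeKb = serializedTask.limit() / 1024            // 200
val shouldWarn = serializedTask.limit() > 100 * 1024  // true, assuming a 100 KB warn threshold
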
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2757,7 +2757,7 @@ private[spark] object Utils extends Logging {

/**
* Check the validity of the given Kubernetes master URL and return the resolved URL. Prefix
* "k8s:" is appended to the resolved URL as the prefix is used by KubernetesClusterManager
* "k8s://" is appended to the resolved URL as the prefix is used by KubernetesClusterManager
* in canCreate to determine if the KubernetesClusterManager should be used.
*/
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
@@ -2770,7 +2770,7 @@
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
return s"k8s:$resolvedURL"
return s"k8s://$resolvedURL"
}

val masterScheme = new URI(masterWithoutK8sPrefix).getScheme
@@ -2789,7 +2789,7 @@
throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme)
}

return s"k8s:$resolvedURL"
s"k8s://$resolvedURL"
}
}

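
A minimal sketch of the resolution behaviour described in the doc comment above, mirroring the UtilsSuite assertions shown later on this page (illustrative only; Utils is private[spark], so these calls only compile from code inside the org.apache.spark package):

import org.apache.spark.util.Utils

Utils.checkAndGetK8sMasterUrl("k8s://https://host:port") // "k8s://https://host:port"
Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1:8443")    // "k8s://https://127.0.0.1:8443" (no scheme given, https assumed)
Utils.checkAndGetK8sMasterUrl("k8s:https://host:port")   // throws IllegalArgumentException
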
@@ -408,7 +408,7 @@ class SparkSubmitSuite
childArgsMap.get("--arg") should be (Some("arg1"))
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS)
classpath should have length (0)
conf.get("spark.master") should be ("k8s:https://host:port")
conf.get("spark.master") should be ("k8s://https://host:port")
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.kubernetes.namespace") should be ("spark")
8 changes: 4 additions & 4 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1148,16 +1148,16 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {

test("check Kubernetes master URL") {
val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port")
- assert(k8sMasterURLHttps === "k8s:https://host:port")
+ assert(k8sMasterURLHttps === "k8s://https://host:port")

val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port")
- assert(k8sMasterURLHttp === "k8s:http://host:port")
+ assert(k8sMasterURLHttp === "k8s://http://host:port")

val k8sMasterURLWithoutScheme = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1:8443")
- assert(k8sMasterURLWithoutScheme === "k8s:https://127.0.0.1:8443")
+ assert(k8sMasterURLWithoutScheme === "k8s://https://127.0.0.1:8443")

val k8sMasterURLWithoutScheme2 = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1")
- assert(k8sMasterURLWithoutScheme2 === "k8s:https://127.0.0.1")
+ assert(k8sMasterURLWithoutScheme2 === "k8s://https://127.0.0.1")

intercept[IllegalArgumentException] {
Utils.checkAndGetK8sMasterUrl("k8s:https://host:port")
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.kafka010.KafkaSource._
+ import org.apache.spark.sql.sources.v2.reader.Offset
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -19,7 +19,8 @@ package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

- import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+ import org.apache.spark.sql.execution.streaming.SerializedOffset
+ import org.apache.spark.sql.sources.v2.reader.Offset

/**
* An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
@@ -38,6 +38,7 @@ import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+ import org.apache.spark.sql.sources.v2.reader.Offset
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
10 changes: 7 additions & 3 deletions python/pyspark/ml/tests.py
@@ -44,6 +44,7 @@
import numpy as np
from numpy import abs, all, arange, array, array_equal, inf, ones, tile, zeros
import inspect
+ import py4j

from pyspark import keyword_only, SparkContext
from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer, UnaryTransformer
@@ -1859,8 +1860,9 @@ class ImageReaderTest2(PySparkTestCase):

@classmethod
def setUpClass(cls):
- PySparkTestCase.setUpClass()
+ super(ImageReaderTest2, cls).setUpClass()
# Note that here we enable Hive's support.
+ cls.spark = None
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
@@ -1873,8 +1875,10 @@ def setUpClass(cls):

@classmethod
def tearDownClass(cls):
- PySparkTestCase.tearDownClass()
- cls.spark.sparkSession.stop()
+ super(ImageReaderTest2, cls).tearDownClass()
+ if cls.spark is not None:
+     cls.spark.sparkSession.stop()
+     cls.spark = None

def test_read_images_multiple_times(self):
# This test case is to check if `ImageSchema.readImages` tries to
7 changes: 6 additions & 1 deletion python/pyspark/sql/readwriter.py
@@ -304,7 +304,7 @@ def parquet(self, *paths):

@ignore_unicode_prefix
@since(1.6)
- def text(self, paths):
+ def text(self, paths, wholetext=False):
"""
Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
@@ -313,11 +313,16 @@ def text(self, paths):
Each line in the text file is a new row in the resulting DataFrame.
:param paths: string, or list of strings, for input path(s).
+ :param wholetext: if true, read each file from input path(s) as a single row.
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
[Row(value=u'hello'), Row(value=u'this')]
+ >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
+ >>> df.collect()
+ [Row(value=u'hello\\nthis')]
"""
+ self._set_opts(wholetext=wholetext)
if isinstance(paths, basestring):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
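
The new wholetext parameter is only forwarded to the underlying reader as a data source option via self._set_opts(wholetext=wholetext), so the same behaviour should be reachable from the Scala DataFrameReader by setting that option directly. A hedged sketch, assuming the text source honours an option spelled exactly "wholetext":

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("wholetext-demo").getOrCreate()

// Default: one row per line.
spark.read.text("python/test_support/sql/text-test.txt").collect()
// Array(Row("hello"), Row("this"))

// With wholetext: each input file becomes a single row.
spark.read.option("wholetext", "true").text("python/test_support/sql/text-test.txt").collect()
// Array(Row("hello\nthis"))
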
@@ -0,0 +1 @@
+ org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager
@@ -203,8 +203,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
// The master URL has been checked for validity already in SparkSubmit.
- // We just need to get rid of the "k8s:" prefix here.
- val master = sparkConf.get("spark.master").substring("k8s:".length)
+ // We just need to get rid of the "k8s://" prefix here.
+ val master = sparkConf.get("spark.master").substring("k8s://".length)
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None

val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
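
As the comment above notes, the master URL was already validated in SparkSubmit, and spark.master holds the prefixed form asserted in SparkSubmitSuite above, so dropping the fixed-length prefix recovers the bare API server URL. A minimal sketch (illustrative only):

val sparkMaster = "k8s://https://host:port"          // spark.master after validation
val master = sparkMaster.substring("k8s://".length)  // "https://host:port"
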
@@ -1379,7 +1379,7 @@ class SQLConf extends Serializable with Logging {
Option(settings.get(key)).
orElse {
// Try to use the default value
- Option(sqlConfEntries.get(key)).map(_.defaultValueString)
+ Option(sqlConfEntries.get(key)).map { e => e.stringConverter(e.readFrom(reader)) }
}.
getOrElse(throw new NoSuchElementException(key))
}
@@ -1417,14 +1417,21 @@
* not set yet, return `defaultValue`.
*/
def getConfString(key: String, defaultValue: String): String = {
- if (defaultValue != null && defaultValue != "<undefined>") {
+ if (defaultValue != null && defaultValue != ConfigEntry.UNDEFINED) {
val entry = sqlConfEntries.get(key)
if (entry != null) {
// Only verify configs in the SQLConf object
entry.valueConverter(defaultValue)
}
}
- Option(settings.get(key)).getOrElse(defaultValue)
+ Option(settings.get(key)).getOrElse {
+   // If the key is not set, need to check whether the config entry is registered and is
+   // a fallback conf, so that we can check its parent.
+   sqlConfEntries.get(key) match {
+     case e: FallbackConfigEntry[_] => getConfString(e.fallback.key, defaultValue)
+     case _ => defaultValue
+   }
+ }
}

/**
@@ -1440,7 +1447,8 @@
*/
def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
- (entry.key, getConfString(entry.key, entry.defaultValueString), entry.doc)
+ val displayValue = Option(getConfString(entry.key, null)).getOrElse(entry.defaultValueString)
+ (entry.key, displayValue, entry.doc)
}.toSeq
}

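
Taken together, the two SQLConf hunks above make an unset key resolve through its registered fallback entry before the supplied default is used. A toy model of that lookup order (illustrative only; Entry and the standalone getConfString below are made-up names, not Spark APIs):

final case class Entry(key: String, fallback: Option[Entry] = None)

// Mirrors the resolution order in SQLConf.getConfString: explicit setting first,
// then the fallback entry's parent key (recursively), then the caller's default.
def getConfString(settings: Map[String, String], registered: Map[String, Entry])
                 (key: String, default: String): String =
  settings.getOrElse(key, {
    registered.get(key).flatMap(_.fallback) match {
      case Some(parent) => getConfString(settings, registered)(parent.key, default)
      case None => default
    }
  })

val parent = Entry("spark.sql.parent.key")
val child = Entry("spark.sql.child.key", fallback = Some(parent))
val registered = Map(parent.key -> parent, child.key -> child)

getConfString(Map(parent.key -> "42"), registered)(child.key, "unset") // "42", via the parent
getConfString(Map.empty, registered)(child.key, "unset")               // "unset"
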
@@ -239,7 +239,7 @@ private void decodeDictionaryIds(
int rowId,
int num,
WritableColumnVector column,
- ColumnVector dictionaryIds) {
+ WritableColumnVector dictionaryIds) {
switch (descriptor.getType()) {
case INT32:
if (column.dataType() == DataTypes.IntegerType ||
@@ -54,11 +54,6 @@ public int numNulls() {
return accessor.getNullCount();
}

- @Override
- public boolean anyNullsSet() {
-   return numNulls() > 0;
- }

@Override
public void close() {
if (childColumns != null) {
@@ -159,11 +154,6 @@ public int[] getInts(int rowId, int count) {
return array;
}

- @Override
- public int getDictId(int rowId) {
-   throw new UnsupportedOperationException();
- }

//
// APIs dealing with Longs
//