
Commit

Merge branch 'master' into pipe-syntax-projections
sync
dtenedor committed Nov 18, 2024
2 parents 09b04ed + b626528 commit 5b1c047
Showing 279 changed files with 16,271 additions and 11,493 deletions.
27 changes: 12 additions & 15 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -497,6 +497,12 @@
],
"sqlState" : "56000"
},
"CIRCULAR_CLASS_REFERENCE" : {
"message" : [
"Cannot have circular references in class, but got the circular reference of class <t>."
],
"sqlState" : "42602"
},
"CLASS_NOT_OVERRIDE_EXPECTED_METHOD" : {
"message" : [
"<className> must override either <method1> or <method2>."
@@ -3411,6 +3417,12 @@
],
"sqlState" : "42K0L"
},
"LABEL_ALREADY_EXISTS" : {
"message" : [
"The label <label> already exists. Choose another name or rename the existing label."
],
"sqlState" : "42K0L"
},
"LOAD_DATA_PATH_NOT_EXISTS" : {
"message" : [
"LOAD DATA input path does not exist: <path>."
@@ -6394,11 +6406,6 @@
"Partition column `<col>` not found in schema <schemaCatalog>."
]
},
"_LEGACY_ERROR_TEMP_1156" : {
"message" : [
"Column <colName> not found in schema <tableSchema>."
]
},
"_LEGACY_ERROR_TEMP_1158" : {
"message" : [
"Saving data into a view is not allowed."
@@ -7325,16 +7332,6 @@
"Exception when registering StreamingQueryListener."
]
},
"_LEGACY_ERROR_TEMP_2138" : {
"message" : [
"Cannot have circular references in bean class, but got the circular reference of class <clazz>."
]
},
"_LEGACY_ERROR_TEMP_2139" : {
"message" : [
"cannot have circular references in class, but got the circular reference of class <t>."
]
},
"_LEGACY_ERROR_TEMP_2144" : {
"message" : [
"Unable to find constructor for <tpe>. This could happen if <tpe> is an interface, or a trait without companion object constructor."
@@ -692,6 +692,7 @@ private[spark] object LogKeys {
case object RPC_ENDPOINT_REF extends LogKey
case object RPC_MESSAGE_CAPACITY extends LogKey
case object RPC_SSL_ENABLED extends LogKey
case object RULE_EXECUTOR_NAME extends LogKey
case object RULE_NAME extends LogKey
case object RUN_ID extends LogKey
case object SCALA_VERSION extends LogKey
@@ -90,7 +90,7 @@ private[sql] case class AvroDataToCatalyst(
@transient private lazy val nullResultRow: Any = dataType match {
case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
for(i <- 0 until st.length) {
for (i <- 0 until st.length) {
resultRow.setNullAt(i)
}
resultRow
@@ -107,24 +107,46 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
connection.prepareStatement("CREATE TABLE array_timestamptz (col timestamptz[])")
.executeUpdate()

connection.prepareStatement("INSERT INTO array_int VALUES (array[array[10]])").executeUpdate()
connection.prepareStatement("INSERT INTO array_bigint VALUES (array[array[10]])")
connection.prepareStatement("INSERT INTO array_int VALUES (array[10]), (array[array[10]])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_smallint VALUES (array[array[10]])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_boolean VALUES (array[array[true]])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_float VALUES (array[array[10.5]])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_double VALUES (array[array[10.1]])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_timestamp VALUES (" +
"array[array['2022-01-01 09:15'::timestamp]])").executeUpdate()
connection.prepareStatement("INSERT INTO array_bigint VALUES (array[10]), " +
"(array[array[10]])").executeUpdate()
connection.prepareStatement("INSERT INTO array_smallint VALUES (array[10]), " +
"(array[array[10]])").executeUpdate()
connection.prepareStatement("INSERT INTO array_boolean VALUES (array[true]), " +
"(array[array[true]])").executeUpdate()
connection.prepareStatement("INSERT INTO array_float VALUES (array[10.5]), " +
"(array[array[10.5]])").executeUpdate()
connection.prepareStatement("INSERT INTO array_double VALUES (array[10.1]), " +
"(array[array[10.1]])").executeUpdate()
connection.prepareStatement("INSERT INTO array_timestamp VALUES " +
"(array['2022-01-01 09:15'::timestamp]), " +
"(array[array['2022-01-01 09:15'::timestamp]])").executeUpdate()
connection.prepareStatement("INSERT INTO array_timestamptz VALUES " +
"(array['2022-01-01 09:15'::timestamptz]), " +
"(array[array['2022-01-01 09:15'::timestamptz]])").executeUpdate()
connection.prepareStatement(
"CREATE TABLE datetime (name VARCHAR(32), date1 DATE, time1 TIMESTAMP)")
.executeUpdate()

connection.prepareStatement("CREATE TABLE array_of_int (col int[])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_of_int " +
"VALUES (array[1])").executeUpdate()
connection.prepareStatement("CREATE TABLE ctas_array_of_int " +
"AS SELECT * FROM array_of_int").executeUpdate()

connection.prepareStatement("CREATE TABLE array_of_array_of_int (col int[][])")
.executeUpdate()
connection.prepareStatement("INSERT INTO array_of_array_of_int " +
"VALUES (array[array[1],array[2]])").executeUpdate()
connection.prepareStatement("CREATE TABLE ctas_array_of_array_of_int " +
"AS SELECT * FROM array_of_array_of_int").executeUpdate()

connection.prepareStatement("CREATE TABLE unsupported_array_of_array_of_int (col int[][])")
.executeUpdate()
connection.prepareStatement("INSERT INTO unsupported_array_of_array_of_int " +
"VALUES (array[array[1],array[2]]), (array[3])").executeUpdate()
}

test("Test multi-dimensional column types") {
@@ -302,4 +324,34 @@
assert(rows10(0).getString(0) === "amy")
assert(rows10(1).getString(0) === "alex")
}

test("Test reading 2d array from table created via CTAS command - positive test") {
val dfNoCTASTable = sql(s"SELECT * FROM $catalogName.array_of_int")
val dfWithCTASTable = sql(s"SELECT * FROM $catalogName.ctas_array_of_int")

checkAnswer(dfWithCTASTable, dfNoCTASTable.collect())
}

test("Test reading 2d array from table created via CTAS command - negative test") {
val dfNoCTASTable = sql(s"SELECT * FROM $catalogName.array_of_int")

checkError(
exception = intercept[org.apache.spark.SparkSQLException] {
// This should fail as only 1D CTAS tables are supported
sql(s"SELECT * FROM $catalogName.ctas_array_of_array_of_int").collect()
},
condition = "COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH",
parameters = Map("pos" -> "0", "type" -> "\"ARRAY<INT>\"")
)
}

test("Test reading multiple dimension array from table created via CTAS command") {
checkError(
exception = intercept[org.apache.spark.SparkSQLException] {
sql(s"SELECT * FROM $catalogName.unsupported_array_of_array_of_int").collect()
},
condition = "COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH",
parameters = Map("pos" -> "0", "type" -> "\"ARRAY<ARRAY<INT>>\"")
)
}
}
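The new array_of_int and ctas_array_of_int tables above are read back through Spark's JDBC source in the tests that follow; a minimal sketch of an equivalent read outside the test harness, assuming a reachable Postgres instance (the URL and credentials below are placeholders, and the suite itself goes through the registered JDBC catalog rather than these options):

import org.apache.spark.sql.SparkSession

object ReadPostgresArraySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("pg-array-read").getOrCreate()

    // Placeholder connection details; the integration suite runs against a Dockerized Postgres.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/postgres")
      .option("dbtable", "array_of_int")
      .option("user", "postgres")
      .option("password", "postgres")
      .load()

    // col maps to array<int>; the 2-D int[][] tables are the ones the negative tests
    // above expect to fail with COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH.
    df.printSchema()
    df.show()
    spark.stop()
  }
}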
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -389,7 +389,7 @@ private[spark] object SerDe {
writeType(dos, "map")
writeInt(dos, v.size)
val iter = v.entrySet.iterator
while(iter.hasNext) {
while (iter.hasNext) {
val entry = iter.next
val key = entry.getKey
val value = entry.getValue
@@ -568,8 +568,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| Spark Connect only:
| --remote CONNECT_URL URL to connect to the server for Spark Connect, e.g.,
| sc://host:port. --master and --deploy-mode cannot be set
| together with this option. This option is experimental, and
| might change between minor releases.
| together with this option.
|
| Cluster deploy mode only:
| --driver-cores NUM Number of cores used by the driver, only in cluster mode
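The usage-text change above removes the experimental caveat from --remote; a minimal sketch of targeting a Spark Connect endpoint from application code, assuming the Spark Connect Scala client is on the classpath (the host and port are placeholders):

import org.apache.spark.sql.SparkSession

object ConnectRemoteSketch {
  def main(args: Array[String]): Unit = {
    // Roughly what spark-submit --remote "sc://localhost:15002" arranges for you;
    // --master and --deploy-mode must not be set together with --remote.
    val spark = SparkSession.builder()
      .remote("sc://localhost:15002")
      .getOrCreate()

    spark.range(5).show()
    spark.stop()
  }
}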
@@ -317,7 +317,7 @@ object HistoryServer extends Logging {
ShutdownHookManager.addShutdownHook { () => server.stop() }

// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
while (true) { Thread.sleep(Int.MaxValue) }
}

/**
@@ -151,7 +151,7 @@ private[spark] class HadoopDelegationTokenManager(
creds.addAll(newTokens)
}
})
if(!currentUser.equals(freshUGI)) {
if (!currentUser.equals(freshUGI)) {
FileSystem.closeAllForUGI(freshUGI)
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -240,7 +240,7 @@ private object PipedRDD {
def tokenize(command: String): Seq[String] = {
val buf = new ArrayBuffer[String]
val tok = new StringTokenizer(command)
while(tok.hasMoreElements) {
while (tok.hasMoreElements) {
buf += tok.nextToken()
}
buf.toSeq
@@ -806,7 +806,7 @@ private[spark] class TaskSetManager(
val info = taskInfos(tid)
// SPARK-37300: when the task was already finished state, just ignore it,
// so that there won't cause successful and tasksSuccessful wrong result.
if(info.finished) {
if (info.finished) {
if (dropTaskInfoAccumulablesOnTaskCompletion) {
// SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable
// lifetime.
@@ -238,7 +238,7 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
f: BlockInfo => Boolean): Option[BlockInfo] = {
var done = false
var result: Option[BlockInfo] = None
while(!done) {
while (!done) {
val wrapper = blockInfoWrappers.get(blockId)
if (wrapper == null) {
done = true
@@ -1516,7 +1516,7 @@ private[spark] class BlockManager(
return true
}

if(master.isRDDBlockVisible(blockId)) {
if (master.isRDDBlockVisible(blockId)) {
// Cache the visibility status if block exists.
blockInfoManager.tryMarkBlockAsVisible(blockId)
true
@@ -1882,7 +1882,7 @@ private[spark] class BlockManager(
blockId,
numPeersToReplicateTo)

while(numFailures <= maxReplicationFailureCount &&
while (numFailures <= maxReplicationFailureCount &&
peersForReplication.nonEmpty &&
peersReplicatedTo.size < numPeersToReplicateTo) {
val peer = peersForReplication.head
@@ -45,7 +45,7 @@ class BitSet(numBits: Int) extends Serializable {
def setUntil(bitIndex: Int): Unit = {
val wordIndex = bitIndex >> 6 // divide by 64
Arrays.fill(words, 0, wordIndex, -1)
if(wordIndex < words.length) {
if (wordIndex < words.length) {
// Set the remaining bits (note that the mask could still be zero)
val mask = ~(-1L << (bitIndex & 0x3f))
words(wordIndex) |= mask
@@ -58,7 +58,7 @@ class BitSet(numBits: Int) extends Serializable {
def clearUntil(bitIndex: Int): Unit = {
val wordIndex = bitIndex >> 6 // divide by 64
Arrays.fill(words, 0, wordIndex, 0)
if(wordIndex < words.length) {
if (wordIndex < words.length) {
// Clear the remaining bits
val mask = -1L << (bitIndex & 0x3f)
words(wordIndex) &= mask
@@ -75,7 +75,7 @@ class BitSet(numBits: Int) extends Serializable {
assert(newBS.numWords >= numWords)
assert(newBS.numWords >= other.numWords)
var ind = 0
while( ind < smaller ) {
while (ind < smaller) {
newBS.words(ind) = words(ind) & other.words(ind)
ind += 1
}
@@ -92,15 +92,15 @@ class BitSet(numBits: Int) extends Serializable {
assert(newBS.numWords >= other.numWords)
val smaller = math.min(numWords, other.numWords)
var ind = 0
while( ind < smaller ) {
while (ind < smaller) {
newBS.words(ind) = words(ind) | other.words(ind)
ind += 1
}
while( ind < numWords ) {
while (ind < numWords) {
newBS.words(ind) = words(ind)
ind += 1
}
while( ind < other.numWords ) {
while (ind < other.numWords) {
newBS.words(ind) = other.words(ind)
ind += 1
}
@@ -242,7 +242,7 @@ class BitSet(numBits: Int) extends Serializable {
def union(other: BitSet): Unit = {
require(this.numWords <= other.numWords)
var ind = 0
while( ind < this.numWords ) {
while (ind < this.numWords) {
this.words(ind) = this.words(ind) | other.words(ind)
ind += 1
}
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -792,7 +792,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
test("randomSplit") {
val n = 600
val data = sc.parallelize(1 to n, 2)
for(seed <- 1 to 5) {
for (seed <- 1 to 5) {
val splits = data.randomSplit(Array(1.0, 2.0, 3.0), seed)
assert(splits.length == 3, "wrong number of splits")
assert(splits.flatMap(_.collect()).sorted.toList == data.collect().toList,
@@ -128,7 +128,7 @@ class TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplication
assert(prioritizedPeers.toSet.size == numReplicas)
val priorityPeers = prioritizedPeers.take(2)
assert(priorityPeers.forall(p => p.host != blockManager.host))
if(numReplicas > 1) {
if (numReplicas > 1) {
// both these conditions should be satisfied when numReplicas > 1
assert(priorityPeers.exists(p => p.topologyInfo == blockManager.topologyInfo))
assert(priorityPeers.exists(p => p.topologyInfo != blockManager.topologyInfo))
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -234,10 +234,10 @@ opencsv/2.3//opencsv-2.3.jar
opentracing-api/0.33.0//opentracing-api-0.33.0.jar
opentracing-noop/0.33.0//opentracing-noop-0.33.0.jar
opentracing-util/0.33.0//opentracing-util-0.33.0.jar
orc-core/2.0.2/shaded-protobuf/orc-core-2.0.2-shaded-protobuf.jar
orc-core/2.0.3/shaded-protobuf/orc-core-2.0.3-shaded-protobuf.jar
orc-format/1.0.0/shaded-protobuf/orc-format-1.0.0-shaded-protobuf.jar
orc-mapreduce/2.0.2/shaded-protobuf/orc-mapreduce-2.0.2-shaded-protobuf.jar
orc-shims/2.0.2//orc-shims-2.0.2.jar
orc-mapreduce/2.0.3/shaded-protobuf/orc-mapreduce-2.0.3-shaded-protobuf.jar
orc-shims/2.0.3//orc-shims-2.0.3.jar
oro/2.0.8//oro-2.0.8.jar
osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
paranamer/2.8//paranamer-2.8.jar
2 changes: 2 additions & 0 deletions docs/_data/menu-sql.yaml
@@ -108,5 +108,7 @@
url: sql-ref-syntax.html#data-retrieval-statements
- text: Auxiliary Statements
url: sql-ref-syntax.html#auxiliary-statements
- text: Pipe Syntax
url: sql-pipe-syntax.html
- text: Error Conditions
url: sql-error-conditions.html
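The new menu entry points at the SQL pipe syntax page added on this branch; a hedged sketch of what a pipe-syntax query could look like when issued through spark.sql, assuming the |> operator form described on that page (the view name and columns are illustrative):

import org.apache.spark.sql.SparkSession

object PipeSyntaxSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("pipe-syntax-sketch").getOrCreate()
    spark.range(10).selectExpr("id", "id % 2 AS parity").createOrReplaceTempView("t")

    // Assumed |> form: each clause transforms the result of the previous one.
    spark.sql(
      "FROM t " +
      "|> WHERE parity = 0 " +
      "|> SELECT id, id * id AS squared"
    ).show()

    spark.stop()
  }
}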
2 changes: 1 addition & 1 deletion docs/sql-data-sources-hive-tables.md
@@ -130,7 +130,7 @@ The following options can be used to configure the version of Hive that is used
<td><code>2.3.10</code></td>
<td>
Version of the Hive metastore. Available
options are <code>2.0.0</code> through <code>2.3.10</code> and <code>3.0.0</code> through <code>3.1.3</code>.
options are <code>2.0.0</code> through <code>2.3.10</code>, <code>3.0.0</code> through <code>3.1.3</code>, and <code>4.0.0</code> through <code>4.0.1</code>.
</td>
<td>1.4.0</td>
</tr>
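The documentation change above widens the supported metastore range to 4.0.x; a minimal sketch of pinning the metastore client version when building a Hive-enabled session (the chosen version and the maven jar resolution below are illustrative settings, not a recommendation from this commit):

import org.apache.spark.sql.SparkSession

object HiveMetastore4xSketch {
  def main(args: Array[String]): Unit = {
    // spark.sql.hive.metastore.version selects the metastore client version;
    // spark.sql.hive.metastore.jars tells Spark how to locate the matching jars.
    val spark = SparkSession.builder()
      .appName("hive-metastore-4x")
      .config("spark.sql.hive.metastore.version", "4.0.1")
      .config("spark.sql.hive.metastore.jars", "maven")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("SHOW DATABASES").show()
    spark.stop()
  }
}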