[SPARK-19726][SQL] Failed to insert null timestamp value to MySQL using Spark JDBC #18445

Closed · wants to merge 11 commits
JDBCRDD.scala:

@@ -61,7 +61,7 @@ object JDBCRDD extends Logging {
     try {
       val rs = statement.executeQuery()
       try {
-        JdbcUtils.getSchema(rs, dialect)
+        JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
       } finally {
         rs.close()
       }
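
With this call site passing alwaysNullable = true, every column of a schema resolved over JDBC is reported as nullable, even when the remote table declares NOT NULL. A minimal sketch of the observable effect; the URL, table name, and credentials are hypothetical:

// After this patch, a JDBC-resolved schema never claims non-nullability,
// regardless of the remote NOT NULL constraints.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")  // hypothetical URL
  .option("dbtable", "people")                        // hypothetical table
  .option("user", "root")
  .option("password", "secret")
  .load()

assert(df.schema.fields.forall(_.nullable))  // holds after this change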
JdbcUtils.scala:

@@ -266,10 +266,14 @@ object JdbcUtils extends Logging {
   /**
    * Takes a [[ResultSet]] and returns its Catalyst schema.
    *
+   * @param alwaysNullable If true, all the columns are nullable.
    * @return A [[StructType]] giving the Catalyst schema.
    * @throws SQLException if the schema contains an unsupported type.
    */
-  def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = {
+  def getSchema(
+      resultSet: ResultSet,
+      dialect: JdbcDialect,
+      alwaysNullable: Boolean = false): StructType = {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)

@@ -290,7 +294,11 @@
             rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
         }
       }
-      val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      val nullable = if (alwaysNullable) {
+        true
+      } else {
+        rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      }
       val metadata = new MetadataBuilder()
         .putString("name", columnName)
         .putLong("scale", fieldScale)
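
For illustration, a hedged sketch of the new getSchema signature in isolation. The in-memory H2 URL and table are assumptions, and JdbcUtils is Spark-internal, so this compiles inside Spark's own sql module (the import mirrors the one in the test suite below):

// With alwaysNullable = true, getSchema skips the
// ResultSetMetaData.isNullable check and marks every field nullable.
import java.sql.DriverManager

import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.jdbc.JdbcDialects

val conn = DriverManager.getConnection("jdbc:h2:mem:testdb")  // hypothetical URL
try {
  conn.createStatement().executeUpdate(
    "CREATE TABLE PEOPLE (NAME VARCHAR(32) NOT NULL, THEID INT NOT NULL)")
  val rs = conn.createStatement().executeQuery("SELECT * FROM PEOPLE WHERE 1 = 0")
  val dialect = JdbcDialects.get("jdbc:h2:mem:testdb")
  val schema = JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
  assert(schema.forall(_.nullable))  // NOT NULL columns are still reported nullable
} finally {
  conn.close()
}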
JDBCWriteSuite.scala:

@@ -24,6 +24,7 @@ import scala.collection.JavaConverters.propertiesAsScalaMapConverter

 import org.scalatest.BeforeAndAfter

+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}

@@ -506,4 +507,11 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       "schema struct<name:string,id:int>"))
     }
   }
+
+  test("SPARK-19726: INSERT null to a NOT NULL column") {
+    val e = intercept[SparkException] {
+      sql("INSERT INTO PEOPLE1 values (null, null)")
+    }.getMessage
+    assert(e.contains("NULL not allowed for column \"NAME\""))
+  }
 }
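
The new test confirms that the null is now handed to the database: the insert into PEOPLE1 reaches the underlying table, and its NOT NULL violation surfaces as a SparkException wrapping the database's own error message. A hedged sketch of the scenario from the PR title, with hypothetical MySQL connection details and table names:

// Appending rows that contain a null timestamp; with JDBC-resolved schemas
// now always nullable, the null passes through to the JDBC driver.
import java.sql.Timestamp

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType, TimestampType}

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("ts", TimestampType, nullable = true)))
val rows = Seq(Row(1, new Timestamp(0L)), Row(2, null))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

val props = new java.util.Properties()
props.setProperty("user", "root")
props.setProperty("password", "secret")
df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/test", "events", props)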