diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index a0adeed994456..d7979f095da76 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -214,6 +214,7 @@ def __repr__(self): def _create_batch(series): + from pyspark.sql.types import _check_series_convert_timestamps_internal import pyarrow as pa # Make input conform to [(series1, type1), (series2, type2), ...] if not isinstance(series, (list, tuple)) or \ @@ -224,12 +225,25 @@ def _create_batch(series): # If a nullable integer series has been promoted to floating point with NaNs, need to cast # NOTE: this is not necessary with Arrow >= 0.7 def cast_series(s, t): - if t is None or s.dtype == t.to_pandas_dtype(): + if type(t) == pa.TimestampType: + # NOTE: convert to 'us' with astype here, unit ignored in `from_pandas` see ARROW-1680 + return _check_series_convert_timestamps_internal(s.fillna(0))\ + .values.astype('datetime64[us]', copy=False) + elif t == pa.date32(): + # TODO: this converts the series to Python objects, possibly avoid with Arrow >= 0.8 + return s.dt.date + elif t is None or s.dtype == t.to_pandas_dtype(): return s else: return s.fillna(0).astype(t.to_pandas_dtype(), copy=False) - arrs = [pa.Array.from_pandas(cast_series(s, t), mask=s.isnull(), type=t) for s, t in series] + # Some object types don't support masks in Arrow, see ARROW-1721 + def create_array(s, t): + casted = cast_series(s, t) + mask = None if casted.dtype == 'object' else s.isnull() + return pa.Array.from_pandas(casted, mask=mask, type=t) + + arrs = [create_array(s, t) for s, t in series] return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in xrange(len(arrs))]) @@ -260,11 +274,13 @@ def load_stream(self, stream): """ Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series. """ + from pyspark.sql.types import _check_dataframe_localize_timestamps import pyarrow as pa reader = pa.open_stream(stream) for batch in reader: - table = pa.Table.from_batches([batch]) - yield [c.to_pandas() for c in table.itercolumns()] + # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1 + pdf = _check_dataframe_localize_timestamps(batch.to_pandas()) + yield [c for _, c in pdf.iteritems()] def __repr__(self): return "ArrowStreamPandasSerializer" diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 38b01f0011671..5ad53cff3cf64 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1880,11 +1880,13 @@ def toPandas(self): import pandas as pd if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true": try: + from pyspark.sql.types import _check_dataframe_localize_timestamps import pyarrow tables = self._collectAsArrow() if tables: table = pyarrow.concat_tables(tables) - return table.to_pandas() + pdf = table.to_pandas() + return _check_dataframe_localize_timestamps(pdf) else: return pd.DataFrame.from_records([], columns=self.columns) except ImportError as e: @@ -1952,6 +1954,7 @@ def _to_corrected_pandas_type(dt): """ When converting Spark SQL records to Pandas DataFrame, the inferred data type may be wrong. This method gets the corrected data type for Pandas if that type may be inferred uncorrectly. 
+ NOTE: DateType is inferred incorrectly as 'object', TimestampType is correct with datetime64[ns] """ import numpy as np if type(dt) == ByteType: @@ -1962,6 +1965,8 @@ def _to_corrected_pandas_type(dt): return np.int32 elif type(dt) == FloatType: return np.float32 + elif type(dt) == DateType: + return 'datetime64[ns]' else: return None diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 685eebcafefba..98afae662b42d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3086,18 +3086,38 @@ class ArrowTests(ReusedPySparkTestCase): @classmethod def setUpClass(cls): + from datetime import datetime ReusedPySparkTestCase.setUpClass() + + # Synchronize default timezone between Python and Java + cls.tz_prev = os.environ.get("TZ", None) # save current tz if set + tz = "America/Los_Angeles" + os.environ["TZ"] = tz + time.tzset() + cls.spark = SparkSession(cls.sc) + cls.spark.conf.set("spark.sql.session.timeZone", tz) cls.spark.conf.set("spark.sql.execution.arrow.enabled", "true") cls.schema = StructType([ StructField("1_str_t", StringType(), True), StructField("2_int_t", IntegerType(), True), StructField("3_long_t", LongType(), True), StructField("4_float_t", FloatType(), True), - StructField("5_double_t", DoubleType(), True)]) - cls.data = [("a", 1, 10, 0.2, 2.0), - ("b", 2, 20, 0.4, 4.0), - ("c", 3, 30, 0.8, 6.0)] + StructField("5_double_t", DoubleType(), True), + StructField("6_date_t", DateType(), True), + StructField("7_timestamp_t", TimestampType(), True)]) + cls.data = [("a", 1, 10, 0.2, 2.0, datetime(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)), + ("b", 2, 20, 0.4, 4.0, datetime(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2)), + ("c", 3, 30, 0.8, 6.0, datetime(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3))] + + @classmethod + def tearDownClass(cls): + del os.environ["TZ"] + if cls.tz_prev is not None: + os.environ["TZ"] = cls.tz_prev + time.tzset() + ReusedPySparkTestCase.tearDownClass() + cls.spark.stop() def assertFramesEqual(self, df_with_arrow, df_without): msg = ("DataFrame from Arrow is not equal" + @@ -3106,8 +3126,8 @@ def assertFramesEqual(self, df_with_arrow, df_without): self.assertTrue(df_without.equals(df_with_arrow), msg=msg) def test_unsupported_datatype(self): - schema = StructType([StructField("dt", DateType(), True)]) - df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], schema=schema) + schema = StructType([StructField("decimal", DecimalType(), True)]) + df = self.spark.createDataFrame([(None,)], schema=schema) with QuietTest(self.sc): self.assertRaises(Exception, lambda: df.toPandas()) @@ -3385,13 +3405,77 @@ def test_vectorized_udf_varargs(self): def test_vectorized_udf_unsupported_types(self): from pyspark.sql.functions import pandas_udf, col - schema = StructType([StructField("dt", DateType(), True)]) - df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], schema=schema) - f = pandas_udf(lambda x: x, DateType()) + schema = StructType([StructField("dt", DecimalType(), True)]) + df = self.spark.createDataFrame([(None,)], schema=schema) + f = pandas_udf(lambda x: x, DecimalType()) with QuietTest(self.sc): with self.assertRaisesRegexp(Exception, 'Unsupported data type'): df.select(f(col('dt'))).collect() + def test_vectorized_udf_null_date(self): + from pyspark.sql.functions import pandas_udf, col + from datetime import date + schema = StructType().add("date", DateType()) + data = [(date(1969, 1, 1),), + (date(2012, 2, 2),), + (None,), + (date(2100, 4, 4),)] + df = self.spark.createDataFrame(data, 
schema=schema) + date_f = pandas_udf(lambda t: t, returnType=DateType()) + res = df.select(date_f(col("date"))) + self.assertEquals(df.collect(), res.collect()) + + def test_vectorized_udf_timestamps(self): + from pyspark.sql.functions import pandas_udf, col + from datetime import datetime + schema = StructType([ + StructField("idx", LongType(), True), + StructField("timestamp", TimestampType(), True)]) + data = [(0, datetime(1969, 1, 1, 1, 1, 1)), + (1, datetime(2012, 2, 2, 2, 2, 2)), + (2, None), + (3, datetime(2100, 4, 4, 4, 4, 4))] + df = self.spark.createDataFrame(data, schema=schema) + + # Check that a timestamp passed through a pandas_udf will not be altered by timezone calc + f_timestamp_copy = pandas_udf(lambda t: t, returnType=TimestampType()) + df = df.withColumn("timestamp_copy", f_timestamp_copy(col("timestamp"))) + + @pandas_udf(returnType=BooleanType()) + def check_data(idx, timestamp, timestamp_copy): + is_equal = timestamp.isnull() # use this array to check values are equal + for i in range(len(idx)): + # Check that timestamps are as expected in the UDF + is_equal[i] = (is_equal[i] and data[idx[i]][1] is None) or \ + timestamp[i].to_pydatetime() == data[idx[i]][1] + return is_equal + + result = df.withColumn("is_equal", check_data(col("idx"), col("timestamp"), + col("timestamp_copy"))).collect() + # Check that collection values are correct + self.assertEquals(len(data), len(result)) + for i in range(len(result)): + self.assertEquals(data[i][1], result[i][1]) # "timestamp" col + self.assertTrue(result[i][3]) # "is_equal" data in udf was as expected + + def test_vectorized_udf_return_timestamp_tz(self): + from pyspark.sql.functions import pandas_udf, col + import pandas as pd + df = self.spark.range(10) + + @pandas_udf(returnType=TimestampType()) + def gen_timestamps(id): + ts = [pd.Timestamp(i, unit='D', tz='America/Los_Angeles') for i in id] + return pd.Series(ts) + + result = df.withColumn("ts", gen_timestamps(col("id"))).collect() + spark_ts_t = TimestampType() + for r in result: + i, ts = r + ts_tz = pd.Timestamp(i, unit='D', tz='America/Los_Angeles').to_pydatetime() + expected = spark_ts_t.fromInternal(spark_ts_t.toInternal(ts_tz)) + self.assertEquals(expected, ts) + @unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") class GroupbyApplyTests(ReusedPySparkTestCase): @@ -3550,8 +3634,8 @@ def test_wrong_args(self): def test_unsupported_types(self): from pyspark.sql.functions import pandas_udf, col schema = StructType( - [StructField("id", LongType(), True), StructField("dt", DateType(), True)]) - df = self.spark.createDataFrame([(1, datetime.date(1970, 1, 1),)], schema=schema) + [StructField("id", LongType(), True), StructField("dt", DecimalType(), True)]) + df = self.spark.createDataFrame([(1, None,)], schema=schema) f = pandas_udf(lambda x: x, df.schema) with QuietTest(self.sc): with self.assertRaisesRegexp(Exception, 'Unsupported data type'): diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index f65273d5f0b6c..7dd8fa04160e0 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1619,11 +1619,47 @@ def to_arrow_type(dt): arrow_type = pa.decimal(dt.precision, dt.scale) elif type(dt) == StringType: arrow_type = pa.string() + elif type(dt) == DateType: + arrow_type = pa.date32() + elif type(dt) == TimestampType: + # Timestamps should be in UTC, JVM Arrow timestamps require a timezone to be read + arrow_type = pa.timestamp('us', tz='UTC') else: raise TypeError("Unsupported type in conversion 
to Arrow: " + str(dt)) return arrow_type +def _check_dataframe_localize_timestamps(pdf): + """ + Convert timezone aware timestamps to timezone-naive in local time + + :param pdf: pandas.DataFrame + :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive + """ + from pandas.api.types import is_datetime64tz_dtype + for column, series in pdf.iteritems(): + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64tz_dtype(series.dtype): + pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None) + return pdf + + +def _check_series_convert_timestamps_internal(s): + """ + Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage + :param s: a pandas.Series + :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone + """ + from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64_dtype(s.dtype): + return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC') + elif is_datetime64tz_dtype(s.dtype): + return s.dt.tz_convert('UTC') + else: + return s + + def _test(): import doctest from pyspark.context import SparkContext diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java index 1f171049820b2..51ea719f8c4a6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java @@ -320,6 +320,10 @@ public ArrowColumnVector(ValueVector vector) { accessor = new StringAccessor((NullableVarCharVector) vector); } else if (vector instanceof NullableVarBinaryVector) { accessor = new BinaryAccessor((NullableVarBinaryVector) vector); + } else if (vector instanceof NullableDateDayVector) { + accessor = new DateAccessor((NullableDateDayVector) vector); + } else if (vector instanceof NullableTimeStampMicroTZVector) { + accessor = new TimestampAccessor((NullableTimeStampMicroTZVector) vector); } else if (vector instanceof ListVector) { ListVector listVector = (ListVector) vector; accessor = new ArrayAccessor(listVector); @@ -575,6 +579,36 @@ final byte[] getBinary(int rowId) { } } + private static class DateAccessor extends ArrowVectorAccessor { + + private final NullableDateDayVector.Accessor accessor; + + DateAccessor(NullableDateDayVector vector) { + super(vector); + this.accessor = vector.getAccessor(); + } + + @Override + final int getInt(int rowId) { + return accessor.get(rowId); + } + } + + private static class TimestampAccessor extends ArrowVectorAccessor { + + private final NullableTimeStampMicroTZVector.Accessor accessor; + + TimestampAccessor(NullableTimeStampMicroTZVector vector) { + super(vector); + this.accessor = vector.getAccessor(); + } + + @Override + final long getLong(int rowId) { + return accessor.get(rowId); + } + } + private static class ArrayAccessor extends ArrowVectorAccessor { private final UInt4Vector.Accessor accessor; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index b70dfc05330f8..12f09ba3d8c52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3143,9 +3143,11 @@ class Dataset[T] private[sql]( private[sql] def 
toArrowPayload: RDD[ArrowPayload] = { val schemaCaptured = this.schema val maxRecordsPerBatch = sparkSession.sessionState.conf.arrowMaxRecordsPerBatch + val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone queryExecution.toRdd.mapPartitionsInternal { iter => val context = TaskContext.get() - ArrowConverters.toPayloadIterator(iter, schemaCaptured, maxRecordsPerBatch, context) + ArrowConverters.toPayloadIterator( + iter, schemaCaptured, maxRecordsPerBatch, timeZoneId, context) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index 561a067a2f81f..05ea1517fcac9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -74,9 +74,10 @@ private[sql] object ArrowConverters { rowIter: Iterator[InternalRow], schema: StructType, maxRecordsPerBatch: Int, + timeZoneId: String, context: TaskContext): Iterator[ArrowPayload] = { - val arrowSchema = ArrowUtils.toArrowSchema(schema) + val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) val allocator = ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala index 2caf1ef02909a..6ad11bda84bf6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.arrow import scala.collection.JavaConverters._ import org.apache.arrow.memory.RootAllocator -import org.apache.arrow.vector.types.FloatingPointPrecision +import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, TimeUnit} import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} import org.apache.spark.sql.types._ @@ -31,7 +31,8 @@ object ArrowUtils { // todo: support more types. - def toArrowType(dt: DataType): ArrowType = dt match { + /** Maps data type from Spark to Arrow. 
NOTE: timeZoneId required for TimestampTypes */ + def toArrowType(dt: DataType, timeZoneId: String): ArrowType = dt match { case BooleanType => ArrowType.Bool.INSTANCE case ByteType => new ArrowType.Int(8, true) case ShortType => new ArrowType.Int(8 * 2, true) @@ -42,6 +43,13 @@ object ArrowUtils { case StringType => ArrowType.Utf8.INSTANCE case BinaryType => ArrowType.Binary.INSTANCE case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale) + case DateType => new ArrowType.Date(DateUnit.DAY) + case TimestampType => + if (timeZoneId == null) { + throw new UnsupportedOperationException("TimestampType must supply timeZoneId parameter") + } else { + new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId) + } case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}") } @@ -58,22 +66,27 @@ object ArrowUtils { case ArrowType.Utf8.INSTANCE => StringType case ArrowType.Binary.INSTANCE => BinaryType case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale) + case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType + case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND => TimestampType case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt") } - def toArrowField(name: String, dt: DataType, nullable: Boolean): Field = { + /** Maps field from Spark to Arrow. NOTE: timeZoneId required for TimestampType */ + def toArrowField( + name: String, dt: DataType, nullable: Boolean, timeZoneId: String): Field = { dt match { case ArrayType(elementType, containsNull) => val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null) - new Field(name, fieldType, Seq(toArrowField("element", elementType, containsNull)).asJava) + new Field(name, fieldType, + Seq(toArrowField("element", elementType, containsNull, timeZoneId)).asJava) case StructType(fields) => val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null) new Field(name, fieldType, fields.map { field => - toArrowField(field.name, field.dataType, field.nullable) + toArrowField(field.name, field.dataType, field.nullable, timeZoneId) }.toSeq.asJava) case dataType => - val fieldType = new FieldType(nullable, toArrowType(dataType), null) + val fieldType = new FieldType(nullable, toArrowType(dataType, timeZoneId), null) new Field(name, fieldType, Seq.empty[Field].asJava) } } @@ -94,9 +107,10 @@ object ArrowUtils { } } - def toArrowSchema(schema: StructType): Schema = { + /** Maps schema from Spark to Arrow. 
NOTE: timeZoneId required for TimestampType in StructType */ + def toArrowSchema(schema: StructType, timeZoneId: String): Schema = { new Schema(schema.map { field => - toArrowField(field.name, field.dataType, field.nullable) + toArrowField(field.name, field.dataType, field.nullable, timeZoneId) }.asJava) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala index 0b740735ffe19..e4af4f65da127 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.arrow.vector._ import org.apache.arrow.vector.complex._ -import org.apache.arrow.vector.util.DecimalUtility +import org.apache.arrow.vector.types.pojo.ArrowType import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters @@ -29,8 +29,8 @@ import org.apache.spark.sql.types._ object ArrowWriter { - def create(schema: StructType): ArrowWriter = { - val arrowSchema = ArrowUtils.toArrowSchema(schema) + def create(schema: StructType, timeZoneId: String): ArrowWriter = { + val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator) create(root) } @@ -55,6 +55,8 @@ object ArrowWriter { case (DoubleType, vector: NullableFloat8Vector) => new DoubleWriter(vector) case (StringType, vector: NullableVarCharVector) => new StringWriter(vector) case (BinaryType, vector: NullableVarBinaryVector) => new BinaryWriter(vector) + case (DateType, vector: NullableDateDayVector) => new DateWriter(vector) + case (TimestampType, vector: NullableTimeStampMicroTZVector) => new TimestampWriter(vector) case (ArrayType(_, _), vector: ListVector) => val elementVector = createFieldWriter(vector.getDataVector()) new ArrayWriter(vector, elementVector) @@ -69,9 +71,7 @@ object ArrowWriter { } } -class ArrowWriter( - val root: VectorSchemaRoot, - fields: Array[ArrowFieldWriter]) { +class ArrowWriter(val root: VectorSchemaRoot, fields: Array[ArrowFieldWriter]) { def schema: StructType = StructType(fields.map { f => StructField(f.name, f.dataType, f.nullable) @@ -255,6 +255,33 @@ private[arrow] class BinaryWriter( } } +private[arrow] class DateWriter(val valueVector: NullableDateDayVector) extends ArrowFieldWriter { + + override def valueMutator: NullableDateDayVector#Mutator = valueVector.getMutator() + + override def setNull(): Unit = { + valueMutator.setNull(count) + } + + override def setValue(input: SpecializedGetters, ordinal: Int): Unit = { + valueMutator.setSafe(count, input.getInt(ordinal)) + } +} + +private[arrow] class TimestampWriter( + val valueVector: NullableTimeStampMicroTZVector) extends ArrowFieldWriter { + + override def valueMutator: NullableTimeStampMicroTZVector#Mutator = valueVector.getMutator() + + override def setNull(): Unit = { + valueMutator.setNull(count) + } + + override def setValue(input: SpecializedGetters, ordinal: Int): Unit = { + valueMutator.setSafe(count, input.getLong(ordinal)) + } +} + private[arrow] class ArrayWriter( val valueVector: ListVector, val elementWriter: ArrowFieldWriter) extends ArrowFieldWriter { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index 
81896187ecc46..0db463a5fbd89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -79,7 +79,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi val columnarBatchIter = new ArrowPythonRunner( funcs, bufferSize, reuseWorker, - PythonEvalType.SQL_PANDAS_UDF, argOffsets, schema) + PythonEvalType.SQL_PANDAS_UDF, argOffsets, schema, conf.sessionLocalTimeZone) .compute(batchIter, context.partitionId(), context) new Iterator[InternalRow] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index f6c03c415dc66..94c05b9b5e49f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -43,7 +43,8 @@ class ArrowPythonRunner( reuseWorker: Boolean, evalType: Int, argOffsets: Array[Array[Int]], - schema: StructType) + schema: StructType, + timeZoneId: String) extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch]( funcs, bufferSize, reuseWorker, evalType, argOffsets) { @@ -60,7 +61,7 @@ class ArrowPythonRunner( } protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { - val arrowSchema = ArrowUtils.toArrowSchema(schema) + val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) val allocator = ArrowUtils.rootAllocator.newChildAllocator( s"stdout writer for $pythonExec", 0, Long.MaxValue) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala index 5ed88ada428cb..cc93fda9f81da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala @@ -94,8 +94,8 @@ case class FlatMapGroupsInPandasExec( val columnarBatchIter = new ArrowPythonRunner( chainedFunc, bufferSize, reuseWorker, - PythonEvalType.SQL_PANDAS_GROUPED_UDF, argOffsets, schema) - .compute(grouped, context.partitionId(), context) + PythonEvalType.SQL_PANDAS_GROUPED_UDF, argOffsets, schema, conf.sessionLocalTimeZone) + .compute(grouped, context.partitionId(), context) columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala index 30422b657742c..ba2903babbba8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala @@ -32,6 +32,8 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{BinaryType, IntegerType, StructField, StructType} import org.apache.spark.util.Utils @@ -793,6 +795,103 @@ class 
ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { collectAndValidate(df, json, "binaryData.json") } + test("date type conversion") { + val json = + s""" + |{ + | "schema" : { + | "fields" : [ { + | "name" : "date", + | "type" : { + | "name" : "date", + | "unit" : "DAY" + | }, + | "nullable" : true, + | "children" : [ ], + | "typeLayout" : { + | "vectors" : [ { + | "type" : "VALIDITY", + | "typeBitWidth" : 1 + | }, { + | "type" : "DATA", + | "typeBitWidth" : 32 + | } ] + | } + | } ] + | }, + | "batches" : [ { + | "count" : 4, + | "columns" : [ { + | "name" : "date", + | "count" : 4, + | "VALIDITY" : [ 1, 1, 1, 1 ], + | "DATA" : [ -1, 0, 16533, 382607 ] + | } ] + | } ] + |} + """.stripMargin + + val d1 = DateTimeUtils.toJavaDate(-1) // "1969-12-31" + val d2 = DateTimeUtils.toJavaDate(0) // "1970-01-01" + val d3 = Date.valueOf("2015-04-08") + val d4 = Date.valueOf("3017-07-18") + + val df = Seq(d1, d2, d3, d4).toDF("date") + + collectAndValidate(df, json, "dateData.json") + } + + test("timestamp type conversion") { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { + val json = + s""" + |{ + | "schema" : { + | "fields" : [ { + | "name" : "timestamp", + | "type" : { + | "name" : "timestamp", + | "unit" : "MICROSECOND", + | "timezone" : "America/Los_Angeles" + | }, + | "nullable" : true, + | "children" : [ ], + | "typeLayout" : { + | "vectors" : [ { + | "type" : "VALIDITY", + | "typeBitWidth" : 1 + | }, { + | "type" : "DATA", + | "typeBitWidth" : 64 + | } ] + | } + | } ] + | }, + | "batches" : [ { + | "count" : 4, + | "columns" : [ { + | "name" : "timestamp", + | "count" : 4, + | "VALIDITY" : [ 1, 1, 1, 1 ], + | "DATA" : [ -1234, 0, 1365383415567000, 33057298500000000 ] + | } ] + | } ] + |} + """.stripMargin + + val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z", Locale.US) + val ts1 = DateTimeUtils.toJavaTimestamp(-1234L) + val ts2 = DateTimeUtils.toJavaTimestamp(0L) + val ts3 = new Timestamp(sdf.parse("2013-04-08 01:10:15.567 UTC").getTime) + val ts4 = new Timestamp(sdf.parse("3017-07-18 14:55:00.000 UTC").getTime) + val data = Seq(ts1, ts2, ts3, ts4) + + val df = data.toDF("timestamp") + + collectAndValidate(df, json, "timestampData.json", "America/Los_Angeles") + } + } + test("floating-point NaN") { val json = s""" @@ -1486,15 +1585,6 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { runUnsupported { decimalData.toArrowPayload.collect() } runUnsupported { mapData.toDF().toArrowPayload.collect() } runUnsupported { complexData.toArrowPayload.collect() } - - val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z", Locale.US) - val d1 = new Date(sdf.parse("2015-04-08 13:10:15.000 UTC").getTime) - val d2 = new Date(sdf.parse("2016-05-09 13:10:15.000 UTC").getTime) - runUnsupported { Seq(d1, d2).toDF("date").toArrowPayload.collect() } - - val ts1 = new Timestamp(sdf.parse("2013-04-08 01:10:15.567 UTC").getTime) - val ts2 = new Timestamp(sdf.parse("2013-04-08 13:10:10.789 UTC").getTime) - runUnsupported { Seq(ts1, ts2).toDF("timestamp").toArrowPayload.collect() } } test("test Arrow Validator") { @@ -1638,7 +1728,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { val schema = StructType(Seq(StructField("int", IntegerType, nullable = true))) val ctx = TaskContext.empty() - val payloadIter = ArrowConverters.toPayloadIterator(inputRows.toIterator, schema, 0, ctx) + val payloadIter = ArrowConverters.toPayloadIterator(inputRows.toIterator, schema, 0, null, ctx) val outputRowIter = 
ArrowConverters.fromPayloadIterator(payloadIter, ctx) assert(schema.equals(outputRowIter.schema)) @@ -1657,22 +1747,24 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { } /** Test that a converted DataFrame to Arrow record batch equals batch read from JSON file */ - private def collectAndValidate(df: DataFrame, json: String, file: String): Unit = { + private def collectAndValidate( + df: DataFrame, json: String, file: String, timeZoneId: String = null): Unit = { // NOTE: coalesce to single partition because can only load 1 batch in validator val arrowPayload = df.coalesce(1).toArrowPayload.collect().head val tempFile = new File(tempDataPath, file) Files.write(json, tempFile, StandardCharsets.UTF_8) - validateConversion(df.schema, arrowPayload, tempFile) + validateConversion(df.schema, arrowPayload, tempFile, timeZoneId) } private def validateConversion( sparkSchema: StructType, arrowPayload: ArrowPayload, - jsonFile: File): Unit = { + jsonFile: File, + timeZoneId: String = null): Unit = { val allocator = new RootAllocator(Long.MaxValue) val jsonReader = new JsonFileReader(jsonFile, allocator) - val arrowSchema = ArrowUtils.toArrowSchema(sparkSchema) + val arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, timeZoneId) val jsonSchema = jsonReader.start() Validator.compareSchemas(arrowSchema, jsonSchema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala index 638619fd39d06..d801f62b62323 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowUtilsSuite.scala @@ -17,7 +17,10 @@ package org.apache.spark.sql.execution.arrow +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} + import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ class ArrowUtilsSuite extends SparkFunSuite { @@ -25,7 +28,7 @@ class ArrowUtilsSuite extends SparkFunSuite { def roundtrip(dt: DataType): Unit = { dt match { case schema: StructType => - assert(ArrowUtils.fromArrowSchema(ArrowUtils.toArrowSchema(schema)) === schema) + assert(ArrowUtils.fromArrowSchema(ArrowUtils.toArrowSchema(schema, null)) === schema) case _ => roundtrip(new StructType().add("value", dt)) } @@ -42,6 +45,27 @@ class ArrowUtilsSuite extends SparkFunSuite { roundtrip(StringType) roundtrip(BinaryType) roundtrip(DecimalType.SYSTEM_DEFAULT) + roundtrip(DateType) + val tsExMsg = intercept[UnsupportedOperationException] { + roundtrip(TimestampType) + } + assert(tsExMsg.getMessage.contains("timeZoneId")) + } + + test("timestamp") { + + def roundtripWithTz(timeZoneId: String): Unit = { + val schema = new StructType().add("value", TimestampType) + val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) + val fieldType = arrowSchema.findField("value").getType.asInstanceOf[ArrowType.Timestamp] + assert(fieldType.getTimezone() === timeZoneId) + assert(ArrowUtils.fromArrowSchema(arrowSchema) === schema) + } + + roundtripWithTz(DateTimeUtils.defaultTimeZone().getID) + roundtripWithTz("Asia/Tokyo") + roundtripWithTz("UTC") + roundtripWithTz("America/Los_Angeles") } test("array") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala index 
e9a629315f5f4..a71e30aa3ca96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala @@ -27,9 +27,9 @@ import org.apache.spark.unsafe.types.UTF8String class ArrowWriterSuite extends SparkFunSuite { test("simple") { - def check(dt: DataType, data: Seq[Any]): Unit = { + def check(dt: DataType, data: Seq[Any], timeZoneId: String = null): Unit = { val schema = new StructType().add("value", dt, nullable = true) - val writer = ArrowWriter.create(schema) + val writer = ArrowWriter.create(schema, timeZoneId) assert(writer.schema === schema) data.foreach { datum => @@ -51,6 +51,8 @@ class ArrowWriterSuite extends SparkFunSuite { case DoubleType => reader.getDouble(rowId) case StringType => reader.getUTF8String(rowId) case BinaryType => reader.getBinary(rowId) + case DateType => reader.getInt(rowId) + case TimestampType => reader.getLong(rowId) } assert(value === datum) } @@ -66,12 +68,14 @@ class ArrowWriterSuite extends SparkFunSuite { check(DoubleType, Seq(1.0d, 2.0d, null, 4.0d)) check(StringType, Seq("a", "b", null, "d").map(UTF8String.fromString)) check(BinaryType, Seq("a".getBytes(), "b".getBytes(), null, "d".getBytes())) + check(DateType, Seq(0, 1, 2, null, 4)) + check(TimestampType, Seq(0L, 3.6e9.toLong, null, 8.64e10.toLong), "America/Los_Angeles") } test("get multiple") { - def check(dt: DataType, data: Seq[Any]): Unit = { + def check(dt: DataType, data: Seq[Any], timeZoneId: String = null): Unit = { val schema = new StructType().add("value", dt, nullable = false) - val writer = ArrowWriter.create(schema) + val writer = ArrowWriter.create(schema, timeZoneId) assert(writer.schema === schema) data.foreach { datum => @@ -88,6 +92,8 @@ class ArrowWriterSuite extends SparkFunSuite { case LongType => reader.getLongs(0, data.size) case FloatType => reader.getFloats(0, data.size) case DoubleType => reader.getDoubles(0, data.size) + case DateType => reader.getInts(0, data.size) + case TimestampType => reader.getLongs(0, data.size) } assert(values === data) @@ -100,12 +106,14 @@ class ArrowWriterSuite extends SparkFunSuite { check(LongType, (0 until 10).map(_.toLong)) check(FloatType, (0 until 10).map(_.toFloat)) check(DoubleType, (0 until 10).map(_.toDouble)) + check(DateType, (0 until 10)) + check(TimestampType, (0 until 10).map(_ * 4.32e10.toLong), "America/Los_Angeles") } test("array") { val schema = new StructType() .add("arr", ArrayType(IntegerType, containsNull = true), nullable = true) - val writer = ArrowWriter.create(schema) + val writer = ArrowWriter.create(schema, null) assert(writer.schema === schema) writer.write(InternalRow(ArrayData.toArrayData(Array(1, 2, 3)))) @@ -144,7 +152,7 @@ class ArrowWriterSuite extends SparkFunSuite { test("nested array") { val schema = new StructType().add("nested", ArrayType(ArrayType(IntegerType))) - val writer = ArrowWriter.create(schema) + val writer = ArrowWriter.create(schema, null) assert(writer.schema === schema) writer.write(InternalRow(ArrayData.toArrayData(Array( @@ -195,7 +203,7 @@ class ArrowWriterSuite extends SparkFunSuite { test("struct") { val schema = new StructType() .add("struct", new StructType().add("i", IntegerType).add("str", StringType)) - val writer = ArrowWriter.create(schema) + val writer = ArrowWriter.create(schema, null) assert(writer.schema === schema) writer.write(InternalRow(InternalRow(1, UTF8String.fromString("str1")))) @@ -231,7 +239,7 @@ class ArrowWriterSuite extends SparkFunSuite { 
test("nested struct") { val schema = new StructType().add("struct", new StructType().add("nested", new StructType().add("i", IntegerType).add("str", StringType))) - val writer = ArrowWriter.create(schema) + val writer = ArrowWriter.create(schema, null) assert(writer.schema === schema) writer.write(InternalRow(InternalRow(InternalRow(1, UTF8String.fromString("str1"))))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala index d24a9e1f4bd16..068a17bf772e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala @@ -29,7 +29,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { test("boolean") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("boolean", 0, Long.MaxValue) - val vector = ArrowUtils.toArrowField("boolean", BooleanType, nullable = true) + val vector = ArrowUtils.toArrowField("boolean", BooleanType, nullable = true, null) .createVector(allocator).asInstanceOf[NullableBitVector] vector.allocateNew() val mutator = vector.getMutator() @@ -58,7 +58,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { test("byte") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("byte", 0, Long.MaxValue) - val vector = ArrowUtils.toArrowField("byte", ByteType, nullable = true) + val vector = ArrowUtils.toArrowField("byte", ByteType, nullable = true, null) .createVector(allocator).asInstanceOf[NullableTinyIntVector] vector.allocateNew() val mutator = vector.getMutator() @@ -87,7 +87,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { test("short") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("short", 0, Long.MaxValue) - val vector = ArrowUtils.toArrowField("short", ShortType, nullable = true) + val vector = ArrowUtils.toArrowField("short", ShortType, nullable = true, null) .createVector(allocator).asInstanceOf[NullableSmallIntVector] vector.allocateNew() val mutator = vector.getMutator() @@ -116,7 +116,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { test("int") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue) - val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true) + val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true, null) .createVector(allocator).asInstanceOf[NullableIntVector] vector.allocateNew() val mutator = vector.getMutator() @@ -145,7 +145,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { test("long") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("long", 0, Long.MaxValue) - val vector = ArrowUtils.toArrowField("long", LongType, nullable = true) + val vector = ArrowUtils.toArrowField("long", LongType, nullable = true, null) .createVector(allocator).asInstanceOf[NullableBigIntVector] vector.allocateNew() val mutator = vector.getMutator() @@ -174,7 +174,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { test("float") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("float", 0, Long.MaxValue) - val vector = ArrowUtils.toArrowField("float", FloatType, nullable = true) + val vector = ArrowUtils.toArrowField("float", FloatType, nullable = true, null) .createVector(allocator).asInstanceOf[NullableFloat4Vector] vector.allocateNew() val mutator = vector.getMutator() @@ -203,7 +203,7 @@ class ArrowColumnVectorSuite extends 
SparkFunSuite { test("double") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("double", 0, Long.MaxValue) - val vector = ArrowUtils.toArrowField("double", DoubleType, nullable = true) + val vector = ArrowUtils.toArrowField("double", DoubleType, nullable = true, null) .createVector(allocator).asInstanceOf[NullableFloat8Vector] vector.allocateNew() val mutator = vector.getMutator() @@ -232,7 +232,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { test("string") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("string", 0, Long.MaxValue) - val vector = ArrowUtils.toArrowField("string", StringType, nullable = true) + val vector = ArrowUtils.toArrowField("string", StringType, nullable = true, null) .createVector(allocator).asInstanceOf[NullableVarCharVector] vector.allocateNew() val mutator = vector.getMutator() @@ -260,7 +260,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { test("binary") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("binary", 0, Long.MaxValue) - val vector = ArrowUtils.toArrowField("binary", BinaryType, nullable = true) + val vector = ArrowUtils.toArrowField("binary", BinaryType, nullable = true, null) .createVector(allocator).asInstanceOf[NullableVarBinaryVector] vector.allocateNew() val mutator = vector.getMutator() @@ -288,7 +288,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { test("array") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("array", 0, Long.MaxValue) - val vector = ArrowUtils.toArrowField("array", ArrayType(IntegerType), nullable = true) + val vector = ArrowUtils.toArrowField("array", ArrayType(IntegerType), nullable = true, null) .createVector(allocator).asInstanceOf[ListVector] vector.allocateNew() val mutator = vector.getMutator() @@ -345,7 +345,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite { test("struct") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue) val schema = new StructType().add("int", IntegerType).add("long", LongType) - val vector = ArrowUtils.toArrowField("struct", schema, nullable = true) + val vector = ArrowUtils.toArrowField("struct", schema, nullable = true, null) .createVector(allocator).asInstanceOf[NullableMapVector] vector.allocateNew() val mutator = vector.getMutator() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 0b179aa97c479..4cfc776e51db1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -1249,11 +1249,11 @@ class ColumnarBatchSuite extends SparkFunSuite { test("create columnar batch from Arrow column vectors") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue) - val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true) + val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true, null) .createVector(allocator).asInstanceOf[NullableIntVector] vector1.allocateNew() val mutator1 = vector1.getMutator() - val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true) + val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true, null) .createVector(allocator).asInstanceOf[NullableIntVector] vector2.allocateNew() val mutator2 = vector2.getMutator()
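For reference, here is a minimal standalone pandas sketch (not part of the patch) of the timestamp round-trip performed by the two helpers this diff adds to python/pyspark/sql/types.py, `_check_series_convert_timestamps_internal` and `_check_dataframe_localize_timestamps`. The time zone is pinned to "America/Los_Angeles" purely so the assertions are deterministic; the helpers themselves use 'tzlocal()' for the local side and 'UTC' for the Arrow/internal side.

import pandas as pd
from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype

# tz-naive timestamps, standing in for a pandas.Series handed to the Arrow path
s = pd.Series(pd.to_datetime(["1969-01-01 01:01:01", "2012-02-02 02:02:02"]))

# Outbound (like _check_series_convert_timestamps_internal): interpret naive
# values as local time, then normalize to UTC before the data reaches Arrow
utc = s.dt.tz_localize("America/Los_Angeles").dt.tz_convert("UTC")
assert is_datetime64tz_dtype(utc.dtype)

# Inbound (like _check_dataframe_localize_timestamps, applied per column):
# convert tz-aware values back to local time and drop the time zone so pandas
# ends up with plain datetime64[ns]
local = utc.dt.tz_convert("America/Los_Angeles").dt.tz_localize(None)
assert is_datetime64_dtype(local.dtype)
assert (local == s).all()

The same convention shows up on the JVM side of the patch: Arrow timestamps are written as MICROSECOND values with the session time zone attached, and localization happens only at the pandas boundary, which is what keeps toPandas() and pandas_udf results consistent with Spark's TimestampType semantics.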