Skip to content

Commit

Permalink
Read DateTime64 from CSV (#6188)
Browse files Browse the repository at this point in the history
  • Loading branch information
iddqdex authored Jul 2, 2024
1 parent 00eab5b commit 732076c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 0 deletions.
3 changes: 3 additions & 0 deletions ydb/core/formats/arrow/arrow_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ arrow::Result<std::shared_ptr<arrow::DataType>> GetCSVArrowType(NScheme::TTypeIn
std::shared_ptr<arrow::DataType> result;
switch (typeId.GetTypeId()) {
case NScheme::NTypeIds::Datetime:
case NScheme::NTypeIds::Datetime64:
return std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND);
case NScheme::NTypeIds::Timestamp:
case NScheme::NTypeIds::Timestamp64:
return std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
case NScheme::NTypeIds::Date:
case NScheme::NTypeIds::Date32:
return std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND);
default:
return GetArrowType(typeId);
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/io_formats/arrow/csv_arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,28 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt
}
}
arrResult = aBuilder.Finish();
} else if (originalType->id() == arrow::Int32Type::type_id) {
arrow::Int32Builder aBuilder;
Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok());
for (long i = 0; i < parsedBatch->num_rows(); ++i) {
if (i64Arr->IsNull(i)) {
Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
} else {
aBuilder.UnsafeAppend(i64Arr->Value(i) / 86400);
}
}
arrResult = aBuilder.Finish();
} else if (originalType->id() == arrow::Int64Type::type_id) {
arrow::Int64Builder aBuilder;
Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok());
for (long i = 0; i < parsedBatch->num_rows(); ++i) {
if (i64Arr->IsNull(i)) {
Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
} else {
aBuilder.UnsafeAppend(i64Arr->Value(i));
}
}
arrResult = aBuilder.Finish();
} else {
Y_ABORT_UNLESS(false);
}
Expand Down
26 changes: 26 additions & 0 deletions ydb/core/kqp/ut/olap/datatime64_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,32 @@ Y_UNIT_TEST_SUITE(KqpDatetime64ColumnShard) {
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE `date` >= Date32('1969-12-29')", "[[-3;[-1000002]];[0;#];[4;[-2000003]];[9;[5000004]]]");
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE `date` != Date32('1970-01-01')", "[[-7;[3000001]];[-3;[-1000002]];[4;[-2000003]];[9;[5000004]]]");
}

Y_UNIT_TEST(Csv) {
    // Round-trip check: bulk-upsert one CSV row into a column table that has
    // Date32 / Timestamp64 / Datetime64 columns, then read the row back.
    TKikimrSettings settings;
    settings.WithSampleTables = false;

    TTestHelper helper(settings);

    // "id" is the only non-nullable column and serves as both the primary
    // key and the sharding key below.
    TVector<TTestHelper::TColumnSchema> columns = {
        TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int64).SetNullable(false),
        TTestHelper::TColumnSchema().SetName("date32").SetType(NScheme::NTypeIds::Date32),
        TTestHelper::TColumnSchema().SetName("timestamp64").SetType(NScheme::NTypeIds::Timestamp64),
        TTestHelper::TColumnSchema().SetName("datetime64").SetType(NScheme::NTypeIds::Datetime64),
    };

    TTestHelper::TColumnTable table;
    table.SetName("/Root/ColumnTableTest")
        .SetPrimaryKey({"id"})
        .SetSharding({"id"})
        .SetSchema(columns);
    helper.CreateTable(table);

    {
        // One CSV record; field order follows the column order declared above.
        TStringBuilder csvPayload;
        csvPayload << "1,2000-01-05,2000-01-15T01:15:12.122Z,2005-01-15T01:15:12Z" << Endl;

        const auto upsertResult = helper.GetKikimr()
                                      .GetTableClient()
                                      .BulkUpsert(table.GetName(), EDataFormat::CSV, csvPayload)
                                      .GetValueSync();
        UNIT_ASSERT_C(upsertResult.IsSuccess() , upsertResult.GetIssues().ToString());
    }

    // Expected values are the textual inputs expressed relative to 1970-01-01:
    // 10961 days, 947898912122000 microseconds and 1105751712 seconds.
    helper.ReadData("SELECT id, date32, timestamp64, datetime64 FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;[10961];[947898912122000];[1105751712]]]");
}

}

} // namespace NKqp
Expand Down

0 comments on commit 732076c

Please sign in to comment.