Skip to content

Commit

Permalink
[feature](schema change) unified schema change for parquet and orc re…
Browse files Browse the repository at this point in the history
…ader (#32873) (#38408)

## Proposed changes
bp #32873 

Scenario: Reading a hive table after adding fields to a struct column
Since there are still problems with reading tables in parquet and text
formats on the master in this scenario, only tables in orc format are
picked here and some cases are added.
  • Loading branch information
hubgeter authored Jul 30, 2024
1 parent 9c7a035 commit ce64963
Show file tree
Hide file tree
Showing 12 changed files with 451 additions and 35 deletions.
9 changes: 1 addition & 8 deletions be/src/vec/data_types/data_type_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,7 @@ std::optional<size_t> DataTypeStruct::try_get_position_by_name(const String& nam
}

String DataTypeStruct::get_name_by_position(size_t i) const {
if (i == 0 || i > names.size()) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Index of tuple element ({}) if out range ([1, {}])", i,
names.size());
LOG(FATAL) << fmt::to_string(error_msg);
}

return names[i - 1];
return names[i];
}

int64_t DataTypeStruct::get_uncompressed_serialized_bytes(const IColumn& column,
Expand Down
94 changes: 68 additions & 26 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1274,16 +1274,15 @@ Status OrcReader::_fill_doris_array_offsets(const std::string& col_name,
}

template <bool is_filter>
Status OrcReader::_orc_column_to_doris_column(const std::string& col_name,
const ColumnPtr& doris_column,
Status OrcReader::_orc_column_to_doris_column(const std::string& col_name, ColumnPtr& doris_column,
const DataTypePtr& data_type,
const orc::Type* orc_column_type,
orc::ColumnVectorBatch* cvb, size_t num_values) {
MutableColumnPtr data_column;
if (doris_column->is_nullable()) {
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(doris_column)).mutate().get());
auto* nullable_column =
reinterpret_cast<ColumnNullable*>(doris_column->assume_mutable().get());
data_column = nullable_column->get_nested_column_ptr();
NullMap& map_data_column = nullable_column->get_null_map_data();
auto origin_size = map_data_column.size();
Expand All @@ -1294,9 +1293,7 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name,
map_data_column[origin_size + i] = !cvb_nulls[i];
}
} else {
for (int i = 0; i < num_values; ++i) {
map_data_column[origin_size + i] = false;
}
memset(map_data_column.data() + origin_size, 0, num_values);
}
} else {
if (cvb->hasNulls) {
Expand All @@ -1306,6 +1303,16 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name,
data_column = doris_column->assume_mutable();
}

return _fill_doris_data_column<is_filter>(col_name, data_column, data_type, orc_column_type,
cvb, num_values);
}

template <bool is_filter>
Status OrcReader::_fill_doris_data_column(const std::string& col_name,
MutableColumnPtr& data_column,
const DataTypePtr& data_type,
const orc::Type* orc_column_type,
orc::ColumnVectorBatch* cvb, size_t num_values) {
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
#define DISPATCH(FlatType, CppType, OrcColumnType) \
Expand Down Expand Up @@ -1346,9 +1353,11 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name,
cvb, num_values);
case TypeIndex::Array: {
if (orc_column_type->getKind() != orc::TypeKind::LIST) {
return Status::InternalError("Wrong data type for colum '{}'", col_name);
return Status::InternalError(
"Wrong data type for column '{}', expected list, actual {}", col_name,
orc_column_type->getKind());
}
auto* orc_list = down_cast<orc::ListVectorBatch*>(cvb);
auto* orc_list = dynamic_cast<orc::ListVectorBatch*>(cvb);
auto& doris_offsets = static_cast<ColumnArray&>(*data_column).get_offsets();
auto& orc_offsets = orc_list->offsets;
size_t element_size = 0;
Expand All @@ -1358,15 +1367,17 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name,
reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get())
->get_nested_type());
const orc::Type* nested_orc_type = orc_column_type->getSubtype(0);
std::string element_name = col_name + ".element";
return _orc_column_to_doris_column<is_filter>(
col_name, static_cast<ColumnArray&>(*data_column).get_data_ptr(), nested_type,
element_name, static_cast<ColumnArray&>(*data_column).get_data_ptr(), nested_type,
nested_orc_type, orc_list->elements.get(), element_size);
}
case TypeIndex::Map: {
if (orc_column_type->getKind() != orc::TypeKind::MAP) {
return Status::InternalError("Wrong data type for colum '{}'", col_name);
return Status::InternalError("Wrong data type for column '{}', expected map, actual {}",
col_name, orc_column_type->getKind());
}
auto* orc_map = down_cast<orc::MapVectorBatch*>(cvb);
auto* orc_map = dynamic_cast<orc::MapVectorBatch*>(cvb);
auto& doris_map = static_cast<ColumnMap&>(*data_column);
size_t element_size = 0;
RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, doris_map.get_offsets(),
Expand All @@ -1379,33 +1390,64 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name,
->get_value_type());
const orc::Type* orc_key_type = orc_column_type->getSubtype(0);
const orc::Type* orc_value_type = orc_column_type->getSubtype(1);
const ColumnPtr& doris_key_column = doris_map.get_keys_ptr();
const ColumnPtr& doris_value_column = doris_map.get_values_ptr();
RETURN_IF_ERROR(_orc_column_to_doris_column<is_filter>(col_name, doris_key_column,
ColumnPtr& doris_key_column = doris_map.get_keys_ptr();
ColumnPtr& doris_value_column = doris_map.get_values_ptr();
std::string key_col_name = col_name + ".key";
std::string value_col_name = col_name + ".value";
RETURN_IF_ERROR(_orc_column_to_doris_column<is_filter>(key_col_name, doris_key_column,
doris_key_type, orc_key_type,
orc_map->keys.get(), element_size));
return _orc_column_to_doris_column<is_filter>(col_name, doris_value_column,
return _orc_column_to_doris_column<is_filter>(value_col_name, doris_value_column,
doris_value_type, orc_value_type,
orc_map->elements.get(), element_size);
}
case TypeIndex::Struct: {
if (orc_column_type->getKind() != orc::TypeKind::STRUCT) {
return Status::InternalError("Wrong data type for colum '{}'", col_name);
return Status::InternalError(
"Wrong data type for column '{}', expected struct, actual {}", col_name,
orc_column_type->getKind());
}
auto* orc_struct = down_cast<orc::StructVectorBatch*>(cvb);
auto* orc_struct = dynamic_cast<orc::StructVectorBatch*>(cvb);
auto& doris_struct = static_cast<ColumnStruct&>(*data_column);
if (orc_struct->fields.size() != doris_struct.tuple_size()) {
return Status::InternalError("Wrong number of struct fields for column '{}'", col_name);
}
std::map<int, int> read_fields;
std::set<int> missing_fields;
const DataTypeStruct* doris_struct_type =
reinterpret_cast<const DataTypeStruct*>(remove_nullable(data_type).get());
for (int i = 0; i < doris_struct.tuple_size(); ++i) {
orc::ColumnVectorBatch* orc_field = orc_struct->fields[i];
const orc::Type* orc_type = orc_column_type->getSubtype(i);
const ColumnPtr& doris_field = doris_struct.get_column_ptr(i);
const DataTypePtr& doris_type = doris_struct_type->get_element(i);
bool is_missing_col = true;
for (int j = 0; j < orc_column_type->getSubtypeCount(); ++j) {
if (boost::iequals(doris_struct_type->get_name_by_position(i),
orc_column_type->getFieldName(j))) {
read_fields[i] = j;
is_missing_col = false;
break;
}
}
if (is_missing_col) {
missing_fields.insert(i);
}
}

for (int missing_field : missing_fields) {
ColumnPtr& doris_field = doris_struct.get_column_ptr(missing_field);
if (!doris_field->is_nullable()) {
return Status::InternalError(
"Child field of '{}' is not nullable, but is missing in orc file",
col_name);
}
reinterpret_cast<ColumnNullable*>(doris_field->assume_mutable().get())
->insert_null_elements(num_values);
}

for (auto read_field : read_fields) {
orc::ColumnVectorBatch* orc_field = orc_struct->fields[read_field.second];
const orc::Type* orc_type = orc_column_type->getSubtype(read_field.second);
std::string field_name =
col_name + "." + orc_column_type->getFieldName(read_field.second);
ColumnPtr& doris_field = doris_struct.get_column_ptr(read_field.first);
const DataTypePtr& doris_type = doris_struct_type->get_element(read_field.first);
RETURN_IF_ERROR(_orc_column_to_doris_column<is_filter>(
col_name, doris_field, doris_type, orc_type, orc_field, num_values));
field_name, doris_field, doris_type, orc_type, orc_field, num_values));
}
return Status::OK();
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,16 @@ class OrcReader : public GenericReader {
void _init_system_properties();
void _init_file_description();
template <bool is_filter = false>
Status _orc_column_to_doris_column(const std::string& col_name, const ColumnPtr& doris_column,
Status _orc_column_to_doris_column(const std::string& col_name, ColumnPtr& doris_column,
const DataTypePtr& data_type,
const orc::Type* orc_column_type,
orc::ColumnVectorBatch* cvb, size_t num_values);

template <bool is_filter = false>
Status _fill_doris_data_column(const std::string& col_name, MutableColumnPtr& data_column,
const DataTypePtr& data_type, const orc::Type* orc_column_type,
orc::ColumnVectorBatch* cvb, size_t num_values);

template <typename CppType, typename OrcColumnType>
Status _decode_flat_column(const std::string& col_name, const MutableColumnPtr& data_column,
orc::ColumnVectorBatch* cvb, size_t num_values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1871,6 +1871,19 @@ LOCATION

msck repair table test_string_dict_filter_orc;

create table test_hive_struct_add_column_orc (
`id` int,
`name` string,
`details` struct<age:int,city:string,email:string,phone:int>,
`sex` int,
`complex` array<struct<a:int,b:struct<aa:string,bb:int>>>
)
STORED AS ORC
LOCATION '/user/doris/preinstalled_data/orc_table/test_hive_struct_add_column_orc';

msck repair table test_hive_struct_add_column_orc;


show tables;


Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit ce64963

Please sign in to comment.