Skip to content

Commit

Permalink
[Enhancement] (doris-future) Support auto partition name function (#3…
Browse files Browse the repository at this point in the history
…4258)

## Proposed changes
Issue Number: #34130
  • Loading branch information
wyxxxcat authored Aug 5, 2024
1 parent 13cb380 commit d848afb
Show file tree
Hide file tree
Showing 8 changed files with 531 additions and 0 deletions.
1 change: 1 addition & 0 deletions be/src/vec/functions/function_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,7 @@ void register_function_string(SimpleFunctionFactory& factory) {
factory.register_function<FunctionStringLocate>();
factory.register_function<FunctionStringLocatePos>();
factory.register_function<FunctionQuote>();
factory.register_function<FunctionAutoPartitionName>();
factory.register_function<FunctionReverseCommon>();
factory.register_function<FunctionUnHex>();
factory.register_function<FunctionToLower>();
Expand Down
215 changes: 215 additions & 0 deletions be/src/vec/functions/function_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
#include <array>
#include <boost/iterator/iterator_facade.hpp>
#include <cmath>
#include <codecvt>
#include <cstddef>
#include <cstdlib>
#include <iomanip>
#include <limits>
#include <memory>
#include <ostream>
#include <random>
#include <regex>
#include <sstream>
#include <stdexcept>
#include <tuple>
Expand Down Expand Up @@ -70,6 +72,7 @@
#include "vec/core/field.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/functions/function_binary_arithmetic.h"
#include "vec/functions/round.h"
#include "vec/io/io_helper.h"
#include "vec/utils/template_helpers.hpp"
Expand Down Expand Up @@ -387,6 +390,218 @@ class FunctionStrcmp : public IFunction {
}
};

class FunctionAutoPartitionName : public IFunction {
public:
static constexpr auto name = "auto_partition_name";
static FunctionPtr create() { return std::make_shared<FunctionAutoPartitionName>(); }
String get_name() const override { return name; }
size_t get_number_of_arguments() const override { return 0; }
bool is_variadic() const override { return true; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return std::make_shared<DataTypeString>();
}

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) const override {
size_t argument_size = arguments.size();
if (argument_size < 2) {
return Status::InvalidArgument(
"auto_partition_name must contains at least two arguments");
}
std::vector<const ColumnString::Chars*> chars_list(argument_size);
std::vector<const ColumnString::Offsets*> offsets_list(argument_size);
std::vector<bool> is_const_args(argument_size);

for (int i = 0; i < argument_size; ++i) {
const auto& [col, is_const] =
unpack_if_const(block.get_by_position(arguments[i]).column);

const auto* col_str = assert_cast<const ColumnString*>(col.get());
chars_list[i] = &col_str->get_chars();
offsets_list[i] = &col_str->get_offsets();
is_const_args[i] = is_const;
}
auto res = ColumnString::create();
auto& res_data = res->get_chars();
auto& res_offset = res->get_offsets();
res_offset.resize(input_rows_count);

const char* partition_type = chars_list[0]->raw_data();
// partition type is list|range
if (std::strncmp(partition_type, "list", 4) == 0) {
return _auto_partition_type_of_list(chars_list, offsets_list, is_const_args, res_data,
res_offset, input_rows_count, argument_size, block,
result, res);
} else {
return _auto_partition_type_of_range(chars_list, offsets_list, is_const_args, res_data,
res_offset, input_rows_count, argument_size, block,
result, res);
}
return Status::OK();
}

private:
std::u16string _string_to_u16string(const std::string& str) const {
std::wstring_convert<std::codecvt_utf8_utf16<char16_t>, char16_t> convert;
return convert.from_bytes(str);
}

std::string _string_to_unicode(const std::u16string& s) const {
std::string res_s;
res_s.reserve(s.size());
if (s.length() > 0 && s[0] == '-') {
res_s += '_';
}
for (int i = 0; i < s.length(); i++) {
char ch = s[i];
if (std::isalnum(ch)) {
res_s += ch;
} else {
int unicodeValue = _get_code_point_at(s, i);
res_s += fmt::format("{:02x}", static_cast<uint32_t>(unicodeValue));
}
}
return res_s;
}

int _get_code_point_at(const std::u16string& str, std::size_t index) const {
char16_t first = str[index];
// [0xD800,0xDBFF] is the scope of the first code unit
if ((first >= 0xD800 && first <= 0xDBFF) && (index + 1 < str.size())) {
char16_t second = str[index + 1];
// [0xDC00,0xDFFF] is the scope of the second code unit
if (second >= 0xDC00 && second <= 0xDFFF) {
return ((first - 0xD800) << 10) + (second - 0xDC00) + 0x10000;
}
}

return first;
}
Status _auto_partition_type_of_list(std::vector<const ColumnString::Chars*>& chars_list,
std::vector<const ColumnString::Offsets*>& offsets_list,
std::vector<bool>& is_const_args, auto& res_data,
auto& res_offset, size_t input_rows_count,
size_t argument_size, Block& block, size_t result,
auto& res) const {
int curr_len = 0;
for (int row = 0; row < input_rows_count; row++) {
std::string res_p;
res_p.reserve(argument_size * 5);
res_p += 'p';
for (int col = 1; col < argument_size; col++) {
const auto& current_offsets = *offsets_list[col];
const auto& current_chars = *chars_list[col];

auto idx = index_check_const(row, is_const_args[col]);
int size = current_offsets[idx] - current_offsets[idx - 1];
const char* raw_chars =
reinterpret_cast<const char*>(&current_chars[current_offsets[idx - 1]]);

// convert string to u16string in order to convert to unicode strings
const std::string raw_str(raw_chars, size);
auto u16string = _string_to_u16string(raw_str);
res_p += _string_to_unicode(u16string) + std::to_string(u16string.size());
}

// check the name of length
int len = res_p.size();
if (len > 50) [[unlikely]] {
return Status::InvalidArgument(
"The list partition name cannot exceed 50 characters");
}
curr_len += len;
res_data.resize(curr_len);
memcpy(&res_data[res_offset[row - 1]], res_p.c_str(), len);
res_offset[row] = res_offset[row - 1] + len;
}
block.get_by_position(result).column = std::move(res);
return Status::OK();
}

size_t _copy_date_str_of_len_to_res_data(auto& res_data, auto& res_offset,
std::vector<std::string>& date_str, size_t row,
size_t len) const {
size_t curr_len = 1;
for (int j = 0; j < len; j++) {
memcpy(&res_data[res_offset[row - 1]] + curr_len, date_str[j].c_str(),
date_str[j].size());
curr_len += date_str[j].size();
}
return curr_len;
}

Status _auto_partition_type_of_range(std::vector<const ColumnString::Chars*>& chars_list,
std::vector<const ColumnString::Offsets*>& offsets_list,
std::vector<bool>& is_const_args, auto& res_data,
auto& res_offset, size_t input_rows_count,
size_t argument_size, Block& block, size_t result,
auto& res) const {
const char* range_type = chars_list[1]->raw_data();

res_data.resize(15 * input_rows_count);
for (int i = 0; i < input_rows_count; i++) {
const auto& current_offsets = *offsets_list[2];
const auto& current_chars = *chars_list[2];

auto idx = index_check_const(i, is_const_args[2]);
int size = current_offsets[idx] - current_offsets[idx - 1];
const char* tmp =
reinterpret_cast<const char*>(&current_chars[current_offsets[idx - 1]]);
std::string to_split_s(tmp, size);

// check the str if it is date|datetime
RE2 date_regex(R"(^\d{4}-\d{2}-\d{2}( \d{2}:\d{2}:\d{2})?$)");
if (!RE2::FullMatch(to_split_s, date_regex)) {
return Status::InvalidArgument("The range partition only support DATE|DATETIME");
}

// split date_str from (yyyy-mm-dd hh:mm:ss) to ([yyyy, mm, dd, hh, mm, ss])
std::vector<std::string> date_str(6);
date_str[0] = to_split_s.substr(0, 4);
for (int i = 5, j = 1; i <= size; i += 3, j++) {
date_str[j] = to_split_s.substr(i, 2);
}
int curr_len = 0;

res_data[res_offset[i - 1]] = 'p';
// raw => 2022-12-12 11:30:20
// year => 2022 01 01 00 00 00
// month => 2022 12 01 00 00 00
// day => 2022 12 12 00 00 00
// hour => 2022 12 12 11 00 00
// minute => 2022 12 11 30 00
// second => 2022 12 12 12 30 20

if (!strncmp(range_type, "year", 4)) {
curr_len += _copy_date_str_of_len_to_res_data(res_data, res_offset, date_str, i, 1);
memcpy(&res_data[res_offset[i - 1]] + curr_len, "0101", 4);
curr_len += 4;
} else if (!strncmp(range_type, "month", 5)) {
curr_len += _copy_date_str_of_len_to_res_data(res_data, res_offset, date_str, i, 2);
memcpy(&res_data[res_offset[i - 1]] + curr_len, "01", 2);
curr_len += 2;
} else if (!strncmp(range_type, "day", 3)) {
curr_len += _copy_date_str_of_len_to_res_data(res_data, res_offset, date_str, i, 3);
} else if (!strncmp(range_type, "hour", 4)) {
curr_len += _copy_date_str_of_len_to_res_data(res_data, res_offset, date_str, i, 4);
} else if (!strncmp(range_type, "minute", 6)) {
curr_len += _copy_date_str_of_len_to_res_data(res_data, res_offset, date_str, i, 5);
} else if (!strncmp(range_type, "second", 6)) {
curr_len += _copy_date_str_of_len_to_res_data(res_data, res_offset, date_str, i, 6);
}

// fill in zero
int zero = 15 - curr_len;
std::fill_n(&res_data[res_offset[i - 1]] + curr_len, zero, '0');
curr_len += zero;
res_offset[i] = res_offset[i - 1] + curr_len;
}
block.get_by_position(result).column = std::move(res);
return Status::OK();
}
};

template <typename Impl>
class FunctionSubstring : public IFunction {
public:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.doris.nereids.trees.expressions.functions.scalar.Asin;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Atan;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Atan2;
import org.apache.doris.nereids.trees.expressions.functions.scalar.AutoPartitionName;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Bin;
import org.apache.doris.nereids.trees.expressions.functions.scalar.BitCount;
import org.apache.doris.nereids.trees.expressions.functions.scalar.BitLength;
Expand Down Expand Up @@ -528,6 +529,7 @@ public class BuiltinScalarFunctions implements FunctionHelper {
scalar(Asin.class, "asin"),
scalar(Atan.class, "atan"),
scalar(Atan2.class, "atan2"),
scalar(AutoPartitionName.class, "auto_partition_name"),
scalar(Bin.class, "bin"),
scalar(BitCount.class, "bit_count"),
scalar(BitLength.class, "bit_length"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.expressions.functions.scalar;

import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.StringType;
import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.util.ExpressionUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.util.List;

/**
* ScalarFunction 'auto_partition_name'. This class is not generated by
* GenerateFunction.
*/
public class AutoPartitionName extends ScalarFunction
implements UnaryExpression, ExplicitlyCastableSignature, PropagateNullable {

public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT).varArgs(VarcharType.SYSTEM_DEFAULT),
FunctionSignature.ret(StringType.INSTANCE).varArgs(StringType.INSTANCE));

/**
* constructor with 2 or 3 arguments.
*/
public AutoPartitionName(Expression arg, Expression... varArgs) {
super("auto_partition_name", ExpressionUtils.mergeArguments(arg, varArgs));
}

/**
* withChildren.
*/
@Override
public AutoPartitionName withChildren(List<Expression> children) {
Preconditions.checkArgument(children.size() >= 2);
return new AutoPartitionName(children.get(0),
children.subList(1, children.size()).toArray(new Expression[0]));
}

@Override
public void checkLegalityAfterRewrite() {
if (arity() < 2) {
throw new AnalysisException("function auto_partition_name must contains at least two arguments");
}
if (!child(0).isLiteral()) {
throw new AnalysisException("auto_partition_name must accept literal for 1nd argument");
}
final String partition_type = ((VarcharLiteral) getArgument(0)).getStringValue().toLowerCase();
if (!Lists.newArrayList("range", "list").contains(partition_type)) {
throw new AnalysisException("function auto_partition_name must accept range|list for 1nd argument");
} else if (Lists.newArrayList("range").contains(partition_type)) {
if (!child(1).isLiteral()) {
throw new AnalysisException("auto_partition_name must accept literal for 2nd argument");
} else {
final String range_partition_type = ((VarcharLiteral) getArgument(1)).getStringValue()
.toLowerCase();
if (arity() != 3) {
throw new AnalysisException("range auto_partition_name must contains three arguments");
}
if (!Lists.newArrayList("year", "month", "day", "hour", "minute", "second")
.contains(range_partition_type)) {
throw new AnalysisException(
"range auto_partition_name must accept year|month|day|hour|minute|second for 2nd argument");
}
}

}
}

@Override
public List<FunctionSignature> getSignatures() {
return SIGNATURES;
}

@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitAutoPartitionName(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.doris.nereids.trees.expressions.functions.scalar.Asin;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Atan;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Atan2;
import org.apache.doris.nereids.trees.expressions.functions.scalar.AutoPartitionName;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Bin;
import org.apache.doris.nereids.trees.expressions.functions.scalar.BitCount;
import org.apache.doris.nereids.trees.expressions.functions.scalar.BitLength;
Expand Down Expand Up @@ -708,6 +709,10 @@ default R visitAtan2(Atan2 atan2, C context) {
return visitScalarFunction(atan2, context);
}

default R visitAutoPartitionName(AutoPartitionName autoPartitionName, C context) {
return visitScalarFunction(autoPartitionName, context);
}

default R visitBin(Bin bin, C context) {
return visitScalarFunction(bin, context);
}
Expand Down
Loading

0 comments on commit d848afb

Please sign in to comment.