Skip to content

Commit

Permalink
support bitmap agg function bitmap_union bitmap_union_count bitmap_un… (
Browse files Browse the repository at this point in the history
apache#55)

* support bitmap agg function bitmap_union bitmap_union_count bitmap_union_int

* Fix Bug in Union, Support bitmap_hash bitmap_empty bitmap_from_string
  • Loading branch information
stdpain authored and HappenLee committed Jul 1, 2021
1 parent 77ada7e commit 2e27cee
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 7 deletions.
76 changes: 76 additions & 0 deletions be/src/vec/aggregate_functions/aggregate_function_bitmap.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/aggregate_functions/aggregate_function_bitmap.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"

namespace doris::vectorized {

/// Instantiates `AggregateFunctionTemplate` for the integer column type that
/// matches the first argument's data type.
///
/// A Nullable wrapper around the first argument is unwrapped before the type
/// is inspected. Only Int8/Int16/Int32/Int64 are dispatched.
///
/// @param argument_type  argument data types; only element 0 is examined.
/// @return a newly allocated aggregate function (raw pointer handed to the
///         caller), or nullptr when the argument is not one of the handled
///         integer types.
template <bool nullable, template <bool, typename> class AggregateFunctionTemplate>
static IAggregateFunction* createWithIntDataType(const DataTypes& argument_type) {
    auto arg_type = argument_type[0].get();
    if (arg_type->isNullable()) {
        // Look through the Nullable wrapper at the nested type.
        arg_type = assert_cast<const DataTypeNullable*>(arg_type)->getNestedType().get();
    }

    WhichDataType which(arg_type);
    switch (which.idx) {
    case TypeIndex::Int8:
        return new AggregateFunctionTemplate<nullable, ColumnVector<Int8>>(argument_type);
    case TypeIndex::Int16:
        return new AggregateFunctionTemplate<nullable, ColumnVector<Int16>>(argument_type);
    case TypeIndex::Int32:
        return new AggregateFunctionTemplate<nullable, ColumnVector<Int32>>(argument_type);
    case TypeIndex::Int64:
        return new AggregateFunctionTemplate<nullable, ColumnVector<Int64>>(argument_type);
    default:
        // Any other type is not handled here.
        return nullptr;
    }
}

/// Factory callback for "bitmap_union": builds a bitmap aggregate that
/// combines its inputs with AggregateFunctionBitmapUnionOp.
/// `name` and `parameters` are part of the factory signature but are not used.
AggregateFunctionPtr createAggregateFunctionBitmapUnion(const std::string& name,
                                                        const DataTypes& argument_types,
                                                        const Array& parameters) {
    using UnionFunction = AggregateFunctionBitmapOp<AggregateFunctionBitmapUnionOp>;
    return std::make_shared<UnionFunction>(argument_types);
}

/// Factory callback for "bitmap_intersect": builds a bitmap aggregate that
/// combines its inputs with AggregateFunctionBitmapIntersectOp.
/// `name` and `parameters` are part of the factory signature but are not used.
AggregateFunctionPtr createAggregateFunctionBitmapIntersect(const std::string& name,
                                                            const DataTypes& argument_types,
                                                            const Array& parameters) {
    using IntersectFunction = AggregateFunctionBitmapOp<AggregateFunctionBitmapIntersectOp>;
    return std::make_shared<IntersectFunction>(argument_types);
}
/// Factory callback for "bitmap_union_count": builds an
/// AggregateFunctionBitmapCount that reads its input from a bitmap column.
///
/// @tparam nullable  selects the variant whose `add` expects a ColumnNullable
///                   argument.
/// `name` and `parameters` are part of the factory signature but are not used.
template <bool nullable>
AggregateFunctionPtr createAggregateFunctionBitmapUnionCount(const std::string& name,
                                                             const DataTypes& argument_types,
                                                             const Array& parameters) {
    using CountFunction = AggregateFunctionBitmapCount<nullable, ColumnBitmap>;
    return std::make_shared<CountFunction>(argument_types);
}

/// Factory callback for "bitmap_union_int": builds an
/// AggregateFunctionBitmapCount specialised for the integer column type of the
/// first argument.
///
/// @tparam nullable  selects the variant whose `add` expects a ColumnNullable
///                   argument.
/// @return the aggregate function; the returned pointer is null when
///         createWithIntDataType does not handle the argument type.
/// `name` and `parameters` are part of the factory signature but are not used.
template <bool nullable>
AggregateFunctionPtr createAggregateFunctionBitmapUnionInt(const std::string& name,
                                                           const DataTypes& argument_types,
                                                           const Array& parameters) {
    IAggregateFunction* function =
            createWithIntDataType<nullable, AggregateFunctionBitmapCount>(argument_types);
    // Hand ownership of the raw pointer to a shared_ptr.
    return std::shared_ptr<IAggregateFunction>(function);
}

/// Registers the bitmap aggregate functions with `factory`.
///
/// bitmap_union_count and bitmap_union_int are each registered twice: once
/// with the non-nullable creator and once with the nullable creator, the
/// latter passing `true` as the third argument of registerFunction.
void registerAggregateFunctionBitmap(AggregateFunctionSimpleFactory& factory) {
    // Bitmap in, bitmap out.
    factory.registerFunction("bitmap_union", createAggregateFunctionBitmapUnion);
    factory.registerFunction("bitmap_intersect", createAggregateFunctionBitmapIntersect);

    // Bitmap in, Int64 out.
    factory.registerFunction("bitmap_union_count",
                             createAggregateFunctionBitmapUnionCount<false>);
    factory.registerFunction("bitmap_union_count",
                             createAggregateFunctionBitmapUnionCount<true>, true);

    // Integer in, Int64 out.
    factory.registerFunction("bitmap_union_int",
                             createAggregateFunctionBitmapUnionInt<false>);
    factory.registerFunction("bitmap_union_int",
                             createAggregateFunctionBitmapUnionInt<true>, true);
}
}
166 changes: 166 additions & 0 deletions be/src/vec/aggregate_functions/aggregate_function_bitmap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once
#include <istream>
#include <ostream>

#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column_complex.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_bitmap.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/columns/column_nullable.h"
#include "vec/common/assert_cast.h"
#include "vec/io/io_helper.h"

namespace doris::vectorized {

/// Union policy for the bitmap aggregation state: every input is OR-ed (or,
/// for scalar inputs, inserted) into the accumulator.
struct AggregateFunctionBitmapUnionOp {
    static constexpr const char* name = "bitmap_union";

    /// Inserts a single non-bitmap value into `res` via BitmapValue::add.
    template <typename T>
    static void add(BitmapValue& res, const T& data) {
        res.add(data);
    }

    /// Folds a whole bitmap into `res` with `|=`.
    static void add(BitmapValue& res, const BitmapValue& data) {
        res |= data;
    }

    /// Combines two partial states; same operation as adding a bitmap.
    static void merge(BitmapValue& res, const BitmapValue& data) {
        add(res, data);
    }
};

/// Intersection policy for the bitmap aggregation state: every input bitmap
/// is AND-ed into the accumulator.
///
/// NOTE(review): `&=` onto an accumulator that is still empty presumably
/// leaves it empty, so the caller has to seed the accumulator with the first
/// input for the intersection to be meaningful - confirm against the state
/// struct that drives this policy.
struct AggregateFunctionBitmapIntersectOp {
    static constexpr const char* name = "bitmap_intersect";

    /// Folds an input bitmap into `res` with `&=`.
    static void add(BitmapValue& res, const BitmapValue& data) {
        res &= data;
    }

    /// Combines two partial states; same operation as adding a bitmap.
    static void merge(BitmapValue& res, const BitmapValue& data) {
        add(res, data);
    }
};

/// Aggregation state shared by the bitmap aggregate functions: one
/// BitmapValue accumulator that inputs are folded into through `Op`.
///
/// The accumulator is default-constructed (empty). Folding the first bitmap
/// into it with `Op` is only correct for operations whose identity element is
/// the empty bitmap (union). For intersection, `empty &= x` stays empty, so
/// the result could never become non-empty. To make the state correct for
/// every `Op`, the first bitmap that arrives (through add or merge) seeds the
/// accumulator by assignment; only later inputs go through `Op`.
///
/// @tparam Op  policy providing static `add(BitmapValue&, const T&)` and
///             `merge(BitmapValue&, const BitmapValue&)`.
template <typename Op>
struct AggregateFunctionBitmapData {
    BitmapValue value;
    /// True until the first input has been folded into (or read into) `value`.
    bool is_first = true;

    /// Adds a non-bitmap (scalar) input. Scalars are always inserted through
    /// `Op`; the state is no longer pristine afterwards.
    template <typename T>
    void add(const T& data) {
        is_first = false;
        Op::add(value, data);
    }

    /// Adds a bitmap input. The first bitmap seeds the accumulator; every
    /// later one is combined through `Op`. Overload resolution prefers this
    /// non-template overload for BitmapValue arguments.
    void add(const BitmapValue& data) {
        if (is_first) {
            value = data;
            is_first = false;
        } else {
            Op::add(value, data);
        }
    }

    /// Merges another partial state's bitmap, seeding the accumulator when
    /// nothing has been folded into it yet.
    void merge(const BitmapValue& data) {
        if (is_first) {
            value = data;
            is_first = false;
        } else {
            Op::merge(value, data);
        }
    }

    /// Serializes the accumulator to `buf`.
    void write(std::ostream& buf) const { DataTypeBitMap::serializeAsStream(value, buf); }

    /// Restores the accumulator from `buf`. The restored value is treated as
    /// real content, so later inputs are combined with it rather than
    /// replacing it.
    void read(std::istream& buf) {
        DataTypeBitMap::deserializeAsStream(value, buf);
        is_first = false;
    }

    /// Mutable access to the accumulated bitmap.
    BitmapValue& get() { return value; }
};

/// Aggregate function that folds a bitmap column into a single bitmap using
/// the policy `Op`, and returns that bitmap.
///
/// @tparam Op  policy supplying `name` and the add/merge operations used by
///             AggregateFunctionBitmapData<Op>.
template <typename Op>
class AggregateFunctionBitmapOp final
        : public IAggregateFunctionDataHelper<AggregateFunctionBitmapData<Op>,
                                              AggregateFunctionBitmapOp<Op>> {
private:
    using BitmapState = AggregateFunctionBitmapData<Op>;
    using HelperBase = IAggregateFunctionDataHelper<BitmapState, AggregateFunctionBitmapOp<Op>>;

public:
    using ResultDataType = BitmapValue;
    using ColVecType = ColumnBitmap;
    using ColVecResult = ColumnBitmap;

    AggregateFunctionBitmapOp(const DataTypes& argument_types_)
            : HelperBase(argument_types_, {}) {}

    /// The function name is taken from the policy.
    String getName() const override { return Op::name; }

    /// The result is always a bitmap.
    DataTypePtr getReturnType() const override { return std::make_shared<DataTypeBitMap>(); }

    /// Folds the bitmap at `row_num` of the first argument column into the state.
    void add(AggregateDataPtr place, const IColumn** columns, size_t row_num,
             Arena*) const override {
        const auto& bitmap_column = static_cast<const ColVecType&>(*columns[0]);
        const auto& row_value = bitmap_column.getData()[row_num];
        this->data(place).add(row_value);
    }

    /// Merges the partial state `rhs` into `place`.
    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*) const override {
        // get() is non-const, hence the const_cast on the read-only state.
        auto& other = const_cast<BitmapState&>(this->data(rhs));
        this->data(place).merge(other.get());
    }

    void serialize(ConstAggregateDataPtr place, std::ostream& buf) const override {
        this->data(place).write(buf);
    }

    void deserialize(AggregateDataPtr place, std::istream& buf, Arena*) const override {
        this->data(place).read(buf);
    }

    /// Appends the accumulated bitmap to the result column `to`.
    void insertResultInto(ConstAggregateDataPtr place, IColumn& to) const override {
        // get() is non-const, hence the const_cast on the read-only state.
        auto& state = const_cast<BitmapState&>(this->data(place));
        auto& result_column = static_cast<ColVecResult&>(to);
        result_column.getData().push_back(state.get());
    }

    const char* getHeaderFilePath() const override { return __FILE__; }
};

/// Aggregate function that unions its inputs into a bitmap and returns the
/// bitmap's cardinality as Int64.
///
/// @tparam nullable    when true, the first argument is read as a
///                     ColumnNullable and NULL rows are skipped.
/// @tparam ColVecType  concrete column type of the (nested) argument; its
///                     elements are passed to the union state's `add`.
template <bool nullable, typename ColVecType>
class AggregateFunctionBitmapCount final
        : public IAggregateFunctionDataHelper<
                  AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>,
                  AggregateFunctionBitmapCount<nullable, ColVecType>> {
public:
    // The input column type is the ColVecType template parameter.
    using ColVecResult = ColumnVector<Int64>;
    using AggFunctionData = AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>;

    AggregateFunctionBitmapCount(const DataTypes& argument_types_)
            : IAggregateFunctionDataHelper<
                      AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>,
                      AggregateFunctionBitmapCount<nullable, ColVecType>>(argument_types_, {}) {}

    // NOTE(review): reports the generic name "count" regardless of the SQL
    // name this function is registered under - confirm this is intended.
    String getName() const override { return "count"; }

    DataTypePtr getReturnType() const override { return std::make_shared<DataTypeInt64>(); }

    /// Folds the value at `row_num` of the first argument column into the
    /// state; in the nullable variant a NULL row contributes nothing.
    void add(AggregateDataPtr place, const IColumn** columns, size_t row_num,
             Arena*) const override {
        if constexpr (nullable) {
            auto& outer = assert_cast<const ColumnNullable&>(*columns[0]);
            if (outer.isNullAt(row_num)) {
                return;
            }
            const auto& nested = static_cast<const ColVecType&>(outer.getNestedColumn());
            this->data(place).add(nested.getData()[row_num]);
        } else {
            const auto& plain = static_cast<const ColVecType&>(*columns[0]);
            this->data(place).add(plain.getData()[row_num]);
        }
    }

    /// Merges the partial state `rhs` into `place`.
    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*) const override {
        // get() is non-const, hence the const_cast on the read-only state.
        auto& other = const_cast<AggFunctionData&>(this->data(rhs));
        this->data(place).merge(other.get());
    }

    void serialize(ConstAggregateDataPtr place, std::ostream& buf) const override {
        this->data(place).write(buf);
    }

    void deserialize(AggregateDataPtr place, std::istream& buf, Arena*) const override {
        this->data(place).read(buf);
    }

    /// Appends the cardinality of the accumulated bitmap to the result column `to`.
    void insertResultInto(ConstAggregateDataPtr place, IColumn& to) const override {
        // get() is non-const, hence the const_cast on the read-only state.
        auto& bitmap = const_cast<AggFunctionData&>(this->data(place)).get();
        auto& result_column = static_cast<ColVecResult&>(to);
        result_column.getData().push_back(bitmap.cardinality());
    }

    const char* getHeaderFilePath() const override { return __FILE__; }
};

} // namespace doris::vectorized
9 changes: 9 additions & 0 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ class Block {
void erase(const std::set<size_t>& positions);
/// remove the column with the specified name
void erase(const String& name);
// T was std::set<int>, std::vector<int>, std::list<int>
template <class T>
void erase_not_in(const T& container) {
Container new_data;
for(auto pos: container) {
new_data.emplace_back(std::move(data[pos]));
}
std::swap(data, new_data);
}

/// References are invalidated after calling functions above.

Expand Down
8 changes: 5 additions & 3 deletions be/src/vec/exec/vunion_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,12 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
MutableBlock mblock;
for (; _const_expr_list_idx < _const_expr_lists.size(); ++_const_expr_list_idx) {
Block tmp_block;
for (size_t i = 0; i < _const_expr_lists[_const_expr_list_idx].size(); ++i) {
int result_column_num = -1;
_const_expr_lists[_const_expr_list_idx][i]->execute(&tmp_block, &result_column_num);
int const_expr_lists_size = _const_expr_lists[_const_expr_list_idx].size();
std::vector<int> result_list(const_expr_lists_size);
for (size_t i = 0; i < const_expr_lists_size; ++i) {
_const_expr_lists[_const_expr_list_idx][i]->execute(&tmp_block, &result_list[i]);
}
tmp_block.erase_not_in(result_list);
mblock.merge(tmp_block);
}
block->swap(mblock.to_block());
Expand Down
75 changes: 75 additions & 0 deletions be/src/vec/functions/function_bitmap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,23 @@

#include "util/string_parser.hpp"
#include "vec/functions/function_totype.h"
#include "vec/functions/function_const.h"
#include "vec/functions/simple_function_factory.h"
#include "gutil/strings/split.h"

namespace doris::vectorized {

struct BitmapEmpty {
static constexpr auto name = "bitmap_empty";
using ReturnColVec = ColumnBitmap;
static DataTypePtr get_return_type() {
return std::make_shared<DataTypeBitMap>();
}
static auto init_value() {
return BitmapValue{};
}
};

/// Name tag for the "to_bitmap" function.
struct NameToBitmap {
    static constexpr const char* name = "to_bitmap";
};
Expand Down Expand Up @@ -54,6 +68,60 @@ struct ToBitmapImpl {
}
};

/// Name tag for the "bitmap_from_string" function.
struct NameBitmapFromString {
    static constexpr const char* name = "bitmap_from_string";
};

/// Implementation of "bitmap_from_string": turns each string of
/// comma-separated unsigned integers into a bitmap.
struct BitmapFromString {
    using ReturnType = DataTypeBitMap;
    static constexpr auto TYPE_INDEX = TypeIndex::String;
    using Type = String;
    using ReturnColumnType = ColumnBitmap;

    /// Converts every row of a string column into one BitmapValue appended to `res`.
    ///
    /// @param data     concatenated character data of the string column.
    /// @param offsets  per-row end offsets into `data`; row i spans
    ///                 [offsets[i - 1], offsets[i]) and the last byte of that
    ///                 span is excluded from the parsed text.
    /// @param res      output; receives one bitmap per row. A row that fails
    ///                 to parse yields a default-constructed bitmap.
    /// @return always Status::OK().
    static Status vector(const ColumnString::Chars& data, const ColumnString::Offsets& offsets,
                         std::vector<BitmapValue>& res) {
        auto row_count = offsets.size();
        res.reserve(row_count);

        // Scratch buffer reused across rows; cleared after each row.
        std::vector<uint64_t> parsed;
        for (int row = 0; row < row_count; ++row) {
            const auto begin = offsets[row - 1];
            const char* text = reinterpret_cast<const char*>(&data[begin]);
            int text_len = offsets[row] - begin - 1;

            const bool parsed_ok =
                    SplitStringAndParse({text, text_len}, ",", &safe_strtou64, &parsed);
            if (!parsed_ok) {
                res.emplace_back();
            } else {
                res.emplace_back(parsed);
            }
            parsed.clear();
        }
        return Status::OK();
    }
};

/// Name tag for the "bitmap_hash" function.
struct NameBitmapHash {
    static constexpr const char* name = "bitmap_hash";
};

/// Implementation of "bitmap_hash": hashes each string to a 32-bit value and
/// returns a bitmap containing just that value.
struct BitmapHash {
    using ReturnType = DataTypeBitMap;
    static constexpr auto TYPE_INDEX = TypeIndex::String;
    using Type = String;
    using ReturnColumnType = ColumnBitmap;

    /// Converts every row of a string column into a one-element bitmap appended to `res`.
    ///
    /// @param data     concatenated character data of the string column.
    /// @param offsets  per-row end offsets into `data`; row i spans
    ///                 [offsets[i - 1], offsets[i]) and the last byte of that
    ///                 span is excluded from the hashed text.
    /// @param res      output; receives one bitmap per row holding the
    ///                 murmur_hash3_32 of the row's text.
    /// @return always Status::OK().
    static Status vector(const ColumnString::Chars& data, const ColumnString::Offsets& offsets,
                         std::vector<BitmapValue>& res) {
        auto row_count = offsets.size();
        res.reserve(row_count);

        for (int row = 0; row < row_count; ++row) {
            const auto begin = offsets[row - 1];
            const char* text = reinterpret_cast<const char*>(&data[begin]);
            int text_len = offsets[row] - begin - 1;

            uint32_t hashed =
                    HashUtil::murmur_hash3_32(text, text_len, HashUtil::MURMUR3_32_SEED);
            auto& bitmap = res.emplace_back();
            bitmap.add(hashed);
        }
        return Status::OK();
    }
};

/// Name tag for the "bitmap_count" function.
struct NameBitmapCount {
    static constexpr const char* name = "bitmap_count";
};
Expand Down Expand Up @@ -204,7 +272,11 @@ struct BitmapHasAny {
}
};

// Constant function: takes no input column.
using FunctionBitmapEmpty = FunctionConst<BitmapEmpty, false>;

// Unary functions: one input column, one output column.
using FunctionToBitmap = FunctionUnaryToType<ToBitmapImpl, NameToBitmap>;
using FunctionBitmapFromString = FunctionUnaryToType<BitmapFromString, NameBitmapFromString>;
using FunctionBitmapHash = FunctionUnaryToType<BitmapHash, NameBitmapHash>;
using FunctionBitmapCount = FunctionUnaryToType<BitmapCount, NameBitmapCount>;

using FunctionBitmapAnd =
Expand All @@ -223,7 +295,10 @@ using FunctionBitmapHasAny =
FunctionBinaryToType<DataTypeBitMap, DataTypeBitMap, BitmapHasAny, NameBitmapHasAny>;

void registerFunctionBitmap(SimpleFunctionFactory& factory) {
factory.registerFunction<FunctionBitmapEmpty>();
factory.registerFunction<FunctionToBitmap>();
factory.registerFunction<FunctionBitmapFromString>();
factory.registerFunction<FunctionBitmapHash>();
factory.registerFunction<FunctionBitmapCount>();
factory.registerFunction<FunctionBitmapAnd>();
factory.registerFunction<FunctionBitmapOr>();
Expand Down
Loading

0 comments on commit 2e27cee

Please sign in to comment.