Skip to content

Commit

Permalink
support iterator in vaggnode (apache#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
stdpain authored and HappenLee committed Jul 13, 2021
1 parent 9931535 commit 2eb5489
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
34 changes: 25 additions & 9 deletions be/src/vec/exec/vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,11 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
using Method = AggregationMethodSerialized<AggregatedDataWithStringKey>;
using AggState = Method::State;

_agg_data.serialized->init_once();

auto& method = *_agg_data.serialized;
auto& data = _agg_data.serialized->data;
auto& iter = _agg_data.serialized->iterator;

block->clear();
auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(row_desc());
Expand All @@ -400,14 +403,16 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
value_columns.emplace_back(column_withschema[i].type->createColumn());
}

data.forEachValue([&](const auto& key, auto& mapped) {
// insert keys
while (iter != data.end() && key_columns[0]->size() < state->batch_size()) {
const auto& key = iter->getFirst();
auto& mapped = iter->getSecond();
method.insertKeyIntoColumns(key, key_columns, {});
// insert values
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i)
_aggregate_evaluators[i]->insert_result_info(mapped + _offsets_of_aggregate_states[i],
value_columns[i].get());
});

++iter;
}

*block = column_withschema;
MutableColumns columns(block->columns());
Expand All @@ -419,7 +424,9 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
}
}
block->setColumns(std::move(columns));
*eos = true;
if (iter == data.end()) {
*eos = true;
}
return Status::OK();
}

Expand All @@ -429,8 +436,11 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
using Method = AggregationMethodSerialized<AggregatedDataWithStringKey>;
using AggState = Method::State;

_agg_data.serialized->init_once();

auto& method = *_agg_data.serialized;
auto& data = _agg_data.serialized->data;
auto& iter = _agg_data.serialized->iterator;

int key_size = _probe_expr_ctxs.size();
int agg_size = _aggregate_evaluators.size();
Expand All @@ -448,7 +458,9 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
value_columns[i] = value_data_types[i]->createColumn();
}

data.forEachValue([&](const auto& key, auto& mapped) {
while (iter != data.end() && key_columns[0]->size() < state->batch_size()) {
const auto& key = iter->getFirst();
auto& mapped = iter->getSecond();
// insert keys
method.insertKeyIntoColumns(key, key_columns, {});

Expand All @@ -458,9 +470,11 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
_aggregate_evaluators[i]->function()->serialize(
mapped + _offsets_of_aggregate_states[i], buf);
value_columns[i]->insertData(buf.str().c_str(), buf.str().length());
buf.str().clear();
buf.str("");
buf.clear();
}
});
++iter;
}

ColumnsWithTypeAndName columns_with_schema;
for (int i = 0; i < key_size; ++i) {
Expand All @@ -473,7 +487,9 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
}

*block = Block(columns_with_schema);
*eos = true;
if (iter == data.end()) {
*eos = true;
}
return Status::OK();
}

Expand Down
10 changes: 10 additions & 0 deletions be/src/vec/exec/vaggregation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ struct AggregationMethodSerialized {
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
using Iterator = typename Data::iterator;

Data data;
Iterator iterator;
bool inited = false;

AggregationMethodSerialized() = default;

Expand All @@ -61,6 +64,13 @@ struct AggregationMethodSerialized {
auto pos = key.data;
for (auto& column : key_columns) pos = column->deserializeAndInsertFromArena(pos);
}

void init_once() {
if (!inited) {
inited = true;
iterator = data.begin();
}
}
};

using AggregatedDataWithoutKey = AggregateDataPtr;
Expand Down

0 comments on commit 2eb5489

Please sign in to comment.