From 3fce648fddb6c41e09572e0de89d3a83b394d47a Mon Sep 17 00:00:00 2001 From: Nong Li Date: Mon, 18 Nov 2013 19:07:09 -0800 Subject: [PATCH] Fix up stringconcat example and add streaming variance. --- CMakeLists.txt | 2 +- uda-sample.cc | 37 ++++++++++++++++++---------- variance-uda.cc | 64 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 14 deletions(-) create mode 100644 variance-uda.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 6b2bb9b..3d67512 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,7 +38,7 @@ endfunction(COMPILE_TO_IR) # Build the UDA/UDFs into a shared library. add_library(udfsample SHARED udf-sample.cc) -add_library(udasample SHARED uda-sample.cc hyperloglog-uda.cc) +add_library(udasample SHARED uda-sample.cc hyperloglog-uda.cc variance-uda.cc) # Custom targest to cross compile UDA/UDF to ir if (CLANG_EXECUTABLE) diff --git a/uda-sample.cc b/uda-sample.cc index 52a5792..c20dd20 100644 --- a/uda-sample.cc +++ b/uda-sample.cc @@ -76,24 +76,35 @@ DoubleVal AvgFinalize(FunctionContext* context, const BufferVal& val) { // This is a sample of implementing the STRING_CONCAT aggregate function. // Example: select string_concat(string_col, ",") from table // --------------------------------------------------------------------------- +// Delimiter to use if the separator is NULL. 
+static const StringVal DEFAULT_STRING_CONCAT_DELIM((uint8_t*)", ", 2);
+
 void StringConcatInit(FunctionContext* context, StringVal* val) {
   val->is_null = true;
 }
 
-void StringConcatUpdate(FunctionContext* context, const StringVal& arg1,
-    const StringVal& arg2, StringVal* val) {
-  if (val->is_null) {
-    val->is_null = false;
-    *val = StringVal(context, arg1.len);
-    memcpy(val->ptr, arg1.ptr, arg1.len);
-  } else {
-    int new_len = val->len + arg1.len + arg2.len;
-    StringVal new_val(context, new_len);
-    memcpy(new_val.ptr, val->ptr, val->len);
-    memcpy(new_val.ptr + val->len, arg2.ptr, arg2.len);
-    memcpy(new_val.ptr + val->len + arg2.len, arg1.ptr, arg1.len);
-    *val = new_val;
+void StringConcatUpdate(FunctionContext* context, const StringVal& str,
+    const StringVal& separator, StringVal* result) {
+  if (str.is_null) return;
+  if (result->is_null) {
+    // This is the first string, so the result is simply a copy of it.
+    uint8_t* copy = context->Allocate(str.len);
+    memcpy(copy, str.ptr, str.len);
+    *result = StringVal(copy, str.len);
+    return;
   }
+
+  const StringVal* sep_ptr = separator.is_null ? &DEFAULT_STRING_CONCAT_DELIM :
+      &separator;
+
+  // Grow the result buffer, then append the separator followed by the new
+  // string.
+  int new_size = result->len + sep_ptr->len + str.len;
+  result->ptr = context->Reallocate(result->ptr, new_size);
+  memcpy(result->ptr + result->len, sep_ptr->ptr, sep_ptr->len);
+  result->len += sep_ptr->len;
+  memcpy(result->ptr + result->len, str.ptr, str.len);
+  result->len += str.len;
 }
 
 void StringConcatMerge(FunctionContext* context, const StringVal& src, StringVal* dst) {
diff --git a/variance-uda.cc b/variance-uda.cc
new file mode 100644
index 0000000..aa42354
--- /dev/null
+++ b/variance-uda.cc
@@ -0,0 +1,78 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// NOTE(review): the targets of the six original #include lines were lost when
+// this patch was extracted. The headers below are the ones the code in this
+// file visibly needs (int64_t, memset, the impala_udf types); confirm against
+// the source tree and restore any that are missing.
+#include <stdint.h>
+#include <string.h>
+
+#include <impala_udf/udf.h>
+
+using namespace std;
+using namespace impala_udf;
+
+// Intermediate state of the variance aggregate. VarianceInit() allocates one
+// zero-filled instance and stores a pointer to it in the StringVal it is given.
+struct VarianceState {
+  // Sum of all input values.
+  double sum;
+  // Sum of the square of all input values.
+  double sum_squared;
+  // The number of input values.
+  int64_t count;
+};
+
+// Allocates a zero-filled VarianceState with ctx->Allocate() and stores it in
+// 'dst', which becomes non-NULL. VarianceFinalize() releases the buffer.
+void VarianceInit(FunctionContext* ctx, StringVal* dst) {
+  dst->is_null = false;
+  dst->len = sizeof(VarianceState);
+  dst->ptr = ctx->Allocate(dst->len);
+  memset(dst->ptr, 0, dst->len);
+}
+
+// Folds one input value into the state held in 'dst'. NULL inputs are skipped.
+void VarianceUpdate(FunctionContext* ctx, const DoubleVal& src, StringVal* dst) {
+  if (src.is_null) return;
+  VarianceState* state = reinterpret_cast<VarianceState*>(dst->ptr);
+  state->sum += src.val;
+  state->sum_squared += src.val * src.val;
+  ++state->count;
+}
+
+// Adds the sums and the count held in 'src' to the state held in 'dst'.
+void VarianceMerge(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  const VarianceState* src_state = reinterpret_cast<const VarianceState*>(src.ptr);
+  VarianceState* dst_state = reinterpret_cast<VarianceState*>(dst->ptr);
+  dst_state->sum += src_state->sum;
+  dst_state->sum_squared += src_state->sum_squared;
+  dst_state->count += src_state->count;
+}
+
+// Returns the population variance, E[x^2] - E[x]^2, of the accumulated values,
+// or NULL if no non-NULL value was seen. Frees the buffer that VarianceInit()
+// allocated, so 'src' must not be used after this call.
+DoubleVal VarianceFinalize(FunctionContext* ctx, const StringVal& src) {
+  // Copy the state out first so the buffer can be freed on every return path.
+  const VarianceState state = *reinterpret_cast<const VarianceState*>(src.ptr);
+  ctx->Free(src.ptr);
+  if (state.count == 0) return DoubleVal::null();
+  const double mean = state.sum / state.count;
+  const double variance = state.sum_squared / state.count - mean * mean;
+  // Rounding in the subtraction can leave a tiny negative result when the true
+  // variance is near zero. A variance is never negative, so clamp it.
+  return DoubleVal(variance < 0.0 ? 0.0 : variance);
+}