Skip to content

Commit

Permalink
Fix up stringconcat example and add streaming variance.
Browse files Browse the repository at this point in the history
  • Loading branch information
Nong Li committed Nov 19, 2013
1 parent 3c53d8b commit 3fce648
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 14 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ endfunction(COMPILE_TO_IR)

# Build the UDA/UDFs into a shared library.
add_library(udfsample SHARED udf-sample.cc)
add_library(udasample SHARED uda-sample.cc hyperloglog-uda.cc)
add_library(udasample SHARED uda-sample.cc hyperloglog-uda.cc variance-uda.cc)

# Custom targets to cross-compile the UDA/UDF to IR
if (CLANG_EXECUTABLE)
Expand Down
37 changes: 24 additions & 13 deletions uda-sample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,35 @@ DoubleVal AvgFinalize(FunctionContext* context, const BufferVal& val) {
// This is a sample of implementing the STRING_CONCAT aggregate function.
// Example: select string_concat(string_col, ",") from table
// ---------------------------------------------------------------------------
// Fallback delimiter (", ", two bytes) that StringConcatUpdate inserts between
// values when the caller-supplied separator is NULL.
static const StringVal DEFAULT_STRING_CONCAT_DELIM(
    reinterpret_cast<uint8_t*>(const_cast<char*>(", ")), 2);

// Init step of the STRING_CONCAT aggregate: marks the intermediate value as NULL.
// StringConcatUpdate checks this flag to recognize the first non-NULL input string,
// which is copied into the result without a leading separator.
void StringConcatInit(FunctionContext* context, StringVal* val) {
val->is_null = true;
}

void StringConcatUpdate(FunctionContext* context, const StringVal& arg1,
const StringVal& arg2, StringVal* val) {
if (val->is_null) {
val->is_null = false;
*val = StringVal(context, arg1.len);
memcpy(val->ptr, arg1.ptr, arg1.len);
} else {
int new_len = val->len + arg1.len + arg2.len;
StringVal new_val(context, new_len);
memcpy(new_val.ptr, val->ptr, val->len);
memcpy(new_val.ptr + val->len, arg2.ptr, arg2.len);
memcpy(new_val.ptr + val->len + arg2.len, arg1.ptr, arg1.len);
*val = new_val;
// Update step of the STRING_CONCAT aggregate.
//
//   str       - the next input value; NULL inputs are skipped entirely.
//   separator - delimiter placed before every value except the first; when it is
//               NULL, DEFAULT_STRING_CONCAT_DELIM is used instead.
//   result    - the running concatenation. It is NULL until the first non-NULL
//               input arrives, and its buffer comes from the FunctionContext.
void StringConcatUpdate(FunctionContext* context, const StringVal& str,
    const StringVal& separator, StringVal* result) {
  if (str.is_null) return;

  if (result->is_null) {
    // First value seen: the result is just a private copy of it.
    uint8_t* first = context->Allocate(str.len);
    memcpy(first, str.ptr, str.len);
    *result = StringVal(first, str.len);
    return;
  }

  const StringVal& delim =
      separator.is_null ? DEFAULT_STRING_CONCAT_DELIM : separator;

  // Grow the buffer once to its final size, then append "<delim><str>" directly
  // after the bytes that are already there.
  const int old_len = result->len;
  const int grown_len = old_len + delim.len + str.len;
  uint8_t* grown = context->Reallocate(result->ptr, grown_len);
  memcpy(grown + old_len, delim.ptr, delim.len);
  memcpy(grown + old_len + delim.len, str.ptr, str.len);
  result->ptr = grown;
  result->len = grown_len;
}

void StringConcatMerge(FunctionContext* context, const StringVal& src, StringVal* dst) {
Expand Down
64 changes: 64 additions & 0 deletions variance-uda.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <assert.h>
#include <math.h>
#include <algorithm>
#include <sstream>
#include <iostream>
#include <impala_udf/udf.h>

using namespace std;
using namespace impala_udf;

// Intermediate state of the streaming variance aggregate. VarianceInit allocates a
// buffer of sizeof(VarianceState) bytes and zero-fills it; VarianceUpdate,
// VarianceMerge and VarianceFinalize reinterpret the StringVal buffer as this struct.
// Keeping only these three running totals lets the variance be computed in a single
// pass and lets partial states be merged by adding them field by field.
struct VarianceState {
// Sum of all non-NULL input values.
double sum;
// Sum of the squares of all non-NULL input values.
double sum_squared;
// The number of non-NULL input values.
int64_t count;
};

// Init step of the variance aggregate: allocates a VarianceState-sized buffer from
// the FunctionContext, zero-fills it (sum = 0, sum_squared = 0, count = 0) and
// publishes it through `dst` as a non-NULL intermediate value.
void VarianceInit(FunctionContext* ctx, StringVal* dst) {
  const int state_size = sizeof(VarianceState);
  uint8_t* buffer = ctx->Allocate(state_size);
  memset(buffer, 0, state_size);
  dst->is_null = false;
  dst->len = state_size;
  dst->ptr = buffer;
}

// Update step of the variance aggregate: folds one input value into the running
// totals stored in `dst`. NULL inputs leave the state untouched.
void VarianceUpdate(FunctionContext* ctx, const DoubleVal& src, StringVal* dst) {
  if (!src.is_null) {
    // The intermediate buffer holds a VarianceState (set up by VarianceInit).
    VarianceState& acc = *reinterpret_cast<VarianceState*>(dst->ptr);
    acc.sum += src.val;
    acc.sum_squared += src.val * src.val;
    acc.count += 1;
  }
}

// Merge step of the variance aggregate: adds the partial totals held in `src` to
// the totals held in `dst`. Both buffers are interpreted as VarianceState; `src`
// is only read.
void VarianceMerge(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
  const VarianceState& partial = *reinterpret_cast<const VarianceState*>(src.ptr);
  VarianceState& total = *reinterpret_cast<VarianceState*>(dst->ptr);
  total.sum += partial.sum;
  total.sum_squared += partial.sum_squared;
  total.count += partial.count;
}

// Finalize step of the variance aggregate: turns the accumulated totals into the
// result value.
//
//   src - intermediate buffer holding a VarianceState (allocated by VarianceInit).
//
// Returns NULL when no non-NULL input was aggregated (count == 0). Otherwise
// returns the population variance E[x^2] - E[x]^2, i.e. both terms are divided by
// `count`, not `count - 1`.
DoubleVal VarianceFinalize(FunctionContext* ctx, const StringVal& src) {
  // Copy the totals out first so the buffer can be released on every return path.
  const VarianceState state = *reinterpret_cast<const VarianceState*>(src.ptr);

  // Release the intermediate buffer that VarianceInit obtained from ctx->Allocate().
  // NOTE(review): assumes the UDA owns buffers obtained from ctx->Allocate() and is
  // expected to hand them back with ctx->Free() once the aggregate is finalized;
  // confirm against the udf.h in use.
  ctx->Free(src.ptr);

  if (state.count == 0) return DoubleVal::null();

  const double mean = state.sum / state.count;
  const double variance = state.sum_squared / state.count - mean * mean;

  // The two terms can cancel almost exactly (e.g. many identical inputs), and
  // rounding may then leave a tiny negative value. A variance is never negative,
  // so clamp at zero. NaN compares false here and is passed through unchanged.
  return DoubleVal(variance < 0.0 ? 0.0 : variance);
}

0 comments on commit 3fce648

Please sign in to comment.