From f0fbb60cf8fee0dcd95e837c9c61e17eb1043ca9 Mon Sep 17 00:00:00 2001 From: Yaroslav Molochko Date: Tue, 31 Oct 2017 16:48:22 +0200 Subject: [PATCH] Throttling implementation Based on sliding window algorithm --- CMakeLists.txt | 1 + plugins/CMakeLists.txt | 1 + plugins/filter_throttle/CMakeLists.txt | 6 + plugins/filter_throttle/throttle.c | 269 +++++++++++++++++++++++++ plugins/filter_throttle/throttle.h | 49 +++++ plugins/filter_throttle/window.c | 95 +++++++++ plugins/filter_throttle/window.h | 37 ++++ tests/runtime/CMakeLists.txt | 1 + tests/runtime/filter_throttle.c | 70 +++++++ 9 files changed, 529 insertions(+) create mode 100644 plugins/filter_throttle/CMakeLists.txt create mode 100644 plugins/filter_throttle/throttle.c create mode 100644 plugins/filter_throttle/throttle.h create mode 100644 plugins/filter_throttle/window.c create mode 100644 plugins/filter_throttle/window.h create mode 100644 tests/runtime/filter_throttle.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 7eb4dfaf543..d80d41ea93a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,6 +112,7 @@ option(FLB_FILTER_GREP "Enable grep filter" Yes) option(FLB_FILTER_STDOUT "Enable stdout filter" Yes) option(FLB_FILTER_PARSER "Enable parser filter" Yes) option(FLB_FILTER_KUBERNETES "Enable kubernetes filter" Yes) +option(FLB_FILTER_THROTTLE "Enable throttle filter" Yes) option(FLB_FILTER_RECORD_MODIFIER "Enable record_modifier filter" Yes) # Enable all features diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 2409b65ad1a..e4f09193e98 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -142,6 +142,7 @@ if (NOT CMAKE_SYSTEM_NAME MATCHES "Windows") REGISTER_FILTER_PLUGIN("filter_grep") endif() REGISTER_FILTER_PLUGIN("filter_stdout") +REGISTER_FILTER_PLUGIN("filter_throttle") if(FLB_REGEX) REGISTER_FILTER_PLUGIN("filter_kubernetes") diff --git a/plugins/filter_throttle/CMakeLists.txt b/plugins/filter_throttle/CMakeLists.txt new file mode 100644 index 00000000000..adc7b8f4c37 --- /dev/null +++ b/plugins/filter_throttle/CMakeLists.txt @@ -0,0 +1,6 @@ +set(src + window.c + throttle.c + ) + +FLB_PLUGIN(filter_throttle "${src}" "") diff --git a/plugins/filter_throttle/throttle.c b/plugins/filter_throttle/throttle.c new file mode 100644 index 00000000000..ec806b2c43e --- /dev/null +++ b/plugins/filter_throttle/throttle.c @@ -0,0 +1,269 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "stdlib.h" + +#include "throttle.h" +#include "window.h" + + +static bool apply_suffix (double *x, char suffix_char) +{ + int multiplier; + + switch (suffix_char) + { + case 0: + case 's': + multiplier = 1; + break; + case 'm': + multiplier = 60; + break; + case 'h': + multiplier = 60 * 60; + break; + case 'd': + multiplier = 60 * 60 * 24; + break; + default: + return false; + } + + *x *= multiplier; + + return true; +} + + +void *time_ticker(void *args) +{ + struct ticker *t = args; + struct flb_time ftm; + long timestamp; + + while (!t->done) { + flb_time_get(&ftm); + timestamp = flb_time_to_double(&ftm); + window_add(t->ctx->hash, timestamp, 0); + + t->ctx->hash->current_timestamp = timestamp; + + if (t->ctx->print_status) { + flb_info("[filter_throttle] %i: limit is %0.2f per %s with window size of %i, current rate is: %i per interval", timestamp, t->ctx->max_rate, t->ctx->slide_interval, t->ctx->window_size, t->ctx->hash->total / t->ctx->hash->size); + } + sleep(t->seconds); + } +} + +/* Given a msgpack record, do some filter action based on the defined rules */ +static inline int throttle_data(struct flb_filter_throttle_ctx *ctx) +{ + if ( ctx->hash->total / ctx->hash->size >= ctx->max_rate) { + return THROTTLE_RET_DROP; + } + + window_add(ctx->hash, ctx->hash->current_timestamp, 1); + + return THROTTLE_RET_KEEP; +} + +static int configure(struct flb_filter_throttle_ctx *ctx, struct flb_filter_instance *f_ins) +{ + char *str = NULL; + double val = 0; + char *endp; + + /* rate per second */ + str = flb_filter_get_property("rate", f_ins); + + if (str != NULL && (val = strtod(str, &endp)) > 1) { + ctx->max_rate = val; + } else { + ctx->max_rate = THROTTLE_DEFAULT_RATE; + } + + /* windows size */ + str = flb_filter_get_property("window", f_ins); + if (str != NULL && (val = strtoul(str, &endp, 10)) > 1) { + ctx->window_size = val; + } else { + ctx->window_size = THROTTLE_DEFAULT_WINDOW; + } + + /* print informational status */ + str = flb_filter_get_property("print_status", f_ins); + if (str != NULL) { + ctx->print_status = flb_utils_bool(str); + } else { + ctx->print_status = THROTTLE_DEFAULT_STATUS; + } + + /* sliding interval */ + str = flb_filter_get_property("interval", f_ins); + if (str != NULL) { + ctx->slide_interval = str; + } else { + ctx->slide_interval = THROTTLE_DEFAULT_INTERVAL; + } + return 0; +} + +static int parse_duration(char *interval) +{ + double seconds = 0.0; + double s; + char *p; + + s = strtod(interval, &p); + if ( 0 >= s + /* No extra chars after the number and an optional s,m,h,d char. */ + || (*p && *(p+1)) + /* Check any suffix char and update S based on the suffix. */ + || ! apply_suffix (&s, *p)) + { + flb_warn("[filter_throttle] invalid time interval %s falling back to default: 1 second", interval); + } + + seconds += s; + return seconds; +} + +static int cb_throttle_init(struct flb_filter_instance *f_ins, + struct flb_config *config, + void *data) +{ + int ret; + struct flb_filter_throttle_ctx *ctx; + pthread_t tid; + struct ticker *ticker_ctx; + + /* Create context */ + ctx = flb_malloc(sizeof(struct flb_filter_throttle_ctx)); + if (!ctx) { + flb_errno(); + return -1; + } + + /* parse plugin configuration */ + ret = configure(ctx, f_ins); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + ctx->hash = window_create(ctx->window_size); + + /* Set our context */ + flb_filter_set_context(f_ins, ctx); + + ticker_ctx = flb_malloc(sizeof(struct ticker)); + ticker_ctx->ctx = ctx; + ticker_ctx->done = false; + ticker_ctx->seconds = parse_duration(ctx->slide_interval); + pthread_create(&tid, NULL, &time_ticker, ticker_ctx); + return 0; +} + +static int cb_throttle_filter(void *data, size_t bytes, + char *tag, int tag_len, + void **out_buf, size_t *out_size, + struct flb_filter_instance *f_ins, + void *context, + struct flb_config *config) +{ + int ret; + int old_size = 0; + int new_size = 0; + msgpack_unpacked result; + msgpack_object root; + size_t off = 0; + (void) f_ins; + (void) config; + msgpack_sbuffer tmp_sbuf; + msgpack_packer tmp_pck; + + /* Create temporal msgpack buffer */ + msgpack_sbuffer_init(&tmp_sbuf); + msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + + + /* Iterate each item array and apply rules */ + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, data, bytes, &off)) { + root = result.data; + if (root.type != MSGPACK_OBJECT_ARRAY) { + continue; + } + + old_size++; + + ret = throttle_data(context); + if (ret == THROTTLE_RET_KEEP) { + msgpack_pack_object(&tmp_pck, root); + new_size++; + } + else if (ret == THROTTLE_RET_DROP) { + /* Do nothing */ + } + } + msgpack_unpacked_destroy(&result); + + /* we keep everything ? */ + if (old_size == new_size) { + /* Destroy the buffer to avoid more overhead */ + msgpack_sbuffer_destroy(&tmp_sbuf); + return FLB_FILTER_NOTOUCH; + } + + /* link new buffers */ + *out_buf = tmp_sbuf.data; + *out_size = tmp_sbuf.size; + + return FLB_FILTER_MODIFIED; +} + +static int cb_throttle_exit(void *data, struct flb_config *config) +{ + struct flb_filter_throttle_ctx *ctx = data; + + flb_free(ctx); + return 0; +} + +struct flb_filter_plugin filter_throttle_plugin = { + .name = "throttle", + .description = "Throttle messages using sliding window algorithm", + .cb_init = cb_throttle_init, + .cb_filter = cb_throttle_filter, + .cb_exit = cb_throttle_exit, + .flags = 0 +}; diff --git a/plugins/filter_throttle/throttle.h b/plugins/filter_throttle/throttle.h new file mode 100644 index 00000000000..92df8c985af --- /dev/null +++ b/plugins/filter_throttle/throttle.h @@ -0,0 +1,49 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit Throttling + * ========== + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_FILTER_THROTTLE_H +#define FLB_FILTER_THROTTLE_H + +/* actions */ +#define THROTTLE_RET_KEEP 0 +#define THROTTLE_RET_DROP 1 + +/* defaults */ +#define THROTTLE_DEFAULT_RATE 1 +#define THROTTLE_DEFAULT_WINDOW 5 +#define THROTTLE_DEFAULT_INTERVAL "1" +#define THROTTLE_DEFAULT_STATUS FLB_FALSE; + +struct flb_filter_throttle_ctx { + double max_rate; + unsigned int window_size; + char *slide_interval; + int print_status; + + /* internal */ + struct throttle_window *hash; +}; + +struct ticker { + struct flb_filter_throttle_ctx *ctx; + bool done; + double seconds; +}; + +#endif diff --git a/plugins/filter_throttle/window.c b/plugins/filter_throttle/window.c new file mode 100644 index 00000000000..8a43efe4dd2 --- /dev/null +++ b/plugins/filter_throttle/window.c @@ -0,0 +1,95 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "window.h" +#include "throttle.h" + + +struct throttle_window *window_create(size_t size) { + struct throttle_window *tw; + + if (size <= 0) { + return NULL; + } + + tw = flb_malloc(sizeof(struct throttle_window)); + + if (!tw) { + return NULL; + } + + tw->size = size; + tw->total = 0; + tw->current_timestamp = 0; + tw->max_index = -1; + tw->table = flb_calloc(size, sizeof(struct throttle_pane)); + if (!tw->table) { + flb_error("Could not allocate initial window memory"); + return NULL; + } + + return tw; +} + + +int window_get(struct throttle_window *tw, long timestamp) { + int i; + for (i=0; i< tw->size; i++ ) { + if (tw->table[i].timestamp == timestamp) { + return i; + } + } + return NOT_FOUND; +} + + +int window_add(struct throttle_window *tw, long timestamp, int val) { + int i, index, size; + int sum = 0; + tw->current_timestamp = timestamp; + + size = tw->size; + index = window_get(tw, timestamp); + + if (index == NOT_FOUND) { + if (size - 1 == tw->max_index) { + /* window must be shifted */ + tw->max_index = -1; + } + tw->max_index += 1; + tw->table[tw->max_index].timestamp= timestamp; + tw->table[tw->max_index].counter = val; + } else { + tw->table[index].counter += val; + } + + for (i=0; i < tw->size; i++ ) { + sum += tw->table[i].counter; + flb_debug("timestamp: %i, value: %i", tw->table[i].timestamp, tw->table[i].counter); + } + tw->total = sum; + flb_debug("total: %i", tw->total); + return 0; +} diff --git a/plugins/filter_throttle/window.h b/plugins/filter_throttle/window.h new file mode 100644 index 00000000000..922e020c93b --- /dev/null +++ b/plugins/filter_throttle/window.h @@ -0,0 +1,37 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define NOT_FOUND -1 + +struct throttle_pane { + long timestamp; + long counter; +}; + +struct throttle_window { + long current_timestamp; + unsigned size; + unsigned total; + pthread_mutex_t result_mutex; + int max_index; + struct throttle_pane *table; +}; + +struct throttle_window *window_create(size_t size); +int window_add(struct throttle_window *tw, long timestamp, int val); diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 46317b603ff..f7f1307f9d0 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -31,6 +31,7 @@ endif() if(FLB_IN_LIB AND FLB_OUT_LIB) FLB_RT_TEST(FLB_FILTER_STDOUT "filter_stdout.c") FLB_RT_TEST(FLB_FILTER_GREP "filter_grep.c") + FLB_RT_TEST(FLB_FILTER_THROTTLE "filter_throttle.c") FLB_RT_TEST(FLB_FILTER_KUBERNETES "filter_kubernetes.c") endif() diff --git a/tests/runtime/filter_throttle.c b/tests/runtime/filter_throttle.c new file mode 100644 index 00000000000..fbed548e125 --- /dev/null +++ b/tests/runtime/filter_throttle.c @@ -0,0 +1,70 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include "flb_tests_runtime.h" + +/* Test data */ + +/* Utility functions */ +pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER; + +/* Test functions */ +void flb_test_filter_throttle(void); + +/* Test list */ +TEST_LIST = { + {"throttle", flb_test_filter_throttle }, + {NULL, NULL} +}; + + +void flb_test_filter_throttle(void) +{ + int i; + int ret; + int bytes; + char p[100]; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int filter_ffd; + + ctx = flb_create(); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "stdout", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + + filter_ffd = flb_filter(ctx, (char *) "throttle", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, "match", "*", NULL); + TEST_CHECK(ret == 0); + ret = flb_filter_set(ctx, filter_ffd, "rate", "9", NULL); + TEST_CHECK(ret == 0); + ret = flb_filter_set(ctx, filter_ffd, "window", "3", NULL); + TEST_CHECK(ret == 0); + ret = flb_filter_set(ctx, filter_ffd, "interval", "3s", NULL); + TEST_CHECK(ret == 0); + ret = flb_filter_set(ctx, filter_ffd, "print_status", "true", NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Send log messages all should go through */ + for (i = 0; i < 256; i++) { + memset(p, '\0', sizeof(p)); + snprintf(p, sizeof(p), "[%d, {\"val\": \"%d\",\"END_KEY\": \"JSON_END\"}]", i, i); + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + sleep(1); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); +}