From fec27956d8e7579f8015951c0997ac983f0ab98d Mon Sep 17 00:00:00 2001 From: Sergey Yedrikov Date: Mon, 9 Dec 2019 21:53:39 -0500 Subject: [PATCH] Added the --log-policy and --log-rate-limit CLI options Added implementations of the drop, ignore and passthrough log policies. --- src/conmon.c | 20 ++++++++ src/log_rate.c | 121 ++++++++++++++++++++++++++++++++++++++++++++----- src/log_rate.h | 15 ++++-- 3 files changed, 142 insertions(+), 14 deletions(-) diff --git a/src/conmon.c b/src/conmon.c index d4f383c4..7cce563b 100644 --- a/src/conmon.c +++ b/src/conmon.c @@ -80,6 +80,8 @@ static gchar **opt_exit_args = NULL; static gboolean opt_replace_listen_pid = FALSE; static char *opt_log_level = NULL; static char *opt_log_tag = NULL; +static char *opt_log_policy = NULL; +static char *opt_log_rate_limit = NULL; static GOptionEntry opt_entries[] = { {"terminal", 't', 0, G_OPTION_ARG_NONE, &opt_terminal, "Terminal", NULL}, {"stdin", 'i', 0, G_OPTION_ARG_NONE, &opt_stdin, "Stdin", NULL}, @@ -120,6 +122,8 @@ static GOptionEntry opt_entries[] = { {"syslog", 0, 0, G_OPTION_ARG_NONE, &opt_syslog, "Log to syslog (use with cgroupfs cgroup manager)", NULL}, {"log-level", 0, 0, G_OPTION_ARG_STRING, &opt_log_level, "Print debug logs based on log level", NULL}, {"log-tag", 0, 0, G_OPTION_ARG_STRING, &opt_log_tag, "Additional tag to use for logging", NULL}, + {"log-policy", 0, 0, G_OPTION_ARG_STRING, &opt_log_policy, "Log policy", NULL}, + {"log-rate-limit", 0, 0, G_OPTION_ARG_STRING, &opt_log_rate_limit, "Log rate limit", NULL}, {NULL, 0, 0, 0, NULL, NULL, NULL}}; #define CGROUP_ROOT "/sys/fs/cgroup" @@ -1305,6 +1309,22 @@ int main(int argc, char *argv[]) configure_log_drivers(opt_log_path, opt_log_size_max, opt_cid, opt_name, opt_log_tag); + log_policy_t log_policy; + if (!log_rate_parse_policy(opt_log_policy, &log_policy)) { + nexitf("Invalid log policy %s", opt_log_policy); + } + size_t log_rate_limit; + if (!log_rate_parse_rate_limit(opt_log_rate_limit, &log_rate_limit)) { + nexitf("Invalid log rate limit %s", opt_log_rate_limit); + } + if ((log_policy == PASSTHROUGH || log_policy == IGNORE) && log_rate_limit != 0) { + nexitf("Log rate limit not supported for log policy %s", opt_log_policy); + } + if ((log_policy == BACKPRESSURE || log_policy == DROP) && log_rate_limit == 0) { + nexitf("Log rate limit not provided for log policy %s. Use --log-rate-limit", opt_log_policy); + } + log_rate_init(log_policy, log_rate_limit); + start_pipe_fd = get_pipe_fd_from_env("_OCI_STARTPIPE"); if (start_pipe_fd > 0) { /* Block for an initial write to the start pipe before diff --git a/src/log_rate.c b/src/log_rate.c index b727c956..24ba1300 100644 --- a/src/log_rate.c +++ b/src/log_rate.c @@ -11,32 +11,115 @@ #include "ctr_logging.h" #define IO_BUF_SIZE 65536 -#define BYTES_PER_PERIOD 1024 #define SECS_PER_PERIOD 1 #define BILLION 1000000000 static const struct timespec secs_per_period = {SECS_PER_PERIOD, 0}; static size_t bytes_written_this_period = 0; static struct timespec start_of_this_period; +static log_policy_t log_policy; +static size_t bytes_per_period; +static bool dropping = false; +static struct timespec drop_until; +static int64_t add_timespecs_nano(const struct timespec* first, const struct timespec* second); +static struct timespec add_timespecs(const struct timespec* first, const struct timespec* second); static struct timespec subtract_timespecs(const struct timespec* first, const struct timespec* second); static int64_t subtract_timespecs_nano(const struct timespec* first, const struct timespec* second); static void write_io_bufs(stdpipe_t pipe, char* buf, ssize_t count); static void sleep_for_the_rest_of_this_period(); static void start_new_period(); -void log_rate_init() { +bool log_rate_parse_policy(const char* policy_string, log_policy_t* policy) { + if (policy_string == NULL) { + *policy = PASSTHROUGH; + return true; + } + if (!strcmp(policy_string, "backpressure")) { + *policy = BACKPRESSURE; + return true; + } else if (!strcmp(policy_string, "drop")) { + *policy = DROP; + return true; + } else if (!strcmp(policy_string, "ignore")) { + *policy = IGNORE; + return true; + } else if (!strcmp(policy_string, "passthrough")) { + *policy = PASSTHROUGH; + return true; + } else { + return false; + } +} + +bool log_rate_parse_rate_limit(const char* rate_limit_string, size_t* rate_limit) { + if (rate_limit_string == NULL) { + rate_limit = 0; + return true; + } + char* endptr; + size_t unscaled_rate_limit = strtol(rate_limit_string, &endptr, 10); + if (errno != 0) { + return false; + } + size_t scale = 1; + switch (*endptr) { + case '\0': + break; + case 'K': + scale = (size_t)1024; + break; + case 'M': + scale = (size_t)1024 * 1024; + break; + case 'G': + scale = (size_t)1024 * 1024 * 1024; + break; + case 'T': + scale = (size_t)1024 * 1024 * 1024 * 1024; + break; + default: + return false; + } + *rate_limit = unscaled_rate_limit * scale; + return true; +} + +void log_rate_init(log_policy_t policy, size_t rate_limit) { + log_policy = policy; + bytes_per_period = rate_limit; start_new_period(); } bool log_rate_write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read) { + struct timespec now; + switch (log_policy) { + case BACKPRESSURE: + break; + case DROP: + if (dropping) { + clock_gettime(CLOCK_MONOTONIC, &now); + int64_t diff_nano = subtract_timespecs_nano(&now, &drop_until); + if (diff_nano < 0) { + return true; + } else { + dropping = false; + start_new_period(); + } + } + break; + case IGNORE: + return true; + case PASSTHROUGH: + write_to_logs(pipe, buf, num_read); + return true; + } char* buf_start = buf; ssize_t bytes_remaining = num_read; - struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); int64_t diff_nano = subtract_timespecs_nano(&now, &start_of_this_period); if (diff_nano < SECS_PER_PERIOD * BILLION) { - ssize_t bytes_we_can_write = BYTES_PER_PERIOD - bytes_written_this_period; + ssize_t bytes_we_can_write = bytes_per_period - bytes_written_this_period; if (num_read <= bytes_we_can_write) { write_io_bufs(pipe, buf_start, num_read); bytes_written_this_period += num_read; @@ -53,25 +136,35 @@ bool log_rate_write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read) { start_new_period(); } - ssize_t chunks = bytes_remaining / BYTES_PER_PERIOD; - ssize_t remainder = bytes_remaining % BYTES_PER_PERIOD; + ssize_t chunks = bytes_remaining / bytes_per_period; + ssize_t remainder = bytes_remaining % bytes_per_period; for (ssize_t i = 0; i < chunks; ++i) { - write_io_bufs(pipe, buf_start + i * BYTES_PER_PERIOD, BYTES_PER_PERIOD); + write_io_bufs(pipe, buf_start + i * bytes_per_period, bytes_per_period); sleep_for_the_rest_of_this_period(); start_new_period(); } if (remainder != 0) { - if (bytes_written_this_period + remainder > BYTES_PER_PERIOD) { + if (bytes_written_this_period + remainder > bytes_per_period) { sleep_for_the_rest_of_this_period(); start_new_period(); } - write_io_bufs(pipe, buf_start + (chunks * BYTES_PER_PERIOD), remainder); + write_io_bufs(pipe, buf_start + (chunks * bytes_per_period), remainder); bytes_written_this_period += remainder; } return true; } +int64_t add_timespecs_nano(const struct timespec* first, const struct timespec* second) { + return (first->tv_sec + second->tv_sec) * BILLION + first->tv_nsec + second->tv_nsec; +} + +struct timespec add_timespecs(const struct timespec* first, const struct timespec* second) { + int64_t sum_nanoseconds = add_timespecs_nano(first, second); + struct timespec ret = {sum_nanoseconds / BILLION, sum_nanoseconds % BILLION}; + return ret; +} + struct timespec subtract_timespecs(const struct timespec* first, const struct timespec* second) { int64_t diff_nanoseconds = subtract_timespecs_nano(first, second); struct timespec ret = {diff_nanoseconds / BILLION, diff_nanoseconds % BILLION}; @@ -97,8 +190,14 @@ void sleep_for_the_rest_of_this_period() { clock_gettime(CLOCK_MONOTONIC, &now); diff = subtract_timespecs(&now, &start_of_this_period); sleep = subtract_timespecs(&secs_per_period, &diff); - if (sleep.tv_sec < 0 || sleep.tv_nsec < 0) - nexit("negative sleep"); + if (sleep.tv_sec < 0 || sleep.tv_nsec < 0) { + return; + } + if (log_policy == DROP) { + dropping = true; + drop_until = add_timespecs(&start_of_this_period, &secs_per_period); + return; + } do { ret = nanosleep(&sleep, &sleep); } while (ret == -1 && errno == EINTR); diff --git a/src/log_rate.h b/src/log_rate.h index 2bcc5068..57ce3e9a 100644 --- a/src/log_rate.h +++ b/src/log_rate.h @@ -1,8 +1,17 @@ #pragma once #if !defined(LOG_RATE_H) -#define LOG_RATE_H +#define LOG_RATE_H -void log_rate_init(); -bool log_rate_write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read); +typedef enum { + BACKPRESSURE, + DROP, + IGNORE, + PASSTHROUGH +} log_policy_t; + +bool log_rate_parse_policy(const char* policy_string, log_policy_t* policy); +bool log_rate_parse_rate_limit(const char* rate_limit_string, size_t* rate_limit); +void log_rate_init(log_policy_t policy, size_t rate_limit); +bool log_rate_write_to_logs(stdpipe_t pipe, char* buf, ssize_t num_read); #endif /* !defined(LOG_RATE_H) */