Skip to content

Commit

Permalink
Added the --log-policy and --log-rate-limit CLI options
Browse files Browse the repository at this point in the history
Added implementations of the drop, ignore and passthrough log policies.
  • Loading branch information
syedriko committed Dec 10, 2019
1 parent b4a7d17 commit fec2795
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 14 deletions.
20 changes: 20 additions & 0 deletions src/conmon.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ static gchar **opt_exit_args = NULL;
static gboolean opt_replace_listen_pid = FALSE;
static char *opt_log_level = NULL;
static char *opt_log_tag = NULL;
static char *opt_log_policy = NULL;
static char *opt_log_rate_limit = NULL;
static GOptionEntry opt_entries[] = {
{"terminal", 't', 0, G_OPTION_ARG_NONE, &opt_terminal, "Terminal", NULL},
{"stdin", 'i', 0, G_OPTION_ARG_NONE, &opt_stdin, "Stdin", NULL},
Expand Down Expand Up @@ -120,6 +122,8 @@ static GOptionEntry opt_entries[] = {
{"syslog", 0, 0, G_OPTION_ARG_NONE, &opt_syslog, "Log to syslog (use with cgroupfs cgroup manager)", NULL},
{"log-level", 0, 0, G_OPTION_ARG_STRING, &opt_log_level, "Print debug logs based on log level", NULL},
{"log-tag", 0, 0, G_OPTION_ARG_STRING, &opt_log_tag, "Additional tag to use for logging", NULL},
{"log-policy", 0, 0, G_OPTION_ARG_STRING, &opt_log_policy, "Log policy", NULL},
{"log-rate-limit", 0, 0, G_OPTION_ARG_STRING, &opt_log_rate_limit, "Log rate limit", NULL},
{NULL, 0, 0, 0, NULL, NULL, NULL}};

#define CGROUP_ROOT "/sys/fs/cgroup"
Expand Down Expand Up @@ -1305,6 +1309,22 @@ int main(int argc, char *argv[])

configure_log_drivers(opt_log_path, opt_log_size_max, opt_cid, opt_name, opt_log_tag);

log_policy_t log_policy;
if (!log_rate_parse_policy(opt_log_policy, &log_policy)) {
nexitf("Invalid log policy %s", opt_log_policy);
}
size_t log_rate_limit;
if (!log_rate_parse_rate_limit(opt_log_rate_limit, &log_rate_limit)) {
nexitf("Invalid log rate limit %s", opt_log_rate_limit);
}
if ((log_policy == PASSTHROUGH || log_policy == IGNORE) && log_rate_limit != 0) {
nexitf("Log rate limit not supported for log policy %s", opt_log_policy);
}
if ((log_policy == BACKPRESSURE || log_policy == DROP) && log_rate_limit == 0) {
nexitf("Log rate limit not provided for log policy %s. Use --log-rate-limit", opt_log_policy);
}
log_rate_init(log_policy, log_rate_limit);

start_pipe_fd = get_pipe_fd_from_env("_OCI_STARTPIPE");
if (start_pipe_fd > 0) {
/* Block for an initial write to the start pipe before
Expand Down
121 changes: 110 additions & 11 deletions src/log_rate.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,115 @@
#include "ctr_logging.h"

#define IO_BUF_SIZE 65536
#define BYTES_PER_PERIOD 1024
#define SECS_PER_PERIOD 1
#define BILLION 1000000000

static const struct timespec secs_per_period = {SECS_PER_PERIOD, 0};
static size_t bytes_written_this_period = 0;
static struct timespec start_of_this_period;
static log_policy_t log_policy;
static size_t bytes_per_period;
static bool dropping = false;
static struct timespec drop_until;

static int64_t add_timespecs_nano(const struct timespec* first, const struct timespec* second);
static struct timespec add_timespecs(const struct timespec* first, const struct timespec* second);
static struct timespec subtract_timespecs(const struct timespec* first, const struct timespec* second);
static int64_t subtract_timespecs_nano(const struct timespec* first, const struct timespec* second);
static void write_io_bufs(stdpipe_t pipe, char* buf, ssize_t count);
static void sleep_for_the_rest_of_this_period();
static void start_new_period();

void log_rate_init() {
bool log_rate_parse_policy(const char* policy_string, log_policy_t* policy) {
if (policy_string == NULL) {
*policy = PASSTHROUGH;
return true;
}
if (!strcmp(policy_string, "backpressure")) {
*policy = BACKPRESSURE;
return true;
} else if (!strcmp(policy_string, "drop")) {
*policy = DROP;
return true;
} else if (!strcmp(policy_string, "ignore")) {
*policy = IGNORE;
return true;
} else if (!strcmp(policy_string, "passthrough")) {
*policy = PASSTHROUGH;
return true;
} else {
return false;
}
}

bool log_rate_parse_rate_limit(const char* rate_limit_string, size_t* rate_limit) {
if (rate_limit_string == NULL) {
rate_limit = 0;
return true;
}
char* endptr;
size_t unscaled_rate_limit = strtol(rate_limit_string, &endptr, 10);
if (errno != 0) {
return false;
}
size_t scale = 1;
switch (*endptr) {
case '\0':
break;
case 'K':
scale = (size_t)1024;
break;
case 'M':
scale = (size_t)1024 * 1024;
break;
case 'G':
scale = (size_t)1024 * 1024 * 1024;
break;
case 'T':
scale = (size_t)1024 * 1024 * 1024 * 1024;
break;
default:
return false;
}
*rate_limit = unscaled_rate_limit * scale;
return true;
}

void log_rate_init(log_policy_t policy, size_t rate_limit) {
log_policy = policy;
bytes_per_period = rate_limit;
start_new_period();
}

bool log_rate_write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read) {
struct timespec now;
switch (log_policy) {
case BACKPRESSURE:
break;
case DROP:
if (dropping) {
clock_gettime(CLOCK_MONOTONIC, &now);
int64_t diff_nano = subtract_timespecs_nano(&now, &drop_until);
if (diff_nano < 0) {
return true;
} else {
dropping = false;
start_new_period();
}
}
break;
case IGNORE:
return true;
case PASSTHROUGH:
write_to_logs(pipe, buf, num_read);
return true;
}
char* buf_start = buf;
ssize_t bytes_remaining = num_read;
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
int64_t diff_nano = subtract_timespecs_nano(&now, &start_of_this_period);
if (diff_nano < SECS_PER_PERIOD * BILLION) {
ssize_t bytes_we_can_write = BYTES_PER_PERIOD - bytes_written_this_period;
ssize_t bytes_we_can_write = bytes_per_period - bytes_written_this_period;
if (num_read <= bytes_we_can_write) {
write_io_bufs(pipe, buf_start, num_read);
bytes_written_this_period += num_read;
Expand All @@ -53,25 +136,35 @@ bool log_rate_write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read) {
start_new_period();
}

ssize_t chunks = bytes_remaining / BYTES_PER_PERIOD;
ssize_t remainder = bytes_remaining % BYTES_PER_PERIOD;
ssize_t chunks = bytes_remaining / bytes_per_period;
ssize_t remainder = bytes_remaining % bytes_per_period;

for (ssize_t i = 0; i < chunks; ++i) {
write_io_bufs(pipe, buf_start + i * BYTES_PER_PERIOD, BYTES_PER_PERIOD);
write_io_bufs(pipe, buf_start + i * bytes_per_period, bytes_per_period);
sleep_for_the_rest_of_this_period();
start_new_period();
}
if (remainder != 0) {
if (bytes_written_this_period + remainder > BYTES_PER_PERIOD) {
if (bytes_written_this_period + remainder > bytes_per_period) {
sleep_for_the_rest_of_this_period();
start_new_period();
}
write_io_bufs(pipe, buf_start + (chunks * BYTES_PER_PERIOD), remainder);
write_io_bufs(pipe, buf_start + (chunks * bytes_per_period), remainder);
bytes_written_this_period += remainder;
}
return true;
}

int64_t add_timespecs_nano(const struct timespec* first, const struct timespec* second) {
return (first->tv_sec + second->tv_sec) * BILLION + first->tv_nsec + second->tv_nsec;
}

struct timespec add_timespecs(const struct timespec* first, const struct timespec* second) {
int64_t sum_nanoseconds = add_timespecs_nano(first, second);
struct timespec ret = {sum_nanoseconds / BILLION, sum_nanoseconds % BILLION};
return ret;
}

struct timespec subtract_timespecs(const struct timespec* first, const struct timespec* second) {
int64_t diff_nanoseconds = subtract_timespecs_nano(first, second);
struct timespec ret = {diff_nanoseconds / BILLION, diff_nanoseconds % BILLION};
Expand All @@ -97,8 +190,14 @@ void sleep_for_the_rest_of_this_period() {
clock_gettime(CLOCK_MONOTONIC, &now);
diff = subtract_timespecs(&now, &start_of_this_period);
sleep = subtract_timespecs(&secs_per_period, &diff);
if (sleep.tv_sec < 0 || sleep.tv_nsec < 0)
nexit("negative sleep");
if (sleep.tv_sec < 0 || sleep.tv_nsec < 0) {
return;
}
if (log_policy == DROP) {
dropping = true;
drop_until = add_timespecs(&start_of_this_period, &secs_per_period);
return;
}
do {
ret = nanosleep(&sleep, &sleep);
} while (ret == -1 && errno == EINTR);
Expand Down
15 changes: 12 additions & 3 deletions src/log_rate.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
#pragma once
#if !defined(LOG_RATE_H)
#define LOG_RATE_H
#define LOG_RATE_H

void log_rate_init();
bool log_rate_write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read);
typedef enum {
BACKPRESSURE,
DROP,
IGNORE,
PASSTHROUGH
} log_policy_t;

bool log_rate_parse_policy(const char* policy_string, log_policy_t* policy);
bool log_rate_parse_rate_limit(const char* rate_limit_string, size_t* rate_limit);
void log_rate_init(log_policy_t policy, size_t rate_limit);
bool log_rate_write_to_logs(stdpipe_t pipe, char* buf, ssize_t num_read);

#endif /* !defined(LOG_RATE_H) */

0 comments on commit fec2795

Please sign in to comment.