Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: logging behavior policies and rate-limited logging #92

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ containerized: bin
static:
$(MAKE) git-vars bin/conmon PKG_CONFIG='$(PKG_CONFIG) --static' CFLAGS='-static' LDFLAGS='$(LDFLAGS) -s -w -static' LIBS='$(LIBS)'

bin/conmon: src/conmon.o src/cmsg.o src/ctr_logging.o src/utils.o | bin
bin/conmon: src/conmon.o src/cmsg.o src/ctr_logging.o src/log_rate.o src/utils.o | bin
$(CC) $(LDFLAGS) -o $@ $^ $(LIBS)

%.o: %.c
Expand All @@ -78,7 +78,9 @@ src/utils.o: src/utils.c src/utils.h

src/ctr_logging.o: src/ctr_logging.c src/ctr_logging.h src/utils.h

src/conmon.o: src/conmon.c src/cmsg.h src/config.h src/utils.h src/ctr_logging.h
src/log_rate.o: src/log_rate.c src/log_rate.h src/ctr_logging.h src/utils.h

src/conmon.o: src/conmon.c src/cmsg.h src/config.h src/utils.h src/ctr_logging.h src/log_rate.h

bin:
mkdir -p bin
Expand Down
3 changes: 2 additions & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ executable('conmon',
'src/cmsg.h',
'src/ctr_logging.c',
'src/ctr_logging.h',
'src/log_rate.c',
'src/log_rate.h',
'src/utils.c',
'src/utils.h'],
dependencies : [glib],
install : true,
install_dir : join_paths(get_option('libexecdir'), 'podman'),
)

23 changes: 22 additions & 1 deletion src/conmon.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#include "cmsg.h"
#include "config.h"
#include "log_rate.h"

#ifndef CGROUP2_SUPER_MAGIC
#define CGROUP2_SUPER_MAGIC 0x63677270
Expand Down Expand Up @@ -79,6 +80,8 @@ static gchar **opt_exit_args = NULL;
static gboolean opt_replace_listen_pid = FALSE;
static char *opt_log_level = NULL;
static char *opt_log_tag = NULL;
static char *opt_log_policy = NULL;
static char *opt_log_rate_limit = NULL;
static GOptionEntry opt_entries[] = {
{"terminal", 't', 0, G_OPTION_ARG_NONE, &opt_terminal, "Terminal", NULL},
{"stdin", 'i', 0, G_OPTION_ARG_NONE, &opt_stdin, "Stdin", NULL},
Expand Down Expand Up @@ -119,6 +122,8 @@ static GOptionEntry opt_entries[] = {
{"syslog", 0, 0, G_OPTION_ARG_NONE, &opt_syslog, "Log to syslog (use with cgroupfs cgroup manager)", NULL},
{"log-level", 0, 0, G_OPTION_ARG_STRING, &opt_log_level, "Print debug logs based on log level", NULL},
{"log-tag", 0, 0, G_OPTION_ARG_STRING, &opt_log_tag, "Additional tag to use for logging", NULL},
{"log-policy", 0, 0, G_OPTION_ARG_STRING, &opt_log_policy, "Log policy", NULL},
{"log-rate-limit", 0, 0, G_OPTION_ARG_STRING, &opt_log_rate_limit, "Log rate limit", NULL},
{NULL, 0, 0, 0, NULL, NULL, NULL}};

#define CGROUP_ROOT "/sys/fs/cgroup"
Expand Down Expand Up @@ -359,7 +364,7 @@ static bool read_stdio(int fd, stdpipe_t pipe, gboolean *eof)
// Always null terminate the buffer, just in case.
buf[num_read] = '\0';

bool written = write_to_logs(pipe, buf, num_read);
bool written = log_rate_write_to_logs(pipe, buf, num_read);
if (!written)
return written;

Expand Down Expand Up @@ -1304,6 +1309,22 @@ int main(int argc, char *argv[])

configure_log_drivers(opt_log_path, opt_log_size_max, opt_cid, opt_name, opt_log_tag);

log_policy_t log_policy;
if (!log_rate_parse_policy(opt_log_policy, &log_policy)) {
nexitf("Invalid log policy %s", opt_log_policy);
}
size_t log_rate_limit;
if (!log_rate_parse_rate_limit(opt_log_rate_limit, &log_rate_limit)) {
nexitf("Invalid log rate limit %s", opt_log_rate_limit);
}
if ((log_policy == PASSTHROUGH || log_policy == IGNORE) && log_rate_limit != 0) {
nexitf("Log rate limit not supported for log policy %s", opt_log_policy);
}
if ((log_policy == BACKPRESSURE || log_policy == DROP) && log_rate_limit == 0) {
nexitf("Log rate limit not provided for log policy %s. Use --log-rate-limit", opt_log_policy);
}
log_rate_init(log_policy, log_rate_limit);

start_pipe_fd = get_pipe_fd_from_env("_OCI_STARTPIPE");
if (start_pipe_fd > 0) {
/* Block for an initial write to the start pipe before
Expand Down
210 changes: 210 additions & 0 deletions src/log_rate.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <stdlib.h>
#include <errno.h>

#include "utils.h"
#include "log_rate.h"
#include "ctr_logging.h"

#define IO_BUF_SIZE 65536
#define SECS_PER_PERIOD 1
#define BILLION 1000000000

static const struct timespec secs_per_period = {SECS_PER_PERIOD, 0};
static size_t bytes_written_this_period = 0;
static struct timespec start_of_this_period;
static log_policy_t log_policy;
static size_t bytes_per_period;
static bool dropping = false;
static struct timespec drop_until;

static int64_t add_timespecs_nano(const struct timespec* first, const struct timespec* second);
static struct timespec add_timespecs(const struct timespec* first, const struct timespec* second);
static struct timespec subtract_timespecs(const struct timespec* first, const struct timespec* second);
static int64_t subtract_timespecs_nano(const struct timespec* first, const struct timespec* second);
static void write_io_bufs(stdpipe_t pipe, char* buf, ssize_t count);
static void sleep_for_the_rest_of_this_period();
static void start_new_period();

bool log_rate_parse_policy(const char* policy_string, log_policy_t* policy) {
if (policy_string == NULL) {
*policy = PASSTHROUGH;
return true;
}
if (!strcmp(policy_string, "backpressure")) {
*policy = BACKPRESSURE;
return true;
} else if (!strcmp(policy_string, "drop")) {
*policy = DROP;
return true;
} else if (!strcmp(policy_string, "ignore")) {
*policy = IGNORE;
return true;
} else if (!strcmp(policy_string, "passthrough")) {
*policy = PASSTHROUGH;
return true;
} else {
return false;
}
}

bool log_rate_parse_rate_limit(const char* rate_limit_string, size_t* rate_limit) {
if (rate_limit_string == NULL) {
rate_limit = 0;
return true;
}
char* endptr;
errno = 0;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have to set errno to zero here? We don't before nanosleep below.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a shortcut, also used in

errno = 0;
. I'll fix it according to strtol(3).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, and with nanosleep() we don't because we only look at errno if nanosleep() returns an error.

long unscaled_rate_limit = strtol(rate_limit_string, &endptr, 10);
if (errno != 0 || unscaled_rate_limit <= 0) {
return false;
}
size_t scale = 1;
switch (*endptr) {
case '\0':
break;
case 'K':
scale = (size_t)1024;
break;
case 'M':
scale = (size_t)1024 * 1024;
break;
case 'G':
scale = (size_t)1024 * 1024 * 1024;
break;
case 'T':

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We really want to allow somebody to specify a "tera-byte" level logging rate? Eek.

No need for passthrough default if we just make the default rate-limit essentially infinite.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, there would be a sysconfig value for the maximum allowable logging rate for a individual container. That way, nobody could cause problems for the SRE.

Is that possible to do?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the rate limit suffixes, I was going by what pv(1) does. We can call it future-proofing. Or I can get rid of T.

As far as a config file goes, conmon receives all it's config on the command line. I think this is something that needs to be looked at at the podman and cri-o level, which I don't have a good grasp of yet.

scale = (size_t)1024 * 1024 * 1024 * 1024;
break;
default:
return false;
}
*rate_limit = unscaled_rate_limit * scale;
return true;
}

void log_rate_init(log_policy_t policy, size_t rate_limit) {
log_policy = policy;
bytes_per_period = rate_limit;
start_new_period();
}

bool log_rate_write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it'd be worth a comment here stating that this function's call signature must be kept the same as write_to_logs()?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it is not clear this is the interface we want to rate limit. The SRE needs to be able to control the final behavior of conmon, after it has added all the metadata to the logs. So it would seem we need to add the rate limit to the file writes to disk, or the journald writes.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that's true. Stay tuned...

struct timespec now;
switch (log_policy) {
case BACKPRESSURE:
break;
case DROP:
if (dropping) {
clock_gettime(CLOCK_MONOTONIC, &now);
int64_t diff_nano = subtract_timespecs_nano(&now, &drop_until);
if (diff_nano < 0) {
return true;
} else {
dropping = false;
start_new_period();
}
}
break;
case IGNORE:
return true;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great if there were metrics collected for how many bytes were ignored.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can collect the metrics, but what should we do with them?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be returned up the stack they'd have to be collected and written to a file here, and read by the caller. Alternatively a pipe fd could be sent from the parent similar to how sync_fd is in main. Seems like a reasonable extension but a bit out of scope here

case PASSTHROUGH:
write_to_logs(pipe, buf, num_read);
return true;
}
char* buf_start = buf;
ssize_t bytes_remaining = num_read;
clock_gettime(CLOCK_MONOTONIC, &now);
int64_t diff_nano = subtract_timespecs_nano(&now, &start_of_this_period);
if (diff_nano < SECS_PER_PERIOD * BILLION) {
ssize_t bytes_we_can_write = bytes_per_period - bytes_written_this_period;
if (num_read <= bytes_we_can_write) {
write_io_bufs(pipe, buf_start, num_read);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this assume that after a write we are still in the same period?

It seems that we have to re-evaluate what period we are in following each write_to_logs() call.

bytes_written_this_period += num_read;
return true;
} else {
write_io_bufs(pipe, buf_start, bytes_we_can_write);
bytes_written_this_period += bytes_we_can_write;
buf_start += bytes_we_can_write;
bytes_remaining = num_read - bytes_we_can_write;
sleep_for_the_rest_of_this_period();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we shouldn't block the thread execution here.

We should either propagate the condition back (with the remaining time) and set a glibc timer, or we need to use a much shorter period so that it is acceptable, in the worst case, to hang for that duration

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code implements the backpressure policy, I'm still adding the others.
I'm not sure I follow your concern or how backpressure is going to be maintained if we return from here without blocking. If we do, aren't we going to quickly enough be reading from the container's pipe again?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My first reaction to the sleep was also "yeuch" but given that conmon is written as a sync read()/write() loop there's no other way to idle but to sleep.

If conmon was written as a poll/epoll loop then you could set a timer and flip the read fd out of the loop till it fired. That would be neater and more efficient, but a bigger overhaul of conmon.

start_new_period();
}
} else {
start_new_period();
}

ssize_t chunks = bytes_remaining / bytes_per_period;
ssize_t remainder = bytes_remaining % bytes_per_period;

for (ssize_t i = 0; i < chunks; ++i) {
write_io_bufs(pipe, buf_start + i * bytes_per_period, bytes_per_period);
sleep_for_the_rest_of_this_period();
start_new_period();
}
if (remainder != 0) {
if (bytes_written_this_period + remainder > bytes_per_period) {
sleep_for_the_rest_of_this_period();
start_new_period();
}
write_io_bufs(pipe, buf_start + (chunks * bytes_per_period), remainder);
bytes_written_this_period += remainder;
}
return true;
}

int64_t add_timespecs_nano(const struct timespec* first, const struct timespec* second) {
return (first->tv_sec + second->tv_sec) * BILLION + first->tv_nsec + second->tv_nsec;
}

struct timespec add_timespecs(const struct timespec* first, const struct timespec* second) {
int64_t sum_nanoseconds = add_timespecs_nano(first, second);
struct timespec ret = {sum_nanoseconds / BILLION, sum_nanoseconds % BILLION};
return ret;
}

struct timespec subtract_timespecs(const struct timespec* first, const struct timespec* second) {
int64_t diff_nanoseconds = subtract_timespecs_nano(first, second);
struct timespec ret = {diff_nanoseconds / BILLION, diff_nanoseconds % BILLION};
return ret;
}

int64_t subtract_timespecs_nano(const struct timespec* first, const struct timespec* second) {
return (first->tv_sec - second->tv_sec) * BILLION + first->tv_nsec - second->tv_nsec;
}

void write_io_bufs(stdpipe_t pipe, char* buf, ssize_t count) {
ssize_t chunks = count / IO_BUF_SIZE;
ssize_t remainder = count % IO_BUF_SIZE;
for (int i = 0; i < chunks; ++i) {
write_to_logs(pipe, buf + (i * IO_BUF_SIZE), IO_BUF_SIZE);
}
write_to_logs(pipe, buf + count - remainder, remainder);
}

void sleep_for_the_rest_of_this_period() {
int ret;
struct timespec now, sleep, diff;
clock_gettime(CLOCK_MONOTONIC, &now);
diff = subtract_timespecs(&now, &start_of_this_period);
sleep = subtract_timespecs(&secs_per_period, &diff);
if (sleep.tv_sec < 0 || sleep.tv_nsec < 0) {
return;
}
if (log_policy == DROP) {
dropping = true;
drop_until = add_timespecs(&start_of_this_period, &secs_per_period);
return;
}
do {
ret = nanosleep(&sleep, &sleep);
} while (ret == -1 && errno == EINTR);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure you want a loop that doesn't check the time change. Shouldn't we be checking to see if the absolute time has reached or passed the calculated absolute time for the sleep?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reading of nanosleep(2) is that it sleeps for relative time intervals and when interrupted by a signal, it will store the remaining time to sleep in the timespec pointed to by the second argument, which we will use on the next pass through the loop as the relative time to sleep.

And I think I'm not doing the right thing for the negative sleep - if it's negative, it means we're in a new period and we should just return without sleeping. We start a new period after every sleep anyway.

}

void start_new_period() {
bytes_written_this_period = 0;
clock_gettime(CLOCK_MONOTONIC, &start_of_this_period);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this going to cause the periods to drift? Instead, could we just use math to calculate the periods from some initial startup time?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you illustrate with a bit of code how you propose to calculate the periods?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cautionary note: clock_gettime() is relatively expensive in tight inner loops. I'm not saying this code is calling it needlessly, but keep in mind as you fiddle with the timing calculations. A common strategy is to cache the time for re-use and only refresh it after blocking IO calls.

Copy link
Author

@syedriko syedriko Dec 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this code is pretty close to that idea. It finds out the time the first thing on entering and then the usual sequence is
write()
clock_gettime() // how long to sleep for?
nanosleep() // sleep
clock_gettime() // when does the new period start
Not sure if it's worth/good idea eliminating the last clock_gettime() and just adding the sleep time to the previous timestamp. Seems like a topic for a profiling session once we get to that point.

}
17 changes: 17 additions & 0 deletions src/log_rate.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once
#if !defined(LOG_RATE_H)
#define LOG_RATE_H

typedef enum {
BACKPRESSURE,
DROP,
IGNORE,
PASSTHROUGH
} log_policy_t;

bool log_rate_parse_policy(const char* policy_string, log_policy_t* policy);
bool log_rate_parse_rate_limit(const char* rate_limit_string, size_t* rate_limit);
void log_rate_init(log_policy_t policy, size_t rate_limit);
bool log_rate_write_to_logs(stdpipe_t pipe, char* buf, ssize_t num_read);

#endif /* !defined(LOG_RATE_H) */