Skip to content

Commit

Permalink
Fix export/import checkpoints to create a new tracee control socket f…
Browse files Browse the repository at this point in the history
…or each child
  • Loading branch information
rocallahan committed Nov 30, 2021
1 parent 30b72a8 commit b8566d2
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 47 deletions.
5 changes: 2 additions & 3 deletions src/AutoRemoteSyscalls.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,18 +562,17 @@ int AutoRemoteSyscalls::send_fd(const ScopedFd &our_fd) {
RR_ARCH_FUNCTION(send_fd_arch, arch(), our_fd);
}

// Calls send_fd(our_fd) and makes sure the resulting remote fd ends up as
// `dup_to`. If send_fd() returned a different fd number, the fd is dup3'ed
// onto `dup_to` and the temporary fd is closed. Any failure trips an ASSERT.
void AutoRemoteSyscalls::infallible_send_fd_dup(const ScopedFd& our_fd, int dup_to) {
void AutoRemoteSyscalls::infallible_send_fd_dup(const ScopedFd& our_fd, int dup_to, int dup3_flags) {
int remote_fd = send_fd(our_fd);
ASSERT(task(), remote_fd >= 0);
// Only dup when the fd did not already land on the requested number.
if (remote_fd != dup_to) {
long ret = infallible_syscall(syscall_number_for_dup3(arch()), remote_fd,
dup_to, O_CLOEXEC);
dup_to, dup3_flags);
ASSERT(task(), ret == dup_to);
// The fd now lives at `dup_to`; drop the temporary one.
infallible_syscall(syscall_number_for_close(arch()), remote_fd);
}
}


remote_ptr<void> AutoRemoteSyscalls::infallible_mmap_syscall(
remote_ptr<void> addr, size_t length, int prot, int flags, int child_fd,
uint64_t offset_pages) {
Expand Down
2 changes: 1 addition & 1 deletion src/AutoRemoteSyscalls.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ class AutoRemoteSyscalls {
* `send_fd` the given file descriptor, making sure that it ends up as fd
* `dup_to`, (dup'ing it there with dup3 flags `dup3_flags` and closing the
* original if necessary)
*/
void infallible_send_fd_dup(const ScopedFd& our_fd, int dup_to);
void infallible_send_fd_dup(const ScopedFd& our_fd, int dup_to, int dup3_flags);

/**
* Remotely invoke in |t| the specified syscall with the given
Expand Down
69 changes: 64 additions & 5 deletions src/ExportImportCheckpoints.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
#include "ExportImportCheckpoints.h"

#include <fcntl.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>

Expand All @@ -20,6 +22,35 @@ using namespace std;

namespace rr {

bool parse_export_checkpoints(const string& arg, FrameTime& export_checkpoints_event,
int& export_checkpoints_count, string& export_checkpoints_socket) {
size_t first_comma = arg.find(',');
if (first_comma == string::npos) {
fprintf(stderr, "Missing <NUM> parameter for --export-checkpoints");
return false;
}
size_t second_comma = arg.find(',', first_comma + 1);
if (second_comma == string::npos) {
fprintf(stderr, "Missing <FILE> parameter for --export-checkpoints");
return false;
}
char* endptr;
string event_str = arg.substr(0, first_comma);
export_checkpoints_event = strtoul(event_str.c_str(), &endptr, 0);
if (*endptr) {
fprintf(stderr, "Invalid <EVENT> for --export-checkpoints: %s\n", event_str.c_str());
return false;
}
string num_str = arg.substr(first_comma + 1, second_comma - (first_comma + 1));
export_checkpoints_count = strtoul(num_str.c_str(), &endptr, 0);
if (*endptr) {
fprintf(stderr, "Invalid <NUM> for --export-checkpoints: %s\n", num_str.c_str());
return false;
}
export_checkpoints_socket = arg.substr(second_comma + 1);
return true;
}

ScopedFd bind_export_checkpoints_socket(int count, const string& socket_file_name) {
unlink(socket_file_name.c_str());

Expand Down Expand Up @@ -115,7 +146,7 @@ static void set_title(const vector<string>& args) {
CommandForCheckpoint export_checkpoints(ReplaySession::shr_ptr session, int count, ScopedFd& sock,
const std::string&) {
if (!session->can_clone()) {
FATAL() << "Can't create checkpoints at this time, aborting";
FATAL() << "Can't create checkpoints at this time, aborting: " << session->current_frame_time();
}

CommandForCheckpoint command_for_checkpoint;
Expand All @@ -127,6 +158,17 @@ CommandForCheckpoint export_checkpoints(ReplaySession::shr_ptr session, int coun
FATAL() << "Failed to accept client connection";
}

ssize_t priority;
recv_all(client, &priority, sizeof(priority));
ssize_t ret = setpriority(PRIO_PROCESS, 0, priority);
if (ret < 0) {
if (errno == EACCES) {
LOG(warn) << "Failed to increase priority";
} else {
FATAL() << "Failed setpriority";
}
}

size_t fds_size;
recv_all(client, &fds_size, sizeof(fds_size));

Expand All @@ -142,7 +184,7 @@ CommandForCheckpoint export_checkpoints(ReplaySession::shr_ptr session, int coun
cbuf.resize(CMSG_SPACE(data_len));
msg.msg_control = cbuf.data();
msg.msg_controllen = cbuf.size();
ssize_t ret = recvmsg(client, &msg, MSG_CMSG_CLOEXEC);
ret = recvmsg(client, &msg, MSG_CMSG_CLOEXEC);
if (ret != 1) {
FATAL() << "Failed to read fds";
}
Expand Down Expand Up @@ -177,6 +219,16 @@ CommandForCheckpoint export_checkpoints(ReplaySession::shr_ptr session, int coun

checkpoint->prepare_to_detach_tasks();

// We need to create a new control socket for the child, we can't use the shared control socket
// safely in multiple processes.
int sockets[2];
ret = socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, sockets);
if (ret < 0) {
FATAL() << "socketpair failed";
}
ScopedFd new_tracee_socket(sockets[0]);
ScopedFd new_tracee_socket_receiver(sockets[1]);

pid_t child = fork();
if (!child) {
set_title(args);
Expand All @@ -189,12 +241,13 @@ CommandForCheckpoint export_checkpoints(ReplaySession::shr_ptr session, int coun
if (ret != 1) {
FATAL() << "Failed to read parent notification";
}
command_for_checkpoint.session->reattach_tasks();
command_for_checkpoint.session->reattach_tasks(move(new_tracee_socket),
move(new_tracee_socket_receiver));
return command_for_checkpoint;
}
children.push_back(child);

checkpoint->detach_tasks(child);
checkpoint->detach_tasks(child, new_tracee_socket_receiver);
ret = write(parent_to_child_write, "x", 1);
if (ret != 1) {
FATAL() << "Failed to write parent notification";
Expand Down Expand Up @@ -248,11 +301,17 @@ int invoke_checkpoint_command(const string& socket_file_name,
break;
}

ssize_t ret = getpriority(PRIO_PROCESS, 0);
if (ret < 0) {
FATAL() << "Failed getpriority";
}
send_all(sock, &ret, sizeof(ret));

size_t total_fds = 4 + fds.size();
send_all(sock, &total_fds, sizeof(total_fds));

int exit_notification_pipe_fds[2];
ssize_t ret = pipe(exit_notification_pipe_fds);
ret = pipe(exit_notification_pipe_fds);
if (ret < 0) {
FATAL() << "Failed pipe";
}
Expand Down
3 changes: 3 additions & 0 deletions src/ExportImportCheckpoints.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

namespace rr {

/* Parse an --export-checkpoints argument of the form <EVENT>,<NUM>,<FILE>
 * into its three components. Returns false (after printing a diagnostic to
 * stderr) if a component is missing or a numeric component is malformed. */
bool parse_export_checkpoints(const std::string& arg, FrameTime& export_checkpoints_event,
int& export_checkpoints_count, std::string& export_checkpoints_socket);

/* Bind the socket so clients can try to connect to it and block. */
ScopedFd bind_export_checkpoints_socket(int count, const std::string& socket_file_name);

Expand Down
13 changes: 8 additions & 5 deletions src/ReplaySession.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ ReplaySession::shr_ptr ReplaySession::clone() {
* Return true if it's possible/meaningful to make a checkpoint at a
* trace frame whose event is |ev|.
*/
static bool can_checkpoint_at(const TraceFrame& frame) {
const Event& ev = frame.event();
static bool can_checkpoint_at(const Event& ev) {
if (ev.has_ticks_slop()) {
return false;
}
Expand All @@ -274,7 +273,7 @@ bool ReplaySession::can_clone() {
finish_initializing();

ReplayTask* t = current_task();
return t && done_initial_exec() && can_checkpoint_at(current_trace_frame());
return t && done_initial_exec() && can_checkpoint_at(current_trace_frame().event());
}

DiversionSession::shr_ptr ReplaySession::clone_diversion() {
Expand Down Expand Up @@ -1921,15 +1920,17 @@ void ReplaySession::forget_tasks() {
}
}

void ReplaySession::detach_tasks(pid_t new_ptracer) {
void ReplaySession::detach_tasks(pid_t new_ptracer, ScopedFd& new_tracee_socket_receiver) {
// First tell Yama to let new_ptracer ptrace the tracees.
// Do this before sending SIGSTOP to any tracees because SIGSTOP
// might stop threads before we do their PR_SET_PTRACER.
// Also push the new control socket into all tracees.
for (auto& entry : task_map) {
Task* t = entry.second;
AutoRemoteSyscalls remote(t);
long ret = remote.syscall(syscall_number_for_prctl(t->arch()), PR_SET_PTRACER, new_ptracer);
ASSERT(t, ret >= 0 || ret == -EINVAL) << "Failed PR_SET_PTRACER";
remote.infallible_send_fd_dup(new_tracee_socket_receiver, tracee_socket_fd_number, 0);
}
// Now PTRACE_DETACH and stop them all with SIGSTOP.
for (auto& entry : task_map) {
Expand All @@ -1940,7 +1941,9 @@ void ReplaySession::detach_tasks(pid_t new_ptracer) {
forget_tasks();
}

void ReplaySession::reattach_tasks() {
void ReplaySession::reattach_tasks(ScopedFd new_tracee_socket, ScopedFd new_tracee_socket_receiver) {
tracee_socket = make_shared<ScopedFd>(move(new_tracee_socket));
tracee_socket_receiver = make_shared<ScopedFd>(move(new_tracee_socket_receiver));
// Seize all tasks.
for (auto& entry : task_map) {
Task* t = entry.second;
Expand Down
7 changes: 5 additions & 2 deletions src/ReplaySession.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,14 +337,17 @@ class ReplaySession : public Session {
* The shared resources associated with this ReplaySession are being transferred to
* the child process `new_ptracer`. Prepare them for transfer (e.g. ptrace-detach the
* tracees) and prepare them to be traced by `new_ptracer`, and forget about them.
* `new_tracee_socket_receiver` is the receiver end of the new control socket;
* it is pushed into all tasks.
*/
void detach_tasks(pid_t new_ptracer);
void detach_tasks(pid_t new_ptracer, ScopedFd& new_tracee_socket_receiver);
/**
* The shared resources associated with this ReplaySession are being transferred to
* the child process `new_ptracer`. Receive them in the child process by ptrace-attaching
* to them etc.
* `new_tracee_socket` and `new_tracee_socket_receiver` are the two ends of the
* new control socket pair; the receiver end is the one `detach_tasks` pushed
* into all tasks.
*/
void reattach_tasks();
void reattach_tasks(ScopedFd new_tracee_socket, ScopedFd new_tracee_socket_receiver);

private:
ReplaySession(const std::string& dir, const Flags& flags);
Expand Down
2 changes: 1 addition & 1 deletion src/ReplayTask.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void ReplayTask::init_buffers_arch(remote_ptr<void> map_hint) {
cloned_file_data_fname = trace_reader().file_data_clone_file_name(tuid());
ScopedFd clone_file(cloned_file_data_fname.c_str(), O_RDONLY);
ASSERT(this, clone_file.is_open());
remote.infallible_send_fd_dup(clone_file, cloned_file_data_fd_child);
remote.infallible_send_fd_dup(clone_file, cloned_file_data_fd_child, O_CLOEXEC);
fds->add_monitor(this, cloned_file_data_fd_child, new PreserveFileMonitor());
}
}
Expand Down
30 changes: 1 addition & 29 deletions src/RerunCommand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -467,34 +467,6 @@ static bool parse_regs(const string& value, vector<TraceField>* out) {
return true;
}

/**
 * Parse the value of --export-checkpoints (<EVENT>,<NUM>,<FILE>) into the
 * export_checkpoints_event / _count / _socket fields of `flags`.
 * Returns false, after printing a diagnostic to stderr, if a component is
 * missing or a numeric component is empty or malformed.
 */
static bool parse_export_checkpoints(const string& arg, RerunFlags& flags) {
  size_t first_comma = arg.find(',');
  if (first_comma == string::npos) {
    fprintf(stderr, "Missing <NUM> parameter for --export-checkpoints\n");
    return false;
  }
  size_t second_comma = arg.find(',', first_comma + 1);
  if (second_comma == string::npos) {
    fprintf(stderr, "Missing <FILE> parameter for --export-checkpoints\n");
    return false;
  }

  char* endptr;
  string event_str = arg.substr(0, first_comma);
  flags.export_checkpoints_event = strtoul(event_str.c_str(), &endptr, 0);
  // An empty field leaves endptr at the terminating NUL, so also require that
  // strtoul consumed at least one character.
  if (endptr == event_str.c_str() || *endptr) {
    fprintf(stderr, "Invalid <EVENT> for --export-checkpoints: %s\n", event_str.c_str());
    return false;
  }

  string num_str = arg.substr(first_comma + 1, second_comma - (first_comma + 1));
  flags.export_checkpoints_count = strtoul(num_str.c_str(), &endptr, 0);
  if (endptr == num_str.c_str() || *endptr) {
    fprintf(stderr, "Invalid <NUM> for --export-checkpoints: %s\n", num_str.c_str());
    return false;
  }

  flags.export_checkpoints_socket = arg.substr(second_comma + 1);
  return true;
}

static bool parse_rerun_arg(vector<string>& args, RerunFlags& flags) {
if (parse_global_option(args)) {
return true;
Expand Down Expand Up @@ -528,7 +500,7 @@ static bool parse_rerun_arg(vector<string>& args, RerunFlags& flags) {
}
break;
case 3:
if (!parse_export_checkpoints(opt.value, flags)) {
if (!parse_export_checkpoints(opt.value, flags.export_checkpoints_event, flags.export_checkpoints_count, flags.export_checkpoints_socket)) {
return false;
}
break;
Expand Down
2 changes: 1 addition & 1 deletion src/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2542,7 +2542,7 @@ void Task::copy_state(const CapturedState& state) {
if (cloned_file_data_fd_child >= 0) {
ScopedFd fd(cloned_file_data_fname.c_str(), session().as_record() ?
O_RDWR : O_RDONLY);
remote.infallible_send_fd_dup(fd, cloned_file_data_fd_child);
remote.infallible_send_fd_dup(fd, cloned_file_data_fd_child, O_CLOEXEC);
remote.infallible_lseek_syscall(
cloned_file_data_fd_child, state.cloned_file_data_offset, SEEK_SET);
}
Expand Down

0 comments on commit b8566d2

Please sign in to comment.