Skip to content

Commit

Permalink
Merge pull request #7 from eosnetworkfoundation/merge_dfuse
Browse files Browse the repository at this point in the history
Merge dfuse into main
  • Loading branch information
heifner authored Mar 19, 2022
2 parents 5134c1a + 2b85cc0 commit 2baba6a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 31 deletions.
10 changes: 9 additions & 1 deletion include/fc/log/dmlog_appender.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <fc/log/appender.hpp>
#include <fc/reflect/reflect.hpp>

namespace fc {

Expand All @@ -11,16 +12,23 @@ namespace fc {
class dmlog_appender : public appender
{
public:
struct config
{
std::string file = "-";
};
explicit dmlog_appender( const variant& args );
dmlog_appender();
explicit dmlog_appender( const std::optional<config>& args) ;

virtual ~dmlog_appender();
virtual void initialize( boost::asio::io_service& io_service ) override;

virtual void log( const log_message& m ) override;

private:
dmlog_appender();
class impl;
std::unique_ptr<impl> my;
};
}

FC_REFLECT(fc::dmlog_appender::config, (file))
82 changes: 52 additions & 30 deletions src/log/dmlog_appender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <fc/reflect/variant.hpp>
#ifndef WIN32
#include <unistd.h>
#include <signal.h>
#endif
#include <boost/asio/io_context.hpp>
#include <boost/thread/mutex.hpp>
Expand All @@ -16,60 +17,81 @@
namespace fc {
class dmlog_appender::impl {
public:
bool is_stopped;
boost::asio::io_service* io_service;
boost::mutex log_mutex;
bool is_stopped = false;
FILE* out = nullptr;
bool owns_out = false;
};

dmlog_appender::dmlog_appender( const std::optional<dmlog_appender::config>& args )
:dmlog_appender(){
if (!args || args->file == "-")
{
my->out = stdout;
}
else
{
my->out = std::fopen(args->file.c_str(), "a");
if (my->out)
{
std::setbuf(my->out, nullptr);
my->owns_out = true;
}
else
{
FC_THROW("Failed to open deep mind log file ${name}", ("name", args->file));
}
}
}

dmlog_appender::dmlog_appender( const variant& args )
:my(new impl){}
:dmlog_appender(args.as<std::optional<config>>()){}

dmlog_appender::dmlog_appender()
:my(new impl){}

dmlog_appender::~dmlog_appender() {}

void dmlog_appender::initialize( boost::asio::io_service& io_service ) {
my->io_service = &io_service;
dmlog_appender::~dmlog_appender() {
if (my->owns_out)
{
std::fclose(my->out);
}
}

void dmlog_appender::initialize( boost::asio::io_service& io_service ) {}

void dmlog_appender::log( const log_message& m ) {
FILE* out = stdout;
FILE* out = my->out;

string message = format_string( "DMLOG " + m.get_format() + "\n", m.get_data() );
std::unique_lock<boost::mutex> lock(my->log_mutex);
if (my->is_stopped) {
// It might happen that `io_server->stop` was called due to printing errors but did not take
// effect just yet. So if we are stopped, print a line that we are terminated and return.
fprintf(out, "DMLOG FPRINTF_FAILURE_TERMINATED\n");
return;
}

int retries = 0;
auto remaining_size = message.size();
auto message_ptr = message.c_str();
while (true) {
while (!my->is_stopped && remaining_size) {
auto written = fwrite(message_ptr, sizeof(char), remaining_size, out);
if (written == remaining_size) {
break;

// EINTR shouldn't happen anymore, but keep this detection, just in case.
if(written == 0 && errno != EINTR)
{
my->is_stopped = true;
}

fprintf(stderr, "DMLOG FPRINTF_FAILED failed written=%lu remaining=%lu %d %s\n", written, remaining_size, ferror(out), strerror(errno));
if(written != remaining_size)
{
fprintf(stderr, "DMLOG FPRINTF_FAILED failed written=%lu remaining=%lu %d %s\n", written, remaining_size, ferror(out), strerror(errno));
clearerr(out);
}

if (retries++ > 5) {
if(my->is_stopped)
{
fprintf(stderr, "DMLOG FPRINTF_FAILURE_TERMINATED\n");
my->io_service->stop();
my->is_stopped = true;

return;
// Depending on the error, we might have already gotten a SIGPIPE
// An extra signal is harmless, though. Use a process targeted
// signal (not raise) because the SIGTERM may be blocked in this
// thread.
kill(getpid(), SIGTERM);
}

message_ptr = &message_ptr[written];
remaining_size -= written;

// We don't `fflush`, rather we made `stdout` unbuffered with `setbuf`. This way we have
// atomic error handling and retry logic up here ^^.
}
}
}

0 comments on commit 2baba6a

Please sign in to comment.