modify (simplify) state_history plugin's thread logic; add get_status_request_v1
#236
Conversation
submitting commentary in phases 😇
```cpp
session_mgr.add_send_queue(std::move(self), std::move(entry_ptr));

template<typename F>
void drop_exceptions(F&& f) {
   try{ f(); } catch(...) {}
```
In the calling context, this appears to be used to guarantee we do all our resource releasing even if some of the calls throw. Do we know the scenarios under which these are expected to throw? Is there nothing valuable in logging the exceptions in that case even if we have to exclude some "expected" exceptions by carving them out of the logging statement?
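For concreteness, a sketch of the kind of carve-out being suggested -- this isn't the PR's code; `logger` is a stand-in for whatever logger is in scope, and treating asio's `operation_aborted` as the "expected" case is my assumption:

```cpp
template<typename F>
void drop_exceptions(F&& f) {
   try { f(); }
   catch(const boost::system::system_error& e) {
      // only log "unexpected" cleanup failures; operation_aborted is routine
      if(e.code() != boost::asio::error::operation_aborted)
         fc_dlog(logger, "cleanup failure: ${what}", ("what", e.what()));
   }
   catch(...) {} // still swallow everything else
}
```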
That is correct; the only time this is used is when performing final cleanup on an already-failed connection. It's dead, Jim. There isn't any reasonable action we could take besides logging, but logging could be deceptive since it wouldn't be logging why the connection failed -- just some other cleanup foul-up. (The code goes through some trouble to make sure it only ever logs one single error message on a connection failure, in hopes that the true culprit is logged.)
I am not sure what specific conditions could cause these to throw. Both `close()` and `cancel_one()` are documented as possibly throwing a `boost::system::system_error`, so I have to do something. Letting anything escape here would, at the moment, end up in:
spring/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp
Lines 76 to 80 in 18bea83
```cpp
void check_coros_done(std::exception_ptr e) {
   //the only exception that should have bubbled out of the coros is a bad_alloc, bubble it up further. No need to bother
   // with the rest of the cleanup: we'll be shutting down soon anyway due to bad_alloc
   if(e)
      std::rethrow_exception(e);
```
which would be a fatal ship thread error.
The main risk I could see here is if somehow `close()` being called twice does something bad. I will look into that.
`close()`ing a socket multiple times isn't an error; I guess maybe that's not too surprising once you consider that when a socket is dtored it effectively gets `close()`ed too.

I'm not really sure what to do. Both `close()` and `cancel_one()` are documented as potentially throwing, but the situations where they throw might very much be corner cases or even pathological. Still, we ought to be correct here and not blow the entire thread out. If having a "broad" `drop_exceptions()` is ugly, maybe we could do

```cpp
boost::system::error_code ignored;
stream.next_layer().close(ignored);
wake_timer.cancel_one(ignored); //HOWEVER, on steady_timer, this is deprecated usage 👎
```

I'd like to avoid deprecated interfaces, which leads us to something a little more icky like

```cpp
boost::system::error_code ignored;
stream.next_layer().close(ignored);
try{ wake_timer.cancel_one(); } catch(...) {} //ignore failures
```

This removes the need to have that broad `drop_exceptions()` hanging around.
```cpp
//TODO: how can set main thread priority on this?
auto& self = *this; //gcc10 ICE workaround wrt capturing 'this' in a coro
co_await boost::asio::co_spawn(app().get_io_service(), [&]() -> boost::asio::awaitable<void> {
```
As a minor nitpick, using an anonymous lambda here makes it harder to understand its meaning at a glance. Even something simple like this may add readability:

```cpp
auto add_requests_to_main_thread = [&]() -> boost::asio::awaitable<void> {
   ...
};
co_await boost::asio::co_spawn(app().get_io_service(), add_requests_to_main_thread, boost::asio::use_awaitable);
```

I'd also accept a doc-style function header inside the body.
My preference of those two options is adding a comment, which I did in edb242d. I think the real strength of the coroutines is keeping code presented in the same order of execution, and the inline anonymous lambda does better at that than "ping-ponging" back to a named lambda, or calling another function entirely (which starts becoming more callback-ish).

That said, I think there may be some sloppiness here around the capture-all aspect of the lambda; specifically, how the main thread coro accesses variables in the outer coro to "fill them out" (as a way of returning values). It seems like maybe the main thread coro should return these values instead. For example, instead of

```cpp
std::deque<bool> status_requests;
std::optional<block_package> block_to_send;
co_await boost::asio::co_spawn(app().get_io_service(), [&]() -> boost::asio::awaitable<void> {
   status_requests = std::move(self.queued_status_requests);
   // ...
   co_return; //void
```

it could actually return the values. For example,

```cpp
auto [status_requests, block_to_send] = co_await boost::asio::co_spawn(app().get_io_service(), [&]() -> boost::asio::awaitable<std::pair<std::deque<bool>, std::optional<block_package>>> {
   std::deque<bool> status_requests_ret = std::move(self.queued_status_requests);
   // ...
   co_return std::make_pair(status_requests_ret, block_to_send_ret);
```

But I wasn't really able to come up with an attractive way to do this in the little time I spent looking at it, and I wasn't entirely confident about move semantics and/or copy elision mixed together with coroutines either, without more investigation.
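To make the shape of that concrete, here's a rough, self-contained sketch of `co_spawn` returning a value across executors via `use_awaitable`. The surrounding function, `block_package`, and the parameter names are stand-ins for this discussion, not the PR's actual code:

```cpp
#include <boost/asio.hpp>
#include <deque>
#include <optional>
#include <utility>

struct block_package {}; // stand-in for the real type

boost::asio::awaitable<void> read_loop(boost::asio::io_context& main_ioc,
                                       std::deque<bool>& queued_status_requests) {
   // hop to the main thread's executor, gather the pending work, and return it
   // by value; co_spawn's use_awaitable token makes the pair the result of the
   // co_await, and we resume back on this coroutine's own executor afterward
   auto [status_requests, block_to_send] = co_await boost::asio::co_spawn(
      main_ioc.get_executor(),
      [&]() -> boost::asio::awaitable<std::pair<std::deque<bool>, std::optional<block_package>>> {
         std::deque<bool> status_requests_ret = std::move(queued_status_requests);
         std::optional<block_package> block_to_send_ret; // filled from chain state in real code
         co_return std::make_pair(std::move(status_requests_ret), std::move(block_to_send_ret));
      },
      boost::asio::use_awaitable);

   // ...use status_requests / block_to_send on the ship thread here...
   (void)status_requests; (void)block_to_send;
}
```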
```cpp
///TODO: How to set main thread priority?
auto& self = *this; //gcc10 ICE workaround wrt capturing 'this' in a coro
co_await boost::asio::co_spawn(app().get_io_service(), [&]() -> boost::asio::awaitable<void> {
```
Equivalent commentary here as with the read loop. Naming the lambda or adding a comment block would make it crystal clear that this construct is for moving data from the main thread to this thread.
```cpp
GetBlockID get_block_id;
GetBlock get_block;

///these items might be used on either the strand or main thread
```
I appreciate these comments, and I don't expect SHIP to be dynamic in the near term, but I have concerns that over time this comment (and the one above about the strand-only variables) will end up being thin protection. Perhaps in a future iteration we can figure out a more concrete way of throwing errors quickly if these members are accessed from the wrong context.
The `net_plugin` has a `verify_strand_in_this_thread` function. Something like that could be used, maybe in an `assert`. I'm not sure of the overhead associated with that call; for something like SHiP the overhead is likely not important.
This is certainly the other edge of the implicit-locking-by-using-strands sword. I'd definitely be curious to explore some solutions here going forward.

One option in mind is to wrap all these inside a container that validates `running_in_this_thread()` (which exists on both a basic_executor and strand). For example (I'm just pseudocoding here), we'd have something like

```cpp
executor_validator<std::deque<bool>> queued_status_requests(app.executor());
executor_validator<get_blocks_request_v0> current_blocks_request(app.executor());
/// etc
```

and then something like

```cpp
template<typename Executor, typename T>
struct executor_validator {
   executor_validator(Executor check_executor) : check_executor(check_executor) { }
   T& operator*() {
      assert(check_executor.running_in_this_thread());
      return t;
   }
private:
   Executor check_executor;
   T t;
};
```

so something like

```cpp
*current_blocks_request = ...
```

would assert if not running on the main thread.

I suspect there will be quite a few traps here in practice.
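A compilable version of that pseudocode might look like the following. `executor_validator` remains a hypothetical name from the discussion above, and the helper for deducing the executor type is my addition, not anything from the PR:

```cpp
#include <boost/asio.hpp>
#include <cassert>
#include <deque>
#include <utility>

template<typename T, typename Executor>
struct executor_validator {
   explicit executor_validator(Executor check_executor)
      : check_executor(std::move(check_executor)) {}

   T& operator*() {
      // io_context::executor_type and strand<> both provide running_in_this_thread()
      assert(check_executor.running_in_this_thread());
      return t;
   }
   T* operator->() { return &operator*(); }

private:
   Executor check_executor;
   T t;
};

// deduce the executor type so declarations stay close to the pseudocode above
template<typename T, typename Executor>
executor_validator<T, Executor> make_executor_validator(Executor ex) {
   return executor_validator<T, Executor>(std::move(ex));
}

// usage sketch:
//   auto queued_status_requests =
//      make_executor_validator<std::deque<bool>>(app().get_io_service().get_executor());
//   (*queued_status_requests).push_back(true); // asserts if on the wrong thread
```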
```cpp
fc_dlog(plugin.get_logger(), "the id for block ${block_num} in block request have_positions does not match the existing",
        ("block_num", cp.block_num));
}
catch(boost::system::system_error& e) {
```
FWIW, I broke out `boost::system::system_error` to be treated specially because the `what()` on it contains a lot of information -- even the filename. It's the difference between

```
state history connection from 127.0.0.1:55066 failed: End of file
```

vs

```
state history connection from 127.0.0.1:34910 failed: End of file [asio.misc:2 at /home/foo/leap/libraries/boost/libs/asio/include/boost/asio/detail/reactive_socket_recv_op.hpp:134:5 in function 'static void boost::asio::detail::reactive_socket_recv_op<MutableBufferSequence, Handler, IoExecutor>::do_complete(void*, boost::asio::detail::operation*, const boost::system::error_code&, std::size_t)']
```
there cannot be a session in connections before replay is completed because ship's plugin_startup() is where the listener is created
avoid a reorder warning that would require more significant changes
```diff
       create_listener<boost::asio::local::stream_protocol>(unix_path);
    }
- } catch (std::exception&) {
+ } catch(std::exception&) {
    FC_THROW_EXCEPTION(plugin_exception, "unable to open listen socket");
```
Can we log `e.what()` before re-throwing `plugin_exception`?
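i.e., something along these lines -- a sketch of the suggestion, not proposed verbatim; `logger` is a placeholder for whatever logger is in scope here:

```cpp
} catch(std::exception& e) {
   // log the underlying cause before replacing it with the generic exception
   fc_elog(logger, "unable to open listen socket: ${what}", ("what", e.what()));
   FC_THROW_EXCEPTION(plugin_exception, "unable to open listen socket");
}
```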
Interestingly, that error message is never seen; somewhere the exception seems to be swallowed. When I see an error here I see (which is good enough for now, imo):

```
warn  2024-06-11T00:51:24.439 nodeos    listener.hpp:202              operator()           ] unable to listen on 127.0.0.1:8080 resolved from 127.0.0.1:8080: bind: Address already in use [system:98 at /home/xxx/leap/libraries/boost/libs/asio/include/boost/asio/detail/reactive_socket_service.hpp:161:5 in function 'boost::system::error_code boost::asio::detail::reactive_socket_service<Protocol>::bind(implementation_type&, const endpoint_type&, boost::system::error_code&)']
error 2024-06-11T00:51:24.439 nodeos    listener.hpp:224              create_listener      ] none of the addresses resolved from 127.0.0.1:8080 can be listened to
info  2024-06-11T00:51:24.439 nodeos    main.cpp:165                  operator()           ] appbase quit called
...
info  2024-06-11T00:51:24.442 nodeos    main.cpp:241                  main                 ] nodeos successfully exiting
```

We can see that the creation of the listener logs the `elog` above and throws:

spring/libraries/libfc/include/fc/network/listener.hpp
Lines 223 to 226 in caa4780

```cpp
if (listened == 0) {
   fc_elog(logger, "none of the addresses resolved from ${addr} can be listened to", ("addr", address));
   throw std::system_error(std::make_error_code(std::errc::bad_address));
}
```

My initial impression is that `application_base::startup()` should have rethrown the `plugin_exception`, which would then land us in main, but only after `application_base::shutdown()` has its own opportunity to usurp the exception with something else that occurs on shutdown. That doesn't seem to be the case. appbase exceptions, ay ay ay 😱
I know review is winding down, but I'm adding some additional self-commentary in a few places.
```cpp
session_mgr.add_send_queue(std::move(self), std::move(entry_ptr));

void awake_if_idle() {
   boost::asio::dispatch(strand, [this]() {
      wake_timer.cancel_one();
```
This is a quirky pattern.

asio & coroutines are often demonstrated in use cases that I would call a sort of "1:1 synchronous usage" -- for example, echoing back what was sent, or replying to an HTTP request that was received. In those examples, a single `async_receive` is always followed by a single `async_send`. This makes the coroutine usage simple and elegant; it ends up as something like

```cpp
while(true) {
   co_await sock.async_receive(...);
   co_await sock.async_send(...);
}
```

But this is not the model of how state history's websocket protocol operates. It can stream a potentially unlimited number of `async_send`s from just a single `async_receive`; during the streaming of these `async_send`s it might need to "pause" sending until some event from our main thread occurs (i.e. a new block is applied); and it still must service new `async_receive`s during the streaming of data. And, remember, both the "send side" and "receive side" need to be on a single strand.

There is only one example of a similar communication pattern in the asio documentation: the coroutine-based chat server. That is where this quirky steady_timer-as-a-sort-of-condition-variable comes from.
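For reference, a trimmed-down, self-contained sketch of that chat-server trick -- the class and all names here are illustrative, not the PR's code:

```cpp
#include <boost/asio.hpp>
#include <chrono>
#include <deque>
#include <string>

using boost::asio::awaitable;
using boost::asio::use_awaitable;

class writer_session {
public:
   explicit writer_session(boost::asio::ip::tcp::socket socket)
      : socket_(std::move(socket)), wake_timer_(socket_.get_executor()) {
      // a timer that never fires on its own; cancel_one() acts as "notify_one"
      wake_timer_.expires_at(std::chrono::steady_clock::time_point::max());
   }

   // called on the same strand: queue a message and wake the writer coro
   void deliver(std::string msg) {
      message_queue_.push_back(std::move(msg));
      wake_timer_.cancel_one();
   }

   awaitable<void> writer() {
      while(socket_.is_open()) {
         if(message_queue_.empty()) {
            // sleep until deliver() cancels the timer; the wait completing
            // with operation_aborted is the expected wakeup path
            boost::system::error_code ec;
            co_await wake_timer_.async_wait(boost::asio::redirect_error(use_awaitable, ec));
         } else {
            co_await boost::asio::async_write(
               socket_, boost::asio::buffer(message_queue_.front()), use_awaitable);
            message_queue_.pop_front();
         }
      }
   }

private:
   boost::asio::ip::tcp::socket socket_;
   boost::asio::steady_timer wake_timer_;
   std::deque<std::string> message_queue_;
};
```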
```cmake
add_subdirectory(tests)

if( CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 11 )
   target_compile_options( state_history_plugin PRIVATE "-fcoroutines" )
endif()
```
The situation with gcc 10 and coroutines is certainly a bit... eyebrow-raising. So far I've not seen any indication that it is malfunctioning in our usage.
```cmake
target_include_directories( test_state_history PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include" )

add_test(test_state_history test_state_history)
set_property(TEST test_state_history PROPERTY LABELS nonparallelizable_tests)
```
I feel bad about the removal of these tests. Unfortunately many of them, due to how the mocking is structured, would need to be significantly rewritten. They also impose an additional 'non-natural' burden on the code, such as needing to support uncompressed logs -- something that never actually happens in practice.

I've added #230 to contemplate what to do about restoring these.

imo, the biggest loss is likely any tests around the nuanced semantics of how requests are handled -- for example, how `have_positions` operates. Otherwise, our integration tests bang on most aspects of ship's connection.

I took the current `have_positions` logic from 3.x, which might not be exactly how 4 or 5 performs it; I'm unsure. It's a little difficult to ascertain what clients expect with `have_positions` because I'm not aware of any client that uses that functionality.
```cpp
Executor strand;
coro_throwing_stream stream;
coro_nonthrowing_steadytimer wake_timer;
unsigned coros_running = 0;
```
I know the manual counting of how many coroutines are running, to then manually fire off a callback indicating that the `session` can be destroyed, looks suspect as not being very C++-ish. I tried some approaches with `shared_ptr` to do automatic cleanup, but all attempts resulted in very spooky crashes and/or sanitizer errors.

The manual counting is potentially error prone, but it allows crafting the rules of when the `session` is destroyed (critically, only when no coroutine's context is "alive" at all) very deliberately.
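Roughly, the shape is something like the following -- a hedged sketch with illustrative names (`session_coro_tracker`, `on_all_done`), not the actual session code:

```cpp
#include <cassert>
#include <functional>

struct session_coro_tracker {
   unsigned coros_running = 0;
   std::function<void()> on_all_done; // e.g. tells the session_manager to drop the session

   // call immediately before co_spawn()ing each coroutine
   void coro_started() { ++coros_running; }

   // call from each coroutine's completion handler, on the strand
   void coro_finished() {
      assert(coros_running);
      if(--coros_running == 0 && on_all_done)
         on_all_done(); // no coroutine frame references the session anymore
   }
};
```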
User Facing Changes
Add `get_status_request_v1` and `get_status_result_v1` for completeness and consistency. It's a minor thing; this just provides access to the range of blocks with finality data available.

Some state_history log messages have been removed, most notably the log that occurs on every block. That seemed rather unnecessary (and was not present in 2.0/3.1), but this change can certainly be revisited.
Internal Changes
The entirety of the connection code has been reworked to simplify the threading logic. This new design makes all decisions about what blocks to send on the main thread, and then conveys that information (while holding appropriate handles) to the ship thread which does the log reading, decompression, and websocket operations. This rework resolves #275 because the prior implementation would sometimes not send forks properly due to thread timing.
Unfortunately, with the way the log implementation relies on mutexes, this implementation is subject to deadlocks. It must be taken as a pair with AntelopeIO/leap#237; the PRs are split chiefly to reduce potential review burden. (I had hoped this PR would pass tests with the ship_streamer test remaining disabled, but it seems that `ship_reqs_across_svnn_test` is running afoul of the deadlock.)

This PR contains the exact same commits as AntelopeIO/leap#96, up until the point of an accidental push mishap.