From 7aeb0b1877490a3f94de721bf2097bfcdffda60b Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 30 Dec 2021 11:10:45 +0000 Subject: [PATCH 1/6] Clean up walproposer states --- src/backend/replication/walproposer.c | 878 ++++++++++---------- src/backend/replication/walproposer_utils.c | 43 +- src/include/replication/walproposer.h | 43 +- 3 files changed, 459 insertions(+), 505 deletions(-) diff --git a/src/backend/replication/walproposer.c b/src/backend/replication/walproposer.c index b307c79177d..e0d7d179049 100644 --- a/src/backend/replication/walproposer.c +++ b/src/backend/replication/walproposer.c @@ -110,7 +110,15 @@ static void ShutdownConnection(WalKeeper *wk); static void ResetConnection(WalKeeper *wk); static long TimeToReconnect(TimestampTz now); static void ReconnectWalKeepers(void); -static void AdvancePollState(int i, uint32 events); +static void AdvancePollState(WalKeeper *wk, uint32 events); +static void HandleConnectionEvent(WalKeeper *wk); +static void SendStartWALPush(WalKeeper *wk); +static void RecvStartWALPushResult(WalKeeper *wk); +static void SendProposerGreeting(WalKeeper *wk); +static void RecvAcceptorGreeting(WalKeeper *wk); +static void SendVoteRequest(WalKeeper *wk); +static void RecvVoteResponse(WalKeeper *wk); +static void HandleElectedProposer(void); static term_t GetHighestTerm(TermHistory *th); static term_t GetEpoch(WalKeeper *wk); static void DetermineEpochStartLsn(void); @@ -129,10 +137,10 @@ static XLogRecPtr CalculateDiskConsistentLsn(void); static XLogRecPtr CalculateMinFlushLsn(void); static XLogRecPtr GetAcknowledgedByQuorumWALPosition(void); static void HandleWalKeeperResponse(void); -static bool AsyncRead(int i, char **buf, int *buf_size); -static bool AsyncReadFixed(int i, void *value, size_t value_size); -static bool AsyncReadMessage(int i, AcceptorProposerMessage *anymsg); -static bool BlockingWrite(int i, void *msg, size_t msg_size, WalKeeperState success_state); +static bool AsyncRead(WalKeeper *wk, char 
**buf, int *buf_size); +static bool AsyncReadFixed(WalKeeper *wk, void *value, size_t value_size); +static bool AsyncReadMessage(WalKeeper *wk, AcceptorProposerMessage *anymsg); +static bool BlockingWrite(WalKeeper *wk, void *msg, size_t msg_size, WalKeeperState success_state); static bool AsyncWrite(WalKeeper *wk, void *msg, size_t msg_size, WalKeeperState flush_state); static bool AsyncFlush(WalKeeper *wk); @@ -250,21 +258,19 @@ WalProposerPoll(void) { WalKeeper *wk; int rc; - int i; WaitEvent event; TimestampTz now = GetCurrentTimestamp(); rc = WaitEventSetWait(waitEvents, TimeToReconnect(now), &event, 1, WAIT_EVENT_WAL_SENDER_MAIN); wk = (WalKeeper *) event.user_data; - i = (int) (wk - walkeeper); /* * If the event contains something that one of our walkeeper states * was waiting for, we'll advance its state. */ if (rc != 0 && (event.events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))) - AdvancePollState(i, event.events); + AdvancePollState(wk, event.events); /* * If the timeout expired, attempt to reconnect to any walkeepers that @@ -640,455 +646,454 @@ ReconnectWalKeepers(void) } /* - * Performs the logic for advancing the state machine of the 'i'th walkeeper, + * Performs the logic for advancing the state machine of the specified walkeeper, * given that a certain set of events has occured. */ static void -AdvancePollState(int i, uint32 events) +AdvancePollState(WalKeeper *wk, uint32 events) { - WalKeeper *wk = &walkeeper[i]; /* - * Keep advancing the state while either: (a) the event is still - * unprocessed (usually because it's the first iteration of the loop), or - * (b) the state can execute, and does not need to wait for any socket - * events + * Sanity check. We assume further down that the operations don't + * block because the socket is ready. */ - while (events || StateShouldImmediatelyExecute(wk->state)) + AssertEventsOkForState(events, wk); + + /* Execute the code corresponding to the current state */ + switch (wk->state) { - /* - * Sanity check. 
We assume further down that the operations don't - * block because the socket is ready. - */ - AssertEventsOkForState(events, wk); + /* + * WAL keepers are only taken out of SS_OFFLINE by calls to + * ResetConnection + */ + case SS_OFFLINE: + elog(FATAL, "Unexpected walkeeper %s:%s state advancement: is offline", + wk->host, wk->port); + break; /* actually unreachable, but prevents + * -Wimplicit-fallthrough */ - /* Execute the code corresponding to the current state */ - switch (wk->state) - { - /* - * WAL keepers are only taken out of SS_OFFLINE by calls to - * ResetConnection - */ - case SS_OFFLINE: - elog(FATAL, "Unexpected walkeeper %s:%s state advancement: is offline", - wk->host, wk->port); - break; /* actually unreachable, but prevents - * -Wimplicit-fallthrough */ - - /* - * Both connecting states run the same logic. The only - * difference is the events they're expecting - */ - case SS_CONNECTING_READ: - case SS_CONNECTING_WRITE: - { - WalProposerConnectPollStatusType result = walprop_connect_poll(wk->conn); - - /* The new set of events we'll wait on, after updating */ - uint32 new_events = WL_NO_EVENTS; - - switch (result) - { - case WP_CONN_POLLING_OK: - elog(LOG, "connected with node %s:%s", wk->host, - wk->port); - - /* - * Once we're fully connected, we can move to the - * next state - */ - wk->state = SS_EXEC_STARTWALPUSH; - - /* - * Even though SS_EXEC_STARTWALPUSH doesn't wait - * on anything, we do need to replace the current - * event, so we have to just pick something. We'll - * eventually need the socket to be readable, so - * we go with that. 
- */ - new_events = WL_SOCKET_READABLE; - break; - - /* - * If we need to poll to finish connecting, - * continue doing that - */ - case WP_CONN_POLLING_READING: - wk->state = SS_CONNECTING_READ; - new_events = WL_SOCKET_READABLE; - break; - case WP_CONN_POLLING_WRITING: - wk->state = SS_CONNECTING_WRITE; - new_events = WL_SOCKET_WRITEABLE; - break; - - case WP_CONN_POLLING_FAILED: - elog(WARNING, "Failed to connect to node '%s:%s': %s", - wk->host, wk->port, walprop_error_message(wk->conn)); - - /* - * If connecting failed, we don't want to restart - * the connection because that might run us into a - * loop. Instead, shut it down -- it'll naturally - * restart at a slower interval on calls to - * ReconnectWalKeepers. - */ - ShutdownConnection(wk); - return; - } - - /* - * Because PQconnectPoll can change the socket, we have to - * un-register the old event and re-register an event on - * the new socket. - */ - HackyRemoveWalProposerEvent(wk); - wk->eventPos = AddWaitEventToSet(waitEvents, new_events, walprop_socket(wk->conn), NULL, wk); - break; - } - - /* - * Send "START_WAL_PUSH" command to the walkeeper. 
After - * sending, wait for response with SS_WAIT_EXEC_RESULT - */ - case SS_EXEC_STARTWALPUSH: - { - char *query = NULL; - if (zenith_pageserver_connstring_walproposer != NULL) { - query = psprintf("START_WAL_PUSH %s", zenith_pageserver_connstring_walproposer); - } else { - query = psprintf("START_WAL_PUSH"); - } - if (!walprop_send_query(wk->conn, query)) - { - pfree(query); - elog(WARNING, "Failed to send 'START_WAL_PUSH' query to walkeeper %s:%s: %s", - wk->host, wk->port, walprop_error_message(wk->conn)); - ShutdownConnection(wk); - return; - } - pfree(query); - wk->state = SS_WAIT_EXEC_RESULT; - UpdateEventSet(wk, WL_SOCKET_READABLE); - break; - } - - case SS_WAIT_EXEC_RESULT: - switch (walprop_get_query_result(wk->conn)) - { - /* - * Successful result, move on to starting the - * handshake - */ - case WP_EXEC_SUCCESS_COPYBOTH: - - /* - * Because this state is immediately executable, we'll - * start this on the next iteration of the loop - */ - wk->state = SS_HANDSHAKE_SEND; - break; - - /* - * Needs repeated calls to finish. Wait until the - * socket is readable - */ - case WP_EXEC_NEEDS_INPUT: - - /* - * SS_WAIT_EXEC_RESULT is always reached through an - * event, so we don't need to update the event set - */ - break; - - case WP_EXEC_FAILED: - elog(WARNING, "Failed to send query to walkeeper %s:%s: %s", - wk->host, wk->port, walprop_error_message(wk->conn)); - ShutdownConnection(wk); - return; - - /* - * Unexpected result -- funamdentally an error, but we - * want to produce a custom message, rather than a - * generic "something went wrong" - */ - case WP_EXEC_UNEXPECTED_SUCCESS: - elog(WARNING, "Received bad response from walkeeper %s:%s query execution", - wk->host, wk->port); - ShutdownConnection(wk); - return; - } - break; - - /* - * Start handshake: first of all send information about the - * WAL keeper. After sending, we wait on SS_HANDSHAKE_RECV for - * a response to finish the handshake. 
- */ - case SS_HANDSHAKE_SEND: - - /* - * On failure, logging & resetting the connection is handled. - * We just need to handle the control flow. - */ - if (!BlockingWrite(i, &proposerGreeting, sizeof(proposerGreeting), SS_HANDSHAKE_RECV)) - return; + /* + * Both connecting states run the same logic. The only + * difference is the events they're expecting + */ + case SS_CONNECTING_READ: + case SS_CONNECTING_WRITE: + HandleConnectionEvent(wk); + break; + + /* + * Waiting for a successful CopyBoth response. + */ + case SS_WAIT_EXEC_RESULT: + RecvStartWALPushResult(wk); + break; + + /* + * Finish handshake comms: receive information about the safekeeper. + */ + case SS_HANDSHAKE_RECV: + RecvAcceptorGreeting(wk); + break; + + /* + * Voting is an idle state - we don't expect any events to trigger. + * Refer to the execution of SS_HANDSHAKE_RECV to see how nodes are + * transferred from SS_VOTING to sending actual vote requests. + */ + case SS_VOTING: + elog(WARNING, "EOF from node %s:%s in %s state", wk->host, + wk->port, FormatWalKeeperState(wk->state)); + ResetConnection(wk); + return; + + /* Read the safekeeper response for our candidate */ + case SS_WAIT_VERDICT: + RecvVoteResponse(wk); + break; - break; + /* Flush proposer announcement message */ + case SS_SEND_ELECTED_FLUSH: - /* - * Finish handshake comms: receive information about the WAL - * keeper - */ - case SS_HANDSHAKE_RECV: + /* + * AsyncFlush ensures we only move on to SS_ACTIVE once the flush + * completes. If we still have more to do, we'll wait until the next + * poll comes along. + */ + if (!AsyncFlush(wk)) + return; + + StartStreaming(wk); + break; + + /* + * Idle state for waiting votes from quorum. + */ + case SS_IDLE: + elog(WARNING, "EOF from node %s:%s in %s state", wk->host, + wk->port, FormatWalKeeperState(wk->state)); + ResetConnection(wk); + return; - /* - * If our reading doesn't immediately succeed, any necessary - * error handling or state setting is taken care of. 
We can - * leave any other work until later. - */ - if (!AsyncReadFixed(i, &wk->greet, sizeof(wk->greet))) + /* + * Active state is used for streaming WAL and receiving feedback. + */ + case SS_ACTIVE: + if (events & WL_SOCKET_WRITEABLE) + if (!SendAppendRequests(wk)) return; - /* Protocol is all good, move to voting. */ - wk->state = SS_VOTING; - - /* - * Don't need to update the event set yet. Either we update - * the event set to WL_SOCKET_READABLE *or* we change the - * state to SS_SEND_VOTE in the loop below - */ - UpdateEventSet(wk, WL_SOCKET_READABLE); - wk->feedback.flushLsn = truncateLsn; - wk->feedback.hs.ts = 0; - - /* - * We want our term to be highest and unique, so choose max - * and +1 once we have majority. - */ - propTerm = Max(walkeeper[i].greet.term, propTerm); - - /* - * Check if we have quorum. If there aren't enough walkeepers, - * wait and do nothing. We'll eventually get a task when the - * election starts. - * - * If we do have quorum, we can start an election - */ - if (++n_connected < quorum) - { - /* - * SS_VOTING is an idle state; read-ready indicates the - * connection closed. - */ - UpdateEventSet(wk, WL_SOCKET_READABLE); - } - else - { - if (n_connected == quorum) - { - propTerm++; - /* prepare voting message */ - voteRequest = (VoteRequest) - { - .tag = 'v', - .term = propTerm - }; - memcpy(voteRequest.proposerId.data, proposerGreeting.proposerId.data, UUID_LEN); - } - - /* - * Now send voting request to the cohort and wait - * responses - */ - for (int j = 0; j < n_walkeepers; j++) - { - /* - * Remember: SS_VOTING indicates that the walkeeper is - * participating in voting, but hasn't sent anything - * yet. The ones that have sent something are given - * SS_SEND_VOTE or SS_WAIT_VERDICT. 
- */ - if (walkeeper[j].state == SS_VOTING) - { - walkeeper[j].state = SS_SEND_VOTE; - /* Immediately send info */ - AdvancePollState(j, WL_NO_EVENTS); - } - } - } - break; - - /* - * Voting is an idle state - we don't expect any events to - * trigger. Refer to the execution of SS_HANDSHAKE_RECV to see - * how nodes are transferred from SS_VOTING to SS_SEND_VOTE. - */ - case SS_VOTING: - elog(WARNING, "EOF from node %s:%s in %s state", wk->host, - wk->port, FormatWalKeeperState(wk->state)); - ResetConnection(wk); - break; - - /* We have quorum for voting, send our vote request */ - case SS_SEND_VOTE: - elog(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, wk->host, wk->port, voteRequest.term); - /* On failure, logging & resetting is handled */ - if (!BlockingWrite(i, &voteRequest, sizeof(voteRequest), SS_WAIT_VERDICT)) + if (events & WL_SOCKET_READABLE) + if (!RecvAppendResponses(wk)) return; - /* If successful, wait for read-ready with SS_WAIT_VERDICT */ - break; + UpdateEventSet(wk, WL_SOCKET_READABLE | (wk->currMsg == NULL ? 0 : WL_SOCKET_WRITEABLE)); + break; + } +} - /* Start reading the walkeeper response for our candidate */ - case SS_WAIT_VERDICT: - wk->voteResponse.apm.tag = 'v'; - if (!AsyncReadMessage(i, (AcceptorProposerMessage *) &wk->voteResponse)) - return; +static void +HandleConnectionEvent(WalKeeper *wk) +{ + WalProposerConnectPollStatusType result = walprop_connect_poll(wk->conn); - elog(LOG, - "got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X", - wk->host, wk->port, wk->voteResponse.voteGiven, GetHighestTerm(&wk->voteResponse.termHistory), - LSN_FORMAT_ARGS(wk->voteResponse.flushLsn), - LSN_FORMAT_ARGS(wk->voteResponse.truncateLsn)); - - /* - * In case of acceptor rejecting our vote, bail out, but only - * if either it already lives in strictly higher term - * (concurrent compute spotted) or we are not elected yet and - * thus need the vote. 
- */ - if ((!wk->voteResponse.voteGiven) && - (wk->voteResponse.term > propTerm || n_votes < quorum)) - { - elog(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", - wk->host, wk->port, - wk->voteResponse.term, propTerm); - } - Assert(wk->voteResponse.term == propTerm); - - /* Handshake completed, do we have quorum? */ - n_votes++; - if (n_votes < quorum) - { - wk->state = SS_IDLE; /* can't do much yet, no quorum */ - } - else if (n_votes > quorum) - { - - /* recovery already performed, just start streaming */ - SendProposerElected(wk); - } - else - { - wk->state = SS_IDLE; - UpdateEventSet(wk, WL_SOCKET_READABLE); /* Idle states wait for - * read-ready */ - - DetermineEpochStartLsn(); - - /* - * Check if not all safekeepers are up-to-date, we need to - * download WAL needed to synchronize them - */ - if (truncateLsn < propEpochStartLsn) - { - elog(LOG, - "start recovery because truncateLsn=%X/%X is not " - "equal to epochStartLsn=%X/%X", - LSN_FORMAT_ARGS(truncateLsn), - LSN_FORMAT_ARGS(propEpochStartLsn)); - /* Perform recovery */ - if (!WalProposerRecovery(donor, proposerGreeting.timeline, truncateLsn, propEpochStartLsn)) - elog(FATAL, "Failed to recover state"); - } - else if (syncSafekeepers) - { - /* Sync is not needed: just exit */ - fprintf(stdout, "%X/%X\n", LSN_FORMAT_ARGS(propEpochStartLsn)); - exit(0); - } - - for (int i = 0; i < n_walkeepers; i++) - { - if (walkeeper[i].state == SS_IDLE) - SendProposerElected(&walkeeper[i]); - } - - /* - * The proposer has been elected, and there will be no quorum waiting - * after this point. There will be no safekeeper with state SS_IDLE - * also, because that state is used only for quorum waiting. 
- */ - - if (syncSafekeepers) - { - /* - * Queue empty message to enforce receiving feedback - * even from nodes who are fully recovered; this is - * required to learn they switched epoch which finishes - * sync-safeekepers who doesn't generate any real new - * records. Will go away once we switch to async acks. - */ - BroadcastMessage(CreateMessageCommitLsnOnly(propEpochStartLsn)); - - /* keep polling until all walkeepers are synced */ - return; - } - - WalProposerStartStreaming(propEpochStartLsn); - /* Should not return here */ - } - - break; + /* The new set of events we'll wait on, after updating */ + uint32 new_events = WL_NO_EVENTS; - /* Flush proposer announcement message */ - case SS_SEND_ELECTED_FLUSH: - - /* - * AsyncFlush ensures we only move on to SS_RECV_FEEDBACK once - * the flush completes. If we still have more to do, we'll - * wait until the next poll comes along. - */ - if (!AsyncFlush(wk)) - return; - - StartStreaming(wk); + switch (result) + { + case WP_CONN_POLLING_OK: + elog(LOG, "connected with node %s:%s", wk->host, + wk->port); - break; + /* + * We have to pick some event to update event set. + * We'll eventually need the socket to be readable, + * so we go with that. + */ + new_events = WL_SOCKET_READABLE; + break; + /* + * If we need to poll to finish connecting, + * continue doing that + */ + case WP_CONN_POLLING_READING: + wk->state = SS_CONNECTING_READ; + new_events = WL_SOCKET_READABLE; + break; + case WP_CONN_POLLING_WRITING: + wk->state = SS_CONNECTING_WRITE; + new_events = WL_SOCKET_WRITEABLE; + break; - /* - * Idle state for sending WAL. 
Moved out only by calls to - * SendMessageToNode - */ - case SS_IDLE: - elog(WARNING, "EOF from node %s:%s in %s state", wk->host, - wk->port, FormatWalKeeperState(wk->state)); - ResetConnection(wk); - break; + case WP_CONN_POLLING_FAILED: + elog(WARNING, "Failed to connect to node '%s:%s': %s", + wk->host, wk->port, walprop_error_message(wk->conn)); + /* + * If connecting failed, we don't want to restart + * the connection because that might run us into a + * loop. Instead, shut it down -- it'll naturally + * restart at a slower interval on calls to + * ReconnectWalKeepers. + */ + ShutdownConnection(wk); + return; + } - case SS_ACTIVE: - if (events & WL_SOCKET_WRITEABLE) - if (!SendAppendRequests(wk)) - return; + /* + * Because PQconnectPoll can change the socket, we have to + * un-register the old event and re-register an event on + * the new socket. + */ + HackyRemoveWalProposerEvent(wk); + wk->eventPos = AddWaitEventToSet(waitEvents, new_events, walprop_socket(wk->conn), NULL, wk); - if (events & WL_SOCKET_READABLE) - if (!RecvAppendResponses(wk)) - return; + /* If we successfully connected, send START_WAL_PUSH query */ + if (result == WP_CONN_POLLING_OK) + SendStartWALPush(wk); +} - UpdateEventSet(wk, WL_SOCKET_READABLE | (wk->currMsg == NULL ? 0 : WL_SOCKET_WRITEABLE)); - break; +/* + * Send "START_WAL_PUSH" message as an empty query to the walkeeper. Performs + * a blocking send, then immediately moves to SS_WAIT_EXEC_RESULT. If something + * goes wrong, change state to SS_OFFLINE and shutdown the connection. 
+ */ +static void +SendStartWALPush(WalKeeper *wk) +{ + char *query = NULL; + if (zenith_pageserver_connstring_walproposer != NULL) { + query = psprintf("START_WAL_PUSH %s", zenith_pageserver_connstring_walproposer); + } else { + query = psprintf("START_WAL_PUSH"); + } + if (!walprop_send_query(wk->conn, query)) + { + pfree(query); + elog(WARNING, "Failed to send 'START_WAL_PUSH' query to walkeeper %s:%s: %s", + wk->host, wk->port, walprop_error_message(wk->conn)); + ShutdownConnection(wk); + return; + } + pfree(query); + wk->state = SS_WAIT_EXEC_RESULT; + UpdateEventSet(wk, WL_SOCKET_READABLE); +} + +static void +RecvStartWALPushResult(WalKeeper *wk) +{ + switch (walprop_get_query_result(wk->conn)) + { + /* + * Successful result, move on to starting the + * handshake + */ + case WP_EXEC_SUCCESS_COPYBOTH: + + SendProposerGreeting(wk); + break; + + /* + * Needs repeated calls to finish. Wait until the + * socket is readable + */ + case WP_EXEC_NEEDS_INPUT: + + /* + * SS_WAIT_EXEC_RESULT is always reached through an + * event, so we don't need to update the event set + */ + break; + + case WP_EXEC_FAILED: + elog(WARNING, "Failed to send query to walkeeper %s:%s: %s", + wk->host, wk->port, walprop_error_message(wk->conn)); + ShutdownConnection(wk); + return; + + /* + * Unexpected result -- fundamentally an error, but we + * want to produce a custom message, rather than a + * generic "something went wrong" + */ + case WP_EXEC_UNEXPECTED_SUCCESS: + elog(WARNING, "Received bad response from walkeeper %s:%s query execution", + wk->host, wk->port); + ShutdownConnection(wk); + return; + } +} + +/* + * Start handshake: first of all send information about the + * WAL keeper. After sending, we wait on SS_HANDSHAKE_RECV for + * a response to finish the handshake. + */ +static void +SendProposerGreeting(WalKeeper *wk) +{ + /* + * On failure, logging & resetting the connection is handled. + * We just need to handle the control flow. 
+ */ + BlockingWrite(wk, &proposerGreeting, sizeof(proposerGreeting), SS_HANDSHAKE_RECV); +} + +static void +RecvAcceptorGreeting(WalKeeper *wk) +{ + /* + * If our reading doesn't immediately succeed, any necessary + * error handling or state setting is taken care of. We can + * leave any other work until later. + */ + if (!AsyncReadFixed(wk, &wk->greet, sizeof(wk->greet))) + return; + + /* Protocol is all good, move to voting. */ + wk->state = SS_VOTING; + wk->feedback.flushLsn = truncateLsn; + wk->feedback.hs.ts = 0; + + /* + * We want our term to be highest and unique, so choose max + * and +1 once we have majority. + */ + propTerm = Max(wk->greet.term, propTerm); + + /* + * Check if we have quorum. If there aren't enough safekeepers, + * wait and do nothing. We'll eventually get a task when the + * election starts. + * + * If we do have quorum, we can start an election + */ + if (++n_connected < quorum) + { + /* + * SS_VOTING is an idle state; read-ready indicates the + * connection closed. + */ + UpdateEventSet(wk, WL_SOCKET_READABLE); + } + else + { + if (n_connected == quorum) + { + propTerm++; + /* prepare voting message */ + voteRequest = (VoteRequest) + { + .tag = 'v', + .term = propTerm + }; + memcpy(voteRequest.proposerId.data, proposerGreeting.proposerId.data, UUID_LEN); } /* - * We've already done something for these events - don't attempt more - * states than we need to. + * Now send voting request to the cohort and wait + * responses */ - events = WL_NO_EVENTS; + for (int j = 0; j < n_walkeepers; j++) + { + /* + * Remember: SS_VOTING indicates that the safekeeper is + * participating in voting, but hasn't sent anything + * yet. 
+ */ + if (walkeeper[j].state == SS_VOTING) + SendVoteRequest(&walkeeper[j]); + } + } +} + +static void +SendVoteRequest(WalKeeper *wk) +{ + /* We have quorum for voting, send our vote request */ + elog(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, wk->host, wk->port, voteRequest.term); + /* On failure, logging & resetting is handled */ + if (!BlockingWrite(wk, &voteRequest, sizeof(voteRequest), SS_WAIT_VERDICT)) + return; + + /* If successful, wait for read-ready with SS_WAIT_VERDICT */ +} + +static void +RecvVoteResponse(WalKeeper *wk) +{ + wk->voteResponse.apm.tag = 'v'; + if (!AsyncReadMessage(wk, (AcceptorProposerMessage *) &wk->voteResponse)) + return; + + elog(LOG, + "got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X", + wk->host, wk->port, wk->voteResponse.voteGiven, GetHighestTerm(&wk->voteResponse.termHistory), + LSN_FORMAT_ARGS(wk->voteResponse.flushLsn), + LSN_FORMAT_ARGS(wk->voteResponse.truncateLsn)); + + /* + * In case of acceptor rejecting our vote, bail out, but only + * if either it already lives in strictly higher term + * (concurrent compute spotted) or we are not elected yet and + * thus need the vote. + */ + if ((!wk->voteResponse.voteGiven) && + (wk->voteResponse.term > propTerm || n_votes < quorum)) + { + elog(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", + wk->host, wk->port, + wk->voteResponse.term, propTerm); + } + Assert(wk->voteResponse.term == propTerm); + + /* Handshake completed, do we have quorum? 
*/ + n_votes++; + if (n_votes < quorum) + { + wk->state = SS_IDLE; /* can't do much yet, no quorum */ + } + else if (n_votes > quorum) + { + /* recovery already performed, just start streaming */ + SendProposerElected(wk); + } + else + { + wk->state = SS_IDLE; + UpdateEventSet(wk, WL_SOCKET_READABLE); /* Idle states wait for + * read-ready */ + + HandleElectedProposer(); } } +/* + * Called once a majority of acceptors have voted for us and current proposer + * has been elected. + * + * Sends ProposerElected message to all acceptors in SS_IDLE state and starts + * replication from walsender. + */ +static void +HandleElectedProposer(void) +{ + DetermineEpochStartLsn(); + + /* + * Check if not all safekeepers are up-to-date, we need to + * download WAL needed to synchronize them + */ + if (truncateLsn < propEpochStartLsn) + { + elog(LOG, + "start recovery because truncateLsn=%X/%X is not " + "equal to epochStartLsn=%X/%X", + LSN_FORMAT_ARGS(truncateLsn), + LSN_FORMAT_ARGS(propEpochStartLsn)); + /* Perform recovery */ + if (!WalProposerRecovery(donor, proposerGreeting.timeline, truncateLsn, propEpochStartLsn)) + elog(FATAL, "Failed to recover state"); + } + else if (syncSafekeepers) + { + /* Sync is not needed: just exit */ + fprintf(stdout, "%X/%X\n", LSN_FORMAT_ARGS(propEpochStartLsn)); + exit(0); + } + + for (int i = 0; i < n_walkeepers; i++) + { + if (walkeeper[i].state == SS_IDLE) + SendProposerElected(&walkeeper[i]); + } + + /* + * The proposer has been elected, and there will be no quorum waiting + * after this point. There will be no safekeeper with state SS_IDLE + * also, because that state is used only for quorum waiting. + */ + + if (syncSafekeepers) + { + /* + * Queue empty message to enforce receiving feedback + * even from nodes who are fully recovered; this is + * required to learn they switched epoch which finishes + * sync-safekeepers, which doesn't generate any real new + * records. Will go away once we switch to async acks. 
+ */ + BroadcastMessage(CreateMessageCommitLsnOnly(propEpochStartLsn)); + + /* keep polling until all walkeepers are synced */ + return; + } + + WalProposerStartStreaming(propEpochStartLsn); + /* Should not return here */ +} + /* latest term in TermHistory, or 0 is there is no entries */ static term_t GetHighestTerm(TermHistory *th) @@ -1404,8 +1409,7 @@ StartStreaming(WalKeeper *wk) /* * Start sending message to the particular node. * - * Always updates the state and event set for the WAL keeper; setting either of - * these before calling would be redundant work. + * Can be used only for safekeepers in SS_ACTIVE state. */ static void SendMessageToNode(int i, WalMessage *msg) @@ -1414,6 +1418,7 @@ SendMessageToNode(int i, WalMessage *msg) /* we shouldn't be already sending something */ Assert(wk->currMsg == NULL); + Assert(wk->state == SS_ACTIVE); /* * Skip already acknowledged messages. Used after reconnection to get to @@ -1649,7 +1654,7 @@ RecvAppendResponses(WalKeeper *wk) * necessary error handling or state setting is taken care * of. We can leave any other work until later. */ - if (!AsyncReadFixed(wki, &wk->feedback, sizeof(wk->feedback))) + if (!AsyncReadFixed(wk, &wk->feedback, sizeof(wk->feedback))) break; Assert(wk->ackMsg != NULL && (wk->ackMsg->ackMask & (1 << wki)) == 0); @@ -1911,10 +1916,8 @@ HandleWalKeeperResponse(void) * failure. */ static bool -AsyncRead(int i, char **buf, int *buf_size) +AsyncRead(WalKeeper *wk, char **buf, int *buf_size) { - WalKeeper *wk = &walkeeper[i]; - switch (walprop_async_read(wk->conn, buf, buf_size)) { case PG_ASYNC_READ_SUCCESS: @@ -1944,13 +1947,12 @@ AsyncRead(int i, char **buf, int *buf_size) * failed, a warning is emitted and the connection is reset. 
*/ static bool -AsyncReadFixed(int i, void *value, size_t value_size) +AsyncReadFixed(WalKeeper *wk, void *value, size_t value_size) { - WalKeeper *wk = &walkeeper[i]; char *buf = NULL; int buf_size = -1; - if (!(AsyncRead(i, &buf, &buf_size))) + if (!(AsyncRead(wk, &buf, &buf_size))) return false; /* @@ -1977,15 +1979,14 @@ AsyncReadFixed(int i, void *value, size_t value_size) * TODO: migrate AsyncReadFixed here for all messages */ static bool -AsyncReadMessage(int i, AcceptorProposerMessage *anymsg) +AsyncReadMessage(WalKeeper *wk, AcceptorProposerMessage *anymsg) { - WalKeeper *wk = &walkeeper[i]; char *buf; int buf_size; uint64 tag; StringInfoData s; - if (!(AsyncRead(i, &buf, &buf_size))) + if (!(AsyncRead(wk, &buf, &buf_size))) return false; /* parse it */ @@ -2038,9 +2039,8 @@ AsyncReadMessage(int i, AcceptorProposerMessage *anymsg) * single packet. */ static bool -BlockingWrite(int i, void *msg, size_t msg_size, WalKeeperState success_state) +BlockingWrite(WalKeeper *wk, void *msg, size_t msg_size, WalKeeperState success_state) { - WalKeeper *wk = &walkeeper[i]; uint32 events; if (!walprop_blocking_write(wk->conn, msg, msg_size)) diff --git a/src/backend/replication/walproposer_utils.c b/src/backend/replication/walproposer_utils.c index c61ab87db45..0c1578b46ec 100644 --- a/src/backend/replication/walproposer_utils.c +++ b/src/backend/replication/walproposer_utils.c @@ -48,24 +48,15 @@ FormatWalKeeperState(WalKeeperState state) case SS_CONNECTING_WRITE: return_val = "connecting"; break; - case SS_EXEC_STARTWALPUSH: - return_val = "sending 'START_WAL_PUSH' query"; - break; case SS_WAIT_EXEC_RESULT: return_val = "receiving query result"; break; - case SS_HANDSHAKE_SEND: - return_val = "handshake (sending)"; - break; case SS_HANDSHAKE_RECV: return_val = "handshake (receiving)"; break; case SS_VOTING: return_val = "voting"; break; - case SS_SEND_VOTE: - return_val = "sending vote"; - break; case SS_WAIT_VERDICT: return_val = "wait-for-verdict"; break; @@ 
-140,19 +131,6 @@ WalKeeperStateDesiredEvents(WalKeeperState state) result = WL_SOCKET_READABLE; break; - /* Most writing states don't require any socket conditions */ - case SS_EXEC_STARTWALPUSH: - case SS_HANDSHAKE_SEND: - case SS_SEND_VOTE: - result = WL_NO_EVENTS; - break; - /* but flushing does require read- or write-ready */ - case SS_SEND_ELECTED_FLUSH: - /* Active state does both reading and writing to the socket */ - case SS_ACTIVE: - result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; - break; - /* Idle states use read-readiness as a sign that the connection has been * disconnected. */ case SS_VOTING: @@ -160,6 +138,15 @@ WalKeeperStateDesiredEvents(WalKeeperState state) result = WL_SOCKET_READABLE; break; + /* + * Flush states require write-ready for flushing. + * Active state does both reading and writing. + */ + case SS_SEND_ELECTED_FLUSH: + case SS_ACTIVE: + result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; + break; + /* The offline state expects no events. */ case SS_OFFLINE: result = WL_NO_EVENTS; @@ -169,16 +156,6 @@ WalKeeperStateDesiredEvents(WalKeeperState state) return result; } -/* Returns whether the WAL keeper state corresponds to something that should be - * immediately executed -- i.e. it is not idle, and is not currently waiting. */ -bool -StateShouldImmediatelyExecute(WalKeeperState state) -{ - /* This is actually pretty simple to determine. 
*/ - return WalKeeperStateDesiredEvents(state) == WL_NO_EVENTS - && state != SS_OFFLINE; -} - /* Returns a human-readable string corresponding to the event set * * If the events do not correspond to something set as the `events` field of a `WaitEvent`, the @@ -309,4 +286,4 @@ pq_sendint64_le(StringInfo buf, uint64 i) enlargeStringInfo(buf, sizeof(uint64)); memcpy(buf->data + buf->len, &i, sizeof(uint64)); buf->len += sizeof(uint64); -} \ No newline at end of file +} diff --git a/src/include/replication/walproposer.h b/src/include/replication/walproposer.h index ca27df2d19b..45f00cc5c40 100644 --- a/src/include/replication/walproposer.h +++ b/src/include/replication/walproposer.h @@ -68,20 +68,12 @@ typedef enum } PGAsyncWriteResult; /* - * WAL safekeeper state + * WAL safekeeper state, which is used to wait for some event. * * States are listed here in the order that they're executed. * * Most states, upon failure, will move back to SS_OFFLINE by calls to * ResetConnection or ShutdownConnection. - * - * Also note: In places we say that a state "immediately" moves to another. This - * happens in states that only exist to execute program logic, so they run - * exactly once (when moved into), without waiting for any socket conditions. - * - * For example, when we set a WalKeeper's state to SS_SEND_VOTE, we immediately - * call AdvancePollState - during which the WalKeeper switches its state to - * SS_WAIT_VERDICT. */ typedef enum { @@ -99,28 +91,18 @@ typedef enum * they execute when polled, but we have this distinction in order to * recreate the event set in HackyRemoveWalProposerEvent. * - * After the connection is made, moves to SS_EXEC_STARTWALPUSH. + * After the connection is made, "START_WAL_PUSH" query is sent. */ SS_CONNECTING_WRITE, SS_CONNECTING_READ, - /* - * Sending the "START_WAL_PUSH" message as an empty query to the walkeeper. - * Performs a blocking send, then immediately moves to SS_WAIT_EXEC_RESULT. 
- */ - SS_EXEC_STARTWALPUSH, /* * Waiting for the result of the "START_WAL_PUSH" command. * - * After we get a successful result, moves to SS_HANDSHAKE_SEND. + * After we get a successful result, sends handshake to safekeeper. */ SS_WAIT_EXEC_RESULT, - /* - * Executing the sending half of the handshake. Performs the blocking send, - * then immediately moves to SS_HANDSHAKE_RECV. - */ - SS_HANDSHAKE_SEND, /* * Executing the receiving half of the handshake. After receiving, moves to * SS_VOTING. @@ -128,32 +110,28 @@ typedef enum SS_HANDSHAKE_RECV, /* - * Currently participating in voting, but a quorum hasn't yet been reached. + * Waiting to participate in voting, but a quorum hasn't yet been reached. * This is an idle state - we do not expect AdvancePollState to be called. * - * Moved externally to SS_SEND_VOTE or SS_WAIT_VERDICT by execution of - * SS_HANDSHAKE_RECV. + * Moved externally by execution of SS_HANDSHAKE_RECV, when we received a + * quorum of handshakes. */ SS_VOTING, - /* - * Performs a blocking send of the assigned vote, then immediately moves to - * SS_WAIT_VERDICT. - */ - SS_SEND_VOTE, + /* * Already sent voting information, waiting to receive confirmation from the - * node. After receiving, moves to SS_IDLE. + * node. After receiving, moves to SS_IDLE, if the quorum isn't reached yet. */ SS_WAIT_VERDICT, - /* need to flush ProposerAnnouncement */ + /* Need to flush ProposerElected message. */ SS_SEND_ELECTED_FLUSH, /* * Waiting for quorum to send WAL. Idle state. If the socket becomes * read-ready, the connection has been closed. * - * Moves to SS_ACTIVE only by calls to SendMessageToNode. + * Moves to SS_ACTIVE only by call to StartStreaming. 
*/ SS_IDLE, @@ -361,7 +339,6 @@ int CompareLsn(const void *a, const void *b); char* FormatWalKeeperState(WalKeeperState state); void AssertEventsOkForState(uint32 events, WalKeeper* wk); uint32 WalKeeperStateDesiredEvents(WalKeeperState state); -bool StateShouldImmediatelyExecute(WalKeeperState state); char* FormatEvents(uint32 events); void WalProposerMain(Datum main_arg); void WalProposerBroadcast(XLogRecPtr startpos, char* data, int len); From e0cbfcb0d012597b6f5b3624b700febc0045c16b Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 30 Dec 2021 12:13:05 +0000 Subject: [PATCH 2/6] Migrate AsyncReadFixed to AsyncReadMessage --- src/backend/replication/walproposer.c | 70 ++++++++++++--------------- src/include/replication/walproposer.h | 4 +- 2 files changed, 33 insertions(+), 41 deletions(-) diff --git a/src/backend/replication/walproposer.c b/src/backend/replication/walproposer.c index e0d7d179049..f67bef911d7 100644 --- a/src/backend/replication/walproposer.c +++ b/src/backend/replication/walproposer.c @@ -138,7 +138,6 @@ static XLogRecPtr CalculateMinFlushLsn(void); static XLogRecPtr GetAcknowledgedByQuorumWALPosition(void); static void HandleWalKeeperResponse(void); static bool AsyncRead(WalKeeper *wk, char **buf, int *buf_size); -static bool AsyncReadFixed(WalKeeper *wk, void *value, size_t value_size); static bool AsyncReadMessage(WalKeeper *wk, AcceptorProposerMessage *anymsg); static bool BlockingWrite(WalKeeper *wk, void *msg, size_t msg_size, WalKeeperState success_state); static bool AsyncWrite(WalKeeper *wk, void *msg, size_t msg_size, WalKeeperState flush_state); @@ -908,7 +907,8 @@ RecvAcceptorGreeting(WalKeeper *wk) * error handling or state setting is taken care of. We can * leave any other work until later. */ - if (!AsyncReadFixed(wk, &wk->greet, sizeof(wk->greet))) + wk->greet.apm.tag = 'g'; + if (!AsyncReadMessage(wk, (AcceptorProposerMessage *) &wk->greet)) return; /* Protocol is all good, move to voting. 
*/ @@ -1654,7 +1654,8 @@ RecvAppendResponses(WalKeeper *wk) * necessary error handling or state setting is taken care * of. We can leave any other work until later. */ - if (!AsyncReadFixed(wk, &wk->feedback, sizeof(wk->feedback))) + wk->feedback.apm.tag = 'a'; + if (!AsyncReadMessage(wk, (AcceptorProposerMessage *) &wk->feedback)) break; Assert(wk->ackMsg != NULL && (wk->ackMsg->ackMask & (1 << wki)) == 0); @@ -1664,7 +1665,7 @@ RecvAppendResponses(WalKeeper *wk) * look like we are receiving responses for messages that haven't been * sent yet. This can happen when message was placed in a buffer in * SendAppendRequests, but sent through a wire only with a flush inside - * AsyncReadFixed. In this case, we should move wk->currMsg. + * AsyncReadMessage. In this case, we should move wk->currMsg. */ if (wk->ackMsg == wk->currMsg) { @@ -1939,46 +1940,15 @@ AsyncRead(WalKeeper *wk, char **buf, int *buf_size) } /* - * Reads a CopyData block from the 'i'th WAL keeper's postgres connection, - * returning whether the read was successful. - * + * Read next message with known type into provided struct, by reading a CopyData + * block from the safekeeper's postgres connection, returning whether the read + * was successful. + * * If the read needs more polling, we return 'false' and keep the state * unmodified, waiting until it becomes read-ready to try again. If it fully * failed, a warning is emitted and the connection is reset. */ static bool -AsyncReadFixed(WalKeeper *wk, void *value, size_t value_size) -{ - char *buf = NULL; - int buf_size = -1; - - if (!(AsyncRead(wk, &buf, &buf_size))) - return false; - - /* - * If we get here, the read was ok, but we still need to check it was the - * right amount - */ - if ((size_t) buf_size != value_size) - { - elog(FATAL, - "Unexpected walkeeper %s:%s read length from %s state. 
Expected %ld, found %d", - wk->host, wk->port, - FormatWalKeeperState(wk->state), - value_size, buf_size); - } - - /* Copy the resulting info into place */ - memcpy(value, buf, buf_size); - - return true; -} - -/* - * Read next message with known type into provided struct. - * TODO: migrate AsyncReadFixed here for all messages - */ -static bool AsyncReadMessage(WalKeeper *wk, AcceptorProposerMessage *anymsg) { char *buf; @@ -2005,6 +1975,14 @@ AsyncReadMessage(WalKeeper *wk, AcceptorProposerMessage *anymsg) switch (tag) { + case 'g': + { + AcceptorGreeting *msg = (AcceptorGreeting *) anymsg; + msg->term = pq_getmsgint64_le(&s); + pq_getmsgend(&s); + return true; + } + case 'v': { VoteResponse *msg = (VoteResponse *) anymsg; @@ -2024,6 +2002,20 @@ AsyncReadMessage(WalKeeper *wk, AcceptorProposerMessage *anymsg) return true; } + case 'a': + { + AppendResponse *msg = (AppendResponse *) anymsg; + msg->term = pq_getmsgint64_le(&s); + msg->flushLsn = pq_getmsgint64_le(&s); + msg->commitLsn = pq_getmsgint64_le(&s); + msg->diskConsistentLsn = pq_getmsgint64_le(&s); + msg->hs.ts = pq_getmsgint64_le(&s); + msg->hs.xmin.value = pq_getmsgint64_le(&s); + msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s); + pq_getmsgend(&s); + return true; + } + default: { Assert(false); diff --git a/src/include/replication/walproposer.h b/src/include/replication/walproposer.h index 45f00cc5c40..0b74342ed87 100644 --- a/src/include/replication/walproposer.h +++ b/src/include/replication/walproposer.h @@ -173,7 +173,7 @@ typedef struct AcceptorProposerMessage */ typedef struct AcceptorGreeting { - uint64 tag; + AcceptorProposerMessage apm; term_t term; } AcceptorGreeting; @@ -284,11 +284,11 @@ typedef struct HotStandbyFeedback */ typedef struct AppendResponse { + AcceptorProposerMessage apm; /* * Current term of the safekeeper; if it is higher than proposer's, the * compute is out of date. 
*/ - uint64 tag; term_t term; // TODO: add comment XLogRecPtr flushLsn; From bd51cd0d74cc8f10b4ed43ad5cefad44445d6442 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 30 Dec 2021 17:28:38 +0000 Subject: [PATCH 3/6] Handle flushWrite better a bit --- src/backend/replication/walproposer.c | 107 ++++++++++++++++---------- src/include/replication/walproposer.h | 4 +- 2 files changed, 67 insertions(+), 44 deletions(-) diff --git a/src/backend/replication/walproposer.c b/src/backend/replication/walproposer.c index f67bef911d7..ada2288dc53 100644 --- a/src/backend/replication/walproposer.c +++ b/src/backend/replication/walproposer.c @@ -130,6 +130,7 @@ static void SendMessageToNode(int i, WalMessage *msg); static void BroadcastMessage(WalMessage *msg); static WalMessage * CreateMessage(XLogRecPtr startpos, char *data, int len); static WalMessage * CreateMessageCommitLsnOnly(XLogRecPtr lsn); +static void HandleAppendState(WalKeeper *wk, uint32 events); static bool SendAppendRequests(WalKeeper *wk); static bool RecvAppendResponses(WalKeeper *wk); static void CombineHotStanbyFeedbacks(HotStandbyFeedback * hs); @@ -736,15 +737,7 @@ AdvancePollState(WalKeeper *wk, uint32 events) * Active state is used for streaming WAL and receiving feedback. */ case SS_ACTIVE: - if (events & WL_SOCKET_WRITEABLE) - if (!SendAppendRequests(wk)) - return; - - if (events & WL_SOCKET_READABLE) - if (!RecvAppendResponses(wk)) - return; - - UpdateEventSet(wk, WL_SOCKET_READABLE | (wk->currMsg == NULL ? 0 : WL_SOCKET_WRITEABLE)); + HandleAppendState(wk, events); break; } } @@ -1389,7 +1382,6 @@ StartStreaming(WalKeeper *wk) * exactly once for a connection. 
*/ wk->state = SS_ACTIVE; - UpdateEventSet(wk, WL_SOCKET_READABLE); for (WalMessage *msg = msgQueueHead; msg != NULL; msg = msg->next) { @@ -1400,16 +1392,21 @@ StartStreaming(WalKeeper *wk) } else { + /* event set will be updated inside SendMessageToNode */ SendMessageToNode(wki, msg); return; } } + + /* Call SS_ACTIVE handler to update event set */ + HandleAppendState(wk, WL_NO_EVENTS); } /* - * Start sending message to the particular node. + * Start sending message to the particular node. Always updates event set. * - * Can be used only for safekeepers in SS_ACTIVE state. + * Can be used only for safekeepers in SS_ACTIVE state. State can be changed + * in case of errors. */ static void SendMessageToNode(int i, WalMessage *msg) @@ -1428,11 +1425,9 @@ SendMessageToNode(int i, WalMessage *msg) msg = msg->next; wk->currMsg = msg; - wk->flushWrite = false; /* Note: we always send everything to the safekeeper until WOULDBLOCK or nothing left to send */ - if (!SendAppendRequests(wk)) - return; + HandleAppendState(wk, WL_SOCKET_WRITEABLE); } /* @@ -1532,9 +1527,31 @@ CreateMessageCommitLsnOnly(XLogRecPtr lsn) return msg; } +/* + * Process all events happened in SS_ACTIVE state, update event set after that. + */ +static void +HandleAppendState(WalKeeper *wk, uint32 events) +{ + uint32 newEvents = WL_SOCKET_READABLE; + + if (events & WL_SOCKET_WRITEABLE) + if (!SendAppendRequests(wk)) + return; + + if (events & WL_SOCKET_READABLE) + if (!RecvAppendResponses(wk)) + return; + + if (wk->currMsg != NULL || wk->flushWrite) + newEvents |= WL_SOCKET_WRITEABLE; + + UpdateEventSet(wk, newEvents); +} + /* * Send queue messages starting from wk->currMsg until the end or non-writable - * socket, whichever comes first. + * socket, whichever comes first. Caller should take care of updating event set. * * Can change state if Async* functions encounter errors and reset connection. * Returns false in this case, true otherwise. 
@@ -1545,6 +1562,7 @@ SendAppendRequests(WalKeeper *wk) int wki = wk - walkeeper; WalMessage *msg; AppendRequestHeader *req; + PGAsyncWriteResult writeResult; if (wk->flushWrite) { @@ -1555,7 +1573,6 @@ SendAppendRequests(WalKeeper *wk) */ return wk->state == SS_ACTIVE; - wk->currMsg = wk->currMsg->next; wk->flushWrite = false; } @@ -1609,24 +1626,39 @@ SendAppendRequests(WalKeeper *wk) * message is stored after the end of the WalMessage * struct, in the allocation for each msg */ - if (!AsyncWrite(wk, req, - sizeof(AppendRequestHeader) + req->endLsn - req->beginLsn, - SS_ACTIVE)) - { - if (req != &msg->req) - free(req); - if (wk->state == SS_ACTIVE) - { - wk->flushWrite = true; - return true; - } - return false; - } + writeResult = walprop_async_write(wk->conn, req, sizeof(AppendRequestHeader) + req->endLsn - req->beginLsn); + + /* Free up resources */ if (req != &msg->req) free(req); - /* continue writing the next message */ + /* Mark current message as sent, whatever the result is */ wk->currMsg = wk->currMsg->next; + + switch (writeResult) + { + case PG_ASYNC_WRITE_SUCCESS: + /* Continue writing the next message */ + break; + + case PG_ASYNC_WRITE_TRY_FLUSH: + /* + * We still need to call PQflush some more to finish the job. + * Caller function will handle this by setting right event set. + */ + wk->flushWrite = true; + return true; + + case PG_ASYNC_WRITE_FAIL: + elog(WARNING, "Failed to send to node %s:%s in %s state: %s", + wk->host, wk->port, FormatWalKeeperState(wk->state), + walprop_error_message(wk->conn)); + ShutdownConnection(wk); + return false; + default: + Assert(false); + return false; + } } return true; @@ -1663,18 +1695,9 @@ RecvAppendResponses(WalKeeper *wk) /* * We shouldn't read responses ahead of wk->currMsg, because that will * look like we are receiving responses for messages that haven't been - * sent yet. 
This can happen when message was placed in a buffer in - * SendAppendRequests, but sent through a wire only with a flush inside - * AsyncReadMessage. In this case, we should move wk->currMsg. + * sent yet. */ - if (wk->ackMsg == wk->currMsg) - { - /* Couldn't happen without flush flag */ - Assert(wk->flushWrite); - - wk->currMsg = wk->currMsg->next; - wk->flushWrite = false; - } + Assert(wk->ackMsg != wk->currMsg); wk->ackMsg->ackMask |= 1 << wki; /* this safekeeper confirms * receiving of this diff --git a/src/include/replication/walproposer.h b/src/include/replication/walproposer.h index 0b74342ed87..9506a6ee887 100644 --- a/src/include/replication/walproposer.h +++ b/src/include/replication/walproposer.h @@ -319,8 +319,8 @@ typedef struct WalKeeper WalProposerConn* conn; StringInfoData outbuf; - bool flushWrite; /* set to true if we wrote currMsg, but still need to call AsyncFlush */ - WalMessage* currMsg; /* message been send to the receiver */ + bool flushWrite; /* set to true if we need to call AsyncFlush, to flush pending messages */ + WalMessage* currMsg; /* message that wasn't sent yet or NULL, if we have nothing to send */ WalMessage* ackMsg; /* message waiting ack from the receiver */ int eventPos; /* position in wait event set. Equal to -1 if no event */ From 93283e9818b408762503802bc6272a0ea66407ff Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Mon, 3 Jan 2022 12:04:27 +0000 Subject: [PATCH 4/6] Update SS_ACTIVE event set in single place Now the event set is updated only at the end of HandleActiveState, after all handler code has been executed.
--- src/backend/replication/walproposer.c | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/backend/replication/walproposer.c b/src/backend/replication/walproposer.c index ada2288dc53..e144ef12bb8 100644 --- a/src/backend/replication/walproposer.c +++ b/src/backend/replication/walproposer.c @@ -130,7 +130,7 @@ static void SendMessageToNode(int i, WalMessage *msg); static void BroadcastMessage(WalMessage *msg); static WalMessage * CreateMessage(XLogRecPtr startpos, char *data, int len); static WalMessage * CreateMessageCommitLsnOnly(XLogRecPtr lsn); -static void HandleAppendState(WalKeeper *wk, uint32 events); +static void HandleActiveState(WalKeeper *wk, uint32 events); static bool SendAppendRequests(WalKeeper *wk); static bool RecvAppendResponses(WalKeeper *wk); static void CombineHotStanbyFeedbacks(HotStandbyFeedback * hs); @@ -445,7 +445,7 @@ InitEventSet(void) * * This function is called any time the WAL keeper's state switches to one where * it has to wait to continue. This includes the full body of AdvancePollState - * and each call to AsyncRead/BlockingWrite/AsyncWrite/AsyncFlush. + * and calls to IO helper functions. */ static void UpdateEventSet(WalKeeper *wk, uint32 events) @@ -721,6 +721,7 @@ AdvancePollState(WalKeeper *wk, uint32 events) if (!AsyncFlush(wk)) return; + /* flush is done, event set and state will be updated later */ StartStreaming(wk); break; @@ -737,7 +738,7 @@ AdvancePollState(WalKeeper *wk, uint32 events) * Active state is used for streaming WAL and receiving feedback. */ case SS_ACTIVE: - HandleAppendState(wk, events); + HandleActiveState(wk, events); break; } } @@ -1370,7 +1371,8 @@ WalProposerStartStreaming(XLogRecPtr startpos) } /* - * Start streaming to safekeeper wk, always updates state to SS_ACTIVE. + * Start streaming to safekeeper wk, always updates state to SS_ACTIVE and sets + * correct event set. 
*/ static void StartStreaming(WalKeeper *wk) @@ -1399,7 +1401,7 @@ StartStreaming(WalKeeper *wk) } /* Call SS_ACTIVE handler to update event set */ - HandleAppendState(wk, WL_NO_EVENTS); + HandleActiveState(wk, WL_NO_EVENTS); } /* @@ -1427,7 +1429,7 @@ SendMessageToNode(int i, WalMessage *msg) wk->currMsg = msg; /* Note: we always send everything to the safekeeper until WOULDBLOCK or nothing left to send */ - HandleAppendState(wk, WL_SOCKET_WRITEABLE); + HandleActiveState(wk, WL_SOCKET_WRITEABLE); } /* @@ -1531,7 +1533,7 @@ CreateMessageCommitLsnOnly(XLogRecPtr lsn) * Process all events happened in SS_ACTIVE state, update event set after that. */ static void -HandleAppendState(WalKeeper *wk, uint32 events) +HandleActiveState(WalKeeper *wk, uint32 events) { uint32 newEvents = WL_SOCKET_READABLE; @@ -1573,6 +1575,7 @@ SendAppendRequests(WalKeeper *wk) */ return wk->state == SS_ACTIVE; + /* Event set will be updated at the end of HandleActiveState */ wk->flushWrite = false; } @@ -2120,7 +2123,9 @@ AsyncWrite(WalKeeper *wk, void *msg, size_t msg_size, WalKeeperState flush_state * Flushes a previous call to AsyncWrite. This only needs to be called when the * socket becomes read or write ready *after* calling AsyncWrite. * - * If flushing successfully completes returns true, otherwise false. + * If flushing successfully completes returns true, otherwise false. Event set + * is updated only if connection fails, otherwise caller should manually unset + * WL_SOCKET_WRITEABLE.
*/ static bool AsyncFlush(WalKeeper *wk) @@ -2134,7 +2139,7 @@ AsyncFlush(WalKeeper *wk) switch (walprop_flush(wk->conn)) { case 0: - UpdateEventSet(wk, WL_SOCKET_READABLE); /* flush is done, unset write interest */ + /* flush is done */ return true; case 1: /* Nothing to do; try again when the socket's ready */ From 8645ffa89f1787916e633f5a5bfc9cc7a8b4ced9 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Tue, 4 Jan 2022 09:28:56 +0000 Subject: [PATCH 5/6] Add comment on SS_ACTIVE write event --- src/backend/replication/walproposer.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/backend/replication/walproposer.c b/src/backend/replication/walproposer.c index e144ef12bb8..14f300d110b 100644 --- a/src/backend/replication/walproposer.c +++ b/src/backend/replication/walproposer.c @@ -1545,6 +1545,15 @@ HandleActiveState(WalKeeper *wk, uint32 events) if (!RecvAppendResponses(wk)) return; + /* + * We should wait for WL_SOCKET_WRITEABLE event if we have unflushed data + * in the buffer. + * + * wk->currMsg checks if we have pending unsent messages. This check isn't + * necessary now, because we always send queue messages immediately after + * creation. But it's good to have it here in case we change this behavior + * in the future. + */ if (wk->currMsg != NULL || wk->flushWrite) newEvents |= WL_SOCKET_WRITEABLE; From 33b79d82cfe8ed6fb990f12a1e9ce91c2059a51c Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Tue, 4 Jan 2022 09:54:43 +0000 Subject: [PATCH 6/6] Add TODO for SS_ACTIVE DesiredEvents --- src/backend/replication/walproposer_utils.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/backend/replication/walproposer_utils.c b/src/backend/replication/walproposer_utils.c index 0c1578b46ec..74ea1cfd5b1 100644 --- a/src/backend/replication/walproposer_utils.c +++ b/src/backend/replication/walproposer_utils.c @@ -141,6 +141,9 @@ WalKeeperStateDesiredEvents(WalKeeperState state) /* * Flush states require write-ready for flushing. 
* Active state does both reading and writing. + * + * TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We should + * check wk->flushWrite here to set WL_SOCKET_WRITEABLE. */ case SS_SEND_ELECTED_FLUSH: case SS_ACTIVE: