diff --git a/src/connection.h b/src/connection.h index 4b513ffccb..b631ba579d 100644 --- a/src/connection.h +++ b/src/connection.h @@ -122,7 +122,7 @@ typedef struct ConnectionType { /* TLS specified methods */ sds (*get_peer_cert)(struct connection *conn); - /* Miselenious */ + /* Miscellaneous */ int (*connIntegrityChecked)(void); // return 1 if connection type has built-in integrity checks } ConnectionType; diff --git a/src/rdb.c b/src/rdb.c index b39021a4d1..58dd9524b2 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3024,7 +3024,6 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin int error; long long empty_keys_skipped = 0; - if (rdb->flags & RIO_FLAG_BYPASS_CRC) server.stat_total_sync_bypass_crc++; rdb->update_cksum = rdbLoadProgressCallback; rdb->max_processing_chunk = server.loading_process_events_interval_bytes; if (rioRead(rdb, buf, 9) == 0) goto eoferr; @@ -3369,7 +3368,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if (rioRead(rdb, &cksum, 8) == 0) goto eoferr; if (server.rdb_checksum && !server.skip_checksum_validation) { memrev64ifbe(&cksum); - if (cksum == 0 || (rdb->flags & RIO_FLAG_BYPASS_CRC)) { + if (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) { + serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: skipped checksum for this transfer"); + } else if (cksum == 0) { serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: no check performed."); } else if (cksum != expected) { serverLog(LL_WARNING, @@ -3562,9 +3563,10 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } /* * For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req: - * Check replica capabilities, if every replica supports bypassing CRC, primary should also bypass CRC, otherwise, use CRC. + * Check replica capabilities, if every replica supports skiping RDB checksum, primary should also skip checksum. + * Otherwise, use checksum for this RDB transfer. */ - int bypass_crc = 1; + int skip_rdb_checksum = 1; /* Collect the connections of the replicas we want to transfer * the RDB to, which are in WAIT_BGSAVE_START state. */ int connsnum = 0; @@ -3599,9 +3601,9 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset()); } - // do not bypass CRC on the primary if connection doesn't have integrity check or if the replica doesn't support it - if (!connIsIntegrityChecked(replica->conn) || !(replica->replica_capa & REPLICA_CAPA_BYPASS_CRC)) - bypass_crc = 0; + // do not skip RDB checksum on the primary if connection doesn't have integrity check or if the replica doesn't support it + if (!connIsIntegrityChecked(replica->conn) || !(replica->repl_data->replica_capa & REPLICA_CAPA_SKIP_RDB_CHECKSUM)) + skip_rdb_checksum = 0; } /* Create the child process. */ @@ -3625,11 +3627,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } serverSetCpuAffinity(server.bgsave_cpulist); - if (bypass_crc) { - serverLog(LL_NOTICE, "CRC checksum is disabled for this RDB transfer"); - // mark rdb object to skip CRC checksum calculations - rdb.flags |= RIO_FLAG_BYPASS_CRC; - } + if (skip_rdb_checksum) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM; retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi); if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR; @@ -3680,8 +3678,10 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { server.rdb_pipe_numconns_writing = 0; } } else { - serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid, - dual_channel ? "direct socket to replica" : "pipe through parent process"); + serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s%s", (long)childpid, + dual_channel ? "direct socket to replica" : "pipe through parent process", + skip_rdb_checksum ? " while skipping RDB checksum for this transfer" : ""); + server.rdb_save_time_start = time(NULL); server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; if (dual_channel) { @@ -3696,7 +3696,6 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } } if (!dual_channel) close(safe_to_exit_pipe); - if (bypass_crc) server.stat_total_sync_bypass_crc++; return (childpid == -1) ? C_ERR : C_OK; } return C_OK; /* Unreached. */ diff --git a/src/rdma.c b/src/rdma.c index d2aca8b708..7bdf1828db 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -1818,7 +1818,7 @@ static ConnectionType CT_RDMA = { .postpone_update_state = postPoneUpdateRdmaState, .update_state = updateRdmaState, - /* Miselenious */ + /* Miscellaneous */ .connIntegrityChecked = NULL, }; diff --git a/src/replication.c b/src/replication.c index ea78557429..447938f22b 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1314,13 +1314,13 @@ void freeClientReplicationData(client *c) { * the primary can accurately lists replicas and their listening ports in the * INFO output. * - * - capa + * - capa * What is the capabilities of this instance. * eof: supports EOF-style RDB transfer for diskless replication. * psync2: supports PSYNC v2, so understands +CONTINUE . * dual-channel: supports full sync using rdb channel. - * bypass-crc: supports skipping CRC calculations during diskless sync using - * a connection that has integrity checks (such as TLS). + * skip-rdb-checksum: supports skipping RDB checksum calculations during diskless sync using + * a connection that has integrity checks (such as TLS). * * - ack [fack ] * Replica informs the primary the amount of replication stream that it @@ -1388,9 +1388,8 @@ void replconfCommand(client *c) { /* If dual-channel is disable on this primary, treat this command as unrecognized * replconf option. */ c->repl_data->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL; - } else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_BYPASS_CRC_STR)) - c->repl_data->replica_capa |= REPLICA_CAPA_BYPASS_CRC; - } + } else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR)) + c->repl_data->replica_capa |= REPLICA_CAPA_SKIP_RDB_CHECKSUM; } else if (!strcasecmp(c->argv[j]->ptr, "ack")) { /* REPLCONF ACK is used by replica to inform the primary the amount * of replication stream that it processed so far. It is an @@ -2049,9 +2048,10 @@ static int useDisklessLoad(void) { return enabled; } -/* Returns 1 if the replica can skip CRC calculations during full sync */ -int replicationSupportBypassCRC(connection *conn, int is_replica_diskless_load, int is_primary_diskless_sync) { - return is_replica_diskless_load && is_primary_diskless_sync && connIsIntegrityChecked(conn); +/* Returns 1 if the node can skip RDB checksum during full sync. + * We can RDB checksum when data is transmitted through a verified stream. */ +int replicationSupportSkipRDBChecksum(connection *conn, int is_replica_stream_verified, int is_primary_stream_verified) { + return is_replica_stream_verified && is_primary_stream_verified && connIsIntegrityChecked(conn); } /* Helper function for readSyncBulkPayload() to initialize tempDb @@ -2333,14 +2333,7 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); - if (replicationSupportBypassCRC(conn, use_diskless_load, usemark)) { - /* We can bypass CRC checks when data is transmitted through a verified stream. - * The usemark flag indicates that the primary is streaming the data directly without - * writing it to storage. - * Similarly, the use_diskless_load flag indicates that the - * replica will load the payload directly into memory without first writing it to disk. */ - rdb.flags |= RIO_FLAG_BYPASS_CRC; - } + if (replicationSupportSkipRDBChecksum(conn, use_diskless_load, usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM; int loadingFailed = 0; rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) { @@ -2582,7 +2575,6 @@ char *sendCommand(connection *conn, ...) { while (1) { arg = va_arg(ap, char *); if (arg == NULL) break; - if (strcmp(arg, "") == 0) continue; cmdargs = sdscatprintf(cmdargs, "$%zu\r\n%s\r\n", strlen(arg), arg); argslen++; } @@ -3600,19 +3592,34 @@ void syncWithPrimary(connection *conn) { * * EOF: supports EOF-style RDB transfer for diskless replication. * PSYNC2: supports PSYNC v2, so understands +CONTINUE . - * BYPASS-CRC: supports skipping CRC calculations during full sync. - * Inform the primary of this capa only during diskless sync using a - * connection that has integrity checks (such as TLS). - * In disk-based sync, or non-integrity-checked connection, there is more - * concern for data corruprion so we keep this extra layer of detection. + * skip-rdb-checksum: supports skipping RDB checksum during full sync. + * Inform the primary of this capa only during diskless sync + * using a connection that has integrity checks (such as TLS). + * In non-diskless sync, or non-integrity-checked connection, there is more + * concern for data corruprion so we keep this extra layer of detection. * * The primary will ignore capabilities it does not understand. */ - int send_bypass_crc_capa = replicationSupportBypassCRC(conn, useDisklessLoad(), 1); - err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", - send_bypass_crc_capa ? "capa" : "", - send_bypass_crc_capa ? REPLICA_CAPA_BYPASS_CRC_STR : "", - server.dual_channel_replication ? "capa" : "", - server.dual_channel_replication ? "dual-channel" : "", NULL); + int send_skip_rdb_checksum_capa = replicationSupportSkipRDBChecksum(conn, useDisklessLoad(), 1); // we can ignore primary's conditions when sending capa (is_primary_stream_verified=1) + char *argv[9] = {"REPLCONF", "capa", "eof", "capa", "psync2", NULL, NULL, NULL, NULL}; + size_t lens[9] = {8, 4, 3, 4, 6, 0, 0, 0, 0}; + int argc = 5; + if (send_skip_rdb_checksum_capa) { + argv[argc] = "capa"; + lens[argc] = strlen("capa"); + argc++; + argv[argc] = REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR; + lens[argc] = strlen(REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR); + argc++; + } + if (server.dual_channel_replication) { + argv[argc] = "capa"; + lens[argc] = strlen("capa"); + argc++; + argv[argc] = "dual-channel"; + lens[argc] = strlen("dual-channel"); + argc++; + } + err = sendCommandArgv(conn, argc, argv, lens); if (err) goto write_error; /* Inform the primary of our (replica) version. */ diff --git a/src/rio.c b/src/rio.c index a1638d589b..38a80b36fc 100644 --- a/src/rio.c +++ b/src/rio.c @@ -425,7 +425,7 @@ void rioFreeFd(rio *r) { /* This function can be installed both in memory and file streams when checksum * computation is needed. */ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { - if ((r->flags & RIO_FLAG_BYPASS_CRC) != 0) return; // skip CRC64 calculations + if ((r->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) != 0) return; // skip RDB checksum r->cksum = crc64(r->cksum, buf, len); } diff --git a/src/rio.h b/src/rio.h index cbe7886fbe..f7d43ef84a 100644 --- a/src/rio.h +++ b/src/rio.h @@ -40,7 +40,7 @@ #define RIO_FLAG_READ_ERROR (1 << 0) #define RIO_FLAG_WRITE_ERROR (1 << 1) #define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */ -#define RIO_FLAG_BYPASS_CRC (1 << 3) +#define RIO_FLAG_SKIP_RDB_CHECKSUM (1 << 3) #define RIO_TYPE_FILE (1 << 0) #define RIO_TYPE_BUFFER (1 << 1) diff --git a/src/server.c b/src/server.c index 290223cf5e..5de0a123f2 100644 --- a/src/server.c +++ b/src/server.c @@ -2642,7 +2642,6 @@ void resetServerStats(void) { server.stat_fork_rate = 0; server.stat_total_forks = 0; server.stat_rejected_conn = 0; - server.stat_total_sync_bypass_crc = 0; server.stat_sync_full = 0; server.stat_sync_partial_ok = 0; server.stat_sync_partial_err = 0; @@ -5882,7 +5881,6 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "instantaneous_input_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION) / 1024, "instantaneous_output_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION) / 1024, "rejected_connections:%lld\r\n", server.stat_rejected_conn, - "total_sync_bypass_crc:%ld\r\n", server.stat_total_sync_bypass_crc, "sync_full:%lld\r\n", server.stat_sync_full, "sync_partial_ok:%lld\r\n", server.stat_sync_partial_ok, "sync_partial_err:%lld\r\n", server.stat_sync_partial_err, diff --git a/src/server.h b/src/server.h index 56869bf7d6..7d84054b5e 100644 --- a/src/server.h +++ b/src/server.h @@ -440,10 +440,10 @@ typedef enum { #define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ #define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ #define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */ -#define REPLICA_CAPA_BYPASS_CRC (1 << 3) /* Supports bypassing CRC checks for sync requests. */ +#define REPLICA_CAPA_SKIP_RDB_CHECKSUM (1 << 3) /* Supports skipping RDB checksum for sync requests. */ /* Replica capability strings */ -#define REPLICA_CAPA_BYPASS_CRC_STR "bypass-crc" /* Supports bypassing CRC checks for sync requests. */ +#define REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR "skip-rdb-checksum" /* Supports skipping RDB checksum for sync requests. */ /* Replica requirements */ #define REPLICA_REQ_NONE 0 @@ -1690,7 +1690,6 @@ struct valkeyServer { double stat_fork_rate; /* Fork rate in GB/sec. */ long long stat_total_forks; /* Total count of fork. */ long long stat_rejected_conn; /* Clients rejected because of maxclients */ - size_t stat_total_sync_bypass_crc; /* Total number of full syncs stated with CRC checksum bypassed */ long long stat_sync_full; /* Number of full resyncs with replicas. */ long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */ long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */ diff --git a/src/socket.c b/src/socket.c index 0c8cc9f0c9..220726b83b 100644 --- a/src/socket.c +++ b/src/socket.c @@ -448,7 +448,7 @@ static ConnectionType CT_Socket = { .postpone_update_state = NULL, .update_state = NULL, - /* Miselenious */ + /* Miscellaneous */ .connIntegrityChecked = NULL, }; diff --git a/src/tls.c b/src/tls.c index 3dc42f209f..482ad03f4e 100644 --- a/src/tls.c +++ b/src/tls.c @@ -1191,7 +1191,7 @@ static ConnectionType CT_TLS = { /* TLS specified methods */ .get_peer_cert = connTLSGetPeerCert, - /* Miselenious */ + /* Miscellaneous */ .connIntegrityChecked = connTLSIsIntegrityChecked, }; diff --git a/src/unix.c b/src/unix.c index 76ab8f23bd..a46bd53a02 100644 --- a/src/unix.c +++ b/src/unix.c @@ -208,7 +208,7 @@ static ConnectionType CT_Unix = { .postpone_update_state = NULL, .update_state = NULL, - /* Miselenious */ + /* Miscellaneous */ .connIntegrityChecked = NULL, }; diff --git a/tests/integration/bypass-crc.tcl b/tests/integration/bypass-crc.tcl deleted file mode 100644 index d07c2e6667..0000000000 --- a/tests/integration/bypass-crc.tcl +++ /dev/null @@ -1,37 +0,0 @@ -start_server {tags {"repl tls"} overrides {save {}}} { - set primary [srv 0 client] - set primary_host [srv 0 host] - set primary_port [srv 0 port] - set primary_bypassed_crc_counter 0 - foreach mds {no yes} { - foreach sdl {disabled on-empty-db swapdb flush-before-load} { - test "Bypass CRC sync - tls:$::tls, repl_diskless_sync:$mds, repl_diskless_load:$sdl" { - $primary config set repl-diskless-sync $mds - start_server {overrides {save {}}} { - set replica [srv 0 client] - $replica config set repl-diskless-load $sdl - $replica replicaof $primary_host $primary_port - - wait_for_condition 50 100 { - [string match {*master_link_status:up*} [$replica info replication]] - } else { - fail "Replication not started" - } - - set replica_bypassing_crc_count [string match {*total_sync_bypass_crc:1*} [$replica info stats]] - set stats [regexp -inline {total_sync_bypass_crc:(\d+)} [$primary info stats]] - set primary_bypass_crc_count [lindex $stats 1] - - if {$sdl eq "disabled" || $mds eq "no" || !$::tls} { - assert_equal $primary_bypassed_crc_counter $primary_bypass_crc_count "Primary should not bypass CRC in this scenario" - assert_equal 0 $replica_bypassing_crc_count "Replica should not bypass CRC in this scenario" - } else { - incr primary_bypassed_crc_counter - assert_equal $primary_bypassed_crc_counter $primary_bypass_crc_count "Primary should bypass CRC in this scenario" - assert_equal 1 $replica_bypassing_crc_count "Replica should bypass CRC in this scenario" - } - } - } - } - } -} diff --git a/tests/integration/skip-rdb-checksum.tcl b/tests/integration/skip-rdb-checksum.tcl new file mode 100644 index 0000000000..2c15b02eb3 --- /dev/null +++ b/tests/integration/skip-rdb-checksum.tcl @@ -0,0 +1,49 @@ +proc test_skip_rdb_checksum {primary primary_host primary_port primary_skipped_rdb_checksum_counter primary_diskless_sync replica_diskless_load} { + upvar primary_skipped_rdb_checksum_counter counter + $primary config set repl-diskless-sync $primary_diskless_sync + start_server {overrides {save {}}} { + set replica [srv 0 client] + $replica config set repl-diskless-load $replica_diskless_load + $replica replicaof $primary_host $primary_port + + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replication not started" + } + + set replica_skipping_rdb_checksum_count [count_log_message 0 "RDB file was saved with checksum disabled: skipped checksum for this transfer"] + set primary_skipping_rdb_checksum_count [count_log_message -1 "while skipping RDB checksum for this transfer"] + + if {$replica_diskless_load eq "disabled" || $primary_diskless_sync eq "no" || !$::tls} { + assert_equal $counter $primary_skipping_rdb_checksum_count "Primary should not skip RDB checksum in this scenario" + assert_equal 0 $replica_skipping_rdb_checksum_count "Replica should not skip RDB checksum in this scenario" + } else { + incr counter + assert_equal $counter $primary_skipping_rdb_checksum_count "Primary should skip RDB checksum in this scenario" + assert_equal 1 $replica_skipping_rdb_checksum_count "Replica should skip RDB checksum in this scenario" + } + } +} + +start_server {tags {"repl tls"} overrides {save {}}} { + set primary [srv 0 client] + set primary_host [srv 0 host] + set primary_port [srv 0 port] + set primary_skipped_rdb_checksum_counter 0 + if {$::tls} { + foreach primary_diskless_sync {no yes} { + foreach replica_diskless_load {disabled on-empty-db swapdb flush-before-load} { + test "Skip RDB checksum sync - tls:$::tls, repl_diskless_sync:$primary_diskless_sync, repl_diskless_load:$replica_diskless_load" { + test_skip_rdb_checksum $primary $primary_host $primary_port $primary_skipped_rdb_checksum_counter $primary_diskless_sync $replica_diskless_load + } + } + } + } else { + set primary_diskless_sync yes + set replica_diskless_load on-empty-db + test "Skip RDB checksum sync - tls:$::tls, repl_diskless_sync:$primary_diskless_sync, repl_diskless_load:$replica_diskless_load" { + test_skip_rdb_checksum $primary $primary_host $primary_port $primary_skipped_rdb_checksum_counter $primary_diskless_sync $replica_diskless_load + } + } +}