Skip to content

Commit

Permalink
Changed bypass to skip and crc to checksum. Changed tests. Removed un…
Browse files Browse the repository at this point in the history
…ecessary metric and added a log instead. Using sendCommandArgv when sending replica capa

Signed-off-by: Tal Shachar <talxsha@amazon.com>
  • Loading branch information
talxsha committed Jan 12, 2025
1 parent 04e7dbe commit c74edf0
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 93 deletions.
2 changes: 1 addition & 1 deletion src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ typedef struct ConnectionType {
/* TLS specified methods */
sds (*get_peer_cert)(struct connection *conn);

/* Miselenious */
/* Miscellaneous */
int (*connIntegrityChecked)(void); // return 1 if connection type has built-in integrity checks
} ConnectionType;

Expand Down
29 changes: 14 additions & 15 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3024,7 +3024,6 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
int error;
long long empty_keys_skipped = 0;

if (rdb->flags & RIO_FLAG_BYPASS_CRC) server.stat_total_sync_bypass_crc++;
rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(rdb, buf, 9) == 0) goto eoferr;
Expand Down Expand Up @@ -3369,7 +3368,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if (rioRead(rdb, &cksum, 8) == 0) goto eoferr;
if (server.rdb_checksum && !server.skip_checksum_validation) {
memrev64ifbe(&cksum);
if (cksum == 0 || (rdb->flags & RIO_FLAG_BYPASS_CRC)) {
if (rdb->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) {
serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: skipped checksum for this transfer");
} else if (cksum == 0) {
serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
serverLog(LL_WARNING,
Expand Down Expand Up @@ -3562,9 +3563,10 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
/*
* For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req:
* Check replica capabilities, if every replica supports bypassing CRC, primary should also bypass CRC, otherwise, use CRC.
* Check replica capabilities, if every replica supports skipping RDB checksum, primary should also skip checksum.
* Otherwise, use checksum for this RDB transfer.
*/
int bypass_crc = 1;
int skip_rdb_checksum = 1;
/* Collect the connections of the replicas we want to transfer
* the RDB to, which are in WAIT_BGSAVE_START state. */
int connsnum = 0;
Expand Down Expand Up @@ -3599,9 +3601,9 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
}

// do not bypass CRC on the primary if connection doesn't have integrity check or if the replica doesn't support it
if (!connIsIntegrityChecked(replica->conn) || !(replica->replica_capa & REPLICA_CAPA_BYPASS_CRC))
bypass_crc = 0;
// do not skip RDB checksum on the primary if connection doesn't have integrity check or if the replica doesn't support it
if (!connIsIntegrityChecked(replica->conn) || !(replica->repl_data->replica_capa & REPLICA_CAPA_SKIP_RDB_CHECKSUM))
skip_rdb_checksum = 0;
}

/* Create the child process. */
Expand All @@ -3625,11 +3627,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
serverSetCpuAffinity(server.bgsave_cpulist);

if (bypass_crc) {
serverLog(LL_NOTICE, "CRC checksum is disabled for this RDB transfer");
// mark rdb object to skip CRC checksum calculations
rdb.flags |= RIO_FLAG_BYPASS_CRC;
}
if (skip_rdb_checksum) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;

retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi);
if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;
Expand Down Expand Up @@ -3680,8 +3678,10 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
server.rdb_pipe_numconns_writing = 0;
}
} else {
serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid,
dual_channel ? "direct socket to replica" : "pipe through parent process");
serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s%s", (long)childpid,
dual_channel ? "direct socket to replica" : "pipe through parent process",
skip_rdb_checksum ? " while skipping RDB checksum for this transfer" : "");

server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
if (dual_channel) {
Expand All @@ -3696,7 +3696,6 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
}
if (!dual_channel) close(safe_to_exit_pipe);
if (bypass_crc) server.stat_total_sync_bypass_crc++;
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
Expand Down
2 changes: 1 addition & 1 deletion src/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -1818,7 +1818,7 @@ static ConnectionType CT_RDMA = {
.postpone_update_state = postPoneUpdateRdmaState,
.update_state = updateRdmaState,

/* Miselenious */
/* Miscellaneous */
.connIntegrityChecked = NULL,
};

Expand Down
65 changes: 36 additions & 29 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1314,13 +1314,13 @@ void freeClientReplicationData(client *c) {
* the primary can accurately lists replicas and their listening ports in the
* INFO output.
*
* - capa <eof|psync2|dual-channel|bypass-crc>
* - capa <eof|psync2|dual-channel|skip-rdb-checksum>
* What is the capabilities of this instance.
* eof: supports EOF-style RDB transfer for diskless replication.
* psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* dual-channel: supports full sync using rdb channel.
* bypass-crc: supports skipping CRC calculations during diskless sync using
* a connection that has integrity checks (such as TLS).
* skip-rdb-checksum: supports skipping RDB checksum calculations during diskless sync using
* a connection that has integrity checks (such as TLS).
*
* - ack <offset> [fack <aofofs>]
* Replica informs the primary the amount of replication stream that it
Expand Down Expand Up @@ -1388,9 +1388,8 @@ void replconfCommand(client *c) {
/* If dual-channel is disabled on this primary, treat this command as unrecognized
* replconf option. */
c->repl_data->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL;
} else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_BYPASS_CRC_STR))
c->repl_data->replica_capa |= REPLICA_CAPA_BYPASS_CRC;
}
} else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR))
c->repl_data->replica_capa |= REPLICA_CAPA_SKIP_RDB_CHECKSUM;
} else if (!strcasecmp(c->argv[j]->ptr, "ack")) {
/* REPLCONF ACK is used by replica to inform the primary the amount
* of replication stream that it processed so far. It is an
Expand Down Expand Up @@ -2049,9 +2048,10 @@ static int useDisklessLoad(void) {
return enabled;
}

/* Returns 1 if the replica can skip CRC calculations during full sync */
int replicationSupportBypassCRC(connection *conn, int is_replica_diskless_load, int is_primary_diskless_sync) {
return is_replica_diskless_load && is_primary_diskless_sync && connIsIntegrityChecked(conn);
/* Returns 1 if the node can skip RDB checksum during full sync.
* We can skip the RDB checksum when data is transmitted through a verified stream. */
int replicationSupportSkipRDBChecksum(connection *conn, int is_replica_stream_verified, int is_primary_stream_verified) {
return is_replica_stream_verified && is_primary_stream_verified && connIsIntegrityChecked(conn);
}

/* Helper function for readSyncBulkPayload() to initialize tempDb
Expand Down Expand Up @@ -2333,14 +2333,7 @@ void readSyncBulkPayload(connection *conn) {

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);
if (replicationSupportBypassCRC(conn, use_diskless_load, usemark)) {
/* We can bypass CRC checks when data is transmitted through a verified stream.
* The usemark flag indicates that the primary is streaming the data directly without
* writing it to storage.
* Similarly, the use_diskless_load flag indicates that the
* replica will load the payload directly into memory without first writing it to disk. */
rdb.flags |= RIO_FLAG_BYPASS_CRC;
}
if (replicationSupportSkipRDBChecksum(conn, use_diskless_load, usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
Expand Down Expand Up @@ -2582,7 +2575,6 @@ char *sendCommand(connection *conn, ...) {
while (1) {
arg = va_arg(ap, char *);
if (arg == NULL) break;
if (strcmp(arg, "") == 0) continue;
cmdargs = sdscatprintf(cmdargs, "$%zu\r\n%s\r\n", strlen(arg), arg);
argslen++;
}
Expand Down Expand Up @@ -3600,19 +3592,34 @@ void syncWithPrimary(connection *conn) {
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* BYPASS-CRC: supports skipping CRC calculations during full sync.
* Inform the primary of this capa only during diskless sync using a
* connection that has integrity checks (such as TLS).
* In disk-based sync, or non-integrity-checked connection, there is more
* concern for data corruprion so we keep this extra layer of detection.
* skip-rdb-checksum: supports skipping RDB checksum during full sync.
* Inform the primary of this capa only during diskless sync
* using a connection that has integrity checks (such as TLS).
* In non-diskless sync, or non-integrity-checked connection, there is more
* concern for data corruption so we keep this extra layer of detection.
*
* The primary will ignore capabilities it does not understand. */
int send_bypass_crc_capa = replicationSupportBypassCRC(conn, useDisklessLoad(), 1);
err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2",
send_bypass_crc_capa ? "capa" : "",
send_bypass_crc_capa ? REPLICA_CAPA_BYPASS_CRC_STR : "",
server.dual_channel_replication ? "capa" : "",
server.dual_channel_replication ? "dual-channel" : "", NULL);
int send_skip_rdb_checksum_capa = replicationSupportSkipRDBChecksum(conn, useDisklessLoad(), 1); // we can ignore primary's conditions when sending capa (is_primary_stream_verified=1)
char *argv[9] = {"REPLCONF", "capa", "eof", "capa", "psync2", NULL, NULL, NULL, NULL};
size_t lens[9] = {8, 4, 3, 4, 6, 0, 0, 0, 0};
int argc = 5;
if (send_skip_rdb_checksum_capa) {
argv[argc] = "capa";
lens[argc] = strlen("capa");
argc++;
argv[argc] = REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR;
lens[argc] = strlen(REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR);
argc++;
}
if (server.dual_channel_replication) {
argv[argc] = "capa";
lens[argc] = strlen("capa");
argc++;
argv[argc] = "dual-channel";
lens[argc] = strlen("dual-channel");
argc++;
}
err = sendCommandArgv(conn, argc, argv, lens);
if (err) goto write_error;

/* Inform the primary of our (replica) version. */
Expand Down
2 changes: 1 addition & 1 deletion src/rio.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ void rioFreeFd(rio *r) {
/* This function can be installed both in memory and file streams when checksum
* computation is needed. */
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
if ((r->flags & RIO_FLAG_BYPASS_CRC) != 0) return; // skip CRC64 calculations
if ((r->flags & RIO_FLAG_SKIP_RDB_CHECKSUM) != 0) return; // skip RDB checksum
r->cksum = crc64(r->cksum, buf, len);
}

Expand Down
2 changes: 1 addition & 1 deletion src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */
#define RIO_FLAG_BYPASS_CRC (1 << 3)
#define RIO_FLAG_SKIP_RDB_CHECKSUM (1 << 3)

#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)
Expand Down
2 changes: 0 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2642,7 +2642,6 @@ void resetServerStats(void) {
server.stat_fork_rate = 0;
server.stat_total_forks = 0;
server.stat_rejected_conn = 0;
server.stat_total_sync_bypass_crc = 0;
server.stat_sync_full = 0;
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
Expand Down Expand Up @@ -5882,7 +5881,6 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"instantaneous_input_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION) / 1024,
"instantaneous_output_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION) / 1024,
"rejected_connections:%lld\r\n", server.stat_rejected_conn,
"total_sync_bypass_crc:%ld\r\n", server.stat_total_sync_bypass_crc,
"sync_full:%lld\r\n", server.stat_sync_full,
"sync_partial_ok:%lld\r\n", server.stat_sync_partial_ok,
"sync_partial_err:%lld\r\n", server.stat_sync_partial_err,
Expand Down
5 changes: 2 additions & 3 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,10 @@ typedef enum {
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */
#define REPLICA_CAPA_BYPASS_CRC (1 << 3) /* Supports bypassing CRC checks for sync requests. */
#define REPLICA_CAPA_SKIP_RDB_CHECKSUM (1 << 3) /* Supports skipping RDB checksum for sync requests. */

/* Replica capability strings */
#define REPLICA_CAPA_BYPASS_CRC_STR "bypass-crc" /* Supports bypassing CRC checks for sync requests. */
#define REPLICA_CAPA_SKIP_RDB_CHECKSUM_STR "skip-rdb-checksum" /* Supports skipping RDB checksum for sync requests. */

/* Replica requirements */
#define REPLICA_REQ_NONE 0
Expand Down Expand Up @@ -1690,7 +1690,6 @@ struct valkeyServer {
double stat_fork_rate; /* Fork rate in GB/sec. */
long long stat_total_forks; /* Total count of fork. */
long long stat_rejected_conn; /* Clients rejected because of maxclients */
size_t stat_total_sync_bypass_crc; /* Total number of full syncs stated with CRC checksum bypassed */
long long stat_sync_full; /* Number of full resyncs with replicas. */
long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */
Expand Down
2 changes: 1 addition & 1 deletion src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ static ConnectionType CT_Socket = {
.postpone_update_state = NULL,
.update_state = NULL,

/* Miselenious */
/* Miscellaneous */
.connIntegrityChecked = NULL,
};

Expand Down
2 changes: 1 addition & 1 deletion src/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,7 @@ static ConnectionType CT_TLS = {
/* TLS specified methods */
.get_peer_cert = connTLSGetPeerCert,

/* Miselenious */
/* Miscellaneous */
.connIntegrityChecked = connTLSIsIntegrityChecked,
};

Expand Down
2 changes: 1 addition & 1 deletion src/unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ static ConnectionType CT_Unix = {
.postpone_update_state = NULL,
.update_state = NULL,

/* Miselenious */
/* Miscellaneous */
.connIntegrityChecked = NULL,
};

Expand Down
37 changes: 0 additions & 37 deletions tests/integration/bypass-crc.tcl

This file was deleted.

Loading

0 comments on commit c74edf0

Please sign in to comment.