Skip to content

Commit

Permalink
feat: compress state witness (#10715)
Browse files Browse the repository at this point in the history
This PR adds state witness compression as well as metrics around it. See
#10780 for the big picture analysis.

We explicitly limit the size of the decompressed state witness to 64MB
to handle [Zip bomb](https://en.wikipedia.org/wiki/Zip_bomb) attack.
This is implemented by using `BufMut` and `Limit` along with
`zstd::stream::copy_decode`, so it fails when attempting to write data
beyond the limit.

Compression reduces state witness size particularly well for large state
witnesses (containing many `ContractCode` values), which makes it
worthwhile. For cases when compression doesn't yield much improvements
the latency overhead is not significant.

In practice shadow validation was used to verify the statements above
with the current mainnet traffic:
* overall traffic reduction
([dashboard](https://nearinc.grafana.net/d/a41c3c5e-4c1b-41fc-919c-11040e9cfc13/shadow-chunk-validation?orgId=1&from=now-6h&to=now&viewPanel=32))
is about 16% which is not great. That makes sense considering that after
resharding to 6 shards the size of state witnesses per shard dropped
significantly which makes compression less effective.
* max state witness size
([uncompressed](https://nearinc.grafana.net/d/a41c3c5e-4c1b-41fc-919c-11040e9cfc13/shadow-chunk-validation?orgId=1&from=now-12h&to=now&viewPanel=33)
and
[compressed](https://nearinc.grafana.net/d/a41c3c5e-4c1b-41fc-919c-11040e9cfc13/shadow-chunk-validation?orgId=1&from=now-12h&to=now&viewPanel=1)
dashboards) reduced a lot for large state witnesses. We didn't observe
any witnesses larger than 2.5MB in compressed state while uncompressed
ones were as big as 6.6MB.
* additional latency
([avg](https://nearinc.grafana.net/d/a41c3c5e-4c1b-41fc-919c-11040e9cfc13/shadow-chunk-validation?orgId=1&from=now-6h&to=now&viewPanel=34)
and
[distribution](https://nearinc.grafana.net/d/a41c3c5e-4c1b-41fc-919c-11040e9cfc13/shadow-chunk-validation?orgId=1&from=now-6h&to=now&viewPanel=11)
dashboards) is mostly comes from encoding and is in a 10-20ms range for
the most busy shard 2.

This PR also includes the following changes:
- reduce state witness size histogram step
- bump `actix-http` version to avoid bringing multiple version of `zstd`
dependency ([zulip
thread](https://near.zulipchat.com/#narrow/stream/300659-Rust-.F0.9F.A6.80/topic/Transitive.20dependency.20conflict))
  • Loading branch information
pugachAG authored Apr 15, 2024
1 parent 579c53c commit dea7a10
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 58 deletions.
23 changes: 12 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ unnecessary_lazy_evaluations = "deny"
[workspace.dependencies]
actix = "0.13.0"
actix-cors = "0.6.1"
actix-http = "3.3"
actix-http = "3.6"
actix-rt = "2"
actix-web = "4.1"
anyhow = "1.0.62"
Expand Down Expand Up @@ -406,6 +406,7 @@ winapi = { version = "0.3", features = [
xshell = "0.2.1"
xz2 = "0.1.6"
yansi = "0.5.1"
zstd = "0.13.1"

stdx = { package = "near-stdx", path = "utils/stdx" }

Expand Down
34 changes: 32 additions & 2 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,9 +576,39 @@ pub(crate) static CHUNK_STATE_WITNESS_VALIDATION_TIME: Lazy<HistogramVec> = Lazy
pub(crate) static CHUNK_STATE_WITNESS_TOTAL_SIZE: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_chunk_state_witness_total_size",
"Stateless validation state witness size in bytes",
"Stateless validation compressed state witness size in bytes",
&["shard_id"],
Some(exponential_buckets(1000.0, 2.0, 20).unwrap()),
Some(exponential_buckets(100_000.0, 1.2, 32).unwrap()),
)
.unwrap()
});

pub(crate) static CHUNK_STATE_WITNESS_RAW_SIZE: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_chunk_state_witness_raw_size",
"Stateless validation uncompressed (raw) state witness size in bytes",
&["shard_id"],
Some(exponential_buckets(100_000.0, 1.2, 32).unwrap()),
)
.unwrap()
});

pub(crate) static CHUNK_STATE_WITNESS_ENCODE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_chunk_state_witness_encode_time",
"State witness encoding (serialization + compression) latency in seconds",
&["shard_id"],
Some(linear_buckets(0.025, 0.025, 20).unwrap()),
)
.unwrap()
});

pub(crate) static CHUNK_STATE_WITNESS_DECODE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_chunk_state_witness_decode_time",
"State witness decoding (decompression + deserialization) latency in seconds",
&["shard_id"],
Some(linear_buckets(0.025, 0.025, 20).unwrap()),
)
.unwrap()
});
Expand Down
23 changes: 14 additions & 9 deletions chain/client/src/stateless_validation/chunk_validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use near_primitives::merkle::merklize;
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader};
use near_primitives::stateless_validation::{
ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessAck, SignedEncodedChunkStateWitness,
ChunkEndorsement, ChunkStateWitness, ChunkStateWitnessAck, ChunkStateWitnessSize,
SignedEncodedChunkStateWitness,
};
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::chunk_extra::ChunkExtra;
Expand Down Expand Up @@ -624,7 +625,7 @@ impl Client {
signed_witness: SignedEncodedChunkStateWitness,
processing_done_tracker: Option<ProcessingDoneTracker>,
) -> Result<(), Error> {
let witness = self.partially_validate_state_witness(&signed_witness)?;
let (witness, raw_witness_size) = self.partially_validate_state_witness(&signed_witness)?;

// Send the acknowledgement for the state witness back to the chunk producer.
// This is currently used for network roundtrip time measurement, so we do not need to
Expand All @@ -639,10 +640,7 @@ impl Client {
),
Err(Error::DBNotFoundErr(_)) => {
// Previous block isn't available at the moment, add this witness to the orphan pool.
self.handle_orphan_state_witness(
witness,
signed_witness.witness_bytes.size_bytes(),
)?;
self.handle_orphan_state_witness(witness, raw_witness_size)?;
Ok(())
}
Err(err) => Err(err),
Expand Down Expand Up @@ -681,8 +679,10 @@ impl Client {
fn partially_validate_state_witness(
&self,
signed_witness: &SignedEncodedChunkStateWitness,
) -> Result<ChunkStateWitness, Error> {
let witness = signed_witness.witness_bytes.decode()?;
) -> Result<(ChunkStateWitness, ChunkStateWitnessSize), Error> {
let decode_start = std::time::Instant::now();
let (witness, raw_witness_size) = signed_witness.witness_bytes.decode()?;
let decode_elapsed_seconds = decode_start.elapsed().as_secs_f64();
let chunk_header = &witness.chunk_header;
let witness_height = chunk_header.height_created();
let witness_shard = chunk_header.shard_id();
Expand Down Expand Up @@ -731,6 +731,11 @@ impl Client {
return Err(Error::InvalidChunkStateWitness("Invalid signature".to_string()));
}

Ok(witness)
// Record metrics after validating the witness
metrics::CHUNK_STATE_WITNESS_DECODE_TIME
.with_label_values(&[&witness_shard.to_string()])
.observe(decode_elapsed_seconds);

Ok((witness, raw_witness_size))
}
}
27 changes: 22 additions & 5 deletions chain/client/src/stateless_validation/shadow_validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,26 @@ impl Client {
chunk,
validated_transactions.storage_proof,
)?;
let witness_size = EncodedChunkStateWitness::encode(&witness).size_bytes();
metrics::CHUNK_STATE_WITNESS_TOTAL_SIZE
.with_label_values(&[&shard_id.to_string()])
.observe(witness_size as f64);
let (encoded_witness, raw_witness_size) = {
let shard_id_label = shard_id.to_string();
let encode_timer = metrics::CHUNK_STATE_WITNESS_ENCODE_TIME
.with_label_values(&[shard_id_label.as_str()])
.start_timer();
let (encoded_witness, raw_witness_size) = EncodedChunkStateWitness::encode(&witness)?;
encode_timer.observe_duration();
metrics::CHUNK_STATE_WITNESS_TOTAL_SIZE
.with_label_values(&[shard_id_label.as_str()])
.observe(encoded_witness.size_bytes() as f64);
metrics::CHUNK_STATE_WITNESS_RAW_SIZE
.with_label_values(&[shard_id_label.as_str()])
.observe(raw_witness_size as f64);
let decode_timer = metrics::CHUNK_STATE_WITNESS_DECODE_TIME
.with_label_values(&[shard_id_label.as_str()])
.start_timer();
encoded_witness.decode()?;
decode_timer.observe_duration();
(encoded_witness, raw_witness_size)
};
let pre_validation_start = Instant::now();
let pre_validation_result = pre_validate_chunk_state_witness(
&witness,
Expand All @@ -97,7 +113,8 @@ impl Client {
target: "stateless_validation",
shard_id,
?chunk_hash,
witness_size,
witness_size = encoded_witness.size_bytes(),
raw_witness_size,
pre_validation_elapsed = ?pre_validation_start.elapsed(),
"completed shadow chunk pre-validation"
);
Expand Down
33 changes: 27 additions & 6 deletions chain/client/src/stateless_validation/state_witness_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ use near_primitives::hash::{hash, CryptoHash};
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{ChunkHash, ReceiptProof, ShardChunk, ShardChunkHeader};
use near_primitives::stateless_validation::{
ChunkStateTransition, ChunkStateWitness, ChunkStateWitnessAck, SignedEncodedChunkStateWitness,
StoredChunkStateTransitionData,
ChunkStateTransition, ChunkStateWitness, ChunkStateWitnessAck, EncodedChunkStateWitness,
SignedEncodedChunkStateWitness, StoredChunkStateTransitionData,
};
use near_primitives::types::{AccountId, EpochId};
use near_primitives::validator_signer::ValidatorSigner;
use std::collections::HashMap;

use crate::stateless_validation::chunk_validator::send_chunk_endorsement_to_block_producers;
Expand Down Expand Up @@ -53,10 +54,7 @@ impl Client {
chunk,
transactions_storage_proof,
)?;
let signed_witness = SignedEncodedChunkStateWitness::new(&witness, my_signer.as_ref());
metrics::CHUNK_STATE_WITNESS_TOTAL_SIZE
.with_label_values(&[&chunk_header.shard_id().to_string()])
.observe(signed_witness.witness_bytes.size_bytes() as f64);
let signed_witness = create_signed_witness(&witness, my_signer.as_ref())?;

if chunk_validators.contains(my_signer.validator_id()) {
// Bypass state witness validation if we created state witness. Endorse the chunk immediately.
Expand Down Expand Up @@ -302,3 +300,26 @@ impl Client {
Ok(source_receipt_proofs)
}
}

fn create_signed_witness(
witness: &ChunkStateWitness,
my_signer: &dyn ValidatorSigner,
) -> Result<SignedEncodedChunkStateWitness, Error> {
let shard_id_label = witness.chunk_header.shard_id().to_string();
let encode_timer = metrics::CHUNK_STATE_WITNESS_ENCODE_TIME
.with_label_values(&[shard_id_label.as_str()])
.start_timer();
let (witness_bytes, raw_witness_size) = EncodedChunkStateWitness::encode(&witness)?;
encode_timer.observe_duration();
let signed_witness = SignedEncodedChunkStateWitness {
signature: my_signer.sign_chunk_state_witness(&witness_bytes),
witness_bytes,
};
metrics::CHUNK_STATE_WITNESS_TOTAL_SIZE
.with_label_values(&[shard_id_label.as_str()])
.observe(signed_witness.witness_bytes.size_bytes() as f64);
metrics::CHUNK_STATE_WITNESS_RAW_SIZE
.with_label_values(&[shard_id_label.as_str()])
.observe(raw_witness_size as f64);
Ok(signed_witness)
}
2 changes: 1 addition & 1 deletion chain/client/src/test_utils/test_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ impl TestEnv {
fn found_differing_post_state_root_due_to_state_transitions(
signed_witness: &SignedEncodedChunkStateWitness,
) -> bool {
let witness = signed_witness.witness_bytes.decode().unwrap();
let witness = signed_witness.witness_bytes.decode().unwrap().0;
let mut post_state_roots = HashSet::from([witness.main_state_transition.post_state_root]);
post_state_roots.extend(witness.implicit_transitions.iter().map(|t| t.post_state_root));
post_state_roots.len() >= 2
Expand Down
9 changes: 7 additions & 2 deletions chain/epoch-manager/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use near_primitives::hash::hash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::sharding::{ShardChunkHeader, ShardChunkHeaderV3};
use near_primitives::stateless_validation::{
ChunkStateTransition, ChunkStateWitness, SignedEncodedChunkStateWitness,
ChunkStateTransition, ChunkStateWitness, EncodedChunkStateWitness,
SignedEncodedChunkStateWitness,
};
use near_primitives::types::ValidatorKickoutReason::{NotEnoughBlocks, NotEnoughChunks};
use near_primitives::validator_signer::ValidatorSigner;
Expand Down Expand Up @@ -2927,7 +2928,11 @@ fn test_verify_chunk_state_witness() {
Default::default(),
);
// Check chunk state witness validity.
let mut chunk_state_witness = SignedEncodedChunkStateWitness::new(&witness, signer.as_ref());
let witness_bytes = EncodedChunkStateWitness::encode(&witness).unwrap().0;
let mut chunk_state_witness = SignedEncodedChunkStateWitness {
signature: signer.sign_chunk_state_witness(&witness_bytes),
witness_bytes,
};
assert!(epoch_manager
.verify_chunk_state_witness_signature(&chunk_state_witness, &chunk_producer, &epoch_id)
.unwrap());
Expand Down
2 changes: 2 additions & 0 deletions core/primitives/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ arbitrary.workspace = true
base64.workspace = true
borsh.workspace = true
bytesize.workspace = true
bytes.workspace = true
cfg-if.workspace = true
chrono.workspace = true
derive_more.workspace = true
Expand All @@ -39,6 +40,7 @@ stdx.workspace = true
strum.workspace = true
thiserror.workspace = true
tracing.workspace = true
zstd.workspace = true

near-async.workspace = true
near-crypto.workspace = true
Expand Down
Loading

0 comments on commit dea7a10

Please sign in to comment.