Skip to content

Commit

Permalink
Compute batch returns for direct onboarding methods (#1393)
Browse files Browse the repository at this point in the history
  • Loading branch information
anorth authored Aug 31, 2023
1 parent b374f45 commit 18940d8
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 22 deletions.
54 changes: 36 additions & 18 deletions actors/miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use fil_actors_runtime::cbor::{serialize, serialize_vec};
use fil_actors_runtime::runtime::builtins::Type;
use fil_actors_runtime::runtime::{ActorCode, DomainSeparationTag, Policy, Runtime};
use fil_actors_runtime::{
actor_dispatch, actor_error, deserialize_block, extract_send_result, ActorContext,
actor_dispatch, actor_error, deserialize_block, extract_send_result, util, ActorContext,
ActorDowncast, ActorError, AsActorError, BatchReturn, BatchReturnGen, DealWeight,
BURNT_FUNDS_ACTOR_ADDR, INIT_ACTOR_ADDR, REWARD_ACTOR_ADDR, STORAGE_MARKET_ACTOR_ADDR,
STORAGE_POWER_ACTOR_ADDR, SYSTEM_ACTOR_ADDR, VERIFIED_REGISTRY_ACTOR_ADDR,
Expand Down Expand Up @@ -1081,7 +1081,7 @@ impl Actor {

// Validate inputs.
let require_deals = false; // No deals can be specified in new replica update.
let (batch_return, update_sector_infos) = validate_replica_updates(
let (validation_batch, update_sector_infos) = validate_replica_updates(
&updates,
&sector_infos,
&state,
Expand All @@ -1092,11 +1092,12 @@ impl Actor {
require_deals,
params.require_activation_success,
)?;
let valid_unproven_usis = batch_return.successes(&update_sector_infos);
let valid_manifests = batch_return.successes(&params.sector_updates);
let valid_unproven_usis = validation_batch.successes(&update_sector_infos);
let valid_manifests = validation_batch.successes(&params.sector_updates);

// Verify proofs before activating anything.
let mut proven_manifests: Vec<(&SectorUpdateManifest, &SectorOnChainInfo)> = vec![];
let mut proven_batch_gen = BatchReturnGen::new(validation_batch.size());
if !params.sector_proofs.is_empty() {
// Batched proofs, one per sector
if params.sector_updates.len() != params.sector_proofs.len() {
Expand All @@ -1108,6 +1109,9 @@ impl Actor {
));
}

// Note: an alternate factoring here could pull this block out to a separate function,
// return a BatchReturn, and then extract successes from
// valid_unproven_usis and valid_manifests, following the pattern used elsewhere.
for (usi, manifest) in valid_unproven_usis.iter().zip(valid_manifests) {
let proof_inputs = ReplicaUpdateInfo {
update_proof_type: usi.update.update_proof_type,
Expand All @@ -1117,12 +1121,16 @@ impl Actor {
proof: usi.update.replica_proof.clone().into(),
};
match rt.verify_replica_update(&proof_inputs) {
Ok(_) => proven_manifests.push((manifest, usi.sector_info)),
Ok(_) => {
proven_manifests.push((manifest, usi.sector_info));
proven_batch_gen.add_success();
}
Err(e) => {
warn!(
"failed to verify replica update for sector {}: {e}",
usi.sector_info.sector_number
);
proven_batch_gen.add_fail(ExitCode::USR_ILLEGAL_ARGUMENT);
if params.require_activation_success {
return Err(actor_error!(
illegal_argument,
Expand All @@ -1139,10 +1147,12 @@ impl Actor {
illegal_argument,
"aggregate update proofs not yet supported"
));
// proven_batch_gen.add_successes(valid_manifests.len());
}
if proven_manifests.is_empty() {
return Err(actor_error!(illegal_argument, "no valid updates"));
}
let proven_batch = proven_batch_gen.gen();

// Activate data and compute CommD.
let data_activation_inputs: Vec<SectorPiecesActivationInput> = proven_manifests
Expand All @@ -1157,11 +1167,11 @@ impl Actor {
.collect();

// Activate data for proven updates.
let (batch_return, data_activations) =
let (data_batch, data_activations) =
activate_sectors_pieces(rt, data_activation_inputs, params.require_activation_success)?;

// Successful data activation is required for sector activation.
let successful_manifests = batch_return.successes(&proven_manifests);
let successful_manifests = data_batch.successes(&proven_manifests);

let mut state_updates_by_dline = BTreeMap::<u64, Vec<ReplicaUpdateStateInputs>>::new();
for ((update, sector_info), data_activation) in
Expand Down Expand Up @@ -1205,7 +1215,8 @@ impl Actor {
}
notify_data_consumers(rt, &notifications, params.require_notification_success)?;

Ok(ProveReplicaUpdates3Return { activation_results: BatchReturn::empty() })
let result = util::stack(&[validation_batch, proven_batch, data_batch]);
Ok(ProveReplicaUpdates3Return { activation_results: result })
}

fn dispute_windowed_post(
Expand Down Expand Up @@ -1764,27 +1775,28 @@ impl Actor {

// Validate pre-commits.
let allow_deals = false; // New onboarding entry point does not allow pre-committed deals.
let (batch_return, proof_inputs) =
let (validation_batch, proof_inputs) =
validate_precommits(rt, &precommits, allow_deals, params.require_activation_success)?;
if batch_return.success_count == 0 {
if validation_batch.success_count == 0 {
return Err(actor_error!(illegal_argument, "no valid precommits specified"));
}
let valid_precommits = batch_return.successes(&precommits);
let valid_activation_inputs = batch_return.successes(&params.sector_activations);
let valid_precommits = validation_batch.successes(&precommits);
let valid_activation_inputs = validation_batch.successes(&params.sector_activations);
let eligible_activation_inputs_iter = valid_activation_inputs.iter().zip(valid_precommits);

// Verify seal proof(s), either batch or aggregate.
let mut proven_activation_inputs: Vec<(
&SectorActivationManifest,
&SectorPreCommitOnChainInfo,
)> = vec![];
let mut proven_batch_gen = BatchReturnGen::new(validation_batch.size());
if !params.sector_proofs.is_empty() {
// Verify batched proofs.
// Filter proof inputs to those for valid pre-commits.
let seal_verify_inputs: Vec<SealVerifyInfo> = batch_return
let seal_verify_inputs: Vec<SealVerifyInfo> = validation_batch
.successes(&proof_inputs)
.iter()
.zip(batch_return.successes(&params.sector_proofs))
.zip(validation_batch.successes(&params.sector_proofs))
.map(|(info, proof)| -> SealVerifyInfo {
info.to_seal_verify_info(miner_id, proof)
})
Expand All @@ -1799,7 +1811,10 @@ impl Actor {
res.iter().zip(eligible_activation_inputs_iter)
{
if *verified {
proven_activation_inputs.push((*activation, precommit))
proven_activation_inputs.push((*activation, precommit));
proven_batch_gen.add_success();
} else {
proven_batch_gen.add_fail(ExitCode::USR_ILLEGAL_ARGUMENT);
}
}
if params.require_activation_success
Expand Down Expand Up @@ -1827,7 +1842,9 @@ impl Actor {
proven_activation_inputs = eligible_activation_inputs_iter
.map(|(activation, precommit)| (*activation, precommit))
.collect();
proven_batch_gen.add_successes(validation_batch.size());
}
let proven_batch = proven_batch_gen.gen();

// Activate data and verify CommD matches the declared one.
let data_activation_inputs = proven_activation_inputs
Expand All @@ -1844,11 +1861,11 @@ impl Actor {
.collect();

// Activate data for proven sectors.
let (batch_return, data_activations) =
let (data_batch, data_activations) =
activate_sectors_pieces(rt, data_activation_inputs, params.require_activation_success)?;

// Successful data activation is required for sector activation.
let successful_sector_activations = batch_return.successes(&proven_activation_inputs);
let successful_sector_activations = data_batch.successes(&proven_activation_inputs);
let successful_precommits =
successful_sector_activations.iter().map(|(_, second)| *second).collect();

Expand Down Expand Up @@ -1888,7 +1905,8 @@ impl Actor {
}
notify_data_consumers(rt, &notifications, params.require_notification_success)?;

Ok(ProveCommitSectors2Return { activation_results: BatchReturn::empty() })
let result = util::stack(&[validation_batch, proven_batch, data_batch]);
Ok(ProveCommitSectors2Return { activation_results: result })
}

/// Checks state of the corresponding sector pre-commitment, then schedules the proof to be verified in bulk
Expand Down
145 changes: 144 additions & 1 deletion runtime/src/util/batch_return.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ impl BatchReturn {
Self { success_count: n, fail_codes: Vec::new() }
}

pub fn of(codes: &[ExitCode]) -> Self {
let mut gen = BatchReturnGen::new(codes.len());
for code in codes {
gen.add(*code);
}
gen.gen()
}

pub fn size(&self) -> usize {
self.success_count as usize + self.fail_codes.len()
}
Expand Down Expand Up @@ -86,6 +94,48 @@ impl fmt::Display for BatchReturn {
}
}

/// Computes a batch return that is the result of a sequence of batch returns
/// applied to the previous successful results.
/// Each batch's size() must be equal to the previous batch's success_count.
/// Any fail codes then override the prior stack's successful items,
/// indexed against only those successful items.
/// E.g. stack([OK, E1, OK, E2], [OK, E3], [E4]) => [E4, E1, E3, E2]
pub fn stack(batch_returns: &[BatchReturn]) -> BatchReturn {
if batch_returns.is_empty() {
return BatchReturn::empty();
}
let mut base = batch_returns[0].clone();
for nxt in &batch_returns[1..] {
assert_eq!(
base.success_count as usize,
nxt.size(),
"can't stack batch of {} on batch with {} successes",
nxt.size(),
base.success_count
);
let mut nxt_fail = nxt.fail_codes.iter().peekable();
let mut base_fail = base.fail_codes.iter().peekable();
let mut offset = 0;
let mut new_fail_codes = vec![];
while nxt_fail.peek().is_some() {
let nxt_fail = nxt_fail.next().unwrap();
while base_fail.peek().is_some()
&& base_fail.peek().unwrap().idx <= nxt_fail.idx + offset
{
base_fail.next();
offset += 1;
}
new_fail_codes.push(FailCode { idx: nxt_fail.idx + offset, code: nxt_fail.code })
}
base.fail_codes.extend(new_fail_codes);
base.fail_codes.sort_by(|a, b| a.idx.cmp(&b.idx));
base.success_count = nxt.success_count;
}
assert_eq!(base.size(), batch_returns[0].size());
assert_eq!(base.success_count, batch_returns[batch_returns.len() - 1].success_count);
base
}

pub struct BatchReturnGen {
success_count: usize,
fail_codes: Vec<FailCode>,
Expand All @@ -100,7 +150,11 @@ impl BatchReturnGen {
}

pub fn add_success(&mut self) -> &mut Self {
self.success_count += 1;
self.add_successes(1)
}

pub fn add_successes(&mut self, count: usize) -> &mut Self {
self.success_count += count;
self
}

Expand All @@ -110,6 +164,14 @@ impl BatchReturnGen {
self
}

pub fn add(&mut self, code: ExitCode) -> &mut Self {
if code.is_success() {
self.add_success()
} else {
self.add_fail(code)
}
}

pub fn gen(&self) -> BatchReturn {
assert_eq!(self.expect_count, self.success_count + self.fail_codes.len(), "programmer error, mismatched batch size {} and processed count {} batch return must include success/fail for all inputs", self.expect_count, self.success_count + self.fail_codes.len());
BatchReturn {
Expand All @@ -118,3 +180,84 @@ impl BatchReturnGen {
}
}
}

// Unit tests
#[cfg(test)]
mod test {
use crate::util::batch_return::stack;
use crate::{BatchReturn, FailCode};
use fvm_shared::error::ExitCode;

const OK: ExitCode = ExitCode::OK;
const ERR1: ExitCode = ExitCode::USR_ILLEGAL_ARGUMENT;
const ERR2: ExitCode = ExitCode::USR_NOT_FOUND;
const ERR3: ExitCode = ExitCode::USR_FORBIDDEN;

///// Tests for stacking batch returns. /////

#[test]
fn test_stack_empty() {
let batch_returns = vec![];
let stacked = stack(&batch_returns);
assert_eq!(stacked.success_count, 0);
assert_eq!(Vec::<FailCode>::new(), stacked.fail_codes);
}

#[test]
fn test_stack_single() {
assert_stack(&[], &[]);
assert_stack(&[OK], &[&[OK]]);
assert_stack(&[ERR1], &[&[ERR1]]);
assert_stack(&[ERR1, OK, ERR2], &[&[ERR1, OK, ERR2]]);
}

#[test]
fn test_stack_overwrites() {
assert_stack(&[OK], &[&[OK], &[OK]]);
assert_stack(&[ERR1], &[&[OK], &[ERR1]]);

assert_stack(&[OK, ERR1], &[&[OK, OK], &[OK, ERR1]]);
assert_stack(&[ERR1, ERR2], &[&[OK, OK], &[ERR1, ERR2]]);
}

#[test]
fn test_stack_offsets() {
assert_stack(&[ERR1], &[&[ERR1], &[]]);
assert_stack(&[ERR1, ERR2], &[&[ERR1, ERR2], &[]]);

assert_stack(&[ERR2, ERR1], &[&[OK, ERR1], &[ERR2]]);
assert_stack(&[ERR1, ERR2], &[&[ERR1, OK], &[ERR2]]);

assert_stack(&[ERR2, ERR1], &[&[OK, OK], &[OK, ERR1], &[ERR2]]);
assert_stack(&[ERR1, ERR2], &[&[OK, OK], &[ERR1, OK], &[ERR2]]);

assert_stack(&[OK, ERR1, OK], &[&[OK, ERR1, OK], &[OK, OK]]);
assert_stack(&[ERR2, ERR1, OK], &[&[OK, ERR1, OK], &[ERR2, OK]]);
assert_stack(&[OK, ERR1, ERR2], &[&[OK, ERR1, OK], &[OK, ERR2]]);
assert_stack(&[ERR1, ERR2, OK], &[&[ERR1, OK, OK], &[ERR2, OK]]);
assert_stack(&[ERR1, OK, ERR2], &[&[ERR1, OK, OK], &[OK, ERR2]]);
assert_stack(&[ERR3, ERR1, ERR2], &[&[OK, ERR1, OK], &[ERR3, ERR2]]);

assert_stack(
&[ERR1, ERR1, ERR1, ERR3, ERR2, ERR3],
&[&[ERR1, ERR1, ERR1, OK, ERR2, OK], &[ERR3, ERR3]],
);

assert_stack(
&[ERR1, ERR3, ERR2, OK, ERR1, ERR3],
&[&[ERR1, OK, ERR2, OK, ERR1, OK], &[ERR3, OK, ERR3]],
);

assert_stack(
&[ERR2, ERR1, OK, ERR3, ERR2],
&[&[OK; 5], &[OK, ERR1, OK, OK, OK], &[ERR2, OK, OK, ERR2], &[OK, ERR3]],
);
}

fn assert_stack(expected: &[ExitCode], stacked: &[&[ExitCode]]) {
let expected = BatchReturn::of(expected);
let batches: Vec<BatchReturn> = stacked.iter().map(|b| BatchReturn::of(b)).collect();
let stacked = stack(&batches);
assert_eq!(expected, stacked);
}
}
4 changes: 1 addition & 3 deletions runtime/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Copyright 2019-2022 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

pub use self::batch_return::BatchReturn;
pub use self::batch_return::BatchReturnGen;
pub use self::batch_return::FailCode;
pub use self::batch_return::*;
pub use self::downcast::*;
pub use self::map::*;
pub use self::mapmap::MapMap;
Expand Down

0 comments on commit 18940d8

Please sign in to comment.