Skip to content

Commit

Permalink
factor batch activate deals and claim allocations for reuse in replic…
Browse files Browse the repository at this point in the history
…a_updates
  • Loading branch information
alexytsu committed May 31, 2023
1 parent 8482a5b commit df56d14
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 121 deletions.
6 changes: 3 additions & 3 deletions actors/miner/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,16 @@ pub mod verifreg {
pub all_or_nothing: bool,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize_tuple, Deserialize_tuple)]
#[serde(transparent)]
pub struct SectorAllocationClaimResult {
#[serde(with = "bigint_ser")]
pub claimed_space: BigInt,
pub sector: SectorNumber,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
#[serde(transparent)]
pub struct ClaimAllocationsReturn {
pub batch_info: BatchReturn,
pub claim_results: Vec<SectorAllocationClaimResult>,
}
}
183 changes: 94 additions & 89 deletions actors/miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4797,13 +4797,17 @@ fn confirm_sector_proofs_valid_internal(
// a constant number of them.
let activation = rt.curr_epoch();

let activated_sectors: Vec<(SectorPreCommitOnChainInfo, ext::market::DealSpaces)> =
match batch_activate_deals_and_claim_allocations(rt, pre_commits, false)? {
None => {
return Err(actor_error!(illegal_argument, "all prove commits failed to validate"));
}
Some(activated_sectors) => activated_sectors,
};
let deals_activation_infos = pre_commits
.iter()
.map(|pc| DealsActivationInfo {
deal_ids: pc.info.deal_ids.clone(),
sector_expiry: pc.info.expiration,
sector_number: pc.info.sector_number,
})
.collect();

let activated_sectors =
batch_activate_deals_and_claim_allocations(rt, deals_activation_infos, false)?;

let (total_pledge, newly_vested) = rt.transaction(|state: &mut State, rt| {
let policy = rt.policy();
Expand All @@ -4815,7 +4819,13 @@ fn confirm_sector_proofs_valid_internal(
let mut new_sectors = Vec::<SectorOnChainInfo>::new();
let mut total_pledge = TokenAmount::zero();

for (pre_commit, deal_spaces) in activated_sectors {
for (pre_commit, deal_spaces) in pre_commits.iter().zip(activated_sectors) {
// skip sectors that weren't activated
if deal_spaces.is_none() {
continue;
}
let deal_spaces = deal_spaces.unwrap();

// compute initial pledge
let duration = pre_commit.info.expiration - activation;

Expand Down Expand Up @@ -4870,7 +4880,7 @@ fn confirm_sector_proofs_valid_internal(
sector_number: pre_commit.info.sector_number,
seal_proof: pre_commit.info.seal_proof,
sealed_cid: pre_commit.info.sealed_cid,
deal_ids: pre_commit.info.deal_ids,
deal_ids: pre_commit.info.deal_ids.clone(),
expiration: pre_commit.info.expiration,
activation,
deal_weight,
Expand Down Expand Up @@ -4948,24 +4958,23 @@ fn confirm_sector_proofs_valid_internal(

fn batch_activate_deals_and_claim_allocations(
rt: &impl Runtime,
pre_commits: Vec<SectorPreCommitOnChainInfo>,
activation_infos: Vec<DealsActivationInfo>,
all_or_nothing: bool,
) -> Result<Option<Vec<(SectorPreCommitOnChainInfo, ext::market::DealSpaces)>>, ActorError> {
// non-verified deal space
) -> Result<Vec<Option<ext::market::DealSpaces>>, ActorError> {
// Fills with ActivateDealResults per DealActivationInfo
// ActivateDeals implicitly succeeds when no deal_ids are specified
// If deal activation fails, a None is recorded in place
let mut activation_results = Vec::new();

// TODO: https://github.com/filecoin-project/builtin-actors/pull/1303 enable activation batching
for pre_commit in pre_commits {
for activation_info in activation_infos.clone() {
// Check (and activate) storage deals associated to sector
let deal_ids = pre_commit.info.deal_ids.clone();
let deal_ids = activation_info.deal_ids.clone();
if deal_ids.is_empty() {
activation_results.push((
pre_commit,
ActivateDealsResult {
nonverified_deal_space: BigInt::default(),
verified_infos: Vec::default(),
},
));
activation_results.push(Some(ActivateDealsResult {
nonverified_deal_space: BigInt::default(),
verified_infos: Vec::default(),
}));
continue;
}

Expand All @@ -4974,37 +4983,47 @@ fn batch_activate_deals_and_claim_allocations(
ext::market::ACTIVATE_DEALS_METHOD,
IpldBlock::serialize_cbor(&ext::market::ActivateDealsParams {
deal_ids,
sector_expiry: pre_commit.info.expiration,
sector_expiry: activation_info.sector_expiry,
})?,
TokenAmount::zero(),
));
let activate_res: ext::market::ActivateDealsResult = match activate_raw {
Ok(res) => deserialize_block(res)?,
Err(e) => {
info!(
"error activating deals on sector {}: {}",
pre_commit.info.sector_number,
e.msg()
);
if all_or_nothing {
return Ok(None);
return Err(actor_error!(
illegal_argument,
"failed to activate {:?} but all_or_nothing was true",
activation_info
));
} else {
info!(
"error activating deals on sector {}: {}",
activation_info.sector_number,
e.msg()
);
activation_results.push(None);
continue;
}
}
};

activation_results.push((pre_commit, activate_res));
activation_results.push(Some(activate_res));
}

// When all prove commits have failed abort early
if activation_results.is_empty() {
return Err(actor_error!(illegal_argument, "all sectors failed to activate"));
}

// Claims aggregated across all sectors
// Flattened claims across all sectors
let mut sectors_claims = Vec::new();
for (pre_commit, activate_res) in activation_results.clone() {
for (activation_info, activate_res) in activation_infos.iter().zip(activation_results.clone()) {
if activate_res.is_none() {
continue;
}
let activate_res = activate_res.unwrap();

let mut sector_claims = activate_res
.verified_infos
.iter()
Expand All @@ -5013,72 +5032,58 @@ fn batch_activate_deals_and_claim_allocations(
allocation_id: info.allocation_id,
data: info.data,
size: info.size,
sector: pre_commit.info.sector_number,
sector_expiry: pre_commit.info.expiration,
sector: activation_info.sector_number,
sector_expiry: activation_info.sector_expiry,
})
.collect();
sectors_claims.append(&mut sector_claims);
}

if sectors_claims.is_empty() {
// TODO: collapse this into the other branch. have an early conditional to determine whether sending to the
// verifreg actor is necessary
Ok(Some(
activation_results
.iter()
.map(|(pre_commit, activation_result)| {
(
pre_commit.clone(),
ext::market::DealSpaces {
verified_deal_space: BigInt::default(),
deal_space: activation_result.nonverified_deal_space.clone(),
},
)
})
.collect(),
))
} else {
let claim_raw = extract_send_result(rt.send_simple(
&VERIFIED_REGISTRY_ACTOR_ADDR,
ext::verifreg::CLAIM_ALLOCATIONS_METHOD,
IpldBlock::serialize_cbor(&ext::verifreg::ClaimAllocationsParams {
allocations: sectors_claims,
all_or_nothing: true,
})?,
TokenAmount::zero(),
));
let claim_res = match sectors_claims.is_empty() {
true => Vec::default(),
false => {
let claim_raw = extract_send_result(rt.send_simple(
&VERIFIED_REGISTRY_ACTOR_ADDR,
ext::verifreg::CLAIM_ALLOCATIONS_METHOD,
IpldBlock::serialize_cbor(&ext::verifreg::ClaimAllocationsParams {
allocations: sectors_claims,
all_or_nothing: true,
})?,
TokenAmount::zero(),
));

let claim_res: ext::verifreg::ClaimAllocationsReturn = match claim_raw {
Ok(res) => deserialize_block(res)?,
Err(e) => {
info!("error claiming allocations for batch {}", e.msg());
return Ok(None);
}
};
let claim_res: ext::verifreg::ClaimAllocationsReturn = match claim_raw {
Ok(res) => deserialize_block(res)?,
Err(e) => {
info!("error claiming allocations for batch {}", e.msg());
return Err(actor_error!(
illegal_argument,
"failed to claim allocation during batch activation: {}",
e.msg()
));
}
};

let activation_and_claim_results: Vec<(
SectorPreCommitOnChainInfo,
ext::market::DealSpaces,
)> = activation_results
.iter()
.map(|(pre_commit, activation_result)| {
let verified_deal_space = claim_res
.claim_results
.iter()
.filter(|c| pre_commit.info.sector_number == c.sector)
.fold(BigInt::zero(), |acc, c| acc + c.claimed_space.clone());
(
pre_commit.clone(),
ext::market::DealSpaces {
verified_deal_space,
deal_space: activation_result.nonverified_deal_space.clone(),
},
)
})
.collect();
claim_res.claim_results
}
};

Ok(Some(activation_and_claim_results))
}
let mut claim_res = claim_res.iter();

// reassociate the verified claims within each sector to the original activation requests
let activation_and_claim_results: Vec<Option<ext::market::DealSpaces>> = activation_results
.iter()
.map(|activation_res| {
activation_res.as_ref().map(|res| ext::market::DealSpaces {
verified_deal_space: claim_res
.by_ref()
.take(res.verified_infos.len())
.fold(BigInt::zero(), |acc, claim_res| acc + claim_res.claimed_space.clone()),
deal_space: res.nonverified_deal_space.clone(),
})
})
.collect();
Ok(activation_and_claim_results)
}

// activate deals with builtin market and claim allocations with verified registry actor
Expand Down
7 changes: 7 additions & 0 deletions actors/miner/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,13 @@ pub struct DisputeWindowedPoStParams {
pub post_index: u64, // only one is allowed at a time to avoid loading too many sector infos.
}

#[derive(Debug, Clone)]
pub struct DealsActivationInfo {
pub deal_ids: Vec<DealID>,
pub sector_expiry: ChainEpoch,
pub sector_number: SectorNumber,
}

#[derive(Debug, Clone, Serialize_tuple, Deserialize_tuple)]
pub struct ProveCommitAggregateParams {
pub sector_numbers: BitField,
Expand Down
8 changes: 2 additions & 6 deletions actors/miner/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use fil_actor_miner::ext::verifreg::{
};

use fil_actors_runtime::runtime::{DomainSeparationTag, Policy, Runtime, RuntimePolicy};
use fil_actors_runtime::{test_utils::*, BatchReturn, BatchReturnGen};
use fil_actors_runtime::{test_utils::*, BatchReturnGen};
use fil_actors_runtime::{
ActorDowncast, ActorError, Array, DealWeight, MessageAccumulator, BURNT_FUNDS_ACTOR_ADDR,
INIT_ACTOR_ADDR, REWARD_ACTOR_ADDR, STORAGE_MARKET_ACTOR_ADDR, STORAGE_POWER_ACTOR_ADDR,
Expand Down Expand Up @@ -1126,13 +1126,9 @@ impl ActorHarness {
// use exit code map for claim allocations in config

let claim_allocs_ret = ClaimAllocationsReturn {
batch_info: BatchReturn::ok(sectors_claims.len() as u32),
claim_results: sectors_claims
.iter()
.map(|claim| SectorAllocationClaimResult {
claimed_space: claim.size.0.into(),
sector: claim.sector,
})
.map(|claim| SectorAllocationClaimResult { claimed_space: claim.size.0.into() })
.collect(),
};
rt.expect_send_simple(
Expand Down
19 changes: 11 additions & 8 deletions actors/verifreg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ impl Actor {
// Called by storage provider actor to claim allocations for data provably committed to storage.
// For each allocation claim, the registry checks that the provided piece CID
// and size match that of the allocation.
// When all_or_nothing is false, returns a vec of claimed spaces parallel to the requested
// allocations where failed allocations are represented by DataCap::zero
pub fn claim_allocations(
rt: &impl Runtime,
params: ClaimAllocationsParams,
Expand All @@ -375,7 +377,7 @@ impl Actor {
if params.allocations.is_empty() {
return Err(actor_error!(illegal_argument, "claim allocations called with no claims"));
}
let mut datacap_claimed = DataCap::zero();
let mut total_datacap_claimed = DataCap::zero();
let mut ret_gen = BatchReturnGen::new(params.allocations.len());
let all_or_nothing = params.all_or_nothing;
let mut sector_claims = Vec::new();
Expand All @@ -396,6 +398,7 @@ impl Actor {
"no allocation {} for client {}",
claim_alloc.allocation_id, claim_alloc.client,
);
sector_claims.push(SectorAllocationClaimResult::default());
continue;
}
Some(a) => a,
Expand All @@ -407,6 +410,7 @@ impl Actor {
"invalid sector {:?} for allocation {}",
claim_alloc.sector, claim_alloc.allocation_id,
);
sector_claims.push(SectorAllocationClaimResult::default());
continue;
}

Expand Down Expand Up @@ -434,6 +438,7 @@ impl Actor {
"claim for allocation {} could not be inserted as it already exists",
claim_alloc.allocation_id,
);
sector_claims.push(SectorAllocationClaimResult::default());
continue;
}

Expand All @@ -442,12 +447,10 @@ impl Actor {
format!("failed to remove allocation {}", claim_alloc.allocation_id),
)?;

datacap_claimed += DataCap::from(claim_alloc.size.0);
total_datacap_claimed += DataCap::from(claim_alloc.size.0);
ret_gen.add_success();
sector_claims.push(SectorAllocationClaimResult {
claimed_space: claim_alloc.size.0.into(),
sector: claim_alloc.sector,
});
sector_claims
.push(SectorAllocationClaimResult { claimed_space: claim_alloc.size.0.into() });
}
st.save_allocs(&mut allocs)?;
st.save_claims(&mut claims)?;
Expand All @@ -464,9 +467,9 @@ impl Actor {
}

// Burn the datacap tokens from verified registry's own balance.
burn(rt, &datacap_claimed)?;
burn(rt, &total_datacap_claimed)?;

Ok(ClaimAllocationsReturn { batch_info, claim_results: sector_claims })
Ok(ClaimAllocationsReturn { claim_results: sector_claims })
}

// get claims for a provider
Expand Down
Loading

0 comments on commit df56d14

Please sign in to comment.