Skip to content

Commit

Permalink
Merge branch 'master' into rvagg/dealids-vec-alias
Browse files Browse the repository at this point in the history
  • Loading branch information
anorth authored Feb 1, 2024
2 parents f3794ad + a96db01 commit f17131a
Show file tree
Hide file tree
Showing 51 changed files with 2,043 additions and 477 deletions.
76 changes: 76 additions & 0 deletions actors/market/src/emit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use fil_actors_runtime::runtime::Runtime;
use fil_actors_runtime::{ActorError, EventBuilder};
use fvm_shared::deal::DealID;
use fvm_shared::ActorID;

/// Indicates a deal has been published.
pub fn deal_published(
rt: &impl Runtime,
client: ActorID,
provider: ActorID,
deal_id: DealID,
) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new()
.typ("deal-published")
.with_parties(deal_id, client, provider)
.build()?,
)
}

/// Indicates a deal has been activated.
pub fn deal_activated(
rt: &impl Runtime,
deal_id: DealID,
client: ActorID,
provider: ActorID,
) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new()
.typ("deal-activated")
.with_parties(deal_id, client, provider)
.build()?,
)
}

/// Indicates a deal has been terminated.
pub fn deal_terminated(
rt: &impl Runtime,
deal_id: DealID,
client: ActorID,
provider: ActorID,
) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new()
.typ("deal-terminated")
.with_parties(deal_id, client, provider)
.build()?,
)
}

/// Indicates a deal has been completed successfully.
pub fn deal_completed(
rt: &impl Runtime,
deal_id: DealID,
client: ActorID,
provider: ActorID,
) -> Result<(), ActorError> {
rt.emit_event(
&EventBuilder::new()
.typ("deal-completed")
.with_parties(deal_id, client, provider)
.build()?,
)
}

trait WithParties {
fn with_parties(self, id: DealID, client: ActorID, provider: ActorID) -> EventBuilder;
}

impl WithParties for EventBuilder {
fn with_parties(self, id: DealID, client: ActorID, provider: ActorID) -> EventBuilder {
self.field_indexed("id", &id)
.field_indexed("client", &client)
.field_indexed("provider", &provider)
}
}
101 changes: 78 additions & 23 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub mod policy;
pub mod testing;

mod deal;
mod emit;
mod state;
mod types;

Expand Down Expand Up @@ -470,19 +471,26 @@ impl Actor {
// notify clients, any failures cause the entire publish_storage_deals method to fail
// it's unsafe to ignore errors here, since that could be used to attack storage contract clients
// that might be unaware they're making storage deals
for (i, valid_deal) in valid_deals.iter().enumerate() {
for (valid_deal, &deal_id) in valid_deals.iter().zip(&new_deal_ids) {
_ = extract_send_result(rt.send_simple(
&valid_deal.proposal.client,
MARKET_NOTIFY_DEAL_METHOD,
IpldBlock::serialize_cbor(&MarketNotifyDealParams {
proposal: valid_deal.serialized_proposal.to_vec(),
deal_id: new_deal_ids[i],
deal_id,
})?,
TokenAmount::zero(),
))
.with_context_code(ExitCode::USR_ILLEGAL_ARGUMENT, || {
format!("failed to notify deal with proposal cid {}", valid_deal.cid)
})?;

emit::deal_published(
rt,
valid_deal.proposal.client.id().unwrap(),
valid_deal.proposal.provider.id().unwrap(),
deal_id,
)?;
}

Ok(PublishStorageDealsReturn { ids: new_deal_ids, valid_deals: valid_input_bf })
Expand Down Expand Up @@ -555,7 +563,7 @@ impl Actor {
let mut deal_states: Vec<(DealID, DealState)> = vec![];
let mut batch_gen = BatchReturnGen::new(params.sectors.len());
let mut activations: Vec<SectorDealActivation> = vec![];
let mut activated_deals = BTreeSet::<DealID>::new();
let mut activated_deals: HashSet<DealID> = HashSet::new();
let mut sectors_deals: Vec<(SectorNumber, Vec<DealID>)> = vec![];

'sector: for sector in params.sectors {
Expand Down Expand Up @@ -598,8 +606,7 @@ impl Actor {
validated_proposals.push(proposal);
}

let mut verified_infos = vec![];
let mut nonverified_deal_space = BigInt::zero();
let mut activated = vec![];
// Given that all deals validated, prepare the state updates for them all.
// There's no continue below here to ensure updates are consistent.
// Any error must abort.
Expand All @@ -609,16 +616,12 @@ impl Actor {
let alloc_id =
pending_deal_allocation_ids.delete(deal_id)?.unwrap_or(NO_ALLOCATION_ID);

if alloc_id != NO_ALLOCATION_ID {
verified_infos.push(VerifiedDealInfo {
client: proposal.client.id().unwrap(),
allocation_id: alloc_id,
data: proposal.piece_cid,
size: proposal.piece_size,
})
} else {
nonverified_deal_space += proposal.piece_size.0;
}
activated.push(ActivatedDeal {
client: proposal.client.id().unwrap(),
allocation_id: alloc_id,
data: proposal.piece_cid,
size: proposal.piece_size,
});

// Prepare initial deal state.
deal_states.push((
Expand All @@ -644,6 +647,14 @@ impl Actor {
verified_infos,
unsealed_cid: data_commitment,
});
for (deal_id, proposal) in sector.deal_ids.iter().zip(&validated_proposals) {
emit::deal_activated(
rt,
*deal_id,
proposal.client.id().unwrap(),
proposal.provider.id().unwrap(),
)?;
}
batch_gen.add_success();
}

Expand Down Expand Up @@ -842,6 +853,13 @@ impl Actor {
.entry(state.sector_number)
.or_default()
.push(id);

emit::deal_terminated(
rt,
id,
deal.client.id().unwrap(),
deal.provider.id().unwrap(),
)?;
}

st.remove_sector_deal_ids(rt.store(), &provider_deals_to_remove)?;
Expand Down Expand Up @@ -925,13 +943,14 @@ impl Actor {
// https://github.com/filecoin-project/builtin-actors/issues/1389
// handling of legacy deals is still done in cron. we handle such deals here and continue to
// reschedule them. eventually, all legacy deals will expire and the below code can be removed.
let (slash_amount, _payment_amount, remove_deal) = st.process_deal_update(
rt.store(),
&state,
&deal_proposal,
&dcid,
curr_epoch,
)?;
let (slash_amount, _payment_amount, completed, remove_deal) = st
.process_deal_update(
rt.store(),
&state,
&deal_proposal,
&dcid,
curr_epoch,
)?;

if remove_deal {
// TODO: remove handling for terminated-deal slashing when marked-for-termination deals are all processed
Expand All @@ -948,6 +967,15 @@ impl Actor {
.entry(state.sector_number)
.or_default()
.push(deal_id);

if !completed {
emit::deal_terminated(
rt,
deal_id,
deal_proposal.client.id().unwrap(),
deal_proposal.provider.id().unwrap(),
)?;
}
} else {
if !slash_amount.is_zero() {
return Err(actor_error!(
Expand All @@ -971,6 +999,15 @@ impl Actor {
);
new_updates_scheduled.entry(next_epoch).or_default().push(deal_id);
}

if completed {
emit::deal_completed(
rt,
deal_id,
deal_proposal.client.id().unwrap(),
deal_proposal.provider.id().unwrap(),
)?;
}
}
epochs_completed.push(i);
}
Expand Down Expand Up @@ -1246,7 +1283,7 @@ impl Actor {
));
}

let (_, payment_amount, remove_deal) = match st.process_deal_update(
let (_, payment_amount, completed, remove_deal) = match st.process_deal_update(
rt.store(),
&deal_state,
&deal_proposal,
Expand All @@ -1268,6 +1305,15 @@ impl Actor {
.entry(deal_state.sector_number)
.or_default()
.push(deal_id);

if !completed {
emit::deal_terminated(
rt,
deal_id,
deal_proposal.client.id().unwrap(),
deal_proposal.provider.id().unwrap(),
)?;
}
} else {
deal_state.last_updated_epoch = curr_epoch;
new_deal_states.push((deal_id, deal_state));
Expand All @@ -1278,6 +1324,15 @@ impl Actor {
payment: payment_amount,
});
batch_gen.add_success();

if completed {
emit::deal_completed(
rt,
deal_id,
deal_proposal.client.id().unwrap(),
deal_proposal.provider.id().unwrap(),
)?;
}
}

st.put_deal_states(rt.store(), &new_deal_states)?;
Expand Down
9 changes: 5 additions & 4 deletions actors/market/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,7 @@ impl State {
(
/* slash_amount */ TokenAmount,
/* payment_amount */ TokenAmount,
/* is_deal_completed */ bool,
/* remove */ bool,
),
ActorError,
Expand Down Expand Up @@ -851,7 +852,7 @@ impl State {

// this is a safe no-op but can happen if a storage provider calls settle_deal_payments too early
if deal.start_epoch > epoch {
return Ok((TokenAmount::zero(), TokenAmount::zero(), false));
return Ok((TokenAmount::zero(), TokenAmount::zero(), false, false));
}

let payment_end_epoch = if ever_slashed {
Expand Down Expand Up @@ -913,15 +914,15 @@ impl State {
self.slash_balance(store, &deal.provider, &slashed, Reason::ProviderCollateral)
.context("slashing balance")?;

return Ok((slashed, payment_remaining + elapsed_payment, true));
return Ok((slashed, payment_remaining + elapsed_payment, false, true));
}

if epoch >= deal.end_epoch {
self.process_deal_expired(store, deal, state)?;
return Ok((TokenAmount::zero(), elapsed_payment, true));
return Ok((TokenAmount::zero(), elapsed_payment, true, true));
}

Ok((TokenAmount::zero(), elapsed_payment, false))
Ok((TokenAmount::zero(), elapsed_payment, false, false))
}

pub fn process_slashed_deal<BS>(
Expand Down
13 changes: 5 additions & 8 deletions actors/market/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,23 +102,20 @@ pub struct BatchActivateDealsParams {
pub compute_cid: bool,
}

// Information about a verified deal that has been activated.
// Information about a deal that has been activated.
#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct VerifiedDealInfo {
pub struct ActivatedDeal {
pub client: ActorID,
pub allocation_id: AllocationID,
pub allocation_id: AllocationID, // NO_ALLOCATION_ID for unverified deals.
pub data: Cid,
pub size: PaddedPieceSize,
}

// Information about a sector-grouping of deals that have been activated.
#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct SectorDealActivation {
/// The total size of the non-verified deals activated.
#[serde(with = "bigint_ser")]
pub nonverified_deal_space: BigInt,
/// Information about each verified deal activated.
pub verified_infos: Vec<VerifiedDealInfo>,
/// Information about each deal activated.
pub activated: Vec<ActivatedDeal>,
/// Unsealed CID computed from the deals specified for the sector.
/// A None indicates no deals were specified, or the computation was not requested.
pub unsealed_cid: Option<Cid>,
Expand Down
1 change: 1 addition & 0 deletions actors/market/tests/activate_deal_failures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ fn assert_activation_failure(
deal_ids: vec![deal_id],
}],
false,
&[],
)
.unwrap();
let res: BatchActivateDealsResult =
Expand Down
9 changes: 5 additions & 4 deletions actors/market/tests/batch_activate_deals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ fn sectors_fail_and_succeed_independently_during_batch_activation() {
},
];

let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false).unwrap();
let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false, &[id_4]).unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

Expand Down Expand Up @@ -227,7 +227,7 @@ fn handles_sectors_empty_of_deals_gracefully() {
SectorDeals { sector_number: 3, deal_ids: vec![], sector_type, sector_expiry: END_EPOCH },
];

let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false).unwrap();
let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false, &[id_1]).unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

Expand Down Expand Up @@ -273,7 +273,7 @@ fn fails_to_activate_single_sector_duplicate_deals() {
sector_expiry: END_EPOCH,
},
];
let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false).unwrap();
let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false, &[]).unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

Expand Down Expand Up @@ -328,7 +328,8 @@ fn fails_to_activate_cross_sector_duplicate_deals() {
},
];

let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false).unwrap();
let res =
batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals, false, &[id_1, id_3]).unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

Expand Down
Loading

0 comments on commit f17131a

Please sign in to comment.