Skip to content

Commit

Permalink
Explicit deal settlement in built-in market (#1377)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexytsu authored and anorth committed Jan 25, 2024
1 parent cae0fd4 commit 21baa60
Show file tree
Hide file tree
Showing 25 changed files with 1,918 additions and 783 deletions.
312 changes: 215 additions & 97 deletions actors/market/src/lib.rs

Large diffs are not rendered by default.

177 changes: 166 additions & 11 deletions actors/market/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2019-2022 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::cmp::{max, min};
use std::collections::BTreeMap;

use cid::Cid;
Expand Down Expand Up @@ -728,22 +729,125 @@ impl State {
Ok(())
}

/// Delete proposal and state simultaneously.
pub fn remove_completed_deal<BS>(
&mut self,
store: &BS,
deal_id: DealID,
) -> Result<(), ActorError>
where
BS: Blockstore,
{
let state = self.remove_deal_state(store, deal_id)?;
if state.is_none() {
return Err(actor_error!(illegal_state, "failed to delete deal state: does not exist"));
}
let proposal = self.remove_proposal(store, deal_id)?;
if proposal.is_none() {
return Err(actor_error!(
illegal_state,
"failed to delete deal proposal: does not exist"
));
}
Ok(())
}

/// Given a DealProposal, checks that the corresponding deal has activated
/// If not, checks that the deal is past its activation epoch and performs cleanup
pub fn get_active_deal_or_process_timeout<BS>(
&mut self,
store: &BS,
curr_epoch: ChainEpoch,
deal_id: DealID,
deal_proposal: &DealProposal,
dcid: &Cid,
) -> Result<LoadDealState, ActorError>
where
BS: Blockstore,
{
let deal_state = self.find_deal_state(store, deal_id)?;

match deal_state {
Some(deal_state) => Ok(LoadDealState::Loaded(deal_state)),
None => {
// deal_id called too early
if curr_epoch < deal_proposal.start_epoch {
return Ok(LoadDealState::TooEarly);
}

// if not activated, the proposal has timed out
let slashed = self.process_deal_init_timed_out(store, deal_proposal)?;

// delete the proposal (but not state, which doesn't exist)
let deleted = self.remove_proposal(store, deal_id)?;
if deleted.is_none() {
return Err(actor_error!(
illegal_state,
format!(
"failed to delete deal {} proposal {}: does not exist",
deal_id, dcid
)
));
}

// delete pending deal cid
self.remove_pending_deal(store, *dcid)?.ok_or_else(|| {
actor_error!(
illegal_state,
format!(
"failed to delete pending deal {}: cid {} does not exist",
deal_id, dcid
)
)
})?;

// delete pending deal allocation id (if present)
self.remove_pending_deal_allocation_id(store, deal_id)?;

Ok(LoadDealState::ProposalExpired(slashed))
}
}
}

////////////////////////////////////////////////////////////////////////////////
// Deal state operations
////////////////////////////////////////////////////////////////////////////////

// TODO: change return value when marked-for-termination sectors are cleared from state
// https://github.com/filecoin-project/builtin-actors/issues/1388
// drop slash_amount, bool return value indicates a completed deal
pub fn process_deal_update<BS>(
&mut self,
store: &BS,
state: &DealState,
deal: &DealProposal,
deal_cid: &Cid,
epoch: ChainEpoch,
) -> Result<(TokenAmount, bool), ActorError>
) -> Result<
(
/* slash_amount */ TokenAmount,
/* payment_amount */ TokenAmount,
/* remove */ bool,
),
ActorError,
>
where
BS: Blockstore,
{
let ever_updated = state.last_updated_epoch != EPOCH_UNDEFINED;

// seeing a slashed deal here will eventually be an unreachable state
// during the transition to synchronous deal termination there may be marked-for-termination
// deals that have not been processed in cron yet
// https://github.com/filecoin-project/builtin-actors/issues/1388
// TODO: remove this and calculations below that assume deals can be slashed
let ever_slashed = state.slash_epoch != EPOCH_UNDEFINED;

if !ever_updated {
// pending deal might have been removed by manual settlement or cron so we don't care if it's missing
self.remove_pending_deal(store, *deal_cid)?;
}

// if the deal was ever updated, make sure it didn't happen in the future
if ever_updated && state.last_updated_epoch > epoch {
return Err(actor_error!(
Expand All @@ -753,10 +857,9 @@ impl State {
));
}

// This would be the case that the first callback somehow triggers before it is scheduled to
// This is expected not to be able to happen
// this is a safe no-op but can happen if a storage provider calls settle_deal_payments too early
if deal.start_epoch > epoch {
return Ok((TokenAmount::zero(), false));
return Ok((TokenAmount::zero(), TokenAmount::zero(), false));
}

let payment_end_epoch = if ever_slashed {
Expand Down Expand Up @@ -788,11 +891,14 @@ impl State {

let num_epochs_elapsed = payment_end_epoch - payment_start_epoch;

let total_payment = &deal.storage_price_per_epoch * num_epochs_elapsed;
if total_payment.is_positive() {
self.transfer_balance(store, &deal.client, &deal.provider, &total_payment)?;
let elapsed_payment = &deal.storage_price_per_epoch * num_epochs_elapsed;
if elapsed_payment.is_positive() {
self.transfer_balance(store, &deal.client, &deal.provider, &elapsed_payment)?;
}

// TODO: remove handling of terminated deals *after* transition to synchronous deal termination
// at that point, this function can be modified to return a bool only, indicating whether the deal is completed
// https://github.com/filecoin-project/builtin-actors/issues/1388
if ever_slashed {
// unlock client collateral and locked storage fee
let payment_remaining = deal_get_payment_remaining(deal, state.slash_epoch)?;
Expand All @@ -815,14 +921,57 @@ impl State {
self.slash_balance(store, &deal.provider, &slashed, Reason::ProviderCollateral)
.context("slashing balance")?;

return Ok((slashed, true));
return Ok((slashed, payment_remaining + elapsed_payment, true));
}

if epoch >= deal.end_epoch {
self.process_deal_expired(store, deal, state)?;
return Ok((TokenAmount::zero(), true));
return Ok((TokenAmount::zero(), elapsed_payment, true));
}

Ok((TokenAmount::zero(), elapsed_payment, false))
}

pub fn process_slashed_deal<BS>(
&mut self,
store: &BS,
proposal: &DealProposal,
state: &DealState,
) -> Result<TokenAmount, ActorError>
where
BS: Blockstore,
{
// make payments for epochs until termination
let payment_start_epoch = max(proposal.start_epoch, state.last_updated_epoch);
let payment_end_epoch = min(proposal.end_epoch, state.slash_epoch);
let num_epochs_elapsed = max(0, payment_end_epoch - payment_start_epoch);
let total_payment = &proposal.storage_price_per_epoch * num_epochs_elapsed;
if total_payment.is_positive() {
self.transfer_balance(store, &proposal.client, &proposal.provider, &total_payment)?;
}
Ok((TokenAmount::zero(), false))

// unlock client collateral and locked storage fee
let payment_remaining = deal_get_payment_remaining(proposal, state.slash_epoch)?;

// Unlock remaining storage fee
self.unlock_balance(store, &proposal.client, &payment_remaining, Reason::ClientStorageFee)
.context("unlocking client storage fee")?;

// Unlock client collateral
self.unlock_balance(
store,
&proposal.client,
&proposal.client_collateral,
Reason::ClientCollateral,
)
.context("unlocking client collateral")?;

// slash provider collateral
let slashed = proposal.provider_collateral.clone();
self.slash_balance(store, &proposal.provider, &slashed, Reason::ProviderCollateral)
.context("slashing balance")?;

Ok(slashed)
}

/// Deal start deadline elapsed without appearing in a proven sector.
Expand Down Expand Up @@ -1045,7 +1194,13 @@ impl State {
}
}

fn deal_get_payment_remaining(
pub enum LoadDealState {
TooEarly,
ProposalExpired(/* slashed_amount */ TokenAmount),
Loaded(DealState),
}

pub fn deal_get_payment_remaining(
deal: &DealProposal,
mut slash_epoch: ChainEpoch,
) -> Result<TokenAmount, ActorError> {
Expand Down
15 changes: 9 additions & 6 deletions actors/market/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,15 @@ pub fn check_state_invariants<BS: Blockstore>(

deal_op_epoch_count += 1;

deal_ops.for_each(epoch, |deal_id| {
acc.require(proposal_stats.contains_key(&deal_id), format!("deal op found for deal id {deal_id} with missing proposal at epoch {epoch}"));
expected_deal_ops.remove(&deal_id);
deal_op_count += 1;
Ok(())
}).map_err(|e| anyhow::anyhow!("error iterating deal ops for epoch {}: {}", epoch, e))
deal_ops
.for_each(epoch, |deal_id| {
expected_deal_ops.remove(&deal_id);
deal_op_count += 1;
Ok(())
})
.map_err(|e| {
anyhow::anyhow!("error iterating deal ops for epoch {}: {}", epoch, e)
})
});
acc.require_no_error(ret, "error iterating all deal ops");
}
Expand Down
22 changes: 22 additions & 0 deletions actors/market/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,25 @@ pub struct MarketNotifyDealParams {
pub proposal: Vec<u8>,
pub deal_id: u64,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone)]
#[serde(transparent)]
pub struct SettleDealPaymentsParams {
pub deal_ids: BitField,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct SettleDealPaymentsReturn {
/// Indicators of success or failure for each deal
pub results: BatchReturn,
/// Results for the deals that succesfully settled
pub settlements: Vec<DealSettlementSummary>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct DealSettlementSummary {
/// Incremental amount of funds transferred from client to provider for deal payment
pub payment: TokenAmount,
/// Whether the deal has settled for the final time
pub completed: bool,
}
4 changes: 2 additions & 2 deletions actors/market/tests/batch_activate_deals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn activate_deals_one_sector() {
assert!(res.activation_results.all_ok());

// Deal IDs are stored under the sector, in correct order.
assert_eq!(deal_ids, get_sector_deal_ids(&rt, PROVIDER_ID, 1));
assert_eq!(deal_ids, get_sector_deal_ids(&rt, PROVIDER_ID, &[1]));

for id in deal_ids.iter() {
let state = get_deal_state(&rt, *id);
Expand Down Expand Up @@ -391,7 +391,7 @@ fn activate_new_deals_in_existing_sector() {
assert_eq!(0, get_deal_state(&rt, deal_ids[2]).sector_start_epoch);

// All deals stored under the sector, in order.
assert_eq!(deal_ids, get_sector_deal_ids(&rt, PROVIDER_ID, sector_number));
assert_eq!(deal_ids, get_sector_deal_ids(&rt, PROVIDER_ID, &[sector_number]));
check_state(&rt);
}

Expand Down
Loading

0 comments on commit 21baa60

Please sign in to comment.