Skip to content

Commit

Permalink
Fix deduplication
Browse files Browse the repository at this point in the history
  • Loading branch information
anorth committed Aug 15, 2023
1 parent 9aa6ab7 commit 4a27b20
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 23 deletions.
16 changes: 9 additions & 7 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,13 +562,14 @@ impl Actor {
let mut activated_deals: HashSet<DealID> = HashSet::new();
let mut sectors_deals: Vec<(SectorNumber, SectorDealIDs)> = vec![];

'sector: for p in params.sectors {
'sector: for sector in params.sectors {
let mut sector_deal_ids: HashSet<DealID> = HashSet::new();
let mut validated_proposals = vec![];
// Iterate once to validate all the requested deals.
// If a deal fails, skip the whole sector.
for deal_id in &p.deal_ids {
for deal_id in &sector.deal_ids {
// Check each deal is present only once, within and across sectors.
if activated_deals.contains(deal_id) {
if sector_deal_ids.contains(deal_id) || activated_deals.contains(deal_id) {
log::warn!("failed to activate sector, duplicated deal {}", deal_id);
batch_gen.add_fail(ExitCode::USR_ILLEGAL_ARGUMENT);
continue 'sector;
Expand All @@ -581,7 +582,7 @@ impl Actor {
&states,
&pending_deals,
&miner_addr,
p.sector_expiry,
sector.sector_expiry,
curr_epoch,
st.next_id,
)? {
Expand All @@ -592,6 +593,7 @@ impl Actor {
continue 'sector;
}
};
sector_deal_ids.insert(*deal_id);
validated_proposals.push(proposal);
}

Expand All @@ -600,7 +602,7 @@ impl Actor {
// Given that all deals validated, prepare the state updates for them all.
// There's no continue below here to ensure updates are consistent.
// Any error must abort.
for (deal_id, proposal) in p.deal_ids.iter().zip(validated_proposals) {
for (deal_id, proposal) in sector.deal_ids.iter().zip(validated_proposals) {
activated_deals.insert(*deal_id);
// Extract and remove any verified allocation ID for the pending deal.
let alloc_id = remove_pending_deal_allocation_id(
Expand All @@ -624,7 +626,7 @@ impl Actor {
deal_states.push((
*deal_id,
DealState {
sector_number: p.sector_number,
sector_number: sector.sector_number,
sector_start_epoch: curr_epoch,
last_updated_epoch: EPOCH_UNDEFINED,
slash_epoch: EPOCH_UNDEFINED,
Expand All @@ -633,7 +635,7 @@ impl Actor {
));
}

sectors_deals.push((p.sector_number, SectorDealIDs { deals: p.deal_ids.clone() }));
sectors_deals.push((sector.sector_number, SectorDealIDs { deals: sector.deal_ids.clone() }));
activations.push(SectorDealActivation { nonverified_deal_space, verified_infos });
batch_gen.add_success();
}
Expand Down
39 changes: 38 additions & 1 deletion actors/market/tests/batch_activate_deals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,44 @@ fn handles_sectors_empty_of_deals_gracefully() {
}

#[test]
fn fails_to_activate_sectors_containing_duplicate_deals() {
fn fails_to_activate_single_sector_duplicate_deals() {
let rt = setup();
let deal_1 = create_deal(&rt, CLIENT_ADDR, &MINER_ADDRESSES, START_EPOCH, END_EPOCH, false);
let deal_2 = create_deal(&rt, CLIENT_ADDR, &MINER_ADDRESSES, START_EPOCH + 1, END_EPOCH, false);

let next_allocation_id = 1;
rt.set_caller(*ACCOUNT_ACTOR_CODE_ID, WORKER_ADDR);
let deal_ids = publish_deals(
&rt,
&MINER_ADDRESSES,
&[deal_1, deal_2],
TokenAmount::zero(),
next_allocation_id,
);
assert_eq!(2, deal_ids.len());
let id_1 = deal_ids[0];
let id_2 = deal_ids[1];

let sector_type = RegisteredSealProof::StackedDRG8MiBV1;
// group into sectors
let sectors_deals = vec![
// duplicate id_1
SectorDeals {
sector_number: 0,
deal_ids: vec![id_1, id_1, id_2],
sector_type,
sector_expiry: END_EPOCH,
},
];
let res = batch_activate_deals_raw(&rt, PROVIDER_ADDR, sectors_deals).unwrap();
let res: BatchActivateDealsResult =
res.unwrap().deserialize().expect("VerifyDealsForActivation failed!");

assert_eq!(vec![ExitCode::USR_ILLEGAL_ARGUMENT], res.activation_results.codes());
}

#[test]
fn fails_to_activate_cross_sector_duplicate_deals() {
let rt = setup();
let deal_1 = create_deal(&rt, CLIENT_ADDR, &MINER_ADDRESSES, START_EPOCH, END_EPOCH, false);
let deal_2 = create_deal(&rt, CLIENT_ADDR, &MINER_ADDRESSES, START_EPOCH + 1, END_EPOCH, false);
Expand Down
66 changes: 51 additions & 15 deletions actors/market/tests/sector_content_changed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ fn failures_isolated() {
}

#[test]
fn duplicates_rejected() {
fn rejects_duplicates_in_same_sector() {
let rt = setup();
let deals = create_deals(&rt, 1);
let deals = create_deals(&rt, 2);
rt.set_caller(*ACCOUNT_ACTOR_CODE_ID, WORKER_ADDR);
let deal_ids =
publish_deals(&rt, &MINER_ADDRESSES, &deals, TokenAmount::zero(), NO_ALLOCATION_ID);
Expand All @@ -289,43 +289,79 @@ fn duplicates_rejected() {
SectorChanges {
sector: 1,
minimum_commitment_epoch: END_EPOCH + 10,
added: vec![pieces[0].clone(), pieces[0].clone()],
added: vec![pieces[0].clone(), pieces[0].clone(), pieces[1].clone()],
},
// Same deal again, referencing same sector.
];
let ret = sector_content_changed(&rt, PROVIDER_ADDR, changes).unwrap();
assert_eq!(1, ret.sectors.len());
// The first piece succeeds just once, the second piece succeeds too.
assert_eq!(
vec![
PieceReturn { code: ExitCode::OK, data: vec![] },
PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] },
PieceReturn { code: ExitCode::OK, data: vec![] },
],
ret.sectors[0].added
);

// Deal IDs are stored under the right sector, in correct order.
assert_eq!(deal_ids[0..2], get_sector_deal_ids(&rt, &PROVIDER_ADDR, 1));
assert_eq!(Vec::<DealID>::new(), get_sector_deal_ids(&rt, &PROVIDER_ADDR, 2));
}

#[test]
fn rejects_duplicates_across_sectors() {
let rt = setup();
let deals = create_deals(&rt, 3);
rt.set_caller(*ACCOUNT_ACTOR_CODE_ID, WORKER_ADDR);
let deal_ids =
publish_deals(&rt, &MINER_ADDRESSES, &deals, TokenAmount::zero(), NO_ALLOCATION_ID);
let pieces = pieces_from_deals(&deal_ids, &deals);

let changes = vec![
SectorChanges {
sector: 1,
minimum_commitment_epoch: END_EPOCH + 10,
added: vec![pieces[0].clone()],
},
// Same deal again in a different sector.
// Same piece again, referencing same sector, plus a new piece.
SectorChanges {
sector: 1,
minimum_commitment_epoch: END_EPOCH + 10,
added: vec![pieces[0].clone(), pieces[1].clone()],
},
// Same deal piece in a different sector, plus second piece agoin, plus a new piece.
SectorChanges {
sector: 2,
minimum_commitment_epoch: END_EPOCH + 10,
added: vec![pieces[0].clone()],
added: vec![pieces[0].clone(), pieces[1].clone(), pieces[2].clone()],
},
];
let ret = sector_content_changed(&rt, PROVIDER_ADDR, changes).unwrap();
assert_eq!(3, ret.sectors.len());
// Succeeds just once.
// Succeeds in the first time.
assert_eq!(vec![PieceReturn { code: ExitCode::OK, data: vec![] },], ret.sectors[0].added);
// Fails second time, but other piece succeeds.
assert_eq!(
vec![
PieceReturn { code: ExitCode::OK, data: vec![] },
PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] },
PieceReturn { code: ExitCode::OK, data: vec![] },
],
ret.sectors[0].added
);
assert_eq!(
vec![PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] }],
ret.sectors[1].added
);
// Both duplicates fail, but third piece succeeds.
assert_eq!(
vec![PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] }],
vec![
PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] },
PieceReturn { code: ExitCode::USR_ILLEGAL_ARGUMENT, data: vec![] },
PieceReturn { code: ExitCode::OK, data: vec![] },
],
ret.sectors[2].added
);

// Deal IDs are stored under the right sector, in correct order.
assert_eq!(deal_ids[0..1], get_sector_deal_ids(&rt, &PROVIDER_ADDR, 1));
assert_eq!(Vec::<DealID>::new(), get_sector_deal_ids(&rt, &PROVIDER_ADDR, 2));
assert_eq!(deal_ids[0..2], get_sector_deal_ids(&rt, &PROVIDER_ADDR, 1));
assert_eq!(deal_ids[2..3], get_sector_deal_ids(&rt, &PROVIDER_ADDR, 2));
}

#[test]
Expand Down

0 comments on commit 4a27b20

Please sign in to comment.