Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: move partition #1326

Merged
Show file tree
Hide file tree
Changes from 58 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
924722e
init
zhiqiangxu Jun 25, 2023
da95dbb
avoid recompute current_deadline
zhiqiangxu Jun 25, 2023
e502709
treat empty bitfield as all
zhiqiangxu Jun 26, 2023
eaf5370
rm useless quote
zhiqiangxu Jun 26, 2023
2fa9693
add verify
zhiqiangxu Jun 27, 2023
8725420
combine option1 & option2
zhiqiangxu Jun 28, 2023
d534373
fix
zhiqiangxu Jun 28, 2023
36caece
fix
zhiqiangxu Jun 28, 2023
badb691
nit
zhiqiangxu Jun 28, 2023
fd39aac
mod error
zhiqiangxu Jun 28, 2023
daab781
nit
zhiqiangxu Jun 28, 2023
0271369
fmt
zhiqiangxu Jun 28, 2023
c046cc6
fix ci
zhiqiangxu Jun 28, 2023
3eac91e
fix bug
zhiqiangxu Jul 3, 2023
181dc2e
add test
zhiqiangxu Jul 4, 2023
8eb2243
add more test
zhiqiangxu Jul 5, 2023
394ea44
Merge remote-tracking branch 'origin/master' into feature/move_partit…
zhiqiangxu Jul 11, 2023
d1fdb0f
partial fix for review
zhiqiangxu Jul 18, 2023
0804565
Merge remote-tracking branch 'origin/master' into feature/move_partit…
zhiqiangxu Jul 19, 2023
0353a28
adjust test
zhiqiangxu Jul 19, 2023
bc9e6a1
use .context_code
zhiqiangxu Jul 19, 2023
c1d88a2
fix for test
zhiqiangxu Jul 19, 2023
37d7d03
disallow empty partitions
zhiqiangxu Jul 19, 2023
89bcb74
refactor deadline_available_for_move
zhiqiangxu Jul 19, 2023
79ab15d
fix for clippy
zhiqiangxu Jul 19, 2023
39d6447
minor opt
zhiqiangxu Jul 19, 2023
0bd0611
only verify_windowed_post once
zhiqiangxu Jul 20, 2023
c01b54c
mod error msg
zhiqiangxu Jul 24, 2023
ccdb45f
1. verify_window_post batch by batch
zhiqiangxu Jul 25, 2023
b775fa8
fix ci
zhiqiangxu Jul 25, 2023
a5290cc
mod check for epoch
zhiqiangxu Jul 25, 2023
bf13558
partial review fix
zhiqiangxu Aug 2, 2023
9195aa1
Merge remote-tracking branch 'origin/master' into feature/move_partit…
zhiqiangxu Aug 2, 2023
70fd5fe
adjust test
zhiqiangxu Aug 2, 2023
511e170
refactor with Partition::adjust_for_move
zhiqiangxu Aug 2, 2023
d16077c
share the language with FIP
zhiqiangxu Aug 30, 2023
9ed53b8
deadline_available_for_move => ensure_deadline_available_for_move
zhiqiangxu Aug 30, 2023
15d5ac1
add some doc comment
zhiqiangxu Aug 30, 2023
5744223
more renaming
zhiqiangxu Aug 30, 2023
e8f0a22
more renaming
zhiqiangxu Aug 30, 2023
194b7cd
Merge remote-tracking branch 'origin/master' into feature/move_partit…
zhiqiangxu Aug 30, 2023
5e400d2
rename + merge master
zhiqiangxu Aug 30, 2023
c1159c5
mod wording
zhiqiangxu Aug 30, 2023
cb5e3c4
fix test
zhiqiangxu Aug 30, 2023
faa4873
renaming in test
zhiqiangxu Aug 31, 2023
7bc1156
apply alex's idea of not re-quantizing at all.
zhiqiangxu Sep 19, 2023
f7b7bd7
1. forbid moving when there're early terminations
zhiqiangxu Sep 22, 2023
3b25fdc
rm anyhow::Ok
zhiqiangxu Sep 22, 2023
f3ddb23
Merge branch 'master' into feature/move_partition_verify
zhiqiangxu Sep 23, 2023
9c8517a
minor optimization by observing that partition `faulty_power` should …
zhiqiangxu Sep 23, 2023
3c6a374
adjust find_sectors_by_expiration for not re-quantizing
zhiqiangxu Sep 25, 2023
271e3c0
add test
zhiqiangxu Sep 26, 2023
d2202ab
fix for review
zhiqiangxu Sep 26, 2023
d276241
add a comment about not re-quantizing when moving expirations_epochs
zhiqiangxu Sep 26, 2023
b2dc66c
minor optimization
zhiqiangxu Sep 27, 2023
fcc4cc0
avoid scanning the same range twice
zhiqiangxu Sep 28, 2023
7619065
1. review fix
zhiqiangxu Sep 28, 2023
6e91261
fix comment
zhiqiangxu Sep 28, 2023
9964257
use with_context_code
zhiqiangxu Sep 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ fvm_ipld_encoding = "0.4.0"
fvm_ipld_blockstore = "0.2.0"
fvm_ipld_hamt = "0.7.0"
fvm_ipld_kamt = "0.3.0"
fvm_ipld_amt = { version = "0.6.1" }
fvm_ipld_amt = { version = "0.6.2" }
fvm_ipld_bitfield = "0.6.0"

# workspace
Expand Down
97 changes: 97 additions & 0 deletions actors/miner/src/deadline_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use super::{
BitFieldQueue, ExpirationSet, Partition, PartitionSectorMap, PoStPartition, PowerPair,
SectorOnChainInfo, Sectors, TerminationResult,
};

use crate::SECTORS_AMT_BITWIDTH;

// Bitwidth of AMTs determined empirically from mutation patterns and projections of mainnet data.
Expand Down Expand Up @@ -102,6 +103,102 @@ impl Deadlines {
self.due[deadline_idx as usize] = store.put_cbor(deadline, Code::Blake2b256)?;
Ok(())
}

pub fn move_partitions<BS: Blockstore>(
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
policy: &Policy,
store: &BS,
orig_deadline: &mut Deadline,
dest_deadline: &mut Deadline,
partitions: &BitField,
) -> anyhow::Result<()> {
let mut orig_partitions = orig_deadline.partitions_amt(store)?;
let mut dest_partitions = dest_deadline.partitions_amt(store)?;

// even though we're moving partitions intact, we still need to update orig/dest `Deadline` accordingly.

if dest_partitions.count() + partitions.len() > policy.max_partitions_per_deadline {
return Err(actor_error!(
forbidden,
"partitions in dest_deadline will exceed max_partitions_per_deadline"
))?;
}

let first_dest_partition_idx = dest_partitions.count();
for (i, orig_partition_idx) in partitions.iter().enumerate() {
let moving_partition = orig_partitions
.get(orig_partition_idx)?
.ok_or_else(|| actor_error!(not_found, "no partition {}", orig_partition_idx))?
.clone();
if !moving_partition.faults.is_empty() || !moving_partition.unproven.is_empty() {
return Err(actor_error!(forbidden, "partition with faults or unproven sectors are not allowed to move, partition_idx {}", orig_partition_idx))?;
}
if orig_deadline.early_terminations.get(orig_partition_idx) {
return Err(actor_error!(forbidden, "partition with early terminated sectors are not allowed to move, partition_idx {}", orig_partition_idx))?;
}
if !moving_partition.faulty_power.is_zero() {
return Err(actor_error!(
illegal_state,
"partition faulty_power should be zero when faults is empty, partition_idx {}",
orig_partition_idx
))?;
}

let dest_partition_idx = first_dest_partition_idx + i as u64;

// sector_count is both total sector count and total live sector count, since no sector is faulty here.
let sector_count = moving_partition.sectors.len();

// start updating orig/dest `Deadline` here

orig_deadline.total_sectors -= sector_count;
orig_deadline.live_sectors -= sector_count;

dest_deadline.total_sectors += sector_count;
dest_deadline.live_sectors += sector_count;

orig_partitions.set(orig_partition_idx, Partition::new(store)?)?;
dest_partitions.set(dest_partition_idx, moving_partition)?;
}

// update expirations_epochs Cid of Deadline.
// Note that when moving a partition from `orig_expirations_epochs` to `dest_expirations_epochs`,
// we explicitly keep the `dest_epoch` the same as `orig_epoch`, this is by design of not re-quantizing.
{
let mut epochs_to_remove = Vec::<u64>::new();
let mut orig_expirations_epochs: Array<BitField, _> =
Array::load(&orig_deadline.expirations_epochs, store)?;
anorth marked this conversation as resolved.
Show resolved Hide resolved
let mut dest_expirations_epochs: Array<BitField, _> =
Array::load(&dest_deadline.expirations_epochs, store)?;
orig_expirations_epochs.for_each_mut(|orig_epoch, orig_bitfield| {
let dest_epoch = orig_epoch;
let mut to_bitfield =
dest_expirations_epochs.get(dest_epoch)?.cloned().unwrap_or_default();
for (i, partition_id) in partitions.iter().enumerate() {
if orig_bitfield.get(partition_id) {
orig_bitfield.unset(partition_id);
to_bitfield.set(first_dest_partition_idx + i as u64);
}
}
dest_expirations_epochs.set(dest_epoch, to_bitfield)?;

if orig_bitfield.is_empty() {
epochs_to_remove.push(orig_epoch);
}

Ok(())
})?;
if !epochs_to_remove.is_empty() {
orig_expirations_epochs.batch_delete(epochs_to_remove, true)?;
}
orig_deadline.expirations_epochs = orig_expirations_epochs.flush()?;
dest_deadline.expirations_epochs = dest_expirations_epochs.flush()?;
}

orig_deadline.partitions = orig_partitions.flush()?;
dest_deadline.partitions = dest_partitions.flush()?;

Ok(())
}
}

/// Deadline holds the state for all sectors due at a specific deadline.
Expand Down
67 changes: 67 additions & 0 deletions actors/miner/src/deadlines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,73 @@ pub fn deadline_available_for_compaction(
)
}

/// the distance between from_deadline and to_deadline clockwise in deadline unit.
fn deadline_distance(policy: &Policy, from_deadline: u64, to_deadline: u64) -> u64 {
if to_deadline >= from_deadline {
to_deadline - from_deadline
} else {
policy.wpost_period_deadlines - from_deadline + to_deadline
}
}

/// only allow moving to a nearer deadline from current one
pub fn ensure_deadline_available_for_move(
policy: &Policy,
orig_deadline: u64,
dest_deadline: u64,
current_deadline: &DeadlineInfo,
) -> Result<(), String> {
if !deadline_is_mutable(
policy,
current_deadline.period_start,
orig_deadline,
current_deadline.current_epoch,
) {
return Err(format!(
"cannot move from a deadline {}, immutable at epoch {}",
orig_deadline, current_deadline.current_epoch
));
}

if !deadline_is_mutable(
policy,
current_deadline.period_start,
dest_deadline,
current_deadline.current_epoch,
) {
return Err(format!(
"cannot move to a deadline {}, immutable at epoch {}",
dest_deadline, current_deadline.current_epoch
));
}

if deadline_distance(policy, current_deadline.index, dest_deadline)
>= deadline_distance(policy, current_deadline.index, orig_deadline)
{
return Err(format!(
"can only move to a deadline which is nearer from current deadline {}, dest_deadline {} is not nearer than orig_deadline {}",
current_deadline.index, dest_deadline, orig_deadline
));
}

Ok(())
}

// returns the nearest deadline info with index `target_deadline` that has already occured from the point of view of the current deadline(including the current deadline).
pub fn nearest_occured_deadline_info(
policy: &Policy,
current_deadline: &DeadlineInfo,
target_deadline: u64,
) -> DeadlineInfo {
// Find the proving period start for the deadline in question.
let mut pp_start = current_deadline.period_start;
if current_deadline.index < target_deadline {
pp_start -= policy.wpost_proving_period
}

new_deadline_info(policy, pp_start, target_deadline, current_deadline.current_epoch)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These look correct, but please add tests to demonstrate all the cases.

// Determine current period start and deadline index directly from current epoch and
// the offset implied by the proving period. This works correctly even for the state
// of a miner actor without an active deadline cron
Expand Down
108 changes: 71 additions & 37 deletions actors/miner/src/expiration_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::convert::TryInto;

use anyhow::{anyhow, Context};
use cid::Cid;
use fil_actors_runtime::network::EPOCHS_IN_DAY;
use fil_actors_runtime::runtime::Policy;
use fil_actors_runtime::{ActorDowncast, Array};
use fvm_ipld_amt::{Error as AmtError, ValueMut};
Expand Down Expand Up @@ -643,16 +644,16 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
Ok(())
}

/// Note that the `epoch` parameter doesn't quantize, and assumes the entry for the epoch is non-empty.
fn remove(
&mut self,
raw_epoch: ChainEpoch,
epoch: ChainEpoch,
on_time_sectors: &BitField,
early_sectors: &BitField,
active_power: &PowerPair,
faulty_power: &PowerPair,
pledge: &TokenAmount,
) -> anyhow::Result<()> {
let epoch = self.quant.quantize_up(raw_epoch);
let mut expiration_set = self
.amt
.get(epoch.try_into()?)
Expand Down Expand Up @@ -776,46 +777,67 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
sector_size: SectorSize,
sectors: &[SectorOnChainInfo],
) -> anyhow::Result<Vec<SectorExpirationSet>> {
let mut declared_expirations = BTreeMap::<ChainEpoch, bool>::new();
if sectors.is_empty() {
return Ok(Vec::new());
}

let mut sectors_by_number = BTreeMap::<u64, &SectorOnChainInfo>::new();
let mut all_remaining = BTreeSet::<u64>::new();
let mut declared_expirations = BTreeSet::<i64>::new();

for sector in sectors {
let q_expiration = self.quant.quantize_up(sector.expiration);
declared_expirations.insert(q_expiration, true);
declared_expirations.insert(sector.expiration);
all_remaining.insert(sector.sector_number);
sectors_by_number.insert(sector.sector_number, sector);
}

let mut expiration_groups =
Vec::<SectorExpirationSet>::with_capacity(declared_expirations.len());

for (&expiration, _) in declared_expirations.iter() {
let es = self.may_get(expiration)?;

let group = group_expiration_set(
sector_size,
&sectors_by_number,
&mut all_remaining,
es,
expiration,
);
if !group.sector_epoch_set.sectors.is_empty() {
expiration_groups.push(group);
}
let mut expiration_groups = Vec::<SectorExpirationSet>::with_capacity(sectors.len());

let mut old_end = 0i64;
for expiration in declared_expirations.iter() {
// Basically we're scanning [sector.expiration, sector.expiration+EPOCHS_IN_DAY) for active sectors.
// Since a sector may have been moved from another deadline, the possible range for an active sector is [sector.expiration, sector.expiration+EPOCHS_IN_DAY).
//
// And we're also trying to avoid scanning the same range twice by choosing a proper `start_at`.

let start_at = if *expiration > old_end {
*expiration
} else {
// +1 since the range is inclusive
old_end + 1
};
let new_end = (expiration + EPOCHS_IN_DAY - 1) as u64;

// scan range [start_at, new_end] for active sectors of interest
self.amt.for_each_while_ranged(Some(start_at as u64), None, |epoch, es| {
if epoch > new_end {
// no need to scan any more
return Ok(false);
}

let group = group_expiration_set(
sector_size,
&sectors_by_number,
&mut all_remaining,
es,
epoch as ChainEpoch,
);

if !group.sector_epoch_set.sectors.is_empty() {
expiration_groups.push(group);
}

Ok(epoch < new_end && !all_remaining.is_empty())
})?;

old_end = new_end as i64;
}

// If sectors remain, traverse next in epoch order. Remaining sectors should be
// rescheduled to expire soon, so this traversal should exit early.
if !all_remaining.is_empty() {
self.amt.for_each_while(|epoch, es| {
let epoch = epoch as ChainEpoch;
// If this set's epoch is one of our declared epochs, we've already processed it
// in the loop above, so skip processing here. Sectors rescheduled to this epoch
// would have been included in the earlier processing.
if declared_expirations.contains_key(&epoch) {
return Ok(true);
}

// Sector should not be found in EarlyExpirations which holds faults. An implicit assumption
// of grouping is that it only returns sectors with active power. ExpirationQueue should not
Expand All @@ -826,7 +848,7 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
sector_size,
&sectors_by_number,
&mut all_remaining,
es.clone(),
es,
epoch,
);

Expand Down Expand Up @@ -911,7 +933,7 @@ fn group_expiration_set(
sector_size: SectorSize,
sectors: &BTreeMap<u64, &SectorOnChainInfo>,
include_set: &mut BTreeSet<u64>,
es: ExpirationSet,
es: &ExpirationSet,
expiration: ChainEpoch,
) -> SectorExpirationSet {
let mut sector_numbers = Vec::new();
Expand All @@ -927,14 +949,26 @@ fn group_expiration_set(
}
}

SectorExpirationSet {
sector_epoch_set: SectorEpochSet {
epoch: expiration,
sectors: sector_numbers,
power: total_power,
pledge: total_pledge,
},
expiration_set: es,
if sector_numbers.is_empty() {
SectorExpirationSet {
sector_epoch_set: SectorEpochSet {
epoch: expiration,
sectors: sector_numbers,
power: total_power,
pledge: total_pledge,
},
expiration_set: ExpirationSet::default(),
}
} else {
SectorExpirationSet {
sector_epoch_set: SectorEpochSet {
epoch: expiration,
sectors: sector_numbers,
power: total_power,
pledge: total_pledge,
},
expiration_set: es.clone(), // lazy clone
}
}
}

Expand Down
Loading