
raftstore: move get_region_approximate_size to split check worker #9081

Merged 28 commits on Dec 4, 2020
45def28  send size and keys to router (sleepymole, Nov 23, 2020)
730257f  rename (sleepymole, Nov 23, 2020)
9c398f8  add pending heartbeat pd (sleepymole, Nov 23, 2020)
def6e6c  add test_region_heartbeat_when_size_or_keys_is_none (sleepymole, Nov 24, 2020)
deac59c  Merge branch 'master' into fix-pd-hb-block (sleepymole, Nov 24, 2020)
46a8e6d  use pending_pd_heartbeat_tasks (sleepymole, Nov 24, 2020)
f7fffc0  Merge branch 'master' into fix-pd-hb-block (sleepymole, Nov 24, 2020)
c549ebd  rerange code (sleepymole, Nov 25, 2020)
fe1edde  typo (sleepymole, Nov 25, 2020)
3414886  avoid too many heartbeats (sleepymole, Nov 25, 2020)
294b4b4  Merge branch 'master' into fix-pd-hb-block (sleepymole, Nov 25, 2020)
20b00e9  Merge branch 'master' into fix-pd-hb-block (sleepymole, Nov 25, 2020)
70f33c0  estimate size and keys after split (sleepymole, Nov 27, 2020)
98b4eeb  Merge branch 'master' into fix-pd-hb-block (sleepymole, Nov 27, 2020)
23acc0f  use and_then instead of map_or (sleepymole, Nov 27, 2020)
9b18961  bug fix (sleepymole, Nov 27, 2020)
298b30b  Merge branch 'master' into fix-pd-hb-block (sleepymole, Nov 30, 2020)
a73a7ae  send task after router register (sleepymole, Nov 30, 2020)
a57544e  Merge branch 'master' into fix-pd-hb-block (sleepymole, Nov 30, 2020)
4511aeb  Merge branch 'master' into fix-pd-hb-block (sleepymole, Dec 1, 2020)
f5780de  only leader need to update size and keys (sleepymole, Dec 1, 2020)
d5a7233  only leader need to update size and keys (sleepymole, Dec 3, 2020)
c2eee6f  Merge branch 'master' into fix-pd-hb-block (sleepymole, Dec 3, 2020)
02aad56  estimate size before heartbeat_pd (sleepymole, Dec 4, 2020)
bf9aa3d  Merge branch 'master' into fix-pd-hb-block (sleepymole, Dec 4, 2020)
55d47e0  remove extra newline (sleepymole, Dec 4, 2020)
7d87f61  fix estimated size and keys (sleepymole, Dec 4, 2020)
7a6e4a9  Merge branch 'master' into fix-pd-hb-block (sleepymole, Dec 4, 2020)
45 changes: 42 additions & 3 deletions components/raftstore/src/store/fsm/peer.rs
@@ -4,6 +4,8 @@ use std::borrow::Cow;
use std::collections::Bound::{Excluded, Included, Unbounded};
use std::collections::VecDeque;
use std::iter::Iterator;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Instant;
use std::{cmp, u64};

@@ -2125,9 +2127,28 @@ where
if meta.region_ranges.remove(&last_key).is_none() {
panic!("{} original region should exists", self.fsm.peer.tag);
}
// It's not correct anymore, so set it to None to let split checker update it.
self.fsm.peer.approximate_size = None;
let last_region_id = regions.last().unwrap().get_id();

let new_region_count = regions.len() as u64;
// Roughly estimate the size and keys, then let the split checker update them.
self.fsm.peer.approximate_size =
self.fsm.peer.approximate_size.map(|x| x / new_region_count);
self.fsm.peer.approximate_keys =
self.fsm.peer.approximate_keys.map(|x| x / new_region_count);
if let Err(e) = self.ctx.split_check_scheduler.schedule(
SplitCheckTask::GetRegionApproximateSizeAndKeys {
region: self.fsm.peer.region().clone(),
pending_tasks: Arc::new(AtomicU64::new(1)),
cb: Box::new(move |_, _| {}),
},
) {
error!(
"failed to schedule split check task";
"region_id" => self.fsm.region_id(),
"peer_id" => self.fsm.peer_id(),
"err" => ?e,
);
}
Member: If it's not the leader, this task is very wasteful, and approximate_size & approximate_keys do not need to be updated.

Member: And I think self.fsm.peer.heartbeat_pd should be called here if it's the leader.

Member Author:

> If it's not the leader, this task is very wasteful, and approximate_size & approximate_keys do not need to be updated.

Oh, I didn't notice this in the last update. I will add a check later.

> And I think self.fsm.peer.heartbeat_pd should be called here if it's the leader.

heartbeat_pd is called a few lines before this.

Member Author: @gengliqi I have updated. PTAL.

Member: I think we can move this code into the if is_leader branch above.

Member Author: I have updated again. Please take a look.

for new_region in regions {
let new_region_id = new_region.get_id();

@@ -2218,6 +2239,8 @@ where
new_peer.peer.peer_stat = self.fsm.peer.peer_stat.clone();
let campaigned = new_peer.peer.maybe_campaign(is_leader);
new_peer.has_ready |= campaigned;
new_peer.peer.approximate_size = self.fsm.peer.approximate_size;
new_peer.peer.approximate_keys = self.fsm.peer.approximate_keys;

if is_leader {
// The new peer is likely to become leader, send a heartbeat immediately to reduce
@@ -2226,7 +2249,7 @@
}

new_peer.peer.activate(self.ctx);
meta.regions.insert(new_region_id, new_region);
meta.regions.insert(new_region_id, new_region.clone());
meta.readers
.insert(new_region_id, ReadDelegate::from_peer(new_peer.get_peer()));
if last_region_id == new_region_id {
@@ -2255,6 +2278,22 @@
}
}
}

// The size and keys for the new region may be far from the real values,
// so let the split checker update them immediately.
if let Err(e) = self.ctx.split_check_scheduler.schedule(
SplitCheckTask::GetRegionApproximateSizeAndKeys {
region: new_region,
pending_tasks: Arc::new(AtomicU64::new(1)),
cb: Box::new(move |_, _| {}),
},
) {
error!(
"failed to schedule split check task";
"region_id" => new_region_id,
"err" => ?e,
);
}
}
}
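The rough post-split estimation in this hunk (dividing the parent's stats by the number of resulting regions) can be sketched as a standalone helper. This is a minimal sketch, not TiKV's actual code; rough_split_estimate is a hypothetical name:

```rust
// Hypothetical helper mirroring the estimation in on_ready_split_region:
// after a split into `new_region_count` regions, each resulting region is
// assumed to hold roughly 1/n of the parent's approximate size (or keys)
// until the split check worker computes the real values.
fn rough_split_estimate(parent: Option<u64>, new_region_count: u64) -> Option<u64> {
    // `None` means "unknown"; the split check worker will fill it in later.
    parent.map(|x| x / new_region_count)
}
```

The Option is preserved on purpose: an unknown parent estimate stays unknown rather than being guessed at zero.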

2 changes: 0 additions & 2 deletions components/raftstore/src/store/fsm/store.rs
@@ -1263,7 +1263,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
concurrency_manager: ConcurrencyManager,
pd_client: Arc<C>,
) -> Result<()> {
let engines = builder.engines.clone();
let cfg = builder.cfg.value().clone();
let store = builder.store.clone();

@@ -1319,7 +1318,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
store.get_id(),
Arc::clone(&pd_client),
self.router.clone(),
engines.kv,
workers.pd_worker.scheduler(),
cfg.pd_store_heartbeat_tick_interval.0,
auto_split_controller,
66 changes: 59 additions & 7 deletions components/raftstore/src/store/peer.rs
@@ -38,9 +38,10 @@ use crate::coprocessor::{CoprocessorHost, RegionChangeEvent};
use crate::store::fsm::apply::CatchUpLogs;
use crate::store::fsm::store::PollContext;
use crate::store::fsm::{apply, Apply, ApplyMetrics, ApplyTask, GroupState, Proposal};
use crate::store::worker::{ReadDelegate, ReadExecutor, ReadProgress, RegionTask};
use crate::store::worker::{HeartbeatTask, ReadDelegate, ReadExecutor, ReadProgress, RegionTask};
use crate::store::{
Callback, Config, GlobalReplicationState, PdTask, ReadIndexContext, ReadResponse,
SplitCheckTask,
};
use crate::{Error, Result};
use pd_client::INVALID_ID;
@@ -470,6 +471,11 @@ where

/// Check whether this proposal can be proposed based on its epoch
cmd_epoch_checker: CmdEpochChecker<EK::Snapshot>,

/// The number of pending PD heartbeat tasks. A PD heartbeat task may be blocked by
/// reading RocksDB. To avoid unnecessary I/O operations, a new task is skipped
/// when too many tasks are already pending.
pub pending_pd_heartbeat_tasks: Arc<AtomicU64>,
}

impl<EK, ER> Peer<EK, ER>
@@ -562,6 +568,7 @@ where
txn_extra_op: Arc::new(AtomicCell::new(TxnExtraOp::Noop)),
max_ts_sync_status: Arc::new(AtomicU64::new(0)),
cmd_epoch_checker: Default::default(),
pending_pd_heartbeat_tasks: Arc::new(AtomicU64::new(0)),
};

// If this region has only one peer and I am the one, campaign directly.
@@ -3093,26 +3100,71 @@ where
Some(status)
}

pub fn is_region_size_or_keys_none(&self) -> bool {
fail_point!("region_size_or_keys_none", |_| true);
self.approximate_size.is_none() || self.approximate_keys.is_none()
}

pub fn heartbeat_pd<T>(&mut self, ctx: &PollContext<EK, ER, T>) {
let task = PdTask::Heartbeat {
let task = PdTask::Heartbeat(HeartbeatTask {
term: self.term(),
region: self.region().clone(),
peer: self.peer.clone(),
down_peers: self.collect_down_peers(ctx.cfg.max_peer_down_duration.0),
pending_peers: self.collect_pending_peers(ctx),
written_bytes: self.peer_stat.written_bytes,
written_keys: self.peer_stat.written_keys,
approximate_size: self.approximate_size,
approximate_keys: self.approximate_keys,
approximate_size: self.approximate_size.unwrap_or_default(),
approximate_keys: self.approximate_keys.unwrap_or_default(),
replication_status: self.region_replication_status(),
});
if !self.is_region_size_or_keys_none() {
if let Err(e) = ctx.pd_scheduler.schedule(task) {
error!(
"failed to notify pd";
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
"err" => ?e,
);
}
return;
}

if self.pending_pd_heartbeat_tasks.load(Ordering::SeqCst) > 2 {
return;
}
let region_id = self.region_id;
let peer_id = self.peer.get_id();
let scheduler = ctx.pd_scheduler.clone();
let split_check_task = SplitCheckTask::GetRegionApproximateSizeAndKeys {
region: self.region().clone(),
pending_tasks: self.pending_pd_heartbeat_tasks.clone(),
cb: Box::new(move |size: u64, keys: u64| {
if let PdTask::Heartbeat(mut h) = task {
h.approximate_size = size;
h.approximate_keys = keys;
if let Err(e) = scheduler.schedule(PdTask::Heartbeat(h)) {
error!(
"failed to notify pd";
"region_id" => region_id,
"peer_id" => peer_id,
"err" => ?e,
);
}
}
}),
};
if let Err(e) = ctx.pd_scheduler.schedule(task) {
self.pending_pd_heartbeat_tasks
.fetch_add(1, Ordering::SeqCst);
if let Err(e) = ctx.split_check_scheduler.schedule(split_check_task) {
error!(
"failed to notify pd";
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
"region_id" => region_id,
"peer_id" => peer_id,
"err" => ?e,
);
self.pending_pd_heartbeat_tasks
.fetch_sub(1, Ordering::SeqCst);
}
}
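The gating logic in heartbeat_pd above can be sketched in isolation. Gate and try_schedule are hypothetical names, the threshold of 2 mirrors the check in the diff, and this is a simplified sketch rather than TiKV's actual implementation:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Sketch of the pending-task gate: skip scheduling another expensive
// size/keys estimation when too many are already queued, and roll the
// counter back if scheduling itself fails (the worker would otherwise
// never decrement it).
struct Gate {
    pending: Arc<AtomicU64>,
}

impl Gate {
    fn try_schedule<F: FnOnce() -> Result<(), ()>>(&self, schedule: F) -> bool {
        if self.pending.load(Ordering::SeqCst) > 2 {
            // Enough tasks queued; a later one will report fresh stats anyway.
            return false;
        }
        self.pending.fetch_add(1, Ordering::SeqCst);
        if schedule().is_err() {
            // Undo the increment: no worker will run for this task.
            self.pending.fetch_sub(1, Ordering::SeqCst);
            return false;
        }
        true
    }
}
```

In the real code the decrement on success happens in the split check worker once the estimation finishes, which is why the counter is an Arc shared with the scheduled task.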

4 changes: 3 additions & 1 deletion components/raftstore/src/store/worker/mod.rs
@@ -17,7 +17,9 @@ pub use self::cleanup::{Runner as CleanupRunner, Task as CleanupTask};
pub use self::cleanup_sst::{Runner as CleanupSSTRunner, Task as CleanupSSTTask};
pub use self::compact::{Runner as CompactRunner, Task as CompactTask};
pub use self::consistency_check::{Runner as ConsistencyCheckRunner, Task as ConsistencyCheckTask};
pub use self::pd::{FlowStatistics, FlowStatsReporter, Runner as PdRunner, Task as PdTask};
pub use self::pd::{
FlowStatistics, FlowStatsReporter, HeartbeatTask, Runner as PdRunner, Task as PdTask,
};
pub use self::raftlog_gc::{Runner as RaftlogGcRunner, Task as RaftlogGcTask};
pub use self::read::{LocalReader, Progress as ReadProgress, ReadDelegate, ReadExecutor};
pub use self::region::{Runner as RegionRunner, Task as RegionTask};
88 changes: 32 additions & 56 deletions components/raftstore/src/store/worker/pd.rs
@@ -25,7 +25,6 @@ use kvproto::replication_modepb::RegionReplicationStatus;
use prometheus::local::LocalHistogram;
use raft::eraftpb::ConfChangeType;

use crate::coprocessor::{get_region_approximate_keys, get_region_approximate_size};
use crate::store::cmd_resp::new_error;
use crate::store::metrics::*;
use crate::store::util::{is_epoch_stale, ConfChangeKind, KeysInfoFormatter};
@@ -75,6 +74,19 @@ where
}
}

pub struct HeartbeatTask {
pub term: u64,
pub region: metapb::Region,
pub peer: metapb::Peer,
pub down_peers: Vec<pdpb::PeerStats>,
pub pending_peers: Vec<metapb::Peer>,
pub written_bytes: u64,
pub written_keys: u64,
pub approximate_size: u64,
pub approximate_keys: u64,
pub replication_status: Option<RegionReplicationStatus>,
}
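One payoff of this refactor, which the callback in heartbeat_pd relies on: with the payload in a struct, a callback can take the whole task, patch in the freshly computed stats, and re-submit it, instead of destructuring ten separate enum-variant fields. A minimal sketch with the fields trimmed to two for brevity (not the full TiKV types):

```rust
// Simplified stand-ins for the HeartbeatTask struct and Task enum.
struct HeartbeatTask {
    approximate_size: u64,
    approximate_keys: u64,
}

enum Task {
    Heartbeat(HeartbeatTask),
}

// Patch freshly estimated stats into an existing heartbeat task.
fn patch_stats(task: Task, size: u64, keys: u64) -> Task {
    match task {
        Task::Heartbeat(mut h) => {
            h.approximate_size = size;
            h.approximate_keys = keys;
            Task::Heartbeat(h)
        }
    }
}
```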

/// Uses an asynchronous thread to tell PD something.
pub enum Task<E>
where
@@ -99,18 +111,7 @@ where
AutoSplit {
split_infos: Vec<SplitInfo>,
},
Heartbeat {
term: u64,
region: metapb::Region,
peer: metapb::Peer,
down_peers: Vec<pdpb::PeerStats>,
pending_peers: Vec<metapb::Peer>,
written_bytes: u64,
written_keys: u64,
approximate_size: Option<u64>,
approximate_keys: Option<u64>,
replication_status: Option<RegionReplicationStatus>,
},
Heartbeat(HeartbeatTask),
StoreHeartbeat {
stats: pdpb::StoreStats,
store_info: StoreInfo<E>,
@@ -225,17 +226,12 @@ where
region.get_id(),
KeysInfoFormatter(split_keys.iter())
),
Task::Heartbeat {
ref region,
ref peer,
ref replication_status,
..
} => write!(
Task::Heartbeat(ref hb_task) => write!(
f,
"heartbeat for region {:?}, leader {}, replication status {:?}",
region,
peer.get_id(),
replication_status
hb_task.region,
hb_task.peer.get_id(),
hb_task.replication_status
),
Task::StoreHeartbeat { ref stats, .. } => {
write!(f, "store heartbeat stats: {:?}", stats)
@@ -437,7 +433,6 @@ where
store_id: u64,
pd_client: Arc<T>,
router: RaftRouter<EK, ER>,
db: EK,
region_peers: HashMap<u64, PeerStat>,
store_stat: StoreStat,
is_hb_receiver_scheduled: bool,
@@ -465,7 +460,6 @@ where
store_id: u64,
pd_client: Arc<T>,
router: RaftRouter<EK, ER>,
db: EK,
scheduler: Scheduler<Task<EK>>,
store_heartbeat_interval: Duration,
auto_split_controller: AutoSplitController,
@@ -481,7 +475,6 @@ where
store_id,
pd_client,
router,
db,
is_hb_receiver_scheduled: false,
region_peers: HashMap::default(),
store_stat: StoreStat::default(),
@@ -1096,24 +1089,7 @@ where
spawn_local(f);
}

Task::Heartbeat {
term,
region,
peer,
down_peers,
pending_peers,
written_bytes,
written_keys,
approximate_size,
approximate_keys,
replication_status,
} => {
let approximate_size = approximate_size.unwrap_or_else(|| {
get_region_approximate_size(&self.db, &region, 0).unwrap_or_default()
});
let approximate_keys = approximate_keys.unwrap_or_else(|| {
get_region_approximate_keys(&self.db, &region, 0).unwrap_or_default()
});
Task::Heartbeat(hb_task) => {
let (
read_bytes_delta,
read_keys_delta,
@@ -1123,15 +1099,15 @@
) = {
let peer_stat = self
.region_peers
.entry(region.get_id())
.entry(hb_task.region.get_id())
.or_insert_with(PeerStat::default);
let read_bytes_delta = peer_stat.read_bytes - peer_stat.last_read_bytes;
let read_keys_delta = peer_stat.read_keys - peer_stat.last_read_keys;
let written_bytes_delta = written_bytes - peer_stat.last_written_bytes;
let written_keys_delta = written_keys - peer_stat.last_written_keys;
let written_bytes_delta = hb_task.written_bytes - peer_stat.last_written_bytes;
let written_keys_delta = hb_task.written_keys - peer_stat.last_written_keys;
let mut last_report_ts = peer_stat.last_report_ts;
peer_stat.last_written_bytes = written_bytes;
peer_stat.last_written_keys = written_keys;
peer_stat.last_written_bytes = hb_task.written_bytes;
peer_stat.last_written_keys = hb_task.written_keys;
peer_stat.last_read_bytes = peer_stat.read_bytes;
peer_stat.last_read_keys = peer_stat.read_keys;
peer_stat.last_report_ts = UnixSecs::now();
@@ -1147,21 +1123,21 @@
)
};
self.handle_heartbeat(
term,
region,
peer,
hb_task.term,
hb_task.region,
hb_task.peer,
RegionStat {
down_peers,
pending_peers,
down_peers: hb_task.down_peers,
pending_peers: hb_task.pending_peers,
written_bytes: written_bytes_delta,
written_keys: written_keys_delta,
read_bytes: read_bytes_delta,
read_keys: read_keys_delta,
approximate_size,
approximate_keys,
approximate_size: hb_task.approximate_size,
approximate_keys: hb_task.approximate_keys,
last_report_ts,
},
replication_status,
hb_task.replication_status,
)
}
Task::StoreHeartbeat { stats, store_info } => {