resolved_ts: return early after regions already had quorum (tikv#11352) (tikv#11537)

close tikv#11351, ref tikv#11352

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Signed-off-by: 5kbpers <tangminghua@pingcap.com>

Co-authored-by: 5kbpers <tangminghua@pingcap.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
3 people authored Feb 21, 2022
1 parent f7ca4e1 commit d67444b
Showing 4 changed files with 152 additions and 294 deletions.
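The title's "return early after regions already had quorum" refers to the store-level resolved-ts advance. In the hunks below, the CDC endpoint deletes its own `region_resolved_ts_store` and calls the shared `resolved_ts::region_resolved_ts_store` instead, so the early-return logic itself is not visible in this diff. A minimal sketch of the idea, under hypothetical names and signatures rather than the actual resolved_ts code: consume check-leader responses as they arrive and stop as soon as no region is still waiting for quorum.

use std::collections::{HashMap, HashSet};

use futures::stream::{FuturesUnordered, StreamExt};

// Sketch only: `store_futs` stands in for the per-store check-leader RPCs, each
// resolving to (store_id, region ids confirmed by that store); `has_quorum` stands
// in for the joint-consensus check. None of these names come from this commit.
async fn check_leaders_until_quorum<Fut>(
    store_futs: Vec<Fut>,
    mut pending: HashSet<u64>,
    mut has_quorum: impl FnMut(u64, &[u64]) -> bool,
) -> Vec<u64>
where
    Fut: std::future::Future<Output = (u64, Vec<u64>)>,
{
    let mut resp_map: HashMap<u64, Vec<u64>> = HashMap::default();
    let mut futs: FuturesUnordered<Fut> = store_futs.into_iter().collect();
    let mut resolved = Vec::new();
    while let Some((store_id, regions)) = futs.next().await {
        for region_id in regions {
            let stores = resp_map.entry(region_id).or_default();
            stores.push(store_id);
            if pending.contains(&region_id) && has_quorum(region_id, stores) {
                pending.remove(&region_id);
                resolved.push(region_id);
            }
        }
        if pending.is_empty() {
            // Return early: every region already has quorum, so the remaining
            // (possibly slow or hibernating) stores need not be awaited.
            break;
        }
    }
    resolved
}

The removed endpoint code below awaits every store with `futures::future::join_all`, so the slowest store gates all regions even after they have quorum; stopping once the pending set is empty is presumably what the title's early return avoids.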
213 changes: 20 additions & 193 deletions components/cdc/src/endpoint.rs
@@ -3,7 +3,7 @@
use std::f64::INFINITY;
use std::fmt;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;

use collections::{HashMap, HashSet};
@@ -12,20 +12,14 @@ use crossbeam::atomic::AtomicCell;
use engine_traits::{KvEngine, Snapshot as EngineSnapshot};
use fail::fail_point;
use futures::compat::Future01CompatExt;
use grpcio::{ChannelBuilder, Environment};
#[cfg(feature = "prost-codec")]
use kvproto::cdcpb::{
event::Event as Event_oneof_event, ChangeDataRequest,
DuplicateRequest as ErrorDuplicateRequest, Error as EventError, Event, ResolvedTs,
};
#[cfg(not(feature = "prost-codec"))]
use grpcio::Environment;
use kvproto::cdcpb::{
ChangeDataRequest, ClusterIdMismatch as ErrorClusterIdMismatch,
DuplicateRequest as ErrorDuplicateRequest, Error as EventError, Event, Event_oneof_event,
ResolvedTs,
};
use kvproto::kvrpcpb::{CheckLeaderRequest, ExtraOp as TxnExtraOp, LeaderInfo};
use kvproto::metapb::{PeerRole, Region, RegionEpoch};
use kvproto::kvrpcpb::ExtraOp as TxnExtraOp;
use kvproto::metapb::{Region, RegionEpoch};
use kvproto::tikvpb::TikvClient;
use online_config::{ConfigChange, OnlineConfig};
use pd_client::{Feature, PdClient};
@@ -34,6 +28,7 @@ use raftstore::coprocessor::ObserveID;
use raftstore::router::RaftStoreRouter;
use raftstore::store::fsm::{ChangeObserver, StoreMeta};
use raftstore::store::msg::{Callback, ReadResponse, SignificantMsg};
use raftstore::store::RegionReadProgressRegistry;
use resolved_ts::Resolver;
use security::SecurityManager;
use tikv::config::CdcConfig;
@@ -46,9 +41,9 @@ use tikv_util::sys::inspector::{self_thread_inspector, ThreadInspector};
use tikv_util::time::{Instant, Limiter};
use tikv_util::timer::SteadyTimer;
use tikv_util::worker::{Runnable, RunnableWithTimer, ScheduleError, Scheduler};
use tikv_util::{box_err, box_try, debug, error, impl_display_as_debug, info, warn};
use tikv_util::{box_err, debug, error, impl_display_as_debug, info, warn};
use tokio::runtime::{Builder, Runtime};
use tokio::sync::Semaphore;
use tokio::sync::{Mutex, Semaphore};
use txn_types::{Key, Lock, LockType, TimeStamp, TxnExtra, TxnExtraScheduler};

use crate::channel::{CdcEvent, MemoryQuota, SendError};
@@ -59,7 +54,6 @@ use crate::service::{Conn, ConnID, FeatureGate};
use crate::{CdcObserver, Error, Result};

const FEATURE_RESOLVED_TS_STORE: Feature = Feature::require(5, 0, 0);
const DEFAULT_CHECK_LEADER_TIMEOUT_MILLISECONDS: u64 = 5_000; // 5s

pub enum Deregister {
Downstream {
@@ -239,7 +233,7 @@ pub struct Endpoint<T, E> {
pd_client: Arc<dyn PdClient>,
timer: SteadyTimer,
tso_worker: Runtime,
store_meta: Arc<Mutex<StoreMeta>>,
store_meta: Arc<StdMutex<StoreMeta>>,
/// The concurrency manager for transactions. It's needed for CDC to check locks when
/// calculating resolved_ts.
concurrency_manager: ConcurrencyManager,
@@ -266,6 +260,7 @@ pub struct Endpoint<T, E> {
tikv_clients: Arc<Mutex<HashMap<u64, TikvClient>>>,
env: Arc<Environment>,
security_mgr: Arc<SecurityManager>,
region_read_progress: RegionReadProgressRegistry,
}

impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
@@ -276,7 +271,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
scheduler: Scheduler<Task>,
raft_router: T,
observer: CdcObserver,
store_meta: Arc<Mutex<StoreMeta>>,
store_meta: Arc<StdMutex<StoreMeta>>,
concurrency_manager: ConcurrencyManager,
env: Arc<Environment>,
security_mgr: Arc<SecurityManager>,
@@ -310,6 +305,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
// Assume 1KB per entry.
let max_scan_batch_size = 1024;

let region_read_progress = store_meta.lock().unwrap().region_read_progress.clone();
let ep = Endpoint {
cluster_id,
env,
@@ -338,6 +334,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
unresolved_region_count: 0,
sink_memory_quota,
tikv_clients: Arc::new(Mutex::new(HashMap::default())),
region_read_progress,
};
ep.register_min_ts_event();
ep
@@ -828,6 +825,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
let store_meta = self.store_meta.clone();
let tikv_clients = self.tikv_clients.clone();
let hibernate_regions_compatible = self.config.hibernate_regions_compatible;
let region_read_progress = self.region_read_progress.clone();

let fut = async move {
let _ = timeout.compat().await;
@@ -857,9 +855,14 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
let regions =
if hibernate_regions_compatible && gate.can_enable(FEATURE_RESOLVED_TS_STORE) {
CDC_RESOLVED_TS_ADVANCE_METHOD.set(1);
Self::region_resolved_ts_store(
let regions = regions
.into_iter()
.map(|(region_id, _)| region_id)
.collect();
resolved_ts::region_resolved_ts_store(
regions,
store_meta,
region_read_progress,
pd_client,
security_mgr,
env,
@@ -932,182 +935,6 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
resps.into_iter().flatten().collect::<Vec<u64>>()
}

async fn region_resolved_ts_store(
regions: Vec<(u64, ObserveID)>,
store_meta: Arc<Mutex<StoreMeta>>,
pd_client: Arc<dyn PdClient>,
security_mgr: Arc<SecurityManager>,
env: Arc<Environment>,
cdc_clients: Arc<Mutex<HashMap<u64, TikvClient>>>,
min_ts: TimeStamp,
) -> Vec<u64> {
let region_has_quorum = |region: &Region, stores: &[u64]| {
let mut voters = 0;
let mut incoming_voters = 0;
let mut demoting_voters = 0;

let mut resp_voters = 0;
let mut resp_incoming_voters = 0;
let mut resp_demoting_voters = 0;

region.get_peers().iter().for_each(|peer| {
let mut in_resp = false;
for store_id in stores {
if *store_id == peer.store_id {
in_resp = true;
break;
}
}
match peer.get_role() {
PeerRole::Voter => {
voters += 1;
if in_resp {
resp_voters += 1;
}
}
PeerRole::IncomingVoter => {
incoming_voters += 1;
if in_resp {
resp_incoming_voters += 1;
}
}
PeerRole::DemotingVoter => {
demoting_voters += 1;
if in_resp {
resp_demoting_voters += 1;
}
}
PeerRole::Learner => (),
}
});

let has_incoming_majority =
(resp_voters + resp_incoming_voters) >= ((voters + incoming_voters) / 2 + 1);
let has_demoting_majority =
(resp_voters + resp_demoting_voters) >= ((voters + demoting_voters) / 2 + 1);

has_incoming_majority && has_demoting_majority
};

let find_store_id = |region: &Region, peer_id| {
for peer in region.get_peers() {
if peer.id == peer_id {
return Some(peer.store_id);
}
}
None
};

// store_id -> leaders info, record the request to each stores
let mut store_map: HashMap<u64, Vec<LeaderInfo>> = HashMap::default();
// region_id -> region, cache the information of regions
let mut region_map: HashMap<u64, Region> = HashMap::default();
// region_id -> peers id, record the responses
let mut resp_map: HashMap<u64, Vec<u64>> = HashMap::default();
{
let meta = store_meta.lock().unwrap();
let store_id = match meta.store_id {
Some(id) => id,
None => return vec![],
};
// TODO: should using `RegionReadProgressRegistry` to dump leader info like `resolved-ts`
// to reduce the time holding the `store_meta` mutex
for (region_id, _) in regions {
if let Some(region) = meta.regions.get(&region_id) {
if let Some((term, leader_id)) = meta.leaders.get(&region_id) {
let leader_store_id = find_store_id(region, *leader_id);
if leader_store_id.is_none() {
continue;
}
if leader_store_id.unwrap() != meta.store_id.unwrap() {
continue;
}
for peer in region.get_peers() {
if peer.store_id == store_id && peer.id == *leader_id {
resp_map.entry(region_id).or_default().push(store_id);
continue;
}
if peer.get_role() == PeerRole::Learner {
continue;
}
let mut leader_info = LeaderInfo::default();
leader_info.set_peer_id(*leader_id);
leader_info.set_term(*term);
leader_info.set_region_id(region_id);
leader_info.set_region_epoch(region.get_region_epoch().clone());
store_map
.entry(peer.store_id)
.or_default()
.push(leader_info);
}
region_map.insert(region_id, region.clone());
}
}
}
}
let stores = store_map.into_iter().map(|(store_id, regions)| {
let cdc_clients = cdc_clients.clone();
let env = env.clone();
let pd_client = pd_client.clone();
let security_mgr = security_mgr.clone();
async move {
if cdc_clients.lock().unwrap().get(&store_id).is_none() {
let store = box_try!(pd_client.get_store_async(store_id).await);
let cb = ChannelBuilder::new(env.clone());
let channel = security_mgr.connect(cb, &store.address);
cdc_clients
.lock()
.unwrap()
.insert(store_id, TikvClient::new(channel));
}
let client = cdc_clients.lock().unwrap().get(&store_id).unwrap().clone();
let mut req = CheckLeaderRequest::default();
req.set_regions(regions.into());
req.set_ts(min_ts.into_inner());
let res = box_try!(
tokio::time::timeout(
Duration::from_millis(DEFAULT_CHECK_LEADER_TIMEOUT_MILLISECONDS),
box_try!(client.check_leader_async(&req))
)
.await
);
let resp = box_try!(res);
Result::Ok((store_id, resp))
}
});
let resps = futures::future::join_all(stores).await;
resps
.into_iter()
.filter_map(|resp| match resp {
Ok(resp) => Some(resp),
Err(e) => {
debug!("cdc check leader error"; "err" =>?e);
None
}
})
.map(|(store_id, resp)| {
resp.regions
.into_iter()
.map(move |region_id| (store_id, region_id))
})
.flatten()
.for_each(|(store_id, region_id)| {
resp_map.entry(region_id).or_default().push(store_id);
});
resp_map
.into_iter()
.filter_map(|(region_id, stores)| {
if region_has_quorum(&region_map[&region_id], &stores) {
Some(region_id)
} else {
debug!("cdc cannot get quorum for resolved ts";
"region_id" => region_id, "stores" => ?stores, "region" => ?&region_map[&region_id]);
None
}
})
.collect()
}

fn on_open_conn(&mut self, conn: Conn) {
self.connections.insert(conn.get_id(), conn);
}
@@ -1684,7 +1511,7 @@ mod tests {
) {
let mut region = Region::default();
region.set_id(1);
let store_meta = Arc::new(Mutex::new(StoreMeta::new(0)));
let store_meta = Arc::new(StdMutex::new(StoreMeta::new(0)));
let read_delegate = ReadDelegate {
tag: String::new(),
region: Arc::new(region),
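For reference, the quorum rule computed by the removed `region_has_quorum` closure above is the joint-consensus one: the stores that responded must cover a majority of the incoming voter set (Voter plus IncomingVoter) and a majority of the outgoing voter set (Voter plus DemotingVoter), while learners never count. Restated compactly (illustrative, not code from this commit):

// Joint-consensus quorum as computed by the removed closure. With a non-joint
// configuration, incoming == demoting == 0 and this reduces to a plain majority
// of the voters.
fn joint_quorum(total: (usize, usize, usize), resp: (usize, usize, usize)) -> bool {
    let (voters, incoming, demoting) = total;
    let (resp_voters, resp_incoming, resp_demoting) = resp;
    let incoming_majority = resp_voters + resp_incoming >= (voters + incoming) / 2 + 1;
    let outgoing_majority = resp_voters + resp_demoting >= (voters + demoting) / 2 + 1;
    incoming_majority && outgoing_majority
}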
5 changes: 5 additions & 0 deletions components/raftstore/src/store/worker/check_leader.rs
@@ -98,6 +98,11 @@ impl Runnable for Runner {
fn run(&mut self, task: Task) {
match task {
Task::CheckLeader { leaders, cb } => {
fail_point!(
"before_check_leader_store_2",
self.store_meta.lock().unwrap().store_id == Some(2),
|_| {}
);
fail_point!(
"before_check_leader_store_3",
self.store_meta.lock().unwrap().store_id == Some(3),
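The two new fail points are keyed on the store id, so a failpoint test (not part of the hunks shown here) can stall the check-leader path on one store while the others respond normally. A sketch of how such a test might arm them with the `fail` crate; the test name and surrounding harness are assumptions, and only the fail point name comes from this diff:

// Requires the `failpoints` feature of the `fail` crate.
#[test]
fn resolved_ts_advances_while_store_2_is_stalled() {
    // Park any thread that reaches the fail point on store 2; store 3 is untouched.
    fail::cfg("before_check_leader_store_2", "pause").unwrap();

    // ... start a cluster, write data, and assert that resolved ts still advances
    // once the remaining stores form a quorum ...

    // Unblock parked threads before the test ends.
    fail::remove("before_check_leader_store_2");
}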