From 287a391b006372517d035ff51ecf0db14747b1bc Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 21 Feb 2022 17:19:16 +0800 Subject: [PATCH] cdc: separate resolved region outliers (#11991) Separate broadcasing outlier regions and normal regions, so 1) downstreams know where they should send resolve lock requests, and 2) resolved ts of normal regions does not fallback. close pingcap/tiflow#4516 close pingcap/tiflow#4311 ref pingcap/tiflow#4146 Signed-off-by: Neil Shen --- components/cdc/src/endpoint.rs | 247 +++++++++++++++++++++++--- components/resolved_ts/src/advance.rs | 84 +++++++-- components/resolved_ts/src/metrics.rs | 7 + src/server/service/kv.rs | 7 +- 4 files changed, 306 insertions(+), 39 deletions(-) diff --git a/components/cdc/src/endpoint.rs b/components/cdc/src/endpoint.rs index afebe605aae..6edb14cd596 100644 --- a/components/cdc/src/endpoint.rs +++ b/components/cdc/src/endpoint.rs @@ -1,5 +1,8 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +use std::cmp::Reverse; +use std::cmp::{Ord, Ordering as CmpOrdering, PartialOrd}; +use std::collections::BinaryHeap; use std::f64::INFINITY; use std::fmt; use std::marker::PhantomData; @@ -54,6 +57,12 @@ use crate::service::{Conn, ConnID, FeatureGate}; use crate::{CdcObserver, Error, Result}; const FEATURE_RESOLVED_TS_STORE: Feature = Feature::require(5, 0, 0); +const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s +// 10 minutes, it's the default gc life time of TiDB +// and is long enough for most transactions. +const WARN_RESOLVED_TS_LAG_THRESHOLD: Duration = Duration::from_secs(600); +// Suppress repeat resolved ts lag warnning. +const WARN_RESOLVED_TS_COUNT_THRESHOLD: usize = 10; pub enum Deregister { Downstream { @@ -218,7 +227,75 @@ impl fmt::Debug for Task { } } -const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s +#[derive(PartialEq, Eq)] +struct ResolvedRegion { + region_id: u64, + resolved_ts: TimeStamp, +} + +impl PartialOrd for ResolvedRegion { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ResolvedRegion { + fn cmp(&self, other: &Self) -> CmpOrdering { + self.resolved_ts.cmp(&other.resolved_ts) + } +} + +struct ResolvedRegionHeap { + // BinaryHeap is max heap, so we reverse order to get a min heap. + heap: BinaryHeap>, +} + +impl ResolvedRegionHeap { + fn push(&mut self, region_id: u64, resolved_ts: TimeStamp) { + self.heap.push(Reverse(ResolvedRegion { + region_id, + resolved_ts, + })) + } + + // Pop slow regions and the minimum resolved ts among them. 
+ fn pop(&mut self, count: usize) -> (TimeStamp, HashSet) { + let mut min_resolved_ts = TimeStamp::max(); + let mut outliers = HashSet::with_capacity_and_hasher(count, Default::default()); + for _ in 0..count { + if let Some(resolved_region) = self.heap.pop() { + outliers.insert(resolved_region.0.region_id); + if min_resolved_ts > resolved_region.0.resolved_ts { + min_resolved_ts = resolved_region.0.resolved_ts; + } + } else { + break; + } + } + (min_resolved_ts, outliers) + } + + fn to_hash_set(&self) -> (TimeStamp, HashSet) { + let mut min_resolved_ts = TimeStamp::max(); + let mut regions = HashSet::with_capacity_and_hasher(self.heap.len(), Default::default()); + for resolved_region in &self.heap { + regions.insert(resolved_region.0.region_id); + if min_resolved_ts > resolved_region.0.resolved_ts { + min_resolved_ts = resolved_region.0.resolved_ts; + } + } + (min_resolved_ts, regions) + } + + fn clear(&mut self) { + self.heap.clear(); + } + + fn reset_and_shrink_to(&mut self, min_capacity: usize) { + self.clear(); + self.heap.shrink_to(min_capacity); + } +} pub struct Endpoint { cluster_id: u64, @@ -239,28 +316,31 @@ pub struct Endpoint { concurrency_manager: ConcurrencyManager, config: CdcConfig, + + // Incremental scan workers: Runtime, scan_concurrency_semaphore: Arc, - scan_speed_limiter: Limiter, max_scan_batch_bytes: usize, max_scan_batch_size: usize, + sink_memory_quota: MemoryQuota, - min_resolved_ts: TimeStamp, - min_ts_region_id: u64, old_value_cache: OldValueCache, + resolved_region_heap: ResolvedRegionHeap, - // stats - resolved_region_count: usize, - unresolved_region_count: usize, - - sink_memory_quota: MemoryQuota, - + // Check leader // store_id -> client tikv_clients: Arc>>, env: Arc, security_mgr: Arc, region_read_progress: RegionReadProgressRegistry, + + // Metrics and logging. + min_resolved_ts: TimeStamp, + min_ts_region_id: u64, + resolved_region_count: usize, + unresolved_region_count: usize, + warn_resolved_ts_repeat_count: usize, } impl, E: KvEngine> Endpoint { @@ -329,12 +409,17 @@ impl, E: KvEngine> Endpoint { concurrency_manager, min_resolved_ts: TimeStamp::max(), min_ts_region_id: 0, + resolved_region_heap: ResolvedRegionHeap { + heap: BinaryHeap::new(), + }, old_value_cache, resolved_region_count: 0, unresolved_region_count: 0, sink_memory_quota, tikv_clients: Arc::new(Mutex::new(HashMap::default())), region_read_progress, + // Log the first resolved ts warnning. + warn_resolved_ts_repeat_count: WARN_RESOLVED_TS_COUNT_THRESHOLD, }; ep.register_min_ts_event(); ep @@ -692,30 +777,77 @@ impl, E: KvEngine> Endpoint { } fn on_min_ts(&mut self, regions: Vec, min_ts: TimeStamp) { + // Reset resolved_regions to empty. + let resolved_regions = &mut self.resolved_region_heap; + resolved_regions.clear(); + let total_region_count = regions.len(); - // TODO: figure out how to avoid create a hashset every time, - // saving some CPU. 
- let mut resolved_regions = - HashSet::with_capacity_and_hasher(regions.len(), Default::default()); self.min_resolved_ts = TimeStamp::max(); + let mut advance_ok = 0; + let mut advance_failed_none = 0; + let mut advance_failed_same = 0; + let mut advance_failed_stale = 0; for region_id in regions { if let Some(delegate) = self.capture_regions.get_mut(®ion_id) { + let old_resolved_ts = delegate + .resolver + .as_ref() + .map_or(TimeStamp::zero(), |r| r.resolved_ts()); + if old_resolved_ts > min_ts { + advance_failed_stale += 1; + } if let Some(resolved_ts) = delegate.on_min_ts(min_ts) { if resolved_ts < self.min_resolved_ts { self.min_resolved_ts = resolved_ts; self.min_ts_region_id = region_id; } - resolved_regions.insert(region_id); + resolved_regions.push(region_id, resolved_ts); + + if resolved_ts == old_resolved_ts { + advance_failed_same += 1; + } else { + advance_ok += 1; + } + } else { + advance_failed_none += 1; } } } - self.resolved_region_count = resolved_regions.len(); + let lag_millis = min_ts + .physical() + .saturating_sub(self.min_resolved_ts.physical()); + if Duration::from_millis(lag_millis) > WARN_RESOLVED_TS_LAG_THRESHOLD { + self.warn_resolved_ts_repeat_count += 1; + if self.warn_resolved_ts_repeat_count >= WARN_RESOLVED_TS_COUNT_THRESHOLD { + self.warn_resolved_ts_repeat_count = 0; + warn!("cdc resolved ts lag too large"; + "min_resolved_ts" => self.min_resolved_ts, + "min_ts_region_id" => self.min_ts_region_id, + "min_ts" => min_ts, + "ok" => advance_ok, + "none" => advance_failed_none, + "stale" => advance_failed_stale, + "same" => advance_failed_same); + } + } + self.resolved_region_count = resolved_regions.heap.len(); self.unresolved_region_count = total_region_count - self.resolved_region_count; - self.broadcast_resolved_ts(resolved_regions); + + // Separate broadcasing outlier regions and normal regions, + // so 1) downstreams know where they should send resolve lock requests, + // and 2) resolved ts of normal regions does not fallback. + // + // Max number of outliers, in most cases, only a few regions are outliers. + // TODO: figure out how to avoid create hashset every time, saving some CPU. + let max_outlier_count = 32; + let (outlier_min_resolved_ts, outlier_regions) = resolved_regions.pop(max_outlier_count); + let (normal_min_resolved_ts, normal_regions) = resolved_regions.to_hash_set(); + self.broadcast_resolved_ts(outlier_min_resolved_ts, outlier_regions); + self.broadcast_resolved_ts(normal_min_resolved_ts, normal_regions); } - fn broadcast_resolved_ts(&self, regions: HashSet) { - let min_resolved_ts = self.min_resolved_ts.into_inner(); + fn broadcast_resolved_ts(&self, min_resolved_ts: TimeStamp, regions: HashSet) { + let min_resolved_ts = min_resolved_ts.into_inner(); let send_cdc_event = |regions: &HashSet, min_resolved_ts: u64, conn: &Conn| { let downstream_regions = conn.get_downstreams(); let mut resolved_ts = ResolvedTs::default(); @@ -830,7 +962,9 @@ impl, E: KvEngine> Endpoint { let fut = async move { let _ = timeout.compat().await; // Ignore get tso errors since we will retry every `min_ts_interval`. - let mut min_ts = pd_client.get_tso().await.unwrap_or_default(); + let min_ts_pd = pd_client.get_tso().await.unwrap_or_default(); + let mut min_ts = min_ts_pd; + let mut min_ts_min_lock = min_ts_pd; // Sync with concurrency manager so that it can work correctly when optimizations // like async commit is enabled. 
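Note: for reference, a minimal standalone sketch of the min-heap split used in on_min_ts above. It uses u64 in place of TimeStamp and derives Ord with resolved_ts as the leading field instead of implementing it by hand; it is an illustration of the technique, not the patched code itself. Wrapping entries in Reverse turns BinaryHeap (a max-heap) into a min-heap, so the most lagging regions pop first and can be broadcast separately, keeping the resolved ts of the remaining regions from falling back.

    use std::cmp::Reverse;
    use std::collections::{BinaryHeap, HashSet};

    // resolved_ts comes first so the derived Ord keys on it (region_id breaks ties).
    #[derive(PartialEq, Eq, PartialOrd, Ord)]
    struct ResolvedRegion {
        resolved_ts: u64,
        region_id: u64,
    }

    // BinaryHeap is a max-heap; Reverse-wrapped entries make it a min-heap,
    // so the slowest regions (smallest resolved ts) are popped first.
    struct ResolvedRegionHeap {
        heap: BinaryHeap<Reverse<ResolvedRegion>>,
    }

    impl ResolvedRegionHeap {
        fn push(&mut self, region_id: u64, resolved_ts: u64) {
            self.heap.push(Reverse(ResolvedRegion { resolved_ts, region_id }));
        }

        // Pop up to `count` slow regions and the minimum resolved ts among them.
        fn pop(&mut self, count: usize) -> (u64, HashSet<u64>) {
            let mut min_resolved_ts = u64::MAX;
            let mut outliers = HashSet::with_capacity(count);
            while outliers.len() < count {
                match self.heap.pop() {
                    Some(Reverse(r)) => {
                        min_resolved_ts = min_resolved_ts.min(r.resolved_ts);
                        outliers.insert(r.region_id);
                    }
                    None => break,
                }
            }
            (min_resolved_ts, outliers)
        }
    }

    fn main() {
        let mut heap = ResolvedRegionHeap { heap: BinaryHeap::new() };
        for (region_id, ts) in [(5u64, 50u64), (4, 40), (6, 60), (3, 30)] {
            heap.push(region_id, ts);
        }
        // The two most lagging regions (3 and 4) are broadcast on their own,
        // so the resolved ts of the remaining regions does not fall back to 30.
        let (outlier_ts, outliers) = heap.pop(2);
        assert_eq!(outlier_ts, 30);
        assert_eq!(outliers, HashSet::from([3, 4]));
    }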
@@ -841,6 +975,7 @@ impl, E: KvEngine> Endpoint { if min_mem_lock_ts < min_ts { min_ts = min_mem_lock_ts; } + min_ts_min_lock = min_mem_lock_ts; } match scheduler.schedule(Task::RegisterMinTsEvent) { @@ -883,6 +1018,13 @@ impl, E: KvEngine> Endpoint { Err(err) => panic!("failed to schedule min ts event, error: {:?}", err), } } + let lag_millis = min_ts_pd.physical().saturating_sub(min_ts.physical()); + if Duration::from_millis(lag_millis) > WARN_RESOLVED_TS_LAG_THRESHOLD { + // TODO: Suppress repeat logs by using WARN_RESOLVED_TS_COUNT_THRESHOLD. + info!("cdc min_ts lag too large"; + "min_ts" => min_ts, "min_ts_pd" => min_ts_pd, + "min_ts_min_lock" => min_ts_min_lock); + } }; self.tso_worker.spawn(fut); } @@ -1356,6 +1498,10 @@ impl, E: KvEngine> Runnable for Endpoint { impl, E: KvEngine> RunnableWithTimer for Endpoint { fn on_timeout(&mut self) { + // Reclaim resolved_region_heap memory. + self.resolved_region_heap + .reset_and_shrink_to(self.capture_regions.len()); + CDC_CAPTURED_REGION_COUNT.set(self.capture_regions.len() as i64); CDC_REGION_RESOLVE_STATUS_GAUGE_VEC .with_label_values(&["unresolved"]) @@ -2506,4 +2652,65 @@ mod tests { event ); } + + #[test] + fn test_resolved_region_heap() { + let mut heap = ResolvedRegionHeap { + heap: BinaryHeap::new(), + }; + heap.push(5, 5.into()); + heap.push(4, 4.into()); + heap.push(6, 6.into()); + heap.push(3, 3.into()); + + let (ts, regions) = heap.pop(0); + assert_eq!(ts, TimeStamp::max()); + assert!(regions.is_empty()); + + let (ts, regions) = heap.pop(2); + assert_eq!(ts, 3.into()); + assert_eq!(regions.len(), 2); + assert!(regions.contains(&3)); + assert!(regions.contains(&4)); + + // Pop outliers more then it has. + let (ts, regions) = heap.pop(3); + assert_eq!(ts, 5.into()); + assert_eq!(regions.len(), 2); + assert!(regions.contains(&5)); + assert!(regions.contains(&6)); + + // Empty regions + let (ts, regions) = heap.to_hash_set(); + assert_eq!(ts, TimeStamp::max()); + assert!(regions.is_empty()); + + let mut heap1 = ResolvedRegionHeap { + heap: BinaryHeap::new(), + }; + heap1.push(5, 5.into()); + heap1.push(4, 4.into()); + heap1.push(6, 6.into()); + heap1.push(3, 3.into()); + + let (ts, regions) = heap1.pop(1); + assert_eq!(ts, 3.into()); + assert_eq!(regions.len(), 1); + assert!(regions.contains(&3)); + + let (ts, regions) = heap1.to_hash_set(); + assert_eq!(ts, 4.into()); + assert_eq!(regions.len(), 3); + assert!(regions.contains(&4)); + assert!(regions.contains(&5)); + assert!(regions.contains(&6)); + + heap1.reset_and_shrink_to(3); + assert_eq!(3, heap1.heap.capacity()); + assert!(heap1.heap.is_empty()); + + heap1.push(1, 1.into()); + heap1.clear(); + assert!(heap1.heap.is_empty()); + } } diff --git a/components/resolved_ts/src/advance.rs b/components/resolved_ts/src/advance.rs index 86722603802..34a2042a751 100644 --- a/components/resolved_ts/src/advance.rs +++ b/components/resolved_ts/src/advance.rs @@ -1,5 +1,7 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. 
+use std::ffi::CString; +use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::{Arc, Mutex as StdMutex}; use std::time::Duration; @@ -18,15 +20,17 @@ use protobuf::Message; use raftstore::store::fsm::StoreMeta; use raftstore::store::util::RegionReadProgressRegistry; use security::SecurityManager; +use tikv_util::time::Instant; use tikv_util::timer::SteadyTimer; use tikv_util::worker::Scheduler; +use tikv_util::{error, info}; use tokio::runtime::{Builder, Runtime}; use tokio::sync::Mutex; use txn_types::TimeStamp; use crate::endpoint::Task; -use crate::errors::Result; -use crate::metrics::{CHECK_LEADER_REQ_ITEM_COUNT_HISTOGRAM, CHECK_LEADER_REQ_SIZE_HISTOGRAM}; +use crate::errors::{Error, Result}; +use crate::metrics::*; const DEFAULT_CHECK_LEADER_TIMEOUT_MILLISECONDS: u64 = 5_000; // 5s @@ -203,7 +207,7 @@ pub async fn region_resolved_ts_store( let store_count = store_map.len(); let mut stores: Vec<_> = store_map .into_iter() - .map(|(store_id, regions)| { + .map(|(to_store, regions)| { let tikv_clients = tikv_clients.clone(); let env = env.clone(); let pd_client = pd_client.clone(); @@ -212,25 +216,65 @@ pub async fn region_resolved_ts_store( CHECK_LEADER_REQ_SIZE_HISTOGRAM.observe((leader_info_size * region_num) as f64); CHECK_LEADER_REQ_ITEM_COUNT_HISTOGRAM.observe(region_num as f64); async move { - let client = box_try!( - get_tikv_client(store_id, pd_client, security_mgr, env, tikv_clients.clone()) - .await - ); + let client = + get_tikv_client(to_store, pd_client, security_mgr, env, tikv_clients.clone()) + .await; + let client = match client { + Ok(client) => client, + Err(err) => { + error!("check leader failed"; + "error" => ?err, + "store_id" => store_id, "to_store" => to_store); + tikv_clients.lock().await.remove(&to_store); + return Err(Error::Other(box_err!(err))); + } + }; let mut req = CheckLeaderRequest::default(); req.set_regions(regions.into()); req.set_ts(min_ts.into_inner()); - let res = box_try!( - tokio::time::timeout( - Duration::from_millis(DEFAULT_CHECK_LEADER_TIMEOUT_MILLISECONDS), - box_try!(client.check_leader_async(&req)) - ) - .await - ); + let start = Instant::now_coarse(); + defer!({ + let elapsed = start.saturating_elapsed(); + slow_log!( + elapsed, + "check leader rpc costs too long, store_id: {}, to_store: {}", + store_id, + to_store + ); + RTS_CHECK_LEADER_DURATION_HISTOGRAM_VEC + .with_label_values(&["rpc"]) + .observe(elapsed.as_secs_f64()); + }); + let rpc = match client.check_leader_async(&req) { + Ok(rpc) => rpc, + Err(err) => { + error!("check leader failed"; + "error" => ?err, + "store_id" => store_id, "to_store" => to_store); + tikv_clients.lock().await.remove(&to_store); + return Err(Error::Other(box_err!(err))); + } + }; + let timeout = Duration::from_millis(DEFAULT_CHECK_LEADER_TIMEOUT_MILLISECONDS); + let res_timout = tokio::time::timeout(timeout, rpc).await; + let res = match res_timout { + Ok(res) => res, + Err(err) => { + error!("check leader failed"; + "error" => ?err, + "store_id" => store_id, "to_store" => to_store); + tikv_clients.lock().await.remove(&to_store); + return Err(Error::Other(box_err!(err))); + } + }; let resp = match res { Ok(resp) => resp, Err(err) => { - tikv_clients.lock().await.remove(&store_id); - return Err(box_err!(err)); + error!("check leader failed"; + "error" => ?err, + "store_id" => store_id, "to_store" => to_store); + tikv_clients.lock().await.remove(&to_store); + return Err(Error::Other(box_err!(err))); } }; Result::Ok((store_id, resp)) @@ -317,6 +361,8 @@ fn find_store_id(peer_list: &[Peer], peer_id: 
u64) -> Option { None } +static CONN_ID: AtomicI32 = AtomicI32::new(0); + async fn get_tikv_client( store_id: u64, pd_client: Arc, @@ -329,7 +375,11 @@ async fn get_tikv_client( Some(client) => client.clone(), None => { let store = box_try!(pd_client.get_store_async(store_id).await); - let cb = ChannelBuilder::new(env.clone()); + // hack: so it's different args, grpc will always create a new connection. + let cb = ChannelBuilder::new(env.clone()).raw_cfg_int( + CString::new("random id").unwrap(), + CONN_ID.fetch_add(1, Ordering::SeqCst), + ); let channel = security_mgr.connect(cb, &store.address); let client = TikvClient::new(channel); clients.insert(store_id, client.clone()); diff --git a/components/resolved_ts/src/metrics.rs b/components/resolved_ts/src/metrics.rs index 8a23c8537f1..04f9e33bf91 100644 --- a/components/resolved_ts/src/metrics.rs +++ b/components/resolved_ts/src/metrics.rs @@ -70,4 +70,11 @@ lazy_static! { &["type"] ) .unwrap(); + pub static ref RTS_CHECK_LEADER_DURATION_HISTOGRAM_VEC: HistogramVec = register_histogram_vec!( + "tikv_resolved_ts_check_leader_duration_seconds", + "Bucketed histogram of resolved-ts check leader duration", + &["type"], + exponential_buckets(0.005, 2.0, 20).unwrap(), + ) + .unwrap(); } diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index 4bfbadb90d3..68b38ce7fbc 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -1072,6 +1072,7 @@ impl + 'static, E: Engine, L: LockManager> Tikv for mut request: CheckLeaderRequest, sink: UnarySink, ) { + let addr = ctx.peer(); let ts = request.get_ts(); let leaders = request.take_regions().into(); let (cb, resp) = paired_future_callback(); @@ -1087,8 +1088,10 @@ impl + 'static, E: Engine, L: LockManager> Tikv for sink.success(resp).await?; ServerResult::Ok(()) } - .map_err(|e| { - warn!("call CheckLeader failed"; "err" => ?e); + .map_err(move |e| { + // CheckLeader only needs quorum responses, remote may drops + // requests early. + info!("call CheckLeader failed"; "err" => ?e, "address" => addr); }) .map(|_| ());
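Note: for reference, a minimal standalone sketch of the error-handling pattern introduced in advance.rs above: each check-leader RPC is wrapped in a timeout, and any RPC error or timeout evicts the cached client for that store so the next round builds a fresh connection. This assumes tokio with its full feature set; FakeClient, check_one_store, and CHECK_LEADER_TIMEOUT are illustrative stand-ins, not the resolved_ts API.

    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use tokio::sync::Mutex;
    use tokio::time::{sleep, timeout};

    // Mirrors DEFAULT_CHECK_LEADER_TIMEOUT_MILLISECONDS in the patch.
    const CHECK_LEADER_TIMEOUT: Duration = Duration::from_millis(5_000);

    // Stand-in for a cached client to one store.
    #[derive(Clone)]
    struct FakeClient {
        store_id: u64,
    }

    impl FakeClient {
        // Stand-in for the check-leader RPC; `delay` simulates its latency.
        async fn check_leader(&self, delay: Duration) -> Result<u64, String> {
            sleep(delay).await;
            Ok(self.store_id)
        }
    }

    // Ask one store to confirm leadership; on error or timeout, evict the
    // cached client so a stale connection is not reused next round.
    async fn check_one_store(
        clients: Arc<Mutex<HashMap<u64, FakeClient>>>,
        to_store: u64,
        delay: Duration,
    ) -> Result<u64, String> {
        let client = clients
            .lock()
            .await
            .entry(to_store)
            .or_insert_with(|| FakeClient { store_id: to_store })
            .clone();
        match timeout(CHECK_LEADER_TIMEOUT, client.check_leader(delay)).await {
            Ok(Ok(resp)) => Ok(resp),
            Ok(Err(err)) => {
                clients.lock().await.remove(&to_store);
                Err(format!("check leader rpc failed: {}", err))
            }
            Err(_elapsed) => {
                clients.lock().await.remove(&to_store);
                Err("check leader timed out".to_owned())
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let clients = Arc::new(Mutex::new(HashMap::new()));
        // A fast store answers in time; a slow store hits the timeout and its
        // cached client is dropped.
        let fast = check_one_store(clients.clone(), 1, Duration::from_millis(10)).await;
        let slow = check_one_store(clients.clone(), 2, Duration::from_secs(6)).await;
        println!("store 1: {:?}, store 2: {:?}", fast, slow);
        assert!(clients.lock().await.contains_key(&1));
        assert!(!clients.lock().await.contains_key(&2));
    }

In the patch itself, eviction alone is not enough to guarantee a fresh connection, which is why get_tikv_client additionally passes a unique raw_cfg_int value to ChannelBuilder: distinct channel arguments keep gRPC from silently reusing the old connection.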