diff --git a/components/cdc/src/endpoint.rs b/components/cdc/src/endpoint.rs index 425f7d0c5bf..d52f511b0a1 100644 --- a/components/cdc/src/endpoint.rs +++ b/components/cdc/src/endpoint.rs @@ -1,5 +1,8 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. +use std::cmp::Reverse; +use std::cmp::{Ord, Ordering as CmpOrdering, PartialOrd}; +use std::collections::BinaryHeap; use std::fmt; use std::sync::{Arc, Mutex as StdMutex}; use std::time::Duration; @@ -60,6 +63,12 @@ use crate::service::{Conn, ConnID, FeatureGate}; use crate::{CdcObserver, Error, Result}; const FEATURE_RESOLVED_TS_STORE: Feature = Feature::require(5, 0, 0); +const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s +// 10 minutes, it's the default gc life time of TiDB +// and is long enough for most transactions. +const WARN_RESOLVED_TS_LAG_THRESHOLD: Duration = Duration::from_secs(600); +// Suppress repeat resolved ts lag warnning. +const WARN_RESOLVED_TS_COUNT_THRESHOLD: usize = 10; pub enum Deregister { Downstream { @@ -224,7 +233,75 @@ impl fmt::Debug for Task { } } -const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s +#[derive(PartialEq, Eq)] +struct ResolvedRegion { + region_id: u64, + resolved_ts: TimeStamp, +} + +impl PartialOrd for ResolvedRegion { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ResolvedRegion { + fn cmp(&self, other: &Self) -> CmpOrdering { + self.resolved_ts.cmp(&other.resolved_ts) + } +} + +struct ResolvedRegionHeap { + // BinaryHeap is max heap, so we reverse order to get a min heap. + heap: BinaryHeap>, +} + +impl ResolvedRegionHeap { + fn push(&mut self, region_id: u64, resolved_ts: TimeStamp) { + self.heap.push(Reverse(ResolvedRegion { + region_id, + resolved_ts, + })) + } + + // Pop slow regions and the minimum resolved ts among them. + fn pop(&mut self, count: usize) -> (TimeStamp, HashSet) { + let mut min_resolved_ts = TimeStamp::max(); + let mut outliers = HashSet::with_capacity_and_hasher(count, Default::default()); + for _ in 0..count { + if let Some(resolved_region) = self.heap.pop() { + outliers.insert(resolved_region.0.region_id); + if min_resolved_ts > resolved_region.0.resolved_ts { + min_resolved_ts = resolved_region.0.resolved_ts; + } + } else { + break; + } + } + (min_resolved_ts, outliers) + } + + fn to_hash_set(&self) -> (TimeStamp, HashSet) { + let mut min_resolved_ts = TimeStamp::max(); + let mut regions = HashSet::with_capacity_and_hasher(self.heap.len(), Default::default()); + for resolved_region in &self.heap { + regions.insert(resolved_region.0.region_id); + if min_resolved_ts > resolved_region.0.resolved_ts { + min_resolved_ts = resolved_region.0.resolved_ts; + } + } + (min_resolved_ts, regions) + } + + fn clear(&mut self) { + self.heap.clear(); + } + + fn reset_and_shrink_to(&mut self, min_capacity: usize) { + self.clear(); + self.heap.shrink_to(min_capacity); + } +} pub struct Endpoint { cluster_id: u64, @@ -245,28 +322,31 @@ pub struct Endpoint { concurrency_manager: ConcurrencyManager, config: CdcConfig, + + // Incremental scan workers: Runtime, scan_concurrency_semaphore: Arc, - scan_speed_limiter: Limiter, max_scan_batch_bytes: usize, max_scan_batch_size: usize, + sink_memory_quota: MemoryQuota, - min_resolved_ts: TimeStamp, - min_ts_region_id: u64, old_value_cache: OldValueCache, + resolved_region_heap: ResolvedRegionHeap, - // stats - resolved_region_count: usize, - unresolved_region_count: usize, - - sink_memory_quota: MemoryQuota, - + // Check leader // store_id -> client tikv_clients: Arc>>, env: Arc, security_mgr: Arc, region_read_progress: RegionReadProgressRegistry, + + // Metrics and logging. + min_resolved_ts: TimeStamp, + min_ts_region_id: u64, + resolved_region_count: usize, + unresolved_region_count: usize, + warn_resolved_ts_repeat_count: usize, } impl, E: KvEngine> Endpoint { @@ -336,12 +416,17 @@ impl, E: KvEngine> Endpoint { concurrency_manager, min_resolved_ts: TimeStamp::max(), min_ts_region_id: 0, + resolved_region_heap: ResolvedRegionHeap { + heap: BinaryHeap::new(), + }, old_value_cache, resolved_region_count: 0, unresolved_region_count: 0, sink_memory_quota, tikv_clients: Arc::new(Mutex::new(HashMap::default())), region_read_progress, + // Log the first resolved ts warnning. + warn_resolved_ts_repeat_count: WARN_RESOLVED_TS_COUNT_THRESHOLD, }; ep.register_min_ts_event(); ep @@ -709,30 +794,77 @@ impl, E: KvEngine> Endpoint { } fn on_min_ts(&mut self, regions: Vec, min_ts: TimeStamp) { + // Reset resolved_regions to empty. + let resolved_regions = &mut self.resolved_region_heap; + resolved_regions.clear(); + let total_region_count = regions.len(); - // TODO: figure out how to avoid create a hashset every time, - // saving some CPU. - let mut resolved_regions = - HashSet::with_capacity_and_hasher(regions.len(), Default::default()); self.min_resolved_ts = TimeStamp::max(); + let mut advance_ok = 0; + let mut advance_failed_none = 0; + let mut advance_failed_same = 0; + let mut advance_failed_stale = 0; for region_id in regions { if let Some(delegate) = self.capture_regions.get_mut(®ion_id) { + let old_resolved_ts = delegate + .resolver + .as_ref() + .map_or(TimeStamp::zero(), |r| r.resolved_ts()); + if old_resolved_ts > min_ts { + advance_failed_stale += 1; + } if let Some(resolved_ts) = delegate.on_min_ts(min_ts) { if resolved_ts < self.min_resolved_ts { self.min_resolved_ts = resolved_ts; self.min_ts_region_id = region_id; } - resolved_regions.insert(region_id); + resolved_regions.push(region_id, resolved_ts); + + if resolved_ts == old_resolved_ts { + advance_failed_same += 1; + } else { + advance_ok += 1; + } + } else { + advance_failed_none += 1; } } } - self.resolved_region_count = resolved_regions.len(); + let lag_millis = min_ts + .physical() + .saturating_sub(self.min_resolved_ts.physical()); + if Duration::from_millis(lag_millis) > WARN_RESOLVED_TS_LAG_THRESHOLD { + self.warn_resolved_ts_repeat_count += 1; + if self.warn_resolved_ts_repeat_count >= WARN_RESOLVED_TS_COUNT_THRESHOLD { + self.warn_resolved_ts_repeat_count = 0; + warn!("cdc resolved ts lag too large"; + "min_resolved_ts" => self.min_resolved_ts, + "min_ts_region_id" => self.min_ts_region_id, + "min_ts" => min_ts, + "ok" => advance_ok, + "none" => advance_failed_none, + "stale" => advance_failed_stale, + "same" => advance_failed_same); + } + } + self.resolved_region_count = resolved_regions.heap.len(); self.unresolved_region_count = total_region_count - self.resolved_region_count; - self.broadcast_resolved_ts(resolved_regions); + + // Separate broadcasing outlier regions and normal regions, + // so 1) downstreams know where they should send resolve lock requests, + // and 2) resolved ts of normal regions does not fallback. + // + // Max number of outliers, in most cases, only a few regions are outliers. + // TODO: figure out how to avoid create hashset every time, saving some CPU. + let max_outlier_count = 32; + let (outlier_min_resolved_ts, outlier_regions) = resolved_regions.pop(max_outlier_count); + let (normal_min_resolved_ts, normal_regions) = resolved_regions.to_hash_set(); + self.broadcast_resolved_ts(outlier_min_resolved_ts, outlier_regions); + self.broadcast_resolved_ts(normal_min_resolved_ts, normal_regions); } - fn broadcast_resolved_ts(&self, regions: HashSet) { - let min_resolved_ts = self.min_resolved_ts.into_inner(); + fn broadcast_resolved_ts(&self, min_resolved_ts: TimeStamp, regions: HashSet) { + let min_resolved_ts = min_resolved_ts.into_inner(); let send_cdc_event = |regions: &HashSet, min_resolved_ts: u64, conn: &Conn| { let downstream_regions = conn.get_downstreams(); let mut resolved_ts = ResolvedTs::default(); @@ -847,7 +979,9 @@ impl, E: KvEngine> Endpoint { let fut = async move { let _ = timeout.compat().await; // Ignore get tso errors since we will retry every `min_ts_interval`. - let mut min_ts = pd_client.get_tso().await.unwrap_or_default(); + let min_ts_pd = pd_client.get_tso().await.unwrap_or_default(); + let mut min_ts = min_ts_pd; + let mut min_ts_min_lock = min_ts_pd; // Sync with concurrency manager so that it can work correctly when optimizations // like async commit is enabled. @@ -858,6 +992,7 @@ impl, E: KvEngine> Endpoint { if min_mem_lock_ts < min_ts { min_ts = min_mem_lock_ts; } + min_ts_min_lock = min_mem_lock_ts; } match scheduler.schedule(Task::RegisterMinTsEvent) { @@ -900,6 +1035,13 @@ impl, E: KvEngine> Endpoint { Err(err) => panic!("failed to schedule min ts event, error: {:?}", err), } } + let lag_millis = min_ts_pd.physical().saturating_sub(min_ts.physical()); + if Duration::from_millis(lag_millis) > WARN_RESOLVED_TS_LAG_THRESHOLD { + // TODO: Suppress repeat logs by using WARN_RESOLVED_TS_COUNT_THRESHOLD. + info!("cdc min_ts lag too large"; + "min_ts" => min_ts, "min_ts_pd" => min_ts_pd, + "min_ts_min_lock" => min_ts_min_lock); + } }; self.tso_worker.spawn(fut); } @@ -1454,6 +1596,10 @@ impl, E: KvEngine> Runnable for Endpoint { impl, E: KvEngine> RunnableWithTimer for Endpoint { fn on_timeout(&mut self) { + // Reclaim resolved_region_heap memory. + self.resolved_region_heap + .reset_and_shrink_to(self.capture_regions.len()); + CDC_CAPTURED_REGION_COUNT.set(self.capture_regions.len() as i64); CDC_REGION_RESOLVE_STATUS_GAUGE_VEC .with_label_values(&["unresolved"]) @@ -2723,4 +2869,65 @@ mod tests { event ); } + + #[test] + fn test_resolved_region_heap() { + let mut heap = ResolvedRegionHeap { + heap: BinaryHeap::new(), + }; + heap.push(5, 5.into()); + heap.push(4, 4.into()); + heap.push(6, 6.into()); + heap.push(3, 3.into()); + + let (ts, regions) = heap.pop(0); + assert_eq!(ts, TimeStamp::max()); + assert!(regions.is_empty()); + + let (ts, regions) = heap.pop(2); + assert_eq!(ts, 3.into()); + assert_eq!(regions.len(), 2); + assert!(regions.contains(&3)); + assert!(regions.contains(&4)); + + // Pop outliers more then it has. + let (ts, regions) = heap.pop(3); + assert_eq!(ts, 5.into()); + assert_eq!(regions.len(), 2); + assert!(regions.contains(&5)); + assert!(regions.contains(&6)); + + // Empty regions + let (ts, regions) = heap.to_hash_set(); + assert_eq!(ts, TimeStamp::max()); + assert!(regions.is_empty()); + + let mut heap1 = ResolvedRegionHeap { + heap: BinaryHeap::new(), + }; + heap1.push(5, 5.into()); + heap1.push(4, 4.into()); + heap1.push(6, 6.into()); + heap1.push(3, 3.into()); + + let (ts, regions) = heap1.pop(1); + assert_eq!(ts, 3.into()); + assert_eq!(regions.len(), 1); + assert!(regions.contains(&3)); + + let (ts, regions) = heap1.to_hash_set(); + assert_eq!(ts, 4.into()); + assert_eq!(regions.len(), 3); + assert!(regions.contains(&4)); + assert!(regions.contains(&5)); + assert!(regions.contains(&6)); + + heap1.reset_and_shrink_to(3); + assert_eq!(3, heap1.heap.capacity()); + assert!(heap1.heap.is_empty()); + + heap1.push(1, 1.into()); + heap1.clear(); + assert!(heap1.heap.is_empty()); + } } diff --git a/components/cdc/src/lib.rs b/components/cdc/src/lib.rs index e5cc19e0aa6..dc03da88c56 100644 --- a/components/cdc/src/lib.rs +++ b/components/cdc/src/lib.rs @@ -2,6 +2,7 @@ #![feature(box_patterns)] #![feature(assert_matches)] +#![feature(shrink_to)] mod channel; mod config; diff --git a/components/resolved_ts/src/advance.rs b/components/resolved_ts/src/advance.rs index 86722603802..55794a948d7 100644 --- a/components/resolved_ts/src/advance.rs +++ b/components/resolved_ts/src/advance.rs @@ -1,5 +1,7 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. +use std::ffi::CString; +use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::{Arc, Mutex as StdMutex}; use std::time::Duration; @@ -18,6 +20,7 @@ use protobuf::Message; use raftstore::store::fsm::StoreMeta; use raftstore::store::util::RegionReadProgressRegistry; use security::SecurityManager; +use tikv_util::info; use tikv_util::timer::SteadyTimer; use tikv_util::worker::Scheduler; use tokio::runtime::{Builder, Runtime}; @@ -26,7 +29,7 @@ use txn_types::TimeStamp; use crate::endpoint::Task; use crate::errors::Result; -use crate::metrics::{CHECK_LEADER_REQ_ITEM_COUNT_HISTOGRAM, CHECK_LEADER_REQ_SIZE_HISTOGRAM}; +use crate::metrics::*; const DEFAULT_CHECK_LEADER_TIMEOUT_MILLISECONDS: u64 = 5_000; // 5s @@ -317,6 +320,8 @@ fn find_store_id(peer_list: &[Peer], peer_id: u64) -> Option { None } +static CONN_ID: AtomicI32 = AtomicI32::new(0); + async fn get_tikv_client( store_id: u64, pd_client: Arc, @@ -329,7 +334,11 @@ async fn get_tikv_client( Some(client) => client.clone(), None => { let store = box_try!(pd_client.get_store_async(store_id).await); - let cb = ChannelBuilder::new(env.clone()); + // hack: so it's different args, grpc will always create a new connection. + let cb = ChannelBuilder::new(env.clone()).raw_cfg_int( + CString::new("random id").unwrap(), + CONN_ID.fetch_add(1, Ordering::SeqCst), + ); let channel = security_mgr.connect(cb, &store.address); let client = TikvClient::new(channel); clients.insert(store_id, client.clone()); diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index a3aa303c797..d9b01861e5d 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -1073,6 +1073,7 @@ impl + 'static, E: Engine, L: LockManager> Tikv for mut request: CheckLeaderRequest, sink: UnarySink, ) { + let addr = ctx.peer(); let ts = request.get_ts(); let leaders = request.take_regions().into(); let (cb, resp) = paired_future_callback(); @@ -1088,8 +1089,10 @@ impl + 'static, E: Engine, L: LockManager> Tikv for sink.success(resp).await?; ServerResult::Ok(()) } - .map_err(|e| { - warn!("call CheckLeader failed"; "err" => ?e); + .map_err(move |e| { + // CheckLeader only needs quorum responses, remote may drops + // requests early. + info!("call CheckLeader failed"; "err" => ?e, "address" => addr); }) .map(|_| ());