diff --git a/skaffold/manifests/config.yaml b/skaffold/manifests/config.yaml index 890a06f705e1c0..e34be3fa588e0c 100644 --- a/skaffold/manifests/config.yaml +++ b/skaffold/manifests/config.yaml @@ -4,8 +4,11 @@ metadata: name: vector-config data: vector.toml: | + [sources.internal_metrics] + type = "internal_metrics" + [sinks.stdout] type = "console" - inputs = ["kubernetes_logs"] + inputs = ["kubernetes_logs", "internal_metrics"] target = "stdout" encoding = "json" diff --git a/src/internal_events/kubernetes/api_watcher.rs b/src/internal_events/kubernetes/api_watcher.rs new file mode 100644 index 00000000000000..52269a4a876e30 --- /dev/null +++ b/src/internal_events/kubernetes/api_watcher.rs @@ -0,0 +1,24 @@ +use super::InternalEvent; +use std::fmt::Debug; + +#[derive(Debug)] +pub struct RequestPrepared { + pub request: R, +} + +impl InternalEvent for RequestPrepared { + fn emit_logs(&self) { + trace!(message = "request prepared", ?self.request); + } +} + +#[derive(Debug)] +pub struct ResponseReceived { + pub response: R, +} + +impl InternalEvent for ResponseReceived { + fn emit_logs(&self) { + trace!(message = "got response", ?self.response); + } +} diff --git a/src/internal_events/kubernetes/instrumenting_state.rs b/src/internal_events/kubernetes/instrumenting_state.rs new file mode 100644 index 00000000000000..8def5ecb78524f --- /dev/null +++ b/src/internal_events/kubernetes/instrumenting_state.rs @@ -0,0 +1,39 @@ +use super::InternalEvent; +use metrics::counter; +use std::fmt::Debug; + +#[derive(Debug)] +pub struct StateItemAdded; + +#[derive(Debug)] +pub struct StateItemUpdated; + +#[derive(Debug)] +pub struct StateItemDeleted; + +#[derive(Debug)] +pub struct StateResynced; + +impl InternalEvent for StateItemAdded { + fn emit_metrics(&self) { + counter!("k8s_state_ops", 1, "op_kind" => "item_added"); + } +} + +impl InternalEvent for StateItemUpdated { + fn emit_metrics(&self) { + counter!("k8s_state_ops", 1, "op_kind" => "item_updated"); + } +} + +impl 
InternalEvent for StateItemDeleted { + fn emit_metrics(&self) { + counter!("k8s_state_ops", 1, "op_kind" => "item_deleted"); + } +} + +impl InternalEvent for StateResynced { + fn emit_metrics(&self) { + counter!("k8s_state_ops", 1, "op_kind" => "resynced"); + } +} diff --git a/src/internal_events/kubernetes/instrumenting_watcher.rs b/src/internal_events/kubernetes/instrumenting_watcher.rs new file mode 100644 index 00000000000000..91b69c7b4ccedd --- /dev/null +++ b/src/internal_events/kubernetes/instrumenting_watcher.rs @@ -0,0 +1,43 @@ +use super::InternalEvent; +use metrics::counter; +use std::fmt::Debug; + +#[derive(Debug)] +pub struct WatchRequestInvoked; + +impl InternalEvent for WatchRequestInvoked { + fn emit_metrics(&self) { + counter!("k8s_watch_request_invoked", 1); + } +} + +#[derive(Debug)] +pub struct WatchRequestInvocationFailed { + pub error: E, +} + +impl InternalEvent for WatchRequestInvocationFailed { + fn emit_logs(&self) { + error!(message = "watch invocation failed", ?self.error); + } +} + +#[derive(Debug)] +pub struct WatchStreamItemObtained; + +impl InternalEvent for WatchStreamItemObtained { + fn emit_metrics(&self) { + counter!("k8s_watch_stream_items_obtained", 1); + } +} + +#[derive(Debug)] +pub struct WatchStreamErrored { + pub error: E, +} + +impl InternalEvent for WatchStreamErrored { + fn emit_logs(&self) { + error!(message = "watch stream errored", ?self.error); + } +} diff --git a/src/internal_events/kubernetes/mod.rs b/src/internal_events/kubernetes/mod.rs new file mode 100644 index 00000000000000..f5caf3340ecb90 --- /dev/null +++ b/src/internal_events/kubernetes/mod.rs @@ -0,0 +1,9 @@ +#![cfg(feature = "kubernetes")] + +use super::InternalEvent; + +pub mod api_watcher; +pub mod instrumenting_state; +pub mod instrumenting_watcher; +pub mod reflector; +pub mod stream; diff --git a/src/internal_events/kubernetes/reflector.rs b/src/internal_events/kubernetes/reflector.rs new file mode 100644 index 00000000000000..4090aab0a475f6 --- 
/dev/null +++ b/src/internal_events/kubernetes/reflector.rs @@ -0,0 +1,19 @@ +use super::InternalEvent; +use metrics::counter; + +/// Emitted when reflector gets a desync from the watch command. +#[derive(Debug)] +pub struct DesyncReceived { + /// The underlying error. + pub error: E, +} + +impl InternalEvent for DesyncReceived { + fn emit_logs(&self) { + warn!(message = "handling desync", error = ?self.error); + } + + fn emit_metrics(&self) { + counter!("k8s_reflector_desyncs", 1); + } +} diff --git a/src/internal_events/kubernetes/stream.rs b/src/internal_events/kubernetes/stream.rs new file mode 100644 index 00000000000000..964d553eabcb3e --- /dev/null +++ b/src/internal_events/kubernetes/stream.rs @@ -0,0 +1,14 @@ +use super::InternalEvent; +use metrics::counter; + +#[derive(Debug)] +pub struct ChunkProcessed { + pub byte_size: usize, +} + +impl InternalEvent for ChunkProcessed { + fn emit_metrics(&self) { + counter!("k8s_stream_chunks_processed", 1); + counter!("k8s_stream_bytes_processed", self.byte_size as u64); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 3f9de6d7fbb5da..e551cb6b2a52c7 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -18,6 +18,8 @@ mod udp; mod unix; mod vector; +pub mod kubernetes; + pub use self::add_fields::*; pub use self::aws_kinesis_streams::*; pub use self::blackhole::*; diff --git a/src/kubernetes/api_watcher.rs b/src/kubernetes/api_watcher.rs index 1dd400c8b1f5a0..172d151371eed2 100644 --- a/src/kubernetes/api_watcher.rs +++ b/src/kubernetes/api_watcher.rs @@ -6,6 +6,7 @@ use super::{ watch_request_builder::WatchRequestBuilder, watcher::{self, Watcher}, }; +use crate::internal_events::kubernetes::api_watcher as internal_events; use futures::{ future::BoxFuture, stream::{BoxStream, Stream}, @@ -59,7 +60,7 @@ where .request_builder .build(watch_optional) .context(invocation::RequestPreparation)?; - trace!(message = "request prepared", ?request); + 
emit!(internal_events::RequestPrepared { request: &request }); // Send request, get response. let response = self @@ -67,7 +68,9 @@ where .send(request) .await .context(invocation::Request)?; - trace!(message = "got response", ?response); + emit!(internal_events::ResponseReceived { + response: &response + }); // Handle response status code. let status = response.status(); diff --git a/src/kubernetes/instrumenting_watcher.rs b/src/kubernetes/instrumenting_watcher.rs new file mode 100644 index 00000000000000..7cfd1cba00e29f --- /dev/null +++ b/src/kubernetes/instrumenting_watcher.rs @@ -0,0 +1,67 @@ +//! A watcher that adds instrumentation. + +use super::watcher::{self, Watcher}; +use crate::internal_events::kubernetes::instrumenting_watcher as internal_events; +use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt}; +use k8s_openapi::{WatchOptional, WatchResponse}; + +/// A watcher that wraps another watcher with instrumentation calls. +pub struct InstrumentingWatcher +where + T: Watcher, +{ + inner: T, +} + +impl InstrumentingWatcher +where + T: Watcher, +{ + /// Create a new [`InstrumentingWatcher`]. 
+ pub fn new(inner: T) -> Self { + Self { inner } + } +} + +impl Watcher for InstrumentingWatcher +where + T: Watcher, + ::Stream: 'static, +{ + type Object = ::Object; + + type InvocationError = ::InvocationError; + + type StreamError = ::StreamError; + type Stream = BoxStream<'static, Result, Self::StreamError>>; + + fn watch<'a>( + &'a mut self, + watch_optional: WatchOptional<'a>, + ) -> BoxFuture<'a, Result>> + { + Box::pin(self.inner.watch(watch_optional).map(|result| { + result + .map(|stream| { + emit!(internal_events::WatchRequestInvoked); + Box::pin(stream.map(|item_result| { + item_result + .map(|item| { + emit!(internal_events::WatchStreamItemObtained); + item + }) + .map_err(|error| { + emit!(internal_events::WatchStreamErrored { + error: &error + }); + error + }) + })) as BoxStream<'static, _> + }) + .map_err(|error| { + emit!(internal_events::WatchRequestInvocationFailed { error: &error }); + error + }) + })) + } +} diff --git a/src/kubernetes/mod.rs b/src/kubernetes/mod.rs index addb4235f55a3e..3d1fb1399ae1f2 100644 --- a/src/kubernetes/mod.rs +++ b/src/kubernetes/mod.rs @@ -24,6 +24,7 @@ pub mod api_watcher; pub mod client; pub mod delayed_delete; pub mod hash_value; +pub mod instrumenting_watcher; pub mod mock_watcher; pub mod multi_response_decoder; pub mod reflector; diff --git a/src/kubernetes/reflector.rs b/src/kubernetes/reflector.rs index 76b5fcdf1dfe37..013d6388e91fa8 100644 --- a/src/kubernetes/reflector.rs +++ b/src/kubernetes/reflector.rs @@ -4,6 +4,7 @@ use super::{ resource_version, watcher::{self, Watcher}, }; +use crate::internal_events::kubernetes::reflector as internal_events; use futures::{ pin_mut, stream::{Stream, StreamExt}, @@ -90,7 +91,7 @@ where let stream = match invocation_result { Ok(val) => val, Err(watcher::invocation::Error::Desync { source }) => { - warn!(message = "handling desync", error = ?source); + emit!(internal_events::DesyncReceived { error: source }); // We got desynced, reset the state and retry
fetching. // By omiting the flush here, we cache the results from the // previous run until flush is issued when the new events @@ -278,6 +279,7 @@ mod tests { }; use crate::{ kubernetes::{ + instrumenting_watcher::InstrumentingWatcher, mock_watcher::{self, MockWatcher}, state, }, @@ -356,11 +358,13 @@ mod tests { let (state_events_tx, _state_events_rx) = mpsc::channel(0); let (_state_action_tx, state_action_rx) = mpsc::channel(0); let state_writer = state::mock::Writer::new(state_events_tx, state_action_rx); + let state_writer = state::instrumenting::Writer::new(state_writer); // Prepare watcher. let (watcher_events_tx, mut watcher_events_rx) = mpsc::channel(0); let (mut watcher_invocations_tx, watcher_invocations_rx) = mpsc::channel(0); let watcher = MockWatcher::::new(watcher_events_tx, watcher_invocations_rx); + let watcher = InstrumentingWatcher::new(watcher); // Prepare reflector. let mut reflector = Reflector::new( @@ -521,6 +525,7 @@ mod tests { let (watcher_events_tx, mut watcher_events_rx) = mpsc::channel(0); let (mut watcher_invocations_tx, watcher_invocations_rx) = mpsc::channel(0); let watcher = MockWatcher::::new(watcher_events_tx, watcher_invocations_rx); + let watcher = InstrumentingWatcher::new(watcher); // Prepare reflector. let mut reflector = Reflector::new( @@ -592,11 +597,13 @@ mod tests { let (state_events_tx, mut state_events_rx) = mpsc::channel(0); let (mut state_action_tx, state_action_rx) = mpsc::channel(0); let state_writer = state::mock::Writer::new(state_events_tx, state_action_rx); + let state_writer = state::instrumenting::Writer::new(state_writer); // Prepare watcher. let (watcher_events_tx, mut watcher_events_rx) = mpsc::channel(0); let (mut watcher_invocations_tx, watcher_invocations_rx) = mpsc::channel(0); let watcher = MockWatcher::::new(watcher_events_tx, watcher_invocations_rx); + let watcher = InstrumentingWatcher::new(watcher); // Prepare reflector. 
let deletion_delay = Duration::from_secs(600); @@ -745,11 +752,13 @@ mod tests { let (state_events_tx, _state_events_rx) = mpsc::channel(0); let (_state_action_tx, state_action_rx) = mpsc::channel(0); let state_writer = state::mock::Writer::new(state_events_tx, state_action_rx); + let state_writer = state::instrumenting::Writer::new(state_writer); // Prepare watcher. let (watcher_events_tx, mut watcher_events_rx) = mpsc::channel(0); let (mut watcher_invocations_tx, watcher_invocations_rx) = mpsc::channel(0); let watcher = MockWatcher::::new(watcher_events_tx, watcher_invocations_rx); + let watcher = InstrumentingWatcher::new(watcher); // Prepare reflector. let mut reflector = Reflector::new( @@ -821,6 +830,7 @@ mod tests { // Prepare state. let (state_reader, state_writer) = evmap10::new(); let state_writer = Writer::new(state_writer); + let state_writer = state::instrumenting::Writer::new(state_writer); let resulting_state_reader = state_reader.clone(); // Prepare watcher. @@ -828,6 +838,7 @@ mod tests { let (mut watcher_invocations_tx, watcher_invocations_rx) = mpsc::channel(0); let watcher: MockWatcher = MockWatcher::new(watcher_events_tx, watcher_invocations_rx); + let watcher = InstrumentingWatcher::new(watcher); // Prepare reflector. let mut reflector = Reflector::new( diff --git a/src/kubernetes/state/instrumenting.rs b/src/kubernetes/state/instrumenting.rs new file mode 100644 index 00000000000000..1b4273767395dd --- /dev/null +++ b/src/kubernetes/state/instrumenting.rs @@ -0,0 +1,238 @@ +//! An instrumenting state wrapper. + +use crate::internal_events::kubernetes::instrumenting_state as internal_events; +use async_trait::async_trait; + +/// A [`super::Write`] implementation that wraps another [`super::Write`] and +/// adds instrumentation. +pub struct Writer { + inner: T, +} + +impl Writer { + /// Take a [`super::Write`] and return it wrapped with [`Self`].
+ pub fn new(inner: T) -> Self { + Self { inner } + } +} + +#[async_trait] +impl super::Write for Writer +where + T: super::Write + Send, +{ + type Item = ::Item; + + async fn add(&mut self, item: Self::Item) { + emit!(internal_events::StateItemAdded); + self.inner.add(item).await + } + + async fn update(&mut self, item: Self::Item) { + emit!(internal_events::StateItemUpdated); + self.inner.update(item).await + } + + async fn delete(&mut self, item: Self::Item) { + emit!(internal_events::StateItemDeleted); + self.inner.delete(item).await + } + + async fn resync(&mut self) { + emit!(internal_events::StateResynced); + self.inner.resync().await + } +} + +#[cfg(test)] +mod tests { + use super::super::{mock, Write}; + use super::*; + use crate::test_util; + use futures::{channel::mpsc, SinkExt, StreamExt}; + use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta}; + + fn prepare_test() -> ( + Writer>, + mpsc::Receiver>, + mpsc::Sender<()>, + ) { + let (events_tx, events_rx) = mpsc::channel(0); + let (actions_tx, actions_rx) = mpsc::channel(0); + let writer = mock::Writer::new(events_tx, actions_rx); + let writer = Writer::new(writer); + (writer, events_rx, actions_tx) + } + + fn make_pod() -> Pod { + Pod { + metadata: Some(ObjectMeta { + name: Some("pod_name".to_owned()), + uid: Some("pod_uid".to_owned()), + ..ObjectMeta::default() + }), + ..Pod::default() + } + } + + fn get_metric_value(op_kind: &'static str) -> Option { + let controller = crate::metrics::CONTROLLER.get().unwrap_or_else(|| { + crate::metrics::init().unwrap(); + crate::metrics::CONTROLLER + .get() + .expect("failed to init metric container") + }); + + let key = metrics_core::Key::from_name_and_labels( + "k8s_state_ops", + vec![metrics_core::Label::new("op_kind", op_kind)], + ); + controller + .snapshot() + .into_measurements() + .into_iter() + .find_map(|(candidate_key, measurement)| { + if candidate_key == key { + Some(measurement) + } else { + None + } + }) + } + + fn 
assert_counter_incremented( + before: Option, + after: Option, + ) { + let before = before.unwrap_or_else(|| metrics_runtime::Measurement::Counter(0)); + let after = after.expect("after value was None"); + + let (before, after) = match (before, after) { + ( + metrics_runtime::Measurement::Counter(before), + metrics_runtime::Measurement::Counter(after), + ) => (before, after), + _ => panic!("metrics kind mismatch"), + }; + + let difference = after - before; + + assert_eq!(difference, 1); + } + + #[test] + fn add() { + test_util::trace_init(); + test_util::block_on_std(async { + let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); + + let pod = make_pod(); + + let join = { + let pod = pod.clone(); + let before = get_metric_value("item_added"); + tokio::spawn(async move { + assert_eq!( + events_rx.next().await.unwrap().unwrap_op(), + (pod, mock::OpKind::Add) + ); + + // By now metrics should've updated. + let after = get_metric_value("item_added"); + assert_counter_incremented(before, after); + + actions_tx.send(()).await.unwrap(); + }) + }; + + writer.add(pod).await; + join.await.unwrap(); + }) + } + + #[test] + fn update() { + test_util::trace_init(); + test_util::block_on_std(async { + let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); + + let pod = make_pod(); + + let join = { + let pod = pod.clone(); + let before = get_metric_value("item_updated"); + tokio::spawn(async move { + assert_eq!( + events_rx.next().await.unwrap().unwrap_op(), + (pod, mock::OpKind::Update) + ); + + // By now metrics should've updated. 
+ let after = get_metric_value("item_updated"); + assert_counter_incremented(before, after); + + actions_tx.send(()).await.unwrap(); + }) + }; + + writer.update(pod).await; + join.await.unwrap(); + }) + } + + #[test] + fn delete() { + test_util::trace_init(); + test_util::block_on_std(async { + let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); + + let pod = make_pod(); + + let join = { + let pod = pod.clone(); + let before = get_metric_value("item_deleted"); + tokio::spawn(async move { + assert_eq!( + events_rx.next().await.unwrap().unwrap_op(), + (pod, mock::OpKind::Delete) + ); + + // By now metrics should've updated. + let after = get_metric_value("item_deleted"); + assert_counter_incremented(before, after); + + actions_tx.send(()).await.unwrap(); + }) + }; + + writer.delete(pod).await; + join.await.unwrap(); + }) + } + + #[test] + fn resync() { + test_util::trace_init(); + test_util::block_on_std(async { + let (mut writer, mut events_rx, mut actions_tx) = prepare_test(); + + let join = { + let before = get_metric_value("resynced"); + tokio::spawn(async move { + assert!(matches!( + events_rx.next().await.unwrap(), + mock::ScenarioEvent::Resync + )); + + let after = get_metric_value("resynced"); + assert_counter_incremented(before, after); + + actions_tx.send(()).await.unwrap(); + }) + }; + + writer.resync().await; + join.await.unwrap(); + }) + } +} diff --git a/src/kubernetes/state/mod.rs b/src/kubernetes/state/mod.rs index 2c713f422d39c8..05dc96ff1094fa 100644 --- a/src/kubernetes/state/mod.rs +++ b/src/kubernetes/state/mod.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use k8s_openapi::{apimachinery::pkg::apis::meta::v1::ObjectMeta, Metadata}; pub mod evmap; +pub mod instrumenting; pub mod mock; /// Provides the interface for write access to the cached state. 
diff --git a/src/kubernetes/stream.rs b/src/kubernetes/stream.rs index 5e5f020990d3d8..3e91de631c60fc 100644 --- a/src/kubernetes/stream.rs +++ b/src/kubernetes/stream.rs @@ -1,6 +1,7 @@ //! Work with HTTP bodies as streams of Kubernetes resources. use super::multi_response_decoder::MultiResponseDecoder; +use crate::internal_events::kubernetes::stream as internal_events; use async_stream::try_stream; use bytes05::Buf; use futures::pin_mut; @@ -27,6 +28,7 @@ where let mut buf = buf.context(Reading)?; let chunk = buf.to_bytes(); let responses = decoder.process_next_chunk(chunk.as_ref()); + emit!(internal_events::ChunkProcessed{ byte_size: chunk.len() }); for response in responses { let response = response.context(Parsing)?; yield response; diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index 843e8837b5b7ed..0381ae27219402 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -158,8 +158,10 @@ impl Source { let label_selector = "vector.dev/exclude!=true".to_owned(); let watcher = k8s::api_watcher::ApiWatcher::new(client, Pod::watch_pod_for_all_namespaces); + let watcher = k8s::instrumenting_watcher::InstrumentingWatcher::new(watcher); let (state_reader, state_writer) = evmap::new(); let state_writer = k8s::state::evmap::Writer::new(state_writer); + let state_writer = k8s::state::instrumenting::Writer::new(state_writer); let mut reflector = k8s::reflector::Reflector::new( watcher,