From e7dff8657c999ed380d5114313645b9771e68b94 Mon Sep 17 00:00:00 2001 From: Phil Ciampini Date: Thu, 12 Mar 2026 15:20:30 -0400 Subject: [PATCH 1/2] feat(k8s): add kubernetes metrics source port of mezmo agent module that allows for the scraping of kubernetes metrics-server api and the format created is specific to the format mezmo expects for the kubernetes enrichment feature ref: LOG-23416 --- Cargo.lock | 1 + Cargo.toml | 3 + .../kube_stats/cluster_stats.rs | 78 +++ .../kube_stats/container_stats.rs | 367 ++++++++++ .../kube_stats/controller_stats.rs | 49 ++ .../kube_stats/extended_pod_stats.rs | 27 + .../kube_stats/helpers.rs | 200 ++++++ .../kube_stats/mod.rs | 7 + .../kube_stats/node_stats.rs | 636 ++++++++++++++++++ .../kube_stats/pod_stats.rs | 248 +++++++ src/sources/mezmo_kubernetes_metrics/mod.rs | 609 +++++++++++++++++ src/sources/mod.rs | 2 + 12 files changed, 2227 insertions(+) create mode 100644 src/sources/mezmo_kubernetes_metrics/kube_stats/cluster_stats.rs create mode 100644 src/sources/mezmo_kubernetes_metrics/kube_stats/container_stats.rs create mode 100644 src/sources/mezmo_kubernetes_metrics/kube_stats/controller_stats.rs create mode 100644 src/sources/mezmo_kubernetes_metrics/kube_stats/extended_pod_stats.rs create mode 100644 src/sources/mezmo_kubernetes_metrics/kube_stats/helpers.rs create mode 100644 src/sources/mezmo_kubernetes_metrics/kube_stats/mod.rs create mode 100644 src/sources/mezmo_kubernetes_metrics/kube_stats/node_stats.rs create mode 100644 src/sources/mezmo_kubernetes_metrics/kube_stats/pod_stats.rs create mode 100644 src/sources/mezmo_kubernetes_metrics/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 1b18e690f..2e940a2aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13275,6 +13275,7 @@ checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" name = "vector" version = "0.52.0" dependencies = [ + "anyhow", "apache-avro 0.16.0", "approx", "arc-swap", diff --git a/Cargo.toml b/Cargo.toml index 
5eae41a8f..f44167115 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -211,6 +211,7 @@ mock_instant = { version = "0.6" } serial_test = { version = "3.2" } [dependencies] +anyhow.workspace = true cfg-if.workspace = true clap.workspace = true indoc.workspace = true @@ -680,6 +681,7 @@ sources-logs-mezmo = [ "sources-fluent", "sources-http_server", "sources-http_client", + "sources-mezmo_kubernetes_metrics", "sources-mezmo_pipeline_state_variable_change", "sources-mezmo_user_logs", "sources-kafka", @@ -734,6 +736,7 @@ sources-host_metrics = ["heim/cpu", "heim/host", "heim/memory", "heim/net"] sources-http_client = ["sources-utils-http-client"] sources-http_server = ["sources-utils-http", "sources-utils-http-headers", "sources-utils-http-query"] sources-internal_logs = [] +sources-mezmo_kubernetes_metrics = ["kubernetes"] sources-mezmo_pipeline_state_variable_change = [] sources-mezmo_user_logs = ["sources-internal_logs"] sources-internal_metrics = [] diff --git a/src/sources/mezmo_kubernetes_metrics/kube_stats/cluster_stats.rs b/src/sources/mezmo_kubernetes_metrics/kube_stats/cluster_stats.rs new file mode 100644 index 000000000..af6f9f0ab --- /dev/null +++ b/src/sources/mezmo_kubernetes_metrics/kube_stats/cluster_stats.rs @@ -0,0 +1,78 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct ClusterStats { + pub resource: String, + pub r#type: String, + pub containers_init: u32, + pub containers_ready: u32, + pub containers_running: u32, + pub containers_terminated: u32, + pub containers_total: u32, + pub containers_waiting: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub cpu_allocatable: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub cpu_capacity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub cpu_usage: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub memory_allocatable: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub memory_capacity: Option, + 
#[serde(skip_serializing_if = "Option::is_none")] + pub memory_usage: Option, + pub nodes_notready: u32, + pub nodes_ready: u32, + pub nodes_total: u32, + pub nodes_unschedulable: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub pods_allocatable: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub pods_capacity: Option, + pub pods_failed: u32, + pub pods_pending: u32, + pub pods_running: u32, + pub pods_succeeded: u32, + pub pods_unknown: u32, + pub pods_total: u32, +} + +impl Default for ClusterStats { + fn default() -> Self { + Self::new() + } +} + +impl ClusterStats { + pub fn new() -> ClusterStats { + ClusterStats { + containers_init: 0, + containers_ready: 0, + containers_running: 0, + containers_terminated: 0, + containers_total: 0, + containers_waiting: 0, + cpu_allocatable: None, + cpu_capacity: None, + cpu_usage: None, + memory_allocatable: None, + memory_capacity: None, + memory_usage: None, + nodes_notready: 0, + nodes_ready: 0, + nodes_total: 0, + nodes_unschedulable: 0, + pods_allocatable: None, + pods_capacity: None, + pods_failed: 0, + pods_pending: 0, + pods_running: 0, + pods_succeeded: 0, + pods_unknown: 0, + pods_total: 0, + resource: "cluster".to_string(), + r#type: "metric".to_string(), + } + } +} diff --git a/src/sources/mezmo_kubernetes_metrics/kube_stats/container_stats.rs b/src/sources/mezmo_kubernetes_metrics/kube_stats/container_stats.rs new file mode 100644 index 000000000..b62cd8ad9 --- /dev/null +++ b/src/sources/mezmo_kubernetes_metrics/kube_stats/container_stats.rs @@ -0,0 +1,367 @@ +use chrono::Utc; +use k8s_openapi::api::core::v1::{Container, ContainerState, ContainerStatus}; +use serde::{Deserialize, Serialize}; + +use super::helpers::{convert_cpu_usage_to_milli, convert_memory_usage_to_bytes}; + +#[derive(Serialize, Deserialize)] +pub struct ContainerStats { + pub container_age: i64, + #[serde(skip_serializing_if = "String::is_empty")] + pub container: String, + #[serde(skip_serializing_if = 
"Option::is_none")] + pub cpu_limit: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub cpu_request: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub cpu_usage: Option, + #[serde(skip_serializing_if = "String::is_empty")] + pub image_tag: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub image: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_finished: Option, + #[serde(skip_serializing_if = "String::is_empty")] + pub last_reason: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_started: Option, + #[serde(skip_serializing_if = "String::is_empty")] + pub last_state: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub memory_limit: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub memory_request: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub memory_usage: Option, + pub ready: bool, + pub restarts: i32, + pub started: i64, + #[serde(skip_serializing_if = "String::is_empty")] + pub state: String, +} + +impl ContainerStats { + pub fn new( + c: &Container, + c_status: &ContainerStatus, + c_state: &ContainerState, + raw_cpu_usage: &str, + raw_memory_usage: &str, + ) -> Self { + let container = c.name.clone(); + let memory_usage = convert_memory_usage_to_bytes(raw_memory_usage); + let cpu_usage = convert_cpu_usage_to_milli(raw_cpu_usage); + + let mut image = String::new(); + let mut image_tag = String::new(); + let mut last_state = String::new(); + let mut last_reason = String::new(); + + let mut cpu_limit = None; + let mut cpu_request = None; + let mut memory_limit = None; + let mut memory_request = None; + let mut last_started = None; + let mut last_finished = None; + + let mut container_age = 0i64; + let mut started = 0i64; + + let restarts = c_status.restart_count; + let ready = c_status.ready; + + if let Some(container_image) = c.image.clone() { + if let Some((name, tag)) = container_image.split_once(':') { + image = 
name.to_string(); + image_tag = tag.to_string(); + } else { + image = container_image; + } + } + + let state = if let Some(running) = c_state.running.as_ref() { + if let Some(started_at) = running.started_at.as_ref().map(|s| s.0) { + container_age = Utc::now() + .signed_duration_since(started_at) + .num_milliseconds(); + started = started_at.timestamp_millis(); + } + "Running".to_string() + } else if c_state.terminated.is_some() { + "Terminated".to_string() + } else { + "Waiting".to_string() + }; + + if let Some(last_status_state) = c_status.last_state.as_ref() { + if last_status_state.waiting.is_some() { + last_state = "Waiting".to_string(); + } + + if let Some(l) = last_status_state.running.as_ref() { + last_state = "Running".to_string(); + if let Some(s) = l.started_at.as_ref() { + last_started = Some(s.0.timestamp_millis()); + } + } + + if let Some(l) = last_status_state.terminated.as_ref() { + last_state = "Terminated".to_string(); + if let Some(s) = l.started_at.as_ref() { + last_started = Some(s.0.timestamp_millis()); + } + if let Some(f) = l.finished_at.as_ref() { + last_finished = Some(f.0.timestamp_millis()); + } + if let Some(r) = l.reason.as_ref() { + last_reason = r.clone(); + } + } + } + + if last_state == state || last_state.is_empty() { + last_state = String::new(); + last_reason = String::new(); + last_finished = None; + last_started = None; + } + + if let Some(resources) = c.resources.as_ref() { + if let Some(limits) = resources.limits.as_ref() { + cpu_limit = limits + .get("cpu") + .and_then(|q| convert_cpu_usage_to_milli(q.0.as_str())); + + memory_limit = limits + .get("memory") + .and_then(|q| convert_memory_usage_to_bytes(q.0.as_str())); + } + + if let Some(requests) = resources.requests.as_ref() { + cpu_request = requests + .get("cpu") + .and_then(|q| convert_cpu_usage_to_milli(q.0.as_str())); + + memory_request = requests + .get("memory") + .and_then(|q| convert_memory_usage_to_bytes(q.0.as_str())); + } + } + + ContainerStats { + 
container_age, + container, + cpu_limit, + cpu_request, + cpu_usage, + image_tag, + image, + last_finished, + last_reason, + last_started, + last_state, + memory_limit, + memory_request, + memory_usage, + ready, + restarts, + started, + state, + } + } +} + +#[cfg(test)] +mod tests { + use chrono::Utc; + use k8s_openapi::{ + api::core::v1::{ + ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, + ResourceRequirements, + }, + apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::Time}, + }; + use std::collections::BTreeMap; + + use super::*; + + #[tokio::test] + async fn test_create_running_container_stats() { + let resource = create_resource_default(); + let state = create_state("running".to_string()); + let status = create_status(None); + let container = create_container(resource); + + let result = ContainerStats::new(&container, &status, &state, "1", "1"); + + assert_eq!(result.image, "test-image".to_string()); + assert_eq!(result.image_tag, "1234:1234".to_string()); + assert_eq!(result.memory_usage.unwrap(), 1); + assert_eq!(result.cpu_usage.unwrap(), 1000); + assert_eq!(result.state, "Running".to_string()); + assert_eq!(result.cpu_limit.unwrap(), 123000); + assert_eq!(result.cpu_request.unwrap(), 123000); + assert_eq!(result.memory_limit.unwrap(), 123); + assert_eq!(result.memory_request.unwrap(), 123); + assert_eq!(result.restarts, 0); + assert!(result.ready); + assert_eq!(result.last_finished, None); + assert_eq!(result.last_started, None); + assert_eq!(result.last_reason, String::from("")); + assert_eq!(result.last_state, String::from("")); + assert_eq!(result.state, "Running".to_string()); + } + + #[tokio::test] + async fn test_create_running_prev_waiting_container_stats() { + let resource = create_resource_default(); + let state = create_state("running".to_string()); + let prev_state = create_state("waiting".to_string()); + let status = create_status(Some(prev_state)); + let container = create_container(resource); + + let result = 
ContainerStats::new(&container, &status, &state, "1", "1"); + + assert_eq!(result.state, "Running".to_string()); + assert_eq!(result.last_state, "Waiting".to_string()); + assert_eq!(result.last_finished, None); + } + + #[tokio::test] + async fn test_create_running_prev_terminated_container_stats() { + let resource = create_resource_default(); + let state = create_state("running".to_string()); + let prev_state = create_state("terminated".to_string()); + let status = create_status(Some(prev_state)); + let container = create_container(resource); + + let result = ContainerStats::new(&container, &status, &state, "1", "1"); + + assert_eq!(result.state, "Running".to_string()); + assert_eq!(result.last_state, "Terminated".to_string()); + assert!(result.last_finished.is_some()); + assert!(result.last_started.is_some()); + } + + #[tokio::test] + async fn test_bad_limits_bad_requests_container_stats() { + let resource = create_resource_bad(); + let state = create_state("running".to_string()); + let prev_state = create_state("terminated".to_string()); + let status = create_status(Some(prev_state)); + let container = create_container(resource); + + let result = ContainerStats::new(&container, &status, &state, "1", "1"); + + assert_eq!(result.cpu_limit, None); + assert_eq!(result.cpu_request, None); + assert_eq!(result.memory_limit, None); + assert_eq!(result.memory_request, None); + } + + fn create_state(state: String) -> ContainerState { + let mut running_state = None; + let mut terminated_state = None; + let mut waiting_state = None; + + if state.eq(&"running".to_string()) { + running_state = Some(ContainerStateRunning { + started_at: Some(Time(Utc::now())), + }) + } else if state.eq(&"terminated".to_string()) { + terminated_state = Some(ContainerStateTerminated { + container_id: None, + exit_code: 0, + finished_at: Some(Time(Utc::now())), + message: Some("message".to_string()), + reason: Some("reason".to_string()), + signal: None, + started_at: Some(Time(Utc::now())), + }) + 
} else if state.eq(&"waiting".to_string()) { + waiting_state = Some(ContainerStateWaiting { + message: Some("reason".to_string()), + reason: None, + }) + } + + ContainerState { + running: running_state, + terminated: terminated_state, + waiting: waiting_state, + } + } + + fn create_status(prev_state: Option) -> ContainerStatus { + ContainerStatus { + container_id: Some("container".to_string()), + image: "image".to_string(), + image_id: "image".to_string(), + last_state: prev_state, + name: "container_name".to_string(), + ready: true, + restart_count: 0, + started: None, + state: None, + } + } + + fn create_container(resource: ResourceRequirements) -> Container { + Container { + args: None, + command: None, + env: None, + env_from: None, + image: Some("test-image:1234:1234".to_string()), + image_pull_policy: Some("test-sometimes:1234:1234".to_string()), + lifecycle: None, + liveness_probe: None, + name: "container-name".to_string(), + ports: None, + readiness_probe: None, + resources: Some(resource), + security_context: None, + startup_probe: None, + stdin: None, + stdin_once: None, + termination_message_path: None, + termination_message_policy: None, + tty: None, + volume_devices: None, + volume_mounts: None, + working_dir: None, + } + } + + fn create_resource_default() -> ResourceRequirements { + let mut b_tree_limits: BTreeMap = BTreeMap::new(); + b_tree_limits.insert("cpu".to_string(), Quantity("123".to_string())); + b_tree_limits.insert("memory".to_string(), Quantity("123".to_string())); + + let mut b_tree_requests = BTreeMap::new(); + b_tree_requests.insert("cpu".to_string(), Quantity("123".to_string())); + b_tree_requests.insert("memory".to_string(), Quantity("123".to_string())); + + ResourceRequirements { + claims: None, + limits: Some(b_tree_limits), + requests: Some(b_tree_requests), + } + } + + fn create_resource_bad() -> ResourceRequirements { + let mut b_tree_limits: BTreeMap = BTreeMap::new(); + b_tree_limits.insert("cpu".to_string(), Quantity("not a 
limit".to_string())); + b_tree_limits.insert("memory".to_string(), Quantity("not a limit".to_string())); + + let mut b_tree_requests = BTreeMap::new(); + b_tree_requests.insert("cpu".to_string(), Quantity("not a limit".to_string())); + b_tree_requests.insert("memory".to_string(), Quantity("not a limit".to_string())); + + ResourceRequirements { + claims: None, + limits: Some(b_tree_limits), + requests: Some(b_tree_requests), + } + } +} diff --git a/src/sources/mezmo_kubernetes_metrics/kube_stats/controller_stats.rs b/src/sources/mezmo_kubernetes_metrics/kube_stats/controller_stats.rs new file mode 100644 index 000000000..d583d71eb --- /dev/null +++ b/src/sources/mezmo_kubernetes_metrics/kube_stats/controller_stats.rs @@ -0,0 +1,49 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct ControllerStats { + pub controller_containers_ready: u32, + pub controller_containers_total: u32, + pub controller_pods_ready: u32, + pub controller_pods_total: u32, +} + +impl Default for ControllerStats { + fn default() -> Self { + Self::new() + } +} + +impl ControllerStats { + pub const fn new() -> Self { + Self { + controller_containers_ready: 0, + controller_containers_total: 0, + controller_pods_ready: 0, + controller_pods_total: 0, + } + } + + pub const fn inc_containers_ready(&mut self) { + self.controller_containers_ready += 1; + } + + pub const fn inc_containers_total(&mut self) { + self.controller_containers_total += 1; + } + + pub const fn inc_pods_ready(&mut self) { + self.controller_pods_ready += 1; + } + + pub const fn inc_pods_total(&mut self) { + self.controller_pods_total += 1; + } + + pub const fn copy_stats(&mut self, value: &ControllerStats) { + self.controller_containers_ready = value.controller_containers_ready; + self.controller_containers_total = value.controller_containers_total; + self.controller_pods_ready = value.controller_pods_ready; + self.controller_pods_total = value.controller_pods_total; + } +} diff --git 
a/src/sources/mezmo_kubernetes_metrics/kube_stats/extended_pod_stats.rs b/src/sources/mezmo_kubernetes_metrics/kube_stats/extended_pod_stats.rs new file mode 100644 index 000000000..e058d891c --- /dev/null +++ b/src/sources/mezmo_kubernetes_metrics/kube_stats/extended_pod_stats.rs @@ -0,0 +1,27 @@ +use serde::{Deserialize, Serialize}; + +use super::{ + container_stats::ContainerStats, controller_stats::ControllerStats, pod_stats::PodStats, +}; + +#[derive(Serialize, Deserialize)] +pub struct ExtendedPodStats { + #[serde(flatten)] + pub pod_stats: PodStats, + + #[serde(flatten)] + pub container_stats: ContainerStats, + + #[serde(flatten)] + pub controller_stats: ControllerStats, +} + +impl ExtendedPodStats { + pub const fn new(p_s: PodStats, c_s: ContainerStats) -> Self { + Self { + pod_stats: p_s, + container_stats: c_s, + controller_stats: ControllerStats::new(), + } + } +} diff --git a/src/sources/mezmo_kubernetes_metrics/kube_stats/helpers.rs b/src/sources/mezmo_kubernetes_metrics/kube_stats/helpers.rs new file mode 100644 index 000000000..32ca7a5c9 --- /dev/null +++ b/src/sources/mezmo_kubernetes_metrics/kube_stats/helpers.rs @@ -0,0 +1,200 @@ +use tracing::error; + +pub fn convert_cpu_usage_to_milli(cpu: &str) -> Option { + if cpu.is_empty() { + return None; + } + + let value: String = cpu.chars().filter(|c| c.is_ascii_digit()).collect(); + let unit: String = cpu.chars().filter(|c| c.is_alphabetic()).collect(); + + if value.is_empty() { + return None; + } + + let parsed_value: f64 = value.parse().unwrap_or(0f64); + let mut denominator = 1000000.0; + + if parsed_value < 1.0 || unit.is_empty() { + return Some((parsed_value * 1000.0).ceil() as u32); + } + + match unit.as_str() { + "m" => { + denominator = 1.0; + } + "u" => { + denominator = 1000.0; + } + "n" => {} + &_ => { + error!("Unknown CPU unit"); + return None; + } + } + + Some((parsed_value / denominator).ceil() as u32) +} +pub fn convert_memory_usage_to_bytes(memory: &str) -> Option { + if 
memory.is_empty() { + return None; + } + + let value: String = memory.chars().filter(|c| c.is_ascii_digit()).collect(); + let mut unit: String = memory.chars().filter(|c| c.is_alphabetic()).collect(); + + unit = unit.to_lowercase(); + + if value.is_empty() { + return None; + } + + let parsed_value: u64 = value.parse().unwrap_or(0u64); + let mut multiplier: u64 = 1024; + + match unit.as_str() { + "" => { + multiplier = 1; + } + "ki" => {} + "mi" => { + multiplier = multiplier.pow(2); + } + "gi" => { + multiplier = multiplier.pow(3); + } + "ti" => { + multiplier = multiplier.pow(4); + } + "k" => { + multiplier = 1000; + } + "m" => { + multiplier = 1000000; + } + "g" => { + multiplier = 1000u64.pow(3); + } + &_ => {} + } + + Some(parsed_value * multiplier) +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[tokio::test] + async fn test_cpu_empty() { + let result = convert_cpu_usage_to_milli(""); + + assert_eq!(result, None); + } + + #[tokio::test] + async fn test_cpu_0() { + let result = convert_cpu_usage_to_milli("0u"); + + assert_eq!(result.unwrap(), 0); + } + + #[tokio::test] + async fn test_cpu_unit_empty() { + let result = convert_cpu_usage_to_milli("100"); + + assert_eq!(result.unwrap(), 100000); + } + + #[tokio::test] + async fn test_unknown_cpu_unit() { + let result = convert_cpu_usage_to_milli("100rrr"); + + assert_eq!(result, None); + } + + #[tokio::test] + async fn test_cpu_m() { + let result = convert_cpu_usage_to_milli("100m"); + + assert_eq!(result.unwrap(), 100); + } + + #[tokio::test] + async fn test_cpu_u() { + let result = convert_cpu_usage_to_milli("1000u"); + + assert_eq!(result.unwrap(), 1); + } + + #[tokio::test] + async fn test_less_than_1_converted_cpu_u() { + let result = convert_cpu_usage_to_milli("10u"); + + assert_eq!(result.unwrap(), 1); + } + + #[tokio::test] + async fn test_memory_empty() { + let result = convert_memory_usage_to_bytes(""); + + assert_eq!(result, None); + } + + #[tokio::test] + async fn test_memory_empty_but_unit() { 
+ let result = convert_memory_usage_to_bytes("ki"); + + assert_eq!(result, None); + } + + #[tokio::test] + async fn test_memory_no_unit() { + let result = convert_memory_usage_to_bytes("1000"); + + assert_eq!(result.unwrap(), 1000); + } + + #[tokio::test] + async fn test_memory_mi() { + let result = convert_memory_usage_to_bytes("1000mi"); + + assert_eq!(result.unwrap(), 1048576000); + } + + #[tokio::test] + async fn test_memory_gi() { + let result = convert_memory_usage_to_bytes("1000gi"); + + assert_eq!(result.unwrap(), 1073741824000); + } + + #[tokio::test] + async fn test_memory_ti() { + let result = convert_memory_usage_to_bytes("1000ti"); + + assert_eq!(result.unwrap(), 1099511627776000); + } + + #[tokio::test] + async fn test_memory_k() { + let result = convert_memory_usage_to_bytes("1000k"); + + assert_eq!(result.unwrap(), 1000000); + } + + #[tokio::test] + async fn test_memory_m() { + let result = convert_memory_usage_to_bytes("1000m"); + + assert_eq!(result.unwrap(), 1000000000); + } + + #[tokio::test] + async fn test_memory_g() { + let result = convert_memory_usage_to_bytes("1000g"); + + assert_eq!(result.unwrap(), 1000000000000); + } +} diff --git a/src/sources/mezmo_kubernetes_metrics/kube_stats/mod.rs b/src/sources/mezmo_kubernetes_metrics/kube_stats/mod.rs new file mode 100644 index 000000000..6d3f648ed --- /dev/null +++ b/src/sources/mezmo_kubernetes_metrics/kube_stats/mod.rs @@ -0,0 +1,7 @@ +pub mod cluster_stats; +pub mod container_stats; +pub mod controller_stats; +pub mod extended_pod_stats; +pub mod helpers; +pub mod node_stats; +pub mod pod_stats; diff --git a/src/sources/mezmo_kubernetes_metrics/kube_stats/node_stats.rs b/src/sources/mezmo_kubernetes_metrics/kube_stats/node_stats.rs new file mode 100644 index 000000000..779230be2 --- /dev/null +++ b/src/sources/mezmo_kubernetes_metrics/kube_stats/node_stats.rs @@ -0,0 +1,636 @@ +use chrono::Utc; +use k8s_openapi::api::core::v1::Node; +use serde::{Deserialize, Serialize}; + +use 
super::helpers::{convert_cpu_usage_to_milli, convert_memory_usage_to_bytes}; + +#[derive(Serialize, Deserialize)] +pub struct NodeStats { + pub resource: String, + pub r#type: String, + pub age: i64, + #[serde(skip_serializing_if = "String::is_empty")] + pub container_runtime_version: String, + pub containers_init: u32, + pub containers_ready: u32, + pub containers_running: u32, + pub containers_terminated: u32, + pub containers_total: u32, + pub containers_waiting: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub cpu_allocatable: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub cpu_capacity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub cpu_usage: Option, + pub created: i64, + #[serde(skip_serializing_if = "String::is_empty")] + pub ip_external: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub ip: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub kernel_version: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub kubelet_version: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub memory_allocatable: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub memory_capacity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub pods_allocatable: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub pods_capacity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub memory_usage: Option, + #[serde(skip_serializing_if = "String::is_empty")] + pub node: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub os_image: String, + pub pods_failed: u32, + pub pods_pending: u32, + pub pods_running: u32, + pub pods_succeeded: u32, + pub pods_total: u32, + pub pods_unknown: u32, + pub ready_heartbeat_age: i64, + pub ready_heartbeat_time: i64, + #[serde(skip_serializing_if = "String::is_empty")] + pub ready_message: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub ready_status: String, + 
pub ready_transition_age: i64, + pub ready_transition_time: i64, + #[serde(skip_serializing_if = "Option::is_none")] + pub ready: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub unschedulable: Option, +} + +impl NodeStats { + pub fn new( + n: &Node, + n_pods: &NodePodStats, + n_containers: &NodeContainerStats, + raw_cpu_usage: &str, + raw_memory_usage: &str, + ) -> Self { + let memory_usage = convert_memory_usage_to_bytes(raw_memory_usage); + let cpu_usage = convert_cpu_usage_to_milli(raw_cpu_usage); + + let mut container_runtime_version = String::new(); + let mut ip = String::new(); + let mut ip_external = String::new(); + let mut kernel_version = String::new(); + let mut kubelet_version = String::new(); + let mut os_image = String::new(); + let mut ready_message = String::new(); + let mut ready_status = String::new(); + + let mut cpu_allocatable: Option = None; + let mut cpu_capacity: Option = None; + let mut memory_allocatable: Option = None; + let mut memory_capacity: Option = None; + let mut pods_allocatable: Option = None; + let mut pods_capacity: Option = None; + + let mut age = 0i64; + let mut created = 0i64; + let mut ready_heartbeat_age = 0i64; + let mut ready_heartbeat_time = 0i64; + let mut ready_transition_age = 0i64; + let mut ready_transition_time = 0i64; + + let mut ready: Option = None; + let mut unschedulable: Option = None; + + let containers_init = n_containers.containers_init; + let containers_ready = n_containers.containers_ready; + let containers_running = n_containers.containers_running; + let containers_terminated = n_containers.containers_terminated; + let containers_total = n_containers.containers_total; + let containers_waiting = n_containers.containers_waiting; + + let pods_failed = n_pods.pods_failed; + let pods_pending = n_pods.pods_pending; + let pods_running = n_pods.pods_running; + let pods_succeeded = n_pods.pods_succeeded; + let pods_total = n_pods.pods_total; + let pods_unknown = n_pods.pods_unknown; + + if let 
Some(spec) = &n.spec { + unschedulable = spec.unschedulable; + } + + if let Some(status) = &n.status { + if let Some(info) = &status.node_info { + container_runtime_version.clone_from(&info.container_runtime_version); + kernel_version.clone_from(&info.kernel_version); + kubelet_version.clone_from(&info.kubelet_version); + os_image.clone_from(&info.os_image); + } + + if let Some(allocatable) = &status.allocatable { + cpu_allocatable = allocatable + .get("cpu") + .and_then(|q| convert_cpu_usage_to_milli(q.0.as_str())); + + memory_allocatable = allocatable + .get("memory") + .and_then(|q| convert_memory_usage_to_bytes(q.0.as_str())); + + pods_allocatable = allocatable.get("pods").and_then(|q| q.0.parse().ok()); + } + + if let Some(capacity) = &status.capacity { + cpu_capacity = capacity + .get("cpu") + .and_then(|q| convert_cpu_usage_to_milli(q.0.as_str())); + + memory_capacity = capacity + .get("memory") + .and_then(|q| convert_memory_usage_to_bytes(q.0.as_str())); + + pods_capacity = capacity.get("pods").and_then(|q| q.0.parse().ok()); + } + + if let Some(addresses) = &status.addresses { + for address in addresses { + match address.type_.to_lowercase().as_str() { + "internalip" => ip.clone_from(&address.address), + "externalip" => ip_external.clone_from(&address.address), + _ => {} + } + } + } + + if let Some(conditions) = &status.conditions { + for condition in conditions { + if condition.type_.to_lowercase() != "ready" { + continue; + } + + if let Some(heartbeat) = condition.last_heartbeat_time.clone() { + ready_heartbeat_age = Utc::now() + .signed_duration_since(heartbeat.0) + .num_milliseconds(); + ready_heartbeat_time = heartbeat.0.timestamp_millis(); + ready_message + .clone_from(condition.message.as_ref().unwrap_or(&String::new())); + ready_status.clone_from(&condition.status); + } + + if let Some(transition) = condition.last_transition_time.clone() { + ready_transition_age = Utc::now() + .signed_duration_since(transition.0) + .num_milliseconds(); + 
/// Per-node container counters, bucketed by container state.
#[derive(Debug, Default)]
pub struct NodeContainerStats {
    pub containers_waiting: u32,
    pub containers_total: u32,
    pub containers_terminated: u32,
    pub containers_running: u32,
    pub containers_ready: u32,
    pub containers_init: u32,
}

impl NodeContainerStats {
    /// Creates a record with every counter set to zero.
    pub const fn new() -> Self {
        Self {
            containers_waiting: 0,
            containers_total: 0,
            containers_terminated: 0,
            containers_running: 0,
            containers_ready: 0,
            containers_init: 0,
        }
    }

    /// Records one container.
    ///
    /// * `state` - matched case-insensitively against `waiting`, `terminated`
    ///   and `running`; any other value leaves the state counters and the
    ///   total untouched.
    /// * `ready` - only counted when the container is running.
    /// * `init` - counted regardless of `state`.
    pub fn inc(&mut self, state: &str, ready: bool, init: bool) {
        if init {
            self.containers_init += 1;
        }

        let normalized = state.to_lowercase();
        let state_counter = match normalized.as_str() {
            "waiting" => &mut self.containers_waiting,
            "terminated" => &mut self.containers_terminated,
            "running" => {
                if ready {
                    self.containers_ready += 1;
                }
                &mut self.containers_running
            }
            // Unrecognised states are not part of the total.
            _ => return,
        };

        *state_counter += 1;
        self.containers_total += 1;
    }
}

/// Per-node pod counters, bucketed by pod phase.
#[derive(Debug, Default)]
pub struct NodePodStats {
    pub pods_failed: u32,
    pub pods_pending: u32,
    pub pods_running: u32,
    pub pods_succeeded: u32,
    pub pods_unknown: u32,
    pub pods_total: u32,
}

impl NodePodStats {
    /// Creates a record with every counter set to zero.
    pub const fn new() -> Self {
        Self {
            pods_failed: 0,
            pods_pending: 0,
            pods_running: 0,
            pods_succeeded: 0,
            pods_unknown: 0,
            pods_total: 0,
        }
    }

    /// Records one pod. `phase` is matched case-insensitively against
    /// `failed`, `pending`, `running` and `succeeded`; anything else is
    /// counted as unknown. Every call increases the total.
    pub fn inc(&mut self, phase: &str) {
        let phase_counter = match phase.to_lowercase().as_str() {
            "failed" => &mut self.pods_failed,
            "pending" => &mut self.pods_pending,
            "running" => &mut self.pods_running,
            "succeeded" => &mut self.pods_succeeded,
            _ => &mut self.pods_unknown,
        };

        *phase_counter += 1;
        self.pods_total += 1;
    }
}
assert_eq!(result.containers_running, 0); + assert_eq!(result.containers_terminated, 0); + assert_eq!(result.containers_total, 0); + assert_eq!(result.containers_waiting, 0); + assert_eq!(result.cpu_allocatable.unwrap(), 123000); + assert_eq!(result.cpu_capacity.unwrap(), 123000); + assert_eq!(result.cpu_usage.unwrap(), 1000); + assert_eq!(result.memory_usage.unwrap(), 1); + assert_eq!(result.ip, "a2".to_string()); + assert_eq!(result.ip_external, "a1".to_string()); + assert_eq!(result.kernel_version, "kernel".to_string()); + assert_eq!(result.kubelet_version, "kubelet".to_string()); + assert_eq!(result.memory_allocatable.unwrap(), 123); + assert_eq!(result.memory_capacity.unwrap(), 123); + assert_eq!(result.os_image, "os_image".to_string()); + assert_eq!(result.pods_allocatable.unwrap(), 123); + assert_eq!(result.pods_capacity.unwrap(), 123); + assert_eq!(result.pods_failed, 0); + assert_eq!(result.pods_pending, 0); + assert_eq!(result.pods_running, 0); + assert_eq!(result.pods_succeeded, 0); + assert_eq!(result.pods_total, 0); + assert_eq!(result.pods_unknown, 0); + assert_eq!(result.ready, Some(true)); + assert_eq!(result.ready_status, "true".to_string()); + assert_eq!(result.ready_message, "message".to_string()); + } + + #[tokio::test] + async fn test_no_addresses() { + let allocatable = create_allocatable_default(); + let capacity = create_capacity_default(); + + let node_pod_stats = NodePodStats::new(); + let node_container_stats = NodeContainerStats::new(); + + let status = create_status(Some(capacity), Some(allocatable), false, true, true); + let node = create_node(status); + + let result = NodeStats::new(&node, &node_pod_stats, &node_container_stats, "1", "1"); + + assert_eq!(result.node, "name".to_string()); + assert_eq!(result.ip, "".to_string()); + assert_eq!(result.ip_external, "".to_string()); + } + + #[tokio::test] + async fn test_no_conditions() { + let allocatable = create_allocatable_default(); + let capacity = create_capacity_default(); + + let 
node_pod_stats = NodePodStats::new(); + let node_container_stats = NodeContainerStats::new(); + + let status = create_status(Some(capacity), Some(allocatable), true, false, true); + let node = create_node(status); + + let result = NodeStats::new(&node, &node_pod_stats, &node_container_stats, "1", "1"); + + assert_eq!(result.node, "name".to_string()); + assert_eq!(result.ready_status, "".to_string()); + } + + #[tokio::test] + async fn test_no_node_info() { + let allocatable = create_allocatable_default(); + let capacity = create_capacity_default(); + + let node_pod_stats = NodePodStats::new(); + let node_container_stats = NodeContainerStats::new(); + + let status = create_status(Some(capacity), Some(allocatable), true, true, false); + let node = create_node(status); + + let result = NodeStats::new(&node, &node_pod_stats, &node_container_stats, "1", "1"); + + assert_eq!(result.node, "name".to_string()); + assert_eq!(result.os_image, "".to_string()); + } + + #[tokio::test] + async fn test_bad_capacity() { + let allocatable = create_allocatable_default(); + let capacity = create_capacity_bad(); + + let node_pod_stats = NodePodStats::new(); + let node_container_stats = NodeContainerStats::new(); + + let status = create_status(Some(capacity), Some(allocatable), true, true, false); + let node = create_node(status); + + let result = NodeStats::new(&node, &node_pod_stats, &node_container_stats, "1", "1"); + + assert_eq!(result.cpu_capacity, None); + } + + #[tokio::test] + async fn test_bad_allocatable() { + let allocatable = create_allocatable_bad(); + let capacity = create_capacity_default(); + + let node_pod_stats = NodePodStats::new(); + let node_container_stats = NodeContainerStats::new(); + + let status = create_status(Some(capacity), Some(allocatable), true, true, false); + let node = create_node(status); + + let result = NodeStats::new(&node, &node_pod_stats, &node_container_stats, "1", "1"); + + assert_eq!(result.cpu_allocatable, None); + } + + fn 
create_node(status: Option) -> Node { + let spec = create_spec(); + + let meta = ObjectMeta { + annotations: None, + creation_timestamp: Some(Time(Utc::now())), + deletion_grace_period_seconds: None, + deletion_timestamp: None, + finalizers: None, + generate_name: None, + generation: None, + labels: None, + managed_fields: None, + name: Some("name".to_string()), + namespace: Some("namespace".to_string()), + owner_references: None, + resource_version: None, + self_link: None, + uid: None, + }; + + Node { + metadata: meta, + spec, + status, + } + } + + fn create_spec() -> Option { + Some(NodeSpec { + config_source: None, + external_id: None, + pod_cidr: None, + pod_cidrs: None, + provider_id: None, + taints: None, + unschedulable: Some(true), + }) + } + + fn create_status( + capacity: Option>, + allocatable: Option>, + populate_addresses: bool, + populate_conditions: bool, + populate_node_info: bool, + ) -> Option { + let mut address = None; + let mut conditions = None; + let mut node_info = None; + + if populate_addresses { + let address_vec = vec![ + NodeAddress { + address: "a1".to_string(), + type_: "externalip".to_string(), + }, + NodeAddress { + address: "a2".to_string(), + type_: "internalip".to_string(), + }, + ]; + address = Some(address_vec); + } + + if populate_conditions { + let conditions_vec = vec![NodeCondition { + last_heartbeat_time: Some(Time(Utc::now())), + last_transition_time: Some(Time(Utc::now())), + message: Some("message".to_string()), + reason: Some("reason".to_string()), + status: "true".to_string(), + type_: "ready".to_string(), + }]; + + conditions = Some(conditions_vec) + } + + if populate_node_info { + node_info = Some(NodeSystemInfo { + architecture: "arch".to_string(), + boot_id: "boot".to_string(), + container_runtime_version: "version".to_string(), + kernel_version: "kernel".to_string(), + kube_proxy_version: "proxy".to_string(), + kubelet_version: "kubelet".to_string(), + machine_id: "id".to_string(), + operating_system: 
"opsys".to_string(), + os_image: "os_image".to_string(), + system_uuid: "sysid".to_string(), + }); + } + + Some(NodeStatus { + addresses: address, + allocatable, + capacity, + conditions, + config: None, + daemon_endpoints: None, + images: None, + node_info, + phase: Some("phase".to_string()), + volumes_attached: None, + volumes_in_use: None, + }) + } + + fn create_allocatable_default() -> BTreeMap { + let mut allocatable: BTreeMap = BTreeMap::new(); + allocatable.insert("cpu".to_string(), Quantity("123".to_string())); + allocatable.insert("memory".to_string(), Quantity("123".to_string())); + allocatable.insert("pods".to_string(), Quantity("123".to_string())); + + allocatable + } + + fn create_allocatable_bad() -> BTreeMap { + let mut allocatable: BTreeMap = BTreeMap::new(); + allocatable.insert("cpu".to_string(), Quantity("ab".to_string())); + allocatable.insert("memory".to_string(), Quantity("ab".to_string())); + allocatable.insert("pods".to_string(), Quantity("ab".to_string())); + + allocatable + } + + fn create_capacity_default() -> BTreeMap { + let mut capacity: BTreeMap = BTreeMap::new(); + capacity.insert("cpu".to_string(), Quantity("123".to_string())); + capacity.insert("memory".to_string(), Quantity("123".to_string())); + capacity.insert("pods".to_string(), Quantity("123".to_string())); + + capacity + } + + fn create_capacity_bad() -> BTreeMap { + let mut capacity: BTreeMap = BTreeMap::new(); + capacity.insert("cpu".to_string(), Quantity("ab".to_string())); + capacity.insert("memory".to_string(), Quantity("ab".to_string())); + capacity.insert("pods".to_string(), Quantity("ab".to_string())); + + capacity + } +} diff --git a/src/sources/mezmo_kubernetes_metrics/kube_stats/pod_stats.rs b/src/sources/mezmo_kubernetes_metrics/kube_stats/pod_stats.rs new file mode 100644 index 000000000..d0e849b06 --- /dev/null +++ b/src/sources/mezmo_kubernetes_metrics/kube_stats/pod_stats.rs @@ -0,0 +1,248 @@ +use chrono::Utc; +use k8s_openapi::{api::core::v1::Pod, 
apimachinery::pkg::apis::meta::v1::OwnerReference}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone)] +pub struct PodStats { + pub resource: String, + pub r#type: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub controller: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub controller_type: String, + pub created: i64, + #[serde(skip_serializing_if = "String::is_empty")] + pub ip: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub namespace: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub node: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub phase: String, + pub pod_age: i64, + #[serde(skip_serializing_if = "String::is_empty")] + pub pod: String, + #[serde(skip_serializing_if = "String::is_empty")] + pub priority_class: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub priority: Option, + #[serde(skip_serializing_if = "String::is_empty")] + pub qos_class: String, +} + +impl From<&Pod> for PodStats { + fn from(p: &Pod) -> Self { + let (controller, controller_type) = get_controller_details(&p.metadata.owner_references); + + let namespace = p.metadata.namespace.clone().unwrap_or_default(); + let pod = p.metadata.name.clone().unwrap_or_default(); + + let mut priority_class = String::new(); + let mut node = String::new(); + let mut ip = String::new(); + let mut phase = String::new(); + let mut qos_class = String::new(); + let mut pod_age = 0i64; + let mut created = 0i64; + let mut priority = None; + + if let Some(spec) = &p.spec { + priority = spec.priority; + + if let Some(name) = &spec.priority_class_name { + priority_class.clone_from(name); + } + + if let Some(name) = &spec.node_name { + node.clone_from(name); + } + } + + if let Some(status) = &p.status { + if let Some(pod_created) = status.start_time.clone() { + pod_age = Utc::now() + .signed_duration_since(pod_created.0) + .num_milliseconds(); + created = 
pod_created.0.timestamp_millis(); + } + + if let Some(pod_ip) = &status.pod_ip { + ip.clone_from(pod_ip); + } + + if let Some(p) = &status.phase { + phase.clone_from(p); + } + + if let Some(qos) = &status.qos_class { + qos_class.clone_from(qos); + } + } + + PodStats { + controller, + controller_type, + created, + ip, + namespace, + node, + phase, + pod_age, + pod, + priority_class, + priority, + qos_class, + resource: "container".to_string(), + r#type: "metric".to_string(), + } + } +} + +fn get_controller_details(owners: &Option>) -> (String, String) { + if let Some(owners) = owners { + for owner in owners { + if owner.controller == Some(true) { + return (owner.name.clone(), owner.kind.clone()); + } + } + } + (String::new(), String::new()) +} + +#[cfg(test)] +mod tests { + use chrono::Utc; + use k8s_openapi::{ + api::core::v1::{Pod, PodSpec, PodStatus}, + apimachinery::pkg::apis::meta::v1::Time, + }; + use kube::api::ObjectMeta; + + use super::PodStats; + + #[tokio::test] + async fn test_create_pod() { + let spec = create_spec(); + let status = create_status(); + let pod = create_pod(Some(spec), Some(status)); + + let result = PodStats::from(&pod); + + assert_eq!(result.ip, "ip".to_string()); + assert_eq!(result.phase, "phase".to_string()); + assert_eq!(result.priority_class, "p_class".to_string()); + assert_eq!(result.node, "node_name".to_string()); + assert_eq!(result.qos_class, "class".to_string()); + assert_eq!(result.namespace, "namespace".to_string()); + assert_eq!(result.pod, "name".to_string()); + assert_eq!(result.priority.unwrap(), 222); + } + + #[tokio::test] + async fn test_create_no_spec() { + let status = create_status(); + let pod = create_pod(None, Some(status)); + + let result = PodStats::from(&pod); + + assert_eq!(result.node, "".to_string()); + } + + #[tokio::test] + async fn test_create_no_status() { + let spec = create_spec(); + let pod = create_pod(Some(spec), None); + + let result = PodStats::from(&pod); + + assert_eq!(result.phase, 
"".to_string()); + } + + fn create_pod(spec: Option, status: Option) -> Pod { + let meta = ObjectMeta { + annotations: None, + creation_timestamp: Some(Time(Utc::now())), + deletion_grace_period_seconds: None, + deletion_timestamp: None, + finalizers: None, + generate_name: None, + generation: None, + labels: None, + managed_fields: None, + name: Some("name".to_string()), + namespace: Some("namespace".to_string()), + owner_references: None, + resource_version: None, + self_link: None, + uid: None, + }; + + Pod { + metadata: meta, + spec, + status, + } + } + + fn create_spec() -> PodSpec { + PodSpec { + active_deadline_seconds: None, + affinity: None, + automount_service_account_token: None, + containers: Vec::new(), + dns_config: None, + dns_policy: None, + enable_service_links: None, + ephemeral_containers: None, + host_aliases: None, + host_ipc: None, + host_network: None, + host_pid: None, + hostname: None, + image_pull_secrets: None, + init_containers: None, + node_name: Some("node_name".to_string()), + node_selector: None, + overhead: None, + preemption_policy: None, + priority: Some(222), + priority_class_name: Some("p_class".to_string()), + readiness_gates: None, + restart_policy: None, + runtime_class_name: None, + scheduler_name: None, + security_context: None, + service_account: None, + service_account_name: None, + share_process_namespace: None, + subdomain: None, + termination_grace_period_seconds: None, + tolerations: None, + topology_spread_constraints: None, + volumes: None, + set_hostname_as_fqdn: None, + host_users: None, + os: None, + resource_claims: None, + scheduling_gates: None, + } + } + + fn create_status() -> PodStatus { + PodStatus { + conditions: None, + container_statuses: None, + ephemeral_container_statuses: None, + host_ip: None, + init_container_statuses: None, + message: None, + nominated_node_name: None, + phase: Some("phase".to_string()), + pod_ip: Some("ip".to_string()), + pod_ips: None, + qos_class: Some("class".to_string()), + 
reason: None, + start_time: Some(Time(Utc::now())), + } + } +} diff --git a/src/sources/mezmo_kubernetes_metrics/mod.rs b/src/sources/mezmo_kubernetes_metrics/mod.rs new file mode 100644 index 000000000..4a8ccdf81 --- /dev/null +++ b/src/sources/mezmo_kubernetes_metrics/mod.rs @@ -0,0 +1,609 @@ +use std::collections::HashMap; +use std::time::Duration; + +use anyhow::Result; +use chrono::Utc; +use futures::StreamExt; +use k8s_openapi::api::core::v1::{Container, ContainerStatus, Node, Pod}; +use kube::{ + Client, + api::{Api, DynamicObject, GroupVersionKind, ListParams, ObjectList}, + discovery, +}; +use serde_json::Value; +use serde_with::serde_as; +use tokio::time; +use tokio_stream::wrappers::IntervalStream; +use tracing::{error, info, trace, warn}; +use vector_lib::{ + config::{LogNamespace, log_schema}, + configurable::configurable_component, + event::{Event, LogEvent}, + schema, +}; + +use crate::{ + SourceSender, + config::{DataType, SourceConfig, SourceContext, SourceOutput}, + shutdown::ShutdownSignal, + sources::util::http_client::warn_if_interval_too_low, +}; + +use self::kube_stats::{ + cluster_stats::ClusterStats, + container_stats::ContainerStats, + controller_stats::ControllerStats, + extended_pod_stats::ExtendedPodStats, + node_stats::{NodeContainerStats, NodePodStats, NodeStats}, + pod_stats::PodStats, +}; + +mod kube_stats; + +const SELF_NODE_NAME_ENV_KEY: &str = "VECTOR_SELF_NODE_NAME"; + +const fn default_scrape_interval() -> Duration { + Duration::from_secs(30) +} + +const fn default_scrape_timeout() -> Duration { + Duration::from_secs(10) +} + +fn default_self_node_name_env_template() -> String { + format!("${{{SELF_NODE_NAME_ENV_KEY}}}") +} + +/// Configuration for the `mezmo_kubernetes_metrics` source. 
+/// +/// Collects pod, node, and cluster-level metrics from the Kubernetes API and +/// the metrics-server (`metrics.k8s.io/v1beta1`), emitting one JSON log event +/// per container, per node, and one cluster-aggregate event on each collection +/// interval. +#[serde_as] +#[configurable_component(source( + "mezmo_kubernetes_metrics", + "Collect Kubernetes pod, node, and cluster metrics via the Kubernetes API." +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields, default)] +pub struct MezmoKubernetesMetricsConfig { + /// The interval between metric collection runs. + /// + /// Defaults to 30 seconds. + #[serde(default = "default_scrape_interval")] + #[serde_as(as = "serde_with::DurationSeconds")] + #[serde(rename = "scrape_interval_secs")] + pub interval: Duration, + + /// The timeout for each metric collection run. + /// + /// Defaults to 10 seconds. + #[serde(default = "default_scrape_timeout")] + #[serde_as(as = "serde_with::DurationSeconds")] + #[serde(rename = "scrape_timeout_secs")] + pub timeout: Duration, + + /// The name of the Kubernetes Node this pod is running on. + /// + /// Scopes metric collection to only the pods and node on which this + /// agent is running. 
In a DaemonSet deployment, inject this via the + /// Downward API: + /// + /// ```yaml + /// env: + /// - name: VECTOR_SELF_NODE_NAME + /// valueFrom: + /// fieldRef: + /// fieldPath: spec.nodeName + /// ``` + #[serde(default = "default_self_node_name_env_template")] + pub self_node_name: String, +} + +impl Default for MezmoKubernetesMetricsConfig { + fn default() -> Self { + Self { + interval: default_scrape_interval(), + timeout: default_scrape_timeout(), + self_node_name: default_self_node_name_env_template(), + } + } +} + +impl_generate_config_from_default!(MezmoKubernetesMetricsConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "mezmo_kubernetes_metrics")] +impl SourceConfig for MezmoKubernetesMetricsConfig { + async fn build(&self, cx: SourceContext) -> crate::Result { + let client = Client::try_default() + .await + .map_err(|e| format!("Failed to build Kubernetes client: {e}"))?; + + let node_name = if self.self_node_name.is_empty() + || self.self_node_name == default_self_node_name_env_template() + { + std::env::var(SELF_NODE_NAME_ENV_KEY).map_err(|_| { + format!( + "self_node_name config value or {SELF_NODE_NAME_ENV_KEY} env var is not set" + ) + })? 
+ } else { + self.self_node_name.clone() + }; + + warn_if_interval_too_low(self.timeout, self.interval); + + Ok(Box::pin(run( + client, + self.interval, + self.timeout, + node_name, + cx.out, + cx.shutdown, + ))) + } + + fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec { + vec![SourceOutput::new_maybe_logs( + DataType::Log, + schema::Definition::default_legacy_namespace(), + )] + } + + fn can_acknowledge(&self) -> bool { + false + } +} + +async fn run( + client: Client, + interval: Duration, + timeout: Duration, + node_name: String, + mut out: SourceSender, + shutdown: ShutdownSignal, +) -> Result<(), ()> { + info!(message = "Starting Kubernetes metrics collection.", %node_name); + + let mut ticker = IntervalStream::new(time::interval(interval)).take_until(shutdown); + + while ticker.next().await.is_some() { + match tokio::time::timeout(timeout, process_reporter_info(client.clone(), &node_name)).await + { + Ok(Ok((pods, nodes, cluster))) => { + let now = Utc::now(); + let events = pods + .into_iter() + .chain(nodes) + .chain(std::iter::once(cluster)) + .filter_map(|line| { + serde_json::from_str::(&line) + .ok() + .map(|v| { + let mut log = LogEvent::default(); + log.insert(log_schema().message_key_target_path().unwrap(), v); + LogNamespace::Legacy.insert_standard_vector_source_metadata( + &mut log, + MezmoKubernetesMetricsConfig::NAME, + now, + ); + Event::Log(log) + }) + }) + .collect::>(); + + if out.send_batch(events).await.is_err() { + error!( + message = + "Failed to send Kubernetes metrics; downstream may have shut down." 
+ ); + return Err(()); + } + } + Ok(Err(e)) => error!(message = "Failed to gather Kubernetes metrics.", error = %e), + Err(_) => warn!(message = "Kubernetes metrics collection timed out."), + } + } + + Ok(()) +} + +async fn process_reporter_info( + client: Client, + node_name: &str, +) -> Result<(Vec, Vec, String)> { + trace!("Generating Kubernetes metrics report."); + + let pods = get_all_pods(client.clone(), node_name).await?; + let nodes = get_all_nodes(client.clone(), node_name).await?; + let pod_metrics = call_metric_api("PodMetrics", client.clone(), None).await?; + let node_metrics = call_metric_api( + "NodeMetrics", + client.clone(), + Some(&format!("metadata.name={node_name}")), + ) + .await?; + + let mut controller_map: HashMap = HashMap::new(); + let mut node_pod_counts_map: HashMap = HashMap::new(); + let mut node_container_counts_map: HashMap = HashMap::new(); + let mut pod_usage_map: HashMap = HashMap::new(); + let mut node_usage_map: HashMap = HashMap::new(); + let mut extended_pod_stats: Vec = Vec::new(); + let mut node_stats: Vec = Vec::new(); + + build_pod_metric_map(pod_metrics, &mut pod_usage_map); + process_pods( + pods, + &mut controller_map, + pod_usage_map, + &mut extended_pod_stats, + &mut node_pod_counts_map, + &mut node_container_counts_map, + ); + let pods_strings = format_pod_str(extended_pod_stats, controller_map); + + build_node_metric_map(node_metrics, &mut node_usage_map); + process_nodes( + nodes, + node_usage_map, + &mut node_stats, + &mut node_pod_counts_map, + &mut node_container_counts_map, + ); + + let node_strings = format_node_str(&node_stats); + let cluster_stats = build_cluster_stats(&node_stats); + let cluster_stats_string = format_cluster_str(&cluster_stats); + + Ok((pods_strings, node_strings, cluster_stats_string)) +} + +fn build_pod_metric_map( + pod_metrics: ObjectList, + pod_usage_map: &mut HashMap, +) { + for pod_metric in pod_metrics { + let pod_name = pod_metric.metadata.name.as_deref().unwrap_or(""); + let 
namespace = pod_metric.metadata.namespace.as_deref().unwrap_or(""); + + if let Some(containers) = pod_metric.data["containers"].as_array() { + for container in containers { + if let Some(container_name) = container["name"].as_str() { + let key = format!("{namespace}/{pod_name}/{container_name}"); + pod_usage_map.insert(key, container["usage"].clone()); + } + } + } + } +} + +fn build_node_metric_map( + node_metrics: ObjectList, + node_usage_map: &mut HashMap, +) { + for node_metric in node_metrics { + let node_name = node_metric + .metadata + .name + .unwrap_or_else(|| "NONE".to_string()); + node_usage_map.insert(node_name, node_metric.data["usage"].clone()); + } +} + +fn build_cluster_stats(node_stats: &[NodeStats]) -> ClusterStats { + macro_rules! aggregate_stat { + ($acc:ident, $node:ident, $field:ident) => { + $acc.$field = $acc.$field.map_or($node.$field, |current| { + $node.$field.map(|new| current + new) + }); + }; + } + + let mut cluster = ClusterStats::new(); + + for node in node_stats { + cluster.containers_init += node.containers_init; + cluster.containers_ready += node.containers_ready; + cluster.containers_running += node.containers_running; + cluster.containers_terminated += node.containers_terminated; + cluster.containers_total += node.containers_total; + cluster.containers_waiting += node.containers_waiting; + cluster.pods_failed += node.pods_failed; + cluster.pods_pending += node.pods_pending; + cluster.pods_running += node.pods_running; + cluster.pods_succeeded += node.pods_succeeded; + cluster.pods_total += node.pods_total; + cluster.pods_unknown += node.pods_unknown; + cluster.nodes_total += 1; + + aggregate_stat!(cluster, node, cpu_usage); + aggregate_stat!(cluster, node, memory_usage); + aggregate_stat!(cluster, node, cpu_allocatable); + aggregate_stat!(cluster, node, cpu_capacity); + aggregate_stat!(cluster, node, memory_allocatable); + aggregate_stat!(cluster, node, memory_capacity); + aggregate_stat!(cluster, node, pods_allocatable); + 
aggregate_stat!(cluster, node, pods_capacity); + + if node.ready.unwrap_or(false) { + cluster.nodes_ready += 1; + } else { + cluster.nodes_notready += 1; + } + + if node.unschedulable.unwrap_or(false) { + cluster.nodes_unschedulable += 1; + } + } + + cluster +} + +fn format_pod_str( + extended_pod_stats: Vec, + controller_map: HashMap, +) -> Vec { + extended_pod_stats + .into_iter() + .map(|mut stat| { + let key = format!( + "{}.{}.{}", + stat.pod_stats.namespace, stat.pod_stats.controller_type, stat.pod_stats.controller + ); + if let Some(controller) = controller_map.get(&key) { + stat.controller_stats.copy_stats(controller); + } + format!( + r#"{{"kube":{}}}"#, + serde_json::to_string(&stat).unwrap_or_default() + ) + }) + .inspect(|s| trace!("{}", s)) + .collect() +} + +fn format_node_str(nodes: &[NodeStats]) -> Vec { + nodes + .iter() + .map(|node| { + let s = format!( + r#"{{"kube":{}}}"#, + serde_json::to_string(node).unwrap_or_default() + ); + trace!("{}", s); + s + }) + .collect() +} + +fn format_cluster_str(cluster: &ClusterStats) -> String { + let s = format!( + r#"{{"kube":{}}}"#, + serde_json::to_string(cluster).unwrap_or_default() + ); + trace!("{}", s); + s +} + +fn process_pods( + pods: ObjectList, + controller_map: &mut HashMap, + pod_usage_map: HashMap, + extended_pod_stats: &mut Vec, + node_pod_counts_map: &mut HashMap, + node_container_counts_map: &mut HashMap, +) { + let empty_vec = Vec::new(); + + for pod in pods { + let (Some(status), Some(spec)) = (pod.status.as_ref(), pod.spec.as_ref()) else { + continue; + }; + + if status.conditions.is_none() || status.container_statuses.is_none() { + continue; + } + + let translated_pod = PodStats::from(&pod); + let node = translated_pod.node.clone(); + let phase = translated_pod.phase.clone(); + + node_pod_counts_map + .entry(node.clone()) + .or_default() + .inc(&phase); + + let controller_key = format!( + "{}.{}.{}", + translated_pod.namespace, translated_pod.controller_type, translated_pod.controller + 
); + + let controller = controller_map.entry(controller_key.clone()).or_default(); + + if let Some(conditions) = &status.conditions + && conditions + .iter() + .any(|c| c.status.to_lowercase() == "true" && c.type_.to_lowercase() == "ready") + { + controller.inc_pods_ready(); + } + controller.inc_pods_total(); + + let mut container_status_map = HashMap::new(); + + for cs in status + .container_statuses + .as_ref() + .unwrap_or(&empty_vec) + .iter() + .chain( + status + .init_container_statuses + .as_ref() + .unwrap_or(&empty_vec) + .iter(), + ) + { + container_status_map.insert(cs.name.clone(), cs.clone()); + + let controller = controller_map.entry(controller_key.clone()).or_default(); + controller.inc_containers_total(); + if cs.ready { + controller.inc_containers_ready(); + } + } + + for container in &spec.containers { + if container.name.is_empty() + || container.image.is_none() + || container.resources.is_none() + { + continue; + } + + let container_status = container_status_map.get(&container.name); + if container_status.is_none() { + continue; + } + + if let Some(stat) = build_extended_pod_stat( + &pod_usage_map, + container, + container_status, + &translated_pod, + ) { + node_container_counts_map + .entry(node.clone()) + .or_default() + .inc( + &stat.container_stats.state, + stat.container_stats.ready, + false, + ); + extended_pod_stats.push(stat); + } + } + + let empty_containers = Vec::new(); + for init_container in spec.init_containers.as_ref().unwrap_or(&empty_containers) { + if init_container.name.is_empty() + || init_container.image.is_none() + || init_container.resources.is_none() + { + continue; + } + + let container_status = container_status_map.get(&init_container.name); + if container_status.is_none() { + continue; + } + + if let Some(stat) = build_extended_pod_stat( + &pod_usage_map, + init_container, + container_status, + &translated_pod, + ) { + node_container_counts_map + .entry(node.clone()) + .or_default() + .inc( + 
&stat.container_stats.state, + stat.container_stats.ready, + true, + ); + extended_pod_stats.push(stat); + } + } + } +} + +fn build_extended_pod_stat( + pod_usage_map: &HashMap, + container: &Container, + container_status: Option<&ContainerStatus>, + translated_pod: &PodStats, +) -> Option { + let key = format!( + "{}/{}/{}", + translated_pod.namespace, translated_pod.pod, container.name + ); + let usage = pod_usage_map.get(&key)?; + let c_status = container_status?; + + let translated_container = ContainerStats::new( + container, + c_status, + c_status.state.as_ref()?, + usage["cpu"].as_str().unwrap_or(""), + usage["memory"].as_str().unwrap_or(""), + ); + + Some(ExtendedPodStats::new( + translated_pod.clone(), + translated_container, + )) +} + +fn process_nodes( + nodes: ObjectList, + node_usage_map: HashMap, + output_node_vec: &mut Vec, + node_pod_counts_map: &mut HashMap, + node_container_counts_map: &mut HashMap, +) { + for node in nodes { + if node.spec.is_none() || node.status.is_none() || node.metadata.name.is_none() { + continue; + } + + let name = node.metadata.name.as_ref().unwrap(); + let default_container_stats = NodeContainerStats::new(); + let default_pod_stats = NodePodStats::new(); + + let node_container_stats = node_container_counts_map + .get(name) + .unwrap_or(&default_container_stats); + let node_pod_stats = node_pod_counts_map.get(name).unwrap_or(&default_pod_stats); + + if let Some(usage) = node_usage_map.get(name) { + output_node_vec.push(NodeStats::new( + &node, + node_pod_stats, + node_container_stats, + usage["cpu"].as_str().unwrap_or(""), + usage["memory"].as_str().unwrap_or(""), + )); + } + } +} + +async fn call_metric_api( + kind: &str, + client: Client, + field_selector: Option<&str>, +) -> Result, kube::Error> { + let gvk = GroupVersionKind::gvk("metrics.k8s.io", "v1beta1", kind); + let (ar, _caps) = discovery::pinned_kind(&client, &gvk).await?; + let api = Api::::all_with(client, &ar); + let params = match field_selector { + 
Some(selector) => ListParams::default().fields(selector), + None => ListParams::default(), + }; + api.list(¶ms).await +} + +async fn get_all_nodes(client: Client, node_name: &str) -> Result, kube::Error> { + Api::::all(client) + .list(&ListParams::default().fields(&format!("metadata.name={node_name}"))) + .await +} + +async fn get_all_pods(client: Client, node_name: &str) -> Result, kube::Error> { + Api::::all(client) + .list(&ListParams::default().fields(&format!("spec.nodeName={node_name}"))) + .await +} diff --git a/src/sources/mod.rs b/src/sources/mod.rs index 617ea1de7..f4705e659 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -58,6 +58,8 @@ pub mod kubernetes_logs; pub mod logstash; #[cfg(feature = "sources-mezmo_demo_logs")] pub mod mezmo_demo_logs; +#[cfg(feature = "sources-mezmo_kubernetes_metrics")] +pub mod mezmo_kubernetes_metrics; #[cfg(feature = "sources-mezmo_pipeline_state_variable_change")] pub mod mezmo_pipeline_state_variable_change; #[cfg(feature = "sources-mezmo_user_logs")] From b6b39066fe4d6451b5f90f46d9bdb27f4f8f3c0d Mon Sep 17 00:00:00 2001 From: Phil Ciampini Date: Tue, 17 Mar 2026 09:11:03 -0400 Subject: [PATCH 2/2] fix(headers): remote task custom headers additions allow for the passing of extra headers through an env var to downstream requests for remote task executions including tap & metric reporting ref: LOG-23416 --- lib/vector-core/src/usage_metrics/mod.rs | 9 +++- src/app.rs | 8 ++++ src/mezmo/remote_task_execution/mod.rs | 56 +++++++++++++++++++++--- 3 files changed, 65 insertions(+), 8 deletions(-) diff --git a/lib/vector-core/src/usage_metrics/mod.rs b/lib/vector-core/src/usage_metrics/mod.rs index 59482b2da..c7098696b 100644 --- a/lib/vector-core/src/usage_metrics/mod.rs +++ b/lib/vector-core/src/usage_metrics/mod.rs @@ -677,12 +677,19 @@ async fn get_flusher( if let Some(endpoint_url) = endpoint_url { // Http endpoint used by Pulse let auth_token = env::var("MEZMO_LOCAL_DEPLOY_AUTH_TOKEN").ok(); - let headers = if 
let Some(token) = auth_token { + let mut headers = if let Some(token) = auth_token { HashMap::from([("Authorization".into(), format!("Token {token}"))]) } else { return Err(MetricsPublishingError::AuthNotSetError); }; + if let Some(extra) = env::var("MEZMO_REMOTE_TASK_EXTRA_HEADERS") + .ok() + .and_then(|v| serde_json::from_str::>(&v).ok()) + { + headers.extend(extra); + } + return Ok(Arc::new(HttpFlusher::new( &pod_name, endpoint_url, diff --git a/src/app.rs b/src/app.rs index 32943b05b..b6c358757 100644 --- a/src/app.rs +++ b/src/app.rs @@ -303,6 +303,7 @@ fn start_remote_task_execution( runtime: &Runtime, _config: &ApplicationConfig, ) -> Result<(), ExitCode> { + use serde_json; use std::env; #[cfg(feature = "api")] @@ -314,6 +315,11 @@ fn start_remote_task_execution( if let Some(auth_token) = auth_token { let get_endpoint_url = env::var("MEZMO_TASKS_FETCH_ENDPOINT_URL").ok(); let post_endpoint_url = env::var("MEZMO_TASKS_POST_ENDPOINT_URL").ok(); + let extra_headers: std::collections::HashMap = + env::var("MEZMO_REMOTE_TASK_EXTRA_HEADERS") + .ok() + .and_then(|v| serde_json::from_str(&v).ok()) + .unwrap_or_default(); match (get_endpoint_url, post_endpoint_url) { (Some(get_endpoint_url), Some(post_endpoint_url)) => { if !api_config.enabled { @@ -328,6 +334,7 @@ fn start_remote_task_execution( auth_token, get_endpoint_url, post_endpoint_url, + extra_headers, Some(1), ) .await; @@ -340,6 +347,7 @@ fn start_remote_task_execution( auth_token, get_endpoint_url, post_endpoint_url, + extra_headers, ) .await; }); diff --git a/src/mezmo/remote_task_execution/mod.rs b/src/mezmo/remote_task_execution/mod.rs index c4fe5c5c4..f31c6b939 100644 --- a/src/mezmo/remote_task_execution/mod.rs +++ b/src/mezmo/remote_task_execution/mod.rs @@ -48,6 +48,7 @@ pub(crate) async fn start_polling_for_tasks( auth_token: String, get_endpoint_url: String, post_endpoint_url: String, + extra_headers: HashMap, #[cfg(test)] max_iterations: Option, // for testing only, set to 0 for infinite loop ) 
{ let task_initial_pool_delay = Duration::from_secs(mezmo_env_config!( @@ -87,6 +88,7 @@ pub(crate) async fn start_polling_for_tasks( &auth_token, &get_endpoint_url, &post_endpoint_url, + &extra_headers, ); if let Err(_) = tokio::time::timeout(task_execution_timeout, task_fut).await { @@ -117,8 +119,9 @@ async fn run_task_step( auth_token: &str, get_endpoint_url: &str, post_endpoint_url: &str, + extra_headers: &HashMap, ) { - let tasks = fetch_tasks(client, auth_token, get_endpoint_url) + let tasks = fetch_tasks(client, auth_token, get_endpoint_url, extra_headers) .await .unwrap_or_else(|e| { warn!("Remote task fetch failed: {e}"); @@ -141,7 +144,15 @@ async fn run_task_step( } let results = execute_task(&t, config).await; - if let Err(e) = post_task_results(client, auth_token, post_endpoint_url, &t, &results).await + if let Err(e) = post_task_results( + client, + auth_token, + post_endpoint_url, + &t, + &results, + extra_headers, + ) + .await { warn!( "There was an error when posting task results for {}: {}", @@ -209,7 +220,11 @@ impl FromStr for TaskType { } } -fn gen_headers(auth_token: &str, method: Method) -> header::HeaderMap { +fn gen_headers( + auth_token: &str, + method: Method, + extra_headers: &HashMap, +) -> header::HeaderMap { let mut headers = header::HeaderMap::new(); headers.insert(header::USER_AGENT, HeaderValue::from_static("Mezmo Pulse")); match method { @@ -228,6 +243,14 @@ fn gen_headers(auth_token: &str, method: Method) -> header::HeaderMap { header::AUTHORIZATION, HeaderValue::from_str(&format!("Token {auth_token}")).unwrap(), ); + for (k, v) in extra_headers { + if let (Ok(name), Ok(value)) = ( + header::HeaderName::from_bytes(k.as_bytes()), + HeaderValue::from_str(v), + ) { + headers.insert(name, value); + } + } headers } @@ -236,10 +259,11 @@ async fn fetch_tasks( client: &Client, auth_token: &str, endpoint_url: &str, + extra_headers: &HashMap, ) -> Result, Err> { let resp = client .get(endpoint_url) - .headers(gen_headers(auth_token, 
Method::GET)) + .headers(gen_headers(auth_token, Method::GET, extra_headers)) .send() .await .map_err(|e| format!("Connection error: {e}"))?; @@ -262,13 +286,14 @@ async fn post_task_results( endpoint_url: &str, task: &Task, results: &Result, + extra_headers: &HashMap, ) -> Result<(), Err> { let endpoint_url = endpoint_url.replace(":task_id", &task.task_id); let resp = client .post(&endpoint_url) .json(&results.to_json()) - .headers(gen_headers(auth_token, Method::POST)) + .headers(gen_headers(auth_token, Method::POST, extra_headers)) .send() .await .map_err(|e| format!("Connection error: {e}"))?; @@ -448,7 +473,15 @@ mod tests { let post_url = format!("http://{}{}", server.addr(), post_path); let client = Client::new(); - run_task_step(&Default::default(), &client, "token", &get_url, &post_url).await; + run_task_step( + &Default::default(), + &client, + "token", + &get_url, + &post_url, + &HashMap::new(), + ) + .await; } #[tokio::test] @@ -498,7 +531,15 @@ mod tests { let post_url = format!("http://{}{}", server.addr(), post_path); let client = Client::new(); - run_task_step(&Default::default(), &client, "token", &get_url, &post_url).await; + run_task_step( + &Default::default(), + &client, + "token", + &get_url, + &post_url, + &HashMap::new(), + ) + .await; } #[assay( @@ -535,6 +576,7 @@ mod tests { String::from("token"), get_url, unused_post_url, + HashMap::new(), Some(1), ) .await;