From 6fac8807364105f1dfae1182735cd0b370d1e8ce Mon Sep 17 00:00:00 2001 From: LorenzoTettamanti Date: Sat, 20 Dec 2025 22:07:15 +0100 Subject: [PATCH 1/2] Improved cfcli monitoring latencymetrics response for better output clarity --- cli/src/monitoring.rs | 58 ++++++++++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/cli/src/monitoring.rs b/cli/src/monitoring.rs index 03480e0..506cc5f 100644 --- a/cli/src/monitoring.rs +++ b/cli/src/monitoring.rs @@ -3,16 +3,17 @@ //monitoring CLI function for identity service use anyhow::Error; use colored::Colorize; +use k8s_openapi::chrono::DateTime; use prost::Message; use prost_types::FileDescriptorProto; use std::result::Result::Ok; -use tonic_reflection::pb::v1::{ server_reflection_response::MessageResponse }; +use tonic_reflection::pb::v1::server_reflection_response::MessageResponse; -use agent_api::client::{ connect_to_client, connect_to_server_reflection }; -use agent_api::requests::{ get_all_features, send_active_connection_request }; +use agent_api::client::{connect_to_client, connect_to_server_reflection}; +use agent_api::requests::{get_all_features, send_active_connection_request}; use clap::command; -use clap::{ Args, Parser, Subcommand }; +use clap::{Args, Parser, Subcommand}; //monitoring subcommands #[derive(Subcommand, Debug, Clone)] @@ -87,14 +88,14 @@ pub async fn list_features() -> Result<(), Error> { } } } - Err(e) =>{ + Err(e) => { println!( "{} {}", "=====>".blue().bold(), "Failed to connect to CortexFlow Server Reflection".red() ); return Err(e); - } + } } Ok(()) } @@ -111,7 +112,11 @@ pub async fn monitor_identity_events() -> Result<(), Error> { if resp.events.is_empty() { println!("{} No events found", "=====>".blue().bold()); } else { - println!("{} Found {} events", "=====>".blue().bold(), resp.events.len()); + println!( + "{} Found {} events", + "=====>".blue().bold(), + resp.events.len() + ); for (i, ev) in resp.events.iter().enumerate() { println!( "{} Event[{}] id: {} src: {} dst: {}", @@ -136,7 +141,7 @@ pub async fn monitor_identity_events() -> Result<(), Error> { } } } - Err(e) =>{ + Err(e) => { println!( "{} {}", "=====>".blue().bold(), @@ -163,10 +168,16 @@ pub async fn monitor_latency_metrics() -> Result<(), Error> { if resp.metrics.is_empty() { println!("{} No latency metrics found", "=====>".blue().bold()); } else { - println!("{} Found {} latency metrics", "=====>".blue().bold(), resp.metrics.len()); + println!( + "{} Found {} latency metrics", + "=====>".blue().bold(), + resp.metrics.len() + ); + for (i, metric) in resp.metrics.iter().enumerate() { + let converted_timestamp= convert_timestamp_to_date(metric.timestamp_us); println!( - "index {} Latency[{}], tgid {} process_name {} address_family {} delta_us {} src_address_v4 {} dst_address_v4 {} src_address_v6 {} dst_address_v6 {} local_port {} remote_port {} timestamp_us {}", + "{} Latency[{}] \n tgid: {} \n process_name: {} \n address_family: {} \n delta(us): {} \n src_address_v4: {} \n dst_address_v4: {} \n src_address_v6: {} \n dst_address_v6: {} \n local_port: {} \n remote_port: {} \n timestamp_us: {}\n", "=====>".blue().bold(), i, metric.tgid, @@ -179,7 +190,7 @@ pub async fn monitor_latency_metrics() -> Result<(), Error> { format!("{:?}", metric.dst_address_v6), metric.local_port, metric.remote_port, - metric.timestamp_us + converted_timestamp ); } } @@ -196,7 +207,7 @@ pub async fn monitor_latency_metrics() -> Result<(), Error> { } } } - Err(e) =>{ + Err(e) => { println!( "{} {}", "=====>".blue().bold(), @@ -220,10 +231,18 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> { Ok(response) => { let resp = response.into_inner(); if resp.metrics.is_empty() { - println!("{} No dropped packets metrics found", "=====>".blue().bold()); + println!( + "{} No dropped packets metrics found", + "=====>".blue().bold() + ); } else { - println!("{} Found {} dropped packets metrics", "=====>".blue().bold(), resp.metrics.len()); + println!( + "{} Found {} dropped packets metrics", + "=====>".blue().bold(), + resp.metrics.len() + ); for (i, metric) in resp.metrics.iter().enumerate() { + let converted_timestamp= convert_timestamp_to_date(metric.timestamp_us); println!( "{} DroppedPackets[{}]\n TGID: {}\n Process: {}\n SK Drops: {}\n Socket Errors: {}\n Soft Errors: {}\n Backlog Length: {}\n Write Memory Queued: {}\n Receive Buffer Size: {}\n ACK Backlog: {}\n Timestamp: {} µs", "=====>".blue().bold(), @@ -237,7 +256,7 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> { metric.sk_wmem_queued, metric.sk_rcvbuf, metric.sk_ack_backlog, - metric.timestamp_us + converted_timestamp ); } } @@ -254,7 +273,7 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> { } } } - Err(e) =>{ + Err(e) => { println!( "{} {}", "=====>".blue().bold(), @@ -264,4 +283,9 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> { } } Ok(()) -} \ No newline at end of file +} + +fn convert_timestamp_to_date(timestamp:u64)->String{ + let datetime = DateTime::from_timestamp_micros(timestamp as i64).unwrap(); + datetime.to_string() +} From 13bcd2555c4029ba66ffcf9beab49dace60068ef Mon Sep 17 00:00:00 2001 From: LorenzoTettamanti Date: Mon, 22 Dec 2025 23:39:48 +0100 Subject: [PATCH 2/2] [#150]: added extract_docker_id() unit test. Added extract_container_id function and service_cache function. Added ServiceIdentity struct to store services relevant informations. Added Identites vector to store multiple ServiceIdentity structures --- core/src/components/identity/src/helpers.rs | 106 ++++++++++++-------- 1 file changed, 65 insertions(+), 41 deletions(-) diff --git a/core/src/components/identity/src/helpers.rs b/core/src/components/identity/src/helpers.rs index 5e236e3..90ef530 100644 --- a/core/src/components/identity/src/helpers.rs +++ b/core/src/components/identity/src/helpers.rs @@ -9,18 +9,14 @@ use aya::{ programs::{SchedClassifier, TcAttachType}, }; use bytes::BytesMut; -use cortexbrain_common::constants; use k8s_openapi::api::core::v1::Pod; use kube::api::ObjectList; use kube::{Api, Client}; use nix::net::if_::if_nameindex; -use std::collections::HashMap; +use std::collections::{HashMap}; use std::fs; -use std::hash::Hash; -use std::path::PathBuf; use std::result::Result::Ok; use std::sync::Mutex; -use std::time::Duration; use std::{ borrow::BorrowMut, net::Ipv4Addr, @@ -31,7 +27,6 @@ use std::{ }; use tokio::time; use tracing::{debug, error, info, warn}; -use tracing_subscriber::fmt::format; /* * TryFrom Trait implementation for IpProtocols enum @@ -295,14 +290,6 @@ pub async fn display_tcp_registry_events>( let command_str = String::from_utf8_lossy(&command[..end]).to_string(); let cgroup_id = tcp_pl.cgroup_id; - // construct the parent path - //let proc_path = PathBuf::from("/proc") - // .join(event_id.to_string()) - // .join("cgroup"); - - //let proc_content = fs::read_to_string(&proc_path); - //match proc_content { - // Ok(proc_content) => { match IpProtocols::try_from(tcp_pl.proto) { std::result::Result::Ok(proto) => { info!( @@ -324,13 +311,6 @@ pub async fn display_tcp_registry_events>( ); } }; - //} - //Err(e) => - // eprintln!( - // "An error occured while accessing the content from the {:?} path: {}", - // &proc_path, - // e - // ), } else { warn!("Received packet data too small: {} bytes", data.len()); } @@ -380,10 +360,14 @@ pub async fn scan_cgroup_paths(path: String) -> Result, Error> { Ok(cgroup_paths) } +struct ServiceIdentity { + uid: String, + container_id: String, +} + pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> { let interval = std::time::Duration::from_secs(time_delta); - let mut discovered_pods = HashMap::::new(); - while true { + loop { let scanned_paths = scan_cgroup_paths("/sys/fs/cgroup/kubelet.slice".to_string()) .await .expect("An error occured during the cgroup scan"); @@ -423,6 +407,7 @@ pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> { match subpaths_v2 { Ok(paths) => { for sub2 in paths { + info!("Debugging sub2: {}", &sub2); //return e.g. /sys/fs/cgroup/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-podb8701d38_3791_422d_ad15_890ad1a0844b.slice/docker-f2e265659293676231ecb38fafccc97b1a42b75be192c32a602bc8ea579dc866.scope scanned_subpaths_v2.push(sub2); // this contains the addressed like this //kubelet-kubepods-besteffort-pod088f8704_24f0_4636_a8e2_13f75646f370.slice @@ -435,21 +420,34 @@ pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> { } } - //read the subpaths + let mut uids = Vec::::new(); + let mut identites = Vec::::new(); + + //read the subpaths to extract the pod uid for subpath in scanned_subpaths_v2 { let uid = extract_pod_uid(subpath.clone()) .expect("An error occured during the extraction of pod UIDs"); + let container_id = extract_container_id(subpath.clone()) + .expect("An error occured during the extraction of the docker container id"); debug!("Debugging extracted UID: {:?}", &uid); - uids.push(uid); + // create a linked list for each service + let service_identity = ServiceIdentity { uid, container_id }; + identites.push(service_identity); //push the linked list in a vector of ServiceIdentity structure. Each struct contains the uid and the container id } + // get pod information from UID and store the info in an HashMqp for O(1) access let service_map = get_pod_info().await?; - for (uid) in uids { - if let Some(name) = service_map.get(&uid) { - info!("UID (from eBPF): {} name:(from K8s): {}", &uid, name); - } + //info!("Debugging Identites vector: {:?}", identites); + for service in identites { + let name = service_cache(service_map.clone(), service.uid.clone()); + let uid = service.uid; + let id = service.container_id; + info!( + "[Identity]: name: {:?} uid: {:?} docker container id {:?} ", + name, uid, id + ); } info!( @@ -458,10 +456,22 @@ pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> { ); time::sleep(interval).await; } - - Ok(()) } +fn service_cache(service_map: HashMap, uid: String) -> String { + service_map.get(&uid).cloned().unwrap_or_else(|| { + error!("Service not found for uid: {}", uid); + "unknown".to_string() + }) +} +fn extract_container_id(cgroup_path: String) -> Result { + let splits: Vec<&str> = cgroup_path.split("/").collect(); + let index = extract_target_from_splits(splits.clone(), "docker-")?; + let docker_id_split = splits[index] + .trim_start_matches("docker-") + .trim_end_matches(".scope"); + Ok(docker_id_split.to_string()) +} fn extract_pod_uid(cgroup_path: String) -> Result { // example of cgroup path: // /sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-besteffort.slice/kubelet-kubepods-besteffort-pod93580201_87d5_44e6_9779_f6153ca17637.slice @@ -470,12 +480,9 @@ fn extract_pod_uid(cgroup_path: String) -> Result { // split the path by "/" let splits: Vec<&str> = cgroup_path.split("/").collect(); - let mut uid_vec = Vec::::new(); debug!("Debugging splits: {:?}", &splits); - let mut pod_split_vec = Vec::::new(); - - let index = extract_target_from_splits(splits.clone())?; + let index = extract_target_from_splits(splits.clone(), "-pod")?; let pod_split = splits[index] .trim_start_matches("kubelet-kubepods-besteffort-") @@ -491,10 +498,10 @@ fn extract_pod_uid(cgroup_path: String) -> Result { Ok(uid.to_string()) } -fn extract_target_from_splits(splits: Vec<&str>) -> Result { +fn extract_target_from_splits(splits: Vec<&str>, target: &str) -> Result { for (index, split) in splits.iter().enumerate() { // find the split that contains the word 'pod' - if split.contains("-pod") { + if split.contains(target) { debug!("Target index; {}", index); return Ok(index); } @@ -527,7 +534,7 @@ async fn get_pod_info() -> Result, Error> { if let (Some(name), Some(uid)) = (pod.metadata.name, pod.metadata.uid) { service_map.insert(uid, name); } - } + } // insert the pod name and uid from the KubeAPI Ok(service_map) } @@ -535,7 +542,7 @@ async fn get_pod_info() -> Result, Error> { mod tests { use tracing_subscriber::fmt::format; - use crate::helpers::{extract_pod_uid, extract_target_from_splits}; + use crate::helpers::{extract_container_id, extract_pod_uid, extract_target_from_splits}; #[test] fn extract_uid_from_string() { @@ -566,12 +573,29 @@ mod tests { let mut index_vec = Vec::::new(); for cgroup_path in cgroup_paths { - let mut splits: Vec<&str> = cgroup_path.split("/").collect(); + let splits: Vec<&str> = cgroup_path.split("/").collect(); - let target_index = extract_target_from_splits(splits).unwrap(); + let target_index = extract_target_from_splits(splits, "-pod").unwrap(); index_vec.push(target_index); } let index_check = vec![6, 7]; assert_eq!(index_vec, index_check); } + + #[test] + fn extract_docker_id() { + let cgroup_paths = vec!["/sys/fs/cgroup/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-pod17fd3f7c_37e4_4009_8c38_e58b30691af3.slice/docker-13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861.scope".to_string(), + "/sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-besteffort.slice/kubelet-kubepods-besteffort-pod17fd3f7c_37e4_4009_8c38_e58b30691af3.slice/docker-13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861.scope".to_string()]; + + let mut id_vec = Vec::::new(); + for cgroup_path in cgroup_paths { + let id = extract_container_id(cgroup_path).unwrap(); + id_vec.push(id); + } + let id_check = vec![ + "13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861".to_string(), + "13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861".to_string(), + ]; + assert_eq!(id_vec, id_check); + } }