Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions cli/src/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
//monitoring CLI function for identity service
use anyhow::Error;
use colored::Colorize;
use k8s_openapi::chrono::DateTime;
use prost::Message;
use prost_types::FileDescriptorProto;
use std::result::Result::Ok;
use tonic_reflection::pb::v1::{ server_reflection_response::MessageResponse };
use tonic_reflection::pb::v1::server_reflection_response::MessageResponse;

use agent_api::client::{ connect_to_client, connect_to_server_reflection };
use agent_api::requests::{ get_all_features, send_active_connection_request };
use agent_api::client::{connect_to_client, connect_to_server_reflection};
use agent_api::requests::{get_all_features, send_active_connection_request};

use clap::command;
use clap::{ Args, Parser, Subcommand };
use clap::{Args, Parser, Subcommand};

//monitoring subcommands
#[derive(Subcommand, Debug, Clone)]
Expand Down Expand Up @@ -87,14 +88,14 @@ pub async fn list_features() -> Result<(), Error> {
}
}
}
Err(e) =>{
Err(e) => {
println!(
"{} {}",
"=====>".blue().bold(),
"Failed to connect to CortexFlow Server Reflection".red()
);
return Err(e);
}
}
}
Ok(())
}
Expand All @@ -111,7 +112,11 @@ pub async fn monitor_identity_events() -> Result<(), Error> {
if resp.events.is_empty() {
println!("{} No events found", "=====>".blue().bold());
} else {
println!("{} Found {} events", "=====>".blue().bold(), resp.events.len());
println!(
"{} Found {} events",
"=====>".blue().bold(),
resp.events.len()
);
for (i, ev) in resp.events.iter().enumerate() {
println!(
"{} Event[{}] id: {} src: {} dst: {}",
Expand All @@ -136,7 +141,7 @@ pub async fn monitor_identity_events() -> Result<(), Error> {
}
}
}
Err(e) =>{
Err(e) => {
println!(
"{} {}",
"=====>".blue().bold(),
Expand All @@ -163,10 +168,16 @@ pub async fn monitor_latency_metrics() -> Result<(), Error> {
if resp.metrics.is_empty() {
println!("{} No latency metrics found", "=====>".blue().bold());
} else {
println!("{} Found {} latency metrics", "=====>".blue().bold(), resp.metrics.len());
println!(
"{} Found {} latency metrics",
"=====>".blue().bold(),
resp.metrics.len()
);

for (i, metric) in resp.metrics.iter().enumerate() {
let converted_timestamp= convert_timestamp_to_date(metric.timestamp_us);
println!(
"index {} Latency[{}], tgid {} process_name {} address_family {} delta_us {} src_address_v4 {} dst_address_v4 {} src_address_v6 {} dst_address_v6 {} local_port {} remote_port {} timestamp_us {}",
"{} Latency[{}] \n tgid: {} \n process_name: {} \n address_family: {} \n delta(us): {} \n src_address_v4: {} \n dst_address_v4: {} \n src_address_v6: {} \n dst_address_v6: {} \n local_port: {} \n remote_port: {} \n timestamp_us: {}\n",
"=====>".blue().bold(),
i,
metric.tgid,
Expand All @@ -179,7 +190,7 @@ pub async fn monitor_latency_metrics() -> Result<(), Error> {
format!("{:?}", metric.dst_address_v6),
metric.local_port,
metric.remote_port,
metric.timestamp_us
converted_timestamp
);
}
}
Expand All @@ -196,7 +207,7 @@ pub async fn monitor_latency_metrics() -> Result<(), Error> {
}
}
}
Err(e) =>{
Err(e) => {
println!(
"{} {}",
"=====>".blue().bold(),
Expand All @@ -220,10 +231,18 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> {
Ok(response) => {
let resp = response.into_inner();
if resp.metrics.is_empty() {
println!("{} No dropped packets metrics found", "=====>".blue().bold());
println!(
"{} No dropped packets metrics found",
"=====>".blue().bold()
);
} else {
println!("{} Found {} dropped packets metrics", "=====>".blue().bold(), resp.metrics.len());
println!(
"{} Found {} dropped packets metrics",
"=====>".blue().bold(),
resp.metrics.len()
);
for (i, metric) in resp.metrics.iter().enumerate() {
let converted_timestamp= convert_timestamp_to_date(metric.timestamp_us);
println!(
"{} DroppedPackets[{}]\n TGID: {}\n Process: {}\n SK Drops: {}\n Socket Errors: {}\n Soft Errors: {}\n Backlog Length: {}\n Write Memory Queued: {}\n Receive Buffer Size: {}\n ACK Backlog: {}\n Timestamp: {} µs",
"=====>".blue().bold(),
Expand All @@ -237,7 +256,7 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> {
metric.sk_wmem_queued,
metric.sk_rcvbuf,
metric.sk_ack_backlog,
metric.timestamp_us
converted_timestamp
);
}
}
Expand All @@ -254,7 +273,7 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> {
}
}
}
Err(e) =>{
Err(e) => {
println!(
"{} {}",
"=====>".blue().bold(),
Expand All @@ -264,4 +283,9 @@ pub async fn monitor_dropped_packets() -> Result<(), Error> {
}
}
Ok(())
}
}

fn convert_timestamp_to_date(timestamp:u64)->String{
let datetime = DateTime::from_timestamp_micros(timestamp as i64).unwrap();
datetime.to_string()
}
106 changes: 65 additions & 41 deletions core/src/components/identity/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,14 @@ use aya::{
programs::{SchedClassifier, TcAttachType},
};
use bytes::BytesMut;
use cortexbrain_common::constants;
use k8s_openapi::api::core::v1::Pod;
use kube::api::ObjectList;
use kube::{Api, Client};
use nix::net::if_::if_nameindex;
use std::collections::HashMap;
use std::collections::{HashMap};
use std::fs;
use std::hash::Hash;
use std::path::PathBuf;
use std::result::Result::Ok;
use std::sync::Mutex;
use std::time::Duration;
use std::{
borrow::BorrowMut,
net::Ipv4Addr,
Expand All @@ -31,7 +27,6 @@ use std::{
};
use tokio::time;
use tracing::{debug, error, info, warn};
use tracing_subscriber::fmt::format;

/*
* TryFrom Trait implementation for IpProtocols enum
Expand Down Expand Up @@ -295,14 +290,6 @@ pub async fn display_tcp_registry_events<T: BorrowMut<MapData>>(
let command_str = String::from_utf8_lossy(&command[..end]).to_string();
let cgroup_id = tcp_pl.cgroup_id;

// construct the parent path
//let proc_path = PathBuf::from("/proc")
// .join(event_id.to_string())
// .join("cgroup");

//let proc_content = fs::read_to_string(&proc_path);
//match proc_content {
// Ok(proc_content) => {
match IpProtocols::try_from(tcp_pl.proto) {
std::result::Result::Ok(proto) => {
info!(
Expand All @@ -324,13 +311,6 @@ pub async fn display_tcp_registry_events<T: BorrowMut<MapData>>(
);
}
};
//}
//Err(e) =>
// eprintln!(
// "An error occured while accessing the content from the {:?} path: {}",
// &proc_path,
// e
// ),
} else {
warn!("Received packet data too small: {} bytes", data.len());
}
Expand Down Expand Up @@ -380,10 +360,14 @@ pub async fn scan_cgroup_paths(path: String) -> Result<Vec<String>, Error> {
Ok(cgroup_paths)
}

struct ServiceIdentity {
uid: String,
container_id: String,
}

pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> {
let interval = std::time::Duration::from_secs(time_delta);
let mut discovered_pods = HashMap::<String, String>::new();
while true {
loop {
let scanned_paths = scan_cgroup_paths("/sys/fs/cgroup/kubelet.slice".to_string())
.await
.expect("An error occured during the cgroup scan");
Expand Down Expand Up @@ -423,6 +407,7 @@ pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> {
match subpaths_v2 {
Ok(paths) => {
for sub2 in paths {
info!("Debugging sub2: {}", &sub2); //return e.g. /sys/fs/cgroup/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-podb8701d38_3791_422d_ad15_890ad1a0844b.slice/docker-f2e265659293676231ecb38fafccc97b1a42b75be192c32a602bc8ea579dc866.scope
scanned_subpaths_v2.push(sub2);
// this contains the addressed like this
//kubelet-kubepods-besteffort-pod088f8704_24f0_4636_a8e2_13f75646f370.slice
Expand All @@ -435,21 +420,34 @@ pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> {
}
}

//read the subpaths

let mut uids = Vec::<String>::new();
let mut identites = Vec::<ServiceIdentity>::new();

//read the subpaths to extract the pod uid
for subpath in scanned_subpaths_v2 {
let uid = extract_pod_uid(subpath.clone())
.expect("An error occured during the extraction of pod UIDs");
let container_id = extract_container_id(subpath.clone())
.expect("An error occured during the extraction of the docker container id");
debug!("Debugging extracted UID: {:?}", &uid);
uids.push(uid);
// create a linked list for each service
let service_identity = ServiceIdentity { uid, container_id };
identites.push(service_identity); //push the linked list in a vector of ServiceIdentity structure. Each struct contains the uid and the container id
}

// get pod information from UID and store the info in an HashMqp for O(1) access
let service_map = get_pod_info().await?;

for (uid) in uids {
if let Some(name) = service_map.get(&uid) {
info!("UID (from eBPF): {} name:(from K8s): {}", &uid, name);
}
//info!("Debugging Identites vector: {:?}", identites);
for service in identites {
let name = service_cache(service_map.clone(), service.uid.clone());
let uid = service.uid;
let id = service.container_id;
info!(
"[Identity]: name: {:?} uid: {:?} docker container id {:?} ",
name, uid, id
);
}

info!(
Expand All @@ -458,10 +456,22 @@ pub async fn scan_cgroup_cronjob(time_delta: u64) -> Result<(), Error> {
);
time::sleep(interval).await;
}

Ok(())
}
fn service_cache(service_map: HashMap<String, String>, uid: String) -> String {
service_map.get(&uid).cloned().unwrap_or_else(|| {
error!("Service not found for uid: {}", uid);
"unknown".to_string()
})
}
fn extract_container_id(cgroup_path: String) -> Result<String, Error> {
let splits: Vec<&str> = cgroup_path.split("/").collect();

let index = extract_target_from_splits(splits.clone(), "docker-")?;
let docker_id_split = splits[index]
.trim_start_matches("docker-")
.trim_end_matches(".scope");
Ok(docker_id_split.to_string())
}
fn extract_pod_uid(cgroup_path: String) -> Result<String, Error> {
// example of cgroup path:
// /sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-besteffort.slice/kubelet-kubepods-besteffort-pod93580201_87d5_44e6_9779_f6153ca17637.slice
Expand All @@ -470,12 +480,9 @@ fn extract_pod_uid(cgroup_path: String) -> Result<String, Error> {

// split the path by "/"
let splits: Vec<&str> = cgroup_path.split("/").collect();
let mut uid_vec = Vec::<String>::new();
debug!("Debugging splits: {:?}", &splits);

let mut pod_split_vec = Vec::<String>::new();

let index = extract_target_from_splits(splits.clone())?;
let index = extract_target_from_splits(splits.clone(), "-pod")?;

let pod_split = splits[index]
.trim_start_matches("kubelet-kubepods-besteffort-")
Expand All @@ -491,10 +498,10 @@ fn extract_pod_uid(cgroup_path: String) -> Result<String, Error> {
Ok(uid.to_string())
}

fn extract_target_from_splits(splits: Vec<&str>) -> Result<usize, Error> {
fn extract_target_from_splits(splits: Vec<&str>, target: &str) -> Result<usize, Error> {
for (index, split) in splits.iter().enumerate() {
// find the split that contains the word 'pod'
if split.contains("-pod") {
if split.contains(target) {
debug!("Target index; {}", index);
return Ok(index);
}
Expand Down Expand Up @@ -527,15 +534,15 @@ async fn get_pod_info() -> Result<HashMap<String, String>, Error> {
if let (Some(name), Some(uid)) = (pod.metadata.name, pod.metadata.uid) {
service_map.insert(uid, name);
}
}
} // insert the pod name and uid from the KubeAPI

Ok(service_map)
}

mod tests {
use tracing_subscriber::fmt::format;

use crate::helpers::{extract_pod_uid, extract_target_from_splits};
use crate::helpers::{extract_container_id, extract_pod_uid, extract_target_from_splits};

#[test]
fn extract_uid_from_string() {
Expand Down Expand Up @@ -566,12 +573,29 @@ mod tests {

let mut index_vec = Vec::<usize>::new();
for cgroup_path in cgroup_paths {
let mut splits: Vec<&str> = cgroup_path.split("/").collect();
let splits: Vec<&str> = cgroup_path.split("/").collect();

let target_index = extract_target_from_splits(splits).unwrap();
let target_index = extract_target_from_splits(splits, "-pod").unwrap();
index_vec.push(target_index);
}
let index_check = vec![6, 7];
assert_eq!(index_vec, index_check);
}

#[test]
fn extract_docker_id() {
let cgroup_paths = vec!["/sys/fs/cgroup/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-pod17fd3f7c_37e4_4009_8c38_e58b30691af3.slice/docker-13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861.scope".to_string(),
"/sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-besteffort.slice/kubelet-kubepods-besteffort-pod17fd3f7c_37e4_4009_8c38_e58b30691af3.slice/docker-13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861.scope".to_string()];

let mut id_vec = Vec::<String>::new();
for cgroup_path in cgroup_paths {
let id = extract_container_id(cgroup_path).unwrap();
id_vec.push(id);
}
let id_check = vec![
"13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861".to_string(),
"13abd64c0ba349975a762476c9703b642d18077eabeb3aa1d941132048afc861".to_string(),
];
assert_eq!(id_vec, id_check);
}
}