diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e74cf0b --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +# Rust +/target +/Cargo.lock + +# CLion +/.idea/* +!/.idea/runConfigurations diff --git a/examples/no-fs-example.rs b/examples/no-fs-example.rs new file mode 100644 index 0000000..a87ec60 --- /dev/null +++ b/examples/no-fs-example.rs @@ -0,0 +1,391 @@ +use cfdp::{ + DummyFaultHook, FaultHandler, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo, + RemoteEntityConfig, StdTimerCreator, TransactionId, + dest::DestinationHandler, + filestore::{FilestoreError, VirtualFilestore}, + request::PutRequest, + source::SourceHandler, + user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, +}; +use spacepackets::cfdp::{ChecksumType, ConditionCode, TransmissionMode}; +use spacepackets::seq_count::SequenceCounterSimple; +use spacepackets::util::UnsignedByteFieldU16; +use std::collections::HashMap; +use std::sync::{ + Arc, Mutex, + atomic::{AtomicBool, Ordering}, + mpsc, +}; +use std::thread; +use std::time::Duration; + +// A virtual filestore that stores data in memory. +#[derive(Debug, Clone)] +pub struct NoFsFilestore { + files: Arc>>>, +} + +impl Default for NoFsFilestore { + fn default() -> Self { + Self::new() + } +} + +impl NoFsFilestore { + pub fn new() -> Self { + Self { + files: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub fn add_file(&self, path: &str, data: Vec) { + self.files.lock().unwrap().insert(path.to_string(), data); + } + + pub fn get_file(&self, path: &str) -> Option> { + self.files.lock().unwrap().get(path).cloned() + } +} + +impl VirtualFilestore for NoFsFilestore { + fn create_file(&self, file_path: &str) -> Result<(), FilestoreError> { + println!("Filestore: create_file({})", file_path); + self.files + .lock() + .unwrap() + .insert(file_path.to_string(), Vec::new()); + Ok(()) + } + + fn remove_file(&self, file_path: &str) -> Result<(), FilestoreError> { + println!("Filestore: remove_file({})", file_path); + self.files.lock().unwrap().remove(file_path); + Ok(()) + } + + fn truncate_file(&self, file_path: &str) -> Result<(), FilestoreError> { + println!("Filestore: truncate_file({})", file_path); + if let Some(file) = self.files.lock().unwrap().get_mut(file_path) { + file.clear(); + } + Ok(()) + } + + fn remove_dir(&self, _dir_path: &str, _all: bool) -> Result<(), FilestoreError> { + println!("Filestore: remove_dir"); + Ok(()) + } + + fn create_dir(&self, _dir_path: &str) -> Result<(), FilestoreError> { + println!("Filestore: create_dir"); + Ok(()) + } + + fn read_data( + &self, + file_path: &str, + offset: u64, + read_len: u64, + buf: &mut [u8], + ) -> Result<(), FilestoreError> { + println!( + "Filestore: read_data({}, offset={}, len={})", + file_path, offset, read_len + ); + let files = self.files.lock().unwrap(); + if let Some(file_data) = files.get(file_path) { + let start = offset as usize; + let end = (offset + read_len) as usize; + let end = end.min(file_data.len()); + let copy_len = end - start; + buf[..copy_len].copy_from_slice(&file_data[start..end]); + Ok(()) + } else { + Err(FilestoreError::FileDoesNotExist) + } + } + + fn write_data(&self, file_path: &str, offset: u64, buf: &[u8]) -> Result<(), FilestoreError> { + println!( + "Filestore: write_data({}, offset={}, len={})", + file_path, + offset, + buf.len() + ); + let mut files = self.files.lock().unwrap(); + let file_data = files.entry(file_path.to_string()).or_insert_with(Vec::new); + + // Extend the file if necessary + let required_len = offset as usize + buf.len(); + if file_data.len() < required_len { + file_data.resize(required_len, 0); + } + + file_data[offset as usize..offset as usize + buf.len()].copy_from_slice(buf); + Ok(()) + } + + fn filename_from_full_path(path: &str) -> Option<&str> + where + Self: Sized, + { + Some(path) + } + + fn is_file(&self, path: &str) -> Result { + let exists = self.files.lock().unwrap().contains_key(path); + println!("Filestore: is_file({}) = {}", path, exists); + Ok(exists) + } + + fn exists(&self, path: &str) -> Result { + let exists = self.files.lock().unwrap().contains_key(path); + println!("Filestore: exists({}) = {}", path, exists); + Ok(exists) + } + + fn file_name<'a>(&self, full_path: &'a str) -> Result, FilestoreError> { + Ok(Some(full_path)) + } + + fn file_size(&self, path: &str) -> Result { + let size = self + .files + .lock() + .unwrap() + .get(path) + .map(|f| f.len() as u64) + .unwrap_or(0); + println!("Filestore: file_size({}) = {}", path, size); + Ok(size) + } + + fn calculate_checksum( + &self, + file_path: &str, + checksum_type: ChecksumType, + _size_to_verify: u64, + _verification_buf: &mut [u8], + ) -> Result { + println!( + "Filestore: calculate_checksum({}, {:?})", + file_path, checksum_type + ); + // TODO: Real checksums? + Ok(0) + } +} + +#[derive(Debug, Clone)] +pub struct PrintUser { + pub transaction_finished: Arc, +} + +impl CfdpUser for PrintUser { + fn transaction_indication(&mut self, id: &TransactionId) { + println!("User: Transaction indication for {:?}", id); + } + + fn eof_sent_indication(&mut self, id: &TransactionId) { + println!("User: EOF sent indication for {:?}", id); + } + + fn transaction_finished_indication(&mut self, params: &TransactionFinishedParams) { + println!("User: Transaction finished indication for {:?}", params); + self.transaction_finished.store(true, Ordering::Relaxed); + } + + fn metadata_recvd_indication(&mut self, params: &MetadataReceivedParams) { + println!("User: Metadata received indication for {:?}", params); + } + + fn file_segment_recvd_indication(&mut self, params: &FileSegmentRecvdParams) { + println!("User: File segment received indication for {:?}", params); + } + + fn report_indication(&mut self, id: &TransactionId) { + println!("User: Report indication for {:?}", id); + } + + fn suspended_indication(&mut self, id: &TransactionId, condition_code: ConditionCode) { + println!( + "User: Suspended indication for {:?} with condition {:?}", + id, condition_code + ); + } + + fn resumed_indication(&mut self, id: &TransactionId, progress: u64) { + println!( + "User: Resumed indication for {:?} with progress {}", + id, progress + ); + } + + fn fault_indication( + &mut self, + id: &TransactionId, + condition_code: ConditionCode, + progress: u64, + ) { + println!( + "User: Fault indication for {:?} with condition {:?} and progress {}", + id, condition_code, progress + ); + } + + fn abandoned_indication( + &mut self, + id: &TransactionId, + condition_code: ConditionCode, + progress: u64, + ) { + println!( + "User: Abandoned indication for {:?} with condition {:?} and progress {}", + id, condition_code, progress + ); + } + + fn eof_recvd_indication(&mut self, id: &TransactionId) { + println!("User: EOF received indication for {:?}", id); + } +} + +const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); +const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); + +fn main() { + let test_data = b"Hello PVDX"; + println!("Original data: {:?}", String::from_utf8_lossy(test_data)); + println!("Data length: {} bytes\n", test_data.len()); + + let (pdu_tx_to_dest, pdu_rx_from_source) = mpsc::channel::(); + let (pdu_tx_to_source, pdu_rx_from_dest) = mpsc::channel::(); + + let stop_threads = Arc::new(AtomicBool::new(false)); + let stop_threads_source = stop_threads.clone(); + let stop_threads_dest = stop_threads.clone(); + + let source_finished = Arc::new(AtomicBool::new(false)); + let source_finished_user = source_finished.clone(); + + let dest_finished = Arc::new(AtomicBool::new(false)); + let dest_finished_user = dest_finished.clone(); + + let source_filestore = NoFsFilestore::new(); + source_filestore.add_file("source.txt", test_data.to_vec()); + + let dest_filestore = NoFsFilestore::new(); + let dest_filestore_clone = dest_filestore.clone(); + + let source_thread = thread::spawn(move || { + let local_cfg = LocalEntityConfig { + id: LOCAL_ID.into(), + indication_cfg: IndicationConfig::default(), + fault_handler: FaultHandler::new(DummyFaultHook::default()), + }; + let remote_cfg = RemoteEntityConfig::new_with_default_values( + REMOTE_ID.into(), + 1024, + false, + false, + TransmissionMode::Acknowledged, + ChecksumType::NullChecksum, + ); + + let mut source_handler = SourceHandler::new( + local_cfg, + pdu_tx_to_dest, + source_filestore, + remote_cfg, + StdTimerCreator::default(), + SequenceCounterSimple::::default(), + ); + + let put_request = + PutRequest::new_regular_request(REMOTE_ID.into(), "source.txt", "dest.txt", None, None) + .unwrap(); + source_handler.put_request(&put_request).unwrap(); + + let mut user = PrintUser { + transaction_finished: source_finished_user, + }; + + while !stop_threads_source.load(Ordering::Relaxed) { + let pdu_from_dest = pdu_rx_from_dest.try_recv().ok(); + source_handler + .state_machine(&mut user, pdu_from_dest.as_ref()) + .unwrap(); + thread::sleep(Duration::from_millis(100)); + } + }); + + let dest_thread = thread::spawn(move || { + let local_cfg = LocalEntityConfig { + id: REMOTE_ID.into(), + indication_cfg: IndicationConfig::default(), + fault_handler: FaultHandler::new(DummyFaultHook::default()), + }; + let remote_cfg = RemoteEntityConfig::new_with_default_values( + LOCAL_ID.into(), + 1024, + false, + false, + TransmissionMode::Acknowledged, + ChecksumType::NullChecksum, + ); + + let mut dest_handler = DestinationHandler::new( + local_cfg, + pdu_tx_to_source, + dest_filestore, + remote_cfg, + StdTimerCreator::default(), + cfdp::lost_segments::LostSegmentsList::default(), + ); + + let mut user = PrintUser { + transaction_finished: dest_finished_user, + }; + + while !stop_threads_dest.load(Ordering::Relaxed) { + if let Ok(pdu_from_source) = pdu_rx_from_source.try_recv() { + dest_handler + .state_machine(&mut user, Some(&pdu_from_source)) + .unwrap(); + } + thread::sleep(Duration::from_millis(100)); + } + }); + + // Wait for both threads to finish + while !source_finished.load(Ordering::Relaxed) || !dest_finished.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(100)); + } + + stop_threads.store(true, Ordering::Relaxed); + + source_thread.join().unwrap(); + dest_thread.join().unwrap(); + + println!("\n=== Transfer Complete ==="); + + if let Some(received_data) = dest_filestore_clone.get_file("dest.txt") { + println!( + "Received data: {:?}", + String::from_utf8_lossy(&received_data) + ); + println!("Received length: {} bytes", received_data.len()); + + if received_data == test_data { + println!("\nSUCCESS: Data transferred correctly!"); + } else { + println!("\nERROR: Data mismatch!"); + println!("Expected: {:?}", String::from_utf8_lossy(test_data)); + println!("Got: {:?}", String::from_utf8_lossy(&received_data)); + } + } else { + println!("\nERROR: No data received at destination!"); + } + + println!("\nExample finished."); +}