Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Rust
/target
/Cargo.lock

# CLion
/.idea/*
!/.idea/runConfigurations
391 changes: 391 additions & 0 deletions examples/no-fs-example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,391 @@
use cfdp::{
DummyFaultHook, FaultHandler, IndicationConfig, LocalEntityConfig, PduOwnedWithInfo,
RemoteEntityConfig, StdTimerCreator, TransactionId,
dest::DestinationHandler,
filestore::{FilestoreError, VirtualFilestore},
request::PutRequest,
source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
};
use spacepackets::cfdp::{ChecksumType, ConditionCode, TransmissionMode};
use spacepackets::seq_count::SequenceCounterSimple;
use spacepackets::util::UnsignedByteFieldU16;
use std::collections::HashMap;
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
mpsc,
};
use std::thread;
use std::time::Duration;

// A virtual filestore that stores data in memory.
#[derive(Debug, Clone)]
pub struct NoFsFilestore {
files: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}

impl Default for NoFsFilestore {
fn default() -> Self {
Self::new()
}
}

impl NoFsFilestore {
pub fn new() -> Self {
Self {
files: Arc::new(Mutex::new(HashMap::new())),
}
}

pub fn add_file(&self, path: &str, data: Vec<u8>) {
self.files.lock().unwrap().insert(path.to_string(), data);
}

pub fn get_file(&self, path: &str) -> Option<Vec<u8>> {
self.files.lock().unwrap().get(path).cloned()
}
}

impl VirtualFilestore for NoFsFilestore {
fn create_file(&self, file_path: &str) -> Result<(), FilestoreError> {
println!("Filestore: create_file({})", file_path);
self.files
.lock()
.unwrap()
.insert(file_path.to_string(), Vec::new());
Ok(())
}

fn remove_file(&self, file_path: &str) -> Result<(), FilestoreError> {
println!("Filestore: remove_file({})", file_path);
self.files.lock().unwrap().remove(file_path);
Ok(())
}

fn truncate_file(&self, file_path: &str) -> Result<(), FilestoreError> {
println!("Filestore: truncate_file({})", file_path);
if let Some(file) = self.files.lock().unwrap().get_mut(file_path) {
file.clear();
}
Ok(())
}

fn remove_dir(&self, _dir_path: &str, _all: bool) -> Result<(), FilestoreError> {
println!("Filestore: remove_dir");
Ok(())
}

fn create_dir(&self, _dir_path: &str) -> Result<(), FilestoreError> {
println!("Filestore: create_dir");
Ok(())
}

fn read_data(
&self,
file_path: &str,
offset: u64,
read_len: u64,
buf: &mut [u8],
) -> Result<(), FilestoreError> {
println!(
"Filestore: read_data({}, offset={}, len={})",
file_path, offset, read_len
);
let files = self.files.lock().unwrap();
if let Some(file_data) = files.get(file_path) {
let start = offset as usize;
let end = (offset + read_len) as usize;
let end = end.min(file_data.len());
let copy_len = end - start;
buf[..copy_len].copy_from_slice(&file_data[start..end]);
Ok(())
} else {
Err(FilestoreError::FileDoesNotExist)
}
}

fn write_data(&self, file_path: &str, offset: u64, buf: &[u8]) -> Result<(), FilestoreError> {
println!(
"Filestore: write_data({}, offset={}, len={})",
file_path,
offset,
buf.len()
);
let mut files = self.files.lock().unwrap();
let file_data = files.entry(file_path.to_string()).or_insert_with(Vec::new);

// Extend the file if necessary
let required_len = offset as usize + buf.len();
if file_data.len() < required_len {
file_data.resize(required_len, 0);
}

file_data[offset as usize..offset as usize + buf.len()].copy_from_slice(buf);
Ok(())
}

fn filename_from_full_path(path: &str) -> Option<&str>
where
Self: Sized,
{
Some(path)
}

fn is_file(&self, path: &str) -> Result<bool, FilestoreError> {
let exists = self.files.lock().unwrap().contains_key(path);
println!("Filestore: is_file({}) = {}", path, exists);
Ok(exists)
}

fn exists(&self, path: &str) -> Result<bool, FilestoreError> {
let exists = self.files.lock().unwrap().contains_key(path);
println!("Filestore: exists({}) = {}", path, exists);
Ok(exists)
}

fn file_name<'a>(&self, full_path: &'a str) -> Result<Option<&'a str>, FilestoreError> {
Ok(Some(full_path))
}

fn file_size(&self, path: &str) -> Result<u64, FilestoreError> {
let size = self
.files
.lock()
.unwrap()
.get(path)
.map(|f| f.len() as u64)
.unwrap_or(0);
println!("Filestore: file_size({}) = {}", path, size);
Ok(size)
}

fn calculate_checksum(
&self,
file_path: &str,
checksum_type: ChecksumType,
_size_to_verify: u64,
_verification_buf: &mut [u8],
) -> Result<u32, FilestoreError> {
println!(
"Filestore: calculate_checksum({}, {:?})",
file_path, checksum_type
);
// TODO: Real checksums?
Ok(0)
}
}

#[derive(Debug, Clone)]
pub struct PrintUser {
pub transaction_finished: Arc<AtomicBool>,
}

impl CfdpUser for PrintUser {
fn transaction_indication(&mut self, id: &TransactionId) {
println!("User: Transaction indication for {:?}", id);
}

fn eof_sent_indication(&mut self, id: &TransactionId) {
println!("User: EOF sent indication for {:?}", id);
}

fn transaction_finished_indication(&mut self, params: &TransactionFinishedParams) {
println!("User: Transaction finished indication for {:?}", params);
self.transaction_finished.store(true, Ordering::Relaxed);
}

fn metadata_recvd_indication(&mut self, params: &MetadataReceivedParams) {
println!("User: Metadata received indication for {:?}", params);
}

fn file_segment_recvd_indication(&mut self, params: &FileSegmentRecvdParams) {
println!("User: File segment received indication for {:?}", params);
}

fn report_indication(&mut self, id: &TransactionId) {
println!("User: Report indication for {:?}", id);
}

fn suspended_indication(&mut self, id: &TransactionId, condition_code: ConditionCode) {
println!(
"User: Suspended indication for {:?} with condition {:?}",
id, condition_code
);
}

fn resumed_indication(&mut self, id: &TransactionId, progress: u64) {
println!(
"User: Resumed indication for {:?} with progress {}",
id, progress
);
}

fn fault_indication(
&mut self,
id: &TransactionId,
condition_code: ConditionCode,
progress: u64,
) {
println!(
"User: Fault indication for {:?} with condition {:?} and progress {}",
id, condition_code, progress
);
}

fn abandoned_indication(
&mut self,
id: &TransactionId,
condition_code: ConditionCode,
progress: u64,
) {
println!(
"User: Abandoned indication for {:?} with condition {:?} and progress {}",
id, condition_code, progress
);
}

fn eof_recvd_indication(&mut self, id: &TransactionId) {
println!("User: EOF received indication for {:?}", id);
}
}

const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);

fn main() {
let test_data = b"Hello PVDX";
println!("Original data: {:?}", String::from_utf8_lossy(test_data));
println!("Data length: {} bytes\n", test_data.len());

let (pdu_tx_to_dest, pdu_rx_from_source) = mpsc::channel::<PduOwnedWithInfo>();
let (pdu_tx_to_source, pdu_rx_from_dest) = mpsc::channel::<PduOwnedWithInfo>();

let stop_threads = Arc::new(AtomicBool::new(false));
let stop_threads_source = stop_threads.clone();
let stop_threads_dest = stop_threads.clone();

let source_finished = Arc::new(AtomicBool::new(false));
let source_finished_user = source_finished.clone();

let dest_finished = Arc::new(AtomicBool::new(false));
let dest_finished_user = dest_finished.clone();

let source_filestore = NoFsFilestore::new();
source_filestore.add_file("source.txt", test_data.to_vec());

let dest_filestore = NoFsFilestore::new();
let dest_filestore_clone = dest_filestore.clone();

let source_thread = thread::spawn(move || {
let local_cfg = LocalEntityConfig {
id: LOCAL_ID.into(),
indication_cfg: IndicationConfig::default(),
fault_handler: FaultHandler::new(DummyFaultHook::default()),
};
let remote_cfg = RemoteEntityConfig::new_with_default_values(
REMOTE_ID.into(),
1024,
false,
false,
TransmissionMode::Acknowledged,
ChecksumType::NullChecksum,
);

let mut source_handler = SourceHandler::new(
local_cfg,
pdu_tx_to_dest,
source_filestore,
remote_cfg,
StdTimerCreator::default(),
SequenceCounterSimple::<u16>::default(),
);

let put_request =
PutRequest::new_regular_request(REMOTE_ID.into(), "source.txt", "dest.txt", None, None)
.unwrap();
source_handler.put_request(&put_request).unwrap();

let mut user = PrintUser {
transaction_finished: source_finished_user,
};

while !stop_threads_source.load(Ordering::Relaxed) {
let pdu_from_dest = pdu_rx_from_dest.try_recv().ok();
source_handler
.state_machine(&mut user, pdu_from_dest.as_ref())
.unwrap();
thread::sleep(Duration::from_millis(100));
}
});

let dest_thread = thread::spawn(move || {
let local_cfg = LocalEntityConfig {
id: REMOTE_ID.into(),
indication_cfg: IndicationConfig::default(),
fault_handler: FaultHandler::new(DummyFaultHook::default()),
};
let remote_cfg = RemoteEntityConfig::new_with_default_values(
LOCAL_ID.into(),
1024,
false,
false,
TransmissionMode::Acknowledged,
ChecksumType::NullChecksum,
);

let mut dest_handler = DestinationHandler::new(
local_cfg,
pdu_tx_to_source,
dest_filestore,
remote_cfg,
StdTimerCreator::default(),
cfdp::lost_segments::LostSegmentsList::default(),
);

let mut user = PrintUser {
transaction_finished: dest_finished_user,
};

while !stop_threads_dest.load(Ordering::Relaxed) {
if let Ok(pdu_from_source) = pdu_rx_from_source.try_recv() {
dest_handler
.state_machine(&mut user, Some(&pdu_from_source))
.unwrap();
}
thread::sleep(Duration::from_millis(100));
}
});

// Wait for both threads to finish
while !source_finished.load(Ordering::Relaxed) || !dest_finished.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(100));
}

stop_threads.store(true, Ordering::Relaxed);

source_thread.join().unwrap();
dest_thread.join().unwrap();

println!("\n=== Transfer Complete ===");

if let Some(received_data) = dest_filestore_clone.get_file("dest.txt") {
println!(
"Received data: {:?}",
String::from_utf8_lossy(&received_data)
);
println!("Received length: {} bytes", received_data.len());

if received_data == test_data {
println!("\nSUCCESS: Data transferred correctly!");
} else {
println!("\nERROR: Data mismatch!");
println!("Expected: {:?}", String::from_utf8_lossy(test_data));
println!("Got: {:?}", String::from_utf8_lossy(&received_data));
}
} else {
println!("\nERROR: No data received at destination!");
}

println!("\nExample finished.");
}