Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "binseq"
version = "0.7.8"
version = "0.8.0"
edition = "2021"
description = "A high efficiency binary format for sequencing data"
license = "MIT"
Expand Down
7 changes: 1 addition & 6 deletions examples/parallel_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ struct RangeProcessor {
tid: Option<usize>,
range_start: usize,
range_end: usize,
sbuf: Vec<u8>,
}

impl RangeProcessor {
Expand All @@ -18,7 +17,6 @@ impl RangeProcessor {
tid: None,
range_start,
range_end,
sbuf: Vec::new(),
}
}

Expand All @@ -34,17 +32,14 @@ impl ParallelProcessor for RangeProcessor {
// Print progress every 10,000 records
if count % 10_000 == 0 {
if let Some(tid) = self.tid {
// Decode the sequence to get its length
self.sbuf.clear();
record.decode_s(&mut self.sbuf)?;
println!(
"Thread {}: Processed {} records (Range: {}-{}, Index: {}, Len: {})",
tid,
count + 1,
self.range_start,
self.range_end,
record.index(),
self.sbuf.len()
record.sseq().len(),
);
}
}
Expand Down
16 changes: 8 additions & 8 deletions examples/read_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ fn read_write_single(fastq_path: &str, binseq_path: &str, seq_size: usize) -> Re
// Read the binary sequence
let reader = MmapReader::new(binseq_path)?;
let mut num_records_read = 0;
let mut record_buffer = Vec::new();
let mut sbuf = Vec::new();
for idx in 0..reader.num_records() {
let record = reader.get(idx)?;
record.decode_s(&mut record_buffer)?;
record.decode_s(&mut sbuf)?;

// Check if the decoded sequence matches the original
let buf_str = std::str::from_utf8(&record_buffer)?;
let buf_str = std::str::from_utf8(&sbuf)?;
let seq_str = std::str::from_utf8(&all_sequences[num_records_read])?;
assert_eq!(buf_str, seq_str);

num_records_read += 1;
record_buffer.clear();
sbuf.clear();
}
eprintln!("Finished reading {num_records_read} records (mmap)");
eprintln!(
Expand Down Expand Up @@ -131,13 +131,14 @@ fn read_write_paired(

// Read the binary sequence with mmap
let reader = MmapReader::new(binseq_path)?;
let mut sbuf = Vec::new();
let mut xbuf = Vec::new();

let mut n_processed = 0;
let mut sbuf = Vec::new();
let mut xbuf = Vec::new();

for idx in 0..reader.num_records() {
let record = reader.get(idx)?;

record.decode_s(&mut sbuf)?;
record.decode_x(&mut xbuf)?;

Expand All @@ -151,10 +152,9 @@ fn read_write_paired(
assert_eq!(s_str, s_exp);
assert_eq!(x_str, x_exp);

n_processed += 1;
sbuf.clear();
xbuf.clear();

n_processed += 1;
}
eprintln!("Finished reading {n_processed} records");

Expand Down
208 changes: 195 additions & 13 deletions src/bq/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ pub struct RefRecord<'a> {
buffer: &'a [u64],
/// The configuration that defines the layout and size of record components
config: RecordConfig,
/// Cached index string for the sequence header
header_buf: [u8; 20],
/// Length of the header in bytes
header_len: usize,
}
impl<'a> RefRecord<'a> {
/// Creates a new record reference
Expand All @@ -57,7 +61,13 @@ impl<'a> RefRecord<'a> {
#[must_use]
pub fn new(id: u64, buffer: &'a [u64], config: RecordConfig) -> Self {
assert_eq!(buffer.len(), config.record_size_u64());
Self { id, buffer, config }
Self {
id,
buffer,
config,
header_buf: [0; 20],
header_len: 0,
}
}
/// Returns the record's configuration
///
Expand All @@ -66,6 +76,11 @@ impl<'a> RefRecord<'a> {
pub fn config(&self) -> RecordConfig {
self.config
}

/// Stores `id` as the record's cached header bytes.
///
/// The bytes are copied into the fixed-size header buffer; `sheader`
/// subsequently returns exactly these bytes.
///
/// # Panics
///
/// Panics if `id` is longer than the header buffer (20 bytes).
pub fn set_id(&mut self, id: &[u8]) {
    // Validate before touching any state so a rejected id leaves the
    // previously cached header intact.
    assert!(
        id.len() <= self.header_buf.len(),
        "record id is {} bytes but the header buffer holds at most {}",
        id.len(),
        self.header_buf.len()
    );
    self.header_buf[..id.len()].copy_from_slice(id);
    self.header_len = id.len();
}
}

impl BinseqRecord for RefRecord<'_> {
Expand All @@ -76,16 +91,76 @@ impl BinseqRecord for RefRecord<'_> {
self.id
}
/// Clear the buffer and fill it with the sequence header
fn sheader(&self, buffer: &mut Vec<u8>) {
buffer.clear();
buffer.extend_from_slice(itoa::Buffer::new().format(self.id).as_bytes());
fn sheader(&self) -> &[u8] {
&self.header_buf[..self.header_len]
}

/// Returns the extended header as a byte slice
///
/// This record type keeps a single cached header, so the extended header
/// is the same slice that `sheader` returns.
fn xheader(&self) -> &[u8] {
self.sheader()
}

/// Returns the record's flag word, if the configuration enables flags.
///
/// When `config.flags` is set the flag is the first `u64` of the record
/// buffer; otherwise there is no flag and `None` is returned.
fn flag(&self) -> Option<u64> {
    self.config.flags.then(|| self.buffer[0])
}
/// Returns the primary sequence length recorded in the configuration (`config.slen`)
fn slen(&self) -> u64 {
self.config.slen
}
/// Returns the extended sequence length recorded in the configuration (`config.xlen`)
fn xlen(&self) -> u64 {
self.config.xlen
}
/// Returns the encoded words of the primary sequence.
///
/// The slice covers `config.schunk` words. It begins right after the
/// leading flag word when `config.flags` is set, and at the start of the
/// record buffer otherwise.
fn sbuf(&self) -> &[u64] {
    let skip = usize::from(self.config.flags);
    let words = self.config.schunk as usize;
    &self.buffer[skip..skip + words]
}
/// Returns the encoded words of the extended sequence.
///
/// This is everything in the record buffer that follows the optional flag
/// word and the `config.schunk` primary-sequence words.
fn xbuf(&self) -> &[u64] {
    let start = usize::from(self.config.flags) + self.config.schunk as usize;
    &self.buffer[start..]
}
}

/// A reference to a record in the map with a precomputed decoded buffer slice
///
/// Unlike a record that decodes on demand, this type carries both the raw
/// encoded words and the already-decoded bytes for the same record, so the
/// sequence accessors can return slices of `dbuf` directly.
pub struct BatchRecord<'a> {
/// Unprocessed (still encoded) buffer slice for this record, including the
/// leading flag word when the configuration enables flags
buffer: &'a [u64],
/// Decoded bytes for this record, sliced out of a buffer that was decoded
/// for a whole batch at once
dbuf: &'a [u8],
/// Record ID (returned by `index`)
id: u64,
/// The configuration that defines the layout and size of record components
config: RecordConfig,
/// Cached index string for the sequence header; 20 bytes is enough for the
/// decimal digits of any `u64`
header_buf: [u8; 20],
/// Number of leading bytes of `header_buf` that hold the header
header_len: usize,
}
impl BinseqRecord for BatchRecord<'_> {
/// Returns the bit size taken from the record's configuration
fn bitsize(&self) -> BitSize {
self.config.bitsize
}
/// Returns the record's index, which is the stored record ID
fn index(&self) -> u64 {
self.id
}
/// Returns the cached sequence header: the first `header_len` bytes of `header_buf`
fn sheader(&self) -> &[u8] {
&self.header_buf[..self.header_len]
}

/// Clear the buffer and fill it with the extended header
fn xheader(&self, buffer: &mut Vec<u8>) {
buffer.clear();
buffer.extend_from_slice(itoa::Buffer::new().format(self.id).as_bytes());
fn xheader(&self) -> &[u8] {
self.sheader()
}

fn flag(&self) -> Option<u64> {
if self.config.flags {
Some(self.buffer[0])
Expand Down Expand Up @@ -113,6 +188,32 @@ impl BinseqRecord for RefRecord<'_> {
&self.buffer[self.config.schunk as usize..]
}
}
/// Appends the already-decoded primary sequence to `dbuf`
///
/// The bytes are copied from `sseq()`; `dbuf` is not cleared first, so any
/// existing contents are kept and the sequence is added after them.
/// This implementation always returns `Ok(())`.
fn decode_s(&self, dbuf: &mut Vec<u8>) -> Result<()> {
dbuf.extend_from_slice(self.sseq());
Ok(())
}
/// Appends the already-decoded extended sequence to `dbuf`
///
/// The bytes are copied from `xseq()`; `dbuf` is not cleared first, so any
/// existing contents are kept and the sequence is added after them.
/// This implementation always returns `Ok(())`.
fn decode_x(&self, dbuf: &mut Vec<u8>) -> Result<()> {
dbuf.extend_from_slice(self.xseq());
Ok(())
}
/// Returns the decoded primary sequence as a slice of the pre-decoded buffer.
///
/// Overridden because the bases for this record were already decoded as
/// part of a batch, so no per-record decoding is needed. When flags are
/// enabled the first `scalar()` decoded bytes are skipped before taking
/// `slen()` bytes.
fn sseq(&self) -> &[u8] {
    let start = if self.config.flags {
        self.config.scalar()
    } else {
        0
    };
    let end = start + self.config.slen();
    &self.dbuf[start..end]
}
/// Returns the decoded extended sequence as a slice of the pre-decoded buffer.
///
/// Overridden because the bases for this record were already decoded as
/// part of a batch, so no per-record decoding is needed.
///
/// The batch decoder expands every encoded word into `scalar()` bytes, so
/// the decoded record is laid out in whole words: the optional flag word,
/// then `schunk` words of primary sequence (including any padding bases in
/// its last word), then the extended sequence. The extended sequence
/// therefore starts on a word boundary, not at `slen`, and only its first
/// `xlen` bytes are real bases.
fn xseq(&self) -> &[u8] {
    let scalar = self.config.scalar();
    // Words that precede the extended sequence in the decoded layout.
    let lead_words = usize::from(self.config.flags) + self.config.schunk as usize;
    let start = lead_words * scalar;
    // Stop at `xlen` so padding bases from the final word are not exposed.
    let end = start + self.config.xlen as usize;
    &self.dbuf[start..end]
}
}

/// Configuration for binary sequence record layout
Expand Down Expand Up @@ -242,6 +343,14 @@ impl RecordConfig {
(self.schunk + self.xchunk) as usize
}
}

/// The number of nucleotides per `u64` word for the configured bit size
///
/// Returns 32 for `BitSize::Two` and 16 for `BitSize::Four`
/// (presumably 64 bits divided by the bits used per nucleotide).
pub fn scalar(&self) -> usize {
match self.bitsize {
BitSize::Two => 32,
BitSize::Four => 16,
}
}
}

/// A memory-mapped reader for binary sequence files
Expand Down Expand Up @@ -385,6 +494,23 @@ impl MmapReader {
let buffer = cast_slice(bytes);
Ok(RefRecord::new(idx as u64, buffer, self.config))
}

/// Returns the encoded `u64` words backing a contiguous range of records.
///
/// The range is half-open: `10..40` returns the words of records 10 up to,
/// but not including, record 40. An empty range (`start == end`) yields an
/// empty slice.
///
/// # Errors
///
/// Returns [`ReadError::OutOfRange`] when `range.end` exceeds the number of
/// records, or when `range.start` is greater than `range.end`.
pub fn get_buffer_slice(&self, range: Range<usize>) -> Result<&[u64]> {
    let num_records = self.num_records();
    if range.end > num_records {
        return Err(ReadError::OutOfRange(range.end, num_records).into());
    }
    // An inverted range would underflow the byte-offset arithmetic below,
    // so reject it explicitly instead of panicking or wrapping.
    if range.start > range.end {
        return Err(ReadError::OutOfRange(range.start, range.end).into());
    }
    let rsize = self.config.record_size_bytes();
    let lbound = SIZE_HEADER + range.start * rsize;
    let rbound = SIZE_HEADER + range.end * rsize;
    Ok(cast_slice(&self.mmap[lbound..rbound]))
}
}

/// A reader for streaming binary sequence data from any source that implements Read
Expand Down Expand Up @@ -713,15 +839,71 @@ impl ParallelReader for MmapReader {
return Ok(()); // No records for this thread
}

for (batch_idx, idx) in (start_idx..end_idx).enumerate() {
let record = reader.get(idx)?;
processor.process_record(record)?;
// create a reusable buffer for translating record IDs
let mut translater = itoa::Buffer::new();

// initialize a decoding buffer
let mut dbuf = Vec::new();

if batch_idx % BATCH_SIZE == 0 {
processor.on_batch_complete()?;
// calculate the size of a record in the cast u64 slice
let rsize_u64 = reader.config.record_size_bytes() / 8;

// determine the required scalar size
let scalar = reader.config.scalar();

// calculate the size of a record in the batch decoded buffer
let mut dbuf_rsize = { (reader.config.schunk() + reader.config.xchunk()) * scalar };
if reader.config.flags {
dbuf_rsize += scalar;
}

// iterate over the range of indices
for range_start in (start_idx..end_idx).step_by(BATCH_SIZE) {
let range_end = (range_start + BATCH_SIZE).min(end_idx);

// clear the decoded buffer
dbuf.clear();

// get the encoded buffer slice
let ebuf = reader.get_buffer_slice(range_start..range_end)?;

// decode the entire buffer at once (with flags and extra bases)
reader
.config
.bitsize
.decode(ebuf, ebuf.len() * scalar, &mut dbuf)?;

// iterate over each index in the range
for (inner_idx, idx) in (range_start..range_end).enumerate() {
// translate the index
let id_str = translater.format(idx);

// create the index buffer
let mut header_buf = [0; 20];
let header_len = id_str.len();
header_buf[..header_len].copy_from_slice(id_str.as_bytes());

// find the buffer starts
let ebuf_start = inner_idx * rsize_u64;
let dbuf_start = inner_idx * dbuf_rsize;

// initialize the record
let record = BatchRecord {
buffer: &ebuf[ebuf_start..(ebuf_start + rsize_u64)],
dbuf: &dbuf[dbuf_start..(dbuf_start + dbuf_rsize)],
id: idx as u64,
config: reader.config,
header_buf,
header_len,
};

// process the record
processor.process_record(record)?;
}

// process the batch
processor.on_batch_complete()?;
}
processor.on_batch_complete()?;

Ok(())
});
Expand Down
6 changes: 4 additions & 2 deletions src/context/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub trait SequenceContext {
self.xbuf_mut().clear();
}
#[inline]
#[allow(deprecated)]
fn fill_sequences<R: BinseqRecord>(&mut self, record: &R) -> Result<()> {
self.clear_sequences();
record.decode_s(self.sbuf_mut())?;
Expand Down Expand Up @@ -99,9 +100,10 @@ pub trait HeaderContext {

#[inline]
fn fill_headers<R: BinseqRecord>(&mut self, record: &R) {
record.sheader(self.sheader_mut());
self.clear_headers();
self.sheader_mut().extend_from_slice(record.sheader());
if record.is_paired() {
record.xheader(self.xheader_mut());
self.xheader_mut().extend_from_slice(record.xheader());
}
}
}
Loading