diff --git a/Cargo.toml b/Cargo.toml index c4ee831..b3a3bce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "binseq" -version = "0.7.8" +version = "0.8.0" edition = "2021" description = "A high efficiency binary format for sequencing data" license = "MIT" diff --git a/examples/parallel_range.rs b/examples/parallel_range.rs index a7206b6..e17a1d4 100644 --- a/examples/parallel_range.rs +++ b/examples/parallel_range.rs @@ -8,7 +8,6 @@ struct RangeProcessor { tid: Option, range_start: usize, range_end: usize, - sbuf: Vec, } impl RangeProcessor { @@ -18,7 +17,6 @@ impl RangeProcessor { tid: None, range_start, range_end, - sbuf: Vec::new(), } } @@ -34,9 +32,6 @@ impl ParallelProcessor for RangeProcessor { // Print progress every 10,000 records if count % 10_000 == 0 { if let Some(tid) = self.tid { - // Decode the sequence to get its length - self.sbuf.clear(); - record.decode_s(&mut self.sbuf)?; println!( "Thread {}: Processed {} records (Range: {}-{}, Index: {}, Len: {})", tid, @@ -44,7 +39,7 @@ impl ParallelProcessor for RangeProcessor { self.range_start, self.range_end, record.index(), - self.sbuf.len() + record.sseq().len(), ); } } diff --git a/examples/read_write.rs b/examples/read_write.rs index 186acee..4e9b7c4 100644 --- a/examples/read_write.rs +++ b/examples/read_write.rs @@ -44,18 +44,18 @@ fn read_write_single(fastq_path: &str, binseq_path: &str, seq_size: usize) -> Re // Read the binary sequence let reader = MmapReader::new(binseq_path)?; let mut num_records_read = 0; - let mut record_buffer = Vec::new(); + let mut sbuf = Vec::new(); for idx in 0..reader.num_records() { let record = reader.get(idx)?; - record.decode_s(&mut record_buffer)?; + record.decode_s(&mut sbuf)?; // Check if the decoded sequence matches the original - let buf_str = std::str::from_utf8(&record_buffer)?; + let buf_str = std::str::from_utf8(&sbuf)?; let seq_str = std::str::from_utf8(&all_sequences[num_records_read])?; assert_eq!(buf_str, seq_str); num_records_read += 1; - record_buffer.clear(); + sbuf.clear(); } eprintln!("Finished reading {num_records_read} records (mmap)"); eprintln!( @@ -131,13 +131,14 @@ fn read_write_paired( // Read the binary sequence with mmap let reader = MmapReader::new(binseq_path)?; - let mut sbuf = Vec::new(); - let mut xbuf = Vec::new(); let mut n_processed = 0; + let mut sbuf = Vec::new(); + let mut xbuf = Vec::new(); for idx in 0..reader.num_records() { let record = reader.get(idx)?; + record.decode_s(&mut sbuf)?; record.decode_x(&mut xbuf)?; @@ -151,10 +152,9 @@ fn read_write_paired( assert_eq!(s_str, s_exp); assert_eq!(x_str, x_exp); + n_processed += 1; sbuf.clear(); xbuf.clear(); - - n_processed += 1; } eprintln!("Finished reading {n_processed} records"); diff --git a/src/bq/reader.rs b/src/bq/reader.rs index 8ce3d91..a60889c 100644 --- a/src/bq/reader.rs +++ b/src/bq/reader.rs @@ -41,6 +41,10 @@ pub struct RefRecord<'a> { buffer: &'a [u64], /// The configuration that defines the layout and size of record components config: RecordConfig, + /// Cached index string for the sequence header + header_buf: [u8; 20], + /// Length of the header in bytes + header_len: usize, } impl<'a> RefRecord<'a> { /// Creates a new record reference @@ -57,7 +61,13 @@ impl<'a> RefRecord<'a> { #[must_use] pub fn new(id: u64, buffer: &'a [u64], config: RecordConfig) -> Self { assert_eq!(buffer.len(), config.record_size_u64()); - Self { id, buffer, config } + Self { + id, + buffer, + config, + header_buf: [0; 20], + header_len: 0, + } } /// Returns the record's configuration /// @@ -66,6 +76,11 @@ impl<'a> RefRecord<'a> { pub fn config(&self) -> RecordConfig { self.config } + + pub fn set_id(&mut self, id: &[u8]) { + self.header_len = id.len(); + self.header_buf[..self.header_len].copy_from_slice(id); + } } impl BinseqRecord for RefRecord<'_> { @@ -76,16 +91,76 @@ impl BinseqRecord for RefRecord<'_> { self.id } /// Clear the buffer and fill it with the sequence header - fn sheader(&self, buffer: &mut Vec) { - buffer.clear(); - buffer.extend_from_slice(itoa::Buffer::new().format(self.id).as_bytes()); + fn sheader(&self) -> &[u8] { + &self.header_buf[..self.header_len] + } + + /// Clear the buffer and fill it with the extended header + fn xheader(&self) -> &[u8] { + self.sheader() + } + + fn flag(&self) -> Option { + if self.config.flags { + Some(self.buffer[0]) + } else { + None + } + } + fn slen(&self) -> u64 { + self.config.slen + } + fn xlen(&self) -> u64 { + self.config.xlen + } + fn sbuf(&self) -> &[u64] { + if self.config.flags { + &self.buffer[1..=(self.config.schunk as usize)] + } else { + &self.buffer[..(self.config.schunk as usize)] + } + } + fn xbuf(&self) -> &[u64] { + if self.config.flags { + &self.buffer[1 + self.config.schunk as usize..] + } else { + &self.buffer[self.config.schunk as usize..] + } + } +} + +/// A reference to a record in the map with a precomputed decoded buffer slice +pub struct BatchRecord<'a> { + /// Unprocessed buffer slice (with flags) + buffer: &'a [u64], + /// Decoded buffer slice + dbuf: &'a [u8], + /// Record ID + id: u64, + /// The configuration that defines the layout and size of record components + config: RecordConfig, + /// Cached index string for the sequence header + header_buf: [u8; 20], + /// Length of the header in bytes + header_len: usize, +} +impl BinseqRecord for BatchRecord<'_> { + fn bitsize(&self) -> BitSize { + self.config.bitsize + } + fn index(&self) -> u64 { + self.id + } + /// Clear the buffer and fill it with the sequence header + fn sheader(&self) -> &[u8] { + &self.header_buf[..self.header_len] } /// Clear the buffer and fill it with the extended header - fn xheader(&self, buffer: &mut Vec) { - buffer.clear(); - buffer.extend_from_slice(itoa::Buffer::new().format(self.id).as_bytes()); + fn xheader(&self) -> &[u8] { + self.sheader() } + fn flag(&self) -> Option { if self.config.flags { Some(self.buffer[0]) @@ -113,6 +188,32 @@ impl BinseqRecord for RefRecord<'_> { &self.buffer[self.config.schunk as usize..] } } + fn decode_s(&self, dbuf: &mut Vec) -> Result<()> { + dbuf.extend_from_slice(self.sseq()); + Ok(()) + } + fn decode_x(&self, dbuf: &mut Vec) -> Result<()> { + dbuf.extend_from_slice(self.xseq()); + Ok(()) + } + /// Override this method since we can make use of block information + fn sseq(&self) -> &[u8] { + if self.config.flags { + let scalar = self.config.scalar(); + &self.dbuf[scalar..scalar + self.config.slen()] + } else { + &self.dbuf[..self.config.slen()] + } + } + /// Override this method since we can make use of block information + fn xseq(&self) -> &[u8] { + if self.config.flags { + let scalar = self.config.scalar(); + &self.dbuf[scalar + self.config.slen()..] + } else { + &self.dbuf[self.config.slen()..] + } + } } /// Configuration for binary sequence record layout @@ -242,6 +343,14 @@ impl RecordConfig { (self.schunk + self.xchunk) as usize } } + + /// The number of nucleotides per word + pub fn scalar(&self) -> usize { + match self.bitsize { + BitSize::Two => 32, + BitSize::Four => 16, + } + } } /// A memory-mapped reader for binary sequence files @@ -385,6 +494,23 @@ impl MmapReader { let buffer = cast_slice(bytes); Ok(RefRecord::new(idx as u64, buffer, self.config)) } + + /// Returns a slice of the buffer containing the underlying u64 for that range + /// of records. + /// + /// Note: range 10..40 will return all u64s in the mmap between the record index 10 and 40 + pub fn get_buffer_slice(&self, range: Range) -> Result<&[u64]> { + if range.end > self.num_records() { + return Err(ReadError::OutOfRange(range.end, self.num_records()).into()); + } + let rsize = self.config.record_size_bytes(); + let total_records = range.end - range.start; + let lbound = SIZE_HEADER + (range.start * rsize); + let rbound = lbound + (total_records * rsize); + let bytes = &self.mmap[lbound..rbound]; + let buffer = cast_slice(bytes); + Ok(buffer) + } } /// A reader for streaming binary sequence data from any source that implements Read @@ -713,15 +839,71 @@ impl ParallelReader for MmapReader { return Ok(()); // No records for this thread } - for (batch_idx, idx) in (start_idx..end_idx).enumerate() { - let record = reader.get(idx)?; - processor.process_record(record)?; + // create a reusable buffer for translating record IDs + let mut translater = itoa::Buffer::new(); + + // initialize a decoding buffer + let mut dbuf = Vec::new(); - if batch_idx % BATCH_SIZE == 0 { - processor.on_batch_complete()?; + // calculate the size of a record in the cast u64 slice + let rsize_u64 = reader.config.record_size_bytes() / 8; + + // determine the required scalar size + let scalar = reader.config.scalar(); + + // calculate the size of a record in the batch decoded buffer + let mut dbuf_rsize = { (reader.config.schunk() + reader.config.xchunk()) * scalar }; + if reader.config.flags { + dbuf_rsize += scalar; + } + + // iterate over the range of indices + for range_start in (start_idx..end_idx).step_by(BATCH_SIZE) { + let range_end = (range_start + BATCH_SIZE).min(end_idx); + + // clear the decoded buffer + dbuf.clear(); + + // get the encoded buffer slice + let ebuf = reader.get_buffer_slice(range_start..range_end)?; + + // decode the entire buffer at once (with flags and extra bases) + reader + .config + .bitsize + .decode(ebuf, ebuf.len() * scalar, &mut dbuf)?; + + // iterate over each index in the range + for (inner_idx, idx) in (range_start..range_end).enumerate() { + // translate the index + let id_str = translater.format(idx); + + // create the index buffer + let mut header_buf = [0; 20]; + let header_len = id_str.len(); + header_buf[..header_len].copy_from_slice(id_str.as_bytes()); + + // find the buffer starts + let ebuf_start = inner_idx * rsize_u64; + let dbuf_start = inner_idx * dbuf_rsize; + + // initialize the record + let record = BatchRecord { + buffer: &ebuf[ebuf_start..(ebuf_start + rsize_u64)], + dbuf: &dbuf[dbuf_start..(dbuf_start + dbuf_rsize)], + id: idx as u64, + config: reader.config, + header_buf, + header_len, + }; + + // process the record + processor.process_record(record)?; } + + // process the batch + processor.on_batch_complete()?; } - processor.on_batch_complete()?; Ok(()) }); diff --git a/src/context/traits.rs b/src/context/traits.rs index 84d5055..9712d9d 100644 --- a/src/context/traits.rs +++ b/src/context/traits.rs @@ -22,6 +22,7 @@ pub trait SequenceContext { self.xbuf_mut().clear(); } #[inline] + #[allow(deprecated)] fn fill_sequences(&mut self, record: &R) -> Result<()> { self.clear_sequences(); record.decode_s(self.sbuf_mut())?; @@ -99,9 +100,10 @@ pub trait HeaderContext { #[inline] fn fill_headers(&mut self, record: &R) { - record.sheader(self.sheader_mut()); + self.clear_headers(); + self.sheader_mut().extend_from_slice(record.sheader()); if record.is_paired() { - record.xheader(self.xheader_mut()); + self.xheader_mut().extend_from_slice(record.xheader()); } } } diff --git a/src/parallel.rs b/src/parallel.rs index c0ba298..b46e49f 100644 --- a/src/parallel.rs +++ b/src/parallel.rs @@ -26,6 +26,18 @@ impl BinseqReader { } } + /// Set whether to decode sequences at once in each block + /// + /// Note: This setting applies to VBQ readers only. + pub fn set_decode_block(&mut self, decode_block: bool) { + match self { + Self::Bq(_) => { + // no-op + } + Self::Vbq(reader) => reader.set_decode_block(decode_block), + } + } + #[must_use] pub fn is_paired(&self) -> bool { match self { diff --git a/src/record.rs b/src/record.rs index 29a1a77..93c1570 100644 --- a/src/record.rs +++ b/src/record.rs @@ -22,11 +22,11 @@ pub trait BinseqRecord { /// Returns the flag value of this record fn flag(&self) -> Option; - /// Fills a buffer with the header of this record. - fn sheader(&self, buffer: &mut Vec); + /// Returns the header of this record + fn sheader(&self) -> &[u8]; - /// Fills a buffer with the header of the extended/paired sequence (empty if not paired) - fn xheader(&self, buffer: &mut Vec); + /// Returns the header of the extended/paired sequence (empty if not paired) + fn xheader(&self) -> &[u8]; /// Returns the length of the primary sequence of this record fn slen(&self) -> u64; @@ -70,6 +70,22 @@ pub trait BinseqRecord { Ok(()) } + /// Returns a reference to the primary decoded sequence of this record. + /// + /// This is not available on all types that implement the `Record` trait. + /// It should be available on types that implement it in this library however. + fn sseq(&self) -> &[u8] { + unimplemented!("This record does not implement direct sequence access"); + } + + /// Returns a reference to the extended decoded sequence of this record. + /// + /// This may not be available on all types that implement the `Record` trait. + /// It should be available on types that implement it in this library however. + fn xseq(&self) -> &[u8] { + unimplemented!("This record does not implement direct sequence access"); + } + /// Decodes the primary sequence of this record into a newly allocated buffer. /// /// Not advised to use this function as it allocates a new buffer every time. diff --git a/src/vbq/mod.rs b/src/vbq/mod.rs index 4e1b4ee..f46fb7a 100644 --- a/src/vbq/mod.rs +++ b/src/vbq/mod.rs @@ -129,16 +129,14 @@ //! //! // Process blocks one at a time //! let mut seq_buffer = Vec::new(); -//! let mut header_buffer = Vec::new(); //! while reader.read_block_into(&mut block).unwrap() { //! for record in block.iter() { //! record.decode_s(&mut seq_buffer).unwrap(); -//! record.sheader(&mut header_buffer); -//! println!("Header: {}", std::str::from_utf8(&header_buffer).unwrap()); +//! let header = record.sheader(); +//! println!("Header: {}", std::str::from_utf8(header).unwrap()); //! println!("Sequence: {}", std::str::from_utf8(&seq_buffer).unwrap()); //! println!("Quality: {}", std::str::from_utf8(record.squal()).unwrap()); //! seq_buffer.clear(); -//! header_buffer.clear(); //! } //! } //! # std::fs::remove_file("example.vbq").unwrap_or(()); diff --git a/src/vbq/reader.rs b/src/vbq/reader.rs index 41fd4ac..2973457 100644 --- a/src/vbq/reader.rs +++ b/src/vbq/reader.rs @@ -36,19 +36,15 @@ //! let mut block = reader.new_block(); //! //! // Read records with headers and quality scores -//! let mut seq_buffer = Vec::new(); -//! let mut header_buffer = Vec::new(); //! while reader.read_block_into(&mut block).unwrap() { //! for record in block.iter() { -//! record.decode_s(&mut seq_buffer).unwrap(); -//! record.sheader(&mut header_buffer); -//! println!("Header: {}", std::str::from_utf8(&header_buffer).unwrap()); -//! println!("Sequence: {}", std::str::from_utf8(&seq_buffer).unwrap()); +//! let seq = record.sseq(); +//! let header = record.sheader(); +//! println!("Header: {}", std::str::from_utf8(header).unwrap()); +//! println!("Sequence: {}", std::str::from_utf8(seq).unwrap()); //! if !record.squal().is_empty() { //! println!("Quality: {}", std::str::from_utf8(record.squal()).unwrap()); //! } -//! seq_buffer.clear(); -//! header_buffer.clear(); //! } //! } //! ``` @@ -180,6 +176,9 @@ pub struct RecordBlock { /// Reusable zstd decompression context dctx: zstd_safe::DCtx<'static>, + + /// Reusable decoding buffer for the block + dbuf: Vec, } impl RecordBlock { /// Creates a new empty `RecordBlock` with the specified block size @@ -205,6 +204,7 @@ impl RecordBlock { records: Vec::default(), sequences: Vec::default(), rbuf: Vec::default(), + dbuf: Vec::default(), dctx: zstd_safe::DCtx::create(), } } @@ -270,6 +270,7 @@ impl RecordBlock { self.index = 0; self.records.clear(); self.sequences.clear(); + self.dbuf.clear(); // Note: We keep rbuf allocated for reuse } @@ -430,24 +431,95 @@ impl RecordBlock { slen, xlen, s_seq_span, - x_seq_span, s_qual_span, s_header_span, + x_seq_span, x_qual_span, x_header_span, }); } } + + /// Decodes all sequences in the block at once. + /// + /// Note: + /// Each record's sequence is padded internally to the nearest u64. + /// Because of this the global decoding will include nucleotides that are not present in the original data. + /// We track the non-contiguous regions of the sequence separately. + pub fn decode_all(&mut self) -> Result<()> { + if self.sequences.is_empty() { + return Ok(()); + } + self.dbuf.clear(); + match self.bitsize { + BitSize::Two => { + let num_bp = self.sequences.len() * 32; + bitnuc::twobit::decode(&self.sequences, num_bp, &mut self.dbuf) + } + BitSize::Four => { + let num_bp = self.sequences.len() * 16; + bitnuc::fourbit::decode(&self.sequences, num_bp, &mut self.dbuf) + } + }?; + Ok(()) + } + + /// Get decoded primary sequence for a record by index + #[must_use] + pub fn get_decoded_s(&self, record_idx: usize) -> Option<&[u8]> { + let meta = self.records.get(record_idx)?; + if self.dbuf.is_empty() { + return None; + } + + let bases_per_word = match self.bitsize { + BitSize::Two => 32, + BitSize::Four => 16, + }; + + // Calculate offset in decoded buffer (accounting for padding) + let offset = meta.s_seq_span.offset * bases_per_word; + let len = meta.slen as usize; + + Some(&self.dbuf[offset..offset + len]) + } + + /// Get decoded extended sequence for a record by index + #[must_use] + pub fn get_decoded_x(&self, record_idx: usize) -> Option<&[u8]> { + let meta = self.records.get(record_idx)?; + if meta.xlen == 0 { + return Some(&[]); + } + if self.dbuf.is_empty() { + return None; + } + + let bases_per_word = match self.bitsize { + BitSize::Two => 32, + BitSize::Four => 16, + }; + + let offset = meta.x_seq_span.offset * bases_per_word; + let len = meta.xlen as usize; + + Some(&self.dbuf[offset..offset + len]) + } } pub struct RecordBlockIter<'a> { block: &'a RecordBlock, pos: usize, + header_buffer: itoa::Buffer, } impl<'a> RecordBlockIter<'a> { #[must_use] pub fn new(block: &'a RecordBlock) -> Self { - Self { block, pos: 0 } + Self { + block, + pos: 0, + header_buffer: itoa::Buffer::new(), + } } } impl<'a> Iterator for RecordBlockIter<'a> { @@ -460,11 +532,26 @@ impl<'a> Iterator for RecordBlockIter<'a> { let meta = &self.block.records[self.pos]; let index = (self.block.index + self.pos) as u64; - self.pos += 1; + let index_in_block = self.pos; + + let mut header_buf = [0; 20]; + let mut header_len = 0; + if meta.s_header_span.len == 0 && meta.x_header_span.len == 0 { + let header_str = self.header_buffer.format(index); + header_len = header_str.len(); + header_buf[..header_len].copy_from_slice(header_str.as_bytes()); + } + + // increment position + { + self.pos += 1; + } Some(RefRecord { + block: self.block, bitsize: self.block.bitsize, index, + index_in_block, flag: meta.flag, slen: meta.slen, xlen: meta.xlen, @@ -476,14 +563,18 @@ impl<'a> Iterator for RecordBlockIter<'a> { xqual: meta.x_qual_span.slice(&self.block.rbuf), sheader: meta.s_header_span.slice(&self.block.rbuf), xheader: meta.x_header_span.slice(&self.block.rbuf), + header_buf, + header_len, }) } } /// Zero-copy record reference pub struct RefRecord<'a> { + block: &'a RecordBlock, bitsize: BitSize, index: u64, + index_in_block: usize, flag: Option, slen: u64, xlen: u64, @@ -493,9 +584,11 @@ pub struct RefRecord<'a> { xqual: &'a [u8], sheader: &'a [u8], xheader: &'a [u8], + header_buf: [u8; 20], + header_len: usize, } -impl<'a> BinseqRecord for RefRecord<'a> { +impl BinseqRecord for RefRecord<'_> { fn bitsize(&self) -> BitSize { self.bitsize } @@ -504,21 +597,19 @@ impl<'a> BinseqRecord for RefRecord<'a> { self.index } - fn sheader(&self, buffer: &mut Vec) { - buffer.clear(); + fn sheader(&self) -> &[u8] { if self.sheader.is_empty() { - buffer.extend_from_slice(itoa::Buffer::new().format(self.index).as_bytes()); + &self.header_buf[..self.header_len] } else { - buffer.extend_from_slice(self.sheader); + self.sheader } } - fn xheader(&self, buffer: &mut Vec) { - buffer.clear(); + fn xheader(&self) -> &[u8] { if self.xheader.is_empty() { - buffer.extend_from_slice(itoa::Buffer::new().format(self.index).as_bytes()); + &self.header_buf[..self.header_len] } else { - buffer.extend_from_slice(self.xheader); + self.xheader } } @@ -549,6 +640,42 @@ impl<'a> BinseqRecord for RefRecord<'a> { fn xqual(&self) -> &[u8] { self.xqual } + + /// Override this method since we can make use of block information + fn decode_s(&self, buf: &mut Vec) -> Result<()> { + if let Some(decoded) = self.block.get_decoded_s(self.index_in_block) { + buf.extend_from_slice(decoded); + } else { + self.bitsize() + .decode(self.sbuf(), self.slen() as usize, buf)?; + } + Ok(()) + } + + /// Override this method since we can make use of block information + fn decode_x(&self, buf: &mut Vec) -> Result<()> { + if let Some(decoded) = self.block.get_decoded_x(self.index_in_block) { + buf.extend_from_slice(decoded); + } else { + self.bitsize() + .decode(self.xbuf(), self.xlen() as usize, buf)?; + } + Ok(()) + } + + /// Override this method since we can make use of block information + fn sseq(&self) -> &[u8] { + self.block + .get_decoded_s(self.index_in_block) + .expect("Reader was built without batch-decoding") + } + + /// Override this method since we can make use of block information + fn xseq(&self) -> &[u8] { + self.block + .get_decoded_x(self.index_in_block) + .expect("Reader was built without batch-decoding") + } } /// Memory-mapped reader for VBINSEQ files @@ -589,13 +716,13 @@ impl<'a> BinseqRecord for RefRecord<'a> { /// use binseq::vbq::MmapReader; /// use binseq::{BinseqRecord, Result}; /// +/// #[allow(deprecated)] /// fn main() -> Result<()> { /// let path = "./data/subset.vbq"; /// let mut reader = MmapReader::new(path)?; // Index loaded automatically /// /// // Create buffers for sequence data and headers /// let mut seq_buffer = Vec::new(); -/// let mut header_buffer = Vec::new(); /// let mut block = reader.new_block(); /// /// // Read blocks sequentially @@ -604,13 +731,12 @@ impl<'a> BinseqRecord for RefRecord<'a> { /// for record in block.iter() { /// // Decode sequence and header /// record.decode_s(&mut seq_buffer)?; -/// record.sheader(&mut header_buffer); +/// let header = record.sheader(); /// -/// println!("Header: {}", std::str::from_utf8(&header_buffer).unwrap_or("")); +/// println!("Header: {}", std::str::from_utf8(&header).unwrap_or("")); /// println!("Sequence: {}", std::str::from_utf8(&seq_buffer).unwrap_or("")); /// /// seq_buffer.clear(); -/// header_buffer.clear(); /// } /// } /// Ok(()) @@ -631,6 +757,9 @@ pub struct MmapReader { /// Total number of records read from the file so far total: usize, + + /// Whether to decode sequences at once in each block + decode_block: bool, } impl MmapReader { /// Creates a new `MmapReader` for a VBINSEQ file @@ -689,6 +818,7 @@ impl MmapReader { header, pos: SIZE_HEADER, total: 0, + decode_block: true, }) } @@ -714,6 +844,24 @@ impl MmapReader { RecordBlock::new(self.header.bits, self.header.block as usize) } + /// Sets whether to decode sequences at once in each block + /// + /// # Arguments + /// + /// * `decode_block` - Whether to decode sequences at once in each block + /// + /// # Examples + /// + /// ```rust,no_run + /// use binseq::vbq::MmapReader; + /// + /// let mut reader = MmapReader::new("example.vbq").unwrap(); + /// reader.set_decode_block(false); + /// ``` + pub fn set_decode_block(&mut self, decode_block: bool) { + self.decode_block = decode_block; + } + /// Returns the path where the index file would be located /// /// The index file is used for random access to blocks and has the same path as @@ -1166,6 +1314,11 @@ impl ParallelReader for MmapReader { // Update the record block index record_block.update_index(block_range.cumulative_records as usize); + // decode the data + if self.decode_block { + record_block.decode_all()?; + } + // Process records in this block that fall within our range for record in record_block.iter() { let global_record_idx = record.index as usize;