Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 71 additions & 38 deletions snapatac2-core/src/preprocessing/bam.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
mod mark_duplicates;
mod header;
mod flagstat;
mod header;
mod mark_duplicates;
pub use flagstat::{filter_bam, BamQC, FlagStat};
pub use mark_duplicates::{group_bam_by_barcode, BarcodeLocation};
pub use flagstat::{filter_bam, FlagStat, BamQC};

use bstr::BString;
use anyhow::{bail, Result};
use bed_utils::bed::BEDLike;
use noodles::{bam, sam::alignment::record::data::field::Tag};
use bstr::BString;
use indicatif::{style::ProgressStyle, ProgressBar, ProgressDrawTarget, ProgressIterator};
use regex::Regex;
use anyhow::{Result, bail};
use std::{collections::{HashMap, HashSet}, io::Write, path::Path};
use log::warn;
use noodles::{bam, sam::alignment::record::data::field::Tag};
use regex::Regex;
use std::{
collections::{HashMap, HashSet},
io::Write,
path::Path,
};

use crate::utils::{open_file_for_write, Compression};
use crate::preprocessing::Fragment;
use crate::utils::{open_file_for_write, Compression};

// SAM flags to discard during filtering: unmapped (0x4), mate unmapped (0x8),
// not primary (0x100), QC fail (0x200), PCR/optical duplicate (0x400).
// 0x4 | 0x8 | 0x100 | 0x200 | 0x400 = 1804.
const EXCLUDE_FLAGS: u16 = 1804;
// The PCR/optical duplicate flag (0x400).
const DUPLICATE_FLAG: u16 = 0x400;

#[derive(Debug, Clone, Default)]
pub struct FragmentQC {
Expand All @@ -37,7 +48,11 @@ impl FragmentQC {
self.num_pcr_duplicates += fragment.count() as u64 - 1;
self.num_unique_fragments += 1;
let size = fragment.len();
if self.mitochondrion.as_ref().map_or(true, |mito| !mito.contains(fragment.chrom())) {
if self
.mitochondrion
.as_ref()
.map_or(true, |mito| !mito.contains(fragment.chrom()))
{
if size < 147 {
self.num_frag_nfr += 1;
} else if size <= 294 {
Expand All @@ -57,9 +72,19 @@ impl FragmentQC {
/// as another read pair in the library.
pub fn report(&self) -> HashMap<String, f64> {
let mut result = HashMap::new();
result.insert("frac_duplicates".to_string(), self.num_pcr_duplicates as f64 / (self.num_unique_fragments + self.num_pcr_duplicates) as f64);
result.insert("frac_fragment_in_nucleosome_free_region".to_string(), self.num_frag_nfr as f64 / self.num_unique_fragments as f64);
result.insert("frac_fragment_flanking_single_nucleosome".to_string(), self.num_frag_single as f64 / self.num_unique_fragments as f64);
result.insert(
"frac_duplicates".to_string(),
self.num_pcr_duplicates as f64
/ (self.num_unique_fragments + self.num_pcr_duplicates) as f64,
);
result.insert(
"frac_fragment_in_nucleosome_free_region".to_string(),
self.num_frag_nfr as f64 / self.num_unique_fragments as f64,
);
result.insert(
"frac_fragment_flanking_single_nucleosome".to_string(),
self.num_frag_single as f64 / self.num_unique_fragments as f64,
);
result
}
}
Expand All @@ -80,6 +105,7 @@ impl FragmentQC {
/// * `bam_file` - File name of the BAM file.
/// * `output_file` - File name of the output fragment file.
/// * `is_paired` - Indicate whether the BAM file contain paired-end reads.
/// * `long_read` - Indicate whether the BAM file contains long reads (e.g. ONT).
/// * `barcode_tag` - Extract barcodes from TAG fields of BAM records, e.g., `barcode_tag = "CB"`.
/// * `barcode_regex` - Extract barcodes from read names of BAM records using regular expressions.
/// Reguler expressions should contain exactly one capturing group
Expand All @@ -102,6 +128,7 @@ pub fn make_fragment_file<P1: AsRef<Path>, P2: AsRef<Path>, P3: AsRef<Path>>(
bam_file: P1,
output_file: P2,
is_paired: bool,
long_read: bool,
barcode_tag: Option<[u8; 2]>,
barcode_regex: Option<&str>,
umi_tag: Option<[u8; 2]>,
Expand Down Expand Up @@ -143,7 +170,7 @@ pub fn make_fragment_file<P1: AsRef<Path>, P2: AsRef<Path>, P3: AsRef<Path>>(
Some("10x") => {
warn!("The number of PCR duplicates cannot be computed for 10X Genomics BAM files.");
header::read_10x_header(reader.get_mut())?
},
}
_ => reader.read_header()?,
};

Expand All @@ -157,36 +184,42 @@ pub fn make_fragment_file<P1: AsRef<Path>, P2: AsRef<Path>, P3: AsRef<Path>>(
.unwrap(),
);
let mut fragment_qc = FragmentQC::new(mitochondrion.clone());
let mut library_qc = BamQC::new(
mitochondrion.map(|mito| mito.into_iter().flat_map(
|x| header.reference_sequences().get_index_of(&BString::from(x))).collect()
)
);
let mut library_qc = BamQC::new(mitochondrion.map(|mito| {
mito.into_iter()
.flat_map(|x| header.reference_sequences().get_index_of(&BString::from(x)))
.collect()
}));
// Long-read pipelines mark duplicates upstream; keep those reads (drop only the
// duplicate bit from the filter) so SnapATAC2 deduplicates them itself.
let exclude_flags = if long_read {
EXCLUDE_FLAGS & !DUPLICATE_FLAG
} else {
EXCLUDE_FLAGS
};
let filtered_records = filter_bam(
reader.records().map(Result::unwrap),
is_paired,
&barcode,
umi.as_ref(),
mapq,
exclude_flags,
&mut library_qc,
);
group_bam_by_barcode(
filtered_records,
is_paired,
temp_dir,
chunk_size,
)
.into_fragments(&header)
.progress_with(spinner)
.for_each(|barcode| barcode.into_iter().for_each(|mut rec| {
if rec.strand().is_none() { // perform fragment length correction for paired-end reads
rec.set_start(rec.start().saturating_add_signed(shift_left));
rec.set_end(rec.end().saturating_add_signed(shift_right));
}
if rec.len() > 0 {
fragment_qc.update(&rec);
writeln!(output, "{}", rec).unwrap();
}
}));
group_bam_by_barcode(filtered_records, is_paired, temp_dir, chunk_size)
.into_fragments(&header)
.progress_with(spinner)
.for_each(|barcode| {
barcode.into_iter().for_each(|mut rec| {
if rec.strand().is_none() {
// perform fragment length correction for paired-end reads
rec.set_start(rec.start().saturating_add_signed(shift_left));
rec.set_end(rec.end().saturating_add_signed(shift_right));
}
if rec.len() > 0 {
fragment_qc.update(&rec);
writeln!(output, "{}", rec).unwrap();
}
})
});
Ok((library_qc, fragment_qc))
}
}
114 changes: 72 additions & 42 deletions snapatac2-core/src/preprocessing/bam/flagstat.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use noodles::{bam::Record, sam::alignment::record::{Cigar, Flags, cigar::op::Kind}};
use anyhow::{Context, Result};
use itertools::Itertools;
use noodles::{
bam::Record,
sam::alignment::record::{cigar::op::Kind, Cigar, Flags},
};
use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use std::collections::{HashMap, HashSet};
use anyhow::{Result, Context};
use itertools::Itertools;

use super::BarcodeLocation;

Expand Down Expand Up @@ -98,8 +101,8 @@ impl FlagStat {
self.singleton += 1;
} else {
self.mate_mapped += 1;
let rec_id = record.mate_reference_sequence_id().unwrap().unwrap();
let mat_id = record.reference_sequence_id().unwrap().unwrap();
let rec_id = record.mate_reference_sequence_id().unwrap().unwrap();
let mat_id = record.reference_sequence_id().unwrap().unwrap();

if mat_id != rec_id {
self.mate_reference_sequence_id_mismatch += 1;
Expand Down Expand Up @@ -133,12 +136,22 @@ impl BamQC {

pub fn update(&mut self, record: &Record, barcode: &Option<String>) {
let flagstat = FlagStat::new(record);
if flagstat.paired == 1 && flagstat.read_2 == 1{
if flagstat.paired == 1 && flagstat.read_2 == 1 {
self.num_read2_bases += record.sequence().len() as u64;
self.num_read2_q30_bases += record.quality_scores().as_ref().iter().filter(|x| **x >= 30).count() as u64;
self.num_read2_q30_bases += record
.quality_scores()
.as_ref()
.iter()
.filter(|x| **x >= 30)
.count() as u64;
} else {
self.num_read1_bases += record.sequence().len() as u64;
self.num_read1_q30_bases += record.quality_scores().as_ref().iter().filter(|x| **x >= 30).count() as u64;
self.num_read1_q30_bases += record
.quality_scores()
.as_ref()
.iter()
.filter(|x| **x >= 30)
.count() as u64;
}

self.all_reads_flagstat.add(&flagstat);
Expand All @@ -149,7 +162,12 @@ impl BamQC {
if barcode.is_some() {
self.barcoded_reads_flagstat.add(&flagstat);
if let Some(rid) = record.reference_sequence_id() {
if is_hq && self.mitochondrion.as_ref().map_or(false, |x| x.contains(&rid.unwrap())) {
if is_hq
&& self
.mitochondrion
.as_ref()
.map_or(false, |x| x.contains(&rid.unwrap()))
{
self.mito_flagstat.add(&flagstat);
}
}
Expand Down Expand Up @@ -197,9 +215,18 @@ impl BamQC {
};
result.insert("sequenced_reads".to_string(), num_reads as f64);
result.insert("sequenced_read_pairs".to_string(), num_pairs as f64);
result.insert("frac_q30_bases_read1".to_string(), self.num_read1_q30_bases as f64 / self.num_read1_bases as f64);
result.insert("frac_q30_bases_read2".to_string(), self.num_read2_q30_bases as f64 / self.num_read2_bases as f64);
result.insert("frac_confidently_mapped".to_string(), fraction_confidently_mapped);
result.insert(
"frac_q30_bases_read1".to_string(),
self.num_read1_q30_bases as f64 / self.num_read1_bases as f64,
);
result.insert(
"frac_q30_bases_read2".to_string(),
self.num_read2_q30_bases as f64 / self.num_read2_bases as f64,
);
result.insert(
"frac_confidently_mapped".to_string(),
fraction_confidently_mapped,
);
result.insert("frac_unmapped".to_string(), fraction_unmapped);
result.insert("frac_valid_barcode".to_string(), valid_barcode);
result.insert("frac_nonnuclear".to_string(), fraction_nonnuclear);
Expand All @@ -224,11 +251,7 @@ pub struct AlignmentInfo {
}

impl AlignmentInfo {
pub fn new(
rec: &Record,
barcode: Option<String>,
umi: Option<String>,
) -> Result<Self> {
pub fn new(rec: &Record, barcode: Option<String>, umi: Option<String>) -> Result<Self> {
let cigar = rec.cigar();
let start: usize = rec.alignment_start().unwrap().unwrap().try_into()?;
let alignment_start: u32 = start.try_into()?;
Expand All @@ -239,19 +262,26 @@ impl AlignmentInfo {
kind == Kind::HardClip || kind == Kind::SoftClip
});
let mut clips = clip_groups.into_iter();
let clipped_start: u32 = clips.next().map_or(0, |(is_clip, x)| if is_clip {
x.map(|x| x.len() as u32).sum()
} else {
0
let clipped_start: u32 = clips.next().map_or(0, |(is_clip, x)| {
if is_clip {
x.map(|x| x.len() as u32).sum()
} else {
0
}
});
let clipped_end: u32 = clips.last().map_or(0, |(is_clip, x)| if is_clip {
x.map(|x| x.len() as u32).sum()
} else {
0
let clipped_end: u32 = clips.last().map_or(0, |(is_clip, x)| {
if is_clip {
x.map(|x| x.len() as u32).sum()
} else {
0
}
});
Ok(Self {
name: std::str::from_utf8(rec.name().context("no read name")?)?.to_string(),
reference_sequence_id: rec.reference_sequence_id().context("no reference sequence id")??.try_into()?,
reference_sequence_id: rec
.reference_sequence_id()
.context("no reference sequence id")??
.try_into()?,
flags: rec.flags().bits(),
alignment_start,
alignment_end,
Expand All @@ -263,7 +293,9 @@ impl AlignmentInfo {
})
}

pub(crate) fn flags(&self) -> Flags { Flags::from_bits_retain(self.flags) }
pub(crate) fn flags(&self) -> Flags {
Flags::from_bits_retain(self.flags)
}

pub(crate) fn alignment_5p(&self) -> u32 {
if self.flags().is_reverse_complemented() {
Expand All @@ -274,27 +306,22 @@ impl AlignmentInfo {
}
}



/// Filter Bam records.
///
/// `exclude_flags` is the SAM flag mask used to discard reads.
pub fn filter_bam<'a, I>(
reads: I,
is_paired: bool,
barcode_loc: &'a BarcodeLocation,
umi_loc: Option<&'a BarcodeLocation>,
mapq_filter: Option<u8>,
exclude_flags: u16,
qc: &'a mut BamQC,
) -> impl Iterator<Item = AlignmentInfo> + 'a
where
I: Iterator<Item = Record> + 'a,
{
// flag (1804) meaning:
// - read unmapped
// - mate unmapped
// - not primary alignment
// - read fails platform/vendor quality checks
// - read is PCR or optical duplicate
let flag_failed = Flags::from_bits(1804).unwrap();
let flag_failed = Flags::from_bits_retain(exclude_flags);
reads.filter_map(move |r| {
let barcode = barcode_loc.extract(&r).ok();
let umi = umi_loc.and_then(|x| x.extract(&r).ok());
Expand All @@ -305,8 +332,8 @@ where
qc.update(&r, &barcode);

let flag = r.flags();
let is_properly_aligned = !flag.is_supplementary() &&
(!is_paired || flag.is_properly_segmented());
let is_properly_aligned =
!flag.is_supplementary() && (!is_paired || flag.is_properly_segmented());
let flag_pass = !flag.intersects(flag_failed);
if is_properly_aligned && flag_pass && is_hq && barcode.is_some() {
let alignment = AlignmentInfo::new(&r, barcode, umi).unwrap();
Expand All @@ -317,9 +344,12 @@ where
})
}


// The sum of all base qualities in the record above 15.
fn sum_of_qual_score(read: &Record) -> u32 {
read.quality_scores().as_ref().iter().map(|x| u8::from(*x) as u32)
.filter(|x| *x >= 15).sum()
}
read.quality_scores()
.as_ref()
.iter()
.map(|x| u8::from(*x) as u32)
.filter(|x| *x >= 15)
.sum()
}
Loading