diff --git a/docs-src/api.rst b/docs-src/api.rst index b38bbd4..be4a023 100644 --- a/docs-src/api.rst +++ b/docs-src/api.rst @@ -14,9 +14,6 @@ Interval DisjointIntervalSequence ======================== -.. autoclass:: genome_kit.diseq.IndexDirection - :members: - .. autoclass:: genome_kit.diseq.DisjointIntervalSequence :special-members: :members: diff --git a/docs-src/diseq.rst b/docs-src/diseq.rst index ccf8122..e17144e 100644 --- a/docs-src/diseq.rst +++ b/docs-src/diseq.rst @@ -51,15 +51,15 @@ A DIS has two aspects: The following examples illustrate how the coordinate space and segment interact, using both diagrams and code. -Consider a transcript on the + strand with the following genomic layout: -:: +Consider a transcript on the + strand with the following genomic layout:: + Genomic Coordinates: 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 DNA Sequence: | A T G C C G C A T G C C G C | | |<------->| |<------->| |<--->| |<----------->| |<--->| | 5' Exon1 Intron1 Exon2 Intron2 Exon3 3' -Extracting only the exons yields the following disjoint intervals: -:: +Extracting only the exons yields the following disjoint intervals:: + Genomic Coordinates: 153 154 155 159 160 165 166 167 DNA Sequence: | A T G C A G C | | |<------->| |<--->| |<--->| | @@ -74,15 +74,15 @@ These exon intervals can be represented as :py:class:`~genome_kit.Interval` obje >>> exon3 = Interval("chr1", "+", 165, 167, "hg38") To define a segment spanning the full exonic sequence (from the start of Exon1 to -the end of Exon3), the exon intervals are first converted into a DIS coordinate space -:: +the end of Exon3), the exon intervals are first converted into a DIS coordinate space:: + DIS Coordinates: 0 1 2 3 4 5 6 7 DNA Sequence: | A T G C A G C | | |<------->| |<--->| |<--->| | 5' Exon1 Exon2 Exon3 3' -The default segment spans the entire coordinate space -:: +The default segment spans the entire coordinate space:: + DIS Coordinates: 0 1 2 3 4 5 6 7 DNA Sequence: A T G C A G C |<--------------------->| @@ -111,15 +111,15 @@ and an end index of 7:: A DIS can also represent a segment on the strand opposite the coordinate space. This is useful for modeling the complementary sequence or a binding partner. -Starting from the coordinate space defined above -:: +Starting from the coordinate space defined above:: + DIS Coordinates: 0 1 2 3 4 5 6 7 DNA Sequence: | A T G C A G C | | |<----->| |<->| |<->| | 5' Exon1 Exon2 Exon3 3' -The opposite strand shares the same DIS coordinate indices -:: +The opposite strand shares the same DIS coordinate indices:: + 5' Positive strand 3' DIS Coordinates: | 0 1 2 3 4 5 6 7 | DNA Sequence (+): | A T G C A G C | @@ -131,8 +131,8 @@ The opposite strand shares the same DIS coordinate indices The DIS coordinate indices are identical on both strands. To obtain the complement of a given segment, the same start and end indices apply; only the ``on_coordinate_strand`` flag changes. The following shows the full-length segment -on the opposite strand -:: +on the opposite strand:: + 5' Coordinate Strand 3' DIS Coordinates: | 0 1 2 3 4 5 6 7 | DNA Sequence (+): | A T G C A G C | @@ -171,23 +171,24 @@ segments, since the start and end indices alone do not encode strand information >>> dis = DisjointIntervalSequence.from_intervals([iv]) >>> assert dis.end5.start < dis.end3.start -Consider a transcript on the negative strand: -:: +Consider a transcript on the negative strand:: + 3' Exon3 Intron2 Exon2 Intron1 Exon1 5' | |<------->| |<------->| |<--->| |<----------->| |<--->| | DNA Sequence (-): | G T C A G T C A G T C A G T | Genomic Coordinates: 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 Negative Strand -Extracting only the exons: -:: + +Extracting only the exons:: + 3' Exon3 Exon2 Exon1 5' | |<------->| |<--->| |<--->| | DNA Sequence (-): | G T C C A G T | Genomic Coordinates: 153 154 155 159 160 165 166 167 Negative Strand -Converting these exons into a DIS coordinate space: -:: +Converting these exons into a DIS coordinate space:: + DIS Coordinates: 0 1 2 3 4 5 6 7 DNA Sequence: | T G A C C T G | | |<->| |<->| |<----->| | @@ -210,8 +211,8 @@ largest index to the 3' end:: >>> dis_neg.coordinate_length 7 -A full-length segment on the coordinate strand -:: +A full-length segment on the coordinate strand:: + DIS Coordinates: 0 1 2 3 4 5 6 7 DNA Sequence: T G A C C T G |<--------------------->| @@ -231,8 +232,8 @@ objects, strand is expressed only as "same strand" or "opposite strand":: >>> dis_neg.on_coordinate_strand True -The same coordinate space with an opposite-strand segment -:: +The same coordinate space with an opposite-strand segment:: + DIS Coordinates: 0 1 2 3 4 5 6 7 DNA Sequence (-): T G A C C T G ----------------------------------------------------- @@ -408,6 +409,7 @@ Strand Methods A DIS segment can sit on either 'virtual' strand independently of the coordinate intervals. The ``on_coordinate_strand`` property indicates whether the segment is on the same strand as the coordinate intervals:: + On Coordinate Strand: True Start Index: 1 End Index: 6 @@ -419,6 +421,8 @@ segment is on the same strand as the coordinate intervals:: DIS Coordinates: 0 1 2 3 4 5 6 7 Opposite Strand +:: + >>> dis.on_coordinate_strand True >>> dis.is_same_strand() @@ -549,6 +553,56 @@ Negative values contract the segment:: Contracting to exactly zero length is valid, but contracting past zero raises ``ValueError``. +expand_coord +~~~~~~~~~~~~ + +``expand`` and ``expand_coord`` both grow the segment, but they do so at +different layers. ``expand`` stretches the segment *within* the existing +coordinate space; the coord intervals are unchanged. +``expand_coord`` instead extends the coordinate +space itself by lengthening the outermost coord intervals, and grows the +segment by the same amount so it tracks the new edges. + +This distinction matters when you want flanking genomic context to become +part of the indexed coordinate space rather than just a virtual extension, +such as when getting the DNA of the DIS segment (off-coordinate bases are +N-padded). + +For example, when modeling a transcript plus its surrounding promoter and +poly-A regions, ``expand_coord(50)`` adds 50 genomic bases to each end +of the coordinate space, and those bases are now real, indexable, DNA- +backed positions rather than N-padding:: + + >>> dis.coord_strand + '+' + >>> dis.coordinate_intervals + (Interval("chr1", "+", 1000, 1100, "hg38"), + Interval("chr1", "+", 1200, 1300, "hg38"), + Interval("chr1", "+", 1500, 1600, "hg38")) + >>> dis.coordinate_length + 300 + >>> dis.start, dis.end + (0, 300) + + >>> expanded = dis.expand_coord(50) + >>> expanded.coordinate_intervals + (Interval("chr1", "+", 950, 1100, "hg38"), + Interval("chr1", "+", 1200, 1300, "hg38"), + Interval("chr1", "+", 1500, 1650, "hg38")) + >>> expanded.coordinate_length + 400 + >>> expanded.start, expanded.end + (0, 400) + +Only the outer 5' edge of the first coord interval and the outer 3' edge +of the last coord interval are extended; gaps between coord intervals are +preserved. On the negative strand, ``upstream`` and ``dnstream`` still +refer to the transcript's 5' and 3' ends, so the underlying genomic +adjustments mirror those on the plus strand. + +Unlike ``expand``, ``expand_coord`` does not accept negative values — +shrinking the coordinate space is not supported. + Positional Comparisons ====================== @@ -656,3 +710,119 @@ space and strand apply:: >>> z = DisjointIntervalSequence(coord_ivs, start=50, end=50) >>> z.within(a) True + +Mapping Between Genomic and DIS Coordinates +=========================================== + +A DIS lives in two coordinate systems at once: the genomic coordinates of +its underlying intervals, and the spliced DIS index space. ``lower`` and +``lift_interval`` are the two directions of travel between them. + +``lower`` projects the segment back to genomic space. ``lift_interval`` +projects a genomic interval into the DIS's index space and clips it +against the segment. They are conceptual inverses, but each has to deal +with a structural mismatch — gaps in genomic space have no representation +in DIS space, and contiguous DIS indices may correspond to two +non-adjacent genomic regions — so neither is a clean bijection. + +lower +~~~~~ + +Because a segment can straddle one or more boundaries between coord +intervals, ``lower`` returns a *list* of genomic +:py:class:`~genome_kit.Interval` objects rather than a single one. The +list is in 5'→3' order with respect to the segment, regardless of the +underlying coord strand or whether the segment is on the coord strand. + +A segment that fits inside a single coord interval lowers to a one-element +list. A segment that crosses N coord-interval boundaries lowers to N+1 +intervals — the gaps between coord intervals are skipped:: + + >>> dis = DisjointIntervalSequence( + ... [Interval("chr1", "+", 100, 200, "hg38"), + ... Interval("chr1", "+", 300, 400, "hg38")], + ... start=50, end=150, + ... ) + >>> dis.lower() + [Interval("chr1", "+", 150, 200, "hg38"), + Interval("chr1", "+", 300, 350, "hg38")] + +When the segment extends past the coord space (``start < 0`` or +``end > coordinate_length``), the out-of-bounds portion is **linearly +extrapolated** from the nearest outer edge of the coord intervals. The +returned intervals can therefore have negative starts or ends past the +chromosome length — there is no clipping to chromosome boundaries. + +A segment on the opposite strand lowers to genomic intervals on the +opposite strand, listed in *segment* 5'→3' order (which is the reverse of +coord 5'→3' order):: + + >>> dis_opp = dis.as_opposite_strand() + >>> dis_opp.lower() + [Interval("chr1", "-", 300, 350, "hg38"), + Interval("chr1", "-", 150, 200, "hg38")] + +lift_interval +~~~~~~~~~~~~~ + +``lift_interval`` is the inverse: it takes a genomic +:py:class:`~genome_kit.Interval` and returns a new DIS whose segment is +the intersection of that interval with this DIS's segment, expressed +in the DIS coordinate space. The input must share the same chromosome, reference +genome, and effective strand as the DIS. + +The lift is well-defined even when the input straddles gaps between coord +intervals: positions inside a gap collapse to the boundary index of the +adjacent coord interval, and positions outside the coord space extrapolate +linearly. The result is then clipped against the DIS's segment, so a +genomic interval that overlaps coord space but falls entirely outside the +segment returns ``None``:: + + >>> dis = DisjointIntervalSequence( + ... [Interval("chr1", "+", 100, 200, "hg38"), + ... Interval("chr1", "+", 300, 400, "hg38")], + ... start=0, end=200, + ... ) + >>> lifted = dis.lift_interval(Interval("chr1", "+", 320, 360, "hg38")) + >>> lifted.start, lifted.end + (120, 160) + + >>> # Falls in a coord-interval gap with no overlap of any coord interval + >>> dis.lift_interval(Interval("chr1", "+", 250, 260, "hg38")) is None + True + +Extracting DNA +============== + +``dna`` returns the spliced DNA sequence corresponding to the segment as +a single string in 5'→3' order. Internally, the segment is decomposed +via ``lower`` into one or more genomic intervals, each is read from the +reference, and the pieces are concatenated. This returns the spliced sequence +of the transcript: the gaps between coord intervals are dropped so that introns +(or other intervening regions) never appear in the output. + +The returned string already accounts for strand: if the segment is on the +opposite strand, the segment's bases are returned, and the bases are +ordered 5'→3' along the segment, not along the genome:: + + >>> dis = DisjointIntervalSequence.from_transcript(transcript) + >>> dis.dna() + 'ACGTGGTTTCA' # full spliced sequence, 5'→3' + + >>> opp = dis.as_opposite_strand() + >>> opp.dna() # reverse complement, 5'→3' along the opposite strand + 'TGAAACCACGT' + +When the segment extends past the coord space — e.g. after ``shift`` or +``expand`` pushed the indices outside ``[0, coordinate_length)`` — those +out-of-coord positions are returned as ``N`` by default. This is in +contrast to ``expand_coord``, which extends the coord space itself so +that new positions are backed by real reference DNA. Set +``allow_outside_coord=False`` to opt out of N-padding and raise an error instead. + +For convenience, :py:meth:`Genome.dna ` accepts a +DIS directly and dispatches to ``DisjointIntervalSequence.dna``, so +either of the following is equivalent:: + + >>> dis.dna() + >>> genome.dna(dis) diff --git a/genome_kit/__init__.py b/genome_kit/__init__.py index c56b3d3..5b95238 100644 --- a/genome_kit/__init__.py +++ b/genome_kit/__init__.py @@ -71,6 +71,7 @@ "Cds", "CdsTable", "DataManager", + "DisjointIntervalSequence", "Exon", "ExonTable", "Gene", diff --git a/genome_kit/diseq.py b/genome_kit/diseq.py index 109f6d8..e7b8e91 100644 --- a/genome_kit/diseq.py +++ b/genome_kit/diseq.py @@ -1,27 +1,10 @@ from copy import deepcopy -import enum from dataclasses import dataclass from typing import Sequence, Literal from .interval import Interval from .genome_annotation import Transcript - - -class IndexDirection(enum.Enum): - """Controls how indices are assigned to the 5' and 3' ends of a coordinate space. - Changing this value will result in undefined behaviour for - :py:class:`DisjointIntervalSequence` objects created under a different convention. - - ``TRANSCRIPT_FIVE_TO_THREE`` - Index 0 is always at the coordinate transcript's 5' end, regardless of genomic strand. - - ``POSITIVE_STRAND_LEFT_TO_RIGHT`` - Index 0 is always at the leftmost genomic position relative to the positive strand. - On the negative strand, this means the 3' end is at index 0. - """ - - TRANSCRIPT_FIVE_TO_THREE = "transcript_five_to_three" - POSITIVE_STRAND_LEFT_TO_RIGHT = "positive_strand_left_to_right" +from .genome import Genome @dataclass(frozen=True) @@ -45,9 +28,8 @@ class DisjointIntervalSequence: - A **coordinate space** defined by a sequence of non-overlapping genomic :py:class:`~genome_kit.Interval` objects (e.g. the exons of a transcript), - which are flattened into a contiguous 0-based index space. Indices for the - coordinate space are assigned according to the current :py:class:`IndexDirection` - value. + which are flattened into a contiguous 0-based index space. Index 0 is + always at the coordinate space's 5' end, regardless of genomic strand. - A **segment** within that coordinate space, defined by a 5' and 3' index. The segment may lie on the same, or opposite, strand as the coordinate space. @@ -55,29 +37,6 @@ class DisjointIntervalSequence: instances rather than calling the constructor directly. """ - _index_direction: IndexDirection = IndexDirection.TRANSCRIPT_FIVE_TO_THREE - - @classmethod - def set_index_direction(cls, direction: IndexDirection) -> None: - """Set the index direction convention for all DIS instances. - - Parameters - ---------- - direction : :py:class:`IndexDirection` - The index direction convention to use. - """ - cls._index_direction = direction - - @classmethod - def get_index_direction(cls) -> IndexDirection: - """Get the current index direction convention. - - Returns - ------- - :py:class:`IndexDirection` - """ - return cls._index_direction - def __init__( self, coordinate_intervals: Sequence[Interval], @@ -319,24 +278,6 @@ def strand(self) -> Literal["+", "-"]: return "-" return "+" - @property - def coordinate_end5_index(self) -> int: - """5' index of the coordinate space.""" - if self._index_direction == IndexDirection.TRANSCRIPT_FIVE_TO_THREE: - return 0 - if self.coord_strand == "+": - return 0 - return self.coordinate_length - - @property - def coordinate_end3_index(self) -> int: - """3' index of the coordinate space.""" - if self._index_direction == IndexDirection.TRANSCRIPT_FIVE_TO_THREE: - return self.coordinate_length - if self.coord_strand == "+": - return self.coordinate_length - return 0 - @property def end5_index(self) -> int: """5' index of the segment.""" @@ -390,12 +331,12 @@ def end3(self) -> "DisjointIntervalSequence": @property def coord_end5(self) -> "DisjointIntervalSequence": """0-length DIS at the coordinate space's 5' end.""" - return self._at_index(self.coordinate_end5_index, on_coordinate_strand=True) + return self._at_index(0, on_coordinate_strand=True) @property def coord_end3(self) -> "DisjointIntervalSequence": """0-length DIS at the coordinate space's 3' end.""" - return self._at_index(self.coordinate_end3_index, on_coordinate_strand=True) + return self._at_index(self.coordinate_length, on_coordinate_strand=True) @property def coordinate_intervals(self) -> tuple[Interval, ...]: @@ -413,6 +354,71 @@ def length(self) -> int: """Length of the segment on the coordinate space.""" return self.end - self.start + def _lower_coord(self, coord: int) -> list[int]: + """Convert a DIS coordinate index to the genomic coordinate(s). + + Returns a 1-element list for indices interior to a single coord + interval, at the outer 5' (``coord == 0``) and 3' + (``coord == coordinate_length``) edges of the DIS, and for any + extrapolated index (``coord < 0`` or ``coord > coordinate_length``). + Extrapolation is linear from the nearest interval boundary and may + yield negative values or values exceeding the chromosome length. + + Returns a 2-element list ``[iv_upstream_coord, iv_downstream_coord]`` sorted + 5' -> 3' when ``coord`` falls exactly on an internal boundary between two + adjacent coord intervals — where ``iv_upstream_coord`` is the upstream-most + coordinate boundary, and ``iv_downstream_coord`` is the downstream-most + coordinate boundary (regardless of strand). Touching intervals produce + two equal values. + + Parameters + ---------- + coord : :py:class:`int` + Index in the DIS coordinate space. + + Returns + ------- + :py:class:`list[int]` + Either ``[pos]`` or ``[iv_upstream.end, iv_downstream.start]``. + """ + ivs = self._coordinate_intervals + on_plus = self.coord_strand == "+" + coord_len = self.coordinate_length + + # Upstream of first interval — extrapolate (no upstream neighbor) + if coord <= 0: + if on_plus: + return [ivs[0].start - abs(coord)] + return [ivs[0].end + abs(coord)] + + # Downstream of last interval — extrapolate (no downstream neighbor) + if coord >= coord_len: + overshoot = coord - coord_len + if on_plus: + return [ivs[-1].end + overshoot] + return [ivs[-1].start - overshoot] + + # Walk intervals, consuming coord with a decrementing delta + delta = coord + interval_index = -1 + while delta >= 0: + interval_index += 1 + delta -= len(ivs[interval_index]) + + iv = ivs[interval_index] + if delta == -len(iv): + # coord landed on the cumulative end of ivs[interval_index - 1], + # i.e. the internal boundary between two coord intervals. + upstream = ivs[interval_index - 1] + if on_plus: + return [upstream.end, iv.start] # indices ordered 5' -> 3' + else: + return [upstream.start, iv.end] # indices ordered 5' -> 3' + + if on_plus: + return [iv.end - abs(delta)] + return [iv.start + abs(delta)] + def _upstream_index_step(self, on_coordinate_strand: bool | None = None) -> int: """Return +1 or -1 indicating the upstream direction in index space. @@ -422,10 +428,7 @@ def _upstream_index_step(self, on_coordinate_strand: bool | None = None) -> int: """ if on_coordinate_strand is None: on_coordinate_strand = self.on_coordinate_strand - if self._index_direction == IndexDirection.TRANSCRIPT_FIVE_TO_THREE: - return -1 if on_coordinate_strand else 1 - # POSITIVE_STRAND_LEFT_TO_RIGHT: effective strand determines direction - return -1 if self.strand == "+" else 1 + return -1 if on_coordinate_strand else 1 def _validate_same_coordinate_space( self, other: "DisjointIntervalSequence" @@ -499,6 +502,62 @@ def expand( ) return self._from_end_indices(new_end5, new_end3) + def expand_coord( + self, upstream: int, dnstream: int | None = None + ) -> "DisjointIntervalSequence": + """Expand the coordinate space and segment at its 5' and/or 3' ends. + + The outer 5' edge of the first coord interval is extended ``upstream`` + bases and the outer 3' edge of the last coord interval is extended + ``dnstream`` bases. The segment is expanded an equal amount in the + upstream/downstream direction as the coordinate intervals. + + Args: + upstream: Bases to add at the coordinate space's 5' end. + dnstream: Bases to add at the coordinate space's 3' end. + Defaults to upstream (symmetric). + + Raises: + ValueError: If either argument is negative. + """ + if dnstream is None: + dnstream = upstream + if upstream < 0 or dnstream < 0: + raise ValueError( + "expand_coord requires non-negative values; " + f"got upstream={upstream}, dnstream={dnstream}" + ) + + coord_ivs = list(self._coordinate_intervals) + iv0, ivn = coord_ivs[0], coord_ivs[-1] + chrom = iv0.chromosome + refg = iv0.reference_genome + if self.coord_strand == "+": + if len(coord_ivs) == 1: + coord_ivs = [Interval(chrom, "+", iv0.start - upstream, iv0.end + dnstream, refg)] + else: + coord_ivs[0] = Interval(chrom, "+", iv0.start - upstream, iv0.end, refg) + coord_ivs[-1] = Interval(chrom, "+", ivn.start, ivn.end + dnstream, refg) + else: + if len(coord_ivs) == 1: + coord_ivs = [Interval(chrom, "-", iv0.start - dnstream, iv0.end + upstream, refg)] + else: + coord_ivs[0] = Interval(chrom, "-", iv0.start, iv0.end + upstream, refg) + coord_ivs[-1] = Interval(chrom, "-", ivn.start - dnstream, ivn.end, refg) + + # The interval's lower index stays at self._start in the new coord space due + # to re-indexing of the coord space implicitly 'expanding' start upstream. + # end index is adjusted to account for extra bases introduced by upstream and + # downstream expansion + return DisjointIntervalSequence( + coord_ivs, + coord_name=self._coord_metadata.name, + segment_name=self._segment_metadata.name, + on_coordinate_strand=self.on_coordinate_strand, + start=self._start, + end=self._end + upstream + dnstream, + ) + def upstream_of(self, other: "DisjointIntervalSequence") -> bool: """True if self is strictly upstream of other (no overlap). @@ -647,6 +706,290 @@ def flip_strand(self) -> "DisjointIntervalSequence": end=self._end, ) + def lower(self) -> list[Interval]: + """Project the interval back to genomic :py:class:`~genome_kit.Interval` objects. + + Returns the genomic representation of the DIS interval as one or more + Intervals, in 5'->3' order. Multiple intervals are returned when the + DIS interval spans boundaries between coordinate intervals. All + returned Intervals use the effective strand of the DIS interval. + + Indices outside the coordinate space are extrapolated linearly from + the nearest boundary, so returned Intervals may extend beyond the + chromosome (their start may be negative or their end may exceed the + chromosome length). + + Returns + ------- + list[:py:class:`~genome_kit.Interval`] + """ + chrom = self.chromosome + strand = self.strand + refg = self.reference_genome + coord_strand = self.coord_strand + on_plus = coord_strand == "+" + start, end = self._start, self._end + + if start == end: + positions = self._lower_coord(start) + # On an internal boundary _lower_coord returns two flanking values; + # collapse to a single 0-length Interval corresponding to the value of + # the index that is an Interval.start (we do this because Interval.end + # is exclusive). + pos = positions[0] if len(positions) == 1 else max(positions) + return [Interval(chrom, strand, pos, pos, refg)] + + # _lower_coord returns 1 value interior/edge/extrapolated, or 2 values + # in 5'->3' order at an internal boundary. Cumulative len(_lower_coord(start)) + + # len(_lower_coord(end)) is 2 (neither on boundary), 3 (one on boundary), or 4 + # (both on boundary). In all cases, the start_pos should be the downstream-most + # value and the end_pos should be the upstream-most value. + start_positions = self._lower_coord(start) + end_positions = self._lower_coord(end) + start_pos = start_positions[-1] # Since sorted 5' -> 3', -1 is downstream + end_pos = end_positions[0] # 0 is the upstream value + + start_iv = Interval(chrom, coord_strand, start_pos, start_pos, refg) + end_iv = Interval(chrom, coord_strand, end_pos, end_pos, refg) + + coord_ivs = self._coordinate_intervals + last_interval_index = len(coord_ivs) - 1 + + first_idx = 0 + while first_idx < last_interval_index and start_iv.dnstream_of(coord_ivs[first_idx]): + first_idx += 1 + last_idx = last_interval_index + while last_idx > 0 and end_iv.upstream_of(coord_ivs[last_idx]): + last_idx -= 1 + + result: list[Interval] = [] + for i in range(first_idx, last_idx + 1): + iv = coord_ivs[i] + is_first = i == first_idx + is_last = i == last_idx + if on_plus: + genomic_start = start_pos if is_first else iv.start + genomic_end = end_pos if is_last else iv.end + else: + # swap start and end positions since going from coordinate-space that + # runs 5' -> 3' (DIS) to genomic space (3' -> 5' on - strand) + genomic_start = end_pos if is_last else iv.start + genomic_end = start_pos if is_first else iv.end + result.append(Interval(chrom, strand, genomic_start, genomic_end, refg)) + + # The loop emits intervals in coord 5'->3' order. When the segment is on + # the opposite strand, that is the reverse of segment 5'->3'. + if not self.on_coordinate_strand: + result.reverse() + + return result + + def genomic_span(self) -> Interval: + """Return a single :py:class:`~genome_kit.Interval` spanning the + segment's genomic extent (from its 5'-most to 3'-most positions), + ignoring gaps between coordinate intervals. + + Returns + ------- + :py:class:`~genome_kit.Interval` + """ + ivs = self.lower() + return Interval( + ivs[0].chromosome, + ivs[0].strand, + min(iv.start for iv in ivs), + max(iv.end for iv in ivs), + ivs[0].reference_genome, + ) + + def _lift_position(self, pos: int) -> int: + """Map a genomic position to a DIS index in this coordinate space. + + Positions outside the coord intervals are linearly extrapolated from + the nearest outer edge. Positions in a gap between coord intervals + are clipped to the cumulative end of the previous interval (i.e. the + boundary index). + """ + ivs = self._coordinate_intervals + coord_len = self.coordinate_length + if self.coord_strand == "+": + if pos < ivs[0].start: + return pos - ivs[0].start + if pos > ivs[-1].end: + return coord_len + (pos - ivs[-1].end) + cumulative = 0 + for iv in ivs: + if iv.start <= pos <= iv.end: + return cumulative + (pos - iv.start) + if pos < iv.start: + return cumulative + cumulative += len(iv) + assert False, "Position not found in any interval" + # minus + if pos > ivs[0].end: + return ivs[0].end - pos + if pos < ivs[-1].start: + return coord_len + (ivs[-1].start - pos) + cumulative = 0 + for iv in ivs: + if iv.start <= pos <= iv.end: + return cumulative + (iv.end - pos) + if pos > iv.end: + return cumulative + cumulative += len(iv) + assert False, "Position not found in any interval" + + def lift_interval( + self, other: Interval + ) -> "DisjointIntervalSequence | None": + """Lift a genomic :py:class:`~genome_kit.Interval` onto this DIS's segment. + + The interval's genomic span is mapped into DIS coordinate-space indices + and intersected with this DIS's segment. The returned DIS represents + that intersection as a segment in this coordinate space. + + Returns + ------- + :py:class:`DisjointIntervalSequence` or None + None if the lifted interval does not overlap this DIS's segment. + + Raises + ------ + ValueError + If ``other`` is not on the same chromosome, reference genome, and strand. + """ + if other.chromosome != self.chromosome: + raise ValueError( + f"Interval chromosome {other.chromosome!r} does not match DIS " + f"chromosome {self.chromosome!r}" + ) + if other.reference_genome != self.reference_genome: + raise ValueError( + f"Interval reference_genome {other.reference_genome!r} does not " + f"match DIS reference_genome {self.reference_genome!r}" + ) + if other.strand != self.strand: + raise ValueError( + f"Interval strand {other.strand!r} does not match DIS " + f"effective strand {self.strand!r}" + ) + + if self.coord_strand == "+": + seg_start = self._lift_position(other.start) + seg_end = self._lift_position(other.end) + else: + # On minus, lower genomic position maps to higher DIS index. + seg_start = self._lift_position(other.end) + seg_end = self._lift_position(other.start) + + # Clip to self's segment via half-open intersection. + intersected_start = max(seg_start, self._start) + intersected_end = min(seg_end, self._end) + if intersected_start >= intersected_end: + return None + + return DisjointIntervalSequence( + self._coordinate_intervals, + coord_name=self._coord_metadata.name, + on_coordinate_strand=(other.strand == self.coord_strand), + start=intersected_start, + end=intersected_end, + ) + + def intersect( + self, other: "DisjointIntervalSequence" + ) -> "DisjointIntervalSequence | None": + """Segment-wise intersection with another DIS in the same coord space. + + Both DIS objects must share the same coordinate intervals and the same + ``on_coordinate_strand``. 0-length intersections return None. + + Returns + ------- + :py:class:`DisjointIntervalSequence` or None + The intersection segment, or None if the segments do not overlap. + + Raises + ------ + ValueError + If the DIS objects do not share the same coordinate space or + ``on_coordinate_strand`` differs. + """ + self._validate_same_coordinate_space(other) + if self.on_coordinate_strand != other.on_coordinate_strand: + raise ValueError( + f"Cannot compare: self is on " + f"{'same' if self.on_coordinate_strand else 'opposite'} " + f"strand but other is on " + f"{'same' if other.on_coordinate_strand else 'opposite'} strand" + ) + + intersected_start = max(self._start, other._start) + intersected_end = min(self._end, other._end) + if intersected_start >= intersected_end: + return None + + return DisjointIntervalSequence( + self._coordinate_intervals, + coord_name=self._coord_metadata.name, + on_coordinate_strand=self.on_coordinate_strand, + start=intersected_start, + end=intersected_end, + ) + + def dna(self, allow_outside_coord: bool = True) -> str: + """Return the DNA sequence corresponding to the segment. + + Bases outside the coordinate intervals (segment indices ``< 0`` or + ``>= coordinate_length``) are returned as ``N`` when + ``allow_outside_coord`` is True; otherwise a ValueError is raised. + Returns an empty string for a 0-length segment. DNA is returned in + 5' -> 3' order. + + Args: + allow_outside_coord: When True (default), pad out-of-coord regions + with ``N``. When False, raise if the segment extends past the + coord intervals. + + Returns + ------- + :py:class:`str` + """ + if self._start == self._end: + return "" + + coord_len = self.coordinate_length + if self._upstream_index_step() == -1: + upstream_pad = 0 if self._start >= 0 else abs(self._start) + downstream_pad = max(self._end - coord_len, 0) + else: + upstream_pad = max(self._end - coord_len, 0) + downstream_pad = 0 if self._start >= 0 else abs(self._start) + if (upstream_pad or downstream_pad) and not allow_outside_coord: + raise ValueError( + f"DIS segment [{self._start}, {self._end}) extends outside " + f"coord range [0, {coord_len})" + ) + + # Clip start to being at least start of coord space + clipped_start = max(self._start, 0) + # Clip end to being at most end of coord space + clipped_end = min(self._end, coord_len) + # If the clipped segment has a positive length, get the DNA + if clipped_start < clipped_end: + clipped = DisjointIntervalSequence( + self._coordinate_intervals, + on_coordinate_strand=self.on_coordinate_strand, + start=clipped_start, + end=clipped_end, + ) + genome = Genome(self.reference_genome) + clipped_dna = "".join(genome.dna(iv) for iv in clipped.lower()) + else: + clipped_dna = "" + + return "N" * upstream_pad + clipped_dna + "N" * downstream_pad + def __len__(self) -> int: """Return the length of the segment.""" return self.length diff --git a/tests/test_diseq.py b/tests/test_diseq.py index 5d5582a..05dcdc8 100644 --- a/tests/test_diseq.py +++ b/tests/test_diseq.py @@ -956,6 +956,219 @@ def test_expand_negative_strand_coords_opposite(self): self.assertEqual(expanded.end, 155) +class TestExpandCoord(unittest.TestCase): + + def test_expand_coord_symmetric(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=30, end=150) + expanded = dis.expand_coord(5) + self.assertEqual( + expanded.coordinate_intervals, + tuple(_make_intervals([("chr1", "+", 95, 200), ("chr1", "+", 300, 405)])), + ) + self.assertEqual(expanded.start, 30) + self.assertEqual(expanded.end, 160) + + def test_expand_coord_asymmetric(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=30, end=150) + expanded = dis.expand_coord(5, 10) + self.assertEqual( + expanded.coordinate_intervals, + tuple(_make_intervals([("chr1", "+", 95, 200), ("chr1", "+", 300, 410)])), + ) + self.assertEqual(expanded.start, 30) + self.assertEqual(expanded.end, 165) + + def test_expand_coord_upstream_only(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=30, end=150) + expanded = dis.expand_coord(5, 0) + self.assertEqual( + expanded.coordinate_intervals, + tuple(_make_intervals([("chr1", "+", 95, 200), ("chr1", "+", 300, 400)])), + ) + self.assertEqual(expanded.start, 30) + self.assertEqual(expanded.end, 155) + + def test_expand_coord_downstream_only(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=30, end=150) + expanded = dis.expand_coord(0, 10) + self.assertEqual( + expanded.coordinate_intervals, + tuple(_make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 410)])), + ) + self.assertEqual(expanded.start, 30) + self.assertEqual(expanded.end, 160) + + def test_expand_coord_zero(self): + dis = _dis(start=30, end=150) + expanded = dis.expand_coord(0) + self.assertEqual(expanded, dis) + + def test_expand_coord_negative_raises(self): + dis = _dis(start=30, end=150) + with self.assertRaises(ValueError): + dis.expand_coord(-1) + with self.assertRaises(ValueError): + dis.expand_coord(5, -1) + with self.assertRaises(ValueError): + dis.expand_coord(-1, 5) + + def test_expand_coord_negative_strand(self): + # Sorted 5'->3': [(300,400),(100,200)]. + # Upstream extends ivs[0]'s 5' by 5; dnstream extends ivs[-1]'s 3' by 10. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=30, end=150) + expanded = dis.expand_coord(5, 10) + self.assertEqual( + expanded.coordinate_intervals, + tuple(_make_intervals([("chr1", "-", 300, 405), ("chr1", "-", 90, 200)])), + ) + self.assertEqual(expanded.start, 30) + self.assertEqual(expanded.end, 165) + + def test_expand_coord_single_interval_plus(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=20, end=30) + expanded = dis.expand_coord(5, 3) + self.assertEqual( + expanded.coordinate_intervals, + tuple(_make_intervals([("chr1", "+", 95, 203)])), + ) + self.assertEqual(expanded.start, 20) + self.assertEqual(expanded.end, 38) + + def test_expand_coord_single_interval_minus(self): + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=20, end=30) + expanded = dis.expand_coord(5, 3) + # Minus single interval: 5' is .end (extend +U), 3' is .start (extend -D). + self.assertEqual( + expanded.coordinate_intervals, + tuple(_make_intervals([("chr1", "-", 97, 205)])), + ) + self.assertEqual(expanded.start, 20) + self.assertEqual(expanded.end, 38) + + def test_expand_coord_lower_matches_genomic_expansion_plus(self): + # Original segment [30, 150) lowers to [(130,200),(300,350)]. + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=30, end=150) + expanded = dis.expand_coord(5, 10) + self.assertEqual( + expanded.lower(), + [ + Interval("chr1", "+", 125, 200, REFG), + Interval("chr1", "+", 300, 360, REFG), + ], + ) + + def test_expand_coord_lower_matches_genomic_expansion_minus(self): + # Original segment [30, 150) lowers to [(300,370),(150,200)] on minus. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=30, end=150) + expanded = dis.expand_coord(5, 10) + self.assertEqual( + expanded.lower(), + [ + Interval("chr1", "-", 300, 375, REFG), + Interval("chr1", "-", 140, 200, REFG), + ], + ) + + def test_expand_coord_preserves_metadata(self): + dis = _dis(start=30, end=150, coord_name="c", segment_name="i") + expanded = dis.expand_coord(5, 10) + self.assertEqual(expanded.coord_name, "c") + self.assertEqual(expanded.name, "i") + self.assertTrue(expanded.on_coordinate_strand) + + def test_expand_coord_preserves_metadata_opposite_strand(self): + dis = _dis( + start=30, end=150, coord_name="c", segment_name="i", + on_coordinate_strand=False, + ) + expanded = dis.expand_coord(5, 10) + self.assertEqual(expanded.coord_name, "c") + self.assertEqual(expanded.name, "i") + self.assertFalse(expanded.on_coordinate_strand) + + def test_expand_coord_three_intervals_plus(self): + # Only the outer edges of the first and last coord intervals change; + # the middle interval is unchanged. + ivs = _make_intervals( + [("chr1", "+", 100, 200), ("chr1", "+", 300, 400), ("chr1", "+", 500, 600)] + ) + dis = DisjointIntervalSequence(ivs, start=50, end=250) + expanded = dis.expand_coord(5, 10) + self.assertEqual( + expanded.coordinate_intervals, + tuple(_make_intervals([ + ("chr1", "+", 95, 200), + ("chr1", "+", 300, 400), + ("chr1", "+", 500, 610), + ])), + ) + self.assertEqual(expanded.start, 50) + self.assertEqual(expanded.end, 265) + # Genomic span grows by 5 on coord-5' side and 10 on coord-3' side. + self.assertEqual( + expanded.lower(), + [ + Interval("chr1", "+", 145, 200, REFG), + Interval("chr1", "+", 300, 400, REFG), + Interval("chr1", "+", 500, 560, REFG), + ], + ) + + def test_expand_coord_three_intervals_minus(self): + # Sorted 5'->3': [(500,600), (300,400), (100,200)]. + # Upstream extends ivs[0]'s 5' edge (genomic right of 5'-most interval) by 5; + # dnstream extends ivs[-1]'s 3' edge (genomic left of 3'-most interval) by 10. + ivs = _make_intervals( + [("chr1", "-", 100, 200), ("chr1", "-", 300, 400), ("chr1", "-", 500, 600)] + ) + dis = DisjointIntervalSequence(ivs, start=50, end=250) + expanded = dis.expand_coord(5, 10) + self.assertEqual( + expanded.coordinate_intervals, + tuple(_make_intervals([ + ("chr1", "-", 500, 605), + ("chr1", "-", 300, 400), + ("chr1", "-", 90, 200), + ])), + ) + self.assertEqual(expanded.start, 50) + self.assertEqual(expanded.end, 265) + self.assertEqual( + expanded.lower(), + [ + Interval("chr1", "-", 500, 555, REFG), + Interval("chr1", "-", 300, 400, REFG), + Interval("chr1", "-", 140, 200, REFG), + ], + ) + + def test_expand_coord_three_intervals_upstream_only(self): + ivs = _make_intervals( + [("chr1", "+", 100, 200), ("chr1", "+", 300, 400), ("chr1", "+", 500, 600)] + ) + dis = DisjointIntervalSequence(ivs, start=50, end=250) + expanded = dis.expand_coord(5, 0) + self.assertEqual( + expanded.coordinate_intervals, + tuple(_make_intervals([ + ("chr1", "+", 95, 200), + ("chr1", "+", 300, 400), + ("chr1", "+", 500, 600), + ])), + ) + self.assertEqual(expanded.start, 50) + self.assertEqual(expanded.end, 255) + + class TestUpstreamOf(unittest.TestCase): def test_upstream_of_true(self): @@ -1162,5 +1375,884 @@ def test_within_negative_strand_coords_false(self): self.assertFalse(a.within(b)) +class TestLowerToCoordinate(unittest.TestCase): + """Tests for _lower_coord: DIS index → list of flanking genomic coordinate(s).""" + + def test_plus_strand_within_first_interval(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400), ("chr1", "+", 500, 600)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(0), [100]) + self.assertEqual(dis._lower_coord(50), [150]) + self.assertEqual(dis._lower_coord(99), [199]) + + def test_plus_strand_within_second_interval(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400), ("chr1", "+", 500, 600)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(150), [350]) + self.assertEqual(dis._lower_coord(199), [399]) + + def test_plus_strand_within_third_interval(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400), ("chr1", "+", 500, 600)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(250), [550]) + self.assertEqual(dis._lower_coord(299), [599]) + self.assertEqual(dis._lower_coord(300), [600]) # outer 3' edge, length-1 + + def test_plus_strand_beyond_end(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(210), [410]) + self.assertEqual(dis._lower_coord(250), [450]) + + def test_plus_strand_before_start(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(-1), [99]) + self.assertEqual(dis._lower_coord(-10), [90]) + + def test_minus_strand_within_first_interval(self): + # Sorted 5'->3': [500,600) then [300,400) then [100,200) + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400), ("chr1", "-", 500, 600)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(0), [600]) + self.assertEqual(dis._lower_coord(50), [550]) + self.assertEqual(dis._lower_coord(99), [501]) + + def test_minus_strand_within_second_interval(self): + # Sorted 5'->3': [500,600) then [300,400) then [100,200) + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400), ("chr1", "-", 500, 600)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(150), [350]) + self.assertEqual(dis._lower_coord(199), [301]) + + def test_minus_strand_within_third_interval(self): + # Sorted 5'->3': [500,600) then [300,400) then [100,200) + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400), ("chr1", "-", 500, 600)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(250), [150]) + self.assertEqual(dis._lower_coord(299), [101]) + self.assertEqual(dis._lower_coord(300), [100]) # outer 3' edge, length-1 + + def test_minus_strand_beyond_end(self): + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(210), [90]) + self.assertEqual(dis._lower_coord(250), [50]) + + def test_minus_strand_before_start(self): + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(-1), [401]) + self.assertEqual(dis._lower_coord(-10), [410]) + + def test_negative_genomic_coord(self): + # Interval starts at genomic 5; going 6 positions upstream yields -1 + ivs = _make_intervals([("chr1", "+", 5, 15)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(-6), [-1]) + self.assertEqual(dis._lower_coord(-10), [-5]) + + def test_single_interval(self): + ivs = _make_intervals([("chr1", "+", 50, 150)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(0), [50]) + self.assertEqual(dis._lower_coord(99), [149]) + self.assertEqual(dis._lower_coord(100), [150]) + self.assertEqual(dis._lower_coord(-1), [49]) + self.assertEqual(dis._lower_coord(101), [151]) + + def test_boundary_plus_with_gap(self): + ivs = _make_intervals([("chr1", "+", 100, 110), ("chr1", "+", 120, 130)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(9), [109]) # last of first + self.assertEqual(dis._lower_coord(10), [110, 120]) # boundary: [upstream.end, downstream.start] + self.assertEqual(dis._lower_coord(11), [121]) # first interior index of second + + def test_boundary_plus_touching(self): + # Adjacent intervals with no genomic gap: both boundary values coincide + ivs = _make_intervals([("chr1", "+", 100, 110), ("chr1", "+", 110, 120)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(10), [110, 110]) + + def test_boundary_minus_with_gap(self): + # Minus strand sorted 5'->3' = [(120,130), (100,110)]. Boundary at coord=10. + ivs = _make_intervals([("chr1", "-", 100, 110), ("chr1", "-", 120, 130)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(10), [120, 110]) # 5'->3' order: [upstream.start, downstream.end] + + def test_boundary_minus_touching(self): + # Minus strand adjacent intervals with no genomic gap: both boundary values coincide. + # Sorted 5'->3': [(110,120), (100,110)]. Boundary at coord=10. + ivs = _make_intervals([("chr1", "-", 100, 110), ("chr1", "-", 110, 120)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(10), [110, 110]) + + def test_boundary_plus_three_intervals(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400), ("chr1", "+", 500, 600)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis._lower_coord(100), [200, 300]) + self.assertEqual(dis._lower_coord(200), [400, 500]) + + def test_boundary_minus_three_intervals(self): + # Same inputs, minus strand. Sorted 5'->3' internally: [(500,600),(300,400),(100,200)]. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400), ("chr1", "-", 500, 600)]) + dis = DisjointIntervalSequence(ivs) + # coord=100: between ivs[0]=(500,600) and ivs[1]=(300,400). iv_L=(300,400), iv_R=(500,600). + self.assertEqual(dis._lower_coord(100), [500, 400]) + # coord=200: between ivs[1]=(300,400) and ivs[2]=(100,200). iv_L=(100,200), iv_R=(300,400). + self.assertEqual(dis._lower_coord(200), [300, 200]) + + +class TestLower(unittest.TestCase): + """Tests for lower(): project DIS interval to list of genomic Intervals.""" + + # ---- Plus strand ---- + + def test_plus_full_span_single_interval(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 100, 200, REFG)]) + + def test_plus_within_single_coord_interval(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=20, end=60) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 120, 160, REFG)]) + + def test_plus_within_second_coord_interval(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=120, end=180) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 320, 380, REFG)]) + + def test_plus_spans_two_coord_intervals(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=50, end=150) + self.assertEqual( + dis.lower(), + [ + Interval("chr1", "+", 150, 200, REFG), + Interval("chr1", "+", 300, 350, REFG), + ], + ) + + def test_plus_spans_three_coord_intervals_middle_used_asis(self): + ivs = _make_intervals( + [("chr1", "+", 100, 200), ("chr1", "+", 300, 400), ("chr1", "+", 500, 600)] + ) + dis = DisjointIntervalSequence(ivs, start=50, end=250) + self.assertEqual( + dis.lower(), + [ + Interval("chr1", "+", 150, 200, REFG), + Interval("chr1", "+", 300, 400, REFG), + Interval("chr1", "+", 500, 550, REFG), + ], + ) + + def test_plus_at_coord_interval_boundary(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=0, end=100) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 100, 200, REFG)]) + dis2 = DisjointIntervalSequence(ivs, start=100, end=200) + self.assertEqual(dis2.lower(), [Interval("chr1", "+", 300, 400, REFG)]) + + def test_plus_start_at_boundary_end_in_next(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=100, end=150) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 300, 350, REFG)]) + + def test_plus_extrapolate_upstream(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=-5, end=50) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 95, 150, REFG)]) + + def test_plus_extrapolate_downstream(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=150, end=205) + # DIS [150, 200) within ivs[1]=[300,400): genomic [350, 400). + # DIS [200, 205) extrapolates 5 past 3' end: genomic [400, 405). + # Both fall on ivs[1] (extended), so a single Interval [350, 405). + self.assertEqual(dis.lower(), [Interval("chr1", "+", 350, 405, REFG)]) + + def test_plus_extrapolate_both_ends(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=-10, end=110) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 90, 210, REFG)]) + + def test_plus_fully_extrapolated_upstream(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=-20, end=-10) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 80, 90, REFG)]) + + def test_plus_fully_extrapolated_downstream(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=110, end=120) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 210, 220, REFG)]) + + def test_plus_zero_length(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=50, end=50) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 150, 150, REFG)]) + + def test_plus_zero_length_at_interval_boundary(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=100, end=100) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 300, 300, REFG)]) + + def test_plus_zero_length_at_touching_interval_boundary(self): + # Adjacent intervals with no genomic gap: boundary collapses to a single position. + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 200, 300)]) + dis = DisjointIntervalSequence(ivs, start=100, end=100) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 200, 200, REFG)]) + + def test_plus_start_at_touching_boundary(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 200, 300)]) + dis = DisjointIntervalSequence(ivs, start=100, end=150) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 200, 250, REFG)]) + + def test_plus_end_at_touching_boundary(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 200, 300)]) + dis = DisjointIntervalSequence(ivs, start=50, end=100) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 150, 200, REFG)]) + + def test_plus_spans_touching_boundary(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 200, 300)]) + dis = DisjointIntervalSequence(ivs, start=50, end=150) + self.assertEqual( + dis.lower(), + [ + Interval("chr1", "+", 150, 200, REFG), + Interval("chr1", "+", 200, 250, REFG), + ], + ) + + # ---- Minus strand ---- + + def test_minus_full_span_single_interval(self): + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs) + self.assertEqual(dis.lower(), [Interval("chr1", "-", 100, 200, REFG)]) + + def test_minus_within_single_coord_interval(self): + # Sorted 5'->3': [300,400) then [100,200). Coord_length=200. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=20, end=60) + # DIS [20, 60) on minus is within first (ivs[0]=[300,400)) + # DIS 20 = pos 379 (5'-most of range), DIS 59 = pos 340 (3'-most). + # Genomic [340, 380). + self.assertEqual(dis.lower(), [Interval("chr1", "-", 340, 380, REFG)]) + + def test_minus_spans_two_coord_intervals(self): + # Sorted 5'->3': [300,400) then [100,200). + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=50, end=150) + # DIS [50, 100) = ivs[0] portion 5'-side: genomic [300, 350). + # DIS [100, 150) = ivs[1] portion 5'-side: genomic [150, 200). + self.assertEqual( + dis.lower(), + [ + Interval("chr1", "-", 300, 350, REFG), + Interval("chr1", "-", 150, 200, REFG), + ], + ) + + def test_minus_spans_three_coord_intervals_middle_used_asis(self): + # Sorted 5'->3': [500,600), [300,400), [100,200). Coord_length=300. + ivs = _make_intervals( + [("chr1", "-", 100, 200), ("chr1", "-", 300, 400), ("chr1", "-", 500, 600)] + ) + dis = DisjointIntervalSequence(ivs, start=50, end=250) + self.assertEqual( + dis.lower(), + [ + Interval("chr1", "-", 500, 550, REFG), + Interval("chr1", "-", 300, 400, REFG), + Interval("chr1", "-", 150, 200, REFG), + ], + ) + + def test_minus_at_coord_interval_boundary(self): + # Sorted 5'->3': [300,400) then [100,200). + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=0, end=100) + self.assertEqual(dis.lower(), [Interval("chr1", "-", 300, 400, REFG)]) + dis2 = DisjointIntervalSequence(ivs, start=100, end=200) + self.assertEqual(dis2.lower(), [Interval("chr1", "-", 100, 200, REFG)]) + + def test_minus_extrapolate_upstream(self): + # 5' end on minus is higher genomic. Start<0 extrapolates past iv[0].end. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=-5, end=50) + # DIS [-5, 0): genomic [400, 405) (past the 5' end, higher genomic). + # DIS [0, 50): genomic [350, 400). + # Combined into first interval: genomic [350, 405). + self.assertEqual(dis.lower(), [Interval("chr1", "-", 350, 405, REFG)]) + + def test_minus_extrapolate_downstream(self): + # 3' end on minus is lower genomic. End > coord_length extrapolates past iv[-1].start. + # Sorted 5'->3': ivs[0]=[300,400), ivs[1]=[100,200). coord_length=200. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=150, end=205) + # DIS [150, 200) within ivs[1]=[100,200): genomic [100, 150). + # DIS [200, 205) extrapolates 5 past the 3' end: genomic [95, 100). + # Extends ivs[1]'s 3' boundary, merging into one Interval. + self.assertEqual(dis.lower(), [Interval("chr1", "-", 95, 150, REFG)]) + + def test_minus_fully_extrapolated_upstream(self): + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=-20, end=-10) + # Extrapolate 5' upstream on minus = higher genomic. + # DIS -20 -> pos 219; DIS -11 -> pos 210. Genomic [210, 220). + self.assertEqual(dis.lower(), [Interval("chr1", "-", 210, 220, REFG)]) + + def test_minus_fully_extrapolated_downstream(self): + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=110, end=120) + # Extrapolate 3' downstream on minus = lower genomic. + # DIS 110 -> pos 89; DIS 119 -> pos 80. Genomic [80, 90). + self.assertEqual(dis.lower(), [Interval("chr1", "-", 80, 90, REFG)]) + + def test_minus_zero_length(self): + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=50, end=50) + self.assertEqual(dis.lower(), [Interval("chr1", "-", 150, 150, REFG)]) + + def test_minus_zero_length_at_interval_boundary(self): + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=100, end=100) + self.assertEqual(dis.lower(), [Interval("chr1", "-", 300, 300, REFG)]) + + def test_minus_zero_length_at_touching_interval_boundary(self): + # Adjacent intervals with no genomic gap on minus: boundary collapses to a single position. + # Sorted 5'->3': [(200,300),(100,200)]. Boundary at coord=100. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 200, 300)]) + dis = DisjointIntervalSequence(ivs, start=100, end=100) + self.assertEqual(dis.lower(), [Interval("chr1", "-", 200, 200, REFG)]) + + def test_minus_start_at_touching_boundary(self): + # Sorted 5'->3': [(200,300),(100,200)]. DIS [100, 150) on minus. + # coord 100 is the boundary; coord 150 is interior to (100,200) at genomic 150. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 200, 300)]) + dis = DisjointIntervalSequence(ivs, start=100, end=150) + self.assertEqual(dis.lower(), [Interval("chr1", "-", 150, 200, REFG)]) + + def test_minus_end_at_touching_boundary(self): + # Sorted 5'->3': [(200,300),(100,200)]. DIS [50, 100) on minus. + # coord 50 interior to (200,300) at genomic 250; coord 100 is the boundary. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 200, 300)]) + dis = DisjointIntervalSequence(ivs, start=50, end=100) + self.assertEqual(dis.lower(), [Interval("chr1", "-", 200, 250, REFG)]) + + def test_minus_spans_touching_boundary(self): + # Sorted 5'->3': [(200,300),(100,200)]. DIS [50, 150) on minus crosses the boundary. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 200, 300)]) + dis = DisjointIntervalSequence(ivs, start=50, end=150) + self.assertEqual( + dis.lower(), + [ + Interval("chr1", "-", 200, 250, REFG), + Interval("chr1", "-", 150, 200, REFG), + ], + ) + + # ---- on_coordinate_strand=False ---- + + def test_off_coord_strand_uses_effective_strand(self): + # Coord strand "+", effective strand "-". + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=20, end=60) + self.assertEqual(dis.lower(), [Interval("chr1", "-", 120, 160, REFG)]) + + def test_off_coord_strand_minus_coord(self): + # Coord strand "-", effective strand "+". + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=20, end=60) + self.assertEqual(dis.lower(), [Interval("chr1", "+", 140, 180, REFG)]) + + def test_off_coord_strand_plus_coord_multi_interval(self): + # Plus coord, opposite-strand segment (effective minus): segment 5'->3' + # is decreasing genomic, so the higher-genomic coord interval comes first. + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=50, end=150) + self.assertEqual( + dis.lower(), + [ + Interval("chr1", "-", 300, 350, REFG), + Interval("chr1", "-", 150, 200, REFG), + ], + ) + + def test_off_coord_strand_minus_coord_multi_interval(self): + # Minus coord, opposite-strand segment (effective plus): segment 5'->3' + # is increasing genomic. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=50, end=150) + self.assertEqual( + dis.lower(), + [ + Interval("chr1", "+", 150, 200, REFG), + Interval("chr1", "+", 300, 350, REFG), + ], + ) + + +class TestGenomicSpan(unittest.TestCase): + + def test_plus_within_single_interval(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=20, end=60) + self.assertEqual(dis.genomic_span(), Interval("chr1", "+", 120, 160, REFG)) + + def test_plus_spans_two_intervals(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=50, end=150) + # Span from 5'-most lowered interval start (150) to 3'-most lowered end (350). + self.assertEqual(dis.genomic_span(), Interval("chr1", "+", 150, 350, REFG)) + + def test_minus_spans_two_intervals(self): + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=50, end=150) + self.assertEqual(dis.genomic_span(), Interval("chr1", "-", 150, 350, REFG)) + + def test_zero_length(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=50, end=50) + self.assertEqual(dis.genomic_span(), Interval("chr1", "+", 150, 150, REFG)) + + def test_off_coord_strand(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=20, end=60) + # Effective strand is minus. + self.assertEqual(dis.genomic_span(), Interval("chr1", "-", 120, 160, REFG)) + + def test_plus_extrapolate_upstream(self): + # Segment starts before the coord 5' edge: extrapolated upstream. + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=-5, end=150) + self.assertEqual(dis.genomic_span(), Interval("chr1", "+", 95, 350, REFG)) + + def test_plus_extrapolate_downstream(self): + # Segment extends past the coord 3' edge: extrapolated downstream. + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=50, end=205) + self.assertEqual(dis.genomic_span(), Interval("chr1", "+", 150, 405, REFG)) + + def test_plus_fully_extrapolated_upstream(self): + # Segment lies entirely before the coord 5' edge. + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=-20, end=-10) + self.assertEqual(dis.genomic_span(), Interval("chr1", "+", 80, 90, REFG)) + + def test_plus_fully_extrapolated_downstream(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=210, end=220) + self.assertEqual(dis.genomic_span(), Interval("chr1", "+", 410, 420, REFG)) + + def test_minus_extrapolate_upstream(self): + # Minus strand: 5' is genomic right; extrapolation upstream goes higher. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=-5, end=150) + self.assertEqual(dis.genomic_span(), Interval("chr1", "-", 150, 405, REFG)) + + def test_minus_extrapolate_downstream(self): + # Minus strand: 3' is genomic left; extrapolation downstream goes lower. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=50, end=205) + self.assertEqual(dis.genomic_span(), Interval("chr1", "-", 95, 350, REFG)) + + +class TestLiftInterval(unittest.TestCase): + + def test_plus_within_segment(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=0, end=200) + lifted = dis.lift_interval(Interval("chr1", "+", 320, 360, REFG)) + self.assertIsNotNone(lifted) + self.assertEqual(lifted.start, 120) + self.assertEqual(lifted.end, 160) + self.assertTrue(lifted.on_coordinate_strand) + + def test_plus_partial_overlap(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=0, end=50) + lifted = dis.lift_interval(Interval("chr1", "+", 130, 170, REFG)) + self.assertIsNotNone(lifted) + self.assertEqual(lifted.start, 30) + self.assertEqual(lifted.end, 50) + self.assertTrue(lifted.on_coordinate_strand) + + def test_no_overlap_returns_none(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=0, end=50) + lifted = dis.lift_interval(Interval("chr1", "+", 320, 360, REFG)) + self.assertIsNone(lifted) + + def test_lift_in_coord_gap_returns_none(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=0, end=200) + lifted = dis.lift_interval(Interval("chr1", "+", 250, 260, REFG)) + self.assertIsNone(lifted) + + def test_minus_coord(self): + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=0, end=200) + lifted = dis.lift_interval(Interval("chr1", "-", 130, 170, REFG)) + self.assertIsNotNone(lifted) + self.assertEqual(lifted.start, 130) + self.assertEqual(lifted.end, 170) + self.assertTrue(lifted.on_coordinate_strand) + + def test_minus_coord_plus_segment(self): + # Coord strand is "-", segment is on the opposite (+) strand. + # Minus coord 5'->3' = [(300,400), (100,200)], so genomic coord 320 -> DIS 80, + # 360 -> DIS 40. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence( + ivs, start=0, end=200, on_coordinate_strand=False, + ) + lifted = dis.lift_interval(Interval("chr1", "+", 320, 360, REFG)) + self.assertIsNotNone(lifted) + self.assertEqual(lifted.start, 40) + self.assertEqual(lifted.end, 80) + self.assertFalse(lifted.on_coordinate_strand) + + def test_lift_opposite_strand_interval_error(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=0, end=100) + with self.assertRaises(ValueError): + dis.lift_interval(Interval("chr1", "-", 120, 160, REFG)) + + def test_chromosome_mismatch_raises(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs) + with self.assertRaises(ValueError): + dis.lift_interval(Interval("chr2", "+", 120, 160, REFG)) + + def test_coord_name_preserved_segment_name_not(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence( + ivs, start=0, end=200, coord_name="cs", segment_name="src", + ) + lifted = dis.lift_interval(Interval("chr1", "+", 320, 360, REFG)) + self.assertIsNotNone(lifted) + self.assertEqual(lifted.coord_name, "cs") + self.assertIsNone(lifted.name) + + def test_zero_length_interval_returns_none(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=0, end=100) + lifted = dis.lift_interval(Interval("chr1", "+", 150, 150, REFG)) + self.assertIsNone(lifted) + + def test_lift_upstream_of_coord_plus(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=-50, end=200) + lifted = dis.lift_interval(Interval("chr1", "+", 70, 90, REFG)) + self.assertIsNotNone(lifted) + self.assertEqual(lifted.start, -30) + self.assertEqual(lifted.end, -10) + self.assertTrue(lifted.on_coordinate_strand) + + def test_lift_downstream_of_coord_plus(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=0, end=250) + lifted = dis.lift_interval(Interval("chr1", "+", 420, 440, REFG)) + self.assertIsNotNone(lifted) + self.assertEqual(lifted.start, 220) + self.assertEqual(lifted.end, 240) + self.assertTrue(lifted.on_coordinate_strand) + + def test_lift_straddles_outer_5p_edge_plus(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=-50, end=200) + lifted = dis.lift_interval(Interval("chr1", "+", 80, 130, REFG)) + self.assertIsNotNone(lifted) + self.assertEqual(lifted.start, -20) + self.assertEqual(lifted.end, 30) + self.assertTrue(lifted.on_coordinate_strand) + + def test_lift_outside_segment_returns_none(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=0, end=200) + lifted = dis.lift_interval(Interval("chr1", "+", 70, 90, REFG)) + self.assertIsNone(lifted) + + def test_lift_upstream_of_coord_minus(self): + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=-50, end=200) + lifted = dis.lift_interval(Interval("chr1", "-", 420, 440, REFG)) + self.assertIsNotNone(lifted) + self.assertEqual(lifted.start, -40) + self.assertEqual(lifted.end, -20) + self.assertTrue(lifted.on_coordinate_strand) + + def test_lift_downstream_of_coord_minus(self): + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=0, end=250) + lifted = dis.lift_interval(Interval("chr1", "-", 60, 80, REFG)) + self.assertIsNotNone(lifted) + self.assertEqual(lifted.start, 220) + self.assertEqual(lifted.end, 240) + self.assertTrue(lifted.on_coordinate_strand) + + def test_lift_minus_coord_off_strand_spans_full_coord_clipped(self): + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence( + ivs, start=-40, end=240, on_coordinate_strand=False, + ) + lifted = dis.lift_interval(Interval("chr1", "+", 50, 450, REFG)) + self.assertIsNotNone(lifted) + self.assertEqual(lifted.start, -40) + self.assertEqual(lifted.end, 240) + self.assertFalse(lifted.on_coordinate_strand) + + def test_lift_idempotency(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=20, end=180) + lowered = dis.lower() + + iv = Interval("chr1", "+", 320, 360, REFG) + lifted_once = dis.lift_interval(iv) + lifted_twice = lifted_once.lift_interval(iv) + self.assertEqual(lifted_once, lifted_twice) + + +class TestIntersect(unittest.TestCase): + + def test_overlap(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + a = DisjointIntervalSequence(ivs, start=20, end=80) + b = DisjointIntervalSequence(ivs, start=50, end=120) + result = a.intersect(b) + self.assertIsNotNone(result) + self.assertEqual(result.start, 50) + self.assertEqual(result.end, 80) + + def test_no_overlap_returns_none(self): + a = _dis(start=20, end=50) + b = _dis(start=60, end=90) + self.assertIsNone(a.intersect(b)) + + def test_touching_no_overlap(self): + # Strict half-open: a.end == b.start does not overlap. + a = _dis(start=20, end=50) + b = _dis(start=50, end=80) + self.assertIsNone(a.intersect(b)) + + def test_a_within_b(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + a = DisjointIntervalSequence(ivs, start=40, end=60) + b = DisjointIntervalSequence(ivs, start=20, end=100) + result = a.intersect(b) + self.assertIsNotNone(result) + self.assertEqual(result.start, 40) + self.assertEqual(result.end, 60) + + def test_different_coord_space_raises(self): + a = _dis(start=20, end=80) + other_ivs = _make_intervals([("chr1", "+", 1000, 1100)]) + b = DisjointIntervalSequence(other_ivs, start=10, end=50) + with self.assertRaises(ValueError): + a.intersect(b) + + def test_different_on_coord_strand_raises(self): + a = _dis(start=20, end=80, on_coordinate_strand=True) + b = _dis(start=50, end=120, on_coordinate_strand=False) + with self.assertRaises(ValueError): + a.intersect(b) + + def test_segment_name_not_preserved(self): + a = _dis(start=20, end=80, segment_name="a") + b = _dis(start=50, end=120, segment_name="b") + result = a.intersect(b) + self.assertIsNotNone(result) + self.assertIsNone(result.name) + + def test_zero_length_self_returns_none(self): + # 0-length self at an index inside b: half-open intersection is empty. + a = _dis(start=40, end=40) + b = _dis(start=20, end=80) + self.assertIsNone(a.intersect(b)) + + def test_zero_length_other_returns_none(self): + # 0-length other at an index inside self. + a = _dis(start=20, end=80) + b = _dis(start=40, end=40) + self.assertIsNone(a.intersect(b)) + + def test_zero_length_both_returns_none(self): + # Both 0-length at the same index. + a = _dis(start=40, end=40) + b = _dis(start=40, end=40) + self.assertIsNone(a.intersect(b)) + + +class TestDna(unittest.TestCase): + + def setUp(self): + self.genome = MiniGenome("hg19") + + def test_plus_within_single_interval(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=20, end=60) + expected = self.genome.dna(Interval("chr1", "+", 120, 160, REFG)) + self.assertEqual(dis.dna(), expected) + + def test_plus_spans_two_intervals(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=50, end=150) + expected = ( + self.genome.dna(Interval("chr1", "+", 150, 200, REFG)) + + self.genome.dna(Interval("chr1", "+", 300, 350, REFG)) + ) + self.assertEqual(dis.dna(), expected) + + def test_minus_strand(self): + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=20, end=60) + expected = self.genome.dna(Interval("chr1", "-", 140, 180, REFG)) + self.assertEqual(dis.dna(), expected) + + def test_zero_length_returns_empty(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=50, end=50) + self.assertEqual(dis.dna(), "") + + def test_zero_length_at_boundary_returns_empty(self): + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, start=100, end=100) + self.assertEqual(dis.dna(), "") + + def test_outside_coord_upstream_padded(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=-5, end=50) + expected = "N" * 5 + self.genome.dna(Interval("chr1", "+", 100, 150, REFG)) + self.assertEqual(dis.dna(allow_outside_coord=True), expected) + + def test_outside_coord_downstream_padded(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=50, end=110) + expected = self.genome.dna(Interval("chr1", "+", 150, 200, REFG)) + "N" * 10 + self.assertEqual(dis.dna(allow_outside_coord=True), expected) + + def test_outside_coord_both_sides_padded(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=-3, end=107) + expected = "NNN" + self.genome.dna(Interval("chr1", "+", 100, 200, REFG)) + "NNNNNNN" + self.assertEqual(dis.dna(allow_outside_coord=True), expected) + + def test_outside_coord_minus_strand_padding_order(self): + # Coord minus, on coord strand. Segment indices < 0 + # Segment lies upstream of coordinate, pad should be added to segment + # upstream + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=-5, end=50) + expected = "N" * 5 + self.genome.dna(Interval("chr1", "-", 150, 200, REFG)) + self.assertEqual(dis.dna(allow_outside_coord=True), expected) + + def test_outside_coord_minus_strand_downstream_padded(self): + # Coord minus, on coord strand. Segment indices >= coord_length + # Segment lies downstream of coordinate, pad should be added to segment + # downstream + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=50, end=110) + expected = self.genome.dna(Interval("chr1", "-", 100, 150, REFG)) + "N" * 10 + self.assertEqual(dis.dna(allow_outside_coord=True), expected) + + def test_outside_coord_minus_strand_both_sides_padded(self): + # Coord minus, on coord strand. Segment indices < 0 and >= coord_length + # Segment lies both upstream and downstream of coordinate, pad should be + # added to both segment upstream and downstream + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=-3, end=107) + expected = "NNN" + self.genome.dna(Interval("chr1", "-", 100, 200, REFG)) + "NNNNNNN" + self.assertEqual(dis.dna(allow_outside_coord=True), expected) + + def test_outside_coord_disallowed_raises(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=-5, end=50) + with self.assertRaises(ValueError): + dis.dna(allow_outside_coord=False) + + def test_within_coord_disallow_no_raise(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, start=10, end=50) + expected = self.genome.dna(Interval("chr1", "+", 110, 150, REFG)) + self.assertEqual(dis.dna(allow_outside_coord=False), expected) + + def test_off_coord_strand_plus_coord(self): + # Coord plus, segment opposite strand (effective minus) + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=20, end=60) + expected = self.genome.dna(Interval("chr1", "-", 120, 160, REFG)) + self.assertEqual(dis.dna(), expected) + + def test_off_coord_strand_minus_coord(self): + # Coord minus, segment opposite strand (effective plus). + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=20, end=60) + expected = self.genome.dna(Interval("chr1", "+", 140, 180, REFG)) + self.assertEqual(dis.dna(), expected) + + def test_off_coord_strand_plus_coord_extrapolated_upstream(self): + # Coord plus, opposite strand. Segment indices < 0 + # Segment lies upstream of coordinate, but pad should be added to segment + # downstream + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=-5, end=50) + expected = self.genome.dna(Interval("chr1", "-", 100, 150, REFG)) + "N" * 5 + self.assertEqual(dis.dna(allow_outside_coord=True), expected) + + def test_off_coord_strand_plus_coord_extrapolated_downstream(self): + # Coord plus, opposite strand. Segment indices >= coord_length + # Segment lies downstream of coordinate, but pad should be added to + # segment upstream + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=50, end=110) + expected = "N" * 10 + self.genome.dna(Interval("chr1", "-", 150, 200, REFG)) + self.assertEqual(dis.dna(allow_outside_coord=True), expected) + + def test_off_coord_strand_minus_coord_extrapolated_upstream(self): + # Coord minus, opposite strand. Segment indices < 0 + # Segment lies upstream of coordinate, but pad should be added to segment + # downstream + ivs = _make_intervals([("chr1", "-", 100, 200)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=-5, end=50) + expected = self.genome.dna(Interval("chr1", "+", 150, 200, REFG)) + "N" * 5 + self.assertEqual(dis.dna(allow_outside_coord=True), expected) + + def test_off_coord_strand_zero_length(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=50, end=50) + self.assertEqual(dis.dna(), "") + + def test_off_coord_strand_outside_disallowed_raises(self): + ivs = _make_intervals([("chr1", "+", 100, 200)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=-5, end=50) + with self.assertRaises(ValueError): + dis.dna(allow_outside_coord=False) + + def test_off_coord_strand_plus_coord_multi_interval(self): + # Effective minus segment: DNA in segment runs 5'->3' + # so the higher-genomic coord interval contributes first. + ivs = _make_intervals([("chr1", "+", 100, 200), ("chr1", "+", 300, 400)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=50, end=150) + expected = ( + self.genome.dna(Interval("chr1", "-", 300, 350, REFG)) + + self.genome.dna(Interval("chr1", "-", 150, 200, REFG)) + ) + self.assertEqual(dis.dna(), expected) + + def test_off_coord_strand_minus_coord_multi_interval(self): + # Effective plus segment: DNA in segment runs 5'->3' + # so the lower-genomic coord interval contributes first. + ivs = _make_intervals([("chr1", "-", 100, 200), ("chr1", "-", 300, 400)]) + dis = DisjointIntervalSequence(ivs, on_coordinate_strand=False, start=50, end=150) + expected = ( + self.genome.dna(Interval("chr1", "+", 150, 200, REFG)) + + self.genome.dna(Interval("chr1", "+", 300, 350, REFG)) + ) + self.assertEqual(dis.dna(), expected) + + if __name__ == "__main__": unittest.main()