Source code for genoray.exprs

"""`Polars <https://docs.pola.rs/>`_ expressions for filtering a genoray index (extension :code:`.gvi`)
given the minimum set of index columns:

- :code:`"CHROM"` : :code:`pl.Utf8`
- :code:`"POS"` : :code:`pl.Int64`
- :code:`"REF"` : :code:`pl.Utf8`
- :code:`"ALT"` : :code:`pl.List[Utf8]`
- :code:`"ILEN"` : :code:`pl.List[Int32]`

Applicable for PGEN files and the experimental :meth:`VCF._load_index` method.

.. note::
    For PGEN, all columns that existed in the underlying PVAR will be available in the index.
"""

import re
from collections.abc import Iterable

import polars as pl

"""
on-disk schema is same as in-memory except:
CHROM is cat or enum
ALT can be comma delimited str or list[str]
ILEN is optional list[int]
"""

# in-memory schema
IndexSchema = {
    "CHROM": pl.Enum,
    "POS": pl.Int64,
    "REF": pl.Utf8,
    "ALT": pl.List(pl.Utf8),
    "ILEN": pl.List(pl.Int32),
}
"""Minimum in-memory schema for a genoray index file (extension :code:`.gvi`)."""

is_snp = (
    pl.col("ILEN")
    .list.eval((pl.element() == 0) & pl.element().is_not_null())
    .list.all()
)
"""True if all ALT alleles are SNPs (single nucleotide polymorphisms).

Un-sizable symbolic alleles (``null`` ILEN) are treated as neither SNP nor
indel: a row containing any ``null`` ILEN element evaluates to ``False``.
"""

is_indel = (
    pl.col("ILEN")
    .list.eval((pl.element() != 0) & pl.element().is_not_null())
    .list.all()
)
"""True if all ALT alleles are indels (insertions or deletions).

Un-sizable symbolic alleles (``null`` ILEN) are treated as neither SNP nor
indel: a row containing any ``null`` ILEN element evaluates to ``False``.
"""

is_biallelic = pl.col("ALT").list.len() == 1
"""True if the variant is biallelic (one ALT allele)."""

is_symbolic = pl.col("ALT").list.eval(pl.element().str.starts_with("<")).list.any()
"""True if any ALT allele is a symbolic allele (e.g. :code:`<DEL>`, :code:`<INS>`,
:code:`<DUP>`, :code:`<INV>`, :code:`<CNV>`, :code:`<BND>` — anything matching ``<…>``
per the VCF 4.x spec).

Symbolic ALTs are placeholders for structural variants whose exact replacement
nucleotides are unknown. Downstream haplotype injection (e.g. via
``genvarloader``) cannot expand them — the literal ``<DEL>`` ASCII bytes end up
in personalized DNA buffers and become non-canonical bytes for translators.

To drop symbolic records, pass this as a filter. For PGEN, the single ``filter``
expression suffices::

    pgen = genoray.PGEN("file.pgen", filter=~genoray.exprs.is_symbolic)

For VCF, pair it with the equivalent cyvcf2 ``filter`` (both are required)::

    vcf = genoray.VCF(
        "file.vcf.gz",
        filter=lambda rec: not any(a.startswith("<") for a in rec.ALT),
        pl_filter=~genoray.exprs.is_symbolic,
    )

``SparseVar.from_vcf`` / ``from_pgen`` inherit the source's filter, so the SVAR
is filtered to match.
"""

_BND_PATTERN = r"[\[\]]|^\.[A-Za-z]|[A-Za-z]\.$"
"""Regex matching a VCF breakend (BND) ALT replacement string.

Catches the paired mate forms (``t[p[``, ``t]p]``, ``]p]t``, ``[p[t`` — all
contain ``[`` or ``]``) and the single-breakend forms (``.t`` / ``t.`` — a ``.``
adjacent to a base). A lone ``.`` (no-ALT) does not match because a base must
sit beside the dot."""

is_breakend = (
    pl.col("ALT").list.eval(pl.element().str.contains(_BND_PATTERN)).list.any()
)
"""True if any ALT allele is a breakend (BND) in mate-pair / single-breakend
notation (e.g. :code:`G[chr2:321[`, :code:`]chr2:321]G`, :code:`.TGCA`,
:code:`TGCA.`), per the VCF 4.x spec (§5.4).

Breakends are a *distinct* ALT class from symbolic ``<...>`` alleles, so
:data:`is_symbolic` does **not** flag them. But like symbolic alleles they are
not expandable into nucleotides — the bracket/colon/position bytes corrupt
personalized DNA buffers in haplotype consumers (e.g. ``genvarloader``). Their
:func:`symbolic_ilen` value is ``null`` (so they are also :data:`is_imprecise`).

To drop breakends, pass ``pl_filter=~genoray.exprs.is_breakend``. To drop *all*
un-expandable ALTs (symbolic + breakends) for haplotype consumers, combine::

    pl_filter=~genoray.exprs.is_symbolic & ~genoray.exprs.is_breakend
"""


def _record_is_symbolic(alts: Iterable[str]) -> bool:
    """Record-level mirror of :data:`is_symbolic` for a cyvcf2 ``Variant.ALT``.

    True if any ALT allele is a symbolic allele (starts with ``<``). Used by the
    CLI to build the cyvcf2 ``filter`` callable that must match the polars
    ``pl_filter`` on the VCF path.
    """
    return any(a.startswith("<") for a in alts)


def _record_is_breakend(alts: Iterable[str]) -> bool:
    """Record-level mirror of :data:`is_breakend` for a cyvcf2 ``Variant.ALT``.

    True if any ALT allele is a breakend (matches :data:`_BND_PATTERN`). Reuses
    the same regex as :data:`is_breakend` so the cyvcf2 ``filter`` callable and
    the polars ``pl_filter`` cannot drift apart.
    """
    return any(re.search(_BND_PATTERN, a) is not None for a in alts)


ILEN = pl.col("ALT").list.eval(pl.element().str.len_bytes().cast(pl.Int32)) - pl.col(
    "REF"
).str.len_bytes().cast(pl.Int32)
"""Indel length of the variant. Positive for insertions, negative for deletions, and zero for SNPs and MNPs."""


[docs] def symbolic_ilen( alt: str = "ALT", ref: str = "REF", svlen: str = "SVLEN", end: str = "END", imprecise: str = "IMPRECISE", ) -> pl.Expr: """Per-ALT corrected ILEN as ``List[Int32]``. Non-symbolic, non-breakend ALTs use the literal ``len(ALT) - len(REF)``. Precise symbolic ``<DEL>``/``<INS>``/``<DUP>`` use ``SVLEN`` magnitude (or ``|END - POS|`` for ``<DEL>``/``<DUP>`` when ``SVLEN`` is absent): ``-|len|`` for ``<DEL>``, ``+|len|`` for ``<INS>``/``<DUP>``. Everything we cannot size precisely (``IMPRECISE`` flag, missing length, unsupported type such as ``<BND>``, ``<CNV>``, ``<INV>``, ``<*>``, and breakend mate notation such as ``G[chr2:321[``) becomes ``null``. ``svlen``/``end``/``imprecise`` name scalar columns already extracted by the caller (per record). ``SVLEN`` magnitude is read as ``|SVLEN|`` so the VCF 4.3/4.4 sign-convention flip does not matter. .. note:: Evaluated per-record via a Python callback (``map_elements``) at index-build time; expect one Python call per variant row. """ ref_len = pl.col(ref).str.len_bytes().cast(pl.Int32) svlen_mag = pl.col(svlen).abs().cast(pl.Int32) end_mag = (pl.col(end) - pl.col("POS")).abs().cast(pl.Int32) mag = pl.coalesce(svlen_mag, end_mag) # prefer SVLEN, fall back to END imprecise_flag = pl.col(imprecise).fill_null(False) # Extract the primary SV type per ALT element (e.g. "<DEL:ME>" -> "DEL"). # list.eval only allows pl.element(); outer columns are broadcast automatically. sv_type_list = pl.col(alt).list.eval( pl.element().str.extract(r"^<([A-Za-z0-9:]+)>", 1).str.split(":").list.first() ) # Literal (non-symbolic) ILEN per element: len(ALT_tok) - len(REF) literal_list = ( pl.col(alt).list.eval(pl.element().str.len_bytes().cast(pl.Int32)) - ref_len ) # Whether each ALT element is symbolic is_sym_list = pl.col(alt).list.eval(pl.element().str.starts_with("<")) # Whether each ALT element is a breakend (BND) — un-sizable like symbolic. is_bnd_list = pl.col(alt).list.eval(pl.element().str.contains(_BND_PATTERN)) # Per-row scalar: sized SV length (sign-corrected), or null if un-sizable. # This is broadcast across all symbolic ALTs in the row. del_mag = pl.when(imprecise_flag | mag.is_null()).then(None).otherwise(-mag) ins_dup_mag = pl.when(imprecise_flag | mag.is_null()).then(None).otherwise(mag) # Build ILEN list element-wise using list.eval on a packed struct. # Pack: sv_type string + is_sym flag + literal value. packed = pl.struct( sv_type=sv_type_list, is_sym=is_sym_list, is_bnd=is_bnd_list, literal=literal_list, del_mag=del_mag, ins_dup_mag=ins_dup_mag, ) return packed.map_elements(_compute_ilen_row, return_dtype=pl.List(pl.Int32))
def _compute_ilen_row(row: dict) -> list[int | None]: """Compute per-ALT ILEN for a single row (called via map_elements).""" sv_types: list = row["sv_type"] is_syms: list = row["is_sym"] is_bnds: list = row["is_bnd"] literals: list = row["literal"] del_mag = row["del_mag"] ins_dup_mag = row["ins_dup_mag"] result = [] for sv_type, sym, bnd, lit in zip(sv_types, is_syms, is_bnds, literals): if bnd: # Breakends carry no expandable length — un-sizable, like symbolic SVs. result.append(None) elif not sym: result.append(lit) elif sv_type == "DEL": result.append(del_mag) elif sv_type in ("INS", "DUP"): result.append(ins_dup_mag) else: result.append(None) return result is_imprecise = pl.col("ILEN").list.eval(pl.element().is_null()).list.any() """True if any ALT allele's ILEN could not be precisely determined (an un-sizable symbolic allele — ``IMPRECISE``, missing ``SVLEN``/``END``, or an unsupported symbolic type). Such alleles carry ``null`` ILEN. Filter them out with ``pl_filter=~genoray.exprs.is_imprecise`` to keep precise structural variants while dropping the rest; use ``~genoray.exprs.is_symbolic`` to drop *all* symbolic alleles (required for haplotype consumers such as genvarloader, which cannot expand any symbolic ALT)."""