Source code for riboraptor.fasta

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
import os
import re
import sys
import pybedtools
import pandas as pd
import numpy as np

from pyfaidx import Fasta
from Bio.Seq import Seq
from Bio.Alphabet import generic_dna

from .genome import _get_sizes
from .genome import _get_bed
from .genome import __GENOMES_DB__

from .helpers import collapse_bed_intervals
from .helpers import list_to_ranges
from .helpers import mkdir_p
from .helpers import _fix_bed_coltype


[docs]def get_fasta_sequence(fasta_f, intervals): """Extract fasta sequence given a list of intervals. Parameters ---------- fasta_f : str Path to fasta file intervals : list(tuple) A list of tuple in the form [(chrom, start, stop, strand)] NOTE: 1-based start and stop only! Returns ------- seq : list List of sequences at intervals """ fasta = Fasta(fasta_f) chrom = intervals[0][0] strand = intervals[0][-1] seq = '' for interval in intervals: # Fetching is 1-based for both start and stop and since we are # fetching from ranges, # This is already accounted for internally in the pipeline # when using the method ""there is additional +1 seq += str(fasta.get_seq(chrom, int(interval[1]), int(interval[2]))) if strand == '-': seq = str(Seq(seq, generic_dna).reverse_complement()) return seq
def _get_interval(coordinates, chrom, strand): intervals = [] for coord in coordinates: start = coord[0] end = coord[1] intervals.append((chrom, start, end, strand)) return intervals
[docs]def export_fasta_from_bed(gene_name, bed, chrom_sizes, fasta_f, gene_group=None, offset_5p=0, offset_3p=0): """Extract fasta genewise given coordinates in bed file Parameters ---------- gene_name : str Gene name bed : str Path to CDS or 5'UTR or 3'UTR bed fasta_f : str Path to fasta file chrom_sizes : str Path to chrom.sizes file offset_5p : int (positive) Number of bases to count upstream (5') offset_3p : int (positive) Number of bases to count downstream (3') Returns ------- gene_offset : int Gene wise offsets """ assert (offset_5p >= 0) assert (offset_3p >= 0) chromosome_lengths = pd.read_table( chrom_sizes, names=['chrom', 'size']).set_index('chrom') if not isinstance(bed, pd.DataFrame): bed = pybedtools.BedTool(bed).to_dataframe() bed = _fix_bed_coltype(bed) assert gene_name in bed['name'].tolist() if gene_group is None: gene_group = bed[bed['name'] == gene_name] assert len(gene_group['strand'].unique()) == 1 assert len(gene_group['chrom'].unique()) == 1 chrom = gene_group['chrom'].unique()[0] strand = gene_group['strand'].unique()[0] if strand == '+': rc = False elif strand == '-': rc = True # Collect all intervals at once intervals = list( zip(gene_group['chrom'], gene_group['start'], gene_group['end'], gene_group['strand'])) query_intervals, fasta_onebased_intervals, gene_offset_5p, gene_offset_3p = collapse_bed_intervals( intervals, chromosome_lengths, offset_5p, offset_3p) seq = get_fasta_sequence(fasta_f, list(fasta_onebased_intervals)) return gene_offset_5p, gene_offset_3p, str(seq)
[docs]def export_all_fasta(region_bed_f, chrom_sizes, fasta, prefix, offset_5p=60, offset_3p=0, ignore_tx_version=True): """Export all gene coverages. Parameters ---------- region_bed_f : str Path to region bed file (CDS/3'UTR/5'UTR) with bed name column as gene chrom_sizes : str Path to chrom.sizes file prefix : str Prefix to write output file offset_5p : int number of bases to count upstream (5') offset_30 : int number of bases to count downstream (3') ignore_tx_version : bool Should versions be ignored for gene names Returns ------- """ if region_bed_f.lower().split('_')[0] in __GENOMES_DB__: genome, region_type = region_bed_f.lower().split('_') region_bed_f = _get_bed(region_type, genome) region_bed = pybedtools.BedTool(region_bed_f).sort().to_dataframe() region_bed = _fix_bed_coltype(region_bed) # Group intervals by gene name cds_grouped = region_bed.groupby('name') mkdir_p(os.path.dirname(prefix)) with open('{}_all_genes.tsv'.format(prefix), 'w') as outfile: outfile.write('gene_name\toffset_5p\toffset_3p\tseq\n') for gene_name, gene_group in cds_grouped: if ignore_tx_version: gene_name = re.sub(r'\.[0-9]+', '', gene_name) gene_offset_5p, gene_offset_3p, seq = export_fasta_from_bed( gene_name, region_bed, chrom_sizes, fasta, gene_group, offset_5p, offset_3p) with open('{}_{}.fasta'.format(prefix, gene_name), 'w') as fh_fasta: fh_fasta.write('>{}_5poffset-{}_3poffset-{}\n{}'.format( gene_name, gene_offset_5p, gene_offset_3p, seq)) outfile.write('{}\t{}\t{}\t{}\n'.format( gene_name, int(gene_offset_5p), int(gene_offset_3p), seq))
[docs]def complete_gene_fasta(utr5_bed_f, cds_bed_f, utr3_bed_f, fasta_f, prefix): """Merge Utr5, CDS, UTR3 coordinates to get one fasta. Parameters ---------- utr5_bed : str Path to 5'UTR bed cds_bed : str Path to CDS bed utr3_bed : str Path to 3'UTR bed """ utr5_bed = _fix_bed_coltype( pybedtools.BedTool(utr5_bed_f).sort().to_dataframe()) cds_bed = _fix_bed_coltype( pybedtools.BedTool(cds_bed_f).sort().to_dataframe()) utr3_bed = _fix_bed_coltype( pybedtools.BedTool(utr3_bed_f).sort().to_dataframe()) # Group intervals by gene namei utr5_grouped = utr5_bed.groupby('name') cds_grouped = cds_bed.groupby('name') utr3_grouped = utr3_bed.groupby('name') mkdir_p(os.path.dirname(prefix)) # For now just write records where wll three annotations are complete utr5_genes = set(list(utr5_grouped.groups)) cds_genes = set(list(cds_grouped.groups)) utr3_genes = set(list(utr3_grouped.groups)) common_genes = utr5_genes.intersection(cds_genes).intersection(utr3_genes) for gene_name in common_genes: chrom = cds_grouped.get_group(gene_name)['chrom'].unique()[0] strand = cds_grouped.get_group(gene_name)['strand'].unique()[0] utr5_group = utr5_grouped.get_group(gene_name) cds_group = cds_grouped.get_group(gene_name) utr3_group = utr3_grouped.get_group(gene_name) utr5_intervals = list( zip(utr5_group['chrom'], utr5_group['start'], utr5_group['end'], utr5_group['strand'])) cds_intervals = list( zip(cds_group['chrom'], cds_group['start'], cds_group['end'], cds_group['strand'])) utr3_intervals = list( zip(utr3_group['chrom'], utr3_group['start'], utr3_group['end'], utr3_group['strand'])) _, utr5_onebased_intervals, _, _ = collapse_bed_intervals( utr5_intervals, chromosome_lengths, offset_5p, offset_3p) _, cds_onebased_intervals, _, _ = collapse_bed_intervals( cds_intervals, chromosome_lengths, offset_5p, offset_3p) _, utr3_onebased_intervals, _, _ = collapse_bed_intervals( utr3_intervals, chromosome_lengths, offset_5p, offset_3p) utr5seq = get_fasta_sequence(fasta_f, list(utr5_onebased_intervals)) cdsseq = get_fasta_sequence(fasta_f, list(cds_onebased_intervals)) utr3seq = get_fasta_sequence(fasta_f, list(utr3_onebased_intervals)) utr5len = len(utr5seq) cdslen = len(cdsseq) utr3len = len(utr3seq) intervals = list(utr5_onebased_intervals) + list( cds_onebased_intervals) + list(utr3_onebased_intervals) seq = get_fasta_sequence(fasta_f, intervals) with open('{}_{}.fasta'.format(prefix, gene_name), 'w') as fh_fasta: fh_fasta.write('>{}_utr5len={};cdslen={};utr3len={}\n{}'.format( gene_name, utr5len, cdslen, utr3len, seq))