"""All functions that are not so useful, but still useful."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from collections import OrderedDict
from collections import defaultdict
import csv
import errno
import itertools
import math
import os
import sys
import glob
import ntpath
import pickle
from scipy import stats
import numpy as np
import pandas as pd
import six
CBB_PALETTE = [
"#000000", "#E69F00", "#56B4E9", "#009E73", "#F0E442", "#0072B2",
"#D55E00", "#CC79A7"
]
def _fix_bed_coltype(bed):
"""Fix bed chrom and name columns to be string
This is necessary since the chromosome numbers are often interpreted as int
"""
bed['chrom'] = bed['chrom'].astype(str)
bed['name'] = bed['name'].astype(str)
return bed
[docs]def check_file_exists(filepath):
"""Check if file exists.
Parameters
----------
filepath : str
Path to file
"""
if os.path.isfile(os.path.abspath(filepath)):
return True
return False
[docs]def list_to_ranges(list_of_int):
"""Convert a list to a list of range object
Parameters
----------
list_of_int: list
List of integers to be squeezed into range
Returns
-------
list_of_range: list
List of range objects
"""
sorted_list = sorted(set(list_of_int))
for key, group in itertools.groupby(
enumerate(sorted_list), lambda x: x[1] - x[0]):
group = list(group)
yield group[0][1], group[-1][1]
[docs]def create_ideal_periodic_signal(signal_length):
"""Create ideal ribo-seq signal.
Parameters
----------
signal_length : int
Length of signal to create
Returns
-------
signal : array_like
1-0-0 signal
"""
uniform_signal = np.array([4 / 6.0] * signal_length)
uniform_signal[list(range(1, len(uniform_signal), 3))] = 1 / 6.0
uniform_signal[list(range(2, len(uniform_signal), 3))] = 1 / 6.0
return uniform_signal
[docs]def identify_peaks(coverage):
"""Given coverage array, find the site of maximum density"""
return np.argmax(coverage[list(range(-18, -10))])
[docs]def millify(n):
"""Convert integer to human readable format.
Parameters
----------
n : int
Returns
-------
millidx : str
Formatted integer
"""
millnames = ['', ' K', ' M', ' B', ' T']
# Source: http://stackoverflow.com/a/3155023/756986
n = float(n)
millidx = max(
0,
min(
len(millnames) - 1,
int(math.floor(0 if n == 0 else math.log10(abs(n)) / 3))))
return '{:.1f}{}'.format(n / 10**(3 * millidx), millnames[millidx])
[docs]def mkdir_p(path):
"""Python version mkdir -p
Parameters
----------
path : str
"""
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
[docs]def r2(x, y):
'''Calculate pearson correlation between two vectors.
Parameters
----------
x : array_like
Input
y : array_like
Input
'''
return stats.pearsonr(x, y)[0]**2
[docs]def round_to_nearest(x, base=5):
'''Round to nearest base.
Parameters
----------
x : float
Input
Returns
-------
v : int
Output
'''
return int(base * round(float(x) / base))
[docs]def set_xrotation(ax, degrees):
"""Rotate labels on x-axis.
Parameters
----------
ax : matplotlib.Axes
Axes object
degrees : int
Rotation degrees
"""
for i in ax.get_xticklabels():
i.set_rotation(degrees)
[docs]def summary_stats_two_arrays_welch(old_mean_array,
new_array,
old_var_array=None,
old_n_counter=None,
carried_forward_observations=None):
"""Average two arrays using welch's method
Parameters
----------
old_mean_array : Series
Series of previous means with index as positions
old_var_array : Series
Series of previous variances with index as positions
new_array : array like
Series of new observations
(Does noes
Ciunts of number of positions at a certain index
Returns
-------
m : array like
Column wise Mean array
var : array like
Column wise variance
Consider an example: [1,2,3], [1,2,3,4], [1,2,3,4,5]
old = [1,2,3]
new = [1,2,3,4]
counter = [1,1,1]
mean = [1,2,3,4] Var =[na, na, na, na], carried_fowrad = [[1,1], [2,2], [3,3], [4]]
old = [1,2,3,4]
new = [1,2,3,4,5]
couter = [2,2,2,1]
mean = [1,2,3,4,5]
var = [0,0,0, na, na]
carried_forward = [[], [], [], [4,4], [5]]
"""
if not isinstance(old_mean_array, pd.Series):
old_mean_array = pd.Series(old_mean_array)
if not isinstance(new_array, pd.Series):
new_array = pd.Series(new_array)
if old_n_counter is not None and not isinstance(old_n_counter, pd.Series):
old_n_counter = pd.Series(old_n_counter)
len_old, len_new = len(old_mean_array), len(new_array)
if old_n_counter is None:
# Initlaized from current series
old_n_counter = pd.Series(
np.zeros(len(old_mean_array)) + 1, index=old_mean_array.index)
if old_var_array is None:
# Initlaized from current series
old_var_array = pd.Series(
np.zeros(len(old_mean_array)) + np.nan, index=old_mean_array.index)
# Update positions counts based on new_array
new_n_counter = old_n_counter.add(
pd.Series(np.zeros(len(new_array)) + 1, index=new_array.index),
fill_value=0)
if len_old > len_new:
len_diff = len_old - len_new
# Pad the incoming array
# We append NAs to the end of new_array since it will mostly be in the metagene context
max_index = np.max(new_array.index.tolist())
new_index = np.arange(max_index + 1, max_index + 1 + len_diff)
new_array = new_array.append(
pd.Series(np.zeros(len_diff) + np.nan, index=new_index),
verify_integrity=True)
elif len_old < len_new:
len_diff = len_new - len_old
# Pad the old array
if len_old == 0:
old_mean_array = pd.Series([])
else:
max_index = np.max(old_mean_array.index.tolist())
new_index = np.arange(max_index + 1, max_index + 1 + len_diff)
old_mean_array = old_mean_array.append(
pd.Series(np.zeros(len_diff) + np.nan, index=new_index),
verify_integrity=True)
if not (old_mean_array.index == new_array.index).all():
print('old array index: {}'.format(old_mean_array))
print('new array index: {}'.format(new_array))
positions_with_less_than3_obs = defaultdict(list)
for index, counts in six.iteritems(new_n_counter):
# Which positions has <3 counts for calculating variance
if counts <= 3:
# Fetch the exact observations from history
try:
last_observations = carried_forward_observations[index]
except:
# No carreid forward passed
if not np.isnan(old_mean_array[index]):
last_observations = [old_mean_array[index]]
else:
last_observations = []
# Add entry from new_array only if it is not NAN
if not np.isnan(new_array[index]):
last_observations.append(new_array[index])
positions_with_less_than3_obs[index] = last_observations
# positions_with_less_than3_obs = pd.Series(positions_with_less_than3_obs)
# delta = x_n - mean(x_{n-1})
delta = new_array.subtract(old_mean_array)
"""
for index, value in six.iteritems( delta ):
if np.isnan(value):
if not np.isnan(old_mean_array[index]):
delta[index] = old_mean_array[index]
else:
delta[index] = new_array[index]
"""
# delta = delta/n
delta_normalized = delta.divide(new_n_counter)
# mean(x_n) = mean(x_{n-1}) + delta/n
new_mean_array = old_mean_array.add(delta_normalized)
for index, value in six.iteritems(new_mean_array):
if np.isnan(value):
if not np.isnan(old_mean_array[index]):
new_mean_array[index] = old_mean_array[index]
else:
new_mean_array[index] = new_array[index]
#print(delta)
#print(new_n_counter)
#print(delta_normalized)
#print(new_mean_array)
# mean_difference_current = x_n - mean(x_n)
# mean_difference_previous = x_n - mean(x_{n-1})
mean_difference_current = new_array.fillna(0) - new_mean_array.fillna(0)
mean_difference_previous = new_array.fillna(0) - old_mean_array.fillna(0)
# (x_n-mean(x_n))(x_n-mean(x_{n-1})
product = np.multiply(mean_difference_current, mean_difference_previous)
# (n-1)S_n^2 - (n-2)S_{n-1}^2 = (x_n-mean(x_n)) (x_n-mean(x_{n-1}))
# old_ssq = (n-1)S_{n-1}^2
# (n-2)S_{n-1}^2
old_sum_of_sq = (old_n_counter - 2).multiply(old_var_array.fillna(0))
# new_ssq = (old_ssq + product)
# (n-1) S_n^2
new_sum_of_sq = old_sum_of_sq + product
# if counts is less than 3, set sum of sq to NA
new_sum_of_sq[new_n_counter < 3] = np.nan
# if counts just became 3, compute the variance
for index, counts in six.iteritems(new_n_counter):
if counts == 3:
observations = positions_with_less_than3_obs[index]
variance = np.var(observations)
print(index, variance)
new_sum_of_sq[index] = variance
# delete it from the history
del positions_with_less_than3_obs[index]
new_var_array = new_sum_of_sq.divide(new_n_counter - 1)
new_var_array[new_var_array == np.inf] = np.nan
new_var_array[new_n_counter < 3] = np.nan
"""
for index, counts in six.iteritems(new_n_counter):
if counts < 3:
if not np.isnan(new_array[index]):
if index not in list(positions_with_less_than3_obs.keys()):
positions_with_less_than3_obs[index] = list()
assert index in positions_with_less_than3_obs.keys()
positions_with_less_than3_obs[index].append(new_array[index])
"""
return new_mean_array, new_var_array, new_n_counter, positions_with_less_than3_obs
[docs]def path_leaf(path):
"""Get path's tail from a filepath"""
head, tail = ntpath.split(path)
return tail or ntpath.basename(head)
[docs]def parse_star_logs(infile, outfile=None):
"""Parse star logs into a dict
Parameters
----------
infile : str
Path to starlogs.final.out file
Returns
-------
star_info : dict
Dict with necessary records parsed
"""
ANNOTATIONS = [
'total_reads', 'uniquely_mapped', 'uniquely_mapped_percent',
'multi_mapped_percent', 'unmapped_percent', 'multi_mapped'
]
star_info = OrderedDict()
with open(infile) as fh:
for line in fh:
line = line.strip()
if line.startswith('Number of input reads'):
star_info[ANNOTATIONS[0]] = int(line.strip().split('\t')[1])
elif line.startswith('Uniquely mapped reads number'):
star_info[ANNOTATIONS[1]] = int(line.strip().split('\t')[1])
elif line.startswith('Uniquely mapped reads %'):
star_info[ANNOTATIONS[2]] = round(
float(line.strip('%').split('\t')[1]), 2)
elif line.startswith('Number of reads mapped to multiple loci'):
star_info[ANNOTATIONS[5]] = int(line.strip().split('\t')[1])
elif line.startswith('Number of reads mapped to too many loci'):
star_info[ANNOTATIONS[5]] += int(line.strip().split('\t')[1])
elif line.startswith('% of reads mapped to multiple loci'):
star_info[ANNOTATIONS[3]] = round(
float(line.strip('%').split('\t')[1]), 2)
elif line.startswith('% of reads mapped to too many loci'):
star_info[ANNOTATIONS[3]] += round(
float(line.strip('%').split('\t')[1]), 2)
elif line.startswith('% of reads unmapped: too many mismatches'):
star_info[ANNOTATIONS[4]] = round(
float(line.strip('%').split('\t')[1]), 2)
elif line.startswith('% of reads unmapped: too short'):
star_info[ANNOTATIONS[4]] += round(
float(line.strip('%').split('\t')[1]), 2)
elif line.startswith('% of reads unmapped: other'):
star_info[ANNOTATIONS[4]] += round(
float(line.strip('%').split('\t')[1]), 2)
star_info = {
key: round(star_info[key], 2)
for key in list(star_info.keys())
}
if outfile is None:
return star_info
filename = path_leaf(infile)
filename = filename.strip('Log.final.out')
counts_df = pd.DataFrame.from_dict(star_info, orient='index').T
counts_df.index = [filename]
counts_df.to_csv(outfile, sep=str('\t'), index=True, header=True)
return counts_df
[docs]def get_strandedness(filepath):
"""Parse output of infer_experiment.py from RSeqC to get strandedness.
Parameters
----------
filepath : str
Path to infer_experiment.py output
Returns
-------
strandedness : str
reverse or forward or none
"""
with open(filepath) as f:
data = f.read()
splitted = [x.strip() for x in data.split('\n') if len(x.strip()) >= 1]
strandedness = None
assert splitted[0] == 'This is SingleEnd Data'
few_percentage = None
rev_percentage = None
for line in splitted[1:]:
if 'Fraction of reads failed to determine:' in line:
continue
elif 'Fraction of reads explained by "++,--":' in line:
fwd_percentage = float(line.split(':')[1])
elif 'Fraction of reads explained by "+-,-+":' in line:
rev_percentage = float(line.split(':')[1])
assert rev_percentage is not None
assert fwd_percentage is not None
ratio = fwd_percentage / rev_percentage
if np.isclose([ratio], [1]):
return 'none'
elif ratio >= 0.5:
return 'forward'
else:
return 'reverse'
[docs]def load_pickle(filepath):
"""Read pickled files easy in Python 2/3"""
if '.tsv' in filepath:
raise IndexError
if sys.version_info > (3, 0):
pickled = pickle.load(open(filepath, 'rb'), encoding='latin1')
else:
pickled = pickle.load(open(filepath, 'rb'))
return pickled
[docs]def pad_or_truncate(some_list, target_len):
"""Pad or truncate a list upto given target length
Parameters
----------
some_list : list
Input list
target_length : int
Final length of list
If being extended, returns list padded with NAs.
"""
return some_list[:target_len] + [np.nan] * (target_len - len(some_list))
[docs]def pad_five_prime_or_truncate(some_list, offset_5p, target_len):
"""Pad first the 5prime end and then the 3prime end or truncate
Parameters
----------
some_list : list
Input list
offset_5p : int
5' offset
target_length : int
Final length of list
If being extended, returns list padded with NAs.
"""
some_list = list(some_list)
padded_5p = [np.nan] * offset_5p + some_list
return padded_5p[:target_len] + [np.nan] * (target_len - len(padded_5p))
[docs]def codon_to_anticodon(codon):
"""Codon to anticodon.
Parameters
----------
codon : string
Input codon
"""
pairs = {'A': 'T', 'C': 'G', 'T': 'A', 'G': 'C', 'N': 'N'}
return ''.join(pairs[c] for c in codon)[::-1]
[docs]def collapse_bed_intervals(intervals,
chromosome_lengths=None,
offset_5p=0,
offset_3p=0):
"""Collapse intervals into non overlapping manner
# NOTE
# TODO : This function has a subtle bug that it will be offset by 1
# position when the gene is on negative strand
# So essentially if you have CDS on a negative strand
# The first position should be discarded
# Similary for the last position in the gene on + strand
# you have an extra position in the end
Parameters
----------
intervals : list of tuples
Like [('chr1', 310, 320, '+'), ('chr1', 321, 330, '+')]
chromosome_lengths : dict
A map of each chromosome'e length
Only used with offset_3p, offset_5p>0
offset_5p : int (positive)
Number of bases to count upstream (5')
offset_3p : int (positive)
Number of bases to count downstream (3')
Returns
-------
interval_combined : list of tuples
A collapsed version of interval
This is useful when the annotations are overlapping.
Example:
chr1 310 320 gene1 +
chr1 319 324 gene1 +
Returns:
chr1 310 324 gene1 +
intervals_for_fasta_read : list of tuples
This list can be used to directly fetch
fasta from pyfaidx.
NOTE: DO NOT do offset adjustments
as they are already adjusted for pyfaidx format
(1-end both start and end)
gene_offset_5p, gene_offset_3 : in
Gene wise offsets.
This might be different from offset_5p in cases where
`offset_5p` leads to a negative coordinate
"""
chrom = intervals[0][0]
strand = intervals[0][3]
chroms = list(set([i[0] for i in intervals]))
strands = list(set([i[3] for i in intervals]))
if len(chroms) != 1:
sys.stderr.write('Error chromosomes should be unique')
return
if len(strands) != 1:
sys.stderr.write('Error strands should be unique')
return
intervals_list = [list(element) for element in list(intervals)[:]]
first_interval = intervals_list[0]
last_interval = intervals_list[-1]
if offset_5p != 0 and offset_3p != 0:
chrom_length = chromosome_lengths[str(first_interval[0])]
else:
chrom_length = np.inf
# Need to convert to list instead frm tuples
# TODO fix this?
# intervals = list(map(list, list(intervals)))
if strand == '+':
# For positive strand shift
# start codon position first_interval[1] by -offset
if first_interval[1] - offset_5p >= 0:
first_interval[1] = first_interval[1] - offset_5p
gene_offset_5p = offset_5p
else:
sys.stderr.write('Cannot offset beyond 0 for interval: {}. \
Set to start of chromsome.\n'.format(first_interval))
# Reset offset to minimum possible
gene_offset_5p = first_interval[1]
first_interval[1] = 0
if (last_interval[2] + offset_3p <= chrom_length):
last_interval[2] = last_interval[2] + offset_3p
gene_offset_3p = offset_3p
else:
sys.stderr.write('Cannot offset beyond 0 for interval: {}. \
Set to end of chromsome.\n'.format(last_interval))
gene_offset_3p = chrom_length - last_interval[2]
# 1-end so chrom_length
last_interval[2] = chrom_length
else:
# Else shift cooridnate of last element in intervals stop by + offset
if (last_interval[2] + offset_5p <= chrom_length):
last_interval[2] = last_interval[2] + offset_5p
gene_offset_5p = offset_5p
else:
sys.stderr.write('Cannot offset beyond 0 for interval: {}. \
Set to end of chromsome.\n'.format(last_interval))
gene_offset_5p = chrom_length - last_interval[2]
# 1-end so chrom_length
last_interval[2] = chrom_length
if first_interval[1] - offset_3p >= 0:
first_interval[1] = first_interval[1] - offset_3p
gene_offset_3p = offset_3p
else:
sys.stderr.write('Cannot offset beyond 0 for interval: {}. \
Set to start of chromsome.\n'.format(first_interval))
# Reset offset to minimum possible
gene_offset_5p = first_interval[1]
first_interval[1] = 0
intervals = [tuple(element) for element in intervals_list]
interval_coverage_list = []
for index, interval in enumerate(intervals):
strand = interval[3]
if strand == '+':
series_range = list(range(interval[1], interval[2]))
elif strand == '-':
series_range = list(range(interval[2], interval[1], -1))
series = pd.Series(series_range, index=series_range)
interval_coverage_list.append(series)
if len(interval_coverage_list) == 0:
# Some genes might not be present in the bigwig at all
sys.stderr.write('Got empty list! intervals for chr : {}\n'.format(
first_interval[0]))
return (pd.Series([]), pd.Series([]), pd.Series([]), 0, 0)
interval_combined = interval_coverage_list[0]
for interval in interval_coverage_list[1:]:
interval_combined = interval_combined.combine_first(interval)
interval_index = np.arange(len(interval_combined)) - gene_offset_5p
index_to_genomic_pos_map = pd.Series(
interval_combined.index.tolist(), index=interval_index)
"""
intervals_for_fasta_read = []
for pos in index_to_genomic_pos_map.values:
# we use 1-based indexing (both start and end) for fetching fasta
intervals_for_fasta_read.append((chrom, pos + 1, pos + 1, strand))
"""
interval_combined = interval_combined.reset_index(drop=True)
interval_combined = interval_combined.rename(lambda x: x - gene_offset_5p)
interval_zero_start = list_to_ranges(
interval_combined.sort_values().astype(int).values.tolist())
interval_one_start = list_to_ranges(
(1 + interval_combined).sort_values().astype(int).values.tolist())
query_intervals = []
for index, (start, stop) in enumerate(interval_zero_start):
# our intervals were already 0-based start and 1-based end
# so we want to retain that
# it is tricky
# but the start and end were both transformed to 0-based since we used range before
query_intervals.append((chrom, start, stop + 1, strand))
fasta_onebased_intervals = []
for index, (start, stop) in enumerate(interval_one_start):
# our intervals were already 0-based start and 1-based end
# so we want to retain that
# it is tricky
# but the start and end were both transformed to 0-based since we used range before
fasta_onebased_intervals.append((chrom, start, stop, strand))
return query_intervals, fasta_onebased_intervals, gene_offset_5p, gene_offset_3p
[docs]def summarize_counters(samplewise_dict):
"""Summarize gene counts for a collection of samples.
Parameters
----------
samplewise_dict : dict
A dictionary with key as sample name and value
as another dictionary of counts for each gene
Returns
-------
totals : dict
A dictionary with key as sample name and value as total gene count
"""
totals = {}
for key, sample_dict in six.iteritems(samplewise_dict):
totals[key] = np.nansum(
[np.nansum(d) for d in list(sample_dict.values)])
return totals