Source code for riboraptor.coherence

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
import pickle
import numpy as np
import pandas as pd
import six
from .helpers import identify_peaks


def _shift_bit_length(x):
    """Shift bit"""
    return 1 << (x - 1).bit_length()


def _padwithzeros(vector, pad_width, iaxis, kwargs):
    """Pad with zeros"""
    vector[:pad_width[0]] = 0
    vector[-pad_width[1]:] = 0
    return vector


[docs]def naive_periodicity(values, identify_peak=False): '''Calculate periodicity in a naive manner Take ratio of frame1 over avg(frame2+frame3) counts. By default the first value is treated as the first frame as well Parameters ---------- values : Series Metagene profile Returns ------- periodicity : float Periodicity ''' if identify_peak: peak_location = identify_peaks(values) min_index = min(values.index) max_index = max(values.index) # Frame 1 starts at the peak_location # We assume that if values is series it will have continuous # indices values = values[np.arange(peak_location, max_index)] values = pd.Series(values) values = values.fillna(0) frame1_total = 0 frame2_total = 0 frame3_total = 0 values = list(values)[0:len(values) - len(values) % 3] for i in range(0, len(values), 3): frame1_total += values[i] for i in range(1, len(values), 3): frame2_total += values[i] for i in range(2, len(values), 3): frame3_total += values[i] return frame1_total / (frame1_total + frame2_total + frame3_total)
[docs]def get_periodicity(values, input_is_stream=False): """Calculate periodicty wrt 1-0-0 signal. Parameters ---------- values : array like List of values Returns ------- periodicity : float Periodicity calculated as cross correlation between input and idea 1-0-0 signal """ from mtspec import mtspec, mt_coherence tbp = 4 kspec = 3 nf = 30 p = 0.90 if input_is_stream: values = list(map(lambda x: float(x.rstrip()), values)) if isinstance(values, six.string_types): try: values = pickle.load(open(values)) except KeyError: pass values = pd.Series(values) values = values[0:max(values.index)] length = len(values) next_pow2_length = _shift_bit_length(length) values = np.lib.pad(values, (0, next_pow2_length - len(values) % next_pow2_length), _padwithzeros) mean_centered_values = values - np.nanmean(values) normalized_values = mean_centered_values / \ np.max(np.abs(mean_centered_values)) uniform_signal = [1, -0.6, -0.4] * (next_pow2_length // 3) uniform_signal = np.lib.pad( uniform_signal, (0, next_pow2_length - len(uniform_signal) % next_pow2_length), _padwithzeros) out = mt_coherence( 1, normalized_values, uniform_signal, tbp, kspec, nf, p, freq=True, phase=True, cohe=True, iadapt=1) spec, freq, jackknife, fstatistics, _ = mtspec( data=values, delta=1., time_bandwidth=4, number_of_tapers=3, statistics=True, rshape=0, fcrit=0.9) p = 99 base_fstat = np.percentile(fstatistics, p) fstat = fstatistics[np.argmin(np.abs(freq - 1 / 3.0))] p_value = 'p < 0.05' if fstat < base_fstat: p_value = 'p > 0.05' return out['cohe'][np.argmin(np.abs(out['freq'] - 1 / 3.0))], p_value