Source code for mrdna.readers.segmentmodel_from_lists

# -*- coding: utf-8 -*-

import pdb
import numpy as np
import os,sys
import scipy
import pandas as pd

from mrdna import logger, devlogger
from mrdna.segmentmodel import SegmentModel, SingleStrandedSegment, DoubleStrandedSegment
from mrdna.arbdmodel.coords import quaternion_from_matrix, rotationAboutAxis, quaternion_slerp
from mrdna import get_resource_path

ref_stack_position = np.array((-2.41851735, -0.259761333, 3.39999978))

def _three_prime_list_to_five_prime(three_prime):
    five_prime = -np.ones(three_prime.shape, dtype=int)
    has_three_prime = np.where(three_prime >= 0)[0]
    five_prime[three_prime[has_three_prime]] = has_three_prime
    return five_prime  

def _primes_list_to_strands(three_prime, five_prime):
    five_prime_ends = np.where(five_prime < 0)[0]
    strands = []
    strand_is_circular = []
    
    idx_to_strand = -np.ones(three_prime.shape, dtype=int)

    def build_strand(nt_idx, conditional):
        strand = [nt_idx]
        idx_to_strand[nt_idx] = len(strands)
        while conditional(nt_idx):
            nt_idx = three_prime[nt_idx]
            strand.append(nt_idx)
            idx_to_strand[nt_idx] = len(strands)
        strands.append( np.array(strand, dtype=int) )

    for nt_idx in five_prime_ends:
        build_strand(nt_idx,
                     lambda nt: three_prime[nt] >= 0)
        strand_is_circular.append(False)

    while True:
        ## print("WARNING: working on circular strand {}".format(len(strands)))
        ids = np.where(idx_to_strand < 0)[0]
        if len(ids) == 0: break
        build_strand(ids[0],
                     lambda nt: three_prime[nt] >= 0 and \
                     idx_to_strand[three_prime[nt]] < 0)
        strand_is_circular.append(True)

    return strands, strand_is_circular

[docs] def find_stacks(centers, transforms): ## Find orientation and center of each nucleotide expected_stack_positions = [] for R,c in zip(transforms,centers): expected_stack_positions.append( c + ref_stack_position.dot(R) ) expected_stack_positions = np.array(expected_stack_positions, dtype=np.float32) dists = scipy.spatial.distance_matrix(expected_stack_positions, centers) dists = dists + 5*np.eye(len(dists)) idx1, idx2 = np.where(dists < 3.5) ## Convert distances to stacks stacks_above = -np.ones(len(centers), dtype=int) _z = np.array((0,0,1)) for i in np.unique(idx1): js = idx2[ idx1 == i ] with np.errstate(divide='ignore',invalid='ignore'): angles = [np.arccos( transforms[j].T.dot( transforms[i].dot(_z) ).dot( _z ) ) for j in js] angles = np.array( angles ) tmp = np.argmin(dists[i][js] + 1.0*angles) j = js[tmp] stacks_above[i] = j return stacks_above
[docs] def basepairs_and_stacks_to_helixmap(basepairs,stacks_above, nucleotide_rank=None): """ Decide how helices should be created from basepair and stack data. Optionally use nucleotide_rank to determine direction of each helix and order of helices """ helixmap = -np.ones(basepairs.shape, dtype=int) helixrank = -np.ones(basepairs.shape) is_fwd = np.ones(basepairs.shape, dtype=int) if nucleotide_rank is None: nucleotide_rank = np.arange(len(basepairs), dtype=int) ## Remove stacks with nts lacking a basepairs nobp = np.where(basepairs < 0)[0] stacks_above[nobp] = -1 stacks_with_nobp = np.in1d(stacks_above, nobp) stacks_above[stacks_with_nobp] = -1 end_ids = np.where( (stacks_above < 0)*(basepairs >= 0) )[0] hid = 0 for end in sorted(end_ids, key=lambda x: nucleotide_rank[x]): if helixmap[end] >= 0: continue rank = 0 nt = basepairs[end] bp = basepairs[nt] assert( bp == end ) if helixmap[nt] >= 0 or helixmap[bp] >= 0: logger.warning(f'Ill-formed helix: problematic basepair or stacking data near nucleotide {nt} or {bp}... skipping') continue # assert(helixmap[nt] == -1) # assert(helixmap[bp] == -1) helixmap[nt] = helixmap[bp] = hid helixrank[nt] = helixrank[bp] = rank is_fwd[bp] = 0 rank +=1 _tmp = [(nt,bp)] while stacks_above[nt] >= 0: nt = stacks_above[nt] if basepairs[nt] < 0: break bp = basepairs[nt] if helixmap[nt] >= 0 or helixmap[bp] >= 0: logger.warning(f'Ill-formed helix: problematic basepair or stacking data near nucleotide {nt} or {bp}... skipping') break helixmap[nt] = helixmap[bp] = hid helixrank[nt] = helixrank[bp] = rank is_fwd[bp] = 0 _tmp.append((nt,bp)) rank +=1 hid += 1 ## Create "helix" for each circular segment intrahelical = [] processed = set() unclaimed_bases = np.where( (basepairs >= 0)*(helixmap == -1) )[0] for nt0 in unclaimed_bases: if nt0 in processed: continue nt = nt0 all_nts = [nt] rank = 0 nt = nt0 bp = basepairs[nt] if helixmap[nt] >= 0 or helixmap[bp] >= 0: logger.warning(f'Ill-formed cylic helix: problematic basepair or stacking data near nucleotide {nt} or {bp}... skipping') continue helixmap[nt] = helixmap[bp] = hid helixrank[nt] = helixrank[bp] = rank is_fwd[bp] = 0 rank +=1 processed.add(nt) processed.add(bp) counter = 0 while stacks_above[nt] >= 0: lastnt = nt nt = stacks_above[nt] bp = basepairs[nt] if nt == nt0 or nt == basepairs[nt0]: intrahelical.append((lastnt,nt0)) break assert( bp >= 0 ) if helixmap[nt] >= 0 or helixmap[bp] >= 0: logger.warning(f'Ill-formed cyclic helix: problematic basepair or stacking data near nucleotide {nt} or {bp}... skipping') break helixmap[nt] = helixmap[bp] = hid helixrank[nt] = helixrank[bp] = rank is_fwd[bp] = 0 processed.add(nt) processed.add(bp) rank +=1 hid += 1 return helixmap, helixrank, is_fwd, intrahelical
[docs] def set_splines(seg, coordinate, hid, hmap, hrank, fwd, basepair, orientation=None): maxrank = np.max( hrank[hmap==hid] ) if maxrank == 0: ids = np.where((hmap == hid))[0] pos = np.mean( [coordinate[r,:] for r in ids ], axis=0 ) coords = [pos,pos] contours = [0,1] if orientation is not None: ids = np.where((hmap == hid) * fwd)[0] assert( len(ids) == 1 ) q = quaternion_from_matrix( orientation[ids[0]] ) quats = [q, q] coords[-1] = pos + orientation[ids[0]].dot(np.array((0,0,1))) else: coords,contours,quats = [[],[],[]] last_q = None for rank in range(int(maxrank)+1): ids = np.where((hmap == hid) * (hrank == rank))[0] coords.append(np.mean( [coordinate[r,:] for r in ids ], axis=0 )) contours.append( float(rank+0.5)/(maxrank+1) ) if orientation is not None: ids = np.where((hmap == hid) * (hrank == rank) * fwd)[0] assert(len(ids) == 1) q = quaternion_from_matrix( orientation[ids[0]] ) if last_q is not None and last_q.dot(q) < 0: q = -q ## Average quaterion with reverse direction bp = basepair[ids[0]] if bp >= 0: bp_o = orientation[bp].dot(rotationAboutAxis(np.array((1,0,0)),180)) q2 = quaternion_from_matrix( bp_o ) if q.dot(q2) < 0: q2 = -q2 ## probably good enough, but slerp is better: q = (q + q2)*0.5 q = quaternion_slerp(q,q2,0.5) quats.append(q) last_q = q coords = np.array(coords) seg.set_splines(contours,coords) if orientation is not None: quats = np.array(quats) seg.set_orientation_splines(contours,quats) seg.start_position = coords[0,:] seg.end_position = coords[-1,:]
[docs] def model_from_basepair_stack_3prime(coordinate, basepair, stack, three_prime, sequence=None, orientation=None, rank=None, explicitly_unstacked = tuple(), max_basepairs_per_bead = 5, max_nucleotides_per_bead = 5, local_twist = False, dimensions=(5000,5000,5000), **model_parameters): """ Creates a SegmentModel object from lists of each nucleotide's basepair, its stack (on 3' side) and its 3'-connected nucleotide The first argument should be an N-by-3 numpy array containing the coordinate of each nucleotide, where N is the number of nucleotides. The following three arguments should be integer lists where the i-th element corresponds to the i-th nucleotide; the list element should the integer index of the corresponding basepaired / stacked / phosphodiester-bonded nucleotide. If there is no such nucleotide, the value should be -1. By default, where there is a 3'-to-5' connection between double-straned helices, they will be connected by an intrahelical connection that makes the helical domain continue across the connection, even if they do not stack in the stacking array. Instead, a "terminal_crossover" representing a relatively free (e.g. kinked) junction can be placed by including the index of one of the participating bases in the optional :explicitly_unstacked: tuple. Args: basepair: List of each nucleotide's basepair's index stack: List containing index of the nucleotide stacked on the 3' of each nucleotide three_prime: List of each nucleotide's the 3' end of each nucleotide Returns: SegmentModel """ """ Validate Input """ inputs = (basepair,three_prime) try: basepair,three_prime = [np.array(a,dtype=int) for a in inputs] except: raise TypeError("One or more of the input lists could not be converted into a numpy array") inputs = (basepair,three_prime) coordinate = np.array(coordinate) if np.any( [len(a.shape) > 1 for a in inputs] ): raise ValueError("One or more of the input lists has the wrong dimensionality") if len(coordinate.shape) != 2: raise ValueError("Coordinate array has the wrong dimensionality") inputs = (coordinate,basepair,three_prime) if not np.all(np.diff([len(a) for a in inputs]) == 0): raise ValueError("Inputs are not the same length") num_nt = len(basepair) if sequence is not None and len(sequence) != num_nt: raise ValueError("The 'sequence' parameter is the wrong length {} != {}".format(len(sequence),num_nt)) if orientation is not None: orientation = np.array(orientation) if len(orientation.shape) != 3: raise ValueError("The 'orientation' array has the wrong dimensionality (should be Nx3x3)") if orientation.shape != (num_nt,3,3): raise ValueError("The 'orientation' array is not properly formatted") if stack is None: if orientation is not None: stack = find_stacks(coordinate, orientation) else: ## Guess stacking based on 3' connectivity stack = np.array(three_prime,dtype=int) # Assume nts on 3' ends are stacked _stack_below = _three_prime_list_to_five_prime(stack) _has_bp = (basepair >= 0) _nostack = np.where( (stack == -1)*_has_bp )[0] _has_stack_below = _stack_below[basepair[_nostack]] >= 0 _nostack2 = _nostack[_has_stack_below] stack[_nostack2] = basepair[_stack_below[basepair[_nostack2]]] else: try: stack = np.array(stack,dtype=int) except: raise TypeError("The 'stack' array could not be converted into a numpy integer array") if len(stack.shape) != 1: raise ValueError("The 'stack' array has the wrong dimensionality") if len(stack) != num_nt: raise ValueError("The length of the 'stack' array does not match other inputs") bps = basepair # alias """ Fix stacks: require that the stack of a bp of a base's stack is its bp """ _has_bp = (bps >= 0) _has_stack = (stack >= 0) _stack_has_basepair = (bps[stack] >= 0) * _has_stack stack = np.where( (stack[bps[stack]] == bps) * _has_bp * _has_stack * _has_bp, stack, -np.ones(len(stack),dtype=int) ) five_prime = _three_prime_list_to_five_prime(three_prime) """ Build map of dsDNA helices and strands """ hmap,hrank,fwd,intrahelical = basepairs_and_stacks_to_helixmap(bps,stack,rank) double_stranded_helices = np.unique(hmap[hmap >= 0]) strands, strand_is_circular = _primes_list_to_strands(three_prime, five_prime) """ Add ssDNA to hmap """ if len(double_stranded_helices) > 0: hid = double_stranded_helices[-1]+1 else: hid = 0 ss_residues = hmap < 0 # if np.any(bps[ss_residues] != -1): logger.warning(f'{np.sum(bps[ss_residues] != -1)} ssDNA nucleotides appear to have basepairs... ignoring') for s,c in zip(strands, strand_is_circular): strand_segment_ends = [i for i in np.where( np.diff(hmap[s]) != 0 )[0]] + [len(s)-1] seg_start = 0 for i in strand_segment_ends: if hmap[s[i]] < 0: ## Found single-stranded segment ids = s[seg_start:i+1] assert( np.all(hmap[ids] == -1) ) hmap[ids] = hid hrank[ids] = np.arange(i+1-seg_start) hid+=1 seg_start = i+1 if len(double_stranded_helices) > 0: single_stranded_helices = np.arange(double_stranded_helices[-1]+1,hid) else: single_stranded_helices = np.arange(hid) devlogger.info(f"Creating {len(double_stranded_helices)} dsDNA and {len(single_stranded_helices)} ssDNA segments") ## Create double-stranded segments doubleSegments = [] for hid in double_stranded_helices: seg = DoubleStrandedSegment(name=str(hid), num_bp = np.sum(hmap==hid)//2, _helix_idx = hid) set_splines(seg, coordinate, hid, hmap, hrank, fwd, basepair, orientation) assert(hid == len(doubleSegments)) doubleSegments.append(seg) ## Create single-stranded segments singleSegments = [] for hid in single_stranded_helices: seg = SingleStrandedSegment(name=str(hid), num_nt = np.sum(hmap==hid), _helix_idx = hid) set_splines(seg, coordinate, hid, hmap, hrank, fwd, basepair, orientation) assert(hid == len(doubleSegments) + len(singleSegments)) singleSegments.append(seg) devlogger.debug("Finding crossovers and strand ends") ## Find crossovers and 5prime/3prime ends crossovers,prime5,prime3 = [[],[],[]] for s,c in zip(strands,strand_is_circular): tmp = np.where(np.diff(hmap[s]) != 0)[0] for i in tmp: crossovers.append( (s[i],s[i+1]) ) if c: if hmap[s[-1]] != hmap[s[0]]: crossovers.append( (s[-1],s[0]) ) else: prime5.append(s[0]) prime3.append(s[-1]) ## Add connections allSegments = doubleSegments+singleSegments devlogger.debug(f"Adding {len(crossovers)} connections") for r1,r2 in crossovers: seg1,seg2 = [allSegments[hmap[i]] for i in (r1,r2)] nt1,nt2 = [hrank[i] for i in (r1,r2)] f1,f2 = [fwd[i] for i in (r1,r2)] ## Handle connections at the ends is_terminal1 = (nt1,f1) in ((0,0),(seg1.num_nt-1,1)) is_terminal2 = (nt2,f2) in ((0,1),(seg2.num_nt-1,0)) # devlogger.info(seg1,seg2, r1, r2, is_terminal1, is_terminal2) if is_terminal1 or is_terminal2: """ Ensure that we don't have three-way dsDNA junctions """ if is_terminal1 and (bps[r1] >= 0) and (five_prime[bps[r1]] >= 0) and (three_prime[r1] >= 0): if (bps[five_prime[bps[r1]]] >= 0) and (bps[three_prime[r1]] >= 0): # is_terminal1 = (three_prime[r1] == bps[five_prime[bps[r1]]]) is_terminal1 = hmap[five_prime[bps[r1]]] == hmap[three_prime[r1]] if is_terminal2 and (bps[r2] >= 0) and (three_prime[bps[r2]] >= 0) and (five_prime[r2] >= 0): if (bps[three_prime[bps[r2]]] >= 0) and (bps[five_prime[r2]] >= 0): # is_terminal2 = (five_prime[r2] == bps[three_prime[bps[r2]]]) is_terminal2 = hmap[three_prime[bps[r2]]] == hmap[five_prime[r2]] """ Place connection """ if is_terminal1 and is_terminal2 and (r1 not in explicitly_unstacked) and (r2 not in explicitly_unstacked): end1 = seg1.end3 if f1 else seg1.start3 end2 = seg2.start5 if f2 else seg2.end5 seg1._connect_ends( end1, end2, type_='intrahelical') else: seg1.add_crossover(nt1,seg2,nt2,[f1,f2],type_="terminal_crossover") else: seg1.add_crossover(nt1,seg2,nt2,[f1,f2]) devlogger.debug(f"Converting terminal crossovers to regular ones if other crossovers are nearby") for seg1 in doubleSegments: cAB = seg1.get_connections_and_locations(exclude='intrahelical') conns = {} for c,A,B in cAB: seg2 = B.container if seg2 not in conns: conns[seg2] = [] assert('crossover' in c.type_) conns[seg2].append(c) nt_pos = {c:A.get_nt_pos() for c,A,B in cAB} for c,A,B in cAB: if c.type_ != 'terminal_crossover': continue seg2 = B.container if seg2 not in doubleSegments: continue dists = np.abs(np.array([nt_pos[c] - nt_pos[c2] for c2 in conns[seg2]])) # devlogger.debug(f'Terminal crossover {c} distance: {dists}') if np.sum((dists > 0) & (dists < 75)) > 0: # devlogger.debug(f'Fixing crossover at {c,A,B}') c.type_ = 'crossover' devlogger.debug(f"Adding prime ends and dealing with circular helices") ## Add 5prime/3prime ends for r in prime5: seg = allSegments[hmap[r]] seg.add_5prime(hrank[r],fwd[r]) for r in prime3: seg = allSegments[hmap[r]] seg.add_3prime(hrank[r],fwd[r]) ## Add intrahelical connections to circular helical sections for nt0,nt1 in intrahelical: seg = allSegments[hmap[nt0]] assert( seg is allSegments[hmap[nt1]] ) if three_prime[nt0] >= 0: if hmap[nt0] == hmap[three_prime[nt0]]: seg.connect_end3(seg.start5) bp0,bp1 = [bps[nt] for nt in (nt0,nt1)] if three_prime[bp1] >= 0: if hmap[bp1] == hmap[three_prime[bp1]]: seg.connect_start3(seg.end5) devlogger.debug(f"Adding sequence") ## Assign sequence if sequence is not None: for hid in range(len(allSegments)): resids = np.where( (hmap==hid)*(fwd==1) )[0] s = allSegments[hid] s.sequence = [sequence[r] for r in sorted(resids,key=lambda x: hrank[x])] devlogger.debug(f"Building model") ## Build model model = SegmentModel( allSegments, max_basepairs_per_bead = max_basepairs_per_bead, max_nucleotides_per_bead = max_nucleotides_per_bead, local_twist = local_twist, dimensions = dimensions, **model_parameters ) devlogger.debug(f"Adding _reader_list attributes and randomizing unset sequences") model._reader_list_coordinates = coordinate model._reader_list_basepair = basepair model._reader_list_stack = stack model._reader_list_three_prime = three_prime model._reader_list_five_prime = five_prime model._reader_list_sequence = sequence model._reader_list_orientation = orientation model._reader_list_hmap = hmap model._reader_list_fwd = fwd model._reader_list_hrank = hrank if sequence is None: for s in model.segments: s.randomize_unset_sequence() return model
[docs] def model_from_pandas(df,coordinate_col="r",bp_col="bp",stack_col="stack",three_prime_col="threeprime", seq_col="seq",orientation_col="orientation", max_basepairs_per_bead = 5, max_nucleotides_per_bead = 5, local_twist = False, dimensions=(5000,5000,5000) ,**model_parameters): try: c=np.array(list(df[coordinate_col])) except: print("cannot locate coordinate") try: bp=np.array(df[bp_col]) except: print("cannot locate bp") try: stack=np.array(df[stack_col]) except: print("cannot find stack") try: tprime=np.array(df[three_prime_col]) except: print("cannot locate 3's") try: seq=np.array(df[seq_col]) except: print("no sequence inputted") seq=None try: orient=np.array(list(df[orientation_col])) except: print("no orientation inputted") orient=None try: model=model_from_basepair_stack_3prime(c, bp, stack, tprime, sequence=seq, orientation=orient, max_basepairs_per_bead = max_basepairs_per_bead, max_nucleotides_per_bead =max_nucleotides_per_bead , local_twist = local_twist, dimensions=dimensions, **model_parameters) model._dataframe=df return model except: print("cannot phrase DataFrame, aborted")
[docs] def UNIT_circular(): """ Make a circular DNA strand, with dsDNA in the middle """ coordinate = [(0,0,3.4*i) for i in range(7)] stack = -1*np.ones(len(coordinate)) three_prime = [ 1, 2, 4,-1, 6, 3, 0] basepair = [-1,-1, 3, 2, 5, 4,-1] stack = [-1,-1, 4,-1,-1, 3,-1] for i in [3,5]: coordinate[i] = (1,0,3.4*i) model = model_from_basepair_stack_3prime(coordinate, basepair, stack, three_prime, max_basepairs_per_bead=1, max_nucleotides_per_bead=1, local_twist=True) model.simulate(output_name='circular', directory='unittest')