# -*- coding: utf-8 -*-
import pdb
import numpy as np
import os,sys
import scipy
import pandas as pd
from mrdna import logger, devlogger
from mrdna.segmentmodel import SegmentModel, SingleStrandedSegment, DoubleStrandedSegment
from mrdna.arbdmodel.coords import quaternion_from_matrix, rotationAboutAxis, quaternion_slerp
from mrdna import get_resource_path
ref_stack_position = np.array((-2.41851735, -0.259761333, 3.39999978))
def _three_prime_list_to_five_prime(three_prime):
five_prime = -np.ones(three_prime.shape, dtype=int)
has_three_prime = np.where(three_prime >= 0)[0]
five_prime[three_prime[has_three_prime]] = has_three_prime
return five_prime
def _primes_list_to_strands(three_prime, five_prime):
five_prime_ends = np.where(five_prime < 0)[0]
strands = []
strand_is_circular = []
idx_to_strand = -np.ones(three_prime.shape, dtype=int)
def build_strand(nt_idx, conditional):
strand = [nt_idx]
idx_to_strand[nt_idx] = len(strands)
while conditional(nt_idx):
nt_idx = three_prime[nt_idx]
strand.append(nt_idx)
idx_to_strand[nt_idx] = len(strands)
strands.append( np.array(strand, dtype=int) )
for nt_idx in five_prime_ends:
build_strand(nt_idx,
lambda nt: three_prime[nt] >= 0)
strand_is_circular.append(False)
while True:
## print("WARNING: working on circular strand {}".format(len(strands)))
ids = np.where(idx_to_strand < 0)[0]
if len(ids) == 0: break
build_strand(ids[0],
lambda nt: three_prime[nt] >= 0 and \
idx_to_strand[three_prime[nt]] < 0)
strand_is_circular.append(True)
return strands, strand_is_circular
[docs]
def find_stacks(centers, transforms):
## Find orientation and center of each nucleotide
expected_stack_positions = []
for R,c in zip(transforms,centers):
expected_stack_positions.append( c + ref_stack_position.dot(R) )
expected_stack_positions = np.array(expected_stack_positions, dtype=np.float32)
dists = scipy.spatial.distance_matrix(expected_stack_positions, centers)
dists = dists + 5*np.eye(len(dists))
idx1, idx2 = np.where(dists < 3.5)
## Convert distances to stacks
stacks_above = -np.ones(len(centers), dtype=int)
_z = np.array((0,0,1))
for i in np.unique(idx1):
js = idx2[ idx1 == i ]
with np.errstate(divide='ignore',invalid='ignore'):
angles = [np.arccos( transforms[j].T.dot( transforms[i].dot(_z) ).dot( _z ) ) for j in js]
angles = np.array( angles )
tmp = np.argmin(dists[i][js] + 1.0*angles)
j = js[tmp]
stacks_above[i] = j
return stacks_above
[docs]
def basepairs_and_stacks_to_helixmap(basepairs,stacks_above, nucleotide_rank=None):
""" Decide how helices should be created from basepair and stack data. Optionally use nucleotide_rank to determine direction of each helix and order of helices """
helixmap = -np.ones(basepairs.shape, dtype=int)
helixrank = -np.ones(basepairs.shape)
is_fwd = np.ones(basepairs.shape, dtype=int)
if nucleotide_rank is None:
nucleotide_rank = np.arange(len(basepairs), dtype=int)
## Remove stacks with nts lacking a basepairs
nobp = np.where(basepairs < 0)[0]
stacks_above[nobp] = -1
stacks_with_nobp = np.in1d(stacks_above, nobp)
stacks_above[stacks_with_nobp] = -1
end_ids = np.where( (stacks_above < 0)*(basepairs >= 0) )[0]
hid = 0
for end in sorted(end_ids, key=lambda x: nucleotide_rank[x]):
if helixmap[end] >= 0:
continue
rank = 0
nt = basepairs[end]
bp = basepairs[nt]
assert( bp == end )
if helixmap[nt] >= 0 or helixmap[bp] >= 0:
logger.warning(f'Ill-formed helix: problematic basepair or stacking data near nucleotide {nt} or {bp}... skipping')
continue
# assert(helixmap[nt] == -1)
# assert(helixmap[bp] == -1)
helixmap[nt] = helixmap[bp] = hid
helixrank[nt] = helixrank[bp] = rank
is_fwd[bp] = 0
rank +=1
_tmp = [(nt,bp)]
while stacks_above[nt] >= 0:
nt = stacks_above[nt]
if basepairs[nt] < 0: break
bp = basepairs[nt]
if helixmap[nt] >= 0 or helixmap[bp] >= 0:
logger.warning(f'Ill-formed helix: problematic basepair or stacking data near nucleotide {nt} or {bp}... skipping')
break
helixmap[nt] = helixmap[bp] = hid
helixrank[nt] = helixrank[bp] = rank
is_fwd[bp] = 0
_tmp.append((nt,bp))
rank +=1
hid += 1
## Create "helix" for each circular segment
intrahelical = []
processed = set()
unclaimed_bases = np.where( (basepairs >= 0)*(helixmap == -1) )[0]
for nt0 in unclaimed_bases:
if nt0 in processed: continue
nt = nt0
all_nts = [nt]
rank = 0
nt = nt0
bp = basepairs[nt]
if helixmap[nt] >= 0 or helixmap[bp] >= 0:
logger.warning(f'Ill-formed cylic helix: problematic basepair or stacking data near nucleotide {nt} or {bp}... skipping')
continue
helixmap[nt] = helixmap[bp] = hid
helixrank[nt] = helixrank[bp] = rank
is_fwd[bp] = 0
rank +=1
processed.add(nt)
processed.add(bp)
counter = 0
while stacks_above[nt] >= 0:
lastnt = nt
nt = stacks_above[nt]
bp = basepairs[nt]
if nt == nt0 or nt == basepairs[nt0]:
intrahelical.append((lastnt,nt0))
break
assert( bp >= 0 )
if helixmap[nt] >= 0 or helixmap[bp] >= 0:
logger.warning(f'Ill-formed cyclic helix: problematic basepair or stacking data near nucleotide {nt} or {bp}... skipping')
break
helixmap[nt] = helixmap[bp] = hid
helixrank[nt] = helixrank[bp] = rank
is_fwd[bp] = 0
processed.add(nt)
processed.add(bp)
rank +=1
hid += 1
return helixmap, helixrank, is_fwd, intrahelical
[docs]
def set_splines(seg, coordinate, hid, hmap, hrank, fwd, basepair, orientation=None):
maxrank = np.max( hrank[hmap==hid] )
if maxrank == 0:
ids = np.where((hmap == hid))[0]
pos = np.mean( [coordinate[r,:] for r in ids ], axis=0 )
coords = [pos,pos]
contours = [0,1]
if orientation is not None:
ids = np.where((hmap == hid) * fwd)[0]
assert( len(ids) == 1 )
q = quaternion_from_matrix( orientation[ids[0]] )
quats = [q, q]
coords[-1] = pos + orientation[ids[0]].dot(np.array((0,0,1)))
else:
coords,contours,quats = [[],[],[]]
last_q = None
for rank in range(int(maxrank)+1):
ids = np.where((hmap == hid) * (hrank == rank))[0]
coords.append(np.mean( [coordinate[r,:] for r in ids ], axis=0 ))
contours.append( float(rank+0.5)/(maxrank+1) )
if orientation is not None:
ids = np.where((hmap == hid) * (hrank == rank) * fwd)[0]
assert(len(ids) == 1)
q = quaternion_from_matrix( orientation[ids[0]] )
if last_q is not None and last_q.dot(q) < 0:
q = -q
## Average quaterion with reverse direction
bp = basepair[ids[0]]
if bp >= 0:
bp_o = orientation[bp].dot(rotationAboutAxis(np.array((1,0,0)),180))
q2 = quaternion_from_matrix( bp_o )
if q.dot(q2) < 0:
q2 = -q2
## probably good enough, but slerp is better: q = (q + q2)*0.5
q = quaternion_slerp(q,q2,0.5)
quats.append(q)
last_q = q
coords = np.array(coords)
seg.set_splines(contours,coords)
if orientation is not None:
quats = np.array(quats)
seg.set_orientation_splines(contours,quats)
seg.start_position = coords[0,:]
seg.end_position = coords[-1,:]
[docs]
def model_from_basepair_stack_3prime(coordinate, basepair, stack, three_prime,
sequence=None, orientation=None,
rank=None, explicitly_unstacked = tuple(),
max_basepairs_per_bead = 5,
max_nucleotides_per_bead = 5,
local_twist = False,
dimensions=(5000,5000,5000),
**model_parameters):
"""
Creates a SegmentModel object from lists of each nucleotide's
basepair, its stack (on 3' side) and its 3'-connected nucleotide
The first argument should be an N-by-3 numpy array containing the
coordinate of each nucleotide, where N is the number of
nucleotides. The following three arguments should be integer lists
where the i-th element corresponds to the i-th nucleotide; the
list element should the integer index of the corresponding
basepaired / stacked / phosphodiester-bonded nucleotide. If there
is no such nucleotide, the value should be -1.
By default, where there is a 3'-to-5' connection between
double-straned helices, they will be connected by an intrahelical
connection that makes the helical domain continue across the
connection, even if they do not stack in the stacking
array. Instead, a "terminal_crossover" representing a relatively
free (e.g. kinked) junction can be placed by including the index
of one of the participating bases in the optional
:explicitly_unstacked: tuple.
Args:
basepair: List of each nucleotide's basepair's index
stack: List containing index of the nucleotide stacked on the 3' of each nucleotide
three_prime: List of each nucleotide's the 3' end of each nucleotide
Returns:
SegmentModel
"""
""" Validate Input """
inputs = (basepair,three_prime)
try:
basepair,three_prime = [np.array(a,dtype=int) for a in inputs]
except:
raise TypeError("One or more of the input lists could not be converted into a numpy array")
inputs = (basepair,three_prime)
coordinate = np.array(coordinate)
if np.any( [len(a.shape) > 1 for a in inputs] ):
raise ValueError("One or more of the input lists has the wrong dimensionality")
if len(coordinate.shape) != 2:
raise ValueError("Coordinate array has the wrong dimensionality")
inputs = (coordinate,basepair,three_prime)
if not np.all(np.diff([len(a) for a in inputs]) == 0):
raise ValueError("Inputs are not the same length")
num_nt = len(basepair)
if sequence is not None and len(sequence) != num_nt:
raise ValueError("The 'sequence' parameter is the wrong length {} != {}".format(len(sequence),num_nt))
if orientation is not None:
orientation = np.array(orientation)
if len(orientation.shape) != 3:
raise ValueError("The 'orientation' array has the wrong dimensionality (should be Nx3x3)")
if orientation.shape != (num_nt,3,3):
raise ValueError("The 'orientation' array is not properly formatted")
if stack is None:
if orientation is not None:
stack = find_stacks(coordinate, orientation)
else:
## Guess stacking based on 3' connectivity
stack = np.array(three_prime,dtype=int) # Assume nts on 3' ends are stacked
_stack_below = _three_prime_list_to_five_prime(stack)
_has_bp = (basepair >= 0)
_nostack = np.where( (stack == -1)*_has_bp )[0]
_has_stack_below = _stack_below[basepair[_nostack]] >= 0
_nostack2 = _nostack[_has_stack_below]
stack[_nostack2] = basepair[_stack_below[basepair[_nostack2]]]
else:
try:
stack = np.array(stack,dtype=int)
except:
raise TypeError("The 'stack' array could not be converted into a numpy integer array")
if len(stack.shape) != 1:
raise ValueError("The 'stack' array has the wrong dimensionality")
if len(stack) != num_nt:
raise ValueError("The length of the 'stack' array does not match other inputs")
bps = basepair # alias
""" Fix stacks: require that the stack of a bp of a base's stack is its bp """
_has_bp = (bps >= 0)
_has_stack = (stack >= 0)
_stack_has_basepair = (bps[stack] >= 0) * _has_stack
stack = np.where( (stack[bps[stack]] == bps) * _has_bp * _has_stack * _has_bp,
stack, -np.ones(len(stack),dtype=int) )
five_prime = _three_prime_list_to_five_prime(three_prime)
""" Build map of dsDNA helices and strands """
hmap,hrank,fwd,intrahelical = basepairs_and_stacks_to_helixmap(bps,stack,rank)
double_stranded_helices = np.unique(hmap[hmap >= 0])
strands, strand_is_circular = _primes_list_to_strands(three_prime, five_prime)
""" Add ssDNA to hmap """
if len(double_stranded_helices) > 0:
hid = double_stranded_helices[-1]+1
else:
hid = 0
ss_residues = hmap < 0
#
if np.any(bps[ss_residues] != -1):
logger.warning(f'{np.sum(bps[ss_residues] != -1)} ssDNA nucleotides appear to have basepairs... ignoring')
for s,c in zip(strands, strand_is_circular):
strand_segment_ends = [i for i in np.where( np.diff(hmap[s]) != 0 )[0]] + [len(s)-1]
seg_start = 0
for i in strand_segment_ends:
if hmap[s[i]] < 0:
## Found single-stranded segment
ids = s[seg_start:i+1]
assert( np.all(hmap[ids] == -1) )
hmap[ids] = hid
hrank[ids] = np.arange(i+1-seg_start)
hid+=1
seg_start = i+1
if len(double_stranded_helices) > 0:
single_stranded_helices = np.arange(double_stranded_helices[-1]+1,hid)
else:
single_stranded_helices = np.arange(hid)
devlogger.info(f"Creating {len(double_stranded_helices)} dsDNA and {len(single_stranded_helices)} ssDNA segments")
## Create double-stranded segments
doubleSegments = []
for hid in double_stranded_helices:
seg = DoubleStrandedSegment(name=str(hid),
num_bp = np.sum(hmap==hid)//2,
_helix_idx = hid)
set_splines(seg, coordinate, hid, hmap, hrank, fwd, basepair, orientation)
assert(hid == len(doubleSegments))
doubleSegments.append(seg)
## Create single-stranded segments
singleSegments = []
for hid in single_stranded_helices:
seg = SingleStrandedSegment(name=str(hid),
num_nt = np.sum(hmap==hid),
_helix_idx = hid)
set_splines(seg, coordinate, hid, hmap, hrank, fwd, basepair, orientation)
assert(hid == len(doubleSegments) + len(singleSegments))
singleSegments.append(seg)
devlogger.debug("Finding crossovers and strand ends")
## Find crossovers and 5prime/3prime ends
crossovers,prime5,prime3 = [[],[],[]]
for s,c in zip(strands,strand_is_circular):
tmp = np.where(np.diff(hmap[s]) != 0)[0]
for i in tmp:
crossovers.append( (s[i],s[i+1]) )
if c:
if hmap[s[-1]] != hmap[s[0]]:
crossovers.append( (s[-1],s[0]) )
else:
prime5.append(s[0])
prime3.append(s[-1])
## Add connections
allSegments = doubleSegments+singleSegments
devlogger.debug(f"Adding {len(crossovers)} connections")
for r1,r2 in crossovers:
seg1,seg2 = [allSegments[hmap[i]] for i in (r1,r2)]
nt1,nt2 = [hrank[i] for i in (r1,r2)]
f1,f2 = [fwd[i] for i in (r1,r2)]
## Handle connections at the ends
is_terminal1 = (nt1,f1) in ((0,0),(seg1.num_nt-1,1))
is_terminal2 = (nt2,f2) in ((0,1),(seg2.num_nt-1,0))
# devlogger.info(seg1,seg2, r1, r2, is_terminal1, is_terminal2)
if is_terminal1 or is_terminal2:
""" Ensure that we don't have three-way dsDNA junctions """
if is_terminal1 and (bps[r1] >= 0) and (five_prime[bps[r1]] >= 0) and (three_prime[r1] >= 0):
if (bps[five_prime[bps[r1]]] >= 0) and (bps[three_prime[r1]] >= 0):
# is_terminal1 = (three_prime[r1] == bps[five_prime[bps[r1]]])
is_terminal1 = hmap[five_prime[bps[r1]]] == hmap[three_prime[r1]]
if is_terminal2 and (bps[r2] >= 0) and (three_prime[bps[r2]] >= 0) and (five_prime[r2] >= 0):
if (bps[three_prime[bps[r2]]] >= 0) and (bps[five_prime[r2]] >= 0):
# is_terminal2 = (five_prime[r2] == bps[three_prime[bps[r2]]])
is_terminal2 = hmap[three_prime[bps[r2]]] == hmap[five_prime[r2]]
""" Place connection """
if is_terminal1 and is_terminal2 and (r1 not in explicitly_unstacked) and (r2 not in explicitly_unstacked):
end1 = seg1.end3 if f1 else seg1.start3
end2 = seg2.start5 if f2 else seg2.end5
seg1._connect_ends( end1, end2, type_='intrahelical')
else:
seg1.add_crossover(nt1,seg2,nt2,[f1,f2],type_="terminal_crossover")
else:
seg1.add_crossover(nt1,seg2,nt2,[f1,f2])
devlogger.debug(f"Converting terminal crossovers to regular ones if other crossovers are nearby")
for seg1 in doubleSegments:
cAB = seg1.get_connections_and_locations(exclude='intrahelical')
conns = {}
for c,A,B in cAB:
seg2 = B.container
if seg2 not in conns: conns[seg2] = []
assert('crossover' in c.type_)
conns[seg2].append(c)
nt_pos = {c:A.get_nt_pos() for c,A,B in cAB}
for c,A,B in cAB:
if c.type_ != 'terminal_crossover': continue
seg2 = B.container
if seg2 not in doubleSegments: continue
dists = np.abs(np.array([nt_pos[c] - nt_pos[c2] for c2 in conns[seg2]]))
# devlogger.debug(f'Terminal crossover {c} distance: {dists}')
if np.sum((dists > 0) & (dists < 75)) > 0:
# devlogger.debug(f'Fixing crossover at {c,A,B}')
c.type_ = 'crossover'
devlogger.debug(f"Adding prime ends and dealing with circular helices")
## Add 5prime/3prime ends
for r in prime5:
seg = allSegments[hmap[r]]
seg.add_5prime(hrank[r],fwd[r])
for r in prime3:
seg = allSegments[hmap[r]]
seg.add_3prime(hrank[r],fwd[r])
## Add intrahelical connections to circular helical sections
for nt0,nt1 in intrahelical:
seg = allSegments[hmap[nt0]]
assert( seg is allSegments[hmap[nt1]] )
if three_prime[nt0] >= 0:
if hmap[nt0] == hmap[three_prime[nt0]]:
seg.connect_end3(seg.start5)
bp0,bp1 = [bps[nt] for nt in (nt0,nt1)]
if three_prime[bp1] >= 0:
if hmap[bp1] == hmap[three_prime[bp1]]:
seg.connect_start3(seg.end5)
devlogger.debug(f"Adding sequence")
## Assign sequence
if sequence is not None:
for hid in range(len(allSegments)):
resids = np.where( (hmap==hid)*(fwd==1) )[0]
s = allSegments[hid]
s.sequence = [sequence[r] for r in sorted(resids,key=lambda x: hrank[x])]
devlogger.debug(f"Building model")
## Build model
model = SegmentModel( allSegments,
max_basepairs_per_bead = max_basepairs_per_bead,
max_nucleotides_per_bead = max_nucleotides_per_bead,
local_twist = local_twist,
dimensions = dimensions,
**model_parameters )
devlogger.debug(f"Adding _reader_list attributes and randomizing unset sequences")
model._reader_list_coordinates = coordinate
model._reader_list_basepair = basepair
model._reader_list_stack = stack
model._reader_list_three_prime = three_prime
model._reader_list_five_prime = five_prime
model._reader_list_sequence = sequence
model._reader_list_orientation = orientation
model._reader_list_hmap = hmap
model._reader_list_fwd = fwd
model._reader_list_hrank = hrank
if sequence is None:
for s in model.segments:
s.randomize_unset_sequence()
return model
[docs]
def model_from_pandas(df,coordinate_col="r",bp_col="bp",stack_col="stack",three_prime_col="threeprime",
seq_col="seq",orientation_col="orientation",
max_basepairs_per_bead = 5,
max_nucleotides_per_bead = 5,
local_twist = False,
dimensions=(5000,5000,5000)
,**model_parameters):
try:
c=np.array(list(df[coordinate_col]))
except:
print("cannot locate coordinate")
try:
bp=np.array(df[bp_col])
except:
print("cannot locate bp")
try:
stack=np.array(df[stack_col])
except:
print("cannot find stack")
try:
tprime=np.array(df[three_prime_col])
except:
print("cannot locate 3's")
try:
seq=np.array(df[seq_col])
except:
print("no sequence inputted")
seq=None
try:
orient=np.array(list(df[orientation_col]))
except:
print("no orientation inputted")
orient=None
try:
model=model_from_basepair_stack_3prime(c, bp, stack, tprime,
sequence=seq, orientation=orient,
max_basepairs_per_bead = max_basepairs_per_bead,
max_nucleotides_per_bead =max_nucleotides_per_bead ,
local_twist = local_twist,
dimensions=dimensions,
**model_parameters)
model._dataframe=df
return model
except:
print("cannot phrase DataFrame, aborted")
[docs]
def UNIT_circular():
""" Make a circular DNA strand, with dsDNA in the middle """
coordinate = [(0,0,3.4*i) for i in range(7)]
stack = -1*np.ones(len(coordinate))
three_prime = [ 1, 2, 4,-1, 6, 3, 0]
basepair = [-1,-1, 3, 2, 5, 4,-1]
stack = [-1,-1, 4,-1,-1, 3,-1]
for i in [3,5]:
coordinate[i] = (1,0,3.4*i)
model = model_from_basepair_stack_3prime(coordinate, basepair, stack, three_prime,
max_basepairs_per_bead=1,
max_nucleotides_per_bead=1,
local_twist=True)
model.simulate(output_name='circular', directory='unittest')