# -*- coding: utf-8 -*-
import numpy as np
import os,sys
from glob import glob
import re
import pandas as pd
pd.options.mode.chained_assignment = None # default='warn'
from .segmentmodel_from_lists import model_from_basepair_stack_3prime
from ..arbdmodel.coords import readArbdCoords, readAvgArbdCoords, rotationAboutAxis
from ..segmentmodel import SegmentModel, SingleStrandedSegment, DoubleStrandedSegment
from ..model.dna_sequence import m13 as m13seq
from .. import logger, devlogger
import json
import re
import pdb
## TODO: separate SegmentModel from ArbdModel so multiple parts can be combined
## TODO: catch circular strands in "get_5prime" cadnano calls
## TODO: handle special motifs
## - doubly-nicked helices
## - helices that should be stacked across an empty region (crossovers from and end in the helix to another end in the helix)
## - circular constructs
[docs]
def get_lattice(part):
lattice_type = None
_gt = part.getGridType()
try:
lattice_type = _gt.name.lower()
except:
if _gt == 1:
lattice_type = 'square'
elif _gt == 2:
lattice_type = 'honeycomb'
else:
print(lattice_type)
return lattice_type
[docs]
def read_json_file(filename):
import cadnano
from cadnano.document import Document
try:
with open(filename) as ch:
json_data = json.load(ch)
except:
with open(filename) as ch:
content = ""
for l in ch:
l = re.sub(r"'", r'"', l)
# https://stackoverflow.com/questions/4033633/handling-lazy-json-in-python-expecting-property-name
# l = re.sub(r"{\s*(\w)", r'{"\1', l)
# l = re.sub(r",\s*(\w)", r',"\1', l)
# l = re.sub(r"(\w):", r'\1":', l)
content += l+"\n"
json_data = json.loads(content)
try:
doc = Document()
cadnano.fileio.v3decode.decode(doc, json_data)
decoder = 3
except:
doc = Document()
cadnano.fileio.v2decode.decode(doc, json_data)
decoder = 2
parts = [p for p in doc.getParts()]
if len(parts) != 1:
raise Exception("Only documents containing a single cadnano part are implemented at this time.")
part = parts[0]
if decoder == 2:
""" It seems cadnano2.5 (as of ce6ff019) does not set the EulerZ for square lattice structures correctly, doing so here """
l = get_lattice(part)
if l == 'square':
for id_num in part.getIdNums():
if part.vh_properties.loc[id_num,'eulerZ'] == 0:
part.vh_properties.loc[id_num,'eulerZ'] = 360*(6/10.5)
df=pd.DataFrame(json_data["vstrands"])
n_df=df.set_index("num")
return part
[docs]
def get_helix_angle(part, helix_id, indices, index_offset=0, fwd=True):
""" Get "start_orientation" for helix """
""" FROM CADNANO2.5
+ angle is CCW
- angle is CW
Right handed DNA rotates clockwise from 5' to 3'
we use the convention the 5' end starts at 0 degrees
and it's pair is minor_groove_angle degrees away
direction, hence the minus signs. eulerZ
"""
hp, bpr, tpr, eulerZ, mgroove = part.vh_properties.loc[helix_id,
['helical_pitch',
'bases_per_repeat',
'turns_per_repeat',
'eulerZ',
'minor_groove_angle']]
twist_per_base = tpr*360./bpr
# angle = eulerZ - twist_per_base*indices + 0.5*mgroove + 180
angle = eulerZ - twist_per_base*indices - 0.5*mgroove
angle=angle*(np.pi/180)
cos,sin = (np.cos(angle), np.sin(angle))
R = np.zeros( (len(angle),3,3) )
R[:,0,0] = cos
R[:,1,1] = cos
R[:,0,1] = 2*(fwd-0.5)*sin
R[:,1,0] = -2*(fwd-0.5)*sin
R[:,2,2] = -2*(fwd-0.5)
return R
[docs]
def gen_id_series(df, slice_, strand, part, insertion_dict, offset = 0):
vh = strand._id_num
df.loc[slice_,"vh"]=vh
df.loc[slice_,"fwd"]=strand.isForward()
x=part.getVirtualHelixOrigin(strand._id_num)[0]*10
y=part.getVirtualHelixOrigin(strand._id_num)[1]*10
id_lo,id_hi=strand.idxs()
zids=[str(i) for i in range(id_lo,id_hi+1)]
zids0=list(range(id_lo,id_hi+1))
insert_dict=dict([(j.idx(),j.length()) for j in strand.insertionsOnStrand()])
for insert_base in insert_dict:
z_ind=zids0.index(insert_base)
z_val=insert_dict[insert_base]
if vh not in insertion_dict:
insertion_dict[vh] = dict()
insertion_dict[vh][insert_base] = z_val # record insertions for later
zids.pop(z_ind)
zids0.pop(z_ind)
if z_val!=-1:
l=list(range(z_val+1))
l.reverse()
for k in l:
zids0.insert(z_ind,insert_base + k/len(l))
zids.insert(z_ind,str(insert_base)+"."+str(k))
df.loc[slice_,"zid0"] = zids0
df.loc[slice_,"zid"] = zids
z = -3.4 * np.linspace( id_lo+0.25, id_hi-0.25, slice_.stop-slice_.start+1 ) # Note: this spreads
# insertions uniformly through
# the strand, not through
# helical region like original did
L=[(df["vh"][i],df["zid0"][i],df["fwd"][i]) for i in df.loc[slice_].index]
stack = -np.ones(len(zids),dtype=int)
if strand.isForward()==True:
stack[:-1] = np.arange(len(zids)-1,dtype=int) + 1 + offset
stack[-1] = -1
if strand.connection3p() is None:
threeprime_tuple=L[1:]+[(-1,)*3]
else:
threeprime_tuple=L[1:]+[(strand.connection3p().idNum(),strand.connection3p().idx5Prime(),strand.connection3p().isForward())]
else:
stack[1:] = np.arange(len(zids)-1,dtype=int) + offset
stack[0] = -1
if strand.connection3p() is None:
threeprime_tuple=[(-1,)*3]+L[0:-1]
else:
threeprime_tuple=[(strand.connection3p().idNum(),strand.connection3p().idx5Prime(),strand.connection3p().isForward())]+L[0:-1]
df.loc[slice_,"stack"] = stack
df.loc[slice_,"threeprime_tuple"] = threeprime_tuple
## cadnano 3.1 sequence assign is wrong if there is insertion or deletion.
df.loc[slice_,"z"] = z
df.loc[slice_,"r"] = [np.array((x,y,val)) for val in z]
return df
[docs]
def gen_prop_table(part):
devlogger.debug(f' Getting strand sets')
strand_set=[]
id_series=[]
insertion_dict = dict()
total_nts = 0
for i in part.getidNums():
fwd,rev=part.getStrandSets(i)
[strand_set.append(i) for i in fwd.strands()]
[strand_set.append(i) for i in rev.strands()]
total_nts = sum(s.totalLength() for s in strand_set)
devlogger.debug(f' Getting strand properties')
nt_prop=pd.DataFrame(columns=["vh","zid0","zid","fwd","stack","threeprime_tuple","z","r"],index=range(total_nts),dtype=object)
offset = 0
for strand in strand_set:
l = strand.totalLength()
gen_id_series(nt_prop, slice(offset,offset+l-1), strand, part, insertion_dict, offset=offset)
offset += l
devlogger.debug(f' Getting sequences')
nt_prop["seq"]=-1
ind_tuple=list(zip(nt_prop["vh"],nt_prop["zid"],nt_prop["fwd"]))
devlogger.debug(f' Stacking strand ends')
not_stacked = nt_prop[(nt_prop["stack"]==-1)].index
def _get_next(vh, zid, fwd):
ret = np.empty(len(zid))
sl = np.ones(ret.shape, dtype=bool)
nzid = np.array(zid[sl])
while np.sum(sl) > 0:
nzid[sl] = np.floor(nzid[sl]) + fwd[sl]*2-1
insertions = np.empty(np.sum(sl),dtype=int)
for i,(v,n) in enumerate(zip(vh[sl],nzid[sl])):
if v in insertion_dict and n in insertion_dict[v]:
insertions[i] = insertion_dict[v][n]
else: insertions[i] = 0
sl2 = (insertions >= 0)
ids = np.argwhere(sl).flatten()[sl2]
ret[ids] = nzid[ids]
sl[ids] = False
return ret
assert( (nt_prop['stack'] < -1).sum() == 0 )
vh,zid,fwd = [nt_prop[key][not_stacked] for key in ('vh','zid0','fwd')]
nzid= _get_next(vh, zid, fwd)
_hashtable = { (v,z,f):i for i,(v,z,f) in
enumerate(zip(*[nt_prop[key] for key in ('vh','zid0','fwd')])) }
for i,v,z,f in zip(not_stacked, vh,nzid,fwd):
key = (v,z,f)
if key in _hashtable:
nt_prop.loc[i]['stack']= _hashtable[key]
devlogger.debug(f' Building 3prime')
tprime=[]
_hashtable[(-1,)*3] = -1
nt_prop["threeprime"] = [_hashtable[i] for i in list(nt_prop["threeprime_tuple"])]
devlogger.debug(f' Finding orientations')
orientation = np.empty((len(nt_prop),3,3))
for vh in nt_prop["vh"].unique():
sl = (nt_prop["vh"] == vh)
tmp = get_helix_angle(part, vh, np.round(nt_prop["z"][sl].values.astype(float)/3.4).astype(int), nt_prop['fwd'][sl])
orientation[sl] = tmp
nt_prop["orientation"] = [x for x in orientation]
nt_prop=nt_prop.fillna(-1).infer_objects(copy=False)
counter=-1
devlogger.debug(f' Building bp and bp_map orientations')
bp=-np.ones(len(nt_prop.index),dtype=int)
bp_map=dict(zip(ind_tuple,nt_prop.index))
for i,j,k in ind_tuple:
counter+=1
try:
bp[counter]=bp_map[(i,j,not(k))]
except:
pass
nt_prop["bp"]=bp
return nt_prop
[docs]
def mrdna_model_from_cadnano(json_file,seq=None,**model_parameters):
logger.info(f'Reading "{json_file}"')
part=read_json_file(json_file)
devlogger.debug(f'Generating property table')
nt_prop=gen_prop_table(part)
devlogger.debug(f'Building arrays')
if seq is None:
if nt_prop["seq"][0]==-1:
seq=None
else:
seq=nt_prop["seq"]
r=np.array(list(nt_prop['r']))
bp=np.array(list(nt_prop['bp']))
three_prime=np.array((list(nt_prop["threeprime"])))
stack=np.array((list(nt_prop["stack"])))
orientation=np.array(list(nt_prop["orientation"]))
rank = -np.array(list(nt_prop["z"])) # arrange segments in usual way
_vh = np.array(list(nt_prop["vh"]))
devlogger.debug(f'Building model from lists')
model = model_from_basepair_stack_3prime( r, bp, stack, three_prime, seq, orientation,
rank = rank, explicitly_unstacked = np.argwhere(stack == -1),
**model_parameters )
devlogger.debug(f'Fixing segment names')
hmap = model._reader_list_hmap
hrank = model._reader_list_hrank
fwd = model._reader_list_fwd
_starts = dict()
for seg in model.segments:
hid = int(seg._helix_idx)
_tmp = fwd & (hmap == hid) # select forward nts in helix
_list_ids = np.argwhere(_tmp).flatten()
_list_ids = _list_ids[np.argsort( nt_prop['zid'][_list_ids] )]
start_idx, end_idx = (_list_ids[0],_list_ids[-1])
for _a in (start_idx, end_idx): assert( _a.size == 1 )
start = max(map(float, [nt_prop['zid'][i] for i in (start_idx,end_idx)] ))
cadnano_helix = int(nt_prop['vh'][start_idx])
if cadnano_helix not in _starts: _starts[cadnano_helix] = []
_starts[cadnano_helix].append( (seg,start) )
## Assign beta and occupancy
seg._cadnano_bp_to_zidx = np.array( np.floor(nt_prop['zid'][_list_ids].astype(float)) )
seg.occupancy = cadnano_helix
def callback(segment):
for b in segment.beads:
bp = int(round(b.get_nt_position(segment)))
if bp < 0: bp = 0
if bp >= segment.num_nt: bp = segment.num_nt-1
try:
b.beta = segment._cadnano_bp_to_zidx[bp]
if 'orientation_bead' in b.__dict__:
b.orientation_bead.beta = segment._cadnano_bp_to_zidx[bp]
except:
pass
seg._generate_bead_callbacks.append(callback)
def atomic_callback(nucleotide, bp_to_zidx=seg._cadnano_bp_to_zidx):
nt = nucleotide
segment = nucleotide.parent.segment
bp = int(round(segment.contour_to_nt_pos( nt.contour_position )))
if bp < 0: bp = 0
if bp >= segment.num_nt: bp = segment.num_nt-1
try:
nt.beta = segment.bp_to_zidx[bp]
nt.parent.occupancy = segment.occupancy
except:
pass
seg._generate_nucleotide_callbacks.append(atomic_callback)
for vh,ss in _starts.items():
devlogger.debug(f'Renaming segs in {vh}')
for i,(seg,s) in enumerate(sorted(ss, key=lambda x: x[-1])):
devlogger.debug(f' seg {i}: {seg} {s} {seg.end3.connection}')
_newname = f'{vh:d}-{i:d}'
devlogger.debug(f' {seg} -> {_newname}')
seg.name = _newname
seg.segname = seg.name
model._dataframe=nt_prop
return model