Source code for mrdna.readers.segmentmodel_from_cadnano

# -*- coding: utf-8 -*-
import numpy as np
import os,sys
from glob import glob
import re
import pandas as pd
pd.options.mode.chained_assignment = None  # default='warn'
from .segmentmodel_from_lists import model_from_basepair_stack_3prime

from ..arbdmodel.coords import readArbdCoords, readAvgArbdCoords, rotationAboutAxis
from ..segmentmodel import SegmentModel, SingleStrandedSegment, DoubleStrandedSegment
from ..model.dna_sequence import m13 as m13seq
from .. import logger, devlogger
import json
import re
import pdb



## TODO: separate SegmentModel from ArbdModel so multiple parts can be combined
## TODO: catch circular strands in "get_5prime" cadnano calls
## TODO: handle special motifs
##   - doubly-nicked helices
##   - helices that should be stacked across an empty region (crossovers from and end in the helix to another end in the helix)
##   - circular constructs


[docs] def get_lattice(part): lattice_type = None _gt = part.getGridType() try: lattice_type = _gt.name.lower() except: if _gt == 1: lattice_type = 'square' elif _gt == 2: lattice_type = 'honeycomb' else: print(lattice_type) return lattice_type
[docs] def read_json_file(filename): import cadnano from cadnano.document import Document try: with open(filename) as ch: json_data = json.load(ch) except: with open(filename) as ch: content = "" for l in ch: l = re.sub(r"'", r'"', l) # https://stackoverflow.com/questions/4033633/handling-lazy-json-in-python-expecting-property-name # l = re.sub(r"{\s*(\w)", r'{"\1', l) # l = re.sub(r",\s*(\w)", r',"\1', l) # l = re.sub(r"(\w):", r'\1":', l) content += l+"\n" json_data = json.loads(content) try: doc = Document() cadnano.fileio.v3decode.decode(doc, json_data) decoder = 3 except: doc = Document() cadnano.fileio.v2decode.decode(doc, json_data) decoder = 2 parts = [p for p in doc.getParts()] if len(parts) != 1: raise Exception("Only documents containing a single cadnano part are implemented at this time.") part = parts[0] if decoder == 2: """ It seems cadnano2.5 (as of ce6ff019) does not set the EulerZ for square lattice structures correctly, doing so here """ l = get_lattice(part) if l == 'square': for id_num in part.getIdNums(): if part.vh_properties.loc[id_num,'eulerZ'] == 0: part.vh_properties.loc[id_num,'eulerZ'] = 360*(6/10.5) df=pd.DataFrame(json_data["vstrands"]) n_df=df.set_index("num") return part
[docs] def get_helix_angle(part, helix_id, indices, index_offset=0, fwd=True): """ Get "start_orientation" for helix """ """ FROM CADNANO2.5 + angle is CCW - angle is CW Right handed DNA rotates clockwise from 5' to 3' we use the convention the 5' end starts at 0 degrees and it's pair is minor_groove_angle degrees away direction, hence the minus signs. eulerZ """ hp, bpr, tpr, eulerZ, mgroove = part.vh_properties.loc[helix_id, ['helical_pitch', 'bases_per_repeat', 'turns_per_repeat', 'eulerZ', 'minor_groove_angle']] twist_per_base = tpr*360./bpr # angle = eulerZ - twist_per_base*indices + 0.5*mgroove + 180 angle = eulerZ - twist_per_base*indices - 0.5*mgroove angle=angle*(np.pi/180) cos,sin = (np.cos(angle), np.sin(angle)) R = np.zeros( (len(angle),3,3) ) R[:,0,0] = cos R[:,1,1] = cos R[:,0,1] = 2*(fwd-0.5)*sin R[:,1,0] = -2*(fwd-0.5)*sin R[:,2,2] = -2*(fwd-0.5) return R
[docs] def gen_id_series(df, slice_, strand, part, insertion_dict, offset = 0): vh = strand._id_num df.loc[slice_,"vh"]=vh df.loc[slice_,"fwd"]=strand.isForward() x=part.getVirtualHelixOrigin(strand._id_num)[0]*10 y=part.getVirtualHelixOrigin(strand._id_num)[1]*10 id_lo,id_hi=strand.idxs() zids=[str(i) for i in range(id_lo,id_hi+1)] zids0=list(range(id_lo,id_hi+1)) insert_dict=dict([(j.idx(),j.length()) for j in strand.insertionsOnStrand()]) for insert_base in insert_dict: z_ind=zids0.index(insert_base) z_val=insert_dict[insert_base] if vh not in insertion_dict: insertion_dict[vh] = dict() insertion_dict[vh][insert_base] = z_val # record insertions for later zids.pop(z_ind) zids0.pop(z_ind) if z_val!=-1: l=list(range(z_val+1)) l.reverse() for k in l: zids0.insert(z_ind,insert_base + k/len(l)) zids.insert(z_ind,str(insert_base)+"."+str(k)) df.loc[slice_,"zid0"] = zids0 df.loc[slice_,"zid"] = zids z = -3.4 * np.linspace( id_lo+0.25, id_hi-0.25, slice_.stop-slice_.start+1 ) # Note: this spreads # insertions uniformly through # the strand, not through # helical region like original did L=[(df["vh"][i],df["zid0"][i],df["fwd"][i]) for i in df.loc[slice_].index] stack = -np.ones(len(zids),dtype=int) if strand.isForward()==True: stack[:-1] = np.arange(len(zids)-1,dtype=int) + 1 + offset stack[-1] = -1 if strand.connection3p() is None: threeprime_tuple=L[1:]+[(-1,)*3] else: threeprime_tuple=L[1:]+[(strand.connection3p().idNum(),strand.connection3p().idx5Prime(),strand.connection3p().isForward())] else: stack[1:] = np.arange(len(zids)-1,dtype=int) + offset stack[0] = -1 if strand.connection3p() is None: threeprime_tuple=[(-1,)*3]+L[0:-1] else: threeprime_tuple=[(strand.connection3p().idNum(),strand.connection3p().idx5Prime(),strand.connection3p().isForward())]+L[0:-1] df.loc[slice_,"stack"] = stack df.loc[slice_,"threeprime_tuple"] = threeprime_tuple ## cadnano 3.1 sequence assign is wrong if there is insertion or deletion. df.loc[slice_,"z"] = z df.loc[slice_,"r"] = [np.array((x,y,val)) for val in z] return df
[docs] def gen_prop_table(part): devlogger.debug(f' Getting strand sets') strand_set=[] id_series=[] insertion_dict = dict() total_nts = 0 for i in part.getidNums(): fwd,rev=part.getStrandSets(i) [strand_set.append(i) for i in fwd.strands()] [strand_set.append(i) for i in rev.strands()] total_nts = sum(s.totalLength() for s in strand_set) devlogger.debug(f' Getting strand properties') nt_prop=pd.DataFrame(columns=["vh","zid0","zid","fwd","stack","threeprime_tuple","z","r"],index=range(total_nts),dtype=object) offset = 0 for strand in strand_set: l = strand.totalLength() gen_id_series(nt_prop, slice(offset,offset+l-1), strand, part, insertion_dict, offset=offset) offset += l devlogger.debug(f' Getting sequences') nt_prop["seq"]=-1 ind_tuple=list(zip(nt_prop["vh"],nt_prop["zid"],nt_prop["fwd"])) devlogger.debug(f' Stacking strand ends') not_stacked = nt_prop[(nt_prop["stack"]==-1)].index def _get_next(vh, zid, fwd): ret = np.empty(len(zid)) sl = np.ones(ret.shape, dtype=bool) nzid = np.array(zid[sl]) while np.sum(sl) > 0: nzid[sl] = np.floor(nzid[sl]) + fwd[sl]*2-1 insertions = np.empty(np.sum(sl),dtype=int) for i,(v,n) in enumerate(zip(vh[sl],nzid[sl])): if v in insertion_dict and n in insertion_dict[v]: insertions[i] = insertion_dict[v][n] else: insertions[i] = 0 sl2 = (insertions >= 0) ids = np.argwhere(sl).flatten()[sl2] ret[ids] = nzid[ids] sl[ids] = False return ret assert( (nt_prop['stack'] < -1).sum() == 0 ) vh,zid,fwd = [nt_prop[key][not_stacked] for key in ('vh','zid0','fwd')] nzid= _get_next(vh, zid, fwd) _hashtable = { (v,z,f):i for i,(v,z,f) in enumerate(zip(*[nt_prop[key] for key in ('vh','zid0','fwd')])) } for i,v,z,f in zip(not_stacked, vh,nzid,fwd): key = (v,z,f) if key in _hashtable: nt_prop.loc[i]['stack']= _hashtable[key] devlogger.debug(f' Building 3prime') tprime=[] _hashtable[(-1,)*3] = -1 nt_prop["threeprime"] = [_hashtable[i] for i in list(nt_prop["threeprime_tuple"])] devlogger.debug(f' Finding orientations') orientation = np.empty((len(nt_prop),3,3)) for vh in nt_prop["vh"].unique(): sl = (nt_prop["vh"] == vh) tmp = get_helix_angle(part, vh, np.round(nt_prop["z"][sl].values.astype(float)/3.4).astype(int), nt_prop['fwd'][sl]) orientation[sl] = tmp nt_prop["orientation"] = [x for x in orientation] nt_prop=nt_prop.fillna(-1).infer_objects(copy=False) counter=-1 devlogger.debug(f' Building bp and bp_map orientations') bp=-np.ones(len(nt_prop.index),dtype=int) bp_map=dict(zip(ind_tuple,nt_prop.index)) for i,j,k in ind_tuple: counter+=1 try: bp[counter]=bp_map[(i,j,not(k))] except: pass nt_prop["bp"]=bp return nt_prop
[docs] def mrdna_model_from_cadnano(json_file,seq=None,**model_parameters): logger.info(f'Reading "{json_file}"') part=read_json_file(json_file) devlogger.debug(f'Generating property table') nt_prop=gen_prop_table(part) devlogger.debug(f'Building arrays') if seq is None: if nt_prop["seq"][0]==-1: seq=None else: seq=nt_prop["seq"] r=np.array(list(nt_prop['r'])) bp=np.array(list(nt_prop['bp'])) three_prime=np.array((list(nt_prop["threeprime"]))) stack=np.array((list(nt_prop["stack"]))) orientation=np.array(list(nt_prop["orientation"])) rank = -np.array(list(nt_prop["z"])) # arrange segments in usual way _vh = np.array(list(nt_prop["vh"])) devlogger.debug(f'Building model from lists') model = model_from_basepair_stack_3prime( r, bp, stack, three_prime, seq, orientation, rank = rank, explicitly_unstacked = np.argwhere(stack == -1), **model_parameters ) devlogger.debug(f'Fixing segment names') hmap = model._reader_list_hmap hrank = model._reader_list_hrank fwd = model._reader_list_fwd _starts = dict() for seg in model.segments: hid = int(seg._helix_idx) _tmp = fwd & (hmap == hid) # select forward nts in helix _list_ids = np.argwhere(_tmp).flatten() _list_ids = _list_ids[np.argsort( nt_prop['zid'][_list_ids] )] start_idx, end_idx = (_list_ids[0],_list_ids[-1]) for _a in (start_idx, end_idx): assert( _a.size == 1 ) start = max(map(float, [nt_prop['zid'][i] for i in (start_idx,end_idx)] )) cadnano_helix = int(nt_prop['vh'][start_idx]) if cadnano_helix not in _starts: _starts[cadnano_helix] = [] _starts[cadnano_helix].append( (seg,start) ) ## Assign beta and occupancy seg._cadnano_bp_to_zidx = np.array( np.floor(nt_prop['zid'][_list_ids].astype(float)) ) seg.occupancy = cadnano_helix def callback(segment): for b in segment.beads: bp = int(round(b.get_nt_position(segment))) if bp < 0: bp = 0 if bp >= segment.num_nt: bp = segment.num_nt-1 try: b.beta = segment._cadnano_bp_to_zidx[bp] if 'orientation_bead' in b.__dict__: b.orientation_bead.beta = segment._cadnano_bp_to_zidx[bp] except: pass seg._generate_bead_callbacks.append(callback) def atomic_callback(nucleotide, bp_to_zidx=seg._cadnano_bp_to_zidx): nt = nucleotide segment = nucleotide.parent.segment bp = int(round(segment.contour_to_nt_pos( nt.contour_position ))) if bp < 0: bp = 0 if bp >= segment.num_nt: bp = segment.num_nt-1 try: nt.beta = segment.bp_to_zidx[bp] nt.parent.occupancy = segment.occupancy except: pass seg._generate_nucleotide_callbacks.append(atomic_callback) for vh,ss in _starts.items(): devlogger.debug(f'Renaming segs in {vh}') for i,(seg,s) in enumerate(sorted(ss, key=lambda x: x[-1])): devlogger.debug(f' seg {i}: {seg} {s} {seg.end3.connection}') _newname = f'{vh:d}-{i:d}' devlogger.debug(f' {seg} -> {_newname}') seg.name = _newname seg.segname = seg.name model._dataframe=nt_prop return model