Source code for mrdna.readers.cadnano_segments

# -*- coding: utf-8 -*-
import pdb
import numpy as np
import os,sys
from glob import glob
import re

from ..arbdmodel.coords import readArbdCoords, readAvgArbdCoords, rotationAboutAxis
from ..segmentmodel import SegmentModel, SingleStrandedSegment, DoubleStrandedSegment
from ..model.dna_sequence import m13 as m13seq

## Only testing on cadnano2.5
## TODO: separate SegmentModel from ArbdModel so multiple parts can be combined
## TODO: catch circular strands in "get_5prime" cadnano calls
## TODO: handle special motifs
##   - doubly-nicked helices
##   - helices that should be stacked across an empty region (crossovers from and end in the helix to another end in the helix)
##   - circular constructs



[docs] class cadnano_part(SegmentModel):
[docs] def __init__(self, part, **kwargs ): self.part = part self.lattice_type = _get_lattice(part) self._cadnano_part_to_segments(part) # SegmentModel.__init__(self,...) # self.segments = [seg for hid,segs in self.helices.items() for seg in segs] self.segments = [seg for hid,segs in sorted(self.helices.items()) for seg in segs] self._add_intrahelical_connections() self._add_crossovers() self._add_prime_ends() SegmentModel.__init__(self, self.segments, **kwargs)
def _get_helix_angle(self, helix_id, indices): """ Get "start_orientation" for helix """ # import ipdb # ipdb.set_trace() """ FROM CADNANO2.5 + angle is CCW - angle is CW Right handed DNA rotates clockwise from 5' to 3' we use the convention the 5' end starts at 0 degrees and it's pair is minor_groove_angle degrees away direction, hence the minus signs. eulerZ """ hp, bpr, tpr, eulerZ, mgroove = self.part.vh_properties.loc[helix_id, ['helical_pitch', 'bases_per_repeat', 'turns_per_repeat', 'eulerZ', 'minor_groove_angle']] twist_per_base = tpr*360./bpr # angle = eulerZ - twist_per_base*indices + 0.5*mgroove + 180 angle = eulerZ + twist_per_base*indices - 0.5*mgroove return angle def _cadnano_part_to_segments(self,part): try: from cadnano.cnenum import PointType except: try: from cadnano.proxies.cnenum import PointType except: from cadnano.proxies.cnenum import PointEnum as PointType segments = dict() self.helices = helices = dict() self.helix_ranges = helix_ranges = dict() props = part.getModelProperties().copy() if props.get('point_type') == PointType.ARBITRARY: # TODO add code to encode Parts with ARBITRARY point configurations raise NotImplementedError("Not implemented") else: try: vh_props, origins = part.helixPropertiesAndOrigins() except: origins = {hid:part.getVirtualHelixOrigin(hid)[:2] for hid in part.getidNums()} self.origins = origins vh_list = [] strand_list = [] xover_list = [] self.xovers_from = dict() self.xovers_to = dict() try: numHID = part.getMaxIdNum() + 1 except: numHID = part.getIdNumMax() + 1 for id_num in range(numHID): try: offset_and_size = part.getOffsetAndSize(id_num) except: offset_and_size = None if offset_and_size is None: ## Add a placeholder for empty helix vh_list.append((id_num, 0)) strand_list.append(None) else: offset, size = offset_and_size vh_list.append((id_num, size)) fwd_ss, rev_ss = part.getStrandSets(id_num) fwd_idxs, fwd_colors = fwd_ss.dump(xover_list) rev_idxs, rev_colors = rev_ss.dump(xover_list) strand_list.append((fwd_idxs, rev_idxs)) self.xovers_from[id_num] = [] self.xovers_to[id_num] = [] for xo in xover_list: h1,f1,z1,h2,f2,z2 = xo self.xovers_from[h1].append(xo) self.xovers_to[h2].append(xo) ## Get lists of 5/3prime ends strands5 = [o.strand5p() for o in part.oligos()] strands3 = [o.strand3p() for o in part.oligos()] self._5prime_list = [(s.idNum(),s.isForward(),s.idx5Prime()) for s in strands5] self._3prime_list = [(s.idNum(),s.isForward(),s.idx3Prime()) for s in strands3] ## Get dictionary of insertions self.insertions = allInsertions = part.insertions() self.strand_occupancies = dict() ## Build helices for hid in range(numHID): # print("Working on helix",hid) helices[hid] = [] helix_ranges[hid] = [] self.strand_occupancies[hid] = [] helixStrands = strand_list[hid] if helixStrands is None: continue ## Build list of tuples containing (idx,length) of insertions/skips insertions = sorted( [(i[0],i[1].length()) for i in allInsertions[hid].items()], key=lambda x: x[0] ) ## TODO: make the following code (until "regions = ...") more readable ## Build list of strand ends and list of mandatory node locations ends1,ends2 = self._helixStrandsToEnds(helixStrands) ## Find crossovers for this helix reqNodeZids = sorted(list(set( ends1 + ends2 ) ) ) ## Build lists of which nt sites are occupied in the helix strandOccupancies = [ [x for i in range(0,len(e),2) for x in range(e[i],e[i+1]+1)] for e in (ends1,ends2) ] self.strand_occupancies[hid] = strandOccupancies ends1,ends2 = [ [(e[i],e[i+1]) for i in range(0,len(e),2)] for e in (ends1,ends2) ] regions = combineRegionLists(ends1,ends2) ## Split regions in event of ssDNA crossover split_regions = [] for zid1,zid2 in regions: zMid = int(0.5*(zid1+zid2)) if zMid in strandOccupancies[0] and zMid in strandOccupancies[1]: split_regions.append( (zid1,zid2) ) else: is_fwd = zMid in strandOccupancies[0] ends = [z for h,f,z in self._get_crossover_locations( hid, range(zid1+1,zid2), is_fwd )] z1 = zid1 for z in sorted(ends): z2 = z if z2 > z1: split_regions.append( (z1,z2) ) z1 = z2+1 z2 = zid2 split_regions.append( (z1,z2) ) # if hid == 43: # import pdb for zid1,zid2 in split_regions: zMid = int(0.5*(zid1+zid2)) assert( zMid in strandOccupancies[0] or zMid in strandOccupancies[1] ) bp_to_zidx = [] insertion_dict = {idx:length for idx,length in insertions} for i in range(zid1,zid2+1): if i in insertion_dict: l = insertion_dict[i] else: l = 0 for j in range(i,i+1+l): bp_to_zidx.append(i) numBps = len(bp_to_zidx) # print("Adding helix with length",numBps,zid1,zid2) name = "%d-%d" % (hid,len(helices[hid])) # "H%03d" % hid kwargs = dict(name=name, segname=name, occupancy=hid) posargs1 = dict( start_position = self._get_cadnano_position(hid,zid1-0.25), end_position = self._get_cadnano_position(hid,zid2+0.25) ) posargs2 = dict( start_position = posargs1['end_position'], end_position = posargs1['start_position']) ## TODO get sequence from cadnano api if zMid in strandOccupancies[0] and zMid in strandOccupancies[1]: kwargs['num_bp'] = numBps _angle = self._get_helix_angle(hid, zid1) start_orientation = rotationAboutAxis(np.array((0,0,1)), _angle) seg = DoubleStrandedSegment(**kwargs,**posargs1, start_orientation = start_orientation) elif zMid in strandOccupancies[0]: kwargs['num_nt'] = numBps seg = SingleStrandedSegment(**kwargs,**posargs1) elif zMid in strandOccupancies[1]: kwargs['num_nt'] = numBps seg = SingleStrandedSegment(**kwargs,**posargs2) else: raise Exception("Segment could not be found") seg._cadnano_helix = hid seg._cadnano_start = zid1 seg._cadnano_end = zid2 seg._cadnano_bp_to_zidx = bp_to_zidx def callback(segment): for b in segment.beads: bp = int(round(b.get_nt_position(segment))) if bp < 0: bp = 0 if bp >= segment.num_nt: bp = segment.num_nt-1 try: b.beta = segment._cadnano_bp_to_zidx[bp] if 'orientation_bead' in b.__dict__: b.orientation_bead.beta = segment._cadnano_bp_to_zidx[bp] except: pass seg._generate_bead_callbacks.append(callback) def atomic_callback(nucleotide, bp_to_zidx=bp_to_zidx): nt = nucleotide segment = nucleotide.parent.segment bp = int(round(segment.contour_to_nt_pos( nt.contour_position ))) if bp < 0: bp = 0 if bp >= segment.num_nt: bp = segment.num_nt-1 try: nt.beta = bp_to_zidx[bp] nt.parent.occupancy = segment.occupancy except: pass seg._generate_nucleotide_callbacks.append(atomic_callback) helices[hid].append( seg ) helix_ranges[hid].append( (zid1,zid2) ) def _get_cadnano_position(self, hid, zid): return [10*a for a in self.origins[hid]] + [-3.4*zid] def _helixStrandsToEnds(self, helixStrands): """Utility method to convert cadnano strand lists into list of indices of terminal points""" endLists = [[],[]] for endList, strandList in zip(endLists,helixStrands): lastStrand = None for s in strandList: if lastStrand is None: ## first strand endList.append(s[0]) elif lastStrand[1] != s[0]-1: assert( s[0] > lastStrand[1] ) endList.extend( [lastStrand[1], s[0]] ) lastStrand = s if lastStrand is not None: endList.append(lastStrand[1]) return endLists def _helix_strands_to_segment_ranges(self, helix_strands): """Utility method to convert cadnano strand lists into list of indices of terminal points""" def _join(strands): ends = [] lastEnd = None for start,end in strands: if lastEnd is None: ends.append([start]) elif lastEnd != start-1: ends[-1].append(lastEnd) ends.append([start]) lastEnd = end if lastEnd is not None: ends[-1].append(lastEnd) return ends s1,s2 = [_join(s) for s in helix_strands] i = j = 0 ## iterate through strands while i < len(s1) and j < len(s2): min(s1[i][0],s2[j][0]) def _get_segment(self, hid, zid): ## TODO: rename these variables to segments segs = self.helices[hid] ranges = self.helix_ranges[hid] for i in range(len(ranges)): zmin,zmax = ranges[i] if zmin <= zid and zid <= zmax: return segs[i] raise Exception("Could not find segment in helix %d at position %d" % (hid,zid)) def _get_nucleotide(self, hid, zid): raise Exception("Deprecated") seg = self._get_segment(hid,zid) sid = self.helices[hid].index(seg) zmin,zmax = self.helix_ranges[hid][sid] nt = zid-zmin ## Find insertions # TODO: for i in range(zmin,zid+1): ? for i in range(zmin,zid): if i in self.insertions[hid]: nt += self.insertions[hid][i].length() return nt def _get_segment_nucleotide(self, hid, zid, get_forward_location=False): """ returns segments and zero-based nucleotide index """ seg = self._get_segment(hid,zid) sid = self.helices[hid].index(seg) zmin,zmax = self.helix_ranges[hid][sid] zMid = int(0.5*(zmin+zmax)) occ = self.strand_occupancies[hid] ins = self.insertions[hid] ## TODO combine if/else when nested TODO is resolved # if zid in self.insertions[hid]: # import pdb # pdb.set_trace() if (zMid not in occ[0]) and (zMid in occ[1]): ## reversed ssDNA strand nt = zmax-zid # TODO: for i in range(zmin,zid+1): ? for i in range(zid,zmax+1): if i in self.insertions[hid]: nt += self.insertions[hid][i].length() else: ## normal condition if get_forward_location: while zid in ins and ins[zid].length() < 0 and zid <= zmax: zid+=1 # else: # while zid in ins and ins[zid].length() > 0 and zid >= zmax: # zid-=1 nt = zid-zmin for i in range(zmin,zid): if i in ins: nt += ins[i].length() if not get_forward_location and zid in ins: nt += ins[zid].length() ## Find insertions return seg, nt """ Routines to add connnections between helices """ def _add_intrahelical_connections(self): for hid,segs in self.helices.items(): occ = self.strand_occupancies[hid] for i in range(len(segs)-1): seg1,seg2 = [segs[j] for j in (i,i+1)] if isinstance(seg1,SingleStrandedSegment) and isinstance(seg2,SingleStrandedSegment): continue r1,r2 = [self.helix_ranges[hid][j] for j in (i,i+1)] if r1[1]+1 == r2[0]: ## TODO: handle nicks that are at intrahelical connections(?) zmid1 = int(0.5*(r1[0]+r1[1])) zmid2 = int(0.5*(r2[0]+r2[1])) ## TODO: validate if zmid1 in occ[0] and zmid2 in occ[0]: seg1.connect_end3(seg2.start5) if zmid1 in occ[1] and zmid2 in occ[1]: if zmid1 in occ[0]: end = seg1.end5 else: end = seg1.start5 if zmid2 in occ[0]: seg2.connect_start3(end) else: seg2.connect_end3(end) def _get_crossover_locations(self, helix_idx, nt_idx_range, fwd_strand=None): xos = [] def append_if_in_range(h,f,z): if fwd_strand in (None,f) and z in nt_idx_range: xos.append((h,f,z)) for xo in self.xovers_from[helix_idx]: ## h1,f1,z1,h2,f2,z2 = xo[3:] append_if_in_range(*xo[:3]) for xo in self.xovers_to[helix_idx]: append_if_in_range(*xo[3:]) return xos def _add_crossovers(self): for hid,xos in self.xovers_from.items(): for h1,f1,z1,h2,f2,z2 in xos: seg1, nt1 = self._get_segment_nucleotide(h1,z1,not f1) seg2, nt2 = self._get_segment_nucleotide(h2,z2,f2) ## TODO: use different types of crossovers ## fwd? ## 5'-to-3' direction if isinstance(seg1, SingleStrandedSegment): f1 = True if isinstance(seg2, SingleStrandedSegment): f2 = True seg1.add_crossover(nt1,seg2,nt2,[f1,f2]) def _add_prime_ends(self): for h,fwd,z in self._5prime_list: seg, nt = self._get_segment_nucleotide(h,z, fwd) if isinstance(seg, SingleStrandedSegment): fwd = True # print("adding 5prime",seg.name,nt,fwd) seg.add_5prime(nt,fwd) for h,fwd,z in self._3prime_list: seg, nt = self._get_segment_nucleotide(h,z, not fwd) if isinstance(seg, SingleStrandedSegment): fwd = True # print("adding 3prime",seg.name,nt,fwd) seg.add_3prime(nt,fwd)
[docs] def get_bead(self, hid, zid): # get segment, get nucleotide, seg, nt = self._get_segment_nucleotide(h,z) # return seg.get_nearest_bead(seg,nt / seg.num_nt) return seg.get_nearest_bead(seg,nt / (seg.num_nt-1))
[docs] def read_json_file(filename): import json import re try: with open(filename) as ch: data = json.load(ch) except: with open(filename) as ch: content = "" for l in ch: l = re.sub(r"'", r'"', l) # https://stackoverflow.com/questions/4033633/handling-lazy-json-in-python-expecting-property-name # l = re.sub(r"{\s*(\w)", r'{"\1', l) # l = re.sub(r",\s*(\w)", r',"\1', l) # l = re.sub(r"(\w):", r'\1":', l) content += l+"\n" data = json.loads(content) return data
[docs] def decode_cadnano_part(json_data): import cadnano from cadnano.document import Document try: doc = Document() cadnano.fileio.v3decode.decode(doc, json_data) decoder = 3 except: doc = Document() cadnano.fileio.v2decode.decode(doc, json_data) decoder = 2 parts = [p for p in doc.getParts()] if len(parts) != 1: raise Exception("Only documents containing a single cadnano part are implemented at this time.") part = parts[0] if decoder == 2: """ It seems cadnano2.5 (as of ce6ff019) does not set the EulerZ for square lattice structures correctly, doing so here """ l = _get_lattice(part) if l == 'square': for id_num in part.getIdNums(): if part.vh_properties.loc[id_num,'eulerZ'] == 0: part.vh_properties.loc[id_num,'eulerZ'] = 360*(6/10.5) return part
def _get_lattice(part): lattice_type = None _gt = part.getGridType() try: lattice_type = _gt.name.lower() except: if _gt == 1: lattice_type = 'square' elif _gt == 2: lattice_type = 'honeycomb' else: print("WARNING: unable to determine cadnano part lattice type") return lattice_type
[docs] def package_archive( name, directory ): ...
[docs] def read_model(json_data, sequence=None, fill_sequence='T', **kwargs): """ Read in data """ part = decode_cadnano_part(json_data) model = cadnano_part(part, **kwargs) # TODO # try: # model.set_cadnano_sequence() # finally: # ... # if sequence is not None and len() : # model.strands[0].set_sequence(seq) if sequence is None or len(sequence) == 0: ## default m13mp18 model.set_sequence(m13seq,force=False, fill_sequence=fill_sequence) else: model.set_sequence(sequence, fill_sequence=fill_sequence) return model
# pynvml.nvmlInit() # gpus = range(pynvml.nvmlDeviceGetCount()) # pynvml.nvmlShutdown() # gpus = [0,1,2] # print(gpus)
[docs] def combineRegionLists(loHi1,loHi2,intersect=False): """Combines two lists of (lo,hi) pairs specifying integer regions a single list of regions. """ ## Validate input for l in (loHi1,loHi2): ## Assert each region in lists is sorted for pair in l: assert(len(pair) == 2) assert(pair[0] <= pair[1]) if len(loHi1) == 0: if intersect: return [] else: return loHi2 if len(loHi2) == 0: if intersect: return [] else: return loHi1 ## Break input into lists of compact regions compactRegions1,compactRegions2 = [[],[]] for compactRegions,loHi in zip( [compactRegions1,compactRegions2], [loHi1,loHi2]): tmp = [] lastHi = loHi[0][0]-1 for lo,hi in loHi: if lo-1 != lastHi: compactRegions.append(tmp) tmp = [] tmp.append((lo,hi)) lastHi = hi if len(tmp) > 0: compactRegions.append(tmp) ## Build result result = [] region = [] i,j = [0,0] compactRegions1.append([[1e10]]) compactRegions2.append([[1e10]]) while i < len(compactRegions1)-1 or j < len(compactRegions2)-1: cr1 = compactRegions1[i] cr2 = compactRegions2[j] ## initialize region if len(region) == 0: if cr1[0][0] <= cr2[0][0]: region = cr1 i += 1 continue else: region = cr2 j += 1 continue if region[-1][-1] >= cr1[0][0]: region = combineCompactRegionLists(region, cr1, intersect=False) i+=1 elif region[-1][-1] >= cr2[0][0]: region = combineCompactRegionLists(region, cr2, intersect=False) j+=1 else: result.extend(region) region = [] assert( len(region) > 0 ) result.extend(region) result = sorted(result) # print("loHi1:",loHi1) # print("loHi2:",loHi2) # print(result,"\n") if intersect: lo = max( [loHi1[0][0], loHi2[0][0]] ) hi = min( [loHi1[-1][1], loHi2[-1][1]] ) result = [r for r in result if r[0] >= lo and r[1] <= hi] return result
[docs] def combineCompactRegionLists(loHi1,loHi2,intersect=False): """Combines two lists of (lo,hi) pairs specifying regions within a compact integer set into a single list of regions. examples: loHi1 = [[0,4],[5,7]] loHi2 = [[2,4],[5,9]] out = [(0, 1), (2, 4), (5, 7), (8, 9)] loHi1 = [[0,3],[5,7]] loHi2 = [[2,4],[5,9]] out = [(0, 1), (2, 3), (4, 4), (5, 7), (8, 9)] """ ## Validate input for l in (loHi1,loHi2): ## Assert each region in lists is sorted for pair in l: assert(len(pair) == 2) assert(pair[0] <= pair[1]) ## Assert lists are compact for pair1,pair2 in zip(l[::2],l[1::2]): assert(pair1[1]+1 == pair2[0]) if len(loHi1) == 0: if intersect: return [] else: return loHi2 if len(loHi2) == 0: if intersect: return [] else: return loHi1 ## Find the ends of the region lo = min( [loHi1[0][0], loHi2[0][0]] ) hi = max( [loHi1[-1][1], loHi2[-1][1]] ) ## Make a list of indices where each region will be split splitAfter = [] for l,h in loHi2: if l != lo: splitAfter.append(l-1) if h != hi: splitAfter.append(h) for l,h in loHi1: if l != lo: splitAfter.append(l-1) if h != hi: splitAfter.append(h) splitAfter = sorted(list(set(splitAfter))) # print("splitAfter:",splitAfter) split=[] last = -2 for s in splitAfter: split.append(s) last = s # print("split:",split) returnList = [(i+1,j) if i != j else (i,j) for i,j in zip([lo-1]+split,split+[hi])] if intersect: lo = max( [loHi1[0][0], loHi2[0][0]] ) hi = min( [loHi1[-1][1], loHi2[-1][1]] ) returnList = [r for r in returnList if r[0] >= lo and r[1] <= hi] # print("loHi1:",loHi1) # print("loHi2:",loHi2) # print(returnList,"\n") return returnList
if __name__ == '__main__': loHi1 = [[0,4],[5,7]] loHi2 = [[2,4],[5,9]] out = [(0, 1), (2, 4), (5, 7), (8, 9)] print(loHi1) print(loHi2) print(combineRegionLists(loHi1,loHi2)) print(combineCompactRegionLists(loHi1,loHi2)) loHi1 = [[0,3],[5,7]] loHi2 = [[2,4],[5,9]] out = [(0, 1), (2, 3), (4, 4), (5, 7), (8, 9)] print(loHi1) print(loHi2) print(combineRegionLists(loHi1,loHi2)) print(combineCompactRegionLists(loHi1,loHi2)) combineRegionLists # for f in glob('json/*'): # print("Working on {}".format(f)) # out = re.match('json/(.*).json',f).group(1) # data = read_json_file(f) # run_simulation_protocol( out, "job-id", data, gpu=0 )