# -*- coding: utf-8 -*-
import pdb
import numpy as np
import os,sys
from glob import glob
import re
from ..arbdmodel.coords import readArbdCoords, readAvgArbdCoords, rotationAboutAxis
from ..segmentmodel import SegmentModel, SingleStrandedSegment, DoubleStrandedSegment
from ..model.dna_sequence import m13 as m13seq
## Only testing on cadnano2.5
## TODO: separate SegmentModel from ArbdModel so multiple parts can be combined
## TODO: catch circular strands in "get_5prime" cadnano calls
## TODO: handle special motifs
## - doubly-nicked helices
## - helices that should be stacked across an empty region (crossovers from and end in the helix to another end in the helix)
## - circular constructs
[docs]
class cadnano_part(SegmentModel):
[docs]
def __init__(self, part,
**kwargs
):
self.part = part
self.lattice_type = _get_lattice(part)
self._cadnano_part_to_segments(part)
# SegmentModel.__init__(self,...)
# self.segments = [seg for hid,segs in self.helices.items() for seg in segs]
self.segments = [seg for hid,segs in sorted(self.helices.items()) for seg in segs]
self._add_intrahelical_connections()
self._add_crossovers()
self._add_prime_ends()
SegmentModel.__init__(self, self.segments,
**kwargs)
def _get_helix_angle(self, helix_id, indices):
""" Get "start_orientation" for helix """
# import ipdb
# ipdb.set_trace()
""" FROM CADNANO2.5
+ angle is CCW
- angle is CW
Right handed DNA rotates clockwise from 5' to 3'
we use the convention the 5' end starts at 0 degrees
and it's pair is minor_groove_angle degrees away
direction, hence the minus signs. eulerZ
"""
hp, bpr, tpr, eulerZ, mgroove = self.part.vh_properties.loc[helix_id,
['helical_pitch',
'bases_per_repeat',
'turns_per_repeat',
'eulerZ',
'minor_groove_angle']]
twist_per_base = tpr*360./bpr
# angle = eulerZ - twist_per_base*indices + 0.5*mgroove + 180
angle = eulerZ + twist_per_base*indices - 0.5*mgroove
return angle
def _cadnano_part_to_segments(self,part):
try:
from cadnano.cnenum import PointType
except:
try:
from cadnano.proxies.cnenum import PointType
except:
from cadnano.proxies.cnenum import PointEnum as PointType
segments = dict()
self.helices = helices = dict()
self.helix_ranges = helix_ranges = dict()
props = part.getModelProperties().copy()
if props.get('point_type') == PointType.ARBITRARY:
# TODO add code to encode Parts with ARBITRARY point configurations
raise NotImplementedError("Not implemented")
else:
try:
vh_props, origins = part.helixPropertiesAndOrigins()
except:
origins = {hid:part.getVirtualHelixOrigin(hid)[:2] for hid in part.getidNums()}
self.origins = origins
vh_list = []
strand_list = []
xover_list = []
self.xovers_from = dict()
self.xovers_to = dict()
try:
numHID = part.getMaxIdNum() + 1
except:
numHID = part.getIdNumMax() + 1
for id_num in range(numHID):
try:
offset_and_size = part.getOffsetAndSize(id_num)
except:
offset_and_size = None
if offset_and_size is None:
## Add a placeholder for empty helix
vh_list.append((id_num, 0))
strand_list.append(None)
else:
offset, size = offset_and_size
vh_list.append((id_num, size))
fwd_ss, rev_ss = part.getStrandSets(id_num)
fwd_idxs, fwd_colors = fwd_ss.dump(xover_list)
rev_idxs, rev_colors = rev_ss.dump(xover_list)
strand_list.append((fwd_idxs, rev_idxs))
self.xovers_from[id_num] = []
self.xovers_to[id_num] = []
for xo in xover_list:
h1,f1,z1,h2,f2,z2 = xo
self.xovers_from[h1].append(xo)
self.xovers_to[h2].append(xo)
## Get lists of 5/3prime ends
strands5 = [o.strand5p() for o in part.oligos()]
strands3 = [o.strand3p() for o in part.oligos()]
self._5prime_list = [(s.idNum(),s.isForward(),s.idx5Prime()) for s in strands5]
self._3prime_list = [(s.idNum(),s.isForward(),s.idx3Prime()) for s in strands3]
## Get dictionary of insertions
self.insertions = allInsertions = part.insertions()
self.strand_occupancies = dict()
## Build helices
for hid in range(numHID):
# print("Working on helix",hid)
helices[hid] = []
helix_ranges[hid] = []
self.strand_occupancies[hid] = []
helixStrands = strand_list[hid]
if helixStrands is None: continue
## Build list of tuples containing (idx,length) of insertions/skips
insertions = sorted( [(i[0],i[1].length()) for i in allInsertions[hid].items()],
key=lambda x: x[0] )
## TODO: make the following code (until "regions = ...") more readable
## Build list of strand ends and list of mandatory node locations
ends1,ends2 = self._helixStrandsToEnds(helixStrands)
## Find crossovers for this helix
reqNodeZids = sorted(list(set( ends1 + ends2 ) ) )
## Build lists of which nt sites are occupied in the helix
strandOccupancies = [ [x for i in range(0,len(e),2)
for x in range(e[i],e[i+1]+1)]
for e in (ends1,ends2) ]
self.strand_occupancies[hid] = strandOccupancies
ends1,ends2 = [ [(e[i],e[i+1]) for i in range(0,len(e),2)] for e in (ends1,ends2) ]
regions = combineRegionLists(ends1,ends2)
## Split regions in event of ssDNA crossover
split_regions = []
for zid1,zid2 in regions:
zMid = int(0.5*(zid1+zid2))
if zMid in strandOccupancies[0] and zMid in strandOccupancies[1]:
split_regions.append( (zid1,zid2) )
else:
is_fwd = zMid in strandOccupancies[0]
ends = [z for h,f,z in self._get_crossover_locations( hid, range(zid1+1,zid2), is_fwd )]
z1 = zid1
for z in sorted(ends):
z2 = z
if z2 > z1:
split_regions.append( (z1,z2) )
z1 = z2+1
z2 = zid2
split_regions.append( (z1,z2) )
# if hid == 43:
# import pdb
for zid1,zid2 in split_regions:
zMid = int(0.5*(zid1+zid2))
assert( zMid in strandOccupancies[0] or zMid in strandOccupancies[1] )
bp_to_zidx = []
insertion_dict = {idx:length for idx,length in insertions}
for i in range(zid1,zid2+1):
if i in insertion_dict:
l = insertion_dict[i]
else:
l = 0
for j in range(i,i+1+l):
bp_to_zidx.append(i)
numBps = len(bp_to_zidx)
# print("Adding helix with length",numBps,zid1,zid2)
name = "%d-%d" % (hid,len(helices[hid]))
# "H%03d" % hid
kwargs = dict(name=name, segname=name, occupancy=hid)
posargs1 = dict( start_position = self._get_cadnano_position(hid,zid1-0.25),
end_position = self._get_cadnano_position(hid,zid2+0.25) )
posargs2 = dict( start_position = posargs1['end_position'],
end_position = posargs1['start_position'])
## TODO get sequence from cadnano api
if zMid in strandOccupancies[0] and zMid in strandOccupancies[1]:
kwargs['num_bp'] = numBps
_angle = self._get_helix_angle(hid, zid1)
start_orientation = rotationAboutAxis(np.array((0,0,1)), _angle)
seg = DoubleStrandedSegment(**kwargs,**posargs1, start_orientation = start_orientation)
elif zMid in strandOccupancies[0]:
kwargs['num_nt'] = numBps
seg = SingleStrandedSegment(**kwargs,**posargs1)
elif zMid in strandOccupancies[1]:
kwargs['num_nt'] = numBps
seg = SingleStrandedSegment(**kwargs,**posargs2)
else:
raise Exception("Segment could not be found")
seg._cadnano_helix = hid
seg._cadnano_start = zid1
seg._cadnano_end = zid2
seg._cadnano_bp_to_zidx = bp_to_zidx
def callback(segment):
for b in segment.beads:
bp = int(round(b.get_nt_position(segment)))
if bp < 0: bp = 0
if bp >= segment.num_nt: bp = segment.num_nt-1
try:
b.beta = segment._cadnano_bp_to_zidx[bp]
if 'orientation_bead' in b.__dict__:
b.orientation_bead.beta = segment._cadnano_bp_to_zidx[bp]
except:
pass
seg._generate_bead_callbacks.append(callback)
def atomic_callback(nucleotide, bp_to_zidx=bp_to_zidx):
nt = nucleotide
segment = nucleotide.parent.segment
bp = int(round(segment.contour_to_nt_pos( nt.contour_position )))
if bp < 0: bp = 0
if bp >= segment.num_nt: bp = segment.num_nt-1
try:
nt.beta = bp_to_zidx[bp]
nt.parent.occupancy = segment.occupancy
except:
pass
seg._generate_nucleotide_callbacks.append(atomic_callback)
helices[hid].append( seg )
helix_ranges[hid].append( (zid1,zid2) )
def _get_cadnano_position(self, hid, zid):
return [10*a for a in self.origins[hid]] + [-3.4*zid]
def _helixStrandsToEnds(self, helixStrands):
"""Utility method to convert cadnano strand lists into list of
indices of terminal points"""
endLists = [[],[]]
for endList, strandList in zip(endLists,helixStrands):
lastStrand = None
for s in strandList:
if lastStrand is None:
## first strand
endList.append(s[0])
elif lastStrand[1] != s[0]-1:
assert( s[0] > lastStrand[1] )
endList.extend( [lastStrand[1], s[0]] )
lastStrand = s
if lastStrand is not None:
endList.append(lastStrand[1])
return endLists
def _helix_strands_to_segment_ranges(self, helix_strands):
"""Utility method to convert cadnano strand lists into list of
indices of terminal points"""
def _join(strands):
ends = []
lastEnd = None
for start,end in strands:
if lastEnd is None:
ends.append([start])
elif lastEnd != start-1:
ends[-1].append(lastEnd)
ends.append([start])
lastEnd = end
if lastEnd is not None:
ends[-1].append(lastEnd)
return ends
s1,s2 = [_join(s) for s in helix_strands]
i = j = 0
## iterate through strands
while i < len(s1) and j < len(s2):
min(s1[i][0],s2[j][0])
def _get_segment(self, hid, zid):
## TODO: rename these variables to segments
segs = self.helices[hid]
ranges = self.helix_ranges[hid]
for i in range(len(ranges)):
zmin,zmax = ranges[i]
if zmin <= zid and zid <= zmax:
return segs[i]
raise Exception("Could not find segment in helix %d at position %d" % (hid,zid))
def _get_nucleotide(self, hid, zid):
raise Exception("Deprecated")
seg = self._get_segment(hid,zid)
sid = self.helices[hid].index(seg)
zmin,zmax = self.helix_ranges[hid][sid]
nt = zid-zmin
## Find insertions
# TODO: for i in range(zmin,zid+1): ?
for i in range(zmin,zid):
if i in self.insertions[hid]:
nt += self.insertions[hid][i].length()
return nt
def _get_segment_nucleotide(self, hid, zid, get_forward_location=False):
""" returns segments and zero-based nucleotide index """
seg = self._get_segment(hid,zid)
sid = self.helices[hid].index(seg)
zmin,zmax = self.helix_ranges[hid][sid]
zMid = int(0.5*(zmin+zmax))
occ = self.strand_occupancies[hid]
ins = self.insertions[hid]
## TODO combine if/else when nested TODO is resolved
# if zid in self.insertions[hid]:
# import pdb
# pdb.set_trace()
if (zMid not in occ[0]) and (zMid in occ[1]):
## reversed ssDNA strand
nt = zmax-zid
# TODO: for i in range(zmin,zid+1): ?
for i in range(zid,zmax+1):
if i in self.insertions[hid]:
nt += self.insertions[hid][i].length()
else:
## normal condition
if get_forward_location:
while zid in ins and ins[zid].length() < 0 and zid <= zmax:
zid+=1
# else:
# while zid in ins and ins[zid].length() > 0 and zid >= zmax:
# zid-=1
nt = zid-zmin
for i in range(zmin,zid):
if i in ins:
nt += ins[i].length()
if not get_forward_location and zid in ins:
nt += ins[zid].length()
## Find insertions
return seg, nt
""" Routines to add connnections between helices """
def _add_intrahelical_connections(self):
for hid,segs in self.helices.items():
occ = self.strand_occupancies[hid]
for i in range(len(segs)-1):
seg1,seg2 = [segs[j] for j in (i,i+1)]
if isinstance(seg1,SingleStrandedSegment) and isinstance(seg2,SingleStrandedSegment):
continue
r1,r2 = [self.helix_ranges[hid][j] for j in (i,i+1)]
if r1[1]+1 == r2[0]:
## TODO: handle nicks that are at intrahelical connections(?)
zmid1 = int(0.5*(r1[0]+r1[1]))
zmid2 = int(0.5*(r2[0]+r2[1]))
## TODO: validate
if zmid1 in occ[0] and zmid2 in occ[0]:
seg1.connect_end3(seg2.start5)
if zmid1 in occ[1] and zmid2 in occ[1]:
if zmid1 in occ[0]:
end = seg1.end5
else:
end = seg1.start5
if zmid2 in occ[0]:
seg2.connect_start3(end)
else:
seg2.connect_end3(end)
def _get_crossover_locations(self, helix_idx, nt_idx_range, fwd_strand=None):
xos = []
def append_if_in_range(h,f,z):
if fwd_strand in (None,f) and z in nt_idx_range:
xos.append((h,f,z))
for xo in self.xovers_from[helix_idx]:
## h1,f1,z1,h2,f2,z2 = xo[3:]
append_if_in_range(*xo[:3])
for xo in self.xovers_to[helix_idx]:
append_if_in_range(*xo[3:])
return xos
def _add_crossovers(self):
for hid,xos in self.xovers_from.items():
for h1,f1,z1,h2,f2,z2 in xos:
seg1, nt1 = self._get_segment_nucleotide(h1,z1,not f1)
seg2, nt2 = self._get_segment_nucleotide(h2,z2,f2)
## TODO: use different types of crossovers
## fwd?
## 5'-to-3' direction
if isinstance(seg1, SingleStrandedSegment): f1 = True
if isinstance(seg2, SingleStrandedSegment): f2 = True
seg1.add_crossover(nt1,seg2,nt2,[f1,f2])
def _add_prime_ends(self):
for h,fwd,z in self._5prime_list:
seg, nt = self._get_segment_nucleotide(h,z, fwd)
if isinstance(seg, SingleStrandedSegment): fwd = True
# print("adding 5prime",seg.name,nt,fwd)
seg.add_5prime(nt,fwd)
for h,fwd,z in self._3prime_list:
seg, nt = self._get_segment_nucleotide(h,z, not fwd)
if isinstance(seg, SingleStrandedSegment): fwd = True
# print("adding 3prime",seg.name,nt,fwd)
seg.add_3prime(nt,fwd)
[docs]
def get_bead(self, hid, zid):
# get segment, get nucleotide,
seg, nt = self._get_segment_nucleotide(h,z)
# return seg.get_nearest_bead(seg,nt / seg.num_nt)
return seg.get_nearest_bead(seg,nt / (seg.num_nt-1))
[docs]
def read_json_file(filename):
import json
import re
try:
with open(filename) as ch:
data = json.load(ch)
except:
with open(filename) as ch:
content = ""
for l in ch:
l = re.sub(r"'", r'"', l)
# https://stackoverflow.com/questions/4033633/handling-lazy-json-in-python-expecting-property-name
# l = re.sub(r"{\s*(\w)", r'{"\1', l)
# l = re.sub(r",\s*(\w)", r',"\1', l)
# l = re.sub(r"(\w):", r'\1":', l)
content += l+"\n"
data = json.loads(content)
return data
[docs]
def decode_cadnano_part(json_data):
import cadnano
from cadnano.document import Document
try:
doc = Document()
cadnano.fileio.v3decode.decode(doc, json_data)
decoder = 3
except:
doc = Document()
cadnano.fileio.v2decode.decode(doc, json_data)
decoder = 2
parts = [p for p in doc.getParts()]
if len(parts) != 1:
raise Exception("Only documents containing a single cadnano part are implemented at this time.")
part = parts[0]
if decoder == 2:
""" It seems cadnano2.5 (as of ce6ff019) does not set the EulerZ for square lattice structures correctly, doing so here """
l = _get_lattice(part)
if l == 'square':
for id_num in part.getIdNums():
if part.vh_properties.loc[id_num,'eulerZ'] == 0:
part.vh_properties.loc[id_num,'eulerZ'] = 360*(6/10.5)
return part
def _get_lattice(part):
lattice_type = None
_gt = part.getGridType()
try:
lattice_type = _gt.name.lower()
except:
if _gt == 1:
lattice_type = 'square'
elif _gt == 2:
lattice_type = 'honeycomb'
else:
print("WARNING: unable to determine cadnano part lattice type")
return lattice_type
[docs]
def package_archive( name, directory ):
...
[docs]
def read_model(json_data, sequence=None, fill_sequence='T', **kwargs):
""" Read in data """
part = decode_cadnano_part(json_data)
model = cadnano_part(part,
**kwargs)
# TODO
# try:
# model.set_cadnano_sequence()
# finally:
# ...
# if sequence is not None and len() :
# model.strands[0].set_sequence(seq)
if sequence is None or len(sequence) == 0:
## default m13mp18
model.set_sequence(m13seq,force=False, fill_sequence=fill_sequence)
else:
model.set_sequence(sequence, fill_sequence=fill_sequence)
return model
# pynvml.nvmlInit()
# gpus = range(pynvml.nvmlDeviceGetCount())
# pynvml.nvmlShutdown()
# gpus = [0,1,2]
# print(gpus)
[docs]
def combineRegionLists(loHi1,loHi2,intersect=False):
"""Combines two lists of (lo,hi) pairs specifying integer
regions a single list of regions. """
## Validate input
for l in (loHi1,loHi2):
## Assert each region in lists is sorted
for pair in l:
assert(len(pair) == 2)
assert(pair[0] <= pair[1])
if len(loHi1) == 0:
if intersect:
return []
else:
return loHi2
if len(loHi2) == 0:
if intersect:
return []
else:
return loHi1
## Break input into lists of compact regions
compactRegions1,compactRegions2 = [[],[]]
for compactRegions,loHi in zip(
[compactRegions1,compactRegions2],
[loHi1,loHi2]):
tmp = []
lastHi = loHi[0][0]-1
for lo,hi in loHi:
if lo-1 != lastHi:
compactRegions.append(tmp)
tmp = []
tmp.append((lo,hi))
lastHi = hi
if len(tmp) > 0:
compactRegions.append(tmp)
## Build result
result = []
region = []
i,j = [0,0]
compactRegions1.append([[1e10]])
compactRegions2.append([[1e10]])
while i < len(compactRegions1)-1 or j < len(compactRegions2)-1:
cr1 = compactRegions1[i]
cr2 = compactRegions2[j]
## initialize region
if len(region) == 0:
if cr1[0][0] <= cr2[0][0]:
region = cr1
i += 1
continue
else:
region = cr2
j += 1
continue
if region[-1][-1] >= cr1[0][0]:
region = combineCompactRegionLists(region, cr1, intersect=False)
i+=1
elif region[-1][-1] >= cr2[0][0]:
region = combineCompactRegionLists(region, cr2, intersect=False)
j+=1
else:
result.extend(region)
region = []
assert( len(region) > 0 )
result.extend(region)
result = sorted(result)
# print("loHi1:",loHi1)
# print("loHi2:",loHi2)
# print(result,"\n")
if intersect:
lo = max( [loHi1[0][0], loHi2[0][0]] )
hi = min( [loHi1[-1][1], loHi2[-1][1]] )
result = [r for r in result if r[0] >= lo and r[1] <= hi]
return result
[docs]
def combineCompactRegionLists(loHi1,loHi2,intersect=False):
"""Combines two lists of (lo,hi) pairs specifying regions within a
compact integer set into a single list of regions.
examples:
loHi1 = [[0,4],[5,7]]
loHi2 = [[2,4],[5,9]]
out = [(0, 1), (2, 4), (5, 7), (8, 9)]
loHi1 = [[0,3],[5,7]]
loHi2 = [[2,4],[5,9]]
out = [(0, 1), (2, 3), (4, 4), (5, 7), (8, 9)]
"""
## Validate input
for l in (loHi1,loHi2):
## Assert each region in lists is sorted
for pair in l:
assert(len(pair) == 2)
assert(pair[0] <= pair[1])
## Assert lists are compact
for pair1,pair2 in zip(l[::2],l[1::2]):
assert(pair1[1]+1 == pair2[0])
if len(loHi1) == 0:
if intersect:
return []
else:
return loHi2
if len(loHi2) == 0:
if intersect:
return []
else:
return loHi1
## Find the ends of the region
lo = min( [loHi1[0][0], loHi2[0][0]] )
hi = max( [loHi1[-1][1], loHi2[-1][1]] )
## Make a list of indices where each region will be split
splitAfter = []
for l,h in loHi2:
if l != lo:
splitAfter.append(l-1)
if h != hi:
splitAfter.append(h)
for l,h in loHi1:
if l != lo:
splitAfter.append(l-1)
if h != hi:
splitAfter.append(h)
splitAfter = sorted(list(set(splitAfter)))
# print("splitAfter:",splitAfter)
split=[]
last = -2
for s in splitAfter:
split.append(s)
last = s
# print("split:",split)
returnList = [(i+1,j) if i != j else (i,j) for i,j in zip([lo-1]+split,split+[hi])]
if intersect:
lo = max( [loHi1[0][0], loHi2[0][0]] )
hi = min( [loHi1[-1][1], loHi2[-1][1]] )
returnList = [r for r in returnList if r[0] >= lo and r[1] <= hi]
# print("loHi1:",loHi1)
# print("loHi2:",loHi2)
# print(returnList,"\n")
return returnList
if __name__ == '__main__':
loHi1 = [[0,4],[5,7]]
loHi2 = [[2,4],[5,9]]
out = [(0, 1), (2, 4), (5, 7), (8, 9)]
print(loHi1)
print(loHi2)
print(combineRegionLists(loHi1,loHi2))
print(combineCompactRegionLists(loHi1,loHi2))
loHi1 = [[0,3],[5,7]]
loHi2 = [[2,4],[5,9]]
out = [(0, 1), (2, 3), (4, 4), (5, 7), (8, 9)]
print(loHi1)
print(loHi2)
print(combineRegionLists(loHi1,loHi2))
print(combineCompactRegionLists(loHi1,loHi2))
combineRegionLists
# for f in glob('json/*'):
# print("Working on {}".format(f))
# out = re.match('json/(.*).json',f).group(1)
# data = read_json_file(f)
# run_simulation_protocol( out, "job-id", data, gpu=0 )