-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathShapeMapper_MMS0_Mut_Filter.py
More file actions
157 lines (120 loc) · 5.47 KB
/
ShapeMapper_MMS0_Mut_Filter.py
File metadata and controls
157 lines (120 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
######################################################################
# ShapeMapper MMS 0 Parsed Mutation File Filter
# Version 1.0
# (c) 2024 David Mitchell III
######################################################################
import itertools
import os
import argparse
######################################################################
## ADVANCED FILTER 10 ##
def scanLines(inl7, inl8):
line_len = len(inl8)
winsize = 10
step = 1
## Special case for first 50 nt in a read ##
testlist7 = list(map(int, inl7)) # Read string
testlist8 = list(map(int, inl8)) # Mutation string
testin8 = testlist8[0:50]
testn = testin8.count(1)
if testn / winsize >= 0.2:
testlist7[0:50] = [0]*50
testlist8[0:50] = [0]*50
## Case for the remainder of the read ##
for w in range(50, line_len, step):
Bstate = 0
testin7 = testlist7[w:w+winsize] # Read string
testin8 = testlist8[w:w+winsize] # Mutation string
# Determine density of mutations in window #
testn = testin8.count(1)
testdens = testn / winsize
if testdens >= 0.3 and testdens < 0.4:
Bstate = 1
elif testdens >= 0.4:
Bstate = 2
# Determine number of consecutive mutations in window #
testnum = 0
if Bstate >= 1:
testnum = max(len(list(v)) for g,v in itertools.groupby(testin8, lambda x: x == 1) if g)
# Identifiy windows with excessive mutations #
# Excessive mutations defined here as >= 3 consecutive mutations (a '1' in the window vs '0' for unmodified)
# Identifiy position of right-most mutation within the consecutive string and keep as '1'
# Change all mutations to the left of this mutation to '0' up to the end of the consecutive string
if Bstate == 1 and testnum >= 3:
a = 0
b = a + testnum
while a <= winsize-testnum:
if 0 not in testin8[a:b]:
switchlen = testnum-1
# Change "1" to "0" in mut string (testlist8)
switchin8 = [0]*switchlen
testin8[a:b-1] = switchin8
# Change "1" to "0" in read string (testlist7)
switchin7 = [0]*switchlen
testin7[a:b-1] = switchin7
mvr = testnum
else:
mvr = 1
a+=mvr
b+=mvr
elif Bstate == 2:
testin7 = [0]*winsize
testin8 = [0]*winsize
testlist7[w:winsize+w] = testin7
testlist8[w:winsize+w] = testin8
# Create output #
outl7 = ''.join(map(str, testlist7))
outl8 = ''.join(map(str, testlist8))
return outl7, outl8
######################################################################
def filterFiles(datafile, inname, outDir):
try:
os.mkdir(outDir)
print('Creating directory for filtered .mut files at ' + outDir + '.')
except:
print('Directory ' + outDir + ' already exists. Adding filtered .mut files here.')
outFileName = outDir + '/MMS0Filter_' + inname
outfile = open(outFileName, 'w')
print('Input file name:', inname, '\nOutput file name:', outFileName)
with open(datafile) as inp:
line = inp.readlines()
numlines = len(line)
for b in range(numlines):
spl = line[b].split()
num = len(spl)
if num <= 9 and spl[4] != 'OFF_TARGET':
outfile.write(line[b])
if num > 9 and spl[4] != 'OFF_TARGET':
in_line7 = spl[7]
in_line8 = spl[8]
out_line7, out_line8 = scanLines(in_line7, in_line8)
for a in range(7):
outfile.write(spl[a] + '\t')
outfile.write(out_line7 + '\t' + out_line8 + '\t')
for c in range(9,num,1):
outfile.write(spl[c] + ' ')
outfile.write('\n')
if b % 50000 == 0:
print('Working on line...', b+1)
outfile.close()
print('\n/// Completed MMS 0 Parsed Mutation File Filtering ///')
######################################################################
if __name__=='__main__':
parser=argparse.ArgumentParser(description = "Program to filter ShapeMapper MMS 0 parsed mutation files prior to running PairMapper")
parser.add_argument('datafile', help="Parsed mutation file (*.mut) generated from ShapeMapper with data generated using minimum mutation separation 0 (--min-mutation-separation 0)")
parser.add_argument('--output_name', type=str, help="Output name for new filtered parsed mutation files created by this program, prepended by 'MMS0Filter_'. If unspecified, it uses the original input.mut name")
parser.add_argument('--output_dir', type=str, default='MMS0_Filter_Output', help="Output directory for new filtered parsed mutation files. If unspecified, the default directory is 'MMS0_Filter_Output'")
args=parser.parse_args()
n = args.datafile
i = n.split('/')
inm = i[-1]
# Check if the output name is specified #
if args.output_name:
inm = args.output_name
# Check if the specified output name has a .mut extension #
tmp = inm.split('.')
ext = tmp[-1]
if ext != 'mut':
inm = inm + '.mut'
## Perform the Filtration ##
filterFiles(args.datafile, inm, args.output_dir)