-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathseq_db_diff.py
More file actions
executable file
·116 lines (94 loc) · 5.74 KB
/
seq_db_diff.py
File metadata and controls
executable file
·116 lines (94 loc) · 5.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python3
from ArgditLib.Utils import create_supp_file_path, check_seq_type
from Bio import SeqIO
from Bio.Seq import Seq
import argparse
import os
def parse_seq_file(seq_db_path, diff_mode, is_check_rev_comp_seq):
db_seq_records = dict()
db_rev_seq_records = dict()
num_of_seqs = 0
with open(seq_db_path, 'r') as f:
for seq_record in SeqIO.parse(f, 'fasta'):
num_of_seqs += 1
rev_seq_record_key = None
if diff_mode == 'h':
seq_record_key = seq_record.description
elif diff_mode == 's':
seq_record_key = str(seq_record.seq)
if seq_record_key in db_rev_seq_records:
continue
if is_check_rev_comp_seq and check_seq_type(seq_record_key) == 'NT':
rev_seq_record_key = str(seq_record.seq.reverse_complement())
else:
seq_str = str(seq_record.seq)
seq_record_key = '{}###{}'.format(seq_record.description, seq_str)
if seq_record_key in db_rev_seq_records:
continue
if is_check_rev_comp_seq and check_seq_type(seq_str) == 'NT':
rev_seq_record_key = '{}###{}'.format(seq_record.description,
str(seq_record.seq.reverse_complement()))
db_seq_records[seq_record_key] = seq_record
if rev_seq_record_key is not None:
db_rev_seq_records[rev_seq_record_key] = seq_record
return db_seq_records, db_rev_seq_records, num_of_seqs
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('seq_db_path', help = 'nucleotide/protein database FASTA file path')
parser.add_argument('old_seq_db_path', help = 'FASTA file path for previous database release')
parser.add_argument('-m', '--mode', default = 'b', choices = ['h', 's', 'b'],
help = 'diff mode (h:header; s:sequence; b:both [default])')
parser.add_argument('-o', '--old', action = 'store_true', help = 'export also sequences found in old database only')
parser.add_argument('-f', '--forward', action = 'store_true',
help = 'do not check reverse complement nucleotide sequences')
args = parser.parse_args()
new_db_seq_records, new_db_rev_seq_records, num_of_new_db_seqs = parse_seq_file(args.seq_db_path, args.mode,
not args.forward)
old_db_seq_records, old_db_rev_seq_records, num_of_old_db_seqs = parse_seq_file(args.old_seq_db_path, args.mode,
not args.forward)
new_db_seq_keys = set(new_db_seq_records.keys())
old_db_seq_keys = set(old_db_seq_records.keys())
common_seq_keys = new_db_seq_keys & old_db_seq_keys
old_db_rev_seq_keys = set(old_db_rev_seq_records.keys())
common_seq_keys = ((new_db_seq_keys - common_seq_keys) & old_db_rev_seq_keys) | common_seq_keys
new_db_only_seq_records = list()
for new_db_only_seq_key in (new_db_seq_keys - common_seq_keys):
new_db_only_seq_records.append(new_db_seq_records[new_db_only_seq_key])
new_db_only_seq_file_path = create_supp_file_path(args.seq_db_path, '_only.fa')
with open(new_db_only_seq_file_path, 'w') as f:
SeqIO.write(new_db_only_seq_records, f, 'fasta')
common_seq_records = list()
for common_seq_key in common_seq_keys:
common_seq_records.append(new_db_seq_records[common_seq_key])
common_seq_file_path = create_supp_file_path(args.seq_db_path, '_shared.fa')
with open(common_seq_file_path, 'w') as f:
SeqIO.write(common_seq_records, f, 'fasta')
if args.old:
common_seq_keys = new_db_seq_keys & old_db_seq_keys
new_db_rev_seq_keys = set(new_db_rev_seq_records.keys())
common_seq_keys = ((old_db_seq_keys - common_seq_keys) & new_db_rev_seq_keys) | common_seq_keys
old_db_only_seq_records = list()
for old_db_only_seq_key in (old_db_seq_keys - common_seq_keys):
old_db_only_seq_records.append(old_db_seq_records[old_db_only_seq_key])
old_db_only_seq_file_path = create_supp_file_path(args.old_seq_db_path, '_only.fa')
with open(old_db_only_seq_file_path, 'w') as f:
SeqIO.write(old_db_only_seq_records, f, 'fasta')
if args.mode == 'h':
seq_db_summary_stmt = ' {} sequences read {} {} of them are unique in terms of header'
elif args.mode == 's':
seq_db_summary_stmt = ' {} sequences read {} {} of them are unique in terms of sequence'
else:
seq_db_summary_stmt = ' {} sequences read {} {} of them are unique in terms of header and/or sequence'
print('----- Input summary -----')
print('For {}:'.format(args.seq_db_path))
print(seq_db_summary_stmt.format(num_of_new_db_seqs, os.linesep, len(new_db_seq_records)))
print('For {}:'.format(args.old_seq_db_path))
print(seq_db_summary_stmt.format(num_of_old_db_seqs, os.linesep, len(old_db_seq_records)))
print('----- Diff results -----')
print('{} sequences are common and exported to {}'.format(len(common_seq_records), common_seq_file_path))
print('{} sequences are present in {} only and exported to {}'.format(len(new_db_only_seq_records),
args.seq_db_path, new_db_only_seq_file_path))
if args.old:
print('{} sequences are present in {} only and exported to {}'.format(len(old_db_only_seq_records),
args.old_seq_db_path,
old_db_only_seq_file_path))