forked from Open-Book-Genome-Project/sequencer
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipeline.py
More file actions
195 lines (153 loc) · 6.16 KB
/
pipeline.py
File metadata and controls
195 lines (153 loc) · 6.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import argparse
import concurrent.futures
import glob
import json
import os
import sys
import traceback
from bgp import MINIMAL_SEQUENCER, ia
# Command-line interface: an optional worker count and the required path to
# the jsonl file listing the identifiers to process.
parser = argparse.ArgumentParser(prog='[pipeline]',
                                 description='Automate Open Book Genome Project sequencer')
parser.add_argument('-p',
                    '--processes',
                    action='store',
                    metavar='process-count',
                    type=int,
                    help='number of pipeline processes to run concurrently (default is 1)')
parser.add_argument('Path',
                    metavar='source-path',
                    type=str,
                    help='path to the jsonl file with ia identifiers')
args = parser.parse_args()
input_path = args.Path
# A missing (None) or zero --processes value falls back to a single process.
process_count = args.processes
if not process_count:
    process_count = 1
if not os.path.isfile(input_path):
    print('The path specified does not exist')
    # Exit with a non-zero status so shells and schedulers see the failure;
    # a bare sys.exit() would report success.
    sys.exit(1)
# Results directory for this input: 'results/' plus the input path up to its
# first '.jsonl', with a trailing slash.
RESULTS_PATH = 'results/' + input_path.split('.jsonl')[0] + '/'
# Identifiers to process; filled from the input file further down.
books = []
def touch(identifier, record, data=None):
    """
    Creates (or appends to) the marker file ``<record>_<identifier>`` inside
    the item's directory under RESULTS_PATH.

    :param identifier: item identifier; names both the sub-directory and the
        file-name suffix
    :param record: marker name used as the file-name prefix
    :param data: optional text appended to the file; nothing is written when
        it is None or empty
    """
    file_name = '{}{}/{}_{}'.format(RESULTS_PATH, identifier, record, identifier)
    # The context manager closes the handle even when the write raises.
    with open(file_name, 'a') as f:
        if data:
            f.write(data)
def db_isbn_extracted(identifier, isbn):
    """
    Records the ISBN extracted for an item as an ``ISBN_<isbn>`` marker file.
    """
    marker = 'ISBN_{}'.format(isbn)
    touch(identifier, marker)
def db_isbn_none(identifier):
    """
    Creates the item's ``UPDATE_NONE`` marker file.
    """
    touch(identifier, record='UPDATE_NONE')
def db_update_failed(identifier):
    """
    Creates the item's ``UPDATE_FAILED`` marker file.
    """
    touch(identifier, record='UPDATE_FAILED')
def db_update_succeed(identifier):
    """
    Removes any ``UPDATE_FAILED`` / ``UPDATE_CONFLICT`` marker files left for
    the item, then creates its ``UPDATE_SUCCEED`` marker file.
    """
    stale_templates = (
        '{}{}/UPDATE_FAILED_{}',
        '{}{}/UPDATE_CONFLICT_{}',
    )
    for template in stale_templates:
        stale_path = template.format(RESULTS_PATH, identifier, identifier)
        if os.path.exists(stale_path):
            os.remove(stale_path)
    touch(identifier, 'UPDATE_SUCCEED')
def db_update_conflict(identifier):
    """
    Creates the item's ``UPDATE_CONFLICT`` marker file.
    """
    touch(identifier, record='UPDATE_CONFLICT')
def db_urls_found(identifier, urls):
    """
    Writes the given urls, one per line, to the item's ``URLS_<count>``
    marker file, where ``<count>`` is the number of urls.
    """
    record = 'URLS_{}'.format(len(urls))
    listing = ''.join('{}\n'.format(url) for url in urls)
    touch(identifier, record, data=listing)
def db_genome_updated(identifier):
    """
    Creates the item's ``GENOME_UPDATED`` marker file.
    """
    touch(identifier, record='GENOME_UPDATED')
def db_sequence_success(identifier):
    """
    Deletes the item's ``SEQUENCE_FAILURE`` marker file when one exists.
    """
    failure_marker = '{}{}/SEQUENCE_FAILURE_{}'.format(RESULTS_PATH, identifier, identifier)
    if not os.path.exists(failure_marker):
        return
    os.remove(failure_marker)
def db_sequence_failure(identifier, exception):
    """
    Appends the string form of ``exception`` to the item's
    ``SEQUENCE_FAILURE`` marker file.
    """
    failure_text = str(exception)
    touch(identifier, 'SEQUENCE_FAILURE', data=failure_text)
def get_canonical_isbn(genome):
    """
    Picks a single ISBN for a genome from its copyright-page and back-page
    candidates.

    Preference order:
      1. the first copyright-page ISBN that also appears on the back page,
      2. the first copyright-page ISBN,
      3. the last back-page ISBN.

    Returns None when neither source has any ISBN.
    """
    # Only the first copyright-page entry is consulted.
    copyright_pages = genome['copyright_page']
    from_copyright = copyright_pages[0]['isbns'] if copyright_pages else None

    back_page = genome['backpage_isbn']
    from_backpage = back_page if back_page else None

    have_both = bool(from_copyright) and bool(from_backpage)
    if have_both and any(isbn in from_copyright for isbn in from_backpage):
        # Both sources agree on at least one ISBN; keep copyright-page order.
        agreed = [isbn for isbn in from_copyright if isbn in from_backpage]
        return agreed[0]
    if from_copyright:
        return from_copyright[0]
    if from_backpage:
        return from_backpage[-1]
    return None
def update_isbn(genome):
    """
    Updates the Archive.org metadata for an item to add a missing ISBN when possible.
    Considers any ISBN-like identifiers extracted from the book's sequenced genome
    in order to identify a canonical ISBN for this book.

    The outcome is recorded as marker files for the item:
      - UPDATE_NONE: the genome yielded no ISBN.
      - ISBN_<isbn>: an ISBN was extracted (written before any update attempt).
      - UPDATE_CONFLICT: the item already has an ISBN, so nothing is modified.
      - UPDATE_SUCCEED: the metadata update returned status 200.
      - UPDATE_FAILED: the update raised or returned a non-200 status.

    Re-raises any exception raised while updating the metadata.
    """
    itemid = genome.get('metadata').get('identifier')
    # Checks if ia item already has isbn
    item = ia.get_item(itemid)
    metadata = item.item_metadata['metadata']
    if 'isbn' in metadata:
        item_isbn = metadata['isbn'][0]
    else:
        item_isbn = False
    genome_isbn = get_canonical_isbn(genome)
    if not genome_isbn:
        db_isbn_none(itemid)
        return
    db_isbn_extracted(itemid, genome_isbn)
    if item_isbn:
        db_update_conflict(itemid)
        return
    try:
        update = item.modify_metadata(dict(isbn=genome_isbn))
        if update.status_code == 200:
            db_update_succeed(itemid)
        else:
            # A non-200 response must leave a failure marker: the ISBN_*
            # marker already exists, so without UPDATE_FAILED a later run
            # would never retry this update.
            db_update_failed(itemid)
    except Exception:
        db_update_failed(itemid)
        # Bare raise keeps the original traceback.
        raise
def extract_urls(genome):
    """
    Records the distinct urls from the genome that do not contain
    'archive.org' in the item's URLS marker file.
    """
    itemid = genome.get('metadata').get('identifier')
    external_urls = {url for url in genome['urls'] if 'archive.org' not in url}
    db_urls_found(itemid, external_urls)
# Load the identifiers to process: one JSON object per line of the input file.
with open(input_path) as fin:
    for line in fin:
        entry = line.strip()
        # Skip blank lines (e.g. a trailing empty line) rather than failing
        # to parse them as JSON.
        if not entry:
            continue
        books.append(json.loads(entry)['identifier'])
# exist_ok tolerates the directory being created by another process between
# the existence check and the makedirs call.
if RESULTS_PATH and not os.path.exists(RESULTS_PATH):
    os.makedirs(RESULTS_PATH, exist_ok=True)
def run_pipeline(book):
    """
    Runs the outstanding pipeline stages for one item.

    Stages are skipped when their output from an earlier run is found in the
    item's results directory:
      - sequencing, when book_genome.json already exists (the genome is then
        loaded from that file);
      - the ISBN update, when an ISBN_* marker exists and no UPDATE_FAILED
        marker is present;
      - url extraction, when a URLS_* marker exists;
      - the genome upload, when the GENOME_UPDATED marker exists.

    Any exception is caught and its formatted traceback is stored in the
    item's SEQUENCE_FAILURE marker; on success that marker is removed.

    :param book: identifier of the item to process
    """
    try:
        genome = None
        book_dir = '{}{}/'.format(RESULTS_PATH, book)
        genome_path = '{}{}/book_genome.json'.format(RESULTS_PATH, book)
        if not os.path.exists(book_dir):
            os.makedirs(book_dir, exist_ok=True)
        if not os.path.exists(genome_path):
            genome = MINIMAL_SEQUENCER.sequence(book)
            genome.save(path=RESULTS_PATH)
            genome.upload()
            db_genome_updated(book)
            genome = genome.results
        if not genome:
            # Get genome from file if not in memory; the context manager
            # makes sure the handle is closed.
            with open(genome_path) as f:
                genome = json.load(f)
        update_failed = os.path.exists('{}{}/UPDATE_FAILED_{}'.format(RESULTS_PATH, book, book))
        isbn_attempted = glob.glob('{}{}/ISBN_*'.format(RESULTS_PATH, book))
        # Retry a previously failed update, or run it for the first time.
        if update_failed or not isbn_attempted:
            update_isbn(genome)
        if not glob.glob('{}{}/URLS_*'.format(RESULTS_PATH, book)):
            extract_urls(genome)
        if not os.path.exists('{}{}/GENOME_UPDATED_{}'.format(RESULTS_PATH, book, book)):
            MINIMAL_SEQUENCER._upload(results=genome)
            db_genome_updated(book)
        db_sequence_success(book)
    except Exception:
        # Record the failure instead of propagating it.
        e = traceback.format_exc()
        db_sequence_failure(book, e)
# Only start the pool when run as a script: with the spawn/forkserver start
# methods each worker re-imports this module, and an unguarded pool creation
# there would try to start further pools.
if __name__ == '__main__':
    # The with-block shuts the pool down once every submitted book is done.
    with concurrent.futures.ProcessPoolExecutor(process_count) as executor:
        futures = [executor.submit(run_pipeline, book) for book in books]
        concurrent.futures.wait(futures)