This repository was archived by the owner on Sep 24, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathcommon.py
More file actions
executable file
·310 lines (276 loc) · 11 KB
/
common.py
File metadata and controls
executable file
·310 lines (276 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# coding: utf-8
'''
common.py
Provides common functions used throughout the program.
Currently these functions relate to the downloading
and reading in of a file and is used in both gp_baseline
and change_log.
'''
from ftplib import FTP
from lxml import etree
import gzip
import os
import re
import shutil
import time
import urllib.parse
import urllib.request
def download(url, fileName=None):
    """Download *url* to a local file and write a ``<fileName>.info`` sidecar.

    Supports ``ftp://`` URLs (via ftplib) and any scheme urllib can open
    (http/https/file).  The ``.info`` file records the source URL, the local
    filename, the server's last-modified time when available, and the
    download timestamp.

    :param url: source URL to fetch
    :param fileName: local target name; derived from the URL path (FTP) or
        the response headers / final URL (HTTP) when omitted
    """
    def getFileName(url, openUrl):
        # Prefer the server-supplied name from Content-Disposition, if any.
        if 'Content-Disposition' in openUrl.info():
            cd = dict(map(
                # split('=', 1) so values containing '=' don't break dict()
                lambda x: x.strip().split('=', 1) if '=' in x else (x.strip(), ''),
                openUrl.info()['Content-Disposition'].split(';')))
            if 'filename' in cd:
                filename = cd['filename'].strip("\"'")
                if filename:
                    return filename
        # Otherwise take the last path component of the final (post-redirect) URL.
        return os.path.basename(urllib.parse.urlsplit(openUrl.url)[2])

    if url.startswith("ftp://"):
        urltokens = urllib.parse.urlsplit(url)
        ftp = FTP(urltokens.netloc)
        ftp.login()
        # MDTM reply looks like "213 YYYYMMDDHHMMSS"
        moddt = ftp.sendcmd("MDTM " + urltokens.path)
        if fileName is None:
            fileName = os.path.basename(urltokens.path)
        try:
            with open(fileName, "wb") as ftpf, open(fileName + ".info", 'w') as info:
                ftp.retrbinary("RETR " + urltokens.path, ftpf.write)
                info.write("URL: " + url + "\n")
                info.write("Filename: " + fileName + "\n")
                info.write("Last modified: " + moddt.split(" ")[1] + "\n")
                info.write(
                    "Downloaded at: " +
                    time.strftime("%Y-%m-%d %H:%M:%S") +
                    "\n")
        finally:
            # Original leaked the FTP connection; close it even on error.
            ftp.quit()
    else:
        with urllib.request.urlopen(url) as r:
            # BUG FIX: resolve fileName from the response *before* opening the
            # target files -- the original called open(None, 'wb') whenever the
            # caller omitted fileName on a non-FTP URL.
            fileName = fileName or getFileName(url, r)
            with open(fileName, 'wb') as f, open(fileName + ".info", 'w') as info:
                shutil.copyfileobj(r, f)
                info.write("URL: " + url + "\n")
                info.write("Filename: " + fileName + "\n")
                if 'Last-Modified' in r.info():
                    info.write(
                        "Last modified: " +
                        r.info()['Last-Modified'].strip("\"'") +
                        "\n")
                info.write(
                    "Downloaded at: " +
                    time.strftime("%Y-%m-%d %H:%M:%S") +
                    "\n")
def gzip_to_text(gzip_file, encoding="iso-8859-1"):
    """Lazily yield the lines of a gzip-compressed text file.

    :param gzip_file: path to the gzipped file
    :param encoding: codec used to decode each raw line (default ISO-8859-1)
    :return: generator of decoded text lines (line endings preserved)
    """
    with gzip.open(gzip_file) as compressed:
        for raw_line in compressed:
            yield raw_line.decode(encoding)
def get_latest_GO_filename(go_file):
    """Get the name of the current GO termdb.obo-xml.gz file.

    Fetches the index page of the GO "latest-full" directory and returns the
    URL of the dated ``go_<N>-termdb.obo-xml.gz`` file listed there.  On any
    failure (unreachable URL, no matching filename) a warning is printed and
    the original *go_file* argument is returned unchanged.

    :param go_file: directory URL, or a direct ``.gz`` URL whose parent
        directory will be indexed instead
    :return: resolved file URL, or *go_file* on failure
    """
    url = go_file
    # If pointed directly at a .gz file, index its parent directory instead.
    if url.endswith('.gz'):
        url = url[:url.rfind('/')]
    # read the index info of latest-full
    try:
        src = urllib.request.urlopen(url).read().decode("utf-8")
    except Exception:
        print(
            'WARNING! [function get_latest_GO_filename] Unable to fetch URL: %s\n' %
            (url))
        return go_file
    # file matching pattern for resource filename (dots escaped; the original
    # non-raw pattern relied on deprecated invalid escape sequences)
    p_fn = re.compile(r'go_\d+-termdb\.obo-xml\.gz', re.M | re.S)
    matches = p_fn.findall(src)
    if matches:
        go_file = '/'.join([url, matches[0]])
    else:
        # unable to locate resource filename
        print(
            'WARNING! [function get_latest_GO_filename] Unable to identify data file in %s\n' %
            (url))
    return go_file
def get_latest_MeSH_filename(url, prefix, suffix):
    """Get the URL of the current MeSH file, given the directory url and file prefix.

    For example, the ASCII MeSH Descriptors file will start with prefix 'd'
    and be found in ftp://nlmpubs.nlm.nih.gov/online/mesh/.asciimesh/.

    :param url: directory listing URL
    :param prefix: required filename prefix (e.g. ``'d'``)
    :param suffix: required filename suffix (e.g. ``'.bin'``)
    :return: URL of the lexicographically latest matching filename
    :raises urllib.error.URLError: if the directory cannot be fetched
    """
    try:
        directory = urllib.request.urlopen(url)
    except Exception:
        # BUG FIX: the original printed this warning and fell through, which
        # then raised a NameError on the undefined 'directory' and masked the
        # real cause.  Re-raise the fetch error instead.
        print('WARNING! unable to fetch URL: {0}'.format(url))
        raise
    # Listing lines are cp1252-encoded bytes; the filename is the last column.
    filenames = [line.decode('cp1252').split()[-1] for line in directory]
    matching = sorted(
        name for name in filenames
        if name.startswith(prefix) and name.endswith(suffix))
    # Dated names sort lexicographically, so the last entry is the newest.
    return '/'.join([url, matching[-1]])
# Patterns for the fields download() writes into the ``.info`` sidecar files.
# All patterns are raw strings: the originals were plain strings whose \d
# sequences are deprecated invalid escapes on modern Python.
# NOTE: ``[\n|$]`` is a character class (newline, '|' or a literal '$'),
# preserved as-is for compatibility; in practice it matches the trailing
# newline after each field.
p1 = re.compile(r'Last modified: ?(.*?)[\n|$]', re.M | re.S)
p2 = re.compile(r'Downloaded at: ?(.*?)[\n|$]', re.M | re.S)
p3 = re.compile(r'Filename: ?(.*?)[\n|$]', re.M | re.S)
p4 = re.compile(r'URL: ?(.*?)[\n|$]', re.M | re.S)
# Version/date patterns for the individual dataset files.
p_chebi_1 = re.compile(r'ChEBI Release version \d+')
# XX:XX:XXXX XX:XX (dd:mm:yyyy hh:mm)
p_chebi_2 = re.compile(r'\d{2}:\d{2}:\d{4} \d{2}:\d{2}')
# '<' , '>' and '/' need no escaping in regexes; the needless backslashes
# from the originals are dropped (behavior-identical).
p_go_1 = re.compile(
    r'<data-version>.*?(\d\d\d\d-\d\d-\d\d)</data-version>')
p_go_2 = re.compile(r'<date>(\d\d:\d\d:\d\d\d\d).*?</date>')
p_rgd_1 = re.compile(r'# GENERATED-ON: (\d\d\d\d/\d\d/\d\d)')
def get_citation_info(name, header, data_file):
    """
    Add Namespace, Citation and Author values
    + affy: (datasets/affy.xml)
    + chebi: (datasets/chebi.owl)
    <owl:versionIRI rdf:datatype="http://www.w3.org/2001/XMLSchema#string">109</owl:versionIRI>
    <dc:date rdf:datatype="http://www.w3.org/2001/XMLSchema#string">2013-11-01 17:17</dc:date>
    + do: (datasets/doid.owl)
    <owl:versionIRI rdf:resource="http://purl.obolibrary.org/obo/doid/releases/2014-04-12/doid.owl"/>
    + go: (datasets/gobp.xml.gz)
    <data-version>2013-09-28</data-version>
    <date>27:09:2013 10:57</date>
    mesh: ?
    + rgd: (datasets/rgd.txt)
    # GENERATED-ON: 2013/11/01

    Fills the [#VALUE#] placeholders in *header* (CreatedDateTime,
    VersionString, CopyrightString, PublishedVersionString, PublishedDate)
    using the dataset's ``.info`` sidecar and/or the dataset file itself.
    Returns the completed header string.
    """
    # NOTE(review): local import -- presumably to avoid a circular import
    # between this module and configuration; confirm before moving to top.
    from configuration import data_file_info
    # Stamp generation-time fields into the header template.
    header = header.replace(
        '\nCreatedDateTime=[#VALUE#]',
        '\nCreatedDateTime=' +
        time.strftime("%Y-%m-%dT%X"))
    header = header.replace('\nVersionString=[#VALUE#]',
                            '\nVersionString=' + time.strftime("%Y%m%d"))
    header = header.replace(
        '\nCopyrightString=Copyright (c) [#VALUE#]',
        '\nCopyrightString=Copyright (c) ' +
        time.strftime("%Y"))
    new_data_file = data_file_info.get(name)
    if new_data_file:
        # use alternative reference if exists
        data_file = new_data_file
    info_file = data_file + '.info'
    try:
        info_text = open('./datasets/' + info_file).read()
    except:  # NOTE(review): bare except -- also hides non-IO errors
        info_text = None
        print('WARNING - could not open {0}'.format(info_file))
    # placeholders
    pubver = 'NA'
    pubdate = time.strftime("%Y-%m-%d")
    # parse version and date from info file
    try:
        # Last modified from .info file
        pubdate = p1.search(info_text).group(1)
    except:
        try:
            # Downloaded at from .info file
            pubdate = p2.search(info_text).group(1)
        except:
            # default above - today's date
            pass
    if pubdate:
        tv = None
        # if pubdate a string of digits, reformat it
        if re.match('^\d+$', pubdate):
            # FTP MDTM-style YYYYMMDDHHMMSS -> RFC-1123-style text, so the
            # same normalization branch below handles it.
            tt = time.strptime(pubdate, '%Y%m%d%H%M%S')
            pubdate = time.strftime("%a, %d %b %Y %H:%M:%S", tt)
        # HTTP Last-Modified values end in " GMT"; strip it so the
        # RFC-1123 pattern below can match.
        pubdate = pubdate.replace(' GMT', '')
        if re.match(
                '^[A-Za-z]{3}, \d\d [A-Za-z]{3} \d{4} \d\d:\d\d:\d\d$',
                pubdate):
            tv = time.strptime(pubdate, "%a, %d %b %Y %H:%M:%S")
        elif re.match('^\d{4}-\d\d-\d\d \d\d:\d\d:\d\d', pubdate):
            tv = time.strptime(pubdate, "%Y-%m-%d %H:%M:%S")
        if tv:
            # Normalized forms: pubver keeps the full timestamp,
            # pubdate just the date.
            pubver = time.strftime("%a, %d %b %Y %H:%M:%S", tv)
            pubdate = time.strftime("%Y-%m-%d", tv)
    else:
        # don't seem to hit this case
        pass
    # parse version and date from dataset file
    if data_file.find('chebi') >= 0:
        f = open('./datasets/' + data_file, 'r')
        # break loop on 0 value
        sentinel = 2
        while True:
            line = f.readline().strip()
            if not line:
                break
            if line.find('ChEBI Release version') > 0:
                pubver = p_chebi_1.search(line).group(0)
                sentinel -= 1
            elif line.find('oboInOwl:date') > 0:
                pubdate = p_chebi_2.search(line).group(0)
                sentinel -= 1
            if sentinel == 0:
                # both version and date found -- stop scanning
                break
        f.close()
    elif data_file.find('doid') >= 0:
        f = open('./datasets/' + data_file, 'r')
        while True:
            line = f.readline().strip()
            if not line:
                break
            # if line.find('owl:versionIRI') > 0:
            #     pubdate = p_do_1.search(line).group(1)
            #     d,m,y = pubdate.split(':')
            #     pubdate = '-'.join([y,m,d])
            # DO: no in-file version parsed; reuse the .info-derived date.
            pubver = pubdate
            break
        f.close()
    elif data_file.find('go') >= 0 and data_file:
        # GO dataset is gzip-compressed XML; scan the header lines only.
        f = gzip.open('./datasets/' + data_file, 'r')
        while True:
            line = f.readline().strip()
            if not line:
                break
            line = str(line, 'utf-8')
            if line.find('<data-version>') >= 0:
                pubver = p_go_1.search(line).group(1)
            elif line.find('<date>') >= 0:
                # <date> is dd:mm:yyyy -- reorder into yyyy-mm-dd
                pubdate = p_go_2.search(line).group(1)
                d, m, y = pubdate.split(':')
                pubdate = '-'.join([y, m, d])
                break
        f.close()
    elif data_file.find('rgd') >= 0:
        f = open('./datasets/' + data_file, 'r')
        while True:
            line = f.readline().strip()
            if not line:
                break
            if line.find('# GENERATED-ON') >= 0:
                pubdate = p_rgd_1.search(line).group(1)
                pubdate = pubdate.replace('/', '-')
                pubver = pubdate
                break
        f.close()
    elif data_file.find('affy') >= 0:
        f = etree.iterparse('./datasets/' + data_file)
        for action, elem in f:
            # mapping version and date to HG-U133_Plus_2 Array
            if elem.tag == 'Array' and elem.get('name') == 'HG-U133_Plus_2':
                for n in elem.findall('Annotation'):
                    if n.get('type') == 'Annot CSV':
                        # date format e.g., "Oct 30, 2012"
                        annofile = n.find('File')
                        date = annofile.get('date')
                        tv = time.strptime(date, "%b %d, %Y")
                        pubdate = time.strftime("%Y-%m-%d", tv)
                        pubver = annofile.get('name').split('.')[1]
                # target array processed -- stop iterating the document
                break
    elif data_file.find('mesh') >= 0:
        # get MeSH version from info file download URL
        if info_text:
            # e.g. "URL: .../d2019.bin" -> "2019"
            pubver = p4.search(info_text).group(1)
            pubver = pubver.split('/')[-1]
            pubver = pubver.lstrip('d').rstrip('.bin')
            # pubdate = p1.search(info_text).group(1)
            # if re.match('^\d+$', pubdate):
            #     tt = time.strptime(pubdate, '%Y%m%d%H%M%S')
            #     pubdate = time.strftime("%Y-%m-%d", tt)
    header = header.replace('\nPublishedVersionString=[#VALUE#]',
                            '\nPublishedVersionString=' + pubver)
    header = header.replace('\nPublishedDate=[#VALUE#]',
                            '\nPublishedDate=' + pubdate)
    print('...%s : %s -- %s' % (name, pubdate, pubver))
    return header