-
Notifications
You must be signed in to change notification settings - Fork 34
Expand file tree
/
Copy pathapi_base_client.py
More file actions
251 lines (215 loc) · 8.74 KB
/
api_base_client.py
File metadata and controls
251 lines (215 loc) · 8.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
import logging
import uuid
from .exceptions import http_exceptions_dict, InvalidResponse, RequestFailed
from functools import wraps
from io import FileIO
from munch import munchify
from os import path
from requests import Session
from requests.auth import HTTPBasicAuth as BasicAuth
from simplejson import loads
logger = logging.getLogger(__name__)
IGNORE_PARAMS = ('uri', 'path')
def verify_file(fn):
@wraps(fn)
def wrapper(self, file_, *args, **kwargs):
if isinstance(file_, basestring):
# Using FileIO here instead of open()
# to be able to override the filename
# which is later used when uploading the file.
#
# Explanation:
#
# 1) requests reads the filename
# from "name" attribute of a file-like object,
# there is no other way to specify a filename;
#
# 2) The attribute may contain the full path to file,
# which does not work well as a filename;
#
# 3) The attribute is readonly when using open(),
# unlike FileIO object.
file_ = FileIO(file_, 'rb')
file_.name = path.basename(file_.name)
if hasattr(file_, 'read'):
# A file-like object must have 'read' method
output = fn(self, file_, *args, **kwargs)
file_.close()
return output
else:
try:
file_.close()
except AttributeError:
pass
raise TypeError('Expected either a string '
'containing a path to file or a '
'file-like object, got {}'.format(type(file_)))
return wrapper
class APITemplateClient(object):
"""base class for API"""
def __init__(self, login_pass=None, headers=None, user_agent=None):
self.headers = headers or {}
self.session = Session()
if login_pass is not None:
self.session.auth = BasicAuth(*login_pass)
if user_agent is None:
self.session.headers['User-Agent'] \
= 'op.client/{}'.format(uuid.uuid4().hex)
else:
self.session.headers['User-Agent'] = user_agent
def request(self, method, path=None, payload=None, json=None,
headers=None, params_dict=None, file_=None):
_headers = self.headers.copy()
_headers.update(headers or {})
if file_:
_headers.pop('Content-Type', None)
response = self.session.request(
method, path, data=payload, json=json, headers=_headers,
params=params_dict, files=file_
)
if response.status_code >= 400:
raise http_exceptions_dict\
.get(response.status_code, RequestFailed)(response)
return response
class APIBaseClient(APITemplateClient):
"""base class for API"""
host_url = 'https://api-sandbox.openprocurement.org'
api_version = '2.0'
headers = {'Content-Type': 'application/json'}
def __init__(self,
key,
resource,
host_url=None,
api_version=None,
params=None,
ds_client=None,
user_agent=None):
super(APIBaseClient, self)\
.__init__(login_pass=(key, ''), headers=self.headers,
user_agent=user_agent)
self.ds_client = ds_client
self.host_url = host_url or self.host_url
self.api_version = api_version or self.api_version
if not isinstance(params, dict):
params = {'mode': '_all_'}
self.params = params or {}
# To perform some operations (e.g. create a tender)
# we first need to obtain a cookie. For that reason,
# here we send a HEAD request to a neutral URL.
response = self.session.request(
'HEAD', '{}/api/{}/spore'.format(self.host_url, self.api_version)
)
response.raise_for_status()
self.prefix_path = '{}/api/{}/{}'\
.format(self.host_url, self.api_version, resource)
@staticmethod
def _get_access_token(obj):
return getattr(getattr(obj, 'access', ''), 'token', '')
def _update_params(self, params):
for key in params:
if key not in IGNORE_PARAMS:
self.params[key] = params[key]
def _create_resource_item(self, url, payload, headers=None, method='POST'):
_headers = self.headers.copy()
_headers.update(headers or {})
response_item = self.request(
method, url, headers=_headers, json=payload
)
if (response_item.status_code == 201 and method == 'POST') \
or (response_item.status_code in (200, 204)
and method in ('PUT', 'DELETE')):
return munchify(loads(response_item.text))
raise InvalidResponse(response_item)
def _get_resource_item(self, url, headers=None):
_headers = self.headers.copy()
_headers.update(headers or {})
response_item = self.request('GET', url, headers=_headers)
if response_item.status_code == 200:
return munchify(loads(response_item.text))
raise InvalidResponse(response_item)
def _patch_resource_item(self, url, payload, headers=None):
_headers = self.headers.copy()
_headers.update(headers or {})
response_item = self.request(
'PATCH', url, headers=_headers, json=payload
)
if response_item.status_code == 200:
return munchify(loads(response_item.text))
raise InvalidResponse(response_item)
def _upload_resource_file(
self, url, file_=None, headers=None, method='POST',
use_ds_client=True, doc_registration=True
):
if use_ds_client and self.ds_client:
if doc_registration:
response = self.ds_client.document_upload_registered(
file_=file_, headers=headers
)
else:
response = self.ds_client.document_upload_not_registered(
file_=file_, headers=headers
)
payload = {'data': response['data']}
response = self._create_resource_item(
url,
headers=headers,
payload=payload,
method=method
)
else:
if use_ds_client:
logger.warning('use_ds_client parameter is True while '
'DS-client is not passed to the client '
'constructor.')
logger.warning(
'File upload/download/delete outside of the Document Service '
'is deprecated'
)
response = self.request(
method, url, headers=headers, file_={'file': file_}
)
if response.status_code in (201, 200):
response = munchify(loads(response.text))
else:
raise InvalidResponse(response)
return response
def _delete_resource_item(self, url, headers=None):
_headers = self.headers.copy()
_headers.update(headers or {})
response_item = self.request('DELETE', url, headers=_headers)
if response_item.status_code == 200:
return munchify(loads(response_item.text))
raise InvalidResponse(response_item)
def _patch_obj_resource_item(self, patched_obj, item_obj, items_name):
return self._patch_resource_item(
'{}/{}/{}/{}'.format(
self.prefix_path, patched_obj.data.id,
items_name, item_obj['data']['id']
),
payload=item_obj,
headers={'X-Access-Token': self._get_access_token(patched_obj)}
)
def patch_document(self, obj, document):
return self._patch_obj_resource_item(obj, document, 'documents')
@verify_file
def upload_document(self, file_, obj, use_ds_client=True,
doc_registration=True):
return self._upload_resource_file(
'{}/{}/documents'.format(
self.prefix_path,
obj.data.id
),
file_=file_,
headers={'X-Access-Token': self._get_access_token(obj)},
use_ds_client=use_ds_client,
doc_registration=doc_registration
)
def get_resource_item(self, id, headers=None):
return self._get_resource_item('{}/{}'.format(self.prefix_path, id),
headers=headers)
def patch_credentials(self, id, access_token):
return self._patch_resource_item(
'{}/{}/credentials'.format(self.prefix_path, id),
payload=None,
headers={'X-Access-Token': access_token}
)