Skip to content
This repository was archived by the owner on Jan 18, 2023. It is now read-only.

Commit 929761d

Browse files
committed
Add a limit to the number of concurrent jobs
1 parent 7787579 commit 929761d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+186
-1137
lines changed

conftest.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
# External Imports
55
import pytest
66

7+
# Internal Imports
8+
from firststreet import Http
9+
710
pytest_plugins = 'aiohttp.pytest_plugin'
811

912

@@ -22,3 +25,8 @@ def pytest_collection_modifyitems(config, items):
2225
for item in items:
2326
if "stress" in item.keywords:
2427
item.add_marker(skip_stress)
28+
29+
30+
@pytest.fixture(scope='session', autouse=True)
31+
def setup_connection(request):
32+
return Http("", 100, 4950, 60)

firststreet/http_util.py

Lines changed: 106 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66

77
# External Imports
88
import logging
9+
from json.decoder import JSONDecodeError
10+
911
import tqdm
1012
import aiohttp
1113
import ssl
1214
import certifi
1315
from asyncio_throttle import Throttler
16+
from itertools import islice
1417

1518
# Internal Imports
1619
import firststreet.errors as e
@@ -28,7 +31,14 @@ class Http:
2831
rate_period (int): The period of time for the limit
2932
version (str): The version to call the API with
3033
Methods:
34+
endpoint_execute: Sets up the throttler and session for the asynchronous call
3135
execute: Sends a request to the First Street Foundation API for the specified endpoint
36+
tile_response: Handles the response for a tile
37+
product_response: Handles the response for all other products
38+
_parse_rate_limit: Parses the rate limiter returned by the header
39+
_network_error: Handles any network errors returned by the response
40+
limited_as_completed: Limits the number of concurrent coroutines. Prevents Timeout errors due to too
41+
many coroutines
3242
"""
3343

3444
def __init__(self, api_key, connection_limit, rate_limit, rate_period, version=None):
@@ -65,10 +75,9 @@ async def endpoint_execute(self, endpoints):
6575

6676
# Asnycio create tasks for each endpoint
6777
try:
68-
tasks = [asyncio.create_task(self.execute(endpoint, session, throttler)) for endpoint in endpoints]
69-
for t in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks)):
70-
await t
71-
ret = [t.result() for t in tasks]
78+
tasks = (asyncio.create_task(self.execute(endpoint, session, throttler)) for endpoint in endpoints)
79+
ret = [await t
80+
for t in tqdm.tqdm(self.limited_as_completed(tasks, self.connection_limit), total=len(endpoints))]
7281

7382
finally:
7483
await session.close()
@@ -91,65 +100,81 @@ async def execute(self, endpoint, session, throttler):
91100
# Retry loop
92101
retry = 0
93102
while retry < 5:
94-
try:
95103

96-
# Throttle
97-
async with throttler:
104+
# Throttle
105+
async with throttler:
106+
107+
try:
98108
async with session.get(endpoint[0], headers=headers) as response:
99109

100-
# Get rate limit from header
101-
rate_limit = self._parse_rate_limit(response.headers)
110+
print(response)
102111

103112
# Read a tile response
104113
if endpoint[2] == 'tile':
105-
body = await response.read()
106-
107-
if response.status != 200 and response.status != 500:
108-
raise self._network_error(self.options, rate_limit,
109-
status=response.reason, message=response.status)
110-
elif response.status == 500:
111-
logging.info(
112-
"Error retrieving tile from server. Check if the coordinates provided "
113-
"are correct: {}".format(endpoint[1]))
114-
return {"coordinate": endpoint[1], "image": None, 'valid_id': False}
115-
116-
return {"coordinate": endpoint[1], "image": body}
114+
return await self.tile_response(response, endpoint)
117115

118116
# Read a json response
119117
else:
120-
body = await response.json(content_type=None)
118+
return await self.product_response(response, endpoint)
121119

122-
if response.status != 200 and response.status != 404 and response.status != 500:
123-
raise self._network_error(self.options, rate_limit, error=body.get('error'))
120+
except (asyncio.TimeoutError, JSONDecodeError) as ex:
121+
logging.info("{} error for item: {} at {}. Retry {}".format(ex.__class__, endpoint[1],
122+
endpoint[0], retry))
123+
retry += 1
124+
await asyncio.sleep(1)
124125

125-
error = body.get("error")
126-
if error:
127-
search_item = endpoint[1]
128-
product = endpoint[2]
129-
product_subtype = endpoint[3]
126+
except aiohttp.ClientError as ex:
127+
logging.error("{} error getting item: {} from {}".format(ex.__class__, endpoint[1], endpoint[0]))
128+
return {'search_item': endpoint[1]}
130129

131-
if product == 'adaptation' and product_subtype == 'detail':
132-
return {'adaptationId': search_item, 'valid_id': False}
130+
logging.error("Timeout error after 5 retries for search_item: {} from {}".format(endpoint[1], endpoint[0]))
131+
return {'search_item': endpoint[1]}
133132

134-
elif product == 'historic' and product_subtype == 'event':
135-
return {'eventId': search_item, 'valid_id': False}
133+
async def tile_response(self, response, endpoint):
136134

137-
else:
138-
return {'fsid': search_item, 'valid_id': False}
135+
# Get rate limit from header
136+
rate_limit = self._parse_rate_limit(response.headers)
139137

140-
return body
138+
if response.status != 200 and response.status != 500:
139+
raise self._network_error(self.options, rate_limit,
140+
status=response.reason, message=response.status)
141141

142-
except asyncio.TimeoutError:
143-
logging.info("Timeout error for item: {} at {}. Retry {}".format(endpoint[1], endpoint[0], retry))
144-
retry += 1
145-
await asyncio.sleep(1)
142+
elif response.status == 500:
143+
logging.info(
144+
"Error retrieving tile from server. Check if the coordinates provided "
145+
"are correct: {}".format(endpoint[1]))
146+
return {"coordinate": endpoint[1], "image": None, 'valid_id': False}
146147

147-
except aiohttp.ClientError as ex:
148-
logging.error("{} error while getting item: {} from {}".format(ex.__class__, endpoint[1], endpoint[0]))
149-
return {'search_item': endpoint[1]}
148+
body = await response.read()
150149

151-
logging.error("Timeout error after 5 retries for search_item: {} from {}".format(endpoint[1], endpoint[0]))
152-
return {'search_item': endpoint[1]}
150+
return {"coordinate": endpoint[1], "image": body}
151+
152+
async def product_response(self, response, endpoint):
153+
154+
# Get rate limit from header
155+
rate_limit = self._parse_rate_limit(response.headers)
156+
157+
body = await response.json(content_type=None)
158+
159+
if response.status != 200 and response.status != 404 and response.status != 500:
160+
raise self._network_error(self.options, rate_limit, error=body.get('error'))
161+
162+
error = body.get("error")
163+
if error:
164+
search_item = endpoint[1]
165+
product = endpoint[2]
166+
product_subtype = endpoint[3]
167+
168+
if product == 'adaptation' and product_subtype == 'detail':
169+
return {'adaptationId': search_item, 'valid_id': False}
170+
171+
elif product == 'historic' and product_subtype == 'event':
172+
return {'eventId': search_item, 'valid_id': False}
173+
174+
else:
175+
return {'fsid': search_item, 'valid_id': False}
176+
177+
return body
153178

154179
@staticmethod
155180
def _parse_rate_limit(headers):
@@ -197,3 +222,39 @@ def _network_error(options, rate_limit, error=None, status=None, message=None):
197222
503: e.OfflineError(message=formatted, attachments={"options": options, "rate_limit": rate_limit}),
198223
}.get(status,
199224
e.UnknownError(message=formatted, attachments={"options": options, "rate_limit": rate_limit}))
225+
226+
@staticmethod
227+
def limited_as_completed(coros, limit):
228+
"""
229+
Run the coroutines (or futures) supplied in the
230+
iterable coros, ensuring that there are at most
231+
limit coroutines running at any time.
232+
Return an iterator whose values, when waited for,
233+
are Future instances containing the results of
234+
the coroutines.
235+
Results may be provided in any order, as they
236+
become available.
237+
238+
https://github.com/andybalaam/asyncioplus/blob/master/asyncioplus/limited_as_completed.py
239+
"""
240+
futures = [
241+
asyncio.ensure_future(c)
242+
for c in islice(coros, 0, limit)
243+
]
244+
245+
async def first_to_finish():
246+
while True:
247+
await asyncio.sleep(0)
248+
for f in futures:
249+
if f.done():
250+
futures.remove(f)
251+
try:
252+
newf = next(coros)
253+
futures.append(
254+
asyncio.ensure_future(newf))
255+
except StopIteration as e:
256+
pass
257+
return f.result()
258+
259+
while len(futures) > 0:
260+
yield first_to_finish()

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
setup(
1313
name='fsf-api-access_python',
14-
version='2.1.0',
14+
version='2.1.1',
1515
description='A Python API Access Client for the First Street Foundation API',
1616
url='https://github.com/FirstStreet/fsf_api_access_python',
1717
project_urls={

tests/data_text/adaptation_detail.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

tests/data_text/adaptation_summary_cd.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

tests/data_text/adaptation_summary_city.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

tests/data_text/adaptation_summary_county.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

tests/data_text/adaptation_summary_neighborhood.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

tests/data_text/adaptation_summary_property.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

tests/data_text/adaptation_summary_state.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)