Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 74 additions & 45 deletions umbra/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,55 +103,66 @@ def reconnect(self, *args, **kwargs):

def _wait_for_and_browse_urls(self, conn, consumer, timeout):
start = time.time()
browser = None

while not self._consumer_stop.is_set() and time.time() - start < timeout and not self._reconnect_requested:
while not (self._consumer_stop.is_set() or time.time() - start >= timeout or self._reconnect_requested):

self.logger.debug("Umbra browser pool have %d in use browsers and %d available",
self._browser_pool.num_in_use(), self._browser_pool.num_available())

browser = None
try:
browser = self._browser_pool.acquire() # raises KeyError if none available
browser = self._browser_pool.acquire() # raises brozzler.browser.NoBrowsersAvailable if none available
except brozzler.browser.NoBrowsersAvailable:
# no browsers available
self.logger.debug("No browser available so sleeping")
time.sleep(0.5)
continue
except:
self.logger.critical("problem with browser initialization", exc_info=True)
time.sleep(0.5)
continue
self.logger.info("Aquired browser on port %s", browser.chrome.port)

def callback(body, message):
try:
client_id = body.get('clientId')
url = body['url']
metadata = body.get('metadata')
behavior_parameters = body.get('behaviorParameters')
username = body.get('username')
password = body.get('password')
except:
self.logger.error("unable to decipher message %s",
message, exc_info=True)
self.logger.error("discarding bad message")
message.reject()
browser.stop()
self._browser_pool.release(browser)
return
self._start_browsing_page(
browser, message, client_id, url, metadata,
behavior_parameters, username, password)

consumer.callbacks = [callback]
def callback(body, message):
try:
client_id = body.get('clientId')
url = body['url']
metadata = body.get('metadata')
behavior_parameters = body.get('behaviorParameters')
username = body.get('username')
password = body.get('password')
except:
self.logger.error("unable to decipher message %s",
message, exc_info=True)
self.logger.error("discarding bad message")
message.reject()
return

self.logger.info("Starting to browse url %s with browser %d", url, browser.chrome.port)

self._start_browsing_page(
browser, message, client_id, url, metadata,
behavior_parameters, username, password)

consumer.callbacks = [callback]

try:
while True:
try:
#Handle ONE event
conn.drain_events(timeout=0.5)
break # out of "while True" to acquire another browser
break # out of "while True", to take another go-around in the outer while loop
except socket.timeout:
pass
except socket.error:
self.logger.error("problem consuming messages from AMQP, will try reconnecting after active browsing finishes", exc_info=True)
self._reconnect_requested = True

if self._consumer_stop.is_set() or time.time() - start >= timeout or self._reconnect_requested:
browser.stop()
self.logger.debug("Releasing browser on port %s", browser.chrome.port)
self._browser_pool.release(browser)
break

except brozzler.browser.NoBrowsersAvailable:
# no browsers available
time.sleep(0.5)
except:
self.logger.critical("problem with browser initialization", exc_info=True)
time.sleep(0.5)
finally:
consumer.callbacks = None

Expand Down Expand Up @@ -283,7 +294,6 @@ def browse_page_sync():
'browser=%s client_id=%s url=%s behavior_parameters=%s',
browser, client_id, url, behavior_parameters)
try:
browser.start()
final_page_url, outlinks = browser.browse_page(
url, on_response=on_response,
behavior_parameters=behavior_parameters,
Expand All @@ -293,9 +303,10 @@ def browse_page_sync():
#post_outlinks(outlinks)

message.ack()
except brozzler.PageInterstitialShown as e:
self.logger.info("page interstitial shown, likely unsupported http auth, for url {} - {}".format(url, e))
message.reject()
self.logger.info("Completed outlink extraction")
# except brozzler.PageInterstitialShown as e:
# self.logger.info("page interstitial shown, likely unsupported http auth, for url {} - {}".format(url, e))
# message.reject()
except brozzler.ShutdownRequested as e:
self.logger.info("browsing did not complete normally, requeuing url {} - {}".format(url, e))
message.requeue() # republish?
Expand All @@ -305,9 +316,6 @@ def browse_page_sync():
except:
self.logger.critical("problem browsing page, republishing url {}, may have lost browser process".format(url), exc_info=True)
republish_amqp(self, message)
finally:
browser.stop()
self._browser_pool.release(browser)

def republish_amqp(self, message):
# republish on exception, not requeue!
Expand Down Expand Up @@ -337,12 +345,33 @@ def to_str(bytes_or_str):
return value # Instance of str

def browse_thread_run_then_cleanup():
browse_page_sync()
self.logger.info(
'removing thread %s from self._browsing_threads',
threading.current_thread())
with self._browsing_threads_lock:
self._browsing_threads.remove(threading.current_thread())
th = threading.current_thread()
try: # Try to remove browsing thread

try: #Try for browser pool release

try: #Try for browser start with finally stop
self.logger.info("Starting browser")
browser.start()

self.logger.info("Starting page sync")
browse_page_sync()
self.logger.info("Completed page sync")
finally:
self.logger.debug("Stopping browser")
#Stop the started browser, no matter what happens
browser.stop()
self.logger.debug("Stopped browser")
finally:
self.logger.debug("Releasing browser")
self._browser_pool.release(browser)
self.logger.debug("Released browser")
finally:
#And as this is the last of this thread, deregister it
self.logger.debug(
'Removing from self._browsing_threads')
with self._browsing_threads_lock:
self._browsing_threads.remove(th)

thread_name = "BrowsingThread:%s" % browser.chrome.port
th = threading.Thread(target=browse_thread_run_then_cleanup, name=thread_name)
Expand Down