diff --git a/setup.py b/setup.py
index 839faec..ed2d1be 100644
--- a/setup.py
+++ b/setup.py
@@ -1,6 +1,6 @@
 from setuptools import setup, find_packages
 
-VERSION = "1.5.21"
+VERSION = "1.5.22"
 
 with open("requirements.txt") as f:
     requirements = f.read().splitlines()
diff --git a/surge/reports.py b/surge/reports.py
index 8a66b7e..06579d0 100644
--- a/surge/reports.py
+++ b/surge/reports.py
@@ -1,5 +1,5 @@
 import gzip
-from time import sleep
-import urllib
+from time import sleep, monotonic
+import urllib.request
 import tempfile
 import shutil
@@ -34,9 +34,18 @@ def save_report(
         filepath=None,
         poll_time=5 * 60,
         api_key: str = None,
+        poll_interval: float = 2,
     ):
         """
         Request creation of a report, poll until the report is generated, and save the data to a file all in one call.
+
+        Calls ``request`` once to start (or reuse) a generation job and
+        then polls ``check_status`` against the returned ``job_id``
+        until that specific job finishes. Polling a specific job —
+        rather than re-calling ``request`` in a loop — avoids
+        re-triggering generation on projects that are still receiving
+        responses, which would otherwise cause this call to hang.
+
         Arguments:
             project_id (string): UUID of project to get data for
             type (string): Must be one of these types:
@@ -46,49 +55,62 @@ def save_report(
                 * `export_csv_aggregated`
                 * `export_csv_flattened`
-            filepath (string or IO or None): Location to save the results file. If not specified, will save to "project_{project_id}_results.{csv/json}
-            poll_time (int): Number of seconds to poll for the report
+            filepath (string or IO or None): Location to save the results file. If not specified, will save to "project_{project_id}_results.{csv/json}"
+            poll_time (int): Maximum number of seconds to wait for the report to be generated
+            poll_interval (int or float): Seconds to wait between status checks
+
+        Returns:
+            bytes: The decompressed report contents that were written out.
+
+        Raises:
+            ValueError: If the job finishes with a status other than READY or COMPLETED.
+            Exception: If the report is not ready within `poll_time` seconds.
         """
-        for _ in range(poll_time // 2):
-            response = cls.request(project_id=project_id,
-                                   type=type,
-                                   api_key=api_key)
-            # Download zipped project results if ready
-            if response.status == "READY":
-                file_ext = "csv" if "csv" in type else "json"
-                default_file_name = (
-                    "project_{project_id}_results.{file_ext}.gzip".format(
-                        project_id=project_id, file_ext=file_ext))
-                with urllib.request.urlopen(response.url) as response:
-                    with tempfile.NamedTemporaryFile() as tmp_file:
-                        shutil.copyfileobj(response, tmp_file)
-                        tmp_file.flush()
-                        # Unzip and save results
-                        data = gzip.open(tmp_file.name, "r").read()
-                        filepath = filepath or default_file_name.rstrip(
-                            ".gzip")
-                        if isinstance(filepath, str):
-                            file = open(
-                                filepath or default_file_name.rstrip(".gzip"),
-                                "wb")
-                        else:
-                            file = filepath
-                        file.write(data)
-                        if isinstance(filepath, str):
-                            file.close()
-                        return data
-
-            # Wait two seconds before polling again
-            elif response.status == "CREATING":
-                sleep(2)
-                continue
-            else:
-                raise ValueError(
-                    "Report failed to generate with status {}".format(
-                        response.status))
-
-        raise Exception(
-            "Report failed to generate within {poll_time} seconds".format(
-                poll_time=poll_time))
+        response = cls.request(project_id=project_id,
+                               type=type,
+                               api_key=api_key)
+        # Capture the job_id from the initial CREATING response and
+        # reuse it across polls. check_status's IN_PROGRESS response
+        # does not include job_id; only RETRYING does (and means the
+        # server kicked off a new underlying job).
+        job_id = getattr(response, "job_id", None)
+        deadline = monotonic() + poll_time
+        while response.status in ("CREATING", "IN_PROGRESS", "RETRYING"):
+            remaining = deadline - monotonic()
+            if remaining <= 0:
+                raise Exception(
+                    "Report failed to generate within {poll_time} seconds".
+                    format(poll_time=poll_time))
+            # Never sleep past the deadline, so poll_time stays an upper
+            # bound on how long this call waits between status checks.
+            sleep(min(poll_interval, remaining))
+            response = cls.check_status(project_id, job_id, api_key=api_key)
+            if response.status == "RETRYING":
+                job_id = response.job_id
+
+        if response.status not in ("READY", "COMPLETED"):
+            raise ValueError("Report failed to generate with status {}".format(
+                response.status))
+
+        file_ext = "csv" if "csv" in type else "json"
+        default_file_name = "project_{project_id}_results.{file_ext}".format(
+            project_id=project_id, file_ext=file_ext)
+        target = filepath or default_file_name
+        with urllib.request.urlopen(response.url) as remote:
+            with tempfile.NamedTemporaryFile() as tmp_file:
+                shutil.copyfileobj(remote, tmp_file)
+                tmp_file.flush()
+                tmp_file.seek(0)
+                # Read via the open file handle rather than reopening
+                # tmp_file.name — NamedTemporaryFile cannot be reopened
+                # by path on Windows while the original handle is open.
+                with gzip.GzipFile(fileobj=tmp_file, mode="rb") as gz:
+                    data = gz.read()
+        if isinstance(target, str):
+            with open(target, "wb") as f:
+                f.write(data)
+        else:
+            target.write(data)
+        return data
 
     @classmethod
     def download_json(cls,
diff --git a/tests/test_reports.py b/tests/test_reports.py
index 2fbcb0b..2b0341a 100644
--- a/tests/test_reports.py
+++ b/tests/test_reports.py
@@ -1,12 +1,133 @@
+import gzip
+import io
 from surge import Report
 from surge.errors import SurgeRequestError
 from unittest import mock
 import pytest
 
 
+def _gzipped(payload: bytes) -> bytes:
+    buf = io.BytesIO()
+    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
+        gz.write(payload)
+    return buf.getvalue()
+
+
 def test_save_report_on_empty_project_raises_an_error():
     with mock.patch.object(Report, "post") as mock_post:
         mock_post.return_value = {"error": "Project has no responses"}
         with pytest.raises(SurgeRequestError):
             Report.save_report("fake_project_id", "export_csv",
                                "my_report.csv")
+
+
+def test_save_report_downloads_when_request_returns_ready():
+    """If request returns READY, no polling is needed."""
+    payload = b'[{"a": 1}]'
+    ready = Report(status="READY", url="https://signed.example/report.gz")
+    sink = io.BytesIO()
+    with mock.patch.object(Report, "request",
+                           return_value=ready) as mock_request, \
+            mock.patch.object(Report, "check_status") as mock_check, \
+            mock.patch("urllib.request.urlopen") as mock_urlopen:
+        mock_urlopen.return_value.__enter__.return_value = io.BytesIO(
+            _gzipped(payload))
+        Report.save_report("proj-123", "export_json", filepath=sink)
+    mock_request.assert_called_once()
+    mock_check.assert_not_called()
+    assert sink.getvalue() == payload
+
+
+def test_save_report_polls_check_status_for_returned_job_id():
+    """When request returns CREATING, poll check_status against that job_id.
+
+    IN_PROGRESS responses from check_status do not include a job_id —
+    we have to remember the one from the initial CREATING response
+    rather than re-reading it on each iteration.
+    """
+    payload = b"a,b\n1,2\n"
+    creating = Report(status="CREATING", job_id="job-abc")
+    in_progress = Report(status="IN_PROGRESS")
+    completed = Report(status="COMPLETED",
+                       url="https://signed.example/report.gz")
+    sink = io.BytesIO()
+    with mock.patch.object(Report, "request",
+                           return_value=creating) as mock_request, \
+            mock.patch.object(Report, "check_status",
+                              side_effect=[in_progress, completed]) as mock_check, \
+            mock.patch("urllib.request.urlopen") as mock_urlopen, \
+            mock.patch("surge.reports.sleep") as mock_sleep:
+        mock_urlopen.return_value.__enter__.return_value = io.BytesIO(
+            _gzipped(payload))
+        Report.save_report("proj-123", "export_csv", filepath=sink)
+    mock_request.assert_called_once()
+    assert mock_check.call_count == 2
+    for call in mock_check.call_args_list:
+        assert call.args[:2] == ("proj-123", "job-abc")
+    assert mock_sleep.call_count == 2
+    assert sink.getvalue() == payload
+
+
+def test_save_report_switches_job_id_on_retrying():
+    """RETRYING responses include a new job_id; subsequent polls use it."""
+    payload = b'[]'
+    creating = Report(status="CREATING", job_id="job-abc")
+    retrying = Report(status="RETRYING", job_id="job-xyz")
+    in_progress = Report(status="IN_PROGRESS")
+    completed = Report(status="COMPLETED",
+                       url="https://signed.example/report.gz")
+    sink = io.BytesIO()
+    with mock.patch.object(Report, "request", return_value=creating), \
+            mock.patch.object(Report, "check_status",
+                              side_effect=[retrying, in_progress, completed]) as mock_check, \
+            mock.patch("urllib.request.urlopen") as mock_urlopen, \
+            mock.patch("surge.reports.sleep"):
+        mock_urlopen.return_value.__enter__.return_value = io.BytesIO(
+            _gzipped(payload))
+        Report.save_report("proj-123", "export_json", filepath=sink)
+    job_ids = [call.args[1] for call in mock_check.call_args_list]
+    assert job_ids == ["job-abc", "job-xyz", "job-xyz"]
+
+
+def test_save_report_raises_on_unexpected_status():
+    creating = Report(status="CREATING", job_id="job-abc")
+    error = Report(status="ERROR", type="Report generation error")
+    with mock.patch.object(Report, "request", return_value=creating), \
+            mock.patch.object(Report, "check_status", return_value=error), \
+            mock.patch("surge.reports.sleep"):
+        with pytest.raises(ValueError, match="ERROR"):
+            Report.save_report("proj-123",
+                               "export_json",
+                               filepath=io.BytesIO())
+
+
+def test_save_report_times_out_when_job_never_completes():
+    creating = Report(status="CREATING", job_id="job-abc")
+    in_progress = Report(status="IN_PROGRESS", job_id="job-abc")
+    with mock.patch.object(Report, "request", return_value=creating), \
+            mock.patch.object(Report, "check_status",
+                              return_value=in_progress), \
+            mock.patch("surge.reports.sleep"), \
+            mock.patch("surge.reports.monotonic",
+                       side_effect=[0.0, 1000.0, 2000.0]):
+        with pytest.raises(Exception, match="within 300 seconds"):
+            Report.save_report("proj-123",
+                               "export_json",
+                               filepath=io.BytesIO())
+
+
+def test_save_report_never_sleeps_past_the_deadline():
+    """The last sleep is clamped to the time left before the deadline."""
+    creating = Report(status="CREATING", job_id="job-abc")
+    in_progress = Report(status="IN_PROGRESS")
+    with mock.patch.object(Report, "request", return_value=creating), \
+            mock.patch.object(Report, "check_status",
+                              return_value=in_progress), \
+            mock.patch("surge.reports.sleep") as mock_sleep, \
+            mock.patch("surge.reports.monotonic",
+                       side_effect=[0.0, 299.5, 300.0]):
+        with pytest.raises(Exception, match="within 300 seconds"):
+            Report.save_report("proj-123",
+                               "export_json",
+                               filepath=io.BytesIO())
+    mock_sleep.assert_called_once_with(0.5)