Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from setuptools import setup, find_packages

VERSION = "1.5.21"
VERSION = "1.5.22"

with open("requirements.txt") as f:
requirements = f.read().splitlines()
Expand Down
98 changes: 55 additions & 43 deletions surge/reports.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import gzip
from time import sleep
from time import sleep, monotonic
import urllib
import tempfile
import shutil
Expand Down Expand Up @@ -34,9 +34,18 @@ def save_report(
filepath=None,
poll_time=5 * 60,
api_key: str = None,
poll_interval: float = 2,
):
"""
Request creation of a report, poll until the report is generated, and save the data to a file all in one call.

Calls ``request`` once to start (or reuse) a generation job and
then polls ``check_status`` against the returned ``job_id``
until that specific job finishes. Polling a specific job —
rather than re-calling ``request`` in a loop — avoids
re-triggering generation on projects that are still receiving
responses, which would otherwise cause this call to hang.

Arguments:
project_id (string): UUID of project to get data for
type (string): Must be one of these types:
Expand All @@ -46,49 +55,52 @@ def save_report(
* `export_csv_aggregated`
* `export_csv_flattened`
filepath (string or IO or None): Location to save the results file. If not specified, will save to "project_{project_id}_results.{csv/json}
poll_time (int): Number of seconds to poll for the report
poll_time (int): Maximum number of seconds to wait for the report to be generated
poll_interval (int or float): Seconds to wait between status checks
"""
for _ in range(poll_time // 2):
response = cls.request(project_id=project_id,
type=type,
api_key=api_key)
# Download zipped project results if ready
if response.status == "READY":
file_ext = "csv" if "csv" in type else "json"
default_file_name = (
"project_{project_id}_results.{file_ext}.gzip".format(
project_id=project_id, file_ext=file_ext))
with urllib.request.urlopen(response.url) as response:
with tempfile.NamedTemporaryFile() as tmp_file:
shutil.copyfileobj(response, tmp_file)
tmp_file.flush()
# Unzip and save results
data = gzip.open(tmp_file.name, "r").read()
filepath = filepath or default_file_name.rstrip(
".gzip")
if isinstance(filepath, str):
file = open(
filepath or default_file_name.rstrip(".gzip"),
"wb")
else:
file = filepath
file.write(data)
if isinstance(filepath, str):
file.close()
return data

# Wait two seconds before polling again
elif response.status == "CREATING":
sleep(2)
continue
else:
raise ValueError(
"Report failed to generate with status {}".format(
response.status))

raise Exception(
"Report failed to generate within {poll_time} seconds".format(
poll_time=poll_time))
response = cls.request(project_id=project_id,
type=type,
api_key=api_key)
# Capture the job_id from the initial CREATING response and
# reuse it across polls. check_status's IN_PROGRESS response
# does not include job_id; only RETRYING does (and means the
# server kicked off a new underlying job).
job_id = getattr(response, "job_id", None)
deadline = monotonic() + poll_time
while response.status in ("CREATING", "IN_PROGRESS", "RETRYING"):
if monotonic() >= deadline:
raise Exception(
"Report failed to generate within {poll_time} seconds".
format(poll_time=poll_time))
sleep(poll_interval)
response = cls.check_status(project_id, job_id, api_key=api_key)
if response.status == "RETRYING":
job_id = response.job_id

if response.status not in ("READY", "COMPLETED"):
raise ValueError("Report failed to generate with status {}".format(
response.status))

file_ext = "csv" if "csv" in type else "json"
default_file_name = "project_{project_id}_results.{file_ext}".format(
project_id=project_id, file_ext=file_ext)
target = filepath or default_file_name
with urllib.request.urlopen(response.url) as remote:
with tempfile.NamedTemporaryFile() as tmp_file:
shutil.copyfileobj(remote, tmp_file)
tmp_file.flush()
tmp_file.seek(0)
# Read via the open file handle rather than reopening
# tmp_file.name — NamedTemporaryFile cannot be reopened
# by path on Windows while the original handle is open.
with gzip.GzipFile(fileobj=tmp_file, mode="rb") as gz:
data = gz.read()
if isinstance(target, str):
with open(target, "wb") as f:
f.write(data)
else:
target.write(data)
return data

@classmethod
def download_json(cls,
Expand Down
104 changes: 104 additions & 0 deletions tests/test_reports.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,116 @@
import gzip
import io
from surge import Report
from surge.errors import SurgeRequestError
from unittest import mock
import pytest


def _gzipped(payload: bytes) -> bytes:
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
gz.write(payload)
return buf.getvalue()


def test_save_report_on_empty_project_raises_an_error():
    """A project with no responses surfaces a SurgeRequestError."""
    with mock.patch.object(Report, "post") as post_mock:
        post_mock.return_value = {"error": "Project has no responses"}
        with pytest.raises(SurgeRequestError):
            Report.save_report("fake_project_id", "export_csv",
                               "my_report.csv")


def test_save_report_downloads_when_request_returns_ready():
    """If request returns READY, no polling is needed."""
    body = b'[{"a": 1}]'
    ready_response = Report(status="READY",
                            url="https://signed.example/report.gz")
    out = io.BytesIO()
    with mock.patch.object(Report, "request",
                           return_value=ready_response) as request_mock, \
            mock.patch.object(Report, "check_status") as check_mock, \
            mock.patch("urllib.request.urlopen") as urlopen_mock:
        # Serve the gzipped payload through the mocked signed-URL download.
        urlopen_mock.return_value.__enter__.return_value = io.BytesIO(
            _gzipped(body))
        Report.save_report("proj-123", "export_json", filepath=out)

    # request() fires exactly once and the polling path never starts.
    request_mock.assert_called_once()
    check_mock.assert_not_called()
    assert out.getvalue() == body


def test_save_report_polls_check_status_for_returned_job_id():
    """When request returns CREATING, poll check_status against that job_id.

    IN_PROGRESS responses from check_status do not include a job_id —
    we have to remember the one from the initial CREATING response
    rather than re-reading it on each iteration.
    """
    body = b"a,b\n1,2\n"
    initial = Report(status="CREATING", job_id="job-abc")
    polls = [
        Report(status="IN_PROGRESS"),
        Report(status="COMPLETED", url="https://signed.example/report.gz"),
    ]
    out = io.BytesIO()
    with mock.patch.object(Report, "request",
                           return_value=initial) as request_mock, \
            mock.patch.object(Report, "check_status",
                              side_effect=polls) as check_mock, \
            mock.patch("urllib.request.urlopen") as urlopen_mock, \
            mock.patch("surge.reports.sleep") as sleep_mock:
        urlopen_mock.return_value.__enter__.return_value = io.BytesIO(
            _gzipped(body))
        Report.save_report("proj-123", "export_csv", filepath=out)

    request_mock.assert_called_once()
    assert check_mock.call_count == 2
    # Every poll targets the job_id captured from the CREATING response.
    assert all(c.args[:2] == ("proj-123", "job-abc")
               for c in check_mock.call_args_list)
    assert sleep_mock.call_count == 2
    assert out.getvalue() == body


def test_save_report_switches_job_id_on_retrying():
    """RETRYING responses include a new job_id; subsequent polls use it."""
    body = b'[]'
    poll_responses = [
        Report(status="RETRYING", job_id="job-xyz"),
        Report(status="IN_PROGRESS"),
        Report(status="COMPLETED", url="https://signed.example/report.gz"),
    ]
    out = io.BytesIO()
    with mock.patch.object(Report, "request",
                           return_value=Report(status="CREATING",
                                               job_id="job-abc")), \
            mock.patch.object(Report, "check_status",
                              side_effect=poll_responses) as check_mock, \
            mock.patch("urllib.request.urlopen") as urlopen_mock, \
            mock.patch("surge.reports.sleep"):
        urlopen_mock.return_value.__enter__.return_value = io.BytesIO(
            _gzipped(body))
        Report.save_report("proj-123", "export_json", filepath=out)

    # First poll uses the original job; after RETRYING, the new one.
    polled_ids = [call.args[1] for call in check_mock.call_args_list]
    assert polled_ids == ["job-abc", "job-xyz", "job-xyz"]


def test_save_report_raises_on_unexpected_status():
    """A terminal status other than READY/COMPLETED raises ValueError."""
    started = Report(status="CREATING", job_id="job-abc")
    failed = Report(status="ERROR", type="Report generation error")
    with mock.patch.object(Report, "request", return_value=started), \
            mock.patch.object(Report, "check_status", return_value=failed), \
            mock.patch("surge.reports.sleep"):
        with pytest.raises(ValueError, match="ERROR"):
            Report.save_report("proj-123",
                               "export_json",
                               filepath=io.BytesIO())


def test_save_report_times_out_when_job_never_completes():
    """save_report gives up once the poll_time deadline is exceeded."""
    stuck = Report(status="IN_PROGRESS", job_id="job-abc")
    with mock.patch.object(Report, "request",
                           return_value=Report(status="CREATING",
                                               job_id="job-abc")), \
            mock.patch.object(Report, "check_status", return_value=stuck), \
            mock.patch("surge.reports.sleep"), \
            mock.patch("surge.reports.monotonic",
                       side_effect=[0.0, 1000.0, 2000.0]):
        # Clock jumps past the default 300 s deadline on the first poll.
        with pytest.raises(Exception, match="within 300 seconds"):
            Report.save_report("proj-123",
                               "export_json",
                               filepath=io.BytesIO())
Loading