diff --git a/PRIVACY_POLICY.md b/PRIVACY_POLICY.md index 1936aa0fd..1eedddca2 100644 --- a/PRIVACY_POLICY.md +++ b/PRIVACY_POLICY.md @@ -5,13 +5,14 @@ This will be an optional _opt-in_ feature that is not required to use PythonTA. ## What data will be sent? -When PyTA check a file/directory (by calling `python_ta.check_all`), two types of data may be sent: +When PyTA checks a file/directory (by calling `python_ta.check_all`), two types of data may be sent: - The errors detected by PyTA during the check. - The source files that you ran PyTA on. These forms of data submission are independent and optional. If you use a custom PyTA configuration, this information will be sent alongside either of the above data. +Each upload also includes the PythonTA version and an anonymous client ID used to group opt-in submissions. ## How can I opt in or opt out of this data collection? @@ -21,7 +22,11 @@ The default configuration in the `python_ta` directory is `no` for both options. ## How will the data be anonymised? PyTA will not collect or send identifying information about you or your computer. (_Note_: if you choose to submit source files checked by PyTA, those files may contain identifying information about you.) -PyTA does record a hash of your device's MAC address in order to identify when two runs come from the same device, but this is not used to deanonymize the collected data. +PyTA does not derive its anonymous client ID from hardware identifiers such as your device's MAC address. +Instead, when data is first submitted, PyTA generates a random ID and stores it locally. +Future opt-in submissions include a hash of this random ID, allowing submissions to be grouped without sending the locally stored ID itself. +On Windows this is stored in `%APPDATA%\PythonTA\anonymous_id`; on other platforms it is stored in `~/.python_ta/anonymous_id`. +Deleting this file resets the anonymous ID. ## Who will the data be sent to? @@ -31,4 +36,4 @@ PyTA maintainers and computer science education researchers at the University of ## How will this data be used? This data will be used to better understand how PyTA is used by students for the purpose of making it a better educational tool. -Potential research analyses of collected data include identifying common errors detected by PyTA and identifying errors that persist across multiple PyTA runs. +Potential research analyses of collected data include identifying common errors detected by PyTA and identifying errors that persist across multiple PyTA runs associated with the same anonymous ID. diff --git a/packages/python-ta/.coveragerc b/packages/python-ta/.coveragerc index efec262ac..2ec783606 100644 --- a/packages/python-ta/.coveragerc +++ b/packages/python-ta/.coveragerc @@ -3,7 +3,6 @@ omit = packages/python-ta/src/python_ta/debug/* packages/python-ta/src/python_ta/reporters/templates/* packages/python-ta/src/python_ta/util/* - packages/python-ta/src/python_ta/upload.py packages/python-ta/src/python_ta/utils.py patch = subprocess diff --git a/packages/python-ta/CHANGELOG.md b/packages/python-ta/CHANGELOG.md index 3a2c2443a..f68013f61 100644 --- a/packages/python-ta/CHANGELOG.md +++ b/packages/python-ta/CHANGELOG.md @@ -29,10 +29,13 @@ and adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - Fixed bug that allowed users to inject code into the browser template through the E9920 unnecessary f-string checker - Fixed bug that caused user input containing markdown characters to be rendered by the markdown renderer in certain error messages - Fixed memory leak issue that caused the memory usage to increase with each call to `python_ta.check_all()`. +- Fixed opt-in data uploads to use a random anonymous ID instead of a MAC-address-derived hash, close uploaded files reliably, and time out stalled network requests. + Existing opt-in users will receive a new anonymous client ID after upgrading. ### 🔧 Internal changes - Removed old documentation files under `python_ta/reporters/` +- Added tests for the opt-in data upload path. - Added tests for `infinite_loop_checker.py` to improve coverage for the `_name_holds_generator` function and the generator portion of the `_check_constant_loop_cond` function. - Refactored `test_main.py` calls to use click's testing helpers. - The `Z3Visitor`, `Z3Parser`, and `Z3ParseException` classes have been extracted into a _new Python package_, `python-ta-z3`. diff --git a/packages/python-ta/src/python_ta/upload.py b/packages/python-ta/src/python_ta/upload.py index f08d59c9a..c8ec7cb60 100644 --- a/packages/python-ta/src/python_ta/upload.py +++ b/packages/python-ta/src/python_ta/upload.py @@ -2,96 +2,164 @@ import hashlib import json +import os +import sys import uuid -from typing import NamedTuple +from contextlib import ExitStack +from pathlib import Path +from typing import Any, Iterable import requests +UPLOAD_TIMEOUT_SECONDS = 5 +ANONYMOUS_ID_ENV_VAR = "PYTA_ANONYMOUS_ID_FILE" +_cached_local_anonymous_id: tuple[str, str] | None = None -def errors_to_dict(errors: list[NamedTuple]) -> dict[str, list[str]]: + +def errors_to_dict(errors: Iterable[Any]) -> dict[str, list[dict[str, Any]]]: """Convert PyTA errors from MessageSet format to a json format Dictionary.""" error_info = ["msg_id", "msg", "symbol", "module", "category", "line"] - error_types = ["code", "style"] err_as_dict = {} - for msg_set in errors: # This iterates over the (filename, code, style) MessageSets - for error_type in error_types: # This iterates over the code and style attributes - current_type = getattr(msg_set, error_type) # Gets either the code or style dictionary - for key in current_type.keys(): # Iterates over the error id's of caught errors - err_as_dict[key] = [] - info_set = current_type.get(key) - for ( - msg - ) in ( - info_set.messages - ): # Iterates over the messages for each error of the given code - err_as_dict[key].append({k: getattr(msg, k) for k in error_info}) + for msg in _iter_error_messages(errors): + msg_id = getattr(msg, "msg_id", None) + if msg_id is None: + continue + err_as_dict.setdefault(msg_id, []).append( + {field: getattr(msg, field, None) for field in error_info} + ) return err_as_dict def upload_to_server( - errors: list[NamedTuple], paths: list[str], config: dict[str, str], url: str, version: str + errors: Iterable[Any], paths: list[str], config: dict[str, Any], url: str, version: str ) -> None: """Send POST request to server with formatted data.""" - unique_id = get_hashed_id() # Generates a device-specific ID - files = [] - for path in paths: - f = open(path) - files.append(f) - upload = {str(i): f for i, f in enumerate(files)} # requests.post() requires passing a dict - # 'upload' is an empty dict in the case that 'files' is empty + unique_id = get_anonymous_id() errors_dict = errors_to_dict(errors) to_json = {"errors": errors_dict} if config: # 'config' is an empty dictionary if the default was used to_json["cfg"] = config - payload = json.dumps(to_json) + payload = json.dumps(to_json, default=str) + try: - response = requests.post( - url=url, files=upload, data={"id": unique_id, "version": version, "payload": payload} - ) - for f in files: - f.close() + with ExitStack() as stack: + upload = {str(i): stack.enter_context(open(path, "rb")) for i, path in enumerate(paths)} + response = requests.post( + url=url, + files=upload, + data={"id": unique_id, "version": version, "payload": payload}, + timeout=UPLOAD_TIMEOUT_SECONDS, + ) response.raise_for_status() print("[INFO] Upload successful") except requests.HTTPError as e: print("[ERROR] Upload failed") - if e.response.status_code == 400: + status_code = e.response.status_code if e.response is not None else None + if status_code == 400: print( "[ERROR] HTTP Response Status 400: Client-side error, likely due to improper syntax. " "Please report this to your instructor (and attach the code that caused the error)." ) - elif e.response.status_code == 403: + elif status_code == 403: print( "[ERROR] HTTP Response Status 403: Authorization is currently required for submission." ) - elif e.response.status_code == 500: + elif status_code == 500: print( "[ERROR] HTTP Response Status 500: The server ran into a situation it doesn't know how to handle. " ) print( "Please report this to your instructor (and attach the code that caused the error)." ) - elif e.response.status_code == 503: + elif status_code == 503: print( "[ERROR] HTTP Response Status 503: The server is not ready to handle your request. " ) print("It may be down for maintenance.") else: print('[ERROR] Error message: "{}"'.format(e)) - - except requests.ConnectionError as e: + except requests.Timeout: + print("[ERROR] Upload failed") + print("[ERROR] Error message: Connection timed out. The server may be temporarily down.") + except requests.ConnectionError: print("[ERROR] Upload failed") print( - "[ERROR] Error message: Connection timed out. This may be caused by your firewall, or the server may be " + "[ERROR] Error message: Could not connect. This may be caused by your firewall, or the server may be " "temporarily down." ) + except requests.RequestException as e: + print("[ERROR] Upload failed") + print('[ERROR] Error message: "{}"'.format(e)) + except OSError as e: + print("[ERROR] Upload failed") + print(f'[ERROR] Could not read a file selected for upload: "{e}"') -def get_hashed_id() -> str: +def get_anonymous_id() -> str: + """Return an anonymous ID for opt-in data uploads. + + This is a hash of a random local ID so multiple opt-in uploads can be + grouped without deriving an identifier from hardware information. """ - Generates a unique ID by hashing the user's mac-address. + local_anonymous_id = _get_or_create_local_anonymous_id() + return hashlib.sha512(local_anonymous_id.encode("utf-8")).hexdigest() + + +def get_hashed_id() -> str: + """Return the anonymous upload ID. + + This function is kept as a backwards-compatible alias for older code that + imported it directly. """ - mac = str(uuid.uuid1())[24:] - hash_gen = hashlib.sha512() - encoded = mac.encode("utf-8") - hash_gen.update(encoded) - return hash_gen.hexdigest() + return get_anonymous_id() + + +def _get_or_create_local_anonymous_id() -> str: + """Return the random local ID used as input for the anonymous upload ID.""" + global _cached_local_anonymous_id + + anonymous_id_path = _get_anonymous_id_path() + anonymous_id_path_key = str(anonymous_id_path) + if ( + _cached_local_anonymous_id is not None + and _cached_local_anonymous_id[0] == anonymous_id_path_key + ): + return _cached_local_anonymous_id[1] + + try: + anonymous_id = anonymous_id_path.read_text(encoding="utf-8").strip() + uuid.UUID(anonymous_id) + return anonymous_id + except (OSError, ValueError): + anonymous_id = str(uuid.uuid4()) + + try: + anonymous_id_path.parent.mkdir(parents=True, exist_ok=True) + anonymous_id_path.write_text(anonymous_id + "\n", encoding="utf-8") + except OSError: + _cached_local_anonymous_id = (anonymous_id_path_key, anonymous_id) + return anonymous_id + + +def _iter_error_messages(errors: Iterable[Any]) -> Iterable[Any]: + """Yield individual messages from current and legacy reporter upload data.""" + for error_group in errors: + if isinstance(error_group, list): + yield from error_group + elif hasattr(error_group, "code") and hasattr(error_group, "style"): + for error_type in ("code", "style"): + current_type = getattr(error_group, error_type) + for info_set in current_type.values(): + yield from info_set.messages + else: + yield error_group + + +def _get_anonymous_id_path() -> Path: + """Return the local path used to store the anonymous upload ID.""" + if ANONYMOUS_ID_ENV_VAR in os.environ: + return Path(os.environ[ANONYMOUS_ID_ENV_VAR]).expanduser() + + if sys.platform == "win32" and os.environ.get("APPDATA"): + return Path(os.environ["APPDATA"]) / "PythonTA" / "anonymous_id" + return Path.home() / ".python_ta" / "anonymous_id" diff --git a/packages/python-ta/tests/test_custom_checkers/test_invalid_name_checker.py b/packages/python-ta/tests/test_custom_checkers/test_invalid_name_checker.py index 15fb90e30..741a7935d 100644 --- a/packages/python-ta/tests/test_custom_checkers/test_invalid_name_checker.py +++ b/packages/python-ta/tests/test_custom_checkers/test_invalid_name_checker.py @@ -4,6 +4,7 @@ import re import sys import unittest +from unittest.mock import patch import astroid import pylint.testutils @@ -12,6 +13,7 @@ import python_ta from python_ta.checkers.invalid_name_checker import ( InvalidNameChecker, + _parse_name, _to_pascal_case, _to_upper_case_with_underscores, ) @@ -905,6 +907,16 @@ def test_module_name_no_snippet() -> None: class TestNamingConventionHelpers(unittest.TestCase): + def test_parse_name_returns_empty_result_when_name_is_not_parsed(self) -> None: + """Test that parsing handles unexpected match failures defensively.""" + with patch("python_ta.checkers.invalid_name_checker.re.match", return_value=None): + self.assertEqual(_parse_name("snake_case"), ("", None, "")) + + def test_converters_return_none_for_names_starting_with_digit(self) -> None: + """Test that name conversion fails when a name starts with a digit.""" + self.assertIsNone(_to_pascal_case("1bad_name")) + self.assertIsNone(_to_upper_case_with_underscores("1bad_name")) + def test_to_pascal_case(self) -> None: """Test that names are correctly converted to PascalCase.""" self.assertEqual(_to_pascal_case("snake_case"), "SnakeCase") diff --git a/packages/python-ta/tests/test_upload.py b/packages/python-ta/tests/test_upload.py new file mode 100644 index 000000000..4881f9530 --- /dev/null +++ b/packages/python-ta/tests/test_upload.py @@ -0,0 +1,416 @@ +import hashlib +import json +import uuid +from types import SimpleNamespace + +import pytest +import requests + +from python_ta.upload import ( + ANONYMOUS_ID_ENV_VAR, + UPLOAD_TIMEOUT_SECONDS, + _get_anonymous_id_path, + errors_to_dict, + get_anonymous_id, + get_hashed_id, + upload_to_server, +) + + +class FakeResponse: + def raise_for_status(self) -> None: + return None + + +def _make_message(msg_id: str = "E0001") -> SimpleNamespace: + return SimpleNamespace( + msg_id=msg_id, + msg="syntax error", + symbol="syntax-error", + module="sample", + category="error", + line=1, + ) + + +def test_get_anonymous_id_is_random_and_stable(monkeypatch, tmp_path) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + + anonymous_id = get_anonymous_id() + local_anonymous_id = anonymous_id_file.read_text(encoding="utf-8").strip() + + int(anonymous_id, 16) + assert len(anonymous_id) == 128 + uuid.UUID(local_anonymous_id) + assert get_anonymous_id() == anonymous_id + assert local_anonymous_id != anonymous_id + + +def test_get_anonymous_id_hashes_locally_stored_id(monkeypatch, tmp_path) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + + anonymous_id = get_anonymous_id() + local_anonymous_id = anonymous_id_file.read_text(encoding="utf-8").strip() + + assert anonymous_id == hashlib.sha512(local_anonymous_id.encode("utf-8")).hexdigest() + + +def test_get_anonymous_id_replaces_invalid_stored_id(monkeypatch, tmp_path) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + anonymous_id_file.write_text("not-a-uuid\n", encoding="utf-8") + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + + anonymous_id = get_anonymous_id() + local_anonymous_id = anonymous_id_file.read_text(encoding="utf-8").strip() + + int(anonymous_id, 16) + uuid.UUID(local_anonymous_id) + assert local_anonymous_id != "not-a-uuid" + + +def test_get_anonymous_id_uses_stable_fallback_when_id_file_is_unwritable(monkeypatch) -> None: + class FakeParent: + def mkdir(self, **_kwargs) -> None: + return None + + class FakePath: + parent = FakeParent() + + def __str__(self) -> str: + return "fake-unwritable-id-path" + + def read_text(self, **_kwargs) -> str: + raise OSError + + def write_text(self, *_args, **_kwargs) -> None: + raise OSError + + monkeypatch.setattr("python_ta.upload._cached_local_anonymous_id", None) + monkeypatch.setattr("python_ta.upload._get_anonymous_id_path", lambda: FakePath()) + + anonymous_id = get_anonymous_id() + + assert get_anonymous_id() == anonymous_id + + +def test_get_hashed_id_aliases_anonymous_id(monkeypatch, tmp_path) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + + assert get_hashed_id() == get_anonymous_id() + + +def test_get_anonymous_id_path_uses_environment_override(monkeypatch, tmp_path) -> None: + anonymous_id_file = tmp_path / "custom_id" + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + + assert _get_anonymous_id_path() == anonymous_id_file + + +def test_get_anonymous_id_path_uses_appdata_on_windows(monkeypatch, tmp_path) -> None: + monkeypatch.delenv(ANONYMOUS_ID_ENV_VAR, raising=False) + monkeypatch.setenv("APPDATA", str(tmp_path / "AppData" / "Roaming")) + monkeypatch.setattr("python_ta.upload.sys.platform", "win32") + + expected_path = tmp_path / "AppData" / "Roaming" / "PythonTA" / "anonymous_id" + + assert _get_anonymous_id_path() == expected_path + + +def test_get_anonymous_id_path_uses_home_directory_by_default(monkeypatch, tmp_path) -> None: + monkeypatch.delenv(ANONYMOUS_ID_ENV_VAR, raising=False) + monkeypatch.delenv("APPDATA", raising=False) + monkeypatch.setattr("python_ta.upload.sys.platform", "linux") + monkeypatch.setattr("python_ta.upload.Path.home", lambda: tmp_path) + + assert _get_anonymous_id_path() == tmp_path / ".python_ta" / "anonymous_id" + + +def test_errors_to_dict_accepts_current_reporter_messages() -> None: + message = _make_message() + + assert errors_to_dict([[message]]) == { + "E0001": [ + { + "msg_id": "E0001", + "msg": "syntax error", + "symbol": "syntax-error", + "module": "sample", + "category": "error", + "line": 1, + } + ] + } + + +def test_errors_to_dict_accepts_legacy_message_sets() -> None: + message = _make_message("C0301") + message_set = SimpleNamespace( + code={"E0001": SimpleNamespace(messages=[_make_message()])}, + style={"C0301": SimpleNamespace(messages=[message])}, + ) + + assert errors_to_dict([message_set])["C0301"][0]["symbol"] == "syntax-error" + assert errors_to_dict([message_set])["E0001"][0]["symbol"] == "syntax-error" + + +def test_errors_to_dict_accepts_single_messages_and_skips_messages_without_ids() -> None: + message = _make_message() + message_without_id = SimpleNamespace(msg="missing id") + + assert errors_to_dict([message, message_without_id]) == { + "E0001": [ + { + "msg_id": "E0001", + "msg": "syntax error", + "symbol": "syntax-error", + "module": "sample", + "category": "error", + "line": 1, + } + ] + } + + +def test_errors_to_dict_uses_none_for_missing_optional_message_fields() -> None: + message = SimpleNamespace(msg_id="E9999") + + assert errors_to_dict([message]) == { + "E9999": [ + { + "msg_id": "E9999", + "msg": None, + "symbol": None, + "module": None, + "category": None, + "line": None, + } + ] + } + + +def test_upload_to_server_posts_payload_with_timeout(monkeypatch, tmp_path) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + upload_file = tmp_path / "sample.py" + upload_file.write_text("print('hello')\n", encoding="utf-8") + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + posted = {} + + def fake_post(**kwargs): + posted.update(kwargs) + return FakeResponse() + + monkeypatch.setattr("python_ta.upload.requests.post", fake_post) + + upload_to_server( + errors=[[_make_message()]], + paths=[str(upload_file)], + config={"output-format": "pyta-json"}, + url="https://example.com/upload", + version="1.0.0", + ) + + int(posted["data"]["id"], 16) + assert len(posted["data"]["id"]) == 128 + assert posted["timeout"] == UPLOAD_TIMEOUT_SECONDS + assert posted["files"]["0"].closed + + payload = json.loads(posted["data"]["payload"]) + assert payload["cfg"] == {"output-format": "pyta-json"} + assert payload["errors"]["E0001"][0]["msg"] == "syntax error" + + +def test_upload_to_server_handles_missing_upload_file(monkeypatch, tmp_path, capsys) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + missing_file = tmp_path / "missing.py" + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + + upload_to_server( + errors=[], + paths=[str(missing_file)], + config={}, + url="https://example.com/upload", + version="1.0.0", + ) + + assert "Could not read a file selected for upload" in capsys.readouterr().out + + +def test_upload_to_server_handles_timeout(monkeypatch, tmp_path, capsys) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + upload_file = tmp_path / "sample.py" + upload_file.write_text("print('hello')\n", encoding="utf-8") + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + + def fake_post(**_kwargs): + raise requests.Timeout + + monkeypatch.setattr("python_ta.upload.requests.post", fake_post) + + upload_to_server( + errors=[], + paths=[str(upload_file)], + config={}, + url="https://example.com/upload", + version="1.0.0", + ) + + assert "Connection timed out" in capsys.readouterr().out + + +def test_upload_to_server_handles_forbidden_response(monkeypatch, tmp_path, capsys) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + upload_file = tmp_path / "sample.py" + upload_file.write_text("print('hello')\n", encoding="utf-8") + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + + class ForbiddenResponse: + def raise_for_status(self) -> None: + response = SimpleNamespace(status_code=403) + raise requests.HTTPError(response=response) + + monkeypatch.setattr("python_ta.upload.requests.post", lambda **_kwargs: ForbiddenResponse()) + + upload_to_server( + errors=[], + paths=[str(upload_file)], + config={}, + url="https://example.com/upload", + version="1.0.0", + ) + + assert "HTTP Response Status 403" in capsys.readouterr().out + + +@pytest.mark.parametrize( + ("status_code", "expected_message"), + [ + (400, "HTTP Response Status 400"), + (500, "HTTP Response Status 500"), + (503, "HTTP Response Status 503"), + ], +) +def test_upload_to_server_handles_http_error_responses( + monkeypatch, tmp_path, capsys, status_code: int, expected_message: str +) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + upload_file = tmp_path / "sample.py" + upload_file.write_text("print('hello')\n", encoding="utf-8") + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + + class ErrorResponse: + def raise_for_status(self) -> None: + response = SimpleNamespace(status_code=status_code) + raise requests.HTTPError(response=response) + + monkeypatch.setattr("python_ta.upload.requests.post", lambda **_kwargs: ErrorResponse()) + + upload_to_server( + errors=[], + paths=[str(upload_file)], + config={}, + url="https://example.com/upload", + version="1.0.0", + ) + + assert expected_message in capsys.readouterr().out + + +def test_upload_to_server_handles_http_error_without_response( + monkeypatch, tmp_path, capsys +) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + upload_file = tmp_path / "sample.py" + upload_file.write_text("print('hello')\n", encoding="utf-8") + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + + class ErrorResponse: + def raise_for_status(self) -> None: + raise requests.HTTPError("server failed") + + monkeypatch.setattr("python_ta.upload.requests.post", lambda **_kwargs: ErrorResponse()) + + upload_to_server( + errors=[], + paths=[str(upload_file)], + config={}, + url="https://example.com/upload", + version="1.0.0", + ) + + assert 'Error message: "server failed"' in capsys.readouterr().out + + +def test_upload_to_server_handles_request_exception(monkeypatch, tmp_path, capsys) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + upload_file = tmp_path / "sample.py" + upload_file.write_text("print('hello')\n", encoding="utf-8") + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + + def fake_post(**_kwargs): + raise requests.RequestException("request failed") + + monkeypatch.setattr("python_ta.upload.requests.post", fake_post) + + upload_to_server( + errors=[], + paths=[str(upload_file)], + config={}, + url="https://example.com/upload", + version="1.0.0", + ) + + assert 'Error message: "request failed"' in capsys.readouterr().out + + +def test_upload_to_server_posts_empty_files_and_serializes_config_values( + monkeypatch, tmp_path +) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + posted = {} + + def fake_post(**kwargs): + posted.update(kwargs) + return FakeResponse() + + monkeypatch.setattr("python_ta.upload.requests.post", fake_post) + + upload_to_server( + errors=[], + paths=[], + config={"path": tmp_path}, + url="https://example.com/upload", + version="1.0.0", + ) + + assert posted["files"] == {} + payload = json.loads(posted["data"]["payload"]) + assert payload == {"errors": {}, "cfg": {"path": str(tmp_path)}} + + +def test_upload_to_server_closes_files_when_request_fails(monkeypatch, tmp_path, capsys) -> None: + anonymous_id_file = tmp_path / "anonymous_id" + upload_file = tmp_path / "sample.py" + upload_file.write_text("print('hello')\n", encoding="utf-8") + monkeypatch.setenv(ANONYMOUS_ID_ENV_VAR, str(anonymous_id_file)) + posted = {} + + def fake_post(**kwargs): + posted.update(kwargs) + raise requests.ConnectionError + + monkeypatch.setattr("python_ta.upload.requests.post", fake_post) + + upload_to_server( + errors=[], + paths=[str(upload_file)], + config={}, + url="https://example.com/upload", + version="1.0.0", + ) + + assert posted["files"]["0"].closed + assert "[ERROR] Upload failed" in capsys.readouterr().out