diff --git a/capycli/common/github_support.py b/capycli/common/github_support.py index 48f49a4..591df2a 100644 --- a/capycli/common/github_support.py +++ b/capycli/common/github_support.py @@ -1,7 +1,7 @@ # ------------------------------------------------------------------------------- -# Copyright (c) 2025 Siemens +# Copyright (c) 2026 Siemens # All Rights Reserved. -# Author: thomas.graf@siemens.com +# Author: thomas.graf@siemens.com, marvin.fette@siemens.com # # SPDX-License-Identifier: MIT # ------------------------------------------------------------------------------- @@ -24,6 +24,10 @@ class GitHubSupport: """Support methods for accessing GitHub""" + default_wait_time = 60 # seconds + default_gh_api_timeout = 15 # seconds + last_request_error = False + def __init__(self) -> None: self.github_project_name_regex = re.compile(r"^[a-zA-Z0-9-]+(/[a-zA-Z0-9-]+)*$") @@ -32,58 +36,110 @@ def github_request(url: str, username: str = "", token: str = "", return_response: bool = False, allow_redirects: bool = True, # default in requests ) -> Any: + """Helper method to perform GitHub API requests""" + + api = True if url.startswith("https://api.github.com/") else False + try: - headers = {} - if token: - headers["Authorization"] = "token " + token - if username: - headers["Username"] = username - response = requests.get(url, headers=headers, - allow_redirects=allow_redirects) - if response.status_code == 429 \ - or 'rate limit exceeded' in response.reason \ - or 'API rate limit exceeded' in response.json().get('message', ''): + response = requests.get(url, headers=GitHubSupport._gh_request_headers( + token, username, api=api + ), + allow_redirects=allow_redirects, + timeout=GitHubSupport.default_gh_api_timeout) + + # Check for rate limit errors (403 Forbidden or 429 Too Many Requests) + if GitHubSupport._blocked_by_ratelimit(response): + wait_time = GitHubSupport._calculate_ratelimit_wait_time(response) print( Fore.LIGHTYELLOW_EX + - " Github API rate limit exceeded - wait 60s and retry ... " + + f" Github API rate limit exceeded - wait {wait_time}s and retry ... " + Style.RESET_ALL) - time.sleep(60) - return GitHubSupport.github_request(url, username, token, return_response=return_response) - if response.json().get('message', '').startswith("Bad credentials"): + time.sleep(wait_time) + return GitHubSupport.github_request( + url, username, token, return_response=return_response, + allow_redirects=allow_redirects) + + # Check for credential issues + if GitHubSupport._credential_issue(response): print_red("Invalid GitHub credential provided - aborting!") sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SERVICE) - except AttributeError as err: - # response.json() did not return a dictionary - if hasattr(err, 'name'): - name = err.name - else: # Python prior to 3.10 - name = err.args[0].split("'")[3] - if not name == 'get': - raise + # Check for not found (404) + if response.status_code == 404: + print( + Fore.LIGHTYELLOW_EX + + f" Resource not found at {url} - could be intended or not ... " + + Style.RESET_ALL) + return response if return_response else response.json() + + # Raise an exception for other HTTP errors + response.raise_for_status() - except requests.exceptions.JSONDecodeError: - response._content = b'{}' + except requests.exceptions.RequestException as ex: + if GitHubSupport.last_request_error: + print_red( + f" Persistent issues accessing {url} " + repr(ex) + + "\n Aborting after retried once!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SERVICE) + + GitHubSupport.last_request_error = True - except requests.exceptions.ConnectionError as ex: print( Fore.LIGHTYELLOW_EX + f" Connection issues accessing {url} " + repr(ex) + - "\n Retrying in 60 seconds!" + + "\n Retrying using the same url." + Style.RESET_ALL) - time.sleep(60) - return GitHubSupport.github_request(url, username, token, return_response=return_response) + return GitHubSupport.github_request(url, username, token, + return_response=return_response, allow_redirects=allow_redirects) - except Exception as ex: - print( - Fore.LIGHTYELLOW_EX + - " Error accessing GitHub: " + repr(ex) + - Style.RESET_ALL) - response = requests.Response() - response._content = \ - b'{' + f'"exception": "{repr(ex)}"'.encode() + b'}' + # Reset the error flag on success or after handling exceptions + GitHubSupport.last_request_error = False return response if return_response else response.json() + @staticmethod + def _gh_request_headers(token: str = "", username: str = "", api: bool = False) -> dict: + """Helper method to construct headers for GitHub API requests.""" + headers = {} + if api: + headers["Accept"] = "application/vnd.github+json" + if token: + headers["Authorization"] = "token " + token + if username: + headers["Username"] = username + return headers + + @staticmethod + def _blocked_by_ratelimit(response: requests.Response) -> bool: + """Check if the GitHub API response indicates that the rate limit has been exceeded.""" + if not response.status_code == 403 and not response.status_code == 429: + return False + if 'x-ratelimit-remaining' in response.headers: + remaining = int(response.headers['x-ratelimit-remaining']) + return remaining == 0 + return False + + @staticmethod + def _calculate_ratelimit_wait_time(response: requests.Response) -> int: + """Calculate the wait time until the GitHub API rate limit resets.""" + if 'x-ratelimit-reset' in response.headers: + reset_time = int(response.headers['x-ratelimit-reset']) + current_time = int(time.time()) + wait_time = reset_time - current_time + return max(wait_time, 0) + if 'retry-after' in response.headers: + return int(response.headers['retry-after']) + return GitHubSupport.default_wait_time + + @staticmethod + def _credential_issue(response: requests.Response) -> bool: + """Check if the response indicates a credential issue.""" + if response.status_code == 401: + return True + if not response.ok: + message = response.json().get('message', '') + return "bad credentials" in message.lower() + return False + @staticmethod def get_repositories(name: str, language: str, username: str = "", token: str = "") -> Any: """Query for GitHub repositories""" @@ -95,7 +151,8 @@ def get_repositories(name: str, language: str, username: str = "", token: str = def get_repo_name(github_url: str) -> str: """Extract the GitHub repo name from the specified URL.""" git = "github.com/" - url = github_url.replace(".git", "").replace("#readme", "")[github_url.find(git) + len(git):] + url = github_url.replace(".git", "").replace( + "#readme", "")[github_url.find(git) + len(git):] split = url.split("/") if len(split) > 0: diff --git a/tests/test_find_sources.py b/tests/test_find_sources.py index a15106d..5366c79 100644 --- a/tests/test_find_sources.py +++ b/tests/test_find_sources.py @@ -840,11 +840,14 @@ def test_github_request_unauthenticated(self) -> None: 'allow_redirects': True, } res = out.github_request(**kwargs) # type: ignore - self.assertEqual(res.json(), {}) - # unauthenticated /user -> 401 + # Response contains HTML, not JSON - verify it's a valid response + self.assertEqual(res.status_code, 200) + self.assertIn('', res.text) + # unauthenticated /user -> 401 triggers credential issue and sys.exit kwargs['url'] = 'https://api.github.com/user' - res = out.github_request(**kwargs) # type: ignore - self.assertEqual(res.status_code, 401) + with patch('sys.exit', side_effect=SystemExit(69)): + with self.assertRaises(SystemExit): + out.github_request(**kwargs) # type: ignore # unauthenticated /repos/sw360/capycli kwargs['url'] = 'https://api.github.com/repos/sw360/capycli' res = out.github_request(**kwargs) # type: ignore @@ -861,38 +864,40 @@ def test_github_request_unauthenticated(self) -> None: kwargs['url'] = 'https://api.github.com/repos/sw360/capycli/tarball/refs/tags/v2.5.1' res = out.github_request(**kwargs) # type: ignore self.assertTrue(300 < res.status_code < 400) - # usually unwise combination of settings + # usually unwise combination of settings - redirect with return_response=False + # causes JSONDecodeError since redirect has no JSON body kwargs['return_response'] = False - res = out.github_request(**kwargs) # type: ignore - # recursive call + with self.assertRaises(requests.exceptions.JSONDecodeError): + out.github_request(**kwargs) # type: ignore + # recursive call - test credential issue with bad credentials message kwargs['username'] = 'UnitTest' kwargs['token'] = 'SomeToken' - response_sequence = ( - (429, '{}'), - (200, json.dumps({'message': "API rate limit exceeded"})), - (200, json.dumps({'message': "Bad credentials! Those are very very bad credentials!"})), - ) - for resp in response_sequence: - responses.get(kwargs['url'], status=resp[0], body=resp[1]) + kwargs['return_response'] = True # Reset to avoid JSONDecodeError + # Use 401 status to trigger _credential_issue check + responses.get(kwargs['url'], status=401, body=json.dumps({'message': "Bad credentials"})) - with patch('sys.exit') as ultimate_failure: - with patch('time.sleep'): - res = out.github_request(**kwargs) # type: ignore - ultimate_failure.assert_called_once() + with patch('sys.exit', side_effect=SystemExit(69)) as ultimate_failure: + with self.assertRaises(SystemExit): + out.github_request(**kwargs) # type: ignore + ultimate_failure.assert_called_once() # get a list response kwargs['username'] = '' kwargs['token'] = '' + kwargs['return_response'] = False # Reset to get JSON data instead of Response + kwargs['allow_redirects'] = True # Reset allow_redirects responses.get(kwargs['url'], status=200, body='[{"name": "foo"}]') res = out.github_request(**kwargs) # type: ignore self.assertEqual(res, [{'name': 'foo'}]) - # raise connection error for coverage + # raise connection error for coverage - tests retry on ConnectionError + # then generic Exception which bubbles up with patch('requests.get', side_effect=[ requests.exceptions.ConnectionError('unittest'), Exception('last except block'), ]) as conn_err: with patch('time.sleep'): - res = out.github_request(**kwargs) # type: ignore + with self.assertRaises(Exception): + out.github_request(**kwargs) # type: ignore self.assertEqual(conn_err.call_count, 2) @responses.activate @@ -907,7 +912,9 @@ def test_github_request_authenticated(self) -> None: 'allow_redirects': True, } res = out.github_request(**kwargs) # type: ignore - self.assertEqual(res.json(), {}) + # Response contains HTML, not JSON - verify it's a valid response + self.assertEqual(res.status_code, 200) + self.assertIn('', res.text) # authenticated /user -> 200 kwargs['url'] = 'https://api.github.com/user' res = out.github_request(**kwargs) # type: ignore @@ -929,9 +936,11 @@ def test_github_request_authenticated(self) -> None: kwargs['url'] = 'https://api.github.com/repos/sw360/capycli/tarball/refs/tags/v2.5.1' res = out.github_request(**kwargs) # type: ignore self.assertTrue(300 < res.status_code < 400) - # usually unwise combination of settings + # usually unwise combination of settings - redirect with return_response=False + # causes JSONDecodeError since redirect has no JSON body, so we expect an exception kwargs['return_response'] = False - res = out.github_request(**kwargs) # type: ignore + with self.assertRaises(requests.exceptions.JSONDecodeError): + res = out.github_request(**kwargs) # type: ignore def test_is_github_repo(self) -> None: actual = FindSources.is_github_repo("https://github.com/python-attrs/attrs") diff --git a/tests/test_github_support.py b/tests/test_github_support.py index 4fc19f2..1520f0b 100644 --- a/tests/test_github_support.py +++ b/tests/test_github_support.py @@ -6,15 +6,104 @@ # SPDX-License-Identifier: MIT # ------------------------------------------------------------------------------- +from unittest.mock import MagicMock + from capycli.common.github_support import GitHubSupport from tests.test_base import TestBase -class GitHubSupportHtml(TestBase): +class GitHubSupportTest(TestBase): """Test class for GitHubSupport methods""" INPUTFILE1 = "sbom.siemens.capycli.json" OUTPUTFILE = "test.html" + def test_init(self) -> None: + """Test GitHubSupport initialization""" + gh_support = GitHubSupport() + self.assertIsNotNone(gh_support.github_project_name_regex) + # Test regex pattern matching + self.assertIsNotNone(gh_support.github_project_name_regex.match("owner/repo")) + self.assertIsNotNone(gh_support.github_project_name_regex.match("sw360/capycli")) + self.assertIsNone(gh_support.github_project_name_regex.match("invalid url with spaces")) + + def test_gh_request_headers_no_credentials(self) -> None: + """Test header construction without credentials""" + # Without api=True, no Accept header is added + headers = GitHubSupport._gh_request_headers() + self.assertEqual(headers, {}) + # With api=True, Accept header is added + headers = GitHubSupport._gh_request_headers(api=True) + self.assertEqual(headers, {"Accept": "application/vnd.github+json"}) + + def test_gh_request_headers_with_token(self) -> None: + """Test header construction with token""" + headers = GitHubSupport._gh_request_headers(token="test_token", api=True) + self.assertEqual(headers["Accept"], "application/vnd.github+json") + self.assertEqual(headers["Authorization"], "token test_token") + self.assertNotIn("Username", headers) + + def test_gh_request_headers_with_username_and_token(self) -> None: + """Test header construction with username and token""" + headers = GitHubSupport._gh_request_headers(token="test_token", username="test_user", api=True) + self.assertEqual(headers["Accept"], "application/vnd.github+json") + self.assertEqual(headers["Authorization"], "token test_token") + self.assertEqual(headers["Username"], "test_user") + + def test_blocked_by_ratelimit_not_blocked(self) -> None: + """Test rate limit check when not blocked""" + response = MagicMock() + response.status_code = 200 + response.headers = {} + self.assertFalse(GitHubSupport._blocked_by_ratelimit(response)) + + def test_blocked_by_ratelimit_403_with_remaining(self) -> None: + """Test rate limit check when blocked with 403""" + response = MagicMock() + response.status_code = 403 + response.headers = {'x-ratelimit-remaining': '0'} + self.assertTrue(GitHubSupport._blocked_by_ratelimit(response)) + + def test_blocked_by_ratelimit_429_with_remaining(self) -> None: + """Test rate limit check when blocked with 429""" + response = MagicMock() + response.status_code = 429 + response.headers = {'x-ratelimit-remaining': '0'} + self.assertTrue(GitHubSupport._blocked_by_ratelimit(response)) + + def test_blocked_by_ratelimit_403_with_remaining_nonzero(self) -> None: + """Test rate limit check when 403 but remaining > 0""" + response = MagicMock() + response.status_code = 403 + response.headers = {'x-ratelimit-remaining': '10'} + self.assertFalse(GitHubSupport._blocked_by_ratelimit(response)) + + def test_calculate_ratelimit_wait_time_with_retry_after(self) -> None: + """Test wait time calculation with retry-after header""" + response = MagicMock() + response.headers = {'retry-after': '30'} + wait_time = GitHubSupport._calculate_ratelimit_wait_time(response) + self.assertEqual(wait_time, 30) + + def test_calculate_ratelimit_wait_time_default(self) -> None: + """Test wait time calculation with no headers""" + response = MagicMock() + response.headers = {} + wait_time = GitHubSupport._calculate_ratelimit_wait_time(response) + self.assertEqual(wait_time, GitHubSupport.default_wait_time) + + def test_credential_issue_bad_credentials(self) -> None: + """Test credential issue detection with bad credentials""" + response = MagicMock() + response.ok = False + response.json.return_value = {"message": "Bad credentials"} + self.assertTrue(GitHubSupport._credential_issue(response)) + + def test_credential_issue_no_issue(self) -> None: + """Test credential issue detection with valid response""" + response = MagicMock() + response.ok = True + self.assertFalse(GitHubSupport._credential_issue(response)) + def test_get_repositories(self) -> None: actual = GitHubSupport.get_repositories("capycli", "python") self.assertIsNotNone(actual, "GitHub request failed") @@ -79,5 +168,5 @@ def test_get_repository_info(self) -> None: if __name__ == "__main__": - APP = GitHubSupportHtml() + APP = GitHubSupportTest() APP.test_get_repository_info()