Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 96 additions & 39 deletions capycli/common/github_support.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -------------------------------------------------------------------------------
# Copyright (c) 2025 Siemens
# Copyright (c) 2026 Siemens
# All Rights Reserved.
# Author: thomas.graf@siemens.com
# Author: thomas.graf@siemens.com, marvin.fette@siemens.com
#
# SPDX-License-Identifier: MIT
# -------------------------------------------------------------------------------
Expand All @@ -24,6 +24,10 @@

class GitHubSupport:
"""Support methods for accessing GitHub"""
default_wait_time = 60 # seconds
default_gh_api_timeout = 15 # seconds
last_request_error = False

def __init__(self) -> None:
self.github_project_name_regex = re.compile(r"^[a-zA-Z0-9-]+(/[a-zA-Z0-9-]+)*$")

Expand All @@ -32,58 +36,110 @@ def github_request(url: str, username: str = "", token: str = "",
return_response: bool = False,
allow_redirects: bool = True, # default in requests
) -> Any:
"""Helper method to perform GitHub API requests"""

api = True if url.startswith("https://api.github.com/") else False

try:
headers = {}
if token:
headers["Authorization"] = "token " + token
if username:
headers["Username"] = username
response = requests.get(url, headers=headers,
allow_redirects=allow_redirects)
if response.status_code == 429 \
or 'rate limit exceeded' in response.reason \
or 'API rate limit exceeded' in response.json().get('message', ''):
response = requests.get(url, headers=GitHubSupport._gh_request_headers(
token, username, api=api
),
allow_redirects=allow_redirects,
timeout=GitHubSupport.default_gh_api_timeout)

# Check for rate limit errors (403 Forbidden or 429 Too Many Requests)
if GitHubSupport._blocked_by_ratelimit(response):
wait_time = GitHubSupport._calculate_ratelimit_wait_time(response)
print(
Fore.LIGHTYELLOW_EX +
" Github API rate limit exceeded - wait 60s and retry ... " +
f" Github API rate limit exceeded - wait {wait_time}s and retry ... " +
Style.RESET_ALL)
time.sleep(60)
return GitHubSupport.github_request(url, username, token, return_response=return_response)
if response.json().get('message', '').startswith("Bad credentials"):
time.sleep(wait_time)
return GitHubSupport.github_request(
url, username, token, return_response=return_response,
allow_redirects=allow_redirects)

# Check for credential issues
if GitHubSupport._credential_issue(response):
print_red("Invalid GitHub credential provided - aborting!")
sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SERVICE)

except AttributeError as err:
# response.json() did not return a dictionary
if hasattr(err, 'name'):
name = err.name
else: # Python prior to 3.10
name = err.args[0].split("'")[3]
if not name == 'get':
raise
# Check for not found (404)
if response.status_code == 404:
print(
Fore.LIGHTYELLOW_EX +
f" Resource not found at {url} - could be intended or not ... " +
Style.RESET_ALL)
return response if return_response else response.json()

# Raise an exception for other HTTP errors
response.raise_for_status()

except requests.exceptions.JSONDecodeError:
response._content = b'{}'
except requests.exceptions.RequestException as ex:
if GitHubSupport.last_request_error:
print_red(
f" Persistent issues accessing {url} " + repr(ex) +
"\n Aborting after retried once!")
sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SERVICE)

GitHubSupport.last_request_error = True

except requests.exceptions.ConnectionError as ex:
print(
Fore.LIGHTYELLOW_EX +
f" Connection issues accessing {url} " + repr(ex) +
"\n Retrying in 60 seconds!" +
"\n Retrying using the same url." +
Style.RESET_ALL)
time.sleep(60)
return GitHubSupport.github_request(url, username, token, return_response=return_response)
return GitHubSupport.github_request(url, username, token,
return_response=return_response, allow_redirects=allow_redirects)

except Exception as ex:
print(
Fore.LIGHTYELLOW_EX +
" Error accessing GitHub: " + repr(ex) +
Style.RESET_ALL)
response = requests.Response()
response._content = \
b'{' + f'"exception": "{repr(ex)}"'.encode() + b'}'
# Reset the error flag on success or after handling exceptions
GitHubSupport.last_request_error = False
return response if return_response else response.json()

@staticmethod
def _gh_request_headers(token: str = "", username: str = "", api: bool = False) -> dict:
"""Helper method to construct headers for GitHub API requests."""
headers = {}
if api:
headers["Accept"] = "application/vnd.github+json"
if token:
headers["Authorization"] = "token " + token
if username:
headers["Username"] = username
return headers

@staticmethod
def _blocked_by_ratelimit(response: requests.Response) -> bool:
"""Check if the GitHub API response indicates that the rate limit has been exceeded."""
if not response.status_code == 403 and not response.status_code == 429:
return False
if 'x-ratelimit-remaining' in response.headers:
remaining = int(response.headers['x-ratelimit-remaining'])
return remaining == 0
return False

@staticmethod
def _calculate_ratelimit_wait_time(response: requests.Response) -> int:
"""Calculate the wait time until the GitHub API rate limit resets."""
if 'x-ratelimit-reset' in response.headers:
reset_time = int(response.headers['x-ratelimit-reset'])
current_time = int(time.time())
wait_time = reset_time - current_time
return max(wait_time, 0)
if 'retry-after' in response.headers:
return int(response.headers['retry-after'])
return GitHubSupport.default_wait_time

@staticmethod
def _credential_issue(response: requests.Response) -> bool:
"""Check if the response indicates a credential issue."""
if response.status_code == 401:
return True
if not response.ok:
message = response.json().get('message', '')
return "bad credentials" in message.lower()
return False

@staticmethod
def get_repositories(name: str, language: str, username: str = "", token: str = "") -> Any:
"""Query for GitHub repositories"""
Expand All @@ -95,7 +151,8 @@ def get_repositories(name: str, language: str, username: str = "", token: str =
def get_repo_name(github_url: str) -> str:
"""Extract the GitHub repo name from the specified URL."""
git = "github.com/"
url = github_url.replace(".git", "").replace("#readme", "")[github_url.find(git) + len(git):]
url = github_url.replace(".git", "").replace(
"#readme", "")[github_url.find(git) + len(git):]
split = url.split("/")

if len(split) > 0:
Expand Down
55 changes: 32 additions & 23 deletions tests/test_find_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,11 +840,14 @@ def test_github_request_unauthenticated(self) -> None:
'allow_redirects': True,
}
res = out.github_request(**kwargs) # type: ignore
self.assertEqual(res.json(), {})
# unauthenticated /user -> 401
# Response contains HTML, not JSON - verify it's a valid response
self.assertEqual(res.status_code, 200)
self.assertIn('<!DOCTYPE html>', res.text)
# unauthenticated /user -> 401 triggers credential issue and sys.exit
kwargs['url'] = 'https://api.github.com/user'
res = out.github_request(**kwargs) # type: ignore
self.assertEqual(res.status_code, 401)
with patch('sys.exit', side_effect=SystemExit(69)):
with self.assertRaises(SystemExit):
out.github_request(**kwargs) # type: ignore
# unauthenticated /repos/sw360/capycli
kwargs['url'] = 'https://api.github.com/repos/sw360/capycli'
res = out.github_request(**kwargs) # type: ignore
Expand All @@ -861,38 +864,40 @@ def test_github_request_unauthenticated(self) -> None:
kwargs['url'] = 'https://api.github.com/repos/sw360/capycli/tarball/refs/tags/v2.5.1'
res = out.github_request(**kwargs) # type: ignore
self.assertTrue(300 < res.status_code < 400)
# usually unwise combination of settings
# usually unwise combination of settings - redirect with return_response=False
# causes JSONDecodeError since redirect has no JSON body
kwargs['return_response'] = False
res = out.github_request(**kwargs) # type: ignore
# recursive call
with self.assertRaises(requests.exceptions.JSONDecodeError):
out.github_request(**kwargs) # type: ignore
# recursive call - test credential issue with bad credentials message
kwargs['username'] = 'UnitTest'
kwargs['token'] = 'SomeToken'
response_sequence = (
(429, '{}'),
(200, json.dumps({'message': "API rate limit exceeded"})),
(200, json.dumps({'message': "Bad credentials! Those are very very bad credentials!"})),
)
for resp in response_sequence:
responses.get(kwargs['url'], status=resp[0], body=resp[1])
kwargs['return_response'] = True # Reset to avoid JSONDecodeError
# Use 401 status to trigger _credential_issue check
responses.get(kwargs['url'], status=401, body=json.dumps({'message': "Bad credentials"}))

with patch('sys.exit') as ultimate_failure:
with patch('time.sleep'):
res = out.github_request(**kwargs) # type: ignore
ultimate_failure.assert_called_once()
with patch('sys.exit', side_effect=SystemExit(69)) as ultimate_failure:
with self.assertRaises(SystemExit):
out.github_request(**kwargs) # type: ignore
ultimate_failure.assert_called_once()

# get a list response
kwargs['username'] = ''
kwargs['token'] = ''
kwargs['return_response'] = False # Reset to get JSON data instead of Response
kwargs['allow_redirects'] = True # Reset allow_redirects
responses.get(kwargs['url'], status=200, body='[{"name": "foo"}]')
res = out.github_request(**kwargs) # type: ignore
self.assertEqual(res, [{'name': 'foo'}])
# raise connection error for coverage
# raise connection error for coverage - tests retry on ConnectionError
# then generic Exception which bubbles up
with patch('requests.get', side_effect=[
requests.exceptions.ConnectionError('unittest'),
Exception('last except block'),
]) as conn_err:
with patch('time.sleep'):
res = out.github_request(**kwargs) # type: ignore
with self.assertRaises(Exception):
out.github_request(**kwargs) # type: ignore
self.assertEqual(conn_err.call_count, 2)

@responses.activate
Expand All @@ -907,7 +912,9 @@ def test_github_request_authenticated(self) -> None:
'allow_redirects': True,
}
res = out.github_request(**kwargs) # type: ignore
self.assertEqual(res.json(), {})
# Response contains HTML, not JSON - verify it's a valid response
self.assertEqual(res.status_code, 200)
self.assertIn('<!DOCTYPE html>', res.text)
# authenticated /user -> 200
kwargs['url'] = 'https://api.github.com/user'
res = out.github_request(**kwargs) # type: ignore
Expand All @@ -929,9 +936,11 @@ def test_github_request_authenticated(self) -> None:
kwargs['url'] = 'https://api.github.com/repos/sw360/capycli/tarball/refs/tags/v2.5.1'
res = out.github_request(**kwargs) # type: ignore
self.assertTrue(300 < res.status_code < 400)
# usually unwise combination of settings
# usually unwise combination of settings - redirect with return_response=False
# causes JSONDecodeError since redirect has no JSON body, so we expect an exception
kwargs['return_response'] = False
res = out.github_request(**kwargs) # type: ignore
with self.assertRaises(requests.exceptions.JSONDecodeError):
res = out.github_request(**kwargs) # type: ignore

def test_is_github_repo(self) -> None:
actual = FindSources.is_github_repo("https://github.com/python-attrs/attrs")
Expand Down
93 changes: 91 additions & 2 deletions tests/test_github_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,104 @@
# SPDX-License-Identifier: MIT
# -------------------------------------------------------------------------------

from unittest.mock import MagicMock

from capycli.common.github_support import GitHubSupport
from tests.test_base import TestBase


class GitHubSupportHtml(TestBase):
class GitHubSupportTest(TestBase):
"""Test class for GitHubSupport methods"""
INPUTFILE1 = "sbom.siemens.capycli.json"
OUTPUTFILE = "test.html"

def test_init(self) -> None:
"""Test GitHubSupport initialization"""
gh_support = GitHubSupport()
self.assertIsNotNone(gh_support.github_project_name_regex)
# Test regex pattern matching
self.assertIsNotNone(gh_support.github_project_name_regex.match("owner/repo"))
self.assertIsNotNone(gh_support.github_project_name_regex.match("sw360/capycli"))
self.assertIsNone(gh_support.github_project_name_regex.match("invalid url with spaces"))

def test_gh_request_headers_no_credentials(self) -> None:
"""Test header construction without credentials"""
# Without api=True, no Accept header is added
headers = GitHubSupport._gh_request_headers()
self.assertEqual(headers, {})
# With api=True, Accept header is added
headers = GitHubSupport._gh_request_headers(api=True)
self.assertEqual(headers, {"Accept": "application/vnd.github+json"})

def test_gh_request_headers_with_token(self) -> None:
"""Test header construction with token"""
headers = GitHubSupport._gh_request_headers(token="test_token", api=True)
self.assertEqual(headers["Accept"], "application/vnd.github+json")
self.assertEqual(headers["Authorization"], "token test_token")
self.assertNotIn("Username", headers)

def test_gh_request_headers_with_username_and_token(self) -> None:
"""Test header construction with username and token"""
headers = GitHubSupport._gh_request_headers(token="test_token", username="test_user", api=True)
self.assertEqual(headers["Accept"], "application/vnd.github+json")
self.assertEqual(headers["Authorization"], "token test_token")
self.assertEqual(headers["Username"], "test_user")

def test_blocked_by_ratelimit_not_blocked(self) -> None:
"""Test rate limit check when not blocked"""
response = MagicMock()
response.status_code = 200
response.headers = {}
self.assertFalse(GitHubSupport._blocked_by_ratelimit(response))

def test_blocked_by_ratelimit_403_with_remaining(self) -> None:
"""Test rate limit check when blocked with 403"""
response = MagicMock()
response.status_code = 403
response.headers = {'x-ratelimit-remaining': '0'}
self.assertTrue(GitHubSupport._blocked_by_ratelimit(response))

def test_blocked_by_ratelimit_429_with_remaining(self) -> None:
"""Test rate limit check when blocked with 429"""
response = MagicMock()
response.status_code = 429
response.headers = {'x-ratelimit-remaining': '0'}
self.assertTrue(GitHubSupport._blocked_by_ratelimit(response))

def test_blocked_by_ratelimit_403_with_remaining_nonzero(self) -> None:
"""Test rate limit check when 403 but remaining > 0"""
response = MagicMock()
response.status_code = 403
response.headers = {'x-ratelimit-remaining': '10'}
self.assertFalse(GitHubSupport._blocked_by_ratelimit(response))

def test_calculate_ratelimit_wait_time_with_retry_after(self) -> None:
"""Test wait time calculation with retry-after header"""
response = MagicMock()
response.headers = {'retry-after': '30'}
wait_time = GitHubSupport._calculate_ratelimit_wait_time(response)
self.assertEqual(wait_time, 30)

def test_calculate_ratelimit_wait_time_default(self) -> None:
"""Test wait time calculation with no headers"""
response = MagicMock()
response.headers = {}
wait_time = GitHubSupport._calculate_ratelimit_wait_time(response)
self.assertEqual(wait_time, GitHubSupport.default_wait_time)

def test_credential_issue_bad_credentials(self) -> None:
"""Test credential issue detection with bad credentials"""
response = MagicMock()
response.ok = False
response.json.return_value = {"message": "Bad credentials"}
self.assertTrue(GitHubSupport._credential_issue(response))

def test_credential_issue_no_issue(self) -> None:
"""Test credential issue detection with valid response"""
response = MagicMock()
response.ok = True
self.assertFalse(GitHubSupport._credential_issue(response))

def test_get_repositories(self) -> None:
actual = GitHubSupport.get_repositories("capycli", "python")
self.assertIsNotNone(actual, "GitHub request failed")
Expand Down Expand Up @@ -79,5 +168,5 @@ def test_get_repository_info(self) -> None:


if __name__ == "__main__":
APP = GitHubSupportHtml()
APP = GitHubSupportTest()
APP.test_get_repository_info()
Loading