From 974a9af9714e81303c82f8e1c61eb930bf0fa3fc Mon Sep 17 00:00:00 2001 From: Stran Dutton Date: Tue, 23 Jun 2026 20:27:57 -0500 Subject: [PATCH 1/2] BED-8693: add exception handling and logging to API calls --- src/openhound_github/resources/enterprise.py | 414 ++--- .../resources/organization.py | 1328 ++++++++++------- tests/test_error_resilience.py | 189 +++ 3 files changed, 1240 insertions(+), 691 deletions(-) create mode 100644 tests/test_error_resilience.py diff --git a/src/openhound_github/resources/enterprise.py b/src/openhound_github/resources/enterprise.py index 09cec31..e53c5c2 100644 --- a/src/openhound_github/resources/enterprise.py +++ b/src/openhound_github/resources/enterprise.py @@ -1,3 +1,4 @@ +import logging from dataclasses import dataclass from dlt.sources.helpers.rest_client.client import RESTClient @@ -28,6 +29,8 @@ EnterpriseUser, ) +logger = logging.getLogger(__name__) + @dataclass class SourceContext: @@ -51,17 +54,24 @@ def enterprise(ctx: SourceContext): "variables": {"slug": ctx.enterprise_name, "after": None}, } - for page_data in ctx.client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - page_enterprise = page_data[0].get("enterprise") - - if page_enterprise: - yield page_enterprise + try: + for page_data in ctx.client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + page_enterprise = page_data[0].get("enterprise") + + if page_enterprise: + yield page_enterprise + except Exception as e: + logger.error( + f"Error in resource 'enterprise' processing enterprise '{ctx.enterprise_name}': {e}", + extra={"resource": "enterprise", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -89,24 +99,31 @@ def enterprise_members(enterprise_data: Enterprise, ctx: SourceContext): "query": ENTERPRISE_MEMBERS_QUERY, "variables": {"slug": ctx.enterprise_name, "count": 100, "after": None}, } - for page_data in ctx.client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for enterprise_object in page_data: - es_data = enterprise_object.get("enterprise", {}) - members = es_data.get("members", {}) - for edge in members.get("edges", []): - node = edge.get("node") - if node: - yield { - **node, - "enterprise_node_id": enterprise_data.id, - "enterprise_slug": ctx.enterprise_name, - } + try: + for page_data in ctx.client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for enterprise_object in page_data: + es_data = enterprise_object.get("enterprise", {}) + members = es_data.get("members", {}) + for edge in members.get("edges", []): + node = edge.get("node") + if node: + yield { + **node, + "enterprise_node_id": enterprise_data.id, + "enterprise_slug": ctx.enterprise_name, + } + except Exception as e: + logger.error( + f"Error in resource 'enterprise_members' processing enterprise '{ctx.enterprise_name}': {e}", + extra={"resource": "enterprise_members", "phase": "resource_iteration"}, + ) + return @app.transformer(name="enterprise_users", columns=EnterpriseUser, parallelized=True) @@ -148,15 +165,22 @@ def enterprise_managed_users(base_user: BaseUser, ctx: SourceContext): @app.transformer(name="enterprise_teams", columns=EnterpriseTeam, parallelized=True) def enterprise_teams(enterprise_data: Enterprise, ctx: SourceContext): - for page in ctx.client.paginate( - f"/enterprises/{ctx.enterprise_name}/teams", params={"per_page": 100} - ): - for team in page: - yield { - **team, - "enterprise_node_id": enterprise_data.id, - "enterprise_slug": ctx.enterprise_name, - } + try: + for page in ctx.client.paginate( + f"/enterprises/{ctx.enterprise_name}/teams", params={"per_page": 100} + ): + for team in page: + yield { + **team, + "enterprise_node_id": enterprise_data.id, + "enterprise_slug": ctx.enterprise_name, + } + except Exception as e: + logger.error( + f"Error in resource 'enterprise_teams' processing enterprise '{enterprise_data.id}': {e}", + extra={"resource": "enterprise_teams", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -177,20 +201,27 @@ def enterprise_team_roles(team: EnterpriseTeam): ) def enterprise_team_members(team: EnterpriseTeam, ctx: SourceContext): - for page in ctx.client.paginate( - f"/enterprises/{ctx.enterprise_name}/teams/{team.id}/memberships", - params={"per_page": 100}, - ): - for member in page: - node_id = member.get("node_id") or member.get("user", {}).get("node_id") - if node_id: - yield { - **member, - "node_id": node_id, - "team_id": team.id, - "enterprise_node_id": team.enterprise_node_id, - "enterprise_slug": team.enterprise_slug, - } + try: + for page in ctx.client.paginate( + f"/enterprises/{ctx.enterprise_name}/teams/{team.id}/memberships", + params={"per_page": 100}, + ): + for member in page: + node_id = member.get("node_id") or member.get("user", {}).get("node_id") + if node_id: + yield { + **member, + "node_id": node_id, + "team_id": team.id, + "enterprise_node_id": team.enterprise_node_id, + "enterprise_slug": team.enterprise_slug, + } + except Exception as e: + logger.error( + f"Error in resource 'enterprise_team_members' processing team '{team.id}': {e}", + extra={"resource": "enterprise_team_members", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -200,45 +231,59 @@ def enterprise_team_members(team: EnterpriseTeam, ctx: SourceContext): ) def enterprise_team_organizations(team: EnterpriseTeam, ctx: SourceContext): - for page in ctx.client.paginate( - f"/enterprises/{ctx.enterprise_name}/teams/{team.id}/organizations", - params={"per_page": 100}, - ): - for org in page: - node_id = org.get("node_id") or org.get("id") - if node_id: - yield { - **org, - "node_id": node_id, - "team_id": team.id, - "projected_slug": team.slug, - "enterprise_node_id": team.enterprise_node_id, - "enterprise_slug": team.enterprise_slug, - } + try: + for page in ctx.client.paginate( + f"/enterprises/{ctx.enterprise_name}/teams/{team.id}/organizations", + params={"per_page": 100}, + ): + for org in page: + node_id = org.get("node_id") or org.get("id") + if node_id: + yield { + **org, + "node_id": node_id, + "team_id": team.id, + "projected_slug": team.slug, + "enterprise_node_id": team.enterprise_node_id, + "enterprise_slug": team.enterprise_slug, + } + except Exception as e: + logger.error( + f"Error in resource 'enterprise_team_organizations' processing team '{team.id}': {e}", + extra={"resource": "enterprise_team_organizations", "phase": "resource_iteration"}, + ) + return @app.transformer(name="enterprise_roles", columns=EnterpriseRole, parallelized=True) def enterprise_roles(enterprise_data: Enterprise, ctx: SourceContext): - result = ctx.client.get( - f"/enterprises/{ctx.enterprise_name}/enterprise-roles" - ).json() + try: + result = ctx.client.get( + f"/enterprises/{ctx.enterprise_name}/enterprise-roles" + ).json() + + for role in result.get("roles", []): + yield { + **role, + "enterprise_node_id": enterprise_data.id, + "enterprise_slug": ctx.enterprise_name, + } - for role in result.get("roles", []): yield { - **role, + "id": "owners", + "name": "owners", + "description": "Enterprise administrators discovered from ownerInfo.admins", + "source": "Default", + "permissions": [], "enterprise_node_id": enterprise_data.id, "enterprise_slug": ctx.enterprise_name, } - - yield { - "id": "owners", - "name": "owners", - "description": "Enterprise administrators discovered from ownerInfo.admins", - "source": "Default", - "permissions": [], - "enterprise_node_id": enterprise_data.id, - "enterprise_slug": ctx.enterprise_name, - } + except Exception as e: + logger.error( + f"Error in resource 'enterprise_roles' processing enterprise '{ctx.enterprise_name}': {e}", + extra={"resource": "enterprise_roles", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -248,18 +293,25 @@ def enterprise_role_teams(role: EnterpriseRole, ctx: SourceContext): if role.id == "owners": return - for page in ctx.client.paginate( - f"/enterprises/{ctx.enterprise_name}/enterprise-roles/{role.id}/teams", - params={"per_page": 100}, - ): - for team in page: - if team.get("id"): - yield { - **team, - "role_id": role.id, - "enterprise_node_id": role.enterprise_node_id, - "enterprise_slug": role.enterprise_slug, - } + try: + for page in ctx.client.paginate( + f"/enterprises/{ctx.enterprise_name}/enterprise-roles/{role.id}/teams", + params={"per_page": 100}, + ): + for team in page: + if team.get("id"): + yield { + **team, + "role_id": role.id, + "enterprise_node_id": role.enterprise_node_id, + "enterprise_slug": role.enterprise_slug, + } + except Exception as e: + logger.error( + f"Error in resource 'enterprise_role_teams' processing role '{role.id}': {e}", + extra={"resource": "enterprise_role_teams", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -269,18 +321,25 @@ def enterprise_role_users(role: EnterpriseRole, ctx: SourceContext): if role.id == "owners": return - for page in ctx.client.paginate( - f"/enterprises/{ctx.enterprise_name}/enterprise-roles/{role.id}/users", - params={"per_page": 100}, - ): - for user in page: - if user.get("node_id"): - yield { - **user, - "role_id": role.id, - "enterprise_node_id": role.enterprise_node_id, - "enterprise_slug": role.enterprise_slug, - } + try: + for page in ctx.client.paginate( + f"/enterprises/{ctx.enterprise_name}/enterprise-roles/{role.id}/users", + params={"per_page": 100}, + ): + for user in page: + if user.get("node_id"): + yield { + **user, + "role_id": role.id, + "enterprise_node_id": role.enterprise_node_id, + "enterprise_slug": role.enterprise_slug, + } + except Exception as e: + logger.error( + f"Error in resource 'enterprise_role_users' processing role '{role.id}': {e}", + extra={"resource": "enterprise_role_users", "phase": "resource_iteration"}, + ) + return @app.transformer(name="enterprise_admins", columns=EnterpriseAdmin, parallelized=True) @@ -296,27 +355,34 @@ def enterprise_admins(enterprise_data: Enterprise, ctx: SourceContext): "query": ENTERPRISE_ADMINS_QUERY, "variables": {"slug": ctx.enterprise_name, "count": 100, "after": None}, } - for page_data in ctx.client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for enterprise_object in page_data: - es_data = enterprise_object.get("enterprise", {}) - owner_info = es_data.get("ownerInfo") or {} - for edge in (owner_info.get("admins") or {}).get("edges") or []: - node = edge.get("node") - if node and node.get("id"): - yield { - "node_id": node["id"], - "login": node.get("login"), - "assignment": "direct", - "role_id": "owners", - "enterprise_node_id": enterprise_data.id, - "enterprise_slug": ctx.enterprise_name, - } + try: + for page_data in ctx.client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for enterprise_object in page_data: + es_data = enterprise_object.get("enterprise", {}) + owner_info = es_data.get("ownerInfo") or {} + for edge in (owner_info.get("admins") or {}).get("edges") or []: + node = edge.get("node") + if node and node.get("id"): + yield { + "node_id": node["id"], + "login": node.get("login"), + "assignment": "direct", + "role_id": "owners", + "enterprise_node_id": enterprise_data.id, + "enterprise_slug": ctx.enterprise_name, + } + except Exception as e: + logger.error( + f"Error in resource 'enterprise_admins' processing enterprise '{ctx.enterprise_name}': {e}", + extra={"resource": "enterprise_admins", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -335,23 +401,30 @@ def enterprise_saml_provider(enterprise_data: Enterprise, ctx: SourceContext): "variables": {"slug": ctx.enterprise_name, "count": 1, "after": None}, } - for page_data in ctx.client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for enterprise_object in page_data: - es_data = enterprise_object.get("enterprise", {}) - saml_provider = (es_data.get("ownerInfo") or {}).get("samlIdentityProvider") - if not saml_provider: - return - yield { - **{k: v for k, v in saml_provider.items() if k != "externalIdentities"}, - "enterprise_node_id": enterprise_data.id, - "enterprise_slug": ctx.enterprise_name, - } + try: + for page_data in ctx.client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for enterprise_object in page_data: + es_data = enterprise_object.get("enterprise", {}) + saml_provider = (es_data.get("ownerInfo") or {}).get("samlIdentityProvider") + if not saml_provider: + return + yield { + **{k: v for k, v in saml_provider.items() if k != "externalIdentities"}, + "enterprise_node_id": enterprise_data.id, + "enterprise_slug": ctx.enterprise_name, + } + except Exception as e: + logger.error( + f"Error in resource 'enterprise_saml_provider' processing enterprise '{ctx.enterprise_name}': {e}", + extra={"resource": "enterprise_saml_provider", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -374,29 +447,36 @@ def enterprise_external_identities( "variables": {"slug": ctx.enterprise_name, "count": 100, "after": None}, } - for page_data in ctx.client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for enterprise_object in page_data: - es_data = enterprise_object.get("enterprise", {}) - page_provider = (es_data.get("ownerInfo") or {}).get("samlIdentityProvider") - if not page_provider: - return - for identity in (page_provider.get("externalIdentities") or {}).get( - "nodes" - ) or []: - yield { - **identity, - "saml_provider_id": saml_provider.id, - "saml_provider_issuer": saml_provider.issuer, - "saml_provider_sso_url": saml_provider.sso_url, - "enterprise_node_id": saml_provider.enterprise_node_id, - "enterprise_slug": saml_provider.enterprise_slug, - } + try: + for page_data in ctx.client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for enterprise_object in page_data: + es_data = enterprise_object.get("enterprise", {}) + page_provider = (es_data.get("ownerInfo") or {}).get("samlIdentityProvider") + if not page_provider: + return + for identity in (page_provider.get("externalIdentities") or {}).get( + "nodes" + ) or []: + yield { + **identity, + "saml_provider_id": saml_provider.id, + "saml_provider_issuer": saml_provider.issuer, + "saml_provider_sso_url": saml_provider.sso_url, + "enterprise_node_id": saml_provider.enterprise_node_id, + "enterprise_slug": saml_provider.enterprise_slug, + } + except Exception as e: + logger.error( + f"Error in resource 'enterprise_external_identities' processing saml_provider '{saml_provider.id}': {e}", + extra={"resource": "enterprise_external_identities", "phase": "resource_iteration"}, + ) + return def enterprise_resources(ctx: SourceContext): diff --git a/src/openhound_github/resources/organization.py b/src/openhound_github/resources/organization.py index 12996d6..b856ae4 100644 --- a/src/openhound_github/resources/organization.py +++ b/src/openhound_github/resources/organization.py @@ -1,5 +1,6 @@ import base64 import binascii +import logging from collections.abc import Iterable from dataclasses import dataclass, field from datetime import datetime @@ -73,6 +74,8 @@ from openhound_github.models.repo_role_assignment import TEAM_PERMISSION_MAP from openhound_github.models.repository_role import DEFAULT_REPO_ROLES +logger = logging.getLogger(__name__) + @dataclass class OrgContext: @@ -233,26 +236,33 @@ def organizations(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - org_data = client.get(f"/orgs/{org_name}").json() - - actions = _actions_permissions(ctx, client, org_name) - self_hosted_runners = _runner_permissions(ctx, client, org_name) - workflow_perms = _workflow_permissions(ctx, client, org_name) - - org_data["actions_enabled_repositories"] = actions.get("enabled_repositories") - org_data["actions_allowed_actions"] = actions.get("allowed_actions") - org_data["actions_sha_pinning_required"] = actions.get("sha_pinning_required") - org_data["self_hosted_runners_enabled_repositories"] = self_hosted_runners.get( - "enabled_repositories" - ) - org_data["default_workflow_permissions"] = workflow_perms.get( - "default_workflow_permissions" - ) - org_data["can_approve_pull_request_reviews"] = workflow_perms.get( - "can_approve_pull_request_reviews" - ) + try: + org_data = client.get(f"/orgs/{org_name}").json() + + actions = _actions_permissions(ctx, client, org_name) + self_hosted_runners = _runner_permissions(ctx, client, org_name) + workflow_perms = _workflow_permissions(ctx, client, org_name) + + org_data["actions_enabled_repositories"] = actions.get("enabled_repositories") + org_data["actions_allowed_actions"] = actions.get("allowed_actions") + org_data["actions_sha_pinning_required"] = actions.get("sha_pinning_required") + org_data["self_hosted_runners_enabled_repositories"] = self_hosted_runners.get( + "enabled_repositories" + ) + org_data["default_workflow_permissions"] = workflow_perms.get( + "default_workflow_permissions" + ) + org_data["can_approve_pull_request_reviews"] = workflow_perms.get( + "can_approve_pull_request_reviews" + ) - yield org_data + yield org_data + except Exception as e: + logger.error( + f"Error in resource 'organizations' processing organization '{org_name}': {e}", + extra={"resource": "organizations", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="org_roles", columns=OrgRole, parallelized=True) @@ -293,16 +303,23 @@ def org_roles(org: Organization, ctx: SourceContext): } client = _client_for_org(ctx, org.login) - for page in client.paginate( - f"/orgs/{org.login}/organization-roles", params={"per_page": 100} - ): - for role in page: - yield { - **role, - "type": "custom", - "org_node_id": org.node_id, - "org_login": org.login, - } + try: + for page in client.paginate( + f"/orgs/{org.login}/organization-roles", params={"per_page": 100} + ): + for role in page: + yield { + **role, + "type": "custom", + "org_node_id": org.node_id, + "org_login": org.login, + } + except Exception as e: + logger.error( + f"Error in resource 'org_roles' processing organization '{org.login}': {e}", + extra={"resource": "org_roles", "phase": "resource_iteration"}, + ) + return @app.transformer(name="org_role_teams", columns=OrgRoleTeam, parallelized=True) @@ -321,17 +338,24 @@ def org_role_teams(role: OrgRole, ctx: SourceContext): if role.type == "custom": client = _client_for_org(ctx, role.org_login) - for page in client.paginate( - f"/orgs/{role.org_login}/organization-roles/{role.id}/teams" - ): - for team in page: - yield { - "org_role_id": role.id, - "org_role_name": role.name, - "org_node_id": role.org_node_id, - "org_login": role.org_login, - **team, - } + try: + for page in client.paginate( + f"/orgs/{role.org_login}/organization-roles/{role.id}/teams" + ): + for team in page: + yield { + "org_role_id": role.id, + "org_role_name": role.name, + "org_node_id": role.org_node_id, + "org_login": role.org_login, + **team, + } + except Exception as e: + logger.error( + f"Error in resource 'org_role_teams' processing role '{role.id}': {e}", + extra={"resource": "org_role_teams", "phase": "resource_iteration"}, + ) + return @app.transformer(name="org_role_members", columns=OrgRoleMember, parallelized=True) @@ -349,17 +373,24 @@ def org_role_members(role: OrgRole, ctx: SourceContext): """ if role.type == "custom": client = _client_for_org(ctx, role.org_login) - for page in client.paginate( - f"/orgs/{role.org_login}/organization-roles/{role.id}/users" - ): - for user in page: - yield { - **user, - "org_role_name": role.name, - "org_role_id": role.id, - "org_node_id": role.org_node_id, - "org_login": role.org_login, - } + try: + for page in client.paginate( + f"/orgs/{role.org_login}/organization-roles/{role.id}/users" + ): + for user in page: + yield { + **user, + "org_role_name": role.name, + "org_role_id": role.id, + "org_node_id": role.org_node_id, + "org_login": role.org_login, + } + except Exception as e: + logger.error( + f"Error in resource 'org_role_members' processing role '{role.id}': {e}", + extra={"resource": "org_role_members", "phase": "resource_iteration"}, + ) + return @app.resource(name="app_installations", columns=AppInstallation, parallelized=True) @@ -375,9 +406,16 @@ def app_installations(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate(f"/orgs/{org_name}/installations"): - for item in page: - yield {**item, "org_login": org_name} + try: + for page in client.paginate(f"/orgs/{org_name}/installations"): + for item in page: + yield {**item, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'app_installations' processing organization '{org_name}': {e}", + extra={"resource": "app_installations", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="applications", columns=App, parallelized=True) @@ -395,10 +433,17 @@ def applications(app_install: AppInstallation, ctx: SourceContext): if app_install.id: client = _client_for_org(ctx, app_install.org_login) app_slug = str(app_slug) - if app_slug not in ctx.app_cache: - with ctx.cache_lock: - if app_slug not in ctx.app_cache: - ctx.app_cache[app_slug] = client.get(f"/apps/{app_slug}").json() + try: + if app_slug not in ctx.app_cache: + with ctx.cache_lock: + if app_slug not in ctx.app_cache: + ctx.app_cache[app_slug] = client.get(f"/apps/{app_slug}").json() + except Exception as e: + logger.error( + f"Error in resource 'applications' processing app '{app_slug}': {e}", + extra={"resource": "applications", "phase": "resource_iteration"}, + ) + return app_data = ctx.app_cache[app_slug] if app_data.get("node_id"): yield {**app_data, "slug": app_slug, "org_login": app_install.org_login} @@ -421,27 +466,34 @@ def users(ctx: SourceContext) -> Iterator[dict[str, Any]]: for org in ctx.organizations: org_name = org.org_name client = org.client - paginator = GraphQLCursorPaginator( - page_info_path="data.organization.membersWithRole.pageInfo", - cursor_variable="after", - cursor_field="endCursor", - has_next_field="hasNextPage", - ) - data = { - "query": MEMBERS_WITH_ROLE_QUERY, - "variables": {"login": org_name, "count": 100, "after": None}, - } + try: + paginator = GraphQLCursorPaginator( + page_info_path="data.organization.membersWithRole.pageInfo", + cursor_variable="after", + cursor_field="endCursor", + has_next_field="hasNextPage", + ) + data = { + "query": MEMBERS_WITH_ROLE_QUERY, + "variables": {"login": org_name, "count": 100, "after": None}, + } - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for edge in page_data[0]["organization"]["membersWithRole"]["edges"]: - node = edge.get("node", {}) - yield {**node, **edge, "org_login": org_name} + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for edge in page_data[0]["organization"]["membersWithRole"]["edges"]: + node = edge.get("node", {}) + yield {**node, **edge, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'users' processing organization '{org_name}': {e}", + extra={"resource": "users", "phase": "resource_iteration"}, + ) + continue @app.resource(name="teams", columns=Team, parallelized=True) @@ -465,26 +517,33 @@ def teams(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - paginator = GraphQLCursorPaginator( - page_info_path="data.organization.teams.pageInfo", - cursor_variable="after", - cursor_field="endCursor", - has_next_field="hasNextPage", - ) - data = { - "query": TEAMS_QUERY, - "variables": {"login": org_name, "count": 100, "after": None}, - } - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - teams_data = page_data[0]["organization"]["teams"] - for team in teams_data["nodes"]: - yield {**team, "org_login": org_name} + try: + paginator = GraphQLCursorPaginator( + page_info_path="data.organization.teams.pageInfo", + cursor_variable="after", + cursor_field="endCursor", + has_next_field="hasNextPage", + ) + data = { + "query": TEAMS_QUERY, + "variables": {"login": org_name, "count": 100, "after": None}, + } + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + teams_data = page_data[0]["organization"]["teams"] + for team in teams_data["nodes"]: + yield {**team, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'teams' processing organization '{org_name}': {e}", + extra={"resource": "teams", "phase": "resource_iteration"}, + ) + continue @app.resource( @@ -498,12 +557,19 @@ def projected_enterprise_teams(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/teams", params={"per_page": 100} - ): - for team in page: - if str(team.get("slug", "")).startswith("ent:") and team.get("node_id"): - yield {**team, "org_login": org_name} + try: + for page in client.paginate( + f"/orgs/{org_name}/teams", params={"per_page": 100} + ): + for team in page: + if str(team.get("slug", "")).startswith("ent:") and team.get("node_id"): + yield {**team, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'projected_enterprise_teams' processing organization '{org_name}': {e}", + extra={"resource": "projected_enterprise_teams", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="team_roles", columns=TeamRole, parallelized=True) @@ -571,21 +637,28 @@ def team_members(team: Team, ctx: SourceContext): "slug": team.slug, }, } - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for member in page_data[0]["organization"]["team"]["members"]["edges"]: - yield { - "team_id": team.id, - "id": member["node"]["id"], - "login": member["node"]["login"], - "role": member["role"], - "org_login": team.org_login, - } + try: + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for member in page_data[0]["organization"]["team"]["members"]["edges"]: + yield { + "team_id": team.id, + "id": member["node"]["id"], + "login": member["node"]["login"], + "role": member["role"], + "org_login": team.org_login, + } + except Exception as e: + logger.error( + f"Error in resource 'team_members' processing team '{team.slug}': {e}", + extra={"resource": "team_members", "phase": "resource_iteration"}, + ) + return @app.resource(name="actions_permissions", columns=ActionPermission, parallelized=True) @@ -593,11 +666,18 @@ def actions_permissions(ctx): for org in ctx.organizations: org_name = org.org_name client = org.client - actions = _actions_permissions(ctx, client, org_name) - yield { - **actions, - "org_login": org_name, - } + try: + actions = _actions_permissions(ctx, client, org_name) + yield { + **actions, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'actions_permissions' processing organization '{org_name}': {e}", + extra={"resource": "actions_permissions", "phase": "resource_iteration"}, + ) + continue @app.resource(name="repositories", columns=Repository, parallelized=True) @@ -613,53 +693,60 @@ def repositories(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - actions = _actions_permissions(ctx, client, org_name) - runner_settings = _runner_permissions(ctx, client, org_name) - - enabled_repo_ids: set[str] | None = None - if actions.get("enabled_repositories") == "selected": - enabled_repo_ids = set() - for page in client.paginate( - f"/orgs/{org_name}/actions/permissions/repositories", - params={"per_page": 100}, - data_selector="repositories", - ): - enabled_repo_ids.update( - repo["node_id"] for repo in page if repo.get("node_id") - ) + try: + actions = _actions_permissions(ctx, client, org_name) + runner_settings = _runner_permissions(ctx, client, org_name) + + enabled_repo_ids: set[str] | None = None + if actions.get("enabled_repositories") == "selected": + enabled_repo_ids = set() + for page in client.paginate( + f"/orgs/{org_name}/actions/permissions/repositories", + params={"per_page": 100}, + data_selector="repositories", + ): + enabled_repo_ids.update( + repo["node_id"] for repo in page if repo.get("node_id") + ) + + runner_enabled_repo_ids: set[str] | None = None + if runner_settings.get("enabled_repositories") == "selected": + runner_enabled_repo_ids = set() + for page in client.paginate( + f"/orgs/{org_name}/actions/permissions/self-hosted-runners/repositories", + params={"per_page": 100}, + data_selector="repositories", + ): + runner_enabled_repo_ids.update( + repo["node_id"] for repo in page if repo.get("node_id") + ) - runner_enabled_repo_ids: set[str] | None = None - if runner_settings.get("enabled_repositories") == "selected": - runner_enabled_repo_ids = set() for page in client.paginate( - f"/orgs/{org_name}/actions/permissions/self-hosted-runners/repositories", - params={"per_page": 100}, - data_selector="repositories", + f"/orgs/{org_name}/repos", params={"per_page": 100} ): - runner_enabled_repo_ids.update( - repo["node_id"] for repo in page if repo.get("node_id") - ) - - for page in client.paginate( - f"/orgs/{org_name}/repos", params={"per_page": 100} - ): - for repo in page: - repo_node_id = repo.get("node_id") - actions_enabled = actions.get("enabled_repositories") == "all" or ( - enabled_repo_ids is not None and repo_node_id in enabled_repo_ids - ) - self_hosted_runners_enabled = runner_settings.get( - "enabled_repositories" - ) == "all" or ( - runner_enabled_repo_ids is not None - and repo_node_id in runner_enabled_repo_ids - ) - yield { - **repo, - "actions_enabled": actions_enabled, - "self_hosted_runners_enabled": self_hosted_runners_enabled, - "org_login": org_name, - } + for repo in page: + repo_node_id = repo.get("node_id") + actions_enabled = actions.get("enabled_repositories") == "all" or ( + enabled_repo_ids is not None and repo_node_id in enabled_repo_ids + ) + self_hosted_runners_enabled = runner_settings.get( + "enabled_repositories" + ) == "all" or ( + runner_enabled_repo_ids is not None + and repo_node_id in runner_enabled_repo_ids + ) + yield { + **repo, + "actions_enabled": actions_enabled, + "self_hosted_runners_enabled": self_hosted_runners_enabled, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'repositories' processing organization '{org_name}': {e}", + extra={"resource": "repositories", "phase": "resource_iteration"}, + ) + continue @app.transformer( @@ -695,23 +782,30 @@ def repo_role_assignments( client = _client_for_org(ctx, repo.org_login) - for collab_page in client.paginate( - f"/repos/{repo.org_login}/{repo_name}/collaborators", - params={"affiliation": "direct", "per_page": 100}, - ): - for collaborator in collab_page: - role = collaborator.get("role_name", "") - custom_role = custom_roles.get(role) - yield { - **collaborator, - "org_login": repo.org_login, - "assignee_type": "user", - "repo_node_id": repo_node_id, - "repo_name": repo_name, - "role_name": role, - "base_role": custom_role.base_role if custom_role else None, - "role_permissions": custom_role.permissions if custom_role else [], - } + try: + for collab_page in client.paginate( + f"/repos/{repo.org_login}/{repo_name}/collaborators", + params={"affiliation": "direct", "per_page": 100}, + ): + for collaborator in collab_page: + role = collaborator.get("role_name", "") + custom_role = custom_roles.get(role) + yield { + **collaborator, + "org_login": repo.org_login, + "assignee_type": "user", + "repo_node_id": repo_node_id, + "repo_name": repo_name, + "role_name": role, + "base_role": custom_role.base_role if custom_role else None, + "role_permissions": custom_role.permissions if custom_role else [], + } + except Exception as e: + logger.error( + f"Error in resource 'repo_role_assignments' processing repository '{repo.full_name}': {e}", + extra={"resource": "repo_role_assignments", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -735,25 +829,32 @@ def team_repo_role_assignments( if role.get("org_login") == team.org_login } client = _client_for_org(ctx, team.org_login) - for repo_page in client.paginate( - f"/orgs/{team.org_login}/teams/{team.slug}/repos", - params={"per_page": 100}, - ): - for repo in repo_page: - role = _repo_permission_role(repo) - custom_role = custom_roles.get(role) - yield { - "id": team.database_id or 0, - "node_id": team.node_id, - "type": "Team", - "assignee_type": "team", - "repo_node_id": repo["node_id"], - "org_login": team.org_login, - "repo_name": repo["name"], - "role_name": role, - "base_role": custom_role.base_role if custom_role else None, - "role_permissions": custom_role.permissions if custom_role else [], - } + try: + for repo_page in client.paginate( + f"/orgs/{team.org_login}/teams/{team.slug}/repos", + params={"per_page": 100}, + ): + for repo in repo_page: + role = _repo_permission_role(repo) + custom_role = custom_roles.get(role) + yield { + "id": team.database_id or 0, + "node_id": team.node_id, + "type": "Team", + "assignee_type": "team", + "repo_node_id": repo["node_id"], + "org_login": team.org_login, + "repo_name": repo["name"], + "role_name": role, + "base_role": custom_role.base_role if custom_role else None, + "role_permissions": custom_role.permissions if custom_role else [], + } + except Exception as e: + logger.error( + f"Error in resource 'team_repo_role_assignments' processing team '{team.slug}': {e}", + extra={"resource": "team_repo_role_assignments", "phase": "resource_iteration"}, + ) + return @app.resource(name="repository_roles_base", parallelized=True, columns=BaseRepoRole) @@ -769,14 +870,21 @@ def repository_roles_base(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/custom-repository-roles", params={"per_page": 100} - ): - for item in page: - yield { - **item, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/custom-repository-roles", params={"per_page": 100} + ): + for item in page: + yield { + **item, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'repository_roles_base' processing organization '{org_name}': {e}", + extra={"resource": "repository_roles_base", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="repo_roles", columns=RepoRole, parallelized=True) @@ -842,27 +950,34 @@ def repositories_graphql(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - paginator = GraphQLCursorPaginator( - page_info_path="data.organization.repositories.pageInfo", - cursor_variable="after", - cursor_field="endCursor", - has_next_field="hasNextPage", - ) - data = { - "query": REPO_REFS_QUERY, - "variables": {"login": org_name, "count": 100, "after": None}, - } + try: + paginator = GraphQLCursorPaginator( + page_info_path="data.organization.repositories.pageInfo", + cursor_variable="after", + cursor_field="endCursor", + has_next_field="hasNextPage", + ) + data = { + "query": REPO_REFS_QUERY, + "variables": {"login": org_name, "count": 100, "after": None}, + } - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - repos_page = page_data[0]["organization"]["repositories"] - for repo in repos_page["nodes"]: - yield {**repo, "org_login": org_name} + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + repos_page = page_data[0]["organization"]["repositories"] + for repo in repos_page["nodes"]: + yield {**repo, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'repositories_graphql' processing organization '{org_name}': {e}", + extra={"resource": "repositories_graphql", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="branches", columns=Branch, parallelized=True) @@ -906,20 +1021,27 @@ def branches(repository: RepositoryQL, ctx: SourceContext): }, } - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for branch in page_data[0]["repository"]["refs"]["nodes"]: - yield { - **branch, - "repository_node_id": repository.id, - "repository_name": repository.name, - "org_login": repository.org_login, - } + try: + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for branch in page_data[0]["repository"]["refs"]["nodes"]: + yield { + **branch, + "repository_node_id": repository.id, + "repository_name": repository.name, + "org_login": repository.org_login, + } + except Exception as e: + logger.error( + f"Error in resource 'branches' processing repository '{repository.org_login}/{repository.name}': {e}", + extra={"resource": "branches", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -950,27 +1072,34 @@ def branch_protection_rules(repository: RepositoryQL, ctx: SourceContext): rule_ids_list = list(rule_ids_seen) client = _client_for_org(ctx, repository.org_login) - for i in range(0, len(rule_ids_list), 100): - rules_chunk = rule_ids_list[i : i + 100] - if rules_chunk: - data = {"query": PROTECTION_RULES_QUERY, "variables": {"ids": rules_chunk}} - response = client.post("/graphql", json=data).json() - for rule in response["data"].get("nodes", []): - # GitHub can return null actors for deleted or inaccessible allowance actors. - for allowance_key in ("bypassPullRequestAllowances", "pushAllowances"): - allowances = rule.get(allowance_key) - if allowances and allowances.get("nodes"): - allowances["nodes"] = [ - node - for node in allowances["nodes"] - if node.get("actor") is not None - ] - yield { - **rule, - "org_login": repository.org_login, - "repository_node_id": repository.id, - "repository_name": repository.name, - } + try: + for i in range(0, len(rule_ids_list), 100): + rules_chunk = rule_ids_list[i : i + 100] + if rules_chunk: + data = {"query": PROTECTION_RULES_QUERY, "variables": {"ids": rules_chunk}} + response = client.post("/graphql", json=data).json() + for rule in response["data"].get("nodes", []): + # GitHub can return null actors for deleted or inaccessible allowance actors. + for allowance_key in ("bypassPullRequestAllowances", "pushAllowances"): + allowances = rule.get(allowance_key) + if allowances and allowances.get("nodes"): + allowances["nodes"] = [ + node + for node in allowances["nodes"] + if node.get("actor") is not None + ] + yield { + **rule, + "org_login": repository.org_login, + "repository_node_id": repository.id, + "repository_name": repository.name, + } + except Exception as e: + logger.error( + f"Error in resource 'branch_protection_rules' processing repository '{repository.org_login}/{repository.name}': {e}", + extra={"resource": "branch_protection_rules", "phase": "resource_iteration"}, + ) + return @app.transformer(name="workflows", columns=Workflow, parallelized=True) @@ -1012,12 +1141,19 @@ def _workflow_file_contents( } client = _client_for_org(ctx, repo.org_login) - for page in client.paginate( - f"/repos/{repo.full_name}/actions/workflows", params={"per_page": 100} - ): - for workflow in page: - if workflow.get("state") == "active": - yield _workflow_file_contents(client, repo, workflow) + try: + for page in client.paginate( + f"/repos/{repo.full_name}/actions/workflows", params={"per_page": 100} + ): + for workflow in page: + if workflow.get("state") == "active": + yield _workflow_file_contents(client, repo, workflow) + except Exception as e: + logger.error( + f"Error in resource 'workflows' processing repository '{repo.full_name}': {e}", + extra={"resource": "workflows", "phase": "resource_iteration"}, + ) + return @app.transformer(name="workflow_jobs", columns=WorkflowJob, parallelized=True) @@ -1048,19 +1184,26 @@ def environments(repo: Repository, ctx: SourceContext): repo_name = repo.name repo_node_id = repo.node_id client = _client_for_org(ctx, repo.org_login) - for page in client.paginate( - f"/repos/{full_name}/environments", - params={"per_page": 100}, - data_selector="environments", - ): - for env in page: - yield { - **env, - "org_login": repo.org_login, - "repository_name": repo_name, - "repository_full_name": full_name, - "repository_node_id": repo_node_id, - } + try: + for page in client.paginate( + f"/repos/{full_name}/environments", + params={"per_page": 100}, + data_selector="environments", + ): + for env in page: + yield { + **env, + "org_login": repo.org_login, + "repository_name": repo_name, + "repository_full_name": full_name, + "repository_node_id": repo_node_id, + } + except Exception as e: + logger.error( + f"Error in resource 'environments' processing repository '{repo.full_name}': {e}", + extra={"resource": "environments", "phase": "resource_iteration"}, + ) + return @app.resource(name="runner_groups", columns=RunnerGroup, parallelized=True) @@ -1068,16 +1211,23 @@ def runner_groups(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/actions/runner-groups", - params={"per_page": 100}, - data_selector="runner_groups", - ): - for group in page: - yield { - **group, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/actions/runner-groups", + params={"per_page": 100}, + data_selector="runner_groups", + ): + for group in page: + yield { + **group, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'runner_groups' processing organization '{org_name}': {e}", + extra={"resource": "runner_groups", "phase": "resource_iteration"}, + ) + continue @app.resource(name="org_runners", columns=OrgRunner, parallelized=True) @@ -1085,16 +1235,23 @@ def org_runners(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/actions/runners", - params={"per_page": 100}, - data_selector="runners", - ): - for runner in page: - yield { - **runner, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/actions/runners", + params={"per_page": 100}, + data_selector="runners", + ): + for runner in page: + yield { + **runner, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'org_runners' processing organization '{org_name}': {e}", + extra={"resource": "org_runners", "phase": "resource_iteration"}, + ) + continue @app.transformer( @@ -1128,7 +1285,11 @@ def org_runner_group_memberships( "accessible_repo_node_ids": accessible_repo_node_ids, "org_login": org_name, } - except Exception: + except Exception as e: + logger.error( + f"Error in resource 'org_runner_group_memberships' processing runner group '{group.id}': {e}", + extra={"resource": "org_runner_group_memberships", "phase": "resource_iteration"}, + ) return @@ -1137,19 +1298,26 @@ def repo_runners(repo: Repository, ctx: SourceContext): if not repo.self_hosted_runners_enabled: return client = _client_for_org(ctx, repo.org_login) - for page in client.paginate( - f"/repos/{repo.full_name}/actions/runners", - params={"per_page": 100}, - data_selector="runners", - ): - for runner in page: - yield { - **runner, - "repository_name": repo.name, - "repository_node_id": repo.node_id, - "repository_full_name": repo.full_name, - "org_login": repo.org_login, - } + try: + for page in client.paginate( + f"/repos/{repo.full_name}/actions/runners", + params={"per_page": 100}, + data_selector="runners", + ): + for runner in page: + yield { + **runner, + "repository_name": repo.name, + "repository_node_id": repo.node_id, + "repository_full_name": repo.full_name, + "org_login": repo.org_login, + } + except Exception as e: + logger.error( + f"Error in resource 'repo_runners' processing repository '{repo.full_name}': {e}", + extra={"resource": "repo_runners", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -1173,18 +1341,25 @@ def environment_variables(environment: Environment, ctx: SourceContext): repo_node_id = environment.repository_node_id client = _client_for_org(ctx, environment.org_login) - for page in client.paginate( - f"/repos/{full_repo_name}/environments/{env_name}/variables" - ): - for item in page: - yield { - **item, - "org_login": environment.org_login, - "environment_node_id": env_node_id, - "environment_name": env_name, - "repository_name": repo_name, - "repository_node_id": repo_node_id, - } + try: + for page in client.paginate( + f"/repos/{full_repo_name}/environments/{env_name}/variables" + ): + for item in page: + yield { + **item, + "org_login": environment.org_login, + "environment_node_id": env_node_id, + "environment_name": env_name, + "repository_name": repo_name, + "repository_node_id": repo_node_id, + } + except Exception as e: + logger.error( + f"Error in resource 'environment_variables' processing environment '{env_name}': {e}", + extra={"resource": "environment_variables", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -1212,18 +1387,25 @@ def environment_branch_policies(environment: Environment, ctx: SourceContext): env_name = environment.name env_node_id = environment.node_id client = _client_for_org(ctx, environment.org_login) - for page in client.paginate( - f"/repos/{full_repo_name}/environments/{env_name}/deployment-branch-policies" - ): - for policy in page: - yield { - **policy, - "environment_node_id": env_node_id, - "environment_name": env_name, - "repository_name": repo_name, - "repository_node_id": repo_node_id, - "org_login": environment.org_login, - } + try: + for page in client.paginate( + f"/repos/{full_repo_name}/environments/{env_name}/deployment-branch-policies" + ): + for policy in page: + yield { + **policy, + "environment_node_id": env_node_id, + "environment_name": env_name, + "repository_name": repo_name, + "repository_node_id": repo_node_id, + "org_login": environment.org_login, + } + except Exception as e: + logger.error( + f"Error in resource 'environment_branch_policies' processing environment '{env_name}': {e}", + extra={"resource": "environment_branch_policies", "phase": "resource_iteration"}, + ) + return @app.transformer( @@ -1246,18 +1428,25 @@ def environment_secrets(environment: Environment, ctx: SourceContext): env_name = environment.name env_node_id = environment.node_id client = _client_for_org(ctx, environment.org_login) - for page in client.paginate( - f"/repos/{full_repo_name}/environments/{env_name}/secrets" - ): - for secret in page: - yield { - **secret, - "org_login": environment.org_login, - "repository_name": repo_name, - "repository_node_id": repo_node_id, - "environment_name": env_name, - "environment_node_id": env_node_id, - } + try: + for page in client.paginate( + f"/repos/{full_repo_name}/environments/{env_name}/secrets" + ): + for secret in page: + yield { + **secret, + "org_login": environment.org_login, + "repository_name": repo_name, + "repository_node_id": repo_node_id, + "environment_name": env_name, + "environment_node_id": env_node_id, + } + except Exception as e: + logger.error( + f"Error in resource 'environment_secrets' processing environment '{env_name}': {e}", + extra={"resource": "environment_secrets", "phase": "resource_iteration"}, + ) + return @app.resource(name="organization_secrets", columns=OrgSecret, parallelized=True) @@ -1289,14 +1478,21 @@ def organization_secrets(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/actions/secrets", params={"per_page": 100} - ): - for item in page: - yield { - **item, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/actions/secrets", params={"per_page": 100} + ): + for item in page: + yield { + **item, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'organization_secrets' processing organization '{org_name}': {e}", + extra={"resource": "organization_secrets", "phase": "resource_iteration"}, + ) + continue @app.transformer( @@ -1316,17 +1512,24 @@ def selected_organization_secrets(secret: OrgSecret, ctx: SourceContext): """ if secret.visibility == "selected": client = _client_for_org(ctx, secret.org_login) - for page in client.paginate( - f"/orgs/{secret.org_login}/actions/secrets/{secret.name}/repositories", - params={"per_page": 100}, - ): - for repo in page: - yield { - "name": secret.name, - "repository_full_name": repo["full_name"], - "repository_node_id": repo["node_id"], - "org_login": secret.org_login, - } + try: + for page in client.paginate( + f"/orgs/{secret.org_login}/actions/secrets/{secret.name}/repositories", + params={"per_page": 100}, + ): + for repo in page: + yield { + "name": secret.name, + "repository_full_name": repo["full_name"], + "repository_node_id": repo["node_id"], + "org_login": secret.org_login, + } + except Exception as e: + logger.error( + f"Error in resource 'selected_organization_secrets' processing secret '{secret.name}': {e}", + extra={"resource": "selected_organization_secrets", "phase": "resource_iteration"}, + ) + return @app.resource(name="organization_variables", columns=OrgVariable, parallelized=True) @@ -1342,14 +1545,21 @@ def organization_variables(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/actions/variables", params={"per_page": 100} - ): - for item in page: - yield { - **item, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/actions/variables", params={"per_page": 100} + ): + for item in page: + yield { + **item, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'organization_variables' processing organization '{org_name}': {e}", + extra={"resource": "organization_variables", "phase": "resource_iteration"}, + ) + continue @app.transformer( @@ -1371,16 +1581,23 @@ def selected_organization_variables(variable: OrgVariable, ctx: SourceContext): """ if variable.visibility == "selected": client = _client_for_org(ctx, variable.org_login) - for page in client.paginate( - f"/orgs/{variable.org_login}/actions/variables/{variable.name}/repositories", - params={"per_page": 100}, - ): - for repo in page: - yield { - "name": variable.name, - "repository_node_id": repo["node_id"], - "org_login": variable.org_login, - } + try: + for page in client.paginate( + f"/orgs/{variable.org_login}/actions/variables/{variable.name}/repositories", + params={"per_page": 100}, + ): + for repo in page: + yield { + "name": variable.name, + "repository_node_id": repo["node_id"], + "org_login": variable.org_login, + } + except Exception as e: + logger.error( + f"Error in resource 'selected_organization_variables' processing variable '{variable.name}': {e}", + extra={"resource": "selected_organization_variables", "phase": "resource_iteration"}, + ) + return @app.transformer(name="repository_secrets", columns=RepoSecret, parallelized=True) @@ -1395,16 +1612,23 @@ def repository_secrets(repo: Repository, ctx: SourceContext): RepoSecret (RepoSecret): Repository secret record. """ client = _client_for_org(ctx, repo.org_login) - for page in client.paginate( - f"/repos/{repo.full_name}/actions/secrets", params={"per_page": 100} - ): - for secret in page: - yield { - **secret, - "org_login": repo.org_login, - "repository_name": repo.full_name, - "repository_node_id": repo.node_id, - } + try: + for page in client.paginate( + f"/repos/{repo.full_name}/actions/secrets", params={"per_page": 100} + ): + for secret in page: + yield { + **secret, + "org_login": repo.org_login, + "repository_name": repo.full_name, + "repository_node_id": repo.node_id, + } + except Exception as e: + logger.error( + f"Error in resource 'repository_secrets' processing repository '{repo.full_name}': {e}", + extra={"resource": "repository_secrets", "phase": "resource_iteration"}, + ) + return @app.transformer(name="repository_variables", columns=RepoVariable, parallelized=True) @@ -1422,16 +1646,23 @@ def repository_variables(repo: Repository, ctx: SourceContext): RepoVariable (RepoVariable): Repository variable record. """ client = _client_for_org(ctx, repo.org_login) - for page in client.paginate( - f"/repos/{repo.full_name}/actions/variables", params={"per_page": 100} - ): - for variable in page: - yield { - **variable, - "org_login": repo.org_login, - "repository_name": repo.full_name, - "repository_node_id": repo.node_id, - } + try: + for page in client.paginate( + f"/repos/{repo.full_name}/actions/variables", params={"per_page": 100} + ): + for variable in page: + yield { + **variable, + "org_login": repo.org_login, + "repository_name": repo.full_name, + "repository_node_id": repo.node_id, + } + except Exception as e: + logger.error( + f"Error in resource 'repository_variables' processing repository '{repo.full_name}': {e}", + extra={"resource": "repository_variables", "phase": "resource_iteration"}, + ) + return @app.resource( @@ -1455,33 +1686,40 @@ def secret_scanning_alerts(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/secret-scanning/alerts", params={"per_page": 100} - ): - for alert in page: - valid_token_user_node_id: str | None = None - secret = alert.get("secret") - if ( - alert.get("state") == "open" - and alert.get("secret_type") == "github_personal_access_token" - and secret - ): - try: - resp = requests.get( - "https://api.github.com/user", - headers={"Authorization": f"Bearer {secret}"}, - timeout=10, - ) - if resp.status_code == 200: - valid_token_user_node_id = resp.json().get("node_id") - except Exception: - pass - - yield { - **alert, - "valid_token_user_node_id": valid_token_user_node_id, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/secret-scanning/alerts", params={"per_page": 100} + ): + for alert in page: + valid_token_user_node_id: str | None = None + secret = alert.get("secret") + if ( + alert.get("state") == "open" + and alert.get("secret_type") == "github_personal_access_token" + and secret + ): + try: + resp = requests.get( + "https://api.github.com/user", + headers={"Authorization": f"Bearer {secret}"}, + timeout=10, + ) + if resp.status_code == 200: + valid_token_user_node_id = resp.json().get("node_id") + except Exception: + pass + + yield { + **alert, + "valid_token_user_node_id": valid_token_user_node_id, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'secret_scanning_alerts' processing organization '{org_name}': {e}", + extra={"resource": "secret_scanning_alerts", "phase": "resource_iteration"}, + ) + continue @app.resource( @@ -1504,14 +1742,21 @@ def personal_access_tokens(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/personal-access-tokens", params={"per_page": 100} - ): - for pat in page: - yield { - **pat, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/personal-access-tokens", params={"per_page": 100} + ): + for pat in page: + yield { + **pat, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'personal_access_tokens' processing organization '{org_name}': {e}", + extra={"resource": "personal_access_tokens", "phase": "resource_iteration"}, + ) + continue @app.transformer(name="pat_repo_access", columns=PatRepoAccess, parallelized=True) @@ -1529,12 +1774,19 @@ def pat_repo_access(pat: PersonalAccessToken, ctx: SourceContext): return client = _client_for_org(ctx, pat.org_login) - for page in client.paginate( - f"/orgs/{pat.org_login}/personal-access-tokens/{pat.id}/repositories", - params={"per_page": 100}, - ): - for item in page: - yield {"pat_id": pat.id, **item, "org_login": pat.org_login} + try: + for page in client.paginate( + f"/orgs/{pat.org_login}/personal-access-tokens/{pat.id}/repositories", + params={"per_page": 100}, + ): + for item in page: + yield {"pat_id": pat.id, **item, "org_login": pat.org_login} + except Exception as e: + logger.error( + f"Error in resource 'pat_repo_access' processing PAT '{pat.id}': {e}", + extra={"resource": "pat_repo_access", "phase": "resource_iteration"}, + ) + return @app.resource( @@ -1558,15 +1810,22 @@ def personal_access_token_requests(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - for page in client.paginate( - f"/orgs/{org_name}/personal-access-token-requests", - params={"per_page": 100}, - ): - for item in page: - yield { - **item, - "org_login": org_name, - } + try: + for page in client.paginate( + f"/orgs/{org_name}/personal-access-token-requests", + params={"per_page": 100}, + ): + for item in page: + yield { + **item, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'personal_access_token_requests' processing organization '{org_name}': {e}", + extra={"resource": "personal_access_token_requests", "phase": "resource_iteration"}, + ) + continue @app.resource(name="saml_provider", columns=SamlProvider, parallelized=True) @@ -1585,25 +1844,32 @@ def saml_provider(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - data = { - "query": SAML_QUERY, - "variables": {"login": org_name, "count": 100, "after": None}, - } + try: + data = { + "query": SAML_QUERY, + "variables": {"login": org_name, "count": 100, "after": None}, + } - response = client.post("/graphql", json=data).json() - response_data = response.get("data", {}) - org_data = response_data.get("organization", {}) - if response_data and org_data: - idp = org_data.get("samlIdentityProvider") - if not idp: - continue + response = client.post("/graphql", json=data).json() + response_data = response.get("data", {}) + org_data = response_data.get("organization", {}) + if response_data and org_data: + idp = org_data.get("samlIdentityProvider") + if not idp: + continue - yield { - **idp, - "org_node_id": org_data["id"], - "org_name": org_data["name"], - "org_login": org_name, - } + yield { + **idp, + "org_node_id": org_data["id"], + "org_name": org_data["name"], + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'saml_provider' processing organization '{org_name}': {e}", + extra={"resource": "saml_provider", "phase": "resource_iteration"}, + ) + continue @app.resource(name="external_identities", columns=ExternalIdentity, parallelized=True) @@ -1620,34 +1886,41 @@ def external_identities(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - paginator = GraphQLCursorPaginator( - page_info_path="data.organization.samlIdentityProvider.externalIdentities.pageInfo", - cursor_variable="after", - cursor_field="endCursor", - has_next_field="hasNextPage", - allow_missing_page_info=True, - ) - data = { - "query": SAML_IDENTITIES_QUERY, - "variables": {"login": org_name, "count": 100, "after": None}, - } + try: + paginator = GraphQLCursorPaginator( + page_info_path="data.organization.samlIdentityProvider.externalIdentities.pageInfo", + cursor_variable="after", + cursor_field="endCursor", + has_next_field="hasNextPage", + allow_missing_page_info=True, + ) + data = { + "query": SAML_IDENTITIES_QUERY, + "variables": {"login": org_name, "count": 100, "after": None}, + } - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for org in page_data: - org_data = org.get("organization") - idp = org_data.get("samlIdentityProvider") - if not idp: - continue - for identity in (idp.get("externalIdentities") or {}).get( - "nodes" - ) or []: - yield {**identity, "org_login": org_name} + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for org in page_data: + org_data = org.get("organization") + idp = org_data.get("samlIdentityProvider") + if not idp: + continue + for identity in (idp.get("externalIdentities") or {}).get( + "nodes" + ) or []: + yield {**identity, "org_login": org_name} + except Exception as e: + logger.error( + f"Error in resource 'external_identities' processing organization '{org_name}': {e}", + extra={"resource": "external_identities", "phase": "resource_iteration"}, + ) + continue @app.resource(name="scim_users", columns=ScimResource, parallelized=True) @@ -1663,23 +1936,30 @@ def scim_users(ctx: SourceContext): for org in ctx.organizations: org_name = org.org_name client = org.client - scim_paginator = OffsetPaginator( - offset_param="startIndex", - limit_param="itemsPerPage", - limit=100, - total_path="totalResults", - ) - for page in client.paginate( - f"/scim/v2/organizations/{org_name}/Users", - params={"startIndex": 1, "itemsPerPage": 100}, - paginator=scim_paginator, - data_selector="Resources", - ): - for user in page: - yield { - **user, - "org_login": org_name, - } + try: + scim_paginator = OffsetPaginator( + offset_param="startIndex", + limit_param="itemsPerPage", + limit=100, + total_path="totalResults", + ) + for page in client.paginate( + f"/scim/v2/organizations/{org_name}/Users", + params={"startIndex": 1, "itemsPerPage": 100}, + paginator=scim_paginator, + data_selector="Resources", + ): + for user in page: + yield { + **user, + "org_login": org_name, + } + except Exception as e: + logger.error( + f"Error in resource 'scim_users' processing organization '{org_name}': {e}", + extra={"resource": "scim_users", "phase": "resource_iteration"}, + ) + continue def organization_resources(ctx: SourceContext): diff --git a/tests/test_error_resilience.py b/tests/test_error_resilience.py new file mode 100644 index 0000000..9589e14 --- /dev/null +++ b/tests/test_error_resilience.py @@ -0,0 +1,189 @@ +""" +Tests verifying error-resilient behavior in multi-org resource generators and transformers. + +After the fix: + Each org's processing is isolated in its own try/except block. A failure for + one org is caught, logged, and iteration continues with the next org — no data + is silently dropped. +""" +import inspect +import logging +from unittest.mock import MagicMock + +from openhound_github.resources.organization import ( + OrgContext, + SourceContext, + applications, + organizations, + users, +) + + +def _success_client(org_name: str) -> MagicMock: + """Mock RESTClient that returns a minimal org dict for every .get() call.""" + client = MagicMock() + client.get.return_value.json.return_value = { + "login": org_name, + "node_id": f"node_{org_name}", + } + return client + + +def _failing_client(error: Exception) -> MagicMock: + """Mock RESTClient that raises on any .get() call.""" + client = MagicMock() + client.get.side_effect = error + return client + + +def _graphql_page(org_login: str, logins: list[str]) -> list[dict]: + """Return one GraphQL page in the shape the `users` resource expects. + + `client.paginate(..., data_selector="data")` yields a list of the items + extracted from the `data` key. The `users` resource accesses + ``page_data[0]["organization"]["membersWithRole"]["edges"]``, so each + page must be a list whose first element is the parsed GraphQL data dict. + """ + edges = [{"node": {"id": f"id_{login}", "login": login}, "role": "MEMBER"} for login in logins] + return [ + { + "organization": { + "membersWithRole": { + "edges": edges, + "pageInfo": {"hasNextPage": False, "endCursor": None}, + } + } + } + ] + + +def test_rest_resource_continues_after_org_failure(caplog) -> None: + """A mid-iteration REST failure does NOT drop subsequent orgs. + + Setup: + org1 — succeeds (client.get returns valid org dict) + org2 — raises ConnectionError on its first API call + org3 — succeeds + + Expected (fixed behavior): + org1's data IS yielded. + org2 is skipped and an error is logged. + org3's data IS yielded — iteration continues after the failure. + """ + client1 = _success_client("org1") + client2 = _failing_client(ConnectionError("HTTP 500: Internal Server Error")) + client3 = _success_client("org3") + + ctx = SourceContext( + client=client1, + organizations=[ + OrgContext(client=client1, org_name="org1"), + OrgContext(client=client2, org_name="org2"), + OrgContext(client=client3, org_name="org3"), + ], + ) + + with caplog.at_level(logging.ERROR, logger="openhound_github.resources.organization"): + results = list(organizations.bind(ctx)) + + yielded_logins = {r.login for r in results} + + assert "org1" in yielded_logins, "org1 data should have been yielded before the error on org2" + assert "org3" in yielded_logins, "org3 data should still be yielded after org2 fails" + + assert any( + "Error in resource 'organizations' processing organization 'org2'" in msg + for msg in caplog.messages + ), "Expected an error log for org2" + + +def test_graphql_resource_continues_after_org_failure(caplog) -> None: + """A mid-iteration GraphQL failure does NOT drop subsequent orgs. + + Setup: + org1 — paginate succeeds, yields user1 + org2 — paginate raises ConnectionError + org3 — paginate succeeds, yields user3 + + Expected (fixed behavior): + user1 (org1) IS yielded. + org2 is skipped and an error is logged. + user3 (org3) IS yielded. + + Note: the raw generator is called directly (via ``_pipe.gen``) to stay + within the unit-test boundary and avoid DLT model-validation concerns. + """ + client1 = MagicMock() + client1.paginate.return_value = [_graphql_page("org1", ["user1"])] + + client2 = MagicMock() + client2.paginate.side_effect = ConnectionError("GraphQL endpoint unreachable") + + client3 = MagicMock() + client3.paginate.return_value = [_graphql_page("org3", ["user3"])] + + ctx = SourceContext( + client=client1, + organizations=[ + OrgContext(client=client1, org_name="org1"), + OrgContext(client=client2, org_name="org2"), + OrgContext(client=client3, org_name="org3"), + ], + ) + + # Call the raw generator function to bypass DLT pipeline machinery. + # parallelized=True wraps the pipe gen with wrap_parallel_iterator (functools.wraps), + # which yields deferred callables instead of data and loops forever in tests. + # inspect.unwrap() follows __wrapped__ back to the original generator function. + with caplog.at_level(logging.ERROR, logger="openhound_github.resources.organization"): + results = list(inspect.unwrap(users._pipe.gen)(ctx)) + + yielded_logins = {r["login"] for r in results} + + assert "user1" in yielded_logins, "user1 (org1) should have been yielded before org2 failed" + assert "user3" in yielded_logins, "user3 (org3) should still be yielded after org2 fails" + + assert any( + "Error in resource 'users' processing organization 'org2'" in msg + for msg in caplog.messages + ), "Expected an error log for org2" + + +def test_applications_transformer_handles_api_error(caplog) -> None: + """A failing API call in the applications transformer is caught and logged. + + The transformer should log the error and yield nothing rather than + propagating the exception, so that the DLT pipeline can continue with + the next AppInstallation. + + Note: the raw generator is called directly (via ``_pipe.gen``) because + DLT transformers use a different calling convention when invoked via + their decorated interface — the first argument is reserved for upstream + pipeline items, not direct calls. + """ + failing_client = MagicMock() + failing_client.get.side_effect = ConnectionError("HTTP 503: Service Unavailable") + + ctx = SourceContext( + client=failing_client, + organizations=[OrgContext(client=failing_client, org_name="org1")], + ) + + # Use a MagicMock to stand in for AppInstallation — the transformer + # only accesses .id, .app_slug, and .org_login from the install object. + install = MagicMock() + install.id = 42 + install.app_slug = "acme-bot" + install.org_login = "org1" + + # Call the raw generator function to bypass DLT pipeline machinery. + # Same parallelized=True wrapping issue as users — unwrap to get the original function. + with caplog.at_level(logging.ERROR, logger="openhound_github.resources.organization"): + result = list(inspect.unwrap(applications._pipe.gen)(install, ctx)) + + assert result == [], "Transformer should yield nothing when the API call fails" + + assert any( + "Error in resource 'applications' processing app 'acme-bot'" in msg + for msg in caplog.messages + ), "Expected an error log for the failing app fetch" From bf3b7ca9554df5a38caa9bbcce4c815776adbb0d Mon Sep 17 00:00:00 2001 From: Stran Dutton Date: Wed, 24 Jun 2026 10:21:41 -0500 Subject: [PATCH 2/2] remove try/catch from transformers --- src/openhound_github/resources/enterprise.py | 382 +++++------ .../resources/organization.py | 647 +++++++----------- tests/test_error_resilience.py | 41 -- 3 files changed, 406 insertions(+), 664 deletions(-) diff --git a/src/openhound_github/resources/enterprise.py b/src/openhound_github/resources/enterprise.py index e53c5c2..a5d114e 100644 --- a/src/openhound_github/resources/enterprise.py +++ b/src/openhound_github/resources/enterprise.py @@ -99,31 +99,24 @@ def enterprise_members(enterprise_data: Enterprise, ctx: SourceContext): "query": ENTERPRISE_MEMBERS_QUERY, "variables": {"slug": ctx.enterprise_name, "count": 100, "after": None}, } - try: - for page_data in ctx.client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for enterprise_object in page_data: - es_data = enterprise_object.get("enterprise", {}) - members = es_data.get("members", {}) - for edge in members.get("edges", []): - node = edge.get("node") - if node: - yield { - **node, - "enterprise_node_id": enterprise_data.id, - "enterprise_slug": ctx.enterprise_name, - } - except Exception as e: - logger.error( - f"Error in resource 'enterprise_members' processing enterprise '{ctx.enterprise_name}': {e}", - extra={"resource": "enterprise_members", "phase": "resource_iteration"}, - ) - return + for page_data in ctx.client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for enterprise_object in page_data: + es_data = enterprise_object.get("enterprise", {}) + members = es_data.get("members", {}) + for edge in members.get("edges", []): + node = edge.get("node") + if node: + yield { + **node, + "enterprise_node_id": enterprise_data.id, + "enterprise_slug": ctx.enterprise_name, + } @app.transformer(name="enterprise_users", columns=EnterpriseUser, parallelized=True) @@ -165,22 +158,15 @@ def enterprise_managed_users(base_user: BaseUser, ctx: SourceContext): @app.transformer(name="enterprise_teams", columns=EnterpriseTeam, parallelized=True) def enterprise_teams(enterprise_data: Enterprise, ctx: SourceContext): - try: - for page in ctx.client.paginate( - f"/enterprises/{ctx.enterprise_name}/teams", params={"per_page": 100} - ): - for team in page: - yield { - **team, - "enterprise_node_id": enterprise_data.id, - "enterprise_slug": ctx.enterprise_name, - } - except Exception as e: - logger.error( - f"Error in resource 'enterprise_teams' processing enterprise '{enterprise_data.id}': {e}", - extra={"resource": "enterprise_teams", "phase": "resource_iteration"}, - ) - return + for page in ctx.client.paginate( + f"/enterprises/{ctx.enterprise_name}/teams", params={"per_page": 100} + ): + for team in page: + yield { + **team, + "enterprise_node_id": enterprise_data.id, + "enterprise_slug": ctx.enterprise_name, + } @app.transformer( @@ -201,27 +187,20 @@ def enterprise_team_roles(team: EnterpriseTeam): ) def enterprise_team_members(team: EnterpriseTeam, ctx: SourceContext): - try: - for page in ctx.client.paginate( - f"/enterprises/{ctx.enterprise_name}/teams/{team.id}/memberships", - params={"per_page": 100}, - ): - for member in page: - node_id = member.get("node_id") or member.get("user", {}).get("node_id") - if node_id: - yield { - **member, - "node_id": node_id, - "team_id": team.id, - "enterprise_node_id": team.enterprise_node_id, - "enterprise_slug": team.enterprise_slug, - } - except Exception as e: - logger.error( - f"Error in resource 'enterprise_team_members' processing team '{team.id}': {e}", - extra={"resource": "enterprise_team_members", "phase": "resource_iteration"}, - ) - return + for page in ctx.client.paginate( + f"/enterprises/{ctx.enterprise_name}/teams/{team.id}/memberships", + params={"per_page": 100}, + ): + for member in page: + node_id = member.get("node_id") or member.get("user", {}).get("node_id") + if node_id: + yield { + **member, + "node_id": node_id, + "team_id": team.id, + "enterprise_node_id": team.enterprise_node_id, + "enterprise_slug": team.enterprise_slug, + } @app.transformer( @@ -231,59 +210,45 @@ def enterprise_team_members(team: EnterpriseTeam, ctx: SourceContext): ) def enterprise_team_organizations(team: EnterpriseTeam, ctx: SourceContext): - try: - for page in ctx.client.paginate( - f"/enterprises/{ctx.enterprise_name}/teams/{team.id}/organizations", - params={"per_page": 100}, - ): - for org in page: - node_id = org.get("node_id") or org.get("id") - if node_id: - yield { - **org, - "node_id": node_id, - "team_id": team.id, - "projected_slug": team.slug, - "enterprise_node_id": team.enterprise_node_id, - "enterprise_slug": team.enterprise_slug, - } - except Exception as e: - logger.error( - f"Error in resource 'enterprise_team_organizations' processing team '{team.id}': {e}", - extra={"resource": "enterprise_team_organizations", "phase": "resource_iteration"}, - ) - return + for page in ctx.client.paginate( + f"/enterprises/{ctx.enterprise_name}/teams/{team.id}/organizations", + params={"per_page": 100}, + ): + for org in page: + node_id = org.get("node_id") or org.get("id") + if node_id: + yield { + **org, + "node_id": node_id, + "team_id": team.id, + "projected_slug": team.slug, + "enterprise_node_id": team.enterprise_node_id, + "enterprise_slug": team.enterprise_slug, + } @app.transformer(name="enterprise_roles", columns=EnterpriseRole, parallelized=True) def enterprise_roles(enterprise_data: Enterprise, ctx: SourceContext): - try: - result = ctx.client.get( - f"/enterprises/{ctx.enterprise_name}/enterprise-roles" - ).json() - - for role in result.get("roles", []): - yield { - **role, - "enterprise_node_id": enterprise_data.id, - "enterprise_slug": ctx.enterprise_name, - } + result = ctx.client.get( + f"/enterprises/{ctx.enterprise_name}/enterprise-roles" + ).json() + for role in result.get("roles", []): yield { - "id": "owners", - "name": "owners", - "description": "Enterprise administrators discovered from ownerInfo.admins", - "source": "Default", - "permissions": [], + **role, "enterprise_node_id": enterprise_data.id, "enterprise_slug": ctx.enterprise_name, } - except Exception as e: - logger.error( - f"Error in resource 'enterprise_roles' processing enterprise '{ctx.enterprise_name}': {e}", - extra={"resource": "enterprise_roles", "phase": "resource_iteration"}, - ) - return + + yield { + "id": "owners", + "name": "owners", + "description": "Enterprise administrators discovered from ownerInfo.admins", + "source": "Default", + "permissions": [], + "enterprise_node_id": enterprise_data.id, + "enterprise_slug": ctx.enterprise_name, + } @app.transformer( @@ -293,25 +258,18 @@ def enterprise_role_teams(role: EnterpriseRole, ctx: SourceContext): if role.id == "owners": return - try: - for page in ctx.client.paginate( - f"/enterprises/{ctx.enterprise_name}/enterprise-roles/{role.id}/teams", - params={"per_page": 100}, - ): - for team in page: - if team.get("id"): - yield { - **team, - "role_id": role.id, - "enterprise_node_id": role.enterprise_node_id, - "enterprise_slug": role.enterprise_slug, - } - except Exception as e: - logger.error( - f"Error in resource 'enterprise_role_teams' processing role '{role.id}': {e}", - extra={"resource": "enterprise_role_teams", "phase": "resource_iteration"}, - ) - return + for page in ctx.client.paginate( + f"/enterprises/{ctx.enterprise_name}/enterprise-roles/{role.id}/teams", + params={"per_page": 100}, + ): + for team in page: + if team.get("id"): + yield { + **team, + "role_id": role.id, + "enterprise_node_id": role.enterprise_node_id, + "enterprise_slug": role.enterprise_slug, + } @app.transformer( @@ -321,25 +279,18 @@ def enterprise_role_users(role: EnterpriseRole, ctx: SourceContext): if role.id == "owners": return - try: - for page in ctx.client.paginate( - f"/enterprises/{ctx.enterprise_name}/enterprise-roles/{role.id}/users", - params={"per_page": 100}, - ): - for user in page: - if user.get("node_id"): - yield { - **user, - "role_id": role.id, - "enterprise_node_id": role.enterprise_node_id, - "enterprise_slug": role.enterprise_slug, - } - except Exception as e: - logger.error( - f"Error in resource 'enterprise_role_users' processing role '{role.id}': {e}", - extra={"resource": "enterprise_role_users", "phase": "resource_iteration"}, - ) - return + for page in ctx.client.paginate( + f"/enterprises/{ctx.enterprise_name}/enterprise-roles/{role.id}/users", + params={"per_page": 100}, + ): + for user in page: + if user.get("node_id"): + yield { + **user, + "role_id": role.id, + "enterprise_node_id": role.enterprise_node_id, + "enterprise_slug": role.enterprise_slug, + } @app.transformer(name="enterprise_admins", columns=EnterpriseAdmin, parallelized=True) @@ -355,34 +306,27 @@ def enterprise_admins(enterprise_data: Enterprise, ctx: SourceContext): "query": ENTERPRISE_ADMINS_QUERY, "variables": {"slug": ctx.enterprise_name, "count": 100, "after": None}, } - try: - for page_data in ctx.client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for enterprise_object in page_data: - es_data = enterprise_object.get("enterprise", {}) - owner_info = es_data.get("ownerInfo") or {} - for edge in (owner_info.get("admins") or {}).get("edges") or []: - node = edge.get("node") - if node and node.get("id"): - yield { - "node_id": node["id"], - "login": node.get("login"), - "assignment": "direct", - "role_id": "owners", - "enterprise_node_id": enterprise_data.id, - "enterprise_slug": ctx.enterprise_name, - } - except Exception as e: - logger.error( - f"Error in resource 'enterprise_admins' processing enterprise '{ctx.enterprise_name}': {e}", - extra={"resource": "enterprise_admins", "phase": "resource_iteration"}, - ) - return + for page_data in ctx.client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for enterprise_object in page_data: + es_data = enterprise_object.get("enterprise", {}) + owner_info = es_data.get("ownerInfo") or {} + for edge in (owner_info.get("admins") or {}).get("edges") or []: + node = edge.get("node") + if node and node.get("id"): + yield { + "node_id": node["id"], + "login": node.get("login"), + "assignment": "direct", + "role_id": "owners", + "enterprise_node_id": enterprise_data.id, + "enterprise_slug": ctx.enterprise_name, + } @app.transformer( @@ -401,30 +345,23 @@ def enterprise_saml_provider(enterprise_data: Enterprise, ctx: SourceContext): "variables": {"slug": ctx.enterprise_name, "count": 1, "after": None}, } - try: - for page_data in ctx.client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for enterprise_object in page_data: - es_data = enterprise_object.get("enterprise", {}) - saml_provider = (es_data.get("ownerInfo") or {}).get("samlIdentityProvider") - if not saml_provider: - return - yield { - **{k: v for k, v in saml_provider.items() if k != "externalIdentities"}, - "enterprise_node_id": enterprise_data.id, - "enterprise_slug": ctx.enterprise_name, - } - except Exception as e: - logger.error( - f"Error in resource 'enterprise_saml_provider' processing enterprise '{ctx.enterprise_name}': {e}", - extra={"resource": "enterprise_saml_provider", "phase": "resource_iteration"}, - ) - return + for page_data in ctx.client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for enterprise_object in page_data: + es_data = enterprise_object.get("enterprise", {}) + saml_provider = (es_data.get("ownerInfo") or {}).get("samlIdentityProvider") + if not saml_provider: + return + yield { + **{k: v for k, v in saml_provider.items() if k != "externalIdentities"}, + "enterprise_node_id": enterprise_data.id, + "enterprise_slug": ctx.enterprise_name, + } @app.transformer( @@ -447,36 +384,29 @@ def enterprise_external_identities( "variables": {"slug": ctx.enterprise_name, "count": 100, "after": None}, } - try: - for page_data in ctx.client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for enterprise_object in page_data: - es_data = enterprise_object.get("enterprise", {}) - page_provider = (es_data.get("ownerInfo") or {}).get("samlIdentityProvider") - if not page_provider: - return - for identity in (page_provider.get("externalIdentities") or {}).get( - "nodes" - ) or []: - yield { - **identity, - "saml_provider_id": saml_provider.id, - "saml_provider_issuer": saml_provider.issuer, - "saml_provider_sso_url": saml_provider.sso_url, - "enterprise_node_id": saml_provider.enterprise_node_id, - "enterprise_slug": saml_provider.enterprise_slug, - } - except Exception as e: - logger.error( - f"Error in resource 'enterprise_external_identities' processing saml_provider '{saml_provider.id}': {e}", - extra={"resource": "enterprise_external_identities", "phase": "resource_iteration"}, - ) - return + for page_data in ctx.client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for enterprise_object in page_data: + es_data = enterprise_object.get("enterprise", {}) + page_provider = (es_data.get("ownerInfo") or {}).get("samlIdentityProvider") + if not page_provider: + return + for identity in (page_provider.get("externalIdentities") or {}).get( + "nodes" + ) or []: + yield { + **identity, + "saml_provider_id": saml_provider.id, + "saml_provider_issuer": saml_provider.issuer, + "saml_provider_sso_url": saml_provider.sso_url, + "enterprise_node_id": saml_provider.enterprise_node_id, + "enterprise_slug": saml_provider.enterprise_slug, + } def enterprise_resources(ctx: SourceContext): diff --git a/src/openhound_github/resources/organization.py b/src/openhound_github/resources/organization.py index b856ae4..2ca7524 100644 --- a/src/openhound_github/resources/organization.py +++ b/src/openhound_github/resources/organization.py @@ -303,23 +303,16 @@ def org_roles(org: Organization, ctx: SourceContext): } client = _client_for_org(ctx, org.login) - try: - for page in client.paginate( - f"/orgs/{org.login}/organization-roles", params={"per_page": 100} - ): - for role in page: - yield { - **role, - "type": "custom", - "org_node_id": org.node_id, - "org_login": org.login, - } - except Exception as e: - logger.error( - f"Error in resource 'org_roles' processing organization '{org.login}': {e}", - extra={"resource": "org_roles", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/orgs/{org.login}/organization-roles", params={"per_page": 100} + ): + for role in page: + yield { + **role, + "type": "custom", + "org_node_id": org.node_id, + "org_login": org.login, + } @app.transformer(name="org_role_teams", columns=OrgRoleTeam, parallelized=True) @@ -338,24 +331,17 @@ def org_role_teams(role: OrgRole, ctx: SourceContext): if role.type == "custom": client = _client_for_org(ctx, role.org_login) - try: - for page in client.paginate( - f"/orgs/{role.org_login}/organization-roles/{role.id}/teams" - ): - for team in page: - yield { - "org_role_id": role.id, - "org_role_name": role.name, - "org_node_id": role.org_node_id, - "org_login": role.org_login, - **team, - } - except Exception as e: - logger.error( - f"Error in resource 'org_role_teams' processing role '{role.id}': {e}", - extra={"resource": "org_role_teams", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/orgs/{role.org_login}/organization-roles/{role.id}/teams" + ): + for team in page: + yield { + "org_role_id": role.id, + "org_role_name": role.name, + "org_node_id": role.org_node_id, + "org_login": role.org_login, + **team, + } @app.transformer(name="org_role_members", columns=OrgRoleMember, parallelized=True) @@ -373,24 +359,17 @@ def org_role_members(role: OrgRole, ctx: SourceContext): """ if role.type == "custom": client = _client_for_org(ctx, role.org_login) - try: - for page in client.paginate( - f"/orgs/{role.org_login}/organization-roles/{role.id}/users" - ): - for user in page: - yield { - **user, - "org_role_name": role.name, - "org_role_id": role.id, - "org_node_id": role.org_node_id, - "org_login": role.org_login, - } - except Exception as e: - logger.error( - f"Error in resource 'org_role_members' processing role '{role.id}': {e}", - extra={"resource": "org_role_members", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/orgs/{role.org_login}/organization-roles/{role.id}/users" + ): + for user in page: + yield { + **user, + "org_role_name": role.name, + "org_role_id": role.id, + "org_node_id": role.org_node_id, + "org_login": role.org_login, + } @app.resource(name="app_installations", columns=AppInstallation, parallelized=True) @@ -433,17 +412,10 @@ def applications(app_install: AppInstallation, ctx: SourceContext): if app_install.id: client = _client_for_org(ctx, app_install.org_login) app_slug = str(app_slug) - try: - if app_slug not in ctx.app_cache: - with ctx.cache_lock: - if app_slug not in ctx.app_cache: - ctx.app_cache[app_slug] = client.get(f"/apps/{app_slug}").json() - except Exception as e: - logger.error( - f"Error in resource 'applications' processing app '{app_slug}': {e}", - extra={"resource": "applications", "phase": "resource_iteration"}, - ) - return + if app_slug not in ctx.app_cache: + with ctx.cache_lock: + if app_slug not in ctx.app_cache: + ctx.app_cache[app_slug] = client.get(f"/apps/{app_slug}").json() app_data = ctx.app_cache[app_slug] if app_data.get("node_id"): yield {**app_data, "slug": app_slug, "org_login": app_install.org_login} @@ -637,28 +609,21 @@ def team_members(team: Team, ctx: SourceContext): "slug": team.slug, }, } - try: - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for member in page_data[0]["organization"]["team"]["members"]["edges"]: - yield { - "team_id": team.id, - "id": member["node"]["id"], - "login": member["node"]["login"], - "role": member["role"], - "org_login": team.org_login, - } - except Exception as e: - logger.error( - f"Error in resource 'team_members' processing team '{team.slug}': {e}", - extra={"resource": "team_members", "phase": "resource_iteration"}, - ) - return + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for member in page_data[0]["organization"]["team"]["members"]["edges"]: + yield { + "team_id": team.id, + "id": member["node"]["id"], + "login": member["node"]["login"], + "role": member["role"], + "org_login": team.org_login, + } @app.resource(name="actions_permissions", columns=ActionPermission, parallelized=True) @@ -782,30 +747,23 @@ def repo_role_assignments( client = _client_for_org(ctx, repo.org_login) - try: - for collab_page in client.paginate( - f"/repos/{repo.org_login}/{repo_name}/collaborators", - params={"affiliation": "direct", "per_page": 100}, - ): - for collaborator in collab_page: - role = collaborator.get("role_name", "") - custom_role = custom_roles.get(role) - yield { - **collaborator, - "org_login": repo.org_login, - "assignee_type": "user", - "repo_node_id": repo_node_id, - "repo_name": repo_name, - "role_name": role, - "base_role": custom_role.base_role if custom_role else None, - "role_permissions": custom_role.permissions if custom_role else [], - } - except Exception as e: - logger.error( - f"Error in resource 'repo_role_assignments' processing repository '{repo.full_name}': {e}", - extra={"resource": "repo_role_assignments", "phase": "resource_iteration"}, - ) - return + for collab_page in client.paginate( + f"/repos/{repo.org_login}/{repo_name}/collaborators", + params={"affiliation": "direct", "per_page": 100}, + ): + for collaborator in collab_page: + role = collaborator.get("role_name", "") + custom_role = custom_roles.get(role) + yield { + **collaborator, + "org_login": repo.org_login, + "assignee_type": "user", + "repo_node_id": repo_node_id, + "repo_name": repo_name, + "role_name": role, + "base_role": custom_role.base_role if custom_role else None, + "role_permissions": custom_role.permissions if custom_role else [], + } @app.transformer( @@ -829,32 +787,25 @@ def team_repo_role_assignments( if role.get("org_login") == team.org_login } client = _client_for_org(ctx, team.org_login) - try: - for repo_page in client.paginate( - f"/orgs/{team.org_login}/teams/{team.slug}/repos", - params={"per_page": 100}, - ): - for repo in repo_page: - role = _repo_permission_role(repo) - custom_role = custom_roles.get(role) - yield { - "id": team.database_id or 0, - "node_id": team.node_id, - "type": "Team", - "assignee_type": "team", - "repo_node_id": repo["node_id"], - "org_login": team.org_login, - "repo_name": repo["name"], - "role_name": role, - "base_role": custom_role.base_role if custom_role else None, - "role_permissions": custom_role.permissions if custom_role else [], - } - except Exception as e: - logger.error( - f"Error in resource 'team_repo_role_assignments' processing team '{team.slug}': {e}", - extra={"resource": "team_repo_role_assignments", "phase": "resource_iteration"}, - ) - return + for repo_page in client.paginate( + f"/orgs/{team.org_login}/teams/{team.slug}/repos", + params={"per_page": 100}, + ): + for repo in repo_page: + role = _repo_permission_role(repo) + custom_role = custom_roles.get(role) + yield { + "id": team.database_id or 0, + "node_id": team.node_id, + "type": "Team", + "assignee_type": "team", + "repo_node_id": repo["node_id"], + "org_login": team.org_login, + "repo_name": repo["name"], + "role_name": role, + "base_role": custom_role.base_role if custom_role else None, + "role_permissions": custom_role.permissions if custom_role else [], + } @app.resource(name="repository_roles_base", parallelized=True, columns=BaseRepoRole) @@ -1021,27 +972,20 @@ def branches(repository: RepositoryQL, ctx: SourceContext): }, } - try: - for page_data in client.paginate( - "/graphql", - method="POST", - json=data, - paginator=paginator, - data_selector="data", - ): - for branch in page_data[0]["repository"]["refs"]["nodes"]: - yield { - **branch, - "repository_node_id": repository.id, - "repository_name": repository.name, - "org_login": repository.org_login, - } - except Exception as e: - logger.error( - f"Error in resource 'branches' processing repository '{repository.org_login}/{repository.name}': {e}", - extra={"resource": "branches", "phase": "resource_iteration"}, - ) - return + for page_data in client.paginate( + "/graphql", + method="POST", + json=data, + paginator=paginator, + data_selector="data", + ): + for branch in page_data[0]["repository"]["refs"]["nodes"]: + yield { + **branch, + "repository_node_id": repository.id, + "repository_name": repository.name, + "org_login": repository.org_login, + } @app.transformer( @@ -1072,34 +1016,27 @@ def branch_protection_rules(repository: RepositoryQL, ctx: SourceContext): rule_ids_list = list(rule_ids_seen) client = _client_for_org(ctx, repository.org_login) - try: - for i in range(0, len(rule_ids_list), 100): - rules_chunk = rule_ids_list[i : i + 100] - if rules_chunk: - data = {"query": PROTECTION_RULES_QUERY, "variables": {"ids": rules_chunk}} - response = client.post("/graphql", json=data).json() - for rule in response["data"].get("nodes", []): - # GitHub can return null actors for deleted or inaccessible allowance actors. - for allowance_key in ("bypassPullRequestAllowances", "pushAllowances"): - allowances = rule.get(allowance_key) - if allowances and allowances.get("nodes"): - allowances["nodes"] = [ - node - for node in allowances["nodes"] - if node.get("actor") is not None - ] - yield { - **rule, - "org_login": repository.org_login, - "repository_node_id": repository.id, - "repository_name": repository.name, - } - except Exception as e: - logger.error( - f"Error in resource 'branch_protection_rules' processing repository '{repository.org_login}/{repository.name}': {e}", - extra={"resource": "branch_protection_rules", "phase": "resource_iteration"}, - ) - return + for i in range(0, len(rule_ids_list), 100): + rules_chunk = rule_ids_list[i : i + 100] + if rules_chunk: + data = {"query": PROTECTION_RULES_QUERY, "variables": {"ids": rules_chunk}} + response = client.post("/graphql", json=data).json() + for rule in response["data"].get("nodes", []): + # GitHub can return null actors for deleted or inaccessible allowance actors. + for allowance_key in ("bypassPullRequestAllowances", "pushAllowances"): + allowances = rule.get(allowance_key) + if allowances and allowances.get("nodes"): + allowances["nodes"] = [ + node + for node in allowances["nodes"] + if node.get("actor") is not None + ] + yield { + **rule, + "org_login": repository.org_login, + "repository_node_id": repository.id, + "repository_name": repository.name, + } @app.transformer(name="workflows", columns=Workflow, parallelized=True) @@ -1141,19 +1078,12 @@ def _workflow_file_contents( } client = _client_for_org(ctx, repo.org_login) - try: - for page in client.paginate( - f"/repos/{repo.full_name}/actions/workflows", params={"per_page": 100} - ): - for workflow in page: - if workflow.get("state") == "active": - yield _workflow_file_contents(client, repo, workflow) - except Exception as e: - logger.error( - f"Error in resource 'workflows' processing repository '{repo.full_name}': {e}", - extra={"resource": "workflows", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/repos/{repo.full_name}/actions/workflows", params={"per_page": 100} + ): + for workflow in page: + if workflow.get("state") == "active": + yield _workflow_file_contents(client, repo, workflow) @app.transformer(name="workflow_jobs", columns=WorkflowJob, parallelized=True) @@ -1184,26 +1114,19 @@ def environments(repo: Repository, ctx: SourceContext): repo_name = repo.name repo_node_id = repo.node_id client = _client_for_org(ctx, repo.org_login) - try: - for page in client.paginate( - f"/repos/{full_name}/environments", - params={"per_page": 100}, - data_selector="environments", - ): - for env in page: - yield { - **env, - "org_login": repo.org_login, - "repository_name": repo_name, - "repository_full_name": full_name, - "repository_node_id": repo_node_id, - } - except Exception as e: - logger.error( - f"Error in resource 'environments' processing repository '{repo.full_name}': {e}", - extra={"resource": "environments", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/repos/{full_name}/environments", + params={"per_page": 100}, + data_selector="environments", + ): + for env in page: + yield { + **env, + "org_login": repo.org_login, + "repository_name": repo_name, + "repository_full_name": full_name, + "repository_node_id": repo_node_id, + } @app.resource(name="runner_groups", columns=RunnerGroup, parallelized=True) @@ -1271,26 +1194,19 @@ def org_runner_group_memberships( accessible_repo_node_ids = _selected_runner_group_repo_node_ids( group_row, client, org_name ) - try: - for runner_page in client.paginate( - f"/orgs/{org_name}/actions/runner-groups/{group.id}/runners", - params={"per_page": 100}, - data_selector="runners", - ): - for runner in runner_page: - yield { - "runner_group_id": group.id, - "runner_id": runner["id"], - "runner_group_visibility": group.visibility, - "accessible_repo_node_ids": accessible_repo_node_ids, - "org_login": org_name, - } - except Exception as e: - logger.error( - f"Error in resource 'org_runner_group_memberships' processing runner group '{group.id}': {e}", - extra={"resource": "org_runner_group_memberships", "phase": "resource_iteration"}, - ) - return + for runner_page in client.paginate( + f"/orgs/{org_name}/actions/runner-groups/{group.id}/runners", + params={"per_page": 100}, + data_selector="runners", + ): + for runner in runner_page: + yield { + "runner_group_id": group.id, + "runner_id": runner["id"], + "runner_group_visibility": group.visibility, + "accessible_repo_node_ids": accessible_repo_node_ids, + "org_login": org_name, + } @app.transformer(name="repo_runners", columns=RepoRunner, parallelized=True) @@ -1298,26 +1214,19 @@ def repo_runners(repo: Repository, ctx: SourceContext): if not repo.self_hosted_runners_enabled: return client = _client_for_org(ctx, repo.org_login) - try: - for page in client.paginate( - f"/repos/{repo.full_name}/actions/runners", - params={"per_page": 100}, - data_selector="runners", - ): - for runner in page: - yield { - **runner, - "repository_name": repo.name, - "repository_node_id": repo.node_id, - "repository_full_name": repo.full_name, - "org_login": repo.org_login, - } - except Exception as e: - logger.error( - f"Error in resource 'repo_runners' processing repository '{repo.full_name}': {e}", - extra={"resource": "repo_runners", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/repos/{repo.full_name}/actions/runners", + params={"per_page": 100}, + data_selector="runners", + ): + for runner in page: + yield { + **runner, + "repository_name": repo.name, + "repository_node_id": repo.node_id, + "repository_full_name": repo.full_name, + "org_login": repo.org_login, + } @app.transformer( @@ -1341,25 +1250,18 @@ def environment_variables(environment: Environment, ctx: SourceContext): repo_node_id = environment.repository_node_id client = _client_for_org(ctx, environment.org_login) - try: - for page in client.paginate( - f"/repos/{full_repo_name}/environments/{env_name}/variables" - ): - for item in page: - yield { - **item, - "org_login": environment.org_login, - "environment_node_id": env_node_id, - "environment_name": env_name, - "repository_name": repo_name, - "repository_node_id": repo_node_id, - } - except Exception as e: - logger.error( - f"Error in resource 'environment_variables' processing environment '{env_name}': {e}", - extra={"resource": "environment_variables", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/repos/{full_repo_name}/environments/{env_name}/variables" + ): + for item in page: + yield { + **item, + "org_login": environment.org_login, + "environment_node_id": env_node_id, + "environment_name": env_name, + "repository_name": repo_name, + "repository_node_id": repo_node_id, + } @app.transformer( @@ -1387,25 +1289,18 @@ def environment_branch_policies(environment: Environment, ctx: SourceContext): env_name = environment.name env_node_id = environment.node_id client = _client_for_org(ctx, environment.org_login) - try: - for page in client.paginate( - f"/repos/{full_repo_name}/environments/{env_name}/deployment-branch-policies" - ): - for policy in page: - yield { - **policy, - "environment_node_id": env_node_id, - "environment_name": env_name, - "repository_name": repo_name, - "repository_node_id": repo_node_id, - "org_login": environment.org_login, - } - except Exception as e: - logger.error( - f"Error in resource 'environment_branch_policies' processing environment '{env_name}': {e}", - extra={"resource": "environment_branch_policies", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/repos/{full_repo_name}/environments/{env_name}/deployment-branch-policies" + ): + for policy in page: + yield { + **policy, + "environment_node_id": env_node_id, + "environment_name": env_name, + "repository_name": repo_name, + "repository_node_id": repo_node_id, + "org_login": environment.org_login, + } @app.transformer( @@ -1428,25 +1323,18 @@ def environment_secrets(environment: Environment, ctx: SourceContext): env_name = environment.name env_node_id = environment.node_id client = _client_for_org(ctx, environment.org_login) - try: - for page in client.paginate( - f"/repos/{full_repo_name}/environments/{env_name}/secrets" - ): - for secret in page: - yield { - **secret, - "org_login": environment.org_login, - "repository_name": repo_name, - "repository_node_id": repo_node_id, - "environment_name": env_name, - "environment_node_id": env_node_id, - } - except Exception as e: - logger.error( - f"Error in resource 'environment_secrets' processing environment '{env_name}': {e}", - extra={"resource": "environment_secrets", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/repos/{full_repo_name}/environments/{env_name}/secrets" + ): + for secret in page: + yield { + **secret, + "org_login": environment.org_login, + "repository_name": repo_name, + "repository_node_id": repo_node_id, + "environment_name": env_name, + "environment_node_id": env_node_id, + } @app.resource(name="organization_secrets", columns=OrgSecret, parallelized=True) @@ -1512,24 +1400,17 @@ def selected_organization_secrets(secret: OrgSecret, ctx: SourceContext): """ if secret.visibility == "selected": client = _client_for_org(ctx, secret.org_login) - try: - for page in client.paginate( - f"/orgs/{secret.org_login}/actions/secrets/{secret.name}/repositories", - params={"per_page": 100}, - ): - for repo in page: - yield { - "name": secret.name, - "repository_full_name": repo["full_name"], - "repository_node_id": repo["node_id"], - "org_login": secret.org_login, - } - except Exception as e: - logger.error( - f"Error in resource 'selected_organization_secrets' processing secret '{secret.name}': {e}", - extra={"resource": "selected_organization_secrets", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/orgs/{secret.org_login}/actions/secrets/{secret.name}/repositories", + params={"per_page": 100}, + ): + for repo in page: + yield { + "name": secret.name, + "repository_full_name": repo["full_name"], + "repository_node_id": repo["node_id"], + "org_login": secret.org_login, + } @app.resource(name="organization_variables", columns=OrgVariable, parallelized=True) @@ -1581,23 +1462,16 @@ def selected_organization_variables(variable: OrgVariable, ctx: SourceContext): """ if variable.visibility == "selected": client = _client_for_org(ctx, variable.org_login) - try: - for page in client.paginate( - f"/orgs/{variable.org_login}/actions/variables/{variable.name}/repositories", - params={"per_page": 100}, - ): - for repo in page: - yield { - "name": variable.name, - "repository_node_id": repo["node_id"], - "org_login": variable.org_login, - } - except Exception as e: - logger.error( - f"Error in resource 'selected_organization_variables' processing variable '{variable.name}': {e}", - extra={"resource": "selected_organization_variables", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/orgs/{variable.org_login}/actions/variables/{variable.name}/repositories", + params={"per_page": 100}, + ): + for repo in page: + yield { + "name": variable.name, + "repository_node_id": repo["node_id"], + "org_login": variable.org_login, + } @app.transformer(name="repository_secrets", columns=RepoSecret, parallelized=True) @@ -1612,23 +1486,16 @@ def repository_secrets(repo: Repository, ctx: SourceContext): RepoSecret (RepoSecret): Repository secret record. """ client = _client_for_org(ctx, repo.org_login) - try: - for page in client.paginate( - f"/repos/{repo.full_name}/actions/secrets", params={"per_page": 100} - ): - for secret in page: - yield { - **secret, - "org_login": repo.org_login, - "repository_name": repo.full_name, - "repository_node_id": repo.node_id, - } - except Exception as e: - logger.error( - f"Error in resource 'repository_secrets' processing repository '{repo.full_name}': {e}", - extra={"resource": "repository_secrets", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/repos/{repo.full_name}/actions/secrets", params={"per_page": 100} + ): + for secret in page: + yield { + **secret, + "org_login": repo.org_login, + "repository_name": repo.full_name, + "repository_node_id": repo.node_id, + } @app.transformer(name="repository_variables", columns=RepoVariable, parallelized=True) @@ -1646,23 +1513,16 @@ def repository_variables(repo: Repository, ctx: SourceContext): RepoVariable (RepoVariable): Repository variable record. """ client = _client_for_org(ctx, repo.org_login) - try: - for page in client.paginate( - f"/repos/{repo.full_name}/actions/variables", params={"per_page": 100} - ): - for variable in page: - yield { - **variable, - "org_login": repo.org_login, - "repository_name": repo.full_name, - "repository_node_id": repo.node_id, - } - except Exception as e: - logger.error( - f"Error in resource 'repository_variables' processing repository '{repo.full_name}': {e}", - extra={"resource": "repository_variables", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/repos/{repo.full_name}/actions/variables", params={"per_page": 100} + ): + for variable in page: + yield { + **variable, + "org_login": repo.org_login, + "repository_name": repo.full_name, + "repository_node_id": repo.node_id, + } @app.resource( @@ -1774,19 +1634,12 @@ def pat_repo_access(pat: PersonalAccessToken, ctx: SourceContext): return client = _client_for_org(ctx, pat.org_login) - try: - for page in client.paginate( - f"/orgs/{pat.org_login}/personal-access-tokens/{pat.id}/repositories", - params={"per_page": 100}, - ): - for item in page: - yield {"pat_id": pat.id, **item, "org_login": pat.org_login} - except Exception as e: - logger.error( - f"Error in resource 'pat_repo_access' processing PAT '{pat.id}': {e}", - extra={"resource": "pat_repo_access", "phase": "resource_iteration"}, - ) - return + for page in client.paginate( + f"/orgs/{pat.org_login}/personal-access-tokens/{pat.id}/repositories", + params={"per_page": 100}, + ): + for item in page: + yield {"pat_id": pat.id, **item, "org_login": pat.org_login} @app.resource( diff --git a/tests/test_error_resilience.py b/tests/test_error_resilience.py index 9589e14..5f59289 100644 --- a/tests/test_error_resilience.py +++ b/tests/test_error_resilience.py @@ -13,7 +13,6 @@ from openhound_github.resources.organization import ( OrgContext, SourceContext, - applications, organizations, users, ) @@ -147,43 +146,3 @@ def test_graphql_resource_continues_after_org_failure(caplog) -> None: "Error in resource 'users' processing organization 'org2'" in msg for msg in caplog.messages ), "Expected an error log for org2" - - -def test_applications_transformer_handles_api_error(caplog) -> None: - """A failing API call in the applications transformer is caught and logged. - - The transformer should log the error and yield nothing rather than - propagating the exception, so that the DLT pipeline can continue with - the next AppInstallation. - - Note: the raw generator is called directly (via ``_pipe.gen``) because - DLT transformers use a different calling convention when invoked via - their decorated interface — the first argument is reserved for upstream - pipeline items, not direct calls. - """ - failing_client = MagicMock() - failing_client.get.side_effect = ConnectionError("HTTP 503: Service Unavailable") - - ctx = SourceContext( - client=failing_client, - organizations=[OrgContext(client=failing_client, org_name="org1")], - ) - - # Use a MagicMock to stand in for AppInstallation — the transformer - # only accesses .id, .app_slug, and .org_login from the install object. - install = MagicMock() - install.id = 42 - install.app_slug = "acme-bot" - install.org_login = "org1" - - # Call the raw generator function to bypass DLT pipeline machinery. - # Same parallelized=True wrapping issue as users — unwrap to get the original function. - with caplog.at_level(logging.ERROR, logger="openhound_github.resources.organization"): - result = list(inspect.unwrap(applications._pipe.gen)(install, ctx)) - - assert result == [], "Transformer should yield nothing when the API call fails" - - assert any( - "Error in resource 'applications' processing app 'acme-bot'" in msg - for msg in caplog.messages - ), "Expected an error log for the failing app fetch"