|
| 1 | +# |
| 2 | +# |
| 3 | +# |
| 4 | +# This program is free software; you can redistribute it and/or modify |
| 5 | +# it under the terms of the GNU General Public License as published by |
| 6 | +# the Free Software Foundation; either version 3 of the License, or |
| 7 | +# (at your option) any later version. |
| 8 | +# |
| 9 | +# This program is distributed in the hope that it will be useful, |
| 10 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 12 | +# GNU General Public License for more details. |
| 13 | +# |
| 14 | +# You should have received a copy of the GNU General Public License |
| 15 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 16 | +# |
| 17 | + |
| 18 | + |
| 19 | +import logging |
| 20 | + |
| 21 | +from .secrets_manager_factory import SecretsManagerFactory |
| 22 | + |
| 23 | +logger = logging.getLogger(__name__) |
| 24 | + |
| 25 | +SUPPORTED_MANAGERS = ("bitwarden", "hashicorp") |
| 26 | + |
| 27 | + |
| 28 | +def _extract_bitwarden_field(item, field_name): |
| 29 | + """Extract a field value from a Bitwarden item. |
| 30 | +
|
| 31 | + Searches in the 'login' dict first, then in the 'fields' array. |
| 32 | +
|
| 33 | + :param dict item: The Bitwarden item dictionary |
| 34 | + :param str field_name: The name of the field to extract |
| 35 | +
|
| 36 | + :returns: The field value or None if not found |
| 37 | + :rtype: str or None |
| 38 | + """ |
| 39 | + # Check login fields first (username, password, etc.) |
| 40 | + login_data = item.get('login', {}) |
| 41 | + if login_data and field_name in login_data: |
| 42 | + return login_data[field_name] |
| 43 | + |
| 44 | + # Check custom fields array |
| 45 | + for field in item.get('fields', []): |
| 46 | + if field.get('name') == field_name: |
| 47 | + return field.get('value') |
| 48 | + |
| 49 | + return None |
| 50 | + |
| 51 | + |
| 52 | +def _extract_hashicorp_field(secret, field_name): |
| 53 | + """Extract a field value from a HashiCorp Vault secret. |
| 54 | +
|
| 55 | + Reads from secret['data']['data'][field_name]. |
| 56 | +
|
| 57 | + :param dict secret: The HashiCorp Vault secret dictionary |
| 58 | + :param str field_name: The name of the field to extract |
| 59 | +
|
| 60 | + :returns: The field value or None if not found |
| 61 | + :rtype: str or None |
| 62 | + """ |
| 63 | + secret_data = secret.get('data', {}).get('data', {}) |
| 64 | + return secret_data.get(field_name) |
| 65 | + |
| 66 | + |
| 67 | +def resolve_credentials( |
| 68 | + manager_type: str, |
| 69 | + manager_config: dict, |
| 70 | + item_name: str, |
| 71 | + field_names: list[str], |
| 72 | +) -> dict[str, str]: |
| 73 | + """Resolve credentials from a secrets manager. |
| 74 | +
|
| 75 | + Fetches a secret item from the specified secrets manager and extracts |
| 76 | + the requested fields. Field names in the vault must match the parameter |
| 77 | + names expected by Perceval (e.g. 'api_token', 'user', 'password'). |
| 78 | +
|
| 79 | + :param str manager_type: The secrets manager to use |
| 80 | + ('bitwarden' or 'hashicorp') |
| 81 | + :param dict manager_config: Manager-specific authentication config. |
| 82 | + Bitwarden: {'client_id', 'client_secret', 'master_password'} |
| 83 | + HashiCorp: {'vault_url', 'token', 'certificate'} |
| 84 | + :param str item_name: Name/path of the secret item in the vault |
| 85 | + :param list field_names: List of field names to look up in the vault. |
| 86 | + Example: ['api_token', 'user', 'password'] |
| 87 | +
|
| 88 | + :returns: Dict mapping field names to resolved string values. |
| 89 | + Only fields that were found are included. |
| 90 | + :rtype: dict[str, str] |
| 91 | +
|
| 92 | + :raises ValueError: If manager_type is unsupported or item_name is empty |
| 93 | + :raises CredentialNotFoundError: If the secret item is not found |
| 94 | + :raises CredentialManagerError: If manager authentication fails |
| 95 | + """ |
| 96 | + if manager_type not in SUPPORTED_MANAGERS: |
| 97 | + raise ValueError( |
| 98 | + f"Unsupported secrets manager: '{manager_type}'. " |
| 99 | + f"Supported: {', '.join(SUPPORTED_MANAGERS)}" |
| 100 | + ) |
| 101 | + |
| 102 | + if not item_name: |
| 103 | + raise ValueError("item_name must be non-empty") |
| 104 | + |
| 105 | + if not field_names: |
| 106 | + return {} |
| 107 | + |
| 108 | + result = {} |
| 109 | + |
| 110 | + if manager_type == 'bitwarden': |
| 111 | + manager = SecretsManagerFactory.get_bitwarden_manager( |
| 112 | + manager_config['client_id'], |
| 113 | + manager_config['client_secret'], |
| 114 | + manager_config['master_password'], |
| 115 | + ) |
| 116 | + manager.login() |
| 117 | + try: |
| 118 | + item = manager.get_secret(item_name) |
| 119 | + for field_name in field_names: |
| 120 | + value = _extract_bitwarden_field(item, field_name) |
| 121 | + if value is None: |
| 122 | + logger.warning( |
| 123 | + "Field '%s' not found in Bitwarden item '%s'", |
| 124 | + field_name, item_name, |
| 125 | + ) |
| 126 | + continue |
| 127 | + result[field_name] = value |
| 128 | + finally: |
| 129 | + manager.logout() |
| 130 | + |
| 131 | + elif manager_type == 'hashicorp': |
| 132 | + manager = SecretsManagerFactory.get_hashicorp_manager( |
| 133 | + manager_config['vault_url'], |
| 134 | + manager_config['token'], |
| 135 | + manager_config.get('certificate'), |
| 136 | + ) |
| 137 | + secret = manager.get_secret(item_name) |
| 138 | + for field_name in field_names: |
| 139 | + value = _extract_hashicorp_field(secret, field_name) |
| 140 | + if value is None: |
| 141 | + logger.warning( |
| 142 | + "Field '%s' not found in HashiCorp secret '%s'", |
| 143 | + field_name, item_name, |
| 144 | + ) |
| 145 | + continue |
| 146 | + result[field_name] = value |
| 147 | + |
| 148 | + return result |
0 commit comments