diff --git a/README.md b/README.md
index c0f142b..8f389c9 100644
--- a/README.md
+++ b/README.md
@@ -172,6 +172,15 @@ _NOTE: the parameter "item_name" corresponds with the field "name" of the json.
- collectionIds: Array of collection IDs this item belongs to
- attachments: Array of file attachments
+#### Session isolation
+
+Each `BitwardenManager` instance creates its own temporary directory and
+sets the `BITWARDENCLI_APPDATA_DIR` environment variable to point to it.
+This means multiple instances can run in parallel without sharing login
+state or session keys. The `HOME` variable is also forwarded so the CLI
+can resolve paths correctly. The temporary directory is cleaned up
+automatically when `logout()` is called.
+
The module uses the [Bitwarden CLI](https://bitwarden.com/help/cli/) to interact with Bitwarden.
### HashiCorp Vault
@@ -259,6 +268,64 @@ Field Descriptions
The module uses the [hvac](https://hvac.readthedocs.io/) Python library to interact with HashiCorp Vault.
+### `resolve_credentials()` — Unified credential resolution
+
+The `CredentialManager` ABC provides a `resolve_credentials()` method that
+orchestrates login, secret fetching, field extraction, and logout in a single
+call. It is available on any credential manager instance (`BitwardenManager`,
+`HashicorpManager`).
+
+This method is designed to work independently of any CLI framework, making
+it usable from Perceval's `BackendCommand`, KingArthur, or any custom script.
+
+#### Example
+
+```python
+from grimoirelab_toolkit.credential_manager import BitwardenManager
+from grimoirelab_toolkit.credential_manager.hc_manager import HashicorpManager
+
+# Bitwarden example
+bw_manager = BitwardenManager("your-client-id", "your-client-secret", "your-master-password")
+credentials = bw_manager.resolve_credentials("GitHub", ["api_token", "user"])
+# credentials = {'api_token': 'ghp_...', 'user': 'myuser'}
+
+# HashiCorp example
+hc_manager = HashicorpManager("https://vault.example.com", "hvs.your-token")
+credentials = hc_manager.resolve_credentials("secret/my-service", ["api_token"])
+```
+
+#### Parameters
+
+- `item_name` (`str`) — Name/path of the secret item in the vault
+- `field_names` (`list[str]`) — List of field names to look up in the vault. Field names must match the names used when storing the secret.
+
+#### Return value
+
+A `dict[str, str]` mapping field names to resolved string values.
+Only fields that were found are included in the result. Missing fields
+produce a warning log and are omitted (partial results are valid).
+
+#### Field extraction behavior
+
+Each manager returns secrets in a different format. `resolve_credentials()`
+normalizes the extraction via each manager's `extract_field()` implementation:
+
+- **Bitwarden**: Checks `item['login']` dict first (for username, password),
+ then searches the `item['fields']` array (for custom fields like API tokens).
+- **HashiCorp**: Reads from `secret['data']['data'][field_name]`.
+
+#### Error handling
+
+- Empty `item_name` raises `ValueError`
+- Secret item not found raises `CredentialNotFoundError`
+- Individual missing fields are skipped with a warning (no error)
+- `logout()` is always called in a `finally` block
+
+### Optional dependencies
+
+- **Bitwarden**: requires `bw` CLI on `PATH` (no Python package needed)
+- **HashiCorp**: requires `hvac` (`poetry install --with hashicorp-manager`)
+
## License
Licensed under GNU General Public License (GPL), version 3 or later.
diff --git a/grimoirelab_toolkit/credential_manager/__init__.py b/grimoirelab_toolkit/credential_manager/__init__.py
index c0ff6b2..fe88a4c 100644
--- a/grimoirelab_toolkit/credential_manager/__init__.py
+++ b/grimoirelab_toolkit/credential_manager/__init__.py
@@ -15,22 +15,24 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
-# Author:
-# Alberto Ferrer Sánchez (alberefe@gmail.com)
-#
+
from .bw_manager import BitwardenManager
+from .credential_manager import CredentialManager
from .exceptions import (
CredentialManagerError,
InvalidCredentialsError,
CredentialNotFoundError,
BitwardenCLIError,
+ HashicorpVaultError,
)
__all__ = [
+ "CredentialManager",
"BitwardenManager",
"CredentialManagerError",
"InvalidCredentialsError",
"CredentialNotFoundError",
"BitwardenCLIError",
+ "HashicorpVaultError",
]
diff --git a/grimoirelab_toolkit/credential_manager/bw_manager.py b/grimoirelab_toolkit/credential_manager/bw_manager.py
index 1c07b43..b8a94af 100644
--- a/grimoirelab_toolkit/credential_manager/bw_manager.py
+++ b/grimoirelab_toolkit/credential_manager/bw_manager.py
@@ -1,5 +1,6 @@
+# -*- coding: utf-8 -*-
#
-#
+# Copyright (C) Grimoirelab Contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,14 +15,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
-# Author:
-# Alberto Ferrer Sánchez (alberefe@gmail.com)
-#
+
+
import json
+import os
import subprocess
import logging
import shutil
+import tempfile
+from .credential_manager import CredentialManager
from .exceptions import (
BitwardenCLIError,
InvalidCredentialsError,
@@ -31,7 +34,7 @@
logger = logging.getLogger(__name__)
-class BitwardenManager:
+class BitwardenManager(CredentialManager):
"""Retrieve credentials from Bitwarden.
This class defines functions to log in, retrieve secrets
@@ -48,6 +51,13 @@ class BitwardenManager:
master_password given as arguments when creating the instance,
so the object is reusable along the program.
+ Each instance creates an isolated temporary directory used as
+ ``BITWARDENCLI_APPDATA_DIR``, so multiple instances can run in
+ parallel without sharing session state. The ``HOME`` environment
+ variable is also forwarded so that the Bitwarden CLI can locate
+ its configuration. The temporary directory is cleaned up
+ automatically when manager.logout is called.
+
The path of Bitwarden CLI (bw) is retrieved using shutil.
"""
@@ -72,11 +82,15 @@ def __init__(self, client_id: str, client_secret: str, master_password: str):
if not self.bw_path:
raise BitwardenCLIError("Bitwarden CLI (bw) not found in PATH")
- # Set up environment variables for consistent execution context
+ # Each instance gets its own appdata dir so parallel sessions
+ # don't share state and bw can find its config directory.
+ self._appdata_dir = tempfile.mkdtemp(prefix="bw_session_")
self.env = {
+ "HOME": os.path.expanduser("~"),
"LANG": "C",
"BW_CLIENTID": client_id,
"BW_CLIENTSECRET": client_secret,
+ "BITWARDENCLI_APPDATA_DIR": self._appdata_dir,
}
def login(self) -> str | None:
@@ -190,7 +204,9 @@ def get_secret(self, item_name: str) -> dict:
def logout(self) -> None:
"""Log out from Bitwarden and invalidate the session.
- This method ends the current session and clears the session key.
+ This method ends the current session, clears the session key,
+ and removes the temporary appdata directory created during
+ initialization to avoid leaving stale session data on disk.
"""
logger.info("Logging out from Bitwarden")
@@ -209,4 +225,28 @@ def logout(self) -> None:
# Clear session key for security
self.session_key = None
+ # Remove the temporary appdata directory
+ shutil.rmtree(self._appdata_dir, ignore_errors=True)
+
logger.info("Successfully logged out from Bitwarden")
+
+ def extract_field(self, secret: dict, field_name: str) -> str | None:
+ """Extract a field value from a Bitwarden item.
+
+ Searches in the 'login' dict first, then in the 'fields' array.
+
+ :param dict secret: The Bitwarden item dictionary
+ :param str field_name: The name of the field to extract
+
+ :returns: The field value or None if not found
+ :rtype: str or None
+ """
+ login_data = secret.get('login', {})
+ if login_data and field_name in login_data:
+ return login_data[field_name]
+
+ for field in secret.get('fields', []):
+ if field.get('name') == field_name:
+ return field.get('value')
+
+ return None
diff --git a/grimoirelab_toolkit/credential_manager/credential_manager.py b/grimoirelab_toolkit/credential_manager/credential_manager.py
new file mode 100644
index 0000000..b094a37
--- /dev/null
+++ b/grimoirelab_toolkit/credential_manager/credential_manager.py
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) Grimoirelab Contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+
+
+import logging
+
+from abc import ABC, abstractmethod
+
+logger = logging.getLogger(__name__)
+
+
+class CredentialManager(ABC):
+ """Abstract base class for credential managers.
+
+ Defines the interface that all credential manager implementations
+ must follow. Provides a concrete `resolve_credentials` method
+ (Template Method) that orchestrates the shared workflow of
+ logging in, fetching a secret, extracting fields, and logging out.
+
+ Subclasses must implement `get_secret` and `extract_field`.
+ Subclasses that require authentication (e.g. Bitwarden) should
+ override `login` and `logout`.
+ """
+
+ @abstractmethod
+ def get_secret(self, item_name: str) -> dict:
+ """Retrieve a secret item from the vault.
+
+ :param str item_name: The name/path of the secret item
+
+ :returns: Dictionary containing the secret data
+ :rtype: dict
+ """
+ raise NotImplementedError
+
+ @abstractmethod
+ def extract_field(self, secret: dict, field_name: str) -> str | None:
+ """Extract a single field value from a secret item.
+
+ :param dict secret: The secret dictionary returned by `get_secret`
+ :param str field_name: The name of the field to extract
+
+ :returns: The field value, or None if not found
+ :rtype: str or None
+ """
+ raise NotImplementedError
+
+ def login(self):
+ """Log into the secrets manager. No-op by default."""
+ pass
+
+ def logout(self):
+ """Log out from the secrets manager. No-op by default."""
+ pass
+
+ def resolve_credentials(
+ self,
+ item_name: str,
+ field_names: list[str],
+ ) -> dict[str, str]:
+ """Resolve credentials from the secrets manager.
+
+ Fetches a secret item and extracts the requested fields.
+ Handles login/logout lifecycle automatically.
+
+ :param str item_name: Name/path of the secret item in the vault
+ :param list field_names: List of field names to extract
+
+ :returns: Dict mapping field names to their resolved values.
+ Only fields that were found are included.
+ :rtype: dict[str, str]
+
+ :raises ValueError: If item_name is empty
+ """
+ if not item_name:
+ raise ValueError("Missing item name")
+
+ if not field_names:
+ return {}
+
+ self.login()
+ try:
+ secret = self.get_secret(item_name)
+ result = {}
+ for field_name in field_names:
+ value = self.extract_field(secret, field_name)
+ if value is None:
+ logger.warning(
+ "Field '%s' not found in secret '%s'",
+ field_name, item_name,
+ )
+ continue
+ result[field_name] = value
+ finally:
+ self.logout()
+
+ return result
diff --git a/grimoirelab_toolkit/credential_manager/exceptions.py b/grimoirelab_toolkit/credential_manager/exceptions.py
index 9ba64cf..c26819c 100644
--- a/grimoirelab_toolkit/credential_manager/exceptions.py
+++ b/grimoirelab_toolkit/credential_manager/exceptions.py
@@ -15,9 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
-# Author:
-# Alberto Ferrer Sánchez (alberefe@gmail.com)
-#
+
"""Custom exceptions for the credential manager module."""
diff --git a/grimoirelab_toolkit/credential_manager/hc_manager.py b/grimoirelab_toolkit/credential_manager/hc_manager.py
index f78c63d..f8d3a22 100644
--- a/grimoirelab_toolkit/credential_manager/hc_manager.py
+++ b/grimoirelab_toolkit/credential_manager/hc_manager.py
@@ -21,12 +21,13 @@
import hvac
import hvac.exceptions
+from .credential_manager import CredentialManager
from .exceptions import HashicorpVaultError, CredentialNotFoundError
logger = logging.getLogger(__name__)
-class HashicorpManager:
+class HashicorpManager(CredentialManager):
"""Retrieve credentials from HashiCorp Vault.
This class defines functions to initialize a client and retrieve
@@ -54,16 +55,10 @@ def __init__(self, vault_url: str, token: str, certificate: str | bool = None):
verification should be performed, a string pointing at the CA bundle to use for
verification
- :raises ConnectionError: If connection issues occur
"""
- try:
- logger.debug("Creating Vault client")
- # Initialize client with URL, token, and certificate verification setting
- self.client = hvac.Client(url=vault_url, token=token, verify=certificate)
- logger.debug("Vault client initialized successfully")
- except Exception as e:
- logger.error("An error occurred initializing the client: %s", str(e))
- raise e
+ logger.debug("Creating Vault client")
+ self.client = hvac.Client(url=vault_url, token=token, verify=certificate)
+ logger.debug("Vault client initialized successfully")
def get_secret(self, item_name: str) -> dict:
"""Retrieve an item from the HashiCorp Vault.
@@ -85,7 +80,7 @@ def get_secret(self, item_name: str) -> dict:
:raises HashicorpVaultError: If Vault operations fail
"""
try:
- logger.info("Retrieving credentials from vault: %s", item_name)
+ logger.debug("Retrieving credentials from vault: %s", item_name)
# Read secret from KV secrets engine
secret = self.client.secrets.kv.read_secret(path=item_name)
return secret
@@ -97,3 +92,17 @@ def get_secret(self, item_name: str) -> dict:
except Exception as e:
logger.error("Error retrieving the secret: %s", str(e))
raise HashicorpVaultError(f"Vault operation failed: {e}")
+
+ def extract_field(self, secret: dict, field_name: str) -> str | None:
+ """Extract a field value from a HashiCorp Vault secret.
+
+ Reads from secret['data']['data'][field_name].
+
+ :param dict secret: The HashiCorp Vault secret dictionary
+ :param str field_name: The name of the field to extract
+
+ :returns: The field value or None if not found
+ :rtype: str or None
+ """
+ secret_data = secret.get('data', {}).get('data', {})
+ return secret_data.get(field_name)
diff --git a/releases/unreleased/standalone-credential-resolution.yaml b/releases/unreleased/standalone-credential-resolution.yaml
new file mode 100644
index 0000000..e62eb20
--- /dev/null
+++ b/releases/unreleased/standalone-credential-resolution.yaml
@@ -0,0 +1,7 @@
+title: 'Add standalone credential resolution function'
+category: added
+author: Alberto Ferrer Sánchez
+issue:
+notes: >
+ Add resolve_credentials() function that allows resolving credentials
+ from configuration without requiring the full GrimoireLab framework.
\ No newline at end of file
diff --git a/tests/test_hc_manager.py b/tests/test_hc_manager.py
index 135d03a..db3116f 100644
--- a/tests/test_hc_manager.py
+++ b/tests/test_hc_manager.py
@@ -115,7 +115,7 @@ def test_get_secret_not_found(self, mock_hvac_client):
manager = HashicorpManager(self.vault_url, self.token, self.certificate)
with self.assertRaises(CredentialNotFoundError) as context:
- manager.get_secret("nonexistent_service")
+ manager.get_secret("nonexistent_serviceChange get_secret logging from INFO to DEBUG")
self.assertIn("nonexistent_service", str(context.exception))
self.assertIn("not found", str(context.exception))
@@ -150,6 +150,33 @@ def test_vault_connection_error(self, mock_hvac_client):
self.assertIn("Vault operation failed", str(context.exception))
+ @patch("hvac.Client")
+ def test_extract_field_success(self, mock_hvac_client):
+ """Test extracting an existing field from a secret."""
+
+ manager = HashicorpManager(self.vault_url, self.token, self.certificate)
+ result = manager.extract_field(self.mock_secret_response, "username")
+
+ self.assertEqual(result, "test_user")
+
+ @patch("hvac.Client")
+ def test_extract_field_not_found(self, mock_hvac_client):
+ """Test extracting a non-existent field returns None."""
+
+ manager = HashicorpManager(self.vault_url, self.token, self.certificate)
+ result = manager.extract_field(self.mock_secret_response, "nonexistent")
+
+ self.assertIsNone(result)
+
+ @patch("hvac.Client")
+ def test_extract_field_empty_secret(self, mock_hvac_client):
+ """Test extracting a field from an empty secret returns None."""
+
+ manager = HashicorpManager(self.vault_url, self.token, self.certificate)
+ result = manager.extract_field({}, "username")
+
+ self.assertIsNone(result)
+
if __name__ == "__main__":
unittest.main(warnings="ignore")
diff --git a/tests/test_resolve_credentials.py b/tests/test_resolve_credentials.py
new file mode 100644
index 0000000..0e7e735
--- /dev/null
+++ b/tests/test_resolve_credentials.py
@@ -0,0 +1,248 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) Grimoirelab Contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+
+"""Tests for resolve_credentials and extract_field methods."""
+
+import sys
+import unittest
+from unittest.mock import MagicMock, patch
+
+# Mock hvac before importing anything from credential_manager,
+# since the import chain may fail in environments where hvac is not installed.
+sys.modules.setdefault('hvac', MagicMock())
+sys.modules.setdefault('hvac.exceptions', MagicMock())
+
+from grimoirelab_toolkit.credential_manager.bw_manager import BitwardenManager
+from grimoirelab_toolkit.credential_manager.hc_manager import HashicorpManager
+from grimoirelab_toolkit.credential_manager.credential_manager import CredentialManager
+from grimoirelab_toolkit.credential_manager.exceptions import CredentialNotFoundError
+
+
+class TestBitwardenExtractField(unittest.TestCase):
+ """Tests for BitwardenManager.extract_field."""
+
+ def setUp(self):
+ with patch.object(BitwardenManager, '__init__', lambda self, *a, **kw: None):
+ self.manager = BitwardenManager()
+
+ def test_extract_login_field(self):
+ """Test extracting a field from the login section."""
+ item = {
+ 'login': {'username': 'myuser', 'password': 'mypass'},
+ 'fields': [],
+ }
+ self.assertEqual(self.manager.extract_field(item, 'username'), 'myuser')
+ self.assertEqual(self.manager.extract_field(item, 'password'), 'mypass')
+
+ def test_extract_custom_field(self):
+ """Test extracting a custom field from the fields array."""
+ item = {
+ 'login': {},
+ 'fields': [
+ {'name': 'api_key', 'value': 'tok123'},
+ {'name': 'other', 'value': 'val'},
+ ],
+ }
+ self.assertEqual(self.manager.extract_field(item, 'api_key'), 'tok123')
+
+ def test_login_takes_priority_over_custom_field(self):
+ """Test that login fields are checked before custom fields."""
+ item = {
+ 'login': {'token': 'login_token'},
+ 'fields': [{'name': 'token', 'value': 'custom_token'}],
+ }
+ self.assertEqual(self.manager.extract_field(item, 'token'), 'login_token')
+
+ def test_field_not_found(self):
+ """Test that None is returned for missing fields."""
+ item = {'login': {}, 'fields': []}
+ self.assertIsNone(self.manager.extract_field(item, 'nonexistent'))
+
+ def test_missing_login_key(self):
+ """Test extraction when 'login' key is missing from item."""
+ item = {'fields': [{'name': 'token', 'value': 'val'}]}
+ self.assertEqual(self.manager.extract_field(item, 'token'), 'val')
+
+ def test_missing_fields_key(self):
+ """Test extraction when 'fields' key is missing from item."""
+ item = {'login': {'username': 'user'}}
+ self.assertEqual(self.manager.extract_field(item, 'username'), 'user')
+ self.assertIsNone(self.manager.extract_field(item, 'nonexistent'))
+
+
+class TestHashicorpExtractField(unittest.TestCase):
+ """Tests for HashicorpManager.extract_field."""
+
+ def setUp(self):
+ with patch.object(HashicorpManager, '__init__', lambda self, *a, **kw: None):
+ self.manager = HashicorpManager()
+
+ def test_extract_field(self):
+ """Test extracting a field from HashiCorp secret structure."""
+ secret = {'data': {'data': {'api_key': 'hc_token', 'user': 'admin'}}}
+ self.assertEqual(self.manager.extract_field(secret, 'api_key'), 'hc_token')
+ self.assertEqual(self.manager.extract_field(secret, 'user'), 'admin')
+
+ def test_field_not_found(self):
+ """Test that None is returned for missing fields."""
+ secret = {'data': {'data': {'api_key': 'tok'}}}
+ self.assertIsNone(self.manager.extract_field(secret, 'nonexistent'))
+
+ def test_missing_data_structure(self):
+ """Test graceful handling of missing nested data keys."""
+ self.assertIsNone(self.manager.extract_field({}, 'field'))
+ self.assertIsNone(self.manager.extract_field({'data': {}}, 'field'))
+
+
+class TestResolveCredentialsBitwarden(unittest.TestCase):
+ """Tests for resolve_credentials on BitwardenManager."""
+
+ def setUp(self):
+ with patch.object(BitwardenManager, '__init__', lambda self, *a, **kw: None):
+ self.manager = BitwardenManager()
+
+ @patch.object(BitwardenManager, 'logout')
+ @patch.object(BitwardenManager, 'get_secret')
+ @patch.object(BitwardenManager, 'login')
+ def test_extract_login_fields(self, mock_login, mock_get_secret, mock_logout):
+ """Test resolving login fields (username, password) from Bitwarden."""
+ mock_get_secret.return_value = {
+ 'login': {'username': 'myuser', 'password': 'mypass'},
+ 'fields': [],
+ }
+
+ result = self.manager.resolve_credentials('my-item', ['username', 'password'])
+
+ self.assertEqual(result, {'username': 'myuser', 'password': 'mypass'})
+ mock_login.assert_called_once()
+ mock_logout.assert_called_once()
+
+ @patch.object(BitwardenManager, 'logout')
+ @patch.object(BitwardenManager, 'get_secret')
+ @patch.object(BitwardenManager, 'login')
+ def test_extract_custom_fields(self, mock_login, mock_get_secret, mock_logout):
+ """Test resolving custom fields (token) from Bitwarden."""
+ mock_get_secret.return_value = {
+ 'login': {},
+ 'fields': [{'name': 'api_key', 'value': 'tok123'}],
+ }
+
+ result = self.manager.resolve_credentials('my-item', ['api_key'])
+
+ self.assertEqual(result, {'api_key': 'tok123'})
+
+ @patch.object(BitwardenManager, 'logout')
+ @patch.object(BitwardenManager, 'get_secret')
+ @patch.object(BitwardenManager, 'login')
+ def test_missing_field_omitted(self, mock_login, mock_get_secret, mock_logout):
+ """Test that missing fields are omitted from result."""
+ mock_get_secret.return_value = {
+ 'login': {'username': 'myuser'},
+ 'fields': [],
+ }
+
+ result = self.manager.resolve_credentials('my-item', ['username', 'nonexistent'])
+
+ self.assertEqual(result, {'username': 'myuser'})
+
+ @patch.object(BitwardenManager, 'logout')
+ @patch.object(BitwardenManager, 'get_secret')
+ @patch.object(BitwardenManager, 'login')
+ def test_logout_called_on_failure(self, mock_login, mock_get_secret, mock_logout):
+ """Test that logout is called even when get_secret fails."""
+ mock_get_secret.side_effect = CredentialNotFoundError("not found")
+
+ with self.assertRaises(CredentialNotFoundError):
+ self.manager.resolve_credentials('my-item', ['username'])
+
+ mock_login.assert_called_once()
+ mock_logout.assert_called_once()
+
+
+class TestResolveCredentialsHashicorp(unittest.TestCase):
+ """Tests for resolve_credentials on HashicorpManager."""
+
+ def setUp(self):
+ with patch.object(HashicorpManager, '__init__', lambda self, *a, **kw: None):
+ self.manager = HashicorpManager()
+
+ @patch.object(HashicorpManager, 'get_secret')
+ def test_extract_fields(self, mock_get_secret):
+ """Test resolving fields from HashiCorp Vault."""
+ mock_get_secret.return_value = {
+ 'data': {'data': {'api_key': 'hc_token', 'user': 'admin'}},
+ }
+
+ result = self.manager.resolve_credentials(
+ 'secret/my-service', ['api_key', 'user']
+ )
+
+ self.assertEqual(result, {'api_key': 'hc_token', 'user': 'admin'})
+
+ @patch.object(HashicorpManager, 'get_secret')
+ def test_missing_field_omitted(self, mock_get_secret):
+ """Test that missing fields are omitted from result."""
+ mock_get_secret.return_value = {
+ 'data': {'data': {'api_key': 'tok'}},
+ }
+
+ result = self.manager.resolve_credentials(
+ 'secret/path', ['api_key', 'missing']
+ )
+
+ self.assertEqual(result, {'api_key': 'tok'})
+
+ @patch.object(HashicorpManager, 'get_secret')
+ def test_item_not_found_propagates(self, mock_get_secret):
+ """Test that CredentialNotFoundError from manager propagates."""
+ mock_get_secret.side_effect = CredentialNotFoundError("not found")
+
+ with self.assertRaises(CredentialNotFoundError):
+ self.manager.resolve_credentials('missing/path', ['field'])
+
+
+class TestResolveCredentialsGeneral(unittest.TestCase):
+ """Tests for general resolve_credentials behavior."""
+
+ def _make_stub_manager(self):
+ """Create a minimal concrete CredentialManager for testing."""
+
+ class StubManager(CredentialManager):
+ def get_secret(self, item_name):
+ return {}
+
+ def extract_field(self, secret, field_name):
+ return None
+
+ return StubManager()
+
+ def test_empty_item_name(self):
+ """Test that empty item_name raises ValueError."""
+ manager = self._make_stub_manager()
+ with self.assertRaises(ValueError):
+ manager.resolve_credentials('', ['field'])
+
+ def test_empty_field_names(self):
+ """Test that empty field_names returns empty dict without calling manager."""
+ manager = self._make_stub_manager()
+ result = manager.resolve_credentials('item', [])
+ self.assertEqual(result, {})
+
+
+if __name__ == '__main__':
+ unittest.main()