Skip to content

Commit cab6fc0

Browse files
authored
fix: handle case-insensitive retrieval in GoogleSecretManagerSettingsSource (#730)
1 parent 7b3dbdd commit cab6fc0

File tree

3 files changed

+165
-13
lines changed

3 files changed

+165
-13
lines changed

docs/index.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2452,6 +2452,9 @@ For nested models, Secret Manager supports the `env_nested_delimiter` setting as
24522452
### Important Notes
24532453

24542454
1. **Case Sensitivity**: By default, secret names are case-sensitive.
2455+
* If you set `case_sensitive=False`, `pydantic-settings` will attempt to resolve secrets in a case-insensitive manner. It prioritizes exact matches over case-insensitive matches. For some examples of this, imagine `case_sensitive=False` and the model attribute is named `my_secret`:
2456+
* If Google Secret Manager has both `MY_SECRET` and `my_secret` defined - the value of `my_secret` will be returned.
2457+
* If Google Secret Manager has `MY_SECRET`, `My_Secret`, and `my_Secret` defined - a warning will be raised and the value of `my_Secret` will be returned - as the secret names are first sorted in ASCII sort order (where lowercased letters are greater than upper case letters) and the last one is chosen (which would be `my_Secret` in this case).
24552458
2. **Secret Naming**: Create secrets in Google Secret Manager with names that match your field names (including any prefix). According the [Secret Manager documentation](https://cloud.google.com/secret-manager/docs/creating-and-accessing-secrets#create-a-secret), a secret name can contain uppercase and lowercase letters, numerals, hyphens, and underscores. The maximum allowed length for a name is 255 characters.
24562459
3. **Secret Versions**: The GoogleSecretManagerSettingsSource uses the "latest" version of secrets.
24572460

pydantic_settings/sources/providers/gcp.py

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations as _annotations
22

3+
import warnings
34
from collections.abc import Iterator, Mapping
45
from functools import cached_property
56
from typing import TYPE_CHECKING
@@ -47,36 +48,69 @@ def __init__(self, secret_client: SecretManagerServiceClient, project_id: str, c
4748
def _gcp_project_path(self) -> str:
4849
return self._secret_client.common_project_path(self._project_id)
4950

51+
def _select_case_insensitive_secret(self, lower_name: str, candidates: list[str]) -> str:
52+
if len(candidates) == 1:
53+
return candidates[0]
54+
55+
# Sort to ensure deterministic selection (prefer lowercase / ASCII last)
56+
candidates.sort()
57+
winner = candidates[-1]
58+
warnings.warn(
59+
f"Secret collision: Found multiple secrets {candidates} normalizing to '{lower_name}'. "
60+
f"Using '{winner}' for case-insensitive lookup.",
61+
UserWarning,
62+
stacklevel=2,
63+
)
64+
return winner
65+
5066
@cached_property
51-
def _secret_names(self) -> list[str]:
52-
rv: list[str] = []
67+
def _secret_name_map(self) -> dict[str, str]:
68+
mapping: dict[str, str] = {}
69+
# Group secrets by normalized name to detect collisions
70+
normalized_groups: dict[str, list[str]] = {}
5371

5472
secrets = self._secret_client.list_secrets(parent=self._gcp_project_path)
5573
for secret in secrets:
5674
name = self._secret_client.parse_secret_path(secret.name).get('secret', '')
75+
mapping[name] = name
76+
5777
if not self._case_sensitive:
58-
name = name.lower()
59-
rv.append(name)
60-
return rv
78+
lower_name = name.lower()
79+
if lower_name not in normalized_groups:
80+
normalized_groups[lower_name] = []
81+
normalized_groups[lower_name].append(name)
82+
83+
if not self._case_sensitive:
84+
for lower_name, candidates in normalized_groups.items():
85+
mapping[lower_name] = self._select_case_insensitive_secret(lower_name, candidates)
86+
87+
return mapping
88+
89+
@property
90+
def _secret_names(self) -> list[str]:
91+
return list(self._secret_name_map.keys())
6192

6293
def _secret_version_path(self, key: str, version: str = 'latest') -> str:
6394
return self._secret_client.secret_version_path(self._project_id, key, version)
6495

6596
def __getitem__(self, key: str) -> str | None:
66-
if not self._case_sensitive:
67-
key = key.lower()
68-
if key not in self._loaded_secrets:
69-
# If we know the key isn't available in secret manager, raise a key error
70-
if key not in self._secret_names:
71-
raise KeyError(key)
97+
if key in self._loaded_secrets:
98+
return self._loaded_secrets[key]
99+
100+
gcp_secret_name = self._secret_name_map.get(key)
101+
if gcp_secret_name is None and not self._case_sensitive:
102+
gcp_secret_name = self._secret_name_map.get(key.lower())
72103

104+
if gcp_secret_name:
73105
try:
74106
self._loaded_secrets[key] = self._secret_client.access_secret_version(
75-
name=self._secret_version_path(key)
107+
name=self._secret_version_path(gcp_secret_name)
76108
).payload.data.decode('UTF-8')
77109
except Exception:
78110
# If we can't access the secret, we return None
79111
self._loaded_secrets[key] = None
112+
else:
113+
raise KeyError(key)
80114

81115
return self._loaded_secrets[key]
82116

tests/test_source_gcp_secret_manager.py

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def settings_customise_sources(
208208
)
209209

210210
with pytest.raises(ValidationError):
211-
_ = Settings()
211+
_ = Settings() # type: ignore
212212

213213
def test_pydantic_base_settings_with_default_value(self, mock_secret_client):
214214
class Settings(BaseSettings):
@@ -242,3 +242,118 @@ def test_secret_manager_mapping_list_secrets_error(self, secret_manager_mapping,
242242

243243
with pytest.raises(Exception, match='Permission denied'):
244244
_ = secret_manager_mapping._secret_names
245+
246+
@pytest.mark.parametrize(
247+
'case_sensitive, secret_name_in_gcp, requested_key, expected_value',
248+
[
249+
(True, 'test-secret', 'test-secret', 'test-value'),
250+
(True, 'TEST-SECRET', 'TEST-SECRET', 'test-value'),
251+
(True, 'testSecret', 'testSecret', 'test-value'),
252+
(True, 'TEST-SECRET', 'test-secret', None),
253+
(True, 'test-secret', 'TEST_SECRET', None),
254+
(False, 'test-secret', 'TEST-SECRET', 'test-value'),
255+
(False, 'TEST-SECRET', 'test-secret', 'test-value'),
256+
(False, 'TEST-SECRET', 'TEST-SECRET', 'test-value'),
257+
(False, 'testSecret', 'testSecret', 'test-value'),
258+
(False, 'testSecret', 'TESTSECRET', 'test-value'),
259+
],
260+
)
261+
def test_secret_manager_mapping_retrieval_cases(
262+
self, mocker, case_sensitive, secret_name_in_gcp, requested_key, expected_value
263+
):
264+
"""
265+
Tests various combinations of case sensitivity and secret naming.
266+
"""
267+
client = mocker.Mock(spec=SecretManagerServiceClient)
268+
client.common_project_path.return_value = 'projects/test-project'
269+
client.secret_version_path = (
270+
lambda project, secret, version: f'projects/{project}/secrets/{secret}/versions/{version}'
271+
)
272+
client.parse_secret_path = SecretManagerServiceClient.parse_secret_path
273+
274+
# Mock list_secrets to return the specific secret name
275+
secret = mocker.Mock()
276+
secret.name = f'projects/test-project/secrets/{secret_name_in_gcp}'
277+
client.list_secrets.return_value = [secret]
278+
279+
secret_response = mocker.Mock()
280+
secret_response.payload.data.decode.return_value = 'test-value'
281+
282+
def mock_access_secret_version(name: str):
283+
# GCP is always case-sensitive
284+
if name == f'projects/test-project/secrets/{secret_name_in_gcp}/versions/latest':
285+
return secret_response
286+
raise Exception(f'Secret not found or access denied: {name}')
287+
288+
client.access_secret_version = mock_access_secret_version
289+
290+
mapping = GoogleSecretManagerMapping(client, project_id='test-project', case_sensitive=case_sensitive)
291+
292+
if expected_value is None:
293+
# Depending on implementation, it might raise KeyError or return None if we try to access it via .get() or handled access
294+
# The Mapping __getitem__ implementation in pydantic-settings currently returns None if access fails
295+
# OR raises KeyError if the key isn't in the list at all.
296+
297+
# If the key is not in _secret_names, it raises KeyError.
298+
# If it IS in _secret_names but access fails, it returns None.
299+
300+
# For case (True, 'TEST-SECRET', 'test-secret', None):
301+
# _secret_names will be ['TEST-SECRET']. 'test-secret' is not in there. KeyError expected.
302+
try:
303+
val = mapping[requested_key]
304+
assert val == expected_value
305+
except KeyError:
306+
assert expected_value is None
307+
else:
308+
assert mapping[requested_key] == expected_value
309+
310+
@pytest.mark.parametrize(
311+
'case_sensitive, requested_key, expected_value',
312+
[
313+
(True, 'TEST-SECRET', 'UPPER_VAL'),
314+
(True, 'test-secret', 'lower_val'),
315+
# Case insensitive collision with "Prefer Exact Match" logic:
316+
(False, 'TEST-SECRET', 'UPPER_VAL'), # Exact match exists, prefer it
317+
(False, 'test-secret', 'lower_val'), # Exact match exists, prefer it
318+
(False, 'Test-Secret', 'lower_val'), # No exact match, fallback to 'lower_val' (last loaded)
319+
],
320+
)
321+
def test_secret_manager_mapping_collision(self, mocker, case_sensitive, requested_key, expected_value):
322+
client = mocker.Mock(spec=SecretManagerServiceClient)
323+
client.common_project_path.return_value = 'projects/test-project'
324+
client.secret_version_path = (
325+
lambda project, secret, version: f'projects/{project}/secrets/{secret}/versions/{version}'
326+
)
327+
client.parse_secret_path = SecretManagerServiceClient.parse_secret_path
328+
329+
# Mock list_secrets with colliding names
330+
secrets = []
331+
for name in ['TEST-SECRET', 'test-secret']:
332+
s = mocker.Mock()
333+
s.name = f'projects/test-project/secrets/{name}'
334+
secrets.append(s)
335+
client.list_secrets.return_value = secrets
336+
337+
def mock_access_secret_version(name: str):
338+
# name format: projects/test-project/secrets/{SECRET_ID}/versions/latest
339+
if '/secrets/TEST-SECRET/' in name:
340+
resp = mocker.Mock()
341+
resp.payload.data.decode.return_value = 'UPPER_VAL'
342+
return resp
343+
elif '/secrets/test-secret/' in name:
344+
resp = mocker.Mock()
345+
resp.payload.data.decode.return_value = 'lower_val'
346+
return resp
347+
raise Exception(f'Secret not found: {name}')
348+
349+
client.access_secret_version = mock_access_secret_version
350+
351+
mapping = GoogleSecretManagerMapping(client, project_id='test-project', case_sensitive=case_sensitive)
352+
353+
if not case_sensitive:
354+
with pytest.warns(UserWarning, match='Secret collision'):
355+
_ = mapping._secret_name_map
356+
else:
357+
_ = mapping._secret_name_map
358+
359+
assert mapping[requested_key] == expected_value

0 commit comments

Comments
 (0)