Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 64 additions & 5 deletions awscli/argprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,20 +185,79 @@ def _unpack_json_cli_arg(argument_model, value, cli_name):
)


def _unpack_yaml_cli_arg(argument_model, value, cli_name):
try:
import yaml
except ImportError:
raise ParamError(
cli_name,
f"YAML is not available. Please install PyYAML to use YAML input.\n"
f"Attempted to parse as JSON but failed:\n{value}",
)
try:
# We use a custom loader to ensure we get OrderedDicts back.
# This matches the behavior of _unpack_json_cli_arg.
class OrderedDictLoader(yaml.SafeLoader):
pass

def construct_mapping(loader, node):
loader.flatten_mapping(node)
return OrderedDict(loader.construct_pairs(node))

OrderedDictLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, construct_mapping
)

parsed = yaml.load(value, Loader=OrderedDictLoader)
if parsed is None:
return None

# Basic type validation to ensure the YAML parsed into the expected type
if argument_model.type_name in ['structure', 'map'] and not isinstance(
parsed, dict
):
raise ParamError(
cli_name,
f"Expected a mapping for {argument_model.type_name}, "
f"found {type(parsed).__name__}. YAML received: {value}",
)
if argument_model.type_name == 'list' and not isinstance(parsed, list):
# If it's a list but only one element was provided and it's not a list,
# it might be a single YAML-parsed object. But we expect a list.
raise ParamError(
cli_name,
f"Expected a list, found {type(parsed).__name__}. "
f"YAML received: {value}",
)
return parsed
except Exception as e:
raise ParamError(
cli_name, f"Invalid YAML: {e}\nYAML received: {value}"
)


def _unpack_complex_cli_arg(argument_model, value, cli_name):
type_name = argument_model.type_name
if type_name == 'structure' or type_name == 'map':
if value.lstrip()[0] == '{':
if value.lstrip().startswith('{'):
return _unpack_json_cli_arg(argument_model, value, cli_name)
raise ParamError(cli_name, f"Invalid JSON:\n{value}")
return _unpack_yaml_cli_arg(argument_model, value, cli_name)
elif type_name == 'list':
if isinstance(value, str):
if value.lstrip()[0] == '[':
if value.lstrip().startswith('['):
return _unpack_json_cli_arg(argument_model, value, cli_name)
return _unpack_yaml_cli_arg(argument_model, value, cli_name)
elif isinstance(value, list) and len(value) == 1:
single_value = value[0].strip()
if single_value and single_value[0] == '[':
return _unpack_json_cli_arg(argument_model, value[0], cli_name)
if single_value:
if single_value.startswith('['):
return _unpack_json_cli_arg(argument_model, value[0], cli_name)
# If it's a list with one item, it might be a YAML string representing a list
try:
return _unpack_yaml_cli_arg(argument_model, value[0], cli_name)
except ParamError:
# Fall back to treated as a list of one string
pass
try:
# There's a couple of cases remaining here.
# 1. It's possible that this is just a list of strings, i.e
Expand Down
58 changes: 58 additions & 0 deletions awscli/customizations/cloudformation/yamlhelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,71 @@ def _dict_representer(dumper, data):
return dumper.represent_dict(data.items())


def _needs_quoting(value):
"""
Check if a string value needs to be quoted to prevent YAML from
interpreting it as a non-string type (number, boolean, null, etc.).

This addresses issue #3991 where strings like '1e10' were being
output without quotes, causing them to be interpreted as numbers
when the YAML is re-parsed.
"""
if not isinstance(value, str) or not value:
return False

# Check for scientific notation (e.g., 1e10, 1E-5, 2.5e+3)
# These are valid floats but should remain as strings if originally strings
import re
scientific_pattern = r'^[+-]?(\d+\.?\d*|\d*\.?\d+)[eE][+-]?\d+$'
if re.match(scientific_pattern, value):
return True

# Check for octal notation (e.g., 0o755, 0O644)
if re.match(r'^0[oO][0-7]+$', value):
return True

# Check for hex notation (e.g., 0x1A, 0X2B)
if re.match(r'^0[xX][0-9a-fA-F]+$', value):
return True

# Check for binary notation (e.g., 0b1010)
if re.match(r'^0[bB][01]+$', value):
return True

# Check for special YAML float values
if value.lower() in ('.inf', '-.inf', '.nan', '+.inf'):
return True

# Check for YAML 1.1 legacy octals (e.g., 0755) - numbers starting with 0
# but not just "0" and containing only digits
if re.match(r'^0\d+$', value):
return True

# Check for sexagesimal (base 60) numbers like 1:30:00
if re.match(r'^\d+:\d+(:\d+)*$', value):
return True

return False


def _string_representer(dumper, data):
"""
Custom string representer that quotes strings which could be
misinterpreted as numbers or other YAML types.
"""
if _needs_quoting(data):
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style="'")
return dumper.represent_scalar('tag:yaml.org,2002:str', data)


def yaml_dump(dict_to_dump):
"""
Dumps the dictionary as a YAML document
:param dict_to_dump:
:return:
"""
FlattenAliasDumper.add_representer(OrderedDict, _dict_representer)
FlattenAliasDumper.add_representer(str, _string_representer)
return yaml.dump(
dict_to_dump,
default_flow_style=False,
Expand Down
89 changes: 89 additions & 0 deletions tests/unit/customizations/cloudformation/test_yamlhelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,92 @@ def test_unroll_yaml_anchors(self):
)
actual = yaml_dump(template)
self.assertEqual(actual, expected)

def test_scientific_notation_strings_are_quoted(self):
"""
Test fix for issue #3991: strings that look like scientific notation
should be quoted to prevent them from being interpreted as numbers.
"""
template = {
"Parameters": {
"Value1": {"Default": "1e10"},
"Value2": {"Default": "1E-5"},
"Value3": {"Default": "2.5e+3"},
}
}
dumped = yaml_dump(template)

# Scientific notation strings should be quoted
self.assertIn("'1e10'", dumped)
self.assertIn("'1E-5'", dumped)
self.assertIn("'2.5e+3'", dumped)

# Verify round-trip preserves string type
reparsed = yaml_parse(dumped)
self.assertEqual(reparsed["Parameters"]["Value1"]["Default"], "1e10")
self.assertEqual(reparsed["Parameters"]["Value2"]["Default"], "1E-5")
self.assertEqual(reparsed["Parameters"]["Value3"]["Default"], "2.5e+3")

def test_octal_hex_binary_strings_are_quoted(self):
"""
Test that octal, hex, and binary notation strings are quoted.
"""
template = {
"Values": {
"Octal": "0755",
"OctalNew": "0o755",
"Hex": "0x1A2B",
"Binary": "0b1010",
}
}
dumped = yaml_dump(template)

# These should be quoted
self.assertIn("'0755'", dumped)
self.assertIn("'0o755'", dumped)
self.assertIn("'0x1A2B'", dumped)
self.assertIn("'0b1010'", dumped)

# Verify round-trip
reparsed = yaml_parse(dumped)
self.assertEqual(reparsed["Values"]["Octal"], "0755")
self.assertEqual(reparsed["Values"]["Hex"], "0x1A2B")

def test_sexagesimal_strings_are_quoted(self):
"""
Test that sexagesimal (base 60) notation strings are quoted.
"""
template = {
"Values": {
"Time1": "1:30:00",
"Time2": "12:30",
}
}
dumped = yaml_dump(template)

# Should be quoted
self.assertIn("'1:30:00'", dumped)
self.assertIn("'12:30'", dumped)

# Verify round-trip
reparsed = yaml_parse(dumped)
self.assertEqual(reparsed["Values"]["Time1"], "1:30:00")
self.assertEqual(reparsed["Values"]["Time2"], "12:30")

def test_normal_strings_not_excessively_quoted(self):
"""
Test that normal strings are not unnecessarily quoted.
"""
template = {
"Values": {
"Normal1": "hello",
"Normal2": "world-123",
"Arn": "arn:aws:s3:::bucket",
}
}
dumped = yaml_dump(template)

# Normal strings should not be quoted (except ARN which has colons)
self.assertIn("Normal1: hello", dumped)
self.assertIn("Normal2: world-123", dumped)

53 changes: 53 additions & 0 deletions tests/unit/test_argprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,11 +889,64 @@ def test_json_value_null(self):
value = 'null'
self.assertEqual(unpack_cli_arg(self.p, value), None)


def test_json_value_decode_error(self):
value = 'invalid string to be serialized'
with self.assertRaises(ParamError):
unpack_cli_arg(self.p, value)


class TestYAMLParams(BaseArgProcessTest):
def test_yaml_structure(self):
p = self.get_param_model('elasticbeanstalk.CreateConfigurationTemplate.'
'SourceConfiguration')
# Simple YAML mapping
yaml_content = "ApplicationName: foo\nTemplateName: bar"
result = unpack_cli_arg(p, yaml_content)
self.assertEqual(result, OrderedDict([('ApplicationName', 'foo'), ('TemplateName', 'bar')]))

def test_yaml_list(self):
p = self.get_param_model('cloudformation.CreateStack.Parameters')
# YAML list of mappings
yaml_content = "- ParameterKey: Key1\n ParameterValue: Val1\n- ParameterKey: Key2\n ParameterValue: Val2"
result = unpack_cli_arg(p, yaml_content)
self.assertEqual(result, [
OrderedDict([('ParameterKey', 'Key1'), ('ParameterValue', 'Val1')]),
OrderedDict([('ParameterKey', 'Key2'), ('ParameterValue', 'Val2')])
])

def test_yaml_map(self):
p = self.get_param_model('sqs.SetQueueAttributes.Attributes')
# YAML mapping for a map type
yaml_content = "VisibilityTimeout: '15'\nDelaySeconds: '10'"
result = unpack_cli_arg(p, yaml_content)
self.assertEqual(result, OrderedDict([('VisibilityTimeout', '15'), ('DelaySeconds', '10')]))

def test_invalid_yaml(self):
p = self.get_param_model('elasticbeanstalk.CreateConfigurationTemplate.'
'SourceConfiguration')
# Invalid YAML (missing colon)
yaml_content = "ApplicationName foo"
with self.assertRaises(ParamError) as cm:
unpack_cli_arg(p, yaml_content)
self.assertIn("Invalid YAML", str(cm.exception))

def test_yaml_type_mismatch_list(self):
p = self.get_param_model('cloudformation.CreateStack.Parameters')
# Providing a mapping when a list is expected
yaml_content = "Key: Value"
with self.assertRaises(ParamError) as cm:
unpack_cli_arg(p, yaml_content)
self.assertIn("Expected a list", str(cm.exception))

def test_yaml_preserves_order(self):
p = self.get_param_model('elasticbeanstalk.CreateConfigurationTemplate.'
'SourceConfiguration')
# Ensure order is preserved in the resulting OrderedDict
yaml_content = "Z: value\nA: value"
result = unpack_cli_arg(p, yaml_content)
self.assertEqual(list(result.keys()), ['Z', 'A'])


if __name__ == '__main__':
unittest.main()