Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 67 additions & 21 deletions emulators/aws-ec2/emulator_core/services/dhcpoptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,53 @@ def _generate_id(self, prefix: str = 'dhcp') -> str:
from ..utils import get_scalar, get_int, get_indexed_list, parse_filters, parse_tags, str2bool, esc
from ..utils import is_error_response, serialize_error_response

def parse_dhcp_configurations(params: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Parse CreateDhcpOptions DhcpConfiguration.N nested query parameters.

AWS CLI sends DHCP configurations as EC2 Query-style flattened nested fields,
for example:
DhcpConfiguration.1.Key=domain-name
DhcpConfiguration.1.Value.1=example.com

get_indexed_list(params, "DhcpConfiguration") only supports scalar indexed
fields such as DhcpConfiguration.1, so this helper reconstructs the nested
object list without changing the shared scalar-list parser used by other APIs.
"""
configs: List[Dict[str, Any]] = []
index = 1

while True:
prefix = f"DhcpConfiguration.{index}"
key_name = f"{prefix}.Key"

if key_name not in params:
break

key = get_scalar(params, key_name)
values: List[Dict[str, str]] = []

value_index = 1
while True:
value_name = f"{prefix}.Value.{value_index}"
if value_name not in params:
break

value = get_scalar(params, value_name)
if value is not None:
values.append({"value": value})

value_index += 1

configs.append({
"key": key,
"valueSet": values,
})

index += 1

return configs


class dhcpoptions_RequestParser:
@staticmethod
def parse_associate_dhcp_options_request(md: Dict[str, Any]) -> Dict[str, Any]:
Expand All @@ -212,7 +259,7 @@ def parse_associate_dhcp_options_request(md: Dict[str, Any]) -> Dict[str, Any]:
@staticmethod
def parse_create_dhcp_options_request(md: Dict[str, Any]) -> Dict[str, Any]:
return {
"DhcpConfiguration.N": get_indexed_list(md, "DhcpConfiguration"),
"DhcpConfiguration.N": parse_dhcp_configurations(md),
"DryRun": str2bool(get_scalar(md, "DryRun")),
"TagSpecification.N": parse_tags(md, "TagSpecification"),
}
Expand Down Expand Up @@ -335,27 +382,26 @@ def serialize_associate_dhcp_options_response(data: Dict[str, Any], request_id:
def serialize_create_dhcp_options_response(data: Dict[str, Any], request_id: str) -> str:
xml_parts = []
xml_parts.append(f'<CreateDhcpOptionsResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">')
xml_parts.append(f' <requestId>{esc(request_id)}</requestId>')
# Serialize dhcpOptions
_dhcpOptions_key = None
if "dhcpOptions" in data:
_dhcpOptions_key = "dhcpOptions"
elif "DhcpOptions" in data:
_dhcpOptions_key = "DhcpOptions"
if _dhcpOptions_key:
param_data = data[_dhcpOptions_key]
indent_str = " " * 1
if param_data:
xml_parts.append(f'{indent_str}<dhcpOptionsSet>')
for item in param_data:
xml_parts.append(f'{indent_str} <item>')
xml_parts.extend(dhcpoptions_ResponseSerializer._serialize_nested_fields(item, 2))
xml_parts.append(f'{indent_str} </item>')
xml_parts.append(f'{indent_str}</dhcpOptionsSet>')
else:
xml_parts.append(f'{indent_str}<dhcpOptionsSet/>')
xml_parts.append(f' <requestId>{esc(str(request_id))}</requestId>')

# CreateDhcpOptions is a singular create response.
# AWS CLI / botocore expects <dhcpOptions>, not <dhcpOptionsSet>.
param_data = data.get("dhcpOptions") or data.get("DhcpOptions") or []

if isinstance(param_data, list):
dhcp_options = param_data[0] if param_data else None
else:
dhcp_options = param_data

if dhcp_options:
xml_parts.append(f' <dhcpOptions>')
xml_parts.extend(dhcpoptions_ResponseSerializer._serialize_nested_fields(dhcp_options, 2))
xml_parts.append(f' </dhcpOptions>')
else:
xml_parts.append(f' <dhcpOptions/>')

xml_parts.append(f'</CreateDhcpOptionsResponse>')
return "\n".join(xml_parts)
return '\n'.join(xml_parts)

@staticmethod
def serialize_delete_dhcp_options_response(data: Dict[str, Any], request_id: str) -> str:
Expand Down
144 changes: 125 additions & 19 deletions emulators/aws-ec2/emulator_core/services/elasticnetworkinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ def CreateNetworkInterface(self, params: Dict[str, Any]):
if not sg:
return create_error_response("InvalidGroup.NotFound", f"The ID '{group_id}' does not exist")
group_set.append({
"GroupId": group_id,
"GroupName": getattr(sg, "group_name", ""),
"groupId": group_id,
"groupName": getattr(sg, "group_name", ""),
})

tag_set: List[Dict[str, Any]] = []
Expand Down Expand Up @@ -996,8 +996,8 @@ def ModifyNetworkInterfaceAttribute(self, params: Dict[str, Any]):
if not sg:
return create_error_response("InvalidGroup.NotFound", f"The ID '{group_id}' does not exist")
group_set.append({
"GroupId": group_id,
"GroupName": getattr(sg, "group_name", ""),
"groupId": group_id,
"groupName": getattr(sg, "group_name", ""),
})
network_interface.group_set = group_set

Expand Down Expand Up @@ -1378,6 +1378,27 @@ def _serialize_nested_fields(d: Dict[str, Any], indent_level: int) -> List[str]:
xml_parts.append(f'{indent}<{key}>{esc(str(value))}</{key}>')
return xml_parts


@staticmethod
def _serialize_group_set(group_set: List[Dict[str, Any]], indent_level: int) -> List[str]:
xml_parts = []
indent = ' ' * indent_level
if group_set:
xml_parts.append(f'{indent}<groupSet>')
for item in group_set:
xml_parts.append(f'{indent} <item>')
group_id = item.get("groupId") or item.get("GroupId") or ""
group_name = item.get("groupName") or item.get("GroupName") or ""
if group_id != "":
xml_parts.append(f'{indent} <groupId>{esc(str(group_id))}</groupId>')
if group_name != "":
xml_parts.append(f'{indent} <groupName>{esc(str(group_name))}</groupName>')
xml_parts.append(f'{indent} </item>')
xml_parts.append(f'{indent}</groupSet>')
else:
xml_parts.append(f'{indent}<groupSet/>')
return xml_parts

@staticmethod
def serialize_assign_ipv6_addresses_response(data: Dict[str, Any], request_id: str) -> str:
xml_parts = []
Expand Down Expand Up @@ -1516,6 +1537,26 @@ def serialize_attach_network_interface_response(data: Dict[str, Any], request_id
xml_parts.append(f'</AttachNetworkInterfaceResponse>')
return "\n".join(xml_parts)

@staticmethod
def _serialize_group_set(group_set: List[Dict[str, Any]], indent_level: int) -> List[str]:
xml_parts = []
indent = ' ' * indent_level
if group_set:
xml_parts.append(f'{indent}<groupSet>')
for item in group_set:
xml_parts.append(f'{indent} <item>')
group_id = item.get("groupId") or item.get("GroupId") or ""
group_name = item.get("groupName") or item.get("GroupName") or ""
if group_id != "":
xml_parts.append(f'{indent} <groupId>{esc(str(group_id))}</groupId>')
if group_name != "":
xml_parts.append(f'{indent} <groupName>{esc(str(group_name))}</groupName>')
xml_parts.append(f'{indent} </item>')
xml_parts.append(f'{indent}</groupSet>')
else:
xml_parts.append(f'{indent}<groupSet/>')
return xml_parts

@staticmethod
def serialize_create_network_interface_response(data: Dict[str, Any], request_id: str) -> str:
xml_parts = []
Expand All @@ -1541,10 +1582,44 @@ def serialize_create_network_interface_response(data: Dict[str, Any], request_id
param_data = data[_networkInterface_key]
indent_str = " " * 1
xml_parts.append(f'{indent_str}<networkInterface>')
xml_parts.extend(elasticnetworkinterface_ResponseSerializer._serialize_nested_fields(param_data, 2))
for key, value in param_data.items():
if key == "groupSet":
xml_parts.extend(
elasticnetworkinterface_ResponseSerializer._serialize_group_set(
value, 2
)
)
elif value is None:
continue
elif isinstance(value, dict):
xml_parts.append(f' <{key}>')
xml_parts.extend(
elasticnetworkinterface_ResponseSerializer._serialize_nested_fields(
value, 3
)
)
xml_parts.append(f' </{key}>')
elif isinstance(value, list):
xml_parts.append(f' <{key}>')
for item in value:
if isinstance(item, dict):
xml_parts.append(f' <item>')
xml_parts.extend(
elasticnetworkinterface_ResponseSerializer._serialize_nested_fields(
item, 4
)
)
xml_parts.append(f' </item>')
else:
xml_parts.append(f' <item>{esc(str(item))}</item>')
xml_parts.append(f' </{key}>')
elif isinstance(value, bool):
xml_parts.append(f' <{key}>{str(value).lower()}</{key}>')
else:
xml_parts.append(f' <{key}>{esc(str(value))}</{key}>')
xml_parts.append(f'{indent_str}</networkInterface>')
xml_parts.append(f'</CreateNetworkInterfaceResponse>')
return "\n".join(xml_parts)
return "".join(xml_parts)

@staticmethod
def serialize_create_network_interface_permission_response(data: Dict[str, Any], request_id: str) -> str:
Expand Down Expand Up @@ -1657,16 +1732,11 @@ def serialize_describe_network_interface_attribute_response(data: Dict[str, Any]
_groupSet_key = "Groups"
if _groupSet_key:
param_data = data[_groupSet_key]
indent_str = " " * 1
if param_data:
xml_parts.append(f'{indent_str}<groupSet>')
for item in param_data:
xml_parts.append(f'{indent_str} <item>')
xml_parts.extend(elasticnetworkinterface_ResponseSerializer._serialize_nested_fields(item, 2))
xml_parts.append(f'{indent_str} </item>')
xml_parts.append(f'{indent_str}</groupSet>')
else:
xml_parts.append(f'{indent_str}<groupSet/>')
xml_parts.extend(
elasticnetworkinterface_ResponseSerializer._serialize_group_set(
param_data, 1
)
)
# Serialize networkInterfaceId
_networkInterfaceId_key = None
if "networkInterfaceId" in data:
Expand All @@ -1690,7 +1760,7 @@ def serialize_describe_network_interface_attribute_response(data: Dict[str, Any]
xml_parts.extend(elasticnetworkinterface_ResponseSerializer._serialize_nested_fields(param_data, 2))
xml_parts.append(f'{indent_str}</sourceDestCheck>')
xml_parts.append(f'</DescribeNetworkInterfaceAttributeResponse>')
return "\n".join(xml_parts)
return "".join(xml_parts)

@staticmethod
def serialize_describe_network_interface_permissions_response(data: Dict[str, Any], request_id: str) -> str:
Expand Down Expand Up @@ -1748,7 +1818,43 @@ def serialize_describe_network_interfaces_response(data: Dict[str, Any], request
xml_parts.append(f'{indent_str}<networkInterfaceSet>')
for item in param_data:
xml_parts.append(f'{indent_str} <item>')
xml_parts.extend(elasticnetworkinterface_ResponseSerializer._serialize_nested_fields(item, 2))
for key, value in item.items():
if key == "groupSet":
xml_parts.extend(
elasticnetworkinterface_ResponseSerializer._serialize_group_set(
value, 3
)
)
elif value is None:
continue
elif isinstance(value, dict):
xml_parts.append(f' <{key}>')
xml_parts.extend(
elasticnetworkinterface_ResponseSerializer._serialize_nested_fields(
value, 4
)
)
xml_parts.append(f' </{key}>')
elif isinstance(value, list):
xml_parts.append(f' <{key}>')
for sub_item in value:
if isinstance(sub_item, dict):
xml_parts.append(f' <item>')
xml_parts.extend(
elasticnetworkinterface_ResponseSerializer._serialize_nested_fields(
sub_item, 5
)
)
xml_parts.append(f' </item>')
else:
xml_parts.append(
f' <item>{esc(str(sub_item))}</item>'
)
xml_parts.append(f' </{key}>')
elif isinstance(value, bool):
xml_parts.append(f' <{key}>{str(value).lower()}</{key}>')
else:
xml_parts.append(f' <{key}>{esc(str(value))}</{key}>')
xml_parts.append(f'{indent_str} </item>')
xml_parts.append(f'{indent_str}</networkInterfaceSet>')
else:
Expand All @@ -1764,7 +1870,7 @@ def serialize_describe_network_interfaces_response(data: Dict[str, Any], request
indent_str = " " * 1
xml_parts.append(f'{indent_str}<nextToken>{esc(str(param_data))}</nextToken>')
xml_parts.append(f'</DescribeNetworkInterfacesResponse>')
return "\n".join(xml_parts)
return "".join(xml_parts)

@staticmethod
def serialize_detach_network_interface_response(data: Dict[str, Any], request_id: str) -> str:
Expand Down
Loading