Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 94 additions & 46 deletions core/services/cable_guy/api/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,21 +383,99 @@ def remove_ip(self, interface_name: str, ip_address: str) -> None:

self._update_interface_settings(interface_name, saved_interface)

def get_interface_by_name(self, name: str) -> NetworkInterface:
for interface in self.get_ethernet_interfaces():
def get_interface_by_name(self, name: str, include_dhcp_markers: bool = False) -> NetworkInterface:
for interface in self.get_ethernet_interfaces(include_dhcp_markers):
if interface.name == name:
return interface
raise ValueError(f"No interface with name '{name}' is present.")

def get_saved_interface_by_name(self, name: str) -> Optional[NetworkInterface]:
return next((i for i in self._settings.content if i.name == name), None)

# pylint: disable=too-many-locals
def get_interfaces(self, filter_wifi: bool = False) -> List[NetworkInterface]:
def _determine_address_mode(self, interface: str, ip: str, is_static_ip: bool, valid_ip: bool) -> AddressMode:
if self._is_dhcp_server_running_on_interface(interface) and str(
self._dhcp_server_on_interface(interface).ipv4_gateway
) == str(ip):
if self._dhcp_server_on_interface(interface).is_backup_server:
return AddressMode.BackupServer
return AddressMode.Server

if is_static_ip and valid_ip:
return AddressMode.Unmanaged
return AddressMode.Client

def _process_interface_addresses(self, interface: str, addresses: List[Any]) -> List[InterfaceAddress]:
valid_addresses = []
for address in addresses:
# We just care about IPV4 addresses
if address.family != AddressFamily.AF_INET:
continue

valid_ip = EthernetManager.weak_is_ip_address(address.address)
ip = address.address if valid_ip else "undefined"

is_static_ip = self.is_static_ip(ip)
mode = self._determine_address_mode(interface, ip, is_static_ip, valid_ip)
valid_addresses.append(InterfaceAddress(ip=ip, mode=mode))

return valid_addresses

def _add_dhcp_markers_if_needed(
self, interface: str, valid_addresses: List[InterfaceAddress], include_dhcp_markers: bool
) -> List[InterfaceAddress]:
if not include_dhcp_markers:
return valid_addresses

interface_settings = self.get_saved_interface_by_name(interface)
if (
interface_settings
and any(
str(address.ip) == "0.0.0.0" and address.mode == AddressMode.Client
for address in interface_settings.addresses
)
and not any(
str(address.ip) == "0.0.0.0" and address.mode == AddressMode.Client for address in valid_addresses
)
):
valid_addresses.append(InterfaceAddress(ip="0.0.0.0", mode=AddressMode.Client))

return valid_addresses

def _get_interface_priority_value(self, interface: str) -> Optional[int]:
saved_interface = self.get_saved_interface_by_name(interface)
if saved_interface and saved_interface.priority is not None:
return saved_interface.priority

interface_metric = self.get_interface_priority(interface)
if interface_metric:
return interface_metric.priority

return None

def _build_network_interface(
self, interface: str, valid_addresses: List[InterfaceAddress], filter_wifi: bool
) -> Optional[NetworkInterface]:
info = self.get_interface_info(interface)
priority = self._get_interface_priority_value(interface)
routes = self.get_routes(interface, ignore_unmanaged=False)

interface_data = NetworkInterface(
name=interface, addresses=valid_addresses, info=info, priority=priority, routes=list(routes)
)

# Check if it's valid and return the interface data
if self.validate_interface_data(interface_data, filter_wifi):
return interface_data

return None

def get_interfaces(self, filter_wifi: bool = False, include_dhcp_markers: bool = False) -> List[NetworkInterface]:
"""Get interfaces information

Args:
filter_wifi (boolean, optional): Enable wifi interface filtering
include_dhcp_markers (boolean, optional): DHCP marker is the IP 0.0.0.0 AddressMode.Client, used to
inform cable guy that a dynamic IP should be acquired if available.

Returns:
List of NetworkInterface instances available
Expand All @@ -410,56 +488,26 @@ def get_interfaces(self, filter_wifi: bool = False) -> List[NetworkInterface]:
if not self.is_valid_interface_name(interface, filter_wifi):
continue

valid_addresses = []
for address in addresses:
# We just care about IPV4 addresses
if not address.family == AddressFamily.AF_INET:
continue

valid_ip = EthernetManager.weak_is_ip_address(address.address)
ip = address.address if valid_ip else "undefined"

is_static_ip = self.is_static_ip(ip)

# Populate our output item
if self._is_dhcp_server_running_on_interface(interface) and str(
self._dhcp_server_on_interface(interface).ipv4_gateway
) == str(ip):
mode = AddressMode.Server
if self._dhcp_server_on_interface(interface).is_backup_server:
mode = AddressMode.BackupServer
else:
mode = AddressMode.Unmanaged if is_static_ip and valid_ip else AddressMode.Client
valid_addresses.append(InterfaceAddress(ip=ip, mode=mode))
info = self.get_interface_info(interface)
saved_interface = self.get_saved_interface_by_name(interface)
# Get priority from saved interface or from current interface metrics, defaulting to None if neither exists
priority = None
if saved_interface and saved_interface.priority is not None:
priority = saved_interface.priority
else:
interface_metric = self.get_interface_priority(interface)
if interface_metric:
priority = interface_metric.priority

routes = self.get_routes(interface, ignore_unmanaged=False)
valid_addresses = self._process_interface_addresses(interface, addresses)
valid_addresses = self._add_dhcp_markers_if_needed(interface, valid_addresses, include_dhcp_markers)

interface_data = NetworkInterface(
name=interface, addresses=valid_addresses, info=info, priority=priority, routes=list(routes)
)
# Check if it's valid and add to the result
if self.validate_interface_data(interface_data, filter_wifi):
result += [interface_data]
interface_data = self._build_network_interface(interface, valid_addresses, filter_wifi)
if interface_data:
result.append(interface_data)

return result

def get_ethernet_interfaces(self) -> List[NetworkInterface]:
def get_ethernet_interfaces(self, include_dhcp_markers: bool = False) -> List[NetworkInterface]:
"""Get ethernet interfaces information

Args:
include_dhcp_markers (boolean, optional): DHCP marker is the IP 0.0.0.0 AddressMode.Client, used to
inform cable guy that a dynamic IP should be acquired if available.

Returns:
List of NetworkInterface instances available
"""
return self.get_interfaces(filter_wifi=True)
return self.get_interfaces(filter_wifi=True, include_dhcp_markers=include_dhcp_markers)

def get_interface_ndb(self, interface_name: str) -> Any:
"""Get interface NDB information for interface
Expand Down Expand Up @@ -634,7 +682,7 @@ def _execute_route(self, action: str, interface_name: str, route: Route) -> None
raise

# Update settings
current_interface = self.get_interface_by_name(interface_name)
current_interface = self.get_interface_by_name(interface_name, include_dhcp_markers=True)
for current_route in current_interface.routes:
if current_route.destination_parsed == route.destination_parsed and current_route.gateway == route.gateway:
current_route.managed = route.managed
Expand Down