Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/mm.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added remote file copy to Arita EOS.
2 changes: 2 additions & 0 deletions changes/mm.removed
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Removed warning log message.
Removed log initialization.
4 changes: 0 additions & 4 deletions pyntc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Kickoff functions for getting instance of device objects."""

import os
import warnings
from importlib import metadata

from .devices import supported_devices
Expand All @@ -23,9 +22,6 @@
LIB_PATH_DEFAULT = "~/.ntc.conf"


warnings.simplefilter("default")


def ntc_device(device_type, *args, **kwargs):
"""
Instantiate an instance of a ``pyntc.devices.BaseDevice`` by ``device_type``.
Expand Down
99 changes: 99 additions & 0 deletions pyntc/devices/eos_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
RebootTimeoutError,
)
from pyntc.utils import convert_list_by_key
from pyntc.utils.models import FileCopyModel

BASIC_FACTS_KM = {"model": "modelName", "os_version": "internalVersion", "serial_number": "serialNumber"}
INTERFACES_KM = {
Expand Down Expand Up @@ -397,6 +398,104 @@ def file_copy(self, src, dest=None, file_system=None):
)
raise FileTransferError

def verify_file(self, checksum, filename, hashing_algorithm="md5", file_system=None):
"""Verify a file on the remote device by and validate the checksums.

Args:
checksum (str): The checksum of the file.
filename (str): The name of the file to check for on the remote device.
hashing_algorithm (str): The hashing algorithm to use (default: "md5").
file_system (str): Supported only for IOS and NXOS. The file system for the
remote file. If no file_system is provided, then the ``get_file_system``
method is used to determine the correct file system to use.

Returns:
(bool): True if the file is verified successfully, False otherwise.
"""
return self.check_file_exists(filename, file_system=file_system) and self.compare_file_checksum(
checksum, filename, hashing_algorithm, file_system=file_system
)

def remote_file_copy(self, src: FileCopyModel, dest=None, file_system=None, **kwargs):
"""Copy a file to a remote device.

Args:
src (FileCopyModel): The source file model.
dest (str): The destination file path on the remote device.
kwargs (dict): Additional keyword arguments that may be used by subclasses.

Keyword Args:
file_system (str): Supported only for IOS and NXOS. The file system for the
remote file. If no file_system is provided, then the ``get_file_system``
method is used to determine the correct file system to use.

Raises:
TypeError: If src is not an instance of FileCopyModel.
FileTransferError: If there is an error during file transfer or if the file cannot be verified after transfer.
"""
if not isinstance(src, FileCopyModel):
raise TypeError("src must be an instance of FileCopyModel")
if file_system is None:
file_system = self._get_file_system()
if dest is None:
dest = src.file_name
if not self.verify_file(src.checksum, dest, hashing_algorithm=src.hashing_algorithm, file_system=file_system):
current_prompt = self.native.find_prompt()

# MM - Arista prompts password, but errors for any other required item in command
# Define prompt mapping for expected prompts during file copy
prompt_answers = {
r"Password": src.token,
# r"Source username": src.username,
# r"yes/no|Are you sure you want to continue connecting": "yes",
# r"(confirm|Address or name of remote host|Source filename|Destination filename)": "", # Press Enter
}
keys = list(prompt_answers.keys()) + [re.escape(current_prompt)]
expect_regex = f"({'|'.join(keys)})"

# command example: copy scp://janedoe:janepassword@scp-host.internaldomain/EOS-4.15.10M.swi flash:
command = f"copy {src.clean_url} {file_system}{dest}" # TODO: clean_url is... a name.
if src.vrf and src.scheme not in {"http", "https"}:
command = f"{command} vrf {src.vrf}"

# _send_command currently checks for % and raises an error, but during the file copy
# there may be a % warning that does not indicate a failure so we will use send_command directly.
output = self.native.send_command(command, expect_string=expect_regex, read_timeout=src.timeout)

while current_prompt not in output:
# Check for success message in output to break loop and avoid waiting for next prompt
if re.search(r"Copy completed successfully.", output, re.IGNORECASE):
log.info(
"Host %s: File %s transferred successfully with output: %s", self.host, src.file_name, output
)
break
# Check for errors explicitly to avoid infinite loops on failure
if re.search(r"(% Error copying|% Incomplete command)", output, re.IGNORECASE):
log.error("Host %s: File transfer error %s", self.host, FileTransferError.default_message)
raise FileTransferError
# Allow transfers to continue if in progress
# % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
# Dload  Upload   Total   Spent    Left  Speed
if re.search(r"% Total", output, re.IGNORECASE):
continue
for prompt, answer in prompt_answers.items():
if re.search(prompt, output, re.IGNORECASE):
is_password = "Password" in prompt
output = self.native.send_command(
answer, expect_string=expect_regex, read_timeout=src.timeout, cmd_verify=not is_password
)
break # Exit the for loop and check the new output for the next prompt

if not self.verify_file(
src.checksum, dest, hashing_algorithm=src.hashing_algorithm, file_system=file_system
):
log.error(
"Host %s: Attempted remote file copy, but could not validate file existed after transfer %s",
self.host,
FileTransferError.default_message,
)
raise FileTransferError

# TODO: Make this an internal method since exposing file_copy should be sufficient
def file_copy_remote_exists(self, src, dest=None, file_system=None):
"""Copy file to remote device if it exists.
Expand Down
2 changes: 0 additions & 2 deletions pyntc/devices/iosxewlc_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
class IOSXEWLCDevice(IOSDevice):
"""Cisco IOSXE WLC Device Implementation."""

log.init()

def _wait_for_device_start_reboot(self, timeout=600):
start = time.time()
while time.time() - start < timeout:
Expand Down
Loading