|
| 1 | +import logging |
| 2 | +import argparse |
| 3 | +import textwrap |
| 4 | +import glob |
| 5 | +import os |
| 6 | +import json |
| 7 | +import serial |
| 8 | +import time |
| 9 | +import subprocess |
| 10 | +import re |
| 11 | + |
| 12 | +from pymcuprog.backend import SessionConfig |
| 13 | +from pymcuprog.toolconnection import ToolUsbHidConnection |
| 14 | +from pymcuprog.backend import Backend |
| 15 | +from pymcuprog.hexfileutils import read_memories_from_hex |
| 16 | +from pymcuprog.deviceinfo.memorynames import MemoryNameAliases |
| 17 | +from pymcuprog.deviceinfo.deviceinfokeys import DeviceMemoryInfoKeys |
| 18 | + |
| 19 | + |
| 20 | +BOARD_CONFIG = "DxCore:megaavr:avrdb:appspm=no,chip=avr128db48,clock=24internal,"\ |
| 21 | + "bodvoltage=1v9,bodmode=disabled,eesave=enable,resetpin=reset,"\ |
| 22 | + "millis=tcb2,startuptime=8,wiremode=mors2,printf=full" |
| 23 | + |
| 24 | +TIMEOUT = 10 |
| 25 | + |
| 26 | + |
| 27 | +def retrieve_examples(examples_directory): |
| 28 | + """Returns the absolute path to each example sketch file in the examples_directory. |
| 29 | +
|
| 30 | + Args: |
| 31 | + examples_directory (str): Path to the examples directory |
| 32 | + """ |
| 33 | + |
| 34 | + return glob.glob(examples_directory + "/**/*.ino", recursive=True) |
| 35 | + |
| 36 | + |
| 37 | +def program(sketch_path, build_directory): |
| 38 | + """Builds and programs the sketch file |
| 39 | +
|
| 40 | + Args: |
| 41 | + sketch_path (str): Path to the sketch |
| 42 | + build_directory (str): Path to the build directory |
| 43 | + """ |
| 44 | + |
| 45 | + print(f"Buillding and programming {os.path.basename(sketch_path)}...") |
| 46 | + |
| 47 | + if not os.path.exists(build_directory): |
| 48 | + os.mkdir(build_directory) |
| 49 | + |
| 50 | + # Use subprocess.check_output here and discard the output to not have any output from arduino-cli |
| 51 | + subprocess.check_output(f"arduino-cli compile {sketch_path} -b {BOARD_CONFIG} --output-dir {build_directory}") |
| 52 | + |
| 53 | + hex_file = build_directory + f"/{os.path.basename(sketch_path)}.hex" |
| 54 | + |
| 55 | + try: |
| 56 | + memory_segments = read_memories_from_hex(hex_file, backend.device_memory_info) |
| 57 | + backend.erase(MemoryNameAliases.ALL, address=None) |
| 58 | + |
| 59 | + for segment in memory_segments: |
| 60 | + memory_name = segment.memory_info[DeviceMemoryInfoKeys.NAME] |
| 61 | + backend.write_memory(segment.data, memory_name, segment.offset) |
| 62 | + verify_ok = backend.verify_memory(segment.data, memory_name, segment.offset) |
| 63 | + |
| 64 | + if verify_ok: |
| 65 | + logging.info("OK") |
| 66 | + else: |
| 67 | + logging.error("Verification failed!") |
| 68 | + return False |
| 69 | + |
| 70 | + except Exception as exception: |
| 71 | + logging.warning(f"Error programming: {exception}") |
| 72 | + return False |
| 73 | + |
| 74 | + return True |
| 75 | + |
| 76 | + |
| 77 | +def test(example, test_data, serial_handle): |
| 78 | + |
| 79 | + example_name = os.path.splitext(os.path.basename(example))[0] |
| 80 | + |
| 81 | + print(f"{example_name}: Testing...") |
| 82 | + |
| 83 | + for entry in test_data: |
| 84 | + |
| 85 | + expectation = entry.get("expectation", None) |
| 86 | + command = entry.get("command", None) |
| 87 | + repeat = entry.get("repeat", 1) |
| 88 | + |
| 89 | + for _ in range(0, repeat): |
| 90 | + if command != None: |
| 91 | + command_stripped = command.strip("\r") |
| 92 | + logging.info(f"\tTesting command: {command_stripped}") |
| 93 | + |
| 94 | + serial_handle.write(str.encode(command)) |
| 95 | + serial_handle.flush() |
| 96 | + |
| 97 | + # Read until line feed |
| 98 | + output = serial_handle.read_until().decode("utf-8") |
| 99 | + |
| 100 | + response = re.search(expectation, output) |
| 101 | + |
| 102 | + if response == None: |
| 103 | + formatted_output = output.replace("\r", "\\r").replace("\n", "\\n") |
| 104 | + logging.error(f"\tDid not get the expected response \"{expectation}\", got: \"{formatted_output}\"") |
| 105 | + logging.error(f"{example_name}: Failed") |
| 106 | + return False |
| 107 | + |
| 108 | + formatted_response = response.group(0).replace("\r", "\\r").replace("\n", "\\n") |
| 109 | + |
| 110 | + logging.info(f"\tGot valid response: {formatted_response}") |
| 111 | + |
| 112 | + print(f"{example_name}: Passed") |
| 113 | + return True |
| 114 | + |
| 115 | + |
| 116 | +if __name__ == '__main__': |
| 117 | + parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, |
| 118 | + description=textwrap.dedent('''\ |
| 119 | +Automated tests for AVR-IoT Cellular Library |
| 120 | + '''), |
| 121 | + epilog=textwrap.dedent('''\ |
| 122 | +Usage: |
| 123 | + - python test.py PORT |
| 124 | +''')) |
| 125 | + |
| 126 | + parser.add_argument("port", |
| 127 | + type=str, |
| 128 | + help="Device port (COMx, /dev/ttyACMx etc.)") |
| 129 | + |
| 130 | + parser.add_argument("-d", |
| 131 | + "--device", |
| 132 | + type=str, |
| 133 | + help="Device to test against", |
| 134 | + default="avr128db48") |
| 135 | + |
| 136 | + parser.add_argument("-v", |
| 137 | + "--verbose", |
| 138 | + type=str, |
| 139 | + help="Logging verbosity level (critical, error, warning, info, debug)", |
| 140 | + default="warn") |
| 141 | + |
| 142 | + parser.add_argument("-t", |
| 143 | + "--testsfile", |
| 144 | + type=str, |
| 145 | + help="Path to JSON file with tests", |
| 146 | + default="tests.json") |
| 147 | + |
| 148 | + parser.add_argument("-e", |
| 149 | + "--examplesdir", |
| 150 | + type=str, |
| 151 | + help="Relative path to the example directory", |
| 152 | + default="../examples") |
| 153 | + |
| 154 | + parser.add_argument("-b", |
| 155 | + "--builddir", |
| 156 | + type=str, |
| 157 | + help="Relative path to the build directory", |
| 158 | + default="build") |
| 159 | + |
| 160 | + arguments = parser.parse_args() |
| 161 | + |
| 162 | + logging.basicConfig(format="%(levelname)s: %(message)s", |
| 163 | + level=getattr(logging, arguments.verbose.upper())) |
| 164 | + |
| 165 | + session_config = SessionConfig(arguments.device) |
| 166 | + |
| 167 | + transport = ToolUsbHidConnection() |
| 168 | + |
| 169 | + backend = Backend() |
| 170 | + |
| 171 | + try: |
| 172 | + backend.connect_to_tool(transport) |
| 173 | + except Exception as exception: |
| 174 | + logging.error(f"Failed to connect to tool: {exception}") |
| 175 | + exit(1) |
| 176 | + |
| 177 | + # Remove the build dir if it already exists |
| 178 | + if os.path.exists(arguments.builddir): |
| 179 | + for root, _, files in os.walk(arguments.builddir, topdown=False): |
| 180 | + for name in files: |
| 181 | + os.remove(os.path.join(root, name)) |
| 182 | + |
| 183 | + examples = retrieve_examples(arguments.examplesdir) |
| 184 | + |
| 185 | + test_config = {} |
| 186 | + |
| 187 | + with open(arguments.testsfile) as file: |
| 188 | + test_config = json.load(file) |
| 189 | + |
| 190 | + examples_test_status = {} |
| 191 | + |
| 192 | + for example in examples: |
| 193 | + |
| 194 | + example_name = os.path.splitext(os.path.basename(example))[0] |
| 195 | + |
| 196 | + if not example_name in test_config: |
| 197 | + examples_test_status[example_name] = "No test defined" |
| 198 | + continue |
| 199 | + |
| 200 | + if not test_config[example_name]["enabled"]: |
| 201 | + logging.warning(f"Skipping test for {example_name}, not enabled") |
| 202 | + examples_test_status[example_name] = "Test disabled" |
| 203 | + continue |
| 204 | + |
| 205 | + backend.start_session(session_config) |
| 206 | + if not program(example, arguments.builddir): |
| 207 | + exit(1) |
| 208 | + |
| 209 | + try: |
| 210 | + # Start the serial session before before we close the pymcuprog session |
| 211 | + # so that the board is reset and running the programmed code |
| 212 | + with serial.Serial(arguments.port, 115200, timeout=TIMEOUT) as serial_handle: |
| 213 | + backend.end_session() |
| 214 | + if test(example, test_config[example_name]["tests"], serial_handle): |
| 215 | + examples_test_status[example_name] = "Passed" |
| 216 | + else: |
| 217 | + examples_test_status[example_name] = "Not passed" |
| 218 | + exit(1) |
| 219 | + |
| 220 | + except serial.SerialException as exception: |
| 221 | + logging.error(f"Got exception while opening serial port: {exception}") |
| 222 | + exit(1) |
| 223 | + |
| 224 | + print("") |
| 225 | + |
| 226 | + backend.disconnect_from_tool() |
| 227 | + |
| 228 | + print("--------------- Test status ---------------") |
| 229 | + for example_name, status in examples_test_status.items(): |
| 230 | + print(f"{example_name:<30}: {status}") |
| 231 | + |
| 232 | + exit(0) |
0 commit comments