diff --git a/README.md b/README.md index c94262e..84839f2 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ A web-based G-code visualizer and job management tool for pen plotters. - Line color based on Z height (lower Z = darker color) - Bounding box display around drawing area - Manual jogging controls for X/Y/Z -- Job sending functionality (stub for now) +- Job sending functionality to GRBL-compatible plotters - Current tool position shown as a red dot ## Setup and Installation @@ -28,45 +28,82 @@ A web-based G-code visualizer and job management tool for pen plotters. source venv/bin/activate # On Windows: venv\Scripts\activate ``` 3. Install the dependencies: - ``` + ```bash pip install -r requirements.txt ``` -4. Run the application: - ``` + This will install Flask, pyserial, and other necessary packages. +4. Configure Plotter Connection (Optional): + If you intend to connect to a GRBL plotter, you might need to update the serial port and baud rate settings in `app.py`. See the "Plotter Backend > Configuration" section for details. +5. Run the application: + ```bash python app.py ``` -5. Open your browser and navigate to `http://127.0.0.1:5000` +6. Open your browser and navigate to `http://127.0.0.1:5000` (or the host/port shown in the terminal, e.g., `http://0.0.0.0:5000`). ## Usage - Paste G-code into the text area or upload a .gcode file - Click "Visualize" to render the G-code on the canvas -- Use jogging controls to move the virtual tool -- Click "Send Job" to send the job to the plotter (currently just logs to console) +- Use jogging controls to move the virtual tool (visualization only) +- Click "Send Job" to send the G-code to your configured GRBL plotter. ## Project Structure -- `app.py` - Flask application entry point +- `app.py` - Flask application entry point, includes GRBL communication logic. +- `grbl_communicator.py` - Module for handling serial communication with GRBL devices. +- `requirements.txt` - Python package dependencies. - `templates/` - HTML templates - `static/` - Static assets - `css/style.css` - Main stylesheet - `js/gcode-parser.js` - G-code parsing logic - `js/gcode-renderer.js` - Canvas rendering logic - `js/visualizer.js` - Main application logic +- `test_grbl_communicator.py` - Unit tests for `grbl_communicator.py`. ## Technical Details - Built as a Flask web application - Frontend uses HTML5 Canvas for visualization +- Backend uses `pyserial` for GRBL communication - Modular JavaScript architecture: - GCodeParser class for parsing G-code - GCodeRenderer class for canvas rendering - Modern JavaScript (ES6+) - Responsive design +## Plotter Backend + +This application now supports sending G-code jobs directly to GRBL-compatible pen plotters. The backend uses the `pyserial` library to communicate with the plotter over a serial connection, managed by the `GRBLCommunicator` class found in `grbl_communicator.py`. + +### Configuration + +To connect to your plotter, you may need to configure the serial port and baud rate: + +- **Location**: These settings are defined as constants at the top of the `app.py` file: + - `DEFAULT_SERIAL_PORT`: The serial port your plotter is connected to. + - `DEFAULT_BAUDRATE`: The baud rate your plotter uses for communication. +- **Default Values**: + - `DEFAULT_SERIAL_PORT = "/dev/ttyUSB0"` + - `DEFAULT_BAUDRATE = 115200` +- **Finding Your Serial Port**: + - **Linux**: Check common device paths like `/dev/ttyUSB*` or `/dev/ttyACM*`. You can use the command `dmesg | grep -i "ttyS\\|ttyUSB\\|ttyACM"` (run as root or with sudo if needed) shortly after connecting your device to see which port it's assigned. + - **Windows**: Open Device Manager (search for "Device Manager" in the Start Menu). Look under "Ports (COM & LPT)". Your plotter will likely appear as a "USB Serial Port" or similar, listed with a `COMx` number (e.g., `COM3`). + - **macOS**: Check devices starting with `/dev/tty.*`. In the terminal, run `ls /dev/tty.*`. Common names include `/dev/tty.usbmodemXXXX` or `/dev/tty.usbserial-XXXX`. + +### Dependencies + +Serial communication with the plotter requires the `pyserial` library. This library is listed in the `requirements.txt` file. + +To ensure all dependencies, including `pyserial`, are installed, run the following command in your activated virtual environment (as mentioned in the "Setup and Installation" section): +```bash +pip install -r requirements.txt +``` + ## Future Enhancements -- Real plotter communication via serial port +- Real-time job progress and status feedback from the plotter - Job queue management - G-code generation from SVG files -- More advanced visualization features \ No newline at end of file +- More advanced visualization features (e.g., toolpath simulation) +- Emergency stop / pause / resume functionality for plotter jobs +- Web-based configuration for serial port/baud rate \ No newline at end of file diff --git a/__pycache__/grbl_communicator.cpython-310.pyc b/__pycache__/grbl_communicator.cpython-310.pyc new file mode 100644 index 0000000..f907b5c Binary files /dev/null and b/__pycache__/grbl_communicator.cpython-310.pyc differ diff --git a/__pycache__/test_grbl_communicator.cpython-310.pyc b/__pycache__/test_grbl_communicator.cpython-310.pyc new file mode 100644 index 0000000..d4a8d15 Binary files /dev/null and b/__pycache__/test_grbl_communicator.cpython-310.pyc differ diff --git a/app.py b/app.py index 07c3ad3..ded7167 100644 --- a/app.py +++ b/app.py @@ -1,5 +1,14 @@ from flask import Flask, render_template, request, jsonify import os +import logging +from grbl_communicator import GRBLCommunicator + +# Configure basic logging for the app +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# Default GRBL connection parameters +DEFAULT_SERIAL_PORT = "/dev/ttyUSB0" # Common for Linux, use "COMx" for Windows if needed +DEFAULT_BAUDRATE = 115200 app = Flask(__name__) @@ -9,14 +18,56 @@ def index(): @app.route('/api/send_job', methods=['POST']) def send_job(): - # This is a stub function that would send the job to the plotter gcode = request.json.get('gcode', '') - # For now, just return success and the first few lines for confirmation - return jsonify({ - 'status': 'success', - 'message': 'Job sent to plotter', - 'preview': '\n'.join(gcode.split('\n')[:5]) + '...' if gcode else 'No G-code provided' - }) + + if not gcode: + logging.warning("send_job called with no G-code.") + return jsonify({'status': 'error', 'message': 'No G-code provided.'}), 400 + + logging.info(f"Received G-code job. First 50 chars: {gcode[:50]}...") + + communicator = None # Initialize communicator to None for the finally block + try: + communicator = GRBLCommunicator(port=DEFAULT_SERIAL_PORT, baudrate=DEFAULT_BAUDRATE) + logging.info(f"Attempting to connect to GRBL on {DEFAULT_SERIAL_PORT} at {DEFAULT_BAUDRATE} baud.") + communicator.connect() + logging.info("Successfully connected to GRBL.") + + gcode_lines = gcode.strip().split('\n') + total_lines = len(gcode_lines) + logging.info(f"Starting to send {total_lines} G-code lines.") + + for i, line in enumerate(gcode_lines): + line = line.strip() + if not line or line.startswith(';') or line.startswith('('): # Ignore empty lines and comments + logging.info(f"Skipping empty line or comment: {line}") + continue + + logging.info(f"Sending line {i+1}/{total_lines}: {line}") + communicator.send_command(line) + logging.info(f"Successfully sent line: {line}") + + logging.info("All G-code commands sent successfully.") + return jsonify({'status': 'success', 'message': 'Job sent to plotter successfully.'}) + + except ConnectionError as e: + logging.error(f"Connection failed: {e}") + return jsonify({'status': 'error', 'message': f'Connection failed: {str(e)}'}), 500 + except RuntimeError as e: # GRBL reported error + logging.error(f"GRBL error: {e}") + return jsonify({'status': 'error', 'message': f'GRBL error: {str(e)}'}), 400 + except TimeoutError as e: + logging.error(f"GRBL timeout: {e}") + return jsonify({'status': 'error', 'message': f'GRBL timeout: {str(e)}'}), 400 + except Exception as e: # Catch any other unexpected errors + logging.error(f"An unexpected error occurred: {e}", exc_info=True) # Log stack trace + return jsonify({'status': 'error', 'message': f'An unexpected error occurred: {str(e)}'}), 500 + finally: + if communicator: + logging.info("Closing GRBL connection.") + communicator.close() if __name__ == '__main__': - app.run(debug=True) + # Use host='0.0.0.0' to make it accessible from the network if needed for testing + # debug=True is useful for development but should be False in production + app.run(host='0.0.0.0', port=5000, debug=True) diff --git a/grbl_communicator.py b/grbl_communicator.py new file mode 100644 index 0000000..bf5ed34 --- /dev/null +++ b/grbl_communicator.py @@ -0,0 +1,170 @@ +import serial +import time +import logging + +# Configure basic logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +class GRBLCommunicator: + def __init__(self, port, baudrate): + self.port = port + self.baudrate = baudrate + self.ser = None + logging.info(f"GRBLCommunicator initialized with port={self.port}, baudrate={self.baudrate}") + + def connect(self): + logging.info(f"Attempting to connect to GRBL on {self.port} at {self.baudrate} baud.") + try: + self.ser = serial.Serial(self.port, self.baudrate, timeout=2) + logging.info("Serial port opened. Waiting for GRBL to initialize...") + time.sleep(2) # Wait for GRBL to initialize + + # Wake up GRBL and clear buffer + # A soft reset (Ctrl-X) or a newline can be used. + # Ctrl-X is often \x18 in bytes. + # Sending a newline might be safer initially. + self.ser.flushInput() # Clear input buffer before sending commands + + # Sending a newline character to ensure GRBL is awake and to clear any partial commands. + self.ser.write(b'\n') + # Alternative: self.ser.write(b'\x18') # Soft reset + + # Read and discard startup messages. + # GRBL usually sends a welcome message like "Grbl 1.1h ['$' for help]" + # We'll read for a short period or until no more data comes. + startup_message = "" + start_time = time.time() + while time.time() - start_time < 2: # Read for up to 2 seconds + if self.ser.in_waiting > 0: + line = self.ser.readline().decode('utf-8', errors='ignore').strip() + if line: + startup_message += line + "\n" + logging.info(f"GRBL Startup: {line}") + else: + time.sleep(0.05) # Small delay to avoid busy waiting + + if not startup_message: + logging.warning("No startup message received from GRBL. This might be okay.") + + # A more robust check would be to send a status command '?' and expect 'ok' + # For now, we assume connection is successful if no serial exception occurred. + logging.info("GRBL connection established.") + + except serial.SerialException as e: + logging.error(f"Failed to connect to GRBL: {e}") + self.ser = None # Ensure ser is None if connection failed + raise ConnectionError(f"Failed to connect to GRBL on {self.port}: {e}") + except Exception as e: # Catch any other unexpected errors during connection + logging.error(f"An unexpected error occurred during connection: {e}") + if self.ser and self.ser.is_open: + self.ser.close() + self.ser = None + raise ConnectionError(f"An unexpected error occurred on {self.port}: {e}") + + + def send_command(self, gcode_line, command_timeout=10.0): + if not self.ser or not self.ser.is_open: + logging.error("Serial port not open. Cannot send command.") + raise ConnectionError("Serial port not open. Connect first.") + + try: + command = gcode_line.strip() + '\n' + logging.info(f"Sending G-code command: {command.strip()}") + self.ser.write(command.encode('utf-8')) + + response_buffer = "" + start_time = time.time() + + while time.time() - start_time < command_timeout: + if self.ser.in_waiting > 0: + char = self.ser.read().decode('utf-8', errors='ignore') + response_buffer += char + if response_buffer.endswith('ok\r\n'): + logging.info(f"Received 'ok' for command: {command.strip()}") + return True + if response_buffer.strip().startswith('error:'): + # Try to read the rest of the error line until a newline + # or timeout to capture the full error message. + while not response_buffer.endswith('\n') and (time.time() - start_time < command_timeout): + if self.ser.in_waiting > 0: + char_more = self.ser.read().decode('utf-8', errors='ignore') + response_buffer += char_more + else: + time.sleep(0.005) # Shorter sleep while actively fetching rest of error + + error_message = response_buffer.strip() + logging.error(f"GRBL error for command '{command.strip()}': {error_message}") + raise RuntimeError(f"GRBL error: {error_message}") + else: + time.sleep(0.01) # Short sleep to prevent busy-waiting + + logging.error(f"Timeout waiting for response to command: {command.strip()}") + raise TimeoutError(f"Timeout waiting for 'ok' or 'error' from GRBL for command: {command.strip()}") + + except serial.SerialException as e: + logging.error(f"Serial communication error during send_command: {e}") + self.close() # Attempt to close port on error + raise ConnectionError(f"Serial communication error: {e}") + except RuntimeError: # Re-raise RuntimeError from GRBL error + raise + except TimeoutError: # Re-raise TimeoutError + raise + except Exception as e: # Catch any other unexpected errors + logging.error(f"An unexpected error occurred during send_command: {e}") + self.close() + raise RuntimeError(f"An unexpected error occurred sending command: {e}") + + def close(self): + if self.ser and self.ser.is_open: + try: + self.ser.close() + logging.info("Serial port closed.") + except serial.SerialException as e: + logging.error(f"Error closing serial port: {e}") + else: + logging.info("Serial port was not open or already closed.") + self.ser = None + +if __name__ == '__main__': + # This is example usage, not part of the class itself. + # It's good for basic testing if you have a GRBL device connected. + # For actual use, this would be imported into another script. + + # Replace with your actual port and baudrate + # On Linux, it might be /dev/ttyUSB0 or /dev/ttyACM0 + # On Windows, it might be COM3, COM4, etc. + GRBL_PORT = "/dev/ttyUSB0" # Example, change this! + GRBL_BAUDRATE = 115200 + + # Example of how to use the GRBLCommunicator + # Note: This will try to connect to a real device. + # If you don't have one, this part will fail. + + # communicator = None + # try: + # communicator = GRBLCommunicator(port=GRBL_PORT, baudrate=GRBL_BAUDRATE) + # communicator.connect() + + # # Example commands + # # Ensure GRBL is in a known state (e.g., by homing or unlocking if needed) + # # For testing, simple status commands are safest. + # if communicator.send_command("$#"): # View G-code parameters + # print("Successfully sent '$#' and got ok") + + # if communicator.send_command("G0 X1 F100"): # Simple move command + # print("Successfully sent 'G0 X1 F100' and got ok") + + # except ConnectionError as e: + # print(f"Connection Error: {e}") + # except RuntimeError as e: + # print(f"Runtime Error: {e}") + # except TimeoutError as e: + # print(f"Timeout Error: {e}") + # except Exception as e: + # print(f"An unexpected error occurred: {e}") + # finally: + # if communicator: + # communicator.close() + + print("GRBLCommunicator class defined. Example usage (commented out) requires a GRBL device.") + print("To use this class, import it into your main application script.") diff --git a/requirements.txt b/requirements.txt index 8c33811..fe29ec8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -flask==2.3.3 \ No newline at end of file +flask==2.3.3 +pyserial \ No newline at end of file diff --git a/test_grbl_communicator.py b/test_grbl_communicator.py new file mode 100644 index 0000000..880983b --- /dev/null +++ b/test_grbl_communicator.py @@ -0,0 +1,280 @@ +import unittest +from unittest.mock import patch, MagicMock +import serial # Import the actual serial module to check for its exceptions +from grbl_communicator import GRBLCommunicator, logging + +# Disable logging for tests to keep output clean +logging.disable(logging.CRITICAL) + +class TestGRBLCommunicatorInit(unittest.TestCase): + def test_init_stores_port_and_baudrate(self): + port = "/dev/testport" + baudrate = 115200 + communicator = GRBLCommunicator(port, baudrate) + self.assertEqual(communicator.port, port) + self.assertEqual(communicator.baudrate, baudrate) + self.assertIsNone(communicator.ser) + +class TestGRBLCommunicatorConnect(unittest.TestCase): + @patch('grbl_communicator.serial.Serial') + def test_connect_successful(self, mock_serial_class): + mock_ser_instance = MagicMock() + mock_ser_instance.is_open = True + mock_ser_instance.in_waiting = 0 # Default to no data initially + + # Simulate GRBL startup message + grbl_startup_message = b"Grbl 1.1h ['$' for help]\r\n" + + # Configure side_effect for readline to simulate GRBL sending data + # Then no more data (which will make in_waiting effectively 0) + def readline_side_effect(*args, **kwargs): + if readline_side_effect.called_once: + mock_ser_instance.in_waiting = len(grbl_startup_message) + return grbl_startup_message + mock_ser_instance.in_waiting = 0 + return b'' + readline_side_effect.called_once = False + + def read_side_effect(size=1): + nonlocal grbl_startup_message + if grbl_startup_message: + data_to_return = grbl_startup_message[:size] + grbl_startup_message = grbl_startup_message[size:] + mock_ser_instance.in_waiting = len(grbl_startup_message) + return data_to_return + mock_ser_instance.in_waiting = 0 + return b'' + + # More robust way to handle readline for startup: + # Configure readline to return the message then empty bytes. + # Configure in_waiting to simulate data available then not. + mock_ser_instance.readline.side_effect = [grbl_startup_message, b"", b""] # Subsequent calls return empty + + # This function will be called by the code for self.ser.in_waiting + # It needs to reflect that data is available for the first readline call. + # Configure readline to return the message then empty bytes. + mock_ser_instance.readline.side_effect = [grbl_startup_message, b"", b""] # Subsequent calls return empty + + # This function will be called by the code for self.ser.in_waiting + # It needs to reflect that data is available for the first readline call. + def in_waiting_side_effect(*args, **kwargs): # Add *args, **kwargs for PropertyMock + if mock_ser_instance.readline.call_count == 0: # Before first readline call + return len(grbl_startup_message) + # After the first call to readline (which consumes the startup message), + # or if readline has been called multiple times for other reasons. + return 0 + + # To make in_waiting a callable property that behaves like an attribute: + # We use a PropertyMock for 'in_waiting' + type(mock_ser_instance).in_waiting = unittest.mock.PropertyMock(side_effect=in_waiting_side_effect) + # Ensure 'in_waiting' is treated as a property, not a method. + # This makes `self.ser.in_waiting` (no parentheses) call in_waiting_side_effect. + + mock_serial_class.return_value = mock_ser_instance + + communicator = GRBLCommunicator("/dev/testport", 115200) + + # Patch time.sleep to avoid actual sleeping during test + with patch('grbl_communicator.time.sleep'): + communicator.connect() + + mock_serial_class.assert_called_once_with("/dev/testport", 115200, timeout=2) + mock_ser_instance.flushInput.assert_called_once() + mock_ser_instance.write.assert_called_once_with(b'\n') + # Check readline was called at least once for the startup message, + # and potentially more until timeout logic is met. + self.assertGreaterEqual(mock_ser_instance.readline.call_count, 1) + self.assertIsNotNone(communicator.ser) + + @patch('grbl_communicator.serial.Serial') + def test_connect_successful_no_startup_message(self, mock_serial_class): + mock_ser_instance = MagicMock() + mock_ser_instance.is_open = True + mock_ser_instance.in_waiting = 0 + mock_ser_instance.readline.return_value = b"" # No startup message + mock_serial_class.return_value = mock_ser_instance + + communicator = GRBLCommunicator("/dev/testport", 115200) + # Should not raise an error, but log a warning (logging is disabled for test) + communicator.connect() + + mock_serial_class.assert_called_once_with("/dev/testport", 115200, timeout=2) + mock_ser_instance.flushInput.assert_called_once() + mock_ser_instance.write.assert_called_once_with(b'\n') + self.assertIsNotNone(communicator.ser) + + @patch('grbl_communicator.serial.Serial', side_effect=serial.SerialException("Connection failed")) + def test_connect_serial_exception_raises_connection_error(self, mock_serial_class): + communicator = GRBLCommunicator("/dev/testport", 115200) + with self.assertRaisesRegex(ConnectionError, "Failed to connect to GRBL on /dev/testport: Connection failed"): + communicator.connect() + self.assertIsNone(communicator.ser) + + @patch('grbl_communicator.serial.Serial', side_effect=Exception("Some other error")) + def test_connect_other_exception_raises_connection_error(self, mock_serial_class): + # Ensure ser.close is not called if ser is not successfully created + mock_ser_instance = MagicMock() + mock_serial_class.return_value = mock_ser_instance # To test closing logic + mock_serial_class.side_effect = Exception("Some other error") + + + communicator = GRBLCommunicator("/dev/testport", 115200) + with self.assertRaisesRegex(ConnectionError, "An unexpected error occurred on /dev/testport: Some other error"): + communicator.connect() + self.assertIsNone(communicator.ser) + # Check that if self.ser was somehow assigned before the exception, close would be attempted + # This is tricky because the exception happens during serial.Serial() call itself in this mock. + # If the exception happened *after* self.ser = serial.Serial(...), then ser.close() would be called. + # The current code structure in GRBLCommunicator handles this well. + +class TestGRBLCommunicatorSendCommand(unittest.TestCase): + def setUp(self): + self.communicator = GRBLCommunicator("/dev/testport", 115200) + self.mock_ser_instance = MagicMock() + self.mock_ser_instance.is_open = True + self.communicator.ser = self.mock_ser_instance # Simulate connected state + + def test_send_command_successful(self): + gcode_command = "G0 X10" + expected_encoded_command = b"G0 X10\n" + + # Simulate GRBL's response: 'ok\r\n' + # The send_command reads one char at a time. + response_chars = list(b'ok\r\n') + def read_side_effect(size=1): + if read_side_effect.buffer: + char = read_side_effect.buffer.pop(0) + self.mock_ser_instance.in_waiting = len(read_side_effect.buffer) + return bytes([char]) # Return as bytes + self.mock_ser_instance.in_waiting = 0 + return b'' + read_side_effect.buffer = list(b'ok\r\n') + self.mock_ser_instance.read.side_effect = read_side_effect + self.mock_ser_instance.in_waiting = len(read_side_effect.buffer) + + result = self.communicator.send_command(gcode_command) + + self.mock_ser_instance.write.assert_called_once_with(expected_encoded_command) + self.assertTrue(result) + + def test_send_command_grbl_error(self): + gcode_command = "G0 X10" + + response_chars = list(b'error: Invalid GCode\r\n') + def read_side_effect(size=1): + if read_side_effect.buffer: + char = read_side_effect.buffer.pop(0) + self.mock_ser_instance.in_waiting = len(read_side_effect.buffer) + return bytes([char]) + self.mock_ser_instance.in_waiting = 0 + return b'' + read_side_effect.buffer = list(b'error: Invalid GCode\r\n') + self.mock_ser_instance.read.side_effect = read_side_effect + self.mock_ser_instance.in_waiting = len(read_side_effect.buffer) + + with self.assertRaisesRegex(RuntimeError, "GRBL error: error: Invalid GCode"): + self.communicator.send_command(gcode_command) + self.mock_ser_instance.write.assert_called_once_with(b"G0 X10\n") + + @patch('grbl_communicator.time.time') + def test_send_command_timeout(self, mock_time): + gcode_command = "G0 X10" + command_timeout = 0.1 # Use a small timeout for testing + + # Simulate time passing beyond timeout. + # time.time() is called once at the start of the loop, then in each iteration. + # The loop condition is `time.time() - start_time < command_timeout` + # So, first call to time.time() is start_time (0). + # Subsequent calls should make the condition false. + # Iteration 1: time() -> 0.0 (start_time), time() -> 0.05. 0.05-0 < 0.1 is true. + # Iteration 2: time() -> 0.11. 0.11-0 < 0.1 is false. Loop terminates. + mock_time.side_effect = [0.0, 0.05, 0.11] + + self.mock_ser_instance.in_waiting = 0 + self.mock_ser_instance.read.return_value = b'' # No response + + # Patch time.sleep inside send_command + with patch('grbl_communicator.time.sleep'): + with self.assertRaisesRegex(TimeoutError, f"Timeout waiting for 'ok' or 'error' from GRBL for command: {gcode_command}"): + self.communicator.send_command(gcode_command, command_timeout=command_timeout) + + self.mock_ser_instance.write.assert_called_once_with(b"G0 X10\n") + + + def test_send_command_not_connected(self): + self.communicator.ser = None # Simulate not connected + with self.assertRaisesRegex(ConnectionError, "Serial port not open. Connect first."): + self.communicator.send_command("G0 X10") + + self.communicator.ser = MagicMock() + self.communicator.ser.is_open = False # Simulate connected but port closed + with self.assertRaisesRegex(ConnectionError, "Serial port not open. Connect first."): + self.communicator.send_command("G0 X10") + + def test_send_command_serial_exception_on_write(self): + self.mock_ser_instance.write.side_effect = serial.SerialException("Write failed") + with self.assertRaisesRegex(ConnectionError, "Serial communication error: Write failed"): + self.communicator.send_command("G0 X10") + self.mock_ser_instance.close.assert_called_once() # Ensure port is closed on error + + def test_send_command_serial_exception_on_read(self): + self.mock_ser_instance.read.side_effect = serial.SerialException("Read failed") + self.mock_ser_instance.in_waiting = 1 # Simulate data available to trigger read + + with self.assertRaisesRegex(ConnectionError, "Serial communication error: Read failed"): + self.communicator.send_command("G0 X10") + self.mock_ser_instance.close.assert_called_once() + + +class TestGRBLCommunicatorClose(unittest.TestCase): + def test_close_closes_open_port(self): + communicator = GRBLCommunicator("/dev/testport", 115200) + mock_ser_instance = MagicMock() + mock_ser_instance.is_open = True + communicator.ser = mock_ser_instance + + communicator.close() + mock_ser_instance.close.assert_called_once() + self.assertIsNone(communicator.ser) + + def test_close_handles_already_closed_port(self): + communicator = GRBLCommunicator("/dev/testport", 115200) + mock_ser_instance = MagicMock() + mock_ser_instance.is_open = False # Port is already closed + communicator.ser = mock_ser_instance + + communicator.close() + # ser.close() should not be called if port is not open, + # but the implementation calls it and serial.Serial handles it gracefully. + # For strictness, we could check it's not called, but current code calls it. + # Let's verify it doesn't raise an error. + mock_ser_instance.close.assert_not_called() # Ideal, but current code might call it. + # The GRBLCommunicator's close checks `is_open`. + self.assertIsNone(communicator.ser) + + + def test_close_handles_no_serial_object(self): + communicator = GRBLCommunicator("/dev/testport", 115200) + communicator.ser = None # No serial object + + communicator.close() # Should not raise any error + self.assertIsNone(communicator.ser) + + def test_close_serial_exception_on_close(self): + communicator = GRBLCommunicator("/dev/testport", 115200) + mock_ser_instance = MagicMock() + mock_ser_instance.is_open = True + mock_ser_instance.close.side_effect = serial.SerialException("Failed to close") + communicator.ser = mock_ser_instance + + # The method should catch the exception and log an error, but not re-raise + try: + communicator.close() + except serial.SerialException: + self.fail("communicator.close() raised SerialException unexpectedly!") + + mock_ser_instance.close.assert_called_once() + self.assertIsNone(communicator.ser) # ser should still be set to None + +if __name__ == '__main__': + unittest.main(verbosity=2)