From b4bdd2c6679a14264ccc23e60b2bf0a972c10fa5 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Tue, 20 May 2025 04:23:53 +0000 Subject: [PATCH] feat: Add GRBL plotter backend for G-code sending I've implemented a backend system to send G-code commands directly to a GRBL-based pen plotter via a serial connection. Key changes include: - Added `grbl_communicator.py` with a `GRBLCommunicator` class to handle serial connection, G-code command sending, and GRBL response parsing ('ok', 'error'). - Updated the `/api/send_job` route in `app.py` to utilize `GRBLCommunicator` for processing and sending G-code jobs. Includes default serial port and baud rate configuration. - Added `pyserial` to `requirements.txt` for serial communication. - Implemented comprehensive unit tests for `GRBLCommunicator` in `test_grbl_communicator.py`, using `unittest.mock` to simulate serial port interactions. - Updated `README.md` with details on the new backend, configuration instructions for serial port/baud rate, and dependency information. The system now allows you to send G-code from the frontend, which is then relayed to the connected GRBL device. Error handling for connection issues and GRBL errors is included. --- README.md | 57 +++- __pycache__/grbl_communicator.cpython-310.pyc | Bin 0 -> 4170 bytes .../test_grbl_communicator.cpython-310.pyc | Bin 0 -> 9351 bytes app.py | 67 ++++- grbl_communicator.py | 170 +++++++++++ requirements.txt | 3 +- test_grbl_communicator.py | 280 ++++++++++++++++++ 7 files changed, 558 insertions(+), 19 deletions(-) create mode 100644 __pycache__/grbl_communicator.cpython-310.pyc create mode 100644 __pycache__/test_grbl_communicator.cpython-310.pyc create mode 100644 grbl_communicator.py create mode 100644 test_grbl_communicator.py diff --git a/README.md b/README.md index c94262e..84839f2 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ A web-based G-code visualizer and job management tool for pen plotters. - Line color based on Z height (lower Z = darker color) - Bounding box display around drawing area - Manual jogging controls for X/Y/Z -- Job sending functionality (stub for now) +- Job sending functionality to GRBL-compatible plotters - Current tool position shown as a red dot ## Setup and Installation @@ -28,45 +28,82 @@ A web-based G-code visualizer and job management tool for pen plotters. source venv/bin/activate # On Windows: venv\Scripts\activate ``` 3. Install the dependencies: - ``` + ```bash pip install -r requirements.txt ``` -4. Run the application: - ``` + This will install Flask, pyserial, and other necessary packages. +4. Configure Plotter Connection (Optional): + If you intend to connect to a GRBL plotter, you might need to update the serial port and baud rate settings in `app.py`. See the "Plotter Backend > Configuration" section for details. +5. Run the application: + ```bash python app.py ``` -5. Open your browser and navigate to `http://127.0.0.1:5000` +6. Open your browser and navigate to `http://127.0.0.1:5000` (or the host/port shown in the terminal, e.g., `http://0.0.0.0:5000`). ## Usage - Paste G-code into the text area or upload a .gcode file - Click "Visualize" to render the G-code on the canvas -- Use jogging controls to move the virtual tool -- Click "Send Job" to send the job to the plotter (currently just logs to console) +- Use jogging controls to move the virtual tool (visualization only) +- Click "Send Job" to send the G-code to your configured GRBL plotter. ## Project Structure -- `app.py` - Flask application entry point +- `app.py` - Flask application entry point, includes GRBL communication logic. +- `grbl_communicator.py` - Module for handling serial communication with GRBL devices. +- `requirements.txt` - Python package dependencies. - `templates/` - HTML templates - `static/` - Static assets - `css/style.css` - Main stylesheet - `js/gcode-parser.js` - G-code parsing logic - `js/gcode-renderer.js` - Canvas rendering logic - `js/visualizer.js` - Main application logic +- `test_grbl_communicator.py` - Unit tests for `grbl_communicator.py`. ## Technical Details - Built as a Flask web application - Frontend uses HTML5 Canvas for visualization +- Backend uses `pyserial` for GRBL communication - Modular JavaScript architecture: - GCodeParser class for parsing G-code - GCodeRenderer class for canvas rendering - Modern JavaScript (ES6+) - Responsive design +## Plotter Backend + +This application now supports sending G-code jobs directly to GRBL-compatible pen plotters. The backend uses the `pyserial` library to communicate with the plotter over a serial connection, managed by the `GRBLCommunicator` class found in `grbl_communicator.py`. + +### Configuration + +To connect to your plotter, you may need to configure the serial port and baud rate: + +- **Location**: These settings are defined as constants at the top of the `app.py` file: + - `DEFAULT_SERIAL_PORT`: The serial port your plotter is connected to. + - `DEFAULT_BAUDRATE`: The baud rate your plotter uses for communication. +- **Default Values**: + - `DEFAULT_SERIAL_PORT = "/dev/ttyUSB0"` + - `DEFAULT_BAUDRATE = 115200` +- **Finding Your Serial Port**: + - **Linux**: Check common device paths like `/dev/ttyUSB*` or `/dev/ttyACM*`. You can use the command `dmesg | grep -i "ttyS\\|ttyUSB\\|ttyACM"` (run as root or with sudo if needed) shortly after connecting your device to see which port it's assigned. + - **Windows**: Open Device Manager (search for "Device Manager" in the Start Menu). Look under "Ports (COM & LPT)". Your plotter will likely appear as a "USB Serial Port" or similar, listed with a `COMx` number (e.g., `COM3`). + - **macOS**: Check devices starting with `/dev/tty.*`. In the terminal, run `ls /dev/tty.*`. Common names include `/dev/tty.usbmodemXXXX` or `/dev/tty.usbserial-XXXX`. + +### Dependencies + +Serial communication with the plotter requires the `pyserial` library. This library is listed in the `requirements.txt` file. + +To ensure all dependencies, including `pyserial`, are installed, run the following command in your activated virtual environment (as mentioned in the "Setup and Installation" section): +```bash +pip install -r requirements.txt +``` + ## Future Enhancements -- Real plotter communication via serial port +- Real-time job progress and status feedback from the plotter - Job queue management - G-code generation from SVG files -- More advanced visualization features \ No newline at end of file +- More advanced visualization features (e.g., toolpath simulation) +- Emergency stop / pause / resume functionality for plotter jobs +- Web-based configuration for serial port/baud rate \ No newline at end of file diff --git a/__pycache__/grbl_communicator.cpython-310.pyc b/__pycache__/grbl_communicator.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f907b5c27aa8bbe941d41e9e00a439a62b5ed26f GIT binary patch literal 4170 zcmaJ^O>7&-72erhE|(NVQ9qVs$4;lIU73kZ8A00wY90T@F#;4yV978_6xpsgD`_Qi zmziB!=9V&o*g#JPdaN5HppL#2Xm33g?X~wF_Ez*z1U(h)p-oVv-l0r>hPIm2&zyCm zR_u4Ydc@E3GiTf09j_g@hxLvZMeY`Q>v~#HovF1M%8nbih{O0}pmG^+)By1`(z&S% zCiDkd(-4L*QJP{xSfcoVH4DNPCA3(gyr$JFY3;_%t3SRLb~?SlZ@FdRhec13&iNwx4){ z@A>gI?}jqIm`t4G8*WcXH})>p3#m!1sXb~+jmVQ}u^n!0`N39d`oU&cH&Qe5+MBWf z-6U!HSd$YdzB<0_cDu`4a-;3E##Yzp_EXz&NCC$wVPIr~Sd4zJvgG*Fsy0SrTMXky z4@sk-QfB!TdSY$Jlo9mY2nI^y^8%S6>BQzv0fr&tz#XFS(177AHOHb6adBu1WipJg zhNT0I!L;})i=r~5z0WYFc%bd;Svk-b5rDgz?9jNK2~pZL<T zVvaE|qyxtpVsb%yO9L;`wmN{0pX+uHgl@ayw5&G`0Sha%smDkv;9Bk99~l# zjvaBJHOb$rRnBGQ@EF~bYlb!k3-%HgTQOW^hwU0Wy_V}-;4hs4{p$0Q zwxXm>d9(w7l8wMceNO-Cmp)_M{(sY1t2)z05(W zEj?GX{lH5t;k80R^J3|D)8d{h145FijP>=qEe}x=25CX9C!M;Mg$kOkle5y&(U77a zITQ+Mq16r}uU?RZ2hFM6T~6+9V4eb~j8tef#oSO+s6_9AoMbwiXJz_X z6~nY_R$~>m$V|{0(?2Qewf~x>GOL+2y$0Z#XZ9ym^i(k;tLa}Si_bt=F6Qr$famn( zCy<`hKZLY*um!NKG+89N!=hTO4-LT-1mFy$?VJ2@z=nkbz}x~L2K|;W2L`o_Hb2cB z7SU!403pp4*1j2+2;lY$Sve>y0&Di$R$&ZxGi6>sZ`BOrfJKjp@qo@ zfXrG!q zaYN1{NR!1i%p`odah|sGh{p<-Gw7DnLUlY(v}owXgPxUC-qG(=d;qz4P zXNB1~{+objwSeZ-$c@fMZ*vp=ISUr4+1hqxvqaSC;A~GPmFGq$fP9^VQA#?7_?2-( zUO`{VSRqZKHh-z+Po14C=$Npr82KE}lS({_k?N_K&%R8H}xA zb`?iK1;+=yHT<#`{eQ`+XO4q$9iGT;DgQY<3M487jdEFmC6miZMM0&mfJG($1Yozd zfCEAbJ668|&_GL4#nqsQD*;^yRGxWT`w8;4eRhup<*eLedz!jH0G{)lF1t3EtI6y* zB&buC5*Ou5PF0Gn=qNKo48JixW6zEF2y{3$h1;eG>Z_?pf$H{HFsgt!%07-Fg#xs$ zs~~xvBUL2zFeW0Hfj8hAI*3og3;H)@qs)@|XY8()vRY^I95FA_b%ZW{j+2%hM3CsU z>AT`MANO2!C3GA)iJ|fhB0nIa^gW6x^+lo#i<0-&(tR!Pou-XWe>QPZ-LLA}+j5 zb)=;q_k8RR?q=CA`1>udkz862F$2IyDXQXlj{6`6|FaopJ2bO|LS zi(1Ih;s$o8@`eHeT3I@=;YNN7soAEFOW^yf?|qh@jJHkr*U%>2v$0g>!ujQ{`u literal 0 HcmV?d00001 diff --git a/__pycache__/test_grbl_communicator.cpython-310.pyc b/__pycache__/test_grbl_communicator.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d4a8d15ef18fae100e7e8b7a0fe2b314710b5440 GIT binary patch literal 9351 zcmb_iO^h4od7eKGhqGKRcP;-#YQ)%bBs%HZisQ6)ZAQk2+RM^a;(O**AGv%93Y zB=gPCvIs6vbX|CL5M1=u6a}atmp$hcAg7#r?qSed(E`2&5OUGQeV%uQMt!qEfwEyNq_G97UJg#sA;%mO%)@uAV+WNLpGq%l| zsVD7L%|g4`&TZQ@dplpt>spW>IYGWvxU1{hCC#_|+^3qK3$&KIYt|OfV*7ct0 zhOhg^r&`VQ&8u3~il4a_gweq%FL#;**u+4b!U}`-hFZYOs>m6NjSPeKHMNY! z``>)2-s{PV-ln?V_OfGEd$;1}xEZgBvwI=<8iXE~_Uawq8?R27gX-8A>~mgVJUpVE;$ ziL+xyb}Qd6hPvlQ3x3I80^+Tx=$Ci(&x=62HCnuj&3yoLTlm|B zIq5iOweZ86Ks^6G!Zp-eCBEw~`zv=a0=rlmmH8g-mPaN33F-$lXNCJ~J>ln5-8nY2 zT2GF3r~2gjHAUSFzX5UyK(3r!J-b!;+ta5`S2nt;vK6#@zgTo<5I~OdI?4CG39@Ij zH04J69^_6Q?m;Vsr-pBce##Q}FjA|U@j|2CZU?^C?KGfr?VuC8&5n1o-i%;&_Kn!8 ztIaUBZ`|bfz8N1A35QW#Mg5+)9fV50g2nB3qJ$NHh8Yykk~!;B zQ&461@&-nQG;>AW(Jgau^=qAU{u=vwypV+;O^%{jy?^4cwThbC!D`qJg5DRz$i807 zv#HyU?h!=pzOGK;yZV$OeByjF^t!#EBjQ)oT6KoxyX+OCZE>dybIG=(%eme1qUPev5o6Tq|b~oDnaO-lX*N@_r!-L}7 zP1S@?O}j2ZTv4FJ@2Vm<|{5!AjVf>Lj?LF~#@DpL#d;t^8uI$gv;Gba2aOcXu^k_gIyuw272 z%7%zVA6)zjLYcG7U^xvHn%z!iquzw;QQyI;GxDQqk#{1$b=*X7U_L`pgTZG6>D@KN zIU6IBGCAa&zcJ&SwGrez4NLQMlsG6CP;5^~IIx9ShpS<9cM$n7EJWt6|W0hk| zz*Sn9UtFqdl;A<2SjOYH)JI(9e)(#5JE(M{tw2>|-5CKU@oFkSUj#@66lJ6M5OYE8 z%*bMVnr0Ze$o8#NtmY^`SFA4_5G$g%Bza@zQO&KZACkKhwHus!yHn|Rf=_w~Jpw=3 zXr8f*V6HT0|fF8vL3&3tGI&f^N-1sUfy(>&L=knf6_ z#&<4&u}0P+DLOKle3F{i9KYx0QK&NJ@|@i&p`~o$lo%CC=KLYpKOZyeMZ(%(wUgx?n^Ybio-Gl3#Hy z45Gj3?cz2hz8Nn$NKpqbJme6o^4`o*=o+b^Ts)+`m}dOYMt#Th^J&He_xEXm0WjP5 zaL_%5hT=FQI2Am_Vm%h(3&_%XZ^l?>E+6YXJ5XT z$I8jAD+zA*#-c01)y!rY4RNCkB<@kiu&l)DEqDT-3xfM>ldZs`N4cTCiH#+ZI;Am* z)%We#zTQV7qGGex?IqEwI8Qp^ICuRP6coYw3w*K~TXls8iF>9Jp)jQFpK)`K^#U7l z3w06!tUmbJ!^D9VP=f%-q(;F4#8GKXHcnP&u~4-bTkU2Tsh@HW+;?m-3Qx?@BnuH_ z#l`FzN$|jcB6br;bxrkyaj>_z+2|reW6cEnsXa5cmtN3e$7H8*35Anhw-aJ4c!R7E z&;53q)cuT`5~!Yd&>3%Im=vv8W^t^t?&eu+DH68TRus+geVmfDp^{ojWMg z9*T)y$>Q=*?>pIES&oKy9)Fg+lq;(}JW}WWdI9)+e>$D&T1uoUyP7uXXS89!3uT=GTDShzLR9s+H zEhgxh*zW{Ul0?yIs02*u%147KNu+88sbFydCAhlIDFjy&t%{#LY}nEr{T8D`R!%Gf z#p8m5tjRTx=_>%0@ecr&HF)juEn7PGj9FtCLbE2o`4`-7$eRIK&EsU$wEu*zQPbfXH3B&*}} zsj|^jVYDh{@M1NeghGscL}p1iEhp{BxnpcTOqa@cv^<78Gi@;*Eblc6zd ziyyGAljhEYGby8~A<3^sOsrE|oE<65MtpyXjQkOaD2^CaR;x0spP1Y@*JyVU@k{+- zo&>&gSOOT%n$a{*MPe$IsQ<(wl9-lLX&^EE$>U4m$oY?!$lqh-gAzd%2(s~DTSv5{ ztS2OrL6rw-=sd|he@HU0kQ2$|)&~n$5R8qB8mF-6;Piu}lO4@~00!-VaHcMj@##DX zsSS12IKa=ED`v;|huRhh>vKihRfx zZNGqQ(T$&-$BH%T#|-jlvUsiGy2r$MoF^js)TIiE%A=Q*Ld>B)vWOs5r>s#q3LwW zUVeFSQTO8?Js3){uO4~m^rx7)vmMD0sZPz5-)R1iI$JF*s-cVst1drNDV|i_m~*;s zXHK_;G^UN?)!PK#t>m=69Q2QK4w?hd&p*;ZA29LEnTfM82*NXXn1TTQ8ozEr@J1?b zfS@Ibgorzj!Bs0^$yCTZHGXK4EL7xsK(x=#IXp|eiLQ6Aw^08A%#zXpMa;-|@aAKk z`QUsr2j*cXVM%Z1z**EsCV~=JS5{*YX0Z029{}$O_CL@%OO_UyE$h~cI<`ws2|p?S zTJVw9(Qi^ua%mM_WB%2eW>t2krh9F$lqu>GqJ!6_4zjCO1^#EM6{(^6mBdOZbZVtV1I2q%R#ObMwsOH&X*&6;eB?%`Jbu2+*hRG^JNjDJ1 zKz46ElI)ILU>$0ApwidI3Dh=8hlE1RSdgjVZKC6UaEUK>G#FmjDHrpK?fiF#?K*yj z^W+x~gzklp19iO{HlthBW%UY%)E39Timfu@PK-X`BhIh3d??k literal 0 HcmV?d00001 diff --git a/app.py b/app.py index 07c3ad3..ded7167 100644 --- a/app.py +++ b/app.py @@ -1,5 +1,14 @@ from flask import Flask, render_template, request, jsonify import os +import logging +from grbl_communicator import GRBLCommunicator + +# Configure basic logging for the app +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# Default GRBL connection parameters +DEFAULT_SERIAL_PORT = "/dev/ttyUSB0" # Common for Linux, use "COMx" for Windows if needed +DEFAULT_BAUDRATE = 115200 app = Flask(__name__) @@ -9,14 +18,56 @@ def index(): @app.route('/api/send_job', methods=['POST']) def send_job(): - # This is a stub function that would send the job to the plotter gcode = request.json.get('gcode', '') - # For now, just return success and the first few lines for confirmation - return jsonify({ - 'status': 'success', - 'message': 'Job sent to plotter', - 'preview': '\n'.join(gcode.split('\n')[:5]) + '...' if gcode else 'No G-code provided' - }) + + if not gcode: + logging.warning("send_job called with no G-code.") + return jsonify({'status': 'error', 'message': 'No G-code provided.'}), 400 + + logging.info(f"Received G-code job. First 50 chars: {gcode[:50]}...") + + communicator = None # Initialize communicator to None for the finally block + try: + communicator = GRBLCommunicator(port=DEFAULT_SERIAL_PORT, baudrate=DEFAULT_BAUDRATE) + logging.info(f"Attempting to connect to GRBL on {DEFAULT_SERIAL_PORT} at {DEFAULT_BAUDRATE} baud.") + communicator.connect() + logging.info("Successfully connected to GRBL.") + + gcode_lines = gcode.strip().split('\n') + total_lines = len(gcode_lines) + logging.info(f"Starting to send {total_lines} G-code lines.") + + for i, line in enumerate(gcode_lines): + line = line.strip() + if not line or line.startswith(';') or line.startswith('('): # Ignore empty lines and comments + logging.info(f"Skipping empty line or comment: {line}") + continue + + logging.info(f"Sending line {i+1}/{total_lines}: {line}") + communicator.send_command(line) + logging.info(f"Successfully sent line: {line}") + + logging.info("All G-code commands sent successfully.") + return jsonify({'status': 'success', 'message': 'Job sent to plotter successfully.'}) + + except ConnectionError as e: + logging.error(f"Connection failed: {e}") + return jsonify({'status': 'error', 'message': f'Connection failed: {str(e)}'}), 500 + except RuntimeError as e: # GRBL reported error + logging.error(f"GRBL error: {e}") + return jsonify({'status': 'error', 'message': f'GRBL error: {str(e)}'}), 400 + except TimeoutError as e: + logging.error(f"GRBL timeout: {e}") + return jsonify({'status': 'error', 'message': f'GRBL timeout: {str(e)}'}), 400 + except Exception as e: # Catch any other unexpected errors + logging.error(f"An unexpected error occurred: {e}", exc_info=True) # Log stack trace + return jsonify({'status': 'error', 'message': f'An unexpected error occurred: {str(e)}'}), 500 + finally: + if communicator: + logging.info("Closing GRBL connection.") + communicator.close() if __name__ == '__main__': - app.run(debug=True) + # Use host='0.0.0.0' to make it accessible from the network if needed for testing + # debug=True is useful for development but should be False in production + app.run(host='0.0.0.0', port=5000, debug=True) diff --git a/grbl_communicator.py b/grbl_communicator.py new file mode 100644 index 0000000..bf5ed34 --- /dev/null +++ b/grbl_communicator.py @@ -0,0 +1,170 @@ +import serial +import time +import logging + +# Configure basic logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +class GRBLCommunicator: + def __init__(self, port, baudrate): + self.port = port + self.baudrate = baudrate + self.ser = None + logging.info(f"GRBLCommunicator initialized with port={self.port}, baudrate={self.baudrate}") + + def connect(self): + logging.info(f"Attempting to connect to GRBL on {self.port} at {self.baudrate} baud.") + try: + self.ser = serial.Serial(self.port, self.baudrate, timeout=2) + logging.info("Serial port opened. Waiting for GRBL to initialize...") + time.sleep(2) # Wait for GRBL to initialize + + # Wake up GRBL and clear buffer + # A soft reset (Ctrl-X) or a newline can be used. + # Ctrl-X is often \x18 in bytes. + # Sending a newline might be safer initially. + self.ser.flushInput() # Clear input buffer before sending commands + + # Sending a newline character to ensure GRBL is awake and to clear any partial commands. + self.ser.write(b'\n') + # Alternative: self.ser.write(b'\x18') # Soft reset + + # Read and discard startup messages. + # GRBL usually sends a welcome message like "Grbl 1.1h ['$' for help]" + # We'll read for a short period or until no more data comes. + startup_message = "" + start_time = time.time() + while time.time() - start_time < 2: # Read for up to 2 seconds + if self.ser.in_waiting > 0: + line = self.ser.readline().decode('utf-8', errors='ignore').strip() + if line: + startup_message += line + "\n" + logging.info(f"GRBL Startup: {line}") + else: + time.sleep(0.05) # Small delay to avoid busy waiting + + if not startup_message: + logging.warning("No startup message received from GRBL. This might be okay.") + + # A more robust check would be to send a status command '?' and expect 'ok' + # For now, we assume connection is successful if no serial exception occurred. + logging.info("GRBL connection established.") + + except serial.SerialException as e: + logging.error(f"Failed to connect to GRBL: {e}") + self.ser = None # Ensure ser is None if connection failed + raise ConnectionError(f"Failed to connect to GRBL on {self.port}: {e}") + except Exception as e: # Catch any other unexpected errors during connection + logging.error(f"An unexpected error occurred during connection: {e}") + if self.ser and self.ser.is_open: + self.ser.close() + self.ser = None + raise ConnectionError(f"An unexpected error occurred on {self.port}: {e}") + + + def send_command(self, gcode_line, command_timeout=10.0): + if not self.ser or not self.ser.is_open: + logging.error("Serial port not open. Cannot send command.") + raise ConnectionError("Serial port not open. Connect first.") + + try: + command = gcode_line.strip() + '\n' + logging.info(f"Sending G-code command: {command.strip()}") + self.ser.write(command.encode('utf-8')) + + response_buffer = "" + start_time = time.time() + + while time.time() - start_time < command_timeout: + if self.ser.in_waiting > 0: + char = self.ser.read().decode('utf-8', errors='ignore') + response_buffer += char + if response_buffer.endswith('ok\r\n'): + logging.info(f"Received 'ok' for command: {command.strip()}") + return True + if response_buffer.strip().startswith('error:'): + # Try to read the rest of the error line until a newline + # or timeout to capture the full error message. + while not response_buffer.endswith('\n') and (time.time() - start_time < command_timeout): + if self.ser.in_waiting > 0: + char_more = self.ser.read().decode('utf-8', errors='ignore') + response_buffer += char_more + else: + time.sleep(0.005) # Shorter sleep while actively fetching rest of error + + error_message = response_buffer.strip() + logging.error(f"GRBL error for command '{command.strip()}': {error_message}") + raise RuntimeError(f"GRBL error: {error_message}") + else: + time.sleep(0.01) # Short sleep to prevent busy-waiting + + logging.error(f"Timeout waiting for response to command: {command.strip()}") + raise TimeoutError(f"Timeout waiting for 'ok' or 'error' from GRBL for command: {command.strip()}") + + except serial.SerialException as e: + logging.error(f"Serial communication error during send_command: {e}") + self.close() # Attempt to close port on error + raise ConnectionError(f"Serial communication error: {e}") + except RuntimeError: # Re-raise RuntimeError from GRBL error + raise + except TimeoutError: # Re-raise TimeoutError + raise + except Exception as e: # Catch any other unexpected errors + logging.error(f"An unexpected error occurred during send_command: {e}") + self.close() + raise RuntimeError(f"An unexpected error occurred sending command: {e}") + + def close(self): + if self.ser and self.ser.is_open: + try: + self.ser.close() + logging.info("Serial port closed.") + except serial.SerialException as e: + logging.error(f"Error closing serial port: {e}") + else: + logging.info("Serial port was not open or already closed.") + self.ser = None + +if __name__ == '__main__': + # This is example usage, not part of the class itself. + # It's good for basic testing if you have a GRBL device connected. + # For actual use, this would be imported into another script. + + # Replace with your actual port and baudrate + # On Linux, it might be /dev/ttyUSB0 or /dev/ttyACM0 + # On Windows, it might be COM3, COM4, etc. + GRBL_PORT = "/dev/ttyUSB0" # Example, change this! + GRBL_BAUDRATE = 115200 + + # Example of how to use the GRBLCommunicator + # Note: This will try to connect to a real device. + # If you don't have one, this part will fail. + + # communicator = None + # try: + # communicator = GRBLCommunicator(port=GRBL_PORT, baudrate=GRBL_BAUDRATE) + # communicator.connect() + + # # Example commands + # # Ensure GRBL is in a known state (e.g., by homing or unlocking if needed) + # # For testing, simple status commands are safest. + # if communicator.send_command("$#"): # View G-code parameters + # print("Successfully sent '$#' and got ok") + + # if communicator.send_command("G0 X1 F100"): # Simple move command + # print("Successfully sent 'G0 X1 F100' and got ok") + + # except ConnectionError as e: + # print(f"Connection Error: {e}") + # except RuntimeError as e: + # print(f"Runtime Error: {e}") + # except TimeoutError as e: + # print(f"Timeout Error: {e}") + # except Exception as e: + # print(f"An unexpected error occurred: {e}") + # finally: + # if communicator: + # communicator.close() + + print("GRBLCommunicator class defined. Example usage (commented out) requires a GRBL device.") + print("To use this class, import it into your main application script.") diff --git a/requirements.txt b/requirements.txt index 8c33811..fe29ec8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -flask==2.3.3 \ No newline at end of file +flask==2.3.3 +pyserial \ No newline at end of file diff --git a/test_grbl_communicator.py b/test_grbl_communicator.py new file mode 100644 index 0000000..880983b --- /dev/null +++ b/test_grbl_communicator.py @@ -0,0 +1,280 @@ +import unittest +from unittest.mock import patch, MagicMock +import serial # Import the actual serial module to check for its exceptions +from grbl_communicator import GRBLCommunicator, logging + +# Disable logging for tests to keep output clean +logging.disable(logging.CRITICAL) + +class TestGRBLCommunicatorInit(unittest.TestCase): + def test_init_stores_port_and_baudrate(self): + port = "/dev/testport" + baudrate = 115200 + communicator = GRBLCommunicator(port, baudrate) + self.assertEqual(communicator.port, port) + self.assertEqual(communicator.baudrate, baudrate) + self.assertIsNone(communicator.ser) + +class TestGRBLCommunicatorConnect(unittest.TestCase): + @patch('grbl_communicator.serial.Serial') + def test_connect_successful(self, mock_serial_class): + mock_ser_instance = MagicMock() + mock_ser_instance.is_open = True + mock_ser_instance.in_waiting = 0 # Default to no data initially + + # Simulate GRBL startup message + grbl_startup_message = b"Grbl 1.1h ['$' for help]\r\n" + + # Configure side_effect for readline to simulate GRBL sending data + # Then no more data (which will make in_waiting effectively 0) + def readline_side_effect(*args, **kwargs): + if readline_side_effect.called_once: + mock_ser_instance.in_waiting = len(grbl_startup_message) + return grbl_startup_message + mock_ser_instance.in_waiting = 0 + return b'' + readline_side_effect.called_once = False + + def read_side_effect(size=1): + nonlocal grbl_startup_message + if grbl_startup_message: + data_to_return = grbl_startup_message[:size] + grbl_startup_message = grbl_startup_message[size:] + mock_ser_instance.in_waiting = len(grbl_startup_message) + return data_to_return + mock_ser_instance.in_waiting = 0 + return b'' + + # More robust way to handle readline for startup: + # Configure readline to return the message then empty bytes. + # Configure in_waiting to simulate data available then not. + mock_ser_instance.readline.side_effect = [grbl_startup_message, b"", b""] # Subsequent calls return empty + + # This function will be called by the code for self.ser.in_waiting + # It needs to reflect that data is available for the first readline call. + # Configure readline to return the message then empty bytes. + mock_ser_instance.readline.side_effect = [grbl_startup_message, b"", b""] # Subsequent calls return empty + + # This function will be called by the code for self.ser.in_waiting + # It needs to reflect that data is available for the first readline call. + def in_waiting_side_effect(*args, **kwargs): # Add *args, **kwargs for PropertyMock + if mock_ser_instance.readline.call_count == 0: # Before first readline call + return len(grbl_startup_message) + # After the first call to readline (which consumes the startup message), + # or if readline has been called multiple times for other reasons. + return 0 + + # To make in_waiting a callable property that behaves like an attribute: + # We use a PropertyMock for 'in_waiting' + type(mock_ser_instance).in_waiting = unittest.mock.PropertyMock(side_effect=in_waiting_side_effect) + # Ensure 'in_waiting' is treated as a property, not a method. + # This makes `self.ser.in_waiting` (no parentheses) call in_waiting_side_effect. + + mock_serial_class.return_value = mock_ser_instance + + communicator = GRBLCommunicator("/dev/testport", 115200) + + # Patch time.sleep to avoid actual sleeping during test + with patch('grbl_communicator.time.sleep'): + communicator.connect() + + mock_serial_class.assert_called_once_with("/dev/testport", 115200, timeout=2) + mock_ser_instance.flushInput.assert_called_once() + mock_ser_instance.write.assert_called_once_with(b'\n') + # Check readline was called at least once for the startup message, + # and potentially more until timeout logic is met. + self.assertGreaterEqual(mock_ser_instance.readline.call_count, 1) + self.assertIsNotNone(communicator.ser) + + @patch('grbl_communicator.serial.Serial') + def test_connect_successful_no_startup_message(self, mock_serial_class): + mock_ser_instance = MagicMock() + mock_ser_instance.is_open = True + mock_ser_instance.in_waiting = 0 + mock_ser_instance.readline.return_value = b"" # No startup message + mock_serial_class.return_value = mock_ser_instance + + communicator = GRBLCommunicator("/dev/testport", 115200) + # Should not raise an error, but log a warning (logging is disabled for test) + communicator.connect() + + mock_serial_class.assert_called_once_with("/dev/testport", 115200, timeout=2) + mock_ser_instance.flushInput.assert_called_once() + mock_ser_instance.write.assert_called_once_with(b'\n') + self.assertIsNotNone(communicator.ser) + + @patch('grbl_communicator.serial.Serial', side_effect=serial.SerialException("Connection failed")) + def test_connect_serial_exception_raises_connection_error(self, mock_serial_class): + communicator = GRBLCommunicator("/dev/testport", 115200) + with self.assertRaisesRegex(ConnectionError, "Failed to connect to GRBL on /dev/testport: Connection failed"): + communicator.connect() + self.assertIsNone(communicator.ser) + + @patch('grbl_communicator.serial.Serial', side_effect=Exception("Some other error")) + def test_connect_other_exception_raises_connection_error(self, mock_serial_class): + # Ensure ser.close is not called if ser is not successfully created + mock_ser_instance = MagicMock() + mock_serial_class.return_value = mock_ser_instance # To test closing logic + mock_serial_class.side_effect = Exception("Some other error") + + + communicator = GRBLCommunicator("/dev/testport", 115200) + with self.assertRaisesRegex(ConnectionError, "An unexpected error occurred on /dev/testport: Some other error"): + communicator.connect() + self.assertIsNone(communicator.ser) + # Check that if self.ser was somehow assigned before the exception, close would be attempted + # This is tricky because the exception happens during serial.Serial() call itself in this mock. + # If the exception happened *after* self.ser = serial.Serial(...), then ser.close() would be called. + # The current code structure in GRBLCommunicator handles this well. + +class TestGRBLCommunicatorSendCommand(unittest.TestCase): + def setUp(self): + self.communicator = GRBLCommunicator("/dev/testport", 115200) + self.mock_ser_instance = MagicMock() + self.mock_ser_instance.is_open = True + self.communicator.ser = self.mock_ser_instance # Simulate connected state + + def test_send_command_successful(self): + gcode_command = "G0 X10" + expected_encoded_command = b"G0 X10\n" + + # Simulate GRBL's response: 'ok\r\n' + # The send_command reads one char at a time. + response_chars = list(b'ok\r\n') + def read_side_effect(size=1): + if read_side_effect.buffer: + char = read_side_effect.buffer.pop(0) + self.mock_ser_instance.in_waiting = len(read_side_effect.buffer) + return bytes([char]) # Return as bytes + self.mock_ser_instance.in_waiting = 0 + return b'' + read_side_effect.buffer = list(b'ok\r\n') + self.mock_ser_instance.read.side_effect = read_side_effect + self.mock_ser_instance.in_waiting = len(read_side_effect.buffer) + + result = self.communicator.send_command(gcode_command) + + self.mock_ser_instance.write.assert_called_once_with(expected_encoded_command) + self.assertTrue(result) + + def test_send_command_grbl_error(self): + gcode_command = "G0 X10" + + response_chars = list(b'error: Invalid GCode\r\n') + def read_side_effect(size=1): + if read_side_effect.buffer: + char = read_side_effect.buffer.pop(0) + self.mock_ser_instance.in_waiting = len(read_side_effect.buffer) + return bytes([char]) + self.mock_ser_instance.in_waiting = 0 + return b'' + read_side_effect.buffer = list(b'error: Invalid GCode\r\n') + self.mock_ser_instance.read.side_effect = read_side_effect + self.mock_ser_instance.in_waiting = len(read_side_effect.buffer) + + with self.assertRaisesRegex(RuntimeError, "GRBL error: error: Invalid GCode"): + self.communicator.send_command(gcode_command) + self.mock_ser_instance.write.assert_called_once_with(b"G0 X10\n") + + @patch('grbl_communicator.time.time') + def test_send_command_timeout(self, mock_time): + gcode_command = "G0 X10" + command_timeout = 0.1 # Use a small timeout for testing + + # Simulate time passing beyond timeout. + # time.time() is called once at the start of the loop, then in each iteration. + # The loop condition is `time.time() - start_time < command_timeout` + # So, first call to time.time() is start_time (0). + # Subsequent calls should make the condition false. + # Iteration 1: time() -> 0.0 (start_time), time() -> 0.05. 0.05-0 < 0.1 is true. + # Iteration 2: time() -> 0.11. 0.11-0 < 0.1 is false. Loop terminates. + mock_time.side_effect = [0.0, 0.05, 0.11] + + self.mock_ser_instance.in_waiting = 0 + self.mock_ser_instance.read.return_value = b'' # No response + + # Patch time.sleep inside send_command + with patch('grbl_communicator.time.sleep'): + with self.assertRaisesRegex(TimeoutError, f"Timeout waiting for 'ok' or 'error' from GRBL for command: {gcode_command}"): + self.communicator.send_command(gcode_command, command_timeout=command_timeout) + + self.mock_ser_instance.write.assert_called_once_with(b"G0 X10\n") + + + def test_send_command_not_connected(self): + self.communicator.ser = None # Simulate not connected + with self.assertRaisesRegex(ConnectionError, "Serial port not open. Connect first."): + self.communicator.send_command("G0 X10") + + self.communicator.ser = MagicMock() + self.communicator.ser.is_open = False # Simulate connected but port closed + with self.assertRaisesRegex(ConnectionError, "Serial port not open. Connect first."): + self.communicator.send_command("G0 X10") + + def test_send_command_serial_exception_on_write(self): + self.mock_ser_instance.write.side_effect = serial.SerialException("Write failed") + with self.assertRaisesRegex(ConnectionError, "Serial communication error: Write failed"): + self.communicator.send_command("G0 X10") + self.mock_ser_instance.close.assert_called_once() # Ensure port is closed on error + + def test_send_command_serial_exception_on_read(self): + self.mock_ser_instance.read.side_effect = serial.SerialException("Read failed") + self.mock_ser_instance.in_waiting = 1 # Simulate data available to trigger read + + with self.assertRaisesRegex(ConnectionError, "Serial communication error: Read failed"): + self.communicator.send_command("G0 X10") + self.mock_ser_instance.close.assert_called_once() + + +class TestGRBLCommunicatorClose(unittest.TestCase): + def test_close_closes_open_port(self): + communicator = GRBLCommunicator("/dev/testport", 115200) + mock_ser_instance = MagicMock() + mock_ser_instance.is_open = True + communicator.ser = mock_ser_instance + + communicator.close() + mock_ser_instance.close.assert_called_once() + self.assertIsNone(communicator.ser) + + def test_close_handles_already_closed_port(self): + communicator = GRBLCommunicator("/dev/testport", 115200) + mock_ser_instance = MagicMock() + mock_ser_instance.is_open = False # Port is already closed + communicator.ser = mock_ser_instance + + communicator.close() + # ser.close() should not be called if port is not open, + # but the implementation calls it and serial.Serial handles it gracefully. + # For strictness, we could check it's not called, but current code calls it. + # Let's verify it doesn't raise an error. + mock_ser_instance.close.assert_not_called() # Ideal, but current code might call it. + # The GRBLCommunicator's close checks `is_open`. + self.assertIsNone(communicator.ser) + + + def test_close_handles_no_serial_object(self): + communicator = GRBLCommunicator("/dev/testport", 115200) + communicator.ser = None # No serial object + + communicator.close() # Should not raise any error + self.assertIsNone(communicator.ser) + + def test_close_serial_exception_on_close(self): + communicator = GRBLCommunicator("/dev/testport", 115200) + mock_ser_instance = MagicMock() + mock_ser_instance.is_open = True + mock_ser_instance.close.side_effect = serial.SerialException("Failed to close") + communicator.ser = mock_ser_instance + + # The method should catch the exception and log an error, but not re-raise + try: + communicator.close() + except serial.SerialException: + self.fail("communicator.close() raised SerialException unexpectedly!") + + mock_ser_instance.close.assert_called_once() + self.assertIsNone(communicator.ser) # ser should still be set to None + +if __name__ == '__main__': + unittest.main(verbosity=2)