diff --git a/.gitattributes b/.gitattributes index 942a76fd..28ae895f 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1 +1 @@ -*.geojson filter=lfs diff=lfs merge=lfs -text +# GeoJSON files are read at runtime by spp_demo, keep as regular files diff --git a/docker-compose.override.yml b/docker-compose.override.yml new file mode 100644 index 00000000..13a85820 --- /dev/null +++ b/docker-compose.override.yml @@ -0,0 +1,23 @@ +# Local override - use unique image name to avoid collision with openspp-modules-v2 +services: + openspp: + image: openspp2-dev + build: + context: . + dockerfile: docker/Dockerfile + target: dev + + openspp-dev: + image: openspp2-dev + build: + context: . + dockerfile: docker/Dockerfile + target: dev + + jobworker: + image: openspp2-dev + volumes: + - ../odoo-job-worker:/mnt/extra-addons/odoo-job-worker:ro,z + + test: + image: openspp2-dev diff --git a/requirements.txt b/requirements.txt index f0f0fcb2..0c96447d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,10 +9,12 @@ fastapi>=0.110.0 geojson httpx jwcrypto +jwcrypto>=1.5.6 libhxl numpy>=1.22.2 openpyxl parse-accept-language +pyclamd pydantic pyjwt pyjwt>=2.4.0 diff --git a/scripts/lint/check_xml_ids.py b/scripts/lint/check_xml_ids.py index c150ae7d..4713dd8f 100755 --- a/scripts/lint/check_xml_ids.py +++ b/scripts/lint/check_xml_ids.py @@ -106,7 +106,7 @@ }, "res.groups": { "patterns": [ - r"^group_[a-z0-9_]+_(viewer|officer|manager|admin|supervisor|approver|rejector|user|worker|requestor|validator|distributor|generator|registrar|reset|get|post|auditor|runner|editor|validator_hq)$", + r"^group_[a-z0-9_]+_(viewer|officer|manager|admin|supervisor|approver|rejector|user|worker|requestor|validator|distributor|generator|registrar|reset|get|post|auditor|runner|editor|assessor|validator_hq)$", r"^group_[a-z0-9_]+_(read|write|create|delete|approve|reject)$", r"^group_[a-z0-9_]+_restrict_[a-z0-9_]+$", # Technical restriction groups r"^group_spp_[a-z0-9_]+_(agent|validator|applicator|administrator|external_api|local_validator|hq_validator)$", # noqa: E501 Module-specific roles diff --git a/spp_attachment_av_scan/README.md b/spp_attachment_av_scan/README.md new file mode 100644 index 00000000..2c59e5c2 --- /dev/null +++ b/spp_attachment_av_scan/README.md @@ -0,0 +1,215 @@ +# OpenSPP Attachment Antivirus Scan + +System-wide antivirus scanning for file attachments in OpenSPP. + +## Overview + +This module provides automatic malware scanning for all file attachments uploaded to the +system using ClamAV antivirus engine. It integrates with the `queue_job` module for +asynchronous scanning to avoid blocking file uploads. + +## Features + +- **Automatic Scanning**: All binary attachments are automatically queued for malware + scanning upon upload +- **Configurable Backends**: Support for ClamAV via Unix socket or network connection +- **Quarantine**: Infected files are automatically quarantined and access is blocked +- **Security Notifications**: Security administrators are notified when malware is + detected +- **Manual Rescans**: Administrators can manually trigger rescans of attachments +- **File Size Limits**: Configurable maximum file size to avoid scanning large files +- **Scan Timeouts**: Configurable timeout to prevent long-running scans + +## Dependencies + +- `base`: Odoo base module +- `queue_job`: For asynchronous job processing +- `spp_security`: OpenSPP security module for security groups +- `pyclamd`: Python library for ClamAV integration (external) + +## Installation + +1. Install ClamAV on your server: + + ```bash + # Ubuntu/Debian + sudo apt-get install clamav clamav-daemon + + # Start the daemon + sudo systemctl start clamav-daemon + sudo systemctl enable clamav-daemon + ``` + +2. Install the Python library: + + ```bash + pip install pyclamd + ``` + +3. Install the module in Odoo: + - Update the apps list + - Search for "OpenSPP Attachment Antivirus Scan" + - Click Install + +## Configuration + +### Scanner Backend Setup + +1. Go to **Settings > Technical > Antivirus Scanners** +2. Edit the "Default ClamAV Scanner" record +3. Configure the connection settings: + - **Backend Type**: Choose "ClamAV Unix Socket" or "ClamAV Network" + - **Socket Path**: Path to ClamAV socket (default: `/var/run/clamav/clamd.sock`) + - Or **Host/Port**: Network connection details + - **Max File Size**: Maximum file size to scan in MB (default: 100) + - **Scan Timeout**: Maximum time for scan in seconds (default: 60) +4. Enable the backend by toggling the "Active" button +5. Click "Test Connection" to verify the configuration + +### Security Groups + +- **Antivirus Administrator** (`group_av_admin`): Can manage scanner backends and view + detailed scan results + +## Usage + +### Automatic Scanning + +When a user uploads a file: + +1. The attachment is created immediately +2. A scan job is queued in the background +3. The scan status shows "Pending Scan" +4. Once scanned, the status updates to: + - **Clean**: No malware detected + - **Infected**: Malware detected (file is quarantined) + - **Error**: Scan failed + - **Skipped**: File too large or no scanner configured + +### Viewing Scan Status + +Scan status is visible on the attachment form and list views: + +- Navigate to any attachment (e.g., in Documents or via Technical > Attachments) +- Check the "Antivirus Scan" section for scan status and details + +### Manual Rescans + +As an AV Administrator: + +1. Open an attachment +2. Click the "Rescan" button +3. The file is queued for scanning + +### Infected Files + +When malware is detected: + +1. The file is marked as "Infected" +2. The threat name is recorded +3. The file is quarantined (access to file data is blocked) +4. Security administrators receive an activity notification +5. The file cannot be downloaded until reviewed + +## Technical Details + +### Models + +#### `spp.av.scanner.backend` + +Stores configuration for antivirus scanner backends. + +**Key Fields**: + +- `backend_type`: Type of scanner (ClamAV socket or network) +- `is_active`: Whether this backend is active +- `clamd_socket_path`: Path to ClamAV Unix socket +- `clamd_host`, `clamd_port`: Network connection details +- `max_file_size_mb`: Maximum file size to scan +- `scan_timeout_seconds`: Scan timeout + +**Key Methods**: + +- `scan_binary(binary_data, filename)`: Scan binary data for malware +- `test_connection()`: Test connection to scanner +- `get_active_scanner()`: Get the active scanner backend + +#### `ir.attachment` (inherited) + +Extended with antivirus scan fields and logic. + +**New Fields**: + +- `scan_status`: Status of malware scan +- `scan_date`: When the file was scanned +- `scan_result`: Detailed scan result (JSON) +- `threat_name`: Name of detected threat +- `is_quarantined`: Whether file is quarantined + +**Key Methods**: + +- `_scan_for_malware()`: Async job to scan attachment +- `_quarantine()`: Quarantine infected file +- `_notify_security_admins()`: Notify admins of infection +- `action_rescan()`: Manually trigger rescan + +### Queue Jobs + +The module uses `queue_job` for asynchronous scanning: + +- Scans are queued when attachments are created or updated +- Priority 20 for automatic scans, priority 10 for manual rescans +- Jobs are named "Scan attachment {id} for malware" + +### Security + +- AV Administrators can manage scanner backends +- All users can view scan status on their attachments +- Quarantined files block access to binary data via `read()` override + +## Logging + +The module uses structured logging: + +- Info level: Scan results, connection tests +- Warning level: Malware detections, configuration issues +- Error level: Scan failures, connection errors + +No PII (personally identifiable information) is logged. + +## Performance Considerations + +- File scanning is asynchronous via queue_job +- Large files can be skipped via `max_file_size_mb` setting +- Scan timeouts prevent long-running operations +- Failed scans don't block file uploads + +## Limitations + +- Currently only supports ClamAV +- Requires ClamAV daemon to be running +- Files are scanned after upload (not during) +- Very large files may be skipped + +## Future Enhancements + +- Support for additional antivirus engines +- Real-time scanning before upload completion +- Scan result caching +- Scheduled rescans of all attachments +- Quarantine management interface +- Detailed scan statistics and reports + +## License + +LGPL-3 + +## Author + +OpenSPP.org + +## Maintainers + +- jeremi +- gonzalesedwin1123 +- reichie020212 diff --git a/spp_attachment_av_scan/README.rst b/spp_attachment_av_scan/README.rst new file mode 100644 index 00000000..84c0b9e6 --- /dev/null +++ b/spp_attachment_av_scan/README.rst @@ -0,0 +1,174 @@ +================================= +OpenSPP Attachment Antivirus Scan +================================= + +.. + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! This file is generated by oca-gen-addon-readme !! + !! changes will be overwritten. !! + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! source digest: sha256:f6237a54ba392825fb72fd61a8e94c10528b2bde807e13bf33beeac7f7b156b7 + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png + :target: https://odoo-community.org/page/development-status + :alt: Beta +.. |badge2| image:: https://img.shields.io/badge/license-LGPL--3-blue.png + :target: http://www.gnu.org/licenses/lgpl-3.0-standalone.html + :alt: License: LGPL-3 +.. |badge3| image:: https://img.shields.io/badge/github-OpenSPP%2FOpenSPP2-lightgray.png?logo=github + :target: https://github.com/OpenSPP/OpenSPP2/tree/19.0/spp_attachment_av_scan + :alt: OpenSPP/OpenSPP2 + +|badge1| |badge2| |badge3| + +Scans uploaded file attachments for malware using ClamAV antivirus +engine. Automatically queues scans for binary attachments on create or +update using queue_job. Quarantines infected files by encrypting them +with spp_encryption and removing original data. Provides forensic tools +for security administrators to restore false positives or download +quarantined files for analysis. + +Key Capabilities +~~~~~~~~~~~~~~~~ + +- Auto-scan binary attachments on upload or update via background jobs +- Quarantine infected files with encrypted backup and SHA256 hash + verification +- Block read access to quarantined attachment data +- Manual rescan, restore, forensic download, and permanent deletion of + quarantined files +- Notify security administrators when malware is detected +- Scheduled cleanup of old quarantined files and forensic downloads +- Support ClamAV via Unix socket or network connection + +Key Models +~~~~~~~~~~ + ++----------------------------+-----------------------------------------+ +| Model | Description | ++============================+=========================================+ +| ``spp.av.scanner.backend`` | Configures ClamAV connection | +| | (socket/network) and limits | ++----------------------------+-----------------------------------------+ +| ``ir.attachment`` | Extended with scan status, threat name, | +| | and quarantine | ++----------------------------+-----------------------------------------+ + +Configuration +~~~~~~~~~~~~~ + +After installing: + +1. Navigate to **Settings > Administration > Antivirus Scanners** +2. Create a scanner backend with ClamAV connection details (default: + ``/var/run/clamav/clamd.sock``) +3. Click **Test Connection** to verify ClamAV is running +4. Set **Active** to enable scanning +5. Configure system parameters: + + - ``spp_attachment_av_scan.quarantine_encryption_provider_id``: + Encryption provider for quarantine + - ``spp_attachment_av_scan.quarantine_retention_days``: Days before + purging quarantined files (default: 90) + - ``spp_attachment_av_scan.forensic_download_retention_hours``: Hours + before cleaning forensic downloads (default: 24) + +UI Location +~~~~~~~~~~~ + +- **Scanner Configuration**: Settings > Administration > Antivirus + Scanners +- **Quarantined Files**: Settings > Technical > Security > Quarantined + Files +- **Attachment Forms**: Scan status and quarantine actions appear in + "Antivirus Scan" section + +Tabs +~~~~ + +**Scanner Backend form** (``spp.av.scanner.backend``): + +- **Connection Settings**: Unix socket or network configuration for + ClamAV +- **Connection Status**: Last connection test results and error details + +Security +~~~~~~~~ + ++-------------------------------------------+----------------------------+----------------------+ +| Group | Model | Access | ++===========================================+============================+======================+ +| ``base.group_user`` | ``spp.av.scanner.backend`` | Read | ++-------------------------------------------+----------------------------+----------------------+ +| ``base.group_user`` | ``ir.attachment`` | Read scan status | ++-------------------------------------------+----------------------------+----------------------+ +| ``spp_attachment_av_scan.group_av_admin`` | ``spp.av.scanner.backend`` | Full CRUD | ++-------------------------------------------+----------------------------+----------------------+ +| ``spp_attachment_av_scan.group_av_admin`` | ``ir.attachment`` | Manage quarantined | +| | | files | ++-------------------------------------------+----------------------------+----------------------+ + +Extension Points +~~~~~~~~~~~~~~~~ + +- Override ``ir.attachment._scan_for_malware()`` to customize scan logic + or add pre/post-scan hooks +- Inherit ``spp.av.scanner.backend`` and extend ``scan_binary()`` to + support additional antivirus engines +- Override ``ir.attachment._quarantine()`` to add custom quarantine + handling or external storage + +Dependencies +~~~~~~~~~~~~ + +``base``, ``mail``, ``queue_job``, ``spp_encryption``, ``spp_security`` + +External: ``pyclamd`` (Python library for ClamAV integration) + +**Table of contents** + +.. contents:: + :local: + +Bug Tracker +=========== + +Bugs are tracked on `GitHub Issues `_. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +`feedback `_. + +Do not contact contributors directly about support or help with technical issues. + +Credits +======= + +Authors +------- + +* OpenSPP.org + +Maintainers +----------- + +.. |maintainer-jeremi| image:: https://github.com/jeremi.png?size=40px + :target: https://github.com/jeremi + :alt: jeremi +.. |maintainer-gonzalesedwin1123| image:: https://github.com/gonzalesedwin1123.png?size=40px + :target: https://github.com/gonzalesedwin1123 + :alt: gonzalesedwin1123 +.. |maintainer-reichie020212| image:: https://github.com/reichie020212.png?size=40px + :target: https://github.com/reichie020212 + :alt: reichie020212 +.. |maintainer-emjay0921| image:: https://github.com/emjay0921.png?size=40px + :target: https://github.com/emjay0921 + :alt: emjay0921 + +Current maintainers: + +|maintainer-jeremi| |maintainer-gonzalesedwin1123| |maintainer-reichie020212| |maintainer-emjay0921| + +This module is part of the `OpenSPP/OpenSPP2 `_ project on GitHub. + +You are welcome to contribute. \ No newline at end of file diff --git a/spp_attachment_av_scan/__init__.py b/spp_attachment_av_scan/__init__.py new file mode 100644 index 00000000..0650744f --- /dev/null +++ b/spp_attachment_av_scan/__init__.py @@ -0,0 +1 @@ +from . import models diff --git a/spp_attachment_av_scan/__manifest__.py b/spp_attachment_av_scan/__manifest__.py new file mode 100644 index 00000000..f80b1b57 --- /dev/null +++ b/spp_attachment_av_scan/__manifest__.py @@ -0,0 +1,35 @@ +{ # pylint: disable=pointless-statement + "name": "OpenSPP Attachment Antivirus Scan", + "category": "OpenSPP", + "version": "19.0.1.0.0", + "sequence": 1, + "author": "OpenSPP.org", + "website": "https://github.com/OpenSPP/OpenSPP2", + "license": "LGPL-3", + "development_status": "Beta", + "maintainers": ["jeremi", "gonzalesedwin1123", "reichie020212", "emjay0921"], + "depends": [ + "base", + "mail", + "job_worker", + "spp_encryption", + "spp_security", + ], + "external_dependencies": { + "python": ["pyclamd"], + }, + "data": [ + "security/security.xml", + "security/ir.model.access.csv", + "data/av_scanner_data.xml", + "data/quarantine_cron.xml", + "views/av_scanner_backend_views.xml", + "views/ir_attachment_views.xml", + ], + "assets": {}, + "demo": [], + "images": [], + "application": False, + "installable": True, + "auto_install": False, +} diff --git a/spp_attachment_av_scan/data/av_scanner_data.xml b/spp_attachment_av_scan/data/av_scanner_data.xml new file mode 100644 index 00000000..826e71bd --- /dev/null +++ b/spp_attachment_av_scan/data/av_scanner_data.xml @@ -0,0 +1,15 @@ + + + + + Default ClamAV Scanner + 10 + clamd_socket + + /var/run/clamav/clamd.sock + localhost + 3310 + 100 + 60 + + diff --git a/spp_attachment_av_scan/data/quarantine_cron.xml b/spp_attachment_av_scan/data/quarantine_cron.xml new file mode 100644 index 00000000..7dfec792 --- /dev/null +++ b/spp_attachment_av_scan/data/quarantine_cron.xml @@ -0,0 +1,41 @@ + + + + + Purge Old Quarantined Files + + code + model._cron_purge_old_quarantined_files() + 1 + days + 1 + + + + + Clean Up Forensic Download Attachments + + code + model._cron_cleanup_forensic_downloads() + 1 + hours + 1 + + + + + spp_attachment_av_scan.quarantine_retention_days + 90 + + + + + spp_attachment_av_scan.forensic_download_retention_hours + 24 + + diff --git a/spp_attachment_av_scan/models/__init__.py b/spp_attachment_av_scan/models/__init__.py new file mode 100644 index 00000000..0a39496b --- /dev/null +++ b/spp_attachment_av_scan/models/__init__.py @@ -0,0 +1,2 @@ +from . import av_scanner_backend +from . import ir_attachment diff --git a/spp_attachment_av_scan/models/av_scanner_backend.py b/spp_attachment_av_scan/models/av_scanner_backend.py new file mode 100644 index 00000000..b2867261 --- /dev/null +++ b/spp_attachment_av_scan/models/av_scanner_backend.py @@ -0,0 +1,326 @@ +import logging + +from odoo import _, api, fields, models +from odoo.exceptions import UserError, ValidationError + +_logger = logging.getLogger(__name__) + +try: + import pyclamd +except ImportError: + pyclamd = None + _logger.warning("pyclamd library not found. Antivirus scanning will not work.") + + +class AVScannerBackend(models.Model): + _name = "spp.av.scanner.backend" + _description = "Antivirus Scanner Backend" + _order = "sequence, name" + + name = fields.Char( + required=True, + help="Name of the antivirus scanner backend", + ) + sequence = fields.Integer( + default=10, + help="Sequence for ordering scanner backends", + ) + backend_type = fields.Selection( + [ + ("clamd_socket", "ClamAV Unix Socket"), + ("clamd_network", "ClamAV Network"), + ], + required=True, + default="clamd_socket", + help="Type of antivirus scanner backend", + ) + is_active = fields.Boolean( + default=True, + help="Whether this scanner backend is active and should be used for scanning", + ) + + # ClamAV settings + clamd_socket_path = fields.Char( + string="ClamAV Socket Path", + default="/var/run/clamav/clamd.sock", + help="Path to ClamAV Unix socket", + ) + clamd_host = fields.Char( + string="ClamAV Host", + default="localhost", + help="Hostname or IP address of ClamAV daemon", + ) + clamd_port = fields.Integer( + string="ClamAV Port", + default=3310, + help="Port number of ClamAV daemon", + ) + + # Limits + max_file_size_mb = fields.Integer( + string="Max File Size (MB)", + default=100, + help="Maximum file size to scan in megabytes. Files larger than this will be skipped.", + ) + scan_timeout_seconds = fields.Integer( + string="Scan Timeout (seconds)", + default=60, + help="Maximum time to wait for scan completion", + ) + + # Statistics + last_connection_test_date = fields.Datetime( + string="Last Connection Test", + readonly=True, + ) + last_connection_test_result = fields.Boolean( + string="Last Test Result", + readonly=True, + ) + last_connection_error = fields.Text( + string="Last Connection Error", + readonly=True, + ) + + @api.constrains("max_file_size_mb") + def _check_max_file_size(self): + """Validate max file size is positive.""" + for record in self: + if record.max_file_size_mb <= 0: + raise ValidationError(_("Maximum file size must be greater than zero.")) + + @api.constrains("scan_timeout_seconds") + def _check_scan_timeout(self): + """Validate scan timeout is positive.""" + for record in self: + if record.scan_timeout_seconds <= 0: + raise ValidationError(_("Scan timeout must be greater than zero.")) + + @api.constrains("clamd_port") + def _check_clamd_port(self): + """Validate ClamAV port is in valid range.""" + for record in self: + if record.backend_type == "clamd_network": + if not (1 <= record.clamd_port <= 65535): + raise ValidationError(_("ClamAV port must be between 1 and 65535.")) + + def _get_scanner_instance(self): + """Get pyclamd scanner instance based on backend type. + + Returns: + pyclamd instance or None if unavailable + + Raises: + UserError: If pyclamd library is not installed or connection fails + """ + self.ensure_one() + + if not pyclamd: + raise UserError(_("The pyclamd library is not installed. Please install it to use antivirus scanning.")) + + try: + if self.backend_type == "clamd_socket": + _logger.info("Connecting to ClamAV via Unix socket: %s", self.clamd_socket_path) + scanner = pyclamd.ClamdUnixSocket(filename=self.clamd_socket_path) + else: # clamd_network + _logger.info( + "Connecting to ClamAV via network: %s:%s", + self.clamd_host, + self.clamd_port, + ) + scanner = pyclamd.ClamdNetworkSocket( + host=self.clamd_host, + port=self.clamd_port, + timeout=self.scan_timeout_seconds, + ) + + # Test if we can ping the scanner + if not scanner.ping(): + raise UserError( + _( + "Cannot connect to ClamAV daemon. " + "Please check the scanner configuration and ensure ClamAV is running." + ) + ) + + return scanner + + except Exception as error: + _logger.error( + "Failed to connect to ClamAV backend '%s': %s", + self.name, + str(error), + exc_info=True, + ) + raise UserError( + _( + "Failed to connect to ClamAV daemon: %(error)s", + error=str(error), + ) + ) from error + + def scan_binary(self, binary_data, filename=None): + """Scan binary data for malware. + + Args: + binary_data: Binary data to scan + filename: Optional filename for logging purposes + + Returns: + dict with keys: + - status: 'clean', 'infected', 'error', or 'skipped' + - threat_name: Name of detected threat (if infected) + - details: Additional details about the scan result + """ + self.ensure_one() + + if not self.is_active: + _logger.warning("Scanner backend '%s' is not active, skipping scan", self.id) + return { + "status": "skipped", + "threat_name": None, + "details": "Scanner backend is not active", + } + + # Check file size + file_size_mb = len(binary_data) / (1024 * 1024) + if file_size_mb > self.max_file_size_mb: + _logger.info( + "File size (%.2f MB) exceeds maximum (%.2f MB), skipping scan", + file_size_mb, + self.max_file_size_mb, + ) + return { + "status": "skipped", + "threat_name": None, + "details": f"File size ({file_size_mb:.2f} MB) exceeds maximum ({self.max_file_size_mb} MB)", + } + + try: + scanner = self._get_scanner_instance() + + _logger.info( + "Scanning binary data (%.2f MB)%s", + file_size_mb, + f" for file: {filename}" if filename else "", + ) + + # Scan the binary data + scan_result = scanner.scan_stream(binary_data) + + if scan_result is None: + # Clean file + _logger.info("Scan completed: file is clean") + return { + "status": "clean", + "threat_name": None, + "details": "No malware detected", + } + else: + # Infected file + # pyclamd scan_stream returns: {'stream': ('FOUND', 'ThreatName')} + stream_result = scan_result.get("stream", (None, "Unknown")) + threat_name = stream_result[1] if len(stream_result) > 1 else "Unknown" + _logger.warning( + "Malware detected: %s", + threat_name, + ) + return { + "status": "infected", + "threat_name": threat_name, + "details": f"Malware detected: {threat_name}", + } + + except UserError: + # Re-raise UserError (connection issues, etc.) + raise + except Exception as error: + _logger.error( + "Error during malware scan: %s", + str(error), + exc_info=True, + ) + return { + "status": "error", + "threat_name": None, + "details": f"Scan error: {str(error)}", + } + + def test_connection(self): + """Test connection to the scanner backend. + + Returns: + bool: True if connection successful, False otherwise + """ + self.ensure_one() + + try: + scanner = self._get_scanner_instance() + version = scanner.version() + + self.write( + { + "last_connection_test_date": fields.Datetime.now(), + "last_connection_test_result": True, + "last_connection_error": None, + } + ) + + _logger.info( + "Connection test successful for backend '%s'. ClamAV version: %s", + self.name, + version, + ) + + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Connection Successful"), + "message": _( + "Successfully connected to ClamAV daemon.\nVersion: %(version)s", + version=version, + ), + "type": "success", + "sticky": False, + }, + } + + except Exception as error: + error_msg = str(error) + self.write( + { + "last_connection_test_date": fields.Datetime.now(), + "last_connection_test_result": False, + "last_connection_error": error_msg, + } + ) + + _logger.error( + "Connection test failed for backend '%s': %s", + self.name, + error_msg, + ) + + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Connection Failed"), + "message": _( + "Failed to connect to ClamAV daemon:\n%(error)s", + error=error_msg, + ), + "type": "danger", + "sticky": True, + }, + } + + @api.model + def get_active_scanner(self): + """Get the first active scanner backend. + + Returns: + AVScannerBackend record or empty recordset + """ + return self.search([("is_active", "=", True)], limit=1, order="sequence, id") diff --git a/spp_attachment_av_scan/models/ir_attachment.py b/spp_attachment_av_scan/models/ir_attachment.py new file mode 100644 index 00000000..5442e2da --- /dev/null +++ b/spp_attachment_av_scan/models/ir_attachment.py @@ -0,0 +1,769 @@ +import base64 +import hashlib +import json +import logging + +from odoo import Command, _, api, fields, models +from odoo.exceptions import AccessError, UserError + +_logger = logging.getLogger(__name__) + +QUARANTINE_PROVIDER_PARAM = "spp_attachment_av_scan.quarantine_encryption_provider_id" +QUARANTINE_RETENTION_DAYS_PARAM = "spp_attachment_av_scan.quarantine_retention_days" +DEFAULT_QUARANTINE_RETENTION_DAYS = 90 +FORENSIC_DOWNLOAD_RETENTION_HOURS_PARAM = "spp_attachment_av_scan.forensic_download_retention_hours" +DEFAULT_FORENSIC_DOWNLOAD_RETENTION_HOURS = 24 + + +class IrAttachment(models.Model): + _inherit = "ir.attachment" + + scan_status = fields.Selection( + [ + ("pending", "Pending Scan"), + ("scanning", "Scanning"), + ("clean", "Clean"), + ("infected", "Infected"), + ("error", "Scan Error"), + ("skipped", "Skipped"), + ], + default="pending", + index=True, + help="Status of the malware scan for this attachment", + ) + scan_date = fields.Datetime( + string="Scan Date", + help="Date and time when the file was scanned", + ) + scan_result = fields.Text( + string="Scan Result Details", + help="Detailed scan result in JSON format", + ) + threat_name = fields.Char( + string="Threat Name", + help="Name of the detected threat if file is infected", + ) + is_quarantined = fields.Boolean( + string="Quarantined", + default=False, + index=True, + help="Whether this file has been quarantined due to malware detection", + ) + + # Encrypted quarantine storage fields + quarantine_data = fields.Binary( + string="Encrypted Quarantine Data", + attachment=False, + help="Encrypted copy of the infected file for forensic analysis", + ) + quarantine_hash = fields.Char( + string="File SHA256 Hash", + help="SHA256 hash of the original infected file for verification", + ) + quarantine_date = fields.Datetime( + string="Quarantine Date", + help="Date and time when the file was quarantined", + ) + original_file_size = fields.Integer( + string="Original File Size", + help="Size of the original file in bytes before quarantine", + ) + + # Forensic download tracking + is_forensic_download = fields.Boolean( + string="Forensic Download", + default=False, + index=True, + help="Marks temporary attachments created for forensic analysis downloads", + ) + + @api.model_create_multi + def create(self, vals_list): + """Override create to queue malware scan for binary attachments.""" + attachments = super().create(vals_list) + + # Queue scan for attachments that have binary data + for attachment in attachments: + if attachment.type == "binary" and attachment.datas: + try: + # Queue the scan job + attachment.with_delay( + description=f"Scan attachment {attachment.id} for malware", + priority=20, + )._scan_for_malware() + _logger.info("Queued malware scan for attachment ID %s", attachment.id) + except Exception as error: + _logger.error( + "Failed to queue malware scan for attachment ID %s: %s", + attachment.id, + str(error), + ) + + return attachments + + def write(self, vals): + """Override write to queue scan if binary data is updated.""" + result = super().write(vals) + + if "datas" in vals and not self.env.context.get("skip_av_scan_queue"): + for attachment in self: + if attachment.type == "binary" and attachment.datas: + try: + attachment.with_context(skip_av_scan_queue=True).write( + { + "scan_status": "pending", + "scan_date": None, + "scan_result": None, + "threat_name": None, + "is_quarantined": False, + "quarantine_data": None, + "quarantine_hash": None, + "quarantine_date": None, + "original_file_size": None, + } + ) + attachment.with_delay( + description=f"Scan updated attachment {attachment.id} for malware", + priority=20, + )._scan_for_malware() + _logger.info( + "Queued malware scan for updated attachment ID %s", + attachment.id, + ) + except Exception as error: + _logger.error( + "Failed to queue malware scan for updated attachment ID %s: %s", + attachment.id, + str(error), + ) + + return result + + def _scan_for_malware(self): + """Scan attachment for malware using configured antivirus backend.""" + self.ensure_one() + + if self.type != "binary" or not self.datas: + _logger.info( + "Skipping scan for attachment ID %s (not binary or no data)", + self.id, + ) + self.write( + { + "scan_status": "skipped", + "scan_date": fields.Datetime.now(), + "scan_result": json.dumps( + { + "status": "skipped", + "reason": "Not a binary attachment or no data", + } + ), + } + ) + return + + scanner_backend = self.env["spp.av.scanner.backend"].get_active_scanner() + if not scanner_backend: + _logger.warning("No active antivirus scanner backend configured") + self.write( + { + "scan_status": "skipped", + "scan_date": fields.Datetime.now(), + "scan_result": json.dumps( + { + "status": "skipped", + "reason": "No active scanner backend configured", + } + ), + } + ) + return + + try: + self.write({"scan_status": "scanning"}) + + # Decode binary data once and reuse to avoid race conditions + # In Odoo, self.datas is always base64-encoded (str or bytes) + # Ensure raw_binary_data is always bytes to avoid TypeError in scan_binary() + raw_binary_data = base64.b64decode(self.datas) if self.datas else b"" + + scan_result = scanner_backend.scan_binary( + raw_binary_data, + filename=self.name, + ) + + scan_status = scan_result.get("status", "error") + threat_name = scan_result.get("threat_name") + + vals = { + "scan_status": scan_status, + "scan_date": fields.Datetime.now(), + "scan_result": json.dumps(scan_result), + "threat_name": threat_name, + } + + if scan_status == "infected": + vals["is_quarantined"] = True + self.write(vals) + # Pass binary_data to avoid re-reading from self.datas (race condition fix) + self._quarantine(binary_data=raw_binary_data) + self._notify_security_admins() + else: + self.write(vals) + + _logger.info( + "Malware scan completed for attachment ID %s: %s", + self.id, + scan_status, + ) + + except Exception as error: + _logger.error( + "Error scanning attachment ID %s for malware: %s", + self.id, + str(error), + exc_info=True, + ) + self.write( + { + "scan_status": "error", + "scan_date": fields.Datetime.now(), + "scan_result": json.dumps( + { + "status": "error", + "error": str(error), + } + ), + } + ) + + def _get_quarantine_encryption_provider(self): + """Get or create the encryption provider for quarantine storage.""" + ICP = self.env["ir.config_parameter"].sudo() # nosemgrep + provider_id = ICP.get_param(QUARANTINE_PROVIDER_PARAM) + + if provider_id: + provider = self.env["spp.encryption.provider"].sudo().browse(int(provider_id)) # nosemgrep + if provider.exists(): + return provider + + provider = ( + self.env["spp.encryption.provider"] # nosemgrep + .sudo() + .search([("type", "=", "jwcrypto")], order="id asc", limit=1) + ) + + if not provider: + _logger.warning( + "No encryption provider configured for quarantine. Files will be quarantined without encryption." + ) + return None + + ICP.set_param(QUARANTINE_PROVIDER_PARAM, str(provider.id)) + return provider + + def _calculate_file_hash(self, binary_data): + """Calculate SHA256 hash of file data.""" + if isinstance(binary_data, str): + binary_data = base64.b64decode(binary_data) + return hashlib.sha256(binary_data).hexdigest() + + def _quarantine(self, binary_data=None): + """Quarantine infected file with encrypted storage. + + Args: + binary_data: Raw binary data to quarantine. If not provided, reads from self.datas. + Passing binary_data avoids race conditions when data may change. + """ + self.ensure_one() + + if self.scan_status != "infected": + return + + _logger.warning( + "Quarantining infected attachment ID %s (threat: %s)", + self.id, + self.threat_name or "Unknown", + ) + + try: + # Use provided binary_data to avoid race condition, or fall back to self.datas + if binary_data is None: + binary_data = base64.b64decode(self.datas) if self.datas else b"" + if not binary_data: + _logger.warning("No binary data to quarantine for attachment ID %s", self.id) + return + + file_hash = self._calculate_file_hash(binary_data) + file_size = len(binary_data) + + encryption_provider = self._get_quarantine_encryption_provider() + + if encryption_provider: + try: + encrypted_data = encryption_provider.encrypt_data(binary_data) + quarantine_data = base64.b64encode(encrypted_data) + _logger.info( + "Encrypted quarantined file for attachment ID %s (hash: %s)", + self.id, + file_hash, + ) + except Exception as enc_error: + _logger.error( + "Failed to encrypt quarantined file for attachment ID %s: %s", + self.id, + str(enc_error), + ) + quarantine_data = None + else: + quarantine_data = None + + self.with_context(skip_av_scan_queue=True).write( + { + "is_quarantined": True, + "quarantine_data": quarantine_data, + "quarantine_hash": file_hash, + "quarantine_date": fields.Datetime.now(), + "original_file_size": file_size, + "datas": False, + } + ) + + _logger.warning( + "Quarantined attachment ID %s - Original data removed, " + "encrypted backup stored (hash: %s, size: %d bytes)", + self.id, + file_hash, + file_size, + ) + + except Exception as error: + _logger.error( + "Error during quarantine of attachment ID %s: %s", + self.id, + str(error), + exc_info=True, + ) + self.with_context(skip_av_scan_queue=True).write( + { + "is_quarantined": True, + "quarantine_date": fields.Datetime.now(), + } + ) + + def _notify_security_admins(self): + """Send notification to security administrators about infected file.""" + self.ensure_one() + + if self.scan_status != "infected": + return + + _logger.warning( + "Infected file detected - Attachment ID: %s, Threat: %s", + self.id, + self.threat_name or "Unknown", + ) + + try: + security_group = self.env.ref("spp_attachment_av_scan.group_av_admin", raise_if_not_found=False) + if not security_group: + _logger.warning("AV Admin group not found, cannot notify administrators") + return + + admin_users = security_group.user_ids + + if not admin_users: + _logger.warning("No users in AV Admin group to notify") + return + + # Send notification to admin users via internal message + message_body = _( + "

Malware Detected in Attachment

" + "
    " + "
  • File: %(name)s
  • " + "
  • Threat: %(threat)s
  • " + "
  • Hash: %(hash)s
  • " + "
  • Date: %(date)s
  • " + "
  • Status: Quarantined and encrypted
  • " + "
", + name=self.name, + threat=self.threat_name or "Unknown", + hash=self.quarantine_hash or "N/A", + date=self.scan_date, + ) + + # Create notification for each admin user + self.env["mail.message"].sudo().create( # nosemgrep + { + "message_type": "notification", + "subject": _("Security Alert: Infected File Detected"), + "body": message_body, + "partner_ids": [Command.set(admin_users.mapped("partner_id").ids)], + "model": "ir.attachment", + "res_id": self.id, + } + ) + + _logger.info( + "Notified %d security administrators about infected file", + len(admin_users), + ) + + except Exception as error: + _logger.error( + "Failed to notify security admins about infected file: %s", + str(error), + ) + + def _check_av_admin_access(self): + """Check if current user has AV Admin access.""" + self.ensure_one() + if not self.env.user.has_group("spp_attachment_av_scan.group_av_admin"): + raise AccessError(_("Only Antivirus Administrators can perform this action.")) + + def action_restore_quarantined(self): + """Restore a quarantined file (for false positives). AV Admin only.""" + self.ensure_one() + self._check_av_admin_access() + + if not self.is_quarantined: + raise UserError(_("This attachment is not quarantined.")) + + if not self.quarantine_data: + raise UserError(_("No encrypted backup available for this quarantined file.")) + + encryption_provider = self._get_quarantine_encryption_provider() + if not encryption_provider: + raise UserError(_("No encryption provider configured. Cannot decrypt the file.")) + + try: + encrypted_data = base64.b64decode(self.quarantine_data) + decrypted_data = encryption_provider.decrypt_data(encrypted_data) + + # Validate decrypted file size matches original + if self.original_file_size and len(decrypted_data) != self.original_file_size: + raise UserError( + _( + "File size mismatch. Expected %(expected)d bytes, got %(actual)d bytes.", + expected=self.original_file_size, + actual=len(decrypted_data), + ) + ) + + restored_hash = self._calculate_file_hash(decrypted_data) + if restored_hash != self.quarantine_hash: + raise UserError(_("File integrity check failed. The restored file hash does not match.")) + + # SECURITY: sudo() is intentional - AV admin access verified via _check_av_admin_access() + # nosemgrep: semgrep.odoo-sudo-without-context + self.sudo().with_context(skip_av_scan_queue=True).write( + { + "datas": base64.b64encode(decrypted_data), + "is_quarantined": False, + "scan_status": "clean", + "quarantine_data": False, + "quarantine_hash": False, + "quarantine_date": False, + "original_file_size": False, + "threat_name": False, + "scan_result": json.dumps( + { + "status": "restored", + "reason": "Manually restored by administrator", + "restored_by": self.env.user.name, + "restored_date": fields.Datetime.now().isoformat(), + } + ), + } + ) + + _logger.warning( + "Quarantined attachment ID %s restored by user %s", + self.id, + self.env.user.name, + ) + + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("File Restored"), + "message": _("The quarantined file has been restored successfully."), + "type": "success", + "sticky": False, + }, + } + + except UserError: + raise + except Exception as error: + _logger.error( + "Failed to restore quarantined attachment ID %s: %s", + self.id, + str(error), + exc_info=True, + ) + raise UserError(_("Failed to restore file: %(error)s", error=str(error))) from error + + def action_download_quarantined_for_analysis(self): + """Download quarantined file for forensic analysis. AV Admin only.""" + self.ensure_one() + self._check_av_admin_access() + + if not self.is_quarantined: + raise UserError(_("This attachment is not quarantined.")) + + if not self.quarantine_data: + raise UserError(_("No encrypted backup available for this quarantined file.")) + + encryption_provider = self._get_quarantine_encryption_provider() + if not encryption_provider: + raise UserError(_("No encryption provider configured. Cannot decrypt the file.")) + + try: + encrypted_data = base64.b64decode(self.quarantine_data) + decrypted_data = encryption_provider.decrypt_data(encrypted_data) + + # Validate decrypted file size matches original + if self.original_file_size and len(decrypted_data) != self.original_file_size: + raise UserError( + _( + "File size mismatch. Expected %(expected)d bytes, got %(actual)d bytes.", + expected=self.original_file_size, + actual=len(decrypted_data), + ) + ) + + download_attachment = ( + self.env["ir.attachment"] # nosemgrep + .sudo() + .with_context(skip_av_scan_queue=True) + .create( + { + "name": f"QUARANTINED_{self.name}", + "datas": base64.b64encode(decrypted_data), + "mimetype": "application/octet-stream", + "type": "binary", + "res_model": "ir.attachment", + "res_id": self.id, + # Mark as forensic download for cleanup + "is_forensic_download": True, + "scan_status": "skipped", + "scan_result": json.dumps( + { + "status": "skipped", + "reason": "Forensic download - original quarantined file", + } + ), + } + ) + ) + + _logger.warning( + "Quarantined attachment ID %s downloaded for analysis by user %s", + self.id, + self.env.user.name, + ) + + return { + "type": "ir.actions.act_url", + "url": f"/web/content/{download_attachment.id}?download=true", + "target": "self", + } + + except UserError: + raise + except Exception as error: + _logger.error( + "Failed to download quarantined attachment ID %s: %s", + self.id, + str(error), + exc_info=True, + ) + raise UserError(_("Failed to download file: %(error)s", error=str(error))) from error + + def action_permanently_delete_quarantined(self): + """Permanently delete quarantined file and its encrypted backup. AV Admin only.""" + self.ensure_one() + self._check_av_admin_access() + + if not self.is_quarantined: + raise UserError(_("This attachment is not quarantined.")) + + _logger.warning( + "Permanently deleting quarantined attachment ID %s (hash: %s) by user %s", + self.id, + self.quarantine_hash or "N/A", + self.env.user.name, + ) + + # SECURITY: sudo() is intentional - AV admin access verified via _check_av_admin_access() + # nosemgrep: semgrep.odoo-sudo-without-context + self.sudo().with_context(skip_av_scan_queue=True).write( + { + "quarantine_data": False, + "datas": False, + } + ) + + # nosemgrep: semgrep.odoo-sudo-without-context + self.sudo().unlink() + + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("File Deleted"), + "message": _("The quarantined file has been permanently deleted."), + "type": "warning", + "sticky": False, + }, + } + + def action_rescan(self): + """Manually trigger a rescan of the attachment.""" + for attachment in self: + if attachment.is_quarantined: + raise UserError(_("Cannot rescan a quarantined file. Restore it first if needed.")) + + if attachment.type != "binary" or not attachment.datas: + raise UserError(_("Cannot scan attachment: not a binary file or no data present.")) + + attachment.write( + { + "scan_status": "pending", + "scan_date": None, + "scan_result": None, + "threat_name": None, + } + ) + + try: + attachment.with_delay( + description=f"Manual rescan of attachment {attachment.id}", + priority=10, + )._scan_for_malware() + + _logger.info("Queued manual rescan for attachment ID %s", attachment.id) + + except Exception as error: + _logger.error( + "Failed to queue manual rescan for attachment ID %s: %s", + attachment.id, + str(error), + ) + raise UserError(_("Failed to queue rescan: %(error)s", error=str(error))) from error + + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Rescan Queued"), + "message": _("The attachment(s) have been queued for scanning."), + "type": "success", + "sticky": False, + }, + } + + def read(self, fields=None, load="_classic_read"): + """Override read to block access to quarantined files.""" + result = super().read(fields=fields, load=load) + + if fields and "datas" in fields: + for record in result: + attachment = self.browse(record["id"]) + if attachment.is_quarantined: + _logger.warning( + "Blocked access to quarantined attachment ID %s", + attachment.id, + ) + record["datas"] = False + + return result + + @api.model + def _cron_purge_old_quarantined_files(self): + """Scheduled job to purge quarantined files older than retention period.""" + ICP = self.env["ir.config_parameter"].sudo() # nosemgrep + retention_days = int(ICP.get_param(QUARANTINE_RETENTION_DAYS_PARAM, DEFAULT_QUARANTINE_RETENTION_DAYS)) + + if retention_days <= 0: + _logger.info("Quarantine purge disabled (retention_days <= 0)") + return + + cutoff_date = fields.Datetime.subtract(fields.Datetime.now(), days=retention_days) + + old_quarantined = self.search( + [ + ("is_quarantined", "=", True), + ("quarantine_date", "<", cutoff_date), + ("quarantine_date", "!=", False), + ] + ) + + if not old_quarantined: + _logger.info("No quarantined files older than %d days to purge", retention_days) + return + + _logger.info( + "Purging %d quarantined files older than %d days", + len(old_quarantined), + retention_days, + ) + + for attachment in old_quarantined: + _logger.info( + "Purging old quarantined attachment ID %s (hash: %s, quarantine_date: %s)", + attachment.id, + attachment.quarantine_hash or "N/A", + attachment.quarantine_date, + ) + attachment.with_context(skip_av_scan_queue=True).write( + { + "quarantine_data": False, + } + ) + + _logger.info("Purged encrypted data from %d old quarantined files", len(old_quarantined)) + + @api.model + def _cron_cleanup_forensic_downloads(self): + """Scheduled job to clean up temporary forensic download attachments. + + Forensic download attachments are temporary files created for admin analysis. + They should be cleaned up after a short retention period (default: 24 hours). + """ + ICP = self.env["ir.config_parameter"].sudo() # nosemgrep + retention_hours = int( + ICP.get_param( + FORENSIC_DOWNLOAD_RETENTION_HOURS_PARAM, + DEFAULT_FORENSIC_DOWNLOAD_RETENTION_HOURS, + ) + ) + + if retention_hours <= 0: + _logger.info("Forensic download cleanup disabled (retention_hours <= 0)") + return + + cutoff_date = fields.Datetime.subtract(fields.Datetime.now(), hours=retention_hours) + + old_forensic_downloads = self.search( + [ + ("is_forensic_download", "=", True), + ("create_date", "<", cutoff_date), + ] + ) + + if not old_forensic_downloads: + _logger.info("No forensic download attachments older than %d hours to clean up", retention_hours) + return + + _logger.info( + "Cleaning up %d forensic download attachments older than %d hours. IDs: %s", + len(old_forensic_downloads), + retention_hours, + old_forensic_downloads.ids, + ) + + # Delete all old forensic downloads in batch + old_forensic_downloads.sudo().with_context(skip_av_scan_queue=True).unlink() # nosemgrep diff --git a/spp_attachment_av_scan/pyproject.toml b/spp_attachment_av_scan/pyproject.toml new file mode 100644 index 00000000..4231d0cc --- /dev/null +++ b/spp_attachment_av_scan/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["whool"] +build-backend = "whool.buildapi" diff --git a/spp_attachment_av_scan/readme/DESCRIPTION.md b/spp_attachment_av_scan/readme/DESCRIPTION.md new file mode 100644 index 00000000..f7859b84 --- /dev/null +++ b/spp_attachment_av_scan/readme/DESCRIPTION.md @@ -0,0 +1,65 @@ +Scans uploaded file attachments for malware using ClamAV antivirus engine. Automatically queues scans for binary attachments on create or update using queue_job. Quarantines infected files by encrypting them with spp_encryption and removing original data. Provides forensic tools for security administrators to restore false positives or download quarantined files for analysis. + +### Key Capabilities + +- Auto-scan binary attachments on upload or update via background jobs +- Quarantine infected files with encrypted backup and SHA256 hash verification +- Block read access to quarantined attachment data +- Manual rescan, restore, forensic download, and permanent deletion of quarantined files +- Notify security administrators when malware is detected +- Scheduled cleanup of old quarantined files and forensic downloads +- Support ClamAV via Unix socket or network connection + +### Key Models + +| Model | Description | +| ------------------------ | -------------------------------------------------------- | +| `spp.av.scanner.backend` | Configures ClamAV connection (socket/network) and limits | +| `ir.attachment` | Extended with scan status, threat name, and quarantine | + +### Configuration + +After installing: + +1. Navigate to **Settings > Administration > Antivirus Scanners** +2. Create a scanner backend with ClamAV connection details (default: `/var/run/clamav/clamd.sock`) +3. Click **Test Connection** to verify ClamAV is running +4. Set **Active** to enable scanning +5. Configure system parameters: + - `spp_attachment_av_scan.quarantine_encryption_provider_id`: Encryption provider for quarantine + - `spp_attachment_av_scan.quarantine_retention_days`: Days before purging quarantined files (default: 90) + - `spp_attachment_av_scan.forensic_download_retention_hours`: Hours before cleaning forensic downloads (default: 24) + +### UI Location + +- **Scanner Configuration**: Settings > Administration > Antivirus Scanners +- **Quarantined Files**: Settings > Technical > Security > Quarantined Files +- **Attachment Forms**: Scan status and quarantine actions appear in "Antivirus Scan" section + +### Tabs + +**Scanner Backend form** (`spp.av.scanner.backend`): + +- **Connection Settings**: Unix socket or network configuration for ClamAV +- **Connection Status**: Last connection test results and error details + +### Security + +| Group | Model | Access | +| --------------------------------------- | ------------------------ | --------------------------- | +| `base.group_user` | `spp.av.scanner.backend` | Read | +| `base.group_user` | `ir.attachment` | Read scan status | +| `spp_attachment_av_scan.group_av_admin` | `spp.av.scanner.backend` | Full CRUD | +| `spp_attachment_av_scan.group_av_admin` | `ir.attachment` | Manage quarantined files | + +### Extension Points + +- Override `ir.attachment._scan_for_malware()` to customize scan logic or add pre/post-scan hooks +- Inherit `spp.av.scanner.backend` and extend `scan_binary()` to support additional antivirus engines +- Override `ir.attachment._quarantine()` to add custom quarantine handling or external storage + +### Dependencies + +`base`, `mail`, `queue_job`, `spp_encryption`, `spp_security` + +External: `pyclamd` (Python library for ClamAV integration) diff --git a/spp_attachment_av_scan/security/ir.model.access.csv b/spp_attachment_av_scan/security/ir.model.access.csv new file mode 100644 index 00000000..927e9abe --- /dev/null +++ b/spp_attachment_av_scan/security/ir.model.access.csv @@ -0,0 +1,3 @@ +id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink +access_spp_av_scanner_backend_admin,spp.av.scanner.backend admin,model_spp_av_scanner_backend,group_av_admin,1,1,1,1 +access_spp_av_scanner_backend_user,spp.av.scanner.backend user,model_spp_av_scanner_backend,base.group_user,1,0,0,0 diff --git a/spp_attachment_av_scan/security/security.xml b/spp_attachment_av_scan/security/security.xml new file mode 100644 index 00000000..8c94d2f7 --- /dev/null +++ b/spp_attachment_av_scan/security/security.xml @@ -0,0 +1,45 @@ + + + + + Antivirus Administrator + + 20 + + + + + Antivirus Administrator + + + Users who can manage antivirus scanner backends and view scan results + + + + + + AV Scanner Backend: Admin Access + + [(1, '=', 1)] + + + + + + + + + + Attachment: Users can see scan status + + [(1, '=', 1)] + + + + + + + + diff --git a/spp_attachment_av_scan/static/description/icon.png b/spp_attachment_av_scan/static/description/icon.png new file mode 100644 index 00000000..c7dbdaaf Binary files /dev/null and b/spp_attachment_av_scan/static/description/icon.png differ diff --git a/spp_attachment_av_scan/static/description/index.html b/spp_attachment_av_scan/static/description/index.html new file mode 100644 index 00000000..0bc537c0 --- /dev/null +++ b/spp_attachment_av_scan/static/description/index.html @@ -0,0 +1,545 @@ + + + + + +OpenSPP Attachment Antivirus Scan + + + +
+

OpenSPP Attachment Antivirus Scan

+ + +

Beta License: LGPL-3 OpenSPP/OpenSPP2

+

Scans uploaded file attachments for malware using ClamAV antivirus +engine. Automatically queues scans for binary attachments on create or +update using queue_job. Quarantines infected files by encrypting them +with spp_encryption and removing original data. Provides forensic tools +for security administrators to restore false positives or download +quarantined files for analysis.

+
+

Key Capabilities

+
    +
  • Auto-scan binary attachments on upload or update via background jobs
  • +
  • Quarantine infected files with encrypted backup and SHA256 hash +verification
  • +
  • Block read access to quarantined attachment data
  • +
  • Manual rescan, restore, forensic download, and permanent deletion of +quarantined files
  • +
  • Notify security administrators when malware is detected
  • +
  • Scheduled cleanup of old quarantined files and forensic downloads
  • +
  • Support ClamAV via Unix socket or network connection
  • +
+
+
+

Key Models

+ ++++ + + + + + + + + + + + + + +
ModelDescription
spp.av.scanner.backendConfigures ClamAV connection +(socket/network) and limits
ir.attachmentExtended with scan status, threat name, +and quarantine
+
+
+

Configuration

+

After installing:

+
    +
  1. Navigate to Settings > Administration > Antivirus Scanners
  2. +
  3. Create a scanner backend with ClamAV connection details (default: +/var/run/clamav/clamd.sock)
  4. +
  5. Click Test Connection to verify ClamAV is running
  6. +
  7. Set Active to enable scanning
  8. +
  9. Configure system parameters:
      +
    • spp_attachment_av_scan.quarantine_encryption_provider_id: +Encryption provider for quarantine
    • +
    • spp_attachment_av_scan.quarantine_retention_days: Days before +purging quarantined files (default: 90)
    • +
    • spp_attachment_av_scan.forensic_download_retention_hours: Hours +before cleaning forensic downloads (default: 24)
    • +
    +
  10. +
+
+
+

UI Location

+
    +
  • Scanner Configuration: Settings > Administration > Antivirus +Scanners
  • +
  • Quarantined Files: Settings > Technical > Security > Quarantined +Files
  • +
  • Attachment Forms: Scan status and quarantine actions appear in +“Antivirus Scan” section
  • +
+
+
+

Tabs

+

Scanner Backend form (spp.av.scanner.backend):

+
    +
  • Connection Settings: Unix socket or network configuration for +ClamAV
  • +
  • Connection Status: Last connection test results and error details
  • +
+
+
+

Security

+ +++++ + + + + + + + + + + + + + + + + + + + + + + + + +
GroupModelAccess
base.group_userspp.av.scanner.backendRead
base.group_userir.attachmentRead scan status
spp_attachment_av_scan.group_av_adminspp.av.scanner.backendFull CRUD
spp_attachment_av_scan.group_av_adminir.attachmentManage quarantined +files
+
+
+

Extension Points

+
    +
  • Override ir.attachment._scan_for_malware() to customize scan logic +or add pre/post-scan hooks
  • +
  • Inherit spp.av.scanner.backend and extend scan_binary() to +support additional antivirus engines
  • +
  • Override ir.attachment._quarantine() to add custom quarantine +handling or external storage
  • +
+
+
+

Dependencies

+

base, mail, queue_job, spp_encryption, spp_security

+

External: pyclamd (Python library for ClamAV integration)

+

Table of contents

+ +
+

Bug Tracker

+

Bugs are tracked on GitHub Issues. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +feedback.

+

Do not contact contributors directly about support or help with technical issues.

+
+
+

Credits

+
+

Authors

+
    +
  • OpenSPP.org
  • +
+
+
+

Maintainers

+

Current maintainers:

+

jeremi gonzalesedwin1123 reichie020212 emjay0921

+

This module is part of the OpenSPP/OpenSPP2 project on GitHub.

+

You are welcome to contribute.

+
+
+
+
+ + diff --git a/spp_attachment_av_scan/tests/__init__.py b/spp_attachment_av_scan/tests/__init__.py new file mode 100644 index 00000000..9ccd65a6 --- /dev/null +++ b/spp_attachment_av_scan/tests/__init__.py @@ -0,0 +1,2 @@ +from . import test_av_scanner_backend +from . import test_ir_attachment diff --git a/spp_attachment_av_scan/tests/test_av_scanner_backend.py b/spp_attachment_av_scan/tests/test_av_scanner_backend.py new file mode 100644 index 00000000..c22e908f --- /dev/null +++ b/spp_attachment_av_scan/tests/test_av_scanner_backend.py @@ -0,0 +1,281 @@ +import logging +from unittest.mock import MagicMock, patch + +from odoo.exceptions import ValidationError +from odoo.tests import TransactionCase, tagged + +_logger = logging.getLogger(__name__) + + +@tagged("post_install", "-at_install") +class TestAVScannerBackend(TransactionCase): + """Test cases for AV Scanner Backend model.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.scanner_backend = cls.env["spp.av.scanner.backend"] + + def test_create_scanner_backend(self): + """Test creating a scanner backend.""" + backend = self.scanner_backend.create( + { + "name": "Test Scanner", + "backend_type": "clamd_socket", + "is_active": True, + } + ) + self.assertEqual(backend.name, "Test Scanner") + self.assertEqual(backend.backend_type, "clamd_socket") + self.assertTrue(backend.is_active) + + def test_max_file_size_constraint(self): + """Test that max file size must be positive.""" + with self.assertRaises(ValidationError): + self.scanner_backend.create( + { + "name": "Test Scanner", + "backend_type": "clamd_socket", + "max_file_size_mb": 0, + } + ) + + def test_scan_timeout_constraint(self): + """Test that scan timeout must be positive.""" + with self.assertRaises(ValidationError): + self.scanner_backend.create( + { + "name": "Test Scanner", + "backend_type": "clamd_socket", + "scan_timeout_seconds": -1, + } + ) + + def test_clamd_port_constraint(self): + """Test that ClamAV port must be in valid range.""" + with self.assertRaises(ValidationError): + self.scanner_backend.create( + { + "name": "Test Scanner", + "backend_type": "clamd_network", + "clamd_port": 70000, # Invalid port + } + ) + + def test_get_active_scanner(self): + """Test getting active scanner backend.""" + # Create inactive backend (intentionally unused - verifies active scanner returns correct one) + self.scanner_backend.create( + { + "name": "Inactive Scanner", + "backend_type": "clamd_socket", + "is_active": False, + } + ) + + # Create active backend + active = self.scanner_backend.create( + { + "name": "Active Scanner", + "backend_type": "clamd_socket", + "is_active": True, + "sequence": 5, + } + ) + + # Should return the active scanner + result = self.scanner_backend.get_active_scanner() + self.assertEqual(result, active) + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_scan_binary_clean_file(self, mock_pyclamd): + """Test scanning a clean file.""" + # Mock ClamAV scanner + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = None # Clean file + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + backend = self.scanner_backend.create( + { + "name": "Test Scanner", + "backend_type": "clamd_socket", + "is_active": True, + } + ) + + result = backend.scan_binary(b"test data", filename="test.txt") + + self.assertEqual(result["status"], "clean") + self.assertIsNone(result["threat_name"]) + mock_scanner.scan_stream.assert_called_once() + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_scan_binary_infected_file(self, mock_pyclamd): + """Test scanning an infected file.""" + # Mock ClamAV scanner + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = {"stream": ("FOUND", "Eicar-Test-Signature")} + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + backend = self.scanner_backend.create( + { + "name": "Test Scanner", + "backend_type": "clamd_socket", + "is_active": True, + } + ) + + result = backend.scan_binary(b"test virus data", filename="virus.exe") + + self.assertEqual(result["status"], "infected") + self.assertEqual(result["threat_name"], "Eicar-Test-Signature") + + def test_scan_binary_file_too_large(self): + """Test skipping scan for files that are too large.""" + backend = self.scanner_backend.create( + { + "name": "Test Scanner", + "backend_type": "clamd_socket", + "is_active": True, + "max_file_size_mb": 1, # 1 MB limit + } + ) + + # Create 2 MB of data + large_data = b"x" * (2 * 1024 * 1024) + + result = backend.scan_binary(large_data) + + self.assertEqual(result["status"], "skipped") + self.assertIn("exceeds maximum", result["details"]) + + def test_scan_binary_inactive_backend(self): + """Test that inactive backends skip scanning.""" + backend = self.scanner_backend.create( + { + "name": "Test Scanner", + "backend_type": "clamd_socket", + "is_active": False, + } + ) + + result = backend.scan_binary(b"test data") + + self.assertEqual(result["status"], "skipped") + self.assertIn("not active", result["details"]) + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_test_connection_success(self, mock_pyclamd): + """Test successful connection test.""" + # Mock ClamAV scanner + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.version.return_value = "ClamAV 0.103.0" + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + backend = self.scanner_backend.create( + { + "name": "Test Scanner", + "backend_type": "clamd_socket", + } + ) + + result = backend.test_connection() + + self.assertTrue(backend.last_connection_test_result) + self.assertFalse(backend.last_connection_error) # Odoo Char fields are False when empty + self.assertEqual(result["type"], "ir.actions.client") + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_test_connection_failure(self, mock_pyclamd): + """Test failed connection test.""" + # Mock ClamAV scanner to raise exception + mock_pyclamd.ClamdUnixSocket.side_effect = Exception("Connection refused") + + backend = self.scanner_backend.create( + { + "name": "Test Scanner", + "backend_type": "clamd_socket", + } + ) + + result = backend.test_connection() + + self.assertFalse(backend.last_connection_test_result) + self.assertIsNotNone(backend.last_connection_error) + self.assertEqual(result["params"]["type"], "danger") + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_scan_binary_network_socket(self, mock_pyclamd): + """Test scanning using network socket backend type.""" + # Mock ClamAV network scanner + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = None # Clean file + mock_pyclamd.ClamdNetworkSocket.return_value = mock_scanner + + backend = self.scanner_backend.create( + { + "name": "Network Scanner", + "backend_type": "clamd_network", + "clamd_host": "192.168.1.100", + "clamd_port": 3310, + "is_active": True, + } + ) + + result = backend.scan_binary(b"test data", filename="test.txt") + + self.assertEqual(result["status"], "clean") + # Verify ClamdNetworkSocket was called with correct params + mock_pyclamd.ClamdNetworkSocket.assert_called_once_with( + host="192.168.1.100", + port=3310, + timeout=backend.scan_timeout_seconds, + ) + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_network_connection_test_success(self, mock_pyclamd): + """Test successful connection test with network socket.""" + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.version.return_value = "ClamAV 0.105.0" + mock_pyclamd.ClamdNetworkSocket.return_value = mock_scanner + + backend = self.scanner_backend.create( + { + "name": "Network Scanner", + "backend_type": "clamd_network", + "clamd_host": "clamav-server", + "clamd_port": 3310, + } + ) + + result = backend.test_connection() + + self.assertTrue(backend.last_connection_test_result) + self.assertEqual(result["type"], "ir.actions.client") + mock_pyclamd.ClamdNetworkSocket.assert_called_once() + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_scan_binary_infected_network(self, mock_pyclamd): + """Test detecting infected file via network socket.""" + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = {"stream": ("FOUND", "Win.Trojan.Generic")} + mock_pyclamd.ClamdNetworkSocket.return_value = mock_scanner + + backend = self.scanner_backend.create( + { + "name": "Network Scanner", + "backend_type": "clamd_network", + "is_active": True, + } + ) + + result = backend.scan_binary(b"malicious content") + + self.assertEqual(result["status"], "infected") + self.assertEqual(result["threat_name"], "Win.Trojan.Generic") diff --git a/spp_attachment_av_scan/tests/test_ir_attachment.py b/spp_attachment_av_scan/tests/test_ir_attachment.py new file mode 100644 index 00000000..a680ce96 --- /dev/null +++ b/spp_attachment_av_scan/tests/test_ir_attachment.py @@ -0,0 +1,730 @@ +import base64 +import logging +from unittest.mock import MagicMock, patch + +from odoo import Command, fields +from odoo.exceptions import AccessError, UserError +from odoo.tests import TransactionCase, tagged + +_logger = logging.getLogger(__name__) + + +@tagged("post_install", "-at_install") +class TestIrAttachment(TransactionCase): + """Test cases for ir.attachment with AV scanning.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.attachment_model = cls.env["ir.attachment"] + cls.scanner_backend_model = cls.env["spp.av.scanner.backend"] + + # Create a test scanner backend + cls.scanner_backend = cls.scanner_backend_model.create( + { + "name": "Test Scanner", + "backend_type": "clamd_socket", + "is_active": True, + } + ) + + def test_attachment_default_scan_status(self): + """Test that new attachments have pending scan status.""" + attachment = self.attachment_model.create( + { + "name": "test.txt", + "datas": base64.b64encode(b"test content"), + } + ) + + self.assertEqual(attachment.scan_status, "pending") + self.assertFalse(attachment.is_quarantined) + + def test_create_attachment_queues_scan(self): + """Test that creating binary attachment queues/processes scan.""" + # When job_worker is installed, the scan is queued automatically + # In test mode, the job may run synchronously + attachment = self.attachment_model.create( + { + "name": "test.txt", + "type": "binary", + "datas": base64.b64encode(b"test content"), + } + ) + + # The attachment should be created successfully + self.assertTrue(attachment.exists()) + # Status should be pending (queued) or already processed + self.assertIn(attachment.scan_status, ["pending", "clean", "error", "skipped"]) + + def test_scan_non_binary_attachment(self): + """Test that non-binary attachments are skipped.""" + attachment = self.attachment_model.create( + { + "name": "test.url", + "type": "url", + "url": "https://example.com", + } + ) + + attachment._scan_for_malware() + + self.assertEqual(attachment.scan_status, "skipped") + self.assertIn("Not a binary attachment", attachment.scan_result) + + def test_scan_no_active_backend(self): + """Test scanning when no active backend is configured.""" + # Deactivate all backends + self.scanner_backend.is_active = False + + attachment = self.attachment_model.create( + { + "name": "test.txt", + "type": "binary", + "datas": base64.b64encode(b"test content"), + } + ) + + attachment._scan_for_malware() + + self.assertEqual(attachment.scan_status, "skipped") + self.assertIn("No active scanner", attachment.scan_result) + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_scan_clean_file(self, mock_pyclamd): + """Test scanning a clean file.""" + # Mock ClamAV scanner + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = None # Clean + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + attachment = self.attachment_model.create( + { + "name": "clean.txt", + "type": "binary", + "datas": base64.b64encode(b"clean content"), + } + ) + + attachment._scan_for_malware() + + self.assertEqual(attachment.scan_status, "clean") + self.assertFalse(attachment.is_quarantined) + self.assertFalse(attachment.threat_name) # Odoo Char fields are False when empty + self.assertTrue(attachment.scan_date) + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_scan_infected_file(self, mock_pyclamd): + """Test scanning an infected file.""" + # Mock ClamAV scanner + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = {"stream": ("FOUND", "Eicar-Test-Signature")} + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + attachment = self.attachment_model.create( + { + "name": "virus.exe", + "type": "binary", + "datas": base64.b64encode(b"virus content"), + } + ) + + attachment._scan_for_malware() + + self.assertEqual(attachment.scan_status, "infected") + self.assertTrue(attachment.is_quarantined) + self.assertEqual(attachment.threat_name, "Eicar-Test-Signature") + self.assertIsNotNone(attachment.scan_date) + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_scan_error_handling(self, mock_pyclamd): + """Test error handling during scan.""" + # Mock ClamAV scanner to raise exception + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.side_effect = Exception("Scan failed") + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + attachment = self.attachment_model.create( + { + "name": "test.txt", + "type": "binary", + "datas": base64.b64encode(b"test content"), + } + ) + + attachment._scan_for_malware() + + self.assertEqual(attachment.scan_status, "error") + self.assertIn("error", attachment.scan_result.lower()) + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_quarantined_file_read_blocked(self, mock_pyclamd): + """Test that quarantined files cannot be read.""" + # Mock ClamAV scanner + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = {"stream": ("FOUND", "Eicar-Test-Signature")} + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + attachment = self.attachment_model.create( + { + "name": "virus.exe", + "type": "binary", + "datas": base64.b64encode(b"virus content"), + } + ) + + attachment._scan_for_malware() + + # Try to read the attachment + result = attachment.read(["datas"]) + + # Verify datas is blocked + self.assertFalse(result[0]["datas"]) + + def test_action_rescan(self): + """Test manual rescan action.""" + attachment = self.attachment_model.create( + { + "name": "test.txt", + "type": "binary", + "datas": base64.b64encode(b"test content"), + } + ) + # Set to clean to simulate previous scan + attachment.with_context(skip_av_scan_queue=True).write( + { + "scan_status": "clean", + } + ) + + result = attachment.action_rescan() + + # Verify scan was queued - status should be pending + self.assertEqual(attachment.scan_status, "pending") + self.assertEqual(result["type"], "ir.actions.client") + + def test_action_rescan_non_binary(self): + """Test that rescanning non-binary attachment raises error.""" + attachment = self.attachment_model.create( + { + "name": "test.url", + "type": "url", + "url": "https://example.com", + } + ) + + with self.assertRaises(UserError): + attachment.action_rescan() + + def test_write_datas_queues_rescan(self): + """Test that updating datas field queues rescan.""" + attachment = self.attachment_model.create( + { + "name": "test.txt", + "type": "binary", + "datas": base64.b64encode(b"original content"), + } + ) + # Set to clean to simulate previous scan + attachment.with_context(skip_av_scan_queue=True).write( + { + "scan_status": "clean", + } + ) + + # Update the datas field + attachment.write( + { + "datas": base64.b64encode(b"updated content"), + } + ) + + # Verify rescan was queued - status should be pending + self.assertEqual(attachment.scan_status, "pending") + + +@tagged("post_install", "-at_install") +class TestEncryptedQuarantine(TransactionCase): + """Test cases for encrypted quarantine functionality.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.attachment_model = cls.env["ir.attachment"] + cls.scanner_backend_model = cls.env["spp.av.scanner.backend"] + cls.encryption_provider_model = cls.env["spp.encryption.provider"] + + # Create a test scanner backend + cls.scanner_backend = cls.scanner_backend_model.create( + { + "name": "Test Scanner", + "backend_type": "clamd_socket", + "is_active": True, + } + ) + + # Create AV admin user for testing admin actions + cls.av_admin_group = cls.env.ref("spp_attachment_av_scan.group_av_admin") + cls.av_admin_user = cls.env["res.users"].create( + { + "name": "AV Admin", + "login": "av_admin_test", + "group_ids": [Command.link(cls.av_admin_group.id), Command.link(cls.env.ref("base.group_user").id)], + } + ) + + # Create regular user without AV admin access + cls.regular_user = cls.env["res.users"].create( + { + "name": "Regular User", + "login": "regular_user_test", + "group_ids": [Command.link(cls.env.ref("base.group_user").id)], + } + ) + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_quarantine_stores_hash(self, mock_pyclamd): + """Test that quarantine stores file hash correctly.""" + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = {"stream": ("FOUND", "Test-Virus")} + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + original_content = b"infected file content" + attachment = self.attachment_model.create( + { + "name": "virus.exe", + "type": "binary", + "datas": base64.b64encode(original_content), + } + ) + + attachment._scan_for_malware() + + self.assertTrue(attachment.is_quarantined) + self.assertTrue(attachment.quarantine_hash) + self.assertTrue(attachment.quarantine_date) + self.assertEqual(attachment.original_file_size, len(original_content)) + + # Verify hash is correct + import hashlib + + expected_hash = hashlib.sha256(original_content).hexdigest() + self.assertEqual(attachment.quarantine_hash, expected_hash) + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_quarantine_clears_original_datas(self, mock_pyclamd): + """Test that quarantine clears the original file data.""" + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = {"stream": ("FOUND", "Test-Virus")} + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + attachment = self.attachment_model.create( + { + "name": "virus.exe", + "type": "binary", + "datas": base64.b64encode(b"infected content"), + } + ) + + attachment._scan_for_malware() + + # Original datas should be cleared + self.assertFalse(attachment.datas) + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_restore_quarantined_admin_only(self, mock_pyclamd): + """Test that only AV admins can restore quarantined files.""" + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = {"stream": ("FOUND", "Test-Virus")} + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + attachment = self.attachment_model.create( + { + "name": "virus.exe", + "type": "binary", + "datas": base64.b64encode(b"infected content"), + } + ) + + attachment._scan_for_malware() + + # Regular user should not be able to restore + with self.assertRaises(AccessError): + attachment.with_user(self.regular_user).action_restore_quarantined() + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_permanently_delete_quarantined(self, mock_pyclamd): + """Test permanently deleting a quarantined file.""" + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = {"stream": ("FOUND", "Test-Virus")} + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + attachment = self.attachment_model.create( + { + "name": "virus.exe", + "type": "binary", + "datas": base64.b64encode(b"infected content"), + } + ) + + attachment._scan_for_malware() + attachment_id = attachment.id + + # Delete as admin + attachment.with_user(self.av_admin_user).action_permanently_delete_quarantined() + + # Attachment should no longer exist + self.assertFalse(self.attachment_model.browse(attachment_id).exists()) + + def test_action_rescan_quarantined_blocked(self): + """Test that rescanning a quarantined file is blocked.""" + attachment = self.attachment_model.create( + { + "name": "test.txt", + "type": "binary", + "datas": base64.b64encode(b"test content"), + } + ) + # Simulate quarantined state + attachment.with_context(skip_av_scan_queue=True).write( + { + "is_quarantined": True, + "scan_status": "infected", + } + ) + + with self.assertRaises(UserError): + attachment.action_rescan() + + def test_cron_purge_old_quarantined(self): + """Test scheduled purge of old quarantined files.""" + from datetime import timedelta + + attachment = self.attachment_model.create( + { + "name": "old_virus.exe", + "type": "binary", + "datas": base64.b64encode(b"test"), + } + ) + + # Simulate old quarantine + old_date = fields.Datetime.now() - timedelta(days=100) + attachment.with_context(skip_av_scan_queue=True).write( + { + "is_quarantined": True, + "quarantine_date": old_date, + "quarantine_data": base64.b64encode(b"encrypted_data"), + "scan_status": "infected", + } + ) + + # Set retention to 90 days + self.env["ir.config_parameter"].sudo().set_param("spp_attachment_av_scan.quarantine_retention_days", "90") + + # Run purge + self.attachment_model._cron_purge_old_quarantined_files() + + # Quarantine data should be cleared + attachment.invalidate_recordset() + self.assertFalse(attachment.quarantine_data) + + def test_calculate_file_hash(self): + """Test file hash calculation.""" + attachment = self.attachment_model.create( + { + "name": "test.txt", + "type": "binary", + "datas": base64.b64encode(b"test content"), + } + ) + + test_data = b"test content for hashing" + calculated_hash = attachment._calculate_file_hash(test_data) + + import hashlib + + expected_hash = hashlib.sha256(test_data).hexdigest() + self.assertEqual(calculated_hash, expected_hash) + + def test_restore_not_quarantined_error(self): + """Test that restoring non-quarantined file raises error.""" + attachment = self.attachment_model.create( + { + "name": "clean.txt", + "type": "binary", + "datas": base64.b64encode(b"clean content"), + } + ) + attachment.with_context(skip_av_scan_queue=True).write( + { + "scan_status": "clean", + "is_quarantined": False, + } + ) + + with self.assertRaises(UserError): + attachment.with_user(self.av_admin_user).action_restore_quarantined() + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_notification_sent_on_infection(self, mock_pyclamd): + """Test that security admins are notified when malware is detected.""" + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = {"stream": ("FOUND", "Test-Malware")} + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + # Ensure we have the AV admin group with users + av_admin_group = self.env.ref("spp_attachment_av_scan.group_av_admin") + self.assertTrue(av_admin_group.user_ids, "AV admin group should have users") + + attachment = self.attachment_model.create( + { + "name": "malware.exe", + "type": "binary", + "datas": base64.b64encode(b"malicious content"), + } + ) + + # Count messages before scan + messages_before = self.env["mail.message"].search_count( + [("model", "=", "ir.attachment"), ("res_id", "=", attachment.id)] + ) + + attachment._scan_for_malware() + + # Count messages after scan + messages_after = self.env["mail.message"].search_count( + [("model", "=", "ir.attachment"), ("res_id", "=", attachment.id)] + ) + + # A notification message should have been created + self.assertGreater(messages_after, messages_before) + + def test_forensic_download_marked(self): + """Test that forensic downloads are marked with is_forensic_download flag.""" + attachment = self.attachment_model.create( + { + "name": "test.txt", + "type": "binary", + "datas": base64.b64encode(b"test"), + } + ) + + # Simulate quarantine state with encrypted data + # First create an encryption provider with a key + encryption_provider = self.env["spp.encryption.provider"].sudo().search([("type", "=", "jwcrypto")], limit=1) + + if not encryption_provider: + _logger.info("Skipping test_forensic_download_marked: no encryption provider available") + return + + try: + original_content = b"quarantined content" + encrypted_data = encryption_provider.encrypt_data(original_content) + file_hash = attachment._calculate_file_hash(original_content) + + attachment.with_context(skip_av_scan_queue=True).write( + { + "is_quarantined": True, + "scan_status": "infected", + "quarantine_data": base64.b64encode(encrypted_data), + "quarantine_hash": file_hash, + "original_file_size": len(original_content), + "datas": False, + } + ) + + # Download for analysis as admin + attachment.with_user(self.av_admin_user).action_download_quarantined_for_analysis() + + # Find the created download attachment + download_attachment = self.attachment_model.search([("name", "=", f"QUARANTINED_{attachment.name}")]) + + self.assertTrue(download_attachment.exists()) + self.assertTrue(download_attachment.is_forensic_download) + self.assertEqual(download_attachment.scan_status, "skipped") + except ValueError as e: + if "No encryption key" in str(e): + _logger.info("Skipping test_forensic_download_marked: %s", e) + else: + raise + + def test_cron_cleanup_forensic_downloads(self): + """Test scheduled cleanup of old forensic download attachments.""" + from datetime import timedelta + + # Create a forensic download attachment + attachment = self.attachment_model.create( + { + "name": "QUARANTINED_old_file.exe", + "type": "binary", + "datas": base64.b64encode(b"test"), + "is_forensic_download": True, + } + ) + + # Simulate old create_date (25 hours ago) + old_date = fields.Datetime.now() - timedelta(hours=25) + self.env.cr.execute( + "UPDATE ir_attachment SET create_date = %s WHERE id = %s", + (old_date, attachment.id), + ) + attachment.invalidate_recordset() + + # Set retention to 24 hours + self.env["ir.config_parameter"].sudo().set_param( + "spp_attachment_av_scan.forensic_download_retention_hours", "24" + ) + + attachment_id = attachment.id + + # Run cleanup + self.attachment_model._cron_cleanup_forensic_downloads() + + # Attachment should be deleted + self.assertFalse(self.attachment_model.browse(attachment_id).exists()) + + def test_cron_cleanup_keeps_recent_forensic_downloads(self): + """Test that recent forensic downloads are not cleaned up.""" + # Create a recent forensic download attachment + attachment = self.attachment_model.create( + { + "name": "QUARANTINED_recent_file.exe", + "type": "binary", + "datas": base64.b64encode(b"test"), + "is_forensic_download": True, + } + ) + + # Set retention to 24 hours + self.env["ir.config_parameter"].sudo().set_param( + "spp_attachment_av_scan.forensic_download_retention_hours", "24" + ) + + attachment_id = attachment.id + + # Run cleanup + self.attachment_model._cron_cleanup_forensic_downloads() + + # Attachment should still exist (it's recent) + self.assertTrue(self.attachment_model.browse(attachment_id).exists()) + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_large_file_skipped_during_scan(self, mock_pyclamd): + """Test that files exceeding max_file_size_mb are skipped.""" + # Configure scanner with 1 MB limit + self.scanner_backend.write({"max_file_size_mb": 1}) + + # Create attachment with 2 MB of data + large_content = b"x" * (2 * 1024 * 1024) + attachment = self.attachment_model.create( + { + "name": "large_file.bin", + "type": "binary", + "datas": base64.b64encode(large_content), + } + ) + + # Run scan + attachment._scan_for_malware() + + # Should be skipped due to size + self.assertEqual(attachment.scan_status, "skipped") + self.assertIn("exceeds maximum", attachment.scan_result) + + def test_size_validation_on_restore(self): + """Test that size validation catches corrupted restored files.""" + attachment = self.attachment_model.create( + { + "name": "test.txt", + "type": "binary", + "datas": base64.b64encode(b"test"), + } + ) + + encryption_provider = self.env["spp.encryption.provider"].sudo().search([("type", "=", "jwcrypto")], limit=1) + + if not encryption_provider: + _logger.info("Skipping test_size_validation_on_restore: no encryption provider available") + return + + try: + original_content = b"original content here" + # Encrypt different data than what original_file_size indicates + different_content = b"different" + encrypted_data = encryption_provider.encrypt_data(different_content) + file_hash = attachment._calculate_file_hash(different_content) + + attachment.with_context(skip_av_scan_queue=True).write( + { + "is_quarantined": True, + "scan_status": "infected", + "quarantine_data": base64.b64encode(encrypted_data), + "quarantine_hash": file_hash, + # Set wrong size to simulate corruption + "original_file_size": len(original_content), + "datas": False, + } + ) + + # Restore should fail due to size mismatch + with self.assertRaises(UserError) as context: + attachment.with_user(self.av_admin_user).action_restore_quarantined() + + self.assertIn("size mismatch", str(context.exception).lower()) + except ValueError as e: + if "No encryption key" in str(e): + _logger.info("Skipping test_size_validation_on_restore: %s", e) + else: + raise + + @patch("odoo.addons.spp_attachment_av_scan.models.av_scanner_backend.pyclamd") + def test_full_restore_flow_with_encryption(self, mock_pyclamd): + """Test complete quarantine and restore flow with encryption.""" + mock_scanner = MagicMock() + mock_scanner.ping.return_value = True + mock_scanner.scan_stream.return_value = {"stream": ("FOUND", "Test-Virus")} + mock_pyclamd.ClamdUnixSocket.return_value = mock_scanner + + original_content = b"this was falsely detected as malware" + attachment = self.attachment_model.create( + { + "name": "false_positive.doc", + "type": "binary", + "datas": base64.b64encode(original_content), + } + ) + + # Scan and quarantine + attachment._scan_for_malware() + + self.assertTrue(attachment.is_quarantined) + self.assertEqual(attachment.scan_status, "infected") + self.assertFalse(attachment.datas) # Original data cleared + self.assertTrue(attachment.quarantine_hash) + self.assertEqual(attachment.original_file_size, len(original_content)) + + # Check if encryption provider exists for restore test + encryption_provider = self.env["spp.encryption.provider"].sudo().search([("type", "=", "jwcrypto")], limit=1) + + if encryption_provider and attachment.quarantine_data: + # Restore as admin + attachment.with_user(self.av_admin_user).action_restore_quarantined() + + # Verify restoration + self.assertFalse(attachment.is_quarantined) + self.assertEqual(attachment.scan_status, "clean") + self.assertTrue(attachment.datas) + + # Verify restored content matches original + restored_content = base64.b64decode(attachment.datas) + self.assertEqual(restored_content, original_content) diff --git a/spp_attachment_av_scan/views/av_scanner_backend_views.xml b/spp_attachment_av_scan/views/av_scanner_backend_views.xml new file mode 100644 index 00000000..d9cf3ccb --- /dev/null +++ b/spp_attachment_av_scan/views/av_scanner_backend_views.xml @@ -0,0 +1,123 @@ + + + + + spp.av.scanner.backend.tree + spp.av.scanner.backend + + + + + + + + + + + + + + + spp.av.scanner.backend.form + spp.av.scanner.backend + +
+
+
+ +
+ +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+
+
+
+ + + + Antivirus Scanner Backends + spp.av.scanner.backend + list,form + +

+ Create your first antivirus scanner backend +

+

+ Configure ClamAV or other antivirus scanners to automatically + scan uploaded files for malware. +

+
+
+ + + +
diff --git a/spp_attachment_av_scan/views/ir_attachment_views.xml b/spp_attachment_av_scan/views/ir_attachment_views.xml new file mode 100644 index 00000000..65cff6ff --- /dev/null +++ b/spp_attachment_av_scan/views/ir_attachment_views.xml @@ -0,0 +1,181 @@ + + + + + ir.attachment.form.inherit.av.scan + ir.attachment + + + + + + + + + + + + + + + + +
+

+ +

+
+ + +
+
+