diff --git a/CHANGELOG.md b/CHANGELOG.md index 3918e42a..9a5c9723 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,14 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/). ## Version 3.12.0 - Upcoming +### Added + +- A maximum concurrent amount of scans can now be configured for the malware scanner. + +### Changed + +- The retry logic for the malware scanner was improved to be more robust under high loads. + ### Fixed - Wrong file name being shown when rejecting an attachment due to file size. diff --git a/README.md b/README.md index c7611256..6bd91fd6 100755 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ The `@cap-js/attachments` package is a [CDS plugin](https://cap.cloud.sap/docs/n - [Changes in the CDS Models](#changes-in-the-cds-models) - [Storage Targets](#storage-targets) - [Malware Scanner](#malware-scanner) + - [Rate Limit Handling (Auto-Retry)](#rate-limit-handling-auto-retry) + - [Scan Concurrency Limiting](#scan-concurrency-limiting) - [Automatic file rescanning](#automatic-file-rescanning) - [Audit logging](#audit-logging) - [Visibility Control for Attachments UI Facet Generation](#visibility-control-for-attachments-ui-facet-generation) @@ -231,6 +233,60 @@ Scan status codes: > [!Note] > If the malware scanner reports a file size larger than the limit specified via [@Validation.Maximum](#specify-the-maximum-file-size) it removes the file and sets the status of the attachment metadata to failed. +#### Rate Limit Handling (Auto-Retry) + +The SAP Malware Scanning Service enforces a rate limit of 30 concurrent requests per subaccount. When this limit is exceeded, the service responds with HTTP `429 Too Many Requests`. By default, the plugin automatically retries scan requests that receive a 429 response using exponential backoff with jitter. + +You can configure the retry behavior in `package.json` or `.cdsrc.json`: + +```json +{ + "cds": { + "requires": { + "malwareScanner": { + "retry": { + "maxAttempts": 5, + "initialDelay": 1000, + "maxDelay": 30000 + } + } + } + } +} +``` + +| Option | Default | Description | +| -------------------- | ------- | ------------------------------------------------------ | +| `retry.maxAttempts` | `5` | Total number of attempts including the initial request | +| `retry.initialDelay` | `1000` | Base delay in milliseconds before the first retry | +| `retry.maxDelay` | `30000` | Maximum delay in milliseconds between retries | + +When a 429 response includes a `Retry-After` header, the plugin respects that value (capped at `maxDelay`). Only 429 responses trigger retries — other errors fail immediately. + +To disable retry and restore the previous behavior (immediate failure on 429), set `retry` to `false`. + +#### Scan Concurrency Limiting + +To reduce pressure on the shared rate limit, the plugin limits how many scan requests run concurrently within a single process. Excess scans are queued and processed as slots become available. + +```json +{ + "cds": { + "requires": { + "malwareScanner": { + "maxConcurrentScans": 10 + } + } + } +} +``` + +| Option | Default | Description | +| -------------------- | ------- | ------------------------------------------------------------------------------------------------------ | +| `maxConcurrentScans` | `30` | Maximum number of concurrent scan requests per process. Set to `0` to disable (unbounded parallelism). | + +A scan that is retrying due to a 429 response holds its concurrency slot during the backoff wait, preventing retry storms from competing with new scans. + #### Automatic file rescanning According to the recommendation of the [Malware Scanning Service](http://help.sap.com/docs/malware-scanning-service/sap-malware-scanning-service/developing-applications-with-sap-malware-scanning-service), attachments should be rescanned automatically if the last scan is older than 3 days. This behavior can be configured in the attachments settings by specifying the `scanExpiryMs` property: diff --git a/package.json b/package.json index 54dc7b10..54a57898 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,12 @@ "cds": { "requires": { "malwareScanner": { + "retry": { + "maxAttempts": 5, + "initialDelay": 1000, + "maxDelay": 30000, + "maxConcurrentScans": 30 + }, "vcap": { "label": "malware-scanner" } diff --git a/srv/attachments/basic.js b/srv/attachments/basic.js index 18f5159e..eaf029e4 100644 --- a/srv/attachments/basic.js +++ b/srv/attachments/basic.js @@ -127,11 +127,16 @@ class AttachmentsService extends cds.Service { res = await Promise.all( data.map(async (d) => { const res = await UPSERT(d).into(attachments) - const attachmentForHash = await this.get(attachments, { ID: d.ID }) - // If this is just the PUT for metadata, there is not yet any file to retrieve - if (attachmentForHash) { - const hash = await computeHash(attachmentForHash) - await this.update(attachments, { ID: d.ID }, { hash }) + // When scanning is enabled, skip hash computation here — the malware + // scanner returns SHA-256 in its response and writes the hash itself. + // This avoids a redundant file read (expensive for object store backends). + const scanEnabled = cds.env.requires?.attachments?.scan !== false + if (!scanEnabled || !this._skipInlineHash) { + const attachmentForHash = await this.get(attachments, { ID: d.ID }) + if (attachmentForHash) { + const hash = await computeHash(attachmentForHash) + await this.update(attachments, { ID: d.ID }, { hash }) + } } return res }), diff --git a/srv/attachments/object-store.js b/srv/attachments/object-store.js index d3165224..f7febf75 100644 --- a/srv/attachments/object-store.js +++ b/srv/attachments/object-store.js @@ -7,6 +7,9 @@ module.exports = class RemoteAttachmentsService extends require("./basic") { objectStoreKind = cds.env.requires?.attachments?.objectStore?.kind separateObjectStore = this.isMultiTenancyEnabled && this.objectStoreKind === "separate" + // Skip inline hash computation in put() — the malware scanner already + // returns SHA-256, avoiding a redundant remote file download from object store. + _skipInlineHash = true init() { LOG.debug(`${this.constructor.name} initialization`, { diff --git a/srv/malware-scanner/malwareScanner.js b/srv/malware-scanner/malwareScanner.js index 2c948d6a..2862e4ab 100644 --- a/srv/malware-scanner/malwareScanner.js +++ b/srv/malware-scanner/malwareScanner.js @@ -2,6 +2,7 @@ const cds = require("@sap/cds") const crypto = require("crypto") const https = require("https") const { URL } = require("url") +const Semaphore = require("./semaphore") const LOG = cds.log("attachments") class MalwareScanner extends cds.ApplicationService { @@ -13,6 +14,16 @@ class MalwareScanner extends cds.ApplicationService { this.on("ScanAttachmentsFile", this.scanAttachmentsFile) this.on("scan", this.scanFile) + const config = cds.env.requires?.malwareScanner + this.retryConfig = { + enabled: !!config?.retry, + maxAttempts: config?.retry?.maxAttempts, + initialDelay: config?.retry?.initialDelay, + maxDelay: config?.retry?.maxDelay, + } + const maxConcurrent = config?.maxConcurrentScans + this.semaphore = maxConcurrent > 0 ? new Semaphore(maxConcurrent) : null + return super.init() } @@ -24,6 +35,15 @@ class MalwareScanner extends cds.ApplicationService { * @param {{data: {target: string, keys: object}}} msg The target is the CSN Entity name, which is used to lookup entity via cds.model.definitions[]. */ async scanAttachmentsFile(msg) { + if (this.semaphore) await this.semaphore.acquire() + try { + return await this._scanAttachmentsFile(msg) + } finally { + if (this.semaphore) this.semaphore.release() + } + } + + async _scanAttachmentsFile(msg) { const { target, keys } = msg.data const scanEnabled = cds.env.requires?.attachments?.scan ?? true if (!scanEnabled) { @@ -51,35 +71,19 @@ class MalwareScanner extends cds.ApplicationService { await this.updateStatus(_target, keys, "Scanning") - LOG.debug( - `Fetching file content for scanning for ${target}, ${JSON.stringify(keys)}`, - ) - const contentStream = await AttachmentsSrv.get( - model.definitions[target], - keys, - ) - - if (!contentStream) { - LOG.warn( - `Cannot fetch file content for malware scanning for ${target}, ${JSON.stringify(keys)}! Check if the file exists.`, - ) - await this.updateStatus(_target, keys, "Failed") - return - } - let res try { - res = await this.scan(contentStream) + res = await this._scanWithRetry(AttachmentsSrv, model, target, keys) } catch (err) { LOG.error( `Request to malware scanner failed for ${target}, ${JSON.stringify(keys)}`, err, ) - await this.updateStatus(target, keys, "Failed") + await this.updateStatus(_target, keys, "Failed") throw err } - let status = res.isMalware ? "Infected" : "Clean" + const status = res.isMalware ? "Infected" : "Clean" const hash = res.hash if (status === "Infected") { @@ -96,7 +100,82 @@ class MalwareScanner extends cds.ApplicationService { } // Assign hash as another condition to ensure the correct file is marked as fine - await this.updateStatus(_target, Object.assign({ hash }, keys), status) + await this.updateStatus( + _target, + [ + "(", + { ref: ["hash"] }, + "=", + { val: hash }, + "or", + { ref: ["hash"] }, + "is", + "null", + ")", + "and", + { + xpr: Object.keys(keys).reduce((acc, key) => { + if (acc.length) acc.push("and") + acc.push({ ref: [key] }, "=", { val: keys[key] }) + return acc + }, []), + }, + ], + status, + hash, + ) + } + + async _scanWithRetry(AttachmentsSrv, model, target, keys) { + const { enabled, maxAttempts } = this.retryConfig + const attempts = enabled ? maxAttempts : 1 + + for (let attempt = 1; attempt <= attempts; attempt++) { + LOG.debug( + `Fetching file content for scanning for ${target}, ${JSON.stringify(keys)}`, + ) + const contentStream = await AttachmentsSrv.get( + model.definitions[target], + keys, + ) + + if (!contentStream) { + throw new Error( + `Cannot fetch file content for malware scanning for ${target}, ${JSON.stringify(keys)}`, + ) + } + + try { + const res = await this.scan(contentStream) + if (attempt > 1) { + LOG.info( + `Malware scan for ${target}, ${JSON.stringify(keys)} succeeded after ${attempt} attempts`, + ) + } + return res + } catch (err) { + const is429 = err.response?.status === 429 + const hasRetriesLeft = attempt < attempts + + if (is429 && hasRetriesLeft) { + const delay = this._calculateRetryDelay(err, attempt) + LOG.debug( + `Malware scan received 429, retrying in ${Math.round(delay)}ms (attempt ${attempt}/${attempts})`, + ) + await new Promise((resolve) => setTimeout(resolve, delay)) + continue + } + + if (is429) { + throw new Error( + `Malware scan failed after ${attempts} attempts (last status: 429)`, + { cause: err }, + ) + } + + throw err + } + } } async getFileInformation(target, keys) { @@ -107,113 +186,90 @@ class MalwareScanner extends cds.ApplicationService { return dbResult } - async updateStatus(target, keys, status) { + async updateStatus(target, where, status, hash) { + const updateObject = { status } + if (status !== "Scanning") { + updateObject.lastScan = new Date() + } + if (hash) { + updateObject.hash = hash + } if (target.drafts) { await Promise.all([ - UPDATE.entity(target).where(keys).set({ status, lastScan: new Date() }), - UPDATE.entity(target.drafts) - .where(keys) - .set({ status, lastScan: new Date() }), + UPDATE.entity(target).where(where).set(updateObject), + UPDATE.entity(target.drafts).where(where).set(updateObject), ]) } else { - await UPDATE.entity(target) - .where(keys) - .set({ status, lastScan: new Date() }) + await UPDATE.entity(target).where(where).set(updateObject) } LOG.info( - `Updated scan status to ${status} for ${target.name}, ${JSON.stringify(keys)}`, + `Updated scan status to ${status} for ${target.name}, ${JSON.stringify(where)}`, ) } /** * Scans the passed over file * @param {*} req - The request object - * @param {string} fileName - The name of the file being scanned */ async scanFile(req) { const { file } = req.data - let response const scanStartTime = Date.now() try { - // Prepare request options const url = new URL(`https://${this.credentials.uri}/scan`) - const requestOptions = { - method: "POST", - hostname: url.hostname, - port: url.port || 443, - path: url.pathname, - headers: {}, - } - - if (this.credentials?.certificate && this.credentials?.key) { - LOG.debug("Using mTLS authentication for malware scanning") - - const cert = new crypto.X509Certificate(this.credentials.certificate) - const expiryDate = new Date(cert.validTo) - const now = Date.now() - - // Show warning if certificate is expired or expiring within 30 days - const msIn30Days = 30 * 24 * 60 * 60 * 1000 - - if (expiryDate.getTime() < now) { - LOG.error("Malware scanner certificate expired", { - validTo: cert.validTo, - }) - throw new Error("Malware scanner certificate expired") - } else if (expiryDate.getTime() - now < msIn30Days) { - LOG.warn("Malware scanner certificate expiring soon", { - validTo: cert.validTo, - }) - } - - requestOptions.cert = this.credentials.certificate - requestOptions.key = this.credentials.key - requestOptions.rejectUnauthorized = false - - LOG.debug("Using mTLS authorization") - } else if (this.credentials?.username && this.credentials?.password) { - // Basic Auth: set Authorization header - LOG.warn( - "Deprecated: Basic Authentication for malware scanning is deprecated and will be removed in future releases.", - ) - requestOptions.headers.Authorization = - "Basic " + - Buffer.from( - `${this.credentials.username}:${this.credentials.password}`, - "binary", - ).toString("base64") - LOG.debug("Using basic authorization") - } else { - throw new Error( - "Could not find any credentials to authenticate against malware scanning service, please make sure binding and service key exists.", - ) - } + const requestOptions = this._buildRequestOptions(url) - response = await new Promise((resolve, reject) => { - const req = https.request(requestOptions, (res) => { + const response = await new Promise((resolve, reject) => { + const httpReq = https.request(requestOptions, (res) => { let data = "" res.on("data", (chunk) => (data += chunk)) res.on("end", () => { resolve({ status: res.statusCode, - ok: res.statusCode >= 200 && res.statusCode < 300, + headers: res.headers, data, }) }) }) - req.on("error", reject) + httpReq.on("error", reject) + file.pipe(httpReq) + }) - file.pipe(req) + if (response.status < 200 || response.status >= 300) { + const error = new Error( + `Malware scan request failed with status ${response.status}`, + ) + error.response = { + status: response.status, + headers: response.headers, + } + throw error + } + + /** + * @typedef {Object} MalwareScanResponse + * @property {boolean} malwareDetected - Indicates whether the scan engine detected a threat. + * @property {boolean} encryptedContentDetected - Indicates whether the file has encrypted parts, which could not be scanned. + * @property {number} scanSize - Size in bytes of the scanned file. + * @property {string} finding - This field may contain information about detected malware. + * @property {string} mimeType - Indicates the detected MIME type for the scanned file. + * @property {string} SHA256 - SHA-256 hash of the scanned file. + */ + /** @type {MalwareScanResponse} */ + const responseJson = JSON.parse(response.data) + const scanDuration = Date.now() - scanStartTime + LOG.debug(`Malware scan response`, { + scanDuration, + response: responseJson, }) - if (!response.ok) { - const json = JSON.parse(response.data || "{}") - const errorMsg = - JSON.stringify(json) || - response.statusText || - "Unknown error from malware scanner" - return req.reject(response.status, `Scanning failed: ${errorMsg}`) + return { + isMalware: responseJson.malwareDetected, + encryptedContentDetected: responseJson.encryptedContentDetected, + scanSize: responseJson.scanSize, + finding: responseJson.finding, + mimeType: responseJson.mimeType, + hash: responseJson.SHA256, } } catch (error) { const scanDuration = Date.now() - scanStartTime @@ -224,33 +280,74 @@ class MalwareScanner extends cds.ApplicationService { { scanDuration, scannerUri: this.credentials?.uri }, ) file?.destroy() - return req.reject(500, "Scanning failed") + throw error } finally { file?.destroy() } + } - /** - * @typedef {Object} MalwareScanResponse - * @property {boolean} malwareDetected - Indicates whether the scan engine detected a threat. - * @property {boolean} encryptedContentDetected - Indicates whether the file has encrypted parts, which could not be scanned. - * @property {number} scanSize - Size in bytes of the scanned file. Use the file size to validate the success of data transmission. - * @property {string} finding - This field may contain information about detected malware. - * @property {string} mimeType - Indicates the detected MIME type for the scanned file. This data may not be reliable and results may vary on different service providers. - * @property {string} SHA256 - SHA-256 hash of the scanned file. Use the hash to validate the success of data transmission. - */ - /** @type {MalwareScanResponse} */ - const responseJson = JSON.parse(response.data) - const scanDuration = Date.now() - scanStartTime - LOG.debug(`Malware scan response`, { scanDuration, response: responseJson }) - - return { - isMalware: responseJson.malwareDetected, - encryptedContentDetected: responseJson.encryptedContentDetected, - scanSize: responseJson.scanSize, - finding: responseJson.finding, - mimeType: responseJson.mimeType, - hash: responseJson.SHA256, + _buildRequestOptions(url) { + const requestOptions = { + method: "POST", + hostname: url.hostname, + port: url.port || 443, + path: url.pathname, + headers: {}, } + + if (this.credentials?.certificate && this.credentials?.key) { + LOG.debug("Using mTLS authentication for malware scanning") + + const cert = new crypto.X509Certificate(this.credentials.certificate) + const expiryDate = new Date(cert.validTo) + const now = Date.now() + const msIn30Days = 30 * 24 * 60 * 60 * 1000 + + if (expiryDate.getTime() < now) { + LOG.error("Malware scanner certificate expired", { + validTo: cert.validTo, + }) + throw new Error("Malware scanner certificate expired") + } else if (expiryDate.getTime() - now < msIn30Days) { + LOG.warn("Malware scanner certificate expiring soon", { + validTo: cert.validTo, + }) + } + + requestOptions.cert = this.credentials.certificate + requestOptions.key = this.credentials.key + requestOptions.rejectUnauthorized = false + LOG.debug("Using mTLS authorization") + } else if (this.credentials?.username && this.credentials?.password) { + LOG.warn( + "Deprecated: Basic Authentication for malware scanning is deprecated and will be removed in future releases.", + ) + requestOptions.headers.Authorization = + "Basic " + + Buffer.from( + `${this.credentials.username}:${this.credentials.password}`, + "binary", + ).toString("base64") + LOG.debug("Using basic authorization") + } else { + throw new Error( + "Could not find any credentials to authenticate against malware scanning service, please make sure binding and service key exists.", + ) + } + + return requestOptions + } + + _calculateRetryDelay(err, attempt) { + const { initialDelay, maxDelay } = this.retryConfig + const retryAfter = err.response?.headers?.["retry-after"] + + if (retryAfter) { + return Math.min(parseInt(retryAfter, 10) * 1000, maxDelay) + } + + const jitter = Math.random() * 500 + return Math.min(initialDelay * Math.pow(2, attempt - 1) + jitter, maxDelay) } } diff --git a/srv/malware-scanner/semaphore.js b/srv/malware-scanner/semaphore.js new file mode 100644 index 00000000..fa9e2706 --- /dev/null +++ b/srv/malware-scanner/semaphore.js @@ -0,0 +1,26 @@ +class Semaphore { + constructor(max) { + this._max = max + this._current = 0 + this._queue = [] + } + + acquire() { + if (this._current < this._max) { + this._current++ + return Promise.resolve() + } + return new Promise((resolve) => this._queue.push(resolve)) + } + + release() { + if (this._queue.length > 0) { + const next = this._queue.shift() + next() + } else { + this._current-- + } + } +} + +module.exports = Semaphore diff --git a/tests/incidents-app/package.json b/tests/incidents-app/package.json index 4958f4b3..c010759f 100644 --- a/tests/incidents-app/package.json +++ b/tests/incidents-app/package.json @@ -19,10 +19,8 @@ "requires": { "queue": { "parallel": true, - "chunkSize": 20 - }, - "attachments": { - "scanExpiryMs": 30000 + "chunkSize": 50, + "maxAttempts": 2 }, "auth": { "[development]": { diff --git a/tests/integration/attachments.test.js b/tests/integration/attachments.test.js index c1246df7..bf7905ae 100644 --- a/tests/integration/attachments.test.js +++ b/tests/integration/attachments.test.js @@ -3,7 +3,6 @@ const { RequestSend } = require("../utils/api") const { waitForScanStatus, newIncident, - delay, waitForMalwareDeletion, waitForDeletion, runWithUser, @@ -264,9 +263,12 @@ describe("Tests for uploading/deleting attachments through API calls", () => { ) expect(contentResponse.status).toEqual(200) expect(contentResponse.data).toBeTruthy() - - // Wait for 45 seconds to let the scan status expire - await delay(45 * 1000) + const Incidents_attachments = cds.entities("sap.capire.incidents")[ + "Incidents.attachments" + ] + await UPDATE.entity(Incidents_attachments) + .where({ ID: sampleDocID }) + .set({ lastScan: "2020-01-01T00:00:00" }) await GET( `odata/v4/processor/Incidents(ID=${incidentID},IsActiveEntity=true)/attachments(up__ID=${incidentID},ID=${sampleDocID},IsActiveEntity=true)/content`, @@ -1851,7 +1853,6 @@ describe("Tests for uploading/deleting attachments through API calls", () => { ) expect(resultResponse.status).toEqual(200) - const scanCleanWaiter2 = waitForScanStatus("Clean") try { await waitForDeletion(attachmentResponse.data.value[0].url) // Should throw due to timeout @@ -1862,11 +1863,6 @@ describe("Tests for uploading/deleting attachments through API calls", () => { ) } - // Second scan round needed due to scan expiry limit for other tests. Triggered via rescan - await GET( - `odata/v4/processor/Incidents(ID=${incidentID},IsActiveEntity=true)/attachments(up__ID=${incidentID},ID=${sampleDocID},IsActiveEntity=true)/content`, - ) - await scanCleanWaiter2 const contentAfterActiveUpdate = await GET( `odata/v4/processor/Incidents(ID=${incidentID},IsActiveEntity=true)/attachments(up__ID=${incidentID},ID=${sampleDocID},IsActiveEntity=true)/content`, ) diff --git a/tests/unit/malwareScanner.test.js b/tests/unit/malwareScanner.test.js index 9043aad6..afa85ecc 100644 --- a/tests/unit/malwareScanner.test.js +++ b/tests/unit/malwareScanner.test.js @@ -4,6 +4,7 @@ const { join } = cds.utils.path cds.test(join(__dirname, "../incidents-app")) const MalwareScanner = require("../../srv/malware-scanner/malwareScanner") +const Semaphore = require("../../srv/malware-scanner/semaphore") let scanner let attachmentsSvc @@ -27,6 +28,12 @@ beforeEach(() => { cds.connect.to = jest.fn().mockResolvedValue(attachmentsSvc) scanner = new MalwareScanner() + scanner.retryConfig = { + enabled: true, + maxAttempts: 5, + initialDelay: 1000, + maxDelay: 30000, + } scanner.scan = jest.fn() }) @@ -55,7 +62,9 @@ describe("scanAttachmentsFile", () => { it("sets status Failed when content stream is null", async () => { attachmentsSvc.get.mockResolvedValue(null) jest.spyOn(scanner, "updateStatus").mockResolvedValue() - await scanner.scanAttachmentsFile({ data: { target, keys } }) + await expect( + scanner.scanAttachmentsFile({ data: { target, keys } }), + ).rejects.toThrow("Cannot fetch file content") expect(scanner.updateStatus).toHaveBeenLastCalledWith( expect.anything(), keys, @@ -90,6 +99,7 @@ describe("scanAttachmentsFile", () => { expect.anything(), expect.anything(), "Infected", + expect.anything(), ) }) @@ -103,6 +113,7 @@ describe("scanAttachmentsFile", () => { expect.anything(), expect.anything(), "Clean", + expect.anything(), ) }) }) @@ -225,3 +236,225 @@ describe("getCredentials", () => { ) }) }) + +// --------------------------------------------------------------------------- +// 429 retry at scanAttachmentsFile level (stream re-fetch) +// --------------------------------------------------------------------------- + +describe("429 retry (stream re-fetch)", () => { + const target = "ProcessorService.Incidents.attachments" + const keys = { up__ID: cds.utils.uuid(), ID: cds.utils.uuid() } + + function make429(headers = {}) { + const err = new Error("429") + err.response = { status: 429, headers } + return err + } + + beforeEach(() => { + scanner.retryConfig = { + enabled: true, + maxAttempts: 3, + initialDelay: 1, + maxDelay: 10, + } + jest.spyOn(scanner, "updateStatus").mockResolvedValue() + }) + + it("fetches stream once on first success (no retry needed)", async () => { + attachmentsSvc.get.mockResolvedValue({}) + scanner.scan.mockResolvedValue({ isMalware: false, hash: "h" }) + + await scanner.scanAttachmentsFile({ data: { target, keys } }) + expect(attachmentsSvc.get).toHaveBeenCalledTimes(1) + expect(scanner.scan).toHaveBeenCalledTimes(1) + }) + + it("re-fetches stream on 429 then succeeds", async () => { + attachmentsSvc.get.mockResolvedValue({}) + scanner.scan + .mockRejectedValueOnce(make429()) + .mockResolvedValueOnce({ isMalware: false, hash: "h" }) + + await scanner.scanAttachmentsFile({ data: { target, keys } }) + expect(attachmentsSvc.get).toHaveBeenCalledTimes(2) + expect(scanner.scan).toHaveBeenCalledTimes(2) + }) + + it("sets status Failed after exhausting all retry attempts on 429", async () => { + attachmentsSvc.get.mockResolvedValue({}) + scanner.scan + .mockRejectedValueOnce(make429()) + .mockRejectedValueOnce(make429()) + .mockRejectedValueOnce(make429()) + + await expect( + scanner.scanAttachmentsFile({ data: { target, keys } }), + ).rejects.toThrow("Malware scan failed after 3 attempts (last status: 429)") + expect(attachmentsSvc.get).toHaveBeenCalledTimes(3) + expect(scanner.updateStatus).toHaveBeenLastCalledWith( + expect.anything(), + keys, + "Failed", + ) + }) + + it("does not retry on non-429 errors", async () => { + attachmentsSvc.get.mockResolvedValue({}) + scanner.scan.mockRejectedValue(new Error("boom")) + + await expect( + scanner.scanAttachmentsFile({ data: { target, keys } }), + ).rejects.toThrow("boom") + expect(attachmentsSvc.get).toHaveBeenCalledTimes(1) + expect(scanner.scan).toHaveBeenCalledTimes(1) + expect(scanner.updateStatus).toHaveBeenLastCalledWith( + expect.anything(), + keys, + "Failed", + ) + }) + + it("respects Retry-After header (capped at maxDelay)", async () => { + attachmentsSvc.get.mockResolvedValue({}) + scanner.scan + .mockRejectedValueOnce(make429({ "retry-after": "1" })) + .mockResolvedValueOnce({ isMalware: false, hash: "h" }) + + const start = Date.now() + await scanner.scanAttachmentsFile({ data: { target, keys } }) + // Retry-After: 1s = 1000ms, capped at maxDelay (10ms in test config) + const elapsed = Date.now() - start + expect(elapsed).toBeLessThan(500) + expect(attachmentsSvc.get).toHaveBeenCalledTimes(2) + }) + + it("does not retry when retry is disabled", async () => { + scanner.retryConfig.enabled = false + attachmentsSvc.get.mockResolvedValue({}) + scanner.scan.mockRejectedValue(make429()) + + await expect( + scanner.scanAttachmentsFile({ data: { target, keys } }), + ).rejects.toThrow("Malware scan failed after 1 attempts (last status: 429)") + expect(attachmentsSvc.get).toHaveBeenCalledTimes(1) + expect(scanner.scan).toHaveBeenCalledTimes(1) + }) +}) + +// --------------------------------------------------------------------------- +// Semaphore / concurrency limiting +// --------------------------------------------------------------------------- + +describe("concurrency limiting", () => { + const target = "ProcessorService.Incidents.attachments" + const keys = { up__ID: cds.utils.uuid(), ID: cds.utils.uuid() } + const noRetry = { + enabled: false, + maxAttempts: 1, + initialDelay: 1000, + maxDelay: 30000, + } + + beforeEach(() => { + attachmentsSvc.get.mockResolvedValue({}) + }) + + function mockScanWithConcurrencyTracking() { + let activeConcurrent = 0 + let peakConcurrent = 0 + const resolvers = [] + + scanner.scan = jest.fn(() => { + activeConcurrent++ + peakConcurrent = Math.max(peakConcurrent, activeConcurrent) + return new Promise((resolve) => { + resolvers.push(() => { + activeConcurrent-- + resolve({ isMalware: false, hash: "h" }) + }) + }) + }) + jest.spyOn(scanner, "updateStatus").mockResolvedValue() + + return { + get activeConcurrent() { + return activeConcurrent + }, + get peakConcurrent() { + return peakConcurrent + }, + resolvers, + } + } + + it("limits concurrent scans to maxConcurrentScans", async () => { + scanner.semaphore = new Semaphore(2) + scanner.retryConfig = noRetry + const tracking = mockScanWithConcurrencyTracking() + + const promises = [] + for (let i = 0; i < 5; i++) { + promises.push(scanner.scanAttachmentsFile({ data: { target, keys } })) + } + + await new Promise((r) => setTimeout(r, 50)) + expect(tracking.activeConcurrent).toBe(2) + expect(tracking.resolvers).toHaveLength(2) + + tracking.resolvers.shift()() + tracking.resolvers.shift()() + await new Promise((r) => setTimeout(r, 50)) + expect(tracking.activeConcurrent).toBe(2) + + while (tracking.resolvers.length) tracking.resolvers.shift()() + await new Promise((r) => setTimeout(r, 50)) + while (tracking.resolvers.length) tracking.resolvers.shift()() + + await Promise.all(promises) + expect(tracking.peakConcurrent).toBe(2) + }) + + it("allows unbounded parallelism when semaphore is null", async () => { + scanner.semaphore = null + scanner.retryConfig = noRetry + const tracking = mockScanWithConcurrencyTracking() + + const promises = [] + for (let i = 0; i < 5; i++) { + promises.push(scanner.scanAttachmentsFile({ data: { target, keys } })) + } + + await new Promise((r) => setTimeout(r, 50)) + expect(tracking.activeConcurrent).toBe(5) + expect(tracking.peakConcurrent).toBe(5) + + while (tracking.resolvers.length) tracking.resolvers.shift()() + await Promise.all(promises) + }) + + it("holds semaphore slot during scan (slot not released until complete)", async () => { + scanner.semaphore = new Semaphore(1) + scanner.retryConfig = noRetry + const tracking = mockScanWithConcurrencyTracking() + + let scan2Started = false + const p1 = scanner.scanAttachmentsFile({ data: { target, keys } }) + const p2 = scanner + .scanAttachmentsFile({ data: { target, keys } }) + .then(() => { + scan2Started = true + }) + + await new Promise((r) => setTimeout(r, 30)) + expect(tracking.resolvers).toHaveLength(1) + + tracking.resolvers.shift()() + await new Promise((r) => setTimeout(r, 30)) + expect(tracking.resolvers).toHaveLength(1) + tracking.resolvers.shift()() + + await Promise.all([p1, p2]) + expect(scan2Started).toBe(true) + }) +}) diff --git a/tests/utils/testUtils.js b/tests/utils/testUtils.js index 81e71c2f..2022f241 100644 --- a/tests/utils/testUtils.js +++ b/tests/utils/testUtils.js @@ -30,7 +30,9 @@ async function waitForScanStatus(status, attachmentID) { .where.some((e) => e.val && e.val === attachmentID)) || (req.query.UPDATE.where && req.query.UPDATE.where.some( - (e) => e.val && e.val === attachmentID, + (e) => + (e.val && e.val === attachmentID) || + (e.xpr && e.xpr.some((e) => e.val && e.val === attachmentID)), ))) ) { // Store the latest status for timeout reporting @@ -44,9 +46,12 @@ async function waitForScanStatus(status, attachmentID) { } db.after("*", handler) }), - delay(40000).then(() => { + delay(40000).then(async () => { + const { messagesAmount } = await SELECT.one + .from("cds.outbox.Messages") + .columns("count(1) as messagesAmount") throw new Error( - `Timeout waiting for attachment ${attachmentID || ""} to reach status: ${status}, last known status: ${latestStatus}`, + `Timeout waiting for attachment ${attachmentID || ""} to reach status: ${status}, last known status: ${latestStatus}. ${messagesAmount} messages in outbox.`, ) }), ]) @@ -72,9 +77,12 @@ async function waitForDeletion(attachmentID) { } AttachmentsSrv.on("DeleteAttachment", handler) }), - delay(30000).then(() => { + delay(30000).then(async () => { + const { messagesAmount } = await SELECT.one + .from("cds.outbox.Messages") + .columns("count(1) as messagesAmount") throw new Error( - `Timeout waiting for deletion of attachment ID: ${attachmentID}`, + `Timeout waiting for deletion of attachment ID: ${attachmentID}. ${messagesAmount} messages in outbox.`, ) }), ]) @@ -103,9 +111,12 @@ async function waitForMalwareDeletion(attachmentID) { } AttachmentsSrv.on("DeleteInfectedAttachment", handler) }), - delay(30000).then(() => { + delay(30000).then(async () => { + const { messagesAmount } = await SELECT.one + .from("cds.outbox.Messages") + .columns("count(1) as messagesAmount") throw new Error( - `Timeout waiting for malware deletion of attachment ID: ${attachmentID}`, + `Timeout waiting for malware deletion of attachment ID: ${attachmentID}. ${messagesAmount} messages in outbox.`, ) }), ])