diff --git a/packages/sdk/createKarmaConfig.ts b/packages/sdk/createKarmaConfig.ts index e455504467..bf322dbbfd 100644 --- a/packages/sdk/createKarmaConfig.ts +++ b/packages/sdk/createKarmaConfig.ts @@ -17,6 +17,7 @@ export function createKarmaConfig(testPaths: string[]): ReturnType/src/_jest/createSignatureValidationWorker.ts", "^@/createSigningWorker$": "/src/_jest/createSigningWorker.ts", + "^@/createEncryptionWorker$": "/src/_jest/createEncryptionWorker.ts", "^@/(.*)$": "/src/_nodejs/$1", }, transform: { diff --git a/packages/sdk/rollup.config.mts b/packages/sdk/rollup.config.mts index 35efbadc6e..671ebc24de 100644 --- a/packages/sdk/rollup.config.mts +++ b/packages/sdk/rollup.config.mts @@ -36,6 +36,7 @@ const browserAliases: Alias[] = [ const WORKERS: Record = { 'SignatureValidationWorker': 'signature/SignatureValidationWorker', 'SigningWorker': 'signature/SigningWorker', + 'EncryptionWorker': 'encryption/EncryptionWorker', } export default defineConfig([ diff --git a/packages/sdk/src/_browser/createEncryptionWorker.ts b/packages/sdk/src/_browser/createEncryptionWorker.ts new file mode 100644 index 0000000000..c24bed64a7 --- /dev/null +++ b/packages/sdk/src/_browser/createEncryptionWorker.ts @@ -0,0 +1,11 @@ +/** + * Browser-specific encryption worker factory. + */ +import Worker from 'web-worker' + +export function createEncryptionWorker(): InstanceType { + return new Worker( + new URL('./workers/EncryptionWorker.browser.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_jest/createEncryptionWorker.ts b/packages/sdk/src/_jest/createEncryptionWorker.ts new file mode 100644 index 0000000000..bd9b168e0b --- /dev/null +++ b/packages/sdk/src/_jest/createEncryptionWorker.ts @@ -0,0 +1,12 @@ +/** + * Jest-specific encryption worker factory. + * Points to the built worker in dist/ for testing. + */ +import Worker from 'web-worker' + +export function createEncryptionWorker(): InstanceType { + return new Worker( + new URL('../../dist/workers/EncryptionWorker.node.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_karma/createEncryptionWorker.ts b/packages/sdk/src/_karma/createEncryptionWorker.ts new file mode 100644 index 0000000000..cba587f86b --- /dev/null +++ b/packages/sdk/src/_karma/createEncryptionWorker.ts @@ -0,0 +1,12 @@ +/** + * Karma-specific encryption worker factory. + * Points to the built worker in dist/ for browser testing. + */ +import Worker from 'web-worker' + +export function createEncryptionWorker(): InstanceType { + return new Worker( + new URL('../../dist/workers/EncryptionWorker.browser.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_nodejs/createEncryptionWorker.ts b/packages/sdk/src/_nodejs/createEncryptionWorker.ts new file mode 100644 index 0000000000..39a105676f --- /dev/null +++ b/packages/sdk/src/_nodejs/createEncryptionWorker.ts @@ -0,0 +1,11 @@ +/** + * Node.js-specific encryption worker factory. + */ +import Worker from 'web-worker' + +export function createEncryptionWorker(): InstanceType { + return new Worker( + new URL('./workers/EncryptionWorker.node.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/encryption/EncryptionService.ts b/packages/sdk/src/encryption/EncryptionService.ts new file mode 100644 index 0000000000..cea16da6f6 --- /dev/null +++ b/packages/sdk/src/encryption/EncryptionService.ts @@ -0,0 +1,130 @@ +/** + * Singleton encryption service using Web Worker. + * This offloads CPU-intensive AES encryption operations to a separate thread. + * Works in both browser and Node.js environments via platform-specific config. + * + * The worker is lazily initialized on first use and shared across all consumers. + */ +import { wrap, releaseProxy, transfer, type Remote } from 'comlink' +import { Lifecycle, scoped } from 'tsyringe' +import { EncryptedGroupKey } from '@streamr/trackerless-network' +import { createEncryptionWorker } from '@/createEncryptionWorker' +import type { EncryptionWorkerApi } from './EncryptionWorker' +import { DestroySignal } from '../DestroySignal' +import { StreamrClientError } from '../StreamrClientError' +import { GroupKey } from './GroupKey' + +@scoped(Lifecycle.ContainerScoped) +export class EncryptionService { + private worker: ReturnType | undefined + private workerApi: Remote | undefined + + constructor(destroySignal: DestroySignal) { + destroySignal.onDestroy.listen(() => this.destroy()) + } + + private getWorkerApi(): Remote { + if (this.workerApi === undefined) { + this.worker = createEncryptionWorker() + this.workerApi = wrap(this.worker) + } + return this.workerApi + } + + /** + * Encrypt data using AES-256-CTR. + * Note: The input data buffer is transferred to the worker and becomes unusable after this call. + */ + async encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Promise { + // Ensure we have plain Uint8Array instances for worker communication (not Buffer subclass) + const dataArray = new Uint8Array(data) + const keyArray = new Uint8Array(cipherKey) + const result = await this.getWorkerApi().encrypt( + transfer({ data: dataArray, cipherKey: keyArray }, [dataArray.buffer]) + ) + if (result.type === 'error') { + throw new Error(`AES encryption failed: ${result.message}`) + } + return result.data + } + + /** + * Encrypt the next group key using the current group key. + */ + async encryptNextGroupKey(currentKey: GroupKey, nextKey: GroupKey): Promise { + // Convert Buffer to Uint8Array for worker communication + const result = await this.getWorkerApi().encryptGroupKey({ + nextGroupKeyId: nextKey.id, + nextGroupKeyData: new Uint8Array(nextKey.data), + currentGroupKeyData: new Uint8Array(currentKey.data) + }) + if (result.type === 'error') { + throw new Error(`Group key encryption failed: ${result.message}`) + } + return { + id: result.id, + data: result.data + } + } + + /** + * Decrypt an encrypted group key using the current group key. + */ + async decryptNextGroupKey(currentKey: GroupKey, encryptedKey: EncryptedGroupKey): Promise { + // Convert Buffer to Uint8Array for worker communication + const result = await this.getWorkerApi().decryptGroupKey({ + encryptedGroupKeyId: encryptedKey.id, + encryptedGroupKeyData: new Uint8Array(encryptedKey.data), + currentGroupKeyData: new Uint8Array(currentKey.data) + }) + if (result.type === 'error') { + throw new Error(`Group key decryption failed: ${result.message}`) + } + return new GroupKey(result.id, Buffer.from(result.data)) + } + + /** + * Decrypt a stream message's content and optionally the new group key. + * This combines both operations for efficiency when processing messages. + * Note: The input content buffer is transferred to the worker and becomes unusable after this call. + */ + async decryptStreamMessage( + content: Uint8Array, + groupKey: GroupKey, + encryptedNewGroupKey?: EncryptedGroupKey + ): Promise<[Uint8Array, GroupKey?]> { + // Convert Buffer to Uint8Array for worker communication + const request = { + content, + groupKeyData: new Uint8Array(groupKey.data), + newGroupKey: encryptedNewGroupKey ? { + id: encryptedNewGroupKey.id, + data: new Uint8Array(encryptedNewGroupKey.data) + } : undefined + } + const result = await this.getWorkerApi().decryptStreamMessage( + transfer(request, [content.buffer]) + ) + if (result.type === 'error') { + throw new StreamrClientError(`AES decryption failed: ${result.message}`, 'DECRYPT_ERROR') + } + + let newGroupKey: GroupKey | undefined + if (result.newGroupKey) { + newGroupKey = new GroupKey(result.newGroupKey.id, Buffer.from(result.newGroupKey.data)) + } + + return [result.content, newGroupKey] + } + + destroy(): void { + if (this.workerApi !== undefined) { + this.workerApi[releaseProxy]() + this.workerApi = undefined + } + if (this.worker !== undefined) { + this.worker.terminate() + this.worker = undefined + } + } +} diff --git a/packages/sdk/src/encryption/EncryptionUtil.ts b/packages/sdk/src/encryption/EncryptionUtil.ts index b0bb87f5e2..47d983a51d 100644 --- a/packages/sdk/src/encryption/EncryptionUtil.ts +++ b/packages/sdk/src/encryption/EncryptionUtil.ts @@ -1,17 +1,19 @@ import { ml_kem1024 } from '@noble/post-quantum/ml-kem' import { randomBytes } from '@noble/post-quantum/utils' -import { StreamMessageAESEncrypted } from '../protocol/StreamMessage' -import { StreamrClientError } from '../StreamrClientError' -import { GroupKey } from './GroupKey' import { AsymmetricEncryptionType } from '@streamr/trackerless-network' -import { binaryToUtf8, createCipheriv, createDecipheriv, getSubtle, privateDecrypt, publicEncrypt } from '@streamr/utils' - -export const INITIALIZATION_VECTOR_LENGTH = 16 +import { binaryToUtf8, getSubtle, privateDecrypt, publicEncrypt } from '@streamr/utils' +import { decryptWithAES, encryptWithAES } from './aesUtils' const INFO = Buffer.from('streamr-key-exchange') const KEM_CIPHER_LENGTH_BYTES = 1568 const KDF_SALT_LENGTH_BYTES = 64 +/** + * Asymmetric encryption utility class for RSA and ML-KEM (post-quantum) key exchange. + * + * For AES symmetric encryption of stream messages, use EncryptionService instead. + * This class only handles asymmetric encryption for key exchange operations. + */ // eslint-disable-next-line @typescript-eslint/no-extraneous-class export class EncryptionUtil { /** @@ -116,7 +118,7 @@ export class EncryptionUtil { const wrappingAESKey = await this.deriveAESWrapperKey(sharedSecret, kdfSalt) // Encrypt plaintext with the AES wrapping key - const aesEncryptedPlaintext = this.encryptWithAES(plaintextBuffer, Buffer.from(wrappingAESKey)) + const aesEncryptedPlaintext = encryptWithAES(plaintextBuffer, wrappingAESKey) // Concatenate the deliverables into a binary package return Buffer.concat([kemCipher, kdfSalt, aesEncryptedPlaintext]) @@ -138,44 +140,6 @@ export class EncryptionUtil { const wrappingAESKey = await this.deriveAESWrapperKey(sharedSecret, kdfSalt) // Decrypt the aesEncryptedPlaintext - return this.decryptWithAES(aesEncryptedPlaintext, Buffer.from(wrappingAESKey)) - } - - /* - * Returns a hex string without the '0x' prefix. - */ - static encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Uint8Array { - const iv = randomBytes(INITIALIZATION_VECTOR_LENGTH) // always need a fresh IV when using CTR mode - const cipher = createCipheriv('aes-256-ctr', cipherKey, iv) - return Buffer.concat([iv, cipher.update(data), cipher.final()]) - } - - /* - * 'ciphertext' must be a hex string (without '0x' prefix), 'groupKey' must be a GroupKey. Returns a Buffer. - */ - static decryptWithAES(cipher: Uint8Array, cipherKey: Uint8Array): Buffer { - const iv = cipher.slice(0, INITIALIZATION_VECTOR_LENGTH) - const decipher = createDecipheriv('aes-256-ctr', cipherKey, iv) - return Buffer.concat([decipher.update(cipher.slice(INITIALIZATION_VECTOR_LENGTH)), decipher.final()]) - } - - static decryptStreamMessage(streamMessage: StreamMessageAESEncrypted, groupKey: GroupKey): [Uint8Array, GroupKey?] | never { - let content: Uint8Array - try { - content = this.decryptWithAES(streamMessage.content, groupKey.data) - } catch { - throw new StreamrClientError('AES decryption failed', 'DECRYPT_ERROR', streamMessage) - } - - let newGroupKey: GroupKey | undefined = undefined - if (streamMessage.newGroupKey) { - try { - newGroupKey = groupKey.decryptNextGroupKey(streamMessage.newGroupKey) - } catch { - throw new StreamrClientError('Could not decrypt new encryption key', 'DECRYPT_ERROR', streamMessage) - } - } - - return [content, newGroupKey] + return Buffer.from(decryptWithAES(aesEncryptedPlaintext, wrappingAESKey)) } } diff --git a/packages/sdk/src/encryption/EncryptionWorker.ts b/packages/sdk/src/encryption/EncryptionWorker.ts new file mode 100644 index 0000000000..23a0fb445d --- /dev/null +++ b/packages/sdk/src/encryption/EncryptionWorker.ts @@ -0,0 +1,86 @@ +/** + * Web Worker for AES encryption operations. + * Offloads CPU-intensive cryptographic operations to a separate thread. + */ +import { expose, transfer } from 'comlink' +import { encryptWithAES } from './aesUtils' +import { + encryptNextGroupKey, + decryptNextGroupKey, + decryptStreamMessageContent, + AESEncryptRequest, + EncryptGroupKeyRequest, + DecryptGroupKeyRequest, + DecryptStreamMessageRequest, + AESEncryptResult, + EncryptGroupKeyResult, + DecryptGroupKeyResult, + DecryptStreamMessageResult +} from './encryptionUtils' + +const workerApi = { + encrypt: async (request: AESEncryptRequest): Promise => { + try { + const data = encryptWithAES(request.data, request.cipherKey) + return transfer({ type: 'success', data }, [data.buffer]) + } catch (err) { + return { type: 'error', message: String(err) } + } + }, + + encryptGroupKey: async (request: EncryptGroupKeyRequest): Promise => { + try { + const result = encryptNextGroupKey( + request.nextGroupKeyId, + request.nextGroupKeyData, + request.currentGroupKeyData + ) + return transfer( + { type: 'success', id: result.id, data: result.data }, + [result.data.buffer] + ) + } catch (err) { + return { type: 'error', message: String(err) } + } + }, + + decryptGroupKey: async (request: DecryptGroupKeyRequest): Promise => { + try { + const result = decryptNextGroupKey( + request.encryptedGroupKeyId, + request.encryptedGroupKeyData, + request.currentGroupKeyData + ) + return transfer( + { type: 'success', id: result.id, data: result.data }, + [result.data.buffer] + ) + } catch (err) { + return { type: 'error', message: String(err) } + } + }, + + decryptStreamMessage: async (request: DecryptStreamMessageRequest): Promise => { + try { + const result = decryptStreamMessageContent( + request.content, + request.groupKeyData, + request.newGroupKey + ) + const transferables: ArrayBuffer[] = [result.content.buffer as ArrayBuffer] + if (result.newGroupKey) { + transferables.push(result.newGroupKey.data.buffer as ArrayBuffer) + } + return transfer( + { type: 'success', content: result.content, newGroupKey: result.newGroupKey }, + transferables + ) + } catch (err) { + return { type: 'error', message: String(err) } + } + } +} + +export type EncryptionWorkerApi = typeof workerApi + +expose(workerApi) diff --git a/packages/sdk/src/encryption/GroupKey.ts b/packages/sdk/src/encryption/GroupKey.ts index 9dda22dbd2..c6d534e874 100644 --- a/packages/sdk/src/encryption/GroupKey.ts +++ b/packages/sdk/src/encryption/GroupKey.ts @@ -1,7 +1,6 @@ -import { EncryptedGroupKey } from '@streamr/trackerless-network' -import { uuid } from '../utils/uuid' -import { EncryptionUtil } from './EncryptionUtil' import { randomBytes } from '@noble/post-quantum/utils' +import { uuid } from '../utils/uuid' + export class GroupKeyError extends Error { public groupKey?: GroupKey @@ -15,6 +14,9 @@ export class GroupKeyError extends Error { /** * GroupKeys are AES cipher keys, which are used to encrypt/decrypt StreamMessages (when encryptionType is AES). * Each group key contains 256 random bits of key data and an UUID. + * + * For encryption/decryption of group keys, use EncryptionService.encryptNextGroupKey() + * and EncryptionService.decryptNextGroupKey(). */ export class GroupKey { @@ -67,21 +69,4 @@ export class GroupKey { const keyBytes = randomBytes(32) return new GroupKey(id, Buffer.from(keyBytes)) } - - /** @internal */ - encryptNextGroupKey(nextGroupKey: GroupKey): EncryptedGroupKey { - return { - id: nextGroupKey.id, - data: EncryptionUtil.encryptWithAES(nextGroupKey.data, this.data) - } - } - - /** @internal */ - decryptNextGroupKey(nextGroupKey: EncryptedGroupKey): GroupKey { - return new GroupKey( - nextGroupKey.id, - EncryptionUtil.decryptWithAES(nextGroupKey.data, this.data) - ) - } - } diff --git a/packages/sdk/src/encryption/aesUtils.ts b/packages/sdk/src/encryption/aesUtils.ts new file mode 100644 index 0000000000..14744019ed --- /dev/null +++ b/packages/sdk/src/encryption/aesUtils.ts @@ -0,0 +1,42 @@ +/** + * Low-level AES-256-CTR encryption utilities. + * Shared between EncryptionUtil (for ML-KEM key wrapping) and encryptionUtils (for stream message encryption). + */ +import { randomBytes } from '@noble/post-quantum/utils' +import { createCipheriv, createDecipheriv } from '@streamr/utils' + +export const INITIALIZATION_VECTOR_LENGTH = 16 + +/** + * Concatenate multiple Uint8Arrays into a single Uint8Array. + */ +function concatBytes(...arrays: Uint8Array[]): Uint8Array { + const totalLength = arrays.reduce((sum, arr) => sum + arr.length, 0) + const result = new Uint8Array(totalLength) + let offset = 0 + for (const arr of arrays) { + result.set(arr, offset) + offset += arr.length + } + return result +} + +/** + * Encrypt data using AES-256-CTR. + * Returns IV prepended to ciphertext. + */ +export function encryptWithAES(data: Uint8Array, cipherKey: Uint8Array): Uint8Array { + const iv = randomBytes(INITIALIZATION_VECTOR_LENGTH) // always need a fresh IV when using CTR mode + const cipher = createCipheriv('aes-256-ctr', cipherKey, iv) + return concatBytes(iv, cipher.update(data), cipher.final()) +} + +/** + * Decrypt AES-256-CTR encrypted data. + * Expects IV prepended to ciphertext. + */ +export function decryptWithAES(cipher: Uint8Array, cipherKey: Uint8Array): Uint8Array { + const iv = cipher.slice(0, INITIALIZATION_VECTOR_LENGTH) + const decipher = createDecipheriv('aes-256-ctr', cipherKey, iv) + return concatBytes(decipher.update(cipher.slice(INITIALIZATION_VECTOR_LENGTH)), decipher.final()) +} diff --git a/packages/sdk/src/encryption/decrypt.ts b/packages/sdk/src/encryption/decrypt.ts index 9103cae261..2b434e437b 100644 --- a/packages/sdk/src/encryption/decrypt.ts +++ b/packages/sdk/src/encryption/decrypt.ts @@ -1,8 +1,8 @@ import { EncryptionType } from '@streamr/trackerless-network' import { DestroySignal } from '../DestroySignal' -import { EncryptionUtil } from '../encryption/EncryptionUtil' import { GroupKey } from '../encryption/GroupKey' import { GroupKeyManager } from '../encryption/GroupKeyManager' +import { EncryptionService } from '../encryption/EncryptionService' import { StreamMessage, StreamMessageAESEncrypted } from '../protocol/StreamMessage' import { StreamrClientError } from '../StreamrClientError' @@ -12,6 +12,7 @@ import { StreamrClientError } from '../StreamrClientError' export const decrypt = async ( streamMessage: StreamMessageAESEncrypted, groupKeyManager: GroupKeyManager, + encryptionService: EncryptionService, destroySignal: DestroySignal, ): Promise => { if (destroySignal.isDestroyed()) { @@ -33,7 +34,22 @@ export const decrypt = async ( if (destroySignal.isDestroyed()) { return streamMessage } - const [content, newGroupKey] = EncryptionUtil.decryptStreamMessage(streamMessage, groupKey) + + let content: Uint8Array + let newGroupKey: GroupKey | undefined + try { + [content, newGroupKey] = await encryptionService.decryptStreamMessage( + streamMessage.content, + groupKey, + streamMessage.newGroupKey + ) + } catch (err) { + if (err instanceof StreamrClientError) { + throw new StreamrClientError(err.message, 'DECRYPT_ERROR', streamMessage) + } + throw new StreamrClientError('AES decryption failed', 'DECRYPT_ERROR', streamMessage) + } + if (newGroupKey !== undefined) { await groupKeyManager.addKeyToLocalStore(newGroupKey, streamMessage.getPublisherId()) } diff --git a/packages/sdk/src/encryption/encryptionUtils.ts b/packages/sdk/src/encryption/encryptionUtils.ts new file mode 100644 index 0000000000..5b8164a359 --- /dev/null +++ b/packages/sdk/src/encryption/encryptionUtils.ts @@ -0,0 +1,104 @@ +/** + * Higher-level encryption logic - shared between worker and main thread implementations. + * This file contains pure cryptographic functions without any network dependencies. + * + * For low-level AES operations, see aesUtils.ts + */ +import { decryptWithAES, encryptWithAES } from './aesUtils' + +/** + * Request types for worker communication + */ +export interface AESEncryptRequest { + data: Uint8Array + cipherKey: Uint8Array +} + +export interface EncryptGroupKeyRequest { + nextGroupKeyId: string + nextGroupKeyData: Uint8Array + currentGroupKeyData: Uint8Array +} + +export interface DecryptGroupKeyRequest { + encryptedGroupKeyId: string + encryptedGroupKeyData: Uint8Array + currentGroupKeyData: Uint8Array +} + +export interface DecryptStreamMessageRequest { + content: Uint8Array + groupKeyData: Uint8Array + newGroupKey?: { + id: string + data: Uint8Array + } +} + +/** + * Result types for worker communication + */ +export type AESEncryptResult = + | { type: 'success', data: Uint8Array } + | { type: 'error', message: string } + +export type EncryptGroupKeyResult = + | { type: 'success', id: string, data: Uint8Array } + | { type: 'error', message: string } + +export type DecryptGroupKeyResult = + | { type: 'success', id: string, data: Uint8Array } + | { type: 'error', message: string } + +export type DecryptStreamMessageResult = + | { type: 'success', content: Uint8Array, newGroupKey?: { id: string, data: Uint8Array } } + | { type: 'error', message: string } + +/** + * Encrypt a next group key using the current group key. + */ +export function encryptNextGroupKey( + nextGroupKeyId: string, + nextGroupKeyData: Uint8Array, + currentGroupKeyData: Uint8Array +): { id: string, data: Uint8Array } { + return { + id: nextGroupKeyId, + data: encryptWithAES(nextGroupKeyData, currentGroupKeyData) + } +} + +/** + * Decrypt an encrypted group key using the current group key. + */ +export function decryptNextGroupKey( + encryptedGroupKeyId: string, + encryptedGroupKeyData: Uint8Array, + currentGroupKeyData: Uint8Array +): { id: string, data: Uint8Array } { + return { + id: encryptedGroupKeyId, + data: decryptWithAES(encryptedGroupKeyData, currentGroupKeyData) + } +} + +/** + * Decrypt a stream message content and optionally the new group key. + */ +export function decryptStreamMessageContent( + content: Uint8Array, + groupKeyData: Uint8Array, + newGroupKey?: { id: string, data: Uint8Array } +): { content: Uint8Array, newGroupKey?: { id: string, data: Uint8Array } } { + const decryptedContent = decryptWithAES(content, groupKeyData) + + let decryptedNewGroupKey: { id: string, data: Uint8Array } | undefined + if (newGroupKey) { + decryptedNewGroupKey = decryptNextGroupKey(newGroupKey.id, newGroupKey.data, groupKeyData) + } + + return { + content: decryptedContent, + newGroupKey: decryptedNewGroupKey + } +} diff --git a/packages/sdk/src/protocol/StreamMessageTranslator.ts b/packages/sdk/src/protocol/StreamMessageTranslator.ts index 1b3350fab2..12e72800dd 100644 --- a/packages/sdk/src/protocol/StreamMessageTranslator.ts +++ b/packages/sdk/src/protocol/StreamMessageTranslator.ts @@ -86,10 +86,15 @@ export class StreamMessageTranslator { let groupKeyId: string | undefined = undefined if (msg.body.oneofKind === 'contentMessage') { messageType = StreamMessageType.MESSAGE - content = msg.body.contentMessage.content + content = new Uint8Array(msg.body.contentMessage.content) contentType = msg.body.contentMessage.contentType encryptionType = msg.body.contentMessage.encryptionType - newGroupKey = msg.body.contentMessage.newGroupKey + if (msg.body.contentMessage.newGroupKey) { + newGroupKey = { + id: msg.body.contentMessage.newGroupKey.id, + data: new Uint8Array(msg.body.contentMessage.newGroupKey.data) + } + } groupKeyId = msg.body.contentMessage.groupKeyId } else if (msg.body.oneofKind === 'groupKeyRequest') { messageType = StreamMessageType.GROUP_KEY_REQUEST @@ -128,7 +133,7 @@ export class StreamMessageTranslator { messageType, content, contentType, - signature: msg.signature, + signature: new Uint8Array(msg.signature), signatureType: msg.signatureType, encryptionType, groupKeyId, diff --git a/packages/sdk/src/publish/MessageFactory.ts b/packages/sdk/src/publish/MessageFactory.ts index d81c83e90c..db6b17a79c 100644 --- a/packages/sdk/src/publish/MessageFactory.ts +++ b/packages/sdk/src/publish/MessageFactory.ts @@ -5,7 +5,7 @@ import { Identity } from '../identity/Identity' import { getPartitionCount } from '../StreamMetadata' import { StreamrClientError } from '../StreamrClientError' import { StreamRegistry } from '../contracts/StreamRegistry' -import { EncryptionUtil } from '../encryption/EncryptionUtil' +import { EncryptionService } from '../encryption/EncryptionService' import { MessageID } from '../protocol/MessageID' import { MessageRef } from '../protocol/MessageRef' @@ -30,6 +30,7 @@ export interface MessageFactoryOptions { groupKeyQueue: GroupKeyQueue signatureValidator: SignatureValidator messageSigner: MessageSigner + encryptionService: EncryptionService config: Pick } @@ -45,6 +46,7 @@ export class MessageFactory { private readonly groupKeyQueue: GroupKeyQueue private readonly signatureValidator: SignatureValidator private readonly messageSigner: MessageSigner + private readonly encryptionService: EncryptionService private readonly config: Pick private firstMessage = true @@ -55,6 +57,7 @@ export class MessageFactory { this.groupKeyQueue = opts.groupKeyQueue this.signatureValidator = opts.signatureValidator this.messageSigner = opts.messageSigner + this.encryptionService = opts.encryptionService this.config = opts.config this.defaultMessageChainIds = createLazyMap({ valueFactory: async () => { @@ -131,10 +134,10 @@ export class MessageFactory { } if (encryptionType === EncryptionType.AES) { const keySequence = await this.groupKeyQueue.useGroupKey() - rawContent = EncryptionUtil.encryptWithAES(rawContent, keySequence.current.data) + rawContent = await this.encryptionService.encryptWithAES(rawContent, keySequence.current.data) groupKeyId = keySequence.current.id if (keySequence.next !== undefined) { - newGroupKey = keySequence.current.encryptNextGroupKey(keySequence.next) + newGroupKey = await this.encryptionService.encryptNextGroupKey(keySequence.current, keySequence.next) } } diff --git a/packages/sdk/src/publish/Publisher.ts b/packages/sdk/src/publish/Publisher.ts index 89b567b620..5d78a748dc 100644 --- a/packages/sdk/src/publish/Publisher.ts +++ b/packages/sdk/src/publish/Publisher.ts @@ -8,6 +8,7 @@ import { StreamIDBuilder } from '../StreamIDBuilder' import { StreamrClientError } from '../StreamrClientError' import { StreamRegistry } from '../contracts/StreamRegistry' import { getExplicitKey, GroupKeyManager } from '../encryption/GroupKeyManager' +import { EncryptionService } from '../encryption/EncryptionService' import { StreamMessage } from '../protocol/StreamMessage' import { MessageSigner } from '../signature/MessageSigner' import { SignatureValidator } from '../signature/SignatureValidator' @@ -54,6 +55,7 @@ export class Publisher { private readonly identity: Identity private readonly signatureValidator: SignatureValidator private readonly messageSigner: MessageSigner + private readonly encryptionService: EncryptionService private readonly config: StrictStreamrClientConfig constructor( @@ -64,6 +66,7 @@ export class Publisher { @inject(IdentityInjectionToken) identity: Identity, signatureValidator: SignatureValidator, messageSigner: MessageSigner, + encryptionService: EncryptionService, @inject(ConfigInjectionToken) config: StrictStreamrClientConfig, ) { this.node = node @@ -72,6 +75,7 @@ export class Publisher { this.identity = identity this.signatureValidator = signatureValidator this.messageSigner = messageSigner + this.encryptionService = encryptionService this.config = config this.messageFactories = createLazyMap({ valueFactory: async (streamId) => { @@ -142,6 +146,7 @@ export class Publisher { groupKeyQueue: await this.groupKeyQueues.get(streamId), signatureValidator: this.signatureValidator, messageSigner: this.messageSigner, + encryptionService: this.encryptionService, config: this.config, }) } diff --git a/packages/sdk/src/subscribe/MessagePipelineFactory.ts b/packages/sdk/src/subscribe/MessagePipelineFactory.ts index c5c1283660..5c8c1c9d49 100644 --- a/packages/sdk/src/subscribe/MessagePipelineFactory.ts +++ b/packages/sdk/src/subscribe/MessagePipelineFactory.ts @@ -6,6 +6,7 @@ import { DestroySignal } from '../DestroySignal' import { StreamRegistry } from '../contracts/StreamRegistry' import { StreamStorageRegistry } from '../contracts/StreamStorageRegistry' import { GroupKeyManager } from '../encryption/GroupKeyManager' +import { EncryptionService } from '../encryption/EncryptionService' import { StreamMessage } from '../protocol/StreamMessage' import { SignatureValidator } from '../signature/SignatureValidator' import { LoggerFactory } from '../utils/LoggerFactory' @@ -17,6 +18,7 @@ import { Tokens } from '../tokens' type MessagePipelineFactoryOptions = MarkOptional StreamRegistry)) streamRegistry: StreamRegistry, signatureValidator: SignatureValidator, @inject(delay(() => GroupKeyManager)) groupKeyManager: GroupKeyManager, + encryptionService: EncryptionService, @inject(ConfigInjectionToken) config: MessagePipelineOptions['config'], destroySignal: DestroySignal, loggerFactory: LoggerFactory @@ -52,6 +56,7 @@ export class MessagePipelineFactory { this.streamRegistry = streamRegistry this.signatureValidator = signatureValidator this.groupKeyManager = groupKeyManager + this.encryptionService = encryptionService this.config = config this.destroySignal = destroySignal this.loggerFactory = loggerFactory @@ -65,6 +70,7 @@ export class MessagePipelineFactory { streamRegistry: this.streamRegistry, signatureValidator: this.signatureValidator, groupKeyManager: this.groupKeyManager, + encryptionService: this.encryptionService, config: opts.config ?? this.config, destroySignal: this.destroySignal, loggerFactory: this.loggerFactory diff --git a/packages/sdk/src/subscribe/messagePipeline.ts b/packages/sdk/src/subscribe/messagePipeline.ts index 34ec073714..a478625944 100644 --- a/packages/sdk/src/subscribe/messagePipeline.ts +++ b/packages/sdk/src/subscribe/messagePipeline.ts @@ -6,6 +6,7 @@ import type { StrictStreamrClientConfig } from '../ConfigTypes' import { DestroySignal } from '../DestroySignal' import { StreamRegistry } from '../contracts/StreamRegistry' import { GroupKeyManager } from '../encryption/GroupKeyManager' +import { EncryptionService } from '../encryption/EncryptionService' import { decrypt } from '../encryption/decrypt' import { StreamMessage } from '../protocol/StreamMessage' @@ -28,6 +29,7 @@ export interface MessagePipelineOptions { streamRegistry: StreamRegistry signatureValidator: SignatureValidator groupKeyManager: GroupKeyManager + encryptionService: EncryptionService // eslint-disable-next-line max-len config: Pick destroySignal: DestroySignal @@ -71,7 +73,7 @@ export const createMessagePipeline = (opts: MessagePipelineOptions): PushPipelin let decrypted if (StreamMessage.isAESEncrypted(msg)) { try { - decrypted = await decrypt(msg, opts.groupKeyManager, opts.destroySignal) + decrypted = await decrypt(msg, opts.groupKeyManager, opts.encryptionService, opts.destroySignal) } catch (err) { // TODO log this in onError? if we want to log all errors? logger.debug('Failed to decrypt', { messageId: msg.messageId, err }) diff --git a/packages/sdk/test/integration/Resends.test.ts b/packages/sdk/test/integration/Resends.test.ts index 301433a375..98cb477886 100644 --- a/packages/sdk/test/integration/Resends.test.ts +++ b/packages/sdk/test/integration/Resends.test.ts @@ -8,7 +8,7 @@ import { StreamPermission } from '../../src/permission' import { MessageFactory } from '../../src/publish/MessageFactory' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' -import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createMockEncryptionService, createStreamRegistry } from '../test-utils/utils' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { createStrictConfig } from '../../src/Config' @@ -45,6 +45,7 @@ describe('Resends', () => { groupKeyQueue: await createGroupKeyQueue(identity, groupKey), signatureValidator: mock(), messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService(), config: createStrictConfig() }) // store the encryption key publisher's local group key store diff --git a/packages/sdk/test/integration/gap-fill.test.ts b/packages/sdk/test/integration/gap-fill.test.ts index d577ce120b..4b6f37cb47 100644 --- a/packages/sdk/test/integration/gap-fill.test.ts +++ b/packages/sdk/test/integration/gap-fill.test.ts @@ -6,7 +6,14 @@ import { GroupKey } from '../../src/encryption/GroupKey' import { StreamMessage } from '../../src/protocol/StreamMessage' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' -import { createGroupKeyQueue, createMessageSigner, createStreamRegistry, createTestStream, startFailingStorageNode } from '../test-utils/utils' +import { + createGroupKeyQueue, + createMessageSigner, + createMockEncryptionService, + createStreamRegistry, + createTestStream, + startFailingStorageNode +} from '../test-utils/utils' import { Stream } from './../../src/Stream' import { MessageFactory } from './../../src/publish/MessageFactory' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' @@ -45,6 +52,7 @@ describe('gap fill', () => { groupKeyQueue: await createGroupKeyQueue(identity, GROUP_KEY), signatureValidator: mock(), messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService(), config: createStrictConfig() }) }) diff --git a/packages/sdk/test/integration/parallel-key-exchange.test.ts b/packages/sdk/test/integration/parallel-key-exchange.test.ts index ff6f872ed7..73ec30e7da 100644 --- a/packages/sdk/test/integration/parallel-key-exchange.test.ts +++ b/packages/sdk/test/integration/parallel-key-exchange.test.ts @@ -10,7 +10,7 @@ import { StreamPermission } from '../../src/permission' import { StreamMessageType } from '../../src/protocol/StreamMessage' import { MessageFactory } from '../../src/publish/MessageFactory' import { SignatureValidator } from '../../src/signature/SignatureValidator' -import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createMockEncryptionService, createStreamRegistry } from '../test-utils/utils' import { FakeEnvironment } from './../test-utils/fake/FakeEnvironment' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { createStrictConfig } from '../../src/Config' @@ -73,6 +73,7 @@ describe('parallel key exchange', () => { groupKeyQueue: await createGroupKeyQueue(identity, publisher.groupKey), signatureValidator: mock(), messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService(), config: createStrictConfig() }) for (let i = 0; i < MESSAGE_COUNT_PER_PUBLISHER; i++) { diff --git a/packages/sdk/test/integration/update-encryption-key.test.ts b/packages/sdk/test/integration/update-encryption-key.test.ts index 16554a6ea3..3ceac23ccf 100644 --- a/packages/sdk/test/integration/update-encryption-key.test.ts +++ b/packages/sdk/test/integration/update-encryption-key.test.ts @@ -24,7 +24,7 @@ describe('update encryption key', () => { publisher = environment.createClient() subscriber = environment.createClient({ encryption: { - keyRequestTimeout: 1000 + keyRequestTimeout: 5000 } }) const stream = await publisher.createStream('/path') diff --git a/packages/sdk/test/test-utils/fake/FakeEnvironment.ts b/packages/sdk/test/test-utils/fake/FakeEnvironment.ts index 3d530952c2..5aa794d00e 100644 --- a/packages/sdk/test/test-utils/fake/FakeEnvironment.ts +++ b/packages/sdk/test/test-utils/fake/FakeEnvironment.ts @@ -22,6 +22,7 @@ import { FakeStreamRegistry } from './FakeStreamRegistry' import { FakeStreamStorageRegistry } from './FakeStreamStorageRegistry' import { DestroySignal } from '../../../src/DestroySignal' import { SigningService } from '../../../src/signature/SigningService' +import { EncryptionService } from '../../../src/encryption/EncryptionService' const DEFAULT_CLIENT_OPTIONS: StreamrClientConfig = { encryption: { @@ -37,6 +38,7 @@ export class FakeEnvironment { private dependencyContainer: DependencyContainer private destroySignal: DestroySignal private signingService: SigningService + private encryptionService: EncryptionService constructor() { this.network = new FakeNetwork() @@ -45,6 +47,7 @@ export class FakeEnvironment { this.dependencyContainer = container.createChildContainer() this.destroySignal = new DestroySignal() this.signingService = new SigningService(this.destroySignal) + this.encryptionService = new EncryptionService(this.destroySignal) const loggerFactory = { createLogger: () => this.logger } @@ -58,6 +61,7 @@ export class FakeEnvironment { this.dependencyContainer.register(StorageNodeRegistry, FakeStorageNodeRegistry as any) this.dependencyContainer.register(OperatorRegistry, FakeOperatorRegistry as any) this.dependencyContainer.register(SigningService, { useValue: this.signingService }) + this.dependencyContainer.register(EncryptionService, { useValue: this.encryptionService }) } createClient(opts?: StreamrClientConfig): StreamrClient { diff --git a/packages/sdk/test/test-utils/utils.ts b/packages/sdk/test/test-utils/utils.ts index 8ea62c0fc2..7cd0f840ca 100644 --- a/packages/sdk/test/test-utils/utils.ts +++ b/packages/sdk/test/test-utils/utils.ts @@ -44,6 +44,9 @@ import { GroupKey } from '../../src/encryption/GroupKey' import { GroupKeyManager } from '../../src/encryption/GroupKeyManager' import { LocalGroupKeyStore } from '../../src/encryption/LocalGroupKeyStore' import { SubscriberKeyExchange } from '../../src/encryption/SubscriberKeyExchange' +import { EncryptionService } from '../../src/encryption/EncryptionService' +import { encryptWithAES } from '../../src/encryption/aesUtils' +import { encryptNextGroupKey, decryptNextGroupKey, decryptStreamMessageContent } from '../../src/encryption/encryptionUtils' import { StreamrClientEventEmitter } from '../../src/events' import { StreamMessage } from '../../src/protocol/StreamMessage' import { GroupKeyQueue } from '../../src/publish/GroupKeyQueue' @@ -80,6 +83,42 @@ export function createMessageSigner(identity: Identity): MessageSigner { return new MessageSigner(identity, createMockSigningService()) } +/** + * Creates a mock EncryptionService that performs encryption synchronously on the main thread. + * Use this in tests instead of the real EncryptionService which spawns a worker. + */ +export function createMockEncryptionService(): EncryptionService { + return { + encryptWithAES: async (data: Uint8Array, cipherKey: Uint8Array) => { + return encryptWithAES(data, cipherKey) + }, + encryptNextGroupKey: async (currentKey: GroupKey, nextKey: GroupKey) => { + return encryptNextGroupKey(nextKey.id, nextKey.data, currentKey.data) + }, + decryptNextGroupKey: async (currentKey: GroupKey, encryptedKey: { id: string, data: Uint8Array }) => { + const result = decryptNextGroupKey(encryptedKey.id, encryptedKey.data, currentKey.data) + return new GroupKey(result.id, Buffer.from(result.data)) + }, + decryptStreamMessage: async ( + content: Uint8Array, + groupKey: GroupKey, + encryptedNewGroupKey?: { id: string, data: Uint8Array } + ) => { + const result = decryptStreamMessageContent( + content, + groupKey.data, + encryptedNewGroupKey + ) + let newGroupKey: GroupKey | undefined + if (result.newGroupKey) { + newGroupKey = new GroupKey(result.newGroupKey.id, Buffer.from(result.newGroupKey.data)) + } + return [result.content, newGroupKey] as [Uint8Array, GroupKey?] + }, + destroy: () => {} + } as unknown as EncryptionService +} + export function mockLoggerFactory(clientId?: string): LoggerFactory { return new LoggerFactory({ id: clientId ?? counterId('TestCtx'), @@ -173,7 +212,8 @@ export const createMockMessage = async ( }), groupKeyQueue: await createGroupKeyQueue(identity, opts.encryptionKey, opts.nextEncryptionKey), signatureValidator: mock(), - messageSigner: createMessageSigner(identity) + messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService() }) const DEFAULT_CONTENT = {} const plainContent = opts.content ?? DEFAULT_CONTENT diff --git a/packages/sdk/test/unit/Decrypt.test.ts b/packages/sdk/test/unit/Decrypt.test.ts index 502c68d517..dc6af3c14d 100644 --- a/packages/sdk/test/unit/Decrypt.test.ts +++ b/packages/sdk/test/unit/Decrypt.test.ts @@ -6,7 +6,7 @@ import { StreamrClientError } from '../../src/StreamrClientError' import { GroupKey } from '../../src/encryption/GroupKey' import { GroupKeyManager } from '../../src/encryption/GroupKeyManager' import { decrypt } from '../../src/encryption/decrypt' -import { createGroupKeyManager, createMockMessage } from '../test-utils/utils' +import { createGroupKeyManager, createMockEncryptionService, createMockMessage } from '../test-utils/utils' import { StreamMessage, StreamMessageAESEncrypted } from './../../src/protocol/StreamMessage' import { EncryptionType } from '@streamr/trackerless-network' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' @@ -26,7 +26,7 @@ describe('Decrypt', () => { encryptionKey: groupKey, content: unencryptedContent }) as StreamMessageAESEncrypted - const decryptedMessage = await decrypt(encryptedMessage, groupKeyManager, destroySignal) + const decryptedMessage = await decrypt(encryptedMessage, groupKeyManager, createMockEncryptionService(), destroySignal) expect(decryptedMessage).toEqual(new StreamMessage({ ...encryptedMessage, encryptionType: EncryptionType.NONE, @@ -53,6 +53,7 @@ describe('Decrypt', () => { return decrypt( msg as StreamMessageAESEncrypted, groupKeyManager, + createMockEncryptionService(), destroySignal) }).rejects.toThrowStreamrClientError( new StreamrClientError(`Could not get encryption key ${groupKey.id}`, 'DECRYPT_ERROR', msg) diff --git a/packages/sdk/test/unit/EncryptionService.test.ts b/packages/sdk/test/unit/EncryptionService.test.ts new file mode 100644 index 0000000000..ba7d99dd71 --- /dev/null +++ b/packages/sdk/test/unit/EncryptionService.test.ts @@ -0,0 +1,214 @@ +import { utf8ToBinary } from '@streamr/utils' +import { EncryptionService } from '../../src/encryption/EncryptionService' +import { GroupKey } from '../../src/encryption/GroupKey' +import { DestroySignal } from '../../src/DestroySignal' +import { StreamrClientError } from '../../src/StreamrClientError' + +describe('EncryptionService', () => { + + let encryptionService: EncryptionService + let destroySignal: DestroySignal + + beforeEach(() => { + destroySignal = new DestroySignal() + encryptionService = new EncryptionService(destroySignal) + }) + + afterEach(() => { + encryptionService.destroy() + }) + + describe('encryptWithAES', () => { + it('encrypts and decrypts data correctly', async () => { + const plaintextOriginal = utf8ToBinary('hello world') + const key = GroupKey.generate() + + // Make a copy since the original will be transferred + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintextOriginal), + key.data + ) + + expect(ciphertext).not.toStrictEqual(plaintextOriginal) + expect(ciphertext.length).toBeGreaterThan(plaintextOriginal.length) + + // Use decryptStreamMessage to verify encryption worked correctly + const [decrypted] = await encryptionService.decryptStreamMessage(ciphertext, key) + + expect(decrypted).toStrictEqual(plaintextOriginal) + }) + + it('produces different ciphertexts for same plaintext (due to random IV)', async () => { + const plaintext = utf8ToBinary('hello world') + const key = GroupKey.generate() + + const cipher1 = await encryptionService.encryptWithAES(Uint8Array.from(plaintext), key.data) + const cipher2 = await encryptionService.encryptWithAES(Uint8Array.from(plaintext), key.data) + + expect(cipher1).not.toStrictEqual(cipher2) + }) + + it('handles empty data', async () => { + const plaintextOriginal = new Uint8Array(0) + const key = GroupKey.generate() + + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintextOriginal), + key.data + ) + const [decrypted] = await encryptionService.decryptStreamMessage(ciphertext, key) + + expect(decrypted).toStrictEqual(plaintextOriginal) + }) + + it('handles large data', async () => { + const plaintextOriginal = new Uint8Array(100000).fill(42) + const key = GroupKey.generate() + + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintextOriginal), + key.data + ) + const [decrypted] = await encryptionService.decryptStreamMessage(ciphertext, key) + + expect(decrypted).toStrictEqual(plaintextOriginal) + }) + }) + + describe('encryptNextGroupKey / decryptNextGroupKey', () => { + it('encrypts and decrypts group key correctly', async () => { + const currentKey = GroupKey.generate() + const nextKey = GroupKey.generate() + + const encrypted = await encryptionService.encryptNextGroupKey(currentKey, nextKey) + + expect(encrypted.id).toBe(nextKey.id) + expect(encrypted.data).not.toStrictEqual(nextKey.data) + + const decrypted = await encryptionService.decryptNextGroupKey(currentKey, encrypted) + + expect(decrypted.id).toBe(nextKey.id) + expect(decrypted.data).toStrictEqual(nextKey.data) + }) + + it('produces different ciphertexts for same key (due to random IV)', async () => { + const currentKey = GroupKey.generate() + const nextKey = GroupKey.generate() + + const encrypted1 = await encryptionService.encryptNextGroupKey(currentKey, nextKey) + const encrypted2 = await encryptionService.encryptNextGroupKey(currentKey, nextKey) + + expect(encrypted1.data).not.toStrictEqual(encrypted2.data) + }) + }) + + describe('decryptStreamMessage', () => { + it('decrypts content without new group key', async () => { + const groupKey = GroupKey.generate() + const plaintextOriginal = utf8ToBinary('{"message": "hello"}') + + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintextOriginal), + groupKey.data + ) + + const [decryptedContent, newGroupKey] = await encryptionService.decryptStreamMessage( + ciphertext, + groupKey + ) + + expect(decryptedContent).toStrictEqual(plaintextOriginal) + expect(newGroupKey).toBeUndefined() + }) + + it('decrypts content with new group key', async () => { + const currentKey = GroupKey.generate() + const nextKey = GroupKey.generate() + const plaintextOriginal = utf8ToBinary('{"message": "hello"}') + + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintextOriginal), + currentKey.data + ) + const encryptedNextKey = await encryptionService.encryptNextGroupKey(currentKey, nextKey) + + const [decryptedContent, decryptedNewGroupKey] = await encryptionService.decryptStreamMessage( + ciphertext, + currentKey, + encryptedNextKey + ) + + expect(decryptedContent).toStrictEqual(plaintextOriginal) + expect(decryptedNewGroupKey).toBeDefined() + expect(decryptedNewGroupKey!.id).toBe(nextKey.id) + expect(decryptedNewGroupKey!.data).toStrictEqual(nextKey.data) + }) + + it('throws StreamrClientError on invalid encrypted content', async () => { + const groupKey = GroupKey.generate() + // Content that's too short to contain valid IV + ciphertext + const invalidContent = new Uint8Array([1, 2, 3]) + + await expect(encryptionService.decryptStreamMessage(invalidContent, groupKey)) + .rejects + .toThrow(StreamrClientError) + }) + }) + + describe('lifecycle', () => { + it('cleans up worker on destroy', async () => { + const plaintext = utf8ToBinary('test') + const key = GroupKey.generate() + + // First encrypt to ensure worker is created + await encryptionService.encryptWithAES(Uint8Array.from(plaintext), key.data) + + // Destroy should not throw + expect(() => encryptionService.destroy()).not.toThrow() + + // Calling destroy again should be safe (idempotent) + expect(() => encryptionService.destroy()).not.toThrow() + }) + + it('cleans up via DestroySignal', async () => { + const plaintext = utf8ToBinary('test') + const key = GroupKey.generate() + + await encryptionService.encryptWithAES(Uint8Array.from(plaintext), key.data) + + // Trigger destroy via signal - should not throw + await destroySignal.destroy() + }) + + it('lazily initializes worker on first use', async () => { + // Create a new service but don't use it yet + const signal = new DestroySignal() + const service = new EncryptionService(signal) + + // Destroy without using - should not throw + expect(() => service.destroy()).not.toThrow() + }) + }) + + describe('sequential operations', () => { + it('can perform multiple operations sequentially', async () => { + const key = GroupKey.generate() + const results: Uint8Array[] = [] + + for (let i = 0; i < 5; i++) { + const plaintext = utf8ToBinary(`message ${i}`) + const ciphertext = await encryptionService.encryptWithAES( + Uint8Array.from(plaintext), + key.data + ) + const [decrypted] = await encryptionService.decryptStreamMessage(ciphertext, key) + results.push(decrypted) + } + + expect(results).toHaveLength(5) + for (let i = 0; i < 5; i++) { + expect(results[i]).toStrictEqual(utf8ToBinary(`message ${i}`)) + } + }) + }) +}) diff --git a/packages/sdk/test/unit/EncryptionUtil.test.ts b/packages/sdk/test/unit/EncryptionUtil.test.ts index 7cec71e574..8c094c9e73 100644 --- a/packages/sdk/test/unit/EncryptionUtil.test.ts +++ b/packages/sdk/test/unit/EncryptionUtil.test.ts @@ -1,48 +1,12 @@ -import { createTestWallet } from '@streamr/test-utils' -import { StreamPartIDUtils, hexToBinary, toStreamID, toStreamPartID, utf8ToBinary } from '@streamr/utils' -import { EncryptionUtil, INITIALIZATION_VECTOR_LENGTH } from '../../src/encryption/EncryptionUtil' -import { GroupKey } from '../../src/encryption/GroupKey' -import { StreamrClientError } from '../../src/StreamrClientError' -import { createMockMessage } from '../test-utils/utils' -import { StreamMessage, StreamMessageAESEncrypted } from './../../src/protocol/StreamMessage' import { AsymmetricEncryptionType } from '@streamr/trackerless-network' -import { RSAKeyPair } from '../../src/encryption/RSAKeyPair' +import { EncryptionUtil } from '../../src/encryption/EncryptionUtil' import { MLKEMKeyPair } from '../../src/encryption/MLKEMKeyPair' - -const STREAM_ID = toStreamID('streamId') +import { RSAKeyPair } from '../../src/encryption/RSAKeyPair' describe('EncryptionUtil', () => { const plaintext = Buffer.from('some random text', 'utf8') - describe('AES', () => { - it('returns a ciphertext which is different from the plaintext', () => { - const key = GroupKey.generate() - const ciphertext = EncryptionUtil.encryptWithAES(plaintext, key.data) - expect(ciphertext).not.toStrictEqual(plaintext) - }) - - it('returns the initial plaintext after decrypting the ciphertext', () => { - const key = GroupKey.generate() - const ciphertext = EncryptionUtil.encryptWithAES(plaintext, key.data) - expect(EncryptionUtil.decryptWithAES(ciphertext, key.data)).toStrictEqual(plaintext) - }) - - it('preserves size (plaintext + iv)', () => { - const key = GroupKey.generate() - const ciphertext = EncryptionUtil.encryptWithAES(plaintext, key.data) - expect(ciphertext.length).toStrictEqual(plaintext.length + INITIALIZATION_VECTOR_LENGTH) - }) - - it('produces different ivs and ciphertexts upon multiple encrypt() calls', () => { - const key = GroupKey.generate() - const cipher1 = EncryptionUtil.encryptWithAES(plaintext, key.data) - const cipher2 = EncryptionUtil.encryptWithAES(plaintext, key.data) - expect(cipher1.slice(0, INITIALIZATION_VECTOR_LENGTH)).not.toStrictEqual(cipher2.slice(0, INITIALIZATION_VECTOR_LENGTH)) - expect(cipher1.slice(INITIALIZATION_VECTOR_LENGTH)).not.toStrictEqual(cipher2.slice(INITIALIZATION_VECTOR_LENGTH)) - }) - }) - describe('RSA', () => { it('returns a ciphertext which is different from the plaintext', async () => { const key = await RSAKeyPair.create(512) @@ -53,7 +17,7 @@ describe('EncryptionUtil', () => { it('returns the initial plaintext after decrypting the ciphertext', async () => { const key = await RSAKeyPair.create(512) const ciphertext = await EncryptionUtil.encryptForPublicKey(plaintext, key.getPublicKey(), AsymmetricEncryptionType.RSA) - expect(await EncryptionUtil.decryptWithPrivateKey(ciphertext, key.getPrivateKey(), AsymmetricEncryptionType.RSA)).toStrictEqual(plaintext) + expect(await EncryptionUtil.decryptWithPrivateKey(ciphertext, key.getPrivateKey(), AsymmetricEncryptionType.RSA)).toEqualBinary(plaintext) }) it('produces different ciphertexts upon multiple encrypt() calls', async () => { @@ -76,7 +40,7 @@ describe('EncryptionUtil', () => { const ciphertext = await EncryptionUtil.encryptForPublicKey(plaintext, key.getPublicKey(), AsymmetricEncryptionType.ML_KEM) expect(await EncryptionUtil.decryptWithPrivateKey( ciphertext, key.getPrivateKey(), AsymmetricEncryptionType.ML_KEM - )).toStrictEqual(plaintext) + )).toEqualBinary(plaintext) }) it('produces different ciphertexts upon multiple encrypt() calls', async () => { @@ -86,39 +50,4 @@ describe('EncryptionUtil', () => { expect(cipher1).not.toStrictEqual(cipher2) }) }) - - describe('StreamMessage decryption', () => { - it('passes the happy path', async () => { - const key = GroupKey.generate() - const nextKey = GroupKey.generate() - const streamMessage = await createMockMessage({ - streamPartId: StreamPartIDUtils.parse('stream#0'), - publisher: await createTestWallet(), - content: { - foo: 'bar' - }, - encryptionKey: key, - nextEncryptionKey: nextKey - }) as StreamMessageAESEncrypted - const [content, newGroupKey] = EncryptionUtil.decryptStreamMessage(streamMessage, key) - expect(content).toEqualBinary(utf8ToBinary('{"foo":"bar"}')) - expect(newGroupKey).toEqual(nextKey) - }) - - it('throws if newGroupKey invalid', async () => { - const key = GroupKey.generate() - const msg = await createMockMessage({ - publisher: await createTestWallet(), - streamPartId: toStreamPartID(STREAM_ID, 0), - encryptionKey: key - }) - const msg2 = new StreamMessage({ - ...msg, - newGroupKey: { id: 'mockId', data: hexToBinary('0x1234') } - }) as StreamMessageAESEncrypted - expect(() => EncryptionUtil.decryptStreamMessage(msg2, key)).toThrowStreamrClientError( - new StreamrClientError('Could not decrypt new encryption key', 'DECRYPT_ERROR', msg2) - ) - }) - }) }) diff --git a/packages/sdk/test/unit/MessageFactory.test.ts b/packages/sdk/test/unit/MessageFactory.test.ts index 76d0edb8bb..752a2138c3 100644 --- a/packages/sdk/test/unit/MessageFactory.test.ts +++ b/packages/sdk/test/unit/MessageFactory.test.ts @@ -10,7 +10,8 @@ import { GroupKeyQueue } from '../../src/publish/GroupKeyQueue' import { MessageFactory, MessageFactoryOptions } from '../../src/publish/MessageFactory' import { PublishMetadata } from '../../src/publish/Publisher' import { SignatureValidator } from '../../src/signature/SignatureValidator' -import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createMockEncryptionService, createStreamRegistry } from '../test-utils/utils' +import { decryptNextGroupKey } from '../../src/encryption/encryptionUtils' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { EncryptionType, SignatureType, ContentType } from '@streamr/trackerless-network' @@ -60,6 +61,7 @@ describe('MessageFactory', () => { groupKeyQueue: await createGroupKeyQueue(identity, GROUP_KEY), signatureValidator: new SignatureValidator(opts?.erc1271ContractFacade ?? mock(), new DestroySignal()), messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService(), config: { validation: { permissions: true, @@ -195,7 +197,8 @@ describe('MessageFactory', () => { id: nextGroupKey.id, data: expect.any(Uint8Array) }) - expect(GROUP_KEY.decryptNextGroupKey(msg.newGroupKey!)).toEqual(nextGroupKey) + const decrypted = decryptNextGroupKey(msg.newGroupKey!.id, msg.newGroupKey!.data, GROUP_KEY.data) + expect(new GroupKey(decrypted.id, Buffer.from(decrypted.data))).toEqual(nextGroupKey) }) it('not a publisher', async () => { diff --git a/packages/sdk/test/unit/Publisher.test.ts b/packages/sdk/test/unit/Publisher.test.ts index ac1564ec9d..71a7f32b6d 100644 --- a/packages/sdk/test/unit/Publisher.test.ts +++ b/packages/sdk/test/unit/Publisher.test.ts @@ -3,7 +3,7 @@ import { Publisher } from '../../src/publish/Publisher' import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { StreamIDBuilder } from '../../src/StreamIDBuilder' -import { createGroupKeyManager, createRandomIdentity } from '../test-utils/utils' +import { createGroupKeyManager, createMockEncryptionService, createRandomIdentity } from '../test-utils/utils' import type { StrictStreamrClientConfig } from '../../src/ConfigTypes' describe('Publisher', () => { @@ -22,6 +22,7 @@ describe('Publisher', () => { identity, mock(), mock(), + createMockEncryptionService(), { encryption: {}, validation: { diff --git a/packages/sdk/test/unit/aesUtils.test.ts b/packages/sdk/test/unit/aesUtils.test.ts new file mode 100644 index 0000000000..d09bd9e454 --- /dev/null +++ b/packages/sdk/test/unit/aesUtils.test.ts @@ -0,0 +1,80 @@ +import { createTestWallet } from '@streamr/test-utils' +import { StreamPartIDUtils, utf8ToBinary } from '@streamr/utils' +import { decryptWithAES, encryptWithAES, INITIALIZATION_VECTOR_LENGTH } from '../../src/encryption/aesUtils' +import { decryptStreamMessageContent } from '../../src/encryption/encryptionUtils' +import { GroupKey } from '../../src/encryption/GroupKey' +import { createMockMessage } from '../test-utils/utils' +import { StreamMessageAESEncrypted } from './../../src/protocol/StreamMessage' + +describe('aesUtils', () => { + + const plaintext = Buffer.from('some random text', 'utf8') + + describe('encryptWithAES / decryptWithAES', () => { + it('returns a ciphertext which is different from the plaintext', () => { + const key = GroupKey.generate() + const ciphertext = encryptWithAES(plaintext, key.data) + expect(ciphertext).not.toStrictEqual(plaintext) + }) + + it('returns the initial plaintext after decrypting the ciphertext', () => { + const key = GroupKey.generate() + const ciphertext = encryptWithAES(plaintext, key.data) + expect(decryptWithAES(ciphertext, key.data)).toEqualBinary(plaintext) + }) + + it('preserves size (plaintext + iv)', () => { + const key = GroupKey.generate() + const ciphertext = encryptWithAES(plaintext, key.data) + expect(ciphertext.length).toStrictEqual(plaintext.length + INITIALIZATION_VECTOR_LENGTH) + }) + + it('produces different ivs and ciphertexts upon multiple encrypt() calls', () => { + const key = GroupKey.generate() + const cipher1 = encryptWithAES(plaintext, key.data) + const cipher2 = encryptWithAES(plaintext, key.data) + expect(cipher1.slice(0, INITIALIZATION_VECTOR_LENGTH)).not.toStrictEqual(cipher2.slice(0, INITIALIZATION_VECTOR_LENGTH)) + expect(cipher1.slice(INITIALIZATION_VECTOR_LENGTH)).not.toStrictEqual(cipher2.slice(INITIALIZATION_VECTOR_LENGTH)) + }) + }) + + describe('decryptStreamMessageContent', () => { + it('decrypts content and new group key', async () => { + const key = GroupKey.generate() + const nextKey = GroupKey.generate() + const streamMessage = await createMockMessage({ + streamPartId: StreamPartIDUtils.parse('stream#0'), + publisher: await createTestWallet(), + content: { + foo: 'bar' + }, + encryptionKey: key, + nextEncryptionKey: nextKey + }) as StreamMessageAESEncrypted + const result = decryptStreamMessageContent(streamMessage.content, key.data, streamMessage.newGroupKey) + expect(result.content).toEqualBinary(utf8ToBinary('{"foo":"bar"}')) + const newGroupKey = result.newGroupKey + ? new GroupKey(result.newGroupKey.id, Buffer.from(result.newGroupKey.data)) + : undefined + expect(newGroupKey).toEqual(nextKey) + }) + + it('throws if newGroupKey data is invalid', async () => { + const key = GroupKey.generate() + const streamMessage = await createMockMessage({ + streamPartId: StreamPartIDUtils.parse('stream#0'), + publisher: await createTestWallet(), + content: { foo: 'bar' }, + encryptionKey: key + }) as StreamMessageAESEncrypted + // Provide an invalid encrypted group key (too short to contain valid AES data) + const invalidNewGroupKey = { id: 'mockId', data: new Uint8Array([1, 2, 3, 4]) } + // decryptStreamMessageContent uses Node's crypto which throws on invalid cipher data + expect(() => decryptStreamMessageContent( + streamMessage.content, + key.data, + invalidNewGroupKey + )).toThrow() + }) + }) +}) diff --git a/packages/sdk/test/unit/messagePipeline.test.ts b/packages/sdk/test/unit/messagePipeline.test.ts index 4f9d00f773..a566004a99 100644 --- a/packages/sdk/test/unit/messagePipeline.test.ts +++ b/packages/sdk/test/unit/messagePipeline.test.ts @@ -6,15 +6,15 @@ import type { StrictStreamrClientConfig } from '../../src/ConfigTypes' import { DestroySignal } from '../../src/DestroySignal' import { ERC1271ContractFacade } from '../../src/contracts/ERC1271ContractFacade' import { StreamRegistry } from '../../src/contracts/StreamRegistry' -import { EncryptionUtil } from '../../src/encryption/EncryptionUtil' import { GroupKey } from '../../src/encryption/GroupKey' import { GroupKeyManager } from '../../src/encryption/GroupKeyManager' import { SubscriberKeyExchange } from '../../src/encryption/SubscriberKeyExchange' +import { encryptWithAES } from '../../src/encryption/aesUtils' import { StreamrClientEventEmitter } from '../../src/events' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { createMessagePipeline } from '../../src/subscribe/messagePipeline' import { PushPipeline } from '../../src/utils/PushPipeline' -import { createMessageSigner, mockLoggerFactory } from '../test-utils/utils' +import { createMessageSigner, createMockEncryptionService, mockLoggerFactory } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { EncryptionType, ContentType, SignatureType } from '@streamr/trackerless-network' @@ -95,6 +95,7 @@ describe('messagePipeline', () => { new StreamrClientEventEmitter(), destroySignal ), + encryptionService: createMockEncryptionService(), config, destroySignal, loggerFactory: mockLoggerFactory(), @@ -163,7 +164,7 @@ describe('messagePipeline', () => { it('error: no encryption key available', async () => { const encryptionKey = GroupKey.generate() - const content = EncryptionUtil.encryptWithAES(Buffer.from(JSON.stringify(CONTENT), 'utf8'), encryptionKey.data) + const content = encryptWithAES(Buffer.from(JSON.stringify(CONTENT), 'utf8'), encryptionKey.data) await pipeline.push(await createMessage({ content, encryptionType: EncryptionType.AES, diff --git a/packages/sdk/test/unit/resendSubscription.test.ts b/packages/sdk/test/unit/resendSubscription.test.ts index c716c48a96..ea22aad630 100644 --- a/packages/sdk/test/unit/resendSubscription.test.ts +++ b/packages/sdk/test/unit/resendSubscription.test.ts @@ -11,7 +11,14 @@ import { ResendRangeOptions } from '../../src/subscribe/Resends' import { Subscription, SubscriptionEvents } from '../../src/subscribe/Subscription' import { initResendSubscription } from '../../src/subscribe/resendSubscription' import { PushPipeline } from '../../src/utils/PushPipeline' -import { createGroupKeyQueue, createMessageSigner, createRandomIdentity, createStreamRegistry, mockLoggerFactory } from '../test-utils/utils' +import { + createGroupKeyQueue, + createMessageSigner, + createMockEncryptionService, + createRandomIdentity, + createStreamRegistry, + mockLoggerFactory +} from '../test-utils/utils' import { StreamMessage } from './../../src/protocol/StreamMessage' import { createStrictConfig } from '../../src/Config' @@ -61,6 +68,7 @@ describe('resend subscription', () => { groupKeyQueue: await createGroupKeyQueue(identity), signatureValidator: mock(), messageSigner: createMessageSigner(identity), + encryptionService: createMockEncryptionService(), config: createStrictConfig(), }) }) diff --git a/packages/utils/src/SigningUtil.ts b/packages/utils/src/SigningUtil.ts index 2a664f2a8f..968f0f9a15 100644 --- a/packages/utils/src/SigningUtil.ts +++ b/packages/utils/src/SigningUtil.ts @@ -82,8 +82,10 @@ export class EcdsaSecp256k1Evm extends SigningUtil { async createSignature(payload: Uint8Array, privateKey: Uint8Array): Promise { const msgHash = this.keccakHash(payload) const sigObj = secp256k1.ecdsaSign(msgHash, privateKey) - const result = Buffer.alloc(sigObj.signature.length + 1, Buffer.from(sigObj.signature)) - result.writeInt8(27 + sigObj.recid, result.length - 1) + // Return plain Uint8Array (not Buffer) for proper serialization across environments + const result = new Uint8Array(sigObj.signature.length + 1) + result.set(sigObj.signature) + result[result.length - 1] = 27 + sigObj.recid return result } diff --git a/packages/utils/src/binaryUtils.ts b/packages/utils/src/binaryUtils.ts index e9d3bc05ae..0d514ad1fd 100644 --- a/packages/utils/src/binaryUtils.ts +++ b/packages/utils/src/binaryUtils.ts @@ -20,10 +20,13 @@ export const hexToBinary = (hex: string): Uint8Array => { if (hex.length % 2 !== 0) { throw new Error(`Hex string length must be even, received: 0x${hex}`) } - const result = Buffer.from(hex, 'hex') - if (hex.length !== result.length * 2) { + if (!/^[0-9a-fA-F]*$/.test(hex)) { throw new Error(`Hex string input is likely malformed, received: 0x${hex}`) } + const result = new Uint8Array(hex.length / 2) + for (let i = 0; i < result.length; i++) { + result[i] = parseInt(hex.slice(i * 2, i * 2 + 2), 16) + } return result }