Skip to content
Merged
79 changes: 79 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,85 @@ void main() async {

An `UnsupportedImTypeException` is thrown if `ImList.wrap` or `ImMap.wrap` encounters a type that cannot be converted.

### Zero-Copy Data Transfers (Transferables)

For large `Uint8List` or `ByteBuffer` payloads, pass a `transferables` list to `compute()` to enable zero-copy transport instead of copying bytes across isolate boundaries.

#### Basic usage

```dart
@pragma('vm:entry-point')
@isolateManagerWorker
Uint8List processImage(Uint8List data) {
// ... image processing ...
return data;
}

final manager = IsolateManager.create(processImage, workerName: 'processImage');
await manager.start();

final pixels = Uint8List(1920 * 1080 * 4); // ~8 MB RGBA frame

final result = await manager.compute(
pixels,
transferables: [pixels.buffer], // zero-copy send
);
```

On **native (VM)** the buffer is wrapped in a `TransferableTypedData` and sent O(1) — no byte copying. On **web (dart2js)** the `ArrayBuffer` is transferred via the `postMessage` transfer list, also O(1), and the source buffer is **detached** (its `lengthInBytes` becomes 0) after the call returns.

#### Auto-extraction with `sendResultWithAutoTransfer`

Inside a custom isolate, the `AutoTransferExtension` recursively finds every `Uint8List` / `ByteBuffer` in the result and transfers them automatically — no manual bookkeeping needed:

```dart
import 'package:isolate_manager/isolate_manager.dart';

@pragma('vm:entry-point')
void processingWorker(dynamic params) {
final controller =
IsolateManagerController<Map<String, Object?>, Uint8List>(params);

controller.onIsolateMessage.listen((input) {
final output = Uint8List(input.length);
for (var i = 0; i < output.length; i++) {
output[i] = (input[i] + 1) % 256;
}

// Finds all Uint8List/ByteBuffer in the map and transfers them zero-copy.
controller.sendResultWithAutoTransfer({'result': output, 'size': output.length});
});

controller.initialized();
}
```

#### Pros, cons, and platform behaviour

| | Native (VM) | Web — dart2js | Web — dart2wasm |
|---|---|---|---|
| **No transferables** | Bytes deep-copied O(n) | Bytes serialised & copied O(n) | Bytes copied O(n) |
| **`transferables: [data.buffer]`** | Zero-copy via `TransferableTypedData` (O(1) transport; small codec overhead) | Zero-copy via `ArrayBuffer` transfer (O(1)); source buffer detached | ⚠ No benefit — WASM linear memory must be copied to the JS heap regardless |
| **Pre-built `TransferableTypedData`** | Fastest — skips codec overhead entirely | N/A | N/A |

**Native (VM)**

* ✅ Eliminates the O(n) copy for large buffers; measurable improvement at ~1 MB+.
* ✅ Pre-building `TransferableTypedData` before calling `compute()` removes the codec overhead and is the fastest option.
* ⚠ Small buffers (< ~100 KB) may see no net gain or a slight regression due to codec overhead.
* ⚠ The source buffer is consumed by `TransferableTypedData`; do not reuse the original `Uint8List` after calling `compute()` with it as a transferable.

**Web — dart2js**

* ✅ Source `ArrayBuffer` is transferred in O(1); the worker receives the original memory.
* ✅ Large speedups (2–10×) for MB-range payloads compared to copy-based transfer.
* ⚠ Source buffer is **neutered** after `compute()` returns — `data.buffer.lengthInBytes` becomes 0. Keep a reference to the result instead.

**Web — dart2wasm**

* ⚠ WASM uses linear memory that is opaque to the JS engine. Every transfer still requires a copy from the WASM heap to a JS `ArrayBuffer`, so using `transferables` adds codec overhead with no speed benefit.
* Prefer omitting `transferables` when targeting WASM.

### Handling Exceptions (Web)

To ensure custom exceptions are correctly propagated from Web Workers:
Expand Down
1 change: 1 addition & 0 deletions lib/isolate_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ export 'src/models/isolate_manager_shared_worker.dart';
export 'src/models/isolate_manager_worker.dart';
export 'src/models/isolate_types.dart';
export 'src/models/queue_strategy.dart';
export 'src/utils/auto_transfer.dart';
5 changes: 4 additions & 1 deletion lib/src/base/contactor/isolate_contactor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ abstract class IsolateContactor<R, P> {

/// Send message to the `function` for computing
///
/// [transferables] - Optional list of transferable objects (e.g., ByteBuffer)
/// that will be transferred instead of copied.
///
/// Throw `IsolateContactorException` when error occurs.
Future<R> sendMessage(P message);
Future<R> sendMessage(P message, {List<Object>? transferables});

/// Listen to the result from the isolate.
Stream<R> get onMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ class IsolateContactorInternal<R, P> extends IsolateContactor<R, P> {
}

@override
Future<R> sendMessage(P message) async {
Future<R> sendMessage(P message, {List<Object>? transferables}) async {
printDebug(() => '[Main App] Message sent to isolate: $message');
_isolateContactorController.sendIsolate(message);
_isolateContactorController.sendIsolate(
message,
transferables: transferables,
);
return _isolateContactorController.onMessage.first;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,12 @@ class IsolateContactorInternalFuture<R, P>
}

@override
Future<R> sendMessage(P message) async {
Future<R> sendMessage(P message, {List<Object>? transferables}) async {
printDebug(() => '[Main App] Message sent to the Web Future: $message');
_isolateContactorController.sendIsolate(message);
_isolateContactorController.sendIsolate(
message,
transferables: transferables,
);
return _isolateContactorController.onMessage.first;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,12 @@ class IsolateContactorInternalWorker<R, P>
}

@override
Future<R> sendMessage(P message) async {
Future<R> sendMessage(P message, {List<Object>? transferables}) async {
printDebug(() => '[Main App] Message sent to the Web Worker: $message');
_isolateContactorController.sendIsolate(message);
_isolateContactorController.sendIsolate(
message,
transferables: transferables,
);
return _isolateContactorController.onMessage.first;
}
}
12 changes: 10 additions & 2 deletions lib/src/base/contactor/isolate_contactor_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,21 @@ abstract class IsolateContactorController<R, P> {
Completer<void> get ensureInitialized;

/// Send `message` to the isolate for computation
void sendIsolate(P message);
///
/// [transferables] - Optional list of transferable objects (e.g., ByteBuffer)
/// that will be transferred instead of copied. Only works on web platform.
/// On native platforms, listed buffers are encoded as TransferableTypedData.
void sendIsolate(P message, {List<Object>? transferables});

/// Send an `IsolateState` message to the isolate
void sendIsolateState(IsolateState state);

/// Send the `result` of computation to `onIsolateMessage` stream
void sendResult(R result);
///
/// [transferables] - Optional list of transferable objects (e.g., ByteBuffer)
/// that will be transferred instead of copied. Only works on web platform.
/// On native platforms, listed buffers are encoded as TransferableTypedData.
void sendResult(R result, {List<Object>? transferables});

/// Send the `Exception` to the main app
void sendResultError(IsolateException exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'package:isolate_manager/src/base/contactor/isolate_contactor_controller.
import 'package:isolate_manager/src/base/contactor/models/isolate_port.dart';
import 'package:isolate_manager/src/base/contactor/models/isolate_state.dart';
import 'package:isolate_manager/src/models/isolate_exceptions.dart';
import 'package:isolate_manager/src/utils/native_transferable_codec.dart';
import 'package:isolate_manager/src/utils/print.dart';
import 'package:stream_channel/isolate_channel.dart';

Expand Down Expand Up @@ -63,8 +64,12 @@ class IsolateContactorControllerImpl<R, P>
}

@override
void sendIsolate(P message) {
_delegate.sink.add({IsolatePort.isolate: message});
void sendIsolate(P message, {List<Object>? transferables}) {
final payload = encodeNativeTransferPayload(
message,
transferables: transferables,
);
_delegate.sink.add({IsolatePort.isolate: payload});
}

@override
Expand All @@ -73,8 +78,12 @@ class IsolateContactorControllerImpl<R, P>
}

@override
void sendResult(R message) {
_delegate.sink.add({IsolatePort.main: message});
void sendResult(R message, {List<Object>? transferables}) {
final payload = encodeNativeTransferPayload(
message,
transferables: transferables,
);
_delegate.sink.add({IsolatePort.main: payload});
}

@override
Expand Down Expand Up @@ -105,11 +114,13 @@ class IsolateContactorControllerImpl<R, P>
}

void _handleMainPort(dynamic value) {
final decodedValue = decodeNativeTransferPayload(value);

debugPrinter(
() => '[Main App] Message received from the Isolate: $value',
() => '[Main App] Message received from the Isolate: $decodedValue',
debug: _debugMode,
);
switch (value) {
switch (decodedValue) {
case == IsolateState.initialized:
if (!ensureInitialized.isCompleted) {
ensureInitialized.complete();
Expand All @@ -118,7 +129,9 @@ class IsolateContactorControllerImpl<R, P>
_mainStreamController.addError(e.error, e.stackTrace);
default:
try {
_mainStreamController.add(_converter?.call(value) ?? value as R);
_mainStreamController.add(
_converter?.call(decodedValue) ?? decodedValue as R,
);
// To catch both Error and Exception
// ignore: avoid_catches_without_on_clauses
} catch (e, stack) {
Expand All @@ -128,17 +141,19 @@ class IsolateContactorControllerImpl<R, P>
}

Future<void> _handleIsolatePort(dynamic value) async {
final decodedValue = decodeNativeTransferPayload(value);

debugPrinter(
() => '[Isolate] Message received from the Main App: $value',
() => '[Isolate] Message received from the Main App: $decodedValue',
debug: _debugMode,
);
switch (value) {
switch (decodedValue) {
case == IsolateState.dispose:
_onDispose?.call();
await close();
default:
try {
_isolateStreamController.add(value as P);
_isolateStreamController.add(decodedValue as P);
// To catch both Error and Exception
// ignore: avoid_catches_without_on_clauses
} catch (e, stack) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,17 @@ class IsolateContactorControllerImplFuture<R, P>
}

@override
void sendIsolate(P message) {
void sendIsolate(P message, {List<Object>? transferables}) {
if (_delegate.isClosed) return;

if (transferables != null && transferables.isNotEmpty) {
debugPrinter(
() =>
'[Main App] transferables ignored in Web Future mode. Set workerName to use Worker transfer lists.',
debug: _debugMode,
);
}

_delegate.sink.add({IsolatePort.isolate: message});
}

Expand All @@ -78,9 +86,17 @@ class IsolateContactorControllerImplFuture<R, P>
}

@override
void sendResult(R message) {
void sendResult(R message, {List<Object>? transferables}) {
if (_delegate.isClosed) return;

if (transferables != null && transferables.isNotEmpty) {
debugPrinter(
() =>
'[Isolate] transferables ignored in Web Future mode. Set workerName to use Worker transfer lists.',
debug: _debugMode,
);
}

_delegate.sink.add({IsolatePort.main: message});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'package:isolate_manager/src/base/contactor/isolate_contactor_controller/
import 'package:isolate_manager/src/base/isolate_contactor.dart';
import 'package:isolate_manager/src/models/isolate_types.dart';
import 'package:isolate_manager/src/utils/check_subtype.dart';
import 'package:isolate_manager/src/utils/extract_array_buffers.dart';
import 'package:isolate_manager/src/utils/print.dart';
import 'package:web/web.dart';

Expand Down Expand Up @@ -50,11 +51,15 @@ class IsolateContactorControllerImplWorker<R, P>
Stream<R> get onMessage => _mainStreamController.stream;

@override
void sendIsolate(P message) {
if (message is ImType) {
_delegate.postMessage(message.unwrap.jsify());
void sendIsolate(P message, {List<Object>? transferables}) {
final jsMessage =
message is ImType ? message.unwrap.jsify() : message.jsify();

if (transferables != null && transferables.isNotEmpty) {
final jsTransferables = extractArrayBuffers(transferables);
_delegate.postMessage(jsMessage, jsTransferables);
} else {
_delegate.postMessage(message.jsify());
_delegate.postMessage(jsMessage);
}
}

Expand All @@ -74,7 +79,7 @@ class IsolateContactorControllerImplWorker<R, P>
throw UnimplementedError('initialized method is not implemented');

@override
void sendResult(R message) =>
void sendResult(R message, {List<Object>? transferables}) =>
throw UnimplementedError('sendResult is not implemented');

@override
Expand Down
28 changes: 24 additions & 4 deletions lib/src/isolate_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -693,14 +693,26 @@ class IsolateManager<R, P> {
P params, {
IsolateCallback<R>? callback,
bool priority = false,
}) => compute(params, callback: callback, priority: priority);
List<Object>? transferables,
}) => compute(
params,
callback: callback,
priority: priority,
transferables: transferables,
);

/// Similar to the [compute], for who's using IsolateContactor.
Future<R> sendMessage(
P params, {
IsolateCallback<R>? callback,
bool priority = false,
}) => compute(params, callback: callback, priority: priority);
List<Object>? transferables,
}) => compute(
params,
callback: callback,
priority: priority,
transferables: transferables,
);

/// Compute isolate manager with [R] is return type.
///
Expand Down Expand Up @@ -738,10 +750,15 @@ class IsolateManager<R, P> {
P params, {
IsolateCallback<R>? callback,
bool priority = false,
List<Object>? transferables,
}) async {
await start();

final queue = IsolateQueue<R, P>(params, callback);
final queue = IsolateQueue<R, P>(
params,
callback,
transferables: transferables,
);
queueStrategy.add(queue, addToTop: priority);
_executeQueue();

Expand Down Expand Up @@ -795,7 +812,10 @@ class IsolateManager<R, P> {
);

try {
await isolate.sendMessage(queue.params);
await isolate.sendMessage(
queue.params,
transferables: queue.transferables,
);
// To catch both Error and Exception
// ignore: avoid_catches_without_on_clauses
} catch (_) {
Expand Down
Loading