Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions lib/src/kex/kex_x25519.dart
Original file line number Diff line number Diff line change
@@ -1,28 +1,61 @@
import 'dart:typed_data';

import 'package:dartssh2/src/ssh_kex.dart';
import 'package:dartssh2/src/utils/compute.dart';
import 'package:dartssh2/src/utils/bigint.dart';
import 'package:dartssh2/src/utils/list.dart';
import 'package:pinenacl/tweetnacl.dart';

class SSHKexX25519 implements SSHKexECDH {
/// Randomly generated private key.
late final Uint8List privateKey;
final Uint8List privateKey;

/// Public key computed from the private key.
@override
late final Uint8List publicKey;
final Uint8List publicKey;

factory SSHKexX25519() {
final privateKey = randomBytes(32);
final publicKey = _ScalarMult.scalseMultBase(privateKey);
return SSHKexX25519._(
privateKey: privateKey,
publicKey: publicKey,
);
}

SSHKexX25519._({required this.privateKey, required this.publicKey});

SSHKexX25519() {
privateKey = randomBytes(32);
publicKey = _ScalarMult.scalseMultBase(privateKey);
static Future<SSHKexX25519> createAsync() async {
final keyPair = await sshCompute(_computeX25519KeyPair, null);
return SSHKexX25519._(
privateKey: keyPair[0],
publicKey: keyPair[1],
);
}

@override
BigInt computeSecret(Uint8List remotePublicKey) {
final secret = _ScalarMult.scalseMult(privateKey, remotePublicKey);
return decodeBigIntWithSign(1, secret);
}

Future<BigInt> computeSecretAsync(Uint8List remotePublicKey) async {
final secret = await sshCompute(
_computeX25519Secret,
[privateKey, remotePublicKey],
);
return decodeBigIntWithSign(1, secret);
}
}

List<Uint8List> _computeX25519KeyPair(void _) {
final privateKey = randomBytes(32);
final publicKey = _ScalarMult.scalseMultBase(privateKey);
return [privateKey, publicKey];
}

Uint8List _computeX25519Secret(List<Uint8List> data) {
return _ScalarMult.scalseMult(data[0], data[1]);
}

/// Scalar multiplication, Implements curve25519.
Expand Down
91 changes: 54 additions & 37 deletions lib/src/ssh_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ class SSHTransport {
/// transport is closed.
StreamSubscription? _socketSubscription;

/// Guards asynchronous packet processing to preserve message order.
var _isProcessingData = false;

/// Identification string sent by us without trailing \r\n. For example,
/// "SSH-2.0-DartSSH_2.0".
String get _localVersion => 'SSH-2.0-$version';
Expand Down Expand Up @@ -408,13 +411,7 @@ class SSHTransport {

void _onSocketData(Uint8List data) {
_buffer.add(data);
try {
_processData();
} on SSHError catch (e, stackTrace) {
closeWithError(e, stackTrace);
} catch (e) {
rethrow;
}
_scheduleProcessData();
}

void _onSocketError(Object error, StackTrace stackTrace) {
Expand All @@ -427,11 +424,33 @@ class SSHTransport {
close();
}

void _processData() {
void _scheduleProcessData() {
if (_isProcessingData || isClosed) {
return;
}

_isProcessingData = true;

_processDataAsync().catchError((error, stackTrace) {
if (error is SSHError) {
closeWithError(error, stackTrace);
} else {
closeWithError(SSHInternalError(error), stackTrace);
}
}).whenComplete(() {
_isProcessingData = false;
if (_buffer.isNotEmpty && !isClosed) {
_scheduleProcessData();
}
});
}

Future<void> _processDataAsync() async {
if (_remoteVersion == null) {
_processVersionExchange();
} else {
_processPackets();
}
if (_remoteVersion != null) {
await _processPackets();
}
}

Expand Down Expand Up @@ -473,12 +492,12 @@ class SSHTransport {
_sendKexInit();
}

// There maybe more data in the buffer, so process it.
_processPackets();
// There maybe more data in the buffer, so it will be consumed by the
// asynchronous packet processing queue.
}

/// Process one or more SSH packets queued in [_buffer].
void _processPackets() {
Future<void> _processPackets() async {
printDebug?.call('SSHTransport._processPackets');

while (_buffer.isNotEmpty && !isClosed) {
Expand All @@ -491,7 +510,7 @@ class SSHTransport {
// throw SSHPacketError('Packet too long: ${payload.length}');
// }

_handleMessage(payload);
await _handleMessage(payload);

_remotePacketSN.increase();
}
Expand Down Expand Up @@ -980,7 +999,7 @@ class SSHTransport {
sendPacket(message.encode());
}

void _handleMessage(Uint8List message) {
Future<void> _handleMessage(Uint8List message) async {
final messageId = SSHMessage.readMessageId(message);
switch (messageId) {
case SSH_Message_KexInit.messageId:
Expand All @@ -995,7 +1014,7 @@ class SSHTransport {
}
}

void _handleMessageKexInit(Uint8List payload) {
Future<void> _handleMessageKexInit(Uint8List payload) async {
printDebug?.call('SSHTransport._handleMessageKexInit');

// If this message initiates a new key-exchange round from the remote
Expand Down Expand Up @@ -1073,7 +1092,7 @@ class SSHTransport {

switch (_kexType) {
case SSHKexType.x25519:
_kex = SSHKexX25519();
_kex = await SSHKexX25519.createAsync();
break;
case SSHKexType.nistp256:
_kex = SSHKexNist.p256();
Expand Down Expand Up @@ -1107,7 +1126,7 @@ class SSHTransport {
/// When client receives [SSH_Message_KexECDH_Reply], it should verify the
/// server's signature with the server's public key. Then send NEW_KEYS
/// message back to the server.
void _handleMessageKexReply(Uint8List payload) {
Future<void> _handleMessageKexReply(Uint8List payload) async {
printDebug?.call('SSHTransport._handleMessageKexReply');
if (isServer) throw SSHStateError('Unexpected KEX_REPLY');

Expand Down Expand Up @@ -1149,7 +1168,11 @@ class SSHTransport {
hostSignature = message.signature;
serverKexKey = message.ecdhPublicKey;
clientKexKey = kex.publicKey;
sharedSecret = kex.computeSecret(message.ecdhPublicKey);
if (kex is SSHKexX25519) {
sharedSecret = await kex.computeSecretAsync(message.ecdhPublicKey);
} else {
sharedSecret = kex.computeSecret(message.ecdhPublicKey);
}
} else {
throw UnimplementedError('$kex');
}
Expand Down Expand Up @@ -1189,27 +1212,21 @@ class SSHTransport {
}

final userVerified = onVerifyHostKey != null
? onVerifyHostKey!(_hostkeyType!.name, fingerprint)
? await Future.value(onVerifyHostKey!(_hostkeyType!.name, fingerprint))
: true;

Future.value(userVerified).then(
(verified) {
if (!verified) {
closeWithError(SSHHostkeyError('Hostkey verification failed'));
} else {
_hostkeyVerified = true;
_sendNewKeys();
_applyLocalKeys();
onReady?.call();
}
},
onError: (error) {
closeWithError(error);
},
);
if (!userVerified) {
closeWithError(SSHHostkeyError('Hostkey verification failed'));
return;
}

_hostkeyVerified = true;
_sendNewKeys();
_applyLocalKeys();
onReady?.call();
}

void _handleMessageKexGexReply(Uint8List payload) {
Future<void> _handleMessageKexGexReply(Uint8List payload) async {
printDebug?.call('SSHTransport._handleMessageKexGexReply');
if (isServer) throw SSHStateError('Unexpected KEX_GEX_REPLY');

Expand All @@ -1220,7 +1237,7 @@ class SSHTransport {
_sendKexDHGexInit();
}

void _handleMessageNewKeys(Uint8List message) {
Future<void> _handleMessageNewKeys(Uint8List message) async {
printDebug?.call('SSHTransport._handleMessageNewKeys');
printTrace?.call('<- $socket: SSH_Message_NewKeys');

Expand Down
7 changes: 7 additions & 0 deletions lib/src/utils/compute.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import 'compute_stub.dart' if (dart.library.isolate) 'compute_io.dart';

typedef SSHComputeCallback<M, R> = R Function(M message);

Future<R> sshCompute<M, R>(SSHComputeCallback<M, R> callback, M message) {
return sshComputeImpl(callback, message);
}
63 changes: 63 additions & 0 deletions lib/src/utils/compute_io.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import 'dart:async';
import 'dart:isolate';

class _ComputeConfiguration<M, R> {
final R Function(M message) callback;
final M message;
final SendPort resultPort;

const _ComputeConfiguration({
required this.callback,
required this.message,
required this.resultPort,
});
}

class _ComputeError {
final String error;
final String stackTrace;

const _ComputeError(this.error, this.stackTrace);
}

void _spawn<M, R>(_ComputeConfiguration<M, R> configuration) {
try {
final result = configuration.callback(configuration.message);
Isolate.exit(configuration.resultPort, result);
} catch (error, stackTrace) {
Isolate.exit(
configuration.resultPort,
_ComputeError(error.toString(), stackTrace.toString()),
);
}
}

Future<R> sshComputeImpl<M, R>(
R Function(M message) callback,
M message,
) async {
final resultPort = RawReceivePort();
final completer = Completer<R>();

resultPort.handler = (response) {
resultPort.close();
if (response is _ComputeError) {
completer.completeError(
RemoteError(response.error, response.stackTrace),
);
return;
}
completer.complete(response as R);
};

await Isolate.spawn<_ComputeConfiguration<M, R>>(
_spawn,
_ComputeConfiguration<M, R>(
callback: callback,
message: message,
resultPort: resultPort.sendPort,
),
);

return completer.future;
}
6 changes: 6 additions & 0 deletions lib/src/utils/compute_stub.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Future<R> sshComputeImpl<M, R>(
R Function(M message) callback,
M message,
) {
return Future<R>.sync(() => callback(message));
}
Loading
Loading