Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 118 additions & 65 deletions Sources/Tools/QUICTransfer/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,18 @@ final class QUICTransfer {
let NSEC_PER_MSEC = UInt64(Duration.milliseconds(1) / Duration.nanoseconds(1))
var serverSigningKey = P256.Signing.PrivateKey()

func run(iterations: Int, loggingHandle: LoggingHandle, group: DispatchGroup, sendSize: Int) -> Double {
let ipv4Client = Endpoint(address: IPv4Address(localIPv4Address)!, port: 0)
let ipv4Server = Endpoint(address: IPv4Address(remoteIPv4Address)!, port: 0)
func run(
iterations: Int,
loggingHandle: LoggingHandle,
group: DispatchGroup,
sendSize: Int,
linkDelay: NetworkDuration = .zero
) -> Double {
let ipv4Client = Endpoint(address: IPv4Address(localIPv4Address)!, port: 1234)
let ipv4Server = Endpoint(address: IPv4Address(remoteIPv4Address)!, port: 2345)
var clientStream: StreamUpperHarness? = nil
var clientInput: NewStreamFlowHarness? = nil
var serverInput: NewStreamFlowHarness? = nil
// Create a random payload to send back and forth
var payload = [UInt8](repeating: 0, count: sendSize)
payload = (0..<sendSize).map { _ in UInt8.random(in: 0...255) }
Expand All @@ -57,10 +66,15 @@ final class QUICTransfer {
var clientParameters = Parameters()
let context = NetworkContext(identifier: "QUICTransfer")
clientParameters.context = context
let path = PathProperties(parameters: clientParameters)

var serverParameters = Parameters()
serverParameters.isServer = true
serverParameters.context = context

context.activate()
context.async {
// Client
let path = PathProperties(parameters: clientParameters)
let clientIP = IPProtocol.instance(context: clientParameters.context)
let clientIPOptions = IPProtocol.options()
clientIPOptions.setLogID(prefix: "C", parent: "1", protocolLogIDNumber: 3)
Expand All @@ -87,8 +101,14 @@ final class QUICTransfer {
clientQUICOptions.setProtocolInstance(clientQUIC)
clientParameters.defaultStack.prepend(applicationProtocol: .quic(clientQUICOptions))

let clientOutput = BridgeDatagramProtocol.instance(context: clientParameters.context)
let bridgeOptions = BridgeDatagramProtocol.options()
bridgeOptions.linkDelay = linkDelay
bridgeOptions.setProtocolInstance(clientOutput)
clientParameters.defaultStack.link = .custom(bridgeOptions)

let clientListenerLinkage = StreamListenerLinkage(reference: clientQUIC)
let clientInput = StreamUpperHarness(
clientInput = NewStreamFlowHarness(
identifier: "Client",
local: ipv4Client,
remote: ipv4Server,
Expand All @@ -97,14 +117,20 @@ final class QUICTransfer {
context: context,
listenerProtocol: clientListenerLinkage
)

clientStream = StreamUpperHarness(
identifier: "C1",
local: ipv4Client,
remote: ipv4Server,
parameters: clientParameters,
path: path,
context: context,
listenerProtocol: clientListenerLinkage
)
guard let clientInput else {
group.leave()
return
}

let clientOutput = DatagramLowerHarness(
identifier: "Client",
context: clientParameters.context
)
do {
try clientQUIC.attachLowerDatagramProtocolForNewPath(
clientUDP,
Expand All @@ -121,22 +147,20 @@ final class QUICTransfer {
path: path
)
try clientIP.attachLowerDatagramProtocol(
clientOutput.reference,
clientOutput,
remote: ipv4Server,
local: ipv4Client,
parameters: clientParameters,
path: path
)
} catch {
loggingHandle.log("Failed to attach client IP to lower protocol")
group.leave()
return
}
// Server
var serverParameters = Parameters()
serverParameters.isServer = true
serverParameters.context = context
let serverPath = PathProperties(parameters: serverParameters)
let serverIP = IPProtocol.instance(context: clientParameters.context)
let serverIP = IPProtocol.instance(context: context)
let serverIPOptions = IPProtocol.options()
serverIPOptions.setLogID(prefix: "L", parent: "1", protocolLogIDNumber: 3)
clientIPOptions.setProtocolInstance(serverIP)
Expand All @@ -161,8 +185,14 @@ final class QUICTransfer {
serverQUICOptions.setProtocolInstance(serverQUIC)
serverParameters.defaultStack.prepend(applicationProtocol: .quic(serverQUICOptions))

let serverOutput = BridgeDatagramProtocol.instance(context: context)
let serverBridgeOptions = BridgeDatagramProtocol.options()
serverBridgeOptions.linkDelay = linkDelay
serverBridgeOptions.setProtocolInstance(serverOutput)
serverParameters.defaultStack.link = .custom(serverBridgeOptions)

let serverListenerLinkage = StreamListenerLinkage(reference: serverQUIC)
let serverInput = NewStreamFlowHarness(
serverInput = NewStreamFlowHarness(
identifier: "Server",
local: ipv4Server,
remote: ipv4Client,
Expand All @@ -171,14 +201,11 @@ final class QUICTransfer {
context: serverParameters.context,
listenerProtocol: serverListenerLinkage
)

guard let serverInput else {
group.leave()
return
}

let serverOutput = DatagramLowerHarness(
identifier: "Server",
context: clientParameters.context
)
do {
try serverQUIC.attachLowerDatagramProtocolForNewPath(
serverUDP,
Expand All @@ -196,74 +223,89 @@ final class QUICTransfer {
path: path
)
try serverIP.attachLowerDatagramProtocol(
serverOutput.reference,
serverOutput,
remote: ipv4Client,
local: ipv4Server,
parameters: serverParameters,
path: serverPath
)
} catch {
loggingHandle.log("Failed to attach server IP to lower protocol")
group.leave()
return
}
var serverConnected: Bool = false
serverInput.start { connected in
serverConnected = connected
// Server connected event
group.leave()
}
clientInput.start()
// The client will complete TLS before the server
while !serverConnected {
let _ = self.dataBenchmarkUtility.loopOutputHandlerPackets(
sender: clientOutput,
receiver: serverOutput,
maximumBurst: 10
)
let _ = self.dataBenchmarkUtility.loopOutputHandlerPackets(
sender: serverOutput,
receiver: clientOutput,
maximumBurst: 10
)
}
// Transfer all of the data
for _ in 0..<iterations {
group.enter()
let writeSuccess = clientInput.write(payload)
guard writeSuccess else {
clientStream?.start()
}
group.wait()
guard let serverInput, let clientInput, let clientStream else {
return 0
}
var serverStream: StreamUpperHarness?

var totalReadSize = 0
while index < iterations {
group.enter()
context.async {
guard clientStream.write(payload) else {
loggingHandle.log("Client failed to write at iteration: \(index)")
group.leave()
print("Issue took place writing to the client")
break
return
}
var payloadReceived = false
var readDataSize = 0
while !payloadReceived {
let _ = self.dataBenchmarkUtility.loopOutputHandlerPackets(
sender: clientOutput,
receiver: serverOutput,
maximumBurst: 50
)
let _ = self.dataBenchmarkUtility.loopOutputHandlerPackets(
sender: serverOutput,
receiver: clientOutput,
maximumBurst: 50
)

if let readBytes = serverInput.upperHarnesses.first?.readAndDrop() {
readDataSize += readBytes
if readDataSize == sendSize {
payloadReceived = true
}
if serverStream == nil {
group.enter()
serverInput.waitForNewFlow {
loggingHandle.log("Server got new inbound flow")
serverStream = serverInput.upperHarnesses.last
group.leave()
}
}
index += 1
group.leave()
}
group.wait()

guard let serverStream else {
return 0
}

group.enter()
context.async {
var serverReadDataSizeForIteration = 0
var serverReadCompletion: ((Bool) -> Void)? = nil
serverReadCompletion = { _ in
let readBytes = serverStream.readAndDrop()
if readBytes > 0 {
serverReadDataSizeForIteration += readBytes
totalReadSize += readBytes
}
if serverReadDataSizeForIteration >= payload.count {
serverReadCompletion = nil
index += 1
group.leave()
} else {
serverStream.waitForInboundDataAvailable(completion: serverReadCompletion!)
}
}
serverStream.waitForInboundDataAvailable(completion: serverReadCompletion!)
}
group.wait()
}

group.enter()
context.async {
clientStream.stop()
clientInput.stop()
clientInput.teardown()
serverInput.stop()
serverInput.teardown()
group.leave()
}
group.wait()

print("Completed \(index) / \(iterations) transfers")
// Short circuit and return 0 if index does not match iterations
if index != iterations {
Expand All @@ -281,6 +323,7 @@ final class QUICTransfer {
var iterations = 10000 // 5gb total (if 500000 sendSize)
var loggingHandler: LoggingHandle = LoggingHandle(loggingType: .none)
var sendSize = 500000 // 500kb
var linkDelay = NetworkDuration.zero
var arguments = CommandLine.arguments.dropFirst(0)
if arguments.contains("-iterations"),
let index = arguments.firstIndex(of: "-iterations")
Expand Down Expand Up @@ -310,6 +353,15 @@ if arguments.contains("-size"),
}
}
}
if arguments.contains("-link-delay-ms"),
let index = arguments.firstIndex(of: "-link-delay-ms")
{
if arguments.count >= (index + 2) {
if let linkDelayOption = Int(arguments[index + 1]) {
linkDelay = .milliseconds(linkDelayOption)
}
}
}

// Create and run the transfers
let quicTransfer = QUICTransfer()
Expand All @@ -319,7 +371,8 @@ let totalTime = quicTransfer.run(
iterations: iterations,
loggingHandle: loggingHandler,
group: group,
sendSize: sendSize
sendSize: sendSize,
linkDelay: linkDelay
)
if totalTime > 0 {
print("Finished all (\(iterations)) transfers in \(totalTime) seconds")
Expand Down
Loading