Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 209 additions & 18 deletions network-mux/demo/mux-leios-demo.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,25 @@
--
module Main (main) where

import Data.Binary.Get qualified as Bin
import Data.Binary.Put qualified as Bin
import Data.ByteString (ByteString)
import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as BSC
import Data.ByteString.Lazy qualified as BL
import Data.IP (IP)
import Data.IP qualified as IP
import Data.TDigest (TDigest, insert, maximumValue, mean, minimumValue, quantile, stddev, tdigest)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
import Data.Word (Word64)
import Text.Printf (printf)
import Text.Read (readMaybe)

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM (TVar, atomically, modifyTVar', newTVarIO, readTVar)
import Control.Exception
import Control.Monad
import Control.Monad.Class.MonadTime.SI (getCurrentTime)
import Control.Tracer

import System.Environment qualified as SysEnv
Expand All @@ -43,6 +52,14 @@ data ClientType = Sequential | Bursty
unusedValue :: a
unusedValue = error "unused"

data LatencyStats = LatencyStats {
lsCount :: !Int,
lsDigest :: !(TDigest 5),
lsRespBytes :: !Word64,
lsFirstRespTsUs :: !Word64,
lsLastRxUs :: !Word64
}

main :: IO ()
main = do
args <- SysEnv.getArgs
Expand Down Expand Up @@ -214,8 +231,8 @@ serverWorkerSequential bearer len1 len2 = do
go :: Char -> ReqRespServer ByteString ByteString IO Int
go c =
ReqRespServer {
recvMsgReq = \(!_) ->
let msg = BSC.replicate n c in
recvMsgReq = \(!_) -> do
msg <- makeTimestampedPayload n c
pure (msg, go (succ c)),
recvMsgDone = pure n
}
Expand Down Expand Up @@ -252,13 +269,14 @@ serverWorkerBursty bearer (n1, n2) len1 len2 = do
:: Int
-> Int
-> ReqRespServerBurst ByteString ByteString IO Int
serverReqResp n len = ReqRespServerBurst $ \_ -> pure (go n minBound)
serverReqResp n len = ReqRespServerBurst $ \_ -> go n minBound
where
go :: Int -> Char -> ReqRespServerLoop ByteString IO Int
go m c | m > 0 =
SendMsgResp (BSC.replicate len c) (return $ go (m-1) (succ c))
go :: Int -> Char -> IO (ReqRespServerLoop ByteString IO Int)
go m c | m > 0 = do
msg <- makeTimestampedPayload len c
return $ SendMsgResp msg (go (m - 1) (succ c))
| otherwise =
SendMsgDoneServer (pure n)
return $ SendMsgDoneServer (pure n)


--
Expand Down Expand Up @@ -301,6 +319,10 @@ clientWorkerSequential
-- ^ number of requests to send over `MiniProtocolNum 3`
-> IO ()
clientWorkerSequential bearer len n1 n2 = do
praosStats <- newTVarIO newLatencyStats
praosMissing <- newTVarIO 0
leiosStats <- newTVarIO newLatencyStats
leiosMissing <- newTVarIO 0
mux <- Mx.new Mx.nullTracers (protocols InitiatorDirectionOnly)
void $ forkIO $
do awaitResult1 <-
Expand All @@ -309,38 +331,51 @@ clientWorkerSequential bearer len n1 n2 = do
(MiniProtocolNum 2)
InitiatorDirectionOnly
StartEagerly
(\chan -> runClientBin (reqrespTracer "client:praos") chan (clientReqResp '0' n1))
(\chan -> runClientBin (reqrespTracer "client:praos") chan
(clientReqResp praosStats praosMissing '0' n1))
awaitResult2 <-
runMiniProtocol
mux
(MiniProtocolNum 3)
InitiatorDirectionOnly
StartEagerly
(\chan -> runClientBin (reqrespTracer "client:leios") chan (clientReqResp '1' n2))
(\chan -> runClientBin (reqrespTracer "client:leios") chan
(clientReqResp leiosStats leiosMissing '1' n2))
-- wait for both mini-protocols to finish
results <- atomically $ (,) <$> awaitResult1
<*> awaitResult2
debugPutStrLn_ $ "client results: " ++ show results
reportStats "praos" praosStats praosMissing
reportStats "leios" leiosStats leiosMissing
`finally`
Mx.stop mux
Mx.run mux bearer
where
clientReqResp
:: Char
:: TVar LatencyStats
-> TVar Int
-> Char
-> Int
-> ReqRespClient ByteString ByteString IO Int
clientReqResp c n = go n
clientReqResp statsVar missingVar c n = go n
where
!msg = BSC.replicate len c

go :: Int -> ReqRespClient ByteString ByteString IO Int
go m | m <= 0
= SendMsgDone (pure n)
go m = SendMsgReq msg (\_ -> pure $ go (m-1))
go m =
SendMsgReq msg (\rsp -> do
recordLatency statsVar missingVar rsp
pure $ go (m-1))


clientWorkerBursty :: Mx.Bearer IO -> IO ()
clientWorkerBursty bearer = do
praosStats <- newTVarIO newLatencyStats
praosMissing <- newTVarIO 0
leiosStats <- newTVarIO newLatencyStats
leiosMissing <- newTVarIO 0
mux <- Mx.new Mx.nullTracers (protocols InitiatorDirectionOnly)
void $ forkIO $
do awaitResult1 <-
Expand All @@ -349,28 +384,184 @@ clientWorkerBursty bearer = do
(MiniProtocolNum 2)
InitiatorDirectionOnly
StartEagerly
(\chan -> runClientBurstBin (reqrespTracer "client:praos") chan clientReqResp)
(\chan -> runClientBurstBin (reqrespTracer "client:praos") chan
(clientReqResp praosStats praosMissing))
awaitResult2 <-
runMiniProtocol
mux
(MiniProtocolNum 3)
InitiatorDirectionOnly
StartEagerly
(\chan -> runClientBurstBin (reqrespTracer "client:leios") chan clientReqResp)
(\chan -> runClientBurstBin (reqrespTracer "client:leios") chan
(clientReqResp leiosStats leiosMissing))
-- wait for both mini-protocols to finish
results <- atomically $ (,) <$> awaitResult1
<*> awaitResult2
debugPutStrLn_ $ "client results: " ++ show results
reportStats "praos" praosStats praosMissing
reportStats "leios" leiosStats leiosMissing
`finally`
Mx.stop mux
Mx.run mux bearer
where
clientReqResp
:: ReqRespClientBurst ByteString ByteString IO Int
clientReqResp = SendMsgReqBurst (BSC.replicate 10 '\NUL') (go 0)
:: TVar LatencyStats
-> TVar Int
-> ReqRespClientBurst ByteString ByteString IO Int
clientReqResp statsVar missingVar = SendMsgReqBurst (BSC.replicate 10 '\NUL') (go 0)
where
go :: Int -> ReqRespClientLoop ByteString IO Int
go !count =
AwaitResp { handleMsgDone = pure count
, handleMsgResp = \(!_) -> pure (go (count + 1))
, handleMsgResp = \(!rsp) -> do
recordLatency statsVar missingVar rsp
return $ go (count + 1)
}

newLatencyStats :: LatencyStats
newLatencyStats =
LatencyStats {
lsCount = 0
, lsDigest = tdigest []
, lsRespBytes = 0
, lsFirstRespTsUs = 0
, lsLastRxUs = 0
}

updateLatencyStats :: Word64 -> Word64 -> Word64 -> Word64 -> LatencyStats -> LatencyStats
updateLatencyStats us bytes respTs nowUs LatencyStats{lsCount, lsDigest, lsRespBytes, lsFirstRespTsUs} =
let !lsCount' = lsCount + 1
!lsDigest' = insert (fromIntegral us :: Double) lsDigest
!lsRespBytes' =
if lsRespBytes <= 0
then bytes
else lsRespBytes
!lsFirstRespTsUs' =
if lsFirstRespTsUs <= 0
then respTs
else lsFirstRespTsUs
in
LatencyStats {
lsCount = lsCount'
, lsDigest = lsDigest'
, lsRespBytes = lsRespBytes'
, lsFirstRespTsUs = lsFirstRespTsUs'
, lsLastRxUs = nowUs
}

timestampSize :: Int
timestampSize = 8

getWallClockTimeUs :: IO Word64
getWallClockTimeUs = do
now <- getCurrentTime
let ns = floor (utcTimeToPOSIXSeconds now * 1_000_000) :: Integer
return $ fromInteger ns

encodeTimeStamp :: Word64 -> ByteString
encodeTimeStamp ts =
BL.toStrict (Bin.runPut (Bin.putWord64be ts))

extractTimestamp :: ByteString -> Maybe Word64
extractTimestamp bs
| BS.length bs < timestampSize = Nothing
| otherwise =
Just $ Bin.runGet Bin.getWord64be
(BL.fromStrict (BS.take timestampSize bs))

makeTimestampedPayload :: Int -> Char -> IO ByteString
makeTimestampedPayload len c
| len < timestampSize = pure (BSC.replicate len c)
| otherwise = do
ts <- getWallClockTimeUs
let tsBytes = encodeTimeStamp ts
return $ tsBytes <> BSC.replicate (len - timestampSize) c

updateLastRx :: Word64 -> LatencyStats -> LatencyStats
updateLastRx nowUs stats = stats { lsLastRxUs = nowUs }

recordLatency :: TVar LatencyStats -> TVar Int -> ByteString -> IO ()
recordLatency statsVar missingVar rsp = do
now <- getWallClockTimeUs
case extractTimestamp rsp of
Nothing -> atomically $ do
modifyTVar' missingVar succ
modifyTVar' statsVar (updateLastRx now)
Just ts -> do
let latency = max 0 $ now - ts
bytes = fromIntegral (BS.length rsp)
atomically $ modifyTVar' statsVar (updateLatencyStats latency bytes ts now)

reportStats :: String -> TVar LatencyStats -> TVar Int -> IO ()
reportStats label statsVar missingVar = do
(stats, missing) <- atomically $ (,) <$> readTVar statsVar <*> readTVar missingVar
let count = lsCount stats
respBytes = lsRespBytes stats
firstRespTsUs = lsFirstRespTsUs stats
lastRxUs = lsLastRxUs stats
durationUs :: Word64
durationUs =
if firstRespTsUs > 0 && lastRxUs > firstRespTsUs
then lastRxUs - firstRespTsUs
else 0
durationMs :: Double
durationMs = fromIntegral durationUs / 1000
overallMbps :: Double
overallMbps =
if durationUs > 0 && respBytes > 0 && count > 0
then (fromIntegral respBytes * fromIntegral count * 8) / fromIntegral durationUs
else 0
bitsPerResp :: Double
bitsPerResp = fromIntegral (respBytes * 8)
speedFromUs :: Real a => a -> Double
speedFromUs us =
let usD = realToFrac us :: Double in
if usD <= 0 || respBytes == 0
then 0
else bitsPerResp / usD
if count <= 0
then do
printf "%s latency: count=0 missing=%d\n" label missing
printf "%s speed: count=0 min=0.000Mbps mean=0.000Mbps median=0.000Mbps p90=0.000Mbps p95=0.000Mbps p99=0.000Mbps max=0.000Mbps stddev=0.000Mbps overall=%.3fMbps duration=%.3fms\n"
label overallMbps durationMs
else do
let td = lsDigest stats
minUs = minimumValue td
maxUs = maximumValue td
stddevUs = case stddev td of
Nothing -> 0
Just s -> s
meanUs = case mean td of
Nothing -> 0
Just m -> m
medianUs = quantileUs 0.5 td
p90Us = quantileUs 0.9 td
p95Us = quantileUs 0.95 td
p99Us = quantileUs 0.99 td
minMs = toMs minUs
maxMs = toMs maxUs
stddevMs = toMs stddevUs
meanMs = toMs meanUs
medianMs = toMs medianUs
p90Ms = toMs p90Us
p95Ms = toMs p95Us
p99Ms = toMs p99Us
minSp = speedFromUs maxUs
maxSp = speedFromUs minUs
stddevSp = speedFromUs stddevUs
meanSp = speedFromUs meanUs
medianSp = speedFromUs medianUs
p90Sp = speedFromUs p90Us
p95Sp = speedFromUs p95Us
p99Sp = speedFromUs p99Us
printf "%s latency: count=%d min=%.3fms mean=%.3fms median=%.3fms p90=%.3fms p95=%.3fms p99=%.3fms max=%.3fms stddev=%.3fms missing=%d\n"
label count minMs meanMs medianMs p90Ms p95Ms p99Ms maxMs stddevMs missing
printf "%s speed: count=%d min=%.3fMbps mean=%.3fMbps median=%.3fMbps p90=%.3fMbps p95=%.3fMbps p99=%.3fMbps max=%.3fMbps stddev=%.3fMbps\n"
label count minSp meanSp medianSp p90Sp p95Sp p99Sp maxSp stddevSp
printf "%s: overall=%.3fMbps duration=%.3fs\n"
label overallMbps (durationMs / 1000)
where
toMs us = us / 1000
quantileUs q td = case quantile q td of
Nothing -> 0
Just w -> w
8 changes: 6 additions & 2 deletions network-mux/demo/mux-leios-demo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ LEIOS_BLOCK_SIZE=$((($MUX_SDU - 5) * 1000))
PRAOS_BLOCK_SIZE=$((($MUX_SDU - 5) * 10))

# client's configuration
NUM_LEIOS_REQUESTS=20
NUM_LEIOS_REQUESTS=10
NUM_PRAOS_REQUESTS=1000 # $(($NUM_LEIOS_REQUESTS * $LEIOS_BLOCK_SIZE / $PRAOS_BLOCK_SIZE))

# network shaping parameters
Expand Down Expand Up @@ -41,6 +41,7 @@ cleanup_netns() {
# kill server and tcpdump
sudo kill -9 $pid
done
sudo ip netns delete ns$i
done
# for i in 1 2 3; do sudo ip netns del ns$i; done

Expand Down Expand Up @@ -163,7 +164,10 @@ setup_bridge() {

setup_bridge

cabal build exe:mux-leios-demo
if ! cabal build exe:mux-leios-demo; then
echo "cabal build failed; exiting." >&2
exit 1
fi
CMD=$(cabal list-bin exe:mux-leios-demo)

# For debuging throuput shaping
Expand Down
4 changes: 3 additions & 1 deletion network-mux/network-mux.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,15 @@ executable mux-leios-demo
bytestring,
cborg,
contra-tracer,
io-classes,
io-classes:{io-classes, si-timers},
iproute,
network,
network-mux,
primitive,
serialise,
stm,
tdigest,
time,

ghc-options:
-rtsopts
Expand Down
Loading