Skip to content

Commit 6e19ab1

Browse files
authored
Merge pull request #27 from IntersectMBO/coot/peer-selection-policy
PeerSelectionPolicy for dmq-node
2 parents fe5822a + 363cb28 commit 6e19ab1

9 files changed

Lines changed: 217 additions & 81 deletions

File tree

cabal.project

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ source-repository-package
5858
source-repository-package
5959
type: git
6060
location: https://github.com/IntersectMBO/ouroboros-network
61-
tag: 3c4433d05ec012af6d1a26e6b5e86665627c08c4
62-
--sha256: sha256-Jemp6PlzISA+l1wdXV6MrIxaBpAxdrLLAlbkB7ZqF2Y=
61+
-- from coot/dmq-related-changes
62+
tag: 625296c92363b8c5e77cddee40de4525421d2660
63+
--sha256: sha256-WRbKqNimAsYtgj/r3SJ0IT6z7+Q3XZf3p89BM9w6bF8=
6364
subdir:
6465
acts-generic
6566
cardano-diffusion

dmq-node/app/Main.hs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
module Main where
1212

1313
import Control.Concurrent.Class.MonadSTM.Strict
14+
import Control.Concurrent.Class.MonadMVar
1415
import Control.Monad (void, when)
1516
import Control.Monad.Class.MonadThrow
1617
import Control.Tracer (Tracer (..), nullTracer, traceWith)
@@ -33,8 +34,6 @@ import System.IOManager (withIOManager)
3334

3435
import Cardano.Git.Rev (gitRev)
3536
import Cardano.KESAgent.Protocols.StandardCrypto (StandardCrypto)
36-
import Cardano.Ledger.Keys (VKey (..))
37-
import Cardano.Ledger.Hashes (hashKey)
3837

3938
import DMQ.Configuration
4039
import DMQ.Configuration.CLIOptions (parseCLIOptions)
@@ -93,8 +92,13 @@ runDMQ commandLineConfig = do
9392
} = config' <> commandLineConfig
9493
`act`
9594
defaultConfiguration
96-
let tracer :: ToJSON ev => Tracer IO (WithEventType ev)
97-
tracer = dmqTracer prettyLog
95+
96+
lock <- newMVar ()
97+
let tracer', tracer :: ToJSON ev => Tracer IO (WithEventType ev)
98+
tracer' = dmqTracer prettyLog
99+
-- use a lock to prevent writing two lines at the same time
100+
-- TODO: this won't be needed with `cardano-tracer` integration
101+
tracer = Tracer $ \a -> withMVar lock $ \_ -> traceWith tracer' a
98102

99103
when version $ do
100104
let gitrev = $(gitRev)
@@ -119,6 +123,7 @@ runDMQ commandLineConfig = do
119123

120124
stdGen <- newStdGen
121125
let (psRng, policyRng) = split stdGen
126+
policyRngVar <- newTVarIO policyRng
122127

123128
-- TODO: this might not work, since `ouroboros-network` creates its own IO Completion Port.
124129
withIOManager \iocp -> do
@@ -149,7 +154,7 @@ runDMQ commandLineConfig = do
149154
Mempool.getWriter SigDuplicate
150155
sigId
151156
(\now sigs ->
152-
withPoolValidationCtx (stakePools nodeKernel) (validateSig (hashKey . VKey) now sigs)
157+
withPoolValidationCtx (stakePools nodeKernel) (validateSig now sigs)
153158
)
154159
(traverse_ $ \(sigid, reason) -> do
155160
traceWith ntnValidationTracer $ InvalidSignature sigid reason
@@ -183,7 +188,7 @@ runDMQ commandLineConfig = do
183188
Mempool.getWriter SigDuplicate
184189
sigId
185190
(\now sigs ->
186-
withPoolValidationCtx (stakePools nodeKernel) (validateSig (hashKey . VKey) now sigs)
191+
withPoolValidationCtx (stakePools nodeKernel) (validateSig now sigs)
187192
)
188193
(traverse_ $ \(sigid, reason) ->
189194
traceWith ntcValidationTracer $ InvalidSignature sigid reason
@@ -212,7 +217,7 @@ runDMQ commandLineConfig = do
212217
dmqLimitsAndTimeouts
213218
dmqNtNApps
214219
dmqNtCApps
215-
(policy policyRng)
220+
(policy policyRngVar)
216221

217222
Diffusion.run dmqDiffusionArguments
218223
(dmqDiffusionTracers dmqConfig tracer)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
### Breaking
2+
3+
- `validateSig`: removed the hashing function for cold key from arguments, added required constraints ledger's `hashKey . VKey` usage instead
4+
5+
### Non-Breaking
6+
7+
- Added a lock to avoid race conditions between trace events.
8+
- Improved peer selection policy.
9+

dmq-node/dmq-node.cabal

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@ executable dmq-node
148148
base,
149149
bytestring,
150150
cardano-git-rev,
151-
cardano-ledger-core,
152151
contra-tracer >=0.1 && <0.3,
153152
dmq-node,
154153
io-classes:{io-classes, strict-stm},

dmq-node/src/DMQ/Configuration.hs

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,35 +77,60 @@ import Ouroboros.Network.TxSubmission.Inbound.V2 (TxDecisionPolicy (..))
7777

7878
import DMQ.Configuration.Topology (NoExtraConfig (..), NoExtraFlags (..))
7979

80-
-- | Configuration comes in two flavours paramemtrised by `f` functor:
80+
-- | Configuration comes in two flavours depending on the `f` functor:
8181
-- `PartialConfig` is using `Last` and `Configuration` is using an identity
8282
-- functor `I`.
8383
--
84+
-- See `defaultConfiguration` for default values.
85+
--
8486
data Configuration' f =
8587
Configuration {
88+
-- | Path from which the `Configuration` is read.
89+
dmqcConfigFile :: f FilePath,
90+
91+
-- | Network magic for the DMQ network
92+
dmqcNetworkMagic :: f NetworkMagic,
93+
-- | Network magic for local connections to a cardano-node
94+
dmqcCardanoNetworkMagic :: f NetworkMagic,
95+
96+
-- | IPv4 address to bind to for `node-to-node` communication.
8697
dmqcIPv4 :: f (Maybe IPv4),
98+
-- | IPv6 address to bind to for `node-to-node` communication.
8799
dmqcIPv6 :: f (Maybe IPv6),
88-
dmqcLocalAddress :: f LocalAddress,
100+
-- | Port number for `node-to-node` DMQ communication.
89101
dmqcPortNumber :: f PortNumber,
90-
dmqcConfigFile :: f FilePath,
102+
-- | Local socket address for `node-to-client` DMQ communication.
103+
dmqcLocalAddress :: f LocalAddress,
104+
-- | Topology file path.
91105
dmqcTopologyFile :: f FilePath,
106+
-- | Path to the `cardano-node` socket.
107+
dmqcCardanoNodeSocket :: f FilePath,
108+
92109
dmqcAcceptedConnectionsLimit :: f AcceptedConnectionsLimit,
110+
-- | Diffusion mode for `node-to-node` communication.
93111
dmqcDiffusionMode :: f DiffusionMode,
112+
-- | Node-to-node inbound connection idle timeout.
113+
dmqcProtocolIdleTimeout :: f DiffTime,
114+
-- | Churn interval for peer selection.
115+
dmqcChurnInterval :: f DiffTime,
116+
-- | Peer sharing setting.
117+
dmqcPeerSharing :: f PeerSharing,
118+
119+
--
120+
-- Peer Selection Targets
121+
--
122+
94123
dmqcTargetOfRootPeers :: f Int,
95124
dmqcTargetOfKnownPeers :: f Int,
96125
dmqcTargetOfEstablishedPeers :: f Int,
97126
dmqcTargetOfActivePeers :: f Int,
98127
dmqcTargetOfKnownBigLedgerPeers :: f Int,
99128
dmqcTargetOfEstablishedBigLedgerPeers :: f Int,
100129
dmqcTargetOfActiveBigLedgerPeers :: f Int,
101-
dmqcProtocolIdleTimeout :: f DiffTime,
102-
dmqcChurnInterval :: f DiffTime,
103-
dmqcPeerSharing :: f PeerSharing,
104-
-- network magic for the DMQ network itself
105-
dmqcNetworkMagic :: f NetworkMagic,
106-
-- network magic for local connections to a cardano-node
107-
dmqcCardanoNetworkMagic :: f NetworkMagic,
108-
dmqcCardanoNodeSocket :: f FilePath,
130+
131+
--
132+
-- Tracers & logging
133+
--
109134
dmqcPrettyLog :: f Bool,
110135

111136
dmqcMuxTracer :: f Bool,
@@ -148,13 +173,18 @@ data Configuration' f =
148173
dmqcLocalMsgSubmissionServerProtocolTracer :: f Bool,
149174
dmqcLocalMsgNotificationServerProtocolTracer :: f Bool,
150175

176+
--
177+
-- Application tracers
178+
--
179+
151180
dmqcSigSubmissionLogicTracer :: f Bool,
152181
dmqcSigSubmissionOutboundTracer :: f Bool,
153182
dmqcSigSubmissionInboundTracer :: f Bool,
154183
dmqcLocalMsgSubmissionServerTracer :: f Bool,
155184
dmqcLocalMsgNotificationServerTracer :: f Bool,
156185
dmqcLocalStateQueryTracer :: f Bool,
157186

187+
-- | CLI only option to show version and exit.
158188
dmqcVersion :: f Bool
159189
}
160190
deriving Generic

dmq-node/src/DMQ/Diffusion/NodeKernel.hs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module DMQ.Diffusion.NodeKernel
66
, withNodeKernel
77
, PoolValidationCtx (..)
88
, StakePools (..)
9+
, PoolId
910
) where
1011

1112
import Control.Concurrent.Class.MonadMVar
@@ -33,8 +34,8 @@ import Data.Word
3334
import System.Random (StdGen)
3435
import System.Random qualified as Random
3536

36-
import Cardano.Ledger.Shelley.API hiding (I)
37-
import Ouroboros.Consensus.Shelley.Ledger.Query
37+
import Cardano.Ledger.Shelley.API qualified as Ledger
38+
import Ouroboros.Consensus.Shelley.Ledger.Query qualified as LedgerQuery
3839

3940
import Ouroboros.Network.BlockFetch (FetchClientRegistry,
4041
newFetchClientRegistry)
@@ -76,13 +77,13 @@ data NodeKernel crypto ntnAddr m =
7677

7778
-- | Cardano pool id's are hashes of the cold verification key
7879
--
79-
type PoolId = KeyHash StakePool
80+
type PoolId = Ledger.KeyHash Ledger.StakePool
8081

8182
data StakePools m = StakePools {
8283
-- | contains map of cardano pool stake snapshot obtained
8384
-- via local state query client
8485
stakePoolsVar
85-
:: !(StrictTVar m (Map PoolId StakeSnapshot))
86+
:: !(StrictTVar m (Map PoolId LedgerQuery.StakeSnapshot))
8687
-- | Acquire and update validation context for signature validation
8788
, withPoolValidationCtx
8889
:: forall a. (PoolValidationCtx -> (a, PoolValidationCtx)) -> STM m a
@@ -99,7 +100,7 @@ data PoolValidationCtx =
99100
PoolValidationCtx {
100101
vctxEpoch :: !(Maybe UTCTime)
101102
-- ^ UTC time of next epoch boundary for handling clock skew
102-
, vctxStakeMap :: !(Map PoolId StakeSnapshot)
103+
, vctxStakeMap :: !(Map PoolId LedgerQuery.StakeSnapshot)
103104
-- ^ for signature validation
104105
, vctxOcertMap :: !(Map PoolId Word64)
105106
-- ^ ocert counters to check monotonicity
Lines changed: 107 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,116 @@
11
module DMQ.Diffusion.PeerSelection where
22

3-
import Data.Set (Set)
3+
import Control.Concurrent.Class.MonadSTM.Strict
4+
import Data.List (sortOn, unfoldr)
5+
import Data.Map.Strict qualified as Map
46
import Data.Set qualified as Set
5-
import Network.Socket (SockAddr)
6-
import Ouroboros.Network.PeerSelection.Governor.Types
7-
import System.Random (Random (..), StdGen)
7+
import Data.Word (Word32)
8+
import Ouroboros.Network.PeerSelection
9+
import System.Random (Random (..), StdGen, split)
810

911
-- | Trivial peer selection policy used as dummy value
1012
--
11-
policy :: StdGen -> PeerSelectionPolicy SockAddr IO
12-
policy gen =
13+
policy :: forall peerAddr m.
14+
( MonadSTM m
15+
, Ord peerAddr
16+
)
17+
=> StrictTVar m StdGen
18+
-> PeerSelectionPolicy peerAddr m
19+
policy rngVar =
1320
PeerSelectionPolicy {
14-
policyPickKnownPeersForPeerShare = \_ _ _ -> pickTrivially
15-
, policyPickColdPeersToForget = \_ _ _ -> pickTrivially
16-
, policyPickColdPeersToPromote = \_ _ _ -> pickTrivially
17-
, policyPickWarmPeersToPromote = \_ _ _ -> pickTrivially
18-
, policyPickHotPeersToDemote = \_ _ _ -> pickTrivially
19-
, policyPickWarmPeersToDemote = \_ _ _ -> pickTrivially
20-
, policyPickInboundPeers = \_ _ _ -> pickTrivially
21-
, policyFindPublicRootTimeout = 5
22-
, policyMaxInProgressPeerShareReqs = 0
23-
, policyPeerShareRetryTime = 0 -- seconds
24-
, policyPeerShareBatchWaitTime = 0 -- seconds
25-
, policyPeerShareOverallTimeout = 0 -- seconds
26-
, policyPeerShareActivationDelay = 2 -- seconds
21+
policyPickKnownPeersForPeerShare = simplePromotionPolicy,
22+
policyPickColdPeersToPromote = simplePromotionPolicy,
23+
policyPickWarmPeersToPromote = simplePromotionPolicy,
24+
policyPickInboundPeers = simplePromotionPolicy,
25+
26+
policyPickHotPeersToDemote = hotDemotionPolicy,
27+
policyPickWarmPeersToDemote = warmDemotionPolicy,
28+
policyPickColdPeersToForget = coldForgetPolicy,
29+
30+
policyFindPublicRootTimeout = 5,
31+
policyMaxInProgressPeerShareReqs = 0,
32+
policyPeerShareRetryTime = 0, -- seconds
33+
policyPeerShareBatchWaitTime = 0, -- seconds
34+
policyPeerShareOverallTimeout = 0, -- seconds
35+
policyPeerShareActivationDelay = 2 -- seconds
2736
}
2837
where
29-
pickTrivially :: Applicative m => Set SockAddr -> Int -> m (Set SockAddr)
30-
pickTrivially set n = pure
31-
. fst
32-
$ go gen (Set.toList set) n []
33-
where
34-
go g _ 0 acc = (Set.fromList acc, g)
35-
go g [] _ acc = (Set.fromList acc, g)
36-
go g xs k acc =
37-
let (idx, g') = randomR (0, length xs - 1) g
38-
picked = xs !! idx
39-
xs' = take idx xs ++ drop (idx + 1) xs
40-
in go g' xs' (k - 1) (picked : acc)
38+
hotDemotionPolicy :: PickPolicy peerAddr (STM m)
39+
hotDemotionPolicy _ _ _ available pickNum = do
40+
available' <- addRand rngVar available (,)
41+
return $ Set.fromList
42+
. map fst
43+
. take pickNum
44+
. sortOn snd
45+
. Map.assocs
46+
$ available'
47+
48+
-- Randomly pick peers to demote, peers with knownPeerTepid set are twice
49+
-- as likely to be demoted.
50+
warmDemotionPolicy :: PickPolicy peerAddr (STM m)
51+
warmDemotionPolicy _ _ isTepid available pickNum = do
52+
available' <- addRand rngVar available (tepidWeight isTepid)
53+
return $ Set.fromList
54+
. map fst
55+
. take pickNum
56+
. sortOn snd
57+
. Map.assocs
58+
$ available'
59+
60+
simplePromotionPolicy :: PickPolicy peerAddr (STM m)
61+
simplePromotionPolicy _ _ _ available pickNum = do
62+
available' <- addRand rngVar available (,)
63+
return $ Set.fromList
64+
. map fst
65+
. take pickNum
66+
. sortOn snd
67+
. Map.assocs
68+
$ available'
69+
70+
-- Randomly pick peers to forget, peers with failures are more likely to
71+
-- be forgotten.
72+
coldForgetPolicy :: PickPolicy peerAddr (STM m)
73+
coldForgetPolicy _ failCnt _ available pickNum = do
74+
available' <- addRand rngVar available (failWeight failCnt)
75+
return $ Set.fromList
76+
. map fst
77+
. take pickNum
78+
. sortOn snd
79+
. Map.assocs
80+
$ available'
81+
82+
-- Failures lowers r
83+
failWeight :: (peerAddr -> Int)
84+
-> peerAddr
85+
-> Word32
86+
-> (peerAddr, Word32)
87+
failWeight failCnt peer r =
88+
(peer, r `div` fromIntegral (failCnt peer + 1))
89+
90+
-- Tepid flag cuts r in half
91+
tepidWeight :: (peerAddr -> Bool)
92+
-> peerAddr
93+
-> Word32
94+
-> (peerAddr, Word32)
95+
tepidWeight isTepid peer r =
96+
if isTepid peer then (peer, r `div` 2)
97+
else (peer, r)
98+
99+
100+
-- Add scaled random number in order to prevent ordering based on SockAddr
101+
addRand :: ( MonadSTM m
102+
, Ord peerAddr
103+
)
104+
=> StrictTVar m StdGen
105+
-> Set.Set peerAddr
106+
-> (peerAddr -> Word32 -> (peerAddr, Word32))
107+
-> STM m (Map.Map peerAddr Word32)
108+
addRand rngVar available scaleFn = do
109+
inRng <- readTVar rngVar
110+
111+
let (rng, rng') = split inRng
112+
rns = take (Set.size available) $ unfoldr (Just . random) rng :: [Word32]
113+
available' = Map.fromList $ zipWith scaleFn (Set.toList available) rns
114+
writeTVar rngVar rng'
115+
return available'
116+

0 commit comments

Comments
 (0)