Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ index-state:
, hackage.haskell.org 2026-02-17T10:15:41Z

-- Bump this if you need newer packages from CHaP
, cardano-haskell-packages 2026-03-11T15:05:51Z
, cardano-haskell-packages 2026-03-17T12:33:13Z

packages:
./dmq-node
Expand All @@ -33,21 +33,17 @@ if(os(windows))
constraints:
bitvec -simd

-- kes-agent is not yet in CHaP, so we pull it from its GitHub repo
source-repository-package
type: git
location: https://github.com/input-output-hk/kes-agent
tag: 7aedefc0077ffaf93bf46508c1fe0d7efdead227
--sha256: sha256-ZcB0pPex349uH1W7ha5ez7Kx/XadK00j5wRm7+vyX0s=
subdir:
kes-agent
kes-agent-crypto

source-repository-package
type: git
location: https://github.com/IntersectMBO/ouroboros-consensus
tag: 02c68cf3b5133dc6ccf5f42e251766540e511396
--sha256: sha256-C7H3y9nb3XGhIO5GiO1IuHvhkV7f2qnNglkImMgw1W8=
location: https://github.com/IntersectMBO/ouroboros-network
tag: 58f3ffb5abf74cd105f7c10593e01205dced7a9e
--sha256: sha256-x0feBtpO3zsvvq8Ocu5YEJFuRsVS/ptnLMEhXCAG+WI=
subdir:
acts-generic
cardano-diffusion
ouroboros-network
network-mux

if impl(ghc >= 9.12.0)
allow-newer: *:time,
Expand Down
19 changes: 4 additions & 15 deletions dmq-node/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import DMQ.Protocol.SigSubmission.Type (Sig (..))
import DMQ.Tracer

import DMQ.Diffusion.PeerSelection (policy)
import DMQ.NodeToClient.LocalStateQueryClient
import DMQ.Protocol.SigSubmission.Validate
import Ouroboros.Network.Diffusion qualified as Diffusion
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
Expand Down Expand Up @@ -89,9 +88,7 @@ runDMQ commandLineConfig = do
dmqcValidationTracer = I validationTracer,
dmqcLocalHandshakeTracer = I localHandshakeTracer,
dmqcCardanoNodeSocket = I socketPath,
dmqcVersion = I version,
dmqcLocalStateQueryTracer = I localStateQueryTracer,
dmqcLedgerPeers = I ledgerPeers
dmqcVersion = I version
} = config' <> commandLineConfig
`act`
defaultConfiguration
Expand Down Expand Up @@ -133,20 +130,12 @@ runDMQ commandLineConfig = do

-- TODO: this might not work, since `ouroboros-network` creates its own IO Completion Port.
withIOManager \iocp -> do
let localSnocket' = localSnocket iocp
mkStakePoolMonitor = connectToCardanoNode
(if localStateQueryTracer
then WithEventType "LocalStateQuery" >$< tracer
else nullTracer)
ledgerPeers
localSnocket'
socketPath

withNodeKernel @StandardCrypto
(localSnocket iocp)
makeLocalBearer
tracer
dmqConfig
psRng
mkStakePoolMonitor $ \nodeKernel -> do
psRng $ \nodeKernel -> do
dmqDiffusionConfiguration <-
mkDiffusionConfiguration dmqConfig nt nodeKernel.stakePools.ledgerBigPeersVar

Expand Down
6 changes: 4 additions & 2 deletions dmq-node/dmq-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ library
DMQ.Diffusion.Applications
DMQ.Diffusion.Arguments
DMQ.Diffusion.NodeKernel
DMQ.Diffusion.NodeKernel.Types
DMQ.Diffusion.PeerSelection
DMQ.Handlers.TopLevel
DMQ.NodeToClient
Expand Down Expand Up @@ -109,6 +110,7 @@ library
cardano-ledger-byron,
cardano-ledger-core,
cardano-ledger-shelley,
cardano-protocol-tpraos,
cardano-slotting,
cardano-strict-containers,
cborg >=0.2.1 && <0.3,
Expand All @@ -120,14 +122,14 @@ library
hashable >=1.0 && <1.6,
io-classes:{io-classes, si-timers, strict-mvar, strict-stm} ^>=1.8.0.1,
iproute ^>=1.7.15,
kes-agent-crypto ^>=1.0,
kes-agent-crypto ^>=1.1.0.0,
mtl,
network ^>=3.2.7,
network-mux ^>=0.10,
nothunks,
optparse-applicative >=0.18 && <0.20,
ouroboros-consensus:{ouroboros-consensus, cardano, diffusion},
ouroboros-network:{ouroboros-network, api, framework, orphan-instances, protocols} ^>=1.0.0.0,
ouroboros-network:{ouroboros-network, api, framework, orphan-instances, protocols} ^>=1.1.0.0,
quiet,
random ^>=1.3,
singletons,
Expand Down
205 changes: 127 additions & 78 deletions dmq-node/src/DMQ/Diffusion/NodeKernel.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}

module DMQ.Diffusion.NodeKernel
( NodeKernel (..)
Expand All @@ -10,106 +12,73 @@ module DMQ.Diffusion.NodeKernel
, PoolId
) where

import Control.Applicative (Alternative)
import Control.Concurrent.Class.MonadMVar
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadST
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, nullTracer)
import Control.Tracer (Tracer (..), nullTracer)

import Data.Aeson qualified as Aeson
import Data.Function (on)
import Data.Functor.Contravariant ((>$<))
import Data.Hashable
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Proxy
import Data.Sequence (Seq)
import Data.Sequence qualified as Seq
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Time.Clock.POSIX (POSIXTime)
import Data.Time.Clock.POSIX qualified as Time
import Data.Void (Void)
import Data.Word
import Data.Void (Void, absurd)
import System.Random (StdGen)
import System.Random qualified as Random

import Cardano.Ledger.Api.State.Query qualified as LedgerQuery
import Cardano.Ledger.Shelley.API qualified as Ledger
import Network.Mux qualified as Mx

import Ouroboros.Network.BlockFetch (FetchClientRegistry,
newFetchClientRegistry)
import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Ouroboros.Network.Magic (NetworkMagic (..))
import Cardano.Chain.Slotting (EpochSlots (..))
import Cardano.Network.NodeToClient qualified as Cardano.NtoC
import Cardano.Protocol.Crypto qualified as Cardano (StandardCrypto)

import Ouroboros.Consensus.Cardano.Node
import Ouroboros.Consensus.Network.NodeToClient
import Ouroboros.Consensus.Node.NetworkProtocolVersion
import Ouroboros.Consensus.Node.ProtocolInfo
import Ouroboros.Network.BlockFetch (newFetchClientRegistry)
import Ouroboros.Network.Mux qualified as Mx
import Ouroboros.Network.PeerSelection.Governor.Types
(makePublicPeerSelectionStateVar)
import Ouroboros.Network.PeerSelection.LedgerPeers (SomeLedgerPeerSnapshot)
import Ouroboros.Network.PeerSelection.LedgerPeers.Type (LedgerPeerSnapshot,
LedgerPeersKind (..))
import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry,
newPeerSharingAPI, newPeerSharingRegistry,
import Ouroboros.Network.PeerSharing (newPeerSharingAPI, newPeerSharingRegistry,
ps_POLICY_PEER_SHARE_MAX_PEERS, ps_POLICY_PEER_SHARE_STICKY_TIME)
import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec,
noTimeLimitsHandshake)
import Ouroboros.Network.Protocol.LocalStateQuery.Client
import Ouroboros.Network.Protocol.LocalStateQuery.Type
import Ouroboros.Network.Snocket (Snocket, localAddressFromPath)
import Ouroboros.Network.Socket (ConnectToArgs (..),
HandshakeCallbacks (HandshakeCallbacks), connectToNode)
import Ouroboros.Network.TxSubmission.Inbound.V2
import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..),
MempoolSeq (..), WithIndex (..))
import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool

import Cardano.Network.NodeToClient qualified as Cardano
import DMQ.Configuration
import DMQ.Diffusion.NodeKernel.Types
import DMQ.NodeToClient.LocalStateQueryClient
import DMQ.Policy qualified as Policy
import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt, sigId), SigId)
import DMQ.Tracer
import Ouroboros.Consensus.Cardano.Block (CardanoBlock)
import Ouroboros.Network.Handshake.Queryable (Queryable (..))
import Ouroboros.Network.Protocol.Handshake (Acceptable (..))


data NodeKernel crypto ntnAddr m =
NodeKernel {
-- | The fetch client registry, used for the keep alive clients.
fetchClientRegistry :: !(FetchClientRegistry (ConnectionId ntnAddr) () () m)

-- | Read the current peer sharing registry, used for interacting with
-- the PeerSharing protocol
, peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m)
, peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m)
, mempool :: !(Mempool m SigId (Sig crypto))
, sigChannelVar :: !(TxChannelsVar m ntnAddr SigId (Sig crypto))
, sigMempoolSem :: !(TxMempoolSem m)
, sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId (Sig crypto))
, stakePools :: !(StakePools m)
, nextEpochVar :: !(StrictTVar m (Maybe UTCTime))
}

-- | Cardano pool id's are hashes of the cold verification key
--
type PoolId = Ledger.KeyHash Ledger.StakePool

data StakePools m = StakePools {
-- | contains map of cardano pool stake snapshot obtained
-- via local state query client
stakePoolsVar
:: !(StrictTVar m (Map PoolId LedgerQuery.StakeSnapshot))
-- | Acquire and update validation context for signature validation
, withPoolValidationCtx
:: forall a. (PoolValidationCtx -> (a, PoolValidationCtx)) -> STM m a
-- | provides only those big peers which provide SRV endpoints
-- as otherwise those are cardano-nodes
, ledgerBigPeersVar
:: !(StrictTVar m (Maybe (LedgerPeerSnapshot BigLedgerPeers)))
-- | all ledger peers, restricted to srv endpoints
, ledgerPeersVar
:: !(StrictTMVar m (LedgerPeerSnapshot AllLedgerPeers))
}

data PoolValidationCtx =
PoolValidationCtx {
vctxEpoch :: !(Maybe UTCTime)
-- ^ UTC time of next epoch boundary for handling clock skew
, vctxStakeMap :: !(Map PoolId LedgerQuery.StakeSnapshot)
-- ^ for signature validation
, vctxOcertMap :: !(Map PoolId Word64)
-- ^ ocert counters to check monotonicity
}
deriving Show

newNodeKernel :: forall crypto ntnAddr m.
( MonadLabelledSTM m
, MonadMVar m
Expand Down Expand Up @@ -172,32 +141,44 @@ newNodeKernel rng = do


withNodeKernel :: forall crypto ntnAddr m a.
( MonadAsync m
, MonadFork m
, MonadDelay m
, MonadLabelledSTM m
, MonadMask m
, MonadMVar m
, MonadTime m
( Alternative (STM m)
, MonadAsync m
, MonadEvaluate m
, MonadFork m
, MonadDelay m
, MonadLabelledSTM m
, MonadMask m
, MonadMVar m
, Mx.MonadReadBuffer m
, MonadST m
, MonadThrow (STM m)
, MonadTime m
, MonadTimer m
, Ord ntnAddr
, Show ntnAddr
, Hashable ntnAddr
)
=> (forall ev. Aeson.ToJSON ev => Tracer m (WithEventType ev))
=> Snocket m Cardano.LocalSocket LocalAddress
-> Mx.MakeBearer m Cardano.LocalSocket
-> (forall ev. Aeson.ToJSON ev => Tracer m (WithEventType ev))
-> Configuration
-> StdGen
-> (NetworkMagic -> NodeKernel crypto ntnAddr m -> m (Either SomeException Void))
-> (NodeKernel crypto ntnAddr m -> m a)
-- ^ as soon as the callback exits the `mempoolWorker` and all
-- decision logic threads will be killed
-> m a
withNodeKernel tracer
withNodeKernel localSnocket
mkLocalBearer
tracer
Configuration {
dmqcSigSubmissionLogicTracer = I sigSubmissionLogicTracer,
dmqcCardanoNetworkMagic = I networkMagic
dmqcCardanoNetworkMagic = I networkMagic,
dmqcCardanoNodeSocket = I cardanoNodeSocketPath,
dmqcLocalStateQueryTracer = I localStateQueryTracer,
dmqcLedgerPeers = I ledgerPeers
}
rng
mkStakePoolMonitor k = do
k = do
nodeKernel@NodeKernel { mempool,
sigChannelVar,
sigSharedTxStateVar
Expand All @@ -214,11 +195,79 @@ withNodeKernel tracer
sigChannelVar
sigSharedTxStateVar)
$ \sigLogicThread ->
withAsync (mkStakePoolMonitor networkMagic nodeKernel) \spmAid -> do
withAsync (connectToCardanoNode nodeKernel) \spmAid -> do
link mempoolThread
link sigLogicThread
link spmAid
k nodeKernel
where
connectToCardanoNode :: NodeKernel crypto ntnAddr m
-> m (Either SomeException Void)
connectToCardanoNode nodeKernel =
fmap fn <$>
connectToNode
localSnocket
mkLocalBearer
ConnectToArgs {
ctaHandshakeCodec = Cardano.nodeToClientHandshakeCodec,
ctaHandshakeTimeLimits = noTimeLimitsHandshake,
ctaVersionDataCodec = cborTermVersionDataCodec Cardano.NtoC.nodeToClientCodecCBORTerm,
ctaConnectTracers = Cardano.nullNetworkConnectTracers, --debuggingNetworkConnectTracers,
ctaHandshakeCallbacks = HandshakeCallbacks acceptableVersion queryVersion
}
(\_ -> return ())
(Cardano.combineVersions
[ Cardano.simpleSingletonVersions
version
Cardano.NodeToClientVersionData {
Cardano.networkMagic
, Cardano.query = False
}
\_version ->
Mx.OuroborosApplication
[ Mx.MiniProtocol
{ Mx.miniProtocolNum = Mx.MiniProtocolNum 7
, Mx.miniProtocolStart = Mx.StartEagerly
, Mx.miniProtocolLimits =
Mx.MiniProtocolLimits
{ Mx.maximumIngressQueue = 0xffffffff
}
, Mx.miniProtocolRun =
Mx.InitiatorProtocolOnly
. Mx.mkMiniProtocolCbFromPeerSt
. const
$ ( nullTracer -- TODO: add tracer
, cStateQueryCodec
, StateIdle
, localStateQueryClientPeer $
cardanoLocalStateQueryClient
(if localStateQueryTracer
then WithEventType "LocalStateQuery" >$< tracer
else nullTracer)
ledgerPeers
(stakePools nodeKernel)
(nextEpochVar nodeKernel)
)
}
]
| version <- [minBound..maxBound]
, let -- NOTE: the query protocol is running using
-- `Cardano.StandardCrypto`, while `dmq-node` is using
-- `StandardCrypto` defined in `kes-agent-krypto`. A priori
-- cryptography could differ but it shouldn't be a problem. We
-- are querying
supportedVersionMap =
supportedNodeToClientVersions (Proxy :: Proxy (CardanoBlock Cardano.StandardCrypto))
blk = supportedVersionMap Map.! version
Codecs {cStateQueryCodec} =
clientCodecs (pClientInfoCodecConfig . protocolClientInfoCardano $ EpochSlots 21600)
blk version
])
Nothing
(localAddressFromPath cardanoNodeSocketPath)
where
fn :: forall x. Either x Void -> x
fn = either id absurd


mempoolWorker :: forall crypto m.
Expand Down
Loading
Loading