diff --git a/cabal.project b/cabal.project index e7af55a..99503e2 100644 --- a/cabal.project +++ b/cabal.project @@ -18,7 +18,7 @@ index-state: , hackage.haskell.org 2026-02-17T10:15:41Z -- Bump this if you need newer packages from CHaP - , cardano-haskell-packages 2026-03-11T15:05:51Z + , cardano-haskell-packages 2026-03-17T12:33:13Z packages: ./dmq-node @@ -33,21 +33,17 @@ if(os(windows)) constraints: bitvec -simd --- kes-agent is not yet in CHaP, so we pull it from its GitHub repo -source-repository-package - type: git - location: https://github.com/input-output-hk/kes-agent - tag: 7aedefc0077ffaf93bf46508c1fe0d7efdead227 - --sha256: sha256-ZcB0pPex349uH1W7ha5ez7Kx/XadK00j5wRm7+vyX0s= - subdir: - kes-agent - kes-agent-crypto source-repository-package type: git - location: https://github.com/IntersectMBO/ouroboros-consensus - tag: 02c68cf3b5133dc6ccf5f42e251766540e511396 - --sha256: sha256-C7H3y9nb3XGhIO5GiO1IuHvhkV7f2qnNglkImMgw1W8= + location: https://github.com/IntersectMBO/ouroboros-network + tag: 58f3ffb5abf74cd105f7c10593e01205dced7a9e + --sha256: sha256-x0feBtpO3zsvvq8Ocu5YEJFuRsVS/ptnLMEhXCAG+WI= + subdir: + acts-generic + cardano-diffusion + ouroboros-network + network-mux if impl(ghc >= 9.12.0) allow-newer: *:time, diff --git a/dmq-node/app/Main.hs b/dmq-node/app/Main.hs index e3ff0a4..d3cc950 100644 --- a/dmq-node/app/Main.hs +++ b/dmq-node/app/Main.hs @@ -51,7 +51,6 @@ import DMQ.Protocol.SigSubmission.Type (Sig (..)) import DMQ.Tracer import DMQ.Diffusion.PeerSelection (policy) -import DMQ.NodeToClient.LocalStateQueryClient import DMQ.Protocol.SigSubmission.Validate import Ouroboros.Network.Diffusion qualified as Diffusion import Ouroboros.Network.PeerSelection.LedgerPeers.Type @@ -89,9 +88,7 @@ runDMQ commandLineConfig = do dmqcValidationTracer = I validationTracer, dmqcLocalHandshakeTracer = I localHandshakeTracer, dmqcCardanoNodeSocket = I socketPath, - dmqcVersion = I version, - dmqcLocalStateQueryTracer = I localStateQueryTracer, - dmqcLedgerPeers = I ledgerPeers + dmqcVersion = I version } = config' <> commandLineConfig `act` defaultConfiguration @@ -133,20 +130,12 @@ runDMQ commandLineConfig = do -- TODO: this might not work, since `ouroboros-network` creates its own IO Completion Port. withIOManager \iocp -> do - let localSnocket' = localSnocket iocp - mkStakePoolMonitor = connectToCardanoNode - (if localStateQueryTracer - then WithEventType "LocalStateQuery" >$< tracer - else nullTracer) - ledgerPeers - localSnocket' - socketPath - withNodeKernel @StandardCrypto + (localSnocket iocp) + makeLocalBearer tracer dmqConfig - psRng - mkStakePoolMonitor $ \nodeKernel -> do + psRng $ \nodeKernel -> do dmqDiffusionConfiguration <- mkDiffusionConfiguration dmqConfig nt nodeKernel.stakePools.ledgerBigPeersVar diff --git a/dmq-node/dmq-node.cabal b/dmq-node/dmq-node.cabal index 798b8c5..982c5f9 100644 --- a/dmq-node/dmq-node.cabal +++ b/dmq-node/dmq-node.cabal @@ -63,6 +63,7 @@ library DMQ.Diffusion.Applications DMQ.Diffusion.Arguments DMQ.Diffusion.NodeKernel + DMQ.Diffusion.NodeKernel.Types DMQ.Diffusion.PeerSelection DMQ.Handlers.TopLevel DMQ.NodeToClient @@ -109,6 +110,7 @@ library cardano-ledger-byron, cardano-ledger-core, cardano-ledger-shelley, + cardano-protocol-tpraos, cardano-slotting, cardano-strict-containers, cborg >=0.2.1 && <0.3, @@ -120,14 +122,14 @@ library hashable >=1.0 && <1.6, io-classes:{io-classes, si-timers, strict-mvar, strict-stm} ^>=1.8.0.1, iproute ^>=1.7.15, - kes-agent-crypto ^>=1.0, + kes-agent-crypto ^>=1.1.0.0, mtl, network ^>=3.2.7, network-mux ^>=0.10, nothunks, optparse-applicative >=0.18 && <0.20, ouroboros-consensus:{ouroboros-consensus, cardano, diffusion}, - ouroboros-network:{ouroboros-network, api, framework, orphan-instances, protocols} ^>=1.0.0.0, + ouroboros-network:{ouroboros-network, api, framework, orphan-instances, protocols} ^>=1.1.0.0, quiet, random ^>=1.3, singletons, diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs index 6480108..5d7479b 100644 --- a/dmq-node/src/DMQ/Diffusion/NodeKernel.hs +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel.hs @@ -1,5 +1,7 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE RankNTypes #-} module DMQ.Diffusion.NodeKernel ( NodeKernel (..) @@ -10,106 +12,73 @@ module DMQ.Diffusion.NodeKernel , PoolId ) where +import Control.Applicative (Alternative) import Control.Concurrent.Class.MonadMVar import Control.Concurrent.Class.MonadSTM.Strict import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadST import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI -import Control.Tracer (Tracer, nullTracer) +import Control.Tracer (Tracer (..), nullTracer) import Data.Aeson qualified as Aeson import Data.Function (on) import Data.Functor.Contravariant ((>$<)) import Data.Hashable -import Data.Map.Strict (Map) import Data.Map.Strict qualified as Map +import Data.Proxy import Data.Sequence (Seq) import Data.Sequence qualified as Seq import Data.Set (Set) import Data.Set qualified as Set import Data.Time.Clock.POSIX (POSIXTime) import Data.Time.Clock.POSIX qualified as Time -import Data.Void (Void) -import Data.Word +import Data.Void (Void, absurd) import System.Random (StdGen) import System.Random qualified as Random -import Cardano.Ledger.Api.State.Query qualified as LedgerQuery -import Cardano.Ledger.Shelley.API qualified as Ledger +import Network.Mux qualified as Mx -import Ouroboros.Network.BlockFetch (FetchClientRegistry, - newFetchClientRegistry) -import Ouroboros.Network.ConnectionId (ConnectionId (..)) -import Ouroboros.Network.Magic (NetworkMagic (..)) +import Cardano.Chain.Slotting (EpochSlots (..)) +import Cardano.Network.NodeToClient qualified as Cardano.NtoC +import Cardano.Protocol.Crypto qualified as Cardano (StandardCrypto) + +import Ouroboros.Consensus.Cardano.Node +import Ouroboros.Consensus.Network.NodeToClient +import Ouroboros.Consensus.Node.NetworkProtocolVersion +import Ouroboros.Consensus.Node.ProtocolInfo +import Ouroboros.Network.BlockFetch (newFetchClientRegistry) +import Ouroboros.Network.Mux qualified as Mx import Ouroboros.Network.PeerSelection.Governor.Types (makePublicPeerSelectionStateVar) import Ouroboros.Network.PeerSelection.LedgerPeers (SomeLedgerPeerSnapshot) -import Ouroboros.Network.PeerSelection.LedgerPeers.Type (LedgerPeerSnapshot, - LedgerPeersKind (..)) -import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry, - newPeerSharingAPI, newPeerSharingRegistry, +import Ouroboros.Network.PeerSharing (newPeerSharingAPI, newPeerSharingRegistry, ps_POLICY_PEER_SHARE_MAX_PEERS, ps_POLICY_PEER_SHARE_STICKY_TIME) +import Ouroboros.Network.Protocol.Handshake.Codec (cborTermVersionDataCodec, + noTimeLimitsHandshake) +import Ouroboros.Network.Protocol.LocalStateQuery.Client +import Ouroboros.Network.Protocol.LocalStateQuery.Type +import Ouroboros.Network.Snocket (Snocket, localAddressFromPath) +import Ouroboros.Network.Socket (ConnectToArgs (..), + HandshakeCallbacks (HandshakeCallbacks), connectToNode) import Ouroboros.Network.TxSubmission.Inbound.V2 import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..), MempoolSeq (..), WithIndex (..)) import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool +import Cardano.Network.NodeToClient qualified as Cardano import DMQ.Configuration +import DMQ.Diffusion.NodeKernel.Types +import DMQ.NodeToClient.LocalStateQueryClient import DMQ.Policy qualified as Policy import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt, sigId), SigId) import DMQ.Tracer +import Ouroboros.Consensus.Cardano.Block (CardanoBlock) +import Ouroboros.Network.Handshake.Queryable (Queryable (..)) +import Ouroboros.Network.Protocol.Handshake (Acceptable (..)) -data NodeKernel crypto ntnAddr m = - NodeKernel { - -- | The fetch client registry, used for the keep alive clients. - fetchClientRegistry :: !(FetchClientRegistry (ConnectionId ntnAddr) () () m) - - -- | Read the current peer sharing registry, used for interacting with - -- the PeerSharing protocol - , peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m) - , peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m) - , mempool :: !(Mempool m SigId (Sig crypto)) - , sigChannelVar :: !(TxChannelsVar m ntnAddr SigId (Sig crypto)) - , sigMempoolSem :: !(TxMempoolSem m) - , sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId (Sig crypto)) - , stakePools :: !(StakePools m) - , nextEpochVar :: !(StrictTVar m (Maybe UTCTime)) - } - --- | Cardano pool id's are hashes of the cold verification key --- -type PoolId = Ledger.KeyHash Ledger.StakePool - -data StakePools m = StakePools { - -- | contains map of cardano pool stake snapshot obtained - -- via local state query client - stakePoolsVar - :: !(StrictTVar m (Map PoolId LedgerQuery.StakeSnapshot)) - -- | Acquire and update validation context for signature validation - , withPoolValidationCtx - :: forall a. (PoolValidationCtx -> (a, PoolValidationCtx)) -> STM m a - -- | provides only those big peers which provide SRV endpoints - -- as otherwise those are cardano-nodes - , ledgerBigPeersVar - :: !(StrictTVar m (Maybe (LedgerPeerSnapshot BigLedgerPeers))) - -- | all ledger peers, restricted to srv endpoints - , ledgerPeersVar - :: !(StrictTMVar m (LedgerPeerSnapshot AllLedgerPeers)) - } - -data PoolValidationCtx = - PoolValidationCtx { - vctxEpoch :: !(Maybe UTCTime) - -- ^ UTC time of next epoch boundary for handling clock skew - , vctxStakeMap :: !(Map PoolId LedgerQuery.StakeSnapshot) - -- ^ for signature validation - , vctxOcertMap :: !(Map PoolId Word64) - -- ^ ocert counters to check monotonicity - } - deriving Show - newNodeKernel :: forall crypto ntnAddr m. ( MonadLabelledSTM m , MonadMVar m @@ -172,32 +141,44 @@ newNodeKernel rng = do withNodeKernel :: forall crypto ntnAddr m a. - ( MonadAsync m - , MonadFork m - , MonadDelay m - , MonadLabelledSTM m - , MonadMask m - , MonadMVar m - , MonadTime m + ( Alternative (STM m) + , MonadAsync m + , MonadEvaluate m + , MonadFork m + , MonadDelay m + , MonadLabelledSTM m + , MonadMask m + , MonadMVar m + , Mx.MonadReadBuffer m + , MonadST m + , MonadThrow (STM m) + , MonadTime m + , MonadTimer m , Ord ntnAddr , Show ntnAddr , Hashable ntnAddr ) - => (forall ev. Aeson.ToJSON ev => Tracer m (WithEventType ev)) + => Snocket m Cardano.LocalSocket LocalAddress + -> Mx.MakeBearer m Cardano.LocalSocket + -> (forall ev. Aeson.ToJSON ev => Tracer m (WithEventType ev)) -> Configuration -> StdGen - -> (NetworkMagic -> NodeKernel crypto ntnAddr m -> m (Either SomeException Void)) -> (NodeKernel crypto ntnAddr m -> m a) -- ^ as soon as the callback exits the `mempoolWorker` and all -- decision logic threads will be killed -> m a -withNodeKernel tracer +withNodeKernel localSnocket + mkLocalBearer + tracer Configuration { dmqcSigSubmissionLogicTracer = I sigSubmissionLogicTracer, - dmqcCardanoNetworkMagic = I networkMagic + dmqcCardanoNetworkMagic = I networkMagic, + dmqcCardanoNodeSocket = I cardanoNodeSocketPath, + dmqcLocalStateQueryTracer = I localStateQueryTracer, + dmqcLedgerPeers = I ledgerPeers } rng - mkStakePoolMonitor k = do + k = do nodeKernel@NodeKernel { mempool, sigChannelVar, sigSharedTxStateVar @@ -214,11 +195,79 @@ withNodeKernel tracer sigChannelVar sigSharedTxStateVar) $ \sigLogicThread -> - withAsync (mkStakePoolMonitor networkMagic nodeKernel) \spmAid -> do + withAsync (connectToCardanoNode nodeKernel) \spmAid -> do link mempoolThread link sigLogicThread link spmAid k nodeKernel + where + connectToCardanoNode :: NodeKernel crypto ntnAddr m + -> m (Either SomeException Void) + connectToCardanoNode nodeKernel = + fmap fn <$> + connectToNode + localSnocket + mkLocalBearer + ConnectToArgs { + ctaHandshakeCodec = Cardano.nodeToClientHandshakeCodec, + ctaHandshakeTimeLimits = noTimeLimitsHandshake, + ctaVersionDataCodec = cborTermVersionDataCodec Cardano.NtoC.nodeToClientCodecCBORTerm, + ctaConnectTracers = Cardano.nullNetworkConnectTracers, --debuggingNetworkConnectTracers, + ctaHandshakeCallbacks = HandshakeCallbacks acceptableVersion queryVersion + } + (\_ -> return ()) + (Cardano.combineVersions + [ Cardano.simpleSingletonVersions + version + Cardano.NodeToClientVersionData { + Cardano.networkMagic + , Cardano.query = False + } + \_version -> + Mx.OuroborosApplication + [ Mx.MiniProtocol + { Mx.miniProtocolNum = Mx.MiniProtocolNum 7 + , Mx.miniProtocolStart = Mx.StartEagerly + , Mx.miniProtocolLimits = + Mx.MiniProtocolLimits + { Mx.maximumIngressQueue = 0xffffffff + } + , Mx.miniProtocolRun = + Mx.InitiatorProtocolOnly + . Mx.mkMiniProtocolCbFromPeerSt + . const + $ ( nullTracer -- TODO: add tracer + , cStateQueryCodec + , StateIdle + , localStateQueryClientPeer $ + cardanoLocalStateQueryClient + (if localStateQueryTracer + then WithEventType "LocalStateQuery" >$< tracer + else nullTracer) + ledgerPeers + (stakePools nodeKernel) + (nextEpochVar nodeKernel) + ) + } + ] + | version <- [minBound..maxBound] + , let -- NOTE: the query protocol is running using + -- `Cardano.StandardCrypto`, while `dmq-node` is using + -- `StandardCrypto` defined in `kes-agent-krypto`. A priori + -- cryptography could differ but it shouldn't be a problem. We + -- are querying + supportedVersionMap = + supportedNodeToClientVersions (Proxy :: Proxy (CardanoBlock Cardano.StandardCrypto)) + blk = supportedVersionMap Map.! version + Codecs {cStateQueryCodec} = + clientCodecs (pClientInfoCodecConfig . protocolClientInfoCardano $ EpochSlots 21600) + blk version + ]) + Nothing + (localAddressFromPath cardanoNodeSocketPath) + where + fn :: forall x. Either x Void -> x + fn = either id absurd mempoolWorker :: forall crypto m. diff --git a/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs b/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs new file mode 100644 index 0000000..7b89919 --- /dev/null +++ b/dmq-node/src/DMQ/Diffusion/NodeKernel/Types.hs @@ -0,0 +1,81 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE RankNTypes #-} + +module DMQ.Diffusion.NodeKernel.Types + ( NodeKernel (..) + , PoolId + , StakePools (..) + , PoolValidationCtx (..) + ) where + +import Control.Concurrent.Class.MonadSTM.Strict +import Control.Monad.Class.MonadTime.SI + + +import Data.Map.Strict (Map) +import Data.Word +import System.Random (StdGen) + +import Cardano.Ledger.Api.State.Query qualified as LedgerQuery +import Cardano.Ledger.Shelley.API qualified as Ledger + +import Ouroboros.Network.BlockFetch (FetchClientRegistry) +import Ouroboros.Network.ConnectionId (ConnectionId (..)) +import Ouroboros.Network.PeerSelection (LedgerPeerSnapshot, + LedgerPeersKind (..)) +import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry) +import Ouroboros.Network.TxSubmission.Inbound.V2 +import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (..)) + +import DMQ.Protocol.SigSubmission.Type (Sig (..), SigId) + + +data NodeKernel crypto ntnAddr m = + NodeKernel { + -- | The fetch client registry, used for the keep alive clients. + fetchClientRegistry :: !(FetchClientRegistry (ConnectionId ntnAddr) () () m) + + -- | Read the current peer sharing registry, used for interacting with + -- the PeerSharing protocol + , peerSharingRegistry :: !(PeerSharingRegistry ntnAddr m) + , peerSharingAPI :: !(PeerSharingAPI ntnAddr StdGen m) + , mempool :: !(Mempool m SigId (Sig crypto)) + , sigChannelVar :: !(TxChannelsVar m ntnAddr SigId (Sig crypto)) + , sigMempoolSem :: !(TxMempoolSem m) + , sigSharedTxStateVar :: !(SharedTxStateVar m ntnAddr SigId (Sig crypto)) + , stakePools :: !(StakePools m) + , nextEpochVar :: !(StrictTVar m (Maybe UTCTime)) + } + +-- | Cardano pool id's are hashes of the cold verification key +-- +type PoolId = Ledger.KeyHash Ledger.StakePool + +data StakePools m = StakePools { + -- | contains map of cardano pool stake snapshot obtained + -- via local state query client + stakePoolsVar + :: !(StrictTVar m (Map PoolId LedgerQuery.StakeSnapshot)) + -- | Acquire and update validation context for signature validation + , withPoolValidationCtx + :: forall a. (PoolValidationCtx -> (a, PoolValidationCtx)) -> STM m a + -- | provides only those big peers which provide SRV endpoints + -- as otherwise those are cardano-nodes + , ledgerBigPeersVar + :: !(StrictTVar m (Maybe (LedgerPeerSnapshot BigLedgerPeers))) + -- | all ledger peers, restricted to srv endpoints + , ledgerPeersVar + :: !(StrictTMVar m (LedgerPeerSnapshot AllLedgerPeers)) + } + +data PoolValidationCtx = + PoolValidationCtx { + vctxEpoch :: !(Maybe UTCTime) + -- ^ UTC time of next epoch boundary for handling clock skew + , vctxStakeMap :: !(Map PoolId LedgerQuery.StakeSnapshot) + -- ^ for signature validation + , vctxOcertMap :: !(Map PoolId Word64) + -- ^ ocert counters to check monotonicity + } + deriving Show + diff --git a/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs b/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs index d1dff4e..f7c9b12 100644 --- a/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs +++ b/dmq-node/src/DMQ/NodeToClient/LocalStateQueryClient.hs @@ -5,8 +5,8 @@ module DMQ.NodeToClient.LocalStateQueryClient ( TraceLocalStateQueryClient (..) - , cardanoClient - , connectToCardanoNode + , CardanoLocalStateQueryClient + , cardanoLocalStateQueryClient ) where import Control.Concurrent.Class.MonadSTM.Strict @@ -15,42 +15,32 @@ import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadTime.SI import Control.Monad.Class.MonadTimer.SI import Control.Monad.Trans.Except -import Control.Tracer (Tracer (..), nullTracer, traceWith) +import Control.Tracer (Tracer (..), traceWith) import Data.Aeson (ToJSON (..), object, (.=)) import Data.Aeson qualified as Aeson import Data.Functor ((<&>)) import Data.List.NonEmpty qualified as NonEmpty -import Data.Map.Strict qualified as Map -import Data.Proxy import Data.Void -import Cardano.Chain.Slotting (EpochSlots (..)) import Cardano.Ledger.Api.State.Query (StakeSnapshots (..)) -import Cardano.Network.NodeToClient import Cardano.Network.PeerSelection (LedgerPeerSnapshot (..), LedgerRelayAccessPoint (..), SingLedgerPeersKind (..)) import Cardano.Slotting.EpochInfo.API import Cardano.Slotting.Slot (EpochNo) import Cardano.Slotting.Time -import DMQ.Diffusion.NodeKernel +import DMQ.Diffusion.NodeKernel.Types (StakePools (..)) + import Ouroboros.Consensus.Cardano.Block -import Ouroboros.Consensus.Cardano.Node import Ouroboros.Consensus.HardFork.Combinator.Ledger.Query import Ouroboros.Consensus.HardFork.History.EpochInfo (interpreterToEpochInfo) import Ouroboros.Consensus.HardFork.History.Qry (PastHorizonException) import Ouroboros.Consensus.Ledger.Query (Query (..)) -import Ouroboros.Consensus.Network.NodeToClient -import Ouroboros.Consensus.Node.NetworkProtocolVersion -import Ouroboros.Consensus.Node.ProtocolInfo import Ouroboros.Consensus.Shelley.Ledger.Query import Ouroboros.Consensus.Shelley.Ledger.SupportsProtocol () import Ouroboros.Network.Block -import Ouroboros.Network.Magic -import Ouroboros.Network.Mux qualified as Mx import Ouroboros.Network.PeerSelection.LedgerPeers (LedgerPeersKind (..), - accumulateBigLedgerStake) -import Ouroboros.Network.PeerSelection.LedgerPeers.Type (RawBlockHash) + RawBlockHash, accumulateBigLedgerStake) import Ouroboros.Network.Point (Block (..)) import Ouroboros.Network.Protocol.LocalStateQuery.Client import Ouroboros.Network.Protocol.LocalStateQuery.Type @@ -91,34 +81,65 @@ data QueryError = UnsupportedEra instance Exception QueryError where --- TODO generalize to handle ledger eras other than Conway --- | connects the dmq node to cardano node via local state query --- and updates the node kernel with stake pool data necessary to perform message --- validation -- -cardanoClient - :: forall block query point crypto m. (MonadDelay m, MonadSTM m, MonadThrow m, MonadTime m) - => (block ~ CardanoBlock crypto, query ~ Query block, point ~ Point block) +-- Type aliases +-- + +-- | `LocalStateQuery` using `CardanoBlock` +type CardanoLocalStateQueryClient crypto m a = + LocalStateQueryClient (CardanoBlock crypto) + (Point (CardanoBlock crypto)) + (Query (CardanoBlock crypto)) m Void + +-- | `ClientStAcuiring` using `CardanoBlock` +type CardanoClientStAcquiring crypto m a = + ClientStAcquiring (CardanoBlock crypto) (Point (CardanoBlock crypto)) (Query (CardanoBlock crypto)) m a + +-- | `ClientStAcuired` using `CardanoBlock` +type CardanoClientStAcquired crypto m a = + ClientStAcquired (CardanoBlock crypto) (Point (CardanoBlock crypto)) (Query (CardanoBlock crypto)) m a + +-- | `ClientStQuerying` using `CardanoBlock` +type CardanoClientStQuerying crypto m a b = + ClientStQuerying (CardanoBlock crypto) (Point (CardanoBlock crypto)) (Query (CardanoBlock crypto)) m a b + + +-- | Local state query client which queries cardano node for +-- +-- * stake pool data (for signature validation) +-- * ledger peers (for peer selection) +-- +-- TODO generalize to handle ledger eras other than Conway. +-- +cardanoLocalStateQueryClient + :: forall crypto m. + ( MonadDelay m + , MonadSTM m + , MonadThrow m + , MonadTime m + ) => Tracer m TraceLocalStateQueryClient -> Bool -- ^ use ledger peers -> StakePools m -> StrictTVar m (Maybe UTCTime) -- ^ from node kernel - -> LocalStateQueryClient (CardanoBlock crypto) (Point block) (Query block) m Void -cardanoClient tracer ledgerPeers - StakePools { - stakePoolsVar, - ledgerPeersVar, - ledgerBigPeersVar - } - nextEpochVar = - LocalStateQueryClient (idle Nothing) + -> CardanoLocalStateQueryClient crypto m Void +cardanoLocalStateQueryClient + tracer ledgerPeers + StakePools { + stakePoolsVar, + ledgerPeersVar, + ledgerBigPeersVar + } + nextEpochVar + = + LocalStateQueryClient (idle Nothing) where idle mSystemStart = do traceWith tracer $ Acquiring mSystemStart -- FIXME: switched to volatiletip for prerelease testing purposes pure $ SendMsgAcquire VolatileTip {-ImmutableTip-} acquire where - acquire :: ClientStAcquiring block point query m Void + acquire :: CardanoClientStAcquiring crypto m Void acquire = ClientStAcquiring { recvMsgAcquired = let epochQry systemStart = pure $ @@ -134,8 +155,8 @@ cardanoClient tracer ledgerPeers } wrappingMismatch :: forall err r. - (r -> m (ClientStAcquired block point query m Void)) - -> ClientStQuerying block point query m Void (Either err r) + (r -> m (CardanoClientStAcquired crypto m Void)) + -> CardanoClientStQuerying crypto m Void (Either err r) wrappingMismatch k = ClientStQuerying $ either (const . throwIO . userError $ "mismatch era info") k @@ -168,12 +189,7 @@ cardanoClient tracer ledgerPeers queryCurrentEra :: SystemStart -> UTCTime - -> ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void + -> CardanoClientStAcquired crypto m Void queryCurrentEra systemStart nextEpoch = SendMsgQuery (BlockQuery (QueryHardFork GetCurrentEra)) $ ClientStQuerying $ \era -> queryStakeSnapshots systemStart nextEpoch era @@ -183,12 +199,7 @@ cardanoClient tracer ledgerPeers :: SystemStart -> UTCTime -> EraIndex (CardanoEras crypto) - -> m (ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void) + -> m (CardanoClientStAcquired crypto m Void) queryStakeSnapshots systemStart nextEpoch era = case era of EraByron{} -> throwIO UnsupportedEra @@ -209,12 +220,7 @@ cardanoClient tracer ledgerPeers where handleStakeSnapshots :: StakeSnapshots - -> m (ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void) + -> m (CardanoClientStAcquired crypto m Void) handleStakeSnapshots StakeSnapshots { ssStakeSnapshots } = do atomically do writeTVar stakePoolsVar ssStakeSnapshots @@ -234,12 +240,7 @@ cardanoClient tracer ledgerPeers queryLedgerPeers :: SystemStart -> NominalDiffTime - -> ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void + -> CardanoClientStAcquired crypto m Void queryLedgerPeers systemStart toNextEpoch = SendMsgQuery (BlockQuery . QueryIfCurrentConway $ GetLedgerPeerSnapshot SingAllLedgerPeers) $ wrappingMismatch handleLedgerPeers @@ -289,63 +290,7 @@ cardanoClient tracer ledgerPeers -- release, continue the loop in `idle` release :: SystemStart -> NominalDiffTime - -> ClientStAcquired - (CardanoBlock crypto) - (Point (CardanoBlock crypto)) - (Query (CardanoBlock crypto)) - m - Void + -> CardanoClientStAcquired crypto m Void release systemStart toNextEpoch = SendMsgRelease do threadDelay $ min (realToFrac toNextEpoch) 86400 -- TODO fuzz this? idle $ Just systemStart - - -connectToCardanoNode :: Tracer IO TraceLocalStateQueryClient - -> Bool -- ^ use ledger peers - -> LocalSnocket - -> FilePath - -> NetworkMagic - -> NodeKernel crypto ntnAddr IO - -> IO (Either SomeException Void) -connectToCardanoNode tracer ledgerPeers localSnocket' snocketPath networkMagic nodeKernel = - connectTo - localSnocket' - nullNetworkConnectTracers --debuggingNetworkConnectTracers - (combineVersions - [ simpleSingletonVersions - version - NodeToClientVersionData { - networkMagic - , query = False - } - \_version -> - Mx.OuroborosApplication - [ Mx.MiniProtocol - { miniProtocolNum = Mx.MiniProtocolNum 7 - , miniProtocolStart = Mx.StartEagerly - , miniProtocolLimits = - Mx.MiniProtocolLimits - { maximumIngressQueue = 0xffffffff - } - , miniProtocolRun = - Mx.InitiatorProtocolOnly - . Mx.mkMiniProtocolCbFromPeerSt - . const - $ ( nullTracer -- TODO: add tracer - , cStateQueryCodec - , StateIdle - , localStateQueryClientPeer - $ cardanoClient tracer - ledgerPeers - (stakePools nodeKernel) - (nextEpochVar nodeKernel) - ) - } - ] - | version <- [minBound..maxBound] - , let supportedVersionMap = supportedNodeToClientVersions (Proxy :: Proxy (CardanoBlock StandardCrypto)) - blk = supportedVersionMap Map.! version - Codecs {cStateQueryCodec} = - clientCodecs (pClientInfoCodecConfig . protocolClientInfoCardano $ EpochSlots 21600) blk version - ]) - snocketPath diff --git a/flake.lock b/flake.lock index a1785b4..8d87ea8 100644 --- a/flake.lock +++ b/flake.lock @@ -3,11 +3,11 @@ "CHaP": { "flake": false, "locked": { - "lastModified": 1773244089, - "narHash": "sha256-XfxUIdO14amoNVVY6e9VeuWI96u87R3LhxllSKsKRfU=", + "lastModified": 1773767585, + "narHash": "sha256-51V6NaYu6rTqiy9h3itovXJFab5ehkgGiwgq3uSkKFc=", "owner": "IntersectMBO", "repo": "cardano-haskell-packages", - "rev": "b7f3826f3574129577ed01e96aadd0027adca27d", + "rev": "38ac4a748aa186310e3c114208597eb979354296", "type": "github" }, "original": {