From a930884d68951647016180e25ccb5dbbf0f231d5 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 14 Dec 2025 18:51:47 +0000 Subject: [PATCH 1/5] agent: remove service queue association when service ID changed --- src/Simplex/Messaging/Agent.hs | 11 +++++++---- src/Simplex/Messaging/Agent/Client.hs | 14 ++++++++++---- src/Simplex/Messaging/Agent/Store/AgentStore.hs | 4 ++++ 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index f44708fe6..f7f732f23 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -221,7 +221,8 @@ import Simplex.Messaging.Protocol SMPMsgMeta, SParty (..), SProtocolType (..), - ServiceSubResult, + ServiceSubResult (..), + ServiceSubError (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, @@ -1480,11 +1481,13 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do withStore' c (\db -> getSubscriptionService db userId srv) >>= \case Just serviceSub -> case M.lookup userId useServices of Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case - Left e | clientServiceError e -> unassocQueues $> False + Right (ServiceSubResult (Just SSErrorServiceId {}) _) -> unassocQueues + Left e | clientServiceError e -> unassocQueues _ -> pure True - _ -> unassocQueues $> False + _ -> unassocQueues where - unassocQueues = withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv + unassocQueues :: AM Bool + unassocQueues = False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs' db userId srv) _ -> pure False subscribeUserServer :: Int -> TVar Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int) subscribeUserServer maxPending currPending ((userId, srv), hasService) = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 7acfb0b49..4ebe67e82 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -283,6 +283,7 @@ import Simplex.Messaging.Protocol SProtocolType (..), ServiceSub (..), ServiceSubResult (..), + ServiceSubError (..), SndPublicAuthKey, SubscriptionMode (..), NewNtfCreds (..), @@ -1725,12 +1726,17 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do notifySub' c "" $ ERR e resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSubResult -resubscribeClientService c tSess@(userId, srv, _) serviceSub = - withServiceClient c tSess (\smp _ -> subscribeClientService_ c True tSess smp serviceSub) `catchE` \e -> do - when (clientServiceError e) $ do +resubscribeClientService c tSess@(userId, srv, _) serviceSub = do + r <- tryAllErrors (withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub) + case r of + Right (ServiceSubResult (Just SSErrorServiceId {}) _) -> unassocSubscribeQueues + Left e | clientServiceError e -> unassocSubscribeQueues + _ -> pure () + either throwE pure r + where + unassocSubscribeQueues = do qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv void $ lift $ subscribeUserServerQueues c userId srv qs - throwE e -- TODO [certs rcv] update service in the database if it has different ID and re-associate queues, and send event subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> ServiceSub -> AM ServiceSubResult diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 0d0b2af70..c25f354bd 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -55,6 +55,7 @@ module Simplex.Messaging.Agent.Store.AgentStore getSubscriptionServers, getUserServerRcvQueueSubs, unassocUserServerRcvQueueSubs, + unassocUserServerRcvQueueSubs', unsetQueuesToSubscribe, setRcvServiceAssocs, removeRcvServiceAssocs, @@ -2293,6 +2294,9 @@ unassocUserServerRcvQueueSubs db userId (SMPServer h p kh) = rcv_queues.rcv_queue_id, rcv_queues.rcv_primary, rcv_queues.replace_rcv_queue_id |] +unassocUserServerRcvQueueSubs' :: DB.Connection -> UserId -> SMPServer -> IO () +unassocUserServerRcvQueueSubs' db userId (SMPServer h p kh) = DB.execute db removeRcvAssocsQuery (h, p, userId, kh) + unsetQueuesToSubscribe :: DB.Connection -> IO () unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1" From b391aa7f7fa088b57d07b979f566db10ad4a4800 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 14 Dec 2025 21:31:18 +0000 Subject: [PATCH 2/5] agent: process ENDS event --- src/Simplex/Messaging/Agent.hs | 19 +++++++++++++++---- src/Simplex/Messaging/Agent/Client.hs | 3 +-- src/Simplex/Messaging/Agent/Protocol.hs | 3 +++ src/Simplex/Messaging/Agent/TSessionSubs.hs | 4 ++++ src/Simplex/Messaging/Protocol.hs | 21 ++++++++++++--------- src/Simplex/Messaging/Server.hs | 4 ++-- tests/ServerTests.hs | 4 ++-- 7 files changed, 39 insertions(+), 19 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index f7f732f23..d59ee714f 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -221,6 +221,7 @@ import Simplex.Messaging.Protocol SMPMsgMeta, SParty (..), SProtocolType (..), + ServiceSub (..), ServiceSubResult (..), ServiceSubError (..), SndPublicAuthKey, @@ -3118,16 +3119,26 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar notifyEnd removed | removed = notify END >> logServer "<--" c srv rId "END" | otherwise = logServer "<--" c srv rId "END from disconnected client - ignored" - -- TODO [certs rcv] - r@(SMP.ENDS _) -> unexpected r + SMP.ENDS n idsHash -> + atomically (ifM (activeClientSession c tSess sessId) (SS.deleteServiceSub tSess (currentSubs c) $> True) (pure False)) + >>= notifyEnd + where + notifyEnd removed + | removed = do + forM_ clientServiceId_ $ \serviceId -> + notify_ B.empty $ SERVICE_END srv $ ServiceSub serviceId n idsHash + logServer "<--" c srv rId "ENDS" + | otherwise = logServer "<--" c srv rId "ENDS from disconnected client - ignored" -- TODO [certs rcv] Possibly, we need to add some flag to connection that it was deleted SMP.DELD -> atomically (removeSubscription c tSess connId rq) >> notify DELD SMP.ERR e -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e r -> unexpected r where notify :: forall e m. (AEntityI e, MonadIO m) => AEvent e -> m () - notify msg = - let t = ("", connId, AEvt (sAEntity @e) msg) + notify = notify_ connId + notify_ :: forall e m. (AEntityI e, MonadIO m) => ConnId -> AEvent e -> m () + notify_ connId' msg = + let t = ("", connId', AEvt (sAEntity @e) msg) in atomically $ ifM (isFullTBQueue subQ) (modifyTVar' pendingMsgs (t :)) (writeTBQueue subQ t) prohibited :: Text -> AM () diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 4ebe67e82..481c7dfc0 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -266,7 +266,6 @@ import Simplex.Messaging.Protocol NetworkError (..), MsgFlags (..), MsgId, - IdsHash, NtfServer, NtfServerWithAuth, ProtoServer, @@ -1757,7 +1756,7 @@ withServiceClient c tSess subscribe = -- TODO [certs rcv] send subscription error event? subscribeClientService_ :: AgentClient -> Bool -> SMPTransportSession -> SMPClient -> ServiceSub -> ExceptT SMPClientError IO ServiceSubResult -subscribeClientService_ c withEvent tSess@(userId, srv, _) smp expected@(ServiceSub _ n idsHash) = do +subscribeClientService_ c withEvent tSess@(_, srv, _) smp expected@(ServiceSub _ n idsHash) = do subscribed <- subscribeService smp SMP.SRecipientService n idsHash let sessId = sessionId $ thParams smp r = serviceSubResult expected subscribed diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index d5b35611b..ef9bc592f 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -393,6 +393,7 @@ data AEvent (e :: AEntity) where SERVICE_ALL :: SMPServer -> AEvent AENone -- all service messages are delivered SERVICE_DOWN :: SMPServer -> ServiceSub -> AEvent AENone SERVICE_UP :: SMPServer -> ServiceSubResult -> AEvent AENone + SERVICE_END :: SMPServer -> ServiceSub -> AEvent AENone SWITCH :: QueueDirection -> SwitchPhase -> ConnectionStats -> AEvent AEConn RSYNC :: RatchetSyncState -> Maybe AgentCryptoError -> ConnectionStats -> AEvent AEConn SENT :: AgentMsgId -> Maybe SMPServer -> AEvent AEConn @@ -467,6 +468,7 @@ data AEventTag (e :: AEntity) where SERVICE_ALL_ :: AEventTag AENone SERVICE_DOWN_ :: AEventTag AENone SERVICE_UP_ :: AEventTag AENone + SERVICE_END_ :: AEventTag AENone SWITCH_ :: AEventTag AEConn RSYNC_ :: AEventTag AEConn SENT_ :: AEventTag AEConn @@ -525,6 +527,7 @@ aEventTag = \case SERVICE_ALL _ -> SERVICE_ALL_ SERVICE_DOWN {} -> SERVICE_DOWN_ SERVICE_UP {} -> SERVICE_UP_ + SERVICE_END {} -> SERVICE_END_ SWITCH {} -> SWITCH_ RSYNC {} -> RSYNC_ SENT {} -> SENT_ diff --git a/src/Simplex/Messaging/Agent/TSessionSubs.hs b/src/Simplex/Messaging/Agent/TSessionSubs.hs index ab15b9793..25df48b6a 100644 --- a/src/Simplex/Messaging/Agent/TSessionSubs.hs +++ b/src/Simplex/Messaging/Agent/TSessionSubs.hs @@ -23,6 +23,7 @@ module Simplex.Messaging.Agent.TSessionSubs batchDeletePendingSubs, deleteSub, batchDeleteSubs, + deleteServiceSub, hasPendingSubs, getPendingSubs, getActiveSubs, @@ -176,6 +177,9 @@ batchDeleteSubs tSess rqs = lookupSubs tSess >=> mapM_ (\s -> delete (activeSubs rIds = S.fromList $ map queueId rqs delete = (`modifyTVar'` (`M.withoutKeys` rIds)) +deleteServiceSub :: SMPTransportSession -> TSessionSubs -> STM () +deleteServiceSub tSess = lookupSubs tSess >=> mapM_ (\s -> writeTVar (activeServiceSub s) Nothing >> writeTVar (pendingServiceSub s) Nothing) + hasPendingSubs :: SMPTransportSession -> TSessionSubs -> STM Bool hasPendingSubs tSess = lookupSubs tSess >=> maybe (pure False) (\s -> anyM [hasSubs s, hasServiceSub s]) where diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 51128597c..b813f60e9 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -726,7 +726,7 @@ data BrokerMsg where RRES :: EncFwdResponse -> BrokerMsg -- relay to proxy PRES :: EncResponse -> BrokerMsg -- proxy to client END :: BrokerMsg - ENDS :: Int64 -> BrokerMsg + ENDS :: Int64 -> IdsHash -> BrokerMsg DELD :: BrokerMsg INFO :: QueueInfo -> BrokerMsg OK :: BrokerMsg @@ -1925,9 +1925,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where SOK serviceId_ | v >= serviceCertsSMPVersion -> e (SOK_, ' ', serviceId_) | otherwise -> e OK_ -- won't happen, the association with the service requires v >= serviceCertsSMPVersion - SOKS n idsHash - | v >= rcvServiceSMPVersion -> e (SOKS_, ' ', n, idsHash) - | otherwise -> e (SOKS_, ' ', n) + SOKS n idsHash -> serviceResp SOKS_ n idsHash MSG RcvMessage {msgId, msgBody = EncRcvMsgBody body} -> e (MSG_, ' ', msgId, Tail body) ALLS -> e ALLS_ @@ -1937,7 +1935,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where RRES (EncFwdResponse encBlock) -> e (RRES_, ' ', Tail encBlock) PRES (EncResponse encBlock) -> e (PRES_, ' ', Tail encBlock) END -> e END_ - ENDS n -> e (ENDS_, ' ', n) + ENDS n idsHash -> serviceResp ENDS_ n idsHash DELD | v >= deletedEventSMPVersion -> e DELD_ | otherwise -> e END_ @@ -1954,6 +1952,9 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where where e :: Encoding a => a -> ByteString e = smpEncode + serviceResp tag n idsHash + | v >= serviceCertsSMPVersion = e (tag, ' ', n, idsHash) + | otherwise = e (tag, ' ', n) protocolP v = \case MSG_ -> do @@ -1982,21 +1983,23 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where pure $ IDS QIK {rcvId, sndId, rcvPublicDhKey, queueMode, linkId, serviceId, serverNtfCreds} LNK_ -> LNK <$> _smpP <*> smpP SOK_ -> SOK <$> _smpP - SOKS_ - | v >= rcvServiceSMPVersion -> SOKS <$> _smpP <*> smpP - | otherwise -> SOKS <$> _smpP <*> pure noIdsHash + SOKS_ -> serviceRespP SOKS NID_ -> NID <$> _smpP <*> smpP NMSG_ -> NMSG <$> _smpP <*> smpP PKEY_ -> PKEY <$> _smpP <*> smpP <*> smpP RRES_ -> RRES <$> (EncFwdResponse . unTail <$> _smpP) PRES_ -> PRES <$> (EncResponse . unTail <$> _smpP) END_ -> pure END - ENDS_ -> ENDS <$> _smpP + ENDS_ -> serviceRespP ENDS DELD_ -> pure DELD INFO_ -> INFO <$> _smpP OK_ -> pure OK ERR_ -> ERR <$> _smpP PONG_ -> pure PONG + where + serviceRespP resp + | v >= serviceCertsSMPVersion = resp <$> _smpP <*> smpP + | otherwise = resp <$> _smpP <*> pure noIdsHash fromProtocolError = \case PECmdSyntax -> CMD SYNTAX diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index b7bb0efaa..0a429bbe0 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -316,8 +316,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt cancelServiceSubs :: ServiceId -> Maybe (Client s) -> STM [PrevClientSub s] cancelServiceSubs serviceId = checkAnotherClient $ \c -> do - changedSubs@(n, _) <- swapTVar (clientServiceSubs c) (0, noIdsHash) - pure [(c, CSADecreaseSubs changedSubs, (serviceId, ENDS n))] + changedSubs@(n, idsHash) <- swapTVar (clientServiceSubs c) (0, noIdsHash) + pure [(c, CSADecreaseSubs changedSubs, (serviceId, ENDS n idsHash))] checkAnotherClient :: (Client s -> STM [PrevClientSub s]) -> Maybe (Client s) -> STM [PrevClientSub s] checkAnotherClient mkSub = \case Just c@Client {clientId, connected} | clntId /= clientId -> diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 82a39af39..27a72d2ac 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -1334,7 +1334,7 @@ testMessageServiceNotifications = Resp "4" _ (SOK (Just serviceId')) <- serviceSignSendRecv nh2 nKey servicePK ("4", nId, NSUB) serviceId' `shouldBe` serviceId -- service subscription is terminated - Resp "" serviceId2 (ENDS 1) <- tGet1 nh1 + Resp "" serviceId2 (ENDS 1 _) <- tGet1 nh1 serviceId2 `shouldBe` serviceId deliverMessage rh rId rKey sh sId sKey nh2 "hello again" dec 1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg nh1 >>= \case @@ -1374,7 +1374,7 @@ testMessageServiceNotifications = Resp "12" serviceId5 (SOKS 2 idsHash') <- signSendRecv nh1 (C.APrivateAuthKey C.SEd25519 servicePK) ("12", serviceId, NSUBS 2 idsHash) idsHash' `shouldBe` idsHash serviceId5 `shouldBe` serviceId - Resp "" serviceId6 (ENDS 2) <- tGet1 nh2 + Resp "" serviceId6 (ENDS 2 _) <- tGet1 nh2 serviceId6 `shouldBe` serviceId deliverMessage rh rId rKey sh sId sKey nh1 "connection 1 one more" dec deliverMessage rh rId'' rKey'' sh sId'' sKey'' nh1 "connection 2 one more" dec'' From 835f4119f1451275e1487d43a7d0d056db1341f1 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Sun, 14 Dec 2025 23:40:49 +0000 Subject: [PATCH 3/5] agent: send service subscription error event --- src/Simplex/Messaging/Agent.hs | 11 ++++++++--- src/Simplex/Messaging/Agent/Client.hs | 16 +++++++++------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index d59ee714f..f867866e3 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1482,9 +1482,14 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do withStore' c (\db -> getSubscriptionService db userId srv) >>= \case Just serviceSub -> case M.lookup userId useServices of Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case - Right (ServiceSubResult (Just SSErrorServiceId {}) _) -> unassocQueues - Left e | clientServiceError e -> unassocQueues - _ -> pure True + Right (ServiceSubResult e _) -> case e of + Just SSErrorServiceId {} -> unassocQueues + _ -> pure True + Left e -> do + atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e) + if clientServiceError e + then unassocQueues + else pure True _ -> unassocQueues where unassocQueues :: AM Bool diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 481c7dfc0..27ef7ab11 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1725,13 +1725,15 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do notifySub' c "" $ ERR e resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSubResult -resubscribeClientService c tSess@(userId, srv, _) serviceSub = do - r <- tryAllErrors (withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub) - case r of - Right (ServiceSubResult (Just SSErrorServiceId {}) _) -> unassocSubscribeQueues - Left e | clientServiceError e -> unassocSubscribeQueues - _ -> pure () - either throwE pure r +resubscribeClientService c tSess@(userId, srv, _) serviceSub = + tryAllErrors (withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub) >>= \case + Right r@(ServiceSubResult e _) -> case e of + Just SSErrorServiceId {} -> unassocSubscribeQueues $> r + _ -> pure r + Left e -> do + when (clientServiceError e) $ unassocSubscribeQueues + atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e) + throwE e where unassocSubscribeQueues = do qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv From 197e3c8683259feb510074748d004de96283cfd0 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Mon, 15 Dec 2025 23:37:48 +0000 Subject: [PATCH 4/5] agent: test migrating to/from service subscriptions, fixes --- src/Simplex/Messaging/Agent.hs | 3 + src/Simplex/Messaging/Agent/Client.hs | 14 +- src/Simplex/Messaging/Agent/TSessionSubs.hs | 26 +++- tests/AgentTests/FunctionalAPITests.hs | 156 +++++++++++++++++++- 4 files changed, 183 insertions(+), 16 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index f867866e3..991ecb1dd 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1484,6 +1484,9 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case Right (ServiceSubResult e _) -> case e of Just SSErrorServiceId {} -> unassocQueues + -- Below would resubscribe all queues after service was disabled and re-enabled + -- Possibly, we should always resubscribe all with expected is greated than subscribed + Just SSErrorQueueCount {expectedQueueCount = n, subscribedQueueCount = n'} | n > 0 && n' == 0 -> unassocQueues _ -> pure True Left e -> do atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 27ef7ab11..47549c871 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1526,23 +1526,23 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl processSubResults :: AgentClient -> SMPTransportSession -> SessionId -> Maybe ServiceId -> NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM ([RcvQueueSub], [(RcvQueueSub, Maybe ClientNotice)]) processSubResults c tSess@(userId, srv, _) sessId serviceId_ rs = do - pending <- SS.getPendingSubs tSess $ currentSubs c - let (failed, subscribed@(qs, sQs), notices, ignored) = foldr (partitionResults pending) (M.empty, ([], []), [], 0) rs + pendingSubs <- SS.getPendingQueueSubs tSess $ currentSubs c + let (failed, subscribed@(qs, sQs), notices, ignored) = foldr (partitionResults pendingSubs) (M.empty, ([], []), [], 0) rs unless (M.null failed) $ do incSMPServerStat' c userId srv connSubErrs $ M.size failed failSubscriptions c tSess failed unless (null qs && null sQs) $ do incSMPServerStat' c userId srv connSubscribed $ length qs + length sQs - SS.batchAddActiveSubs tSess sessId subscribed $ currentSubs c + SS.batchAddActiveSubs tSess sessId serviceId_ subscribed $ currentSubs c unless (ignored == 0) $ incSMPServerStat' c userId srv connSubIgnored ignored pure (sQs, notices) where partitionResults :: - (Map SMP.RecipientId RcvQueueSub, Maybe ServiceSub) -> + Map SMP.RecipientId RcvQueueSub -> (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> (Map SMP.RecipientId SMPClientError, ([RcvQueueSub], [RcvQueueSub]), [(RcvQueueSub, Maybe ClientNotice)], Int) -> (Map SMP.RecipientId SMPClientError, ([RcvQueueSub], [RcvQueueSub]), [(RcvQueueSub, Maybe ClientNotice)], Int) - partitionResults (pendingSubs, pendingSS) (rq@RcvQueueSub {rcvId, clientNoticeId}, r) acc@(failed, subscribed@(qs, sQs), notices, ignored) = case r of + partitionResults pendingSubs (rq@RcvQueueSub {rcvId, clientNoticeId}, r) acc@(failed, subscribed@(qs, sQs), notices, ignored) = case r of Left e -> case smpErrorClientNotice e of Just notice_ -> (failed', subscribed, (rq, notice_) : notices, ignored) where @@ -1554,8 +1554,8 @@ processSubResults c tSess@(userId, srv, _) sessId serviceId_ rs = do failed' = M.insert rcvId e failed Right serviceId_' | rcvId `M.member` pendingSubs -> - let subscribed' = case (serviceId_, serviceId_', pendingSS) of - (Just sId, Just sId', Just ServiceSub {smpServiceId}) | sId == sId' && sId == smpServiceId -> (qs, rq : sQs) + let subscribed' = case (serviceId_, serviceId_') of + (Just sId, Just sId') | sId == sId' -> (qs, rq : sQs) _ -> (rq : qs, sQs) in (failed, subscribed', notices', ignored) | otherwise -> (failed, subscribed, notices', ignored + 1) diff --git a/src/Simplex/Messaging/Agent/TSessionSubs.hs b/src/Simplex/Messaging/Agent/TSessionSubs.hs index 25df48b6a..3f82da291 100644 --- a/src/Simplex/Messaging/Agent/TSessionSubs.hs +++ b/src/Simplex/Messaging/Agent/TSessionSubs.hs @@ -26,6 +26,7 @@ module Simplex.Messaging.Agent.TSessionSubs deleteServiceSub, hasPendingSubs, getPendingSubs, + getPendingQueueSubs, getActiveSubs, setSubsPending, updateClientNotices, @@ -40,12 +41,12 @@ import Data.Int (Int64) import Data.List (foldl') import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (isJust) +import Data.Maybe (fromMaybe, isJust) import qualified Data.Set as S import Simplex.Messaging.Agent.Protocol (SMPQueue (..)) import Simplex.Messaging.Agent.Store (RcvQueue, RcvQueueSub (..), SomeRcvQueue, StoredRcvQueue (rcvServiceAssoc), rcvQueueSub) import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..)) -import Simplex.Messaging.Protocol (RecipientId, ServiceSub (..), queueIdHash) +import Simplex.Messaging.Protocol (RecipientId, ServiceSub (..), noIdsHash, queueIdHash) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport @@ -138,22 +139,27 @@ addActiveSub' tSess sessId rq serviceAssoc ss = do in modifyTVar' (activeServiceSub s) (updateServiceSub <$>) else TM.insert rId rq $ pendingSubs s -batchAddActiveSubs :: SMPTransportSession -> SessionId -> ([RcvQueueSub], [RcvQueueSub]) -> TSessionSubs -> STM () -batchAddActiveSubs tSess sessId (rqs, serviceRQs) ss = do +batchAddActiveSubs :: SMPTransportSession -> SessionId -> Maybe ServiceId -> ([RcvQueueSub], [RcvQueueSub]) -> TSessionSubs -> STM () +batchAddActiveSubs tSess sessId serviceId_ (rqs, serviceRQs) ss = do s <- getSessSubs tSess ss sessId' <- readTVar $ subsSessId s let qs = M.fromList $ map (\rq -> (rcvId rq, rq)) rqs + serviceQs = M.fromList $ map (\rq -> (rcvId rq, rq)) serviceRQs if Just sessId == sessId' then do TM.union qs $ activeSubs s modifyTVar' (pendingSubs s) (`M.difference` qs) - serviceSub_ <- readTVar $ activeServiceSub s - forM_ serviceSub_ $ \(ServiceSub serviceId n idsHash) -> do - unless (null serviceRQs) $ do + forM_ serviceId_ $ \serviceId -> unless (null serviceRQs) $ do + modifyTVar' (pendingSubs s) (`M.difference` serviceQs) + ServiceSub serviceId' n idsHash <- + fromMaybe (ServiceSub serviceId 0 noIdsHash) <$> readTVar (activeServiceSub s) + when (serviceId == serviceId') $ do let idsHash' = idsHash <> mconcat (map (queueIdHash . rcvId) serviceRQs) n' = n + fromIntegral (length serviceRQs) writeTVar (activeServiceSub s) $ Just $ ServiceSub serviceId n' idsHash' - else TM.union qs $ pendingSubs s + else do + TM.union qs $ pendingSubs s + when (isJust serviceId_ && not (null serviceRQs)) $ TM.union serviceQs $ pendingSubs s batchAddPendingSubs :: SMPTransportSession -> [RcvQueueSub] -> TSessionSubs -> STM () batchAddPendingSubs tSess rqs ss = do @@ -191,6 +197,10 @@ getPendingSubs tSess = lookupSubs tSess >=> maybe (pure (M.empty, Nothing)) get where get s = liftM2 (,) (readTVar $ pendingSubs s) (readTVar $ pendingServiceSub s) +getPendingQueueSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) +getPendingQueueSubs = getSubs_ pendingSubs +{-# INLINE getPendingQueueSubs #-} + getActiveSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub) getActiveSubs = getSubs_ activeSubs {-# INLINE getActiveSubs #-} diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index b63e4cb48..bdb3588ba 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -480,6 +480,7 @@ functionalAPITests ps = do describe "Client service certificates" $ do it "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps it "should re-subscribe when service ID changed" $ testClientServiceIDChange ps + it "migrate connections to and from service" $ testMigrateConnectionsToService ps describe "Connection switch" $ do describe "should switch delivery to the new queue" $ testServerMatrix2 ps testSwitchConnection @@ -3743,7 +3744,7 @@ testClientServiceIDChange ps@(_, ASType qs _) = do liftIO $ getInAnyOrder service [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult (Just (SMP.SSErrorQueueCount 1 0)) (SMP.ServiceSub _ 0 _)))) -> True; _ -> False, \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False, - \case ("", "", AEvt SAENone (UP _ _)) -> True; _ -> False + \case ("", "", AEvt SAENone (UP _ [_])) -> True; _ -> False ] subscribeAllConnections user False Nothing ("", "", UP _ [_]) <- nGet user @@ -3759,6 +3760,159 @@ testClientServiceIDChange ps@(_, ASType qs _) = do ("", "", UP _ [_]) <- nGet user exchangeGreetingsMsgId 6 notService uId user sId +testMigrateConnectionsToService :: HasCallStack => (ASrvTransport, AStoreType) -> IO () +testMigrateConnectionsToService ps = do + (((sId1, uId1), (uId2, sId2)), ((sId3, uId3), (uId4, sId4)), ((sId5, uId5), (uId6, sId6))) <- + withSmpServerStoreLogOn ps testPort $ \_ -> do + -- starting without service + cs12@((sId1, uId1), (uId2, sId2)) <- + withAgentClientsServers2 (agentCfg, initAgentServers) (agentCfg, initAgentServers) $ \notService user -> + runRight $ (,) <$> makeConnection notService user <*> makeConnection user notService + -- migrating to service + cs34@((sId3, uId3), (uId4, sId4)) <- + withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> runRight $ do + subscribeAllConnections service False Nothing + service `up` 2 + subscribeAllConnections user False Nothing + user `up` 2 + exchangeGreetingsMsgId 2 service uId1 user sId1 + exchangeGreetingsMsgId 2 service uId2 user sId2 + (,) <$> makeConnection service user <*> makeConnection user service + -- starting as service + cs56 <- + withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> runRight $ do + subscribeAllConnections service False Nothing + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 4 _)))) -> True; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] + subscribeAllConnections user False Nothing + user `up` 4 + exchangeGreetingsMsgId 4 service uId1 user sId1 + exchangeGreetingsMsgId 4 service uId2 user sId2 + exchangeGreetingsMsgId 2 service uId3 user sId3 + exchangeGreetingsMsgId 2 service uId4 user sId4 + (,) <$> makeConnection service user <*> makeConnection user service + pure (cs12, cs34, cs56) + -- server reconnecting resubscribes service + let testSendMessages6 s u n = do + exchangeGreetingsMsgId (n + 4) s uId1 u sId1 + exchangeGreetingsMsgId (n + 4) s uId2 u sId2 + exchangeGreetingsMsgId (n + 2) s uId3 u sId3 + exchangeGreetingsMsgId (n + 2) s uId4 u sId4 + exchangeGreetingsMsgId n s uId5 u sId5 + exchangeGreetingsMsgId n s uId6 u sId6 + withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do + withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do + subscribeAllConnections service False Nothing + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 6 _)))) -> True; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] + subscribeAllConnections user False Nothing + user `up` 6 + testSendMessages6 service user 2 + ("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 6 _)) <- nGet service + user `down` 6 + withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 6 _)))) -> True; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] + user `up` 6 + testSendMessages6 service user 4 + ("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 6 _)) <- nGet service + user `down` 6 + -- disabling service and adding connections + ((sId7, uId7), (uId8, sId8)) <- + withAgentClientsServers2 (agentCfg, initAgentServers) (agentCfg, initAgentServers) $ \notService user -> do + cs78@((sId7, uId7), (uId8, sId8)) <- + withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do + subscribeAllConnections notService False Nothing + notService `up` 6 + subscribeAllConnections user False Nothing + user `up` 6 + testSendMessages6 notService user 6 + (,) <$> makeConnection notService user <*> makeConnection user notService + notService `down` 8 + user `down` 8 + withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do + notService `up` 8 + user `up` 8 + testSendMessages6 notService user 8 + exchangeGreetingsMsgId 2 notService uId7 user sId7 + exchangeGreetingsMsgId 2 notService uId8 user sId8 + notService `down` 8 + user `down` 8 + pure cs78 + let testSendMessages8 s u n = do + testSendMessages6 s u (n + 8) + exchangeGreetingsMsgId (n + 2) s uId7 u sId7 + exchangeGreetingsMsgId (n + 2) s uId8 u sId8 + -- re-enabling service and adding connections + withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do + withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do + subscribeAllConnections service False Nothing + -- the "error" in SERVICE_UP event is expected, because when service was disabled for the user, + -- the service and associations were not removed, to optimize non-service clients. + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult (Just (SMP.SSErrorQueueCount 6 0)) (SMP.ServiceSub _ 0 _)))) -> True; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] + service `up` 8 + subscribeAllConnections user False Nothing + user `up` 8 + testSendMessages8 service user 2 + ("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 8 _)) <- nGet service + user `down` 8 + -- re-connect to server + withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 8 _)))) -> True; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] + user `up` 8 + testSendMessages8 service user 4 + ("", "", SERVICE_DOWN _ (SMP.ServiceSub _ _ _)) <- nGet service -- should be 8 here + user `down` 8 + -- restart agents + withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do + withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do + subscribeAllConnections service False Nothing + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 8 _)))) -> True; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] + subscribeAllConnections user False Nothing + user `up` 8 + testSendMessages8 service user 6 + ("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 8 _)) <- nGet service + user `down` 8 + runRight_ $ do + void $ sendMessage user sId7 SMP.noMsgFlags "hello 1" + void $ sendMessage user sId8 SMP.noMsgFlags "hello 2" + -- re-connect to server + withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 8 _)))) -> True; _ -> False, + \case ("", c, AEvt SAEConn (Msg "hello 1")) -> c == uId7; _ -> False, + \case ("", c, AEvt SAEConn (Msg "hello 2")) -> c == uId8; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] + liftIO $ getInAnyOrder user + [ \case ("", "", AEvt SAENone (UP _ [_, _, _, _, _, _, _, _])) -> True; _ -> False, + \case ("", c, AEvt SAEConn (SENT 10)) -> c == sId7; _ -> False, + \case ("", c, AEvt SAEConn (SENT 10)) -> c == sId8; _ -> False + ] + testSendMessages6 service user 16 + where + up c n = do + ("", "", UP _ conns) <- nGet c + liftIO $ length conns `shouldBe` n + down c n = do + ("", "", DOWN _ conns) <- nGet c + liftIO $ length conns `shouldBe` n + getSMPAgentClient' :: Int -> AgentConfig -> InitialAgentServers -> String -> IO AgentClient getSMPAgentClient' clientId cfg' initServers dbPath = do Right st <- liftIO $ createStore dbPath From a90601c674fbb6bcce620c9c68d7c6023f39e312 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Thu, 18 Dec 2025 22:27:17 +0000 Subject: [PATCH 5/5] agent: always remove service when disabled, fix service subscriptions --- src/Simplex/Messaging/Agent.hs | 72 +++++++++---------- src/Simplex/Messaging/Agent/Client.hs | 12 ++-- .../Messaging/Agent/Store/AgentStore.hs | 46 ++++++------ src/Simplex/Messaging/Agent/TSessionSubs.hs | 41 ++++++----- src/Simplex/Messaging/Protocol.hs | 13 ++-- src/Simplex/Messaging/Server.hs | 2 +- src/Simplex/Messaging/Server/Env/STM.hs | 6 +- tests/AgentTests/FunctionalAPITests.hs | 28 +++++--- tests/CoreTests/TSessionSubs.hs | 6 +- 9 files changed, 116 insertions(+), 110 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 991ecb1dd..e17c39a16 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1042,10 +1042,10 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userLinkData_ clientData pqIni createRcvQueue nonce_ qd e2eKeys = do AgentConfig {smpClientVRange = vr} <- asks config ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing - (rq, qUri, tSess, sessId) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e + (rq, qUri, tSess, sessId, serviceId_) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e atomically $ incSMPServerStat c userId srv connCreated rq' <- withStore c $ \db -> updateNewConnRcv db connId rq subMode - lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId + lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId serviceId_ mapM_ (newQueueNtfSubscription c rq') ntfServer_ pure (rq', qUri) createConnReq :: SMPQueueUri -> AM (ConnectionRequestUri c) @@ -1293,11 +1293,11 @@ joinConnSrvAsync _c _userId _connId _enableNtfs (CRContactUri _) _cInfo _subMode createReplyQueue :: AgentClient -> NetworkRequestMode -> ConnData -> SndQueue -> SubscriptionMode -> SMPServerWithAuth -> AM SMPQueueInfo createReplyQueue c nm ConnData {userId, connId, enableNtfs} SndQueue {smpClientVersion} subMode srv = do ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing - (rq, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode + (rq, qUri, tSess, sessId, serviceId_) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode atomically $ incSMPServerStat c userId (qServer rq) connCreated let qInfo = toVersionT qUri smpClientVersion rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq subMode - lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId + lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId serviceId_ mapM_ (newQueueNtfSubscription c rq') ntfServer_ pure qInfo @@ -1453,22 +1453,14 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs Nothing -> userSrvs useServices <- readTVarIO $ useClientServices c - -- These options are possible below: - -- 1) services fully disabled: - -- No service subscriptions will be attempted, and existing services and association will remain in in the database, - -- but they will be ignored because of hasService parameter set to False. - -- This approach preserves performance for all clients that do not use services. - -- 2) at least one user ID has services enabled: - -- Service will be loaded for all user/server combinations: - -- a) service is enabled for user ID and service record exists: subscription will be attempted, - -- b) service is disabled and record exists: service record and all associations will be removed, - -- c) service is disabled or no record: no subscription attempt. + -- Service will be loaded for all user/server combinations: + -- a) service is enabled for user ID and service record exists: subscription will be attempted, + -- b) service is disabled and record exists: service record and all associations will be removed, + -- c) service is disabled or no record: no subscription attempt. -- On successful service subscription, only unassociated queues will be subscribed. - userSrvs'' <- - if any id useServices - then lift $ mapConcurrently (subscribeService useServices) userSrvs' - else pure $ map (,False) userSrvs' - rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs'' + userSrvs2 <- withStore' c $ \db -> mapM (getService db useServices) userSrvs' + userSrvs3 <- lift $ mapConcurrently subscribeService userSrvs2 + rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs3 let (errs, oks) = partitionEithers rs logInfo $ "subscribed " <> tshow (sum oks) <> " queues" forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",) @@ -1477,23 +1469,27 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do resumeAllCommands c where handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e) - subscribeService :: Map UserId Bool -> (UserId, SMPServer) -> AM' ((UserId, SMPServer), ServiceAssoc) - subscribeService useServices us@(userId, srv) = fmap ((us,) . fromRight False) $ tryAllErrors' $ do - withStore' c (\db -> getSubscriptionService db userId srv) >>= \case + getService :: DB.Connection -> Map UserId Bool -> (UserId, SMPServer) -> IO ((UserId, SMPServer), Maybe ServiceSub) + getService db useServices us@(userId, srv) = + fmap (us,) $ getSubscriptionService db userId srv >>= \case Just serviceSub -> case M.lookup userId useServices of - Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case - Right (ServiceSubResult e _) -> case e of - Just SSErrorServiceId {} -> unassocQueues - -- Below would resubscribe all queues after service was disabled and re-enabled - -- Possibly, we should always resubscribe all with expected is greated than subscribed - Just SSErrorQueueCount {expectedQueueCount = n, subscribedQueueCount = n'} | n > 0 && n' == 0 -> unassocQueues - _ -> pure True - Left e -> do - atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e) - if clientServiceError e - then unassocQueues - else pure True - _ -> unassocQueues + Just True -> pure $ Just serviceSub + _ -> Nothing <$ unassocUserServerRcvQueueSubs' db userId srv + _ -> pure Nothing + subscribeService :: ((UserId, SMPServer), Maybe ServiceSub) -> AM' ((UserId, SMPServer), ServiceAssoc) + subscribeService (us@(userId, srv), serviceSub_) = fmap ((us,) . fromRight False) $ tryAllErrors' $ + case serviceSub_ of + Just serviceSub -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case + Right (ServiceSubResult e _) -> case e of + Just SSErrorServiceId {} -> unassocQueues + -- Possibly, we should always resubscribe all when expected is greater than subscribed + Just SSErrorQueueCount {expectedQueueCount = n, subscribedQueueCount = n'} | n > 0 && n' == 0 -> unassocQueues + _ -> pure True + Left e -> do + atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e) + if clientServiceError e + then unassocQueues + else pure True where unassocQueues :: AM Bool unassocQueues = False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs' db userId srv) @@ -2231,10 +2227,10 @@ switchDuplexConnection c nm (DuplexConnection cData@ConnData {connId, userId} rq srv' <- if srv == server then getNextSMPServer c userId [server] else pure srvAuth -- TODO [notications] possible improvement would be to create ntf credentials here, to avoid creating them after rotation completes. -- The problem is that currently subscription already exists, and we do not support queues with credentials but without subscriptions. - (q, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe + (q, qUri, tSess, sessId, serviceId_) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe let rq' = (q :: NewRcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId} rq'' <- withStore c $ \db -> addConnRcvQueue db connId rq' SMSubscribe - lift $ addNewQueueSubscription c rq'' tSess sessId + lift $ addNewQueueSubscription c rq'' tSess sessId serviceId_ void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))] rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSendingQADD let rqs' = updatedQs rq1 rqs <> [rq''] @@ -2920,7 +2916,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar processSubOk :: RcvQueue -> TVar [ConnId] -> TVar [RcvQueue] -> Maybe SMP.ServiceId -> IO () processSubOk rq@RcvQueue {connId} upConnIds serviceRQs serviceId_ = atomically . whenM (isPendingSub rq) $ do - SS.addActiveSub tSess sessId rq $ currentSubs c + SS.addActiveSub tSess sessId serviceId_ rq $ currentSubs c modifyTVar' upConnIds (connId :) when (isJust serviceId_ && serviceId_ == clientServiceId_) $ modifyTVar' serviceRQs (rq :) clientServiceId_ = (\THClientService {serviceId} -> serviceId) <$> (clientService =<< thAuth) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 47549c871..9bf1afd8d 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1420,7 +1420,7 @@ getSessionMode :: AgentClient -> STM TransportSessionMode getSessionMode = fmap (sessionMode . snd) . readTVar . useNetworkConfig {-# INLINE getSessionMode #-} -newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId) +newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId, Maybe ServiceId) newRcvQueue c nm userId connId srv vRange cMode enableNtfs subMode = do let qrd = case cMode of SCMInvitation -> CQRMessaging Nothing; SCMContact -> CQRContact Nothing e2eKeys <- atomically . C.generateKeyPair =<< asks random @@ -1441,7 +1441,7 @@ queueReqData = \case CQRMessaging d -> QRMessaging $ srvReq <$> d CQRContact d -> QRContact $ srvReq <$> d -newRcvQueue_ :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> ClntQueueReqData -> Bool -> SubscriptionMode -> Maybe C.CbNonce -> C.KeyPairX25519 -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId) +newRcvQueue_ :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> ClntQueueReqData -> Bool -> SubscriptionMode -> Maybe C.CbNonce -> C.KeyPairX25519 -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId, Maybe ServiceId) newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enableNtfs subMode nonce_ (e2eDhKey, e2ePrivKey) = do C.AuthAlg a <- asks (rcvAuthAlg . config) g <- asks random @@ -1483,7 +1483,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl deleteErrors = 0 } qUri = SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey queueMode - pure (rq, qUri, tSess, sessionId thParams') + pure (rq, qUri, tSess, sessionId thParams', sessServiceId) where mkNtfCreds :: (C.AlgorithmI a, C.AuthAlgorithm a) => C.SAlgorithm a -> TVar ChaChaDRG -> SMPClient -> IO (Maybe (C.AAuthKeyPair, C.PrivateKeyX25519), Maybe NewNtfCreds) mkNtfCreds a g smp @@ -1828,14 +1828,14 @@ getRemovedSubs AgentClient {removedSubs} k = TM.lookup k removedSubs >>= maybe n TM.insert k s removedSubs pure s -addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> AM' () -addNewQueueSubscription c rq' tSess sessId = do +addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> Maybe ServiceId -> AM' () +addNewQueueSubscription c rq' tSess sessId serviceId_ = do let rq = rcvQueueSub rq' same <- atomically $ do modifyTVar' (subscrConns c) $ S.insert $ qConnId rq active <- activeClientSession c tSess sessId if active - then SS.addActiveSub tSess sessId rq' $ currentSubs c + then SS.addActiveSub tSess sessId serviceId_ rq' $ currentSubs c else SS.addPendingSub tSess rq $ currentSubs c pure active unless same $ resubscribeSMPSession c tSess diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 46242df5a..853a76908 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -38,7 +38,6 @@ module Simplex.Messaging.Agent.Store.AgentStore -- * Client services createClientService, getClientServiceCredentials, - getSubscriptionServices, getSubscriptionService, getClientServiceServers, setClientServiceId, @@ -345,7 +344,7 @@ handleSQLError err e = case constraintViolation e of handleSQLError :: StoreError -> SQLError -> StoreError handleSQLError err e | SQL.sqlError e == SQL.ErrorConstraint = err - | otherwise = SEInternal $ bshow e + | otherwise = SEInternal $ encodeUtf8 $ tshow e <> ": " <> SQL.sqlErrorDetails e <> ", " <> SQL.sqlErrorContext e #endif createUserRecord :: DB.Connection -> IO UserId @@ -440,11 +439,6 @@ getClientServiceCredentials db userId srv = where toService (kh, cert, pk, serviceId_) = ((kh, (cert, pk)), serviceId_) -getSubscriptionServices :: DB.Connection -> IO [(UserId, (SMPServer, ServiceSub))] -getSubscriptionServices db = map toUserService <$> DB.query_ db clientServiceQuery - where - toUserService (Only userId :. serviceRow) = (userId, toServerService serviceRow) - getSubscriptionService :: DB.Connection -> UserId -> SMPServer -> IO (Maybe ServiceSub) getSubscriptionService db userId (SMPServer h p kh) = maybeFirstRow toService $ @@ -454,7 +448,7 @@ getSubscriptionService db userId (SMPServer h p kh) = SELECT c.service_id, c.service_queue_count, c.service_queue_ids_hash FROM client_services c JOIN servers s ON s.host = c.host AND s.port = c.port - WHERE c.user_id = ? AND c.host = ? AND c.port = ? AND COALESCE(c.server_key_hash, s.key_hash) = ? + WHERE c.user_id = ? AND c.host = ? AND c.port = ? AND COALESCE(c.server_key_hash, s.key_hash) = ? AND service_id IS NOT NULL |] (userId, h, p, kh) where @@ -462,15 +456,16 @@ getSubscriptionService db userId (SMPServer h p kh) = getClientServiceServers :: DB.Connection -> UserId -> IO [(SMPServer, ServiceSub)] getClientServiceServers db userId = - map toServerService <$> DB.query db (clientServiceQuery <> " WHERE c.user_id = ?") (Only userId) - -clientServiceQuery :: Query -clientServiceQuery = - [sql| - SELECT c.host, c.port, COALESCE(c.server_key_hash, s.key_hash), c.service_id, c.service_queue_count, c.service_queue_ids_hash - FROM client_services c - JOIN servers s ON s.host = c.host AND s.port = c.port - |] + map toServerService <$> + DB.query + db + [sql| + SELECT c.host, c.port, COALESCE(c.server_key_hash, s.key_hash), c.service_id, c.service_queue_count, c.service_queue_ids_hash + FROM client_services c + JOIN servers s ON s.host = c.host AND s.port = c.port + WHERE c.user_id = ? AND service_id IS NOT NULL + |] + (Only userId) toServerService :: (NonEmpty TransportHost, ServiceName, C.KeyHash, ServiceId, Int64, Binary ByteString) -> (ProtocolServer 'PSMP, ServiceSub) toServerService (host, port, kh, serviceId, n, Binary idsHash) = @@ -488,14 +483,20 @@ setClientServiceId db userId srv serviceId = (serviceId, userId, host srv, port srv) deleteClientService :: DB.Connection -> UserId -> SMPServer -> IO () -deleteClientService db userId srv = +deleteClientService db userId (SMPServer h p kh) = DB.execute db [sql| DELETE FROM client_services WHERE user_id = ? AND host = ? AND port = ? + AND EXISTS ( + SELECT 1 FROM servers s + WHERE s.host = client_services.host + AND s.port = client_services.port + AND COALESCE(client_services.server_key_hash, s.key_hash) = ? + ); |] - (userId, host srv, port srv) + (userId, h, p, Just kh) deleteClientServices :: DB.Connection -> UserId -> IO () deleteClientServices db userId = do @@ -2280,7 +2281,8 @@ getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded hasService = | otherwise = "" unassocUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub] -unassocUserServerRcvQueueSubs db userId (SMPServer h p kh) = +unassocUserServerRcvQueueSubs db userId srv@(SMPServer h p kh) = do + deleteClientService db userId srv map toRcvQueueSub <$> DB.query db @@ -2295,7 +2297,9 @@ unassocUserServerRcvQueueSubs db userId (SMPServer h p kh) = |] unassocUserServerRcvQueueSubs' :: DB.Connection -> UserId -> SMPServer -> IO () -unassocUserServerRcvQueueSubs' db userId (SMPServer h p kh) = DB.execute db removeRcvAssocsQuery (h, p, userId, kh) +unassocUserServerRcvQueueSubs' db userId srv@(SMPServer h p kh) = do + deleteClientService db userId srv + DB.execute db removeRcvAssocsQuery (h, p, userId, kh) unsetQueuesToSubscribe :: DB.Connection -> IO () unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1" diff --git a/src/Simplex/Messaging/Agent/TSessionSubs.hs b/src/Simplex/Messaging/Agent/TSessionSubs.hs index 3f82da291..a1db48c9e 100644 --- a/src/Simplex/Messaging/Agent/TSessionSubs.hs +++ b/src/Simplex/Messaging/Agent/TSessionSubs.hs @@ -44,9 +44,9 @@ import qualified Data.Map.Strict as M import Data.Maybe (fromMaybe, isJust) import qualified Data.Set as S import Simplex.Messaging.Agent.Protocol (SMPQueue (..)) -import Simplex.Messaging.Agent.Store (RcvQueue, RcvQueueSub (..), SomeRcvQueue, StoredRcvQueue (rcvServiceAssoc), rcvQueueSub) +import Simplex.Messaging.Agent.Store (RcvQueue, RcvQueueSub (..), ServiceAssoc, SomeRcvQueue, StoredRcvQueue (rcvServiceAssoc), rcvQueueSub) import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..)) -import Simplex.Messaging.Protocol (RecipientId, ServiceSub (..), noIdsHash, queueIdHash) +import Simplex.Messaging.Protocol (IdsHash, RecipientId, ServiceSub (..), queueIdHash) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport @@ -121,45 +121,48 @@ setActiveServiceSub tSess sessId serviceSub ss = do writeTVar (pendingServiceSub s) Nothing else writeTVar (pendingServiceSub s) $ Just serviceSub -addActiveSub :: SMPTransportSession -> SessionId -> RcvQueue -> TSessionSubs -> STM () -addActiveSub tSess sessId rq = addActiveSub' tSess sessId (rcvQueueSub rq) (rcvServiceAssoc rq) +addActiveSub :: SMPTransportSession -> SessionId -> Maybe ServiceId -> RcvQueue -> TSessionSubs -> STM () +addActiveSub tSess sessId serviceId_ rq = addActiveSub' tSess sessId serviceId_ (rcvQueueSub rq) (rcvServiceAssoc rq) {-# INLINE addActiveSub #-} -addActiveSub' :: SMPTransportSession -> SessionId -> RcvQueueSub -> Bool -> TSessionSubs -> STM () -addActiveSub' tSess sessId rq serviceAssoc ss = do +addActiveSub' :: SMPTransportSession -> SessionId -> Maybe ServiceId -> RcvQueueSub -> ServiceAssoc -> TSessionSubs -> STM () +addActiveSub' tSess sessId serviceId_ rq serviceAssoc ss = do s <- getSessSubs tSess ss sessId' <- readTVar $ subsSessId s let rId = rcvId rq if Just sessId == sessId' then do - TM.insert rId rq $ activeSubs s TM.delete rId $ pendingSubs s - when serviceAssoc $ - let updateServiceSub (ServiceSub serviceId n idsHash) = ServiceSub serviceId (n + 1) (idsHash <> queueIdHash rId) - in modifyTVar' (activeServiceSub s) (updateServiceSub <$>) + case serviceId_ of + Just serviceId | serviceAssoc -> updateActiveService s serviceId 1 (queueIdHash rId) + _ -> TM.insert rId rq $ activeSubs s else TM.insert rId rq $ pendingSubs s batchAddActiveSubs :: SMPTransportSession -> SessionId -> Maybe ServiceId -> ([RcvQueueSub], [RcvQueueSub]) -> TSessionSubs -> STM () batchAddActiveSubs tSess sessId serviceId_ (rqs, serviceRQs) ss = do s <- getSessSubs tSess ss sessId' <- readTVar $ subsSessId s - let qs = M.fromList $ map (\rq -> (rcvId rq, rq)) rqs - serviceQs = M.fromList $ map (\rq -> (rcvId rq, rq)) serviceRQs + let qs = queuesMap rqs + serviceQs = queuesMap serviceRQs if Just sessId == sessId' then do TM.union qs $ activeSubs s modifyTVar' (pendingSubs s) (`M.difference` qs) - forM_ serviceId_ $ \serviceId -> unless (null serviceRQs) $ do + unless (null serviceRQs) $ forM_ serviceId_ $ \serviceId -> do modifyTVar' (pendingSubs s) (`M.difference` serviceQs) - ServiceSub serviceId' n idsHash <- - fromMaybe (ServiceSub serviceId 0 noIdsHash) <$> readTVar (activeServiceSub s) - when (serviceId == serviceId') $ do - let idsHash' = idsHash <> mconcat (map (queueIdHash . rcvId) serviceRQs) - n' = n + fromIntegral (length serviceRQs) - writeTVar (activeServiceSub s) $ Just $ ServiceSub serviceId n' idsHash' + updateActiveService s serviceId (fromIntegral $ length serviceRQs) (mconcat $ map (queueIdHash . rcvId) serviceRQs) else do TM.union qs $ pendingSubs s when (isJust serviceId_ && not (null serviceRQs)) $ TM.union serviceQs $ pendingSubs s + where + queuesMap = M.fromList . map (\rq -> (rcvId rq, rq)) + +updateActiveService :: SessSubs -> ServiceId -> Int64 -> IdsHash -> STM () +updateActiveService s serviceId addN addIdsHash = do + ServiceSub serviceId' n idsHash <- + fromMaybe (ServiceSub serviceId 0 mempty) <$> readTVar (activeServiceSub s) + when (serviceId == serviceId') $ + writeTVar (activeServiceSub s) $ Just $ ServiceSub serviceId (n + addN) (idsHash <> addIdsHash) batchAddPendingSubs :: SMPTransportSession -> [RcvQueueSub] -> TSessionSubs -> STM () batchAddPendingSubs tSess rqs ss = do diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index b813f60e9..4993aaac8 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -147,7 +147,6 @@ module Simplex.Messaging.Protocol serviceSubResult, queueIdsHash, queueIdHash, - noIdsHash, addServiceSubs, subtractServiceSubs, MaxMessageLen, @@ -1518,10 +1517,6 @@ instance Monoid IdsHash where xor' :: Word8 -> Word8 -> Word8 xor' x y = let !r = xor x y in r -noIdsHash ::IdsHash -noIdsHash = IdsHash B.empty -{-# INLINE noIdsHash #-} - queueIdsHash :: [QueueId] -> IdsHash queueIdsHash = mconcat . map queueIdHash @@ -1535,7 +1530,7 @@ addServiceSubs (n', idsHash') (n, idsHash) = (n + n', idsHash <> idsHash') subtractServiceSubs :: (Int64, IdsHash) -> (Int64, IdsHash) -> (Int64, IdsHash) subtractServiceSubs (n', idsHash') (n, idsHash) | n > n' = (n - n', idsHash <> idsHash') -- concat is a reversible xor: (x `xor` y) `xor` y == x - | otherwise = (0, noIdsHash) + | otherwise = (0, mempty) data ProtocolErrorType = PECmdSyntax | PECmdUnknown | PESession | PEBlock @@ -1883,7 +1878,7 @@ instance ProtocolEncoding SMPVersion ErrorType Cmd where QUE_ -> pure QUE CT SRecipientService SUBS_ | v >= rcvServiceSMPVersion -> Cmd SRecipientService <$> (SUBS <$> _smpP <*> smpP) - | otherwise -> pure $ Cmd SRecipientService $ SUBS (-1) noIdsHash + | otherwise -> pure $ Cmd SRecipientService $ SUBS (-1) mempty CT SSender tag -> Cmd SSender <$> case tag of SKEY_ -> SKEY <$> _smpP @@ -1902,7 +1897,7 @@ instance ProtocolEncoding SMPVersion ErrorType Cmd where CT SNotifier NSUB_ -> pure $ Cmd SNotifier NSUB CT SNotifierService NSUBS_ | v >= rcvServiceSMPVersion -> Cmd SNotifierService <$> (NSUBS <$> _smpP <*> smpP) - | otherwise -> pure $ Cmd SNotifierService $ NSUBS (-1) noIdsHash + | otherwise -> pure $ Cmd SNotifierService $ NSUBS (-1) mempty fromProtocolError = fromProtocolError @SMPVersion @ErrorType @BrokerMsg {-# INLINE fromProtocolError #-} @@ -1999,7 +1994,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where where serviceRespP resp | v >= serviceCertsSMPVersion = resp <$> _smpP <*> smpP - | otherwise = resp <$> _smpP <*> pure noIdsHash + | otherwise = resp <$> _smpP <*> pure mempty fromProtocolError = \case PECmdSyntax -> CMD SYNTAX diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 0a429bbe0..24247e781 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -316,7 +316,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt cancelServiceSubs :: ServiceId -> Maybe (Client s) -> STM [PrevClientSub s] cancelServiceSubs serviceId = checkAnotherClient $ \c -> do - changedSubs@(n, idsHash) <- swapTVar (clientServiceSubs c) (0, noIdsHash) + changedSubs@(n, idsHash) <- swapTVar (clientServiceSubs c) (0, mempty) pure [(c, CSADecreaseSubs changedSubs, (serviceId, ENDS n idsHash))] checkAnotherClient :: (Client s -> STM [PrevClientSub s]) -> Maybe (Client s) -> STM [PrevClientSub s] checkAnotherClient mkSub = \case diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 02cf136c7..e59cd5c0b 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -502,7 +502,7 @@ newServerSubscribers = do subQ <- newTQueueIO queueSubscribers <- SubscribedClients <$> TM.emptyIO serviceSubscribers <- SubscribedClients <$> TM.emptyIO - totalServiceSubs <- newTVarIO (0, noIdsHash) + totalServiceSubs <- newTVarIO (0, mempty) subClients <- newTVarIO IS.empty pendingEvents <- newTVarIO IM.empty pure ServerSubscribers {subQ, queueSubscribers, serviceSubscribers, totalServiceSubs, subClients, pendingEvents} @@ -513,8 +513,8 @@ newClient clientId qSize clientTHParams createdAt = do ntfSubscriptions <- TM.emptyIO serviceSubscribed <- newTVarIO False ntfServiceSubscribed <- newTVarIO False - serviceSubsCount <- newTVarIO (0, noIdsHash) - ntfServiceSubsCount <- newTVarIO (0, noIdsHash) + serviceSubsCount <- newTVarIO (0, mempty) + ntfServiceSubsCount <- newTVarIO (0, mempty) rcvQ <- newTBQueueIO qSize sndQ <- newTBQueueIO qSize msgQ <- newTBQueueIO qSize diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index bdb3588ba..34448fc10 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -3722,10 +3722,22 @@ testClientServiceConnection ps = do testClientServiceIDChange :: HasCallStack => (ASrvTransport, AStoreType) -> IO () testClientServiceIDChange ps@(_, ASType qs _) = do (sId, uId) <- withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do - withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do + conns <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do conns@(sId, uId) <- makeConnection service user exchangeGreetings service uId user sId pure conns + ("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 1 _)) <- nGet service + ("", "", DOWN _ [_]) <- nGet user + withSmpServerStoreLogOn ps testPort $ \_ -> do + getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 1 _)))) -> True; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] + ("", "", UP _ [_]) <- nGet user + pure () + ("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 1 _)) <- nGet service + ("", "", DOWN _ [_]) <- nGet user + pure conns _ :: () <- case qs of SQSPostgres -> do #if defined(dbServerPostgres) @@ -3740,6 +3752,7 @@ testClientServiceIDChange ps@(_, ASType qs _) = do writeFile testStoreLogFile $ unlines $ filter (not . ("NEW_SERVICE" `isPrefixOf`)) $ lines s withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do + liftIO $ threadDelay 250000 subscribeAllConnections service False Nothing liftIO $ getInAnyOrder service [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult (Just (SMP.SSErrorQueueCount 1 0)) (SMP.ServiceSub _ 0 _)))) -> True; _ -> False, @@ -3749,10 +3762,11 @@ testClientServiceIDChange ps@(_, ASType qs _) = do subscribeAllConnections user False Nothing ("", "", UP _ [_]) <- nGet user exchangeGreetingsMsgId 4 service uId user sId + ("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 1 _)) <- nGet service + ("", "", DOWN _ [_]) <- nGet user + pure () -- disable service in the client - -- The test uses True for non-existing user to make sure it's removed for user 1, - -- because if no users use services, then it won't be checking them to optimize for most clients. - withAgentClientsServers2 (agentCfg, initAgentServers {useServices = M.fromList [(100, True)]}) (agentCfg, initAgentServers) $ \notService user -> do + withAgentClientsServers2 (agentCfg, initAgentServers) (agentCfg, initAgentServers) $ \notService user -> do withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do subscribeAllConnections notService False Nothing ("", "", UP _ [_]) <- nGet notService @@ -3853,12 +3867,6 @@ testMigrateConnectionsToService ps = do withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do subscribeAllConnections service False Nothing - -- the "error" in SERVICE_UP event is expected, because when service was disabled for the user, - -- the service and associations were not removed, to optimize non-service clients. - liftIO $ getInAnyOrder service - [ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult (Just (SMP.SSErrorQueueCount 6 0)) (SMP.ServiceSub _ 0 _)))) -> True; _ -> False, - \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False - ] service `up` 8 subscribeAllConnections user False Nothing user `up` 8 diff --git a/tests/CoreTests/TSessionSubs.hs b/tests/CoreTests/TSessionSubs.hs index e9038b9d9..96975e9ef 100644 --- a/tests/CoreTests/TSessionSubs.hs +++ b/tests/CoreTests/TSessionSubs.hs @@ -69,21 +69,21 @@ testSessionSubs = do atomically (SS.hasPendingSub tSess1 (rcvId q4) ss) `shouldReturn` False atomically (SS.hasActiveSub tSess1 (rcvId q4) ss) `shouldReturn` False -- setting active queue without setting session ID would keep it as pending - atomically $ SS.addActiveSub' tSess1 "123" q1 False ss + atomically $ SS.addActiveSub' tSess1 "123" Nothing q1 False ss atomically (SS.hasPendingSub tSess1 (rcvId q1) ss) `shouldReturn` True atomically (SS.hasActiveSub tSess1 (rcvId q1) ss) `shouldReturn` False dumpSessionSubs ss `shouldReturn` st countSubs ss `shouldReturn` (0, 3) -- setting active queues atomically $ SS.setSessionId tSess1 "123" ss - atomically $ SS.addActiveSub' tSess1 "123" q1 False ss + atomically $ SS.addActiveSub' tSess1 "123" Nothing q1 False ss atomically (SS.hasPendingSub tSess1 (rcvId q1) ss) `shouldReturn` False atomically (SS.hasActiveSub tSess1 (rcvId q1) ss) `shouldReturn` True atomically (SS.getActiveSubs tSess1 ss) `shouldReturn` M.fromList [("r1", q1)] atomically (SS.getPendingSubs tSess1 ss) `shouldReturn` (M.fromList [("r2", q2)], Nothing) countSubs ss `shouldReturn` (1, 2) atomically $ SS.setSessionId tSess2 "456" ss - atomically $ SS.addActiveSub' tSess2 "456" q4 False ss + atomically $ SS.addActiveSub' tSess2 "456" Nothing q4 False ss atomically (SS.hasPendingSub tSess2 (rcvId q4) ss) `shouldReturn` False atomically (SS.hasActiveSub tSess2 (rcvId q4) ss) `shouldReturn` True atomically (SS.hasActiveSub tSess1 (rcvId q4) ss) `shouldReturn` False -- wrong transport session