Skip to content
Merged
86 changes: 52 additions & 34 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ import Simplex.Messaging.Protocol
SMPMsgMeta,
SParty (..),
SProtocolType (..),
ServiceSubResult,
ServiceSub (..),
ServiceSubResult (..),
ServiceSubError (..),
SndPublicAuthKey,
SubscriptionMode (..),
UserProtocol,
Expand Down Expand Up @@ -1040,10 +1042,10 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userLinkData_ clientData pqIni
createRcvQueue nonce_ qd e2eKeys = do
AgentConfig {smpClientVRange = vr} <- asks config
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
(rq, qUri, tSess, sessId) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e
(rq, qUri, tSess, sessId, serviceId_) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e
atomically $ incSMPServerStat c userId srv connCreated
rq' <- withStore c $ \db -> updateNewConnRcv db connId rq subMode
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId serviceId_
mapM_ (newQueueNtfSubscription c rq') ntfServer_
pure (rq', qUri)
createConnReq :: SMPQueueUri -> AM (ConnectionRequestUri c)
Expand Down Expand Up @@ -1291,11 +1293,11 @@ joinConnSrvAsync _c _userId _connId _enableNtfs (CRContactUri _) _cInfo _subMode
createReplyQueue :: AgentClient -> NetworkRequestMode -> ConnData -> SndQueue -> SubscriptionMode -> SMPServerWithAuth -> AM SMPQueueInfo
createReplyQueue c nm ConnData {userId, connId, enableNtfs} SndQueue {smpClientVersion} subMode srv = do
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
(rq, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode
(rq, qUri, tSess, sessId, serviceId_) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode
atomically $ incSMPServerStat c userId (qServer rq) connCreated
let qInfo = toVersionT qUri smpClientVersion
rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq subMode
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId serviceId_
mapM_ (newQueueNtfSubscription c rq') ntfServer_
pure qInfo

Expand Down Expand Up @@ -1451,22 +1453,14 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs
Nothing -> userSrvs
useServices <- readTVarIO $ useClientServices c
-- These options are possible below:
-- 1) services fully disabled:
-- No service subscriptions will be attempted, and existing services and association will remain in in the database,
-- but they will be ignored because of hasService parameter set to False.
-- This approach preserves performance for all clients that do not use services.
-- 2) at least one user ID has services enabled:
-- Service will be loaded for all user/server combinations:
-- a) service is enabled for user ID and service record exists: subscription will be attempted,
-- b) service is disabled and record exists: service record and all associations will be removed,
-- c) service is disabled or no record: no subscription attempt.
-- Service will be loaded for all user/server combinations:
-- a) service is enabled for user ID and service record exists: subscription will be attempted,
-- b) service is disabled and record exists: service record and all associations will be removed,
-- c) service is disabled or no record: no subscription attempt.
-- On successful service subscription, only unassociated queues will be subscribed.
userSrvs'' <-
if any id useServices
then lift $ mapConcurrently (subscribeService useServices) userSrvs'
else pure $ map (,False) userSrvs'
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs''
userSrvs2 <- withStore' c $ \db -> mapM (getService db useServices) userSrvs'
userSrvs3 <- lift $ mapConcurrently subscribeService userSrvs2
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs3
let (errs, oks) = partitionEithers rs
logInfo $ "subscribed " <> tshow (sum oks) <> " queues"
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",)
Expand All @@ -1475,16 +1469,30 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
resumeAllCommands c
where
handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e)
subscribeService :: Map UserId Bool -> (UserId, SMPServer) -> AM' ((UserId, SMPServer), ServiceAssoc)
subscribeService useServices us@(userId, srv) = fmap ((us,) . fromRight False) $ tryAllErrors' $ do
withStore' c (\db -> getSubscriptionService db userId srv) >>= \case
getService :: DB.Connection -> Map UserId Bool -> (UserId, SMPServer) -> IO ((UserId, SMPServer), Maybe ServiceSub)
getService db useServices us@(userId, srv) =
fmap (us,) $ getSubscriptionService db userId srv >>= \case
Just serviceSub -> case M.lookup userId useServices of
Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
Left e | clientServiceError e -> unassocQueues $> False
Just True -> pure $ Just serviceSub
_ -> Nothing <$ unassocUserServerRcvQueueSubs' db userId srv
_ -> pure Nothing
subscribeService :: ((UserId, SMPServer), Maybe ServiceSub) -> AM' ((UserId, SMPServer), ServiceAssoc)
subscribeService (us@(userId, srv), serviceSub_) = fmap ((us,) . fromRight False) $ tryAllErrors' $
case serviceSub_ of
Just serviceSub -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
Right (ServiceSubResult e _) -> case e of
Just SSErrorServiceId {} -> unassocQueues
-- Possibly, we should always resubscribe all when expected is greater than subscribed
Just SSErrorQueueCount {expectedQueueCount = n, subscribedQueueCount = n'} | n > 0 && n' == 0 -> unassocQueues
_ -> pure True
_ -> unassocQueues $> False
Left e -> do
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e)
if clientServiceError e
then unassocQueues
else pure True
where
unassocQueues = withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv
unassocQueues :: AM Bool
unassocQueues = False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs' db userId srv)
_ -> pure False
subscribeUserServer :: Int -> TVar Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int)
subscribeUserServer maxPending currPending ((userId, srv), hasService) = do
Expand Down Expand Up @@ -2219,10 +2227,10 @@ switchDuplexConnection c nm (DuplexConnection cData@ConnData {connId, userId} rq
srv' <- if srv == server then getNextSMPServer c userId [server] else pure srvAuth
-- TODO [notications] possible improvement would be to create ntf credentials here, to avoid creating them after rotation completes.
-- The problem is that currently subscription already exists, and we do not support queues with credentials but without subscriptions.
(q, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe
(q, qUri, tSess, sessId, serviceId_) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe
let rq' = (q :: NewRcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId}
rq'' <- withStore c $ \db -> addConnRcvQueue db connId rq' SMSubscribe
lift $ addNewQueueSubscription c rq'' tSess sessId
lift $ addNewQueueSubscription c rq'' tSess sessId serviceId_
void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))]
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSendingQADD
let rqs' = updatedQs rq1 rqs <> [rq'']
Expand Down Expand Up @@ -2908,7 +2916,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
processSubOk :: RcvQueue -> TVar [ConnId] -> TVar [RcvQueue] -> Maybe SMP.ServiceId -> IO ()
processSubOk rq@RcvQueue {connId} upConnIds serviceRQs serviceId_ =
atomically . whenM (isPendingSub rq) $ do
SS.addActiveSub tSess sessId rq $ currentSubs c
SS.addActiveSub tSess sessId serviceId_ rq $ currentSubs c
modifyTVar' upConnIds (connId :)
when (isJust serviceId_ && serviceId_ == clientServiceId_) $ modifyTVar' serviceRQs (rq :)
clientServiceId_ = (\THClientService {serviceId} -> serviceId) <$> (clientService =<< thAuth)
Expand Down Expand Up @@ -3115,16 +3123,26 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
notifyEnd removed
| removed = notify END >> logServer "<--" c srv rId "END"
| otherwise = logServer "<--" c srv rId "END from disconnected client - ignored"
-- TODO [certs rcv]
r@(SMP.ENDS _) -> unexpected r
SMP.ENDS n idsHash ->
atomically (ifM (activeClientSession c tSess sessId) (SS.deleteServiceSub tSess (currentSubs c) $> True) (pure False))
>>= notifyEnd
where
notifyEnd removed
| removed = do
forM_ clientServiceId_ $ \serviceId ->
notify_ B.empty $ SERVICE_END srv $ ServiceSub serviceId n idsHash
logServer "<--" c srv rId "ENDS"
| otherwise = logServer "<--" c srv rId "ENDS from disconnected client - ignored"
-- TODO [certs rcv] Possibly, we need to add some flag to connection that it was deleted
SMP.DELD -> atomically (removeSubscription c tSess connId rq) >> notify DELD
SMP.ERR e -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e
r -> unexpected r
where
notify :: forall e m. (AEntityI e, MonadIO m) => AEvent e -> m ()
notify msg =
let t = ("", connId, AEvt (sAEntity @e) msg)
notify = notify_ connId
notify_ :: forall e m. (AEntityI e, MonadIO m) => ConnId -> AEvent e -> m ()
notify_ connId' msg =
let t = ("", connId', AEvt (sAEntity @e) msg)
in atomically $ ifM (isFullTBQueue subQ) (modifyTVar' pendingMsgs (t :)) (writeTBQueue subQ t)

prohibited :: Text -> AM ()
Expand Down
43 changes: 25 additions & 18 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ import Simplex.Messaging.Protocol
NetworkError (..),
MsgFlags (..),
MsgId,
IdsHash,
NtfServer,
NtfServerWithAuth,
ProtoServer,
Expand All @@ -283,6 +282,7 @@ import Simplex.Messaging.Protocol
SProtocolType (..),
ServiceSub (..),
ServiceSubResult (..),
ServiceSubError (..),
SndPublicAuthKey,
SubscriptionMode (..),
NewNtfCreds (..),
Expand Down Expand Up @@ -1420,7 +1420,7 @@ getSessionMode :: AgentClient -> STM TransportSessionMode
getSessionMode = fmap (sessionMode . snd) . readTVar . useNetworkConfig
{-# INLINE getSessionMode #-}

newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId)
newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId, Maybe ServiceId)
newRcvQueue c nm userId connId srv vRange cMode enableNtfs subMode = do
let qrd = case cMode of SCMInvitation -> CQRMessaging Nothing; SCMContact -> CQRContact Nothing
e2eKeys <- atomically . C.generateKeyPair =<< asks random
Expand All @@ -1441,7 +1441,7 @@ queueReqData = \case
CQRMessaging d -> QRMessaging $ srvReq <$> d
CQRContact d -> QRContact $ srvReq <$> d

newRcvQueue_ :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> ClntQueueReqData -> Bool -> SubscriptionMode -> Maybe C.CbNonce -> C.KeyPairX25519 -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId)
newRcvQueue_ :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> ClntQueueReqData -> Bool -> SubscriptionMode -> Maybe C.CbNonce -> C.KeyPairX25519 -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId, Maybe ServiceId)
newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enableNtfs subMode nonce_ (e2eDhKey, e2ePrivKey) = do
C.AuthAlg a <- asks (rcvAuthAlg . config)
g <- asks random
Expand Down Expand Up @@ -1483,7 +1483,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
deleteErrors = 0
}
qUri = SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey queueMode
pure (rq, qUri, tSess, sessionId thParams')
pure (rq, qUri, tSess, sessionId thParams', sessServiceId)
where
mkNtfCreds :: (C.AlgorithmI a, C.AuthAlgorithm a) => C.SAlgorithm a -> TVar ChaChaDRG -> SMPClient -> IO (Maybe (C.AAuthKeyPair, C.PrivateKeyX25519), Maybe NewNtfCreds)
mkNtfCreds a g smp
Expand Down Expand Up @@ -1526,23 +1526,23 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl

processSubResults :: AgentClient -> SMPTransportSession -> SessionId -> Maybe ServiceId -> NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM ([RcvQueueSub], [(RcvQueueSub, Maybe ClientNotice)])
processSubResults c tSess@(userId, srv, _) sessId serviceId_ rs = do
pending <- SS.getPendingSubs tSess $ currentSubs c
let (failed, subscribed@(qs, sQs), notices, ignored) = foldr (partitionResults pending) (M.empty, ([], []), [], 0) rs
pendingSubs <- SS.getPendingQueueSubs tSess $ currentSubs c
let (failed, subscribed@(qs, sQs), notices, ignored) = foldr (partitionResults pendingSubs) (M.empty, ([], []), [], 0) rs
unless (M.null failed) $ do
incSMPServerStat' c userId srv connSubErrs $ M.size failed
failSubscriptions c tSess failed
unless (null qs && null sQs) $ do
incSMPServerStat' c userId srv connSubscribed $ length qs + length sQs
SS.batchAddActiveSubs tSess sessId subscribed $ currentSubs c
SS.batchAddActiveSubs tSess sessId serviceId_ subscribed $ currentSubs c
unless (ignored == 0) $ incSMPServerStat' c userId srv connSubIgnored ignored
pure (sQs, notices)
where
partitionResults ::
(Map SMP.RecipientId RcvQueueSub, Maybe ServiceSub) ->
Map SMP.RecipientId RcvQueueSub ->
(RcvQueueSub, Either SMPClientError (Maybe ServiceId)) ->
(Map SMP.RecipientId SMPClientError, ([RcvQueueSub], [RcvQueueSub]), [(RcvQueueSub, Maybe ClientNotice)], Int) ->
(Map SMP.RecipientId SMPClientError, ([RcvQueueSub], [RcvQueueSub]), [(RcvQueueSub, Maybe ClientNotice)], Int)
partitionResults (pendingSubs, pendingSS) (rq@RcvQueueSub {rcvId, clientNoticeId}, r) acc@(failed, subscribed@(qs, sQs), notices, ignored) = case r of
partitionResults pendingSubs (rq@RcvQueueSub {rcvId, clientNoticeId}, r) acc@(failed, subscribed@(qs, sQs), notices, ignored) = case r of
Left e -> case smpErrorClientNotice e of
Just notice_ -> (failed', subscribed, (rq, notice_) : notices, ignored)
where
Expand All @@ -1554,8 +1554,8 @@ processSubResults c tSess@(userId, srv, _) sessId serviceId_ rs = do
failed' = M.insert rcvId e failed
Right serviceId_'
| rcvId `M.member` pendingSubs ->
let subscribed' = case (serviceId_, serviceId_', pendingSS) of
(Just sId, Just sId', Just ServiceSub {smpServiceId}) | sId == sId' && sId == smpServiceId -> (qs, rq : sQs)
let subscribed' = case (serviceId_, serviceId_') of
(Just sId, Just sId') | sId == sId' -> (qs, rq : sQs)
_ -> (rq : qs, sQs)
in (failed, subscribed', notices', ignored)
| otherwise -> (failed, subscribed, notices', ignored + 1)
Expand Down Expand Up @@ -1726,11 +1726,18 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do

resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSubResult
resubscribeClientService c tSess@(userId, srv, _) serviceSub =
withServiceClient c tSess (\smp _ -> subscribeClientService_ c True tSess smp serviceSub) `catchE` \e -> do
when (clientServiceError e) $ do
tryAllErrors (withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub) >>= \case
Right r@(ServiceSubResult e _) -> case e of
Just SSErrorServiceId {} -> unassocSubscribeQueues $> r
_ -> pure r
Left e -> do
when (clientServiceError e) $ unassocSubscribeQueues
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e)
throwE e
where
unassocSubscribeQueues = do
qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv
void $ lift $ subscribeUserServerQueues c userId srv qs
throwE e

-- TODO [certs rcv] update service in the database if it has different ID and re-associate queues, and send event
subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> ServiceSub -> AM ServiceSubResult
Expand All @@ -1751,7 +1758,7 @@ withServiceClient c tSess subscribe =

-- TODO [certs rcv] send subscription error event?
subscribeClientService_ :: AgentClient -> Bool -> SMPTransportSession -> SMPClient -> ServiceSub -> ExceptT SMPClientError IO ServiceSubResult
subscribeClientService_ c withEvent tSess@(userId, srv, _) smp expected@(ServiceSub _ n idsHash) = do
subscribeClientService_ c withEvent tSess@(_, srv, _) smp expected@(ServiceSub _ n idsHash) = do
subscribed <- subscribeService smp SMP.SRecipientService n idsHash
let sessId = sessionId $ thParams smp
r = serviceSubResult expected subscribed
Expand Down Expand Up @@ -1821,14 +1828,14 @@ getRemovedSubs AgentClient {removedSubs} k = TM.lookup k removedSubs >>= maybe n
TM.insert k s removedSubs
pure s

addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> AM' ()
addNewQueueSubscription c rq' tSess sessId = do
addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> Maybe ServiceId -> AM' ()
addNewQueueSubscription c rq' tSess sessId serviceId_ = do
let rq = rcvQueueSub rq'
same <- atomically $ do
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
active <- activeClientSession c tSess sessId
if active
then SS.addActiveSub tSess sessId rq' $ currentSubs c
then SS.addActiveSub tSess sessId serviceId_ rq' $ currentSubs c
else SS.addPendingSub tSess rq $ currentSubs c
pure active
unless same $ resubscribeSMPSession c tSess
Expand Down
3 changes: 3 additions & 0 deletions src/Simplex/Messaging/Agent/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ data AEvent (e :: AEntity) where
SERVICE_ALL :: SMPServer -> AEvent AENone -- all service messages are delivered
SERVICE_DOWN :: SMPServer -> ServiceSub -> AEvent AENone
SERVICE_UP :: SMPServer -> ServiceSubResult -> AEvent AENone
SERVICE_END :: SMPServer -> ServiceSub -> AEvent AENone
SWITCH :: QueueDirection -> SwitchPhase -> ConnectionStats -> AEvent AEConn
RSYNC :: RatchetSyncState -> Maybe AgentCryptoError -> ConnectionStats -> AEvent AEConn
SENT :: AgentMsgId -> Maybe SMPServer -> AEvent AEConn
Expand Down Expand Up @@ -467,6 +468,7 @@ data AEventTag (e :: AEntity) where
SERVICE_ALL_ :: AEventTag AENone
SERVICE_DOWN_ :: AEventTag AENone
SERVICE_UP_ :: AEventTag AENone
SERVICE_END_ :: AEventTag AENone
SWITCH_ :: AEventTag AEConn
RSYNC_ :: AEventTag AEConn
SENT_ :: AEventTag AEConn
Expand Down Expand Up @@ -525,6 +527,7 @@ aEventTag = \case
SERVICE_ALL _ -> SERVICE_ALL_
SERVICE_DOWN {} -> SERVICE_DOWN_
SERVICE_UP {} -> SERVICE_UP_
SERVICE_END {} -> SERVICE_END_
SWITCH {} -> SWITCH_
RSYNC {} -> RSYNC_
SENT {} -> SENT_
Expand Down
Loading
Loading