diff --git a/integration/test/Test/Events.hs b/integration/test/Test/Events.hs index d85b42ba32..69d99ba0e1 100644 --- a/integration/test/Test/Events.hs +++ b/integration/test/Test/Events.hs @@ -740,6 +740,53 @@ testEndOfInitialSync = do ackEvent ws e assertNoEvent_ ws +testSyncMarkerMissingAcrossConnections :: (HasCallStack) => App () +testSyncMarkerMissingAcrossConnections = do + (alice, _, cid) <- mkUserPlusClient + emptyQueue alice cid + + lowerCodensity $ do + -- open first websocket + (_, ws1) <- createEventsWebSocketWithSync alice (Just cid) + -- ...and close it immediately + -- if the first websocket is not closed, all events including the marker of the second connection + -- will be received on the first socket + -- but if closed, the sync markers are not ACKed and we expect them to be received by the second web socket that will be opened + lift $ killWebSocket ws1 + -- open second websocket + (marker2, ws2) <- createEventsWebSocketWithSync alice (Just cid) + lift $ do + let timeoutMicros = 2_000_000 + -- collect markers on both websockets + markers1Async <- async (collectSyncMarkers ws1 timeoutMicros) + markers2Async <- async (collectSyncMarkers ws2 timeoutMicros) + markers1 <- wait markers1Async + markers2 <- wait markers2Async + let allMarkersReceived = markers1 <> markers2 + -- assert that the second marker has arrived + unless (marker2 `elem` allMarkersReceived) $ assertFailure $ "Expected sync marker not observed: " <> marker2 + where + killWebSocket :: EventWebSocket -> App () + killWebSocket ws = do + void $ tryPutMVar ws.kill () + void $ timeout 1_000_000 (takeMVar ws.done) + + collectSyncMarkers :: (HasCallStack) => EventWebSocket -> Int -> App [String] + collectSyncMarkers ws timeoutMicros = go [] + where + go markers = + timeout timeoutMicros (readChan ws.events) >>= \case + Nothing -> pure (reverse markers) + Just (Left e) -> assertFailure $ "Websocket closed while waiting for synchronization message " <> displayException e + Just (Right e) 
-> do + ackEvent ws e + t <- e %. "type" & asString + if (t == "synchronization") + then do + markerId <- e %. "data.marker_id" & asString + go (markerId : markers) + else go markers + testEndOfInitialSyncMoreEventsAfterSyncMessage :: (HasCallStack) => App () testEndOfInitialSyncMoreEventsAfterSyncMessage = do (alice, uid, cid) <- mkUserPlusClient diff --git a/services/cannon/cannon.cabal b/services/cannon/cannon.cabal index c526d3cd9c..8894979210 100644 --- a/services/cannon/cannon.cabal +++ b/services/cannon/cannon.cabal @@ -108,6 +108,7 @@ library , metrics-wai >=0.4 , mwc-random >=0.13 , prometheus-client + , random , retry >=0.7 , safe-exceptions , servant-conduit diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs index 13f7f1950c..f157ea87c6 100644 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs @@ -34,6 +34,7 @@ import Data.Text hiding (show) import Data.Text qualified as Text import Data.Text.Lazy qualified as TL import Data.Text.Lazy.Encoding qualified as TLE +import Debug.Trace (traceM) import Imports hiding (min, threadDelay) import Network.AMQP (newQueue) import Network.AMQP qualified as Q @@ -41,6 +42,7 @@ import Network.WebSockets import Network.WebSockets qualified as WS import Network.WebSockets.Connection import System.Logger qualified as Log +import System.Random (randomIO) import System.Timeout import Wire.API.Event.WebSocketProtocol import Wire.API.Notification @@ -55,6 +57,8 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn = handle handleTooManyChannels . 
lowerCodensity $ do (chan, queueInfo) <- createChannel uid mcid e.pool createQueue + rawWsId :: Word32 <- liftIO randomIO + let wsId = fromIntegral (rawWsId `mod` 1000000) conn <- Codensity $ bracket openWebSocket closeWebSocket activity <- liftIO newEmptyMVar let wsConn = @@ -77,8 +81,8 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn = ] $ do traverse_ (sendFullSyncMessageIfNeeded wsConn uid e) mcid - traverse_ (Q.publishMsg chan.inner "" queueInfo.queueName . mkSynchronizationMessage e.notificationTTL) (mcid *> mSyncMarkerId) - sendNotifications chan wsConn + traverse_ (publishSyncMarker wsId chan queueInfo) (mcid *> mSyncMarkerId) + sendNotifications wsId chan wsConn let monitor = do timeout wsConn.activityTimeout (takeMVar wsConn.activity) >>= \case @@ -106,14 +110,22 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn = -- ignore any exceptions when sending the close message void . try @SomeException $ WS.sendClose wsConn ("" :: ByteString) - getEventData :: RabbitMqChannel -> IO (Either EventData SynchronizationData) - getEventData chan = do + getEventData :: Int -> RabbitMqChannel -> IO (Either EventData SynchronizationData) + getEventData wsId chan = do (msg, envelope) <- getMessage chan case msg.msgType of Just "synchronization" -> do + let marker = TL.toStrict $ TLE.decodeUtf8 msg.msgBody + traceM $ + "sync_marker received ws_id=" + <> show wsId + <> " marker_id=" + <> show marker + <> " delivery_tag=" + <> show envelope.envDeliveryTag let syncData = SynchronizationData - { markerId = TL.toStrict $ TLE.decodeUtf8 msg.msgBody, + { markerId = marker, deliveryTag = envelope.envDeliveryTag } pure $ Right syncData @@ -131,7 +143,7 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn = -- that happens. 
Q.rejectEnv envelope False -- try again - getEventData chan + getEventData wsId chan Right notif -> do logEvent notif pure $ @@ -227,10 +239,10 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn = Q.msgType = Just "synchronization" } - sendNotifications :: RabbitMqChannel -> WSConnection -> IO () - sendNotifications chan wsConn = do + sendNotifications :: Int -> RabbitMqChannel -> WSConnection -> IO () + sendNotifications wsId chan wsConn = do let consumeRabbitMq = forever $ do - eventData <- getEventData chan + eventData <- getEventData wsId chan let msg = case eventData of Left event -> EventMessage event Right sync -> EventSyncMessage sync @@ -245,7 +257,7 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn = getClientMessage wsConn >>= \case AckFullSync -> throwIO UnexpectedAck AckMessage ackData -> do - logAckReceived ackData + logAckReceived wsId ackData void $ ackMessage chan ackData.deliveryTag ackData.multiple -- run both loops concurrently, so that @@ -274,13 +286,15 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn = . Log.field "error" (displayException err) . logClient - logAckReceived :: AckData -> IO () - logAckReceived ackData = - Log.debug e.logg $ - Log.msg (Log.val "Received ACK") - . Log.field "delivery_tag" ackData.deliveryTag - . Log.field "multiple" ackData.multiple - . logClient + logAckReceived :: Int -> AckData -> IO () + logAckReceived wsId ackData = + traceM $ + "ack received ws_id=" + <> show wsId + <> " delivery_tag=" + <> show ackData.deliveryTag + <> " multiple=" + <> show ackData.multiple logCloseWebsocket :: IO () logCloseWebsocket = @@ -288,6 +302,15 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn = Log.msg (Log.val "Closing the websocket") . 
logClient + publishSyncMarker :: Int -> RabbitMqChannel -> QueueInfo -> Text -> IO () + publishSyncMarker wsId chan queueInfo markerId = do + void $ Q.publishMsg chan.inner "" queueInfo.queueName (mkSynchronizationMessage e.notificationTTL markerId) + traceM $ + "sync_marker published ws_id=" + <> show wsId + <> " marker_id=" + <> show markerId + -- | Check if client has missed messages. If so, send a full synchronisation -- message and wait for the corresponding ack. sendFullSyncMessageIfNeeded ::