Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions integration/test/Test/Events.hs
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,53 @@ testEndOfInitialSync = do
ackEvent ws e
assertNoEvent_ ws

testSyncMarkerMissingAcrossConnections :: (HasCallStack) => App ()
testSyncMarkerMissingAcrossConnections = do
(alice, _, cid) <- mkUserPlusClient
emptyQueue alice cid

lowerCodensity $ do
-- open first websocket
(_, ws1) <- createEventsWebSocketWithSync alice (Just cid)
-- ...and close it immediately
-- if the first websocket is not closed, all evenets including the marker of the second connection
-- will be received on the first socket
-- but if closed, the sync markers are not ACKed and we expect them to be received by the second web socket that will be opened
lift $ killWebSocket ws1
-- open second websocket
(marker2, ws2) <- createEventsWebSocketWithSync alice (Just cid)
lift $ do
let timeoutMicros = 2_000_000
-- collect markers on both websockets
markers1Async <- async (collectSyncMarkers ws1 timeoutMicros)
markers2Async <- async (collectSyncMarkers ws2 timeoutMicros)
markers1 <- wait markers1Async
markers2 <- wait markers2Async
let allMarkersReceived = markers1 <> markers2
-- assert that the second marker has arrived
unless (marker2 `elem` allMarkersReceived) $ assertFailure $ "Expected sync marker not observed: " <> marker2
where
killWebSocket :: EventWebSocket -> App ()
killWebSocket ws = do
void $ tryPutMVar ws.kill ()
void $ timeout 1_000_000 (takeMVar ws.done)

collectSyncMarkers :: (HasCallStack) => EventWebSocket -> Int -> App [String]
collectSyncMarkers ws timeoutMicros = go []
where
go markers =
timeout timeoutMicros (readChan ws.events) >>= \case
Nothing -> pure (reverse markers)
Just (Left e) -> assertFailure $ "Websocket closed while waiting for synchronization message " <> displayException e
Just (Right e) -> do
ackEvent ws e
t <- e %. "type" & asString
if (t == "synchronization")
then do
markerId <- e %. "data.marker_id" & asString
go (markerId : markers)
else go markers

testEndOfInitialSyncMoreEventsAfterSyncMessage :: (HasCallStack) => App ()
testEndOfInitialSyncMoreEventsAfterSyncMessage = do
(alice, uid, cid) <- mkUserPlusClient
Expand Down
1 change: 1 addition & 0 deletions services/cannon/cannon.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ library
, metrics-wai >=0.4
, mwc-random >=0.13
, prometheus-client
, random
, retry >=0.7
, safe-exceptions
, servant-conduit
Expand Down
57 changes: 40 additions & 17 deletions services/cannon/src/Cannon/RabbitMqConsumerApp.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ import Data.Text hiding (show)
import Data.Text qualified as Text
import Data.Text.Lazy qualified as TL
import Data.Text.Lazy.Encoding qualified as TLE
import Debug.Trace (traceM)
import Imports hiding (min, threadDelay)
import Network.AMQP (newQueue)
import Network.AMQP qualified as Q
import Network.WebSockets
import Network.WebSockets qualified as WS
import Network.WebSockets.Connection
import System.Logger qualified as Log
import System.Random (randomIO)
import System.Timeout
import Wire.API.Event.WebSocketProtocol
import Wire.API.Notification
Expand All @@ -55,6 +57,8 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn =
handle handleTooManyChannels . lowerCodensity $
do
(chan, queueInfo) <- createChannel uid mcid e.pool createQueue
rawWsId :: Word32 <- liftIO randomIO
let wsId = fromIntegral (rawWsId `mod` 1000000)
conn <- Codensity $ bracket openWebSocket closeWebSocket
activity <- liftIO newEmptyMVar
let wsConn =
Expand All @@ -77,8 +81,8 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn =
]
$ do
traverse_ (sendFullSyncMessageIfNeeded wsConn uid e) mcid
traverse_ (Q.publishMsg chan.inner "" queueInfo.queueName . mkSynchronizationMessage e.notificationTTL) (mcid *> mSyncMarkerId)
sendNotifications chan wsConn
traverse_ (publishSyncMarker wsId chan queueInfo) (mcid *> mSyncMarkerId)
sendNotifications wsId chan wsConn

let monitor = do
timeout wsConn.activityTimeout (takeMVar wsConn.activity) >>= \case
Expand Down Expand Up @@ -106,14 +110,22 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn =
-- ignore any exceptions when sending the close message
void . try @SomeException $ WS.sendClose wsConn ("" :: ByteString)

getEventData :: RabbitMqChannel -> IO (Either EventData SynchronizationData)
getEventData chan = do
getEventData :: Int -> RabbitMqChannel -> IO (Either EventData SynchronizationData)
getEventData wsId chan = do
(msg, envelope) <- getMessage chan
case msg.msgType of
Just "synchronization" -> do
let marker = TL.toStrict $ TLE.decodeUtf8 msg.msgBody
traceM $
"sync_marker received ws_id="
<> show wsId
<> " marker_id="
<> show marker
<> " delivery_tag="
<> show envelope.envDeliveryTag
let syncData =
SynchronizationData
{ markerId = TL.toStrict $ TLE.decodeUtf8 msg.msgBody,
{ markerId = marker,
deliveryTag = envelope.envDeliveryTag
}
pure $ Right syncData
Expand All @@ -131,7 +143,7 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn =
-- that happens.
Q.rejectEnv envelope False
-- try again
getEventData chan
getEventData wsId chan
Right notif -> do
logEvent notif
pure $
Expand Down Expand Up @@ -227,10 +239,10 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn =
Q.msgType = Just "synchronization"
}

sendNotifications :: RabbitMqChannel -> WSConnection -> IO ()
sendNotifications chan wsConn = do
sendNotifications :: Int -> RabbitMqChannel -> WSConnection -> IO ()
sendNotifications wsId chan wsConn = do
let consumeRabbitMq = forever $ do
eventData <- getEventData chan
eventData <- getEventData wsId chan
let msg = case eventData of
Left event -> EventMessage event
Right sync -> EventSyncMessage sync
Expand All @@ -245,7 +257,7 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn =
getClientMessage wsConn >>= \case
AckFullSync -> throwIO UnexpectedAck
AckMessage ackData -> do
logAckReceived ackData
logAckReceived wsId ackData
void $ ackMessage chan ackData.deliveryTag ackData.multiple

-- run both loops concurrently, so that
Expand Down Expand Up @@ -274,20 +286,31 @@ rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn =
. Log.field "error" (displayException err)
. logClient

logAckReceived :: AckData -> IO ()
logAckReceived ackData =
Log.debug e.logg $
Log.msg (Log.val "Received ACK")
. Log.field "delivery_tag" ackData.deliveryTag
. Log.field "multiple" ackData.multiple
. logClient
logAckReceived :: Int -> AckData -> IO ()
logAckReceived wsId ackData =
traceM $
"ack received ws_id="
<> show wsId
<> " delivery_tag="
<> show ackData.deliveryTag
<> " multiple="
<> show ackData.multiple

logCloseWebsocket :: IO ()
logCloseWebsocket =
Log.debug e.logg $
Log.msg (Log.val "Closing the websocket")
. logClient

publishSyncMarker :: Int -> RabbitMqChannel -> QueueInfo -> Text -> IO ()
publishSyncMarker wsId chan queueInfo markerId = do
void $ Q.publishMsg chan.inner "" queueInfo.queueName (mkSynchronizationMessage e.notificationTTL markerId)
traceM $
"sync_marker published ws_id="
<> show wsId
<> " marker_id="
<> show markerId

-- | Check if client has missed messages. If so, send a full synchronisation
-- message and wait for the corresponding ack.
sendFullSyncMessageIfNeeded ::
Expand Down