diff --git a/changelog.d/0-release-notes/WPB-22811 b/changelog.d/0-release-notes/WPB-22811 new file mode 100644 index 0000000000..e30e9e739b --- /dev/null +++ b/changelog.d/0-release-notes/WPB-22811 @@ -0,0 +1,5 @@ +Conversation codes can now be migrated to PostgreSQL. For existing installations: +- Set `postgresMigration.conversationCodes: migration-to-postgresql` in both `galley` and `background-worker`. +- Run the backfill with `migrateConversationCodes: true`. +- Wait for `wire_conv_codes_migration_finished` to reach `1.0`. +- Switch to `postgresMigration.conversationCodes: postgresql` and disable `migrateConversationCodes`. diff --git a/changelog.d/5-internal/WPB-22811 b/changelog.d/5-internal/WPB-22811 index 6d1749a04b..2d98a74327 100644 --- a/changelog.d/5-internal/WPB-22811 +++ b/changelog.d/5-internal/WPB-22811 @@ -1 +1 @@ -Moved CodeStore from galley to subsystems +Migration of conversation codes from cassandra to postgres (#4959, #4961) diff --git a/charts/background-worker/templates/configmap.yaml b/charts/background-worker/templates/configmap.yaml index 81e78f5ca7..6c84d80876 100644 --- a/charts/background-worker/templates/configmap.yaml +++ b/charts/background-worker/templates/configmap.yaml @@ -90,6 +90,7 @@ data: {{- end }} migrateConversations: {{ .migrateConversations }} + migrateConversationCodes: {{ .migrateConversationCodes }} migrateConversationsOptions: {{toYaml .migrateConversationsOptions | indent 6 }} diff --git a/charts/background-worker/values.yaml b/charts/background-worker/values.yaml index 5b729f1472..2896d749e8 100644 --- a/charts/background-worker/values.yaml +++ b/charts/background-worker/values.yaml @@ -69,6 +69,10 @@ config: migrateConversationsOptions: pageSize: 10000 parallelism: 2 + # This will start the migration of conversation codes. + # It's important to set `settings.postgresMigration.conversationCodes` to `migration-to-postgresql` + # before starting the migration. + migrateConversationCodes: false backendNotificationPusher: pushBackoffMinWait: 10000 # in microseconds, so 10ms @@ -87,6 +91,7 @@ config: # Controls where conversation data is stored/accessed postgresMigration: conversation: cassandra + conversationCodes: cassandra secrets: {} diff --git a/charts/galley/values.yaml b/charts/galley/values.yaml index a8ac5e2b3b..f4ac3331c5 100644 --- a/charts/galley/values.yaml +++ b/charts/galley/values.yaml @@ -71,6 +71,7 @@ config: postgresMigration: conversation: cassandra + conversationCodes: cassandra settings: httpPoolSize: 128 maxTeamSize: 10000 diff --git a/docs/src/developer/reference/config-options.md b/docs/src/developer/reference/config-options.md index d72b8158fa..92f52cbf46 100644 --- a/docs/src/developer/reference/config-options.md +++ b/docs/src/developer/reference/config-options.md @@ -1811,79 +1811,94 @@ galley: config: postgresMigration: conversation: postgresql + conversationCodes: postgresql background-worker: config: postgresMigration: conversation: postgresql + conversationCodes: postgresql migrateConversations: false ``` #### Migration for existing installations -Existing installations should migrate the conversation data to PostgreSQL from +Existing installations should migrate conversation data to PostgreSQL from Cassandra. This is necessary for channel search and management of channels from the team-management UI. It is highly recommended to take a backup of the Galley Cassandra before triggering the migration. -The migration needs to happen in 3 steps: +Migrations are independent and can be run separately, in batches, or all at +once. This is expected, because migrations will be released over time. The +pattern below applies per store. Use it for `conversation` and +`conversationCodes` now, and for future stores as they are added. -1. Prepare wire-server for migration. +**Migration pattern per store(s)** - This step make sure that wire-server keep working as expected during the - migration. To do this deploy wire-server with this config change: - - Configure both `galley` and `background-worker` so that newly created - conversations are written to PostgreSQL while existing data still reads from - Cassandra: +1. Prepare the selected store(s) for migration by setting + `postgresMigration.` to `migration-to-postgresql`. This enables the + migration interpreter for that store, which ensures data is written to + PostgreSQL (store-specific details are handled internally). + The configuration must be consistent across `galley` and + `background-worker`. ```yaml galley: config: postgresMigration: conversation: migration-to-postgresql + conversationCodes: migration-to-postgresql background-worker: config: postgresMigration: conversation: migration-to-postgresql + conversationCodes: migration-to-postgresql migrateConversations: false + migrateConversationCodes: false ``` - This change should restart all the galley pods, any new conversations will - now be written to PostgreSQL. - -2. Trigger the migration and wait. + This change should restart all the galley pods, and new writes will follow + the migration interpreter. - This step will actually carry out the migration. To do this deploy - wire-server with this config change: +2. Run the backfill for the selected store(s) via background-worker. ```yaml background-worker: config: migrateConversations: true + migrateConversationCodes: true ``` - This change should restart the background-worker pods. It is recommended to - watch the logs and wait for both of these two metrics to report `1.0`: - `wire_local_convs_migration_finished` and `wire_user_remote_convs_migration_finished`. - This can take a long time depending on number of conversations in the DB. - -3. Configure wire-server to only use PostgreSQL for conversations. + Wait for the store-specific migration metrics to reach `1.0`. For + conversations: `wire_local_convs_migration_finished` and + `wire_user_remote_convs_migration_finished`. For conversation codes: + `wire_conv_codes_migration_finished`. - This will be the configuration which must be used from now on for every new - release. +3. Cut over reads and writes to PostgreSQL for the selected store(s). This + configuration must be used from now on for every new release. ```yaml galley: config: postgresMigration: conversation: postgresql + conversationCodes: postgresql background-worker: config: postgresMigration: conversation: postgresql + conversationCodes: postgresql migrateConversations: false + migrateConversationCodes: false ``` +**How to run migrations independently or in batches** + +- To migrate a single store, set only that store’s `postgresMigration.` + and `migrate` flags; leave others unchanged. +- To migrate a batch, set multiple stores to `migration-to-postgresql` and + enable only the matching `migrate` flags together. +- To reduce load, run large stores alone and group small stores together. + ## Configure Cells If Cells integration is enabled, gundeck must be configured with the name of diff --git a/hack/helm_vars/common.yaml.gotmpl b/hack/helm_vars/common.yaml.gotmpl index bce592348c..e1974ced47 100644 --- a/hack/helm_vars/common.yaml.gotmpl +++ b/hack/helm_vars/common.yaml.gotmpl @@ -13,11 +13,9 @@ dynBackendDomain1: dynamic-backend-1.{{ requiredEnv "NAMESPACE_1" }}.svc.cluster dynBackendDomain2: dynamic-backend-2.{{ requiredEnv "NAMESPACE_1" }}.svc.cluster.local dynBackendDomain3: dynamic-backend-3.{{ requiredEnv "NAMESPACE_1" }}.svc.cluster.local -{{- if (eq (env "PREFERRED_STORE") "") }} -conversationStore: cassandra -{{- else }} -conversationStore: {{ env "PREFERRED_STORE" }} -{{- end }} +{{- $preferredStore := default "cassandra" (env "PREFERRED_STORE") }} +conversationStore: {{ $preferredStore }} +conversationCodesStore: {{ $preferredStore }} {{- if (eq (env "UPLOAD_XML_S3_BASE_URL") "") }} uploadXml: {} diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index ec4ea1670a..5efdf49dbd 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -306,6 +306,7 @@ galley: enableFederation: true # keep in sync with brig.config.enableFederation, cargohold.config.enableFederation and tags.federator! postgresMigration: conversation: {{ .Values.conversationStore }} + conversationCodes: {{ .Values.conversationCodesStore }} settings: maxConvAndTeamSize: 16 maxTeamSize: 32 @@ -673,6 +674,7 @@ background-worker: federationDomain: integration.example.com postgresMigration: conversation: {{ .Values.conversationStore }} + conversationCodes: {{ .Values.conversationCodesStore }} rabbitmq: port: 5671 adminPort: 15671 diff --git a/integration/integration.cabal b/integration/integration.cabal index 72c0da80ef..fae812e6f5 100644 --- a/integration/integration.cabal +++ b/integration/integration.cabal @@ -126,7 +126,6 @@ library Test.Client Test.Connection Test.Conversation - Test.Conversation.Migration Test.Demo Test.DNSMock Test.DomainVerification @@ -174,6 +173,9 @@ library Test.LegalHold Test.Login Test.MessageTimer + Test.Migration.Conversation + Test.Migration.ConversationCodes + Test.Migration.Util Test.MLS Test.MLS.Clients Test.MLS.History diff --git a/integration/test/Test/Conversation/Migration.hs b/integration/test/Test/Migration/Conversation.hs similarity index 93% rename from integration/test/Test/Conversation/Migration.hs rename to integration/test/Test/Migration/Conversation.hs index 97472960d4..dfb80e91a7 100644 --- a/integration/test/Test/Conversation/Migration.hs +++ b/integration/test/Test/Migration/Conversation.hs @@ -10,26 +10,22 @@ -- The tests are from the perspective of mel, a user on the dynamic backend, -- called backendM (migrating backend). There are also users called mark and mia -- on this backend. -module Test.Conversation.Migration where +module Test.Migration.Conversation where import API.Galley import Control.Applicative -import Control.Concurrent (threadDelay) import Control.Monad.Codensity import Control.Monad.Reader import Data.IntMap (IntMap) import qualified Data.IntMap as IntMap import qualified Data.IntSet as IntSet -import Data.Text (Text) -import qualified Data.Text as Text -import qualified Data.Text.Encoding as Text import GHC.Stack import MLS.Util import Notifications import SetupHelpers hiding (deleteUser) +import Test.Migration.Util import Testlib.Prelude import Testlib.ResourcePool -import Text.Regex.TDFA ((=~)) import UnliftIO -- | Our test setup cannot process updates to many MLS convs concurrently, so we @@ -84,7 +80,9 @@ testMigrationToPostgresMLS = do actualConvs `shouldMatchSet` ((convIdToQidObject <$> expectedConvs) <> otherMelConvs) - when (phase == 3) $ waitForMigration domainM + when (phase == 3) $ do + waitForMigration domainM convMigrationFinishedCounterName + waitForMigration domainM userMigrationFinishedCounterName runPhase 1 runPhase 2 runPhase 3 @@ -191,7 +189,9 @@ testMigrationToPostgresProteus = do actualConvs `shouldMatchSet` ((convIdToQidObject <$> expectedConvs) <> otherMelConvs) - when (phase == 3) $ waitForMigration domainM + when (phase == 3) $ do + waitForMigration domainM convMigrationFinishedCounterName + waitForMigration domainM userMigrationFinishedCounterName runPhase 1 runPhase 2 runPhase 3 @@ -292,17 +292,11 @@ instance Semigroup TestConvList where addMelConvs = IntMap.unionWith (<>) l1.addMelConvs l2.addMelConvs } -waitForMigration :: (HasCallStack) => String -> App () -waitForMigration domainM = do - metrics <- - getMetrics domainM BackgroundWorker `bindResponse` \resp -> do - resp.status `shouldMatchInt` 200 - pure $ Text.decodeUtf8 resp.body - let (_, _, _, convFinishedMatches) :: (Text, Text, Text, [Text]) = (metrics =~ Text.pack "^wire_local_convs_migration_finished\\ ([0-9]+\\.[0-9]+)$") - let (_, _, _, userFinishedMatches) :: (Text, Text, Text, [Text]) = (metrics =~ Text.pack "^wire_user_remote_convs_migration_finished\\ ([0-9]+\\.[0-9]+)$") - when (convFinishedMatches /= [Text.pack "1.0"] || userFinishedMatches /= [Text.pack "1.0"]) $ do - liftIO $ threadDelay 100_000 - waitForMigration domainM +convMigrationFinishedCounterName :: String +convMigrationFinishedCounterName = "^wire_local_convs_migration_finished" + +userMigrationFinishedCounterName :: String +userMigrationFinishedCounterName = "^wire_user_remote_convs_migration_finished" phase1Overrides, phase2Overrides, phase3Overrides, phase4Overrides, phase5Overrides :: ServiceOverrides phase1Overrides = diff --git a/integration/test/Test/Migration/ConversationCodes.hs b/integration/test/Test/Migration/ConversationCodes.hs new file mode 100644 index 0000000000..c56fd73c9a --- /dev/null +++ b/integration/test/Test/Migration/ConversationCodes.hs @@ -0,0 +1,183 @@ +module Test.Migration.ConversationCodes where + +import API.Galley +import Control.Applicative +import Control.Concurrent.Timeout +import Control.Monad.Codensity +import Control.Monad.Reader +import SetupHelpers +import Test.Migration.Util (waitForMigration) +import Testlib.Prelude +import Testlib.ResourcePool + +testConversationCodesMigration :: (HasCallStack) => TaggedBool "has-password" -> App () +testConversationCodesMigration (TaggedBool hasPassword) = do + resourcePool <- asks (.resourcePool) + let pw = if hasPassword then Just "funky password" else Nothing + + runCodensity (acquireResources 1 resourcePool) $ \[backend] -> do + let domain = backend.berDomain + + (admin, code1, codeA, convs, members) <- runCodensity (startDynamicBackend backend (conf "cassandra" False)) $ \_ -> do + (admin, _, members) <- createTeam domain 6 + convs1@(conv1 : _) <- replicateM 5 $ postConversation admin (allowGuests defProteus) >>= getJSON 201 + convs2@(convA : _) <- replicateM 4 $ postConversation admin (allowGuests defProteus) >>= getJSON 201 + code1 <- genCode admin conv1 pw + codeA <- genCode admin convA pw + pure (admin, code1, codeA, convs1 <> convs2, members) + + [conv1, conv2, conv3, conv4, conv5, convA, convB, convC, convD] <- pure convs + m1 : m2 : m3 : m4 : _ <- pure members + + (code2, codeB) <- runCodensity (startDynamicBackend backend (conf "migration-to-postgresql" False)) $ \_ -> do + -- code generation works + code2 <- genCode admin conv2 pw + codeB <- genCode admin convB pw + -- joining works + checkJoinAndGet admin m1 conv1 code1 pw + checkJoinAndGet admin m1 conv2 code2 pw + -- deletion works + checkDelete admin m1 convA codeA pw + pure (code2, codeB) + + (code3, codeC) <- runCodensity (startDynamicBackend backend (conf "migration-to-postgresql" True)) $ \_ -> do + -- code generation works + code3 <- genCode admin conv3 pw + codeC <- genCode admin convC pw + -- joining works + checkJoinAndGet admin m2 conv1 code1 pw + checkJoinAndGet admin m2 conv2 code2 pw + checkJoinAndGet admin m2 conv3 code3 pw + -- deletion works + checkNoCode admin m1 convA codeA pw + checkDelete admin m1 convB codeB pw + waitForMigration domain counterName + pure (code3, codeC) + + (code4, codeD) <- runCodensity (startDynamicBackend backend (conf "migration-to-postgresql" False)) $ \_ -> do + -- code generation works + code4 <- genCode admin conv4 pw + codeD <- genCode admin convD pw + -- joining works + checkJoinAndGet admin m3 conv1 code1 pw + checkJoinAndGet admin m3 conv2 code2 pw + checkJoinAndGet admin m3 conv3 code3 pw + checkJoinAndGet admin m3 conv4 code4 pw + -- deletion works + checkNoCode admin m1 convA codeA pw + checkNoCode admin m1 convB codeB pw + checkDelete admin m1 convC codeC pw + pure (code4, codeD) + + runCodensity (startDynamicBackend backend (conf "postgresql" False)) $ \_ -> do + -- code generation works + code5 <- genCode admin conv5 pw + -- joining works + checkJoinAndGet admin m4 conv1 code1 pw + checkJoinAndGet admin m4 conv2 code2 pw + checkJoinAndGet admin m4 conv3 code3 pw + checkJoinAndGet admin m4 conv4 code4 pw + checkJoinAndGet admin m4 conv5 code5 pw + -- deletion works + checkNoCode admin m1 convA codeA pw + checkNoCode admin m1 convB codeB pw + checkNoCode admin m1 convC codeC pw + checkDelete admin m1 convD codeD pw + checkDelete admin m1 conv5 code5 pw + where + checkJoinAndGet admin user conv code pw = do + joinWithCode user conv code + getCode admin conv pw `shouldMatch` code + checkDelete admin user conv (k, v) pw = do + assertSuccess =<< deleteConversationCode admin conv + checkNoCode admin user conv (k, v) pw + checkNoCode admin user conv (k, v) pw = do + assertStatus 404 =<< getConversationCode admin conv pw + bindResponse (getJoinCodeConv user k v) $ \res -> do + res.status `shouldMatchInt` 404 + res.json %. "label" `shouldMatch` "no-conversation-code" + +testConversationCodesMigrationExpiration :: (HasCallStack) => App () +testConversationCodesMigrationExpiration = do + resourcePool <- asks (.resourcePool) + let pw = Nothing + + runCodensity (acquireResources 1 resourcePool) $ \[backend] -> do + let domain = backend.berDomain + + (admin, code1, conv, mem) <- runCodensity (startDynamicBackend backend (confWithExpiry "cassandra" False 2)) $ \_ -> do + (admin, _, mem : _) <- createTeam domain 2 + conv <- postConversation admin (allowGuests defProteus) >>= getJSON 201 + code1 <- genCode admin conv pw + pure (admin, code1, conv, mem) + + code2 <- runCodensity (startDynamicBackend backend (confWithExpiry "migration-to-postgresql" False 2)) $ \_ -> do + waitForCodeToExpire admin conv pw + checkCantJoin mem code1 + genCode admin conv pw + + code3 <- runCodensity (startDynamicBackend backend (confWithExpiry "migration-to-postgresql" True 2)) $ \_ -> do + waitForCodeToExpire admin conv pw + checkCantJoin mem code2 + genCode admin conv pw + + code4 <- runCodensity (startDynamicBackend backend (confWithExpiry "migration-to-postgresql" False 2)) $ \_ -> do + waitForCodeToExpire admin conv pw + checkCantJoin mem code3 + genCode admin conv pw + runCodensity (startDynamicBackend backend (confWithExpiry "postgresql" False 2)) $ \_ -> do + waitForCodeToExpire admin conv pw + checkCantJoin mem code4 + where + checkCantJoin user (k, v) = do + bindResponse (getJoinCodeConv user k v) $ \res -> do + res.status `shouldMatchInt` 404 + res.json %. "label" `shouldMatch` "no-conversation-code" + +-- HELPER + +genCode :: (HasCallStack, MakesValue user, MakesValue conv) => user -> conv -> Maybe String -> App (String, String) +genCode user conv pw = + bindResponse (postConversationCode user conv pw Nothing) $ \res -> do + payload <- getJSON 201 res + k <- payload %. "data.key" & asString + v <- payload %. "data.code" & asString + pure (k, v) + +getCode :: (HasCallStack, MakesValue user, MakesValue conv) => user -> conv -> Maybe String -> App (String, String) +getCode user conv pw = + bindResponse (getConversationCode user conv pw) $ \res -> do + payload <- getJSON 200 res + k <- payload %. "key" & asString + v <- payload %. "code" & asString + pure (k, v) + +waitForCodeToExpire :: (MakesValue user, MakesValue conv) => user -> conv -> Maybe String -> App () +waitForCodeToExpire user conv pw = do + res <- getConversationCode user conv pw + if res.status == 404 + then pure () + else do + liftIO $ threadDelay 100_000 + waitForCodeToExpire user conv pw + +joinWithCode :: (HasCallStack, MakesValue user) => user -> Value -> (String, String) -> App () +joinWithCode user conv (k, v) = + bindResponse (getJoinCodeConv user k v) $ \res -> do + res.status `shouldMatchInt` 200 + res.json %. "id" `shouldMatch` (objQidObject conv & objId) + +conf :: String -> Bool -> ServiceOverrides +conf db runMigration = confWithExpiry db runMigration 604800 + +confWithExpiry :: String -> Bool -> Int -> ServiceOverrides +confWithExpiry db runMigration expiry = + def + { galleyCfg = + setField "postgresMigration.conversationCodes" db + >=> setField "settings.guestLinkTTLSeconds" expiry, + backgroundWorkerCfg = setField "migrateConversationCodes" runMigration + } + +counterName :: String +counterName = "^wire_conv_codes_migration_finished" diff --git a/integration/test/Test/Migration/Util.hs b/integration/test/Test/Migration/Util.hs new file mode 100644 index 0000000000..f55db0c58f --- /dev/null +++ b/integration/test/Test/Migration/Util.hs @@ -0,0 +1,23 @@ +module Test.Migration.Util where + +import Control.Applicative +import Control.Concurrent (threadDelay) +import Control.Monad.Reader +import Data.Text (Text) +import qualified Data.Text as Text +import qualified Data.Text.Encoding as Text +import GHC.Stack +import SetupHelpers hiding (deleteUser) +import Testlib.Prelude +import Text.Regex.TDFA ((=~)) + +waitForMigration :: (HasCallStack) => String -> String -> App () +waitForMigration domain name = do + metrics <- + getMetrics domain BackgroundWorker `bindResponse` \resp -> do + resp.status `shouldMatchInt` 200 + pure $ Text.decodeUtf8 resp.body + let (_, _, _, finishedMatches) :: (Text, Text, Text, [Text]) = (metrics =~ Text.pack (name <> "\\ ([0-9]+\\.[0-9]+)$")) + when (finishedMatches /= [Text.pack "1.0"]) $ do + liftIO $ threadDelay 100_000 + waitForMigration domain name diff --git a/libs/wire-api/src/Wire/API/Password.hs b/libs/wire-api/src/Wire/API/Password.hs index dc5cb9b9df..dfb16d1d25 100644 --- a/libs/wire-api/src/Wire/API/Password.hs +++ b/libs/wire-api/src/Wire/API/Password.hs @@ -36,11 +36,13 @@ import Data.ByteString.Lazy (fromStrict, toStrict) import Data.Misc import Data.OpenApi qualified as S import Data.Schema +import Data.Text qualified as Text import Data.Text.Encoding qualified as Text import Imports import OpenSSL.Random (randBytes) import Wire.API.Password.Argon2id import Wire.API.Password.Scrypt +import Wire.API.PostgresMarshall -- | A derived, stretched password that can be safely stored. data Password @@ -50,6 +52,22 @@ data Password instance Show Password where show _ = "" +------------------------------------------------------------------------------- +-- PSQL + +instance PostgresMarshall ByteString Password where + postgresMarshall = + Text.encodeUtf8 . \case + Argon2Password p -> encodeArgon2HashedPassword p + ScryptPassword p -> encodeScryptPassword p + +instance PostgresUnmarshall ByteString Password where + postgresUnmarshall = + mapLeft Text.pack . parsePassword . Text.decodeUtf8 + +------------------------------------------------------------------------------- +-- CQL + instance Cql Password where ctype = Tagged BlobColumn diff --git a/libs/wire-api/src/Wire/API/PostgresMarshall.hs b/libs/wire-api/src/Wire/API/PostgresMarshall.hs index ee78384312..666b5b78c4 100644 --- a/libs/wire-api/src/Wire/API/PostgresMarshall.hs +++ b/libs/wire-api/src/Wire/API/PostgresMarshall.hs @@ -27,7 +27,9 @@ where import Data.Aeson import Data.Bifunctor (first) import Data.ByteString qualified as BS +import Data.ByteString.Conversion (toByteString') import Data.ByteString.Conversion qualified as BSC +import Data.Code qualified as Code import Data.Domain import Data.Id import Data.Misc @@ -530,6 +532,12 @@ instance (PostgresMarshall b a) => PostgresMarshall (Vector b) (Set a) where instance (PostgresMarshall a b) => PostgresMarshall (Vector a) (Vector b) where postgresMarshall = V.map postgresMarshall +instance PostgresMarshall Text Code.Key where + postgresMarshall = Text.decodeUtf8 . toByteString' + +instance PostgresMarshall Text Code.Value where + postgresMarshall = Text.decodeUtf8 . toByteString' + --- class PostgresUnmarshall db domain where @@ -855,6 +863,12 @@ instance (PostgresUnmarshall a b, Ord b) => PostgresUnmarshall (Vector a) (Set b instance PostgresUnmarshall Int64 Milliseconds where postgresUnmarshall = Right . int64ToMs +instance PostgresUnmarshall Text Code.Key where + postgresUnmarshall = mapLeft Text.pack . BSC.runParser BSC.parser . Text.encodeUtf8 + +instance PostgresUnmarshall Text Code.Value where + postgresUnmarshall = mapLeft Text.pack . BSC.runParser BSC.parser . Text.encodeUtf8 + --- lmapPG :: (PostgresMarshall db domain, Profunctor p) => p db x -> p domain x diff --git a/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/PostgresMarshall.hs b/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/PostgresMarshall.hs new file mode 100644 index 0000000000..31ce898c43 --- /dev/null +++ b/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/PostgresMarshall.hs @@ -0,0 +1,83 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . +{-# OPTIONS_GHC -Wno-orphans #-} + +module Test.Wire.API.Roundtrip.PostgresMarshall (tests) where + +import Crypto.Error (CryptoFailable (..)) +import Crypto.KDF.Argon2 qualified as Argon2 +import Data.ByteString.Char8 qualified as BS8 +import Data.Code qualified as Code +import Data.Misc (PlainTextPassword8, fromPlainTextPassword) +import Data.Text.Encoding (encodeUtf8) +import Imports +import Test.Tasty qualified as T +import Test.Tasty.QuickCheck +import Type.Reflection (typeRep) +import Wire.API.Password as Password +import Wire.API.Password.Argon2id (Argon2HashedPassword (..), encodeArgon2HashedPassword) +import Wire.API.Password.Scrypt (encodeScryptPassword) +import Wire.API.PostgresMarshall +import Wire.Arbitrary qualified as Arbitrary () + +tests :: T.TestTree +tests = + T.localOption (T.Timeout (60 * 1000000) "60s") . T.testGroup "PostgresMarshall roundtrip tests" $ + [ testRoundTrip @Text @Code.Key, + testRoundTrip @Text @Code.Value, + testRoundTrip @ByteString @Password.Password + ] + +testRoundTrip :: + forall db domain. + (Arbitrary domain, Typeable domain, PostgresMarshall db domain, PostgresUnmarshall db domain, Eq domain, Show domain) => + T.TestTree +testRoundTrip = testProperty msg trip + where + msg = show (typeRep @domain) + trip (value :: domain) = + counterexample (show value) $ + Right value === (postgresUnmarshall . postgresMarshall @db) value + +instance Arbitrary Password where + arbitrary = Argon2Password . hashPlaintext <$> (arbitrary :: Gen PlainTextPassword8) + where + hashPlaintext plain = + let opts = + Argon2.Options + { variant = Argon2.Argon2id, + version = Argon2.Version13, + iterations = 1, + parallelism = 1, + memory = 8 + } + salt = BS8.pack "static-salt-1234" + password = encodeUtf8 (fromPlainTextPassword plain) + hashedKey = hashWithOptions opts password salt + in Argon2HashedPassword {opts, salt, hashedKey} + hashWithOptions opts password salt = + let tagSize = 16 + in case Argon2.hash opts password salt tagSize of + CryptoFailed err -> error $ "argon2 hash failed: " <> show err + CryptoPassed hash -> hash + +instance Eq Password where + p1 == p2 = passwordText p1 == passwordText p2 + where + passwordText = \case + Argon2Password p -> encodeArgon2HashedPassword p + ScryptPassword p -> encodeScryptPassword p diff --git a/libs/wire-api/test/unit/Test/Wire/API/Run.hs b/libs/wire-api/test/unit/Test/Wire/API/Run.hs index 0a083cd4fe..cf0f89456c 100644 --- a/libs/wire-api/test/unit/Test/Wire/API/Run.hs +++ b/libs/wire-api/test/unit/Test/Wire/API/Run.hs @@ -31,6 +31,7 @@ import Test.Wire.API.Roundtrip.ByteString qualified as Roundtrip.ByteString import Test.Wire.API.Roundtrip.CSV qualified as Roundtrip.CSV import Test.Wire.API.Roundtrip.HttpApiData qualified as Roundtrip.HttpApiData import Test.Wire.API.Roundtrip.MLS qualified as Roundtrip.MLS +import Test.Wire.API.Roundtrip.PostgresMarshall as PostgresMarshall import Test.Wire.API.Routes qualified as Routes import Test.Wire.API.Routes.Version qualified as Routes.Version import Test.Wire.API.Routes.Version.Wai qualified as Routes.Version.Wai @@ -67,5 +68,6 @@ main = Routes.Version.tests, unsafePerformIO Routes.Version.Wai.tests, RawJson.tests, - OAuth.tests + OAuth.tests, + PostgresMarshall.tests ] diff --git a/libs/wire-api/wire-api.cabal b/libs/wire-api/wire-api.cabal index eb5c79e655..cc4ec01bb5 100644 --- a/libs/wire-api/wire-api.cabal +++ b/libs/wire-api/wire-api.cabal @@ -710,6 +710,7 @@ test-suite wire-api-tests Test.Wire.API.Roundtrip.CSV Test.Wire.API.Roundtrip.HttpApiData Test.Wire.API.Roundtrip.MLS + Test.Wire.API.Roundtrip.PostgresMarshall Test.Wire.API.Routes Test.Wire.API.Routes.Version Test.Wire.API.Routes.Version.Wai diff --git a/libs/wire-subsystems/postgres-migrations/20260115150600-conversation-codes.sql b/libs/wire-subsystems/postgres-migrations/20260115150600-conversation-codes.sql new file mode 100644 index 0000000000..533b2e24a9 --- /dev/null +++ b/libs/wire-subsystems/postgres-migrations/20260115150600-conversation-codes.sql @@ -0,0 +1,16 @@ +CREATE TABLE conversation_codes ( + key text NOT NULL, + conversation uuid NOT NULL, + password bytea, + value text NOT NULL, + expires_at timestamptz NOT NULL, + PRIMARY KEY (key) +); + +-- index for lookups like `WHERE key = ? AND scope = ? AND expires_at > now()` +CREATE INDEX conversation_codes_key_expires_at_idx + ON conversation_codes (key, expires_at); + +-- index for deletes like `DELETE ... WHERE expires_at <= now()` +CREATE INDEX conversation_codes_expires_at_idx + ON conversation_codes (expires_at); diff --git a/libs/wire-subsystems/src/Wire/CodeStore.hs b/libs/wire-subsystems/src/Wire/CodeStore.hs index 79946cc04d..528b1b2335 100644 --- a/libs/wire-subsystems/src/Wire/CodeStore.hs +++ b/libs/wire-subsystems/src/Wire/CodeStore.hs @@ -29,10 +29,10 @@ import Wire.CodeStore.Code data CodeStore m a where CreateCode :: Code -> Maybe Password -> CodeStore m () - GetCode :: Key -> Scope -> CodeStore m (Maybe (Code, Maybe Password)) - DeleteCode :: Key -> Scope -> CodeStore m () + GetCode :: Key -> CodeStore m (Maybe (Code, Maybe Password)) + DeleteCode :: Key -> CodeStore m () MakeKey :: ConvId -> CodeStore m Key - GenerateCode :: ConvId -> Scope -> Timeout -> CodeStore m Code + GenerateCode :: ConvId -> Timeout -> CodeStore m Code GetConversationCodeURI :: Maybe Text -> CodeStore m (Maybe HttpsUrl) makeSem ''CodeStore diff --git a/libs/wire-subsystems/src/Wire/CodeStore/Cassandra.hs b/libs/wire-subsystems/src/Wire/CodeStore/Cassandra.hs index a49d7c2a57..d320f13369 100644 --- a/libs/wire-subsystems/src/Wire/CodeStore/Cassandra.hs +++ b/libs/wire-subsystems/src/Wire/CodeStore/Cassandra.hs @@ -41,16 +41,16 @@ interpretCodeStoreToCassandra :: Sem (CodeStore ': r) a -> Sem r a interpretCodeStoreToCassandra = interpret $ \case - GetCode k s -> do - embedClientInput $ lookupCode k s + GetCode k -> do + embedClientInput $ lookupCode k CreateCode code mPw -> do embedClientInput $ insertCode code mPw - DeleteCode k s -> do - embedClientInput $ deleteCode k s + DeleteCode k -> do + embedClientInput $ deleteCode k MakeKey cid -> do Code.mkKey cid - GenerateCode cid s t -> do - Code.generate cid s t + GenerateCode cid t -> do + Code.generate cid t GetConversationCodeURI mbHost -> do convCodeURI <- input case convCodeURI of @@ -67,14 +67,13 @@ insertCode c mPw = do let v = codeValue c let cnv = codeConversation c let t = round (codeTTL c) - let s = codeScope c - retry x5 (write Cql.insertCode (params LocalQuorum (k, v, cnv, s, mPw, t))) + retry x5 (write Cql.insertCode (params LocalQuorum (k, v, cnv, mPw, t))) -- | Lookup a conversation by code. -lookupCode :: Key -> Scope -> Client (Maybe (Code, Maybe Password)) -lookupCode k s = - fmap (toCode k s) <$> retry x1 (query1 Cql.lookupCode (params LocalQuorum (k, s))) +lookupCode :: Key -> Client (Maybe (Code, Maybe Password)) +lookupCode k = + fmap (toCode k) <$> retry x1 (query1 Cql.lookupCode (params LocalQuorum (Identity k))) -- | Delete a code associated with the given conversation key -deleteCode :: Key -> Scope -> Client () -deleteCode k s = retry x5 $ write Cql.deleteCode (params LocalQuorum (k, s)) +deleteCode :: Key -> Client () +deleteCode k = retry x5 $ write Cql.deleteCode (params LocalQuorum (Identity k)) diff --git a/libs/wire-subsystems/src/Wire/CodeStore/Cassandra/Queries.hs b/libs/wire-subsystems/src/Wire/CodeStore/Cassandra/Queries.hs index e0eed6d0e8..23b31b8e9a 100644 --- a/libs/wire-subsystems/src/Wire/CodeStore/Cassandra/Queries.hs +++ b/libs/wire-subsystems/src/Wire/CodeStore/Cassandra/Queries.hs @@ -22,13 +22,15 @@ import Data.Id import Imports import Wire.API.Conversation.Code import Wire.API.Password (Password) -import Wire.CodeStore.Scope -insertCode :: PrepQuery W (Key, Value, ConvId, Scope, Maybe Password, Int32) () -insertCode = "INSERT INTO conversation_codes (key, value, conversation, scope, password) VALUES (?, ?, ?, ?, ?) USING TTL ?" +insertCode :: PrepQuery W (Key, Value, ConvId, Maybe Password, Int32) () +insertCode = "INSERT INTO conversation_codes (key, value, conversation, scope, password) VALUES (?, ?, ?, 1, ?) USING TTL ?" -lookupCode :: PrepQuery R (Key, Scope) (Value, Int32, ConvId, Maybe Password) -lookupCode = "SELECT value, ttl(value), conversation, password FROM conversation_codes WHERE key = ? AND scope = ?" +lookupCode :: PrepQuery R (Identity Key) (Value, Int32, ConvId, Maybe Password) +lookupCode = "SELECT value, ttl(value), conversation, password FROM conversation_codes WHERE key = ? AND scope = 1" -deleteCode :: PrepQuery W (Key, Scope) () -deleteCode = "DELETE FROM conversation_codes WHERE key = ? AND scope = ?" +deleteCode :: PrepQuery W (Identity Key) () +deleteCode = "DELETE FROM conversation_codes WHERE key = ? AND scope = 1" + +selectAllCodes :: PrepQuery R () (Key, Value, Int32, ConvId, Maybe Password) +selectAllCodes = "SELECT key, value, ttl(value), conversation, password FROM conversation_codes" diff --git a/libs/wire-subsystems/src/Wire/CodeStore/Code.hs b/libs/wire-subsystems/src/Wire/CodeStore/Code.hs index f49588d1b0..c5c497e44b 100644 --- a/libs/wire-subsystems/src/Wire/CodeStore/Code.hs +++ b/libs/wire-subsystems/src/Wire/CodeStore/Code.hs @@ -19,7 +19,6 @@ module Wire.CodeStore.Code ( Code (..), - Scope (..), toCode, generate, mkKey, @@ -36,26 +35,23 @@ import Imports import OpenSSL.EVP.Digest (digestBS, getDigestByName) import OpenSSL.Random (randBytes) import Wire.API.Password (Password) -import Wire.CodeStore.Scope data Code = Code { codeKey :: !Key, codeValue :: !Value, codeTTL :: !Timeout, codeConversation :: !ConvId, - codeScope :: !Scope, codeHasPassword :: !Bool } deriving (Eq, Show, Generic) -toCode :: Key -> Scope -> (Value, Int32, ConvId, Maybe Password) -> (Code, Maybe Password) -toCode k s (val, ttl, cnv, mPw) = +toCode :: Key -> (Value, Int32, ConvId, Maybe Password) -> (Code, Maybe Password) +toCode k (val, ttl, cnv, mPw) = ( Code { codeKey = k, codeValue = val, codeTTL = Timeout (fromIntegral ttl), codeConversation = cnv, - codeScope = s, codeHasPassword = isJust mPw }, mPw @@ -68,8 +64,8 @@ toCode k s (val, ttl, cnv, mPw) = -- The 'key' is a stable, truncated, base64 encoded sha256 hash of the conversation ID -- The 'value' is a base64 encoded, 120-bit random value (changing on each generation) -generate :: (MonadIO m) => ConvId -> Scope -> Timeout -> m Code -generate cnv s t = do +generate :: (MonadIO m) => ConvId -> Timeout -> m Code +generate cnv t = do key <- mkKey cnv val <- liftIO $ Value . unsafeRange . Ascii.encodeBase64Url <$> randBytes 15 pure @@ -78,7 +74,6 @@ generate cnv s t = do codeValue = val, codeConversation = cnv, codeTTL = t, - codeScope = s, codeHasPassword = False } diff --git a/libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs b/libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs new file mode 100644 index 0000000000..112b61a91e --- /dev/null +++ b/libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs @@ -0,0 +1,57 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.CodeStore.DualWrite + ( interpretCodeStoreToCassandraAndPostgres, + ) +where + +import Cassandra (ClientState) +import Data.Misc +import Imports +import Polysemy +import Polysemy.Input +import Wire.CodeStore (CodeStore (..)) +import Wire.CodeStore qualified as CodeStore +import Wire.CodeStore.Cassandra qualified as Cassandra +import Wire.CodeStore.Postgres qualified as Postgres +import Wire.Postgres (PGConstraints) + +interpretCodeStoreToCassandraAndPostgres :: + ( Member (Input ClientState) r, + Member (Input (Either HttpsUrl (Map Text HttpsUrl))) r, + PGConstraints r + ) => + Sem (CodeStore ': r) a -> + Sem r a + +-- | Cassandra is the source of truth during migration; writes are mirrored to Postgres. +interpretCodeStoreToCassandraAndPostgres = interpret $ \case + GetCode k -> do + Cassandra.interpretCodeStoreToCassandra $ CodeStore.getCode k + CreateCode code mPw -> do + Cassandra.interpretCodeStoreToCassandra $ CodeStore.createCode code mPw + Postgres.interpretCodeStoreToPostgres $ CodeStore.createCode code mPw + DeleteCode k -> do + Cassandra.interpretCodeStoreToCassandra $ CodeStore.deleteCode k + Postgres.interpretCodeStoreToPostgres $ CodeStore.deleteCode k + MakeKey cid -> do + Cassandra.interpretCodeStoreToCassandra $ CodeStore.makeKey cid + GenerateCode cid t -> do + Cassandra.interpretCodeStoreToCassandra $ CodeStore.generateCode cid t + GetConversationCodeURI mbHost -> do + Cassandra.interpretCodeStoreToCassandra $ CodeStore.getConversationCodeURI mbHost diff --git a/libs/wire-subsystems/src/Wire/CodeStore/Migration.hs b/libs/wire-subsystems/src/Wire/CodeStore/Migration.hs new file mode 100644 index 0000000000..d260e9cb02 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/CodeStore/Migration.hs @@ -0,0 +1,135 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.CodeStore.Migration + ( MigrationOptions (..), + migrateCodesLoop, + ) +where + +import Cassandra hiding (Value) +import Data.Code (Key, Value) +import Data.Conduit +import Data.Conduit.List qualified as C +import Data.Id (ConvId) +import Data.Misc (HttpsUrl) +import Hasql.Pool qualified as Hasql +import Imports +import Polysemy +import Polysemy.Error +import Polysemy.Input +import Polysemy.State +import Polysemy.TinyLog +import Prometheus qualified +import System.Logger qualified as Log +import Wire.API.Password +import Wire.CodeStore +import Wire.CodeStore.Cassandra.Queries qualified as Cql +import Wire.CodeStore.Code +import Wire.CodeStore.Postgres qualified as Postgres +import Wire.Migration +import Wire.Postgres +import Wire.Sem.Logger (mapLogger) +import Wire.Sem.Logger.TinyLog (loggerToTinyLog) + +type EffectStack = + [ State Int, + Input ClientState, + Input Hasql.Pool, + Input (Either HttpsUrl (Map Text HttpsUrl)), + TinyLog, + Embed IO, + Final IO + ] + +migrateCodesLoop :: + MigrationOptions -> + ClientState -> + Hasql.Pool -> + Log.Logger -> + Prometheus.Counter -> + Prometheus.Counter -> + Prometheus.Counter -> + IO () +migrateCodesLoop migOpts cassClient pgPool logger migCounter migFinished migFailed = + migrationLoop + logger + "conversation codes" + migFinished + migFailed + (interpreter cassClient pgPool logger "conversation codes") + (migrateAllCodes migOpts migCounter) + +interpreter :: ClientState -> Hasql.Pool -> Log.Logger -> ByteString -> Sem EffectStack a -> IO (Int, a) +interpreter cassClient pgPool logger name = + runFinal + . embedToFinal + . loggerToTinyLog logger + . mapLogger (Log.field "migration" (Log.val name) .) + . raiseUnder + . runInputConst (Right mempty) + . runInputConst pgPool + . runInputConst cassClient + . runState 0 + +migrateAllCodes :: + ( Member (Input Hasql.Pool) r, + Member (Input (Either HttpsUrl (Map Text HttpsUrl))) r, + Member (Embed IO) r, + Member (Input ClientState) r, + Member TinyLog r, + Member (State Int) r + ) => + MigrationOptions -> + Prometheus.Counter -> + ConduitM () Void (Sem r) () +migrateAllCodes migOpts migCounter = do + lift $ info $ Log.msg (Log.val "migrateAllCodes") + withCount (paginateSem Cql.selectAllCodes (paramsP LocalQuorum () migOpts.pageSize) x5) + .| logRetrievedPage migOpts.pageSize id + .| C.mapM_ (traverse_ (handleErrors (migrateCodeRow migCounter))) + +handleErrors :: + ( Member (State Int) r, + Member TinyLog r + ) => + ((Key, Value, Int32, ConvId, Maybe Password) -> Sem (Error Hasql.UsageError : r) ()) -> + (Key, Value, Int32, ConvId, Maybe Password) -> + Sem r () +handleErrors action row@(k, _, _, _, _) = do + eithErr <- runError (action row) + case eithErr of + Right _ -> pure () + Left e -> do + warn $ + Log.msg (Log.val "error occurred during migration") + . Log.field "key" (show k) + . Log.field "error" (show e) + modify (+ 1) + +migrateCodeRow :: + ( Member (Input (Either HttpsUrl (Map Text HttpsUrl))) r, + PGConstraints r + ) => + Prometheus.Counter -> + (Key, Value, Int32, ConvId, Maybe Password) -> + Sem r () +migrateCodeRow migCounter (k, v, ttl, cnv, mPw) = + when (ttl > 0) $ do + let (code, _) = toCode k (v, ttl, cnv, mPw) + Postgres.interpretCodeStoreToPostgres $ createCode code mPw + liftIO $ Prometheus.incCounter migCounter diff --git a/libs/wire-subsystems/src/Wire/CodeStore/Postgres.hs b/libs/wire-subsystems/src/Wire/CodeStore/Postgres.hs new file mode 100644 index 0000000000..0126216d79 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/CodeStore/Postgres.hs @@ -0,0 +1,111 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.CodeStore.Postgres + ( interpretCodeStoreToPostgres, + ) +where + +import Data.Code +import Data.Id +import Data.Map qualified as Map +import Data.Misc (HttpsUrl) +import Hasql.Statement qualified as Hasql +import Hasql.TH +import Imports +import Polysemy +import Polysemy.Input +import Wire.API.Password +import Wire.API.PostgresMarshall +import Wire.CodeStore (CodeStore (..)) +import Wire.CodeStore.Code as Code +import Wire.Postgres + +interpretCodeStoreToPostgres :: + ( PGConstraints r, + Member (Input (Either HttpsUrl (Map Text HttpsUrl))) r + ) => + Sem (CodeStore ': r) a -> + Sem r a +interpretCodeStoreToPostgres = interpret $ \case + GetCode k -> do + lookupCode k + CreateCode code mPw -> do + insertCode code mPw + DeleteCode k -> do + deleteCode k + MakeKey cid -> do + Code.mkKey cid + GenerateCode cid t -> do + Code.generate cid t + GetConversationCodeURI mbHost -> do + convCodeURI <- input + pure $ case convCodeURI of + Left uri -> Just uri + Right map' -> mbHost >>= flip Map.lookup map' + +insertCode :: (PGConstraints r) => Code -> Maybe Password -> Sem r () +insertCode c password = do + runStatement (codeKey c, codeConversation c, password, codeValue c, round (codeTTL c)) insert + where + insert :: Hasql.Statement (Key, ConvId, Maybe Password, Value, Int32) () + insert = + lmapPG + [resultlessStatement|INSERT INTO conversation_codes + (key, conversation, password, value, expires_at) + VALUES + ($1 :: text, $2 :: uuid, $3 :: bytea?, $4 :: text, now() + make_interval(secs => $5 :: int)) + ON CONFLICT (key) DO UPDATE + SET conversation = ($2 :: uuid), + password = ($3 :: bytea?), + value = ($4 :: text), + expires_at = now() + make_interval(secs => $5 :: int) + |] + +lookupCode :: (PGConstraints r) => Key -> Sem r (Maybe (Code, Maybe Password)) +lookupCode k = do + mRow <- runStatement k selectCode + pure $ fmap (toCode k) mRow + where + selectCode :: Hasql.Statement Key (Maybe (Value, Int32, ConvId, Maybe Password)) + selectCode = + dimapPG + -- on the extraction of the remaining seconds of the TTL + -- `expires_at - now()` produces an interval representing how much time is left + -- `EXTRACT(EPOCH FROM interval)` converts that interval to seconds (a double precision) + -- `FLOOR(...)` truncates fractional seconds + -- `GREATEST(0, ...)` clamps negatives to 0 (expired rows) + -- `::int4` casts to 32‑bit integer. + [maybeStatement|SELECT + value :: text, + GREATEST(0, FLOOR(EXTRACT(EPOCH FROM (expires_at - now()))))::int4 AS ttl_secs, + conversation :: uuid, + password :: bytea? + FROM conversation_codes + WHERE key = ($1 :: text) AND expires_at > now () + |] + +deleteCode :: (PGConstraints r) => Key -> Sem r () +deleteCode k = + runStatement k delete + where + delete :: Hasql.Statement Key () + delete = + lmapPG + [resultlessStatement|DELETE FROM conversation_codes + WHERE key = ($1 :: text) + |] diff --git a/libs/wire-subsystems/src/Wire/CodeStore/Scope.hs b/libs/wire-subsystems/src/Wire/CodeStore/Scope.hs deleted file mode 100644 index b48dc6e042..0000000000 --- a/libs/wire-subsystems/src/Wire/CodeStore/Scope.hs +++ /dev/null @@ -1,32 +0,0 @@ --- This file is part of the Wire Server implementation. --- --- Copyright (C) 2026 Wire Swiss GmbH --- --- This program is free software: you can redistribute it and/or modify it under --- the terms of the GNU Affero General Public License as published by the Free --- Software Foundation, either version 3 of the License, or (at your option) any --- later version. --- --- This program is distributed in the hope that it will be useful, but WITHOUT --- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS --- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more --- details. --- --- You should have received a copy of the GNU Affero General Public License along --- with this program. If not, see . - -module Wire.CodeStore.Scope where - -import Cassandra hiding (Value) -import Imports - -data Scope = ReusableCode - deriving (Eq, Show, Generic) - -instance Cql Scope where - ctype = Tagged IntColumn - - toCql ReusableCode = CqlInt 1 - - fromCql (CqlInt 1) = pure ReusableCode - fromCql _ = Left "unknown Scope" diff --git a/libs/wire-subsystems/src/Wire/ConversationStore.hs b/libs/wire-subsystems/src/Wire/ConversationStore.hs index 2a529d40cd..9b7b4b97e2 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore.hs @@ -224,13 +224,16 @@ instance FromJSON StorageLocation where x -> fail $ "Invalid storage location: " <> Text.unpack x <> ". Valid options: cassandra, postgresql, migration-to-postgresql" data PostgresMigrationOpts = PostgresMigrationOpts - { conversation :: StorageLocation + { conversation :: StorageLocation, + conversationCodes :: StorageLocation } deriving (Show) instance FromJSON PostgresMigrationOpts where parseJSON = withObject "PostgresMigrationOpts" $ \o -> - PostgresMigrationOpts <$> o .: "conversation" + PostgresMigrationOpts + <$> o .: "conversation" + <*> o .: "conversationCodes" getConvOrSubGroupInfo :: (Member ConversationStore r) => diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs index cfc60b0331..a3bf288c23 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs @@ -22,9 +22,7 @@ module Wire.ConversationStore.Migration where import Cassandra import Cassandra.Settings hiding (pageSize) import Control.Error (lastMay) -import Data.Aeson (FromJSON) import Data.Conduit -import Data.Conduit.Internal (zipSources) import Data.Conduit.List qualified as C import Data.Domain import Data.Id @@ -37,7 +35,6 @@ import Data.Time.Calendar.OrdinalDate (fromOrdinalDate) import Data.Tuple.Extra import Data.Vector (Vector) import Data.Vector qualified as Vector -import GHC.Generics (Generically (..)) import Hasql.Pool qualified as Hasql import Hasql.Statement qualified as Hasql import Hasql.TH @@ -54,7 +51,6 @@ import Polysemy.Time import Polysemy.TinyLog import Prometheus qualified import System.Logger qualified as Log -import UnliftIO.Exception qualified as UnliftIO import Wire.API.Conversation hiding (Member) import Wire.API.Conversation.CellsState import Wire.API.Conversation.Protocol @@ -72,6 +68,7 @@ import Wire.ConversationStore.MLS.Types import Wire.ConversationStore.Migration.Cleanup import Wire.ConversationStore.Migration.Types import Wire.ConversationStore.MigrationLock +import Wire.Migration import Wire.Postgres import Wire.Sem.Concurrency (Concurrency, ConcurrencySafety (..), unsafePooledMapConcurrentlyN_) import Wire.Sem.Concurrency.IO (unsafelyPerformConcurrency) @@ -79,62 +76,56 @@ import Wire.Sem.Logger (mapLogger) import Wire.Sem.Logger.TinyLog (loggerToTinyLog) import Wire.Sem.Paging.Cassandra import Wire.StoredConversation -import Wire.Util -- * Top level logic -type EffectStack = [State Int, Input ClientState, Input Hasql.Pool, Async, Race, TinyLog, Embed IO, Concurrency 'Unsafe, Final IO] - -data MigrationOptions = MigrationOptions - { pageSize :: Int32, - parallelism :: Int - } - deriving (Show, Eq, Generic) - deriving (FromJSON) via Generically MigrationOptions - -migrateConvsLoop :: MigrationOptions -> ClientState -> Hasql.Pool -> Log.Logger -> Prometheus.Counter -> Prometheus.Counter -> Prometheus.Counter -> IO () +type EffectStack = + [ State Int, + Input ClientState, + Input Hasql.Pool, + Async, + Race, + TinyLog, + Embed IO, + Concurrency 'Unsafe, + Final IO + ] + +migrateConvsLoop :: + MigrationOptions -> + ClientState -> + Hasql.Pool -> + Log.Logger -> + Prometheus.Counter -> + Prometheus.Counter -> + Prometheus.Counter -> + IO () migrateConvsLoop migOpts cassClient pgPool logger migCounter migFinished migFailed = - migrationLoop cassClient pgPool logger "conversations" migFinished migFailed $ migrateAllConversations migOpts migCounter - -migrateUsersLoop :: MigrationOptions -> ClientState -> Hasql.Pool -> Log.Logger -> Prometheus.Counter -> Prometheus.Counter -> Prometheus.Counter -> IO () + migrationLoop + logger + "conversations" + migFinished + migFailed + (interpreter cassClient pgPool logger "conversations") + (migrateAllConversations migOpts migCounter) + +migrateUsersLoop :: + MigrationOptions -> + ClientState -> + Hasql.Pool -> + Log.Logger -> + Prometheus.Counter -> + Prometheus.Counter -> + Prometheus.Counter -> + IO () migrateUsersLoop migOpts cassClient pgPool logger migCounter migFinished migFailed = - migrationLoop cassClient pgPool logger "users" migFinished migFailed $ migrateAllUsers migOpts migCounter - -migrationLoop :: ClientState -> Hasql.Pool -> Log.Logger -> ByteString -> Prometheus.Counter -> Prometheus.Counter -> ConduitT () Void (Sem EffectStack) () -> IO () -migrationLoop cassClient pgPool logger name migFinished migFailed migration = do - go 0 `UnliftIO.catch` handleIOError - where - handleIOError :: SomeException -> IO () - handleIOError exc = do - Prometheus.incCounter migFailed - Log.err logger $ - Log.msg (Log.val "migration failed, it won't restart unless the background-worker is restarted.") - . Log.field "migration" name - . Log.field "error" (displayException exc) - UnliftIO.throwIO exc - - go :: Int -> IO () - go nIter = do - runMigration >>= \case - 0 -> do - Log.info logger $ - Log.msg (Log.val "finished migration") - . Log.field "attempt" nIter - . Log.field "migration" name - Prometheus.incCounter migFinished - n -> do - Log.info logger $ - Log.msg (Log.val "finished migration with errors") - . Log.field "migration" name - . Log.field "errors" n - . Log.field "attempt" nIter - go (nIter + 1) - - runMigration :: IO Int - runMigration = - fmap fst - . interpreter cassClient pgPool logger name - $ runConduit migration + migrationLoop + logger + "users" + migFinished + migFailed + (interpreter cassClient pgPool logger "users") + (migrateAllUsers migOpts migCounter) interpreter :: ClientState -> Hasql.Pool -> Log.Logger -> ByteString -> Sem EffectStack a -> IO (Int, a) interpreter cassClient pgPool logger name = @@ -166,7 +157,7 @@ migrateAllConversations :: migrateAllConversations migOpts migCounter = do lift $ info $ Log.msg (Log.val "migrateAllConversations") withCount (paginateSem select (paramsP LocalQuorum () migOpts.pageSize) x5) - .| logRetrievedPage migOpts.pageSize + .| logRetrievedPage migOpts.pageSize runIdentity .| C.mapM_ (unsafePooledMapConcurrentlyN_ migOpts.parallelism (handleErrors (migrateConversation migCounter) "conv")) where select :: PrepQuery R () (Identity ConvId) @@ -188,24 +179,12 @@ migrateAllUsers :: migrateAllUsers migOpts migCounter = do lift $ info $ Log.msg (Log.val "migrateAllUsers") withCount (paginateSem select (paramsP LocalQuorum () migOpts.pageSize) x5) - .| logRetrievedPage migOpts.pageSize + .| logRetrievedPage migOpts.pageSize runIdentity .| C.mapM_ (unsafePooledMapConcurrentlyN_ migOpts.parallelism (handleErrors (migrateUser migCounter) "user")) where select :: PrepQuery R () (Identity UserId) select = "select distinct user from user_remote_conv" -logRetrievedPage :: (Member TinyLog r) => Int32 -> ConduitM (Int32, [Identity (Id a)]) [Id a] (Sem r) () -logRetrievedPage pageSize = - C.mapM - ( \(i, rows) -> do - let estimatedRowsSoFar = (i - 1) * pageSize + fromIntegral (length rows) - info $ Log.msg (Log.val "retrieved page") . Log.field "estimatedRowsSoFar" estimatedRowsSoFar - pure $ map runIdentity rows - ) - -withCount :: (Monad m) => ConduitM () [a] m () -> ConduitM () (Int32, [a]) m () -withCount = zipSources (C.sourceList [1 ..]) - handleErrors :: (Member (State Int) r, Member TinyLog r) => (Id a -> Sem (Error MigrationLockError : Error Hasql.UsageError : r) b) -> ByteString -> Id a -> Sem r (Maybe b) handleErrors action lockType id_ = join <$> handleError (handleError action lockType) lockType id_ @@ -528,24 +507,3 @@ unzip9 [] = ([], [], [], [], [], [], [], [], []) unzip9 ((y1, y2, y3, y4, y5, y6, y7, y8, y9) : ys) = let (l1, l2, l3, l4, l5, l6, l7, l8, l9) = unzip9 ys in (y1 : l1, y2 : l2, y3 : l3, y4 : l4, y5 : l5, y6 : l6, y7 : l7, y8 : l8, y9 : l9) - -paginateSem :: forall a b q r. (Tuple a, Tuple b, RunQ q, Member (Input ClientState) r, Member TinyLog r, Member (Embed IO) r) => q R a b -> QueryParams a -> RetrySettings -> ConduitT () [b] (Sem r) () -paginateSem q p r = do - go =<< lift getFirstPage - where - go page = do - lift $ info $ Log.msg (Log.val "Got a page") - unless (null (result page)) $ - yield (result page) - when (hasMore page) $ - go =<< lift (getNextPage page) - - getFirstPage :: Sem r (Page b) - getFirstPage = do - client <- input - embedClient client $ retry r (paginate q p) - - getNextPage :: Page b -> Sem r (Page b) - getNextPage page = do - client <- input - embedClient client $ retry r (nextPage page) diff --git a/libs/wire-subsystems/src/Wire/Migration.hs b/libs/wire-subsystems/src/Wire/Migration.hs new file mode 100644 index 0000000000..3a1d6503d3 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/Migration.hs @@ -0,0 +1,129 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.Migration where + +import Cassandra +import Cassandra.Settings +import Data.Aeson +import Data.Conduit +import Data.Conduit.Internal (zipSources) +import Data.Conduit.List qualified as C +import GHC.Generics (Generically (..)) +import Imports +import Polysemy +import Polysemy.Input +import Polysemy.TinyLog +import Prometheus qualified +import System.Logger qualified as Log +import UnliftIO qualified +import Wire.Util (embedClient) + +data MigrationOptions = MigrationOptions + { pageSize :: Int32, + parallelism :: Int + } + deriving (Show, Eq, Generic) + deriving (FromJSON) via Generically MigrationOptions + +migrationLoop :: + Log.Logger -> + ByteString -> + Prometheus.Counter -> + Prometheus.Counter -> + (Sem r () -> IO (Int, a)) -> + ConduitT () Void (Sem r) () -> + IO () +migrationLoop logger name migFinished migFailed interpreter migration = do + go 0 `UnliftIO.catch` handleIOError + where + handleIOError :: SomeException -> IO () + handleIOError exc = do + Prometheus.incCounter migFailed + Log.err logger $ + Log.msg (Log.val "migration failed, it won't restart unless the background-worker is restarted.") + . Log.field "migration" name + . Log.field "error" (displayException exc) + UnliftIO.throwIO exc + + go :: Int -> IO () + go nIter = do + runMigration >>= \case + 0 -> do + Log.info logger $ + Log.msg (Log.val "finished migration") + . Log.field "attempt" nIter + . Log.field "migration" name + Prometheus.incCounter migFinished + n -> do + Log.info logger $ + Log.msg (Log.val "finished migration with errors") + . Log.field "migration" name + . Log.field "errors" n + . Log.field "attempt" nIter + go (nIter + 1) + + runMigration :: IO Int + runMigration = + fmap fst + . interpreter + $ runConduit migration + +logRetrievedPage :: (Member TinyLog r) => Int32 -> (a -> b) -> ConduitM (Int32, [a]) [b] (Sem r) () +logRetrievedPage pageSize toRow = + C.mapM + ( \(i, rows) -> do + let estimatedRowsSoFar = (i - 1) * pageSize + fromIntegral (length rows) + info $ Log.msg (Log.val "retrieved page") . Log.field "estimatedRowsSoFar" estimatedRowsSoFar + pure $ map toRow rows + ) + +withCount :: (Monad m) => ConduitM () [a] m () -> ConduitM () (Int32, [a]) m () +withCount = zipSources (C.sourceList [1 ..]) + +paginateSem :: + forall a b q r. + ( Tuple a, + Tuple b, + RunQ q, + Member (Input ClientState) r, + Member TinyLog r, + Member (Embed IO) r + ) => + q R a b -> + QueryParams a -> + RetrySettings -> + ConduitT () [b] (Sem r) () +paginateSem q p r = do + go =<< lift getFirstPage + where + go page = do + lift $ info $ Log.msg (Log.val "got a page") + unless (null (result page)) $ + yield (result page) + when (hasMore page) $ + go =<< lift (getNextPage page) + + getFirstPage :: Sem r (Page b) + getFirstPage = do + client <- input + embedClient client $ retry r (paginate q p) + + getNextPage :: Page b -> Sem r (Page b) + getNextPage page = do + client <- input + embedClient client $ retry r (nextPage page) diff --git a/libs/wire-subsystems/wire-subsystems.cabal b/libs/wire-subsystems/wire-subsystems.cabal index 5b29d71a86..090fe8d5bf 100644 --- a/libs/wire-subsystems/wire-subsystems.cabal +++ b/libs/wire-subsystems/wire-subsystems.cabal @@ -228,7 +228,9 @@ library Wire.CodeStore.Cassandra Wire.CodeStore.Cassandra.Queries Wire.CodeStore.Code - Wire.CodeStore.Scope + Wire.CodeStore.DualWrite + Wire.CodeStore.Migration + Wire.CodeStore.Postgres Wire.ConversationStore Wire.ConversationStore.Cassandra Wire.ConversationStore.Cassandra.Instances @@ -294,6 +296,7 @@ library Wire.LegalHoldStore.Cassandra.Queries Wire.LegalHoldStore.Env Wire.ListItems + Wire.Migration Wire.NotificationSubsystem Wire.NotificationSubsystem.Interpreter Wire.PaginationState diff --git a/postgres-schema.sql b/postgres-schema.sql index 8cfc62bd5d..a0f4b619b3 100644 --- a/postgres-schema.sql +++ b/postgres-schema.sql @@ -9,8 +9,8 @@ \restrict 79bbfb4630959c48307653a5cd3d83f2582b3c2210f75f10d79e3ebf0015620 --- Dumped from database version 17.7 --- Dumped by pg_dump version 17.7 +-- Dumped from database version 17.6 +-- Dumped by pg_dump version 17.6 SET statement_timeout = 0; SET lock_timeout = 0; @@ -102,6 +102,21 @@ CREATE TABLE public.conversation ( ALTER TABLE public.conversation OWNER TO "wire-server"; +-- +-- Name: conversation_codes; Type: TABLE; Schema: public; Owner: wire-server +-- + +CREATE TABLE public.conversation_codes ( + key text NOT NULL, + conversation uuid NOT NULL, + password bytea, + value text NOT NULL, + expires_at timestamp with time zone NOT NULL +); + + +ALTER TABLE public.conversation_codes OWNER TO "wire-server"; + -- -- Name: conversation_member; Type: TABLE; Schema: public; Owner: wire-server -- @@ -282,6 +297,14 @@ ALTER TABLE ONLY public.collaborators ADD CONSTRAINT collaborators_pkey PRIMARY KEY (user_id, team_id); +-- +-- Name: conversation_codes conversation_codes_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server +-- + +ALTER TABLE ONLY public.conversation_codes + ADD CONSTRAINT conversation_codes_pkey PRIMARY KEY (key); + + -- -- Name: conversation_member conversation_member_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server -- @@ -392,6 +415,20 @@ CREATE INDEX collaborators_team_id_idx ON public.collaborators USING btree (team CREATE INDEX collaborators_user_id_idx ON public.collaborators USING btree (user_id); +-- +-- Name: conversation_codes_expires_at_idx; Type: INDEX; Schema: public; Owner: wire-server +-- + +CREATE INDEX conversation_codes_expires_at_idx ON public.conversation_codes USING btree (expires_at); + + +-- +-- Name: conversation_codes_key_expires_at_idx; Type: INDEX; Schema: public; Owner: wire-server +-- + +CREATE INDEX conversation_codes_key_expires_at_idx ON public.conversation_codes USING btree (key, expires_at); + + -- -- Name: conversation_member_user_idx; Type: INDEX; Schema: public; Owner: wire-server -- diff --git a/services/background-worker/background-worker.cabal b/services/background-worker/background-worker.cabal index 891e864d63..595e3d01ea 100644 --- a/services/background-worker/background-worker.cabal +++ b/services/background-worker/background-worker.cabal @@ -21,7 +21,7 @@ library Wire.BackgroundWorker.Options Wire.BackgroundWorker.Util Wire.DeadUserNotificationWatcher - Wire.MigrateConversations + Wire.PostgresMigrations hs-source-dirs: src default-language: GHC2021 diff --git a/services/background-worker/background-worker.integration.yaml b/services/background-worker/background-worker.integration.yaml index 4ee7abbe10..db49fb502d 100644 --- a/services/background-worker/background-worker.integration.yaml +++ b/services/background-worker/background-worker.integration.yaml @@ -68,6 +68,7 @@ migrateConversations: false migrateConversationsOptions: pageSize: 10000 parallelism: 2 +migrateConversationCodes: false # Background jobs consumer configuration for integration backgroundJobs: @@ -77,3 +78,4 @@ backgroundJobs: postgresMigration: conversation: postgresql + conversationCodes: postgresql diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs index c30c1d809a..ab20dd7c8b 100644 --- a/services/background-worker/src/Wire/BackgroundWorker.hs +++ b/services/background-worker/src/Wire/BackgroundWorker.hs @@ -34,7 +34,8 @@ import Wire.BackgroundWorker.Health qualified as Health import Wire.BackgroundWorker.Jobs.Consumer qualified as Jobs import Wire.BackgroundWorker.Options import Wire.DeadUserNotificationWatcher qualified as DeadUserNotificationWatcher -import Wire.MigrateConversations qualified as MigrateConversations +import Wire.Migration +import Wire.PostgresMigrations qualified as Migrations run :: Opts -> IO () run opts = do @@ -53,7 +54,14 @@ run opts = do then runAppT env $ withNamedLogger "migrate-conversations" $ - MigrateConversations.startWorker opts.migrateConversationsOptions + Migrations.conversations opts.migrateConversationsOptions + else pure $ pure () + cleanUpConvCodesMigration <- + if opts.migrateConversationCodes + then + runAppT env $ + withNamedLogger "migrate-conversation-codes" $ + Migrations.conversationCodes (MigrationOptions 1000 1) else pure $ pure () cleanupJobs <- runAppT env $ @@ -61,10 +69,11 @@ run opts = do Jobs.startWorker amqpEP let cleanup = void . runConcurrently $ - (,,,) + (,,,,) <$> Concurrently cleanupDeadUserNotifWatcher <*> Concurrently cleanupBackendNotifPusher <*> Concurrently cleanupConvMigration + <*> Concurrently cleanUpConvCodesMigration <*> Concurrently cleanupJobs let server = defaultServer (T.unpack opts.backgroundWorker.host) opts.backgroundWorker.port env.logger diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index 48cc531b58..6dc18f03a2 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -28,7 +28,7 @@ import Network.AMQP.Extended import System.Logger.Extended import Util.Options import Wire.ConversationStore (PostgresMigrationOpts) -import Wire.ConversationStore.Migration (MigrationOptions) +import Wire.Migration (MigrationOptions) data Opts = Opts { logLevel :: !Level, @@ -52,6 +52,7 @@ data Opts = Opts postgresMigration :: !PostgresMigrationOpts, migrateConversations :: !Bool, migrateConversationsOptions :: !MigrationOptions, + migrateConversationCodes :: !Bool, backgroundJobs :: BackgroundJobsConfig, federationDomain :: Domain } diff --git a/services/background-worker/src/Wire/MigrateConversations.hs b/services/background-worker/src/Wire/PostgresMigrations.hs similarity index 69% rename from services/background-worker/src/Wire/MigrateConversations.hs rename to services/background-worker/src/Wire/PostgresMigrations.hs index 75587e1ae5..a4c1cbff2d 100644 --- a/services/background-worker/src/Wire/MigrateConversations.hs +++ b/services/background-worker/src/Wire/PostgresMigrations.hs @@ -15,7 +15,7 @@ -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . -module Wire.MigrateConversations where +module Wire.PostgresMigrations where import Imports import Prometheus @@ -23,10 +23,11 @@ import System.Logger qualified as Log import UnliftIO import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Util +import Wire.CodeStore.Migration import Wire.ConversationStore.Migration -startWorker :: MigrationOptions -> AppT IO CleanupAction -startWorker migOpts = do +conversations :: MigrationOptions -> AppT IO CleanupAction +conversations migOpts = do cassClient <- asks (.cassandraGalley) pgPool <- asks (.hasqlPool) logger <- asks (.logger) @@ -47,3 +48,20 @@ startWorker migOpts = do Log.info logger $ Log.msg (Log.val "cancelling conversation migration") cancel convLoop cancel userLoop + +conversationCodes :: MigrationOptions -> AppT IO CleanupAction +conversationCodes migOpts = do + cassClient <- asks (.cassandraGalley) + pgPool <- asks (.hasqlPool) + logger <- asks (.logger) + Log.info logger $ Log.msg (Log.val "starting conversation codes migration") + count <- register $ counter $ Prometheus.Info "wire_conv_codes_migrated_to_pg" "Number of conversation codes migrated to Postgresql" + finished <- register $ counter $ Prometheus.Info "wire_conv_codes_migration_finished" "Whether the conversation codes migration to Postgresql is finished successfully" + failed <- register $ counter $ Prometheus.Info "wire_conv_codes_migration_failed" "Whether the conversation codes migration to Postgresql has failed" + + migrationLoop <- async . lift $ migrateCodesLoop migOpts cassClient pgPool logger count finished failed + + Log.info logger $ Log.msg (Log.val "started conversation codes migration") + pure $ do + Log.info logger $ Log.msg (Log.val "cancelling conversation codes migration") + cancel migrationLoop diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index 04307949b2..e10ab43123 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -361,7 +361,11 @@ spec = do amqpJobsPublisherChannel = undefined amqpBackendNotificationsChannel = undefined federationDomain = Domain "local" - postgresMigration = PostgresMigrationOpts CassandraStorage + postgresMigration = + PostgresMigrationOpts + { conversation = CassandraStorage, + conversationCodes = CassandraStorage + } gundeckEndpoint = undefined brigEndpoint = undefined @@ -395,7 +399,11 @@ spec = do amqpJobsPublisherChannel = undefined amqpBackendNotificationsChannel = undefined federationDomain = Domain "local" - postgresMigration = PostgresMigrationOpts CassandraStorage + postgresMigration = + PostgresMigrationOpts + { conversation = CassandraStorage, + conversationCodes = CassandraStorage + } gundeckEndpoint = undefined brigEndpoint = undefined backendNotificationMetrics <- mkBackendNotificationMetrics diff --git a/services/background-worker/test/Test/Wire/Util.hs b/services/background-worker/test/Test/Wire/Util.hs index f7f24a54f0..cdb020a222 100644 --- a/services/background-worker/test/Test/Wire/Util.hs +++ b/services/background-worker/test/Test/Wire/Util.hs @@ -39,7 +39,11 @@ testEnv = do let cassandra = undefined cassandraGalley = undefined cassandraBrig = undefined - postgresMigration = PostgresMigrationOpts CassandraStorage + postgresMigration = + PostgresMigrationOpts + { conversation = CassandraStorage, + conversationCodes = CassandraStorage + } statuses <- newIORef mempty backendNotificationMetrics <- mkBackendNotificationMetrics workerRunningGauge <- mkWorkerRunningGauge diff --git a/services/galley/galley.integration.yaml b/services/galley/galley.integration.yaml index f4dce07f1b..e2106c63e6 100644 --- a/services/galley/galley.integration.yaml +++ b/services/galley/galley.integration.yaml @@ -247,4 +247,4 @@ journal: # if set, journals; if not set, disables journaling postgresMigration: conversation: postgresql - # conversation: cassandra + conversationCodes: postgresql diff --git a/services/galley/src/Galley/API/Action.hs b/services/galley/src/Galley/API/Action.hs index 7f40e70076..10474662d9 100644 --- a/services/galley/src/Galley/API/Action.hs +++ b/services/galley/src/Galley/API/Action.hs @@ -115,7 +115,6 @@ import Wire.API.User as User import Wire.BrigAPIAccess qualified as E import Wire.CodeStore import Wire.CodeStore qualified as E -import Wire.CodeStore.Scope (Scope (ReusableCode)) import Wire.ConversationStore qualified as E import Wire.ConversationSubsystem import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemConfig (..)) @@ -560,7 +559,7 @@ performAction tag origUser lconv action = do deleteGroup gidParent key <- E.makeKey (tUnqualified lcnv) - E.deleteCode key ReusableCode + E.deleteCode key case convTeam storedConv of Nothing -> E.deleteConversation (tUnqualified lcnv) Just tid -> E.deleteTeamConversation tid (tUnqualified lcnv) @@ -800,7 +799,7 @@ performConversationAccessData qusr lconv action = do ) $ do key <- E.makeKey (tUnqualified lcnv) - E.deleteCode key ReusableCode + E.deleteCode key -- Determine bots and members to be removed let filterBotsAndMembers = diff --git a/services/galley/src/Galley/API/Update.hs b/services/galley/src/Galley/API/Update.hs index 8596392cef..fd4e4606b8 100644 --- a/services/galley/src/Galley/API/Update.hs +++ b/services/galley/src/Galley/API/Update.hs @@ -605,10 +605,10 @@ addCode lusr mbZHost mZcon lcnv mReq = do ensureGuestsOrNonTeamMembersAllowed conv convUri <- getConversationCodeURI mbZHost key <- E.makeKey (tUnqualified lcnv) - E.getCode key ReusableCode >>= \case + E.getCode key >>= \case Nothing -> do ttl <- realToFrac . unGuestLinkTTLSeconds . fromMaybe defGuestLinkTTLSeconds . view (settings . guestLinkTTLSeconds) <$> input - code <- E.generateCode (tUnqualified lcnv) ReusableCode (Timeout ttl) + code <- E.generateCode (tUnqualified lcnv) (Timeout ttl) mPw <- for (mReq >>= (.password)) $ HashPassword.hashPassword8 (RateLimitUser (tUnqualified lusr)) E.createCode code mPw now <- Now.get @@ -670,7 +670,7 @@ rmCode lusr zcon lcnv = do ensureAccess conv CodeAccess let (bots, users) = localBotsAndUsers $ conv.localMembers key <- E.makeKey (tUnqualified lcnv) - E.deleteCode key ReusableCode + E.deleteCode key now <- Now.get let event = Event (tUntagged lcnv) Nothing (EventFromUser (tUntagged lusr)) now Nothing EdConvCodeDelete pushConversationEvent (Just zcon) conv event (qualifyAs lusr (map (.id_) users)) bots @@ -697,7 +697,7 @@ getCode mbZHost lusr cnv = do ensureAccess conv CodeAccess ensureConvMember (conv.localMembers) (tUnqualified lusr) key <- E.makeKey cnv - (c, mPw) <- E.getCode key ReusableCode >>= noteS @'CodeNotFound + (c, mPw) <- E.getCode key >>= noteS @'CodeNotFound convUri <- getConversationCodeURI mbZHost pure $ mkConversationCodeInfo (isJust mPw) (codeKey c) (codeValue c) convUri diff --git a/services/galley/src/Galley/API/Util.hs b/services/galley/src/Galley/API/Util.hs index a13c62c15d..d8a1143b9e 100644 --- a/services/galley/src/Galley/API/Util.hs +++ b/services/galley/src/Galley/API/Util.hs @@ -750,7 +750,7 @@ verifyReusableCode :: Sem r DataTypes.Code verifyReusableCode rateLimitKey checkPw mPtpw convCode = do (c, mPw) <- - getCode (conversationKey convCode) DataTypes.ReusableCode + getCode (conversationKey convCode) >>= noteS @'CodeNotFound unless (DataTypes.codeValue c == conversationCode convCode) $ throwS @'CodeNotFound diff --git a/services/galley/src/Galley/App.hs b/services/galley/src/Galley/App.hs index 1c790dd3af..6d1a34867a 100644 --- a/services/galley/src/Galley/App.hs +++ b/services/galley/src/Galley/App.hs @@ -108,6 +108,8 @@ import Wire.AWS qualified as Aws import Wire.BackendNotificationQueueAccess.RabbitMq qualified as BackendNotificationQueueAccess import Wire.BrigAPIAccess.Rpc import Wire.CodeStore.Cassandra +import Wire.CodeStore.DualWrite +import Wire.CodeStore.Postgres import Wire.ConversationStore.Cassandra import Wire.ConversationStore.Postgres import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemConfig (..), interpretConversationSubsystem) @@ -289,6 +291,11 @@ evalGalley e = CassandraStorage -> interpretConversationStoreToCassandra (e ^. cstate) MigrationToPostgresql -> interpretConversationStoreToCassandraAndPostgres (e ^. cstate) PostgresqlStorage -> interpretConversationStoreToPostgres + convCodesStoreInterpreter = + case (e ^. options . postgresMigration).conversationCodes of + CassandraStorage -> interpretCodeStoreToCassandra + MigrationToPostgresql -> interpretCodeStoreToCassandraAndPostgres + PostgresqlStorage -> interpretCodeStoreToPostgres localUnit = toLocalUnsafe (e ^. options . settings . federationDomain) () teamSubsystemConfig = TeamSubsystemConfig @@ -375,7 +382,7 @@ evalGalley e = . runHashPassword e._options._settings._passwordHashingOptions . interpretRateLimit e._passwordHashingRateLimitEnv . interpretProposalStoreToCassandra - . interpretCodeStoreToCassandra + . convCodesStoreInterpreter . interpretClientStoreToCassandra . interpretTeamCollaboratorsStoreToPostgres . interpretFireAndForget