diff --git a/changelog.d/5-internal/WPB-22959 b/changelog.d/5-internal/WPB-22959 new file mode 100644 index 0000000000..c073df6a25 --- /dev/null +++ b/changelog.d/5-internal/WPB-22959 @@ -0,0 +1,3 @@ +- Generalized the migration lock for better reuse +- Move logic from `TeamFeatureStore` interpreter to `FeatureConfigSubsystem` +(#4982, #4983) diff --git a/charts/background-worker/values.yaml b/charts/background-worker/values.yaml index 2896d749e8..0c84575d6f 100644 --- a/charts/background-worker/values.yaml +++ b/charts/background-worker/values.yaml @@ -92,6 +92,7 @@ config: postgresMigration: conversation: cassandra conversationCodes: cassandra + teamFeatures: cassandra secrets: {} diff --git a/charts/galley/values.yaml b/charts/galley/values.yaml index f4ac3331c5..6d718b75e8 100644 --- a/charts/galley/values.yaml +++ b/charts/galley/values.yaml @@ -72,6 +72,7 @@ config: postgresMigration: conversation: cassandra conversationCodes: cassandra + teamFeatures: cassandra settings: httpPoolSize: 128 maxTeamSize: 10000 diff --git a/docs/src/developer/reference/config-options.md b/docs/src/developer/reference/config-options.md index 92f52cbf46..5de1510167 100644 --- a/docs/src/developer/reference/config-options.md +++ b/docs/src/developer/reference/config-options.md @@ -1812,11 +1812,13 @@ galley: postgresMigration: conversation: postgresql conversationCodes: postgresql + teamFeatures: postgresql background-worker: config: postgresMigration: conversation: postgresql conversationCodes: postgresql + teamFeatures: postgresql migrateConversations: false ``` @@ -1847,11 +1849,13 @@ pattern below applies per store. Use it for `conversation` and postgresMigration: conversation: migration-to-postgresql conversationCodes: migration-to-postgresql + teamFeatures: migration-to-postgresql background-worker: config: postgresMigration: conversation: migration-to-postgresql conversationCodes: migration-to-postgresql + teamFeatures: migration-to-postgresql migrateConversations: false migrateConversationCodes: false ``` @@ -1882,11 +1886,13 @@ pattern below applies per store. Use it for `conversation` and postgresMigration: conversation: postgresql conversationCodes: postgresql + teamFeatures: postgresql background-worker: config: postgresMigration: conversation: postgresql conversationCodes: postgresql + teamFeatures: postgresql migrateConversations: false migrateConversationCodes: false ``` @@ -1956,6 +1962,8 @@ postgresqlPool: postgresMigration: # Valid: cassandra | migration-to-postgresql | postgresql conversation: postgresql + conversationCodes: postgresql + teamFeatures: postgresql # Start the migration worker when true migrateConversations: false @@ -1978,7 +1986,7 @@ Notes - `postgresql` values follow libpq keywords; password is sourced via `secrets.pgPassword`. - RabbitMQ admin fields (`adminHost`, `adminPort`) are templated only when `config.enableFederation` is true. -- `postgresMigration.conversation` must match `galley.config.postgresMigration.conversation` during migration phases. -- `migrateConversations: true` triggers the migration job; leave it `false` for new installs and after migration. +- `postgresMigration.` must match between `galley` and `background-worker` during migration phases. +- `migrateConversations: true` triggers the conversation migration job; leave it `false` for new installs and after migration. - `concurrency`, `jobTimeout`, and `maxAttempts` control parallelism and retry behavior of the consumer. - `brig` and `gundeck` endpoints default to in-cluster services; override via `background-worker.config.brig` and `.gundeck` if your service DNS/ports differ. diff --git a/hack/helm_vars/common.yaml.gotmpl b/hack/helm_vars/common.yaml.gotmpl index e1974ced47..7cd9bb5fac 100644 --- a/hack/helm_vars/common.yaml.gotmpl +++ b/hack/helm_vars/common.yaml.gotmpl @@ -16,6 +16,7 @@ dynBackendDomain3: dynamic-backend-3.{{ requiredEnv "NAMESPACE_1" }}.svc.cluster {{- $preferredStore := default "cassandra" (env "PREFERRED_STORE") }} conversationStore: {{ $preferredStore }} conversationCodesStore: {{ $preferredStore }} +teamFeaturesStore: {{ $preferredStore }} {{- if (eq (env "UPLOAD_XML_S3_BASE_URL") "") }} uploadXml: {} diff --git a/hack/helm_vars/wire-server/values.yaml.gotmpl b/hack/helm_vars/wire-server/values.yaml.gotmpl index 5efdf49dbd..3745be436a 100644 --- a/hack/helm_vars/wire-server/values.yaml.gotmpl +++ b/hack/helm_vars/wire-server/values.yaml.gotmpl @@ -307,6 +307,7 @@ galley: postgresMigration: conversation: {{ .Values.conversationStore }} conversationCodes: {{ .Values.conversationCodesStore }} + teamFeatures: {{ .Values.teamFeaturesStore }} settings: maxConvAndTeamSize: 16 maxTeamSize: 32 @@ -675,6 +676,7 @@ background-worker: postgresMigration: conversation: {{ .Values.conversationStore }} conversationCodes: {{ .Values.conversationCodesStore }} + teamFeatures: {{ .Values.teamFeaturesStore }} rabbitmq: port: 5671 adminPort: 15671 diff --git a/libs/wire-api/src/Wire/API/Team/Feature.hs b/libs/wire-api/src/Wire/API/Team/Feature.hs index 37d226b9b1..d36a0a585a 100644 --- a/libs/wire-api/src/Wire/API/Team/Feature.hs +++ b/libs/wire-api/src/Wire/API/Team/Feature.hs @@ -174,6 +174,7 @@ import Test.QuickCheck.Gen (suchThat) import URI.ByteString.QQ qualified as URI.QQ import Wire.API.Conversation.Protocol import Wire.API.MLS.CipherSuite +import Wire.API.PostgresMarshall import Wire.API.Routes.Named hiding (unnamed) import Wire.API.Routes.Version import Wire.API.Routes.Versioned @@ -325,8 +326,15 @@ resolveDbFeature defFeature dbFeature = LockStatusUnlocked -> feat newtype DbConfig = DbConfig {unDbConfig :: A.Value} + deriving newtype (Arbitrary) deriving (Eq, Show) +instance PostgresMarshall A.Value DbConfig where + postgresMarshall = unDbConfig + +instance PostgresUnmarshall A.Value DbConfig where + postgresUnmarshall = Right . DbConfig + instance Default DbConfig where def = DbConfig (A.object []) @@ -629,6 +637,15 @@ instance Cass.Cql LockStatus where toCql LockStatusLocked = Cass.CqlInt 0 toCql LockStatusUnlocked = Cass.CqlInt 1 +instance PostgresMarshall Int32 LockStatus where + postgresMarshall LockStatusLocked = 0 + postgresMarshall LockStatusUnlocked = 1 + +instance PostgresUnmarshall Int32 LockStatus where + postgresUnmarshall 0 = Right LockStatusLocked + postgresUnmarshall 1 = Right LockStatusUnlocked + postgresUnmarshall _ = Left "invalid lockStatus" + newtype LockStatusResponse = LockStatusResponse {_unlockStatus :: LockStatus} deriving stock (Eq, Show, Generic) deriving (Arbitrary) via (GenericUniform LockStatus) @@ -2172,6 +2189,15 @@ instance Cass.Cql FeatureStatus where toCql FeatureStatusDisabled = Cass.CqlInt 0 toCql FeatureStatusEnabled = Cass.CqlInt 1 +instance PostgresMarshall Int32 FeatureStatus where + postgresMarshall FeatureStatusEnabled = 1 + postgresMarshall FeatureStatusDisabled = 0 + +instance PostgresUnmarshall Int32 FeatureStatus where + postgresUnmarshall 1 = Right FeatureStatusEnabled + postgresUnmarshall 0 = Right FeatureStatusDisabled + postgresUnmarshall _ = Left "invalid feature status" + -- | list of available features config types type Features :: [Type] type Features = diff --git a/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/PostgresMarshall.hs b/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/PostgresMarshall.hs index 31ce898c43..191f55bffd 100644 --- a/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/PostgresMarshall.hs +++ b/libs/wire-api/test/unit/Test/Wire/API/Roundtrip/PostgresMarshall.hs @@ -20,6 +20,7 @@ module Test.Wire.API.Roundtrip.PostgresMarshall (tests) where import Crypto.Error (CryptoFailable (..)) import Crypto.KDF.Argon2 qualified as Argon2 +import Data.Aeson as A import Data.ByteString.Char8 qualified as BS8 import Data.Code qualified as Code import Data.Misc (PlainTextPassword8, fromPlainTextPassword) @@ -32,6 +33,7 @@ import Wire.API.Password as Password import Wire.API.Password.Argon2id (Argon2HashedPassword (..), encodeArgon2HashedPassword) import Wire.API.Password.Scrypt (encodeScryptPassword) import Wire.API.PostgresMarshall +import Wire.API.Team.Feature import Wire.Arbitrary qualified as Arbitrary () tests :: T.TestTree @@ -39,7 +41,10 @@ tests = T.localOption (T.Timeout (60 * 1000000) "60s") . T.testGroup "PostgresMarshall roundtrip tests" $ [ testRoundTrip @Text @Code.Key, testRoundTrip @Text @Code.Value, - testRoundTrip @ByteString @Password.Password + testRoundTrip @ByteString @Password.Password, + testRoundTrip @Int32 @FeatureStatus, + testRoundTrip @Int32 @LockStatus, + testRoundTrip @A.Value @DbConfig ] testRoundTrip :: diff --git a/libs/wire-subsystems/postgres-migrations/20260123124917-team-features.sql b/libs/wire-subsystems/postgres-migrations/20260123124917-team-features.sql new file mode 100644 index 0000000000..a95e85d90b --- /dev/null +++ b/libs/wire-subsystems/postgres-migrations/20260123124917-team-features.sql @@ -0,0 +1,8 @@ +CREATE TABLE team_features ( + team uuid NOT NULL, + feature text NOT NULL, + config jsonb, + lock_status int, + status int, + PRIMARY KEY (team, feature) +); diff --git a/libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs b/libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs index 112b61a91e..77a655a379 100644 --- a/libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs +++ b/libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs @@ -31,6 +31,7 @@ import Wire.CodeStore.Cassandra qualified as Cassandra import Wire.CodeStore.Postgres qualified as Postgres import Wire.Postgres (PGConstraints) +-- | Cassandra is the sourceof truth during migration; writes are mirrored to Postgres. interpretCodeStoreToCassandraAndPostgres :: ( Member (Input ClientState) r, Member (Input (Either HttpsUrl (Map Text HttpsUrl))) r, @@ -38,8 +39,6 @@ interpretCodeStoreToCassandraAndPostgres :: ) => Sem (CodeStore ': r) a -> Sem r a - --- | Cassandra is the source of truth during migration; writes are mirrored to Postgres. interpretCodeStoreToCassandraAndPostgres = interpret $ \case GetCode k -> do Cassandra.interpretCodeStoreToCassandra $ CodeStore.getCode k diff --git a/libs/wire-subsystems/src/Wire/ConversationStore.hs b/libs/wire-subsystems/src/Wire/ConversationStore.hs index 9b7b4b97e2..5992eaf23d 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore.hs @@ -20,14 +20,12 @@ module Wire.ConversationStore where import Control.Error (lastMay) -import Data.Aeson import Data.Aeson qualified as Aeson import Data.ByteString qualified as BS import Data.Id import Data.Misc import Data.Qualified import Data.Range -import Data.Text qualified as Text import Data.Time.Clock import Imports import Polysemy @@ -204,37 +202,6 @@ getConversationIds lusr maxIds pagingState = do } } -data StorageLocation - = -- | Use when solely using Cassandra - CassandraStorage - | -- | Use while migration to postgresql. Using this option does not trigger - -- the migration. Newly created conversations are stored in Postgresql. - -- Once this has been turned on, it MUST NOT be made CassandraStorage ever - -- again. - MigrationToPostgresql - | -- | Use after migrating to postgresql - PostgresqlStorage - deriving (Show) - -instance FromJSON StorageLocation where - parseJSON = withText "StorageLocation" $ \case - "cassandra" -> pure CassandraStorage - "migration-to-postgresql" -> pure MigrationToPostgresql - "postgresql" -> pure PostgresqlStorage - x -> fail $ "Invalid storage location: " <> Text.unpack x <> ". Valid options: cassandra, postgresql, migration-to-postgresql" - -data PostgresMigrationOpts = PostgresMigrationOpts - { conversation :: StorageLocation, - conversationCodes :: StorageLocation - } - deriving (Show) - -instance FromJSON PostgresMigrationOpts where - parseJSON = withObject "PostgresMigrationOpts" $ \o -> - PostgresMigrationOpts - <$> o .: "conversation" - <*> o .: "conversationCodes" - getConvOrSubGroupInfo :: (Member ConversationStore r) => ConvOrSubConvId -> diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs index 66f3943b90..77ea2d6f00 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs @@ -76,8 +76,8 @@ import Wire.ConversationStore.Cassandra.Queries qualified as Cql import Wire.ConversationStore.Cassandra.Queries qualified as Queries import Wire.ConversationStore.MLS.Types import Wire.ConversationStore.Migration.Cleanup -import Wire.ConversationStore.MigrationLock import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres) +import Wire.MigrationLock import Wire.Postgres import Wire.Sem.Paging.Cassandra import Wire.StoredConversation @@ -1094,7 +1094,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case True -> interpretConversationStoreToPostgres $ ConvStore.getConversationEpoch cid GetConversations cids -> do logEffect "ConversationStore.GetConversations" - withMigrationLocksAndCleanup client LockShared (Seconds 2) (Left <$> cids) $ do + withMigrationLocksAndConvCleanup client LockShared (Seconds 2) cids $ do let indexByConvId = foldr (\storedConv -> Map.insert storedConv.id_ storedConv) Map.empty cassConvs <- indexByConvId <$> localConversations client cids pgConvs <- indexByConvId <$> interpretConversationStoreToPostgres (ConvStore.getConversations cids) @@ -1163,7 +1163,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case True -> interpretConversationStoreToPostgres (ConvStore.isConversationAlive cid) SelectConversations uid cids -> do logEffect "ConversationStore.SelectConversations" - withMigrationLocksAndCleanup client LockShared (Seconds 2) (Left <$> cids) $ do + withMigrationLocksAndConvCleanup client LockShared (Seconds 2) cids $ do cassConvs <- embedClient client $ localConversationIdsOf uid cids pgConvs <- interpretConversationStoreToPostgres $ ConvStore.selectConversations uid cids pure $ List.nubOrd (pgConvs <> cassConvs) @@ -1287,7 +1287,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case logEffect "ConversationStore.CreateMembersInRemoteConversation" -- Save users joining their first remote conv in postgres - withMigrationLocksAndCleanup client LockShared (Seconds 2) (Right <$> uids) $ do + withMigrationLocksAndUserCleanup client LockShared (Seconds 2) uids $ do filterUsersInPostgres uids >>= \pgUids -> do let -- These are not in Postgres, but that doesn't mean they're in -- cassandra @@ -1334,7 +1334,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case True -> interpretConversationStoreToPostgres $ ConvStore.checkLocalMemberRemoteConv uid rcnv SelectRemoteMembers uids rcnv -> do logEffect "ConversationStore.SelectRemoteMembers" - withMigrationLocksAndCleanup client LockShared (Seconds 2) (Right <$> uids) $ do + withMigrationLocksAndUserCleanup client LockShared (Seconds 2) uids $ do filterUsersInPostgres uids >>= \pgUids -> do (pgUsers, _) <- interpretConversationStoreToPostgres $ ConvStore.selectRemoteMembers pgUids rcnv (cassUsers, _) <- embedClient client $ filterRemoteConvMembers uids rcnv @@ -1369,7 +1369,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case interpretConversationStoreToPostgres $ ConvStore.deleteMembers cid ul DeleteMembersInRemoteConversation rcnv uids -> do logEffect "ConversationStore.DeleteMembersInRemoteConversation" - withMigrationLocksAndCleanup client LockShared (Seconds 2) (Right <$> uids) $ do + withMigrationLocksAndUserCleanup client LockShared (Seconds 2) uids $ do -- No need to check where these are, we just delete them from both places embedClient client $ removeLocalMembersFromRemoteConv rcnv uids interpretConversationStoreToPostgres $ ConvStore.deleteMembersInRemoteConversation rcnv uids @@ -1487,7 +1487,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case True -> interpretConversationStoreToPostgres $ ConvStore.isConversationOutOfSync convId HaveRemoteConvs uids -> do logEffect "ConversationStore.DeleteSubConversation" - withMigrationLocksAndCleanup client LockShared (Seconds 2) (Right <$> uids) $ do + withMigrationLocksAndUserCleanup client LockShared (Seconds 2) uids $ do remotesInCass <- embedClient client $ haveRemoteConvs uids remotesInPG <- interpretConversationStoreToPostgres $ ConvStore.haveRemoteConvs uids pure $ List.nubOrd (remotesInPG <> remotesInCass) @@ -1532,10 +1532,12 @@ withMigrationLockAndCleanup :: Either ConvId UserId -> Sem (Error MigrationLockError : r) a -> Sem r a -withMigrationLockAndCleanup cassClient ty key = - withMigrationLocksAndCleanup cassClient ty (MilliSeconds 500) [key] +withMigrationLockAndCleanup cassClient ty (Left convId) = + withMigrationLocksAndConvCleanup cassClient ty (MilliSeconds 500) [convId] +withMigrationLockAndCleanup cassClient ty (Right userId) = + withMigrationLocksAndUserCleanup cassClient ty (MilliSeconds 500) [userId] -withMigrationLocksAndCleanup :: +withMigrationLocksAndConvCleanup :: ( PGConstraints r, Member Async r, Member TinyLog r, @@ -1546,12 +1548,33 @@ withMigrationLocksAndCleanup :: ClientState -> LockType -> u -> - [Either ConvId UserId] -> + [ConvId] -> + Sem (Error MigrationLockError : r) a -> + Sem r a +withMigrationLocksAndConvCleanup cassClient lockType maxWait convIds action = + mapError FailedToAcquireMigrationLock . withMigrationLocks lockType maxWait convIds $ do + interpretConversationStoreToCassandra cassClient + . runInputConst cassClient + $ cleanupIfNecessary (Left <$> convIds) + action + +withMigrationLocksAndUserCleanup :: + ( PGConstraints r, + Member Async r, + Member TinyLog r, + Member Race r, + Member (Error MigrationError) r, + TimeUnit u + ) => + ClientState -> + LockType -> + u -> + [UserId] -> Sem (Error MigrationLockError : r) a -> Sem r a -withMigrationLocksAndCleanup cassClient lockType maxWait convOrUsers action = - mapError FailedToAcquireMigrationLock . withMigrationLocks lockType maxWait convOrUsers $ do +withMigrationLocksAndUserCleanup cassClient lockType maxWait userIds action = + mapError FailedToAcquireMigrationLock . withMigrationLocks lockType maxWait userIds $ do interpretConversationStoreToCassandra cassClient . runInputConst cassClient - $ cleanupIfNecessary convOrUsers + $ cleanupIfNecessary (Right <$> userIds) action diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs index a3bf288c23..44cd919511 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs @@ -67,8 +67,8 @@ import Wire.ConversationStore.Cassandra (interpretConversationStoreToCassandra) import Wire.ConversationStore.MLS.Types import Wire.ConversationStore.Migration.Cleanup import Wire.ConversationStore.Migration.Types -import Wire.ConversationStore.MigrationLock import Wire.Migration +import Wire.MigrationLock import Wire.Postgres import Wire.Sem.Concurrency (Concurrency, ConcurrencySafety (..), unsafePooledMapConcurrentlyN_) import Wire.Sem.Concurrency.IO (unsafelyPerformConcurrency) @@ -216,7 +216,7 @@ migrateConversation :: ConvId -> Sem r () migrateConversation migCounter cid = do - void . withMigrationLocks LockExclusive (Seconds 10) [Left cid] $ do + void . withMigrationLocks LockExclusive (Seconds 10) [cid] $ do mConvData <- withCassandra $ getAllConvData cid for_ mConvData $ \convData -> do saveConvToPostgres convData @@ -445,7 +445,7 @@ saveConvToPostgres allConvData = do migrateUser :: (PGConstraints r, Member (Input ClientState) r, Member TinyLog r, Member Async r, Member (Error MigrationLockError) r, Member Race r) => Prometheus.Counter -> UserId -> Sem r () migrateUser migCounter uid = do - withMigrationLocks LockExclusive (Seconds 10) [Right uid] $ do + withMigrationLocks LockExclusive (Seconds 10) [uid] $ do statusses <- getRemoteMemberStatusFromCassandra uid saveRemoteMemberStatusToPostgres uid statusses deleteRemoteMemberStatusesFromCassandra uid diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Types.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Types.hs index 37e1bbe193..39487845d6 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Types.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Migration/Types.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE ScopedTypeVariables #-} + -- This file is part of the Wire Server implementation. -- -- Copyright (C) 2025 Wire Swiss GmbH diff --git a/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem.hs b/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem.hs index 8d0e802d9f..d8824e685c 100644 --- a/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem.hs +++ b/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem.hs @@ -23,26 +23,15 @@ import Data.Id (TeamId, UserId) import Data.Qualified (Local) import Imports import Polysemy -import Wire.API.Team.Feature (AllTeamFeatures, LockableFeature) +import Wire.API.Team.Feature (AllTeamFeatures, DbFeature, LockableFeature) import Wire.FeaturesConfigSubsystem.Types data FeaturesConfigSubsystem m a where - GetFeature :: - forall cfg m. - (GetFeatureConfig cfg) => - UserId -> TeamId -> FeaturesConfigSubsystem m (LockableFeature cfg) - GetFeatureForTeam :: - forall cfg m. - (GetFeatureConfig cfg) => - TeamId -> FeaturesConfigSubsystem m (LockableFeature cfg) - GetFeatureForServer :: - forall cfg m. - (GetFeatureConfig cfg) => - FeaturesConfigSubsystem m (LockableFeature cfg) - GetFeatureForTeamUser :: - forall cfg m. - (GetFeatureConfig cfg) => - UserId -> Maybe TeamId -> FeaturesConfigSubsystem m (LockableFeature cfg) + GetDbFeatureRawInternal :: forall cfg m. (GetFeatureConfig cfg) => TeamId -> FeaturesConfigSubsystem m (DbFeature cfg) + GetFeature :: forall cfg m. (GetFeatureConfig cfg) => UserId -> TeamId -> FeaturesConfigSubsystem m (LockableFeature cfg) + GetFeatureForTeam :: forall cfg m. (GetFeatureConfig cfg) => TeamId -> FeaturesConfigSubsystem m (LockableFeature cfg) + GetFeatureForServer :: forall cfg m. (GetFeatureConfig cfg) => FeaturesConfigSubsystem m (LockableFeature cfg) + GetFeatureForTeamUser :: forall cfg m. (GetFeatureConfig cfg) => UserId -> Maybe TeamId -> FeaturesConfigSubsystem m (LockableFeature cfg) GetAllTeamFeaturesForTeamMember :: Local UserId -> TeamId -> FeaturesConfigSubsystem m AllTeamFeatures GetAllTeamFeaturesForTeam :: TeamId -> FeaturesConfigSubsystem m AllTeamFeatures GetAllTeamFeaturesForServer :: FeaturesConfigSubsystem m AllTeamFeatures diff --git a/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem/Interpreter.hs b/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem/Interpreter.hs index e82d02a1d7..852e4d57c8 100644 --- a/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem/Interpreter.hs +++ b/libs/wire-subsystems/src/Wire/FeaturesConfigSubsystem/Interpreter.hs @@ -1,14 +1,19 @@ +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeOperators #-} {-# OPTIONS_GHC -Wno-ambiguous-fields #-} module Wire.FeaturesConfigSubsystem.Interpreter where +import Data.Aeson.Types qualified as A import Data.Id import Data.Qualified (tUnqualified) import Data.SOP +import Data.Text.Lazy qualified as LT import Galley.Types.Teams import Imports import Polysemy +import Polysemy.Error import Polysemy.Input import Wire.API.Error import Wire.API.Error.Galley @@ -20,77 +25,108 @@ import Wire.TeamFeatureStore import Wire.TeamSubsystem (TeamSubsystem) import Wire.TeamSubsystem qualified as TeamSubsystem +data TeamFeatureStoreError = TeamFeatureStoreErrorInternalError LText + runFeaturesConfigSubsystem :: forall r a. ( Member TeamFeatureStore r, Member TeamSubsystem r, + Member (Error TeamFeatureStoreError) r, Member (ErrorS 'NotATeamMember) r, GetFeatureConfigEffects r ) => Sem (FeaturesConfigSubsystem : r) a -> Sem r a runFeaturesConfigSubsystem = interpret $ \case + GetDbFeatureRawInternal tid -> getDbFeatureRawInternalImpl tid GetFeature uid tid -> do void $ TeamSubsystem.internalGetTeamMember uid tid >>= noteS @'NotATeamMember - doGetFeatureForTeam tid + getFeatureForTeamImpl tid GetFeatureForTeam tid -> - doGetFeatureForTeam tid + getFeatureForTeamImpl tid GetFeatureForServer -> resolveServerFeature GetFeatureForTeamUser uid mTid -> - doGetFeatureForTeamUser uid mTid + getFeatureForTeamUserImpl uid mTid GetAllTeamFeaturesForTeamMember luid tid -> do void $ TeamSubsystem.internalGetTeamMember (tUnqualified luid) tid >>= noteS @'NotATeamMember - doGetAllTeamFeatures tid + getAllTeamFeaturesImpl tid GetAllTeamFeaturesForTeam tid -> - doGetAllTeamFeatures tid + getAllTeamFeaturesImpl tid GetAllTeamFeaturesForServer -> - doGetAllTeamFeaturesForServer + getAllTeamFeaturesForServerImpl -- Internal helpers -doGetFeatureForTeam :: +getFeatureForTeamImpl :: forall cfg r. ( GetFeatureConfig cfg, Member TeamFeatureStore r, + Member (Error TeamFeatureStoreError) r, GetFeatureConfigEffects r ) => TeamId -> Sem r (LockableFeature cfg) -doGetFeatureForTeam tid = do - dbFeature <- getDbFeature tid +getFeatureForTeamImpl tid = do + dbFeature <- getDbFeatureRawInternalImpl tid defFeature <- resolveServerFeature computeFeature tid defFeature dbFeature -doGetFeatureForTeamUser :: +getFeatureForTeamUserImpl :: forall cfg r. ( GetFeatureConfig cfg, Member TeamFeatureStore r, + Member (Error TeamFeatureStoreError) r, GetFeatureConfigEffects r ) => UserId -> Maybe TeamId -> Sem r (LockableFeature cfg) -doGetFeatureForTeamUser uid Nothing = getFeatureForUser uid -doGetFeatureForTeamUser _uid (Just tid) = doGetFeatureForTeam tid +getFeatureForTeamUserImpl uid Nothing = getFeatureForUser uid +getFeatureForTeamUserImpl _uid (Just tid) = getFeatureForTeamImpl tid -doGetAllTeamFeatures :: +getAllTeamFeaturesImpl :: forall r. ( Member TeamFeatureStore r, + Member (Error TeamFeatureStoreError) r, GetFeatureConfigEffects r ) => TeamId -> Sem r AllTeamFeatures -doGetAllTeamFeatures tid = do +getAllTeamFeaturesImpl tid = do features <- getAllDbFeatures tid - defFeatures <- doGetAllTeamFeaturesForServer + defFeatures <- getAllTeamFeaturesForServerImpl hsequence' $ hcliftA2 (Proxy @(GetAllFeaturesForServerConstraints r)) compute defFeatures features where - compute :: forall p. (GetFeatureConfig p) => LockableFeature p -> DbFeature p -> (Sem r :.: LockableFeature) p - compute defFeature feat = Comp $ computeFeature tid defFeature feat + compute :: forall p. (GetFeatureConfig p) => LockableFeature p -> K (Maybe DbFeaturePatch) p -> (Sem r :.: LockableFeature) p + compute defFeature (K mPatch) = Comp $ do + dbFeature <- fromMaybe mempty <$> traverse parseDbFeatureOrThrow mPatch + computeFeature tid defFeature dbFeature -doGetAllTeamFeaturesForServer :: forall r. (Member (Input FeatureFlags) r) => Sem r AllTeamFeatures -doGetAllTeamFeaturesForServer = +getAllTeamFeaturesForServerImpl :: forall r. (Member (Input FeatureFlags) r) => Sem r AllTeamFeatures +getAllTeamFeaturesForServerImpl = hsequence' $ hcpure (Proxy @GetFeatureConfig) $ Comp resolveServerFeature + +getDbFeatureRawInternalImpl :: + forall cfg r. + ( IsFeatureConfig cfg, + Member (Error TeamFeatureStoreError) r, + Member TeamFeatureStore r + ) => + TeamId -> Sem r (DbFeature cfg) +getDbFeatureRawInternalImpl tid = + fromMaybe mempty <$> (getDbFeature @cfg tid >>= traverse parseDbFeatureOrThrow) + +parseDbFeatureOrThrow :: + forall cfg r. + ( IsFeatureConfig cfg, + Member (Error TeamFeatureStoreError) r + ) => + DbFeaturePatch -> + Sem r (DbFeature cfg) +parseDbFeatureOrThrow feat = + mapError (TeamFeatureStoreErrorInternalError . LT.pack) + . fromEither + $ A.parseEither (const (parseDbFeature feat)) () diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/MigrationLock.hs b/libs/wire-subsystems/src/Wire/MigrationLock.hs similarity index 69% rename from libs/wire-subsystems/src/Wire/ConversationStore/MigrationLock.hs rename to libs/wire-subsystems/src/Wire/MigrationLock.hs index e0830b5b17..bf8ba326e8 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/MigrationLock.hs +++ b/libs/wire-subsystems/src/Wire/MigrationLock.hs @@ -14,10 +14,13 @@ -- -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE TypeApplications #-} -module Wire.ConversationStore.MigrationLock where +module Wire.MigrationLock where import Data.Bits +import Data.Hashable (hash) import Data.Id import Data.UUID qualified as UUID import Data.Vector (Vector) @@ -26,6 +29,9 @@ import Hasql.Session qualified as Session import Hasql.Statement qualified as Hasql import Hasql.TH import Imports +import Network.HTTP.Types.Status (status500) +import Network.Wai.Utilities.Error qualified as WaiError +import Network.Wai.Utilities.JSONResponse import Polysemy import Polysemy.Async import Polysemy.Conc.Effect.Race @@ -35,11 +41,19 @@ import Polysemy.Time.Data.TimeUnit import Polysemy.TinyLog (TinyLog) import Polysemy.TinyLog qualified as TinyLog import System.Logger.Message qualified as Log +import Wire.API.Error import Wire.API.PostgresMarshall import Wire.Postgres +class MigrationLockable a where + -- | namespace (e.g. "conv", "user", etc.), used for logging only + lockScope :: ByteString + + -- | key used for advisory locks; should be collision-resistant (unique with high probability) + lockKey :: a -> Int64 + data LockType - = -- | Used for migrating a conversation, will block any other locks + = -- | Used for migrating a set of data, will block any other locks LockExclusive | -- | Used for reading and writing to Cassandra, will block exclusive locks LockShared @@ -47,26 +61,31 @@ data LockType data MigrationLockError = TimedOutAcquiringLock deriving (Show) +instance APIError MigrationLockError where + toResponse _ = waiErrorToJSONResponse $ WaiError.mkError status500 "internal-server-error" "Internal Server Error" + withMigrationLocks :: + forall x a u r. ( PGConstraints r, Member Async r, Member TinyLog r, Member Race r, Member (Error MigrationLockError) r, - TimeUnit u + TimeUnit u, + MigrationLockable x ) => LockType -> u -> - [Either ConvId UserId] -> + [x] -> Sem r a -> Sem r a -withMigrationLocks lockType maxWait convOrUsers action = do +withMigrationLocks lockType maxWait lockables action = do lockAcquired <- embed newEmptyMVar actionCompleted <- embed newEmptyMVar pool <- input lockThread <- async . embed . Hasql.use pool $ do - let lockIds = map mkLockId convOrUsers + let lockIds = fmap lockKey lockables Session.statement lockIds acquireLocks liftIO $ putMVar lockAcquired () @@ -80,14 +99,14 @@ withMigrationLocks lockType maxWait convOrUsers action = do mEithErr <- timeout (cancel lockThread) (Seconds 1) $ await lockThread let logFirstLock = - case convOrUsers of + case lockables of [] -> id - (convOrUser : _) -> Log.field (either (const "first_conv") (const "first_user") convOrUser) (either idToText idToText convOrUser) + (x : _) -> Log.field ("first_" <> lockScope @x) (lockKey x) logError errorStr = TinyLog.warn $ Log.msg (Log.val "Failed to cleanly unlock the migration locks") . logFirstLock - . Log.field "numberOfLocks" (length convOrUsers) + . Log.field "numberOfLocks" (length lockables) . Log.field "error" errorStr case mEithErr of Left () -> logError "timed out waiting for unlock" @@ -97,17 +116,6 @@ withMigrationLocks lockType maxWait convOrUsers action = do pure res where - mkLockId :: Either ConvId UserId -> Int64 - mkLockId convOrUser = fromIntegral $ case convOrUser of - Left convId -> hashUUID convId - Right userId -> hashUUID userId - - hashUUID :: Id a -> Int64 - hashUUID (toUUID -> uuid) = - let (w1, w2) = UUID.toWords64 uuid - mixed = w1 `xor` (w2 `shiftR` 32) `xor` (w2 `shiftL` 32) - in fromIntegral mixed - acquireLocks :: Hasql.Statement [Int64] () acquireLocks = lmapPG @(Vector _) @@ -133,3 +141,27 @@ withMigrationLocks lockType maxWait convOrUsers action = do [resultlessStatement|SELECT (1 :: int) FROM (SELECT pg_advisory_unlock_shared(lockId) FROM (SELECT UNNEST($1 :: bigint[]) as lockId))|] + +-------------------------------------------------------------------------------- +-- INSTANCES + +instance MigrationLockable (TeamId, Text) where + lockKey (team, featureName) = + let teamHash = hashUUID team + featureHash = fromIntegral (hash featureName) + in teamHash `xor` rotateL featureHash 1 + lockScope = "team_feature" + +instance MigrationLockable ConvId where + lockKey = hashUUID + lockScope = "conv" + +instance MigrationLockable UserId where + lockKey = hashUUID + lockScope = "user" + +hashUUID :: Id a -> Int64 +hashUUID (toUUID -> uuid) = + let (w1, w2) = UUID.toWords64 uuid + mixed = w1 `xor` (w2 `shiftR` 32) `xor` (w2 `shiftL` 32) + in fromIntegral mixed diff --git a/libs/wire-subsystems/src/Wire/PostgresMigrationOpts.hs b/libs/wire-subsystems/src/Wire/PostgresMigrationOpts.hs new file mode 100644 index 0000000000..86fa90c878 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/PostgresMigrationOpts.hs @@ -0,0 +1,57 @@ +{-# OPTIONS_GHC -fforce-recomp #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2025 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.PostgresMigrationOpts where + +import Data.Aeson +import Data.Text qualified as Text +import Imports + +data StorageLocation + = -- | Use when solely using Cassandra + CassandraStorage + | -- | Use while migration to postgresql. Using this option does not trigger + -- the migration. Newly created data is stored in Postgresql. + -- Once this has been turned on, it MUST NOT be made CassandraStorage ever + -- again. + MigrationToPostgresql + | -- | Use after migrating to postgresql + PostgresqlStorage + deriving (Show) + +instance FromJSON StorageLocation where + parseJSON = withText "StorageLocation" $ \case + "cassandra" -> pure CassandraStorage + "migration-to-postgresql" -> pure MigrationToPostgresql + "postgresql" -> pure PostgresqlStorage + x -> fail $ "Invalid storage location: " <> Text.unpack x <> ". Valid options: cassandra, postgresql, migration-to-postgresql" + +data PostgresMigrationOpts = PostgresMigrationOpts + { conversation :: StorageLocation, + conversationCodes :: StorageLocation, + teamFeatures :: StorageLocation + } + deriving (Show) + +instance FromJSON PostgresMigrationOpts where + parseJSON = withObject "PostgresMigrationOpts" $ \o -> + PostgresMigrationOpts + <$> o .: "conversation" + <*> o .: "conversationCodes" + <*> o .: "teamFeatures" diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore.hs index 7be546ed65..9f9a2527f2 100644 --- a/libs/wire-subsystems/src/Wire/TeamFeatureStore.hs +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore.hs @@ -14,19 +14,27 @@ -- -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} module Wire.TeamFeatureStore where import Data.Id +import Data.SOP (K (..)) +import Imports import Polysemy import Wire.API.Team.Feature +type DbFeaturePatch = LockableFeaturePatch DbConfig + +type AllDbFeaturePatches = AllFeatures (K (Maybe DbFeaturePatch)) + data TeamFeatureStore m a where -- | Returns all stored feature values excluding lock status. GetDbFeature :: FeatureSingleton cfg -> TeamId -> - TeamFeatureStore m (DbFeature cfg) + TeamFeatureStore m (Maybe DbFeaturePatch) SetDbFeature :: FeatureSingleton cfg -> TeamId -> @@ -44,27 +52,30 @@ data TeamFeatureStore m a where TeamFeatureStore m () GetAllDbFeatures :: TeamId -> - TeamFeatureStore m (AllFeatures DbFeature) + TeamFeatureStore m AllDbFeaturePatches getDbFeature :: + forall cfg r. (Member TeamFeatureStore r, IsFeatureConfig cfg) => TeamId -> - Sem r (DbFeature cfg) -getDbFeature tid = send (GetDbFeature featureSingleton tid) + Sem r (Maybe DbFeaturePatch) +getDbFeature tid = send (GetDbFeature (featureSingleton @cfg) tid) setDbFeature :: + forall cfg r. (Member TeamFeatureStore r, IsFeatureConfig cfg) => TeamId -> LockableFeature cfg -> Sem r () -setDbFeature tid feat = send (SetDbFeature featureSingleton tid feat) +setDbFeature tid feat = send (SetDbFeature (featureSingleton @cfg) tid feat) patchDbFeature :: + forall cfg r. (Member TeamFeatureStore r, IsFeatureConfig cfg) => TeamId -> (LockableFeaturePatch cfg) -> Sem r () -patchDbFeature tid featPatch = send (PatchDbFeature featureSingleton tid featPatch) +patchDbFeature tid featPatch = send (PatchDbFeature (featureSingleton @cfg) tid featPatch) setFeatureLockStatus :: forall cfg r. @@ -75,5 +86,5 @@ setFeatureLockStatus :: setFeatureLockStatus tid lockStatus = send (SetFeatureLockStatus (featureSingleton @cfg) tid lockStatus) -getAllDbFeatures :: (Member TeamFeatureStore r) => TeamId -> Sem r (AllFeatures DbFeature) +getAllDbFeatures :: (Member TeamFeatureStore r) => TeamId -> Sem r AllDbFeaturePatches getAllDbFeatures tid = send (GetAllDbFeatures tid) diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Cassandra.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Cassandra.hs index 2a897bc6b3..4e5826a3dd 100644 --- a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Cassandra.hs +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Cassandra.hs @@ -17,65 +17,57 @@ -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . -module Wire.TeamFeatureStore.Cassandra (interpretTeamFeatureStoreToCassandra, TeamFeatureStoreError (..)) where +module Wire.TeamFeatureStore.Cassandra (interpretTeamFeatureStoreToCassandra) where import Cassandra -import Data.Aeson.Types qualified as A import Data.Constraint import Data.Id -import Data.Map qualified as M -import Data.Text.Lazy qualified as LT +import Data.Map qualified as Map +import Data.Proxy +import Data.SOP (K (..), hcpure) import Imports import Polysemy -import Polysemy.Error import Polysemy.Input import Wire.API.Team.Feature import Wire.API.Team.Feature.TH import Wire.ConversationStore.Cassandra.Instances () -import Wire.TeamFeatureStore (TeamFeatureStore (..)) +import Wire.TeamFeatureStore (AllDbFeaturePatches, DbFeaturePatch, TeamFeatureStore (..)) import Wire.Util -data TeamFeatureStoreError = TeamFeatureStoreErrorInternalError LText - interpretTeamFeatureStoreToCassandra :: ( Member (Embed IO) r, - Member (Input ClientState) r, - Member (Error TeamFeatureStoreError) r + Member (Input ClientState) r ) => Sem (TeamFeatureStore ': r) a -> Sem r a interpretTeamFeatureStoreToCassandra = interpret $ \case GetDbFeature sing tid -> do - getDbFeatureDyn sing tid + getDbFeatureImpl sing tid SetDbFeature sing tid feat -> do - setDbFeatureDyn sing tid feat + setDbFeatureImpl sing tid feat SetFeatureLockStatus sing tid lock -> do - setFeatureLockStatusDyn sing tid (Tagged lock) + setFeatureLockStatusImpl sing tid (Tagged lock) GetAllDbFeatures tid -> do - getAllDbFeaturesDyn tid + getAllDbFeaturesImpl tid PatchDbFeature sing tid feat -> do - patchDbFeatureDyn sing tid feat + patchDbFeatureImpl sing tid feat -getDbFeatureDyn :: +getDbFeatureImpl :: forall cfg r. ( Member (Input ClientState) r, - Member (Embed IO) r, - Member (Error TeamFeatureStoreError) r + Member (Embed IO) r ) => FeatureSingleton cfg -> TeamId -> - Sem r (DbFeature cfg) -getDbFeatureDyn sing tid = case featureSingIsFeature sing of + Sem r (Maybe DbFeaturePatch) +getDbFeatureImpl sing tid = case featureSingIsFeature sing of Dict -> do let q :: PrepQuery R (TeamId, Text) (Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) q = "select status, lock_status, config from team_features_dyn where team = ? and feature = ?" - (embedClientInput (retry x1 $ query1 q (params LocalQuorum (tid, featureName @cfg)))) >>= \case - Nothing -> pure mempty - Just (status, lockStatus, config) -> - runFeatureParser . parseDbFeature $ - LockableFeaturePatch {..} + mRow <- (embedClientInput (retry x1 $ query1 q (params LocalQuorum (tid, featureName @cfg)))) + pure $ (\(status, lockStatus, config) -> LockableFeaturePatch {..}) <$> mRow -setDbFeatureDyn :: +setDbFeatureImpl :: forall cfg r. ( Member (Input ClientState) r, Member (Embed IO) r @@ -84,8 +76,8 @@ setDbFeatureDyn :: TeamId -> LockableFeature cfg -> Sem r () -setDbFeatureDyn sing tid feat = - patchDbFeatureDyn +setDbFeatureImpl sing tid feat = + patchDbFeatureImpl sing tid ( LockableFeaturePatch @@ -95,7 +87,7 @@ setDbFeatureDyn sing tid feat = } ) -patchDbFeatureDyn :: +patchDbFeatureImpl :: forall cfg r. ( Member (Input ClientState) r, Member (Embed IO) r @@ -104,7 +96,7 @@ patchDbFeatureDyn :: TeamId -> LockableFeaturePatch cfg -> Sem r () -patchDbFeatureDyn sing tid patch = case featureSingIsFeature sing of +patchDbFeatureImpl sing tid patch = case featureSingIsFeature sing of Dict -> embedClientInput $ do retry x5 . batch $ do setType BatchLogged @@ -122,7 +114,7 @@ patchDbFeatureDyn sing tid patch = case featureSingIsFeature sing of writeConfig :: PrepQuery W (DbConfig, TeamId, Text) () writeConfig = "update team_features_dyn set config = ? where team = ? and feature = ?" -setFeatureLockStatusDyn :: +setFeatureLockStatusImpl :: forall cfg r. ( Member (Input ClientState) r, Member (Embed IO) r @@ -131,7 +123,7 @@ setFeatureLockStatusDyn :: TeamId -> Tagged cfg LockStatus -> Sem r () -setFeatureLockStatusDyn sing tid (Tagged lockStatus) = case featureSingIsFeature sing of +setFeatureLockStatusImpl sing tid (Tagged lockStatus) = case featureSingIsFeature sing of Dict -> do let q :: PrepQuery W (LockStatus, TeamId, Text) () q = "update team_features_dyn set lock_status = ? where team = ? and feature = ?" @@ -139,28 +131,23 @@ setFeatureLockStatusDyn sing tid (Tagged lockStatus) = case featureSingIsFeature retry x5 $ write q (params LocalQuorum (lockStatus, tid, featureName @cfg)) -getAllDbFeaturesDyn :: +getAllDbFeaturesImpl :: ( Member (Embed IO) r, - Member (Input ClientState) r, - Member (Error TeamFeatureStoreError) r + Member (Input ClientState) r ) => TeamId -> - Sem r (AllFeatures DbFeature) -getAllDbFeaturesDyn tid = do + Sem r AllDbFeaturePatches +getAllDbFeaturesImpl tid = do let q :: PrepQuery R (Identity TeamId) (Text, Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) q = "select feature, status, lock_status, config from team_features_dyn where team = ?" rows <- embedClientInput $ retry x1 $ query q (params LocalQuorum (Identity tid)) - let m = M.fromList $ do + let m = Map.fromList $ do (name, status, lockStatus, config) <- rows pure (name, LockableFeaturePatch {..}) - runFeatureParser $ mkAllFeatures m + pure $ mkAllDbFeaturePatches m + where + mkAllDbFeaturePatches :: Map Text DbFeaturePatch -> AllDbFeaturePatches + mkAllDbFeaturePatches m = hcpure (Proxy @IsFeatureConfig) $ get m -runFeatureParser :: - forall r a. - (Member (Error TeamFeatureStoreError) r) => - A.Parser a -> - Sem r a -runFeatureParser p = - mapError (TeamFeatureStoreErrorInternalError . LT.pack) - . fromEither - $ A.parseEither (const p) () + get :: forall cfg. (IsFeatureConfig cfg) => Map Text DbFeaturePatch -> K (Maybe DbFeaturePatch) cfg + get m = K (Map.lookup (featureName @cfg) m) diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Error.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Error.hs new file mode 100644 index 0000000000..234e270011 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Error.hs @@ -0,0 +1,35 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . +module Wire.TeamFeatureStore.Error where + +import Data.Aeson.Types qualified as A +import Data.Text.Lazy qualified as LT +import Imports +import Polysemy +import Polysemy.Error + +data TeamFeatureStoreError = TeamFeatureStoreErrorInternalError LText + +runFeatureParser :: + forall r a. + (Member (Error TeamFeatureStoreError) r) => + A.Parser a -> + Sem r a +runFeatureParser p = + mapError (TeamFeatureStoreErrorInternalError . LT.pack) + . fromEither + $ A.parseEither (const p) () diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migrating.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migrating.hs new file mode 100644 index 0000000000..0d88f94a53 --- /dev/null +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migrating.hs @@ -0,0 +1,186 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.TeamFeatureStore.Migrating where + +import Cassandra (ClientState) +import Data.Constraint +import Data.Id +import Data.SOP (K (..), hzipWith) +import Imports +import Polysemy +import Polysemy.Async +import Polysemy.Conc.Effect.Race +import Polysemy.Error +import Polysemy.Input +import Polysemy.Time +import Polysemy.TinyLog +import Wire.API.Team.Feature +import Wire.API.Team.Feature.TH +import Wire.MigrationLock +import Wire.Postgres +import Wire.TeamFeatureStore +import Wire.TeamFeatureStore.Cassandra +import Wire.TeamFeatureStore.Postgres + +interpretTeamFeatureStoreToCassandraAndPostgres :: + ( PGConstraints r, + Member (Input ClientState) r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Error MigrationLockError) r + ) => + Sem (TeamFeatureStore ': r) a -> + Sem r a +interpretTeamFeatureStoreToCassandraAndPostgres = interpret $ \case + GetDbFeature sing tid -> getDbFeatureImpl sing tid + GetAllDbFeatures tid -> getAllDbFeaturesImpl tid + SetDbFeature sing tid feat -> setDbFeatureImpl sing tid feat + SetFeatureLockStatus sing tid lock -> setFeatureLockStatusImpl sing tid lock + PatchDbFeature sing tid feat -> patchDbFeatureImpl sing tid feat + +-- Read path under lock: +-- - Prefer Postgres; fallback to Cassandra; if neither exists → Nothing. +getDbFeatureImpl :: + forall cfg r. + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Input ClientState) r, + Member (Error MigrationLockError) r + ) => + FeatureSingleton cfg -> + TeamId -> + Sem r (Maybe DbFeaturePatch) +getDbFeatureImpl sing tid = case featureSingIsFeature sing of + Dict -> + withSharedLock (tid, featureName @cfg) $ do + mFeature <- interpretTeamFeatureStoreToPostgres $ send (GetDbFeature sing tid) + maybe + (interpretTeamFeatureStoreToCassandra $ send (GetDbFeature sing tid)) + (pure . Just) + mFeature + +-- Read all feature, no lock: +-- - Read all features from Postgres. +-- - Read all features from Cassandra. +-- - Merge per‑feature with precedence: Postgres wins, fallback to Cassandra, otherwise Nothing. +getAllDbFeaturesImpl :: + forall r. + ( PGConstraints r, + Member (Input ClientState) r + ) => + TeamId -> + Sem r AllDbFeaturePatches +getAllDbFeaturesImpl tid = do + mergeDbFeaturePatches + <$> interpretTeamFeatureStoreToPostgres (send (GetAllDbFeatures tid)) + <*> interpretTeamFeatureStoreToCassandra (send (GetAllDbFeatures tid)) + where + mergeDbFeaturePatches = hzipWith $ \(K psqlPatch) (K cassPatch) -> K (psqlPatch <|> cassPatch) + +setDbFeatureImpl :: + forall cfg r. + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Input ClientState) r, + Member (Error MigrationLockError) r + ) => + FeatureSingleton cfg -> + TeamId -> + LockableFeature cfg -> + Sem r () +setDbFeatureImpl sing tid feat = case featureSingIsFeature sing of + Dict -> withWritePathUnderLock sing tid $ send (SetDbFeature sing tid feat) + +setFeatureLockStatusImpl :: + forall cfg r. + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Input ClientState) r, + Member (Error MigrationLockError) r + ) => + FeatureSingleton cfg -> + TeamId -> + LockStatus -> + Sem r () +setFeatureLockStatusImpl sing tid lock = case featureSingIsFeature sing of + Dict -> withWritePathUnderLock sing tid $ send (SetFeatureLockStatus sing tid lock) + +patchDbFeatureImpl :: + forall cfg r. + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Input ClientState) r, + Member (Error MigrationLockError) r + ) => + FeatureSingleton cfg -> + TeamId -> + LockableFeaturePatch cfg -> + Sem r () +patchDbFeatureImpl sing tid feat = case featureSingIsFeature sing of + Dict -> withWritePathUnderLock sing tid $ send (PatchDbFeature sing tid feat) + +-- Write path under lock: +-- 1. Check Postgres for row. +-- 2. If exists -> write Postgres. +-- 3. Else check Cassandra. +-- 4. If exists -> write Cassandra. +-- 5. Else → write Postgres (new canonical row). +withWritePathUnderLock :: + forall cfg r a. + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Input ClientState) r, + Member (Error MigrationLockError) r, + IsFeatureConfig cfg + ) => + FeatureSingleton cfg -> + TeamId -> + Sem (TeamFeatureStore ': r) a -> + Sem r a +withWritePathUnderLock sing tid action = + withSharedLock (tid, featureName @cfg) $ do + mFeaturePsql <- interpretTeamFeatureStoreToPostgres $ send (GetDbFeature sing tid) + if isJust mFeaturePsql + then interpretTeamFeatureStoreToPostgres action + else do + mFeatureCql <- interpretTeamFeatureStoreToCassandra $ send (GetDbFeature sing tid) + if isJust mFeatureCql + then interpretTeamFeatureStoreToCassandra action + else interpretTeamFeatureStoreToPostgres action + +withSharedLock :: + ( PGConstraints r, + Member TinyLog r, + Member Async r, + Member Race r, + Member (Error MigrationLockError) r, + MigrationLockable x + ) => + x -> Sem r a -> Sem r a +withSharedLock lockable = withMigrationLocks LockShared (MilliSeconds 500) [lockable] diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Postgres.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Postgres.hs new file mode 100644 index 0000000000..20a1821ebc --- /dev/null +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Postgres.hs @@ -0,0 +1,172 @@ +{-# LANGUAGE RecordWildCards #-} + +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2026 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Wire.TeamFeatureStore.Postgres (interpretTeamFeatureStoreToPostgres) where + +import Data.Constraint +import Data.Id +import Data.Map qualified as M +import Data.Map qualified as Map +import Data.Proxy +import Data.SOP (K (..), hcpure) +import Data.Vector (Vector) +import Data.Vector qualified as Vector +import Hasql.Statement qualified as Hasql +import Hasql.TH +import Imports +import Polysemy +import Wire.API.PostgresMarshall +import Wire.API.Team.Feature +import Wire.API.Team.Feature.TH +import Wire.ConversationStore.Cassandra.Instances () +import Wire.Postgres +import Wire.TeamFeatureStore + +interpretTeamFeatureStoreToPostgres :: + (PGConstraints r) => + Sem (TeamFeatureStore ': r) a -> + Sem r a +interpretTeamFeatureStoreToPostgres = interpret $ \case + GetDbFeature sing tid -> do + getDbFeatureImpl sing tid + SetDbFeature sing tid feat -> do + setDbFeatureImpl sing tid feat + SetFeatureLockStatus sing tid lock -> do + setFeatureLockStatusImpl sing tid lock + GetAllDbFeatures tid -> do + getAllDbFeaturesImpl tid + PatchDbFeature sing tid feat -> do + patchDbFeatureImpl sing tid feat + +getDbFeatureImpl :: + forall cfg r. + (PGConstraints r) => + FeatureSingleton cfg -> + TeamId -> + Sem r (Maybe DbFeaturePatch) +getDbFeatureImpl sing tid = case featureSingIsFeature sing of + Dict -> do + mRow <- runStatement (tid, featureName @cfg) select + pure $ (\(status, lockStatus, config) -> LockableFeaturePatch {..}) <$> mRow + where + select :: Hasql.Statement (TeamId, Text) (Maybe (Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig)) + select = + dimapPG + [maybeStatement|SELECT + status :: int?, + lock_status :: int?, + config :: jsonb? + FROM team_features + WHERE team = ($1 :: uuid) AND feature = ($2 :: text) + |] + +setDbFeatureImpl :: + forall cfg r. + (PGConstraints r) => + FeatureSingleton cfg -> + TeamId -> + LockableFeature cfg -> + Sem r () +setDbFeatureImpl sing tid feat = + patchDbFeatureImpl + sing + tid + ( LockableFeaturePatch + { status = Just feat.status, + lockStatus = Just feat.lockStatus, + config = Just feat.config + } + ) + +patchDbFeatureImpl :: + forall cfg r. + (PGConstraints r) => + FeatureSingleton cfg -> + TeamId -> + LockableFeaturePatch cfg -> + Sem r () +patchDbFeatureImpl sing tid patch = case featureSingIsFeature sing of + Dict -> do + runStatement + ( tid, + featureName @cfg, + patch.status, + patch.lockStatus, + serialiseDbConfig <$> patch.config + ) + upsertPatch + where + upsertPatch :: Hasql.Statement (TeamId, Text, Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) () + upsertPatch = + lmapPG + [resultlessStatement|INSERT INTO team_features (team, feature, status, lock_status, config) + VALUES ($1 :: uuid, $2 :: text, $3 :: int?, $4 :: int?, $5 :: jsonb?) + ON CONFLICT (team, feature) DO UPDATE + SET status = COALESCE(EXCLUDED.status, team_features.status), + lock_status = COALESCE(EXCLUDED.lock_status, team_features.lock_status), + config = COALESCE(EXCLUDED.config, team_features.config) + |] + +setFeatureLockStatusImpl :: + forall cfg r. + (PGConstraints r) => + FeatureSingleton cfg -> + TeamId -> + LockStatus -> + Sem r () +setFeatureLockStatusImpl sing tid lockStatus = case featureSingIsFeature sing of + Dict -> do + runStatement (tid, featureName @cfg, lockStatus) writeLockStatus + where + writeLockStatus :: Hasql.Statement (TeamId, Text, LockStatus) () + writeLockStatus = + lmapPG + [resultlessStatement|INSERT INTO team_features (team, feature, lock_status) + VALUES ($1 :: uuid, $2 :: text, $3 :: int) + ON CONFLICT (team, feature) DO UPDATE + SET lock_status = EXCLUDED.lock_status + |] + +getAllDbFeaturesImpl :: + (PGConstraints r) => + TeamId -> + Sem r AllDbFeaturePatches +getAllDbFeaturesImpl tid = do + rows <- runStatement tid selectAll + let m = M.fromList $ do + (name, status, lockStatus, config) <- Vector.toList rows + pure (name, LockableFeaturePatch {..}) + pure $ mkAllDbFeaturePatches m + where + mkAllDbFeaturePatches :: Map Text DbFeaturePatch -> AllDbFeaturePatches + mkAllDbFeaturePatches m = hcpure (Proxy @IsFeatureConfig) $ get m + + get :: forall cfg. (IsFeatureConfig cfg) => Map Text DbFeaturePatch -> K (Maybe DbFeaturePatch) cfg + get m = K (Map.lookup (featureName @cfg) m) + + selectAll :: Hasql.Statement TeamId (Vector (Text, Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig)) + selectAll = + dimapPG + [vectorStatement|SELECT (feature :: text), + (status :: int?), + (lock_status :: int?), + (config :: jsonb?) + FROM team_features + WHERE team = ($1 :: uuid) + |] diff --git a/libs/wire-subsystems/wire-subsystems.cabal b/libs/wire-subsystems/wire-subsystems.cabal index 5dc5e19c77..dbea70f429 100644 --- a/libs/wire-subsystems/wire-subsystems.cabal +++ b/libs/wire-subsystems/wire-subsystems.cabal @@ -239,7 +239,6 @@ library Wire.ConversationStore.Migration Wire.ConversationStore.Migration.Cleanup Wire.ConversationStore.Migration.Types - Wire.ConversationStore.MigrationLock Wire.ConversationStore.MLS.Types Wire.ConversationStore.Postgres Wire.ConversationSubsystem @@ -298,6 +297,7 @@ library Wire.LegalHoldStore.Env Wire.ListItems Wire.Migration + Wire.MigrationLock Wire.NotificationSubsystem Wire.NotificationSubsystem.Interpreter Wire.PaginationState @@ -307,6 +307,7 @@ library Wire.PasswordStore Wire.PasswordStore.Cassandra Wire.Postgres + Wire.PostgresMigrationOpts Wire.PostgresMigrations Wire.PropertyStore Wire.PropertyStore.Cassandra @@ -334,6 +335,9 @@ library Wire.TeamCollaboratorsSubsystem.Interpreter Wire.TeamFeatureStore Wire.TeamFeatureStore.Cassandra + Wire.TeamFeatureStore.Error + Wire.TeamFeatureStore.Migrating + Wire.TeamFeatureStore.Postgres Wire.TeamInvitationSubsystem Wire.TeamInvitationSubsystem.Error Wire.TeamInvitationSubsystem.Interpreter diff --git a/postgres-schema.sql b/postgres-schema.sql index a0f4b619b3..fe339796c8 100644 --- a/postgres-schema.sql +++ b/postgres-schema.sql @@ -242,6 +242,21 @@ CREATE TABLE public.subconversation ( ALTER TABLE public.subconversation OWNER TO "wire-server"; +-- +-- Name: team_features; Type: TABLE; Schema: public; Owner: wire-server +-- + +CREATE TABLE public.team_features ( + team uuid NOT NULL, + feature text NOT NULL, + config jsonb, + lock_status integer, + status integer +); + + +ALTER TABLE public.team_features OWNER TO "wire-server"; + -- -- Name: user_group; Type: TABLE; Schema: public; Owner: wire-server -- @@ -369,6 +384,14 @@ ALTER TABLE ONLY public.subconversation ADD CONSTRAINT subconversation_pkey PRIMARY KEY (conv_id, subconv_id); +-- +-- Name: team_features team_features_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server +-- + +ALTER TABLE ONLY public.team_features + ADD CONSTRAINT team_features_pkey PRIMARY KEY (team, feature); + + -- -- Name: user_group_channel user_group_channel_pkey; Type: CONSTRAINT; Schema: public; Owner: wire-server -- diff --git a/services/background-worker/background-worker.integration.yaml b/services/background-worker/background-worker.integration.yaml index db49fb502d..e85a06bb7e 100644 --- a/services/background-worker/background-worker.integration.yaml +++ b/services/background-worker/background-worker.integration.yaml @@ -79,3 +79,4 @@ backgroundJobs: postgresMigration: conversation: postgresql conversationCodes: postgresql + teamFeatures: postgresql diff --git a/services/background-worker/src/Wire/BackgroundWorker/Env.hs b/services/background-worker/src/Wire/BackgroundWorker/Env.hs index 11787f105c..bccd415be1 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Env.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Env.hs @@ -44,7 +44,7 @@ import System.Logger.Class (Logger, MonadLogger (..)) import System.Logger.Extended qualified as Log import Util.Options import Wire.BackgroundWorker.Options -import Wire.ConversationStore (PostgresMigrationOpts) +import Wire.PostgresMigrationOpts type IsWorking = Bool diff --git a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs index 1c9b3416bd..b537e1b6c2 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Jobs/Registry.hs @@ -40,7 +40,6 @@ import Wire.BackgroundJobsRunner (runJob) import Wire.BackgroundJobsRunner.Interpreter hiding (runJob) import Wire.BackgroundWorker.Env (AppT, Env (..)) import Wire.BrigAPIAccess.Rpc -import Wire.ConversationStore import Wire.ConversationStore.Cassandra import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres) import Wire.ConversationSubsystem.Interpreter (interpretConversationSubsystem) @@ -49,6 +48,7 @@ import Wire.FireAndForget (interpretFireAndForget) import Wire.GundeckAPIAccess import Wire.NotificationSubsystem.Interpreter import Wire.ParseException +import Wire.PostgresMigrationOpts import Wire.Rpc import Wire.Sem.Delay (runDelay) import Wire.Sem.Logger (mapLogger) diff --git a/services/background-worker/src/Wire/BackgroundWorker/Options.hs b/services/background-worker/src/Wire/BackgroundWorker/Options.hs index 6dc18f03a2..ccb340dda7 100644 --- a/services/background-worker/src/Wire/BackgroundWorker/Options.hs +++ b/services/background-worker/src/Wire/BackgroundWorker/Options.hs @@ -27,8 +27,8 @@ import Imports import Network.AMQP.Extended import System.Logger.Extended import Util.Options -import Wire.ConversationStore (PostgresMigrationOpts) -import Wire.Migration (MigrationOptions) +import Wire.Migration +import Wire.PostgresMigrationOpts data Opts = Opts { logLevel :: !Level, diff --git a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs index e10ab43123..01d658122e 100644 --- a/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs +++ b/services/background-worker/test/Test/Wire/BackendNotificationPusherSpec.hs @@ -69,7 +69,7 @@ import Wire.BackendNotificationPusher import Wire.BackgroundWorker.Env import Wire.BackgroundWorker.Options import Wire.BackgroundWorker.Util -import Wire.ConversationStore +import Wire.PostgresMigrationOpts spec :: Spec spec = do @@ -364,7 +364,8 @@ spec = do postgresMigration = PostgresMigrationOpts { conversation = CassandraStorage, - conversationCodes = CassandraStorage + conversationCodes = CassandraStorage, + teamFeatures = CassandraStorage } gundeckEndpoint = undefined brigEndpoint = undefined @@ -402,7 +403,8 @@ spec = do postgresMigration = PostgresMigrationOpts { conversation = CassandraStorage, - conversationCodes = CassandraStorage + conversationCodes = CassandraStorage, + teamFeatures = CassandraStorage } gundeckEndpoint = undefined brigEndpoint = undefined diff --git a/services/background-worker/test/Test/Wire/Util.hs b/services/background-worker/test/Test/Wire/Util.hs index cdb020a222..ef2d18321c 100644 --- a/services/background-worker/test/Test/Wire/Util.hs +++ b/services/background-worker/test/Test/Wire/Util.hs @@ -30,7 +30,7 @@ import Util.Options (Endpoint (..)) import Wire.BackgroundWorker.Env hiding (federatorInternal) import Wire.BackgroundWorker.Env qualified as E import Wire.BackgroundWorker.Options -import Wire.ConversationStore +import Wire.PostgresMigrationOpts testEnv :: IO Env testEnv = do @@ -42,7 +42,8 @@ testEnv = do postgresMigration = PostgresMigrationOpts { conversation = CassandraStorage, - conversationCodes = CassandraStorage + conversationCodes = CassandraStorage, + teamFeatures = CassandraStorage } statuses <- newIORef mempty backendNotificationMetrics <- mkBackendNotificationMetrics diff --git a/services/galley/galley.integration.yaml b/services/galley/galley.integration.yaml index e2106c63e6..1f19ccbee0 100644 --- a/services/galley/galley.integration.yaml +++ b/services/galley/galley.integration.yaml @@ -248,3 +248,4 @@ journal: # if set, journals; if not set, disables journaling postgresMigration: conversation: postgresql conversationCodes: postgresql + teamFeatures: postgresql diff --git a/services/galley/src/Galley/API/LegalHold.hs b/services/galley/src/Galley/API/LegalHold.hs index 2aa4a48088..6faadb492e 100644 --- a/services/galley/src/Galley/API/LegalHold.hs +++ b/services/galley/src/Galley/API/LegalHold.hs @@ -80,6 +80,7 @@ import Wire.BrigAPIAccess import Wire.ConversationStore import Wire.ConversationSubsystem import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemConfig) +import Wire.FeaturesConfigSubsystem import Wire.FireAndForget import Wire.LegalHoldStore qualified as LegalHoldData import Wire.NotificationSubsystem @@ -101,10 +102,10 @@ createSettings :: Member (ErrorS 'LegalHoldServiceInvalidKey) r, Member (ErrorS 'LegalHoldServiceBadResponse) r, Member LegalHoldStore r, - Member TeamFeatureStore r, Member P.TinyLog r, Member (Input (FeatureDefaults LegalholdConfig)) r, - Member TeamSubsystem r + Member TeamSubsystem r, + Member FeaturesConfigSubsystem r ) => Local UserId -> TeamId -> @@ -131,9 +132,9 @@ getSettings :: forall r. ( Member (ErrorS 'NotATeamMember) r, Member LegalHoldStore r, - Member TeamFeatureStore r, Member (Input (FeatureDefaults LegalholdConfig)) r, - Member TeamSubsystem r + Member TeamSubsystem r, + Member FeaturesConfigSubsystem r ) => Local UserId -> TeamId -> @@ -177,7 +178,6 @@ removeSettingsInternalPaging :: Member ProposalStore r, Member P.TinyLog r, Member Random r, - Member TeamFeatureStore r, Member (TeamMemberStore InternalPaging) r, Member TeamStore r, Member (Embed IO) r, @@ -185,7 +185,8 @@ removeSettingsInternalPaging :: Member MLSCommitLockStore r, Member (Input (FeatureDefaults LegalholdConfig)) r, Member TeamSubsystem r, - Member (Input ConversationSubsystemConfig) r + Member (Input ConversationSubsystemConfig) r, + Member FeaturesConfigSubsystem r ) => Local UserId -> TeamId -> @@ -197,7 +198,6 @@ removeSettings :: forall p r. ( Paging p, Bounded (PagingBounds p TeamMember), - Member TeamFeatureStore r, Member (TeamMemberStore p) r, Member TeamStore r, Member BackendNotificationQueueAccess r, @@ -231,7 +231,8 @@ removeSettings :: Member MLSCommitLockStore r, Member (Input (FeatureDefaults LegalholdConfig)) r, Member TeamSubsystem r, - Member (Input ConversationSubsystemConfig) r + Member (Input ConversationSubsystemConfig) r, + Member FeaturesConfigSubsystem r ) => UserId -> TeamId -> @@ -388,14 +389,14 @@ requestDevice :: Member ProposalStore r, Member P.TinyLog r, Member Random r, - Member TeamFeatureStore r, Member TeamStore r, Member (Embed IO) r, Member TeamCollaboratorsSubsystem r, Member MLSCommitLockStore r, Member (Input (FeatureDefaults LegalholdConfig)) r, Member TeamSubsystem r, - Member (Input ConversationSubsystemConfig) r + Member (Input ConversationSubsystemConfig) r, + Member FeaturesConfigSubsystem r ) => Local UserId -> TeamId -> @@ -485,14 +486,14 @@ approveDevice :: Member ProposalStore r, Member P.TinyLog r, Member Random r, - Member TeamFeatureStore r, Member TeamStore r, Member (Embed IO) r, Member TeamCollaboratorsSubsystem r, Member MLSCommitLockStore r, Member (Input (FeatureDefaults LegalholdConfig)) r, Member TeamSubsystem r, - Member (Input ConversationSubsystemConfig) r + Member (Input ConversationSubsystemConfig) r, + Member FeaturesConfigSubsystem r ) => Local UserId -> ConnId -> diff --git a/services/galley/src/Galley/API/LegalHold/Team.hs b/services/galley/src/Galley/API/LegalHold/Team.hs index 977fcfb291..92f1eac3fb 100644 --- a/services/galley/src/Galley/API/LegalHold/Team.hs +++ b/services/galley/src/Galley/API/LegalHold/Team.hs @@ -37,15 +37,15 @@ import Wire.API.Error.Galley import Wire.API.Team.Feature import Wire.API.Team.Size import Wire.BrigAPIAccess +import Wire.FeaturesConfigSubsystem (FeaturesConfigSubsystem, getDbFeatureRawInternal) import Wire.LegalHold -import Wire.TeamFeatureStore assertLegalHoldEnabledForTeam :: forall r. ( Member LegalHoldStore r, - Member TeamFeatureStore r, Member (Input (FeatureDefaults LegalholdConfig)) r, - Member (ErrorS 'LegalHoldNotEnabled) r + Member (ErrorS 'LegalHoldNotEnabled) r, + Member FeaturesConfigSubsystem r ) => TeamId -> Sem r () @@ -56,13 +56,13 @@ assertLegalHoldEnabledForTeam tid = isLegalHoldEnabledForTeam :: forall r. ( Member LegalHoldStore r, - Member TeamFeatureStore r, + Member FeaturesConfigSubsystem r, Member (Input (FeatureDefaults LegalholdConfig)) r ) => TeamId -> Sem r Bool isLegalHoldEnabledForTeam tid = do - dbFeature <- getDbFeature tid + dbFeature <- getDbFeatureRawInternal tid status <- computeLegalHoldFeatureStatus tid dbFeature pure $ status == FeatureStatusEnabled diff --git a/services/galley/src/Galley/API/Teams.hs b/services/galley/src/Galley/API/Teams.hs index 2ac4938634..32539143f4 100644 --- a/services/galley/src/Galley/API/Teams.hs +++ b/services/galley/src/Galley/API/Teams.hs @@ -526,13 +526,13 @@ addTeamMember :: Member (Input Opts) r, Member Now r, Member LegalHoldStore r, - Member TeamFeatureStore r, Member TeamNotificationStore r, Member TeamStore r, Member P.TinyLog r, Member (Input FanoutLimit) r, Member (Input (FeatureDefaults LegalholdConfig)) r, - Member TeamSubsystem r + Member TeamSubsystem r, + Member FeaturesConfigSubsystem r ) => Local UserId -> ConnId -> @@ -570,12 +570,12 @@ uncheckedAddTeamMember :: Member Now r, Member LegalHoldStore r, Member P.TinyLog r, - Member TeamFeatureStore r, Member TeamNotificationStore r, Member TeamStore r, Member (Input FanoutLimit) r, Member (Input (FeatureDefaults LegalholdConfig)) r, - Member TeamJournal r + Member TeamJournal r, + Member FeaturesConfigSubsystem r ) => TeamId -> NewTeamMember -> @@ -1115,10 +1115,10 @@ ensureNotTooLarge tid = do ensureNotTooLargeForLegalHold :: forall r. ( Member LegalHoldStore r, - Member TeamFeatureStore r, Member (ErrorS 'TooManyTeamMembersOnTeamWithLegalhold) r, Member (Input FanoutLimit) r, - Member (Input (FeatureDefaults LegalholdConfig)) r + Member (Input (FeatureDefaults LegalholdConfig)) r, + Member FeaturesConfigSubsystem r ) => TeamId -> Int -> @@ -1204,10 +1204,10 @@ canUserJoinTeam :: forall r. ( Member BrigAPIAccess r, Member LegalHoldStore r, - Member TeamFeatureStore r, Member (ErrorS 'TooManyTeamMembersOnTeamWithLegalhold) r, Member (Input FanoutLimit) r, - Member (Input (FeatureDefaults LegalholdConfig)) r + Member (Input (FeatureDefaults LegalholdConfig)) r, + Member FeaturesConfigSubsystem r ) => TeamId -> Sem r () diff --git a/services/galley/src/Galley/API/Teams/Features.hs b/services/galley/src/Galley/API/Teams/Features.hs index 4c9bf4b979..d16588f9d2 100644 --- a/services/galley/src/Galley/API/Teams/Features.hs +++ b/services/galley/src/Galley/API/Teams/Features.hs @@ -71,7 +71,7 @@ import Wire.CodeStore import Wire.ConversationStore (MLSCommitLockStore) import Wire.ConversationSubsystem import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemConfig) -import Wire.FeaturesConfigSubsystem (FeaturesConfigSubsystem) +import Wire.FeaturesConfigSubsystem import Wire.FeaturesConfigSubsystem.Types (GetFeatureConfigEffects) import Wire.FeaturesConfigSubsystem.Utils (resolveServerFeature) import Wire.NotificationSubsystem @@ -104,8 +104,8 @@ patchFeatureInternal :: Sem r (LockableFeature cfg) patchFeatureInternal tid patch = do assertTeamExists tid - dbFeature <- getDbFeature tid - (defFeature :: LockableFeature cfg) <- resolveServerFeature + dbFeature <- getDbFeatureRawInternal tid + defFeature :: LockableFeature cfg <- resolveServerFeature let dbFeatureWithDefaults = dbFeature.applyDbFeature defFeature let patchedFeature = applyPatch dbFeatureWithDefaults prepareFeature tid patchedFeature diff --git a/services/galley/src/Galley/API/Teams/Features/Get.hs b/services/galley/src/Galley/API/Teams/Features/Get.hs index ece1922543..0c2dd6ddd3 100644 --- a/services/galley/src/Galley/API/Teams/Features/Get.hs +++ b/services/galley/src/Galley/API/Teams/Features/Get.hs @@ -50,11 +50,11 @@ import Wire.API.Team.Feature import Wire.ConversationStore as ConversationStore import Wire.FeaturesConfigSubsystem import Wire.FeaturesConfigSubsystem.Types -import Wire.TeamFeatureStore import Wire.TeamStore qualified as TeamStore import Wire.TeamSubsystem (TeamSubsystem) import Wire.TeamSubsystem qualified as TeamSubsystem +-- FUTUREWORK: everything in this module should be moved to the FeatureConfigSubsystem data DoAuth = DoAuth UserId | DontDoAuth getFeatureInternal :: @@ -90,29 +90,15 @@ getTeamAndCheckMembership uid = do getAllTeamFeatures :: forall r. - ( Member TeamFeatureStore r, - Member FeaturesConfigSubsystem r, - GetFeatureConfigEffects r - ) => + (Member FeaturesConfigSubsystem r) => TeamId -> Sem r AllTeamFeatures -getAllTeamFeatures tid = do - features <- getAllDbFeatures tid - defFeatures <- getAllTeamFeaturesForServer - hsequence' $ hcliftA2 (Proxy @(GetAllFeaturesForServerConstraints r)) compute defFeatures features - where - compute :: - (GetFeatureConfig p) => - LockableFeature p -> - DbFeature p -> - (Sem r :.: LockableFeature) p - compute defFeature feat = Comp $ computeFeature tid defFeature feat +getAllTeamFeatures tid = getAllTeamFeaturesForTeam tid getAllTeamFeaturesForUser :: forall r. ( Member (ErrorS 'NotATeamMember) r, Member (ErrorS 'TeamNotFound) r, - Member TeamFeatureStore r, Member TeamStore r, Member TeamSubsystem r, Member FeaturesConfigSubsystem r, diff --git a/services/galley/src/Galley/App.hs b/services/galley/src/Galley/App.hs index 2035a64c1c..3aeb7f4654 100644 --- a/services/galley/src/Galley/App.hs +++ b/services/galley/src/Galley/App.hs @@ -115,7 +115,7 @@ import Wire.ConversationSubsystem.Interpreter (ConversationSubsystemConfig (..), import Wire.Error import Wire.ExternalAccess.External import Wire.FeaturesConfigSubsystem -import Wire.FeaturesConfigSubsystem.Interpreter (runFeaturesConfigSubsystem) +import Wire.FeaturesConfigSubsystem.Interpreter import Wire.FeaturesConfigSubsystem.Types (ExposeInvitationURLsAllowlist (..)) import Wire.FederationAPIAccess.Interpreter import Wire.FireAndForget @@ -123,6 +123,7 @@ import Wire.GundeckAPIAccess (runGundeckAPIAccess) import Wire.HashPassword.Interpreter import Wire.LegalHoldStore.Cassandra (interpretLegalHoldStoreToCassandra) import Wire.LegalHoldStore.Env (LegalHoldEnv (..)) +import Wire.MigrationLock import Wire.NotificationSubsystem.Interpreter (runNotificationSubsystemGundeck) import Wire.ParseException import Wire.ProposalStore.Cassandra @@ -139,6 +140,8 @@ import Wire.SparAPIAccess.Rpc import Wire.TeamCollaboratorsStore.Postgres (interpretTeamCollaboratorsStoreToPostgres) import Wire.TeamCollaboratorsSubsystem.Interpreter import Wire.TeamFeatureStore.Cassandra +import Wire.TeamFeatureStore.Migrating +import Wire.TeamFeatureStore.Postgres import Wire.TeamJournal.Aws import Wire.TeamStore.Cassandra (interpretTeamStoreToCassandra) import Wire.TeamSubsystem.Interpreter @@ -150,6 +153,7 @@ type GalleyEffects0 = Input Hasql.Pool, Input Env, Input ConversationSubsystemConfig, + Error MigrationLockError, Error TeamFeatureStoreError, Error MigrationError, Error InvalidInput, @@ -298,6 +302,11 @@ evalGalley e = CassandraStorage -> interpretCodeStoreToCassandra MigrationToPostgresql -> interpretCodeStoreToCassandraAndPostgres PostgresqlStorage -> interpretCodeStoreToPostgres + teamFeatureStoreInterpreter = + case (e ^. options . postgresMigration).teamFeatures of + CassandraStorage -> interpretTeamFeatureStoreToCassandra + MigrationToPostgresql -> interpretTeamFeatureStoreToCassandraAndPostgres + PostgresqlStorage -> interpretTeamFeatureStoreToPostgres localUnit = toLocalUnsafe (e ^. options . settings . federationDomain) () teamSubsystemConfig = TeamSubsystemConfig @@ -348,6 +357,7 @@ evalGalley e = . mapError toResponse . logAndMapError toResponse (Text.pack . show) "migration error" . mapError mapTeamFeatureStoreError + . mapError toResponse . runInputConst conversationSubsystemConfig . runInputConst e . runInputConst (e ^. hasqlPool) @@ -369,7 +379,7 @@ evalGalley e = . interpretTeamListToCassandra . interpretTeamMemberStoreToCassandraWithPaging lh . interpretTeamMemberStoreToCassandra lh - . interpretTeamFeatureStoreToCassandra + . teamFeatureStoreInterpreter . interpretMLSCommitLockStoreToCassandra (e ^. cstate) . convStoreInterpreter . interpretTeamNotificationStoreToCassandra diff --git a/services/galley/src/Galley/Options.hs b/services/galley/src/Galley/Options.hs index a435707a7e..0ccfa71b9a 100644 --- a/services/galley/src/Galley/Options.hs +++ b/services/galley/src/Galley/Options.hs @@ -85,7 +85,7 @@ import Util.Options.Common import Wire.API.Conversation.Protocol import Wire.API.Routes.Version import Wire.API.Team.Member -import Wire.ConversationStore +import Wire.PostgresMigrationOpts import Wire.RateLimit.Interpreter (RateLimitConfig) newtype GuestLinkTTLSeconds = GuestLinkTTLSeconds