Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/5-internal/WPB-22959
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- Generalized the migration lock for better reuse
- Move logic from `TeamFeatureStore` interpreter to `FeatureConfigSubsystem`
(#4982, #4983)
1 change: 1 addition & 0 deletions charts/background-worker/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ config:
postgresMigration:
conversation: cassandra
conversationCodes: cassandra
teamFeatures: cassandra

secrets:
{}
Expand Down
1 change: 1 addition & 0 deletions charts/galley/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ config:
postgresMigration:
conversation: cassandra
conversationCodes: cassandra
teamFeatures: cassandra
settings:
httpPoolSize: 128
maxTeamSize: 10000
Expand Down
12 changes: 10 additions & 2 deletions docs/src/developer/reference/config-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -1812,11 +1812,13 @@ galley:
postgresMigration:
conversation: postgresql
conversationCodes: postgresql
teamFeatures: postgresql
background-worker:
config:
postgresMigration:
conversation: postgresql
conversationCodes: postgresql
teamFeatures: postgresql
migrateConversations: false
```

Expand Down Expand Up @@ -1847,11 +1849,13 @@ pattern below applies per store. Use it for `conversation` and
postgresMigration:
conversation: migration-to-postgresql
conversationCodes: migration-to-postgresql
teamFeatures: migration-to-postgresql
background-worker:
config:
postgresMigration:
conversation: migration-to-postgresql
conversationCodes: migration-to-postgresql
teamFeatures: migration-to-postgresql
migrateConversations: false
migrateConversationCodes: false
```
Expand Down Expand Up @@ -1882,11 +1886,13 @@ pattern below applies per store. Use it for `conversation` and
postgresMigration:
conversation: postgresql
conversationCodes: postgresql
teamFeatures: postgresql
background-worker:
config:
postgresMigration:
conversation: postgresql
conversationCodes: postgresql
teamFeatures: postgresql
migrateConversations: false
migrateConversationCodes: false
```
Expand Down Expand Up @@ -1956,6 +1962,8 @@ postgresqlPool:
postgresMigration:
# Valid: cassandra | migration-to-postgresql | postgresql
conversation: postgresql
conversationCodes: postgresql
teamFeatures: postgresql

# Start the migration worker when true
migrateConversations: false
Expand All @@ -1978,7 +1986,7 @@ Notes

- `postgresql` values follow libpq keywords; password is sourced via `secrets.pgPassword`.
- RabbitMQ admin fields (`adminHost`, `adminPort`) are templated only when `config.enableFederation` is true.
- `postgresMigration.conversation` must match `galley.config.postgresMigration.conversation` during migration phases.
- `migrateConversations: true` triggers the migration job; leave it `false` for new installs and after migration.
- `postgresMigration.<store>` must match between `galley` and `background-worker` during migration phases.
- `migrateConversations: true` triggers the conversation migration job; leave it `false` for new installs and after migration.
- `concurrency`, `jobTimeout`, and `maxAttempts` control parallelism and retry behavior of the consumer.
- `brig` and `gundeck` endpoints default to in-cluster services; override via `background-worker.config.brig` and `.gundeck` if your service DNS/ports differ.
1 change: 1 addition & 0 deletions hack/helm_vars/common.yaml.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dynBackendDomain3: dynamic-backend-3.{{ requiredEnv "NAMESPACE_1" }}.svc.cluster
{{- $preferredStore := default "cassandra" (env "PREFERRED_STORE") }}
conversationStore: {{ $preferredStore }}
conversationCodesStore: {{ $preferredStore }}
teamFeaturesStore: {{ $preferredStore }}

{{- if (eq (env "UPLOAD_XML_S3_BASE_URL") "") }}
uploadXml: {}
Expand Down
2 changes: 2 additions & 0 deletions hack/helm_vars/wire-server/values.yaml.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ galley:
postgresMigration:
conversation: {{ .Values.conversationStore }}
conversationCodes: {{ .Values.conversationCodesStore }}
teamFeatures: {{ .Values.teamFeaturesStore }}
settings:
maxConvAndTeamSize: 16
maxTeamSize: 32
Expand Down Expand Up @@ -675,6 +676,7 @@ background-worker:
postgresMigration:
conversation: {{ .Values.conversationStore }}
conversationCodes: {{ .Values.conversationCodesStore }}
teamFeatures: {{ .Values.teamFeaturesStore }}
rabbitmq:
port: 5671
adminPort: 15671
Expand Down
26 changes: 26 additions & 0 deletions libs/wire-api/src/Wire/API/Team/Feature.hs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ import Test.QuickCheck.Gen (suchThat)
import URI.ByteString.QQ qualified as URI.QQ
import Wire.API.Conversation.Protocol
import Wire.API.MLS.CipherSuite
import Wire.API.PostgresMarshall
import Wire.API.Routes.Named hiding (unnamed)
import Wire.API.Routes.Version
import Wire.API.Routes.Versioned
Expand Down Expand Up @@ -325,8 +326,15 @@ resolveDbFeature defFeature dbFeature =
LockStatusUnlocked -> feat

newtype DbConfig = DbConfig {unDbConfig :: A.Value}
deriving newtype (Arbitrary)
deriving (Eq, Show)

instance PostgresMarshall A.Value DbConfig where
postgresMarshall = unDbConfig

instance PostgresUnmarshall A.Value DbConfig where
postgresUnmarshall = Right . DbConfig

instance Default DbConfig where
def = DbConfig (A.object [])

Expand Down Expand Up @@ -629,6 +637,15 @@ instance Cass.Cql LockStatus where
toCql LockStatusLocked = Cass.CqlInt 0
toCql LockStatusUnlocked = Cass.CqlInt 1

instance PostgresMarshall Int32 LockStatus where
postgresMarshall LockStatusLocked = 0
postgresMarshall LockStatusUnlocked = 1

instance PostgresUnmarshall Int32 LockStatus where
postgresUnmarshall 0 = Right LockStatusLocked
postgresUnmarshall 1 = Right LockStatusUnlocked
postgresUnmarshall _ = Left "invalid lockStatus"

newtype LockStatusResponse = LockStatusResponse {_unlockStatus :: LockStatus}
deriving stock (Eq, Show, Generic)
deriving (Arbitrary) via (GenericUniform LockStatus)
Expand Down Expand Up @@ -2172,6 +2189,15 @@ instance Cass.Cql FeatureStatus where
toCql FeatureStatusDisabled = Cass.CqlInt 0
toCql FeatureStatusEnabled = Cass.CqlInt 1

instance PostgresMarshall Int32 FeatureStatus where
postgresMarshall FeatureStatusEnabled = 1
postgresMarshall FeatureStatusDisabled = 0

instance PostgresUnmarshall Int32 FeatureStatus where
postgresUnmarshall 1 = Right FeatureStatusEnabled
postgresUnmarshall 0 = Right FeatureStatusDisabled
postgresUnmarshall _ = Left "invalid feature status"

-- | list of available features config types
type Features :: [Type]
type Features =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module Test.Wire.API.Roundtrip.PostgresMarshall (tests) where

import Crypto.Error (CryptoFailable (..))
import Crypto.KDF.Argon2 qualified as Argon2
import Data.Aeson as A
import Data.ByteString.Char8 qualified as BS8
import Data.Code qualified as Code
import Data.Misc (PlainTextPassword8, fromPlainTextPassword)
Expand All @@ -32,14 +33,18 @@ import Wire.API.Password as Password
import Wire.API.Password.Argon2id (Argon2HashedPassword (..), encodeArgon2HashedPassword)
import Wire.API.Password.Scrypt (encodeScryptPassword)
import Wire.API.PostgresMarshall
import Wire.API.Team.Feature
import Wire.Arbitrary qualified as Arbitrary ()

tests :: T.TestTree
tests =
T.localOption (T.Timeout (60 * 1000000) "60s") . T.testGroup "PostgresMarshall roundtrip tests" $
[ testRoundTrip @Text @Code.Key,
testRoundTrip @Text @Code.Value,
testRoundTrip @ByteString @Password.Password
testRoundTrip @ByteString @Password.Password,
testRoundTrip @Int32 @FeatureStatus,
testRoundTrip @Int32 @LockStatus,
testRoundTrip @A.Value @DbConfig
]

testRoundTrip ::
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE team_features (
team uuid NOT NULL,
feature text NOT NULL,
config jsonb,
lock_status int,
status int,
PRIMARY KEY (team, feature)
);
3 changes: 1 addition & 2 deletions libs/wire-subsystems/src/Wire/CodeStore/DualWrite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ import Wire.CodeStore.Cassandra qualified as Cassandra
import Wire.CodeStore.Postgres qualified as Postgres
import Wire.Postgres (PGConstraints)

-- | Cassandra is the sourceof truth during migration; writes are mirrored to Postgres.
interpretCodeStoreToCassandraAndPostgres ::
( Member (Input ClientState) r,
Member (Input (Either HttpsUrl (Map Text HttpsUrl))) r,
PGConstraints r
) =>
Sem (CodeStore ': r) a ->
Sem r a

-- | Cassandra is the source of truth during migration; writes are mirrored to Postgres.
interpretCodeStoreToCassandraAndPostgres = interpret $ \case
GetCode k -> do
Cassandra.interpretCodeStoreToCassandra $ CodeStore.getCode k
Expand Down
33 changes: 0 additions & 33 deletions libs/wire-subsystems/src/Wire/ConversationStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
module Wire.ConversationStore where

import Control.Error (lastMay)
import Data.Aeson
import Data.Aeson qualified as Aeson
import Data.ByteString qualified as BS
import Data.Id
import Data.Misc
import Data.Qualified
import Data.Range
import Data.Text qualified as Text
import Data.Time.Clock
import Imports
import Polysemy
Expand Down Expand Up @@ -204,37 +202,6 @@ getConversationIds lusr maxIds pagingState = do
}
}

data StorageLocation
= -- | Use when solely using Cassandra
CassandraStorage
| -- | Use while migration to postgresql. Using this option does not trigger
-- the migration. Newly created conversations are stored in Postgresql.
-- Once this has been turned on, it MUST NOT be made CassandraStorage ever
-- again.
MigrationToPostgresql
| -- | Use after migrating to postgresql
PostgresqlStorage
deriving (Show)

instance FromJSON StorageLocation where
parseJSON = withText "StorageLocation" $ \case
"cassandra" -> pure CassandraStorage
"migration-to-postgresql" -> pure MigrationToPostgresql
"postgresql" -> pure PostgresqlStorage
x -> fail $ "Invalid storage location: " <> Text.unpack x <> ". Valid options: cassandra, postgresql, migration-to-postgresql"

data PostgresMigrationOpts = PostgresMigrationOpts
{ conversation :: StorageLocation,
conversationCodes :: StorageLocation
}
deriving (Show)

instance FromJSON PostgresMigrationOpts where
parseJSON = withObject "PostgresMigrationOpts" $ \o ->
PostgresMigrationOpts
<$> o .: "conversation"
<*> o .: "conversationCodes"

getConvOrSubGroupInfo ::
(Member ConversationStore r) =>
ConvOrSubConvId ->
Expand Down
51 changes: 37 additions & 14 deletions libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ import Wire.ConversationStore.Cassandra.Queries qualified as Cql
import Wire.ConversationStore.Cassandra.Queries qualified as Queries
import Wire.ConversationStore.MLS.Types
import Wire.ConversationStore.Migration.Cleanup
import Wire.ConversationStore.MigrationLock
import Wire.ConversationStore.Postgres (interpretConversationStoreToPostgres)
import Wire.MigrationLock
import Wire.Postgres
import Wire.Sem.Paging.Cassandra
import Wire.StoredConversation
Expand Down Expand Up @@ -1094,7 +1094,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case
True -> interpretConversationStoreToPostgres $ ConvStore.getConversationEpoch cid
GetConversations cids -> do
logEffect "ConversationStore.GetConversations"
withMigrationLocksAndCleanup client LockShared (Seconds 2) (Left <$> cids) $ do
withMigrationLocksAndConvCleanup client LockShared (Seconds 2) cids $ do
let indexByConvId = foldr (\storedConv -> Map.insert storedConv.id_ storedConv) Map.empty
cassConvs <- indexByConvId <$> localConversations client cids
pgConvs <- indexByConvId <$> interpretConversationStoreToPostgres (ConvStore.getConversations cids)
Expand Down Expand Up @@ -1163,7 +1163,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case
True -> interpretConversationStoreToPostgres (ConvStore.isConversationAlive cid)
SelectConversations uid cids -> do
logEffect "ConversationStore.SelectConversations"
withMigrationLocksAndCleanup client LockShared (Seconds 2) (Left <$> cids) $ do
withMigrationLocksAndConvCleanup client LockShared (Seconds 2) cids $ do
cassConvs <- embedClient client $ localConversationIdsOf uid cids
pgConvs <- interpretConversationStoreToPostgres $ ConvStore.selectConversations uid cids
pure $ List.nubOrd (pgConvs <> cassConvs)
Expand Down Expand Up @@ -1287,7 +1287,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case
logEffect "ConversationStore.CreateMembersInRemoteConversation"

-- Save users joining their first remote conv in postgres
withMigrationLocksAndCleanup client LockShared (Seconds 2) (Right <$> uids) $ do
withMigrationLocksAndUserCleanup client LockShared (Seconds 2) uids $ do
filterUsersInPostgres uids >>= \pgUids -> do
let -- These are not in Postgres, but that doesn't mean they're in
-- cassandra
Expand Down Expand Up @@ -1334,7 +1334,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case
True -> interpretConversationStoreToPostgres $ ConvStore.checkLocalMemberRemoteConv uid rcnv
SelectRemoteMembers uids rcnv -> do
logEffect "ConversationStore.SelectRemoteMembers"
withMigrationLocksAndCleanup client LockShared (Seconds 2) (Right <$> uids) $ do
withMigrationLocksAndUserCleanup client LockShared (Seconds 2) uids $ do
filterUsersInPostgres uids >>= \pgUids -> do
(pgUsers, _) <- interpretConversationStoreToPostgres $ ConvStore.selectRemoteMembers pgUids rcnv
(cassUsers, _) <- embedClient client $ filterRemoteConvMembers uids rcnv
Expand Down Expand Up @@ -1369,7 +1369,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case
interpretConversationStoreToPostgres $ ConvStore.deleteMembers cid ul
DeleteMembersInRemoteConversation rcnv uids -> do
logEffect "ConversationStore.DeleteMembersInRemoteConversation"
withMigrationLocksAndCleanup client LockShared (Seconds 2) (Right <$> uids) $ do
withMigrationLocksAndUserCleanup client LockShared (Seconds 2) uids $ do
-- No need to check where these are, we just delete them from both places
embedClient client $ removeLocalMembersFromRemoteConv rcnv uids
interpretConversationStoreToPostgres $ ConvStore.deleteMembersInRemoteConversation rcnv uids
Expand Down Expand Up @@ -1487,7 +1487,7 @@ interpretConversationStoreToCassandraAndPostgres client = interpret $ \case
True -> interpretConversationStoreToPostgres $ ConvStore.isConversationOutOfSync convId
HaveRemoteConvs uids -> do
logEffect "ConversationStore.DeleteSubConversation"
withMigrationLocksAndCleanup client LockShared (Seconds 2) (Right <$> uids) $ do
withMigrationLocksAndUserCleanup client LockShared (Seconds 2) uids $ do
remotesInCass <- embedClient client $ haveRemoteConvs uids
remotesInPG <- interpretConversationStoreToPostgres $ ConvStore.haveRemoteConvs uids
pure $ List.nubOrd (remotesInPG <> remotesInCass)
Expand Down Expand Up @@ -1532,10 +1532,12 @@ withMigrationLockAndCleanup ::
Either ConvId UserId ->
Sem (Error MigrationLockError : r) a ->
Sem r a
withMigrationLockAndCleanup cassClient ty key =
withMigrationLocksAndCleanup cassClient ty (MilliSeconds 500) [key]
withMigrationLockAndCleanup cassClient ty (Left convId) =
withMigrationLocksAndConvCleanup cassClient ty (MilliSeconds 500) [convId]
withMigrationLockAndCleanup cassClient ty (Right userId) =
withMigrationLocksAndUserCleanup cassClient ty (MilliSeconds 500) [userId]

withMigrationLocksAndCleanup ::
withMigrationLocksAndConvCleanup ::
( PGConstraints r,
Member Async r,
Member TinyLog r,
Expand All @@ -1546,12 +1548,33 @@ withMigrationLocksAndCleanup ::
ClientState ->
LockType ->
u ->
[Either ConvId UserId] ->
[ConvId] ->
Sem (Error MigrationLockError : r) a ->
Sem r a
withMigrationLocksAndConvCleanup cassClient lockType maxWait convIds action =
mapError FailedToAcquireMigrationLock . withMigrationLocks lockType maxWait convIds $ do
interpretConversationStoreToCassandra cassClient
. runInputConst cassClient
$ cleanupIfNecessary (Left <$> convIds)
action

withMigrationLocksAndUserCleanup ::
( PGConstraints r,
Member Async r,
Member TinyLog r,
Member Race r,
Member (Error MigrationError) r,
TimeUnit u
) =>
ClientState ->
LockType ->
u ->
[UserId] ->
Sem (Error MigrationLockError : r) a ->
Sem r a
withMigrationLocksAndCleanup cassClient lockType maxWait convOrUsers action =
mapError FailedToAcquireMigrationLock . withMigrationLocks lockType maxWait convOrUsers $ do
withMigrationLocksAndUserCleanup cassClient lockType maxWait userIds action =
mapError FailedToAcquireMigrationLock . withMigrationLocks lockType maxWait userIds $ do
interpretConversationStoreToCassandra cassClient
. runInputConst cassClient
$ cleanupIfNecessary convOrUsers
$ cleanupIfNecessary (Right <$> userIds)
action
Loading