diff --git a/Makefile b/Makefile index eda5ea8ba15..556e1056df4 100644 --- a/Makefile +++ b/Makefile @@ -338,7 +338,7 @@ cassandra-schema: db-migrate cassandra-schema-impl cassandra-schema-impl: ./hack/bin/cassandra_dump_schema > ./cassandra-schema.cql -.PHONY: postgres-reset postgres-schema-impl +.PHONY: postgres-schema postgres-schema: postgres-reset postgres-schema-impl .PHONY: postgres-schema-impl @@ -393,21 +393,43 @@ postgres-reset: c ./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --reset --dbname dyn-2 ./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --reset --dbname dyn-3 +.PHONY: postgres-migrate +postgres-migrate: c + ./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --dbname backendA + ./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --dbname backendB + ./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --dbname dyn-1 + ./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --dbname dyn-2 + ./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --dbname dyn-3 + .PHONY: es-reset es-reset: c ./dist/brig-index reset \ --elasticsearch-index-prefix directory \ --elasticsearch-server https://localhost:9200 \ - --elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \ + --elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \ --elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null ./dist/brig-index reset \ --elasticsearch-index-prefix directory2 \ --elasticsearch-server https://localhost:9200 \ - --elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \ + --elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \ --elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null ./integration/scripts/integration-dynamic-backends-brig-index.sh \ --elasticsearch-server https://localhost:9200 \ - --elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \ + --elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \ + --elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null + @echo -e "\n'brig-index reset' only deletes the index and regenerates the mapping, but doesn't generate or populate a new index, so you need to call 'make es-reindex explicitly now!\n" + +.PHONY: es-reindex +es-reindex: c + ./dist/brig-index reindex \ + --pg-pool-size 10 \ + --pg-pool-acquisition-timeout 10s \ + --pg-pool-aging-timeout 1d \ + --pg-pool-idleness-timeout 1h \ + --pg-settings '{"host":"127.0.0.1","port":"5432","user":"wire-server","dbname":"backendA"}' \ + --pg-password-file ./libs/wire-subsystems/test/resources/postgres-credentials.yaml \ + --elasticsearch-server https://localhost:9200 \ + --elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \ --elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null .PHONY: rabbitmq-reset @@ -416,7 +438,7 @@ rabbitmq-reset: rabbit-clean # Migrate all keyspaces and reset the ES index # Does not migrate postgres as brig does that on startup. .PHONY: db-migrate -db-migrate: c +db-migrate: c postgres-migrate ./dist/brig-schema --keyspace brig_test --replication-factor 1 > /dev/null ./dist/galley-schema --keyspace galley_test --replication-factor 1 > /dev/null ./dist/gundeck-schema --keyspace gundeck_test --replication-factor 1 > /dev/null @@ -426,20 +448,7 @@ db-migrate: c ./dist/gundeck-schema --keyspace gundeck_test2 --replication-factor 1 > /dev/null ./dist/spar-schema --keyspace spar_test2 --replication-factor 1 > /dev/null ./integration/scripts/integration-dynamic-backends-db-schemas.sh --replication-factor 1 > /dev/null - ./dist/brig-index reset \ - --elasticsearch-index-prefix directory \ - --elasticsearch-server https://localhost:9200 \ - --elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \ - --elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null - ./dist/brig-index reset \ - --elasticsearch-index-prefix directory2 \ - --elasticsearch-server https://localhost:9200 \ - --elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \ - --elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null - ./integration/scripts/integration-dynamic-backends-brig-index.sh \ - --elasticsearch-server https://localhost:9200 \ - --elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \ - --elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null + make es-reset ################################# ## dependencies diff --git a/changelog.d/0-release-notes/WPB-21366-refactor-brig-index-cli b/changelog.d/0-release-notes/WPB-21366-refactor-brig-index-cli new file mode 100644 index 00000000000..db52bee7752 --- /dev/null +++ b/changelog.d/0-release-notes/WPB-21366-refactor-brig-index-cli @@ -0,0 +1 @@ +Elasticsearch re-indexing requires postgres access now. If you run `brig-index` directly anywhere, make sure to add the relevant settings. The Elasticsearch index must be refilled from Cassandra in order for the changes to the search results to take effect. See https://docs.wire.com/latest/developer/reference/elastic-search.html?h=index#refill-es-documents-from-cassandra diff --git a/integration/test/Test/Apps.hs b/integration/test/Test/Apps.hs index 857db9c7bbc..8bb314a771f 100644 --- a/integration/test/Test/Apps.hs +++ b/integration/test/Test/Apps.hs @@ -101,19 +101,25 @@ testCreateApp = do void $ bindResponse (createApp owner tid new {category = "notinenum"}) $ \resp -> do resp.status `shouldMatchInt` 400 - let foundUserType exactMatchTerm aType = - searchContacts owner exactMatchTerm OwnDomain `bindResponse` \resp -> do + let foundUserType :: (HasCallStack) => Value -> String -> [String] -> App () + foundUserType searcher exactMatchTerm aTypes = + searchContacts searcher exactMatchTerm OwnDomain `bindResponse` \resp -> do resp.status `shouldMatchInt` 200 - foundDoc <- resp.json %. "documents" >>= asList >>= assertOne - foundDoc %. "type" `shouldMatch` aType + foundDoc <- resp.json %. "documents" >>= asList + (%. "type") `mapM` foundDoc `shouldMatch` aTypes -- App's user is findable from /search/contacts BrigI.refreshIndex domain - foundUserType new.name "app" + foundUserType owner new.name ["app"] + foundUserType regularMember new.name ["app"] + + -- App's user is *not* findable from other team. + BrigI.refreshIndex domain + foundUserType owner2 new.name [] -- Regular members still have the type "regular" memberName <- regularMember %. "name" & asString - foundUserType memberName "regular" + foundUserType owner memberName ["regular"] testRefreshAppCookie :: (HasCallStack) => App () testRefreshAppCookie = do diff --git a/libs/types-common/src/Data/Misc.hs b/libs/types-common/src/Data/Misc.hs index 4fa162f3bf3..0bbc6d01ad4 100644 --- a/libs/types-common/src/Data/Misc.hs +++ b/libs/types-common/src/Data/Misc.hs @@ -43,6 +43,7 @@ module Data.Misc Duration (..), diffTimeParser, parseDuration, + unsafeParseDuration, durationToMicros, -- * HttpsUrl @@ -284,6 +285,13 @@ diffTimeParser = do parseDuration :: Text -> Either String Duration parseDuration = fmap Duration . Atto.parseOnly (diffTimeParser <* Atto.endOfInput) +unsafeParseDuration :: Text -> Duration +unsafeParseDuration txt = + either + (\err -> error $ "Malformed duration: " <> show txt <> " " <> err) + id + (parseDuration txt) + -- | Useful for threadDelay, timeout, etc. durationToMicros :: Duration -> Int durationToMicros = diff --git a/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk.hs b/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk.hs index fa2a714db92..47985da588c 100644 --- a/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk.hs +++ b/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk.hs @@ -34,6 +34,7 @@ data IndexedUserStoreBulk m a where -- | Overwrite all users in the ES index, use it when trying to fix some -- inconsistency or while introducing a new field in the mapping. ForceSyncAllUsers :: IndexedUserStoreBulk m () + -- | Run `ForceSyncAllUsers` iff the index version is out of date. MigrateData :: IndexedUserStoreBulk m () makeSem ''IndexedUserStoreBulk diff --git a/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk/ElasticSearch.hs b/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk/ElasticSearch.hs index e5585735bdd..bbc3f68bd52 100644 --- a/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk/ElasticSearch.hs +++ b/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk/ElasticSearch.hs @@ -20,6 +20,7 @@ module Wire.IndexedUserStore.Bulk.ElasticSearch where import Cassandra.Exec (paginateWithStateC) import Cassandra.Util (Writetime (Writetime)) import Conduit (ConduitT, runConduit, (.|)) +import Data.Aeson (encode) import Data.Conduit.Combinators qualified as Conduit import Data.Id import Data.Json.Util (UTCTimeMillis (fromUTCTimeMillis)) @@ -35,6 +36,7 @@ import Wire.API.Team.Feature import Wire.API.Team.Member.Info import Wire.API.Team.Role import Wire.API.User +import Wire.AppStore import Wire.GalleyAPIAccess import Wire.IndexedUserStore (IndexedUserStore) import Wire.IndexedUserStore qualified as IndexedUserStore @@ -50,6 +52,7 @@ import Wire.UserStore.IndexUser interpretIndexedUserStoreBulk :: ( Member TinyLog r, Member UserStore r, + Member AppStore r, Member (Concurrency Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r, @@ -65,6 +68,7 @@ interpretIndexedUserStoreBulk = interpret \case syncAllUsersImpl :: forall r. ( Member UserStore r, + Member AppStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, @@ -76,6 +80,7 @@ syncAllUsersImpl = syncAllUsersWithVersion ES.ExternalGT forceSyncAllUsersImpl :: forall r. ( Member UserStore r, + Member AppStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, @@ -87,6 +92,7 @@ forceSyncAllUsersImpl = syncAllUsersWithVersion ES.ExternalGTE syncAllUsersWithVersion :: forall r. ( Member UserStore r, + Member AppStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, @@ -122,6 +128,7 @@ syncAllUsersWithVersion mkVersion = (t,) <$> teamSearchVisibilityInbound t userTypes :: Map UserId UserType <- fmap Map.fromList . unsafePooledForConcurrentlyN 16 page $ \iu -> (iu.userId,) <$> getUserType iu + warnIfMissingUserTypes page userTypes roles :: Map UserId (WithWritetime Role) <- fmap (Map.fromList . concat) . unsafePooledForConcurrentlyN 16 (Map.toList teams) $ \(t, us) -> do tms <- (.members) <$> selectTeamMemberInfos t (fmap (.userId) us) pure $ mapMaybe mkRoleWithWriteTime tms @@ -147,11 +154,29 @@ syncAllUsersWithVersion mkVersion = ) <$> permissionsToRole tmi.permissions + -- `page` and `userTypes` *should* overlap perfectly, but we're + -- using `unsafePooledForConcurrentlyN` to make concurrent db + -- calls and that swallows any errors that might occur. + -- + -- FUTUREWORK: we need to get rid of `Wire.Sem.Concurrency`, it's + -- unidiomatic and dangerous! + warnIfMissingUserTypes :: [IndexUser] -> Map UserId ignored -> Sem r () + warnIfMissingUserTypes page userTypes = do + let missing = us \\ ts + us, ts :: [UserId] + us = (.userId) <$> page + ts = Map.keys userTypes + unless (null missing) do + warn $ + Log.field "missing" (encode missing) + . Log.msg (Log.val "Reindex: could not lookup all user types!") + migrateDataImpl :: ( Member IndexedUserStore r, Member (Error MigrationException) r, Member IndexedUserMigrationStore r, Member UserStore r, + Member AppStore r, Member (Concurrency Unsafe) r, Member GalleyAPIAccess r, Member TinyLog r @@ -185,17 +210,13 @@ teamSearchVisibilityInbound tid = -- probably expose it as an action there. getUserType :: forall r. + (Member AppStore r) => IndexUser -> Sem r UserType getUserType iu = case iu.serviceId of Just _ -> pure UserTypeBot Nothing -> do - {- - FUTUREWORK: *correct* type fields from search are coming in a separate PR: - mmApp <- mapM (getApp iu.userId) (iu.teamId <&> (.value)) case join mmApp of Just _ -> pure UserTypeApp Nothing -> pure UserTypeRegular - -} - pure UserTypeRegular diff --git a/libs/wire-subsystems/test/resources/postgres-credentials.yaml b/libs/wire-subsystems/test/resources/postgres-credentials.yaml new file mode 100644 index 00000000000..dfcbb7e44a9 --- /dev/null +++ b/libs/wire-subsystems/test/resources/postgres-credentials.yaml @@ -0,0 +1 @@ +posty-the-gres diff --git a/services/brig/src/Brig/Index/Eval.hs b/services/brig/src/Brig/Index/Eval.hs index ad58b6d53fb..523582c1c70 100644 --- a/services/brig/src/Brig/Index/Eval.hs +++ b/services/brig/src/Brig/Index/Eval.hs @@ -22,11 +22,10 @@ module Brig.Index.Eval where import Brig.App (initHttpManagerWithTLSConfig, mkIndexEnv) -import Brig.Index.Options -import Brig.Options +import Brig.Index.Options as IxOpts +import Brig.Options as Opt import Brig.User.Search.Index import Cassandra (Client, runClient) -import Cassandra.Options import Cassandra.Util (defInitCassandra) import Control.Exception (throwIO) import Control.Lens @@ -40,16 +39,20 @@ import Data.Id import Database.Bloodhound qualified as ES import Database.Bloodhound.Internal.Client (BHEnv (..)) import Hasql.Pool +import Hasql.Pool.Extended import Imports import Polysemy import Polysemy.Embed (runEmbedded) import Polysemy.Error +import Polysemy.Input import Polysemy.TinyLog hiding (Logger) import System.Logger qualified as Log import System.Logger.Class (Logger) import Util.Options import Wire.API.Federation.Client (FederatorClient) import Wire.API.Federation.Error +import Wire.AppStore +import Wire.AppStore.Postgres import Wire.BlockListStore (BlockListStore) import Wire.BlockListStore.Cassandra import Wire.FederationAPIAccess @@ -87,6 +90,7 @@ type BrigIndexEffectStack = FederationAPIAccess FederatorClient, Error FederationError, UserStore, + AppStore, IndexedUserStore, Error IndexedUserStoreError, IndexedUserMigrationStore, @@ -98,16 +102,18 @@ type BrigIndexEffectStack = Metrics, TinyLog, Concurrency 'Unsafe, + Input Pool, Error UsageError, Embed IO, Final IO ] -runSem :: ESConnectionSettings -> CassandraSettings -> Endpoint -> Logger -> Sem BrigIndexEffectStack a -> IO a -runSem esConn cas galleyEndpoint logger action = do +runSem :: ESConnectionSettings -> CassandraSettings -> PostgresSettings -> Endpoint -> Logger -> Sem BrigIndexEffectStack a -> IO a +runSem esConn cas pg galleyEndpoint logger action = do mgr <- initHttpManagerWithTLSConfig esConn.esInsecureSkipVerifyTls esConn.esCaCert mEsCreds :: Maybe Credentials <- for esConn.esCredentials initCredentials casClient <- defInitCassandra (toCassandraOpts cas) logger + pgPool <- initPostgresPool pg.pool pg.settings pg.passwordFile let bhEnv = BHEnv { bhServer = toESServer esConn.esServer, @@ -128,6 +134,7 @@ runSem esConn cas galleyEndpoint logger action = do runFinal . embedToFinal . throwErrorToIOFinal @UsageError + . runInputConst pgPool . unsafelyPerformConcurrency . loggerToTinyLogReqId reqId logger . ignoreMetrics @@ -141,6 +148,7 @@ runSem esConn cas galleyEndpoint logger action = do . interpretIndexedUserMigrationStoreES bhEnv migrationIndexName . throwErrorToIOFinal @IndexedUserStoreError . interpretIndexedUserStoreES indexedUserStoreConfig + . interpretAppStoreToPostgres . interpretUserStoreCassandra casClient . throwErrorToIOFinal @FederationError . noFederationAPIAccess @@ -164,17 +172,17 @@ runCommand l = \case Reset es galley -> do e <- initIndex l (es ^. esConnection) galley runIndexIO e $ resetIndex (mkCreateIndexSettings es) - Reindex es cas galley -> do - runSem (es ^. esConnection) cas galley l $ + Reindex es cas pg galley -> do + runSem (es ^. esConnection) cas pg galley l $ IndexedUserStoreBulk.syncAllUsers - ReindexSameOrNewer es cas galley -> do - runSem (es ^. esConnection) cas galley l $ + ReindexSameOrNewer es cas pg galley -> do + runSem (es ^. esConnection) cas pg galley l $ IndexedUserStoreBulk.forceSyncAllUsers UpdateMapping esConn galley -> do e <- initIndex l esConn galley runIndexIO e updateMapping - Migrate es cas galley -> do - runSem (es ^. esConnection) cas galley l $ + Migrate es cas pg galley -> do + runSem (es ^. esConnection) cas pg galley l $ IndexedUserStoreBulk.migrateData ReindexFromAnotherIndex reindexSettings -> do mgr <- diff --git a/services/brig/src/Brig/Index/Options.hs b/services/brig/src/Brig/Index/Options.hs index f72db003042..8b0966d25f5 100644 --- a/services/brig/src/Brig/Index/Options.hs +++ b/services/brig/src/Brig/Index/Options.hs @@ -34,7 +34,9 @@ module Brig.Index.Options cPort, cTlsCa, cKeyspace, + PostgresSettings (..), localElasticSettings, + brigOptsToPostgresSettings, localCassandraSettings, commandParser, mkCreateIndexSettings, @@ -47,13 +49,23 @@ module Brig.Index.Options where import Brig.Index.Types (CreateIndexSettings (..)) +import Brig.Options qualified as Opts import Cassandra qualified as C import Control.Lens +import Data.Aeson as Aeson +import Data.Aeson.Key qualified as AKey +import Data.Aeson.KeyMap qualified as AKM +import Data.Aeson.Text qualified as Aeson import Data.ByteString.Lens +import Data.Map qualified as Map +import Data.Misc import Data.Text qualified as Text +import Data.Text.Encoding (encodeUtf8) +import Data.Text.Lazy qualified as LText import Data.Text.Strict.Lens -import Data.Time.Clock (NominalDiffTime) +import Data.Time (NominalDiffTime) import Database.Bloodhound qualified as ES +import Hasql.Pool.Extended import Imports import Options.Applicative import URI.ByteString @@ -63,11 +75,11 @@ import Util.Options (CassandraOpts (..), Endpoint (..), FilePathSecrets) data Command = Create ElasticSettings Endpoint | Reset ElasticSettings Endpoint - | Reindex ElasticSettings CassandraSettings Endpoint - | ReindexSameOrNewer ElasticSettings CassandraSettings Endpoint + | Reindex ElasticSettings CassandraSettings PostgresSettings Endpoint + | ReindexSameOrNewer ElasticSettings CassandraSettings PostgresSettings Endpoint | -- | 'ElasticSettings' has shards and other settings that are not needed here. UpdateMapping ESConnectionSettings Endpoint - | Migrate ElasticSettings CassandraSettings Endpoint + | Migrate ElasticSettings CassandraSettings PostgresSettings Endpoint | ReindexFromAnotherIndex ReindexFromAnotherIndexSettings deriving (Show) @@ -90,6 +102,15 @@ data ElasticSettings = ElasticSettings } deriving (Show) +data PostgresSettings = PostgresSettings + { pool :: !PoolConfig, + passwordFile :: !(Maybe FilePathSecrets), + -- | Postgresql settings, the key values must be in libpq format. + -- https://www.postgresql.org/docs/17/libpq-connect.html#LIBPQ-PARAMKEYWORDS + settings :: !(Map Text Text) + } + deriving (Show) + data CassandraSettings = CassandraSettings { _cHost :: String, _cPort :: Word16, @@ -147,6 +168,14 @@ localElasticSettings = _esDeleteTemplate = Nothing } +brigOptsToPostgresSettings :: Opts.Opts -> PostgresSettings +brigOptsToPostgresSettings opts = + PostgresSettings + { pool = opts.postgresqlPool, + passwordFile = opts.postgresqlPassword, + settings = opts.postgresql + } + localCassandraSettings :: CassandraSettings localCassandraSettings = CassandraSettings @@ -297,6 +326,70 @@ credentialsPathParser = ) ) +postgresSettingsParser :: Parser PostgresSettings +postgresSettingsParser = + PostgresSettings + <$> poolConfigParser + <*> optional + ( strOption + ( long "pg-password-file" + <> metavar "FILE" + <> help "File containing PostgreSQL password" + ) + ) + <*> option + (eitherReader parseJsonMap) + ( long "pg-settings" + <> metavar "JSON" + <> help "PostgreSQL connection parameters as JSON object" + <> value Map.empty + ) + +poolConfigParser :: Parser PoolConfig +poolConfigParser = + PoolConfig + <$> option + auto + ( long "pg-pool-size" + <> metavar "INT" + <> help "Connection pool size" + <> value 10 + ) + <*> option + (eitherReader (parseDuration . Text.pack)) + ( long "pg-pool-acquisition-timeout" + <> metavar "Duration" + <> help "Pool acquisition timeout in seconds" + <> value (unsafeParseDuration "10s") + ) + <*> option + (eitherReader (parseDuration . Text.pack)) + ( long "pg-pool-aging-timeout" + <> metavar "Duration" + <> help "Pool aging timeout in seconds" + <> value (unsafeParseDuration "1d") + ) + <*> option + (eitherReader (parseDuration . Text.pack)) + ( long "pg-pool-idleness-timeout" + <> metavar "Duration" + <> help "Pool idleness timeout in seconds" + <> value (unsafeParseDuration "10m") + ) + +parseJsonMap :: String -> Either String (Map Text Text) +parseJsonMap s = do + Aeson.eitherDecodeStrict' (encodeUtf8 (Text.pack s)) >>= \case + Object hmap -> pure $ Map.fromList $ bimap AKey.toText valueToText <$> AKM.toList hmap + bad -> Left $ "invalid json object: " <> show bad + where + valueToText :: Value -> Text + valueToText (String t) = t + valueToText (Bool b) = (if b then "true" else "false") + valueToText (Number n) = (Text.pack (show n)) + valueToText Null = "null" + valueToText v = LText.toStrict (Aeson.encodeToLazyText v) + cassandraSettingsParser :: Parser CassandraSettings cassandraSettingsParser = CassandraSettings @@ -394,19 +487,19 @@ commandParser = <> command "reindex" ( info - (Reindex <$> elasticSettingsParser <*> cassandraSettingsParser <*> galleyEndpointParser) + (Reindex <$> elasticSettingsParser <*> cassandraSettingsParser <*> postgresSettingsParser <*> galleyEndpointParser) (progDesc "Reindex all users from Cassandra if there is a new version.") ) <> command "reindex-if-same-or-newer" ( info - (ReindexSameOrNewer <$> elasticSettingsParser <*> cassandraSettingsParser <*> galleyEndpointParser) + (ReindexSameOrNewer <$> elasticSettingsParser <*> cassandraSettingsParser <*> postgresSettingsParser <*> galleyEndpointParser) (progDesc "Reindex all users from Cassandra, even if the version has not changed.") ) <> command "migrate-data" ( info - (Migrate <$> elasticSettingsParser <*> cassandraSettingsParser <*> galleyEndpointParser) + (Migrate <$> elasticSettingsParser <*> cassandraSettingsParser <*> postgresSettingsParser <*> galleyEndpointParser) (progDesc "Migrate data in elastic search") ) <> command diff --git a/services/brig/test/integration/API/Search.hs b/services/brig/test/integration/API/Search.hs index 1e87cb4b7a0..2fb1fbdf5fa 100644 --- a/services/brig/test/integration/API/Search.hs +++ b/services/brig/test/integration/API/Search.hs @@ -800,7 +800,7 @@ runReindexFromAnotherIndex logger opts newIndexName migrationIndexName = in runCommand logger $ ReindexFromAnotherIndex reindexSettings runReindexFromDatabase :: - (ElasticSettings -> CassandraSettings -> Endpoint -> Command) -> + (ElasticSettings -> CassandraSettings -> PostgresSettings -> Endpoint -> Command) -> Log.Logger -> Opt.Opts -> ES.IndexName -> @@ -819,14 +819,14 @@ runReindexFromDatabase syncCommand logger opts newIndexName migrationIndexName = & IndexOpts.esIndexShardCount .~ shards & IndexOpts.esIndexRefreshInterval .~ refreshInterval cassandraSettings :: CassandraSettings = - ( localCassandraSettings - & IndexOpts.cHost .~ (Text.unpack opts.cassandra.endpoint.host) - & IndexOpts.cPort .~ (opts.cassandra.endpoint.port) - & IndexOpts.cKeyspace .~ (C.Keyspace opts.cassandra.keyspace) - ) - + localCassandraSettings + & IndexOpts.cHost .~ (Text.unpack opts.cassandra.endpoint.host) + & IndexOpts.cPort .~ (opts.cassandra.endpoint.port) + & IndexOpts.cKeyspace .~ (C.Keyspace opts.cassandra.keyspace) + postgresSettings :: PostgresSettings = + brigOptsToPostgresSettings opts endpoint :: Endpoint = opts.galley - in runCommand logger $ syncCommand elasticSettings cassandraSettings endpoint + in runCommand logger $ syncCommand elasticSettings cassandraSettings postgresSettings endpoint toESConnectionSettings :: ElasticSearchOpts -> ES.IndexName -> ESConnectionSettings toESConnectionSettings opts migrationIndexName = ESConnectionSettings {..}