Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ cassandra-schema: db-migrate cassandra-schema-impl
cassandra-schema-impl:
./hack/bin/cassandra_dump_schema > ./cassandra-schema.cql

.PHONY: postgres-reset postgres-schema-impl
.PHONY: postgres-schema
postgres-schema: postgres-reset postgres-schema-impl

.PHONY: postgres-schema-impl
Expand Down Expand Up @@ -393,21 +393,43 @@ postgres-reset: c
./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --reset --dbname dyn-2
./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --reset --dbname dyn-3

.PHONY: postgres-migrate
postgres-migrate: c
./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --dbname backendA
./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --dbname backendB
./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --dbname dyn-1
./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --dbname dyn-2
./dist/brig -c ./services/brig/brig.integration.yaml migrate-postgres --dbname dyn-3

.PHONY: es-reset
es-reset: c
./dist/brig-index reset \
--elasticsearch-index-prefix directory \
--elasticsearch-server https://localhost:9200 \
--elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \
--elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \
--elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null
./dist/brig-index reset \
--elasticsearch-index-prefix directory2 \
--elasticsearch-server https://localhost:9200 \
--elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \
--elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \
--elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null
./integration/scripts/integration-dynamic-backends-brig-index.sh \
--elasticsearch-server https://localhost:9200 \
--elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \
--elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \
--elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null
@echo -e "\n'brig-index reset' only deletes the index and regenerates the mapping, but doesn't generate or populate a new index, so you need to call 'make es-reindex explicitly now!\n"

.PHONY: es-reindex
es-reindex: c
./dist/brig-index reindex \
--pg-pool-size 10 \
--pg-pool-acquisition-timeout 10s \
--pg-pool-aging-timeout 1d \
--pg-pool-idleness-timeout 1h \
--pg-settings '{"host":"127.0.0.1","port":"5432","user":"wire-server","dbname":"backendA"}' \
--pg-password-file ./libs/wire-subsystems/test/resources/postgres-credentials.yaml \
--elasticsearch-server https://localhost:9200 \
--elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \
--elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null

.PHONY: rabbitmq-reset
Expand All @@ -416,7 +438,7 @@ rabbitmq-reset: rabbit-clean
# Migrate all keyspaces and reset the ES index
# Does not migrate postgres as brig does that on startup.
.PHONY: db-migrate
db-migrate: c
db-migrate: c postgres-migrate
./dist/brig-schema --keyspace brig_test --replication-factor 1 > /dev/null
./dist/galley-schema --keyspace galley_test --replication-factor 1 > /dev/null
./dist/gundeck-schema --keyspace gundeck_test --replication-factor 1 > /dev/null
Expand All @@ -426,20 +448,7 @@ db-migrate: c
./dist/gundeck-schema --keyspace gundeck_test2 --replication-factor 1 > /dev/null
./dist/spar-schema --keyspace spar_test2 --replication-factor 1 > /dev/null
./integration/scripts/integration-dynamic-backends-db-schemas.sh --replication-factor 1 > /dev/null
./dist/brig-index reset \
--elasticsearch-index-prefix directory \
--elasticsearch-server https://localhost:9200 \
--elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \
--elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null
./dist/brig-index reset \
--elasticsearch-index-prefix directory2 \
--elasticsearch-server https://localhost:9200 \
--elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \
--elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null
./integration/scripts/integration-dynamic-backends-brig-index.sh \
--elasticsearch-server https://localhost:9200 \
--elasticsearch-ca-cert ./libs/wire-subsystems/test/resources/elasticsearch-ca.pem \
--elasticsearch-credentials ./libs/wire-subsystems/test/resources/elasticsearch-credentials.yaml > /dev/null
make es-reset

#################################
## dependencies
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Elasticsearch re-indexing requires postgres access now. If you run `brig-index` directly anywhere, make sure to add the relevant settings. The Elasticsearch index must be refilled from Cassandra in order for the changes to the search results to take effect. See https://docs.wire.com/latest/developer/reference/elastic-search.html?h=index#refill-es-documents-from-cassandra
18 changes: 12 additions & 6 deletions integration/test/Test/Apps.hs
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,25 @@ testCreateApp = do
void $ bindResponse (createApp owner tid new {category = "notinenum"}) $ \resp -> do
resp.status `shouldMatchInt` 400

let foundUserType exactMatchTerm aType =
searchContacts owner exactMatchTerm OwnDomain `bindResponse` \resp -> do
let foundUserType :: (HasCallStack) => Value -> String -> [String] -> App ()
foundUserType searcher exactMatchTerm aTypes =
searchContacts searcher exactMatchTerm OwnDomain `bindResponse` \resp -> do
resp.status `shouldMatchInt` 200
foundDoc <- resp.json %. "documents" >>= asList >>= assertOne
foundDoc %. "type" `shouldMatch` aType
foundDoc <- resp.json %. "documents" >>= asList
(%. "type") `mapM` foundDoc `shouldMatch` aTypes

-- App's user is findable from /search/contacts
BrigI.refreshIndex domain
foundUserType new.name "app"
foundUserType owner new.name ["app"]
foundUserType regularMember new.name ["app"]

-- App's user is *not* findable from other team.
BrigI.refreshIndex domain
foundUserType owner2 new.name []

-- Regular members still have the type "regular"
memberName <- regularMember %. "name" & asString
foundUserType memberName "regular"
foundUserType owner memberName ["regular"]

testRefreshAppCookie :: (HasCallStack) => App ()
testRefreshAppCookie = do
Expand Down
8 changes: 8 additions & 0 deletions libs/types-common/src/Data/Misc.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ module Data.Misc
Duration (..),
diffTimeParser,
parseDuration,
unsafeParseDuration,
durationToMicros,

-- * HttpsUrl
Expand Down Expand Up @@ -284,6 +285,13 @@ diffTimeParser = do
parseDuration :: Text -> Either String Duration
parseDuration = fmap Duration . Atto.parseOnly (diffTimeParser <* Atto.endOfInput)

unsafeParseDuration :: Text -> Duration
unsafeParseDuration txt =
either
(\err -> error $ "Malformed duration: " <> show txt <> " " <> err)
id
(parseDuration txt)

-- | Useful for threadDelay, timeout, etc.
durationToMicros :: Duration -> Int
durationToMicros =
Expand Down
1 change: 1 addition & 0 deletions libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ data IndexedUserStoreBulk m a where
-- | Overwrite all users in the ES index, use it when trying to fix some
-- inconsistency or while introducing a new field in the mapping.
ForceSyncAllUsers :: IndexedUserStoreBulk m ()
-- | Run `ForceSyncAllUsers` iff the index version is out of date.
MigrateData :: IndexedUserStoreBulk m ()

makeSem ''IndexedUserStoreBulk
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module Wire.IndexedUserStore.Bulk.ElasticSearch where
import Cassandra.Exec (paginateWithStateC)
import Cassandra.Util (Writetime (Writetime))
import Conduit (ConduitT, runConduit, (.|))
import Data.Aeson (encode)
import Data.Conduit.Combinators qualified as Conduit
import Data.Id
import Data.Json.Util (UTCTimeMillis (fromUTCTimeMillis))
Expand All @@ -35,6 +36,7 @@ import Wire.API.Team.Feature
import Wire.API.Team.Member.Info
import Wire.API.Team.Role
import Wire.API.User
import Wire.AppStore
import Wire.GalleyAPIAccess
import Wire.IndexedUserStore (IndexedUserStore)
import Wire.IndexedUserStore qualified as IndexedUserStore
Expand All @@ -50,6 +52,7 @@ import Wire.UserStore.IndexUser
interpretIndexedUserStoreBulk ::
( Member TinyLog r,
Member UserStore r,
Member AppStore r,
Member (Concurrency Unsafe) r,
Member GalleyAPIAccess r,
Member IndexedUserStore r,
Expand All @@ -65,6 +68,7 @@ interpretIndexedUserStoreBulk = interpret \case
syncAllUsersImpl ::
forall r.
( Member UserStore r,
Member AppStore r,
Member TinyLog r,
Member (Concurrency 'Unsafe) r,
Member GalleyAPIAccess r,
Expand All @@ -76,6 +80,7 @@ syncAllUsersImpl = syncAllUsersWithVersion ES.ExternalGT
forceSyncAllUsersImpl ::
forall r.
( Member UserStore r,
Member AppStore r,
Member TinyLog r,
Member (Concurrency 'Unsafe) r,
Member GalleyAPIAccess r,
Expand All @@ -87,6 +92,7 @@ forceSyncAllUsersImpl = syncAllUsersWithVersion ES.ExternalGTE
syncAllUsersWithVersion ::
forall r.
( Member UserStore r,
Member AppStore r,
Member TinyLog r,
Member (Concurrency 'Unsafe) r,
Member GalleyAPIAccess r,
Expand Down Expand Up @@ -122,6 +128,7 @@ syncAllUsersWithVersion mkVersion =
(t,) <$> teamSearchVisibilityInbound t
userTypes :: Map UserId UserType <- fmap Map.fromList . unsafePooledForConcurrentlyN 16 page $ \iu ->
(iu.userId,) <$> getUserType iu
warnIfMissingUserTypes page userTypes
roles :: Map UserId (WithWritetime Role) <- fmap (Map.fromList . concat) . unsafePooledForConcurrentlyN 16 (Map.toList teams) $ \(t, us) -> do
tms <- (.members) <$> selectTeamMemberInfos t (fmap (.userId) us)
pure $ mapMaybe mkRoleWithWriteTime tms
Expand All @@ -147,11 +154,29 @@ syncAllUsersWithVersion mkVersion =
)
<$> permissionsToRole tmi.permissions

-- `page` and `userTypes` *should* overlap perfectly, but we're
-- using `unsafePooledForConcurrentlyN` to make concurrent db
-- calls and that swallows any errors that might occur.
--
-- FUTUREWORK: we need to get rid of `Wire.Sem.Concurrency`, it's
-- unidiomatic and dangerous!
warnIfMissingUserTypes :: [IndexUser] -> Map UserId ignored -> Sem r ()
warnIfMissingUserTypes page userTypes = do
let missing = us \\ ts
us, ts :: [UserId]
us = (.userId) <$> page
ts = Map.keys userTypes
unless (null missing) do
warn $
Log.field "missing" (encode missing)
. Log.msg (Log.val "Reindex: could not lookup all user types!")

migrateDataImpl ::
( Member IndexedUserStore r,
Member (Error MigrationException) r,
Member IndexedUserMigrationStore r,
Member UserStore r,
Member AppStore r,
Member (Concurrency Unsafe) r,
Member GalleyAPIAccess r,
Member TinyLog r
Expand Down Expand Up @@ -185,17 +210,13 @@ teamSearchVisibilityInbound tid =
-- probably expose it as an action there.
getUserType ::
forall r.
(Member AppStore r) =>
IndexUser ->
Sem r UserType
getUserType iu = case iu.serviceId of
Just _ -> pure UserTypeBot
Nothing -> do
{-
FUTUREWORK: *correct* type fields from search are coming in a separate PR:

mmApp <- mapM (getApp iu.userId) (iu.teamId <&> (.value))
case join mmApp of
Just _ -> pure UserTypeApp
Nothing -> pure UserTypeRegular
-}
pure UserTypeRegular
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
posty-the-gres
30 changes: 19 additions & 11 deletions services/brig/src/Brig/Index/Eval.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ module Brig.Index.Eval
where

import Brig.App (initHttpManagerWithTLSConfig, mkIndexEnv)
import Brig.Index.Options
import Brig.Options
import Brig.Index.Options as IxOpts
import Brig.Options as Opt
import Brig.User.Search.Index
import Cassandra (Client, runClient)
import Cassandra.Options
import Cassandra.Util (defInitCassandra)
import Control.Exception (throwIO)
import Control.Lens
Expand All @@ -40,16 +39,20 @@ import Data.Id
import Database.Bloodhound qualified as ES
import Database.Bloodhound.Internal.Client (BHEnv (..))
import Hasql.Pool
import Hasql.Pool.Extended
import Imports
import Polysemy
import Polysemy.Embed (runEmbedded)
import Polysemy.Error
import Polysemy.Input
import Polysemy.TinyLog hiding (Logger)
import System.Logger qualified as Log
import System.Logger.Class (Logger)
import Util.Options
import Wire.API.Federation.Client (FederatorClient)
import Wire.API.Federation.Error
import Wire.AppStore
import Wire.AppStore.Postgres
import Wire.BlockListStore (BlockListStore)
import Wire.BlockListStore.Cassandra
import Wire.FederationAPIAccess
Expand Down Expand Up @@ -87,6 +90,7 @@ type BrigIndexEffectStack =
FederationAPIAccess FederatorClient,
Error FederationError,
UserStore,
AppStore,
IndexedUserStore,
Error IndexedUserStoreError,
IndexedUserMigrationStore,
Expand All @@ -98,16 +102,18 @@ type BrigIndexEffectStack =
Metrics,
TinyLog,
Concurrency 'Unsafe,
Input Pool,
Error UsageError,
Embed IO,
Final IO
]

runSem :: ESConnectionSettings -> CassandraSettings -> Endpoint -> Logger -> Sem BrigIndexEffectStack a -> IO a
runSem esConn cas galleyEndpoint logger action = do
runSem :: ESConnectionSettings -> CassandraSettings -> PostgresSettings -> Endpoint -> Logger -> Sem BrigIndexEffectStack a -> IO a
runSem esConn cas pg galleyEndpoint logger action = do
mgr <- initHttpManagerWithTLSConfig esConn.esInsecureSkipVerifyTls esConn.esCaCert
mEsCreds :: Maybe Credentials <- for esConn.esCredentials initCredentials
casClient <- defInitCassandra (toCassandraOpts cas) logger
pgPool <- initPostgresPool pg.pool pg.settings pg.passwordFile
let bhEnv =
BHEnv
{ bhServer = toESServer esConn.esServer,
Expand All @@ -128,6 +134,7 @@ runSem esConn cas galleyEndpoint logger action = do
runFinal
. embedToFinal
. throwErrorToIOFinal @UsageError
. runInputConst pgPool
. unsafelyPerformConcurrency
. loggerToTinyLogReqId reqId logger
. ignoreMetrics
Expand All @@ -141,6 +148,7 @@ runSem esConn cas galleyEndpoint logger action = do
. interpretIndexedUserMigrationStoreES bhEnv migrationIndexName
. throwErrorToIOFinal @IndexedUserStoreError
. interpretIndexedUserStoreES indexedUserStoreConfig
. interpretAppStoreToPostgres
. interpretUserStoreCassandra casClient
. throwErrorToIOFinal @FederationError
. noFederationAPIAccess
Expand All @@ -164,17 +172,17 @@ runCommand l = \case
Reset es galley -> do
e <- initIndex l (es ^. esConnection) galley
runIndexIO e $ resetIndex (mkCreateIndexSettings es)
Reindex es cas galley -> do
runSem (es ^. esConnection) cas galley l $
Reindex es cas pg galley -> do
runSem (es ^. esConnection) cas pg galley l $
IndexedUserStoreBulk.syncAllUsers
ReindexSameOrNewer es cas galley -> do
runSem (es ^. esConnection) cas galley l $
ReindexSameOrNewer es cas pg galley -> do
runSem (es ^. esConnection) cas pg galley l $
IndexedUserStoreBulk.forceSyncAllUsers
UpdateMapping esConn galley -> do
e <- initIndex l esConn galley
runIndexIO e updateMapping
Migrate es cas galley -> do
runSem (es ^. esConnection) cas galley l $
Migrate es cas pg galley -> do
runSem (es ^. esConnection) cas pg galley l $
IndexedUserStoreBulk.migrateData
ReindexFromAnotherIndex reindexSettings -> do
mgr <-
Expand Down
Loading