Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions libs/cassandra-util/src/Cassandra.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import Cassandra.Exec as C
( BatchM,
Client,
ClientState,
GeneralPaginationState (..),
MonadClient,
Page (..),
PageWithState (..),
Expand All @@ -74,6 +75,8 @@ import Cassandra.Exec as C
paginate,
paginateC,
paginateWithState,
paginationStateCassandra,
paginationStatePostgres,
params,
paramsP,
paramsPagingState,
Expand Down
35 changes: 26 additions & 9 deletions libs/cassandra-util/src/Cassandra/Exec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ module Cassandra.Exec
x5,
x1,
paginateC,
GeneralPaginationState (..),
paginationStateCassandra,
paginationStatePostgres,
PageWithState (..),
paginateWithState,
paginateWithStateC,
Expand Down Expand Up @@ -97,23 +100,37 @@ paginateC q p r = go =<< lift (retry r (paginate q p))
when (hasMore page) $
go =<< lift (retry r (liftClient (nextPage page)))

data PageWithState a = PageWithState
{ pwsResults :: [a],
pwsState :: Maybe Protocol.PagingState
data GeneralPaginationState a
= PaginationStateCassandra Protocol.PagingState
| PaginationStatePostgres a

paginationStateCassandra :: GeneralPaginationState pgState -> Maybe Protocol.PagingState
paginationStateCassandra = \case
PaginationStateCassandra state -> Just state
PaginationStatePostgres {} -> Nothing

paginationStatePostgres :: GeneralPaginationState pgState -> Maybe pgState
paginationStatePostgres = \case
PaginationStatePostgres pgState -> Just pgState
PaginationStateCassandra {} -> Nothing

data PageWithState state res = PageWithState
{ pwsResults :: [res],
pwsState :: Maybe (GeneralPaginationState state)
}
deriving (Functor)

-- | Like 'paginate' but exposes the paging state. This paging state can be
-- serialised and sent to consumers of the API. The state is not good for long
-- term storage as the bytestring format may change when the schema of a table
-- changes or when cassandra is upgraded.
paginateWithState :: (MonadClient m, Tuple a, Tuple b, RunQ q) => q R a b -> QueryParams a -> m (PageWithState b)
paginateWithState :: (MonadClient m, Tuple a, Tuple b, RunQ q) => q R a b -> QueryParams a -> m (PageWithState x b)
paginateWithState q p = do
let p' = p {Protocol.pageSize = Protocol.pageSize p <|> Just 10000}
r <- runQ q p'
getResult r >>= \case
Protocol.RowsResult m b ->
pure $ PageWithState b (pagingState m)
pure $ PageWithState b (PaginationStateCassandra <$> pagingState m)
_ -> throwM $ UnexpectedResponse (hrHost r) (hrResponse r)

-- | Like 'paginateWithState' but returns a conduit instead of one page.
Expand All @@ -128,20 +145,20 @@ paginateWithState q p = do
-- where
-- getUsers state = paginateWithState getUsersQuery (paramsPagingState Quorum () 10000 state)
-- @
paginateWithStateC :: forall m a. (Monad m) => (Maybe Protocol.PagingState -> m (PageWithState a)) -> ConduitT () [a] m ()
paginateWithStateC :: forall m res state. (Monad m) => (Maybe (GeneralPaginationState state) -> m (PageWithState state res)) -> ConduitT () [res] m ()
paginateWithStateC getPage = do
go =<< lift (getPage Nothing)
where
go :: PageWithState a -> ConduitT () [a] m ()
go :: PageWithState state res -> ConduitT () [res] m ()
go page = do
unless (null page.pwsResults) $
yield (page.pwsResults)
when (pwsHasMore page) $
go =<< lift (getPage page.pwsState)
go =<< lift (getPage $ page.pwsState)

paramsPagingState :: Consistency -> a -> Int32 -> Maybe Protocol.PagingState -> QueryParams a
paramsPagingState c p n state = QueryParams c False p (Just n) state Nothing Nothing
{-# INLINE paramsPagingState #-}

pwsHasMore :: PageWithState a -> Bool
pwsHasMore :: PageWithState a b -> Bool
pwsHasMore = isJust . pwsState
2 changes: 1 addition & 1 deletion libs/polysemy-wire-zoo/src/Wire/Sem/Paging/Cassandra.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ data CassandraPaging

type instance E.PagingState CassandraPaging a = PagingState

type instance E.Page CassandraPaging a = PageWithState a
type instance E.Page CassandraPaging a = PageWithState Void a

type instance E.PagingBounds CassandraPaging TeamId = Range 1 100 Int32

Expand Down
9 changes: 7 additions & 2 deletions libs/wire-api/src/Wire/API/Team/Member.hs
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,13 @@ instance ToSchema TeamMembersPage where

type TeamMembersPagingState = MultiTablePagingState TeamMembersPagingName TeamMembersTable

teamMemberPagingState :: PageWithState TeamMember -> TeamMembersPagingState
teamMemberPagingState p = MultiTablePagingState TeamMembersTable (LBS.toStrict . C.unPagingState <$> pwsState p)
teamMemberPagingState :: PageWithState Void TeamMember -> TeamMembersPagingState
teamMemberPagingState p =
MultiTablePagingState
TeamMembersTable
( LBS.toStrict . C.unPagingState
<$> (C.paginationStateCassandra =<< p.pwsState)
)

instance ToParamSchema TeamMembersPagingState where
toParamSchema _ = toParamSchema (Proxy @Text)
Expand Down
8 changes: 4 additions & 4 deletions libs/wire-subsystems/src/Wire/UserStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

module Wire.UserStore where

import Cassandra (PageWithState (..), PagingState)
import Cassandra (GeneralPaginationState, PageWithState (..))
import Data.Default
import Data.Handle
import Data.Id
Expand Down Expand Up @@ -72,7 +72,7 @@ data UserStore m a where
CreateUser :: NewStoredUser -> Maybe (ConvId, Maybe TeamId) -> UserStore m ()
GetIndexUser :: UserId -> UserStore m (Maybe IndexUser)
DoesUserExist :: UserId -> UserStore m Bool
GetIndexUsersPaginated :: Int32 -> Maybe PagingState -> UserStore m (PageWithState IndexUser)
GetIndexUsersPaginated :: Int32 -> Maybe (GeneralPaginationState Void) -> UserStore m (PageWithState Void IndexUser)
GetUsers :: [UserId] -> UserStore m [StoredUser]
UpdateUser :: UserId -> StoredUserUpdate -> UserStore m ()
UpdateEmail :: UserId -> EmailAddress -> UserStore m ()
Expand Down Expand Up @@ -112,8 +112,8 @@ data UserStore m a where
UpdateFeatureConferenceCalling :: UserId -> Maybe FeatureStatus -> UserStore m ()
LookupFeatureConferenceCalling :: UserId -> UserStore m (Maybe FeatureStatus)
DeleteServiceUser :: ProviderId -> ServiceId -> BotId -> UserStore m ()
LookupServiceUsers :: ProviderId -> ServiceId -> Maybe PagingState -> UserStore m (PageWithState (BotId, ConvId, Maybe TeamId))
LookupServiceUsersForTeam :: ProviderId -> ServiceId -> TeamId -> Maybe PagingState -> UserStore m (PageWithState (BotId, ConvId))
LookupServiceUsers :: ProviderId -> ServiceId -> Maybe (GeneralPaginationState Void) -> UserStore m (PageWithState Void (BotId, ConvId, Maybe TeamId))
LookupServiceUsersForTeam :: ProviderId -> ServiceId -> TeamId -> Maybe (GeneralPaginationState Void) -> UserStore m (PageWithState Void (BotId, ConvId))

makeSem ''UserStore

Expand Down
12 changes: 6 additions & 6 deletions libs/wire-subsystems/src/Wire/UserStore/Cassandra.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ interpretUserStoreCassandra casClient =
GetUsers uids -> getUsersImpl uids
DoesUserExist uid -> doesUserExistImpl uid
GetIndexUser uid -> getIndexUserImpl uid
GetIndexUsersPaginated pageSize mPagingState -> getIndexUserPaginatedImpl pageSize mPagingState
GetIndexUsersPaginated pageSize mPagingState -> getIndexUserPaginatedImpl pageSize (paginationStateCassandra =<< mPagingState)
UpdateUser uid update -> updateUserImpl uid update
UpdateEmail uid email -> updateEmailImpl uid email
UpdateEmailUnvalidated uid email -> updateEmailUnvalidatedImpl uid email
Expand Down Expand Up @@ -77,8 +77,8 @@ interpretUserStoreCassandra casClient =
DeleteEmail uid -> deleteEmailImpl uid
SetUserSearchable uid searchable -> setUserSearchableImpl uid searchable
DeleteServiceUser pid sid bid -> deleteServiceUserImpl pid sid bid
LookupServiceUsers pid sid mPagingState -> lookupServiceUsersImpl pid sid mPagingState
LookupServiceUsersForTeam pid sid tid mPagingState -> lookupServiceUsersForTeamImpl pid sid tid mPagingState
LookupServiceUsers pid sid mPagingState -> lookupServiceUsersImpl pid sid (paginationStateCassandra =<< mPagingState)
LookupServiceUsersForTeam pid sid tid mPagingState -> lookupServiceUsersForTeamImpl pid sid tid (paginationStateCassandra =<< mPagingState)

createUserImpl :: NewStoredUser -> Maybe (ConvId, Maybe TeamId) -> Client ()
createUserImpl new mbConv = retry x5 . batch $ do
Expand Down Expand Up @@ -122,7 +122,7 @@ getIndexUserImpl u = do
cql :: PrepQuery R (Identity UserId) (TupleType IndexUser)
cql = prepared . QueryString $ getIndexUserBaseQuery <> " WHERE id = ?"

getIndexUserPaginatedImpl :: Int32 -> Maybe PagingState -> Client (PageWithState IndexUser)
getIndexUserPaginatedImpl :: Int32 -> Maybe PagingState -> Client (PageWithState x IndexUser)
getIndexUserPaginatedImpl pageSize mPagingState =
indexUserFromTuple <$$> paginateWithState cql (paramsPagingState LocalQuorum () pageSize mPagingState)
where
Expand Down Expand Up @@ -414,7 +414,7 @@ lookupServiceUsersImpl ::
ProviderId ->
ServiceId ->
Maybe PagingState ->
Client (PageWithState (BotId, ConvId, Maybe TeamId))
Client (PageWithState Void (BotId, ConvId, Maybe TeamId))
lookupServiceUsersImpl pid sid mPagingState =
paginateWithState cql (paramsPagingState LocalQuorum (pid, sid) 100 mPagingState)
where
Expand All @@ -428,7 +428,7 @@ lookupServiceUsersForTeamImpl ::
ServiceId ->
TeamId ->
Maybe PagingState ->
Client (PageWithState (BotId, ConvId))
Client (PageWithState Void (BotId, ConvId))
lookupServiceUsersForTeamImpl pid sid tid mPagingState =
paginateWithState cql (paramsPagingState LocalQuorum (pid, sid, tid) 100 mPagingState)
where
Expand Down
4 changes: 2 additions & 2 deletions services/brig/src/Brig/API/Public.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1380,14 +1380,14 @@ listConnections uid Public.GetMultiTablePageRequest {..} = do
Just (Public.ConnectionPagingState Public.PagingRemotes stateBS) -> remotesOnly self (mkState <$> stateBS) (fromRange gmtprSize)
_ -> localsAndRemotes self (fmap mkState . Public.mtpsState =<< gmtprState) gmtprSize
where
pageToConnectionsPage :: Public.LocalOrRemoteTable -> Data.PageWithState Public.UserConnection -> Public.ConnectionsPage
pageToConnectionsPage :: Public.LocalOrRemoteTable -> Data.PageWithState Void Public.UserConnection -> Public.ConnectionsPage
pageToConnectionsPage table page@Data.PageWithState {..} =
Public.MultiTablePage
{ mtpResults = pwsResults,
mtpHasMore = C.pwsHasMore page,
-- FUTUREWORK confusingly, using 'ConversationPagingState' instead of 'ConnectionPagingState' doesn't fail any tests.
-- Is this type actually useless? Or the tests not good enough?
mtpPagingState = Public.ConnectionPagingState table (LBS.toStrict . C.unPagingState <$> pwsState)
mtpPagingState = Public.ConnectionPagingState table (LBS.toStrict . C.unPagingState <$> (Data.paginationStateCassandra =<< pwsState))
}

mkState :: ByteString -> C.PagingState
Expand Down
4 changes: 2 additions & 2 deletions services/brig/src/Brig/Data/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ lookupLocalConnectionsPage ::
Local UserId ->
Maybe PagingState ->
Range 1 1000 Int32 ->
m (PageWithState UserConnection)
m (PageWithState Void UserConnection)
lookupLocalConnectionsPage self pagingState (fromRange -> size) =
fmap (toLocalUserConnection self) <$> paginateWithState connectionsSelect (paramsPagingState LocalQuorum (Identity (tUnqualified self)) size pagingState)

Expand All @@ -186,7 +186,7 @@ lookupRemoteConnectionsPage ::
Local UserId ->
Maybe PagingState ->
Int32 ->
m (PageWithState UserConnection)
m (PageWithState Void UserConnection)
lookupRemoteConnectionsPage self pagingState size =
fmap (toRemoteUserConnection self)
<$> paginateWithState
Expand Down
4 changes: 2 additions & 2 deletions services/galley/src/Galley/API/Teams.hs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ getTeamMembers lzusr tid mbMaxResults mbPagingState = do
let mLimit = fromMaybe (unsafeRange Public.hardTruncationLimit) mbMaxResults
if member `hasPermission` SearchContacts
then do
pws :: PageWithState TeamMember <- E.listTeamMembers @CassandraPaging tid mState mLimit
pws :: PageWithState Void TeamMember <- E.listTeamMembers @CassandraPaging tid mState mLimit
-- FUTUREWORK: Remove this via-Brig filtering when user and
-- team_member tables are migrated to Postgres. We currently
-- can't filter in the database because Cassandra doesn't
Expand Down Expand Up @@ -447,7 +447,7 @@ getTeamMembers lzusr tid mbMaxResults mbPagingState = do
let uids = uid : maybeToList invitee
TeamSubsystem.internalSelectTeamMembers tid uids <&> toTeamSingleMembersPage member
where
toTeamMembersPage :: TeamMember -> C.PageWithState TeamMember -> TeamMembersPage
toTeamMembersPage :: TeamMember -> C.PageWithState Void TeamMember -> TeamMembersPage
toTeamMembersPage member p =
let withPerms = (member `canSeePermsOf`)
in TeamMembersPage $
Expand Down
2 changes: 1 addition & 1 deletion services/galley/src/Galley/Cassandra/Team.hs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ teamMembersPageFrom ::
TeamId ->
Maybe PagingState ->
Range 1 HardTruncationLimit Int32 ->
Client (PageWithState TeamMember)
Client (PageWithState Void TeamMember)
teamMembersPageFrom lh tid pagingState (fromRange -> max) = do
page <- paginateWithState Cql.selectTeamMembers (paramsPagingState LocalQuorum (Identity tid) max pagingState)
members <- mapM (newTeamMember' lh tid) (pwsResults page)
Expand Down