From e148c6bf7bc54a6a0cf9c17fc19687c8e5d7c358 Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Tue, 20 Jan 2026 11:41:17 +0100 Subject: [PATCH] Introduce GeneralPaginationState type to be able to paginate over cassandra or postgres --- libs/cassandra-util/src/Cassandra.hs | 3 ++ libs/cassandra-util/src/Cassandra/Exec.hs | 35 ++++++++++++++----- .../src/Wire/Sem/Paging/Cassandra.hs | 2 +- libs/wire-api/src/Wire/API/Team/Member.hs | 9 +++-- libs/wire-subsystems/src/Wire/UserStore.hs | 8 ++--- .../src/Wire/UserStore/Cassandra.hs | 12 +++---- services/brig/src/Brig/API/Public.hs | 4 +-- services/brig/src/Brig/Data/Connection.hs | 4 +-- services/galley/src/Galley/API/Teams.hs | 4 +-- services/galley/src/Galley/Cassandra/Team.hs | 2 +- 10 files changed, 54 insertions(+), 29 deletions(-) diff --git a/libs/cassandra-util/src/Cassandra.hs b/libs/cassandra-util/src/Cassandra.hs index 74dcdfc45f4..24406334813 100644 --- a/libs/cassandra-util/src/Cassandra.hs +++ b/libs/cassandra-util/src/Cassandra.hs @@ -58,6 +58,7 @@ import Cassandra.Exec as C ( BatchM, Client, ClientState, + GeneralPaginationState (..), MonadClient, Page (..), PageWithState (..), @@ -74,6 +75,8 @@ import Cassandra.Exec as C paginate, paginateC, paginateWithState, + paginationStateCassandra, + paginationStatePostgres, params, paramsP, paramsPagingState, diff --git a/libs/cassandra-util/src/Cassandra/Exec.hs b/libs/cassandra-util/src/Cassandra/Exec.hs index 8ef7d64337c..b5a6eb02bc8 100644 --- a/libs/cassandra-util/src/Cassandra/Exec.hs +++ b/libs/cassandra-util/src/Cassandra/Exec.hs @@ -25,6 +25,9 @@ module Cassandra.Exec x5, x1, paginateC, + GeneralPaginationState (..), + paginationStateCassandra, + paginationStatePostgres, PageWithState (..), paginateWithState, paginateWithStateC, @@ -97,9 +100,23 @@ paginateC q p r = go =<< lift (retry r (paginate q p)) when (hasMore page) $ go =<< lift (retry r (liftClient (nextPage page))) -data PageWithState a = PageWithState - { pwsResults :: [a], - pwsState :: Maybe Protocol.PagingState +data GeneralPaginationState a + = PaginationStateCassandra Protocol.PagingState + | PaginationStatePostgres a + +paginationStateCassandra :: GeneralPaginationState pgState -> Maybe Protocol.PagingState +paginationStateCassandra = \case + PaginationStateCassandra state -> Just state + PaginationStatePostgres {} -> Nothing + +paginationStatePostgres :: GeneralPaginationState pgState -> Maybe pgState +paginationStatePostgres = \case + PaginationStatePostgres pgState -> Just pgState + PaginationStateCassandra {} -> Nothing + +data PageWithState state res = PageWithState + { pwsResults :: [res], + pwsState :: Maybe (GeneralPaginationState state) } deriving (Functor) @@ -107,13 +124,13 @@ data PageWithState a = PageWithState -- serialised and sent to consumers of the API. The state is not good for long -- term storage as the bytestring format may change when the schema of a table -- changes or when cassandra is upgraded. -paginateWithState :: (MonadClient m, Tuple a, Tuple b, RunQ q) => q R a b -> QueryParams a -> m (PageWithState b) +paginateWithState :: (MonadClient m, Tuple a, Tuple b, RunQ q) => q R a b -> QueryParams a -> m (PageWithState x b) paginateWithState q p = do let p' = p {Protocol.pageSize = Protocol.pageSize p <|> Just 10000} r <- runQ q p' getResult r >>= \case Protocol.RowsResult m b -> - pure $ PageWithState b (pagingState m) + pure $ PageWithState b (PaginationStateCassandra <$> pagingState m) _ -> throwM $ UnexpectedResponse (hrHost r) (hrResponse r) -- | Like 'paginateWithState' but returns a conduit instead of one page. @@ -128,20 +145,20 @@ paginateWithState q p = do -- where -- getUsers state = paginateWithState getUsersQuery (paramsPagingState Quorum () 10000 state) -- @ -paginateWithStateC :: forall m a. (Monad m) => (Maybe Protocol.PagingState -> m (PageWithState a)) -> ConduitT () [a] m () +paginateWithStateC :: forall m res state. (Monad m) => (Maybe (GeneralPaginationState state) -> m (PageWithState state res)) -> ConduitT () [res] m () paginateWithStateC getPage = do go =<< lift (getPage Nothing) where - go :: PageWithState a -> ConduitT () [a] m () + go :: PageWithState state res -> ConduitT () [res] m () go page = do unless (null page.pwsResults) $ yield (page.pwsResults) when (pwsHasMore page) $ - go =<< lift (getPage page.pwsState) + go =<< lift (getPage $ page.pwsState) paramsPagingState :: Consistency -> a -> Int32 -> Maybe Protocol.PagingState -> QueryParams a paramsPagingState c p n state = QueryParams c False p (Just n) state Nothing Nothing {-# INLINE paramsPagingState #-} -pwsHasMore :: PageWithState a -> Bool +pwsHasMore :: PageWithState a b -> Bool pwsHasMore = isJust . pwsState diff --git a/libs/polysemy-wire-zoo/src/Wire/Sem/Paging/Cassandra.hs b/libs/polysemy-wire-zoo/src/Wire/Sem/Paging/Cassandra.hs index 12210c3c8a3..a775a56beb3 100644 --- a/libs/polysemy-wire-zoo/src/Wire/Sem/Paging/Cassandra.hs +++ b/libs/polysemy-wire-zoo/src/Wire/Sem/Paging/Cassandra.hs @@ -43,7 +43,7 @@ data CassandraPaging type instance E.PagingState CassandraPaging a = PagingState -type instance E.Page CassandraPaging a = PageWithState a +type instance E.Page CassandraPaging a = PageWithState Void a type instance E.PagingBounds CassandraPaging TeamId = Range 1 100 Int32 diff --git a/libs/wire-api/src/Wire/API/Team/Member.hs b/libs/wire-api/src/Wire/API/Team/Member.hs index 2d75ce2e04d..55453872d7c 100644 --- a/libs/wire-api/src/Wire/API/Team/Member.hs +++ b/libs/wire-api/src/Wire/API/Team/Member.hs @@ -263,8 +263,13 @@ instance ToSchema TeamMembersPage where type TeamMembersPagingState = MultiTablePagingState TeamMembersPagingName TeamMembersTable -teamMemberPagingState :: PageWithState TeamMember -> TeamMembersPagingState -teamMemberPagingState p = MultiTablePagingState TeamMembersTable (LBS.toStrict . C.unPagingState <$> pwsState p) +teamMemberPagingState :: PageWithState Void TeamMember -> TeamMembersPagingState +teamMemberPagingState p = + MultiTablePagingState + TeamMembersTable + ( LBS.toStrict . C.unPagingState + <$> (C.paginationStateCassandra =<< p.pwsState) + ) instance ToParamSchema TeamMembersPagingState where toParamSchema _ = toParamSchema (Proxy @Text) diff --git a/libs/wire-subsystems/src/Wire/UserStore.hs b/libs/wire-subsystems/src/Wire/UserStore.hs index a7bc2e9fcdb..df314e5fb00 100644 --- a/libs/wire-subsystems/src/Wire/UserStore.hs +++ b/libs/wire-subsystems/src/Wire/UserStore.hs @@ -19,7 +19,7 @@ module Wire.UserStore where -import Cassandra (PageWithState (..), PagingState) +import Cassandra (GeneralPaginationState, PageWithState (..)) import Data.Default import Data.Handle import Data.Id @@ -72,7 +72,7 @@ data UserStore m a where CreateUser :: NewStoredUser -> Maybe (ConvId, Maybe TeamId) -> UserStore m () GetIndexUser :: UserId -> UserStore m (Maybe IndexUser) DoesUserExist :: UserId -> UserStore m Bool - GetIndexUsersPaginated :: Int32 -> Maybe PagingState -> UserStore m (PageWithState IndexUser) + GetIndexUsersPaginated :: Int32 -> Maybe (GeneralPaginationState Void) -> UserStore m (PageWithState Void IndexUser) GetUsers :: [UserId] -> UserStore m [StoredUser] UpdateUser :: UserId -> StoredUserUpdate -> UserStore m () UpdateEmail :: UserId -> EmailAddress -> UserStore m () @@ -112,8 +112,8 @@ data UserStore m a where UpdateFeatureConferenceCalling :: UserId -> Maybe FeatureStatus -> UserStore m () LookupFeatureConferenceCalling :: UserId -> UserStore m (Maybe FeatureStatus) DeleteServiceUser :: ProviderId -> ServiceId -> BotId -> UserStore m () - LookupServiceUsers :: ProviderId -> ServiceId -> Maybe PagingState -> UserStore m (PageWithState (BotId, ConvId, Maybe TeamId)) - LookupServiceUsersForTeam :: ProviderId -> ServiceId -> TeamId -> Maybe PagingState -> UserStore m (PageWithState (BotId, ConvId)) + LookupServiceUsers :: ProviderId -> ServiceId -> Maybe (GeneralPaginationState Void) -> UserStore m (PageWithState Void (BotId, ConvId, Maybe TeamId)) + LookupServiceUsersForTeam :: ProviderId -> ServiceId -> TeamId -> Maybe (GeneralPaginationState Void) -> UserStore m (PageWithState Void (BotId, ConvId)) makeSem ''UserStore diff --git a/libs/wire-subsystems/src/Wire/UserStore/Cassandra.hs b/libs/wire-subsystems/src/Wire/UserStore/Cassandra.hs index 98b930f0cf1..ce3da6246a7 100644 --- a/libs/wire-subsystems/src/Wire/UserStore/Cassandra.hs +++ b/libs/wire-subsystems/src/Wire/UserStore/Cassandra.hs @@ -47,7 +47,7 @@ interpretUserStoreCassandra casClient = GetUsers uids -> getUsersImpl uids DoesUserExist uid -> doesUserExistImpl uid GetIndexUser uid -> getIndexUserImpl uid - GetIndexUsersPaginated pageSize mPagingState -> getIndexUserPaginatedImpl pageSize mPagingState + GetIndexUsersPaginated pageSize mPagingState -> getIndexUserPaginatedImpl pageSize (paginationStateCassandra =<< mPagingState) UpdateUser uid update -> updateUserImpl uid update UpdateEmail uid email -> updateEmailImpl uid email UpdateEmailUnvalidated uid email -> updateEmailUnvalidatedImpl uid email @@ -77,8 +77,8 @@ interpretUserStoreCassandra casClient = DeleteEmail uid -> deleteEmailImpl uid SetUserSearchable uid searchable -> setUserSearchableImpl uid searchable DeleteServiceUser pid sid bid -> deleteServiceUserImpl pid sid bid - LookupServiceUsers pid sid mPagingState -> lookupServiceUsersImpl pid sid mPagingState - LookupServiceUsersForTeam pid sid tid mPagingState -> lookupServiceUsersForTeamImpl pid sid tid mPagingState + LookupServiceUsers pid sid mPagingState -> lookupServiceUsersImpl pid sid (paginationStateCassandra =<< mPagingState) + LookupServiceUsersForTeam pid sid tid mPagingState -> lookupServiceUsersForTeamImpl pid sid tid (paginationStateCassandra =<< mPagingState) createUserImpl :: NewStoredUser -> Maybe (ConvId, Maybe TeamId) -> Client () createUserImpl new mbConv = retry x5 . batch $ do @@ -122,7 +122,7 @@ getIndexUserImpl u = do cql :: PrepQuery R (Identity UserId) (TupleType IndexUser) cql = prepared . QueryString $ getIndexUserBaseQuery <> " WHERE id = ?" -getIndexUserPaginatedImpl :: Int32 -> Maybe PagingState -> Client (PageWithState IndexUser) +getIndexUserPaginatedImpl :: Int32 -> Maybe PagingState -> Client (PageWithState x IndexUser) getIndexUserPaginatedImpl pageSize mPagingState = indexUserFromTuple <$$> paginateWithState cql (paramsPagingState LocalQuorum () pageSize mPagingState) where @@ -414,7 +414,7 @@ lookupServiceUsersImpl :: ProviderId -> ServiceId -> Maybe PagingState -> - Client (PageWithState (BotId, ConvId, Maybe TeamId)) + Client (PageWithState Void (BotId, ConvId, Maybe TeamId)) lookupServiceUsersImpl pid sid mPagingState = paginateWithState cql (paramsPagingState LocalQuorum (pid, sid) 100 mPagingState) where @@ -428,7 +428,7 @@ lookupServiceUsersForTeamImpl :: ServiceId -> TeamId -> Maybe PagingState -> - Client (PageWithState (BotId, ConvId)) + Client (PageWithState Void (BotId, ConvId)) lookupServiceUsersForTeamImpl pid sid tid mPagingState = paginateWithState cql (paramsPagingState LocalQuorum (pid, sid, tid) 100 mPagingState) where diff --git a/services/brig/src/Brig/API/Public.hs b/services/brig/src/Brig/API/Public.hs index 89466550dbc..c3f262d8a95 100644 --- a/services/brig/src/Brig/API/Public.hs +++ b/services/brig/src/Brig/API/Public.hs @@ -1380,14 +1380,14 @@ listConnections uid Public.GetMultiTablePageRequest {..} = do Just (Public.ConnectionPagingState Public.PagingRemotes stateBS) -> remotesOnly self (mkState <$> stateBS) (fromRange gmtprSize) _ -> localsAndRemotes self (fmap mkState . Public.mtpsState =<< gmtprState) gmtprSize where - pageToConnectionsPage :: Public.LocalOrRemoteTable -> Data.PageWithState Public.UserConnection -> Public.ConnectionsPage + pageToConnectionsPage :: Public.LocalOrRemoteTable -> Data.PageWithState Void Public.UserConnection -> Public.ConnectionsPage pageToConnectionsPage table page@Data.PageWithState {..} = Public.MultiTablePage { mtpResults = pwsResults, mtpHasMore = C.pwsHasMore page, -- FUTUREWORK confusingly, using 'ConversationPagingState' instead of 'ConnectionPagingState' doesn't fail any tests. -- Is this type actually useless? Or the tests not good enough? - mtpPagingState = Public.ConnectionPagingState table (LBS.toStrict . C.unPagingState <$> pwsState) + mtpPagingState = Public.ConnectionPagingState table (LBS.toStrict . C.unPagingState <$> (Data.paginationStateCassandra =<< pwsState)) } mkState :: ByteString -> C.PagingState diff --git a/services/brig/src/Brig/Data/Connection.hs b/services/brig/src/Brig/Data/Connection.hs index fbe8221018e..3a1525b8b32 100644 --- a/services/brig/src/Brig/Data/Connection.hs +++ b/services/brig/src/Brig/Data/Connection.hs @@ -176,7 +176,7 @@ lookupLocalConnectionsPage :: Local UserId -> Maybe PagingState -> Range 1 1000 Int32 -> - m (PageWithState UserConnection) + m (PageWithState Void UserConnection) lookupLocalConnectionsPage self pagingState (fromRange -> size) = fmap (toLocalUserConnection self) <$> paginateWithState connectionsSelect (paramsPagingState LocalQuorum (Identity (tUnqualified self)) size pagingState) @@ -186,7 +186,7 @@ lookupRemoteConnectionsPage :: Local UserId -> Maybe PagingState -> Int32 -> - m (PageWithState UserConnection) + m (PageWithState Void UserConnection) lookupRemoteConnectionsPage self pagingState size = fmap (toRemoteUserConnection self) <$> paginateWithState diff --git a/services/galley/src/Galley/API/Teams.hs b/services/galley/src/Galley/API/Teams.hs index 7bee76d5937..b8d6f5105a0 100644 --- a/services/galley/src/Galley/API/Teams.hs +++ b/services/galley/src/Galley/API/Teams.hs @@ -417,7 +417,7 @@ getTeamMembers lzusr tid mbMaxResults mbPagingState = do let mLimit = fromMaybe (unsafeRange Public.hardTruncationLimit) mbMaxResults if member `hasPermission` SearchContacts then do - pws :: PageWithState TeamMember <- E.listTeamMembers @CassandraPaging tid mState mLimit + pws :: PageWithState Void TeamMember <- E.listTeamMembers @CassandraPaging tid mState mLimit -- FUTUREWORK: Remove this via-Brig filtering when user and -- team_member tables are migrated to Postgres. We currently -- can't filter in the database because Cassandra doesn't @@ -447,7 +447,7 @@ getTeamMembers lzusr tid mbMaxResults mbPagingState = do let uids = uid : maybeToList invitee TeamSubsystem.internalSelectTeamMembers tid uids <&> toTeamSingleMembersPage member where - toTeamMembersPage :: TeamMember -> C.PageWithState TeamMember -> TeamMembersPage + toTeamMembersPage :: TeamMember -> C.PageWithState Void TeamMember -> TeamMembersPage toTeamMembersPage member p = let withPerms = (member `canSeePermsOf`) in TeamMembersPage $ diff --git a/services/galley/src/Galley/Cassandra/Team.hs b/services/galley/src/Galley/Cassandra/Team.hs index 40513789326..e541e7b136a 100644 --- a/services/galley/src/Galley/Cassandra/Team.hs +++ b/services/galley/src/Galley/Cassandra/Team.hs @@ -177,7 +177,7 @@ teamMembersPageFrom :: TeamId -> Maybe PagingState -> Range 1 HardTruncationLimit Int32 -> - Client (PageWithState TeamMember) + Client (PageWithState Void TeamMember) teamMembersPageFrom lh tid pagingState (fromRange -> max) = do page <- paginateWithState Cql.selectTeamMembers (paramsPagingState LocalQuorum (Identity tid) max pagingState) members <- mapM (newTeamMember' lh tid) (pwsResults page)