diff --git a/Makefile b/Makefile index ecf42013455..d451a3b0b08 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,8 @@ fake-aws fake-aws-s3 fake-aws-sqs aws-ingress fluent-bit kibana backoffice \ calling-test demo-smtp elasticsearch-curator elasticsearch-external \ elasticsearch-ephemeral minio-external cassandra-external \ ingress-nginx-controller nginx-ingress-services reaper restund \ -k8ssandra-test-cluster ldap-scim-bridge wire-server-enterprise +k8ssandra-test-cluster ldap-scim-bridge wire-server-enterprise \ +integration KIND_CLUSTER_NAME := wire-server HELM_PARALLELISM ?= 1 # 1 for sequential tests; 6 for all-parallel tests # (run `psql -h localhost -p 5432 -d backendA -U wire-server -w` for the list of options for PSQL_DB) diff --git a/charts/cannon/templates/configmap.yaml b/charts/cannon/templates/configmap.yaml index e7fb94bfba5..b5190a812c3 100644 --- a/charts/cannon/templates/configmap.yaml +++ b/charts/cannon/templates/configmap.yaml @@ -39,6 +39,12 @@ data: rabbitMqMaxConnections: {{ .config.rabbitMqMaxConnections }} rabbitMqMaxChannels: {{ .config.rabbitMqMaxChannels }} + {{- with .config.pulsar }} + pulsar: + host: {{ .host }} + port: {{ .port }} + {{- end }} + drainOpts: gracePeriodSeconds: {{ .config.drainOpts.gracePeriodSeconds }} millisecondsBetweenBatches: {{ .config.drainOpts.millisecondsBetweenBatches }} diff --git a/charts/cannon/values.yaml b/charts/cannon/values.yaml index d1428b21d80..dd05b5cc4b9 100644 --- a/charts/cannon/values.yaml +++ b/charts/cannon/values.yaml @@ -29,6 +29,10 @@ config: # name: # key: + pulsar: + host: localhost + port: 6650 + # See also the section 'Controlling the speed of websocket draining during # cannon pod replacement' in docs/how-to/install/configuration-options.rst drainOpts: diff --git a/charts/gundeck/templates/configmap.yaml b/charts/gundeck/templates/configmap.yaml index 26b04fc9337..b1c19ebfcdb 100644 --- a/charts/gundeck/templates/configmap.yaml +++ b/charts/gundeck/templates/configmap.yaml @@ -41,6 +41,18 @@ data: {{- end }} {{- end }} + {{- with .pulsar }} + pulsar: + host: {{ .host }} + port: {{ .port }} + {{- end }} + + {{- with .pulsarAdmin }} + pulsarAdmin: + host: {{ .host }} + port: {{ .port }} + {{- end }} + redis: host: {{ .redis.host }} port: {{ .redis.port }} diff --git a/charts/gundeck/values.yaml b/charts/gundeck/values.yaml index 1a989c8510a..d79857d7c4b 100644 --- a/charts/gundeck/values.yaml +++ b/charts/gundeck/values.yaml @@ -50,6 +50,14 @@ config: # name: # key: + pulsar: + host: localhost + port: 6650 + + pulsarAdmin: + host: localhost + port: 8080 + # To enable additional writes during a migration: # redisAdditionalWrite: # host: redis-two diff --git a/deploy/dockerephemeral/docker-compose.yaml b/deploy/dockerephemeral/docker-compose.yaml index 96eb142a547..a0f379c1837 100644 --- a/deploy/dockerephemeral/docker-compose.yaml +++ b/deploy/dockerephemeral/docker-compose.yaml @@ -419,6 +419,50 @@ services: # - DNS_SERVER_RECURSION_ALLOWED_NETWORKS=127.0.0.1, 192.168.1.0/24 #Comma separated list of IP addresses or network addresses to allow recursion. Valid only for `UseSpecifiedNetworkACL` recursion option. This option is obsolete and DNS_SERVER_RECURSION_NETWORK_ACL should be used instead. # - DNS_SERVER_ENABLE_BLOCKING=false #Sets the DNS server to block domain names using Blocked Zone and Block List Zone. 
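+  # Overview (illustrative): the default endpoint configured for cannon and
+  # gundeck in this PR is
+  #   pulsar:
+  #     host: localhost
+  #     port: 6650
+  # which toPulsarUrl (libs/cassandra-util, below) renders as
+  # pulsar://localhost:6650, i.e. the standalone broker started here. The
+  # broker's HTTP admin API (container port 8080) is published on host port
+  # 5080 because 8080 is taken by nginz; init-pulsar.sh below talks to it at
+  # pulsar:8080 inside the compose network to create the "wire" tenant and
+  # the "user-notifications" namespace.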
+ pulsar: + image: apachepulsar/pulsar:latest + container_name: pulsar + # 8080 belongs to nginz + ports: + - 6650:6650 + - 5080:8080 + networks: + - demo_wire + command: bin/pulsar standalone + + # Admin GUI + # Url: http://localhost:9527 + # Username: pulsar + # Password: walter-frosch + pulsar-manager: + image: apachepulsar/pulsar-manager:latest + container_name: pulsar-manager + ports: + - 9527:9527 + - 7750:7750 + networks: + - demo_wire + environment: + - "SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties" + healthcheck: + test: + ["CMD-SHELL", "curl http://127.0.0.1:7750/actuator/health || exit 1 "] + interval: 3s + timeout: 5s + retries: 100 + + pulsar-init: + container_name: pulsar-init + image: curlimages/curl:latest + networks: + - demo_wire + command: init-pulsar + volumes: + - ./pulsar-config/init-pulsar.sh:/bin/init-pulsar + depends_on: + pulsar-manager: + condition: service_healthy + volumes: redis-node-1-data: redis-node-2-data: diff --git a/deploy/dockerephemeral/pulsar-config/init-pulsar.sh b/deploy/dockerephemeral/pulsar-config/init-pulsar.sh new file mode 100755 index 00000000000..5e39894f117 --- /dev/null +++ b/deploy/dockerephemeral/pulsar-config/init-pulsar.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env sh + +# Create a superuser in pulsar-manager. +# username: pulsar +# password: walter-frosch +CSRF_TOKEN=$(curl http://pulsar-manager:7750/pulsar-manager/csrf-token) +curl -v -H "X-XSRF-TOKEN: $CSRF_TOKEN" \ + -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \ + -H "Content-Type: application/json" \ + -X PUT http://pulsar-manager:7750/pulsar-manager/users/superuser \ + -d '{"name": "pulsar", "password": "walter-frosch", "description": "test", "email": "username@test.org"}' + +curl -v -X PUT http://pulsar:8080/admin/v2/tenants/wire \ + -H "Content-Type: application/json" \ + -d '{"adminRoles": ["pulsar"], "allowedClusters": ["standalone"]}' + +curl -v -X PUT \ + http://pulsar:8080/admin/v2/namespaces/wire/user-notifications \ + -H "Content-Type: application/json" \ + -d '{"message_ttl_in_seconds":3600}' diff --git a/integration/default.nix b/integration/default.nix index 04163fc5074..03512c4bc51 100644 --- a/integration/default.nix +++ b/integration/default.nix @@ -74,6 +74,7 @@ , split , stm , streaming-commons +, streamly , string-conversions , system-linux-proc , tagged @@ -179,6 +180,7 @@ mkDerivation { split stm streaming-commons + streamly string-conversions system-linux-proc tagged diff --git a/integration/integration.cabal b/integration/integration.cabal index 0cdf6972ef1..70386331cfc 100644 --- a/integration/integration.cabal +++ b/integration/integration.cabal @@ -184,6 +184,7 @@ library Test.MLS.Unreachable Test.NginxZAuthModule Test.Notifications + Test.NotificationsBenchmark Test.OAuth Test.PasswordReset Test.Presence @@ -299,6 +300,7 @@ library , split , stm , streaming-commons + , streamly , string-conversions , system-linux-proc , tagged diff --git a/integration/test/Test/NotificationsBenchmark.hs b/integration/test/Test/NotificationsBenchmark.hs new file mode 100644 index 00000000000..c137414f22a --- /dev/null +++ b/integration/test/Test/NotificationsBenchmark.hs @@ -0,0 +1,180 @@ +module Test.NotificationsBenchmark where + +import API.Brig +import API.BrigCommon +import API.Common +import API.GundeckInternal +import Control.Concurrent +import Control.Concurrent.Async (Async) +import Control.Concurrent.STM.TBQueue +import Control.Monad.Codensity (Codensity (..)) +import Control.Monad.Reader (MonadReader (ask), asks) +import 
Control.Monad.Reader.Class (local) +import Control.Retry +import qualified Data.Map.Strict as Map +import Data.Time +import Debug.Trace +import GHC.Conc.Sync +import GHC.Stack +import SetupHelpers +import qualified Streamly.Data.Fold.Prelude as Fold +import qualified Streamly.Data.Stream.Prelude as Stream +import System.Random +import qualified Test.Events as TestEvents +import Testlib.Prekeys +import Testlib.Prelude +import UnliftIO (async, waitAnyCancel) + +data TestRecipient = TestRecipient + { user :: Value, + clientIds :: [String] + } + deriving (Show) + +testBench :: (HasCallStack) => App () +testBench = do + shardingGroupCount <- asks (.shardingGroupCount) + shardingGroup <- asks (.shardingGroup) + maxUserNo <- asks (.maxUserNo) + env <- ask + + -- Preparation + let threadCount = min numCapabilities (fromIntegral maxUserNo) + parCfg = Stream.maxThreads threadCount . Stream.ordered False + toMap = Fold.foldl' (\kv (k, v) -> Map.insert k v kv) Map.empty + -- Later, we only read from this map. Thus, it doesn't have to be thread-safe. + userMap :: Map Word TestRecipient <- + Stream.fromList [0 :: Word .. maxUserNo] + & Stream.filter ((shardingGroup ==) . (`mod` shardingGroupCount)) + & Stream.parMapM parCfg (\i -> generateTestRecipient >>= \r -> pure (i, r)) + & Stream.fold toMap + + now <- liftIO getCurrentTime + + -- TODO: To be replaced with real data from the file. (See + -- https://wearezeta.atlassian.net/wiki/spaces/PET/pages/2118680620/Simulating+production-like+data) + let fakeData = zip (plusDelta now <$> [0 :: Word ..]) (cycle [1 .. maxUserNo]) + + tbchans :: [(Int, TBQueue (UTCTime, Word))] <- liftIO $ forM [1 .. threadCount] $ \i -> do + q <- atomically $ newTBQueue 1 -- capacity 100, adjust as needed + pure (i, q) + + workers :: [Async ()] <- forM tbchans $ \(i, chan) -> liftIO + . async + -- . handle (\(e :: SomeException) -> traceM $ "Caught exception in worker id=" <> show i <> " e=" <> show e) + . forever + . runAppWithEnv env + $ do + traceM $ "worker taking from id=" <> show i + (t, uNo) <- liftIO $ atomically $ readTBQueue chan + traceM $ "worker took from id=" <> show i <> " val=" <> show (t, uNo) + sendAndReceive uNo userMap + traceM $ "worker end id=" <> show i + + producer <- async $ forM_ fakeData $ \(t, uNo) -> + when ((uNo `mod` shardingGroupCount) == shardingGroup) + $ let workerShard = fromIntegral uNo `mod` threadCount + (i, chan) = tbchans !! workerShard + in do + waitForTimeStamp t + traceM $ "producer putting to shard=" <> show workerShard <> " id=" <> show i <> " val=" <> show (t, uNo) + liftIO $ atomically $ writeTBQueue chan (t, uNo) + + liftIO . void . waitAnyCancel $ producer : workers + +waitForTimeStamp :: UTCTime -> App () +waitForTimeStamp timestamp = liftIO $ do + now <- getCurrentTime + when (now < timestamp) + $ + -- Event comes from the simulated future: Wait here until now and timestamp are aligned. + let delta = diffTimeToMicroSeconds $ diffUTCTime timestamp now + in print ("Waiting " ++ show delta ++ " microseconds. (timestamp, now)" ++ show (timestamp, now)) + >> threadDelay delta + where + diffTimeToMicroSeconds :: NominalDiffTime -> Int + diffTimeToMicroSeconds dt = floor @Double (realToFrac dt * 1_000_000) + +plusDelta :: UTCTime -> Word -> UTCTime +plusDelta timestamp deltaMilliSeconds = addUTCTime (fromIntegral deltaMilliSeconds / 1000) timestamp + +sendAndReceive :: Word -> Map Word TestRecipient -> App () +sendAndReceive userNo userMap = do + print $ "pushing to user" ++ show userNo + let testRecipient = userMap Map.! 
(fromIntegral userNo) + alice = testRecipient.user + + r <- recipient alice + payload :: Value <- toJSON <$> liftIO randomPayload + now <- liftIO $ getCurrentTime + let push = + object + [ "recipients" .= [r], + "payload" + .= [ object + [ "foo" .= payload, + "sent_at" .= now + ] + ] + ] + + void $ postPush alice [push] >>= getBody 200 + + traceM $ "pushed to userNo=" <> show userNo <> " push=" <> show push + + messageDeliveryTimeout <- asks $ fromIntegral . (.maxDeliveryDelay) + traceM $ "XXX all clientIds " <> show testRecipient.clientIds + forM_ (testRecipient.clientIds) $ \(cid :: String) -> do + traceM $ "XXX using clientId=" <> show cid + runCodensity (TestEvents.createEventsWebSocket alice (Just cid)) $ \ws -> do + -- TODO: Tweak this value to the least acceptable event delivery duration + local (setTimeoutTo messageDeliveryTimeout) $ TestEvents.assertFindsEvent ws $ \e -> do + receivedAt <- liftIO getCurrentTime + p <- e %. "data.event.payload.0" + sentAt <- + (p %. "sent_at" >>= asString) + <&> ( \sentAtStr -> + fromMaybe (error ("Cannot parse timestamp: " <> sentAtStr)) $ parseUTC sentAtStr + ) + print $ "Message sent/receive delta: " ++ show (diffUTCTime receivedAt sentAt) + + p %. "foo" `shouldMatch` payload + traceM $ "XXX Succeeded for clientId=" <> show cid + where + -- \| Generate a random string with random length up to 2048 bytes + randomPayload :: IO String + randomPayload = + -- Measured with + -- `kubectl exec --namespace databases -it gundeck-gundeck-eks-eu-west-1a-sts-0 -- sh -c 'cqlsh -e "select blobAsText(payload) from gundeck.notifications LIMIT 5000;" ' | sed 's/^[ \t]*//;s/[ \t]*$//' | wc` + let len :: Int = 884 -- measured in prod + in mapM (\_ -> randomRIO ('\32', '\126')) [1 .. len] -- printable ASCII + parseUTC :: String -> Maybe UTCTime + parseUTC = parseTimeM True defaultTimeLocale "%Y-%m-%dT%H:%M:%S%QZ" + +setTimeoutTo :: Int -> Env -> Env +setTimeoutTo tSecs env = env {timeOutSeconds = tSecs} + +generateTestRecipient :: (HasCallStack) => App TestRecipient +generateTestRecipient = do + print "generateTestRecipient" + user <- recover $ (randomUser OwnDomain def) + r <- randomRIO @Int (1, 8) + clientIds <- forM [0 .. r] $ \i -> do + client <- + recover + $ addClient + user + def + { acapabilities = Just ["consumable-notifications"], + prekeys = Nothing, + lastPrekey = Just $ (someLastPrekeysRendered !! i), + clabel = "Test Client No. " <> show i, + model = "Test Model No. " <> show i + } + >>= getJSON 201 + objId client + + pure $ TestRecipient user clientIds + where + recover :: App a -> App a + recover = recoverAll (limitRetriesByCumulativeDelay 300 (exponentialBackoff 1_000_000)) . 
const diff --git a/integration/test/Testlib/Env.hs b/integration/test/Testlib/Env.hs index 67a90228ad0..19804252727 100644 --- a/integration/test/Testlib/Env.hs +++ b/integration/test/Testlib/Env.hs @@ -62,8 +62,8 @@ serviceHostPort m Stern = m.stern serviceHostPort m FederatorInternal = m.federatorInternal serviceHostPort m WireServerEnterprise = m.wireServerEnterprise -mkGlobalEnv :: FilePath -> Codensity IO GlobalEnv -mkGlobalEnv cfgFile = do +mkGlobalEnv :: FilePath -> Word -> Codensity IO GlobalEnv +mkGlobalEnv cfgFile shardingGroup = do eith <- liftIO $ Yaml.decodeFileEither cfgFile intConfig <- liftIO $ case eith of Left err -> do @@ -145,7 +145,11 @@ mkGlobalEnv cfgFile = do gDNSMockServerConfig = intConfig.dnsMockServer, gCellsEventQueue = intConfig.cellsEventQueue, gCellsEventWatchersLock, - gCellsEventWatchers + gCellsEventWatchers, + gShardingGroupCount = intConfig.shardingGroupCount, + gShardingGroup = shardingGroup, + gMaxUserNo = intConfig.maxUserNo, + gMaxDeliveryDelay = intConfig.maxDeliveryDelay } where createSSLContext :: Maybe FilePath -> IO (Maybe OpenSSL.SSLContext) @@ -201,7 +205,11 @@ mkEnv currentTestName ge = do dnsMockServerConfig = ge.gDNSMockServerConfig, cellsEventQueue = ge.gCellsEventQueue, cellsEventWatchersLock = ge.gCellsEventWatchersLock, - cellsEventWatchers = ge.gCellsEventWatchers + cellsEventWatchers = ge.gCellsEventWatchers, + shardingGroupCount = ge.gShardingGroupCount, + shardingGroup = ge.gShardingGroup, + maxUserNo = ge.gMaxUserNo, + maxDeliveryDelay = ge.gMaxDeliveryDelay } allCiphersuites :: [Ciphersuite] diff --git a/integration/test/Testlib/Options.hs b/integration/test/Testlib/Options.hs index f109e13d8f7..7e9bfc5b479 100644 --- a/integration/test/Testlib/Options.hs +++ b/integration/test/Testlib/Options.hs @@ -27,7 +27,9 @@ data TestOptions = TestOptions excludeTests :: [String], listTests :: Bool, xmlReport :: Maybe FilePath, - configFile :: String + configFile :: String, + shardingGroup :: Word + } parser :: Parser TestOptions @@ -64,6 +66,13 @@ parser = <> help "Use configuration FILE" <> value "services/integration.yaml" ) + <*> option + auto + ( long "sharding-group" + <> short 's' + <> help "The sharding group of this instance" + <> value 0 + ) optInfo :: ParserInfo TestOptions optInfo = diff --git a/integration/test/Testlib/Run.hs b/integration/test/Testlib/Run.hs index 1ae1ddf06dd..7447230f444 100644 --- a/integration/test/Testlib/Run.hs +++ b/integration/test/Testlib/Run.hs @@ -123,6 +123,7 @@ main = do opts <- getOptions let f = testFilter opts cfg = opts.configFile + shardingGroup = opts.shardingGroup allTests <- mkAllTests let tests = @@ -132,10 +133,10 @@ main = do let qualifiedName = fromMaybe module_ (stripPrefix "Test." module_) <> "." 
<> name in (qualifiedName, summary, full, action) - if opts.listTests then doListTests tests else runTests tests opts.xmlReport cfg + if opts.listTests then doListTests tests else runTests tests opts.xmlReport cfg shardingGroup -runTests :: [(String, x, y, App ())] -> Maybe FilePath -> FilePath -> IO () -runTests tests mXMLOutput cfg = do +runTests :: [(String, x, y, App ())] -> Maybe FilePath -> FilePath -> Word -> IO () +runTests tests mXMLOutput cfg shardingGroup = do output <- newChan let displayOutput = readChan output >>= \case @@ -180,7 +181,7 @@ runTests tests mXMLOutput cfg = do where mkEnvs :: FilePath -> Codensity IO (GlobalEnv, Env) mkEnvs fp = do - g <- mkGlobalEnv fp + g <- mkGlobalEnv fp shardingGroup e <- mkEnv Nothing g pure (g, e) diff --git a/integration/test/Testlib/RunServices.hs b/integration/test/Testlib/RunServices.hs index 4a9f6403d46..5fbcabc2e28 100644 --- a/integration/test/Testlib/RunServices.hs +++ b/integration/test/Testlib/RunServices.hs @@ -88,8 +88,10 @@ main = do let cp = proc "sh" (["-c", "exec \"$@\"", "--"] <> opts.runSubprocess) (_, _, _, ph) <- createProcess cp exitWith =<< waitForProcess ph + -- The shardingGroup only matters for the testBench test; probably not here. + shardingGroup = 0 - runCodensity (mkGlobalEnv cfg >>= mkEnv Nothing) $ \env -> + runCodensity (mkGlobalEnv cfg shardingGroup >>= mkEnv Nothing) $ \env -> runAppWithEnv env $ lowerCodensity $ do diff --git a/integration/test/Testlib/Types.hs b/integration/test/Testlib/Types.hs index 29aa0b2b4ba..99044af3c6e 100644 --- a/integration/test/Testlib/Types.hs +++ b/integration/test/Testlib/Types.hs @@ -144,7 +144,11 @@ data GlobalEnv = GlobalEnv gDNSMockServerConfig :: DNSMockServerConfig, gCellsEventQueue :: String, gCellsEventWatchersLock :: MVar (), - gCellsEventWatchers :: IORef (Map String QueueWatcher) + gCellsEventWatchers :: IORef (Map String QueueWatcher), + gShardingGroupCount :: Word, + gShardingGroup :: Word, + gMaxUserNo :: Word, + gMaxDeliveryDelay :: Word } data IntegrationConfig = IntegrationConfig @@ -160,7 +164,10 @@ data IntegrationConfig = IntegrationConfig rabbitmqV1 :: RabbitMqAdminOpts, cassandra :: CassandraConfig, dnsMockServer :: DNSMockServerConfig, - cellsEventQueue :: String + cellsEventQueue :: String, + shardingGroupCount :: Word, + maxUserNo :: Word, + maxDeliveryDelay :: Word } deriving (Show, Generic) @@ -181,6 +188,9 @@ instance FromJSON IntegrationConfig where <*> o .: fromString "cassandra" <*> o .: fromString "dnsMockServer" <*> o .: fromString "cellsEventQueue" + <*> o .: fromString "shardingGroupCount" + <*> o .: fromString "maxUserNo" + <*> o .: fromString "maxDeliveryDelay" data ServiceMap = ServiceMap { brig :: HostPort, @@ -271,7 +281,11 @@ data Env = Env dnsMockServerConfig :: DNSMockServerConfig, cellsEventQueue :: String, cellsEventWatchersLock :: MVar (), - cellsEventWatchers :: IORef (Map String QueueWatcher) + cellsEventWatchers :: IORef (Map String QueueWatcher), + shardingGroupCount :: Word, + shardingGroup :: Word, + maxUserNo :: Word, + maxDeliveryDelay :: Word } data Response = Response @@ -446,7 +460,7 @@ hoistCodensity m = Codensity $ \k -> do getServiceMap :: (HasCallStack) => String -> App ServiceMap getServiceMap fedDomain = do env <- ask - assertJust ("Could not find service map for federation domain: " <> fedDomain) (Map.lookup fedDomain env.serviceMap) + assertJust ("Could not find service map for federation domain: " <> fedDomain <> " in " <> show (Map.keys env.serviceMap)) (Map.lookup fedDomain env.serviceMap) getMLSState :: 
App MLSState getMLSState = do diff --git a/libs/cassandra-util/src/Cassandra/Options.hs b/libs/cassandra-util/src/Cassandra/Options.hs index 6ca3273636c..04ac0c2a375 100644 --- a/libs/cassandra-util/src/Cassandra/Options.hs +++ b/libs/cassandra-util/src/Cassandra/Options.hs @@ -23,6 +23,7 @@ module Cassandra.Options where import Data.Aeson.TH +import Data.Text qualified as T import Imports data Endpoint = Endpoint @@ -47,3 +48,9 @@ data CassandraOpts = CassandraOpts deriving (Show, Generic) deriveFromJSON defaultOptions ''CassandraOpts + +toPulsarUrl :: Endpoint -> String +toPulsarUrl e = "pulsar://" <> T.unpack e.host <> ":" <> show e.port + +toPulsarAdminUrl :: Endpoint -> String +toPulsarAdminUrl e = "https://" <> T.unpack e.host <> ":" <> show e.port diff --git a/libs/extended/default.nix b/libs/extended/default.nix index 4090a02a779..fc2c68d9103 100644 --- a/libs/extended/default.nix +++ b/libs/extended/default.nix @@ -27,6 +27,7 @@ , metrics-wai , monad-control , prometheus-client +, pulsar-client-hs , retry , servant , servant-client @@ -70,6 +71,7 @@ mkDerivation { metrics-wai monad-control prometheus-client + pulsar-client-hs retry servant servant-client diff --git a/libs/extended/extended.cabal b/libs/extended/extended.cabal index a771fe15901..2821479d2c8 100644 --- a/libs/extended/extended.cabal +++ b/libs/extended/extended.cabal @@ -23,6 +23,7 @@ library Hasql.Pool.Extended Network.AMQP.Extended Network.RabbitMqAdmin + Pulsar.Client.Logging Servant.API.Extended Servant.API.Extended.Endpath System.Logger.Extended @@ -98,6 +99,7 @@ library , metrics-wai , monad-control , prometheus-client + , pulsar-client-hs , retry , servant , servant-client diff --git a/libs/extended/src/Pulsar/Client/Logging.hs b/libs/extended/src/Pulsar/Client/Logging.hs new file mode 100644 index 00000000000..d333a24bfd5 --- /dev/null +++ b/libs/extended/src/Pulsar/Client/Logging.hs @@ -0,0 +1,43 @@ +module Pulsar.Client.Logging where + +import Imports +import Pulsar.Client qualified as Pulsar +import System.Logger qualified as Log + +onPulsarError :: (MonadIO m) => String -> Log.Logger -> Pulsar.RawResult -> m () +onPulsarError provenance logger result = + Log.err logger $ + Log.msg message + . Log.field "provenance" provenance + where + message = + "error: " <> pulsarResultToString result + +pulsarResultToString :: Pulsar.RawResult -> String +pulsarResultToString result = case Pulsar.renderResult result of + Just r -> show r + Nothing -> (show . Pulsar.unRawResult) result + +pulsarClientLogger :: (MonadIO m) => String -> Log.Logger -> Pulsar.LogLevel -> Pulsar.LogFile -> Pulsar.LogLine -> Pulsar.LogMessage -> m () +pulsarClientLogger provenance logger level file line message = + Log.log logger (toLogLevel level) $ + Log.msg message + . Log.field "file" file + . Log.field "line" (show line) + . Log.field "provenance" provenance + where + toLogLevel :: Pulsar.LogLevel -> Log.Level + toLogLevel 0 = Log.Debug + toLogLevel 1 = Log.Info + toLogLevel 2 = Log.Warn + toLogLevel 3 = Log.Error + toLogLevel n = error ("Unknown Pulsar log level" <> show n) + +logPulsarResult :: (MonadIO m) => String -> Log.Logger -> Pulsar.RawResult -> m () +logPulsarResult provenance logger result = + Log.debug logger $ + Log.msg message + . 
Log.field "provenance" provenance + where + message = + "result: " <> pulsarResultToString result diff --git a/libs/wire-api/src/Wire/API/Event/WebSocketProtocol.hs b/libs/wire-api/src/Wire/API/Event/WebSocketProtocol.hs index 493e029cb09..548584e34a7 100644 --- a/libs/wire-api/src/Wire/API/Event/WebSocketProtocol.hs +++ b/libs/wire-api/src/Wire/API/Event/WebSocketProtocol.hs @@ -24,13 +24,13 @@ import Data.Aeson (FromJSON, ToJSON) import Data.Aeson qualified as A import Data.Aeson.Types qualified as A import Data.Schema -import Data.Word import Imports import Wire.API.Internal.Notification import Wire.Arbitrary data AckData = AckData - { deliveryTag :: Word64, + { deliveryTag :: String, -- TODO: Maybe, use Pulsar.Client.MessageId? + -- | Acknowledge all deliveryTags <= 'deliveryTag', see RabbitMQ -- documenation: -- https://www.rabbitmq.com/docs/confirms#consumer-acks-multiple-parameter @@ -49,7 +49,7 @@ instance ToSchema AckData where data EventData = EventData { event :: QueuedNotification, - deliveryTag :: Word64 + deliveryTag :: String -- TODO: Maybe, use Pulsar.Client.MessageId? } deriving (Show, Eq, Generic) deriving (Arbitrary) via (GenericUniform EventData) @@ -64,7 +64,7 @@ instance ToSchema EventData where data SynchronizationData = SynchronizationData { markerId :: Text, - deliveryTag :: Word64 + deliveryTag :: String -- TODO: Maybe, use Pulsar.Client.MessageId? } deriving (Show, Eq, Generic) deriving (Arbitrary) via (GenericUniform SynchronizationData) diff --git a/libs/wire-api/src/Wire/API/Notification.hs b/libs/wire-api/src/Wire/API/Notification.hs index d3b5a40511d..fd8852ee290 100644 --- a/libs/wire-api/src/Wire/API/Notification.hs +++ b/libs/wire-api/src/Wire/API/Notification.hs @@ -45,6 +45,7 @@ module Wire.API.Notification temporaryRoutingKey, clientRoutingKey, queueOpts, + PulsarMessage (..), ) where @@ -255,3 +256,15 @@ queueOpts qName = ) ] } + +data PulsarMessage = PulsarMessage + { msgBody :: Text, + msgContentType :: String, + -- TODO: This could be a sum type + msgType :: Maybe String + } + deriving (Generic, Show) + +instance FromJSON PulsarMessage + +instance ToJSON PulsarMessage diff --git a/nix/haskell-pins.nix b/nix/haskell-pins.nix index 8184e905762..0fec6ff5a80 100644 --- a/nix/haskell-pins.nix +++ b/nix/haskell-pins.nix @@ -71,6 +71,18 @@ let }; }; + pulsar-hs = { + src = fetchgit { + url = "https://github.com/wireapp/pulsar-hs"; + rev = "8135d4ace819c6d4b8f5030a11c07aeaf8ad2498"; + hash = "sha256-w+s365QpHwAiLMwo6h/GfyKAQxTOYbPg0hZ3xkZ0GII="; + }; + packages = { + pulsar-client-hs = "pulsar-client-hs"; + pulsar-admin = "pulsar-admin"; + }; + }; + # -------------------- # END maintained by us # -------------------- diff --git a/nix/manual-overrides.nix b/nix/manual-overrides.nix index 189db41faa6..2571a57cae3 100644 --- a/nix/manual-overrides.nix +++ b/nix/manual-overrides.nix @@ -1,4 +1,4 @@ -{ libsodium, protobuf, hlib, mls-test-cli, fetchurl, curl, pkg-config, postgresql, openssl, ... }: +{ libsodium, protobuf, hlib, mls-test-cli, fetchurl, curl, pkg-config, postgresql, openssl, libpulsar, ... }: # FUTUREWORK: Figure out a way to detect if some of these packages are not # actually marked broken, so we can cleanup this file on every nixpkgs bump. hself: hsuper: { @@ -102,4 +102,13 @@ hself: hsuper: { unix ]; }); + + pulsar-client-hs = hlib.doJailbreak (hlib.overrideCabal hsuper.pulsar-client-hs (drv: { + # TODO: This should just replace `pulsar` (broken package, not related to + # message brokers) with the `libpulsar` patched by us. 
However, we only + # want to try `pulsar-client-hs` now... + librarySystemDepends = [ libpulsar ]; + # This is really evil: Nix ignores Cabal's ghc-options! + configureFlags = [ "--ghc-options=-optl-Wl,--wrap=pulsar_client_configuration_set_logger_t" ]; + })); } diff --git a/nix/overlay.nix b/nix/overlay.nix index 36af565994b..d17ede68954 100644 --- a/nix/overlay.nix +++ b/nix/overlay.nix @@ -83,4 +83,16 @@ self: super: { rabbitmqadmin = super.callPackage ./pkgs/rabbitmqadmin { }; sbomqs = super.callPackage ./pkgs/sbomqs { }; + + # TODO: Quick and dirty patching. Using the overlay causes troubles to HLS + # (why ever...). This is good enough for experimenting, though. + libpulsar = super.libpulsar.overrideAttrs (old: { + patches = (old.patches or [ ]) ++ [ + (builtins.fetchGit + { + url = "https://github.com/wireapp/pulsar-hs.git"; + rev = "c5e8520b0c3efbd022659ceb642fb73e903bd933"; + } + /nix/add-stdbool-table-view.patch) + ]; + }); } diff --git a/nix/wire-server.nix b/nix/wire-server.nix index 5063f8c4f84..6b56753837b 100644 --- a/nix/wire-server.nix +++ b/nix/wire-server.nix @@ -158,7 +158,7 @@ let ]; manualOverrides = import ./manual-overrides.nix (with pkgs; { - inherit (pkgs) libsodium protobuf fetchpatch fetchurl curl pkg-config postgresql openssl; + inherit (pkgs) libsodium protobuf fetchpatch fetchurl curl pkg-config postgresql openssl libpulsar; inherit hlib mls-test-cli; }); diff --git a/services/cannon/cannon.cabal b/services/cannon/cannon.cabal index c526d3cd9cb..0bf130f4993 100644 --- a/services/cannon/cannon.cabal +++ b/services/cannon/cannon.cabal @@ -23,8 +23,8 @@ library Cannon.App Cannon.Dict Cannon.Options + Cannon.PulsarConsumerApp Cannon.RabbitMq - Cannon.RabbitMqConsumerApp Cannon.Run Cannon.Types Cannon.WS @@ -85,6 +85,7 @@ library , api-field-json-th >=0.1.0.2 , async >=2.0 , base >=4.6 && <5 + , base64 , bilge >=0.12 , binary , bytestring >=0.10 @@ -108,10 +109,12 @@ library , metrics-wai >=0.4 , mwc-random >=0.13 , prometheus-client + , pulsar-client-hs , retry >=0.7 , safe-exceptions , servant-conduit , servant-server + , stm , strict >=0.3.2 , text >=1.1 , tinylog >=0.10 @@ -119,6 +122,7 @@ library , types-common >=0.16 , unix , unliftio + , utf8-string , vector >=0.10 , wai >=3.0 , wai-extra >=3.0 diff --git a/services/cannon/cannon.integration.yaml b/services/cannon/cannon.integration.yaml index 2df084dffc6..cb578f1a502 100644 --- a/services/cannon/cannon.integration.yaml +++ b/services/cannon/cannon.integration.yaml @@ -30,6 +30,10 @@ rabbitmq: caCert: test/resources/rabbitmq-ca.pem insecureSkipVerifyTls: false +pulsar: + host: localhost + port: 6650 + drainOpts: gracePeriodSeconds: 1 millisecondsBetweenBatches: 500 diff --git a/services/cannon/default.nix b/services/cannon/default.nix index 0544d335640..0843d3738c6 100644 --- a/services/cannon/default.nix +++ b/services/cannon/default.nix @@ -8,6 +8,7 @@ , api-field-json-th , async , base +, base64 , bilge , binary , bytestring @@ -34,12 +35,14 @@ , metrics-wai , mwc-random , prometheus-client +, pulsar-client-hs , QuickCheck , random , retry , safe-exceptions , servant-conduit , servant-server +, stm , strict , tasty , tasty-hunit @@ -50,6 +53,7 @@ , types-common , unix , unliftio +, utf8-string , uuid , vector , wai @@ -72,6 +76,7 @@ mkDerivation { api-field-json-th async base + base64 bilge binary bytestring @@ -95,10 +100,12 @@ mkDerivation { metrics-wai mwc-random prometheus-client + pulsar-client-hs retry safe-exceptions servant-conduit servant-server + stm strict text tinylog @@ -106,6 +113,7 @@ 
mkDerivation { types-common unix unliftio + utf8-string vector wai wai-extra diff --git a/services/cannon/src/Cannon/API/Public.hs b/services/cannon/src/Cannon/API/Public.hs index 45a56c38b60..6fd3ab661dd 100644 --- a/services/cannon/src/Cannon/API/Public.hs +++ b/services/cannon/src/Cannon/API/Public.hs @@ -21,7 +21,7 @@ module Cannon.API.Public where import Cannon.App (wsapp) -import Cannon.RabbitMqConsumerApp (rabbitMQWebSocketApp) +import Cannon.PulsarConsumerApp (pulsarWebSocketApp) import Cannon.Types import Cannon.WS import Control.Monad.IO.Class @@ -47,4 +47,4 @@ streamData userId connId clientId con = do consumeEvents :: UserId -> Maybe ClientId -> Maybe Text -> PendingConnection -> Cannon () consumeEvents userId mClientId mSyncMarker con = do e <- wsenv - liftIO $ rabbitMQWebSocketApp userId mClientId mSyncMarker e con + liftIO $ pulsarWebSocketApp userId mClientId mSyncMarker e con diff --git a/services/cannon/src/Cannon/Options.hs b/services/cannon/src/Cannon/Options.hs index 3a1cee69885..4ac6af588ea 100644 --- a/services/cannon/src/Cannon/Options.hs +++ b/services/cannon/src/Cannon/Options.hs @@ -44,10 +44,11 @@ module Cannon.Options DrainOpts, WSOpts (..), validateOpts, + pulsar, ) where -import Cassandra.Options (CassandraOpts) +import Cassandra.Options (CassandraOpts, Endpoint) import Control.Lens (makeFields) import Data.Aeson import Data.Aeson.APIFieldJsonTH @@ -130,7 +131,8 @@ data Opts = Opts _optsRabbitMqMaxConnections :: Int, -- | Maximum number of rabbitmq channels per connection. Must be strictly positive. _optsRabbitMqMaxChannels :: Int, - _optsNotificationTTL :: Int + _optsNotificationTTL :: Int, + _optsPulsar :: Endpoint } deriving (Show, Generic) @@ -159,3 +161,4 @@ instance FromJSON Opts where <*> o .:? "rabbitMqMaxConnections" .!= 1000 <*> o .:? "rabbitMqMaxChannels" .!= 300 <*> o .: "notificationTTL" + <*> o .: "pulsar" diff --git a/services/cannon/src/Cannon/PulsarConsumerApp.hs b/services/cannon/src/Cannon/PulsarConsumerApp.hs new file mode 100644 index 00000000000..48512aa0e23 --- /dev/null +++ b/services/cannon/src/Cannon/PulsarConsumerApp.hs @@ -0,0 +1,470 @@ +{-# LANGUAGE RecordWildCards #-} + +module Cannon.PulsarConsumerApp (pulsarWebSocketApp) where + +import Cannon.App (rejectOnError) +import Cannon.Options +import Cannon.WS hiding (env) +import Cassandra as C hiding (batch) +import Conduit (runResourceT) +import Control.Concurrent.Async +import Control.Concurrent.Chan +import Control.Exception (Handler (..), catches) +import Control.Exception.Base +import Control.Lens hiding ((#)) +import Control.Monad.Codensity +import Control.Monad.STM qualified as STM +import Data.Aeson hiding (Key) +import Data.Aeson qualified as A +import Data.Base64.Types +import Data.ByteString qualified as BS +import Data.ByteString.Base64 +import Data.ByteString.UTF8 +import Data.ByteString.UTF8 qualified as BSUTF8 +import Data.Id +import Data.Text +import Data.Text qualified as T +import Data.Text.Encoding qualified as TE +import Imports hiding (min, threadDelay) +import Network.WebSockets +import Network.WebSockets qualified as WS +import Network.WebSockets.Connection +import Pulsar.Client qualified as Pulsar +import Pulsar.Client.Logging +import System.Logger qualified as Log +import System.Timeout +import UnliftIO qualified +import Wire.API.Event.WebSocketProtocol +import Wire.API.Notification + +data InactivityTimeout = InactivityTimeout + deriving (Show) + +instance Exception InactivityTimeout + +-- TODO: The name is a misleading. 
However, while developing, it's useful to keep the analogies with RabbitMQ. +data PulsarChannel = PulsarChannel + { -- TODO: Rename: msgChannel + msgVar :: Chan (PulsarMsgId, ByteString), + closeSignal :: MVar (), + acknowledgeMessages :: Chan PulsarMsgId, + rejectMessages :: Chan PulsarMsgId + } + +newtype PulsarMsgId = PulsarMsgId {unPulsarMsgId :: ByteString} + +createPulsarChannel :: UserId -> Maybe ClientId -> Env -> Codensity IO PulsarChannel +createPulsarChannel uid mCid env = do + msgChannel :: Chan (PulsarMsgId, ByteString) <- lift newChan + acknowledgeMessages :: Chan PulsarMsgId <- lift newChan + rejectMessages :: Chan PulsarMsgId <- lift newChan + closeSignal :: MVar () <- lift $ newEmptyMVar + unackedMsgsCounter :: TVar Int <- newTVarIO 0 + let subscription = case mCid of + Nothing -> temporaryRoutingKey uid + Just cid -> clientNotificationQueueName uid cid + subscriptionType = case mCid of + Nothing -> Pulsar.Latest + Just _cid -> Pulsar.Earliest + liftIO $ + do + Log.debug env.logg $ + Log.msg (Log.val "Connecting Pulsar consumer") + . Log.field "topic" (show topic) + void . async $ flip runReaderT env.pulsarClient $ do + Pulsar.withConsumerNoUnsubscribe + ( Pulsar.defaultConsumerConfiguration + { Pulsar.consumerType = Just Pulsar.ConsumerExclusive, + Pulsar.consumerSubscriptionInitialPosition = Just subscriptionType + } + ) + ("cannon-websocket-" ++ unpack subscription) + topic + (onPulsarError "createPulsarChannel consumer" env.logg) + $ do + receiveMsgsAsync :: Async () <- receiveMsgs msgChannel unackedMsgsCounter + blockOnCloseSignalAsync :: Async () <- blockOnCloseSignal closeSignal + acknowledgeMsgsAsync <- acknowledgeMsgs acknowledgeMessages unackedMsgsCounter + rejectMsgsAsync <- rejectMsgs rejectMessages unackedMsgsCounter + Log.info env.logg $ + Log.msg (Log.val "Consumer ready. Waiting for external input.") + . Log.field "topic" (show topic) + void $ UnliftIO.waitAnyCancel [receiveMsgsAsync, blockOnCloseSignalAsync, acknowledgeMsgsAsync, rejectMsgsAsync] + pure () + Log.debug env.logg $ + Log.msg @String "createPulsarChannel: Done" + . Log.field "topic" (show topic) + pure $ + PulsarChannel + { msgVar = msgChannel, + closeSignal = closeSignal, + acknowledgeMessages = acknowledgeMessages, + rejectMessages = rejectMessages + } + where + topic = Pulsar.Topic . Pulsar.TopicName $ "persistent://wire/user-notifications/" ++ unpack (userRoutingKey uid) + + receiveMsgs :: (UnliftIO.MonadUnliftIO m) => Chan (PulsarMsgId, ByteString) -> TVar Int -> ReaderT Pulsar.Consumer m (Async ()) + receiveMsgs msgChannel unackedMsgsCounter = UnliftIO.async . forever $ do + liftIO $ waitUntilCounterBelow unackedMsgsCounter 500 + Pulsar.receiveMessage (liftIO . onPulsarError "receiveMessage" env.logg) $ do + content <- Pulsar.messageContent + Log.debug env.logg $ + Log.msg @String "received message with content" + . Log.field "content" (BSUTF8.toString content) + . Log.field "topic" (show topic) + msgId <- Pulsar.messageId Pulsar.messageIdSerialize + liftIO $ writeChan msgChannel (PulsarMsgId msgId, content) + liftIO $ incCounter unackedMsgsCounter + Log.debug env.logg $ + Log.msg @String "wrote message to channel" + . Log.field "content" (BSUTF8.toString content) + . Log.field "topic" (show topic) + + blockOnCloseSignal :: (UnliftIO.MonadUnliftIO m) => MVar () -> m (Async ()) + blockOnCloseSignal = UnliftIO.async . 
readMVar + + acknowledgeMsgs :: (UnliftIO.MonadUnliftIO m) => Chan PulsarMsgId -> TVar Int -> ReaderT Pulsar.Consumer m (Async ()) + acknowledgeMsgs chan unackedMsgsCounter = + UnliftIO.async . forever $ do + PulsarMsgId msgId <- UnliftIO.readChan chan + consumer :: Pulsar.Consumer <- ask + Log.debug env.logg $ + Log.msg @String "acknowledgeMsgs" + . Log.field "topic" (show topic) + result <- liftIO $ Pulsar.withDeserializedMessageId consumer msgId Pulsar.acknowledgeMessageId + liftIO $ logPulsarResult "createPulsarChannel - acknowledge message result: " env.logg result + liftIO $ decCounter unackedMsgsCounter + + rejectMsgs :: (UnliftIO.MonadUnliftIO m) => Chan PulsarMsgId -> TVar Int -> ReaderT Pulsar.Consumer m (Async ()) + rejectMsgs chan unackedMsgsCounter = + UnliftIO.async . forever $ do + PulsarMsgId msgId <- UnliftIO.readChan chan + consumer :: Pulsar.Consumer <- ask + Log.debug env.logg $ + Log.msg @String "rejectMsgs" + . Log.field "topic" (show topic) + Pulsar.withDeserializedMessageId consumer msgId Pulsar.acknowledgeNegativeMessageId + liftIO $ decCounter unackedMsgsCounter + + incCounter :: TVar Int -> IO () + incCounter tv = atomically $ modifyTVar' tv (+ 1) + + decCounter :: TVar Int -> IO () + decCounter tv = atomically $ modifyTVar' tv (subtract 1) + + waitUntilCounterBelow :: TVar Int -> Int -> IO () + waitUntilCounterBelow tv threshold = atomically $ do + v <- readTVar tv + STM.check (v < threshold) -- blocks (retry) until v < threshold + +pulsarWebSocketApp :: UserId -> Maybe ClientId -> Maybe Text -> Env -> ServerApp +pulsarWebSocketApp uid mcid mSyncMarkerId e pendingConn = + lowerCodensity $ + do + chan <- createPulsarChannel uid mcid e + conn <- Codensity $ bracket openWebSocket closeWebSocket + activity <- liftIO newEmptyMVar + let wsConn = + WSConnection + { inner = conn, + activity, + activityTimeout = e.wsOpts.activityTimeout, + pongTimeout = e.wsOpts.pongTimeout + } + + main <- Codensity + $ withAsync + $ flip + catches + [ -- TODO: Review exceptions. pulsar-hs and amqp exceptions surely differ. + handleClientMisbehaving conn, + handleWebSocketExceptions conn, + handleInactivity conn, + handleOtherExceptions conn + ] + $ do + traverse_ (sendFullSyncMessageIfNeeded wsConn uid e) mcid + traverse_ (publishSyncMessage uid . mkSynchronizationMessage) mSyncMarkerId + sendNotifications chan wsConn + + let monitor = do + timeout wsConn.activityTimeout (takeMVar wsConn.activity) >>= \case + Just _ -> monitor + Nothing -> do + WS.sendPing wsConn.inner ("ping" :: Text) + timeout wsConn.pongTimeout (takeMVar wsConn.inner.connectionHeartbeat) >>= \case + Just _ -> monitor + Nothing -> cancelWith main InactivityTimeout + + _ <- Codensity $ withAsync monitor + + liftIO $ wait main + -- TODO: This probably needs more exception handling... However, I'd like to see the principle working first. + putMVar chan.closeSignal () + where + publishSyncMessage :: UserId -> ByteString -> IO () + publishSyncMessage userId message = + flip runReaderT e.pulsarClient $ do + let topic = Pulsar.TopicName $ "persistent://wire/user-notifications/" ++ unpack (userRoutingKey userId) + Pulsar.withProducer Pulsar.defaultProducerConfiguration topic (onPulsarError "publishSyncMessage producer" e.logg) $ do + result <- runResourceT $ do + (_, message') <- Pulsar.buildMessage $ Pulsar.defaultMessageBuilder {Pulsar.content = Just $ message} + lift $ Pulsar.sendMessage message' + liftIO $ logPulsarResult "consumeWebsocket" e.logg result + pure () + + logClient = + Log.field "user" (idToText uid) + . 
Log.field "client" (maybe "" clientToText mcid) + + openWebSocket = + acceptRequest pendingConn + `catch` rejectOnError pendingConn + + closeWebSocket wsConn = do + logCloseWebsocket + -- ignore any exceptions when sending the close message + void . try @SomeException $ WS.sendClose wsConn ("" :: ByteString) + + getEventData :: PulsarChannel -> IO (Either EventData SynchronizationData) + getEventData chan = do + (msgId, msg) <- readChan chan.msgVar + decMsg :: PulsarMessage <- either (\err -> logParseError err >> error "Unexpected parse error") pure $ A.eitherDecode (BS.fromStrict msg) + case decMsg.msgType of + Just "synchronization" -> do + let syncData = + SynchronizationData + { markerId = decMsg.msgBody, + deliveryTag = encodeMsgId msgId + } + pure $ Right syncData + _ -> do + case eitherDecode @QueuedNotification ((BS.fromStrict . TE.encodeUtf8) decMsg.msgBody) of + Left err -> + do + logParseError err + writeChan chan.rejectMessages msgId + -- TODO: Deadlettering hasn't been configured, yet. See e.g. https://pulsar.apache.org/docs/4.1.x/client-libraries-websocket/#negatively-acknowledge-messages + getEventData chan + Right notif -> do + logEvent notif + pure $ + Left $ + EventData + { event = notif, + deliveryTag = encodeMsgId msgId + } + + handleWebSocketExceptions wsConn = + Handler $ + \(err :: WS.ConnectionException) -> do + case err of + CloseRequest code reason -> + Log.debug e.logg $ + Log.msg (Log.val "Client requested to close connection") + . Log.field "status_code" code + . Log.field "reason" reason + . logClient + ConnectionClosed -> + Log.info e.logg $ + Log.msg (Log.val "Client closed tcp connection abruptly") + . logClient + _ -> do + Log.info e.logg $ + Log.msg (Log.val "Failed to receive message, closing websocket") + . Log.field "error" (displayException err) + . logClient + WS.sendCloseCode wsConn 1003 ("websocket-failure" :: ByteString) + + handleInactivity wsConn = + Handler $ \(_ :: InactivityTimeout) -> do + Log.info e.logg $ + Log.msg (Log.val "Closing websocket due to inactivity") + . logClient + WS.sendCloseCode wsConn 1002 ("inactivity" :: ByteString) + + handleClientMisbehaving wsConn = + Handler $ \(err :: WebSocketServerError) -> do + case err of + FailedToParseClientMessage parseError -> do + Log.info e.logg $ + Log.msg (Log.val "Failed to parse received message, closing websocket") + . Log.field "parse_error" parseError + . logClient + WS.sendCloseCode wsConn 1003 ("failed-to-parse" :: ByteString) + UnexpectedAck -> do + Log.info e.logg $ + Log.msg (Log.val "Client sent unexpected ack message") + . logClient + WS.sendCloseCode wsConn 1003 ("unexpected-ack" :: ByteString) + + handleOtherExceptions wsConn = Handler $ + \(err :: SomeException) -> do + WS.sendCloseCode wsConn 1003 ("internal-error" :: ByteString) + throwIO err + + mkSynchronizationMessage :: StrictText -> ByteString + mkSynchronizationMessage markerId = + -- TODO: Check all fromStrict/toStrict calls: It makes no sense to be "sometimes lazy". + BS.toStrict . 
encode $
+        PulsarMessage
+          { msgBody = markerId,
+            msgContentType = "text/plain; charset=utf-8",
+            msgType = Just "synchronization"
+          }
+
+    sendNotifications :: PulsarChannel -> WSConnection -> IO ()
+    sendNotifications chan wsConn = do
+      Log.debug e.logg $
+        Log.msg (Log.val "sendNotifications called ")
+      let consumeRabbitMq = forever $ do
+            Log.debug e.logg $
+              Log.msg (Log.val "sendNotifications consumeRabbitMq called ")
+            eventData <- getEventData chan
+            let msg = case eventData of
+                  Left event -> EventMessage event
+                  Right sync -> EventSyncMessage sync
+            Log.debug e.logg $
+              Log.msg @String "sendNotifications sending"
+                . Log.field "message" (show msg)
+            catch (WS.sendBinaryData wsConn.inner (encode msg)) $
+              \(err :: SomeException) -> do
+                logSendFailure err
+                throwIO err
+
+      -- get ack from websocket and forward to Pulsar
+      let consumeWebsocket = forever $ do
+            getClientMessage wsConn >>= \case
+              AckFullSync -> throwIO UnexpectedAck
+              AckMessage ackData -> do
+                logAckReceived ackData
+                writeChan chan.acknowledgeMessages $ decodeMsgId ackData.deliveryTag
+
+      -- run both loops concurrently, so that
+      -- - notifications are delivered without having to wait for acks
+      -- - exceptions on either side do not cause a deadlock
+      concurrently_ consumeRabbitMq consumeWebsocket
+
+    decodeMsgId :: String -> PulsarMsgId
+    decodeMsgId = either (error . ("decodeMsgId: " ++) . unpack) PulsarMsgId . decodeBase64Untyped . BSUTF8.fromString
+
+    encodeMsgId :: PulsarMsgId -> String
+    encodeMsgId = T.unpack . extractBase64 . encodeBase64 . unPulsarMsgId
+
+    logParseError :: String -> IO ()
+    logParseError err =
+      Log.err e.logg $
+        Log.msg (Log.val "failed to decode event from the queue as a JSON")
+          . logClient
+          . Log.field "parse_error" err
+
+    logEvent :: QueuedNotification -> IO ()
+    logEvent event =
+      Log.debug e.logg $
+        Log.msg (Log.val "got event")
+          . logClient
+          . Log.field "event" (encode event)
+
+    logSendFailure :: SomeException -> IO ()
+    logSendFailure err =
+      Log.err e.logg $
+        Log.msg (Log.val "Pushing to WS failed, closing connection")
+          . Log.field "error" (displayException err)
+          . logClient
+
+    logAckReceived :: AckData -> IO ()
+    logAckReceived ackData =
+      Log.debug e.logg $
+        Log.msg (Log.val "Received ACK")
+          . Log.field "delivery_tag" ackData.deliveryTag
+          . Log.field "multiple" ackData.multiple
+          . logClient
+
+    logCloseWebsocket :: IO ()
+    logCloseWebsocket =
+      Log.debug e.logg $
+        Log.msg (Log.val "Closing the websocket")
+          . logClient
+
+-- | Check if client has missed messages. If so, send a full synchronisation
+-- message and wait for the corresponding ack.
+sendFullSyncMessageIfNeeded ::
+  WSConnection ->
+  UserId ->
+  Env ->
+  ClientId ->
+  IO ()
+sendFullSyncMessageIfNeeded wsConn uid env cid = do
+  row <- C.runClient env.cassandra do
+    retry x5 $ query1 q (params LocalQuorum (uid, cid))
+  for_ row $ \_ -> sendFullSyncMessage uid cid wsConn env
+  where
+    q :: PrepQuery R (UserId, ClientId) (Identity (Maybe UserId))
+    q =
+      [sql| SELECT user_id FROM missed_notifications
+            WHERE user_id = ? and client_id = ?
+ |] + +sendFullSyncMessage :: + UserId -> + ClientId -> + WSConnection -> + Env -> + IO () +sendFullSyncMessage uid cid wsConn env = do + let event = encode EventFullSync + WS.sendBinaryData wsConn.inner event + getClientMessage wsConn >>= \case + AckMessage _ -> throwIO UnexpectedAck + AckFullSync -> + C.runClient env.cassandra do + retry x1 $ write delete (params LocalQuorum (uid, cid)) + where + delete :: PrepQuery W (UserId, ClientId) () + delete = + [sql| + DELETE FROM missed_notifications + WHERE user_id = ? and client_id = ? + |] + +data WSConnection = WSConnection + { inner :: WS.Connection, + activity :: MVar (), + activityTimeout :: Int, + pongTimeout :: Int + } + +getClientMessage :: WSConnection -> IO MessageClientToServer +getClientMessage wsConn = do + msg <- WS.fromDataMessage <$> receiveDataMessageWithTimeout wsConn + case eitherDecode msg of + Left err -> throwIO (FailedToParseClientMessage err) + Right m -> pure m + +-- | A modified copy of 'WS.receiveDataMessage' which can detect client +-- inactivity. +receiveDataMessageWithTimeout :: WSConnection -> IO DataMessage +receiveDataMessageWithTimeout wsConn = do + msg <- WS.receive wsConn.inner + case msg of + DataMessage _ _ _ am -> pure am + ControlMessage cm -> case cm of + Close i closeMsg -> do + hasSentClose <- readIORef $ connectionSentClose wsConn.inner + unless hasSentClose $ send wsConn.inner msg + throwIO $ CloseRequest i closeMsg + Pong _ -> do + _ <- tryPutMVar (connectionHeartbeat wsConn.inner) () + receiveDataMessageWithTimeout wsConn + Ping pl -> do + _ <- tryPutMVar wsConn.activity () + send wsConn.inner (ControlMessage (Pong pl)) + receiveDataMessageWithTimeout wsConn + +data WebSocketServerError + = FailedToParseClientMessage String + | UnexpectedAck + deriving (Show) + +instance Exception WebSocketServerError diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs deleted file mode 100644 index 16c3c0e53fd..00000000000 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ /dev/null @@ -1,371 +0,0 @@ -{-# LANGUAGE RecordWildCards #-} - --- This file is part of the Wire Server implementation. --- --- Copyright (C) 2025 Wire Swiss GmbH --- --- This program is free software: you can redistribute it and/or modify it under --- the terms of the GNU Affero General Public License as published by the Free --- Software Foundation, either version 3 of the License, or (at your option) any --- later version. --- --- This program is distributed in the hope that it will be useful, but WITHOUT --- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS --- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more --- details. --- --- You should have received a copy of the GNU Affero General Public License along --- with this program. If not, see . 
- -module Cannon.RabbitMqConsumerApp (rabbitMQWebSocketApp) where - -import Cannon.App (rejectOnError) -import Cannon.Options -import Cannon.RabbitMq -import Cannon.WS hiding (env) -import Cassandra as C hiding (batch) -import Control.Concurrent.Async -import Control.Exception (Handler (..), bracket, catch, catches, handle, throwIO, try) -import Control.Lens hiding ((#)) -import Control.Monad.Codensity -import Data.Aeson hiding (Key) -import Data.Id -import Data.Text -import Data.Text qualified as Text -import Data.Text.Lazy qualified as TL -import Data.Text.Lazy.Encoding qualified as TLE -import Imports hiding (min, threadDelay) -import Network.AMQP (newQueue) -import Network.AMQP qualified as Q -import Network.WebSockets -import Network.WebSockets qualified as WS -import Network.WebSockets.Connection -import System.Logger qualified as Log -import System.Timeout -import Wire.API.Event.WebSocketProtocol -import Wire.API.Notification - -data InactivityTimeout = InactivityTimeout - deriving (Show) - -instance Exception InactivityTimeout - -rabbitMQWebSocketApp :: UserId -> Maybe ClientId -> Maybe Text -> Env -> ServerApp -rabbitMQWebSocketApp uid mcid mSyncMarkerId e pendingConn = - handle handleTooManyChannels . lowerCodensity $ - do - (chan, queueInfo) <- createChannel uid mcid e.pool createQueue - conn <- Codensity $ bracket openWebSocket closeWebSocket - activity <- liftIO newEmptyMVar - let wsConn = - WSConnection - { inner = conn, - activity, - activityTimeout = e.wsOpts.activityTimeout, - pongTimeout = e.wsOpts.pongTimeout - } - - main <- Codensity - $ withAsync - $ flip - catches - [ handleClientMisbehaving conn, - handleWebSocketExceptions conn, - handleRabbitMqChannelException conn, - handleInactivity conn, - handleOtherExceptions conn - ] - $ do - traverse_ (sendFullSyncMessageIfNeeded wsConn uid e) mcid - traverse_ (Q.publishMsg chan.inner "" queueInfo.queueName . mkSynchronizationMessage e.notificationTTL) (mcid *> mSyncMarkerId) - sendNotifications chan wsConn - - let monitor = do - timeout wsConn.activityTimeout (takeMVar wsConn.activity) >>= \case - Just _ -> monitor - Nothing -> do - WS.sendPing wsConn.inner ("ping" :: Text) - timeout wsConn.pongTimeout (takeMVar wsConn.inner.connectionHeartbeat) >>= \case - Just _ -> monitor - Nothing -> cancelWith main InactivityTimeout - - _ <- Codensity $ withAsync monitor - - liftIO $ wait main - where - logClient = - Log.field "user" (idToText uid) - . Log.field "client" (maybe "" clientToText mcid) - - openWebSocket = - acceptRequest pendingConn - `catch` rejectOnError pendingConn - - closeWebSocket wsConn = do - logCloseWebsocket - -- ignore any exceptions when sending the close message - void . try @SomeException $ WS.sendClose wsConn ("" :: ByteString) - - getEventData :: RabbitMqChannel -> IO (Either EventData SynchronizationData) - getEventData chan = do - (msg, envelope) <- getMessage chan - case msg.msgType of - Just "synchronization" -> do - let syncData = - SynchronizationData - { markerId = TL.toStrict $ TLE.decodeUtf8 msg.msgBody, - deliveryTag = envelope.envDeliveryTag - } - pure $ Right syncData - _ -> do - case eitherDecode @QueuedNotification msg.msgBody of - Left err -> do - logParseError err - -- This message cannot be parsed, make sure it doesn't requeue. There - -- is no need to throw an error and kill the websocket as this is - -- probably caused by a bug or someone messing with RabbitMQ. 
- -- - -- The bug case is slightly dangerous as it could drop a lot of events - -- en masse, if at some point we decide that Events should not be - -- pushed as JSONs, hopefully we think of the parsing side if/when - -- that happens. - Q.rejectEnv envelope False - -- try again - getEventData chan - Right notif -> do - logEvent notif - pure $ - Left $ - EventData - { event = notif, - deliveryTag = envelope.envDeliveryTag - } - - handleWebSocketExceptions wsConn = - Handler $ - \(err :: WS.ConnectionException) -> do - case err of - CloseRequest code reason -> - Log.debug e.logg $ - Log.msg (Log.val "Client requested to close connection") - . Log.field "status_code" code - . Log.field "reason" reason - . logClient - ConnectionClosed -> - Log.info e.logg $ - Log.msg (Log.val "Client closed tcp connection abruptly") - . logClient - _ -> do - Log.info e.logg $ - Log.msg (Log.val "Failed to receive message, closing websocket") - . Log.field "error" (displayException err) - . logClient - WS.sendCloseCode wsConn 1003 ("websocket-failure" :: ByteString) - - handleInactivity wsConn = - Handler $ \(_ :: InactivityTimeout) -> do - Log.info e.logg $ - Log.msg (Log.val "Closing websocket due to inactivity") - . logClient - WS.sendCloseCode wsConn 1002 ("inactivity" :: ByteString) - - handleClientMisbehaving wsConn = - Handler $ \(err :: WebSocketServerError) -> do - case err of - FailedToParseClientMessage parseError -> do - Log.info e.logg $ - Log.msg (Log.val "Failed to parse received message, closing websocket") - . Log.field "parse_error" parseError - . logClient - WS.sendCloseCode wsConn 1003 ("failed-to-parse" :: ByteString) - UnexpectedAck -> do - Log.info e.logg $ - Log.msg (Log.val "Client sent unexpected ack message") - . logClient - WS.sendCloseCode wsConn 1003 ("unexpected-ack" :: ByteString) - - handleRabbitMqChannelException wsConn = do - Handler $ \ChannelClosed -> do - Log.debug e.logg $ Log.msg (Log.val "RabbitMQ channel closed") . 
logClient - WS.sendCloseCode wsConn 1001 ("" :: ByteString) - - handleOtherExceptions wsConn = Handler $ - \(err :: SomeException) -> do - WS.sendCloseCode wsConn 1003 ("internal-error" :: ByteString) - throwIO err - - handleTooManyChannels TooManyChannels = - rejectRequestWith pendingConn $ - RejectRequest - { rejectCode = 503, - rejectMessage = "Service Unavailable", - rejectHeaders = [], - rejectBody = "" - } - - createQueue chan = case mcid of - Nothing -> Codensity $ \k -> do - (queueName, messageCount, _) <- - Q.declareQueue chan $ - newQueue - { Q.queueExclusive = True, - Q.queueAutoDelete = True - } - for_ [userRoutingKey uid, temporaryRoutingKey uid] $ - Q.bindQueue chan queueName userNotificationExchangeName - k $ QueueInfo {queueName = queueName, messageCount = messageCount} - Just cid -> Codensity $ \k -> do - (queueName, messageCount, _) <- Q.declareQueue chan $ queueOpts (clientNotificationQueueName uid cid) - k $ QueueInfo queueName messageCount - - mkSynchronizationMessage ttl markerId = - Q.newMsg - { Q.msgBody = TLE.encodeUtf8 (TL.fromStrict markerId), - Q.msgContentType = Just "text/plain; charset=utf-8", - Q.msgDeliveryMode = Just Q.Persistent, - Q.msgExpiration = Just (Text.pack $ show ttl), - Q.msgType = Just "synchronization" - } - - sendNotifications :: RabbitMqChannel -> WSConnection -> IO () - sendNotifications chan wsConn = do - let consumeRabbitMq = forever $ do - eventData <- getEventData chan - let msg = case eventData of - Left event -> EventMessage event - Right sync -> EventSyncMessage sync - - catch (WS.sendBinaryData wsConn.inner (encode msg)) $ - \(err :: SomeException) -> do - logSendFailure err - throwIO err - - -- get ack from websocket and forward to rabbitmq - let consumeWebsocket = forever $ do - getClientMessage wsConn >>= \case - AckFullSync -> throwIO UnexpectedAck - AckMessage ackData -> do - logAckReceived ackData - void $ ackMessage chan ackData.deliveryTag ackData.multiple - - -- run both loops concurrently, so that - -- - notifications are delivered without having to wait for acks - -- - exceptions on either side do not cause a deadlock - concurrently_ consumeRabbitMq consumeWebsocket - - logParseError :: String -> IO () - logParseError err = - Log.err e.logg $ - Log.msg (Log.val "failed to decode event from the queue as a JSON") - . logClient - . Log.field "parse_error" err - - logEvent :: QueuedNotification -> IO () - logEvent event = - Log.debug e.logg $ - Log.msg (Log.val "got event") - . logClient - . Log.field "event" (encode event) - - logSendFailure :: SomeException -> IO () - logSendFailure err = - Log.err e.logg $ - Log.msg (Log.val "Pushing to WS failed, closing connection") - . Log.field "error" (displayException err) - . logClient - - logAckReceived :: AckData -> IO () - logAckReceived ackData = - Log.debug e.logg $ - Log.msg (Log.val "Received ACK") - . Log.field "delivery_tag" ackData.deliveryTag - . Log.field "multiple" ackData.multiple - . logClient - - logCloseWebsocket :: IO () - logCloseWebsocket = - Log.debug e.logg $ - Log.msg (Log.val "Closing the websocket") - . logClient - --- | Check if client has missed messages. If so, send a full synchronisation --- message and wait for the corresponding ack. 
-sendFullSyncMessageIfNeeded :: - WSConnection -> - UserId -> - Env -> - ClientId -> - IO () -sendFullSyncMessageIfNeeded wsConn uid env cid = do - row <- C.runClient env.cassandra do - retry x5 $ query1 q (params LocalQuorum (uid, cid)) - for_ row $ \_ -> sendFullSyncMessage uid cid wsConn env - where - q :: PrepQuery R (UserId, ClientId) (Identity (Maybe UserId)) - q = - [sql| SELECT user_id FROM missed_notifications - WHERE user_id = ? and client_id = ? - |] - -sendFullSyncMessage :: - UserId -> - ClientId -> - WSConnection -> - Env -> - IO () -sendFullSyncMessage uid cid wsConn env = do - let event = encode EventFullSync - WS.sendBinaryData wsConn.inner event - getClientMessage wsConn >>= \case - AckMessage _ -> throwIO UnexpectedAck - AckFullSync -> - C.runClient env.cassandra do - retry x1 $ write delete (params LocalQuorum (uid, cid)) - where - delete :: PrepQuery W (UserId, ClientId) () - delete = - [sql| - DELETE FROM missed_notifications - WHERE user_id = ? and client_id = ? - |] - -data WSConnection = WSConnection - { inner :: WS.Connection, - activity :: MVar (), - activityTimeout :: Int, - pongTimeout :: Int - } - -getClientMessage :: WSConnection -> IO MessageClientToServer -getClientMessage wsConn = do - msg <- WS.fromDataMessage <$> receiveDataMessageWithTimeout wsConn - case eitherDecode msg of - Left err -> throwIO (FailedToParseClientMessage err) - Right m -> pure m - --- | A modified copy of 'WS.receiveDataMessage' which can detect client --- inactivity. -receiveDataMessageWithTimeout :: WSConnection -> IO DataMessage -receiveDataMessageWithTimeout wsConn = do - msg <- WS.receive wsConn.inner - case msg of - DataMessage _ _ _ am -> pure am - ControlMessage cm -> case cm of - Close i closeMsg -> do - hasSentClose <- readIORef $ connectionSentClose wsConn.inner - unless hasSentClose $ send wsConn.inner msg - throwIO $ CloseRequest i closeMsg - Pong _ -> do - _ <- tryPutMVar (connectionHeartbeat wsConn.inner) () - receiveDataMessageWithTimeout wsConn - Ping pl -> do - _ <- tryPutMVar wsConn.activity () - send wsConn.inner (ControlMessage (Pong pl)) - receiveDataMessageWithTimeout wsConn - -data WebSocketServerError - = FailedToParseClientMessage String - | UnexpectedAck - deriving (Show) - -instance Exception WebSocketServerError diff --git a/services/cannon/src/Cannon/Run.hs b/services/cannon/src/Cannon/Run.hs index 1583072882d..094d11a89b4 100644 --- a/services/cannon/src/Cannon/Run.hs +++ b/services/cannon/src/Cannon/Run.hs @@ -30,6 +30,7 @@ import Cannon.Options import Cannon.RabbitMq import Cannon.Types hiding (Env) import Cannon.WS hiding (drainOpts, env) +import Cassandra.Options (toPulsarUrl) import Cassandra.Util (defInitCassandra) import Control.Concurrent import Control.Concurrent.Async qualified as Async @@ -52,6 +53,8 @@ import OpenTelemetry.Instrumentation.Wai import OpenTelemetry.Trace hiding (Server) import OpenTelemetry.Trace qualified as Otel import Prometheus qualified as Prom +import Pulsar.Client qualified as Pulsar +import Pulsar.Client.Logging import Servant import System.IO.Strict qualified as Strict import System.Logger.Class qualified as LC @@ -68,61 +71,60 @@ import Wire.OpenTelemetry (withTracer) type CombinedAPI = CannonAPI :<|> Internal.API run :: Opts -> IO () -run o = lowerCodensity $ do - lift $ validateOpts o - tracer <- Codensity withTracer - when (o ^. drainOpts . millisecondsBetweenBatches == 0) $ - error "drainOpts.millisecondsBetweenBatches must not be set to 0." - when (o ^. drainOpts . 
gracePeriodSeconds == 0) $ - error "drainOpts.gracePeriodSeconds must not be set to 0." - ext <- lift loadExternal - g <- - Codensity $ - E.bracket - (L.mkLogger (o ^. logLevel) (o ^. logNetStrings) (o ^. logFormat)) - L.close - cassandra <- lift $ defInitCassandra (o ^. cassandraOpts) g +run o = do + g <- L.mkLogger (o ^. logLevel) (o ^. logNetStrings) (o ^. logFormat) + Pulsar.withClient (Pulsar.defaultClientConfiguration {Pulsar.clientLogger = Just (pulsarClientLogger "Cannon run" g)}) (toPulsarUrl (o ^. pulsar)) $ do + pulsarClient <- ask + liftIO . lowerCodensity $ do + lift $ validateOpts o + tracer <- Codensity withTracer + when (o ^. drainOpts . millisecondsBetweenBatches == 0) $ + error "drainOpts.millisecondsBetweenBatches must not be set to 0." + when (o ^. drainOpts . gracePeriodSeconds == 0) $ + error "drainOpts.gracePeriodSeconds must not be set to 0." + ext <- lift loadExternal + cassandra <- lift $ defInitCassandra (o ^. cassandraOpts) g - e <- do - d1 <- D.empty numDictSlices - d2 <- D.empty numDictSlices - man <- lift $ newManager defaultManagerSettings {managerConnCount = 128} - rnd <- lift createSystemRandom - clk <- lift mkClock - mkEnv ext o cassandra g d1 d2 man rnd clk (o ^. Cannon.Options.rabbitmq) + e <- do + d1 <- D.empty numDictSlices + d2 <- D.empty numDictSlices + man <- lift $ newManager defaultManagerSettings {managerConnCount = 128} + rnd <- lift createSystemRandom + clk <- lift mkClock + mkEnv ext o cassandra g d1 d2 man rnd clk (o ^. Cannon.Options.rabbitmq) pulsarClient - void $ Codensity $ Async.withAsync $ runCannon e refreshMetrics - let s = newSettings $ Server (o ^. cannon . host) (o ^. cannon . port) (applog e) (Just idleTimeout) + void $ Codensity $ Async.withAsync $ runCannon e refreshMetrics + let s = newSettings $ Server (o ^. cannon . host) (o ^. cannon . port) (applog e) (Just idleTimeout) - otelMiddleWare <- lift newOpenTelemetryWaiMiddleware - let middleware :: Wai.Middleware - middleware = - versionMiddleware (foldMap expandVersionExp (o ^. disabledAPIVersions)) - . requestIdMiddleware g defaultRequestIdHeaderName - . servantPrometheusMiddleware (Proxy @CombinedAPI) - . otelMiddleWare - . Gzip.gzip Gzip.def - . catchErrors g defaultRequestIdHeaderName - app :: Application - app = middleware (serve (Proxy @CombinedAPI) server) - server :: Servant.Server CombinedAPI - server = - hoistServer (Proxy @CannonAPI) (runCannonToServant e) publicAPIServer - :<|> hoistServer (Proxy @Internal.API) (runCannonToServant e) internalServer - tid <- lift myThreadId + otelMiddleWare <- lift newOpenTelemetryWaiMiddleware + let middleware :: Wai.Middleware + middleware = + versionMiddleware (foldMap expandVersionExp (o ^. disabledAPIVersions)) + . requestIdMiddleware g defaultRequestIdHeaderName + . servantPrometheusMiddleware (Proxy @CombinedAPI) + . otelMiddleWare + . Gzip.gzip Gzip.def + . catchErrors g defaultRequestIdHeaderName + app :: Application + app = middleware (serve (Proxy @CombinedAPI) server) + server :: Servant.Server CombinedAPI + server = + hoistServer (Proxy @CannonAPI) (runCannonToServant e) publicAPIServer + :<|> hoistServer (Proxy @Internal.API) (runCannonToServant e) internalServer + tid <- lift myThreadId - Codensity $ \k -> - inSpan tracer "cannon" defaultSpanArguments {kind = Otel.Server} (k ()) - lift $ - E.handle uncaughtExceptionHandler $ do - let handler = signalHandler (env e) (o ^. 
drainOpts) tid - void $ installHandler sigTERM handler Nothing - void $ installHandler sigINT handler Nothing - -- FUTUREWORK(@akshaymankar, @fisx): we may want to call `runSettingsWithShutdown` here, - -- but it's a sensitive change, and it looks like this is closing all the websockets at - -- the same time and then calling the drain script. I suspect this might be due to some - -- cleanup in wai. this needs to be tested very carefully when touched. - runSettings s app + Codensity $ \k -> + inSpan tracer "cannon" defaultSpanArguments {kind = Otel.Server} (k ()) + lift $ + E.handle uncaughtExceptionHandler $ do + let handler = signalHandler (env e) (o ^. drainOpts) tid + void $ installHandler sigTERM handler Nothing + void $ installHandler sigINT handler Nothing + -- FUTUREWORK(@akshaymankar, @fisx): we may want to call `runSettingsWithShutdown` here, + -- but it's a sensitive change, and it looks like this is closing all the websockets at + -- the same time and then calling the drain script. I suspect this might be due to some + -- cleanup in wai. this needs to be tested very carefully when touched. + runSettings s app where idleTimeout = fromIntegral $ maxPingInterval + 3 -- Each cannon instance advertises its own location (ip or dns name) to gundeck. diff --git a/services/cannon/src/Cannon/Types.hs b/services/cannon/src/Cannon/Types.hs index f1499f308f4..f7e8363e8ed 100644 --- a/services/cannon/src/Cannon/Types.hs +++ b/services/cannon/src/Cannon/Types.hs @@ -38,8 +38,9 @@ import Cannon.RabbitMq import Cannon.WS (Clock, Key, Websocket) import Cannon.WS qualified as WS import Cassandra (ClientState) +import Cassandra.Options (toPulsarUrl) import Control.Concurrent.Async (mapConcurrently) -import Control.Lens ((^.)) +import Control.Lens (to, (^.)) import Control.Monad.Catch import Control.Monad.Codensity import Data.Id @@ -49,6 +50,7 @@ import Imports import Network.AMQP qualified as Q import Network.AMQP.Extended (AmqpEndpoint) import Prometheus +import Pulsar.Client qualified as Pulsar import Servant qualified import System.Logger qualified as Logger import System.Logger.Class hiding (info) @@ -109,8 +111,9 @@ mkEnv :: GenIO -> Clock -> AmqpEndpoint -> + Pulsar.Client -> Codensity IO Env -mkEnv external o cs l d conns p g t endpoint = do +mkEnv external o cs l d conns p g t endpoint pulsarC = do let poolOpts = RabbitMqPoolOptions { endpoint = endpoint, @@ -136,6 +139,8 @@ mkEnv external o cs l d conns p g t endpoint = do cs pool (o ^. notificationTTL) + (o ^. pulsar . 
to toPulsarUrl) + pulsarC pure $ Env o l d conns (RequestId defRequestId) wsEnv runCannon :: Env -> Cannon a -> IO a diff --git a/services/cannon/src/Cannon/WS.hs b/services/cannon/src/Cannon/WS.hs index d470019f447..5eab1debba4 100644 --- a/services/cannon/src/Cannon/WS.hs +++ b/services/cannon/src/Cannon/WS.hs @@ -77,6 +77,7 @@ import Network.HTTP.Types.Method import Network.HTTP.Types.Status import Network.Wai.Utilities.Error import Network.WebSockets hiding (Request) +import Pulsar.Client qualified as Pulsar import System.Logger qualified as Logger import System.Logger.Class hiding (Error, Settings, close, (.=)) import System.Random.MWC (GenIO, uniform) @@ -159,7 +160,9 @@ data Env = Env wsOpts :: WSOpts, cassandra :: ClientState, pool :: RabbitMqPool, - notificationTTL :: Int + notificationTTL :: Int, + pulsarUrl :: String, + pulsarClient :: Pulsar.Client } setRequestId :: RequestId -> Env -> Env @@ -210,8 +213,10 @@ env :: ClientState -> RabbitMqPool -> Int -> + String -> + Pulsar.Client -> Env -env externalHostname portnum gundeckHost gundeckPort logg manager websockets rabbitConnections rand clock drainOpts wsOpts cassandra pool notificationTTL = +env externalHostname portnum gundeckHost gundeckPort logg manager websockets rabbitConnections rand clock drainOpts wsOpts cassandra pool notificationTTL pulsarUrl pulsarClient = let upstream = (Bilge.host gundeckHost . Bilge.port gundeckPort $ empty) reqId = RequestId defRequestId in Env {..} diff --git a/services/gundeck/default.nix b/services/gundeck/default.nix index a709d333df8..7a5936246a0 100644 --- a/services/gundeck/default.nix +++ b/services/gundeck/default.nix @@ -52,6 +52,8 @@ , optparse-applicative , prometheus-client , psqueues +, pulsar-admin +, pulsar-client-hs , QuickCheck , quickcheck-instances , quickcheck-state-machine @@ -63,6 +65,7 @@ , safe-exceptions , scientific , servant +, servant-client , servant-server , string-conversions , tagged @@ -133,11 +136,14 @@ mkDerivation { network-uri prometheus-client psqueues + pulsar-admin + pulsar-client-hs raw-strings-qq resourcet retry safe-exceptions servant + servant-client servant-server text these @@ -180,6 +186,7 @@ mkDerivation { network network-uri optparse-applicative + pulsar-client-hs random retry safe diff --git a/services/gundeck/gundeck.cabal b/services/gundeck/gundeck.cabal index 470cd7b5ae7..b6907f92f6f 100644 --- a/services/gundeck/gundeck.cabal +++ b/services/gundeck/gundeck.cabal @@ -149,11 +149,14 @@ library , network-uri >=2.6 , prometheus-client , psqueues >=0.2.2 + , pulsar-admin + , pulsar-client-hs , raw-strings-qq , resourcet >=1.1 , retry >=0.5 , safe-exceptions , servant + , servant-client , servant-server , text >=1.1 , these @@ -306,6 +309,7 @@ executable gundeck-integration , cassandra-util , containers , exceptions + , extended , gundeck , HsOpenSSL , http-client @@ -317,6 +321,7 @@ executable gundeck-integration , network , network-uri , optparse-applicative + , pulsar-client-hs , random , retry , safe diff --git a/services/gundeck/gundeck.integration.yaml b/services/gundeck/gundeck.integration.yaml index 1c33557402a..df39731dbad 100644 --- a/services/gundeck/gundeck.integration.yaml +++ b/services/gundeck/gundeck.integration.yaml @@ -42,6 +42,14 @@ rabbitmq: caCert: test/resources/rabbitmq-ca.pem insecureSkipVerifyTls: false +pulsar: + host: localhost + port: 6650 + +pulsarAdmin: + host: localhost + port: 5080 + settings: httpPoolSize: 1024 notificationTTL: 24192200 diff --git a/services/gundeck/src/Gundeck/API/Internal.hs 
b/services/gundeck/src/Gundeck/API/Internal.hs index c1c1591ab8d..3a311acf02a 100644 --- a/services/gundeck/src/Gundeck/API/Internal.hs +++ b/services/gundeck/src/Gundeck/API/Internal.hs @@ -22,7 +22,9 @@ module Gundeck.API.Internal where import Cassandra qualified +import Control.Exception import Control.Lens (view) +import Control.Monad.Catch import Data.Id import Gundeck.Client import Gundeck.Client qualified as Client @@ -33,6 +35,7 @@ import Gundeck.Push.Data qualified as PushTok import Gundeck.Push.Native.Types qualified as PushTok import Imports import Servant +import System.Logger.Class import Wire.API.Push.Token qualified as PushTok import Wire.API.Push.V2 import Wire.API.Routes.Internal.Gundeck @@ -68,7 +71,14 @@ getPushTokensH :: UserId -> Gundeck PushTok.PushTokenList getPushTokensH uid = PushTok.PushTokenList <$> (view PushTok.addrPushToken <$$> PushTok.lookup uid Cassandra.All) registerConsumableNotificationsClient :: UserId -> ClientId -> Gundeck NoContent -registerConsumableNotificationsClient uid cid = do - chan <- getRabbitMqChan - void . liftIO $ setupConsumableNotifications chan uid cid - pure NoContent +registerConsumableNotificationsClient uid cid = + -- TODO: This error handling is deliberately coarse; while this feature is being debugged we want any exception to be logged here rather than propagated to the caller. + Control.Monad.Catch.catch + ( do + setupConsumableNotifications uid cid + pure NoContent + ) + handler + where + handler :: SomeException -> Gundeck NoContent + handler e = System.Logger.Class.log Error (msg $ displayException e) >> pure NoContent diff --git a/services/gundeck/src/Gundeck/Client.hs b/services/gundeck/src/Gundeck/Client.hs index 9d89c2da805..ab124020bf5 100644 --- a/services/gundeck/src/Gundeck/Client.hs +++ b/services/gundeck/src/Gundeck/Client.hs @@ -17,14 +17,18 @@ module Gundeck.Client where +import Cassandra.Options import Control.Lens (view) import Data.Id +import Gundeck.Env import Gundeck.Monad import Gundeck.Notification.Data qualified as Notifications import Gundeck.Push.Data qualified as Push import Gundeck.Push.Native import Imports -import Network.AMQP +import Pulsar.Admin +import Servant.Client +import System.Logger qualified as Log import Wire.API.Notification unregister :: UserId -> ClientId -> Gundeck () @@ -42,16 +46,36 @@ removeUser user = do Notifications.deleteAll user setupConsumableNotifications :: - Channel -> UserId -> ClientId -> - IO Text -setupConsumableNotifications chan uid cid = do - let qName = clientNotificationQueueName uid cid - void $ - declareQueue - chan - (queueOpts qName) - for_ [userRoutingKey uid, clientRoutingKey uid cid] $ - bindQueue chan qName userNotificationExchangeName - pure qName + Gundeck () +setupConsumableNotifications uid cid = do + let -- Rebuilding `latest` here.
See https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java#L58 + resetCursorCfg = + ResetCursorData + { resetCursorDataBatchIndex = Just (-1), + resetCursorDataEntryId = Just $ fromIntegral (maxBound :: Int64), + resetCursorDataExcluded = Nothing, + resetCursorDataLedgerId = Just $ fromIntegral (maxBound :: Int64), + resetCursorDataPartitionIndex = Just (-1), + resetCursorDataProperties = Nothing + } + cfg = + PersistentTopicsCreateSubscriptionParameters + { persistentTopicsCreateSubscriptionTenant = "wire", + persistentTopicsCreateSubscriptionNamespace = "user-notifications", + persistentTopicsCreateSubscriptionTopic = userRoutingKey uid, + persistentTopicsCreateSubscriptionSubscriptionName = ("cannon-websocket-" :: Text) <> clientNotificationQueueName uid cid, + persistentTopicsCreateSubscriptionAuthoritative = Nothing, + persistentTopicsCreateSubscriptionReplicated = Nothing, + persistentTopicsCreateSubscriptionMessageId = resetCursorCfg + } + httpManager <- view Gundeck.Monad.manager + pulsarAdminUrlString <- toPulsarAdminUrl <$> view pulsarAdmin + pulsarAdminUrl <- parseBaseUrl pulsarAdminUrlString + liftIO . void $ flip runClientM (mkClientEnv httpManager pulsarAdminUrl) $ persistentTopicsCreateSubscription cfg + logger <- view applog + Log.debug logger $ + Log.msg @String "Subscription created" + . Log.field "topic" (show cfg.persistentTopicsCreateSubscriptionTopic) + . Log.field "subscription" (show cfg.persistentTopicsCreateSubscriptionSubscriptionName) diff --git a/services/gundeck/src/Gundeck/Env.hs b/services/gundeck/src/Gundeck/Env.hs index e3670c13a8e..c35275d06da 100644 --- a/services/gundeck/src/Gundeck/Env.hs +++ b/services/gundeck/src/Gundeck/Env.hs @@ -47,8 +47,10 @@ import Network.HTTP.Client (responseTimeoutMicro) import Network.HTTP.Client.TLS (tlsManagerSettings) import Network.TLS as TLS import Network.TLS.Extra qualified as TLS +import Pulsar.Client qualified as Pulsar import System.Logger qualified as Log import System.Logger.Extended qualified as Logger +import Util.Options (Endpoint) data Env = Env { _reqId :: !RequestId, @@ -61,14 +63,16 @@ data Env = Env _awsEnv :: !Aws.Env, _time :: !(IO Milliseconds), _threadBudgetState :: !(Maybe ThreadBudgetState), - _rabbitMqChannel :: MVar Channel + _rabbitMqChannel :: MVar Channel, + _pulsar :: Endpoint, + _pulsarAdmin :: Endpoint, + _pulsarClient :: Pulsar.Client } makeLenses ''Env -createEnv :: Opts -> IO ([Async ()], Env) -createEnv o = do - l <- Logger.mkLogger (o ^. logLevel) (o ^. logNetStrings) (o ^. logFormat) +createEnv :: Opts -> Logger.Logger -> Pulsar.Client -> IO ([Async ()], Env) +createEnv o l pulsarClientArg = do n <- newManager tlsManagerSettings @@ -105,7 +109,7 @@ createEnv o = do } mtbs <- mkThreadBudgetState `mapM` (o ^. settings . maxConcurrentNativePushes) rabbitMqChannelMVar <- Q.mkRabbitMqChannelMVar l (Just "gundeck") (o ^. rabbitmq) - pure $! (rThread : rAdditionalThreads,) $! Env (RequestId defRequestId) o l n p r rAdditional a io mtbs rabbitMqChannelMVar + pure $! (rThread : rAdditionalThreads,) $! Env (RequestId defRequestId) o l n p r rAdditional a io mtbs rabbitMqChannelMVar (o ^. Opt.pulsar) (o ^. Opt.pulsarAdmin) pulsarClientArg reqIdMsg :: RequestId -> Logger.Msg -> Logger.Msg reqIdMsg = ("request" Logger..=) . 
unRequestId diff --git a/services/gundeck/src/Gundeck/Options.hs b/services/gundeck/src/Gundeck/Options.hs index ee55c98bebe..fb33151db61 100644 --- a/services/gundeck/src/Gundeck/Options.hs +++ b/services/gundeck/src/Gundeck/Options.hs @@ -139,6 +139,8 @@ data Opts = Opts _rabbitmq :: !AmqpEndpoint, _discoUrl :: !(Maybe Text), _settings :: !Settings, + _pulsar :: !Endpoint, + _pulsarAdmin :: !Endpoint, -- Logging -- | Log level (Debug, Info, etc) diff --git a/services/gundeck/src/Gundeck/Push.hs b/services/gundeck/src/Gundeck/Push.hs index 3609bb5dfb0..cba938ac824 100644 --- a/services/gundeck/src/Gundeck/Push.hs +++ b/services/gundeck/src/Gundeck/Push.hs @@ -53,7 +53,10 @@ import Control.Error import Control.Lens (to, view, (.~), (^.)) import Control.Monad.Catch import Control.Monad.Except (throwError) +import Data.Aeson qualified as A import Data.Aeson qualified as Aeson +import Data.ByteString qualified as B +import Data.ByteString qualified as BS import Data.ByteString.Conversion (toByteString') import Data.Id import Data.List.Extra qualified as List @@ -62,6 +65,7 @@ import Data.Map qualified as Map import Data.Misc import Data.Set qualified as Set import Data.Text qualified as Text +import Data.Text.Encoding qualified as TE import Data.These import Data.Timeout import Data.UUID qualified as UUID @@ -84,9 +88,12 @@ import Network.AMQP (Message (..)) import Network.AMQP qualified as Q import Network.HTTP.Types import Network.Wai.Utilities +import Pulsar.Client qualified as Pulsar +import Pulsar.Client.Logging import System.Logger.Class (msg, val, (+++), (.=), (~~)) import System.Logger.Class qualified as Log import UnliftIO (pooledMapConcurrentlyN) +import UnliftIO.Resource import Util.Options import Wire.API.Internal.Notification import Wire.API.Notification @@ -115,6 +122,7 @@ class (MonadThrow m) => MonadPushAll m where mpaRunWithBudget :: Int -> a -> m a -> m a mpaGetClients :: Set UserId -> m UserClientsFull mpaPublishToRabbitMq :: Text -> Text -> Q.Message -> m () + mpaPublishToPulsar :: Text -> Q.Message -> m () instance MonadPushAll Gundeck where mpaNotificationTTL = view (options . settings . notificationTTL) @@ -128,12 +136,35 @@ instance MonadPushAll Gundeck where mpaRunWithBudget = runWithBudget'' mpaGetClients = getClients mpaPublishToRabbitMq = publishToRabbitMq + mpaPublishToPulsar = publishToPulsar publishToRabbitMq :: Text -> Text -> Q.Message -> Gundeck () publishToRabbitMq exchangeName routingKey qMsg = do chan <- getRabbitMqChan void $ liftIO $ Q.publishMsg chan exchangeName routingKey qMsg +publishToPulsar :: Text -> Q.Message -> Gundeck () +publishToPulsar routingKey qMsg = do + logger <- view applog + pulsarC <- view Gundeck.Env.pulsarClient + flip runReaderT pulsarC $ + Pulsar.withProducer Pulsar.defaultProducerConfiguration topicName (onPulsarError "publishToPulsar" logger) $ do + result <- runResourceT $ do + (_, message) <- Pulsar.buildMessage $ Pulsar.defaultMessageBuilder {Pulsar.content = Just $ BS.toStrict (A.encode pulsarMessage)} + lift $ Pulsar.sendMessage message + logPulsarResult "publishToPulsar" logger result + where + topicName = Pulsar.TopicName $ "persistent://wire/user-notifications/" ++ Text.unpack routingKey + + pulsarMessage :: PulsarMessage + pulsarMessage = + PulsarMessage + { msgBody = TE.decodeUtf8 . B.toStrict $ (Q.msgBody qMsg), + msgContentType = "application/json", + -- TODO: This could be a sum type + msgType = Nothing + } + -- | Another layer of wrap around 'runWithBudget'. 
runWithBudget'' :: Int -> a -> Gundeck a -> Gundeck a runWithBudget'' budget fallback action = do @@ -247,7 +278,7 @@ pushAll pushes = do pushAllLegacy legacyNotifs allUserClients rabbitmqNotifs <- mapM mkNewNotification rabbitmqPushes - pushAllViaRabbitMq rabbitmqNotifs allUserClients + pushAllViaMessageBroker rabbitmqNotifs allUserClients -- Note that Cells needs all notifications because it doesn't matter whether -- some recipients have rabbitmq clients or not. @@ -297,28 +328,23 @@ pushNativeWithBudget notif psh dontPush = do mpaRunWithBudget cost () $ mpaPushNative notif (psh ^. pushNativePriority) =<< nativeTargets psh rcps' dontPush -pushAllViaRabbitMq :: (MonadPushAll m, MonadMapAsync m, MonadNativeTargets m) => [NewNotification] -> UserClientsFull -> m () -pushAllViaRabbitMq newNotifs userClientsFull = do - for_ newNotifs $ pushViaRabbitMq +pushAllViaMessageBroker :: (MonadPushAll m, MonadMapAsync m, MonadNativeTargets m) => [NewNotification] -> UserClientsFull -> m () +pushAllViaMessageBroker newNotifs userClientsFull = do + for_ newNotifs $ pushViaPulsar mpaForkIO $ do for_ newNotifs $ \newNotif -> do let cassandraClients = Map.map (Set.filter $ not . supportsConsumableNotifications) userClientsFull.userClientsFull cassandraClientIds = Map.foldMapWithKey (\uid clients -> Set.map (\c -> (uid, c.clientId)) clients) cassandraClients pushNativeWithBudget newNotif.nnNotification newNotif.nnPush (Set.toList $ cassandraClientIds) -pushViaRabbitMq :: (MonadPushAll m) => NewNotification -> m () -pushViaRabbitMq newNotif = do +pushViaPulsar :: (MonadPushAll m) => NewNotification -> m () +pushViaPulsar newNotif = do qMsg <- mkMessage newNotif.nnNotification - let routingKeys = - Set.unions $ - flip Set.map (Set.fromList . toList $ newNotif.nnRecipients) \r -> - case r._recipientClients of - RecipientClientsAll -> - Set.singleton $ userRoutingKey r._recipientId - RecipientClientsSome (toList -> cs) -> - Set.fromList $ map (clientRoutingKey r._recipientId) cs + let routingKeys = Set.unions $ + flip Set.map (Set.fromList . toList $ newNotif.nnRecipients) \r -> + Set.singleton $ userRoutingKey r._recipientId for_ routingKeys $ \routingKey -> - mpaPublishToRabbitMq userNotificationExchangeName routingKey qMsg + mpaPublishToPulsar routingKey qMsg pushAllToCells :: (MonadPushAll m, Log.MonadLogger m) => [NewNotification] -> m () pushAllToCells newNotifs = do diff --git a/services/gundeck/src/Gundeck/Run.hs b/services/gundeck/src/Gundeck/Run.hs index f3f1ed140db..4a2117399de 100644 --- a/services/gundeck/src/Gundeck/Run.hs +++ b/services/gundeck/src/Gundeck/Run.hs @@ -70,10 +70,13 @@ import Network.Wai.Utilities.Server hiding (serverPort) import OpenTelemetry.Instrumentation.Wai (newOpenTelemetryWaiMiddleware) import OpenTelemetry.Trace (defaultSpanArguments, inSpan, kind) import OpenTelemetry.Trace qualified as Otel +import Pulsar.Client qualified as Pulsar +import Pulsar.Client.Logging import Servant (Handler (Handler), (:<|>) (..)) import Servant qualified import System.Logger qualified as Log import System.Logger.Class qualified as MonadLogger +import System.Logger.Extended qualified as Logger import UnliftIO.Async qualified as Async import Util.Options import Wire.API.Notification @@ -84,31 +87,34 @@ import Wire.OpenTelemetry run :: Opts -> IO () run opts = withTracer \tracer -> do - (rThreads, env) <- createEnv opts - let logger = env ^. applog - - runDirect env setUpRabbitMqExchangesAndQueues - - runClient (env ^. 
cstate) $ - versionCheck lastSchemaVersion - let s = newSettings $ defaultServer (unpack . host $ opts ^. gundeck) (port $ opts ^. gundeck) logger - let throttleMillis = fromMaybe defSqsThrottleMillis $ opts ^. (settings . sqsThrottleMillis) - - lst <- Async.async $ Aws.execute (env ^. awsEnv) (Aws.listen throttleMillis (runDirect env . onEvent)) - wtbs <- forM (env ^. threadBudgetState) $ \tbs -> Async.async $ runDirect env $ watchThreadBudgetState tbs 10 - wCollectAuth <- Async.async (collectAuthMetrics (Aws._awsEnv (Env._awsEnv env))) - - app <- middleware env <*> pure (mkApp env) - inSpan tracer "gundeck" defaultSpanArguments {kind = Otel.Server} (runSettingsWithShutdown s app Nothing) `finally` do - Log.info logger $ Log.msg (Log.val "Shutting down ...") - shutdown (env ^. cstate) - Async.cancel lst - Async.cancel wCollectAuth - forM_ wtbs Async.cancel - forM_ rThreads Async.cancel - Redis.disconnect =<< takeMVar (env ^. rstate) - whenJust (env ^. rstateAdditionalWrite) $ (=<<) Redis.disconnect . takeMVar - Log.close (env ^. applog) + logger <- Logger.mkLogger (opts ^. logLevel) (opts ^. logNetStrings) (opts ^. logFormat) + Pulsar.withClient (Pulsar.defaultClientConfiguration {Pulsar.clientLogger = Just (pulsarClientLogger "publishToPulsar" logger)}) (toPulsarUrl (opts ^. Gundeck.Options.pulsar)) $ do + pulsarC <- ask + liftIO $ do + (rThreads, env) <- createEnv opts logger pulsarC + + runDirect env setUpRabbitMqExchangesAndQueues + + runClient (env ^. cstate) $ + versionCheck lastSchemaVersion + let s = newSettings $ defaultServer (unpack . host $ opts ^. gundeck) (port $ opts ^. gundeck) logger + let throttleMillis = fromMaybe defSqsThrottleMillis $ opts ^. (settings . sqsThrottleMillis) + + lst <- Async.async $ Aws.execute (env ^. awsEnv) (Aws.listen throttleMillis (runDirect env . onEvent)) + wtbs <- forM (env ^. threadBudgetState) $ \tbs -> Async.async $ runDirect env $ watchThreadBudgetState tbs 10 + wCollectAuth <- Async.async (collectAuthMetrics (Aws._awsEnv (Env._awsEnv env))) + + app <- middleware env <*> pure (mkApp env) + inSpan tracer "gundeck" defaultSpanArguments {kind = Otel.Server} (runSettingsWithShutdown s app Nothing) `finally` do + Log.info logger $ Log.msg (Log.val "Shutting down ...") + shutdown (env ^. cstate) + Async.cancel lst + Async.cancel wCollectAuth + forM_ wtbs Async.cancel + forM_ rThreads Async.cancel + Redis.disconnect =<< takeMVar (env ^. rstate) + whenJust (env ^. rstateAdditionalWrite) $ (=<<) Redis.disconnect . takeMVar + Log.close (env ^. 
applog) where setUpRabbitMqExchangesAndQueues :: Gundeck () setUpRabbitMqExchangesAndQueues = do diff --git a/services/gundeck/test/integration/Util.hs b/services/gundeck/test/integration/Util.hs index d6790424b2f..cb292baac4c 100644 --- a/services/gundeck/test/integration/Util.hs +++ b/services/gundeck/test/integration/Util.hs @@ -18,6 +18,7 @@ module Util where import Bilge qualified +import Cassandra.Options import Control.Concurrent (forkFinally) import Control.Concurrent.Async (race_) import Control.Exception qualified as E @@ -33,17 +34,24 @@ import Imports import Network.Socket hiding (openSocket) import Network.Socket.ByteString (recv, sendAll) import Network.Wai.Utilities.MockServer (withMockServer) +import Pulsar.Client qualified as Pulsar +import Pulsar.Client.Logging +import System.Logger.Extended qualified as Logger import TestSetup withSettingsOverrides :: (Opts -> Opts) -> TestM a -> TestM a withSettingsOverrides f action = do ts <- ask let opts = f (view tsOpts ts) - (_rThreads, env) <- liftIO $ createEnv opts - liftIO . lowerCodensity $ do - let app = mkApp env - p <- withMockServer app - liftIO $ Bilge.runHttpT (ts ^. tsManager) $ runReaderT (runTestM action) $ ts & tsGundeck .~ GundeckR (mkRequest p) + logger <- liftIO $ Logger.mkLogger (opts ^. logLevel) (opts ^. logNetStrings) (opts ^. logFormat) + Pulsar.withClient (Pulsar.defaultClientConfiguration {Pulsar.clientLogger = Just (pulsarClientLogger "withSettingsOverrides" logger)}) (toPulsarUrl (opts ^. Gundeck.Options.pulsar)) $ do + pulsarC <- ask + liftIO $ do + (_rThreads, env) <- liftIO $ createEnv opts logger pulsarC + liftIO . lowerCodensity $ do + let app = mkApp env + p <- withMockServer app + liftIO $ Bilge.runHttpT (ts ^. tsManager) $ runReaderT (runTestM action) $ ts & tsGundeck .~ GundeckR (mkRequest p) where mkRequest p = Bilge.host "127.0.0.1" . Bilge.port p diff --git a/services/gundeck/test/unit/MockGundeck.hs b/services/gundeck/test/unit/MockGundeck.hs index aaef7736187..e9c56580d12 100644 --- a/services/gundeck/test/unit/MockGundeck.hs +++ b/services/gundeck/test/unit/MockGundeck.hs @@ -109,8 +109,10 @@ data MockState = MockState -- | Non-transient notifications that are stored in the database first thing before -- delivery (so clients can always come back and pick them up later until they expire). _msCassQueue :: NotifQueue, - -- | A record of notifications that have been puhsed via RabbitMQ. - _msRabbitQueue :: Map (Text, Text) IntMultiSet + -- | A record of notifications that have been pushed via RabbitMQ. + _msRabbitQueue :: Map (Text, Text) IntMultiSet, + -- | A record of notifications that have been pushed via Pulsar. + _msPulsarQueue :: Map Text IntMultiSet } deriving (Eq) @@ -126,13 +128,13 @@ makeLenses ''MockEnv makeLenses ''MockState instance Show MockState where - show (MockState w n c r) = + show (MockState w n c r p) = intercalate "\n" - ["", "websocket: " <> show w, "native: " <> show n, "cassandra: " <> show c, "rabbitmq: " <> show r, ""] + ["", "websocket: " <> show w, "native: " <> show n, "cassandra: " <> show c, "rabbitmq: " <> show r, "pulsar: " <> show p, ""] emptyMockState :: MockState -emptyMockState = MockState mempty mempty mempty mempty +emptyMockState = MockState mempty mempty mempty mempty mempty -- these custom instances make for better error reports if tests fail. instance ToJSON MockEnv where @@ -447,6 +449,7 @@ instance MonadPushAll MockGundeck where mpaRunWithBudget _ _ = id -- no throttling needed as long as we don't overdo it in the tests... 
   mpaGetClients = mockGetClients
   mpaPublishToRabbitMq = mockPushRabbitMq
+  mpaPublishToPulsar = mockPushPulsar
 
 instance MonadNativeTargets MockGundeck where
   mntgtLogErr _ = pure ()
@@ -575,7 +578,8 @@ handlePushRabbit Push {..} = do
   forM_ _pushRecipients $ \(Recipient uid _ cids) -> do
     clients <- Set.toList . Set.unions . Map.elems . (.userClientsFull) <$> mpaGetClients (Set.singleton uid)
     let legacyClients = map (.clientId) $ filter (not . supportsConsumableNotifications) clients
-    let routingKeys = case cids of
+    -- TODO: This could surely be expressed in a more elegant manner. However, it should be correct.
+    let routingKeys = nub $ case cids of
           RecipientClientsAll ->
             case legacyClients of
               [] -> [userRoutingKey uid]
@@ -583,9 +587,11 @@
                 let rabbitClients = filter (`notElem` legacyClients) $ map (.clientId) clients
                  in [userRoutingKey uid | not (null rabbitClients)]
           RecipientClientsSome cc ->
-            (clientRoutingKey uid <$> filter (`notElem` legacyClients) (toList cc))
+            case filter (`notElem` legacyClients) (toList cc) of
+              [] -> []
+              _xs -> [userRoutingKey uid]
     for routingKeys $ \routingKey ->
-      msRabbitQueue %= deliver ("user-notifications", routingKey) _pushPayload
+      msPulsarQueue %= deliver routingKey _pushPayload
   when _pushIsCellsEvent $ do
     msRabbitQueue %= deliver ("", "cells") _pushPayload
@@ -652,6 +658,13 @@ mockPushRabbitMq exchange routingKey message = do
     Right (queuedNotif :: QueuedNotification) ->
       msRabbitQueue %= deliver (exchange, routingKey) (queuedNotif ^. queuedNotificationPayload)
 
+mockPushPulsar :: Text -> AMQP.Message -> MockGundeck ()
+mockPushPulsar exchange message = do
+  case Aeson.eitherDecode message.msgBody of
+    Left e -> error $ "Invalid message body: " <> e
+    Right (queuedNotif :: QueuedNotification) ->
+      msPulsarQueue %= deliver exchange (queuedNotif ^. queuedNotificationPayload)
+
 mockLookupAddresses ::
   (HasCallStack, m ~ MockGundeck) =>
   UserId ->
diff --git a/services/integration.yaml b/services/integration.yaml
index 427aa761d1e..9ec920a603d 100644
--- a/services/integration.yaml
+++ b/services/integration.yaml
@@ -330,3 +330,7 @@ integrationTestHostName: "localhost"
 additionalElasticSearch: https://localhost:9201
 
 cellsEventQueue: cells_events
+
+shardingGroupCount: 1
+maxUserNo: 1000
+maxDeliveryDelay: 120
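
Note (not part of the patch): the hunks above move per-recipient fan-out from RabbitMQ queues to one Pulsar topic per user under the "wire" tenant and "user-notifications" namespace, published as "persistent://wire/user-notifications/<userRoutingKey>". The sketch below is a minimal, hedged illustration of how that publish path could be exercised in isolation, assembled only from the pulsar-client-hs calls this diff already uses (Pulsar.withClient, Pulsar.withProducer, Pulsar.buildMessage, Pulsar.sendMessage, pulsarClientLogger, onPulsarError, logPulsarResult). The name smokeTestPublish, the "smoke-test" log context, and the assumption that Pulsar.withProducer can run over plain IO (as it runs over the Gundeck monad in Gundeck.Push.publishToPulsar) are illustrative, not part of the change.

-- Hypothetical sketch; mirrors the composition used in Cannon.Run / Gundeck.Run and Gundeck.Push above.
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask, runReaderT)
import Control.Monad.Trans.Class (lift)
import Data.ByteString (ByteString)
import Data.Text (Text)
import Data.Text qualified as Text
import Pulsar.Client qualified as Pulsar
import Pulsar.Client.Logging
import System.Logger qualified as Log
import UnliftIO.Resource (runResourceT)

smokeTestPublish :: Log.Logger -> String -> Text -> ByteString -> IO ()
smokeTestPublish logger pulsarUrl routingKey payload =
  Pulsar.withClient
    (Pulsar.defaultClientConfiguration {Pulsar.clientLogger = Just (pulsarClientLogger "smoke-test" logger)})
    pulsarUrl
    $ do
      -- the Pulsar client is handed to the continuation via ReaderT, as in Cannon.Run and Gundeck.Run
      pulsarClient <- ask
      liftIO . flip runReaderT pulsarClient $
        Pulsar.withProducer
          Pulsar.defaultProducerConfiguration
          (Pulsar.TopicName $ "persistent://wire/user-notifications/" ++ Text.unpack routingKey)
          (onPulsarError "smoke-test" logger)
          $ do
            result <- runResourceT $ do
              -- the body is published as-is here; Gundeck.Push.publishToPulsar encodes a PulsarMessage JSON instead
              (_, message) <- Pulsar.buildMessage $ Pulsar.defaultMessageBuilder {Pulsar.content = Just payload}
              lift $ Pulsar.sendMessage message
            logPulsarResult "smoke-test" logger result

With the docker-compose setup above, pulsarUrl would presumably be "pulsar://localhost:6650" (the URL toPulsarUrl is expected to derive from the configured host/port pair), and routingKey would be the value of userRoutingKey for the target user.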