Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
e6710a3
Add pulsar to docker-compose
supersven Oct 8, 2025
3900186
Add pulsar-client-hs to nix env
supersven Nov 13, 2025
4085e80
Pretend to use pulsar-client-hs in Cannon
supersven Nov 13, 2025
d8c55b0
Update pulsar-hs pin
supersven Nov 14, 2025
91de533
Better start with Gundeck
supersven Nov 17, 2025
3576277
Replace pulsar-hs overlay
supersven Nov 17, 2025
d2366c5
Push Gundeck notifications also to Pulsar
supersven Nov 17, 2025
a79b656
Add todo
supersven Nov 17, 2025
92fd10f
Prepare user-notifications topic in test setup
supersven Nov 18, 2025
830ee0e
Improved logging on Pulsar notification push
supersven Nov 18, 2025
f0dc1e3
Configure message TTL
supersven Nov 18, 2025
69fb21d
Add logger to C client
supersven Nov 18, 2025
c8236b4
WIP: Use Pulsar for notification websockets (Cannon)
supersven Nov 19, 2025
d10d176
Improve "logging"
supersven Nov 19, 2025
ab142c8
Send PulsarMessage(s) via Pulsar
supersven Nov 19, 2025
0322be4
Do not push to RabbitMq
supersven Nov 19, 2025
bd45808
Remove clutter
supersven Nov 19, 2025
7e428f5
Remove clutter
supersven Nov 20, 2025
4c02e98
Fix subscriptions and topics
supersven Nov 20, 2025
c0a00c4
Fix SynchronizationData creation
supersven Nov 20, 2025
5282cc0
Add TODO
supersven Nov 20, 2025
937bc6d
Remove obsolete TODO
supersven Nov 20, 2025
95ffc95
Close consumer when websocket closes
supersven Nov 21, 2025
97dcdd3
WIP more consumer functionality
supersven Nov 21, 2025
6530ee9
Acknowledge/reject messages
supersven Nov 21, 2025
c9f6b93
Cancel other Asyncs on close signal
supersven Nov 24, 2025
4ab56c0
Subscription lifecycle
supersven Nov 24, 2025
ece7c34
Limit amount of non-acked messages
supersven Nov 24, 2025
9b29781
Remove RabbitMQ leftovers
supersven Nov 24, 2025
1cfc69c
A bit more expressive types
supersven Nov 24, 2025
ca4e415
Remove bad threadDelay
supersven Nov 26, 2025
5639ec6
Move Pulsar REST API port 8080 -> 5080
supersven Dec 1, 2025
45f01f2
Mark missing test mock implementation with `todo`
supersven Dec 1, 2025
157fc49
Make Pulsar Endpoint configurable
supersven Dec 1, 2025
7c819d0
Add Pulsar endpoint config to Helm charts
supersven Dec 1, 2025
f1af4a2
Fix Gundeck unit test setup
supersven Dec 1, 2025
d22bcb6
Simplify expression
supersven Dec 1, 2025
ff2da8a
Improve logging
supersven Dec 1, 2025
49a0c06
Typo
supersven Dec 1, 2025
67c4117
Replace traceM with logging framework
supersven Dec 1, 2025
a7e006e
Extract common Pulsar logging helpers
supersven Dec 1, 2025
aad4fe3
Delete useless comment
supersven Dec 1, 2025
ebaa1bd
Delete PulsarQueueInfo
supersven Dec 1, 2025
d169442
Replace traceM with serious logging
supersven Dec 1, 2025
6527dcb
Fix warning about List1
supersven Dec 2, 2025
6c4c031
registerConsumableNotificationsClient: Log all exceptions
supersven Dec 4, 2025
f590c30
Hi CI
supersven Dec 5, 2025
affb323
use Admin API to create subscriptions for clients
supersven Dec 5, 2025
fdf55c4
WIP: Add notification benchmark test
supersven Dec 8, 2025
cf31123
Create one Pulsar client per app
supersven Dec 10, 2025
8225fc9
Adjust NotificationBenchmark for Pulsar
supersven Dec 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ fake-aws fake-aws-s3 fake-aws-sqs aws-ingress fluent-bit kibana backoffice \
calling-test demo-smtp elasticsearch-curator elasticsearch-external \
elasticsearch-ephemeral minio-external cassandra-external \
ingress-nginx-controller nginx-ingress-services reaper restund \
k8ssandra-test-cluster ldap-scim-bridge wire-server-enterprise
k8ssandra-test-cluster ldap-scim-bridge wire-server-enterprise \
integration
KIND_CLUSTER_NAME := wire-server
HELM_PARALLELISM ?= 1 # 1 for sequential tests; 6 for all-parallel tests
# (run `psql -h localhost -p 5432 -d backendA -U wire-server -w` for the list of options for PSQL_DB)
Expand Down
6 changes: 6 additions & 0 deletions charts/cannon/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ data:
rabbitMqMaxConnections: {{ .config.rabbitMqMaxConnections }}
rabbitMqMaxChannels: {{ .config.rabbitMqMaxChannels }}

{{- with .config.pulsar }}
pulsar:
host: {{ .host }}
port: {{ .port }}
{{- end }}

drainOpts:
gracePeriodSeconds: {{ .config.drainOpts.gracePeriodSeconds }}
millisecondsBetweenBatches: {{ .config.drainOpts.millisecondsBetweenBatches }}
Expand Down
4 changes: 4 additions & 0 deletions charts/cannon/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ config:
# name: <secret-name>
# key: <ca-attribute>

pulsar:
host: localhost
port: 6650

# See also the section 'Controlling the speed of websocket draining during
# cannon pod replacement' in docs/how-to/install/configuration-options.rst
drainOpts:
Expand Down
12 changes: 12 additions & 0 deletions charts/gundeck/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ data:
{{- end }}
{{- end }}

{{- with .pulsar }}
pulsar:
host: {{ .host }}
port: {{ .port }}
{{- end }}

{{- with .pulsarAdmin }}
pulsarAdmin:
host: {{ .host }}
port: {{ .port }}
{{- end }}

redis:
host: {{ .redis.host }}
port: {{ .redis.port }}
Expand Down
8 changes: 8 additions & 0 deletions charts/gundeck/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ config:
# name: <secret-name>
# key: <ca-attribute>

pulsar:
host: localhost
port: 6650

pulsarAdmin:
host: localhost
port: 8080

# To enable additional writes during a migration:
# redisAdditionalWrite:
# host: redis-two
Expand Down
44 changes: 44 additions & 0 deletions deploy/dockerephemeral/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,50 @@ services:
# - DNS_SERVER_RECURSION_ALLOWED_NETWORKS=127.0.0.1, 192.168.1.0/24 #Comma separated list of IP addresses or network addresses to allow recursion. Valid only for `UseSpecifiedNetworkACL` recursion option. This option is obsolete and DNS_SERVER_RECURSION_NETWORK_ACL should be used instead.
# - DNS_SERVER_ENABLE_BLOCKING=false #Sets the DNS server to block domain names using Blocked Zone and Block List Zone.

pulsar:
image: apachepulsar/pulsar:latest
container_name: pulsar
# 8080 belongs to nginz
ports:
- 6650:6650
- 5080:8080
networks:
- demo_wire
command: bin/pulsar standalone

# Admin GUI
# Url: http://localhost:9527
# Username: pulsar
# Password: walter-frosch
pulsar-manager:
image: apachepulsar/pulsar-manager:latest
container_name: pulsar-manager
ports:
- 9527:9527
- 7750:7750
networks:
- demo_wire
environment:
- "SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties"
healthcheck:
test:
["CMD-SHELL", "curl http://127.0.0.1:7750/actuator/health || exit 1 "]
interval: 3s
timeout: 5s
retries: 100

pulsar-init:
container_name: pulsar-init
image: curlimages/curl:latest
networks:
- demo_wire
command: init-pulsar
volumes:
- ./pulsar-config/init-pulsar.sh:/bin/init-pulsar
depends_on:
pulsar-manager:
condition: service_healthy

volumes:
redis-node-1-data:
redis-node-2-data:
Expand Down
20 changes: 20 additions & 0 deletions deploy/dockerephemeral/pulsar-config/init-pulsar.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env sh

# One-shot init script for the ephemeral Pulsar setup (run by the
# 'pulsar-init' docker-compose service once 'pulsar-manager' is healthy):
#   1. create a superuser in pulsar-manager (the admin GUI),
#   2. create the 'wire' tenant,
#   3. create the 'wire/user-notifications' namespace with a message TTL.
#
# NOTE(review): no 'set -e' — every curl is best-effort, so re-running
# against an already-initialized cluster does not fail the container.
# Confirm that silent failure of any step is acceptable here.

# Create a superuser in pulsar-manager.
# username: pulsar
# password: walter-frosch
# The PUT must carry the CSRF token both as a header and as a cookie.
CSRF_TOKEN=$(curl http://pulsar-manager:7750/pulsar-manager/csrf-token)
curl -v -H "X-XSRF-TOKEN: $CSRF_TOKEN" \
  -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
  -H "Content-Type: application/json" \
  -X PUT http://pulsar-manager:7750/pulsar-manager/users/superuser \
  -d '{"name": "pulsar", "password": "walter-frosch", "description": "test", "email": "username@test.org"}'

# Create the 'wire' tenant on the standalone cluster via the Pulsar
# admin REST API (port 8080 inside the compose network).
curl -v -X PUT http://pulsar:8080/admin/v2/tenants/wire \
  -H "Content-Type: application/json" \
  -d '{"adminRoles": ["pulsar"], "allowedClusters": ["standalone"]}'

# Create the namespace for user notifications; unacknowledged messages
# expire after one hour.
curl -v -X PUT \
  http://pulsar:8080/admin/v2/namespaces/wire/user-notifications \
  -H "Content-Type: application/json" \
  -d '{"message_ttl_in_seconds":3600}'
2 changes: 2 additions & 0 deletions integration/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
, split
, stm
, streaming-commons
, streamly
, string-conversions
, system-linux-proc
, tagged
Expand Down Expand Up @@ -179,6 +180,7 @@ mkDerivation {
split
stm
streaming-commons
streamly
string-conversions
system-linux-proc
tagged
Expand Down
2 changes: 2 additions & 0 deletions integration/integration.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ library
Test.MLS.Unreachable
Test.NginxZAuthModule
Test.Notifications
Test.NotificationsBenchmark
Test.OAuth
Test.PasswordReset
Test.Presence
Expand Down Expand Up @@ -299,6 +300,7 @@ library
, split
, stm
, streaming-commons
, streamly
, string-conversions
, system-linux-proc
, tagged
Expand Down
180 changes: 180 additions & 0 deletions integration/test/Test/NotificationsBenchmark.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
module Test.NotificationsBenchmark where

import API.Brig
import API.BrigCommon
import API.Common
import API.GundeckInternal
import Control.Concurrent
import Control.Concurrent.Async (Async)
import Control.Concurrent.STM.TBQueue
import Control.Monad.Codensity (Codensity (..))
import Control.Monad.Reader (MonadReader (ask), asks)
import Control.Monad.Reader.Class (local)
import Control.Retry
import qualified Data.Map.Strict as Map
import Data.Time
import Debug.Trace
import GHC.Conc.Sync
import GHC.Stack
import SetupHelpers
import qualified Streamly.Data.Fold.Prelude as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import System.Random
import qualified Test.Events as TestEvents
import Testlib.Prekeys
import Testlib.Prelude
import UnliftIO (async, waitAnyCancel)

-- | A provisioned benchmark user together with the ids of the clients
-- registered for it.  Built up front by 'generateTestRecipient' and then
-- only read, so no synchronization is required on the containing map.
data TestRecipient = TestRecipient
  { -- | The user object as returned by the backend (raw JSON 'Value').
    user :: Value,
    -- | Ids of the clients registered for this user; delivery is
    -- asserted once per client in 'sendAndReceive'.
    clientIds :: [String]
  }
  deriving (Show)

-- | Notification throughput benchmark.
--
-- Provisions users and clients ('generateTestRecipient'), then replays a
-- stream of timestamped fake pushes through per-shard worker threads;
-- each worker runs one push/receive round trip ('sendAndReceive').
--
-- Sharding: only user numbers with
-- @uNo `mod` shardingGroupCount == shardingGroup@ are handled here, so
-- several test processes can split the user population between them.
testBench :: (HasCallStack) => App ()
testBench = do
  shardingGroupCount <- asks (.shardingGroupCount)
  shardingGroup <- asks (.shardingGroup)
  maxUserNo <- asks (.maxUserNo)
  env <- ask

  -- Preparation
  let threadCount = min numCapabilities (fromIntegral maxUserNo)
      parCfg = Stream.maxThreads threadCount . Stream.ordered False
      toMap = Fold.foldl' (\kv (k, v) -> Map.insert k v kv) Map.empty
  -- Later, we only read from this map. Thus, it doesn't have to be thread-safe.
  userMap :: Map Word TestRecipient <-
    Stream.fromList [0 :: Word .. maxUserNo]
      & Stream.filter ((shardingGroup ==) . (`mod` shardingGroupCount))
      & Stream.parMapM parCfg (\i -> generateTestRecipient >>= \r -> pure (i, r))
      & Stream.fold toMap

  now <- liftIO getCurrentTime

  -- TODO: To be replaced with real data from the file. (See
  -- https://wearezeta.atlassian.net/wiki/spaces/PET/pages/2118680620/Simulating+production-like+data)
  let fakeData = zip (plusDelta now <$> [0 :: Word ..]) (cycle [1 .. maxUserNo])

  -- One bounded queue per worker thread.  Capacity is 1, so the producer
  -- blocks until the owning worker has taken the previous item.
  tbchans :: [(Int, TBQueue (UTCTime, Word))] <- liftIO $ forM [1 .. threadCount] $ \i -> do
    q <- atomically $ newTBQueue 1 -- capacity 1: producer and worker run in lock-step
    pure (i, q)

  -- Workers: each loops forever, taking a (timestamp, user number) item
  -- from its own queue and running one push/receive round trip.
  workers :: [Async ()] <- forM tbchans $ \(i, chan) ->
    liftIO
      . async
      -- . handle (\(e :: SomeException) -> traceM $ "Caught exception in worker id=" <> show i <> " e=" <> show e)
      . forever
      . runAppWithEnv env
      $ do
        traceM $ "worker taking from id=" <> show i
        (t, uNo) <- liftIO $ atomically $ readTBQueue chan
        traceM $ "worker took from id=" <> show i <> " val=" <> show (t, uNo)
        sendAndReceive uNo userMap
        traceM $ "worker end id=" <> show i

  -- Producer: replays fakeData at its simulated timestamps
  -- ('waitForTimeStamp') and hands each item to the worker that owns the
  -- user's shard.  fakeData is infinite ('cycle'), so this runs forever.
  producer <- async $ forM_ fakeData $ \(t, uNo) ->
    when ((uNo `mod` shardingGroupCount) == shardingGroup)
      $ let workerShard = fromIntegral uNo `mod` threadCount
            (i, chan) = tbchans !! workerShard
         in do
              waitForTimeStamp t
              traceM $ "producer putting to shard=" <> show workerShard <> " id=" <> show i <> " val=" <> show (t, uNo)
              liftIO $ atomically $ writeTBQueue chan (t, uNo)

  -- Block until any thread finishes or throws, then cancel the rest.
  liftIO . void . waitAnyCancel $ producer : workers

-- | Block until the wall clock has reached @timestamp@.  A timestamp
-- that is already in the past is a no-op: the caller is never delayed
-- retroactively.
waitForTimeStamp :: UTCTime -> App ()
waitForTimeStamp timestamp = liftIO $ do
  current <- getCurrentTime
  if current >= timestamp
    then pure ()
    else do
      -- Event comes from the simulated future: Wait here until now and timestamp are aligned.
      let micros = toMicros (diffUTCTime timestamp current)
      print ("Waiting " ++ show micros ++ " microseconds. (timestamp, now)" ++ show (timestamp, current))
      threadDelay micros
  where
    toMicros :: NominalDiffTime -> Int
    toMicros dt = floor @Double (realToFrac dt * 1_000_000)

-- | Shift @timestamp@ forward by @deltaMilliSeconds@ milliseconds.
plusDelta :: UTCTime -> Word -> UTCTime
plusDelta timestamp deltaMilliSeconds = addUTCTime offset timestamp
  where
    -- NominalDiffTime is in seconds, hence the division by 1000.
    offset = fromIntegral deltaMilliSeconds / 1000

-- | Push one notification to the given user and assert that every one of
-- the user's clients receives it on the events websocket.
--
-- The payload carries a @sent_at@ timestamp so the receiving side can
-- log the send/receive latency.
sendAndReceive :: Word -> Map Word TestRecipient -> App ()
sendAndReceive userNo userMap = do
  print $ "pushing to user" ++ show userNo
  -- NOTE(review): 'Map.!' is partial — this crashes if userNo is not in
  -- the map.  Callers must only pass user numbers of this sharding group
  -- (the producer in 'testBench' filters accordingly).
  let testRecipient = userMap Map.! (fromIntegral userNo)
      alice = testRecipient.user

  r <- recipient alice
  payload :: Value <- toJSON <$> liftIO randomPayload
  now <- liftIO $ getCurrentTime
  let push =
        object
          [ "recipients" .= [r],
            "payload"
              .= [ object
                     [ "foo" .= payload,
                       "sent_at" .= now
                     ]
                 ]
          ]

  void $ postPush alice [push] >>= getBody 200

  traceM $ "pushed to userNo=" <> show userNo <> " push=" <> show push

  messageDeliveryTimeout <- asks $ fromIntegral . (.maxDeliveryDelay)
  traceM $ "XXX all clientIds " <> show testRecipient.clientIds
  -- Every client of the user must observe the event within the
  -- configured delivery timeout.
  forM_ (testRecipient.clientIds) $ \(cid :: String) -> do
    traceM $ "XXX using clientId=" <> show cid
    runCodensity (TestEvents.createEventsWebSocket alice (Just cid)) $ \ws -> do
      -- TODO: Tweak this value to the least acceptable event delivery duration
      local (setTimeoutTo messageDeliveryTimeout) $ TestEvents.assertFindsEvent ws $ \e -> do
        receivedAt <- liftIO getCurrentTime
        p <- e %. "data.event.payload.0"
        sentAt <-
          (p %. "sent_at" >>= asString)
            <&> ( \sentAtStr ->
                    fromMaybe (error ("Cannot parse timestamp: " <> sentAtStr)) $ parseUTC sentAtStr
                )
        print $ "Message sent/receive delta: " ++ show (diffUTCTime receivedAt sentAt)

        p %. "foo" `shouldMatch` payload
    traceM $ "XXX Succeeded for clientId=" <> show cid
  where
    -- | Generate a random printable-ASCII payload of fixed length 884
    -- bytes (the production average; the content is random, the length
    -- is not).
    randomPayload :: IO String
    randomPayload =
      -- Measured with
      -- `kubectl exec --namespace databases -it gundeck-gundeck-eks-eu-west-1a-sts-0 -- sh -c 'cqlsh -e "select blobAsText(payload) from gundeck.notifications LIMIT 5000;" ' | sed 's/^[ \t]*//;s/[ \t]*$//' | wc`
      let len :: Int = 884 -- measured in prod
       in mapM (\_ -> randomRIO ('\32', '\126')) [1 .. len] -- printable ASCII
    -- Parse the ISO-8601 timestamp format emitted for @sent_at@.
    parseUTC :: String -> Maybe UTCTime
    parseUTC = parseTimeM True defaultTimeLocale "%Y-%m-%dT%H:%M:%S%QZ"

-- | Override the test timeout (seconds) of an 'Env'; used with 'local'
-- to bound how long we wait for a notification to be delivered.
setTimeoutTo :: Int -> Env -> Env
setTimeoutTo tSecs env = env {timeOutSeconds = tSecs}

-- | Create one random user with a random number of
-- consumable-notifications-capable clients.
--
-- NOTE(review): with @r <- randomRIO (1, 8)@, @forM [0 .. r]@ registers
-- r + 1 clients (i.e. 2..9), not r — confirm whether that off-by-one is
-- intended.
generateTestRecipient :: (HasCallStack) => App TestRecipient
generateTestRecipient = do
  print "generateTestRecipient"
  user <- recover $ (randomUser OwnDomain def)
  r <- randomRIO @Int (1, 8)
  clientIds <- forM [0 .. r] $ \i -> do
    client <-
      recover
        $ addClient
          user
          def
            { acapabilities = Just ["consumable-notifications"],
              prekeys = Nothing,
              -- NOTE(review): '!!' is partial; this assumes
              -- someLastPrekeysRendered has at least 9 entries — confirm.
              lastPrekey = Just $ (someLastPrekeysRendered !! i),
              clabel = "Test Client No. " <> show i,
              model = "Test Model No. " <> show i
            }
        >>= getJSON 201
    objId client

  pure $ TestRecipient user clientIds
  where
    -- Retry any failing action with exponential backoff.
    -- NOTE(review): 'limitRetriesByCumulativeDelay' takes microseconds;
    -- a cap of 300 µs combined with a 1 s base backoff allows essentially
    -- no retries — was 300 seconds (300_000_000) intended?
    recover :: App a -> App a
    recover = recoverAll (limitRetriesByCumulativeDelay 300 (exponentialBackoff 1_000_000)) . const
16 changes: 12 additions & 4 deletions integration/test/Testlib/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ serviceHostPort m Stern = m.stern
serviceHostPort m FederatorInternal = m.federatorInternal
serviceHostPort m WireServerEnterprise = m.wireServerEnterprise

mkGlobalEnv :: FilePath -> Codensity IO GlobalEnv
mkGlobalEnv cfgFile = do
mkGlobalEnv :: FilePath -> Word -> Codensity IO GlobalEnv
mkGlobalEnv cfgFile shardingGroup = do
eith <- liftIO $ Yaml.decodeFileEither cfgFile
intConfig <- liftIO $ case eith of
Left err -> do
Expand Down Expand Up @@ -145,7 +145,11 @@ mkGlobalEnv cfgFile = do
gDNSMockServerConfig = intConfig.dnsMockServer,
gCellsEventQueue = intConfig.cellsEventQueue,
gCellsEventWatchersLock,
gCellsEventWatchers
gCellsEventWatchers,
gShardingGroupCount = intConfig.shardingGroupCount,
gShardingGroup = shardingGroup,
gMaxUserNo = intConfig.maxUserNo,
gMaxDeliveryDelay = intConfig.maxDeliveryDelay
}
where
createSSLContext :: Maybe FilePath -> IO (Maybe OpenSSL.SSLContext)
Expand Down Expand Up @@ -201,7 +205,11 @@ mkEnv currentTestName ge = do
dnsMockServerConfig = ge.gDNSMockServerConfig,
cellsEventQueue = ge.gCellsEventQueue,
cellsEventWatchersLock = ge.gCellsEventWatchersLock,
cellsEventWatchers = ge.gCellsEventWatchers
cellsEventWatchers = ge.gCellsEventWatchers,
shardingGroupCount = ge.gShardingGroupCount,
shardingGroup = ge.gShardingGroup,
maxUserNo = ge.gMaxUserNo,
maxDeliveryDelay = ge.gMaxDeliveryDelay
}

allCiphersuites :: [Ciphersuite]
Expand Down
Loading