From 01c0925b879a05b19abf4f931556de5f17166606 Mon Sep 17 00:00:00 2001 From: Caique Figueiredo Date: Thu, 10 Feb 2022 11:38:53 -0300 Subject: [PATCH 01/12] Set partition.assignment.strategy to range and cooperative --- nri-kafka/src/Kafka/Worker/Internal.hs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index bff7517e..8fb7db99 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -272,7 +272,8 @@ createConsumer ++ Consumer.extraProps ( Dict.fromList [ ("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs)), - ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs)) + ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs)), + ("partition.assignment.strategy", "range,cooperative-sticky") ] ) ++ case maybeStatsCallback of From 5142bf05a3136ccab0fd38dbcae9941bcccc3dde Mon Sep 17 00:00:00 2001 From: Caique Figueiredo Date: Thu, 10 Feb 2022 12:51:54 -0300 Subject: [PATCH 02/12] Start/stop zk with proper property paths --- run-tests.sh | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/run-tests.sh b/run-tests.sh index d0b60f20..6d96a163 100755 --- a/run-tests.sh +++ b/run-tests.sh @@ -16,18 +16,23 @@ pg_ctl start -o '-k .' 
mkdir -p ./_build/redis/data redis-server --daemonize yes --dir ./_build/redis/data -## start zookeeper (for kafka) +## start zookeeper (for kafka) +zk_server_properties_path=$(dirname "$(which zkServer.sh)")/../conf/zoo_sample.cfg mkdir -p /tmp/zookeeper /tmp/zookeeper-logs -ZOOPIDFILE=/tmp/zookeeper-logs/pid ZOO_LOG_DIR=/tmp/zookeeper-logs zkServer.sh stop zoo_sample.cfg +ZOOPIDFILE=/tmp/zookeeper-logs/pid \ + ZOO_LOG_DIR=/tmp/zookeeper-logs \ + zkServer.sh stop "$zk_server_properties_path" rm -rf /tmp/zookeeper/* /tmp/zookeeper-logs/* -ZOOPIDFILE=/tmp/zookeeper-logs/pid ZOO_LOG_DIR=/tmp/zookeeper-logs zkServer.sh start zoo_sample.cfg +ZOOPIDFILE=/tmp/zookeeper-logs/pid \ + ZOO_LOG_DIR=/tmp/zookeeper-logs \ + zkServer.sh start "$zk_server_properties_path" ## wait for zookeeper echo "waiting for zookeeper to start" until nc -vz localhost 2181 do sleep 1 -done +done echo "zookeeper available" ## start kafka @@ -50,4 +55,6 @@ cabal test all # cleanup kafka-server-stop.sh -ZOOPIDFILE=/tmp/zookeeper-logs/pid ZOO_LOG_DIR=/tmp/zookeeper-logs zkServer.sh stop zoo_sample.cfg +ZOOPIDFILE=/tmp/zookeeper-logs/pid \ + ZOO_LOG_DIR=/tmp/zookeeper-logs \ + zkServer.sh stop "$zk_server_properties_path" From 7265c9512045e32557567b0153417b4ad0765b32 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 21 Apr 2022 10:41:03 -0300 Subject: [PATCH 03/12] Bump hw-kafka to use our fork --- nix/sources.json | 12 ++++++++++++ shell-ghc-8-10.nix | 2 ++ 2 files changed, 14 insertions(+) diff --git a/nix/sources.json b/nix/sources.json index 142a0664..26bf2ffa 100644 --- a/nix/sources.json +++ b/nix/sources.json @@ -1,4 +1,16 @@ { + "hw-kafka-client": { + "branch": "main", + "description": "Kafka client for Haskell, including auto-rebalancing consumers", + "homepage": null, + "owner": "NoRedInk", + "repo": "hw-kafka-client", + "rev": "afb77994286f9c4876f562fb0a8c7b098b56248f", + "sha256": "1ygmvw508n7dc6is9yzz8yc1k8nhz66f6snagvb7sjijfsym31lw", + "type": "tarball", + "url": 
"https://github.com/NoRedInk/hw-kafka-client/archive/afb77994286f9c4876f562fb0a8c7b098b56248f.tar.gz", + "url_template": "https://github.com///archive/.tar.gz" + }, "niv": { "branch": "master", "description": "Easy dependency management for Nix projects", diff --git a/shell-ghc-8-10.nix b/shell-ghc-8-10.nix index dbc64a2f..d17892cd 100644 --- a/shell-ghc-8-10.nix +++ b/shell-ghc-8-10.nix @@ -10,5 +10,7 @@ in import nix/mk-shell.nix { safe-coloured-text-terminfo = super.callCabal2nix "safe-coloured-text-terminfo" "${sources.safe-coloured-text}/safe-coloured-text-terminfo" { }; + hw-kafka-client = super.haskell.lib.dontCheck + (super.callCabal2nix "hw-kafka-client" sources.hw-kafka-client { }); }); } From dbcb5b3b04e3824516b08de25188f4eb9a9617a1 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 21 Apr 2022 11:00:47 -0300 Subject: [PATCH 04/12] Range and CooperativeSticky are not compatible We're gettingt this error: All partition.assignment.strategy (range,cooperative-sticky) assignors must have the same protocol type, online migration between assignors with different protocol types is not supported Some minimal reading of rdkafka made us believe range is EAGER and cooperative-sticky is .. well .. COOPERATIVE, which are different protocol types. This means we can't use both at the same time and in practice, means we need downtime on workers when deploying this change. Not terrible tbh. 
--- nri-kafka/src/Kafka/Worker/Internal.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index 8fb7db99..eaf50523 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -273,7 +273,7 @@ createConsumer ( Dict.fromList [ ("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs)), ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs)), - ("partition.assignment.strategy", "range,cooperative-sticky") + ("partition.assignment.strategy", "cooperative-sticky") ] ) ++ case maybeStatsCallback of From 60297d732452dfc3365828c1fd6080605abbfcd4 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 21 Apr 2022 11:20:36 -0300 Subject: [PATCH 05/12] Use the type we created instead of extraprops Extra props doesn't set things up correctly to use incremental_assign and friends. --- nri-kafka/src/Kafka/Worker/Internal.hs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index eaf50523..227ce4c8 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -14,6 +14,7 @@ import qualified Data.UUID.V4 import qualified Dict import qualified GHC.Clock import qualified Kafka.Consumer as Consumer +import qualified Kafka.Consumer.AssignmentStrategy as AssignmentStrategy import qualified Kafka.Internal as Kafka import qualified Kafka.Metadata import qualified Kafka.Stats as Stats @@ -269,11 +270,11 @@ createConsumer ++ Consumer.logLevel logLevel ++ Consumer.setCallback (Consumer.rebalanceCallback rebalance) ++ Consumer.compression Consumer.Snappy + ++ Consumer.setAssignmentStrategy AssignmentStrategy.CooperativeSticky ++ Consumer.extraProps ( Dict.fromList [ ("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs 
maxPollIntervalMs)), - ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs)), - ("partition.assignment.strategy", "cooperative-sticky") + ("statistics.interval.ms", Text.fromInt (Settings.unStatisticsIntervalMs statisticsIntervalMs)) ] ) ++ case maybeStatsCallback of From b31490b5049f330d0bb690d6235bfb69b1ed05da Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 21 Apr 2022 17:20:20 -0300 Subject: [PATCH 06/12] Super has no haskell --- shell-ghc-8-10.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shell-ghc-8-10.nix b/shell-ghc-8-10.nix index d17892cd..9ebf1b15 100644 --- a/shell-ghc-8-10.nix +++ b/shell-ghc-8-10.nix @@ -10,7 +10,7 @@ in import nix/mk-shell.nix { safe-coloured-text-terminfo = super.callCabal2nix "safe-coloured-text-terminfo" "${sources.safe-coloured-text}/safe-coloured-text-terminfo" { }; - hw-kafka-client = super.haskell.lib.dontCheck + hw-kafka-client = pkgs.haskell.lib.dontCheck (super.callCabal2nix "hw-kafka-client" sources.hw-kafka-client { }); }); } From d79f4faa8f3fd596cd9202a7a01259b4e809e0aa Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 21 Apr 2022 17:20:56 -0300 Subject: [PATCH 07/12] Our fork is at 5.0.0 --- nri-kafka/nri-kafka.cabal | 4 ++-- nri-kafka/package.yaml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nri-kafka/nri-kafka.cabal b/nri-kafka/nri-kafka.cabal index 01fdfe56..319286ee 100644 --- a/nri-kafka/nri-kafka.cabal +++ b/nri-kafka/nri-kafka.cabal @@ -71,7 +71,7 @@ library , bytestring >=0.10.8.2 && <0.12 , conduit >=1.3.0 && <1.4 , containers >=0.6.0.1 && <0.7 - , hw-kafka-client >=4.0.3 && <5.0 + , hw-kafka-client >=5.0.0 && <6.0 , nri-env-parser >=0.1.0.0 && <0.2 , nri-observability >=0.1.1.1 && <0.2 , nri-prelude >=0.1.0.0 && <0.7 @@ -135,7 +135,7 @@ test-suite tests , bytestring >=0.10.8.2 && <0.12 , conduit >=1.3.0 && <1.4 , containers >=0.6.0.1 && <0.7 - , hw-kafka-client >=4.0.3 && <5.0 + , hw-kafka-client >=5.0.0 
&& <6.0 , nri-env-parser >=0.1.0.0 && <0.2 , nri-observability >=0.1.1.1 && <0.2 , nri-prelude >=0.1.0.0 && <0.7 diff --git a/nri-kafka/package.yaml b/nri-kafka/package.yaml index 4eb278b4..25112513 100644 --- a/nri-kafka/package.yaml +++ b/nri-kafka/package.yaml @@ -20,7 +20,7 @@ dependencies: - bytestring >= 0.10.8.2 && < 0.12 - conduit >= 1.3.0 && < 1.4 - containers >= 0.6.0.1 && < 0.7 - - hw-kafka-client >=4.0.3 && < 5.0 + - hw-kafka-client >=5.0.0 && < 6.0 - nri-env-parser >= 0.1.0.0 && < 0.2 - nri-observability >= 0.1.1.1 && < 0.2 - nri-prelude >= 0.1.0.0 && < 0.7 From 593a579b5fc3910c61e12cb7a032181396517622 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 21 Apr 2022 17:22:56 -0300 Subject: [PATCH 08/12] 4.0.4 removed produceMessageBatch We've switched to the only remaining pollEvent proxy: flushProducer It might be fine to flush 'em every 100ms. We'll see. --- nri-kafka/src/Kafka.hs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index 8e9a28d8..029f8620 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -159,7 +159,7 @@ key msg = Maybe.map Internal.unKey (Internal.key msg) handler :: Settings.Settings -> Maybe Stats.StatsCallback -> Conduit.Acquire Internal.Handler handler settings maybeStatsCallback = do producer <- Conduit.mkAcquire (mkProducer settings maybeStatsCallback) Producer.closeProducer - _ <- Conduit.mkAcquire (startPollEventLoop producer) (\terminator -> STM.atomically (TMVar.putTMVar terminator Terminate)) + _ <- Conduit.mkAcquire (startFlushLoop producer) (\terminator -> STM.atomically (TMVar.putTMVar terminator Terminate)) liftIO (mkHandler settings producer) data Terminate = Terminate @@ -167,24 +167,24 @@ data Terminate = Terminate -- | By default events only get polled right before sending a record to kafka. -- This means that the deliveryCallback only gets fired on the next call to produceMessage'. 
-- We want to be informed about delivery status as soon as possible though. -startPollEventLoop :: Producer.KafkaProducer -> Prelude.IO (TMVar.TMVar b) -startPollEventLoop producer = do +-- The only way to do that right now in hw-kafka-client is by flushing the queue +startFlushLoop :: Producer.KafkaProducer -> Prelude.IO (TMVar.TMVar b) +startFlushLoop producer = do terminator <- STM.atomically TMVar.newEmptyTMVar _ <- Async.race_ - (pollEvents producer) + (flushProducer producer) (STM.atomically <| TMVar.readTMVar terminator) |> Async.async Prelude.pure terminator -- | We use a little trick here to poll events, by sending an empty message batch. -- This will call the internal pollEvent function in hw-kafka-client. -pollEvents :: Producer.KafkaProducer -> Prelude.IO () -pollEvents producer = do - Producer.produceMessageBatch producer [] - |> map (\_ -> ()) +flushProducer :: Producer.KafkaProducer -> Prelude.IO () +flushProducer producer = do + Producer.flushProducer producer Control.Concurrent.threadDelay 100_000 {- 100ms -} - pollEvents producer + flushProducer producer -- | mkHandler :: Settings.Settings -> Producer.KafkaProducer -> Prelude.IO Internal.Handler From ed6b17944cef73e9bcee10681140be419d55f141 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 21 Apr 2022 17:23:51 -0300 Subject: [PATCH 09/12] New required field prHeaders --- nri-kafka/src/Kafka.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index 029f8620..411985d0 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -137,6 +137,7 @@ record msg = do |> ByteString.Lazy.toStrict ) (Internal.payload msg) + , Producer.prHeaders = Prelude.mempty } -- | The topic of a message. This function might sometimes be useful in tests. 
From 7b137d597f14c38cb4d0ce1dac75eda64bd3a39d Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 21 Apr 2022 17:24:04 -0300 Subject: [PATCH 10/12] Fix name and hey it's a list --- nri-kafka/src/Kafka/Worker/Internal.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nri-kafka/src/Kafka/Worker/Internal.hs b/nri-kafka/src/Kafka/Worker/Internal.hs index 227ce4c8..36f41b03 100644 --- a/nri-kafka/src/Kafka/Worker/Internal.hs +++ b/nri-kafka/src/Kafka/Worker/Internal.hs @@ -270,7 +270,7 @@ createConsumer ++ Consumer.logLevel logLevel ++ Consumer.setCallback (Consumer.rebalanceCallback rebalance) ++ Consumer.compression Consumer.Snappy - ++ Consumer.setAssignmentStrategy AssignmentStrategy.CooperativeSticky + ++ Consumer.setAssignmentStrategy [AssignmentStrategy.CooperativeStickyAssignor] ++ Consumer.extraProps ( Dict.fromList [ ("max.poll.interval.ms", Text.fromInt (Settings.unMaxPollIntervalMs maxPollIntervalMs)), From da82ac63cf0d5eea32448e97ec61a2380f9c651e Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 21 Apr 2022 17:26:18 -0300 Subject: [PATCH 11/12] Fix test --- nri-kafka/test/Helpers.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/nri-kafka/test/Helpers.hs b/nri-kafka/test/Helpers.hs index 0212a460..73485b5a 100644 --- a/nri-kafka/test/Helpers.hs +++ b/nri-kafka/test/Helpers.hs @@ -161,6 +161,7 @@ record topicName partitionId val = { Producer.prTopic = Producer.TopicName (Internal.unTopic topicName), Producer.prPartition = Producer.SpecifiedPartition (Prelude.fromIntegral partitionId), Producer.prKey = Nothing, + Producer.prHeaders = Prelude.mempty, Producer.prValue = Internal.MsgWithMetaData { Internal.metaData = From b7b35b815e2bbeac1a78dfce3fdfb8b1fdadbe79 Mon Sep 17 00:00:00 2001 From: Juliano Solanho Date: Thu, 5 May 2022 11:03:28 -0300 Subject: [PATCH 12/12] Make partition assignment strategy configurable --- nri-kafka/src/Kafka/Settings.hs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) 
diff --git a/nri-kafka/src/Kafka/Settings.hs b/nri-kafka/src/Kafka/Settings.hs index 82240e85..722513fc 100644 --- a/nri-kafka/src/Kafka/Settings.hs +++ b/nri-kafka/src/Kafka/Settings.hs @@ -10,6 +10,7 @@ module Kafka.Settings where import qualified Environment +import qualified Kafka.Consumer.AssignmentStrategy as AssignmentStrategy import qualified Kafka.Producer import qualified Kafka.Settings.Internal as Internal import qualified Prelude @@ -27,7 +28,11 @@ data Settings = Settings -- | librdkafka statistics emit interval. The application also needs to -- register a stats callback using rd_kafka_conf_set_stats_cb(). The -- granularity is 1000ms. A value of 0 disables statistics. - statisticsIntervalMs :: StatisticsIntervalMs + statisticsIntervalMs :: StatisticsIntervalMs, + -- | partition assignment strategy for workers. one of + -- RangeAssignor: https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html + -- CooperativeStickyAssignor: https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html + partitionAssignmentStrategy :: AssignmentStrategy.ConsumerAssignmentStrategy } -- | Number of messages to batch together before sending to Kafka. @@ -52,6 +57,7 @@ decoder = |> andMap decoderDeliveryTimeout |> andMap decoderBatchNumMessages |> andMap decoderStatisticsIntervalMs + |> andMap decoderPartitionAssignmentStrategy decoderDeliveryTimeout :: Environment.Decoder Kafka.Producer.Timeout decoderDeliveryTimeout = @@ -82,3 +88,18 @@ decoderStatisticsIntervalMs = Environment.defaultValue = "0" } (map StatisticsIntervalMs Environment.int) + +decoderPartitionAssignmentStrategy :: Environment.Decoder AssignmentStrategy.ConsumerAssignmentStrategy +decoderPartitionAssignmentStrategy = + Environment.variable + Environment.Variable + { Environment.name = "KAFKA_PARTITION_ASSIGNMENT_STRATEGY", + Environment.description = "sets the kafka partition assignment strategy. 
one of: {cooperative-sticky,range-assignor}", + Environment.defaultValue = "cooperative-sticky" + } + ( Environment.custom Environment.text + <| \str -> case str of + "cooperative-sticky" -> Ok AssignmentStrategy.CooperativeStickyAssignor + "range-assignor" -> Ok AssignmentStrategy.RangeAssignor + invalidValue -> Err ("Invalid value: " ++ invalidValue) + )