Skip to content

Commit a0e5aea

Browse files
committed
Adding test coverage for cluster pubsub behavior with maihntenance push notifications. (#3860)
1 parent 3f6e277 commit a0e5aea

File tree

2 files changed

+179
-5
lines changed

2 files changed

+179
-5
lines changed

redis/maint_notifications.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,18 +1004,23 @@ def handle_oss_maintenance_completed_notification(
10041004
disconnect_startup_nodes_pools=False,
10051005
additional_startup_nodes_info=[(new_node_host, int(new_node_port))],
10061006
)
1007+
# mark for reconnect all in use connections to the node - this will force them to
1008+
# disconnect after they complete their current commands
1009+
# Some of them might be used by sub sub and we don't know which ones - so we disconnect
1010+
# all in flight connections after they are done with current command execution
1011+
for conn in (
1012+
current_node.redis_connection.connection_pool._get_in_use_connections()
1013+
):
1014+
conn.mark_for_reconnect()
10071015

10081016
if (
10091017
current_node
10101018
not in self.cluster_client.nodes_manager.nodes_cache.values()
10111019
):
1012-
# disconnect all free connections to the node
1020+
# disconnect all free connections to the node - this node will be dropped
1021+
# from the cluster, so we don't need to revert the timeouts
10131022
for conn in current_node.redis_connection.connection_pool._get_free_connections():
10141023
conn.disconnect()
1015-
# mark for reconnect all in use connections to the node - this will force them to
1016-
# disconnect after they complete their current commands
1017-
for conn in current_node.redis_connection.connection_pool._get_in_use_connections():
1018-
conn.mark_for_reconnect()
10191024
else:
10201025
if self.config.is_relaxed_timeouts_enabled():
10211026
# reset the timeouts for the node to which the connection is connected
@@ -1025,6 +1030,7 @@ def handle_oss_maintenance_completed_notification(
10251030
*current_node.redis_connection.connection_pool._get_in_use_connections(),
10261031
*current_node.redis_connection.connection_pool._get_free_connections(),
10271032
):
1033+
conn.reset_tmp_settings(reset_relaxed_timeout=True)
10281034
conn.update_current_socket_timeout(relaxed_timeout=-1)
10291035
conn.maintenance_state = MaintenanceState.NONE
10301036

tests/maint_notifications/test_cluster_maint_notifications_handling.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
from asyncio import Queue
12
from dataclasses import dataclass
3+
from threading import Thread
24
from typing import List, Optional, cast
35

46
from redis import ConnectionPool, RedisCluster
@@ -975,3 +977,169 @@ def test_smigrating_smigrated_on_the_same_node_two_slot_ranges(
975977
),
976978
],
977979
)
980+
981+
def test_smigrating_smigrated_with_sharded_pubsub(
982+
self,
983+
):
984+
"""
985+
Test handling of sharded pubsub connections when SMIGRATING and SMIGRATED
986+
notifications are received.
987+
"""
988+
# warm up connection pools - create several connections in each pool
989+
self._warm_up_connection_pools(self.cluster, created_connections_count=5)
990+
991+
node_1 = self.cluster.nodes_manager.get_node(host="0.0.0.0", port=NODE_PORT_1)
992+
993+
pubsub = self.cluster.pubsub()
994+
995+
# subscribe to a channel on node1
996+
pubsub.ssubscribe("anyprefix:{7}:k")
997+
998+
msg = pubsub.get_sharded_message(
999+
ignore_subscribe_messages=False, timeout=10, target_node=node_1
1000+
)
1001+
# subscribe msg
1002+
assert msg is not None and msg["type"] == "ssubscribe"
1003+
1004+
smigrating_node_1 = RespTranslator.oss_maint_notification_to_resp(
1005+
"SMIGRATING 12 <5200-5460>"
1006+
)
1007+
self.proxy_helper.send_notification(NODE_PORT_1, smigrating_node_1)
1008+
1009+
# get message with node 1 connection to consume the notification
1010+
# timeout is 1 second
1011+
msg = pubsub.get_sharded_message(ignore_subscribe_messages=False, timeout=5000)
1012+
# smigrating handled
1013+
assert msg is None
1014+
1015+
assert pubsub.node_pubsub_mapping[node_1.name].connection._sock is not None
1016+
assert pubsub.node_pubsub_mapping[node_1.name].connection._socket_timeout == 30
1017+
assert (
1018+
pubsub.node_pubsub_mapping[node_1.name].connection._socket_connect_timeout
1019+
== 30
1020+
)
1021+
1022+
self.proxy_helper.set_cluster_slots(
1023+
"test_topology",
1024+
[
1025+
SlotsRange("0.0.0.0", NODE_PORT_1, 0, 5200),
1026+
SlotsRange("0.0.0.0", NODE_PORT_2, 5201, 10922),
1027+
SlotsRange("0.0.0.0", NODE_PORT_3, 10923, 16383),
1028+
],
1029+
)
1030+
1031+
smigrated_node_1 = RespTranslator.oss_maint_notification_to_resp(
1032+
"SMIGRATED 14 0.0.0.0:15380 <5200-5460>"
1033+
)
1034+
self.proxy_helper.send_notification(NODE_PORT_1, smigrated_node_1)
1035+
# execute command with node 1 connection
1036+
# this will first consume the SMIGRATING notification for the connection
1037+
# this should update the cluster topology and move the slot range to the new node
1038+
# and should set the pubsub connection for reconnect
1039+
res = self.cluster.set("anyprefix:{3}:k", "VAL")
1040+
assert res is True
1041+
1042+
assert pubsub.node_pubsub_mapping[node_1.name].connection._should_reconnect
1043+
assert pubsub.node_pubsub_mapping[node_1.name].connection._sock is not None
1044+
assert (
1045+
pubsub.node_pubsub_mapping[node_1.name].connection._socket_timeout is None
1046+
)
1047+
assert (
1048+
pubsub.node_pubsub_mapping[node_1.name].connection._socket_connect_timeout
1049+
is None
1050+
)
1051+
1052+
# first message will be SMIGRATED notification handling
1053+
# during this read connection will be reconnected and will resubscribe to channels
1054+
msg = pubsub.get_sharded_message(ignore_subscribe_messages=True, timeout=10)
1055+
assert msg is None
1056+
1057+
assert not pubsub.node_pubsub_mapping[node_1.name].connection._should_reconnect
1058+
assert pubsub.node_pubsub_mapping[node_1.name].connection._sock is not None
1059+
assert (
1060+
pubsub.node_pubsub_mapping[node_1.name].connection._socket_timeout is None
1061+
)
1062+
assert (
1063+
pubsub.node_pubsub_mapping[node_1.name].connection._socket_connect_timeout
1064+
is None
1065+
)
1066+
assert (
1067+
pubsub.node_pubsub_mapping[node_1.name].connection.maintenance_state
1068+
== MaintenanceState.NONE
1069+
)
1070+
# validate resubscribed
1071+
assert pubsub.node_pubsub_mapping[node_1.name].subscribed
1072+
1073+
def test_smigrating_smigrated_with_std_pubsub(
1074+
self,
1075+
):
1076+
"""
1077+
Test handling of standard pubsub connections when SMIGRATING and SMIGRATED
1078+
notifications are received.
1079+
"""
1080+
# warm up connection pools - create several connections in each pool
1081+
self._warm_up_connection_pools(self.cluster, created_connections_count=5)
1082+
1083+
pubsub = self.cluster.pubsub()
1084+
1085+
# subscribe to a channel on node1
1086+
pubsub.subscribe("anyprefix:{7}:k")
1087+
1088+
msg = pubsub.get_message(ignore_subscribe_messages=False, timeout=10)
1089+
# subscribe msg
1090+
assert msg is not None and msg["type"] == "subscribe"
1091+
1092+
smigrating_node_1 = RespTranslator.oss_maint_notification_to_resp(
1093+
"SMIGRATING 12 <5200-5460>"
1094+
)
1095+
self.proxy_helper.send_notification(NODE_PORT_1, smigrating_node_1)
1096+
1097+
# get message with node 1 connection to consume the notification
1098+
# timeout is 1 second
1099+
msg = pubsub.get_message(ignore_subscribe_messages=False, timeout=5000)
1100+
# smigrating handled
1101+
assert msg is None
1102+
1103+
assert pubsub.connection._sock is not None
1104+
assert pubsub.connection._socket_timeout == 30
1105+
assert pubsub.connection._socket_connect_timeout == 30
1106+
1107+
self.proxy_helper.set_cluster_slots(
1108+
"test_topology",
1109+
[
1110+
SlotsRange("0.0.0.0", NODE_PORT_1, 0, 5200),
1111+
SlotsRange("0.0.0.0", NODE_PORT_2, 5201, 10922),
1112+
SlotsRange("0.0.0.0", NODE_PORT_3, 10923, 16383),
1113+
],
1114+
)
1115+
1116+
smigrated_node_1 = RespTranslator.oss_maint_notification_to_resp(
1117+
"SMIGRATED 14 0.0.0.0:15380 <5200-5460>"
1118+
)
1119+
self.proxy_helper.send_notification(NODE_PORT_1, smigrated_node_1)
1120+
# execute command with node 1 connection
1121+
# this will first consume the SMIGRATING notification for the connection
1122+
# this should update the cluster topology and move the slot range to the new node
1123+
# and should set the pubsub connection for reconnect
1124+
res = self.cluster.set("anyprefix:{3}:k", "VAL")
1125+
assert res is True
1126+
1127+
assert res is True
1128+
1129+
assert pubsub.connection._should_reconnect
1130+
assert pubsub.connection._sock is not None
1131+
assert pubsub.connection._socket_timeout is None
1132+
assert pubsub.connection._socket_connect_timeout is None
1133+
1134+
# first message will be SMIGRATED notification handling
1135+
# during this read connection will be reconnected and will resubscribe to channels
1136+
msg = pubsub.get_message(ignore_subscribe_messages=True, timeout=10)
1137+
assert msg is None
1138+
1139+
assert not pubsub.connection._should_reconnect
1140+
assert pubsub.connection._sock is not None
1141+
assert pubsub.connection._socket_timeout is None
1142+
assert pubsub.connection._socket_connect_timeout is None
1143+
assert pubsub.connection.maintenance_state == MaintenanceState.NONE
1144+
# validate resubscribed
1145+
assert pubsub.subscribed

0 commit comments

Comments
 (0)