@@ -26,6 +26,8 @@ public class HybridClusterTransport : IClusterTransport
2626 private readonly NodeInfo _nodeInfo ;
2727 private readonly ConcurrentDictionary < string , NodeInfo > _knownNodes ;
2828 private readonly CancellationTokenSource _cancellationTokenSource ;
29+ private readonly ConcurrentDictionary < string , DateTime > _processedMessageIds ; // 已处理的消息ID,用于去重
30+ private readonly Timer _messageIdCleanupTimer ; // 定期清理过期的消息ID
2931 private bool _disposed = false ;
3032 private bool _started = false ;
3133
@@ -78,6 +80,7 @@ public HybridClusterTransport(
7880
7981 _knownNodes = new ConcurrentDictionary < string , NodeInfo > ( ) ;
8082 _cancellationTokenSource = new CancellationTokenSource ( ) ;
83+ _processedMessageIds = new ConcurrentDictionary < string , DateTime > ( ) ;
8184
8285 // Create specific loggers using logger factory / 使用 logger factory 创建特定类型的 logger
8386 var discoveryLogger = loggerFactory . CreateLogger < RedisNodeDiscoveryService > ( ) ;
@@ -104,6 +107,9 @@ public HybridClusterTransport(
104107 // Subscribe to discovery events / 订阅发现事件
105108 _discoveryService . NodeDiscovered += OnNodeDiscovered ;
106109 _discoveryService . NodeRemoved += OnNodeRemoved ;
110+
111+ // Start message ID cleanup timer (clean up every 5 minutes) / 启动消息ID清理定时器(每5分钟清理一次)
112+ _messageIdCleanupTimer = new Timer ( CleanupProcessedMessageIds , null , TimeSpan . FromMinutes ( 5 ) , TimeSpan . FromMinutes ( 5 ) ) ;
107113 }
108114
109115 /// <summary>
@@ -175,6 +181,7 @@ public async Task StopAsync()
175181
176182 try
177183 {
184+ _messageIdCleanupTimer ? . Dispose ( ) ;
178185 await _discoveryService . StopAsync ( ) ;
179186 await _messageQueueService . DisconnectAsync ( ) ;
180187 await _redisService . DisconnectAsync ( ) ;
@@ -358,6 +365,33 @@ private async Task<bool> HandleMessageAsync(byte[] body, MessageProperties prope
358365 return true ;
359366 }
360367
368+ // Check if message is for this node / 检查消息是否发送给当前节点
369+ // If ToNodeId is null, it's a broadcast message - all nodes should process it
370+ // If ToNodeId is set, only the target node should process it
371+ // 如果 ToNodeId 为 null,这是广播消息 - 所有节点都应该处理
372+ // 如果 ToNodeId 已设置,只有目标节点应该处理
373+ if ( ! string . IsNullOrEmpty ( message . ToNodeId ) && message . ToNodeId != _nodeId )
374+ {
375+ // This message is for another node, ignore it / 此消息是发送给其他节点的,忽略它
376+ _logger . LogDebug ( $ "Ignoring message { message . MessageId } - target node is { message . ToNodeId } , current node is { _nodeId } ") ;
377+ return true ; // Ack to remove from queue / 确认以从队列中移除
378+ }
379+
380+ // Check for duplicate messages using message ID / 使用消息ID检查重复消息
381+ if ( ! string . IsNullOrEmpty ( message . MessageId ) )
382+ {
383+ // Check if we've already processed this message / 检查是否已处理过此消息
384+ if ( _processedMessageIds . TryGetValue ( message . MessageId , out var processedTime ) )
385+ {
386+ // Message already processed, ignore it / 消息已处理,忽略它
387+ _logger . LogDebug ( $ "Ignoring duplicate message { message . MessageId } (processed at { processedTime : yyyy-MM-dd HH:mm:ss} )") ;
388+ return true ; // Ack to remove from queue / 确认以从队列中移除
389+ }
390+
391+ // Mark message as processed / 标记消息为已处理
392+ _processedMessageIds . TryAdd ( message . MessageId , DateTime . UtcNow ) ;
393+ }
394+
361395 // Trigger message received event / 触发消息接收事件
362396 MessageReceived ? . Invoke ( this , new ClusterMessageEventArgs
363397 {
@@ -380,6 +414,43 @@ private async Task<bool> HandleMessageAsync(byte[] body, MessageProperties prope
380414 }
381415 }
382416
417+ /// <summary>
418+ /// Cleanup processed message IDs older than 10 minutes / 清理超过10分钟的已处理消息ID
419+ /// </summary>
420+ private void CleanupProcessedMessageIds ( object state )
421+ {
422+ if ( _disposed || _cancellationTokenSource . Token . IsCancellationRequested )
423+ return ;
424+
425+ try
426+ {
427+ var cutoffTime = DateTime . UtcNow . AddMinutes ( - 10 ) ;
428+ var keysToRemove = new List < string > ( ) ;
429+
430+ foreach ( var kvp in _processedMessageIds )
431+ {
432+ if ( kvp . Value < cutoffTime )
433+ {
434+ keysToRemove . Add ( kvp . Key ) ;
435+ }
436+ }
437+
438+ foreach ( var key in keysToRemove )
439+ {
440+ _processedMessageIds . TryRemove ( key , out _ ) ;
441+ }
442+
443+ if ( keysToRemove . Count > 0 )
444+ {
445+ _logger . LogDebug ( $ "Cleaned up { keysToRemove . Count } old message IDs from deduplication cache") ;
446+ }
447+ }
448+ catch ( Exception ex )
449+ {
450+ _logger . LogWarning ( ex , "Error cleaning up processed message IDs" ) ;
451+ }
452+ }
453+
383454 /// <summary>
384455 /// Handle node discovered event / 处理节点发现事件
385456 /// </summary>
@@ -555,6 +626,7 @@ public void Dispose()
555626 }
556627
557628 _discoveryService ? . Dispose ( ) ;
629+ _messageIdCleanupTimer ? . Dispose ( ) ;
558630 _cancellationTokenSource ? . Dispose ( ) ;
559631 }
560632 }
0 commit comments