-
Notifications
You must be signed in to change notification settings - Fork 1
[broker-30] Implement delivering retained messages #54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
model/src/main/java/javasabr/mqtt/model/topic/tree/ConcurrentTopicTree.java
Outdated
Show resolved
Hide resolved
model/src/main/java/javasabr/mqtt/model/topic/tree/TopicMessageNode.java
Outdated
Show resolved
Hide resolved
# Conflicts: # model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentRetainedMessageTree.java # model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java # model/src/main/java/javasabr/mqtt/model/subscriber/tree/RetainedMessageNode.java # model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java # model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java # model/src/main/java/javasabr/mqtt/model/subscribtion/tree/ConcurrentSubscriptionTree.java # model/src/main/java/javasabr/mqtt/model/subscribtion/tree/TopicFilterNode.java # model/src/main/java/javasabr/mqtt/model/subscribtion/tree/TopicFilterTreeBase.java # model/src/main/java/javasabr/mqtt/model/topic/tree/ConcurrentTopicTree.java # model/src/main/java/javasabr/mqtt/model/topic/tree/TopicNode.java # model/src/main/java/javasabr/mqtt/model/topic/tree/TopicTreeBase.java # model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy # service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java
24664c4 to
01bac5b
Compare
# Conflicts: # application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java # core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java # core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/SubscribeMqttInMessageHandlerTest.groovy
# Conflicts: # core-service/src/main/java/javasabr/mqtt/service/SubscriptionService.java # core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java # core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishOutMessageHandler.java # core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/PersistedMqttPublishOutMessageHandler.java # core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandler.java # core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java # core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java
# Conflicts: # core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishOutMessageHandler.java
| publishReceivingService, | ||
| messageOutFactoryService, | ||
| topicService); | ||
| return new PublishMqttInMessageHandler(publishReceivingService, messageOutFactoryService, topicService); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrong formating
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's after code reformatting with the latest code style from master https://github.com/JavaSaBr/settings/blob/master/intellij-idea/javasabr-intellij-codestyle.xml
Have you configured something else in code style?
| subscriptionService, | ||
| messageOutFactoryService, | ||
| topicService); | ||
| return new UnsubscribeMqttInMessageHandler(subscriptionService, messageOutFactoryService, topicService); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrong formatting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you clarify what exactly is wrong?
| env.getProperty("mqtt.external.connection.max.string.length", int.class, MqttProperties.MAXIMUM_STRING_LENGTH), | ||
| env.getProperty("mqtt.external.connection.max.binary.size", int.class, MqttProperties.MAXIMUM_BINARY_SIZE), | ||
| env.getProperty("mqtt.external.connection.max.topic.levels", int.class, MqttProperties.MAXIMUM_TOPIC_LEVELS), | ||
| env.getProperty("mqtt.external.connection.min.keep.alive", int.class, MqttProperties.SERVER_KEEP_ALIVE_DEFAULT), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bad formatting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you clarify what exactly is wrong?
|
|
||
| PublishHandlingResult startDelivering(Publish publish, SingleSubscriber subscriber); | ||
|
|
||
| Array<PublishHandlingResult> deliverRetainedMessages(SingleSubscriber subscriber); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PublishDeliveringService is responsible only for delivering a publish to a user. That is it.
I think you need to have a separated service like "RetainMessagesService" which will have a topic tree + with retain messages (instead of subscribers) and will use 'PublishDeliveringService' to deliver retain messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
| import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary; | ||
| import org.jspecify.annotations.Nullable; | ||
|
|
||
| public abstract class AbstractTrieNode<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like typo -> AbstractTreeNode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's actually the name of a prefix tree: https://drstearns.github.io/tutorials/trie/
| return; | ||
| } | ||
| if (level == lastLevel) { | ||
| if (level == lastLevel || TopicFilter.MULTI_LEVEL_WILDCARD.equals(segment)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to have it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This condition allows us to merge the exactlyTopicMatch method with the following one
mqtt-broker/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java
Lines 97 to 102 in 294a6c6
| private void multiWildcardTopicMatch(MutableArray<SingleSubscriber> result) { | |
| SubscriberNode subscriberNode = childNode(TopicFilter.MULTI_LEVEL_WILDCARD); | |
| if (subscriberNode != null) { | |
| appendSubscribersTo(result, subscriberNode); | |
| } | |
| } |
I think in this case one method is better than the three, don't you think so?
| } else if (level < lastLevel) { | ||
| subscriberNode.matchesTo(level + 1, topicName, lastLevel, result); | ||
| } | ||
| collectMatchingSubscribers(topicName.segment(level), level, topicName, lastLevel, container); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so you returned the old way which I specially removed :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a reduction of duplication, and it seems more readable to me
|
|
||
| if (existeQos.ordinal() < candidateQos.ordinal()) { | ||
| QoS existedQos = result | ||
| .get(found) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrong formattimg
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then it seems like the following option has to be set to 1
<option name="METHOD_CALL_CHAIN_WRAP" value="2" />
https://github.com/JavaSaBr/settings/blob/28bafa79d458c1cbeb76d6c8e24980ce81a0c491/intellij-idea/javasabr-intellij-codestyle.xml#L80-L80
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same formatting in other places
mqtt-broker/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java
Lines 114 to 116 in 294a6c6
| Array<Subscription> subscriptions = session | |
| .activeSubscriptions() | |
| .subscriptions(); |
| return ArrayFactory.mutableArray(RetainedMessageNode.class); | ||
| } | ||
|
|
||
| final AtomicReference<@Nullable Publish> retainedMessage = new AtomicReference<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can have only 1 message per topic? I think here should be a list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, just a message per topic name, if I haven't messed anything up
please check yourself https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901104
4d8e497 to
57eeb77
Compare
Initial implementation of retain message feature
3 MQTT Control Packets
└ 3.3 PUBLISH – Publish message
└ 3.3.1 PUBLISH Fixed Header
└ 3.3.1.3 RETAIN
retainAsPublished=0then RETAIN flag is set to 0 in the forwarder publish message