Merged
Merge Request #22
·
created by
Activate Resubscribe only after a correct state-change
[Bug] If connecting and subscribing to a retained message, the message was received twice.
[Solution] Check for old and new state of the connection. Only call resubscribe when state changes from ::ConnectionFailure to ::Good. This prevents the calling of the re-subscribe in all other cases. Paho-C has the States defined, but not the state-transitions.
From
fix/pgroen/resubscribe_called_unnecessary
into
master
-
mentioned in commit 8660a680ecc4710e51716e241bf8433dfea331ce
-
Status changed to merged
Showing
3 changed files
examples/pub/main.cpp
examples/sub/main.cpp
@@ -91,7 +91,6 @@ int main( int argc, char* argv[] ) | @@ -91,7 +91,6 @@ int main( int argc, char* argv[] ) | ||
91 | while (loop_counter < MAX_LOOP_COUNT) | 91 | while (loop_counter < MAX_LOOP_COUNT) |
92 | { | 92 | { |
93 | sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. | 93 | sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. |
94 | - std::cout << "."; | ||
95 | loop_counter++; | 94 | loop_counter++; |
96 | } | 95 | } |
97 | 96 | ||
@@ -101,11 +100,20 @@ int main( int argc, char* argv[] ) | @@ -101,11 +100,20 @@ int main( int argc, char* argv[] ) | ||
101 | pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" ); | 100 | pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" ); |
102 | pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" ); | 101 | pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" ); |
103 | pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" ); | 102 | pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" ); |
103 | + pSubscriber->unsubscribe( "test/publisher/TestPublisher_3" ); | ||
104 | + | ||
105 | + // Start a loop to give the subscriber the possibility to do its work. | ||
106 | + while (loop_counter < MAX_LOOP_COUNT) | ||
107 | + { | ||
108 | + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. | ||
109 | + loop_counter++; | ||
110 | + } | ||
111 | + | ||
112 | + pSubscriber->subscribe("test/publisher/#"); | ||
104 | 113 | ||
105 | while (1) | 114 | while (1) |
106 | { | 115 | { |
107 | - sleepcp(1, T_MILLI); // Sleep 1 Sec to give the scheduler the change to interfene. | ||
108 | - std::cout << "."; | 116 | + sleepcp(1, T_MILLI); // Sleep 1 mSec to give the scheduler the change to interfene. |
109 | } | 117 | } |
110 | 118 | ||
111 | } | 119 | } |
src/mqttclient.cpp
@@ -431,29 +431,30 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | @@ -431,29 +431,30 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | ||
431 | } | 431 | } |
432 | 432 | ||
433 | auto newState = determineState(connectionStates); | 433 | auto newState = determineState(connectionStates); |
434 | - // bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); | ||
435 | - bool resubscribe = ( StateEnum::Good == newState ); | ||
436 | - if (resubscribe) | 434 | + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("Old state : " + std::to_string(static_cast<int>(m_serverState.state())))); |
435 | + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("New state : " + std::to_string(static_cast<int>(newState)))); | ||
436 | + bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); | ||
437 | + // First activate pending subscriptions | ||
437 | { | 438 | { |
438 | - // First activate pending subscriptions | 439 | + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex); |
440 | + LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) ); | ||
441 | + while( m_deferredSubscriptions.size() > 0 ) | ||
439 | { | 442 | { |
440 | - OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex); | ||
441 | - LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) ); | ||
442 | - while( m_deferredSubscriptions.size() > 0 ) | ||
443 | - { | ||
444 | - auto subscription = m_deferredSubscriptions.at( 0 ); | ||
445 | - this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() ); | ||
446 | - m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() ); | ||
447 | - } | 443 | + auto subscription = m_deferredSubscriptions.at( 0 ); |
444 | + this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() ); | ||
445 | + m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() ); | ||
448 | } | 446 | } |
447 | + } | ||
449 | 448 | ||
450 | - LogDebug( "[MqttClient::connectionStatusChanged]", | ||
451 | - std::string( m_clientId + " - Resubscribing..." ) ); | ||
452 | - { | ||
453 | - OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | ||
454 | - m_activeTokens.clear(); | ||
455 | - } | 449 | + LogDebug( "[MqttClient::connectionStatusChanged]", |
450 | + std::string( m_clientId + " - Resubscribing..." ) ); | ||
451 | + { | ||
452 | + OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | ||
453 | + m_activeTokens.clear(); | ||
454 | + } | ||
456 | 455 | ||
456 | + if (resubscribe) | ||
457 | + { | ||
457 | for (auto* cl : clients) | 458 | for (auto* cl : clients) |
458 | { | 459 | { |
459 | try | 460 | try |
@@ -471,9 +472,10 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | @@ -471,9 +472,10 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | ||
471 | 472 | ||
472 | // The server state change and a possible resubscription are done in the context of the MqttClient worker thread | 473 | // The server state change and a possible resubscription are done in the context of the MqttClient worker thread |
473 | // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions. | 474 | // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions. |
474 | - this->pushEvent([this, resubscribe, clients, principalClient, newState]() { | ||
475 | - if (resubscribe) | ||
476 | - { | 475 | + this->pushEvent([this, resubscribe, clients, principalClient, newState]() |
476 | + { | ||
477 | + // if (resubscribe) | ||
478 | + // { | ||
477 | // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens. | 479 | // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens. |
478 | // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread. | 480 | // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread. |
479 | auto waitFor = std::chrono::milliseconds(1000); | 481 | auto waitFor = std::chrono::milliseconds(1000); |
@@ -501,7 +503,7 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | @@ -501,7 +503,7 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | ||
501 | LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); | 503 | LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); |
502 | } | 504 | } |
503 | } | 505 | } |
504 | - } | 506 | + // } |
505 | m_serverState.emitStateChanged(newState); | 507 | m_serverState.emitStateChanged(newState); |
506 | }); | 508 | }); |
507 | } | 509 | } |