Merged
Merge Request #22
·
created by
Activate Resubscribe only after a correct state-change
[Bug] If connecting and subscribing to a retained message, the message was received twice.
[Solution] Check for old and new state of the connection. Only call resubscribe when state changes from ::ConnectionFailure to ::Good. This prevents the calling of the re-subscribe in all other cases. Paho-C has the States defined, but not the state-transitions.
From
fix/pgroen/resubscribe_called_unnecessary
into
master
-
mentioned in commit 8660a680ecc4710e51716e241bf8433dfea331ce
-
Status changed to merged
Showing
3 changed files
examples/pub/main.cpp
examples/sub/main.cpp
... | ... | @@ -91,7 +91,6 @@ int main( int argc, char* argv[] ) |
91 | 91 | while (loop_counter < MAX_LOOP_COUNT) |
92 | 92 | { |
93 | 93 | sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. |
94 | - std::cout << "."; | |
95 | 94 | loop_counter++; |
96 | 95 | } |
97 | 96 | |
... | ... | @@ -101,11 +100,20 @@ int main( int argc, char* argv[] ) |
101 | 100 | pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" ); |
102 | 101 | pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" ); |
103 | 102 | pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" ); |
103 | + pSubscriber->unsubscribe( "test/publisher/TestPublisher_3" ); | |
104 | + | |
105 | + // Start a loop to give the subscriber the possibility to do its work. | |
106 | + while (loop_counter < MAX_LOOP_COUNT) | |
107 | + { | |
108 | + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. | |
109 | + loop_counter++; | |
110 | + } | |
111 | + | |
112 | + pSubscriber->subscribe("test/publisher/#"); | |
104 | 113 | |
105 | 114 | while (1) |
106 | 115 | { |
107 | - sleepcp(1, T_MILLI); // Sleep 1 Sec to give the scheduler the change to interfene. | |
108 | - std::cout << "."; | |
116 | + sleepcp(1, T_MILLI); // Sleep 1 mSec to give the scheduler the change to interfene. | |
109 | 117 | } |
110 | 118 | |
111 | 119 | } | ... | ... |
src/mqttclient.cpp
... | ... | @@ -431,29 +431,30 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus |
431 | 431 | } |
432 | 432 | |
433 | 433 | auto newState = determineState(connectionStates); |
434 | - // bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); | |
435 | - bool resubscribe = ( StateEnum::Good == newState ); | |
436 | - if (resubscribe) | |
434 | + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("Old state : " + std::to_string(static_cast<int>(m_serverState.state())))); | |
435 | + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("New state : " + std::to_string(static_cast<int>(newState)))); | |
436 | + bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); | |
437 | + // First activate pending subscriptions | |
437 | 438 | { |
438 | - // First activate pending subscriptions | |
439 | + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex); | |
440 | + LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) ); | |
441 | + while( m_deferredSubscriptions.size() > 0 ) | |
439 | 442 | { |
440 | - OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex); | |
441 | - LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) ); | |
442 | - while( m_deferredSubscriptions.size() > 0 ) | |
443 | - { | |
444 | - auto subscription = m_deferredSubscriptions.at( 0 ); | |
445 | - this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() ); | |
446 | - m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() ); | |
447 | - } | |
443 | + auto subscription = m_deferredSubscriptions.at( 0 ); | |
444 | + this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() ); | |
445 | + m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() ); | |
448 | 446 | } |
447 | + } | |
449 | 448 | |
450 | - LogDebug( "[MqttClient::connectionStatusChanged]", | |
451 | - std::string( m_clientId + " - Resubscribing..." ) ); | |
452 | - { | |
453 | - OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | |
454 | - m_activeTokens.clear(); | |
455 | - } | |
449 | + LogDebug( "[MqttClient::connectionStatusChanged]", | |
450 | + std::string( m_clientId + " - Resubscribing..." ) ); | |
451 | + { | |
452 | + OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | |
453 | + m_activeTokens.clear(); | |
454 | + } | |
456 | 455 | |
456 | + if (resubscribe) | |
457 | + { | |
457 | 458 | for (auto* cl : clients) |
458 | 459 | { |
459 | 460 | try |
... | ... | @@ -471,9 +472,10 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus |
471 | 472 | |
472 | 473 | // The server state change and a possible resubscription are done in the context of the MqttClient worker thread |
473 | 474 | // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions. |
474 | - this->pushEvent([this, resubscribe, clients, principalClient, newState]() { | |
475 | - if (resubscribe) | |
476 | - { | |
475 | + this->pushEvent([this, resubscribe, clients, principalClient, newState]() | |
476 | + { | |
477 | + // if (resubscribe) | |
478 | + // { | |
477 | 479 | // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens. |
478 | 480 | // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread. |
479 | 481 | auto waitFor = std::chrono::milliseconds(1000); |
... | ... | @@ -501,7 +503,7 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus |
501 | 503 | LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); |
502 | 504 | } |
503 | 505 | } |
504 | - } | |
506 | + // } | |
505 | 507 | m_serverState.emitStateChanged(newState); |
506 | 508 | }); |
507 | 509 | } | ... | ... |