Commit 8660a680ecc4710e51716e241bf8433dfea331ce

Authored by Peter M. Groen
2 parents 6f2da92f f1384b55

Merge branch 'fix/pgroen/resubscribe_called_unnecessary' into 'master'

Activate Resubscribe only after a correct state-change

[Bug]
If connecting and subscribing to a retained message, the message was received twice.

[Solution]
Check for old and new state of the connection. Only call resubscribe when state changes from ::ConnectionFailure to ::Good. This prevents the calling of the re-subscribe in all other cases. Paho-C has the States defined, but not the state-transitions.

See merge request !22
examples/pub/main.cpp
@@ -94,7 +94,7 @@ int main( int argc, char* argv[] ) @@ -94,7 +94,7 @@ int main( int argc, char* argv[] )
94 } 94 }
95 messageNumber++; 95 messageNumber++;
96 } 96 }
97 - sleepcp( 1, T_SECONDS ); 97 + sleepcp(100, T_MILLI);
98 } 98 }
99 } 99 }
100 else 100 else
examples/sub/main.cpp
@@ -91,7 +91,6 @@ int main( int argc, char* argv[] ) @@ -91,7 +91,6 @@ int main( int argc, char* argv[] )
91 while (loop_counter < MAX_LOOP_COUNT) 91 while (loop_counter < MAX_LOOP_COUNT)
92 { 92 {
93 sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. 93 sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
94 - std::cout << ".";  
95 loop_counter++; 94 loop_counter++;
96 } 95 }
97 96
@@ -101,11 +100,20 @@ int main( int argc, char* argv[] ) @@ -101,11 +100,20 @@ int main( int argc, char* argv[] )
101 pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" ); 100 pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" );
102 pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" ); 101 pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" );
103 pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" ); 102 pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" );
  103 + pSubscriber->unsubscribe( "test/publisher/TestPublisher_3" );
  104 +
  105 + // Start a loop to give the subscriber the possibility to do its work.
  106 + while (loop_counter < MAX_LOOP_COUNT)
  107 + {
  108 + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
  109 + loop_counter++;
  110 + }
  111 +
  112 + pSubscriber->subscribe("test/publisher/#");
104 113
105 while (1) 114 while (1)
106 { 115 {
107 - sleepcp(1, T_MILLI); // Sleep 1 Sec to give the scheduler the change to interfene.  
108 - std::cout << "."; 116 + sleepcp(1, T_MILLI); // Sleep 1 mSec to give the scheduler the change to interfene.
109 } 117 }
110 118
111 } 119 }
src/mqttclient.cpp
@@ -431,29 +431,30 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus @@ -431,29 +431,30 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
431 } 431 }
432 432
433 auto newState = determineState(connectionStates); 433 auto newState = determineState(connectionStates);
434 - // bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);  
435 - bool resubscribe = ( StateEnum::Good == newState );  
436 - if (resubscribe) 434 + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("Old state : " + std::to_string(static_cast<int>(m_serverState.state()))));
  435 + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("New state : " + std::to_string(static_cast<int>(newState))));
  436 + bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);
  437 + // First activate pending subscriptions
437 { 438 {
438 - // First activate pending subscriptions 439 + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex);
  440 + LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) );
  441 + while( m_deferredSubscriptions.size() > 0 )
439 { 442 {
440 - OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex);  
441 - LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) );  
442 - while( m_deferredSubscriptions.size() > 0 )  
443 - {  
444 - auto subscription = m_deferredSubscriptions.at( 0 );  
445 - this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() );  
446 - m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() );  
447 - } 443 + auto subscription = m_deferredSubscriptions.at( 0 );
  444 + this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() );
  445 + m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() );
448 } 446 }
  447 + }
449 448
450 - LogDebug( "[MqttClient::connectionStatusChanged]",  
451 - std::string( m_clientId + " - Resubscribing..." ) );  
452 - {  
453 - OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);  
454 - m_activeTokens.clear();  
455 - } 449 + LogDebug( "[MqttClient::connectionStatusChanged]",
  450 + std::string( m_clientId + " - Resubscribing..." ) );
  451 + {
  452 + OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
  453 + m_activeTokens.clear();
  454 + }
456 455
  456 + if (resubscribe)
  457 + {
457 for (auto* cl : clients) 458 for (auto* cl : clients)
458 { 459 {
459 try 460 try
@@ -471,9 +472,10 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus @@ -471,9 +472,10 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
471 472
472 // The server state change and a possible resubscription are done in the context of the MqttClient worker thread 473 // The server state change and a possible resubscription are done in the context of the MqttClient worker thread
473 // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions. 474 // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions.
474 - this->pushEvent([this, resubscribe, clients, principalClient, newState]() {  
475 - if (resubscribe)  
476 - { 475 + this->pushEvent([this, resubscribe, clients, principalClient, newState]()
  476 + {
  477 + // if (resubscribe)
  478 + // {
477 // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens. 479 // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens.
478 // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread. 480 // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread.
479 auto waitFor = std::chrono::milliseconds(1000); 481 auto waitFor = std::chrono::milliseconds(1000);
@@ -501,7 +503,7 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus @@ -501,7 +503,7 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
501 LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); 503 LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) );
502 } 504 }
503 } 505 }
504 - } 506 + // }
505 m_serverState.emitStateChanged(newState); 507 m_serverState.emitStateChanged(newState);
506 }); 508 });
507 } 509 }