Merged
Merge Request #22 · created by Peter M. Groen


Activate Resubscribe only after a correct state-change

[Bug] If connecting and subscribing to a retained message, the message was received twice.

[Solution] Check for old and new state of the connection. Only call resubscribe when state changes from ::ConnectionFailure to ::Good. This prevents the calling of the re-subscribe in all other cases. Paho-C has the States defined, but not the state-transitions.


From fix/pgroen/resubscribe_called_unnecessary into master

Merged by Peter M. Groen

Source branch has been removed
1 participants

examples/pub/main.cpp
... ... @@ -94,7 +94,7 @@ int main( int argc, char* argv[] )
94 94 }
95 95 messageNumber++;
96 96 }
97   - sleepcp( 1, T_SECONDS );
  97 + sleepcp(100, T_MILLI);
98 98 }
99 99 }
100 100 else
... ...
examples/sub/main.cpp
... ... @@ -91,7 +91,6 @@ int main( int argc, char* argv[] )
91 91 while (loop_counter < MAX_LOOP_COUNT)
92 92 {
93 93 sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
94   - std::cout << ".";
95 94 loop_counter++;
96 95 }
97 96  
... ... @@ -101,11 +100,20 @@ int main( int argc, char* argv[] )
101 100 pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" );
102 101 pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" );
103 102 pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" );
  103 + pSubscriber->unsubscribe( "test/publisher/TestPublisher_3" );
  104 +
  105 + // Start a loop to give the subscriber the possibility to do its work.
  106 + while (loop_counter < MAX_LOOP_COUNT)
  107 + {
  108 + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
  109 + loop_counter++;
  110 + }
  111 +
  112 + pSubscriber->subscribe("test/publisher/#");
104 113  
105 114 while (1)
106 115 {
107   - sleepcp(1, T_MILLI); // Sleep 1 Sec to give the scheduler the change to interfene.
108   - std::cout << ".";
  116 + sleepcp(1, T_MILLI); // Sleep 1 mSec to give the scheduler the change to interfene.
109 117 }
110 118  
111 119 }
... ...
src/mqttclient.cpp
... ... @@ -431,29 +431,30 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
431 431 }
432 432  
433 433 auto newState = determineState(connectionStates);
434   - // bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);
435   - bool resubscribe = ( StateEnum::Good == newState );
436   - if (resubscribe)
  434 + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("Old state : " + std::to_string(static_cast<int>(m_serverState.state()))));
  435 + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("New state : " + std::to_string(static_cast<int>(newState))));
  436 + bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);
  437 + // First activate pending subscriptions
437 438 {
438   - // First activate pending subscriptions
  439 + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex);
  440 + LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) );
  441 + while( m_deferredSubscriptions.size() > 0 )
439 442 {
440   - OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex);
441   - LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) );
442   - while( m_deferredSubscriptions.size() > 0 )
443   - {
444   - auto subscription = m_deferredSubscriptions.at( 0 );
445   - this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() );
446   - m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() );
447   - }
  443 + auto subscription = m_deferredSubscriptions.at( 0 );
  444 + this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() );
  445 + m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() );
448 446 }
  447 + }
449 448  
450   - LogDebug( "[MqttClient::connectionStatusChanged]",
451   - std::string( m_clientId + " - Resubscribing..." ) );
452   - {
453   - OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
454   - m_activeTokens.clear();
455   - }
  449 + LogDebug( "[MqttClient::connectionStatusChanged]",
  450 + std::string( m_clientId + " - Resubscribing..." ) );
  451 + {
  452 + OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
  453 + m_activeTokens.clear();
  454 + }
456 455  
  456 + if (resubscribe)
  457 + {
457 458 for (auto* cl : clients)
458 459 {
459 460 try
... ... @@ -471,9 +472,10 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
471 472  
472 473 // The server state change and a possible resubscription are done in the context of the MqttClient worker thread
473 474 // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions.
474   - this->pushEvent([this, resubscribe, clients, principalClient, newState]() {
475   - if (resubscribe)
476   - {
  475 + this->pushEvent([this, resubscribe, clients, principalClient, newState]()
  476 + {
  477 + // if (resubscribe)
  478 + // {
477 479 // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens.
478 480 // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread.
479 481 auto waitFor = std::chrono::milliseconds(1000);
... ... @@ -501,7 +503,7 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
501 503 LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) );
502 504 }
503 505 }
504   - }
  506 + // }
505 507 m_serverState.emitStateChanged(newState);
506 508 });
507 509 }
... ...