diff --git a/examples/pub/main.cpp b/examples/pub/main.cpp index bc91d51..559ef73 100644 --- a/examples/pub/main.cpp +++ b/examples/pub/main.cpp @@ -94,7 +94,7 @@ int main( int argc, char* argv[] ) } messageNumber++; } - sleepcp( 1, T_SECONDS ); + sleepcp(100, T_MILLI); } } else diff --git a/examples/sub/main.cpp b/examples/sub/main.cpp index add7aff..339be32 100644 --- a/examples/sub/main.cpp +++ b/examples/sub/main.cpp @@ -91,7 +91,6 @@ int main( int argc, char* argv[] ) while (loop_counter < MAX_LOOP_COUNT) { sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. - std::cout << "."; loop_counter++; } @@ -101,11 +100,20 @@ int main( int argc, char* argv[] ) pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" ); pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" ); pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" ); + pSubscriber->unsubscribe( "test/publisher/TestPublisher_3" ); + + // Start a loop to give the subscriber the possibility to do its work. + while (loop_counter < MAX_LOOP_COUNT) + { + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. + loop_counter++; + } + + pSubscriber->subscribe("test/publisher/#"); while (1) { - sleepcp(1, T_MILLI); // Sleep 1 Sec to give the scheduler the change to interfene. - std::cout << "."; + sleepcp(1, T_MILLI); // Sleep 1 mSec to give the scheduler the change to interfene. } } diff --git a/src/mqttclient.cpp b/src/mqttclient.cpp index 42eafb8..2f7aef9 100644 --- a/src/mqttclient.cpp +++ b/src/mqttclient.cpp @@ -431,29 +431,30 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus } auto newState = determineState(connectionStates); - // bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); - bool resubscribe = ( StateEnum::Good == newState ); - if (resubscribe) + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("Old state : " + std::to_string(static_cast(m_serverState.state())))); + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("New state : " + std::to_string(static_cast(newState)))); + bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); + // First activate pending subscriptions { - // First activate pending subscriptions + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex); + LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) ); + while( m_deferredSubscriptions.size() > 0 ) { - OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex); - LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) ); - while( m_deferredSubscriptions.size() > 0 ) - { - auto subscription = m_deferredSubscriptions.at( 0 ); - this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() ); - m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() ); - } + auto subscription = m_deferredSubscriptions.at( 0 ); + this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() ); + m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() ); } + } - LogDebug( "[MqttClient::connectionStatusChanged]", - std::string( m_clientId + " - Resubscribing..." ) ); - { - OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); - m_activeTokens.clear(); - } + LogDebug( "[MqttClient::connectionStatusChanged]", + std::string( m_clientId + " - Resubscribing..." ) ); + { + OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); + m_activeTokens.clear(); + } + if (resubscribe) + { for (auto* cl : clients) { try @@ -471,9 +472,10 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus // The server state change and a possible resubscription are done in the context of the MqttClient worker thread // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions. - this->pushEvent([this, resubscribe, clients, principalClient, newState]() { - if (resubscribe) - { + this->pushEvent([this, resubscribe, clients, principalClient, newState]() + { + // if (resubscribe) + // { // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens. // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread. auto waitFor = std::chrono::milliseconds(1000); @@ -501,7 +503,7 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); } } - } + // } m_serverState.emitStateChanged(newState); }); }