From f1384b55e5f972aa1d8cda24fd859c5719ae8cc2 Mon Sep 17 00:00:00 2001 From: Peter M. Groen Date: Wed, 17 Jan 2024 11:05:55 +0100 Subject: [PATCH] Fix publishing --- examples/pub/main.cpp | 2 +- examples/sub/main.cpp | 14 +++++++++++--- src/mqttclient.cpp | 13 +++++++------ 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/examples/pub/main.cpp b/examples/pub/main.cpp index bc91d51..559ef73 100644 --- a/examples/pub/main.cpp +++ b/examples/pub/main.cpp @@ -94,7 +94,7 @@ int main( int argc, char* argv[] ) } messageNumber++; } - sleepcp( 1, T_SECONDS ); + sleepcp(100, T_MILLI); } } else diff --git a/examples/sub/main.cpp b/examples/sub/main.cpp index add7aff..339be32 100644 --- a/examples/sub/main.cpp +++ b/examples/sub/main.cpp @@ -91,7 +91,6 @@ int main( int argc, char* argv[] ) while (loop_counter < MAX_LOOP_COUNT) { sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. - std::cout << "."; loop_counter++; } @@ -101,11 +100,20 @@ int main( int argc, char* argv[] ) pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" ); pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" ); pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" ); + pSubscriber->unsubscribe( "test/publisher/TestPublisher_3" ); + + // Start a loop to give the subscriber the possibility to do its work. + while (loop_counter < MAX_LOOP_COUNT) + { + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. + loop_counter++; + } + + pSubscriber->subscribe("test/publisher/#"); while (1) { - sleepcp(1, T_MILLI); // Sleep 1 Sec to give the scheduler the change to interfene. - std::cout << "."; + sleepcp(1, T_MILLI); // Sleep 1 mSec to give the scheduler the change to interfene. } } diff --git a/src/mqttclient.cpp b/src/mqttclient.cpp index 2d1633d..2f7aef9 100644 --- a/src/mqttclient.cpp +++ b/src/mqttclient.cpp @@ -431,8 +431,8 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus } auto newState = determineState(connectionStates); - LogDebug( "[MqttClient::connectionStatusChanged]", std::string("Old state : " + std::to_string(static_cast(m_serverState.state()))); - LogDebug( "[MqttClient::connectionStatusChanged]", std::string("New state : " + std::to_string(static_cast(newState))); + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("Old state : " + std::to_string(static_cast(m_serverState.state())))); + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("New state : " + std::to_string(static_cast(newState)))); bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); // First activate pending subscriptions { @@ -472,9 +472,10 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus // The server state change and a possible resubscription are done in the context of the MqttClient worker thread // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions. - this->pushEvent([this, resubscribe, clients, principalClient, newState]() { - if (resubscribe) - { + this->pushEvent([this, resubscribe, clients, principalClient, newState]() + { + // if (resubscribe) + // { // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens. // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread. auto waitFor = std::chrono::milliseconds(1000); @@ -502,7 +503,7 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); } } - } + // } m_serverState.emitStateChanged(newState); }); } -- libgit2 0.21.4