Commit f1384b55e5f972aa1d8cda24fd859c5719ae8cc2

Authored by Peter M. Groen
1 parent 3fef3f83

Fix publishing

examples/pub/main.cpp
@@ -94,7 +94,7 @@ int main( int argc, char* argv[] ) @@ -94,7 +94,7 @@ int main( int argc, char* argv[] )
94 } 94 }
95 messageNumber++; 95 messageNumber++;
96 } 96 }
97 - sleepcp( 1, T_SECONDS ); 97 + sleepcp(100, T_MILLI);
98 } 98 }
99 } 99 }
100 else 100 else
examples/sub/main.cpp
@@ -91,7 +91,6 @@ int main( int argc, char* argv[] ) @@ -91,7 +91,6 @@ int main( int argc, char* argv[] )
91 while (loop_counter < MAX_LOOP_COUNT) 91 while (loop_counter < MAX_LOOP_COUNT)
92 { 92 {
93 sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. 93 sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
94 - std::cout << ".";  
95 loop_counter++; 94 loop_counter++;
96 } 95 }
97 96
@@ -101,11 +100,20 @@ int main( int argc, char* argv[] ) @@ -101,11 +100,20 @@ int main( int argc, char* argv[] )
101 pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" ); 100 pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" );
102 pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" ); 101 pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" );
103 pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" ); 102 pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" );
  103 + pSubscriber->unsubscribe( "test/publisher/TestPublisher_3" );
  104 +
  105 + // Start a loop to give the subscriber the possibility to do its work.
  106 + while (loop_counter < MAX_LOOP_COUNT)
  107 + {
  108 + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
  109 + loop_counter++;
  110 + }
  111 +
  112 + pSubscriber->subscribe("test/publisher/#");
104 113
105 while (1) 114 while (1)
106 { 115 {
107 - sleepcp(1, T_MILLI); // Sleep 1 Sec to give the scheduler the change to interfene.  
108 - std::cout << "."; 116 + sleepcp(1, T_MILLI); // Sleep 1 mSec to give the scheduler the change to interfene.
109 } 117 }
110 118
111 } 119 }
src/mqttclient.cpp
@@ -431,8 +431,8 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus @@ -431,8 +431,8 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
431 } 431 }
432 432
433 auto newState = determineState(connectionStates); 433 auto newState = determineState(connectionStates);
434 - LogDebug( "[MqttClient::connectionStatusChanged]", std::string("Old state : " + std::to_string(static_cast<int>(m_serverState.state())));  
435 - LogDebug( "[MqttClient::connectionStatusChanged]", std::string("New state : " + std::to_string(static_cast<int>(newState))); 434 + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("Old state : " + std::to_string(static_cast<int>(m_serverState.state()))));
  435 + LogDebug( "[MqttClient::connectionStatusChanged]", std::string("New state : " + std::to_string(static_cast<int>(newState))));
436 bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); 436 bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);
437 // First activate pending subscriptions 437 // First activate pending subscriptions
438 { 438 {
@@ -472,9 +472,10 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus @@ -472,9 +472,10 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
472 472
473 // The server state change and a possible resubscription are done in the context of the MqttClient worker thread 473 // The server state change and a possible resubscription are done in the context of the MqttClient worker thread
474 // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions. 474 // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions.
475 - this->pushEvent([this, resubscribe, clients, principalClient, newState]() {  
476 - if (resubscribe)  
477 - { 475 + this->pushEvent([this, resubscribe, clients, principalClient, newState]()
  476 + {
  477 + // if (resubscribe)
  478 + // {
478 // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens. 479 // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens.
479 // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread. 480 // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread.
480 auto waitFor = std::chrono::milliseconds(1000); 481 auto waitFor = std::chrono::milliseconds(1000);
@@ -502,7 +503,7 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus @@ -502,7 +503,7 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
502 LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); 503 LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) );
503 } 504 }
504 } 505 }
505 - } 506 + // }
506 m_serverState.emitStateChanged(newState); 507 m_serverState.emitStateChanged(newState);
507 }); 508 });
508 } 509 }