Commit bbfaa0f8b285117af067929ee8bf08397df974c8

Authored by Peter M. Groen
2 parents b4d6ed56 2b967ea7

Merge branch 'feat/sridder/deferred_connection' into 'development'

deferred_connection

promise is not working because it is reset to null in the connectFailure callback.
removing the set_value() from the connectFailure callback fixed the issue of subscription after promise.

Test:<br>
start mosquitto broker and the publisher test on a remote desktop.<br>
disable the internet connection on the subscribing desktop.<br>
start the subscriber test on the subscribing desktop.<br><br>
<i> at this point, the mqtt client keeps returning to the connectFailure callback </i><br><br>
enable the internet connection again.<br><br>
<i> at this point, the mqtt client returns to the connectSuccess callback. subscriptions are activated and data is coming in.</i>

See merge request !10
Showing 1 changed file with 21 additions and 20 deletions
src/clientpaho.cpp
@@ -186,11 +186,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt ) @@ -186,11 +186,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
186 { 186 {
187 { 187 {
188 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 188 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
189 - if (ConnectionStatus::Disconnected != m_connectionStatus) 189 + if( ConnectionStatus::Disconnected != m_connectionStatus )
190 { 190 {
191 return -1; 191 return -1;
192 } 192 }
193 - setConnectionStatus(ConnectionStatus::ConnectInProgress); 193 + setConnectionStatus( ConnectionStatus::ConnectInProgress );
194 } 194 }
195 195
196 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; 196 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
@@ -215,12 +215,12 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt ) @@ -215,12 +215,12 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
215 } 215 }
216 216
217 217
218 - if (!m_username.empty()) 218 + if( !m_username.empty() )
219 { 219 {
220 conn_opts.username = m_username.c_str(); 220 conn_opts.username = m_username.c_str();
221 } 221 }
222 222
223 - if (!m_password.empty()) 223 + if( !m_password.empty() )
224 { 224 {
225 conn_opts.password = m_password.c_str(); 225 conn_opts.password = m_password.c_str();
226 } 226 }
@@ -228,30 +228,30 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt ) @@ -228,30 +228,30 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
228 std::promise<void> waitForConnectPromise{}; 228 std::promise<void> waitForConnectPromise{};
229 auto waitForConnect = waitForConnectPromise.get_future(); 229 auto waitForConnect = waitForConnectPromise.get_future();
230 m_connectPromise.reset(); 230 m_connectPromise.reset();
231 - if (wait) 231 + if( wait )
232 { 232 {
233 - m_connectPromise = std::make_unique<std::promise<void>>(std::move(waitForConnectPromise)); 233 + m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) );
234 } 234 }
235 235
236 { 236 {
237 - OSDEV_COMPONENTS_LOCKGUARD(m_mutex);  
238 - if (!m_pendingOperations.insert(-100).second) 237 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
  238 + if( !m_pendingOperations.insert( -100 ).second )
239 { 239 {
240 // Write something 240 // Write something
241 } 241 }
242 - m_operationResult.erase(-100); 242 + m_operationResult.erase( -100 );
243 } 243 }
244 244
245 - int rc = MQTTAsync_connect(m_client, &conn_opts);  
246 - if (MQTTASYNC_SUCCESS != rc) 245 + int rc = MQTTAsync_connect( m_client, &conn_opts );
  246 + if( MQTTASYNC_SUCCESS != rc )
247 { 247 {
248 - setConnectionStatus(ConnectionStatus::Disconnected);  
249 - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 248 + setConnectionStatus( ConnectionStatus::Disconnected );
  249 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
250 m_operationResult[-100] = false; 250 m_operationResult[-100] = false;
251 m_pendingOperations.erase(-100); 251 m_pendingOperations.erase(-100);
252 } 252 }
253 253
254 - if (wait) 254 + if( wait )
255 { 255 {
256 waitForConnect.get(); 256 waitForConnect.get();
257 m_connectPromise.reset(); 257 m_connectPromise.reset();
@@ -306,13 +306,15 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) @@ -306,13 +306,15 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
306 m_operationResult[-200] = false; 306 m_operationResult[-200] = false;
307 m_pendingOperations.erase(-200); 307 m_pendingOperations.erase(-200);
308 308
309 - if (MQTTASYNC_DISCONNECTED == rc) { 309 + if (MQTTASYNC_DISCONNECTED == rc)
  310 + {
310 return -1; 311 return -1;
311 } 312 }
312 // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); 313 // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
313 } 314 }
314 315
315 - if (wait) { 316 + if( wait )
  317 + {
316 if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) 318 if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
317 { 319 {
318 // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId); 320 // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId);
@@ -334,6 +336,7 @@ std::int32_t ClientPaho::publish(const MqttMessage&amp; message, int qos) @@ -334,6 +336,7 @@ std::int32_t ClientPaho::publish(const MqttMessage&amp; message, int qos)
334 else if (ConnectionStatus::Disconnected == m_connectionStatus) 336 else if (ConnectionStatus::Disconnected == m_connectionStatus)
335 { 337 {
336 // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); 338 // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId);
  339 + connect( true );
337 } 340 }
338 341
339 if (!isValidTopic(message.topic())) 342 if (!isValidTopic(message.topic()))
@@ -368,7 +371,8 @@ void ClientPaho::publishPending() @@ -368,7 +371,8 @@ void ClientPaho::publishPending()
368 { 371 {
369 { 372 {
370 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 373 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
371 - if (!m_processPendingPublishes) { 374 + if (!m_processPendingPublishes)
  375 + {
372 return; 376 return;
373 } 377 }
374 } 378 }
@@ -785,9 +789,6 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure&amp; response) @@ -785,9 +789,6 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure&amp; response)
785 // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); 789 // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
786 { 790 {
787 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 791 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
788 - if (m_connectPromise) {  
789 - m_connectPromise->set_value();  
790 - }  
791 // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); 792 // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
792 m_operationResult[-100] = false; 793 m_operationResult[-100] = false;
793 m_pendingOperations.erase(-100); 794 m_pendingOperations.erase(-100);