Closed
Merge Request #9 · created by Steven de Ridder


deferred_connection

promise is not working because it is reset to null in the connectFailure callback. removing the set_value() fixed the issue of subscription after promise.

Test:
start mosquitto broker and the publisher test on a remote desktop.
disable the internet connection on the subscribing desktop.
start the subscriber test on the subscribing desktop.

at this point, the mqtt client keeps returning to the connectFailure callback

enable the internet connection again.

at this point, the mqtt client returns to the connectSuccess callback. subscriptions are activated and data is coming in.


From feat/sridder/deferred_connection into master

Closed by Steven de Ridder

Changes were not merged into target branch

2 participants

src/clientpaho.cpp
@@ -186,11 +186,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) @@ -186,11 +186,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
186 { 186 {
187 { 187 {
188 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 188 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
189 - if (ConnectionStatus::Disconnected != m_connectionStatus) 189 + if( ConnectionStatus::Disconnected != m_connectionStatus )
190 { 190 {
191 return -1; 191 return -1;
192 } 192 }
193 - setConnectionStatus(ConnectionStatus::ConnectInProgress); 193 + setConnectionStatus( ConnectionStatus::ConnectInProgress );
194 } 194 }
195 195
196 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; 196 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
@@ -215,12 +215,12 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) @@ -215,12 +215,12 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
215 } 215 }
216 216
217 217
218 - if (!m_username.empty()) 218 + if( !m_username.empty() )
219 { 219 {
220 conn_opts.username = m_username.c_str(); 220 conn_opts.username = m_username.c_str();
221 } 221 }
222 222
223 - if (!m_password.empty()) 223 + if( !m_password.empty() )
224 { 224 {
225 conn_opts.password = m_password.c_str(); 225 conn_opts.password = m_password.c_str();
226 } 226 }
@@ -228,30 +228,30 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) @@ -228,30 +228,30 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
228 std::promise<void> waitForConnectPromise{}; 228 std::promise<void> waitForConnectPromise{};
229 auto waitForConnect = waitForConnectPromise.get_future(); 229 auto waitForConnect = waitForConnectPromise.get_future();
230 m_connectPromise.reset(); 230 m_connectPromise.reset();
231 - if (wait) 231 + if( wait )
232 { 232 {
233 - m_connectPromise = std::make_unique<std::promise<void>>(std::move(waitForConnectPromise)); 233 + m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) );
234 } 234 }
235 235
236 { 236 {
237 - OSDEV_COMPONENTS_LOCKGUARD(m_mutex);  
238 - if (!m_pendingOperations.insert(-100).second) 237 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
  238 + if( !m_pendingOperations.insert( -100 ).second )
239 { 239 {
240 // Write something 240 // Write something
241 } 241 }
242 - m_operationResult.erase(-100); 242 + m_operationResult.erase( -100 );
243 } 243 }
244 244
245 - int rc = MQTTAsync_connect(m_client, &conn_opts);  
246 - if (MQTTASYNC_SUCCESS != rc) 245 + int rc = MQTTAsync_connect( m_client, &conn_opts );
  246 + if( MQTTASYNC_SUCCESS != rc )
247 { 247 {
248 - setConnectionStatus(ConnectionStatus::Disconnected);  
249 - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 248 + setConnectionStatus( ConnectionStatus::Disconnected );
  249 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
250 m_operationResult[-100] = false; 250 m_operationResult[-100] = false;
251 m_pendingOperations.erase(-100); 251 m_pendingOperations.erase(-100);
252 } 252 }
253 253
254 - if (wait) 254 + if( wait )
255 { 255 {
256 waitForConnect.get(); 256 waitForConnect.get();
257 m_connectPromise.reset(); 257 m_connectPromise.reset();
@@ -306,13 +306,15 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) @@ -306,13 +306,15 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
306 m_operationResult[-200] = false; 306 m_operationResult[-200] = false;
307 m_pendingOperations.erase(-200); 307 m_pendingOperations.erase(-200);
308 308
309 - if (MQTTASYNC_DISCONNECTED == rc) { 309 + if (MQTTASYNC_DISCONNECTED == rc)
  310 + {
310 return -1; 311 return -1;
311 } 312 }
312 // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); 313 // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
313 } 314 }
314 315
315 - if (wait) { 316 + if( wait )
  317 + {
316 if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) 318 if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
317 { 319 {
318 // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId); 320 // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId);
@@ -334,6 +336,7 @@ std::int32_t ClientPaho::publish(const MqttMessage&amp; message, int qos) @@ -334,6 +336,7 @@ std::int32_t ClientPaho::publish(const MqttMessage&amp; message, int qos)
334 else if (ConnectionStatus::Disconnected == m_connectionStatus) 336 else if (ConnectionStatus::Disconnected == m_connectionStatus)
335 { 337 {
336 // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); 338 // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId);
  339 + connect( true );
337 } 340 }
338 341
339 if (!isValidTopic(message.topic())) 342 if (!isValidTopic(message.topic()))
@@ -368,7 +371,8 @@ void ClientPaho::publishPending() @@ -368,7 +371,8 @@ void ClientPaho::publishPending()
368 { 371 {
369 { 372 {
370 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 373 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
371 - if (!m_processPendingPublishes) { 374 + if (!m_processPendingPublishes)
  375 + {
372 return; 376 return;
373 } 377 }
374 } 378 }
@@ -785,9 +789,6 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure&amp; response) @@ -785,9 +789,6 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure&amp; response)
785 // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); 789 // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
786 { 790 {
787 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 791 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
788 - if (m_connectPromise) {  
789 - m_connectPromise->set_value();  
790 - }  
791 // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); 792 // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
792 m_operationResult[-100] = false; 793 m_operationResult[-100] = false;
793 m_pendingOperations.erase(-100); 794 m_pendingOperations.erase(-100);