Closed
Merge Request #9 · created by Steven de Ridder


deferred_connection

promise is not working because it is reset to null in the connectFailure callback. removing the set_value() fixed the issue of subscription after promise.

Test:
start mosquitto broker and the publisher test on a remote desktop.
disable the internet connection on the subscribing desktop.
start the subscriber test on the subscribing desktop.

at this point, the mqtt client keeps returning to the connectFailure callback

enable the internet connection again.

at this point, the mqtt client returns to the connectSuccess callback. subscriptions are activated and data is coming in.


From feat/sridder/deferred_connection into master

Closed by Steven de Ridder

Changes were not merged into target branch

2 participants

src/clientpaho.cpp
... ... @@ -186,11 +186,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
186 186 {
187 187 {
188 188 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
189   - if (ConnectionStatus::Disconnected != m_connectionStatus)
  189 + if( ConnectionStatus::Disconnected != m_connectionStatus )
190 190 {
191 191 return -1;
192 192 }
193   - setConnectionStatus(ConnectionStatus::ConnectInProgress);
  193 + setConnectionStatus( ConnectionStatus::ConnectInProgress );
194 194 }
195 195  
196 196 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
... ... @@ -215,12 +215,12 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
215 215 }
216 216  
217 217  
218   - if (!m_username.empty())
  218 + if( !m_username.empty() )
219 219 {
220 220 conn_opts.username = m_username.c_str();
221 221 }
222 222  
223   - if (!m_password.empty())
  223 + if( !m_password.empty() )
224 224 {
225 225 conn_opts.password = m_password.c_str();
226 226 }
... ... @@ -228,30 +228,30 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
228 228 std::promise<void> waitForConnectPromise{};
229 229 auto waitForConnect = waitForConnectPromise.get_future();
230 230 m_connectPromise.reset();
231   - if (wait)
  231 + if( wait )
232 232 {
233   - m_connectPromise = std::make_unique<std::promise<void>>(std::move(waitForConnectPromise));
  233 + m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) );
234 234 }
235 235  
236 236 {
237   - OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
238   - if (!m_pendingOperations.insert(-100).second)
  237 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
  238 + if( !m_pendingOperations.insert( -100 ).second )
239 239 {
240 240 // Write something
241 241 }
242   - m_operationResult.erase(-100);
  242 + m_operationResult.erase( -100 );
243 243 }
244 244  
245   - int rc = MQTTAsync_connect(m_client, &conn_opts);
246   - if (MQTTASYNC_SUCCESS != rc)
  245 + int rc = MQTTAsync_connect( m_client, &conn_opts );
  246 + if( MQTTASYNC_SUCCESS != rc )
247 247 {
248   - setConnectionStatus(ConnectionStatus::Disconnected);
249   - OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  248 + setConnectionStatus( ConnectionStatus::Disconnected );
  249 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
250 250 m_operationResult[-100] = false;
251 251 m_pendingOperations.erase(-100);
252 252 }
253 253  
254   - if (wait)
  254 + if( wait )
255 255 {
256 256 waitForConnect.get();
257 257 m_connectPromise.reset();
... ... @@ -306,13 +306,15 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
306 306 m_operationResult[-200] = false;
307 307 m_pendingOperations.erase(-200);
308 308  
309   - if (MQTTASYNC_DISCONNECTED == rc) {
  309 + if (MQTTASYNC_DISCONNECTED == rc)
  310 + {
310 311 return -1;
311 312 }
312 313 // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
313 314 }
314 315  
315   - if (wait) {
  316 + if( wait )
  317 + {
316 318 if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
317 319 {
318 320 // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId);
... ... @@ -334,6 +336,7 @@ std::int32_t ClientPaho::publish(const MqttMessage&amp; message, int qos)
334 336 else if (ConnectionStatus::Disconnected == m_connectionStatus)
335 337 {
336 338 // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId);
  339 + connect( true );
337 340 }
338 341  
339 342 if (!isValidTopic(message.topic()))
... ... @@ -368,7 +371,8 @@ void ClientPaho::publishPending()
368 371 {
369 372 {
370 373 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
371   - if (!m_processPendingPublishes) {
  374 + if (!m_processPendingPublishes)
  375 + {
372 376 return;
373 377 }
374 378 }
... ... @@ -785,9 +789,6 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure&amp; response)
785 789 // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
786 790 {
787 791 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
788   - if (m_connectPromise) {
789   - m_connectPromise->set_value();
790   - }
791 792 // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
792 793 m_operationResult[-100] = false;
793 794 m_pendingOperations.erase(-100);
... ...