Closed
Merge Request #9
·
created by
deferred_connection
promise is not working because it is reset to null in the connectFailure callback. removing the set_value() fixed the issue of subscription after promise.
Test:
start mosquitto broker and the publisher test on a remote desktop.
disable the internet connection on the subscribing desktop.
start the subscriber test on the subscribing desktop.
at this point, the mqtt client keeps returning to the connectFailure callback
enable the internet connection again.
at this point, the mqtt client returns to the connectSuccess callback. subscriptions are activated and data is coming in.
From
feat/sridder/deferred_connection
into
master
-
…k. removing the set_value() fixed the issue ofsubscription after promise.
Showing
1 changed file
src/clientpaho.cpp
@@ -186,11 +186,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) | @@ -186,11 +186,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) | ||
186 | { | 186 | { |
187 | { | 187 | { |
188 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | 188 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
189 | - if (ConnectionStatus::Disconnected != m_connectionStatus) | 189 | + if( ConnectionStatus::Disconnected != m_connectionStatus ) |
190 | { | 190 | { |
191 | return -1; | 191 | return -1; |
192 | } | 192 | } |
193 | - setConnectionStatus(ConnectionStatus::ConnectInProgress); | 193 | + setConnectionStatus( ConnectionStatus::ConnectInProgress ); |
194 | } | 194 | } |
195 | 195 | ||
196 | MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; | 196 | MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; |
@@ -215,12 +215,12 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) | @@ -215,12 +215,12 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) | ||
215 | } | 215 | } |
216 | 216 | ||
217 | 217 | ||
218 | - if (!m_username.empty()) | 218 | + if( !m_username.empty() ) |
219 | { | 219 | { |
220 | conn_opts.username = m_username.c_str(); | 220 | conn_opts.username = m_username.c_str(); |
221 | } | 221 | } |
222 | 222 | ||
223 | - if (!m_password.empty()) | 223 | + if( !m_password.empty() ) |
224 | { | 224 | { |
225 | conn_opts.password = m_password.c_str(); | 225 | conn_opts.password = m_password.c_str(); |
226 | } | 226 | } |
@@ -228,30 +228,30 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) | @@ -228,30 +228,30 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) | ||
228 | std::promise<void> waitForConnectPromise{}; | 228 | std::promise<void> waitForConnectPromise{}; |
229 | auto waitForConnect = waitForConnectPromise.get_future(); | 229 | auto waitForConnect = waitForConnectPromise.get_future(); |
230 | m_connectPromise.reset(); | 230 | m_connectPromise.reset(); |
231 | - if (wait) | 231 | + if( wait ) |
232 | { | 232 | { |
233 | - m_connectPromise = std::make_unique<std::promise<void>>(std::move(waitForConnectPromise)); | 233 | + m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) ); |
234 | } | 234 | } |
235 | 235 | ||
236 | { | 236 | { |
237 | - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | ||
238 | - if (!m_pendingOperations.insert(-100).second) | 237 | + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); |
238 | + if( !m_pendingOperations.insert( -100 ).second ) | ||
239 | { | 239 | { |
240 | // Write something | 240 | // Write something |
241 | } | 241 | } |
242 | - m_operationResult.erase(-100); | 242 | + m_operationResult.erase( -100 ); |
243 | } | 243 | } |
244 | 244 | ||
245 | - int rc = MQTTAsync_connect(m_client, &conn_opts); | ||
246 | - if (MQTTASYNC_SUCCESS != rc) | 245 | + int rc = MQTTAsync_connect( m_client, &conn_opts ); |
246 | + if( MQTTASYNC_SUCCESS != rc ) | ||
247 | { | 247 | { |
248 | - setConnectionStatus(ConnectionStatus::Disconnected); | ||
249 | - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | 248 | + setConnectionStatus( ConnectionStatus::Disconnected ); |
249 | + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); | ||
250 | m_operationResult[-100] = false; | 250 | m_operationResult[-100] = false; |
251 | m_pendingOperations.erase(-100); | 251 | m_pendingOperations.erase(-100); |
252 | } | 252 | } |
253 | 253 | ||
254 | - if (wait) | 254 | + if( wait ) |
255 | { | 255 | { |
256 | waitForConnect.get(); | 256 | waitForConnect.get(); |
257 | m_connectPromise.reset(); | 257 | m_connectPromise.reset(); |
@@ -306,13 +306,15 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) | @@ -306,13 +306,15 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) | ||
306 | m_operationResult[-200] = false; | 306 | m_operationResult[-200] = false; |
307 | m_pendingOperations.erase(-200); | 307 | m_pendingOperations.erase(-200); |
308 | 308 | ||
309 | - if (MQTTASYNC_DISCONNECTED == rc) { | 309 | + if (MQTTASYNC_DISCONNECTED == rc) |
310 | + { | ||
310 | return -1; | 311 | return -1; |
311 | } | 312 | } |
312 | // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); | 313 | // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); |
313 | } | 314 | } |
314 | 315 | ||
315 | - if (wait) { | 316 | + if( wait ) |
317 | + { | ||
316 | if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) | 318 | if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) |
317 | { | 319 | { |
318 | // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId); | 320 | // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId); |
@@ -334,6 +336,7 @@ std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) | @@ -334,6 +336,7 @@ std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) | ||
334 | else if (ConnectionStatus::Disconnected == m_connectionStatus) | 336 | else if (ConnectionStatus::Disconnected == m_connectionStatus) |
335 | { | 337 | { |
336 | // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); | 338 | // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); |
339 | + connect( true ); | ||
337 | } | 340 | } |
338 | 341 | ||
339 | if (!isValidTopic(message.topic())) | 342 | if (!isValidTopic(message.topic())) |
@@ -368,7 +371,8 @@ void ClientPaho::publishPending() | @@ -368,7 +371,8 @@ void ClientPaho::publishPending() | ||
368 | { | 371 | { |
369 | { | 372 | { |
370 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | 373 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
371 | - if (!m_processPendingPublishes) { | 374 | + if (!m_processPendingPublishes) |
375 | + { | ||
372 | return; | 376 | return; |
373 | } | 377 | } |
374 | } | 378 | } |
@@ -785,9 +789,6 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) | @@ -785,9 +789,6 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) | ||
785 | // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); | 789 | // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); |
786 | { | 790 | { |
787 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | 791 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
788 | - if (m_connectPromise) { | ||
789 | - m_connectPromise->set_value(); | ||
790 | - } | ||
791 | // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); | 792 | // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); |
792 | m_operationResult[-100] = false; | 793 | m_operationResult[-100] = false; |
793 | m_pendingOperations.erase(-100); | 794 | m_pendingOperations.erase(-100); |