Commit 2b967ea77c223a95c9a23349d2c781c2170eb1bd
1 parent
b4d6ed56
promise is not working because it is reset to null in the connectFailure callbac…
…k. removing the set_value() fixed the issue ofsubscription after promise.
Showing
1 changed file
with
21 additions
and
20 deletions
src/clientpaho.cpp
... | ... | @@ -186,11 +186,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) |
186 | 186 | { |
187 | 187 | { |
188 | 188 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
189 | - if (ConnectionStatus::Disconnected != m_connectionStatus) | |
189 | + if( ConnectionStatus::Disconnected != m_connectionStatus ) | |
190 | 190 | { |
191 | 191 | return -1; |
192 | 192 | } |
193 | - setConnectionStatus(ConnectionStatus::ConnectInProgress); | |
193 | + setConnectionStatus( ConnectionStatus::ConnectInProgress ); | |
194 | 194 | } |
195 | 195 | |
196 | 196 | MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; |
... | ... | @@ -215,12 +215,12 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) |
215 | 215 | } |
216 | 216 | |
217 | 217 | |
218 | - if (!m_username.empty()) | |
218 | + if( !m_username.empty() ) | |
219 | 219 | { |
220 | 220 | conn_opts.username = m_username.c_str(); |
221 | 221 | } |
222 | 222 | |
223 | - if (!m_password.empty()) | |
223 | + if( !m_password.empty() ) | |
224 | 224 | { |
225 | 225 | conn_opts.password = m_password.c_str(); |
226 | 226 | } |
... | ... | @@ -228,30 +228,30 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) |
228 | 228 | std::promise<void> waitForConnectPromise{}; |
229 | 229 | auto waitForConnect = waitForConnectPromise.get_future(); |
230 | 230 | m_connectPromise.reset(); |
231 | - if (wait) | |
231 | + if( wait ) | |
232 | 232 | { |
233 | - m_connectPromise = std::make_unique<std::promise<void>>(std::move(waitForConnectPromise)); | |
233 | + m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) ); | |
234 | 234 | } |
235 | 235 | |
236 | 236 | { |
237 | - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | |
238 | - if (!m_pendingOperations.insert(-100).second) | |
237 | + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); | |
238 | + if( !m_pendingOperations.insert( -100 ).second ) | |
239 | 239 | { |
240 | 240 | // Write something |
241 | 241 | } |
242 | - m_operationResult.erase(-100); | |
242 | + m_operationResult.erase( -100 ); | |
243 | 243 | } |
244 | 244 | |
245 | - int rc = MQTTAsync_connect(m_client, &conn_opts); | |
246 | - if (MQTTASYNC_SUCCESS != rc) | |
245 | + int rc = MQTTAsync_connect( m_client, &conn_opts ); | |
246 | + if( MQTTASYNC_SUCCESS != rc ) | |
247 | 247 | { |
248 | - setConnectionStatus(ConnectionStatus::Disconnected); | |
249 | - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | |
248 | + setConnectionStatus( ConnectionStatus::Disconnected ); | |
249 | + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); | |
250 | 250 | m_operationResult[-100] = false; |
251 | 251 | m_pendingOperations.erase(-100); |
252 | 252 | } |
253 | 253 | |
254 | - if (wait) | |
254 | + if( wait ) | |
255 | 255 | { |
256 | 256 | waitForConnect.get(); |
257 | 257 | m_connectPromise.reset(); |
... | ... | @@ -306,13 +306,15 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) |
306 | 306 | m_operationResult[-200] = false; |
307 | 307 | m_pendingOperations.erase(-200); |
308 | 308 | |
309 | - if (MQTTASYNC_DISCONNECTED == rc) { | |
309 | + if (MQTTASYNC_DISCONNECTED == rc) | |
310 | + { | |
310 | 311 | return -1; |
311 | 312 | } |
312 | 313 | // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); |
313 | 314 | } |
314 | 315 | |
315 | - if (wait) { | |
316 | + if( wait ) | |
317 | + { | |
316 | 318 | if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) |
317 | 319 | { |
318 | 320 | // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId); |
... | ... | @@ -334,6 +336,7 @@ std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) |
334 | 336 | else if (ConnectionStatus::Disconnected == m_connectionStatus) |
335 | 337 | { |
336 | 338 | // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); |
339 | + connect( true ); | |
337 | 340 | } |
338 | 341 | |
339 | 342 | if (!isValidTopic(message.topic())) |
... | ... | @@ -368,7 +371,8 @@ void ClientPaho::publishPending() |
368 | 371 | { |
369 | 372 | { |
370 | 373 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
371 | - if (!m_processPendingPublishes) { | |
374 | + if (!m_processPendingPublishes) | |
375 | + { | |
372 | 376 | return; |
373 | 377 | } |
374 | 378 | } |
... | ... | @@ -785,9 +789,6 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) |
785 | 789 | // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); |
786 | 790 | { |
787 | 791 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
788 | - if (m_connectPromise) { | |
789 | - m_connectPromise->set_value(); | |
790 | - } | |
791 | 792 | // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); |
792 | 793 | m_operationResult[-100] = false; |
793 | 794 | m_pendingOperations.erase(-100); | ... | ... |