-
mentioned in commit 8310c84d887a29f753b76a92519af671be024367
-
Status changed to merged
Showing
6 changed files
examples/sub/main.cpp
@@ -76,7 +76,8 @@ int main( int argc, char* argv[] ) | @@ -76,7 +76,8 @@ int main( int argc, char* argv[] ) | ||
76 | { | 76 | { |
77 | std::cout << "[OK]" << std::endl; | 77 | std::cout << "[OK]" << std::endl; |
78 | std::cout << "Connecting to the test-broker : " << std::endl; | 78 | std::cout << "Connecting to the test-broker : " << std::endl; |
79 | - pSubscriber->connect( "localhost", 1883, "", "" ); | 79 | + pSubscriber->connect( "localhost", 1883, "", "", "test/subscriber/LWT", "Subscriber disconnected." ); |
80 | + | ||
80 | std::cout << "Subscribing to the test-topic....." << std::endl; | 81 | std::cout << "Subscribing to the test-topic....." << std::endl; |
81 | pSubscriber->subscribe( "test/publisher/#" ); | 82 | pSubscriber->subscribe( "test/publisher/#" ); |
82 | 83 | ||
@@ -84,6 +85,7 @@ int main( int argc, char* argv[] ) | @@ -84,6 +85,7 @@ int main( int argc, char* argv[] ) | ||
84 | while( 1 ) | 85 | while( 1 ) |
85 | { | 86 | { |
86 | sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. | 87 | sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. |
88 | + std::cout << "."; | ||
87 | } | 89 | } |
88 | } | 90 | } |
89 | else | 91 | else |
examples/sub/subscriber.cpp
@@ -24,11 +24,25 @@ | @@ -24,11 +24,25 @@ | ||
24 | #include <iostream> | 24 | #include <iostream> |
25 | 25 | ||
26 | Subscriber::Subscriber( const std::string &client_id ) | 26 | Subscriber::Subscriber( const std::string &client_id ) |
27 | - : MqttSubscriberBase( client_id ) | 27 | + : m_mqtt_client( client_id ) |
28 | { | 28 | { |
29 | 29 | ||
30 | } | 30 | } |
31 | 31 | ||
32 | +void Subscriber::connect( const std::string &hostname, int portnumber, const std::string &username, const std::string &password, const std::string &lwt_topic, const std::string &lwt_message ) | ||
33 | +{ | ||
34 | + m_mqtt_client.connect( hostname, portnumber, osdev::components::mqtt::Credentials( username, password ), osdev::components::mqtt::mqtt_LWT( lwt_topic, lwt_message ) ); | ||
35 | + std::cout << "Client state : " << m_mqtt_client.state() << std::endl; | ||
36 | +} | ||
37 | + | ||
38 | +void Subscriber::subscribe( const std::string &message_topic ) | ||
39 | +{ | ||
40 | + m_mqtt_client.subscribe( message_topic, 1, [this](const osdev::components::mqtt::MqttMessage &message ) | ||
41 | + { | ||
42 | + this->receive_data( message.topic(), message.payload() ); | ||
43 | + }); | ||
44 | +} | ||
45 | + | ||
32 | void Subscriber::receive_data( const std::string &message_topic, const std::string &message_payload ) | 46 | void Subscriber::receive_data( const std::string &message_topic, const std::string &message_payload ) |
33 | { | 47 | { |
34 | std::cout << "[Subscriber::receive_data] - Received message : " << message_payload << " from topic : " << message_topic << std::endl; | 48 | std::cout << "[Subscriber::receive_data] - Received message : " << message_payload << " from topic : " << message_topic << std::endl; |
examples/sub/subscriber.h
@@ -22,19 +22,29 @@ | @@ -22,19 +22,29 @@ | ||
22 | #pragma once | 22 | #pragma once |
23 | 23 | ||
24 | // std | 24 | // std |
25 | +#include <memory> | ||
25 | #include <string> | 26 | #include <string> |
26 | 27 | ||
27 | // mqtt-cpp | 28 | // mqtt-cpp |
28 | -#include "mqttsubscriberbase.h" | 29 | +#include "mqttclient.h" |
30 | +#include "compat-c++14.h" | ||
29 | 31 | ||
30 | -class Subscriber : public MqttSubscriberBase | 32 | +class Subscriber |
31 | { | 33 | { |
32 | public: | 34 | public: |
33 | - Subscriber( const std::string &client_id ); | 35 | + Subscriber(const std::string &client_id); |
34 | 36 | ||
35 | virtual ~Subscriber() {} | 37 | virtual ~Subscriber() {} |
36 | 38 | ||
39 | + void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string(), | ||
40 | + const std::string &lwt_topic = std::string(), const std::string &lwt_message = std::string() ); | ||
41 | + | ||
42 | + void subscribe( const std::string &message_topic ); | ||
43 | + | ||
37 | protected: | 44 | protected: |
38 | - void receive_data( const std::string &message_topic, const std::string &message_payload ); | 45 | + void receive_data( const std::string &message_topic, const std::string &message_payload ); |
46 | + | ||
47 | +private: | ||
48 | + osdev::components::mqtt::MqttClient m_mqtt_client; | ||
39 | 49 | ||
40 | }; | 50 | }; |
include/mqttclient.h
@@ -42,6 +42,26 @@ namespace osdev { | @@ -42,6 +42,26 @@ namespace osdev { | ||
42 | namespace components { | 42 | namespace components { |
43 | namespace mqtt { | 43 | namespace mqtt { |
44 | 44 | ||
45 | +// Internal structure for storing subscriptions until a valid connection becomes available. | ||
46 | +class Subscription | ||
47 | +{ | ||
48 | +public: | ||
49 | + Subscription( const std::string &topic, int qos, const std::function<void(MqttMessage)> call_back ) | ||
50 | + : m_topic( topic ) | ||
51 | + , m_qos( qos ) | ||
52 | + , m_call_back(call_back ) | ||
53 | + {} | ||
54 | + | ||
55 | + std::string getTopic() const { return m_topic; } | ||
56 | + int getQoS() const { return m_qos; } | ||
57 | + std::function<void(MqttMessage)> getCallBack() { return m_call_back; } | ||
58 | + | ||
59 | +private: | ||
60 | + std::string m_topic; | ||
61 | + int m_qos; | ||
62 | + std::function<void(MqttMessage)> m_call_back; | ||
63 | +}; | ||
64 | + | ||
45 | // Forward definition | 65 | // Forward definition |
46 | class IMqttClientImpl; | 66 | class IMqttClientImpl; |
47 | 67 | ||
@@ -215,6 +235,7 @@ private: | @@ -215,6 +235,7 @@ private: | ||
215 | 235 | ||
216 | mutable std::mutex m_interfaceMutex; ///< Makes the interface mutual exclusive | 236 | mutable std::mutex m_interfaceMutex; ///< Makes the interface mutual exclusive |
217 | mutable std::mutex m_internalMutex; ///< Protect the internal state. | 237 | mutable std::mutex m_internalMutex; ///< Protect the internal state. |
238 | + mutable std::mutex m_subscriptionMutex; ///< Protect the deferred Subscription Buffer | ||
218 | std::string m_endpoint; ///< The endpoint uri. | 239 | std::string m_endpoint; ///< The endpoint uri. |
219 | std::string m_clientId; ///< The main client identification. | 240 | std::string m_clientId; ///< The main client identification. |
220 | std::set<Token> m_activeTokens; ///< Set with active command tokens. Callbacks still need to be made for these tokens. | 241 | std::set<Token> m_activeTokens; ///< Set with active command tokens. Callbacks still need to be made for these tokens. |
@@ -225,6 +246,7 @@ private: | @@ -225,6 +246,7 @@ private: | ||
225 | std::vector<std::unique_ptr<IMqttClientImpl>> m_additionalClients; ///< A vector of additional wrapper clients. | 246 | std::vector<std::unique_ptr<IMqttClientImpl>> m_additionalClients; ///< A vector of additional wrapper clients. |
226 | SynchronizedQueue<std::function<void()>> m_eventQueue; ///< Synchronized queue for scheduling additional work. | 247 | SynchronizedQueue<std::function<void()>> m_eventQueue; ///< Synchronized queue for scheduling additional work. |
227 | std::thread m_workerThread; ///< A worker thread that is used to perform actions that cannot be done on the callback threads. | 248 | std::thread m_workerThread; ///< A worker thread that is used to perform actions that cannot be done on the callback threads. |
249 | + std::vector<Subscription> m_deferredSubscriptions; ///< A buffer to store subscription requests until the principal client comes online | ||
228 | }; | 250 | }; |
229 | 251 | ||
230 | } // End namespace mqtt | 252 | } // End namespace mqtt |
src/clientpaho.cpp
@@ -425,12 +425,14 @@ std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std | @@ -425,12 +425,14 @@ std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std | ||
425 | { | 425 | { |
426 | if( ConnectionStatus::Connected != m_connectionStatus ) | 426 | if( ConnectionStatus::Connected != m_connectionStatus ) |
427 | { | 427 | { |
428 | - // MqttException, "Not connected" | 428 | + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) ); |
429 | } | 429 | } |
430 | 430 | ||
431 | if( !isValidTopic( topic ) ) | 431 | if( !isValidTopic( topic ) ) |
432 | { | 432 | { |
433 | // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); | 433 | // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); |
434 | + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) ); | ||
435 | + return -1; | ||
434 | } | 436 | } |
435 | 437 | ||
436 | if( qos > 2 ) | 438 | if( qos > 2 ) |
@@ -477,6 +479,7 @@ std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std | @@ -477,6 +479,7 @@ std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std | ||
477 | if( isOverlappingInternal( topic, existingTopic ) ) | 479 | if( isOverlappingInternal( topic, existingTopic ) ) |
478 | { | 480 | { |
479 | // (OverlappingTopicException, "overlapping topic", existingTopic, topic); | 481 | // (OverlappingTopicException, "overlapping topic", existingTopic, topic); |
482 | + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) ); | ||
480 | } | 483 | } |
481 | 484 | ||
482 | LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) ); | 485 | LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) ); |
@@ -677,7 +680,7 @@ std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos ) | @@ -677,7 +680,7 @@ std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos ) | ||
677 | 680 | ||
678 | if( !m_pendingOperations.insert( opts.token ).second ) | 681 | if( !m_pendingOperations.insert( opts.token ).second ) |
679 | { | 682 | { |
680 | - LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) ); | 683 | + // LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) ); |
681 | } | 684 | } |
682 | m_operationResult.erase( opts.token ); | 685 | m_operationResult.erase( opts.token ); |
683 | return opts.token; | 686 | return opts.token; |
@@ -715,10 +718,12 @@ std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos ) | @@ -715,10 +718,12 @@ std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos ) | ||
715 | 718 | ||
716 | void ClientPaho::setConnectionStatus( ConnectionStatus status ) | 719 | void ClientPaho::setConnectionStatus( ConnectionStatus status ) |
717 | { | 720 | { |
721 | + LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - " ) ); | ||
718 | ConnectionStatus curStatus = m_connectionStatus; | 722 | ConnectionStatus curStatus = m_connectionStatus; |
719 | m_connectionStatus = status; | 723 | m_connectionStatus = status; |
720 | if( status != curStatus && m_connectionStatusCallback ) | 724 | if( status != curStatus && m_connectionStatusCallback ) |
721 | { | 725 | { |
726 | + LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - Calling m_connectionStatusCallback" ) ); | ||
722 | m_connectionStatusCallback( m_clientId, status ); | 727 | m_connectionStatusCallback( m_clientId, status ); |
723 | } | 728 | } |
724 | } | 729 | } |
@@ -784,6 +789,8 @@ void ClientPaho::onConnectOnInstance( const std::string& cause ) | @@ -784,6 +789,8 @@ void ClientPaho::onConnectOnInstance( const std::string& cause ) | ||
784 | 789 | ||
785 | void ClientPaho::onConnectSuccessOnInstance() | 790 | void ClientPaho::onConnectSuccessOnInstance() |
786 | { | 791 | { |
792 | + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", | ||
793 | + std::string( m_clientId + " - onConnectSuccessOnInstance triggered." ) ); | ||
787 | { | 794 | { |
788 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | 795 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
789 | // Register the connect callback that is used in reconnect scenarios. | 796 | // Register the connect callback that is used in reconnect scenarios. |
@@ -889,7 +896,7 @@ void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response ) | @@ -889,7 +896,7 @@ void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response ) | ||
889 | void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response ) | 896 | void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response ) |
890 | { | 897 | { |
891 | auto pd = response.publishData(); | 898 | auto pd = response.publishData(); |
892 | - LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) ); | 899 | + // LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) ); |
893 | { | 900 | { |
894 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | 901 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
895 | // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); | 902 | // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); |
src/mqttclient.cpp
@@ -57,6 +57,7 @@ std::string generateUniqueClientId(const std::string& clientId, std::size_t clie | @@ -57,6 +57,7 @@ std::string generateUniqueClientId(const std::string& clientId, std::size_t clie | ||
57 | MqttClient::MqttClient(const std::string& _clientId, const std::function<void(const Token& token)>& deliveryCompleteCallback) | 57 | MqttClient::MqttClient(const std::string& _clientId, const std::function<void(const Token& token)>& deliveryCompleteCallback) |
58 | : m_interfaceMutex() | 58 | : m_interfaceMutex() |
59 | , m_internalMutex() | 59 | , m_internalMutex() |
60 | + , m_subscriptionMutex() | ||
60 | , m_endpoint() | 61 | , m_endpoint() |
61 | , m_clientId(_clientId) | 62 | , m_clientId(_clientId) |
62 | , m_activeTokens() | 63 | , m_activeTokens() |
@@ -67,6 +68,7 @@ MqttClient::MqttClient(const std::string& _clientId, const std::function<void(co | @@ -67,6 +68,7 @@ MqttClient::MqttClient(const std::string& _clientId, const std::function<void(co | ||
67 | , m_additionalClients() | 68 | , m_additionalClients() |
68 | , m_eventQueue(_clientId) | 69 | , m_eventQueue(_clientId) |
69 | , m_workerThread( std::thread( &MqttClient::eventHandler, this ) ) | 70 | , m_workerThread( std::thread( &MqttClient::eventHandler, this ) ) |
71 | + , m_deferredSubscriptions() | ||
70 | { | 72 | { |
71 | Log::init( "mqtt-library" ); | 73 | Log::init( "mqtt-library" ); |
72 | LogInfo( "MQTT Client started", "[MqttClient::MqttClient]"); | 74 | LogInfo( "MQTT Client started", "[MqttClient::MqttClient]"); |
@@ -248,12 +250,18 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | @@ -248,12 +250,18 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | ||
248 | IMqttClientImpl* client(nullptr); | 250 | IMqttClientImpl* client(nullptr); |
249 | { | 251 | { |
250 | // OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | 252 | // OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); |
251 | - if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) | 253 | + if (!m_principalClient || m_principalClient->connectionStatus() != ConnectionStatus::Connected) |
252 | { | 254 | { |
253 | LogError("MqttClient", std::string( m_clientId + " - Unable to subscribe, not connected" ) ); | 255 | LogError("MqttClient", std::string( m_clientId + " - Unable to subscribe, not connected" ) ); |
254 | - // throw (?)(MqttException, "Not connected"); | 256 | + // Store the subscription in the buffer for later processing. |
257 | + { | ||
258 | + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex); | ||
259 | + m_deferredSubscriptions.emplace_back( topic, qos, cb ); | ||
260 | + } | ||
261 | + | ||
255 | return Token(m_clientId, -1); | 262 | return Token(m_clientId, -1); |
256 | } | 263 | } |
264 | + | ||
257 | if (!m_principalClient->isOverlapping(topic)) | 265 | if (!m_principalClient->isOverlapping(topic)) |
258 | { | 266 | { |
259 | client = m_principalClient.get(); | 267 | client = m_principalClient.get(); |
@@ -263,13 +271,15 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | @@ -263,13 +271,15 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | ||
263 | { | 271 | { |
264 | for (const auto& c : m_additionalClients) | 272 | for (const auto& c : m_additionalClients) |
265 | { | 273 | { |
266 | - if (!c->isOverlapping(topic)) { | 274 | + if (!c->isOverlapping(topic)) |
275 | + { | ||
267 | client = c.get(); | 276 | client = c.get(); |
268 | clientFound = true; | 277 | clientFound = true; |
269 | break; | 278 | break; |
270 | } | 279 | } |
271 | } | 280 | } |
272 | } | 281 | } |
282 | + | ||
273 | if (!clientFound) | 283 | if (!clientFound) |
274 | { | 284 | { |
275 | LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) ); | 285 | LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) ); |
@@ -282,9 +292,10 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | @@ -282,9 +292,10 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | ||
282 | client = m_additionalClients.back().get(); | 292 | client = m_additionalClients.back().get(); |
283 | } | 293 | } |
284 | } | 294 | } |
295 | + | ||
285 | if (!clientFound) | 296 | if (!clientFound) |
286 | { | 297 | { |
287 | - client->connect(true); | 298 | + client->connect( true ); |
288 | } | 299 | } |
289 | return Token{ client->clientId(), client->subscribe(topic, qos, cb) }; | 300 | return Token{ client->clientId(), client->subscribe(topic, qos, cb) }; |
290 | } | 301 | } |
@@ -296,13 +307,15 @@ std::set<Token> MqttClient::unsubscribe(const std::string& topic, int qos) | @@ -296,13 +307,15 @@ std::set<Token> MqttClient::unsubscribe(const std::string& topic, int qos) | ||
296 | std::vector<IMqttClientImpl*> clients{}; | 307 | std::vector<IMqttClientImpl*> clients{}; |
297 | { | 308 | { |
298 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | 309 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); |
299 | - if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) { | 310 | + if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) |
311 | + { | ||
300 | LogError("[MqttClient::unsubscribe]", std::string( m_clientId + " - Unable to unsubscribe, not connected" ) ); | 312 | LogError("[MqttClient::unsubscribe]", std::string( m_clientId + " - Unable to unsubscribe, not connected" ) ); |
301 | // Throw (MqttException, "Not connected"); | 313 | // Throw (MqttException, "Not connected"); |
302 | return std::set<Token>(); | 314 | return std::set<Token>(); |
303 | } | 315 | } |
304 | clients.push_back(m_principalClient.get()); | 316 | clients.push_back(m_principalClient.get()); |
305 | - for (const auto& c : m_additionalClients) { | 317 | + for (const auto& c : m_additionalClients) |
318 | + { | ||
306 | clients.push_back(c.get()); | 319 | clients.push_back(c.get()); |
307 | } | 320 | } |
308 | } | 321 | } |
@@ -386,31 +399,54 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | @@ -386,31 +399,54 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | ||
386 | std::vector<ConnectionStatus> connectionStates{}; | 399 | std::vector<ConnectionStatus> connectionStates{}; |
387 | { | 400 | { |
388 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | 401 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); |
389 | - if (!m_principalClient) { | ||
390 | - return; | ||
391 | - } | ||
392 | - if (m_principalClient) { | 402 | + |
403 | + if (m_principalClient) | ||
404 | + { | ||
393 | principalClient = m_principalClient.get(); | 405 | principalClient = m_principalClient.get(); |
394 | clients.push_back(principalClient); | 406 | clients.push_back(principalClient); |
395 | connectionStates.push_back(m_principalClient->connectionStatus()); | 407 | connectionStates.push_back(m_principalClient->connectionStatus()); |
396 | } | 408 | } |
397 | - for (const auto& c : m_additionalClients) { | 409 | + |
410 | + for (const auto& c : m_additionalClients) | ||
411 | + { | ||
398 | clients.push_back(c.get()); | 412 | clients.push_back(c.get()); |
399 | connectionStates.push_back(c->connectionStatus()); | 413 | connectionStates.push_back(c->connectionStatus()); |
400 | } | 414 | } |
401 | } | 415 | } |
416 | + | ||
402 | auto newState = determineState(connectionStates); | 417 | auto newState = determineState(connectionStates); |
403 | - bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); | ||
404 | - if (resubscribe) { | 418 | + // bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); |
419 | + bool resubscribe = ( StateEnum::Good == newState ); | ||
420 | + if (resubscribe) | ||
421 | + { | ||
422 | + // First activate pending subscriptions | ||
423 | + { | ||
424 | + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex); | ||
425 | + LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) ); | ||
426 | + while( m_deferredSubscriptions.size() > 0 ) | ||
427 | + { | ||
428 | + auto subscription = m_deferredSubscriptions.at( 0 ); | ||
429 | + this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() ); | ||
430 | + m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() ); | ||
431 | + } | ||
432 | + } | ||
433 | + | ||
434 | + LogDebug( "[MqttClient::connectionStatusChanged]", | ||
435 | + std::string( m_clientId + " - Resubscribing..." ) ); | ||
405 | { | 436 | { |
406 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | 437 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); |
407 | m_activeTokens.clear(); | 438 | m_activeTokens.clear(); |
408 | } | 439 | } |
409 | - for (auto* cl : clients) { | ||
410 | - try { | 440 | + |
441 | + for (auto* cl : clients) | ||
442 | + { | ||
443 | + try | ||
444 | + { | ||
445 | + LogDebug( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - Client " + cl->clientId() + " has " + std::string( cl->hasPendingSubscriptions() ? "" : "no" ) + " pending subscriptions" ) ); | ||
411 | cl->resubscribe(); | 446 | cl->resubscribe(); |
412 | } | 447 | } |
413 | - catch (const std::exception& e) { | 448 | + catch (const std::exception& e) |
449 | + { | ||
414 | LogError("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - resubscribe on wrapped client " + cl->clientId() + " in context of connection status change in wrapped client : " + id + " => FAILED : " + e.what() ) ); | 450 | LogError("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - resubscribe on wrapped client " + cl->clientId() + " in context of connection status change in wrapped client : " + id + " => FAILED : " + e.what() ) ); |
415 | } | 451 | } |
416 | } | 452 | } |
@@ -420,20 +456,32 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | @@ -420,20 +456,32 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | ||
420 | // The server state change and a possible resubscription are done in the context of the MqttClient worker thread | 456 | // The server state change and a possible resubscription are done in the context of the MqttClient worker thread |
421 | // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions. | 457 | // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions. |
422 | this->pushEvent([this, resubscribe, clients, principalClient, newState]() { | 458 | this->pushEvent([this, resubscribe, clients, principalClient, newState]() { |
423 | - if (resubscribe) { | 459 | + if (resubscribe) |
460 | + { | ||
424 | // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens. | 461 | // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens. |
425 | // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread. | 462 | // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread. |
426 | auto waitFor = std::chrono::milliseconds(1000); | 463 | auto waitFor = std::chrono::milliseconds(1000); |
427 | - if (!waitForCompletionInternalClients(clients, waitFor, std::set<Token>{})) { | ||
428 | - if (std::accumulate(clients.begin(), clients.end(), false, [](bool hasPending, IMqttClientImpl* client) { return hasPending || client->hasPendingSubscriptions(); })) { | 464 | + if (!waitForCompletionInternalClients(clients, waitFor, std::set<Token>{})) |
465 | + { | ||
466 | + if (std::accumulate(clients.begin(), | ||
467 | + clients.end(), | ||
468 | + false, | ||
469 | + [](bool hasPending, IMqttClientImpl* client) | ||
470 | + { | ||
471 | + return hasPending || client->hasPendingSubscriptions(); | ||
472 | + })) | ||
473 | + { | ||
429 | LogWarning("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - subscriptions are not recovered within timeout." ) ); | 474 | LogWarning("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - subscriptions are not recovered within timeout." ) ); |
430 | } | 475 | } |
431 | } | 476 | } |
432 | - if (principalClient) { | ||
433 | - try { | 477 | + if (principalClient) |
478 | + { | ||
479 | + try | ||
480 | + { | ||
434 | principalClient->publishPending(); | 481 | principalClient->publishPending(); |
435 | } | 482 | } |
436 | - catch (const std::exception& e) { | 483 | + catch (const std::exception& e) |
484 | + { | ||
437 | LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); | 485 | LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); |
438 | } | 486 | } |
439 | } | 487 | } |