Commit 8ae7f1fe55514a8cf88c2e6cdb4ca598f7a80fc2
1 parent
d557d523
Fix deferred subscriptions
Showing
2 changed files
with
14 additions
and
2 deletions
include/mqttclient.h
... | ... | @@ -49,12 +49,12 @@ public: |
49 | 49 | Subscription( const std::string &topic, int qos, const std::function<void(MqttMessage)>& call_back ) |
50 | 50 | : m_topic( topic ) |
51 | 51 | , m_qos( qos ) |
52 | - , m_call_back(call_back ) | |
52 | + , m_call_back(const_cast<std::function<void(MqttMessage)>&>(call_back )) | |
53 | 53 | {} |
54 | 54 | |
55 | 55 | std::string getTopic() const { return m_topic; } |
56 | 56 | int getQoS() const { return m_qos; } |
57 | - std::function<void(MqttMessage)>& getCallBack() const { return m_call_back; } | |
57 | + std::function<void(MqttMessage)>& getCallBack() const { return m_call_back; } | |
58 | 58 | |
59 | 59 | private: |
60 | 60 | std::string m_topic; | ... | ... |
src/mqttclient.cpp
... | ... | @@ -416,6 +416,18 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus |
416 | 416 | bool resubscribe = ( StateEnum::Good == newState ); |
417 | 417 | if (resubscribe) |
418 | 418 | { |
419 | + // First activate pending subscriptions | |
420 | + { | |
421 | + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex); | |
422 | + LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) ); | |
423 | + while( m_deferredSubscriptions.size() > 0 ) | |
424 | + { | |
425 | + auto subscription = m_deferredSubscriptions.at( 0 ); | |
426 | + this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() ); | |
427 | + m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() ); | |
428 | + } | |
429 | + } | |
430 | + | |
419 | 431 | LogDebug( "[MqttClient::connectionStatusChanged]", |
420 | 432 | std::string( m_clientId + " - Resubscribing..." ) ); |
421 | 433 | { | ... | ... |