Merged
Merge Request #13 · created by Peter M. Groen


Fix/pgroen/subscription failure


From fix/pgroen/subscription_failure into master

Merged by Steven de Ridder

Source branch has been removed
2 participants



examples/sub/main.cpp
... ... @@ -76,7 +76,8 @@ int main( int argc, char* argv[] )
76 76 {
77 77 std::cout << "[OK]" << std::endl;
78 78 std::cout << "Connecting to the test-broker : " << std::endl;
79   - pSubscriber->connect( "localhost", 1883, "", "" );
  79 + pSubscriber->connect( "localhost", 1883, "", "", "test/subscriber/LWT", "Subscriber disconnected." );
  80 +
80 81 std::cout << "Subscribing to the test-topic....." << std::endl;
81 82 pSubscriber->subscribe( "test/publisher/#" );
82 83  
... ... @@ -84,6 +85,7 @@ int main( int argc, char* argv[] )
84 85 while( 1 )
85 86 {
86 87 sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
  88 + std::cout << ".";
87 89 }
88 90 }
89 91 else
... ...
examples/sub/subscriber.cpp
... ... @@ -24,11 +24,25 @@
24 24 #include <iostream>
25 25  
26 26 Subscriber::Subscriber( const std::string &client_id )
27   - : MqttSubscriberBase( client_id )
  27 + : m_mqtt_client( client_id )
28 28 {
29 29  
30 30 }
31 31  
  32 +void Subscriber::connect( const std::string &hostname, int portnumber, const std::string &username, const std::string &password, const std::string &lwt_topic, const std::string &lwt_message )
  33 +{
  34 + m_mqtt_client.connect( hostname, portnumber, osdev::components::mqtt::Credentials( username, password ), osdev::components::mqtt::mqtt_LWT( lwt_topic, lwt_message ) );
  35 + std::cout << "Client state : " << m_mqtt_client.state() << std::endl;
  36 +}
  37 +
  38 +void Subscriber::subscribe( const std::string &message_topic )
  39 +{
  40 + m_mqtt_client.subscribe( message_topic, 1, [this](const osdev::components::mqtt::MqttMessage &message )
  41 + {
  42 + this->receive_data( message.topic(), message.payload() );
  43 + });
  44 +}
  45 +
32 46 void Subscriber::receive_data( const std::string &message_topic, const std::string &message_payload )
33 47 {
34 48 std::cout << "[Subscriber::receive_data] - Received message : " << message_payload << " from topic : " << message_topic << std::endl;
... ...
examples/sub/subscriber.h
... ... @@ -22,19 +22,29 @@
22 22 #pragma once
23 23  
24 24 // std
  25 +#include <memory>
25 26 #include <string>
26 27  
27 28 // mqtt-cpp
28   -#include "mqttsubscriberbase.h"
  29 +#include "mqttclient.h"
  30 +#include "compat-c++14.h"
29 31  
30   -class Subscriber : public MqttSubscriberBase
  32 +class Subscriber
31 33 {
32 34 public:
33   - Subscriber( const std::string &client_id );
  35 + Subscriber(const std::string &client_id);
34 36  
35 37 virtual ~Subscriber() {}
36 38  
  39 + void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string(),
  40 + const std::string &lwt_topic = std::string(), const std::string &lwt_message = std::string() );
  41 +
  42 + void subscribe( const std::string &message_topic );
  43 +
37 44 protected:
38   - void receive_data( const std::string &message_topic, const std::string &message_payload );
  45 + void receive_data( const std::string &message_topic, const std::string &message_payload );
  46 +
  47 +private:
  48 + osdev::components::mqtt::MqttClient m_mqtt_client;
39 49  
40 50 };
... ...
include/mqttclient.h
... ... @@ -42,6 +42,26 @@ namespace osdev {
42 42 namespace components {
43 43 namespace mqtt {
44 44  
  45 +// Internal structure for storing subscriptions until a valid connection becomes available.
  46 +class Subscription
  47 +{
  48 +public:
  49 + Subscription( const std::string &topic, int qos, const std::function<void(MqttMessage)> call_back )
  50 + : m_topic( topic )
  51 + , m_qos( qos )
  52 + , m_call_back(call_back )
  53 + {}
  54 +
  55 + std::string getTopic() const { return m_topic; }
  56 + int getQoS() const { return m_qos; }
  57 + std::function<void(MqttMessage)> getCallBack() { return m_call_back; }
  58 +
  59 +private:
  60 + std::string m_topic;
  61 + int m_qos;
  62 + std::function<void(MqttMessage)> m_call_back;
  63 +};
  64 +
45 65 // Forward definition
46 66 class IMqttClientImpl;
47 67  
... ... @@ -215,6 +235,7 @@ private:
215 235  
216 236 mutable std::mutex m_interfaceMutex; ///< Makes the interface mutual exclusive
217 237 mutable std::mutex m_internalMutex; ///< Protect the internal state.
  238 + mutable std::mutex m_subscriptionMutex; ///< Protect the deferred Subscription Buffer
218 239 std::string m_endpoint; ///< The endpoint uri.
219 240 std::string m_clientId; ///< The main client identification.
220 241 std::set<Token> m_activeTokens; ///< Set with active command tokens. Callbacks still need to be made for these tokens.
... ... @@ -225,6 +246,7 @@ private:
225 246 std::vector<std::unique_ptr<IMqttClientImpl>> m_additionalClients; ///< A vector of additional wrapper clients.
226 247 SynchronizedQueue<std::function<void()>> m_eventQueue; ///< Synchronized queue for scheduling additional work.
227 248 std::thread m_workerThread; ///< A worker thread that is used to perform actions that cannot be done on the callback threads.
  249 + std::vector<Subscription> m_deferredSubscriptions; ///< A buffer to store subscription requests until the principal client comes online
228 250 };
229 251  
230 252 } // End namespace mqtt
... ...
src/clientpaho.cpp
... ... @@ -425,12 +425,14 @@ std::int32_t ClientPaho::subscribe( const std::string&amp; topic, int qos, const std
425 425 {
426 426 if( ConnectionStatus::Connected != m_connectionStatus )
427 427 {
428   - // MqttException, "Not connected"
  428 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) );
429 429 }
430 430  
431 431 if( !isValidTopic( topic ) )
432 432 {
433 433 // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
  434 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) );
  435 + return -1;
434 436 }
435 437  
436 438 if( qos > 2 )
... ... @@ -477,6 +479,7 @@ std::int32_t ClientPaho::subscribe( const std::string&amp; topic, int qos, const std
477 479 if( isOverlappingInternal( topic, existingTopic ) )
478 480 {
479 481 // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
  482 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) );
480 483 }
481 484  
482 485 LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
... ... @@ -677,7 +680,7 @@ std::int32_t ClientPaho::publishInternal( const MqttMessage&amp; message, int qos )
677 680  
678 681 if( !m_pendingOperations.insert( opts.token ).second )
679 682 {
680   - LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
  683 + // LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
681 684 }
682 685 m_operationResult.erase( opts.token );
683 686 return opts.token;
... ... @@ -715,10 +718,12 @@ std::int32_t ClientPaho::subscribeInternal( const std::string&amp; topic, int qos )
715 718  
716 719 void ClientPaho::setConnectionStatus( ConnectionStatus status )
717 720 {
  721 + LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - " ) );
718 722 ConnectionStatus curStatus = m_connectionStatus;
719 723 m_connectionStatus = status;
720 724 if( status != curStatus && m_connectionStatusCallback )
721 725 {
  726 + LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - Calling m_connectionStatusCallback" ) );
722 727 m_connectionStatusCallback( m_clientId, status );
723 728 }
724 729 }
... ... @@ -784,6 +789,8 @@ void ClientPaho::onConnectOnInstance( const std::string&amp; cause )
784 789  
785 790 void ClientPaho::onConnectSuccessOnInstance()
786 791 {
  792 + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]",
  793 + std::string( m_clientId + " - onConnectSuccessOnInstance triggered." ) );
787 794 {
788 795 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
789 796 // Register the connect callback that is used in reconnect scenarios.
... ... @@ -889,7 +896,7 @@ void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure&amp; response )
889 896 void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
890 897 {
891 898 auto pd = response.publishData();
892   - LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
  899 + // LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
893 900 {
894 901 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
895 902 // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
... ...
src/mqttclient.cpp
... ... @@ -57,6 +57,7 @@ std::string generateUniqueClientId(const std::string&amp; clientId, std::size_t clie
57 57 MqttClient::MqttClient(const std::string& _clientId, const std::function<void(const Token& token)>& deliveryCompleteCallback)
58 58 : m_interfaceMutex()
59 59 , m_internalMutex()
  60 + , m_subscriptionMutex()
60 61 , m_endpoint()
61 62 , m_clientId(_clientId)
62 63 , m_activeTokens()
... ... @@ -67,6 +68,7 @@ MqttClient::MqttClient(const std::string&amp; _clientId, const std::function&lt;void(co
67 68 , m_additionalClients()
68 69 , m_eventQueue(_clientId)
69 70 , m_workerThread( std::thread( &MqttClient::eventHandler, this ) )
  71 + , m_deferredSubscriptions()
70 72 {
71 73 Log::init( "mqtt-library" );
72 74 LogInfo( "MQTT Client started", "[MqttClient::MqttClient]");
... ... @@ -248,12 +250,18 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
248 250 IMqttClientImpl* client(nullptr);
249 251 {
250 252 // OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
251   - if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected)
  253 + if (!m_principalClient || m_principalClient->connectionStatus() != ConnectionStatus::Connected)
252 254 {
253 255 LogError("MqttClient", std::string( m_clientId + " - Unable to subscribe, not connected" ) );
254   - // throw (?)(MqttException, "Not connected");
  256 + // Store the subscription in the buffer for later processing.
  257 + {
  258 + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex);
  259 + m_deferredSubscriptions.emplace_back( topic, qos, cb );
  260 + }
  261 +
255 262 return Token(m_clientId, -1);
256 263 }
  264 +
257 265 if (!m_principalClient->isOverlapping(topic))
258 266 {
259 267 client = m_principalClient.get();
... ... @@ -263,13 +271,15 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
263 271 {
264 272 for (const auto& c : m_additionalClients)
265 273 {
266   - if (!c->isOverlapping(topic)) {
  274 + if (!c->isOverlapping(topic))
  275 + {
267 276 client = c.get();
268 277 clientFound = true;
269 278 break;
270 279 }
271 280 }
272 281 }
  282 +
273 283 if (!clientFound)
274 284 {
275 285 LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) );
... ... @@ -282,9 +292,10 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
282 292 client = m_additionalClients.back().get();
283 293 }
284 294 }
  295 +
285 296 if (!clientFound)
286 297 {
287   - client->connect(true);
  298 + client->connect( true );
288 299 }
289 300 return Token{ client->clientId(), client->subscribe(topic, qos, cb) };
290 301 }
... ... @@ -296,13 +307,15 @@ std::set&lt;Token&gt; MqttClient::unsubscribe(const std::string&amp; topic, int qos)
296 307 std::vector<IMqttClientImpl*> clients{};
297 308 {
298 309 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
299   - if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) {
  310 + if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected)
  311 + {
300 312 LogError("[MqttClient::unsubscribe]", std::string( m_clientId + " - Unable to unsubscribe, not connected" ) );
301 313 // Throw (MqttException, "Not connected");
302 314 return std::set<Token>();
303 315 }
304 316 clients.push_back(m_principalClient.get());
305   - for (const auto& c : m_additionalClients) {
  317 + for (const auto& c : m_additionalClients)
  318 + {
306 319 clients.push_back(c.get());
307 320 }
308 321 }
... ... @@ -386,31 +399,54 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
386 399 std::vector<ConnectionStatus> connectionStates{};
387 400 {
388 401 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
389   - if (!m_principalClient) {
390   - return;
391   - }
392   - if (m_principalClient) {
  402 +
  403 + if (m_principalClient)
  404 + {
393 405 principalClient = m_principalClient.get();
394 406 clients.push_back(principalClient);
395 407 connectionStates.push_back(m_principalClient->connectionStatus());
396 408 }
397   - for (const auto& c : m_additionalClients) {
  409 +
  410 + for (const auto& c : m_additionalClients)
  411 + {
398 412 clients.push_back(c.get());
399 413 connectionStates.push_back(c->connectionStatus());
400 414 }
401 415 }
  416 +
402 417 auto newState = determineState(connectionStates);
403   - bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);
404   - if (resubscribe) {
  418 + // bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);
  419 + bool resubscribe = ( StateEnum::Good == newState );
  420 + if (resubscribe)
  421 + {
  422 + // First activate pending subscriptions
  423 + {
  424 + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex);
  425 + LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) );
  426 + while( m_deferredSubscriptions.size() > 0 )
  427 + {
  428 + auto subscription = m_deferredSubscriptions.at( 0 );
  429 + this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() );
  430 + m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() );
  431 + }
  432 + }
  433 +
  434 + LogDebug( "[MqttClient::connectionStatusChanged]",
  435 + std::string( m_clientId + " - Resubscribing..." ) );
405 436 {
406 437 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
407 438 m_activeTokens.clear();
408 439 }
409   - for (auto* cl : clients) {
410   - try {
  440 +
  441 + for (auto* cl : clients)
  442 + {
  443 + try
  444 + {
  445 + LogDebug( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - Client " + cl->clientId() + " has " + std::string( cl->hasPendingSubscriptions() ? "" : "no" ) + " pending subscriptions" ) );
411 446 cl->resubscribe();
412 447 }
413   - catch (const std::exception& e) {
  448 + catch (const std::exception& e)
  449 + {
414 450 LogError("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - resubscribe on wrapped client " + cl->clientId() + " in context of connection status change in wrapped client : " + id + " => FAILED : " + e.what() ) );
415 451 }
416 452 }
... ... @@ -420,20 +456,32 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
420 456 // The server state change and a possible resubscription are done in the context of the MqttClient worker thread
421 457 // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions.
422 458 this->pushEvent([this, resubscribe, clients, principalClient, newState]() {
423   - if (resubscribe) {
  459 + if (resubscribe)
  460 + {
424 461 // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens.
425 462 // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread.
426 463 auto waitFor = std::chrono::milliseconds(1000);
427   - if (!waitForCompletionInternalClients(clients, waitFor, std::set<Token>{})) {
428   - if (std::accumulate(clients.begin(), clients.end(), false, [](bool hasPending, IMqttClientImpl* client) { return hasPending || client->hasPendingSubscriptions(); })) {
  464 + if (!waitForCompletionInternalClients(clients, waitFor, std::set<Token>{}))
  465 + {
  466 + if (std::accumulate(clients.begin(),
  467 + clients.end(),
  468 + false,
  469 + [](bool hasPending, IMqttClientImpl* client)
  470 + {
  471 + return hasPending || client->hasPendingSubscriptions();
  472 + }))
  473 + {
429 474 LogWarning("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - subscriptions are not recovered within timeout." ) );
430 475 }
431 476 }
432   - if (principalClient) {
433   - try {
  477 + if (principalClient)
  478 + {
  479 + try
  480 + {
434 481 principalClient->publishPending();
435 482 }
436   - catch (const std::exception& e) {
  483 + catch (const std::exception& e)
  484 + {
437 485 LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) );
438 486 }
439 487 }
... ...