Commit d557d523819c9e3d5cce08fbe8fc46ea27ec7aa4

Authored by Peter M. Groen
1 parent 11fe0b09

Fix deferred subscriptions

include/mqttclient.h
... ... @@ -42,6 +42,26 @@ namespace osdev {
42 42 namespace components {
43 43 namespace mqtt {
44 44  
  45 +// Internal structure for storing subscriptions until a valid connection becomes available.
  46 +class Subscription
  47 +{
  48 +public:
  49 + Subscription( const std::string &topic, int qos, const std::function<void(MqttMessage)>& call_back )
  50 + : m_topic( topic )
  51 + , m_qos( qos )
  52 + , m_call_back(call_back )
  53 + {}
  54 +
  55 + std::string getTopic() const { return m_topic; }
  56 + int getQoS() const { return m_qos; }
  57 + std::function<void(MqttMessage)>& getCallBack() const { return m_call_back; }
  58 +
  59 +private:
  60 + std::string m_topic;
  61 + int m_qos;
  62 + std::function<void(MqttMessage)>& m_call_back;
  63 +};
  64 +
45 65 // Forward definition
46 66 class IMqttClientImpl;
47 67  
... ... @@ -215,6 +235,7 @@ private:
215 235  
216 236 mutable std::mutex m_interfaceMutex; ///< Makes the interface mutual exclusive
217 237 mutable std::mutex m_internalMutex; ///< Protect the internal state.
  238 + mutable std::mutex m_subscriptionMutex; ///< Protect the deferred Subscription Buffer
218 239 std::string m_endpoint; ///< The endpoint uri.
219 240 std::string m_clientId; ///< The main client identification.
220 241 std::set<Token> m_activeTokens; ///< Set with active command tokens. Callbacks still need to be made for these tokens.
... ... @@ -225,6 +246,7 @@ private:
225 246 std::vector<std::unique_ptr<IMqttClientImpl>> m_additionalClients; ///< A vector of additional wrapper clients.
226 247 SynchronizedQueue<std::function<void()>> m_eventQueue; ///< Synchronized queue for scheduling additional work.
227 248 std::thread m_workerThread; ///< A worker thread that is used to perform actions that cannot be done on the callback threads.
  249 + std::vector<Subscription> m_deferredSubscriptions; ///< A buffer to store subscription requests until the principal client comes online
228 250 };
229 251  
230 252 } // End namespace mqtt
... ...
src/clientpaho.cpp
... ... @@ -680,7 +680,7 @@ std::int32_t ClientPaho::publishInternal( const MqttMessage&amp; message, int qos )
680 680  
681 681 if( !m_pendingOperations.insert( opts.token ).second )
682 682 {
683   - LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
  683 + // LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
684 684 }
685 685 m_operationResult.erase( opts.token );
686 686 return opts.token;
... ... @@ -718,10 +718,12 @@ std::int32_t ClientPaho::subscribeInternal( const std::string&amp; topic, int qos )
718 718  
719 719 void ClientPaho::setConnectionStatus( ConnectionStatus status )
720 720 {
  721 + LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - " ) );
721 722 ConnectionStatus curStatus = m_connectionStatus;
722 723 m_connectionStatus = status;
723 724 if( status != curStatus && m_connectionStatusCallback )
724 725 {
  726 + LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - Calling m_connectionStatusCallback" ) );
725 727 m_connectionStatusCallback( m_clientId, status );
726 728 }
727 729 }
... ... @@ -787,6 +789,8 @@ void ClientPaho::onConnectOnInstance( const std::string&amp; cause )
787 789  
788 790 void ClientPaho::onConnectSuccessOnInstance()
789 791 {
  792 + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]",
  793 + std::string( m_clientId + " - onConnectSuccessOnInstance triggered." ) );
790 794 {
791 795 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
792 796 // Register the connect callback that is used in reconnect scenarios.
... ... @@ -892,7 +896,7 @@ void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure&amp; response )
892 896 void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
893 897 {
894 898 auto pd = response.publishData();
895   - LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
  899 + // LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
896 900 {
897 901 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
898 902 // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
... ...
src/mqttclient.cpp
... ... @@ -57,6 +57,7 @@ std::string generateUniqueClientId(const std::string&amp; clientId, std::size_t clie
57 57 MqttClient::MqttClient(const std::string& _clientId, const std::function<void(const Token& token)>& deliveryCompleteCallback)
58 58 : m_interfaceMutex()
59 59 , m_internalMutex()
  60 + , m_subscriptionMutex()
60 61 , m_endpoint()
61 62 , m_clientId(_clientId)
62 63 , m_activeTokens()
... ... @@ -67,6 +68,7 @@ MqttClient::MqttClient(const std::string&amp; _clientId, const std::function&lt;void(co
67 68 , m_additionalClients()
68 69 , m_eventQueue(_clientId)
69 70 , m_workerThread( std::thread( &MqttClient::eventHandler, this ) )
  71 + , m_deferredSubscriptions()
70 72 {
71 73 Log::init( "mqtt-library" );
72 74 LogInfo( "MQTT Client started", "[MqttClient::MqttClient]");
... ... @@ -248,12 +250,18 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
248 250 IMqttClientImpl* client(nullptr);
249 251 {
250 252 // OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
251   - if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected)
  253 + if (!m_principalClient || m_principalClient->connectionStatus() != ConnectionStatus::Connected)
252 254 {
253 255 LogError("MqttClient", std::string( m_clientId + " - Unable to subscribe, not connected" ) );
254   - // throw (?)(MqttException, "Not connected");
  256 + // Store the subscription in the buffer for later processing.
  257 + {
  258 + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex);
  259 + m_deferredSubscriptions.push_back( Subscription( topic, qos, cb ) );
  260 + }
  261 +
255 262 return Token(m_clientId, -1);
256 263 }
  264 +
257 265 if (!m_principalClient->isOverlapping(topic))
258 266 {
259 267 client = m_principalClient.get();
... ... @@ -270,6 +278,7 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
270 278 }
271 279 }
272 280 }
  281 +
273 282 if (!clientFound)
274 283 {
275 284 LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) );
... ... @@ -282,9 +291,10 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
282 291 client = m_additionalClients.back().get();
283 292 }
284 293 }
  294 +
285 295 if (!clientFound)
286 296 {
287   - client->connect( false );
  297 + client->connect( true );
288 298 }
289 299 return Token{ client->clientId(), client->subscribe(topic, qos, cb) };
290 300 }
... ... @@ -386,32 +396,42 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
386 396 std::vector<ConnectionStatus> connectionStates{};
387 397 {
388 398 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
389   - if (!m_principalClient) {
390   - return;
391   - }
392   - if (m_principalClient) {
  399 +
  400 + if (m_principalClient)
  401 + {
393 402 principalClient = m_principalClient.get();
394 403 clients.push_back(principalClient);
395 404 connectionStates.push_back(m_principalClient->connectionStatus());
396 405 }
397   - for (const auto& c : m_additionalClients) {
  406 +
  407 + for (const auto& c : m_additionalClients)
  408 + {
398 409 clients.push_back(c.get());
399 410 connectionStates.push_back(c->connectionStatus());
400 411 }
401 412 }
  413 +
402 414 auto newState = determineState(connectionStates);
403   - bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);
  415 + // bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);
  416 + bool resubscribe = ( StateEnum::Good == newState );
404 417 if (resubscribe)
405 418 {
  419 + LogDebug( "[MqttClient::connectionStatusChanged]",
  420 + std::string( m_clientId + " - Resubscribing..." ) );
406 421 {
407 422 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
408 423 m_activeTokens.clear();
409 424 }
410   - for (auto* cl : clients) {
411   - try {
  425 +
  426 + for (auto* cl : clients)
  427 + {
  428 + try
  429 + {
  430 + LogDebug( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - Client " + cl->clientId() + " has " + std::string( cl->hasPendingSubscriptions() ? "" : "no" ) + " pending subscriptions" ) );
412 431 cl->resubscribe();
413 432 }
414   - catch (const std::exception& e) {
  433 + catch (const std::exception& e)
  434 + {
415 435 LogError("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - resubscribe on wrapped client " + cl->clientId() + " in context of connection status change in wrapped client : " + id + " => FAILED : " + e.what() ) );
416 436 }
417 437 }
... ... @@ -421,20 +441,32 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
421 441 // The server state change and a possible resubscription are done in the context of the MqttClient worker thread
422 442 // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions.
423 443 this->pushEvent([this, resubscribe, clients, principalClient, newState]() {
424   - if (resubscribe) {
  444 + if (resubscribe)
  445 + {
425 446 // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens.
426 447 // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread.
427 448 auto waitFor = std::chrono::milliseconds(1000);
428   - if (!waitForCompletionInternalClients(clients, waitFor, std::set<Token>{})) {
429   - if (std::accumulate(clients.begin(), clients.end(), false, [](bool hasPending, IMqttClientImpl* client) { return hasPending || client->hasPendingSubscriptions(); })) {
  449 + if (!waitForCompletionInternalClients(clients, waitFor, std::set<Token>{}))
  450 + {
  451 + if (std::accumulate(clients.begin(),
  452 + clients.end(),
  453 + false,
  454 + [](bool hasPending, IMqttClientImpl* client)
  455 + {
  456 + return hasPending || client->hasPendingSubscriptions();
  457 + }))
  458 + {
430 459 LogWarning("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - subscriptions are not recovered within timeout." ) );
431 460 }
432 461 }
433   - if (principalClient) {
434   - try {
  462 + if (principalClient)
  463 + {
  464 + try
  465 + {
435 466 principalClient->publishPending();
436 467 }
437   - catch (const std::exception& e) {
  468 + catch (const std::exception& e)
  469 + {
438 470 LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) );
439 471 }
440 472 }
... ...