Commit 11fe0b09d19faa4f76dea23f651ebf5e6b376555

Authored by Peter M. Groen
1 parent 4b9da1a8

Rework for subscription async

examples/sub/main.cpp
@@ -76,7 +76,8 @@ int main( int argc, char* argv[] ) @@ -76,7 +76,8 @@ int main( int argc, char* argv[] )
76 { 76 {
77 std::cout << "[OK]" << std::endl; 77 std::cout << "[OK]" << std::endl;
78 std::cout << "Connecting to the test-broker : " << std::endl; 78 std::cout << "Connecting to the test-broker : " << std::endl;
79 - pSubscriber->connect( "localhost", 1883, "", "" ); 79 + pSubscriber->connect( "localhost", 1883, "", "", "test/subscriber/LWT", "Subscriber disconnected." );
  80 +
80 std::cout << "Subscribing to the test-topic....." << std::endl; 81 std::cout << "Subscribing to the test-topic....." << std::endl;
81 pSubscriber->subscribe( "test/publisher/#" ); 82 pSubscriber->subscribe( "test/publisher/#" );
82 83
@@ -84,6 +85,7 @@ int main( int argc, char* argv[] ) @@ -84,6 +85,7 @@ int main( int argc, char* argv[] )
84 while( 1 ) 85 while( 1 )
85 { 86 {
86 sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. 87 sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
  88 + std::cout << ".";
87 } 89 }
88 } 90 }
89 else 91 else
examples/sub/subscriber.cpp
@@ -24,11 +24,25 @@ @@ -24,11 +24,25 @@
24 #include <iostream> 24 #include <iostream>
25 25
26 Subscriber::Subscriber( const std::string &client_id ) 26 Subscriber::Subscriber( const std::string &client_id )
27 - : MqttSubscriberBase( client_id ) 27 + : m_mqtt_client( client_id )
28 { 28 {
29 29
30 } 30 }
31 31
  32 +void Subscriber::connect( const std::string &hostname, int portnumber, const std::string &username, const std::string &password, const std::string &lwt_topic, const std::string &lwt_message )
  33 +{
  34 + m_mqtt_client.connect( hostname, portnumber, osdev::components::mqtt::Credentials( username, password ), osdev::components::mqtt::mqtt_LWT( lwt_topic, lwt_message ) );
  35 + std::cout << "Client state : " << m_mqtt_client.state() << std::endl;
  36 +}
  37 +
  38 +void Subscriber::subscribe( const std::string &message_topic )
  39 +{
  40 + m_mqtt_client.subscribe( message_topic, 1, [this](const osdev::components::mqtt::MqttMessage &message )
  41 + {
  42 + this->receive_data( message.topic(), message.payload() );
  43 + });
  44 +}
  45 +
32 void Subscriber::receive_data( const std::string &message_topic, const std::string &message_payload ) 46 void Subscriber::receive_data( const std::string &message_topic, const std::string &message_payload )
33 { 47 {
34 std::cout << "[Subscriber::receive_data] - Received message : " << message_payload << " from topic : " << message_topic << std::endl; 48 std::cout << "[Subscriber::receive_data] - Received message : " << message_payload << " from topic : " << message_topic << std::endl;
examples/sub/subscriber.h
@@ -22,19 +22,29 @@ @@ -22,19 +22,29 @@
22 #pragma once 22 #pragma once
23 23
24 // std 24 // std
  25 +#include <memory>
25 #include <string> 26 #include <string>
26 27
27 // mqtt-cpp 28 // mqtt-cpp
28 -#include "mqttsubscriberbase.h" 29 +#include "mqttclient.h"
  30 +#include "compat-c++14.h"
29 31
30 -class Subscriber : public MqttSubscriberBase 32 +class Subscriber
31 { 33 {
32 public: 34 public:
33 - Subscriber( const std::string &client_id ); 35 + Subscriber(const std::string &client_id);
34 36
35 virtual ~Subscriber() {} 37 virtual ~Subscriber() {}
36 38
  39 + void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string(),
  40 + const std::string &lwt_topic = std::string(), const std::string &lwt_message = std::string() );
  41 +
  42 + void subscribe( const std::string &message_topic );
  43 +
37 protected: 44 protected:
38 - void receive_data( const std::string &message_topic, const std::string &message_payload ); 45 + void receive_data( const std::string &message_topic, const std::string &message_payload );
  46 +
  47 +private:
  48 + osdev::components::mqtt::MqttClient m_mqtt_client;
39 49
40 }; 50 };
src/clientpaho.cpp
@@ -425,12 +425,14 @@ std::int32_t ClientPaho::subscribe( const std::string&amp; topic, int qos, const std @@ -425,12 +425,14 @@ std::int32_t ClientPaho::subscribe( const std::string&amp; topic, int qos, const std
425 { 425 {
426 if( ConnectionStatus::Connected != m_connectionStatus ) 426 if( ConnectionStatus::Connected != m_connectionStatus )
427 { 427 {
428 - // MqttException, "Not connected" 428 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) );
429 } 429 }
430 430
431 if( !isValidTopic( topic ) ) 431 if( !isValidTopic( topic ) )
432 { 432 {
433 // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); 433 // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
  434 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) );
  435 + return -1;
434 } 436 }
435 437
436 if( qos > 2 ) 438 if( qos > 2 )
@@ -477,6 +479,7 @@ std::int32_t ClientPaho::subscribe( const std::string&amp; topic, int qos, const std @@ -477,6 +479,7 @@ std::int32_t ClientPaho::subscribe( const std::string&amp; topic, int qos, const std
477 if( isOverlappingInternal( topic, existingTopic ) ) 479 if( isOverlappingInternal( topic, existingTopic ) )
478 { 480 {
479 // (OverlappingTopicException, "overlapping topic", existingTopic, topic); 481 // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
  482 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) );
480 } 483 }
481 484
482 LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) ); 485 LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
src/mqttclient.cpp
@@ -284,7 +284,7 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi @@ -284,7 +284,7 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
284 } 284 }
285 if (!clientFound) 285 if (!clientFound)
286 { 286 {
287 - client->connect(true); 287 + client->connect( false );
288 } 288 }
289 return Token{ client->clientId(), client->subscribe(topic, qos, cb) }; 289 return Token{ client->clientId(), client->subscribe(topic, qos, cb) };
290 } 290 }
@@ -401,7 +401,8 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus @@ -401,7 +401,8 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
401 } 401 }
402 auto newState = determineState(connectionStates); 402 auto newState = determineState(connectionStates);
403 bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); 403 bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);
404 - if (resubscribe) { 404 + if (resubscribe)
  405 + {
405 { 406 {
406 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); 407 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
407 m_activeTokens.clear(); 408 m_activeTokens.clear();