From 11fe0b09d19faa4f76dea23f651ebf5e6b376555 Mon Sep 17 00:00:00 2001 From: Peter M. Groen Date: Thu, 7 Jul 2022 17:17:48 +0200 Subject: [PATCH] Rework for subscription async --- examples/sub/main.cpp | 4 +++- examples/sub/subscriber.cpp | 16 +++++++++++++++- examples/sub/subscriber.h | 18 ++++++++++++++---- src/clientpaho.cpp | 5 ++++- src/mqttclient.cpp | 5 +++-- 5 files changed, 39 insertions(+), 9 deletions(-) diff --git a/examples/sub/main.cpp b/examples/sub/main.cpp index c9f1850..6374d90 100644 --- a/examples/sub/main.cpp +++ b/examples/sub/main.cpp @@ -76,7 +76,8 @@ int main( int argc, char* argv[] ) { std::cout << "[OK]" << std::endl; std::cout << "Connecting to the test-broker : " << std::endl; - pSubscriber->connect( "localhost", 1883, "", "" ); + pSubscriber->connect( "localhost", 1883, "", "", "test/subscriber/LWT", "Subscriber disconnected." ); + std::cout << "Subscribing to the test-topic....." << std::endl; pSubscriber->subscribe( "test/publisher/#" ); @@ -84,6 +85,7 @@ int main( int argc, char* argv[] ) while( 1 ) { sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. + std::cout << "."; } } else diff --git a/examples/sub/subscriber.cpp b/examples/sub/subscriber.cpp index 8354dd6..ac6d812 100644 --- a/examples/sub/subscriber.cpp +++ b/examples/sub/subscriber.cpp @@ -24,11 +24,25 @@ #include Subscriber::Subscriber( const std::string &client_id ) - : MqttSubscriberBase( client_id ) + : m_mqtt_client( client_id ) { } +void Subscriber::connect( const std::string &hostname, int portnumber, const std::string &username, const std::string &password, const std::string &lwt_topic, const std::string &lwt_message ) +{ + m_mqtt_client.connect( hostname, portnumber, osdev::components::mqtt::Credentials( username, password ), osdev::components::mqtt::mqtt_LWT( lwt_topic, lwt_message ) ); + std::cout << "Client state : " << m_mqtt_client.state() << std::endl; +} + +void Subscriber::subscribe( const std::string &message_topic ) +{ + m_mqtt_client.subscribe( message_topic, 1, [this](const osdev::components::mqtt::MqttMessage &message ) + { + this->receive_data( message.topic(), message.payload() ); + }); +} + void Subscriber::receive_data( const std::string &message_topic, const std::string &message_payload ) { std::cout << "[Subscriber::receive_data] - Received message : " << message_payload << " from topic : " << message_topic << std::endl; diff --git a/examples/sub/subscriber.h b/examples/sub/subscriber.h index 8c67ef4..5f30bff 100644 --- a/examples/sub/subscriber.h +++ b/examples/sub/subscriber.h @@ -22,19 +22,29 @@ #pragma once // std +#include #include // mqtt-cpp -#include "mqttsubscriberbase.h" +#include "mqttclient.h" +#include "compat-c++14.h" -class Subscriber : public MqttSubscriberBase +class Subscriber { public: - Subscriber( const std::string &client_id ); + Subscriber(const std::string &client_id); virtual ~Subscriber() {} + void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string(), + const std::string &lwt_topic = std::string(), const std::string &lwt_message = std::string() ); + + void subscribe( const std::string &message_topic ); + protected: - void receive_data( const std::string &message_topic, const std::string &message_payload ); + void receive_data( const std::string &message_topic, const std::string &message_payload ); + +private: + osdev::components::mqtt::MqttClient m_mqtt_client; }; diff --git a/src/clientpaho.cpp b/src/clientpaho.cpp index 24e6239..5ec2436 100644 --- a/src/clientpaho.cpp +++ b/src/clientpaho.cpp @@ -425,12 +425,14 @@ std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std { if( ConnectionStatus::Connected != m_connectionStatus ) { - // MqttException, "Not connected" + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) ); } if( !isValidTopic( topic ) ) { // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) ); + return -1; } if( qos > 2 ) @@ -477,6 +479,7 @@ std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std if( isOverlappingInternal( topic, existingTopic ) ) { // (OverlappingTopicException, "overlapping topic", existingTopic, topic); + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) ); } LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) ); diff --git a/src/mqttclient.cpp b/src/mqttclient.cpp index ad478fb..dbf0d79 100644 --- a/src/mqttclient.cpp +++ b/src/mqttclient.cpp @@ -284,7 +284,7 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi } if (!clientFound) { - client->connect(true); + client->connect( false ); } return Token{ client->clientId(), client->subscribe(topic, qos, cb) }; } @@ -401,7 +401,8 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus } auto newState = determineState(connectionStates); bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); - if (resubscribe) { + if (resubscribe) + { { OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); m_activeTokens.clear(); -- libgit2 0.21.4