diff --git a/CMakeLists.txt b/CMakeLists.txt index f285463..34291a3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,6 +11,7 @@ add_subdirectory(src) add_subdirectory(examples/connect) add_subdirectory(examples/pub) add_subdirectory(examples/sub) +add_subdirectory(examples/subunsub) include(packaging) package_component() diff --git a/examples/sub/main.cpp b/examples/sub/main.cpp index 6374d90..add7aff 100644 --- a/examples/sub/main.cpp +++ b/examples/sub/main.cpp @@ -27,6 +27,8 @@ #include "subscriber.h" +const int MAX_LOOP_COUNT = 10; + enum TIME_RES { T_MICRO, @@ -65,6 +67,7 @@ void sleepcp( int number, TIME_RES resolution = T_MILLI ) // Cross-platform s int main( int argc, char* argv[] ) { + int loop_counter = 0; // Satisfy the compiler (void)argc; (void)argv; @@ -79,14 +82,32 @@ int main( int argc, char* argv[] ) pSubscriber->connect( "localhost", 1883, "", "", "test/subscriber/LWT", "Subscriber disconnected." ); std::cout << "Subscribing to the test-topic....." << std::endl; - pSubscriber->subscribe( "test/publisher/#" ); + pSubscriber->subscribe( "test/publisher/TestPublisher_0" ); + pSubscriber->subscribe( "test/publisher/TestPublisher_1" ); + pSubscriber->subscribe( "test/publisher/TestPublisher_2" ); + pSubscriber->subscribe( "test/publisher/TestPublisher_3" ); // Start a loop to give the subscriber the possibility to do its work. - while( 1 ) + while (loop_counter < MAX_LOOP_COUNT) { sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. std::cout << "."; + loop_counter++; + } + + std::cout << "Unsubscribing from test/publisher/#" << std::endl; + + pSubscriber->unsubscribe("test/publisher/#" ); + pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" ); + pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" ); + pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" ); + + while (1) + { + sleepcp(1, T_MILLI); // Sleep 1 Sec to give the scheduler the change to interfene. + std::cout << "."; } + } else { diff --git a/examples/sub/subscriber.cpp b/examples/sub/subscriber.cpp index ac6d812..9917482 100644 --- a/examples/sub/subscriber.cpp +++ b/examples/sub/subscriber.cpp @@ -43,6 +43,11 @@ void Subscriber::subscribe( const std::string &message_topic ) }); } +void Subscriber::unsubscribe( const std::string &message_topic ) +{ + m_mqtt_client.unsubscribe( message_topic, 1 ); +} + void Subscriber::receive_data( const std::string &message_topic, const std::string &message_payload ) { std::cout << "[Subscriber::receive_data] - Received message : " << message_payload << " from topic : " << message_topic << std::endl; diff --git a/examples/sub/subscriber.h b/examples/sub/subscriber.h index 5f30bff..9b45d40 100644 --- a/examples/sub/subscriber.h +++ b/examples/sub/subscriber.h @@ -41,6 +41,8 @@ public: void subscribe( const std::string &message_topic ); + void unsubscribe( const std::string &message_topic ); + protected: void receive_data( const std::string &message_topic, const std::string &message_payload ); diff --git a/examples/subunsub/CMakeLists.txt b/examples/subunsub/CMakeLists.txt new file mode 100644 index 0000000..b2feb22 --- /dev/null +++ b/examples/subunsub/CMakeLists.txt @@ -0,0 +1,32 @@ +cmake_minimum_required(VERSION 3.0) +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake) + +include(projectheader) +project_header(test_subunsub) + +include_directories( SYSTEM + ${CMAKE_CURRENT_SOURCE_DIR}/../../include +) + +include(compiler) +set(SRC_LIST + ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp +) + +add_executable( ${PROJECT_NAME} + ${SRC_LIST} +) + +target_link_libraries( + ${PROJECT_NAME} + mqtt-cpp +) + +set_target_properties( ${PROJECT_NAME} PROPERTIES + RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin + LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib + ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib +) + +include(installation) +install_application() diff --git a/examples/subunsub/main.cpp b/examples/subunsub/main.cpp new file mode 100644 index 0000000..2e40915 --- /dev/null +++ b/examples/subunsub/main.cpp @@ -0,0 +1,100 @@ +#include +#include +#include + +#include "mqttclient.h" + +using namespace osdev::components::mqtt; + +std::vector vecTopics = +{ + "test/publisher/TestPublisher_0", + "test/publisher/TestPublisher_1", + "test/publisher/TestPublisher_2", + "test/publisher/TestPublisher_3", + "test/publisher/TestPublisher_4", + "test/publisher/TestPublisher_5", + "test/publisher/TestPublisher_6", + "test/publisher/TestPublisher_7", + "test/publisher/TestPublisher_8", + "test/publisher/TestPublisher_9", +}; + +MqttClient oClient("SubscriptionTest"); + +enum TIME_RES +{ + T_MICRO, + T_MILLI, + T_SECONDS +}; + +std::uint64_t getEpochUSecs() +{ + auto tsUSec = std::chrono::time_point_cast(std::chrono::system_clock::now()); + return static_cast(tsUSec.time_since_epoch().count()); +} + +void sleepcp( int number, TIME_RES resolution = T_MILLI ) // Cross-platform sleep function +{ + int factor = 0; // Should not happen.. + + switch( resolution ) + { + case T_MICRO: + factor = 1; + break; + + case T_MILLI: + factor = 1000; + break; + + case T_SECONDS: + factor = 1000000; + break; + } + + usleep( number * factor ); +} + +void Subscribe() +{ + for( const auto &message_topic : vecTopics) + { + std::cout << "Subscribing to : " << message_topic << std::endl; + oClient.subscribe(message_topic, 1, [](const osdev::components::mqtt::MqttMessage &message) + { + std::cout << "Received Topic : [" << message.topic() << "] Payload : " << message.payload() << std::endl; + }); + } +} + +void Unsubscribe() +{ + for( const auto &message_topic : vecTopics) + { + std::cout << "Unsubscribing from : " << message_topic << std::endl; + oClient.unsubscribe(message_topic, 1); + } +} + +int main(int argc, char* argv[]) +{ + (void)argc; + (void)argv; + + oClient.connect("localhost", 1883, Credentials()); + + // First create all subscriptions + Subscribe(); + sleepcp(1, T_SECONDS); + + while(1) + { + Unsubscribe(); + sleepcp(1, T_SECONDS); + Subscribe(); + sleepcp(10, T_SECONDS); + } + +} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 51a66ae..8235e56 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -15,31 +15,31 @@ include_directories( set(SRC_LIST ${CMAKE_CURRENT_SOURCE_DIR}/log.cpp ${CMAKE_CURRENT_SOURCE_DIR}/threadcontext.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/mqttpublisherbase.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/mqttsubscriberbase.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/clientpaho.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/commondefs.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/connectionstatus.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/credentials.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/errorcode.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/token.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/ihistogram.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/timemeasurement.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/mqttidgenerator.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/mqtttypeconverter.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/mqttutil.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/mqttmessage.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/mqttclient.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/mqttfailure.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/mqttsuccess.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/imqttclientimpl.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/istatecallback.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/scopeguard.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/serverstate.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/sharedreaderlock.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/stringutils.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/uriparser.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mqttpublisherbase.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mqttsubscriberbase.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/clientpaho.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/commondefs.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/connectionstatus.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/credentials.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/errorcode.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/token.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/ihistogram.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/timemeasurement.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mqttidgenerator.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mqtttypeconverter.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mqttutil.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mqttmessage.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mqttclient.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mqttfailure.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mqttsuccess.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/imqttclientimpl.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/istatecallback.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/scopeguard.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/serverstate.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/sharedreaderlock.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/stringutils.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/uriparser.cpp ) include(library) diff --git a/src/clientpaho.cpp b/src/clientpaho.cpp index 9b492c3..3228ab5 100644 --- a/src/clientpaho.cpp +++ b/src/clientpaho.cpp @@ -515,6 +515,19 @@ std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) break; } } + + if(!found) // Probably not found in subscriptions, also check the pendings. + { + for( const auto &s : m_pendingSubscriptions ) + { + if( topic == s.first && qos == s.second.qos ) + { + found = true; + break; + } + } + } + if( !found ) { return -1; diff --git a/src/mqttclient.cpp b/src/mqttclient.cpp index 5618b4b..a4d2564 100644 --- a/src/mqttclient.cpp +++ b/src/mqttclient.cpp @@ -313,6 +313,7 @@ std::set MqttClient::unsubscribe(const std::string& topic, int qos) // Throw (MqttException, "Not connected"); return std::set(); } + clients.push_back(m_principalClient.get()); for (const auto& c : m_additionalClients) {