Commit 2569446f0a0768735feea538f102642d27c69424

Authored by Peter M. Groen
1 parent 8f48be6b

Added extra subscription checks to prevent double connections

CMakeLists.txt
... ... @@ -11,6 +11,7 @@ add_subdirectory(src)
11 11 add_subdirectory(examples/connect)
12 12 add_subdirectory(examples/pub)
13 13 add_subdirectory(examples/sub)
  14 +add_subdirectory(examples/subunsub)
14 15  
15 16 include(packaging)
16 17 package_component()
... ...
examples/sub/main.cpp
... ... @@ -27,6 +27,8 @@
27 27  
28 28 #include "subscriber.h"
29 29  
  30 +const int MAX_LOOP_COUNT = 10;
  31 +
30 32 enum TIME_RES
31 33 {
32 34 T_MICRO,
... ... @@ -65,6 +67,7 @@ void sleepcp( int number, TIME_RES resolution = T_MILLI ) // Cross-platform s
65 67  
66 68 int main( int argc, char* argv[] )
67 69 {
  70 + int loop_counter = 0;
68 71 // Satisfy the compiler
69 72 (void)argc;
70 73 (void)argv;
... ... @@ -79,14 +82,32 @@ int main( int argc, char* argv[] )
79 82 pSubscriber->connect( "localhost", 1883, "", "", "test/subscriber/LWT", "Subscriber disconnected." );
80 83  
81 84 std::cout << "Subscribing to the test-topic....." << std::endl;
82   - pSubscriber->subscribe( "test/publisher/#" );
  85 + pSubscriber->subscribe( "test/publisher/TestPublisher_0" );
  86 + pSubscriber->subscribe( "test/publisher/TestPublisher_1" );
  87 + pSubscriber->subscribe( "test/publisher/TestPublisher_2" );
  88 + pSubscriber->subscribe( "test/publisher/TestPublisher_3" );
83 89  
84 90 // Start a loop to give the subscriber the possibility to do its work.
85   - while( 1 )
  91 + while (loop_counter < MAX_LOOP_COUNT)
86 92 {
87 93 sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
88 94 std::cout << ".";
  95 + loop_counter++;
  96 + }
  97 +
  98 + std::cout << "Unsubscribing from test/publisher/#" << std::endl;
  99 +
  100 + pSubscriber->unsubscribe("test/publisher/#" );
  101 + pSubscriber->unsubscribe( "test/publisher/TestPublisher_0" );
  102 + pSubscriber->unsubscribe( "test/publisher/TestPublisher_1" );
  103 + pSubscriber->unsubscribe( "test/publisher/TestPublisher_2" );
  104 +
  105 + while (1)
  106 + {
  107 + sleepcp(1, T_MILLI); // Sleep 1 Sec to give the scheduler the change to interfene.
  108 + std::cout << ".";
89 109 }
  110 +
90 111 }
91 112 else
92 113 {
... ...
examples/sub/subscriber.cpp
... ... @@ -43,6 +43,11 @@ void Subscriber::subscribe( const std::string &amp;message_topic )
43 43 });
44 44 }
45 45  
  46 +void Subscriber::unsubscribe( const std::string &message_topic )
  47 +{
  48 + m_mqtt_client.unsubscribe( message_topic, 1 );
  49 +}
  50 +
46 51 void Subscriber::receive_data( const std::string &message_topic, const std::string &message_payload )
47 52 {
48 53 std::cout << "[Subscriber::receive_data] - Received message : " << message_payload << " from topic : " << message_topic << std::endl;
... ...
examples/sub/subscriber.h
... ... @@ -41,6 +41,8 @@ public:
41 41  
42 42 void subscribe( const std::string &message_topic );
43 43  
  44 + void unsubscribe( const std::string &message_topic );
  45 +
44 46 protected:
45 47 void receive_data( const std::string &message_topic, const std::string &message_payload );
46 48  
... ...
examples/subunsub/CMakeLists.txt 0 → 100644
  1 +cmake_minimum_required(VERSION 3.0)
  2 +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake)
  3 +
  4 +include(projectheader)
  5 +project_header(test_subunsub)
  6 +
  7 +include_directories( SYSTEM
  8 + ${CMAKE_CURRENT_SOURCE_DIR}/../../include
  9 +)
  10 +
  11 +include(compiler)
  12 +set(SRC_LIST
  13 + ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
  14 +)
  15 +
  16 +add_executable( ${PROJECT_NAME}
  17 + ${SRC_LIST}
  18 +)
  19 +
  20 +target_link_libraries(
  21 + ${PROJECT_NAME}
  22 + mqtt-cpp
  23 +)
  24 +
  25 +set_target_properties( ${PROJECT_NAME} PROPERTIES
  26 + RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
  27 + LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib
  28 + ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib
  29 +)
  30 +
  31 +include(installation)
  32 +install_application()
... ...
examples/subunsub/main.cpp 0 → 100644
  1 +#include <iostream>
  2 +#include <string>
  3 +#include <vector>
  4 +
  5 +#include "mqttclient.h"
  6 +
  7 +using namespace osdev::components::mqtt;
  8 +
  9 +std::vector<std::string> vecTopics =
  10 +{
  11 + "test/publisher/TestPublisher_0",
  12 + "test/publisher/TestPublisher_1",
  13 + "test/publisher/TestPublisher_2",
  14 + "test/publisher/TestPublisher_3",
  15 + "test/publisher/TestPublisher_4",
  16 + "test/publisher/TestPublisher_5",
  17 + "test/publisher/TestPublisher_6",
  18 + "test/publisher/TestPublisher_7",
  19 + "test/publisher/TestPublisher_8",
  20 + "test/publisher/TestPublisher_9",
  21 +};
  22 +
  23 +MqttClient oClient("SubscriptionTest");
  24 +
  25 +enum TIME_RES
  26 +{
  27 + T_MICRO,
  28 + T_MILLI,
  29 + T_SECONDS
  30 +};
  31 +
  32 +std::uint64_t getEpochUSecs()
  33 +{
  34 + auto tsUSec = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now());
  35 + return static_cast<std::uint64_t>(tsUSec.time_since_epoch().count());
  36 +}
  37 +
  38 +void sleepcp( int number, TIME_RES resolution = T_MILLI ) // Cross-platform sleep function
  39 +{
  40 + int factor = 0; // Should not happen..
  41 +
  42 + switch( resolution )
  43 + {
  44 + case T_MICRO:
  45 + factor = 1;
  46 + break;
  47 +
  48 + case T_MILLI:
  49 + factor = 1000;
  50 + break;
  51 +
  52 + case T_SECONDS:
  53 + factor = 1000000;
  54 + break;
  55 + }
  56 +
  57 + usleep( number * factor );
  58 +}
  59 +
  60 +void Subscribe()
  61 +{
  62 + for( const auto &message_topic : vecTopics)
  63 + {
  64 + std::cout << "Subscribing to : " << message_topic << std::endl;
  65 + oClient.subscribe(message_topic, 1, [](const osdev::components::mqtt::MqttMessage &message)
  66 + {
  67 + std::cout << "Received Topic : [" << message.topic() << "] Payload : " << message.payload() << std::endl;
  68 + });
  69 + }
  70 +}
  71 +
  72 +void Unsubscribe()
  73 +{
  74 + for( const auto &message_topic : vecTopics)
  75 + {
  76 + std::cout << "Unsubscribing from : " << message_topic << std::endl;
  77 + oClient.unsubscribe(message_topic, 1);
  78 + }
  79 +}
  80 +
  81 +int main(int argc, char* argv[])
  82 +{
  83 + (void)argc;
  84 + (void)argv;
  85 +
  86 + oClient.connect("localhost", 1883, Credentials());
  87 +
  88 + // First create all subscriptions
  89 + Subscribe();
  90 + sleepcp(1, T_SECONDS);
  91 +
  92 + while(1)
  93 + {
  94 + Unsubscribe();
  95 + sleepcp(1, T_SECONDS);
  96 + Subscribe();
  97 + sleepcp(10, T_SECONDS);
  98 + }
  99 +
  100 +}
... ...
src/CMakeLists.txt
... ... @@ -15,31 +15,31 @@ include_directories(
15 15 set(SRC_LIST
16 16 ${CMAKE_CURRENT_SOURCE_DIR}/log.cpp
17 17 ${CMAKE_CURRENT_SOURCE_DIR}/threadcontext.cpp
18   - ${CMAKE_CURRENT_SOURCE_DIR}/mqttpublisherbase.cpp
19   - ${CMAKE_CURRENT_SOURCE_DIR}/mqttsubscriberbase.cpp
20   - ${CMAKE_CURRENT_SOURCE_DIR}/clientpaho.cpp
21   - ${CMAKE_CURRENT_SOURCE_DIR}/commondefs.cpp
22   - ${CMAKE_CURRENT_SOURCE_DIR}/connectionstatus.cpp
23   - ${CMAKE_CURRENT_SOURCE_DIR}/credentials.cpp
24   - ${CMAKE_CURRENT_SOURCE_DIR}/errorcode.cpp
25   - ${CMAKE_CURRENT_SOURCE_DIR}/token.cpp
26   - ${CMAKE_CURRENT_SOURCE_DIR}/ihistogram.cpp
27   - ${CMAKE_CURRENT_SOURCE_DIR}/timemeasurement.cpp
28   - ${CMAKE_CURRENT_SOURCE_DIR}/mqttidgenerator.cpp
29   - ${CMAKE_CURRENT_SOURCE_DIR}/mqtttypeconverter.cpp
30   - ${CMAKE_CURRENT_SOURCE_DIR}/mqttutil.cpp
31   - ${CMAKE_CURRENT_SOURCE_DIR}/mqttmessage.cpp
32   - ${CMAKE_CURRENT_SOURCE_DIR}/mqttclient.cpp
33   - ${CMAKE_CURRENT_SOURCE_DIR}/mqttfailure.cpp
34   - ${CMAKE_CURRENT_SOURCE_DIR}/mqttsuccess.cpp
35   - ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
36   - ${CMAKE_CURRENT_SOURCE_DIR}/imqttclientimpl.cpp
37   - ${CMAKE_CURRENT_SOURCE_DIR}/istatecallback.cpp
38   - ${CMAKE_CURRENT_SOURCE_DIR}/scopeguard.cpp
39   - ${CMAKE_CURRENT_SOURCE_DIR}/serverstate.cpp
40   - ${CMAKE_CURRENT_SOURCE_DIR}/sharedreaderlock.cpp
41   - ${CMAKE_CURRENT_SOURCE_DIR}/stringutils.cpp
42   - ${CMAKE_CURRENT_SOURCE_DIR}/uriparser.cpp
  18 + ${CMAKE_CURRENT_SOURCE_DIR}/mqttpublisherbase.cpp
  19 + ${CMAKE_CURRENT_SOURCE_DIR}/mqttsubscriberbase.cpp
  20 + ${CMAKE_CURRENT_SOURCE_DIR}/clientpaho.cpp
  21 + ${CMAKE_CURRENT_SOURCE_DIR}/commondefs.cpp
  22 + ${CMAKE_CURRENT_SOURCE_DIR}/connectionstatus.cpp
  23 + ${CMAKE_CURRENT_SOURCE_DIR}/credentials.cpp
  24 + ${CMAKE_CURRENT_SOURCE_DIR}/errorcode.cpp
  25 + ${CMAKE_CURRENT_SOURCE_DIR}/token.cpp
  26 + ${CMAKE_CURRENT_SOURCE_DIR}/ihistogram.cpp
  27 + ${CMAKE_CURRENT_SOURCE_DIR}/timemeasurement.cpp
  28 + ${CMAKE_CURRENT_SOURCE_DIR}/mqttidgenerator.cpp
  29 + ${CMAKE_CURRENT_SOURCE_DIR}/mqtttypeconverter.cpp
  30 + ${CMAKE_CURRENT_SOURCE_DIR}/mqttutil.cpp
  31 + ${CMAKE_CURRENT_SOURCE_DIR}/mqttmessage.cpp
  32 + ${CMAKE_CURRENT_SOURCE_DIR}/mqttclient.cpp
  33 + ${CMAKE_CURRENT_SOURCE_DIR}/mqttfailure.cpp
  34 + ${CMAKE_CURRENT_SOURCE_DIR}/mqttsuccess.cpp
  35 + ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
  36 + ${CMAKE_CURRENT_SOURCE_DIR}/imqttclientimpl.cpp
  37 + ${CMAKE_CURRENT_SOURCE_DIR}/istatecallback.cpp
  38 + ${CMAKE_CURRENT_SOURCE_DIR}/scopeguard.cpp
  39 + ${CMAKE_CURRENT_SOURCE_DIR}/serverstate.cpp
  40 + ${CMAKE_CURRENT_SOURCE_DIR}/sharedreaderlock.cpp
  41 + ${CMAKE_CURRENT_SOURCE_DIR}/stringutils.cpp
  42 + ${CMAKE_CURRENT_SOURCE_DIR}/uriparser.cpp
43 43 )
44 44  
45 45 include(library)
... ...
src/clientpaho.cpp
... ... @@ -515,6 +515,19 @@ std::int32_t ClientPaho::unsubscribe( const std::string&amp; topic, int qos )
515 515 break;
516 516 }
517 517 }
  518 +
  519 + if(!found) // Probably not found in subscriptions, also check the pendings.
  520 + {
  521 + for( const auto &s : m_pendingSubscriptions )
  522 + {
  523 + if( topic == s.first && qos == s.second.qos )
  524 + {
  525 + found = true;
  526 + break;
  527 + }
  528 + }
  529 + }
  530 +
518 531 if( !found )
519 532 {
520 533 return -1;
... ...
src/mqttclient.cpp
... ... @@ -313,6 +313,7 @@ std::set&lt;Token&gt; MqttClient::unsubscribe(const std::string&amp; topic, int qos)
313 313 // Throw (MqttException, "Not connected");
314 314 return std::set<Token>();
315 315 }
  316 +
316 317 clients.push_back(m_principalClient.get());
317 318 for (const auto& c : m_additionalClients)
318 319 {
... ...