Commit a19279232f12b3d0c946d46510c823b9eddb1a41

Authored by Peter M. Groen
2 parents d4d38789 1cfa5165

Merge branch 'fix/pgroen/error-handling-improvement' into 'master'

Fix/pgroen/error handling improvement

Added logging and sensible return points if a problem occurs.
After merge :
* it will be tagged as version 0.9.4
* it will be integrated into the Main IPZC SDK ( Aarch64 ) @ Priva

See merge request !7
CMakeLists.txt
1 1 cmake_minimum_required(VERSION 3.0)
2 2 project(osdev_mqtt)
3   -# ==============================================================================
4   -# Check to see if we're a submodule or top-repo.
5   -if(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/cmake)
6   - message( STATUS "Looks like we're a single module" )
7   - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake)
8   -elseif(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../cmake)
9   - message( STATUS "Looks like we're a submodule" )
10   - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../cmake)
11   -else()
12   - message( FATAL_ERROR "No cmake directory found. Did you run the submodules script?" )
13   -endif()
14   -
15   -# ==============================================================================
16   -# Check to see if there is versioning information available
17   -if(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/versioning)
18   - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/versioning/cmake)
19   -elseif(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../versioning)
20   - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../versioning/cmake)
21   -else()
22   - message( FATAL_ERROR "No ${CURRENT_SOURCE_DIR}/osdev_versioning directory found. Did you run the submodules script?" )
23   -endif()
  3 +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake)
24 4  
25 5 # ==============================================================================
26 6 # = Include build information
27   -include(osdevversion)
28 7 include(projectheader)
29   -
30 8 project_header(osdev_mqtt)
31 9  
  10 +add_subdirectory(submodules/logger)
32 11 add_subdirectory(src)
33 12 add_subdirectory(examples/pub)
34 13 add_subdirectory(examples/sub)
... ...
examples/pub/CMakeLists.txt
1 1 cmake_minimum_required(VERSION 3.0)
2   -LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../../cmake)
  2 +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake)
3 3  
4 4 include(projectheader)
5 5 project_header(test_mqtt_pub)
... ... @@ -27,7 +27,7 @@ target_link_libraries(
27 27 set_target_properties( ${PROJECT_NAME} PROPERTIES
28 28 RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
29 29 LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib
30   - ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/archive
  30 + ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib
31 31 )
32 32  
33 33 include(installation)
... ...
examples/sub/CMakeLists.txt
1 1 cmake_minimum_required(VERSION 3.0)
2   -LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../../cmake)
  2 +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake)
3 3  
4 4 include(projectheader)
5 5 project_header(test_mqtt_sub)
... ...
scripts/setup_submodules
... ... @@ -3,7 +3,9 @@
3 3 # ===============================================
4 4 # == Setting some environment variables
5 5 # ===============================================
6   -GIT_URL_SUBS="http://gitlab.osdev.nl/open_source"
  6 +GIT_URL_OPEN="http://gitlab.osdev.nl/open_source"
  7 +GIT_URL_CLOSED="git@gitlab.osdev.nl:closed_source"
  8 +
7 9 FUNC_RESULT="-1"
8 10  
9 11 # Name : print_usage_exit()
... ... @@ -98,12 +100,19 @@ function read_submodules()
98 100 function add_submodules()
99 101 {
100 102 echo -e "Adding SubModule(s)."
101   - for SUB_MODULE in ${SUB_MODULES}
  103 + for SUB_MODULE in ${SUB_MODULES_OPEN}
102 104 do
103   - echo -e "< ${SUB_MODULE} >"
104   - git submodule add -f ${GIT_URL_SUBS}/${SUB_MODULE}.git ${SUB_MODULE}
105   - git config submodule.${SUB_MODULE}.url ${GIT_URL_SUBS}/${SUB_MODULE}.git
  105 + git submodule add -f ${GIT_URL_OPEN}/${SUB_MODULE}.git submodules/${SUB_MODULE}
  106 + git config submodule.${SUB_MODULE}.url ${GIT_URL_OPEN}/${SUB_MODULE}.git
  107 + done
  108 +
  109 + for SUB_MODULE in ${SUB_MODULES_CLOSED}
  110 + do
  111 + echo {GIT_URL_CLOSED}/${SUB_MODULE}.git
  112 + git submodule add -f ${GIT_URL_CLOSED}/${SUB_MODULE}.git submodules/${SUB_MODULE}
  113 + git config submodule.${SUB_MODULE}.url ${GIT_URL_CLOSED}/${SUB_MODULE}.git
106 114 done
  115 +
107 116 }
108 117  
109 118 # Name : get_submodules
... ...
scripts/submodules.list
1   -SUB_MODULES="versioning
2   -cmake"
  1 +SUB_MODULES_OPEN="cmake
  2 +logger"
  3 +
  4 +SUB_MODULES_CLOSED=""
... ...
src/CMakeLists.txt
1 1 cmake_minimum_required(VERSION 3.12)
2   -# ==============================================================================
3   -# Check to see if we're a submodule or top-repo.
4   -if(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../cmake)
5   - message( STATUS "Looks like we're a single module" )
6   - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../cmake)
7   -elseif(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../cmake)
8   - message( STATUS "Looks like we're a submodule" )
9   - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../../cmake)
10   -else()
11   - message( FATAL_ERROR "No cmake directory found. Did you run the submodules script?" )
12   -endif()
  2 +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake)
13 3  
14   -# ==============================================================================
15   -# Check to see if there is versioning information available
16   -if(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../versioning)
17   - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../versioning/cmake)
18   -elseif(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../versioning)
19   - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../../versioning/cmake)
20   -else()
21   - message( FATAL_ERROR "No ${CURRENT_SOURCE_DIR}/osdev_versioning directory found. Did you run the submodules script?" )
22   -endif()
23   -# ==============================================================================
24 4 include(projectheader)
25   -
26 5 project_header(mqtt-cpp)
27 6  
28 7 find_package( Boost REQUIRED COMPONENTS regex )
... ... @@ -31,6 +10,7 @@ include(compiler)
31 10  
32 11 include_directories(
33 12 ${CMAKE_SOURCE_DIR}/include
  13 + ${CMAKE_SOURCE_DIR}/submodules/logger/src
34 14 )
35 15  
36 16 set(SRC_LIST
... ... @@ -67,6 +47,7 @@ add_libraries(
67 47 Boost::boost
68 48 Boost::regex
69 49 paho-mqtt3a
  50 + ${CMAKE_SOURCE_DIR}/build/lib/liblogger.a
70 51 )
71 52  
72 53 include(installation)
... ...
src/mqttclient.cpp
... ... @@ -21,21 +21,24 @@
21 21 * ***************************************************************************/
22 22 #include "mqttclient.h"
23 23  
  24 +// osdev::components::logger
  25 +#include "log.h"
  26 +
24 27 // osdev::components::mqtt
25 28 #include "clientpaho.h"
26 29 #include "mqttutil.h"
27 30 #include "mqttidgenerator.h"
28 31 #include "mqtttypeconverter.h"
29   -
30   -// mlogic::common
31 32 #include "lockguard.h"
32 33 #include "uriparser.h"
33 34  
34 35 // std
35 36 #include <numeric>
36 37 #include <iostream>
  38 +#include <string>
37 39  
38 40 using namespace osdev::components::mqtt;
  41 +using namespace osdev::components::log;
39 42  
40 43 namespace {
41 44 /**
... ... @@ -63,28 +66,30 @@ MqttClient::MqttClient(const std::string&amp; _clientId, const std::function&lt;void(co
63 66 , m_principalClient()
64 67 , m_additionalClients()
65 68 , m_eventQueue(_clientId)
66   - , m_workerThread(std::thread(&MqttClient::eventHandler, this))
  69 + , m_workerThread( std::thread( &MqttClient::eventHandler, this ) )
67 70 {
  71 + Log::init( "mqtt-library" );
  72 + LogInfo( "MQTT Client started", "[MqttClient::MqttClient]");
68 73 }
69 74  
70 75 MqttClient::~MqttClient()
71 76 {
72   - // DebugLogToFIle ("MqttClient", "%1 - dtor", m_clientId);
73 77 {
74   - // DebugLogToFIle ("MqttClient", "%1 - disconnect", m_clientId);
  78 + // LogDebug( "MqttClient", std::string( m_clientId + " - disconnect" ) );
75 79 this->disconnect();
76 80 decltype(m_principalClient) principalClient{};
77 81  
78 82 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
79   - // DebugLogToFIle ("MqttClient", "%1 - cleanup principal client", m_clientId);
  83 + LogDebug( "MqttClient", std::string( m_clientId + " - cleanup principal client" ) );
80 84 m_principalClient.swap(principalClient);
81 85 }
82   - // DebugLogToFIle ("MqttClient", "%1 - dtor stop queue", m_clientId);
  86 +
  87 + LogDebug( "MqttClient", std::string( m_clientId + " - dtor stop queue" ) );
83 88 m_eventQueue.stop();
84 89 if (m_workerThread.joinable()) {
85 90 m_workerThread.join();
86 91 }
87   - // DebugLogToFIle ("MqttClient", "%1 - dtor ready", m_clientId);
  92 + LogDebug( "MqttClient", std::string( m_clientId + " - dtor ready" ) );
88 93 }
89 94  
90 95 std::string MqttClient::clientId() const
... ... @@ -135,7 +140,8 @@ void MqttClient::connect(const std::string&amp; host, int port, const Credentials&amp; c
135 140  
136 141 void MqttClient::connect(const std::string& _endpoint)
137 142 {
138   - // InfoLogToFIle ("MqttClient", "%1 - Request connect", m_clientId);
  143 + LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) );
  144 +
139 145 OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex);
140 146 IMqttClientImpl* client(nullptr);
141 147 {
... ... @@ -148,8 +154,8 @@ void MqttClient::connect(const std::string&amp; _endpoint)
148 154 }
149 155 else
150 156 {
151   - // ErrorLogToFIle ("MqttClient", "%1 - Cannot connect to different endpoint. Disconnect first.", m_clientId);
152   - // Normally a throw (Yuck!) (MqttException, "Cannot connect while already connected");
  157 + LogError( "MqttClient", std::string( m_clientId + " - Cannot connect to different endpoint. Disconnect first." ) );
  158 + return;
153 159 }
154 160 }
155 161 m_endpoint = _endpoint;
... ... @@ -168,26 +174,31 @@ void MqttClient::connect(const std::string&amp; _endpoint)
168 174  
169 175 void MqttClient::disconnect()
170 176 {
171   - // InfoLogToFIle ("MqttClient", "%1 - Request disconnect", m_clientId);
  177 + LogInfo( "MqttClient", std::string( m_clientId + " - Request disconnect" ) );
172 178 OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex);
173 179  
174 180 decltype(m_additionalClients) additionalClients{};
175 181 std::vector<IMqttClientImpl*> clients{};
176 182 {
177 183 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
178   - if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected || m_principalClient->connectionStatus() == ConnectionStatus::DisconnectInProgress) {
  184 + if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected || m_principalClient->connectionStatus() == ConnectionStatus::DisconnectInProgress)
  185 + {
  186 + LogDebug( "MqttClient", std::string( m_clientId + " - Principal client not connected" ) );
179 187 return;
180 188 }
181   - m_additionalClients.swap(additionalClients);
  189 + m_additionalClients.swap( additionalClients );
182 190  
183   - for (const auto& c : additionalClients) {
184   - clients.push_back(c.get());
  191 + for (const auto& c : additionalClients)
  192 + {
  193 + clients.push_back( c.get() );
185 194 }
186   - clients.push_back(m_principalClient.get());
  195 + clients.push_back( m_principalClient.get() );
187 196 }
188 197  
189   - // DebugLogToFIle ("MqttClient", "%1 - Unsubscribe and disconnect clients", m_clientId);
190   - for (auto& cl : clients) {
  198 +
  199 + LogDebug( "MqttClient", std::string( m_clientId + " - Unsubscribe and disconnect clients" ) );
  200 + for ( auto& cl : clients )
  201 + {
191 202 cl->unsubscribeAll();
192 203 }
193 204 this->waitForCompletionInternal(clients, std::chrono::milliseconds(2000), std::set<Token>{});
... ... @@ -211,15 +222,16 @@ Token MqttClient::publish(const MqttMessage&amp; message, int qos)
211 222 {
212 223 if( !m_principalClient )
213 224 {
214   - std::cout << "Principal client not initialized" << std::endl;
  225 + LogInfo( "[MqttClient::publish]", std::string( "Principal client not initialized") );
215 226 }
216 227  
217 228 if( m_principalClient->connectionStatus() == ConnectionStatus::Disconnected )
218 229 {
219 230 std::cout << "Unable to publish, not connected.." << std::endl;
220 231 }
221   - // ErrorLogToFIle ("MqttClient", "%1 - Unable to publish, not connected", m_clientId);
222   - // Throw (MqttException, "Not connected");
  232 + LogError("MqttClient", std::string( m_clientId + " - Unable to publish, not connected" ) );
  233 +
  234 + return Token(m_clientId, -1);
223 235 }
224 236 client = m_principalClient.get();
225 237 }
... ... @@ -228,7 +240,7 @@ Token MqttClient::publish(const MqttMessage&amp; message, int qos)
228 240  
229 241 Token MqttClient::subscribe(const std::string& topic, int qos, const std::function<void(MqttMessage)>& cb)
230 242 {
231   - // DebugLogToFIle ("MqttClient", "%1 - Subscribe to topic %2 with qos %3", m_clientId, topic, qos);
  243 + LogDebug( "[MqttClient::subscribe]", std::string( m_clientId + " - Subscribe to topic " + topic ) );
232 244 // OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex);
233 245 bool clientFound = false;
234 246 IMqttClientImpl* client(nullptr);
... ... @@ -236,8 +248,9 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
236 248 // OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
237 249 if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected)
238 250 {
239   - // ErrorLogToFIle ("MqttClient", "%1 - Unable to subscribe, not connected", m_clientId);
  251 + LogError("MqttClient", std::string( m_clientId + " - Unable to subscribe, not connected" ) );
240 252 // throw (?)(MqttException, "Not connected");
  253 + return Token(m_clientId, -1);
241 254 }
242 255 if (!m_principalClient->isOverlapping(topic)) {
243 256 client = m_principalClient.get();
... ... @@ -253,7 +266,7 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
253 266 }
254 267 }
255 268 if (!clientFound) {
256   - // DebugLogToFIle ("MqttClient", "%1 - Creating new ClientPaho instance for subscription on topic %2", m_clientId, topic);
  269 + LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) );
257 270 std::string derivedClientId(generateUniqueClientId(m_clientId, m_additionalClients.size() + 2)); // principal client is nr 1.
258 271 m_additionalClients.emplace_back(std::make_unique<ClientPaho>(
259 272 m_endpoint,
... ... @@ -271,14 +284,15 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
271 284  
272 285 std::set<Token> MqttClient::unsubscribe(const std::string& topic, int qos)
273 286 {
274   - // DebugLogToFIle ("MqttClient", "%1 - Unsubscribe from topic %2 with qos %3", m_clientId, topic, qos);
  287 + LogDebug("[MqttClient::unsubscribe]", std::string( m_clientId + " - Unsubscribe from topic " + topic ) );
275 288 OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex);
276 289 std::vector<IMqttClientImpl*> clients{};
277 290 {
278 291 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
279 292 if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) {
280   - // ErrorLogToFIle ("MqttClient", "%1 - Unable to unsubscribe, not connected", m_clientId);
  293 + LogError("[MqttClient::unsubscribe]", std::string( m_clientId + " - Unable to unsubscribe, not connected" ) );
281 294 // Throw (MqttException, "Not connected");
  295 + return std::set<Token>();
282 296 }
283 297 clients.push_back(m_principalClient.get());
284 298 for (const auto& c : m_additionalClients) {
... ... @@ -359,9 +373,7 @@ std::string MqttClient::endpoint() const
359 373  
360 374 void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus cs)
361 375 {
362   - (void)id;
363   - (void)cs;
364   - // DebugLogToFIle ("MqttClient", "%1 - connection status of wrapped client %2 changed to %3", m_clientId, id, cs);
  376 + LogDebug("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - connection status of wrapped client " + id + " changed to " + std::to_string( static_cast<int>(cs) ) ) );
365 377 IMqttClientImpl* principalClient{ nullptr };
366 378 std::vector<IMqttClientImpl*> clients{};
367 379 std::vector<ConnectionStatus> connectionStates{};
... ... @@ -392,7 +404,7 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
392 404 cl->resubscribe();
393 405 }
394 406 catch (const std::exception& e) {
395   - // ErrorLogToFIle ("MqttClient", "%1 - resubscribe on wrapped client %2 in context of connection status change in wrapped client %3 failed : %4", m_clientId, cl->clientId(), id, e.what());
  407 + LogError("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - resubscribe on wrapped client " + cl->clientId() + " in context of connection status change in wrapped client : " + id + " => FAILED : " + e.what() ) );
396 408 }
397 409 }
398 410 m_activeTokensCV.notify_all();
... ... @@ -407,7 +419,7 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
407 419 auto waitFor = std::chrono::milliseconds(1000);
408 420 if (!waitForCompletionInternalClients(clients, waitFor, std::set<Token>{})) {
409 421 if (std::accumulate(clients.begin(), clients.end(), false, [](bool hasPending, IMqttClientImpl* client) { return hasPending || client->hasPendingSubscriptions(); })) {
410   - // WarnLogToFIle ("MqttClient", "%1 - subscriptions are not recovered within timeout.", m_clientId)
  422 + LogWarning("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - subscriptions are not recovered within timeout." ) );
411 423 }
412 424 }
413 425 if (principalClient) {
... ... @@ -415,7 +427,7 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
415 427 principalClient->publishPending();
416 428 }
417 429 catch (const std::exception& e) {
418   - // ErrorLogToFIle ("MqttClient", "%1 - publishPending on wrapped client %2 failed : %3", m_clientId, principalClient->clientId(), e.what());
  430 + LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) );
419 431 }
420 432 }
421 433 }
... ... @@ -434,7 +446,7 @@ void MqttClient::deliveryComplete(const std::string&amp; _clientId, std::int32_t tok
434 446 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
435 447 if (!m_activeTokens.insert(t).second) {
436 448 // This should not happen. This means that some callback on the wrapper never came.
437   - // ErrorLogToFIle ("MqttClient", "%1 -deliveryComplete, token %1 is already active", m_clientId, t);
  449 + LogDebug("[MqttClient::deliveryComplete]", std::string( m_clientId + " - deliveryComplete, token is already active" ) );
438 450 }
439 451 }
440 452 this->pushEvent([this, t]() {
... ... @@ -531,8 +543,9 @@ void MqttClient::pushEvent(std::function&lt;void()&gt; ev)
531 543  
532 544 void MqttClient::eventHandler()
533 545 {
534   - // InfoLogToFIle ("MqttClient", "%1 - starting event handler", m_clientId);
535   - for (;;) {
  546 + LogInfo("[MqttClient::eventHandler]", std::string( m_clientId + " - starting event handler." ) );
  547 + for (;;)
  548 + {
536 549 std::vector<std::function<void()>> events;
537 550 if (!m_eventQueue.pop(events))
538 551 {
... ... @@ -543,5 +556,5 @@ void MqttClient::eventHandler()
543 556 ev();
544 557 }
545 558 }
546   - // InfoLogToFIle ("MqttClient", "%1 - leaving event handler", m_clientId);
  559 + LogInfo("[MqttClient::eventHandler]", std::string( m_clientId + " - leaving event handler." ) );
547 560 }
... ...