Merged
Merge Request #7
·
created by
Fix/pgroen/error handling improvement
Added logging and sensible return points if a problem occurs. After merge :
- it will be tagged as version 0.9.4
- it will be integrated into the Main IPZC SDK ( Aarch64 ) @ Priva
-
the 'test' directory does not seem to hold any substance. I'd say it can be removed for now.
-
is there any reason for using the for(;;) construction over while(true) besides compiler satisfaction?
-
It is in essence the same as a while( true ) -loop. As a for-clause has three elements :
for (initialization; condition; update) { // body of-loop }
By using it this way, it will just create an infinite loop There is not much difference as a while loop. ( compiled, they produce the exact same hex-pattern
-
can be merged after comments are resolved.
-
mentioned in commit a19279232f12b3d0c946d46510c823b9eddb1a41
-
Status changed to merged
531 | 543 | |
532 | 544 | void MqttClient::eventHandler() |
533 | 545 | { |
534 | - // InfoLogToFIle ("MqttClient", "%1 - starting event handler", m_clientId); | |
535 | - for (;;) { | |
546 | + LogInfo("[MqttClient::eventHandler]", std::string( m_clientId + " - starting event handler." ) ); | |
547 | + for (;;) | |
2 |
|
1 | cmake_minimum_required(VERSION 3.0) | 1 | cmake_minimum_required(VERSION 3.0) |
2 | project(osdev_mqtt) | 2 | project(osdev_mqtt) |
3 | -# ============================================================================== | ||
4 | -# Check to see if we're a submodule or top-repo. | ||
5 | -if(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/cmake) | ||
6 | - message( STATUS "Looks like we're a single module" ) | ||
7 | - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake) | ||
8 | -elseif(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../cmake) | ||
9 | - message( STATUS "Looks like we're a submodule" ) | ||
10 | - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../cmake) | ||
11 | -else() | ||
12 | - message( FATAL_ERROR "No cmake directory found. Did you run the submodules script?" ) | ||
13 | -endif() | ||
14 | - | ||
15 | -# ============================================================================== | ||
16 | -# Check to see if there is versioning information available | ||
17 | -if(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/versioning) | ||
18 | - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/versioning/cmake) | ||
19 | -elseif(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../versioning) | ||
20 | - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../versioning/cmake) | ||
21 | -else() | ||
22 | - message( FATAL_ERROR "No ${CURRENT_SOURCE_DIR}/osdev_versioning directory found. Did you run the submodules script?" ) | ||
23 | -endif() | 3 | +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake) |
24 | 4 | ||
25 | # ============================================================================== | 5 | # ============================================================================== |
26 | # = Include build information | 6 | # = Include build information |
27 | -include(osdevversion) | ||
28 | include(projectheader) | 7 | include(projectheader) |
29 | - | ||
30 | project_header(osdev_mqtt) | 8 | project_header(osdev_mqtt) |
31 | 9 | ||
10 | +add_subdirectory(submodules/logger) | ||
32 | add_subdirectory(src) | 11 | add_subdirectory(src) |
33 | add_subdirectory(examples/pub) | 12 | add_subdirectory(examples/pub) |
34 | add_subdirectory(examples/sub) | 13 | add_subdirectory(examples/sub) |
1 | cmake_minimum_required(VERSION 3.0) | 1 | cmake_minimum_required(VERSION 3.0) |
2 | -LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../../cmake) | 2 | +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake) |
3 | 3 | ||
4 | include(projectheader) | 4 | include(projectheader) |
5 | project_header(test_mqtt_pub) | 5 | project_header(test_mqtt_pub) |
@@ -27,7 +27,7 @@ target_link_libraries( | @@ -27,7 +27,7 @@ target_link_libraries( | ||
27 | set_target_properties( ${PROJECT_NAME} PROPERTIES | 27 | set_target_properties( ${PROJECT_NAME} PROPERTIES |
28 | RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin | 28 | RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin |
29 | LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib | 29 | LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib |
30 | - ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/archive | 30 | + ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib |
31 | ) | 31 | ) |
32 | 32 | ||
33 | include(installation) | 33 | include(installation) |
1 | cmake_minimum_required(VERSION 3.0) | 1 | cmake_minimum_required(VERSION 3.0) |
2 | -LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../../cmake) | 2 | +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake) |
3 | 3 | ||
4 | include(projectheader) | 4 | include(projectheader) |
5 | project_header(test_mqtt_sub) | 5 | project_header(test_mqtt_sub) |
@@ -3,7 +3,9 @@ | @@ -3,7 +3,9 @@ | ||
3 | # =============================================== | 3 | # =============================================== |
4 | # == Setting some environment variables | 4 | # == Setting some environment variables |
5 | # =============================================== | 5 | # =============================================== |
6 | -GIT_URL_SUBS="http://gitlab.osdev.nl/open_source" | 6 | +GIT_URL_OPEN="http://gitlab.osdev.nl/open_source" |
7 | +GIT_URL_CLOSED="git@gitlab.osdev.nl:closed_source" | ||
8 | + | ||
7 | FUNC_RESULT="-1" | 9 | FUNC_RESULT="-1" |
8 | 10 | ||
9 | # Name : print_usage_exit() | 11 | # Name : print_usage_exit() |
@@ -98,12 +100,19 @@ function read_submodules() | @@ -98,12 +100,19 @@ function read_submodules() | ||
98 | function add_submodules() | 100 | function add_submodules() |
99 | { | 101 | { |
100 | echo -e "Adding SubModule(s)." | 102 | echo -e "Adding SubModule(s)." |
101 | - for SUB_MODULE in ${SUB_MODULES} | 103 | + for SUB_MODULE in ${SUB_MODULES_OPEN} |
102 | do | 104 | do |
103 | - echo -e "< ${SUB_MODULE} >" | ||
104 | - git submodule add -f ${GIT_URL_SUBS}/${SUB_MODULE}.git ${SUB_MODULE} | ||
105 | - git config submodule.${SUB_MODULE}.url ${GIT_URL_SUBS}/${SUB_MODULE}.git | 105 | + git submodule add -f ${GIT_URL_OPEN}/${SUB_MODULE}.git submodules/${SUB_MODULE} |
106 | + git config submodule.${SUB_MODULE}.url ${GIT_URL_OPEN}/${SUB_MODULE}.git | ||
107 | + done | ||
108 | + | ||
109 | + for SUB_MODULE in ${SUB_MODULES_CLOSED} | ||
110 | + do | ||
111 | + echo {GIT_URL_CLOSED}/${SUB_MODULE}.git | ||
112 | + git submodule add -f ${GIT_URL_CLOSED}/${SUB_MODULE}.git submodules/${SUB_MODULE} | ||
113 | + git config submodule.${SUB_MODULE}.url ${GIT_URL_CLOSED}/${SUB_MODULE}.git | ||
106 | done | 114 | done |
115 | + | ||
107 | } | 116 | } |
108 | 117 | ||
109 | # Name : get_submodules | 118 | # Name : get_submodules |
1 | cmake_minimum_required(VERSION 3.12) | 1 | cmake_minimum_required(VERSION 3.12) |
2 | -# ============================================================================== | ||
3 | -# Check to see if we're a submodule or top-repo. | ||
4 | -if(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../cmake) | ||
5 | - message( STATUS "Looks like we're a single module" ) | ||
6 | - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../cmake) | ||
7 | -elseif(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../cmake) | ||
8 | - message( STATUS "Looks like we're a submodule" ) | ||
9 | - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../../cmake) | ||
10 | -else() | ||
11 | - message( FATAL_ERROR "No cmake directory found. Did you run the submodules script?" ) | ||
12 | -endif() | 2 | +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake) |
13 | 3 | ||
14 | -# ============================================================================== | ||
15 | -# Check to see if there is versioning information available | ||
16 | -if(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../versioning) | ||
17 | - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../versioning/cmake) | ||
18 | -elseif(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../versioning) | ||
19 | - LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../../versioning/cmake) | ||
20 | -else() | ||
21 | - message( FATAL_ERROR "No ${CURRENT_SOURCE_DIR}/osdev_versioning directory found. Did you run the submodules script?" ) | ||
22 | -endif() | ||
23 | -# ============================================================================== | ||
24 | include(projectheader) | 4 | include(projectheader) |
25 | - | ||
26 | project_header(mqtt-cpp) | 5 | project_header(mqtt-cpp) |
27 | 6 | ||
28 | find_package( Boost REQUIRED COMPONENTS regex ) | 7 | find_package( Boost REQUIRED COMPONENTS regex ) |
@@ -31,6 +10,7 @@ include(compiler) | @@ -31,6 +10,7 @@ include(compiler) | ||
31 | 10 | ||
32 | include_directories( | 11 | include_directories( |
33 | ${CMAKE_SOURCE_DIR}/include | 12 | ${CMAKE_SOURCE_DIR}/include |
13 | + ${CMAKE_SOURCE_DIR}/submodules/logger/src | ||
34 | ) | 14 | ) |
35 | 15 | ||
36 | set(SRC_LIST | 16 | set(SRC_LIST |
@@ -67,6 +47,7 @@ add_libraries( | @@ -67,6 +47,7 @@ add_libraries( | ||
67 | Boost::boost | 47 | Boost::boost |
68 | Boost::regex | 48 | Boost::regex |
69 | paho-mqtt3a | 49 | paho-mqtt3a |
50 | + ${CMAKE_SOURCE_DIR}/build/lib/liblogger.a | ||
70 | ) | 51 | ) |
71 | 52 | ||
72 | include(installation) | 53 | include(installation) |
@@ -21,21 +21,24 @@ | @@ -21,21 +21,24 @@ | ||
21 | * ***************************************************************************/ | 21 | * ***************************************************************************/ |
22 | #include "mqttclient.h" | 22 | #include "mqttclient.h" |
23 | 23 | ||
24 | +// osdev::components::logger | ||
25 | +#include "log.h" | ||
26 | + | ||
24 | // osdev::components::mqtt | 27 | // osdev::components::mqtt |
25 | #include "clientpaho.h" | 28 | #include "clientpaho.h" |
26 | #include "mqttutil.h" | 29 | #include "mqttutil.h" |
27 | #include "mqttidgenerator.h" | 30 | #include "mqttidgenerator.h" |
28 | #include "mqtttypeconverter.h" | 31 | #include "mqtttypeconverter.h" |
29 | - | ||
30 | -// mlogic::common | ||
31 | #include "lockguard.h" | 32 | #include "lockguard.h" |
32 | #include "uriparser.h" | 33 | #include "uriparser.h" |
33 | 34 | ||
34 | // std | 35 | // std |
35 | #include <numeric> | 36 | #include <numeric> |
36 | #include <iostream> | 37 | #include <iostream> |
38 | +#include <string> | ||
37 | 39 | ||
38 | using namespace osdev::components::mqtt; | 40 | using namespace osdev::components::mqtt; |
41 | +using namespace osdev::components::log; | ||
39 | 42 | ||
40 | namespace { | 43 | namespace { |
41 | /** | 44 | /** |
@@ -63,28 +66,30 @@ MqttClient::MqttClient(const std::string& _clientId, const std::function<void(co | @@ -63,28 +66,30 @@ MqttClient::MqttClient(const std::string& _clientId, const std::function<void(co | ||
63 | , m_principalClient() | 66 | , m_principalClient() |
64 | , m_additionalClients() | 67 | , m_additionalClients() |
65 | , m_eventQueue(_clientId) | 68 | , m_eventQueue(_clientId) |
66 | - , m_workerThread(std::thread(&MqttClient::eventHandler, this)) | 69 | + , m_workerThread( std::thread( &MqttClient::eventHandler, this ) ) |
67 | { | 70 | { |
71 | + Log::init( "mqtt-library" ); | ||
72 | + LogInfo( "MQTT Client started", "[MqttClient::MqttClient]"); | ||
68 | } | 73 | } |
69 | 74 | ||
70 | MqttClient::~MqttClient() | 75 | MqttClient::~MqttClient() |
71 | { | 76 | { |
72 | - // DebugLogToFIle ("MqttClient", "%1 - dtor", m_clientId); | ||
73 | { | 77 | { |
74 | - // DebugLogToFIle ("MqttClient", "%1 - disconnect", m_clientId); | 78 | + // LogDebug( "MqttClient", std::string( m_clientId + " - disconnect" ) ); |
75 | this->disconnect(); | 79 | this->disconnect(); |
76 | decltype(m_principalClient) principalClient{}; | 80 | decltype(m_principalClient) principalClient{}; |
77 | 81 | ||
78 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | 82 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); |
79 | - // DebugLogToFIle ("MqttClient", "%1 - cleanup principal client", m_clientId); | 83 | + LogDebug( "MqttClient", std::string( m_clientId + " - cleanup principal client" ) ); |
80 | m_principalClient.swap(principalClient); | 84 | m_principalClient.swap(principalClient); |
81 | } | 85 | } |
82 | - // DebugLogToFIle ("MqttClient", "%1 - dtor stop queue", m_clientId); | 86 | + |
87 | + LogDebug( "MqttClient", std::string( m_clientId + " - dtor stop queue" ) ); | ||
83 | m_eventQueue.stop(); | 88 | m_eventQueue.stop(); |
84 | if (m_workerThread.joinable()) { | 89 | if (m_workerThread.joinable()) { |
85 | m_workerThread.join(); | 90 | m_workerThread.join(); |
86 | } | 91 | } |
87 | - // DebugLogToFIle ("MqttClient", "%1 - dtor ready", m_clientId); | 92 | + LogDebug( "MqttClient", std::string( m_clientId + " - dtor ready" ) ); |
88 | } | 93 | } |
89 | 94 | ||
90 | std::string MqttClient::clientId() const | 95 | std::string MqttClient::clientId() const |
@@ -135,7 +140,8 @@ void MqttClient::connect(const std::string& host, int port, const Credentials& c | @@ -135,7 +140,8 @@ void MqttClient::connect(const std::string& host, int port, const Credentials& c | ||
135 | 140 | ||
136 | void MqttClient::connect(const std::string& _endpoint) | 141 | void MqttClient::connect(const std::string& _endpoint) |
137 | { | 142 | { |
138 | - // InfoLogToFIle ("MqttClient", "%1 - Request connect", m_clientId); | 143 | + LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) ); |
144 | + | ||
139 | OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex); | 145 | OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex); |
140 | IMqttClientImpl* client(nullptr); | 146 | IMqttClientImpl* client(nullptr); |
141 | { | 147 | { |
@@ -148,8 +154,8 @@ void MqttClient::connect(const std::string& _endpoint) | @@ -148,8 +154,8 @@ void MqttClient::connect(const std::string& _endpoint) | ||
148 | } | 154 | } |
149 | else | 155 | else |
150 | { | 156 | { |
151 | - // ErrorLogToFIle ("MqttClient", "%1 - Cannot connect to different endpoint. Disconnect first.", m_clientId); | ||
152 | - // Normally a throw (Yuck!) (MqttException, "Cannot connect while already connected"); | 157 | + LogError( "MqttClient", std::string( m_clientId + " - Cannot connect to different endpoint. Disconnect first." ) ); |
158 | + return; | ||
153 | } | 159 | } |
154 | } | 160 | } |
155 | m_endpoint = _endpoint; | 161 | m_endpoint = _endpoint; |
@@ -168,26 +174,31 @@ void MqttClient::connect(const std::string& _endpoint) | @@ -168,26 +174,31 @@ void MqttClient::connect(const std::string& _endpoint) | ||
168 | 174 | ||
169 | void MqttClient::disconnect() | 175 | void MqttClient::disconnect() |
170 | { | 176 | { |
171 | - // InfoLogToFIle ("MqttClient", "%1 - Request disconnect", m_clientId); | 177 | + LogInfo( "MqttClient", std::string( m_clientId + " - Request disconnect" ) ); |
172 | OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex); | 178 | OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex); |
173 | 179 | ||
174 | decltype(m_additionalClients) additionalClients{}; | 180 | decltype(m_additionalClients) additionalClients{}; |
175 | std::vector<IMqttClientImpl*> clients{}; | 181 | std::vector<IMqttClientImpl*> clients{}; |
176 | { | 182 | { |
177 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | 183 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); |
178 | - if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected || m_principalClient->connectionStatus() == ConnectionStatus::DisconnectInProgress) { | 184 | + if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected || m_principalClient->connectionStatus() == ConnectionStatus::DisconnectInProgress) |
185 | + { | ||
186 | + LogDebug( "MqttClient", std::string( m_clientId + " - Principal client not connected" ) ); | ||
179 | return; | 187 | return; |
180 | } | 188 | } |
181 | - m_additionalClients.swap(additionalClients); | 189 | + m_additionalClients.swap( additionalClients ); |
182 | 190 | ||
183 | - for (const auto& c : additionalClients) { | ||
184 | - clients.push_back(c.get()); | 191 | + for (const auto& c : additionalClients) |
192 | + { | ||
193 | + clients.push_back( c.get() ); | ||
185 | } | 194 | } |
186 | - clients.push_back(m_principalClient.get()); | 195 | + clients.push_back( m_principalClient.get() ); |
187 | } | 196 | } |
188 | 197 | ||
189 | - // DebugLogToFIle ("MqttClient", "%1 - Unsubscribe and disconnect clients", m_clientId); | ||
190 | - for (auto& cl : clients) { | 198 | + |
199 | + LogDebug( "MqttClient", std::string( m_clientId + " - Unsubscribe and disconnect clients" ) ); | ||
200 | + for ( auto& cl : clients ) | ||
201 | + { | ||
191 | cl->unsubscribeAll(); | 202 | cl->unsubscribeAll(); |
192 | } | 203 | } |
193 | this->waitForCompletionInternal(clients, std::chrono::milliseconds(2000), std::set<Token>{}); | 204 | this->waitForCompletionInternal(clients, std::chrono::milliseconds(2000), std::set<Token>{}); |
@@ -211,15 +222,16 @@ Token MqttClient::publish(const MqttMessage& message, int qos) | @@ -211,15 +222,16 @@ Token MqttClient::publish(const MqttMessage& message, int qos) | ||
211 | { | 222 | { |
212 | if( !m_principalClient ) | 223 | if( !m_principalClient ) |
213 | { | 224 | { |
214 | - std::cout << "Principal client not initialized" << std::endl; | 225 | + LogInfo( "[MqttClient::publish]", std::string( "Principal client not initialized") ); |
215 | } | 226 | } |
216 | 227 | ||
217 | if( m_principalClient->connectionStatus() == ConnectionStatus::Disconnected ) | 228 | if( m_principalClient->connectionStatus() == ConnectionStatus::Disconnected ) |
218 | { | 229 | { |
219 | std::cout << "Unable to publish, not connected.." << std::endl; | 230 | std::cout << "Unable to publish, not connected.." << std::endl; |
220 | } | 231 | } |
221 | - // ErrorLogToFIle ("MqttClient", "%1 - Unable to publish, not connected", m_clientId); | ||
222 | - // Throw (MqttException, "Not connected"); | 232 | + LogError("MqttClient", std::string( m_clientId + " - Unable to publish, not connected" ) ); |
233 | + | ||
234 | + return Token(m_clientId, -1); | ||
223 | } | 235 | } |
224 | client = m_principalClient.get(); | 236 | client = m_principalClient.get(); |
225 | } | 237 | } |
@@ -228,7 +240,7 @@ Token MqttClient::publish(const MqttMessage& message, int qos) | @@ -228,7 +240,7 @@ Token MqttClient::publish(const MqttMessage& message, int qos) | ||
228 | 240 | ||
229 | Token MqttClient::subscribe(const std::string& topic, int qos, const std::function<void(MqttMessage)>& cb) | 241 | Token MqttClient::subscribe(const std::string& topic, int qos, const std::function<void(MqttMessage)>& cb) |
230 | { | 242 | { |
231 | - // DebugLogToFIle ("MqttClient", "%1 - Subscribe to topic %2 with qos %3", m_clientId, topic, qos); | 243 | + LogDebug( "[MqttClient::subscribe]", std::string( m_clientId + " - Subscribe to topic " + topic ) ); |
232 | // OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex); | 244 | // OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex); |
233 | bool clientFound = false; | 245 | bool clientFound = false; |
234 | IMqttClientImpl* client(nullptr); | 246 | IMqttClientImpl* client(nullptr); |
@@ -236,8 +248,9 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | @@ -236,8 +248,9 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | ||
236 | // OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | 248 | // OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); |
237 | if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) | 249 | if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) |
238 | { | 250 | { |
239 | - // ErrorLogToFIle ("MqttClient", "%1 - Unable to subscribe, not connected", m_clientId); | 251 | + LogError("MqttClient", std::string( m_clientId + " - Unable to subscribe, not connected" ) ); |
240 | // throw (?)(MqttException, "Not connected"); | 252 | // throw (?)(MqttException, "Not connected"); |
253 | + return Token(m_clientId, -1); | ||
241 | } | 254 | } |
242 | if (!m_principalClient->isOverlapping(topic)) { | 255 | if (!m_principalClient->isOverlapping(topic)) { |
243 | client = m_principalClient.get(); | 256 | client = m_principalClient.get(); |
@@ -253,7 +266,7 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | @@ -253,7 +266,7 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | ||
253 | } | 266 | } |
254 | } | 267 | } |
255 | if (!clientFound) { | 268 | if (!clientFound) { |
256 | - // DebugLogToFIle ("MqttClient", "%1 - Creating new ClientPaho instance for subscription on topic %2", m_clientId, topic); | 269 | + LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) ); |
257 | std::string derivedClientId(generateUniqueClientId(m_clientId, m_additionalClients.size() + 2)); // principal client is nr 1. | 270 | std::string derivedClientId(generateUniqueClientId(m_clientId, m_additionalClients.size() + 2)); // principal client is nr 1. |
258 | m_additionalClients.emplace_back(std::make_unique<ClientPaho>( | 271 | m_additionalClients.emplace_back(std::make_unique<ClientPaho>( |
259 | m_endpoint, | 272 | m_endpoint, |
@@ -271,14 +284,15 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | @@ -271,14 +284,15 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi | ||
271 | 284 | ||
272 | std::set<Token> MqttClient::unsubscribe(const std::string& topic, int qos) | 285 | std::set<Token> MqttClient::unsubscribe(const std::string& topic, int qos) |
273 | { | 286 | { |
274 | - // DebugLogToFIle ("MqttClient", "%1 - Unsubscribe from topic %2 with qos %3", m_clientId, topic, qos); | 287 | + LogDebug("[MqttClient::unsubscribe]", std::string( m_clientId + " - Unsubscribe from topic " + topic ) ); |
275 | OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex); | 288 | OSDEV_COMPONENTS_LOCKGUARD(m_interfaceMutex); |
276 | std::vector<IMqttClientImpl*> clients{}; | 289 | std::vector<IMqttClientImpl*> clients{}; |
277 | { | 290 | { |
278 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | 291 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); |
279 | if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) { | 292 | if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) { |
280 | - // ErrorLogToFIle ("MqttClient", "%1 - Unable to unsubscribe, not connected", m_clientId); | 293 | + LogError("[MqttClient::unsubscribe]", std::string( m_clientId + " - Unable to unsubscribe, not connected" ) ); |
281 | // Throw (MqttException, "Not connected"); | 294 | // Throw (MqttException, "Not connected"); |
295 | + return std::set<Token>(); | ||
282 | } | 296 | } |
283 | clients.push_back(m_principalClient.get()); | 297 | clients.push_back(m_principalClient.get()); |
284 | for (const auto& c : m_additionalClients) { | 298 | for (const auto& c : m_additionalClients) { |
@@ -359,9 +373,7 @@ std::string MqttClient::endpoint() const | @@ -359,9 +373,7 @@ std::string MqttClient::endpoint() const | ||
359 | 373 | ||
360 | void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus cs) | 374 | void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus cs) |
361 | { | 375 | { |
362 | - (void)id; | ||
363 | - (void)cs; | ||
364 | - // DebugLogToFIle ("MqttClient", "%1 - connection status of wrapped client %2 changed to %3", m_clientId, id, cs); | 376 | + LogDebug("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - connection status of wrapped client " + id + " changed to " + std::to_string( static_cast<int>(cs) ) ) ); |
365 | IMqttClientImpl* principalClient{ nullptr }; | 377 | IMqttClientImpl* principalClient{ nullptr }; |
366 | std::vector<IMqttClientImpl*> clients{}; | 378 | std::vector<IMqttClientImpl*> clients{}; |
367 | std::vector<ConnectionStatus> connectionStates{}; | 379 | std::vector<ConnectionStatus> connectionStates{}; |
@@ -392,7 +404,7 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | @@ -392,7 +404,7 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | ||
392 | cl->resubscribe(); | 404 | cl->resubscribe(); |
393 | } | 405 | } |
394 | catch (const std::exception& e) { | 406 | catch (const std::exception& e) { |
395 | - // ErrorLogToFIle ("MqttClient", "%1 - resubscribe on wrapped client %2 in context of connection status change in wrapped client %3 failed : %4", m_clientId, cl->clientId(), id, e.what()); | 407 | + LogError("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - resubscribe on wrapped client " + cl->clientId() + " in context of connection status change in wrapped client : " + id + " => FAILED : " + e.what() ) ); |
396 | } | 408 | } |
397 | } | 409 | } |
398 | m_activeTokensCV.notify_all(); | 410 | m_activeTokensCV.notify_all(); |
@@ -407,7 +419,7 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | @@ -407,7 +419,7 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | ||
407 | auto waitFor = std::chrono::milliseconds(1000); | 419 | auto waitFor = std::chrono::milliseconds(1000); |
408 | if (!waitForCompletionInternalClients(clients, waitFor, std::set<Token>{})) { | 420 | if (!waitForCompletionInternalClients(clients, waitFor, std::set<Token>{})) { |
409 | if (std::accumulate(clients.begin(), clients.end(), false, [](bool hasPending, IMqttClientImpl* client) { return hasPending || client->hasPendingSubscriptions(); })) { | 421 | if (std::accumulate(clients.begin(), clients.end(), false, [](bool hasPending, IMqttClientImpl* client) { return hasPending || client->hasPendingSubscriptions(); })) { |
410 | - // WarnLogToFIle ("MqttClient", "%1 - subscriptions are not recovered within timeout.", m_clientId) | 422 | + LogWarning("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - subscriptions are not recovered within timeout." ) ); |
411 | } | 423 | } |
412 | } | 424 | } |
413 | if (principalClient) { | 425 | if (principalClient) { |
@@ -415,7 +427,7 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | @@ -415,7 +427,7 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus | ||
415 | principalClient->publishPending(); | 427 | principalClient->publishPending(); |
416 | } | 428 | } |
417 | catch (const std::exception& e) { | 429 | catch (const std::exception& e) { |
418 | - // ErrorLogToFIle ("MqttClient", "%1 - publishPending on wrapped client %2 failed : %3", m_clientId, principalClient->clientId(), e.what()); | 430 | + LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); |
419 | } | 431 | } |
420 | } | 432 | } |
421 | } | 433 | } |
@@ -434,7 +446,7 @@ void MqttClient::deliveryComplete(const std::string& _clientId, std::int32_t tok | @@ -434,7 +446,7 @@ void MqttClient::deliveryComplete(const std::string& _clientId, std::int32_t tok | ||
434 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); | 446 | OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); |
435 | if (!m_activeTokens.insert(t).second) { | 447 | if (!m_activeTokens.insert(t).second) { |
436 | // This should not happen. This means that some callback on the wrapper never came. | 448 | // This should not happen. This means that some callback on the wrapper never came. |
437 | - // ErrorLogToFIle ("MqttClient", "%1 -deliveryComplete, token %1 is already active", m_clientId, t); | 449 | + LogDebug("[MqttClient::deliveryComplete]", std::string( m_clientId + " - deliveryComplete, token is already active" ) ); |
438 | } | 450 | } |
439 | } | 451 | } |
440 | this->pushEvent([this, t]() { | 452 | this->pushEvent([this, t]() { |
@@ -531,8 +543,9 @@ void MqttClient::pushEvent(std::function<void()> ev) | @@ -531,8 +543,9 @@ void MqttClient::pushEvent(std::function<void()> ev) | ||
531 | 543 | ||
532 | void MqttClient::eventHandler() | 544 | void MqttClient::eventHandler() |
533 | { | 545 | { |
534 | - // InfoLogToFIle ("MqttClient", "%1 - starting event handler", m_clientId); | ||
535 | - for (;;) { | 546 | + LogInfo("[MqttClient::eventHandler]", std::string( m_clientId + " - starting event handler." ) ); |
547 | + for (;;) | ||
2 |
|
||
548 | + { | ||
536 | std::vector<std::function<void()>> events; | 549 | std::vector<std::function<void()>> events; |
537 | if (!m_eventQueue.pop(events)) | 550 | if (!m_eventQueue.pop(events)) |
538 | { | 551 | { |
@@ -543,5 +556,5 @@ void MqttClient::eventHandler() | @@ -543,5 +556,5 @@ void MqttClient::eventHandler() | ||
543 | ev(); | 556 | ev(); |
544 | } | 557 | } |
545 | } | 558 | } |
546 | - // InfoLogToFIle ("MqttClient", "%1 - leaving event handler", m_clientId); | 559 | + LogInfo("[MqttClient::eventHandler]", std::string( m_clientId + " - leaving event handler." ) ); |
547 | } | 560 | } |