Commit b4d6ed568a3652ab9a0078f2eb6e976d1497d898
Merge branch 'feat/sridder/will_set' into 'development'
added LWT ( last will & testament ) option to mqtt connect. added support for a last will & testament message. LWT settings are not required for connection, but can be provided as optional parameter when calling the mqttclient connect function. ( see publisher test case ). The parameters to be added are the topic to publish on and the message to be published in case of a connection disruption. See merge request !8
Showing
10 changed files
with
106 additions
and
19 deletions
examples/pub/main.cpp
@@ -78,7 +78,7 @@ int main( int argc, char* argv[] ) | @@ -78,7 +78,7 @@ int main( int argc, char* argv[] ) | ||
78 | { | 78 | { |
79 | std::cout << "{OK}" << std::endl; | 79 | std::cout << "{OK}" << std::endl; |
80 | std::cout << "Connecting to the broker : "; | 80 | std::cout << "Connecting to the broker : "; |
81 | - pPublisher->connect( "localhost", 1883, "", "" ); | 81 | + pPublisher->connect( "localhost", 1883, "", "", "LWT-test", "connection disrupted.." ); |
82 | 82 | ||
83 | // Assume we are connected now, start publishing. | 83 | // Assume we are connected now, start publishing. |
84 | while( 1 ) | 84 | while( 1 ) |
@@ -94,7 +94,7 @@ int main( int argc, char* argv[] ) | @@ -94,7 +94,7 @@ int main( int argc, char* argv[] ) | ||
94 | } | 94 | } |
95 | messageNumber++; | 95 | messageNumber++; |
96 | } | 96 | } |
97 | - sleepcp( 1, T_MICRO ); | 97 | + sleepcp( 5, T_SECONDS ); |
98 | } | 98 | } |
99 | } | 99 | } |
100 | else | 100 | else |
examples/pub/publisher.cpp
@@ -29,9 +29,9 @@ Publisher::Publisher() | @@ -29,9 +29,9 @@ Publisher::Publisher() | ||
29 | 29 | ||
30 | } | 30 | } |
31 | 31 | ||
32 | -void Publisher::connect( const std::string &hostname, int portnumber, const std::string &username, const std::string &password ) | 32 | +void Publisher::connect(const std::string &hostname, int portnumber, const std::string &username, const std::string &password, const std::string &lwt_topic, const std::string &lwt_message) |
33 | { | 33 | { |
34 | - m_mqtt_client.connect( hostname, portnumber, osdev::components::mqtt::Credentials( username, password ) ); | 34 | + m_mqtt_client.connect( hostname, portnumber, osdev::components::mqtt::Credentials( username, password ), osdev::components::mqtt::mqtt_LWT( lwt_topic, lwt_message ) ); |
35 | std::cout << "Client state : " << m_mqtt_client.state() << std::endl; | 35 | std::cout << "Client state : " << m_mqtt_client.state() << std::endl; |
36 | } | 36 | } |
37 | 37 |
examples/pub/publisher.h
@@ -36,7 +36,8 @@ public: | @@ -36,7 +36,8 @@ public: | ||
36 | 36 | ||
37 | virtual ~Publisher() {} | 37 | virtual ~Publisher() {} |
38 | 38 | ||
39 | - void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string() ); | 39 | + void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string() |
40 | + , const std::string &lwt_topic = std::string(), const std::string &lwt_message = std::string() ); | ||
40 | 41 | ||
41 | void publish( const std::string &message_topic, const std::string &message_payload ); | 42 | void publish( const std::string &message_topic, const std::string &message_payload ); |
42 | 43 |
include/clientpaho.h
@@ -45,6 +45,7 @@ | @@ -45,6 +45,7 @@ | ||
45 | #include "imqttclientimpl.h" | 45 | #include "imqttclientimpl.h" |
46 | #include "mqttfailure.h" | 46 | #include "mqttfailure.h" |
47 | #include "mqttsuccess.h" | 47 | #include "mqttsuccess.h" |
48 | +#include "mqtt_lwt.h" | ||
48 | 49 | ||
49 | namespace osdev { | 50 | namespace osdev { |
50 | namespace components { | 51 | namespace components { |
@@ -94,7 +95,7 @@ public: | @@ -94,7 +95,7 @@ public: | ||
94 | /** | 95 | /** |
95 | * @see IMqttClientImpl | 96 | * @see IMqttClientImpl |
96 | */ | 97 | */ |
97 | - virtual std::int32_t connect(bool wait) override; | 98 | + virtual std::int32_t connect(bool wait, const mqtt_LWT &lwt = mqtt_LWT() ) override; |
98 | 99 | ||
99 | /** | 100 | /** |
100 | * @see IMqttClientImpl | 101 | * @see IMqttClientImpl |
include/imqttclient.h
@@ -39,6 +39,7 @@ | @@ -39,6 +39,7 @@ | ||
39 | #include "credentials.h" | 39 | #include "credentials.h" |
40 | #include "mqttmessage.h" | 40 | #include "mqttmessage.h" |
41 | #include "token.h" | 41 | #include "token.h" |
42 | +#include "mqtt_lwt.h" | ||
42 | 43 | ||
43 | namespace osdev { | 44 | namespace osdev { |
44 | namespace components { | 45 | namespace components { |
@@ -59,13 +60,13 @@ public: | @@ -59,13 +60,13 @@ public: | ||
59 | * @param port The port to use. | 60 | * @param port The port to use. |
60 | * @param credentials The credentials to use. | 61 | * @param credentials The credentials to use. |
61 | */ | 62 | */ |
62 | - virtual void connect(const std::string& host, int port, const Credentials& credentials) = 0; | 63 | + virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; |
63 | 64 | ||
64 | /** | 65 | /** |
65 | * @brief Connect to the endpoint | 66 | * @brief Connect to the endpoint |
66 | * @param endpoint an uri endpoint description. | 67 | * @param endpoint an uri endpoint description. |
67 | */ | 68 | */ |
68 | - virtual void connect(const std::string& endpoint) = 0; | 69 | + virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; |
69 | 70 | ||
70 | /** | 71 | /** |
71 | * @brief Disconnect the client from the broker | 72 | * @brief Disconnect the client from the broker |
include/imqttclientimpl.h
@@ -36,6 +36,7 @@ | @@ -36,6 +36,7 @@ | ||
36 | // mlogic::mqtt | 36 | // mlogic::mqtt |
37 | #include "connectionstatus.h" | 37 | #include "connectionstatus.h" |
38 | #include "mqttmessage.h" | 38 | #include "mqttmessage.h" |
39 | +#include "mqtt_lwt.h" | ||
39 | 40 | ||
40 | namespace osdev { | 41 | namespace osdev { |
41 | namespace components { | 42 | namespace components { |
@@ -65,7 +66,7 @@ public: | @@ -65,7 +66,7 @@ public: | ||
65 | * @param wait A flag that indicates if the method should wait for a succesful connection. | 66 | * @param wait A flag that indicates if the method should wait for a succesful connection. |
66 | * @return the operation token. | 67 | * @return the operation token. |
67 | */ | 68 | */ |
68 | - virtual std::int32_t connect(bool wait) = 0; | 69 | + virtual std::int32_t connect( bool wait, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; |
69 | 70 | ||
70 | /** | 71 | /** |
71 | * @brief Disconnect the wrapper. | 72 | * @brief Disconnect the wrapper. |
include/mqtt_lwt.h
0 โ 100644
1 | +/* **************************************************************************** | ||
2 | + * Copyright 2019 Open Systems Development BV * | ||
3 | + * * | ||
4 | + * Permission is hereby granted, free of charge, to any person obtaining a * | ||
5 | + * copy of this software and associated documentation files (the "Software"), * | ||
6 | + * to deal in the Software without restriction, including without limitation * | ||
7 | + * the rights to use, copy, modify, merge, publish, distribute, sublicense, * | ||
8 | + * and/or sell copies of the Software, and to permit persons to whom the * | ||
9 | + * Software is furnished to do so, subject to the following conditions: * | ||
10 | + * * | ||
11 | + * The above copyright notice and this permission notice shall be included in * | ||
12 | + * all copies or substantial portions of the Software. * | ||
13 | + * * | ||
14 | + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * | ||
15 | + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * | ||
16 | + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * | ||
17 | + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * | ||
18 | + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * | ||
19 | + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * | ||
20 | + * DEALINGS IN THE SOFTWARE. * | ||
21 | + * ***************************************************************************/ | ||
22 | +#pragma once | ||
23 | + | ||
24 | +// std | ||
25 | +#include <string> | ||
26 | + | ||
27 | +namespace osdev { | ||
28 | +namespace components { | ||
29 | +namespace mqtt { | ||
30 | + | ||
31 | +/*! | ||
32 | + * \brief Class that holds LWT ( Last Will & Testament ) context. | ||
33 | + */ | ||
34 | +class mqtt_LWT | ||
35 | +{ | ||
36 | +public: | ||
37 | + /*! | ||
38 | + * \brief Default CTor, empty LWT | ||
39 | + */ | ||
40 | + mqtt_LWT() | ||
41 | + : m_topic() | ||
42 | + , m_message() | ||
43 | + {} | ||
44 | + | ||
45 | + /*! | ||
46 | + * \brief Constructor for LWT topic/message | ||
47 | + * \param topic - The topic on which the LWT is published. | ||
48 | + * \param message - The message published on broker disconnection. | ||
49 | + */ | ||
50 | + mqtt_LWT( const std::string &topic, const std::string &message ) | ||
51 | + : m_topic( topic ) | ||
52 | + , m_message( message ) | ||
53 | + {} | ||
54 | + | ||
55 | + const std::string& topic() const { return m_topic; } | ||
56 | + const std::string& message() const { return m_message; } | ||
57 | + | ||
58 | +private: | ||
59 | + std::string m_topic; | ||
60 | + std::string m_message; | ||
61 | +}; | ||
62 | + | ||
63 | +} // End namespace mqtt | ||
64 | +} // End namespace components | ||
65 | +} // End namespace osdev |
include/mqttclient.h
@@ -36,6 +36,7 @@ | @@ -36,6 +36,7 @@ | ||
36 | #include "serverstate.h" | 36 | #include "serverstate.h" |
37 | 37 | ||
38 | #include "imqttclient.h" | 38 | #include "imqttclient.h" |
39 | +#include "mqtt_lwt.h" | ||
39 | 40 | ||
40 | namespace osdev { | 41 | namespace osdev { |
41 | namespace components { | 42 | namespace components { |
@@ -91,12 +92,12 @@ public: | @@ -91,12 +92,12 @@ public: | ||
91 | /** | 92 | /** |
92 | * @see IMqttClient | 93 | * @see IMqttClient |
93 | */ | 94 | */ |
94 | - virtual void connect(const std::string& host, int port, const Credentials& credentials) override; | 95 | + virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT() ) override; |
95 | 96 | ||
96 | /** | 97 | /** |
97 | * @see IMqttClient | 98 | * @see IMqttClient |
98 | */ | 99 | */ |
99 | - virtual void connect(const std::string& endpoint) override; | 100 | + virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) override; |
100 | 101 | ||
101 | /** | 102 | /** |
102 | * @see IMqttClient | 103 | * @see IMqttClient |
src/clientpaho.cpp
@@ -182,7 +182,7 @@ ConnectionStatus ClientPaho::connectionStatus() const | @@ -182,7 +182,7 @@ ConnectionStatus ClientPaho::connectionStatus() const | ||
182 | return m_connectionStatus; | 182 | return m_connectionStatus; |
183 | } | 183 | } |
184 | 184 | ||
185 | -std::int32_t ClientPaho::connect(bool wait) | 185 | +std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) |
186 | { | 186 | { |
187 | { | 187 | { |
188 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | 188 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
@@ -200,6 +200,21 @@ std::int32_t ClientPaho::connect(bool wait) | @@ -200,6 +200,21 @@ std::int32_t ClientPaho::connect(bool wait) | ||
200 | conn_opts.onFailure = &ClientPaho::onConnectFailure; | 200 | conn_opts.onFailure = &ClientPaho::onConnectFailure; |
201 | conn_opts.context = this; | 201 | conn_opts.context = this; |
202 | conn_opts.automaticReconnect = 1; | 202 | conn_opts.automaticReconnect = 1; |
203 | + | ||
204 | + if( !lwt.topic().empty() ) | ||
205 | + { | ||
206 | + MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer; | ||
207 | + will_opts.message = lwt.message().c_str(); | ||
208 | + will_opts.topicName = lwt.topic().c_str(); | ||
209 | + | ||
210 | + conn_opts.will = &will_opts; | ||
211 | + } | ||
212 | + else | ||
213 | + { | ||
214 | + conn_opts.will = nullptr; | ||
215 | + } | ||
216 | + | ||
217 | + | ||
203 | if (!m_username.empty()) | 218 | if (!m_username.empty()) |
204 | { | 219 | { |
205 | conn_opts.username = m_username.c_str(); | 220 | conn_opts.username = m_username.c_str(); |
@@ -451,7 +466,7 @@ void ClientPaho::resubscribe() | @@ -451,7 +466,7 @@ void ClientPaho::resubscribe() | ||
451 | } | 466 | } |
452 | } | 467 | } |
453 | 468 | ||
454 | -std::int32_t ClientPaho::unsubscribe(const std::string& topic, int qos) | 469 | +std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) |
455 | { | 470 | { |
456 | { | 471 | { |
457 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | 472 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
src/mqttclient.cpp
@@ -125,9 +125,8 @@ StateEnum MqttClient::state() const | @@ -125,9 +125,8 @@ StateEnum MqttClient::state() const | ||
125 | return m_serverState.state(); | 125 | return m_serverState.state(); |
126 | } | 126 | } |
127 | 127 | ||
128 | -void MqttClient::connect(const std::string& host, int port, const Credentials& credentials) | 128 | +void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt ) |
129 | { | 129 | { |
130 | - | ||
131 | osdev::components::mqtt::ParsedUri _endpoint = { | 130 | osdev::components::mqtt::ParsedUri _endpoint = { |
132 | { "scheme", "tcp" }, | 131 | { "scheme", "tcp" }, |
133 | { "user", credentials.username() }, | 132 | { "user", credentials.username() }, |
@@ -135,10 +134,11 @@ void MqttClient::connect(const std::string& host, int port, const Credentials& c | @@ -135,10 +134,11 @@ void MqttClient::connect(const std::string& host, int port, const Credentials& c | ||
135 | { "host", host }, | 134 | { "host", host }, |
136 | { "port", std::to_string(port) } | 135 | { "port", std::to_string(port) } |
137 | }; | 136 | }; |
138 | - this->connect(UriParser::toString(_endpoint)); | 137 | + |
138 | + this->connect( UriParser::toString( _endpoint ), lwt ); | ||
139 | } | 139 | } |
140 | 140 | ||
141 | -void MqttClient::connect(const std::string& _endpoint) | 141 | +void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt ) |
142 | { | 142 | { |
143 | LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) ); | 143 | LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) ); |
144 | 144 | ||
@@ -159,7 +159,8 @@ void MqttClient::connect(const std::string& _endpoint) | @@ -159,7 +159,8 @@ void MqttClient::connect(const std::string& _endpoint) | ||
159 | } | 159 | } |
160 | } | 160 | } |
161 | m_endpoint = _endpoint; | 161 | m_endpoint = _endpoint; |
162 | - if (!m_principalClient) { | 162 | + if (!m_principalClient) |
163 | + { | ||
163 | std::string derivedClientId(generateUniqueClientId(m_clientId, 1)); | 164 | std::string derivedClientId(generateUniqueClientId(m_clientId, 1)); |
164 | m_principalClient = std::make_unique<ClientPaho>( | 165 | m_principalClient = std::make_unique<ClientPaho>( |
165 | m_endpoint, | 166 | m_endpoint, |
@@ -169,7 +170,8 @@ void MqttClient::connect(const std::string& _endpoint) | @@ -169,7 +170,8 @@ void MqttClient::connect(const std::string& _endpoint) | ||
169 | } | 170 | } |
170 | client = m_principalClient.get(); | 171 | client = m_principalClient.get(); |
171 | } | 172 | } |
172 | - client->connect(true); | 173 | + |
174 | + client->connect( true, lwt ); | ||
173 | } | 175 | } |
174 | 176 | ||
175 | void MqttClient::disconnect() | 177 | void MqttClient::disconnect() |