Commit 8d8360df92f39f66b9e706beb9fd863c3b8b7cb9
Merge branch 'pgroen/fix/ab257161_investigate' into 'master'
pgroen/fix/ab257161 investigate When we delete our mqttclient instances (276 instances), sometimes we encounter a SEGFAULT originating in MqttClient::eventHandler. GDB always shows: 0x000055555576b4d5 in operator() (_closure=0x7ffea40023b0) at /source/mqtt-cpp/src/mqttclient.cpp:499. This is the line: principalClient->publishPending(); . principalClient pointer is not null, so the if-statement at 495 passes, but it is not a valid pointer. That is because the destructor already deleted that instance of principalClient (line 83, mprincipalClient.swap(principalClient); ). This is clearly indicated by this output: (gdb) p m_eventQueue.m_stop $1 = std::atomic = { true } That indicates the destructor has already executed line 87. This is caused by obtaining the classic pointer of m_principalClient at line 421, so principalClient is a reference which isn’t accounted for by the smart-pointer. See merge request !23
Showing
5 changed files
with
248 additions
and
11 deletions
src/mqttclient.cpp
@@ -73,6 +73,12 @@ MqttClient::MqttClient(const std::string& _clientId, const std::function<void(co | @@ -73,6 +73,12 @@ MqttClient::MqttClient(const std::string& _clientId, const std::function<void(co | ||
73 | 73 | ||
74 | MqttClient::~MqttClient() | 74 | MqttClient::~MqttClient() |
75 | { | 75 | { |
76 | + LogDebug( "MqttClient", std::string( m_clientId + " - dtor stop queue" ) ); | ||
77 | + m_eventQueue.stop(); | ||
78 | + if (m_workerThread.joinable()) { | ||
79 | + m_workerThread.join(); | ||
80 | + } | ||
81 | + | ||
76 | { | 82 | { |
77 | // LogDebug( "MqttClient", std::string( m_clientId + " - disconnect" ) ); | 83 | // LogDebug( "MqttClient", std::string( m_clientId + " - disconnect" ) ); |
78 | this->disconnect(); | 84 | this->disconnect(); |
@@ -83,11 +89,6 @@ MqttClient::~MqttClient() | @@ -83,11 +89,6 @@ MqttClient::~MqttClient() | ||
83 | m_principalClient.swap(principalClient); | 89 | m_principalClient.swap(principalClient); |
84 | } | 90 | } |
85 | 91 | ||
86 | - LogDebug( "MqttClient", std::string( m_clientId + " - dtor stop queue" ) ); | ||
87 | - m_eventQueue.stop(); | ||
88 | - if (m_workerThread.joinable()) { | ||
89 | - m_workerThread.join(); | ||
90 | - } | ||
91 | LogDebug( "MqttClient", std::string( m_clientId + " - dtor ready" ) ); | 92 | LogDebug( "MqttClient", std::string( m_clientId + " - dtor ready" ) ); |
92 | } | 93 | } |
93 | 94 |
test/CMakeLists.txt
@@ -7,24 +7,26 @@ | @@ -7,24 +7,26 @@ | ||
7 | # | 7 | # |
8 | # Build rules for the MQTT Library | 8 | # Build rules for the MQTT Library |
9 | 9 | ||
10 | -add_executable(topictest | 10 | +add_executable(mqtt_test |
11 | + helperclasses/PublisherClass.h | ||
11 | TopicLengthTest.cpp | 12 | TopicLengthTest.cpp |
13 | + SledgeHammerTest.cpp | ||
12 | ) | 14 | ) |
13 | 15 | ||
14 | -target_include_directories(topictest PRIVATE | 16 | +target_include_directories(mqtt_test PRIVATE |
15 | ${CMAKE_CIRRENT_SOURECE_DIR} | 17 | ${CMAKE_CIRRENT_SOURECE_DIR} |
16 | ../include | 18 | ../include |
17 | ) | 19 | ) |
18 | 20 | ||
19 | -target_link_libraries(topictest PRIVATE | 21 | +target_link_libraries(mqtt_test PRIVATE |
20 | gmock_main | 22 | gmock_main |
21 | gmock | 23 | gmock |
22 | gtest | 24 | gtest |
23 | mqtt-cpp | 25 | mqtt-cpp |
24 | ) | 26 | ) |
25 | 27 | ||
26 | -add_test(NAME topictest COMMAND topictest) | 28 | +add_test(NAME mqtt_test COMMAND mqtt_test) |
27 | 29 | ||
28 | -set_tests_properties(topictest PROPERTIES | 30 | +set_tests_properties(mqtt_test PROPERTIES |
29 | WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}" | 31 | WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}" |
30 | ) | 32 | ) |
test/SledgeHammerTest.cpp
0 → 100644
1 | +/**************************************************************************** | ||
2 | + * COpyright (c) 2023 Open Systems Development B.V. | ||
3 | + ****************************************************************************/ | ||
4 | + | ||
5 | +#include <gmock/gmock.h> | ||
6 | +#include <gtest/gtest.h> | ||
7 | +#include <string> | ||
8 | +#include <memory> | ||
9 | +#include <chrono> | ||
10 | +#include <unistd.h> | ||
11 | + | ||
12 | +#include <iostream> | ||
13 | + | ||
14 | +#include "helperclasses/PublisherClass.h" | ||
15 | + | ||
16 | +/// Every test does basically the same. | ||
17 | +/// 1. Create a Publisher object | ||
18 | +/// 2. Connect to the MQTT broker | ||
19 | +/// 3. Publish 10 message(s) | ||
20 | +/// 4. Disconnect from the MQTT broker | ||
21 | +/// 5. Destroy the Publisher object | ||
22 | +/// 6. Repeat 10 times. | ||
23 | + | ||
24 | +const std::string sledge_maintopic = "SledgeHammerTest/"; | ||
25 | + | ||
26 | +enum TIME_RES | ||
27 | +{ | ||
28 | + T_MICRO, | ||
29 | + T_MILLI, | ||
30 | + T_SECONDS | ||
31 | +}; | ||
32 | + | ||
33 | +std::uint64_t getEpochUSecs() | ||
34 | +{ | ||
35 | + auto tsUSec =std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::system_clock::now()); | ||
36 | + return static_cast<std::uint64_t>(tsUSec.time_since_epoch().count()); | ||
37 | +} | ||
38 | + | ||
39 | + | ||
40 | +void sleepcp( int number, TIME_RES resolution = T_MILLI ) // Cross-platform sleep function | ||
41 | +{ | ||
42 | + int factor = 0; // Should not happen.. | ||
43 | + | ||
44 | + switch( resolution ) | ||
45 | + { | ||
46 | + case T_MICRO: | ||
47 | + factor = 1; | ||
48 | + break; | ||
49 | + | ||
50 | + case T_MILLI: | ||
51 | + factor = 1000; | ||
52 | + break; | ||
53 | + | ||
54 | + case T_SECONDS: | ||
55 | + factor = 1000000; | ||
56 | + break; | ||
57 | + } | ||
58 | + | ||
59 | + usleep( number * factor ); | ||
60 | +} | ||
61 | + | ||
62 | +/// Test a single connection, 1 time | ||
63 | +TEST(SledgeHammerTest, SingleConnectionCleanExit) | ||
64 | +{ | ||
65 | + Publisher *publisher = nullptr; | ||
66 | + for (int test_counter = 0; test_counter < 10; test_counter++) | ||
67 | + { | ||
68 | + publisher = new Publisher(std::to_string(getEpochUSecs())); | ||
69 | + if(publisher) | ||
70 | + { | ||
71 | + publisher->connect("localhost"); | ||
72 | + publisher->publish(sledge_maintopic + "Single Connection Clean Exit Test/" + std::to_string(test_counter), "Hello World. I'm alive.."); | ||
73 | + publisher->disconnect(); | ||
74 | + delete publisher; | ||
75 | + publisher = nullptr; | ||
76 | + } | ||
77 | + } | ||
78 | +} | ||
79 | + | ||
80 | +TEST(SledgeHammerTest, SingleConnectionForcedExit) | ||
81 | +{ | ||
82 | + Publisher *publisher = nullptr; | ||
83 | + for (int test_counter = 0; test_counter < 10; test_counter++) | ||
84 | + { | ||
85 | + publisher = new Publisher(std::to_string(getEpochUSecs())); | ||
86 | + if(publisher) | ||
87 | + { | ||
88 | + publisher->connect("localhost"); | ||
89 | + publisher->publish(sledge_maintopic + "Single Connection Forced Exit/Test/" + std::to_string(test_counter), "Hello World. I'm alive.."); | ||
90 | + delete publisher; | ||
91 | + publisher = nullptr; | ||
92 | + } | ||
93 | + } | ||
94 | +} | ||
95 | + | ||
96 | +TEST(SledgeHammerTest, MultipleConnections_10_CleanExit) | ||
97 | +{ | ||
98 | + std::unordered_map<std::string, Publisher *> publishers; | ||
99 | + | ||
100 | + for (int test_counter = 0; test_counter < 10; test_counter++) | ||
101 | + { | ||
102 | + publishers["Publisher" + std::to_string(test_counter)] = new Publisher(std::to_string(getEpochUSecs())); | ||
103 | + if(publishers["Publisher" + std::to_string(test_counter)]) | ||
104 | + { | ||
105 | + publishers["Publisher" + std::to_string(test_counter)]->connect("localhost"); | ||
106 | + } | ||
107 | + } | ||
108 | + | ||
109 | + for (int test_counter = 0; test_counter < 10; test_counter++) | ||
110 | + { | ||
111 | + for (int nCount = 0; nCount < 10; nCount++) | ||
112 | + { | ||
113 | + publishers["Publisher" + std::to_string(test_counter)]->publish(sledge_maintopic + "Multiple Connections [10]/Clean Exit/Test " + std::to_string(test_counter), "Hello World. I'm alive.."); | ||
114 | + } | ||
115 | + publishers["Publisher" + std::to_string(test_counter)]->disconnect(); | ||
116 | + delete publishers["Publisher" + std::to_string(test_counter)]; | ||
117 | + publishers.erase("Publisher" + std::to_string(test_counter)); | ||
118 | + } | ||
119 | +} | ||
120 | + | ||
121 | +TEST(SledgeHammerTest, MultipleConnections_BurnTest) | ||
122 | +{ | ||
123 | + std::unordered_map<std::string, Publisher *> publishers; | ||
124 | + | ||
125 | + const int max_run = 100; | ||
126 | + const int max_connections = 5; | ||
127 | + const int max_messages = 100; | ||
128 | + | ||
129 | + for (int test_run = 0; test_run < max_run; test_run++) | ||
130 | + { | ||
131 | + std::cout << "Creating " << max_connections << " connections for test run " << test_run << " " << std::endl; | ||
132 | + for (int test_counter = 0; test_counter < max_connections; test_counter++) | ||
133 | + { | ||
134 | + publishers["Publisher" + std::to_string(test_counter)] = new Publisher(std::to_string(getEpochUSecs())); | ||
135 | + if(publishers["Publisher" + std::to_string(test_counter)]) | ||
136 | + { | ||
137 | + publishers["Publisher" + std::to_string(test_counter)]->connect("localhost"); | ||
138 | + std::cout << "."; | ||
139 | + } | ||
140 | + } | ||
141 | + std::cout << std::endl; | ||
142 | + | ||
143 | + // Wait a second for the connections to be established. | ||
144 | + sleepcp(1, T_SECONDS); | ||
145 | + | ||
146 | + std::cout << "Publishing " << max_messages << " messages for test run " << test_run << std::endl; | ||
147 | + for (int test_counter = 0; test_counter < max_connections; test_counter++) | ||
148 | + { | ||
149 | + std::cout << "\tConnection : " << test_counter << " "; | ||
150 | + for (int nCount = 0; nCount < max_messages; nCount++) | ||
151 | + { | ||
152 | + if(publishers["Publisher" + std::to_string(test_counter)]) | ||
153 | + { | ||
154 | + std::cout << "."; | ||
155 | + publishers["Publisher" + std::to_string(test_counter)]->publish(sledge_maintopic + | ||
156 | + "Multiple Connections [" + std::to_string(max_connections) + "]/Multi Messages [" + std::to_string(max_messages) + "]/Test Run " + | ||
157 | + std::to_string(test_run) + "/" + "[" + std::to_string(test_counter) + "]" + | ||
158 | + "[" + std::to_string(nCount) + "]", "Hello World. I'm alive.."); | ||
159 | + } | ||
160 | + } | ||
161 | + std::cout << std::endl; | ||
162 | + } | ||
163 | + | ||
164 | + // Wait another second to update the broker. | ||
165 | + sleepcp(1, T_SECONDS); | ||
166 | + | ||
167 | + std::cout << "Disconnecting " << max_connections << " connections for test run " << test_run << " "; | ||
168 | + for (int test_counter = 0; test_counter < max_connections; test_counter++) | ||
169 | + { | ||
170 | + if(publishers["Publisher" + std::to_string(test_counter)]) | ||
171 | + { | ||
172 | + std::cout << "."; | ||
173 | + publishers["Publisher" + std::to_string(test_counter)]->disconnect(); | ||
174 | + delete publishers["Publisher" + std::to_string(test_counter)]; | ||
175 | + } | ||
176 | + } | ||
177 | + std::cout << std::endl; | ||
178 | + publishers.clear(); | ||
179 | + std::cout << std::string(200,'=') << std::endl; | ||
180 | + | ||
181 | + // Wait for another second before re-running another 100 times. | ||
182 | + sleepcp(1, T_SECONDS); | ||
183 | + } | ||
184 | +} | ||
0 | \ No newline at end of file | 185 | \ No newline at end of file |
test/TopicLengthTest.cpp
@@ -12,7 +12,7 @@ | @@ -12,7 +12,7 @@ | ||
12 | using namespace osdev::components::mqtt; | 12 | using namespace osdev::components::mqtt; |
13 | using namespace osdev::components::log; | 13 | using namespace osdev::components::log; |
14 | 14 | ||
15 | -static const std::string main_topic = "test/"; | 15 | +static const std::string main_topic = "Topic Length Test/"; |
16 | 16 | ||
17 | /**************************************************************************** | 17 | /**************************************************************************** |
18 | * H E L P E R C L A S S E S | 18 | * H E L P E R C L A S S E S |
test/helperclasses/PublisherClass.h
0 → 100644
1 | +/**************************************************************************** | ||
2 | + * Copyright (c)2024 Open Systems Development B.V. | ||
3 | + ****************************************************************************/ | ||
4 | + | ||
5 | +#include "mqttclient.h" | ||
6 | + | ||
7 | +#include <string> | ||
8 | + | ||
9 | +using namespace osdev::components::mqtt; | ||
10 | +using namespace osdev::components::log; | ||
11 | + | ||
12 | +class Publisher | ||
13 | +{ | ||
14 | +public: | ||
15 | + Publisher(const std::string &unique_id) : m_mqtt_client("SledgeHammerTest" + unique_id){} | ||
16 | + virtual ~Publisher() {} | ||
17 | + | ||
18 | + void connect( const std::string &hostname, | ||
19 | + int portnumber = 1883, | ||
20 | + const std::string &username = std::string(), | ||
21 | + const std::string &password = std::string(), | ||
22 | + const std::string &lwt_topic = std::string(), | ||
23 | + const std::string &lwt_message = std::string() | ||
24 | + ) | ||
25 | + { | ||
26 | + m_mqtt_client.connect(hostname, portnumber, | ||
27 | + Credentials(username, password), | ||
28 | + mqtt_LWT(lwt_topic, lwt_message), | ||
29 | + true, | ||
30 | + LogSettings | ||
31 | + { | ||
32 | + LogLevel::Debug, | ||
33 | + LogMask::None | ||
34 | + }); | ||
35 | + } | ||
36 | + | ||
37 | + void disconnect() | ||
38 | + { | ||
39 | + m_mqtt_client.disconnect(); | ||
40 | + } | ||
41 | + | ||
42 | + void publish(const std::string &message_topic, const std::string &message_payload) | ||
43 | + { | ||
44 | + MqttMessage message(message_topic, true, false, message_payload); | ||
45 | + Token t_result = m_mqtt_client.publish(message, 0); | ||
46 | + } | ||
47 | + | ||
48 | +private: | ||
49 | + osdev::components::mqtt::MqttClient m_mqtt_client; | ||
50 | +}; | ||
0 | \ No newline at end of file | 51 | \ No newline at end of file |