Skip to content

Commit 67c6fd9

Browse files
committed
Update executor usage and throttled callback
Signed-off-by: methylDragon <[email protected]>
1 parent de22e91 commit 67c6fd9

15 files changed

+61
-107
lines changed

fuse_core/CMakeLists.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
cmake_minimum_required(VERSION 3.5)
22
project(fuse_core)
33

4-
set(CMAKE_BUILD_TYPE RelWithDebInfo)
5-
64
# Default to C++17
75
if(NOT CMAKE_CXX_STANDARD)
86
set(CMAKE_CXX_STANDARD 17)

fuse_core/include/fuse_core/async_motion_model.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242

4343
#include <rclcpp/rclcpp.hpp>
4444

45-
#include <atomic>
4645
#include <string>
4746

4847

@@ -177,7 +176,6 @@ class AsyncMotionModel : public MotionModel
177176
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_; //!< A single/multi-threaded spinner assigned to the local callback queue
178177
size_t executor_thread_count_;
179178
std::thread spinner_; //!< Internal thread for spinning the executor
180-
std::atomic<bool> spinning_; //!< Flag for spinning the spin thread
181179

182180
/**
183181
* @brief Constructor

fuse_core/include/fuse_core/async_sensor_model.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ class AsyncSensorModel : public SensorModel
179179
TransactionCallback transaction_callback_; //!< The function to be executed every time a Transaction is "published"
180180
size_t executor_thread_count_;
181181
std::thread spinner_; //!< Internal thread for spinning the executor
182-
std::atomic<bool> spinning_; //!< Flag for spinning the spin thread
183182

184183
/**
185184
* @brief Constructor

fuse_core/include/fuse_core/throttled_callback.h

Lines changed: 13 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
#include <functional>
3838
#include <utility>
3939

40-
#include <fuse_core/node_interfaces/node_interfaces.hpp>
4140
#include <rclcpp/clock.hpp>
4241
#include <rclcpp/duration.hpp>
4342
#include <rclcpp/time.hpp>
@@ -62,27 +61,17 @@ class ThrottledCallback
6261
* @param[in] keep_callback The callback to call when kept, i.e. not dropped. Defaults to nullptr
6362
* @param[in] drop_callback The callback to call when dropped because of the throttling. Defaults to nullptr
6463
* @param[in] throttle_period The throttling period duration in seconds. Defaults to 0.0, i.e. no throttling
65-
* @param[in] use_wall_time Whether to use wall time or not. Defaults to false
66-
* @throws if the use_wall_time is false and no NodeInterfaces is passed in
64+
* @param[in] clock The clock to throttle against. Defaults to using RCL_SYSTEM_TIME
6765
*/
6866
ThrottledCallback(Callback&& keep_callback = nullptr, // NOLINT(whitespace/operators)
6967
Callback&& drop_callback = nullptr, // NOLINT(whitespace/operators)
7068
const rclcpp::Duration& throttle_period = rclcpp::Duration(0,0),
71-
const bool use_wall_time = false,
72-
node_interfaces::NodeInterfaces<
73-
node_interfaces::Clock
74-
>::SharedPtr interfaces = nullptr)
69+
rclcpp::Clock::SharedPtr clock = std::make_shared<rclcpp::Clock>())
7570
: keep_callback_(keep_callback)
7671
, drop_callback_(drop_callback)
7772
, throttle_period_(throttle_period)
78-
, use_wall_time_(use_wall_time)
79-
, interfaces_(interfaces)
80-
{
81-
if (!use_wall_time && !interfaces) {
82-
throw std::runtime_error(
83-
"A NodeInterfaces object with a bound Clock interface must be passed in to use ROS time!");
84-
}
85-
}
73+
, clock_(clock)
74+
{}
8675

8776
/**
8877
* @brief Throttle period getter
@@ -95,13 +84,16 @@ class ThrottledCallback
9584
}
9685

9786
/**
98-
* @brief Use wall time flag getter
87+
* @brief Set the clock to throttle against
9988
*
100-
* @return True if using wall time, false otherwise
89+
* @param[in] clock The clock to set
10190
*/
102-
bool getUseWallTime() const
91+
void setClock(rclcpp::Clock::SharedPtr clock)
10392
{
104-
return use_wall_time_;
93+
clock_ = clock;
94+
95+
// Reset since we changed the clock
96+
last_called_time_ = rclcpp::Time();
10597
}
10698

10799
/**
@@ -114,17 +106,6 @@ class ThrottledCallback
114106
throttle_period_ = throttle_period;
115107
}
116108

117-
// TODO(CH3): REFACTOR THIS OUT!!!! Take a clock type param instead
118-
/**
119-
* @brief Use wall time flag setter
120-
*
121-
* @param[in] use_wall_time Whether to use rclcpp::WallTime or not
122-
*/
123-
void setUseWallTime(const bool use_wall_time)
124-
{
125-
use_wall_time_ = use_wall_time;
126-
}
127-
128109
/**
129110
* @brief Keep callback setter
130111
*
@@ -169,12 +150,7 @@ class ThrottledCallback
169150
// (a) This is the first call, i.e. the last called time is still invalid because it has not been set yet
170151
// (b) The throttle period is zero, so we should always keep the callbacks
171152
// (c) The elpased time between now and the last called time is greater than the throttle period
172-
rclcpp::Time now;
173-
if (use_wall_time_) {
174-
now = rclcpp::Clock(RCL_SYSTEM_TIME).now();
175-
} else {
176-
now = interfaces_.get_node_clock_interface()->get_clock()->now();
177-
}
153+
rclcpp::Time now = clock_->now();
178154

179155
if ((last_called_time_.nanoseconds() == 0)
180156
|| (throttle_period_.nanoseconds() == 0)
@@ -215,9 +191,8 @@ class ThrottledCallback
215191
Callback keep_callback_; //!< The callback to call when kept, i.e. not dropped
216192
Callback drop_callback_; //!< The callback to call when dropped because of throttling
217193
rclcpp::Duration throttle_period_; //!< The throttling period duration in seconds
218-
bool use_wall_time_; //<! The flag to indicate whether to use wall time or not
194+
rclcpp::Clock::SharedPtr clock_; //!< The clock to throttle against
219195
rclcpp::Time last_called_time_; //!< The last time the keep callback was called
220-
node_interfaces::NodeInterfaces<node_interfaces::Clock> interfaces_; //!< Node interfaces to use
221196
};
222197

223198
/**

fuse_core/src/async_motion_model.cpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ AsyncMotionModel::AsyncMotionModel(size_t thread_count) :
5656

5757
AsyncMotionModel::~AsyncMotionModel()
5858
{
59-
spinning_ = false;
59+
executor_->cancel();
6060
if (spinner_.joinable()) {
6161
spinner_.join();
6262
}
@@ -107,12 +107,8 @@ void AsyncMotionModel::initialize(const std::string& name)
107107
executor_->add_node(node_);
108108

109109
// TODO(CH3): Remove this if the internal node is removed
110-
spinning_ = true;
111110
spinner_ = std::thread([&](){
112-
auto context = node_->get_node_base_interface()->get_context();
113-
while(context->is_valid() && spinning_) {
114-
executor_->spin_some();
115-
}
111+
executor_->spin();
116112
});
117113
}
118114

@@ -153,7 +149,7 @@ void AsyncMotionModel::stop()
153149
executor_->remove_node(node_);
154150

155151
// TODO(CH3): Remove this if the internal node is removed
156-
spinning_ = false;
152+
executor_->cancel();
157153
if (spinner_.joinable()) {
158154
spinner_.join();
159155
}

fuse_core/src/async_publisher.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ AsyncPublisher::AsyncPublisher(size_t thread_count) :
4747

4848
AsyncPublisher::~AsyncPublisher()
4949
{
50-
spinning_ = false;
50+
executor_->cancel();
5151
if (spinner_.joinable()) {
5252
spinner_.join();
5353
}
@@ -83,10 +83,7 @@ void AsyncPublisher::initialize(const std::string& name)
8383
// TODO(CH3): Remove this if the internal node is removed
8484
spinning_ = true;
8585
spinner_ = std::thread([&](){
86-
auto context = node_->get_node_base_interface()->get_context();
87-
while(context->is_valid() && spinning_) {
88-
executor_->spin_some();
89-
}
86+
executor_->spin();
9087
});
9188
}
9289

@@ -122,7 +119,6 @@ void AsyncPublisher::stop()
122119
executor_->remove_node(node_);
123120

124121
// TODO(CH3): Remove this if the internal node is removed
125-
spinning_ = false;
126122
if (spinner_.joinable()) {
127123
spinner_.join();
128124
}

fuse_core/src/async_sensor_model.cpp

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ AsyncSensorModel::AsyncSensorModel(size_t thread_count) :
5454

5555
AsyncSensorModel::~AsyncSensorModel()
5656
{
57-
spinning_ = false;
57+
executor_->cancel();
5858
if (spinner_.joinable()) {
5959
spinner_.join();
6060
}
@@ -93,12 +93,8 @@ void AsyncSensorModel::initialize(
9393
executor_->add_node(node_);
9494

9595
// TODO(CH3): Remove this if the internal node is removed
96-
spinning_ = true;
9796
spinner_ = std::thread([&](){
98-
auto context = node_->get_node_base_interface()->get_context();
99-
while(context->is_valid() && spinning_) {
100-
executor_->spin_some();
101-
}
97+
executor_->spin();
10298
});
10399
}
104100

@@ -142,7 +138,6 @@ void AsyncSensorModel::stop()
142138
executor_->remove_node(node_);
143139

144140
// TODO(CH3): Remove this if the internal node is removed
145-
spinning_ = false;
146141
if (spinner_.joinable()) {
147142
spinner_.join();
148143
}

fuse_core/test/launch_tests/test_parameters.cpp

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,30 +44,24 @@ class TestParameters : public ::testing::Test
4444
public:
4545
void SetUp() override
4646
{
47-
exec_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
48-
49-
spinning_ = true;
47+
executor_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
5048
spinner_ = std::thread([&](){
51-
auto context = rclcpp::contexts::get_global_default_context();
52-
while(context->is_valid() && spinning_) {
53-
exec_->spin_some();
54-
}
49+
executor_->spin();
5550
});
5651
}
5752

5853
void TearDown() override
5954
{
60-
spinning_ = false;
55+
executor_->cancel();
6156
if (spinner_.joinable()) {
6257
spinner_.join();
6358
}
64-
65-
exec_.reset();
59+
executor_.reset();
6660
}
6761

6862
std::thread spinner_; //!< Internal thread for spinning the executor
6963
std::atomic<bool> spinning_; //!< Flag for spinning the spin thread
70-
rclcpp::executors::SingleThreadedExecutor::SharedPtr exec_;
64+
rclcpp::executors::SingleThreadedExecutor::SharedPtr executor_;
7165
};
7266

7367
TEST_F(TestParameters, getPositiveParam)

fuse_core/test/test_throttled_callback.cpp

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@
3838

3939
#include <gtest/gtest.h>
4040

41-
#include <atomic>
42-
4341
/**
4442
* @brief A helper class to publish a given number geometry_msgs::msg::Point messages at a given frequency.
4543
*
@@ -93,9 +91,7 @@ class PointPublisher : public rclcpp::Node
9391
{
9492
geometry_msgs::msg::Point point_message;
9593
point_message.x = i;
96-
9794
publisher_->publish(point_message);
98-
9995
rate.sleep();
10096
}
10197
}
@@ -125,11 +121,7 @@ class PointSensorModel : public rclcpp::Node
125121
, throttled_callback_(
126122
std::bind(&PointSensorModel::keepCallback, this, std::placeholders::_1),
127123
std::bind(&PointSensorModel::dropCallback, this, std::placeholders::_1),
128-
throttle_period,
129-
false,
130-
std::make_shared<
131-
fuse_core::node_interfaces::NodeInterfaces<fuse_core::node_interfaces::Clock>
132-
>(*this)
124+
throttle_period
133125
)
134126
{
135127
subscription_ = this->create_subscription<geometry_msgs::msg::Point>(
@@ -219,31 +211,24 @@ class TestThrottledCallback : public ::testing::Test
219211
void SetUp() override
220212
{
221213
rclcpp::init(0, nullptr);
222-
exec_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
223-
224-
spinning_ = true;
214+
executor_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
225215
spinner_ = std::thread([&](){
226-
auto context = rclcpp::contexts::get_global_default_context();
227-
while(context->is_valid() && spinning_) {
228-
exec_->spin_some();
229-
}
216+
executor_->spin();
230217
});
231218
}
232219

233220
void TearDown() override
234221
{
235-
spinning_ = false;
222+
executor_->cancel();
236223
rclcpp::shutdown();
237224
if (spinner_.joinable()) {
238225
spinner_.join();
239226
}
240-
241-
exec_.reset();
227+
executor_.reset();
242228
}
243229

244230
std::thread spinner_; //!< Internal thread for spinning the executor
245-
std::atomic<bool> spinning_; //!< Flag for spinning the spin thread
246-
rclcpp::executors::SingleThreadedExecutor::SharedPtr exec_;
231+
rclcpp::executors::SingleThreadedExecutor::SharedPtr executor_;
247232
};
248233

249234
TEST_F(TestThrottledCallback, NoDroppedMessagesIfThrottlePeriodIsZero)
@@ -253,7 +238,7 @@ TEST_F(TestThrottledCallback, NoDroppedMessagesIfThrottlePeriodIsZero)
253238
// Start sensor model to listen to messages:
254239
const rclcpp::Duration throttled_period(0, 0);
255240
auto sensor_model = std::make_shared<PointSensorModel>(throttled_period);
256-
exec_->add_node(sensor_model);
241+
executor_->add_node(sensor_model);
257242

258243
// Time should be valid after the context is initialized. But it doesn't hurt to verify.
259244
ASSERT_TRUE(fuse_core::wait_for_valid(sensor_model->getNode()->get_clock(),
@@ -264,7 +249,7 @@ TEST_F(TestThrottledCallback, NoDroppedMessagesIfThrottlePeriodIsZero)
264249
const double frequency = 10.0;
265250

266251
auto publisher = std::make_shared<PointPublisher>(frequency);
267-
exec_->add_node(publisher);
252+
executor_->add_node(publisher);
268253
publisher->publish(num_messages);
269254

270255
// Check all messages are kept and none are dropped, because when the throttle period is zero, throttling is disabled:
@@ -277,7 +262,7 @@ TEST_F(TestThrottledCallback, DropMessagesIfThrottlePeriodIsGreaterThanPublishPe
277262
// Start sensor model to listen to messages:
278263
const rclcpp::Duration throttled_period(0, RCUTILS_S_TO_NS(0.2));
279264
auto sensor_model = std::make_shared<PointSensorModel>(throttled_period);
280-
exec_->add_node(sensor_model);
265+
executor_->add_node(sensor_model);
281266

282267
// Time should be valid after the context is initialized. But it doesn't hurt to verify.
283268
ASSERT_TRUE(fuse_core::wait_for_valid(sensor_model->getNode()->get_clock(),
@@ -290,7 +275,7 @@ TEST_F(TestThrottledCallback, DropMessagesIfThrottlePeriodIsGreaterThanPublishPe
290275
const double frequency = 1.0 / period;
291276

292277
auto publisher = std::make_shared<PointPublisher>(frequency);
293-
exec_->add_node(publisher);
278+
executor_->add_node(publisher);
294279
publisher->publish(num_messages);
295280

296281
// Check the number of kept and dropped callbacks:
@@ -306,7 +291,7 @@ TEST_F(TestThrottledCallback, AlwaysKeepFirstMessageEvenIfThrottlePeriodIsTooLar
306291
// Start sensor model to listen to messages:
307292
const rclcpp::Duration throttled_period(10, 0);
308293
auto sensor_model = std::make_shared<PointSensorModel>(throttled_period);
309-
exec_->add_node(sensor_model);
294+
executor_->add_node(sensor_model);
310295

311296
// Time should be valid after the context is initialized. But it doesn't hurt to verify.
312297
ASSERT_TRUE(fuse_core::wait_for_valid(sensor_model->getNode()->get_clock(),
@@ -321,7 +306,7 @@ TEST_F(TestThrottledCallback, AlwaysKeepFirstMessageEvenIfThrottlePeriodIsTooLar
321306

322307
auto publisher = std::make_shared<PointPublisher>(frequency);
323308
publisher->publish(num_messages);
324-
exec_->add_node(publisher);
309+
executor_->add_node(publisher);
325310

326311
// Check that regardless of the large throttled period, at least one message is ketpt:
327312
EXPECT_EQ(1u, sensor_model->getKeptMessages());

fuse_models/src/acceleration_2d.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ void Acceleration2D::onInit()
6464
params_.loadFromROS(private_node_handle_);
6565

6666
throttled_callback_.setThrottlePeriod(params_.throttle_period);
67-
throttled_callback_.setUseWallTime(params_.throttle_use_wall_time);
67+
68+
if (!params_.throttle_use_wall_time) {
69+
throttled_callback_.setClock(node_->get_clock());
70+
}
6871

6972
if (params_.indices.empty())
7073
{

0 commit comments

Comments
 (0)