Skip to content

Commit 2580b28

Browse files
committed
Avoids suspending the fsm when a sync push notification is possible.
1 parent 9de5025 commit 2580b28

File tree

4 files changed

+63
-22
lines changed

4 files changed

+63
-22
lines changed

include/boost/redis/connection.hpp

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ struct reader_op {
223223
public:
224224
reader_op(Conn& conn) noexcept
225225
: conn_{&conn}
226-
, fsm_{conn.mpx_}
226+
, fsm_{conn.mpx_, [&conn](std::size_t size){return conn.receive_channel_.try_send(system::error_code(), size);}}
227227
{ }
228228

229229
template <class Self>
@@ -252,12 +252,7 @@ struct reader_op {
252252
std::move(self));
253253
return;
254254
case reader_fsm::action::type::notify_push_receiver:
255-
if (conn_->receive_channel_.try_send(ec, act.push_size_)) {
256-
continue;
257-
} else {
258-
conn_->receive_channel_.async_send(ec, act.push_size_, std::move(self));
259-
return;
260-
}
255+
conn_->receive_channel_.async_send(ec, act.push_size_, std::move(self));
261256
return;
262257
case reader_fsm::action::type::cancel_run: conn_->cancel(operation::run); continue;
263258
case reader_fsm::action::type::done: self.complete(act.ec_); return;

include/boost/redis/detail/reader_fsm.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313
#include <boost/system/error_code.hpp>
1414

1515
#include <cstddef>
16+
#include <functional>
1617

1718
namespace boost::redis::detail {
1819

1920
class reader_fsm {
2021
public:
22+
using push_notifier_type = std::function<bool(std::size_t)>;
23+
2124
struct action {
2225
enum class type
2326
{
@@ -34,7 +37,8 @@ class reader_fsm {
3437
system::error_code ec_ = {};
3538
};
3639

37-
explicit reader_fsm(multiplexer& mpx) noexcept;
40+
explicit
41+
reader_fsm(multiplexer& mpx, push_notifier_type push_notifier) noexcept;
3842

3943
action resume(
4044
std::size_t bytes_read,
@@ -46,6 +50,7 @@ class reader_fsm {
4650
action action_after_resume_;
4751
action::type next_read_type_ = action::type::append_some;
4852
multiplexer* mpx_ = nullptr;
53+
push_notifier_type push_notifier_;
4954
std::pair<tribool, std::size_t> res_{std::make_pair(std::nullopt, 0)};
5055
};
5156

include/boost/redis/impl/reader_fsm.ipp

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010

1111
namespace boost::redis::detail {
1212

13-
reader_fsm::reader_fsm(multiplexer& mpx) noexcept
13+
reader_fsm::reader_fsm(multiplexer& mpx, push_notifier_type push_notifier) noexcept
1414
: mpx_{&mpx}
15+
, push_notifier_{std::move(push_notifier)}
1516
{ }
1617

1718
reader_fsm::action reader_fsm::resume(
@@ -48,20 +49,13 @@ reader_fsm::action reader_fsm::resume(
4849
break;
4950
}
5051

51-
if (!res_.first.value()) {
52+
if (!res_.first.value() && !push_notifier_(res_.second)) {
5253
BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second)
5354
if (ec) {
5455
action_after_resume_ = {action::type::done, 0u, ec};
5556
BOOST_REDIS_YIELD(resume_point_, 7, action::type::cancel_run)
5657
return action_after_resume_;
5758
}
58-
} else {
59-
// TODO: Here we should notify the exec operation that
60-
// it can be completed. This will improve log clarity
61-
// and will make this code symmetrical in how it
62-
// handles pushes and other messages. The new action
63-
// type can be named notify_exec. To do that we need to
64-
// refactor the multiplexer.
6559
}
6660
}
6761
}

test/test_reader_fsm.cpp

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,16 @@ std::ostream& operator<<(std::ostream& os, reader_fsm::action::type t)
3535
// Operators
3636
namespace {
3737

38+
// Sync push notifier that always fails so that the fsm has to suspend
39+
// so that the caller can notify it asynchronously.
40+
auto sync_push_notifier = [](std::size_t){return false;};
41+
3842
void test_push()
3943
{
4044
multiplexer mpx;
4145
generic_response resp;
4246
mpx.set_receive_response(resp);
43-
reader_fsm fsm{mpx};
47+
reader_fsm fsm{mpx, sync_push_notifier};
4448
error_code ec;
4549
action act;
4650

@@ -85,7 +89,7 @@ void test_read_needs_more()
8589
multiplexer mpx;
8690
generic_response resp;
8791
mpx.set_receive_response(resp);
88-
reader_fsm fsm{mpx};
92+
reader_fsm fsm{mpx, sync_push_notifier};
8993
error_code ec;
9094
action act;
9195

@@ -130,7 +134,7 @@ void test_read_error()
130134
multiplexer mpx;
131135
generic_response resp;
132136
mpx.set_receive_response(resp);
133-
reader_fsm fsm{mpx};
137+
reader_fsm fsm{mpx, sync_push_notifier};
134138
error_code ec;
135139
action act;
136140

@@ -160,7 +164,7 @@ void test_parse_error()
160164
multiplexer mpx;
161165
generic_response resp;
162166
mpx.set_receive_response(resp);
163-
reader_fsm fsm{mpx};
167+
reader_fsm fsm{mpx, sync_push_notifier};
164168
error_code ec;
165169
action act;
166170

@@ -190,7 +194,7 @@ void test_push_deliver_error()
190194
multiplexer mpx;
191195
generic_response resp;
192196
mpx.set_receive_response(resp);
193-
reader_fsm fsm{mpx};
197+
reader_fsm fsm{mpx, sync_push_notifier};
194198
error_code ec;
195199
action act;
196200

@@ -219,10 +223,53 @@ void test_push_deliver_error()
219223
BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted});
220224
}
221225

226+
void test_sync_send_push()
227+
{
228+
auto push_notifier = [i = 0](std::size_t) mutable
229+
{
230+
// When i == 3 we are simulating that the push channel became
231+
// full and an async notification is necessary.
232+
return ++i != 3;
233+
};
234+
235+
multiplexer mpx;
236+
generic_response resp;
237+
mpx.set_receive_response(resp);
238+
reader_fsm fsm{mpx, push_notifier};
239+
error_code ec;
240+
action act;
241+
242+
// Initiate
243+
act = fsm.resume(0, ec, cancellation_type_t::none);
244+
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
245+
act = fsm.resume(0, ec, cancellation_type_t::none);
246+
BOOST_TEST_EQ(act.type_, action::type::append_some);
247+
248+
// The fsm is asking for data.
249+
mpx.get_read_buffer().append(">1\r\n+msg1\r\n");
250+
mpx.get_read_buffer().append(">1\r\n+msg2 \r\n");
251+
mpx.get_read_buffer().append(">1\r\n+msg3 \r\n");
252+
auto const bytes_read = mpx.get_read_buffer().size();
253+
254+
// Only the last push has to be notified asynchronously. The other
255+
// ones have been notified synchronously by the callback passed in
256+
// the constructor.
257+
act = fsm.resume(bytes_read, ec, cancellation_type_t::none);
258+
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
259+
BOOST_TEST_EQ(act.push_size_, 13u);
260+
BOOST_TEST_EQ(act.ec_, error_code());
261+
262+
// All pushes were delivered so the fsm should demand more data
263+
act = fsm.resume(0, ec, cancellation_type_t::none);
264+
BOOST_TEST_EQ(act.type_, action::type::append_some);
265+
BOOST_TEST_EQ(act.ec_, error_code());
266+
}
267+
222268
} // namespace
223269

224270
int main()
225271
{
272+
test_sync_send_push();
226273
test_push_deliver_error();
227274
test_read_needs_more();
228275
test_push();

0 commit comments

Comments
 (0)