Skip to content

Commit 7d0871f

Browse files
committed
Adds a sans-io fsm for read operation.
1 parent 89a42db commit 7d0871f

File tree

13 files changed

+470
-107
lines changed

13 files changed

+470
-107
lines changed

include/boost/redis/connection.hpp

Lines changed: 52 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <boost/redis/detail/health_checker.hpp>
1515
#include <boost/redis/detail/helper.hpp>
1616
#include <boost/redis/detail/multiplexer.hpp>
17+
#include <boost/redis/detail/reader_fsm.hpp>
1718
#include <boost/redis/detail/redis_stream.hpp>
1819
#include <boost/redis/detail/resp3_handshaker.hpp>
1920
#include <boost/redis/error.hpp>
@@ -206,85 +207,57 @@ struct writer_op {
206207
};
207208

208209
template <class Conn, class Logger>
209-
struct reader_op {
210-
using dyn_buffer_type = asio::dynamic_string_buffer<
211-
char,
212-
std::char_traits<char>,
213-
std::allocator<char>>;
214-
210+
class reader_op {
211+
private:
215212
// TODO: Move this to config so the user can fine tune?
216213
static constexpr std::size_t buffer_growth_hint = 4096;
217214

218215
Conn* conn_;
219216
Logger logger_;
220-
std::pair<tribool, std::size_t> res_{std::make_pair(std::nullopt, 0)};
221-
asio::coroutine coro{};
217+
detail::reader_fsm fsm_;
218+
219+
public:
220+
reader_op(Conn& conn, Logger logger) noexcept
221+
: conn_{&conn}
222+
, logger_{logger}
223+
, fsm_{conn.mpx_}
224+
{ }
222225

223226
template <class Self>
224227
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
225228
{
226-
BOOST_ASIO_CORO_REENTER(coro) for (;;)
227-
{
228-
// Appends some data to the buffer if necessary.
229-
BOOST_ASIO_CORO_YIELD
230-
async_append_some(
231-
conn_->stream_,
232-
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
233-
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
234-
std::move(self));
235-
236-
logger_.on_read(ec, n);
237-
238-
// The connection is not viable after an error.
239-
if (ec) {
240-
logger_.trace("reader_op (1)", ec);
241-
conn_->cancel(operation::run);
242-
self.complete(ec);
243-
return;
244-
}
245-
246-
// The connection might have been canceled while this op was
247-
// suspended or after queueing so we have to check.
248-
if (!conn_->is_open()) {
249-
logger_.trace("reader_op (2): connection is closed.");
250-
self.complete(ec);
251-
return;
252-
}
253-
254-
while (!conn_->mpx_.get_read_buffer().empty()) {
255-
res_ = conn_->mpx_.consume_next(ec);
256-
257-
if (ec) {
258-
logger_.trace("reader_op (3)", ec);
259-
conn_->cancel(operation::run);
260-
self.complete(ec);
229+
using dyn_buffer_type = asio::dynamic_string_buffer<
230+
char,
231+
std::char_traits<char>,
232+
std::allocator<char>>;
233+
234+
for (;;) {
235+
auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled());
236+
237+
logger_.on_fsm_resume(act);
238+
239+
switch (act.type_) {
240+
case reader_fsm::action::type::setup_cancellation:
241+
self.reset_cancellation_state(asio::enable_terminal_cancellation());
242+
continue;
243+
case reader_fsm::action::type::needs_more:
244+
case reader_fsm::action::type::append_some:
245+
async_append_some(
246+
conn_->stream_,
247+
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
248+
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
249+
std::move(self));
261250
return;
262-
}
263-
264-
if (!res_.first.has_value()) {
265-
// More data is needed.
266-
break;
267-
}
268-
269-
if (res_.first.value()) {
270-
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
271-
BOOST_ASIO_CORO_YIELD
272-
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
273-
}
274-
275-
if (ec) {
276-
logger_.trace("reader_op (4)", ec);
277-
conn_->cancel(operation::run);
278-
self.complete(ec);
279-
return;
280-
}
281-
282-
if (!conn_->is_open()) {
283-
logger_.trace("reader_op (5): connection is closed.");
284-
self.complete(asio::error::operation_aborted);
251+
case reader_fsm::action::type::notify_push_receiver:
252+
if (conn_->receive_channel_.try_send(ec, act.push_size_)) {
253+
continue;
254+
} else {
255+
conn_->receive_channel_.async_send(ec, act.push_size_, std::move(self));
285256
return;
286257
}
287-
}
258+
return;
259+
case reader_fsm::action::type::cancel_run: conn_->cancel(operation::run); continue;
260+
case reader_fsm::action::type::done: self.complete(act.ec_); return;
288261
}
289262
}
290263
}
@@ -340,20 +313,20 @@ class run_op {
340313
// causing an authentication problem.
341314
BOOST_ASIO_CORO_YIELD
342315
asio::experimental::make_parallel_group(
343-
[this](auto token) {
344-
return conn_->handshaker_.async_hello(*conn_, logger_, token);
316+
[this, logger = logger_](auto token) {
317+
return conn_->handshaker_.async_hello(*conn_, logger, token);
345318
},
346-
[this](auto token) {
347-
return conn_->health_checker_.async_ping(*conn_, logger_, token);
319+
[this, logger = logger_](auto token) {
320+
return conn_->health_checker_.async_ping(*conn_, logger, token);
348321
},
349-
[this](auto token) {
350-
return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);
322+
[this, logger = logger_](auto token) {
323+
return conn_->health_checker_.async_check_timeout(*conn_, logger, token);
351324
},
352-
[this](auto token) {
353-
return conn_->reader(logger_, token);
325+
[this, logger = logger_](auto token) {
326+
return conn_->reader(logger, token);
354327
},
355-
[this](auto token) {
356-
return conn_->writer(logger_, token);
328+
[this, logger = logger_](auto token) {
329+
return conn_->writer(logger, token);
357330
})
358331
.async_wait(asio::experimental::wait_for_one_error(), std::move(self));
359332

@@ -730,7 +703,7 @@ class basic_connection {
730703
mpx_.cancel_on_conn_lost();
731704
}
732705

733-
template <class, class> friend struct detail::reader_op;
706+
template <class, class> friend class detail::reader_op;
734707
template <class, class> friend struct detail::writer_op;
735708
template <class> friend struct detail::exec_op;
736709
template <class, class> friend class detail::run_op;
@@ -739,7 +712,7 @@ class basic_connection {
739712
auto reader(Logger l, CompletionToken&& token)
740713
{
741714
return asio::async_compose<CompletionToken, void(system::error_code)>(
742-
detail::reader_op<this_type, Logger>{this, l},
715+
detail::reader_op<this_type, Logger>{*this, l},
743716
std::forward<CompletionToken>(token),
744717
writer_timer_);
745718
}

include/boost/redis/detail/coroutine.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
#define BOOST_REDIS_YIELD(resume_point_var, resume_point_id, ...) \
2626
{ \
2727
resume_point_var = resume_point_id; \
28-
return __VA_ARGS__; \
28+
return {__VA_ARGS__}; \
2929
case resume_point_id: \
3030
{ \
3131
} \
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva ([email protected])
2+
*
3+
* Distributed under the Boost Software License, Version 1.0. (See
4+
* accompanying file LICENSE.txt)
5+
*/
6+
7+
#ifndef BOOST_REDIS_READER_FSM_HPP
8+
#define BOOST_REDIS_READER_FSM_HPP
9+
10+
#include <boost/redis/detail/multiplexer.hpp>
11+
12+
#include <boost/asio/cancellation_type.hpp>
13+
#include <boost/system/error_code.hpp>
14+
15+
#include <cstddef>
16+
17+
namespace boost::redis::detail {
18+
19+
class reader_fsm {
20+
public:
21+
struct action {
22+
enum class type
23+
{
24+
setup_cancellation,
25+
append_some,
26+
needs_more,
27+
notify_push_receiver,
28+
cancel_run,
29+
done,
30+
};
31+
32+
type type_ = type::setup_cancellation;
33+
std::size_t push_size_ = 0;
34+
system::error_code ec_ = {};
35+
};
36+
37+
explicit reader_fsm(multiplexer& mpx) noexcept;
38+
39+
action resume(
40+
std::size_t bytes_read,
41+
system::error_code ec,
42+
asio::cancellation_type_t /*cancel_state*/);
43+
44+
private:
45+
int resume_point_{0};
46+
action action_after_resume_;
47+
action::type next_read_type_ = action::type::append_some;
48+
multiplexer* mpx_ = nullptr;
49+
std::pair<tribool, std::size_t> res_{std::make_pair(std::nullopt, 0)};
50+
};
51+
52+
} // namespace boost::redis::detail
53+
54+
#endif // BOOST_REDIS_READER_FSM_HPP

include/boost/redis/impl/exec_fsm.ipp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,7 @@ inline bool is_cancellation(asio::cancellation_type_t type)
2525
asio::cancellation_type_t::terminal));
2626
}
2727

28-
} // namespace boost::redis::detail
29-
30-
boost::redis::detail::exec_action boost::redis::detail::exec_fsm::resume(
31-
bool connection_is_open,
32-
asio::cancellation_type_t cancel_state)
28+
exec_action exec_fsm::resume(bool connection_is_open, asio::cancellation_type_t cancel_state)
3329
{
3430
switch (resume_point_) {
3531
BOOST_REDIS_CORO_INITIAL
@@ -91,4 +87,6 @@ boost::redis::detail::exec_action boost::redis::detail::exec_fsm::resume(
9187
return exec_action{system::error_code()};
9288
}
9389

94-
#endif
90+
} // namespace boost::redis::detail
91+
92+
#endif

include/boost/redis/impl/logger.ipp

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,42 @@
1313

1414
namespace boost::redis {
1515

16+
namespace detail {
17+
18+
#define BOOST_REDIS_READER_SWITCH_CASE(elem) \
19+
case reader_fsm::action::type::elem: return "reader_fsm::action::type::" #elem
20+
21+
#define BOOST_REDIS_EXEC_SWITCH_CASE(elem) \
22+
case exec_action_type::elem: return "exec_action_type::" #elem
23+
24+
auto to_string(reader_fsm::action::type t) noexcept -> char const*
25+
{
26+
switch (t) {
27+
BOOST_REDIS_READER_SWITCH_CASE(setup_cancellation);
28+
BOOST_REDIS_READER_SWITCH_CASE(append_some);
29+
BOOST_REDIS_READER_SWITCH_CASE(needs_more);
30+
BOOST_REDIS_READER_SWITCH_CASE(notify_push_receiver);
31+
BOOST_REDIS_READER_SWITCH_CASE(cancel_run);
32+
BOOST_REDIS_READER_SWITCH_CASE(done);
33+
default: return "action::type::<invalid type>";
34+
}
35+
}
36+
37+
auto to_string(exec_action_type t) noexcept -> char const*
38+
{
39+
switch (t) {
40+
BOOST_REDIS_EXEC_SWITCH_CASE(setup_cancellation);
41+
BOOST_REDIS_EXEC_SWITCH_CASE(immediate);
42+
BOOST_REDIS_EXEC_SWITCH_CASE(done);
43+
BOOST_REDIS_EXEC_SWITCH_CASE(notify_writer);
44+
BOOST_REDIS_EXEC_SWITCH_CASE(wait_for_response);
45+
BOOST_REDIS_EXEC_SWITCH_CASE(cancel_run);
46+
default: return "exec_action_type::<invalid type>";
47+
}
48+
}
49+
50+
} // namespace detail
51+
1652
void logger::write_prefix()
1753
{
1854
if (!std::empty(prefix_))
@@ -89,19 +125,15 @@ void logger::on_write(system::error_code const& ec, std::string_view payload)
89125
std::clog << std::endl;
90126
}
91127

92-
void logger::on_read(system::error_code const& ec, std::size_t n)
128+
void logger::on_fsm_resume(detail::reader_fsm::action const& action)
93129
{
94-
if (level_ < level::info)
130+
if (level_ < level::debug)
95131
return;
96132

97133
write_prefix();
98134

99-
if (ec)
100-
std::clog << "reader_op: " << ec.message();
101-
else
102-
std::clog << "reader_op: " << n << " bytes read.";
103-
104-
std::clog << std::endl;
135+
std::clog << "fsm action: (" << to_string(action.type_) << ", " << action.push_size_ << ", "
136+
<< action.ec_.message() << ")" << std::endl;
105137
}
106138

107139
void logger::on_hello(system::error_code const& ec, generic_response const& resp)

0 commit comments

Comments
 (0)