Skip to content

Commit b822247

Browse files
authored
Merge pull request #274 from boostorg/refactoring_clean_code
Adds a sans-io fsm to the read operation.
2 parents f04d97f + 620b1e9 commit b822247

13 files changed

+459
-91
lines changed

include/boost/redis/connection.hpp

Lines changed: 38 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <boost/redis/detail/health_checker.hpp>
1616
#include <boost/redis/detail/helper.hpp>
1717
#include <boost/redis/detail/multiplexer.hpp>
18+
#include <boost/redis/detail/reader_fsm.hpp>
1819
#include <boost/redis/detail/redis_stream.hpp>
1920
#include <boost/redis/detail/resp3_handshaker.hpp>
2021
#include <boost/redis/error.hpp>
@@ -217,74 +218,49 @@ struct reader_op {
217218
static constexpr std::size_t buffer_growth_hint = 4096;
218219

219220
Conn* conn_;
220-
std::pair<tribool, std::size_t> res_{std::make_pair(std::nullopt, 0)};
221-
asio::coroutine coro{};
221+
detail::reader_fsm fsm_;
222+
223+
public:
224+
reader_op(Conn& conn) noexcept
225+
: conn_{&conn}
226+
, fsm_{conn.mpx_}
227+
{ }
222228

223229
template <class Self>
224230
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
225231
{
226-
BOOST_ASIO_CORO_REENTER(coro) for (;;)
227-
{
228-
// Appends some data to the buffer if necessary.
229-
BOOST_ASIO_CORO_YIELD
230-
async_append_some(
231-
conn_->stream_,
232-
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
233-
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
234-
std::move(self));
235-
236-
conn_->logger_.on_read(ec, n);
237-
238-
// The connection is not viable after an error.
239-
if (ec) {
240-
conn_->logger_.trace("reader_op (1)", ec);
241-
conn_->cancel(operation::run);
242-
self.complete(ec);
243-
return;
244-
}
245-
246-
// The connection might have been canceled while this op was
247-
// suspended or after queueing so we have to check.
248-
if (!conn_->is_open()) {
249-
conn_->logger_.trace("reader_op (2): connection is closed.");
250-
self.complete(ec);
251-
return;
252-
}
253-
254-
while (!conn_->mpx_.get_read_buffer().empty()) {
255-
res_ = conn_->mpx_.consume_next(ec);
256-
257-
if (ec) {
258-
conn_->logger_.trace("reader_op (3)", ec);
259-
conn_->cancel(operation::run);
260-
self.complete(ec);
232+
using dyn_buffer_type = asio::dynamic_string_buffer<
233+
char,
234+
std::char_traits<char>,
235+
std::allocator<char>>;
236+
237+
for (;;) {
238+
auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled());
239+
240+
conn_->logger_.on_fsm_resume(act);
241+
242+
switch (act.type_) {
243+
case reader_fsm::action::type::setup_cancellation:
244+
self.reset_cancellation_state(asio::enable_terminal_cancellation());
245+
continue;
246+
case reader_fsm::action::type::needs_more:
247+
case reader_fsm::action::type::append_some:
248+
async_append_some(
249+
conn_->stream_,
250+
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
251+
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
252+
std::move(self));
261253
return;
262-
}
263-
264-
if (!res_.first.has_value()) {
265-
// More data is needed.
266-
break;
267-
}
268-
269-
if (res_.first.value()) {
270-
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
271-
BOOST_ASIO_CORO_YIELD
272-
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
273-
}
274-
275-
if (ec) {
276-
conn_->logger_.trace("reader_op (4)", ec);
277-
conn_->cancel(operation::run);
278-
self.complete(ec);
279-
return;
280-
}
281-
282-
if (!conn_->is_open()) {
283-
conn_->logger_.trace("reader_op (5): connection is closed.");
284-
self.complete(asio::error::operation_aborted);
254+
case reader_fsm::action::type::notify_push_receiver:
255+
if (conn_->receive_channel_.try_send(ec, act.push_size_)) {
256+
continue;
257+
} else {
258+
conn_->receive_channel_.async_send(ec, act.push_size_, std::move(self));
285259
return;
286260
}
287-
}
261+
return;
262+
case reader_fsm::action::type::cancel_run: conn_->cancel(operation::run); continue;
263+
case reader_fsm::action::type::done: self.complete(act.ec_); return;
288264
}
289265
}
290266
}
@@ -844,7 +820,7 @@ class basic_connection {
844820
auto reader(CompletionToken&& token)
845821
{
846822
return asio::async_compose<CompletionToken, void(system::error_code)>(
847-
detail::reader_op<this_type>{this},
823+
detail::reader_op<this_type>{*this},
848824
std::forward<CompletionToken>(token),
849825
writer_timer_);
850826
}

include/boost/redis/detail/connection_logger.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#ifndef BOOST_REDIS_CONNECTION_LOGGER_HPP
88
#define BOOST_REDIS_CONNECTION_LOGGER_HPP
99

10+
#include <boost/redis/detail/reader_fsm.hpp>
1011
#include <boost/redis/logger.hpp>
1112
#include <boost/redis/response.hpp>
1213

@@ -37,7 +38,7 @@ class connection_logger {
3738
void on_connect(system::error_code const& ec, std::string_view unix_socket_ep);
3839
void on_ssl_handshake(system::error_code const& ec);
3940
void on_write(system::error_code const& ec, std::size_t n);
40-
void on_read(system::error_code const& ec, std::size_t n);
41+
void on_fsm_resume(reader_fsm::action const& action);
4142
void on_hello(system::error_code const& ec, generic_response const& resp);
4243
void log(logger::level lvl, std::string_view msg);
4344
void log(logger::level lvl, std::string_view op, system::error_code const& ec);

include/boost/redis/detail/coroutine.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
#define BOOST_REDIS_YIELD(resume_point_var, resume_point_id, ...) \
2626
{ \
2727
resume_point_var = resume_point_id; \
28-
return __VA_ARGS__; \
28+
return {__VA_ARGS__}; \
2929
case resume_point_id: \
3030
{ \
3131
} \
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva ([email protected])
2+
*
3+
* Distributed under the Boost Software License, Version 1.0. (See
4+
* accompanying file LICENSE.txt)
5+
*/
6+
7+
#ifndef BOOST_REDIS_READER_FSM_HPP
8+
#define BOOST_REDIS_READER_FSM_HPP
9+
10+
#include <boost/redis/detail/multiplexer.hpp>
11+
12+
#include <boost/asio/cancellation_type.hpp>
13+
#include <boost/system/error_code.hpp>
14+
15+
#include <cstddef>
16+
17+
namespace boost::redis::detail {
18+
19+
class reader_fsm {
20+
public:
21+
struct action {
22+
enum class type
23+
{
24+
setup_cancellation,
25+
append_some,
26+
needs_more,
27+
notify_push_receiver,
28+
cancel_run,
29+
done,
30+
};
31+
32+
type type_ = type::setup_cancellation;
33+
std::size_t push_size_ = 0;
34+
system::error_code ec_ = {};
35+
};
36+
37+
explicit reader_fsm(multiplexer& mpx) noexcept;
38+
39+
action resume(
40+
std::size_t bytes_read,
41+
system::error_code ec,
42+
asio::cancellation_type_t /*cancel_state*/);
43+
44+
private:
45+
int resume_point_{0};
46+
action action_after_resume_;
47+
action::type next_read_type_ = action::type::append_some;
48+
multiplexer* mpx_ = nullptr;
49+
std::pair<tribool, std::size_t> res_{std::make_pair(std::nullopt, 0)};
50+
};
51+
52+
} // namespace boost::redis::detail
53+
54+
#endif // BOOST_REDIS_READER_FSM_HPP

include/boost/redis/impl/connection_logger.ipp

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva ([email protected])
1+
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva ([email protected])
22
*
33
* Distributed under the Boost Software License, Version 1.0. (See
44
* accompanying file LICENSE.txt)
@@ -14,6 +14,38 @@
1414

1515
namespace boost::redis::detail {
1616

17+
#define BOOST_REDIS_READER_SWITCH_CASE(elem) \
18+
case reader_fsm::action::type::elem: return "reader_fsm::action::type::" #elem
19+
20+
#define BOOST_REDIS_EXEC_SWITCH_CASE(elem) \
21+
case exec_action_type::elem: return "exec_action_type::" #elem
22+
23+
auto to_string(reader_fsm::action::type t) noexcept -> char const*
24+
{
25+
switch (t) {
26+
BOOST_REDIS_READER_SWITCH_CASE(setup_cancellation);
27+
BOOST_REDIS_READER_SWITCH_CASE(append_some);
28+
BOOST_REDIS_READER_SWITCH_CASE(needs_more);
29+
BOOST_REDIS_READER_SWITCH_CASE(notify_push_receiver);
30+
BOOST_REDIS_READER_SWITCH_CASE(cancel_run);
31+
BOOST_REDIS_READER_SWITCH_CASE(done);
32+
default: return "action::type::<invalid type>";
33+
}
34+
}
35+
36+
auto to_string(exec_action_type t) noexcept -> char const*
37+
{
38+
switch (t) {
39+
BOOST_REDIS_EXEC_SWITCH_CASE(setup_cancellation);
40+
BOOST_REDIS_EXEC_SWITCH_CASE(immediate);
41+
BOOST_REDIS_EXEC_SWITCH_CASE(done);
42+
BOOST_REDIS_EXEC_SWITCH_CASE(notify_writer);
43+
BOOST_REDIS_EXEC_SWITCH_CASE(wait_for_response);
44+
BOOST_REDIS_EXEC_SWITCH_CASE(cancel_run);
45+
default: return "exec_action_type::<invalid type>";
46+
}
47+
}
48+
1749
inline void format_tcp_endpoint(const asio::ip::tcp::endpoint& ep, std::string& to)
1850
{
1951
// This formatting is inspired by Asio's endpoint operator<<
@@ -125,20 +157,21 @@ void connection_logger::on_write(system::error_code const& ec, std::size_t n)
125157
logger_.fn(logger::level::info, msg_);
126158
}
127159

128-
void connection_logger::on_read(system::error_code const& ec, std::size_t n)
160+
void connection_logger::on_fsm_resume(reader_fsm::action const& action)
129161
{
130-
if (logger_.lvl < logger::level::info)
162+
if (logger_.lvl < logger::level::debug)
131163
return;
132164

133-
msg_ = "reader_op: ";
134-
if (ec) {
135-
format_error_code(ec, msg_);
136-
} else {
137-
msg_ += std::to_string(n);
138-
msg_ += " bytes read.";
139-
}
165+
std::string msg;
166+
msg += "(";
167+
msg += to_string(action.type_);
168+
msg += ", ";
169+
msg += std::to_string(action.push_size_);
170+
msg += ", ";
171+
msg += action.ec_.message();
172+
msg += ")";
140173

141-
logger_.fn(logger::level::info, msg_);
174+
logger_.fn(logger::level::debug, msg);
142175
}
143176

144177
void connection_logger::on_hello(system::error_code const& ec, generic_response const& resp)
@@ -180,4 +213,4 @@ void connection_logger::log(logger::level lvl, std::string_view op, system::erro
180213
logger_.fn(lvl, msg_);
181214
}
182215

183-
} // namespace boost::redis::detail
216+
} // namespace boost::redis::detail

include/boost/redis/impl/exec_fsm.ipp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,7 @@ inline bool is_cancellation(asio::cancellation_type_t type)
2525
asio::cancellation_type_t::terminal));
2626
}
2727

28-
} // namespace boost::redis::detail
29-
30-
boost::redis::detail::exec_action boost::redis::detail::exec_fsm::resume(
31-
bool connection_is_open,
32-
asio::cancellation_type_t cancel_state)
28+
exec_action exec_fsm::resume(bool connection_is_open, asio::cancellation_type_t cancel_state)
3329
{
3430
switch (resume_point_) {
3531
BOOST_REDIS_CORO_INITIAL
@@ -91,4 +87,6 @@ boost::redis::detail::exec_action boost::redis::detail::exec_fsm::resume(
9187
return exec_action{system::error_code()};
9288
}
9389

94-
#endif
90+
} // namespace boost::redis::detail
91+
92+
#endif
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva ([email protected])
2+
*
3+
* Distributed under the Boost Software License, Version 1.0. (See
4+
* accompanying file LICENSE.txt)
5+
*/
6+
7+
#include <boost/redis/detail/coroutine.hpp>
8+
#include <boost/redis/detail/multiplexer.hpp>
9+
#include <boost/redis/detail/reader_fsm.hpp>
10+
11+
namespace boost::redis::detail {
12+
13+
reader_fsm::reader_fsm(multiplexer& mpx) noexcept
14+
: mpx_{&mpx}
15+
{ }
16+
17+
reader_fsm::action reader_fsm::resume(
18+
std::size_t bytes_read,
19+
system::error_code ec,
20+
asio::cancellation_type_t /*cancel_state*/)
21+
{
22+
switch (resume_point_) {
23+
BOOST_REDIS_CORO_INITIAL
24+
BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation)
25+
26+
for (;;) {
27+
BOOST_REDIS_YIELD(resume_point_, 2, next_read_type_)
28+
if (ec) {
29+
// TODO: If an error occurred but data was read (i.e.
30+
// bytes_read != 0) we should try to process that data and
31+
// deliver it to the user before calling cancel_run.
32+
action_after_resume_ = {action::type::done, bytes_read, ec};
33+
BOOST_REDIS_YIELD(resume_point_, 3, action::type::cancel_run)
34+
return action_after_resume_;
35+
}
36+
37+
next_read_type_ = action::type::append_some;
38+
while (!mpx_->get_read_buffer().empty()) {
39+
res_ = mpx_->consume_next(ec);
40+
if (ec) {
41+
action_after_resume_ = {action::type::done, res_.second, ec};
42+
BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run)
43+
return action_after_resume_;
44+
}
45+
46+
if (!res_.first.has_value()) {
47+
next_read_type_ = action::type::needs_more;
48+
break;
49+
}
50+
51+
if (res_.first.value()) {
52+
BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second)
53+
if (ec) {
54+
action_after_resume_ = {action::type::done, 0u, ec};
55+
BOOST_REDIS_YIELD(resume_point_, 7, action::type::cancel_run)
56+
return action_after_resume_;
57+
}
58+
} else {
59+
// TODO: Here we should notify the exec operation that
60+
// it can be completed. This will improve log clarity
61+
// and will make this code symmetrical in how it
62+
// handles pushes and other messages. The new action
63+
// type can be named notify_exec. To do that we need to
64+
// refactor the multiplexer.
65+
}
66+
}
67+
}
68+
}
69+
70+
BOOST_ASSERT(false);
71+
return {action::type::done, 0, system::error_code()};
72+
}
73+
74+
} // namespace boost::redis::detail

include/boost/redis/src.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <boost/redis/impl/ignore.ipp>
1212
#include <boost/redis/impl/logger.ipp>
1313
#include <boost/redis/impl/multiplexer.ipp>
14+
#include <boost/redis/impl/reader_fsm.ipp>
1415
#include <boost/redis/impl/request.ipp>
1516
#include <boost/redis/impl/resp3_handshaker.ipp>
1617
#include <boost/redis/impl/response.ipp>

0 commit comments

Comments
 (0)