@@ -292,29 +292,44 @@ class run_op {
292
292
: conn_{conn}
293
293
{ }
294
294
295
+ // Called after the parallel group finishes
295
296
template <class Self >
296
- void operator ()(Self& self, system::error_code ec)
297
+ void operator ()(
298
+ Self& self,
299
+ order_t order,
300
+ system::error_code ec0,
301
+ system::error_code ec1,
302
+ system::error_code ec2,
303
+ system::error_code ec3,
304
+ system::error_code)
297
305
{
298
- (*this )(self, order_t {}, ec);
306
+ system::error_code final_ec;
307
+
308
+ if (order[0 ] == 0 && !!ec0) {
309
+ // The hello op finished first and with an error
310
+ final_ec = ec0;
311
+ } else if (order[0 ] == 2 && ec2 == error::pong_timeout) {
312
+ // The check ping timeout finished first. Use the ping error code
313
+ final_ec = ec1;
314
+ } else {
315
+ // Use the reader error code
316
+ final_ec = ec3;
317
+ }
318
+
319
+ (*this )(self, final_ec);
299
320
}
300
321
322
+ // TODO: this op doesn't handle per-operation cancellation correctly
301
323
template <class Self >
302
- void operator ()(
303
- Self& self,
304
- order_t order = {},
305
- system::error_code ec0 = {},
306
- system::error_code ec1 = {},
307
- system::error_code ec2 = {},
308
- system::error_code ec3 = {},
309
- system::error_code = {})
324
+ void operator ()(Self& self, system::error_code ec = {})
310
325
{
311
326
BOOST_ASIO_CORO_REENTER (coro_)
312
327
{
313
328
// Check config
314
- ec0 = check_config (conn_->cfg_ );
315
- if (ec0 ) {
316
- conn_->logger_ .log (logger::level::err, " Invalid configuration" , ec0 );
317
- stored_ec_ = ec0 ;
329
+ ec = check_config (conn_->cfg_ );
330
+ if (ec ) {
331
+ conn_->logger_ .log (logger::level::err, " Invalid configuration" , ec );
332
+ stored_ec_ = ec ;
318
333
BOOST_ASIO_CORO_YIELD asio::async_immediate (self.get_io_executor (), std::move (self));
319
334
self.complete (stored_ec_);
320
335
return ;
@@ -325,66 +340,59 @@ class run_op {
325
340
BOOST_ASIO_CORO_YIELD
326
341
conn_->stream_ .async_connect (&conn_->cfg_ , &conn_->logger_ , std::move (self));
327
342
328
- // If we failed, try again
329
- if (ec0) {
330
- self.complete (ec0);
331
- return ;
332
- }
333
-
334
- conn_->mpx_ .reset ();
335
-
336
- // Note: Order is important here because the writer might
337
- // trigger an async_write before the async_hello thereby
338
- // causing an authentication problem.
339
- BOOST_ASIO_CORO_YIELD
340
- asio::experimental::make_parallel_group (
341
- [this ](auto token) {
342
- return conn_->handshaker_ .async_hello (*conn_, token);
343
- },
344
- [this ](auto token) {
345
- return conn_->health_checker_ .async_ping (*conn_, token);
346
- },
347
- [this ](auto token) {
348
- return conn_->health_checker_ .async_check_timeout (*conn_, token);
349
- },
350
- [this ](auto token) {
351
- return conn_->reader (token);
352
- },
353
- [this ](auto token) {
354
- return conn_->writer (token);
355
- })
356
- .async_wait (asio::experimental::wait_for_one_error (), std::move (self));
357
-
358
- if (order[0 ] == 0 && !!ec0) {
359
- self.complete (ec0);
360
- return ;
361
- }
362
-
363
- if (order[0 ] == 2 && ec2 == error::pong_timeout) {
364
- self.complete (ec1);
365
- return ;
343
+ // If we were successful, run all the connection tasks
344
+ if (!ec) {
345
+ conn_->mpx_ .reset ();
346
+
347
+ // Note: Order is important here because the writer might
348
+ // trigger an async_write before the async_hello thereby
349
+ // causing an authentication problem.
350
+ BOOST_ASIO_CORO_YIELD
351
+ asio::experimental::make_parallel_group (
352
+ [this ](auto token) {
353
+ return conn_->handshaker_ .async_hello (*conn_, token);
354
+ },
355
+ [this ](auto token) {
356
+ return conn_->health_checker_ .async_ping (*conn_, token);
357
+ },
358
+ [this ](auto token) {
359
+ return conn_->health_checker_ .async_check_timeout (*conn_, token);
360
+ },
361
+ [this ](auto token) {
362
+ return conn_->reader (token);
363
+ },
364
+ [this ](auto token) {
365
+ return conn_->writer (token);
366
+ })
367
+ .async_wait (asio::experimental::wait_for_one_error (), std::move (self));
368
+
369
+ // The parallel group result will be translated into a single error
370
+ // code by the specialized operator() overload
371
+
372
+ // The receive operation must be cancelled because channel
373
+ // subscription does not survive a reconnection but requires
374
+ // re-subscription.
375
+ conn_->cancel (operation::receive);
366
376
}
367
377
368
- // The receive operation must be cancelled because channel
369
- // subscription does not survive a reconnection but requires
370
- // re-subscription.
371
- conn_->cancel (operation::receive);
372
-
378
+ // If we are not going to try again, we're done
373
379
if (!conn_->will_reconnect ()) {
374
- conn_->cancel (operation::reconnection);
375
- self.complete (ec3);
380
+ self.complete (ec);
376
381
return ;
377
382
}
378
383
384
+ // Wait for the reconnection interval
379
385
conn_->reconnect_timer_ .expires_after (conn_->cfg_ .reconnect_wait_interval );
380
-
381
386
BOOST_ASIO_CORO_YIELD
382
- conn_->reconnect_timer_ .async_wait (asio::prepend (std::move (self), order_t {}));
383
- if (ec0) {
384
- self.complete (ec0);
387
+ conn_->reconnect_timer_ .async_wait (std::move (self));
388
+
389
+ // If the timer was cancelled, exit
390
+ if (ec) {
391
+ self.complete (ec);
385
392
return ;
386
393
}
387
394
395
+ // If we won't reconnect, exit
388
396
if (!conn_->will_reconnect ()) {
389
397
self.complete (asio::error::operation_aborted);
390
398
return ;
0 commit comments