@@ -290,12 +290,25 @@ struct reader_op {
290
290
}
291
291
};
292
292
293
+ inline system::error_code check_config (const config& cfg)
294
+ {
295
+ if (!cfg.unix_socket .empty ()) {
296
+ #ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS
297
+ return error::unix_sockets_unsupported;
298
+ #endif
299
+ if (cfg.use_ssl )
300
+ return error::unix_sockets_ssl_unsupported;
301
+ }
302
+ return system::error_code{};
303
+ }
304
+
293
305
template <class Conn , class Logger >
294
306
class run_op {
295
307
private:
296
308
Conn* conn_ = nullptr ;
297
309
Logger logger_;
298
310
asio::coroutine coro_{};
311
+ system::error_code stored_ec_;
299
312
300
313
using order_t = std::array<std::size_t , 5 >;
301
314
@@ -321,75 +334,87 @@ class run_op {
321
334
system::error_code ec3 = {},
322
335
system::error_code = {})
323
336
{
324
- BOOST_ASIO_CORO_REENTER (coro_) for (;;)
337
+ BOOST_ASIO_CORO_REENTER (coro_)
325
338
{
326
- // Try to connect
327
- BOOST_ASIO_CORO_YIELD
328
- conn_->stream_ .async_connect (&conn_->cfg_ , logger_, std::move (self));
329
-
330
- // If we failed, try again
339
+ // Check config
340
+ ec0 = check_config (conn_->cfg_ );
331
341
if (ec0) {
332
- self.complete (ec0);
342
+ logger_.log_error (" Invalid configuration" , ec0);
343
+ stored_ec_ = ec0;
344
+ BOOST_ASIO_CORO_YIELD asio::async_immediate (self.get_io_executor (), std::move (self));
345
+ self.complete (stored_ec_);
333
346
return ;
334
347
}
335
348
336
- conn_->mpx_ .reset ();
349
+ for (;;) {
350
+ // Try to connect
351
+ BOOST_ASIO_CORO_YIELD
352
+ conn_->stream_ .async_connect (&conn_->cfg_ , logger_, std::move (self));
337
353
338
- // Note: Order is important here because the writer might
339
- // trigger an async_write before the async_hello thereby
340
- // causing an authentication problem.
341
- BOOST_ASIO_CORO_YIELD
342
- asio::experimental::make_parallel_group (
343
- [this ](auto token) {
344
- return conn_->handshaker_ .async_hello (*conn_, logger_, token);
345
- },
346
- [this ](auto token) {
347
- return conn_->health_checker_ .async_ping (*conn_, logger_, token);
348
- },
349
- [this ](auto token) {
350
- return conn_->health_checker_ .async_check_timeout (*conn_, logger_, token);
351
- },
352
- [this ](auto token) {
353
- return conn_->reader (logger_, token);
354
- },
355
- [this ](auto token) {
356
- return conn_->writer (logger_, token);
357
- })
358
- .async_wait (asio::experimental::wait_for_one_error (), std::move (self));
359
-
360
- if (order[0 ] == 0 && !!ec0) {
361
- self.complete (ec0);
362
- return ;
363
- }
354
+ // If we failed, try again
355
+ if (ec0) {
356
+ self.complete (ec0);
357
+ return ;
358
+ }
364
359
365
- if (order[0 ] == 2 && ec2 == error::pong_timeout) {
366
- self.complete (ec1);
367
- return ;
368
- }
360
+ conn_->mpx_ .reset ();
369
361
370
- // The receive operation must be cancelled because channel
371
- // subscription does not survive a reconnection but requires
372
- // re-subscription.
373
- conn_->cancel (operation::receive);
362
+ // Note: Order is important here because the writer might
363
+ // trigger an async_write before the async_hello thereby
364
+ // causing an authentication problem.
365
+ BOOST_ASIO_CORO_YIELD
366
+ asio::experimental::make_parallel_group (
367
+ [this ](auto token) {
368
+ return conn_->handshaker_ .async_hello (*conn_, logger_, token);
369
+ },
370
+ [this ](auto token) {
371
+ return conn_->health_checker_ .async_ping (*conn_, logger_, token);
372
+ },
373
+ [this ](auto token) {
374
+ return conn_->health_checker_ .async_check_timeout (*conn_, logger_, token);
375
+ },
376
+ [this ](auto token) {
377
+ return conn_->reader (logger_, token);
378
+ },
379
+ [this ](auto token) {
380
+ return conn_->writer (logger_, token);
381
+ })
382
+ .async_wait (asio::experimental::wait_for_one_error (), std::move (self));
383
+
384
+ if (order[0 ] == 0 && !!ec0) {
385
+ self.complete (ec0);
386
+ return ;
387
+ }
374
388
375
- if (!conn_->will_reconnect ()) {
376
- conn_->cancel (operation::reconnection);
377
- self.complete (ec3);
378
- return ;
379
- }
389
+ if (order[0 ] == 2 && ec2 == error::pong_timeout) {
390
+ self.complete (ec1);
391
+ return ;
392
+ }
380
393
381
- conn_->reconnect_timer_ .expires_after (conn_->cfg_ .reconnect_wait_interval );
394
+ // The receive operation must be cancelled because channel
395
+ // subscription does not survive a reconnection but requires
396
+ // re-subscription.
397
+ conn_->cancel (operation::receive);
382
398
383
- BOOST_ASIO_CORO_YIELD
384
- conn_->reconnect_timer_ .async_wait (asio::prepend (std::move (self), order_t {}));
385
- if (ec0) {
386
- self.complete (ec0);
387
- return ;
388
- }
399
+ if (!conn_->will_reconnect ()) {
400
+ conn_->cancel (operation::reconnection);
401
+ self.complete (ec3);
402
+ return ;
403
+ }
389
404
390
- if (!conn_->will_reconnect ()) {
391
- self.complete (asio::error::operation_aborted);
392
- return ;
405
+ conn_->reconnect_timer_ .expires_after (conn_->cfg_ .reconnect_wait_interval );
406
+
407
+ BOOST_ASIO_CORO_YIELD
408
+ conn_->reconnect_timer_ .async_wait (asio::prepend (std::move (self), order_t {}));
409
+ if (ec0) {
410
+ self.complete (ec0);
411
+ return ;
412
+ }
413
+
414
+ if (!conn_->will_reconnect ()) {
415
+ self.complete (asio::error::operation_aborted);
416
+ return ;
417
+ }
393
418
}
394
419
}
395
420
}
@@ -753,9 +778,7 @@ class basic_connection {
753
778
writer_timer_);
754
779
}
755
780
756
- auto is_open () const noexcept { return stream_.is_open (); }
757
-
758
- [[nodiscard]] bool trigger_write () const noexcept { return is_open () && !mpx_.is_writing (); }
781
+ bool is_open () const noexcept { return stream_.is_open (); }
759
782
760
783
detail::redis_stream<Executor> stream_;
761
784
0 commit comments