Skip to content

Commit 34d4582

Browse files
authored
Fix Error Handling For Connection Manager (#507)
1 parent e3a9cab commit 34d4582

File tree

3 files changed

+171
-35
lines changed

3 files changed

+171
-35
lines changed

source/connection_manager.c

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,7 +1164,6 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect
11641164

11651165
struct aws_http_connection_manager *manager = work->manager;
11661166

1167-
int representative_error = 0;
11681167
size_t new_connection_failures = 0;
11691168

11701169
/*
@@ -1202,29 +1201,28 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect
12021201
*/
12031202
struct aws_array_list errors;
12041203
AWS_ZERO_STRUCT(errors);
1205-
/* Even if we can't init this array, we still need to invoke error callbacks properly */
1206-
bool push_errors = false;
12071204

12081205
if (work->new_connections > 0) {
12091206
AWS_LOGF_INFO(
12101207
AWS_LS_HTTP_CONNECTION_MANAGER,
12111208
"id=%p: Requesting %zu new connections from http",
12121209
(void *)manager,
12131210
work->new_connections);
1214-
push_errors = aws_array_list_init_dynamic(&errors, work->allocator, work->new_connections, sizeof(int)) ==
1215-
AWS_ERROR_SUCCESS;
1211+
AWS_FATAL_ASSERT(
1212+
aws_array_list_init_dynamic(&errors, work->allocator, work->new_connections, sizeof(int)) ==
1213+
AWS_OP_SUCCESS);
12161214
}
12171215

12181216
for (size_t i = 0; i < work->new_connections; ++i) {
12191217
if (s_aws_http_connection_manager_new_connection(manager)) {
12201218
++new_connection_failures;
1221-
representative_error = aws_last_error();
1222-
if (push_errors) {
1223-
AWS_FATAL_ASSERT(aws_array_list_push_back(&errors, &representative_error) == AWS_OP_SUCCESS);
1224-
}
1219+
int error = aws_last_error();
1220+
AWS_FATAL_ASSERT(aws_array_list_push_back(&errors, &error) == AWS_OP_SUCCESS);
12251221
}
12261222
}
12271223

1224+
bool has_pending_acquisitions = false;
1225+
struct aws_connection_management_transaction pending_acquisitions_work;
12281226
if (new_connection_failures > 0) {
12291227
/*
12301228
* We failed and aren't going to receive a callback, but the current state assumes we will receive
@@ -1235,44 +1233,45 @@ static void s_aws_http_connection_manager_execute_transaction(struct aws_connect
12351233
AWS_FATAL_ASSERT(manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] >= new_connection_failures);
12361234
s_connection_manager_internal_ref_decrease(manager, AWS_HCMCT_PENDING_CONNECTIONS, new_connection_failures);
12371235

1238-
/*
1239-
* Rather than failing one acquisition for each connection failure, if there's at least one
1240-
* connection failure, we instead fail all excess acquisitions, since there's no pending
1241-
* connect that will necessarily resolve them.
1242-
*
1243-
* Try to correspond an error with the acquisition failure, but as a fallback just use the
1244-
* representative error.
1245-
*/
1246-
size_t i = 0;
1247-
while (manager->pending_acquisition_count > manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS]) {
1248-
int error = representative_error;
1249-
if (i < aws_array_list_length(&errors)) {
1250-
aws_array_list_get_at(&errors, &error, i);
1251-
}
1252-
1236+
for (size_t i = 0; i < new_connection_failures && manager->pending_acquisition_count > 0; i++) {
1237+
int error;
1238+
aws_array_list_get_at(&errors, &error, i);
12531239
AWS_LOGF_DEBUG(
12541240
AWS_LS_HTTP_CONNECTION_MANAGER,
1255-
"id=%p: Failing excess connection acquisition with error code %d",
1241+
"id=%p: Failing connection acquisition with error code %d",
12561242
(void *)manager,
12571243
(int)error);
12581244
s_aws_http_connection_manager_move_front_acquisition(manager, NULL, error, &work->completions);
1259-
++i;
12601245
}
1261-
1246+
has_pending_acquisitions =
1247+
manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] + manager->pending_settings_count <
1248+
manager->pending_acquisition_count;
1249+
if (has_pending_acquisitions) {
1250+
/* If there are pending acquisitions, schedule work again to try acquiring them */
1251+
s_aws_connection_management_transaction_init(&pending_acquisitions_work, manager);
1252+
s_aws_http_connection_manager_build_transaction(&pending_acquisitions_work);
1253+
}
12621254
aws_mutex_unlock(&manager->lock);
12631255
}
12641256

12651257
/*
12661258
* Step 4 - Perform acquisition callbacks
12671259
*/
12681260
s_aws_http_connection_manager_complete_acquisitions(&work->completions, work->allocator);
1269-
12701261
aws_array_list_clean_up(&errors);
12711262

12721263
/*
12731264
* Step 5 - Clean up work. Do this here rather than at the end of every caller. Destroy the manager if necessary
12741265
*/
12751266
s_aws_connection_management_transaction_clean_up(work);
1267+
1268+
/*
1269+
* Step 6 - If some connection acquisition requests failed and we still have more pending acquisitions, try to
1270+
* acquire them.
1271+
*/
1272+
if (has_pending_acquisitions) {
1273+
s_aws_http_connection_manager_execute_transaction(&pending_acquisitions_work);
1274+
}
12761275
}
12771276

12781277
void aws_http_connection_manager_acquire_connection(
@@ -1479,12 +1478,11 @@ static void s_cm_on_connection_ready_or_failed(
14791478
work->connection_to_release = connection;
14801479
}
14811480
} else {
1482-
/* fail acquisition as one connection cannot be used any more */
1483-
while (manager->pending_acquisition_count >
1484-
manager->internal_ref[AWS_HCMCT_PENDING_CONNECTIONS] + manager->pending_settings_count) {
1481+
if (manager->pending_acquisition_count > 0) {
1482+
/* fail acquisition as connection acquire failed */
14851483
AWS_LOGF_DEBUG(
14861484
AWS_LS_HTTP_CONNECTION_MANAGER,
1487-
"id=%p: Failing excess connection acquisition with error code %d",
1485+
"id=%p: Failing connection acquisition with error code %d",
14881486
(void *)manager,
14891487
(int)error_code);
14901488
s_aws_http_connection_manager_move_front_acquisition(manager, NULL, error_code, &work->completions);

tests/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,8 @@ add_net_test_case(connection_manager_setup_shutdown)
529529
add_net_test_case(connection_manager_acquire_release_mix_synchronous)
530530
add_net_test_case(connection_manager_acquisition_timeout)
531531
add_net_test_case(connection_manager_connect_callback_failure)
532+
add_net_test_case(test_connection_manager_connect_callback_async_failure)
533+
add_net_test_case(test_connection_manager_connect_callback_async_with_immediate_failure)
532534
add_net_test_case(connection_manager_connect_immediate_failure)
533535
add_net_test_case(connection_manager_proxy_setup_shutdown)
534536
add_net_test_case(connection_manager_idle_culling_single)

tests/test_connection_manager.c

Lines changed: 139 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,7 @@ AWS_TEST_CASE(
819819
connection_manager_max_pending_acquisitions_with_vended_connections,
820820
s_test_connection_manager_max_pending_acquisitions_with_vended_connections);
821821

822-
static int s_aws_http_connection_manager_create_connection_sync_mock(
822+
static int s_aws_http_connection_manager_create_connection_validate(
823823
const struct aws_http_client_connection_options *options) {
824824
struct cm_tester *tester = &s_tester;
825825

@@ -830,8 +830,6 @@ static int s_aws_http_connection_manager_create_connection_sync_mock(
830830
ASSERT_TRUE(aws_byte_cursor_eq_c_str(&interface_name, options->socket_options->network_interface_name));
831831
}
832832

833-
size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1);
834-
835833
ASSERT_SUCCESS(aws_mutex_lock(&tester->lock));
836834
tester->release_connection_fn = options->on_shutdown;
837835
ASSERT_SUCCESS(aws_mutex_unlock(&tester->lock));
@@ -847,7 +845,15 @@ static int s_aws_http_connection_manager_create_connection_sync_mock(
847845
ASSERT_UINT_EQUALS(options->proxy_options->connection_type, tester->verify_proxy_options->connection_type);
848846
}
849847

848+
return AWS_OP_SUCCESS;
849+
}
850+
static int s_aws_http_connection_manager_create_connection_sync_mock(
851+
const struct aws_http_client_connection_options *options) {
852+
s_aws_http_connection_manager_create_connection_validate(options);
853+
struct cm_tester *tester = &s_tester;
854+
850855
struct mock_connection *connection = NULL;
856+
size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1);
851857

852858
if (next_connection_id < aws_array_list_length(&tester->mock_connections)) {
853859
aws_array_list_get_at(&tester->mock_connections, &connection, next_connection_id);
@@ -868,6 +874,63 @@ static int s_aws_http_connection_manager_create_connection_sync_mock(
868874
return aws_raise_error(AWS_ERROR_HTTP_UNKNOWN);
869875
}
870876

877+
struct connect_task_args {
878+
void *user_data;
879+
struct mock_connection *connection;
880+
aws_http_on_client_connection_setup_fn *on_setup;
881+
};
882+
883+
static void s_aws_http_connection_manager_connect_task(
884+
struct aws_task *task,
885+
void *user_data,
886+
enum aws_task_status status) {
887+
(void)status;
888+
struct cm_tester *tester = &s_tester;
889+
890+
struct connect_task_args *task_args = user_data;
891+
struct mock_connection *connection = task_args->connection;
892+
if (connection) {
893+
if (connection->result == AWS_NCRT_SUCCESS) {
894+
task_args->on_setup((struct aws_http_connection *)connection, AWS_ERROR_SUCCESS, task_args->user_data);
895+
} else if (connection->result == AWS_NCRT_ERROR_VIA_CALLBACK) {
896+
task_args->on_setup(NULL, AWS_ERROR_HTTP_UNKNOWN, task_args->user_data);
897+
} else {
898+
AWS_FATAL_ASSERT(0 && "Unexpected connection->result");
899+
}
900+
}
901+
902+
aws_mem_release(tester->allocator, task);
903+
aws_mem_release(tester->allocator, task_args);
904+
}
905+
906+
static int s_aws_http_connection_manager_connect_async_mock(const struct aws_http_client_connection_options *options) {
907+
s_aws_http_connection_manager_create_connection_validate(options);
908+
struct cm_tester *tester = &s_tester;
909+
910+
struct mock_connection *connection = NULL;
911+
size_t next_connection_id = aws_atomic_fetch_add(&tester->next_connection_id, 1);
912+
if (next_connection_id < aws_array_list_length(&tester->mock_connections)) {
913+
aws_array_list_get_at(&tester->mock_connections, &connection, next_connection_id);
914+
}
915+
916+
if (connection->result == AWS_NCRT_ERROR_FROM_CREATE) {
917+
return aws_raise_error(AWS_ERROR_HTTP_UNKNOWN);
918+
}
919+
920+
struct aws_task *task = aws_mem_calloc(options->allocator, 1, sizeof(struct aws_task));
921+
struct connect_task_args *task_args = aws_mem_calloc(options->allocator, 1, sizeof(struct connect_task_args));
922+
task_args->connection = connection;
923+
task_args->user_data = options->user_data;
924+
task_args->on_setup = options->on_setup;
925+
aws_task_init(task, s_aws_http_connection_manager_connect_task, task_args, "create_connection_task");
926+
927+
struct aws_event_loop *event_loop = aws_event_loop_group_get_next_loop(tester->event_loop_group);
928+
uint64_t now;
929+
ASSERT_SUCCESS(aws_event_loop_current_clock_time(event_loop, &now));
930+
aws_event_loop_schedule_task_future(event_loop, task, now + 1000000000);
931+
return AWS_OP_SUCCESS;
932+
}
933+
871934
static void s_aws_http_connection_manager_release_connection_sync_mock(struct aws_http_connection *connection) {
872935
(void)connection;
873936

@@ -920,6 +983,17 @@ static struct aws_http_connection_manager_system_vtable s_synchronous_mocks = {
920983
.aws_http_connection_get_version = s_aws_http_connection_manager_connection_get_version_sync_mock,
921984
};
922985

986+
static struct aws_http_connection_manager_system_vtable s_async_connect_mock = {
987+
.aws_http_client_connect = s_aws_http_connection_manager_connect_async_mock,
988+
.aws_http_connection_release = s_aws_http_connection_manager_release_connection_sync_mock,
989+
.aws_http_connection_close = s_aws_http_connection_manager_close_connection_sync_mock,
990+
.aws_http_connection_new_requests_allowed = s_aws_http_connection_manager_is_connection_available_sync_mock,
991+
.aws_high_res_clock_get_ticks = aws_high_res_clock_get_ticks,
992+
.aws_http_connection_get_channel = s_aws_http_connection_manager_connection_get_channel_sync_mock,
993+
.aws_channel_thread_is_callers_thread = s_aws_http_connection_manager_is_callers_thread_sync_mock,
994+
.aws_http_connection_get_version = s_aws_http_connection_manager_connection_get_version_sync_mock,
995+
};
996+
923997
static int s_test_connection_manager_with_network_interface_list(struct aws_allocator *allocator, void *ctx) {
924998
(void)ctx;
925999
struct aws_byte_cursor *interface_names_array = aws_mem_calloc(allocator, 3, sizeof(struct aws_byte_cursor));
@@ -1055,6 +1129,68 @@ static int s_test_connection_manager_connect_callback_failure(struct aws_allocat
10551129
}
10561130
AWS_TEST_CASE(connection_manager_connect_callback_failure, s_test_connection_manager_connect_callback_failure);
10571131

1132+
static int s_test_connection_manager_connect_callback_async_failure(struct aws_allocator *allocator, void *ctx) {
1133+
(void)ctx;
1134+
int error_connections = 5;
1135+
int success_connections = 5;
1136+
struct cm_tester_options options = {
1137+
.allocator = allocator,
1138+
.max_connections = error_connections,
1139+
.mock_table = &s_async_connect_mock,
1140+
};
1141+
1142+
ASSERT_SUCCESS(s_cm_tester_init(&options));
1143+
1144+
s_add_mock_connections(error_connections, AWS_NCRT_ERROR_VIA_CALLBACK, false);
1145+
s_add_mock_connections(success_connections, AWS_NCRT_SUCCESS, true);
1146+
1147+
s_acquire_connections(error_connections + success_connections);
1148+
1149+
ASSERT_SUCCESS(s_wait_on_connection_reply_count(error_connections + success_connections));
1150+
1151+
ASSERT_UINT_EQUALS(s_tester.connection_errors, error_connections);
1152+
ASSERT_SUCCESS(s_release_connections(success_connections, false));
1153+
1154+
ASSERT_SUCCESS(s_cm_tester_clean_up());
1155+
1156+
return AWS_OP_SUCCESS;
1157+
}
1158+
AWS_TEST_CASE(
1159+
test_connection_manager_connect_callback_async_failure,
1160+
s_test_connection_manager_connect_callback_async_failure);
1161+
1162+
static int s_test_connection_manager_connect_callback_async_with_immediate_failure(
1163+
struct aws_allocator *allocator,
1164+
void *ctx) {
1165+
(void)ctx;
1166+
int error_connections = 5;
1167+
int success_connections = 5;
1168+
struct cm_tester_options options = {
1169+
.allocator = allocator,
1170+
.max_connections = error_connections + success_connections,
1171+
.mock_table = &s_async_connect_mock,
1172+
};
1173+
1174+
ASSERT_SUCCESS(s_cm_tester_init(&options));
1175+
1176+
s_add_mock_connections(success_connections, AWS_NCRT_SUCCESS, true);
1177+
s_add_mock_connections(error_connections, AWS_NCRT_ERROR_VIA_CALLBACK, false);
1178+
1179+
s_acquire_connections(error_connections + success_connections);
1180+
1181+
ASSERT_SUCCESS(s_wait_on_connection_reply_count(error_connections + success_connections));
1182+
1183+
ASSERT_UINT_EQUALS(s_tester.connection_errors, error_connections);
1184+
ASSERT_SUCCESS(s_release_connections(success_connections, false));
1185+
1186+
ASSERT_SUCCESS(s_cm_tester_clean_up());
1187+
1188+
return AWS_OP_SUCCESS;
1189+
}
1190+
AWS_TEST_CASE(
1191+
test_connection_manager_connect_callback_async_with_immediate_failure,
1192+
s_test_connection_manager_connect_callback_async_with_immediate_failure);
1193+
10581194
static int s_test_connection_manager_connect_immediate_failure(struct aws_allocator *allocator, void *ctx) {
10591195
(void)ctx;
10601196

0 commit comments

Comments
 (0)