Skip to content

Commit 9f026f7

Browse files
Merge pull request #12727 from rabbitmq/rabbitmq-server-12709
By @Ayanda-D: Ensure only alive leaders and followers when fetching QQ replica states
2 parents 6e8b566 + 53cc8f8 commit 9f026f7

File tree

3 files changed

+91
-21
lines changed

3 files changed

+91
-21
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

+20-11
Original file line numberDiff line numberDiff line change
@@ -441,19 +441,28 @@ become_leader0(QName, Name) ->
441441
-spec all_replica_states() -> {node(), #{atom() => atom()}}.
442442
all_replica_states() ->
443443
Rows0 = ets:tab2list(ra_state),
444-
Rows = lists:map(fun
445-
({K, follower, promotable}) ->
446-
{K, promotable};
447-
({K, follower, non_voter}) ->
448-
{K, non_voter};
449-
({K, S, _}) ->
450-
%% voter or unknown
451-
{K, S};
452-
(T) ->
453-
T
454-
end, Rows0),
444+
Rows = lists:filtermap(
445+
fun
446+
(T = {K, _, _}) ->
447+
case whereis(K) of
448+
undefined ->
449+
false;
450+
P when is_pid(P) ->
451+
{true, to_replica_state(T)}
452+
end;
453+
(_T) ->
454+
false
455+
end, Rows0),
455456
{node(), maps:from_list(Rows)}.
456457

458+
to_replica_state({K, follower, promotable}) ->
459+
{K, promotable};
460+
to_replica_state({K, follower, non_voter}) ->
461+
{K, non_voter};
462+
to_replica_state({K, S, _}) ->
463+
%% voter or unknown
464+
{K, S}.
465+
457466
-spec list_with_minimum_quorum() -> [amqqueue:amqqueue()].
458467
list_with_minimum_quorum() ->
459468
Queues = rabbit_amqqueue:list_local_quorum_queues(),

deps/rabbit/test/quorum_queue_SUITE.erl

+50-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ groups() ->
9797
force_all_queues_shrink_member_to_current_member,
9898
force_vhost_queues_shrink_member_to_current_member,
9999
policy_repair,
100-
gh_12635
100+
gh_12635,
101+
replica_states
101102
]
102103
++ all_tests()},
103104
{cluster_size_5, [], [start_queue,
@@ -4352,6 +4353,54 @@ requeue_multiple_false(Config) ->
43524353
?assertEqual(#'queue.delete_ok'{message_count = 0},
43534354
amqp_channel:call(Ch, #'queue.delete'{queue = QQ})).
43544355

4356+
replica_states(Config) ->
4357+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
4358+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
4359+
4360+
[?assertEqual({'queue.declare_ok', Q, 0, 0},
4361+
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
4362+
|| Q <- [<<"Q1">>, <<"Q2">>, <<"Q3">>]],
4363+
4364+
Qs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),
4365+
4366+
[Q1_ClusterName, Q2_ClusterName, Q3_ClusterName] =
4367+
[begin
4368+
{ClusterName, _} = amqqueue:get_pid(Q),
4369+
ClusterName
4370+
end
4371+
|| Q <- Qs, amqqueue:get_type(Q) == rabbit_quorum_queue],
4372+
4373+
Result1 = rabbit_misc:append_rpc_all_nodes(Servers, rabbit_quorum_queue, all_replica_states, []),
4374+
ct:pal("all replica states: ~tp", [Result1]),
4375+
4376+
lists:map(fun({_Node, ReplicaStates}) ->
4377+
?assert(maps:is_key(Q1_ClusterName, ReplicaStates)),
4378+
?assert(maps:is_key(Q2_ClusterName, ReplicaStates)),
4379+
?assert(maps:is_key(Q3_ClusterName, ReplicaStates))
4380+
end, Result1),
4381+
4382+
%% Unregister a few queues (same outcome of 'noproc')
4383+
rabbit_ct_broker_helpers:rpc(Config, Server, erlang, unregister, [Q2_ClusterName]),
4384+
rabbit_ct_broker_helpers:rpc(Config, Server, erlang, unregister, [Q3_ClusterName]),
4385+
4386+
?assert(undefined == rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [Q2_ClusterName])),
4387+
?assert(undefined == rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [Q3_ClusterName])),
4388+
4389+
Result2 = rabbit_misc:append_rpc_all_nodes(Servers, rabbit_quorum_queue, all_replica_states, []),
4390+
ct:pal("replica states with a node missing Q1 and Q2: ~tp", [Result2]),
4391+
4392+
lists:map(fun({Node, ReplicaStates}) ->
4393+
if Node == Server ->
4394+
?assert(maps:is_key(Q1_ClusterName, ReplicaStates)),
4395+
?assertNot(maps:is_key(Q2_ClusterName, ReplicaStates)),
4396+
?assertNot(maps:is_key(Q3_ClusterName, ReplicaStates));
4397+
true ->
4398+
?assert(maps:is_key(Q1_ClusterName, ReplicaStates)),
4399+
?assert(maps:is_key(Q2_ClusterName, ReplicaStates)),
4400+
?assert(maps:is_key(Q3_ClusterName, ReplicaStates))
4401+
end
4402+
end, Result2).
4403+
43554404
%%----------------------------------------------------------------------------
43564405

43574406
same_elements(L1, L2)

deps/rabbit/test/unit_quorum_queue_SUITE.erl

+21-9
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
all() ->
99
[
10-
all_replica_states_includes_nonvoters,
10+
all_replica_states_includes_alive_nonvoters,
1111
filter_nonvoters,
1212
filter_quorum_critical_accounts_nonvoters,
1313
ra_machine_conf_delivery_limit
@@ -97,27 +97,29 @@ filter_nonvoters(_Config) ->
9797
[Q4] = rabbit_quorum_queue:filter_promotable(Qs, Ss),
9898
ok.
9999

100-
all_replica_states_includes_nonvoters(_Config) ->
100+
all_replica_states_includes_alive_nonvoters(_Config) ->
101101
ets:new(ra_state, [named_table, public, {write_concurrency, true}]),
102+
QPids = start_qprocs(_AliveQs = [q1, q2, q3, q4]),
102103
ets:insert(ra_state, [
103104
{q1, leader, voter},
104105
{q2, follower, voter},
105106
{q3, follower, promotable},
106107
{q4, init, unknown},
107-
%% pre ra-2.7.0
108-
{q5, leader},
109-
{q6, follower}
108+
%% queues in ra_state but not alive
109+
{q5, leader, voter},
110+
{q6, follower, noproc}
110111
]),
111112
{_, #{
112113
q1 := leader,
113114
q2 := follower,
114115
q3 := promotable,
115-
q4 := init,
116-
q5 := leader,
117-
q6 := follower
118-
}} = rabbit_quorum_queue:all_replica_states(),
116+
q4 := init
117+
} = ReplicaStates} = rabbit_quorum_queue:all_replica_states(),
118+
?assertNot(maps:is_key(q5, ReplicaStates)),
119+
?assertNot(maps:is_key(q6, ReplicaStates)),
119120

120121
true = ets:delete(ra_state),
122+
_ = stop_qprocs(QPids),
121123
ok.
122124

123125
make_ra_machine_conf(Q0, Arg, Pol, OpPol) ->
@@ -128,3 +130,13 @@ make_ra_machine_conf(Q0, Arg, Pol, OpPol) ->
128130
{definition, [{<<"delivery-limit">>,OpPol}]}]),
129131
rabbit_quorum_queue:ra_machine_config(Q).
130132

133+
start_qprocs(Qs) ->
134+
[begin
135+
QPid = spawn(fun() -> receive done -> ok end end),
136+
erlang:register(Q, QPid),
137+
QPid
138+
end || Q <- Qs].
139+
140+
stop_qprocs(Pids) ->
141+
[erlang:send(P, done)|| P <- Pids].
142+

0 commit comments

Comments
 (0)