Skip to content

Commit f1396b5

Browse files
Merge pull request #13525 from rabbitmq/ik-dpc-queue-protection
By @ikavgo: introduce a deletion protection marker for queues
2 parents 3eeb8f9 + c69403e commit f1396b5

File tree

4 files changed

+189
-11
lines changed

4 files changed

+189
-11
lines changed

deps/rabbit/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_dis
276276
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_message_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
277277
PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit rabbit_fifo_dlx_integration rabbit_fifo_int
278278
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
279-
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing
279+
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue
280280

281281
PARALLEL_CT_SET_1 = $(sort $(PARALLEL_CT_SET_1_A) $(PARALLEL_CT_SET_1_B) $(PARALLEL_CT_SET_1_C) $(PARALLEL_CT_SET_1_D))
282282
PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARALLEL_CT_SET_2_C) $(PARALLEL_CT_SET_2_D))

deps/rabbit/src/amqqueue.erl

+41-10
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@
6161
is_exclusive/1,
6262
is_classic/1,
6363
is_quorum/1,
64+
is_internal/1,
65+
internal_owner/1,
66+
make_internal/1,
67+
make_internal/2,
6468
pattern_match_all/0,
6569
pattern_match_on_name/1,
6670
pattern_match_on_type/1,
@@ -78,6 +82,8 @@
7882
-define(is_backwards_compat_classic(T),
7983
(T =:= classic orelse T =:= ?amqqueue_v1_type)).
8084

85+
-type amqqueue_options() :: map() | ets:match_pattern().
86+
8187
-record(amqqueue, {
8288
%% immutable
8389
name :: rabbit_amqqueue:name() | ets:match_pattern(),
@@ -108,7 +114,7 @@
108114
slave_pids_pending_shutdown = [], %% reserved
109115
%% secondary index
110116
vhost :: rabbit_types:vhost() | undefined | ets:match_pattern(),
111-
options = #{} :: map() | ets:match_pattern(),
117+
options = #{} :: amqqueue_options(),
112118
type = ?amqqueue_v1_type :: module() | ets:match_pattern(),
113119
type_state = #{} :: map() | ets:match_pattern()
114120
}).
@@ -351,6 +357,19 @@ get_arguments(#amqqueue{arguments = Args}) ->
351357
set_arguments(#amqqueue{} = Queue, Args) ->
352358
Queue#amqqueue{arguments = Args}.
353359

360+
% options
361+
362+
-spec get_options(amqqueue()) -> amqqueue_options().
363+
364+
get_options(#amqqueue{options = Options}) ->
365+
Options.
366+
367+
-spec set_options(amqqueue(), amqqueue_options()) -> amqqueue().
368+
369+
set_options(#amqqueue{} = Queue, Options) ->
370+
Queue#amqqueue{options = Options}.
371+
372+
354373
% decorators
355374

356375
-spec get_decorators(amqqueue()) -> [atom()] | none | undefined.
@@ -395,15 +414,6 @@ get_name(#amqqueue{name = Name}) -> Name.
395414
set_name(#amqqueue{} = Queue, Name) ->
396415
Queue#amqqueue{name = Name}.
397416

398-
-spec get_options(amqqueue()) -> map().
399-
400-
get_options(#amqqueue{options = Options}) -> Options.
401-
402-
-spec set_options(amqqueue(), map()) -> amqqueue().
403-
404-
set_options(#amqqueue{} = Queue, Options) ->
405-
Queue#amqqueue{options = Options}.
406-
407417
% pid
408418

409419
-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none.
@@ -497,6 +507,27 @@ is_classic(Queue) ->
497507
is_quorum(Queue) ->
498508
get_type(Queue) =:= rabbit_quorum_queue.
499509

510+
-spec is_internal(amqqueue()) -> boolean().
511+
512+
is_internal(#amqqueue{options = #{internal := true}}) -> true;
513+
is_internal(#amqqueue{}) -> false.
514+
515+
-spec internal_owner(amqqueue()) -> rabbit_types:option(#resource{}).
516+
517+
internal_owner(#amqqueue{options = #{internal := true,
518+
internal_owner := IOwner}}) ->
519+
IOwner;
520+
internal_owner(#amqqueue{}) ->
521+
undefined.
522+
523+
make_internal(Q = #amqqueue{options = Options}) when is_map(Options) ->
524+
Q#amqqueue{options = maps:merge(Options, #{internal => true,
525+
internal_owner => undefined})}.
526+
make_internal(Q = #amqqueue{options = Options}, Owner)
527+
when is_map(Options) andalso is_record(Owner, resource) ->
528+
Q#amqqueue{options = maps:merge(Options, #{internal => true,
529+
interna_owner => Owner})}.
530+
500531
fields() ->
501532
fields(?record_version).
502533

deps/rabbit/src/rabbit_amqqueue.erl

+30
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,35 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) ->
811811
"match that of the original declaration.",
812812
[rabbit_misc:rs(QueueName)]).
813813

814+
-spec check_internal(amqqueue:amqqueue(), rabbit_types:username()) ->
815+
'ok' | rabbit_types:channel_exit().
816+
check_internal(Q, Username) ->
817+
case amqqueue:is_internal(Q) of
818+
true ->
819+
case Username of
820+
%% note cli delete command uses "cli_user"
821+
?INTERNAL_USER ->
822+
ok;
823+
_ ->
824+
QueueName = amqqueue:get_name(Q),
825+
case amqqueue:internal_owner(Q) of
826+
undefined ->
827+
rabbit_misc:protocol_error(
828+
resource_locked,
829+
"Cannot delete protected ~ts.",
830+
[rabbit_misc:rs(QueueName)]);
831+
IOwner ->
832+
rabbit_misc:protocol_error(
833+
resource_locked,
834+
"Cannot delete protected ~ts. It was "
835+
"declared as an protected and can be deleted only by deleting the owner entity: ~ts",
836+
[rabbit_misc:rs(QueueName), rabbit_misc:rs(IOwner)])
837+
end
838+
end;
839+
false ->
840+
ok
841+
end.
842+
814843
-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
815844
A | rabbit_types:channel_exit().
816845
with_exclusive_access_or_die(Name, ReaderPid, F) ->
@@ -1681,6 +1710,7 @@ delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) whe
16811710
case with(
16821711
QueueName,
16831712
fun (Q) ->
1713+
ok = check_internal(Q, Username),
16841714
if CheckExclusive ->
16851715
check_exclusive_access(Q, ConnPid);
16861716
true ->
+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
-module(rabbit_amqqueue_SUITE).
2+
3+
-compile([export_all, nowarn_export_all]).
4+
5+
-include_lib("common_test/include/ct.hrl").
6+
-include_lib("eunit/include/eunit.hrl").
7+
-include_lib("amqp_client/include/amqp_client.hrl").
8+
9+
%%%===================================================================
10+
%%% Common Test callbacks
11+
%%%===================================================================
12+
13+
all() ->
14+
[
15+
{group, rabbit_amqqueue_tests}
16+
].
17+
18+
19+
all_tests() ->
20+
[
21+
normal_queue_delete_with,
22+
internal_queue_delete_with
23+
].
24+
25+
groups() ->
26+
[
27+
{rabbit_amqqueue_tests, [], all_tests()}
28+
].
29+
30+
init_per_suite(Config) ->
31+
rabbit_ct_helpers:log_environment(),
32+
rabbit_ct_helpers:run_setup_steps(Config).
33+
34+
end_per_suite(Config) ->
35+
rabbit_ct_helpers:run_teardown_steps(Config).
36+
37+
init_per_group(_Group, Config) ->
38+
rabbit_ct_helpers:run_steps(Config,
39+
rabbit_ct_broker_helpers:setup_steps()).
40+
41+
end_per_group(_Group, Config) ->
42+
rabbit_ct_helpers:run_steps(Config,
43+
rabbit_ct_broker_helpers:teardown_steps()).
44+
45+
init_per_testcase(Testcase, Config) ->
46+
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
47+
rabbit_ct_helpers:run_steps(Config1,
48+
rabbit_ct_client_helpers:setup_steps()).
49+
50+
end_per_testcase(Testcase, Config) ->
51+
Config1 = rabbit_ct_helpers:run_steps(
52+
Config,
53+
rabbit_ct_client_helpers:teardown_steps()),
54+
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
55+
56+
%%%===================================================================
57+
%%% Test cases
58+
%%%===================================================================
59+
60+
normal_queue_delete_with(Config) ->
61+
QName = queue_name(Config, <<"normal">>),
62+
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
63+
Queue = amqqueue:new(QName,
64+
none, %% pid
65+
true, %% durable
66+
false, %% auto delete
67+
none, %% owner,
68+
[],
69+
<<"/">>,
70+
#{},
71+
rabbit_classic_queue),
72+
73+
?assertMatch({new, _Q}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, declare, [Queue, Node])),
74+
75+
?assertMatch({ok, _}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete_with, [QName, false, false, <<"dummy">>])),
76+
77+
?assertMatch({error, not_found}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QName])),
78+
79+
ok.
80+
81+
internal_queue_delete_with(Config) ->
82+
QName = queue_name(Config, <<"internal_protected">>),
83+
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
84+
Queue = amqqueue:new(QName,
85+
none, %% pid
86+
true, %% durable
87+
false, %% auto delete
88+
none, %% owner,
89+
[],
90+
<<"/">>,
91+
#{},
92+
rabbit_classic_queue),
93+
IQueue = amqqueue:make_internal(Queue, rabbit_misc:r(<<"/">>, exchange, <<"amq.default">>)),
94+
95+
?assertMatch({new, _Q}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, declare, [IQueue, Node])),
96+
97+
?assertException(exit, {exception,
98+
{amqp_error, resource_locked,
99+
"Cannot delete protected queue 'rabbit_amqqueue_tests/internal_protected' in vhost '/'.",
100+
none}}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete_with, [QName, false, false, <<"dummy">>])),
101+
102+
?assertMatch({ok, _}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QName])),
103+
104+
?assertMatch({ok, _}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete_with, [QName, false, false, ?INTERNAL_USER])),
105+
106+
?assertMatch({error, not_found}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QName])),
107+
108+
ok.
109+
110+
%% Utility
111+
112+
queue_name(Config, Name) ->
113+
Name1 = iolist_to_binary(rabbit_ct_helpers:config_to_testcase_name(Config, Name)),
114+
queue_name(Name1).
115+
116+
queue_name(Name) ->
117+
rabbit_misc:r(<<"/">>, queue, Name).

0 commit comments

Comments
 (0)