Skip to content

Commit 2c0cdee

Browse files
authored
Support x-cc message annotation (#12559)
Support x-cc message annotation Support an `x-cc` message annotation in AMQP 1.0 similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. The value of the `x-cc` message annotation must by a list of strings. A message annotation is used since application properties allow only simple types.
1 parent 0c905f9 commit 2c0cdee

17 files changed

+593
-102
lines changed

deps/rabbit/BUILD.bazel

+6
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,12 @@ rabbitmq_integration_suite(
862862
rabbitmq_integration_suite(
863863
name = "topic_permission_SUITE",
864864
size = "medium",
865+
additional_beam = [
866+
":test_amqp_utils_beam",
867+
],
868+
runtime_deps = [
869+
"//deps/rabbitmq_amqp_client:erlang_app",
870+
],
865871
)
866872

867873
rabbitmq_integration_suite(

deps/rabbit/app.bzl

+1-1
Original file line numberDiff line numberDiff line change
@@ -1559,7 +1559,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
15591559
outs = ["test/topic_permission_SUITE.beam"],
15601560
app_name = "rabbit",
15611561
erlc_opts = "//:test_erlc_opts",
1562-
deps = ["//deps/amqp_client:erlang_app"],
1562+
deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app"],
15631563
)
15641564
erlang_bytecode(
15651565
name = "transactions_SUITE_beam_files",

deps/rabbit/src/mc.erl

+28-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
priority/1,
2727
set_ttl/2,
2828
x_header/2,
29+
x_headers/1,
2930
routing_headers/2,
3031
exchange/1,
3132
routing_keys/1,
@@ -88,6 +89,7 @@
8889
{timestamp, non_neg_integer()} |
8990
{list, [tagged_value()]} |
9091
{map, [{tagged_value(), tagged_value()}]} |
92+
{array, atom(), [tagged_value()]} |
9193
null |
9294
undefined.
9395

@@ -104,11 +106,16 @@
104106
{MetadataSize :: non_neg_integer(),
105107
PayloadSize :: non_neg_integer()}.
106108

107-
%% retrieve and x- header from the protocol data
109+
%% retrieve an x- header from the protocol data
108110
%% the return value should be tagged with an AMQP 1.0 type
109111
-callback x_header(binary(), proto_state()) ->
110112
tagged_value().
111113

114+
%% retrieve x- headers from the protocol data
115+
%% the return values should be tagged with an AMQP 1.0 type
116+
-callback x_headers(proto_state()) ->
117+
#{binary() => tagged_value()}.
118+
112119
%% retrieve a property field from the protocol data
113120
%% e.g. message_id, correlation_id
114121
-callback property(atom(), proto_state()) ->
@@ -148,7 +155,7 @@ init(Proto, Data, Anns) ->
148155
-spec init(protocol(), term(), annotations(), environment()) -> state().
149156
init(Proto, Data, Anns0, Env) ->
150157
{ProtoData, ProtoAnns} = Proto:init(Data),
151-
Anns1 = case map_size(Env) == 0 of
158+
Anns1 = case map_size(Env) =:= 0 of
152159
true -> Anns0;
153160
false -> Anns0#{env => Env}
154161
end,
@@ -214,6 +221,25 @@ x_header(Key, #?MODULE{protocol = Proto,
214221
x_header(Key, BasicMsg) ->
215222
mc_compat:x_header(Key, BasicMsg).
216223

224+
-spec x_headers(state()) ->
225+
#{binary() => tagged_value()}.
226+
x_headers(#?MODULE{protocol = Proto,
227+
annotations = Anns,
228+
data = Data}) ->
229+
%% x-headers may be have been added to the annotations map.
230+
New = maps:filtermap(
231+
fun(Key, Val) ->
232+
case mc_util:is_x_header(Key) of
233+
true ->
234+
{true, mc_util:infer_type(Val)};
235+
false ->
236+
false
237+
end
238+
end, Anns),
239+
maps:merge(Proto:x_headers(Data), New);
240+
x_headers(BasicMsg) ->
241+
mc_compat:x_headers(BasicMsg).
242+
217243
-spec routing_headers(state(), [x_headers | complex_types]) ->
218244
#{binary() => property_value()}.
219245
routing_headers(#?MODULE{protocol = Proto,

deps/rabbit/src/mc_amqp.erl

+13-34
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
init/1,
99
size/1,
1010
x_header/2,
11+
x_headers/1,
1112
property/2,
1213
routing_headers/2,
1314
convert_to/3,
@@ -125,6 +126,9 @@ size(#v1{message_annotations = MA,
125126
x_header(Key, Msg) ->
126127
message_annotation(Key, Msg, undefined).
127128

129+
x_headers(Msg) ->
130+
#{K => V || {{_T, K}, V} <- message_annotations(Msg)}.
131+
128132
property(_Prop, #msg_body_encoded{properties = undefined}) ->
129133
undefined;
130134
property(Prop, #msg_body_encoded{properties = Props}) ->
@@ -618,41 +622,16 @@ encode_deaths(Deaths) ->
618622
{map, Map}
619623
end, Deaths).
620624

621-
essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) ->
625+
essential_properties(Msg) ->
622626
Durable = get_property(durable, Msg),
623627
Priority = get_property(priority, Msg),
624628
Timestamp = get_property(timestamp, Msg),
625629
Ttl = get_property(ttl, Msg),
626-
Anns0 = #{?ANN_DURABLE => Durable},
627-
Anns = maps_put_truthy(
628-
?ANN_PRIORITY, Priority,
629-
maps_put_truthy(
630-
?ANN_TIMESTAMP, Timestamp,
631-
maps_put_truthy(
632-
ttl, Ttl,
633-
Anns0))),
634-
case MA of
635-
[] ->
636-
Anns;
637-
_ ->
638-
lists:foldl(
639-
fun ({{symbol, <<"x-routing-key">>},
640-
{utf8, Key}}, Acc) ->
641-
maps:update_with(?ANN_ROUTING_KEYS,
642-
fun(L) -> [Key | L] end,
643-
[Key],
644-
Acc);
645-
({{symbol, <<"x-cc">>},
646-
{list, CCs0}}, Acc) ->
647-
CCs = [CC || {_T, CC} <- CCs0],
648-
maps:update_with(?ANN_ROUTING_KEYS,
649-
fun(L) -> L ++ CCs end,
650-
CCs,
651-
Acc);
652-
({{symbol, <<"x-exchange">>},
653-
{utf8, Exchange}}, Acc) ->
654-
Acc#{?ANN_EXCHANGE => Exchange};
655-
(_, Acc) ->
656-
Acc
657-
end, Anns, MA)
658-
end.
630+
Anns = #{?ANN_DURABLE => Durable},
631+
maps_put_truthy(
632+
?ANN_PRIORITY, Priority,
633+
maps_put_truthy(
634+
?ANN_TIMESTAMP, Timestamp,
635+
maps_put_truthy(
636+
ttl, Ttl,
637+
Anns))).

deps/rabbit/src/mc_amqpl.erl

+18-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
init/1,
1212
size/1,
1313
x_header/2,
14+
x_headers/1,
1415
routing_headers/2,
1516
convert_to/3,
1617
convert_from/3,
@@ -273,6 +274,23 @@ x_header(Key, #content{properties = none} = Content0) ->
273274
Content = rabbit_binary_parser:ensure_content_decoded(Content0),
274275
x_header(Key, Content).
275276

277+
x_headers(#content{properties = #'P_basic'{headers = undefined}}) ->
278+
#{};
279+
x_headers(#content{properties = #'P_basic'{headers = Headers}}) ->
280+
L = lists:filtermap(
281+
fun({Name, Type, Val}) ->
282+
case mc_util:is_x_header(Name) of
283+
true ->
284+
{true, {Name, from_091(Type, Val)}};
285+
false ->
286+
false
287+
end
288+
end, Headers),
289+
maps:from_list(L);
290+
x_headers(#content{properties = none} = Content0) ->
291+
Content = rabbit_binary_parser:ensure_content_decoded(Content0),
292+
x_headers(Content).
293+
276294
property(Prop, Content) ->
277295
mc_util:infer_type(mc_compat:get_property(Prop, Content)).
278296

@@ -707,7 +725,6 @@ supported_header_value_type(table) ->
707725
supported_header_value_type(_) ->
708726
true.
709727

710-
711728
amqp10_map_get(_K, []) ->
712729
undefined;
713730
amqp10_map_get(K, Tuples) ->

deps/rabbit/src/mc_compat.erl

+4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
priority/1,
2121
set_ttl/2,
2222
x_header/2,
23+
x_headers/1,
2324
routing_headers/2,
2425
%%%
2526
convert_to/2,
@@ -138,6 +139,9 @@ set_ttl(Value, #basic_message{content = Content0} = Msg) ->
138139
x_header(Key,#basic_message{content = Content}) ->
139140
mc_amqpl:x_header(Key, Content).
140141

142+
x_headers(#basic_message{content = Content}) ->
143+
mc_amqpl:x_headers(Content).
144+
141145
routing_headers(#basic_message{content = Content}, Opts) ->
142146
mc_amqpl:routing_headers(Content, Opts).
143147

deps/rabbit/src/mc_util.erl

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ utf8_string_is_ascii(UTF8String) ->
6161
amqp_map_get(Key, {map, List}, Default) ->
6262
amqp_map_get(Key, List, Default);
6363
amqp_map_get(Key, List, Default) when is_list(List) ->
64-
case lists:search(fun ({{_, K}, _}) -> K == Key end, List) of
64+
case lists:search(fun ({{_, K}, _}) -> K =:= Key end, List) of
6565
{value, {_K, V}} ->
6666
V;
6767
false ->

deps/rabbit/src/rabbit_amqp_session.erl

+50-20
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@
154154
%% The routing key is either defined in the ATTACH frame and static for
155155
%% the life time of the link or dynamically provided in each message's
156156
%% "to" field (address v2) or "subject" field (address v1).
157+
%% (A publisher can set additional routing keys via the x-cc message annotation.)
157158
routing_key :: rabbit_types:routing_key() | to | subject,
158159
%% queue_name_bin is only set if the link target address refers to a queue.
159160
queue_name_bin :: undefined | rabbit_misc:resource_name(),
@@ -2369,11 +2370,11 @@ incoming_link_transfer(
23692370

23702371
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
23712372
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
2372-
{ok, X, RoutingKey, Mc1, PermCache} ->
2373+
{ok, X, RoutingKeys, Mc1, PermCache} ->
23732374
Mc2 = rabbit_message_interceptor:intercept(Mc1),
23742375
check_user_id(Mc2, User),
2375-
TopicPermCache = check_write_permitted_on_topic(
2376-
X, User, RoutingKey, TopicPermCache0),
2376+
TopicPermCache = check_write_permitted_on_topics(
2377+
X, User, RoutingKeys, TopicPermCache0),
23772378
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
23782379
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
23792380
Opts = #{correlation => {HandleInt, DeliveryId}},
@@ -2408,14 +2409,14 @@ incoming_link_transfer(
24082409
"delivery_tag=~p, delivery_id=~p, reason=~p",
24092410
[DeliveryTag, DeliveryId, Reason])
24102411
end;
2411-
{error, #'v1_0.error'{} = Err} ->
2412+
{error, {anonymous_terminus, false}, #'v1_0.error'{} = Err} ->
24122413
Disposition = case Settled of
24132414
true -> [];
24142415
false -> [released(DeliveryId)]
24152416
end,
24162417
Detach = [detach(HandleInt, Link0, Err)],
24172418
{error, Disposition ++ Detach};
2418-
{error, anonymous_terminus, #'v1_0.error'{} = Err} ->
2419+
{error, {anonymous_terminus, true}, #'v1_0.error'{} = Err} ->
24192420
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
24202421
case Settled of
24212422
true ->
@@ -2440,13 +2441,13 @@ incoming_link_transfer(
24402441
end.
24412442

24422443
lookup_target(#exchange{} = X, LinkRKey, Mc, _, _, PermCache) ->
2443-
lookup_routing_key(X, LinkRKey, Mc, PermCache);
2444+
lookup_routing_key(X, LinkRKey, Mc, false, PermCache);
24442445
lookup_target(#resource{} = XName, LinkRKey, Mc, _, _, PermCache) ->
24452446
case rabbit_exchange:lookup(XName) of
24462447
{ok, X} ->
2447-
lookup_routing_key(X, LinkRKey, Mc, PermCache);
2448+
lookup_routing_key(X, LinkRKey, Mc, false, PermCache);
24482449
{error, not_found} ->
2449-
{error, error_not_found(XName)}
2450+
{error, {anonymous_terminus, false}, error_not_found(XName)}
24502451
end;
24512452
lookup_target(to, to, Mc, Vhost, User, PermCache0) ->
24522453
case mc:property(to, Mc) of
@@ -2458,25 +2459,26 @@ lookup_target(to, to, Mc, Vhost, User, PermCache0) ->
24582459
case rabbit_exchange:lookup(XName) of
24592460
{ok, X} ->
24602461
check_internal_exchange(X),
2461-
lookup_routing_key(X, RKey, Mc, PermCache);
2462+
lookup_routing_key(X, RKey, Mc, true, PermCache);
24622463
{error, not_found} ->
2463-
{error, anonymous_terminus, error_not_found(XName)}
2464+
{error, {anonymous_terminus, true}, error_not_found(XName)}
24642465
end;
24652466
{error, bad_address} ->
2466-
{error, anonymous_terminus,
2467+
{error, {anonymous_terminus, true},
24672468
#'v1_0.error'{
24682469
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
24692470
description = {utf8, <<"bad 'to' address string: ", String/binary>>}}}
24702471
end;
24712472
undefined ->
2472-
{error, anonymous_terminus,
2473+
{error, {anonymous_terminus, true},
24732474
#'v1_0.error'{
24742475
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
24752476
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}
24762477
end.
24772478

24782479
lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}},
2479-
RKey0, Mc0, PermCache) ->
2480+
RKey0, Mc0, AnonTerm, PermCache) ->
2481+
Mc1 = mc:set_annotation(?ANN_EXCHANGE, XNameBin, Mc0),
24802482
RKey = case RKey0 of
24812483
subject ->
24822484
case mc:property(subject, Mc0) of
@@ -2488,9 +2490,31 @@ lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}},
24882490
_ when is_binary(RKey0) ->
24892491
RKey0
24902492
end,
2491-
Mc1 = mc:set_annotation(?ANN_EXCHANGE, XNameBin, Mc0),
2492-
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, [RKey], Mc1),
2493-
{ok, X, RKey, Mc, PermCache}.
2493+
case mc:x_header(<<"x-cc">>, Mc0) of
2494+
undefined ->
2495+
RKeys = [RKey],
2496+
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1),
2497+
{ok, X, RKeys, Mc, PermCache};
2498+
{list, CCs0} = L ->
2499+
try lists:map(fun({utf8, CC}) -> CC end, CCs0) of
2500+
CCs ->
2501+
RKeys = [RKey | CCs],
2502+
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1),
2503+
{ok, X, RKeys, Mc, PermCache}
2504+
catch error:function_clause ->
2505+
{error, {anonymous_terminus, AnonTerm}, bad_x_cc(L)}
2506+
end;
2507+
BadValue ->
2508+
{error, {anonymous_terminus, AnonTerm}, bad_x_cc(BadValue)}
2509+
end.
2510+
2511+
bad_x_cc(Value) ->
2512+
Desc = unicode:characters_to_binary(
2513+
lists:flatten(
2514+
io_lib:format(
2515+
"bad value for 'x-cc' message-annotation: ~tp", [Value]))),
2516+
#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
2517+
description = {utf8, Desc}}.
24942518

24952519
process_routing_confirm([], _SenderSettles = true, _, U) ->
24962520
rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1),
@@ -3445,14 +3469,20 @@ check_resource_access(Resource, Perm, User, Cache) ->
34453469
end
34463470
end.
34473471

3448-
-spec check_write_permitted_on_topic(
3472+
-spec check_write_permitted_on_topics(
34493473
rabbit_types:exchange(),
34503474
rabbit_types:user(),
3451-
rabbit_types:routing_key(),
3475+
[rabbit_types:routing_key(),...],
34523476
topic_permission_cache()) ->
34533477
topic_permission_cache().
3454-
check_write_permitted_on_topic(Resource, User, RoutingKey, TopicPermCache) ->
3455-
check_topic_authorisation(Resource, User, RoutingKey, write, TopicPermCache).
3478+
check_write_permitted_on_topics(#exchange{type = topic} = Resource,
3479+
User, RoutingKeys, TopicPermCache) ->
3480+
lists:foldl(
3481+
fun(RoutingKey, Cache) ->
3482+
check_topic_authorisation(Resource, User, RoutingKey, write, Cache)
3483+
end, TopicPermCache, RoutingKeys);
3484+
check_write_permitted_on_topics(_, _, _, TopicPermCache) ->
3485+
TopicPermCache.
34563486

34573487
-spec check_read_permitted_on_topic(
34583488
rabbit_types:exchange(),

0 commit comments

Comments
 (0)