Skip to content

Commit 91c502e

Browse files
committed
Support x-cc message annotation
Support an `x-cc` message annotation in AMQP 1.0 similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. The value of the `x-cc` message annotation must by an array of strings. A message annotation is used since application properties allow only simple types.
1 parent 814d44d commit 91c502e

17 files changed

+595
-105
lines changed

deps/rabbit/BUILD.bazel

+6
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,12 @@ rabbitmq_integration_suite(
862862
rabbitmq_integration_suite(
863863
name = "topic_permission_SUITE",
864864
size = "medium",
865+
additional_beam = [
866+
":test_amqp_utils_beam",
867+
],
868+
runtime_deps = [
869+
"//deps/rabbitmq_amqp_client:erlang_app",
870+
],
865871
)
866872

867873
rabbitmq_integration_suite(

deps/rabbit/app.bzl

+1-1
Original file line numberDiff line numberDiff line change
@@ -1559,7 +1559,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
15591559
outs = ["test/topic_permission_SUITE.beam"],
15601560
app_name = "rabbit",
15611561
erlc_opts = "//:test_erlc_opts",
1562-
deps = ["//deps/amqp_client:erlang_app"],
1562+
deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app"],
15631563
)
15641564
erlang_bytecode(
15651565
name = "transactions_SUITE_beam_files",

deps/rabbit/src/mc.erl

+28-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
priority/1,
2727
set_ttl/2,
2828
x_header/2,
29+
x_headers/1,
2930
routing_headers/2,
3031
exchange/1,
3132
routing_keys/1,
@@ -88,6 +89,7 @@
8889
{timestamp, non_neg_integer()} |
8990
{list, [tagged_value()]} |
9091
{map, [{tagged_value(), tagged_value()}]} |
92+
{array, atom(), [tagged_value()]} |
9193
null |
9294
undefined.
9395

@@ -104,11 +106,16 @@
104106
{MetadataSize :: non_neg_integer(),
105107
PayloadSize :: non_neg_integer()}.
106108

107-
%% retrieve and x- header from the protocol data
109+
%% retrieve an x- header from the protocol data
108110
%% the return value should be tagged with an AMQP 1.0 type
109111
-callback x_header(binary(), proto_state()) ->
110112
tagged_value().
111113

114+
%% retrieve x- headers from the protocol data
115+
%% the return values should be tagged with an AMQP 1.0 type
116+
-callback x_headers(proto_state()) ->
117+
#{binary() => tagged_value()}.
118+
112119
%% retrieve a property field from the protocol data
113120
%% e.g. message_id, correlation_id
114121
-callback property(atom(), proto_state()) ->
@@ -148,7 +155,7 @@ init(Proto, Data, Anns) ->
148155
-spec init(protocol(), term(), annotations(), environment()) -> state().
149156
init(Proto, Data, Anns0, Env) ->
150157
{ProtoData, ProtoAnns} = Proto:init(Data),
151-
Anns1 = case map_size(Env) == 0 of
158+
Anns1 = case map_size(Env) =:= 0 of
152159
true -> Anns0;
153160
false -> Anns0#{env => Env}
154161
end,
@@ -214,6 +221,25 @@ x_header(Key, #?MODULE{protocol = Proto,
214221
x_header(Key, BasicMsg) ->
215222
mc_compat:x_header(Key, BasicMsg).
216223

224+
-spec x_headers(state()) ->
225+
#{binary() => tagged_value()}.
226+
x_headers(#?MODULE{protocol = Proto,
227+
annotations = Anns,
228+
data = Data}) ->
229+
%% x-headers may be have been added to the annotations map.
230+
New = maps:filtermap(
231+
fun(Key, Val) ->
232+
case mc_util:is_x_header(Key) of
233+
true ->
234+
{true, mc_util:infer_type(Val)};
235+
false ->
236+
false
237+
end
238+
end, Anns),
239+
maps:merge(Proto:x_headers(Data), New);
240+
x_headers(BasicMsg) ->
241+
mc_compat:x_headers(BasicMsg).
242+
217243
-spec routing_headers(state(), [x_headers | complex_types]) ->
218244
#{binary() => property_value()}.
219245
routing_headers(#?MODULE{protocol = Proto,

deps/rabbit/src/mc_amqp.erl

+13-34
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
init/1,
99
size/1,
1010
x_header/2,
11+
x_headers/1,
1112
property/2,
1213
routing_headers/2,
1314
convert_to/3,
@@ -125,6 +126,9 @@ size(#v1{message_annotations = MA,
125126
x_header(Key, Msg) ->
126127
message_annotation(Key, Msg, undefined).
127128

129+
x_headers(Msg) ->
130+
#{K => V || {{_T, K}, V} <- message_annotations(Msg)}.
131+
128132
property(_Prop, #msg_body_encoded{properties = undefined}) ->
129133
undefined;
130134
property(Prop, #msg_body_encoded{properties = Props}) ->
@@ -618,41 +622,16 @@ encode_deaths(Deaths) ->
618622
{map, Map}
619623
end, Deaths).
620624

621-
essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) ->
625+
essential_properties(Msg) ->
622626
Durable = get_property(durable, Msg),
623627
Priority = get_property(priority, Msg),
624628
Timestamp = get_property(timestamp, Msg),
625629
Ttl = get_property(ttl, Msg),
626-
Anns0 = #{?ANN_DURABLE => Durable},
627-
Anns = maps_put_truthy(
628-
?ANN_PRIORITY, Priority,
629-
maps_put_truthy(
630-
?ANN_TIMESTAMP, Timestamp,
631-
maps_put_truthy(
632-
ttl, Ttl,
633-
Anns0))),
634-
case MA of
635-
[] ->
636-
Anns;
637-
_ ->
638-
lists:foldl(
639-
fun ({{symbol, <<"x-routing-key">>},
640-
{utf8, Key}}, Acc) ->
641-
maps:update_with(?ANN_ROUTING_KEYS,
642-
fun(L) -> [Key | L] end,
643-
[Key],
644-
Acc);
645-
({{symbol, <<"x-cc">>},
646-
{list, CCs0}}, Acc) ->
647-
CCs = [CC || {_T, CC} <- CCs0],
648-
maps:update_with(?ANN_ROUTING_KEYS,
649-
fun(L) -> L ++ CCs end,
650-
CCs,
651-
Acc);
652-
({{symbol, <<"x-exchange">>},
653-
{utf8, Exchange}}, Acc) ->
654-
Acc#{?ANN_EXCHANGE => Exchange};
655-
(_, Acc) ->
656-
Acc
657-
end, Anns, MA)
658-
end.
630+
Anns = #{?ANN_DURABLE => Durable},
631+
maps_put_truthy(
632+
?ANN_PRIORITY, Priority,
633+
maps_put_truthy(
634+
?ANN_TIMESTAMP, Timestamp,
635+
maps_put_truthy(
636+
ttl, Ttl,
637+
Anns))).

deps/rabbit/src/mc_amqpl.erl

+55-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
init/1,
1212
size/1,
1313
x_header/2,
14+
x_headers/1,
1415
routing_headers/2,
1516
convert_to/3,
1617
convert_from/3,
@@ -273,6 +274,23 @@ x_header(Key, #content{properties = none} = Content0) ->
273274
Content = rabbit_binary_parser:ensure_content_decoded(Content0),
274275
x_header(Key, Content).
275276

277+
x_headers(#content{properties = #'P_basic'{headers = undefined}}) ->
278+
#{};
279+
x_headers(#content{properties = #'P_basic'{headers = Headers}}) ->
280+
L = lists:filtermap(
281+
fun({Name, Type, Val}) ->
282+
case mc_util:is_x_header(Name) of
283+
true ->
284+
{true, {Name, from_091(Type, Val)}};
285+
false ->
286+
false
287+
end
288+
end, Headers),
289+
maps:from_list(L);
290+
x_headers(#content{properties = none} = Content0) ->
291+
Content = rabbit_binary_parser:ensure_content_decoded(Content0),
292+
x_headers(Content).
293+
276294
property(Prop, Content) ->
277295
mc_util:infer_type(mc_compat:get_property(Prop, Content)).
278296

@@ -690,10 +708,23 @@ from_091(binary, V) -> {binary, V};
690708
from_091(timestamp, V) -> {timestamp, V * 1000};
691709
from_091(byte, V) -> {byte, V};
692710
from_091(void, _V) -> null;
693-
from_091(array, L) ->
694-
{list, [from_091(T, V) || {T, V} <- L]};
695711
from_091(table, L) ->
696-
{map, [{wrap(symbol, K), from_091(T, V)} || {K, T, V} <- L]}.
712+
{map, [{wrap(symbol, K), from_091(T, V)} || {K, T, V} <- L]};
713+
from_091(array, []) ->
714+
{list, []};
715+
from_091(array, L0 = [{T0, _} | _]) ->
716+
{L = [{T1, _} | _], {Monomorphic, _}} =
717+
lists:mapfoldl(fun({T, V}, {Mono0, PrevType}) ->
718+
Mono = case Mono0 of
719+
false -> false;
720+
true -> T =:= PrevType
721+
end,
722+
{from_091(T, V), {Mono, T}}
723+
end, {true, T0}, L0),
724+
case Monomorphic of
725+
true -> {array, T1, L};
726+
false -> {list, L}
727+
end.
697728

698729
map_add(_T, _Key, _Type, undefined, Acc) ->
699730
Acc;
@@ -707,7 +738,6 @@ supported_header_value_type(table) ->
707738
supported_header_value_type(_) ->
708739
true.
709740

710-
711741
amqp10_map_get(_K, []) ->
712742
undefined;
713743
amqp10_map_get(K, Tuples) ->
@@ -857,3 +887,24 @@ amqp10_section_header(Header, Headers) ->
857887

858888
amqp_encoded_binary(Section) ->
859889
iolist_to_binary(amqp10_framing:encode_bin(Section)).
890+
891+
-ifdef(TEST).
892+
-include_lib("eunit/include/eunit.hrl").
893+
894+
from_091_array_test() ->
895+
{list, []} = from_091(array, []),
896+
{array, utf8, [{utf8, <<"e1">>}]} = from_091(array, [{longstr, <<"e1">>}]),
897+
{array, utf8, [{utf8, <<"e1">>},
898+
{utf8, <<"e2">>}]} = from_091(array, [{longstr, <<"e1">>},
899+
{longstr, <<"e2">>}]),
900+
{list, [{utf8, <<"e1">>},
901+
{binary, <<"e2">>}]} = from_091(array, [{longstr, <<"e1">>},
902+
{binary, <<"e2">>}]),
903+
{list, [{utf8, <<"e1">>},
904+
{binary, <<"e2">>},
905+
{utf8, <<"e3">>},
906+
{utf8, <<"e4">>}]} = from_091(array, [{longstr, <<"e1">>},
907+
{binary, <<"e2">>},
908+
{longstr, <<"e3">>},
909+
{longstr, <<"e4">>}]).
910+
-endif.

deps/rabbit/src/mc_compat.erl

+4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
priority/1,
2121
set_ttl/2,
2222
x_header/2,
23+
x_headers/1,
2324
routing_headers/2,
2425
%%%
2526
convert_to/2,
@@ -138,6 +139,9 @@ set_ttl(Value, #basic_message{content = Content0} = Msg) ->
138139
x_header(Key,#basic_message{content = Content}) ->
139140
mc_amqpl:x_header(Key, Content).
140141

142+
x_headers(#basic_message{content = Content}) ->
143+
mc_amqpl:x_headers(Content).
144+
141145
routing_headers(#basic_message{content = Content}, Opts) ->
142146
mc_amqpl:routing_headers(Content, Opts).
143147

deps/rabbit/src/mc_util.erl

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ utf8_string_is_ascii(UTF8String) ->
6161
amqp_map_get(Key, {map, List}, Default) ->
6262
amqp_map_get(Key, List, Default);
6363
amqp_map_get(Key, List, Default) when is_list(List) ->
64-
case lists:search(fun ({{_, K}, _}) -> K == Key end, List) of
64+
case lists:search(fun ({{_, K}, _}) -> K =:= Key end, List) of
6565
{value, {_K, V}} ->
6666
V;
6767
false ->

0 commit comments

Comments
 (0)