Skip to content

Commit

Permalink
Merge pull request #13441 from rabbitmq/mergify/bp/v4.0.x/pr-13440
Browse files Browse the repository at this point in the history
Handle mc_amqp 3.13 `msg` record in 4.x (backport #13435) (backport #13440)
  • Loading branch information
michaelklishin authored Mar 1, 2025
2 parents 63e6f4a + 95dd670 commit 9c2b7cb
Showing 1 changed file with 76 additions and 6 deletions.
82 changes: 76 additions & 6 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,29 @@
Val :: term()}].
-type opt(T) :: T | undefined.

%% This representation was used in v3.13.7. 4.x understands this record for
%% backward compatibility, specifically for the rare case where:
%% 1. a 3.13 node internally parsed a message from a stream via
%% ```
%% Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{})
%% ```
%% 2. published this Message to a queue
%% 3. RabbitMQ got upgraded to 4.x
%%
%% This record along with all its conversions in this module can therefore
%% be deleted in some future RabbitMQ version once it's safe to assume that
%% these upgraded messages have all been consumed.
-record(msg,
{
header :: opt(#'v1_0.header'{}),
delivery_annotations = []:: list(),
message_annotations = [] :: list(),
properties :: opt(#'v1_0.properties'{}),
application_properties = [] :: list(),
data = [] :: amqp10_data(),
footer = [] :: list()
}).

%% This representation is used when the message was originally sent with
%% a protocol other than AMQP and the message was not read from a stream.
-record(msg_body_decoded,
Expand Down Expand Up @@ -94,7 +117,7 @@
body_code :: body_descriptor_code()
}).

-opaque state() :: #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.
-opaque state() :: #msg{} | #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.

-export_type([state/0]).

Expand All @@ -109,6 +132,8 @@ convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
convert_from(_SourceProto, _, _Env) ->
not_implemented.

convert_to(?MODULE, Msg = #msg{}, _Env) ->
convert_from_3_13_msg(Msg);
convert_to(?MODULE, Msg, _Env) ->
Msg;
convert_to(TargetProto, Msg, Env) ->
Expand All @@ -120,7 +145,22 @@ size(#v1{message_annotations = MA,
[] -> 0;
_ -> ?MESSAGE_ANNOTATIONS_GUESS_SIZE
end,
{MetaSize, byte_size(Body)}.
{MetaSize, byte_size(Body)};
%% Copied from v3.13.7.
%% This might be called in rabbit_fifo_v3 and must therefore not be modified
%% to ensure determinism of quorum queues version 3.
size(#msg{data = Body}) ->
BodySize = if is_list(Body) ->
lists:foldl(
fun(#'v1_0.data'{content = Data}, Acc) ->
iolist_size(Data) + Acc;
(#'v1_0.amqp_sequence'{content = _}, Acc) ->
Acc
end, 0, Body);
is_record(Body, 'v1_0.amqp_value') ->
0
end,
{_MetaSize = 0, BodySize}.

x_header(Key, Msg) ->
message_annotation(Key, Msg, undefined).
Expand All @@ -129,6 +169,10 @@ property(_Prop, #msg_body_encoded{properties = undefined}) ->
undefined;
property(Prop, #msg_body_encoded{properties = Props}) ->
property0(Prop, Props);
property(_Prop, #msg{properties = undefined}) ->
undefined;
property(Prop, #msg{properties = Props}) ->
property0(Prop, Props);
property(_Prop, #v1{bare_and_footer_properties_pos = ?OMITTED_SECTION}) ->
undefined;
property(Prop, #v1{bare_and_footer = Bin,
Expand Down Expand Up @@ -260,7 +304,9 @@ protocol_state(#v1{message_annotations = MA0,
ttl = Ttl}, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, MA, []),
[encode(Sections), BareAndFooter].
[encode(Sections), BareAndFooter];
protocol_state(#msg{} = Msg, Anns) ->
protocol_state(convert_from_3_13_msg(Msg), Anns).

prepare(read, Msg) ->
Msg;
Expand All @@ -284,7 +330,9 @@ prepare(store, #msg_body_encoded{
bare_and_footer_application_properties_pos = AppPropsPos,
bare_and_footer_body_pos = BodyPos,
body_code = BodyCode
}.
};
prepare(store, Msg = #msg{}) ->
Msg.

%% internal

Expand Down Expand Up @@ -341,7 +389,9 @@ msg_to_sections(#v1{message_annotations = MAC,
Sections = amqp10_framing:decode_bin(Bin),
Sections ++ [{amqp_encoded_body_and_footer, BodyAndFooterBin}]
end,
to_sections(undefined, MAC, Tail).
to_sections(undefined, MAC, Tail);
msg_to_sections(#msg{} = Msg) ->
msg_to_sections(convert_from_3_13_msg(Msg)).

to_sections(H, MAC, P, APC, Tail) ->
S0 = case APC of
Expand Down Expand Up @@ -372,6 +422,20 @@ to_sections(H, MAC, Tail) ->
[H | S]
end.

convert_from_3_13_msg(#msg{header = H,
delivery_annotations = _,
message_annotations = MAC,
properties = P,
application_properties = APC,
data = Data,
footer = FC}) ->
#msg_body_decoded{header = H,
message_annotations = MAC,
properties = P,
application_properties = APC,
data = Data,
footer = FC}.

-spec protocol_state_message_annotations(amqp_annotations(), mc:annotations()) ->
amqp_annotations().
protocol_state_message_annotations(MA, Anns) ->
Expand Down Expand Up @@ -444,11 +508,14 @@ message_annotation(Key, State, Default)

message_annotations(#msg_body_decoded{message_annotations = L}) -> L;
message_annotations(#msg_body_encoded{message_annotations = L}) -> L;
message_annotations(#v1{message_annotations = L}) -> L.
message_annotations(#v1{message_annotations = L}) -> L;
message_annotations(#msg{message_annotations = L}) -> L.

message_annotations_as_simple_map(#msg_body_encoded{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content);
message_annotations_as_simple_map(#v1{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content);
message_annotations_as_simple_map(#msg{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content).

message_annotations_as_simple_map0(Content) ->
Expand All @@ -463,6 +530,9 @@ message_annotations_as_simple_map0(Content) ->
application_properties_as_simple_map(
#msg_body_encoded{application_properties = Content}, L) ->
application_properties_as_simple_map0(Content, L);
application_properties_as_simple_map(
#msg{application_properties = Content}, L) ->
application_properties_as_simple_map0(Content, L);
application_properties_as_simple_map(
#v1{bare_and_footer_application_properties_pos = ?OMITTED_SECTION}, L) ->
L;
Expand Down

0 comments on commit 9c2b7cb

Please sign in to comment.