Skip to content

Commit

Permalink
Handle mc_amqp 3.13 msg record in 4.x
Browse files Browse the repository at this point in the history
The `msg` record was used in 3.13. This commit makes 4.x understand
this record for backward compatibility, specifically for the rare case where:
1. a 3.13 node internally parsed a message from a stream via
```
Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{})
```
2. published this Message to a queue
3. RabbitMQ got upgraded to 4.x

(This commit can be reverted in some future RabbitMQ version once it's
safe to assume that these upgraded messages have been consumed.)

The changes were manually tested as described in Jira RMQ-1525.

(cherry picked from commit 91f5ce2)
  • Loading branch information
ansd authored and mergify[bot] committed Feb 28, 2025
1 parent c19fc0e commit 75cffc9
Showing 1 changed file with 76 additions and 6 deletions.
82 changes: 76 additions & 6 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,29 @@
Val :: term()}].
-type opt(T) :: T | undefined.

%% This representation was used in v3.13.7. 4.x understands this record for
%% backward compatibility, specifically for the rare case where:
%% 1. a 3.13 node internally parsed a message from a stream via
%% ```
%% Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{})
%% ```
%% 2. published this Message to a queue
%% 3. RabbitMQ got upgraded to 4.x
%%
%% This record along with all its conversions in this module can therefore
%% be deleted in some future RabbitMQ version once it's safe to assume that
%% these upgraded messages have all been consumed.
-record(msg,
{
header :: opt(#'v1_0.header'{}),
delivery_annotations = []:: list(),
message_annotations = [] :: list(),
properties :: opt(#'v1_0.properties'{}),
application_properties = [] :: list(),
data = [] :: amqp10_data(),
footer = [] :: list()
}).

%% This representation is used when the message was originally sent with
%% a protocol other than AMQP and the message was not read from a stream.
-record(msg_body_decoded,
Expand Down Expand Up @@ -97,7 +120,7 @@
body_code :: body_descriptor_code()
}).

-opaque state() :: #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.
-opaque state() :: #msg{} | #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.

-export_type([state/0]).

Expand Down Expand Up @@ -128,6 +151,8 @@ convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
convert_from(_SourceProto, _, _Env) ->
not_implemented.

convert_to(?MODULE, Msg = #msg{}, _Env) ->
convert_from_3_13_msg(Msg);
convert_to(?MODULE, Msg, _Env) ->
Msg;
convert_to(TargetProto, Msg, Env) ->
Expand All @@ -139,7 +164,22 @@ size(#v1{message_annotations = MA,
[] -> 0;
_ -> ?MESSAGE_ANNOTATIONS_GUESS_SIZE
end,
{MetaSize, byte_size(Body)}.
{MetaSize, byte_size(Body)};
%% Copied from v3.13.7.
%% This might be called in rabbit_fifo_v3 and must therefore not be modified
%% to ensure determinism of quorum queues version 3.
size(#msg{data = Body}) ->
BodySize = if is_list(Body) ->
lists:foldl(
fun(#'v1_0.data'{content = Data}, Acc) ->
iolist_size(Data) + Acc;
(#'v1_0.amqp_sequence'{content = _}, Acc) ->
Acc
end, 0, Body);
is_record(Body, 'v1_0.amqp_value') ->
0
end,
{_MetaSize = 0, BodySize}.

x_header(Key, Msg) ->
message_annotation(Key, Msg, undefined).
Expand All @@ -151,6 +191,10 @@ property(_Prop, #msg_body_encoded{properties = undefined}) ->
undefined;
property(Prop, #msg_body_encoded{properties = Props}) ->
property0(Prop, Props);
property(_Prop, #msg{properties = undefined}) ->
undefined;
property(Prop, #msg{properties = Props}) ->
property0(Prop, Props);
property(_Prop, #v1{bare_and_footer_properties_pos = ?OMITTED_SECTION}) ->
undefined;
property(Prop, #v1{bare_and_footer = Bin,
Expand Down Expand Up @@ -298,7 +342,9 @@ protocol_state(#v1{message_annotations = MA0,
ttl = Ttl}, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, MA, []),
[encode(Sections), BareAndFooter].
[encode(Sections), BareAndFooter];
protocol_state(#msg{} = Msg, Anns) ->
protocol_state(convert_from_3_13_msg(Msg), Anns).

prepare(read, Msg) ->
Msg;
Expand All @@ -322,7 +368,9 @@ prepare(store, #msg_body_encoded{
bare_and_footer_application_properties_pos = AppPropsPos,
bare_and_footer_body_pos = BodyPos,
body_code = BodyCode
}.
};
prepare(store, Msg = #msg{}) ->
Msg.

%% internal

Expand Down Expand Up @@ -379,7 +427,9 @@ msg_to_sections(#v1{message_annotations = MAC,
Sections = amqp10_framing:decode_bin(Bin),
Sections ++ [{amqp_encoded_body_and_footer, BodyAndFooterBin}]
end,
to_sections(undefined, MAC, Tail).
to_sections(undefined, MAC, Tail);
msg_to_sections(#msg{} = Msg) ->
msg_to_sections(convert_from_3_13_msg(Msg)).

to_sections(H, MAC, P, APC, Tail) ->
S0 = case APC of
Expand Down Expand Up @@ -410,6 +460,20 @@ to_sections(H, MAC, Tail) ->
[H | S]
end.

convert_from_3_13_msg(#msg{header = H,
delivery_annotations = _,
message_annotations = MAC,
properties = P,
application_properties = APC,
data = Data,
footer = FC}) ->
#msg_body_decoded{header = H,
message_annotations = MAC,
properties = P,
application_properties = APC,
data = Data,
footer = FC}.

-spec protocol_state_message_annotations(amqp_annotations(), mc:annotations()) ->
amqp_annotations().
protocol_state_message_annotations(MA, Anns) ->
Expand Down Expand Up @@ -482,11 +546,14 @@ message_annotation(Key, State, Default)

message_annotations(#msg_body_decoded{message_annotations = L}) -> L;
message_annotations(#msg_body_encoded{message_annotations = L}) -> L;
message_annotations(#v1{message_annotations = L}) -> L.
message_annotations(#v1{message_annotations = L}) -> L;
message_annotations(#msg{message_annotations = L}) -> L.

message_annotations_as_simple_map(#msg_body_encoded{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content);
message_annotations_as_simple_map(#v1{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content);
message_annotations_as_simple_map(#msg{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content).

message_annotations_as_simple_map0(Content) ->
Expand All @@ -501,6 +568,9 @@ message_annotations_as_simple_map0(Content) ->
application_properties_as_simple_map(
#msg_body_encoded{application_properties = Content}, L) ->
application_properties_as_simple_map0(Content, L);
application_properties_as_simple_map(
#msg{application_properties = Content}, L) ->
application_properties_as_simple_map0(Content, L);
application_properties_as_simple_map(
#v1{bare_and_footer_application_properties_pos = ?OMITTED_SECTION}, L) ->
L;
Expand Down

0 comments on commit 75cffc9

Please sign in to comment.