Skip to content

Commit 1267d59

Browse files
committed
Simplify Direct Reply-To
This commit is no change in functionality and mostly deletes dead code. 1. Code targeting Erlang 22 and below is deleted since the mininmum required Erlang version is higher nowadays. "In OTP 23 distribution flag DFLAG_BIG_CREATION became mandatory. All pids are now encoded using NEW_PID_EXT, even external pids received as PID_EXT from older nodes." https://www.erlang.org/doc/apps/erts/erl_ext_dist.html#new_pid_ext 2. All v1 encoding and decoding of the Pid is deleted since the lower version RabbitMQ node supports the v2 encoding nowadays.
1 parent 2291579 commit 1267d59

File tree

4 files changed

+38
-99
lines changed

4 files changed

+38
-99
lines changed

deps/rabbit/src/pid_recomposition.erl

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
-module(pid_recomposition).
99

10-
1110
%% API
1211
-export([
1312
to_binary/1,
@@ -19,40 +18,23 @@
1918
-define(TTB_PREFIX, 131).
2019

2120
-define(NEW_PID_EXT, 88).
22-
-define(PID_EXT, 103).
2321
-define(ATOM_UTF8_EXT, 118).
2422
-define(SMALL_ATOM_UTF8_EXT, 119).
2523

26-
%%
27-
%% API
28-
%%
29-
3024
-spec decompose(pid()) -> #{atom() => any()}.
3125
decompose(Pid) ->
3226
from_binary(term_to_binary(Pid, [{minor_version, 2}])).
3327

3428
-spec from_binary(binary()) -> #{atom() => any()}.
3529
from_binary(Bin) ->
36-
PidData = case Bin of
37-
%% Erlang 23+
38-
<<?TTB_PREFIX, ?NEW_PID_EXT, Val0/binary>> -> Val0;
39-
%% Erlang 22
40-
<<?TTB_PREFIX, ?PID_EXT, Val1/binary>> -> Val1
41-
end,
30+
<<?TTB_PREFIX, ?NEW_PID_EXT, PidData/binary>> = Bin,
4231
{Node, Rest2} = case PidData of
4332
<<?ATOM_UTF8_EXT, AtomLen:16/integer, Node0:AtomLen/binary, Rest1/binary>> ->
4433
{Node0, Rest1};
4534
<<?SMALL_ATOM_UTF8_EXT, AtomLen/integer, Node0:AtomLen/binary, Rest1/binary>> ->
4635
{Node0, Rest1}
4736
end,
48-
{ID, Serial, Creation} = case Rest2 of
49-
%% NEW_PID_EXT on Erlang 23+
50-
<<ID0:32/integer, Serial0:32/integer, Creation0:32/integer>> ->
51-
{ID0, Serial0, Creation0};
52-
%% PID_EXT on Erlang 22
53-
<<ID1:32/integer, Serial1:32/integer, Creation1:8/integer>> ->
54-
{ID1, Serial1, Creation1}
55-
end,
37+
<<ID:32/integer, Serial:32/integer, Creation:32/integer>> = Rest2,
5638
#{
5739
node => binary_to_atom(Node, utf8),
5840
id => ID,
@@ -62,9 +44,16 @@ from_binary(Bin) ->
6244

6345
-spec to_binary(#{atom() => any()}) -> binary().
6446
to_binary(#{node := Node, id := ID, serial := Serial, creation := Creation}) ->
65-
BinNode = atom_to_binary(Node, utf8),
47+
BinNode = atom_to_binary(Node),
6648
NodeLen = byte_size(BinNode),
67-
<<?TTB_PREFIX:8/unsigned, ?NEW_PID_EXT:8/unsigned, ?ATOM_UTF8_EXT:8/unsigned, NodeLen:16/unsigned, BinNode/binary, ID:32, Serial:32, Creation:32>>.
49+
<<?TTB_PREFIX:8/unsigned,
50+
?NEW_PID_EXT:8/unsigned,
51+
?ATOM_UTF8_EXT:8/unsigned,
52+
NodeLen:16/unsigned,
53+
BinNode/binary,
54+
ID:32,
55+
Serial:32,
56+
Creation:32>>.
6857

6958
-spec recompose(#{atom() => any()}) -> pid().
7059
recompose(M) ->

deps/rabbit/src/rabbit_channel.erl

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -297,22 +297,11 @@ send_command(Pid, Msg) ->
297297

298298
-spec deliver_reply(binary(), mc:state()) -> 'ok'.
299299
deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) ->
300-
case rabbit_direct_reply_to:decode_reply_to_v2(EncodedBin,
301-
rabbit_nodes:all_running_with_hashes()) of
300+
Nodes = rabbit_nodes:all_running_with_hashes(),
301+
case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of
302302
{ok, Pid, Key} ->
303-
delegate:invoke_no_result(Pid, {?MODULE, deliver_reply_local,
304-
[Key, Message]});
305-
{error, _} ->
306-
deliver_reply_v1(EncodedBin, Message)
307-
end.
308-
309-
-spec deliver_reply_v1(binary(), mc:state()) -> 'ok'.
310-
deliver_reply_v1(EncodedBin, Message) ->
311-
%% the the original encoding function
312-
case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of
313-
{ok, V1Pid, V1Key} ->
314-
delegate:invoke_no_result(V1Pid,
315-
{?MODULE, deliver_reply_local, [V1Key, Message]});
303+
delegate:invoke_no_result(
304+
Pid, {?MODULE, deliver_reply_local, [Key, Message]});
316305
{error, _} ->
317306
ok
318307
end.
@@ -331,30 +320,19 @@ deliver_reply_local(Pid, Key, Message) ->
331320
declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
332321
exists;
333322
declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
334-
case rabbit_direct_reply_to:decode_reply_to_v2(EncodedBin, rabbit_nodes:all_running_with_hashes()) of
335-
{error, _} ->
336-
declare_fast_reply_to_v1(EncodedBin);
323+
Nodes = rabbit_nodes:all_running_with_hashes(),
324+
case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of
337325
{ok, Pid, Key} ->
338326
Msg = {declare_fast_reply_to, Key},
339327
rabbit_misc:with_exit_handler(
340328
rabbit_misc:const(not_found),
341-
fun() -> gen_server2:call(Pid, Msg, infinity) end)
329+
fun() -> gen_server2:call(Pid, Msg, infinity) end);
330+
{error, _} ->
331+
not_found
342332
end;
343333
declare_fast_reply_to(_) ->
344334
not_found.
345335

346-
declare_fast_reply_to_v1(EncodedBin) ->
347-
%% the the original encoding function
348-
case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of
349-
{ok, V1Pid, V1Key} ->
350-
Msg = {declare_fast_reply_to, V1Key},
351-
rabbit_misc:with_exit_handler(
352-
rabbit_misc:const(not_found),
353-
fun() -> gen_server2:call(V1Pid, Msg, infinity) end);
354-
{error, _} ->
355-
not_found
356-
end.
357-
358336
-spec list() -> [pid()].
359337

360338
list() ->
@@ -1319,7 +1297,7 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
13191297
Other -> Other
13201298
end,
13211299
%% Precalculate both suffix and key
1322-
{Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v2(self()),
1300+
{Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix(self()),
13231301
Consumer = {CTag, Suffix, Key},
13241302
State1 = State#ch{reply_consumer = Consumer},
13251303
case NoWait of

deps/rabbit/src/rabbit_direct_reply_to.erl

Lines changed: 12 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -7,45 +7,14 @@
77

88
-module(rabbit_direct_reply_to).
99

10-
%% API
11-
-export([
12-
%% Original amq.rabbitmq.reply-to target channel encoding
13-
compute_key_and_suffix_v1/1,
14-
decode_reply_to_v1/1,
10+
-export([compute_key_and_suffix/1,
11+
decode_reply_to/2]).
1512

16-
%% v2 amq.rabbitmq.reply-to target channel encoding
17-
compute_key_and_suffix_v2/1,
18-
decode_reply_to_v2/2
19-
]).
20-
21-
%%
22-
%% API
23-
%%
24-
25-
-type decoded_pid_and_key() :: {ok, pid(), binary()}.
26-
27-
-spec compute_key_and_suffix_v1(pid()) -> {binary(), binary()}.
28-
%% This original pid encoding function produces values that exceed routing key length limit
29-
%% on nodes with long (say, 130+ characters) node names.
30-
compute_key_and_suffix_v1(Pid) ->
31-
Key = base64:encode(rabbit_guid:gen()),
32-
PidEnc = base64:encode(term_to_binary(Pid)),
33-
Suffix = <<PidEnc/binary, ".", Key/binary>>,
34-
{Key, Suffix}.
35-
36-
-spec decode_reply_to_v1(binary()) -> decoded_pid_and_key() | {error, any()}.
37-
decode_reply_to_v1(Bin) ->
38-
case string:lexemes(Bin, ".") of
39-
[PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)),
40-
{ok, Pid, unicode:characters_to_binary(Key)};
41-
_ -> {error, unrecognized_format}
42-
end.
43-
44-
45-
-spec compute_key_and_suffix_v2(pid()) -> {binary(), binary()}.
4613
%% This pid encoding function produces values that are of mostly fixed size
4714
%% regardless of the node name length.
48-
compute_key_and_suffix_v2(Pid) ->
15+
-spec compute_key_and_suffix(pid()) ->
16+
{binary(), binary()}.
17+
compute_key_and_suffix(Pid) ->
4918
Key = base64:encode(rabbit_guid:gen()),
5019

5120
PidParts0 = #{node := Node} = pid_recomposition:decompose(Pid),
@@ -61,19 +30,22 @@ compute_key_and_suffix_v2(Pid) ->
6130
Suffix = <<RecomposedEncoded/binary, ".", Key/binary>>,
6231
{Key, Suffix}.
6332

64-
-spec decode_reply_to_v2(binary(), #{non_neg_integer() => node()}) -> decoded_pid_and_key() | {error, any()}.
65-
decode_reply_to_v2(Bin, CandidateNodes) ->
33+
-spec decode_reply_to(binary(), #{non_neg_integer() => node()}) ->
34+
{ok, pid(), binary()} | {error, any()}.
35+
decode_reply_to(Bin, CandidateNodes) ->
6636
try
6737
[PidEnc, Key] = binary:split(Bin, <<".">>),
6838
RawPidBin = base64:decode(PidEnc),
6939
PidParts0 = #{node := ShortenedNodename} = pid_recomposition:from_binary(RawPidBin),
7040
{_, NodeHash} = rabbit_nodes_common:parts(ShortenedNodename),
7141
case maps:get(list_to_integer(NodeHash), CandidateNodes, undefined) of
72-
undefined -> {error, target_node_not_found};
42+
undefined ->
43+
{error, target_node_not_found};
7344
Candidate ->
7445
PidParts = maps:update(node, Candidate, PidParts0),
7546
{ok, pid_recomposition:recompose(PidParts), Key}
7647
end
7748
catch
78-
error:_ -> {error, unrecognized_format}
49+
error:_ ->
50+
{error, unrecognized_format}
7951
end.

deps/rabbit/test/rabbit_direct_reply_to_prop_SUITE.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
all() ->
1010
[
11-
decode_reply_to_v2
11+
decode_reply_to
1212
].
1313

1414
init_per_suite(Config) ->
@@ -32,7 +32,7 @@ end_per_testcase(_TestCase, _Config) ->
3232
%%% Tests %%%
3333

3434

35-
decode_reply_to_v2(Config) ->
35+
decode_reply_to(Config) ->
3636
rabbit_ct_proper_helpers:run_proper(
3737
fun() -> prop_decode_reply_to(Config) end,
3838
[],
@@ -61,9 +61,9 @@ prop_decode_reply_to(_) ->
6161
NonB64 = <<0, Random/binary>>,
6262

6363
{ok, pid_recomposition:recompose(PidParts), Key} =:=
64-
rabbit_direct_reply_to:decode_reply_to_v2(IxBin, NodeMap)
64+
rabbit_direct_reply_to:decode_reply_to(IxBin, NodeMap)
6565
andalso {error, target_node_not_found} =:=
66-
rabbit_direct_reply_to:decode_reply_to_v2(IxBin, NoNodeMap)
66+
rabbit_direct_reply_to:decode_reply_to(IxBin, NoNodeMap)
6767
andalso {error, unrecognized_format} =:=
68-
rabbit_direct_reply_to:decode_reply_to_v2(NonB64, NodeMap)
68+
rabbit_direct_reply_to:decode_reply_to(NonB64, NodeMap)
6969
end).

0 commit comments

Comments
 (0)