Skip to content

Commit 0cd2b3e

Browse files
Merge pull request #13086 from rabbitmq/mergify/bp/v4.0.x/pr-13085
Delete stream consumer metrics when AMQP 091 connection closes (backport #13085)
2 parents dedd9f4 + 21e69e3 commit 0cd2b3e

File tree

2 files changed

+37
-3
lines changed

2 files changed

+37
-3
lines changed

deps/rabbit/src/rabbit_stream_queue.erl

+9-3
Original file line numberDiff line numberDiff line change
@@ -969,9 +969,15 @@ init(Q) when ?is_amqqueue(Q) ->
969969
E
970970
end.
971971

972-
close(#stream_client{readers = Readers}) ->
973-
maps:foreach(fun (_, #stream{log = Log}) ->
974-
osiris_log:close(Log)
972+
close(#stream_client{readers = Readers,
973+
name = QName}) ->
974+
maps:foreach(fun (CTag, #stream{log = Log}) ->
975+
close_log(Log),
976+
rabbit_core_metrics:consumer_deleted(self(), CTag, QName),
977+
rabbit_event:notify(consumer_deleted,
978+
[{consumer_tag, CTag},
979+
{channel, self()},
980+
{queue, QName}])
975981
end, Readers).
976982

977983
update(Q, State)

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

+28
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ all_tests_3() ->
143143
consume_credit_out_of_order_ack,
144144
consume_credit_multiple_ack,
145145
basic_cancel,
146+
consumer_metrics_cleaned_on_connection_close,
146147
receive_basic_cancel_on_queue_deletion,
147148
keep_consuming_on_leader_restart,
148149
max_length_bytes,
@@ -1184,6 +1185,33 @@ basic_cancel(Config) ->
11841185
end,
11851186
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
11861187

1188+
consumer_metrics_cleaned_on_connection_close(Config) ->
1189+
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1190+
1191+
Q = ?config(queue_name, Config),
1192+
?assertEqual({'queue.declare_ok', Q, 0, 0},
1193+
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
1194+
1195+
Conn = rabbit_ct_client_helpers:open_connection(Config, Server),
1196+
{ok, Ch} = amqp_connection:open_channel(Conn),
1197+
qos(Ch, 10, false),
1198+
CTag = <<"consumer_metrics_cleaned_on_connection_close">>,
1199+
subscribe(Ch, Q, false, 0, CTag),
1200+
rabbit_ct_helpers:await_condition(
1201+
fun() ->
1202+
1 == length(filter_consumers(Config, Server, CTag))
1203+
end, 30000),
1204+
1205+
ok = rabbit_ct_client_helpers:close_connection(Conn),
1206+
1207+
rabbit_ct_helpers:await_condition(
1208+
fun() ->
1209+
0 == length(filter_consumers(Config, Server, CTag))
1210+
end, 30000),
1211+
1212+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
1213+
1214+
11871215
receive_basic_cancel_on_queue_deletion(Config) ->
11881216
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
11891217

0 commit comments

Comments
 (0)