Skip to content

Definition export: inject default queue type into virtual host metadata #12821

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 27, 2024
7 changes: 5 additions & 2 deletions deps/rabbit/src/rabbit_definitions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1071,10 +1071,13 @@ list_vhosts() ->
[vhost_definition(V) || V <- rabbit_vhost:all()].

vhost_definition(VHost) ->
Name = vhost:get_name(VHost),
DQT = rabbit_queue_type:short_alias_of(rabbit_vhost:default_queue_type(Name)),
#{
<<"name">> => vhost:get_name(VHost),
<<"name">> => Name,
<<"limits">> => vhost:get_limits(VHost),
<<"metadata">> => vhost:get_metadata(VHost)
<<"metadata">> => vhost:get_metadata(VHost),
<<"default_queue_type">> => DQT
}.

list_users() ->
Expand Down
40 changes: 40 additions & 0 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
-behaviour(rabbit_registry_class).

-include("amqqueue.hrl").
-include("vhost.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").

Expand All @@ -22,7 +23,10 @@
feature_flag_name/1,
to_binary/1,
default/0,
default_alias/0,
fallback/0,
inject_dqt/1,
vhosts_with_dqt/1,
is_enabled/1,
is_compatible/4,
declare/2,
Expand Down Expand Up @@ -319,6 +323,15 @@ short_alias_of(rabbit_stream_queue) ->
%% AMQP 1.0 management client
short_alias_of({utf8, <<"stream">>}) ->
<<"stream">>;
%% for cases where this function is used for
%% formatting of values that already might use these
%% short aliases
short_alias_of(<<"quorum">>) ->
<<"quorum">>;
short_alias_of(<<"classic">>) ->
<<"classic">>;
short_alias_of(<<"stream">>) ->
<<"stream">>;
short_alias_of(_Other) ->
undefined.

Expand All @@ -345,6 +358,10 @@ default() ->
fallback()),
rabbit_data_coercion:to_atom(V).

-spec default_alias() -> binary().
default_alias() ->
short_alias_of(default()).

-spec to_binary(module()) -> binary().
to_binary(rabbit_classic_queue) ->
<<"classic">>;
Expand Down Expand Up @@ -841,6 +858,29 @@ known_queue_type_names() ->
QTypeBins = lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes),
?KNOWN_QUEUE_TYPES ++ QTypeBins.

inject_dqt(VHost) when ?is_vhost(VHost) ->
inject_dqt(vhost:to_map(VHost));
inject_dqt(VHost) when is_list(VHost) ->
inject_dqt(rabbit_data_coercion:to_map(VHost));
inject_dqt(M = #{default_queue_type := undefined}) ->
NQT = short_alias_of(default()),
Meta0 = maps:get(metadata, M, #{}),
Meta = Meta0#{default_queue_type => NQT},

M#{default_queue_type => NQT, metadata => Meta};
inject_dqt(M = #{default_queue_type := DQT}) ->
NQT = short_alias_of(DQT),
Meta0 = maps:get(metadata, M, #{}),
Meta = Meta0#{default_queue_type => NQT},

M#{default_queue_type => NQT, metadata => Meta}.

-spec vhosts_with_dqt([any()]) -> [map()].
vhosts_with_dqt(List) when is_list(List) ->
%% inject DQT (default queue type) at the top level and
%% its metadata
lists:map(fun inject_dqt/1, List).

-spec check_queue_limits(amqqueue:amqqueue()) ->
ok |
{error, queue_limit_exceeded, Reason :: string(), Args :: term()}.
Expand Down
22 changes: 17 additions & 5 deletions deps/rabbit/src/rabbit_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -511,13 +511,14 @@ default_queue_type(VirtualHost) ->
default_queue_type(VirtualHost, rabbit_queue_type:fallback()).
-spec default_queue_type(VirtualHost :: vhost:name(), Fallback :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type().
default_queue_type(VirtualHost, FallbackQueueType) ->
NodeDefault = application:get_env(rabbit, default_queue_type, FallbackQueueType),
case exists(VirtualHost) of
false -> FallbackQueueType;
false -> NodeDefault;
true ->
Record = lookup(VirtualHost),
case vhost:get_default_queue_type(Record) of
undefined -> FallbackQueueType;
<<"undefined">> -> FallbackQueueType;
undefined -> NodeDefault;
<<"undefined">> -> NodeDefault;
Type -> Type
end
end.
Expand Down Expand Up @@ -622,8 +623,19 @@ i(tracing, VHost) -> rabbit_trace:enabled(vhost:get_name(VHost));
i(cluster_state, VHost) -> vhost_cluster_state(vhost:get_name(VHost));
i(description, VHost) -> vhost:get_description(VHost);
i(tags, VHost) -> vhost:get_tags(VHost);
i(default_queue_type, VHost) -> vhost:get_default_queue_type(VHost);
i(metadata, VHost) -> vhost:get_metadata(VHost);
i(default_queue_type, VHost) -> rabbit_queue_type:short_alias_of(default_queue_type(vhost:get_name(VHost)));
i(metadata, VHost) ->
DQT = rabbit_queue_type:short_alias_of(default_queue_type(vhost:get_name(VHost))),
case vhost:get_metadata(VHost) of
undefined ->
#{default_queue_type => DQT};
M = #{default_queue_type := undefined} ->
M#{default_queue_type => DQT};
M = #{default_queue_type := QT} ->
M#{default_queue_type => rabbit_queue_type:short_alias_of(QT)};
M when is_map(M) ->
M#{default_queue_type => DQT}
end;
i(Item, VHost) ->
rabbit_log:error("Don't know how to compute a virtual host info item '~ts' for virtual host '~tp'", [Item, VHost]),
throw({bad_argument, Item}).
Expand Down
15 changes: 14 additions & 1 deletion deps/rabbit/src/vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
get_description/1,
get_tags/1,
get_default_queue_type/1,

set_limits/2,
set_metadata/2,
merge_metadata/2,
new_metadata/3,
is_tagged_with/2
is_tagged_with/2,

to_map/1
]).

-define(record_version, vhost_v2).
Expand Down Expand Up @@ -196,3 +199,13 @@ new_metadata(Description, Tags, DefaultQueueType) ->
-spec is_tagged_with(vhost(), tag()) -> boolean().
is_tagged_with(VHost, Tag) ->
lists:member(Tag, get_tags(VHost)).

-spec to_map(vhost()) -> map().
to_map(VHost) ->
#{
name => get_name(VHost),
description => get_description(VHost),
tags => get_tags(VHost),
default_queue_type => get_default_queue_type(VHost),
metadata => get_metadata(VHost)
}.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
## Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

defmodule RabbitMQ.CLI.Ctl.Commands.ExportDefinitionsCommand do
alias RabbitMQ.CLI.Core.{DocGuide, ExitCodes, Helpers}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListVhostsCommand do
use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout

def merge_defaults([], opts) do
# this default historically benefits those who script using 'rabbitmqctl list_vhosts',
# adding more fields here would break scripts but be more useful to a human reader. MK.
merge_defaults(["name"], opts)
end

Expand Down
4 changes: 4 additions & 0 deletions deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
add_vhost/2,
add_vhost/3,
add_vhost/4,
update_vhost_metadata/3,
delete_vhost/2,
delete_vhost/3,
delete_vhost/4,
Expand Down Expand Up @@ -1602,6 +1603,9 @@ add_vhost(Config, Node, VHost) ->
add_vhost(Config, Node, VHost, Username) ->
catch rpc(Config, Node, rabbit_vhost, add, [VHost, Username]).

update_vhost_metadata(Config, VHost, Meta) ->
catch rpc(Config, 0, rabbit_vhost, update_metadata, [VHost, Meta, <<"acting-user">>]).

delete_vhost(Config, VHost) ->
delete_vhost(Config, 0, VHost).

Expand Down
126 changes: 83 additions & 43 deletions deps/rabbitmq_management/src/rabbit_mgmt_wm_definitions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,30 +58,39 @@ all_definitions(ReqData, Context) ->
Vsn = rabbit:base_product_version(),
ProductName = rabbit:product_name(),
ProductVersion = rabbit:product_version(),
rabbit_mgmt_util:reply(
[{rabbit_version, rabbit_data_coercion:to_binary(Vsn)},
{rabbitmq_version, rabbit_data_coercion:to_binary(Vsn)},
{product_name, rabbit_data_coercion:to_binary(ProductName)},
{product_version, rabbit_data_coercion:to_binary(ProductVersion)}] ++
retain_whitelisted(
[{users, rabbit_mgmt_wm_users:users(all)},
{vhosts, rabbit_mgmt_wm_vhosts:basic()},
{permissions, rabbit_mgmt_wm_permissions:permissions()},
{topic_permissions, rabbit_mgmt_wm_topic_permissions:topic_permissions()},
{parameters, rabbit_mgmt_wm_parameters:basic(ReqData)},
{global_parameters, rabbit_mgmt_wm_global_parameters:basic()},
{policies, rabbit_mgmt_wm_policies:basic(ReqData)},
{queues, Qs},
{exchanges, Xs},
{bindings, Bs}]),
case rabbit_mgmt_util:qs_val(<<"download">>, ReqData) of
undefined -> ReqData;
Filename -> rabbit_mgmt_util:set_resp_header(
<<"Content-Disposition">>,
"attachment; filename=" ++
binary_to_list(Filename), ReqData)
end,
Context).

Contents = [
{users, rabbit_mgmt_wm_users:users(all)},
{vhosts, rabbit_mgmt_wm_vhosts:basic()},
{permissions, rabbit_mgmt_wm_permissions:permissions()},
{topic_permissions, rabbit_mgmt_wm_topic_permissions:topic_permissions()},
{parameters, rabbit_mgmt_wm_parameters:basic(ReqData)},
{global_parameters, rabbit_mgmt_wm_global_parameters:basic()},
{policies, rabbit_mgmt_wm_policies:basic(ReqData)},
{queues, Qs},
{exchanges, Xs},
{bindings, Bs}
],

TopLevelDefsAndMetadata = [
{rabbit_version, rabbit_data_coercion:to_binary(Vsn)},
{rabbitmq_version, rabbit_data_coercion:to_binary(Vsn)},
{product_name, rabbit_data_coercion:to_binary(ProductName)},
{product_version, rabbit_data_coercion:to_binary(ProductVersion)},
{rabbitmq_definition_format, <<"cluster">>},
{original_cluster_name, rabbit_nodes:cluster_name()},
{explanation, rabbit_data_coercion:to_binary(io_lib:format("Definitions of cluster '~ts'", [rabbit_nodes:cluster_name()]))}
],
Result = TopLevelDefsAndMetadata ++ retain_whitelisted(Contents),
ReqData1 = case rabbit_mgmt_util:qs_val(<<"download">>, ReqData) of
undefined -> ReqData;
Filename -> rabbit_mgmt_util:set_resp_header(
<<"Content-Disposition">>,
"attachment; filename=" ++
binary_to_list(Filename), ReqData)
end,

rabbit_mgmt_util:reply(Result, ReqData1, Context).

accept_json(ReqData0, Context) ->
BodySizeLimit = application:get_env(rabbitmq_management, max_http_body_size, ?MANAGEMENT_DEFAULT_HTTP_MAX_BODY_SIZE),
Expand All @@ -94,7 +103,10 @@ accept_json(ReqData0, Context) ->
accept(Body, ReqData, Context)
end.

vhost_definitions(ReqData, VHost, Context) ->
vhost_definitions(ReqData, VHostName, Context) ->
%% the existence of this virtual host is verified in the called, 'to_json/2'
VHost = rabbit_vhost:lookup(VHostName),

%% rabbit_mgmt_wm_<>:basic/1 filters by VHost if it is available.
%% TODO: should we stop stripping virtual host? Such files cannot be imported on boot, for example.
Xs = [strip_vhost(X) || X <- rabbit_mgmt_wm_exchanges:basic(ReqData),
Expand All @@ -105,25 +117,48 @@ vhost_definitions(ReqData, VHost, Context) ->
%% TODO: should we stop stripping virtual host? Such files cannot be imported on boot, for example.
Bs = [strip_vhost(B) || B <- rabbit_mgmt_wm_bindings:basic(ReqData),
export_binding(B, QNames)],
{ok, Vsn} = application:get_key(rabbit, vsn),
Parameters = [strip_vhost(
rabbit_mgmt_format:parameter(P))
|| P <- rabbit_runtime_parameters:list(VHost)],
rabbit_mgmt_util:reply(
[{rabbit_version, rabbit_data_coercion:to_binary(Vsn)}] ++
retain_whitelisted(
[{parameters, Parameters},
{policies, [strip_vhost(P) || P <- rabbit_mgmt_wm_policies:basic(ReqData)]},
{queues, Qs},
{exchanges, Xs},
{bindings, Bs}]),
case rabbit_mgmt_util:qs_val(<<"download">>, ReqData) of
undefined -> ReqData;
Filename ->
HeaderVal = "attachment; filename=" ++ binary_to_list(Filename),
rabbit_mgmt_util:set_resp_header(<<"Content-Disposition">>, HeaderVal, ReqData)
end,
Context).
|| P <- rabbit_runtime_parameters:list(VHostName)],
Contents = [
{parameters, Parameters},
{policies, [strip_vhost(P) || P <- rabbit_mgmt_wm_policies:basic(ReqData)]},
{queues, Qs},
{exchanges, Xs},
{bindings, Bs}
],

Vsn = rabbit:base_product_version(),
ProductName = rabbit:product_name(),
ProductVersion = rabbit:product_version(),

DQT = rabbit_queue_type:short_alias_of(rabbit_vhost:default_queue_type(VHostName)),
%% note: the type changes to a map
VHost1 = rabbit_queue_type:inject_dqt(VHost),
Metadata = maps:get(metadata, VHost1),

TopLevelDefsAndMetadata = [
{rabbit_version, rabbit_data_coercion:to_binary(Vsn)},
{rabbitmq_version, rabbit_data_coercion:to_binary(Vsn)},
{product_name, rabbit_data_coercion:to_binary(ProductName)},
{product_version, rabbit_data_coercion:to_binary(ProductVersion)},
{rabbitmq_definition_format, <<"single_virtual_host">>},
{original_vhost_name, VHostName},
{explanation, rabbit_data_coercion:to_binary(io_lib:format("Definitions of virtual host '~ts'", [VHostName]))},
{metadata, Metadata},
{description, vhost:get_description(VHost)},
{default_queue_type, DQT},
{limits, vhost:get_limits(VHost)}
],
Result = TopLevelDefsAndMetadata ++ retain_whitelisted(Contents),

ReqData1 = case rabbit_mgmt_util:qs_val(<<"download">>, ReqData) of
undefined -> ReqData;
Filename ->
HeaderVal = "attachment; filename=" ++ binary_to_list(Filename),
rabbit_mgmt_util:set_resp_header(<<"Content-Disposition">>, HeaderVal, ReqData)
end,
rabbit_mgmt_util:reply(Result, ReqData1, Context).

accept_multipart(ReqData0, Context) ->
{Parts, ReqData} = get_all_parts(ReqData0),
Expand Down Expand Up @@ -271,7 +306,12 @@ retain_whitelisted(Items) ->
retain_whitelisted_items(Name, List, Allowed) ->
{Name, [only_whitelisted_for_item(I, Allowed) || I <- List]}.

only_whitelisted_for_item(Item, Allowed) ->
only_whitelisted_for_item(Item, Allowed) when is_map(Item) ->
Map1 = maps:with(Allowed, Item),
maps:filter(fun(_Key, Val) ->
Val =/= undefined
end, Map1);
only_whitelisted_for_item(Item, Allowed) when is_list(Item) ->
[{K, Fact} || {K, Fact} <- Item, lists:member(K, Allowed), Fact =/= undefined].

strip_vhost(Item) ->
Expand Down
1 change: 1 addition & 0 deletions deps/rabbitmq_management/src/rabbit_mgmt_wm_overview.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ to_json(ReqData, Context = #context{user = User = #user{tags = Tags}}) ->
{erlang_full_version, erlang_full_version()},
{release_series_support_status, rabbit_release_series:readable_support_status()},
{disable_stats, rabbit_mgmt_util:disable_stats(ReqData)},
{default_queue_type, rabbit_queue_type:default_alias()},
{is_op_policy_updating_enabled, not rabbit_mgmt_features:is_op_policy_updating_disabled()},
{enable_queue_totals, rabbit_mgmt_util:enable_queue_totals(ReqData)}],
try
Expand Down
14 changes: 10 additions & 4 deletions deps/rabbitmq_management/src/rabbit_mgmt_wm_vhosts.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ to_json(ReqData, Context = #context{user = User}) ->
try
Basic = [rabbit_vhost:info(V)
|| V <- rabbit_mgmt_util:list_visible_vhosts(User)],
Data = rabbit_mgmt_util:augment_resources(Basic, ?DEFAULT_SORT,
?BASIC_COLUMNS, ReqData,
Context, fun augment/2),
Augmented = rabbit_mgmt_util:augment_resources(Basic, ?DEFAULT_SORT,
?BASIC_COLUMNS, ReqData,
Context, fun augment/2),
%% inject default DQT into virtual host metadata,
%% where necessary
Data = rabbit_queue_type:vhosts_with_dqt(Augmented),
rabbit_mgmt_util:reply(Data, ReqData, Context)
catch
{error, invalid_range_parameters, Reason} ->
Expand Down Expand Up @@ -64,4 +67,7 @@ augmented(ReqData, #context{user = User}) ->
end.

basic() ->
rabbit_vhost:info_all([name, description, tags, default_queue_type, metadata]).
Maps = lists:map(
fun maps:from_list/1,
rabbit_vhost:info_all([name, description, tags, default_queue_type, metadata])),
rabbit_queue_type:vhosts_with_dqt(Maps).
Loading
Loading