Skip to content

Commit d890cf1

Browse files
Merge pull request #14147 from rabbitmq/issue-13429
Display AMQP filters in Management UI
2 parents f553775 + 23c6730 commit d890cf1

31 files changed

+6288
-204
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-record(filter, {
9+
descriptor :: binary() | non_neg_integer(),
10+
value :: term()
11+
}).

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
-module(amqp10_client).
99

10-
-include("amqp10_client.hrl").
10+
-include("amqp10_client_internal.hrl").
1111
-include_lib("amqp10_common/include/amqp10_framing.hrl").
1212

1313
-export([open_connection/1,

deps/amqp10_client/src/amqp10_client_connection.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
-behaviour(gen_statem).
1111

12-
-include("amqp10_client.hrl").
12+
-include("amqp10_client_internal.hrl").
1313
-include_lib("amqp10_common/include/amqp10_framing.hrl").
1414
-include_lib("amqp10_common/include/amqp10_types.hrl").
1515

deps/amqp10_client/src/amqp10_client_frame_reader.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
-behaviour(gen_statem).
1010

11-
-include("amqp10_client.hrl").
11+
-include("amqp10_client_internal.hrl").
1212
-include_lib("amqp10_common/include/amqp10_framing.hrl").
1313

1414
-ifdef(TEST).

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
-behaviour(gen_statem).
1010

1111
-include("amqp10_client.hrl").
12+
-include("amqp10_client_internal.hrl").
1213
-include_lib("amqp10_common/include/amqp10_framing.hrl").
1314
-include_lib("amqp10_common/include/amqp10_types.hrl").
1415

@@ -86,7 +87,7 @@
8687
-type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}.
8788

8889
% http://www.amqp.org/specification/1.0/filters
89-
-type filter() :: #{binary() => binary() | map() | list(binary())}.
90+
-type filter() :: #{binary() => #filter{} | binary() | map() | list(binary())}.
9091
-type max_message_size() :: undefined | non_neg_integer().
9192
-type footer_opt() :: crc32 | adler32.
9293

@@ -781,29 +782,39 @@ translate_filters(Filters)
781782
when map_size(Filters) =:= 0 ->
782783
undefined;
783784
translate_filters(Filters) ->
784-
{map,
785-
maps:fold(
786-
fun
787-
(<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
788-
%% special case conversion
789-
Key = sym(K),
790-
[{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];
791-
(K, V, Acc) when is_binary(K) ->
792-
%% try treat any filter value generically
793-
Key = sym(K),
794-
Value = filter_value_type(V),
795-
[{Key, {described, Key, Value}} | Acc]
796-
end, [], Filters)}.
797-
798-
filter_value_type(V) when is_binary(V) ->
785+
{map, lists:map(
786+
fun({Name, #filter{descriptor = Desc,
787+
value = V}})
788+
when is_binary(Name) ->
789+
Descriptor = if is_binary(Desc) -> {symbol, Desc};
790+
is_integer(Desc) -> {ulong, Desc}
791+
end,
792+
{{symbol, Name}, {described, Descriptor, V}};
793+
({<<"apache.org:legacy-amqp-headers-binding:map">> = K, V})
794+
when is_map(V) ->
795+
%% special case conversion
796+
Key = sym(K),
797+
Val = translate_legacy_amqp_headers_binding(V),
798+
{Key, {described, Key, Val}};
799+
({K, V})
800+
when is_binary(K) ->
801+
Key = {symbol, K},
802+
Val = filter_value_type(V),
803+
{Key, {described, Key, Val}}
804+
end, maps:to_list(Filters))}.
805+
806+
filter_value_type(V)
807+
when is_binary(V) ->
799808
%% this is clearly not always correct
800809
{utf8, V};
801810
filter_value_type(V)
802811
when is_integer(V) andalso V >= 0 ->
803812
{uint, V};
804-
filter_value_type(VList) when is_list(VList) ->
813+
filter_value_type(VList)
814+
when is_list(VList) ->
805815
{list, [filter_value_type(V) || V <- VList]};
806-
filter_value_type({T, _} = V) when is_atom(T) ->
816+
filter_value_type({T, _} = V)
817+
when is_atom(T) ->
807818
%% looks like an already tagged type, just pass it through
808819
V.
809820

@@ -1507,16 +1518,17 @@ translate_filters_selector_filter_test() ->
15071518
} = translate_filters(#{<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>}).
15081519

15091520
translate_filters_multiple_filters_test() ->
1510-
{map,
1511-
[
1512-
{{symbol, <<"apache.org:selector-filter:string">>},
1513-
{described, {symbol, <<"apache.org:selector-filter:string">>},
1514-
{utf8, <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>}}},
1515-
{{symbol, <<"apache.org:legacy-amqp-direct-binding:string">>},
1516-
{described, {symbol, <<"apache.org:legacy-amqp-direct-binding:string">>}, {utf8,<<"my topic">>}}}
1517-
]
1518-
} = translate_filters(#{
1519-
<<"apache.org:legacy-amqp-direct-binding:string">> => <<"my topic">>,
1520-
<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>
1521-
}).
1521+
{map, Actual} = translate_filters(
1522+
#{
1523+
<<"apache.org:legacy-amqp-direct-binding:string">> => <<"my topic">>,
1524+
<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>
1525+
}),
1526+
Expected = [{{symbol, <<"apache.org:selector-filter:string">>},
1527+
{described, {symbol, <<"apache.org:selector-filter:string">>},
1528+
{utf8, <<"amqp.annotation.x-opt-enqueuedtimeutc > 123456789">>}}},
1529+
{{symbol, <<"apache.org:legacy-amqp-direct-binding:string">>},
1530+
{described, {symbol, <<"apache.org:legacy-amqp-direct-binding:string">>}, {utf8,<<"my topic">>}}}],
1531+
ActualSorted = lists:sort(Actual),
1532+
ExpectedSorted = lists:sort(Expected),
1533+
ExpectedSorted = ActualSorted.
15221534
-endif.

deps/amqp10_client/test/mock_server.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
recv_amqp_header_step/1
1717
]).
1818

19-
-include("src/amqp10_client.hrl").
19+
-include("src/amqp10_client_internal.hrl").
2020

2121
start(Port) ->
2222
{ok, LSock} = gen_tcp:listen(Port, [binary, {packet, 0}, {active, false}]),
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
%% A filter with this name contains a JMS message selector.
8+
%% We use the same name as sent by the Qpid JMS client in
9+
%% https://github.com/apache/qpid-jms/blob/2.7.0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java#L75
10+
-define(FILTER_NAME_JMS, <<"jms-selector">>).
11+
12+
%% A filter with this name contains an SQL expression.
13+
%% In the current version, such a filter must comply with the JMS message selector syntax.
14+
%% However, we use a name other than "jms-selector" in case we want to extend the allowed syntax
15+
%% in the future, for example allowing for some of the extended grammar described in
16+
%% §6 "SQL Filter Expressions" of
17+
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
18+
-define(FILTER_NAME_SQL, <<"sql-filter">>).
19+
20+
%% SQL-based filtering syntax
21+
%% These descriptors are defined in
22+
%% https://www.amqp.org/specification/1.0/filters
23+
-define(DESCRIPTOR_NAME_SELECTOR_FILTER, <<"apache.org:selector-filter:string">>).
24+
-define(DESCRIPTOR_CODE_SELECTOR_FILTER, 16#0000468C00000004).
25+
26+
%% AMQP Filter Expressions Version 1.0 Working Draft 09
27+
%% https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227
28+
-define(DESCRIPTOR_NAME_PROPERTIES_FILTER, <<"amqp:properties-filter">>).
29+
-define(DESCRIPTOR_CODE_PROPERTIES_FILTER, 16#173).
30+
-define(DESCRIPTOR_NAME_APPLICATION_PROPERTIES_FILTER, <<"amqp:application-properties-filter">>).
31+
-define(DESCRIPTOR_CODE_APPLICATION_PROPERTIES_FILTER, 16#174).

deps/amqp10_common/include/amqp10_filtex.hrl

Lines changed: 0 additions & 15 deletions
This file was deleted.

deps/rabbit/Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ define ct_master.erl
258258
endef
259259

260260
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
261-
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filtex amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
261+
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_jms_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
262262
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
263263
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
264264

@@ -363,6 +363,9 @@ ifdef TRACE_SUPERVISOR2
363363
RMQ_ERLC_OPTS += -DTRACE_SUPERVISOR2=true
364364
endif
365365

366+
# https://www.erlang.org/doc/apps/parsetools/leex.html#file/2
367+
export ERL_COMPILER_OPTIONS := deterministic
368+
366369
# --------------------------------------------------------------------
367370
# Documentation.
368371
# --------------------------------------------------------------------

deps/rabbit/src/mc.erl

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
-type str() :: atom() | string() | binary().
4646
-type internal_ann_key() :: atom().
4747
-type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt-
48-
-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()].
48+
-type x_ann_value() :: str() | number() | TaggedValue :: tuple() | [x_ann_value()].
4949
-type protocol() :: module().
5050
-type annotations() :: #{internal_ann_key() => term(),
5151
x_ann_key() => x_ann_value()}.
@@ -76,8 +76,7 @@
7676
-type property_value() :: undefined |
7777
string() |
7878
binary() |
79-
integer() |
80-
float() |
79+
number() |
8180
boolean().
8281
-type tagged_value() :: {uuid, binary()} |
8382
{utf8, binary()} |
@@ -155,9 +154,9 @@ init(Proto, Data, Anns) ->
155154
-spec init(protocol(), term(), annotations(), environment()) -> state().
156155
init(Proto, Data, Anns0, Env) ->
157156
{ProtoData, ProtoAnns} = Proto:init(Data),
158-
Anns1 = case map_size(Env) =:= 0 of
159-
true -> Anns0;
160-
false -> Anns0#{env => Env}
157+
Anns1 = case map_size(Env) of
158+
0 -> Anns0;
159+
_ -> Anns0#{env => Env}
161160
end,
162161
Anns2 = maps:merge(ProtoAnns, Anns1),
163162
Anns = ensure_received_at_timestamp(Anns2),
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
-module(rabbit_amqp_filter).
8+
9+
-export([eval/2]).
10+
11+
-type expression() :: undefined |
12+
{property, rabbit_amqp_filter_prop:parsed_expressions()} |
13+
{jms, rabbit_amqp_filter_jms:parsed_expression()}.
14+
15+
-export_type([expression/0]).
16+
17+
-spec eval(expression(), mc:state()) -> boolean().
18+
eval(undefined, _Mc) ->
19+
%% A receiver without filter wants all messages.
20+
true;
21+
eval({property, Expr}, Mc) ->
22+
rabbit_amqp_filter_prop:eval(Expr, Mc);
23+
eval({jms, Expr}, Mc) ->
24+
rabbit_amqp_filter_jms:eval(Expr, Mc).

0 commit comments

Comments
 (0)