Skip to content

Commit 14baec0

Browse files
authored
Merge pull request #96 from rabbitmq/token-renew
Support token renewal
2 parents 1c49897 + f377524 commit 14baec0

File tree

19 files changed

+451
-93
lines changed

19 files changed

+451
-93
lines changed

.github/workflows/test-pr.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ jobs:
2424
cache: 'maven'
2525
- name: Start broker
2626
run: ci/start-broker.sh
27+
env:
28+
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:main'
2729
- name: Start toxiproxy
2830
run: ci/start-toxiproxy.sh
2931
- name: Display Java version

ci/start-broker.sh

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env bash
22

3-
RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-rabbitmq:4.0}
3+
RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}
44

55
wait_for_message() {
66
while ! docker logs "$1" | grep -q "$2";
@@ -17,7 +17,7 @@ cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
1717
chmod o+r rabbitmq-configuration/tls/*
1818
chmod g+r rabbitmq-configuration/tls/*
1919

20-
echo "[rabbitmq_auth_mechanism_ssl]." >> rabbitmq-configuration/enabled_plugins
20+
echo "[rabbitmq_auth_mechanism_ssl,rabbitmq_auth_backend_oauth2]." >> rabbitmq-configuration/enabled_plugins
2121

2222
echo "loopback_users = none
2323
@@ -34,7 +34,24 @@ ssl_options.depth = 1
3434
3535
auth_mechanisms.1 = PLAIN
3636
auth_mechanisms.2 = ANONYMOUS
37-
auth_mechanisms.3 = EXTERNAL" >> rabbitmq-configuration/rabbitmq.conf
37+
auth_mechanisms.3 = EXTERNAL
38+
39+
auth_backends.1 = internal
40+
auth_backends.2 = rabbit_auth_backend_oauth2" >> rabbitmq-configuration/rabbitmq.conf
41+
42+
echo "[
43+
{rabbitmq_auth_backend_oauth2, [{key_config,
44+
[{signing_keys,
45+
#{<<\"token-key\">> =>
46+
{map,
47+
#{<<\"alg\">> => <<\"HS256\">>,
48+
<<\"k\">> => <<\"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH\">>,
49+
<<\"kid\">> => <<\"token-key\">>,
50+
<<\"kty\">> => <<\"oct\">>,
51+
<<\"use\">> => <<\"sig\">>,
52+
<<\"value\">> => <<\"token-key\">>}}}}]},
53+
{resource_server_id,<<\"rabbitmq\">>}]}
54+
]." >> rabbitmq-configuration/advanced.config
3855

3956
echo "Running RabbitMQ ${RABBITMQ_IMAGE}"
4057

pom.xml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,10 @@
5151
<assertj.version>3.26.3</assertj.version>
5252
<mockito.version>5.14.2</mockito.version>
5353
<jqwik.version>1.9.1</jqwik.version>
54-
<amqp-client.version>5.20.0</amqp-client.version>
54+
<amqp-client.version>5.22.0</amqp-client.version>
5555
<micrometer-tracing-test.version>1.4.0</micrometer-tracing-test.version>
5656
<micrometer-docs-generator.version>1.0.4</micrometer-docs-generator.version>
57+
<jose4j.version>0.9.6</jose4j.version>
5758
<maven.compiler.plugin.version>3.13.0</maven.compiler.plugin.version>
5859
<maven.dependency.plugin.version>3.8.1</maven.dependency.plugin.version>
5960
<maven-surefire-plugin.version>3.5.2</maven-surefire-plugin.version>
@@ -236,6 +237,14 @@
236237
<scope>provided</scope>
237238
</dependency>
238239

240+
<dependency>
241+
<groupId>org.bitbucket.b_c</groupId>
242+
<artifactId>jose4j</artifactId>
243+
<version>${jose4j.version}</version>
244+
<scope>test</scope>
245+
</dependency>
246+
247+
239248
</dependencies>
240249

241250
<dependencyManagement>

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import static com.rabbitmq.client.amqp.Resource.State.*;
21+
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
22+
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;
2123

2224
import com.rabbitmq.client.amqp.*;
2325
import com.rabbitmq.client.amqp.ObservationCollector;
@@ -72,7 +74,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
7274
private final ConnectionSettings.AffinityStrategy affinityStrategy;
7375
private final String name;
7476
private final Lock instanceLock = new ReentrantLock();
75-
private final boolean filterExpressionsSupported;
77+
private final boolean filterExpressionsSupported, setTokenSupported;
7678
private volatile ExecutorService dispatchingExecutorService;
7779

7880
AmqpConnection(AmqpConnectionBuilder builder) {
@@ -128,8 +130,9 @@ final class AmqpConnection extends ResourceBase implements Connection {
128130
ConnectionUtils.NO_RETRY_STRATEGY,
129131
this.name());
130132
this.sync(ncw);
131-
this.filterExpressionsSupported =
132-
Utils.supportFilterExpressions(brokerVersion(this.nativeConnection));
133+
String brokerVesion = brokerVersion(this.nativeConnection);
134+
this.filterExpressionsSupported = supportFilterExpressions(brokerVesion);
135+
this.setTokenSupported = supportSetToken(brokerVesion);
133136
LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename());
134137
this.state(OPEN);
135138
this.environment.metricsCollector().openConnection();
@@ -331,6 +334,7 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
331334
"Not recovering connection '{}' for error {}",
332335
this.name(),
333336
event.failureCause().getMessage());
337+
close(ExceptionUtils.convert(ioex));
334338
}
335339
};
336340

@@ -707,6 +711,10 @@ boolean filterExpressionsSupported() {
707711
return this.filterExpressionsSupported;
708712
}
709713

714+
boolean setTokenSupported() {
715+
return this.setTokenSupported;
716+
}
717+
710718
long id() {
711719
return this.id;
712720
}
@@ -730,10 +738,10 @@ private void close(Throwable cause) {
730738
rpcServer.close();
731739
}
732740
for (AmqpPublisher publisher : this.publishers) {
733-
publisher.close();
741+
publisher.close(cause);
734742
}
735743
for (AmqpConsumer consumer : this.consumers) {
736-
consumer.close();
744+
consumer.close(cause);
737745
}
738746
try {
739747
this.dispatchingExecutorService.shutdownNow();

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static com.rabbitmq.client.amqp.Resource.State.*;
2121
import static com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder.*;
22+
import static com.rabbitmq.client.amqp.impl.ExceptionUtils.*;
2223
import static java.time.Duration.ofSeconds;
2324
import static java.util.Optional.ofNullable;
2425

@@ -71,7 +72,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
7172
private final Runnable replenishCreditOperation = this::replenishCreditIfNeeded;
7273
private final ExecutorService dispatchingExecutorService;
7374
private final java.util.function.Consumer<Delivery> nativeHandler;
74-
private final java.util.function.Consumer<ClientException> nativeReceiverCloseHandler;
75+
private final java.util.function.Consumer<ClientException> nativeCloseHandler;
7576
// native receiver internal state, accessed only in the native executor/scheduler
7677
private ProtonReceiver protonReceiver;
7778
private volatile Scheduler protonExecutor;
@@ -101,7 +102,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
101102

102103
this.dispatchingExecutorService = connection.dispatchingExecutorService();
103104
this.nativeHandler = createNativeHandler(messageHandler);
104-
this.nativeReceiverCloseHandler =
105+
this.nativeCloseHandler =
105106
e ->
106107
this.dispatchingExecutorService.submit(
107108
() -> {
@@ -116,7 +117,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
116117
this.filters,
117118
this.subscriptionListener,
118119
this.nativeHandler,
119-
this.nativeReceiverCloseHandler);
120+
this.nativeCloseHandler);
120121
this.initStateFromNativeReceiver(this.nativeReceiver);
121122
this.metricsCollector = this.connection.metricsCollector();
122123
try {
@@ -277,8 +278,8 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
277278
messageHandler.handle(context, message);
278279
}
279280
}
280-
} catch (ClientLinkRemotelyClosedException e) {
281-
if (ExceptionUtils.notFound(e) || ExceptionUtils.resourceDeleted(e)) {
281+
} catch (ClientLinkRemotelyClosedException | ClientSessionRemotelyClosedException e) {
282+
if (notFound(e) || resourceDeleted(e) || unauthorizedAccess(e)) {
282283
this.close(ExceptionUtils.convert(e));
283284
}
284285
} catch (ClientConnectionRemotelyClosedException e) {
@@ -304,7 +305,7 @@ void recoverAfterConnectionFailure() {
304305
this.filters,
305306
this.subscriptionListener,
306307
this.nativeHandler,
307-
this.nativeReceiverCloseHandler),
308+
this.nativeCloseHandler),
308309
e -> {
309310
boolean shouldRetry =
310311
e instanceof AmqpException.AmqpResourceClosedException
@@ -327,7 +328,7 @@ void recoverAfterConnectionFailure() {
327328
}
328329
}
329330

330-
private void close(Throwable cause) {
331+
void close(Throwable cause) {
331332
if (this.closed.compareAndSet(false, true)) {
332333
this.state(CLOSING, cause);
333334
this.connection.removeConsumer(this);
@@ -533,13 +534,6 @@ private void handleException(Exception ex, String operation) {
533534
}
534535

535536
private static boolean maybeCloseConsumerOnException(AmqpConsumer consumer, Exception ex) {
536-
if (ex instanceof ClientLinkRemotelyClosedException) {
537-
ClientLinkRemotelyClosedException e = (ClientLinkRemotelyClosedException) ex;
538-
if (ExceptionUtils.notFound(e) || ExceptionUtils.resourceDeleted(e)) {
539-
consumer.close(ExceptionUtils.convert(e));
540-
return true;
541-
}
542-
}
543-
return false;
537+
return ExceptionUtils.maybeCloseConsumerOnException(consumer::close, ex);
544538
}
545539
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.rabbitmq.client.amqp.AmqpException;
2424
import com.rabbitmq.client.amqp.Management;
25+
import java.nio.charset.StandardCharsets;
2526
import java.time.Duration;
2627
import java.util.*;
2728
import java.util.concurrent.*;
@@ -38,6 +39,7 @@
3839
import org.apache.qpid.protonj2.client.exceptions.ClientException;
3940
import org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
4041
import org.apache.qpid.protonj2.client.exceptions.ClientSessionRemotelyClosedException;
42+
import org.apache.qpid.protonj2.types.Binary;
4143
import org.slf4j.Logger;
4244
import org.slf4j.LoggerFactory;
4345

@@ -169,6 +171,27 @@ public UnbindSpecification unbind() {
169171
return new AmqpBindingManagement.AmqpUnbindSpecification(this);
170172
}
171173

174+
void setToken(String token) {
175+
if (!this.connection.setTokenSupported()) {
176+
throw new UnsupportedOperationException("Token renewal requires at least RabbitMQ 4.1.0");
177+
}
178+
checkAvailable();
179+
UUID requestId = messageId();
180+
try {
181+
Message<?> request =
182+
Message.create(new Binary(token.getBytes(StandardCharsets.UTF_8)))
183+
.to("/auth/tokens")
184+
.subject("PUT");
185+
186+
OutstandingRequest outstandingRequest = this.request(request, requestId);
187+
outstandingRequest.block();
188+
189+
checkResponse(outstandingRequest, requestId, 204);
190+
} catch (ClientException e) {
191+
throw new AmqpException("Error on set-token operation", e);
192+
}
193+
}
194+
172195
@Override
173196
public void close() {
174197
if (this.initializing) {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.*;
2828
import java.util.concurrent.atomic.AtomicBoolean;
2929
import java.util.concurrent.atomic.AtomicLong;
30+
import java.util.function.Consumer;
3031
import java.util.function.Function;
3132
import org.apache.qpid.protonj2.client.*;
3233
import org.apache.qpid.protonj2.client.exceptions.ClientException;
@@ -53,6 +54,8 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
5354
private final Duration publishTimeout;
5455
private final SessionHandler sessionHandler;
5556
private volatile ObservationCollector.ConnectionInfo connectionInfo;
57+
private final ExecutorService dispatchingExecutorService;
58+
private final java.util.function.Consumer<ClientException> nativeCloseHandler;
5659

5760
AmqpPublisher(AmqpPublisherBuilder builder) {
5861
super(builder.listeners());
@@ -63,7 +66,17 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
6366
this.connection = builder.connection();
6467
this.publishTimeout = builder.publishTimeout();
6568
this.sessionHandler = this.connection.createSessionHandler();
66-
this.sender = this.createSender(sessionHandler.session(), this.address, this.publishTimeout);
69+
this.dispatchingExecutorService = connection.dispatchingExecutorService();
70+
this.nativeCloseHandler =
71+
e ->
72+
this.dispatchingExecutorService.submit(
73+
() -> {
74+
// get result to make spotbugs happy
75+
boolean ignored = maybeCloseConsumerOnException(this, e);
76+
});
77+
this.sender =
78+
this.createSender(
79+
sessionHandler.session(), this.address, this.publishTimeout, this.nativeCloseHandler);
6780
this.metricsCollector = this.connection.metricsCollector();
6881
this.observationCollector = this.connection.observationCollector();
6982
this.state(OPEN);
@@ -154,7 +167,11 @@ private static MetricsCollector.PublishDisposition mapToPublishDisposition(Statu
154167
void recoverAfterConnectionFailure() {
155168
this.connectionInfo = new Utils.ObservationConnectionInfo(this.connection.connectionAddress());
156169
this.sender =
157-
this.createSender(this.sessionHandler.sessionNoCheck(), this.address, this.publishTimeout);
170+
this.createSender(
171+
this.sessionHandler.sessionNoCheck(),
172+
this.address,
173+
this.publishTimeout,
174+
this.nativeCloseHandler);
158175
}
159176

160177
@Override
@@ -164,14 +181,19 @@ public void close() {
164181

165182
// internal API
166183

167-
private Sender createSender(Session session, String address, Duration publishTimeout) {
184+
private Sender createSender(
185+
Session session,
186+
String address,
187+
Duration publishTimeout,
188+
Consumer<ClientException> nativeCloseHandler) {
168189
SenderOptions senderOptions =
169190
new SenderOptions()
170191
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
171192
.sendTimeout(
172193
publishTimeout.isNegative()
173194
? ConnectionOptions.INFINITE
174-
: publishTimeout.toMillis());
195+
: publishTimeout.toMillis())
196+
.closeHandler(nativeCloseHandler);
175197
try {
176198
Sender s =
177199
address == null
@@ -183,7 +205,7 @@ private Sender createSender(Session session, String address, Duration publishTim
183205
}
184206
}
185207

186-
private void close(Throwable cause) {
208+
void close(Throwable cause) {
187209
if (this.closed.compareAndSet(false, true)) {
188210
this.state(State.CLOSING, cause);
189211
this.connection.removePublisher(this);
@@ -198,6 +220,10 @@ private void close(Throwable cause) {
198220
}
199221
}
200222

223+
private static boolean maybeCloseConsumerOnException(AmqpPublisher publisher, Exception ex) {
224+
return ExceptionUtils.maybeCloseConsumerOnException(publisher::close, ex);
225+
}
226+
201227
private static class DefaultContext implements Publisher.Context {
202228

203229
private final Message message;

0 commit comments

Comments
 (0)