Skip to content

Commit

Permalink
Introduce Client#close_user_connections
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Feb 3, 2025
1 parent e433c8d commit 7a3bb33
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 1 deletion.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

## v0.20.0 (in development)

No (documented) changes yet.
### Enhancements

* `Client#.close_user_connections` is a new function that closes all connections
that authenticated with a specific username


## v0.19.0 (Feb 1, 2025)
Expand Down
26 changes: 26 additions & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,32 @@ where
Ok(())
}

pub async fn close_user_connections(&self, username: &str, reason: Option<&str>) -> Result<()> {
match reason {
None => {
self.http_delete(
path!("connections", "username", username),
Some(StatusCode::NOT_FOUND),
None,
)
.await?
}
Some(value) => {
let mut headers = HeaderMap::new();
let hdr = HeaderValue::from_str(value)?;
headers.insert("X-Reason", hdr);
self.http_delete_with_headers(
path!("connections", "username", username),
headers,
None,
None,
)
.await?
}
};
Ok(())
}

/// Lists all connections in the given virtual host.
pub async fn list_connections_in(
&self,
Expand Down
22 changes: 22 additions & 0 deletions src/blocking_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,28 @@ where
Ok(())
}

pub fn close_user_connections(&self, username: &str, reason: Option<&str>) -> Result<()> {
match reason {
None => self.http_delete(
path!("connections", "username", username),
Some(StatusCode::NOT_FOUND),
None,
)?,
Some(value) => {
let mut headers = HeaderMap::new();
let hdr = HeaderValue::from_str(value)?;
headers.insert("X-Reason", hdr);
self.http_delete_with_headers(
path!("connections", "username", username),
headers,
None,
None,
)?
}
};
Ok(())
}

/// Lists all connections in the given virtual host.
pub fn list_connections_in(&self, virtual_host: &str) -> Result<Vec<responses::Connection>> {
let response = self.http_get(path!("vhosts", virtual_host, "connections"), None, None)?;
Expand Down
26 changes: 26 additions & 0 deletions tests/async_connection_tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::Duration;
// Copyright (C) 2023-2025 RabbitMQ Core Team ([email protected])
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -93,3 +94,28 @@ async fn test_async_list_virtual_host_stream_connections() {
result1
);
}

#[tokio::test]
async fn test_async_close_user_connections() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);

let args = OpenConnectionArguments::new(&hostname(), 5672, USERNAME, PASSWORD);
let conn = Connection::open(&args).await.unwrap();
assert!(conn.is_open());

let result1 = rc
.close_user_connections(
USERNAME,
Some("closed in test_async_close_user_connections"),
)
.await;
assert!(
result1.is_ok(),
"close_user_connections returned {:?}",
result1
);

tokio::time::sleep(Duration::from_millis(50)).await;
assert!(!conn.is_open());
}

0 comments on commit 7a3bb33

Please sign in to comment.