Skip to content

Commit

Permalink
Convenience functions for working with streams
Browse files Browse the repository at this point in the history
Note that list_streams is not one of them
because RabbitMQ HTTP API at the moment
does not support listing specifically
streams.
  • Loading branch information
michaelklishin committed Feb 1, 2025
1 parent e98e3e4 commit ab50450
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 6 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
this metric wont' be available for, say, RabbitMQ Stream Protocol
connections

### Enhancements

* New convenience functions for working with streams: `Client#get_stream_info`, `Client#delete_stream`

* `Client#declare_stream` and `requests::StreamParams` for convenient
stream declaration


## v0.17.0 (Jan 27, 2025)

Expand Down
36 changes: 35 additions & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::fmt;

use crate::error::Error;
use crate::error::Error::{ClientErrorResponse, NotFound, ServerErrorResponse};
use crate::requests::EmptyPayload;
use crate::requests::{EmptyPayload, StreamParams};
use crate::responses::{
DeprecatedFeatureList, FeatureFlag, FeatureFlagList, FeatureFlagStability, FeatureFlagState,
GetMessage, OAuthConfiguration, SchemaDefinitionSyncStatus, WarmStandbyReplicationStatus,
Expand Down Expand Up @@ -618,6 +618,15 @@ where
Ok(response)
}

/// Returns information about a stream.
pub async fn get_stream_info(
&self,
virtual_host: &str,
name: &str,
) -> Result<responses::QueueInfo> {
self.get_queue_info(virtual_host, name).await
}

/// Returns information about an exchange.
pub async fn get_exchange_info(
&self,
Expand Down Expand Up @@ -685,6 +694,27 @@ where
Ok(())
}

pub async fn declare_stream(&self, vhost: &str, params: &StreamParams<'_>) -> Result<()> {
let mut m: Map<String, Value> = Map::new();

if let Some(m2) = params.arguments.clone() {
m.extend(m2);
};

if let Some(val) = params.max_length_bytes {
m.insert("max_length_bytes".to_owned(), json!(val));
};
if let Some(val) = params.max_segment_length_bytes {
m.insert("max_segment_length_bytes".to_owned(), json!(val));
};

let q_params = QueueParams::new_stream(params.name, Some(m));
let _response = self
.http_put(path!("queues", vhost, params.name), &q_params, None, None)
.await?;
Ok(())
}

pub async fn declare_exchange(&self, vhost: &str, params: &ExchangeParams<'_>) -> Result<()> {
let _response = self
.http_put(path!("exchanges", vhost, params.name), params, None, None)
Expand Down Expand Up @@ -807,6 +837,10 @@ where
Ok(())
}

pub async fn delete_stream(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> {
self.delete_queue(vhost, name, idempotently).await
}

pub async fn delete_exchange(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> {
let excludes = if idempotently {
Some(StatusCode::NOT_FOUND)
Expand Down
31 changes: 30 additions & 1 deletion src/blocking_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use crate::error::Error;
use crate::error::Error::{ClientErrorResponse, NotFound, ServerErrorResponse};
use crate::requests::EmptyPayload;
use crate::requests::{EmptyPayload, StreamParams};
use crate::responses::{
DeprecatedFeatureList, FeatureFlag, FeatureFlagList, FeatureFlagStability, FeatureFlagState,
GetMessage, OAuthConfiguration, WarmStandbyReplicationStatus,
Expand Down Expand Up @@ -558,6 +558,11 @@ where
Ok(response)
}

/// Returns information about a stream.
pub fn get_stream_info(&self, virtual_host: &str, name: &str) -> Result<responses::QueueInfo> {
self.get_queue_info(virtual_host, name)
}

/// Returns information about an exchange.
pub fn get_exchange_info(
&self,
Expand Down Expand Up @@ -613,6 +618,26 @@ where
Ok(())
}

pub fn declare_stream(&self, vhost: &str, params: &StreamParams<'_>) -> Result<()> {
let mut m: Map<String, Value> = Map::new();

if let Some(m2) = params.arguments.clone() {
m.extend(m2);
};

if let Some(val) = params.max_length_bytes {
m.insert("max_length_bytes".to_owned(), json!(val));
};
if let Some(val) = params.max_segment_length_bytes {
m.insert("max_segment_length_bytes".to_owned(), json!(val));
};

let q_params = QueueParams::new_stream(params.name, Some(m));
let _response =
self.http_put(path!("queues", vhost, params.name), &q_params, None, None)?;
Ok(())
}

pub fn declare_exchange(&self, vhost: &str, params: &ExchangeParams) -> Result<()> {
let _response =
self.http_put(path!("exchanges", vhost, params.name), params, None, None)?;
Expand Down Expand Up @@ -715,6 +740,10 @@ where
Ok(())
}

pub fn delete_stream(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> {
self.delete_queue(vhost, name, idempotently)
}

pub fn delete_exchange(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> {
let excludes = if idempotently {
Some(StatusCode::NOT_FOUND)
Expand Down
45 changes: 43 additions & 2 deletions src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ pub struct UserParams<'a> {

pub type XArguments = Option<Map<String, Value>>;

/// [Queue](https://rabbitmq.com/docs/queues/) properties used at queue declaration time
#[derive(Serialize)]
/// [Queue](https://rabbitmq.com/docs/queues/) properties used at declaration time.
/// Prefer constructor functions, they correctly put [`QueueType`] to the optional
/// argument map.
#[derive(Serialize, Debug)]
pub struct QueueParams<'a> {
/// The name of the queue to declare.
/// Must be no longer than 255 bytes in length.
Expand Down Expand Up @@ -166,6 +168,45 @@ impl<'a> QueueParams<'a> {
}
}

/// [Stream](https://rabbitmq.com/docs/streams/) properties used at declaration time
#[derive(Serialize, Debug)]
pub struct StreamParams<'a> {
/// The name of the stream to declare.
/// Must be no longer than 255 bytes in length.
pub name: &'a str,
pub expiration: &'a str,
pub max_length_bytes: Option<u64>,
pub max_segment_length_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub arguments: XArguments,
}

impl<'a> StreamParams<'a> {
pub fn new(name: &'a str, expiration: &'a str) -> Self {
Self {
name,
expiration,
max_length_bytes: None,
max_segment_length_bytes: None,
arguments: None,
}
}

pub fn with_expiration_and_length_limit(
name: &'a str,
expiration: &'a str,
max_length_bytes: u64,
) -> Self {
Self {
name,
expiration,
max_length_bytes: Some(max_length_bytes),
max_segment_length_bytes: None,
arguments: None,
}
}
}

/// Exchange properties used at queue declaration time
#[derive(Debug, Serialize)]
pub struct ExchangeParams<'a> {
Expand Down
2 changes: 1 addition & 1 deletion tests/async_queue_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn test_async_declare_a_quorum_queue() {
}

#[tokio::test]
async fn test_async_declare_a_stream() {
async fn test_async_declare_a_stream_with_declare_queue() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
Expand Down
71 changes: 71 additions & 0 deletions tests/async_stream_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (C) 2023-2025 RabbitMQ Core Team ([email protected])
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rabbitmq_http_client::{api::Client, requests::StreamParams};
use serde_json::{json, Map, Value};

mod test_helpers;
use crate::test_helpers::{endpoint, PASSWORD, USERNAME};

#[tokio::test]
async fn test_async_declare_stream() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let name = "rust.tests.async.stream.279287349823479";
let expiration = "24h";

let _ = rc.delete_stream(vhost, name, false).await;

let result1 = rc.get_stream_info(vhost, name).await;
assert!(result1.is_err());

let mut map = Map::<String, Value>::new();
map.insert("x-initial-cluster-size".to_owned(), json!(3));
let optional_args = Some(map);

let params = StreamParams {
name,
expiration,
max_length_bytes: None,
max_segment_length_bytes: None,
arguments: optional_args,
};

let result2 = rc.declare_stream(vhost, &params).await;
assert!(result2.is_ok(), "declare_stream returned {:?}", result2);

let _ = rc.delete_stream(vhost, name, false).await;
}

#[tokio::test]
async fn test_async_delete_stream() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let name = "rust.tests.async.stream.67816479475";

let _ = rc.delete_stream(vhost, name, false).await;

let result1 = rc.get_stream_info(vhost, name).await;
assert!(result1.is_err());

let params = StreamParams::new(name, "7D");

let result2 = rc.declare_stream(vhost, &params).await;
assert!(result2.is_ok(), "declare_stream returned {:?}", result2);

rc.delete_stream(vhost, name, false).await.unwrap();
let result3 = rc.get_stream_info(vhost, name).await;
assert!(result3.is_err());
}
2 changes: 1 addition & 1 deletion tests/blocking_queue_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fn test_blocking_declare_a_quorum_queue() {
}

#[test]
fn test_blocking_declare_a_stream() {
fn test_blocking_declare_a_stream_with_declare_queue() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
Expand Down
71 changes: 71 additions & 0 deletions tests/blocking_stream_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (C) 2023-2025 RabbitMQ Core Team ([email protected])
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rabbitmq_http_client::{blocking_api::Client, requests::StreamParams};
use serde_json::{json, Map, Value};

mod test_helpers;
use crate::test_helpers::{endpoint, PASSWORD, USERNAME};

#[test]
fn test_blocking_declare_stream() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let name = "rust.tests.blocking.stream.792879823479";
let expiration = "24h";

let _ = rc.delete_stream(vhost, name, false);

let result1 = rc.get_stream_info(vhost, name);
assert!(result1.is_err());

let mut map = Map::<String, Value>::new();
map.insert("x-initial-cluster-size".to_owned(), json!(3));
let optional_args = Some(map);

let params = StreamParams {
name,
expiration,
max_length_bytes: None,
max_segment_length_bytes: None,
arguments: optional_args,
};

let result2 = rc.declare_stream(vhost, &params);
assert!(result2.is_ok(), "declare_stream returned {:?}", result2);

let _ = rc.delete_stream(vhost, name, false);
}

#[test]
fn test_blocking_delete_stream() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
let vhost = "/";
let name = "rust.tests.blocking.stream.67816479475";

let _ = rc.delete_stream(vhost, name, false);

let result1 = rc.get_stream_info(vhost, name);
assert!(result1.is_err());

let params = StreamParams::new(name, "7D");

let result2 = rc.declare_stream(vhost, &params);
assert!(result2.is_ok(), "declare_stream returned {:?}", result2);

rc.delete_stream(vhost, name, false).unwrap();
let result3 = rc.get_stream_info(vhost, name);
assert!(result3.is_err());
}

0 comments on commit ab50450

Please sign in to comment.