Skip to content

Commit

Permalink
More functions for Tanzu RabbitMQ SDS support
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Jan 25, 2025
1 parent 2457caf commit 1ad265e
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 2 deletions.
59 changes: 58 additions & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use std::fmt;

use crate::error::Error;
use crate::error::Error::{ClientErrorResponse, NotFound, ServerErrorResponse};
use crate::requests::EmptyPayload;
use crate::responses::{
DeprecatedFeatureList, FeatureFlag, FeatureFlagList, FeatureFlagStability, FeatureFlagState,
GetMessage, OAuthConfiguration,
GetMessage, OAuthConfiguration, SchemaDefinitionSyncStatus,
};
use crate::{
commons::{BindingDestinationType, SupportedProtocol, UserLimitTarget, VirtualHostLimitTarget},
Expand Down Expand Up @@ -1401,6 +1402,62 @@ where
Ok(response)
}

//
// Schema Definition Sync (Tanzu RabbitMQ)
//

pub async fn schema_definition_sync_status(
&self,
node: Option<&str>,
) -> Result<SchemaDefinitionSyncStatus> {
let response = match node {
Some(val) => {
self.http_get(path!("tanzu", "osr", "schema", "status", val), None, None)
.await?
}
None => self.http_get("tanzu/osr/schema/status", None, None).await?,
};
let response = response.json().await?;

Ok(response)
}

pub async fn enable_schema_definition_sync(&self, node: Option<&str>) -> Result<()> {
let payload = EmptyPayload::new();
let _ = match node {
Some(val) => {
self.http_put(
path!("tanzu", "osr", "schema", "enable", val),
&payload,
None,
None,
)
.await?
}
None => {
self.http_put("tanzu/osr/schema/enable", &payload, None, None)
.await?
}
};

Ok(())
}

pub async fn disable_schema_definition_sync(&self, node: Option<&str>) -> Result<()> {
let _ = match node {
Some(val) => {
self.http_delete(path!("tanzu", "osr", "schema", "disable", val), None, None)
.await?
}
None => {
self.http_delete("tanzu/osr/schema/disable", None, None)
.await?
}
};

Ok(())
}

//
// Implementation
//
Expand Down
21 changes: 20 additions & 1 deletion src/blocking_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

use crate::error::Error;
use crate::error::Error::{ClientErrorResponse, NotFound, ServerErrorResponse};
use crate::requests::EmptyPayload;
use crate::responses::{
DeprecatedFeatureList, FeatureFlag, FeatureFlagList, FeatureFlagStability, FeatureFlagState,
GetMessage, OAuthConfiguration,
Expand Down Expand Up @@ -1257,6 +1258,24 @@ where
Ok(response)
}

pub fn enable_schema_definition_sync(&self, node: &str) -> Result<()> {
let payload = EmptyPayload::new();
self.http_put(
path!("tanzu", "osr", "schema", "enable", node),
&payload,
None,
None,
)?;

Ok(())
}

pub fn disable_schema_definition_sync(&self, node: &str) -> Result<()> {
self.http_delete(path!("tanzu", "osr", "schema", "disable", node), None, None)?;

Ok(())
}

//
// Implementation
//
Expand Down Expand Up @@ -1284,7 +1303,7 @@ where
vhost: &str,
exchange: &str,
vertex: BindindVertex,
) -> Result<Vec<responses::BindingInfo>> {
) -> Result<Vec<BindingInfo>> {
let response = self.http_get(
path!("exchanges", vhost, exchange, "bindings", vertex),
None,
Expand Down
9 changes: 9 additions & 0 deletions src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,12 @@ pub struct Permissions<'a> {
}

pub type MessageProperties = Map<String, Value>;

#[derive(Serialize, Default)]
pub struct EmptyPayload;

impl EmptyPayload {
pub fn new() -> Self {
Self
}
}

0 comments on commit 1ad265e

Please sign in to comment.