Skip to content

Commit 5470f22

Browse files
committed
fix(transport): close async subscriptions on ingress stop
Signed-off-by: Roman Volosatovs <[email protected]>
1 parent 65cea51 commit 5470f22

File tree

6 files changed

+54
-15
lines changed

6 files changed

+54
-15
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/transport-quic/tests/loopback.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ async fn loopback() -> anyhow::Result<()> {
5252
.shutdown()
5353
.await
5454
.context("failed to shutdown stream")?;
55+
drop(outgoing);
5556
info!("wrote `bar`");
5657
anyhow::Ok(())
5758
},
@@ -65,6 +66,7 @@ async fn loopback() -> anyhow::Result<()> {
6566
.shutdown()
6667
.await
6768
.context("failed to shutdown stream")?;
69+
drop(nested_tx);
6870
info!("wrote `client->server`");
6971
anyhow::Ok(())
7072
},
@@ -125,6 +127,7 @@ async fn loopback() -> anyhow::Result<()> {
125127
.shutdown()
126128
.await
127129
.context("failed to shutdown stream")?;
130+
drop(outgoing);
128131
info!("wrote `foo`");
129132
anyhow::Ok(())
130133
},
@@ -138,6 +141,7 @@ async fn loopback() -> anyhow::Result<()> {
138141
.shutdown()
139142
.await
140143
.context("failed to shutdown stream")?;
144+
drop(nested_tx);
141145
info!("wrote `server->client`");
142146
anyhow::Ok(())
143147
},

crates/transport/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "wrpc-transport"
3-
version = "0.28.2"
3+
version = "0.28.3"
44
description = "wRPC core transport functionality"
55

66
authors.workspace = true

crates/transport/src/frame/conn/client.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,17 @@ where
5252
results_io.spawn({
5353
let index = Arc::clone(&index);
5454
async move {
55-
if let Err(err) = ingress(rx, index, results_tx).await {
55+
if let Err(err) = ingress(rx, &index, results_tx).await {
5656
error!(?err, "result ingress failed");
5757
} else {
5858
debug!("result ingress successfully complete");
5959
}
60+
let Ok(mut index) = index.lock() else {
61+
error!("failed to lock index trie");
62+
return;
63+
};
64+
trace!("shutting down index trie");
65+
index.close_tx();
6066
}
6167
.in_current_span()
6268
});

crates/transport/src/frame/conn/mod.rs

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ pub use client::*;
2929
pub use server::*;
3030

3131
/// Index trie containing async stream subscriptions
32-
#[derive(Default)]
32+
#[derive(Debug, Default)]
3333
enum IndexTrie {
3434
#[default]
3535
Empty,
3636
Leaf {
37-
tx: mpsc::Sender<std::io::Result<Bytes>>,
37+
tx: Option<mpsc::Sender<std::io::Result<Bytes>>>,
3838
rx: Option<mpsc::Receiver<std::io::Result<Bytes>>>,
3939
},
4040
IndexNode {
@@ -65,7 +65,7 @@ impl<'a>
6565
),
6666
) -> Self {
6767
match path {
68-
[] => Self::Leaf { tx, rx },
68+
[] => Self::Leaf { tx: Some(tx), rx },
6969
[None, path @ ..] => Self::WildcardNode {
7070
tx: None,
7171
rx: None,
@@ -165,7 +165,7 @@ impl IndexTrie {
165165
let Some((i, path)) = path.split_first() else {
166166
return match self {
167167
Self::Empty => None,
168-
Self::Leaf { tx, .. } => Some(tx.clone()),
168+
Self::Leaf { tx, .. } => tx.clone(),
169169
Self::IndexNode { tx, .. } | Self::WildcardNode { tx, .. } => tx.clone(),
170170
};
171171
};
@@ -182,6 +182,33 @@ impl IndexTrie {
182182
}
183183
}
184184

185+
/// Closes all senders in the trie
186+
#[instrument(level = "trace", skip(self), ret(level = "trace"))]
187+
fn close_tx(&mut self) {
188+
match self {
189+
Self::Empty => {}
190+
Self::Leaf { tx, .. } => {
191+
mem::take(tx);
192+
}
193+
Self::IndexNode {
194+
tx, ref mut nested, ..
195+
} => {
196+
mem::take(tx);
197+
for nested in nested.iter_mut().flatten() {
198+
nested.close_tx();
199+
}
200+
}
201+
Self::WildcardNode {
202+
tx, ref mut nested, ..
203+
} => {
204+
mem::take(tx);
205+
if let Some(nested) = nested {
206+
nested.close_tx();
207+
}
208+
}
209+
}
210+
}
211+
185212
/// Inserts `sender` and `receiver` under a `path` - returns `false` if it failed and `true` if it succeeded.
186213
/// Tree state after `false` is returned is undefined
187214
#[instrument(level = "trace", skip(self, sender, receiver), ret(level = "trace"))]
@@ -208,14 +235,10 @@ impl IndexTrie {
208235
let mut nested = Vec::with_capacity(n);
209236
nested.resize_with(n, Option::default);
210237
nested[*i] = Some(Self::from((path, sender, receiver)));
211-
*self = Self::IndexNode {
212-
tx: Some(tx),
213-
rx,
214-
nested,
215-
};
238+
*self = Self::IndexNode { tx, rx, nested };
216239
} else {
217240
*self = Self::WildcardNode {
218-
tx: Some(tx),
241+
tx,
219242
rx,
220243
nested: Some(Box::new(Self::from((path, sender, receiver)))),
221244
};
@@ -405,7 +428,7 @@ impl AsyncWrite for Outgoing {
405428
#[instrument(level = "trace", skip_all, ret(level = "trace"))]
406429
async fn ingress(
407430
mut rx: impl AsyncRead + Unpin,
408-
index: Arc<std::sync::Mutex<IndexTrie>>,
431+
index: &std::sync::Mutex<IndexTrie>,
409432
param_tx: mpsc::Sender<std::io::Result<Bytes>>,
410433
) -> std::io::Result<()> {
411434
loop {

crates/transport/src/frame/conn/server.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,17 @@ where
149149
params_io.spawn({
150150
let index = Arc::clone(&index);
151151
async move {
152-
if let Err(err) = ingress(rx, index, params_tx).await {
152+
if let Err(err) = ingress(rx, &index, params_tx).await {
153153
error!(?err, "parameter ingress failed");
154154
} else {
155155
debug!("parameter ingress successfully complete");
156156
}
157+
let Ok(mut index) = index.lock() else {
158+
error!("failed to lock index trie");
159+
return;
160+
};
161+
trace!("shutting down index trie");
162+
index.close_tx();
157163
}
158164
.in_current_span()
159165
});

0 commit comments

Comments
 (0)