Skip to content

Commit b295a21

Browse files
committed
feat: add pending request channel size configuration to ConnectionConfig
1 parent a3faa6f commit b295a21

File tree

1 file changed

+42
-13
lines changed

1 file changed

+42
-13
lines changed

scylla/src/network/connection.rs

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ pub(crate) struct ConnectionConfig {
287287
pub(crate) authenticator: Option<Arc<dyn AuthenticatorProvider>>,
288288
pub(crate) address_translator: Option<Arc<dyn AddressTranslator>>,
289289
pub(crate) write_coalescing_delay: Option<WriteCoalescingDelay>,
290+
pub(crate) pending_request_channel_size: Option<usize>,
290291

291292
pub(crate) keepalive_interval: Option<Duration>,
292293
pub(crate) keepalive_timeout: Option<Duration>,
@@ -322,17 +323,44 @@ impl ConnectionConfig {
322323
authenticator: self.authenticator.clone(),
323324
address_translator: self.address_translator.clone(),
324325
write_coalescing_delay: self.write_coalescing_delay.clone(),
326+
pending_request_channel_size: self.pending_request_channel_size,
327+
325328
keepalive_interval: self.keepalive_interval,
326329
keepalive_timeout: self.keepalive_timeout,
327330
tablet_sender: self.tablet_sender.clone(),
328331
identity: self.identity.clone(),
329332
}
330333
}
334+
335+
/// Set the size of the pending request channel for each connection.
336+
///
337+
/// # Arguments
338+
///
339+
/// * `size` - The maximum number of pending requests per connection.
340+
///
341+
/// # Notes
342+
///
343+
/// - This is different from cpp-driver's implementation, which uses a per-RequestProcessor queue.
344+
/// - The default is 2048, a balanced value between performance and memory usage.
345+
/// - Adjust based on your specific workload and system resources.
346+
///
347+
/// # Example
348+
///
349+
/// ```
350+
/// let session = SessionBuilder::new()
351+
/// .connection_config(
352+
/// ConnectionConfig::new()
353+
/// .with_pending_request_channel_size(4096)
354+
/// )
355+
/// .build()
356+
/// .await?;
357+
/// ```
358+
pub fn with_pending_request_channel_size(mut self, size: usize) -> Self {
359+
self.pending_request_channel_size = Some(size);
360+
self
361+
}
331362
}
332363

333-
/// Configuration used for new connections, customized for a specific endpoint.
334-
///
335-
/// Created from [ConnectionConfig] using [ConnectionConfig::to_host_connection_config].
336364
#[derive(Clone)]
337365
pub(crate) struct HostConnectionConfig {
338366
pub(crate) local_ip_address: Option<IpAddr>,
@@ -349,6 +377,7 @@ pub(crate) struct HostConnectionConfig {
349377
pub(crate) authenticator: Option<Arc<dyn AuthenticatorProvider>>,
350378
pub(crate) address_translator: Option<Arc<dyn AddressTranslator>>,
351379
pub(crate) write_coalescing_delay: Option<WriteCoalescingDelay>,
380+
pub(crate) pending_request_channel_size: Option<usize>,
352381

353382
pub(crate) keepalive_interval: Option<Duration>,
354383
pub(crate) keepalive_timeout: Option<Duration>,
@@ -357,6 +386,13 @@ pub(crate) struct HostConnectionConfig {
357386
pub(crate) identity: SelfIdentity<'static>,
358387
}
359388

389+
#[cfg(test)]
390+
impl HostConnectionConfig {
391+
fn is_tls(&self) -> bool {
392+
self.tls_config.is_some()
393+
}
394+
}
395+
360396
#[cfg(test)]
361397
impl Default for HostConnectionConfig {
362398
fn default() -> Self {
@@ -382,6 +418,7 @@ impl Default for HostConnectionConfig {
382418
tablet_sender: None,
383419

384420
identity: SelfIdentity::default(),
421+
pending_request_channel_size: Some(2048),
385422
}
386423
}
387424
}
@@ -411,19 +448,11 @@ impl Default for ConnectionConfig {
411448
tablet_sender: None,
412449

413450
identity: SelfIdentity::default(),
451+
pending_request_channel_size: Some(2048),
414452
}
415453
}
416454
}
417455

418-
impl HostConnectionConfig {
419-
fn is_tls(&self) -> bool {
420-
self.tls_config.is_some()
421-
}
422-
}
423-
424-
// Used to listen for fatal error in connection
425-
pub(crate) type ErrorReceiver = tokio::sync::oneshot::Receiver<ConnectionError>;
426-
427456
impl Connection {
428457
// Returns new connection and ErrorReceiver which can be used to wait for a fatal error
429458
/// Opens a connection and makes it ready to send/receive CQL frames on it,
@@ -451,7 +480,7 @@ impl Connection {
451480
}
452481

453482
// TODO: What should be the size of the channel?
454-
let (sender, receiver) = mpsc::channel(1024);
483+
let (sender, receiver) = mpsc::channel(config.pending_request_channel_size.unwrap_or(1024));
455484
let (error_sender, error_receiver) = tokio::sync::oneshot::channel();
456485
// Unbounded because it allows for synchronous pushes
457486
let (orphan_notification_sender, orphan_notification_receiver) = mpsc::unbounded_channel();

0 commit comments

Comments
 (0)