Skip to content

tests: Speculative retry #1251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: branch-hackathon
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions scylla-proxy/src/actions.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{fmt, sync::Arc, time::Duration};

use bytes::Bytes;
use rand::{Rng, RngCore};
use std::fmt::Debug;
use std::{fmt, sync::Arc, time::Duration};
use tokio::sync::mpsc;

#[cfg(test)]
Expand Down Expand Up @@ -51,14 +51,41 @@ pub enum Condition {

// True if any REGISTER was sent on this connection. Useful to filter out control connection messages.
ConnectionRegisteredAnyEvent,

// A custom condition handler, allows you to customize frame matching
CustomCondition(ConditionHandler),
}

pub struct ConditionHandler(Arc<dyn Fn(&EvaluationContext) -> bool + Send + Sync>);

impl ConditionHandler {
pub fn new(handler: Arc<dyn Fn(&EvaluationContext) -> bool + Send + Sync>) -> Self {
ConditionHandler(handler)
}

fn execute(&mut self, ctx: &EvaluationContext) -> bool {
self.0(ctx)
}
}

impl Debug for ConditionHandler {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "ConditionHandler")
}
}

impl Clone for ConditionHandler {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

/// The context in which [`Conditions`](Condition) are evaluated.
pub(crate) struct EvaluationContext {
pub(crate) connection_seq_no: usize,
pub(crate) connection_has_events: bool,
pub(crate) opcode: FrameOpcode,
pub(crate) frame_body: Bytes,
pub struct EvaluationContext {
pub connection_seq_no: usize,
pub connection_has_events: bool,
pub opcode: FrameOpcode,
pub frame_body: Bytes,
}

impl Condition {
Expand Down Expand Up @@ -116,7 +143,11 @@ impl Condition {
val
},

Condition::ConnectionRegisteredAnyEvent => ctx.connection_has_events
Condition::ConnectionRegisteredAnyEvent => ctx.connection_has_events,

Condition::CustomCondition(cb) => {
cb.execute(ctx)
}
}
}

Expand All @@ -135,6 +166,11 @@ impl Condition {
pub fn or(self, c2: Self) -> Self {
Self::Or(Box::new(self), Box::new(c2))
}

/// A convenience function for creating [Condition::CustomCondition] variant.
pub fn custom(handler_fn: fn(&EvaluationContext) -> bool) -> Self {
Self::CustomCondition(ConditionHandler::new(Arc::new(handler_fn)))
}
}

/// Just a trait to unify API of both [RequestReaction] and [ResponseReaction].
Expand Down
2 changes: 1 addition & 1 deletion scylla-proxy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(crate) enum FrameType {
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum FrameOpcode {
pub enum FrameOpcode {
Request(RequestOpcode),
Response(ResponseOpcode),
}
Expand Down
6 changes: 3 additions & 3 deletions scylla-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ mod proxy;
pub type TargetShard = u16;

pub use actions::{
example_db_errors, Action, Condition, Reaction, RequestReaction, RequestRule, ResponseReaction,
ResponseRule,
example_db_errors, Action, Condition, ConditionHandler, Reaction, RequestReaction, RequestRule,
ResponseReaction, ResponseRule,
};
pub use errors::{DoorkeeperError, ProxyError, WorkerError};
pub use frame::{RequestFrame, RequestOpcode, ResponseFrame, ResponseOpcode};
pub use frame::{FrameOpcode, RequestFrame, RequestOpcode, ResponseFrame, ResponseOpcode};
pub use proxy::{Node, Proxy, RunningProxy, ShardAwareness};

pub use proxy::get_exclusive_local_address;
Expand Down
2 changes: 2 additions & 0 deletions scylla/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ mod shards;
mod silent_prepare_batch;
mod silent_prepare_query;
mod skip_metadata_optimization;
mod speculative_retry;
mod speculative_tests_utils;
mod tablets;
#[path = "../common/utils.rs"]
mod utils;
82 changes: 0 additions & 82 deletions scylla/tests/integration/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,88 +14,6 @@ use scylla_proxy::{
WorkerError,
};

#[tokio::test]
#[ntest::timeout(30000)]
#[cfg(not(scylla_cloud_tests))]
async fn speculative_execution_is_fired() {
setup_tracing();
const TIMEOUT_PER_REQUEST: Duration = Duration::from_millis(1000);

let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move {
// DB preparation phase
let simple_speculative_no_retry_profile = ExecutionProfile::builder().speculative_execution_policy(Some(Arc::new(SimpleSpeculativeExecutionPolicy {
max_retry_count: 2,
retry_interval: Duration::from_millis(10),
}))).retry_policy(Arc::new(FallthroughRetryPolicy)).build();
let session: Session = SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.default_execution_profile_handle(simple_speculative_no_retry_profile.into_handle())
.address_translator(Arc::new(translation_map))
.build()
.await
.unwrap();

let ks = unique_keyspace_name();
session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks)).await.unwrap();
session.use_keyspace(ks, false).await.unwrap();
session
.ddl("CREATE TABLE t (a int primary key)")
.await
.unwrap();

let mut q = Query::from("INSERT INTO t (a) VALUES (?)");
q.set_is_idempotent(true); // this is to allow speculative execution to fire

let drop_frame_rule = RequestRule(
Condition::RequestOpcode(RequestOpcode::Prepare)
.and(Condition::BodyContainsCaseSensitive(Box::new(*b"t"))),
RequestReaction::drop_frame(),
);

info!("--------------------- BEGINNING main test part ----------------");

info!("--------------------- first query - no rules ----------------");
// first run before any rules
session.query_unpaged(q.clone(), (3,)).await.unwrap();

info!("--------------------- second query - 0 and 2 nodes not responding ----------------");
running_proxy.running_nodes[0]
.change_request_rules(Some(vec![drop_frame_rule.clone()]));
running_proxy.running_nodes[2]
.change_request_rules(Some(vec![drop_frame_rule.clone()]));

session.query_unpaged(q.clone(), (2,)).await.unwrap();

info!("--------------------- third query - 0 and 1 nodes not responding ----------------");
running_proxy.running_nodes[2]
.change_request_rules(None);
running_proxy.running_nodes[1]
.change_request_rules(Some(vec![drop_frame_rule.clone()]));

session.query_unpaged(q.clone(), (1,)).await.unwrap();


info!("--------------------- fourth query - all nodes not responding ----------------");
running_proxy.running_nodes[2]
.change_request_rules(Some(vec![drop_frame_rule]));

tokio::select! {
res = session.query_unpaged(q, (0,)) => panic!("Rules did not work: received response {:?}", res),
_ = tokio::time::sleep(TIMEOUT_PER_REQUEST) => (),
};

info!("--------------------- FINISHING main test part ----------------");

running_proxy
}).await;

match res {
Ok(()) => (),
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
Err(err) => panic!("{}", err),
}
}

#[tokio::test]
#[ntest::timeout(30000)]
#[cfg(not(scylla_cloud_tests))]
Expand Down
Loading
Loading