Skip to content

Commit a588a5c

Browse files
Use Watcher API for instance reconciliation in controller
Signed-off-by: Kate Goldenring <[email protected]>
1 parent 9aec489 commit a588a5c

File tree

6 files changed

+151
-284
lines changed

6 files changed

+151
-284
lines changed

controller/src/main.rs

+40-25
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
extern crate lazy_static;
33
mod util;
44

5-
use akri_shared::akri::{metrics::run_metrics_server, API_NAMESPACE};
5+
use akri_shared::{
6+
akri::{metrics::run_metrics_server, API_NAMESPACE},
7+
k8s::AKRI_CONFIGURATION_LABEL_NAME,
8+
};
9+
use futures::StreamExt;
10+
use kube::runtime::{watcher::Config, Controller};
611
use prometheus::IntGaugeVec;
712
use std::sync::Arc;
8-
use util::{
9-
controller_ctx::{ControllerContext, CONTROLLER_FIELD_MANAGER_ID},
10-
instance_action, node_watcher, pod_watcher,
11-
};
13+
use util::{controller_ctx::ControllerContext, instance_action, node_watcher, pod_watcher};
1214

1315
/// Length of time to sleep between controller system validation checks
1416
pub const SYSTEM_CHECK_DELAY_SECS: u64 = 30;
@@ -34,33 +36,46 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
3436
);
3537

3638
log::info!("{} Controller logging started", API_NAMESPACE);
37-
let mut tasks = Vec::new();
3839

3940
// Start server for prometheus metrics
4041
tokio::spawn(run_metrics_server());
41-
42-
let controller_ctx = Arc::new(ControllerContext::new(
43-
Arc::new(kube::Client::try_default().await?),
44-
CONTROLLER_FIELD_MANAGER_ID,
45-
));
46-
let instance_watcher_ctx = controller_ctx.clone();
42+
let client = Arc::new(kube::Client::try_default().await?);
43+
let controller_ctx = Arc::new(ControllerContext::new(client.clone()));
4744
let node_watcher_ctx = controller_ctx.clone();
4845
let pod_watcher_ctx = controller_ctx.clone();
4946

50-
// Handle instance changes
51-
tasks.push(tokio::spawn(async {
52-
instance_action::run(instance_watcher_ctx).await;
53-
}));
54-
// Watch for node disappearance
55-
tasks.push(tokio::spawn(async {
56-
node_watcher::run(node_watcher_ctx).await;
57-
}));
58-
// Watch for broker Pod state changes
59-
tasks.push(tokio::spawn(async {
60-
pod_watcher::run(pod_watcher_ctx).await;
61-
}));
47+
node_watcher::check(client.clone()).await?;
48+
let node_controller = Controller::new(
49+
node_watcher_ctx.client.all().as_inner(),
50+
Config::default().any_semantic(),
51+
)
52+
.shutdown_on_signal()
53+
.run(
54+
node_watcher::reconcile,
55+
node_watcher::error_policy,
56+
node_watcher_ctx,
57+
)
58+
.filter_map(|x| async move { std::result::Result::ok(x) })
59+
.for_each(|_| futures::future::ready(()));
60+
61+
pod_watcher::check(client.clone()).await?;
62+
let pod_controller = Controller::new(
63+
pod_watcher_ctx.client.all().as_inner(),
64+
Config::default().labels(AKRI_CONFIGURATION_LABEL_NAME),
65+
)
66+
.shutdown_on_signal()
67+
.run(
68+
pod_watcher::reconcile,
69+
pod_watcher::error_policy,
70+
pod_watcher_ctx,
71+
)
72+
.filter_map(|x| async move { std::result::Result::ok(x) })
73+
.for_each(|_| futures::future::ready(()));
6274

63-
futures::future::try_join_all(tasks).await?;
75+
tokio::select! {
76+
_ = futures::future::join(node_controller, pod_controller) => {},
77+
_ = instance_action::run(client) => {}
78+
}
6479

6580
log::info!("{} Controller end", API_NAMESPACE);
6681
Ok(())

controller/src/util/controller_ctx.rs

+1-6
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@ use k8s_openapi::api::core::v1::{Node, Pod, Service};
1010

1111
use tokio::sync::RwLock;
1212

13-
// Identifier for the controller to be set as the field manager for server-side apply
14-
pub const CONTROLLER_FIELD_MANAGER_ID: &str = "akri.sh/controller";
15-
1613
/// Pod states that BrokerPodWatcher is interested in
1714
///
1815
/// PodState describes the various states that the controller can
@@ -91,16 +88,14 @@ pub struct ControllerContext {
9188
pub client: Arc<dyn ControllerKubeClient>,
9289
pub known_pods: Arc<RwLock<HashMap<String, PodState>>>,
9390
pub known_nodes: Arc<RwLock<HashMap<String, NodeState>>>,
94-
pub identifier: String,
9591
}
9692

9793
impl ControllerContext {
98-
pub fn new(client: Arc<dyn ControllerKubeClient>, identifier: &str) -> Self {
94+
pub fn new(client: Arc<dyn ControllerKubeClient>) -> Self {
9995
ControllerContext {
10096
client,
10197
known_pods: Arc::new(RwLock::new(HashMap::new())),
10298
known_nodes: Arc::new(RwLock::new(HashMap::new())),
103-
identifier: identifier.to_string(),
10499
}
105100
}
106101
}

0 commit comments

Comments
 (0)