Skip to content

Improve device plugin registration error messages #754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions agent/src/plugin_manager/device_plugin_runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert::TryFrom, path::Path, sync::Arc, time::SystemTime};
use std::{path::Path, sync::Arc, time::SystemTime};

use akri_shared::uds::unix_stream;
use async_trait::async_trait;
Expand Down Expand Up @@ -134,7 +134,7 @@ pub enum RunnerError {
TimeError,

#[error("Unable to register plugin to kubelet")]
RegistrationError,
RegistrationError(#[from] anyhow::Error),
}

pub(super) async fn serve_and_register_plugin<T: Clone + 'static + Send + Sync>(
Expand Down Expand Up @@ -192,7 +192,7 @@ pub(super) async fn serve_and_register_plugin<T: Clone + 'static + Send + Sync>(

if let Err(e) = register_plugin(device_plugin_name, device_endpoint, socket_path).await {
plugin.stop();
return Err(e);
return Err(RunnerError::RegistrationError(e));
}
Ok(())
}
Expand All @@ -201,12 +201,14 @@ async fn register_plugin(
device_plugin_name: String,
device_endpoint: String,
socket_path: String,
) -> Result<(), RunnerError> {
) -> anyhow::Result<()> {
use anyhow::Context;

let capability_id: String = format!("akri.sh/{}", device_plugin_name);

akri_shared::uds::unix_stream::try_connect(&socket_path)
.await
.map_err(|_| RunnerError::RegistrationError)?;
.with_context(|| format!("while trying to connect to {socket_path}"))?;

info!(
"register - entered for Instance {} and socket_name: {}",
Expand All @@ -228,7 +230,7 @@ async fn register_plugin(
UnixStream::connect(kubelet_socket_closure.clone())
}))
.await
.map_err(|_| RunnerError::RegistrationError)?;
.with_context(|| format!("while trying to connect to {KUBELET_SOCKET}"))?;
let mut registration_client = registration_client::RegistrationClient::new(channel);

let register_request = tonic::Request::new(RegisterRequest {
Expand All @@ -243,9 +245,6 @@ async fn register_plugin(
);

// If fail to register with the kubelet, terminate device plugin
registration_client
.register(register_request)
.await
.map_err(|_| RunnerError::RegistrationError)?;
registration_client.register(register_request).await?;
Ok(())
}
3 changes: 3 additions & 0 deletions discovery-utils/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ pub mod mock_discovery_handler {
return_error: bool,
devices: Vec<Device>,
) -> tokio::task::JoinHandle<()> {
use anyhow::Context;

let discovery_handler = MockDiscoveryHandler {
return_error,
devices,
Expand All @@ -184,6 +186,7 @@ pub mod mock_discovery_handler {
// Try to connect in loop until first thread has served Discovery Handler
unix_stream::try_connect(discovery_handler_endpoint)
.await
.with_context(|| format!("while trying to connect to {discovery_handler_endpoint}"))
.unwrap();
handle
}
Expand Down
63 changes: 25 additions & 38 deletions shared/src/uds/unix_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,49 +57,36 @@ impl AsyncWrite for UnixStream {
}
}

pub async fn try_connect(socket_path: &str) -> Result<(), anyhow::Error> {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
pub async fn try_connect(socket_path: &str) -> anyhow::Result<()> {
use anyhow::Context;
use std::time::{Duration, SystemTime};

// We will ignore this dummy uri because UDS does not use it.
// Some servers will check the uri content so the uri needs to
// be in valid format even it's not used, the scheme part is used
// to specific what scheme to use, such as http or https
let endpoint = tonic::transport::Endpoint::from_static("http://[::1]:50051");

// Test that server is running, trying for at most 10 seconds
// Similar to grpc.timeout, which is yet to be implemented for tonic
// See issue: https://github.com/hyperium/tonic/issues/75
let mut connected = false;
let start = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
let start_plus_10 = start + 10;
let start = SystemTime::now();

loop {
let path_connector = tower::service_fn({
let socket_path = socket_path.to_string();
move |_: tonic::transport::Uri| tokio::net::UnixStream::connect(socket_path.clone())
});

while (SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs()
< start_plus_10)
&& !connected
{
let path = socket_path.to_string();
// We will ignore this dummy uri because UDS does not use it.
// Some servers will check the uri content so the uri needs to
// be in valid format even it's not used, the scheme part is used
// to specific what scheme to use, such as http or https
if let Ok(_v) = tonic::transport::Endpoint::try_from("http://[::1]:50051")
.map_err(|e| anyhow::format_err!("{}", e))?
.connect_with_connector(tower::service_fn(move |_: tonic::transport::Uri| {
tokio::net::UnixStream::connect(path.clone())
}))
.await
{
connected = true
} else {
tokio::time::sleep(Duration::from_secs(1)).await
if let Err(e) = endpoint.connect_with_connector(path_connector).await {
let elapsed = start.elapsed().expect("System time should be monotonic");
if elapsed.as_secs() < 10 {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
return Err(e).context("After trying for at least 10 seconds");
}
}
if connected {
Ok(())
} else {
Err(anyhow::format_err!(
"Could not connect to server on socket {}",
socket_path
))

return Ok(());
}
}