Skip to content

Commit b844326

Browse files
kiukchungfacebook-github-bot
authored andcommitted
(monarch_hyperactor) Create python binding for a RemoteAllocator that takes a list of remote channel addresses (#170)
Summary: Pull Request resolved: #170 See: P1833193535 for how the user-facing UX would look like. NOTE: Recommended to start the review at `monarch‎/python‎/tests‎/test_allocator.py‎` to get a sense of what the API/Usage looks like. NOTE: hyperactor's ChannelAddr can be represented in string format such as: `tcp!127.0.0.1:26600` or `metatls!devgpu001.pci.facebook.com:26600` which includes all the necessary information to create a `Channel`. Unfortunately, the current `RemoteAllocator` related interfaces take `transport` (`ChannelTransport`), `port`, and a list of `hostnames`, and applies the same transport and port to all. This isn't ideal (especially for flexibility in deployment and testing). So the python bindings take a list of channel address strings rather than a list of hostnames. To support multi-node actor meshes in OSS without having to write a custom allocator for each scheduler (e.g. `SlurmAllocator`, `KubernetesAllocator`) we take advantage of the infrastructure we already have in TorchX and TorchElastic. This Diff creates Python bindings for `RemoteAllocatorBase` that takes a list of server addresses (in channel_addr format - e.g. `metatls!devgpu032.nha1.facebook.com:26600` or `tcp!devgpu032.nha1.facebook.com:26601`) of remote-process-allocator server and connects to it. The internals reuse existing `RemoteProcessAlloc` with a custom `PyRemoteProcessAllocInitializer` that simply returns a `Vec<RemoteProcessAllocHost>` given the user provided list of server addresses. ## Next Steps: 1. [1/2] Add hostnames to `monarch.tools.mesh_spec.MeshSpec` struct + ability to query hostnames given a running job. 2. [2/2] Make it possible to run 2x remote process allocators (each on its own port) on MAST Reviewed By: technicianted Differential Revision: D75928565 fbshipit-source-id: 2dac97fbdabe11c1578c5930501ac49b93906727
1 parent 547656a commit b844326

File tree

11 files changed

+730
-15
lines changed

11 files changed

+730
-15
lines changed

.github/workflows/test.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ jobs:
5454
# Install test dependencies
5555
pip install -r python/tests/requirements.txt
5656
57+
# Install remote process_allocator binary (some tests use it)
58+
cargo install --path monarch_hyperactor
59+
5760
# Build and install monarch
5861
# NB: monarch is currently can't be built in isolated builds (e.g not PEP519 compatible)
5962
# because 'torch-sys' needs to be compiled against 'torch' in the main python environment

monarch_extension/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ pub fn mod_init(module: &Bound<'_, PyModule>) -> PyResult<()> {
142142
module,
143143
"monarch_hyperactor.alloc",
144144
)?)?;
145+
monarch_hyperactor::channel::register_python_bindings(&get_or_add_new_module(
146+
module,
147+
"monarch_hyperactor.channel",
148+
)?)?;
145149
monarch_hyperactor::actor_mesh::register_python_bindings(&get_or_add_new_module(
146150
module,
147151
"monarch_hyperactor.actor_mesh",

monarch_hyperactor/src/alloc.rs

Lines changed: 179 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,30 @@
77
*/
88

99
use std::collections::HashMap;
10+
use std::str::FromStr;
1011
use std::sync::Arc;
12+
use std::time::Duration;
1113

14+
use anyhow::anyhow;
15+
use async_trait::async_trait;
16+
use hyperactor::WorldId;
17+
use hyperactor::channel::ChannelAddr;
18+
use hyperactor::channel::ChannelTransport;
1219
use hyperactor_extension::alloc::PyAlloc;
1320
use hyperactor_extension::alloc::PyAllocSpec;
21+
use hyperactor_mesh::alloc::AllocSpec;
1422
use hyperactor_mesh::alloc::Allocator;
23+
use hyperactor_mesh::alloc::AllocatorError;
1524
use hyperactor_mesh::alloc::LocalAllocator;
1625
use hyperactor_mesh::alloc::ProcessAllocator;
26+
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAlloc;
27+
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocHost;
28+
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocInitializer;
1729
use pyo3::exceptions::PyRuntimeError;
1830
use pyo3::prelude::*;
1931
use tokio::process::Command;
2032

33+
use crate::channel::PyChannelAddr;
2134
use crate::runtime::signal_safe_block_on;
2235

2336
#[pyclass(
@@ -48,7 +61,7 @@ impl PyLocalAllocator {
4861
.allocate(spec)
4962
.await
5063
.map(|inner| PyAlloc::new(Box::new(inner)))
51-
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
64+
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
5265
})
5366
}
5467

@@ -132,9 +145,174 @@ impl PyProcessAllocator {
132145
}
133146
}
134147

148+
/// A `[hyperactor_mesh::alloc::RemoteProcessAllocInitializer]` wrapper to enable subclassing from Python.
149+
///
150+
/// Basically follows https://pyo3.rs/v0.25.0/trait-bounds.html.
151+
/// The Python subclass should implement `def initialize_alloc(self) -> list[str]`.
152+
pub struct PyRemoteProcessAllocInitializer {
153+
// instance of a Python subclass of `monarch._rust_bindings.monarch_hyperactor.alloc.RemoteProcessAllocInitializer`.
154+
py_inner: Py<PyAny>,
155+
}
156+
157+
impl Clone for PyRemoteProcessAllocInitializer {
158+
fn clone(&self) -> Self {
159+
Self {
160+
py_inner: Python::with_gil(|py| Py::clone_ref(&self.py_inner, py)),
161+
}
162+
}
163+
}
164+
impl PyRemoteProcessAllocInitializer {
165+
/// calls the initializer's `initialize_alloc()` as implemented in python
166+
///
167+
/// NOTE: changes to python method calls must be made in sync with
168+
/// the method signature of `RemoteAllocInitializer` in
169+
/// `monarch/python/monarch/_rust_bindings/monarch_hyperactor/alloc.pyi`
170+
async fn py_initialize_alloc(&self) -> PyResult<Vec<String>> {
171+
// call the function as implemented in python
172+
let future = Python::with_gil(|py| -> PyResult<_> {
173+
let coroutine = self.py_inner.bind(py).call_method0("initialize_alloc")?;
174+
pyo3_async_runtimes::tokio::into_future(coroutine)
175+
})?;
176+
177+
let addrs = future.await?;
178+
Python::with_gil(|py| -> PyResult<Vec<String>> { addrs.extract(py) })
179+
}
180+
181+
async fn get_transport_and_port(&self) -> PyResult<(ChannelTransport, u16)> {
182+
// NOTE: the upstream RemoteAllocator APIs take (transport, port, hostnames)
183+
// (e.g. assumes the same transport and port for all servers).
184+
// Until that is fixed we have to assume the same here.
185+
// Get the transport and port from the first address
186+
// TODO T227130269
187+
let addrs = self.py_initialize_alloc().await?;
188+
let addr = addrs
189+
.first()
190+
.ok_or_else(|| anyhow!("initializer must return non-empty list of addresses"))?;
191+
let channel_addr = PyChannelAddr::parse(addr)?;
192+
let port = channel_addr.get_port()?;
193+
let transport = channel_addr.get_transport()?;
194+
Ok((transport.into(), port))
195+
}
196+
}
197+
198+
#[async_trait]
199+
impl RemoteProcessAllocInitializer for PyRemoteProcessAllocInitializer {
200+
async fn initialize_alloc(&mut self) -> Result<Vec<RemoteProcessAllocHost>, anyhow::Error> {
201+
// call the function as implemented in python
202+
let addrs = self.py_initialize_alloc().await?;
203+
addrs
204+
.iter()
205+
.map(|channel_addr| {
206+
let addr = ChannelAddr::from_str(channel_addr)?;
207+
let (id, hostname) = match addr {
208+
ChannelAddr::Tcp(socket) => (socket.ip().to_string(), socket.ip().to_string()),
209+
ChannelAddr::MetaTls(hostname, _) => (hostname.clone(), hostname.clone()),
210+
ChannelAddr::Unix(_) => (addr.to_string(), addr.to_string()),
211+
_ => anyhow::bail!("unsupported transport for channel address: `{addr}`"),
212+
};
213+
Ok(RemoteProcessAllocHost { id, hostname })
214+
})
215+
.collect()
216+
}
217+
}
218+
219+
#[pyclass(
220+
name = "RemoteAllocatorBase",
221+
module = "monarch._rust_bindings.monarch_hyperactor.alloc",
222+
subclass
223+
)]
224+
#[derive(Clone)]
225+
pub struct PyRemoteAllocator {
226+
// IMPORTANT: other than the `initializer` this struct should not hold any non-trivially
227+
// clonable data (e.g. such that the Clone derive-attribute would not work).
228+
// This allows us to avoid having yet-another-wrapper for PyRemoteAllocator since
229+
// PyRemoteProcessAllocInitializer is already a wrapper and its wrapped Py<PyAny> is
230+
// shared by reference.
231+
world_id: String,
232+
initializer: PyRemoteProcessAllocInitializer,
233+
heartbeat_interval: Duration,
234+
}
235+
236+
#[async_trait]
237+
impl Allocator for PyRemoteAllocator {
238+
type Alloc = RemoteProcessAlloc;
239+
240+
async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError> {
241+
let initializer = self.initializer.clone();
242+
let (transport, port) = initializer
243+
.get_transport_and_port()
244+
.await
245+
.map_err(|e| AllocatorError::Other(e.into()))?;
246+
247+
let alloc = RemoteProcessAlloc::new(
248+
spec,
249+
WorldId(self.world_id.clone()),
250+
transport,
251+
port,
252+
self.heartbeat_interval,
253+
initializer,
254+
)
255+
.await?;
256+
Ok(alloc)
257+
}
258+
}
259+
260+
#[pymethods]
261+
impl PyRemoteAllocator {
262+
#[new]
263+
#[pyo3(signature = (
264+
world_id,
265+
initializer,
266+
heartbeat_interval = Duration::from_secs(5),
267+
))]
268+
fn new(
269+
world_id: String,
270+
initializer: Py<PyAny>,
271+
heartbeat_interval: Duration,
272+
) -> PyResult<Self> {
273+
Ok(Self {
274+
world_id,
275+
initializer: PyRemoteProcessAllocInitializer {
276+
py_inner: initializer,
277+
},
278+
heartbeat_interval,
279+
})
280+
}
281+
282+
fn allocate_nonblocking<'py>(
283+
&self,
284+
py: Python<'py>,
285+
spec: &PyAllocSpec,
286+
) -> PyResult<Bound<'py, PyAny>> {
287+
let spec = spec.inner.clone();
288+
let mut cloned = self.clone();
289+
290+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
291+
cloned
292+
.allocate(spec)
293+
.await
294+
.map(|alloc| PyAlloc::new(Box::new(alloc)))
295+
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
296+
})
297+
}
298+
fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
299+
let spec = spec.inner.clone();
300+
let mut cloned = self.clone();
301+
302+
signal_safe_block_on(py, async move {
303+
cloned
304+
.allocate(spec)
305+
.await
306+
.map(|alloc| PyAlloc::new(Box::new(alloc)))
307+
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
308+
})?
309+
}
310+
}
311+
135312
pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
136313
hyperactor_mod.add_class::<PyProcessAllocator>()?;
137314
hyperactor_mod.add_class::<PyLocalAllocator>()?;
315+
hyperactor_mod.add_class::<PyRemoteAllocator>()?;
138316

139317
Ok(())
140318
}

monarch_hyperactor/src/bin/process_allocator/common.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,24 @@ use clap::command;
1313
use hyperactor::channel::ChannelAddr;
1414
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocator;
1515
use tokio::process::Command;
16+
1617
#[derive(Parser, Debug)]
1718
#[command(about = "Runs hyperactor's process allocator")]
1819
pub struct Args {
1920
#[arg(
2021
long,
21-
default_value = "[::]",
22-
help = "The address bind to. The process allocator runs on `bind_addr:port`"
22+
default_value_t = 26600,
23+
help = "The port to bind to on [::] (all network interfaces on this host). Same as specifying `--addr=[::]:{port}`"
2324
)]
24-
pub addr: String,
25+
pub port: u16,
2526

2627
#[arg(
2728
long,
28-
default_value_t = 26600,
29-
help = "Port to bind to. The process allocator runs on `bind_addr:port`"
29+
help = "The address to bind to in the form: \
30+
`{transport}!{address}:{port}` (e.g. `tcp!127.0.0.1:26600`). \
31+
If specified, `--port` argument is ignored"
3032
)]
31-
pub port: u16,
33+
pub addr: Option<String>,
3234

3335
#[arg(
3436
long,
@@ -72,8 +74,8 @@ mod tests {
7274

7375
let parsed_args = Args::parse_from(args);
7476

75-
assert_eq!(parsed_args.addr, "[::]");
7677
assert_eq!(parsed_args.port, 26600);
78+
assert_eq!(parsed_args.addr, None);
7779
assert_eq!(parsed_args.program, "monarch_bootstrap");
7880
Ok(())
7981
}
@@ -82,15 +84,13 @@ mod tests {
8284
async fn test_args() -> Result<(), anyhow::Error> {
8385
let args = vec![
8486
"process_allocator",
85-
"--addr=127.0.0.1",
86-
"--port=29500",
87+
"--addr=tcp!127.0.0.1:29501",
8788
"--program=/bin/echo",
8889
];
8990

9091
let parsed_args = Args::parse_from(args);
9192

92-
assert_eq!(parsed_args.addr, "127.0.0.1");
93-
assert_eq!(parsed_args.port, 29500);
93+
assert_eq!(parsed_args.addr, Some("tcp!127.0.0.1:29501".to_string()));
9494
assert_eq!(parsed_args.program, "/bin/echo");
9595
Ok(())
9696
}

monarch_hyperactor/src/bin/process_allocator/main.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
mod common;
1010

11+
use std::str::FromStr;
12+
1113
use clap::Parser;
1214
use common::Args;
1315
use common::main_impl;
@@ -18,9 +20,11 @@ async fn main() {
1820
let args = Args::parse();
1921
hyperactor::initialize();
2022

21-
let bind = format!("{}:{}", args.addr, args.port);
22-
let socket_addr: std::net::SocketAddr = bind.parse().unwrap();
23-
let serve_address = ChannelAddr::Tcp(socket_addr);
23+
let bind = args
24+
.addr
25+
.unwrap_or_else(|| format!("tcp![::]:{}", args.port));
26+
27+
let serve_address = ChannelAddr::from_str(&bind).unwrap();
2428

2529
let _ = main_impl(serve_address, args.program).await.unwrap();
2630
}

0 commit comments

Comments
 (0)