Skip to content

Commit db6652f

Browse files
committed
Add tcp_listenfd_server example with target_os = "wasi" support
Use the `LISTEN_FDS` mechanism to use pre-opened sockets. Especially for `wasm32-wasi` there is no other way to get access to sockets, than to use pre-opened sockets. Because `wasm32-wasi` does not yet return `TcpListener::local_addr()`, an unspecified IP address and port will be returned and displayed. ``` $ cargo +nightly build --target wasm32-wasi --release --example tcp_listenfd_server --features="os-poll net" Compiling cfg-if v1.0.0 Compiling wasi v0.10.2+wasi-snapshot-preview1 Compiling log v0.4.14 Compiling libc v0.2.112 Compiling ppv-lite86 v0.2.15 Compiling wasi v0.11.0+wasi-snapshot-preview1 Compiling getrandom v0.2.3 Compiling rand_core v0.6.3 Compiling env_logger v0.8.4 Compiling rand_chacha v0.3.1 Compiling mio v0.8.0 (/home/harald/git/mio) Compiling rand v0.8.4 Finished release [optimized] target(s) in 2.92s $ wasmtime run --tcplisten 127.0.0.1:9000 --env 'LISTEN_FDS=1' target/wasm32-wasi/release/examples/tcp_listenfd_server.wasm Using preopened socket FD 3 You can connect to the server using `nc`: $ nc <IP> <PORT> You'll see our welcome message and anything you type will be printed here. ``` Signed-off-by: Harald Hoyer <[email protected]>
1 parent be8120e commit db6652f

File tree

2 files changed

+214
-0
lines changed

2 files changed

+214
-0
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ features = ["os-poll", "os-ext", "net"]
8080
name = "tcp_server"
8181
required-features = ["os-poll", "net"]
8282

83+
[[example]]
84+
name = "tcp_listenfd_server"
85+
required-features = ["os-poll", "net"]
86+
8387
[[example]]
8488
name = "udp_server"
8589
required-features = ["os-poll", "net"]

examples/tcp_listenfd_server.rs

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
// You can run this example from the root of the mio repo:
2+
// cargo run --example tcp_listenfd_server --features="os-poll net"
3+
// or with wasi:
4+
// cargo +nightly build --target wasm32-wasi --example tcp_listenfd_server --features="os-poll net"
5+
// wasmtime run --tcplisten 127.0.0.1:9000 --env 'LISTEN_FDS=1' target/wasm32-wasi/debug/examples/tcp_server.wasm
6+
7+
use mio::event::Event;
8+
use mio::net::{TcpListener, TcpStream};
9+
use mio::{Events, Interest, Poll, Registry, Token};
10+
use std::collections::HashMap;
11+
use std::io::{self, Read, Write};
12+
use std::str::from_utf8;
13+
14+
// Setup some tokens to allow us to identify which event is for which socket.
15+
const SERVER: Token = Token(0);
16+
17+
// Some data we'll send over the connection.
18+
const DATA: &[u8] = b"Hello world!\n";
19+
20+
#[cfg(not(windows))]
21+
fn get_first_listen_fd_listener() -> Option<std::net::TcpListener> {
22+
#[cfg(unix)]
23+
use std::os::unix::io::FromRawFd;
24+
#[cfg(target_os = "wasi")]
25+
use std::os::wasi::io::FromRawFd;
26+
27+
let stdlistener = unsafe { std::net::TcpListener::from_raw_fd(3) };
28+
stdlistener.set_nonblocking(true).unwrap();
29+
Some(stdlistener)
30+
}
31+
32+
#[cfg(windows)]
33+
fn get_first_listen_fd_listener() -> Option<std::net::TcpListener> {
34+
// Windows does not support `LISTEN_FDS`
35+
None
36+
}
37+
38+
fn main() -> io::Result<()> {
39+
env_logger::init();
40+
41+
std::env::var("LISTEN_FDS").expect("LISTEN_FDS environment variable unset");
42+
43+
// Create a poll instance.
44+
let mut poll = Poll::new()?;
45+
// Create storage for events.
46+
let mut events = Events::with_capacity(128);
47+
48+
// Setup the TCP server socket.
49+
let mut server = {
50+
let stdlistener = get_first_listen_fd_listener().unwrap();
51+
stdlistener.set_nonblocking(true)?;
52+
println!("Using preopened socket FD 3");
53+
println!("You can connect to the server using `nc`:");
54+
match stdlistener.local_addr() {
55+
Ok(a) => println!(" $ nc {} {}", a.ip(), a.port()),
56+
Err(_) => println!(" $ nc <IP> <PORT>"),
57+
}
58+
println!("You'll see our welcome message and anything you type will be printed here.");
59+
TcpListener::from_std(stdlistener)
60+
};
61+
62+
// Register the server with poll we can receive events for it.
63+
poll.registry()
64+
.register(&mut server, SERVER, Interest::READABLE)?;
65+
66+
// Map of `Token` -> `TcpStream`.
67+
let mut connections = HashMap::new();
68+
// Unique token for each incoming connection.
69+
let mut unique_token = Token(SERVER.0 + 1);
70+
71+
loop {
72+
poll.poll(&mut events, None)?;
73+
74+
for event in events.iter() {
75+
match event.token() {
76+
SERVER => loop {
77+
// Received an event for the TCP server socket, which
78+
// indicates we can accept an connection.
79+
let (mut connection, address) = match server.accept() {
80+
Ok((connection, address)) => (connection, address),
81+
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
82+
// If we get a `WouldBlock` error we know our
83+
// listener has no more incoming connections queued,
84+
// so we can return to polling and wait for some
85+
// more.
86+
break;
87+
}
88+
Err(e) => {
89+
// If it was any other kind of error, something went
90+
// wrong and we terminate with an error.
91+
return Err(e);
92+
}
93+
};
94+
95+
println!("Accepted connection from: {}", address);
96+
97+
let token = next(&mut unique_token);
98+
poll.registry()
99+
.register(&mut connection, token, Interest::WRITABLE)?;
100+
101+
connections.insert(token, connection);
102+
},
103+
token => {
104+
// Maybe received an event for a TCP connection.
105+
let done = if let Some(connection) = connections.get_mut(&token) {
106+
handle_connection_event(poll.registry(), connection, event)?
107+
} else {
108+
// Sporadic events happen, we can safely ignore them.
109+
false
110+
};
111+
if done {
112+
if let Some(mut connection) = connections.remove(&token) {
113+
poll.registry().deregister(&mut connection)?;
114+
}
115+
}
116+
}
117+
}
118+
}
119+
}
120+
}
121+
122+
fn next(current: &mut Token) -> Token {
123+
let next = current.0;
124+
current.0 += 1;
125+
Token(next)
126+
}
127+
128+
/// Returns `true` if the connection is done.
129+
fn handle_connection_event(
130+
registry: &Registry,
131+
connection: &mut TcpStream,
132+
event: &Event,
133+
) -> io::Result<bool> {
134+
if event.is_writable() {
135+
// We can (maybe) write to the connection.
136+
match connection.write(DATA) {
137+
// We want to write the entire `DATA` buffer in a single go. If we
138+
// write less we'll return a short write error (same as
139+
// `io::Write::write_all` does).
140+
Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
141+
Ok(_) => {
142+
// After we've written something we'll reregister the connection
143+
// to only respond to readable events.
144+
registry.reregister(connection, event.token(), Interest::READABLE)?
145+
}
146+
// Would block "errors" are the OS's way of saying that the
147+
// connection is not actually ready to perform this I/O operation.
148+
Err(ref err) if would_block(err) => {}
149+
// Got interrupted (how rude!), we'll try again.
150+
Err(ref err) if interrupted(err) => {
151+
return handle_connection_event(registry, connection, event)
152+
}
153+
// Other errors we'll consider fatal.
154+
Err(err) => return Err(err),
155+
}
156+
}
157+
158+
if event.is_readable() {
159+
let mut connection_closed = false;
160+
let mut received_data = vec![0; 4096];
161+
let mut bytes_read = 0;
162+
// We can (maybe) read from the connection.
163+
loop {
164+
match connection.read(&mut received_data[bytes_read..]) {
165+
Ok(0) => {
166+
// Reading 0 bytes means the other side has closed the
167+
// connection or is done writing, then so are we.
168+
connection_closed = true;
169+
break;
170+
}
171+
Ok(n) => {
172+
bytes_read += n;
173+
if bytes_read == received_data.len() {
174+
received_data.resize(received_data.len() + 1024, 0);
175+
}
176+
}
177+
// Would block "errors" are the OS's way of saying that the
178+
// connection is not actually ready to perform this I/O operation.
179+
Err(ref err) if would_block(err) => break,
180+
Err(ref err) if interrupted(err) => continue,
181+
// Other errors we'll consider fatal.
182+
Err(err) => return Err(err),
183+
}
184+
}
185+
186+
if bytes_read != 0 {
187+
let received_data = &received_data[..bytes_read];
188+
if let Ok(str_buf) = from_utf8(received_data) {
189+
println!("Received data: {}", str_buf.trim_end());
190+
} else {
191+
println!("Received (none UTF-8) data: {:?}", received_data);
192+
}
193+
}
194+
195+
if connection_closed {
196+
println!("Connection closed");
197+
return Ok(true);
198+
}
199+
}
200+
201+
Ok(false)
202+
}
203+
204+
fn would_block(err: &io::Error) -> bool {
205+
err.kind() == io::ErrorKind::WouldBlock
206+
}
207+
208+
fn interrupted(err: &io::Error) -> bool {
209+
err.kind() == io::ErrorKind::Interrupted
210+
}

0 commit comments

Comments
 (0)