Impact
In a multi-node vLLM deployment, vLLM uses ZeroMQ for some of its inter-node communication. The primary vLLM host opens an XPUB ZeroMQ socket and binds it to ALL interfaces. While the socket is always opened for a multi-node deployment, it is only used when doing tensor parallelism across multiple hosts.
Any client with network access to this host can connect to this XPUB socket unless its port is blocked by a firewall. Once connected, these arbitrary clients will receive all of the same data broadcast to the secondary vLLM hosts. This data is internal vLLM state information that is not useful to an attacker.
By connecting to this socket many times and never reading the data published to them, an attacker can also cause a denial of service by slowing down or potentially blocking the publisher.
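As a concrete illustration of that last point, the sketch below (not vLLM code; the host name and port are hypothetical placeholders, since the real port is chosen at random) opens many SUB connections to the exposed XPUB socket and never reads from them. Whether the publisher merely slows down or fully blocks depends on its socket and buffer configuration.

```python
# Sketch of the slow-subscriber denial-of-service vector described above.
# "primary-host" and port 55555 are hypothetical; vLLM picks a random port.
import time
import zmq

ctx = zmq.Context()
subs = []
for _ in range(1000):
    s = ctx.socket(zmq.SUB)
    s.setsockopt(zmq.SUBSCRIBE, b"")      # ask for every published message
    s.connect("tcp://primary-host:55555")
    subs.append(s)                        # keep the connection open, never recv()

time.sleep(3600)                          # let publisher-side buffers fill up
```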
Detailed Analysis
The XPUB socket in question is created in vllm/distributed/device_communicators/shm_broadcast.py:
```python
socket_addr = f"tcp://*:{remote_subscribe_port}"
self.remote_socket.bind(socket_addr)
```
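Note that the wildcard address `tcp://*:<port>` listens on every network interface. Below is a standalone sketch of the difference (hypothetical port; not the vLLM code itself):

```python
# Binding an XPUB socket to all interfaces versus a specific interface.
import zmq

remote_subscribe_port = 55555            # hypothetical; vLLM picks a random port

ctx = zmq.Context()
remote_socket = ctx.socket(zmq.XPUB)

remote_socket.bind(f"tcp://*:{remote_subscribe_port}")
# Reachable from any network the host is attached to.

# A more restrictive alternative would bind only to a private cluster address:
# remote_socket.bind(f"tcp://10.0.0.1:{remote_subscribe_port}")
```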
Data is published over this socket via MessageQueue.enqueue(), which is called by MessageQueue.broadcast_object():
```python
if self.n_remote_reader > 0:
    self.remote_socket.send(serialized_obj)
```

```python
def broadcast_object(self, obj=None):
    if self._is_writer:
        self.enqueue(obj)
        return obj
```
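The excerpt does not show how `serialized_obj` is produced, but assuming it is a standard Python serialization of the broadcast object (e.g. pickle, which the variable name suggests but which is an assumption here), any connected client could reconstruct the messages along these lines:

```python
# Sketch of what a connected client could do with the published payload,
# assuming (not confirmed by the excerpt above) that it is a pickled object.
# "primary-host" and port 55555 are hypothetical placeholders.
import pickle
import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b"")
sub.connect("tcp://primary-host:55555")

payload = sub.recv()                      # one broadcast message
obj = pickle.loads(payload)               # internal vLLM state (metadata, not tensors)
print(type(obj), obj)
```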
The MessageQueue.broadcast_object() method is called by the GroupCoordinator.broadcast_object() method in parallel_state.py:
```python
if self.mq_broadcaster is not None:
    assert src == 0, "Message queue broadcaster only supports src=0"
    return self.mq_broadcaster.broadcast_object(obj)
```
The broadcast over ZeroMQ is only done if the GroupCoordinator was created with use_message_queue_broadcaster set to True:
```python
self.mq_broadcaster: Optional[MessageQueue] = None
if use_message_queue_broadcaster and self.world_size > 1:
    self.mq_broadcaster = MessageQueue.create_from_process_group(
        self.cpu_group, 1 << 22, 6)
```
The only case where GroupCoordinator is created with use_message_queue_broadcaster is the coordinator for the tensor parallelism group:
```python
# message queue broadcaster is only used in tensor model parallel group
_TP = init_model_parallel_group(group_ranks,
                                get_world_group().local_rank,
                                backend,
                                use_message_queue_broadcaster=True,
                                group_name="tp")
```
To determine what data is broadcast to the tensor parallelism group, we must continue tracing. GroupCoordinator.broadcast_object() is called by GroupCoordinator.broadcast_tensor_dict():
```python
self.broadcast_object(metadata_list, src=src)
```
which is called by broadcast_tensor_dict() in communication_op.py:
```python
def broadcast_tensor_dict(tensor_dict: Optional[Dict[Any, Union[torch.Tensor,
                                                                 Any]]] = None,
                          src: int = 0):
    if not torch.distributed.is_initialized():
        return tensor_dict
    return get_tp_group().broadcast_tensor_dict(tensor_dict, src)
```
If we look at _get_driver_input_and_broadcast() in the V0 worker_base.py, we'll see how this tensor dict is formed:
```python
def _get_driver_input_and_broadcast(
    self, execute_model_req: ExecuteModelRequest
) -> Tuple[BroadcastableModelInput, WorkerInput, Dict[str, torch.Tensor]]:
    """ Get the driver input and broadcast it to other workers. """
    assert self.is_driver_worker

    worker_input: WorkerInput = self.prepare_worker_input(
        execute_model_req=execute_model_req)
    model_input: ModelRunnerInputBase = (
        self.model_runner.prepare_model_input(
            execute_model_req.seq_group_metadata_list,
            execute_model_req.virtual_engine,
            execute_model_req.finished_requests_ids))

    kwargs = extract_previous_hidden_states(execute_model_req)

    if self.do_metadata_broadcast:
        broadcast_data = worker_input.as_broadcastable_tensor_dict()
        broadcast_data.update(model_input.as_broadcastable_tensor_dict())
        broadcast_data.update(kwargs)
        broadcast_tensor_dict(broadcast_data, src=0)
```
However, the data actually sent over ZeroMQ is only the metadata_list portion that is split off from this tensor_dict. The tensor parts are sent via torch.distributed, and only metadata about those tensors is sent via ZeroMQ. The split is performed by _split_tensor_dict():
```python
def _split_tensor_dict(
    tensor_dict: Dict[str, Union[torch.Tensor, Any]]
) -> Tuple[List[Tuple[str, Any]], List[torch.Tensor]]:
    """Split the tensor dictionary into two parts:
    1. A list of (key, value) pairs. If the value is a tensor, it is replaced
         by its metadata.
    2. A list of tensors.
    """
    metadata_list: List[Tuple[str, Any]] = []
    tensor_list: List[torch.Tensor] = []
    for key, value in tensor_dict.items():
        if isinstance(value, torch.Tensor):
            # Note: we cannot use `value.device` here,
            # because it contains not only the device type but also the device
            # index (e.g. "cuda:0"). We only need the device type.
            # receiving side will set the device index.
            device = value.device.type
            metadata_list.append(
                (key, TensorMetadata(device, value.dtype, value.size())))
            tensor_list.append(value)
        else:
            metadata_list.append((key, value))
    return metadata_list, tensor_list
```
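To illustrate what this means for the data visible on the ZeroMQ socket, here is a small self-contained example. The dictionary keys are made up for illustration, and TensorMetadata is recreated as a plain namedtuple standing in for the one defined in parallel_state.py: tensors are reduced to a device type, dtype, and shape, while non-tensor values pass through unchanged.

```python
# Self-contained illustration of the split; TensorMetadata and the dict keys
# are stand-ins, not the actual vLLM definitions or payloads.
from collections import namedtuple
import torch

TensorMetadata = namedtuple("TensorMetadata", ["device", "dtype", "size"])

tensor_dict = {
    "input_tokens": torch.zeros(4, 16, dtype=torch.long),
    "virtual_engine": 0,
    "num_steps": 1,
}

metadata_list, tensor_list = [], []
for key, value in tensor_dict.items():
    if isinstance(value, torch.Tensor):
        metadata_list.append(
            (key, TensorMetadata(value.device.type, value.dtype, value.size())))
        tensor_list.append(value)
    else:
        metadata_list.append((key, value))

print(metadata_list)
# [('input_tokens', TensorMetadata(device='cpu', dtype=torch.int64, size=torch.Size([4, 16]))),
#  ('virtual_engine', 0), ('num_steps', 1)]
```

Only a metadata_list of this shape goes out over the XPUB socket; the tensors themselves travel over torch.distributed, which is consistent with the statement above that the exposed data is internal state information rather than anything directly sensitive.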
Patches
Workarounds
Prior to the fix, your options include:
- Do not expose the vLLM host to a network where any untrusted connections may reach the host.
- Ensure that only the other vLLM hosts are able to connect to the TCP port used for the XPUB socket. Note that the port used is random, so it needs to be identified at runtime (see the sketch below).
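Since the port is chosen at random, one way to find it so that it can be firewalled is to list the TCP ports the vLLM processes are listening on. The sketch below uses psutil and matches processes by command line, which is an assumption about how vLLM is launched; adapt it to your deployment.

```python
# Sketch: find listening TCP ports of vLLM processes so a firewall rule can
# restrict them to the other vLLM hosts. Matching processes by "vllm" in the
# command line is an assumption; adjust for your deployment. May require root.
import psutil

vllm_pids = set()
for proc in psutil.process_iter(["pid", "cmdline"]):
    cmdline = " ".join(proc.info["cmdline"] or [])
    if "vllm" in cmdline:
        vllm_pids.add(proc.info["pid"])

for conn in psutil.net_connections(kind="tcp"):
    if conn.pid in vllm_pids and conn.status == psutil.CONN_LISTEN:
        print(conn.pid, conn.laddr.ip, conn.laddr.port)
```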
References
- Relevant code first introduced in #6183