Skip to content

Improve error when submitting work from a closed client #9049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,10 @@
"""Custom exception class to exit All(...) early."""


class ClosedClientError(Exception):
"""Raised when an action with a closed client can't be performed"""


def _handle_print(event):
_, msg = event
if not isinstance(msg, dict):
Expand Down Expand Up @@ -1419,9 +1423,8 @@
if self.status in ("running", "closing", "connecting", "newly-created"):
self.loop.add_callback(self._send_to_scheduler_safe, msg)
else:
raise Exception(
"Tried sending message after closing. Status: %s\n"
"Message: %s" % (self.status, msg)
raise ClosedClientError(

Check warning on line 1426 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L1426

Added line #L1426 was not covered by tests
f"Client is {self.status}. Can't send {msg['op']} message."
)

async def _start(self, timeout=no_default, **kwargs):
Expand Down
14 changes: 14 additions & 0 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
)
from distributed.client import (
Client,
ClosedClientError,
Future,
FutureCancelledError,
FuturesCancelledError,
Expand Down Expand Up @@ -5711,6 +5712,19 @@ async def test_warn_when_submitting_large_values_memoryview(c, s):
c.submit(lambda: a)


@gen_cluster(client=True, nthreads=[])
async def test_closed_client_send_message(c, s):
# Ensure a meaningful, but concise error is raised when
# a closed client attempts to send a message to the scheduler
await c.close()
with pytest.raises(ClosedClientError, match="update-graph") as exc_info:
c.submit(lambda x: x + 1)

msg = str(exc_info.value)
assert "Can't send update-graph" in msg
assert len(msg) < 100


@gen_cluster(client=True)
async def test_unhashable_function(c, s, a, b):
func = _UnhashableCallable()
Expand Down
Loading