Commit 3aa78d2

robertwb and tvalentyn authored
Better error message for large elements. (#30639)
This will cause an exception when the too-large element is emitted, rather than later when the proto is serialized (which happens on another thread and may also cause spurious errors in the data channel consumption).

---------

Co-authored-by: tvalentyn <[email protected]>
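The fail-fast behavior described in the commit message can be sketched with a toy buffer. This is a minimal illustration under stated assumptions, not Beam's implementation: `ToyBufferedStream`, its `max_size` parameter, and the tiny 8-byte limit are hypothetical stand-ins chosen so the example runs instantly.

```python
class ToyBufferedStream:
    """Hypothetical stand-in for a size-checked output buffer (not Beam's class)."""

    def __init__(self, flush_callback, max_size):
        self._chunks = []
        self._flush_callback = flush_callback
        # Assumed limit for illustration; the commit uses _FLUSH_MAX_SIZE.
        self._max_size = max_size

    def write(self, data: bytes) -> None:
        self._chunks.append(data)

    def size(self) -> int:
        return sum(len(chunk) for chunk in self._chunks)

    def flush(self) -> None:
        # Check the size in the caller's thread, before the data is handed
        # to the callback, so the error surfaces where the element is emitted.
        if self.size() > self._max_size:
            raise ValueError(
                f'Buffer size {self.size()} exceeds limit {self._max_size}.')
        self._flush_callback(b''.join(self._chunks))
        self._chunks.clear()


sent = []
stream = ToyBufferedStream(sent.append, max_size=8)

stream.write(b'small')
stream.flush()  # within the limit: delivered to the callback

stream.write(b'x' * 9)  # a single "element" larger than the limit
try:
    stream.flush()
    error = None
except ValueError as exc:
    error = str(exc)
```

Because the check runs inside `flush()`, the `ValueError` is raised in the code path that wrote the oversized data, and nothing is passed to the callback for that flush.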
1 parent c298da5 commit 3aa78d2

1 file changed: +9 -0 lines changed

sdks/python/apache_beam/runners/worker/data_plane.py

@@ -63,6 +63,7 @@
 
 _DEFAULT_SIZE_FLUSH_THRESHOLD = 10 << 20  # 10MB
 _DEFAULT_TIME_FLUSH_THRESHOLD_MS = 0  # disable time-based flush by default
+_FLUSH_MAX_SIZE = (2 << 30) - 100  # 2GB less some overhead, protobuf/grpc limit
 
 # Keep a set of completed instructions to discard late received data. The set
 # can have up to _MAX_CLEANED_INSTRUCTIONS items. See _GrpcDataChannel.
@@ -147,6 +148,14 @@ def maybe_flush(self):
   def flush(self):
     # type: () -> None
     if self._flush_callback:
+      if self.size() > _FLUSH_MAX_SIZE:
+        raise ValueError(
+            f'Buffer size {self.size()} exceeds GRPC limit {_FLUSH_MAX_SIZE}. '
+            'This is likely due to a single element that is too large. '
+            'To resolve, prefer multiple small elements over single large '
+            'elements in PCollections. If needed, store large blobs in '
+            'external storage systems, and use PCollections to pass their '
+            'metadata, or use a custom coder that reduces the element\'s size.')
       self._flush_callback(self.get())
       self._clear()
