Skip to content

Commit f03e0c0

Browse files
authored
Move interactive-specific TestStreamService to interactive protos. (#33858)
Notably this allows the protos in model/pipeline to be definitions only rather than mixing services and definitions.
1 parent 047715e commit f03e0c0

File tree

5 files changed

+27
-27
lines changed

5 files changed

+27
-27
lines changed

model/interactive/src/main/proto/org/apache/beam/model/interactive/v1/beam_interactive_api.proto

+13
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,16 @@ message TestStreamFileRecord {
4949
// The recorded event from an element stream.
5050
org.apache.beam.model.pipeline.v1.TestStreamPayload.Event recorded_event = 1;
5151
}
52+
53+
service TestStreamService {
54+
// A TestStream will request for events using this RPC.
55+
rpc Events(EventsRequest) returns (stream org.apache.beam.model.pipeline.v1.TestStreamPayload.Event) {}
56+
}
57+
58+
message EventsRequest {
59+
// The set of PCollections to read from. These are the PTransform outputs
60+
// local names. These are a subset of the TestStream's outputs. This allows
61+
// Interactive Beam to cache many PCollections from a pipeline then replay a
62+
// subset of them.
63+
repeated string output_ids = 1;
64+
}

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto

+2-15
Original file line numberDiff line numberDiff line change
@@ -722,19 +722,6 @@ message TestStreamPayload {
722722
}
723723
}
724724

725-
service TestStreamService {
726-
// A TestStream will request for events using this RPC.
727-
rpc Events(EventsRequest) returns (stream TestStreamPayload.Event) {}
728-
}
729-
730-
message EventsRequest {
731-
// The set of PCollections to read from. These are the PTransform outputs
732-
// local names. These are a subset of the TestStream's outputs. This allows
733-
// Interactive Beam to cache many PCollections from a pipeline then replay a
734-
// subset of them.
735-
repeated string output_ids = 1;
736-
}
737-
738725
// The payload for the special-but-not-primitive WriteFiles transform.
739726
message WriteFilesPayload {
740727

@@ -750,7 +737,7 @@ message WriteFilesPayload {
750737

751738
map<string, SideInput> side_inputs = 5;
752739

753-
// This is different from runner based sharding. This is done by the runner backend, where as runner_determined_sharding
740+
// This is different from runner based sharding. This is done by the runner backend, where as runner_determined_sharding
754741
// is by the runner translator
755742
bool auto_sharded = 6;
756743
}
@@ -968,7 +955,7 @@ message StandardCoders {
968955
// 01 - on time
969956
// 10 - late
970957
// 11 - unknown
971-
// * bit 6 is 1 if this is the last pane, 0 otherwise.
958+
// * bit 6 is 1 if this is the last pane, 0 otherwise.
972959
// Commonly set with `byte |= 0x02`
973960
// * bit 7 is 1 if this is the first pane, 0 otherwise.
974961
// Commonly set with `byte |= 0x01`

sdks/python/apache_beam/runners/direct/test_stream_impl.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@
3737
from apache_beam import ParDo
3838
from apache_beam import coders
3939
from apache_beam import pvalue
40-
from apache_beam.portability.api import beam_runner_api_pb2
41-
from apache_beam.portability.api import beam_runner_api_pb2_grpc
40+
from apache_beam.portability.api import beam_interactive_api_pb2
41+
from apache_beam.portability.api import beam_interactive_api_pb2_grpc
4242
from apache_beam.testing.test_stream import ElementEvent
4343
from apache_beam.testing.test_stream import ProcessingTimeEvent
4444
from apache_beam.testing.test_stream import WatermarkEvent
@@ -267,10 +267,10 @@ def _stream_events_from_rpc(endpoint, output_tags, coder, channel, is_alive):
267267
is placed on the channel to signify a successful end.
268268
"""
269269
stub_channel = grpc.insecure_channel(endpoint)
270-
stub = beam_runner_api_pb2_grpc.TestStreamServiceStub(stub_channel)
270+
stub = beam_interactive_api_pb2_grpc.TestStreamServiceStub(stub_channel)
271271

272272
# Request the PCollections that we are looking for from the service.
273-
event_request = beam_runner_api_pb2.EventsRequest(
273+
event_request = beam_interactive_api_pb2.EventsRequest(
274274
output_ids=[str(tag) for tag in output_tags])
275275

276276
event_stream = stub.Events(event_request)

sdks/python/apache_beam/testing/test_stream_service.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121

2222
import grpc
2323

24-
from apache_beam.portability.api import beam_runner_api_pb2_grpc
24+
from apache_beam.portability.api import beam_interactive_api_pb2_grpc
2525

2626

2727
class TestStreamServiceController(
28-
beam_runner_api_pb2_grpc.TestStreamServiceServicer):
28+
beam_interactive_api_pb2_grpc.TestStreamServiceServicer):
2929
"""A server that streams TestStreamPayload.Events from a single EventRequest.
3030
3131
This server is used as a way for TestStreams to receive events from file.
@@ -42,7 +42,7 @@ def __init__(self, reader, endpoint=None, exception_handler=None):
4242
port = self._server.add_insecure_port('localhost:0')
4343
self.endpoint = 'localhost:{}'.format(port)
4444

45-
beam_runner_api_pb2_grpc.add_TestStreamServiceServicer_to_server(
45+
beam_interactive_api_pb2_grpc.add_TestStreamServiceServicer_to_server(
4646
self, self._server)
4747
self._reader = reader
4848
self._exception_handler = exception_handler

sdks/python/apache_beam/testing/test_stream_service_test.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import grpc
2424

2525
from apache_beam.portability.api import beam_interactive_api_pb2
26+
from apache_beam.portability.api import beam_interactive_api_pb2_grpc
2627
from apache_beam.portability.api import beam_runner_api_pb2
27-
from apache_beam.portability.api import beam_runner_api_pb2_grpc
2828
from apache_beam.testing.test_stream_service import TestStreamServiceController
2929

3030
# Nose automatically detects tests if they match a regex. Here, it mistakens
@@ -63,14 +63,14 @@ def setUp(self):
6363
self.controller.start()
6464

6565
channel = grpc.insecure_channel(self.controller.endpoint)
66-
self.stub = beam_runner_api_pb2_grpc.TestStreamServiceStub(channel)
66+
self.stub = beam_interactive_api_pb2_grpc.TestStreamServiceStub(channel)
6767

6868
def tearDown(self):
6969
self.controller.stop()
7070

7171
def test_normal_run(self):
7272
r = self.stub.Events(
73-
beam_runner_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))
73+
beam_interactive_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))
7474
events = [e for e in r]
7575
expected_events = [
7676
e for e in EventsReader(
@@ -81,9 +81,9 @@ def test_normal_run(self):
8181

8282
def test_multiple_sessions(self):
8383
resp_a = self.stub.Events(
84-
beam_runner_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))
84+
beam_interactive_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))
8585
resp_b = self.stub.Events(
86-
beam_runner_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))
86+
beam_interactive_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS))
8787

8888
events_a = []
8989
events_b = []

0 commit comments

Comments
 (0)