Skip to content

Commit bebfcbc

Browse files
authored
Add an experiment to make prism server a singleton (#34623)
* Add an experiment flag to make prism job server a singleton. * Attempt to fix the failed tests. * Replace singleton code with util.shared.Shared. * Fix lints
1 parent de2a83f commit bebfcbc

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

sdks/python/apache_beam/runners/portability/prism_runner.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
from apache_beam.runners.portability import job_server
4040
from apache_beam.runners.portability import portable_runner
4141
from apache_beam.transforms import environments
42+
from apache_beam.utils import shared
4243
from apache_beam.utils import subprocess_server
4344
from apache_beam.version import __version__ as beam_version
4445

@@ -56,6 +57,8 @@ class PrismRunner(portable_runner.PortableRunner):
5657
"""A runner for launching jobs on Prism, automatically downloading and
5758
starting a Prism instance if needed.
5859
"""
60+
shared_handle = shared.Shared()
61+
5962
def default_environment(
6063
self,
6164
options: pipeline_options.PipelineOptions) -> environments.Environment:
@@ -66,7 +69,12 @@ def default_environment(
6669
return super().default_environment(options)
6770

6871
def default_job_server(self, options):
69-
return job_server.StopOnExitJobServer(PrismJobServer(options))
72+
debug_options = options.view_as(pipeline_options.DebugOptions)
73+
get_job_server = lambda: job_server.StopOnExitJobServer(
74+
PrismJobServer(options))
75+
if debug_options.lookup_experiment("enable_prism_server_singleton"):
76+
return PrismRunner.shared_handle.acquire(get_job_server)
77+
return get_job_server()
7078

7179
def create_job_service_handle(self, job_service, options):
7280
return portable_runner.JobServiceHandle(

sdks/python/apache_beam/runners/portability/prism_runner_test.py

+33
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
from apache_beam.runners.portability import prism_runner
4040
from apache_beam.testing.util import assert_that
4141
from apache_beam.testing.util import equal_to
42+
from apache_beam.utils import shared
4243

4344
# Run as
4445
#
@@ -381,6 +382,38 @@ def test_with_remote_path(self, has_cache_bin, has_cache_zip, ignore_cache):
381382
mock_zipfile_init.assert_called_once()
382383

383384

385+
class PrismRunnerSingletonTest(unittest.TestCase):
386+
@parameterized.expand([True, False])
387+
def test_singleton(self, enable_singleton):
388+
if enable_singleton:
389+
options = DebugOptions(["--experiment=enable_prism_server_singleton"])
390+
else:
391+
options = DebugOptions()
392+
393+
runner = prism_runner.PrismRunner()
394+
with mock.patch(
395+
'apache_beam.runners.portability.prism_runner.PrismJobServer'
396+
) as mock_prism_server:
397+
398+
# Reset the class-level singleton for every fresh run
399+
prism_runner.PrismRunner.shared_handle = shared.Shared()
400+
401+
runner = prism_runner.PrismRunner()
402+
runner.default_job_server(options)
403+
404+
mock_prism_server.assert_called_once()
405+
mock_prism_server.reset_mock()
406+
407+
runner = prism_runner.PrismRunner()
408+
runner.default_job_server(options)
409+
if enable_singleton:
410+
# If singleton is enabled, we won't try to create a new server for the
411+
# second run.
412+
mock_prism_server.assert_not_called()
413+
else:
414+
mock_prism_server.assert_called_once()
415+
416+
384417
if __name__ == '__main__':
385418
# Run the tests.
386419
logging.getLogger().setLevel(logging.INFO)

0 commit comments

Comments
 (0)