Skip to content

Commit 50bf516

Browse files
authored
core: add --detect_file_replacement flag and plumbing (redo) (#5546)
* core: add --detect_file_replacement flag and plumbing
* core_plugin_test.py: add load_fast
* ignore unrecognized accumulator kwargs in multiplexer test
* data_ingester_test.py: add detect_file_replacement flag
1 parent 7d2d687 commit 50bf516

File tree

9 files changed

+70
-11
lines changed

9 files changed

+70
-11
lines changed

tensorboard/backend/event_processing/data_ingester.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ def __init__(self, flags):
6969
purge_orphaned_data=flags.purge_orphaned_data,
7070
max_reload_threads=flags.max_reload_threads,
7171
event_file_active_filter=_get_event_file_active_filter(flags),
72+
detect_file_replacement=flags.detect_file_replacement,
7273
)
7374
self._data_provider = data_provider.MultiplexerDataProvider(
7475
self._multiplexer, flags.logdir or flags.logdir_spec

tensorboard/backend/event_processing/data_ingester_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
class FakeFlags(object):
3333
def __init__(
3434
self,
35+
detect_file_replacement=None,
3536
generic_data="auto",
3637
logdir="",
3738
logdir_spec="",
@@ -45,6 +46,7 @@ def __init__(
4546
samples_per_plugin=None,
4647
window_title="",
4748
):
49+
self.detect_file_replacement = detect_file_replacement
4850
self.generic_data = generic_data
4951
self.logdir = logdir
5052
self.logdir_spec = logdir_spec

tensorboard/backend/event_processing/plugin_event_accumulator.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ def __init__(
9999
tensor_size_guidance=None,
100100
purge_orphaned_data=True,
101101
event_file_active_filter=None,
102+
detect_file_replacement=None,
102103
):
103104
"""Construct the `EventAccumulator`.
104105
@@ -122,6 +123,9 @@ def __init__(
122123
event_file_active_filter: Optional predicate for determining whether an
123124
event file latest load timestamp should be considered active. If passed,
124125
this will enable multifile directory loading.
126+
detect_file_replacement: Optional boolean; if True, event file loading
127+
will try to detect when a file has been replaced with a new version
128+
that contains additional data, by monitoring the file size.
125129
"""
126130
size_guidance = dict(size_guidance or DEFAULT_SIZE_GUIDANCE)
127131
sizes = {}
@@ -155,7 +159,9 @@ def __init__(
155159
self._plugin_tag_lock = threading.Lock()
156160

157161
self.path = path
158-
self._generator = _GeneratorFromPath(path, event_file_active_filter)
162+
self._generator = _GeneratorFromPath(
163+
path, event_file_active_filter, detect_file_replacement
164+
)
159165
self._generator_mutex = threading.Lock()
160166

161167
self.purge_orphaned_data = purge_orphaned_data
@@ -639,23 +645,33 @@ def _GetPurgeMessage(
639645
)
640646

641647

642-
def _GeneratorFromPath(path, event_file_active_filter=None):
648+
def _GeneratorFromPath(
649+
path, event_file_active_filter=None, detect_file_replacement=None
650+
):
643651
"""Create an event generator for file or directory at given path string."""
644652
if not path:
645653
raise ValueError("path must be a valid string")
646654
if io_wrapper.IsSummaryEventsFile(path):
647-
return event_file_loader.EventFileLoader(path)
655+
return event_file_loader.EventFileLoader(path, detect_file_replacement)
648656
elif event_file_active_filter:
657+
loader_factory = (
658+
lambda path: event_file_loader.TimestampedEventFileLoader(
659+
path, detect_file_replacement
660+
)
661+
)
649662
return directory_loader.DirectoryLoader(
650663
path,
651-
event_file_loader.TimestampedEventFileLoader,
664+
loader_factory,
652665
path_filter=io_wrapper.IsSummaryEventsFile,
653666
active_filter=event_file_active_filter,
654667
)
655668
else:
669+
loader_factory = lambda path: event_file_loader.EventFileLoader(
670+
path, detect_file_replacement
671+
)
656672
return directory_watcher.DirectoryWatcher(
657673
path,
658-
event_file_loader.EventFileLoader,
674+
loader_factory,
659675
io_wrapper.IsSummaryEventsFile,
660676
)
661677

tensorboard/backend/event_processing/plugin_event_multiplexer.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def __init__(
7777
purge_orphaned_data=True,
7878
max_reload_threads=None,
7979
event_file_active_filter=None,
80+
detect_file_replacement=None,
8081
):
8182
"""Constructor for the `EventMultiplexer`.
8283
@@ -98,6 +99,9 @@ def __init__(
9899
event_file_active_filter: Optional predicate for determining whether an
99100
event file latest load timestamp should be considered active. If passed,
100101
this will enable multifile directory loading.
102+
detect_file_replacement: Optional boolean; if True, event file loading
103+
will try to detect when a file has been replaced with a new version
104+
that contains additional data, by monitoring the file size.
101105
"""
102106
logger.info("Event Multiplexer initializing.")
103107
self._accumulators_mutex = threading.Lock()
@@ -111,6 +115,7 @@ def __init__(
111115
self.purge_orphaned_data = purge_orphaned_data
112116
self._max_reload_threads = max_reload_threads or 1
113117
self._event_file_active_filter = event_file_active_filter
118+
self._detect_file_replacement = detect_file_replacement
114119
if run_path_map is not None:
115120
logger.info(
116121
"Event Multiplexer doing initialization load for %s",
@@ -159,6 +164,7 @@ def AddRun(self, path, name=None):
159164
tensor_size_guidance=self._tensor_size_guidance,
160165
purge_orphaned_data=self.purge_orphaned_data,
161166
event_file_active_filter=self._event_file_active_filter,
167+
detect_file_replacement=self._detect_file_replacement,
162168
)
163169
self._accumulators[name] = accumulator
164170
self._paths[name] = path

tensorboard/backend/event_processing/plugin_event_multiplexer_test.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,9 @@ def Reload(self):
9494

9595
def _GetFakeAccumulator(
9696
path,
97-
size_guidance=None,
98-
tensor_size_guidance=None,
99-
purge_orphaned_data=None,
100-
event_file_active_filter=None,
97+
**unused_kwargs,
10198
):
102-
del size_guidance, tensor_size_guidance, purge_orphaned_data # Unused.
103-
del event_file_active_filter # unused
99+
del unused_kwargs
104100
return _FakeAccumulator(path)
105101

106102

tensorboard/plugins/core/core_plugin.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,26 @@ def define_flags(self, parser):
646646
means keep all samples of that type. For instance "scalars=500,images=0"
647647
keeps 500 scalars and all images. Most users should not need to set this
648648
flag.\
649+
""",
650+
)
651+
652+
parser.add_argument(
653+
"--detect_file_replacement",
654+
metavar="BOOL",
655+
# Custom str-to-bool converter since regular bool() doesn't work.
656+
type=lambda v: {"true": True, "false": False}.get(v.lower(), v),
657+
choices=[True, False],
658+
default=None,
659+
help="""\
660+
[experimental] If true, this enables experimental support for detecting when
661+
event files are replaced with new versions that contain additional data. This is
662+
not needed in the normal case where new data is either appended to an existing
663+
file or written to a brand new file, but it arises, for example, when using
664+
rsync without the --inplace option, in which new versions of the original file
665+
are first written to a temporary file, then swapped into the final location.
666+
667+
This option is currently incompatible with --load_fast=true, and if passed will
668+
disable fast-loading mode. (default: false)\
649669
""",
650670
)
651671

@@ -683,6 +703,13 @@ def fix_flags(self, flags):
683703
)
684704
elif flags.host is not None and flags.bind_all:
685705
raise FlagsError("Must not specify both --host and --bind_all.")
706+
elif (
707+
flags.load_fast == "true" and flags.detect_file_replacement is True
708+
):
709+
raise FlagsError(
710+
"Must not specify both --load_fast=true and "
711+
"--detect_file_replacement=true"
712+
)
686713

687714
flags.path_prefix = flags.path_prefix.rstrip("/")
688715
if flags.path_prefix and not flags.path_prefix.startswith("/"):

tensorboard/plugins/core/core_plugin_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ def __init__(
5353
grpc_data_provider="",
5454
host=None,
5555
inspect=False,
56+
load_fast="auto",
5657
logdir="",
5758
logdir_spec="",
5859
path_prefix="",
@@ -66,6 +67,7 @@ def __init__(
6667
self.grpc_data_provider = grpc_data_provider
6768
self.host = host
6869
self.inspect = inspect
70+
self.load_fast = load_fast
6971
self.logdir = logdir
7072
self.logdir_spec = logdir_spec
7173
self.path_prefix = path_prefix

tensorboard/program.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,13 @@ def _should_use_data_server(flags):
491491
"paths; falling back to slower Python-only load path."
492492
)
493493
return False
494+
if flags.detect_file_replacement is True:
495+
logger.info(
496+
"Note: --detect_file_replacement=true is not supported with "
497+
"--load_fast behavior; falling back to slower Python-only load "
498+
"path."
499+
)
500+
return False
494501
return True
495502

496503

tensorboard/program_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def test_should_use_data_server(self):
8686
def f(**kwargs):
8787
kwargs.setdefault("logdir", "")
8888
kwargs.setdefault("logdir_spec", "")
89+
kwargs.setdefault("detect_file_replacement", None)
8990
flags = argparse.Namespace()
9091
for k, v in kwargs.items():
9192
setattr(flags, k, v)
@@ -96,6 +97,7 @@ def f(**kwargs):
9697
self.assertTrue(f(logdir="logs/mnist/"))
9798
self.assertTrue(f(logdir="gs://logs"))
9899
self.assertFalse(f(logdir="notgs://logs"))
100+
self.assertFalse(f(logdir="foo", detect_file_replacement=True))
99101

100102

101103
class WerkzeugServerTest(tb_test.TestCase):

0 commit comments

Comments
 (0)