@@ -47,9 +47,8 @@ def _at_exit_callback(self) -> None:
47
47
try :
48
48
# Stop the data processing thread
49
49
self ._stop_data_logging_thread_event .set ()
50
- # Waits till queue is drained.
51
- self ._run_data_logging_thread .result ()
52
- self ._batch_logging_threadpool .shutdown (wait = False )
50
+ # Waits till logging queue is drained.
51
+ self ._batch_logging_thread .join ()
53
52
self ._batch_status_check_threadpool .shutdown (wait = False )
54
53
except Exception as e :
55
54
_logger .error (f"Encountered error while trying to finish logging: { e } " )
@@ -132,14 +131,10 @@ def __getstate__(self):
132
131
del state ["_run_data_logging_thread" ]
133
132
if "_stop_data_logging_thread_event" in state :
134
133
del state ["_stop_data_logging_thread_event" ]
135
- if "_batch_logging_threadpool " in state :
136
- del state ["_batch_logging_threadpool " ]
134
+ if "_batch_logging_thread " in state :
135
+ del state ["_batch_logging_thread " ]
137
136
if "_batch_status_check_threadpool" in state :
138
137
del state ["_batch_status_check_threadpool" ]
139
- if "_run_data_logging_thread" in state :
140
- del state ["_run_data_logging_thread" ]
141
- if "_stop_data_logging_thread_event" in state :
142
- del state ["_stop_data_logging_thread_event" ]
143
138
144
139
return state
145
140
@@ -158,7 +153,7 @@ def __setstate__(self, state):
158
153
self ._queue = Queue ()
159
154
self ._lock = threading .RLock ()
160
155
self ._is_activated = False
161
- self ._batch_logging_threadpool = None
156
+ self ._batch_logging_thread = None
162
157
self ._batch_status_check_threadpool = None
163
158
self ._stop_data_logging_thread_event = None
164
159
@@ -193,7 +188,6 @@ def log_batch_async(
193
188
)
194
189
195
190
self ._queue .put (batch )
196
-
197
191
operation_future = self ._batch_status_check_threadpool .submit (self ._wait_for_batch , batch )
198
192
return RunOperations (operation_futures = [operation_future ])
199
193
@@ -217,14 +211,17 @@ def activate(self) -> None:
217
211
self ._stop_data_logging_thread_event = threading .Event ()
218
212
219
213
# Keeping max_workers=1 so that there are no two threads
220
- self ._batch_logging_threadpool = ThreadPoolExecutor (max_workers = 1 )
221
-
222
- self ._batch_status_check_threadpool = ThreadPoolExecutor (max_workers = 10 )
223
-
224
- self ._run_data_logging_thread = self ._batch_logging_threadpool .submit (
225
- self ._logging_loop
226
- ) # concurrent.futures.Future[self._logging_loop]
214
+ self ._batch_logging_thread = threading .Thread (
215
+ target = self ._logging_loop ,
216
+ name = "MLflowAsyncLoggingLoop" ,
217
+ daemon = True ,
218
+ )
227
219
220
+ self ._batch_status_check_threadpool = ThreadPoolExecutor (
221
+ max_workers = 10 ,
222
+ thread_name_prefix = "MLflowAsyncLoggingStatusCheck" ,
223
+ )
224
+ self ._batch_logging_thread .start ()
228
225
atexit .register (self ._at_exit_callback )
229
226
230
227
self ._is_activated = True
0 commit comments