mirror of
https://github.com/clearml/clearml
synced 2025-03-03 10:42:00 +00:00
Kill repo/package detection thread on exit
This commit is contained in:
parent
b2c2002c40
commit
337e60a376
@ -1438,22 +1438,32 @@ class Task(_Task):
|
||||
|
||||
def _wait_for_repo_detection(self, timeout=None):
|
||||
# wait for detection repo sync
|
||||
if self._detect_repo_async_thread:
|
||||
with self._repo_detect_lock:
|
||||
if self._detect_repo_async_thread:
|
||||
try:
|
||||
if not self._detect_repo_async_thread:
|
||||
return
|
||||
with self._repo_detect_lock:
|
||||
if not self._detect_repo_async_thread:
|
||||
return
|
||||
try:
|
||||
if self._detect_repo_async_thread.is_alive():
|
||||
# if negative timeout, just kill the thread:
|
||||
if timeout is not None and timeout < 0:
|
||||
from .utilities.os.lowlevel import kill_thread
|
||||
kill_thread(self._detect_repo_async_thread)
|
||||
else:
|
||||
self.log.info('Waiting for repository detection and full package requirement analysis')
|
||||
self._detect_repo_async_thread.join(timeout=timeout)
|
||||
# because join has no return value
|
||||
if self._detect_repo_async_thread.is_alive():
|
||||
self.log.info('Waiting for repository detection and full package requirement analysis')
|
||||
self._detect_repo_async_thread.join(timeout=timeout)
|
||||
# because join has no return value
|
||||
if self._detect_repo_async_thread.is_alive():
|
||||
self.log.info('Repository and package analysis timed out ({} sec), '
|
||||
'giving up'.format(timeout))
|
||||
else:
|
||||
self.log.info('Finished repository detection and package analysis')
|
||||
self._detect_repo_async_thread = None
|
||||
except Exception:
|
||||
pass
|
||||
self.log.info('Repository and package analysis timed out ({} sec), '
|
||||
'giving up'.format(timeout))
|
||||
# fone waiting, kill the thread
|
||||
from .utilities.os.lowlevel import kill_thread
|
||||
kill_thread(self._detect_repo_async_thread)
|
||||
else:
|
||||
self.log.info('Finished repository detection and package analysis')
|
||||
self._detect_repo_async_thread = None
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _summary_artifacts(self):
|
||||
# signal artifacts upload, and stop daemon
|
||||
@ -1505,6 +1515,9 @@ class Task(_Task):
|
||||
if not is_sub_process:
|
||||
self._wait_for_repo_detection(timeout=10.)
|
||||
|
||||
# kill the repo thread (negative timeout, do not wait), if it hasn't finished yet.
|
||||
self._wait_for_repo_detection(timeout=-1)
|
||||
|
||||
# wait for uploads
|
||||
print_done_waiting = False
|
||||
if wait_for_uploads and (BackendModel.get_num_results() > 0 or
|
||||
@ -1571,6 +1584,8 @@ class Task(_Task):
|
||||
except Exception:
|
||||
pass
|
||||
self._edit_lock = None
|
||||
# HACK FOR TRACE
|
||||
self.is_main_task()
|
||||
|
||||
@classmethod
|
||||
def __register_at_exit(cls, exit_callback, only_remove_signal_and_exception_hooks=False):
|
||||
|
62
trains/utilities/os/lowlevel.py
Normal file
62
trains/utilities/os/lowlevel.py
Normal file
@ -0,0 +1,62 @@
|
||||
import ctypes
|
||||
import threading
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
||||
# Nasty hack to raise exception for other threads
|
||||
def _lowlevel_async_raise(thread_obj, exception=None):
|
||||
NULL = 0
|
||||
found = False
|
||||
target_tid = 0
|
||||
for tid, tobj in threading._active.items():
|
||||
if tobj is thread_obj:
|
||||
found = True
|
||||
target_tid = tid
|
||||
break
|
||||
|
||||
if not found:
|
||||
# raise ValueError("Invalid thread object")
|
||||
return False
|
||||
|
||||
if not exception:
|
||||
exception = SystemExit()
|
||||
|
||||
if sys.version_info.major >= 3 and sys.version_info.minor >= 7:
|
||||
target_tid = ctypes.c_ulong(target_tid)
|
||||
NULL = ctypes.c_ulong(NULL)
|
||||
else:
|
||||
target_tid = ctypes.c_long(target_tid)
|
||||
NULL = ctypes.c_long(NULL)
|
||||
|
||||
try:
|
||||
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, ctypes.py_object(exception))
|
||||
except:
|
||||
ret = 0
|
||||
|
||||
# ref: http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc
|
||||
if ret == 0:
|
||||
# raise ValueError("Invalid thread ID")
|
||||
return False
|
||||
elif ret > 1:
|
||||
# Huh? Why would we notify more than one threads?
|
||||
# Because we punch a hole into C level interpreter.
|
||||
# So it is better to clean up the mess.
|
||||
try:
|
||||
ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, NULL)
|
||||
except:
|
||||
pass
|
||||
# raise SystemError("PyThreadState_SetAsyncExc failed")
|
||||
return False
|
||||
|
||||
# print("Successfully set asynchronized exception for", target_tid)
|
||||
return True
|
||||
|
||||
|
||||
def kill_thread(thread_obj, wait=False):
|
||||
if not _lowlevel_async_raise(thread_obj, SystemExit()):
|
||||
return False
|
||||
|
||||
while wait and thread_obj.is_alive():
|
||||
time.sleep(0.1)
|
||||
return True
|
Loading…
Reference in New Issue
Block a user