Only use file based locks for main task. Secondary tasks use traditional multiprocessing lock

This commit is contained in:
allegroai 2020-03-05 12:10:23 +02:00
parent da804ca75f
commit a2ecb2c75d
2 changed files with 50 additions and 2 deletions

View File

@ -36,7 +36,7 @@ from ...storage.helper import StorageError
from .access import AccessMixin from .access import AccessMixin
from .log import TaskHandler from .log import TaskHandler
from .repo import ScriptInfo from .repo import ScriptInfo
from ...config import config from ...config import config, PROC_MASTER_ID_ENV_VAR
class Task(IdObjectBase, AccessMixin, SetupUploadMixin): class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
@ -995,3 +995,51 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
scroll = log_events.response.scroll_id scroll = log_events.response.scroll_id
return events_list return events_list
@property
def _edit_lock(self):
if self.__edit_lock:
return self.__edit_lock
if not PROC_MASTER_ID_ENV_VAR.get() or len(PROC_MASTER_ID_ENV_VAR.get().split(':')) < 2:
self.__edit_lock = RLock()
elif PROC_MASTER_ID_ENV_VAR.get().split(':')[1] == str(self.id):
# remove previous file lock instance, just in case.
filename = os.path.join(gettempdir(), 'trains_{}.lock'.format(self.id))
try:
os.unlink(filename)
except Exception:
pass
# create a new file based lock
self.__edit_lock = FileRLock(filename=filename)
else:
self.__edit_lock = RLock()
return self.__edit_lock
@_edit_lock.setter
def _edit_lock(self, value):
self.__edit_lock = value
@classmethod
def __update_master_pid_task(cls, pid=None, task=None):
pid = pid or os.getpid()
if not task:
PROC_MASTER_ID_ENV_VAR.set(str(pid) + ':')
else:
PROC_MASTER_ID_ENV_VAR.set(str(pid) + ':' + str(task.id))
# make sure we refresh the edit lock next time we need it,
task._edit_lock = None
@classmethod
def __get_master_id_task_id(cls):
master_task_id = PROC_MASTER_ID_ENV_VAR.get().split(':')
# we could not find a task ID, revert to old stub behaviour
if len(master_task_id) < 2 or not master_task_id[1]:
return None
return master_task_id[1]
@classmethod
def __is_subprocess(cls):
# notice this class function is called from Task.ExitHooks, do not rename/move it.
is_subprocess = PROC_MASTER_ID_ENV_VAR.get() and \
PROC_MASTER_ID_ENV_VAR.get().split(':')[0] != str(os.getpid())
return is_subprocess

View File

@ -35,7 +35,7 @@ from .binding.frameworks.tensorflow_bind import TensorflowBinding
from .binding.frameworks.xgboost_bind import PatchXGBoostModelIO from .binding.frameworks.xgboost_bind import PatchXGBoostModelIO
from .binding.joblib_bind import PatchedJoblib from .binding.joblib_bind import PatchedJoblib
from .binding.matplotlib_bind import PatchedMatplotlib from .binding.matplotlib_bind import PatchedMatplotlib
from .config import config, PROC_MASTER_ID_ENV_VAR, DEV_TASK_NO_REUSE from .config import config, DEV_TASK_NO_REUSE
from .config import running_remotely, get_remote_task_id from .config import running_remotely, get_remote_task_id
from .config.cache import SessionCache from .config.cache import SessionCache
from .debugging.log import LoggerRoot from .debugging.log import LoggerRoot