Allow services mode to re-register (docker can kill it and not exit gracefully)

This commit is contained in:
allegroai 2020-06-01 16:34:33 +03:00
parent ce02385420
commit 54d9d77294
2 changed files with 17 additions and 9 deletions

View File

@ -676,6 +676,9 @@ class Worker(ServiceCommandSection):
# match previous behaviour when we validated queue names before everything else
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
self._standalone_mode = kwargs.get('standalone_mode', False)
self._services_mode = kwargs.get('services_mode', False)
# make sure we only have a single instance,
# also make sure we set worker_id properly and cache folders
self._singleton()
@ -683,9 +686,6 @@ class Worker(ServiceCommandSection):
# check if we have the latest version
start_check_update_daemon()
self._standalone_mode = kwargs.get('standalone_mode', False)
self._services_mode = kwargs.get('services_mode', False)
self.check(**kwargs)
self.log.debug("starting resource monitor thread")
print("Worker \"{}\" - ".format(self.worker_id), end='')
@ -2281,8 +2281,11 @@ class Worker(ServiceCommandSection):
else:
worker_name = '{}:cpu'.format(worker_name)
# if we are running in services mode, we allow double register since
# docker-compose will kill instances before they cleanup
self.worker_id, worker_slot = Singleton.register_instance(
unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client)
unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client,
allow_double=bool(self._services_mode) and bool(ENV_DOCKER_HOST_MOUNT.get()))
if self.worker_id is None:
error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))

View File

@ -38,7 +38,7 @@ class Singleton(object):
pass
@classmethod
def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
"""
# Exit the process if another instance of us is using the same worker_id
@ -65,8 +65,9 @@ class Singleton(object):
f.write(bytes(os.getpid()))
f.flush()
try:
ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name,
api_client=api_client)
ret = cls._register_instance(
unique_worker_id=unique_worker_id, worker_name=worker_name,
api_client=api_client, allow_double=allow_double)
except:
ret = None, None
@ -78,7 +79,7 @@ class Singleton(object):
return ret
@classmethod
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
if cls.worker_id:
return cls.worker_id, cls.instance_slot
# make sure we have a unique name
@ -123,7 +124,11 @@ class Singleton(object):
continue
if uid == unique_worker_id:
return None, None
if allow_double:
warning('Instance with the same WORKER_ID [{}] was found on this machine. '
'We are ignoring it, make sure this not a mistake.'.format(unique_worker_id))
else:
return None, None
slots[slot] = uid