diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py index 6f467b2..20b60c9 100644 --- a/trains_agent/commands/worker.py +++ b/trains_agent/commands/worker.py @@ -676,6 +676,9 @@ class Worker(ServiceCommandSection): # match previous behaviour when we validated queue names before everything else queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False)) + self._standalone_mode = kwargs.get('standalone_mode', False) + self._services_mode = kwargs.get('services_mode', False) + # make sure we only have a single instance, # also make sure we set worker_id properly and cache folders self._singleton() @@ -683,9 +686,6 @@ class Worker(ServiceCommandSection): # check if we have the latest version start_check_update_daemon() - self._standalone_mode = kwargs.get('standalone_mode', False) - self._services_mode = kwargs.get('services_mode', False) - self.check(**kwargs) self.log.debug("starting resource monitor thread") print("Worker \"{}\" - ".format(self.worker_id), end='') @@ -2281,8 +2281,11 @@ class Worker(ServiceCommandSection): else: worker_name = '{}:cpu'.format(worker_name) + # if we are running in services mode, we allow double register since + # docker-compose will kill instances before they cleanup self.worker_id, worker_slot = Singleton.register_instance( - unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client) + unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client, + allow_double=bool(self._services_mode) and bool(ENV_DOCKER_HOST_MOUNT.get())) if self.worker_id is None: error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id)) diff --git a/trains_agent/helper/singleton.py b/trains_agent/helper/singleton.py index 8507c9e..2963e56 100644 --- a/trains_agent/helper/singleton.py +++ b/trains_agent/helper/singleton.py @@ -38,7 +38,7 @@ class Singleton(object): pass @classmethod - def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None): + def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False): """ # Exit the process if another instance of us is using the same worker_id @@ -65,8 +65,9 @@ class Singleton(object): f.write(bytes(os.getpid())) f.flush() try: - ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name, - api_client=api_client) + ret = cls._register_instance( + unique_worker_id=unique_worker_id, worker_name=worker_name, + api_client=api_client, allow_double=allow_double) except: ret = None, None @@ -78,7 +79,7 @@ class Singleton(object): return ret @classmethod - def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None): + def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False): if cls.worker_id: return cls.worker_id, cls.instance_slot # make sure we have a unique name @@ -123,7 +124,11 @@ class Singleton(object): continue if uid == unique_worker_id: - return None, None + if allow_double: + warning('Instance with the same WORKER_ID [{}] was found on this machine. ' + 'We are ignoring it, make sure this not a mistake.'.format(unique_worker_id)) + else: + return None, None slots[slot] = uid