Compare commits

..

5 Commits

14 changed files with 716 additions and 266 deletions

View File

@@ -12,6 +12,7 @@ import shlex
import shutil
import signal
import string
import socket
import subprocess
import sys
import traceback
@@ -24,7 +25,7 @@ from functools import partial
from os.path import basename
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
from typing import Text, Optional, Any, Tuple, List
from typing import Text, Optional, Any, Tuple, List, Dict, Mapping, Union
import attr
import six
@@ -368,6 +369,8 @@ def get_task_container(session, task_id):
container = result.json()['data']['tasks'][0]['container'] if result.ok else {}
if container.get('arguments'):
container['arguments'] = shlex.split(str(container.get('arguments')).strip())
if container.get('image'):
container['image'] = container.get('image').strip()
except (ValueError, TypeError):
container = {}
else:
@@ -635,6 +638,8 @@ class Worker(ServiceCommandSection):
_docker_fixed_user_cache = '/clearml_agent_cache'
_temp_cleanup_list = []
hostname_task_runtime_prop = "_exec_agent_hostname"
@property
def service(self):
""" Worker command service endpoint """
@@ -850,6 +855,20 @@ class Worker(ServiceCommandSection):
# "Running task '{}'".format(task_id)
print(self._task_logging_start_message.format(task_id))
task_session = task_session or self._session
# noinspection PyBroadException
try:
res = task_session.send_request(
service='tasks', action='edit', method=Request.def_method,
json={
"task": task_id, "force": True, "runtime": {self.hostname_task_runtime_prop: socket.gethostname()}
},
)
if not res.ok:
raise Exception("failed setting runtime property")
except Exception as ex:
print("Warning: failed obtaining/setting hostname for task '{}': {}".format(task_id, ex))
# set task status to in_progress so we know it was popped from the queue
# noinspection PyBroadException
try:
@@ -2352,6 +2371,7 @@ class Worker(ServiceCommandSection):
raise CommandFailedError("Cloning failed")
else:
# make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
# noinspection PyBroadException
try:
res = self._session.api_client.tasks.dequeue(task=current_task.id)
if require_queue and res.meta.result_code != 200:
@@ -3820,6 +3840,60 @@ class Worker(ServiceCommandSection):
pass
return results
@staticmethod
def _resolve_docker_env_args(docker_args):
# type: (List[str]) -> List[str]
"""
Resolve -e / --env docker environment args matching $VAR or ${VAR} from the host environment
:argument docker_args: List of docker argument strings (flags and values)
"""
non_list_args = (
"rm", "read-only", "sig-proxy", "tty", "privileged", "publish-all", "interactive", "init", "help", "detach"
)
non_list_args_single = (
"t", "P", "i", "d",
)
# if no filtering, do nothing
if not docker_args:
return docker_args
args = docker_args[:]
skip_arg = False
for i, cmd in enumerate(docker_args):
if skip_arg and not cmd.startswith("-"):
continue
skip_arg = False
if cmd.startswith("--"):
# jump over single command
if cmd[2:] in non_list_args:
continue
elif cmd.startswith("-"):
# jump over single character non args
if cmd[1:] in non_list_args_single:
continue
# if we are here we have a command to bypass and the list after it
if cmd in ('-e', '--env'):
skip_arg = True
for j in range(i+1, len(args)):
if args[j].startswith("-"):
break
parts = args[j].split("=", 1)
if len(parts) != 2:
continue
args[j] = "{}={}".format(parts[0], os.path.expandvars(parts[1]))
elif cmd.startswith("-"):
skip_arg = True
return args
def _get_docker_cmd(
self,
worker_id, parent_worker_id,
@@ -3883,9 +3957,14 @@ class Worker(ServiceCommandSection):
docker_arguments = list(docker_arguments) \
if isinstance(docker_arguments, (list, tuple)) else [docker_arguments]
docker_arguments = self._filter_docker_args(docker_arguments)
if self._session.config.get("agent.docker_allow_host_environ", None):
docker_arguments = self._resolve_docker_env_args(docker_arguments)
base_cmd += [a for a in docker_arguments if a]
if extra_docker_arguments:
# we always resolve environments in the `extra_docker_arguments` becuase the admin set them (not users)
extra_docker_arguments = self._resolve_docker_env_args(extra_docker_arguments)
extra_docker_arguments = [extra_docker_arguments] \
if isinstance(extra_docker_arguments, six.string_types) else extra_docker_arguments
base_cmd += [str(a) for a in extra_docker_arguments if a]
@@ -3961,7 +4040,7 @@ class Worker(ServiceCommandSection):
base_cmd += ['-e', 'CLEARML_WORKER_ID='+worker_id, ]
# update the docker image, so the system knows where it runs
base_cmd += ['-e', 'CLEARML_DOCKER_IMAGE={} {}'.format(docker_image, ' '.join(docker_arguments or [])).strip()]
base_cmd += ['-e', 'CLEARML_DOCKER_IMAGE={}'.format(docker_image)]
if env_task_id:
base_cmd += ['-e', 'CLEARML_TASK_ID={}'.format(env_task_id), ]

View File

@@ -152,6 +152,7 @@ WORKING_STANDALONE_DIR = "code"
DEFAULT_VCS_CACHE = normalize_path(CONFIG_DIR, "vcs-cache")
PIP_EXTRA_INDICES = []
DEFAULT_PIP_DOWNLOAD_CACHE = normalize_path(CONFIG_DIR, "pip-download-cache")
ENV_PIP_EXTRA_INSTALL_FLAGS = EnvironmentConfig("CLEARML_EXTRA_PIP_INSTALL_FLAGS", type=list)
ENV_DOCKER_IMAGE = EnvironmentConfig("CLEARML_DOCKER_IMAGE", "TRAINS_DOCKER_IMAGE")
ENV_WORKER_ID = EnvironmentConfig("CLEARML_WORKER_ID", "TRAINS_WORKER_ID")
ENV_WORKER_TAGS = EnvironmentConfig("CLEARML_WORKER_TAGS")

View File

@@ -0,0 +1,15 @@
from threading import Thread
from clearml_agent.session import Session
class K8sDaemon(Thread):
def __init__(self, agent):
super(K8sDaemon, self).__init__(target=self.target)
self.daemon = True
self._agent = agent
self.log = agent.log
self._session: Session = agent._session
def target(self):
pass

View File

@@ -0,0 +1,12 @@
class GetPodsError(Exception):
pass
class GetJobsError(Exception):
pass
class GetPodCountError(Exception):
pass

View File

@@ -13,7 +13,6 @@ from collections import defaultdict, namedtuple
from copy import deepcopy
from pathlib import Path
from pprint import pformat
from threading import Thread
from time import sleep, time
from typing import Text, List, Callable, Any, Collection, Optional, Union, Iterable, Dict, Tuple, Set
@@ -28,8 +27,11 @@ from clearml_agent.definitions import (
ENV_AGENT_GIT_PASS,
ENV_FORCE_SYSTEM_SITE_PACKAGES,
)
from clearml_agent.errors import APIError
from clearml_agent.errors import APIError, UsageError
from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
from clearml_agent.glue.errors import GetPodCountError
from clearml_agent.glue.utilities import get_path, get_bash_output
from clearml_agent.glue.pending_pods_daemon import PendingPodsDaemon
from clearml_agent.helper.base import safe_remove_file
from clearml_agent.helper.dicts import merge_dicts
from clearml_agent.helper.process import get_bash_output, stringify_bash_output
@@ -38,6 +40,7 @@ from clearml_agent.interface.base import ObjectID
class K8sIntegration(Worker):
SUPPORTED_KIND = ("pod", "job")
K8S_PENDING_QUEUE = "k8s_scheduler"
K8S_DEFAULT_NAMESPACE = "clearml"
@@ -46,12 +49,6 @@ class K8sIntegration(Worker):
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
KUBECTL_CLEANUP_DELETE_CMD = "kubectl delete pods " \
"-l={agent_label} " \
"--field-selector=status.phase!=Pending,status.phase!=Running " \
"--namespace={namespace} " \
"--output name"
BASH_INSTALL_SSH_CMD = [
"apt-get update",
"apt-get install -y openssh-server",
@@ -82,7 +79,7 @@ class K8sIntegration(Worker):
"[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
"{extra_bash_init_cmd}",
"$LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
"[ ! -z $CLEARML_AGENT_NO_UPDATE ] || $LOCAL_PYTHON -m pip install clearml-agent{agent_install_args}",
"{extra_docker_bash_script}",
"$LOCAL_PYTHON -m clearml_agent execute {default_execution_agent_args} --id {task_id}"
]
@@ -136,6 +133,10 @@ class K8sIntegration(Worker):
:param int max_pods_limit: Maximum number of pods that K8S glue can run at the same time
"""
super(K8sIntegration, self).__init__()
self.kind = os.environ.get("CLEARML_K8S_GLUE_KIND", "pod").strip().lower()
if self.kind not in self.SUPPORTED_KIND:
raise UsageError(f"Kind '{self.kind}' not supported (expected {','.join(self.SUPPORTED_KIND)})")
self.using_jobs = self.kind == "job"
self.pod_name_prefix = pod_name_prefix or self.DEFAULT_POD_NAME_PREFIX
self.limit_pod_label = limit_pod_label or self.DEFAULT_LIMIT_POD_LABEL
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
@@ -180,11 +181,18 @@ class K8sIntegration(Worker):
self._agent_label = None
self._monitor_hanging_pods()
self._pending_pods_daemon = self._create_pending_pods_daemon(
cls_=PendingPodsDaemon,
polling_interval=self._polling_interval
)
self._pending_pods_daemon.start()
self._min_cleanup_interval_per_ns_sec = 1.0
self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
def _create_pending_pods_daemon(self, cls_, **kwargs):
return cls_(agent=self, **kwargs)
def _load_overrides_yaml(self, overrides_yaml):
if not overrides_yaml:
return
@@ -210,26 +218,33 @@ class K8sIntegration(Worker):
self.log.warning('Removing containers section: {}'.format(overrides['spec'].pop('containers')))
self.overrides_json_string = json.dumps(overrides)
def _monitor_hanging_pods(self):
_check_pod_thread = Thread(target=self._monitor_hanging_pods_daemon)
_check_pod_thread.daemon = True
_check_pod_thread.start()
@staticmethod
def _load_template_file(path):
with open(os.path.expandvars(os.path.expanduser(str(path))), 'rt') as f:
return yaml.load(f, Loader=getattr(yaml, 'FullLoader', None))
def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None):
# type: (str, Iterable[str], Iterable[str], str, Iterable[str]) -> Dict
if not labels:
@staticmethod
def _get_path(d, *path, default=None):
try:
return functools.reduce(
lambda a, b: a[b], path, d
)
except (IndexError, KeyError):
return default
def _get_kubectl_options(self, command, extra_labels=None, filters=None, output="json", labels=None, ns=None):
# type: (str, Iterable[str], Iterable[str], str, Iterable[str], str) -> Dict
if labels is False:
labels = []
elif not labels:
labels = [self._get_agent_label()]
labels = list(labels) + (list(extra_labels) if extra_labels else [])
d = {
"-l": ",".join(labels),
"-n": str(self.namespace),
"-n": ns or str(self.namespace),
"-o": output,
}
if labels:
d["-l"] = ",".join(labels)
if filters:
d["--field-selector"] = ",".join(filters)
return d
@@ -240,132 +255,6 @@ class K8sIntegration(Worker):
command=command, opts=" ".join(x for item in opts.items() for x in item)
)
def _monitor_hanging_pods_daemon(self):
last_tasks_msgs = {} # last msg updated for every task
while True:
kubectl_cmd = self.get_kubectl_command("get pods", filters=["status.phase=Pending"])
self.log.debug("Detecting hanging pods: {}".format(kubectl_cmd))
output = stringify_bash_output(get_bash_output(kubectl_cmd))
try:
output_config = json.loads(output)
except Exception as ex:
self.log.warning('K8S Glue pods monitor: Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
sleep(self._polling_interval)
continue
pods = output_config.get('items', [])
task_id_to_details = dict()
for pod in pods:
pod_name = pod.get('metadata', {}).get('name', None)
if not pod_name:
continue
task_id = pod_name.rpartition('-')[-1]
if not task_id:
continue
namespace = pod.get('metadata', {}).get('namespace', None)
if not namespace:
continue
task_id_to_details[task_id] = (pod_name, namespace)
msg = None
waiting = self._get_path(pod, 'status', 'containerStatuses', 0, 'state', 'waiting')
if not waiting:
condition = self._get_path(pod, 'status', 'conditions', 0)
if condition:
reason = condition.get('reason')
if reason == 'Unschedulable':
message = condition.get('message')
msg = reason + (" ({})".format(message) if message else "")
else:
reason = waiting.get("reason", None)
message = waiting.get("message", None)
msg = reason + (" ({})".format(message) if message else "")
if reason == 'ImagePullBackOff':
delete_pod_cmd = 'kubectl delete pods {} -n {}'.format(pod_name, namespace)
self.log.debug(" - deleting pod due to ImagePullBackOff: {}".format(delete_pod_cmd))
get_bash_output(delete_pod_cmd)
try:
self.log.debug(" - Detecting hanging pods: {}".format(kubectl_cmd))
self._session.api_client.tasks.failed(
task=task_id,
status_reason="K8S glue error: {}".format(msg),
status_message="Changed by K8S glue",
force=True
)
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
)
# clean up any msg for this task
last_tasks_msgs.pop(task_id, None)
continue
if msg and last_tasks_msgs.get(task_id, None) != msg:
try:
result = self._session.send_request(
service='tasks',
action='update',
json={"task": task_id, "status_message": "K8S glue status: {}".format(msg)},
method=Request.def_method,
async_enable=False,
)
if not result.ok:
result_msg = self._get_path(result.json(), 'meta', 'result_msg')
raise Exception(result_msg or result.text)
# update last msg for this task
last_tasks_msgs[task_id] = msg
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
task_id, msg, ex
)
)
if task_id_to_details:
try:
result = self._session.get(
service='tasks',
action='get_all',
json={"id": list(task_id_to_details), "status": ["stopped"], "only_fields": ["id"]},
method=Request.def_method,
async_enable=False,
)
aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))
for task_id in aborted_task_ids:
pod_name, namespace = task_id_to_details.get(task_id)
if not pod_name:
self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
continue
self.log.info(
"K8S Glue pods monitor: task {} was aborted by its pod {} is still pending, "
"deleting pod".format(task_id, pod_name)
)
kubectl_cmd = "kubectl delete pod {pod_name} --output name {namespace}".format(
namespace=f"--namespace={namespace}" if namespace else "", pod_name=pod_name,
).strip()
self.log.debug("Deleting aborted task pending pod: {}".format(kubectl_cmd))
output = stringify_bash_output(get_bash_output(kubectl_cmd))
if not output:
self.log.warning("K8S Glue pods monitor: failed deleting pod {}".format(pod_name))
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: failed checking aborted tasks for hanging pods: {}'.format(ex)
)
# clean up any last message for a task that wasn't seen as a pod
last_tasks_msgs = {k: v for k, v in last_tasks_msgs.items() if k in task_id_to_details}
sleep(self._polling_interval)
def _set_task_user_properties(self, task_id: str, task_session=None, **properties: str):
session = task_session or self._session
if self._edit_hyperparams_support is not True:
@@ -465,6 +354,69 @@ class K8sIntegration(Worker):
except Exception as ex:
print("ERROR: Failed getting tenant for task session: {}".format(ex))
def get_jobs_info(self, info_path: str, condition: str = None, namespace=None, debug_msg: str = None)\
-> Dict[str, str]:
cond = "==".join((x.strip("=") for x in condition.partition("=")[::2]))
output = f"jsonpath='{{range .items[?(@.{cond})]}}{{@.{info_path}}}{{\" \"}}{{@.metadata.namespace}}{{\"\\n\"}}{{end}}'"
kubectl_cmd = self.get_kubectl_command("get job", output=output, ns=namespace)
if debug_msg:
self.log.debug(debug_msg.format(cmd=kubectl_cmd))
output = stringify_bash_output(get_bash_output(kubectl_cmd))
output = output.strip("'") # for Windows debugging :(
try:
data_items = dict(l.strip().partition(" ")[::2] for l in output.splitlines())
return data_items
except Exception as ex:
self.log.warning('Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
def get_pods_for_jobs(self, job_condition: str = None, pod_filters: List[str] = None, debug_msg: str = None):
controller_uids = self.get_jobs_info(
"spec.selector.matchLabels.controller-uid", condition=job_condition, debug_msg=debug_msg
)
if not controller_uids:
# No pods were found for these jobs
return []
pods = self.get_pods(filters=pod_filters, debug_msg=debug_msg)
return [
pod for pod in pods
if get_path(pod, "metadata", "labels", "controller-uid") in controller_uids
]
def get_pods(self, filters: List[str] = None, debug_msg: str = None):
kubectl_cmd = self.get_kubectl_command(
"get pods",
filters=filters,
labels=False if self.using_jobs else None,
)
if debug_msg:
self.log.debug(debug_msg.format(cmd=kubectl_cmd))
output = stringify_bash_output(get_bash_output(kubectl_cmd))
try:
output_config = json.loads(output)
except Exception as ex:
self.log.warning('Failed parsing kubectl output:\n{}\nEx: {}'.format(output, ex))
return
return output_config.get('items', [])
def _get_pod_count(self, extra_labels: List[str] = None, msg: str = None):
kubectl_cmd_new = self.get_kubectl_command(
f"get {self.kind}s",
extra_labels= extra_labels
)
self.log.debug("{}{}".format((msg + ": ") if msg else "", kubectl_cmd_new))
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = stringify_bash_output(output)
error = stringify_bash_output(error)
try:
return len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
self.log.warning(
"K8S Glue pods monitor: Failed parsing kubectl output:\n{}\nEx: {}".format(output, ex)
)
raise GetPodCountError()
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
session = task_session or self._session
@@ -476,6 +428,12 @@ class K8sIntegration(Worker):
print('Pushing task {} into temporary pending queue'.format(task_id))
_ = session.api_client.tasks.stop(task_id, force=True)
# Just make sure to clean up in case the task is stuck in the queue (known issue)
self._session.api_client.queues.remove_task(
task=task_id,
queue=self.k8s_pending_queue_id,
)
res = self._session.api_client.tasks.enqueue(
task_id,
queue=self.k8s_pending_queue_id,
@@ -515,14 +473,14 @@ class K8sIntegration(Worker):
hocon_config_encoded = config_content.encode("ascii")
create_clearml_conf = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
clearml_conf_create_script = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
base64.b64encode(
hocon_config_encoded
).decode('ascii')
)]
if task_session:
create_clearml_conf.append(
clearml_conf_create_script.append(
"export CLEARML_AUTH_TOKEN=$(echo '{}' | base64 --decode)".format(
base64.b64encode(task_session.token.encode("ascii")).decode('ascii')
)
@@ -543,23 +501,15 @@ class K8sIntegration(Worker):
while self.ports_mode or self.max_pods_limit:
pod_number = self.base_pod_num + pod_count
kubectl_cmd_new = self.get_kubectl_command(
"get pods",
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None
)
self.log.debug("Looking for a free pod/port: {}".format(kubectl_cmd_new))
process = subprocess.Popen(kubectl_cmd_new.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
output = stringify_bash_output(output)
error = stringify_bash_output(error)
try:
items_count = len(json.loads(output).get("items", []))
except (ValueError, TypeError) as ex:
items_count = self._get_pod_count(
extra_labels=[self.limit_pod_label.format(pod_number=pod_number)] if self.ports_mode else None,
msg="Looking for a free pod/port"
)
except GetPodCountError:
self.log.warning(
"K8S Glue pods monitor: Failed parsing kubectl output:\n{}\ntask '{}' "
"will be enqueued back to queue '{}'\nEx: {}".format(
output, task_id, queue, ex
"K8S Glue pods monitor: task '{}' will be enqueued back to queue '{}'".format(
task_id, queue
)
)
session.api_client.tasks.stop(task_id, force=True)
@@ -583,8 +533,6 @@ class K8sIntegration(Worker):
if current_pod_count >= max_count:
# All pods are taken, exit
self.log.debug(
"kubectl last result: {}\n{}".format(error, output))
self.log.warning(
"All k8s services are in use, task '{}' "
"will be enqueued back to queue '{}'".format(
@@ -629,7 +577,7 @@ class K8sIntegration(Worker):
output, error = self._kubectl_apply(
template=template,
pod_number=pod_number,
create_clearml_conf=create_clearml_conf,
clearml_conf_create_script=clearml_conf_create_script,
labels=labels,
docker_image=container['image'],
docker_args=container['arguments'],
@@ -639,11 +587,11 @@ class K8sIntegration(Worker):
namespace=namespace,
)
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
user_props = {"k8s-queue": str(queue_name)}
if self.ports_mode:
@@ -704,32 +652,10 @@ class K8sIntegration(Worker):
return {target: results} if results else {}
return results
def _kubectl_apply(
self,
create_clearml_conf,
docker_image,
docker_args,
docker_bash,
labels,
queue,
task_id,
namespace,
template=None,
pod_number=None
):
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
template.setdefault('metadata', {})
name = self.pod_name_prefix + str(task_id)
template['metadata']['name'] = name
template.setdefault('spec', {})
template['spec'].setdefault('containers', [])
template['spec'].setdefault('restartPolicy', 'Never')
if labels:
labels_dict = dict(pair.split('=', 1) for pair in labels)
template['metadata'].setdefault('labels', {})
template['metadata']['labels'].update(labels_dict)
def _create_template_container(
self, pod_name: str, task_id: str, docker_image: str, docker_args: List[str],
docker_bash: str, clearml_conf_create_script: List[str]
) -> dict:
container = self._get_docker_args(
docker_args,
target="env",
@@ -753,7 +679,7 @@ class K8sIntegration(Worker):
agent_install_args=self.POD_AGENT_INSTALL_ARGS)
for line in container_bash_script])
extra_bash_commands = list(create_clearml_conf or [])
extra_bash_commands = list(clearml_conf_create_script or [])
start_agent_script_path = ENV_START_AGENT_SCRIPT_PATH.get() or "~/__start_agent__.sh"
@@ -767,20 +693,77 @@ class K8sIntegration(Worker):
)
# Notice: we always leave with exit code 0, so pods are never restarted
container = self._merge_containers(
return self._merge_containers(
container,
dict(name=name, image=docker_image,
dict(name=pod_name, image=docker_image,
command=['/bin/bash'],
args=['-c', '{} ; exit 0'.format(' ; '.join(extra_bash_commands))])
)
if template['spec']['containers']:
template['spec']['containers'][0] = self._merge_containers(template['spec']['containers'][0], container)
def _kubectl_apply(
self,
clearml_conf_create_script: List[str],
docker_image,
docker_args,
docker_bash,
labels,
queue,
task_id,
namespace,
template=None,
pod_number=None
):
if "apiVersion" not in template:
template["apiVersion"] = "batch/v1" if self.using_jobs else "v1"
if "kind" in template:
if template["kind"].lower() != self.kind:
return (
"", f"Template kind {template['kind']} does not maych kind {self.kind.capitalize()} set for agent"
)
else:
template['spec']['containers'].append(container)
template["kind"] = self.kind.capitalize()
metadata = template.setdefault('metadata', {})
name = self.pod_name_prefix + str(task_id)
metadata['name'] = name
def place_labels(metadata_dict):
labels_dict = dict(pair.split('=', 1) for pair in labels)
metadata_dict.setdefault('labels', {}).update(labels_dict)
if labels:
# Place labels on base resource (job or single pod)
place_labels(metadata)
spec = template.setdefault('spec', {})
if self.using_jobs:
spec.setdefault('backoffLimit', 0)
spec_template = spec.setdefault('template', {})
if labels:
# Place same labels fro any pod spawned by the job
place_labels(spec_template.setdefault('metadata', {}))
spec = spec_template.setdefault('spec', {})
containers = spec.setdefault('containers', [])
spec.setdefault('restartPolicy', 'Never')
container = self._create_template_container(
pod_name=name,
task_id=task_id,
docker_image=docker_image,
docker_args=docker_args,
docker_bash=docker_bash,
clearml_conf_create_script=clearml_conf_create_script
)
if containers:
containers[0] = self._merge_containers(containers[0], container)
else:
containers.append(container)
if self._docker_force_pull:
for c in template['spec']['containers']:
for c in containers:
c.setdefault('imagePullPolicy', 'Always')
fp, yaml_file = tempfile.mkstemp(prefix='clearml_k8stmpl_', suffix='.yml')
@@ -812,6 +795,83 @@ class K8sIntegration(Worker):
return stringify_bash_output(output), stringify_bash_output(error)
def _process_bash_lines_response(self, bash_cmd: str, raise_error=True):
res = get_bash_output(bash_cmd, raise_error=raise_error)
lines = [
line for line in
(r.strip().rpartition("/")[-1] for r in res.splitlines())
if line.startswith(self.pod_name_prefix)
]
return lines
def _delete_pods(self, selectors: List[str], namespace: str, msg: str = None) -> List[str]:
kubectl_cmd = \
"kubectl delete pod -l={agent_label} " \
"--namespace={namespace} --field-selector={selector} --output name".format(
selector=",".join(selectors),
agent_label=self._get_agent_label(),
namespace=namespace,
)
self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
msg or "", namespace, kubectl_cmd
))
lines = self._process_bash_lines_response(kubectl_cmd)
self.log.debug(" - deleted pods %s", ", ".join(lines))
return lines
def _delete_jobs_by_names(self, names_to_ns: Dict[str, str], msg: str = None) -> List[str]:
if not names_to_ns:
return []
ns_to_names = defaultdict(list)
for name, ns in names_to_ns.items():
ns_to_names[ns].append(name)
results = []
for ns, names in ns_to_names.items():
kubectl_cmd = "kubectl delete job --namespace={ns} --output=name {names}".format(
ns=ns, names=" ".join(names)
)
self.log.debug("Deleting jobs {}: {}".format(
msg or "", kubectl_cmd
))
lines = self._process_bash_lines_response(kubectl_cmd)
if not lines:
continue
self.log.debug(" - deleted jobs %s", ", ".join(lines))
results.extend(lines)
return results
def _delete_completed_or_failed_pods(self, namespace, msg: str = None):
if not self.using_jobs:
return self._delete_pods(
selectors=["status.phase!=Pending", "status.phase!=Running"], namespace=namespace, msg=msg
)
job_names_to_delete = {}
# locate failed pods for jobs
failed_pods = self.get_pods_for_jobs(
job_condition="status.active=1",
pod_filters=["status.phase!=Pending", "status.phase!=Running", "status.phase!=Terminating"],
debug_msg="Deleting failed pods: {cmd}"
)
if failed_pods:
job_names_to_delete = {
get_path(pod, "metadata", "labels", "job-name"): get_path(pod, "metadata", "namespace")
for pod in failed_pods
if get_path(pod, "metadata", "labels", "job-name")
}
self.log.debug(f" - found jobs with failed pods: {' '.join(job_names_to_delete)}")
completed_job_names = self.get_jobs_info(
"metadata.name", condition="status.succeeded=1", namespace=namespace, debug_msg=msg
)
if completed_job_names:
self.log.debug(f" - found completed jobs: {' '.join(completed_job_names)}")
job_names_to_delete.update(completed_job_names)
return self._delete_jobs_by_names(names_to_ns=job_names_to_delete, msg=msg)
def _cleanup_old_pods(self, namespaces, extra_msg=None):
# type: (Iterable[str], Optional[str]) -> Dict[str, List[str]]
self.log.debug("Cleaning up pods")
@@ -820,23 +880,12 @@ class K8sIntegration(Worker):
if time() - self._last_pod_cleanup_per_ns[namespace] < self._min_cleanup_interval_per_ns_sec:
# Do not try to cleanup the same namespace too quickly
continue
kubectl_cmd = self.KUBECTL_CLEANUP_DELETE_CMD.format(
namespace=namespace, agent_label=self._get_agent_label()
)
self.log.debug("Deleting old/failed pods{} for ns {}: {}".format(
extra_msg or "", namespace, kubectl_cmd
))
try:
res = get_bash_output(kubectl_cmd, raise_error=True)
lines = [
line for line in
(r.strip().rpartition("/")[-1] for r in res.splitlines())
if line.startswith(self.pod_name_prefix)
]
self.log.debug(" - deleted pod(s) %s", ", ".join(lines))
deleted_pods[namespace].extend(lines)
res = self._delete_completed_or_failed_pods(namespace, extra_msg)
deleted_pods[namespace].extend(res)
except Exception as ex:
self.log.error("Failed deleting old/failed pods for ns %s: %s", namespace, str(ex))
self.log.error("Failed deleting completed/failed pods for ns %s: %s", namespace, str(ex))
finally:
self._last_pod_cleanup_per_ns[namespace] = time()
@@ -857,7 +906,7 @@ class K8sIntegration(Worker):
)
tasks_to_abort = result["tasks"]
except Exception as ex:
self.log.warning('Failed getting running tasks for deleted pods: {}'.format(ex))
self.log.warning('Failed getting running tasks for deleted {}(s): {}'.format(self.kind, ex))
for task in tasks_to_abort:
task_id = task.get("id")
@@ -870,15 +919,27 @@ class K8sIntegration(Worker):
self._session.get(
service='tasks',
action='dequeue',
json={"task": task_id, "force": True, "status_reason": "Pod deleted (not pending or running)",
"status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown")},
json={
"task": task_id,
"force": True,
"status_reason": "Pod deleted (not pending or running)",
"status_message": "{} deleted by agent {}".format(
self.kind.capitalize(), self.worker_id or "unknown"
)
},
method=Request.def_method,
)
self._session.get(
service='tasks',
action='failed',
json={"task": task_id, "force": True, "status_reason": "Pod deleted (not pending or running)",
"status_message": "Pod deleted by agent {}".format(self.worker_id or "unknown")},
json={
"task": task_id,
"force": True,
"status_reason": "Pod deleted (not pending or running)",
"status_message": "{} deleted by agent {}".format(
self.kind.capitalize(), self.worker_id or "unknown"
)
},
method=Request.def_method,
)
except Exception as ex:
@@ -919,10 +980,10 @@ class K8sIntegration(Worker):
# check if have pod limit, then check if we hit it.
if self.max_pods_limit:
if current_pods >= self.max_pods_limit:
print("Maximum pod limit reached {}/{}, sleeping for {:.1f} seconds".format(
current_pods, self.max_pods_limit, self._polling_interval))
print("Maximum {} limit reached {}/{}, sleeping for {:.1f} seconds".format(
self.kind, current_pods, self.max_pods_limit, self._polling_interval))
# delete old completed / failed pods
self._cleanup_old_pods(namespaces, " due to pod limit")
self._cleanup_old_pods(namespaces, f" due to {self.kind} limit")
# go to sleep
sleep(self._polling_interval)
continue
@@ -930,7 +991,7 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods
self._cleanup_old_pods(namespaces)
self._cleanup_old_pods(namespaces, extra_msg="Cleanup cycle {cmd}")
# get next task in queue
try:

View File

@@ -0,0 +1,223 @@
from time import sleep
from typing import Dict, Tuple, Optional, List
from clearml_agent.backend_api.session import Request
from clearml_agent.glue.utilities import get_bash_output
from clearml_agent.helper.process import stringify_bash_output
from .daemon import K8sDaemon
from .utilities import get_path
from .errors import GetPodsError
class PendingPodsDaemon(K8sDaemon):
def __init__(self, polling_interval: float, agent):
super(PendingPodsDaemon, self).__init__(agent=agent)
self._polling_interval = polling_interval
self._last_tasks_msgs = {} # last msg updated for every task
def get_pods(self):
if self._agent.using_jobs:
return self._agent.get_pods_for_jobs(
job_condition="status.active=1",
pod_filters=["status.phase=Pending"],
debug_msg="Detecting pending pods: {cmd}"
)
return self._agent.get_pods(
filters=["status.phase=Pending"],
debug_msg="Detecting pending pods: {cmd}"
)
def _get_pod_name(self, pod: dict):
return get_path(pod, "metadata", "name")
def _get_k8s_resource_name(self, pod: dict):
if self._agent.using_jobs:
return get_path(pod, "metadata", "labels", "job-name")
return get_path(pod, "metadata", "name")
def _get_task_id(self, pod: dict):
return self._get_k8s_resource_name(pod).rpartition('-')[-1]
@staticmethod
def _get_k8s_resource_namespace(pod: dict):
return pod.get('metadata', {}).get('namespace', None)
def target(self):
"""
Handle pending objects (pods or jobs, depending on the agent mode).
- Delete any pending objects that are not expected to recover
- Delete any pending objects for whom the associated task was aborted
"""
while True:
# noinspection PyBroadException
try:
# Get pods (standalone pods if we're in pods mode, or pods associated to jobs if we're in jobs mode)
pods = self.get_pods()
if pods is None:
raise GetPodsError()
task_id_to_pod = dict()
for pod in pods:
pod_name = self._get_pod_name(pod)
if not pod_name:
continue
task_id = self._get_task_id(pod)
if not task_id:
continue
namespace = self._get_k8s_resource_namespace(pod)
if not namespace:
continue
task_id_to_pod[task_id] = pod
msg = None
tags = []
waiting = get_path(pod, 'status', 'containerStatuses', 0, 'state', 'waiting')
if not waiting:
condition = get_path(pod, 'status', 'conditions', 0)
if condition:
reason = condition.get('reason')
if reason == 'Unschedulable':
message = condition.get('message')
msg = reason + (" ({})".format(message) if message else "")
else:
reason = waiting.get("reason", None)
message = waiting.get("message", None)
msg = reason + (" ({})".format(message) if message else "")
if reason == 'ImagePullBackOff':
self.delete_k8s_resource(k8s_resource=pod, msg=reason)
try:
self._session.api_client.tasks.failed(
task=task_id,
status_reason="K8S glue error: {}".format(msg),
status_message="Changed by K8S glue",
force=True
)
self._agent.command.send_logs(
task_id, ["K8S Error: {}".format(msg)],
session=self._session
)
except Exception as ex:
self.log.warning(
'K8S Glue pending monitor: Failed deleting task "{}"\nEX: {}'.format(task_id, ex)
)
# clean up any msg for this task
self._last_tasks_msgs.pop(task_id, None)
continue
self._update_pending_task_msg(task_id, msg, tags)
if task_id_to_pod:
self._process_tasks_for_pending_pods(task_id_to_pod)
# clean up any last message for a task that wasn't seen as a pod
self._last_tasks_msgs = {k: v for k, v in self._last_tasks_msgs.items() if k in task_id_to_pod}
except GetPodsError:
pass
except Exception:
self.log.exception("Hanging pods daemon loop")
sleep(self._polling_interval)
def delete_k8s_resource(self, k8s_resource: dict, msg: str = None):
delete_cmd = "kubectl delete {kind} {name} -n {namespace} --output name".format(
kind=self._agent.kind,
name=self._get_k8s_resource_name(k8s_resource),
namespace=self._get_k8s_resource_namespace(k8s_resource)
).strip()
self.log.debug(" - deleting {} {}: {}".format(self._agent.kind, (" " + msg) if msg else "", delete_cmd))
return get_bash_output(delete_cmd).strip()
def _process_tasks_for_pending_pods(self, task_id_to_details: Dict[str, dict]):
self._handle_aborted_tasks(task_id_to_details)
def _handle_aborted_tasks(self, pending_tasks_details: Dict[str, dict]):
try:
result = self._session.get(
service='tasks',
action='get_all',
json={
"id": list(pending_tasks_details),
"status": ["stopped"],
"only_fields": ["id"]
},
method=Request.def_method,
async_enable=False,
)
aborted_task_ids = list(filter(None, (task.get("id") for task in result["tasks"])))
for task_id in aborted_task_ids:
pod = pending_tasks_details.get(task_id)
if not pod:
self.log.error("Failed locating aborted task {} in pending pods list".format(task_id))
continue
resource_name = self._get_k8s_resource_name(pod)
self.log.info(
"K8S Glue pending monitor: task {} was aborted but the k8s resource {} is still pending, "
"deleting pod".format(task_id, resource_name)
)
output = self.delete_k8s_resource(k8s_resource=pod, msg="Pending resource of an aborted task")
if not output:
self.log.warning("K8S Glue pending monitor: failed deleting resource {}".format(resource_name))
except Exception as ex:
self.log.warning(
'K8S Glue pending monitor: failed checking aborted tasks for pending resources: {}'.format(ex)
)
def _update_pending_task_msg(self, task_id: str, msg: str, tags: List[str] = None):
if not msg or self._last_tasks_msgs.get(task_id, None) == (msg, tags):
return
try:
# Make sure the task is queued
result = self._session.send_request(
service='tasks',
action='get_all',
json={"id": task_id, "only_fields": ["status"]},
method=Request.def_method,
async_enable=False,
)
if result.ok:
status = get_path(result.json(), 'data', 'tasks', 0, 'status')
# if task is in progress, change its status to enqueued
if status == "in_progress":
result = self._session.send_request(
service='tasks', action='enqueue',
json={
"task": task_id, "force": True, "queue": self._agent.k8s_pending_queue_id
},
method=Request.def_method,
async_enable=False,
)
if not result.ok:
result_msg = get_path(result.json(), 'meta', 'result_msg')
self.log.debug(
"K8S Glue pods monitor: failed forcing task status change"
" for pending task {}: {}".format(task_id, result_msg)
)
# Update task status message
payload = {"task": task_id, "status_message": "K8S glue status: {}".format(msg)}
if tags:
payload["tags"] = tags
result = self._session.send_request('tasks', 'update', json=payload, method=Request.def_method)
if not result.ok:
result_msg = get_path(result.json(), 'meta', 'result_msg')
raise Exception(result_msg or result.text)
# update last msg for this task
self._last_tasks_msgs[task_id] = msg
except Exception as ex:
self.log.warning(
'K8S Glue pods monitor: Failed setting status message for task "{}"\nMSG: {}\nEX: {}'.format(
task_id, msg, ex
)
)

View File

@@ -0,0 +1,18 @@
import functools
from subprocess import DEVNULL
from clearml_agent.helper.process import get_bash_output as _get_bash_output
def get_path(d, *path, default=None):
try:
return functools.reduce(
lambda a, b: a[b], path, d
)
except (IndexError, KeyError):
return default
def get_bash_output(cmd, stderr=DEVNULL, raise_error=False):
return _get_bash_output(cmd, stderr=stderr, raise_error=raise_error)

View File

@@ -20,20 +20,22 @@ from typing import Text, Dict, Any, Optional, AnyStr, IO, Union
import attr
import furl
import six
import yaml
from attr import fields_dict
from pathlib2 import Path
import six
from six.moves import reduce
from clearml_agent.external import pyhocon
from clearml_agent.errors import CommandFailedError
from clearml_agent.external import pyhocon
from clearml_agent.helper.dicts import filter_keys
pretty_lines = False
log = logging.getLogger(__name__)
use_powershell = os.getenv("CLEARML_AGENT_USE_POWERSHELL", None)
def which(cmd, path=None):
result = find_executable(cmd, path)
@@ -52,7 +54,7 @@ def select_for_platform(linux, windows):
def bash_c():
return 'bash -c' if not is_windows_platform() else 'cmd /c'
return 'bash -c' if not is_windows_platform() else ('powershell -Command' if use_powershell else 'cmd /c')
def return_list(arg):

View File

@@ -50,7 +50,7 @@ class PackageManager(object):
pass
@abc.abstractmethod
def freeze(self):
def freeze(self, freeze_full_environment=False):
pass
@abc.abstractmethod
@@ -141,8 +141,9 @@ class PackageManager(object):
@classmethod
def out_of_scope_install_package(cls, package_name, *args):
if PackageManager._selected_manager is not None:
# noinspection PyBroadException
try:
result = PackageManager._selected_manager._install(package_name, *args)
result = PackageManager._selected_manager.install_packages(package_name, *args)
if result not in (0, None, True):
return False
except Exception:
@@ -150,10 +151,11 @@ class PackageManager(object):
return True
@classmethod
def out_of_scope_freeze(cls):
def out_of_scope_freeze(cls, freeze_full_environment=False):
if PackageManager._selected_manager is not None:
# noinspection PyBroadException
try:
return PackageManager._selected_manager.freeze()
return PackageManager._selected_manager.freeze(freeze_full_environment)
except Exception:
pass
return []

View File

@@ -4,7 +4,7 @@ from itertools import chain
from pathlib import Path
from typing import Text, Optional
from clearml_agent.definitions import PIP_EXTRA_INDICES, PROGRAM_NAME
from clearml_agent.definitions import PIP_EXTRA_INDICES, PROGRAM_NAME, ENV_PIP_EXTRA_INSTALL_FLAGS
from clearml_agent.helper.package.base import PackageManager
from clearml_agent.helper.process import Argv, DEVNULL
from clearml_agent.session import Session
@@ -52,7 +52,7 @@ class SystemPip(PackageManager):
package,
'--dest', cache_dir,
'--no-deps',
) + self.install_flags()
) + self.download_flags()
)
def load_requirements(self, requirements):
@@ -65,13 +65,14 @@ class SystemPip(PackageManager):
def uninstall(self, package):
self.run_with_env(('uninstall', '-y', package))
def freeze(self):
def freeze(self, freeze_full_environment=False):
"""
pip freeze to all install packages except the running program
:return: Dict contains pip as key and pip's packages to install
:rtype: Dict[str: List[str]]
"""
packages = self.run_with_env(('freeze',), output=True).splitlines()
packages = self.run_with_env(
('freeze',) if not freeze_full_environment else ('freeze', '--all'), output=True).splitlines()
packages_without_program = [package for package in packages if PROGRAM_NAME not in package]
return {'pip': packages_without_program}
@@ -87,6 +88,11 @@ class SystemPip(PackageManager):
# make sure we are not running it with our own PYTHONPATH
env = dict(**os.environ)
env.pop('PYTHONPATH', None)
# Debug print
if self.session.debug_mode:
print(command)
return (command.get_output if output else command.check_call)(stdin=DEVNULL, env=env, **kwargs)
def _make_command(self, command):
@@ -97,4 +103,17 @@ class SystemPip(PackageManager):
self.indices_args = tuple(
chain.from_iterable(('--extra-index-url', x) for x in PIP_EXTRA_INDICES)
)
extra_pip_flags = \
ENV_PIP_EXTRA_INSTALL_FLAGS.get() or \
self.session.config.get("agent.package_manager.extra_pip_install_flags", None)
return (self.indices_args + tuple(extra_pip_flags)) if extra_pip_flags else self.indices_args
def download_flags(self):
if self.indices_args is None:
self.indices_args = tuple(
chain.from_iterable(('--extra-index-url', x) for x in PIP_EXTRA_INDICES)
)
return self.indices_args

View File

@@ -147,7 +147,7 @@ class PoetryAPI(object):
any((self.path / indicator).exists() for indicator in self.INDICATOR_FILES)
)
def freeze(self):
def freeze(self, freeze_full_environment=False):
lines = self.config.run("show", cwd=str(self.path)).splitlines()
lines = [[p for p in line.split(' ') if p] for line in lines]
return {"pip": [parts[0]+'=='+parts[1]+' # '+' '.join(parts[2:]) for parts in lines]}

View File

@@ -7,7 +7,7 @@ from .requirements import SimpleSubstitution
class PriorityPackageRequirement(SimpleSubstitution):
name = ("cython", "numpy", "setuptools", )
name = ("cython", "numpy", "setuptools", "pip", )
optional_package_names = tuple()
def __init__(self, *args, **kwargs):
@@ -50,31 +50,39 @@ class PriorityPackageRequirement(SimpleSubstitution):
"""
# if we replaced setuptools, it means someone requested it, and since freeze will not contain it,
# we need to add it manually
if not self._replaced_packages or "setuptools" not in self._replaced_packages:
if not self._replaced_packages:
return list_of_requirements
try:
for k, lines in list_of_requirements.items():
# k is either pip/conda
if k not in ('pip', 'conda'):
continue
for i, line in enumerate(lines):
if not line or line.lstrip().startswith('#'):
continue
parts = [p for p in re.split(r'\s|=|\.|<|>|~|!|@|#', line) if p]
if not parts:
continue
# if we found setuptools, do nothing
if parts[0] == "setuptools":
return list_of_requirements
if "pip" in self._replaced_packages:
full_freeze = PackageManager.out_of_scope_freeze(freeze_full_environment=True)
# now let's look for pip
pips = [line for line in full_freeze.get("pip", []) if line.split("==")[0] == "pip"]
if pips and "pip" in list_of_requirements:
list_of_requirements["pip"] = [pips[0]] + list_of_requirements["pip"]
# if we are here it means we have not found setuptools
# we should add it:
if "pip" in list_of_requirements:
list_of_requirements["pip"] = [self._replaced_packages["setuptools"]] + list_of_requirements["pip"]
if "setuptools" in self._replaced_packages:
try:
for k, lines in list_of_requirements.items():
# k is either pip/conda
if k not in ('pip', 'conda'):
continue
for i, line in enumerate(lines):
if not line or line.lstrip().startswith('#'):
continue
parts = [p for p in re.split(r'\s|=|\.|<|>|~|!|@|#', line) if p]
if not parts:
continue
# if we found setuptools, do nothing
if parts[0] == "setuptools":
return list_of_requirements
except Exception as ex: # noqa
return list_of_requirements
# if we are here it means we have not found setuptools
# we should add it:
if "pip" in list_of_requirements:
list_of_requirements["pip"] = [self._replaced_packages["setuptools"]] + list_of_requirements["pip"]
except Exception as ex: # noqa
return list_of_requirements
return list_of_requirements

View File

@@ -1 +1 @@
__version__ = '1.5.3rc3'
__version__ = '1.5.3rc4'

View File

@@ -93,6 +93,9 @@ agent {
# extra_index_url: ["https://allegroai.jfrog.io/clearml/api/pypi/public/simple"]
extra_index_url: []
# additional flags to use when calling pip install, example: ["--use-deprecated=legacy-resolver", ]
# extra_pip_install_flags: []
# control the pytorch wheel resolving algorithm, options are: "pip", "direct"
# "pip" (default): would automatically detect the cuda version, and supply pip with the correct
# extra-index-url, based on pytorch.org tables
@@ -182,6 +185,7 @@ agent {
# optional arguments to pass to docker image
# these are local for this agent and will not be updated in the experiment's docker_cmd section
# You can also pass host environments into the container with ["-e", "HOST_NAME=$HOST_NAME"]
# extra_docker_arguments: ["--ipc=host", "-v", "/mnt/host/data:/mnt/data"]
# optional shell script to run in docker when started before the experiment is started
@@ -192,6 +196,12 @@ agent {
# change to false to skip installation and decrease docker spin up time
# docker_install_opencv_libs: true
# Allow passing host environments into docker container with Task's docker container args
# Example "-e HOST_NAME=$HOST_NAME"
# NOTICE this might introduce security risk allowing access to keys/secret on the host machine1
# Use with care!
# docker_allow_host_environ: false
# set to true in order to force "docker pull" before running an experiment using a docker image.
# This makes sure the docker image is updated.
docker_force_pull: false