diff --git a/trains/binding/artifacts.py b/trains/binding/artifacts.py
index 3ea29062..bade86ed 100644
--- a/trains/binding/artifacts.py
+++ b/trains/binding/artifacts.py
@@ -2,6 +2,7 @@ import hashlib
 import json
 import mimetypes
 import os
+import pickle
 from six.moves.urllib.parse import quote
 from copy import deepcopy
 from datetime import datetime
@@ -160,6 +161,9 @@ class Artifact(object):
         elif self.type == 'JSON':
             with open(local_file, 'rt') as f:
                 self._object = json.load(f)
+        elif self.type == 'pickle':
+            with open(local_file, 'rb') as f:
+                self._object = pickle.load(f)
 
         local_file = Path(local_file)
 
@@ -197,6 +201,8 @@
 
 class Artifacts(object):
+    max_preview_size_bytes = 65536
+
     _flush_frequency_sec = 300.
     # notice these two should match
     _save_format = '.csv.gz'
@@ -293,8 +299,8 @@ class Artifacts(object):
         self._unregister_request.add(name)
         self.flush()
 
-    def upload_artifact(self, name, artifact_object=None, metadata=None, delete_after_upload=False):
-        # type: (str, Optional[object], Optional[dict], bool) -> bool
+    def upload_artifact(self, name, artifact_object=None, metadata=None, delete_after_upload=False, auto_pickle=True):
+        # type: (str, Optional[object], Optional[dict], bool, bool) -> bool
         if not Session.check_min_api_version('2.3'):
             LoggerRoot.get_base_logger().warning('Artifacts not supported by your TRAINS-server version, '
                                                  'please upgrade to the latest server version')
@@ -303,6 +309,16 @@ class Artifacts(object):
         if name in self._artifacts_container:
             raise ValueError("Artifact by the name of {} is already registered, use register_artifact".format(name))
 
+        # convert the string into a Path object if it points to an existing file/folder (do not serialize long texts)
+        if isinstance(artifact_object, six.string_types) and len(artifact_object) < 2048:
+            # noinspection PyBroadException
+            try:
+                artifact_path = Path(artifact_object)
+                if artifact_path.exists():
+                    artifact_object = artifact_path
+            except Exception:
+                pass
+
         artifact_type_data = tasks.ArtifactTypeData()
         override_filename_in_uri = None
         override_filename_ext_in_uri = None
@@ -347,19 +363,15 @@ class Artifacts(object):
             fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
             os.write(fd, bytes(preview.encode()))
             os.close(fd)
-            artifact_type_data.preview = preview
+            if len(preview) < self.max_preview_size_bytes:
+                artifact_type_data.preview = preview
+            else:
+                artifact_type_data.preview = '# full json too large to store, storing first {}kb\n{}'.format(
+                    self.max_preview_size_bytes // 1024, preview[:self.max_preview_size_bytes]
+                )
+            delete_after_upload = True
-        elif (
-            isinstance(artifact_object, six.string_types)
-            and urlparse(artifact_object).scheme in remote_driver_schemes
-        ):
-            # we should not upload this, just register
-            local_filename = None
-            uri = artifact_object
-            artifact_type = 'custom'
-            artifact_type_data.content_type = mimetypes.guess_type(artifact_object)[0]
-        elif isinstance(
-                artifact_object, six.string_types + (Path, pathlib_Path,) if pathlib_Path is not None else (Path,)):
+        elif isinstance(artifact_object, (Path, pathlib_Path,) if pathlib_Path is not None else (Path,)):
             # check if single file
             artifact_object = Path(artifact_object)
@@ -420,6 +432,32 @@ class Artifacts(object):
             artifact_type = 'custom'
             artifact_type_data.content_type = mimetypes.guess_type(artifact_object)[0]
             local_filename = artifact_object
+        elif (
+            isinstance(artifact_object, six.string_types)
+            and urlparse(artifact_object).scheme in remote_driver_schemes
+        ):
+            # we should not upload this, just register
+            local_filename = None
+            uri = artifact_object
+            artifact_type = 'custom'
+            artifact_type_data.content_type = mimetypes.guess_type(artifact_object)[0]
+        elif auto_pickle:
+            # if we got here, we do not know what to do with the object, so we serialize it with pickle
+            artifact_type = 'pickle'
+            artifact_type_data.content_type = 'application/pickle'
+            artifact_type_data.preview = repr(artifact_object)[:self.max_preview_size_bytes]
+            delete_after_upload = True
+            override_filename_ext_in_uri = '.pkl'
+            override_filename_in_uri = name + override_filename_ext_in_uri
+            fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
+            os.close(fd)
+            try:
+                with open(local_filename, 'wb') as f:
+                    pickle.dump(artifact_object, f)
+            except Exception:
+                # clean up the temporary file and re-raise
+                os.unlink(local_filename)
+                raise
         else:
             raise ValueError("Artifact type {} not supported".format(type(artifact_object)))
diff --git a/trains/task.py b/trains/task.py
index ca6de0ba..67b19206 100644
--- a/trains/task.py
+++ b/trains/task.py
@@ -80,10 +80,16 @@ class Task(_Task):
     For detailed information about creating Task objects, see the following methods:
 
-    - :meth:`Task.init` - Create a new reproducible Task, or reuse one.
-    - :meth:`Task.create` - Create a new non-reproducible Task.
-    - :meth:`Task.current_task` - Get the current running Task.
-    - :meth:`Task.get_task` - Get another Task (whose metadata the **Trains Server** maintains).
+    - Create a new reproducible Task - :meth:`Task.init`
+
+      .. important::
+         In some cases, ``Task.init`` may return a Task object which is already stored in **Trains Server** (already
+         initialized), instead of creating a new Task. For a detailed explanation of those cases, see the
+         ``Task.init`` method.
+
+    - Create a new non-reproducible Task - :meth:`Task.create`
+    - Get the current running Task - :meth:`Task.current_task`
+    - Get another (different) Task - :meth:`Task.get_task`
 
     .. note::
        The **Trains** documentation often refers to a Task as, "Task (experiment)".
@@ -98,8 +104,8 @@ class Task(_Task):
       Therefore, a "Task" is effectively an "experiment", and "Task (experiment)" encompasses its usage throughout
       the Trains.
 
-       The exception to this Task behavior is sub-tasks (non-reproducible Tasks), which do not use the main execution
-       Task. Creating a sub-task always creates a new Task with a new Task ID.
+       The exception to this Task behavior is sub-tasks (non-reproducible Tasks), which do not use the main execution
+       Task. Creating a sub-task always creates a new Task with a new Task ID.
     """
 
     TaskTypes = _Task.TaskTypes
@@ -176,22 +182,43 @@ class Task(_Task):
     ):
         # type: (...) -> Task
         """
-        Creates a new Task (experiment), or returns the existing Task, depending upon the following:
+        Creates a new Task (experiment) if:
 
-        - If **any** of the following are true, Trains creates a new Task and a new Task ID:
+        - The Task never ran before. No Task with the same ``task_name`` and ``project_name`` is stored in
+          **Trains Server**.
+        - The Task has run before (the same ``task_name`` and ``project_name``), and (a) it stored models and/or
+          artifacts, or (b) its status is Published, or (c) it is Archived.
+        - A new Task is forced by calling ``Task.init`` with ``reuse_last_task_id=False``.
 
-          - a Task in the same project with same name does not exist, **or**
-          - a Task in the same project with same name does exist and its status is ``Published``, **or**
-          - the ``reuse_last_task_id`` parameter is assigned ``False``.
+        Otherwise, the already initialized Task object for the same ``task_name`` and ``project_name`` is returned.
 
-        - If **all** of the following are true, Trains returns the existing Task with the existing Task ID:
+        .. note::
+           To reference another Task, instead of initializing the same Task more than once, call
+           :meth:`Task.get_task`. For example, to "share" the same experiment in more than one script,
+           call ``Task.get_task`` instead of ``Task.init``. See the ``Task.get_task`` method for an example.
 
-          - a Task in the same project with the same name does exist, **and**
-          - the Task's status is ``Draft``, ``Completed``, ``Failed``, or ``Aborted``, **and**
-          - the ``reuse_last_task_id`` parameter is the default value of ``True``.
+        For example:
 
-        .. warning::
-           When a Python experiment script runs using an existing Task, it overwrites previous experiment output.
+        The first time the following code runs, it will create a new Task. Its status will be Completed.
+
+        .. code-block:: py
+
+            from trains import Task
+            task = Task.init('myProject', 'myTask')
+
+        If this code runs again, it will not create a new Task: it does not store a model or artifact,
+        it is not Published (its status is Completed), it is not Archived, and a new Task is not forced.
+
+        If the Task is Published or Archived, running the code again creates a new Task with a new Task ID.
+
+        The following code will create a new Task every time it runs, because it stores an artifact.
+
+        .. code-block:: py
+
+            task = Task.init('myProject', 'myOtherTask')
+
+            d = {'a': '1'}
+            task.upload_artifact('myArtifact', d)
 
         :param str project_name: The name of the project in which the experiment will be created. If the project
             does not exist, it is created. If ``project_name`` is ``None``, the repository name is used. (Optional)
@@ -464,8 +491,8 @@ class Task(_Task):
 
             # Check if parse args already called. If so, sync task parameters with parser
             if argparser_parseargs_called():
-                parser, parsed_args = get_argparser_last_args()
-                task._connect_argparse(parser=parser, parsed_args=parsed_args)
+                for parser, parsed_args in get_argparser_last_args():
+                    task._connect_argparse(parser=parser, parsed_args=parsed_args)
         elif argparser_parseargs_called():
             # actually we have nothing to do, in remote running, the argparser will ignore
             # all non argparser parameters, only caveat if parameter connected with the same name
@@ -494,8 +521,8 @@ class Task(_Task):
         Create a new, non-reproducible Task (experiment). This is called a sub-task.
 
         .. note::
-           - This method always creates a new Task.
-           - To create reproducible Tasks, use the :meth:`Task.init` method.
+           This method always creates a new, non-reproducible Task. To create a reproducible Task, call the
+           :meth:`Task.init` method. To reference another Task, call the :meth:`Task.get_task` method.
 
         :param str project_name: The name of the project in which the experiment will be created.
             If ``project_name`` is ``None``, and the main execution Task is initialized (see :meth:`Task.init`),
@@ -545,6 +572,36 @@ class Task(_Task):
         """
         Get a Task by Id, or project name / task name combination.
 
+        For example:
+
+        The following code demonstrates calling ``Task.get_task`` to report a scalar to another Task. The output
+        of :meth:`.Logger.report_scalar` from testing is associated with the Task named ``training``. This allows
+        training and testing to run concurrently, because they initialized different Tasks (see :meth:`Task.init`
+        for information about initializing Tasks).
+
+        The training script:
+
+        .. code-block:: py
+
+            # initialize the training Task
+            task = Task.init('myProject', 'training')
+
+            # do some training
+
+        The testing script:
+
+        .. code-block:: py
+
+            # initialize the testing Task
+            task = Task.init('myProject', 'testing')
+
+            # get the training Task
+            train_task = Task.get_task(project_name='myProject', task_name='training')
+
+            # report metrics in the training Task
+            for x in range(10):
+                train_task.get_logger().report_scalar('title', 'series', value=x * 2, iteration=x)
+
         :param str task_id: The Id (system UUID) of the experiment to get.
             If specified, ``project_name`` and ``task_name`` are ignored.
         :param str project_name: The project name of the Task to get.
@@ -968,22 +1025,28 @@ class Task(_Task):
         """
         return self._get_logger()
 
-    def mark_started(self):
+    def mark_started(self, force=False):
+        # type: (bool) -> None
         """
         Manually mark a Task as started (happens automatically)
+
+        :param bool force: If True, the Task status will be changed to ``started`` regardless of the current Task state.
         """
         # UI won't let us see metrics if we're not started
-        self.started()
+        self.started(force=force)
         self.reload()
 
-    def mark_stopped(self):
+    def mark_stopped(self, force=False):
+        # type: (bool) -> None
         """
         Manually mark a Task as stopped (also used in :meth:`_at_exit`)
+
+        :param bool force: If True, the Task status will be changed to ``stopped`` regardless of the current Task state.
         """
         # flush any outstanding logs
         self.flush(wait_for_uploads=True)
         # mark task as stopped
-        self.stopped()
+        self.stopped(force=force)
 
     def flush(self, wait_for_uploads=False):
         # type: (bool) -> bool
@@ -1114,9 +1177,10 @@ class Task(_Task):
     def upload_artifact(
             self,
             name,  # type: str
-            artifact_object,  # type: Union[str, Mapping, pandas.DataFrame, numpy.ndarray, Image.Image]
+            artifact_object,  # type: Union[str, Mapping, pandas.DataFrame, numpy.ndarray, Image.Image, Any]
             metadata=None,  # type: Optional[Mapping]
-            delete_after_upload=False  # type: bool
+            delete_after_upload=False,  # type: bool
+            auto_pickle=True,  # type: bool
     ):
         # type: (...) -> bool
         """
@@ -1130,6 +1194,7 @@ class Task(_Task):
         - pandas.DataFrame - Trains stores a pandas.DataFrame as ``.csv.gz`` (compressed CSV) file and uploads it.
         - numpy.ndarray - Trains stores a numpy.ndarray as ``.npz`` file and uploads it.
         - PIL.Image - Trains stores a PIL.Image as ``.png`` file and uploads it.
+        - Any - If called with ``auto_pickle=True``, the object will be pickled and uploaded.
 
         :param str name: The artifact name.
@@ -1144,6 +1209,10 @@ class Task(_Task):
 
             - ``True`` - Delete the local copy of the artifact.
             - ``False`` - Do not delete. (default)
 
+        :param bool auto_pickle: If True (default), and the artifact_object is not one of the following types:
+            pathlib2.Path, dict, pandas.DataFrame, numpy.ndarray, PIL.Image, url (string), local_file (string),
+            then the artifact_object will be pickled and uploaded as a pickle file artifact (file extension ``.pkl``)
+
         :return: The status of the upload.
 
         - ``True`` - Upload succeeded.
         - ``False`` - Upload failed.
 
         :raise: If the artifact object type is not supported, raise a ``ValueError``.
""" - return self._artifacts_manager.upload_artifact(name=name, artifact_object=artifact_object, - metadata=metadata, delete_after_upload=delete_after_upload) + return self._artifacts_manager.upload_artifact( + name=name, artifact_object=artifact_object, metadata=metadata, + delete_after_upload=delete_after_upload, auto_pickle=auto_pickle) def get_models(self): # type: () -> Dict[str, Sequence[Model]] @@ -1269,16 +1339,6 @@ class Task(_Task): self._reload_last_iteration() return max(self.data.last_iteration, self._reporter.max_iteration if self._reporter else 0) - def set_last_iteration(self, last_iteration): - # type: (int) -> None - """ - Forcefully set the last reported iteration, which is the last iteration for which the Task reported a metric. - - :param int last_iteration: The last reported iteration number. - """ - self.data.last_iteration = int(last_iteration) - self._edit(last_iteration=self.data.last_iteration) - def set_initial_iteration(self, offset=0): # type: (int) -> int """ @@ -1632,6 +1692,10 @@ class Task(_Task): # set default docker image from env. task._set_default_docker_image() + # mark us as the main Task, there should only be one dev Task at a time. + if not Task.__main_task: + Task.__main_task = task + # mark the task as started task.started() # reload, making sure we are synced @@ -1794,11 +1858,19 @@ class Task(_Task): argparser_update_currenttask(self) if (parser is None or parsed_args is None) and argparser_parseargs_called(): - _parser, _parsed_args = get_argparser_last_args() - if parser is None: - parser = _parser - if parsed_args is None and parser == _parser: - parsed_args = _parsed_args + # if we have a parser but nor parsed_args, we need to find the parser + if parser and not parsed_args: + for _parser, _parsed_args in get_argparser_last_args(): + if _parser == parser: + parsed_args = _parsed_args + break + else: + # prefer the first argparser (hopefully it is more relevant?! + for _parser, _parsed_args in get_argparser_last_args(): + if parser is None: + parser = _parser + if parsed_args is None and parser == _parser: + parsed_args = _parsed_args if running_remotely() and self.is_main_task(): self._arguments.copy_to_parser(parser, parsed_args)