Add task.upload_artifact auto_pickle=True #153

Fix multiple argparse calls before task.init
Add mark_started force argument #159
allegroai 2020-07-02 01:21:00 +03:00
parent efd843d863
commit b07a17890d
2 changed files with 168 additions and 58 deletions
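In short, ``Task.upload_artifact`` gains an ``auto_pickle`` flag that serializes otherwise-unsupported objects with pickle. A minimal usage sketch (assuming a reachable TRAINS server; ``MyConfig`` is a hypothetical class standing in for any user object):

.. code-block:: py

    from trains import Task

    task = Task.init('myProject', 'myTask')

    class MyConfig(object):
        # hypothetical user object: not a dict/DataFrame/ndarray/Image/path
        def __init__(self):
            self.lr = 0.1

    # with auto_pickle=True (the default) the object is pickled and
    # uploaded as a '.pkl' file artifact named 'config'
    task.upload_artifact('config', artifact_object=MyConfig(), auto_pickle=True)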


@@ -2,6 +2,7 @@ import hashlib
import json
import mimetypes
import os
import pickle
from six.moves.urllib.parse import quote
from copy import deepcopy
from datetime import datetime
@@ -160,6 +161,9 @@ class Artifact(object):
elif self.type == 'JSON':
with open(local_file, 'rt') as f:
self._object = json.load(f)
elif self.type == 'pickle':
with open(local_file, 'rb') as f:
self._object = pickle.load(f)
local_file = Path(local_file)
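The new ``pickle`` branch above mirrors the upload side: when the artifact type is ``pickle``, ``Artifact.get`` deserializes the downloaded file with ``pickle.load``. A hedged retrieval sketch, assuming the ``config`` artifact from an earlier run exists:

.. code-block:: py

    from trains import Task

    source_task = Task.get_task(project_name='myProject', task_name='myTask')
    # downloads the .pkl file and returns the unpickled Python object;
    # note: unpickling a custom class requires its definition to be importable
    config = source_task.artifacts['config'].get()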
@@ -197,6 +201,8 @@ class Artifact(object):
class Artifacts(object):
max_preview_size_bytes = 65536
_flush_frequency_sec = 300.
# notice these two should match
_save_format = '.csv.gz'
@@ -293,8 +299,8 @@ class Artifacts(object):
self._unregister_request.add(name)
self.flush()
def upload_artifact(self, name, artifact_object=None, metadata=None, delete_after_upload=False):
# type: (str, Optional[object], Optional[dict], bool) -> bool
def upload_artifact(self, name, artifact_object=None, metadata=None, delete_after_upload=False, auto_pickle=True):
# type: (str, Optional[object], Optional[dict], bool, bool) -> bool
if not Session.check_min_api_version('2.3'):
LoggerRoot.get_base_logger().warning('Artifacts not supported by your TRAINS-server version, '
'please upgrade to the latest server version')
@@ -303,6 +309,16 @@ class Artifacts(object):
if name in self._artifacts_container:
raise ValueError("Artifact by the name of {} is already registered, use register_artifact".format(name))
# convert string to Path object if it is an existing file/folder (don't try to serialize long texts)
if isinstance(artifact_object, six.string_types) and len(artifact_object) < 2048:
# noinspection PyBroadException
try:
artifact_path = Path(artifact_object)
if artifact_path.exists():
artifact_object = artifact_path
except Exception:
pass
artifact_type_data = tasks.ArtifactTypeData()
override_filename_in_uri = None
override_filename_ext_in_uri = None
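The added block above probes short strings (under 2048 characters) as filesystem paths: if the string resolves to an existing file or folder, it is converted to a ``Path`` and uploaded as a file artifact instead of being serialized. A sketch, assuming an initialized ``task`` and that ``./data.csv`` exists on disk:

.. code-block:: py

    # treated as a file upload because the path exists;
    # long free-text strings are never probed as paths
    task.upload_artifact('raw data', artifact_object='./data.csv')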
@@ -347,19 +363,15 @@ class Artifacts(object):
fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
os.write(fd, bytes(preview.encode()))
os.close(fd)
artifact_type_data.preview = preview
if len(preview) < self.max_preview_size_bytes:
artifact_type_data.preview = preview
else:
artifact_type_data.preview = '# full json too large to store, storing first {}kb\n{}'.format(
len(preview)//1024, preview[:self.max_preview_size_bytes]
)
delete_after_upload = True
elif (
isinstance(artifact_object, six.string_types)
and urlparse(artifact_object).scheme in remote_driver_schemes
):
# we should not upload this, just register
local_filename = None
uri = artifact_object
artifact_type = 'custom'
artifact_type_data.content_type = mimetypes.guess_type(artifact_object)[0]
elif isinstance(
artifact_object, six.string_types + (Path, pathlib_Path,) if pathlib_Path is not None else (Path,)):
elif isinstance(artifact_object, (Path, pathlib_Path,) if pathlib_Path is not None else (Path,)):
# check if single file
artifact_object = Path(artifact_object)
@@ -420,6 +432,32 @@ class Artifacts(object):
artifact_type = 'custom'
artifact_type_data.content_type = mimetypes.guess_type(artifact_object)[0]
local_filename = artifact_object
elif (
isinstance(artifact_object, six.string_types)
and urlparse(artifact_object).scheme in remote_driver_schemes
):
# we should not upload this, just register
local_filename = None
uri = artifact_object
artifact_type = 'custom'
artifact_type_data.content_type = mimetypes.guess_type(artifact_object)[0]
elif auto_pickle:
# if we are here it means we do not know what to do with the object, so we serialize it with pickle.
artifact_type = 'pickle'
artifact_type_data.content_type = 'application/pickle'
artifact_type_data.preview = str(artifact_object.__repr__())[:self.max_preview_size_bytes]
delete_after_upload = True
override_filename_ext_in_uri = '.pkl'
override_filename_in_uri = name + override_filename_ext_in_uri
fd, local_filename = mkstemp(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
os.close(fd)
try:
with open(local_filename, 'wb') as f:
pickle.dump(artifact_object, f)
except Exception as ex:
# cleanup and raise exception
os.unlink(local_filename)
raise
else:
raise ValueError("Artifact type {} not supported".format(type(artifact_object)))
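With the fallback branch above, any object that matches none of the explicit branches (path, URL string, dict, DataFrame, ndarray, Image) is pickled to a temporary ``.pkl`` file and uploaded, instead of raising ``ValueError``. A hedged sketch using a ``set``, a type with no dedicated branch:

.. code-block:: py

    from trains import Task

    task = Task.init('myProject', 'myTask')
    # no dedicated branch for set, so it is pickled to 'split.pkl'
    task.upload_artifact('split', artifact_object={'train', 'val', 'test'})
    # passing auto_pickle=False restores the old behavior (ValueError)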


@@ -80,10 +80,16 @@ class Task(_Task):
For detailed information about creating Task objects, see the following methods:
- :meth:`Task.init` - Create a new reproducible Task, or reuse one.
- :meth:`Task.create` - Create a new non-reproducible Task.
- :meth:`Task.current_task` - Get the current running Task.
- :meth:`Task.get_task` - Get another Task (whose metadata the **Trains Server** maintains).
- Create a new reproducible Task - :meth:`Task.init`
.. important::
In some cases, ``Task.init`` may return a Task object which is already stored in **Trains Server** (already
initialized), instead of creating a new Task. For a detailed explanation of those cases, see the ``Task.init``
method.
- Create a new non-reproducible Task - :meth:`Task.create`
- Get the current running Task - :meth:`Task.current_task`
- Get another (different) Task - :meth:`Task.get_task`
.. note::
The **Trains** documentation often refers to a Task as, "Task (experiment)".
@@ -98,8 +104,8 @@ class Task(_Task):
Therefore, a "Task" is effectively an "experiment", and "Task (experiment)" encompasses its usage throughout
the Trains documentation.
The exception to this Task behavior is sub-tasks (non-reproducible Tasks), which do not use the main execution
Task. Creating a sub-task always creates a new Task with a new Task ID.
"""
TaskTypes = _Task.TaskTypes
@@ -176,22 +182,43 @@
):
# type: (...) -> Task
"""
Creates a new Task (experiment), or returns the existing Task, depending upon the following:
Creates a new Task (experiment) if:
- If **any** of the following are true, Trains creates a new Task and a new Task ID:
- The Task never ran before. No Task with the same ``task_name`` and ``project_name`` is stored in
**Trains Server**.
- The Task has run before (the same ``task_name`` and ``project_name``), and (a) it stored models and / or
artifacts, or (b) its status is Published, or (c) it is Archived.
- A new Task is forced by calling ``Task.init`` with ``reuse_last_task_id=False``.
- a Task in the same project with the same name does not exist, **or**
- a Task in the same project with the same name does exist and its status is ``Published``, **or**
- the ``reuse_last_task_id`` parameter is assigned ``False``.
Otherwise, the already initialized Task object for the same ``task_name`` and ``project_name`` is returned.
- If **all** of the following are true, Trains returns the existing Task with the existing Task ID:
.. note::
To reference another Task, instead of initializing the same Task more than once, call
:meth:`Task.get_task`. For example, to "share" the same experiment in more than one script,
call ``Task.get_task``. See the ``Task.get_task`` method for an example.
- a Task in the same project with the same name does exist, **and**
- the Task's status is ``Draft``, ``Completed``, ``Failed``, or ``Aborted``, **and**
- the ``reuse_last_task_id`` parameter is the default value of ``True``.
For example:
.. warning::
When a Python experiment script runs using an existing Task, it overwrites previous experiment output.
The first time the following code runs, it will create a new Task. The status will be Completed.
.. code-block:: py
from trains import Task
task = Task.init('myProject', 'myTask')
If this code runs again, it will not create a new Task. It does not store a model or artifact,
it is not Published (its status is Completed), it is not Archived, and a new Task is not forced.
If the Task is Published or Archived, and run again, it will create a new Task with a new Task ID.
The following code will create a new Task every time it runs, because it stores an artifact.
.. code-block:: py
task = Task.init('myProject', 'myOtherTask')
d = {'a': '1'}
task.upload_artifact('myArtifact', d)
:param str project_name: The name of the project in which the experiment will be created. If the project does
not exist, it is created. If ``project_name`` is ``None``, the repository name is used. (Optional)
@@ -464,8 +491,8 @@ class Task(_Task):
# Check if parse args already called. If so, sync task parameters with parser
if argparser_parseargs_called():
parser, parsed_args = get_argparser_last_args()
task._connect_argparse(parser=parser, parsed_args=parsed_args)
for parser, parsed_args in get_argparser_last_args():
task._connect_argparse(parser=parser, parsed_args=parsed_args)
elif argparser_parseargs_called():
# actually we have nothing to do, in remote running, the argparser will ignore
# all non argparser parameters, only caveat if parameter connected with the same name
@@ -494,8 +521,8 @@ class Task(_Task):
Create a new, non-reproducible Task (experiment). This is called a sub-task.
.. note::
- This method always creates a new Task.
- To create reproducible Tasks, use the :meth:`Task.init` method.
This method always creates a new, non-reproducible Task. To create a reproducible Task, call the
:meth:`Task.init` method. To reference another Task, call the :meth:`Task.get_task` method.
:param str project_name: The name of the project in which the experiment will be created.
If ``project_name`` is ``None``, and the main execution Task is initialized (see :meth:`Task.init`),
@@ -545,6 +572,36 @@ class Task(_Task):
"""
Get a Task by Id, or project name / task name combination.
For example:
The following code demonstrates calling ``Task.get_task`` to report a scalar to another Task. The output
of :meth:`.Logger.report_scalar` from testing is associated with the Task named ``training``. This allows
training and testing to run concurrently, because they initialized different Tasks (see :meth:`Task.init`
for information about initializing Tasks).
The training script:
.. code-block:: py
# initialize the training Task
task = Task.init('myProject', 'training')
# do some training
The testing script:
.. code-block:: py
# initialize the testing Task
task = Task.init('myProject', 'testing')
# get the training Task
train_task = Task.get_task(project_name='myProject', task_name='training')
# report metrics in the training Task
for x in range(10):
train_task.get_logger().report_scalar('title', 'series', value=x * 2, iteration=x)
:param str task_id: The Id (system UUID) of the experiment to get.
If specified, ``project_name`` and ``task_name`` are ignored.
:param str project_name: The project name of the Task to get.
@@ -968,22 +1025,28 @@ class Task(_Task):
"""
return self._get_logger()
def mark_started(self):
def mark_started(self, force=False):
# type: (bool) -> ()
"""
Manually mark a Task as started (happens automatically)
:param bool force: If True, the task status will be changed to ``started`` regardless of the current Task state.
"""
# UI won't let us see metrics if we're not started
self.started()
self.started(force=force)
self.reload()
def mark_stopped(self):
def mark_stopped(self, force=False):
# type: (bool) -> ()
"""
Manually mark a Task as stopped (also used in :meth:`_at_exit`)
:param bool force: If True, the task status will be changed to ``stopped`` regardless of the current Task state.
"""
# flush any outstanding logs
self.flush(wait_for_uploads=True)
# mark task as stopped
self.stopped()
self.stopped(force=force)
def flush(self, wait_for_uploads=False):
# type: (bool) -> bool
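A sketch of the new ``force`` argument, e.g. reopening another (finished) Task so metrics can still be reported to it; names follow the ``get_task`` example elsewhere in this diff:

.. code-block:: py

    from trains import Task

    train_task = Task.get_task(project_name='myProject', task_name='training')
    train_task.mark_started(force=True)   # set 'started' regardless of current state
    train_task.get_logger().report_scalar('title', 'series', value=1.0, iteration=0)
    train_task.mark_stopped(force=True)   # flush logs and force 'stopped'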
@@ -1114,9 +1177,10 @@ class Task(_Task):
def upload_artifact(
self,
name, # type: str
artifact_object, # type: Union[str, Mapping, pandas.DataFrame, numpy.ndarray, Image.Image]
artifact_object, # type: Union[str, Mapping, pandas.DataFrame, numpy.ndarray, Image.Image, Any]
metadata=None, # type: Optional[Mapping]
delete_after_upload=False # type: bool
delete_after_upload=False, # type: bool
auto_pickle=True, # type: bool
):
# type: (...) -> bool
"""
@@ -1130,6 +1194,7 @@ class Task(_Task):
- pandas.DataFrame - Trains stores a pandas.DataFrame as ``.csv.gz`` (compressed CSV) file and uploads it.
- numpy.ndarray - Trains stores a numpy.ndarray as ``.npz`` file and uploads it.
- PIL.Image - Trains stores a PIL.Image as ``.png`` file and uploads it.
- Any - If called with ``auto_pickle=True``, the object will be pickled and uploaded.
:param str name: The artifact name.
@@ -1144,6 +1209,10 @@ class Task(_Task):
- ``True`` - Delete the local copy of the artifact.
- ``False`` - Do not delete. (default)
:param bool auto_pickle: If True (default), and the artifact_object is not one of the following types:
pathlib2.Path, dict, pandas.DataFrame, numpy.ndarray, PIL.Image, url (string), local_file (string),
then the artifact_object will be pickled and uploaded as a pickle file artifact (with file extension ``.pkl``)
:return: The status of the upload.
- ``True`` - Upload succeeded.
@@ -1151,8 +1220,9 @@ class Task(_Task):
:raise: If the artifact object type is not supported, raise a ``ValueError``.
"""
return self._artifacts_manager.upload_artifact(name=name, artifact_object=artifact_object,
metadata=metadata, delete_after_upload=delete_after_upload)
return self._artifacts_manager.upload_artifact(
name=name, artifact_object=artifact_object, metadata=metadata,
delete_after_upload=delete_after_upload, auto_pickle=auto_pickle)
def get_models(self):
# type: () -> Dict[str, Sequence[Model]]
@@ -1269,16 +1339,6 @@ class Task(_Task):
self._reload_last_iteration()
return max(self.data.last_iteration, self._reporter.max_iteration if self._reporter else 0)
def set_last_iteration(self, last_iteration):
# type: (int) -> None
"""
Forcefully set the last reported iteration, which is the last iteration for which the Task reported a metric.
:param int last_iteration: The last reported iteration number.
"""
self.data.last_iteration = int(last_iteration)
self._edit(last_iteration=self.data.last_iteration)
def set_initial_iteration(self, offset=0):
# type: (int) -> int
"""
@@ -1632,6 +1692,10 @@ class Task(_Task):
# set default docker image from env.
task._set_default_docker_image()
# mark us as the main Task, there should only be one dev Task at a time.
if not Task.__main_task:
Task.__main_task = task
# mark the task as started
task.started()
# reload, making sure we are synced
@@ -1794,11 +1858,19 @@ class Task(_Task):
argparser_update_currenttask(self)
if (parser is None or parsed_args is None) and argparser_parseargs_called():
_parser, _parsed_args = get_argparser_last_args()
if parser is None:
parser = _parser
if parsed_args is None and parser == _parser:
parsed_args = _parsed_args
# if we have a parser but no parsed_args, we need to find the matching parsed_args
if parser and not parsed_args:
for _parser, _parsed_args in get_argparser_last_args():
if _parser == parser:
parsed_args = _parsed_args
break
else:
# prefer the first argparser (hopefully it is the most relevant one)
for _parser, _parsed_args in get_argparser_last_args():
if parser is None:
parser = _parser
if parsed_args is None and parser == _parser:
parsed_args = _parsed_args
if running_remotely() and self.is_main_task():
self._arguments.copy_to_parser(parser, parsed_args)
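A minimal sketch of the scenario this fixes: two parsers whose ``parse_args`` both ran before ``Task.init``. ``get_argparser_last_args`` now yields every (parser, parsed_args) pair, so both are connected instead of only the last one:

.. code-block:: py

    import argparse
    from trains import Task

    parser_a = argparse.ArgumentParser()
    parser_a.add_argument('--lr', type=float, default=0.1)
    args_a = parser_a.parse_args([])

    parser_b = argparse.ArgumentParser()
    parser_b.add_argument('--epochs', type=int, default=3)
    args_b = parser_b.parse_args([])

    # both parsers are now synced as task parameters
    task = Task.init('myProject', 'myTask')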