From 6d6b54f5a1c01acfc1b50b46402ab2fbd2ddff4b Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Fri, 14 Oct 2022 10:32:36 +0300 Subject: [PATCH] Allow pipeline steps to return string paths without them being treated as a folder artifact and zipped (#780) --- clearml/automation/controller.py | 482 +++++++++++++++------ clearml/backend_interface/task/populate.py | 35 +- 2 files changed, 375 insertions(+), 142 deletions(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 762de15c..5be4e5f7 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -691,112 +691,42 @@ class PipelineController(object): :return: True if successful """ - # always store callback functions (even when running remotely) - if pre_execute_callback: - self._pre_step_callbacks[name] = pre_execute_callback - if post_execute_callback: - self._post_step_callbacks[name] = post_execute_callback - - self._verify_node_name(name) - function_kwargs = function_kwargs or {} - function_input_artifacts = {} - # go over function_kwargs, split it into string and input artifacts - for k, v in function_kwargs.items(): - if v is None: - continue - if self._step_ref_pattern.match(str(v)): - # check for step artifacts - step, _, artifact = v[2:-1].partition('.') - if step in self._nodes and artifact in self._nodes[step].return_artifacts: - function_input_artifacts[k] = "${{{}.id}}.{}".format(step, artifact) - continue - # verify the reference only if we are running locally (on remote when we have multiple - # steps from tasks the _nodes is till empty, only after deserializing we will have the full DAG) - if self._task.running_locally(): - self.__verify_step_reference(node=self.Node(name=name), step_ref_string=v) - elif not isinstance(v, (float, int, bool, six.string_types)): - function_input_artifacts[k] = "{}.{}.{}".format(self._task.id, name, k) - self._task.upload_artifact( - "{}.{}".format(name, k), - artifact_object=v, - wait_on_upload=True, - extension_name=".pkl" if isinstance(v, dict) else None, - ) + default_kwargs = inspect.getfullargspec(function) + if default_kwargs and default_kwargs.args and default_kwargs.defaults: + for key, val in zip(default_kwargs.args[-len(default_kwargs.defaults):], default_kwargs.defaults): + function_kwargs.setdefault(key, val) - function_kwargs = {k: v for k, v in function_kwargs.items() if k not in function_input_artifacts} - parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()} - if function_input_artifacts: - parameters.update( - {"{}/{}".format(CreateFromFunction.input_artifact_section, k): str(v) - for k, v in function_input_artifacts.items()} - ) - - job_code_section = None - task_name = task_name or name or None - - if self._mock_execution: - project_name = project_name or self._get_target_project() or self._task.get_project_name() - - task_definition = self._create_task_from_function( - docker, docker_args, docker_bash_setup_script, function, - function_input_artifacts, function_kwargs, function_return, - auto_connect_frameworks, auto_connect_arg_parser, - packages, project_name, task_name, - task_type, repo, repo_branch, repo_commit, helper_functions) - - elif self._task.running_locally() or self._task.get_configuration_object(name=name) is None: - project_name = project_name or self._get_target_project() or self._task.get_project_name() - - task_definition = self._create_task_from_function( - docker, docker_args, docker_bash_setup_script, function, - 
function_input_artifacts, function_kwargs, function_return, - auto_connect_frameworks, auto_connect_arg_parser, - packages, project_name, task_name, - task_type, repo, repo_branch, repo_commit, helper_functions) - # update configuration with the task definitions - # noinspection PyProtectedMember - self._task._set_configuration( - name=name, config_type='json', - config_text=json.dumps(task_definition, indent=1) - ) - job_code_section = name - else: - # load task definition from configuration - # noinspection PyProtectedMember - config_text = self._task._get_configuration_text(name=name) - task_definition = json.loads(config_text) if config_text else dict() - - def _create_task(_): - a_task = Task.create( - project_name=project_name, - task_name=task_definition.get('name'), - task_type=task_definition.get('type'), - ) - # replace reference - a_task.update_task(task_definition) - return a_task - - self._nodes[name] = self.Node( - name=name, base_task_id=None, parents=parents or [], - queue=execution_queue, timeout=time_limit, - parameters=parameters, - clone_task=False, - cache_executed_step=cache_executed_step, - task_factory_func=_create_task, - continue_on_fail=continue_on_fail, - return_artifacts=function_return, - monitor_artifacts=monitor_artifacts, + return self._add_function_step( + name=name, + function=function, + function_kwargs=function_kwargs, + function_return=function_return, + project_name=project_name, + task_name=task_name, + task_type=task_type, + auto_connect_frameworks=auto_connect_frameworks, + auto_connect_arg_parser=auto_connect_arg_parser, + packages=packages, + repo=repo, + repo_branch=repo_branch, + repo_commit=repo_commit, + helper_functions=helper_functions, + docker=docker, + docker_args=docker_args, + docker_bash_setup_script=docker_bash_setup_script, + parents=parents, + execution_queue=execution_queue, monitor_metrics=monitor_metrics, + monitor_artifacts=monitor_artifacts, monitor_models=monitor_models, - job_code_section=job_code_section, + time_limit=time_limit, + continue_on_fail=continue_on_fail, + pre_execute_callback=pre_execute_callback, + post_execute_callback=post_execute_callback, + cache_executed_step=cache_executed_step, + retry_on_failure=retry_on_failure, ) - self._retries[name] = 0 - self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \ - (functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure) - if isinstance(retry_on_failure, int) else self._retry_on_failure_callback) - - return True def start( self, @@ -1659,6 +1589,289 @@ class PipelineController(object): # return False if we did not cover all the nodes return not bool(set(self._nodes.keys()) - visited) + def _add_function_step( + self, + name, # type: str + function, # type: Callable + function_kwargs=None, # type: Optional[Dict[str, Any]] + function_return=None, # type: Optional[List[str]] + project_name=None, # type: Optional[str] + task_name=None, # type: Optional[str] + task_type=None, # type: Optional[str] + auto_connect_frameworks=None, # type: Optional[dict] + auto_connect_arg_parser=None, # type: Optional[dict] + packages=None, # type: Optional[Union[str, Sequence[str]]] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None, # type: Optional[str] + helper_functions=None, # type: Optional[Sequence[Callable]] + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + parents=None, # type: 
Optional[Sequence[str]]
+        execution_queue=None,  # type: Optional[str]
+        monitor_metrics=None,  # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
+        monitor_artifacts=None,  # type: Optional[List[Union[str, Tuple[str, str]]]]
+        monitor_models=None,  # type: Optional[List[Union[str, Tuple[str, str]]]]
+        time_limit=None,  # type: Optional[float]
+        continue_on_fail=False,  # type: bool
+        pre_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]]  # noqa
+        post_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node], None]]  # noqa
+        cache_executed_step=False,  # type: bool
+        retry_on_failure=None,  # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]]  # noqa
+    ):
+        # type: (...) -> bool
+        """
+        Create a Task from a function, including wrapping the function input arguments
+        into the hyper-parameter section as kwargs, and storing function results as named artifacts
+
+        Example:
+
+        .. code-block:: py
+
+            def mock_func(a=6, b=9):
+                c = a*b
+                print(a, b, c)
+                return c, c**2
+
+            create_task_from_function(mock_func, function_return=['mul', 'square'])
+
+        Example arguments from other Tasks (artifact):
+
+        .. code-block:: py
+
+            def mock_func(matrix_np):
+                c = matrix_np*matrix_np
+                print(matrix_np, c)
+                return c
+
+            create_task_from_function(
+                mock_func,
+                function_kwargs={'matrix_np': 'aabb1122.previous_matrix'},
+                function_return=['square_matrix']
+            )
+
+        :param name: Unique name of the step. For example `stage1`
+        :param function: A global function to convert into a standalone Task
+        :param function_kwargs: Optional, provide a subset of the function arguments and default values to expose.
+            If not provided, automatically take all function arguments and their default values.
+            Optionally, pass input arguments to the function from other Tasks' output artifacts.
+            Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`:
+            {'numpy_matrix': 'aabbcc.answer'}
+        :param function_return: Provide a list of names for all the results.
+            If not provided, no results will be stored as artifacts.
+        :param project_name: Set the project name for the task. Required if base_task_id is None.
+        :param task_name: Set the name of the remote task, if not provided use the `name` argument.
+        :param task_type: Optional, the task type to be created. Supported values: 'training', 'testing', 'inference',
+            'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
+        :param auto_connect_frameworks: Control the frameworks auto connect, see `Task.init` auto_connect_frameworks
+        :param auto_connect_arg_parser: Control the ArgParser auto connect, see `Task.init` auto_connect_arg_parser
+        :param packages: Manually specify a list of required packages or a local requirements.txt file.
+            Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt"
+            If not provided, packages are automatically added based on the imports used in the function.
+        :param repo: Optional, specify a repository to attach to the function, when remotely executing.
+            Allows users to execute the function inside the specified repository, enabling them to load modules/scripts
+            from the repository. Notice the execution working directory will be the repository root folder.
+            Supports both a git repository URL and a local repository path.
+            Example remote url: 'https://github.com/user/repo.git'
+            Example local repo copy: './repo' -> will automatically store the remote
+            repo url and commit ID based on the locally cloned copy
+        :param repo_branch: Optional, specify the remote repository branch (ignored if a local repo path is used)
+        :param repo_commit: Optional, specify the repository commit ID (ignored if a local repo path is used)
+        :param helper_functions: Optional, a list of helper functions to make available
+            for the standalone function Task.
+        :param docker: Select the docker image to be executed in by the remote session
+        :param docker_args: Add docker arguments, pass a single string
+        :param docker_bash_setup_script: Add a bash script to be executed
+            inside the docker before setting up the Task's environment
+        :param parents: Optional list of parent nodes in the DAG.
+            The current step in the pipeline will be sent for execution only after all the parent nodes
+            have been executed successfully.
+        :param execution_queue: Optional, the queue to use for executing this specific step.
+            If not provided, the task will be sent to the default execution queue, as defined on the class
+        :param monitor_metrics: Optional, log the step's metrics on the pipeline Task.
+            Format is a list of metric (title, series) pairs to log:
+                [(step_metric_title, step_metric_series), ]
+                Example: [('test', 'accuracy'), ]
+            Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:
+                [((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
+                Example: [[('test', 'accuracy'), ('model', 'accuracy')], ]
+        :param monitor_artifacts: Optional, log the step's artifacts on the pipeline Task.
+            Provide a list of artifact names existing on the step's Task; they will also appear on the Pipeline itself.
+            Example: [('processed_data', 'final_processed_data'), ]
+            Alternatively, the user can provide a list of artifacts to monitor
+            (the target artifact name will be the same as the original artifact name)
+            Example: ['processed_data', ]
+        :param monitor_models: Optional, log the step's output models on the pipeline Task.
+            Provide a list of model names existing on the step's Task; they will also appear on the Pipeline itself.
+            Example: [('model_weights', 'final_model_weights'), ]
+            Alternatively, the user can provide a list of models to monitor
+            (the target model name will be the same as the original model name)
+            Example: ['model_weights', ]
+            To select the latest (lexicographic) model use "model_*", or the last created model with just "*"
+            Example: ['model_weights_*', ]
+        :param time_limit: Default None, no time limit.
+            Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
+        :param continue_on_fail: (default False). If True, a failed step will not cause the pipeline to stop
+            (or be marked as failed). Notice that steps that are connected (or indirectly connected)
+            to the failed step will be skipped.
+        :param pre_execute_callback: Callback function, called when the step (Task) is created
+            and before it is sent for execution. Allows a user to modify the Task before launch.
+            Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
+            `parameters` are the configuration arguments passed to the ClearmlJob.
+
+            If the callback returns `False`,
+            the Node is skipped and so is any node in the DAG that relies on this node.
+
+            Notice the `parameters` are already parsed,
+            e.g. `${step1.parameters.Args/param}` is replaced with the relevant value.
+
+            .. code-block:: py
+
+                def step_created_callback(
+                    pipeline,             # type: PipelineController,
+                    node,                 # type: PipelineController.Node,
+                    parameters,           # type: dict
+                ):
+                    pass
+
+        :param post_execute_callback: Callback function, called when a step (Task) is completed
+            and before other jobs are executed. Allows a user to modify the Task status after completion.
+
+            .. code-block:: py
+
+                def step_completed_callback(
+                    pipeline,             # type: PipelineController,
+                    node,                 # type: PipelineController.Node,
+                ):
+                    pass
+
+        :param cache_executed_step: If True, before launching the new step,
+            after updating with the latest configuration, check if an exact Task with the same parameters/code
+            was already executed. If it was found, use it instead of launching a new Task.
+            Default: False, a new cloned copy of base_task is always used.
+            Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.
+
+        :param retry_on_failure: Integer (number of retries) or callback function that returns True to allow a retry
+            - Integer: In case of node failure, retry the node the number of times indicated by this parameter.
+            - Callable: A function called on node failure. Takes as parameters:
+                the PipelineController instance, the PipelineController.Node that failed and an int
+                representing the number of previous retries for the node that failed.
+                The function must return a `bool`: True if the node should be retried and False otherwise.
+                If True, the node will be re-queued and the number of retries left will be decremented by 1.
+                By default, if this callback is not specified, the node will be retried the number of
+                times indicated by `retry_on_failure`.
+
+            .. code-block:: py
+
+                def example_retry_on_failure_callback(pipeline, node, retries):
+                    print(node.name, ' failed')
+                    # allow up to 5 retries (total of 6 runs)
+                    return retries < 5
+
+        :return: True if successful
+        """
+        # always store callback functions (even when running remotely)
+        if pre_execute_callback:
+            self._pre_step_callbacks[name] = pre_execute_callback
+        if post_execute_callback:
+            self._post_step_callbacks[name] = post_execute_callback
+
+        self._verify_node_name(name)
+
+        function_input_artifacts = {}
+        # go over function_kwargs, split it into string and input artifacts
+        for k, v in function_kwargs.items():
+            if v is None:
+                continue
+            if self._step_ref_pattern.match(str(v)):
+                # check for step artifacts
+                step, _, artifact = v[2:-1].partition('.')
+                if step in self._nodes and artifact in self._nodes[step].return_artifacts:
+                    function_input_artifacts[k] = "${{{}.id}}.{}".format(step, artifact)
+                    continue
+                # verify the reference only if we are running locally (on remote, when we have multiple
+                # steps from tasks, the _nodes is still empty; only after deserializing will we have the full DAG)
+                if self._task.running_locally():
+                    self.__verify_step_reference(node=self.Node(name=name), step_ref_string=v)
+            elif not isinstance(v, (float, int, bool, six.string_types)):
+                function_input_artifacts[k] = "{}.{}.{}".format(self._task.id, name, k)
+                self._upload_pipeline_artifact(artifact_name="{}.{}".format(name, k), artifact_object=v)
+
+        function_kwargs = {k: v for k, v in function_kwargs.items() if k not in function_input_artifacts}
+        parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()}
+        if function_input_artifacts:
+            parameters.update(
+                {"{}/{}".format(CreateFromFunction.input_artifact_section, k):
str(v) + for k, v in function_input_artifacts.items()} + ) + + job_code_section = None + task_name = task_name or name or None + + if self._mock_execution: + project_name = project_name or self._get_target_project() or self._task.get_project_name() + + task_definition = self._create_task_from_function( + docker, docker_args, docker_bash_setup_script, function, + function_input_artifacts, function_kwargs, function_return, + auto_connect_frameworks, auto_connect_arg_parser, + packages, project_name, task_name, + task_type, repo, repo_branch, repo_commit, helper_functions) + + elif self._task.running_locally() or self._task.get_configuration_object(name=name) is None: + project_name = project_name or self._get_target_project() or self._task.get_project_name() + + task_definition = self._create_task_from_function( + docker, docker_args, docker_bash_setup_script, function, + function_input_artifacts, function_kwargs, function_return, + auto_connect_frameworks, auto_connect_arg_parser, + packages, project_name, task_name, + task_type, repo, repo_branch, repo_commit, helper_functions) + # update configuration with the task definitions + # noinspection PyProtectedMember + self._task._set_configuration( + name=name, config_type='json', + config_text=json.dumps(task_definition, indent=1) + ) + job_code_section = name + else: + # load task definition from configuration + # noinspection PyProtectedMember + config_text = self._task._get_configuration_text(name=name) + task_definition = json.loads(config_text) if config_text else dict() + + def _create_task(_): + a_task = Task.create( + project_name=project_name, + task_name=task_definition.get('name'), + task_type=task_definition.get('type'), + ) + # replace reference + a_task.update_task(task_definition) + return a_task + + self._nodes[name] = self.Node( + name=name, base_task_id=None, parents=parents or [], + queue=execution_queue, timeout=time_limit, + parameters=parameters, + clone_task=False, + cache_executed_step=cache_executed_step, + task_factory_func=_create_task, + continue_on_fail=continue_on_fail, + return_artifacts=function_return, + monitor_artifacts=monitor_artifacts, + monitor_metrics=monitor_metrics, + monitor_models=monitor_models, + job_code_section=job_code_section, + ) + self._retries[name] = 0 + self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \ + (functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure) + if isinstance(retry_on_failure, int) else self._retry_on_failure_callback) + + return True + def _relaunch_node(self, node): if not node.job: getLogger("clearml.automation.controller").warning( @@ -1941,24 +2154,6 @@ class PipelineController(object): ) return False - @classmethod - def _wait_for_node(cls, node): - pool_period = 5.0 if cls._debug_execute_step_process else 20.0 - while True: - node.job.wait(pool_period=pool_period, aborted_nonresponsive_as_running=True) - job_status = str(node.job.status(force=True)) - if ( - ( - job_status == str(Task.TaskStatusEnum.stopped) - and node.job.status_message() == cls._relaunch_status_message - ) - or (job_status == str(Task.TaskStatusEnum.failed) and not cls._final_failure.get(node.name)) - or not node.job.is_stopped() - ): - sleep(pool_period) - else: - break - @classmethod def _get_node_color(cls, node): # type (self.Mode) -> str @@ -2637,6 +2832,14 @@ class PipelineController(object): def _default_retry_on_failure_callback(self, _pipeline_controller, _node, retries, max_retries=None): return retries < 
(self._def_max_retry_on_failure if max_retries is None else max_retries) + def _upload_pipeline_artifact(self, artifact_name, artifact_object): + self._task.upload_artifact( + name=artifact_name, + artifact_object=artifact_object, + wait_on_upload=True, + extension_name=".pkl" if isinstance(artifact_object, dict) else None, + ) + class PipelineDecorator(PipelineController): _added_decorator = [] # type: List[dict] @@ -2722,7 +2925,7 @@ class PipelineDecorator(PipelineController): PipelineDecorator._default_execution_queue) for n in self._added_decorator: - self.add_function_step(**n) + self._add_function_step(**n) self._added_decorator.clear() PipelineDecorator._singleton = self self._reference_callback = [] @@ -3001,6 +3204,24 @@ class PipelineDecorator(PipelineController): return task_hash + @classmethod + def _wait_for_node(cls, node): + pool_period = 5.0 if cls._debug_execute_step_process else 20.0 + while True: + node.job.wait(pool_period=pool_period, aborted_nonresponsive_as_running=True) + job_status = str(node.job.status(force=True)) + if ( + ( + job_status == str(Task.TaskStatusEnum.stopped) + and node.job.status_message() == cls._relaunch_status_message + ) + or (job_status == str(Task.TaskStatusEnum.failed) and not cls._final_failure.get(node.name)) + or not node.job.is_stopped() + ): + sleep(pool_period) + else: + break + @classmethod def component( cls, @@ -3157,7 +3378,7 @@ class PipelineDecorator(PipelineController): ) if cls._singleton: - cls._singleton.add_function_step(**add_step_spec) + cls._singleton._add_function_step(**add_step_spec) else: cls._added_decorator.append(add_step_spec) @@ -3319,7 +3540,10 @@ class PipelineDecorator(PipelineController): cls._evaluated_return_values[_tid] = [] cls._evaluated_return_values[_tid].append(_node.name) - return Task.get_task(_node.job.task_id()).artifacts[return_name].get() + task = Task.get_task(_node.job.task_id()) + if return_name in task.artifacts: + return task.artifacts[return_name].get() + return task.get_parameters(cast=True)[CreateFromFunction.return_section + "/" + return_name] return_w = [LazyEvalWrapper( callback=functools.partial(result_wrapper, n), @@ -3562,11 +3786,8 @@ class PipelineDecorator(PipelineController): waited = True # store the pipeline result of we have any: if return_value and pipeline_result is not None: - a_pipeline._task.upload_artifact( - name=str(return_value), - artifact_object=pipeline_result, - wait_on_upload=True, - extension_name=".pkl" if isinstance(pipeline_result, dict) else None, + a_pipeline._upload_pipeline_artifact( + artifact_name=str(return_value), artifact_object=pipeline_result ) # now we can stop the pipeline @@ -3674,12 +3895,7 @@ class PipelineDecorator(PipelineController): else: # we need to create an artifact artifact_name = 'result_{}_{}'.format(re.sub(r'\W+', '', _node.name), k) - cls._singleton._task.upload_artifact( - name=artifact_name, - artifact_object=v, - wait_on_upload=True, - extension_name=".pkl" if isinstance(v, dict) else None, - ) + cls._singleton._upload_pipeline_artifact(artifact_name=artifact_name, artifact_object=v) _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = \ "{}.{}".format(cls._singleton._task.id, artifact_name) diff --git a/clearml/backend_interface/task/populate.py b/clearml/backend_interface/task/populate.py index 5dc9ba30..cf09a910 100644 --- a/clearml/backend_interface/task/populate.py +++ b/clearml/backend_interface/task/populate.py @@ -471,10 +471,12 @@ class CreateAndPopulate(object): class 
CreateFromFunction(object): - kwargs_section = 'kwargs' - input_artifact_section = 'kwargs_artifacts' + kwargs_section = "kwargs" + return_section = "return" + input_artifact_section = "kwargs_artifacts" task_template = """from clearml import Task, TaskTypes from clearml.automation.controller import PipelineDecorator +from clearml.utilities.proxy_object import get_basic_type {function_source} @@ -488,23 +490,36 @@ if __name__ == '__main__': task.connect(kwargs, name='{kwargs_section}') function_input_artifacts = {function_input_artifacts} params = task.get_parameters() or dict() + return_section = '{return_section}' for k, v in params.items(): if not v or not k.startswith('{input_artifact_section}/'): continue k = k.replace('{input_artifact_section}/', '', 1) task_id, artifact_name = v.split('.', 1) - kwargs[k] = Task.get_task(task_id=task_id).artifacts[artifact_name].get() + parent_task = Task.get_task(task_id=task_id) + if artifact_name in parent_task.artifacts: + kwargs[k] = parent_task.artifacts[artifact_name].get() + else: + kwargs[k] = parent_task.get_parameters(cast=True)[return_section + '/' + artifact_name] results = {function_name}(**kwargs) result_names = {function_return} if result_names: if not isinstance(results, (tuple, list)) or len(result_names) == 1: results = [results] + parameters = dict() + parameters_types = dict() for name, artifact in zip(result_names, results): - task.upload_artifact( - name=name, - artifact_object=artifact, - extension_name='.pkl' if isinstance(artifact, dict) else None - ) + if isinstance(artifact, (float, int, bool, str)): + parameters[return_section + '/' + name] = artifact + parameters_types[return_section + '/' + name] = get_basic_type(artifact) + else: + task.upload_artifact( + name=name, + artifact_object=artifact, + extension_name='.pkl' if isinstance(artifact, dict) else None + ) + if parameters: + task._set_parameters(parameters, __parameters_types=parameters_types, __update=True) """ @classmethod @@ -663,7 +678,9 @@ if __name__ == '__main__': function_kwargs=function_kwargs, function_input_artifacts=function_input_artifacts, function_name=function_name, - function_return=function_return) + function_return=function_return, + return_section=cls.return_section, + ) temp_dir = repo if repo and os.path.isdir(repo) else None with tempfile.NamedTemporaryFile('w', suffix='.py', dir=temp_dir) as temp_file:
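
For context, here is a minimal usage sketch of the behavior this patch enables: a component that returns a plain string is now surfaced to downstream steps as a `return/<name>` task parameter (see the `return_section` handling in populate.py) instead of being handed to `upload_artifact`, so a returned local path is no longer treated as a folder artifact and zipped. The sketch is not part of the patch; the project name, queue-less local run, and the returned path are placeholders, and it assumes a configured ClearML environment.

.. code-block:: py

    from clearml.automation.controller import PipelineDecorator


    @PipelineDecorator.component(return_values=["dataset_path"])
    def produce_path():
        # a plain string return value: with this patch it is stored as the
        # 'return/dataset_path' parameter, not zipped into a folder artifact
        return "/tmp/my_dataset_folder"


    @PipelineDecorator.component(return_values=["file_count"])
    def consume_path(dataset_path):
        # the downstream step receives the original string unchanged
        print("received path:", dataset_path)
        return 1


    @PipelineDecorator.pipeline(name="string-return-example", project="examples", version="1.0")
    def pipeline_logic():
        path = produce_path()
        consume_path(dataset_path=path)


    if __name__ == "__main__":
        # run all steps locally in the current process for a quick check
        PipelineDecorator.run_locally()
        pipeline_logic()

Non-basic return values (e.g. dicts, numpy arrays) still go through the artifact path exactly as before; only `float`, `int`, `bool`, and `str` results are routed through the new `return/` parameter section.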