This commit is contained in:
revital 2023-05-02 14:41:01 +03:00
commit eea7931267
3 changed files with 32 additions and 19 deletions

View File

@ -4164,8 +4164,6 @@ class PipelineDecorator(PipelineController):
a_pipeline._task._set_runtime_properties(
dict(multi_pipeline_counter=str(cls._multi_pipeline_call_counter)))
a_pipeline._start(wait=False)
# sync arguments back (post deserialization and casting back)
for k in pipeline_kwargs.keys():
if k in a_pipeline.get_parameters():
@ -4178,6 +4176,8 @@ class PipelineDecorator(PipelineController):
a_pipeline._task.execute_remotely(queue_name=pipeline_execution_queue)
# when we get here it means we are running remotely
a_pipeline._start(wait=False)
# this time the pipeline is executed only on the remote machine
try:
pipeline_result = func(**pipeline_kwargs)

View File

@ -47,7 +47,10 @@ else:
try:
from requests.packages.urllib3.contrib import appengine as gaecontrib
except ImportError:
from urllib3.contrib import appengine as gaecontrib
try:
from urllib3.contrib import appengine as gaecontrib
except ImportError:
gaecontrib = None
if requests.__build__ < 0x021200:
PyOpenSSLContext = None

View File

@ -6,42 +6,38 @@ import subprocess
import sys
import time
from argparse import ArgumentParser
from random import randint
from clearml import Task
# fake data for us to "process"
data = (
['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'],
)
def mp_worker(arguments):
    """Process a single work item inside a pool worker.

    :param arguments: a ``(data, extra_dict)`` pair, where ``data`` is itself an
        ``(inputs, the_time)`` pair and ``extra_dict`` is a dict of parameters
        that is connected to the current task.
    """
    print('sub process', os.getpid())
    data, extra_dict = arguments
    inputs, the_time = data
    # The parameters are supplied by the caller (one dict per work item) instead of
    # being generated randomly in the worker, so every launch sees the same values.
    Task.current_task().connect(extra_dict)
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    # `the_time` arrives as a string (e.g. '2'), hence the int() conversion
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)
def mp_handler(use_subprocess, data, additional_parameters):
    """Run ``mp_worker`` over all work items using a pool of four workers.

    :param use_subprocess: when true, use a process pool; otherwise use a thread pool.
    :param data: iterable of work items, each an ``(inputs, the_time)`` pair as
        unpacked by ``mp_worker``.
    :param additional_parameters: iterable of dicts, paired positionally with
        ``data``; each dict is handed to ``mp_worker`` together with its work item.
    """
    # Import the submodule explicitly: `multiprocessing.pool` is only reachable as an
    # attribute of the package once something has imported it.
    import multiprocessing.pool

    if use_subprocess:
        process = multiprocessing.Pool(4)
    else:
        process = multiprocessing.pool.ThreadPool(4)
    try:
        # NOTE: zip() stops at the shorter input, so work items without a matching
        # parameters dict are not processed.
        process.map(mp_worker, zip(data, additional_parameters))
    finally:
        # Always release the workers, even if one of the work items raised.
        process.close()
        process.join()
    print('DONE main !!!')
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('--num_workers', help='integer value', type=int, default=3)
parser.add_argument('--num-workers', help='integer value', type=int, default=3)
parser.add_argument('--use-subprocess', help="Use sub processes", dest='subprocess', action='store_true')
parser.add_argument('--no-subprocess', help="Use threads", dest='subprocess', action='store_false')
parser.add_argument('--additional-parameters', help='task parameters', type=str, nargs="+")
parser.set_defaults(subprocess=True)
# This argument will not be logged; see the Task.init call below
parser.add_argument('--counter', help='integer value', type=int, default=-1)
@ -49,6 +45,18 @@ if __name__ == '__main__':
args = parser.parse_args()
print(os.getpid(), 'ARGS:', args)
# Fake data for us to "process"
data = (
['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'],
)
if not args.additional_parameters:
# Random task parameters
args.additional_parameters = [
f"stuff_{str(randint(0, 100))}:some stuff {str(randint(0, 100))}" for _ in range(len(data))]
task_parameters = (dict([p.partition(":")[::2]]) for p in args.additional_parameters)
# We have to initialize the task in the master process,
# it will make sure that any sub-process calling Task.init will get the master task object
# notice that we exclude the `counter` argument, so we can launch multiple sub-processes with clearml-agent
@ -68,13 +76,15 @@ if __name__ == '__main__':
if counter > 0:
cmd = [sys.executable, sys.argv[0],
'--counter', str(counter - 1),
'--num_workers', str(args.num_workers),
'--use-subprocess' if args.subprocess else '--no-subprocess']
'--num-workers', str(args.num_workers),
'--use-subprocess' if args.subprocess else '--no-subprocess',
'--additional-parameters', *args.additional_parameters]
print(cmd)
p = subprocess.Popen(cmd, cwd=os.getcwd())
# the actual "processing" is done here
mp_handler(args.subprocess)
mp_handler(args.subprocess, data, task_parameters)
print('Done logging')
# wait for the process we launched