mirror of
https://github.com/clearml/clearml
synced 2025-04-18 13:24:41 +00:00
Enhance example code for unit-testing
This commit is contained in:
parent
c58f8649a2
commit
4f68e0bb01
@ -6,42 +6,38 @@ import subprocess
|
|||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
|
from random import randint
|
||||||
|
|
||||||
from clearml import Task
|
from clearml import Task
|
||||||
|
|
||||||
# fake data for us to "process"
|
|
||||||
data = (
|
|
||||||
['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
|
|
||||||
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def mp_worker(arguments):
|
def mp_worker(arguments):
|
||||||
print('sub process', os.getpid())
|
print('sub process', os.getpid())
|
||||||
inputs, the_time = arguments
|
data, extra_dict = arguments
|
||||||
from random import randint
|
inputs, the_time = data
|
||||||
additional_parameters = {'stuff_' + str(randint(0, 100)): 'some stuff ' + str(randint(0, 100))}
|
Task.current_task().connect(extra_dict)
|
||||||
Task.current_task().connect(additional_parameters)
|
|
||||||
print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
|
print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
|
||||||
time.sleep(int(the_time))
|
time.sleep(int(the_time))
|
||||||
print(" Process %s\tDONE" % inputs)
|
print(" Process %s\tDONE" % inputs)
|
||||||
|
|
||||||
|
|
||||||
def mp_handler(use_subprocess):
|
def mp_handler(use_subprocess, data, additional_parameters):
|
||||||
if use_subprocess:
|
if use_subprocess:
|
||||||
process = multiprocessing.Pool(4)
|
process = multiprocessing.Pool(4)
|
||||||
else:
|
else:
|
||||||
process = multiprocessing.pool.ThreadPool(4)
|
process = multiprocessing.pool.ThreadPool(4)
|
||||||
process.map(mp_worker, data)
|
|
||||||
|
process.map(mp_worker, zip(data, additional_parameters))
|
||||||
process.close()
|
process.close()
|
||||||
print('DONE main !!!')
|
print('DONE main !!!')
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
parser = ArgumentParser()
|
parser = ArgumentParser()
|
||||||
parser.add_argument('--num_workers', help='integer value', type=int, default=3)
|
parser.add_argument('--num-workers', help='integer value', type=int, default=3)
|
||||||
parser.add_argument('--use-subprocess', help="Use sub processes", dest='subprocess', action='store_true')
|
parser.add_argument('--use-subprocess', help="Use sub processes", dest='subprocess', action='store_true')
|
||||||
parser.add_argument('--no-subprocess', help="Use threads", dest='subprocess', action='store_false')
|
parser.add_argument('--no-subprocess', help="Use threads", dest='subprocess', action='store_false')
|
||||||
|
parser.add_argument('--additional-parameters', help='task parameters', type=str, nargs="+")
|
||||||
parser.set_defaults(subprocess=True)
|
parser.set_defaults(subprocess=True)
|
||||||
# this argument we will not be logging, see below Task.init
|
# this argument we will not be logging, see below Task.init
|
||||||
parser.add_argument('--counter', help='integer value', type=int, default=-1)
|
parser.add_argument('--counter', help='integer value', type=int, default=-1)
|
||||||
@ -49,6 +45,18 @@ if __name__ == '__main__':
|
|||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
print(os.getpid(), 'ARGS:', args)
|
print(os.getpid(), 'ARGS:', args)
|
||||||
|
|
||||||
|
# Fake data for us to "process"
|
||||||
|
data = (
|
||||||
|
['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
|
||||||
|
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'],
|
||||||
|
)
|
||||||
|
|
||||||
|
if not args.additional_parameters:
|
||||||
|
# Random task parameters
|
||||||
|
args.additional_parameters = [
|
||||||
|
f"stuff_{str(randint(0, 100))}:some stuff {str(randint(0, 100))}" for _ in range(len(data))]
|
||||||
|
task_parameters = (dict([p.partition(":")[::2]]) for p in args.additional_parameters)
|
||||||
|
|
||||||
# We have to initialize the task in the master process,
|
# We have to initialize the task in the master process,
|
||||||
# it will make sure that any sub-process calling Task.init will get the master task object
|
# it will make sure that any sub-process calling Task.init will get the master task object
|
||||||
# notice that we exclude the `counter` argument, so we can launch multiple sub-processes with clearml-agent
|
# notice that we exclude the `counter` argument, so we can launch multiple sub-processes with clearml-agent
|
||||||
@ -68,13 +76,15 @@ if __name__ == '__main__':
|
|||||||
if counter > 0:
|
if counter > 0:
|
||||||
cmd = [sys.executable, sys.argv[0],
|
cmd = [sys.executable, sys.argv[0],
|
||||||
'--counter', str(counter - 1),
|
'--counter', str(counter - 1),
|
||||||
'--num_workers', str(args.num_workers),
|
'--num-workers', str(args.num_workers),
|
||||||
'--use-subprocess' if args.subprocess else '--no-subprocess']
|
'--use-subprocess' if args.subprocess else '--no-subprocess',
|
||||||
|
'--additional-parameters', *args.additional_parameters]
|
||||||
|
|
||||||
print(cmd)
|
print(cmd)
|
||||||
p = subprocess.Popen(cmd, cwd=os.getcwd())
|
p = subprocess.Popen(cmd, cwd=os.getcwd())
|
||||||
|
|
||||||
# the actual "processing" is done here
|
# the actual "processing" is done here
|
||||||
mp_handler(args.subprocess)
|
mp_handler(args.subprocess, data, task_parameters)
|
||||||
print('Done logging')
|
print('Done logging')
|
||||||
|
|
||||||
# wait for the process we launched
|
# wait for the process we launched
|
||||||
|
Loading…
Reference in New Issue
Block a user