From 9e0a5a36fbff10b431e94fa5f1531bb7f428ab6a Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Sun, 24 May 2020 15:37:55 +0300
Subject: [PATCH] Added distributed examples

---
 examples/distributed/example_subprocess.py    |  74 ++++++++
 .../distributed/example_torch_distributed.py  | 174 ++++++++++++++++++
 2 files changed, 248 insertions(+)
 create mode 100644 examples/distributed/example_subprocess.py
 create mode 100644 examples/distributed/example_torch_distributed.py

diff --git a/examples/distributed/example_subprocess.py b/examples/distributed/example_subprocess.py
new file mode 100644
index 00000000..5c0be3b5
--- /dev/null
+++ b/examples/distributed/example_subprocess.py
@@ -0,0 +1,74 @@
+# TRAINS - example of multiple sub-processes interacting and reporting to a single master experiment
+
+import multiprocessing.pool  # imported explicitly so multiprocessing.pool.ThreadPool resolves below
+import os
+import subprocess
+import sys
+import time
+from argparse import ArgumentParser
+
+from trains import Task
+
+data = (
+    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
+    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'],
+)
+
+
+def mp_worker(arguments):
+    print('sub process', os.getpid())
+    inputs, the_time = arguments
+    from random import randint
+    additional_parameters = {'stuff_' + str(randint(0, 100)): 'some stuff ' + str(randint(0, 100))}
+    Task.current_task().connect(additional_parameters)
+    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
+    time.sleep(int(the_time))
+    print(" Process %s\tDONE" % inputs)
+
+
+def mp_handler(use_subprocess):
+    if use_subprocess:
+        process = multiprocessing.Pool(4)  # worker sub-processes
+    else:
+        process = multiprocessing.pool.ThreadPool(4)  # worker threads inside this process
+    process.map(mp_worker, data)
+    process.close()
+    print('DONE main !!!')
+
+
+if __name__ == '__main__':
+    parser = ArgumentParser()
+    parser.add_argument('--num_workers', help='number of sub-processes to launch', type=int, default=3)
+    parser.add_argument('--counter', help='launch countdown (internal, set by the parent process)', type=int, default=-1)
+    parser.add_argument('--use_subprocess', help='1 to use sub-processes, 0 to use threads', type=int, default=1)
+
+    args = parser.parse_args()
+    print(os.getpid(), 'ARGS:', args)
+
+    # We have to initialize the Task in the master process;
+    # it makes sure that any sub-process calling Task.init will get the master task object.
+    # Notice that we exclude the counter argument, so we can launch multiple sub-processes with trains-agent;
+    # otherwise the counter would always be set to the original value.
+    task = Task.init('examples', 'Popen example', auto_connect_arg_parser={'counter': False})
+
+    # we can connect multiple dictionaries, each from a different process, as long as the keys have different names
+    param = {'args_{}'.format(args.num_workers): 'some value {}'.format(args.num_workers)}
+    task.connect(param)
+
+    # a negative counter marks the master process; it starts the launch chain at num_workers
+    counter = args.num_workers if args.counter < 0 else args.counter
+
+    p = None
+    if counter > 0:
+        cmd = [sys.executable, sys.argv[0],
+               '--counter', str(counter - 1),
+               '--num_workers', str(args.num_workers),
+               '--use_subprocess', str(args.use_subprocess)]
+        print(cmd)
+        p = subprocess.Popen(cmd, cwd=os.getcwd())
+
+    mp_handler(args.use_subprocess)
+    print('Done logging')
+    if p:  # counter > 0 is implied here; wait for the child so the launch chain unwinds cleanly
+        p.wait()
+    print('Exiting')
diff --git a/examples/distributed/example_torch_distributed.py b/examples/distributed/example_torch_distributed.py
new file mode 100644
index 00000000..d1578eba
--- /dev/null
+++ b/examples/distributed/example_torch_distributed.py
@@ -0,0 +1,174 @@
+# TRAINS - example of TRAINS torch distributed support
+# notice all nodes will be reporting to the master Task (experiment)
+
+import os
+import subprocess
+import sys
+from argparse import ArgumentParser
+from math import ceil
+from random import Random
+
+import torch as th
+import torch.nn as nn
+import torch.distributed as dist
+import torch.nn.functional as F
+from torch import optim
+from torchvision import datasets, transforms
+
+from trains import Task
+
+
+local_dataset_path = './MNIST_data'
+
+
+class Net(nn.Module):
+    def __init__(self):
+        super(Net, self).__init__()
+        self.conv1 = nn.Conv2d(1, 32, 3, 1)
+        self.conv2 = nn.Conv2d(32, 64, 3, 1)
+        self.dropout1 = nn.Dropout2d(0.25)
+        self.dropout2 = nn.Dropout2d(0.5)
+        self.fc1 = nn.Linear(9216, 128)
+        self.fc2 = nn.Linear(128, 10)
+
+    def forward(self, x):
+        x = self.conv1(x)
+        x = F.relu(x)
+        x = self.conv2(x)
+        x = F.max_pool2d(x, 2)
+        x = self.dropout1(x)
+        x = th.flatten(x, 1)
+        x = self.fc1(x)
+        x = F.relu(x)
+        x = self.dropout2(x)
+        x = self.fc2(x)
+        output = F.log_softmax(x, dim=1)
+        return output
+
+
+class Partition(object):
+    """ Dataset partitioning helper """
+    def __init__(self, data, index):
+        self.data = data
+        self.index = index
+
+    def __len__(self):
+        return len(self.index)
+
+    def __getitem__(self, index):
+        data_idx = self.index[index]
+        return self.data[data_idx]
+
+
+class DataPartitioner(object):
+    def __init__(self, data, sizes=(0.7, 0.2, 0.1), seed=1234):
+        self.data = data
+        self.partitions = []
+        rng = Random()
+        rng.seed(seed)
+        data_len = len(data)
+        indexes = [x for x in range(0, data_len)]
+        rng.shuffle(indexes)
+
+        for frac in sizes:
+            part_len = int(frac * data_len)
+            self.partitions.append(indexes[0:part_len])
+            indexes = indexes[part_len:]
+
+    def use(self, partition):
+        return Partition(self.data, self.partitions[partition])
+
+
+def partition_dataset(num_workers=4):
+    """ Partitioning MNIST """
+    dataset = datasets.MNIST(root=local_dataset_path, train=True, download=True,
+                             transform=transforms.Compose([
+                                 transforms.ToTensor(),
+                                 transforms.Normalize((0.1307,), (0.3081,))
+                             ]))
+    size = dist.get_world_size()
+    bsz = int(128 / float(size))
+    partition_sizes = [1.0 / size for _ in range(size)]
+    partition = DataPartitioner(dataset, partition_sizes)
+    partition = partition.use(dist.get_rank())
+    train_set = th.utils.data.DataLoader(
+        partition, num_workers=num_workers, batch_size=bsz, shuffle=True)
+    return train_set, bsz
+
+
+def run(num_workers):
+    """ Distributed Synchronous SGD Example """
+    th.manual_seed(1234)
+    train_set, bsz = partition_dataset(num_workers)
+    model = Net()
+    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
+
+    num_batches = ceil(len(train_set.dataset) / float(bsz))
+
+    from random import randint
+    param = {'worker_{}_stuff'.format(dist.get_rank()): 'some stuff ' + str(randint(0, 100))}
+    Task.current_task().connect(param)
+    Task.current_task().upload_artifact(
+        'temp {:02d}'.format(dist.get_rank()), artifact_object={'worker_rank': dist.get_rank()})
+
+    for epoch in range(2):
+        epoch_loss = 0.0
+        for i, (data, target) in enumerate(train_set):
+            optimizer.zero_grad()
+            output = model(data)
+            loss = F.nll_loss(output, target)
+            epoch_loss += loss.item()
+            loss.backward()
+            average_gradients(model)
+            optimizer.step()
+            if i % 10 == 0:
+                print('{}] Train Epoch {} - {} \tLoss {:.6f}'.format(dist.get_rank(), epoch, i, loss.item()))
+                Task.current_task().get_logger().report_scalar(
+                    'loss', 'worker {:02d}'.format(dist.get_rank()), value=loss.item(), iteration=i)
+            if i > 100:
+                break
+        print('Rank ', dist.get_rank(), ', epoch ',
+              epoch, ': ', epoch_loss / num_batches)
+
+
+def average_gradients(model):
+    """ Gradient averaging. """
+    size = float(dist.get_world_size())
+    for param in model.parameters():
+        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
+        param.grad.data /= size
+
+
+if __name__ == "__main__":
+    parser = ArgumentParser()
+    parser.add_argument('--nodes', help='number of nodes', type=int, default=10)
+    parser.add_argument('--workers_in_node', help='number of workers per node', type=int, default=3)
+    parser.add_argument('--rank', help='current rank', type=int)
+
+    args = parser.parse_args()
+    task = Task.init("examples", "test torch distributed")
+
+    if os.environ.get('MASTER_ADDR'):
+        dist.init_process_group(backend='gloo', rank=args.rank, world_size=args.nodes)
+        run(args.workers_in_node)
+    else:
+        # download the dataset once before spawning the workers; when running
+        # on multiple machines, each node downloads its own copy when it gets here
+        datasets.MNIST(root=local_dataset_path, train=True, download=True)
+
+        os.environ['MASTER_ADDR'] = '127.0.0.1'
+        os.environ['MASTER_PORT'] = '29500'
+
+        print(os.getpid(), 'ARGS:', args)
+        processes = []
+        for rank in range(args.nodes):
+            cmd = [sys.executable, sys.argv[0],
+                   '--nodes', str(args.nodes),
+                   '--workers_in_node', str(args.workers_in_node),
+                   '--rank', str(rank)]
+            print(cmd)
+            p = subprocess.Popen(cmd, cwd=os.getcwd(), pass_fds=[], close_fds=True)
+            processes.append(p)
+
+        for p in processes:
+            p.wait()
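
Notes on the examples:

1. example_subprocess.py relies on Task.init running first in the master
   process; a child that re-executes the script and calls Task.init receives
   the master's Task object instead of creating a new experiment, so every
   process reports into the same place. A minimal sketch of that hand-off,
   assuming the trains package is installed (the project/task names and the
   connected dictionary below are illustrative only):

       import os
       from trains import Task

       # In the master this creates the experiment; in a spawned child the
       # same call returns the master's Task instead of creating a new one.
       task = Task.init('examples', 'master-child sketch')

       # Each process connects its own dictionary; keys must be unique per
       # process (here derived from the pid) or the values would collide.
       task.connect({'params_pid_{}'.format(os.getpid()): 'some value'})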
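
2. The training loop in example_torch_distributed.py is synchronous SGD: each
   rank computes gradients on its own shard, all_reduce sums them across
   ranks, and dividing by the world size turns the sum into an average, so
   each update behaves like one step over the full 128-sample batch (each
   rank uses bsz = 128 / world_size). A self-contained sketch of just the
   averaging step, runnable as a single gloo process (the port and the tiny
   linear model are placeholders):

       import os
       import torch
       import torch.distributed as dist

       os.environ.setdefault('MASTER_ADDR', '127.0.0.1')
       os.environ.setdefault('MASTER_PORT', '29501')
       dist.init_process_group(backend='gloo', rank=0, world_size=1)

       model = torch.nn.Linear(4, 2)
       model(torch.randn(8, 4)).sum().backward()  # populate the gradients

       world_size = float(dist.get_world_size())
       for param in model.parameters():
           # sum each gradient across ranks, then divide to get the average
           dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
           param.grad.data /= world_size

       dist.destroy_process_group()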
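
3. The __main__ block above only spawns ranks on the local machine
   (MASTER_ADDR is forced to 127.0.0.1). To spread the same script across
   real nodes, export the rendezvous address yourself and start one process
   per node with its own --rank; a sketch, where the address is a placeholder
   for the machine hosting rank 0:

       import os
       import subprocess
       import sys

       os.environ['MASTER_ADDR'] = '10.0.0.1'  # placeholder: rank 0's host
       os.environ['MASTER_PORT'] = '29500'

       # on the first machine run --rank 0, on the second --rank 1, etc.
       subprocess.check_call([
           sys.executable, 'examples/distributed/example_torch_distributed.py',
           '--nodes', '2', '--workers_in_node', '3', '--rank', '0'])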