Added distributed examples

allegroai 2020-05-24 15:37:55 +03:00
parent ba4c4c558f
commit 9e0a5a36fb
2 changed files with 248 additions and 0 deletions


@@ -0,0 +1,74 @@
# TRAINS - example of multiple sub-processes interacting and reporting to a single master experiment
import multiprocessing
import multiprocessing.pool  # imported explicitly; "import multiprocessing" alone does not bind the .pool submodule
import os
import subprocess
import sys
import time
from argparse import ArgumentParser

from trains import Task


data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'],
)


def mp_worker(arguments):
    print('sub process', os.getpid())
    inputs, the_time = arguments
    from random import randint
    # each worker connects its own dict; key names are randomized so they (almost always) differ across workers
    additional_parameters = {'stuff_' + str(randint(0, 100)): 'some stuff ' + str(randint(0, 100))}
    Task.current_task().connect(additional_parameters)
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)


def mp_handler(use_subprocess):
    # use_subprocess selects between a pool of 4 processes and a pool of 4 threads
    if use_subprocess:
        process = multiprocessing.Pool(4)
    else:
        process = multiprocessing.pool.ThreadPool(4)
    process.map(mp_worker, data)
    process.close()
    print('DONE main !!!')
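
# Note on the pool choice above: with use_subprocess=1 the workers run in
# separate OS processes and rely on TRAINS handing them the master task (see
# the Task.init comment below), while with use_subprocess=0 a ThreadPool keeps
# the workers inside the master process, where Task.current_task() is trivially
# the same object.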


if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('--num_workers', help='integer value', type=int, default=3)
    parser.add_argument('--counter', help='integer value', type=int, default=-1)
    parser.add_argument('--use_subprocess', help='integer value', type=int, default=1)
    args = parser.parse_args()
    print(os.getpid(), 'ARGS:', args)

    # We have to initialize the task in the master process;
    # it will make sure that any sub-process calling Task.init will get the master task object.
    # Notice that we exclude the counter argument, so we can launch multiple sub-processes with trains-agent;
    # otherwise, the counter would always be set to the original value.
    task = Task.init('examples', 'Popen example', auto_connect_arg_parser={'counter': False})

    # We can connect multiple dictionaries, each from a different process, as long as the keys have different names
    param = {'args_{}'.format(args.num_workers): 'some value {}'.format(args.num_workers)}
    task.connect(param)

    # Decide whether to launch another copy of this script: a negative counter means this is
    # the original process, so seed the countdown from num_workers; spawn only while counter > 0
    counter = args.num_workers if args.counter < 0 else args.counter
    p = None
    if counter > 0:
        cmd = [sys.executable, sys.argv[0],
               '--counter', str(counter - 1),
               '--num_workers', str(args.num_workers),
               '--use_subprocess', str(args.use_subprocess)]
        print(cmd)
        p = subprocess.Popen(cmd, cwd=os.getcwd())
    mp_handler(args.use_subprocess)
    print('Done logging')
    if p and counter > 0:
        p.wait()
    print('Exiting')
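
# Usage sketch (assuming this file is saved as example_subprocess.py - a
# placeholder name - and a trains server is reachable):
#   python example_subprocess.py                      --> process pool, 3 chained re-launches by default
#   python example_subprocess.py --use_subprocess 0   --> same flow on a thread pool
# Each copy re-launches the script with --counter decremented until it reaches
# zero, and every copy's pool workers report into the single master experiment.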


@@ -0,0 +1,174 @@
# TRAINS - example of TRAINS torch distributed support
# notice all nodes will be reporting to the master Task (experiment)
import os
import subprocess
import sys
from argparse import ArgumentParser
from math import ceil
from random import Random

import torch as th
import torch.nn as nn
import torch.distributed as dist
import torch.nn.functional as F
import torch.utils.data  # imported explicitly so th.utils.data.DataLoader below is guaranteed to resolve
from torch import optim
from torchvision import datasets, transforms

from trains import Task

local_dataset_path = './MNIST_data'


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = th.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


class Partition(object):
    """ Dataset partitioning helper """

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner(object):
    def __init__(self, data, sizes=(0.7, 0.2, 0.1), seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)
        # carve the shuffled index list into consecutive, disjoint slices, one per requested fraction
        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])
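
# Worked example (a sketch): for a dataset of length 10 and sizes=(0.5, 0.5),
# the shuffled indexes are cut into two disjoint 5-element slices, so
# DataPartitioner(dataset, sizes).use(rank) hands each rank its own
# non-overlapping subset of the data.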


def partition_dataset(num_workers=4):
    """ Partitioning MNIST """
    dataset = datasets.MNIST(root=local_dataset_path, train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    size = dist.get_world_size()
    bsz = int(128 / float(size))
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = th.utils.data.DataLoader(
        partition, num_workers=num_workers, batch_size=bsz, shuffle=True)
    return train_set, bsz
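
# Note: bsz = int(128 / world_size) keeps the global batch at roughly 128
# samples per synchronous step, e.g. a world size of 4 gives 32 samples per
# rank; together with average_gradients() below, each step approximates
# single-node SGD with a batch of 128.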


def run(num_workers):
    """ Distributed Synchronous SGD Example """
    th.manual_seed(1234)
    train_set, bsz = partition_dataset(num_workers)
    model = Net()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    num_batches = ceil(len(train_set.dataset) / float(bsz))
    from random import randint
    param = {'worker_{}_stuff'.format(dist.get_rank()): 'some stuff ' + str(randint(0, 100))}
    Task.current_task().connect(param)
    Task.current_task().upload_artifact(
        'temp {:02d}'.format(dist.get_rank()), artifact_object={'worker_rank': dist.get_rank()})
    for epoch in range(2):
        epoch_loss = 0.0
        for i, (data, target) in enumerate(train_set):
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
            if i % 10 == 0:
                print('{}] Train Epoch {} - {} \tLoss {:.6f}'.format(dist.get_rank(), epoch, i, loss))
                Task.current_task().get_logger().report_scalar(
                    'loss', 'worker {:02d}'.format(dist.get_rank()), value=loss.item(), iteration=i)
            if i > 100:
                break
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches)


def average_gradients(model):
    """ Gradient averaging. """
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size
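
# Worked example: for world_size N = 2 with per-rank gradients g0 and g1,
# all_reduce(SUM) leaves g0 + g1 on both ranks, and dividing by N gives
# (g0 + g1) / 2 everywhere, so every rank applies an identical averaged update.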
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument('--nodes', help='number of nodes', type=int, default=10)
parser.add_argument('--workers_in_node', help='number of workers per node', type=int, default=3)
parser.add_argument('--rank', help='current rank', type=int)
args = parser.parse_args()
task = Task.init("examples", "test torch distributed")
if os.environ.get('MASTER_ADDR'):
dist.init_process_group(backend='gloo', rank=args.rank, world_size=args.nodes)
run(args.workers_in_node)
else:
# first let's download the dataset, if we have multiple machines,
# they will take care of it when they get there
datasets.MNIST(root=local_dataset_path, train=True, download=True)
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
print(os.getpid(), 'ARGS:', args)
processes = []
for rank in range(args.nodes):
cmd = [sys.executable, sys.argv[0],
'--nodes', str(args.nodes),
'--workers_in_node', str(args.workers_in_node),
'--rank', str(rank)]
print(cmd)
p = subprocess.Popen(cmd, cwd=os.getcwd(), pass_fds=[], close_fds=True)
processes.append(p)
for p in processes:
p.wait()
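
# Usage sketch (single machine, matching the defaults above): running
#   python example_torch_distributed.py --nodes 4
# (a placeholder file name) with MASTER_ADDR unset makes the parent download
# MNIST, set MASTER_ADDR/MASTER_PORT, and Popen one child per rank; each child
# then sees MASTER_ADDR, joins the gloo process group, and trains its own
# partition while reporting to the shared master Task. On multiple physical
# machines (an assumption, not exercised by these defaults), each node would
# export MASTER_ADDR/MASTER_PORT pointing at the rank-0 host and pass its own --rank.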