mirror of
https://github.com/clearml/clearml
synced 2025-06-26 18:16:07 +00:00
Refactor examples
This commit is contained in:
@@ -1,180 +0,0 @@
|
||||
# TRAINS - example of TRAINS torch distributed support
|
||||
# notice all nodes will be reporting to the master Task (experiment)
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from argparse import ArgumentParser
|
||||
from math import ceil
|
||||
from random import Random
|
||||
|
||||
import torch as th
|
||||
import torch.nn as nn
|
||||
import torch.distributed as dist
|
||||
import torch.nn.functional as F
|
||||
from torch import optim
|
||||
from torchvision import datasets, transforms
|
||||
|
||||
from trains import Task
|
||||
|
||||
|
||||
# Local cache directory used for the torchvision MNIST download.
local_dataset_path = './MNIST_data'
|
||||
|
||||
|
||||
class Net(nn.Module):
    """Small CNN classifier for 1x28x28 MNIST images.

    Architecture: conv1 -> ReLU -> conv2 -> max-pool -> dropout ->
    flatten -> fc1 -> ReLU -> dropout -> fc2 -> log-softmax.
    """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        # 64 channels * 12 * 12 spatial positions after pooling = 9216 features.
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Return per-class log-probabilities of shape (batch, 10)."""
        # Note: no activation between conv2 and the pooling step.
        pooled = F.max_pool2d(self.conv2(F.relu(self.conv1(x))), 2)
        flat = th.flatten(self.dropout1(pooled), 1)
        hidden = self.dropout2(F.relu(self.fc1(flat)))
        return F.log_softmax(self.fc2(hidden), dim=1)
||||
|
||||
|
||||
class Partition(object):
    """Read-only view over a subset of *data* selected by an index list."""

    def __init__(self, data, index):
        self.data = data    # the full underlying dataset
        self.index = index  # dataset-global indices belonging to this shard

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        # Map the partition-local position to its dataset-global index.
        return self.data[self.index[index]]
|
||||
|
||||
|
||||
class DataPartitioner(object):
    """Split a dataset into shuffled, non-overlapping fractional shards.

    Shuffling is seeded so every process that builds a partitioner with the
    same seed produces identical shards.
    """

    def __init__(self, data, sizes=(0.7, 0.2, 0.1), seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        remaining = list(range(len(data)))
        rng.shuffle(remaining)
        # Carve off int(frac * len(data)) indices for each requested fraction.
        total = len(data)
        for frac in sizes:
            count = int(frac * total)
            self.partitions.append(remaining[:count])
            remaining = remaining[count:]

    def use(self, partition):
        """Return a Partition wrapping the indices of shard *partition*."""
        return Partition(self.data, self.partitions[partition])
|
||||
|
||||
|
||||
def partition_dataset(num_workers=4, batch_size=128):
    """Download MNIST and build this rank's shard of the training set.

    Requires an initialized torch.distributed process group.

    Parameters
    ----------
    num_workers : int
        DataLoader worker processes for this rank.
    batch_size : int
        Global batch size; divided evenly across the world size so the
        effective per-rank batch is ``batch_size / world_size``.
        (Previously hard-coded to 128; default keeps that behavior.)

    Returns
    -------
    (DataLoader, int)
        Loader over this rank's partition and the per-rank batch size.
    """
    dataset = datasets.MNIST(
        root=local_dataset_path, train=True, download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
        ]))
    size = dist.get_world_size()
    # Split both the global batch and the dataset evenly across all ranks.
    bsz = int(batch_size / float(size))
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes).use(dist.get_rank())
    train_set = th.utils.data.DataLoader(
        partition, num_workers=num_workers, batch_size=bsz, shuffle=True)
    return train_set, bsz
|
||||
|
||||
|
||||
def run(num_workers):
    """ Distributed Synchronous SGD Example """
    th.manual_seed(1234)
    train_set, bsz = partition_dataset(num_workers)
    model = Net()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

    num_batches = ceil(len(train_set.dataset) / float(bsz))

    # Demonstrate per-worker hyper-parameter logging on the shared master Task.
    from random import randint
    param = {'worker_{}_stuff'.format(dist.get_rank()): 'some stuff ' + str(randint(0, 100))}
    Task.current_task().connect(param)
    # Each worker also uploads a small artifact tagged with its rank.
    Task.current_task().upload_artifact(
        'temp {:02d}'.format(dist.get_rank()), artifact_object={'worker_rank': dist.get_rank()})

    for epoch in range(2):
        epoch_loss = 0.0
        for i, (data, target) in enumerate(train_set):
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            # Synchronously average gradients across all ranks before stepping.
            average_gradients(model)
            optimizer.step()
            if i % 10 == 0:
                print('{}] Train Epoch {} - {} \tLoss {:.6f}'.format(dist.get_rank(), epoch, i, loss))
                Task.current_task().get_logger().report_scalar(
                    'loss', 'worker {:02d}'.format(dist.get_rank()), value=loss.item(), iteration=i)
            # Keep the example short: stop after ~100 batches per epoch.
            if i > 100:
                break
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches)
|
||||
|
||||
|
||||
def average_gradients(model):
    """ Gradient averaging: all-reduce each parameter's gradient across the
    process group and divide by the world size, so every rank applies the
    same averaged update. Requires an initialized process group. """
    size = float(dist.get_world_size())
    for param in model.parameters():
        if param.grad is None:
            # Frozen / unused parameters have no gradient to synchronize.
            continue
        # `dist.reduce_op` is deprecated (removed in modern torch); use ReduceOp.
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size
|
||||
|
||||
|
||||
if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument('--nodes', help='number of nodes', type=int, default=10)
    parser.add_argument('--workers_in_node', help='number of workers per node', type=int, default=3)
    # this argument we will not be logging, see below Task.init
    parser.add_argument('--rank', help='current rank', type=int)

    args = parser.parse_args()

    # We have to initialize the task in the master process,
    # it will make sure that any sub-process calling Task.init will get the master task object
    # notice that we exclude the `rank` argument, so we can launch multiple sub-processes with trains-agent
    # otherwise, the `rank` will always be set to the original value.
    task = Task.init("examples", "test torch distributed", auto_connect_arg_parser={'rank': False})

    if os.environ.get('MASTER_ADDR'):
        # Child process: the master address is set, so join the group and train.
        dist.init_process_group(backend='gloo', rank=args.rank, world_size=args.nodes)
        run(args.workers_in_node)
    else:
        # first let's download the dataset, if we have multiple machines,
        # they will take care of it when they get there
        datasets.MNIST(root=local_dataset_path, train=True, download=True)

        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29500'

        print(os.getpid(), 'ARGS:', args)

        # Master process: spawn one child per node, each re-running this
        # script with its own --rank so it takes the branch above.
        processes = []
        for rank in range(args.nodes):
            cmd = [sys.executable, sys.argv[0],
                   '--nodes', str(args.nodes),
                   '--workers_in_node', str(args.workers_in_node),
                   '--rank', str(rank)]
            print(cmd)
            p = subprocess.Popen(cmd, cwd=os.getcwd(), pass_fds=[], close_fds=True)
            processes.append(p)

        # Block until every spawned worker has exited.
        for p in processes:
            p.wait()
|
||||
1
examples/distributed/pytorch_distributed_example.py
Symbolic link
1
examples/distributed/pytorch_distributed_example.py
Symbolic link
@@ -0,0 +1 @@
|
||||
../frameworks/pytorch/pytorch_distributed_example.py
|
||||
3
examples/distributed/requirements.txt
Normal file
3
examples/distributed/requirements.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
torch>=1.1.0
|
||||
torchvision>=0.3.0
|
||||
trains
|
||||
Reference in New Issue
Block a user