概述
本教程假定你现已对于 PyToch 练习一个简略模型有必定的根底了解。本教程将展现运用 3 种封装层级不同的方法调用 DDP (DistributedDataParallel) 进程,在多个 GPU 上练习同一个模型:
- 运用
pytorch.distributed
模块的原生 PyTorch DDP 模块 - 运用 Accelerate 对
pytorch.distributed
的轻量封装,保证程序可以在不修正代码或许少量修正代码的情况下在单个 GPU 或 TPU 下正常运转 - 运用 Transformer 的高级 Trainer API ,该 API 抽象封装了一切代码模板而且支持不同设备和分布式场景。
什么是分布式练习,为什么它很重要?
下面是一些非常根底的 PyTorch 练习代码,它基于 Pytorch 官方在 MNIST 上创立和练习模型的示例。
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
class BasicNet(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
self.act = F.relu
def forward(self, x):
x = self.act(self.conv1(x))
x = self.act(self.conv2(x))
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.act(self.fc1(x))
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
咱们界说练习设备 (cuda
):
device = "cuda"
构建一些基本的 PyTorch DataLoaders
:
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
把模型放入 CUDA 设备:
model = BasicNet().to(device)
构建 PyTorch optimizer
(优化器)
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
最终创立一个简略的练习和评价循环,练习循环会运用悉数练习数据集进行练习,评价循环会核算练习后模型在测试数据集上的准确度:
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
一般从这儿开端,就可以将一切的代码放入 Python 脚本或在 Jupyter Notebook 上运转它。
但是,只履行 python myscript.py
只会运用单个 GPU 运转脚本。如果有多个 GPU 资源可用,您将如何让这个脚本在两个 GPU 或多台机器上运转,经过分布式练习提高练习速度?这是 torch.distributed
发挥作用的当地。
PyTorch 分布式数据并行
望文生义,torch.distributed
旨在装备分布式练习。你可以运用它装备多个节点进行练习,例如:多机器下的单个 GPU,或许单台机器下的多个 GPU,或许两者的任意组合。
为了将上述代码转换为分布式练习,有必要首要界说一些设置装备,详细细节请参阅 DDP 运用教程
首要有必要声明 setup
和 cleanup
函数。这将创立一个进程组,而且一切核算进程都可以经过这个进程组通讯。
留意:在本教程的这一部分中,假定这些代码是在 Python 脚本文件中发动。稍后将讨论运用 Accelerate 的发动器,就不用声明
setup
和cleanup
函数了
import os
import torch.distributed as dist
def setup(rank, world_size):
"Sets up the process group and configuration for PyTorch Distributed Data Parallelism"
os.environ["MASTER_ADDR"] = 'localhost'
os.environ["MASTER_PORT"] = "12355"
# Initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
"Cleans up the distributed environment"
dist.destroy_process_group()
最终一个疑问是,我怎样把我的数据和模型发送到另一个 GPU 上?
这正是 DistributedDataParallel
模块发挥作用的当地, 它将您的模型仿制到每个 GPU 上 ,而且当 loss.backward()
被调用进行反向传播的时候,一切这些模型副本的梯度将被同步地均匀/下降 (reduce)。这保证每个设备在履行优化器过程后具有相同的权重。
下面是咱们的练习设置示例,咱们运用了 DistributedDataParallel
重构了练习函数:
留意:此处的 rank 是当时 GPU 与一切其他可用 GPU 相比的总体 rank,这意味着它们的 rank 为
0 -> n-1
from torch.nn.parallel import DistributedDataParallel as DDP
def train(model, rank, world_size):
setup(rank, world_size)
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
# Train for one epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
cleanup()
在上述的代码中需求为每个副本设备上的模型 (因此在这儿是ddp_model
的参数而不是 model
的参数) 声明优化器,以便正确核算每个副本设备上的梯度。
最终,要运转脚本,PyTorch 有一个便利的 torchrun
命令行模块可以供给协助。只需传入它应该运用的节点数以及要运转的脚本即可:
torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py
上面的代码可以在在一台机器上的两个 GPU 上运转练习脚本,这是运用 PyTorch 只进行分布式练习的情况 (不可以在单机单卡上运转)。
现在让咱们谈谈 Accelerate,一个旨在使并行化愈加无缝并有助于一些最佳实践的库。
Accelerate
Accelerate 是一个库,旨在无需大幅修正代码的情况下完结并行化。除此之外, Accelerate 顺便的数据 pipeline
还可以提高代码的功用。
首要,让咱们将刚刚履行的一切上述代码封装到一个函数中,以协助咱们直观地看到差异:
def train_ddp(rank, world_size):
setup(rank, world_size)
# Build DataLoaders
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
# Build model
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])
# Build optimizer
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
# Train for a single epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
# Evaluate
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
接下来让咱们谈谈 Accelerate 如何便利地完成并行化的。上面的代码有几个问题:
- 该代码有点低效,因为每个设备都会创立一个
dataloader
。 - 这些代码只能运转在多 GPU 下,当想让这个代码运转在单个 GPU 或 TPU 时,还需求额定进行一些修正。
Accelerate 经过 Accelerator
类解决上述问题。经过它,不论是单节点仍是多节点,除了三行代码外,其余代码简直保持不变,如下所示:
def train_ddp_accelerate():
accelerator = Accelerator()
# Build DataLoaders
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
# Build model
model = BasicModel()
# Build optimizer
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
# Send everything through `accelerator.prepare`
train_loader, test_loader, model, optimizer = accelerator.prepare(
train_loader, test_loader, model, optimizer
)
# Train for a single epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
output = model(data)
loss = F.nll_loss(output, target)
accelerator.backward(loss)
optimizer.step()
optimizer.zero_grad()
# Evaluate
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
借助 Accelerator
对象,您的 PyTorch 练习循环现在已装备为可以在任何分布式情况运转。运用 Accelerator
改造后的代码依然可以经过 torchrun CLI
或经过 Accelerate 自己的 CLI 界面发动(发动你的 Accelerate 脚本)。
因此,现在可以尽可能保持 PyTorch 原生代码不变的前提下,运用 Accelerate 履行分布式练习。
早些时候有人说到 Accelerate 还可以使 DataLoaders
更高效。这是经过自界说采样器完成的,它可以在练习期间自动将部分批次发送到不同的设备,然后答应每个设备只需求储存数据的一部分,而不是一次将数据仿制四份存入内存,详细取决于装备。因此,内存总量中只要原始数据集的一个完整副本。该数据集会拆分后分配到各个练习节点上,然后答应在单个实例上练习更大的数据集,而不会使内存爆破
运用 notebook_launcher
之前说到您可以直接从 Jupyter Notebook 运转分布式代码。这来自 Accelerate 的 notebook_launcher
模块,它可以在 Jupyter Notebook 内部的代码发动多 GPU 练习。
运用它就像导入 launcher
一样简略:
from accelerate import notebook_launcher
接着传递咱们之前声明的练习函数、要传递的任何参数以及要运用的进程数(例如 TPU 上的 8 个,或两个 GPU 上的 2 个)。下面两个练习函数都可以运转,但请留意,发动单次发动后,实例需求重新发动才能发生另一个:
notebook_launcher(train_ddp, args=(), num_processes=2)
或许:
notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)
运用 Trainer
终于咱们来到了最高级的 API——Hugging Face Trainer.
它涵盖了尽可能多的练习类型,一起依然可以在分布式系统上进行练习,用户根本不需求做任何事情。
首要咱们需求导入 Trainer:
from transformers import Trainer
然后咱们界说一些 TrainingArguments
来控制一切常用的超参数。 Trainer 需求的练习数据是字典类型的,因此需求制作自界说收拾功用。
最终,咱们将练习器子类化并编写咱们自己的 compute_loss
.
之后,这段代码也可以分布式运转,而无需修正任何练习代码!
from transformers import Trainer, TrainingArguments
model = BasicNet()
training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)
def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}
class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss
trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)
trainer.train()
***** Running training *****
Num examples = 60000
Num Epochs = 1
Instantaneous batch size per device = 64
Total train batch size (w. parallel, distributed & accumulation) = 64
Gradient Accumulation steps = 1
Total optimization steps = 938
Epoch | 练习丢失 | 验证丢失 |
---|---|---|
1 | 0.875700 | 0.282633 |
与上面的 notebook_launcher
示例相似,也可以将这个过程封装成一个练习函数:
def train_trainer_ddp():
model = BasicNet()
training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)
def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}
class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss
trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)
trainer.train()
notebook_launcher(train_trainer_ddp, args=(), num_processes=2)
相关资源
- 要了解有关 PyTorch 分布式数据并行性的更多信息,请检查:
pytorch.org/docs/stable… - 要了解有关 Accelerate 的更多信息,请检查:
hf.co/docs/accele… - 要了解有关 Transformer 的更多信息,请检查:
hf.co/docs/transf…
原文作者:Zachary Mueller
译者:innovation64 (李洋)
审校:yaoqi (胡耀淇)
排版:zhongdongy (阿东)