0x00 摘要
前文咱们对DDP的一些支撑模块现已做了介绍,这为本文做了必要的铺垫,本文就开端介绍Python国际代码和C++国际的初始化部分。下文介绍C++国际的中心代码。
本系列其他文章如下:
[源码解析] PyTorch 分布式(1)——前史和概述
[源码解析] PyTorch 如何运用GPU
[源码解析] PyTorch 分布式(2) —– DataParallel(上)
[源码解析] PyTorch 分布式(3) —– DataParallel(下)
[源码解析] PyTorch 分布式(4)——分布式运用根底概念
[源码解析] PyTorch 分布式(5) —— DistributedDataParallel 总述&如何运用
[源码解析] PyTorch分布式(6) —DistributedDataParallel — 初始化&store
[源码解析] PyTorch 分布式(7) —– DistributedDataParallel 之进程组
[源码解析] PyTorch 分布式(8) ——– DistributedDataParallel之论文篇
0x01 综述
1.1 数据并行
DDP是数据并行练习的完结,为了唤醒咱们的回忆,咱们仍是要看看数据并行的一个全体流程,来自fairscale github源码。
1.2 DDP架构
以下文字翻译自 pytorch.org/docs/master…
下面是 DDP 完结组件。堆栈图显现了代码的结构。
咱们顺着此架构图从上往下看。
1.2.1 分布式数据并行
最上面是分布式数据并行组件。
- Distributed.py:
- 这是 DDP 的 Python 进口点。它完结了初始化过程,对应了
nn.parallel.DistributedDataParallel
模块的forward
函数,该模块会调用C++库。 - 它的
_sync_param
功能是:当一个DDP进程在多个设备上作业时,会履行进程内参数同步,而且它还从rank 0 进程向一切其他进程播送模型缓冲区。 - 进程间参数同步在
Reducer.cpp
之中完结。
- 这是 DDP 的 Python 进口点。它完结了初始化过程,对应了
- comm.h:完结合并播送帮手函数(coalesced broadcast helper ),该函数在初始化期间被调用以播送模型状况,并在前向传达之前同步模型缓冲区。
- reducer.h:供给反向传达中梯度同步的中心完结。它具有三个进口点函数:
-
Reducer
: 其结构函数在distributed.py
被调用,Reducer
将注册Reducer::autograd_hook()
到梯度累加器。 -
autograd_hook()
当梯度安排妥当时,autograd 引擎将调用该函数。 -
prepare_for_backward()
在distributed.py
之中,当 DDP 前向传递结束时,会调用prepare_for_backward()
。假如在DDP结构函数中,把find_unused_parameters
设置为True
,DDP 会遍历 autograd 核算图以查找未运用的参数。
-
1.2.2 进程
以下是两个进程相关组件。
- ProcessGroup.hpp :包含一切进程组完结的抽象 API。
c10d
库供给了 3 个开箱即用的完结,即 ProcessGroupGloo,ProcessGroupNCCL和ProcessGroupMPI。DistributedDataParallel
用ProcessGroup::broadcast()
在初始化期间将模型状况从rank 0 的进程发送到其他进程,并对ProcessGroup::allreduce()
梯度求和。 - Store.hpp :帮忙进程组实例的调集服务找到彼此。
1.3 DDP 整体完结
咱们把论文和 pytorch.org/docs/master… 结合起来,看看 DDP 整体完结。
咱们总结一次DistributedDataParallel迭代中的过程如下(与上图不完全共同,有部分细化):
-
Prerequisite:
- DDP 依赖 c10d
ProcessGroup
进行通讯。因而,运用程序必须ProcessGroup
在构建 DDP 之前创立实例。
- DDP 依赖 c10d
-
Constuctor:
- rank 0 进程会引证本地模块,把模型
state_dict()
参数播送到一切进程之中,这样能够确保一切进程运用相同初始化数值和模型副本进行练习。 - 每个 DDP 进程创立一个 local
Reducer
,稍后将在向后传递期间处理梯度同步。 - 为了进步通讯功率,
Reducer
将参数梯度组织成桶,一次规约一个桶。- 初始化桶,依照逆序把 parameters 分配到桶之中,这样能够进步通讯功率。
- 能够经过设置DDP 结构函数中的参数bucket_cap_mb来装备桶的巨细。
- 从参数梯度到桶的映射是在构建时根据桶巨细约束和参数巨细确认的。模型参数以(大致)
Model.parameters()
与给定模型相反的次第分配到桶中 。运用相反次第的原因是由于 DDP 希望梯度在反向传递期间以大约该次第准备安排妥当。 - 下图显现了一个示例。请留意,
grad0
和grad1
在bucket1
中,另外两个梯度在bucket0
中。当然,这种假定可能并不总是正确的,当这种情况发生时,它可能会损害 DDP 后向速度,由于它无法Reducer
尽早开端通讯。
- 除了分桶,
Reducer
还在结构期间注册 autograd 钩子,每个参数一个钩子。当梯度准备好时,将在向后传递期间触发这些钩子。详细便是遍历参数,为每个参数加上 grad_accumulator 和 autograd_hook。
- rank 0 进程会引证本地模块,把模型
-
Forward Pass:
- 每个进程读去自己的练习数据,DistributedSampler确保每个进程读到的数据不同。
- DDP 获取输入并将其传递给本地模型。
- 模型进行前向核算,成果设置为 out。现在核算都是在每个进程(CUDA设备)上完结。
- 假如
find_unused_parameters
设置为True
,DDP 会剖析本地模型的输出,从 out 开端遍历核算图,把未运用参数标明为 ready,由于每次核算图都会改动,所以每次都要遍历。- 此模式(Mode)答应在模型的子图上向后运转,而且 DDP 经过从模型输出out遍历 autograd 图并将一切未运用的参数标记为安排妥当,以削减反向传递中触及的参数。
- 在向后传递期间,
Reducer
只会等候未准备好的参数,但它仍然会规约一切桶。将参数梯度标记为安排妥当并不能协助 DDP 越过桶,但它会阻挠 DDP 在向后传递期间永远等候不存在的梯度。 - 请留意,遍历 autograd 图会引进额定的开销,因而运用程序仅应必要时才设置
find_unused_parameters
为True
。
- 回来out。模型网络输出不需求gather到 rank 0进程了,这与 DP不同。
-
Backward Pass:
-
backward()
在 loss 上直接调用该函数Tensor
,这是 DDP 无法控制的,DDP 运用结构时注册的 autograd hooks 来触发梯度同步。当一个梯度准备好时,它在该梯度累加器上的相应 DDP 钩子将触发。 - 在 autograd_hook 之中进行all-reduce。假定参数index是param_index,则运用param_index获取到参数,标明为ready,假如某个桶里边梯度都ready,则该桶是ready。
- 当一个桶中的梯度都准备好时,会 在该桶上
Reducer
发动异步allreduce
以核算一切进程的梯度均匀值。 - 假如一切桶都ready,则等候一切 all-reduce 完结。当一切桶都准备好时,
Reducer
将堵塞等候一切allreduce
操作完结。完结此操作后,将均匀梯度写入param.grad
一切参数的字段。 - 一切进程的梯度都会reduce,更新之后,咱们的模型权重都相同。所以在向后传达完结之后,跨不同DDP进程的对应的相同参数上的 grad 字段应该是持平的。
- 不需求像 DP 那样每次迭代之后还要播送参数。可是 Buffers 仍是需求在每次迭代由 rank 0 进程播送到其他进程之上。
-
-
Optimizer Step:
-
从优化器的角度来看,它正在优化本地模型。
-
一切 DDP 进程上的模型副本都能够坚持同步,由于它们都从相同的状况开端,而且在每次迭代中都具有相同的均匀梯度。
-
0x02 初始化
由于 Python 国际是能够在许多时间给类设置成员变量,因而咱们仍是从 __init__
看起。
2.1 __init__
其中心逻辑是:
-
设置设备类型。
-
设置设备IDs。
-
设置 self.process_group,默许便是 GroupMember.WORLD。
-
装备各品种成员变量。
-
查看 parameters。
-
设定bucket巨细。
-
构建参数。
-
将 rank 0 的state_dict() 播送到其他worker,以确保一切worker的模型初始状况相同。
-
树立reducer。
详细代码如下:
class DistributedDataParallel(Module):
def __init__(
self,
module,
device_ids=None,
output_device=None,
dim=0,
broadcast_buffers=True,
process_group=None,
bucket_cap_mb=25,
find_unused_parameters=False,
check_reduction=False,
gradient_as_bucket_view=False,
):
super(DistributedDataParallel, self).__init__()
# 设置设备类型
self.is_multi_device_module = len({p.device for p in module.parameters()}) > 1
distinct_device_types = {p.device.type for p in module.parameters()}
self.device_type = list(distinct_device_types)[0]
# 设置设备IDs
if (
device_ids is None
or len(device_ids) == 0 # For backward compatibility.
or self.device_type == "cpu"
or self.is_multi_device_module
):
self.device_ids = None
self.output_device = None
else:
self.device_ids = [_get_device_index(x, True) for x in device_ids]
if output_device is None:
output_device = device_ids[0]
self.output_device = _get_device_index(output_device, True)
# 设置process group
if process_group is None:
self.process_group = _get_default_group()
else:
self.process_group = process_group
# 装备各种成员变量
self.static_graph = False
self.dim = dim
self.module = module
self.device = list(self.module.parameters())[0].device
self.broadcast_buffers = broadcast_buffers
self.find_unused_parameters = find_unused_parameters
self.require_backward_grad_sync = True
self.require_forward_param_sync = True
self.ddp_uneven_inputs_config = _DDPUnevenInputsConfig(
ddp_join_enabled=False,
ddp_join_divide_by_initial_world_size=False,
ddp_join_throw_on_early_termination=False,
)
self.gradient_as_bucket_view = gradient_as_bucket_view
if hasattr(module, "_ddp_params_and_buffers_to_ignore"):
self.parameters_to_ignore = module._ddp_params_and_buffers_to_ignore
else:
self.parameters_to_ignore = []
# 查看 parameters
# Check that a module does not have Uninitialized parameters
for param in module.parameters():
if isinstance(param, torch.nn.parameter.UninitializedParameter):
raise RuntimeError(
"Modules with uninitialized parameters can't be used with `DistributedDataParallel`. "
"Run a dummy forward pass to correctly initialize the modules"
)
# used for intra-node param sync and inter-node sync as wel
self.broadcast_bucket_size = int(250 * 1024 * 1024)
# reduction bucket size
self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024)
# Whether to perform input tensor CPU to GPU copies on a side-stream
self.use_side_stream_for_tensor_copies = (
os.environ.get("PYTORCH_DDP_USE_SIDE_STREAM", "1") == "1"
)
# 构建参数
# TODO(wayi@): Remove this field since SPMD is no longer supported,
# and also remove all the relevant unnecessary loops.
# Module replication within process (single-process multi device)
# 这儿需求留意,便是以后不支持了
self._module_copies = [self.module]
# Build parameters for reducer.
parameters, expect_sparse_gradient = self._build_params_for_reducer()
# Verify model equivalence.
dist._verify_model_across_ranks(self.process_group, parameters)
# Sync params and buffers. Ensures all DDP models start off at the same value.
# 将 rank 0 的state_dict() 播送到其他worker,以确保一切worker的模型初始状况相同;
self._sync_params_and_buffers(authoritative_rank=0)
# In debug mode, build a mapping of parameter index -> parameter.
if dist._get_debug_mode() != dist._DistributedDebugLevel.OFF:
param_to_name_mapping = self._build_param_to_name_mapping(parameters)
else:
param_to_name_mapping = {}
# Builds reducer.
self._ddp_init_helper(parameters, expect_sparse_gradient, param_to_name_mapping)
咱们接下来挑选一些重要过程进行剖析。
2.2 构建参数
关于 DDP,第一个要害步便是构建参数,这儿要留意,假如现在情况是单机多GPU,也便是单进程多设备(和DP相同了)情况,那么需求在进程之内进行模型仿制。
可是未来不会支持了,会去掉。所以 parameters 便是 [ToyModel] 的参数调集,parameters[0] 便是 ToyModel 的参数。后边介绍 BucketReplica 会提到。
# TODO(wayi@): Remove this field since SPMD is no longer supported,
# and also remove all the relevant unnecessary loops.
# Module replication within process (single-process multi device)
self._module_copies = [self.module] # 构建一个比方 [ToyModel] 这样的列表
# Build parameters for reducer.
parameters, expect_sparse_gradient = self._build_params_for_reducer()
咱们看看模型中有哪些重要参数:
- parameter :在反向传达之中需求被optimizer更新的参数。咱们能够经过
model.parameters()
得到这些参数。 - buffer : 在反向传达过程之中不需求被optimizer更新的参数。咱们能够经过
model.buffers()
得到这些参数。
2.2.1 _build_params_for_reducer
详细 _build_params_for_reducer 就为reducer树立参数,逻辑大致如下:
- 遍历_module_copies,得到(module, parameter)列表 modules_and_parameters,这些参数是需求求导的,不能在疏忽列表之中。
- 用调集去除可能在多个modules中共享的参数。
- 构建一个参数列表。
- 查看是否一个module期盼一个sparse梯度,把成果放到 expect_sparse_gradient 之中。
- 得到module的参数,与下面的buffer一同,都是用来同步到其他worker的。
- 得到module的buffer,module_buffers 在后续同步时分会用到。
- 回来参数列表和expect_sparse_gradient。
# 之前在初始化过程中,设定了 self._module_copies = [self.module]
def _build_params_for_reducer(self):
# Build tuple of (module, parameter) for all parameters that require grads.
modules_and_parameters = [
[
(module, parameter)
# 得到module列表
for module_name, module in replica.named_modules()
# 得到参数列表,而且参数是需求求导,不在疏忽列表之中
for parameter in [
param
# Note that we access module.named_parameters instead of
# parameters(module). parameters(module) is only needed in the
# single-process multi device case, where it accesses replicated
# parameters through _former_parameters.
for param_name, param in module.named_parameters(recurse=False)
if param.requires_grad
and f"{module_name}.{param_name}" not in self.parameters_to_ignore
]
]
for replica in self._module_copies
]
# Deduplicate any parameters that might be shared across child modules.
# 用调集去除可能在多个modules中共享的参数
memo = set()
modules_and_parameters = [
# "p not in memo" is the deduplication check.
# "not memo.add(p)" is always True, and it's only there to cause "add(p)" if needed.
[(m, p) for m, p in replica_mps if p not in memo and not memo.add(p)]
for replica_mps in modules_and_parameters
]
# Build list of parameters.
# 构建一个参数列表
parameters = [
list(parameter for _, parameter in replica)
for replica in modules_and_parameters
]
# Checks if a module will produce a sparse gradient.
def produces_sparse_gradient(module):
if isinstance(module, torch.nn.Embedding) or isinstance(
module, torch.nn.EmbeddingBag
):
return module.sparse
return False
# Build list of booleans indicating whether or not to expect sparse
# gradients for the corresponding parameters.
# 参数是否期盼sparse gradients
expect_sparse_gradient = [
list(produces_sparse_gradient(module) for module, _ in replica)
for replica in modules_and_parameters
]
# The following modules_params and modules_buffers are used for
# param/buffer sync in _sync_params.
# 得到module的参数,与下面的buffer一同,都是用来同步到其他worker的
self.modules_params = [
list(self._get_parameters(m)) for m in self._module_copies
]
# Collect buffers for modules, filtering out buffers that should be ignored.
# 得到module的buffer,module_buffers 在后续同步时分会用到
named_module_buffers = [
[(buffer, buffer_name) for buffer_name, buffer in m.named_buffers()]
for m in self._module_copies
]
self.modules_buffers = [
[
buffer
for (buffer, buffer_name) in module_buffers
if buffer_name not in self.parameters_to_ignore
]
for module_buffers in named_module_buffers
]
return parameters, expect_sparse_gradient
此刻 parameters 示例如下,能够看到其只要 [0] 元素有意义,这个 [0] 原始自身包括4个元素:
parameters = {list: 1}
0 = {list: 4}
0 = {Parameter: 10} Parameter containing:\ntensor([[-4.0381e-02, 3.8828e-02, 1 )
1 = {Parameter: 10} Parameter containing:\ntensor([-0.0438, -0.2033, 0.2771, 0.0721, )
2 = {Parameter: 5} Parameter containing:\ntensor([[-0.0094, -0.1319, 0.0713, 0.3155, )
3 = {Parameter: 5} Parameter containing:\ntensor([-0.0008, 0.0582, -0.1245, -0.2538, )
__len__ = {int} 4
__len__ = {int} 1
2.2.2 modules_buffers
这儿多说一句,何处用到 self.modules_buffers?后来在播送参数时分就会用到,比方:
# When running in join mode, checks and performs sync of module buffers if
# the models have buffers that should be synchronized in the forward pass.
def _check_and_sync_module_buffers(self):
if self.will_sync_module_buffers():
authoritative_rank = self._find_common_rank(self._distributed_rank, False)
self._distributed_broadcast_coalesced(
self.modules_buffers[0], self.broadcast_bucket_size, authoritative_rank
)
这儿运用了 _find_common_rank 来得到现在 DDP 运用的一切有用 ranks。
def _find_common_rank(self, input_rank, rank_cond):
# -1 indicates that this rank is not under consideration to be the
# common_rank
rank_to_use = torch.tensor(
[input_rank if rank_cond else -1],
device=self.device,
)
# 运用MAX操作得到最大数值
dist.all_reduce(rank_to_use, op=ReduceOp.MAX, group=self.process_group)
if rank_to_use.item() == -1:
raise ValueError(
"BUG! Expected rank_cond to be true for at least one process."
)
return rank_to_use.item() # 回来悉数ranks
2.3 验证模型
接下来是验证模型阶段。
2.3.1 背景常识
由于后续用到了如下代码,所以咱们首要看看背景常识 broadcast。不熟悉这部分的朋友会有疑问是:为什么 broadcast 能够从 rank 0 播送到其他rank,分明一切rank都调用到了相同的 broadcast 代码。
process_group->broadcast(vec)->wait(); // 把 rank 0 的 meta 播送到对应的设备
咱们来到 torch/lib/c10d/ProcessGroupMPI.cpp。能够看到,其运用了 MPI 的 MPI_Bcast API 来进行播送操作,其中 opts.rootRank是要害所在。
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupMPI::broadcast(
std::vector<at::Tensor>& tensors,
const BroadcastOptions& opts) {
checkSingleTensor(tensors);
std::function<void(std::unique_ptr<WorkEntry>&)> runFunc =
[opts, this](std::unique_ptr<WorkEntry>& entry) {
auto data = (entry->src)[0];
c10::DeviceGuard guard(data.device());
std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
MPI_CHECK(MPI_Bcast( // 调用MPI API
data.data_ptr(),
data.numel(),
mpiDatatype.at(data.scalar_type()),
opts.rootRank, // 这儿是要害,仅仅从root播送其他rank
pgComm_));
};
auto entry = std::make_unique<WorkEntry>(&tensors, &tensors, std::move(runFunc));
return enqueue(
std::move(entry),
"mpi:broadcast",
c10::optional<std::vector<at::Tensor>>(tensors));
}
opts 是 BroadcastOptions 的实例。
class BroadcastOptions:
rootRank: int
rootTensor: int
timeout: timedelta
在 C++ 国际对应了如下:
struct BroadcastOptions {
int rootRank = 0;
int rootTensor = 0;
std::chrono::milliseconds timeout = kUnsetTimeout;
};
在定义时分看到,BroadcastOptions 被C++自动初始化为0,所以一切 rank 的进程都是运用 rootRank = 0 进行调用 MPI_Bcast,成果便是从 rank = 0 来向其他 rank 进行播送。
c10::intrusive_ptr<ProcessGroup::Work> broadcast(
std::vector<at::Tensor>& data,
const BroadcastOptions& opts = BroadcastOptions()) override;
2.3.2 详细代码
咱们接下来看看如何验证模型。
_verify_model_across_ranks 的作用是验证模型(replica 0)的相关参数在播送之后,跨进程时分拥有相同的size/strides。
# Verify model equivalence.
dist._verify_model_across_ranks(self.process_group, parameters)
经过下面代码咱们可知,_verify_model_across_ranks 实践调用到verify_replica0_across_processes。
module.def(
"_verify_model_across_ranks",
&::c10d::verify_replica0_across_processes,
py::arg("process_group"),
py::arg("replicas"),
py::call_guard<py::gil_scoped_release>());
verify_replica0_across_processes 之中,参数model_replicas 便是前面的 parameters,其逻辑如下:
- 首要,从 model_replicas 得到 metadata。
- 然后把metadata克隆到metadata_dev。
- 然后,把 process 0 的 metadata_dev 播送到对应的设备。
- 每个进程都会运转相同的代码,可是 process_group->broadcast 之中,只要 rank 0 会设置为 root_rank,这样就只播送 rank 0 的数据。
- 播送之后,一切进程的 metadata_dev 都相同,便是 process 0 内的数据。
- 然后把 metadata_dev 拷贝回 control,把 control 和 model_replicas[0]比较,看看是否和本来持平。
- 查看 control 是否和 model_replicas 的尺度相同。
- 这儿运用了 accessor,LibTorch 运用 accessor 快速访问 Tensor,假如 tensor 在CPU上,运用 accessor,假如在 GPU上,运用 packed_accessor 访问,这部分在 “中心开发者全面解读PyTorch 内部机制” 有相关提及。
详细代码如下:
// Verifies corresponding params in replica 0 have the same sizes/strides
// across processes.
void verify_replica0_across_processes(
c10::intrusive_ptr<c10d::ProcessGroup> process_group,
std::vector<std::vector<at::Tensor>> model_replicas) {
size_t i = 0;
for (const auto& t : model_replicas[0]) {
i += 2 * t.dim();
}
at::TensorOptions options;
options = options.dtype(at::kLong);
auto metadata = at::empty({static_cast<long>(i)}, options);
// Technically, process 0 is the broadcast source, so only process 0 needs
// to populate metadata. But no harm keeping work aligned across processes.
auto metadata_accessor = metadata.accessor<int64_t, 1>();
i = 0;
// 把model_replicas[0]拷贝到metadata_accessor,其实便是metadata
for (const auto& t : model_replicas[0]) {
for (const auto& sz : t.sizes()) {
metadata_accessor[i++] = sz;
}
for (const auto& str : t.strides()) {
metadata_accessor[i++] = str;
}
}
// 然后把metadata克隆到metadata_dev
auto metadata_dev = metadata.clone().to(model_replicas[0][0].device());
std::vector<at::Tensor> vec{metadata_dev};
// 播送metadata_dev
process_group->broadcast(vec)->wait(); // 把process 0 的 meta 播送到对应的设备
// 这之后,metadata_dev 便是一切进程的成果咱们都相同了
// Technically, process 0 doesn't need to double-check metadata, because it
// was the source. But no harm keeping work aligned.
auto control = at::empty({static_cast<long>(i)}, options);
// 把 metadata_dev 拷贝回 control
control.copy_(metadata_dev, /*non_blocking=*/false);
// 然后把 control 和 model_replicas[0]比较,看看是否和本来持平
auto control_accessor = control.accessor<int64_t, 1>();
i = 0;
for (size_t p = 0; p < model_replicas[0].size(); p++) {
const auto& t = model_replicas[0][p];
// I'd like to include which process we are in the message,
// but ProcessGroup::getRank is not public!
for (const auto& sz : t.sizes()) {
TORCH_CHECK(
sz == control_accessor[i++],
"replicas[0][",
p,
"] in this process"
" with sizes ",
t.sizes(),
" appears not to match sizes of the same param in process 0.");
}
for (const auto& str : t.strides()) {
TORCH_CHECK(
str == control_accessor[i++],
"replicas[0][",
p,
"] in this process"
" with strides ",
t.strides(),
" appears not to match strides of the same param in process 0.");
}
}
}
2.4 播送状况
下一步是播送状况,把模型初始参数和变量从 rank 0 播送到其他 ranks。
# Sync params and buffers. Ensures all DDP models start off at the same value.
# 将 rank 0 的state_dict() 播送到其他worker,以确保一切worker的模型初始状况相同;
self._sync_params_and_buffers(authoritative_rank=0)
2.4.1 state_dict
咱们先来看看需求播送什么。
pytorch 的 state_dict 是一个字典对象,其将模型的每一层与它的对应参数树立映射联系,比方 model 每一层的weights及偏置等等。只要那些参数能够练习的层(比方卷积层,线性层等)才会被保存到模型的state_dict中,池化层、BN层这些自身没有参数的层就不会保存在 state_dict 之中,比方针对下面模型。
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
state_dict 如下:
self.module.state_dict() = {OrderedDict: 4}
'net1.weight' = {Tensor: 10} tensor([[ 0.2687, 0.0840, -0.1032, 0.3079, 0.0385, -0.0495, -0.3068, -0.1271,\n -0.1067, -0.1966],\n [-0.1203, 0.1789, 0.0666, 0.1882, 0.1335, 0.1921, -0.1145, -0.1781,\n 0.0661, -0.2339],\n [ 0.1865, -0.2076, 0.2071, 0
'net1.bias' = {Tensor: 10} tensor([ 0.2146, -0.1599, 0.2350, -0.2843, -0.0773, -0.2151, 0.1864, -0.3068,\n -0.2093, 0.1365])
'net2.weight' = {Tensor: 5} tensor([[ 0.1922, -0.0148, -0.1884, 0.2124, -0.1361, 0.0172, -0.2371, 0.1946,\n 0.2047, -0.2697],\n [-0.2690, 0.1372, 0.2269, 0.0436, -0.1353, -0.2054, -0.2418, -0.2300,\n 0.1987, 0.0007],\n [ 0.0995, -0.2659, -0.2374, -0
'net2.bias' = {Tensor: 5} tensor([0.1488, 0.0791, 0.1667, 0.1449, 0.0545])
2.4.2 _sync_params_and_buffers
_sync_params_and_buffers 是根据 module的state_dict 来收集能够练习的参数,然后把这些参数播送出去。
详细代码是:
def _sync_params_and_buffers(self, authoritative_rank=0):
module_states = []
for name, param in self.module.state_dict().items():
if name not in self.parameters_to_ignore:
module_states.append(param)
# module_states = {list: 4} [tensor([[ 0.2687, 0.0840, -0.1032, 0.3079, 0.0385, -0.0495, -0.3068, -0.1271,\n -0.1067, -0.1966],\n [-0.1203, 0.1789, 0.0666, 0.1882, 0.1335, 0.1921, -0.1145, -0.1781,\n 0.0661, -0.2339],\n [ 0.1865, -0.2076, 0.2071,
if len(module_states) > 0:
self._distributed_broadcast_coalesced(
module_states, self.broadcast_bucket_size, authoritative_rank
)
咱们看看,_distributed_broadcast_coalesced
调用了 dist._broadcast_coalesced
import torch.distributed as dist
def _distributed_broadcast_coalesced(
self, tensors, buffer_size, authoritative_rank=0
):
dist._broadcast_coalesced(
self.process_group, tensors, buffer_size, authoritative_rank
)
2.4.3 dist._broadcast_coalesced
咱们沿着代码来寻找,首要来到 torch\distributed_init_.py,这儿会导入 _broadcast_coalesced。
if is_available():
from torch._C._distributed_c10d import (
Store,
FileStore,
TCPStore,
ProcessGroup,
PrefixStore,
Reducer,
Logger,
BuiltinCommHookType,
GradBucket,
_DEFAULT_FIRST_BUCKET_BYTES,
_register_comm_hook,
_register_builtin_comm_hook,
_broadcast_coalesced, # 在这儿导入
_compute_bucket_assignment_by_size,
_verify_model_across_ranks,
_test_python_store,
_DistributedDebugLevel,
_get_debug_mode
)
if sys.platform != 'win32':
from torch._C._distributed_c10d import (
HashStore,
_round_robin_process_groups,
)
from .distributed_c10d import * # noqa: F403
# Variables prefixed with underscore are not auto imported
# See the comment in `distributed_c10d.py` above `_backend` on why we expose
# this.
from .distributed_c10d import _backend, _all_gather_base
咱们持续找到 torch\csrc\distributed\c10d\init.cpp
module.def(
"_broadcast_coalesced",
// Define a lambda such that the pybind11 prototype can take a std::vector
// for the tensor list argument, but still pass it to the underlying
// function as a c10::ArrayRef.
[](c10::intrusive_ptr<::c10d::ProcessGroup> process_group,
std::vector<at::Tensor> tensors, // NOLINT
size_t buffer_size,
int rank) {
broadcast_coalesced( // 在这儿
std::move(process_group), tensors, buffer_size, rank);
},
py::arg("process_group"),
py::arg("tensors"),
py::arg("buffer_size"),
// The source of truth rank to broadcast the tensors from.
py::arg("src") = 0,
py::call_guard<py::gil_scoped_release>());
终究来到了 torch/lib/c10d/comm.cpp,这儿运用 ProcessGroup 对张量进行播送。
// Broadcast many tensors to all processes in the process group.
void broadcast_coalesced(
c10::intrusive_ptr<c10d::ProcessGroup> process_group,
at::TensorList tensors,
size_t buffer_size,
int rank) {
// Coalesce tensors into buckets taking into account the maximum buffer size.
// This routine is multi-device aware, so the tensors can be split across
// multiple devices and can contain a mix of CPU and CUDA tensors.
// 首要核算出桶
const auto buckets =
compute_bucket_assignment_by_size(tensors.vec(), {buffer_size});
// Returns tensor at specified index in input tensor list.
const auto lookup = [&tensors](size_t index) { return tensors[index]; };
// We maintain a maximum of 2 in flight broadcast operations to avoid
// allocating too much memory (in case the specified tensors are very large).
std::deque<BroadcastWork> in_flight; // 树立一个播送work列表
constexpr auto max_in_flight = 2;
for (const auto& bucket : buckets) { // 遍历桶
if (in_flight.size() >= max_in_flight) { // 由注释能够知道,播送维度是2,这样避免内存占用过大
in_flight.front().finish(); // 播送变量
in_flight.pop_front();
}
in_flight.emplace_back(process_group, c10::fmap(bucket, lookup), rank);
}
while (!in_flight.empty()) {
in_flight.front().finish();
in_flight.pop_front();
}
}
关于BroadcastWork,咱们补充阐明一下,便是运用 ProcessGroup 来把张量播送出去,ProcessGroup 详细能够拜见前面文章。
class BroadcastWork {
public:
BroadcastWork(
const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
std::vector<at::Tensor> bucket_tensors,
int root_rank = 0)
: bucket_tensors_(std::move(bucket_tensors)),
flat_tensor_({torch::utils::flatten_dense_tensors(bucket_tensors_)}) {
BroadcastOptions broadcastOptions;
broadcastOptions.rootRank = root_rank;
work_ = process_group->broadcast(flat_tensor_, broadcastOptions);
}
void finish() {
work_->wait();
// Copy the output of the broadcast operation back.
auto output_tensors = torch::utils::unflatten_dense_tensors(
flat_tensor_.front(), bucket_tensors_);
TORCH_INTERNAL_ASSERT(output_tensors.size() == bucket_tensors_.size());
for (size_t i = 0; i < output_tensors.size(); i++) {
bucket_tensors_[i].copy_(output_tensors[i], /*non_blocking=*/true);
}
}
protected:
// The list of tensors to broadcast. They are guaranteed to be
// placed on the same device and have the same dtype.
std::vector<at::Tensor> bucket_tensors_;
// The vector with a single flattened tensor containing the contents
// of the tensors in bucket_tensors_. It must be stored in a vector
// because c10d::ProcessGroup::broadcast takes a vector argument.
std::vector<at::Tensor> flat_tensor_;
private:
// The broadcast work that is kicked off upon construction.
c10::intrusive_ptr<c10d::ProcessGroup::Work> work_;
};
2.5 初始化功能函数
接下来会调用 _ddp_init_helper 进行初始化事务函数。
2.5.1 _ddp_init_helper
_ddp_init_helper 是用来初始化事务的函数,其主要逻辑如下:
- 对参数进行分桶,尽可能依照前向传达的逆序(前向传达中先核算出来的梯度,会先反向传达)把参数分配均匀分配入桶,这样能够进步通讯速度和归并速度;
- 重置分桶状况;
- 生成一个Reducer,其内部会注册 autograd_hook,其用来在反向传达时分进行梯度同步;
- 进行logging装备;
- 给SyncBatchNorm Layer传递 DDP handle;
详细代码如下:
def _ddp_init_helper(self, parameters, expect_sparse_gradient, param_to_name_mapping):
"""
Initialization helper function that does the following:
(1) bucketing the parameters for reductions
(2) resetting the bucketing states
(3) registering the grad hooks
(4) Logging constructin-time DDP logging data
(5) passing a handle of DDP to SyncBatchNorm Layer
"""
self.num_iterations = 0
# The bucket size limit is specified in the constructor.
# Additionally, we allow for a single small bucket for parameters
# that are defined first, such that their gradients don't spill into
# a much larger bucket, adding unnecessary latency after gradient
# computation finishes. Experiments showed 1MB is a reasonable value.
bucket_indices = dist._compute_bucket_assignment_by_size(
parameters[0],
[dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap],
expect_sparse_gradient[0],
)
# Note: reverse list of buckets because we want to approximate the
# order in which their gradients are produced, and assume they
# are used in the forward pass in the order they are defined.
self.reducer = dist.Reducer(
parameters,
list(reversed(bucket_indices)), # 运用桶index
self.process_group,
expect_sparse_gradient,
self.bucket_bytes_cap,
self.find_unused_parameters,
self.gradient_as_bucket_view,
param_to_name_mapping,
)
self.logger = dist.Logger(self.reducer)
# Set logging data that can be got during construction time.
self.logger.set_construction_data_and_log(
self.module.__class__.__name__,
[] if self.device_ids is None else self.device_ids,
-1 if self.output_device is None else self.output_device,
self.broadcast_buffers,
)
# passing a handle to torch.nn.SyncBatchNorm layer
self._passing_sync_batchnorm_handle(self._module_copies)
2.5.2 核算分桶
首要,_compute_bucket_assignment_by_size 完结了分桶功能。这儿parameters[0] 便是对应的张量列表。
_DEFAULT_FIRST_BUCKET_BYTES = 1048576
# reduction bucket size
self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024)
bucket_indices = dist._compute_bucket_assignment_by_size(
parameters[0],
# 桶的巨细约束是一个数组
[dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap],
expect_sparse_gradient[0],
)
2.5.2.1 论文内容
咱们接下来就要结合论文内容来剖析。
梯度bucketing的思维是基于这样一个观察,即调集通讯在大张量上更有用。
试验表明,假如DDP在短时间内等候并将多个梯度存储到一个AllReduce操作中,它能够完结更高的吞吐量和更低的延迟,而不是在每个梯度存储可用时立即发动专用的AllReduce。这关于具有许多小参数的模型特别有用。可是,DDP不应在一个AllReduce中传输一切数据,否则,在核算结束之前无法发动任何通讯。
参数到桶映射(Parameter-to-Bucket Mapping)对DDP速度有相当大的影响。在每次向后传达中,将一切参数梯度中的张量仿制到桶中,并在AllReduce之后将均匀梯度仿制回桶中。为了加快仿制操作,存储桶始终与参数在同一设备上创立。假如模型跨过多个设备,DDP会考虑设备关联性,以确保同一存储桶中的一切参数都位于同一设备上。AllReduce的次第也会对成果发生影响,由于它决议了多少通讯能够与核算堆叠。DDP按model.parameters()的相反次第发动AllReduce。
所以,为了进步通讯功率,DDP 将Reducer
参数梯度组织成为桶,一次规约一个桶。从参数梯度到桶的映射是在构建时根据桶巨细约束和参数巨细确认的,。用户能够经过设置bucket_cap_mb来装备桶的巨细。
模型参数以(大致)Model.parameters()
与给定模型相反的次第分配到桶中 。运用相反次第的原因是:
- 反向传达的次第是前向传达核算的反序。
- DDP 希望梯度在反向传递期间以前向传达的大致次第来安排妥当。
2.5.2.2 分组根据
DDP 依照类型和设备作为key来分组,由于不同设备上的tensor不应该分在一组上,同类型张量应该分在一桶。用类型和设备作为key 就能够确保同设备上同类型张量分配在同一个桶里。
// Tensors may be coalesced into buckets. Buckets must contain tensors of
// the same type, on the same device, so a bucket can identified by a
// composite key of a tensor's type identifier and its device.
struct BucketKey {
BucketKey(c10::ScalarType type, c10::Device device)
: type(std::move(type)), device(std::move(device)) {}
const c10::ScalarType type;
const c10::Device device;
// See torch/csrc/utils/hash.h for dispatch code.
static size_t hash(const BucketKey& key) {
return c10::get_hash(key.type, key.device); // 用类型和设备作为key
}
};
2.5.2.3 compute_bucket_assignment_by_size
其要害结构如下,BucketAccumulator 能够认为是实践的桶。
struct BucketAccumulator {
std::vector<size_t> indices; // 桶内容,是张量列表
size_t size = 0; // 桶巨细,比方若干mb
}; // 桶的逻辑内容
// Keep vector of indices and size accumulator by tensor type and device.
std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
buckets; // 一切桶的列表,每一个实践桶能够认为是 BucketAccumulator
咱们来看看 compute_bucket_assignment_by_size的详细逻辑:
- 定义了桶巨细约束列表。bucket_size_limit_iterators。
- 定义了一切桶的列表 buckets,每一个实践桶能够认为是 BucketAccumulator。
- 遍历传入的一切张量:
- 给一切的tensor一个index,从0开端递加,一直到 tensors.size(),假如现已传入了 indices,就拿到张量的index。
- 假如装备了等待sparse gradient,则把这个张量自己放入一个桶,由于无法和其他张量放在一同。
- 运用张量信息构建桶的key,找到对应的桶。
- 拿到BucketAccumulator,往该桶的张量列表里边刺进新张量的index,indices 是 tensor index list。
- 添加对应桶巨细。
- 假如需求,就设定成巨细约束的初始值。
- 拿到当时最小值约束。
- 假如桶的尺度大于最小值约束,便是说现在桶的尺度现已达到了桶的最大约束,按说需求搬运到新桶了。
- 实践上的确搬运到了逻辑上的新桶,可是实践仍是在现有桶内履行,由于 type, device 仍是相同的,仍是应该在原有桶内持续累积,不过原有桶的indice现已搬运到了result之中,就相当于清空了。
- 把桶内容刺进到回来result,便是说,当桶尺度过大的时分,就先刺进到result之中。
- 从头生成桶,bucket是个引证,所以直接赋值,就相当于清空原有的桶,便是本来桶持续用,可是桶内原有的indices现已搬运到了result之中。
- 前进到下一个尺度约束。
- 把剩下的桶内indices刺进到回来值,由于之前现已有些直接刺进到了result之中。
- 对result 进行排序:
- 假如 tensor_indices 非空,阐明张量的次第现已是梯度准备好的次第,不需求再排序了。
- 假如 tensor_indices 是空的,根据最小张量index来排序,这儿假定张量的次第是他们运用的次第(或许说是他们梯度发生次第的反序)。这种排序可确保桶是依照接二连三的次第准备好。
- 留意,这儿便是正序摆放,等到创立Reducer的时分,才反序传入:list(reversed(bucket_indices))。
- 终究回来 result,result 终究如下,里边每个vector 都对应了一个bucket,里边是都是 tensor 的 index,这儿都是从小到大次第排序。
std::vector<std::vector<size_t>> compute_bucket_assignment_by_size(
const std::vector<at::Tensor>& tensors,
const std::vector<size_t>& bucket_size_limits, // 桶巨细约束
const std::vector<bool>& expect_sparse_gradient,
const std::vector<int64_t>& tensor_indices) { //实践上,初始化时分没有传入 tensor_indices
// Either expect_sparse_gradient is not specified or it has as many elements
// as the vector with tensors.
TORCH_INTERNAL_ASSERT(
expect_sparse_gradient.empty() ||
(tensors.size() == expect_sparse_gradient.size()));
TORCH_INTERNAL_ASSERT(tensors.size() > 0);
std::vector<std::vector<size_t>> result;
result.reserve(tensors.size()); // 预留巨细
// Keep iterator into the size_limit vector by tensor type and device.
// This is done so that we can use the consecutive bucket limits per type.
std::unordered_map<
BucketKey,
std::vector<size_t>::const_iterator,
c10::hash<BucketKey>>
bucket_size_limit_iterators;
// Local accumulator type for a single bucket.
struct BucketAccumulator {
std::vector<size_t> indices; // 桶内容,是张量列表
size_t size = 0; // 桶巨细,比方若干mb
}; // 桶的逻辑内容
// Keep vector of indices and size accumulator by tensor type and device.
std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
buckets; // 一切桶的列表,每一个实践桶能够认为是 BucketAccumulator
for (size_t i = 0; i < tensors.size(); i++) { // 遍历传入的一切张量
const auto& tensor = tensors[i]; //拿到张量
TORCH_CHECK(!tensor.is_sparse(), "No support for sparse tensors.");
// when tensor_indices is empty, the index of tensors[i] assigned to
// bucket is i, otherwise the tensor index is tensor_indices[i].
auto tensor_index = i; // 便是给一切的tensor一个index,从0开端递加,一直到 tensors.size()
if (!tensor_indices.empty()) {
tensor_index = tensor_indices[i]; // 假如有index,就拿到张量的index
}
// If we expect a sparse gradient to be produced for this tensor, it cannot
// be grouped together with other gradients and gets its own bucket.
// 假如装备了等待sparse gradient,则把这个张量自己放入一个桶,由于无法和其他张量放在一同
if (!expect_sparse_gradient.empty() &&
expect_sparse_gradient[tensor_index]) {
result.push_back({tensor_index});
continue;
}
auto key = BucketKey(tensor.scalar_type(), tensor.device()); //运用张量信息构建桶的key
auto& bucket = buckets[key]; // 找到对应的桶, 拿到BucketAccumulator
bucket.indices.push_back(tensor_index); // 往该桶的张量列表里边刺进新张量的index,indices 是 tensor index list
bucket.size += tensor.numel() * tensor.element_size();// 添加对应桶巨细
// Initialize bucket size limit iterator if necessary.
// 假如需求,就设定成巨细约束的初始值
if (bucket_size_limit_iterators.count(key) == 0) {
bucket_size_limit_iterators[key] = bucket_size_limits.begin();
}
// bucket_size_limit_iterator 便是桶巨细的范围, 即 [_DEFAULT_FIRST_BUCKET_BYTES, int(bucket_cap_mb * 1024 * 1024)]
auto& bucket_size_limit_iterator = bucket_size_limit_iterators[key];
const auto bucket_size_limit = *bucket_size_limit_iterator; // 当时最小值约束
if (bucket.size >= bucket_size_limit) {
// 假如桶的尺度大于最小值约束,便是说现在桶的尺度现已达到了桶的最大约束,按说需求搬运到新桶了(实践上的确搬运到了逻辑上的新桶,可是实践仍是在现有桶内履行,由于 type, device 仍是相同的,仍是应该在原有桶内持续累积,不过原有桶的indice现已搬运到了result之中,就相当于清空了)
result.emplace_back(std::move(bucket.indices)); // 把桶内容刺进到回来result,便是说,当桶尺度过大的时分,就先刺进到result之中。
bucket = BucketAccumulator(); // 从头生成桶,bucket是个引证,所以直接赋值,就相当于清空原有的桶,便是本来桶持续用,可是桶内原有的indices现已搬运到了result之中。
// Advance to the next bucket size limit for this type/device.
// 前进到下一个尺度约束
auto next = bucket_size_limit_iterator + 1;
if (next != bucket_size_limits.end()) {
bucket_size_limit_iterator = next;
}
}
}
// Add remaining buckets. 把剩下的桶内indices刺进到回来值,由于之前现已有些直接刺进到了result之中
for (auto& it : buckets) {
auto& bucket = it.second;
if (!bucket.indices.empty()) {
result.emplace_back(std::move(bucket.indices));
}
}
// If tensor_indices is not empty, the order of the tensors is in the gradient
// ready order, so no need to sort.
// If tensor_indices is empty, sort resulting buckets by the minimum tensor
// index they include. We assume that the order of the tensors is the order in
// which they are used (or the reverse order in which their gradients are
// produced). This sorting step ensures that the buckets are ready in
// consecutive order.
// 假如 tensor_indices 非空,阐明张量的次第现已是梯度准备好的次第,不需求再排序了
// 假如 tensor_indices 是空的,根据最小张量index来排序,这儿假定张量的次第是他们运用的次第(或许说是他们梯度发生次第的反序)。这种排序可确保桶是依照接二连三的次第准备好。
// 留意,这儿便是正序摆放,等到创立Reducer的时分,才反序传入:list(reversed(bucket_indices))
if (tensor_indices.empty()) {
std::sort(
result.begin(),
result.end(),
[](const std::vector<size_t>& a, const std::vector<size_t>& b) {
// 关于任意两个vector,排序的根据是:用这两个vector之中最小index来排序
const auto amin = std::min_element(a.begin(), a.end()); // a中的最小index
const auto bmin = std::min_element(b.begin(), b.end()); // b中的最小index
return *amin < *bmin;
});
}
return result; // result 终究如下,里边每个vector 都对应了一个bucket,里边是都是 tensor 的 index,这儿都是从小到大次第排序。
}
result 终究如下,里边每个vector 都对应了一个bucket,里边是都是 tensor 的 index,这儿都是从小到大次第排序。
这儿留意的是:由于 传入参数 tensors便是 parameters[0],而 parameters[0] 是依照 parametes() 的回来成果来的,即,模型参数以(大致)Model.parameters()
与给定模型相反的次第分配到桶中 。运用相反次第的原因是由于 DDP 希望梯度在反向传递期间以大约该次第准备安排妥当。终究 DDP 是按model.parameters()的相反次第发动AllReduce。
+-----------------------------------------------------------------------+
| |
| <tensor index 1, tensor index 2, tensor index 3, tensor index 4> |
| |
| |
| <tensor index 5, tensor index 6, tensor 7> |
| |
| |
| ...... |
| |
| |
| <tensor index 8, tensor index 9, tensor index 10, tensor index 11> |
| |
+-----------------------------------------------------------------------+
2.5.3 Reducer
接下来的代码便是生成了一个Reducer。
self.reducer = dist.Reducer(
parameters,
list(reversed(bucket_indices)), # 运用桶index
self.process_group,
expect_sparse_gradient,
self.bucket_bytes_cap,
self.find_unused_parameters,
self.gradient_as_bucket_view,
param_to_name_mapping,
)
咱们在后续文章中会详细介绍 Reducer。
0xEE 个人信息
★★★★★★关于日子和技术的考虑★★★★★★
微信大众账号:罗西的考虑
0xFF 参考
pytorch分布式系列3——分布式练习时,torch.utils.data.distributed.DistributedSampler做了什么?
pytorch分布式系列1——搞清torch.distributed.launch相关的环境变量
pytorch分布式系列2——DistributedDataParallel是如何做同步的?
pytorch(分布式)数据并行个人实践总结——DataParallel/DistributedDataParallel
Pytorch的nn.DataParallel
discuss.pytorch.org/t/dataparal…
pytorch.org/docs/stable…
PyTorch 源码解读之分布式练习了解一下?
实操教程|PyTorch AutoGrad C++层完结
PYTORCH 自动微分(一)
PyTorch如何加快数据并行练习?分布式秘籍大揭秘
pytorch分布式练习(二init_process_group)
pytorch.org/tutorials/i…
pytorch.org/docs/master…
pytorch.org/tutorials/i…
PyTorch 源码解读之 DP & DDP:模型并行和分布式练习解析
Pytorch模型中的parameter与buffer
pytorch.org/docs/master…