0x00 摘要
前文现已对Reducer怎么构建和几个重要场景做了介绍,本文就来剖析 Reducer 怎么完结前向传达。
本系列其他文章如下:
本系列其他文章如下:
[源码解析] PyTorch 分布式(1)——历史和概述
[源码解析] PyTorch 怎么运用GPU
[源码解析] PyTorch 分布式(2) —– DataParallel(上)
[源码解析] PyTorch 分布式(3) —– DataParallel(下)
[源码解析] PyTorch 分布式(4)——分布式应用基础概念
[源码解析] PyTorch 分布式(5) —— DistributedDataParallel 总述&怎么运用
[源码解析] PyTorch分布式(6) —DistributedDataParallel — 初始化&store
[源码解析] PyTorch 分布式(7) —– DistributedDataParallel 之进程组
[源码解析] PyTorch 分布式(8) ——– DistributedDataParallel之论文篇
[源码解析] PyTorch 分布式(9) —– DistributedDataParallel 之初始化
[源码解析] PyTorch 分布式(10)——DistributedDataParallel之Reducer静态架构
[源码解析] PyTorch 分布式(11) —– DistributedDataParallel 之 构建Reducer和Join操作
0x01 整体逻辑
咱们仍是需求祭出法宝,看看论文中的DDP整体逻辑:
然后给出一个前向传达的整体战略如下:
Forward Pass:
- 每个进程读去自己的练习数据,DistributedSampler保证每个进程读到的数据不同。
- DDP 获取输入并将其传递给本地模型。
- 模型进行前向核算,成果设置为 out。现在核算都是在每个进程(CUDA设备)上完结。
- 假如
find_unused_parameters
设置为True
,DDP 会剖析本地模型的输出,从 out 开端遍历核算图,把未运用参数标示为 ready,由于每次核算图都会改动,所以每次都要遍历。- 此模式(Mode)答应在模型的子图上向后运转,并且 DDP 通过从模型输出out遍历 autograd 图,将一切未运用的参数标记为就绪,以削减反向传递中触及的参数。
- 在后向传达期间,Reducer会规约一切桶,在此进程中,
Reducer
会等待未预备好的参数。将参数梯度标记为就绪并不能协助 DDP 越过桶,但它会阻止 DDP 在向后传递期间永久等待不存在的梯度。 - 请留意,遍历 autograd 图会引入额外的开销,因而应用程序仅在必要时才设置
find_unused_parameters
为True
。
- 回来out即可。这点与 DP不同,DDP的模型网络输出不需求被gather到 rank 0进程。
0x02 Python 国际
咱们仍是从 Python 代码入手开端剖析,代码坐落:torch/nn/parallel/distributed.py。
咱们这儿省掉 join 相关,只重视主体部分,forward 方法逻辑如下:
- 保存线程本地状况。
- 假如做装备,则调用 reducer.prepare_for_forward 为forward做预备。
- 假如装备ddp_join_enabled,做相应处理。
- 在前向传达之前运用 _rebuild_buckets 来重置桶。
- 在 _rebuild_buckets 函数之中,或许会在释放旧bucket之前分配新bucket。
- 假如要节约峰值内存运用量,请在正向核算期间峰值内存运用量添加之前调用
_rebuild_bucket
。
- 假如需求同步,则调用_sync_params对前向传达参数进行前向传达参数。
- 进行前向传达。
- 假如需求同步后向传达梯度,则调用prepare_for_backward。
- 当DDP参数 find_unused_parameter 为 true 时,其会在 forward 完毕时,发动一个回溯,标记出一切没被用到的 parameter,提前把这些设定为 ready,这样 backward 就能够在一个 subgraph 之上进行,但这样会献身一部分时刻。
详细代码如下:
def forward(self, *inputs, **kwargs):
with torch.autograd.profiler.record_function("DistributedDataParallel.forward"):
# 保存线程本地状况
self.reducer.save_thread_local_state()
# 假如做装备,则调用 reducer 为forward做预备
if torch.is_grad_enabled() and self.require_backward_grad_sync:
self.logger.set_runtime_stats_and_log()
self.num_iterations += 1
self.reducer.prepare_for_forward()
# 假如装备ddp_join_enabled,做相应处理
if self.ddp_uneven_inputs_config.ddp_join_enabled:
ones = torch.ones(1, device=self.device)
work = dist.all_reduce(ones, group=self.process_group, async_op=True)
if self.ddp_uneven_inputs_config.ddp_join_throw_on_early_termination:
# Active ranks schedule an allreduce with zeros, inactive
# ranks schedule them with 1. If the result != 0 it
# indicates at least one rank has terminated and we should
# throw.
zeros = torch.zeros(1, device=self.device)
dist.all_reduce(zeros, group=self.process_group)
should_throw_stop_iteration = zeros.item()
if should_throw_stop_iteration:
raise RuntimeError(
"Detected at least one rank that exhausted inputs. Throwing across all ranks."
)
else:
self.reducer._set_forward_pass_work_handle( # 是join这儿用到
work,
self.ddp_uneven_inputs_config.ddp_join_divide_by_initial_world_size,
)
# Calling _rebuild_buckets before forward compuation,
# It may allocate new buckets before deallocating old buckets
# inside _rebuild_buckets. To save peak memory usage,
# call _rebuild_buckets before the peak memory usage increases
# during forward computation.
# This should be called only once during whole training period.
# 在前向传达之前运用 _rebuild_buckets 来重置桶
# 在此函数内,或许在释放旧bucket之前分配新bucket。
# 假如要节约峰值内存运用量,请在正向核算期间峰值内存运用量添加之前调用_rebuild_bucket。
# 在整个练习期间,这只能调用一次。
if torch.is_grad_enabled() and self.reducer._rebuild_buckets():
logging.info("Reducer buckets have been rebuilt in this iteration.")
# 假如需求同步前向传达参数,则进行同步
if self.require_forward_param_sync:
self._sync_params()
if self.ddp_uneven_inputs_config.ddp_join_enabled:
# Notify joined ranks whether they should sync in backwards pass or not.
self._check_global_requires_backward_grad_sync(is_joined_rank=False)
# 进行前向传达
if self.device_ids:
# 多卡情况
inputs, kwargs = self.to_kwargs(inputs, kwargs, self.device_ids[0])
output = self.module(*inputs[0], **kwargs[0])
else:
output = self.module(*inputs, **kwargs)
# 假如需求同步后向传达梯度,则调用prepare_for_backward
if torch.is_grad_enabled() and self.require_backward_grad_sync:
# 当DDP参数 find_unused_parameter 为 true 时,其会在 forward 完毕时,发动一个回溯,标记出一切没被用到的 parameter,提前把这些设定为 ready,这样 backward 就能够在一个 subgraph 进行,但这样会献身一部分时刻。
self.require_forward_param_sync = True
# We'll return the output object verbatim since it is a freeform
# object. We need to find any tensors in this object, though,
# because we need to figure out which parameters were used during
# this forward pass, to ensure we short circuit reduction for any
# unused parameters. Only if `find_unused_parameters` is set.
if self.find_unused_parameters and not self.static_graph:
# Do not need to populate this for static graph.
self.reducer.prepare_for_backward(list(_find_tensors(output)))
else:
self.reducer.prepare_for_backward([])
else:
self.require_forward_param_sync = False
# TODO. Right now we add this sink for static_graph training only. once
# this feature is stable, we will add this sink for all cases. E.g.
# This sink can help capture more accuracte backward start time as well.
if self.static_graph and self.num_iterations == 1:
# Need to grab list of tensors from user output in order to pass
# to custom autograd function.
output_tensor_list, treespec = tree_flatten(output)
passthrough_tensor_list = _DDPSink.apply(
self.reducer,
*output_tensor_list
)
# Reconstruct output data structure.
output = tree_unflatten(passthrough_tensor_list, treespec)
return output
其间,运用 _sync_params 来同步模型参数,详细是运用 _distributed_broadcast_coalesced 进行完结。
def _sync_params(self):
with torch.no_grad():
# module buffer sync
if self.will_sync_module_buffers():
# Synchronize buffers across processes.
# If we are running DDP with the join manager, we have to agree
# upon a rank to sync module buffers from, since rank 0 may
# already have been joined and have stale module buffers.
if self.ddp_uneven_inputs_config.ddp_join_enabled:
authoritative_rank = self._find_common_rank(
self._distributed_rank, True
)
else:
# The process with rank 0 is considered the authoritative copy.
authoritative_rank = 0
self._distributed_broadcast_coalesced(
self.modules_buffers[0],
self.broadcast_bucket_size,
authoritative_rank,
)
0x03 C++国际
咱们接下来进入到 C++ 国际,看看这儿怎么支撑前向传达。详细分为:预备前向传达,重建桶,预备后向传达这几部分。
3.1 预备前向传达
这儿把 num_iterations_ 添加,并且记载时刻。
void Reducer::prepare_for_forward() {
std::lock_guard<std::mutex> lock(mutex_);
num_iterations_++; // 这儿会递增
if (should_collect_runtime_stats()) {
record_forward_compute_start_time();
}
}
4.2 重建桶
接下来进行重建桶,详细分为:
- 装备各种尺度约束。
- 核算桶的尺度。
- 同步桶indices。
- 初始化桶。
bool Reducer::rebuild_buckets() {
// Ensure reduction for previous backwards pass is finished. If user's model
// has unused parameters for example, this will raise an error recommending to
// run with find_unused_parameters=True, instead of the size mismatch
// exception below.
std::lock_guard<std::mutex> lock(mutex_);
ensure_prior_reduction_finished();
if (!should_rebuild_buckets() || rebuilt_params_.empty()) {
return false;
}
std::vector<std::vector<size_t>> rebuilt_bucket_indices;
// 装备各种尺度约束
std::vector<size_t> bucket_size_limits;
bucket_size_limits.push_back(kDefaultFirstBucketBytes);
bucket_size_limits.push_back(bucket_bytes_cap_);
// 核算桶的尺度
rebuilt_bucket_indices = compute_bucket_assignment_by_size(
rebuilt_params_,
bucket_size_limits,
expect_sparse_gradients_[0],
rebuilt_param_indices_);
// For rebuilt bucket indices, it needs to be synced across all ranks.
// Broadcast the newly rebuilt bucket indices from rank 0 in default.
// After syncing up rebuilt bucket indices, initialize buckets for reducer.
// 同步桶indices
sync_bucket_indices(rebuilt_bucket_indices);
has_rebuilt_bucket_ = true;
rebuilt_params_.clear();
rebuilt_param_indices_.clear();
// 初始化桶
initialize_buckets(std::move(rebuilt_bucket_indices));
return true;
}
咱们接下来详细看看怎么重建。
3.2.1 核算桶尺度
咱们首先要看看compute_bucket_assignment_by_size 之中关键结构如下,BucketAccumulator 能够认为是实践的桶。
struct BucketAccumulator {
std::vector<size_t> indices; // 桶内容,是张量列表
size_t size = 0; // 桶巨细,比如若干mb
}; // 桶的逻辑内容
// Keep vector of indices and size accumulator by tensor type and device.
std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
buckets; // 一切桶的列表,每一个实践桶能够认为是 BucketAccumulator
其次,咱们来看看 compute_bucket_assignment_by_size的详细逻辑:
-
生成一个核算成果 result,并且运用参数tensors的巨细来为result预留出空间。
-
生成一个buckets,这是一切桶的列表,每一个实践桶能够认为是 BucketAccumulator
-
遍历传入的一切张量,关于每一个张量:
- 假如有index,就拿到张量的index。
- 假如装备了期待sparse gradient,则把这个张量自己放入一个桶,由于无法和其他张量放在一同。
- 运用张量信息构建桶的key。
- 运用 key 找到对应的桶, 拿到BucketAccumulator。
- 向该桶的张量列表 indices 里边刺进新张量的index,indices 是 tensor index list。
- 添加对应桶巨细。
- 假如需求,就设定成巨细约束的初始值。
- 假如桶的尺度大于最小值约束,便是说目前桶的尺度现已达到了桶的最大约束,按说需求搬运到新桶了(实践上的确搬运到了逻辑的新桶,可是实践仍是在现有桶内履行,由于 type, device 仍是相同的,仍是应该在原有桶内持续累积,不过原有桶的indice现已搬运到了result之中,就相当于清空了)。
- 把桶内容刺进到回来result,便是说,当桶尺度过大的时分,就先刺进到result之中。
- 利用 BucketAccumulator() 从头生成桶,bucket是个引证,所以直接赋值,就相当于清空原有的桶,便是原来桶持续用,可是桶内原有的indices现已搬运到了result之中。
-
把剩下的桶内indices刺进到回来值result。之前现已有些直接刺进到了result之中。
-
对 result 进行排序:
- 假如 tensor_indices 非空,阐明张量的次序现已是梯度预备好的次序,不需求再排序了。
- 假如 tensor_indices 是空的,根据最小张量index来排序,这儿假定张量的次序是他们运用的次序(或许说是他们梯度发生次序的反序)。这种排序可保证桶是依照接二连三的次序预备好。
- 留意,这儿便是正序排列,等到创建Reducer的时分,才反序传入:list(reversed(bucket_indices))
别的需求留意的是:由于 tensors便是 Python 代码中的参数 parameters[0],而 parameters[0] 是依照 parametes() 的回来成果来的,所以DDP终究是按model.parameters()的相反次序发动AllReduce。
std::vector<std::vector<size_t>> compute_bucket_assignment_by_size(
const std::vector<at::Tensor>& tensors,
const std::vector<size_t>& bucket_size_limits, // 桶巨细约束
const std::vector<bool>& expect_sparse_gradient,
const std::vector<int64_t>& tensor_indices) { //实践上,初始化时分没有传入 tensor_indices
// Either expect_sparse_gradient is not specified or it has as many elements
// as the vector with tensors.
TORCH_INTERNAL_ASSERT(
expect_sparse_gradient.empty() ||
(tensors.size() == expect_sparse_gradient.size()));
TORCH_INTERNAL_ASSERT(tensors.size() > 0);
std::vector<std::vector<size_t>> result;
result.reserve(tensors.size()); // 预留巨细
// Keep iterator into the size_limit vector by tensor type and device.
// This is done so that we can use the consecutive bucket limits per type.
std::unordered_map<
BucketKey,
std::vector<size_t>::const_iterator,
c10::hash<BucketKey>>
bucket_size_limit_iterators;
// Local accumulator type for a single bucket.
struct BucketAccumulator {
std::vector<size_t> indices; // 桶内容,是张量列表
size_t size = 0; // 桶巨细,比如若干mb
}; // 桶的逻辑内容
// Keep vector of indices and size accumulator by tensor type and device.
std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
buckets; // 一切桶的列表,每一个实践桶能够认为是 BucketAccumulator
for (size_t i = 0; i < tensors.size(); i++) { // 遍历传入的一切张量
const auto& tensor = tensors[i]; //拿到张量
TORCH_CHECK(!tensor.is_sparse(), "No support for sparse tensors.");
// when tensor_indices is empty, the index of tensors[i] assigned to
// bucket is i, otherwise the tensor index is tensor_indices[i].
auto tensor_index = i; // 便是给一切的tensor一个index,从0开端递增,一直到 tensors.size()
if (!tensor_indices.empty()) {
tensor_index = tensor_indices[i]; // 假如有index,就拿到张量的index
}
// If we expect a sparse gradient to be produced for this tensor, it cannot
// be grouped together with other gradients and gets its own bucket.
// 假如装备了期待sparse gradient,则把这个张量自己放入一个桶,由于无法和其他张量放在一同
if (!expect_sparse_gradient.empty() &&
expect_sparse_gradient[tensor_index]) {
result.push_back({tensor_index});
continue;
}
auto key = BucketKey(tensor.scalar_type(), tensor.device()); //运用张量信息构建桶的key
auto& bucket = buckets[key]; // 找到对应的桶, 拿到BucketAccumulator
bucket.indices.push_back(tensor_index); // 该桶的张量列表里边刺进新张量的index,indices 是 tensor index list
bucket.size += tensor.numel() * tensor.element_size();// 添加对应桶巨细
// Initialize bucket size limit iterator if necessary.
// 假如需求,就设定成巨细约束的初始值
if (bucket_size_limit_iterators.count(key) == 0) {
bucket_size_limit_iterators[key] = bucket_size_limits.begin();
}
// bucket_size_limit_iterator 便是桶巨细的规模, 即 [_DEFAULT_FIRST_BUCKET_BYTES, int(bucket_cap_mb * 1024 * 1024)]
auto& bucket_size_limit_iterator = bucket_size_limit_iterators[key];
const auto bucket_size_limit = *bucket_size_limit_iterator; // 当前最小值约束
if (bucket.size >= bucket_size_limit) {
// 假如桶的尺度大于最小值约束,便是说目前桶的尺度现已达到了桶的最大约束,按说需求搬运到新桶了(实践上的确搬运到了逻辑的新桶,可是实践仍是在现有桶内履行,由于 type, device 仍是相同的,仍是应该在原有桶内持续累积,不过原有桶的indice现已搬运到了result之中,就相当于清空了)
result.emplace_back(std::move(bucket.indices)); // 把桶内容刺进到回来result,便是说,当桶尺度过大的时分,就先刺进到result之中。
bucket = BucketAccumulator(); // 从头生成桶,bucket是个引证,所以直接赋值,就相当于清空原有的桶,便是原来桶持续用,可是桶内原有的indices现已搬运到了result之中。
// Advance to the next bucket size limit for this type/device.
// 前进到下一个尺度约束
auto next = bucket_size_limit_iterator + 1;
if (next != bucket_size_limits.end()) {
bucket_size_limit_iterator = next;
}
}
}
// Add remaining buckets. 把剩下的桶内indices刺进到回来值,由于之前现已有些直接刺进到了result之中
for (auto& it : buckets) {
auto& bucket = it.second;
if (!bucket.indices.empty()) {
result.emplace_back(std::move(bucket.indices));
}
}
// If tensor_indices is not empty, the order of the tensors is in the gradient
// ready order, so no need to sort.
// If tensor_indices is empty, sort resulting buckets by the minimum tensor
// index they include. We assume that the order of the tensors is the order in
// which they are used (or the reverse order in which their gradients are
// produced). This sorting step ensures that the buckets are ready in
// consecutive order.
// 假如 tensor_indices 非空,阐明张量的次序现已是梯度预备好的次序,不需求再排序了
// 假如 tensor_indices 是空的,根据最小张量index来排序,这儿假定张量的次序是他们运用的次序(或许说是他们梯度发生次序的反序)。这种排序可保证桶是依照接二连三的次序预备好。
// 留意,这儿便是正序排列,等到创建Reducer的时分,才反序传入:list(reversed(bucket_indices))
if (tensor_indices.empty()) {
std::sort(
result.begin(),
result.end(),
[](const std::vector<size_t>& a, const std::vector<size_t>& b) {
// 关于任意两个vector,排序的根据是:用这两个vector之中最小index来排序
const auto amin = std::min_element(a.begin(), a.end()); // a中的最小index
const auto bmin = std::min_element(b.begin(), b.end()); // b中的最小index
return *amin < *bmin;
});
}
return result;
}
result 终究如下,里边每个vector 都对应了一个bucket,里边是都是 tensor 的 index,这儿都是从小到大次序排序。模型参数以(大致)Model.parameters()
与给定模型相反的次序分配到桶中 。运用相反次序的原因是由于 DDP 希望梯度在反向传递期间以大约该次序预备就绪。
+-----------------------------------------------------------------------+
| |
| <tensor index 1, tensor index 2, tensor index 3, tensor index 4> |
| |
| |
| <tensor index 5, tensor index 6, tensor 7> |
| |
| |
| ...... |
| |
| |
| <tensor index 8, tensor index 9, tensor index 10, tensor index 11> |
| |
+-----------------------------------------------------------------------+
3.2.2 同步桶indices
发生尺度之后,就运用 sync_bucket_indices 同步桶的indices,其逻辑如下:
- 遍历桶,把桶的巨细都记载到bucket_sizes。
- 装备TensorOptions。
- 把桶对应的indices和桶数目放入indices_tensor,这儿是通过 PyTorch accessor来对张量进行读写,accessor就像是一个张量,但它将张量的维度和 dtype 硬编码为了模板参数,能够高效的拜访元素。
- 由于 NCCL这样的 ProcessGroup 只支撑device之间的操作,所以把indices_tensor拷贝到indices_tensor_device。
- 对 indices_tensor_device 进行播送。
- 类似,对桶尺度进行播送。
- 播送完毕之后,遍历桶,运用从rank 0收到的num_buckets, bucket_sizes_tensor 和 indices_tensor 更新传进来的参数bucket_indices。
void Reducer::sync_bucket_indices(
std::vector<std::vector<size_t>>& bucket_indices) {
auto num_buckets = bucket_indices.size();
std::vector<size_t> bucket_sizes;
bucket_sizes.reserve(num_buckets);
int64_t total_size = 0;
//遍历桶,把桶的巨细都记载到bucket_sizes
for (size_t i = 0; i < num_buckets; i++) {
auto bucket_size = bucket_indices.at(i).size();
bucket_sizes.push_back(bucket_size);
total_size += bucket_size;
}
// 装备TensorOptions
at::TensorOptions options;
options = options.dtype(at::kInt);
options = options.device(replicas_[0][0].device());
// Group indices and num_bucket together into indices_tensor
// Broadcast this tensor first, as its size is equal among all processes
// 把桶对应的indices和桶数目放入indices_tensor,这儿是通过 PyTorch accessor来对张量进行读写,accessor就像是一个张量,但它将张量的维度和 dtype 硬编码为了模板参数,能够高效的拜访元素
auto indices_tensor = at::empty({total_size + 1}, at::kInt);
auto indices_accessor = indices_tensor.accessor<int, 1>();
auto indices_accessor_Index = 0;
for (size_t i = 0; i < num_buckets; i++) {
const auto& bucket_size = bucket_indices.at(i).size();
for (size_t j = 0; j < bucket_size; j++) {
indices_accessor[indices_accessor_Index++] = bucket_indices[i][j];
}
}
indices_accessor[indices_accessor_Index] = num_buckets;
// Copy CPU tensor to device tensor, as the process_group_ could be NCCL and
// it can only broadcast device tensors.
auto indices_tensor_device = at::empty({total_size + 1}, options);
// 由于 NCCL这样的 ProcessGroup 只支撑device之间的操作,所以把indices_tensor拷贝到indices_tensor_device
indices_tensor_device.copy_(indices_tensor, /*non_blocking=*/true);
std::vector<at::Tensor> indices_tensor_list = {indices_tensor_device};
// 对 indices_tensor_device 进行播送
process_group_->broadcast(indices_tensor_list)->wait();
indices_tensor.copy_(indices_tensor_list.front(), /*non_blocking=*/false);
// Update num_buckets after receiving it from rank 0
num_buckets = indices_accessor[indices_accessor_Index];
// Broadcast bucket_sizes
// 类似,对桶尺度进行播送
auto bucket_sizes_tensor = at::empty({(int64_t)num_buckets}, at::kInt);
auto bucket_sizes_accessor = bucket_sizes_tensor.accessor<int, 1>();
for (size_t i = 0; i < num_buckets; i++) {
// For rank != 0, it is possible that local num buckets bucket_sizes.size()
// is smaller than broadcasted num_buckets
bucket_sizes_accessor[i] =
bucket_sizes.at(std::min(i, (bucket_sizes.size() - 1)));
}
auto bucket_sizes_tensor_device = at::empty({(int64_t)num_buckets}, options);
bucket_sizes_tensor_device.copy_(bucket_sizes_tensor, /*non_blocking=*/true);
std::vector<at::Tensor> bucket_sizes_tensor_list = {
bucket_sizes_tensor_device};
process_group_->broadcast(bucket_sizes_tensor_list)->wait();
bucket_sizes_tensor.copy_(
bucket_sizes_tensor_list.front(), /*non_blocking=*/false);
// Clear bucket_indices first, and then update bucket_indices using received
// num_buckets, bucket_sizes_tensor and indices_tensor from rank 0
bucket_indices.clear();
bucket_indices.reserve(num_buckets);
indices_accessor_Index = 0;
// 遍历桶,运用从rank 0收到的num_buckets, bucket_sizes_tensor 和 indices_tensor 更新传进来的参数bucket_indices
for (size_t i = 0; i < num_buckets; i++) {
const auto& bucket_size = bucket_sizes_accessor[i];
std::vector<size_t> bucket;
bucket.reserve(bucket_size);
for (size_t j = 0; j < bucket_size; j++) {
bucket.push_back(indices_accessor[indices_accessor_Index++]);
}
bucket_indices.emplace_back(std::move(bucket));
}
}
3.2.3 初始化桶
同步之后便是初始化桶,本部分代码在前文现已剖析过,故此省掉。
3.3 预备后向传达
前向传达完结之后,调用 prepare_for_backward 完结了后向传达的预备。
详细大致分为两步:重置,查找未运用的参数。
void Reducer::prepare_for_backward(
const std::vector<torch::autograd::Variable>& outputs) {
std::lock_guard<std::mutex> lock(mutex_);
// 记载开端时刻
cpu_timer_.backward_compute_start_time = current_time_in_nanos();
if (should_collect_runtime_stats()) {
record_backward_compute_start_time();
}
// Reset accounting.
expect_autograd_hooks_ = true;
reset_bucket_counting();
// Reset unused parameter accounting.
has_marked_unused_parameters_ = false;
// Reset per iteration marked ready parameters.
perIterationReadyParams_.clear(); // 重置每次迭代的marked ready parameters
// If static graph is not set, search graph to detect unused parameters.
// When static graph is set, unused_parameters_ will be detected and will
// not change after 1st iteration.
// If static_graph_ = false and find_unused_parameters_ is false,
// we assume that autograd hooks for ALL variables will be called,
// and we don't have to search the autograd graph for presence of these hooks.
if (dynamic_graph_find_unused()) {
unused_parameters_.clear();
search_unused_parameters(outputs); // 查找没有运用的参数
}
}
3.3.1 重置
这儿会遍历桶,关于每个桶,重置其副本的pending状况,某一个模型副本pending状况是由这个模型副本中对应桶的变量数目决议。
假如是静态图,则重置numGradHooksTriggeredMapPerIteration_。
void Reducer::reset_bucket_counting() {
next_bucket_ = 0;
// Reset num_buckets_ready_ at the beginning of backward computation
// in each iteration.
num_buckets_ready_ = 0;
for (auto& bucket : buckets_) { // 遍历桶
for (auto& replica : bucket.replicas) {
replica.pending = replica.variables.size(); //关于每个桶,重置其副本的pending状况,某一个模型副本pending,是由这个模型副本中,本桶的变量数目决议
}
bucket.pending = bucket.replicas.size(); // 重置桶的pending状况,桶pending是由多少个模型副本决议
}
if (static_graph_) {
// 重置numGradHooksTriggeredMapPerIteration_
numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
}
}
3.3.2 查找未运用的参数
search_unused_parameters 完结了 “查找未运用的参数” 功用。
咱们首先要看看 Reducer 的 find_unused_parameters_ 成员变量。假如 find_unused_parameters_ 被设置为 true,则 DDP 会在前向传达完毕时分,从指定的输出进行回溯,遍历autograd核算图来找到一切没有运用过的参数,并且逐个标记为就绪 ready。
关于一切参数,DDP 都有一个指向它们的梯度累积函数的指针,但关于那些autograd图中不存在的参数,它们将在第一次调用autograd钩子时就被标记为预备就绪。
由于模型输出或许会被疏忽,所以这个操作不是立即完结的,咱们只是像在torch.autograd.backward()
这儿开端履行规约操作。
咱们能够发现,这么做开销会很大,为什么要这么做?这是由于核算动态图会改动。
- 练习时分,某次迭代或许只用到模型的一个子图,并且由于PyTorch 是动态核算,所以子图会在迭代期间改动,便是说,某些参数或许在下一次迭代练习时分被越过。
- 同时,由于一切参数在一开端就现已被分好桶,而 hook 又规则了只要整个桶 ready (即,pending == 0)之后才会进行通信,所以假如咱们不将未运用参数标记为 ready,整个通信进程就会无法进行。
// Traverse the autograd graph starting at the specified output.
// All parameters for which we have a pointer to their gradient accumulation
// functions, but don't show up in the autograd graph will be marked ready for
// for reduction as soon as the first autograd hook is called. This is not
// done immediately because the model output may be ignored, and we only
// want to start performing reductions on `torch.autograd.backward()`.
void Reducer::search_unused_parameters(
const std::vector<torch::autograd::Variable>& outputs) {
std::unordered_set<torch::autograd::Node*> seen;
std::vector<torch::autograd::Node*> queue;
RECORD_FUNCTION(
"torch.distributed.ddp.reducer::search_unused_parameters",
std::vector<c10::IValue>());
// Seed queue with the grad functions of all outputs.
for (const auto& output : outputs) {
const auto& grad_fn = output.grad_fn();
if (grad_fn) {
queue.push_back(grad_fn.get()); // 把一切输出节点的梯度函数刺进到queue
}
}
// Traverse the autograd graph starting at the specified output.
// 遍历这个queue中的元素,关于每一个函数,找到其后向图之中的后续边,然后把后续边指向的节点再刺进queue,然后持续循环,终究 seen 里边是一切从output动身,一切节点的梯度函数
while (!queue.empty()) {
auto fn = queue.back();
queue.pop_back();
for (const auto& edge : fn->next_edges()) {
if (auto next_ptr = edge.function.get()) {
const bool was_inserted = seen.insert(next_ptr).second;
if (was_inserted) {
queue.push_back(next_ptr);
}
}
}
}
// Find accumulator functions that don't show up in this graph.
// gradAccToVariableMap_ 里边是一切需求被规约的variable
// 遍历gradAccToVariableMap_,假如 seen 之中没有,就阐明这个参数没有被运用,刺进到unused_parameters_
for (const auto& it : gradAccToVariableMap_) {
// If the accumulator function is present in the graph, we know
// a gradient will be computed for the corresponding parameter.
if (seen.count(it.first) == 0) {
unused_parameters_.push_back(it.second);
}
}
// Warn user about unnecessary perf hit if all parameters were used in
// forward.
if (unused_parameters_.empty()) {
TORCH_WARN_ONCE(
"find_unused_parameters=True was specified in DDP constructor, "
"but did not find any unused parameters in the forward pass. This flag "
"results in an extra traversal of the autograd graph every iteration, "
" which can adversely affect performance. If your model indeed never "
"has any unused parameters in the forward pass, consider turning this "
"flag off. Note that this warning may be a false positive if your model "
"has flow control causing later iterations to have unused parameters.");
}
}
至此,前向传达现已完毕,咱们得到了如下:
- 需求核算梯度的参数现已分桶。
- 桶现已重建完毕。
- 前向传达现已完结。
- 从指定的输出进行回溯,遍历autograd核算图来找到一切没有运用过的参数,并且逐个标记为就绪 ready。
咱们在下一篇就剖析后向传达。
0xEE 个人信息
★★★★★★关于生活和技术的考虑★★★★★★
微信大众账号:罗西的考虑
0xFF 参考
pytorch分布式系列3——分布式练习时,torch.utils.data.distributed.DistributedSampler做了什么?
pytorch分布式系列1——搞清torch.distributed.launch相关的环境变量
pytorch分布式系列2——DistributedDataParallel是怎么做同步的?
pytorch(分布式)数据并行个人实践总结——DataParallel/DistributedDataParallel
Pytorch的nn.DataParallel
discuss.pytorch.org/t/dataparal…
pytorch.org/docs/stable…
PyTorch 源码解读之分布式练习了解一下?
实操教程|PyTorch AutoGrad C++层完结
PYTORCH 自动微分(一)
PyTorch怎么加快数据并行练习?分布式秘籍大揭秘
pytorch分布式练习(二init_process_group)
pytorch.org/tutorials/i…
pytorch.org/docs/master…
pytorch.org/tutorials/i…
PyTorch 源码解读之 DP & DDP:模型并行和分布式练习解析
Pytorch模型中的parameter与buffer