0x00 摘要

NVIDIA Megatron 是一个依据 PyTorch 的分布式练习框架,用来练习超大Transformer言语模型,其经过综合运用了数据并行,Tensor并行和Pipeline并行来复现 GPT3,值得咱们深入剖析其背面机理。本系列有 5 篇文章,经过论文和源码和大家一同学习研讨。本文将看看 Megatron 怎样给流水线各个阶段组织履行履行序列。

本系列其他文章为:

[源码解析] 模型并行分布式练习Megatron (1) — 论文&根底

[源码解析] 模型并行分布式练习Megatron (2) — 整体架构

[源码解析] 模型并行分布式练习 Megatron (3) —模型并行完成

[源码解析] 模型并行分布式练习 Megatron (4) — 怎样设置各种并行

0x01 背景

在流水线练习之中,怎样给流水线各个阶段组织履行履行序列是一个要害,所以这儿咱们看看怎样做schedule。

关于 Megatron 来说,在练习时分,get_forward_backward_func 获取pipeline 的schedule,这儿分为 flush 和 interleaving 两种, 由于时刻所限,咱们只剖析 flush 的schedule,有爱好的读者能够自行研讨 interleaving。

def get_forward_backward_func():
    args = get_args()
    if mpu.get_pipeline_model_parallel_world_size() > 1:
        if args.virtual_pipeline_model_parallel_size is not None:
            forward_backward_func = forward_backward_pipelining_with_interleaving
        else:
            forward_backward_func = forward_backward_pipelining_without_interleaving
    else:
        forward_backward_func = forward_backward_no_pipelining
    return forward_backward_func

概括来说,Megatron 是依据 PipeDream-2BW 之上完成了定时改写。

  • PipeDream-2BW 在流水线之中保护了两个版别的模型权重,“2BW” 是 双缓冲权重(double-buffered weights)”,PipeDream-2BW 会为每个微批次生成一个新的模型版别K(K>d),可是由于有些剩余后向传递依然依赖于旧版别模型,所以新的模型版别无法当即替代旧版别,可是由于只保存了两个版别,所以极大下降了内存占用。
  • PipeDream-flush 则在 PipeDream-2BW 之上添加了一个大局同步的流水线更新改写操作,思路类似 GPipe。这种办法经过献身吞吐量的才能部分下降的价值来削减了内存占用(即只保护一个版别的模型权重)。

0x02 论文

Memory-Efficient Pipeline-Parallel DNN Training 和 Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM 是 Megatron 对应的相关论文,咱们就从论文开端研讨。注:下面论说内容是依据原论文发表时刻,由于各个开源系统也在演进,所以其针对其他开源系统的论说在今天看来不一定完全正确。

2.1 引论

近来,一些作业提出了流水线模型并行以加速模型并行练习。例如 GPipe(Huang等人,2019年)和PipeDream(Harlap等人 2018年;Narayanan等人,2019年)把多个输入顺序推送到一系列worker之中来练习,每个worker担任一个模型分区,这答应不同worker并行处理不同的输入。

  • 由于特定输入的向前和向后传达之间的权重版别不一致,Native 流水线可能会形成模型不收敛,现有技能权衡了内存占用和吞吐量,以不同的办法来避免这种状况。
  • GPipe保护单一权重版别,可是会定时进行流水线改写(图1a),详细改写时刻是在流水线练习完输入要更新权重时分,由于资源闲暇,这些改写约束了整体吞吐量。
  • PipeDream不会定时Flush流水线,但会存储多个权重版别,这增加了吞吐量,但也增加了内存占用,由于内存约束,无法练习大型模型。

所以,有效地练习大型模型需求一种一起具有高吞吐量和低内存占用的办法。此外,流水线并行系统的功能取决于DNN模型operators在 worker 上的区分办法。这具有挑战性,原因有三:

  • 内存容量约束:与模型分区相关的参数和中心激活需求能够放置在加速器的主设备内存之中。
  • 异构网络互连:现在的练习布置具有异构网络拓扑性,同一服务器上的设备之间具有更高的带宽链路。
  • 运算符怎样放置的大查找空间:跟着模型尺寸的增加,拆分运算符图在核算上变得十分贵重,由于不同分区办法数量是指数级的。

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

图1a。不同流水线并行履行的timeline。后向传达的时刻假定为向前传达的两倍;向前传达以蓝色显现,后向传达以绿色显现。数字表明微批次ID,时刻沿x轴显现,每个worker的运用率沿y轴显现。GPipe保护单一权重版别,但会定时改写flush流水线。PipeDream不引进周期性流水线改写,但保护多个权重版别。

在论文中,作者介绍了PipeDream-2BW,一个高效的DNN模型流水线并行练习系统。PipeDream-2BW经过两个要害奉献完成了高吞吐和低内存占用。首要,作者提出了双缓冲权重更新(2BW),这是一种在避免流水线改写的一起削减练习内存占用的技能。

作者运用了这样一个现实,即每个输入生成的梯度不需求当即运用于权重,而是能够累积为“兼并(coalesced)”梯度,以约束保存的权重版别的数量。2BW没有在运用最近更新的权重之前改写流水线,而是将新权重用于新进入流水线的输入,一起将以前的权重版别(称为暗影版别)用于已在练习中的输入(in-flight inputs)。

每个 worker 的权重双缓冲产生了一种流水线方案,其吞吐量高于GPipe(无流水线改写),内存功率高于PipeDream(这儿是2个权重版别,而在PipeDream 的一个depth-d流水线中,最差的状况是d个权重版别)。

作者还介绍了2BW的一种变体(称为PipeDream Flush),它在吞吐量上进行权衡,以取得更低的内存占用和更高的功能。

2.2 背景

在本节中,作者简要概述DNN模型分布式练习的相关技能。

  • 数据并行
    • 数据并行用于扩展模型练习。运用数据并行性(Xing等人,2015),每个worker都有整个模型的副本,输入数据集在worker之间分片。Worker 定时聚合他们的梯度,以保证一切 Worker 都看到一致版别的权重。数据并行性不能练习没法放入单个worker的大型模型,但能够用于较小的模型分区(model partitions)。
    • 数据并行scale-out一般作业杰出,但存在两个约束:a)超出某一点,每个GPU的batch size变得太小,下降了GPU运用率并增加了通讯成本;b)能够运用的最大设备数量是batch size,这约束了可用于练习的加速器数量。所以人们提出了各种模型并行技能来处理这两个挑战。
  • 模型并行。关于不适合单个worker的大型模型,一般来说运用模型并行练习。
    • 运用模型并行性(Dean等人,2012年;Chilimbi等人,2014年),模型中的权重参数在可用worker上进行切割(每个transformer层内的矩阵乘法在多个GPU上切割),worker之间交流中心激活和梯度。层间模型并行性未充分运用资源,由于在任何时刻点最多只要一个作业进程处于活动状况。
    • Tensor(intra-layer)模型并行性(Shoeybi et al.,2019)容易导致要害路径中all-to-all通讯过于贵重。由于 tensor 并行所需的all-reduce通讯需求经过服务器间链路,这比多GPU服务器中可用的高带宽NVLink要慢,所以容易将模型分区的数量约束为单个服务器中的GPU数量。而且高度的模型并行会创建大量的小矩阵乘法 (GEMMs),这可能会下降GPU的运用率。
    • FlexFlow(Jia et al.,2018)展示了怎样运用模型和数据并行性拆分模型图,但在运用模型并行性时依然存在资源运用率低的问题。
  • 流水线并行。为了处理模型并行性的缺陷,最近的作业如PipeDream和GPipe提出了流水线并行性。
    • 经过流水线并行,将多个输入(而不是1个)注入到由层间(inter-layer)模型分区组成的练习中。这保证了核算资源得到更好的运用。
    • 一个批(batch)被切割成更小的微批(microbatches),并在这些微批之间以流水线办法履行。能够用各种办法将层分配给worker,而且输入的向前和向后传达能够运用各种不同方案。
    • 可是,简略的流水线可能会导致特定输入的前后传递之间的权重版别不匹配。详细来说,假如立刻用最新的权重版别来进行权重更新,那么在流水线之中,一个输入可能会看到的是向后传达更新的权重,而不是它在向前传达时分看到的权重,从而导致不正确的梯度核算。
    • 层分配和调度战略导致不同的功能权衡。不论方案怎样,为了坚持严厉的优化器语义,优化器过程需求跨设备同步,从而在每个批处理结束时进行流水线改写,答应微批处理完成履行(此刻不参加新的微批)用来Flush流水线的时刻最高能够到达50%,这取决于注入流水线的微批次数量。微批次数量与流水线尺寸的比率越大,流水线Flush所花费的时刻越短。因而,为了完成高功率,一般需求更大的batch size。

用户能够运用各种技能练习他们的大型模型,每种技能都有不同的权衡。此外,这些技能能够结合运用。可是,结合这些技能会产生杂乱(non-trivial)的交互,为了取得杰出的功能,需求细心地进行推理,才能做到在坚持严厉的优化器语义的一起最大化给定batch size的大型模型练习吞吐量。

要完成大规模的吞吐量,需求沿着多个轴进行创新和精心设计:高效的内核完成使大部分练习受核算约束,而不是内存约束;应该在设备上对核算图进行智能分区,以削减经过网络链路发送的字节数,一起约束设备闲暇时刻;运用特定范畴的通讯优化和快速硬件(最先进的GPU以及相同和不同服务器上GPU之间的高带宽链路)。

此外,论文作者还研讨了影响吞吐量的各种成分之间的相互作用,包括经历和剖析。依据这些研讨,论文作者就怎样配置分布式培训供给以下辅导原则:

  • 不同办法的并行以杂乱的办法相互作用:并行化战略影响通讯量、履行内核的核算功率,以及 Worker 因流水线改写(流水线气泡)而等候核算的闲暇时刻。例如,张量和流水线模型并行性的次优组合能够导致高达2更低的吞吐量,即便服务器之间的网络链路带宽较高;张量模型并行性在多GPU服务器中是有效的,但流水线模型并行性有必要用于更大的模型。

  • 用于流水线并行性的方案会影响通讯量、流水线气泡巨细以及用于存储激活的内存。

    • 超参数的值(如微批次巨细)会影响内存占用、在辅佐进程上履行的内核的算术功率以及流水线气泡巨细。微批次巨细的最佳值取决于详细问题,一个适宜取值能够将吞吐量提高15%。
    • 分布式培训是通讯密集型的。假如节点间互连较慢或更多的通讯密集型分区将阻止功能的扩展。
    • 论文没有研讨怎样自动探索并行战略的查找空间(如FlexFlow、PipeDream、Tarnawski 和DAPPLE),而是建议运用那些在实践中作用杰出的启发式办法。

2.3 流水线权重问题

咱们这儿回顾一下流水线权重问题。下图是朴素流水线履行状况,本质上是一种 async SGD。

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

2.3.1 问题1

遇到的第一个问题是:在一般状况下,当核算第二个迭代时分,咱们需求依据第一个迭代更新之后的模型来核算。可是如下图所示,关于机器 1,当第二轮迭代开端时分(赤色圆圈的深蓝色2号),第一轮迭代的反向传达(浅绿色1号格)还没有开端。

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

2.3.2 问题2

第二个问题是:关于机器2,当它进行第5个mini-batch的前向传达时分(第二行蓝色5),它依据更新两次的权重来进行前向核算(第二行蓝色5之前有两个绿色格子,意味着权重被更新了两次)。

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

但进行第5个mini-batch的反向传达(第二行浅绿色5)时分,用到的权重是更新了4次的(第二行前面浅绿色的1,2,3,4总共会更新权重4次)。这与单节点深度学习假定冲突,会导致练习作用下降。

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

PipeDream 作者为了处理这些问题,提出了 Weight Stashing,以保证相同输入的向前和后向传达中运用相同的权重版别(原论文图1b)。详细便是每个机器多备份几个版别的权重,前向传达用哪个权重核算,反向传达还用这个权重核算。

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

就上图来说,机器1需求保存4个版别的权重,机器2需求保存3个版别的权重,机器3需求保存2个版别的权重,机器4需求保存1个版别的权重。在最坏的状况下,储存的权重版别总数是d,其中d是流水线深度,这关于大型模型来说内存占用太高了。运用PipeDream的默许权重更新语义,每个阶段的权重更新都有不同的延迟项,而且不会在流水线内履行累积。

2.3.3 问题3

另外一个问题是:现在做前向传达时分,每个机器核算时分,其依据的权重被更新的不同次数,比方第5个mini-batch(深蓝色的5),在机器 1 核算 5 时分,依据的权重是更新一次的(其前面有一个绿色),可是机器 2 核算 5 时分,依据的权重是更新两次的(其前面有两个绿色)。

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

处理思路是:每次前向传达时分,每个机器依据更新最少的权重来核算,比方关于机器2,就疏忽绿色2更新的权重,关于机器3,就疏忽绿色2,3两次更新之后的权重,它们都运用被绿色1更新一次之后的权重(图上矩形框黄色 1 )。

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

2.4 PipeDream-2BW 系统设计

PipeDream-2BW运用内存高效的流水线并行性来练习不适合单个加速器的大型模型。它的双缓冲权重更新(2BW)和改写机制保证了高吞吐量、低内存占用和类似于数据并行的权重更新语义。PipeDream-2BW将模型拆分为多个Worker上的多个阶段,并对每个阶段进行相同次数的仿制(在同一阶段的副本之间进行数据并行更新)。这种平行流水线适用于每层重复固定次数的模型(例如transformer模型)。

2.4.1 GPipe

GPipe保护模型权重的单一版别。输入批次被分红更小的微批次。权重梯度是累积的,不会当即运用,而且定时flush 流水线,以保证不需求坚持多个权重版别。GPipe供给了类似于数据并行的权重更新语义。原论文图1a显现了GPipe履行的时刻线。周期性流水线Flush可能会很贵重,从而约束吞吐量。缓解这一开支的一种办法是在流水线内进行额定的累积,但这并不总是切实可行的:a)在large scale factors下,能支撑的最小batch size较大(与scale factor成份额),且大批量会影响一切模型的收敛性,b)GPipe需求坚持与批巨细成份额的激活存储。

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

2.4.2 Double-Buffered Weight Updates (2BW)

PipeDream-2BW结合1F1B调度(Narayanan等人,2019年)运用了一种新颖的双缓冲权重更新(2BW)方案,其中每个 worker 在不同输入的向前和向后传递之间替换,以保证在特定输入的向前和向后传递中运用相同的权重版别(论文原图2)。2BW的内存占用比PipeDream和GPipe低,而且避免了GPipe贵重的流水线改写。

梯度是以较小的mi-crobatches粒度核算的。关于任何输入微批次,PipeDream-2BW对输入的向前和向后传达运用相同的权重版别。在以批的粒度运用更新之前,会在多个微批次上累积更新,从而约束生成和保护的权重版别的数量。图2显现了2BW的时刻线示例。

PipeDream-2BW 为每m个微批次生成一个新的权重版别(m≥ d, d是流水线深度)。为了简略起见,作者首要假定m=d(图2中的d=4)。新权重版别不能当即运用。特别是,进行中的输入(in-flight)不能运用最新的权重版别进行向后传达(例如,在t=21时, worker 3上的输入7),由于这些输入的向前传递已在不同阶段运用较旧的权重版别发动。

因而,新生成的权重版别需求缓冲以备将来运用。可是,需求保护的权重版别总数最多为2,由于用于生成新权重版别的权重版别能够当即丢掉(经过该阶段的未来输入不再运用旧权重版别)。例如,在图2中,每个 worker 在处理完输入8 的 backward pass后都能够丢掉W(0),由于一切后续输入的前向传递和后向传递都运用更高的权重版别。

给定输入微批次k(依据1开端的索引)运用的权重版别为 max(⌊(k−1)/m⌋−1,0)max(⌊(k − 1)/m⌋ − 1, 0),其中m是批次中的微批次数(图2中的4)。关于输入k的向前和向后传达,此权重版别相同。m能够是任何 ≥ d 的数字,额定的梯度累积(较大的m)会增加大局 batch size。

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

论文原图2。时刻轴显现PipeDream-2BW的双缓冲权重更新 (2BW) 方案,时刻轴沿x轴进行。在不损失通用性的状况下,假定向后传达的时刻是向前传达的两倍。PipeDream-2BW在每个worker上只存储两个权重版别,削减了总内存占用,一起不再需求贵重的流水线暂停。Wi(v)W_i^{(v)}表明worker i上的具有版别v的权重(包括从输入v生成的权重梯度)。在方格绿色框中会生成新的权重版别; W4(4)W_4^{(4)}首要用在输入9的向前传达之中。

上图中的 Before 意思是丢掉版别之前系统的两个权重buffer,After 意思是做了丢掉动作之后系统的两个权重buffer。

2.4.2. Weight Updates with Flushes (PipeDream-Flush)

作者还提出了第二个内存高效的流水线方案,称为PipeDream Flush。它的内存占用比2BW和vanilla优化器语义更低,但以较低的吞吐量为价值。该方案重用PipeDream(Narayanan等人,2019年)的1F1B方案,但坚持单一权重版别,并引进定时流水线改写,以保证权重更新期间的一致权重版别。具有两个流水线阶段的PipeDream-Flush和GPipe的时刻表如图3所示。

为何要挑选 1F1B?由于它将n-flight microbatches 数量缩减到流水线深度 d,而不是GPipe的微批次数目 m,所以 1F1B 是memory-efficient。为了下降bubble time,一般来说,m >> d。

内存占用。运用PipeDream Flush,in-flight “活动”输入激活的总数小于或等于流水线深度,这使其内存占用比GPipe低,GPipe有必要坚持输入激活与梯度平均(m)的微批次数量成份额。PipeDream Flush的内存占用也低于PipeDream-2BW,由于它只需求保护一个权重版别(而PipeDream-2BW只需求2个)。

Semantics。定时流水线改写保证能够运用最新权重版别核算的梯度履行权重更新。这将使权重更新用如下办法进行:W(t+1)=W(t)−⋅∇f(W(t))W^{(t+1)} = W^{(t)} − ∇f(W^{(t)})

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

论文原图3。GPipe和PipeDream的时刻表将分为两个阶段。GPipe和PipeDream Flush都运用管道改写;PipeDream-Flush在安稳状况下替换进行向前和向后传达,经过约束仅保存进行中(in-flight)微批次的激活来坚持较低的内存占用。

0x03 PipeDream-Flush 完成

咱们前面提到,当没有设置 virtual_pipeline_model_parallel_size 时分,便是运用 Flush 办法得到流水线schedule,详细函数是 forward_backward_pipelining_without_interleaving。

def get_forward_backward_func():
    args = get_args()
    if mpu.get_pipeline_model_parallel_world_size() > 1:
        if args.virtual_pipeline_model_parallel_size is not None:
            forward_backward_func = forward_backward_pipelining_with_interleaving
        else:
            # Flush schedule
            forward_backward_func = forward_backward_pipelining_without_interleaving
    else:
        forward_backward_func = forward_backward_no_pipelining
    return forward_backward_func

为何要挑选 1F1B?论文作者提到,由于它将in-flight microbatches 数量缩减到流水线深度 d,而不是GPipe的微批次数目 m,所以 1F1B 是memory-efficient。为了下降bubble time,一般来说,m >> d。

3.1 整体思路

3.1.1 缺省方案

GPipe提出了一个履行方案,其中首要履行一个批次中一切微批次的正向传达,然后履行一切微批次的反向传达(如图3所示)。咱们能够量化GPipe流水线气泡的巨细( )。咱们将批次中的微批次数量表明为,流水线阶段的数量(用于流水线并行的设备数量)为,每次迭代的抱负时刻为 (假定完美或抱负的缩放),以及履行单个微批次前进和后退通道的时刻 和。

在此方案中,流水线气泡包括:

  • 在批次开端时的 − 1 个前向传达。
  • 在批次结束时分的 − 1 个向后传达。

在流水线中花费的总时刻 = (−1)( +),所以此任务的处理时刻为 =( +)。因而,在流水线气泡中花费的核算时刻的抱负占比(fraction)为:

Bubbletimefraction(pipelinebubblesize)=tpbtid=p−1mBubble\ time\ fraction (pipeline\ bubble\ size) = \frac{t_{pb}}{t_{id}} = \frac{p-1}{m}

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

图3 : GPipe流水线方案,一切微批次(以数字表明)均为前向传达(蓝色),然后为后向传达(绿色)。灰色区域表明流水线气泡。为简略起见,咱们假定前向传达的时刻是后向传达的两倍。流水线方案的功率不取决于此刻刻要素。本例中的每个批次由8个微批次组成,每个蓝色或绿色框中的数字是给相应微批次的仅有标识符(比方,第一批由1− 8个微批次组成,第二批由微批次9− 16组成等)。优化器在流水线改写时进行步进(step)并更新权重参数,以保证严厉的优化器语义。

为了使气泡时刻占比(fraction)很小,咱们需求 ≫ 。可是关于这么大的, 这种办法具有很高的内存占用,由于它需求将中心激活(或在运用激活重新编译时仅为每个流水线阶段输入激活)保存在内存中,以供一切 个微批次在练习迭代的整个生命周期中都运用到。

3.1.2 PipeDream方案

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

PipeDream-Flush 把一个迭代分红三个阶段:

  • 预热前向传达阶段(warmup forward passes):在这儿,除了最终一个stage,每个worker 会做前向核算,进行不同数目的前向传达,而且向其下流发送激活,一向到最终一个stage被激起。该方案将履行中的(in-flight)微批次数量(未完成反向传达且需求坚持激活的微批次数量)约束在流水线深度之内,而不是一个批次中的微批次数量。

  • 安稳 1F1B 阶段(Run 1F1B in steady state):进入安稳状况之后,每个 worker 都进行1F1B 操作。

  • 冷却反向传达阶段(Cooldown backward passes):此阶段会把履行中的(in-flight)的微批次履行结束,仅仅履行反向核算和向反向核算下流发送梯度。

这个新方案在气泡中花费的时刻与GPipe是相同的,可是未完成的向前传达的数量最多和流水线阶段的数量一样。因而,该方案要求将激活削减到 或更少的微批次(GPipe方案则是 m 个微批次)。因而,当 ≫ 的时分, PipeDream-Flush 的内存功率比GPipe高得多。

咱们首要给出详细代码如下,后续会逐步剖析。

def forward_backward_pipelining_without_interleaving(forward_step_func, data_iterator,
                                                     model, optimizer, timers,
                                                     forward_only):
    """Run non-interleaved 1F1B schedule, with communication between pipeline
    stages.
    Returns dictionary with losses if the last stage, empty dict otherwise."""
    timers = get_timers()
    assert len(model) == 1
    model = model[0]
    # Compute number of warmup microbatches.
    num_microbatches = get_num_microbatches()
    num_warmup_microbatches = \
        (mpu.get_pipeline_model_parallel_world_size() -
         mpu.get_pipeline_model_parallel_rank() - 1)
    num_warmup_microbatches = min(
        num_warmup_microbatches,
        num_microbatches)
    num_microbatches_remaining = \
        num_microbatches - num_warmup_microbatches
    unwrapped_model = unwrap_model(
        model, (torchDDP, LocalDDP, Float16Module))
    model_type = unwrapped_model.model_type
    rank = mpu.get_pipeline_model_parallel_rank()
    recv_tensor_shapes = get_tensor_shapes(rank-1, model_type)
    send_tensor_shapes = get_tensor_shapes(rank, model_type)
    # Input, output tensors only need to be saved when doing backward passes
    input_tensors = None
    output_tensors = None
    if not forward_only:
        input_tensors = []
        output_tensors = []
    losses_reduced = []
    # Run warmup forward passes.
    for i in range(num_warmup_microbatches):
        input_tensor = recv_forward(recv_tensor_shapes, timers=timers)
        output_tensor = forward_step(forward_step_func, data_iterator, model,
                                     input_tensor, losses_reduced)
        send_forward(output_tensor, send_tensor_shapes, timers=timers)
        if not forward_only:
            input_tensors.append(input_tensor)
            output_tensors.append(output_tensor)
    # Before running 1F1B, need to receive first forward tensor.
    # If all microbatches are run in warmup / cooldown phase, then no need to
    # receive this tensor here.
    if num_microbatches_remaining > 0:
        input_tensor = recv_forward(recv_tensor_shapes, timers=timers)
    # Run 1F1B in steady state.
    for i in range(num_microbatches_remaining):
        last_iteration = (i == (num_microbatches_remaining - 1))
        output_tensor = forward_step(forward_step_func, data_iterator, model,
                                     input_tensor, losses_reduced)
        if forward_only:
            send_forward(output_tensor, send_tensor_shapes, timers=timers)
            if not last_iteration:
                input_tensor = recv_forward(recv_tensor_shapes, timers=timers)
        else:
            output_tensor_grad = \
                send_forward_recv_backward(output_tensor,
                                           send_tensor_shapes,
                                           timers=timers)
            # Add input_tensor and output_tensor to end of list.
            input_tensors.append(input_tensor)
            output_tensors.append(output_tensor)
            # Pop input_tensor and output_tensor from the start of the list for
            # the backward pass.
            input_tensor = input_tensors.pop(0)
            output_tensor = output_tensors.pop(0)
            input_tensor_grad = \
                backward_step(optimizer, input_tensor, output_tensor,
                              output_tensor_grad)
            if last_iteration:
                input_tensor = None
                send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
            else:
                input_tensor = \
                    send_backward_recv_forward(
                        input_tensor_grad, recv_tensor_shapes, timers=timers)
    # Run cooldown backward passes.
    if not forward_only:
        for i in range(num_warmup_microbatches):
            input_tensor = input_tensors.pop(0)
            output_tensor = output_tensors.pop(0)
            output_tensor_grad = recv_backward(send_tensor_shapes, timers=timers)
            input_tensor_grad = \
                backward_step(optimizer, input_tensor, output_tensor,
                              output_tensor_grad)
            send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
    return losses_reduced

3.2 发动阶段

这是在每个 worker 之上都会做的,每个worker 的rank 不同,详细逻辑如下:

  • 首要需求确定本worker在热身阶段需求履行的微批次数目,是min((world-size – rank – 1), num_microbatches),由于rank是顺次递增,所以热身所需的微批次会逐次递减,直到为0,这样就会直接进入安稳阶段进行核算,比方 world size 为5,rank区间为0~4,微批次数目为4,则从前往后几个stage的热身批次为 5 – 0 – 1 = 4, 5 – 1 – 1 = 3, 5 – 2 – 1 = 2, 5 – 3 – 1 = 1, 5 – 4 – 1 = 0(就直接进入安稳状况)。
  • 其次核算安稳阶段所需求核算的微批次。
  • 当需求进行反向传达时分,需求树立两个FIFO行列,input_tensors 保存来自上游的激活,output_tensors 保存来自下流的激活。
timers = get_timers()
assert len(model) == 1
model = model[0]
# Compute number of warmup microbatches.
num_microbatches = get_num_microbatches() # 得到微批次数目
# 需求确定本worker在热身阶段需求履行的微批次数目,是min((world-size - rank - 1), num_microbatches)
# 由于rank是顺次递增,所以热身所需的微批次会逐次递减,直到为0,这样就会直接进入安稳阶段进行核算
# 比方 world size 为5,rank区间为0~4,微批次数目为4,则从前往后几个stage的热身批次为 5 - 0 - 1, 5 - 1 - 1, 5 - 2 - 1, 5 - 3 - 1, 5 - 4 - 1。
num_warmup_microbatches = \
    (mpu.get_pipeline_model_parallel_world_size() -
     mpu.get_pipeline_model_parallel_rank() - 1) 
num_warmup_microbatches = min(
    num_warmup_microbatches,
    num_microbatches) 
# 核算安稳阶段所需求核算的微批次
num_microbatches_remaining = \
    num_microbatches - num_warmup_microbatches
unwrapped_model = unwrap_model(
    model, (torchDDP, LocalDDP, Float16Module))
model_type = unwrapped_model.model_type
rank = mpu.get_pipeline_model_parallel_rank()
recv_tensor_shapes = get_tensor_shapes(rank-1, model_type)
send_tensor_shapes = get_tensor_shapes(rank, model_type)
# Input, output tensors only need to be saved when doing backward passes
# 当需求进行反向传达时分,需求树立两个行列,input_tensors 保存来自上游的激活,output_tensors 保存来自下流的激活
input_tensors = None
output_tensors = None
if not forward_only:
    input_tensors = []
    output_tensors = []
losses_reduced = []

3.3 热身阶段

热身阶段会依据本worker在热身阶段需求履行的微批次数目,顺次进行处理:

  • 从上游获取输入激活。
  • 本地进行前向核算,上游输入的激活便是本stage的输入。
  • 向下流发送本地激活。
  • 假如需求反向传达,则每个 worker 在 input_tensor 之中保存上游激活,在output_tensor 之中保存发送给下流的激活。
  • 早期阶段会运转尽可能多的向前传达,这样后期阶段能够当即从1F1B开端。
# Run warmup forward passes.
for i in range(num_warmup_microbatches):
    # 从上游获取输入激活
    input_tensor = recv_forward(recv_tensor_shapes, timers=timers)
    # 本地进行前向核算,上游输入的激活便是本stage的输入
    output_tensor = forward_step(forward_step_func, data_iterator, model,
                                 input_tensor, losses_reduced)
    # 向下流发送本地激活
    send_forward(output_tensor, send_tensor_shapes, timers=timers)
    if not forward_only:
        input_tensors.append(input_tensor) # 保存上游激活
        output_tensors.append(output_tensor) # 保存本地核算的激活,便是发送给下流的激活

其中,第一个stage由于没有上游,所以recv_forward将会回来None,其他状况下将回来一个上游激活。

def recv_forward(tensor_shapes, timers):
    input_tensors = []
    for tensor_shape in tensor_shapes:
        if tensor_shape is None:
            input_tensors.append(None)
        else:
            input_tensors.append(p2p_communication.recv_forward(tensor_shape,
                                                                timers=timers))
    return input_tensors

3.4 通讯模块

3.4.1 根底通讯办法

pipeline parallelism需求inter-stage的P2P通讯,其首要完成是_communnicate函数,_communicate 函数首要是封装了 PyTorch 的根底通讯函数,给流水线并行供给了stage之间的双向P2P通讯。在此根底之上,又封装了一些API办法。这个函数的注释写得不错,解释得十分清楚。这儿需求注意的是:每个层怎样知道自己在流水线之中上下流 rank 是什么?这是经过例如这样的调用mpu.get_pipeline_model_parallel_next_rank() 来知道的。

_communicate 详细代码如下:

def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
                 tensor_shape,
                 use_ring_exchange=False,
                 dtype_=None):
    """Communicate tensors between stages. Used as helper method in other
    communication methods that are used in megatron/schedules.py.
    Takes the following arguments:
        tensor_send_next: tensor to send to next rank (no tensor sent if
                          set to None).
        tensor_send_prev: tensor to send to prev rank (no tensor sent if
                          set to None).
        recv_prev: boolean for whether tensor should be received from
                   previous rank.
        recv_next: boolean for whether tensor should be received from
                   next rank.
        tensor_shape: shape of tensor to receive (this method assumes that all
                      tensors sent and received in a single function call are
                      the same shape).
        use_ring_exchange: boolean for whether torch.distributed.ring_exchange()
                           API should be used.
        dtype_: optional, this is used when the tensor that needs to be
                communicated is different from args.params_dtype.
    Returns:
        (tensor_recv_prev, tensor_recv_next)
    """
    args = get_args()
    # Create placeholder tensors for receive in forward and backward directions
    # if needed.
    tensor_recv_prev = None
    tensor_recv_next = None
    # Some legacy inference code doesn't set the tensor shape, do so now
    # for the normal values for gpt/bert. This could be removed if inference
    # code is changed to provide tensor_shape.
    if tensor_shape is None:
        tensor_shape = (args.seq_length, args.micro_batch_size, args.hidden_size)
    override_scatter_gather_tensors_in_pipeline = False
    if args.scatter_gather_tensors_in_pipeline:
        tensor_chunk_shape = reduce(operator.mul, tensor_shape, 1)
        if tensor_chunk_shape % mpu.get_tensor_model_parallel_world_size() == 0:
            tensor_chunk_shape = tensor_chunk_shape // \
                mpu.get_tensor_model_parallel_world_size()
        else:
            tensor_chunk_shape = tensor_shape
            override_scatter_gather_tensors_in_pipeline = True
    else:
        tensor_chunk_shape = tensor_shape
    dtype = args.params_dtype
    if args.fp32_residual_connection:
        dtype = torch.float
    requires_grad = True
    if dtype_ is not None:
        dtype = dtype_
        requires_grad = False
    # 假如需求承受张量,则先分配空张量,承受的张量会存在此处    
    if recv_prev:
        tensor_recv_prev = torch.empty(tensor_chunk_shape,
                                       requires_grad=requires_grad,
                                       device=torch.cuda.current_device(),
                                       dtype=dtype)
    if recv_next:
        tensor_recv_next = torch.empty(tensor_chunk_shape,
                                       requires_grad=requires_grad,
                                       device=torch.cuda.current_device(),
                                       dtype=dtype)
    # Split tensor into smaller chunks if using scatter-gather optimization.
    if not override_scatter_gather_tensors_in_pipeline and \
            args.scatter_gather_tensors_in_pipeline:
        if tensor_send_next is not None:
            tensor_send_next = mpu.split_tensor_into_1d_equal_chunks(tensor_send_next)
        if tensor_send_prev is not None:
            tensor_send_prev = mpu.split_tensor_into_1d_equal_chunks(tensor_send_prev)
    # Send tensors in both the forward and backward directions as appropriate.
    if use_ring_exchange:
        # 假如需求,则运用ring exchange,这个是新版别PyTorch才有
        torch.distributed.ring_exchange(tensor_send_prev=tensor_send_prev,
                                        tensor_recv_prev=tensor_recv_prev,
                                        tensor_send_next=tensor_send_next,
                                        tensor_recv_next=tensor_recv_next,
                                        group=mpu.get_pipeline_model_parallel_group())
    else:
        # 先依据方针rank生成对应的torch.distributed.P2POp,放入列表
        ops = []
        if tensor_send_prev is not None:
            send_prev_op = torch.distributed.P2POp(
                torch.distributed.isend, tensor_send_prev,
                mpu.get_pipeline_model_parallel_prev_rank())
            ops.append(send_prev_op)
        if tensor_recv_prev is not None:
            recv_prev_op = torch.distributed.P2POp(
                torch.distributed.irecv, tensor_recv_prev,
                mpu.get_pipeline_model_parallel_prev_rank())
            ops.append(recv_prev_op)
        if tensor_send_next is not None:
            send_next_op = torch.distributed.P2POp(
                torch.distributed.isend, tensor_send_next,
                mpu.get_pipeline_model_parallel_next_rank())
            ops.append(send_next_op)
        if tensor_recv_next is not None:
            recv_next_op = torch.distributed.P2POp(
                torch.distributed.irecv, tensor_recv_next,
                mpu.get_pipeline_model_parallel_next_rank())
            ops.append(recv_next_op)
        # 然后做批量异步send/recv
        if len(ops) > 0:
            reqs = torch.distributed.batch_isend_irecv(ops)
            for req in reqs:
                req.wait() # 用wait来同步
    # To protect against race condition when using batch_isend_irecv().
    torch.cuda.synchronize()
    # If using scatter-gather optimization, gather smaller chunks.
    # 特殊优化,21年论文中提到,大概由于做了all-reduce,因而能够先split发送,下流gather成一致数据
    # 有爱好读者能够深入研讨论文和代码,
    if not override_scatter_gather_tensors_in_pipeline and \
            args.scatter_gather_tensors_in_pipeline:
        if recv_prev:
            tensor_recv_prev = mpu.gather_split_1d_tensor(
                tensor_recv_prev).view(tensor_shape).requires_grad_()
        if recv_next:
            tensor_recv_next = mpu.gather_split_1d_tensor(
                tensor_recv_next).view(tensor_shape).requires_grad_()
    return tensor_recv_prev, tensor_recv_next

3.4.2 API

在 _communicate 的根底之上,封装了众多API函数,首要便是依据参数的不同来做不同处理,比方:

def send_backward_recv_forward(input_tensor_grad, tensor_shape=None, timers=None):
    """Batched send and recv with previous rank in pipeline."""
    if mpu.is_pipeline_first_stage():
        input_tensor = None
    else:
        input_tensor, _ = _communicate(
            tensor_send_next=None,
            tensor_send_prev=input_tensor_grad,
            recv_prev=True,
            recv_next=False,
            tensor_shape=tensor_shape)
    return input_tensor

3.4.3 流水线上下流

以下若干函数用来确定流水线上下流,结合前一篇文章咱们知道,假如本进程是 rank 2,则流水线进程组 ranks 是 [g2, g6, g10, g14],那么其下流便是 rank 6。


def get_pipeline_model_parallel_first_rank():
    return _PIPELINE_GLOBAL_RANKS[0]
def get_pipeline_model_parallel_last_rank():
    last_rank_local = get_pipeline_model_parallel_world_size() - 1
    return _PIPELINE_GLOBAL_RANKS[last_rank_local]
def get_pipeline_model_parallel_next_rank():
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline + 1) % world_size]
def get_pipeline_model_parallel_prev_rank():
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline - 1) % world_size]

3.5 安稳阶段

安稳阶段的整体逻辑如下:前向核算 -> 发送激活给前向核算下流 & 从下流承受梯度 -> 后向核算 -> 给上游发送本worker核算的梯度 & 从上游承受激活。

3.5.1 逻辑

安稳阶段详细逻辑如下:

  1. forward_step :拿到一个微批次(上游激活),进行本地前向核算。
  2. send_forward:
    1. 假如仅仅前向传达,则调用send_forward把本地结算结果发送给下流。
    2. 否则调用 send_forward_recv_backward : 本地核算结果发给下流,再从下流承受其梯度。
  3. 每个 worker 在 input_tensor 之中保存上游激活,在output_tensor 之中保存发送给下流的激活。
  4. backward_step : 本地后向核算。
    1. 从行列中弹出第一个未处理的(便是最早未处理的)上游激活。
    2. 从行列弹出对应的本地激活。
    3. 进行反向核算,运用(上游激活,本地激活,下流梯度)来对最早的未处理的微批次进行反向核算,得到本地梯度。
  5. send_backward:
    1. 假如是最终一个微批次,只需求把本地梯度 input_tensor_grad 传递给前向核算的上游。
    2. 否则调用 send_backward_recv_forward 把本地梯度 input_tensor_grad 传递给前向核算的上游,还需求从上游再获取一个激活值。
  6. 跳回1继续处理下一个微批次(上游激活)。
# Before running 1F1B, need to receive first forward tensor.
# If all microbatches are run in warmup / cooldown phase, then no need to
# receive this tensor here.
if num_microbatches_remaining > 0:
    # 需求在安稳状况下运转,所以得拿到前面层的激活值
    input_tensor = recv_forward(recv_tensor_shapes, timers=timers)
# Run 1F1B in steady state.
for i in range(num_microbatches_remaining):
    last_iteration = (i == (num_microbatches_remaining - 1))
    # 前向核算
    output_tensor = forward_step(forward_step_func, data_iterator, model,
                                 input_tensor, losses_reduced)
    if forward_only:
        send_forward(output_tensor, send_tensor_shapes, timers=timers)
        if not last_iteration:
            input_tensor = recv_forward(recv_tensor_shapes, timers=timers)
    else:
        # 发送中心激活给下流,而且从下流获取其反向梯度
        output_tensor_grad = \
            send_forward_recv_backward(output_tensor,
                                       send_tensor_shapes,
                                       timers=timers)
        # Add input_tensor and output_tensor to end of list.
        input_tensors.append(input_tensor) # 保存上游激活到行列
        output_tensors.append(output_tensor) # 保存本地核算的激活,便是发送给下流的激活到行列
        # Pop input_tensor and output_tensor from the start of the list for
        # the backward pass.        
        input_tensor = input_tensors.pop(0) # 从行列中弹出第一个未处理的(便是最早未处理的)上游激活
        output_tensor = output_tensors.pop(0) # 从行列弹出对应的本地激活
        # 反向核算,运用(上游激活,本地激活,下流梯度)来对最早的未处理的微批次进行反向核算,得到本地梯度
        input_tensor_grad = \
            backward_step(optimizer, input_tensor, output_tensor,
                          output_tensor_grad) # 下流传来的梯度在这儿
        if last_iteration:
            input_tensor = None
            # 假如是最终一个微批次,把本地梯度 input_tensor_grad 传递给前向核算的上游
            send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
        else:
            # 假如不是最终一个微批次,把本地梯度 input_tensor_grad 传递给前向核算的上游,还需求从上游再获取一个激活值
            input_tensor = \
                send_backward_recv_forward(
                    input_tensor_grad, recv_tensor_shapes, timers=timers)

3.5.2 串行

其中,send_forward_recv_backward 这个从姓名就能看到逻辑,这个函数先发送给下流,再从下流承受。

def send_forward_recv_backward(output_tensors, tensor_shapes, timers):
    if not isinstance(output_tensors, list):
        output_tensors = [output_tensors]
    output_tensor_grads = []
    for (output_tensor, tensor_shape) in zip(output_tensors, tensor_shapes):
        if tensor_shape is None:
            output_tensor_grads.append(None)
            continue
        # 发送自己的激活,然后得到下流传上来的梯度
        output_tensor_grad = p2p_communication.send_forward_recv_backward(
                output_tensor, tensor_shape, timers=timers)
        output_tensor_grads.append(output_tensor_grad)
    return output_tensor_grads #回来梯度

能够发现,关于单个 worker,都是堵塞进行,由于 send 和 recv 都是堵塞,这样通讯和核算有必要串行,不能堆叠。由于前面热身阶段已经把前向传递一向从 worker 0 传送到 worker d,所以 worker d 能够直接拿到 input,就进行处理,然后直接进行反向核算,然后回来给上游。所以串行也无所谓。咱们从论文之中的图例也能够看出来:

[源码解析] 模型并行分布式训练Megatron (5) --Pipedream Flush

图:PipeDream-Flush在安稳状况下替换进行向前和向后传达,经过将激活躲藏约束为仅履行中(in-flight)的微批次来坚持较低的内存占用。从图上能够看到:

  • Worker 1 的履行序列是:1 FW(warmup), 2 FW, 1 BW,3 FW,2 BW,4 FW,3 BW,4 BW(cooldown)
  • Worker 2 的履行序列是:1 FW,1BW, 2 FW, 2 BW, 3 FW, 3 BW, 4 FW, 4 BW,worker 2直接就进入了安稳状况。

3.6 冷却阶段

冷却阶段和热身阶段对称,也履行num_warmup_microbatches个过程,可是只做反向传达。这个阶段由所以整理未结束的反向传达,所以仅仅从行列中pop。详细便是弹出上游激活和传递给下流的激活,然后进行梯度核算。

# Run cooldown backward passes.
if not forward_only:
    for i in range(num_warmup_microbatches):
        input_tensor = input_tensors.pop(0)
        output_tensor = output_tensors.pop(0)
        output_tensor_grad = recv_backward(send_tensor_shapes, timers=timers)
        input_tensor_grad = \
            backward_step(optimizer, input_tensor, output_tensor,
                          output_tensor_grad)
        send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
return losses_reduced

3.7 Flush 体现在哪里?

咱们需求看看 megatron/training.py。便是一次练习step的流程。这儿在 update_successful, grad_norm, num_zeros_in_grad = optimizer.step() 时分会调用优化器进行参数更新,此刻,内部两个激活值行列也悉数清空过了,所以在这个时刻点上,flush也就完成了。

def train_step(forward_step_func, data_iterator,
               model, optimizer, lr_scheduler):
    """Single training step."""
    args = get_args()
    timers = get_timers()
    # 1. 把梯度归零
    # Set grad to zero.
    if args.DDP_impl == 'local' and args.use_contiguous_buffers_in_local_ddp:
        for partition in model:
            partition.zero_grad_buffer()
    optimizer.zero_grad()
    # 2. 进行前向,后向传达,关于本章来说,便是调用forward_backward_pipelining_without_interleaving
    forward_backward_func = get_forward_backward_func()
    losses_reduced = forward_backward_func(
        forward_step_func, data_iterator, model,
        optimizer, timers, forward_only=False)
    # 到了这儿,整个流水线处理结束,loss 和 梯度都核算结束
    # Empty unused memory
    if args.empty_unused_memory_level >= 1:
        torch.cuda.empty_cache()
    # 3. 数据并行的all-reduce
    # All-reduce if needed.
    if args.DDP_impl == 'local':
        for model_module in model:
            model_module.allreduce_gradients()
    # All-reduce word_embeddings' grad across first and last stages to ensure
    # that word_embeddings parameters stay in sync.
    # This should only run for models that support pipelined model parallelism
    # (BERT and GPT-2).
    # 4. 嵌入层 all-reduce,嵌入层也进行了权重共享,所以要进行all-reduce来保证参数一致
    if mpu.is_rank_in_embedding_group(ignore_virtual=True) and \
            mpu.get_pipeline_model_parallel_world_size() > 1:
        if mpu.is_pipeline_first_stage(ignore_virtual=True):
            unwrapped_model = model[0]
        elif mpu.is_pipeline_last_stage(ignore_virtual=True):
            unwrapped_model = model[-1]
        else:  # We do not support the interleaved schedule for T5 yet.
            unwrapped_model = model[0]
        unwrapped_model = unwrap_model(
            unwrapped_model, (torchDDP, LocalDDP, Float16Module))
        if unwrapped_model.share_word_embeddings:
            word_embeddings_weight = unwrapped_model.word_embeddings_weight()
            if args.DDP_impl == 'local':
                grad = word_embeddings_weight.main_grad
            else:
                grad = word_embeddings_weight.grad
            torch.distributed.all_reduce(grad, group=mpu.get_embedding_group())
    # Update parameters.
    # 5. 更新参数,这儿才进行Flush,
    update_successful, grad_norm, num_zeros_in_grad = optimizer.step()
    # Update learning rate.
    if update_successful:
        increment = get_num_microbatches() * \
                    args.micro_batch_size * \
                    args.data_parallel_size
        lr_scheduler.step(increment=increment)
        skipped_iter = 0
    else:
        skipped_iter = 1
    # Empty unused memory
    if args.empty_unused_memory_level >= 2:
        torch.cuda.empty_cache()
    if mpu.is_pipeline_last_stage(ignore_virtual=True):
        # Average loss across microbatches.
        loss_reduced = {}
        for key in losses_reduced[0]:
            losses_reduced_for_key = [x[key] for x in losses_reduced]
            loss_reduced[key] = sum(losses_reduced_for_key) / len(losses_reduced_for_key)
        return loss_reduced, skipped_iter, grad_norm, num_zeros_in_grad
    return {}, skipped_iter, grad_norm, num_zeros_in_grad

至此,NVIDIA Megetron 剖析结束,咱们接下来运用 NVIDIA HugeCTR 看看怎样处理大型稀疏嵌入。

0xEE 个人信息

★★★★★★关于生活和技能的考虑★★★★★★

微信公众账号:罗西的考虑

0xFF 参考

[细读经典]Megatron论文和代码详细剖析(2)

[细读经典]Megatron论文和代码详细剖析(1)

Megatron-LM源码阅读(一)

Megatron-LM源码阅读(二)

megatron学习总结

GTC 2020: Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism

PipeDream-Flush

PipeDream: 数据并行+流水线

PipeDream-interleaved