撰文 |李响

1

大规模模型分片存储简介

在模型比较小时(如 100G 以下),还有或许选用单机存储。当模型参数量比较大时,要求的样本数也更大,练习后做 dump 出来的模型也会很大,单机必定放不下。

比方,由 DeepSpeed 和 Megatron 驱动的 Megatron 图灵自然语言生成模型(MT-NLG)具有 5300 亿个参数,是迄今为止练习过的最大和最强大的单片 Transformer 语言模型,支撑这样的大规模语言模型需求分片保存和加载,不会运用单机内存。此外,在其他 CV、搜索、引荐和广告类等场景下,读取样本量增多和模型复杂度增加都会带来模型存储上的难题。

本文将介绍 OneFlow 的大模型分片保存、加载战略以及运用办法。

2

OneFlow 模型分片保存和加载

OneFlow 的大模型分片保存和加载的完成根据大局视角(Global View,
docs.oneflow.org/master/cook… )的概念,既运用 Placement 与 SBP 完结模型文件(下文都用 state dict 表示)在各个物理设备上的切分,适用于当模型大到无法在单个设备的内存或显存上包容下的场景。

flow.utils.global_view.to_global() 接口介绍

为了更好了解下文保存模型和加载模型两个部分的内容,首要对
flow.utils.global_view.to_global()接口和其完成思路进行分析。

区别于现有的Tensor.to_global()模式(能够处理一般的 Tensor,
oneflow.readthedocs.io/en/master/g… ),提供了多种类型的输入支撑,包含 None、Tensor、List、Tuple、nn.Module 的 state dict 、nn.Graph 的 state dict 和几种类型的任意组合,既将 List/Tuple/Dict 中的输入 Tensor 转化为 Global Tensor。值得注意的是,其传入参数中的 SBP 支撑用户自定义一个(x, tensor) -> sbp的函数来处理不同 Tensor 对应不同 SBP 的需求。

并且,与 to_global() 对应的还有
flow.utils.global_view.to_local()接口。能够参阅 API 文档中关于 to_global() 和 to_local() 更具体的介绍(
oneflow.readthedocs.io/en/master/u… )。在
flow.utils.global_view.to_global()的完成(
github.com/Oneflow-Inc… )中,支撑了多种输入类型适用于现有的Tensor.to_global()接口。完成的整体思路大致为检查输入、广播(空)结构,遍历节点、调用回调函数和返回 to_global() 后的成果。

再回到咱们重视的当地,这个接口怎么做到模型分片保存和加载?

比方关于模型并行/流水并行,模型的参数涣散在多个 Rank 上,在保存模型前经过
flow.utils.global_view.to_global()将 state dict 里的每个 Tensor 在指定 Placement 上转为 Global Tensor,SBP 的类型为flow.sbp.split,能够设置在特定维度上的切分。同样的,模型也能够按 Split 被加载。当然,SBP 也能够为 Broadcast,支撑不同的 SBP 和 Placement 组合。这样,超大规模模型分片存储的问题就被非常好地处理了。

保存模型

大致了解
flow.utils.global_view.to_global()接口后,在这一部分演示了怎么分片保存模型,代码如下:

# 自定义 get_sbp 函数。
def get_sbp(state_dict, tensor):
    if tensor is state_dict["System-Train-TrainStep"]:
        return flow.sbp.broadcast
    if tensor is state_dict["module_pipeline"]["m_stage3.linear.weight"]:
        return flow.sbp.split(1)
    if tensor is state_dict["module_pipeline"]["m_stage3.linear.bias"]:
        return flow.sbp.broadcast
    return flow.sbp.split(0)
model_file_state_dict = flow.utils.global_view.to_global(
    state_dict, placement=model_file_placement, sbp=get_sbp, 
    ) # 运用 sbp=get_sbp 处理特别的键,也支撑指定一般的 SBP。
rank_id = flow.env.get_rank()
# 保存模型分片的途径,一个 rank 对应一个途径。
state_dict_dir = "./graph_save_load_global_" + str(rank_id)
if flow.env.get_rank() in model_file_placement.ranks:
    flow.save(
        flow.utils.global_view.to_local(model_file_state_dict),
        state_dict_dir,
    )

首要,将原模型(state_dict)转化到模型文件的 Placement 和 SBP 上,model_file_placement 为要分片保存模型的设备阵列,也便是将 state dict 按 split(0) 分片到 model_file_placement 上。

这儿之所以自定义 get_sbp 函数,是因为用户能够传进来一个(x, tensor) -> sbp的函数来处理特别 Tensor 对应不同 SBP 的需求。

举个比如(当时比如根据 Graph 模式),关于state_dict[“System-Train-TrainStep”]这种 shape 为 [1] 的 Tensor,咱们就不能按 split(0) 分片了,SBP 能够选用 broadcast。而state_dict[“module_pipeline”][“m_stage3.linear.weight”]只能在第 1 维度切分,关于state_dict[“module_pipeline”][“m_stage3.linear.bias”]这种不可切分的小 Tensor(s),SBP 能够选用 broadcast。这样支撑用户 DIY SBP 的处理,更加灵活。

在后面的处理中,运用
flow.utils.global_view.to_local() 接口得到 model_file_state_dict 的本地重量,并调用 save() 保存模型。其间,state_dict_dir 是带有设备 id 的目录,需求区分不同设备,引荐一个 rank 对应一个途径,途径名用 rank id 的方式。

加载模型

在指定设备上分片保存模型后,加载模型的代码如下:

if cur_rank in model_file_placement.ranks:
    local_state_dict = flow.load(state_dict_dir)
else:
    local_state_dict = None
global_state_dict = flow.utils.global_view.to_global(
     local_state_dict, placement=model_file_placement, sbp=get_sbp,
)
graph_model.load_state_dict(global_state_dict)

首要,用load()办法在每个保存切片的设备上加载 state dict。对应的,需求把 local rank 上的 state dict 转化到模型文件的 placement 和 sbp 上,得到了 global_state_dict。这一步和保存模型应该是对应的,SBP 和 Placement 也是共同的。

终究,global_state_dict 能够成功加载到 graph_model(nn.Graph) 中。当然,nn.Module 和 nn.Graph 处理办法是共同的。

将 state dict 加载到 nn.Module 中

除了以上两个特征外,在将 state dict 加载到 nn.Module 时,OneFlow 提供了 SBP 和 Placement 的主动转化。

鄙人面的比如中,首要结构一个 m(nn.Module)对象,再将 global_state_dict 的 SBP 设置为 split(0),而 m 的 SBP 为 broadcast。同时 placement 也放生了改变,从placement(“cpu”, ranks=[0, 1])到flow.placement(“cpu”, ranks=[0])。这时用户不需求其他操作,OneFlow 会主动做 SBP 和 placement 的转化进程。

import oneflow as flow
m = flow.nn.Linear(2,6)
model_file_placement = flow.placement("cpu", ranks=[0, 1])
state_dict = {"weight":flow.ones(3,2), "bias":flow.zeros(3)}
global_state_dict = flow.utils.global_view.to_global(
    state_dict, placement=model_file_placement, sbp=flow.sbp.split(0),
)
m.to_global(placement=flow.placement("cpu", ranks=[0]), sbp=flow.sbp.broadcast)
m.load_state_dict(global_state_dict)
print(m.state_dict())

运用 2 卡运行上面的代码,能够看到,咱们自己结构的字典中的大局张量,现已被加载到 m Module 中。此外,输出 OrderedDict 的 tensor 的 SBP 现已从 split(0) 主动转化为 broadcast,’weight’ 对应 tensor 的形状也是咱们期待的 [6, 2],’bias’ 形状为 [6]。

OrderedDict([('weight', tensor([[1., 1.],
        [1., 1.],
        [1., 1.],
        [1., 1.],
        [1., 1.],
        [1., 1.]], placement=oneflow.placement(type="cpu", ranks=[0]), sbp=(oneflow.sbp.broadcast,), dtype=oneflow.float32,
       requires_grad=True)), ('bias', tensor([0., 0., 0., 0., 0., 0.], placement=oneflow.placement(type="cpu", ranks=[0]), sbp=(oneflow.sbp.broadcast,),
       dtype=oneflow.float32, requires_grad=True))])

3

一个完好示例

上面演示了怎么分片保存和加载模型。在这一部分,提供一份完好的代码参阅,下面的比如为 4 个 ranks 上的流水并行,模拟了模型分片保存和加载的进程。

import os
import numpy as np
import oneflow as flow
model_tensor_placement = flow.placement("cuda", ranks=[0, 1, 2, 3])
# model_file_placement 为存储模型分片的设备的 placement,表示在 Rank 2 和 Rank 3 上可为 None。
model_file_placement = flow.placement("cpu", ranks=[0, 1])
P0 = flow.placement(model_tensor_placement.type, ranks=[0])
P1 = flow.placement(model_tensor_placement.type, ranks=[1])
P2 = flow.placement(model_tensor_placement.type, ranks=[2])
P3 = flow.placement(model_tensor_placement.type, ranks=[3])
def get_sbp(state_dict, tensor):
    if tensor is state_dict["System-Train-TrainStep"]:
        return flow.sbp.broadcast
    if tensor is state_dict["module_pipeline"]["m_stage3.linear.weight"]:
        return flow.sbp.split(1)
    if tensor is state_dict["module_pipeline"]["m_stage3.linear.bias"]:
        return flow.sbp.broadcast
    return flow.sbp.split(0)
class Stage0Module(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = flow.nn.Linear(16, 8)
        self.relu = flow.nn.ReLU()
    def forward(self, x):
        return self.relu(self.linear(x))
class Stage1Module(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = flow.nn.Linear(8, 4)
        self.relu = flow.nn.ReLU()
    def forward(self, x):
        return self.relu(self.linear(x))
class Stage2Module(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = flow.nn.Linear(4, 2)
        self.relu = flow.nn.ReLU()
    def forward(self, x):
        return self.relu(self.linear(x))
class Stage3Module(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = flow.nn.Linear(2, 1)
    def forward(self, x):
        return self.linear(x)
# 模拟 4 个 ranks 上的流水并行
class PipelineModule(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.m_stage0 = Stage0Module()
        self.m_stage1 = Stage1Module()
        self.m_stage2 = Stage2Module()
        self.m_stage3 = Stage3Module()
        self.m_stage0.to_global(placement=P0, sbp=flow.sbp.broadcast)
        self.m_stage1.to_global(placement=P1, sbp=flow.sbp.broadcast)
        self.m_stage2.to_global(placement=P2, sbp=flow.sbp.broadcast)
        self.m_stage3.to_global(placement=P3, sbp=flow.sbp.broadcast)
    def forward(self, x):
        out_stage0 = self.m_stage0(x)
        in_stage1 = out_stage0.to_global(placement=P1, sbp=flow.sbp.broadcast)
        out_stage1 = self.m_stage1(in_stage1)
        in_stage2 = out_stage1.to_global(placement=P2, sbp=flow.sbp.broadcast)
        out_stage2 = self.m_stage2(in_stage2)
        in_stage3 = out_stage2.to_global(placement=P3, sbp=flow.sbp.broadcast)
        out_stage3 = self.m_stage3(in_stage3)
        return out_stage3
class PipelineGraph(flow.nn.Graph):
    def __init__(self, module_pipeline):
        super().__init__()
        self.module_pipeline = module_pipeline
        self.module_pipeline.m_stage0.config.set_stage(0, P0)
        self.module_pipeline.m_stage1.config.set_stage(1, P1)
        self.module_pipeline.m_stage2.config.set_stage(2, P2)
        self.module_pipeline.m_stage3.config.set_stage(3, P3)
        self.config.set_gradient_accumulation_steps(2)
        self.add_optimizer(
            flow.optim.SGD(self.module_pipeline.parameters(), lr=0.001)
        )
    def build(self, x):
        out = self.module_pipeline(x)
        out = out.sum()
        out.backward()
        return out
def train_with_graph(call_cnt=0, state_dict_dir=None, last_state_dict=None):
    # 形状为 [2, 16] 的固定输入张量
    x = flow.tensor(
        [
            [
                0.4286,
                0.7402,
                0.4161,
                0.6103,
                0.7394,
                1.1330,
                -0.2311,
                -0.1013,
                0.8537,
                0.9757,
                -0.9842,
                0.3839,
                -0.5551,
                -0.8832,
                0.7820,
                0.7421,
            ],
            [
                -0.1581,
                -1.0319,
                1.8430,
                0.3576,
                0.7288,
                -0.6912,
                0.9966,
                1.0840,
                -1.1760,
                1.5683,
                -0.2098,
                -1.6439,
                -2.7049,
                0.1949,
                1.6377,
                0.0745,
            ],
        ],
        dtype=flow.float32,
        placement=P0,
        sbp=flow.sbp.broadcast,
    )
    module_pipeline = PipelineModule()
    graph_model = PipelineGraph(module_pipeline)
    cur_rank = flow.env.get_rank()
    if call_cnt == 1:
        if cur_rank in model_file_placement.ranks:
            local_state_dict = flow.load(state_dict_dir)
        else:
            local_state_dict = None
        # 运用 sbp=get_sbp 处理特别的键
        global_state_dict = flow.utils.global_view.to_global(
            local_state_dict, placement=model_file_placement, sbp=get_sbp,
        )
        graph_model.load_state_dict(global_state_dict)
    graph_model(x)
    state_dict = graph_model.state_dict()
    if call_cnt == 0:
        model_file_state_dict = flow.utils.global_view.to_global(
            state_dict, placement=model_file_placement, sbp=get_sbp,
        )
        if flow.env.get_rank() in model_file_placement.ranks:
            flow.save(
                flow.utils.global_view.to_local(model_file_state_dict),
                state_dict_dir,
            )
if __name__=="__main__":
    rank_id = flow.env.get_rank()
    # 保存途径,一个 rank 对应一个途径。
    state_dict_dir = "./graph_save_load_global_" + str(rank_id)
    # 保存模型
    train_with_graph(0, state_dict_dir)
    # 加载模型
    train_with_graph(1, state_dict_dir)

4

结语

本文从简略介绍大规模模型分片存储开端,终究演示了 OneFlow 的怎么做模型分片保存和加载的进程,后续 OneFlow 的大模型分片存储的接口还会不断完善。

欢迎下载体验 OneFlow v0.8.0 最新版本:
github.com/Oneflow-Inc…