PyTorch : Distributed Data Parallel 详解

作者:光火

邮箱:victor_b_zhang@163.com

Distributed Data Parallel 简称 DDP,是 PyTorch 结构下一种适用于单机多卡、多机多卡使命的数据并行方法。因为其良好的履行功率及广泛的显卡支撑,熟练掌握 DDP 已经成为深度学习从业者所必备的技术之一。本文结合详细代码,详细地说明了 DDP 在项目中的运用方法。读者依照本文所给的范例,只需稍经调试,即可完成 DDP 的整套流程。

概念辨析

  • 详细讲解 DDP 之前,我们先了解了解它和 Data Parallel (DP) 之间的区别。DP 同样是 PyTorch 常见的多 GPU 并行方法之一,且它的完成十分简洁:

    # 函数定义
    torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
    '''
    module : 模型
    device_ids : 参加训练的 GPU 列表
    output_device : 指定输出的 GPU, 一般省掉, 即默许运用索引为 0 的显卡
    '''
    # 程序模板
    device_ids = [0, 1]
    net = torch.nn.DataParallel(net, device_ids=device_ids)
    

    基本原理及固有缺点:在 Data Parallel 形式下,数据会被自动切分,加载到 GPU。同时,模型也将复制至各个 GPU 进行正向传达。在多个进程之间,会有一个进程充任 master 节点,担任收集各张显卡堆集的梯度,并据此更新参数,再一致发送至其他显卡。因而整体而言,master 节点承当了更多的核算与通讯使命,容易造成网络堵塞,影响训练速度。

    常见问题及解决方案:Data Parallel 要求模型必须在 device_ids[0] 具有参数及缓冲区,因而当卡 0 被占用时,能够在 nn.DataParallel 之前增加如下代码:

    # 依照 PIC_BUS_ID 次序自 0 开始排列 GPU 设备
    os.environ['CUDA_DEVICE_ORDER'] = 'PIC_BUS_ID'
    # 设置当时运用的 GPU 为 2、3 号设备
    os.environ['CUDA_VISIBLE_DEVICES'] = '2, 3'
    

    如此,device_ids[0] 将被默许为 2 号卡,device_ids[1] 则对应 3 号卡

  • 相较于 DP, Distributed Data Parallel 的完成要复杂得多,但是它的优势也十分显着:

    • DDP 速度更快,能够到达略低于显卡数量的加速比;
    • DDP 能够完成负载的均匀分配,克服了 DP 需要一个进程充任 master 节点的固有缺点;
    • 采用 DDP 一般能够支撑更大的 batch size,不会像 DP 那样出现其他显卡尚有余力,而卡 0 直接 out of memory 的状况;
    • 另外,在 DDP 形式下,输入到 data loaderbacth size 不再代表总数,而是每块 GPU 各自担任的 sample 数量。比方说,batch_size = 30,有两块 GPU。在 DP 形式下,每块 GPU 会担任 15 个样本。而在 DDP 形式下,每块 GPU 会各自担任 30 个样本;
  • DDP 基本原理:假使我们具有 N 张显卡,则在 Distributed Data Parallel 形式下,就会发动 N 个进程。每个进程在各自的卡上加载模型,且模型的参数完全相同。训练过程中,各个进程通过一种名为 Ring-Reduce 的方法与其他进程通讯,交换彼此的梯度,从而取得一切的梯度信息。随后,各个进程运用梯度的平均值更新参数。因为初始值和更新量完全相同,所以各个进程更新后的参数仍保持一致。

常用术语

  • rank
    • 进程号
    • 多进程上下文中,一般假定 rank = 0 为主进程或第一个进程
  • node
    • 物理节点,表明一个容器或一台机器
    • 节点内部能够包括多个 GPU
  • local_rank
    • 一个 node 中,进程的相对序号
    • local_ranknode 之间独立
  • world_size
    • 大局进程数
    • 一个分布式使命中 rank 的数量
  • group
    • 进程组
    • 一个分布式使命就对应一个进程组
    • 只有当用户创立多个进程组时,才会用到

PyTorch : Distributed Data Parallel  详解

代码完成

  • Distributed Data Parallel 能够通过 Pythontorch.distributed.launch 发动器,在命令行分布式地履行 Python 文件。履行过程中,发动器会将当时进程(其实便是 GPU)的 index 通过参数传递给 Python,而我们能够运用如下方法获取这个 index
    importargparse
    parser=argparse.ArgumentParser()
    parser.add_argument('--local_rank',default=-1,type=int,
                        metavar='N',help='Localprocessrank.')
    args=parser.parse_args()
    #print(args.local_rank)
    #local_rank表明本地进程序号
    
  • 随后,初始化进程组。关于在 GPU 履行的使命,建议挑选 nccl (由 NVIDIA 推出) 作为通讯后端。关于在 CPU 履行的使命,建议挑选 gloo (由 Facebook 推出) 作为通讯后端。假使不传入 init_method,则默许为 env://,表明自环境变量读取分布式信息
    dist.init_process_group(backend='nccl',init_method='env://')
    #初始化进程组之后,一般会履行这两行代码
    torch.cuda.set_device(args.local_rank)
    device=torch.device('cuda',args.local_rank)
    #后续的model=model.to(device),tensor.cuda(device)
    #对应的都是这里由 args.local_rank初始化得到的device
    
  • 数据部分,运用 Distributed Sampler 区分数据集,并将 sampler 传入 data loader。需要留意的是,此时在 data loader 中不能指定 shuffleTrue,不然会报错 (sampler 已具有随机打乱功用)
    dev_sampler=data.DistributedSampler(dev_data_set)
    train_sampler=data.DistributedSampler(train_data_set)
    dev_loader=data.DataLoader(dev_data_set,batch_size=dev_batch_size,
                                 shuffle=False,sampler=dev_sampler)
    train_loader=data.DataLoader(train_data_set,batch_size=train_batch_size,
                                   shuffle=False,sampler=train_sampler)
    
  • 模型部分,首要将将模型送至 device,即对应的 GPU 上,再运用 Distributed Data Parallel 包装模型(次序颠倒会报错)
    model=model.to(device)
    model=nn.parallel.DistributedDataParallel(
        model,device_ids=[args.local_rank],output_device=args.local_rank
    )
    
  • Distributed Data Parallel 形式下,保存模型应运用 net.module.state_dict(),而非 net.state_dict()。且无论是保存模型,仍是 LOGGER 打印,只对 local_rank0 的进程操作即可,因而代码中会有很多 args.local_rank == 0 的判别
    ifargs.local_rank==0:
        LOGGER.info(f'savinglatestmodel:{output_path}')
        torch.save({'model':model.module.state_dict(),
                    'optimizer':None,'epoch':epoch,'best-f1':best_f1},
                   open(os.path.join(output_path,'latest_model_{}.pth'.format(fold)),'wb'))
    
  • 运用 torch.load 加载模型时,设置 map_location=device,不然卡 0 会承当更多的开销
    load_model = torch.load(best_path, map_location=device)
    model.load_state_dict(load_model['model'])
    
  • dist.barrier() 可用于同步多个进程,建议只在必要的方位运用,如初始化 DDP 模型之前、权重更新之后、敞开新一轮 epoch 之前
  • 核算 accuracy 时,能够运用 dist.all_reduce(score, op=dist.ReduceOp.SUM),将各个进程核算的准确率求平均
  • 核算 f1-score 时,能够运用 dist.all_gather(all_prediction_list, prediction_list),将各个进程取得的预测值和真实值汇总到 all_list,再一致代入公式

发动方法

  • torch.distributed.launch
    # 此处 --nproc_per_node4 的意义是 server 有 4 张显卡
    pythontorch.distributed.launch--nproc_per_node4train.py
    # 假使运用 nohup, 则留意输入命令后 exit 当时终端
    pythontorch.distributed.launch--nproc_per_node4train.py
    
  • torchrun,推荐运用这种方法,因为 torch.distributed.launch 即将弃用
    • 代码中,只需将 Argument Parser 相关的部分替换为
      local_rank=int(os.environ['LOCAL_RANK'])
      
      然后将 args.local_rank 悉数改为 local_rank 即可
    • 发动命令
      # 单机多卡训练时, 能够不指定 nnodes
      torchrun--nnodes=1--nproc_per_node=4train.py
      # 假使运用 nohup, 则留意输入命令后 exit 当时终端
      nohuptorchrun--nnodes=1--nproc_per_node=4train.py>nohup.out&