PyTorch训练中的分布式与多进程并行机制
PyTorch训练中的多进程并行机制
基本概念:进程(processing)与线程(thread)
概念说明:进程是资源分配的基本单元,而线程是CPU任务调度的最小单元。
一个进程由至少一个线程组成,线程是比进程更加细粒度、轻量的概念。进程内的线程可以独立工作完成子任务,而且由于进程是资源分配的单元,因此进程内的线程可以共享很多资源,比如地址空间和数据,而进程之间通常不能共享。线程的少开销与共享数据提供了便利,但是也带来了一个重要的问题:如果多个线程共同访问某个资源,那么可能会产生线程不安全的问题(比如两个线程同时对一个位置的变量进行自增,由于计算需要时间,线程1取到数值后,还没来得及更新数值,原来的数值就被线程2取到,导致线程2处理的还是旧数据,两个线程的共同效果并不是预期的两者叠加,造成结果的不可预期),因此线程不利于资源的保护。
线程的这种缺陷引入了新的技术:线程同步与线程安全。最直观的想法就是加锁🔐(互斥锁),即当一个线程用到某个资源时,禁止其他线程再用(阻塞),这样就避免了结果不确定。
Parameter Server与Ring-allreduce
Parameter Server架构(PS架构)和Ring-allreduce是分布式训练的两种不同架构。
PS架构的基本流程:各个node分为两类:server和worker,各个worker分别计算结果,发送给server进行汇总,然后再分发给各个worker。对于训练模型来说,就是各个worker计算梯度,然后发送到server进行平均,然后更新模型参数,最后将得到的新参数再广播到各个worker。这个架构的问题很明显,那就是各个worker都完成后才能更新参数,这样不能最优地利用带宽。
Ring-allreduce的基本流程:各个node形成一个环路,每个node计算出各自的梯度后,将对应指定的一个梯度参数块向下游传递,并且接受上游传来的与前一个node对应的参数块,全部完成后,每个node对应的参数块都已经获取了全部node的更新,因此是最终版。各个node将自己最终版的参数传到其他node,即可将所有的node更新成相同的参数值,并且其中各个参数块都是最新版。最后,各个node即可用同样的梯度进行模型参数更新。
下面通过一个简单的示例说明Ring-allreduce具体细节:TODO
模型并行与数据并行
模型并行:将网络的不同层或者不同参数放到不同的卡上计算;
数据并行:模型拷贝到各个卡上,将数据分割成不同的子集,各自送到对应卡上计算。
模型并行由于模型本身计算的通常有串行性质,因此存在前后依赖。而数据并行相对简单,因为数据sample可以较好的分开,最后只需要各个模型同步一下更新参数即可(可以同步更新or异步更新)。
分布式训练方式:DataParallel和DistributedDataParallel
DataParallel
DP的操作模式是模型复制,再加上数据分散,每个前向过程需要重新同步复制模型。优点:代码修改简单,只需要用DataParallel将模型包一下:model = nn.DataParallel(model)
即可,可以视为从单机单卡的demo最小成本迁移到单机多卡上的策略。缺点:性能问题,DP是单进程多线程(考虑到Python多线程GIL (Global Interpreter Lock,全局解释器锁) 的问题,实际上DP不能最好地利用多卡资源)。
DistributedDataParallel
DDP是多进程并行(对比DP多线程并行),所以模型副本之间没有GIL问题,因此效率通常更高。其启动和运行方式与DP不同。主要步骤:
定义环境变量MASTER_ADDR和MASTER_PORT
设置world size,并多线程启动基础训练脚本(什么是world size:由于DDP可以支持多机/node多卡/GPU,如果我们有N个node和G块GPU卡,那么在某一时间运行在所有node上的总进程数就是world size,对应地,运行在某个node上的就是local world size;根据world size,会为每个GPU分配一个rank id,global rank对应于上面的world size,rank对应于local world size,后面我们会看到如何通过rank来识别各个进程并完成对应操作)
dist.init_process_group
启动一个进程组DDP封装模型,并开始训练过程。其中需要注意data_sampler和sync_bn等相关问题。
DDP训练模型的内部过程大致如下:
首先,通过初始化得到一个process group,后续各种同步都在该group内进行;
DDP constructor将rank 0的信息广播到各个进程以保证各个模型副本状态一致,于是每个进程得到一份相同的模型;
每个进程创建一个Reducer,用于backward的时候同步梯度;
Forward:local数据送入local 模型进行前向传播,计算预测结果;
Backward:根据预测结果计算loss,并进行反向传播backward。在反向传播过程中,利用autograd hook实现进程间的同步,具体实现方式:当摸个grad计算完成后,它将被标记为可以reduce(实际上由于DDP将parameters组织成bucket的形式,bucket是reduce的基本单元,因此需要等一个bucket内的所有grad都就绪之后,才会被reduce)。Reducer启动一个异步的allreduce操作对grad进行平均,当allreduce结束时,所有的模型副本的参数的grad都已经相同(即reduce后的均值),因此可以直接进行optimize操作。考虑到初始状态相同,backward后梯度也相同,因此optimize与直接单卡模型的优化过程一致。
init_process_group代码示例:
1 | torch.distributed.init_process_group(backend=None, |
其中,backend通常为gloo或nccl,world_size是参与该任务的进程数,rank是当前进程的rank。
启动DDP的方式:torch.distributed.launch
launch DDP的脚本示例:
单机多卡(single node multi process)
1 | python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE |
多机多卡(multi node multi process),以两个node为例,两个机器上需要分别启动任务,脚本分别如下:
1 | # node 1: |
并行进程的手动同步:torch.distributed.barrier()
dist.barrier()
的功能是实现进程间的同步(synchronize all processes)。其具体实现方式是:当每个进程都走到barrier时,才会继续往后走。也就是说,如果某个或者部分进程走到barrier,那么它/它们将在此等待,直到所有的进程都进入barrier,在一起开启后续流程。这个函数通常用在base process需要load数据或者collect结果的情况(处理时间较长)。此时我们可以将通过barrier将其他进程阻塞,然后让某个rank工作,等它处理完后,让它也进入barrier,此时所有进程都将继续向前进行。一个示例(python - How does torch.distributed.barrier() work - Stack Overflow):
1 | if args.local_rank not in [-1, 0]: |
数据读取的并行化策略:DistributedSampler的原理和用法
DistributedSampler 用于sample数据集的一个子集,并且子集之间不交叉重叠。对于DDP并行训练的情况,每个进程都有一个DistributedSampler,用于sample数据并传到DataLoader。
示例代码:
1 | sampler = DistributedSampler(dataset) if is_distributed else None |
reference
Distributed Data Parallel — PyTorch 2.2 documentation
PyTorch Distributed Overview — PyTorch Tutorials 2.2.0+cu121 documentation
Writing Distributed Applications with PyTorch — PyTorch Tutorials 2.2.0+cu121 documentation
https://github.com/pytorch/examples/blob/main/distributed/ddp/README.md
Distributed communication package - torch.distributed — PyTorch 2.2 documentation