Pytorch实现多GPU并行训练(DDP)

Pytorch实现并行训练通常有两个接口:DP(DataParallel)DDP(DistributedDataParallel)。目前DP(DataParallel)已经被Pytorch官方deprecate掉了,原因有二:1,DP(DataParallel)只支持单机多卡,无法支持多机多卡;2,DP(DataParallel)即便在单机多卡模式下效率也不及DDP(DistributedDataParallel)。我们可以看一下官方文档里的描述:

DistributedDataParallel is proven to be significantly faster than torch.nn.DataParallel for single-node multi-GPU data parallel training.

基于这两个缺点,DP(DataParallel)本文就不介绍了,即使DP(DataParallel)更好上手。本文重点讲解如何使用DDP(DistributedDataParallel)来完成并行训练。

官方文档:https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html
官方视频教程:https://pytorch.org/tutorials/beginner/ddp_series_intro.html


DDP

DDP是基于多进程来实现并行训练,每个GPU依靠独立的进程来驱动,进程之间有特殊的通信机制。先介绍几个变量:

变量名意义
rank0在节点中的编号,比如当前是第0张GPU
world_size4整个节点数量,比如一共4张GPU

我们先看一个单GPU代码,然后将其转换成DDP并行训练代码,观察变化来进行学习
单GPU版本:(实验代码来自https://github.com/pytorch/examples/tree/main/distributed/ddp-tutorial-series)

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoaderclass MyTrainDataset(Dataset):def __init__(self, size):self.size = sizeself.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]def __len__(self):return self.sizedef __getitem__(self, index):return self.data[index]class Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int,) -> None:self.gpu_id = gpu_idself.model = model.to(gpu_id)self.train_data = train_dataself.optimizer = optimizerself.save_every = save_everydef _run_batch(self, source, targets):self.optimizer.zero_grad()output = self.model(source)loss = F.cross_entropy(output, targets)loss.backward()self.optimizer.step()def _run_epoch(self, epoch):b_sz = len(next(iter(self.train_data))[0])print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")for source, targets in self.train_data:source = source.to(self.gpu_id)targets = targets.to(self.gpu_id)self._run_batch(source, targets)def _save_checkpoint(self, epoch):ckp = self.model.state_dict()PATH = "checkpoint.pt"torch.save(ckp, PATH)print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)if epoch % self.save_every == 0:self._save_checkpoint(epoch)def load_train_objs():train_set = MyTrainDataset(2048)  # load your datasetmodel = torch.nn.Linear(20, 1)  # load your modeloptimizer = torch.optim.SGD(model.parameters(), lr=1e-3)return train_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_size=batch_size,pin_memory=True,shuffle=True)def main(device, total_epochs, save_every, batch_size):dataset, model, optimizer = load_train_objs()train_data = prepare_dataloader(dataset, batch_size)trainer = Trainer(model, train_data, optimizer, device, save_every)trainer.train(total_epochs)if __name__ == "__main__":import argparseparser = argparse.ArgumentParser(description='simple distributed training job')parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')parser.add_argument('save_every', type=int, help='How often to save a snapshot')parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')args = parser.parse_args()device = 0  # shorthand for cuda:0main(device, args.total_epochs, args.save_every, args.batch_size)

直接复制上面代码,保存为single_gpu.py,然后用以下命令就可以让代码运行:

python single_gpu.py 10000 1000

这一步如果运行失败,请留言告知。如果运行成功,我们观察GPU,会发现该程序只会调用第一张GPU。我们将上述代码稍微改进,就可以得到多GPU版本了:

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoaderimport torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import osclass MyTrainDataset(Dataset):def __init__(self, size):self.size = sizeself.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]def __len__(self):return self.sizedef __getitem__(self, index):return self.data[index]def ddp_setup(rank, world_size):"""Args:rank: Unique identifier of each processworld_size: Total number of processes"""os.environ["MASTER_ADDR"] = "localhost"os.environ["MASTER_PORT"] = "12355"init_process_group(backend="nccl", rank=rank, world_size=world_size)torch.cuda.set_device(rank)class Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int,) -> None:self.gpu_id = gpu_idself.model = model.to(gpu_id)self.train_data = train_dataself.optimizer = optimizerself.save_every = save_everyself.model = DDP(model, device_ids=[gpu_id])def _run_batch(self, source, targets):self.optimizer.zero_grad()output = self.model(source)loss = F.cross_entropy(output, targets)loss.backward()self.optimizer.step()def _run_epoch(self, epoch):b_sz = len(next(iter(self.train_data))[0])print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")self.train_data.sampler.set_epoch(epoch)for source, targets in self.train_data:source = source.to(self.gpu_id)targets = targets.to(self.gpu_id)self._run_batch(source, targets)def _save_checkpoint(self, epoch):ckp = self.model.module.state_dict()PATH = "checkpoint.pt"torch.save(ckp, PATH)print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)if self.gpu_id == 0 and epoch % self.save_every == 0:self._save_checkpoint(epoch)def load_train_objs():train_set = MyTrainDataset(2048)  # load your datasetmodel = torch.nn.Linear(20, 1)  # load your modeloptimizer = torch.optim.SGD(model.parameters(), lr=1e-3)return train_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_size=batch_size,pin_memory=True,shuffle=False,sampler=DistributedSampler(dataset))def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):ddp_setup(rank, world_size)dataset, model, optimizer = load_train_objs()train_data = prepare_dataloader(dataset, batch_size)trainer = Trainer(model, train_data, optimizer, rank, save_every)trainer.train(total_epochs)destroy_process_group()if __name__ == "__main__":import argparseparser = argparse.ArgumentParser(description='simple distributed training job')parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')parser.add_argument('save_every', type=int, help='How often to save a snapshot')parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')args = parser.parse_args()world_size = torch.cuda.device_count()mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)

将上述代码保存为mp_train.py,然后直接运行:

python mp_train.py 10000 1000

我们再观察GPU,会发现所有GPU均被调用。到此,实验部分已经结束。下面进入分析阶段。


从单卡变为多卡,需要进行哪些修改?

我们可以简单比对以下差别:

vimdiff single_gpu.py mp_train.py

vimdiff可以比较两个文档的差别,这是一个小trick,分享给大家。

  • 第一步:初始化
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import osdef ddp_setup(rank, world_size):"""Args:rank: Unique identifier of each processworld_size: Total number of processes"""os.environ["MASTER_ADDR"] = "localhost"os.environ["MASTER_PORT"] = "12355"init_process_group(backend="nccl", rank=rank, world_size=world_size)torch.cuda.set_device(rank)

init_process_group就是初始化群组,相当于给GPU们打了个招呼,我们要进行并行训练啦。这里的"os.environ"可以不用修改,因为只是单机多卡,不涉及多机多卡。"nccl"表示Nvidia Collective Communications Library,是一个跨GPU的通信后端类型,一般也不用修改。这里需要注意的是rankworld_size,比如咱们是4卡机,rank的取值范围是[0,1,2,3],而world_size=4。

  • 第二步:改造模型和数据加载器
    vimdiff
    vimdiff2
    左边是单卡模式,右边是多卡模式。
  • 步骤三:用多进程启动
world_size = torch.cuda.device_count()
mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)

如有不解,欢迎留言讨论~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/612.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VisualStudio离线包制作

因为需要,需要制作VisualStudio离线包,之前尝试了很多的方案,均没有下载成功。今天偶然看到一个可行的方案,这里在这里分享下。 从微软官网下载VisualStudio离线包 1 下载安装文件 visualstudio官网 首先进入到官网中&#xff0…

最小二乘法的原理及实现

1.最小二乘法的原理及实现 笔记来源于《白话机器学习的数学》 1.1 最小二乘法的原理 预测一个变量 x x x与一个变量 y y y的关系 例如:广告费 x x x与点击量 y y y 用直线拟合数据 1.2 最小二乘法的实现 广告费x和点击量y,找到一条直线表达式&#x…

MySQL数据表查询

😇作者介绍:一个有梦想、有理想、有目标的,且渴望能够学有所成的追梦人。 🎆学习格言:不读书的人,思想就会停止。——狄德罗 ⛪️个人主页:进入博主主页 🗼专栏系列:进入MySQL知识专…

每天一点Python——day43

#第四十三天字典的视图操作: ①keys()获取字典中所有的键 ②values()获取字典中所有的值 ③items()获取字典中所有的键值对#如图: #例:获取所有的键 a{哥哥:18,妹妹:16,姐姐:17}#字典创立 ba.keys()#获取后我们存在变量b中,右边的…

Vision Pro中VR游戏空间边界为3×3米圆形

6月25日青亭网报道,此前我们已经报道了苹果visionOS有三种应用类型:FullSpaces、Windows、Volume。其中FullSpaces是仅显示一款应用的类型,后两种为共享空间可多窗口、多应用显示。 在FullSpaces模式下,苹果的一份文档显示visionO…

Linux下 MHA故障切换 主从角色提升

目录 所有主机共同操作 manger操作 其他四台安装 搭建主从复制环境 nobe slave1 配置 slave2 slave3 配置 配置MHA环境 简述MHA: MHA(Master High Availability)目前在MySQL高可用方面是一个相对成熟的解决方案,它由…

深入理解深度学习——Transformer:编码器(Encoder)部分

分类目录:《深入理解深度学习》总目录 相关文章: 注意力机制(AttentionMechanism):基础知识 注意力机制(AttentionMechanism):注意力汇聚与Nadaraya-Watson核回归 注意力机制&#…

【Logback技术专题】「入门到精通系列教程」深入探索Logback日志框架的原理分析和开发实战技术指南(下篇)

深入探索Logback日志框架的原理分析和开发实战技术指南&#xff08;下篇&#xff09; 根节点configuration包含的属性基本参数详解子节点介绍设置上下文名称<contextName>使用案例 设置变量属性值<property>获取时间戳字符串<timestamp>设置loggerroot根节点…

Jenkins 发送文件到远程服务器:Publish Over SSH 插件

Jenkins 发送文件到远程服务器&#xff1a;Publish Over SSH 插件 文章目录 Jenkins 发送文件到远程服务器&#xff1a;Publish Over SSH 插件一、Publish Over SSH 插件1、概述2、主要功能和特点3、插件主页4、安装 Publish Over SSH 插件5、配置远程主机 二、发送文件到远程主…

文本向量化

文章目录 文本的tokenization向量化1.one-hot编码2.word embedding3.API 文本的tokenization tokenization就是通常说的分词&#xff0c;分出的每一个词我们把它称为token。 常见的分词工具有很多&#xff0c;比如&#xff1a; jieba分词清华大学的分词工具THULAC 中文分词…

java版鸿鹄工程项目管理系统 Spring Cloud+Spring Boot+前后端分离构建工程项目管理系统源代码

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部工程管…

消息队列使用场景介绍

消息队列中间件是分布式系统中重要的组件&#xff0c;主要解决应用耦合&#xff0c;异步消息&#xff0c;流量削锋等问题 实现高性能&#xff0c;高可用&#xff0c;可伸缩和最终一致性架构 使用较多的消息队列有ActiveMQ&#xff0c;RabbitMQ&#xff0c;ZeroMQ&#xff0c;K…