多机训练时的环境变量
通过设置环境变量配置分布式训练,仅仅是为了在交互式 Python 环境下,方便查看实验效果。如果不是学习、试验目的,而是生产需求,可以直接通过 oneflow.distributed.launch 启动分布式训练,该模块内部根据命令行参数,自动设置了必要的环境变量。
1)MASTER_ADDR:多机训练的第0号机器的 IP。
2)MASTER_PORT:多机训练的第0号机器的监听端口,不与已经占用的端口冲突即可。
3)WORLD_SIZE:整个集群中计算设备的数目,因为目前还不支持各个机器上显卡数目不一致,因此 WORLD_SIZE 的数目实际上是机器数目每台机器上的显卡数目,机器数目每台机器上的显卡数目机器数目×每台机器上的显卡数目。如这个例子中,是单机2卡的情况,因此 WORLD_SIZE=2。
RANK 和 LOCAL_RANK 都是对计算设备的编号,不同的是 RANK 是全局视角的编号,LOCAL_RANK 某个特定机器上的局部视角的编号。当是单机训练(单机单卡或单机多卡)时,两者是没有区别的。以上的例子中,有两个显卡,分别是0号和1号。
当多机训练时,每台机器上的 LOCAL_RANK 的上限,就是每台机器上的计算设备的数目;RANK 的上限,就是所有机器上所有计算设备的总和,它们的编号均从0开始。(因为编号从0开始,所以不包含上限)
以两台机器、每台机器上有两张显卡为例,可以整理出每张显卡的 LOCAL_RANK 与 RANK 对应情况,见表5-1。
表5-1 整理出每张显卡的 LOCAL_RANK 与 RANK 对应情况
|
RANK |
LOCAL_RANK |
机器0的第0张显卡 |
0 |
0 |
机器0的第1张显卡 |
1 |
1 |
机器1的第0张显卡 |
2 |
0 |
机器1的第1张显卡 |
3 |
1 |
1. Boxing(自动转换 SBP)
已经通过以上代码的例子,知道一个算子会根据输入张量的 SBP 属性以及算子内置的 SBP签名,自动设置输出张量的SBP。
如果上游算子输出张量的 SBP,与下游算子输入的需要不一致时,怎么办呢?
比如,假设在模型并行中,有2层矩阵乘法,在第一层和和第二层都做模型并行。如图5-11所示。
图5-11假设在模型并行中,有2层矩阵乘法,在第一层和和第二层都做模型并行
因为第一层的输出的 SBP(split(1)),并不是第二层输入所期待的(broadcast),这时候,OneFlow 会自动在上一层的输出和下一层的输出之间,插入对抗操作,利用集合通信进行必要的数据转换。
从 split(1) 转换为广播,相当于做了一次 AllGather 操作。如图5-12所示。
图5-12 从 split(1) 转换为广播,相当于做了一次 AllGather 操作
因为有对抗机制的存在,使得用户只用关心少数关键地方(如源算子)的 SBP 设置,剩下的全部都可以交给 OneFlow 框架。
2. 2D SBP
介绍了集群的全局视角和全局张量 后,相信已经掌握了 SBP 和 SBP签名的基本概念,并且能够上手相关的编程任务。实际上,以上资料中涉及都是 1D SBP。
现在介绍 2D SBP,它能够更灵活地应对更复杂的分布式训练场景。
3. 2D 设备阵列
已经熟悉 1D SBP 的位置配置,在 1D SBP 的场景下,通过 oneflow.placement 接口配置集群,比如使用集群中的第 0~3 号 GPU 显卡:
>>> placement1 = flow.placement("cuda", ranks=[0, 1, 2, 3])
以上的 cuda指定了设备类型,ranks=[0, 1, 2, 3] 指定了集群中的计算设备。其实,ranks 不仅可以是一维的int list,还可以是多维的int数组:
placement2 = flow.placement("cuda", ranks=[[0, 1], [2, 3]])
当 ranks 是 ranks=[0, 1, 2, 3] 这种一维列表的形式时,集群中的所有设备组成了一个 1D 设备向量,这也是 1D SBP 名称的由来。
当 ranks 是多维数组的形式时,集群中的设备被分组为一个多维的设备阵列。ranks=[[0, 1], [2, 3]] 表示集群中的四个计算设备被划分为了 2×2 的设备阵列。
2D SBP
构造全局张量时,需要同时指定位置与 SBP。当位置中的集群是 2 维的设备阵列时;SBP 也必须与之对应,是一个长度为 2 的元祖,这个元祖中的第 0 个、第 1 个 元素,分别描述了全局张量张量在设备阵列第 0 维、第 1 维的分布。
以下代码,配置了 2×2 的设备阵列,并且设置 2D SBP 为 (broadcast, split(0))。
>>> a = flow.Tensor([[1,2],[3,4]])
>>> placement = flow.placement("cuda", ranks=[[0, 1], [2, 3]])
>>> sbp = (flow.sbp.broadcast, flow.sbp.split(0))
>>> a_to_global = a.to_global(placement=placement, sbp=sbp)
逻辑上的数据,在整个设备阵列上,在第 0 维度(竖着看)做 广播;在第 1 维度(横着看)做 split(0)。如图5-13所示。
图5-13 在第 0 维度(竖着看)做 广播;在第 1 维度(横着看)做 split(0)
此图的最左边是全局视角的数据,最右边是设备阵列上各个设备的数据。可以看到,从第 0 维的角度看,它们都是 broadcast 的关系:
1)(group0, device0) 与 (group1, device0) 中数据一致,互为 broadcast 关系
2)(group0, device1) 与 (group1, device1) 中数据一致,互为 broadcast 关系
从第 1 维的角度看,它们都是 split(0) 的关系:
1)(group0, device0) 与 (group0, device1) 互为 split(0) 关系。
2)(group1, device0) 与 (group1, device1) 互为 split(0) 关系。
直接理解逻辑数据和最终的设备阵列中的物理数据,对应关系可能有一定难度,在思考 2D SBP 时,可以假想一个中间状态(图5-13中灰色部分),以 (broadcast, split(0)) 为例:
1)原始逻辑张量,先经过广播,广播到 2 个 group 上,得到中间的状态。
2)在中间状态的基础上,继续在各自的 group 上,做 split(0),得到最终设备阵列中各个物理张量的状态。
4. 2D SBP签名
类似 1D SBP 有 SBP签名的概念,算子也有 2D SBP签名,在掌握了 1D SBP 及其 签名的基础上,2D SBP签名非常简单,只需要遵循一条原则:在各自的维度上独立推导即可。
以矩阵乘法为例,先回顾 1D SBP 的情况,假定有 𝑥×𝑤=𝑦 有以下的 SBP签名:
𝑏𝑟𝑜𝑎𝑑𝑐𝑎𝑠𝑡×𝑠𝑝𝑙𝑖𝑡(1)=𝑠𝑝𝑙𝑖𝑡(1)
以及
𝑠𝑝𝑙𝑖𝑡(0)×𝑏𝑟𝑜𝑎𝑑𝑐𝑎𝑠𝑡=𝑠𝑝𝑙𝑖𝑡(0)
假定给 𝑥 设置了 2D SBP 为:(𝑏𝑟𝑜𝑎𝑑𝑐𝑎𝑠𝑡,𝑠𝑝𝑙𝑖𝑡(0)),给 𝑤 设置 2D SBP 为 (𝑠𝑝𝑙𝑖𝑡(1),𝑏𝑟𝑜𝑎𝑑𝑐𝑎𝑠𝑡),那么,在 2D SBP 的背景下, 𝑥×𝑤=𝑦 运算,得到 𝑦 的 SBP 属性为 (𝑠𝑝𝑙𝑖𝑡(1),𝑠𝑝𝑙𝑖𝑡(0))。
也就是说,以下几个 2D SBP,构成矩阵乘法的 2D SBP签名:
(𝑏𝑟𝑜𝑎𝑑𝑐𝑎𝑠𝑡,𝑠𝑝𝑙𝑖𝑡(0))×(𝑠𝑝𝑙𝑖𝑡(1),𝑏𝑟𝑜𝑎𝑑𝑐𝑎𝑠𝑡)=(𝑠𝑝𝑙𝑖𝑡(1),𝑠𝑝𝑙𝑖𝑡(0))
5. 2D SBP 使用示例
将通过一个简单的例子,演示如何使用 2D SBP 进行分布式训练。假设有一个 2×2 的设备阵列,如果没有多个 GPU 设备,将使用 CPU 来模拟 2×2 设备阵列的情形,对输入张量采用上文图中 (broadcast, split(0)) 的并行策略。
首先,导入依赖:
import oneflow as flow
import oneflow.nn as nn
然后,定义要使用到的 placement 和 sbp:
PLACEMENT = flow.placement("cpu", [[0, 1], [2, 3]])
BROADCAST = (flow.sbp.broadcast, flow.sbp.broadcast)
BS0 = (flow.sbp.broadcast, flow.sbp.split(0))
PLACEMENT 的 ranks 参数是一个二维列表,代表将集群中的设备划分成 2×2 的设备阵列。SBP 需要与其对应,指定为长度为 2 的 tuple。其中,BROADCAST 表示在设备阵列的第 0 维和第 1 维都进行广播,BS0 的含义与前文的描述相同。
假设有以下模型:
model = nn.Sequential(nn.Linear(8, 4),
nn.ReLU(),
nn.Linear(4, 2))
将模型在集群上广播:
model = model.to_global(placement=PLACEMENT, sbp=BROADCAST)
然后构造数据并进行前向推理:
x = flow.randn(1, 2, 8)
global_x = x.to_global(placement=PLACEMENT, sbp=BS0)
pred = model(global_x)
创建了一个形状为 (1, 2, 8) 的局部张量,然后通过 Tensor.to_global 方法,获取对应的 全局张量,最后将其输入到模型中进行推理。
通过 Tensor.to_local 方法,获取当前物理设备上的局部张量后,可以通过输出其形状和值来验证数据是否被正确处理:
local_x = global_x.to_local()
print(f'{local_x.device}, {local_x.shape}, \n{local_x}')
输出结果为:
cpu:2, oneflow.Size([1, 2, 8]),
tensor([[[ 0.6068, 0.1986, -0.6363, -0.5572, -0.2388, 1.1607, -0.7186, 1.2161],
[-0.1632, -1.5293, -0.6637, -1.0219, 0.1464, 1.1574, -0.0811, -1.6568]]], dtype=oneflow.float32)
cpu:3, oneflow.Size([1, 2, 8]),
tensor([[[-0.7676, 0.4519, -0.8810, 0.5648, 1.5428, 0.5752, 0.2466, -0.7708],
[-1.2131, 1.4590, 0.2749, 0.8824, -0.8286, 0.9989, 0.5599, -0.5099]]], dtype=oneflow.float32)
cpu:1, oneflow.Size([1, 2, 8]),
tensor([[[-0.7676, 0.4519, -0.8810, 0.5648, 1.5428, 0.5752, 0.2466, -0.7708],
[-1.2131, 1.4590, 0.2749, 0.8824, -0.8286, 0.9989, 0.5599, -0.5099]]], dtype=oneflow.float32)
cpu:0, oneflow.Size([1, 2, 8]),
tensor([[[ 0.6068, 0.1986, -0.6363, -0.5572, -0.2388, 1.1607, -0.7186, 1.2161],
[-0.1632, -1.5293, -0.6637, -1.0219, 0.1464, 1.1574, -0.0811, -1.6568]]], dtype=oneflow.float32)
通过比较这些不同设备上局部张量,符合上文图中描述的状态,证明数据已被正确分布到各个设备上。
需要注意的是,不能直接通过 python xxx.py 的方式执行上述代码,而需要通过 oneflow.distributed.launch 启动。此模块可以方便地启动分布式训练,在终端中执行下列命令(假设上述代码已经保存至当前目录中的名为2d_sbp.py的文件中):
python3 -m oneflow.distributed.launch --nproc_per_node=4 2d_sbp.py
通过将参数 nproc_per_node 指定为 4 来创建 4 个进程,模拟共有 4 个 GPU 的情形。
完整代码如下:
PLACEMENT = flow.placement("cpu", [[0, 1], [2, 3]])
BROADCAST = (flow.sbp.broadcast, flow.sbp.broadcast)
BS0 = (flow.sbp.broadcast, flow.sbp.split(0))
model = nn.Sequential(nn.Linear(8, 4),
nn.ReLU(),
nn.Linear(4, 2))
model = model.to_global(placement=PLACEMENT, sbp=BROADCAST)
x = flow.randn(1, 2, 8)
global_x = x.to_global(placement=PLACEMENT, sbp=BS0)
pred = model(global_x)
local_x = global_x.to_local()
print(f'{local_x.device}, {local_x.shape}, \n{local_x}')
6. 用 launch 模块启动分布式训练
OneFlow 提供了 oneflow.distributed.launch 模块,帮助用户更方便地启动分布式训练。
用户可以借助以下的形式,启动分布式训练:
python3 -m oneflow.distributed.launch [启动选项] train script.py
比如,启动单机两卡的训练:
python3 -m oneflow.distributed.launch --nproc_per_node 2 ./script.py
再比如,启动两台机器,每台机器有两张显卡的训练。
在0号机器上运行:
python3 -m oneflow.distributed.launch \
--nnodes=2 \
--node_rank=0 \
--nproc_per_node=2 \
--master_addr="192.168.1.1" \
--master_port=7788 \
script.py
在1号机器上运行:
python3 -m oneflow.distributed.launch \
--nnodes=2 \
--node_rank=1 \
--nproc_per_node=2 \
--master_addr="192.168.1.1" \
--master_port=7788 \
script.py
7. 常见选项说明
通过 python3 -m oneflow.distributed.launch -h, 可以查看 launch 模块的选项说明,以下是部分常见选项。
1)--nnodes:机器的数目(number of nodes)
2)--node_rank: 机器的编号,从0开始
3)--nproc_per_node:每台机器上要启动的进程数目(number of processes per node),推荐与 GPU 数目一致
4)--logdir:子进程日志的相对存储路径
8. launch 模块与并行策略的关系
注意 oneflow.distributed.launch 的主要作用,是待用户完成分布式程序后,让用户可以更方便地启动分布式训练。它省去了配置集群中环境变量的繁琐。
但是 oneflow.distributed.launch 并不决定并行策略,并行策略是由设置数据、模型的分发方式、在物理设备上的放置位置决定的。
OneFlow 提供的 全局视角和全局张量,可以灵活地配置并行策略。并且针对数据并行,OneFlow 提供了 DistributedDataParallel 模块,可以在极少修改代码的前提下,将单机单卡的脚本改为数据并行的脚本。
5.2.4 数据并行训练
在 OneFlow中,提供了两种做数据并行的方式。
一种是使用 OneFlow 的原生的 SBP 概念,通过设置全局张量,进行数据并行训练,这也是用 OneFlow 做数据并行训练的推荐方式 。
此外,为了方便从 PyTorch 迁移到 OneFlow 的用户,OneFlow 提供了与 torch.nn.parallel.DistributedDataParallel 对齐一致的接口 oneflow.nn.parallel.DistributedDataParallel,它也能让用户方便地从单机训练脚本,扩展为数据并行训练。
1. 通过设置 SBP 做数据并行训练
以下代码,是通过配置设置全局张量,完成数据并行训练。
import oneflow as flow
import oneflow.nn as nn
import flowvision
import flowvision.transforms as transforms
BATCH_SIZE=64
EPOCH_NUM = 1
PLACEMENT = flow.placement("cuda", [0,1])
S0 = flow.sbp.split(0)
B = flow.sbp.broadcast
DEVICE = "cuda" if flow.cuda.is_available() else "cpu"
print("Using {} device".format(DEVICE))
training_data = flowvision.datasets.CIFAR10(
root="data",
train=True,
transform=transforms.ToTensor(),
download=True,
)
train_dataloader = flow.utils.data.DataLoader(
training_data, BATCH_SIZE, shuffle=True
)
model = flowvision.models.mobilenet_v2().to(DEVICE)
model.classifer = nn.Sequential(nn.Dropout(0.2), nn.Linear(model.last_channel, 10))
model = model.to_global(placement=PLACEMENT, sbp=B)
loss_fn = nn.CrossEntropyLoss().to(DEVICE)
optimizer = flow.optim.SGD(model.parameters(), lr=1e-3)
for t in range(EPOCH_NUM):
print(f"Epoch {t+1}\n-------------------------------")
size = len(train_dataloader.dataset)
for batch, (x, y) in enumerate(train_dataloader):
x = x.to_global(placement=PLACEMENT, sbp=S0)
y = y.to_global(placement=PLACEMENT, sbp=S0)
# Compute prediction error
pred = model(x)
loss = loss_fn(pred, y)
# Backpropagation
optimizer.zero_grad()
loss.backward()
optimizer.step()
current = batch * BATCH_SIZE
if batch % 5 == 0:
print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
这个脚本与单机单卡的训练脚本几乎是一样的。除了几行与全局张量有关的配置代码外,少数的区别是:
1)设置 placement,让训练放置在集群第 0号、1号 GPU 上:
PLACEMENT = flow.placement("cuda", [0,1])
2)模型在集群上做广播
model = model.to_global(placement=PLACEMENT, sbp=B)
3)数据在集群上按 split(0) 做切分:
x = x.to_global(placement=PLACEMENT, sbp=S0)
y = y.to_global(placement=PLACEMENT, sbp=S0)
这样,按照 常见的分布式并行策略 中的介绍,就通过对数据进行 split(0) 切分,对模型进行广播,进行了分布式数据并行训练。
使用 DistributedDataParallel 做数据并行训练
可以用以下命令快速体验 oneflow.nn.parallel.DistributedDataParallel 做数据并行:
wget https://docs.oneflow.org/master/code/parallelism/ddp_train.py #下载脚本
python3 -m oneflow.distributed.launch --nproc_per_node 2 ./ddp_train.py #数据并行训练
输出:
50/500 loss:0.004111831542104483
50/500 loss:0.00025336415274068713
...
500/500 loss:6.184563972055912e-11
500/500 loss:4.547473508864641e-12
w:tensor([[2.0000],
[3.0000]], device='cuda:1', dtype=oneflow.float32,
grad_fn=<accumulate_grad>)
w:tensor([[2.0000],
[3.0000]], device='cuda:0', dtype=oneflow.float32,
grad_fn=<accumulate_grad>)
点击以下 “Code” 可以展开以上运行脚本的代码。
import oneflow as flow
from oneflow.nn.parallel import DistributedDataParallel as ddp
train_x = [
flow.tensor([[1, 2], [2, 3]], dtype=flow.float32),
flow.tensor([[4, 6], [3, 1]], dtype=flow.float32),
]
train_y = [
flow.tensor([[8], [13]], dtype=flow.float32),
flow.tensor([[26], [9]], dtype=flow.float32),
]
class Model(flow.nn.Module):
def __init__(self):
super().__init__()
self.lr = 0.01
self.iter_count = 500
self.w = flow.nn.Parameter(flow.tensor([[0], [0]], dtype=flow.float32))
def forward(self, x):
x = flow.matmul(x, self.w)
return x
m = Model().to("cuda")
m = ddp(m)
loss = flow.nn.MSELoss(reduction="sum")
optimizer = flow.optim.SGD(m.parameters(), m.lr)
for i in range(0, m.iter_count):
rank = flow.env.get_rank()
x = train_x[rank].to("cuda")
y = train_y[rank].to("cuda")
y_pred = m(x)
l = loss(y_pred, y)
if (i + 1) % 50 == 0:
print(f"{i+1}/{m.iter_count} loss:{l}")
optimizer.zero_grad()
l.backward()
optimizer.step()
print(f"\nw:{m.w}")
可以发现,它与单机单卡脚本,只有2个不同:
1)使用 DistributedDataParallel 处理一下 module 对象(m = ddp(m))。
2)使用 get_rank 获取当前设备编号,并针对设备分发数据。
然后使用 launcher 启动脚本,把剩下的一切都交给OneFlow,进行分布式训练,像单机单卡训练一样简单:
python3 -m oneflow.distributed.launch --nproc_per_node 2 ./ddp_train.py
DistributedSampler
为了简化问题,突出 DistributedDataParallel,因此使用的数据是手工分发的。在实际应用中,可以直接使用 DistributedSampler 配合数据并行使用。
DistributedSampler 会在每个进程中实例化 Dataloader,每个Dataloader实例会加载完整数据的一部分,自动完成数据的分发。