本文章涉及的Megatron-llm的XMind思维导图源文件和PDF文件,可在网盘下载: https://pan.baidu.com/s/1xRZD-IP95y7-4Fn0C_VJMg 提取码: qxff
一、引言
Megatron-Core 是一个基于 PyTorch 的开源库,专为在 NVIDIA GPU 上高效训练大型语言模型(LLMs)而设计。它提供了一系列 GPU 优化的训练技术和系统级优化,旨在解决大规模模型训练中的内存和计算挑战。本文将从代码结构和关键技术点两个方面深入分析 Megatron-Core。
二、代码结构分析
Megatron-LLM 的代码结构高度模块化,主要分为以下几个部分:
- megatron/core/
- distributed/: 包含分布式训练的核心代码,支持梯度累加、通信优化等功能。
- module/: 定义 Transformer 模型的基本构建块,如 Transformer 层、注意力机制等。
- optim/: 实现优化的算法和工具,如分布式优化器。
- partition/: 与模型并行相关的代码,支持张量并行、序列并行等。
- schedules/: 学习率调度等训练计划相关的代码。
- utils/: 提供一系列实用工具函数,如日志记录、打印等。
- megatron/tasks/
- 定义具体的模型架构,如 BERT、GPT 等,这些模型通过组合 megatron/core/module/ 中的基本构建块实现。
- megatron/training/
- 包含训练过程的控制逻辑,如模型初始化、训练循环、评估等。
三、关键技术点分析
- 并行技术
- 数据并行(Data Parallelism):是最基本的并行方式,通过将数据分批次分配到不同的 GPU 上进行处理来实现。Megatron-Core 通过 PyTorch 的分布式数据并行功能来实现。
- 模型并行(Model Parallelism):针对模型的不同部分进行并行处理,适用于单个 GPU 无法容纳整个模型的情况。Megatron-Core 支持多种模型并行策略,包括:
- 张量并行(Tensor Parallelism):将模型的不同部分(如不同的 Transformer 层)分配到不同的 GPU 上。
- 序列并行(Sequence Parallelism):将输入序列分割成多个片段,每个片段由一个 GPU 处理。
- 流水线并行(Pipeline Parallelism):将模型分割成多个阶段,每个阶段由一个或多个 GPU 处理,数据通过流水线方式传递。
- 通信优化
- Megatron-Core 通过重叠通信和计算来减少训练过程中的等待时间。例如,可以在后向传播过程中重叠梯度的通信和累积。
- 使用高效的通信库(如 NCCL)来优化 GPU 之间的数据传输。
- 激活重新计算(Activation Recomputation)
- 为了减少 GPU 内存的使用,Megatron-Core 支持在训练过程中只保存必要的激活值,并在需要时重新计算其他激活值。这种方式可以在不增加显著计算开销的情况下,显著减少内存占用。
- 分布式优化器(Distributed Optimizer)
- Megatron-Core 实现了分布式优化器,通过将优化器状态均匀分布到数据并行的多个 GPU 上,来减少每个 GPU 的内存占用。这对于训练具有数十亿甚至数万亿参数的模型至关重要。
- 高级 API 和模块化设计
- Megatron-Core 提供了一组可组合和模块化的 API,允许研究人员和开发者轻松地定义和训练自定义的 Transformer 模型。这种设计使得 Megatron-Core 能够适应不同的训练场景和需求
四、Core目录源码
Megatron-Core 通过一系列 GPU 优化技术和系统级优化,为在 NVIDIA GPU 上高效训练大型语言模型提供了强大的支持。其模块化的设计和灵活的 API 使得 Megatron-Core 能够适应各种训练场景和需求。通过深入研究 Megatron-Core 的代码和关键技术点,我们可以更好地理解如何在大规模计算环境中高效地训练语言模型。
core主要包含datasets、models、transformer、fusion、distributed、tensor_parallel、pipline_parallel、inference子目录,我们分为数据集、模型结构、并行策略和推理四个模块来解读。
4.1 数据集构造
Megatron在datasets层面抽象了两个关键概念:MegatronTokenizer 和 MegatronDataset:
- 前者负责将文本 string 与 int 互相映射,实现tokenizer和detokenizer的功能;
- 后者负责对数据集文本进行各种策略的切分、缓存、分布式派发等逻辑;
4.1.1 MegatonTokenizer
从源码上来看,所有派生的上层Tokenizer子类都必须实现如下4个函数,值得注意的是 detokenizer 函数并不是抽象接口,可选实现。
class MegatronTokenizer(ABC):"""Abstract class for tokenizer"""@abstractmethoddef tokenize(self, text: str) -> numpy.ndarray:pass@property@abstractmethoddef vocab(self):"""Dictionary from vocab text token to id token"""pass@property@abstractmethoddef inv_vocab(self):"""Dictionary from vocab id token to text token"""pass@property@abstractmethoddef vocab_size(self):"""The vocabulary size"""pass
在trainnig目录定义了各种常见模型所属的Tokenizer,针对不同的数据集和训练需求,定制化实现了tokenizer等逻辑。在训练逻辑中,Tokenizer的创建是全权交给 build_tokenizer来做的,通过用户的config来dispatch 实例化,这对于用户是非常轻便友好的。用户也可以扩展更多的模型级别的Tokenizer,只需要通过config选项暴露即可。
我们可以阅读 builder_tokenizer源码即可了解 tokenizer的派发逻辑,这里有很多if-else,实现上并不是很优雅。
def build_tokenizer(args, **kwargs):"""Initialize tokenizer."""if args.rank == 0:print('> building {} tokenizer ...'.format(args.tokenizer_type),flush=True)# Select and instantiate the tokenizer.if args.tokenizer_type == 'BertWordPieceLowerCase':assert args.vocab_file is not Nonetokenizer = _BertWordPieceTokenizer(vocab_file=args.vocab_file,lower_case=True,vocab_extra_ids=args.vocab_extra_ids)elif args.tokenizer_type == 'BertWordPieceCase':assert args.vocab_file is not Nonetokenizer = _BertWordPieceTokenizer(vocab_file=args.vocab_file,lower_case=False,vocab_extra_ids=args.vocab_extra_ids)elif args.tokenizer_type == 'GPT2BPETokenizer':assert args.vocab_file is not Noneassert args.merge_file is not Nonetokenizer = _GPT2BPETokenizer(args.vocab_file, args.merge_file)elif args.tokenizer_type == 'SentencePieceTokenizer':assert args.tokenizer_model is not Nonetokenizer = _SentencePieceTokenizer(args.tokenizer_model, vocab_extra_ids=args.vocab_extra_ids)elif args.tokenizer_type == 'GPTSentencePieceTokenizer':assert args.tokenizer_model is not Nonetokenizer = _GPTSentencePieceTokenizer(args.tokenizer_model)elif args.tokenizer_type == 'HuggingFaceTokenizer':tokenizer = _HuggingFaceTokenizer(args.tokenizer_model, **kwargs)elif args.tokenizer_type == 'Llama2Tokenizer':assert args.tokenizer_model is not Nonetokenizer = _Llama2Tokenizer(args.tokenizer_model)elif args.tokenizer_type == 'TikTokenizer':assert args.tokenizer_model is not Noneassert args.tiktoken_pattern is not Noneassert args.tiktoken_pattern in {"v1", "v2"}pattern = PATTERN_TIKTOKEN if args.tiktoken_pattern == "v1" else PATTERN_TIKTOKEN_V2tokenizer = CustomTikTokenizer(path=args.tokenizer_model,pattern=pattern,vocab_size=args.vocab_size,num_special_tokens=args.tiktoken_num_special_tokens,special_tokens=args.tiktoken_special_tokens,)elif args.tokenizer_type == 'NullTokenizer':assert args.vocab_size is not Nonetokenizer = _NullTokenizer(args.vocab_size)else:raise NotImplementedError('{} tokenizer is not ''implemented.'.format(args.tokenizer_type))# Add vocab size (if not already set from a checkpoint).if getattr(args, "padded_vocab_size", None) is None:args.padded_vocab_size = _vocab_size_with_padding(tokenizer.vocab_size,args)return tokenizer
4.1.2 MegatonDataset
MegatronDataset 是所有上层dataset均须继承的抽象基类,且必须实现__len__和__getitem__抽象接口,同时定义了3个关键的静态方法:
- numel_low_level_dataset(LowLevelDataset):返回底层dataset数据集中的样本个数,用于切分train/test/valid 数据集
- build_low_level_dataset(dataset_path, BlendedMegatronDatasetConfig):根据传入的底层dataset的路径构建IndexDataset 对象
- _key_config_attributes()->List[str]:主要用来获取关键key,以计算md5 hash,用于缓存(多机各节点只需要一次处理数据,然后缓存到各个节点,提升IO效率?)
这里我们提到了IndexDataset,它也是继承torch.utils.Dataset,它主要负责真正面向数据集文件的读取、解析、转换等操作的底层类,其内置了_IndexReader、_IndexWriter、_BinReaderIO 读写函数逻辑。
另一个比较重要的类是:BlendedDataset,直接继承torch.utils.Dataset,从构造函数里我们能比较容易看出它是一个顶层策略类,仅重写了_build_indices函数逻辑
class BlendedDataset(torch.utils.data.Dataset):"""Conjugating class for a set of MegatronDataset instances"""def __init__(self,datasets: List[MegatronDataset],weights: List[Union[int, float]],size: Optional[int],config: BlendedMegatronDatasetConfig,) -> None:# ......def _build_indices(self) -> Tuple[numpy.ndarray, numpy.ndarray]:# .....
在Megatron里,Dataset是划分为了LowLevel、MidLevel、TopLevel 3种层次,分别对应于上面哪些类?
LowLevelDataset := IndexedDataset | Iterable
MidLevelDataset := MegatronDataset
TopLevelDataset := BlendedDataset | MidLevelDataset
DistributedDataset := TopLevelDataset | MidLevelDataset | LowLevelDataset | torch.utils.data.Dataset其中:
BlendedDataset := [MegatronDataset]
4.2 模型结构
此部分自顶向下总共分为三层:
- models 目录:定义最上层的模型类,比如GPTModule、LLavaModel、T5LMHead等
- trainsform 目录:定义子网络结构,且天然支持分布式混合并行策略,如TransformLayer、MOELayer、Attention模块、custom_layerfy
- fusion 目录:定义一些可被@torch.compilier装饰加速的网络结构,如fused_bias_dropout、fused_layer_norm等
4.2.1 顶层模型定义
我们先看下作为基类的 MegatronModule
,其直接继承 torch.nn.Module
,并不是一个抽象基类,仅额外定义了3个和分布式 checkpoint 有关的逻辑函数:
class MegatronModule(torch.nn.Module):"""Base Megatron module inhertied by all Models.Megatron specific extensions of torch Module with supportfor pipeliningArgs:config (TransformerConfig): Transformer config"""def state_dict_for_save_checkpoint(self, prefix: str = '', keep_vars: bool = False):"""Override state dict for saving checkpoints Use this function to override thestate dict for saving checkpoints."""return self.state_dict(prefix=prefix, keep_vars=keep_vars)def sharded_state_dict(self,prefix: str = '',sharded_offsets: Tuple[Tuple[int, int, int]] = (),metadata: Optional[dict] = None,) -> ShardedStateDict:"""Default implementation for sharded state dict for distributed checkpointing."""sharded_state_dict = {}# Save parametersself._save_to_state_dict(sharded_state_dict, '', keep_vars=True)sharded_state_dict = make_sharded_tensors_for_checkpoint(sharded_state_dict, prefix, sharded_offsets=sharded_offsets)# Recurse into submodulesfor name, module in self.named_children():sharded_state_dict.update(sharded_state_dict_default(module, f'{prefix}{name}.', sharded_offsets, metadata))return sharded_state_dictdef set_is_first_microbatch(self):"""Sets the is_first_microbatch flag if it exists. When this flag is set, TE modules willupdate their fp8 parameter cache."""for m in self.modules():if hasattr(m, "is_first_microbatch"):m.is_first_microbatch = True
对于各个LM和Vision领域的模型,分别基于此派生出了 LanguageModule
和 VisionModel
:
class VisionModule(MegatronModule):"""Base vision module that has common helper functions used across CLIP, ViT, etc.Args:config (TransformerConfig): Input transformer config for the model"""def __init__(self, config: TransformerConfig) -> None:super().__init__(config=config)class LanguageModule(MegatronModule):"""Base language module that has common helper functions used across GPT, BERT etc."""def compute_language_model_loss(self, labels: Tensor, logits: Tensor) -> Tensor:"""Computes the language model loss (Cross entropy across vocabulary)"""passdef setup_embeddings_and_output_layer(self) -> None:"""Sets up embedding layer in first stage and output layer in last stage."""passdef shared_embedding_or_output_weight(self) -> Tensor:"""Gets the emedding weight or output logit weights when share embedding and output weights set to True."""passdef tie_embeddings_and_output_weights_state_dict(self,sharded_state_dict: ShardedStateDict,output_layer_weight_key: str,first_stage_word_emb_key: str,) -> None:"""Ties the embedding and output weights in a given sharded state dict."""passs
对于顶层类似 GPTModule,主要逻辑包含2步:
- 在
__init__
中根据TransformerConfig
和ModuleSpec
来初始化encoder、decoder、loss计算等网络模型 - 在 forward 中定义模型前馈逻辑
4.2.2 子层Layer定义
这一部分也包含顶层模块、moe、custom_layer三个模块。其中类似MegatronModule、Float16Module、TransformLayer都在顶层模块里,熟悉算法的同学可以直接跳过这部分,各个框架实现差不多。
我们重点来看下Megatron里的Attention模块,下图是各个核心类的的交互关系,主要包括Submodules、Attention基类、各Attention策略实现。
首先我们看下Submodules的角色,主要是用来描述不同Attention策略的的子Layer的ModelSpec(用于构建nn.Module对象):
@dataclass
class SelfAttentionSubmodules:linear_qkv: Union[ModuleSpec, type] = Nonecore_attention: Union[ModuleSpec, type] = Nonelinear_proj: Union[ModuleSpec, type] = Noneq_layernorm: Union[ModuleSpec, type] = Nonek_layernorm: Union[ModuleSpec, type] = None@dataclass
class CrossAttentionSubmodules:linear_q: Union[ModuleSpec, type] = Nonelinear_kv: Union[ModuleSpec, type] = Nonecore_attention: Union[ModuleSpec, type] = Nonelinear_proj: Union[ModuleSpec, type] = None
Submodules 主要在Attention基类的__init__里被用来build_module,同时Attention基类的forward函数是已经写好了前馈逻辑,代码概览如下:
class Attention(MegatronModule, ABC):"""Attention layer abstract class.This layer only contains common modules required for the "self attn" and"cross attn" specializations."""def __init__(self,config: TransformerConfig,submodules: Union[SelfAttentionSubmodules, CrossAttentionSubmodules],layer_number: int,attn_mask_type: AttnMaskType,attention_type: str,):# .....self.core_attention = build_module(submodules.core_attention, # <<<<<<<<config=self.config,layer_number=self.layer_number,attn_mask_type=self.attn_mask_type,attention_type=self.attention_type,)# .....@abstractmethoddef get_query_key_value_tensors(self, hidden_states, key_value_states):"""This method needs to be implemented based on whether the derived classis "self-attn" or "cross-attn"."""passdef forward(self,hidden_states,attention_mask,key_value_states=None,inference_params=None,rotary_pos_emb=None,packed_seq_params=None):# .....return output, bias
其中 SelfAttention 和 CrossAttention 都是复用Attention基类的 forward 逻辑,仅仅重写了 get_query_key_value_tensors
。
在 transformer/moe
目录中定义了 MoELayer,以及路由机制 TopkRouter。
其中 BaseMoELayer
是一个抽象基类,继承 MegatronModule
,所有的派生类都需要实现forward函数:
class BaseMoELayer(MegatronModule, ABC):"""Base class for a mixture of experts layer.Args:config (TransformerConfig): Configuration object for the transformer model."""@abstractmethoddef forward(self, hidden_states):"""Forward method for the MoE layer."""passdef set_layer_number(self, layer_number: int):"""Set the layer number for the MoE layer."""self.layer_number = layer_numberself.router.set_layer_number(layer_number)
这里我们可以看下 MoELayer 的 forward 实现逻辑:
class MoELayer(BaseMoELayer):"""Mixture of experts Layer **currently only supports no token dropping**.Args:BaseMoELayer (MegatronModule): Base class for MoE layers"""def forward(self, hidden_states: torch.Tensor):if (self.trainingand self.config.tensor_model_parallel_size > 1and not self.config.sequence_parallel):raise ValueError("During training, performance may degrade if MoE and tensor parallelism""are enabled without also enabling sequence parallelism.")# process MoEdef custom_forward(hidden_states):probs, indices = self.router(hidden_states)(dispatched_input, tokens_per_expert) = self.token_dispatcher.token_permutation(hidden_states, probs, indices)expert_output, mlp_bias = self.experts(dispatched_input, tokens_per_expert)output, mlp_bias = self.token_dispatcher.token_unpermutation(expert_output, mlp_bias)return output, mlp_biasif self.moe_layer_recompute:output, mlp_bias = tensor_parallel.checkpoint(custom_forward, False, hidden_states)else:output, mlp_bias = custom_forward(hidden_states)return output, mlp_bias
4.3 并行策略
这里主要包含四个关键目录:
- distributed 目录:封装了 DistributedDataParallel 顶层分布式策略
- tensor_parallel 目录:定义了一些张量并行子Layer,比如
VocabParallelEmbedding
、RowParallelLinear
等 - pipline_parallel 目录:以函数的形式封装了pipline并行策略,比如
foward_step
、backward_step
等,且内置了一些p2p通信组件
4.3.1 张量并行
在Megatron-llm中的张量并行模块里,主要包含三大块:
词嵌入矩阵的并行。涉及到 VocabParallelEmbedding,会按照vocab_size的维度划分range区间进行切片,每个节点负责一部分的embedding映射。
class VocabParallelEmbedding(torch.nn.Module):"""Embedding parallelized in the vocabulary dimension.This is mainly adapted from torch.nn.Embedding and all the defaultvalues are kept."""def __init__(self,num_embeddings: int,embedding_dim: int,*,init_method: Callable,reduce_scatter_embeddings: bool = False,config: ModelParallelConfig,):# .....# Divide the weight matrix along the vocaburaly dimension.(self.vocab_start_index, self.vocab_end_index) = (VocabUtility.vocab_range_from_global_vocab_size(self.num_embeddings,get_tensor_model_parallel_rank(),self.tensor_model_parallel_size,))self.num_embeddings_per_partition = self.vocab_end_index - self.vocab_start_index# ....def forward(self, input_):if self.tensor_model_parallel_size > 1:# Build the mask.input_mask = (input_ < self.vocab_start_index) | (input_ >= self.vocab_end_index)# Mask the input.masked_input = input_.clone() - self.vocab_start_indexmasked_input[input_mask] = 0else:masked_input = input_# Get the embeddings.if self.deterministic_mode:output_parallel = self.weight[masked_input]else:# F.embedding currently has a non-deterministic backward functionoutput_parallel = F.embedding(masked_input, self.weight)# Mask the output embedding.if self.tensor_model_parallel_size > 1:output_parallel[input_mask, :] = 0.0if self.reduce_scatter_embeddings:# Data format change to avoid explicit tranposes : [b s h] --> [s b h].output_parallel = output_parallel.transpose(0, 1).contiguous()output = reduce_scatter_to_sequence_parallel_region(output_parallel)else:# Reduce across all the model parallel GPUs.output = reduce_from_tensor_model_parallel_region(output_parallel)return output
线性矩阵乘的并行。涉及ColumParallelLinear、RowParallelLinear、LinearWithFrozedWeight三个重要的子Layer封装。我们重点看下行、列切分的源码实现;
列切分: 分别将A1,A2放置在两张卡上。两张卡分别计算Y1=XA1和Y2=XA2。计算完成后,通过 collective 通信 AllGather ,获取其它卡上的计算结果,拼接在一起得到最终的结果矩阵Y
class ColumnParallelLinear(torch.nn.Module):"""Linear layer with column parallelism.The linear layer is defined as Y = XA + b. A is parallelized alongits second dimension as A = [A_1, ..., A_p]."""def __init__(self,input_size,output_size,*,config: ModelParallelConfig,...):# Keep input parametersself.input_size = input_sizeself.output_size = output_sizeself.output_size_per_partition = divide(output_size, world_size)# ....self._forward_impl = linear_with_grad_accumulation_and_async_allreducedef forward(self, input_: torch.Tensor, weight: Optional[torch.Tensor] = None):"""Forward of ColumnParallelLinear"""### <<<<<<<< 考虑并行策略,处理输入input >>>>>>>> ###if (self.allreduce_dgrador self.sequence_parallelor self.explicit_expert_commor self.disable_grad_reduce):input_parallel = input_else:input_parallel = copy_to_tensor_model_parallel_region(input_)### <<<<<<<< 考虑训练或推理,处理FFN算法策略 >>>>>>>> ###if not weight.requires_grad:self._forward_impl = linear_with_frozen_weightelse:self._forward_impl = linear_with_grad_accumulation_and_async_allreduce### <<<<<<<< 考虑并行策略,计算得到输出 >>>>>>>> ###output_parallel = self._forward_impl(input=input_parallel, weight=weight, bias=bias,....)### <<<<<<<< 考虑并行策略,是否需要 gather >>>>>>>> ###if self.gather_output:# All-gather across the partitions.assert not self.sequence_paralleloutput = gather_from_tensor_model_parallel_region(output_parallel)else:output = output_paralleloutput_bias = self.bias if self.skip_bias_add else Nonereturn output, output_bias
行切分: 对矩阵X按照行切分,为了满足矩阵乘法规则,输入矩阵X需要按列切分 X=[X1 | X2]。同时,将矩阵分块,分别放置在两张卡上,每张卡分别计算 Y1=X1A1,Y2=X2A2。计算完成后,通过collective通信Allreduce_sum,归约其他处理器上的计算结果,可以得到最终的结果矩阵Y。
class RowParallelLinear(torch.nn.Module):"""Linear layer with row parallelism.The linear layer is defined as Y = XA + b. A is parallelized along its first dimension and Xalong its second dimension. A = transpose([A_1 .. A_p]) X = [X_1, ..., X_p]"""def __init__(self,input_size: int,output_size: int,*,config: ModelParallelConfig,....):# Keep input parametersself.input_size = input_sizeself.output_size = output_sizeself.input_is_parallel = input_is_parallelself.input_size_per_partition = divide(input_size, world_size)# 此处的 Weight 是转置后的,input_size_per_partition就是 K 维度(切分后的)self.weight = Parameter(torch.empty(self.output_size, self.input_size_per_partition, dtype=config.params_dtype))# ....self._forward_impl = linear_with_grad_accumulation_and_async_allreducedef forward(self, input_):"""Forward of RowParallelLinear"""# Set up backprop all-reduce.if self.input_is_parallel:input_parallel = input_else:assert not self.sequence_parallelinput_parallel = scatter_to_tensor_model_parallel_region(input_)# Matrix multiply.if not self.weight.requires_grad:self._forward_impl = linear_with_frozen_weightelse:self._forward_impl = linear_with_grad_accumulation_and_async_allreduceoutput_parallel = self._forward_impl(input=input_parallel, weight=self.weight,bias=None, ....)# All-reduce across all the partitions.if self.explicit_expert_comm:assert self.skip_bias_addoutput_ = output_parallelelif self.sequence_parallel:output_ = reduce_scatter_to_sequence_parallel_region(output_parallel)else:output_ = reduce_from_tensor_model_parallel_region(output_parallel)# .....return output, output_bias
可以看出,行、列切分是针对第二输入(常见的是Weight)而言的。上述源码实现是前向,我们同样关注下ColumnParallelLinear和RowParallelLinear下反向的计算逻辑,可以借助下图来理解:
如下 f
与 g
是两个共轭算子( f
代表前向,g
代表反向):
在列切割中:
f
: forward中,直接copy输入;backward中,对梯度做AllReduce。在代码里定义为class _CopyToModelParallelRegion
(同文件夹下的 mappings.py)。g
: forward中,all-gather输出;backward中,对梯度做split(每张卡经过all-gather已有完整的Y了,因此以Y为起点计算梯度后,沿着列做split就可得到Y1和Y2的梯度)。在代码里定义为class _GatherFromModelParallelRegion
(同文件夹下的 mappings.py)。
在行切割中:
f
: forward中,按列split输入;backward中,all-gather梯度。在代码里定义为class _ScatterToModelParallelRegion
(同文件夹下的 mappings.py)。g
: forward中,AllReduce输出;backward中,直接输出梯度,无需做任何通讯(因为经过g的foward,每块GPU上已拥有了Yi和Y,则根据图中g的backward公式可知,每块卡独立计算梯度)。在代码里定义为class _ReduceFromModelParallelRegion
(同文件夹下的 mappings.py)。
重要的多节点通信,包括 gather、scatter、reduce、和all_to_all,主要服务于跨节点通信操作。Megatron-llm的此类通信函数包括:
4.3.2 流水线并行
在 Megatron-llm/core
目录下的README中给出的教程样例,就是以 GPTModel+Pipline 为例的,我们可以简单回顾下这个简单的代码样
from functools import partial# 定义前向 step 主函数逻辑
def forward_step_func(data_iterator, model):def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor):losses = output_tensor.float()loss_mask = loss_mask.view(-1).float()loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()# If you have data parallel reduce loss across data parallel groups. # If pipeline parallel, loss computation is done only in last stage.return loss, {'lm loss': loss}data = next(data_iterator)tokens = data['tokens'].to(device)attention_mask = data['attention_mask'].to(device)position_ids = data['position_ids'].to(device)labels = data['labels'].to(device)loss_mask = data['loss_mask'].to(device)output_tensor = model(tokens, position_ids, attention_mask,labels=labels)return output_tensor, partial(loss_func, loss_mask) from pathlib import Path
from torch.optim import Adam
# <<<<<<<< 导入 get_forward_backward_func 公共函数 <<<<<<<<<
from megatron.core.pipeline_parallel.schedules import get_forward_backward_func
from megatron.core.tensor_parallel.random import model_parallel_cuda_manual_seedif __name__ == "__main__":initialize_distributed(tensor_model_parallel_size=2, pipeline_model_parallel_size=1)model_parallel_cuda_manual_seed(123)gpt_model = model_provider()device = torch.device("cuda")gpt_model.to(device)optim = Adam(gpt_model.parameters())train_iterator = get_train_data_iterator() # <---- 用户定义的forward_backward_func = get_forward_backward_func() # <---- 框架提供的# Running the model for 5 iterationsfor _ in range(5):optim.zero_grad()losses_reduced = forward_backward_func(forward_step_func=forward_step_func, # <------ 封装用户的foward_step_funcdata_iterator=train_iterator,model=gpt_model,num_microbatches=1,seq_length=64,micro_batch_size=8,decoder_seq_length=64,forward_only=False)optim.step()print(f'Losses reduced : {losses_reduced}')# Saving the modelsave_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path='/workspace/ckpt')# Loading the modelgpt_model = load_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path='/workspace/ckpt')gpt_model.to(device)print('Successfully loaded the model')
我们来研究下框架提供的 get_forward_backward_func
主要承接了什么工作,如下是其函数实现逻辑,可以看出是一个根据分布式策略自适应分发的 Dispatcher:
- 如果是
PP/MP
,则:- 若还开启了VP,则使用
forward_backward_pipelining_with_interleaving
- 否则使用:
forward_backward_pipelining_without_interleaving
- 若还开启了VP,则使用
- 否则使用:
forward_backward_no_pipelining
def get_forward_backward_func():"""Retrieves the appropriate forward_backward function given theconfiguration of parallel_state.Returns a function that will perform all of the forward andbackward passes of the model given the pipeline model parallelworld size and virtual pipeline model parallel world size in theglobal parallel_state.Note that if using sequence parallelism, the sequence length component ofthe tensor shape is updated to original_sequence_length /tensor_model_parallel_world_size."""pipeline_model_parallel_size = parallel_state.get_pipeline_model_parallel_world_size()if pipeline_model_parallel_size > 1:if parallel_state.get_virtual_pipeline_model_parallel_world_size() is not None:forward_backward_func = forward_backward_pipelining_with_interleavingelse:forward_backward_func = forward_backward_pipelining_without_interleavingelse:forward_backward_func = forward_backward_no_pipeliningreturn forward_backward_func
我们先看不开启任意分布式策略下的forward_backward_no_pipelining的骨干逻辑,这有利于我们更进一步对比理解开启分布式后的逻辑变动。
首先,理解函数签名很重要,因为这里是不同策略的统一签名,要保持一致,关键的参数是:
- forward_step_func:用户自定义的前向过程函数
- data_iterator:用户自定义的数据迭代器
- 剩余是一些分布式策略相关的参数,在单机下可能用不到。
def forward_backward_no_pipelining(*,forward_step_func,data_iterator: Union[Iterator, List[Iterator]],model: Union[torch.nn.Module, List[torch.nn.Module]],num_microbatches: int,seq_length: int, # unusedmicro_batch_size: int, # unuseddecoder_seq_length: int = None, # unusedforward_only: bool = False,collect_non_loss_data: bool = False,first_val_step: bool = None,
):"""Run forward and backward passes with no pipeline parallelism(no inter-stage communication).Returns dictionary with losses.See get_forward_backward_func() for argument details"""# .......forward_data_store = []input_tensor, output_tensor_grad = None, Nonetotal_num_tokens = torch.zeros([], dtype=torch.int, device="cuda")# <<<<<<<<<<<< Step 1: 执行前向 forward_step, 内在调用用户定义的forward_step_func <<<<<<<with no_sync_func():for i in range(num_microbatches - 1):output_tensor, num_tokens = forward_step(forward_step_func,data_iterator,model,num_microbatches,input_tensor,forward_data_store,config,collect_non_loss_data,is_first_microbatch=check_first_val_step(first_val_step, forward_only, i == 0),current_microbatch=i,)total_num_tokens += num_tokens.item()# <<<<<<<<< Step 2: 如果是训练场景,则需要额外调用 backward_step;推理则不需要 <<<<<<<<if not forward_only:backward_step(input_tensor, output_tensor, output_tensor_grad, model_type, config)# <<<<<<<<< Step 3: 最后在调用一次 forward_step & backward_step <<<<<<<<<# Run computation for last microbatch out of context handler (want to# synchronize gradients).output_tensor, num_tokens = forward_step(forward_step_func,data_iterator,model,num_microbatches,input_tensor,forward_data_store,config,collect_non_loss_data,is_first_microbatch=check_first_val_step(first_val_step, forward_only, num_microbatches == 1),current_microbatch=num_microbatches - 1,)total_num_tokens += num_tokens.item()if not forward_only:backward_step(input_tensor, output_tensor, output_tensor_grad, model_type, config)return forward_data_store # 返回的是一个 list
上面函数中核心的逻辑是交给了 forward_step 和 backward_step 两个函数来做的,此处按住不表。我们先来看下PP/MP下的 forward_backward_pipelining_without_interleaving
多了哪些逻辑:
- 三大阶段:①warmup阶段,跑多轮微Batch前向 ②正常前反向训练阶段 ③ cooldown阶段,跑多轮微Batch对应的反向
- 额外通信:①recv_forward 接收前序过程节点的张量作为此节点输入 ② send_forward 发送此节点的输出,作为后续节点的输入
- 反向过程:send_forward_recv_backward + backward_step + send_backward_recv_forward
def forward_backward_pipelining_without_interleaving(....): # <<< 函数签名是一致的disable_grad_sync()# <<<<<<<<< Step 1: 获取上下游rank接发的tensor shape信息 <<<<<rank = parallel_state.get_pipeline_model_parallel_rank()recv_tensor_shapes = get_tensor_shapes(rank=rank - 1,....)send_tensor_shapes = get_tensor_shapes(rank=rank, ....)# <<<<<<<< Step 2: Run warmup forward passes. <<<<<<<for i in range(num_warmup_microbatches):input_tensor = recv_forward(recv_tensor_shapes, config)output_tensor, num_tokens = forward_step(forward_step_func,data_iterator,model, ...)send_forward(output_tensor, send_tensor_shapes, config)# Before running 1F1B, need to receive first forward tensor.# If all microbatches are run in warmup / cooldown phase, then no need to# receive this tensor here.if num_microbatches_remaining > 0:input_tensor = recv_forward(recv_tensor_shapes, config)# <<<<<<<<<<< Step 3: Run 1F1B in steady state. <<<<<<for i in range(num_microbatches_remaining):last_iteration = i == (num_microbatches_remaining - 1)output_tensor, num_tokens = forward_step(forward_step_func,data_iterator,model, ...)send_forward(output_tensor, send_tensor_shapes, config)if forward_only:send_forward(output_tensor, send_tensor_shapes, config)else:# <<<<<<<<<<< Step 4: 计算反向,并通信output_tensor_grad = send_forward_recv_backward(output_tensor, send_tensor_shapes, config)input_tensor_grad = backward_step(input_tensor, output_tensor, output_tensor_grad, model_type, config)input_tensor = send_backward_recv_forward(input_tensor_grad, recv_tensor_shapes, config)# <<<<<<<<< Step 5: Run cooldown backward passes.if not forward_only:for i in range(num_warmup_microbatches):# recv_backward -> backward_step -> send_backwardreturn forward_data_store
此处我们回到刚才的forward_step和backward_step,简要介绍下其思维导图,后续有机会可以单独开一小节介绍。同时上述类似 revc/send_forward/backward
等p2p通信组件也都封装在同级目录下:
4.3.3 数据并行
相对于张量并行和流水线并行,数据并行的逻辑就比较简单了。主要在 distributed 目录下的 DistributedDataParallel
类中定义
class DistributedDataParallel(MegatronModule):"""DDP wrapper which stores grads in contiguous buffers. Also has option of overlappingcommunication with backprop computation by breaking up full model's gradients into smallerbuckets and running all-reduce / reduce-scatter on each bucket asynchronously. This classalso provides the option to do the gradient accumulation in a type other than the param type(e.g., fp32 for a bf16 model)."""def __init__(self,config: TransformerConfig,ddp_config: DistributedDataParallelConfig,module: torch.nn.Module,disable_bucketing: bool = False,):# If bucket_size is not provided as an input, use sane default.# If using very large dp_sizes, make buckets larger to ensure that chunks used in NCCL# ring-reduce implementations are large enough to remain bandwidth-bound rather than# latency-bound.if ddp_config.bucket_size is None:ddp_config.bucket_size = max(40000000, 1000000 * parallel_state.get_data_parallel_world_size())# <<<<<<<<<< 所有的参数和梯度会根据dtype进行分桶,保持桶内的参数和梯度内存区域是连续的,提升通信效率 <<<<<# Allocate the param+grad buffers for dense params' grads.self.buffers = allocate_buffers_for_parameters(dense_params,parallel_state.get_data_parallel_group(with_context_parallel=True),gradient_scaling_factor=gradient_scaling_factor,)# Register backward hook.# Accumulation function for the gradients need to be stored so they# don't go out of scope.self.grad_accs = []for param in self.module.parameters():if param.requires_grad:# Expand so we get access to grad_fn.param_tmp = param.expand_as(param)# Get the gradient accumulator function.grad_acc = param_tmp.grad_fn.next_functions[0][0]grad_acc.register_hook(self._make_param_hook(param, self.param_to_buffer))self.grad_accs.append(grad_acc)def forward(self, *inputs, **kwargs):"""Calls the wrapped module's forward() method."""return self.module(*inputs, **kwargs)
在 DistributedDataParallel 中有一个重要的逻辑:参数的广播,在Megatron-LLM中是通过 torch.distributed.broadcast
API按照Group进行广播的。
def broadcast_params(self):"""Syncs parameters across all DP ranks."""for param in self.module.parameters():is_expert_parallel = not getattr(param, 'allreduce', True)if is_expert_parallel:data_parallel_group = parallel_state.get_data_modulo_expert_parallel_group(with_context_parallel=True)else:data_parallel_group = parallel_state.get_data_parallel_group(with_context_parallel=True)torch.distributed.broadcast(param.data,src=torch.distributed.get_global_rank(data_parallel_group, 0),group=data_parallel_group,)
4.4 推理服务
Megatron-LLM提供了Inference推理服务引擎,主要封装在inference目录,主要设计到如下几个核心的模块设计:
4.4.1 Engine 体系
抽象了一层AbstractEgnine,派生出了MCoreEngine,内含了scheduler、controller,提供了generate和run_engine两个核心接口
4.4.2 Infer Request
InferenceRequest是一个dataclass,负责端到端请求、返回的信息管理,包括prompt、status、以及预测得到的generated_tokens,有点类似网路通信中Package的概念
4.4.3 Controller
以SimpleTextGenerationController形态提供,封装tokenizer、model.infer、detokenizer等核心逻辑
4.4.4 ModelWrapper
ModelWrapper 是推理模型接口的抽象代理类(即AbstractModekInferenceWarpper),默认提供了GPTInferenceWrapper,override了get_batch_for_context_window和prep_model_for_inference
附录:一些变量和碎碎念
F1. CUDA_DEVICE_MAX_CONNECTIONS 环境变量
- 定义:CUDA_DEVICE_MAX_CONNECTIONS是一个环境变量,用于指定在CUDA应用程序中,每个GPU设备允许的最大并行硬件连接数。这些连接通常与CUDA流(Streams)或其他并发执行单元相关。
- 用途:通过调整这个环境变量的值,开发者可以控制GPU上的并行度和资源使用方式,从而优化程序的性能和资源利用率。特别是在进行多卡并行训练或复杂计算任务时,合理设置CUDA_DEVICE_MAX_CONNECTIONS可以显著提高计算效率。
影响与效果 - 并行度控制:增加CUDA_DEVICE_MAX_CONNECTIONS的值可以允许更多的并行硬件连接,这通常意味着更高的并行度和潜在的性能提升。然而,过高的值也可能导致资源竞争和性能下降,因为GPU的硬件资源是有限的。
- 资源分配:每个并行连接都需要一定的GPU资源来支持,包括内存、寄存器等。因此,CUDA_DEVICE_MAX_CONNECTIONS的设置也会影响GPU资源的分配和使用方式。
- 程序兼容性:在某些情况下,特定的CUDA程序或库可能对CUDA_DEVICE_MAX_CONNECTIONS有特定的要求。因此,了解并正确设置这个环境变量对于确保程序的兼容性和性能至关重要。
设置与默认值
- 设置方法:在Linux系统中,可以通过export CUDA_DEVICE_MAX_CONNECTIONS=N命令来设置这个环境变量,其中N是你想要设置的值。在Windows系统中,可以通过系统属性或命令行工具来设置环境变量。
- 默认值:不同的GPU型号和CUDA版本可能有不同的默认值。通常,这个默认值是根据GPU的硬件特性和CUDA的设计来确定的,旨在提供一个合理的并行度和资源利用率的平衡点。然而,对于某些特定的计算任务或程序,可能需要手动调整这个值以获得最佳性能。
注意事项
- 在调整CUDA_DEVICE_MAX_CONNECTIONS时,需要谨慎考虑GPU的硬件资源和当前计算任务的需求。过高的值可能导致资源竞争和性能下降,而过低的值则可能限制并行度和计算效率。
- 在使用多卡并行训练或复杂计算任务时,建议通过实验来确定最佳的CUDA_DEVICE_MAX_CONNECTIONS设置值。这通常涉及到在不同的设置下运行程序并比较性能结果。
F2. NVTE_APPLY_QK_LAYER_SCALING 环境变量
NVTE_APPLY_QK_LAYER_SCALING 环境变量通常与NVIDIA TensorRT(TensorRT是一个高性能深度学习推理(Inference)优化器和运行时,用于生产部署)和NVIDIA Transformer Engine(NVTE)相关。然而,直接针对NVTE_APPLY_QK_LAYER_SCALING环境变量的官方文档或详细描述可能相对较少,因为它可能是一个内部或特定于某些NVIDIA软件版本、工具或库的配置选项。
不过,从环境变量的命名和Transformer模型的上下文来看,我们可以做一些合理的推测:
- QK Layer Scaling:QK很可能指的是Transformer模型中的“查询(Query)”和“键(Key)”操作,这是Transformer架构中自注意力(Self-Attention)机制的核心部分。Layer Scaling可能指的是对QK操作结果的一种缩放(Scaling)技术,用于改善模型的稳定性和性能。
- 环境变量的作用:设置NVTE_APPLY_QK_LAYER_SCALING环境变量可能用于控制是否在应用Transformer模型时启用QK Layer Scaling功能。如果设置为启用(如1或true),则可能在Transformer的QK计算中应用某种缩放策略;如果设置为禁用(如0或false),则可能不应用这种缩放。
- 性能影响:启用QK Layer Scaling可能会对Transformer模型的推理性能产生影响,具体取决于缩放策略的实现细节和模型的特定使用场景。在某些情况下,它可能有助于减少数值不稳定、提高模型精度或加速推理过程;但在其他情况下,它可能对性能没有显著影响,甚至可能产生负面影响。
- 配置注意事项:如果NVTE_APPLY_QK_LAYER_SCALING是一个有效的环境变量,并且您打算使用它,请确保您了解它的确切含义和预期效果。此外,还需要检查您的NVIDIA软件版本、TensorRT版本和Transformer Engine实现是否支持此功能,并遵循任何相关的配置指南或最佳实践。
- 替代方案:如果您正在寻找提高Transformer模型推理性能的方法,并且NVTE_APPLY_QK_LAYER_SCALING不适用于您的场景,请考虑其他优化技术,如量化、剪枝、模型蒸馏或使用更高效的模型架构。
请注意,由于NVTE_APPLY_QK_LAYER_SCALING可能不是一个广泛文档化或公开讨论的环境变量,因此上述信息基于命名约定和Transformer模型的一般知识。为了获取最准确的信息,建议直接参考NVIDIA的官方文档、论坛或联系NVIDIA的技术支持。
F3. torch.distributed.P2POp 的作用
torch.distributed.P2POp 是 PyTorch 分布式包中的一个类,用于表示点对点(Peer-to-Peer, P2P)通信操作,用于构建和优化跨多个计算设备的数据传输操作。在分布式训练中,当需要在不同的计算设备(例如,不同的 GPU 或不同的机器)之间直接交换数据时,可以使用 P2P 操作。具体来说,P2POp 可以用来执行如发送(send)和接收(recv)等操作,使得数据可以在不同的进程之间直接传输。这对于实现并行计算和优化分布式训练的性能至关重要。
使用 P2POp 时,通常会结合 torch.distributed.ProcessGroup 一起使用,后者用于定义一组参与分布式计算的进程。ProcessGroup 提供了进程间通信的上下文,而 P2POp 则定义了在这些进程间执行的具体通信操作。P2POp实际上是一个更底层的操作,它通常不直接由用户代码调用,而是由torch.distributed的高级API(如send和recv)在内部使用。
import torch
import torch.distributed as dist
from torch.multiprocessing import Process def send_recv_process(rank, size): dist.init_process_group("gloo", rank=rank, world_size=size) tensor = torch.tensor([rank]) if rank == 0: # 进程0发送数据到进程1 dist.send(tensor, dst=1) else: # 进程1接收来自进程0的数据 received_tensor = dist.recv(src=0) print(f"Received tensor in process {rank}: {received_tensor}") dist.destroy_process_group() if __name__ == "__main__": size = 2 processes = [] for rank in range(size): p = Process(target=send_recv_process, args=(rank, size)) p.start() processes.append(p) for p in processes: p.join()
如下是megetaon-llm框架里关于P2POp的使用样例:
# TODO: use functions from megatron/p2p
def recv_from_prev_pipeline_rank_(recv_buffer=None):"""Receive from previous pipeline stage and update theinput buffer inplace."""if not mpu.is_pipeline_first_stage():assert recv_buffer is not Nonerecv_prev_op = torch.distributed.P2POp(torch.distributed.irecv, recv_buffer,mpu.get_pipeline_model_parallel_prev_rank())reqs = torch.distributed.batch_isend_irecv([recv_prev_op])for req in reqs:req.wait()# To protect against race condition when using batch_isend_irecv().torch.cuda.synchronize()# TODO: use functions from megatron/p2p
def send_to_next_pipeline_rank(tensor=None):"""Send output to the next pipeline stage."""if not mpu.is_pipeline_last_stage():assert tensor is not Nonesend_next_op = torch.distributed.P2POp(torch.distributed.isend, tensor,mpu.get_pipeline_model_parallel_next_rank())reqs = torch.distributed.batch_isend_irecv([send_next_op])for req in reqs:req.wait()# To protect against race condition when using batch_isend_irecv().torch.cuda.synchronize()
F4. torch.distributed.barrier() 的作用
在分布式训练中,多个进程可能会并行地执行计算任务,这些进程可能运行在不同的机器或者同一个机器的不同核心上。由于各种原因(比如计算速度不一致、网络延迟等),这些进程的执行速度可能会有所不同。这就可能导致一些问题,比如某个进程需要等待其他进程完成某些计算后才能继续执行。
torch.distributed.barrier() 函数就是用来解决这个问题的。当所有进程都调用这个函数时,它们会在这一点上进行同步,即所有进程都会等待,直到所有进程都到达了这个同步点,然后它们才能继续执行后面的代码。
F5. torch.distributed 中 Group 的概念
在PyTorch中,torch.distributed模块提供了支持分布式训练的功能,它允许开发者将神经网络训练任务分散到多个计算节点上进行。当在torch.distributed的某些函数或方法中指定group参数时,它带来了额外的灵活性和控制性,特别是在涉及多进程或多GPU的分布式训练中。以下是指定group参数的一些额外作用:
- 进程组通信:
- 在分布式训练中,进程组(group)是一组参与特定通信操作的进程集合。通过指定group参数,开发者可以控制哪些进程参与特定的通信操作,如数据广播(broadcast)、规约(reduce)、全规约(all_reduce)等。
- 这允许开发者根据需要在不同的进程组之间进行不同的通信操作,从而优化训练过程中的数据流动和同步。
- 资源隔离:
- 在某些情况下,可能希望将训练任务中的不同部分隔离到不同的进程组中,以避免不必要的通信开销或资源竞争。通过指定不同的group参数,可以实现这种隔离,确保每个进程组内的通信和资源使用是独立的。
- 动态进程组:
- 在训练过程中,有时需要根据实际情况动态地创建或修改进程组。通过torch.distributed.new_group()等函数,开发者可以根据需要创建新的进程组,并在后续的操作中通过指定这些新的group参数来控制通信行为。
- 错误处理与恢复:
- 在分布式训练中,某个进程可能会因为各种原因(如硬件故障、网络问题等)而失败。通过指定group参数,开发者可以更精确地控制哪些进程参与了某个特定的操作,从而更容易地检测和隔离问题进程,并采取相应的恢复措施。
- 性能优化:
- 在某些情况下,通过合理地划分进程组并利用不同进程组之间的并行性,可以进一步优化训练性能。例如,可以将计算密集型和通信密集型的任务分配到不同的进程组中,以减少整体训练时间。
F6. TP/PP/DP 混合并行时Group分组策略
参考:https://zhuanlan.zhihu.com/p/714733330