ml.models.parallel
Defines primitive model parallel layers.
Before using this module, you should initialize the parallel process groups using ml.utils.parallel.init_parallelism(). This creates three process groups: one each for model parallelism, pipeline parallelism, and data parallelism. The process group information can be accessed using ml.utils.parallel.parallel_group_info().
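A minimal initialization sketch is shown below; it assumes that init_parallelism() and parallel_group_info() can be called with their default arguments as written here, so consult ml.utils.parallel for the exact signatures.
from ml.utils.parallel import init_parallelism, parallel_group_info

# Create the model, pipeline, and data parallel process groups.
init_parallelism()

# Access the process group information afterwards.
group_info = parallel_group_info()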
The following layers are defined:
- ParallelEmbedding: A model-parallel embedding layer.
- ColumnParallelLinear: A column model-parallel linear layer.
- RowParallelLinear: A row model-parallel linear layer.
The RowParallelLinear and ColumnParallelLinear layers can be used to create a model parallel two-layer MLP, as shown below.
# Create a parallel embedding layer.
parallel_embedding = ParallelEmbedding(
    num_embeddings=vocab_size,
    embedding_dim=in_features,
)

# Create a column parallel linear layer.
column_parallel_linear = ColumnParallelLinear(
    in_features=in_features,
    out_features=out_features,
    bias=bias,
    gather_output=False,
)

# Create a row parallel linear layer.
row_parallel_linear = RowParallelLinear(
    in_features=out_features,
    out_features=out_features,
    bias=bias,
    input_is_parallel=True,
)
# Applies the embedding and the two linear layers.
x = torch.randint(0, vocab_size - 1, (bsz, tsz))
y = row_parallel_linear(column_parallel_linear(parallel_embedding(x)))
This is equivalent to the following single-process implementation.
# Create a sequential model.
model = nn.Sequential(
    nn.Embedding(vocab_size, in_features),
    nn.Linear(in_features, out_features, bias=bias),
    nn.Linear(out_features, out_features, bias=bias),
)
# Applies the sequential model.
x = torch.randint(0, vocab_size - 1, (bsz, tsz))
y = model(x)
- ml.models.parallel.mp_copy(x: Tensor, op: Any = RedOpType.SUM) Tensor [source]
Copies the input to the model parallel region.
In the forward pass this is a no-op, but in the backward pass it reduces the gradient across model parallel replicas (i.e., it is a cross-replica sum).
- Parameters:
x – Input tensor, with shape (*).
op – Reduction operation to use when reducing the gradient.
- Returns:
Output tensor, with shape (*).
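The sketch below illustrates where mp_copy typically sits in a model parallel forward pass; the sizes and the per-rank nn.Linear shard are hypothetical and only stand in for a column-partitioned layer.
import torch
import torch.nn as nn

bsz, in_features, out_features, world_size = 2, 16, 32, 4  # Illustrative sizes.

# The same activation is replicated on every model parallel rank.
x = torch.randn(bsz, in_features)

# Forward: identity. Backward: the gradient of x is summed across model parallel
# replicas, since every rank uses x to compute a different shard of the output.
x = mp_copy(x)

# Hypothetical per-rank shard of a column-partitioned weight.
local_linear = nn.Linear(in_features, out_features // world_size)
y_shard = local_linear(x)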
- ml.models.parallel.mp_reduce(x: Tensor, op: Any = RedOpType.SUM) Tensor [source]
Reduces the input from the model parallel region.
In the forward pass this reduces the input across model parallel replicas (i.e., it is a cross-replica sum), but in the backward pass it is a no-op.
- Parameters:
x – Input tensor, with shape (*).
op – Reduction operation to use when reducing the input.
- Returns:
Output tensor, with shape (*).
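The sketch below illustrates the matching reduction step, as used after a row parallel matrix multiply; the per-rank shards and sizes are hypothetical.
import torch

bsz, in_features, out_features, world_size = 2, 16, 32, 4  # Illustrative sizes.

# Hypothetical per-rank shards of the input and of a row-partitioned weight.
x_shard = torch.randn(bsz, in_features // world_size)
weight_shard = torch.randn(out_features, in_features // world_size)

# Each rank computes a partial output from its own shard.
partial = x_shard @ weight_shard.t()

# Forward: sums the partial outputs across model parallel replicas into the full output.
# Backward: the gradient passes through unchanged.
y = mp_reduce(partial)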
- ml.models.parallel.mp_scatter(x: Tensor, dim: int = -1) Tensor [source]
Scatters the input across model parallel regions.
- Parameters:
x – Input tensor, with shape (..., N, ...).
dim – Dimension to scatter along.
- Returns:
Output tensor, with shape (..., N // world_size, ...).
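A shape-level sketch of the scatter; the tensor and sizes are illustrative, and each rank keeps its own slice of the scattered dimension.
import torch

bsz, tsz, hidden = 2, 8, 64  # Illustrative sizes.

# Full tensor, replicated on every model parallel rank.
x = torch.randn(bsz, tsz, hidden)

# Each rank keeps only its own slice of the last dimension.
x_local = mp_scatter(x, dim=-1)  # Shape: (bsz, tsz, hidden // world_size)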
- ml.models.parallel.mp_gather(x: Tensor, dim: int = -1) Tensor [source]
Gathers the input from model parallel regions.
- Parameters:
x – Input tensor, with shape (..., N, ...).
dim – Dimension to gather along.
- Returns:
Output tensor, with shape (..., N * world_size, ...).
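The inverse shape-level sketch for the gather; again the tensor and sizes are illustrative.
import torch

bsz, tsz, hidden, world_size = 2, 8, 64, 4  # Illustrative sizes.

# Per-rank shard of the feature dimension.
x_local = torch.randn(bsz, tsz, hidden // world_size)

# Concatenates the shards from all model parallel ranks along the last dimension.
x_full = mp_gather(x_local, dim=-1)  # Shape: (bsz, tsz, hidden)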
- ml.models.parallel.initialize_model_parallel_affine_weight_(weight: Tensor, out_features: int, in_features: int, per_partition_size: int, partition_dim: int, init_type: Literal['orthogonal', 'normal', 'biased_normal', 'uniform', 'kaiming_uniform', 'kaiming_normal', 'xavier_uniform', 'xavier_normal', 'trunc_normal', 'dirac', 'constant', 'zeros', 'ones'] = 'xavier_normal', stride: int = 1) None [source]
Initializes an affine weight tensor for model-parallel training.
- Parameters:
weight – Weight tensor to initialize.
out_features – Number of output features.
in_features – Number of input features.
per_partition_size – Size of each partition.
partition_dim – Partition dimension.
init_type – Initialization type.
stride – Stride for the initialization.
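A usage sketch for initializing the local shard of a column-partitioned weight; the shard shape, the sizes, and the choice of partition_dim=0 are assumptions made for illustration.
import torch

in_features, out_features, world_size = 16, 32, 4  # Illustrative sizes.

# Local shard of a weight that is conceptually (out_features, in_features),
# partitioned along the output (column parallel) dimension.
weight = torch.empty(out_features // world_size, in_features)

initialize_model_parallel_affine_weight_(
    weight,
    out_features=out_features,
    in_features=in_features,
    per_partition_size=out_features // world_size,
    partition_dim=0,
    init_type="xavier_normal",
)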
- class ml.models.parallel.ParallelEmbedding(num_embeddings: int, embedding_dim: int, padding_idx: int | None = None, max_norm: float | None = None, norm_type: float = 2.0, scale_grad_by_freq: bool = False, sparse: bool = False, init_type: Literal['orthogonal', 'normal', 'biased_normal', 'uniform', 'kaiming_uniform', 'kaiming_normal', 'xavier_uniform', 'xavier_normal', 'trunc_normal', 'dirac', 'constant', 'zeros', 'ones'] = 'xavier_normal')[source]
Bases: Module
Model-parallel embeddings.
Embeddings are partitioned along the embedding_dim dimension.
- Parameters:
num_embeddings – Number of embeddings (vocabulary size).
embedding_dim – Embedding dimension; must be divisible by the model-parallel size.
padding_idx – See nn.Embedding.
max_norm – See nn.Embedding.
norm_type – See nn.Embedding.
scale_grad_by_freq – See nn.Embedding.
sparse – See nn.Embedding.
init_type – Initialization type.
- property master_weight: Tensor
- forward(x: Tensor) Tensor [source]
Defines the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.
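A short usage sketch; the sizes are illustrative, and the output shape follows the module-level example above, which feeds the embedding output into a layer expecting the full embedding dimension.
import torch

vocab_size, embedding_dim, bsz, tsz = 1000, 64, 2, 8  # Illustrative sizes.

emb = ParallelEmbedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
tokens = torch.randint(0, vocab_size, (bsz, tsz))
out = emb(tokens)  # Shape: (bsz, tsz, embedding_dim)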
- class ml.models.parallel.ColumnParallelLinear(in_features: int, out_features: int, bias: bool = True, gather_output: bool = True, init_type: Literal['orthogonal', 'normal', 'biased_normal', 'uniform', 'kaiming_uniform', 'kaiming_normal', 'xavier_uniform', 'xavier_normal', 'trunc_normal', 'dirac', 'constant', 'zeros', 'ones'] = 'xavier_normal', stride: int = 1)[source]
Bases: Module
A column parallel linear layer.
This layer splits the weight matrix along the output feature dimension, so each rank is responsible for only out_features // world_size of the output features.
- Parameters:
in_features – Number of input features.
out_features – Number of output features.
bias – Whether to include a bias term.
gather_output – Whether to gather the output from all the model parallel GPUs.
init_type – Initialization type.
stride – Stride for the initialization.
lora_rank – The LoRA rank to use, if any.
- property master_weight: Tensor
- property master_bias: Tensor | None
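A usage sketch contrasting the two gather_output settings; the sizes are illustrative and the shape comments assume the output splitting described above.
import torch

bsz, in_features, out_features = 2, 16, 32  # Illustrative sizes.

# gather_output=True (the default): every rank ends up with the full output features.
col_full = ColumnParallelLinear(in_features, out_features, gather_output=True)
y_full = col_full(torch.randn(bsz, in_features))  # Shape: (bsz, out_features)

# gather_output=False: each rank keeps only its own out_features // world_size slice,
# which is what a following RowParallelLinear(input_is_parallel=True) expects.
col_shard = ColumnParallelLinear(in_features, out_features, gather_output=False)
y_shard = col_shard(torch.randn(bsz, in_features))  # Shape: (bsz, out_features // world_size)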
- class ml.models.parallel.RowParallelLinear(in_features: int, out_features: int, bias: bool = True, input_is_parallel: bool = False, init_type: Literal['orthogonal', 'normal', 'biased_normal', 'uniform', 'kaiming_uniform', 'kaiming_normal', 'xavier_uniform', 'xavier_normal', 'trunc_normal', 'dirac', 'constant', 'zeros', 'ones'] = 'xavier_normal', stride: int = 1)[source]
Bases: Module
A row parallel linear layer.
This layer splits the weight matrix along the input feature dimension, so each rank is responsible for only in_features // world_size of the input features. This can be paired with a column parallel layer to create a model-parallel two-stage linear layer.
- Parameters:
in_features – Number of input features.
out_features – Number of output features.
bias – Whether to include a bias term.
input_is_parallel – Whether the input tensor is already split along the feature dimension.
init_type – Initialization type.
stride – Stride for the initialization.
- property master_weight: Tensor
- property master_bias: Tensor | None
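A usage sketch of the input_is_parallel flag; the per-rank input shard and sizes are hypothetical.
import torch

bsz, in_features, out_features, world_size = 2, 16, 32, 4  # Illustrative sizes.

# input_is_parallel=True: the input is already split along the feature dimension,
# e.g. the ungathered output of a ColumnParallelLinear with gather_output=False.
row = RowParallelLinear(in_features, out_features, input_is_parallel=True)
x_shard = torch.randn(bsz, in_features // world_size)
y = row(x_shard)  # Reduces per-rank partial results into the full (bsz, out_features) output.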