ml.models.parallel

Defines primitive model parallel layers.

Before using this module, you should initialize the parallel process groups using ml.utils.parallel.init_parallelism(). This will create three process group for model parallelism, pipeline parallelism, and data parallelism. The process group information can be accessed using ml.utils.parallel.parallel_group_info().

The following layers are defined:

The RowParallelLinear and ColumnParallelLinear layers can be used to create a model parallel two-layer MLP, as shown below.

# Create a parallel embedding layer.
parallel_embedding = ParallelEmbedding(
    num_embeddings=vocab_size,
    embedding_dim=in_features,
)

# Create a column parallel linear layer.
column_parallel_linear = ColumnParallelLinear(
    in_features=in_features,
    out_features=out_features,
    bias=bias,
    gather_output=False,
)

# Create a row parallel linear layer.
row_parallel_linear = RowParallelLinear(
    in_features=out_features,
    out_features=out_features,
    bias=bias,
    input_is_parallel=True,
)

# Applies the two linear layers together.
x = torch.randint(0, vocab_size - 1, (bsz, tsz))
y = row_parallel_linear(column_parallel_linear(parallel_embedding(x)))

This is equivalent to the following single-process implementation.

# Create a sequential model.
model = nn.Sequential(
    nn.Embedding(vocab_size, in_features),
    nn.Linear(in_features, out_features, bias=bias),
    nn.Linear(out_features, out_features, bias=bias),
)

# Applies the sequential model.
x = torch.randint(0, vocab_size - 1, (bsz, tsz))
y = model(x)
ml.models.parallel.mp_copy(x: ~torch.Tensor, op: ~typing.Any = <RedOpType.SUM: 0>) Tensor[source]

Copies the input to the model parallel region.

Forward this is a no-op, but backward it reduces the gradient across model parallel replicas (i.e., it is a cross-replica sum).

Parameters:
  • x – Input tensor, with shape (*).

  • op – Reduction operation to use when reducing the gradient.

Returns:

Output tensor, with shape (*).

ml.models.parallel.mp_reduce(x: ~torch.Tensor, op: ~typing.Any = <RedOpType.SUM: 0>) Tensor[source]

Reduces the input from the model parallel region.

Forward this reduces the input across model parallel replicas (i.e., it is a cross-replica sum), but backward it is a no-op.

Parameters:
  • x – Input tensor, with shape (*).

  • op – Reduction operation to use when reducing the gradient.

Returns:

Output tensor, with shape (*).

ml.models.parallel.mp_scatter(x: Tensor, dim: int = -1) Tensor[source]

Scatters the input across model parallel regions.

Parameters:
  • x – Input tensor, with shape (..., N, ...).

  • dim – Dimension to scatter along.

Returns:

Output tensor, with shape (..., N // world_size, ...).

ml.models.parallel.mp_gather(x: Tensor, dim: int = -1) Tensor[source]

Gathers the input from model parallel regions.

Parameters:
  • x – Input tensor, with shape (..., N, ...).

  • dim – Dimension to gather along.

Returns:

Output tensor, with shape (..., N * world_size, ...).

ml.models.parallel.initialize_model_parallel_affine_weight_(weight: Tensor, out_features: int, in_features: int, per_partition_size: int, partition_dim: int, init_type: Literal['orthogonal', 'normal', 'biased_normal', 'uniform', 'kaiming_uniform', 'kaiming_normal', 'xavier_uniform', 'xavier_normal', 'trunc_normal', 'dirac', 'constant', 'zeros', 'ones'] = 'xavier_normal', stride: int = 1) None[source]

Initializes an affine weight tensor for model-parallel training.

Parameters:
  • weight – Weight tensor to initialize.

  • out_features – Number of output features.

  • in_features – Number of input features.

  • per_partition_size – Size of each partition.

  • partition_dim – Partition dimension.

  • init_type – Initialization type.

  • stride – Stride for the initialization.

class ml.models.parallel.ParallelEmbedding(num_embeddings: int, embedding_dim: int, padding_idx: int | None = None, max_norm: float | None = None, norm_type: float = 2.0, scale_grad_by_freq: bool = False, sparse: bool = False, init_type: Literal['orthogonal', 'normal', 'biased_normal', 'uniform', 'kaiming_uniform', 'kaiming_normal', 'xavier_uniform', 'xavier_normal', 'trunc_normal', 'dirac', 'constant', 'zeros', 'ones'] = 'xavier_normal')[source]

Bases: Module

Model-parallel embeddings.

Embeddings are partitioned along the embedding_dim dimension.

Parameters:
  • num_embeddings – Number of embeddings (vocabulary size).

  • embedding_dim – Embedding dimension; must be divisible by the model-parallel size.

  • padding_idx – See nn.Embedding.

  • max_norm – See nn.Embedding.

  • norm_type – See nn.Embedding.

  • scale_grad_by_freq – See nn.Embedding.

  • sparse – See nn.Embedding.

  • init_type – Initialization type.

property master_weight: Tensor
reset_parameters() None[source]
forward(x: Tensor) Tensor[source]

Defines the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

class ml.models.parallel.ColumnParallelLinear(in_features: int, out_features: int, bias: bool = True, gather_output: bool = True, init_type: Literal['orthogonal', 'normal', 'biased_normal', 'uniform', 'kaiming_uniform', 'kaiming_normal', 'xavier_uniform', 'xavier_normal', 'trunc_normal', 'dirac', 'constant', 'zeros', 'ones'] = 'xavier_normal', stride: int = 1)[source]

Bases: Module

A column parallel linear layer.

This layer splits the weight matrix along the output feature dimension, and each rank is only responsible for out_features // world_size number of output features.

Parameters:
  • in_features – Number of input features.

  • out_features – Number of output features.

  • bias – Whether to include a bias term.

  • gather_output – Whether to gather the output from all the model parallel GPUs.

  • init_type – Initialization type.

  • stride – Stride for the initialization.

  • lora_rank – The LoRA rank to use, if any.

reset_parameters() None[source]
property master_weight: Tensor
property master_bias: Tensor | None
forward(x: Tensor) Tensor[source]

Forward method.

Parameters:

x – input tensor of size (*, in_features)

Returns:

Output tensor of size (*, out_features // world_size), or (*, out_features) if gather_output is set to True.

class ml.models.parallel.RowParallelLinear(in_features: int, out_features: int, bias: bool = True, input_is_parallel: bool = False, init_type: Literal['orthogonal', 'normal', 'biased_normal', 'uniform', 'kaiming_uniform', 'kaiming_normal', 'xavier_uniform', 'xavier_normal', 'trunc_normal', 'dirac', 'constant', 'zeros', 'ones'] = 'xavier_normal', stride: int = 1)[source]

Bases: Module

A row parallel linear layer.

This layer splits the weight matrix along the input feature dimension, and each rank is only responsible for in_features // world_size number of input features.

This can be paired with a column parallel layer to create a model parallel two-stage linear layer.

Parameters:
  • in_features – Number of input features.

  • out_features – Number of output features.

  • bias – Whether to include a bias term.

  • input_is_parallel – Whether the input tensor is already split along the feature dimension.

  • init_type – Initialization type.

  • stride – Stride for the initialization.

reset_parameters() None[source]
property master_weight: Tensor
property master_bias: Tensor | None
forward(x: Tensor) Tensor[source]

Forward method.

Parameters:

x – input tensor of size (*, in_features), or (*, in_features // world_size) if input_is_parallel is set to True.

Returns:

Output tensor of size (*, out_features).