ml.utils.torch_distributed

Defines utilities for training distributed PyTorch models.

The canonical way to use this library is to call launch_subprocesses() from the main function of your script. This launches one subprocess per device and initializes the process group for distributed training. The number of processes, the backend, and related settings are controlled through the provided MultiprocessConfig.
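
For example, a minimal sketch of this pattern; the world_size value and the train() body are illustrative, not prescriptive:

   import torch.distributed as dist

   from ml.utils.torch_distributed import MultiprocessConfig, launch_subprocesses

   def train() -> None:
       # Runs once in each spawned subprocess; by this point the process
       # group has already been initialized by the launcher.
       print(f"rank {dist.get_rank()} of {dist.get_world_size()} is alive")

   if __name__ == "__main__":
       launch_subprocesses(train, MultiprocessConfig(world_size=2))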

class ml.utils.torch_distributed.MultiprocessConfig(rank: int = -1, local_rank: int = -1, world_size: int = '???', local_world_size: int = '???', master_addr: str = '127.0.0.1', master_port: int = '???', init_method: str = 'env://', model_parallelism: int = 1, pipeline_parallelism: int = 1, backend: str | None = None, model_parallel_backend: str | None = None, pipeline_parallel_backend: str | None = None, data_parallel_backend: str | None = None, launch_method: str = 'forkserver')[source]

Bases: object

rank: int = -1
local_rank: int = -1
world_size: int = '???'
local_world_size: int = '???'
master_addr: str = '127.0.0.1'
master_port: int = '???'
init_method: str = 'env://'
model_parallelism: int = 1
pipeline_parallelism: int = 1
backend: str | None = None
model_parallel_backend: str | None = None
pipeline_parallel_backend: str | None = None
data_parallel_backend: str | None = None
launch_method: str = 'forkserver'
classmethod resolve(config: MultiprocessConfig) → None[source]
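
A hedged sketch of building a config; since resolve() returns None, it presumably fills the remaining '???' fields (such as master_port) in place, which is an inference from the signature rather than documented behavior:

   from ml.utils.torch_distributed import MultiprocessConfig

   cfg = MultiprocessConfig(world_size=4)  # Other fields keep their defaults.
   # Assumption: resolve() fills unresolved ('???') fields in place.
   MultiprocessConfig.resolve(cfg)
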
ml.utils.torch_distributed.init_process_group_from_backend(backend: str | Backend | None = None) → None[source]
ml.utils.torch_distributed.init_dist(rank: int, local_rank: int, world_size: int, local_world_size: int, master_addr: str, master_port: int, init_method: str, backend: str | Backend | None = None) → None[source]

Initializes the distributed environment.

Parameters:
  • rank – The rank of the current process.

  • local_rank – The local rank of the current process.

  • world_size – The total number of processes.

  • local_world_size – The number of processes per machine.

  • master_addr – The address of the master process.

  • master_port – The port of the master process.

  • init_method – The initialization method.

  • backend – The distributed backend.
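
A hedged sketch of calling init_dist() directly rather than through launch_subprocesses(); the concrete values describe a hypothetical single-node, two-process job:

   import torch

   from ml.utils.torch_distributed import init_dist

   init_dist(
       rank=0,                # This process is rank 0 of 2.
       local_rank=0,
       world_size=2,
       local_world_size=2,    # Both processes live on this machine.
       master_addr="127.0.0.1",
       master_port=29500,     # Arbitrary free port.
       init_method="env://",
       # Assumption: "nccl" for GPU jobs and "gloo" otherwise, mirroring the
       # usual PyTorch convention; None falls back to the module's default.
       backend="nccl" if torch.cuda.is_available() else "gloo",
   )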

ml.utils.torch_distributed.default_backend() → str[source]
ml.utils.torch_distributed.get_distributed_backend() → Backend[source]
ml.utils.torch_distributed.set_distributed_backend(backend: str) → None[source]
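
A small sketch of the backend helpers; that the override must happen before the process group is initialized is an assumption:

   from ml.utils.torch_distributed import (
       default_backend,
       get_distributed_backend,
       set_distributed_backend,
   )

   print(default_backend())  # The library's default choice for this machine.
   set_distributed_backend("gloo")
   # torch.distributed.Backend subclasses str, so this comparison holds.
   assert get_distributed_backend() == "gloo"
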
ml.utils.torch_distributed.init_and_run(func: Callable[P, None], cfg: MultiprocessConfig, *args: P.args, **kwargs: P.kwargs) → None[source]
ml.utils.torch_distributed.launch_subprocesses(func: Callable[P, None], cfg: MultiprocessConfig, setup: Callable[[], None] | None = None, rank_offset: int = 0, *args: P.args, **kwargs: P.kwargs) → None[source]

Launches a function in multiple subprocesses.

Parameters:
  • func – The function to launch in each subprocess.

  • cfg – The multiprocess configuration.

  • setup – An optional function to run before the subprocesses are launched.

  • rank_offset – The offset added to the rank of each subprocess.

  • args – The positional arguments to pass to the function.

  • kwargs – The keyword arguments to pass to the function.

Raises:

RuntimeError – If the function fails in any subprocess.
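
A sketch combining setup and rank_offset for a hypothetical two-machine job with four processes per machine, where this machine hosts global ranks 4..7; the num_steps argument, the address, and all values are illustrative:

   from ml.utils.torch_distributed import MultiprocessConfig, launch_subprocesses

   def setup() -> None:
       # Runs once before the subprocesses are launched, e.g. to download a
       # dataset so the workers don't race each other.
       ...

   def train(num_steps: int) -> None:
       ...  # Training body elided; num_steps is forwarded by the launcher.

   cfg = MultiprocessConfig(world_size=8, local_world_size=4, master_addr="10.0.0.1")
   launch_subprocesses(train, cfg, setup=setup, rank_offset=4, num_steps=1000)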

ml.utils.torch_distributed.all_to_all(input: Tensor, group: ProcessGroup | None) → Tensor[source]

Performs an all-to-all operation on the input tensor.

Parameters:
  • input – The input tensor.

  • group – The process group to use for the all-to-all operation.

Returns:

The output tensor.
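
A sketch under the standard all-to-all convention, in which each rank contributes one chunk per peer and receives chunk i from rank i; that this wrapper splits along the first dimension and that group=None selects the default group are assumptions:

   import torch
   import torch.distributed as dist

   from ml.utils.torch_distributed import all_to_all

   # Assumes the process group is already initialized, e.g. inside a
   # function launched via launch_subprocesses().
   world_size = dist.get_world_size()
   inp = torch.full((world_size, 4), float(dist.get_rank()))
   out = all_to_all(inp, group=None)
   # Under the standard convention, out[i] holds rank i's contribution, so
   # every row of out should equal its row index.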