"""Defines general-purpose helper functions for initializing norm layers.
.. highlight:: python
.. code-block:: python
from ml.models.norms import get_norm_linear, get_norm_1d, get_norm_2d, get_norm_3d, cast_norm_type
linear = nn.Sequential(nn.Linear(32, 32), get_norm_linear("layer", dim=32))
conv_1d = nn.Sequential(nn.Conv1d(32, 32, 3), get_norm_1d("layer", dim=32, groups=4))
conv_2d = nn.Sequential(nn.Conv2d(32, 32, 3), get_norm_2d("layer", dim=32, groups=4))
conv_3d = nn.Sequential(nn.Conv3d(32, 32, 3), get_norm_3d("layer", dim=32, groups=4))
# This lets you parametrize the norm type as a string.
linear = nn.Sequential(nn.Linear(32, 32), get_norm_linear(cast_norm_type(my_norm), dim=32))
Choices for the norm type are:
- ``"no_norm"``: No normalization
- ``"batch"`` or ``"batch_affine"``: Batch normalization
- ``"instance"`` or ``"instance_affine"``: Instance normalization
- ``"group"`` or ``"group_affine"``: Group normalization
- ``"layer"`` or ``"layer_affine"``: Layer normalization
Note that instance norm and group norm are not available for linear layers.
"""
from typing import Literal, TypeVar, cast, get_args
import torch
from torch import Tensor, nn
T_module = TypeVar("T_module", bound=nn.Module)
NormType = Literal[
"no_norm",
"batch",
"batch_affine",
"instance",
"instance_affine",
"group",
"group_affine",
"layer",
"layer_affine",
]
ParametrizationNormType = Literal[
"no_norm",
"weight",
"spectral",
]
[docs]def cast_norm_type(s: str) -> NormType:
args = get_args(NormType)
assert s in args, f"Invalid norm type: '{s}' Valid options are {args}"
return cast(NormType, s)
[docs]def cast_parametrize_norm_type(s: str) -> ParametrizationNormType:
args = get_args(ParametrizationNormType)
assert s in args, f"Invalid parametrization norm type: '{s}' Valid options are {args}"
return cast(ParametrizationNormType, s)
[docs]class LastBatchNorm(nn.Module):
"""Applies batch norm along final dimension without transposing the tensor.
The normalization is pretty simple, it basically just tracks the running
mean and variance for each channel, then normalizes each channel to have
a unit normal distribution.
Input:
x: Tensor with shape (..., N)
Output:
The tensor, normalized by the running mean and variance
"""
__constants__ = ["channels", "momentum", "affine", "eps"]
mean: Tensor
var: Tensor
def __init__(
self,
channels: int,
momentum: float = 0.99,
affine: bool = True,
eps: float = 1e-4,
device: torch.device | None = None,
dtype: torch.dtype | None = None,
) -> None:
super().__init__()
self.channels = channels
self.momentum = momentum
self.affine = affine
self.eps = eps
if dtype is None:
mean_tensor = torch.zeros(channels, device=device)
var_tensor = torch.ones(channels, device=device)
else:
mean_tensor = torch.zeros(channels, device=device, dtype=dtype)
var_tensor = torch.ones(channels, device=device, dtype=dtype)
self.register_buffer("mean", mean_tensor)
self.register_buffer("var", var_tensor)
if self.affine:
self.affine_transform = nn.Linear(channels, channels, device=device, dtype=dtype)
[docs] def forward(self, x: Tensor) -> Tensor:
if self.affine:
x = self.affine_transform(x)
if self.training:
x_flat = x.flatten(0, -2)
mean, var = x_flat.mean(dim=0).detach(), x_flat.var(dim=0).detach()
new_mean = mean * (1 - self.momentum) + self.mean * self.momentum
new_var = var * (1 - self.momentum) + self.var * self.momentum
x_out = (x - new_mean.expand_as(x)) / (new_var.expand_as(x) + self.eps)
self.mean.copy_(new_mean, non_blocking=True)
self.var.copy_(new_var, non_blocking=True)
else:
x_out = (x - self.mean.expand_as(x)) / (self.var.expand_as(x) + self.eps)
return x_out
[docs]class ConvLayerNorm(nn.Module):
__constants__ = ["channels", "eps", "elementwise_affine", "static_shape"]
def __init__(
self,
channels: int,
*,
dims: int | None = None,
eps: float = 1e-5,
elementwise_affine: bool = True,
device: torch.device | None = None,
dtype: torch.dtype | None = None,
) -> None:
super().__init__()
self.channels = channels
self.eps = eps
self.elementwise_affine = elementwise_affine
if self.elementwise_affine:
if dtype is None:
self.weight = nn.Parameter(torch.empty(self.channels, device=device))
self.bias = nn.Parameter(torch.empty(self.channels, device=device))
else:
self.weight = nn.Parameter(torch.empty(self.channels, device=device, dtype=dtype))
self.bias = nn.Parameter(torch.empty(self.channels, device=device, dtype=dtype))
else:
self.register_parameter("weight", None)
self.register_parameter("bias", None)
self.static_shape = None if dims is None else (1, -1) + (1,) * dims
self.reset_parameters()
[docs] def reset_parameters(self) -> None:
if self.elementwise_affine:
nn.init.ones_(self.weight)
nn.init.zeros_(self.bias)
[docs] def forward(self, inputs: Tensor) -> Tensor:
mean = inputs.mean(dim=1, keepdim=True)
var = torch.square(inputs - mean).mean(dim=1, keepdim=True)
normalized_inputs = (inputs - mean) / (var + self.eps).sqrt()
if self.elementwise_affine:
if self.static_shape is None:
weight = self.weight.unflatten(0, (-1,) + (1,) * (len(inputs.shape) - 2))
bias = self.bias.unflatten(0, (-1,) + (1,) * (len(inputs.shape) - 2))
else:
weight = self.weight.view(self.static_shape)
bias = self.bias.view(self.static_shape)
normalized_inputs = normalized_inputs * weight + bias
return normalized_inputs
[docs]def get_norm_1d(
norm: NormType,
*,
dim: int | None = None,
groups: int | None = None,
device: torch.device | None = None,
dtype: torch.dtype | None = None,
) -> nn.Module:
"""Returns a normalization layer for tensors with shape (B, C, T).
Args:
norm: The norm type to use
dim: The number of dimensions in the input tensor
groups: The number of groups to use for group normalization
device: The device to use for the layer
dtype: The dtype to use for the layer
Returns:
A normalization layer
Raises:
NotImplementedError: If `norm` is not a valid 1D norm type
"""
match norm:
case "no_norm":
return nn.Identity()
case "batch" | "batch_affine":
if dim is None:
return nn.LazyBatchNorm1d(affine=norm == "batch_affine", device=device, dtype=dtype)
return nn.BatchNorm1d(dim, affine=norm == "batch_affine", device=device, dtype=dtype)
case "instance" | "instance_affine":
if dim is None:
return nn.LazyInstanceNorm1d(affine=norm == "instance_affine", device=device, dtype=dtype)
return nn.InstanceNorm1d(dim, affine=norm == "instance_affine", device=device, dtype=dtype)
case "group" | "group_affine":
assert dim is not None, "`dim` is required for group norm"
assert groups is not None, "`groups` is required for group norm"
return nn.GroupNorm(groups, dim, affine=norm == "group_affine", device=device, dtype=dtype)
case "layer" | "layer_affine":
assert dim is not None
return ConvLayerNorm(dim, dims=1, elementwise_affine=norm == "layer_affine", device=device, dtype=dtype)
case _:
raise NotImplementedError(f"Invalid 1D norm type: {norm}")
[docs]def get_norm_2d(
norm: NormType,
*,
dim: int | None = None,
groups: int | None = None,
device: torch.device | None = None,
dtype: torch.dtype | None = None,
) -> nn.Module:
"""Returns a normalization layer for tensors with shape (B, C, H, W).
Args:
norm: The norm type to use
dim: The number of dimensions in the input tensor
groups: The number of groups to use for group normalization
device: The device to use for the layer
dtype: The dtype to use for the layer
Returns:
A normalization layer
Raises:
NotImplementedError: If `norm` is not a valid 2D norm type
"""
match norm:
case "no_norm":
return nn.Identity()
case "batch" | "batch_affine":
if dim is None:
return nn.LazyBatchNorm2d(affine=norm == "batch_affine", device=device, dtype=dtype)
return nn.BatchNorm2d(dim, affine=norm == "batch_affine", device=device, dtype=dtype)
case "instance" | "instance_affine":
if dim is None:
return nn.LazyInstanceNorm2d(affine=norm == "instance_affine", device=device, dtype=dtype)
return nn.InstanceNorm2d(dim, affine=norm == "instance_affine", device=device, dtype=dtype)
case "group" | "group_affine":
assert dim is not None, "`dim` is required for group norm"
assert groups is not None, "`groups` is required for group norm"
return nn.GroupNorm(groups, dim, affine=norm == "group_affine", device=device, dtype=dtype)
case "layer" | "layer_affine":
assert dim is not None
return ConvLayerNorm(dim, dims=2, elementwise_affine=norm == "layer_affine", device=device, dtype=dtype)
case _:
raise NotImplementedError(f"Invalid 2D norm type: {norm}")
[docs]def get_norm_3d(
norm: NormType,
*,
dim: int | None = None,
groups: int | None = None,
device: torch.device | None = None,
dtype: torch.dtype | None = None,
) -> nn.Module:
"""Returns a normalization layer for tensors with shape (B, C, D, H, W).
Args:
norm: The norm type to use
dim: The number of dimensions in the input tensor
groups: The number of groups to use for group normalization
device: The device to use for the layer
dtype: The dtype to use for the layer
Returns:
A normalization layer
Raises:
NotImplementedError: If `norm` is not a valid 3D norm type
"""
match norm:
case "no_norm":
return nn.Identity()
case "batch" | "batch_affine":
if dim is None:
return nn.LazyBatchNorm3d(affine=norm == "batch_affine", device=device, dtype=dtype)
return nn.BatchNorm3d(dim, affine=norm == "batch_affine", device=device, dtype=dtype)
case "instance" | "instance_affine":
if dim is None:
return nn.LazyInstanceNorm3d(affine=norm == "instance_affine", device=device, dtype=dtype)
return nn.InstanceNorm3d(dim, affine=norm == "instance_affine", device=device, dtype=dtype)
case "group" | "group_affine":
assert dim is not None, "`dim` is required for group norm"
assert groups is not None, "`groups` is required for group norm"
return nn.GroupNorm(groups, dim, affine=norm == "group_affine", device=device, dtype=dtype)
case "layer" | "layer_affine":
assert dim is not None
return ConvLayerNorm(dim, dims=3, elementwise_affine=norm == "layer_affine", device=device, dtype=dtype)
case _:
raise NotImplementedError(f"Invalid 3D norm type: {norm}")
[docs]def get_norm_linear(
norm: NormType,
*,
dim: int | None = None,
device: torch.device | None = None,
dtype: torch.dtype | None = None,
) -> nn.Module:
"""Returns a normalization layer for tensors with shape (B, ..., C).
Args:
norm: The norm type to use
dim: The number of dimensions in the input tensor
device: The device to use for the layer
dtype: The dtype to use for the layer
Returns:
A normalization layer
Raises:
NotImplementedError: If `norm` is not a valid linear norm type
"""
match norm:
case "no_norm":
return nn.Identity()
case "batch" | "batch_affine":
assert dim is not None, "`dim` is required for batch norm"
return LastBatchNorm(dim, affine=norm == "batch_affine", device=device, dtype=dtype)
case "layer" | "layer_affine":
assert dim is not None, "`dim` is required for layer norm"
return nn.LayerNorm(dim, elementwise_affine=norm == "layer_affine", device=device, dtype=dtype)
case _:
raise NotImplementedError(f"Invalid linear norm type: {norm}")
[docs]def get_parametrization_norm(
module: T_module,
norm: ParametrizationNormType,
*,
name: str = "weight",
n_power_iterations: int = 1,
eps: float = 1e-12,
weight_dim: int = 0,
spectral_dim: int | None = None,
) -> T_module:
"""Returns a parametrized version of the module.
Args:
module: The module to parametrize
norm: The parametrization norm type to use
name: The name of the parameter to use for the parametrization; this
should reference the name on the module (for instance, ``weight``
for a ``nn.Linear`` module)
n_power_iterations: The number of power iterations to use for spectral
normalization
eps: The epsilon value to use for spectral normalization
weight_dim: The dimension of the weight parameter to normalize when
using weight normalization
spectral_dim: The dimension of the weight parameter to normalize when
using spectral normalization
Returns:
The parametrized module
"""
match norm:
case "no_norm":
return module
case "weight":
return nn.utils.weight_norm(
module,
name=name,
dim=weight_dim,
)
case "spectral":
return nn.utils.spectral_norm(
module,
name=name,
n_power_iterations=n_power_iterations,
eps=eps,
dim=spectral_dim,
)
case _:
raise NotImplementedError(f"Invalid parametrization norm type: {norm}")