Source code for ml.utils.video

# mypy: disable-error-code="import"
"""Defines utilites for saving and loading video streams.

The main API for using this module is:

.. code-block:: python

    from ml.utils.video import read_video, write_video

    def frame_iterator() -> Iterator[Tensor]:
        for frame in read_video("/path/to/video.mp4"):
            yield frame

    write_video(frame_iterator(), "/path/to/other/video.mp4")

This just uses FFMPEG so it should be reasonably quick.
"""

import asyncio
import functools
import re
import shutil
import warnings
from dataclasses import dataclass
from fractions import Fraction
from pathlib import Path
from typing import AsyncGenerator, Iterator, Literal

import av
import numpy as np
import torchvision.transforms.functional as F
from torch import Tensor

from ml.utils.image import as_uint8, standardize_image
from ml.utils.io import prefetch_samples


[docs]@functools.lru_cache() def ffmpeg_python_available() -> bool: try: import ffmpeg assert ffmpeg is not None # Silence unused import warning except ModuleNotFoundError: return False else: return True
[docs]@functools.lru_cache() def mpl_available() -> bool: try: import matplotlib assert matplotlib is not None # Silence unused import warning except ModuleNotFoundError: return False else: return True
[docs]@functools.lru_cache() def cv2_available() -> bool: try: import cv2 assert cv2 is not None # Silence unused import warning except ModuleNotFoundError: return False else: return True
[docs]@dataclass class VideoProps: frame_width: int frame_height: int frame_count: int fps: Fraction
[docs] @classmethod def from_file_av(cls, fpath: str | Path) -> "VideoProps": container = av.open(str(fpath)) stream = container.streams.video[0] return cls( frame_width=stream.width, frame_height=stream.height, frame_count=stream.frames, fps=Fraction(stream.average_rate), )
[docs] @classmethod def from_file_opencv(cls, fpath: str | Path) -> "VideoProps": try: import cv2 except ModuleNotFoundError as e: raise ModuleNotFoundError("Install OpenCV to use this function: `pip install opencv-python`") from e cap = cv2.VideoCapture(str(fpath)) return cls( frame_width=int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), frame_height=int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), frame_count=int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), fps=Fraction(cap.get(cv2.CAP_PROP_FPS)), )
[docs] @classmethod def from_file_ffmpeg(cls, fpath: str | Path) -> "VideoProps": try: import ffmpeg except ModuleNotFoundError as e: raise ModuleNotFoundError("Install matplotlib to use this function: `pip install ffmpeg-python`") from e probe = ffmpeg.probe(str(fpath)) for stream in probe["streams"]: if stream["codec_type"] == "video": width, height, count = stream["width"], stream["height"], int(stream["nb_frames"]) fps_num, fps_denom = stream["r_frame_rate"].split("/") return cls( frame_width=width, frame_height=height, frame_count=count, fps=Fraction(int(fps_num), int(fps_denom)), ) raise ValueError(f"Could not parse video properties from video in {fpath}")
def _resample_video( video_chunks: Iterator[np.ndarray], *, prefetch_n: int = 1, ) -> Iterator[np.ndarray]: yield from prefetch_samples(video_chunks, prefetch_n)
[docs]def read_video_av( in_file: str | Path, *, target_dims: tuple[int | None, int | None] | None = None, ) -> Iterator[np.ndarray]: """Function that reads a video file to a stream of numpy arrays using PyAV. Args: in_file: The input video to read target_dims: If not None, resize each frame to this size Yields: Frames from the video as numpy arrays with shape (H, W, C) """ container = av.open(str(in_file)) for frame in container.decode(video=0): frame = frame.to_rgb().to_ndarray() if target_dims is not None: frame = F.resize(frame, target_dims[::-1]) yield as_uint8(frame)
[docs]def read_video_ffmpeg( in_file: str | Path, *, output_fmt: str = "rgb24", channels: int = 3, ) -> Iterator[np.ndarray]: """Function that reads a video file to a stream of numpy arrays using FFMPEG. Args: in_file: The input video to read output_fmt: The output image format channels: Number of output channels for each video frame Yields: Frames from the video as numpy arrays with shape (H, W, C) """ try: import ffmpeg except ModuleNotFoundError as e: raise ModuleNotFoundError("Install matplotlib to use this function: `pip install ffmpeg-python`") from e props = VideoProps.from_file_ffmpeg(in_file) stream = ffmpeg.input(str(in_file)) stream = ffmpeg.output(stream, "pipe:", format="rawvideo", pix_fmt=output_fmt, r=float(props.fps)) stream = ffmpeg.run_async(stream, pipe_stdout=True) while True: in_bytes = stream.stdout.read(props.frame_width * props.frame_height * channels) if not in_bytes: break yield np.frombuffer(in_bytes, np.uint8).reshape((props.frame_height, props.frame_width, channels)) stream.stdout.close() stream.wait()
[docs]async def read_video_with_timestamps_ffmpeg( in_file: str | Path, *, output_fmt: str = "rgb24", channels: int = 3, target_dims: tuple[int | None, int | None] | None = None, ) -> AsyncGenerator[tuple[np.ndarray, float], None]: """Like `read_video_ffmpeg` but also returns timestamps. Args: in_file: The input video to read output_fmt: The output image format channels: Number of output channels for each video frame target_dims: (width, height) dimensions for images being loaded, with None meaning that the aspect ratio should be kept the same Yields: Frames from the video as numpy arrays with shape (H, W, C), along with the frame timestamps """ try: import ffmpeg except ModuleNotFoundError as e: raise ModuleNotFoundError("Install matplotlib to use this function: `pip install ffmpeg-python`") from e props = VideoProps.from_file_ffmpeg(in_file) def aspect_ratio(x: int, a: int, b: int) -> int: return (x * a + b - 1) // b vf: list[str] = [] if target_dims is not None: width_opt, height_opt = target_dims if width_opt is None: assert height_opt is not None, "If width is None, height must not be None" width_opt = aspect_ratio(height_opt, props.frame_width, props.frame_height) if height_opt is None: assert width_opt is not None, "If height is None, width must not be None" height_opt = aspect_ratio(width_opt, props.frame_height, props.frame_width) assert (width := width_opt) is not None and (height := height_opt) is not None vf.append(f"scale={width}:{height}") else: width, height = props.frame_width, props.frame_height vf.append("showinfo") stream = ffmpeg.input(str(in_file)) stream = ffmpeg.output(stream, "pipe:", format="rawvideo", pix_fmt=output_fmt, r=float(props.fps), vf=",".join(vf)) stream = ffmpeg.run_async(stream, pipe_stdout=True, pipe_stderr=True) async def gen_frames() -> AsyncGenerator[np.ndarray, None]: while True: in_bytes = stream.stdout.read(height * width * channels) if not in_bytes: await asyncio.sleep(10) raise StopAsyncIteration frame = np.frombuffer(in_bytes, np.uint8).reshape((height, width, channels)) yield frame async def gen_timestamps() -> AsyncGenerator[float, None]: exp = re.compile(rb"n:\s*(\d+)\s*pts:\s*(\d+)\s*pts_time:\s*([\d\.]+)") while True: in_line = stream.stderr.readline() if not in_line: raise StopAsyncIteration exp_match = exp.search(in_line) if exp_match is None: continue _, _, pts_time = exp_match.groups() yield float(pts_time.decode("utf-8")) frame_iter, ts_iter = gen_frames(), gen_timestamps() try: while True: frame, ts = await asyncio.gather(frame_iter.__anext__(), ts_iter.__anext__()) yield frame, ts except StopAsyncIteration: stream.stdout.close() stream.stderr.close() stream.wait()
[docs]def read_video_opencv(in_file: str | Path) -> Iterator[np.ndarray]: """Reads a video as a stream using OpenCV. Args: in_file: The input video to read Yields: Frames from the video as numpy arrays with shape (H, W, C) """ try: import cv2 except ModuleNotFoundError as e: raise ModuleNotFoundError("Install OpenCV to use this function: `pip install opencv-python`") from e cap = cv2.VideoCapture(str(in_file)) while True: ret, buffer = cap.read() if not ret: cap.release() return yield buffer
[docs]def write_video_opencv( itr: Iterator[np.ndarray | Tensor], out_file: str | Path, *, keep_resolution: bool = False, fps: int | Fraction = 30, codec: str = "MP4V", ) -> None: """Function that writes a video from a stream of numpy arrays using OpenCV. Args: itr: The image iterator, yielding images with shape (H, W, C). out_file: The path to the output file. keep_resolution: If set, don't change the image resolution, otherwise resize to a human-friendly resolution. fps: Frames per second for the video. codec: FourCC code specifying OpenCV video codec type. Examples are MPEG, MP4V, DIVX, AVC1, H236. """ try: import cv2 except ModuleNotFoundError as e: raise ModuleNotFoundError("Install OpenCV to use this function: `pip install opencv-python`") from e Path(out_file).parent.mkdir(exist_ok=True, parents=True) first_img = standardize_image(next(itr), keep_resolution=keep_resolution) height, width, _ = first_img.shape fourcc = cv2.VideoWriter_fourcc(*codec) stream = cv2.VideoWriter(str(out_file), fourcc, fps if isinstance(fps, int) else round(fps), (width, height)) def write_frame(img: np.ndarray) -> None: stream.write(as_uint8(img)) write_frame(first_img) for img in itr: write_frame(standardize_image(img, keep_resolution=keep_resolution)) stream.release() cv2.destroyAllWindows()
[docs]def write_video_av( itr: Iterator[np.ndarray | Tensor], out_file: str | Path, *, keep_resolution: bool = False, fps: int | Fraction = 30, codec: str = "libx264", input_fmt: str = "rgb24", output_fmt: str = "yuv420p", ) -> None: """Function that writes an video from a stream of numpy arrays using PyAV. Args: itr: The image iterator, yielding images with shape (H, W, C). out_file: The path to the output file. keep_resolution: If set, don't change the image resolution, otherwise resize to a human-friendly resolution. fps: Frames per second for the video. codec: The video codec to use for the output video input_fmt: The input pixel format output_fmt: The output pixel format """ Path(out_file).parent.mkdir(exist_ok=True, parents=True) first_img = standardize_image(next(itr), keep_resolution=keep_resolution) output = av.open(out_file, "w", format="mp4") stream = output.add_stream(codec, rate=fps) stream.pix_fmt = output_fmt def write_frame(img: np.ndarray) -> None: frame = av.VideoFrame.from_ndarray(as_uint8(img), format=input_fmt) packet = stream.encode(frame) if packet is not None: output.mux(packet) write_frame(first_img) for img in itr: write_frame(standardize_image(img, keep_resolution=keep_resolution)) packet = stream.encode() if packet is not None: output.mux(packet) output.close()
[docs]def write_video_ffmpeg( itr: Iterator[np.ndarray | Tensor], out_file: str | Path, *, keep_resolution: bool = False, fps: int | Fraction = 30, out_fps: int | Fraction = 30, vcodec: str = "libx264", input_fmt: str = "rgb24", output_fmt: str = "yuv420p", ) -> None: """Function that writes an video from a stream of numpy arrays using FFMPEG. Args: itr: The image iterator, yielding images with shape (H, W, C). out_file: The path to the output file. keep_resolution: If set, don't change the image resolution, otherwise resize to a human-friendly resolution. fps: Frames per second for the video. out_fps: Frames per second for the saved video. vcodec: The video codec to use for the output video input_fmt: The input image format output_fmt: The output image format """ try: import ffmpeg except ModuleNotFoundError as e: raise ModuleNotFoundError("Install matplotlib to use this function: `pip install ffmpeg-python`") from e Path(out_file).parent.mkdir(exist_ok=True, parents=True) first_img = standardize_image(next(itr), keep_resolution=keep_resolution) height, width, _ = first_img.shape stream = ffmpeg.input("pipe:", format="rawvideo", pix_fmt=input_fmt, s=f"{width}x{height}", r=float(fps)) stream = ffmpeg.output(stream, str(out_file), pix_fmt=output_fmt, vcodec=vcodec, r=float(out_fps)) stream = ffmpeg.overwrite_output(stream) stream = ffmpeg.run_async(stream, pipe_stdin=True) def write_frame(img: np.ndarray) -> None: stream.stdin.write(as_uint8(img).tobytes()) # Writes all the video frames to the file. write_frame(first_img) for img in itr: write_frame(standardize_image(img, keep_resolution=keep_resolution)) stream.stdin.close() stream.wait()
[docs]def write_video_matplotlib( itr: Iterator[np.ndarray | Tensor], out_file: str | Path, *, keep_resolution: bool = False, dpi: int = 50, fps: int | Fraction = 30, title: str = "Video", comment: str | None = None, writer: str = "ffmpeg", ) -> None: """Function that writes an video from a stream of input tensors. Args: itr: The image iterator, yielding images with shape (H, W, C). out_file: The path to the output file. keep_resolution: If set, don't change the image resolution, otherwise resize to a human-friendly resolution. dpi: Dots per inch for output image. fps: Frames per second for the video. title: Title for the video metadata. comment: Comment for the video metadata. writer: The Matplotlib video writer to use (if you use the default one, make sure you have `ffmpeg` installed on your system). """ try: import matplotlib.animation as ani import matplotlib.pyplot as plt except ModuleNotFoundError as e: raise ModuleNotFoundError("Install matplotlib to use this function: `pip install matplotlib`") from e Path(out_file).parent.mkdir(exist_ok=True, parents=True) first_img = standardize_image(next(itr), keep_resolution=keep_resolution) height, width, _ = first_img.shape fig, ax = plt.subplots(figsize=(width / dpi, height / dpi)) # Ensures that there's no extra space around the image. fig.subplots_adjust( left=0, bottom=0, right=1, top=1, wspace=None, hspace=None, ) # Creates the writer with the given metadata. writer_obj = ani.writers[writer] metadata = { "title": title, "artist": __name__, "comment": comment, } mpl_writer = writer_obj( fps=fps if isinstance(fps, int) else round(fps), metadata={k: v for k, v in metadata.items() if v is not None}, ) with mpl_writer.saving(fig, out_file, dpi=dpi): im = ax.imshow(as_uint8(first_img), interpolation="nearest") mpl_writer.grab_frame() for img in itr: im.set_data(as_uint8(standardize_image(img, keep_resolution=keep_resolution))) mpl_writer.grab_frame()
Reader = Literal["ffmpeg", "av", "opencv"] Writer = Literal["ffmpeg", "matplotlib", "av", "opencv"]
[docs]def get_video_props(in_file: str | Path, *, reader: Reader = "av") -> VideoProps: if reader == "ffmpeg": if not shutil.which("ffmpeg"): warnings.warn("FFMPEG is not available in this system.") reader = "av" elif not ffmpeg_python_available(): warnings.warn("FFMPEG Python is not installed; install with `pip install ffmpeg-python`") reader = "av" else: return VideoProps.from_file_ffmpeg(in_file) if reader == "opencv": if not cv2_available(): warnings.warn("OpenCV is not installed; install with `pip install opencv-python`") reader = "av" else: return VideoProps.from_file_opencv(in_file) if reader == "av": return VideoProps.from_file_av(in_file) raise ValueError(f"Unknown reader {reader}")
[docs]def read_video( in_file: str | Path, *, prefetch_n: int = 1, reader: Reader = "av", ) -> Iterator[np.ndarray]: """Function that reads a video from a file to a stream of Numpy arrays. Args: in_file: The path to the input file. prefetch_n: Number of chunks to prefetch. reader: The video reader to use. Yields: The video frames as Numpy arrays. """ if reader == "ffmpeg": if not shutil.which("ffmpeg"): warnings.warn("FFMPEG is not available in the system.") reader = "av" elif not ffmpeg_python_available(): warnings.warn("FFMPEG Python is not installed; install with `pip install ffmpeg-python`") reader = "av" else: return _resample_video( read_video_ffmpeg(in_file), prefetch_n=prefetch_n, ) if reader == "opencv": if not cv2_available(): warnings.warn("OpenCV is not installed; install with `pip install opencv-python`") reader = "av" else: return _resample_video( read_video_opencv(in_file), prefetch_n=prefetch_n, ) if reader == "av": return _resample_video( read_video_av(in_file), prefetch_n=prefetch_n, ) raise ValueError(f"Invalid reader: {reader}")
[docs]def write_video( itr: Iterator[np.ndarray | Tensor], out_file: str | Path, *, fps: int | Fraction = 30, keep_resolution: bool = False, writer: Writer = "av", ) -> None: """Function that writes an video from a stream of input tensors. Args: itr: The image iterator, yielding images with shape (H, W, C). out_file: The path to the output file. fps: Frames per second for the video. keep_resolution: If set, don't change the image resolution, otherwise resize to a human-friendly resolution. writer: The video writer to use. Raises: ValueError: If the writer is invalid. """ if writer == "ffmpeg": if not shutil.which("ffmpeg"): warnings.warn("FFMPEG is not available in the system.") writer = "av" elif not ffmpeg_python_available(): warnings.warn("FFMPEG Python is not installed; install with `pip install ffmpeg-python`") writer = "av" else: write_video_ffmpeg(itr, out_file, fps=fps, keep_resolution=keep_resolution) return if writer == "matplotlib": if not shutil.which("ffmpeg"): warnings.warn("FFMPEG is not available in the system.") writer = "av" elif not mpl_available(): warnings.warn("Matplotlib is not available in the system.") writer = "av" else: write_video_matplotlib(itr, out_file, fps=fps, keep_resolution=keep_resolution) return if writer == "opencv": if not cv2_available(): warnings.warn("OpenCV is not installed; install with `pip install opencv-python`") writer = "av" else: write_video_opencv(itr, out_file, fps=fps, keep_resolution=keep_resolution) return if writer == "av": write_video_av(itr, out_file, fps=fps, keep_resolution=keep_resolution) return raise ValueError(f"Invalid writer: {writer}")