ml.tasks.datasets.streaming

Defines a dataset which combines many streaming datasets.

This dataset takes a set of child iterable datasets and iterates from them infinitely. When a child dataset is exhausted, it is returned to the reservoir and restarted, while another dataset is chosen.

class ml.tasks.datasets.streaming.StreamingDataset(datasets: Collection[IterableDataset[Batch]], max_simultaneous: int)[source]

Bases: IterableDataset[tuple[int, Batch]], Generic[Batch]

Defines a dataset which combines many streaming datasets.

This dataset takes a set of child iterable datasets and iterates from them infinitely. When a child dataset is exhausted, it is returned to the reservoir and restarted, while another dataset is chosen.

An example usage for this dataset is to get samples from many videos, where each sub-dataset yields video samples. This way the child dataset can be used to run inference on a single video, while the parent streaming dataset can be used to train on a mixture of videos. The child dataset can then be optimized to make video loading times fast.

Initializes a new streaming dataset.

Parameters:
  • datasets – The sub-datasets to iterate from

  • max_simultaneous – The maximum number of simultaneous datasets to iterate from. Increasing this number increases the dataset diversity but also increases memory usage as samples need to be stored in memory

Raises:

ValueError – If no datasets are provided

worker_datasets: dict[int, torch.utils.data.dataset.IterableDataset[Batch]]
iterators: dict[int, Iterator[Batch]]
reservoir: list[int]
reservoir_pointer: int
swap_reservoir(a: int, b: int) None[source]
fill_reservoir() None[source]
sample_reservoir_id() int[source]
return_dataset(reservoir_id: int) None[source]
class ml.tasks.datasets.streaming.StreamingDatasetNoIndex(datasets: Collection[IterableDataset[Batch]], max_simultaneous: int)[source]

Bases: StreamingDataset[Batch], IterableDataset[tuple[int, Batch]], Generic[Batch]

Defines a streaming dataset which only yields the batch.

This dataset is identical to the StreamingDataset, except that it cuts off the dataset index and only yields the batch.

Initializes a new streaming dataset.

Parameters:
  • datasets – The sub-datasets to iterate from

  • max_simultaneous – The maximum number of simultaneous datasets to iterate from. Increasing this number increases the dataset diversity but also increases memory usage as samples need to be stored in memory

Raises:

ValueError – If no datasets are provided