ml.tasks.environments.worker
Defines workers for reinforcement learning environments.
- class ml.tasks.environments.worker.BaseEnvironmentWorker[source]
Bases:
ABC
,Generic
[RLState
,RLAction
]- abstract get_state() RLState | Literal['terminated'] [source]
Returns the current environment states.
- Returns:
The current environment state.
- abstract send_action(action: RLAction | Literal['reset', 'close']) None [source]
Sends an action to the environment.
- Parameters:
action – The action to send to the environment
- abstract classmethod from_environment(env: Environment[RLState, RLAction], num_workers: int) Sequence[BaseEnvironmentWorker[RLState, RLAction]] [source]
Creates a worker from an environment.
- Parameters:
env – The environment to create the worker from.
num_workers – The number of workers to create.
- Returns:
The workers.
- class ml.tasks.environments.worker.SyncEnvironmentWorker(env: Environment[RLState, RLAction], seed: int = 1337)[source]
Bases:
BaseEnvironmentWorker
[RLState
,RLAction
],Generic
[RLState
,RLAction
]Defines a synchronous environment worker.
- Parameters:
env – The environment to wrap.
seed – The random seed to use.
- classmethod from_environment(env: Environment[RLState, RLAction], num_workers: int) Sequence[SyncEnvironmentWorker[RLState, RLAction]] [source]
Creates a worker from an environment.
- Parameters:
env – The environment to create the worker from.
num_workers – The number of workers to create.
- Returns:
The workers.
- class ml.tasks.environments.worker.AsyncEnvironmentWorker(env: Environment[RLState, RLAction], manager: SyncManager, rank: int | None = None, world_size: int | None = None, seed: int = 1337, cleanup_time: float = 5.0, mode: Literal['thread', 'process'] = 'process', daemon: bool = True)[source]
Bases:
BaseEnvironmentWorker
[RLState
,RLAction
],Generic
[RLState
,RLAction
]Defines an asynchronous environment worker.
This worker either runs in a separate thread or process, and is used to asynchronously interact with an environment. This is useful for environments that are slow to interact with, such as a simulator.
- Parameters:
env – The environment to wrap.
manager – The manager to use for shared memory.
rank – The rank of the worker.
world_size – The number of workers.
seed – The random seed to use.
cleanup_time – The time to wait for the worker to finish before killing it.
mode – The mode to use for the worker.
daemon – Whether to run the worker as a daemon.
- Raises:
ValueError – If the mode is invalid.
- classmethod from_environment(env: Environment[RLState, RLAction], num_workers: int) Sequence[AsyncEnvironmentWorker[RLState, RLAction]] [source]
Creates a worker from an environment.
- Parameters:
env – The environment to create the worker from.
num_workers – The number of workers to create.
- Returns:
The workers.
- class ml.tasks.environments.worker.WorkerPool[source]
Bases:
Generic
[RLState
,RLAction
]- get_state(timeout: float) tuple[Union[RLState, Literal['terminated']], int] | None [source]
- get_state(timeout: None = None) tuple[Union[RLState, Literal['terminated']], int]
- abstract send_action(action: RLAction | Literal['reset', 'close'], worker_id: int) None [source]
Sends an action to the given worker.
- Parameters:
action – The action to send.
worker_id – The ID of the worker to send the action to.
- abstract classmethod from_workers(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]]) WorkerPool[RLState, RLAction] [source]
Creates a worker pool from a list of workers.
- Parameters:
workers – The list of workers.
- Returns:
The worker pool.
- class ml.tasks.environments.worker.SyncWorkerPool(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]])[source]
Bases:
WorkerPool
[RLState
,RLAction
],Generic
[RLState
,RLAction
]- send_action(action: RLAction | Literal['reset', 'close'], worker_id: int) None [source]
Sends an action to the given worker.
- Parameters:
action – The action to send.
worker_id – The ID of the worker to send the action to.
- classmethod from_workers(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]]) SyncWorkerPool[RLState, RLAction] [source]
Creates a worker pool from a list of workers.
- Parameters:
workers – The list of workers.
- Returns:
The worker pool.
- class ml.tasks.environments.worker.AsyncWorkerPool(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]], daemon: bool = True)[source]
Bases:
WorkerPool
[RLState
,RLAction
],Generic
[RLState
,RLAction
]- send_action(action: RLAction | Literal['reset', 'close'], worker_id: int) None [source]
Sends an action to the given worker.
- Parameters:
action – The action to send.
worker_id – The ID of the worker to send the action to.
- classmethod from_workers(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]]) AsyncWorkerPool[RLState, RLAction] [source]
Creates a worker pool from a list of workers.
- Parameters:
workers – The list of workers.
- Returns:
The worker pool.
- ml.tasks.environments.worker.get_worker_pool(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]], force_sync: bool = False) WorkerPool[RLState, RLAction] [source]