ml.tasks.environments.worker

Defines workers for reinforcement learning environments.

ml.tasks.environments.worker.clear_queue(q: Queue) None[source]
ml.tasks.environments.worker.cast_worker_mode(m: str) Literal['thread', 'process'][source]
class ml.tasks.environments.worker.BaseEnvironmentWorker[source]

Bases: ABC, Generic[RLState, RLAction]

abstract cleanup() None[source]

Clean up the worker.

abstract get_state() RLState | Literal['terminated'][source]

Returns the current environment state.

Returns:

The current environment state, or the literal 'terminated'.

abstract send_action(action: RLAction | Literal['reset', 'close']) None[source]

Sends an action to the environment.

Parameters:

action – The action to send to the environment, or one of the control commands 'reset' or 'close'.

abstract classmethod from_environment(env: Environment[RLState, RLAction], num_workers: int) Sequence[BaseEnvironmentWorker[RLState, RLAction]][source]

Creates workers from an environment.

Parameters:
  • env – The environment to create the worker from.

  • num_workers – The number of workers to create.

Returns:

The workers.

class ml.tasks.environments.worker.SyncEnvironmentWorker(env: Environment[RLState, RLAction], seed: int = 1337)[source]

Bases: BaseEnvironmentWorker[RLState, RLAction], Generic[RLState, RLAction]

Defines a synchronous environment worker.

Parameters:
  • env – The environment to wrap.

  • seed – The random seed to use.

classmethod from_environment(env: Environment[RLState, RLAction], num_workers: int) Sequence[SyncEnvironmentWorker[RLState, RLAction]][source]

Creates workers from an environment.

Parameters:
  • env – The environment to create the worker from.

  • num_workers – The number of workers to create.

Returns:

The workers.

cleanup() None[source]

Clean up the worker.

get_state() RLState | Literal['terminated'][source]

Returns the current environment state.

Returns:

The current environment state, or the literal 'terminated'.

send_action(action: RLAction | Literal['reset', 'close']) None[source]

Sends an action to the environment.

Parameters:

action – The action to send to the environment, or one of the control commands 'reset' or 'close'.

class ml.tasks.environments.worker.AsyncEnvironmentWorker(env: Environment[RLState, RLAction], manager: SyncManager, rank: int | None = None, world_size: int | None = None, seed: int = 1337, cleanup_time: float = 5.0, mode: Literal['thread', 'process'] = 'process', daemon: bool = True)[source]

Bases: BaseEnvironmentWorker[RLState, RLAction], Generic[RLState, RLAction]

Defines an asynchronous environment worker.

This worker runs in either a separate thread or a separate process and is used to interact with an environment asynchronously. This is useful for environments that are slow to interact with, such as simulators.

Parameters:
  • env – The environment to wrap.

  • manager – The multiprocessing SyncManager used to create the objects shared with the worker.

  • rank – The rank of the worker.

  • world_size – The number of workers.

  • seed – The random seed to use.

  • cleanup_time – The time to wait for the worker to finish before killing it.

  • mode – Whether to run the worker in a separate thread ('thread') or a separate process ('process').

  • daemon – Whether to run the worker as a daemon.

Raises:

ValueError – If mode is not 'thread' or 'process'.

classmethod from_environment(env: Environment[RLState, RLAction], num_workers: int) Sequence[AsyncEnvironmentWorker[RLState, RLAction]][source]

Creates workers from an environment.

Parameters:
  • env – The environment to create the worker from.

  • num_workers – The number of workers to create.

Returns:

The workers.

cleanup() None[source]

Clean up the worker.

get_state() RLState | Literal['terminated'][source]

Returns the current environment state.

Returns:

The current environment state, or the literal 'terminated'.

send_action(action: RLAction | Literal['reset', 'close']) None[source]

Sends an action to the environment.

Parameters:

action – The action to send to the environment, or one of the control commands 'reset' or 'close'.

class ml.tasks.environments.worker.WorkerPool[source]

Bases: Generic[RLState, RLAction]

abstract reset() None[source]
get_state(timeout: float) tuple[Union[RLState, Literal['terminated']], int] | None[source]
get_state(timeout: None = None) tuple[Union[RLState, Literal['terminated']], int]
abstract send_action(action: RLAction | Literal['reset', 'close'], worker_id: int) None[source]

Sends an action to the given worker.

Parameters:
  • action – The action to send.

  • worker_id – The ID of the worker to send the action to.

abstract classmethod from_workers(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]]) WorkerPool[RLState, RLAction][source]

Creates a worker pool from a list of workers.

Parameters:

workers – The list of workers.

Returns:

The worker pool.

class ml.tasks.environments.worker.SyncWorkerPool(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]])[source]

Bases: WorkerPool[RLState, RLAction], Generic[RLState, RLAction]

reset() None[source]
send_action(action: RLAction | Literal['reset', 'close'], worker_id: int) None[source]

Sends an action to the given worker.

Parameters:
  • action – The action to send.

  • worker_id – The ID of the worker to send the action to.

classmethod from_workers(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]]) SyncWorkerPool[RLState, RLAction][source]

Creates a worker pool from a list of workers.

Parameters:

workers – The list of workers.

Returns:

The worker pool.

class ml.tasks.environments.worker.AsyncWorkerPool(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]], daemon: bool = True)[source]

Bases: WorkerPool[RLState, RLAction], Generic[RLState, RLAction]

cleanup() None[source]
reset() None[source]
send_action(action: RLAction | Literal['reset', 'close'], worker_id: int) None[source]

Sends an action to the given worker.

Parameters:
  • action – The action to send.

  • worker_id – The ID of the worker to send the action to.

classmethod from_workers(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]]) AsyncWorkerPool[RLState, RLAction][source]

Creates a worker pool from a list of workers.

Parameters:

workers – The list of workers.

Returns:

The worker pool.

ml.tasks.environments.worker.get_worker_pool(workers: Sequence[BaseEnvironmentWorker[RLState, RLAction]], force_sync: bool = False) WorkerPool[RLState, RLAction][source]