# Copyright (c) 2023, Haruka Kiyohara, Ren Kishimoto, HAKUHODO Technologies Inc., and Hanjuku-kaso Co., Ltd. All rights reserved.
# Licensed under the Apache 2.0 License.
"""Basic Reinforcement Learning (RL) Environment."""
from typing import Tuple, Optional, Any
import gym
from gym.spaces import Box, Discrete
import numpy as np
from sklearn.utils import check_scalar, check_random_state
from .simulator.base import BaseStateTransitionFunction
from .simulator.function import StateTransitionFunction
from .simulator.base import BaseRewardFunction
from .simulator.function import RewardFunction
from ..utils import check_array
from ..types import Action
class BasicEnv(gym.Env):
    """Class for a basic environment for reinforcement learning (RL) agent to interact.

    Bases: :class:`gym.Env`

    Imported as: :class:`basicgym.BasicEnv`

    Note
    -------
    BasicEnv works with OpenAI Gym and Gymnasium-like interface. See Examples below for the usage.

    Markov Decision Process (MDP) definition are given as follows:
        timestep: int (> 0)

        state: array-like of shape (state_dim, )

        action: int, float, or array-like of shape (action_dim, )

        reward: bool or continuous

        discount_rate: float

    Parameters
    -------
    step_per_episode: int, default=10 (> 0)
        Number of timesteps in an episode.

    state_dim: int, default=5 (> 0)
        Dimension of the state.

    action_type: {"discrete", "continuous"}, default="continuous"
        Type of the action space.

    n_actions: int, default=10 (> 0)
        Number of actions in the discrete action case.

    action_dim: int, default=3 (> 0)
        Dimension of the action (context).

    action_context: array-like of shape (n_actions, action_dim), default=None
        Feature vectors that characterizes each action. Applicable only when action_type is "discrete".
        When `None` is given in the discrete case, the context is sampled from
        the standard normal distribution using `random_state`.

    reward_type: {"continuous", "binary"}, default="continuous"
        Reward type.

    reward_std: float, default=0.0 (>=0)
        Noise level of the reward. Applicable only when reward_type is "continuous".

    obs_std: float, default=0.0 (>=0)
        Noise level of the state observation.

    StateTransitionFunction: BaseStateTransitionFunction, default=StateTransitionFunction
        State transition function. Both class and instance are acceptable.
        A class is instantiated with `state_dim`, `action_dim`, and `random_state`.

    RewardFunction: BaseRewardFunction, default=RewardFunction
        Expected immediate reward function. Both class and instance are acceptable.
        A class is instantiated with `state_dim`, `action_dim`, `reward_type`,
        `reward_std`, and `random_state`.

    random_state: int (>= 0)
        Random state. Although the signature default is `None`, a value must be
        given; a `ValueError` is raised otherwise.

    Raises
    -------
    ValueError
        If `random_state` is None, if `action_type` or `reward_type` is not one of
        the supported values, if `action_context` does not have the shape
        (n_actions, action_dim), or if `StateTransitionFunction` / `RewardFunction`
        is neither an instance nor a subclass of the corresponding base class.

    Examples
    -------

    Setup:

    .. code-block:: python

        # import necessary module from basicgym
        from basicgym import BasicEnv
        from scope_rl.policy import OnlineHead
        from scope_rl.ope.online import calc_on_policy_policy_value

        # import necessary module from other libraries
        from d3rlpy.algos import RandomPolicy
        from d3rlpy.preprocessing import MinMaxActionScaler

        # initialize environment
        env = BasicEnv(random_state=12345)

        # the following commands should also work
        # (assuming the id below is registered by basicgym; please confirm)
        # import gym
        # env = gym.make("BasicEnv-continuous-v0")

        # define (RL) agent (i.e., policy)
        agent = OnlineHead(
            RandomPolicy(
                action_scaler=MinMaxActionScaler(
                    minimum=0.1,
                    maximum=10,
                )
            ),
            name="random",
        )
        agent.build_with_env(env)

    Interaction:

    .. code-block:: python

        # OpenAI Gym and Gymnasium-like interaction with agent
        for episode in range(1000):
            obs, info = env.reset()
            done = False

            while not done:
                action = agent.predict_online(obs)
                obs, reward, done, truncated, info = env.step(action)

    Online Evaluation:

    .. code-block:: python

        # calculate on-policy policy value
        on_policy_performance = calc_on_policy_policy_value(
            env,
            agent,
            n_trajectories=100,
            random_state=12345
        )

    Output:

    .. code-block:: python

        >>> on_policy_performance
        27.59

    References
    -------
    Greg Brockman, Vicki Cheung, Ludwig Pettersson, Jonas Schneider, John Schulman, Jie Tang, and Wojciech Zaremba.
    "OpenAI Gym." 2016.

    """

    def __init__(
        self,
        step_per_episode: int = 10,
        state_dim: int = 5,
        action_type: str = "continuous",  # "discrete"
        n_actions: int = 10,  # Applicable only when action_type is "discrete"
        action_dim: int = 3,
        action_context: Optional[
            np.ndarray
        ] = None,  # Applicable only when action_type is "discrete"
        reward_type: str = "continuous",  # "binary"
        reward_std: float = 0.0,
        obs_std: float = 0.0,
        StateTransitionFunction: BaseStateTransitionFunction = StateTransitionFunction,
        RewardFunction: BaseRewardFunction = RewardFunction,
        random_state: Optional[int] = None,
    ):
        super().__init__()

        if random_state is None:
            raise ValueError("random_state must be given")
        self.random_ = check_random_state(random_state)

        check_scalar(
            state_dim,
            name="state_dim",
            target_type=int,
            min_val=1,
        )
        self.state_dim = state_dim

        check_scalar(
            n_actions,
            name="n_actions",
            target_type=int,
            min_val=1,
        )
        self.n_actions = n_actions

        check_scalar(
            action_dim,
            name="action_dim",
            target_type=int,
            min_val=1,
        )
        self.action_dim = action_dim

        check_scalar(
            obs_std,
            name="obs_std",
            target_type=float,
            min_val=0.0,
        )
        self.obs_std = obs_std

        check_scalar(
            step_per_episode,
            name="step_per_episode",
            target_type=int,
            min_val=1,
        )
        self.step_per_episode = step_per_episode

        # define observation space
        self.observation_space = Box(
            low=np.full(state_dim, -1.0),
            high=np.full(state_dim, 1.0),
            dtype=float,
        )

        # define action space
        if action_type not in ["continuous", "discrete"]:
            raise ValueError(
                f'action_type must be either "continuous" or "discrete", but {action_type} is given'
            )
        self.action_type = action_type

        if action_type == "continuous":
            self.action_space = Box(
                low=-1.0, high=1.0, shape=(action_dim,), dtype=float
            )
        else:
            self.action_space = Discrete(n_actions)

            # each discrete action is mapped to a feature vector, which is what
            # the reward and state transition functions actually receive
            if action_context is None:
                action_context = self.random_.normal(
                    loc=0.0, scale=1.0, size=(n_actions, action_dim)
                )

            check_array(
                action_context,
                name="action_context",
                expected_dim=2,
            )
            if action_context.shape != (n_actions, action_dim):
                raise ValueError(
                    f"The shape of action_context must be (n_actions, action_dim), but found {action_context.shape}."
                )
            self.action_context = action_context

        # define state transition function
        # NOTE: issubclass() raises TypeError for non-class arguments, so the
        # isinstance(..., type) guard is needed to reach the ValueError below.
        if isinstance(StateTransitionFunction, BaseStateTransitionFunction):
            self.state_transition_function = StateTransitionFunction
        elif isinstance(StateTransitionFunction, type) and issubclass(
            StateTransitionFunction, BaseStateTransitionFunction
        ):
            self.state_transition_function = StateTransitionFunction(
                state_dim=state_dim,
                action_dim=action_dim,
                random_state=random_state,
            )
        else:
            raise ValueError(
                "StateTransitionFunction must be a child class of BaseStateTransitionFunction"
            )

        # validate the reward configuration before it is handed to the reward function
        if reward_type not in ["continuous", "binary"]:
            raise ValueError(
                f'reward_type must be either "continuous" or "binary", but {reward_type} is given'
            )
        check_scalar(
            reward_std,
            name="reward_std",
            target_type=float,
            min_val=0.0,
        )
        self.reward_type = reward_type
        self.reward_std = reward_std

        # define reward range
        if reward_type == "continuous":
            self.reward_range = (-np.inf, np.inf)
        else:
            self.reward_range = (0, 1)

        # define reward function
        if isinstance(RewardFunction, BaseRewardFunction):
            self.reward_function = RewardFunction
        elif isinstance(RewardFunction, type) and issubclass(
            RewardFunction, BaseRewardFunction
        ):
            self.reward_function = RewardFunction(
                state_dim=state_dim,
                action_dim=action_dim,
                reward_type=reward_type,
                reward_std=reward_std,
                random_state=random_state,
            )
        else:
            raise ValueError(
                "RewardFunction must be a child class of BaseRewardFunction"
            )

    def _observation(self, state: np.ndarray) -> np.ndarray:
        """Add an observation noise.

        Parameters
        -------
        state: ndarray of shape (state_dim,)
            True state of the environment.

        Returns
        -------
        obs: ndarray of shape (state_dim,)
            Sample from a normal distribution centered at `state` with the
            standard deviation of `obs_std`.

        """
        obs = self.random_.normal(loc=state, scale=self.obs_std)
        return obs

    def step(self, action: Action) -> Tuple[Any, ...]:
        """Simulate an action interaction with a context.

        Note
        -------
        The simulation procedure is given as follows.

        1. Sample reward for the given state-action pair.

        2. Update state with state transition function.

        3. Return the feedback to the RL agent.

        When the episode ends at this step, the environment is reset automatically and
        the returned observation is the initial observation of the next episode.

        `reset()` must be called once before the first call of `step()`, as the
        state and timestep are initialized there.

        Parameters
        -------
        action: {int, array-like of shape (action_dim, )} (>= 0)
            Indicating which action to present to the context.
            In the discrete case, the given index is converted to the corresponding
            row of `action_context`.

        Returns
        -------
        feedbacks: Tuple
            obs: ndarray of shape (state_dim,)
                State observation, which possibly be noisy.

            reward: float
                Observed immediate rewards.

            done: bool
                Whether the episode end or not.

            truncated: False
                For API consistency.

            info: (empty) dict
                Additional information that may be useful for the package users.
                This is unavailable to the RL agent.

        Raises
        -------
        ValueError
            If the length of the (converted) action is not equal to `action_dim`.

        """
        if self.action_type == "discrete":
            action = self.action_context[action]

        check_array(
            action,
            name="action",
            expected_dim=1,
        )
        if action.shape[0] != self.action_dim:
            raise ValueError(
                f"Dimension of action must be equal to action_dim ({self.action_dim}), "
                f"but found {action.shape[0]}."
            )

        # 1. sample reward for the given action.
        reward = self.reward_function.sample_reward(self.state, action)

        # 2. update state with state_transition
        self.state = self.state_transition_function.step(self.state, action)

        # self.t counts from 0, so the last step of an episode is step_per_episode - 1
        done = self.t == self.step_per_episode - 1

        if done:
            obs, _ = self.reset()
        else:
            self.t += 1
            obs = self._observation(self.state)

        return obs, reward, done, False, {}

    def reset(
        self, seed: Optional[int] = None, options: Optional[dict] = None
    ) -> Tuple[np.ndarray, dict]:
        """Initialize the environment.

        Parameters
        -------
        seed: int, default=None
            When given, the random state of the environment is replaced with
            the one created from this seed before sampling the initial state.

        options: dict, default=None
            Accepted only for compatibility with the Gym/Gymnasium `reset` API. Not used.

        Returns
        -------
        obs: ndarray of shape (state_dim,)
            State observation, which possibly be noisy.

        info: (empty) dict
            Additional information that may be useful for the package users.
            This is unavailable to the RL agent.

        """
        if seed is not None:
            self.random_ = check_random_state(seed)

        # initialize internal env state
        self.t = 0
        state = self.random_.normal(loc=0.0, scale=1.0, size=(self.state_dim,))
        # the initial state is normalized to have the unit L2 norm
        self.state = state / np.linalg.norm(state, ord=2)

        obs = self._observation(self.state)
        return obs, {}

    def render(self) -> None:
        """Do nothing (rendering is not implemented)."""
        pass

    def close(self) -> None:
        """Do nothing (there is no resource to release)."""
        pass