Source code for scope_rl.policy.orl

# Copyright (c) 2023, Haruka Kiyohara, Ren Kishimoto, HAKUHODO Technologies Inc., and Hanjuku-kaso Co., Ltd. All rights reserved.
# Licensed under the Apache 2.0 License.

"""Meta class to handle Offline Reinforcement Learning (ORL)."""
from dataclasses import dataclass
from collections import defaultdict
from typing import Union, Optional, Any, Dict, List, Tuple
from tqdm.auto import tqdm

import numpy as np

from d3rlpy.algos import QLearningAlgoBase
from d3rlpy.dataset import MDPDataset
from sklearn.model_selection import train_test_split

from .head import BaseHead
from ..utils import MultipleLoggedDataset, defaultdict_to_dict
from ..types import LoggedDataset


# Maps a wrapper name to ``(head, params_dict)``. The first element is called as
# ``head(base_policy=..., name=..., random_state=..., **params_dict)`` to build
# one evaluation policy per base policy.
# NOTE(review): the first element is used as a callable (a head class), although
# it is annotated as ``BaseHead`` here -- confirm whether ``Type[BaseHead]`` is intended.
HeadDict = Dict[str, Tuple[BaseHead, Dict[str, Any]]]


@dataclass
class TrainCandidatePolicies:
    """Class to handle ORL by multiple algorithms simultaneously. (applicable to both discrete/continuous action cases)

    Imported as: :class:`scope_rl.policy.TrainCandidatePolicies`

    Parameters
    -------
    fitting_args: dict, default=None
        Arguments of fitting function to learn model.
        When ``None`` is given, ``{"n_steps": 10000}`` is used.

    Examples
    ----------

    Preparation:

    .. code-block:: python

        # import necessary module from SCOPE-RL
        from scope_rl.dataset import SyntheticDataset
        from scope_rl.policy import TrainCandidatePolicies
        from scope_rl.policy import EpsilonGreedyHead, SoftmaxHead

        # import necessary module from other libraries
        import gym
        import rtbgym
        from d3rlpy.algos import DoubleDQNConfig
        from d3rlpy.dataset import create_fifo_replay_buffer
        from d3rlpy.algos import ConstantEpsilonGreedy
        from d3rlpy.algos import DiscreteBCQConfig, DiscreteCQLConfig

        # initialize environment
        env = gym.make("RTBEnv-discrete-v0")

        # define (RL) agent (i.e., policy) and train on the environment
        ddqn = DoubleDQNConfig().create()
        buffer = create_fifo_replay_buffer(
            limit=10000,
            env=env,
        )
        explorer = ConstantEpsilonGreedy(
            epsilon=0.3,
        )
        ddqn.fit_online(
            env=env,
            buffer=buffer,
            explorer=explorer,
            n_steps=10000,
            n_steps_per_epoch=1000,
        )

        # convert ddqn policy to a stochastic data collection policy
        behavior_policy = EpsilonGreedyHead(
            ddqn,
            n_actions=env.action_space.n,
            epsilon=0.3,
            name="ddqn_epsilon_0.3",
            random_state=12345,
        )

        # initialize dataset class
        dataset = SyntheticDataset(
            env=env,
            max_episode_steps=env.step_per_episode,
        )

        # data collection
        logged_dataset = dataset.obtain_episodes(
            behavior_policies=behavior_policy,
            n_datasets=2,
            n_trajectories=100,
            random_state=12345,
        )

    **Learning Evaluation Policies**:

    .. code-block:: python

        # base algorithms
        bcq = DiscreteBCQConfig().create()
        cql = DiscreteCQLConfig().create()
        algorithms = [bcq, cql]
        algorithms_name = ["bcq", "cql"]

        # policy wrappers
        policy_wrappers = {
            "eps_01": (
                EpsilonGreedyHead,
                {
                    "epsilon": 0.1,
                    "n_actions": env.action_space.n,
                }
            ),
            "eps_03": (
                EpsilonGreedyHead,
                {
                    "epsilon": 0.3,
                    "n_actions": env.action_space.n,
                }
            ),
            "softmax": (
                SoftmaxHead,
                {
                    "tau": 1.0,
                    "n_actions": env.action_space.n,
                }
            ),
        }

        # offline reinforcement learning
        orl = TrainCandidatePolicies()
        eval_policies = orl.obtain_evaluation_policy(
            logged_dataset=logged_dataset,
            algorithms=algorithms,
            algorithms_name=algorithms_name,
            policy_wrappers=policy_wrappers,
            random_state=12345,
        )

    **Output**:

    .. code-block:: python

        >>> [eval_policy.name for eval_policy in eval_policies[behavior_policy.name][0]]

        ['bcq_eps_01', 'bcq_eps_03', 'bcq_softmax', 'cql_eps_01', 'cql_eps_03', 'cql_softmax']

    """

    fitting_args: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Fall back to a default training budget when the caller gives none.
        if self.fitting_args is None:
            self.fitting_args = {
                "n_steps": 10000,
            }

    def _learn_base_policy(
        self,
        logged_dataset: LoggedDataset,
        algorithms: List[QLearningAlgoBase],
        random_state: Optional[int] = None,
    ):
        """Learn base policy.

        Parameters
        -------
        logged_dataset: LoggedDataset
            Logged dataset used to learn the policies.
            Only the ``state``, ``action``, ``reward``, and ``done`` entries are read here.

            .. code-block:: python

                key: [
                    size,
                    n_trajectories,
                    step_per_trajectory,
                    action_type,
                    n_actions,
                    action_dim,
                    action_keys,
                    action_meaning,
                    state_dim,
                    state_keys,
                    state,
                    action,
                    reward,
                    done,
                    terminal,
                    info,
                    pscore,
                ]

            .. seealso::

                :class:`scope_rl.dataset.SyntheticDataset` describes the components of :class:`logged_dataset`.

        algorithms: list of QLearningAlgoBase
            List of algorithms to fit.

        random_state: int, default=None (>= 0)
            Random state. (Currently not used by this method.)

        Returns
        -------
        base_policies: List of QLearningAlgoBase
            List of learned policies.

            .. note::

                ``fit`` is called on each element of ``algorithms`` and the very same
                list object is returned; no copy of the algorithms is made.

        """
        offlinerl_dataset = MDPDataset(
            observations=logged_dataset["state"],
            actions=logged_dataset["action"],
            rewards=logged_dataset["reward"],
            terminals=logged_dataset["done"],
        )

        for algorithm in tqdm(
            algorithms,
            desc="[learn_policies: algorithms]",
            total=len(algorithms),
        ):
            algorithm.fit(
                offlinerl_dataset,
                **self.fitting_args,
            )

        return algorithms

    def _apply_head(
        self,
        base_policies: List[QLearningAlgoBase],
        base_policies_name: List[str],
        policy_wrappers: HeadDict,
        random_state: Optional[int] = None,
    ):
        """Apply policy wrappers to the (deterministic) base policies.

        Parameters
        -------
        base_policies: list of QLearningAlgoBase
            List of base (learned) policies.

        base_policies_name: list of str
            List of the name of each base policy.

        policy_wrappers: HeadDict
            Dictionary containing information about policy wrappers.
            The HeadDict should follow the following format.

            .. code-block:: python

                key: wrapper_name

                value: (BaseHead, params_dict)

            (Example of ``HeadDict``)

            .. code-block:: python

                {
                    "eps_01":  # wrapper_name
                        (
                            EpsilonGreedyHead,  # BaseHead
                            {
                                "epsilon": 0.1,   # params_dict
                                "n_actions": 5,
                            },
                        )
                }

            .. note::

                ``random_state``, ``name``, and ``base_policy`` should be omitted from the ``params_dict``.

            .. seealso::

                :doc:`/documentation/_autosummary/scope_rl.policy.head` describes various policy wrappers and their parameters.

        random_state: int, default=None (>= 0)
            Random state.

        Returns
        -------
        evaluation_policies: list of BaseHead
            List of (stochastic) evaluation policies, ordered by base policy first and
            by wrapper (in the iteration order of ``policy_wrappers``) second.
            Each policy is named ``"{base_policy_name}_{wrapper_name}"``.

        """
        eval_policies = []
        for i, base_policy in enumerate(base_policies):
            for head_name, (head, head_kwargs) in policy_wrappers.items():
                eval_policies.append(
                    head(
                        base_policy=base_policy,
                        name=base_policies_name[i] + f"_{head_name}",
                        random_state=random_state,
                        **head_kwargs,
                    )
                )

        return eval_policies

    def _map_over_datasets(
        self,
        logged_dataset: Union[LoggedDataset, MultipleLoggedDataset],
        func,
        behavior_policy_name: Optional[str] = None,
        dataset_id: Optional[int] = None,
    ):
        """Call ``func`` on every selected logged dataset and collect the results.

        Parameters
        -------
        logged_dataset: LoggedDataset or MultipleLoggedDataset
            A single logged dataset, or a collection of logged datasets.

        func: callable
            Function that takes a single logged dataset and returns a result.

        behavior_policy_name: str, default=None
            Name of the behavior policy. Only used for ``MultipleLoggedDataset``.

        dataset_id: int, default=None
            Id of the logged dataset. Only used for ``MultipleLoggedDataset``.

        Returns
        -------
        results:
            - ``func(logged_dataset)`` when ``logged_dataset`` is not a ``MultipleLoggedDataset``,
              or when both ``behavior_policy_name`` and ``dataset_id`` are given.
            - ``results[behavior_policy_name][dataset_id]`` when neither is given.
            - ``results[behavior_policy_name]`` when only ``dataset_id`` is given.
            - ``results[dataset_id]`` when only ``behavior_policy_name`` is given.

        """
        if not isinstance(logged_dataset, MultipleLoggedDataset):
            return func(logged_dataset)

        if behavior_policy_name is None and dataset_id is None:
            results = defaultdict(list)
            for behavior_policy, n_datasets in logged_dataset.n_datasets.items():
                for dataset_id_ in range(n_datasets):
                    logged_dataset_ = logged_dataset.get(
                        behavior_policy_name=behavior_policy, dataset_id=dataset_id_
                    )
                    results[behavior_policy].append(func(logged_dataset_))
            return defaultdict_to_dict(results)

        if behavior_policy_name is None:
            results = {}
            for behavior_policy in logged_dataset.behavior_policy_names:
                logged_dataset_ = logged_dataset.get(
                    behavior_policy_name=behavior_policy, dataset_id=dataset_id
                )
                results[behavior_policy] = func(logged_dataset_)
            return results

        if dataset_id is None:
            results = []
            for dataset_id_ in range(logged_dataset.n_datasets[behavior_policy_name]):
                logged_dataset_ = logged_dataset.get(
                    behavior_policy_name=behavior_policy_name,
                    dataset_id=dataset_id_,
                )
                results.append(func(logged_dataset_))
            return results

        # Both selectors are given: use the caller's ``dataset_id`` (not a loop variable).
        logged_dataset_ = logged_dataset.get(
            behavior_policy_name=behavior_policy_name, dataset_id=dataset_id
        )
        return func(logged_dataset_)

    def learn_base_policy(
        self,
        logged_dataset: Union[LoggedDataset, MultipleLoggedDataset],
        algorithms: List[QLearningAlgoBase],
        behavior_policy_name: Optional[str] = None,
        dataset_id: Optional[int] = None,
        random_state: Optional[int] = None,
    ):
        """Learn base policy.

        Parameters
        -------
        logged_dataset: LoggedDataset or MultipleLoggedDataset
            Logged dataset used to learn the policies.

            .. code-block:: python

                key: [
                    size,
                    n_trajectories,
                    step_per_trajectory,
                    action_type,
                    n_actions,
                    action_dim,
                    action_keys,
                    action_meaning,
                    state_dim,
                    state_keys,
                    state,
                    action,
                    reward,
                    done,
                    terminal,
                    info,
                    pscore,
                    behavior_policy,
                    dataset_id,
                ]

            .. seealso::

                :class:`scope_rl.dataset.SyntheticDataset` describes the components of :class:`logged_dataset`.

        algorithms: list of QLearningAlgoBase
            List of algorithms to fit.

        behavior_policy_name: str, default=None
            Name of the behavior policy.

        dataset_id: int, default=None
            Id of the logged dataset.

        random_state: int, default=None (>= 0)
            Random state.

        Returns
        -------
        base_policies: list of QLearningAlgoBase (or a nested structure of such lists)
            List of learned policies.

            - For a single ``LoggedDataset``, or when both ``behavior_policy_name`` and
              ``dataset_id`` are given: a list of learned policies.
            - When neither is given: ``base_policies[behavior_policy_name][dataset_id]``.
            - When only ``dataset_id`` is given: ``base_policies[behavior_policy_name]``.
            - When only ``behavior_policy_name`` is given: ``base_policies[dataset_id]``.

            .. note::

                Every dataset is fitted with the same ``algorithms`` objects and the
                ``algorithms`` list itself is stored for each dataset, so the entries
                for different datasets refer to the same objects.

        """

        def learn(logged_dataset_):
            return self._learn_base_policy(
                logged_dataset=logged_dataset_,
                algorithms=algorithms,
                random_state=random_state,
            )

        return self._map_over_datasets(
            logged_dataset=logged_dataset,
            func=learn,
            behavior_policy_name=behavior_policy_name,
            dataset_id=dataset_id,
        )

    def apply_head(
        self,
        base_policies: Union[
            List[QLearningAlgoBase], Dict[str, List[QLearningAlgoBase]]
        ],
        base_policies_name: List[str],
        policy_wrappers: HeadDict,
        random_state: Optional[int] = None,
    ):
        """Apply policy wrappers to the (deterministic) base policies.

        Parameters
        -------
        base_policies: list of QLearningAlgoBase (or a nested structure of such lists)
            List of base (learned) policies. One of the following structures is accepted:

            - a list of base policies,
            - a list (indexed by dataset id) of lists of base policies,
            - a dict (keyed by behavior policy name) of lists of base policies,
            - a dict (keyed by behavior policy name) of lists (indexed by dataset id)
              of lists of base policies.

        base_policies_name: list of str
            List of the name of each base policy.

        policy_wrappers: HeadDict
            Dictionary containing information about policy wrappers.
            The HeadDict should follow the following format.

            .. code-block:: python

                key: wrapper_name

                value: (BaseHead, params_dict)

            (Example of ``HeadDict``)

            .. code-block:: python

                {
                    "eps_01":  # wrapper_name
                        (
                            EpsilonGreedyHead,  # BaseHead
                            {
                                "epsilon": 0.1,   # params_dict
                                "n_actions": 5,
                            },
                        )
                }

            .. note::

                ``random_state``, ``name``, and ``base_policy`` should be omitted from the ``params_dict``.

            .. seealso::

                :doc:`/documentation/_autosummary/scope_rl.policy.head` describes various policy wrappers and their parameters.

        random_state: int, default=None (>= 0)
            Random state.

        Raises
        -------
        ValueError
            If an innermost list of base policies and ``base_policies_name`` differ in length.

        Returns
        -------
        evaluation_policies: list of BaseHead (or a nested structure of such lists)
            List of (stochastic) evaluation policies, nested in the same way as ``base_policies``.

        """
        if isinstance(base_policies, dict):
            evaluation_policies = {}
            for behavior_policy, policies in base_policies.items():
                # The first element tells a flat list of policies from a per-dataset list of lists.
                if isinstance(policies[0], QLearningAlgoBase):
                    if len(policies) != len(base_policies_name):
                        raise ValueError(
                            "Expected `len(base_policies[behavior_policy]) == len(base_policies_name)`, but found False"
                        )

                    evaluation_policies[behavior_policy] = self._apply_head(
                        base_policies=policies,
                        base_policies_name=base_policies_name,
                        policy_wrappers=policy_wrappers,
                        random_state=random_state,
                    )

                else:
                    evaluation_policies[behavior_policy] = []
                    for policies_ in policies:
                        if len(policies_) != len(base_policies_name):
                            raise ValueError(
                                "Expected `len(base_policies[behavior_policy][dataset_id_]) == len(base_policies_name)`, but found False"
                            )

                        evaluation_policies_ = self._apply_head(
                            base_policies=policies_,
                            base_policies_name=base_policies_name,
                            policy_wrappers=policy_wrappers,
                            random_state=random_state,
                        )
                        evaluation_policies[behavior_policy].append(
                            evaluation_policies_
                        )

        else:
            if isinstance(base_policies[0], QLearningAlgoBase):
                if len(base_policies) != len(base_policies_name):
                    raise ValueError(
                        "Expected `len(base_policies) == len(base_policies_name)`, but found False"
                    )

                evaluation_policies = self._apply_head(
                    base_policies=base_policies,
                    base_policies_name=base_policies_name,
                    policy_wrappers=policy_wrappers,
                    random_state=random_state,
                )

            else:
                evaluation_policies = []
                for policies_ in base_policies:
                    # Same validation as in the other branches.
                    if len(policies_) != len(base_policies_name):
                        raise ValueError(
                            "Expected `len(base_policies[dataset_id_]) == len(base_policies_name)`, but found False"
                        )

                    evaluation_policies_ = self._apply_head(
                        base_policies=policies_,
                        base_policies_name=base_policies_name,
                        policy_wrappers=policy_wrappers,
                        random_state=random_state,
                    )
                    evaluation_policies.append(evaluation_policies_)

        return evaluation_policies

    def obtain_evaluation_policy(
        self,
        logged_dataset: Union[LoggedDataset, MultipleLoggedDataset],
        algorithms: List[QLearningAlgoBase],
        algorithms_name: List[str],
        policy_wrappers: HeadDict,
        behavior_policy_name: Optional[str] = None,
        dataset_id: Optional[int] = None,
        random_state: Optional[int] = None,
    ):
        """Obtain evaluation policies given base algorithms and policy wrappers.

        Parameters
        -------
        logged_dataset: LoggedDataset or MultipleLoggedDataset
            Logged dataset used to learn the policies.

            .. code-block:: python

                key: [
                    size,
                    n_trajectories,
                    step_per_trajectory,
                    action_type,
                    n_actions,
                    action_dim,
                    action_keys,
                    action_meaning,
                    state_dim,
                    state_keys,
                    state,
                    action,
                    reward,
                    done,
                    terminal,
                    info,
                    pscore,
                ]

            .. seealso::

                :class:`scope_rl.dataset.SyntheticDataset` describes the components of :class:`logged_dataset`.

        algorithms: list of QLearningAlgoBase
            List of algorithms to fit.

        algorithms_name: list of str
            List of the name of each base policy.

        policy_wrappers: HeadDict
            Dictionary containing information about policy wrappers.
            The HeadDict should follow the following format.

            .. code-block:: python

                key: wrapper_name

                value: (BaseHead, params_dict)

            (Example of ``HeadDict``)

            .. code-block:: python

                {
                    "eps_01":  # wrapper_name
                        (
                            EpsilonGreedyHead,  # BaseHead
                            {
                                "epsilon": 0.1,   # params_dict
                                "n_actions": 5,
                            },
                        )
                }

            .. note::

                ``random_state``, ``name``, and ``base_policy`` should be omitted from the ``params_dict``.

            .. seealso::

                :doc:`/documentation/_autosummary/scope_rl.policy.head` describes various policy wrappers and their parameters.

        behavior_policy_name: str, default=None
            Name of the behavior policy.

        dataset_id: int, default=None
            Id of the logged dataset.

        random_state: int, default=None (>= 0)
            Random state.

        Raises
        -------
        ValueError
            If ``algorithms`` and ``algorithms_name`` differ in length.

        Returns
        -------
        evaluation_policies: list of BaseHead (or a nested structure of such lists)
            List of (stochastic) evaluation policies.

            - For a single ``LoggedDataset``, or when both ``behavior_policy_name`` and
              ``dataset_id`` are given: a list of evaluation policies.
            - When neither is given: ``evaluation_policies[behavior_policy_name][dataset_id]``.
            - When only ``dataset_id`` is given: ``evaluation_policies[behavior_policy_name]``.
            - When only ``behavior_policy_name`` is given: ``evaluation_policies[dataset_id]``.

        """
        if len(algorithms) != len(algorithms_name):
            raise ValueError(
                "algorithms and algorithms_name must have the same length, but found False"
            )

        def learn_and_wrap(logged_dataset_):
            base_policies_ = self._learn_base_policy(
                logged_dataset=logged_dataset_,
                algorithms=algorithms,
                random_state=random_state,
            )
            return self._apply_head(
                base_policies=base_policies_,
                base_policies_name=algorithms_name,
                policy_wrappers=policy_wrappers,
                random_state=random_state,
            )

        return self._map_over_datasets(
            logged_dataset=logged_dataset,
            func=learn_and_wrap,
            behavior_policy_name=behavior_policy_name,
            dataset_id=dataset_id,
        )