Source code for scope_rl.ope.input

# Copyright (c) 2023, Haruka Kiyohara, Ren Kishimoto, HAKUHODO Technologies Inc., and Hanjuku-kaso Co., Ltd. All rights reserved.
# Licensed under the Apache 2.0 License.

"""Meta class to create input for Off-Policy Evaluation (OPE)."""
from dataclasses import dataclass
from copy import deepcopy
from typing import Dict, List, Optional, Any, Union

from collections import defaultdict
from tqdm.auto import tqdm

import torch
import numpy as np
from sklearn.utils import check_scalar, check_random_state

import gym
from gym.spaces import Discrete

from d3rlpy.dataset import MDPDataset
from d3rlpy.ope import FQEConfig
from d3rlpy.ope import DiscreteFQE
from d3rlpy.ope import FQE as ContinuousFQE
from d3rlpy.models.encoders import VectorEncoderFactory
from d3rlpy.models.q_functions import MeanQFunctionFactory
from d3rlpy.preprocessing import Scaler, ActionScaler

from .weight_value_learning import (
    DiscreteDiceStateActionWightValueLearning,
    DiscreteDiceStateWightValueLearning,
    ContinuousDiceStateActionWightValueLearning,
    ContinuousDiceStateWightValueLearning,
    DiscreteMinimaxStateActionValueLearning,
    DiscreteMinimaxStateValueLearning,
    ContinuousMinimaxStateActionValueLearning,
    ContinuousMinimaxStateValueLearning,
    DiscreteMinimaxStateActionWeightLearning,
    DiscreteMinimaxStateWeightLearning,
    ContinuousMinimaxStateActionWeightLearning,
    ContinuousMinimaxStateWeightLearning,
)
from .weight_value_learning.function import (
    DiscreteQFunction,
    ContinuousQFunction,
    VFunction,
    DiscreteStateActionWeightFunction,
    ContinuousStateActionWeightFunction,
    StateWeightFunction,
)
from .online import rollout_policy_online
from ..policy.head import BaseHead
from ..utils import (
    MultipleLoggedDataset,
    MultipleInputDict,
    defaultdict_to_dict,
    check_logged_dataset,
    check_array,
)
from ..types import LoggedDataset, OPEInputDict


[docs]@dataclass class CreateOPEInput: """Class to prepare OPE inputs. Imported as: :class:`scope_rl.ope.CreateOPEInput` Parameters ------- env: gym.Env, default=None Reinforcement learning (RL) environment. model_args: dict of dict, default=None Arguments of the models. .. code-block:: python key: [ "fqe", "state_action_dual", "state_action_value", "state_action_weight", "state_dual", "state_value", "state_weight", "hidden_dim", # hidden dim of value/weight function, except FQE ] .. note:: Please specify :class:`scaler` and :class:`action_scaler` when calling :class:`.obtain_whole_inputs()` (, as we will overwrite those specified by :class:`model_args[model]["scaler/action_scaler"]`). .. seealso:: The followings describe the parameters of each model. * (external) `d3rlpy's documentation about FQE <https://d3rlpy.readthedocs.io/en/latest/references/off_policy_evaluation.html>`_ * (API reference) :class:`scope_rl.ope.weight_value_learning` gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. bandwidth: float, default=1.0 (> 0) Bandwidth hyperparameter of the kernel. state_scaler: d3rlpy.preprocessing.Scaler, default=None Scaling factor of state. action_scaler: d3rlpy.preprocessing.ActionScaler, default=None Scaling factor of action. Only applicable in the continuous action case. device: Optional[str] = None Specifies device used for torch. Examples ---------- Preparation: .. code-block:: python # import necessary module from SCOPE-RL from scope_rl.dataset import SyntheticDataset from scope_rl.policy import EpsilonGreedyHead from scope_rl.ope import CreateOPEInput from scope_rl.ope import OffPolicyEvaluation as OPE from scope_rl.ope.discrete import TrajectoryWiseImportanceSampling as TIS from scope_rl.ope.discrete import PerDecisionImportanceSampling as PDIS # import necessary module from other libraries import gym import rtbgym from d3rlpy.algos import DoubleDQNConfig from d3rlpy.dataset import create_fifo_replay_buffer from d3rlpy.algos import ConstantEpsilonGreedy # initialize environment env = gym.make("RTBEnv-discrete-v0") # define (RL) agent (i.e., policy) and train on the environment ddqn = DoubleDQNConfig().create() buffer = create_fifo_replay_buffer( limit=10000, env=env, ) explorer = ConstantEpsilonGreedy( epsilon=0.3, ) ddqn.fit_online( env=env, buffer=buffer, explorer=explorer, n_steps=10000, n_steps_per_epoch=1000, ) # convert ddqn policy to stochastic data collection policy behavior_policy = EpsilonGreedyHead( ddqn, n_actions=env.action_space.n, epsilon=0.3, name="ddqn_epsilon_0.3", random_state=12345, ) # initialize dataset class dataset = SyntheticDataset( env=env, max_episode_steps=env.step_per_episode, ) # data collection logged_dataset = dataset.obtain_episodes( behavior_policies=behavior_policy, n_trajectories=100, random_state=12345, ) **Create Input**: .. code-block:: python # evaluation policy ddqn_ = EpsilonGreedyHead( base_policy=ddqn, n_actions=env.action_space.n, name="ddqn", epsilon=0.0, random_state=12345 ) random_ = EpsilonGreedyHead( base_policy=ddqn, n_actions=env.action_space.n, name="random", epsilon=1.0, random_state=12345 ) # create input for off-policy evaluation (OPE) prep = CreateOPEInput( env=env, ) input_dict = prep.obtain_whole_inputs( logged_dataset=logged_dataset, evaluation_policies=[ddqn_, random_], require_value_prediction=True, n_trajectories_on_policy_evaluation=100, random_state=12345, ) **Output**: .. code-block:: python >>> input_dict {'ddqn': {'evaluation_policy_action_dist': array([[0., 0., 0., ..., 0., 1., 0.], [0., 0., 0., ..., 0., 0., 0.], [0., 0., 0., ..., 0., 1., 0.], ..., [0., 0., 0., ..., 0., 0., 0.], [0., 0., 0., ..., 0., 0., 0.], [0., 0., 0., ..., 0., 1., 0.]]), 'evaluation_policy_action': None, 'state_action_value_prediction': array([[11.64699173, 10.1278677 , 10.09877205, ..., 10.16476822, 15.13939476, 8.95065594], [10.42242146, 7.73790789, 7.27790451, ..., 3.51157165, 12.0761919 , 3.75301909], [ 7.22864819, 6.88499546, 5.68951464, ..., 6.10659647, 7.05469513, 4.81715965], ..., [ 7.28475332, 3.91264176, 4.6845212 , ..., -0.02834684, 7.94454432, 2.59267783], [13.44723797, 3.08360171, 5.99188185, ..., -2.16886044, 7.13434792, 5.72265959], [ 2.27913332, 3.07881427, 1.8636421 , ..., 3.37803316, 3.20135021, 2.68845224]]), 'initial_state_value_prediction': array([15.13939476, 14.83423805, 13.82990742, ..., 15.49367523, 15.49053097, 14.88922691]), 'state_action_marginal_importance_weight': None, 'state_marginal_importance_weight': None, 'on_policy_policy_value': array([ 8., 10., 9., ..., 13., 18., 4.]), 'gamma': 1.0, 'behavior_policy': 'ddqn_epsilon_0.3', 'evaluation_policy': 'ddqn', 'dataset_id': 0},}, 'random': {'evaluation_policy_action_dist': array([[0.1, 0.1, 0.1, ..., 0.1, 0.1, 0.1], [0.1, 0.1, 0.1, ..., 0.1, 0.1, 0.1], [0.1, 0.1, 0.1, ..., 0.1, 0.1, 0.1], ..., [0.1, 0.1, 0.1, ..., 0.1, 0.1, 0.1], [0.1, 0.1, 0.1, ..., 0.1, 0.1, 0.1], [0.1, 0.1, 0.1, ..., 0.1, 0.1, 0.1]]), 'evaluation_policy_action': None, 'state_action_value_prediction': array([[10.63342857, 10.61063576, 11.16767025, ..., 15.32427979, 15.08568764, 10.50707436], [ 4.02995491, 4.80947208, 7.07302999, ..., 9.928442 , 10.78198528, 9.04977417], [ 6.21145582, 6.08772421, 6.5972681 , ..., 9.68579388, 7.2353406 , 6.17404699], ..., [ 1.2350018 , 1.37531543, 3.48139453, ..., 3.44862366, 5.41990328, -0.20314722], [ 0.81208032, -0.28935188, 2.62608957, ..., 6.6619091 , -2.18710518, -2.34665537], [ 2.36533523, 2.24474525, 2.31729817, ..., 4.7845993 , 2.83752441, 3.00596046]]), 'initial_state_value_prediction': array([12.5472518 , 12.56364899, 12.30248432, ..., 12.62372198, 12.6544138 , 12.54314356]), 'state_action_marginal_importance_weight': None, 'state_marginal_importance_weight': None, 'on_policy_policy_value': array([ 9., 7., 4., ..., 15., 8., 5.]), 'gamma': 1.0, 'behavior_policy': 'ddqn_epsilon_0.3', 'evaluation_policy': 'random', 'dataset_id': 0}} .. seealso:: * :doc:`Quickstart </documentation/quickstart>` """ env: Optional[gym.Env] = None model_args: Optional[Dict[str, Any]] = None gamma: float = 1.0 bandwidth: float = 1.0 state_scaler: Optional[Scaler] = None action_scaler: Optional[ActionScaler] = None device: Optional[str] = None def __post_init__(self) -> None: if self.model_args is None: self.model_args = { "fqe": None, "state_action_dual": None, "state_action_value": None, "state_action_weight": None, "state_dual": None, "state_value": None, "state_weight": None, "hidden_dim": None, } for key in [ "fqe", "state_action_dual", "state_action_value", "state_action_weight", "state_dual", "state_value", "state_weight", "hidden_dim", ]: if key not in self.model_args: self.model_args[key] = None if self.model_args["hidden_dim"] is None: self.model_args["hidden_dim"] = 100 if self.model_args["fqe"] is None: self.model_args["fqe"] = { "encoder_factory": VectorEncoderFactory( hidden_units=[self.model_args["hidden_dim"]] ), "q_func_factory": MeanQFunctionFactory(), "learning_rate": 1e-4, } if self.model_args["state_action_dual"] is None: self.model_args["state_action_dual"] = {} if self.model_args["state_action_value"] is None: self.model_args["state_action_value"] = {} if self.model_args["state_action_weight"] is None: self.model_args["state_action_weight"] = {} if self.model_args["state_dual"] is None: self.model_args["state_dual"] = {} if self.model_args["state_value"] is None: self.model_args["state_value"] = {} if self.model_args["state_weight"] is None: self.model_args["state_weight"] = {} check_scalar( self.gamma, name="gamma", target_type=float, min_val=0.0, max_val=1.0 ) check_scalar(self.bandwidth, name="bandwidth", target_type=float, min_val=0.0) if self.state_scaler is not None: if not isinstance(self.state_scaler, Scaler): raise ValueError( "state_scaler must be an instance of d3rlpy.preprocessing.Scaler, but found False" ) if self.action_scaler is not None: if not isinstance(self.action_scaler, ActionScaler): raise ValueError( "action_scaler must be an instance of d3rlpy.preprocessing.ActionScaler, but found False" ) if self.device is None: self.device = "cuda:0" if torch.cuda.is_available() else "cpu" def _register_logged_dataset(self, logged_dataset: LoggedDataset): self.fqe = {} self.state_action_dual_function = {} self.state_action_value_function = {} self.state_action_weight_function = {} self.state_dual_function = {} self.state_value_function = {} self.state_weight_function = {} self.initial_state_dict = {} self.logged_dataset = logged_dataset self.behavior_policy_name = logged_dataset["behavior_policy"] self.dataset_id = logged_dataset["dataset_id"] self.action_type = self.logged_dataset["action_type"] self.n_actions = self.logged_dataset["n_actions"] self.action_dim = self.logged_dataset["action_dim"] self.state_dim = self.logged_dataset["state_dim"] self.n_trajectories = self.logged_dataset["n_trajectories"] self.step_per_trajectory = self.logged_dataset["step_per_trajectory"] self.n_samples = self.n_trajectories * self.step_per_trajectory self.state = self.logged_dataset["state"] self.action = self.logged_dataset["action"] self.reward = self.logged_dataset["reward"] self.pscore = self.logged_dataset["pscore"] self.done = self.logged_dataset["done"] self.terminal = self.logged_dataset["terminal"] self.state_2d = self.state.reshape( (-1, self.step_per_trajectory, self.state_dim) ) self.reward_2d = self.reward.reshape((-1, self.step_per_trajectory)) self.pscore_2d = self.pscore.reshape((-1, self.step_per_trajectory)) self.done_2d = self.done.reshape((-1, self.step_per_trajectory)) self.terminal_2d = self.terminal.reshape((-1, self.step_per_trajectory)) if self.action_type == "discrete": self.action_2d = self.action.reshape((-1, self.step_per_trajectory)) else: self.action_2d = self.action.reshape( (-1, self.step_per_trajectory, self.action_dim) ) self.mdp_dataset = MDPDataset( observations=self.state, actions=self.action, rewards=self.reward, terminals=self.done, )
[docs] def build_and_fit_FQE( self, evaluation_policy: BaseHead, k_fold: int = 1, n_steps: int = 10000, ) -> None: """Perform Fitted Q Evaluation (FQE). Parameters ------- evaluation_policy: BaseHead Evaluation policy. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. n_steps: int, default=10000 (> 0) Number of gradient steps. """ if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") check_scalar(n_steps, name="n_steps", target_type=int, min_val=1) if evaluation_policy.name in self.fqe: pass else: self.fqe[evaluation_policy.name] = [] fqe_config = FQEConfig(gamma=self.gamma, **self.model_args["fqe"]) for k in range(k_fold): if self.action_type == "discrete": self.fqe[evaluation_policy.name].append( DiscreteFQE( algo=evaluation_policy, config=fqe_config, device=self.device, ) ) else: self.fqe[evaluation_policy.name].append( ContinuousFQE( algo=evaluation_policy, config=fqe_config, device=self.device, ) ) if k_fold == 1: self.fqe[evaluation_policy.name][0].fit( self.mdp_dataset, n_steps=n_steps, ) else: all_idx = np.arange(self.n_trajectories) idx = np.array( [(self.n_trajectories + k) // k_fold for k in range(k_fold)] ) idx = np.insert(idx, 0, 0) idx = idx.cumsum() for k in tqdm( np.arange(k_fold), desc="[cross-fitting]", total=k_fold, ): idx_ = np.arange(idx[k], idx[k + 1]) subset_idx_ = np.setdiff1d(all_idx, idx_) if self.action_type == "discrete": action_ = self.action_2d[subset_idx_].flatten() else: action_ = self.action_2d[subset_idx_].reshape( (-1, self.action_dim) ) mdp_dataset_ = MDPDataset( observations=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), actions=action_, rewards=self.reward_2d[subset_idx_].flatten(), terminals=self.done_2d[subset_idx_].flatten(), ) self.fqe[evaluation_policy.name][k].fit( mdp_dataset_, n_steps=n_steps, )
[docs] def build_and_fit_state_action_dual_model( self, evaluation_policy: BaseHead, k_fold: int = 1, n_steps: int = 10000, random_state: Optional[int] = None, ) -> None: """Perform Augmented Lagrangian Method (ALM) to estimate the state-action value weight function. Parameters ------- evaluation_policy: BaseHead Evaluation policy. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. n_steps: int, default=10000 (> 0) Number of gradient steps. random_state: int, default=None (>= 0) Random state. """ if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") check_scalar(n_steps, name="n_steps", target_type=int, min_val=1) if evaluation_policy.name in self.state_action_dual_function: pass else: self.state_action_dual_function[evaluation_policy.name] = [] for k in range(k_fold): if self.action_type == "discrete": self.state_action_dual_function[evaluation_policy.name].append( DiscreteDiceStateActionWightValueLearning( q_function=DiscreteQFunction( n_actions=self.n_actions, state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], device=self.device, ), w_function=DiscreteStateActionWeightFunction( n_actions=self.n_actions, state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], enable_gradient_reversal=True, device=self.device, ), state_scaler=self.state_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_action_dual"], ) ) else: self.state_action_dual_function[evaluation_policy.name].append( ContinuousDiceStateActionWightValueLearning( q_function=ContinuousQFunction( action_dim=self.action_dim, state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], ), w_function=ContinuousStateActionWeightFunction( action_dim=self.action_dim, state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], enable_gradient_reversal=True, ), state_scaler=self.state_scaler, action_scaler=self.action_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_action_dual"], ) ) if self.action_type == "discrete": evaluation_policy_action_dist = ( self.obtain_evaluation_policy_action_dist( evaluation_policy=evaluation_policy, ) ) else: evaluation_policy_action = self.obtain_evaluation_policy_action( evaluation_policy=evaluation_policy, ) if k_fold == 1: if self.action_type == "discrete": self.state_action_dual_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, evaluation_policy_action_dist=evaluation_policy_action_dist, n_steps=n_steps, random_state=random_state, ) else: self.state_action_dual_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, evaluation_policy_action=evaluation_policy_action, n_steps=n_steps, random_state=random_state, ) else: if self.action_type == "discrete": evaluation_policy_action_dist = ( evaluation_policy_action_dist.reshape( (-1, self.step_per_trajectory, self.n_actions) ) ) else: evaluation_policy_action = evaluation_policy_action.reshape( (-1, self.step_per_trajectory, self.action_dim) ) all_idx = np.arange(self.n_trajectories) idx = np.array( [(self.n_trajectories + k) // k_fold for k in range(k_fold)] ) idx = np.insert(idx, 0, 0) idx = idx.cumsum() for k in tqdm( np.arange(k_fold), desc="[cross-fitting]", total=k_fold, ): idx_ = np.arange(idx[k], idx[k + 1]) subset_idx_ = np.setdiff1d(all_idx, idx_) if self.action_type == "discrete": action_ = self.action_2d[subset_idx_].flatten() evaluation_policy_action_dist_ = evaluation_policy_action_dist[ subset_idx_ ].reshape((-1, self.n_actions)) else: action_ = self.action_2d[subset_idx_].reshape( (-1, self.action_dim) ) evaluation_policy_action_ = evaluation_policy_action[ subset_idx_ ].reshape((-1, self.action_dim)) if self.action_type == "discrete": self.state_action_dual_function[evaluation_policy.name][k].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), evaluation_policy_action_dist=evaluation_policy_action_dist_, n_steps=n_steps, random_state=random_state, ) else: self.state_action_dual_function[evaluation_policy.name][k].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), evaluation_policy_action=evaluation_policy_action_, n_steps=n_steps, random_state=random_state, )
[docs] def build_and_fit_state_action_value_model( self, evaluation_policy: BaseHead, k_fold: int = 1, n_steps: int = 10000, random_state: Optional[int] = None, ) -> None: """Perform Minimax Q Learning (MQL) to estimate the state-action value function. Parameters ------- evaluation_policy: BaseHead Evaluation policy. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. n_steps: int, default=10000 (> 0) Number of gradient steps. random_state: int, default=None (>= 0) Random state. """ if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") check_scalar(n_steps, name="n_steps", target_type=int, min_val=1) if evaluation_policy.name in self.state_action_value_function: pass else: self.state_action_value_function[evaluation_policy.name] = [] for k in range(k_fold): if self.action_type == "discrete": self.state_action_value_function[evaluation_policy.name].append( DiscreteMinimaxStateActionValueLearning( q_function=DiscreteQFunction( n_actions=self.n_actions, state_dim=self.state_dim, device=self.device, hidden_dim=self.model_args["hidden_dim"], ), state_scaler=self.state_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_action_value"], ) ) else: self.state_action_value_function[evaluation_policy.name].append( ContinuousMinimaxStateActionValueLearning( q_function=ContinuousQFunction( action_dim=self.action_dim, state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], ), state_scaler=self.state_scaler, action_scaler=self.action_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_action_value"], ) ) if self.action_type == "discrete": evaluation_policy_action_dist = ( self.obtain_evaluation_policy_action_dist( evaluation_policy=evaluation_policy, ) ) else: evaluation_policy_action = self.obtain_evaluation_policy_action( evaluation_policy=evaluation_policy, ) if k_fold == 1: if self.action_type == "discrete": self.state_action_value_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, pscore=self.pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, n_steps=n_steps, random_state=random_state, ) else: self.state_action_value_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, pscore=self.pscore, evaluation_policy_action=evaluation_policy_action, n_steps=n_steps, random_state=random_state, ) else: if self.action_type == "discrete": evaluation_policy_action_dist = ( evaluation_policy_action_dist.reshape( (-1, self.step_per_trajectory, self.n_actions) ) ) else: evaluation_policy_action = evaluation_policy_action.reshape( (-1, self.step_per_trajectory, self.action_dim) ) all_idx = np.arange(self.n_trajectories) idx = np.array( [(self.n_trajectories + k) // k_fold for k in range(k_fold)] ) idx = np.insert(idx, 0, 0) idx = idx.cumsum() for k in tqdm( np.arange(k_fold), desc="[cross-fitting]", total=k_fold, ): idx_ = np.arange(idx[k], idx[k + 1]) subset_idx_ = np.setdiff1d(all_idx, idx_) if self.action_type == "discrete": action_ = self.action_2d[subset_idx_].flatten() evaluation_policy_action_dist_ = evaluation_policy_action_dist[ subset_idx_ ].reshape((-1, self.n_actions)) else: action_ = self.action_2d[subset_idx_].reshape( (-1, self.action_dim) ) evaluation_policy_action_ = evaluation_policy_action[ subset_idx_ ].reshape((-1, self.action_dim)) if self.action_type == "discrete": self.state_action_value_function[evaluation_policy.name][k].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), pscore=self.pscore_2d[subset_idx_].flatten(), evaluation_policy_action_dist=evaluation_policy_action_dist_, n_steps=n_steps, random_state=random_state, ) else: self.state_action_value_function[evaluation_policy.name][k].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), pscore=self.pscore_2d[subset_idx_].reshape( (-1, self.action_dim) ), evaluation_policy_action=evaluation_policy_action_, n_steps=n_steps, random_state=random_state, )
[docs] def build_and_fit_state_action_weight_model( self, evaluation_policy: BaseHead, k_fold: int = 1, n_steps: int = 10000, random_state: Optional[int] = None, ) -> None: """Perform Minimax Weight Learning (MWL) to estimate the state-action weight function. Parameters ------- evaluation_policy: BaseHead Evaluation policy. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. n_steps: int, default=10000 (> 0) Number of gradient steps. random_state: int, default=None (>= 0) Random state. """ if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") check_scalar(n_steps, name="n_steps", target_type=int, min_val=1) if evaluation_policy.name in self.state_action_weight_function: pass else: self.state_action_weight_function[evaluation_policy.name] = [] for k in range(k_fold): if self.action_type == "discrete": self.state_action_weight_function[evaluation_policy.name].append( DiscreteMinimaxStateActionWeightLearning( w_function=DiscreteStateActionWeightFunction( n_actions=self.n_actions, state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], enable_gradient_reversal=True, device=self.device, ), state_scaler=self.state_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_action_weight"], ) ) else: self.state_action_weight_function[evaluation_policy.name].append( ContinuousMinimaxStateActionWeightLearning( w_function=ContinuousStateActionWeightFunction( action_dim=self.action_dim, state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], enable_gradient_reversal=True, ), state_scaler=self.state_scaler, action_scaler=self.action_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_action_weight"], ) ) if self.action_type == "discrete": evaluation_policy_action_dist = ( self.obtain_evaluation_policy_action_dist( evaluation_policy=evaluation_policy, ) ) else: evaluation_policy_action = self.obtain_evaluation_policy_action( evaluation_policy=evaluation_policy, ) if k_fold == 1: if self.action_type == "discrete": self.state_action_weight_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, evaluation_policy_action_dist=evaluation_policy_action_dist, n_steps=n_steps, random_state=random_state, ) else: self.state_action_weight_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, evaluation_policy_action=evaluation_policy_action, n_steps=n_steps, random_state=random_state, ) else: if self.action_type == "discrete": evaluation_policy_action_dist = ( evaluation_policy_action_dist.reshape( (-1, self.step_per_trajectory, self.n_actions) ) ) else: evaluation_policy_action = evaluation_policy_action.reshape( (-1, self.step_per_trajectory, self.action_dim) ) all_idx = np.arange(self.n_trajectories) idx = np.array( [(self.n_trajectories + k) // k_fold for k in range(k_fold)] ) idx = np.insert(idx, 0, 0) idx = idx.cumsum() for k in tqdm( np.arange(k_fold), desc="[cross-fitting]", total=k_fold, ): idx_ = np.arange(idx[k], idx[k + 1]) subset_idx_ = np.setdiff1d(all_idx, idx_) if self.action_type == "discrete": action_ = self.action_2d[subset_idx_].flatten() evaluation_policy_action_dist_ = evaluation_policy_action_dist[ subset_idx_ ].reshape((-1, self.n_actions)) else: action_ = self.action_2d[subset_idx_].reshape( (-1, self.action_dim) ) evaluation_policy_action_ = evaluation_policy_action[ subset_idx_ ].reshape((-1, self.action_dim)) if self.action_type == "discrete": self.state_action_weight_function[evaluation_policy.name][ k ].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), evaluation_policy_action_dist=evaluation_policy_action_dist_, n_steps=n_steps, random_state=random_state, ) else: self.state_action_weight_function[evaluation_policy.name][ k ].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), evaluation_policy_action=evaluation_policy_action_, n_steps=n_steps, random_state=random_state, )
[docs] def build_and_fit_state_dual_model( self, evaluation_policy: BaseHead, k_fold: int = 1, n_steps: int = 10000, random_state: Optional[int] = None, ) -> None: """Perform Augmented Lagrangian Method (ALM) to estimate the state value weight function. Parameters ------- evaluation_policy: BaseHead Evaluation policy. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. n_steps: int, default=10000 (> 0) Number of gradient steps. random_state: int, default=None (>= 0) Random state. """ if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") check_scalar(n_steps, name="n_steps", target_type=int, min_val=1) if evaluation_policy.name in self.state_dual_function: pass else: self.state_dual_function[evaluation_policy.name] = [] for k in range(k_fold): if self.action_type == "discrete": self.state_dual_function[evaluation_policy.name].append( DiscreteDiceStateWightValueLearning( v_function=VFunction( state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], ), w_function=StateWeightFunction( state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], enable_gradient_reversal=True, ), state_scaler=self.state_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_dual"], ) ) else: self.state_dual_function[evaluation_policy.name].append( ContinuousDiceStateWightValueLearning( v_function=VFunction( state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], ), w_function=StateWeightFunction( state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], enable_gradient_reversal=True, ), state_scaler=self.state_scaler, action_scaler=self.action_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_dual"], ) ) if self.action_type == "discrete": evaluation_policy_action_dist = ( self.obtain_evaluation_policy_action_dist( evaluation_policy=evaluation_policy, ) ) else: evaluation_policy_action = self.obtain_evaluation_policy_action( evaluation_policy=evaluation_policy, ) if k_fold == 1: if self.action_type == "discrete": self.state_dual_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, pscore=self.pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, n_steps=n_steps, random_state=random_state, ) else: self.state_dual_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, pscore=self.pscore, evaluation_policy_action=evaluation_policy_action, n_steps=n_steps, random_state=random_state, ) else: if self.action_type == "discrete": evaluation_policy_action_dist = ( evaluation_policy_action_dist.reshape( (-1, self.step_per_trajectory, self.n_actions) ) ) else: evaluation_policy_action = evaluation_policy_action.reshape( (-1, self.step_per_trajectory, self.action_dim) ) all_idx = np.arange(self.n_trajectories) idx = np.array( [(self.n_trajectories + k) // k_fold for k in range(k_fold)] ) idx = np.insert(idx, 0, 0) idx = idx.cumsum() for k in tqdm( np.arange(k_fold), desc="[cross-fitting]", total=k_fold, ): idx_ = np.arange(idx[k], idx[k + 1]) subset_idx_ = np.setdiff1d(all_idx, idx_) if self.action_type == "discrete": action_ = self.action_2d[subset_idx_].flatten() evaluation_policy_action_dist_ = evaluation_policy_action_dist[ subset_idx_ ].reshape((-1, self.n_actions)) else: action_ = self.action_2d[subset_idx_].reshape( (-1, self.action_dim) ) evaluation_policy_action_ = evaluation_policy_action[ subset_idx_ ].reshape((-1, self.action_dim)) if self.action_type == "discrete": self.state_dual_function[evaluation_policy.name][k].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), pscore=self.pscore_2d[subset_idx_].flatten(), evaluation_policy_action_dist=evaluation_policy_action_dist_, n_steps=n_steps, random_state=random_state, ) else: self.state_dual_function[evaluation_policy.name][k].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), pscore=self.pscore_2d[subset_idx_].reshape( (-1, self.action_dim) ), evaluation_policy_action=evaluation_policy_action_, n_steps=n_steps, random_state=random_state, )
[docs] def build_and_fit_state_value_model( self, evaluation_policy: BaseHead, k_fold: int = 1, n_steps: int = 10000, random_state: Optional[int] = None, ) -> None: """Perform Minimax V Learning (MVL) to estimate the state value function. Parameters ------- evaluation_policy: BaseHead Evaluation policy. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. n_steps: int, default=10000 (> 0) Number of gradient steps. random_state: int, default=None (>= 0) Random state. """ if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") check_scalar(n_steps, name="n_steps", target_type=int, min_val=1) if evaluation_policy.name in self.state_value_function: pass else: self.state_value_function[evaluation_policy.name] = [] for k in range(k_fold): if self.action_type == "discrete": self.state_value_function[evaluation_policy.name].append( DiscreteMinimaxStateValueLearning( v_function=VFunction( state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], ), state_scaler=self.state_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_value"], ) ) else: self.state_value_function[evaluation_policy.name].append( ContinuousMinimaxStateValueLearning( v_function=VFunction( state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], ), state_scaler=self.state_scaler, action_scaler=self.action_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_value"], ) ) if self.action_type == "discrete": evaluation_policy_action_dist = ( self.obtain_evaluation_policy_action_dist( evaluation_policy=evaluation_policy, ) ) else: evaluation_policy_action = self.obtain_evaluation_policy_action( evaluation_policy=evaluation_policy, ) if k_fold == 1: if self.action_type == "discrete": self.state_value_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, pscore=self.pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, n_steps=n_steps, random_state=random_state, ) else: self.state_value_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, pscore=self.pscore, evaluation_policy_action=evaluation_policy_action, n_steps=n_steps, random_state=random_state, ) else: if self.action_type == "discrete": evaluation_policy_action_dist = ( evaluation_policy_action_dist.reshape( (-1, self.step_per_trajectory, self.n_actions) ) ) else: evaluation_policy_action = evaluation_policy_action.reshape( (-1, self.step_per_trajectory, self.action_dim) ) all_idx = np.arange(self.n_trajectories) idx = np.array( [(self.n_trajectories + k) // k_fold for k in range(k_fold)] ) idx = np.insert(idx, 0, 0) idx = idx.cumsum() for k in tqdm( np.arange(k_fold), desc="[cross-fitting]", total=k_fold, ): idx_ = np.arange(idx[k], idx[k + 1]) subset_idx_ = np.setdiff1d(all_idx, idx_) if self.action_type == "discrete": action_ = self.action_2d[subset_idx_].flatten() evaluation_policy_action_dist_ = evaluation_policy_action_dist[ subset_idx_ ].reshape((-1, self.n_actions)) else: action_ = self.action_2d[subset_idx_].reshape( (-1, self.action_dim) ) evaluation_policy_action_ = evaluation_policy_action[ subset_idx_ ].reshape((-1, self.action_dim)) if self.action_type == "discrete": self.state_value_function[evaluation_policy.name][k].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), pscore=self.pscore_2d[subset_idx_].flatten(), evaluation_policy_action_dist=evaluation_policy_action_dist_, n_steps=n_steps, random_state=random_state, ) else: self.state_value_function[evaluation_policy.name][k].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), pscore=self.pscore_2d[subset_idx_].reshape( (-1, self.action_dim) ), evaluation_policy_action=evaluation_policy_action_, n_steps=n_steps, random_state=random_state, )
[docs] def build_and_fit_state_weight_model( self, evaluation_policy: BaseHead, k_fold: int = 1, n_steps: int = 10000, random_state: Optional[int] = None, ) -> None: """Perform Minimax Weight Learning (MWL) to estimate state weight function. Parameters ------- evaluation_policy: BaseHead Evaluation policy. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. n_steps: int, default=10000 (> 0) Number of gradient steps. random_state: int, default=None (>= 0) Random state. """ if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") check_scalar(n_steps, name="n_steps", target_type=int, min_val=1) if evaluation_policy.name in self.state_weight_function: pass else: self.state_weight_function[evaluation_policy.name] = [] for k in range(k_fold): if self.action_type == "discrete": self.state_weight_function[evaluation_policy.name].append( DiscreteMinimaxStateWeightLearning( w_function=StateWeightFunction( state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], enable_gradient_reversal=True, ), state_scaler=self.state_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_weight"], ) ) else: self.state_weight_function[evaluation_policy.name].append( ContinuousMinimaxStateWeightLearning( w_function=StateWeightFunction( state_dim=self.state_dim, hidden_dim=self.model_args["hidden_dim"], enable_gradient_reversal=True, ), state_scaler=self.state_scaler, action_scaler=self.action_scaler, bandwidth=self.bandwidth, device=self.device, **self.model_args["state_weight"], ) ) if self.action_type == "discrete": evaluation_policy_action_dist = ( self.obtain_evaluation_policy_action_dist( evaluation_policy=evaluation_policy, ) ) else: evaluation_policy_action = self.obtain_evaluation_policy_action( evaluation_policy=evaluation_policy, ) if k_fold == 1: if self.action_type == "discrete": self.state_weight_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, pscore=self.pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, n_steps=n_steps, random_state=random_state, ) else: self.state_weight_function[evaluation_policy.name][0].fit( step_per_trajectory=self.step_per_trajectory, state=self.state, action=self.action, reward=self.reward, pscore=self.pscore, evaluation_policy_action=evaluation_policy_action, n_steps=n_steps, random_state=random_state, ) else: if self.action_type == "discrete": evaluation_policy_action_dist = ( evaluation_policy_action_dist.reshape( (-1, self.step_per_trajectory, self.n_actions) ) ) else: evaluation_policy_action = evaluation_policy_action.reshape( (-1, self.step_per_trajectory, self.action_dim) ) all_idx = np.arange(self.n_trajectories) idx = np.array( [(self.n_trajectories + k) // k_fold for k in range(k_fold)] ) idx = np.insert(idx, 0, 0) idx = idx.cumsum() for k in tqdm( np.arange(k_fold), desc="[cross-fitting]", total=k_fold, ): idx_ = np.arange(idx[k], idx[k + 1]) subset_idx_ = np.setdiff1d(all_idx, idx_) if self.action_type == "discrete": action_ = self.action_2d[subset_idx_].flatten() evaluation_policy_action_dist_ = evaluation_policy_action_dist[ subset_idx_ ].reshape((-1, self.n_actions)) else: action_ = self.action_2d[subset_idx_].reshape( (-1, self.action_dim) ) evaluation_policy_action_ = evaluation_policy_action[ subset_idx_ ].reshape((-1, self.action_dim)) if self.action_type == "discrete": self.state_weight_function[evaluation_policy.name][k].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), pscore=self.pscore_2d[subset_idx_].flatten(), evaluation_policy_action_dist=evaluation_policy_action_dist_, n_steps=n_steps, random_state=random_state, ) else: self.state_weight_function[evaluation_policy.name][k].fit( step_per_trajectory=self.step_per_trajectory, state=self.state_2d[subset_idx_].reshape( (-1, self.state_dim) ), action=action_, reward=self.reward_2d[subset_idx_].flatten(), pscore=self.pscore_2d[subset_idx_].reshape( (-1, self.action_dim) ), evaluation_policy_action=evaluation_policy_action_, n_steps=n_steps, random_state=random_state, )
[docs] def obtain_initial_state( self, evaluation_policy: BaseHead, resample_initial_state: bool = False, minimum_rollout_length: int = 0, maximum_rollout_length: int = 100, random_state: Optional[int] = None, ): """Obtain initial state distribution (stationary distribution) of the evaluation policy. Parameters ------- evaluation_policy: BaseHead Evaluation policy. resample_initial_state: bool, default=False Whether to resample from initial state distribution using the given evaluation policy. If `False`, the initial state distribution of the behavior policy is used instead. minimum_rollout_length: int, default=0 (>= 0) Minimum length of rollout by the behavior policy before generating the logged dataset when working on the infinite horizon setting. This argument is irrelevant when working on the finite horizon setting. maximum_rollout_length: int, default=100 (>= minimum_rollout_length) Maximum length of rollout by the behavior policy before generating the logged dataset when working on the infinite horizon setting. This argument is irrelevant when working on the finite horizon setting. random_state: int, default=None (>= 0) Random state. Return ------- evaluation_policy_initial_state: ndarray of shape (n_trajectories, ) Initial state distribution of the evaluation policy. This is intended to be used in marginal OPE estimators. """ check_scalar( minimum_rollout_length, name=minimum_rollout_length, target_type=int, min_val=0, ) check_scalar( maximum_rollout_length, name=maximum_rollout_length, target_type=int, min_val=0, ) if maximum_rollout_length < minimum_rollout_length: raise ValueError( "maximum_rollout_length must be larger than minimum_rollout_length, but found False." ) if random_state is None: raise ValueError("random_state must be given") random_ = check_random_state(random_state) if not resample_initial_state: initial_state = self.state else: if evaluation_policy.name in self.initial_state_dict: initial_state = self.initial_state_dict[evaluation_policy.name] else: initial_state = np.zeros((self.n_trajectories), self.state_dim) rollout_lengths = random_.choice( np.arange(minimum_rollout_length, maximum_rollout_length), size=self.n_trajectories, ) self.env.reset(random_state) done = True for i in tqdm( np.arange(self.n_trajectories), desc="[obtain_trajectories]", total=self.n_trajectories, ): for rollout_step in rollout_lengths[i]: if done: state, _ = self.env.reset() action = evaluation_policy.sample_action_online(state) state, _, done, _, _ = self.env.step(action) initial_state[i] = state return initial_state
[docs] def obtain_evaluation_policy_action( self, evaluation_policy: BaseHead, state: Optional[np.ndarray] = None, ) -> np.ndarray: """Obtain evaluation policy action. Parameters ------- evaluation_policy: BaseHead Evaluation policy. state: np.ndarray, default=None Sample an action from the evaluation_policy at this state. If None is given, state observed in the logged data will be used. Return ------- evaluation_policy_action: ndarray of shape (n_trajectories * step_per_trajectory, ) Evaluation policy action :math:`a_t \\sim \\pi(a_t \\mid s_t)`. """ if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") if state is not None: check_array(state, name="state", expected_dim=2) if state.shape[1] != self.state_dim: raise ValueError( "Expected state.shape[1] == self.logged_dataset['state_dim'], but found False" ) state = self.state if state is None else state return evaluation_policy.sample_action(state)
[docs] def obtain_evaluation_policy_action_dist( self, evaluation_policy: BaseHead, state: Optional[np.ndarray] = None, ) -> np.ndarray: """Obtain action choice probability of the evaluation policy and its an estimated Q-function of the observed state. Parameters ------- evaluation_policy: BaseHead Evaluation policy. state: np.ndarray, default=None Sample an action from the evaluation_policy at this state.. If None is given, state observed in the logged data will be used. Return ------- evaluation_policy_action_dist: ndarray of shape (n_trajectories * step_per_trajectory, n_actions) Evaluation policy pscore :math:`\\pi(a_t \\mid s_t)`. """ if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") if state is not None: check_array(state, name="state", expected_dim=2) if state.shape[1] != self.state_dim: raise ValueError( "Expected state.shape[1] == self.logged_dataset['state_dim'], but found False" ) state = self.state if state is None else state return evaluation_policy.calc_action_choice_probability(state)
[docs] def obtain_evaluation_policy_action_prob_for_observed_state_action( self, evaluation_policy: BaseHead, ) -> np.ndarray: """Obtain the pscore of an observed state action pair. Parameters ------- evaluation_policy: BaseHead Evaluation policy. Return ------- evaluation_policy_pscore: ndarray of shape (n_trajectories * step_per_trajectory, ) Evaluation policy pscore :math:`\\pi(a_t \\mid s_t)`. """ if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") return evaluation_policy.calc_pscore_given_action(self.state, self.action)
[docs] def obtain_state_action_value_prediction( self, method: str, evaluation_policy: BaseHead, k_fold: int = 1, ) -> np.ndarray: """Obtain an estimated Q-function of the observed state and all actions (discrete) or that of the actions chosen by behavior and (deterministic) evaluation policies (continuous). Parameters ------- method: {"fqe", "dice_q", "mql"} Estimation method. evaluation_policy: BaseHead Evaluation policy. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. Return ------- state_action_value_prediction: ndarray of shape (n_trajectories * step_per_trajectory, n_actions) or (n_trajectories * step_per_trajectory, 2) If action_type is "discrete", output is state action value for observed state and all actions, i.e., :math:`\\hat{Q}(s, a) \\forall a \\in \\mathcal{A}`. If action_type is "continuous", output is state action value for the observed action and that chosen by the evaluation policy, i.e., (row 0) :math:`\\hat{Q}(s_t, a_t)` and (row 2) :math:`\\hat{Q}(s_t, \\pi(s_t))`. """ if method not in ["fqe", "dice_q", "mql"]: raise ValueError( f"method must be either 'fqe', 'dice_q', or 'mql', but {method} is given" ) if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") idx = np.array([(self.n_trajectories + k) // k_fold for k in range(k_fold)]) idx = np.insert(idx, 0, 0) idx = idx.cumsum() if self.action_type == "discrete": state_action_value_prediction = np.zeros( (self.n_trajectories, self.step_per_trajectory, self.n_actions) ) for k in range(k_fold): idx_ = np.arange(idx[k], idx[k + 1]) x = self.state_2d[idx_].reshape((-1, self.state_dim)) x_ = [] for i in range(x.shape[0]): x_.append(np.tile(x[i], (self.n_actions, 1))) x_ = np.array(x_).reshape((-1, self.state_dim)) a_ = np.tile(np.arange(self.n_actions), x.shape[0]) if method == "fqe": state_action_value_prediction_ = self.fqe[evaluation_policy.name][ k ].predict_value(x_, a_) elif method == "dice_q": state_action_value_prediction_ = self.state_action_dual_function[ evaluation_policy.name ][k].predict_value(x_, a_) elif method == "mql": state_action_value_prediction_ = self.state_action_value_function[ evaluation_policy.name ][k].predict_value(x_, a_) state_action_value_prediction[ idx_ ] = state_action_value_prediction_.reshape( (-1, self.step_per_trajectory, self.n_actions) ) state_action_value_prediction = state_action_value_prediction.reshape( (-1, self.n_actions) ) else: state_action_value_prediction = np.zeros( (self.n_trajectories, self.step_per_trajectory, 2) ) evaluation_policy_action = self.obtain_evaluation_policy_action( evaluation_policy ).reshape((-1, self.step_per_trajectory, self.action_dim)) for k in range(k_fold): idx_ = np.arange(idx[k], idx[k + 1]) state_ = self.state_2d[idx_].reshape((-1, self.state_dim)) action_ = self.action_2d[idx_].reshape((-1, self.action_dim)) evaluation_policy_action_ = evaluation_policy_action[idx_].reshape( (-1, self.action_dim) ) if method == "fqe": state_action_value_prediction_behavior_ = self.fqe[ evaluation_policy.name ][k].predict_value(state_, action_) state_action_value_prediction_eval_ = self.fqe[ evaluation_policy.name ][k].predict_value(state_, evaluation_policy_action_) elif method == "dice_q": state_action_value_prediction_behavior_ = ( self.state_action_dual_function[evaluation_policy.name][ k ].predict_value(state_, action_) ) state_action_value_prediction_eval_ = ( self.state_action_dual_function[evaluation_policy.name][ k ].predict_value(state_, evaluation_policy_action_) ) elif method == "mql": state_action_value_prediction_behavior_ = ( self.state_action_value_function[evaluation_policy.name][ k ].predict_value(state_, action_) ) state_action_value_prediction_eval_ = ( self.state_action_value_function[evaluation_policy.name][ k ].predict_value(state_, evaluation_policy_action_) ) state_action_value_prediction[ idx_, :, 0 ] = state_action_value_prediction_behavior_.reshape( (-1, self.step_per_trajectory) ) state_action_value_prediction[ idx_, :, 1 ] = state_action_value_prediction_eval_.reshape( (-1, self.step_per_trajectory) ) state_action_value_prediction = state_action_value_prediction.reshape( (-1, 2) ) return state_action_value_prediction
[docs] def obtain_initial_state_value_prediction( self, method: str, evaluation_policy: BaseHead, k_fold: int = 1, resample_initial_state: bool = False, minimum_rollout_length: int = 0, maximum_rollout_length: int = 100, random_state: Optional[int] = None, ) -> np.ndarray: """Obtain the initial state value of the evaluation policy in the case of discrete action spaces. Parameters ------- method: {"fqe", "dice_q", "dice_v", "mql", "mvl"} Estimation method. evaluation_policy: BaseHead Evaluation policy. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. resample_initial_state: bool, default=False Whether to resample initial state distribution using the given evaluation policy. If `False`, the initial state distribution of the behavior policy is used instead. minimum_rollout_length: int, default=0 (>= 0) Minimum length of rollout by the behavior policy before generating the logged dataset when working on the infinite horizon setting. This argument is irrelevant when working on the finite horizon setting. maximum_rollout_length: int, default=100 (>= minimum_rollout_length) Maximum length of rollout by the behavior policy before generating the logged dataset when working on the infinite horizon setting. This argument is irrelevant when working on the finite horizon setting. random_state: int, default=None (>= 0) Random state. Return ------- initial_state_value_prediction: ndarray of shape (n_trajectories, ) State action value of the observed initial state. """ if method not in ["fqe", "dice_q", "dice_v", "mql", "mvl"]: raise ValueError( f"method must be either 'fqe', 'dice_q', 'dice_v', 'mql', or 'mvl', but {method} is given" ) if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") if resample_initial_state: initial_state = self.obtain_initial_state( evaluation_policy=evaluation_policy, resample_initial_state=resample_initial_state, minimum_rollout_length=minimum_rollout_length, maximum_rollout_length=maximum_rollout_length, random_state=random_state, ) if method in ["fqe", "dice_q", "mql"]: if self.action_type == "discrete": x_ = [] for i in range(self.n_trajectories): x_.append(np.tile(initial_state[i], (self.n_actions, 1))) x_ = np.array(x_).reshape((-1, self.state_dim)) a_ = np.tile(np.arange(self.n_actions), self.n_trajectories) else: x_ = initial_state a_ = self.obtain_evaluation_policy_action( evaluation_policy=evaluation_policy, state=initial_state, ) if method == "fqe": state_action_value = self.fqe[evaluation_policy.name][ 0 ].predict_value(x_, a_) elif method == "dice_q": state_action_value = self.state_action_dual_function[ evaluation_policy.name ][0].predict_value(x_, a_) elif method == "mql": state_action_value = self.state_action_value_function[ evaluation_policy.name ][0].predict_value(x_, a_) if self.action_type == "discrete": evaluation_policy_action_dist = ( self.obtain_evaluation_policy_action_dist( evaluation_policy=evaluation_policy, state=initial_state, ) ) initial_state_value = ( state_action_value * evaluation_policy_action_dist ).sum(axis=1) else: initial_state_value = state_action_value else: initial_state_value = self.state_value_function[evaluation_policy.name][ 0 ].predict(state=initial_state) else: if method in ["fqe", "dice_q", "mql"]: state_action_value = self.obtain_state_action_value_prediction( method=method, evaluation_policy=evaluation_policy, k_fold=k_fold, ) if self.action_type == "discrete": action_dist = self.obtain_evaluation_policy_action_dist( evaluation_policy ) state_value = np.sum(state_action_value * action_dist, axis=1) else: state_value = self.obtain_state_action_value_prediction( method=method, evaluation_policy=evaluation_policy, k_fold=k_fold, )[:, 1] state_value = state_value.reshape((-1, self.step_per_trajectory)) else: idx = np.array( [(self.n_trajectories + k) // k_fold for k in range(k_fold)] ) idx = np.insert(idx, 0, 0) idx = idx.cumsum() state_value = np.zeros((self.n_trajectories, self.step_per_trajectory)) for k in range(k_fold): idx_ = np.arange(idx[k], idx[k + 1]) state_ = self.state_2d[idx_].reshape((-1, self.state_dim)) state_value_ = self.state_value_function[evaluation_policy.name][ k ].predict(state_) state_value[idx_] = state_value_.reshape( (-1, self.step_per_trajectory) ) initial_state_value = state_value[:, 0] return initial_state_value # (n_trajectories, )
[docs] def obtain_state_action_marginal_importance_weight( self, method: str, evaluation_policy: BaseHead, k_fold: int = 1, ) -> np.ndarray: """Predict state-action marginal importance weight. Parameters ------- method: {"dice", "mwl"} Estimation method. evaluation_policy: BaseHead Evaluation policy. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. Return ------- state_action_weight_prediction: ndarray of shape (n_trajectories * step_per_trajectory, ) State-action marginal importance weight for the observed state and action. """ if method not in ["dice", "mwl"]: raise ValueError( f"method must be either 'dice' or 'mwl', but {method} is given" ) if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") idx = np.array([(self.n_trajectories + k) // k_fold for k in range(k_fold)]) idx = np.insert(idx, 0, 0) idx = idx.cumsum() state_action_weight_prediction = np.zeros( (self.n_trajectories, self.step_per_trajectory) ) for k in range(k_fold): idx_ = np.arange(idx[k], idx[k + 1]) state_ = self.state_2d[idx_].reshape((-1, self.state_dim)) if self.action_type == "discrete": action_ = self.action_2d[idx_].flatten() else: action_ = self.action_2d[idx_].reshape((-1, self.action_dim)) if method == "dice": state_action_weight_prediction_ = self.state_action_dual_function[ evaluation_policy.name ][k].predict_weight(state_, action_) elif method == "mwl": state_action_weight_prediction_ = self.state_action_weight_function[ evaluation_policy.name ][k].predict_weight(state_, action_) state_action_weight_prediction[ idx_ ] = state_action_weight_prediction_.reshape((-1, self.step_per_trajectory)) return state_action_weight_prediction.flatten()
[docs] def obtain_state_marginal_importance_weight( self, method: str, evaluation_policy: BaseHead, k_fold: int = 1, ) -> np.ndarray: """Predict state marginal importance weight. Parameters ------- method: {"dice", "mwl"} Estimation method. evaluation_policy: BaseHead Evaluation policy. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. Return ------- state_weight_prediction: ndarray of shape (n_trajectories * step_per_trajectory, ) State marginal importance weight for observed state. """ if method not in ["dice", "mwl"]: raise ValueError( f"method must be either 'dice' or 'mwl', but {method} is given" ) if not isinstance(evaluation_policy, BaseHead): raise ValueError("evaluation_policy must be a child class of BaseHead") idx = np.array([(self.n_trajectories + k) // k_fold for k in range(k_fold)]) idx = np.insert(idx, 0, 0) idx = idx.cumsum() state_weight_prediction = np.zeros( (self.n_trajectories, self.step_per_trajectory) ) for k in range(k_fold): idx_ = np.arange(idx[k], idx[k + 1]) state_ = self.state_2d[idx_].reshape((-1, self.state_dim)) if method == "dice": state_weight_prediction_ = self.state_dual_function[ evaluation_policy.name ][k].predict_weight(state_) elif method == "mwl": state_weight_prediction_ = self.state_weight_function[ evaluation_policy.name ][k].predict_weight(state_) state_weight_prediction[idx_] = state_weight_prediction_.reshape( (-1, self.step_per_trajectory) ) return state_weight_prediction.flatten()
def _obtain_whole_inputs( self, logged_dataset: LoggedDataset, evaluation_policies: List[BaseHead], require_value_prediction: bool = False, require_weight_prediction: bool = False, resample_initial_state: bool = False, q_function_method: str = "fqe", v_function_method: str = "fqe", w_function_method: str = "dice", k_fold: int = 1, n_steps: int = 10000, n_trajectories_on_policy_evaluation: int = 100, use_stationary_distribution_on_policy_evaluation: bool = False, minimum_rollout_length: int = 0, maximum_rollout_length: int = 100, random_state: Optional[int] = None, ) -> OPEInputDict: """Obtain input as a dictionary. Parameters ------- logged_dataset: LoggedDataset or MultipleLoggedDataset Logged dataset used to conduct OPE. .. code-block:: python key: [ size, n_trajectories, step_per_trajectory, action_type, n_actions, action_dim, action_keys, action_meaning, state_dim, state_keys, state, action, reward, done, terminal, info, pscore, behavior_policy, dataset_id, ] .. seealso:: :class:`scope_rl.dataset.SyntheticDataset` describes the components of :class:`logged_dataset`. evaluation_policies: list of BaseHead Evaluation policies. require_value_prediction: bool, default=False Whether to obtain an estimated value function. require_weight_prediction: bool, default=False Whether to obtain an estimated weight function. resample_initial_state: bool, default=False Whether to resample initial state distribution using the given evaluation policy. If `False`, the initial state distribution of the behavior policy is used instead. Note that this parameter is applicable only when self.env is given. q_function_method: {"fqe", "dice_q", "mql"}, default="fqe" Method to estimate :math:`Q(s, a)`. v_function_method: {"fqe", "dice_q", "dice_v", "mql", "mvl"}, default="fqe" Method to estimate :math:`V(s)`. w_function_method: {"dice", "mwl"}, default="dice" Method to estimate :math:`w(s, a)` and :math:`w(s)`. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. If :math:`K>1`, we split the logged dataset into :math:`K` folds. :math:`\\mathcal{D}_j` is the :math:`j`-th split of logged data consisting of :math:`n_k` samples. Then, the value and weight functions (:math:`w^j` and :math:`Q^j`) are trained on the subset of data used for OPE, i.e., :math:`\\mathcal{D} \\setminus \\mathcal{D}_j`. If :math:`K=1`, the value and weight functions are trained on the entire data. n_steps: int, default=10000 (> 0) Number of gradient steps to train weight and value learning models. n_trajectories_on_policy_evaluation: int, default=None (> 0) Number of episodes to perform on-policy evaluation. use_stationary_distribution_on_policy_evaluation: bool, default=False Whether to evaluate a policy based on stationary distribution. If `True`, evaluation policy is evaluated by rollout without resetting environment at each episode. minimum_rollout_length: int, default=0 (>= 0) Minimum length of rollout to collect initial state. maximum_rollout_length: int, default=100 (>= minimum_rollout_length) Maximum length of rollout to collect initial state. random_state: int, default=None (>= 0) Random state. Return ------- input_dict: OPEInputDict Dictionary of the OPE inputs for each evaluation policy. .. code-block:: python key: [evaluation_policy_name][ evaluation_policy_action, evaluation_policy_action_dist, state_action_value_prediction, initial_state_value_prediction, state_action_marginal_importance_weight, state_marginal_importance_weight, on_policy_policy_value, gamma, behavior_policy, evaluation_policy, dataset_id, ] evaluation_policy_action: ndarray of shape (n_trajectories * step_per_trajectories, action_dim) Action chosen by the deterministic evaluation policy. If action_type is "discrete", `None` is recorded. evaluation_policy_action_dist: ndarray of shape (n_trajectories * step_per_trajectory, n_actions) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}`. If action_type is "continuous", `None` is recorded. state_action_value_prediction: ndarray If action_type is "discrete", :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. shape (n_trajectories * step_per_trajectory, n_actions) If action_type is "continuous", :math:`\\hat{Q}` for the observed action and that chosen by the evaluation policy, i.e., (row 0) :math:`\\hat{Q}(s_t, a_t)` and (row 2) :math:`\\hat{Q}(s_t, \\pi(a \\mid s_t))`. shape (n_trajectories * step_per_trajectory, 2) If require_value_prediction is `False`, `None` is recorded. initial_state_value_prediction: ndarray of shape (n_trajectories, ) Estimated initial state value. If use_base_model is `False`, `None` is recorded. state_action_marginal_importance_weight: ndarray of shape (n_trajectories * step_per_trajectory, ) Estimated state-action marginal importance weight, i.e., :math:`\\hat{w}(s_t, a_t) \\approx d^{\\pi}(s_t, a_t) / d^{\\pi_b}(s_t, a_t)`. If require_weight_prediction is `False`, `None` is recorded. state_marginal_importance_weight: ndarray of shape (n_trajectories * step_per_trajectory, ) Estimated state marginal importance weight, i.e., :math:`\\hat{w}(s_t) \\approx d^{\\pi}(s_t) / d^{\\pi_b}(s_t)`. If require_weight_prediction is `False`, `None` is recorded. on_policy_policy_value: ndarray of shape (n_trajectories_on_policy_evaluation, ) On-policy policy value. If self.env is `None`, `None` is recorded. gamma: float Discount factor. behavior_policy: str Name of the behavior policy. evaluation_policy: str Name of the evaluation policy. dataset_id: int Id of the logged dataset. """ check_logged_dataset(logged_dataset) self._register_logged_dataset(logged_dataset) if self.env is not None: if isinstance(self.env.action_space, Discrete): if logged_dataset["action_type"] != "discrete": raise RuntimeError( f"Detected mismatch between action_type of logged_dataset and env.action_space." ) elif logged_dataset["n_actions"] != self.env.action_space.n: raise RuntimeError( f"Detected mismatch between n_actions of logged_dataset and env.action_space.n." ) else: if logged_dataset["action_type"] != "continuous": raise RuntimeError( f"Detected mismatch between action_type of logged_dataset and env.action_space." ) elif logged_dataset["action_dim"] != self.env.action_space.shape[0]: raise RuntimeError( f"Detected mismatch between action_dim of logged_dataset and env.action_space.shape[0]." ) for eval_policy in evaluation_policies: if eval_policy.action_type != self.action_type: raise RuntimeError( f"One of the evaluation_policies, {eval_policy.name} does not match action_type in logged_dataset." " Please use {self.action_type} action_type instead." ) if require_value_prediction: if q_function_method == "fqe" or v_function_method == "fqe": for i in tqdm( range(len(evaluation_policies)), desc="[fit FQE model]", total=len(evaluation_policies), ): self.build_and_fit_FQE( evaluation_policies[i], k_fold=k_fold, n_steps=n_steps ) if q_function_method == "dice_q" or v_function_method == "dice_q": for i in tqdm( range(len(evaluation_policies)), desc="[fit Augmented Lagrangian model]", total=len(evaluation_policies), ): self.build_and_fit_state_action_dual_model( evaluation_policies[i], k_fold=k_fold, n_steps=n_steps, random_state=random_state, ) if v_function_method == "dice_v": for i in tqdm( range(len(evaluation_policies)), desc="[fit Augmented Lagrangian model]", total=len(evaluation_policies), ): self.build_and_fit_state_dual_model( evaluation_policies[i], k_fold=k_fold, n_steps=n_steps, random_state=random_state, ) if q_function_method == "mql" or v_function_method == "mql": for i in tqdm( range(len(evaluation_policies)), desc="[fit MQL model]", total=len(evaluation_policies), ): self.build_and_fit_state_action_value_model( evaluation_policies[i], k_fold=k_fold, n_steps=n_steps, random_state=random_state, ) if v_function_method == "mvl": for i in tqdm( range(len(evaluation_policies)), desc="[fit MVL model]", total=len(evaluation_policies), ): self.build_and_fit_state_value_model( evaluation_policies[i], k_fold=k_fold, n_steps=n_steps, random_state=random_state, ) if require_weight_prediction: if w_function_method == "dice": for i in tqdm( range(len(evaluation_policies)), desc="[fit Augmented Lagrangian model]", total=len(evaluation_policies), ): self.build_and_fit_state_action_dual_model( evaluation_policies[i], k_fold=k_fold, n_steps=n_steps, random_state=random_state, ) self.build_and_fit_state_dual_model( evaluation_policies[i], k_fold=k_fold, n_steps=n_steps, random_state=random_state, ) elif w_function_method == "mwl": for i in tqdm( range(len(evaluation_policies)), desc="[fit MWL model]", total=len(evaluation_policies), ): self.build_and_fit_state_action_weight_model( evaluation_policies[i], k_fold=k_fold, n_steps=n_steps, random_state=random_state, ) self.build_and_fit_state_weight_model( evaluation_policies[i], k_fold=k_fold, n_steps=n_steps, random_state=random_state, ) input_dict = defaultdict(dict) for i in tqdm( range(len(evaluation_policies)), desc="[collect input data: eval_policy]", total=len(evaluation_policies), ): # input for IPW, DR if self.action_type == "discrete": input_dict[evaluation_policies[i].name][ "evaluation_policy_action_dist" ] = self.obtain_evaluation_policy_action_dist(evaluation_policies[i]) input_dict[evaluation_policies[i].name][ "evaluation_policy_action" ] = None else: input_dict[evaluation_policies[i].name][ "evaluation_policy_action_dist" ] = None input_dict[evaluation_policies[i].name][ "evaluation_policy_action" ] = self.obtain_evaluation_policy_action(evaluation_policies[i]) # input for DM, DR if require_value_prediction: input_dict[evaluation_policies[i].name][ "state_action_value_prediction" ] = self.obtain_state_action_value_prediction( method=q_function_method, evaluation_policy=evaluation_policies[i], k_fold=k_fold, ) input_dict[evaluation_policies[i].name][ "initial_state_value_prediction" ] = self.obtain_initial_state_value_prediction( method=v_function_method, evaluation_policy=evaluation_policies[i], k_fold=k_fold, resample_initial_state=resample_initial_state, minimum_rollout_length=minimum_rollout_length, maximum_rollout_length=maximum_rollout_length, random_state=random_state, ) else: input_dict[evaluation_policies[i].name][ "state_action_value_prediction" ] = None input_dict[evaluation_policies[i].name][ "initial_state_value_prediction" ] = None # input for marginal OPE if require_weight_prediction: input_dict[evaluation_policies[i].name][ "state_action_marginal_importance_weight" ] = self.obtain_state_action_marginal_importance_weight( method=w_function_method, evaluation_policy=evaluation_policies[i], k_fold=k_fold, ) input_dict[evaluation_policies[i].name][ "state_marginal_importance_weight" ] = self.obtain_state_marginal_importance_weight( method=w_function_method, evaluation_policy=evaluation_policies[i], k_fold=k_fold, ) else: input_dict[evaluation_policies[i].name][ "state_action_marginal_importance_weight" ] = None input_dict[evaluation_policies[i].name][ "state_marginal_importance_weight" ] = None # input for the evaluation of OPE estimators if self.env is not None: if n_trajectories_on_policy_evaluation is None: n_trajectories_on_policy_evaluation = self.n_trajectories input_dict[evaluation_policies[i].name][ "on_policy_policy_value" ] = rollout_policy_online( self.env, evaluation_policies[i], n_trajectories=n_trajectories_on_policy_evaluation, evaluate_on_stationary_distribution=use_stationary_distribution_on_policy_evaluation, step_per_trajectory=self.step_per_trajectory, gamma=self.gamma, random_state=random_state, ) else: input_dict[evaluation_policies[i].name]["on_policy_policy_value"] = None input_dict[evaluation_policies[i].name]["gamma"] = self.gamma input_dict[evaluation_policies[i].name][ "behavior_policy" ] = self.behavior_policy_name input_dict[evaluation_policies[i].name][ "evaluation_policy" ] = evaluation_policies[i].name input_dict[evaluation_policies[i].name]["dataset_id"] = self.dataset_id return defaultdict_to_dict(input_dict)
[docs] def obtain_whole_inputs( self, logged_dataset: Union[LoggedDataset, MultipleLoggedDataset], evaluation_policies: List[BaseHead], behavior_policy_name: Optional[str] = None, dataset_id: Optional[int] = None, require_value_prediction: bool = False, require_weight_prediction: bool = False, resample_initial_state: bool = False, q_function_method: str = "fqe", v_function_method: str = "fqe", w_function_method: str = "dice", k_fold: int = 1, n_steps: int = 10000, n_trajectories_on_policy_evaluation: int = 100, use_stationary_distribution_on_policy_evaluation: bool = False, minimum_rollout_length: int = 0, maximum_rollout_length: int = 100, random_state: Optional[int] = None, path: str = "input_dict/", save_relative_path: bool = False, ) -> OPEInputDict: """Obtain input as a dictionary. Parameters ------- logged_dataset: LoggedDataset or MultipleLoggedDataset Logged dataset containing the following. .. code-block:: python key: [ size, n_trajectories, step_per_trajectory, action_type, n_actions, action_dim, action_keys, action_meaning, state_dim, state_keys, state, action, reward, done, terminal, info, pscore, behavior_policy, dataset_id, ] .. seealso:: :class:`scope_rl.dataset.SyntheticDataset` describes the components of :class:`logged_dataset`. evaluation_policies: list of BaseHead or BaseHead Evaluation policies. .. tip:: 1. When using LoggedDataset, evaluation_policies should be ``List[BaseHead]`` (``[BaseHead, BaseHead, ..]``). 2. When using MultipleLoggedDataset and apply the same evaluation policies across behavior_policies and dataset_ids, evaluation_policies should be ``List[BaseHead]``. 3. When using MultipleLoggedDataset and apply the same evaluation policies across dataset_ids but different evaluation_policies across behavior policies, evaluation_policies should be ``Dict[str, List[BaseHead]]``. (key: ``[behavior_policy_name]``). 4. When using MultipleLoggedDataset and apply different evaluation policies across dataset_ids and behavior policies, evaluation_policies should be ``Dict[str, List[BaseHead]]``. (key: ``[behavior_policy_name][dataset_id]``) behavior_policy_name: str, default=None Name of the behavior policy. dataset_id: int, default=None Id of the logged dataset. require_value_prediction: bool, default=False Whether to obtain an estimated value function. require_weight_prediction: bool, default=False Whether to obtain an estimated weight function. resample_initial_state: bool, default=False Whether to resample initial state distribution using the given evaluation policy. If `False`, the initial state distribution of the behavior policy is used instead. Note that this parameter is applicable only when self.env is given. q_function_method: {"fqe", "dice_q", "mql"}, default="fqe" Method to estimate :math:`Q(s, a)`. v_function_method: {"fqe", "dice_q", "dice_v", "mql", "mvl"}, default="fqe" Method to estimate :math:`V(s)`. w_function_method: {"dice", "mwl"}, default="dice" Method to estimate :math:`w(s, a)` and :math:`w(s)`. k_fold: int, default=1 (> 0) Number of folds to perform cross-fitting. If :math:`K>1`, we split the logged dataset into :math:`K` folds. :math:`\\mathcal{D}_j` is the :math:`j`-th split of logged data consisting of :math:`n_k` samples. Then, the value and weight functions (:math:`w^j` and :math:`Q^j`) are trained on the subset of data used for OPE, i.e., :math:`\\mathcal{D} \\setminus \\mathcal{D}_j`. If :math:`K=1`, the value and weight functions are trained on the entire data. n_steps: int, default=10000 (> 0) Number of gradient steps to fit weight and value learning methods. n_trajectories_on_policy_evaluation: int, default=None (> 0) Number of episodes to perform on-policy evaluation. use_stationary_distribution_on_policy_evaluation: bool, default=False Whether to evaluate a policy based on stationary distribution. If `True`, evaluation policy is evaluated by rollout without resetting environment at each episode. minimum_rollout_length: int, default=0 (>= 0) Minimum length of rollout to collect initial state. maximum_rollout_length: int, default=100 (>= minimum_rollout_length) Maximum length of rollout to collect initial state. random_state: int, default=None (>= 0) Random state. path: str Path to the directory. Either absolute or relative path is acceptable. save_relative_path: bool, default=False. Whether to save a relative path. If `True`, a path relative to the scope-rl directory will be saved. If `False`, the absolute path will be saved. Note that this option was added in order to run examples in the documentation properly. Otherwise, the default setting (`False`) is recommended. Return ------- input_dicts: OPEInputDict or MultipleInputDict MultipleInputDict is an instance containing (multiple) input dictionary for OPE. Each input dict is accessible by the following command. .. code-block:: python input_dict_ = input_dict.get(behavior_policy_name=behavior_policy.name, dataset_id=0) Each input dict consists of the following. .. code-block:: python key: [evaluation_policy_name][ evaluation_policy_action, evaluation_policy_action_dist, state_action_value_prediction, initial_state_value_prediction, state_action_marginal_importance_weight, state_marginal_importance_weight, on_policy_policy_value, gamma, behavior_policy, evaluation_policy, dataset_id, ] evaluation_policy_action: ndarray of shape (n_trajectories * step_per_trajectories, action_dim) Action chosen by the deterministic evaluation policy. If action_type is "discrete", `None` is recorded. evaluation_policy_action_dist: ndarray of shape (n_trajectories * step_per_trajectory, n_actions) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}`. If action_type is "continuous", `None` is recorded. state_action_value_prediction: ndarray If action_type is "discrete", :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. shape (n_trajectories * step_per_trajectory, n_actions) If action_type is "continuous", :math:`\\hat{Q}` for the observed action and that chosen by the evaluation policy, i.e., (row 0) :math:`\\hat{Q}(s_t, a_t)` and (row 2) :math:`\\hat{Q}(s_t, \\pi(a \\mid s_t))`. shape (n_trajectories * step_per_trajectory, 2) If require_value_prediction is `False`, `None` is recorded. initial_state_value_prediction: ndarray of shape (n_trajectories, ) Estimated initial state value. If use_base_model is `False`, `None` is recorded. state_action_marginal_importance_weight: ndarray of shape (n_trajectories * step_per_trajectory, ) Estimated state-action marginal importance weight, i.e., :math:`\\hat{w}(s_t, a_t) \\approx d^{\\pi}(s_t, a_t) / d^{\\pi_b}(s_t, a_t)`. If require_weight_prediction is `False`, `None` is recorded. state_marginal_importance_weight: ndarray of shape (n_trajectories * step_per_trajectory, ) Estimated state marginal importance weight, i.e., :math:`\\hat{w}(s_t) \\approx d^{\\pi}(s_t) / d^{\\pi_b}(s_t)`. If require_weight_prediction is `False`, `None` is recorded. on_policy_policy_value: ndarray of shape (n_trajectories_on_policy_evaluation, ) On-policy policy value. If self.env is `None`, `None` is recorded. gamma: float Discount factor. behavior_policy: str Name of the behavior policy. evaluation_policy: str Name of the evaluation policy. dataset_id: int Id of the logged dataset. """ apply_different_eval_policies_across_all_datasets = False apply_different_eval_policies_across_behavior_policies = False if isinstance(evaluation_policies, dict): if isinstance(list(evaluation_policies.values())[0], BaseHead): apply_different_eval_policies_across_behavior_policies = True else: apply_different_eval_policies_across_all_datasets = True evaluation_policies_ = deepcopy(evaluation_policies) if isinstance(logged_dataset, MultipleLoggedDataset): if behavior_policy_name is None and dataset_id is None: evaluation_policies = defaultdict(list) for behavior_policy, n_datasets in logged_dataset.n_datasets.items(): for dataset_id_ in range(n_datasets): if apply_different_eval_policies_across_all_datasets: evaluation_policies[behavior_policy].append( evaluation_policies_[behavior_policy][dataset_id_] ) elif apply_different_eval_policies_across_behavior_policies: evaluation_policies[behavior_policy].append( evaluation_policies_[behavior_policy] ) else: evaluation_policies[behavior_policy].append( evaluation_policies_ ) elif behavior_policy_name is None and dataset_id is not None: evaluation_policies = {} for behavior_policy, n_datasets in logged_dataset.n_datasets.items(): if apply_different_eval_policies_across_all_datasets: evaluation_policies[behavior_policy] = evaluation_policies_[ behavior_policy ][dataset_id] elif apply_different_eval_policies_across_behavior_policies: evaluation_policies[behavior_policy] = evaluation_policies_[ behavior_policy ] else: evaluation_policies[behavior_policy] = evaluation_policies_ elif behavior_policy_name is not None and dataset_id is None: evaluation_policies = [] for dataset_id_ in range( logged_dataset.n_datasets[behavior_policy_name] ): if apply_different_eval_policies_across_all_datasets: evaluation_policies.append( evaluation_policies_[behavior_policy_name][dataset_id_] ) elif apply_different_eval_policies_across_behavior_policies: evaluation_policies[behavior_policy].append( evaluation_policies_[behavior_policy_name] ) else: evaluation_policies[behavior_policy].append( evaluation_policies_ ) else: if apply_different_eval_policies_across_all_datasets: evaluation_policies = evaluation_policies_[behavior_policy_name][ dataset_id ] elif apply_different_eval_policies_across_behavior_policies: evaluation_policies = evaluation_policies_[behavior_policy_name] else: evaluation_policies = evaluation_policies_ else: behavior_policy_name = logged_dataset["behavior_policy"] dataset_id = logged_dataset["dataset_id"] if apply_different_eval_policies_across_all_datasets: evaluation_policies = evaluation_policies_[behavior_policy_name][ dataset_id ] elif apply_different_eval_policies_across_behavior_policies: evaluation_policies = evaluation_policies_[behavior_policy_name] else: evaluation_policies = evaluation_policies_ if isinstance(logged_dataset, MultipleLoggedDataset): input_dict = MultipleInputDict( action_type=logged_dataset.action_type, path=path, save_relative_path=save_relative_path, ) if behavior_policy_name is None and dataset_id is None: behavior_policies = logged_dataset.behavior_policy_names for i in tqdm( np.arange(len(behavior_policies)), desc="[collect input data: behavior_policy]", total=len(behavior_policies), ): n_datasets = logged_dataset.n_datasets[behavior_policies[i]] for dataset_id_ in tqdm( np.arange(n_datasets), desc="[collect input data: dataset_id]", total=n_datasets, ): logged_dataset_ = logged_dataset.get( behavior_policy_name=behavior_policies[i], dataset_id=dataset_id_, ) input_dict_ = self._obtain_whole_inputs( logged_dataset=logged_dataset_, evaluation_policies=evaluation_policies[ behavior_policies[i] ][dataset_id_], require_value_prediction=require_value_prediction, require_weight_prediction=require_weight_prediction, resample_initial_state=resample_initial_state, q_function_method=q_function_method, v_function_method=v_function_method, w_function_method=w_function_method, k_fold=k_fold, n_steps=n_steps, n_trajectories_on_policy_evaluation=n_trajectories_on_policy_evaluation, use_stationary_distribution_on_policy_evaluation=use_stationary_distribution_on_policy_evaluation, minimum_rollout_length=minimum_rollout_length, maximum_rollout_length=maximum_rollout_length, random_state=random_state, ) input_dict.add( input_dict_, behavior_policy_name=behavior_policies[i], dataset_id=dataset_id_, ) elif behavior_policy_name is None and dataset_id is not None: behavior_policies = logged_dataset.behavior_policy_names for i in tqdm( np.arange(len(behavior_policies)), desc="[collect input data: behavior_policy]", total=len(behavior_policies), ): logged_dataset_ = logged_dataset.get( behavior_policy_name=behavior_policies[i], dataset_id=dataset_id ) input_dict_ = self._obtain_whole_inputs( logged_dataset=logged_dataset_, evaluation_policies=evaluation_policies[behavior_policies[i]], require_value_prediction=require_value_prediction, require_weight_prediction=require_weight_prediction, resample_initial_state=resample_initial_state, q_function_method=q_function_method, v_function_method=v_function_method, w_function_method=w_function_method, k_fold=k_fold, n_steps=n_steps, n_trajectories_on_policy_evaluation=n_trajectories_on_policy_evaluation, use_stationary_distribution_on_policy_evaluation=use_stationary_distribution_on_policy_evaluation, minimum_rollout_length=minimum_rollout_length, maximum_rollout_length=maximum_rollout_length, random_state=random_state, ) input_dict.add( input_dict_, behavior_policy_name=behavior_policies[i], dataset_id=dataset_id, ) elif behavior_policy_name is not None and dataset_id is None: n_datasets = logged_dataset.n_datasets[behavior_policy_name] for dataset_id_ in tqdm( np.arange(n_datasets), desc="[collect input data: dataset_id]", total=n_datasets, ): logged_dataset_ = logged_dataset.get( behavior_policy_name=behavior_policy_name, dataset_id=dataset_id_, ) input_dict_ = self._obtain_whole_inputs( logged_dataset=logged_dataset_, evaluation_policies=evaluation_policies[dataset_id_], require_value_prediction=require_value_prediction, require_weight_prediction=require_weight_prediction, resample_initial_state=resample_initial_state, q_function_method=q_function_method, v_function_method=v_function_method, w_function_method=w_function_method, k_fold=k_fold, n_steps=n_steps, n_trajectories_on_policy_evaluation=n_trajectories_on_policy_evaluation, use_stationary_distribution_on_policy_evaluation=use_stationary_distribution_on_policy_evaluation, minimum_rollout_length=minimum_rollout_length, maximum_rollout_length=maximum_rollout_length, random_state=random_state, ) input_dict.add( input_dict_, behavior_policy_name=behavior_policy_name, dataset_id=dataset_id_, ) else: logged_dataset = logged_dataset.get( behavior_policy_name=behavior_policy_name, dataset_id=dataset_id ) input_dict = self._obtain_whole_inputs( logged_dataset=logged_dataset, evaluation_policies=evaluation_policies, require_value_prediction=require_value_prediction, require_weight_prediction=require_weight_prediction, resample_initial_state=resample_initial_state, q_function_method=q_function_method, v_function_method=v_function_method, w_function_method=w_function_method, k_fold=k_fold, n_steps=n_steps, n_trajectories_on_policy_evaluation=n_trajectories_on_policy_evaluation, use_stationary_distribution_on_policy_evaluation=use_stationary_distribution_on_policy_evaluation, minimum_rollout_length=minimum_rollout_length, maximum_rollout_length=maximum_rollout_length, random_state=random_state, ) else: input_dict = self._obtain_whole_inputs( logged_dataset=logged_dataset, evaluation_policies=evaluation_policies, require_value_prediction=require_value_prediction, require_weight_prediction=require_weight_prediction, resample_initial_state=resample_initial_state, q_function_method=q_function_method, v_function_method=v_function_method, w_function_method=w_function_method, k_fold=k_fold, n_steps=n_steps, n_trajectories_on_policy_evaluation=n_trajectories_on_policy_evaluation, use_stationary_distribution_on_policy_evaluation=use_stationary_distribution_on_policy_evaluation, minimum_rollout_length=minimum_rollout_length, maximum_rollout_length=maximum_rollout_length, random_state=random_state, ) return input_dict