Source code for scope_rl.ope.discrete.basic_estimators

# Copyright (c) 2023, Haruka Kiyohara, Ren Kishimoto, HAKUHODO Technologies Inc., and Hanjuku-kaso Co., Ltd. All rights reserved.
# Licensed under the Apache 2.0 License.

"""Off-Policy Estimators for discrete action cases."""
from dataclasses import dataclass
from typing import Dict, Optional

import numpy as np
from sklearn.utils import check_scalar

from ..estimators_base import BaseOffPolicyEstimator
from ...utils import check_array


@dataclass
class DirectMethod(BaseOffPolicyEstimator):
    """Direct Method (DM) for discrete action spaces.

    Bases: :class:`scope_rl.ope.BaseOffPolicyEstimator`

    Imported as: :class:`scope_rl.ope.discrete.DirectMethod`

    Note
    -------
    DM estimates the policy value using an estimated initial state value as follows.

    .. math::

        \\hat{J}_{\\mathrm{DM}} (\\pi; \\mathcal{D}) := \\frac{1}{n} \\sum_{i=1}^n \\sum_{a \\in \\mathcal{A}} \\pi(a | s_0^{(i)}) \\hat{Q}(s_0^{(i)}, a) = \\frac{1}{n} \\sum_{i=1}^n \\hat{V}(s_0^{(i)}),

    where :math:`\\mathcal{D}=\\{\\{(s_t, a_t, r_t)\\}_{t=0}^{T-1}\\}_{i=1}^n` is the logged dataset with :math:`n` trajectories.
    :math:`T` indicates step per episode. :math:`\\hat{Q}(s_t, a_t)` is the estimated Q value given a state-action pair.
    :math:`\\hat{V}(s_t)` is the estimated value function given a state.

    DM has low variance compared to other estimators, but can produce larger bias due to approximation errors.

    There are several methods to estimate :math:`\\hat{Q}(s, a)` such as Fitted Q Evaluation (FQE) (Le et al., 2019) and
    Minimax Q-Function Learning (MQL) (Uehara et al., 2020).

    .. seealso::

        The implementation of FQE is provided by `d3rlpy <https://d3rlpy.readthedocs.io/en/latest/references/off_policy_evaluation.html>`_.
        The implementation of Minimax Learning is available at :class:`scope_rl.ope.weight_value_learning`.

    Parameters
    -------
    estimator_name: str, default="dm"
        Name of the estimator.

    References
    -------
    Masatoshi Uehara, Jiawei Huang, and Nan Jiang.
    "Minimax Weight and Q-Function Learning for Off-Policy Evaluation." 2020.

    Hoang Le, Cameron Voloshin, and Yisong Yue.
    "Batch Policy Learning under Constraints." 2019.

    """

    estimator_name: str = "dm"

    def __post_init__(self):
        self.action_type = "discrete"

    def _check_estimation_inputs(
        self,
        step_per_trajectory: int,
        evaluation_policy_action_dist: np.ndarray,
        state_action_value_prediction: np.ndarray,
    ) -> None:
        """Validate the inputs shared by the public estimation methods.

        The checks run in a fixed order so that the first violated condition
        determines which error is reported.

        Raises
        -------
        ValueError
            If the two arrays disagree in either dimension, if the number of rows
            is not a multiple of `step_per_trajectory`, or if any row of
            `evaluation_policy_action_dist` does not sum to one.
            `check_scalar` and `check_array` may raise their own errors as well.

        """
        check_scalar(
            step_per_trajectory,
            name="step_per_trajectory",
            target_type=int,
            min_val=1,
        )
        check_array(
            state_action_value_prediction,
            name="state_action_value_prediction",
            expected_dim=2,
        )
        check_array(
            evaluation_policy_action_dist,
            name="evaluation_policy_action_dist",
            expected_dim=2,
            min_val=0.0,
            max_val=1.0,
        )
        if (
            state_action_value_prediction.shape[0]
            != evaluation_policy_action_dist.shape[0]
        ):
            raise ValueError(
                "Expected `state_action_value_prediction.shape[0] == evaluation_policy_action_dist.shape[0]`"
                ", but found False"
            )
        if (
            state_action_value_prediction.shape[1]
            != evaluation_policy_action_dist.shape[1]
        ):
            raise ValueError(
                "Expected `state_action_value_prediction.shape[1] == evaluation_policy_action_dist.shape[1]`"
                ", but found False"
            )
        # The flat arrays are later reshaped to (-1, step_per_trajectory),
        # which requires the row count to be an exact multiple.
        if state_action_value_prediction.shape[0] % step_per_trajectory:
            raise ValueError(
                "Expected `state_action_value_prediction.shape[0] \\% step_per_trajectory == 0`, but found False"
            )
        if not np.allclose(
            evaluation_policy_action_dist.sum(axis=1),
            np.ones(evaluation_policy_action_dist.shape[0]),
        ):
            raise ValueError(
                "Expected `evaluation_policy_action_dist.sum(axis=1) == np.ones(evaluation_policy_action_dist.shape[0])`"
                ", but found it False"
            )

    def _estimate_trajectory_value(
        self,
        step_per_trajectory: int,
        evaluation_policy_action_dist: np.ndarray,
        state_action_value_prediction: np.ndarray,
        **kwargs,
    ) -> np.ndarray:
        """Estimate the trajectory-wise policy value.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            :math:`\\hat{Q}` for all actions,
            i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`.

        Return
        -------
        estimated_trajectory_wise_policy_value: ndarray of shape (n_trajectories, )
            Policy value (expected reward under the evaluation policy) estimated for each trajectory.

        """
        # Expected Q under the evaluation policy for every row, arranged so that
        # each row of the result holds one trajectory.
        state_value = (
            (state_action_value_prediction * evaluation_policy_action_dist)
            .sum(axis=1)
            .reshape((-1, step_per_trajectory))
        )
        # Only the first step of each trajectory (the initial state) is used.
        return state_value[:, 0]

    def estimate_policy_value(
        self,
        step_per_trajectory: int,
        evaluation_policy_action_dist: np.ndarray,
        state_action_value_prediction: np.ndarray,
        **kwargs,
    ) -> float:
        """Estimate the policy value of the evaluation policy.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            :math:`\\hat{Q}` for all actions,
            i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`.

        Return
        -------
        V_hat: float
            Estimated policy value.

        """
        self._check_estimation_inputs(
            step_per_trajectory=step_per_trajectory,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            state_action_value_prediction=state_action_value_prediction,
        )
        estimated_policy_value = self._estimate_trajectory_value(
            step_per_trajectory=step_per_trajectory,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            state_action_value_prediction=state_action_value_prediction,
        ).mean()
        return estimated_policy_value

    def estimate_interval(
        self,
        step_per_trajectory: int,
        evaluation_policy_action_dist: np.ndarray,
        state_action_value_prediction: np.ndarray,
        alpha: float = 0.05,
        ci: str = "bootstrap",
        n_bootstrap_samples: int = 10000,
        random_state: Optional[int] = None,
        **kwargs,
    ) -> Dict[str, float]:
        """Estimate the confidence interval of the policy value.

        The trajectory-wise estimates are passed to the interval routine stored
        under key `ci` in `self._estimate_confidence_interval`.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            :math:`\\hat{Q}` for all actions,
            i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`.

        alpha: float, default=0.05
            Significance level. The value should be within `[0, 1)`.

        ci: {"bootstrap", "hoeffding", "bernstein", "ttest"}, default="bootstrap"
            Name of the method to estimate the confidence interval.

        n_bootstrap_samples: int, default=10000 (> 0)
            Number of resampling performed in the bootstrap procedure.

        random_state: int, default=None (>= 0)
            Random state.

        Return
        -------
        estimated_confidence_interval: dict
            Dictionary storing the estimated mean and upper-lower confidence bounds.

            .. code-block:: python

                key: [
                    mean,
                    {100 * (1. - alpha)}% CI (lower),
                    {100 * (1. - alpha)}% CI (upper),
                ]

        """
        self._check_estimation_inputs(
            step_per_trajectory=step_per_trajectory,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            state_action_value_prediction=state_action_value_prediction,
        )
        if ci not in self._estimate_confidence_interval.keys():
            raise ValueError(
                f"ci must be one of 'bootstrap', 'hoeffding', 'bernstein', or 'ttest', but {ci} is given"
            )

        estimated_trajectory_value = self._estimate_trajectory_value(
            step_per_trajectory=step_per_trajectory,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            state_action_value_prediction=state_action_value_prediction,
        )
        return self._estimate_confidence_interval[ci](
            samples=estimated_trajectory_value,
            alpha=alpha,
            n_bootstrap_samples=n_bootstrap_samples,
            random_state=random_state,
        )
@dataclass
class TrajectoryWiseImportanceSampling(BaseOffPolicyEstimator):
    """Trajectory-wise Importance Sampling (TIS) for discrete action spaces.

    Bases: :class:`scope_rl.ope.BaseOffPolicyEstimator`

    Imported as: :class:`scope_rl.ope.discrete.TrajectoryWiseImportanceSampling`

    Note
    -------
    TIS estimates the policy value via trajectory-wise importance weighting as follows.

    .. math::

        \\hat{J}_{\\mathrm{TIS}} (\\pi; \\mathcal{D}) := \\frac{1}{n} \\sum_{i=1}^n \\sum_{t=0}^{T-1} \\gamma^t w_{0:T-1}^{(i)} r_t^{(i)},

    where :math:`w_{0:T-1} := \\prod_{t=0}^{T-1} (\\pi(a_t | s_t) / \\pi_0(a_t | s_t))` is the trajectory-wise importance weight.

    TIS enables an unbiased estimation of the policy value. However, when the trajectory length (:math:`T`) is large,
    TIS suffers from high variance due to the product of importance weights over the entire horizon.

    Parameters
    -------
    estimator_name: str, default="tis"
        Name of the estimator.

    References
    -------
    Doina Precup, Richard S. Sutton, and Satinder P. Singh.
    "Eligibility Traces for Off-Policy Policy Evaluation." 2000.

    """

    # Annotated so that it is a real dataclass field (and thus a constructor
    # argument), matching the documented parameter.
    estimator_name: str = "tis"

    def __post_init__(self):
        self.action_type = "discrete"

    def _check_estimation_inputs(
        self,
        step_per_trajectory: int,
        action: np.ndarray,
        reward: np.ndarray,
        pscore: np.ndarray,
        evaluation_policy_action_dist: np.ndarray,
        gamma: float,
    ) -> None:
        """Validate the inputs shared by the public estimation methods.

        The checks run in a fixed order so that the first violated condition
        determines which error is reported.

        Raises
        -------
        ValueError
            If the arrays disagree in their first dimension, if the number of rows
            is not a multiple of `step_per_trajectory`, or if any row of
            `evaluation_policy_action_dist` does not sum to one.
            `check_scalar` and `check_array` may raise their own errors as well.

        """
        check_scalar(
            step_per_trajectory,
            name="step_per_trajectory",
            target_type=int,
            min_val=1,
        )
        check_array(
            reward,
            name="reward",
            expected_dim=1,
        )
        check_array(
            pscore,
            name="pscore",
            expected_dim=1,
            min_val=0.0,
            max_val=1.0,
        )
        check_array(
            evaluation_policy_action_dist,
            name="evaluation_policy_action_dist",
            expected_dim=2,
            min_val=0.0,
            max_val=1.0,
        )
        # Checked after the action distribution so that its second dimension
        # can bound the valid action indices.
        check_array(
            action,
            name="action",
            expected_dim=1,
            min_val=0,
            max_val=evaluation_policy_action_dist.shape[1] - 1,
        )
        if not (
            action.shape[0]
            == reward.shape[0]
            == pscore.shape[0]
            == evaluation_policy_action_dist.shape[0]
        ):
            raise ValueError(
                "Expected `action.shape[0] == reward.shape[0] == pscore.shape[0] == evaluation_policy_action_dist.shape[0]`"
                ", but found False"
            )
        # The flat arrays are later reshaped to (-1, step_per_trajectory),
        # which requires the row count to be an exact multiple.
        if action.shape[0] % step_per_trajectory:
            raise ValueError(
                "Expected `action.shape[0] \\% step_per_trajectory == 0`, but found False"
            )
        if not np.allclose(
            evaluation_policy_action_dist.sum(axis=1),
            np.ones(evaluation_policy_action_dist.shape[0]),
        ):
            raise ValueError(
                "Expected `evaluation_policy_action_dist.sum(axis=1) == np.ones(evaluation_policy_action_dist.shape[0])`"
                ", but found it False"
            )
        check_scalar(gamma, name="gamma", target_type=float, min_val=0.0, max_val=1.0)

    def _estimate_trajectory_value(
        self,
        step_per_trajectory: int,
        action: np.ndarray,
        reward: np.ndarray,
        pscore: np.ndarray,
        evaluation_policy_action_dist: np.ndarray,
        gamma: float = 1.0,
        **kwargs,
    ) -> np.ndarray:
        """Estimate the trajectory-wise policy value.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        action: array-like of shape (n_trajectories * step_per_trajectory, )
            Action chosen by the behavior policy.

        reward: array-like of shape (n_trajectories * step_per_trajectory, )
            Observed immediate rewards.

        pscore: array-like of shape (n_trajectories * step_per_trajectory, )
            Conditional action choice probability of the behavior policy,
            i.e., :math:`\\pi_b(a | s)`

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        gamma: float, default=1.0
            Discount factor. The public methods accept values within [0, 1];
            with `gamma=0` only the first-step reward contributes.

        Return
        -------
        estimated_trajectory_wise_policy_value: ndarray of shape (n_trajectories, )
            Policy value (expected reward under the evaluation policy) estimated for each trajectory.

        """
        behavior_policy_pscore = self._calc_behavior_policy_pscore_discrete(
            step_per_trajectory=step_per_trajectory,
            pscore=pscore,
            pscore_type="trajectory_wise",
        )
        evaluation_policy_pscore = self._calc_evaluation_policy_pscore_discrete(
            step_per_trajectory=step_per_trajectory,
            action=action,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            pscore_type="trajectory_wise",
        )
        # NOTE(review): both helpers presumably return arrays of shape
        # (n_trajectories, step_per_trajectory), since the ratio is multiplied
        # elementwise with the reshaped reward below -- confirm in the base class.
        weight = evaluation_policy_pscore / behavior_policy_pscore

        reward = reward.reshape((-1, step_per_trajectory))
        # gamma ** t for t = 0, ..., T - 1. Computed as a power rather than
        # `cumprod() / gamma` so that gamma == 0 yields [1, 0, 0, ...] instead
        # of NaN from 0 / 0.
        discount = gamma ** np.arange(step_per_trajectory)
        estimated_trajectory_value = (discount[np.newaxis, :] * weight * reward).sum(
            axis=1
        )
        return estimated_trajectory_value

    def estimate_policy_value(
        self,
        step_per_trajectory: int,
        action: np.ndarray,
        reward: np.ndarray,
        pscore: np.ndarray,
        evaluation_policy_action_dist: np.ndarray,
        gamma: float = 1.0,
        **kwargs,
    ) -> float:
        """Estimate the policy value of the evaluation policy.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        action: array-like of shape (n_trajectories * step_per_trajectory, )
            Action chosen by the behavior policy.

        reward: array-like of shape (n_trajectories * step_per_trajectory, )
            Observed immediate rewards.

        pscore: array-like of shape (n_trajectories * step_per_trajectory, )
            Conditional action choice probability of the behavior policy,
            i.e., :math:`\\pi_b(a | s)`

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        gamma: float, default=1.0
            Discount factor. The value should be within [0, 1].

        Return
        -------
        V_hat: float
            Estimated policy value.

        """
        self._check_estimation_inputs(
            step_per_trajectory=step_per_trajectory,
            action=action,
            reward=reward,
            pscore=pscore,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            gamma=gamma,
        )
        estimated_policy_value = self._estimate_trajectory_value(
            step_per_trajectory=step_per_trajectory,
            action=action,
            reward=reward,
            pscore=pscore,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            gamma=gamma,
        ).mean()
        return estimated_policy_value

    def estimate_interval(
        self,
        step_per_trajectory: int,
        action: np.ndarray,
        reward: np.ndarray,
        pscore: np.ndarray,
        evaluation_policy_action_dist: np.ndarray,
        gamma: float = 1.0,
        alpha: float = 0.05,
        ci: str = "bootstrap",
        n_bootstrap_samples: int = 10000,
        random_state: Optional[int] = None,
        **kwargs,
    ) -> Dict[str, float]:
        """Estimate the confidence interval of the policy value.

        The trajectory-wise estimates are passed to the interval routine stored
        under key `ci` in `self._estimate_confidence_interval`.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        action: array-like of shape (n_trajectories * step_per_trajectory, )
            Action chosen by the behavior policy.

        reward: array-like of shape (n_trajectories * step_per_trajectory, )
            Observed immediate rewards.

        pscore: array-like of shape (n_trajectories * step_per_trajectory, )
            Conditional action choice probability of the behavior policy,
            i.e., :math:`\\pi_b(a | s)`

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        gamma: float, default=1.0
            Discount factor. The value should be within [0, 1].

        alpha: float, default=0.05
            Significance level. The value should be within `[0, 1)`.

        ci: {"bootstrap", "hoeffding", "bernstein", "ttest"}, default="bootstrap"
            Name of the method to estimate the confidence interval.

        n_bootstrap_samples: int, default=10000 (> 0)
            Number of resampling performed in the bootstrap procedure.

        random_state: int, default=None (>= 0)
            Random state.

        Return
        -------
        estimated_confidence_interval: dict
            Dictionary storing the estimated mean and upper-lower confidence bounds.

            .. code-block:: python

                key: [
                    mean,
                    {100 * (1. - alpha)}% CI (lower),
                    {100 * (1. - alpha)}% CI (upper),
                ]

        """
        self._check_estimation_inputs(
            step_per_trajectory=step_per_trajectory,
            action=action,
            reward=reward,
            pscore=pscore,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            gamma=gamma,
        )
        if ci not in self._estimate_confidence_interval.keys():
            raise ValueError(
                f"ci must be one of 'bootstrap', 'hoeffding', 'bernstein', or 'ttest', but {ci} is given"
            )

        estimated_trajectory_value = self._estimate_trajectory_value(
            step_per_trajectory=step_per_trajectory,
            action=action,
            reward=reward,
            pscore=pscore,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            gamma=gamma,
        )
        return self._estimate_confidence_interval[ci](
            samples=estimated_trajectory_value,
            alpha=alpha,
            n_bootstrap_samples=n_bootstrap_samples,
            random_state=random_state,
        )
@dataclass
class PerDecisionImportanceSampling(BaseOffPolicyEstimator):
    """Per-Decision Importance Sampling (PDIS) for discrete action spaces.

    Bases: :class:`scope_rl.ope.BaseOffPolicyEstimator`

    Imported as: :class:`scope_rl.ope.discrete.PerDecisionImportanceSampling`

    Note
    -------
    PDIS estimates the policy value via step-wise importance weighting as follows.

    .. math::

        \\hat{J}_{\\mathrm{PDIS}} (\\pi; \\mathcal{D}) := \\frac{1}{n} \\sum_{i=1}^n \\sum_{t=0}^{T-1} \\gamma^t w_{0:t}^{(i)} r_t^{(i)},

    where :math:`w_{0:t} := \\prod_{t'=0}^t (\\pi(a_{t'} | s_{t'}) / \\pi_0(a_{t'} | s_{t'}))` is the importance weight
    for each time step wrt the previous actions (referred to as the per-decision or step-wise importance weight).

    By using per-decision importance weighting instead of trajectory-wise importance weighting of TIS,
    PDIS has lower variance than TIS while remaining unbiased.
    However, when the trajectory length (:math:`T`) is large, PDIS still suffers from high variance.

    Parameters
    -------
    estimator_name: str, default="pdis"
        Name of the estimator.

    References
    -------
    Doina Precup, Richard S. Sutton, and Satinder P. Singh.
    "Eligibility Traces for Off-Policy Policy Evaluation." 2000.

    """

    # Annotated so that it is a real dataclass field (and thus a constructor
    # argument), matching the documented parameter.
    estimator_name: str = "pdis"

    def __post_init__(self):
        self.action_type = "discrete"

    def _check_estimation_inputs(
        self,
        step_per_trajectory: int,
        action: np.ndarray,
        reward: np.ndarray,
        pscore: np.ndarray,
        evaluation_policy_action_dist: np.ndarray,
        gamma: float,
    ) -> None:
        """Validate the inputs shared by the public estimation methods.

        The checks run in a fixed order so that the first violated condition
        determines which error is reported.

        Raises
        -------
        ValueError
            If the arrays disagree in their first dimension, if the number of rows
            is not a multiple of `step_per_trajectory`, or if any row of
            `evaluation_policy_action_dist` does not sum to one.
            `check_scalar` and `check_array` may raise their own errors as well.

        """
        check_scalar(
            step_per_trajectory,
            name="step_per_trajectory",
            target_type=int,
            min_val=1,
        )
        check_array(
            reward,
            name="reward",
            expected_dim=1,
        )
        check_array(
            pscore,
            name="pscore",
            expected_dim=1,
            min_val=0.0,
            max_val=1.0,
        )
        check_array(
            evaluation_policy_action_dist,
            name="evaluation_policy_action_dist",
            expected_dim=2,
            min_val=0.0,
            max_val=1.0,
        )
        # Checked after the action distribution so that its second dimension
        # can bound the valid action indices.
        check_array(
            action,
            name="action",
            expected_dim=1,
            min_val=0,
            max_val=evaluation_policy_action_dist.shape[1] - 1,
        )
        if not (
            action.shape[0]
            == reward.shape[0]
            == pscore.shape[0]
            == evaluation_policy_action_dist.shape[0]
        ):
            raise ValueError(
                "Expected `action.shape[0] == reward.shape[0] == pscore.shape[0] == evaluation_policy_action_dist.shape[0]`"
                ", but found False"
            )
        # The flat arrays are later reshaped to (-1, step_per_trajectory),
        # which requires the row count to be an exact multiple.
        if action.shape[0] % step_per_trajectory:
            raise ValueError(
                "Expected `action.shape[0] \\% step_per_trajectory == 0`, but found False"
            )
        if not np.allclose(
            evaluation_policy_action_dist.sum(axis=1),
            np.ones(evaluation_policy_action_dist.shape[0]),
        ):
            raise ValueError(
                "Expected `evaluation_policy_action_dist.sum(axis=1) == np.ones(evaluation_policy_action_dist.shape[0])`"
                ", but found it False"
            )
        check_scalar(gamma, name="gamma", target_type=float, min_val=0.0, max_val=1.0)

    def _estimate_trajectory_value(
        self,
        step_per_trajectory: int,
        action: np.ndarray,
        reward: np.ndarray,
        pscore: np.ndarray,
        evaluation_policy_action_dist: np.ndarray,
        gamma: float = 1.0,
        **kwargs,
    ) -> np.ndarray:
        """Estimate the trajectory-wise policy value.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        action: array-like of shape (n_trajectories * step_per_trajectory, )
            Action chosen by the behavior policy.

        reward: array-like of shape (n_trajectories * step_per_trajectory, )
            Observed immediate rewards.

        pscore: array-like of shape (n_trajectories * step_per_trajectory, )
            Conditional action choice probability of the behavior policy,
            i.e., :math:`\\pi_b(a | s)`

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        gamma: float, default=1.0
            Discount factor. The public methods accept values within [0, 1];
            with `gamma=0` only the first-step reward contributes.

        Return
        -------
        estimated_trajectory_wise_policy_value: ndarray of shape (n_trajectories, )
            Policy value (expected reward under the evaluation policy) estimated for each trajectory.

        """
        behavior_policy_pscore = self._calc_behavior_policy_pscore_discrete(
            step_per_trajectory=step_per_trajectory,
            pscore=pscore,
            pscore_type="step_wise",
        )
        evaluation_policy_pscore = self._calc_evaluation_policy_pscore_discrete(
            step_per_trajectory=step_per_trajectory,
            action=action,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            pscore_type="step_wise",
        )
        # NOTE(review): both helpers presumably return arrays of shape
        # (n_trajectories, step_per_trajectory), since the ratio is multiplied
        # elementwise with the reshaped reward below -- confirm in the base class.
        weight = evaluation_policy_pscore / behavior_policy_pscore

        reward = reward.reshape((-1, step_per_trajectory))
        # gamma ** t for t = 0, ..., T - 1. Computed as a power rather than
        # `cumprod() / gamma` so that gamma == 0 yields [1, 0, 0, ...] instead
        # of NaN from 0 / 0.
        discount = gamma ** np.arange(step_per_trajectory)
        estimated_trajectory_value = (discount[np.newaxis, :] * weight * reward).sum(
            axis=1
        )
        return estimated_trajectory_value

    def estimate_policy_value(
        self,
        step_per_trajectory: int,
        action: np.ndarray,
        reward: np.ndarray,
        pscore: np.ndarray,
        evaluation_policy_action_dist: np.ndarray,
        gamma: float = 1.0,
        **kwargs,
    ) -> float:
        """Estimate the policy value of the evaluation policy.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        action: array-like of shape (n_trajectories * step_per_trajectory, )
            Action chosen by the behavior policy.

        reward: array-like of shape (n_trajectories * step_per_trajectory, )
            Observed immediate rewards.

        pscore: array-like of shape (n_trajectories * step_per_trajectory, )
            Conditional action choice probability of the behavior policy,
            i.e., :math:`\\pi_b(a | s)`

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        gamma: float, default=1.0
            Discount factor. The value should be within [0, 1].

        Return
        -------
        V_hat: float
            Estimated policy value.

        """
        self._check_estimation_inputs(
            step_per_trajectory=step_per_trajectory,
            action=action,
            reward=reward,
            pscore=pscore,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            gamma=gamma,
        )
        return self._estimate_trajectory_value(
            step_per_trajectory=step_per_trajectory,
            action=action,
            reward=reward,
            pscore=pscore,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            gamma=gamma,
        ).mean()

    def estimate_interval(
        self,
        step_per_trajectory: int,
        action: np.ndarray,
        reward: np.ndarray,
        pscore: np.ndarray,
        evaluation_policy_action_dist: np.ndarray,
        gamma: float = 1.0,
        alpha: float = 0.05,
        ci: str = "bootstrap",
        n_bootstrap_samples: int = 10000,
        random_state: Optional[int] = None,
        **kwargs,
    ) -> Dict[str, float]:
        """Estimate the confidence interval of the policy value.

        The trajectory-wise estimates are passed to the interval routine stored
        under key `ci` in `self._estimate_confidence_interval`.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        action: array-like of shape (n_trajectories * step_per_trajectory, )
            Action chosen by the behavior policy.

        reward: array-like of shape (n_trajectories * step_per_trajectory, )
            Observed immediate rewards.

        pscore: array-like of shape (n_trajectories * step_per_trajectory, )
            Conditional action choice probability of the behavior policy,
            i.e., :math:`\\pi_b(a | s)`

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        gamma: float, default=1.0
            Discount factor. The value should be within [0, 1].

        alpha: float, default=0.05
            Significance level. The value should be within `[0, 1)`.

        ci: {"bootstrap", "hoeffding", "bernstein", "ttest"}, default="bootstrap"
            Name of the method to estimate the confidence interval.

        n_bootstrap_samples: int, default=10000 (> 0)
            Number of resampling performed in the bootstrap procedure.

        random_state: int, default=None (>= 0)
            Random state.

        Return
        -------
        estimated_confidence_interval: dict
            Dictionary storing the estimated mean and upper-lower confidence bounds.

            .. code-block:: python

                key: [
                    mean,
                    {100 * (1. - alpha)}% CI (lower),
                    {100 * (1. - alpha)}% CI (upper),
                ]

        """
        self._check_estimation_inputs(
            step_per_trajectory=step_per_trajectory,
            action=action,
            reward=reward,
            pscore=pscore,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            gamma=gamma,
        )
        if ci not in self._estimate_confidence_interval.keys():
            raise ValueError(
                f"ci must be one of 'bootstrap', 'hoeffding', 'bernstein', or 'ttest', but {ci} is given"
            )

        estimated_trajectory_value = self._estimate_trajectory_value(
            step_per_trajectory=step_per_trajectory,
            action=action,
            reward=reward,
            pscore=pscore,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            gamma=gamma,
        )
        return self._estimate_confidence_interval[ci](
            samples=estimated_trajectory_value,
            alpha=alpha,
            n_bootstrap_samples=n_bootstrap_samples,
            random_state=random_state,
        )
[docs]@dataclass class DoublyRobust(BaseOffPolicyEstimator): """Doubly Robust (DR) for discrete action spaces. Bases: :class:`scope_rl.ope.BaseOffPolicyEstimator` Imported as: :class:`scope_rl.ope.discrete.DoublyRobust` Note ------- DR estimates the policy value via step-wise importance weighting and estimated Q-function :math:`\\hat{Q}` as follows. .. math:: \\hat{J}_{\\mathrm{DR}} (\\pi; \\mathcal{D}) := \\frac{1}{n} \\sum_{i=1}^n \\sum_{t=0}^{T-1} \\gamma^t \\left( w_{0:t}^{(i)} (r_t^{(i)} - \\hat{Q}(s_t^{(i)}, a_t^{(i)})) + w_{0:t-1}^{(i)} \\sum_{a \\in \\mathcal{A}} \\pi(a | s_t^{(i)}) \\hat{Q}(s_t^{(i)}, a) \\right), where :math:`w_{0:t} := \\prod_{t'=0}^t (\\pi(a_{t'} | s_{t'}) / \\pi_0(a_{t'} | s_{t'}))` is the per-decision importance weight. DR is unbiased and has lower variance than PDIS when :math:`\\hat{Q}(\\cdot)` is reasonably accurate and satisfies :math:`0 < \\hat{Q}(\\cdot) < 2 Q(\\cdot)`. However, when the importance weight is quite large, it may still suffer from a high variance. Parameters ------- estimator_name: str, default="dr" Name of the estimator. References ------- Nan Jiang and Lihong Li. "Doubly Robust Off-policy Value Evaluation for Reinforcement Learning." 2016. Philip S. Thomas and Emma Brunskill. "Data-Efficient Off-Policy Policy Evaluation for Reinforcement Learning." 2016. """ estimator_name = "dr" def __post_init__(self): self.action_type = "discrete" def _estimate_trajectory_value( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, gamma: float = 1.0, **kwargs, ) -> np.ndarray: """Estimate the trajectory-wise policy value. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. 
reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_b(a | s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_trajectory_wise_policy_value: ndarray of shape (n_trajectories, ) Policy value (expected reward under the evaluation policy) estimated for each trajectory. """ behavior_policy_pscore = self._calc_behavior_policy_pscore_discrete( step_per_trajectory=step_per_trajectory, pscore=pscore, pscore_type="step_wise", ) evaluation_policy_pscore = self._calc_evaluation_policy_pscore_discrete( step_per_trajectory=step_per_trajectory, action=action, evaluation_policy_action_dist=evaluation_policy_action_dist, pscore_type="step_wise", ) weight = evaluation_policy_pscore / behavior_policy_pscore weight_prev = np.roll(weight, 1, axis=1) weight_prev[:, 0] = 1 reward = reward.reshape((-1, step_per_trajectory)) discount = np.full(step_per_trajectory, gamma).cumprod() / gamma state_value_prediction = ( (state_action_value_prediction * evaluation_policy_action_dist) .sum(axis=1) .reshape((-1, step_per_trajectory)) ) state_action_value_prediction = state_action_value_prediction[ np.arange(len(action)), action ].reshape((-1, step_per_trajectory)) estimated_trajectory_value = ( discount[np.newaxis, :] * ( weight * (reward - state_action_value_prediction) + weight_prev * state_value_prediction ) ).sum(axis=1) return 
estimated_trajectory_value
def estimate_policy_value(
    self,
    step_per_trajectory: int,
    action: np.ndarray,
    reward: np.ndarray,
    pscore: np.ndarray,
    evaluation_policy_action_dist: np.ndarray,
    state_action_value_prediction: np.ndarray,
    gamma: float = 1.0,
    **kwargs,
) -> float:
    """Estimate the policy value of the evaluation policy.

    The inputs are validated first; the estimate is then the mean of the
    trajectory-wise values returned by :meth:`_estimate_trajectory_value`.

    Parameters
    -------
    step_per_trajectory: int (> 0)
        Number of timesteps in an episode.

    action: array-like of shape (n_trajectories * step_per_trajectory, )
        Action chosen by the behavior policy.

    reward: array-like of shape (n_trajectories * step_per_trajectory, )
        Observed immediate rewards.

    pscore: array-like of shape (n_trajectories * step_per_trajectory, )
        Conditional action choice probability of the behavior policy,
        i.e., :math:`\\pi_b(a | s)`

    evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
        Conditional action distribution induced by the evaluation policy,
        i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

    state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action)
        :math:`\\hat{Q}` for all actions,
        i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`.

    gamma: float, default=1.0
        Discount factor. The value should be within (0, 1].

    Return
    -------
    V_hat: float
        Estimated policy value.

    Raises
    -------
    ValueError
        If the first dimensions of the inputs disagree, if the two
        ``(n, n_action)`` inputs disagree on ``n_action``, if the number of
        samples is not a multiple of ``step_per_trajectory``, if
        ``evaluation_policy_action_dist`` rows do not sum to one, or if
        ``gamma`` is not strictly positive. ``check_scalar`` and
        ``check_array`` may raise additional errors of their own.

    """
    check_scalar(
        step_per_trajectory,
        name="step_per_trajectory",
        target_type=int,
        min_val=1,
    )
    check_array(
        reward,
        name="reward",
        expected_dim=1,
    )
    check_array(
        pscore,
        name="pscore",
        expected_dim=1,
        min_val=0.0,
        max_val=1.0,
    )
    check_array(
        state_action_value_prediction,
        name="state_action_value_prediction",
        expected_dim=2,
    )
    check_array(
        evaluation_policy_action_dist,
        name="evaluation_policy_action_dist",
        expected_dim=2,
        min_val=0.0,
        max_val=1.0,
    )
    # The largest valid action index is derived from the width of the
    # evaluation policy's action distribution, validated just above.
    check_array(
        action,
        name="action",
        expected_dim=1,
        min_val=0,
        max_val=evaluation_policy_action_dist.shape[1] - 1,
    )
    if not (
        action.shape[0]
        == reward.shape[0]
        == pscore.shape[0]
        == state_action_value_prediction.shape[0]
        == evaluation_policy_action_dist.shape[0]
    ):
        raise ValueError(
            "Expected `action.shape[0] == reward.shape[0] == pscore.shape[0] "
            "== state_action_value_prediction.shape[0] == evaluation_policy_action_dist.shape[0]`"
            ", but found False"
        )
    if (
        state_action_value_prediction.shape[1]
        != evaluation_policy_action_dist.shape[1]
    ):
        raise ValueError(
            "Expected `state_action_value_prediction.shape[1] == evaluation_policy_action_dist.shape[1]`"
            ", but found False"
        )
    if action.shape[0] % step_per_trajectory:
        raise ValueError(
            "Expected `action.shape[0] \\% step_per_trajectory == 0`, but found False"
        )
    if not np.allclose(
        evaluation_policy_action_dist.sum(axis=1),
        np.ones(evaluation_policy_action_dist.shape[0]),
    ):
        raise ValueError(
            "Expected `evaluation_policy_action_dist.sum(axis=1) == np.ones(evaluation_policy_action_dist.shape[0])`"
            ", but found it False"
        )

    check_scalar(gamma, name="gamma", target_type=float, min_val=0.0, max_val=1.0)
    # check_scalar treats min_val as inclusive, so gamma == 0.0 would pass.
    # The trajectory-value computation divides the discount vector by gamma,
    # which would silently yield NaN. Enforce the documented (0, 1] range.
    if gamma <= 0.0:
        raise ValueError(f"gamma must be within (0, 1], but {gamma} is given")

    return self._estimate_trajectory_value(
        step_per_trajectory=step_per_trajectory,
        action=action,
        reward=reward,
        pscore=pscore,
        evaluation_policy_action_dist=evaluation_policy_action_dist,
        state_action_value_prediction=state_action_value_prediction,
        gamma=gamma,
    ).mean()
def estimate_interval(
    self,
    step_per_trajectory: int,
    action: np.ndarray,
    reward: np.ndarray,
    pscore: np.ndarray,
    evaluation_policy_action_dist: np.ndarray,
    state_action_value_prediction: np.ndarray,
    gamma: float = 1.0,
    alpha: float = 0.05,
    ci: str = "bootstrap",
    n_bootstrap_samples: int = 10000,
    random_state: Optional[int] = None,
    **kwargs,
) -> Dict[str, float]:
    """Estimate the confidence interval of the policy value.

    The inputs are validated first; the trajectory-wise values returned by
    :meth:`_estimate_trajectory_value` are then passed to the interval
    routine selected by ``ci`` (nonparametric bootstrap by default).

    Parameters
    -------
    step_per_trajectory: int (> 0)
        Number of timesteps in an episode.

    action: array-like of shape (n_trajectories * step_per_trajectory, )
        Action chosen by the behavior policy.

    reward: array-like of shape (n_trajectories * step_per_trajectory, )
        Observed immediate rewards.

    pscore: array-like of shape (n_trajectories * step_per_trajectory, )
        Conditional action choice probability of the behavior policy,
        i.e., :math:`\\pi_b(a | s)`

    evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
        Conditional action distribution induced by the evaluation policy,
        i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

    state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action)
        :math:`\\hat{Q}` for all actions,
        i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`.

    gamma: float, default=1.0
        Discount factor. The value should be within (0, 1].

    alpha: float, default=0.05
        Significance level. The value should be within `[0, 1)`.

    ci: {"bootstrap", "hoeffding", "bernstein", "ttest"}, default="bootstrap"
        Name of the method to estimate the confidence interval.

    n_bootstrap_samples: int, default=10000 (> 0)
        Number of resampling performed in the bootstrap procedure.

    random_state: int, default=None (>= 0)
        Random state.

    Return
    -------
    estimated_confidence_interval: dict
        Dictionary storing the estimated mean and upper-lower confidence bounds.

        .. code-block:: python

            key: [
                mean,
                {100 * (1. - alpha)}% CI (lower),
                {100 * (1. - alpha)}% CI (upper),
            ]

    Raises
    -------
    ValueError
        If the first dimensions of the inputs disagree, if the two
        ``(n, n_action)`` inputs disagree on ``n_action``, if the number of
        samples is not a multiple of ``step_per_trajectory``, if
        ``evaluation_policy_action_dist`` rows do not sum to one, if
        ``gamma`` is not strictly positive, or if ``ci`` is not a supported
        method name. ``check_scalar`` and ``check_array`` may raise
        additional errors of their own.

    """
    check_scalar(
        step_per_trajectory,
        name="step_per_trajectory",
        target_type=int,
        min_val=1,
    )
    check_array(
        reward,
        name="reward",
        expected_dim=1,
    )
    check_array(
        pscore,
        name="pscore",
        expected_dim=1,
        min_val=0.0,
        max_val=1.0,
    )
    check_array(
        state_action_value_prediction,
        name="state_action_value_prediction",
        expected_dim=2,
    )
    check_array(
        evaluation_policy_action_dist,
        name="evaluation_policy_action_dist",
        expected_dim=2,
        min_val=0.0,
        max_val=1.0,
    )
    # The largest valid action index is derived from the width of the
    # evaluation policy's action distribution, validated just above.
    check_array(
        action,
        name="action",
        expected_dim=1,
        min_val=0,
        max_val=evaluation_policy_action_dist.shape[1] - 1,
    )
    if not (
        action.shape[0]
        == reward.shape[0]
        == pscore.shape[0]
        == state_action_value_prediction.shape[0]
        == evaluation_policy_action_dist.shape[0]
    ):
        raise ValueError(
            "Expected `action.shape[0] == reward.shape[0] == pscore.shape[0] "
            "== state_action_value_prediction.shape[0] == evaluation_policy_action_dist.shape[0]`"
            ", but found False"
        )
    if (
        state_action_value_prediction.shape[1]
        != evaluation_policy_action_dist.shape[1]
    ):
        raise ValueError(
            "Expected `state_action_value_prediction.shape[1] == evaluation_policy_action_dist.shape[1]`"
            ", but found False"
        )
    if action.shape[0] % step_per_trajectory:
        raise ValueError(
            "Expected `action.shape[0] \\% step_per_trajectory == 0`, but found False"
        )
    if not np.allclose(
        evaluation_policy_action_dist.sum(axis=1),
        np.ones(evaluation_policy_action_dist.shape[0]),
    ):
        raise ValueError(
            "Expected `evaluation_policy_action_dist.sum(axis=1) == np.ones(evaluation_policy_action_dist.shape[0])`"
            ", but found it False"
        )

    check_scalar(gamma, name="gamma", target_type=float, min_val=0.0, max_val=1.0)
    # check_scalar treats min_val as inclusive, so gamma == 0.0 would pass.
    # The trajectory-value computation divides the discount vector by gamma,
    # which would silently yield NaN. Enforce the documented (0, 1] range.
    if gamma <= 0.0:
        raise ValueError(f"gamma must be within (0, 1], but {gamma} is given")

    # `_estimate_confidence_interval` maps a method name to its routine.
    if ci not in self._estimate_confidence_interval:
        raise ValueError(
            f"ci must be one of 'bootstrap', 'hoeffding', 'bernstein', or 'ttest', but {ci} is given"
        )

    estimated_trajectory_value = self._estimate_trajectory_value(
        step_per_trajectory=step_per_trajectory,
        action=action,
        reward=reward,
        pscore=pscore,
        evaluation_policy_action_dist=evaluation_policy_action_dist,
        state_action_value_prediction=state_action_value_prediction,
        gamma=gamma,
    )
    return self._estimate_confidence_interval[ci](
        samples=estimated_trajectory_value,
        alpha=alpha,
        n_bootstrap_samples=n_bootstrap_samples,
        random_state=random_state,
    )
@dataclass
class SelfNormalizedTIS(TrajectoryWiseImportanceSampling):
    """Self-Normalized Trajectory-wise Importance Sampling (SNTIS) for discrete action spaces.

    Bases: :class:`scope_rl.ope.discrete.TrajectoryWiseImportanceSampling` -> :class:`scope_rl.ope.BaseOffPolicyEstimator`

    Imported as: :class:`scope_rl.ope.discrete.SelfNormalizedTIS`

    Note
    -------
    SNTIS estimates the policy value via self-normalized trajectory-wise importance weighting as follows.

    .. math::

        \\hat{J}_{\\mathrm{SNTIS}} (\\pi; \\mathcal{D})
        := \\sum_{i=1}^n \\sum_{t=0}^{T-1} \\gamma^t \\frac{w_{0:T-1}^{(i)}}{\\sum_{i'=1}^n w_{0:T-1}^{(i')}} r_t^{(i)},

    where :math:`w_{0:T-1} := \\prod_{t=0}^{T-1} (\\pi(a_t | s_t) / \\pi_0(a_t | s_t))` is the trajectory-wise importance weight.

    The self-normalized estimator is no longer unbiased, but has variance bounded by :math:`r_{max}^2` while also remaining consistent.

    Parameters
    -------
    estimator_name: str, default="sntis"
        Name of the estimator.

    References
    -------
    Nathan Kallus and Masatoshi Uehara.
    "Intrinsically Efficient, Stable, and Bounded Off-Policy Evaluation for Reinforcement Learning." 2019.

    Doina Precup, Richard S. Sutton, and Satinder P. Singh.
    "Eligibility Traces for Off-Policy Policy Evaluation." 2000.

    """

    # The annotation is required: without it, @dataclass does not treat this
    # as a field override, and the generated __init__ would keep assigning
    # the default inherited from the parent dataclass field of the same name.
    estimator_name: str = "sntis"

    def __post_init__(self):
        self.action_type = "discrete"

    def _estimate_trajectory_value(
        self,
        step_per_trajectory: int,
        action: np.ndarray,
        reward: np.ndarray,
        pscore: np.ndarray,
        evaluation_policy_action_dist: np.ndarray,
        gamma: float = 1.0,
        **kwargs,
    ) -> np.ndarray:
        """Estimate the trajectory-wise policy value.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        action: array-like of shape (n_trajectories * step_per_trajectory, )
            Action chosen by the behavior policy.

        reward: array-like of shape (n_trajectories * step_per_trajectory, )
            Observed immediate rewards.

        pscore: array-like of shape (n_trajectories * step_per_trajectory, )
            Conditional action choice probability of the behavior policy,
            i.e., :math:`\\pi_b(a | s)`

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        gamma: float, default=1.0
            Discount factor. The value should be within (0, 1].

        Return
        -------
        estimated_trajectory_wise_policy_value: ndarray of shape (n_trajectories, )
            Policy value (expected reward under the evaluation policy) estimated for each trajectory.

        """
        behavior_policy_pscore = self._calc_behavior_policy_pscore_discrete(
            step_per_trajectory=step_per_trajectory,
            pscore=pscore,
            pscore_type="trajectory_wise",
        )
        evaluation_policy_pscore = self._calc_evaluation_policy_pscore_discrete(
            step_per_trajectory=step_per_trajectory,
            action=action,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            pscore_type="trajectory_wise",
        )
        weight = evaluation_policy_pscore / behavior_policy_pscore
        # Normalize by the per-step mean weight across trajectories; the
        # small constant guards against division by an all-zero column.
        self_normalized_weight = weight / (weight.mean(axis=0)[np.newaxis, :] + 1e-10)

        reward = reward.reshape((-1, step_per_trajectory))
        # cumprod yields gamma^1..gamma^T; dividing by gamma shifts it to
        # gamma^0..gamma^(T-1). This requires gamma != 0.
        discount = np.full(step_per_trajectory, gamma).cumprod() / gamma

        estimated_trajectory_value = (
            discount[np.newaxis, :] * self_normalized_weight * reward
        ).sum(axis=1)

        return estimated_trajectory_value
@dataclass
class SelfNormalizedPDIS(PerDecisionImportanceSampling):
    """Self-Normalized Per-Decision Importance Sampling (SNPDIS) for discrete action spaces.

    Bases: :class:`scope_rl.ope.discrete.PerDecisionImportanceSampling` -> :class:`scope_rl.ope.BaseOffPolicyEstimator`

    Imported as: :class:`scope_rl.ope.discrete.SelfNormalizedPDIS`

    Note
    -------
    SNPDIS estimates the policy value via self-normalized step-wise importance weighting as follows.

    .. math::

        \\hat{J}_{\\mathrm{SNPDIS}} (\\pi; \\mathcal{D})
        := \\sum_{i=1}^n \\sum_{t=0}^{T-1} \\gamma^t \\frac{w_{0:t}^{(i)}}{\\sum_{i'=1}^n w_{0:t}^{(i')}} r_t^{(i)},

    where :math:`w_{0:t} := \\prod_{t'=0}^t (\\pi(a_{t'} | s_{t'}) / \\pi_0(a_{t'} | s_{t'}))` is the per-decision importance weight.

    The self-normalized estimator is no longer unbiased, but has variance bounded by :math:`r_{max}^2` while also remaining consistent.

    Parameters
    -------
    estimator_name: str, default="snpdis"
        Name of the estimator.

    References
    -------
    Nathan Kallus and Masatoshi Uehara.
    "Intrinsically Efficient, Stable, and Bounded Off-Policy Evaluation for Reinforcement Learning." 2019.

    Doina Precup, Richard S. Sutton, and Satinder P. Singh.
    "Eligibility Traces for Off-Policy Policy Evaluation." 2000.

    """

    # The annotation is required: without it, @dataclass does not treat this
    # as a field override, and the generated __init__ would keep assigning
    # the default inherited from the parent dataclass field of the same name.
    estimator_name: str = "snpdis"

    def __post_init__(self):
        self.action_type = "discrete"

    def _estimate_trajectory_value(
        self,
        step_per_trajectory: int,
        action: np.ndarray,
        reward: np.ndarray,
        pscore: np.ndarray,
        evaluation_policy_action_dist: np.ndarray,
        gamma: float = 1.0,
        **kwargs,
    ) -> np.ndarray:
        """Estimate the trajectory-wise policy value.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        action: array-like of shape (n_trajectories * step_per_trajectory, )
            Action chosen by the behavior policy.

        reward: array-like of shape (n_trajectories * step_per_trajectory, )
            Observed immediate rewards.

        pscore: array-like of shape (n_trajectories * step_per_trajectory, )
            Conditional action choice probability of the behavior policy,
            i.e., :math:`\\pi_b(a | s)`

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        gamma: float, default=1.0
            Discount factor. The value should be within (0, 1].

        Return
        -------
        estimated_trajectory_wise_policy_value: ndarray of shape (n_trajectories, )
            Policy value (expected reward under the evaluation policy) estimated for each trajectory.

        """
        behavior_policy_pscore = self._calc_behavior_policy_pscore_discrete(
            step_per_trajectory=step_per_trajectory,
            pscore=pscore,
            pscore_type="step_wise",
        )
        evaluation_policy_pscore = self._calc_evaluation_policy_pscore_discrete(
            step_per_trajectory=step_per_trajectory,
            action=action,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            pscore_type="step_wise",
        )
        weight = evaluation_policy_pscore / behavior_policy_pscore
        # Normalize by the per-step mean weight across trajectories; the
        # small constant guards against division by an all-zero column.
        self_normalized_weight = weight / (weight.mean(axis=0)[np.newaxis, :] + 1e-10)

        reward = reward.reshape((-1, step_per_trajectory))
        # cumprod yields gamma^1..gamma^T; dividing by gamma shifts it to
        # gamma^0..gamma^(T-1). This requires gamma != 0.
        discount = np.full(step_per_trajectory, gamma).cumprod() / gamma

        estimated_trajectory_value = (
            discount[np.newaxis, :] * self_normalized_weight * reward
        ).sum(axis=1)

        return estimated_trajectory_value
@dataclass
class SelfNormalizedDR(DoublyRobust):
    """Self-Normalized Doubly Robust (SNDR) for discrete action spaces.

    Bases: :class:`scope_rl.ope.discrete.DoublyRobust` -> :class:`scope_rl.ope.BaseOffPolicyEstimator`

    Imported as: :class:`scope_rl.ope.discrete.SelfNormalizedDR`

    Note
    -------
    SNDR estimates the policy value via self-normalized step-wise importance weighting and estimated Q-function :math:`\\hat{Q}` as follows.

    .. math::

        \\hat{J}_{\\mathrm{SNDR}} (\\pi; \\mathcal{D})
        := \\sum_{i=1}^n \\sum_{t=0}^{T-1} \\gamma^t \\left( \\frac{w_{0:t}^{(i)}}{\\sum_{i'=1}^n w_{0:t}^{(i')}} (r_t^{(i)} - \\hat{Q}(s_t^{(i)}, a_t^{(i)}))
        + \\frac{w_{0:t-1}^{(i)}}{\\sum_{i'=1}^n w_{0:t-1}^{(i')}} \\sum_{a \\in \\mathcal{A}} \\pi(a | s_t^{(i)}) \\hat{Q}(s_t^{(i)}, a) \\right),

    where :math:`w_{0:t} := \\prod_{t'=0}^t (\\pi(a_{t'} | s_{t'}) / \\pi_0(a_{t'} | s_{t'}))` is the per-decision importance weight.

    The self-normalized estimator is no longer unbiased, but has variance bounded by :math:`r_{max}^2` while also remaining consistent.

    Parameters
    -------
    estimator_name: str, default="sndr"
        Name of the estimator.

    References
    -------
    Nathan Kallus and Masatoshi Uehara.
    "Intrinsically Efficient, Stable, and Bounded Off-Policy Evaluation for Reinforcement Learning." 2019.

    Nan Jiang and Lihong Li.
    "Doubly Robust Off-policy Value Evaluation for Reinforcement Learning." 2016.

    Philip S. Thomas and Emma Brunskill.
    "Data-Efficient Off-Policy Policy Evaluation for Reinforcement Learning." 2016.

    """

    # The annotation is required: without it, @dataclass does not treat this
    # as a field override, and the generated __init__ would keep assigning
    # the default inherited from the parent dataclass field of the same name.
    estimator_name: str = "sndr"

    def __post_init__(self):
        self.action_type = "discrete"

    def _estimate_trajectory_value(
        self,
        step_per_trajectory: int,
        action: np.ndarray,
        reward: np.ndarray,
        pscore: np.ndarray,
        evaluation_policy_action_dist: np.ndarray,
        state_action_value_prediction: np.ndarray,
        gamma: float = 1.0,
        **kwargs,
    ) -> np.ndarray:
        """Estimate the trajectory-wise policy value.

        Parameters
        -------
        step_per_trajectory: int (> 0)
            Number of timesteps in an episode.

        action: array-like of shape (n_trajectories * step_per_trajectory, )
            Action chosen by the behavior policy.

        reward: array-like of shape (n_trajectories * step_per_trajectory, )
            Observed immediate rewards.

        pscore: array-like of shape (n_trajectories * step_per_trajectory, )
            Conditional action choice probability of the behavior policy,
            i.e., :math:`\\pi_b(a | s)`

        evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            Conditional action distribution induced by the evaluation policy,
            i.e., :math:`\\pi(a | s_t) \\forall a \\in \\mathcal{A}`

        state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action)
            :math:`\\hat{Q}` for all actions,
            i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`.

        gamma: float, default=1.0
            Discount factor. The value should be within (0, 1].

        Return
        -------
        estimated_trajectory_wise_policy_value: ndarray of shape (n_trajectories, )
            Policy value (expected reward under the evaluation policy) estimated for each trajectory.

        """
        behavior_policy_pscore = self._calc_behavior_policy_pscore_discrete(
            step_per_trajectory=step_per_trajectory,
            pscore=pscore,
            pscore_type="step_wise",
        )
        evaluation_policy_pscore = self._calc_evaluation_policy_pscore_discrete(
            step_per_trajectory=step_per_trajectory,
            action=action,
            evaluation_policy_action_dist=evaluation_policy_action_dist,
            pscore_type="step_wise",
        )
        weight = evaluation_policy_pscore / behavior_policy_pscore
        # Normalize by the per-step mean weight across trajectories; the
        # small constant guards against division by an all-zero column.
        self_normalized_weight = weight / (weight.mean(axis=0)[np.newaxis, :] + 1e-10)

        # Shift the weights one step to the right to obtain the weight up to
        # the previous step. np.roll wraps the last column to the front, so
        # the first column is overwritten with 1 (no action precedes step 0).
        self_normalized_weight_prev = np.roll(self_normalized_weight, 1, axis=1)
        self_normalized_weight_prev[:, 0] = 1

        reward = reward.reshape((-1, step_per_trajectory))
        # cumprod yields gamma^1..gamma^T; dividing by gamma shifts it to
        # gamma^0..gamma^(T-1). This requires gamma != 0.
        discount = np.full(step_per_trajectory, gamma).cumprod() / gamma

        # Expected Q under the evaluation policy at each step, i.e. V-hat.
        state_value_prediction = (
            (state_action_value_prediction * evaluation_policy_action_dist)
            .sum(axis=1)
            .reshape((-1, step_per_trajectory))
        )
        # Q-hat of the action actually logged at each step.
        state_action_value_prediction = state_action_value_prediction[
            np.arange(len(action)), action
        ].reshape((-1, step_per_trajectory))

        estimated_trajectory_value = (
            discount[np.newaxis, :]
            * (
                self_normalized_weight * (reward - state_action_value_prediction)
                + self_normalized_weight_prev * state_value_prediction
            )
        ).sum(axis=1)

        return estimated_trajectory_value