Source code for scope_rl.ope.discrete.cumulative_distribution_estimators

# Copyright (c) 2023, Haruka Kiyohara, Ren Kishimoto, HAKUHODO Technologies Inc., and Hanjuku-kaso Co., Ltd. All rights reserved.
# Licensed under the Apache 2.0 License.

"""Cumulative Distribution Off-Policy Estimators for discrete action cases."""
from dataclasses import dataclass
from typing import Tuple, Optional, Dict

import numpy as np
from sklearn.utils import check_scalar

from ..estimators_base import (
    BaseCumulativeDistributionOPEEstimator,
)
from ...utils import check_array


[docs]@dataclass class CumulativeDistributionDM( BaseCumulativeDistributionOPEEstimator, ): """Direct Method (DM) for estimating the cumulative distribution function (CDF) for discrete action spaces. Bases: :class:`scope_rl.ope.BaseCumulativeDistributionOPEEstimator` Imported as: :class:`scope_rl.ope.discrete.CumulativeDistributionDM` Note ------- DM estimates the CDF using the initial state value as follows. .. math:: \\hat{F}_{\\mathrm{DM}}(m, \\pi; \\mathcal{D}) := \\frac{1}{n} \\sum_{i=1}^n \\sum_{a \\in \\mathcal{A}} \\pi(a \\mid s_0^{(i)}) \\hat{G}(m; s_0^{(i)}, a) where :math:`\\hat{F}(\\cdot)` is the estimated cumulative distribution function and :math:`\\hat{G}(\\cdot)` is an estimator for :math:`\\mathbb{E} \\left[ \\mathbb{I} \\left \\{\\sum_{t=0}^{T-1} \\gamma^t r_t \\leq m \\right \\} \\mid s,a \\right]`. DM has low variance compared to other estimators, but can produce larger bias due to approximation errors. There are several methods to estimate :math:`\\hat{Q}(s, a)` such as Fitted Q Evaluation (FQE) (Le et al., 2019) and Minimax Q-Function Learning (MQL) (Uehara et al., 2020). .. seealso:: The implementation of FQE is provided by `d3rlpy <https://d3rlpy.readthedocs.io/en/latest/references/off_policy_evaluation.html>`_. The implementations of Minimax Learning is available at :class:`scope_rl.ope.weight_value_learning`. Parameters ------- estimator_name: str, default="cdf_dm" Name of the estimator. References ------- Yash Chandak, Scott Niekum, Bruno Castro da Silva, Erik Learned-Miller, Emma Brunskill, and Philip S. Thomas. "Universal Off-Policy Evaluation." 2021. Audrey Huang, Liu Leqi, Zachary C. Lipton, and Kamyar Azizzadenesheli. "Off-Policy Risk Assessment in Contextual Bandits." 2021. Masatoshi Uehara, Jiawei Huang, and Nan Jiang. "Minimax Weight and Q-Function Learning for Off-Policy Evaluation." 2020. Hoang Le, Cameron Voloshin, and Yisong Yue. "Batch Policy Learning under Constraints." 2019. """ estimator_name: str = "cdf_dm" def __post_init__(self): self.action_type = "discrete"
[docs] def estimate_cumulative_distribution_function( self, step_per_trajectory: int, reward: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, **kwargs, ) -> Tuple[np.ndarray]: """Estimate the cumulative distribution function (CDF) of the reward distribution under the evaluation policy. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_cumulative_distribution_function: ndarray of shape (n_partition, ) or (n_episode, ) Estimated cumulative distribution function for the pre-defined reward scale. """ check_scalar( step_per_trajectory, name="step_per_trajectory", target_type=int, min_val=1 ) check_scalar(gamma, name="gamma", target_type=float, min_val=0.0, max_val=1.0) check_array(reward, name="reward", expected_dim=1) check_array( evaluation_policy_action_dist, name="evaluation_policy_action_dist", expected_dim=2, ) check_array( state_action_value_prediction, name="state_action_value_prediction", expected_dim=2, ) check_array( reward_scale, name="reward_scale", expected_dim=1, ) if not ( reward.shape[0] == evaluation_policy_action_dist.shape[0] == state_action_value_prediction.shape[0] ): raise ValueError( "Expected `reward.shape[0] == evaluation_policy_action_dist.shape[0] == state_action_value_prediction.shape[0]`" ", but found False" ) if reward.shape[0] % step_per_trajectory: raise ValueError( "Expected `reward.shape[0] \\% step_per_trajectory == 0`, but found False" ) if ( evaluation_policy_action_dist.shape[1] != state_action_value_prediction.shape[1] ): raise ValueError( "Expected evaluation_policy_action_dist.shape[1] == state_action_value_prediction.shape[1], but found False" ) if not np.allclose( evaluation_policy_action_dist.sum(axis=1), np.ones(evaluation_policy_action_dist.shape[0]), ): raise ValueError( "Expected `evaluation_policy_action_dist.sum(axis=1) == np.ones(evaluation_policy_action_dist.shape[0])`" ", but found it False" ) ( trajectory_wise_reward, trajectory_wise_importance_weight, initial_state_value_prediction, ) = self._aggregate_trajectory_wise_statistics_discrete( step_per_trajectory=step_per_trajectory, evaluation_policy_action_dist=evaluation_policy_action_dist, state_action_value_prediction=state_action_value_prediction, ) initial_state_value_prediction = np.clip( initial_state_value_prediction, reward_scale.min(), reward_scale.max() ) density = np.histogram( initial_state_value_prediction, bins=reward_scale, density=True )[0] probability_density_function = density * np.diff(reward_scale) return np.insert(probability_density_function, 0, 0).cumsum()
[docs] def estimate_mean( self, step_per_trajectory: int, reward: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, **kwargs, ) -> float: """Estimate mean. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_mean: float Estimated mean of the reward under the evaluation policy. """ cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, reward=reward, evaluation_policy_action_dist=evaluation_policy_action_dist, state_action_value_prediction=state_action_value_prediction, reward_scale=reward_scale, gamma=gamma, ) return (np.diff(cumulative_density) * reward_scale[1:]).sum()
[docs] def estimate_variance( self, step_per_trajectory: int, reward: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, **kwargs, ) -> float: """Estimate variance. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_variance: float Estimated variance of the reward under the evaluation policy. """ cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, reward=reward, evaluation_policy_action_dist=evaluation_policy_action_dist, state_action_value_prediction=state_action_value_prediction, reward_scale=reward_scale, gamma=gamma, ) mean = (np.diff(cumulative_density) * reward_scale[1:]).sum() return (np.diff(cumulative_density) * (reward_scale[1:] - mean) ** 2).sum()
[docs] def estimate_conditional_value_at_risk( self, step_per_trajectory: int, reward: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, alphas: Optional[np.ndarray] = None, **kwargs, ): """Estimate conditional value at risk. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. alphas: array-like of shape (n_alpha, ), default=None Set of proportions of the shaded region. The values should be within `[0, 1)`. If `None` is given, :class:`np.linspace(0, 1, 21)` will be used. Return ------- estimated_conditional_value_at_risk: ndarray of (n_alpha, ) Estimated conditional value at risk (CVaR) of the reward under the evaluation policy. """ if alphas is None: alphas = np.linspace(0, 1, 21) check_array(alphas, name="alphas", expected_dim=1, min_val=0.0, max_val=1.0) alphas = np.sort(alphas) cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, reward=reward, evaluation_policy_action_dist=evaluation_policy_action_dist, state_action_value_prediction=state_action_value_prediction, reward_scale=reward_scale, gamma=gamma, ) cvar = np.zeros_like(alphas) for i, alpha in enumerate(alphas): idx_ = np.nonzero(cumulative_density[1:] > alpha)[0] if len(idx_) == 0: cvar[i] = ( np.diff(cumulative_density) * reward_scale[1:] ).sum() / cumulative_density[-1] elif idx_[0] == 0: cvar[i] = reward_scale[1] else: lower_idx_ = idx_[0] relative_probability_density = ( np.diff(cumulative_density)[: lower_idx_ + 1] / cumulative_density[lower_idx_ + 1] ) cvar[i] = ( relative_probability_density * reward_scale[1 : lower_idx_ + 2] ).sum() return cvar
[docs] def estimate_interquartile_range( self, step_per_trajectory: int, reward: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, alpha: float = 0.05, **kwargs, ) -> Dict[str, float]: """Estimate interquartile range. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. alpha: float, default=0.05 Proportion of the shaded region. Return ------- estimated_interquartile_range: dict Estimated interquartile range of the reward under the evaluation policy. .. code-block:: python key: [ mean, {100 * (1. - alpha)}% quartile (lower), {100 * (1. - alpha)}% quartile (upper), ] """ check_scalar(alpha, name="alpha", target_type=float, min_val=0.0, max_val=0.5) cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, reward=reward, evaluation_policy_action_dist=evaluation_policy_action_dist, state_action_value_prediction=state_action_value_prediction, reward_scale=reward_scale, gamma=gamma, ) lower_idx_ = np.nonzero(cumulative_density > alpha)[0] median_idx_ = np.nonzero(cumulative_density > 0.5)[0] upper_idx_ = np.nonzero(cumulative_density > 1 - alpha)[0] estimated_interquartile_range = { "median": self._target_value_given_idx( median_idx_, reward_scale=reward_scale ), f"{100 * (1. - alpha)}% quartile (lower)": self._target_value_given_idx( lower_idx_, reward_scale=reward_scale, ), f"{100 * (1. - alpha)}% quartile (upper)": self._target_value_given_idx( upper_idx_, reward_scale=reward_scale, ), } return estimated_interquartile_range
[docs]@dataclass class CumulativeDistributionTIS( BaseCumulativeDistributionOPEEstimator, ): """Trajectory-wise Importance Sampling (TIS) for estimating the cumulative distribution function (CDF) for discrete action spaces. Bases: :class:`scope_rl.ope.BaseCumulativeDistributionOPEEstimator` Imported as: :class:`scope_rl.ope.discrete.CumulativeDistributionTIS` Note ------- TIS estimates the CDF via trajectory-wise importance weighting as follows. .. math:: \\hat{F}_{\\mathrm{TIS}}(m, \\pi; \\mathcal{D}) := \\frac{1}{n} \\sum_{i=1}^n w_{0:T-1}^{(i)} \\mathbb{I} \\left \\{\\sum_{t=0}^{T-1} \\gamma^t r_t^{(i)} \\leq m \\right \\} where :math:`\\hat{F}(\\cdot)` is the estimated cumulative distribution function, :math:`w_{0:T-1} := \\prod_{t=0}^{T-1} (\\pi(a_t \\mid s_t) / \\pi_0(a_t \\mid s_t))` is the trajectory-wise importance weight, and :math:`\\mathbb{I} \\{ \\cdot \\}` is the indicator function. TIS enables an unbiased estimation of the policy value. However, when the trajectory length (:math:`T`) is large, TIS suffers from high variance due to the product of importance weights over the entire horizon. Parameters ------- estimator_name: str, default="cdf_tis" Name of the estimator. References ------- Yash Chandak, Scott Niekum, Bruno Castro da Silva, Erik Learned-Miller, Emma Brunskill, and Philip S. Thomas. "Universal Off-Policy Evaluation." 2021. Audrey Huang, Liu Leqi, Zachary C. Lipton, and Kamyar Azizzadenesheli. "Off-Policy Risk Assessment in Contextual Bandits." 2021. """ estimator_name: str = "cdf_tis" def __post_init__(self): self.action_type = "discrete"
[docs] def estimate_cumulative_distribution_function( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, **kwargs, ) -> Tuple[np.ndarray]: """Estimate the cumulative distribution function (CDF) of the reward distribution under the evaluation policy. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_cumulative_distribution_function: ndarray of shape (n_partition, ) or (n_episode, ) Estimated cumulative distribution function for the pre-defined reward scale. """ check_scalar( step_per_trajectory, name="step_per_trajectory", target_type=int, min_val=1 ) check_scalar(gamma, name="gamma", target_type=float, min_val=0.0, max_val=1.0) check_array(reward, name="reward", expected_dim=1) check_array( pscore, name="pscore", expected_dim=1, min_val=0.0, max_val=1.0, ) check_array( evaluation_policy_action_dist, name="evaluation_policy_action_dist", expected_dim=2, min_val=0.0, max_val=1.0, ) check_array( action, name="action", expected_dim=1, min_val=0, max_val=evaluation_policy_action_dist.shape[1] - 1, ) check_array( reward_scale, name="reward_scale", expected_dim=1, ) if not ( action.shape[0] == reward.shape[0] == pscore.shape[0] == evaluation_policy_action_dist.shape[0] ): raise ValueError( "Expected `action.shape[0] == reward.shape[0] == pscore.shape[0] == evaluation_policy_trajectory_wise_pscore.shape[0]`, " "but found False" ) if action.shape[0] % step_per_trajectory: raise ValueError( "Expected `action.shape[0] \\% step_per_trajectory == 0`, but found False" ) if not np.allclose( evaluation_policy_action_dist.sum(axis=1), np.ones(evaluation_policy_action_dist.shape[0]), ): raise ValueError( "Expected `evaluation_policy_action_dist.sum(axis=1) == np.ones(evaluation_policy_action_dist.shape[0])`" ", but found it False" ) ( trajectory_wise_reward, trajectory_wise_importance_weight, initial_state_value_prediction, ) = self._aggregate_trajectory_wise_statistics_discrete( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, gamma=gamma, ) n = len(trajectory_wise_reward) sort_idxes = trajectory_wise_reward.argsort() sorted_importance_weight = trajectory_wise_importance_weight[sort_idxes] cumulative_density = np.clip(sorted_importance_weight.cumsum() / n, 0, 1) trajectory_wise_reward = np.clip( trajectory_wise_reward, reward_scale.min(), reward_scale.max() ) histogram = np.histogram( trajectory_wise_reward, bins=reward_scale, density=False )[0] idx = histogram.cumsum().astype(int) - 1 idx = np.where(idx < 0, 0, idx) cumulative_density = cumulative_density[idx] return np.insert(cumulative_density, 0, 0)
[docs] def estimate_mean( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, **kwargs, ) -> float: """Estimate mean. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_mean: float Estimated mean of the reward under the evaluation policy. """ cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, reward_scale=reward_scale, gamma=gamma, ) return (np.diff(cumulative_density) * reward_scale[1:]).sum()
[docs] def estimate_variance( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, **kwargs, ) -> float: """Estimate variance. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_variance: float Estimated variance of the reward under the evaluation policy. """ cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, reward_scale=reward_scale, gamma=gamma, ) mean = (np.diff(cumulative_density) * reward_scale[1:]).sum() return (np.diff(cumulative_density) * (reward_scale[1:] - mean) ** 2).sum()
[docs] def estimate_conditional_value_at_risk( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, alphas: Optional[np.ndarray] = None, **kwargs, ): """Estimate conditional value at risk. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. alphas: array-like of shape (n_alpha, ), default=None Set of proportions of the shaded region. The values should be within `[0, 1)`. If `None` is given, :class:`np.linspace(0, 1, 21)` will be used. Return ------- estimated_conditional_value_at_risk: ndarray of (n_alpha, ) Estimated conditional value at risk (CVaR) of the reward under the evaluation policy. """ if alphas is None: alphas = np.linspace(0, 1, 21) check_array(alphas, name="alphas", expected_dim=1, min_val=0.0, max_val=1.0) alphas = np.sort(alphas) cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, reward_scale=reward_scale, gamma=gamma, ) cvar = np.zeros_like(alphas) for i, alpha in enumerate(alphas): idx_ = np.nonzero(cumulative_density[1:] > alpha)[0] if len(idx_) == 0: cvar[i] = ( np.diff(cumulative_density) * reward_scale[1:] ).sum() / cumulative_density[-1] elif idx_[0] == 0: cvar[i] = reward_scale[1] else: lower_idx_ = idx_[0] relative_probability_density = ( np.diff(cumulative_density)[: lower_idx_ + 1] / cumulative_density[lower_idx_ + 1] ) cvar[i] = ( relative_probability_density * reward_scale[1 : lower_idx_ + 2] ).sum() return cvar
[docs] def estimate_interquartile_range( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, alpha: float = 0.05, **kwargs, ) -> Dict[str, float]: """Estimate interquartile range. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. alpha: float, default=0.05 Proportion of the shaded region. Return ------- estimated_interquartile_range: dict Estimated interquartile range of the reward under the evaluation policy. .. code-block:: python key: [ mean, {100 * (1. - alpha)}% quartile (lower), {100 * (1. - alpha)}% quartile (upper), ] """ check_scalar(alpha, name="alpha", target_type=float, min_val=0.0, max_val=0.5) cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, reward_scale=reward_scale, gamma=gamma, ) lower_idx_ = np.nonzero(cumulative_density > alpha)[0] median_idx_ = np.nonzero(cumulative_density > 0.5)[0] upper_idx_ = np.nonzero(cumulative_density > 1 - alpha)[0] estimated_interquartile_range = { "median": self._target_value_given_idx( median_idx_, reward_scale=reward_scale ), f"{100 * (1. - alpha)}% quartile (lower)": self._target_value_given_idx( lower_idx_, reward_scale=reward_scale, ), f"{100 * (1. - alpha)}% quartile (upper)": self._target_value_given_idx( upper_idx_, reward_scale=reward_scale, ), } return estimated_interquartile_range
[docs]@dataclass class CumulativeDistributionTDR( BaseCumulativeDistributionOPEEstimator, ): """Trajectory-wise Doubly Robust (TDR) for estimating the cumulative distribution function (CDF) for discrete action spaces. Bases: :class:`scope_rl.ope.BaseCumulativeDistributionOPEEstimator` Imported as: :class:`scope_rl.ope.discrete.CumulativeDistributionTrajectoryWiseDR` Note ------- TDR estimates the CDF via trajectory-wise importance weighting and estimated Q-function :math:`\\hat{Q}` as follows. .. math:: \\hat{F}_{\\mathrm{TDR}}(m, \\pi; \\mathcal{D}) &:= \\frac{1}{n} \\sum_{i=1}^n \\sum_{a \\in \\mathcal{A}} \\pi(a \\mid s_0^{(i)}) \\hat{G}(m; s_0^{(i)}, a) \\\\ & \quad \quad + \\frac{1}{n} \\sum_{i=1}^n w_{0:T-1}^{(i)} \\left( \\mathbb{I} \\left \\{\\sum_{t=0}^{T-1} \\gamma^t r_t^{(i)} \\leq m \\right \\} - \\hat{G}(m; s_0^{(i)}, a_0^{(i)}) \\right) where :math:`\\hat{F}(\\cdot)` is the estimated cumulative distribution function and :math:`\\hat{G}(\\cdot;s,a)` is an estimator for :math:`\\mathbb{E} \\left[ \\mathbb{I} \\left \\{\\sum_{t=0}^{T-1} \\gamma^t r_t \\leq m \\right \\} \\mid s,a \\right]`. :math:`w_{0:T-1} := \\prod_{t=0}^{T-1} (\\pi(a_t \\mid s_t) / \\pi_0(a_t \\mid s_t))` is the trajectory-wise importance weight and :math:`\\mathbb{I} \\{ \\cdot \\}` is the indicator function. TDR is unbiased and has lower variance than TIS when :math:`\\hat{Q}(\\cdot)` is reasonably accurate and satisfies :math:`0 < \\hat{Q}(\\cdot) < 2 Q(\\cdot)`. However, when the importance weight is quite large, it may still suffer from a high variance. Parameters ------- estimator_name: str, default="cdf_tdr" Name of the estimator. References ------- Yash Chandak, Scott Niekum, Bruno Castro da Silva, Erik Learned-Miller, Emma Brunskill, and Philip S. Thomas. "Universal Off-Policy Evaluation." 2021. Audrey Huang, Liu Leqi, Zachary C. Lipton, and Kamyar Azizzadenesheli. "Off-Policy Risk Assessment in Contextual Bandits." 2021. """ estimator_name: str = "cdf_tdr" def __post_init__(self): self.action_type = "discrete"
[docs] def estimate_cumulative_distribution_function( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, **kwargs, ) -> Tuple[np.ndarray]: """Estimate the cumulative distribution function (CDF) of the reward distribution under the evaluation policy. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_cumulative_distribution_function: ndarray of shape (n_partition, ) or (n_episode, ) Estimated cumulative distribution function for the pre-defined reward scale. """ check_scalar( step_per_trajectory, name="step_per_trajectory", target_type=int, min_val=1 ) check_scalar(gamma, name="gamma", target_type=float, min_val=0.0, max_val=1.0) check_array(reward, name="reward", expected_dim=1) check_array( pscore, name="pscore", expected_dim=1, min_val=0.0, max_val=1.0, ) check_array( evaluation_policy_action_dist, name="evaluation_policy_action_dist", expected_dim=2, min_val=0.0, max_val=1.0, ) check_array( action, name="action", expected_dim=1, min_val=0, max_val=evaluation_policy_action_dist.shape[1] - 1, ) check_array( state_action_value_prediction, name="state_action_value_prediction", expected_dim=2, ) check_array( reward_scale, name="reward_scale", expected_dim=1, ) if not ( action.shape[0] == reward.shape[0] == pscore.shape[0] == evaluation_policy_action_dist.shape[0] == state_action_value_prediction.shape[0] ): raise ValueError( "Expected `action.shape[0] == reward.shape[0] == pscore.shape[0] == evaluation_policy_trajectory_wise_pscore.shape[0] " "== state_action_value_prediction.shape[0]`, but found False" ) if action.shape[0] % step_per_trajectory: raise ValueError( "Expected `action.shape[0] \\% step_per_trajectory == 0`, but found False" ) if ( evaluation_policy_action_dist.shape[1] != state_action_value_prediction.shape[1] ): raise ValueError( "Expected evaluation_policy_action_dist.shape[1] == state_action_value_prediction.shape[1], but found False" ) if not np.allclose( evaluation_policy_action_dist.sum(axis=1), np.ones(evaluation_policy_action_dist.shape[0]), ): raise ValueError( "Expected `evaluation_policy_action_dist.sum(axis=1) == np.ones(evaluation_policy_action_dist.shape[0])`" ", but found it False" ) ( trajectory_wise_reward, trajectory_wise_importance_weight, initial_state_value_prediction, ) = self._aggregate_trajectory_wise_statistics_discrete( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, state_action_value_prediction=state_action_value_prediction, gamma=gamma, ) trajectory_wise_reward = np.clip( trajectory_wise_reward, reward_scale.min(), reward_scale.max() ) initial_state_value_prediction = np.clip( initial_state_value_prediction, reward_scale.min(), reward_scale.max() ) weighted_residual = np.zeros_like(reward_scale) for i, threshold in enumerate(reward_scale): observation = (trajectory_wise_reward <= threshold).astype(int) prediction = (initial_state_value_prediction <= threshold).astype(int) weighted_residual[i] = ( trajectory_wise_importance_weight * (observation - prediction) ).mean() histogram_baseline = np.histogram( initial_state_value_prediction, bins=reward_scale, density=True )[0] histogram_baseline = (histogram_baseline * np.diff(reward_scale)).cumsum() histogram_baseline = np.insert(histogram_baseline, 0, 0) cumulative_density = weighted_residual + histogram_baseline return np.clip(np.maximum.accumulate(cumulative_density), 0, 1)
[docs] def estimate_mean( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, **kwargs, ) -> float: """Estimate mean. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_mean: float Estimated mean of the reward under the evaluation policy. """ cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, state_action_value_prediction=state_action_value_prediction, reward_scale=reward_scale, gamma=gamma, ) return (np.diff(cumulative_density) * reward_scale[1:]).sum()
[docs] def estimate_variance( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, **kwargs, ) -> float: """Estimate variance. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_variance: float Estimated variance of the reward under the evaluation policy. """ cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, state_action_value_prediction=state_action_value_prediction, reward_scale=reward_scale, gamma=gamma, ) mean = (np.diff(cumulative_density) * reward_scale[1:]).sum() return (np.diff(cumulative_density) * (reward_scale[1:] - mean) ** 2).sum()
[docs] def estimate_conditional_value_at_risk( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, alphas: Optional[np.ndarray] = None, **kwargs, ): """Estimate conditional value at risk. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. alphas: array-like of shape (n_alpha, ), default=None Set of proportions of the shaded region. The values should be within `[0, 1)`. If `None` is given, :class:`np.linspace(0, 1, 21)` will be used. Return ------- estimated_conditional_value_at_risk: ndarray of (n_alpha, ) Estimated conditional value at risk (CVaR) of the reward under the evaluation policy. """ if alphas is None: alphas = np.linspace(0, 1, 21) check_array(alphas, name="alphas", expected_dim=1, min_val=0.0, max_val=1.0) alphas = np.sort(alphas) cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, state_action_value_prediction=state_action_value_prediction, reward_scale=reward_scale, gamma=gamma, ) cvar = np.zeros_like(alphas) for i, alpha in enumerate(alphas): idx_ = np.nonzero(cumulative_density[1:] > alpha)[0] if len(idx_) == 0: cvar[i] = ( np.diff(cumulative_density) * reward_scale[1:] ).sum() / cumulative_density[-1] elif idx_[0] == 0: cvar[i] = reward_scale[1] else: lower_idx_ = idx_[0] relative_probability_density = ( np.diff(cumulative_density)[: lower_idx_ + 1] / cumulative_density[lower_idx_ + 1] ) cvar[i] = ( relative_probability_density * reward_scale[1 : lower_idx_ + 2] ).sum() return cvar
[docs] def estimate_interquartile_range( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, alpha: float = 0.05, **kwargs, ) -> Dict[str, float]: """Estimate interquartile range. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. alpha: float, default=0.05 Proportion of the shaded region. Return ------- estimated_interquartile_range: dict Estimated interquartile range of the reward under the evaluation policy. .. code-block:: python key: [ mean, {100 * (1. - alpha)}% quartile (lower), {100 * (1. - alpha)}% quartile (upper), ] """ check_scalar(alpha, name="alpha", target_type=float, min_val=0.0, max_val=0.5) cumulative_density = self.estimate_cumulative_distribution_function( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, state_action_value_prediction=state_action_value_prediction, reward_scale=reward_scale, gamma=gamma, ) lower_idx_ = np.nonzero(cumulative_density > alpha)[0] median_idx_ = np.nonzero(cumulative_density > 0.5)[0] upper_idx_ = np.nonzero(cumulative_density > 1 - alpha)[0] estimated_interquartile_range = { "median": self._target_value_given_idx( median_idx_, reward_scale=reward_scale ), f"{100 * (1. - alpha)}% quartile (lower)": self._target_value_given_idx( lower_idx_, reward_scale=reward_scale, ), f"{100 * (1. - alpha)}% quartile (upper)": self._target_value_given_idx( upper_idx_, reward_scale=reward_scale, ), } return estimated_interquartile_range
[docs]@dataclass class CumulativeDistributionSNTIS( CumulativeDistributionTIS, ): """Self Normalized Trajectory-wise Importance Sampling (SNTIS) for estimating the cumulative distribution function (CDF) for discrete action spaces. Bases: :class:`scope_rl.ope.discrete.CumulativeDistributionTIS` :class:`scope_rl.ope.BaseCumulativeDistributionOPEEstimator` Imported as: :class:`scope_rl.ope.discrete.CumulativeDistributionSNTIS` Note ------- SNTIS estimates the CDF via trajectory-wise importance weighting as follows. .. math:: \\hat{F}_{\\mathrm{SNTIS}}(m, \\pi; \\mathcal{D})) := \\sum_{i=1}^n \\frac{w_{0:T-1}^{(i)}}{\\sum_{i'=1}^n w_{0:T-1}^{(i')}} \\mathbb{I} \\left \\{\\sum_{t=0}^{T-1} \\gamma^t r_t^{(i)} \\leq m \\right \\} where :math:`\\hat{F}(\\cdot)` is the estimated cumulative distribution function, :math:`w_{0:T-1} := \\prod_{t=0}^{T-1} (\\pi(a_t \\mid s_t) / \\pi_0(a_t \\mid s_t))` is the trajectory-wise importance weight, and :math:`\\mathbb{I} \\{ \\cdot \\}` is the indicator function. The self-normalized estimator is no longer unbiased, but has a bounded variance while also remaining consistent. Parameters ------- estimator_name: str, default="cdf_sntis" Name of the estimator. References ------- Yash Chandak, Scott Niekum, Bruno Castro da Silva, Erik Learned-Miller, Emma Brunskill, and Philip S. Thomas. "Universal Off-Policy Evaluation." 2021. Audrey Huang, Liu Leqi, Zachary C. Lipton, and Kamyar Azizzadenesheli. "Off-Policy Risk Assessment in Contextual Bandits." 2021. """ estimator_name: str = "cdf_sntis" def __post_init__(self): self.action_type = "discrete"
[docs] def estimate_cumulative_distribution_function( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, **kwargs, ) -> Tuple[np.ndarray]: """Estimate the cumulative distribution function (CDF) of the reward distribution under the evaluation policy. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_cumulative_distribution_function: ndarray of shape (n_partition, ) or (n_episode, ) Estimated cumulative distribution function for the pre-defined reward scale. """ check_scalar( step_per_trajectory, name="step_per_trajectory", target_type=int, min_val=1 ) check_scalar(gamma, name="gamma", target_type=float, min_val=0.0, max_val=1.0) check_array(reward, name="reward", expected_dim=1) check_array( pscore, name="pscore", expected_dim=1, min_val=0.0, max_val=1.0, ) check_array( evaluation_policy_action_dist, name="evaluation_policy_action_dist", expected_dim=2, min_val=0.0, max_val=1.0, ) check_array( action, name="action", expected_dim=1, min_val=0, max_val=evaluation_policy_action_dist.shape[1] - 1, ) check_array( reward_scale, name="reward_scale", expected_dim=1, ) if not ( action.shape[0] == reward.shape[0] == pscore.shape[0] == evaluation_policy_action_dist.shape[0] ): raise ValueError( "Expected `action.shape[0] == reward.shape[0] == pscore.shape[0] == evaluation_policy_trajectory_wise_pscore.shape[0]`, " "but found False" ) if action.shape[0] % step_per_trajectory: raise ValueError( "Expected `action.shape[0] \\% step_per_trajectory == 0`, but found False" ) if not np.allclose( evaluation_policy_action_dist.sum(axis=1), np.ones(evaluation_policy_action_dist.shape[0]), ): raise ValueError( "Expected `evaluation_policy_action_dist.sum(axis=1) == np.ones(evaluation_policy_action_dist.shape[0])`" ", but found it False" ) ( trajectory_wise_reward, trajectory_wise_importance_weight, initial_state_value_prediction, ) = self._aggregate_trajectory_wise_statistics_discrete( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, gamma=gamma, ) weight_sum = trajectory_wise_importance_weight.sum() sort_idxes = trajectory_wise_reward.argsort() sorted_importance_weight = trajectory_wise_importance_weight[sort_idxes] cumulative_density = np.clip( sorted_importance_weight.cumsum() / weight_sum, 0, 1 ) trajectory_wise_reward = np.clip( trajectory_wise_reward, reward_scale.min(), reward_scale.max() ) histogram = np.histogram( trajectory_wise_reward, bins=reward_scale, density=False )[0] idx = histogram.cumsum().astype(int) - 1 idx = np.where(idx < 0, 0, idx) cumulative_density = cumulative_density[idx] return np.insert(cumulative_density, 0, 0)
[docs]@dataclass class CumulativeDistributionSNTDR( CumulativeDistributionTDR, ): """Self Normalized Trajectory-wise Doubly Robust (SNTDR) for estimating the cumulative distribution function (CDF) for discrete action spaces. Bases: :class:`scope_rl.ope.discrete.CumulativeDistributionTDR` :class:`scope_rl.ope.BaseCumulativeDistributionOPEEstimator` Imported as: :class:`scope_rl.ope.discrete.CumulativeDistributionSNTDR` Note ------- SNTDR estimates the CDF via trajectory-wise importance weighting and estimated Q-function :math:`\\hat{Q}` as follows. .. math:: \\hat{F}_{\\mathrm{SNTDR}}(m, \\pi; \\mathcal{D})) &:= \\frac{1}{n} \\sum_{i=1}^n \\sum_{a \\in \\mathcal{A}} \\pi(a \\mid s_0^{(t)}) \\hat{G}(m; s_0^{(t)}, a) \\\\ & \quad \quad + \\sum_{i=1}^n \\frac{w_{0:T-1}^{(i)}}{\\sum_{i'=1}^n w_{0:T-1}^{(i')}} \\left( \\mathbb{I} \\left \\{\\sum_{t=0}^{T-1} \\gamma^t r_t^{(i)} \\leq m \\right \\} - \\hat{G}(m; s_0^{(i)}, a_0^{(i)}) \\right) where :math:`\\hat{F}(\\cdot)` is the estimated cumulative distribution function and :math:`\\hat{G}(\\cdot)` is an estimator for :math:`\\mathbb{E} \left[ \\mathbb{I} \\left \\{\\sum_{t=0}^{T-1} \\gamma^t r_t \\leq m \\right \\} \\mid s,a \\right]`. :math:`w_{0:T-1} := \\prod_{t=0}^{T-1} (\\pi(a_t \\mid s_t) / \\pi_0(a_t \\mid s_t))` is the trajectory-wise importance weight and and :math:`\\mathbb{I} \\{ \\cdot \\}` is the indicator function. The self-normalized estimator is no longer unbiased, but has a bounded variance while also remaining consistent. Parameters ------- estimator_name: str, default="cdf_sntdr" Name of the estimator. References ------- Yash Chandak, Scott Niekum, Bruno Castro da Silva, Erik Learned-Miller, Emma Brunskill, and Philip S. Thomas. "Universal Off-Policy Evaluation." 2021. Audrey Huang, Liu Leqi, Zachary C. Lipton, and Kamyar Azizzadenesheli. "Off-Policy Risk Assessment in Contextual Bandits." 2021. """ estimator_name: str = "cdf_sntdr" def __post_init__(self): self.action_type = "discrete"
[docs] def estimate_cumulative_distribution_function( self, step_per_trajectory: int, action: np.ndarray, reward: np.ndarray, pscore: np.ndarray, evaluation_policy_action_dist: np.ndarray, state_action_value_prediction: np.ndarray, reward_scale: np.ndarray, gamma: float = 1.0, **kwargs, ) -> Tuple[np.ndarray]: """Estimate the cumulative distribution function (CDF) of the reward distribution under the evaluation policy. Parameters ------- step_per_trajectory: int (> 0) Number of timesteps in an episode. action: array-like of shape (n_trajectories * step_per_trajectory, ) Action chosen by the behavior policy. reward: array-like of shape (n_trajectories * step_per_trajectory, ) Observed immediate rewards. pscore: array-like of shape (n_trajectories * step_per_trajectory, ) Conditional action choice probability of the behavior policy, i.e., :math:`\\pi_0(a \\mid s)` evaluation_policy_action_dist: array-like of shape (n_trajectories * step_per_trajectory, n_action) Conditional action distribution induced by the evaluation policy, i.e., :math:`\\pi(a \\mid s_t) \\forall a \\in \\mathcal{A}` state_action_value_prediction: array-like of shape (n_trajectories * step_per_trajectory, n_action) :math:`\\hat{Q}` for all actions, i.e., :math:`\\hat{Q}(s_t, a) \\forall a \\in \\mathcal{A}`. reward_scale: array-like of shape (n_partition, ) Scale of the trajectory-wise reward used for x-axis of the CDF plot. gamma: float, default=1.0 Discount factor. The value should be within (0, 1]. Return ------- estimated_cumulative_distribution_function: ndarray of shape (n_partition, ) or (n_episode, ) Estimated cumulative distribution function for the pre-defined reward scale. """ check_scalar( step_per_trajectory, name="step_per_trajectory", target_type=int, min_val=1 ) check_scalar(gamma, name="gamma", target_type=float, min_val=0.0, max_val=1.0) check_array(reward, name="reward", expected_dim=1) check_array( pscore, name="pscore", expected_dim=1, min_val=0.0, max_val=1.0, ) check_array( evaluation_policy_action_dist, name="evaluation_policy_action_dist", expected_dim=2, min_val=0.0, max_val=1.0, ) check_array( state_action_value_prediction, name="state_action_value_prediction", expected_dim=2, ) check_array( reward_scale, name="reward_scale", expected_dim=1, ) if not ( action.shape[0] == reward.shape[0] == pscore.shape[0] == evaluation_policy_action_dist.shape[0] == state_action_value_prediction.shape[0] ): raise ValueError( "Expected `action.shape[0] == reward.shape[0] == pscore.shape[0] == evaluation_policy_trajectory_wise_pscore.shape[0] " "== state_action_value_prediction.shape[0]`, but found False" ) if action.shape[0] % step_per_trajectory: raise ValueError( "Expected `action.shape[0] \\% step_per_trajectory == 0`, but found False" ) if ( evaluation_policy_action_dist.shape[1] != state_action_value_prediction.shape[1] ): raise ValueError( "Expected evaluation_policy_action_dist.shape[1] == state_action_value_prediction.shape[1], but found False" ) if not np.allclose( evaluation_policy_action_dist.sum(axis=1), np.ones(evaluation_policy_action_dist.shape[0]), ): raise ValueError( "Expected `evaluation_policy_action_dist.sum(axis=1) == np.ones(evaluation_policy_action_dist.shape[0])`" ", but found it False" ) ( trajectory_wise_reward, trajectory_wise_importance_weight, initial_state_value_prediction, ) = self._aggregate_trajectory_wise_statistics_discrete( step_per_trajectory=step_per_trajectory, action=action, reward=reward, pscore=pscore, evaluation_policy_action_dist=evaluation_policy_action_dist, state_action_value_prediction=state_action_value_prediction, gamma=gamma, ) trajectory_wise_reward = np.clip( trajectory_wise_reward, reward_scale.min(), reward_scale.max() ) initial_state_value_prediction = np.clip( initial_state_value_prediction, reward_scale.min(), reward_scale.max() ) weighted_residual = np.zeros_like(reward_scale) for i, threshold in enumerate(reward_scale): observation = (trajectory_wise_reward <= threshold).astype(int) prediction = (initial_state_value_prediction <= threshold).astype(int) weighted_residual[i] = ( trajectory_wise_importance_weight * (observation - prediction) ).sum() / trajectory_wise_importance_weight.sum() histogram_baseline = np.histogram( initial_state_value_prediction, bins=reward_scale, density=True )[0] histogram_baseline = (histogram_baseline * np.diff(reward_scale)).cumsum() histogram_baseline = np.insert(histogram_baseline, 0, 0) cumulative_density = weighted_residual + histogram_baseline return np.clip(np.maximum.accumulate(cumulative_density), 0, 1)