Example Codes for Custom OPE Estimators#
Here, we show example codes for implementing custom OPE estimators.
Estimator for Basic OPE#
To define a custom OPE estimator, use BaseOffPolicyEstimator
and implement the following abstract methods.
_estimate_trajectory_value: estimate the policy value for each trajectory in the logged dataset
estimate_policy_value: estimate the policy value for a logged dataset
estimate_interval: estimate confidence intervals of the OPE estimate for a logged dataset
Below, we provide an example of implementing a naive average estimator, which simply returns the average of the (discounted) rewards observed in the logged dataset.
from dataclasses import dataclass
from typing import Dict, Optional

import numpy as np

from scope_rl.ope import BaseOffPolicyEstimator


@dataclass
class NaiveAverageEstimator(BaseOffPolicyEstimator):
    estimator_name = "naive_average"

    def __post_init__(self):
        self.action_type = "discrete"  # "continuous"

    def _estimate_trajectory_value(
        self,
        step_per_trajectory: int,  # length of a trajectory
        reward: np.ndarray,        # step-wise reward
        gamma: float = 1.0,        # discount factor
        **kwargs,
    ) -> np.ndarray:
        """Estimate the policy value for each trajectory in the logged dataset."""
        reward = reward.reshape((-1, step_per_trajectory))
        discount = np.full(step_per_trajectory, gamma).cumprod() / gamma
        estimated_trajectory_value = (discount[np.newaxis, :] * reward).sum(axis=1)
        return estimated_trajectory_value

    def estimate_policy_value(
        self,
        step_per_trajectory: int,
        reward: np.ndarray,
        gamma: float = 1.0,
        **kwargs,
    ) -> float:
        """Estimate the policy value for a logged dataset."""
        estimated_policy_value = self._estimate_trajectory_value(
            step_per_trajectory=step_per_trajectory,
            reward=reward,
            gamma=gamma,
        ).mean()
        return estimated_policy_value

    def estimate_interval(
        self,
        step_per_trajectory: int,
        reward: np.ndarray,
        gamma: float = 1.0,
        alpha: float = 0.05,
        ci: str = "bootstrap",
        n_bootstrap_samples: int = 10000,
        random_state: Optional[int] = None,
        **kwargs,
    ) -> Dict[str, float]:
        """Estimate confidence intervals of the OPE estimate for a logged dataset."""
        estimated_trajectory_value = self._estimate_trajectory_value(
            step_per_trajectory=step_per_trajectory,
            reward=reward,
            gamma=gamma,
        )
        return self._estimate_confidence_interval[ci](
            samples=estimated_trajectory_value,
            alpha=alpha,
            n_bootstrap_samples=n_bootstrap_samples,
            random_state=random_state,
        )
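Once defined, the custom estimator can be used on its own or registered with SCOPE-RL's OPE pipeline. The following is a minimal usage sketch on synthetic data; the trajectory length, rewards, and discount factor are illustrative values, not part of SCOPE-RL.

# Minimal usage sketch of the NaiveAverageEstimator defined above (synthetic data).
import numpy as np

estimator = NaiveAverageEstimator()

# synthetic step-wise rewards: 100 trajectories of 10 steps each
synthetic_reward = np.random.default_rng(0).normal(size=100 * 10)

policy_value = estimator.estimate_policy_value(
    step_per_trajectory=10,
    reward=synthetic_reward,
    gamma=0.95,
)
# dispatches via the base class's _estimate_confidence_interval mapping (see below)
interval = estimator.estimate_interval(
    step_per_trajectory=10,
    reward=synthetic_reward,
    gamma=0.95,
    ci="bootstrap",
)

# The estimator can also be registered with SCOPE-RL's OPE pipeline, e.g.
# (assuming logged_dataset and input_dict are prepared as in the SCOPE-RL quickstart):
#   from scope_rl.ope import OffPolicyEvaluation as OPE
#   ope = OPE(logged_dataset=logged_dataset, ope_estimators=[NaiveAverageEstimator()])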
To use a custom inequality (or statistical test) for deriving confidence intervals, override the _estimate_confidence_interval property as follows.
from dataclasses import dataclass
from typing import Callable, Dict

import numpy as np
import scipy.stats

from scope_rl.ope import DiscreteTrajectoryWiseImportanceSampling


def estimate_confidence_interval_by_ttest(
    samples: np.ndarray,
    alpha: float = 0.05,
    **kwargs,
) -> Dict[str, float]:
    """Estimate confidence intervals by Student's t-test."""
    n = len(samples)
    t = scipy.stats.t.ppf(1 - alpha, n - 1)
    mean = samples.mean()
    ci = t * samples.std(ddof=1) / np.sqrt(n)
    return {
        "mean": mean,
        f"{100 * (1. - alpha)}% CI (lower)": mean - ci,
        f"{100 * (1. - alpha)}% CI (upper)": mean + ci,
    }


@dataclass
class CustomHighConfidenceTIS(DiscreteTrajectoryWiseImportanceSampling):
    @property
    def _estimate_confidence_interval(self) -> Dict[str, Callable]:
        return {
            "custom_ci": estimate_confidence_interval_by_ttest,
        }
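The ci argument of estimate_interval then selects the routine registered under that name. As a hypothetical illustration (reusing the NaiveAverageEstimator and estimate_confidence_interval_by_ttest defined above; the subclass name is ours, not part of SCOPE-RL), the same property override makes ci="custom_ci" dispatch to the t-test routine.

# Hypothetical illustration: attach the t-test routine to the NaiveAverageEstimator
# defined above by overriding the same property.
from dataclasses import dataclass
from typing import Callable, Dict

import numpy as np


@dataclass
class NaiveAverageEstimatorWithTTest(NaiveAverageEstimator):
    estimator_name = "naive_average_ttest"

    @property
    def _estimate_confidence_interval(self) -> Dict[str, Callable]:
        return {
            "custom_ci": estimate_confidence_interval_by_ttest,
        }


estimator = NaiveAverageEstimatorWithTTest()
interval = estimator.estimate_interval(
    step_per_trajectory=10,
    reward=np.random.default_rng(0).normal(size=100 * 10),  # synthetic rewards
    ci="custom_ci",  # dispatches to estimate_confidence_interval_by_ttest
)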
Note that an OPE estimator can take the following inputs from logged_dataset and input_dict (a hypothetical sketch of how an estimator consumes these inputs follows the lists below).
(logged dataset)
step_per_trajectory: number of steps in a trajectory
state: state observation
action: action chosen by the behavior policy
reward: reward observation
done: whether an episode ends or not (as a consequence of the agent's actions)
terminal: whether an episode terminates or not (due to the fixed episode length)
pscore: probability of the behavior policy choosing the observed action
(input dict)
evaluation_policy_action: action chosen by the evaluation policy (continuous action case)
evaluation_policy_action_dist: action distribution of the evaluation policy (discrete action case)
state_action_value_prediction: predicted value for the observed state-action pairs
initial_state_value_prediction: predicted value for the observed initial states
state_action_marginal_importance_weight: estimated state-action marginal importance weight
state_marginal_importance_weight: estimated state marginal importance weight
on_policy_policy_value: on-policy policy value of the evaluation policy
gamma: discount factor
Other auxiliary information
size: number of data tuples
n_trajectories: number of trajectories
action_type: type of action, either "discrete" or "continuous"
n_actions: number of actions (discrete action case)
action_dim: dimension of actions (continuous action case)
action_keys: dictionary containing the names of actions (optional)
action_meaning: np.ndarray that maps action indices to actual actions
state_dim: dimension of states
state_keys: dictionary containing the names of states (optional)
info: info obtained during the interactions of the agent
behavior_policy: name of the behavior policy
evaluation_policy: name of the evaluation policy
dataset_id: dataset id
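For example, a trajectory-wise importance sampling estimator combines action, reward, and pscore from logged_dataset with evaluation_policy_action_dist from input_dict. The following is a hypothetical sketch of such a computation in the discrete action case; the function name and array shapes are illustrative assumptions, not part of SCOPE-RL.

# Hypothetical sketch: how the inputs listed above might be consumed to compute
# trajectory-wise importance sampling values (discrete action case).
import numpy as np


def trajectory_wise_is_value(
    step_per_trajectory: int,
    action: np.ndarray,                         # logged_dataset: discrete actions of the behavior policy
    reward: np.ndarray,                         # logged_dataset: step-wise rewards
    pscore: np.ndarray,                         # logged_dataset: behavior policy's probability of the observed action
    evaluation_policy_action_dist: np.ndarray,  # input_dict: evaluation policy's action distribution, shape (n_samples, n_actions)
    gamma: float = 1.0,                         # input_dict: discount factor
) -> np.ndarray:
    """Illustrative trajectory-wise importance sampling value for each trajectory."""
    # per-step probability ratio pi_e(a_t | s_t) / pi_b(a_t | s_t)
    behavior_prob = pscore.reshape((-1, step_per_trajectory))
    evaluation_prob = evaluation_policy_action_dist[
        np.arange(len(action)), action.astype(int)
    ].reshape((-1, step_per_trajectory))
    # trajectory-wise importance weight is the product of the per-step ratios
    weight = (evaluation_prob / behavior_prob).prod(axis=1)
    # discounted trajectory-wise reward, weighted by the importance weight
    reward = reward.reshape((-1, step_per_trajectory))
    discount = np.full(step_per_trajectory, gamma).cumprod() / gamma
    return weight * (discount[np.newaxis, :] * reward).sum(axis=1)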
Estimator for Cumulative Distribution OPE#
To define a custom CD-OPE estimator, use BaseCumulativeDistributionOPEEstimator
and implement the following abstract methods.
estimate_cumulative_distribution_function: estimate the cumulative distribution function (CDF)
estimate_mean: estimate the policy value from the CDF
estimate_variance: estimate the variance from the CDF
estimate_conditional_value_at_risk: estimate the conditional value at risk (CVaR) from the CDF
estimate_interquartile_range: estimate the interquartile range from the CDF
Below, we provide an example of implementing a naive CDF estimator, which does not apply importance sampling.
from dataclasses import dataclass
from typing import Dict, Optional, Tuple

import numpy as np

from scope_rl.ope import BaseCumulativeDistributionOPEEstimator


@dataclass
class NaiveCumulativeDistributionEstimator(BaseCumulativeDistributionOPEEstimator):
    estimator_name = "naive_cdf"

    def __post_init__(self):
        self.action_type = "discrete"  # "continuous"

    def estimate_cumulative_distribution_function(
        self,
        step_per_trajectory: int,
        reward: np.ndarray,
        reward_scale: np.ndarray,  # bins of the CDF
        gamma: float = 1.0,
        **kwargs,
    ) -> Tuple[np.ndarray]:
        """Estimate the cumulative distribution function."""
        reward = reward.reshape((-1, step_per_trajectory))
        discount = np.full(step_per_trajectory, gamma).cumprod() / gamma
        trajectory_wise_reward = (discount[np.newaxis, :] * reward).sum(axis=1)

        # no importance sampling: every trajectory receives a uniform weight of one
        n = len(trajectory_wise_reward)
        trajectory_wise_importance_weight = np.ones(n)

        sort_idxes = trajectory_wise_reward.argsort()
        sorted_importance_weight = trajectory_wise_importance_weight[sort_idxes]
        cumulative_density = np.clip(sorted_importance_weight.cumsum() / n, 0, 1)

        trajectory_wise_reward = np.clip(
            trajectory_wise_reward, reward_scale.min(), reward_scale.max()
        )
        histogram = np.histogram(
            trajectory_wise_reward, bins=reward_scale, density=False
        )[0]
        idx = histogram.cumsum().astype(int) - 1
        idx = np.where(idx < 0, 0, idx)

        cumulative_density = cumulative_density[idx]
        return np.insert(cumulative_density, 0, 0)

    def estimate_mean(
        self,
        step_per_trajectory: int,
        reward: np.ndarray,
        reward_scale: np.ndarray,
        gamma: float = 1.0,
        **kwargs,
    ) -> float:
        """Estimate the mean (i.e., policy value) from the CDF."""
        cumulative_density = self.estimate_cumulative_distribution_function(
            step_per_trajectory=step_per_trajectory,
            reward=reward,
            reward_scale=reward_scale,
            gamma=gamma,
        )
        return (np.diff(cumulative_density) * reward_scale[1:]).sum()

    def estimate_variance(
        self,
        step_per_trajectory: int,
        reward: np.ndarray,
        reward_scale: np.ndarray,
        gamma: float = 1.0,
        **kwargs,
    ) -> float:
        """Estimate the variance from the CDF."""
        cumulative_density = self.estimate_cumulative_distribution_function(
            step_per_trajectory=step_per_trajectory,
            reward=reward,
            reward_scale=reward_scale,
            gamma=gamma,
        )
        mean = (np.diff(cumulative_density) * reward_scale[1:]).sum()
        return (np.diff(cumulative_density) * (reward_scale[1:] - mean) ** 2).sum()

    def estimate_conditional_value_at_risk(
        self,
        step_per_trajectory: int,
        reward: np.ndarray,
        reward_scale: np.ndarray,
        gamma: float = 1.0,
        alphas: Optional[np.ndarray] = None,  # proportions of the sided region
        **kwargs,
    ) -> np.ndarray:
        """Estimate CVaR from the CDF."""
        if alphas is None:
            alphas = np.linspace(0, 1, 21)

        cumulative_density = self.estimate_cumulative_distribution_function(
            step_per_trajectory=step_per_trajectory,
            reward=reward,
            reward_scale=reward_scale,
            gamma=gamma,
        )

        cvar = np.zeros_like(alphas)
        for i, alpha in enumerate(alphas):
            idx_ = np.nonzero(cumulative_density[1:] > alpha)[0]
            if len(idx_) == 0:
                cvar[i] = (
                    np.diff(cumulative_density) * reward_scale[1:]
                ).sum() / cumulative_density[-1]
            elif idx_[0] == 0:
                cvar[i] = reward_scale[1]
            else:
                lower_idx_ = idx_[0]
                relative_probability_density = (
                    np.diff(cumulative_density)[: lower_idx_ + 1]
                    / cumulative_density[lower_idx_ + 1]
                )
                cvar[i] = (
                    relative_probability_density * reward_scale[1 : lower_idx_ + 2]
                ).sum()
        return cvar

    def estimate_interquartile_range(
        self,
        step_per_trajectory: int,
        reward: np.ndarray,
        reward_scale: np.ndarray,
        gamma: float = 1.0,
        alpha: float = 0.05,  # proportion of the sided region
        **kwargs,
    ) -> Dict[str, float]:
        """Estimate the interquartile range from the CDF."""
        cumulative_density = self.estimate_cumulative_distribution_function(
            step_per_trajectory=step_per_trajectory,
            reward=reward,
            reward_scale=reward_scale,
            gamma=gamma,
        )
        lower_idx_ = np.nonzero(cumulative_density > alpha)[0]
        median_idx_ = np.nonzero(cumulative_density > 0.5)[0]
        upper_idx_ = np.nonzero(cumulative_density > 1 - alpha)[0]

        estimated_interquartile_range = {
            "median": self._target_value_given_idx(
                median_idx_, reward_scale=reward_scale
            ),
            f"{100 * (1. - alpha)}% quartile (lower)": self._target_value_given_idx(
                lower_idx_, reward_scale=reward_scale
            ),
            f"{100 * (1. - alpha)}% quartile (upper)": self._target_value_given_idx(
                upper_idx_, reward_scale=reward_scale
            ),
        }
        return estimated_interquartile_range
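As with the basic OPE estimator above, the custom CDF estimator can be exercised directly on synthetic data. The following is a minimal, hypothetical sketch; the reward values, reward_scale, and discount factor are illustrative, not part of SCOPE-RL.

# Minimal usage sketch of the NaiveCumulativeDistributionEstimator defined above (synthetic data).
import numpy as np

estimator = NaiveCumulativeDistributionEstimator()

synthetic_reward = np.random.default_rng(0).normal(size=100 * 10)  # 100 trajectories x 10 steps
reward_scale = np.linspace(-20, 20, 101)  # bins on which the CDF is evaluated

cdf = estimator.estimate_cumulative_distribution_function(
    step_per_trajectory=10,
    reward=synthetic_reward,
    reward_scale=reward_scale,
    gamma=0.95,
)
mean = estimator.estimate_mean(
    step_per_trajectory=10,
    reward=synthetic_reward,
    reward_scale=reward_scale,
    gamma=0.95,
)
cvar = estimator.estimate_conditional_value_at_risk(
    step_per_trajectory=10,
    reward=synthetic_reward,
    reward_scale=reward_scale,
    gamma=0.95,
    alphas=np.array([0.05, 0.1]),
)
# The estimator can likewise be passed to SCOPE-RL's cumulative distribution OPE pipeline
# in the same way as basic OPE estimators are passed to OffPolicyEvaluation.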
Note that the available inputs are the same as those for basic OPE.
See also
Finally, contributions of new OPE estimators to SCOPE-RL are more than welcome! Please read the contribution guidelines (CONTRIBUTING.md).