# Copyright (c) 2023, Haruka Kiyohara, Ren Kishimoto, HAKUHODO Technologies Inc., and Hanjuku-kaso Co., Ltd. All rights reserved.
# Licensed under the Apache 2.0 License.
"""Useful tools."""
from dataclasses import dataclass
from collections import defaultdict
from typing import DefaultDict, Dict, Union, Optional, Any, Tuple
from pathlib import Path
import pickle
import gym
import scipy
import numpy as np
from sklearn.utils import check_scalar, check_random_state
from .types import LoggedDataset, OPEInputDict
@dataclass
class MultipleLoggedDataset:
    """Store multiple logged datasets on disk and load them back on demand.

    Each dataset is pickled into ``path`` under a file name built from the action type,
    the behavior policy name, and a running id that is counted per behavior policy.

    Parameters
    -------
    action_type: {"discrete", "continuous"}
        Type of the action space.

    path: str
        Path to the directory. Either absolute or relative path is acceptable.
        The directory is created when it does not exist yet.

    save_relative_path: bool, default=False.
        Whether to save a relative path.
        If `True`, a path relative to the scope-rl directory will be saved.
        If `False`, the absolute path will be saved.
        Note that this option was added in order to run examples in the documentation properly.
        Otherwise, the default setting (`False`) is recommended.

    """

    action_type: str
    path: str
    save_relative_path: bool = False

    # Directory markers that locate the scope-rl directory inside a path, tried in order.
    # Deliberately left without an annotation so the dataclass does not treat it as a field.
    _ROOT_MARKERS = ("scope-rl/scope_rl/", "scope_rl/")

    def __post_init__(self):
        self.dataset_ids = defaultdict(int)
        self.abs_path = None
        self.relative_path = None

        self.path = Path(self.path)
        self.path.mkdir(parents=True, exist_ok=True)

        resolved = self.path.resolve()
        if self.save_relative_path:
            split = self._split_at_root_marker(resolved)
            if split is None:
                # No marker in the path: there is nothing to be relative to,
                # so keep the full path instead of failing.
                self.relative_path = resolved.as_posix()
            else:
                self.relative_path = split[1]
        else:
            self.abs_path = resolved

    @classmethod
    def _split_at_root_marker(cls, path: Path):
        """Split a path at the first root marker into (prefix including marker, remainder).

        Returns `None` when no marker occurs in the path.
        """
        # A trailing slash lets a path that *ends* with the marker directory match as well.
        text = path.as_posix().rstrip("/") + "/"
        for marker in cls._ROOT_MARKERS:
            head, found, tail = text.partition(marker)
            if found:
                return head + marker, tail.rstrip("/")
        return None

    def _load_dir(self) -> Path:
        """Return the directory from which datasets are read."""
        if not self.save_relative_path:
            return self.abs_path

        relative = Path(self.relative_path)
        if relative.is_absolute():
            # Fallback stored by `__post_init__` when no marker was found.
            return relative

        # Re-anchor the stored relative path at the marker found in the current working directory.
        split = self._split_at_root_marker(Path.cwd())
        if split is None:
            return Path.cwd() / relative
        return Path(split[0]) / relative

    def _file_name(self, behavior_policy_name: str, dataset_id: int) -> str:
        """Build the pickle file name of a dataset."""
        return f"logged_dataset_{self.action_type}_{behavior_policy_name}_{dataset_id}.pickle"

    def add(self, logged_dataset: LoggedDataset, behavior_policy_name: str):
        """Save logged dataset.

        The dataset receives the next free id of the given behavior policy, which is also
        written into ``logged_dataset["dataset_id"]`` (the input is modified in place).

        Parameters
        -------
        logged_dataset: LoggedDataset.
            Logged dataset to save.

        behavior_policy_name: str
            Name of the behavior policy that generated the logged dataset.

        """
        dataset_id = self.dataset_ids[behavior_policy_name]
        self.dataset_ids[behavior_policy_name] += 1
        logged_dataset["dataset_id"] = dataset_id

        with open(self.path / self._file_name(behavior_policy_name, dataset_id), "wb") as f:
            pickle.dump(logged_dataset, f)

    def get(self, behavior_policy_name: str, dataset_id: int):
        """Load logged dataset.

        Parameters
        -------
        behavior_policy_name: str
            Name of the behavior policy that generated the logged dataset.

        dataset_id: int
            Id of the logged dataset.

        Returns
        -------
        logged_dataset: LoggedDataset.
            Logged dataset.

        """
        file_path = self._load_dir() / self._file_name(behavior_policy_name, dataset_id)
        with open(file_path, "rb") as f:
            # NOTE: pickle can execute arbitrary code while loading;
            # only read files that were written by `add` or come from a trusted source.
            logged_dataset = pickle.load(f)
        return logged_dataset

    @property
    def behavior_policy_names(self):
        """Names of the behavior policies for which at least one dataset id was issued."""
        return list(self.dataset_ids.keys())

    @property
    def n_datasets(self):
        """Number of dataset ids issued so far, keyed by behavior policy name."""
        return defaultdict_to_dict(self.dataset_ids)
def l2_distance(
    x: np.ndarray,
    y: np.ndarray,
    bandwidth: float = 1.0,
):
    """Calculate the squared L2 (Euclidean) distance between paired rows.

    Parameters
    -------
    x: array-like of shape (n_samples, n_dim)
        Input array 1.

    y: array-like of shape (n_samples, n_dim)
        Input array 2.

    bandwidth: float, default=1.0
        Unused. Accepted only so that the signature matches the kernel functions.

    Returns
    -------
    distance: ndarray of (n_samples, )
        Squared distance between each row of x and the corresponding row of y.
        Callers take the square root themselves when they need the distance.

    """
    # Summing squared differences is never negative. The expanded form
    # (x^2 + y^2 - 2xy) can dip below zero through floating-point cancellation,
    # which turns into NaN once a caller applies np.sqrt.
    diff = np.asarray(x) - np.asarray(y)
    return (diff**2).sum(axis=1)
def gaussian_kernel(
    x: np.ndarray,
    y: np.ndarray,
    bandwidth: float = 1.0,
):
    """Gaussian kernel.

    Parameters
    -------
    x: array-like of shape (n_samples, n_dim)
        Input array 1.

    y: array-like of shape (n_samples, n_dim)
        Input array 2.

    bandwidth: float, default=1.0
        Bandwidth hyperparameter of the Gaussian kernel.

    Returns
    -------
    kernel_density: ndarray of (n_samples, )
        kernel density of x given y.

    """
    # l2_distance already returns the squared distance, so no extra squaring is needed.
    squared_dist = l2_distance(x, y)
    exponent = -squared_dist / (2 * bandwidth**2)
    normalizer = np.sqrt(2 * np.pi * bandwidth**2)
    return np.exp(exponent) / normalizer
def triangular_kernel(
    x: np.ndarray,
    y: np.ndarray,
    bandwidth: float = 1.0,
):
    """Triangular kernel.

    Parameters
    -------
    x: array-like of shape (n_samples, n_dim)
        Input array 1.

    y: array-like of shape (n_samples, n_dim)
        Input array 2.

    bandwidth: float, default=1.0
        Bandwidth hyperparameter of the Triangular kernel.

    Returns
    -------
    kernel_density: ndarray of (n_samples, )
        kernel density of x given y. Zero wherever the distance is at least one bandwidth.

    """
    # Guard against a slightly negative squared distance, which would make sqrt return NaN.
    squared_dist = np.maximum(l2_distance(x, y), 0.0)
    norm_dist = np.sqrt(squared_dist) / bandwidth
    # Clipping at 1 zeroes the kernel outside its support (1 - 1 == 0).
    # np.clip needs explicit bounds: calling it with the array alone raises on NumPy < 2.1.
    clipped_norm_dist = np.clip(norm_dist, None, 1.0)
    return (1 - clipped_norm_dist) / bandwidth
def epanechnikov_kernel(
    x: np.ndarray,
    y: np.ndarray,
    bandwidth: float = 1.0,
):
    """Epanechnikov kernel.

    Parameters
    -------
    x: array-like of shape (n_samples, n_dim)
        Input array 1.

    y: array-like of shape (n_samples, n_dim)
        Input array 2.

    bandwidth: float, default=1.0
        Bandwidth hyperparameter of the Epanechnikov kernel.

    Returns
    -------
    kernel_density: ndarray of (n_samples, )
        kernel density of x given y. Zero wherever the distance is at least one bandwidth.

    """
    # Guard against a slightly negative squared distance, which would make sqrt return NaN.
    squared_dist = np.maximum(l2_distance(x, y), 0.0)
    distance = np.sqrt(squared_dist)
    # Clipping at 1 zeroes the kernel outside its support (1 - 1**2 == 0).
    clipped_norm_dist = np.clip(distance / bandwidth, None, 1.0)
    return 0.75 * (1 - clipped_norm_dist**2) / bandwidth
def cosine_kernel(
    x: np.ndarray,
    y: np.ndarray,
    bandwidth: float = 1.0,
):
    """Cosine kernel.

    Parameters
    -------
    x: array-like of shape (n_samples, n_dim)
        Input array 1.

    y: array-like of shape (n_samples, n_dim)
        Input array 2.

    bandwidth: float, default=1.0
        Bandwidth hyperparameter of the Cosine kernel.

    Returns
    -------
    kernel_density: ndarray of (n_samples, )
        kernel density of x given y. Zero wherever the distance is at least one bandwidth.

    """
    # Guard against a slightly negative squared distance, which would make sqrt return NaN.
    squared_dist = np.maximum(l2_distance(x, y), 0.0)
    norm_dist = np.sqrt(squared_dist) / bandwidth
    # The mask restricts the kernel to its support. The former bound-less np.clip call
    # was a no-op at best and raises on NumPy < 2.1, so it is dropped.
    inside_support = norm_dist < 1
    return inside_support * (np.pi / 4) * np.cos(norm_dist * np.pi / 2) / bandwidth
def estimate_confidence_interval_by_bootstrap(
    samples: np.ndarray,
    alpha: float = 0.05,
    n_bootstrap_samples: int = 100,
    random_state: Optional[int] = None,
) -> Dict[str, float]:
    """Estimate the confidence interval by a nonparametric bootstrap-like procedure.

    Parameters
    -------
    samples: array-like
        Samples.

    alpha: float, default=0.05
        Significance level. The value should be within `[0, 1)`.

    n_bootstrap_samples: int, default=100 (> 0)
        Number of resampling performed in the bootstrap procedure.

    random_state: int, default=None (>= 0)
        Random state. Although the default is `None`, a value must be given.

    Raises
    -------
    ValueError
        If `random_state` is `None`.

    Returns
    -------
    estimated_confidence_interval: dict
        Dictionary storing the estimated mean and upper-lower confidence bounds.
        The mean is the average of the resampled means and the bounds are their
        `alpha / 2` and `1 - alpha / 2` percentiles.

    """
    check_scalar(alpha, name="alpha", target_type=float, min_val=0.0, max_val=1.0)
    check_scalar(
        n_bootstrap_samples, name="n_bootstrap_samples", target_type=int, min_val=1
    )
    if random_state is None:
        raise ValueError("random_state must be given")

    rng = check_random_state(random_state)
    n_samples = samples.shape[0]

    # One resample (with replacement) of the original size per round; keep only its mean.
    resampled_means = []
    for _ in range(n_bootstrap_samples):
        resample = rng.choice(samples, size=n_samples)
        resampled_means.append(np.mean(resample))

    confidence_level = 100 * (1.0 - alpha)
    return {
        "mean": np.mean(resampled_means),
        f"{confidence_level}% CI (lower)": np.percentile(
            resampled_means, 100 * (alpha / 2)
        ),
        f"{confidence_level}% CI (upper)": np.percentile(
            resampled_means, 100 * (1.0 - alpha / 2)
        ),
    }
def estimate_confidence_interval_by_hoeffding(
    samples: np.ndarray,
    alpha: float = 0.05,
    **kwargs,
) -> Dict[str, float]:
    """Estimate the confidence interval by the Hoeffding's inequality.

    Note
    -------
    The Hoeffding's inequality provides high-probability bounds of the expectation :math:`\\mu := \\mathbb{E}[X], X \\sim p(X)` as follows.

    .. math::

        |\\hat{\\mu} - \\mu| \\leq X_{\\max} \\sqrt{\\frac{\\log(2 / \\alpha)}{2 n}},

    which holds with probability :math:`1 - \\alpha` where :math:`n` is the data size.
    The maximum of the given samples is used as :math:`X_{\\max}`.

    Parameters
    -------
    samples: array-like
        Samples.

    alpha: float, default=0.05
        Significance level. The value should be within `[0, 1)`.

    **kwargs
        Ignored. Accepted so that all confidence interval estimators share one call signature.

    Returns
    -------
    estimated_confidence_interval: dict
        Dictionary storing the estimated mean and upper-lower confidence bounds.

    """
    check_scalar(alpha, name="alpha", target_type=float, min_val=0.0, max_val=1.0)

    n = len(samples)
    mean = samples.mean()
    # The whole `2 * n` belongs in the denominator; without the parentheses the
    # width would grow with the data size instead of shrinking.
    ci = samples.max() * np.sqrt(np.log(2 / alpha) / (2 * n))
    return {
        "mean": mean,
        f"{100 * (1. - alpha)}% CI (lower)": mean - ci,
        f"{100 * (1. - alpha)}% CI (upper)": mean + ci,
    }
def estimate_confidence_interval_by_empirical_bernstein(
    samples: np.ndarray,
    alpha: float = 0.05,
    **kwargs,
) -> Dict[str, float]:
    """Estimate the confidence interval by the empirical bernstein inequality.

    Note
    -------
    The empirical bernstein inequality provides high-probability bounds of the expectation :math:`\\mu := \\mathbb{E}[X], X \\sim p(X)`.
    This function computes the following half-width.

    .. math::

        |\\hat{\\mu} - \\mu| \\leq \\frac{7 X_{\\max} \\log(2 / \\alpha)}{3 (n - 1)} + \\sqrt{\\frac{2 \\hat{\\mathbb{V}}(X) \\log(2 / \\alpha)}{n - 1}},

    where :math:`n` is the data size, :math:`X_{\\max}` is the maximum of the given samples,
    and :math:`\\hat{\\mathbb{V}}` is the variance returned by `samples.var()` (normalized by :math:`n`).

    Parameters
    -------
    samples: array-like
        Samples.

    alpha: float, default=0.05
        Significance level. The value should be within `[0, 1)`.

    **kwargs
        Ignored. Accepted so that all confidence interval estimators share one call signature.

    Returns
    -------
    estimated_confidence_interval: dict
        Dictionary storing the estimated mean and upper-lower confidence bounds.

    """
    check_scalar(alpha, name="alpha", target_type=float, min_val=0.0, max_val=1.0)

    n = len(samples)
    center = samples.mean()

    range_term = 7 * samples.max() * np.log(2 / alpha) / (3 * (n - 1))
    variance_term = np.sqrt(2 * np.log(2 / alpha) * samples.var() / (n - 1))
    ci = range_term + variance_term

    lower_key = f"{100 * (1. - alpha)}% CI (lower)"
    upper_key = f"{100 * (1. - alpha)}% CI (upper)"
    return {
        "mean": center,
        lower_key: center - ci,
        upper_key: center + ci,
    }
def estimate_confidence_interval_by_t_test(
    samples: np.ndarray,
    alpha: float = 0.05,
    **kwargs,
) -> Dict[str, float]:
    """Estimate the confidence interval by Student T-test.

    Note
    -------
    Student T-test assumes that :math:`X \\sim p(X)` follows a normal distribution.
    Based on this assumption, the :math:`1 - \\alpha` confidence interval of :math:`\\mu := \\mathbb{E}[X]` is derived as follows.

    .. math::

        |\\hat{\\mu} - \\mu| \\leq \\frac{T_{\\mathrm{test}}(1 - \\alpha / 2, n-1)}{\\sqrt{n} / \\hat{\\sigma}},

    where :math:`n` is the data size, :math:`T_{\\mathrm{test}}(\\cdot,\\cdot)` is the T-value, and :math:`\\hat{\\sigma}` is the sample standard deviation (`ddof=1`), respectively.

    Parameters
    -------
    samples: NDArray
        Samples.

    alpha: float, default=0.05
        Significance level. The value should be within `[0, 1)`.

    **kwargs
        Ignored. Accepted so that all confidence interval estimators share one call signature.

    Returns
    -------
    estimated_confidence_interval: dict
        Dictionary storing the estimated mean and upper-lower confidence bounds.

    """
    # Import the submodule explicitly: a bare `import scipy` does not guarantee
    # that `scipy.stats` is loaded on every SciPy version.
    from scipy import stats

    check_scalar(alpha, name="alpha", target_type=float, min_val=0.0, max_val=1.0)

    n = len(samples)
    # The interval is two-sided, so alpha is split over both tails.
    t = stats.t.ppf(1 - alpha / 2, n - 1)
    mean = samples.mean()
    ci = t * samples.std(ddof=1) / np.sqrt(n)
    return {
        "mean": mean,
        f"{100 * (1. - alpha)}% CI (lower)": mean - ci,
        f"{100 * (1. - alpha)}% CI (upper)": mean + ci,
    }
def defaultdict_to_dict(dict_: Union[Dict[Any, Any], DefaultDict[Any, Any]]):
    """Transform a defaultdict into a corresponding dict.

    Values that are themselves defaultdicts are converted recursively.
    Anything that is not a defaultdict is returned unchanged.
    """
    if not isinstance(dict_, defaultdict):
        return dict_

    converted = {}
    for key, value in dict_.items():
        converted[key] = defaultdict_to_dict(value)
    return converted
def check_array(
    array: np.ndarray,
    name: str,
    expected_dim: int = 1,
    expected_dtype: Optional[type] = None,
    min_val: Optional[float] = None,
    max_val: Optional[float] = None,
) -> None:
    """Input validation on array.

    Parameters
    -------
    array: object
        Input array to check.

    name: str
        Name of the input array.

    expected_dim: int, default=1
        Expected dimension of the input array.

    expected_dtype: {type, tuple of type}, default=None
        Expected dtype of the input array.
        When a tuple is given, matching any one of its entries is sufficient.

    min_val: float, default=None
        Minimum value allowed in the input array.

    max_val: float, default=None
        Maximum value allowed in the input array.

    Raises
    -------
    ValueError
        If the input is not an ndarray, or if its dimension, dtype, minimum, or maximum
        violates the given expectation.

    """
    if not isinstance(array, np.ndarray):
        raise ValueError(f"{name} must be {expected_dim}D array, but got {type(array)}")
    if array.ndim != expected_dim:
        raise ValueError(
            f"{name} must be {expected_dim}D array, but got {array.ndim}D array"
        )
    if expected_dtype is not None:
        # np.issubsctype was removed in NumPy 2.0; np.issubdtype is its replacement.
        if isinstance(expected_dtype, tuple):
            accepted_dtypes = expected_dtype
        else:
            accepted_dtypes = (expected_dtype,)
        if not any(np.issubdtype(array.dtype, dtype_) for dtype_ in accepted_dtypes):
            raise ValueError(
                f"The elements of {name} must be {expected_dtype}, but got {array.dtype}"
            )
    if min_val is not None:
        if array.min() < min_val:
            raise ValueError(
                f"The elements of {name} must be larger than {min_val}, but got minimum value {array.min()}"
            )
    if max_val is not None:
        if array.max() > max_val:
            raise ValueError(
                f"The elements of {name} must be smaller than {max_val}, but got maximum value {array.max()}"
            )
def check_logged_dataset(logged_dataset: LoggedDataset):
    """Check logged dataset keys.

    Parameters
    -------
    logged_dataset: LoggedDataset
        Logged dataset.

    Raises
    -------
    RuntimeError
        If a required key is missing. Keys are checked in a fixed order and
        the first missing one is reported.

    """
    required_keys = (
        "n_trajectories",
        "action_type",
        "n_actions",
        "action_dim",
        "state_dim",
        "step_per_trajectory",
        "state",
        "action",
        "reward",
        "pscore",
        "done",
        "terminal",
    )
    available_keys = logged_dataset.keys()

    missing_key = next(
        (key for key in required_keys if key not in available_keys), None
    )
    if missing_key is not None:
        raise RuntimeError(f"{missing_key} does not exist in logged_dataset")
class NewGymAPIWrapper:
    """This class converts old gym outputs (gym<0.26.0) to the new ones (gym>=0.26.0).

    Parameters
    -------
    env: gym.Env
        Environment that follows the old API, i.e., `seed()`, `reset()` returning only
        the state, and `step()` returning a 4-tuple.

    """

    def __init__(
        self,
        env: "gym.Env",
    ):
        self.env = env

    def reset(self, seed: Optional[int] = None) -> Tuple[Any, Dict[str, Any]]:
        """Seed and reset the wrapped environment; return `(state, info)` with an empty info."""
        self.env.seed(seed)
        state = self.env.reset()
        return state, {}

    def step(self, action: Any) -> Tuple[Any, ...]:
        """Step the wrapped environment and return `(state, reward, terminated, truncated, info)`."""
        state, reward, done, info = self.env.step(action)
        # The old API has a single `done` flag. It is reported in the `truncated` slot and
        # `terminated` is always False.
        # NOTE(review): confirm that downstream code relies on this mapping before changing it.
        return state, reward, False, done, info

    def render(self):
        """Render the wrapped environment and pass through whatever it returns."""
        return self.env.render()

    def close(self):
        """Close the wrapped environment."""
        self.env.close()

    def __getattr__(self, key) -> Any:
        # Only reached when normal lookup on the wrapper fails.
        # Looking up `env` here means it has not been set yet (e.g., while the object is
        # being copied or unpickled); forwarding would recurse forever.
        if key == "env":
            raise AttributeError(key)
        # getattr (rather than object.__getattribute__) also honors a __getattr__
        # defined by the wrapped environment itself.
        return getattr(self.env, key)
class OldGymAPIWrapper:
    """This class converts new gym outputs (gym>=0.26.0) to the old ones (gym<0.26.0).

    Parameters
    -------
    env: gym.Env
        Environment that follows the new API, i.e., `reset()` returning `(state, info)`
        and accepting `seed`, and `step()` returning a 5-tuple.

    """

    def __init__(
        self,
        env: "gym.Env",
    ):
        self.env = env

    def reset(self) -> Any:
        """Reset the wrapped environment and return only the state (the info is dropped)."""
        state, _ = self.env.reset()
        return state

    def step(self, action: Any) -> Tuple[Any, ...]:
        """Step the wrapped environment and return `(state, reward, done, info)`."""
        state, reward, terminated, truncated, info = self.env.step(action)
        # The old API has a single flag, so either ending condition counts as done.
        return state, reward, terminated or truncated, info

    def render(self, mode: str = "human"):
        """Render the wrapped environment and pass through whatever it returns.

        `mode` is accepted for compatibility with the old signature but is not forwarded.
        """
        return self.env.render()

    def close(self):
        """Close the wrapped environment."""
        self.env.close()

    def seed(self, seed: Optional[int] = None):
        """Seed the wrapped environment.

        This is done via `reset(seed=seed)`, so calling it also resets the environment.
        """
        self.env.reset(seed=seed)

    def __getattr__(self, key) -> Any:
        # Only reached when normal lookup on the wrapper fails.
        # Looking up `env` here means it has not been set yet (e.g., while the object is
        # being copied or unpickled); forwarding would recurse forever.
        if key == "env":
            raise AttributeError(key)
        # getattr (rather than object.__getattribute__) also honors a __getattr__
        # defined by the wrapped environment itself.
        return getattr(self.env, key)