# Copyright (c) 2023, Haruka Kiyohara, Ren Kishimoto, HAKUHODO Technologies Inc., and Hanjuku-kaso Co., Ltd. All rights reserved.
# Licensed under the Apache 2.0 License.
"""Bid Price Calculation."""
from dataclasses import dataclass
from typing import Union, Optional
import numpy as np
from sklearn.base import BaseEstimator, is_classifier
from sklearn.utils import check_scalar, check_random_state, check_X_y
from .base import BaseSimulator
from ...utils import check_array
from ...types import Numeric
@dataclass
class Bidder:
    """Class to determine bid price.

    Imported as: :class:`rtbgym.envs.simulator.Bidder`

    Note
    -------
    Intended to be called and initialized from RTBEnv class in env.py.

    Determine bid price by the following formula.

    .. math::

        {bid price}_{t, i} = {adjust rate}_{t} \\times {predicted reward}_{t,i} ( \\times {const.})

    Parameters
    -------
    simulator: BaseSimulator
        Auction simulator.

    objective: {"click", "conversion"}, default="conversion"
        Objective outcome (i.e., reward) of the auction.

    reward_predictor: BaseEstimator, default=None
        A machine learning model to predict the reward to determine the bidding price.
        If `None`, the ground-truth (expected) reward is used instead of the predicted one.

    scaler: {int, float}, default=None (> 0)
        Scaling factor (constant value) used for bid price determination.
        If `None`, one should call auto_fit_scaler() or custom_set_scaler()
        before calling determine_bid_price().

    random_state: int, default=None (>= 0)
        Random state. Although the field defaults to `None`, a value must be
        given; `ValueError` is raised in `__post_init__` otherwise.

    References
    -------
    Di Wu, Xiujun Chen, Xun Yang, Hao Wang, Qing Tan, Xiaoxun Zhang, Jian Xu, and Kun Gai.
    "Budget Constrained Bidding by Model-free Reinforcement Learning in Display Advertising." 2018.

    Jun Zhao, Guang Qiu, Ziyu Guan, Wei Zhao, and Xiaofei He.
    "Deep Reinforcement Learning for Sponsored Search Real-time Bidding." 2018.

    """

    simulator: BaseSimulator
    objective: str = "conversion"
    reward_predictor: Optional[BaseEstimator] = None
    scaler: Optional[Union[int, float]] = None
    random_state: Optional[int] = None

    def __post_init__(self):
        """Validate the fields and initialize the derived attributes.

        Sets `random_` (the random state object returned by
        `check_random_state`) and `use_reward_predictor` (whether a reward
        predictor is available).
        """
        if not isinstance(self.simulator, BaseSimulator):
            raise ValueError("simulator must be a child class of BaseSimulator")
        if self.objective not in ["click", "conversion"]:
            raise ValueError(
                f'objective must be either "click" or "conversion", but {self.objective} is given'
            )
        if self.reward_predictor is not None and not isinstance(
            self.reward_predictor, BaseEstimator
        ):
            raise ValueError(
                "reward_predictor must be BaseEstimator or a child class of BaseEstimator"
            )
        if self.scaler is not None:
            self._check_scaler(self.scaler)
        if self.random_state is None:
            raise ValueError("random_state must be given")

        self.random_ = check_random_state(self.random_state)
        self.use_reward_predictor = self.reward_predictor is not None

    @property
    def standard_bid_price(self):
        """Standard bid price, as exposed by the simulator."""
        return self.simulator.standard_bid_price

    @staticmethod
    def _check_scaler(scaler: Union[int, float]) -> None:
        """Validate that `scaler` is a strictly positive int or float.

        Raises
        -------
        TypeError
            If `scaler` is neither int nor float (raised by `check_scalar`).

        ValueError
            If `scaler` is not strictly positive.

        """
        check_scalar(
            scaler,
            name="scaler",
            target_type=(int, float),
            min_val=0,
        )
        # check_scalar's lower bound is inclusive, but a zero scaler forces
        # every bid price to zero; `not > 0` also rejects NaN.
        if not scaler > 0:
            raise ValueError(f"scaler must be a positive value, but {scaler} is given")

    def determine_bid_price(
        self,
        timestep: int,
        adjust_rate: float,
        ad_ids: np.ndarray,
        user_ids: np.ndarray,
    ) -> np.ndarray:
        """Determine the bidding price using given adjust rate and the predicted/ground-truth rewards.

        Note
        -------
        Determine bid price as follows.

        .. math::

            {bid price}_{t, i} = {adjust rate}_{t} \\times {predicted reward}_{t,i} ( \\times {const.})

        Parameters
        -------
        timestep: int (>= 0)
            Timestep of the RL environment.

        adjust_rate: float (>= 0)
            Adjust rate parameter for the bidding price.

        ad_ids: array-like of shape (search_volume, )
            IDs of the ads.

        user_ids: array-like of shape (search_volume, )
            IDs of the users.

        Returns
        -------
        bid_prices: ndarray of shape(search_volume, )
            Bid price for each auction (cast to int).

        Raises
        -------
        RuntimeError
            If the scaler has not been set yet.

        """
        if self.scaler is None:
            raise RuntimeError(
                "scalar should be given, please call .auto_fit_scaler() or .custom_set_scaler() before calling .determine_bid_price()"
            )
        check_scalar(
            timestep,
            name="timestep",
            target_type=int,
            min_val=0,
        )
        check_scalar(
            adjust_rate,
            name="adjust_rate",
            target_type=Numeric,
            min_val=0,
        )

        ad_feature_vector, user_feature_vector = self.simulator.map_idx_to_features(
            ad_ids=ad_ids,
            user_ids=user_ids,
        )
        rewards = self._estimate_reward(
            ad_ids=ad_ids,
            user_ids=user_ids,
            ad_feature_vector=ad_feature_vector,
            user_feature_vector=user_feature_vector,
            timestep=timestep,
        )
        bid_prices = adjust_rate * rewards * self.standard_bid_price * self.scaler
        return bid_prices.astype(int)

    def custom_set_scaler(self, scaler: Union[int, float]) -> None:
        """Set scaling factor used for bid price calculation.

        Parameters
        -------
        scaler: {int, float} (> 0)
            Scaling factor (constant value) used in bid price calculation.

        """
        self._check_scaler(scaler)
        self.scaler = scaler

    def auto_fit_scaler(self, step_per_episode: int, n_samples: int = 100000) -> None:
        """Fit scaling factor used for bid price calculation.

        Note
        -------
        scaler is set to approximate reciprocal of the mean predicted/ground-truth rewards.

            scaler ~= 1 / mean of predicted/ground-truth rewards

        Parameters
        -------
        step_per_episode: int (> 0)
            Number of timesteps in an episode.

        n_samples: int, default=100000 (> 0)
            Number of samples to fit bid_scaler.

        Raises
        -------
        RuntimeError
            If the mean of the sampled rewards is not positive, in which case
            its reciprocal cannot serve as a scaling factor.

        """
        check_scalar(
            step_per_episode,
            name="step_per_episode",
            target_type=int,
            min_val=1,
        )
        check_scalar(
            n_samples,
            name="n_samples",
            target_type=int,
            min_val=1,
        )

        # Timesteps are drawn before the auctions are generated; the order is
        # kept so that the sequence of random draws is unchanged.
        timesteps = self.random_.choice(step_per_episode, n_samples)
        ad_ids, user_ids = self.simulator.generate_auction(volume=n_samples)
        ad_feature_vector, user_feature_vector = self.simulator.map_idx_to_features(
            ad_ids=ad_ids,
            user_ids=user_ids,
        )
        rewards = self._estimate_reward(
            ad_ids=ad_ids,
            user_ids=user_ids,
            ad_feature_vector=ad_feature_vector,
            user_feature_vector=user_feature_vector,
            timestep=timesteps,
        )

        mean_reward = rewards.mean()
        # A zero mean would silently produce an infinite scaler (and NaN bid
        # prices later on), so fail loudly instead.
        if not mean_reward > 0:
            raise RuntimeError(
                f"the mean of the predicted/ground-truth rewards must be positive to fit scaler, but {mean_reward} is given"
            )
        self.scaler = 1 / mean_reward

    def custom_set_reward_predictor(
        self, reward_predictor: Optional[BaseEstimator]
    ) -> None:
        """Set reward predictor used for bid price calculation.

        Parameters
        -------
        reward_predictor: BaseEstimator, default=None
            A machine learning model to predict the reward to determine the bidding price.
            If None, the ground-truth (expected) reward is used instead of the predicted one.

        """
        if reward_predictor is not None and not isinstance(
            reward_predictor, BaseEstimator
        ):
            raise ValueError("reward_predictor must be a child class of BaseEstimator")

        self.reward_predictor = reward_predictor
        # Keep the flag in sync with the predictor: passing None switches back
        # to the ground-truth rewards rather than enabling a missing model.
        self.use_reward_predictor = reward_predictor is not None

    def fit_reward_predictor(
        self, step_per_episode: int, n_samples: int = 100000
    ) -> None:
        """Fit reward predictor in advance (pre-train) to use prediction in bidding price determination.

        Note
        -------
        Intended to be used only when use_reward_predictor=True option.

        X and y of the prediction model is given as follows.

        X: array-like of shape (search_volume, ad_feature_dim + user_feature_dim + 1)
            Concatenated vector of contexts (ad_feature_vector + user_feature_vector) and timestep.

        y: array-like of shape (search_volume, )
            Reward (i.e., auction outcome) obtained in each auction.

        Parameters
        -------
        step_per_episode: int (> 0)
            Number of timesteps in an episode.

        n_samples: int, default=100000 (> 0)
            Number of samples to fit reward predictor.

        Raises
        -------
        RuntimeError
            If no reward predictor has been set.

        """
        if not self.use_reward_predictor:
            raise RuntimeError(
                "Please set the attribute, reward_predictor, before calling .fit_reward_predictor()"
            )
        check_scalar(
            step_per_episode,
            name="step_per_episode",
            target_type=int,
            min_val=1,
        )
        check_scalar(
            n_samples,
            name="n_samples",
            target_type=int,
            min_val=1,
        )

        ad_ids, user_ids = self.simulator.generate_auction(n_samples)
        ad_feature_vector, user_feature_vector = self.simulator.map_idx_to_features(
            ad_ids, user_ids
        )
        contexts = np.concatenate([ad_feature_vector, user_feature_vector], axis=1)
        timesteps = self.random_.choice(step_per_episode, n_samples)
        feature_vectors = np.concatenate([contexts, timesteps.reshape((-1, 1))], axis=1)

        outcome_kwargs = dict(
            ad_ids=ad_ids,
            user_ids=user_ids,
            ad_feature_vector=ad_feature_vector,
            user_feature_vector=user_feature_vector,
            timestep=timesteps,
        )
        # Clicks are sampled for both objectives; for "conversion" the click
        # outcome is multiplied by the sampled conversion outcome.
        rewards = self.simulator.ctr.sample_outcome(**outcome_kwargs)
        if self.objective != "click":  # "conversion"
            rewards = rewards * self.simulator.cvr.sample_outcome(**outcome_kwargs)

        X, y = check_X_y(feature_vectors, rewards)
        self.reward_predictor.fit(X, y)

    def _estimate_reward(
        self,
        ad_ids: np.ndarray,
        user_ids: np.ndarray,
        ad_feature_vector: np.ndarray,
        user_feature_vector: np.ndarray,
        timestep: Union[int, np.ndarray],
    ) -> np.ndarray:
        """Return predicted rewards if a predictor is in use, else the ground-truth ones.

        Parameters and return value are the same as those of
        `_predict_reward()` and `_calc_ground_truth_reward()`.
        """
        calc_reward = (
            self._predict_reward
            if self.use_reward_predictor
            else self._calc_ground_truth_reward
        )
        return calc_reward(
            ad_ids=ad_ids,
            user_ids=user_ids,
            ad_feature_vector=ad_feature_vector,
            user_feature_vector=user_feature_vector,
            timestep=timestep,
        )

    def _predict_reward(
        self,
        ad_ids: np.ndarray,
        user_ids: np.ndarray,
        ad_feature_vector: np.ndarray,
        user_feature_vector: np.ndarray,
        timestep: Union[int, np.ndarray],
    ) -> np.ndarray:
        """Predict the reward (i.e., auction outcome) to determine bidding price.

        Note
        -------
        Intended to be used only when use_reward_predictor=True option.

        X and y of the prediction model is given as follows.

        X: array-like of shape (search_volume, ad_feature_dim + user_feature_dim + 1)
            Concatenated vector of contexts (ad_feature_vector + user_feature_vector) and timestep.

        y: array-like of shape (search_volume, )
            Reward (i.e., auction outcome) obtained in each auction.

        Parameters
        -------
        ad_ids: array-like of shape (search_volume, )
            IDs of the ads.

        user_ids: array-like of shape (search_volume, )
            IDs of the users. (Not used by the prediction itself.)

        ad_feature_vector: array-like of shape (search_volume, ad_feature_dim)
            Feature vector of the ads.

        user_feature_vector: array-like of shape (search_volume, user_feature_dim)
            Feature vector of the users.

        timestep: {int, array-like of shape (search_volume, )} (>= 0)
            Timestep in the RL environment.

        Returns
        -------
        predicted_rewards: ndarray of shape (search_volume, )
            Predicted reward for each auction.

        """
        check_array(
            ad_ids,
            name="ad_ids",
            expected_dim=1,
        )
        check_array(
            ad_feature_vector,
            name="ad_feature_vector",
            expected_dim=2,
        )
        check_array(
            user_feature_vector,
            name="user_feature_vector",
            expected_dim=2,
        )
        contexts = np.concatenate([ad_feature_vector, user_feature_vector], axis=1)

        # Broadcast a scalar timestep to one entry per auction; NumPy integer
        # scalars are accepted as well as builtin ints.
        if isinstance(timestep, (int, np.integer)):
            timestep = np.full(ad_ids.shape[0], timestep)
        check_array(timestep, name="timestep", expected_dim=1, min_val=0)

        X = np.concatenate([contexts, timestep.reshape((-1, 1))], axis=1)

        if is_classifier(self.reward_predictor):
            # Column 1 is taken as the probability of the positive outcome.
            # NOTE(review): assumes the classifier was fitted on both classes
            # (0 and 1) -- confirm for very sparse rewards.
            return self.reward_predictor.predict_proba(X)[:, 1]
        return self.reward_predictor.predict(X)

    def _calc_ground_truth_reward(
        self,
        ad_ids: np.ndarray,
        user_ids: np.ndarray,
        ad_feature_vector: np.ndarray,
        user_feature_vector: np.ndarray,
        timestep: Union[int, np.ndarray],
    ) -> np.ndarray:
        """Calculate the ground-truth reward (i.e., auction outcome) to determine bidding price.

        Parameters
        -------
        ad_ids: array-like of shape (search_volume, )
            IDs of the ads.

        user_ids: array-like of shape (search_volume, )
            IDs of the users.

        ad_feature_vector: array-like of shape (search_volume, ad_feature_dim)
            Feature vector of the ads.

        user_feature_vector: array-like of shape (search_volume, user_feature_dim)
            Feature vector of the users.

        timestep: {int, array-like of shape (search_volume, )}
            Timestep in the RL environment.

        Returns
        -------
        expected_rewards: array-like of shape(search_volume, )
            Ground-truth (expected) reward for each auction when impression occurs.

        """
        prob_kwargs = dict(
            ad_ids=ad_ids,
            user_ids=user_ids,
            ad_feature_vector=ad_feature_vector,
            user_feature_vector=user_feature_vector,
            timestep=timestep,
        )
        # The click probability is needed for both objectives; for
        # "conversion" it is multiplied by the conversion probability.
        expected_rewards = self.simulator.ctr.calc_prob(**prob_kwargs)
        if self.objective != "click":  # "conversion"
            expected_rewards = expected_rewards * self.simulator.cvr.calc_prob(
                **prob_kwargs
            )
        return expected_rewards