Source code for rtbgym.envs.simulator.function

# Copyright (c) 2023, Haruka Kiyohara, Ren Kishimoto, HAKUHODO Technologies Inc., and Hanjuku-kaso Co., Ltd. All rights reserved.
# Licensed under the Apache 2.0 License.

"""Mathematical Functions used in Real-Time Bidding (RTB) Simulation."""
from dataclasses import dataclass
from typing import Tuple, Union, Optional

import numpy as np
from sklearn.utils import check_scalar, check_random_state

from .base import (
    BaseWinningPriceDistribution,
    BaseClickAndConversionRate,
)
from ...utils import NormalDistribution
from ...utils import sigmoid
from ...utils import check_array


[docs]@dataclass class WinningPriceDistribution(BaseWinningPriceDistribution): """Class to sample the winning price (i.e., second price) and compare it with the given bid price. Imported as: :class:`rtbgym.envs.simulator.WinningDistribution` Note ------- Winning price distribution follows gamma distribution. .. math:: p(x) = x^{k-1} \\frac{\\exp(- x / \\theta)}{\\theta^k \\Gamma(k)}, where :math:`\\Gamma(k) := (k-1)!` and :math:`k` and :math:`\\theta` are hyperparameters. Tip ------- Use :class:`BaseWinningPriceDistribution` to define a custom WinningPriceDistribution. Parameters ------- n_ads: int (> 0) Number of ads. n_users: int (> 0) Number of users. (This is for API consistency) ad_feature_dim: int (> 0) Dimension of the ad feature vectors. (This is for API consistency) user_feature_dim: int (> 0) Dimension of the user feature vectors. (This is for API consistency) step_per_episode: int (> 0) Length of the CTR trend cycle. (This is for API consistency) standard_bid_price_distribution: NormalDistribution, default=None Distribution of the bid price whose average impression probability is expected to be 0.5. minimum_standard_bid_price: {int, float}, default=None (> 0) Minimum value for standard bid price. If None, minimum_standard_bid_price is set to standard_bid_price_distribution.mean / 2. random_state: int, default=None (>= 0) Random state. References ------- Wen-Yuan Zhu, Wen-Yueh Shih, Ying-Hsuan Lee, Wen-Chih Peng, and Jiun-Long Huang. "A Gamma-based Regression for Winning Price Estimation in Real-Time Bidding Advertising." 2017. """ n_ads: int n_users: int ad_feature_dim: int user_feature_dim: int step_per_episode: int standard_bid_price_distribution: Optional[NormalDistribution] = (None,) minimum_standard_bid_price: Optional[Union[int, float]] = None random_state: Optional[int] = None def __post_init__(self): check_scalar(self.n_ads, name="n_ads", target_type=int, min_val=1) if not isinstance(self.standard_bid_price_distribution, NormalDistribution): raise ValueError( "standard_bid_price_distribution must be a child class of NormalDistribution" ) if self.minimum_standard_bid_price is None: self.minimum_standard_bid_price = ( self.standard_bid_price_distribution.mean / 2 ) check_scalar( self.minimum_standard_bid_price, name="minimum_standard_bid_price", target_type=(int, float), min_val=0, ) if self.random_state is None: raise ValueError("random_state must be given") self.random_ = check_random_state(self.random_state) if self.standard_bid_price_distribution is None: self.standard_bid_price_distribution = NormalDistribution( mean=50, std=5, random_state=self.random_state, ) standard_bid_prices = np.clip( self.standard_bid_price_distribution.sample(self.n_ads), self.minimum_standard_bid_price, None, ) self.ks = self.random_.normal( loc=50, scale=5, size=self.n_ads, ) self.thetas = self.random_.normal( loc=standard_bid_prices * 0.02, scale=(standard_bid_prices * 0.02) / 5, size=self.n_ads, ) @property def standard_bid_price(self): return self.standard_bid_price_distribution.mean
[docs] def sample_outcome( self, bid_prices: np.ndarray, ad_ids: np.ndarray, user_ids: np.ndarray, ad_feature_vector: np.ndarray, user_feature_vector: np.ndarray, timestep: Union[int, np.ndarray], ) -> Tuple[np.ndarray]: """Calculate impression probability for given bid price. Parameters ------- bid_prices: array-like of shape (search_volume, ) Bid price for each auction. ad_ids: array-like of shape (search_volume/n_samples, ) Ad ids used for each auction. (This is for API consistency) user_ids: array-like of shape (search_volume/n_samples, ) User ids used for each auction. (This is for API consistency) ad_feature_vector: array-like of shape (search_volume/n_samples, ad_feature_dim) Ad feature vector for each auction. user_feature_vector: array-like of shape (search_volume/n_samples, user_feature_dim) User feature vector for each auction. timestep: {int, array-like of shape (n_samples, )} Timestep in the RL environment. Returns ------- impressions: ndarray of shape (search_volume, ) Whether impression occurred for each auction. winning_prices: ndarray of shape (search_volume, ) Sampled winning price for each auction. """ check_array( bid_prices, name="bid_prices", expected_dim=1, min_val=0, ) check_array( ad_ids, name="ad_ids", expected_dim=1, expected_dtype=int, min_val=0, max_val=self.n_ads - 1, ) winning_prices = np.clip( self.random_.gamma(shape=self.ks[ad_ids], scale=self.thetas[ad_ids]), 1, None, ) impressions = winning_prices < bid_prices return impressions.astype(int), winning_prices.astype(int)
[docs]@dataclass class ClickThroughRate(BaseClickAndConversionRate): """Class to calculate ground-truth CTR (i.e., click per impression). Imported as: :class:`rtbgym.envs.simulator.ClickThroughRate` Note ------- We define two coefficient, context coefficient (`coef`) and time coefficient (`time_coef`). First, the value is calculated linearly from context vector and coef by inner product. Then, we multiply the value with `time_coef` and gain (ground-truth) CTR. In short, CTR is calculated as follows. CTR = (context @ coef) * time_coef, where @ denotes inner product. Tip ------- Use :class:`BaseClickAndConversionRate` to define a custom ClickThroughRate. Parameters ------- n_ads: int (> 0) Number of ads. (This is for API consistency) n_users: int (> 0) Number of users. (This is for API consistency) ad_feature_dim: int (> 0) Dimension of the ad feature vectors. user_feature_dim: int (> 0) Dimension of the user feature vectors. step_per_episode: int (> 0) Length of the CTR trend cycle. random_state: int, default=None (>= 0) Random state. """ n_ads: int n_users: int ad_feature_dim: int user_feature_dim: int step_per_episode: int random_state: Optional[int] = None def __post_init__(self): check_scalar( self.ad_feature_dim, name="ad_feature_dim", target_type=int, min_val=1, ) check_scalar( self.user_feature_dim, name="user_feature_dim", target_type=int, min_val=1, ) check_scalar( self.step_per_episode, name="step_per_episode", target_type=int, min_val=1, ) if self.random_state is None: raise ValueError("random_state must be given") self.random_ = check_random_state(self.random_state) coef_dim = self.ad_feature_dim + self.user_feature_dim self.coef = self.random_.normal(loc=0.0, scale=0.5 / coef_dim, size=coef_dim) # define intermittent time_coef using trigonometric function n_wave = 10 time_coef_weight = self.random_.beta(5, 20, size=n_wave) start_point = self.random_.uniform(size=n_wave) time_coef = np.zeros(self.step_per_episode + 20) for i in range(10): time_coef += time_coef_weight[i] * ( np.cos( ( np.arange(self.step_per_episode + 20) * (i + 1) * np.pi + start_point[i] * 2 * np.pi ) / self.step_per_episode ) + 1 ) start_idx = self.random_.randint(5, 15) self.time_coef = ( time_coef[start_idx : start_idx + self.step_per_episode] / n_wave )
[docs] def calc_prob( self, ad_ids: np.ndarray, user_ids: np.ndarray, ad_feature_vector: np.ndarray, user_feature_vector: np.ndarray, timestep: Union[int, np.ndarray], ) -> np.ndarray: """Calculate CTR (i.e., click per impression). Note ------- CTR is calculated using both context coefficient (`coef`) and time coefficient (`time_coef`). CTR = (context @ coef) * time_coef, where @ denotes inner product. Parameters ------- ad_ids: array-like of shape (search_volume/n_samples, ) Ad ids used for each auction. (not used, but for API consistency) user_ids: array-like of shape (search_volume/n_samples, ) User ids used for each auction. (not used, but for API consistency) ad_feature_vector: array-like of shape (search_volume/n_samples, ad_feature_dim) Ad feature vector for each auction. user_feature_vector: array-like of shape (search_volume/n_samples, user_feature_dim) User feature vector for each auction. timestep: {int, array-like of shape (n_samples, )} Timestep in the RL environment. Returns ------- ctrs: ndarray of shape (search_volume/n_samples, ) Ground-truth CTR (i.e., click per impression) for each auction. """ check_array( ad_feature_vector, name="ad_feature_vector", expected_dim=2, ) check_array( user_feature_vector, name="user_feature_vector", expected_dim=2, ) if ad_feature_vector.shape[1] != self.ad_feature_dim: raise ValueError( "Expected `ad_feature_dim.shape[1] == ad_feature_dim`, but found False" ) if user_feature_vector.shape[1] != self.user_feature_dim: raise ValueError( "Expected `user_feature_dim.shape[1] == user_feature_dim`, but found False" ) if ad_feature_vector.shape[0] != user_feature_vector.shape[0]: raise ValueError( "Expected ad_feature_dim and user_feature_dim must have the same length" ) if not (isinstance(timestep, int) and timestep >= 0) and not ( isinstance(timestep, np.ndarray) and np.issubsctype(timestep, int) and timestep.ndim == 1 and timestep.min() >= 0 ): raise ValueError( "timestep must be an non-negative integer or an 1-dimensional NDArray of non-negative integers" ) contexts = np.concatenate([ad_feature_vector, user_feature_vector], axis=1) ctrs = sigmoid(contexts @ self.coef.T) * self.time_coef[timestep].flatten() return ctrs
[docs] def sample_outcome( self, ad_ids: np.ndarray, user_ids: np.ndarray, ad_feature_vector: np.ndarray, user_feature_vector: np.ndarray, timestep: Union[int, np.ndarray], ) -> np.ndarray: """Stochastically determine whether click occurs in impression=True case. Parameters ------- ad_ids: array-like of shape (search_volume/n_samples, ) Ad ids used for each auction. (not used, but for API consistency) user_ids: array-like of shape (search_volume/n_samples, ) User ids used for each auction. (not used, but for API consistency) ad_feature_vector: array-like of shape (search_volume/n_samples, ad_feature_dim) Ad feature vector for each auction. user_feature_vector: array-like of shape (search_volume/n_samples, user_feature_dim) User feature vector for each auction. timestep: {int, array-like of shape (n_samples, )} Timestep in the RL environment. Returns ------- clicks: array-like of shape (search_volume/n_samples, ) Whether click occurs when impression=True. """ ctrs = self.calc_prob( timestep=timestep, ad_ids=ad_ids, user_ids=user_ids, ad_feature_vector=ad_feature_vector, user_feature_vector=user_feature_vector, ) clicks = self.random_.rand(len(ad_ids)) < ctrs return clicks.astype(int)
[docs]@dataclass class ConversionRate(BaseClickAndConversionRate): """Class to calculate ground-truth CVR (i.e., conversion per click). Imported as: :class:`rtbgym.envs.simulator.ConversionRate` Note ------- We define two coefficient, context coefficient (`coef`) and time coefficient (`time_coef`). First, the value is calculated linearly from context vector and coef by inner product. Then, we multiply the value with `time_coef` and gain (ground-truth) CVR. In short, CVR is calculated as follows. CVR = (context @ coef) * time_coef, where @ denotes inner product. Tip ------- Use :class:`BaseClickAndConversionRate` to define a custom ConversionRate. Parameters ------- n_ads: int (> 0) Number of ads. (This is for API consistency) n_users: int (> 0) Number of users. (This is for API consistency) ad_feature_dim: int (> 0) Dimension of the ad feature vectors. user_feature_dim: int (> 0) Dimension of the user feature vectors. step_per_episode: int (> 0) Length of the CVR trend cycle. random_state: int, default=None (>= 0) Random state. """ n_ads: int n_users: int ad_feature_dim: int user_feature_dim: int step_per_episode: int random_state: Optional[int] = None def __post_init__(self): check_scalar( self.ad_feature_dim, name="ad_feature_dim", target_type=int, min_val=1, ) check_scalar( self.user_feature_dim, name="user_feature_dim", target_type=int, min_val=1, ) check_scalar( self.step_per_episode, name="step_per_episode", target_type=int, min_val=1, ) if self.random_state is None: raise ValueError("random_state must be given") self.random_ = check_random_state(self.random_state) coef_dim = self.ad_feature_dim + self.user_feature_dim self.coef = self.random_.normal(loc=0.0, scale=0.5 / coef_dim, size=coef_dim) # define intermittent time_coef using trigonometric function n_wave = 10 time_coef_weight = self.random_.beta(10, 15, size=n_wave) start_point = self.random_.uniform(size=n_wave) time_coef = np.zeros(self.step_per_episode + 20) for i in range(10): time_coef += time_coef_weight[i] * ( np.cos( ( np.arange(self.step_per_episode + 20) * (i + 1) * np.pi + start_point[i] * 2 * np.pi ) / self.step_per_episode ) + 1 ) start_idx = self.random_.randint(5, 15) self.time_coef = ( time_coef[start_idx : start_idx + self.step_per_episode] / n_wave )
[docs] def calc_prob( self, ad_ids: np.ndarray, user_ids: np.ndarray, ad_feature_vector: np.ndarray, user_feature_vector: np.ndarray, timestep: Union[int, np.ndarray], ) -> np.ndarray: """Calculate CVR (i.e., conversion per click) using context vectors. Note ------- CVR is calculated using both context coefficient (`coef`) and time coefficient (`time_coef`). CVR = (context @ coef) * time_coef, where @ denotes inner product. Parameters ------- ad_ids: array-like of shape (search_volume/n_samples, ) Ad ids used for each auction. (not used, but for API consistency) user_ids: array-like of shape (search_volume/n_samples, ) User ids used for each auction. (not used, but for API consistency) ad_feature_vector: array-like of shape (search_volume/n_samples, ad_feature_dim) Ad feature vector for each auction. user_feature_vector: array-like of shape (search_volume/n_samples, user_feature_dim) User feature vector for each auction. timestep: {int, array-like of shape (n_samples, )} Timestep in the RL environment. Returns ------- cvrs: ndarray of shape (search_volume/n_samples, ) Ground-truth CVR (i.e., conversion per click) for each auction. """ check_array( ad_feature_vector, name="ad_feature_vector", expected_dim=2, ) check_array( user_feature_vector, name="user_feature_vector", expected_dim=2, ) if ad_feature_vector.shape[1] != self.ad_feature_dim: raise ValueError( "Expected `ad_feature_dim.shape[1] == ad_feature_dim`, but found False" ) if user_feature_vector.shape[1] != self.user_feature_dim: raise ValueError( "Expected `user_feature_dim.shape[1] == user_feature_dim`, but found False" ) if ad_feature_vector.shape[0] != user_feature_vector.shape[0]: raise ValueError( "Expected ad_feature_dim and user_feature_dim must have the same length" ) if not (isinstance(timestep, int) and timestep >= 0) and not ( isinstance(timestep, np.ndarray) and np.issubsctype(timestep, int) and timestep.ndim == 1 and timestep.min() >= 0 ): raise ValueError( "timestep must be an non-negative integer or an 1-dimensional NDArray of non-negative integers" ) contexts = np.concatenate([ad_feature_vector, user_feature_vector], axis=1) cvrs = sigmoid(contexts @ self.coef.T) * self.time_coef[timestep].flatten() return cvrs
[docs] def sample_outcome( self, ad_ids: np.ndarray, user_ids: np.ndarray, ad_feature_vector: np.ndarray, user_feature_vector: np.ndarray, timestep: Union[int, np.ndarray], ) -> np.ndarray: """Stochastically determine whether conversion occurs in click=True case. Parameters ------- ad_ids: array-like of shape (search_volume/n_samples, ) Ad ids used for each auction. (not used, but for API consistency) user_ids: array-like of shape (search_volume/n_samples, ) User ids used for each auction. (not used, but for API consistency) ad_feature_vector: array-like of shape (search_volume/n_samples, ad_feature_dim) Ad feature vector for each auction. user_feature_vector: array-like of shape (search_volume/n_samples, user_feature_dim) User feature vector for each auction. timestep: {int, array-like of shape (n_samples, )} Timestep in the RL environment. Returns ------- conversions: ndarray of shape (search_volume/n_samples, ) Whether conversion occurs when click=True. """ cvrs = self.calc_prob( ad_ids=ad_ids, user_ids=user_ids, ad_feature_vector=ad_feature_vector, user_feature_vector=user_feature_vector, timestep=timestep, ) conversions = self.random_.rand(len(ad_ids)) < cvrs return conversions.astype(int)