# Source code for rtbgym.envs.simulator.rtb_synthetic

# Copyright (c) 2023, Haruka Kiyohara, Ren Kishimoto, HAKUHODO Technologies Inc., and Hanjuku-kaso Co., Ltd. All rights reserved.
# Licensed under the Apache 2.0 License.

"""Synthetic Bidding Auction Simulation."""
from dataclasses import dataclass
from typing import Tuple, Union, Optional

import numpy as np
from sklearn.utils import check_scalar, check_random_state

from .base import (
    BaseSimulator,
    BaseWinningPriceDistribution,
    BaseClickAndConversionRate,
)
from .function import (  # noqa: F401
    WinningPriceDistribution,
    ClickThroughRate,
    ConversionRate,
)
from ...utils import NormalDistribution, check_array
from ...types import Numeric


@dataclass
class RTBSyntheticSimulator(BaseSimulator):
    """Class to calculate the outcome probability and stochastically determine auction result in Real-Time Bidding (RTB) setting for display advertising.

    Imported as: :class:`rtbgym.envs.simulator.RTBSyntheticSimulator`

    Parameters
    -------
    cost_indicator: {"impression", "click", "conversion"}, default="click"
        Defines when the cost arises.

    step_per_episode: int, default=7 (> 0)
        Number of timesteps in an episode.

    n_ads: int, default=100 (> 0)
        Number of (candidate) ads used for auction bidding.

    n_users: int, default=100 (> 0)
        Number of (candidate) users used for auction bidding.

    ad_feature_dim: int, default=5 (> 0)
        Dimension of the ad feature vectors.

    user_feature_dim: int, default=5 (> 0)
        Dimension of the user feature vectors.

    ad_feature_vector: array-like of shape (n_ads, ad_feature_dim), default=None
        Feature vectors that characterizes each ad.
        If `None`, the vectors are sampled from a standard normal distribution.

    user_feature_vector: array-like of shape (n_users, user_feature_dim), default=None
        Feature vectors that characterizes each user.
        If `None`, the vectors are sampled from a standard normal distribution.

    ad_sampling_rate: array-like of shape (step_per_episode, n_ads) or (n_ads, ), default=None
        Sampling probabilities to determine which ad (id) is used in each auction.
        If `None`, ads are sampled uniformly. Given arrays are normalized per timestep.

    user_sampling_rate: array-like of shape (step_per_episode, n_users) or (n_users, ), default=None
        Sampling probabilities to determine which user (id) is used in each auction.
        If `None`, users are sampled uniformly. Given arrays are normalized per timestep.

    WinningPriceDistribution: BaseWinningPriceDistribution
        Winning price distribution of auctions.
        Both class and instance are acceptable.

    ClickThroughRate: BaseClickAndConversionRate
        Click through rate (i.e., click / impression).
        Both class and instance are acceptable.

    ConversionRate: BaseClickAndConversionRate
        Conversion rate (i.e., conversion / click).
        Both class and instance are acceptable.

    standard_bid_price_distribution: NormalDistribution, default=None
        Distribution of the bid price whose average impression probability is expected to be 0.5.
        If `None`, `NormalDistribution(mean=50, std=5)` is used.

    minimum_standard_bid_price: {int, float}, default=None
        Minimum value for standard bid price.
        Must lie within [0, standard_bid_price_distribution.mean] when given.
        If `None`, minimum_standard_bid_price is set to
        :class:`standard_bid_price_distribution.mean / 2`.

    search_volume_distribution: NormalDistribution, default=None
        Search volume distribution for each timestep.
        If `None`, `NormalDistribution(mean=200, std=20)` is used.

    minimum_search_volume: int, default = 10 (> 0)
        Minimum search volume at each timestep.

    random_state: int, default=None (>= 0)
        Random state. Although the default is `None`, a value must be given;
        otherwise a ValueError is raised.

    References
    -------
    Di Wu, Xiujun Chen, Xun Yang, Hao Wang, Qing Tan, Xiaoxun Zhang, Jian Xu, and Kun Gai.
    "Budget Constrained Bidding by Model-free Reinforcement Learning in Display Advertising." 2018.

    Jun Zhao, Guang Qiu, Ziyu Guan, Wei Zhao, and Xiaofei He.
    "Deep Reinforcement Learning for Sponsored Search Real-time Bidding." 2018.

    """

    cost_indicator: str = "click"
    step_per_episode: int = 7
    n_ads: int = 100
    n_users: int = 100
    ad_feature_dim: int = 5
    user_feature_dim: int = 5
    ad_feature_vector: Optional[np.ndarray] = None
    user_feature_vector: Optional[np.ndarray] = None
    ad_sampling_rate: Optional[np.ndarray] = None
    user_sampling_rate: Optional[np.ndarray] = None
    WinningPriceDistribution: BaseWinningPriceDistribution = (  # noqa: F811
        WinningPriceDistribution
    )
    ClickThroughRate: BaseClickAndConversionRate = ClickThroughRate  # noqa: F811
    ConversionRate: BaseClickAndConversionRate = ConversionRate  # noqa: F811
    # The two distribution defaults must be a plain `None` (not `(None,)`) so that
    # the `is None` fallbacks in `__post_init__` are actually taken.
    standard_bid_price_distribution: Optional[NormalDistribution] = None
    minimum_standard_bid_price: Optional[Union[int, float]] = None
    search_volume_distribution: Optional[NormalDistribution] = None
    minimum_search_volume: int = 10
    random_state: Optional[int] = None

    def __post_init__(self):
        """Validate the configuration and build the derived simulator components.

        Raises
        -------
        ValueError
            If any of the given parameters is invalid (including `random_state=None`).

        """
        if self.cost_indicator not in ["impression", "click", "conversion"]:
            raise ValueError(
                f"cost_indicator must be 'impression', 'click', or 'conversion', but {self.cost_indicator} is given"
            )
        check_scalar(
            self.step_per_episode,
            name="step_per_episode",
            target_type=int,
            min_val=1,
        )
        check_scalar(
            self.n_ads,
            name="n_ads",
            target_type=int,
            min_val=1,
        )
        check_scalar(
            self.n_users,
            name="n_users",
            target_type=int,
            min_val=1,
        )
        self.ad_ids = np.arange(self.n_ads)
        self.user_ids = np.arange(self.n_users)

        if self.random_state is None:
            raise ValueError("random_state must be given")
        self.random_ = check_random_state(self.random_state)

        check_scalar(
            self.ad_feature_dim,
            name="ad_feature_dim",
            target_type=int,
            min_val=1,
        )
        check_scalar(
            self.user_feature_dim,
            name="user_feature_dim",
            target_type=int,
            min_val=1,
        )

        if self.ad_feature_vector is None:
            self.ad_feature_vector = self.random_.normal(
                size=(self.n_ads, self.ad_feature_dim)
            )
        check_array(
            self.ad_feature_vector,
            name="ad_feature_vector",
            expected_dim=2,
        )
        if self.ad_feature_vector.shape != (self.n_ads, self.ad_feature_dim):
            raise ValueError(
                "The shape of ad_feature_vector must be (n_ads, ad_feature_dim)"
            )

        if self.user_feature_vector is None:
            self.user_feature_vector = self.random_.normal(
                size=(self.n_users, self.user_feature_dim)
            )
        check_array(
            self.user_feature_vector,
            name="user_feature_vector",
            expected_dim=2,
        )
        if self.user_feature_vector.shape != (self.n_users, self.user_feature_dim):
            raise ValueError(
                "The shape of user_feature_vector must be (n_users, user_feature_dim)"
            )

        self.ad_sampling_rate = self._prepare_sampling_rate(
            self.ad_sampling_rate,
            n_candidates=self.n_ads,
            name="ad_sampling_rate",
            count_name="n_ads",
        )
        self.user_sampling_rate = self._prepare_sampling_rate(
            self.user_sampling_rate,
            n_candidates=self.n_users,
            name="user_sampling_rate",
            count_name="n_users",
        )

        if self.standard_bid_price_distribution is None:
            self.standard_bid_price_distribution = NormalDistribution(
                mean=50,
                std=5,
                random_state=self.random_state,
            )
        if not isinstance(self.standard_bid_price_distribution, NormalDistribution):
            raise ValueError(
                "standard_bid_price_distribution must be a NormalDistribution"
            )
        if not isinstance(self.standard_bid_price_distribution.mean, Numeric):
            raise ValueError(
                "standard_bid_price_distribution must have a single parameter for mean and std"
            )

        if self.minimum_standard_bid_price is not None and not (
            isinstance(self.minimum_standard_bid_price, (int, float))
            and 0
            <= self.minimum_standard_bid_price
            <= self.standard_bid_price_distribution.mean
        ):
            raise ValueError(
                f"minimum_standard_bid_price must be a float value within [0, standard_bid_price_distribution.mean], but {self.minimum_standard_bid_price} is given"
            )

        if self.search_volume_distribution is None:
            self.search_volume_distribution = NormalDistribution(
                mean=200,
                std=20,
                random_state=self.random_state,
            )
        # A scalar mean/std is broadcast so that every timestep has its own entry.
        if isinstance(self.search_volume_distribution.mean, Numeric):
            self.search_volume_distribution = NormalDistribution(
                mean=np.full(
                    self.step_per_episode, self.search_volume_distribution.mean
                ),
                std=np.full(self.step_per_episode, self.search_volume_distribution.std),
                random_state=self.random_state,
            )
        check_array(
            self.search_volume_distribution.mean,
            name="search_volume_distribution.mean",
            expected_dim=1,
            min_val=0,
        )
        if not (
            isinstance(self.search_volume_distribution.mean, (int, float))
            or len(self.search_volume_distribution.mean) == self.step_per_episode
        ):
            raise ValueError(
                "length of search_volume_distribution must be equal to step_per_episode"
            )

        check_scalar(
            self.minimum_search_volume,
            name="minimum_search_volume",
            target_type=int,
            min_val=1,
        )

        # define winning function
        # (`isinstance(..., type)` guards `issubclass`, which raises TypeError on non-classes)
        if isinstance(self.WinningPriceDistribution, BaseWinningPriceDistribution):
            self.winning_price_distribution = self.WinningPriceDistribution
        elif isinstance(self.WinningPriceDistribution, type) and issubclass(
            self.WinningPriceDistribution, BaseWinningPriceDistribution
        ):
            self.winning_price_distribution = self.WinningPriceDistribution(
                n_ads=self.n_ads,
                n_users=self.n_users,
                ad_feature_dim=self.ad_feature_dim,
                user_feature_dim=self.user_feature_dim,
                step_per_episode=self.step_per_episode,
                standard_bid_price_distribution=self.standard_bid_price_distribution,
                minimum_standard_bid_price=self.minimum_standard_bid_price,
                random_state=self.random_state,
            )
        else:
            raise ValueError(
                "WinningPriceDistribution must be a child class of BaseWinningPriceDistribution"
            )

        # define click/imp and conversion/click rate function
        if isinstance(self.ClickThroughRate, BaseClickAndConversionRate):
            self.ctr = self.ClickThroughRate
        elif isinstance(self.ClickThroughRate, type) and issubclass(
            self.ClickThroughRate, BaseClickAndConversionRate
        ):
            self.ctr = self.ClickThroughRate(
                n_ads=self.n_ads,
                n_users=self.n_users,
                ad_feature_dim=self.ad_feature_dim,
                user_feature_dim=self.user_feature_dim,
                step_per_episode=self.step_per_episode,
                random_state=self.random_state,
            )
        else:
            raise ValueError(
                "ClickThroughRate must be a child class of BaseClickAndConversionRate"
            )

        if isinstance(self.ConversionRate, BaseClickAndConversionRate):
            self.cvr = self.ConversionRate
        elif isinstance(self.ConversionRate, type) and issubclass(
            self.ConversionRate, BaseClickAndConversionRate
        ):
            self.cvr = self.ConversionRate(
                n_ads=self.n_ads,
                n_users=self.n_users,
                ad_feature_dim=self.ad_feature_dim,
                user_feature_dim=self.user_feature_dim,
                step_per_episode=self.step_per_episode,
                random_state=self.random_state
                + 1,  # to differentiate the coef with that of CTR
            )
        else:
            raise ValueError(
                "ConversionRate must be a child class of BaseClickAndConversionRate"
            )

    def _prepare_sampling_rate(
        self,
        sampling_rate: Optional[np.ndarray],
        n_candidates: int,
        name: str,
        count_name: str,
    ) -> np.ndarray:
        """Normalize and validate a per-timestep sampling probability table.

        Parameters
        -------
        sampling_rate: ndarray of shape (step_per_episode, n_candidates) or (n_candidates, ), default=None
            Unnormalized sampling weights. If `None`, a uniform table is created.

        n_candidates: int
            Number of candidates (i.e., ads or users) to sample from.

        name: str
            Name of the parameter, used in the error messages.

        count_name: str
            Name of the parameter that holds `n_candidates`, used in the error messages.

        Returns
        -------
        sampling_rate: ndarray of shape (step_per_episode, n_candidates)
            Sampling probabilities whose rows sum to one.

        """
        if sampling_rate is None:
            sampling_rate = np.full(
                (self.step_per_episode, n_candidates), 1 / n_candidates
            )
        elif isinstance(sampling_rate, np.ndarray):
            if sampling_rate.ndim == 1:
                # the same distribution is shared across all timesteps
                sampling_rate = np.tile(
                    sampling_rate / sampling_rate.sum(),
                    (self.step_per_episode, 1),
                )
            else:
                sampling_rate = sampling_rate / sampling_rate.sum(
                    axis=1, keepdims=True
                )

        check_array(
            sampling_rate,
            name=name,
            expected_dim=2,
            min_val=0,
        )
        if sampling_rate.shape != (self.step_per_episode, n_candidates):
            raise ValueError(
                f"The shape of {name} must be (step_per_episode, {count_name})"
            )
        if not np.allclose(sampling_rate.sum(axis=1), np.ones(self.step_per_episode)):
            raise ValueError(
                f"Expected `{name}.sum(axis=1) == np.ones(step_per_episode)`, but found False"
            )
        return sampling_rate

    @property
    def standard_bid_price(self):
        """Standard bid price held by the winning price distribution."""
        return self.winning_price_distribution.standard_bid_price

    def generate_auction(
        self, volume: Optional[int] = None, timestep: Optional[int] = None
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Sample ad and user pair for each auction.

        Parameters
        -------
        volume: int, default=None (> 0)
            Total number of auctions to generate.
            If `None`, the volume is sampled from `search_volume_distribution`
            and clipped from below by `minimum_search_volume`.

        timestep: int, default=None (>= 0)
            Timestep in the RL environment.
            If `None`, the sampling rates averaged over all timesteps are used.

        Returns
        -------
        ad_ids: ndarray of shape (volume, )
            IDs of the ads.

        user_ids: ndarray of shape (volume, )
            IDs of the users.

        """
        # validate the timestep before it is used as an index
        if timestep is not None:
            check_scalar(timestep, name="timestep", target_type=int, min_val=0)

        # stochastically determine search volume
        if volume is None:
            sampled_volumes = np.clip(
                self.search_volume_distribution.sample(),
                self.minimum_search_volume,
                None,
            ).astype(int)
            # NOTE(review): the volume is read at `timestep - 1` whereas the sampling
            # rates below are read at `timestep` (so `timestep=0` wraps around to the
            # last entry). Kept as-is; confirm which indexing is intended.
            volume_idx = 0 if timestep is None else timestep - 1
            volume = sampled_volumes[0][volume_idx]

        check_scalar(
            volume,
            name="volume",
            target_type=(int, np.integer),
            min_val=1,
        )

        if timestep is not None:
            ad_probs = self.ad_sampling_rate[timestep]
            user_probs = self.user_sampling_rate[timestep]
        else:
            ad_probs = self.ad_sampling_rate.mean(axis=0)
            user_probs = self.user_sampling_rate.mean(axis=0)

        ad_ids = self.random_.choice(
            self.ad_ids,
            size=volume,
            p=ad_probs,
        )
        user_ids = self.random_.choice(
            self.user_ids,
            size=volume,
            p=user_probs,
        )
        return ad_ids, user_ids

    def map_idx_to_features(
        self, ad_ids: np.ndarray, user_ids: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Map the ad and the user index into feature vectors.

        Parameters
        -------
        ad_ids: array-like of shape (search_volume, )
            IDs of the ads.
            (search_volume is determined in RL environment.)

        user_ids: array-like of shape (search_volume, )
            IDs of the users.
            (search_volume is determined in RL environment.)

        Returns
        -------
        ad_feature_vector: ndarray of shape (search_volume/n_samples, ad_feature_dim)
            Ad feature vector for each auction.

        user_feature_vector: ndarray of shape (search_volume/n_samples, user_feature_dim)
            User feature vector for each auction.

        """
        check_array(
            ad_ids,
            name="ad_ids",
            expected_dim=1,
            expected_dtype=int,
            min_val=0,
            max_val=self.n_ads - 1,
        )
        check_array(
            user_ids,
            name="user_ids",
            expected_dim=1,
            expected_dtype=int,
            min_val=0,
            max_val=self.n_users - 1,
        )
        if ad_ids.shape[0] != user_ids.shape[0]:
            raise ValueError("ad_ids and user_ids must have the same length")

        ad_features = self.ad_feature_vector[ad_ids]
        user_features = self.user_feature_vector[user_ids]
        return ad_features, user_features

    def calc_and_sample_outcome(
        self,
        timestep: int,
        ad_ids: np.ndarray,
        user_ids: np.ndarray,
        bid_prices: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Simulate bidding auction for given queries.
        (Calculate outcome probability and stochastically determine auction result.)

        Parameters
        -------
        timestep: int (>= 0)
            Timestep in the RL environment.

        ad_ids: array-like of shape (search_volume, )
            IDs of the ads.

        user_ids: array-like of shape (search_volume, )
            IDs of the users.

        bid_prices: array-like of shape(search_volume, )
            Bid price for each auction.
            (search_volume is determined in RL environment.)

        Returns
        -------
        costs: ndarray of shape (search_volume, )
            Cost raised (i.e., second price) for each auction.

        impressions: ndarray of shape (search_volume, )
            Binary indicator of whether impression occurred or not for each auction.

        clicks: ndarray of shape (search_volume, )
            Binary indicator of whether click occurred or not for each auction.

        conversions: ndarray of shape (search_volume, )
            Binary indicator of whether conversion occurred or not for each auction.

        """
        check_scalar(
            timestep,
            name="timestep",
            target_type=int,
            min_val=0,
        )
        check_array(
            ad_ids,
            name="ad_ids",
            expected_dim=1,
            expected_dtype=int,
            min_val=0,
            max_val=self.n_ads - 1,
        )
        check_array(
            user_ids,
            name="user_ids",
            expected_dim=1,
            expected_dtype=int,
            min_val=0,
            max_val=self.n_users - 1,
        )
        if ad_ids.shape[0] != user_ids.shape[0]:
            raise ValueError("ad_ids and user_ids must have the same length")
        check_array(
            bid_prices,
            name="bid_prices",
            expected_dim=1,
            min_val=0,
        )

        ad_feature_vector = self.ad_feature_vector[ad_ids]
        user_feature_vector = self.user_feature_vector[user_ids]

        impressions, winning_prices = self.winning_price_distribution.sample_outcome(
            bid_prices=bid_prices,
            ad_ids=ad_ids,
            user_ids=user_ids,
            ad_feature_vector=ad_feature_vector,
            user_feature_vector=user_feature_vector,
            timestep=timestep,
        )
        # a click can only occur on an impression
        clicks = (
            self.ctr.sample_outcome(
                ad_ids=ad_ids,
                user_ids=user_ids,
                ad_feature_vector=ad_feature_vector,
                user_feature_vector=user_feature_vector,
                timestep=timestep,
            )
            * impressions
        )
        # a conversion can only occur on a click
        conversions = (
            self.cvr.sample_outcome(
                ad_ids=ad_ids,
                user_ids=user_ids,
                ad_feature_vector=ad_feature_vector,
                user_feature_vector=user_feature_vector,
                timestep=timestep,
            )
            * clicks
        )

        # the winning price is charged only when the event chosen by cost_indicator occurs
        if self.cost_indicator == "impression":
            costs = winning_prices * impressions
        elif self.cost_indicator == "click":
            costs = winning_prices * clicks
        elif self.cost_indicator == "conversion":
            costs = winning_prices * conversions

        return costs, impressions, clicks, conversions