Source code for pyfemtet.opt.interface._surrogate_model_interface.botorch_interface

from typing import Sequence

import numpy as np
from scipy.stats.distributions import norm

# from gpytorch.priors.torch_priors import GammaPrior

from pyfemtet._i18n import _
from pyfemtet.opt.history import *
from pyfemtet.opt.exceptions import *

from pyfemtet.opt.prediction._model import PyFemtetModel, SingleTaskGPModel

from pyfemtet.opt.interface._surrogate_model_interface.base_surrogate_interface import AbstractSurrogateModelInterfaceBase

from pyfemtet.logger import get_module_logger

logger = get_module_logger('opt.interface', False)


__all__ = [
    'BoTorchInterface',
    'PoFBoTorchInterface',
]


[docs] class BoTorchInterface(AbstractSurrogateModelInterfaceBase): current_obj_std_values: dict[str, float] def __init__( self, history_path: str = None, train_history: History = None, _output_directions: ( Sequence[str | float] | dict[str, str | float] | dict[int, str | float] ) = None ): AbstractSurrogateModelInterfaceBase.__init__( self, history_path, train_history, _output_directions ) self.model = SingleTaskGPModel() self.pyfemtet_model = PyFemtetModel() self.current_obj_std_values = {} # get main only df = self.train_history.get_df(MAIN_FILTER) # filter succeeded only df = df[df['state'] == TrialState.succeeded] # training self.pyfemtet_model.update_model(self.model) self.pyfemtet_model.fit( history=self.train_history, df=df, observation_noise='no', )
[docs] def update(self) -> None: # update current objective values x = np.array([[variable.value for variable in self.current_prm_values.values()]]) y, s = self.pyfemtet_model.predict(x) y = y[0] s = s[0] for obj_name, obj_value, obj_std in zip(self.train_history.obj_names, y, s): self.current_obj_values.update({obj_name: obj_value}) self.current_obj_std_values.update({obj_name: obj_std})
[docs] class PoFBoTorchInterface(BoTorchInterface, AbstractSurrogateModelInterfaceBase): _debug: bool = False def __init__( self, history_path: str, train_history: History = None, observation_noise: float | str | None = None, feasibility_noise: float | str | None = None, feasibility_cdf_threshold: float | str = 0.5, # or 'sample_mean' _output_directions: ( Sequence[str | float] | dict[str, str | float] | dict[int, str | float] ) = None ): AbstractSurrogateModelInterfaceBase.__init__( self, history_path, train_history, _output_directions ) self.model = SingleTaskGPModel() self.pyfemtet_model = PyFemtetModel() self.model_c = SingleTaskGPModel() self.pyfemtet_model_c = PyFemtetModel() self.train_history_c = History() self.train_history_c.load_csv(history_path, with_finalize=True) self.pof_threshold = 0.5 self.feasibility_cdf_threshold = feasibility_cdf_threshold self.current_obj_std_values = {} # use feasibility as a single objective self.train_history_c.obj_names = ['feasibility'] # get main only df = self.train_history.get_df(MAIN_FILTER) df_c = self.train_history_c.get_df(MAIN_FILTER) # filter succeeded only for main df = df[df['state'] == TrialState.succeeded] # convert type bool to float df_c = df_c.astype({'feasibility': float}) # training main self.pyfemtet_model.update_model(self.model) self.pyfemtet_model.fit( history=self.train_history, df=df, observation_noise=observation_noise, ) # training model_c self.pyfemtet_model_c.update_model(self.model_c) self.pyfemtet_model_c.fit( history=self.train_history_c, df=df_c, # observation_noise=None, # observation_noise='no', # observation_noise=0.001, observation_noise=feasibility_noise, # covar_module_settings=dict( # name='matern_kernel_with_gamma_prior', # nu=2.5, # lengthscale_prior=GammaPrior(1.0, 9.0), # default: 3, 6 # outputscale_prior=GammaPrior(1.0, 0.15), # default: 2, 0.15 # ) covar_module_settings=dict( name='get_covar_module_with_dim_scaled_prior_extension', loc_coef=0.01, scale_coef=0.01, ) ) # set auto feasibility_cdf_threshold if self.feasibility_cdf_threshold == 'sample_mean': self.feasibility_cdf_threshold = df_c['feasibility'].mean() if self._debug: self._debug_df_c = df_c
[docs] def calc_pof(self): if self._debug: import plotly.graph_objects as go df = self._debug_df_c x_list = [] prm_names = self.train_history_c.prm_names for prm_name in prm_names: x_list.append(np.linspace( df[prm_name + '_lower_bound'].values[0], df[prm_name + '_upper_bound'].values[0], 20 )) for i in range(len(x_list)): for j in range(i, len(x_list)): if i == j: continue # i=0 # j=1 xx = np.meshgrid(x_list[i], x_list[j]) x_plot = np.array([[x[0]] * 400 for x in x_list]).T x_plot[:, i] = xx[0].ravel() x_plot[:, j] = xx[1].ravel() y_mean, y_std = self.pyfemtet_model_c.predict(x_plot) # feasibility_cdf_threshold = self.feasibility_cdf_threshold # feasibility_cdf_threshold = 0.5 cdf_threshold = 0.25 # 不明なところは pof が 1 近くにすればあとは ACQF がうまいことやってくれる # feasibility_cdf_threshold = self.feasibility_cdf_threshold * 0.5 pof = 1 - norm.cdf(cdf_threshold, y_mean, y_std) x1 = x_list[i] x2 = x_list[j] xx1 = xx[0] xx2 = xx[1] fig = go.Figure() fig.add_trace( go.Contour( x0=x1[0], y0=x2[0], dx=np.diff(x1)[0], dy=np.diff(x2)[0], z=pof.reshape(xx1.shape), ) ) fig.add_trace( go.Scatter( x=self._debug_df_c[prm_names[i]], y=self._debug_df_c[prm_names[j]], mode='markers', marker=dict( color=self._debug_df_c['feasibility'], colorscale='greens', ), ) ) fig.show() fig = go.Figure() fig.add_trace( go.Surface( x=xx1, y=xx2, z=y_mean.reshape(xx1.shape), ) ) fig.add_trace( go.Surface( x=xx1, y=xx2, z=(y_mean + y_std).reshape(xx1.shape), opacity=0.3, ) ) fig.add_trace( go.Surface( x=xx1, y=xx2, z=(y_mean - y_std).reshape(xx1.shape), opacity=0.3, ) ) fig.add_trace( go.Scatter3d( x=self._debug_df_c[prm_names[i]], y=self._debug_df_c[prm_names[j]], z=self._debug_df_c['feasibility'], mode='markers', ) ) fig.show() return x = np.array([[variable.value for variable in self.current_prm_values.values()]]) f_mean, f_std = self.pyfemtet_model_c.predict(x) f_mean, f_std = f_mean[0][0], f_std[0][0] if isinstance(self.feasibility_cdf_threshold, float): cdf_threshold = self.feasibility_cdf_threshold else: raise NotImplementedError( f'self.cdf_threshold must be float, ' f'passed {self.feasibility_cdf_threshold}' ) pof = 1 - norm.cdf(cdf_threshold, f_mean, f_std) return pof
[docs] def update(self) -> None: # BoTorchInterface.update() の前に PoF を計算する pof = self.calc_pof() if pof < self.pof_threshold: logger.info( _( en_message='The surrogate model estimated ' 'that the probability of ' 'feasibility (PoF) is {pof}. ' 'This is under {thresh}. ' 'So this trial is processed as ' 'a constraint violation.', jp_message='サロゲートモデルは解の実行可能確率(PoF)が' '{pof} であると予測しました。' 'これは閾値 {thresh} を下回っているので、' '最適化試行においては拘束違反であると扱います。', pof=pof, thresh=self.pof_threshold, ) ) raise _HiddenConstraintViolation(f'PoF < {self.pof_threshold}') BoTorchInterface.update(self)