Source code for pyfemtet.opt.optimizer.scipy_optimizer._scipy_optimizer

from __future__ import annotations

from typing import Callable

from contextlib import suppress
from time import time

import numpy as np
from scipy.optimize import minimize, OptimizeResult
from scipy.optimize import NonlinearConstraint

from pyfemtet._i18n import Msg, _
from pyfemtet._util.closing import closing
from pyfemtet._util.dask_util import get_client, Lock
from pyfemtet.opt.problem.variable_manager import *
from pyfemtet.opt.problem.problem import *
from pyfemtet.opt.exceptions import *
from pyfemtet.opt.history import MAIN_FILTER
from pyfemtet.logger import get_module_logger

from pyfemtet.opt.optimizer._base_optimizer import *


__all__ = [
    'ScipyOptimizer',
]


logger = get_module_logger('opt.optimizer', False)


class InterruptScipyMinimize(Exception):
    pass


class _ScipyCallback:

    def __init__(self, opt: ScipyOptimizer):
        self.opt = opt

    def __call__(self, xk: np.ndarray = None, intermediate_result: OptimizeResult = None):
        pass


[docs] class ScipyOptimizer(AbstractOptimizer): """ Optimizer class that utilizes SciPy optimization methods. This class serves as a wrapper around SciPy's optimization routines, allowing customization of the optimization method, tolerance, and options. It also provides mechanisms for handling constraints with enhancement and scaling. Attributes: method (str): The optimization method to use (e.g., 'BFGS', 'Nelder-Mead'). tol (float or None): Tolerance for termination. options (dict): Additional options to pass to the SciPy optimizer. constraint_enhancement (float): Small value added to enhance constraint handling. constraint_scaling (float): Scaling factor applied to constraints. Args: method (str): The optimization method to use (e.g., 'BFGS', 'Nelder-Mead'). tol (float or None): Tolerance for termination. """ def __init__(self, method: str = None, tol=None): super().__init__() self.method = method self.tol = tol self.options = {} self.constraint_enhancement = 0.001 self.constraint_scaling = 1. self._scheduler_address: str | None = None self.timeout = None self._time_start = None self.__n_succeeded_trials = None
[docs] def add_sub_fidelity_model(self, name: str, sub_fidelity_model: SubFidelityModel, fidelity: Fidelity): raise NotImplementedError("ScipyOptimizer does not support multiple fidelity models.")
@property def _n_succeeded_trials_in_current_optimization(self) -> int | None: # オーバーライド不可 out = None client = get_client(self._scheduler_address) if client is not None: raise NotImplementedError( "ScipyOptimizer does not support parallel computing." ) if self.__n_succeeded_trials is None: self.__n_succeeded_trials = 0 return self.__n_succeeded_trials @_n_succeeded_trials_in_current_optimization.setter def _n_succeeded_trials_in_current_optimization(self, value: int | None): # オーバーライド不可 client = get_client(self._scheduler_address) with Lock('scipy_n_succeeded_trials'): if client is not None: raise NotImplementedError('ScipyOptimizer does not support parallel computing.') self.__n_succeeded_trials = value def _per_solve_callback(self): df = self.history.get_df(equality_filters=MAIN_FILTER) self._n_succeeded_trials_in_current_optimization = ( 1 + self._n_succeeded_trials_in_current_optimization ) if self.n_trials is not None: if self._n_succeeded_trials_in_current_optimization >= self.n_trials: raise InterruptScipyMinimize if self.timeout is not None: if time() - self._time_start >= self.timeout: raise InterruptScipyMinimize
[docs] def add_trial(self, parameters: dict[str, SupportedVariableTypes]): raise NotImplementedError(_( en_message='You cannot use `add_trial()` in `ScipyOptimizer`.', jp_message='`ScipyOptimizer` では `add_trial()` は使えません。', ))
def _get_x0(self, after_finalizing_history=False) -> np.ndarray: # params を取得 params: dict[str, Parameter] = self.variable_manager.get_variables( filter='parameter', format='raw' ) for param in params.values(): if isinstance(param, CategoricalVariable): raise NotImplementedError(_( en_message='Scipy can optimize only numerical parameters.', jp_message='Scipy では数値パラメータのみ最適化できます。' )) # リスタートならば最適値で初期値を上書きする if after_finalizing_history: df = self.history.get_df() optimal_trials = df[df["optimality"]] if len(optimal_trials) > 0: optimal_trial = optimal_trials.iloc[-1] for prm_name in params.keys(): if prm_name in optimal_trial: params[prm_name].value = optimal_trial[prm_name] # fix == True のものを除いて初期値を作成 x0 = np.array([p.value for p in params.values() if not p.properties.get('fix', False)]) return x0 def _warn_bounds_for_nelder_mead(self) -> None: # https://github.com/scipy/scipy/issues/19991 if self.method.lower() != 'nelder-mead': return bounds = self._get_scipy_bounds() if bounds is None: return x0 = self._get_x0() if (np.allclose(x0, bounds[:, 0]) or np.allclose(x0, bounds[:, 1])): logger.warning(Msg.WARN_SCIPY_NELDER_MEAD_BOUND) def _setup_before_parallel(self): if not self._done_setup_before_parallel: # 並列プロセスは起動のラグがあるので # すべての並列プロセスで起算時刻を同じにしたい self._time_start = time() # 別のプロセスが計算を始めてから自分のプロセスが上書きし内容 # ここで初期化 self._n_succeeded_trials_in_current_optimization = 0 super()._setup_before_parallel() # flag inside self._warn_bounds_for_nelder_mead() def _get_scipy_bounds(self) -> np.ndarray | None: has_any_bound = False params: dict[str, Parameter] = self.variable_manager.get_variables(filter='parameter') bounds = [] for param in params.values(): assert isinstance(param, NumericParameter) bounds.append([ param.lower_bound or -np.inf, param.upper_bound or np.inf, ]) has_any_bound += ( (param.lower_bound is not None) or (param.upper_bound is not None)) if has_any_bound: bounds = np.array(bounds) else: bounds = None return bounds def _update_vm_by_xk(self, xk): vm = self.variable_manager # check interruption self._check_and_raise_interruption() # parameter suggestion params = vm.get_variables(filter='parameter') xk_list = list(xk) for name, prm in params.items(): if prm.properties.get('fix', False): # default is False continue if isinstance(prm, NumericParameter): prm.value = xk_list.pop(0) elif isinstance(prm, CategoricalParameter): raise NotImplementedError(Msg.ERR_SCIPY_NOT_IMPLEMENT_CATEGORICAL) else: raise NotImplementedError assert len(xk_list) == 0 # evaluate expressions vm.eval_expressions() # check interruption self._check_and_raise_interruption() def _scipy_constraint_fun(self, xk, cns: Constraint): self._update_vm_by_xk(xk) # update fem (very slow!) if cns.using_fem: logger.warning(Msg.WARN_USING_FEM_IN_NLC) pass_to_fem = self.variable_manager.get_variables( filter='pass_to_fem', format='raw' ) self.fem_manager.all_fems_as_a_fem.update_parameter(pass_to_fem) return cns.eval(self.fem_manager.all_fems_as_a_fem) def _get_scipy_constraints(self) -> ( None | list[NonlinearConstraint | dict] ): if len(self.constraints) == 0: return None if self.method is None: method = 'SLSQP' else: method = self.method assert method.lower() in ('cobyla', 'cobyqa', 'slsqp', 'trust-constr') out = [] for cns in self.constraints.values(): # use Constraint object if method.lower() in ('trust-constr', 'cobyqa'): if cns.hard: raise NotImplementedError( Msg.F_ERR_SCIPY_METHOD_NOT_IMPLEMENT_HARD_CONSTRAINT( method ) ) # constraint_scaling を使うためには violation を計算しなければならない # TODO: 上下両端が決められている場合は二回計算することになるのでそれを解消する if cns.lower_bound is not None: scipy_cns = NonlinearConstraint( fun=( lambda xk_, cns_=cns: ( cns.lower_bound - self._scipy_constraint_fun(xk_, cns_) ) * self.constraint_scaling + self.constraint_enhancement ), lb=-np.inf, ub=0, keep_feasible=cns.hard, finite_diff_rel_step=self.options.get('finite_diff_rel_step', None), ) out.append(scipy_cns) if cns.upper_bound is not None: scipy_cns = NonlinearConstraint( fun=( lambda xk_, cns_=cns: ( self._scipy_constraint_fun(xk_, cns_) - cns.upper_bound ) * self.constraint_scaling + self.constraint_enhancement ), lb=-np.inf, ub=0, keep_feasible=cns.hard, finite_diff_rel_step=self.options.get('finite_diff_rel_step', None), ) out.append(scipy_cns) # scipy_cns = NonlinearConstraint( # fun=lambda xk_, cns_=cns: self._scipy_constraint_fun(xk_, cns_), # lb=(cns.lower_bound or -np.inf) + self.constraint_enhancement, # ub=(cns.upper_bound or np.inf) - self.constraint_enhancement, # keep_feasible=cns.hard, # finite_diff_rel_step=self.options.get('finite_diff_rel_step', None), # ) # out.append(scipy_cns) # use dict object else: if method.lower() == 'slsqp' and not cns.hard: logger.warning(Msg.WARN_SCIPY_SLSQP_CANNOT_PROCESS_SOFT_CONSTRAINT) if method.lower() == 'cobyla' and cns.hard: logger.error( Msg.F_ERR_SCIPY_METHOD_NOT_IMPLEMENT_HARD_CONSTRAINT( method)) raise NotImplementedError( Msg.F_ERR_SCIPY_METHOD_NOT_IMPLEMENT_HARD_CONSTRAINT( method)) if cns.lower_bound is not None: scipy_cns = dict( type='ineq', fun=(lambda xk_, cns_=cns: ( self._scipy_constraint_fun(xk_, cns_) - cns_.lower_bound ) * self.constraint_scaling - self.constraint_enhancement), ) out.append(scipy_cns) if cns.upper_bound is not None: scipy_cns = dict( type='ineq', fun=(lambda xk_, cns_=cns: ( cns_.upper_bound - self._scipy_constraint_fun(xk_, cns_) ) * self.constraint_scaling - self.constraint_enhancement), ) out.append(scipy_cns) return out def _get_scipy_callback(self) -> ( Callable[[OptimizeResult, ...], ...] | Callable[[np.ndarray, ...], ...] ): return _ScipyCallback(self) class _SolveSet(AbstractOptimizer._SolveSet): def _hard_constraint_handling(self, e: HardConstraintViolation): raise NotImplementedError( Msg.ERR_SCIPY_HARD_CONSTRAINT_VIOLATION ) from e def _hidden_constraint_handling(self, e: _HiddenConstraintViolation): raise NotImplementedError( Msg.ERR_SCIPY_HIDDEN_CONSTRAINT ) from e def _skip_handling(self, e: SkipSolve): raise NotImplementedError( Msg.ERR_SCIPY_NOT_IMPLEMENT_SKIP ) from e def _objective(self, xk: np.ndarray) -> float: with self._logging(): vm = self.variable_manager # parameter suggestion self._update_vm_by_xk(xk) # construct TrialInput x = vm.get_variables(filter='parameter') # process main fidelity model solve_set = self._get_solve_set() f_return = solve_set.solve(x) self._per_solve_callback() assert f_return is not None dict_y_internal = f_return[1] y_internal: float = tuple(dict_y_internal.values())[0] # type: ignore return y_internal
[docs] def run(self): # ===== finalize ===== self._finalize() # ===== construct x0 ===== x0 = self._get_x0(after_finalizing_history=True) # ===== 終了条件 ===== # In case that setup_before_parallel is not called if self._time_start is None: self._time_start = time() if self._n_succeeded_trials_in_current_optimization is None: self._n_succeeded_trials_in_current_optimization = 0 # ===== run ===== with closing(self.fem_manager.all_fems_as_a_fem): with self._setting_status(), \ suppress(InterruptOptimization), \ suppress(InterruptScipyMinimize): try: minimize( self._objective, x0, args=(), method=self.method, bounds=self._get_scipy_bounds(), constraints=self._get_scipy_constraints(), tol=self.tol, callback=self._get_scipy_callback(), options=self.options, ) finally: self._time_start = None self._n_succeeded_trials_in_current_optimization = None