from __future__ import annotations
from typing import Callable
from contextlib import suppress
from time import time
import numpy as np
from scipy.optimize import minimize, OptimizeResult
from scipy.optimize import NonlinearConstraint
from pyfemtet._i18n import Msg, _
from pyfemtet._util.closing import closing
from pyfemtet._util.dask_util import get_client, Lock
from pyfemtet.opt.problem.variable_manager import *
from pyfemtet.opt.problem.problem import *
from pyfemtet.opt.exceptions import *
from pyfemtet.opt.history import MAIN_FILTER
from pyfemtet.logger import get_module_logger
from pyfemtet.opt.optimizer._base_optimizer import *
__all__ = [
'ScipyOptimizer',
]
logger = get_module_logger('opt.optimizer', False)
class InterruptScipyMinimize(Exception):
pass
class _ScipyCallback:
def __init__(self, opt: ScipyOptimizer):
self.opt = opt
def __call__(self, xk: np.ndarray = None, intermediate_result: OptimizeResult = None):
pass
[docs]
class ScipyOptimizer(AbstractOptimizer):
"""
Optimizer class that utilizes SciPy optimization methods.
This class serves as a wrapper around SciPy's optimization routines,
allowing customization of the optimization method, tolerance, and options.
It also provides mechanisms for handling constraints with enhancement and scaling.
Attributes:
method (str): The optimization method to use (e.g., 'BFGS', 'Nelder-Mead').
tol (float or None): Tolerance for termination.
options (dict): Additional options to pass to the SciPy optimizer.
constraint_enhancement (float): Small value added to enhance constraint handling.
constraint_scaling (float): Scaling factor applied to constraints.
Args:
method (str): The optimization method to use (e.g., 'BFGS', 'Nelder-Mead').
tol (float or None): Tolerance for termination.
"""
def __init__(self, method: str = None, tol=None):
super().__init__()
self.method = method
self.tol = tol
self.options = {}
self.constraint_enhancement = 0.001
self.constraint_scaling = 1.
self._scheduler_address: str | None = None
self.timeout = None
self._time_start = None
self.__n_succeeded_trials = None
[docs]
def add_sub_fidelity_model(self, name: str, sub_fidelity_model: SubFidelityModel, fidelity: Fidelity):
raise NotImplementedError("ScipyOptimizer does not support multiple fidelity models.")
@property
def _n_succeeded_trials_in_current_optimization(self) -> int | None: # オーバーライド不可
out = None
client = get_client(self._scheduler_address)
if client is not None:
raise NotImplementedError(
"ScipyOptimizer does not support parallel computing."
)
if self.__n_succeeded_trials is None:
self.__n_succeeded_trials = 0
return self.__n_succeeded_trials
@_n_succeeded_trials_in_current_optimization.setter
def _n_succeeded_trials_in_current_optimization(self, value: int | None): # オーバーライド不可
client = get_client(self._scheduler_address)
with Lock('scipy_n_succeeded_trials'):
if client is not None:
raise NotImplementedError('ScipyOptimizer does not support parallel computing.')
self.__n_succeeded_trials = value
def _per_solve_callback(self):
df = self.history.get_df(equality_filters=MAIN_FILTER)
self._n_succeeded_trials_in_current_optimization = (
1 + self._n_succeeded_trials_in_current_optimization
)
if self.n_trials is not None:
if self._n_succeeded_trials_in_current_optimization >= self.n_trials:
raise InterruptScipyMinimize
if self.timeout is not None:
if time() - self._time_start >= self.timeout:
raise InterruptScipyMinimize
[docs]
def add_trial(self, parameters: dict[str, SupportedVariableTypes]):
raise NotImplementedError(_(
en_message='You cannot use `add_trial()` in `ScipyOptimizer`.',
jp_message='`ScipyOptimizer` では `add_trial()` は使えません。',
))
def _get_x0(self, after_finalizing_history=False) -> np.ndarray:
# params を取得
params: dict[str, Parameter] = self.variable_manager.get_variables(
filter='parameter', format='raw'
)
for param in params.values():
if isinstance(param, CategoricalVariable):
raise NotImplementedError(_(
en_message='Scipy can optimize only numerical parameters.',
jp_message='Scipy では数値パラメータのみ最適化できます。'
))
# リスタートならば最適値で初期値を上書きする
if after_finalizing_history:
df = self.history.get_df()
optimal_trials = df[df["optimality"]]
if len(optimal_trials) > 0:
optimal_trial = optimal_trials.iloc[-1]
for prm_name in params.keys():
if prm_name in optimal_trial:
params[prm_name].value = optimal_trial[prm_name]
# fix == True のものを除いて初期値を作成
x0 = np.array([p.value for p in params.values() if not p.properties.get('fix', False)])
return x0
def _warn_bounds_for_nelder_mead(self) -> None:
# https://github.com/scipy/scipy/issues/19991
if self.method.lower() != 'nelder-mead':
return
bounds = self._get_scipy_bounds()
if bounds is None:
return
x0 = self._get_x0()
if (np.allclose(x0, bounds[:, 0])
or np.allclose(x0, bounds[:, 1])):
logger.warning(Msg.WARN_SCIPY_NELDER_MEAD_BOUND)
def _setup_before_parallel(self):
if not self._done_setup_before_parallel:
# 並列プロセスは起動のラグがあるので
# すべての並列プロセスで起算時刻を同じにしたい
self._time_start = time()
# 別のプロセスが計算を始めてから自分のプロセスが上書きし内容
# ここで初期化
self._n_succeeded_trials_in_current_optimization = 0
super()._setup_before_parallel() # flag inside
self._warn_bounds_for_nelder_mead()
def _get_scipy_bounds(self) -> np.ndarray | None:
has_any_bound = False
params: dict[str, Parameter] = self.variable_manager.get_variables(filter='parameter')
bounds = []
for param in params.values():
assert isinstance(param, NumericParameter)
bounds.append([
param.lower_bound or -np.inf,
param.upper_bound or np.inf,
])
has_any_bound += (
(param.lower_bound is not None)
or (param.upper_bound is not None))
if has_any_bound:
bounds = np.array(bounds)
else:
bounds = None
return bounds
def _update_vm_by_xk(self, xk):
vm = self.variable_manager
# check interruption
self._check_and_raise_interruption()
# parameter suggestion
params = vm.get_variables(filter='parameter')
xk_list = list(xk)
for name, prm in params.items():
if prm.properties.get('fix', False): # default is False
continue
if isinstance(prm, NumericParameter):
prm.value = xk_list.pop(0)
elif isinstance(prm, CategoricalParameter):
raise NotImplementedError(Msg.ERR_SCIPY_NOT_IMPLEMENT_CATEGORICAL)
else:
raise NotImplementedError
assert len(xk_list) == 0
# evaluate expressions
vm.eval_expressions()
# check interruption
self._check_and_raise_interruption()
def _scipy_constraint_fun(self, xk, cns: Constraint):
self._update_vm_by_xk(xk)
# update fem (very slow!)
if cns.using_fem:
logger.warning(Msg.WARN_USING_FEM_IN_NLC)
pass_to_fem = self.variable_manager.get_variables(
filter='pass_to_fem', format='raw'
)
self.fem_manager.all_fems_as_a_fem.update_parameter(pass_to_fem)
return cns.eval(self.fem_manager.all_fems_as_a_fem)
def _get_scipy_constraints(self) -> (
None
| list[NonlinearConstraint | dict]
):
if len(self.constraints) == 0:
return None
if self.method is None:
method = 'SLSQP'
else:
method = self.method
assert method.lower() in ('cobyla', 'cobyqa', 'slsqp', 'trust-constr')
out = []
for cns in self.constraints.values():
# use Constraint object
if method.lower() in ('trust-constr', 'cobyqa'):
if cns.hard:
raise NotImplementedError(
Msg.F_ERR_SCIPY_METHOD_NOT_IMPLEMENT_HARD_CONSTRAINT(
method
)
)
# constraint_scaling を使うためには violation を計算しなければならない
# TODO: 上下両端が決められている場合は二回計算することになるのでそれを解消する
if cns.lower_bound is not None:
scipy_cns = NonlinearConstraint(
fun=(
lambda xk_, cns_=cns:
(
cns.lower_bound
- self._scipy_constraint_fun(xk_, cns_)
) * self.constraint_scaling
+ self.constraint_enhancement
),
lb=-np.inf,
ub=0,
keep_feasible=cns.hard,
finite_diff_rel_step=self.options.get('finite_diff_rel_step', None),
)
out.append(scipy_cns)
if cns.upper_bound is not None:
scipy_cns = NonlinearConstraint(
fun=(
lambda xk_, cns_=cns:
(
self._scipy_constraint_fun(xk_, cns_)
- cns.upper_bound
) * self.constraint_scaling
+ self.constraint_enhancement
),
lb=-np.inf,
ub=0,
keep_feasible=cns.hard,
finite_diff_rel_step=self.options.get('finite_diff_rel_step', None),
)
out.append(scipy_cns)
# scipy_cns = NonlinearConstraint(
# fun=lambda xk_, cns_=cns: self._scipy_constraint_fun(xk_, cns_),
# lb=(cns.lower_bound or -np.inf) + self.constraint_enhancement,
# ub=(cns.upper_bound or np.inf) - self.constraint_enhancement,
# keep_feasible=cns.hard,
# finite_diff_rel_step=self.options.get('finite_diff_rel_step', None),
# )
# out.append(scipy_cns)
# use dict object
else:
if method.lower() == 'slsqp' and not cns.hard:
logger.warning(Msg.WARN_SCIPY_SLSQP_CANNOT_PROCESS_SOFT_CONSTRAINT)
if method.lower() == 'cobyla' and cns.hard:
logger.error(
Msg.F_ERR_SCIPY_METHOD_NOT_IMPLEMENT_HARD_CONSTRAINT(
method))
raise NotImplementedError(
Msg.F_ERR_SCIPY_METHOD_NOT_IMPLEMENT_HARD_CONSTRAINT(
method))
if cns.lower_bound is not None:
scipy_cns = dict(
type='ineq',
fun=(lambda xk_, cns_=cns:
(
self._scipy_constraint_fun(xk_, cns_)
- cns_.lower_bound
) * self.constraint_scaling
- self.constraint_enhancement),
)
out.append(scipy_cns)
if cns.upper_bound is not None:
scipy_cns = dict(
type='ineq',
fun=(lambda xk_, cns_=cns:
(
cns_.upper_bound
- self._scipy_constraint_fun(xk_, cns_)
) * self.constraint_scaling
- self.constraint_enhancement),
)
out.append(scipy_cns)
return out
def _get_scipy_callback(self) -> (
Callable[[OptimizeResult, ...], ...]
| Callable[[np.ndarray, ...], ...]
):
return _ScipyCallback(self)
class _SolveSet(AbstractOptimizer._SolveSet):
def _hard_constraint_handling(self, e: HardConstraintViolation):
raise NotImplementedError(
Msg.ERR_SCIPY_HARD_CONSTRAINT_VIOLATION
) from e
def _hidden_constraint_handling(self, e: _HiddenConstraintViolation):
raise NotImplementedError(
Msg.ERR_SCIPY_HIDDEN_CONSTRAINT
) from e
def _skip_handling(self, e: SkipSolve):
raise NotImplementedError(
Msg.ERR_SCIPY_NOT_IMPLEMENT_SKIP
) from e
def _objective(self, xk: np.ndarray) -> float:
with self._logging():
vm = self.variable_manager
# parameter suggestion
self._update_vm_by_xk(xk)
# construct TrialInput
x = vm.get_variables(filter='parameter')
# process main fidelity model
solve_set = self._get_solve_set()
f_return = solve_set.solve(x)
self._per_solve_callback()
assert f_return is not None
dict_y_internal = f_return[1]
y_internal: float = tuple(dict_y_internal.values())[0] # type: ignore
return y_internal
[docs]
def run(self):
# ===== finalize =====
self._finalize()
# ===== construct x0 =====
x0 = self._get_x0(after_finalizing_history=True)
# ===== 終了条件 =====
# In case that setup_before_parallel is not called
if self._time_start is None:
self._time_start = time()
if self._n_succeeded_trials_in_current_optimization is None:
self._n_succeeded_trials_in_current_optimization = 0
# ===== run =====
with closing(self.fem_manager.all_fems_as_a_fem):
with self._setting_status(), \
suppress(InterruptOptimization), \
suppress(InterruptScipyMinimize):
try:
minimize(
self._objective,
x0,
args=(),
method=self.method,
bounds=self._get_scipy_bounds(),
constraints=self._get_scipy_constraints(),
tol=self.tol,
callback=self._get_scipy_callback(),
options=self.options,
)
finally:
self._time_start = None
self._n_succeeded_trials_in_current_optimization = None