from __future__ import annotations
from typing import Callable, TypeAlias, Sequence, Literal, NamedTuple, final, Any
from numbers import Real # マイナーなので型ヒントでは使わず、isinstance で使う
from time import sleep
import os
import sympy
from pyfemtet._i18n.messages import _
from pyfemtet.opt.history import *
from pyfemtet.opt.interface import *
from pyfemtet.opt.exceptions import *
from pyfemtet.opt.worker_status import *
from pyfemtet.opt.problem.problem import *
from pyfemtet.opt.problem.variable_manager import *
from pyfemtet.opt.optimizer._trial_queue import *
from pyfemtet.logger import get_module_logger
from pyfemtet.opt.exceptions import show_experimental_warning
__all__ = [
'AbstractOptimizer',
'SubFidelityModel',
'SubFidelityModels',
'_FReturnValue',
'OptimizationDataPerFEM',
]
# Type of the `direction` argument of the objective-adding methods:
# either a float or one of the literal strings 'minimize' / 'maximize'.
# (A float is presumably a target value to approach -- TODO confirm.)
DIRECTION: TypeAlias = (
float
| Literal[
'minimize',
'maximize',
]
)
# Immutable pair of boolean flags: one for updating functions, one for updating the FEM.
# It is not used anywhere in the visible part of this file.
class _UpdateMode(NamedTuple):
update_function: bool
update_fem: bool
# Module-level logger shared by every class and helper in this file.
logger = get_module_logger('opt.optimizer', False)
def _log_hidden_constraint(e: Exception):
    """Log a hidden constraint violation at warning level.

    Two warnings are written: a fixed banner line, then a line carrying the
    error message built from ``e`` by ``create_err_msg_from_exception``.

    Args:
        e: The exception that signalled the hidden constraint violation.
    """
    # Build the message first so that nothing is logged if formatting fails.
    detail = create_err_msg_from_exception(e)

    logger.warning(_('----- Hidden constraint violation! -----'))
    logger.warning(_('error: {err_msg}', err_msg=detail))
# Return type of a successful solve: (objective results, internally converted
# objective values keyed by name, constraint results, the trial record).
_FReturnValue: TypeAlias = tuple[TrialOutput, dict[str, float], TrialConstraintOutput, Record]
def _duplicated_name_check(name, names):
    """Warn when ``name`` is already present in ``names``.

    Nothing is raised and nothing is returned; the only effect is a
    warning-level log entry telling the user that the later definition will
    overwrite the earlier one.

    Args:
        name: The name about to be registered.
        names: Container of names that are already registered.
    """
    if name not in names:
        return

    warning_text = _(
        en_message='There are duplicated name {name}. If there are '
                   'duplicate names for parameters or objective '
                   'functions, the later defined ones will overwrite '
                   'the earlier ones. Please be careful to ensure '
                   'that this overwriting is intentional.',
        jp_message='名前 {name} が重複しています。パラメータや目的関数'
                   'などの名前が重複すると、後から定義したものが上書き'
                   'されます。この上書きが意図したものであるかどうか、'
                   '十分に注意してください。',
        name=name,
    )
    logger.warning(warning_text)
# Class that stores the information of an optimization problem.
class OptimizationData:
    """Hold the variables, objectives, constraints and other outputs of a problem.

    Each ``add_*`` method builds one problem element, registers it in the
    matching container and returns it. Unless
    ``supress_duplicated_name_check`` is true, a warning is logged when the
    name is already registered (the later definition overwrites the earlier
    one).
    """

    def __init__(self):
        self._initialize_problem()

    def _initialize_objectives(self):  # used by SurrogateModel
        """(Re)create the empty objective container."""
        self.objectives = Objectives()

    def _initialize_problem(self):
        """(Re)create every problem container in its empty state."""
        self.variable_manager = VariableManager()
        self._initialize_objectives()
        self.constraints = Constraints()
        self.other_outputs = Functions()

    def add_constant_value(
            self,
            name: str,
            value: SupportedVariableTypes,
            properties: dict[str, Any] | None = None,
            *,
            pass_to_fem: bool = True,
            supress_duplicated_name_check: bool = False,
    ) -> Variable:
        """Register a constant (non-optimized) variable.

        Args:
            name: Variable name.
            value: A ``Real`` (stored as ``NumericVariable``) or a ``str``
                (stored as ``CategoricalVariable``).
            properties: Optional property dict attached to the variable.
            pass_to_fem: Stored on the variable as ``pass_to_fem``.
            supress_duplicated_name_check: Skip the duplicated-name warning.

        Returns:
            The created variable.

        Raises:
            ValueError: If ``value`` is neither ``Real`` nor ``str``.
        """
        var: Variable
        # noinspection PyUnreachableCode
        if isinstance(value, Real):
            var = NumericVariable()
        elif isinstance(value, str):
            var = CategoricalVariable()
        else:
            raise ValueError(_(
                en_message='Supported variable types are Real or str, got {type}',
                jp_message='サポートされている変数の型は Real か str ですが、{type} が指定されました。',
                type=type(value),
            ))
        var.name = name
        var.value = value
        var.pass_to_fem = pass_to_fem
        var.properties = properties if properties is not None else {}
        if not supress_duplicated_name_check:
            _duplicated_name_check(name, self.variable_manager.variables.keys())
        self.variable_manager.set_variable(var)
        return var

    def add_parameter(
            self,
            name: str,
            initial_value: float | None = None,
            lower_bound: float | None = None,
            upper_bound: float | None = None,
            step: float | None = None,
            properties: dict[str, Any] | None = None,
            *,
            pass_to_fem: bool = True,
            fix: bool = False,
            supress_duplicated_name_check: bool = False,
    ) -> NumericParameter:
        """Register a numeric parameter.

        Args:
            name: Parameter name.
            initial_value: Stored as the parameter's ``value``.
            lower_bound: Stored as ``lower_bound``.
            upper_bound: Stored as ``upper_bound``.
            step: Stored as ``step``.
            properties: Optional property dict. It is never modified; when
                ``fix`` is true a new dict with ``'fix': True`` added is
                stored instead.
            pass_to_fem: Stored on the parameter as ``pass_to_fem``.
            fix: If true, ``properties['fix']`` is set to ``True``.
            supress_duplicated_name_check: Skip the duplicated-name warning.

        Returns:
            The created parameter.
        """
        properties = properties if properties is not None else {}
        if fix:
            # Build a new dict rather than update() in place, so that a dict
            # the caller reuses for several parameters does not silently
            # gain the 'fix' flag.
            properties = {**properties, 'fix': True}
        prm = NumericParameter()
        prm.name = name
        prm.value = initial_value
        prm.lower_bound = lower_bound
        prm.upper_bound = upper_bound
        prm.step = step
        prm.properties = properties
        prm.pass_to_fem = pass_to_fem
        if not supress_duplicated_name_check:
            _duplicated_name_check(name, self.variable_manager.variables.keys())
        self.variable_manager.set_variable(prm)
        return prm

    def add_expression_string(
            self,
            name: str,
            expression_string: str,
            properties: dict[str, Any] | None = None,
            *,
            pass_to_fem: bool = True,
            supress_duplicated_name_check: bool = False,
    ) -> ExpressionFromString:
        """Register a variable defined by an expression string.

        Args:
            name: Variable name.
            expression_string: Expression text handed to
                ``ExpressionFromString.InternalClass``.
            properties: Optional property dict (a falsy value becomes a new
                empty dict).
            pass_to_fem: Stored on the variable as ``pass_to_fem``.
            supress_duplicated_name_check: Skip the duplicated-name warning.

        Returns:
            The created expression variable.
        """
        var = ExpressionFromString()
        var.name = name
        var._expr = ExpressionFromString.InternalClass(
            expression_string=expression_string,
        )
        var.properties = properties or dict()
        var.pass_to_fem = pass_to_fem
        if not supress_duplicated_name_check:
            _duplicated_name_check(name, self.variable_manager.variables.keys())
        self.variable_manager.set_variable(var)
        return var

    def add_expression_sympy(
            self,
            name: str,
            sympy_expr: sympy.Expr,
            properties: dict[str, Any] | None = None,
            *,
            pass_to_fem: bool = True,
            supress_duplicated_name_check: bool = False,
    ) -> ExpressionFromString:
        """Register a variable defined by a sympy expression.

        Args:
            name: Variable name.
            sympy_expr: Expression handed to
                ``ExpressionFromString.InternalClass``.
            properties: Optional property dict (a falsy value becomes a new
                empty dict).
            pass_to_fem: Stored on the variable as ``pass_to_fem``.
            supress_duplicated_name_check: Skip the duplicated-name warning.

        Returns:
            The created expression variable.
        """
        var = ExpressionFromString()
        var.name = name
        var._expr = ExpressionFromString.InternalClass(sympy_expr=sympy_expr)
        var.properties = properties or dict()
        var.pass_to_fem = pass_to_fem
        if not supress_duplicated_name_check:
            _duplicated_name_check(name, self.variable_manager.variables.keys())
        self.variable_manager.set_variable(var)
        return var

    def add_expression(
            self,
            name: str,
            fun: Callable[..., float],
            properties: dict[str, Any] | None = None,
            args: tuple | None = None,
            kwargs: dict | None = None,
            *,
            pass_to_fem: bool = True,
            supress_duplicated_name_check: bool = False,
    ) -> ExpressionFromFunction:
        """Register a variable whose value is computed by a Python callable.

        Args:
            name: Variable name.
            fun: Callable stored on the variable.
            properties: Optional property dict (a falsy value becomes a new
                empty dict).
            args: Positional arguments stored with ``fun`` (default ``()``).
            kwargs: Keyword arguments stored with ``fun`` (default ``{}``).
            pass_to_fem: Stored on the variable as ``pass_to_fem``.
            supress_duplicated_name_check: Skip the duplicated-name warning.

        Returns:
            The created expression variable.
        """
        var = ExpressionFromFunction()
        var.name = name
        var.fun = fun
        var.args = args or tuple()
        var.kwargs = kwargs or dict()
        var.properties = properties or dict()
        var.pass_to_fem = pass_to_fem
        if not supress_duplicated_name_check:
            _duplicated_name_check(name, self.variable_manager.variables.keys())
        self.variable_manager.set_variable(var)
        return var

    def add_categorical_parameter(
            self,
            name: str,
            initial_value: SupportedVariableTypes | None = None,
            choices: list[SupportedVariableTypes] | None = None,
            properties: dict[str, Any] | None = None,
            *,
            pass_to_fem: bool = True,
            fix: bool = False,
            supress_duplicated_name_check: bool = False,
    ) -> CategoricalParameter:
        """Register a categorical parameter.

        Args:
            name: Parameter name.
            initial_value: Stored as the parameter's ``value``.
            choices: Stored as ``choices``.
            properties: Optional property dict. It is never modified; when
                ``fix`` is true a new dict with ``'fix': True`` added is
                stored instead.
            pass_to_fem: Stored on the parameter as ``pass_to_fem``.
            fix: If true, ``properties['fix']`` is set to ``True``.
            supress_duplicated_name_check: Skip the duplicated-name warning.

        Returns:
            The created parameter.
        """
        properties = properties if properties is not None else {}
        if fix:
            # Same reasoning as add_parameter: do not mutate the caller's dict.
            properties = {**properties, 'fix': True}
        prm = CategoricalParameter()
        prm.name = name
        prm.value = initial_value
        prm.choices = choices
        prm.properties = properties
        prm.pass_to_fem = pass_to_fem
        if not supress_duplicated_name_check:
            _duplicated_name_check(name, self.variable_manager.variables.keys())
        self.variable_manager.set_variable(prm)
        return prm

    def add_objective(
            self,
            name: str,
            fun: Callable[..., float],
            direction: DIRECTION = 'minimize',
            args: tuple | None = None,
            kwargs: dict | None = None,
            supress_duplicated_name_check: bool = False,
    ) -> Objective:
        """Register one objective function.

        Args:
            name: Objective name (key in ``self.objectives``).
            fun: Callable evaluated for this objective.
            direction: Stored as the objective's ``direction``.
            args: Positional arguments stored with ``fun`` (default ``()``).
            kwargs: Keyword arguments stored with ``fun`` (default ``{}``).
            supress_duplicated_name_check: Skip the duplicated-name warning.

        Returns:
            The created objective; ``fem_ctx`` is initialised to ``None``.
        """
        obj = Objective()
        obj.fun = fun
        obj.args = args or ()
        obj.kwargs = kwargs or {}
        obj.direction = direction
        obj.fem_ctx = None
        if not supress_duplicated_name_check:
            _duplicated_name_check(name, self.objectives.keys())
        self.objectives.update({name: obj})
        return obj  # returned so that the context can set fem_ctx

    def add_objectives(
            self,
            names: str | list[str],
            fun: Callable[..., Sequence[float]],
            n_return: int,
            directions: DIRECTION | Sequence[DIRECTION | None] | None = None,
            args: tuple | None = None,
            kwargs: dict | None = None,
            supress_duplicated_name_check: bool = False,
    ) -> list[Objective]:
        """Register several objectives computed by one callable.

        Args:
            names: A base name (expanded to ``f'{names}_{i}'``) or a
                sequence holding one name per returned value.
            fun: Callable wrapped by ``ObjectivesFunc``; one objective is
                registered for each of its ``n_return`` values.
            n_return: Number of values ``fun`` returns.
            directions: A single direction applied to every objective, a
                sequence with one direction per objective, or ``None`` for
                ``'minimize'`` everywhere.
            args: Positional arguments stored with each objective.
            kwargs: Keyword arguments stored with each objective.
            supress_duplicated_name_check: Skip the duplicated-name warning.

        Returns:
            The created objectives, in the order of ``names``.

        Raises:
            ValueError: If ``names`` is neither a string nor a sequence.
            AssertionError: If the lengths of ``names`` and ``directions``
                do not both equal ``n_return``.
        """
        # argument processing
        # noinspection PyUnreachableCode
        if isinstance(names, str):
            names = [f'{names}_{i}' for i in range(n_return)]
        elif isinstance(names, Sequence):
            # names = names
            pass
        else:
            raise ValueError(
                _(en_message='`names` must be a string or an array of strings.',
                  jp_message='`names` は文字列か文字列の配列でなければなりません。',)
            )
        if directions is None:
            directions = ['minimize' for __ in range(n_return)]
        else:
            if isinstance(directions, str) or isinstance(directions, Real):
                directions = [directions for __ in range(n_return)]
            else:
                # directions = directions
                pass
        assert len(names) == len(directions) == n_return
        out = []
        function_factory = ObjectivesFunc(fun, n_return)
        for i, (name, direction) in enumerate(zip(names, directions)):
            fun_i = function_factory.get_fun_that_returns_ith_value(i)
            out.append(self.add_objective(
                fun=fun_i,
                name=name,
                direction=direction,
                args=args,
                kwargs=kwargs,
                supress_duplicated_name_check=supress_duplicated_name_check,
            ))
        return out

    def add_constraint(
            self,
            name: str,
            fun: Callable[..., float],
            lower_bound: float | None = None,
            upper_bound: float | None = None,
            args: tuple | None = None,
            kwargs: dict | None = None,
            strict: bool = True,
            using_fem: bool | None = None,
            supress_duplicated_name_check: bool = False,
    ) -> Constraint:
        """Register a constraint function.

        Args:
            name: Constraint name (key in ``self.constraints``).
            fun: Callable evaluated for this constraint.
            lower_bound: Lower bound; at least one bound must be given.
            upper_bound: Upper bound; at least one bound must be given.
            args: Positional arguments stored with ``fun`` (default ``()``).
            kwargs: Keyword arguments stored with ``fun`` (default ``{}``).
            strict: Stored as the constraint's ``hard`` flag.
            using_fem: Stored as the constraint's ``using_fem``.
            supress_duplicated_name_check: Skip the duplicated-name warning.

        Returns:
            The created constraint; ``fem_ctx`` is initialised to ``None``.

        Raises:
            ValueError: If both bounds are ``None``.
        """
        if lower_bound is None and upper_bound is None:
            raise ValueError(_(
                en_message='One of `lower_bound` and `upper_bound` '
                           'should be set.',
                jp_message='`lower_bound` and `upper_bound` のうち'
                           '少なくとも一つは指定されなければなりません。'
            ))
        cns = Constraint()
        cns.fun = fun
        cns.args = args or ()
        cns.kwargs = kwargs or {}
        cns.lower_bound = lower_bound
        cns.upper_bound = upper_bound
        cns.hard = strict
        cns.fem_ctx = None
        cns.using_fem = using_fem
        if not supress_duplicated_name_check:
            _duplicated_name_check(name, self.constraints.keys())
        self.constraints.update({name: cns})
        return cns  # returned so that the context can set fem_ctx

    def add_other_output(
            self,
            name: str,
            fun: Callable[..., float],
            args: tuple | None = None,
            kwargs: dict | None = None,
            supress_duplicated_name_check: bool = False,
    ) -> Function:
        """Register a function whose value is recorded as an extra output.

        Args:
            name: Output name (key in ``self.other_outputs``).
            fun: Callable evaluated for this output.
            args: Positional arguments stored with ``fun`` (default ``()``).
            kwargs: Keyword arguments stored with ``fun`` (default ``{}``).
            supress_duplicated_name_check: Skip the duplicated-name warning.

        Returns:
            The created function; ``fem_ctx`` is initialised to ``None``.
        """
        other_func = Function()
        other_func.fun = fun
        other_func.args = args or ()
        other_func.kwargs = kwargs or {}
        other_func.fem_ctx = None
        if not supress_duplicated_name_check:
            _duplicated_name_check(name, self.other_outputs.keys())
        self.other_outputs.update({name: other_func})
        return other_func
# Class that manages the functions of an optimization problem per FEM
# and controls their execution.
class OptimizationDataPerFEM(OptimizationData):
    """Problem data bound to one FEM interface.

    Objectives, constraints and other outputs added through this class get
    their ``fem_ctx`` attribute set to this instance, and are evaluated
    against ``self.fem``.
    """

    fem: AbstractFEMInterface

    def __init__(self, fem: AbstractFEMInterface):
        # `fem` is assigned before the base initializer runs.
        self.fem = fem
        super().__init__()

    def _initialize_problem(self):
        """Reset the problem containers and the load-from-FEM flag."""
        super()._initialize_problem()
        self._done_load_problem_from_fem = False

    # TODO: This implementation is overkill if it only exists so that
    #   Constraint can use its using_fem check.
    @staticmethod
    def _with_add_fem_ctx(f):
        """Decorate an ``add_*`` method so that its result(s) get ``fem_ctx = self``.

        The decorated method's return value is passed through unchanged.
        If it is a ``Sequence``, every ``Function`` item in it is tagged;
        otherwise the value itself is tagged when it is a ``Function``.
        """
        # Local import: keeps this block self-contained.
        from functools import wraps

        # wraps() preserves the name, docstring and signature of the public
        # method for help() and documentation tools.
        @wraps(f)
        def wrapper(self, *args, **kwargs):
            def add_fem_ctx(something):
                if isinstance(something, Function):
                    something.fem_ctx = self

            out = f(self, *args, **kwargs)
            if isinstance(out, Sequence):
                for item in out:
                    add_fem_ctx(item)
            else:
                add_fem_ctx(out)
            return out

        return wrapper

    @_with_add_fem_ctx
    def add_constraint(
            self,
            name: str,
            fun: Callable[..., float],
            lower_bound: float | None = None,
            upper_bound: float | None = None,
            args: tuple | None = None,
            kwargs: dict | None = None,
            strict: bool = True,
            using_fem: bool | None = None,
            supress_duplicated_name_check: bool = False,
    ) -> Constraint:
        """Same as ``OptimizationData.add_constraint``; the result gets ``fem_ctx = self``."""
        return super().add_constraint(
            name=name,
            fun=fun,
            lower_bound=lower_bound,
            upper_bound=upper_bound,
            args=args,
            kwargs=kwargs,
            strict=strict,
            using_fem=using_fem,
            supress_duplicated_name_check=supress_duplicated_name_check,
        )

    @_with_add_fem_ctx
    def add_objective(
            self,
            name: str,
            fun: Callable[..., float],
            direction: DIRECTION = 'minimize',
            args: tuple | None = None,
            kwargs: dict | None = None,
            supress_duplicated_name_check: bool = False,
    ) -> Objective:
        """Same as ``OptimizationData.add_objective``; the result gets ``fem_ctx = self``."""
        return super().add_objective(
            name=name,
            fun=fun,
            direction=direction,
            args=args,
            kwargs=kwargs,
            supress_duplicated_name_check=supress_duplicated_name_check,
        )

    @_with_add_fem_ctx
    def add_objectives(
            self,
            names: str | list[str],
            fun: Callable[..., Sequence[float]],
            n_return: int,
            directions: DIRECTION | Sequence[DIRECTION | None] | None = None,
            args: tuple | None = None,
            kwargs: dict | None = None,
            supress_duplicated_name_check: bool = False,
    ) -> list[Objective]:
        """Same as ``OptimizationData.add_objectives``; each result gets ``fem_ctx = self``."""
        return super().add_objectives(
            names=names,
            fun=fun,
            n_return=n_return,
            directions=directions,
            args=args,
            kwargs=kwargs,
            supress_duplicated_name_check=supress_duplicated_name_check,
        )

    @_with_add_fem_ctx
    def add_other_output(
            self,
            name: str,
            fun: Callable[..., float],
            args: tuple | None = None,
            kwargs: dict | None = None,
            supress_duplicated_name_check: bool = False,
    ) -> Function:
        """Same as ``OptimizationData.add_other_output``; the result gets ``fem_ctx = self``."""
        return super().add_other_output(
            name=name,
            fun=fun,
            args=args,
            kwargs=kwargs,
            supress_duplicated_name_check=supress_duplicated_name_check,
        )

    def _y_common(self, fem: AbstractFEMInterface) -> TrialOutput:
        """Build an ``ObjectiveResult`` for every registered objective using ``fem``."""
        out = TrialOutput()
        for name, obj in self.objectives.items():
            obj_result = ObjectiveResult(obj, fem)
            out.update({name: obj_result})
        return out

    def _y(self) -> TrialOutput:
        """Evaluate every objective against ``self.fem``."""
        return self._y_common(self.fem)

    def _convert_y(self, y: TrialOutput) -> dict:
        """Map each objective name to ``Objective.convert(result.value)``."""
        out = dict()
        for name, result in y.items():
            obj = self.objectives[name]
            value_internal = obj.convert(result.value)
            out.update({name: value_internal})
        return out

    def _c_common(
            self,
            out: TrialConstraintOutput,
            fem: AbstractFEMInterface,
            hard: bool,
    ) -> TrialConstraintOutput:
        """Add a ``ConstraintResult`` to ``out`` for constraints whose ``hard`` flag equals ``hard``.

        ``out`` is filled in place (and also returned), so results already
        computed stay available to the caller if a later constraint raises.
        """
        for name, cns in self.constraints.items():
            if cns.hard == hard:
                cns_result = ConstraintResult(cns, fem)
                out.update({name: cns_result})
        return out

    def _hard_c(self, out: TrialConstraintOutput) -> TrialConstraintOutput:
        """Evaluate the hard constraints against ``self.fem`` into ``out``."""
        return self._c_common(out, self.fem, hard=True)

    def _soft_c(self, out: TrialConstraintOutput) -> TrialConstraintOutput:
        """Evaluate the non-hard constraints against ``self.fem`` into ``out``."""
        return self._c_common(out, self.fem, hard=False)

    def _other_outputs_common(self, out: TrialFunctionOutput, fem: AbstractFEMInterface) -> TrialFunctionOutput:
        """Add a ``FunctionResult`` to ``out`` for every registered other output."""
        for name, other_func in self.other_outputs.items():
            other_func_result = FunctionResult(other_func, fem)
            out.update({name: other_func_result})
        return out

    def _other_outputs(self, out: TrialFunctionOutput) -> TrialFunctionOutput:
        """Evaluate the other outputs against ``self.fem`` into ``out``."""
        return self._other_outputs_common(out, self.fem)

    def _load_problem_from_fem(self):
        """Ask ``self.fem`` to load its variables, objectives and constraints into this object."""
        self.fem.load_variables(self)
        self.fem.load_objectives(self)
        self.fem.load_constraints(self)
class FEMListForGlobal(FEMListInterface):
    """FEM list used as the ``fem`` of the global (FEM-independent) problem data."""

    # So that users who work with a single FEM never have to think about a
    # list, the element itself is returned when the length is 1. This
    # handling is needed in many places.
    @property
    def object_pass_to_fun(self):
        """The parent's value, unwrapped to its only element when it has length 1."""
        objects = super().object_pass_to_fun
        return objects[0] if len(objects) == 1 else objects

    # If GlobalOptimizationData operated the following, each operation would
    # be performed twice, so they are no-ops here.
    def update_parameter(self, x: TrialInput) -> None:
        """Do nothing."""

    def update(self):
        """Do nothing."""

    def _check_param_and_raise(self, prm_name) -> None:
        """Do nothing."""

    # Prevent users from operating the list directly.
    def append(self, fem: AbstractFEMInterface):
        """Always raise ``RuntimeError``; use ``_append`` internally instead."""
        raise RuntimeError("Invalid operation. Use `AbstractOptimizer.fem_manager.append` instead.")

    def _append(self, fem: AbstractFEMInterface):
        """Append ``fem`` through ``FEMListInterface.append``, bypassing the guard above."""
        FEMListInterface.append(self, fem)
# Manages optimization problem data that is not tied to a specific FEM.
# Its `fem` is a FEMListForGlobal (the list of all FEMs) rather than a single interface.
class GlobalOptimizationData(OptimizationDataPerFEM):
fem: FEMListForGlobal
# Manages each FEM together with the optimization problem data tied to it.
class FEMAndDataConnectionManager:
    """Registry of FEM interfaces and their per-FEM problem data.

    Besides one ``OptimizationDataPerFEM`` per registered FEM, it owns a
    ``GlobalOptimizationData`` whose ``fem`` is the list of all FEMs.
    """

    _contexts: list[OptimizationDataPerFEM]
    _fems: list[AbstractFEMInterface]

    def __init__(self):
        self._fems = []
        self._contexts = []
        self.all_fems_as_a_fem = FEMListForGlobal()
        self.global_data = GlobalOptimizationData(self.all_fems_as_a_fem)

    @property
    def fems(self) -> tuple[AbstractFEMInterface, ...]:
        """Registered FEM interfaces, in registration order."""
        return tuple(self._fems)

    @property
    def contexts(self) -> tuple[OptimizationDataPerFEM, ...]:
        """Per-FEM problem data in registration order, followed by the global data."""
        return (*self._contexts, self.global_data)

    def append(self, fem: AbstractFEMInterface) -> OptimizationDataPerFEM:
        """Register ``fem`` and return the new problem data bound to it."""
        self._fems.append(fem)
        self.all_fems_as_a_fem._append(fem)
        context = OptimizationDataPerFEM(fem)
        self._contexts.append(context)
        return context
[docs]
class AbstractOptimizer(OptimizationData):
# optimize
n_trials: int | None
timeout: float | None
seed: int | None
include_queued_in_n_trials: bool
# problem
fidelity: Fidelity | None
sub_fidelity_name: str
sub_fidelity_models: SubFidelityModels | None
# system
history: History
entire_status: WorkerStatus
worker_status: WorkerStatus
worker_status_list: list[WorkerStatus]
_done_setup_before_parallel: bool
_worker_index: int | str | None
_worker_name: str | None
# Initialize the optimizer with empty problem data and default settings.
def __init__(self):
super().__init__()
# optimization
self.seed = None
self.n_trials = None
self.timeout = None
self.include_queued_in_n_trials = False
# multi-fidelity
self.fidelity = None
self.sub_fidelity_name = MAIN_FIDELITY_NAME
self.sub_fidelity_models = SubFidelityModels()
# System
self.fem_manager = FEMAndDataConnectionManager()
self.history: History = History()
# Default: every trial is solved (see _should_solve).
self.solve_condition: Callable[[History], bool] = lambda _: True
# Default: the optimization is never terminated by this condition.
self.termination_condition: Callable[[History], bool] = lambda _: False
self.entire_status: WorkerStatus = WorkerStatus(ENTIRE_PROCESS_STATUS_KEY)
self.worker_status: WorkerStatus = WorkerStatus('worker-status')
self.worker_status_list: list[WorkerStatus] = [self.worker_status]
self.trial_queue: TrialQueue = TrialQueue()
self._done_setup_before_parallel = False
# Both are replaced by the values passed to _run().
self._worker_index: int | str | None = None
self._worker_name: str | None = None
@property
def fem(self) -> AbstractFEMInterface | tuple[AbstractFEMInterface, ...]:
    """The registered FEM interface(s).

    Returns:
        The interface itself when exactly one is registered; otherwise the
        tuple of all registered interfaces (possibly empty).
    """
    registered: tuple[AbstractFEMInterface, ...] = self.fem_manager.fems
    return registered[0] if len(registered) == 1 else registered
# Setter for `fem`: registers `value` through fem_manager.append after
# replacing the list of all FEMs with a fresh FEMListForGlobal.
# NOTE(review): fem_manager._fems and fem_manager._contexts are not cleared
# here, so a FEM registered earlier still appears in `fems` / `contexts`
# after a second assignment -- confirm this is intended.
@fem.setter
def fem(self, value: AbstractFEMInterface):
# Re-initialize the FEM list while
# leaving global_data itself un-initialized (its problem data is kept).
self.fem_manager.all_fems_as_a_fem = FEMListForGlobal()
self.fem_manager.global_data.fem = self.fem_manager.all_fems_as_a_fem
self.fem_manager.append(value)
[docs]
# Register an additional FEM interface and return the problem data bound to it.
# Emits the experimental-feature warning on every call.
def add_fem(self, fem: AbstractFEMInterface) -> OptimizationDataPerFEM:
show_experimental_warning('add_fem', logger)
return self.fem_manager.append(fem)
# @property
# def global_data(self) -> GlobalOptimizationData:
# return self.fem_manager.global_data
# ===== public =====
@staticmethod
def _dispatch_global_data_store(f):
    """Decorate a stub so that the call is delegated to the global problem data.

    The decorated function's body is never executed. The call is forwarded
    to the method of ``self.fem_manager.global_data`` that has the same
    name as ``f``; afterwards ``self._refresh_problem()`` is called and the
    delegated method's return value is returned.

    Args:
        f: Stub method; only its name and metadata are used.

    Returns:
        The delegating wrapper.
    """
    # Local import: keeps this block self-contained.
    from functools import wraps

    # wraps() keeps the stub's name, docstring and signature on the public
    # method so that help() and documentation tools show the real API.
    @wraps(f)
    def wrapper(self: AbstractOptimizer, *args, **kwargs):
        instance: GlobalOptimizationData = self.fem_manager.global_data
        method_name = f.__name__
        method = getattr(
            instance,
            method_name,
        )
        out = method(*args, **kwargs)
        self._refresh_problem()
        return out

    return wrapper
[docs]
# Signature stub: the decorator forwards the call to the same-named method of
# fem_manager.global_data, calls _refresh_problem() and returns the delegated
# result. The `pass` body below is never executed.
@_dispatch_global_data_store
def add_constant_value(
self,
name: str,
value: SupportedVariableTypes,
properties: dict[str, Any] | None = None,
*,
pass_to_fem: bool = True,
supress_duplicated_name_check: bool = False,
) -> Variable:
pass
[docs]
# Signature stub: the decorator forwards the call to the same-named method of
# fem_manager.global_data, calls _refresh_problem() and returns the delegated
# result. The `pass` body below is never executed.
@_dispatch_global_data_store
def add_parameter(
self,
name: str,
initial_value: float | None = None,
lower_bound: float | None = None,
upper_bound: float | None = None,
step: float | None = None,
properties: dict[str, ...] | None = None,
*,
pass_to_fem: bool = True,
fix: bool = False,
supress_duplicated_name_check: bool = False,
) -> NumericParameter:
pass
[docs]
# Signature stub: the decorator forwards the call to the same-named method of
# fem_manager.global_data, calls _refresh_problem() and returns the delegated
# result. The `pass` body below is never executed.
@_dispatch_global_data_store
def add_expression_string(
self,
name: str,
expression_string: str,
properties: dict[str, ...] | None = None,
*,
pass_to_fem: bool = True,
supress_duplicated_name_check: bool = False,
) -> ExpressionFromString:
pass
[docs]
# Signature stub: the decorator forwards the call to the same-named method of
# fem_manager.global_data, calls _refresh_problem() and returns the delegated
# result. The `pass` body below is never executed.
@_dispatch_global_data_store
def add_expression_sympy(
self,
name: str,
sympy_expr: sympy.Expr,
properties: dict[str, ...] | None = None,
*,
pass_to_fem: bool = True,
supress_duplicated_name_check: bool = False,
) -> ExpressionFromString:
pass
[docs]
# Signature stub: the decorator forwards the call to the same-named method of
# fem_manager.global_data, calls _refresh_problem() and returns the delegated
# result. The `pass` body below is never executed.
@_dispatch_global_data_store
def add_expression(
self,
name: str,
fun: Callable[..., float],
properties: dict[str, ...] | None = None,
args: tuple | None = None,
kwargs: dict | None = None,
*,
pass_to_fem: bool = True,
supress_duplicated_name_check: bool = False,
) -> ExpressionFromFunction:
pass
[docs]
# Signature stub: the decorator forwards the call to the same-named method of
# fem_manager.global_data, calls _refresh_problem() and returns the delegated
# result. The `pass` body below is never executed.
@_dispatch_global_data_store
def add_categorical_parameter(
self,
name: str,
initial_value: SupportedVariableTypes | None = None,
choices: list[SupportedVariableTypes] | None = None,
properties: dict[str, ...] | None = None,
*,
pass_to_fem: bool = True,
fix: bool = False,
supress_duplicated_name_check: bool = False,
) -> CategoricalParameter:
pass
[docs]
# Signature stub: the decorator forwards the call to the same-named method of
# fem_manager.global_data, calls _refresh_problem() and returns the delegated
# result. The `pass` body below is never executed.
@_dispatch_global_data_store
def add_objective(
self,
name: str,
fun: Callable[..., float],
direction: DIRECTION = 'minimize',
args: tuple | None = None,
kwargs: dict | None = None,
supress_duplicated_name_check: bool = False,
) -> Objective:
pass
[docs]
# Signature stub: the decorator forwards the call to the same-named method of
# fem_manager.global_data, calls _refresh_problem() and returns the delegated
# result. The `pass` body below is never executed.
@_dispatch_global_data_store
def add_objectives(
self,
names: str | list[str],
fun: Callable[..., Sequence[float]],
n_return: int,
directions: DIRECTION | Sequence[DIRECTION | None] | None = None,
args: tuple | None = None,
kwargs: dict | None = None,
supress_duplicated_name_check: bool = False,
) -> list[Objective]:
pass
[docs]
# Signature stub: the decorator forwards the call to the same-named method of
# fem_manager.global_data, calls _refresh_problem() and returns the delegated
# result. The `pass` body below is never executed.
@_dispatch_global_data_store
def add_constraint(
self,
name: str,
fun: Callable[..., float],
lower_bound: float | None = None,
upper_bound: float | None = None,
args: tuple | None = None,
kwargs: dict | None = None,
strict: bool = True,
using_fem: bool | None = None,
supress_duplicated_name_check: bool = False,
) -> Constraint:
pass
[docs]
# Signature stub: the decorator forwards the call to the same-named method of
# fem_manager.global_data, calls _refresh_problem() and returns the delegated
# result. The `pass` body below is never executed.
@_dispatch_global_data_store
def add_other_output(
self,
name: str,
fun: Callable[..., float],
args: tuple | None = None,
kwargs: dict | None = None,
supress_duplicated_name_check: bool = False,
) -> Function:
pass
[docs]
# Register a sub-fidelity model under `name` with the given `fidelity`.
# The model is made to share this optimizer's variable manager, and a warning
# is logged if `name` is already registered.
def add_sub_fidelity_model(
self,
name: str,
sub_fidelity_model: SubFidelityModel,
fidelity: Fidelity,
):
sub_fidelity_model.variable_manager = self.variable_manager
# NOTE(review): __init__ already assigns a SubFidelityModels instance, so this
# check is only true if the attribute was reset to None afterwards. Depending
# on the (lost) indentation of the `self.fidelity = 1.` line, the main
# fidelity may therefore never be set here -- confirm the intent.
if self.sub_fidelity_models is None:
self.sub_fidelity_models = SubFidelityModels()
self.fidelity = 1.
_duplicated_name_check(name, self.sub_fidelity_models.keys())
self.sub_fidelity_models._update(name, sub_fidelity_model, fidelity)
[docs]
# Enqueue a parameter set (name -> value) in the trial queue.
def add_trial(
self,
parameters: dict[str, SupportedVariableTypes],
):
self.trial_queue.enqueue(parameters)
[docs]
# Return all variables from the variable manager in the requested format.
def get_variables(self, format: Literal['dict', 'values', 'raw'] = 'dict'):
return self.variable_manager.get_variables(
format=format,
)
[docs]
# Return only the variables selected by filter='parameter', in the requested format.
def get_parameter(self, format: Literal['dict', 'values', 'raw'] = 'dict'):
return self.variable_manager.get_variables(
format=format, filter='parameter'
)
[docs]
# Replace the predicate consulted by _should_solve(); it receives the History.
def set_solve_condition(self, fun: Callable[[History], bool]):
self.solve_condition = fun
[docs]
# Replace the termination predicate; passing None restores the default,
# which always returns False.
# NOTE(review): indentation is lost in this view, so it cannot be told whether
# the experimental warning is emitted on every call or only when `fun` is
# not None.
def set_termination_condition(self, fun: Callable[[History], bool] | None):
if fun is None:
self.termination_condition = lambda _: False
else:
self.termination_condition = fun
show_experimental_warning('set_termination_condition', logger)
# ===== private =====
# Prepare the trial queue before optimization starts.
def _setup_enqueued_trials(self):
# Insert initial trial: the current parameter values go to the front of the
# queue, flagged as the initial trial.
params: dict = self.variable_manager.get_variables(format='dict', filter='parameter')
self.trial_queue.enqueue_first(params, flags={_IS_INITIAL_TRIAL_FLAG_KEY: True})
# Remove trials included in history
tried: list[dict] = get_tried_list_from_history(
self.history,
equality_filters=MAIN_FILTER,
)
self.trial_queue.remove_tried(tried)
# Remove duplicated
self.trial_queue.remove_duplicated()
# Return the result of the user-configurable solve condition for `history`.
def _should_solve(self, history):
return self.solve_condition(history)
@final
def _get_hard_constraint_violation_names(self, hard_c: TrialConstraintOutput) -> list[str]:
    """Collect and log the violated bounds among evaluated hard constraints.

    Args:
        hard_c: Evaluated constraint results keyed by constraint name.

    Returns:
        One ``'<name>_lower_bound'`` or ``'<name>_upper_bound'`` entry per
        violated constraint; empty when nothing is violated.

    Raises:
        NotImplementedError: If ``check_violation()`` returns a value other
            than ``None``, ``'lower_bound'`` or ``'upper_bound'``.
    """
    violated: list[str] = []

    for cns_name, cns_result in hard_c.items():
        constraint = self.constraints[cns_name]
        side = cns_result.check_violation()
        if side is None:
            # Not violated: nothing to log or collect.
            continue

        logger.warning(_('----- Hard constraint violation! -----'))
        logger.warning(_('constraint: {name}', name=cns_name))
        logger.warning(_('evaluated value: {value}', value=cns_result.value))

        if side == 'lower_bound':
            logger.info(_('lower bound: {lb}', lb=constraint.lower_bound))
        elif side == 'upper_bound':
            logger.info(_('upper bound: {ub}', ub=constraint.upper_bound))
        else:
            raise NotImplementedError

        violated.append(cns_name + '_' + side)

    return violated
def _check_and_raise_interruption(self):
    """Raise ``InterruptOptimization`` if the entire process is being interrupted.

    When the entire-process status has reached ``WorkerStatus.interrupting``
    or beyond, this worker's own status is set to ``interrupting`` before
    raising. Otherwise nothing happens.

    Raises:
        InterruptOptimization: If an interruption has been requested.
    """
    if not (self.entire_status.value >= WorkerStatus.interrupting):
        return

    self.worker_status.value = WorkerStatus.interrupting
    raise InterruptOptimization
# ===== solve =====
class _SolveSet:
opt: AbstractOptimizer
opt_: AbstractOptimizer
# Bind this solve set to the main optimizer `opt`.
# `subsampling_idx` starts as None and is copied into every record by _solve_or_raise.
def __init__(self, opt: AbstractOptimizer):
self.opt: AbstractOptimizer = opt
self.subsampling_idx: SubSampling | None = None
# No-op hooks called by solve(); each does nothing in this class.
# Called when solve() enters its processing context.
def _preprocess(self):
pass
# Called when _solve_or_raise raised HardConstraintViolation.
def _hard_constraint_handling(self, e: HardConstraintViolation):
pass
# Called when _solve_or_raise raised _HiddenConstraintViolation.
def _hidden_constraint_handling(self, e: _HiddenConstraintViolation):
pass
# Called when _solve_or_raise raised SkipSolve.
def _skip_handling(self, e: SkipSolve):
pass
# Called with the return value when _solve_or_raise finished without raising.
def _if_succeeded(self, f_return: _FReturnValue):
pass
# Called when solve() leaves its processing context.
# If the termination condition holds for the history, request interruption of
# the entire process by setting its status to `interrupting`.
def _postprocess(self):
if self.opt.termination_condition(self.opt.history):
self.opt.entire_status.value = WorkerStatus.interrupting
def _solve_or_raise(
        self,
        opt_: AbstractOptimizer,
        parameters: TrialInput,
        history: History = None,
        trial_id=None,
) -> _FReturnValue:
    """Run one trial on ``opt_`` and record it, raising on any failure.

    For every FEM context of ``opt_`` (per-FEM data, then global data) the
    steps are: update FEM parameters, evaluate hard constraints, update
    (solve) the FEM, evaluate objectives, evaluate the remaining
    constraints, evaluate other outputs. Partial results and the failure
    state are written to the record before an exception leaves.

    Args:
        opt_: The optimizer to solve with (main optimizer or a
            sub-fidelity model).
        parameters: Input variable values of this trial.
        history: History to record into. If ``None``, a throw-away
            ``Record`` is used and nothing is stored.
        trial_id: Identifier stored on the record.

    Returns:
        ``(record.y, y_internal, record.c, record)``, where ``y_internal``
        maps objective names to their converted values.

    Raises:
        SkipSolve: If ``opt_._should_solve(history)`` is false.
        HardConstraintViolation: If a hard constraint is violated.
        _HiddenConstraintViolation: Re-raised from constraint, FEM update,
            objective or other-output evaluation.
        InterruptOptimization: If an interruption was requested (raised by
            ``_check_and_raise_interruption``).
    """
    # create context
    if history is not None:
        record_to_history = history.recording(opt_.fem_manager.fems)
    else:
        class DummyRecordContext:
            def __enter__(self):
                return Record()

            def __exit__(self, exc_type, exc_val, exc_tb):
                pass

        record_to_history = DummyRecordContext()

    # context manager for preprocessing and postprocessing
    class AllFEMPrePostPerFidelity:
        # noinspection PyMethodParameters
        def __enter__(self_):
            # Fix: entering must run the *pre*-process hook; the original
            # called trial_postprocess_per_fidelity() here as well.
            # NOTE(review): assumes the FEM interface defines
            # trial_preprocess_per_fidelity(), mirroring the
            # trial_preprocess()/trial_postprocess() pair -- confirm.
            opt_.fem_manager.all_fems_as_a_fem.trial_preprocess_per_fidelity()
            return self_

        # noinspection PyMethodParameters
        def __exit__(self_, exc_type, exc_val, exc_tb):
            opt_.fem_manager.all_fems_as_a_fem.trial_postprocess_per_fidelity()

    # processing with recording
    # AllFEMPrePostPerFidelity is entered before record_to_history so that
    # the per-fidelity postprocess runs after the record context has exited
    # (i.e. after postprocess_after_recording).
    with AllFEMPrePostPerFidelity(), record_to_history as record:
        # output
        empty_c: TrialConstraintOutput = TrialConstraintOutput()
        empty_other_outputs: TrialFunctionOutput = TrialFunctionOutput()
        empty_y = TrialOutput()
        # (Required for direction recording to graph even if the value is nan)
        for ctx in opt_.fem_manager.contexts:
            empty_y.update({
                obj_name: ObjectiveResult(obj, ctx.fem, float('nan'))
                for obj_name, obj in ctx.objectives.items()
            })
        y_internal: dict = {}

        # record common result
        record.x = parameters
        record.y = empty_y
        record.c = empty_c
        record.other_outputs = empty_other_outputs
        record.sub_sampling = self.subsampling_idx
        record.trial_id = trial_id
        record.sub_fidelity_name = opt_.sub_fidelity_name
        record.fidelity = opt_.fidelity
        if self.opt._worker_name is not None:
            record.messages.append(self.opt._worker_name)

        # check skip
        if not opt_._should_solve(history):
            record.state = TrialState.skipped
            raise SkipSolve

        # nullcontext() blocks below are used only to group each step.
        from contextlib import nullcontext

        # start solve per FEM
        if opt_.sub_fidelity_name != MAIN_FIDELITY_NAME:
            logger.info('----------')
            logger.info(_('fidelity: ({name})', name=opt_.sub_fidelity_name))

        for ctx in opt_.fem_manager.contexts:
            logger.info(_('input variables:'))
            logger.info(parameters)

            # ===== update FEM parameter =====
            with nullcontext():
                # common (global) variables
                pass_to_fem = self.opt.fem_manager.global_data.variable_manager.get_variables(
                    filter="pass_to_fem",
                    format="raw",
                )
                # add the variables defined per FEM
                pass_to_fem.update(ctx.variable_manager.get_variables(
                    filter="pass_to_fem",
                    format="raw",
                ))
                logger.info(_('updating variables...'))
                ctx.fem.update_parameter(pass_to_fem)
                opt_._check_and_raise_interruption()

            # ===== evaluate hard constraint =====
            with nullcontext():
                # Created before the try so that partial results survive
                # an exception.
                ctx_hard_c = TrialConstraintOutput()
                logger.info(_('evaluating constraint functions...'))
                try:
                    ctx._hard_c(ctx_hard_c)
                    record.c.update(ctx_hard_c)
                except _HiddenConstraintViolation as e:
                    _log_hidden_constraint(e)
                    record.c.update(ctx_hard_c)
                    record.state = (
                        TrialState.get_corresponding_state_from_exception(e)
                    )
                    record.messages.append(
                        _('Hidden constraint violation '
                          'during hard constraint function '
                          'evaluation: ')
                        + create_err_msg_from_exception(e)
                    )
                    raise e

                # check hard constraint violation
                violation_names = opt_._get_hard_constraint_violation_names(ctx_hard_c)
                if len(violation_names) > 0:
                    record.state = TrialState.hard_constraint_violation
                    record.messages.append(
                        _('Hard constraint violation: ')
                        + ', '.join(violation_names))
                    raise HardConstraintViolation

            # ===== update FEM =====
            with nullcontext():
                logger.info(_('Solving FEM...'))
                try:
                    ctx.fem.update()
                    opt_._check_and_raise_interruption()
                # if hidden constraint violation
                except _HiddenConstraintViolation as e:
                    opt_._check_and_raise_interruption()
                    _log_hidden_constraint(e)
                    record.state = (
                        TrialState.get_corresponding_state_from_exception(e)
                    )
                    record.messages.append(
                        _("Hidden constraint violation in FEM update: ")
                        + create_err_msg_from_exception(e)
                    )
                    raise e

            # ===== evaluate y =====
            with nullcontext():
                logger.info(_('evaluating objective functions...'))
                # Fix: bind ctx_y before the try. If ctx._y() itself raises,
                # the except branch would otherwise hit an unbound name on
                # the first context, or silently reuse the previous
                # context's results on later ones.
                ctx_y: TrialOutput = TrialOutput()
                try:
                    ctx_y = ctx._y()
                    record.y.update(ctx_y)
                    y_internal.update(ctx._convert_y(ctx_y))
                    opt_._check_and_raise_interruption()
                # if intentional error (by user)
                except _HiddenConstraintViolation as e:
                    record.y.update(ctx_y)
                    opt_._check_and_raise_interruption()
                    _log_hidden_constraint(e)
                    record.state = TrialState.get_corresponding_state_from_exception(e)
                    record.messages.append(
                        _('Hidden constraint violation during '
                          'objective function evaluation: ')
                        + create_err_msg_from_exception(e))
                    raise e

            # ===== evaluate soft constraint =====
            with nullcontext():
                ctx_soft_c = TrialConstraintOutput()
                logger.info(_('evaluating remaining constraints...'))
                try:
                    ctx._soft_c(ctx_soft_c)
                    record.c.update(ctx_soft_c)
                # if intentional error (by user)
                except _HiddenConstraintViolation as e:
                    _log_hidden_constraint(e)
                    record.c.update(ctx_soft_c)
                    record.state = TrialState.get_corresponding_state_from_exception(e)
                    record.messages.append(
                        _('Hidden constraint violation during '
                          'soft constraint function evaluation: ')
                        + create_err_msg_from_exception(e))
                    raise e

            # ===== evaluate other functions =====
            with nullcontext():
                logger.info(_('evaluating other functions...'))
                ctx_other_outputs = TrialFunctionOutput()
                try:
                    ctx._other_outputs(ctx_other_outputs)
                    record.other_outputs.update(ctx_other_outputs)
                # if intentional error (by user)
                except _HiddenConstraintViolation as e:
                    _log_hidden_constraint(e)
                    record.other_outputs.update(ctx_other_outputs)
                    record.state = (
                        TrialState.get_corresponding_state_from_exception(e)
                    )
                    record.messages.append(
                        _(
                            "Hidden constraint violation during "
                            "another output function evaluation: "
                        )
                        + create_err_msg_from_exception(e)
                    )
                    raise e

        logger.info(_('output:'))
        logger.info(record.y)
        record.state = TrialState.succeeded
        opt_._check_and_raise_interruption()
        return record.y, y_internal, record.c, record
# Solve one trial for input `x` on `opt_` (default: the main optimizer) and
# return the result of _solve_or_raise, or None when the trial was skipped or
# a hard / hidden constraint was violated. Those three exceptions are routed
# to the matching hook instead of propagating; InterruptOptimization from the
# interruption checks still propagates.
# NOTE(review): indentation is lost in this view, so it cannot be told whether
# the final interruption check and `return` sit inside or after the
# `with Process()` block (i.e. before or after _postprocess runs).
def solve(
self,
x: TrialInput,
opt_: AbstractOptimizer | None = None,
trial_id: str = None,
) -> _FReturnValue | None:
"""Nothing will be raised even if infeasible."""
vm = self.opt.variable_manager
# opt_ is either the main opt or a sub-fidelity model
opt_ = opt_ or self.opt
self.opt_ = opt_
# check interruption
self.opt._check_and_raise_interruption()
# if opt_ is not self, update variable manager
opt_.variable_manager = vm
# Context manager that runs the _preprocess / _postprocess hooks around the solve.
# noinspection PyMethodParameters
class Process:
def __enter__(self_):
# preprocess
self._preprocess()
def __exit__(self_, exc_type, exc_val, exc_tb):
# postprocess
self._postprocess()
with Process():
# declare output
f_return = None
# start solve
try:
f_return = self._solve_or_raise(
opt_=opt_,
parameters=x,
history=self.opt.history,
trial_id=trial_id,
)
except HardConstraintViolation as e:
self._hard_constraint_handling(e)
except _HiddenConstraintViolation as e:
self._hidden_constraint_handling(e)
except SkipSolve as e:
self._skip_handling(e)
else:
self._if_succeeded(f_return)
# check interruption
self.opt._check_and_raise_interruption()
return f_return
# Solve `x` on the main optimizer and then on every sub-fidelity model.
# Returns a dict mapping each fidelity name to the result of solve() (None for
# a skipped or infeasible trial).
def solve_all_fidelity(self, x: TrialInput) -> dict[str, _FReturnValue | None]:
out: dict[str, _FReturnValue | None] = dict()
# Context manager that runs trial_preprocess() on entry and trial_postprocess()
# on exit for the FEMs of the main optimizer and of every sub-fidelity model.
class AllFEMPrePostPerTrial:
# noinspection PyMethodParameters
def __enter__(self_):
self.opt.fem_manager.all_fems_as_a_fem.trial_preprocess()
for sub in self.opt.sub_fidelity_models.values():
sub.fem_manager.all_fems_as_a_fem.trial_preprocess()
return self_
# noinspection PyMethodParameters
def __exit__(self_, exc_type, exc_val, exc_tb):
self.opt.fem_manager.all_fems_as_a_fem.trial_postprocess()
for sub in self.opt.sub_fidelity_models.values():
sub.fem_manager.all_fems_as_a_fem.trial_postprocess()
with AllFEMPrePostPerTrial():
main_fidelity = self.opt
f_return = self.solve(x)
out[main_fidelity.sub_fidelity_name] = f_return
for sub_fidelity_name, sub_fidelity in main_fidelity.sub_fidelity_models.items():
f_return = self.solve(x, sub_fidelity)
out[sub_fidelity_name] = f_return
return out
# Create a new _SolveSet bound to this optimizer.
def _get_solve_set(self):
return self._SolveSet(self)
# ===== run and setup =====
# Return a context manager that updates this worker's status on exit.
# The status is only ever raised, never lowered: the assignments are guarded
# by `<` comparisons against `crashed` and `finishing`.
# Exceptions are not suppressed (__exit__ returns None).
def _setting_status(self):
# noinspection PyMethodParameters
class _SettingStatus:
def __enter__(self_):
pass
def __exit__(self_, exc_type, exc_val, exc_tb):
if exc_type is not None:
if self.worker_status.value < WorkerStatus.crashed:
self.worker_status.value = WorkerStatus.crashed
else:
if self.worker_status.value < WorkerStatus.finishing:
self.worker_status.value = WorkerStatus.finishing
return _SettingStatus()
[docs]
# Optimization main loop; must be overridden by subclasses. Called by _run().
def run(self) -> None:
raise NotImplementedError
# Worker entry point: store the shared objects on this optimizer, set up the
# FEMs, optionally wait until every worker is ready, then call run().
# On exit the worker status becomes `finished`, or `crashed` if an exception
# escaped (the traceback is printed to stderr and the exception still propagates).
def _run(
self,
worker_idx: int | str,
worker_name: str,
history: History,
entire_status: WorkerStatus,
worker_status: WorkerStatus,
worker_status_list: list[WorkerStatus],
wait_other_process_setup: bool,
) -> None:
self.history = history
self.entire_status = entire_status
self.worker_status = worker_status
self._worker_index = worker_idx
self._worker_name = worker_name
self.worker_status_list = worker_status_list
# Context manager that sets the final worker status and reports a crash.
class SettingStatusFinished:
# noinspection PyMethodParameters
def __enter__(self_):
pass
# noinspection PyMethodParameters
def __exit__(self_, exc_type, exc_val, exc_tb):
if exc_type is not None:
self.worker_status.value = WorkerStatus.crashed
import sys
from traceback import print_tb
print(
_(
'===== Exception raised in worker {worker_idx} =====',
worker_idx=worker_idx
),
file=sys.stderr
)
print_tb(exc_tb)
print(
_(
'{name}: {exc_val}',
name=exc_type.__name__,
exc_val=exc_val
),
file=sys.stderr
)
else:
self.worker_status.value = WorkerStatus.finished
logger.info(_(
en_message='worker `{worker}` started.',
worker=worker_name,
))
with SettingStatusFinished():
self.worker_status.value = WorkerStatus.initializing
self.worker_status.value = WorkerStatus.launching_fem
self.fem_manager.all_fems_as_a_fem._setup_after_parallel(self)
if wait_other_process_setup:
self.worker_status.value = WorkerStatus.waiting
while True:
self._check_and_raise_interruption()
# break once every worker status has reached `waiting` or beyond
logger.debug([ws.value for ws in worker_status_list])
if all([ws.value >= WorkerStatus.waiting
for ws in worker_status_list]):
# To avoid resource contention and the like,
# wait before breaking: (worker_idx + 1) seconds for a numeric
# index, no wait for a string index
if isinstance(worker_idx, str):
wait_second = 0.
else:
wait_second = int(worker_idx + 1)
sleep(wait_second)
break
sleep(1)
self.worker_status.value = WorkerStatus.running
# Debug aid: when the DEBUG_FEMOPT_PARALLEL environment variable is set to a
# non-empty value, int-indexed workers sleep (worker_idx + 1) * 2 seconds.
if os.environ.get('DEBUG_FEMOPT_PARALLEL'):
if isinstance(worker_idx, int):
sleep((worker_idx + 1) * 2)
self.run()
logger.info(
_(
en_message="worker `{worker}` successfully finished!",
worker=worker_name,
)
)
def _logging(self):
# noinspection PyMethodParameters
class LoggingOutput:
def __enter__(self_):
df = self.history.get_df(
equality_filters=MAIN_FILTER
)
self_.count = len(df) + 1
succeeded_count = len(df[df['state'] == TrialState.succeeded])
succeeded_text = _(
en_message='{succeeded_count} succeeded trials',
jp_message='成功した試行数: {succeeded_count}',
succeeded_count=succeeded_count,
)
logger.info(f'▼▼▼▼▼ solve {self_.count} ({succeeded_text}) start ▼▼▼▼▼')
def __exit__(self_, exc_type, exc_val, exc_tb):
logger.info(f'▲▲▲▲▲ solve {self_.count} end ▲▲▲▲▲\n')
return LoggingOutput()
def _refresh_problem(self):
# ctx の内容が減っている場合でも検出できるように初期化
self._initialize_problem()
for ctx in self.fem_manager.contexts:
# femlist が _load すると各 FEMInterface が
# 想定するのと異なる object_pass_to_fun (=Sequence)が
# 渡されてしまうので、
# 各 context のみが _load_... を行う必要がある。
if not isinstance(ctx, GlobalOptimizationData):
ctx._load_problem_from_fem()
# optimizer に反映する
self.objectives.update(ctx.objectives)
self.constraints.update(ctx.constraints)
self.other_outputs.update(ctx.other_outputs)
self.variable_manager.variables.update(ctx.variable_manager.variables)
# 特殊処理が必要な場合は最後に fem の責任で行う
for ctx in self.fem_manager.contexts:
if not isinstance(ctx, GlobalOptimizationData):
ctx.fem.contact_to_optimizer(self, self.fem_manager.global_data, ctx)
# noinspection PyMethodMayBeStatic
def _get_additional_data(self) -> dict:
return dict()
def _collect_additional_data(self) -> dict:
additional_data = {}
additional_data.update(self._get_additional_data())
additional_data.update(self.fem_manager.all_fems_as_a_fem._get_additional_data())
return additional_data
def _finalize_history(self):
if not self.history._finalized:
parameters = self.variable_manager.get_variables(
filter='parameter', format='raw'
)
self.history.finalize(
parameters=parameters,
obj_names=list(self.objectives.keys()),
cns_names=list(self.constraints.keys()),
other_output_names=list(self.other_outputs.keys()),
sub_fidelity_names=[self.sub_fidelity_name] + list(self.sub_fidelity_models.keys()),
additional_data=self._collect_additional_data()
)
def _setup_before_parallel(self):
if not self._done_setup_before_parallel:
# check compatibility with fem if needed
for ctx in self.fem_manager.contexts:
# 共通
for variable in self.fem_manager.global_data.variable_manager.variables.values():
if variable.pass_to_fem:
ctx.fem._check_param_and_raise(variable.name)
# ctx ごと
for variable in ctx.variable_manager.variables.values():
if variable.pass_to_fem:
ctx.fem._check_param_and_raise(variable.name)
# check the enqueued trials is
# compatible with current optimization
# problem setup
history_set = set(self.history.prm_names)
for t in self.trial_queue.queue:
params: dict = t.d
enqueued_set: set = set(params.keys())
# Warning if over
if len(enqueued_set - history_set) > 0:
logger.warning(
_(
en_message='Enqueued parameter set contains '
'more parameters than the optimization '
'problem setup. The extra parameters '
'will be ignored.\n'
'Enqueued set: {enqueued_set}\n'
'Setup set: {history_set}\n'
'Parameters ignored: {over_set}',
jp_message='予約された入力変数セットは'
'最適化のセットアップで指定されたよりも'
'多くの変数を含んでいます。'
'そのような変数は無視されます。\n'
'予約された入力変数: {enqueued_set}\n'
'最適化する変数: {history_set}\n'
'無視される変数: {over_set}',
enqueued_set=enqueued_set,
history_set=history_set,
over_set=enqueued_set - history_set,
)
)
# Error if not enough
if len(history_set - enqueued_set) > 0:
raise ValueError(
_(
en_message="The enqueued parameter set lacks "
"some parameters to be optimized.\n"
"Enqueued set: {enqueued_set}\n"
"Parameters to optimize: {history_set}\n"
"Lacked set: {lacked_set}",
jp_message="予約された入力変数セットに"
"変数が不足しています。\n"
"予約された変数: {enqueued_set}\n"
"最適化する変数: {history_set}\n"
"足りない変数: {lacked_set}",
enqueued_set=enqueued_set,
history_set=history_set,
lacked_set=history_set - enqueued_set,
)
)
# remove duplicated enqueued trials
self._setup_enqueued_trials()
self._done_setup_before_parallel = True
def _setup_after_parallel(self):
pass
def _finalize(self):
# check sub fidelity models
if self.sub_fidelity_models is None:
self.sub_fidelity_models = SubFidelityModels()
for sub_fidelity_model in self.sub_fidelity_models.values():
assert sub_fidelity_model.objectives.keys() == self.objectives.keys()
assert sub_fidelity_model.constraints.keys() == self.constraints.keys()
# finalize
self._refresh_problem()
self.variable_manager.resolve()
self._finalize_history()
# setup if needed
self._setup_before_parallel()
self._setup_after_parallel()
[docs]
class SubFidelityModel(AbstractOptimizer):
    """Optimizer type used for the entries of ``SubFidelityModels``.

    Adds nothing to ``AbstractOptimizer``; it only provides a distinct
    class name.
    """
class SubFidelityModels(dict[str, SubFidelityModel]):
    """Mapping from a sub-fidelity name to its ``SubFidelityModel``."""

    def _update(self, name, model: SubFidelityModel, fidelity: Fidelity):
        """Register *model* under *name*.

        The name and the fidelity are written onto the model itself
        (``sub_fidelity_name`` and ``fidelity``) before it is stored; an
        existing entry with the same name is replaced.
        """
        model.sub_fidelity_name = name
        model.fidelity = fidelity
        self[name] = model