from __future__ import annotations
import platform
from typing import Callable, Sequence
import os
import sys
from time import sleep, time
from contextlib import nullcontext
from concurrent.futures import ThreadPoolExecutor
import sympy
import pyfemtet
from pyfemtet._i18n.messages import _
from pyfemtet._util.dask_util import *
from pyfemtet.opt.worker_status import *
from pyfemtet.opt.problem.problem import *
from pyfemtet.opt.problem.variable_manager import *
from pyfemtet.opt.interface import *
from pyfemtet.opt.optimizer import *
from pyfemtet.opt.optimizer._base_optimizer import DIRECTION, OptimizationDataPerFEM
from pyfemtet.opt.history import History
from pyfemtet.logger import get_module_logger
from pyfemtet.opt.visualization.history_viewer._process_monitor._application import (
process_monitor_main,
MonitorHostRecord
)
# Logger used by everything in this module, obtained under the name 'opt.femopt'.
# NOTE(review): the meaning of the second positional argument (False) is
# defined by get_module_logger, which is not visible here — confirm there.
logger = get_module_logger('opt.femopt', False)
# [docs] -- Sphinx viewcode link label left over from HTML extraction; not code
class FEMOpt:
    """Manage finite element method (FEM) optimization.

    Ties an optimizer to one or more FEM interfaces and drives the
    optimization run, including the optional parallel workers and the
    process monitor. Most methods simply forward to ``self.opt``.

    Attributes:
        opt (AbstractOptimizer): The optimizer instance to be used for
            optimization.
        monitor_info (dict[str, str | int | None]): Dictionary to store
            monitoring information such as host and port.

    Args:
        fem (AbstractFEMInterface, optional): An instance of a FEM
            interface. If None and the optimizer has no FEM interface
            registered, a ``FemtetInterface`` is used.
        opt (AbstractOptimizer, optional): An optimizer instance.
            Defaults to None, in which case ``OptunaOptimizer`` is used.

    Raises:
        RuntimeError: If the optimizer still has no FEM interface after
            the default one has been assigned.
    """

    opt: AbstractOptimizer

    def __init__(
        self,
        fem: AbstractFEMInterface | None = None,
        opt: AbstractOptimizer | None = None,
    ):
        # Fall back to the default optimizer only when none was given.
        # An identity check (rather than truthiness) keeps a caller's
        # optimizer even if that object happens to evaluate as falsy.
        self.opt: AbstractOptimizer = opt if opt is not None else OptunaOptimizer()

        # If a FEM interface was given, hand it to the optimizer.
        if fem is not None:
            self.opt.fem = fem

        # If the optimizer has no FEM interface at this point,
        # assign the default one.
        if len(self.opt.fem_manager.fems) == 0:
            self.opt.fem = FemtetInterface()

        # Make sure a FEM interface has actually been registered.
        if len(self.opt.fem_manager.fems) == 0:
            raise RuntimeError(
                "FEM interface could not be initialized. "
                "Please ensure that a valid FEM interface is provided or can be created."
            )

        self.monitor_info: dict[str, str | int | None] = dict(
            host=None, port=None,
        )

    def add_fem(self, fem: AbstractFEMInterface) -> OptimizationDataPerFEM:
        """Append ``fem`` to the optimizer's FEM manager.

        Returns:
            The value returned by ``self.opt.fem_manager.append(fem)``.
        """
        return self.opt.fem_manager.append(fem)

    def add_constant_value(
        self,
        name: str,
        value: SupportedVariableTypes,
        properties: dict[str, ...] | None = None,
        *,
        pass_to_fem: bool = True,
    ):
        """Forward to ``self.opt.add_constant_value`` with the same arguments."""
        self.opt.add_constant_value(name, value, properties, pass_to_fem=pass_to_fem)

    def add_parameter(
        self,
        name: str,
        initial_value: float | None = None,
        lower_bound: float | None = None,
        upper_bound: float | None = None,
        step: float | None = None,
        properties: dict[str, ...] | None = None,
        *,
        pass_to_fem: bool = True,
        fix: bool = False,
    ) -> None:
        """Forward to ``self.opt.add_parameter`` with the same arguments."""
        self.opt.add_parameter(
            name,
            initial_value,
            lower_bound,
            upper_bound,
            step,
            properties,
            pass_to_fem=pass_to_fem,
            fix=fix,
        )

    def add_expression_string(
        self,
        name: str,
        expression_string: str,
        properties: dict[str, ...] | None = None,
        *,
        pass_to_fem: bool = True,
    ) -> None:
        """Forward to ``self.opt.add_expression_string`` with the same arguments."""
        self.opt.add_expression_string(name, expression_string, properties, pass_to_fem=pass_to_fem)

    def add_expression_sympy(
        self,
        name: str,
        sympy_expr: sympy.Expr,
        properties: dict[str, ...] | None = None,
        *,
        pass_to_fem: bool = True,
    ) -> None:
        """Forward to ``self.opt.add_expression_sympy`` with the same arguments."""
        self.opt.add_expression_sympy(name, sympy_expr, properties, pass_to_fem=pass_to_fem)

    def add_expression(
        self,
        name: str,
        fun: Callable[..., float],
        properties: dict[str, ...] | None = None,
        args: tuple | None = None,
        kwargs: dict | None = None,
        *,
        pass_to_fem: bool = True,
    ) -> None:
        """Forward to ``self.opt.add_expression`` with the same arguments."""
        self.opt.add_expression(name, fun, properties, args, kwargs, pass_to_fem=pass_to_fem)

    def add_categorical_parameter(
        self,
        name: str,
        initial_value: SupportedVariableTypes | None = None,
        choices: list[SupportedVariableTypes] | None = None,
        properties: dict[str, ...] | None = None,
        *,
        pass_to_fem: bool = True,
        fix: bool = False,
    ) -> None:
        """Forward to ``self.opt.add_categorical_parameter`` with the same arguments."""
        self.opt.add_categorical_parameter(
            name,
            initial_value,
            choices,
            properties,
            pass_to_fem=pass_to_fem,
            fix=fix,
        )

    def add_objective(
        self,
        name: str,
        fun: Callable[..., float],
        direction: DIRECTION = 'minimize',
        args: tuple | None = None,
        kwargs: dict | None = None,
    ) -> None:
        """Forward to ``self.opt.add_objective`` with the same arguments."""
        self.opt.add_objective(name, fun, direction, args, kwargs)

    def add_objectives(
        self,
        names: str | list[str],
        fun: Callable[..., Sequence[float]],
        n_return: int,
        directions: DIRECTION | Sequence[DIRECTION] = None,
        args: tuple | None = None,
        kwargs: dict | None = None,
    ):
        """Forward to ``self.opt.add_objectives`` with the same arguments."""
        self.opt.add_objectives(names, fun, n_return, directions, args, kwargs)

    def add_constraint(
        self,
        name: str,
        fun: Callable[..., float],
        lower_bound: float | None = None,
        upper_bound: float | None = None,
        args: tuple | None = None,
        kwargs: dict | None = None,
        strict: bool = True,
        using_fem: bool | None = None,
    ):
        """Forward to ``self.opt.add_constraint`` with the same arguments."""
        self.opt.add_constraint(name, fun, lower_bound, upper_bound, args, kwargs, strict, using_fem)

    def add_other_output(
        self,
        name: str,
        fun: Callable[..., float],
        args: tuple | None = None,
        kwargs: dict | None = None,
    ):
        """Forward to ``self.opt.add_other_output`` with the same arguments."""
        self.opt.add_other_output(name, fun, args, kwargs)

    def add_trial(
        self,
        parameters: dict[str, SupportedVariableTypes],
    ):
        """Forward to ``self.opt.add_trial`` with the same argument."""
        self.opt.add_trial(parameters)

    def add_sub_fidelity_model(
        self,
        name: str,
        sub_fidelity_model: SubFidelityModel,
        fidelity: Fidelity,
    ):
        """Forward to ``self.opt.add_sub_fidelity_model`` with the same arguments."""
        self.opt.add_sub_fidelity_model(name, sub_fidelity_model, fidelity)

    def set_termination_condition(
        self,
        func: Callable[[History], bool] | None,
    ):
        """Forward to ``self.opt.set_termination_condition`` with the same argument."""
        self.opt.set_termination_condition(func)

    def set_monitor_host(self, host: str = None, port: int = None):
        """Sets the host IP address and the port of the process monitor.

        Only the values that are not None are stored in
        ``self.monitor_info``; the other entry is left as it is.

        Args:
            host (str):
                The hostname or IP address of the monitor server.
            port (int, optional):
                The port number of the monitor server.
                If None, ``8080`` will be used.
                Defaults to None.

        Tip:
            Specifying host ``0.0.0.0`` allows viewing monitor
            from all computers on the local network.

            If no hostname is specified, the monitor server
            will be hosted on ``localhost``.

            We can access process monitor by accessing
            ``localhost:8080`` on our browser by default.
        """
        if host is not None:
            self.monitor_info.update(host=host)
        if port is not None:
            self.monitor_info.update(port=port)

    def set_random_seed(self, seed: int):
        """Store ``seed`` as ``self.opt.seed``."""
        self.opt.seed = seed

    def optimize(
        self,
        n_trials: int | None = None,
        n_parallel: int = 1,
        timeout: float | None = None,
        wait_setup: bool = True,
        confirm_before_exit: bool = True,
        history_path: str | None = None,
        with_monitor: bool = True,
        scheduler_address: str | None = None,
        seed: int | None = None,
    ):
        """Run the optimization and return the resulting history.

        Args:
            n_trials: If truthy, replaces ``self.opt.n_trials``.
            n_parallel: Total number of optimization workers. Without
                ``scheduler_address`` one worker ('Main') runs in a thread
                of this process and the remaining ``n_parallel - 1`` run on
                a ``LocalCluster``; with ``scheduler_address`` all of them
                run on workers of that scheduler.
            timeout: If truthy, replaces ``self.opt.timeout``.
            wait_setup: Passed to every worker's ``self.opt._run`` call as
                the last positional argument.
            confirm_before_exit: If True, print a completion message and
                block on ``input()`` before shutting everything down.
            history_path: If truthy, replaces ``self.opt.history.path``.
            with_monitor: If True, launch the process monitor in a thread.
            scheduler_address: Address handed to ``Client`` to use an
                already running scheduler instead of a local cluster.
            seed: If not None, stored as ``self.opt.seed``.

        Returns:
            The value of ``self.opt.history.get_df()`` taken after all
            workers have finished.

        Raises:
            ValueError: If ``n_parallel`` is smaller than 1.
        """
        # Reject an impossible worker count before any process, thread or
        # cluster is launched; otherwise the failure only surfaces later
        # as an IndexError on the empty worker status list.
        if n_parallel < 1:
            raise ValueError(
                f'n_parallel must be 1 or greater, got {n_parallel!r}.'
            )

        # ===== show initialize info =====
        logger.info(
            _(
                '===== pyfemtet version {ver} =====',
                ver=pyfemtet.__version__,
            )
        )

        client: Client

        # set arguments
        self.opt.n_trials = n_trials or self.opt.n_trials
        self.opt.timeout = timeout or self.opt.timeout
        self.opt.history.path = history_path or self.opt.history.path
        if seed is not None:
            self.opt.seed = seed

        # construct opt workers
        n_using_cluster_workers = n_parallel  # workers excluding main
        if scheduler_address is None:
            # One of the workers runs in this process ('Main').
            n_using_cluster_workers = n_using_cluster_workers - 1
            worker_name_base = 'Sub'

            if n_parallel == 1:
                cluster = nullcontext()
                # noinspection PyTypeChecker
                client = DummyClient()

            else:
                # n_parallel >= 2 is guaranteed here, so the cluster always
                # uses separate single-threaded processes.
                logger.info(_('Launching processes...'))
                cluster = LocalCluster(
                    n_workers=n_parallel - 1,
                    threads_per_worker=1,
                    processes=True,
                )

                logger.info(_('Connecting cluster...'))
                client = Client(
                    cluster,
                )

        else:
            worker_name_base = 'Remote Worker'
            cluster = nullcontext()
            client = Client(scheduler_address)

        # construct other workers
        main_worker_names = []
        logger.info(_('Launching threads...'))
        executor_workers = 2  # save_history, watch_status
        if with_monitor:
            executor_workers += 1  # monitor
        if scheduler_address is None:
            executor_workers += 1  # main
            main_worker_names.append('Main')
        executor = ThreadPoolExecutor(
            max_workers=executor_workers,
            thread_name_prefix='thread_worker'
        )

        with cluster, client, self.opt.history, executor:

            logger.info(_('Setting up...'))

            # finalize history
            self.opt._refresh_problem()
            self.opt.variable_manager.resolve()
            self.opt._finalize_history()

            # optimizer-specific setup after history finalized
            self.opt._setup_before_parallel()

            # setup FEM (mainly for distributing files)
            self.opt.fem_manager.all_fems_as_a_fem._setup_before_parallel()

            # create worker status list
            entire_status = WorkerStatus(ENTIRE_PROCESS_STATUS_KEY)
            assert n_parallel == len(main_worker_names) + n_using_cluster_workers
            worker_status_list = []
            for i in range(n_parallel):
                worker_status = WorkerStatus(f'worker-status-{i}')
                worker_status.value = WorkerStatus.initializing
                worker_status_list.append(worker_status)
            entire_status.value = WorkerStatus.initializing

            # Get workers and Assign roles
            opt_worker_addresses = list(client.scheduler_info()['workers'].keys())[:n_using_cluster_workers]
            opt_worker_names = [f'{worker_name_base} {i+1}' for i in range(n_using_cluster_workers)]

            # Setting up monitor
            if with_monitor:
                logger.info(_('Launching Monitor...'))
                monitor_host_record = MonitorHostRecord()
                # noinspection PyTypeChecker,PyUnusedLocal
                monitor_future = executor.submit(
                    process_monitor_main,
                    history=self.opt.history,
                    status=entire_status,
                    worker_addresses=main_worker_names + opt_worker_addresses,
                    worker_names=main_worker_names + opt_worker_names,
                    worker_status_list=worker_status_list,
                    host=self.monitor_info['host'],
                    port=self.monitor_info['port'],
                    host_record=monitor_host_record,
                )
            else:
                monitor_future = None
                monitor_host_record = None

            logger.info(_('Setting up optimization problem...'))
            entire_status.value = WorkerStatus.running

            # Run on cluster
            futures = client.map(
                self.opt._run,
                # Arguments of func
                range(n_using_cluster_workers),  # worker_index
                opt_worker_names,  # worker_name
                [self.opt.history] * n_using_cluster_workers,  # history
                [entire_status] * n_using_cluster_workers,  # entire_status
                worker_status_list[:n_using_cluster_workers],  # worker_status
                [worker_status_list] * n_using_cluster_workers,  # worker_status_list
                [wait_setup] * n_using_cluster_workers,  # wait_other_process_setup
                # Arguments of map
                workers=opt_worker_addresses,
                allow_other_workers=False,
            )

            # Run on main process
            assert len(main_worker_names) == 0 or len(main_worker_names) == 1
            if len(main_worker_names) == 1:
                # The main worker takes the status entry that follows the
                # ones handed to the cluster workers above.
                # noinspection PyTypeChecker
                future = executor.submit(
                    self.opt._run,
                    main_worker_names[0],
                    main_worker_names[0],
                    self.opt.history,
                    entire_status,
                    worker_status_list[n_using_cluster_workers],
                    worker_status_list,
                    wait_setup
                )
            else:
                # No main worker: provide an object with the same
                # ``result()`` call so the waiting code below is uniform.
                class DummyFuture:
                    def result(self):
                        pass

                future = DummyFuture()

            # Saving history
            def save_history():
                # Save every 2 seconds until the entire status reports
                # finished (or later); the check comes after the save so
                # the final state is written once more before leaving.
                while True:
                    sleep(2)
                    if len(self.opt.history.get_df()) > 0:
                        try:
                            self.opt.history.save()
                            logger.debug('History saved!')
                        except PermissionError:
                            logger.error(
                                _('Cannot save history. '
                                  'The most common reason is '
                                  'that the csv is opened by '
                                  'another program (such as Excel). '
                                  'Please free {path} or lost the '
                                  'optimization history.',
                                  path=self.opt.history.path)
                            )
                    if entire_status.value >= WorkerStatus.finished:
                        break
                logger.debug('History save thread finished!')

            future_saving = executor.submit(save_history, )

            # Watching
            def watch_worker_status():
                # Poll once per second until every worker reports finished
                # (or later), then raise the entire status to finished
                # unless it is already beyond that.
                while True:
                    sleep(1)
                    logger.debug([s.value for s in worker_status_list])
                    if all(s.value >= WorkerStatus.finished for s in worker_status_list):
                        break
                if entire_status.value < WorkerStatus.finished:
                    entire_status.value = WorkerStatus.finished
                logger.debug('All workers finished!')

            future_watching = executor.submit(watch_worker_status, )

            if monitor_host_record is not None:
                # update additional_data of history
                # to notify how to emit interruption
                # signal by external processes
                monitor_record_wait_start = time()
                while len(monitor_host_record.get()) == 0:
                    sleep(0.1)
                    if time() - monitor_record_wait_start > 30:
                        logger.warning(_(
                            en_message='Getting monitor host information is '
                                       'failed within 30 seconds. '
                                       'It can not be able to terminate by '
                                       'requesting POST '
                                       '`<host>:<port>/interrupt` '
                                       'by an external process.',
                            jp_message='モニターの情報取得が 30 秒以内に'
                                       '終わりませんでした。最適化プロセスは、'
                                       '外部プロセスから '
                                       '`<host>:<port>/interrupt` に POST を'
                                       'リクエストしても終了できない可能性が'
                                       'あります。'
                        ))
                        break
                if len(monitor_host_record.get()) > 0:
                    self.opt.history.additional_data.update(
                        monitor_host_record.get()
                    )

            # Terminating monitor even if exception is raised
            class TerminatingMonitor:

                def __init__(self, monitor_future_):
                    self.monitor_future_ = monitor_future_

                def __enter__(self):
                    pass

                def __exit__(self, exc_type, exc_val, exc_tb):
                    # Send termination signal to monitor
                    # and wait to finish
                    # noinspection PyTypeChecker
                    entire_status.value = WorkerStatus.terminated
                    if self.monitor_future_ is not None:
                        self.monitor_future_.result()

            with TerminatingMonitor(monitor_future):
                # Wait to finish optimization
                client.gather(futures)
                future.result()
                future_saving.result()
                future_watching.result()

                if confirm_before_exit:
                    confirm_msg = _(
                        en_message='The optimization is now complete. '
                                   'You can view the results on the monitor '
                                   'until you press Enter to exit the program.',
                        jp_message='最適化が終了しました。'
                                   'プログラムを終了するまで、'
                                   '結果をプロセスモニターで確認できます。'
                    )
                    # The English text names {filename} as well as {dir},
                    # matching the Japanese text; "Launch {dir}." alone
                    # told the reader to launch a directory.
                    result_viewer_msg = _(
                        en_message='After the program ends, '
                                   'you can check the optimization results '
                                   'using the result viewer.\n'
                                   'The result viewer can be launched by '
                                   'performing one of the following actions:\n'
                                   '- {windows_only}Launch the `pyfemtet-opt-result-viewer` '
                                   'shortcut on your desktop if exists.\n'
                                   '- {windows_only}Launch {filename} in {dir}.\n'
                                   '- Execute "py -m pyfemtet.opt.visualization.history_viewer" '
                                   'in the command line',
                        jp_message='プログラム終了後も、結果ビューワを使って最適化結果を'
                                   '確認することができます。'
                                   '結果ビューワは以下のいずれかを実施すると起動できます。\n'
                                   '- {windows_only}デスクトップの pyfemtet-opt-result-viewer '
                                   'ショートカットを起動する\n'
                                   '- {windows_only}{dir} にある {filename} を起動する\n'
                                   '- コマンドラインで「py -m pyfemtet.opt.visualization.history_viewer」'
                                   'を実行する',
                        dir=os.path.abspath(os.path.dirname(sys.executable)),
                        filename='pyfemtet-opt-result-viewer.exe (or .cmd)',
                        # The note is only shown when NOT running on Windows.
                        windows_only='(Windows only) ' if platform.system() != 'Windows' else '',
                    )
                    print("====================")
                    print(confirm_msg)
                    print(result_viewer_msg)
                    print(_(
                        en_message='Press Enter to quit...',
                        jp_message='終了するには Enter を押してください...',
                    ))
                    input()

            df = self.opt.history.get_df()

        logger.info(_('All processes are terminated.'))

        return df
# [docs] -- Sphinx viewcode link label left over from HTML extraction; not code
def debug_1():
    """Run a small ``NoFEM`` optimization with two parallel workers.

    Sets up an ``OptunaOptimizer`` (TPE sampler seeded with 42, 10 trials)
    with two stepped parameters, one constraint and one objective, runs it
    through ``FEMOpt.optimize(n_parallel=2)`` and finally prints the
    absolute path of the history file.
    """
    # noinspection PyUnresolvedReferences
    from time import sleep
    import optuna
    from pyfemtet.opt.interface import AbstractFEMInterface, NoFEM

    def _parabola(_fem: AbstractFEMInterface, _opt: AbstractOptimizer):
        # Objective: sum of squares of the current variable values.
        values = _opt.get_variables('values')
        return (values ** 2).sum()

    def _cns(_fem: AbstractFEMInterface, _opt: AbstractOptimizer):
        # Constraint value: the first variable.
        values = _opt.get_variables('values')
        return values[0]

    optimizer = OptunaOptimizer()
    optimizer.sampler = optuna.samplers.TPESampler(seed=42)
    optimizer.n_trials = 10

    no_fem = NoFEM()
    optimizer.fem = no_fem

    # The optimizer itself is handed to the user functions as extra argument.
    extra_args = (optimizer,)

    for parameter_name in ('x1', 'x2'):
        optimizer.add_parameter(parameter_name, 1, -1, 1, step=0.2)

    optimizer.add_constraint('cns', _cns, lower_bound=-0.5, args=extra_args)
    optimizer.add_objective('obj', _parabola, args=extra_args)

    runner = FEMOpt(fem=no_fem, opt=optimizer)
    runner.opt = optimizer
    runner.optimize(n_parallel=2)

    history_path = os.path.abspath(runner.opt.history.path)
    print(history_path)
# [docs] -- Sphinx viewcode link label left over from HTML extraction; not code
def substrate_size(Femtet):
    """Calculate the area the substrate occupies on the XY plane.

    Reads the variables ``substrate_w`` and ``substrate_d`` through
    ``Femtet.GetVariableValue`` and returns their product.
    """
    width, depth = (
        Femtet.GetVariableValue(variable_name)
        for variable_name in ('substrate_w', 'substrate_d')
    )
    return width * depth  # unit: mm2
# [docs] -- Sphinx viewcode link label left over from HTML extraction; not code
def debug_2():
    """Run a 10-trial, single-process optimization against a Femtet project.

    Opens ``wat_ex14_parametric_jp.femprj`` from the directory of this
    file through ``FemtetInterface``, registers two parameters and three
    objectives (all computed by ``substrate_size``) on an
    ``OptunaOptimizer`` and runs it with ``n_parallel=1``.
    """
    from pyfemtet.opt.interface import FemtetInterface
    from pyfemtet.opt.optimizer import OptunaOptimizer

    fem = FemtetInterface(
        femprj_path=os.path.join(os.path.dirname(__file__), 'wat_ex14_parametric_jp.femprj'),
    )

    opt = OptunaOptimizer()
    opt.fem = fem

    opt.add_parameter(name="substrate_w", initial_value=40, lower_bound=22, upper_bound=60)
    opt.add_parameter(name="substrate_d", initial_value=60, lower_bound=34, upper_bound=60)

    opt.add_objective(name='基板サイズ(mm2)', fun=substrate_size)
    opt.add_objective(name='obj2', fun=substrate_size)
    opt.add_objective(name='obj3', fun=substrate_size)

    opt.n_trials = 10

    # Hand the prepared optimizer to the constructor. A bare ``FEMOpt()``
    # would first build a default optimizer and, because that one has no
    # FEM interface, a default ``FemtetInterface()`` as well, only for
    # both to be thrown away when ``opt`` is assigned afterwards.
    femopt = FEMOpt(opt=opt)
    femopt.optimize(n_parallel=1)
# [docs] -- Sphinx viewcode link label left over from HTML extraction; not code
def debug_3():
    """Run the ``NoFEM`` sample problem on workers of an existing scheduler.

    Same problem definition as the local sample (two stepped parameters,
    one constraint, one objective), but ``optimize`` is given a
    ``scheduler_address``. The address in the code is a placeholder and
    has to be replaced by hand before running.
    """
    # noinspection PyUnresolvedReferences
    from time import sleep
    import optuna
    from pyfemtet.opt.interface import AbstractFEMInterface, NoFEM

    def _parabola(_fem: AbstractFEMInterface, _opt: AbstractOptimizer):
        # Objective: sum of squares of the current variable values.
        values = _opt.get_variables('values')
        return (values ** 2).sum()

    def _cns(_fem: AbstractFEMInterface, _opt: AbstractOptimizer):
        # Constraint value: the first variable.
        values = _opt.get_variables('values')
        return values[0]

    optimizer = OptunaOptimizer()
    optimizer.sampler = optuna.samplers.TPESampler(seed=42)

    no_fem = NoFEM()
    optimizer.fem = no_fem

    # The optimizer itself is handed to the user functions as extra argument.
    extra_args = (optimizer,)

    for parameter_name in ('x1', 'x2'):
        optimizer.add_parameter(parameter_name, 1, -1, 1, step=0.2)

    optimizer.add_constraint('cns', _cns, lower_bound=-0.5, args=extra_args)
    optimizer.add_objective('obj', _parabola, args=extra_args)

    run_options = dict(
        scheduler_address='<dask scheduler で起動したスケジューラの tcp をここに入力>',
        n_trials=80,
        n_parallel=6,
        with_monitor=True,
        confirm_before_exit=False,
    )
    runner = FEMOpt(fem=no_fem, opt=optimizer)
    runner.optimize(**run_options)
if __name__ == "__main__":
    # Only the NoFEM sample runs by default. Call debug_2() (Femtet project)
    # or debug_3() (existing scheduler) here instead when needed.
    debug_1()