from __future__ import annotations
from typing import TYPE_CHECKING, TypeAlias, Literal, Sequence
import os
import csv
import ast
import math
import json
import datetime
import dataclasses
from time import sleep, time
import numpy as np
import pandas as pd
import pyfemtet
from pyfemtet._i18n import *
from pyfemtet._util.helper import generate_random_id
from pyfemtet._util.df_util import *
from pyfemtet._util.dask_util import *
from pyfemtet._util.str_enum import StrEnum
from pyfemtet.opt.exceptions import *
from pyfemtet.opt.problem.problem import *
from pyfemtet.opt.problem.variable_manager import *
from pyfemtet.logger import get_module_logger
from pyfemtet.opt.history._optimality import *
from pyfemtet.opt.history._hypervolume import *
if TYPE_CHECKING:
from pyfemtet.opt.interface import AbstractFEMInterface, FEMListInterface
# Public API of this module.
__all__ = [
    "TrialState",
    "History",
    "ColumnOrderMode",
    "Record",
    "create_err_msg_from_exception",
    "CorrespondingColumnNameRuler",
    "MAIN_FILTER",
    "get_trial_name",
]

# Equality filter (``{column: value}``) used with get_partial_df /
# apply_partial_df to select the main-fidelity rows. The NaN for
# ``sub_sampling`` presumably matches rows without sub-sampling; that
# depends on how get_partial_df compares NaN — TODO confirm.
MAIN_FILTER: dict = {
    "sub_fidelity_name": MAIN_FIDELITY_NAME,
    "sub_sampling": float("nan"),
}

logger = get_module_logger("opt.history", False)
# Separate logger for dask-related messages (e.g. lock polling).
logger_dask = get_module_logger("opt.dask", False)
[docs]
def get_trial_name(trial=None, fidelity=None, sub_sampling=None, row: pd.Series = None):
    """Build the display name of a trial, e.g. ``"trial_3"``.

    The name is ``"trial"``, then the fidelity (omitted when it is
    ``None`` or its ``str`` equals ``MAIN_FIDELITY_NAME``), then the trial
    number, then the sub-sampling (omitted when ``None``), joined by ``"_"``.

    Args:
        trial: Trial number.
        fidelity: Fidelity of the trial.
        sub_sampling: Sub-sampling identifier of the trial.
        row: A history row. When given, ``trial``, ``fidelity`` and
            ``sub_sampling`` are read from the columns of the same names
            (missing values such as NaN or None are treated as ``None``)
            and the keyword arguments are ignored.

    Returns:
        str: The trial name.

    Raises:
        ValueError: If ``row`` is given and its ``trial`` value is missing.
    """
    if row is not None:
        # pd.isna (unlike math.isnan) accepts None and non-numeric cells,
        # which an object-dtype history column may contain.
        if pd.isna(row["trial"]):
            raise ValueError("row['trial'] must not be missing.")
        trial = row["trial"]
        fidelity = None if pd.isna(row["fidelity"]) else row["fidelity"]
        sub_sampling = (
            None if pd.isna(row["sub_sampling"]) else row["sub_sampling"]
        )

    name_parts = ["trial"]
    if fidelity is not None:
        fid = str(fidelity)
        # The main fidelity is implicit in the name.
        if fid != MAIN_FIDELITY_NAME:
            name_parts.append(fid)
    name_parts.append(str(trial))
    if sub_sampling is not None:
        name_parts.append(str(sub_sampling))
    return "_".join(name_parts)
def _assert_locked_with_timeout(
    lock, assertion_message=None, timeout=10.0, poll_interval=0.5
):
    """Wait until ``lock.locked()`` is true; fail after ``timeout`` seconds.

    Args:
        lock: Any object with a ``locked()`` method.
        assertion_message: Message of the raised error. Defaults to
            ``"Lock is not acquired."``.
        timeout: Seconds to keep polling before giving up.
        poll_interval: Seconds to sleep between two checks.

    Raises:
        AssertionError: If the lock is still not held after ``timeout``.
            This is raised explicitly rather than via ``assert False``, so
            the check still works under ``python -O``. Under ``-O`` the
            original ``assert`` was stripped and the loop never ended.
    """
    start = time()
    while not lock.locked():
        sleep(poll_interval)
        if time() - start > timeout:
            raise AssertionError(assertion_message or "Lock is not acquired.")
        logger_dask.debug("Lock is not acquired. Retry to check locked.")
class MetaColumnNames(StrEnum):
    """Meta-column labels (first CSV line) that mark parameter value columns."""

    prm_num_value = "prm.num.value"  # value column of a numeric parameter
    prm_cat_value = "prm.cat.value"  # value column of a categorical parameter
def create_err_msg_from_exception(e: Exception):
    """:meta private:

    Format an exception as ``"TypeName(arg1 arg2 ...)"``, or just
    ``"TypeName"`` when its args render to an empty string.
    """
    type_name = type(e).__name__
    detail = " ".join(str(arg) for arg in e.args)
    return f"{type_name}({detail})" if detail else type_name
[docs]
class TrialState(StrEnum):
    """Outcome of a single trial, stored in the history ``state`` column."""

    succeeded = "Success"
    skipped = "Skip"
    hard_constraint_violation = "Hard constraint violation"
    soft_constraint_violation = "Soft constraint violation"
    # Hidden constraint violations
    model_error = "Model error"
    mesh_error = "Mesh error"
    solve_error = "Solve error"
    post_error = "Post-processing error"
    unknown_error = "Unknown error"
    undefined = "undefined"

    @staticmethod
    def get_corresponding_state_from_exception(e: Exception) -> TrialState:
        """:meta private:

        Map an exception instance to a state; the first matching type in
        the table wins and anything unmatched is ``unknown_error``.
        """
        # Kept as a local (not a class attribute) so it does not
        # become an enum member.
        type_to_state = (
            (ModelError, TrialState.model_error),
            (MeshError, TrialState.mesh_error),
            (SolveError, TrialState.solve_error),
            (PostProcessError, TrialState.post_error),
            (HardConstraintViolation, TrialState.hard_constraint_violation),
            (SkipSolve, TrialState.skipped),
        )
        for exception_type, mapped_state in type_to_state:
            if isinstance(e, exception_type):
                return mapped_state
        return TrialState.unknown_error

    @staticmethod
    def get_corresponding_exception_from_state(state: TrialState) -> Exception | None:
        """:meta private:

        Build a fresh exception instance representing ``state``, or return
        ``None`` for states that have no corresponding exception.
        """
        state_to_type = (
            (TrialState.model_error, ModelError),
            (TrialState.mesh_error, MeshError),
            (TrialState.solve_error, SolveError),
            (TrialState.post_error, PostProcessError),
            (TrialState.unknown_error, Exception),
            (TrialState.hard_constraint_violation, HardConstraintViolation),
            (TrialState.skipped, SkipSolve),
        )
        for candidate, exception_type in state_to_type:
            if state == candidate:
                return exception_type()
        return None

    @classmethod
    def get_hidden_constraint_violation_states(cls):
        """:meta private:

        States corresponding to every registered subclass of
        ``_HiddenConstraintViolation``.
        """
        hidden_types = _HiddenConstraintViolation.__pyfemtet_subclasses__
        return [
            cls.get_corresponding_state_from_exception(hidden_type())
            for hidden_type in hidden_types
        ]
class DataFrameWrapper:
    """:meta private:

    Holder of the history DataFrame.

    When ``get_client`` returns a live dask client, the frame is published
    to the cluster as a dataset named ``_dataset_name`` and reads/writes go
    through that dataset. Otherwise the local copy ``__df`` is used.
    """

    __df: pd.DataFrame  # local copy of the frame
    _lock_name = "edit-df"  # name of the Lock returned by ``lock`` (same for every instance)
    _dataset_name: str  # dask dataset name, random per instance
    _scheduler_address: str | None  # last seen scheduler address; None until a live client is found

    def __init__(self, df: pd.DataFrame):
        self._dataset_name = "df-" + generate_random_id()
        self._scheduler_address = None
        self.set_df(df)

    def __len__(self):
        return len(self.get_df())

    def __str__(self):
        return self.get_df().__str__()

    @property
    def lock(self):
        # A new Lock object is created on each access; it is identified by its name.
        return Lock(self._lock_name)

    def get_df(self, equality_filters: dict = None) -> pd.DataFrame:
        """Return the current frame.

        Args:
            equality_filters (dict, optional):
                {column: value} formatted dict.
                Each condition is considered as
                an 'and' condition.
                Defaults to no filter.

        Returns (pd.DataFrame):
            The whole frame, or only the rows matching
            ``equality_filters`` (via get_partial_df).
        """
        client = get_client(self._scheduler_address)

        # A dask cluster (client) is available.
        if client is not None:
            # The client exists but its ``with`` context has already been
            # left (no scheduler): fall back to the local copy.
            if client.scheduler is None:
                df = self.__df

            # The client is alive.
            else:
                self._scheduler_address = client.scheduler.address
                df = None
                with Lock("access_dataset_df"):
                    # The dataset is already published: read it.
                    if self._dataset_name in client.list_datasets():
                        df = client.get_dataset(self._dataset_name)

                    # Not published yet: publish the local copy.
                    else:
                        df = self.__df
                        client.publish_dataset(**{self._dataset_name: df})
                        sleep(0.1)
                assert df is not None

        # No dask cluster.
        else:
            df = self.__df

        # Keep only the rows that match the filter.
        if equality_filters is not None:
            df = get_partial_df(df, equality_filters)

        return df

    def set_df(self, df, equality_filters: dict = None):
        """Store a frame (locally and, if a client is alive, on the cluster).

        Args:
            df: The new frame, or, with ``equality_filters``, the partial
                frame holding the new values of the filtered rows.
            equality_filters (dict, optional):
                {column: value} formatted dict.
                Each condition is considered as
                an 'and' condition.
                Only the indexed rows will be updated.
                Must be called while holding ``self.lock``.
                Defaults to no filter.

        Returns:
            None
        """
        # Apply the filter: merge the partial frame into the current frame.
        # Per the original note, this is expected to fail if the frame has
        # changed since ``partial_df`` was obtained with get_df()
        # (presumably checked inside apply_partial_df — TODO confirm).
        if equality_filters is not None:
            _assert_locked_with_timeout(
                self.lock, "set_df() with equality_filters must be called with locking."
            )
            partial_df = df
            df = self.get_df()
            apply_partial_df(df, partial_df, equality_filters)

        # Update the data on the dask cluster.
        client = get_client(self._scheduler_address)
        if client is not None:
            if client.scheduler is not None:
                self._scheduler_address = client.scheduler.address
                with Lock("access_dataset_df", client):
                    # A published dataset cannot be overwritten, so remove it first.
                    if self._dataset_name in client.list_datasets():
                        # remove
                        client.unpublish_dataset(self._dataset_name)
                    # update
                    client.publish_dataset(**{self._dataset_name: df})
                    sleep(0.1)

        # Update the local copy.
        self.__df = df

    def start_dask(self):
        # Register the df initialized before dask context.
        self.set_df(self.__df)

    def end_dask(self):
        # Get back the df on dask to use the value outside
        # dask context.
        self.__df = self.get_df()
class CorrespondingColumnNameRuler:
    """:meta private:

    Naming rules for the auxiliary columns that accompany a parameter,
    objective or constraint column: the related column is the base name
    followed by a fixed suffix.
    """

    @staticmethod
    def cns_lower_bound_name(cns_name):
        """Column holding the lower bound of constraint ``cns_name``."""
        return "".join((cns_name, "_lower_bound"))

    @staticmethod
    def cns_upper_bound_name(cns_name):
        """Column holding the upper bound of constraint ``cns_name``."""
        return "".join((cns_name, "_upper_bound"))

    @staticmethod
    def direction_name(obj_name):
        """Column holding the direction of objective ``obj_name``."""
        return "".join((obj_name, "_direction"))

    @staticmethod
    def prm_lower_bound_name(prm_name):
        """Column holding the lower bound of parameter ``prm_name``."""
        return "".join((prm_name, "_lower_bound"))

    @staticmethod
    def prm_upper_bound_name(prm_name):
        """Column holding the upper bound of parameter ``prm_name``."""
        return "".join((prm_name, "_upper_bound"))

    @staticmethod
    def prm_choices_name(prm_name):
        """Column holding the choices of categorical parameter ``prm_name``."""
        return "".join((prm_name, "_choices"))

    @staticmethod
    def prm_step_name(prm_name):
        """Column holding the step of numeric parameter ``prm_name``."""
        return "".join((prm_name, "_step"))
[docs]
class ColumnOrderMode(StrEnum):
    """The order rule of the history csv columns."""

    per_category = "per_category"  #: Each value column is directly followed by its related columns (bounds, direction, ...).
    important_first = (
        "important_first"  #: The values of parameters and objectives first; related columns come after them.
    )


# Plain-string spellings accepted where a ColumnOrderMode is expected.
ColumnOrderModeStr: TypeAlias = Literal["per_category", "important_first"]
class DuplicatedColumnNameError(Exception):
    """:meta private:

    Raised by NoDuplicateDict.update when a history column name is
    already in use.
    """
class NoDuplicateDict(dict):
    """A dict whose ``update`` refuses to add an already-present key.

    Used to collect history column names, which must be unique. Only
    ``update`` is guarded; item assignment (``d[k] = v``) is not.
    """

    def update(self, m=(), /, **kwargs):
        """Add the items of ``m`` (mapping or iterable of pairs) and ``kwargs``.

        All keys are checked before anything is inserted. A key that is
        already present, or that appears in both ``m`` and ``kwargs``, is
        rejected. Previously, duplicates passed via ``kwargs`` were
        silently overwritten.

        Raises:
            DuplicatedColumnNameError: On the first duplicated key.
        """
        incoming = dict(m)
        seen = set(self.keys())
        for key_ in [*incoming.keys(), *kwargs.keys()]:
            if key_ in seen:
                raise DuplicatedColumnNameError(
                    _(
                        en_message="The name `{name}` is duplicated. "
                        "Please use another name.",
                        jp_message="名前 「{name}」 が重複しています。"
                        "別の名前を使ってください。",
                        name=key_,
                    )
                )
            seen.add(key_)
        super().update(incoming, **kwargs)
class ColumnManager:
    """:meta private:

    Decides the column layout of the history table.

    ``column_dtypes`` maps every column name to the dtype passed to
    ``DataFrame.astype``. ``meta_columns`` is a parallel list (same order
    and length) of role labels such as ``"prm.num.value"``, ``"obj"``,
    ``"cns"`` or ``"other_output.value"``. It is written as the first line
    of the history CSV and used to recover each column's role on load.
    """

    parameters: TrialInput
    y_names: list[str]
    c_names: list[str]
    other_output_names: list[str]
    column_dtypes: dict[str, type]
    meta_columns: list[str]

    @staticmethod
    def columns_to_keep_even_if_nan():
        """Columns kept in the saved CSV even if all their values are NaN."""
        return [
            "messages",
        ]

    def initialize(
        self,
        parameters: TrialInput,
        y_names,
        c_names,
        other_output_names,
        additional_data: dict,
        column_order_mode: str = ColumnOrderMode.per_category,
    ):
        """Store the problem definition and build the column layout."""
        self.parameters = parameters
        self.y_names = y_names
        self.c_names = c_names
        self.other_output_names = other_output_names
        self.set_full_sorted_column_information(
            additional_data=additional_data,
            column_order_mode=column_order_mode,
        )

    def set_full_sorted_column_information(
        self,
        extra_parameters: TrialInput = None,
        extra_y_names: list[str] = None,
        extra_c_names: list[str] = None,
        extra_other_output_names: list[str] = None,
        additional_data: dict = None,
        column_order_mode: str = ColumnOrderMode.per_category,
    ):
        """Build ``column_dtypes`` and ``meta_columns``.

        Columns follow the field order of ``Record``. ``x``, ``y``, ``c`` and
        ``other_outputs`` expand into one value column per name plus
        related columns (bounds, step, choices, direction).

        Args:
            extra_parameters, extra_y_names, extra_c_names,
            extra_other_output_names: Names found only in a loaded CSV.
                Their columns get an empty ("") meta label, so they stay in
                the table but are not returned by get_prm_names /
                get_obj_names / get_cns_names / get_other_output_names.
            additional_data: Stored as JSON in the meta label of the
                ``trial`` column.
            column_order_mode: With ``per_category`` the related columns
                directly follow their value column. With
                ``important_first`` they, and the remaining bookkeeping
                fields, are appended after the important columns.

        Raises:
            DuplicatedColumnNameError: If two columns get the same name.
        """
        extra_parameters = extra_parameters or TrialInput()
        extra_y_names = extra_y_names or []
        extra_c_names = extra_c_names or []
        extra_other_output_names = extra_other_output_names or []

        # These become column names, so duplicates are not allowed.
        column_dtypes: dict = NoDuplicateDict()
        meta_columns: list = []
        column_dtypes_later: dict = NoDuplicateDict()
        meta_columns_later: list = []

        # Where the "later" (less important) columns go.
        if column_order_mode == ColumnOrderMode.per_category:
            target_cds: dict = column_dtypes
            target_mcs: list = meta_columns
        elif column_order_mode == ColumnOrderMode.important_first:
            target_cds: dict = column_dtypes_later
            target_mcs: list = meta_columns_later
        else:
            assert False, f"Unknown {column_order_mode=}"

        # noinspection PyUnresolvedReferences
        keys = Record.__dataclass_fields__.copy().keys()
        for key in keys:
            # Note:
            #   Columns that can be blank in as_df() must get a dtype
            #   that accepts NaN or ''.
            #   For example, trial must not be given int.
            #
            # Note:
            #   pandas does not accept str in column_dtypes
            #   (it appears to be cast to object).

            if key == "x":
                for prm_name in self.parameters.keys():
                    param = self.parameters[prm_name]
                    if isinstance(param, NumericParameter):
                        # important
                        column_dtypes.update({prm_name: float})
                        meta_columns.append(MetaColumnNames.prm_num_value.value)
                        # later
                        f = CorrespondingColumnNameRuler.prm_lower_bound_name
                        target_cds.update({f(prm_name): float})
                        target_mcs.append("prm.num.lower_bound")
                        f = CorrespondingColumnNameRuler.prm_upper_bound_name
                        target_cds.update({f(prm_name): float})
                        target_mcs.append("prm.num.upper_bound")
                        f = CorrespondingColumnNameRuler.prm_step_name
                        target_cds.update({f(prm_name): float})
                        target_mcs.append("prm.num.step")
                    elif isinstance(param, CategoricalParameter):
                        # important
                        column_dtypes.update({prm_name: object})
                        meta_columns.append(MetaColumnNames.prm_cat_value.value)
                        # later
                        f = CorrespondingColumnNameRuler.prm_choices_name
                        target_cds.update({f(prm_name): object})
                        target_mcs.append("prm.cat.choices")
                    else:
                        raise NotImplementedError
                for extra_prm_name, extra_param in extra_parameters.items():
                    if isinstance(extra_param, NumericParameter):
                        # later
                        target_cds.update({extra_prm_name: float})
                        target_mcs.append("")
                        f = CorrespondingColumnNameRuler.prm_lower_bound_name
                        target_cds.update({f(extra_prm_name): object})
                        target_mcs.append("")
                        f = CorrespondingColumnNameRuler.prm_upper_bound_name
                        target_cds.update({f(extra_prm_name): object})
                        target_mcs.append("")
                    elif isinstance(extra_param, CategoricalParameter):
                        target_cds.update({extra_prm_name: object})
                        target_mcs.append("")
                        f = CorrespondingColumnNameRuler.prm_choices_name
                        target_cds.update({f(extra_prm_name): object})
                        target_mcs.append("")
                    else:
                        raise NotImplementedError

            elif key == "y":
                f = CorrespondingColumnNameRuler.direction_name
                for name in self.y_names:
                    # important
                    column_dtypes.update({name: float})
                    meta_columns.append("obj")
                    # later
                    target_cds.update({f(name): object})  # str | float
                    target_mcs.append("obj.direction")
                for name in extra_y_names:
                    # later
                    target_cds.update({name: float})
                    target_mcs.append("")
                    # later
                    target_cds.update({f(name): object})  # str | float
                    target_mcs.append("")

            elif key == "c":
                f_lb = CorrespondingColumnNameRuler.cns_lower_bound_name
                f_ub = CorrespondingColumnNameRuler.cns_upper_bound_name
                for name in self.c_names:
                    # important
                    column_dtypes.update({name: float})
                    meta_columns.append("cns")
                    # later
                    target_cds.update({f_lb(name): float})
                    target_mcs.append("cns.lower_bound")
                    # later
                    target_cds.update({f_ub(name): float})
                    target_mcs.append("cns.upper_bound")
                for name in extra_c_names:
                    # later
                    target_cds.update({name: float})
                    target_mcs.append("")
                    # later
                    target_cds.update({f_lb(name): float})
                    target_mcs.append("")
                    # later
                    target_cds.update({f_ub(name): float})
                    target_mcs.append("")

            elif key == "other_outputs":
                for name in self.other_output_names:
                    # important
                    column_dtypes.update({name: float})
                    meta_columns.append("other_output.value")
                for name in extra_other_output_names:
                    # later
                    target_cds.update({name: float})
                    target_mcs.append("")

            # Store additional_data as JSON in the meta label of this column.
            elif key == self._get_additional_data_column():
                # important
                column_dtypes.update({key: object})
                meta_columns.append(json.dumps(additional_data or dict()))

            elif key in (
                "feasibility",
                "optimality",
                "sub_sampling",
                "sub_fidelity_name",
            ):
                # important
                column_dtypes.update({key: object})
                meta_columns.append("")

            else:
                # later
                target_cds.update({key: object})
                target_mcs.append("")

        column_dtypes.update(column_dtypes_later)
        meta_columns.extend(meta_columns_later)

        self.column_dtypes = dict(**column_dtypes)
        self.meta_columns = meta_columns

    @staticmethod
    def _get_additional_data_column():
        """Column whose meta label carries the JSON-encoded additional data."""
        return "trial"

    @classmethod
    def _get_additional_data(cls, columns, meta_columns) -> dict:
        """Decode the additional data from the meta label of its column.

        Returns ``{}`` when the label is empty.

        Raises:
            RuntimeError: If the additional-data column is not in ``columns``.
        """
        for column, meta_column in zip(columns, meta_columns):
            if column == cls._get_additional_data_column():
                return json.loads(meta_column) if meta_column else {}
        raise RuntimeError(
            f'"{cls._get_additional_data_column()}" is not found in given columns.'
        )

    @staticmethod
    def _filter_columns(meta_column, columns, meta_columns) -> list[str]:
        """Return the columns whose meta label equals ``meta_column``, in order."""
        assert len(columns) == len(meta_columns), (
            f"{len(columns)=} and {len(meta_columns)=}"
        )
        return [
            column_
            for column_, meta_column_ in zip(columns, meta_columns)
            if meta_column_ == meta_column
        ]

    @classmethod
    def _filter_prm_names(cls, columns, meta_columns) -> list[str]:
        """Numeric parameter names followed by categorical parameter names."""
        return cls._filter_columns(
            MetaColumnNames.prm_num_value.value, columns, meta_columns
        ) + cls._filter_columns(
            MetaColumnNames.prm_cat_value.value, columns, meta_columns
        )

    def filter_columns(self, meta_column) -> list[str]:
        """Columns of the current layout whose meta label equals ``meta_column``."""
        columns = list(self.column_dtypes.keys())
        return self._filter_columns(meta_column, columns, self.meta_columns)

    def get_prm_names(self) -> list[str]:
        return self.filter_columns(
            MetaColumnNames.prm_num_value.value
        ) + self.filter_columns(MetaColumnNames.prm_cat_value.value)

    def get_obj_names(self) -> list[str]:
        return self.filter_columns("obj")

    def get_cns_names(self) -> list[str]:
        return self.filter_columns("cns")

    def get_other_output_names(self) -> list[str]:
        # The label written by set_full_sorted_column_information (and used
        # when loading) is "other_output.value"; filtering on
        # "other_output" never matched anything.
        return self.filter_columns("other_output.value")

    @staticmethod
    def _is_numerical_parameter(prm_name, columns, meta_columns):
        col_index = tuple(columns).index(prm_name)
        meta_column = meta_columns[col_index]
        return meta_column == MetaColumnNames.prm_num_value

    @staticmethod
    def _is_categorical_parameter(prm_name, columns, meta_columns):
        col_index = tuple(columns).index(prm_name)
        meta_column = meta_columns[col_index]
        return meta_column == MetaColumnNames.prm_cat_value

    def is_numerical_parameter(self, prm_name) -> bool:
        return self._is_numerical_parameter(
            prm_name, tuple(self.column_dtypes.keys()), self.meta_columns
        )

    def is_categorical_parameter(self, prm_name) -> bool:
        return self._is_categorical_parameter(
            prm_name, tuple(self.column_dtypes.keys()), self.meta_columns
        )

    @staticmethod
    def _get_parameter(prm_name: str, df: pd.DataFrame, meta_columns) -> Parameter:
        """Rebuild a parameter from the last non-NaN values of its columns.

        Bounds and step are ``None`` when their column is absent.
        """
        if ColumnManager._is_numerical_parameter(prm_name, df.columns, meta_columns):
            out = NumericParameter()
            out.name = prm_name
            out.value = float(df[prm_name].dropna().values[-1])

            # lower_bound
            key = CorrespondingColumnNameRuler.prm_lower_bound_name(prm_name)
            if key in df.columns:
                out.lower_bound = float(df[key].dropna().values[-1])
            else:
                out.lower_bound = None

            # upper bound
            key = CorrespondingColumnNameRuler.prm_upper_bound_name(prm_name)
            if key in df.columns:
                out.upper_bound = float(df[key].dropna().values[-1])
            else:
                out.upper_bound = None

            # step
            key = CorrespondingColumnNameRuler.prm_step_name(prm_name)
            if key in df.columns:
                out.step = float(df[key].dropna().values[-1])
            else:
                out.step = None

        elif ColumnManager._is_categorical_parameter(
            prm_name, df.columns, meta_columns
        ):
            out = CategoricalParameter()
            out.name = prm_name
            out.value = str(df[prm_name].dropna().values[-1])
            out.choices = (
                df[CorrespondingColumnNameRuler.prm_choices_name(prm_name)]
                .dropna()
                .values[-1]
            )

        else:
            raise NotImplementedError

        return out

    @staticmethod
    def _reconvert_objects(df: pd.DataFrame, meta_columns: list[str]):
        """Restore, in place, objects that became strings through the CSV."""
        for column, meta_column in zip(df.columns, meta_columns):
            # ``messages`` is already a _RECORD_MESSAGE_DELIMITER-separated
            # str at the df stage, so it must not be restored here.
            # ``choices`` lists become str through the CSV, so restore them.
            # Empty cells come back as NaN (not str) and are left as they are;
            # ast.literal_eval would raise on them.
            if meta_column == "prm.cat.choices":
                df[column] = [
                    ast.literal_eval(d) if isinstance(d, str) else d
                    for d in df[column]
                ]

    @staticmethod
    def _get_sub_fidelity_names(df: pd.DataFrame) -> list[str]:
        if "sub_fidelity_name" not in df.columns:
            return [MAIN_FIDELITY_NAME]
        else:
            return np.unique(df["sub_fidelity_name"].values).tolist()
# Separator used to join ``Record.messages`` into a single CSV cell.
_RECORD_MESSAGE_DELIMITER = " | "
@dataclasses.dataclass
class Record:
    """:meta private:

    One row of the history (a single trial), before conversion to a DataFrame.
    """

    # Only x, y, c (and other_outputs) are special: their data and related
    # information must be expanded into columns. Any other field can simply
    # be defined here.
    trial: int | None = None
    trial_id: int | None = None
    sub_sampling: SubSampling | None = None
    sub_fidelity_name: str | None = None
    fidelity: Fidelity | None = None
    x: TrialInput = dataclasses.field(default_factory=TrialInput)
    y: TrialOutput = dataclasses.field(default_factory=TrialOutput)
    c: TrialConstraintOutput = dataclasses.field(default_factory=TrialConstraintOutput)
    other_outputs: TrialFunctionOutput = dataclasses.field(
        default_factory=TrialFunctionOutput
    )
    state: TrialState = TrialState.undefined
    datetime_start: datetime.datetime = dataclasses.field(
        default_factory=datetime.datetime.now
    )
    datetime_end: datetime.datetime = dataclasses.field(
        default_factory=datetime.datetime.now
    )
    messages: list = dataclasses.field(default_factory=list)
    hypervolume: float | None = None
    feasibility: bool | None = None
    optimality: bool | None = None

    def as_df(self, dtypes: dict | None = None) -> pd.DataFrame:
        """Convert this record into a one-row DataFrame.

        ``None`` fields are omitted (they become NaN when ``dtypes``
        lists their column). ``x``, ``y``, ``c`` and ``other_outputs`` are
        expanded into value columns plus related columns. ``messages`` is
        joined with ``_RECORD_MESSAGE_DELIMITER``.

        Args:
            dtypes: ``{column: dtype}``. When given (non-empty), the frame
                has exactly these columns in this order and is cast to
                these dtypes. When omitted, all produced columns are kept
                as-is. Previously the default ``None`` crashed on
                ``dtypes.keys()``.

        Returns:
            pd.DataFrame: A single-row frame.
        """
        d = {
            field.name: getattr(self, field.name)
            for field in dataclasses.fields(self)
            if getattr(self, field.name) is not None
        }
        x: TrialInput = d.pop("x")
        y: TrialOutput = d.pop("y")
        c: TrialConstraintOutput = d.pop("c")
        other_outputs: TrialFunctionOutput = d.pop("other_outputs")

        # prm
        for prm_name, param in x.items():
            d.update({prm_name: param.value})
            if isinstance(param, NumericParameter):
                f = CorrespondingColumnNameRuler.prm_lower_bound_name
                d.update({f(prm_name): param.lower_bound})
                f = CorrespondingColumnNameRuler.prm_upper_bound_name
                d.update({f(prm_name): param.upper_bound})
                f = CorrespondingColumnNameRuler.prm_step_name
                d.update({f(prm_name): param.step})
            elif isinstance(param, CategoricalParameter):
                f = CorrespondingColumnNameRuler.prm_choices_name
                d.update({f(prm_name): param.choices})
            else:
                raise NotImplementedError

        # messages to str
        messages_str = _RECORD_MESSAGE_DELIMITER.join(d["messages"])
        d.update({"messages": messages_str})

        # obj
        d.update(**{k: v.value for k, v in y.items()})
        d.update(
            **{
                f"{CorrespondingColumnNameRuler.direction_name(k)}": v.direction
                for k, v in y.items()
            }
        )

        # cns
        d.update(**{k: v.value for k, v in c.items()})
        f_lb = CorrespondingColumnNameRuler.cns_lower_bound_name
        d.update(**{f"{f_lb(k)}": v.lower_bound for k, v in c.items()})
        f_ub = CorrespondingColumnNameRuler.cns_upper_bound_name
        d.update(**{f"{f_ub(k)}": v.upper_bound for k, v in c.items()})

        # function
        d.update(**{k: v.value for k, v in other_outputs.items()})

        # Wrapping each value in a list keeps list-valued cells (choices) intact.
        columns = tuple(dtypes.keys()) if dtypes else None
        df = pd.DataFrame({k: [v] for k, v in d.items()}, columns=columns)

        if dtypes:
            df = df.astype(dtypes)

        return df

    @staticmethod
    def get_state_str_from_series(row: pd.Series):
        """Return the row's ``state`` value, or ``TrialState.undefined`` if absent."""
        state: TrialState = TrialState.undefined
        if "state" in row:
            state = row["state"]
        return state
class EntireDependentValuesCalculator:
    """:meta private:

    Recomputes the values that depend on a whole set of rows (optimality,
    hypervolume, trial number) for the rows of ``entire_df`` selected by
    ``equality_filters``. Results are written into ``self.partial_df``;
    merging them back is up to the caller. The dataframe lock must be held
    throughout.
    """

    def __init__(
        self,
        records: Records,
        equality_filters: dict,
        entire_df: pd.DataFrame,
    ):
        self.records = records
        self.equality_filters = equality_filters
        self.entire_df: pd.DataFrame = entire_df
        self.partial_df: pd.DataFrame = get_partial_df(entire_df, equality_filters)

        _assert_locked_with_timeout(self.records.df_wrapper.lock)

        objective_columns = self.records.column_manager.get_obj_names()
        to_direction_column = CorrespondingColumnNameRuler.direction_name
        direction_columns = [to_direction_column(col) for col in objective_columns]

        objective_values = self.partial_df[objective_columns].values
        objective_directions = self.partial_df[direction_columns].values
        feasibility = self.partial_df["feasibility"]

        # Rewrite every objective in its minimization form, column by column.
        minimized = np.empty(objective_values.shape)
        column_pairs = zip(objective_values.T, objective_directions.T)
        for col_idx, (values, directions) in enumerate(column_pairs):
            minimized[:, col_idx] = np.array(
                [Objective._convert(value, direction) for value, direction in zip(values, directions)]
            )

        self.partial_y_internal = minimized
        self.partial_feasibility = feasibility

    def update_optimality(self, rtol=0.01):
        """Fill the ``optimality`` column of ``partial_df``."""
        _assert_locked_with_timeout(self.records.df_wrapper.lock)
        self.partial_df.loc[:, "optimality"] = calc_optimality(
            self.partial_y_internal,
            self.partial_feasibility,
            rtol=rtol,
        )

    def update_hypervolume(self, rtol):
        """Fill the ``hypervolume`` column of ``partial_df`` (nadir-based reference point)."""
        _assert_locked_with_timeout(self.records.df_wrapper.lock)
        self.partial_df.loc[:, "hypervolume"] = calc_hypervolume(
            self.partial_y_internal,
            self.partial_feasibility,
            ref_point="optuna-nadir",
            rtol=rtol,
        )

    def update_trial_number(self):
        """Number the rows of ``partial_df`` 1, 2, ... in their current order."""
        _assert_locked_with_timeout(self.records.df_wrapper.lock)
        self.partial_df.loc[:, "trial"] = np.arange(1, len(self.partial_df) + 1).astype(int)
class Records:
    """:meta private:

    Model class that holds the information of all trials of an optimization.

    The table lives in ``df_wrapper``; its column layout is decided by
    ``column_manager``.
    """

    df_wrapper: DataFrameWrapper
    column_manager: ColumnManager

    def __init__(self):
        self.df_wrapper = DataFrameWrapper(pd.DataFrame())
        self.column_manager = ColumnManager()
        # Filled by load(); None means no CSV has been loaded.
        self.loaded_meta_columns = None
        self.loaded_df = None

    def __str__(self):
        return self.df_wrapper.__str__()

    def __len__(self):
        return len(self.df_wrapper)

    def initialize(self):
        """Reset the table to an empty frame with the managed columns."""
        with self.df_wrapper.lock:
            # Set up the columns in case a new optimization starts.
            # When a CSV is loaded, this is overwritten later.
            df = pd.DataFrame(
                [], columns=list(self.column_manager.column_dtypes.keys())
            )
            self.df_wrapper.set_df(df)

    def load(self, path: str):
        """Read a history CSV into ``loaded_meta_columns`` and ``loaded_df``.

        Line 1 holds the meta columns, line 2 is empty, and the table starts
        at line 3. ``ENCODING`` is tried first, then UTF-8.

        Raises:
            UnicodeDecodeError: If none of the tried encodings can decode
                the file. Previously this surfaced as a misleading
                UnboundLocalError.
        """
        last_error: UnicodeDecodeError | None = None
        for encoding in (ENCODING, "utf-8"):
            try:
                with open(path, "r", encoding=encoding, newline="\n") as f:
                    reader = csv.reader(f, delimiter=",")

                    # load meta_column
                    loaded_meta_columns = next(reader)
                    next(reader)  # empty line

                    # load df from line 3
                    loaded_df = pd.read_csv(f, encoding=encoding, header=0)
                break
            except UnicodeDecodeError as e:
                last_error = e
        else:
            raise last_error

        # Restore objects such as lists that are lost when the df goes through CSV.
        ColumnManager._reconvert_objects(loaded_df, loaded_meta_columns)

        # column_dtypes may not be set up yet at this point, so no
        # compatibility check is done here, and therefore no set_df either.
        self.loaded_meta_columns = loaded_meta_columns
        self.loaded_df = loaded_df

    def check_problem_compatibility(self):
        """Check that the loaded CSV matches the current problem definition.

        The column order may differ, but the composition must not change,
        except that objectives may have been removed.

        Raises:
            RuntimeError: On incompatible parameters, objectives or constraints.
        """
        # Nothing to do if no data was loaded.
        if self.loaded_df is None:
            return

        loaded_columns, loaded_meta_columns = (
            self.loaded_df.columns,
            self.loaded_meta_columns,
        )

        # Parameters must match exactly.
        loaded_prm_names = set(
            self.column_manager._filter_prm_names(loaded_columns, loaded_meta_columns)
        )
        prm_names = set(self.column_manager.get_prm_names())
        if not (
            len(loaded_prm_names - prm_names) == len(prm_names - loaded_prm_names) == 0
        ):
            raise RuntimeError("Incompatible parameter setting.")

        # No objective may have been added.
        loaded_obj_names = set(
            self.column_manager._filter_columns(
                "obj", loaded_columns, loaded_meta_columns
            )
        )
        obj_names = set(self.column_manager.get_obj_names())
        if len(obj_names - loaded_obj_names) > 0:
            raise RuntimeError("Incompatible objective setting.")

        # Constraints must match exactly.
        # TODO: constraint bounds must not change either (not checked yet).
        loaded_cns_names = set(
            self.column_manager._filter_columns(
                "cns", loaded_columns, loaded_meta_columns
            )
        )
        cns_names = set(self.column_manager.get_cns_names())
        if not (
            len(loaded_cns_names - cns_names) == len(cns_names - loaded_cns_names) == 0
        ):
            raise RuntimeError("Incompatible constraint setting.")

    def reinitialize_record_with_loaded_data(
        self, column_order_mode: str = ColumnOrderMode.per_category
    ):
        """Rebuild the column layout to include the loaded CSV and store its data."""
        # Nothing to do if no data was loaded.
        if self.loaded_df is None:
            return

        loaded_columns, loaded_meta_columns = (
            self.loaded_df.columns,
            self.loaded_meta_columns,
        )

        loaded_prm_names = set(
            self.column_manager._filter_prm_names(loaded_columns, loaded_meta_columns)
        )
        loaded_obj_names = set(
            self.column_manager._filter_columns(
                "obj", loaded_columns, loaded_meta_columns
            )
        )
        loaded_cns_names = set(
            self.column_manager._filter_columns(
                "cns", loaded_columns, loaded_meta_columns
            )
        )
        loaded_other_output_names = set(
            self.column_manager._filter_columns(
                "other_output.value", loaded_columns, loaded_meta_columns
            )
        )

        # Add columns that exist in the loaded df but not in the current layout.
        extra_parameters = {}
        extra_y_names = []
        extra_c_names = []
        extra_oo_names = []
        for l_col, l_meta in zip(loaded_columns, loaded_meta_columns):
            # Not part of the current layout.
            if l_col not in self.column_manager.column_dtypes.keys():
                # A parameter.
                if l_col in loaded_prm_names:
                    # Categorical.
                    if (
                        CorrespondingColumnNameRuler.prm_choices_name(l_col)
                        in loaded_columns
                    ):
                        param = CategoricalParameter()
                        param.name = l_col
                        param.value = ""
                        param.choices = []
                    # Numeric.
                    elif (
                        CorrespondingColumnNameRuler.prm_lower_bound_name(l_col)
                        in loaded_columns
                    ):
                        param = NumericParameter()
                        param.name = l_col
                        param.value = np.nan
                        param.lower_bound = np.nan
                        param.upper_bound = np.nan
                    else:
                        raise NotImplementedError
                    extra_parameters.update({l_col: param})
                # An objective.
                elif l_col in loaded_obj_names:
                    extra_y_names.append(l_col)
                # A constraint.
                elif l_col in loaded_cns_names:
                    extra_c_names.append(l_col)
                # Another output.
                elif l_col in loaded_other_output_names:
                    extra_oo_names.append(l_col)

        # Get the additional data.
        a_data = self.column_manager._get_additional_data(
            loaded_columns, loaded_meta_columns
        )

        self.column_manager.set_full_sorted_column_information(
            extra_parameters=extra_parameters,
            extra_y_names=extra_y_names,
            extra_c_names=extra_c_names,
            extra_other_output_names=extra_oo_names,
            additional_data=a_data,
            column_order_mode=column_order_mode,
        )

        # Copy loaded_df so that workers are not affected.
        df: pd.DataFrame = self.loaded_df.copy()

        # Add columns that the current layout has but the loaded df lacks.
        for col in self.column_manager.column_dtypes.keys():
            if col not in df.columns:
                # Fill with the per-column default value.
                if col == "sub_fidelity_name":
                    df[col] = MAIN_FIDELITY_NAME
                else:
                    df[col] = np.nan

        # Apply column_dtypes.
        # Giving dtypes for more columns than exist raises an error,
        # so drop the extra ones. Giving fewer is fine for pandas.
        dtypes = {
            k: v
            for k, v in self.column_manager.column_dtypes.items()
            if k in self.loaded_df.columns
        }
        df = df.astype(dtypes)

        # Reorder.
        df = df[list(self.column_manager.column_dtypes.keys())].astype(
            self.column_manager.column_dtypes
        )

        # All checks passed; store the loaded data.
        self.df_wrapper.set_df(df)

    def remove_nan_columns(
        self, df, meta_columns, columns_to_keep: str | list[str] = None
    ) -> tuple[pd.DataFrame, list[str]]:
        """Drop columns whose values are all NaN (empty strings count as NaN).

        Args:
            df: The frame to filter.
            meta_columns: Meta labels parallel to ``df.columns``.
            columns_to_keep: Allowing these columns to all NaN values.
                Defaults to ``columns_to_keep_even_if_nan()``. Names not
                present in ``df`` are ignored.

        Returns:
            Removed DataFrame and corresponding meta_columns.
        """
        df = df.replace("", None)
        nan_columns = df.isna().all(axis=0)
        if columns_to_keep is None:
            columns_to_keep = self.column_manager.columns_to_keep_even_if_nan()
        if isinstance(columns_to_keep, str):
            columns_to_keep = [columns_to_keep]
        # Only touch existing labels. A missing label would raise (list) or
        # enlarge the mask (str) and break the boolean indexing below.
        columns_to_keep = [c for c in columns_to_keep if c in nan_columns.index]
        nan_columns[columns_to_keep] = False
        fdf = df.loc[:, ~nan_columns]
        f_meta_columns = (np.array(meta_columns)[~nan_columns]).tolist()
        return fdf, f_meta_columns

    def save(self, path: str):
        """Write the history CSV: meta columns, an empty line, then the table.

        All-NaN columns are omitted. A PermissionError is logged as a
        warning instead of being raised.
        """
        # filter NaN columns
        df, meta_columns = self.remove_nan_columns(
            self.df_wrapper.get_df(),
            self.column_manager.meta_columns,
        )

        try:
            with open(path, "w", encoding=ENCODING) as f:
                writer = csv.writer(f, delimiter=",", lineterminator="\n")
                # write meta_columns
                writer.writerow(meta_columns)
                writer.writerow([""] * len(meta_columns))  # empty line
                # write df from line 3
                df.to_csv(f, index=False, encoding=ENCODING, lineterminator="\n")

        except PermissionError:
            logger.warning(
                _(
                    en_message="History csv file ({path}) is in use and cannot be written to. "
                    "Please free this file before exiting the program, "
                    "otherwise history data will be lost.",
                    jp_message="履歴のCSVファイル({path})が使用中のため書き込みできません。"
                    "プログラムを終了する前にこのファイルを閉じてください。"
                    "そうしない場合、履歴データが失われます。",
                    path=path,
                )
            )

    def append(self, record: Record, rtol=0.01) -> pd.Series:
        """Append ``record``, recompute whole-history values and return the new last row."""
        # get row
        row = record.as_df(dtypes=self.column_manager.column_dtypes)

        # concat
        dfw = self.df_wrapper

        # append
        with dfw.lock:
            df = dfw.get_df()

            if len(df) == 0:
                # Dropping empty columns here can break
                # the column order in some cases.
                new_df = row
            else:
                # Drop all-NaN columns to cope with pandas'
                # changed dtype inference on concat.
                row.dropna(axis=1, inplace=True, how="all")

                new_df = pd.concat(
                    [df, row],
                    axis=0,
                    ignore_index=True,
                )

            # calc entire-dependent values
            # must be in with block to keep
            # the entire data compatibility
            # during processing.
            self.update_entire_dependent_values(new_df, rtol=rtol)

            dfw.set_df(new_df)

        # Return the computed last row for
        # post-processing after recording.
        return new_df.iloc[-1]

    def update_entire_dependent_values(
        self,
        processing_df: pd.DataFrame,
        rtol=0.01,
    ):
        """Recompute trial numbers, optimality and hypervolume in ``processing_df`` (in place).

        Must be called while holding the dataframe lock.
        """
        _assert_locked_with_timeout(self.df_wrapper.lock)

        # If every row has a trial_id, number trials by first appearance of the id.
        trial_processed = False
        if processing_df["trial_id"].notna().all():
            id_to_n: dict = {
                tid: i + 1 for i, tid in enumerate(processing_df["trial_id"].unique())
            }
            processing_df["trial"] = processing_df["trial_id"].map(id_to_n)
            trial_processed = True

        # update main fidelity
        equality_filters = MAIN_FILTER
        mgr = EntireDependentValuesCalculator(
            self,
            equality_filters,
            processing_df,
        )
        mgr.update_optimality(rtol=rtol)
        mgr.update_hypervolume(rtol=rtol)
        if not trial_processed:
            mgr.update_trial_number()  # per_fidelity
        pdf = mgr.partial_df
        apply_partial_df(
            df=processing_df, partial_df=pdf, equality_filters=equality_filters
        )

        # update sub fidelity
        sub_fidelity_names: list = np.unique(
            processing_df["sub_fidelity_name"]
        ).tolist()
        if MAIN_FIDELITY_NAME in sub_fidelity_names:
            sub_fidelity_names.remove(MAIN_FIDELITY_NAME)
        for sub_fidelity_name in sub_fidelity_names:
            equality_filters = {"sub_fidelity_name": sub_fidelity_name}
            mgr = EntireDependentValuesCalculator(self, equality_filters, processing_df)
            if not trial_processed:
                mgr.update_trial_number()  # per_fidelity
            pdf = mgr.partial_df
            apply_partial_df(
                df=processing_df, partial_df=pdf, equality_filters=equality_filters
            )
[docs]
class History:
"""最適化の試行の履歴を管理します。"""
_records: Records
prm_names: list[str]
obj_names: list[str]
cns_names: list[str]
other_output_names: list[str]
sub_fidelity_names: list[str]
is_restart: bool
additional_data: dict
path: str
"""The existing or destination CSV path.
If not specified, the CSV file is saved in the format
"pyfemtet.opt_%Y%m%d_%H%M%S.csv"
when the optimization process starts.
"""
@property
def all_output_names(self) -> list[str]:
return self.obj_names + self.cns_names + self.other_output_names
def __init__(self):
self._records = Records()
self.path: str | None = None
self._finalized: bool = False
self.is_restart = False
self.additional_data = dict(version=pyfemtet.__version__)
self.column_order_mode: ColumnOrderMode | ColumnOrderModeStr = (
ColumnOrderMode.per_category
)
self._rtol_calc_optimality = 0.01
def __str__(self):
return self._records.__str__()
def __enter__(self):
self._records.df_wrapper.start_dask()
def __exit__(self, exc_type, exc_val, exc_tb):
self._records.df_wrapper.end_dask()
def load_csv(self, path, with_finalize=False):
""":meta private:"""
self.is_restart = True
self.path = path
self._records.load(self.path)
if with_finalize:
self._finalize_from_loaded_data()
def _finalize_from_loaded_data(self):
assert self.is_restart
if not self._finalized:
df = self._records.loaded_df
meta_columns = self._records.loaded_meta_columns
self.prm_names = ColumnManager._filter_prm_names(df.columns, meta_columns)
self.obj_names = ColumnManager._filter_columns(
"obj", df.columns, meta_columns
)
self.cns_names = ColumnManager._filter_columns(
"cns", df.columns, meta_columns
)
self.other_output_names = ColumnManager._filter_columns(
"other_output.value", df.columns, meta_columns
)
self.sub_fidelity_names = ColumnManager._get_sub_fidelity_names(df)
self.additional_data = ColumnManager._get_additional_data(
df.columns, meta_columns
)
parameters: TrialInput = {}
for prm_name in self.prm_names:
param = ColumnManager._get_parameter(prm_name, df, meta_columns)
parameters.update({prm_name: param})
self.finalize(
parameters,
self.obj_names,
self.cns_names,
self.other_output_names,
self.sub_fidelity_names,
self.additional_data,
)
def finalize(
self,
parameters: TrialInput,
obj_names,
cns_names,
other_output_names,
sub_fidelity_names,
additional_data,
):
""":meta private:"""
self.prm_names = list(parameters.keys())
self.obj_names = list(obj_names)
self.cns_names = list(cns_names)
self.other_output_names = list(other_output_names)
self.sub_fidelity_names = list(sub_fidelity_names)
self.additional_data.update(additional_data)
if not self._finalized:
# ここで column_dtypes が決定する
self._records.column_manager.initialize(
parameters,
self.obj_names,
self.cns_names,
self.other_output_names,
self.additional_data,
self.column_order_mode,
)
# initialize
self._records.initialize()
if self.path is None:
self.path = datetime.datetime.now().strftime(
"pyfemtet.opt_%Y%m%d_%H%M%S.csv"
)
# load
if os.path.isfile(self.path):
self.load_csv(self.path)
self._records.check_problem_compatibility()
self._records.reinitialize_record_with_loaded_data(
self.column_order_mode
)
self._finalized = True
[docs]
def get_df(self, equality_filters: dict = None) -> pd.DataFrame:
"""Returns the optimization history.
Args:
equality_filters (dict, optional):
The {column: value} というフォーマットの
matching filter.
Returns: The optimization history.
"""
return self._records.df_wrapper.get_df(equality_filters)
[docs]
@staticmethod
def get_trial_name(
    trial=None, fidelity=None, sub_sampling=None, row: pd.Series = None
):
    """Static alias of the module-level ``get_trial_name``.

    The name is ``"trial"``, then the fidelity (omitted for the main
    fidelity), then the trial number, then the sub-sampling, joined by
    ``"_"``. When ``row`` is given, those values are read from it instead.
    """
    # Inside a method body this name resolves to the module-level function,
    # not to this staticmethod (class scope is not searched).
    return get_trial_name(
        trial=trial,
        fidelity=fidelity,
        sub_sampling=sub_sampling,
        row=row,
    )
def recording(self, fems: Sequence[AbstractFEMInterface]):
    """Return a context manager that records a single trial.

    Entering the context stamps ``datetime_start`` on a fresh ``Record`` and
    returns that record for the caller to fill in. On exit:

    - ``datetime_end`` and ``feasibility`` are set from the record state;
    - the record is appended if the block finished normally or raised
      ``ExceptionDuringOptimization``;
    - if the append returned a row, each FEM in ``fems`` gets its
      ``_postprocess_after_recording`` hook called;
    - the history is saved if no dask client is found.

    Exceptions raised inside the block are never suppressed.

    Args:
        fems: FEM interfaces whose post-processing hooks are run after a
            successful append.

    :meta private:
    """
    # noinspection PyMethodParameters
    class RecordContext:
        # ``self_`` is this context object; ``self`` (closed over) is the History.

        def __init__(self_):
            self_.record = Record()
            # Not read anywhere within this method.
            self_.record_as_df = None

        def __enter__(self_):
            # Stamp the start time and hand the record to the caller.
            self_.record.datetime_start = datetime.datetime.now()
            return self_.record

        def append(self_):
            # Keep an end time that is already set; otherwise stamp it now.
            self_.record.datetime_end = (
                self_.record.datetime_end
                if self_.record.datetime_end is not None
                else datetime.datetime.now()
            )
            # __exit__ treats a non-None result as "append succeeded" and
            # passes it on as the trial's row.
            return self._records.append(
                self_.record, rtol=self._rtol_calc_optimality
            )

        @staticmethod
        def postprocess_after_recording(row: pd.Series):
            # Run every FEM's post-processing hook for the trial described by ``row``.
            client = get_client(self._records.df_wrapper._scheduler_address)
            trial_name = self.get_trial_name(row=row)
            # FIXME: Instead of passing only the main fidelity, give each FEM
            #  the history filtered to the fidelity or sub-sampling that
            #  corresponds to it.
            #  Handling fidelity requires the opt that is currently running
            #  the analysis, so add it to the arguments of the
            #  recording method.
            df = self.get_df(equality_filters=MAIN_FILTER)
            if client is not None:
                # With a dask client the hook runs on the scheduler. dask's
                # run_on_scheduler supplies ``dask_scheduler`` to functions
                # that accept it.
                for fem in fems:
                    client.run_on_scheduler(
                        fem._postprocess_after_recording,
                        trial_name=trial_name,
                        df=df,
                        **(fem._create_postprocess_args()),
                    )
            else:
                # No client: call the hook in-process with no scheduler.
                for fem in fems:
                    fem._postprocess_after_recording(
                        dask_scheduler=None,
                        trial_name=trial_name,
                        df=df,
                        **(fem._create_postprocess_args()),
                    )

        def __exit__(self_, exc_type, exc_val, exc_tb):
            self_.record.datetime_end = datetime.datetime.now()
            # Stays None unless the record is appended below.
            row: pd.Series | None = None

            # record feasibility
            # skipped -> None (empty)
            # succeeded -> True
            # else -> False
            if self_.record.state == TrialState.skipped:
                self_.record.feasibility = None
            elif self_.record.state == TrialState.succeeded:
                self_.record.feasibility = True
            else:
                self_.record.feasibility = False

            # append
            # Only normal completion or ExceptionDuringOptimization is
            # recorded. Any other exception leaves ``row`` as None. This
            # method returns None, so the exception still propagates.
            if exc_type is None:
                row = self_.append()
            # 1st argument of issubclass cannot be None
            elif issubclass(exc_type, ExceptionDuringOptimization):
                row = self_.append()

            # if append is succeeded,
            # do fem.post_processing
            if row is not None:
                self_.postprocess_after_recording(row)

            # save history if no FEMOpt
            client = get_client(self._records.df_wrapper._scheduler_address)
            if client is None:
                self.save()

    return RecordContext()
[docs]
def save(self):
"""Export the optimization history.
The destination path is :class:`History.path`.
"""
# flask server 情報のように、最適化の途中で
# 書き換えられるケースがあるので
# additional data を再度ここで meta_columns に反映する
cm = self._records.column_manager
for i, column in enumerate(cm.column_dtypes.keys()):
# additional_data を入れる
if column == cm._get_additional_data_column():
cm.meta_columns[i] = json.dumps(self.additional_data or dict())
self._records.save(self.path)
def _create_optuna_study_for_visualization(self):
    """Build an in-memory optuna study from the succeeded main-fidelity trials.

    The study is only for visualization. Parameter values are the
    (non-internal) values stored in the history. Here "objective" just
    means "output": every objective, constraint and other-output column is
    registered as a ``"minimize"`` direction, in ``all_output_names`` order.

    Returns:
        optuna.Study: A study with one COMPLETE trial per succeeded row.

    Raises:
        NotImplementedError: If a parameter is neither numerical nor
            categorical.
    """
    import optuna

    output_names = self.all_output_names
    column_manager = self._records.column_manager

    # create study (in memory, no storage)
    kwargs: dict[str, object] = dict(
        sampler=None,
        pruner=None,
        study_name="dummy",
    )
    if len(output_names) == 1:
        kwargs.update(dict(direction="minimize"))
    else:
        kwargs.update(dict(directions=["minimize"] * len(output_names)))
    study = optuna.create_study(**kwargs)

    df = self.get_df(equality_filters=MAIN_FILTER)

    # Data-derived fallback bounds, used when a row's bound cell is NaN.
    # They depend only on ``df``, so each is computed at most once per
    # parameter. Rescanning the column for every row was O(rows^2).
    fallback_bounds: dict[tuple[str, str], object] = {}

    def _fallback_bound(prm_name: str, side: str):
        # Observed min ("low") or max ("high") of the parameter's values.
        key = (prm_name, side)
        if key not in fallback_bounds:
            observed = df[prm_name].dropna().values
            fallback_bounds[key] = observed.min() if side == "low" else observed.max()
        return fallback_bounds[key]

    def _distribution(row, prm_name: str):
        # Build the optuna distribution of one parameter for one row.
        # float
        if column_manager.is_numerical_parameter(prm_name):
            lb_name = CorrespondingColumnNameRuler.prm_lower_bound_name(prm_name)
            ub_name = CorrespondingColumnNameRuler.prm_upper_bound_name(prm_name)
            low = _fallback_bound(prm_name, "low") if np.isnan(row[lb_name]) else row[lb_name]
            high = _fallback_bound(prm_name, "high") if np.isnan(row[ub_name]) else row[ub_name]
            return optuna.distributions.FloatDistribution(low=low, high=high)
        # categorical
        if column_manager.is_categorical_parameter(prm_name):
            choices_name = CorrespondingColumnNameRuler.prm_choices_name(prm_name)
            return optuna.distributions.CategoricalDistribution(
                choices=row[choices_name]
            )
        raise NotImplementedError

    # add trial to study
    for _, row in df.iterrows():
        # Only succeeded trials are mirrored, as COMPLETE trials.
        if Record.get_state_str_from_series(row) != TrialState.succeeded:
            continue

        params = {prm_name: row[prm_name] for prm_name in self.prm_names}
        distributions = {
            prm_name: _distribution(row, prm_name) for prm_name in params
        }
        trial_kwargs: dict = dict(
            state=optuna.trial.TrialState.COMPLETE,
            params=params,
            distributions=distributions,
        )

        # objective (+ constraints + other_outputs as objective).
        # With a single output, exactly one of obj/cns/other holds it, so
        # indexing ``output_names`` selects the same column.
        output_values = row[output_names].values
        if len(output_names) == 1:
            trial_kwargs.update(dict(value=output_values[0]))
        else:
            trial_kwargs.update(dict(values=output_values))

        study.add_trial(optuna.create_trial(**trial_kwargs))

    return study
def is_numerical_parameter(self, prm_name: str) -> bool:
""":meta private:"""
return self._records.column_manager.is_numerical_parameter(prm_name)
def is_categorical_parameter(self, prm_name) -> bool:
""":meta private:"""
return self._records.column_manager.is_categorical_parameter(prm_name)