Source code for pyfemtet.dispatch_extensions._impl

from time import time, sleep
from multiprocessing import current_process

from tqdm import tqdm

from win32com.client import Dispatch, CDispatch

from femtetutils import util

from pyfemtet._i18n import Msg
from pyfemtet._util.dask_util import *
from pyfemtet._util.process_util import *
from pyfemtet.logger import get_module_logger


__all__ = [
    'launch_and_dispatch_femtet',
    'dispatch_femtet',
    'dispatch_specific_femtet',
    'DispatchExtensionException',
]


logger = get_module_logger('dispatch', False)


DISPATCH_TIMEOUT = 120


[docs] class DispatchExtensionException(Exception): pass
class FemtetNotFoundException(DispatchExtensionException): """Raises when (specific) Femtet process doesn't exist.""" class FemtetConnectionTimeoutError(DispatchExtensionException): """Raises when connection trials is timed out.""" def _get_subprocess_log_prefix(): return f'({current_process().name}) '
[docs] def launch_and_dispatch_femtet( timeout=DISPATCH_TIMEOUT, strictly_pid_specify=True ) -> tuple[CDispatch, int]: """Launch Femtet by new process and connect to it. The wrapper for Dispatch() but returns PID with IFemtet. Args: timeout (int, optional): Raises an error if the connection is not established within the specified timeout. strictly_pid_specify (bool, optional): Attempts to establish a connection to the launched Femtet strictly. This may result in slower processing due to process exclusivity handling. Raises: FemtetConnectionTimeoutError: Couldn't connect Femtet process for some reason (i.e. Femtet.exe is not launched). Returns: tuple[IFemtet, int]: An object for controlling Femtet and the PID of the Femtet being controlled. """ # launch femtet util.execute_femtet() pid = util.get_last_executed_femtet_process_id() logger.debug(f'Target pid is {pid}.') for _ in tqdm(range(5), Msg.WAIT_FOR_LAUNCH_FEMTET): sleep(1) # dispatch femtet if strictly_pid_specify: Femtet, pid = dispatch_specific_femtet(pid, timeout) else: # worker process なら排他処理する with Lock('simply-dispatch-femtet'): Femtet, pid = dispatch_femtet() return Femtet, pid
[docs] def dispatch_femtet(timeout=DISPATCH_TIMEOUT, subprocess_log_prefix='') -> tuple[CDispatch, int]: """Connect to existing Femtet process. The wrapper for Dispatch() but returns PID with IFemtet. Args: timeout (int, optional): Raises an error if the connection is not established within the specified timeout. subprocess_log_prefix (str, optional): A prefix to output in logs. Typically used only for internal processing. Raises: FemtetConnectionTimeoutError: Couldn't connect Femtet process for some reason (i.e. Femtet.exe is not launched). Returns: tuple[IFemtet, int]: An object for controlling Femtet and the PID of the Femtet being controlled. """ # Dispatch if subprocess_log_prefix: logger.debug('%s'+'Try to connect Femtet.', subprocess_log_prefix) else: logger.info(Msg.TRY_TO_CONNECT_FEMTET) Femtet = Dispatch('FemtetMacro.Femtet') logger.debug('%s'+'Dispatch finished.', subprocess_log_prefix) # timeout 秒以内に接続が確立するか start = time() while True: hwnd = Femtet.hWnd # 接続が確立 if hwnd > 0: logger.debug('%s'+f'Dispatched hwnd is {hwnd} and' f'its pid is {_get_pid(hwnd)}.' f'Connection established.', subprocess_log_prefix) break else: logger.debug('%s'+f'Dispatched hwnd is {hwnd}.' f'Waiting for establishing connection.', subprocess_log_prefix) # 接続がタイムアウト if time()-start > timeout: raise FemtetConnectionTimeoutError(f'Connection trial with Femtet is timed out in {timeout} sec') sleep(1) # pid を取得 pid = _get_pid(hwnd) if subprocess_log_prefix: logger.debug('%s'+f'Successfully connected. The pid of Femtet is {pid}.', subprocess_log_prefix) else: logger.info(Msg.F_FEMTET_CONNECTED(pid)) return Femtet, pid
def _block_other_femtets( target_pid, subprocess_idx, connection_flags, timeout_main_wait_for_us, lock_inter_subproc, # lock_main, ): """Target 以外の pid を持つ Femtet と接続し、他の(メインの)プロセスが接続できないようにする。 ただし、Femtet の Dispatch ルールは一意なので、 ターゲットの Femtet と接続するまでの他のプロセスだけをブロックする。 接続可能な Femtet がなければ timeout するまで接続を試行しようとするが、 ひとつでも timeout すれば、それ以降のプロセスは dispatch する必要がない。 """ # Dispatch を行う if connection_flags[subprocess_idx]: # 他のタイムアウトしたプロセスによってスキップフラグを立てられた場合 my_pid = 0 else: lock_inter_subproc.acquire() try: logger.debug('%s'+f'Subprocess started.', _get_subprocess_log_prefix()) _, my_pid = dispatch_femtet(2, _get_subprocess_log_prefix()) except DispatchExtensionException: logger.debug('%s'+f'Connection failed. The other subprocesses will skip.', _get_subprocess_log_prefix()) my_pid = 0 # ひとつでもここに来れば、それ以降は探しても無駄なのでスキップフラグを立てる for i in range(len(connection_flags)-1): connection_flags[i] = True finally: lock_inter_subproc.release() # # target なら Dispatch の終了を通知する前に main を lock する # if my_pid == target_pid: # lock_main.acquire() # Dispatch の終了を通知 connection_flags[subprocess_idx] = True # # pid が目的のものなら他の subprocesses すべてが Dispatch 終了後に開放する # pid が目的のものなら即刻開放する if my_pid == target_pid: # while True: # start = time() # if all(connection_flags[:-1]): # break # if time()-start > timeout_main_wait_for_us: # return -1 # sleep(1) logger.debug('%s'+f'Release {my_pid}.', _get_subprocess_log_prefix()) # lock_main.release() return 0 # my_pid が非正なら何にも関与しないので即刻終了する elif my_pid <= 0: logger.debug('%s'+'Failed or timed out to connect Femtet.', _get_subprocess_log_prefix()) return 1 # そうでなければメインプロセスが Dispatch を終えるまで解放しない else: logger.debug('%s'+f'Connected to {my_pid}. Keep connection to block.', _get_subprocess_log_prefix()) start = time() while True: if connection_flags[-1]: logger.debug('%s'+'Main process seems to connect target Femtet.', _get_subprocess_log_prefix()) return 1 if time()-start > timeout_main_wait_for_us: logger.warning( '%s'+f'Timed out to wait for the main process to {timeout_main_wait_for_us}', _get_subprocess_log_prefix() ) return -1 sleep(1)
[docs] def dispatch_specific_femtet(pid, timeout=DISPATCH_TIMEOUT) -> tuple[CDispatch, int]: with Lock('dispatch-specific-femtet'): return _dispatch_specific_femtet_core(pid, timeout)
def _dispatch_specific_femtet_core(pid, timeout=DISPATCH_TIMEOUT) -> tuple[CDispatch, int]: if timeout < 5: raise ValueError(f'Timeout to dispatch specific femtet should equal or be over 5.') # 存在する Femtet プロセスの列挙 pids = _get_pids('Femtet.exe') logger.info(f'PID of existing Femtet processes: {pids}') if not (pid in pids): raise FemtetNotFoundException(f"Femtet (pid = {pid}) doesn't exist.") logger.info(Msg.F_SEARCHING_FEMTET_WITH_SPECIFIC_PID(pid)) # 子プロセスの準備 with _NestableSyncManager() as manager: # フラグの準備 connection_flags = manager.list() lock_inter_subproc = manager.Lock() # lock_main = manager.Lock() for _ in range(len(pids)+1): # [-1]は自プロセス用のフラグ connection_flags.append(False) # 目的以外の Femtet をブロックする子プロセスを開始 logger.debug('Start subprocess to block Femtet other than target pid.') processes = [] for subprocess_id in tqdm(range(len(pids)), 'Specifying connection...'): p = _NestableSpawnProcess( target=_block_other_femtets, args=( pid, subprocess_id, connection_flags, 10, lock_inter_subproc, # lock_main, ), ) p.start() processes.append(p) # 子プロセスの Dispatch 完了を待つ start = time() while True: if all(connection_flags[:-1]): break if time()-start > timeout: # 子プロセスを終了する for p in processes: p.terminate() p.join() try: lock_inter_subproc.release() except RuntimeError: pass raise FemtetConnectionTimeoutError( Msg.F_ERR_FEMTET_CONNECTION_TIMEOUT( pid=pid, timeout=timeout, ) ) sleep(1) # # subprocesses の Dispatch 終了後、target_pid の解放を待つ # lock_main.acquire() # 子プロセスによるブロックが終了しているので Dispatch する try: logger.debug('Block process seems to be finished. Try Dispatch.') Femtet, my_pid = dispatch_femtet() except DispatchExtensionException as e: # lock_main.release() raise e # Dispatch 完了を通知 connection_flags[-1] = True # サブプロセスすべての正常終了を待つ for p in processes: p.join() if my_pid != pid: # pid の結果が違う場合 txt = f'Target pid is {pid}, but connected pid is {my_pid}. ' txt += f'The common reason is that THIS python process once connected {pid}.' logger.warning(txt) return Femtet, my_pid def _debug(): launch_and_dispatch_femtet(5) if __name__ == '__main__': _Femtet, _my_pid = launch_and_dispatch_femtet(5) # _Femtet, _my_pid = dispatch_specific_femtet(pid=26124)