Source code for exopy.measurement.engines.process_engine.engine

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Engine executing the measurement in a different process.

"""
import logging
from multiprocessing import Pipe, Queue, Event
from threading import Thread
from threading import Event as tEvent
from pprint import pformat

from atom.api import Typed, Value, Bool

from ....utils.traceback import format_exc
from ....app.log.tools import QueueLoggerThread
from ..base_engine import BaseEngine
from ..utils import ThreadMeasureMonitor
from .subprocess import TaskProcess

logger = logging.getLogger(__name__)


class ProcessEngine(BaseEngine):
    """An engine executing the tasks it is sent in a different process.

    The engine talks to the subprocess through a pipe (task description and
    result), two inter-process queues (log records and monitored values) and
    a set of inter-process events (pause, stop, ...).

    """

    def perform(self, exec_infos):
        """Execute a given task.

        Parameters
        ----------
        exec_infos : ExecutionInfos
            TaskInfos object describing the work expected of the engine.

        Returns
        -------
        exec_infos : ExecutionInfos
            Input object whose values have been updated. This is simply a
            convenience.

        Notes
        -----
        IOError in pipe are raised only if an operation is attempted from the
        process that closed the pipe, but never when trying to poll from a
        different process.

        """
        self.status = 'Running'

        # Clear all the flags.
        self._task_pause.clear()
        self._task_paused.clear()
        self._task_resumed.clear()
        self._task_stop.clear()
        self._force_stop.clear()
        self._stop_requested = False

        # If the process does not exist or is dead create a new one.
        if not self._process or not self._process.is_alive():

            self._process_stop.clear()

            # Create the subprocess and the pipe.
            self._pipe, process_pipe = Pipe()
            self._process = TaskProcess(process_pipe,
                                        self._log_queue,
                                        self._monitor_queue,
                                        self._task_pause,
                                        self._task_paused,
                                        self._task_resumed,
                                        self._task_stop,
                                        self._process_stop)
            self._process.daemon = True

            # Create the logger thread in charge of dispatching log reports.
            self._log_thread = QueueLoggerThread(self._log_queue)
            self._log_thread.daemon = True
            logger.debug('Starting logging thread.')
            self._log_thread.start()

            # Create the monitor thread dispatching engine news to the
            # monitor.
            self._monitor_thread = ThreadMeasureMonitor(self,
                                                        self._monitor_queue)
            self._monitor_thread.daemon = True
            logger.debug('Starting monitoring thread.')
            self._monitor_thread.start()

            self._pause_thread = None

            # Start process.
            logger.debug('Starting subprocess')
            self._process.start()

        # Send the measurement.
        args = self._build_subprocess_args(exec_infos)
        try:
            self._pipe.send(args)
        except Exception:
            # Expand the template once so that the log record and the error
            # stored on exec_infos carry the same, fully formatted, message
            # (storing the raw template would expose literal '%s').
            msg = ('Failed to send infos to subprocess :\n-infos : \n%s\n'
                   '-errors :\n%s') % (pformat(args), format_exc())
            logger.error(msg)
            self._log_queue.put(None)
            self._monitor_queue.put((None, None))
            self._cleanup(process=True)
            exec_infos.success = False
            exec_infos.errors['engine'] = msg
            self.status = 'Stopped'
            return exec_infos
        else:
            logger.debug('Task %s sent', exec_infos.id)

        # Check that the engine did receive the task.
        while not self._pipe.poll(2):
            if not self._process.is_alive():
                return self._abort_dead_subprocess(exec_infos)

        # Simply empty the pipe the subprocess always send True if it answers
        self._pipe.recv()

        # Wait for the process to finish the measurement and check it has not
        # been killed.
        while not self._pipe.poll(1):
            if self._force_stop.is_set():
                # The queues sentinels are put by `stop`, which is the one
                # setting the _force_stop flag. exec_infos.success is left
                # untouched on this path.
                msg = 'Subprocess was terminated by the user.'
                logger.debug(msg)
                self._cleanup(process=False)
                exec_infos.errors['engine'] = msg
                self.status = 'Stopped'
                return exec_infos

            elif not self._process.is_alive():
                return self._abort_dead_subprocess(exec_infos)

        # Here get message from process and react
        result, errors = self._pipe.recv()
        logger.debug('Subprocess done performing measurement')

        exec_infos.success = result
        exec_infos.errors.update(errors)

        self.status = 'Waiting'

        return exec_infos

    def pause(self):
        """Ask the engine to pause the current task execution.

        The call does not block: a helper thread running `_wait_for_pause`
        updates the status once the subprocess reports its state.

        """
        self.status = 'Pausing'
        self._task_resumed.clear()
        self._task_paused.clear()
        self._task_pause.set()

        self._pause_thread = Thread(target=self._wait_for_pause)
        self._pause_thread.start()

    def resume(self):
        """Ask the engine to resume the currently paused job.

        """
        self.status = 'Resuming'
        self._task_pause.clear()

    def stop(self, force=False):
        """Ask the engine to stop the current job.

        This method should not wait for the job to stop save if a forced stop
        was requested.

        Parameters
        ----------
        force : bool, optional
            Force the engine to stop the performing the task. This allow the
            engine to use any means necessary to stop, in this case only
            should the call to this method block.

        """
        self.status = 'Stopping'
        self._stop_requested = True
        self._task_stop.set()

        if force:
            self._force_stop.set()

            # Stop running queues
            self._log_queue.put(None)
            self._monitor_queue.put((None, None))

            # Terminate the process and make sure all threads stopped
            # properly. The process and the threads are only created by the
            # first call to `perform`, so they may still be None here (e.g.
            # forced shutdown of an engine that never ran a task).
            if self._process is not None:
                self._process.terminate()
            if self._log_thread is not None:
                self._log_thread.join()
            if self._monitor_thread is not None:
                self._monitor_thread.join()

            # Discard the queues as they may have been corrupted when the
            # process was terminated.
            self._log_queue = Queue()
            self._monitor_queue = Queue()

            self.status = 'Stopped'

    def shutdown(self, force=False):
        """Ask the engine to stop completely.

        Parameters
        ----------
        force : bool, optional
            Force the engine to stop the performing the task. This allow the
            engine to use any means necessary to stop, in this case only
            should the call to this method block.

        """
        self.status = 'Shutting down'
        self._stop_requested = True
        self._task_stop.set()

        if not force:
            # Join the subprocess and the threads in the background so that
            # the caller is not blocked.
            t = Thread(target=self._cleanup)
            t.start()

        else:
            self.stop(force=True)

    # =========================================================================
    # --- Private API ---------------------------------------------------------
    # =========================================================================

    #: Boolean indicating that the user requested the job to stop.
    _stop_requested = Bool()

    #: Interprocess event used to pause the subprocess current job.
    _task_pause = Value(factory=Event)

    #: Interprocess event signaling the subprocess current job is paused.
    _task_paused = Value(factory=Event)

    #: Interprocess event signaling the subprocess current job has resumed.
    _task_resumed = Value(factory=Event)

    #: Interprocess event used to stop the subprocess current measurement.
    _task_stop = Value(factory=Event)

    #: Interprocess event used to stop the subprocess.
    _process_stop = Value(factory=Event)

    #: Flag signaling that a forced exit has been requested
    _force_stop = Value(factory=tEvent)

    #: Current subprocess.
    _process = Typed(TaskProcess)

    #: Connection used to send and receive messages about execution (type
    #: ambiguous when the OS is not known)
    _pipe = Value()

    #: Inter-process queue used by the subprocess to transmit its log records.
    _log_queue = Value(factory=Queue)

    #: Thread in charge of collecting the log message coming from the
    #: subprocess.
    _log_thread = Typed(Thread)

    #: Inter-process queue used by the subprocess to send the values of the
    #: observed database entries.
    _monitor_queue = Value(factory=Queue)

    #: Thread in charge of collecting the values of the observed database
    #: entries.
    _monitor_thread = Typed(Thread)

    #: Thread in charge of notifying the engine that the engine did
    #: pause/resume after being asked to do so.
    _pause_thread = Typed(Thread)

    def _abort_dead_subprocess(self, exec_infos):
        """Record that the subprocess died unexpectedly and stop the engine.

        Parameters
        ----------
        exec_infos : ExecutionInfos
            Object describing the task that was being executed. It is marked
            as failed and the error is stored under the 'engine' key.

        Returns
        -------
        exec_infos : ExecutionInfos
            The input object, returned so that `perform` can return it
            directly.

        """
        msg = 'Subprocess was found dead unexpectedly'
        logger.debug(msg)
        # Same sentinel values as the ones put by `stop`; the log and
        # monitor threads are joined right after in `_cleanup`.
        self._log_queue.put(None)
        self._monitor_queue.put((None, None))
        # The process is already dead, there is nothing to join.
        self._cleanup(process=False)
        exec_infos.success = False
        exec_infos.errors['engine'] = msg
        self.status = 'Stopped'
        return exec_infos

    def _cleanup(self, process=True):
        """ Helper method taking care of making sure that everybody stops.

        Parameters
        ----------
        process : bool
            Whether to join the worker process. Used when the process has been
            terminated abruptly.

        """
        logger.debug('Cleaning up')
        if process and self._process:
            # Ask the subprocess to exit before waiting for it.
            self._process_stop.set()
            self._process.join()
            logger.debug('Subprocess joined')
        if self._pipe:
            self._pipe.close()
        if self._log_thread:
            self._log_thread.join()
            logger.debug('Log thread joined')
        if self._monitor_thread:
            self._monitor_thread.join()
            logger.debug('Monitor thread joined')
        if self._pause_thread:
            self._pause_thread.join()
            logger.debug('Pause thread joined')

        self.status = 'Stopped'

    def _build_subprocess_args(self, exec_infos):
        """Build the tuple to send to the subprocess.

        The task preferences are refreshed from its members first so that the
        config which is sent reflects the current state of the task.

        """
        exec_infos.task.update_preferences_from_members()
        config = exec_infos.task.preferences
        database_root_state = exec_infos.task.database.copy_node_values()
        return (exec_infos.id, config,
                exec_infos.build_deps,
                exec_infos.runtime_deps,
                exec_infos.observed_entries,
                database_root_state,
                exec_infos.checks
                )

    def _wait_for_pause(self):
        """ Wait for the _task_paused event to be set.

        Once the task is paused, wait for the _task_resumed event in the same
        way. Both waits are abandoned as soon as the _task_stop event is set.

        """
        stop_sig = self._task_stop
        paused_sig = self._task_paused

        # Poll with a timeout so that a stop request is noticed quickly.
        while not stop_sig.is_set():
            if paused_sig.wait(0.1):
                self.status = 'Paused'
                break

        resuming_sig = self._task_resumed

        while not stop_sig.is_set():
            if resuming_sig.wait(1):
                self.status = 'Running'
                break