Source code for exopy.measurement.processor

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Object reponsible for a measurement execution.

Please note that the 'real' work of performing the tasks is handled by the
engine. This object handles all the other aspects (running of the hooks,
handling of the monitors, ...)

"""
import os
import logging
from time import sleep
from threading import Thread, RLock

import enaml
from atom.api import Atom, Typed, ForwardTyped, Value, Bool
from enaml.widgets.api import Window
from enaml.layout.api import InsertTab, FloatItem
from enaml.application import deferred_call, schedule

from .engines.api import BaseEngine, ExecutionInfos
from .measurement import Measurement
from ..utils.flags import BitFlag
from ..utils.traceback import format_exc


logger = logging.getLogger(__name__)


def plugin():
    """Return the MeasurementPlugin class, importing it lazily.

    The import happens at call time rather than at module load so that this
    module and the plugin module do not import each other circularly.

    """
    from .plugin import MeasurementPlugin as plugin_class
    return plugin_class
def schedule_and_block(func, args=(), kwargs=None, priority=100):
    """Schedule a function call on the main thread and wait for it to complete.

    Parameters
    ----------
    func : callable
        Function to call on the main thread.

    args : tuple, optional
        Positional arguments to pass to the function.

    kwargs : dict, optional
        Keyword arguments to pass to the function. None means no keyword
        arguments.

    priority : int, optional
        Priority forwarded to the scheduler.

    """
    # A fresh dict is built on each call rather than sharing a mutable
    # default between calls.
    call_kwargs = {} if kwargs is None else kwargs

    # Forward the requested priority (it used to be silently replaced by 100).
    scheduled = schedule(func, args, call_kwargs, priority=priority)

    # Poll until the scheduled task is no longer pending.
    while scheduled.pending():
        sleep(0.05)
class MeasurementProcessor(Atom):
    """Object responsible for a measurement execution.

    The processor runs the measurements in a background thread. The main task
    is delegated to the engine, while the processor itself takes care of the
    pre/post-execution hooks, the monitors and the bookkeeping of the
    execution state.

    """
    #: Boolean indicating whether or not the processor is working.
    active = Bool()

    #: Reference to the measurement plugin.
    plugin = ForwardTyped(plugin)

    #: Currently run measurement or last measurement run.
    running_measurement = Typed(Measurement)

    #: Instance of the currently used engine.
    engine = Typed(BaseEngine)

    #: Boolean indicating whether or not process all enqueued measurements.
    continuous_processing = Bool(True)

    #: Monitors window
    monitors_window = Typed(Window)

    def start_measurement(self, measurement):
        """Start a new measurement.

        If an execution thread is still alive, it is asked to stop and given
        5 seconds to do so. If it does not, an error is signaled through the
        'exopy.app.errors.signal' command and nothing is started.

        Parameters
        ----------
        measurement : Measurement
            First measurement to run.

        """
        if self._thread and self._thread.is_alive():
            self._state.set('stop_processing')
            self._thread.join(5)
            if self._thread.is_alive():
                core = self.plugin.workbench.get_plugin('enaml.workbench.core')
                cmd = 'exopy.app.errors.signal'
                msg = ("Can't stop the running execution thread. Please "
                       "restart the application and consider reporting this "
                       "as a bug.")
                core.invoke_command(cmd, dict(kind='error', message=msg))
                return

        # Mirror the user setting in the internal flags read by the thread.
        if self.continuous_processing:
            self._state.set('continuous_processing')
        else:
            self._state.clear('continuous_processing')

        deferred_call(setattr, self, 'active', True)
        self._thread = Thread(target=self._run_measurements,
                              args=(measurement,))
        self._thread.daemon = True
        self._thread.start()

    def pause_measurement(self):
        """Pause the currently active measurement.

        The request is forwarded to the engine when the main task is running
        and to the active hook (if any) otherwise. An observer is installed
        to be notified when the pause becomes effective.

        """
        logger.info('Pausing measurement %s.', self.running_measurement.name)
        self.running_measurement.status = 'PAUSING'
        self._state.set('pause_attempt')
        if self._state.test('running_main'):
            self.engine.pause()
            self.engine.observe('status', self._watch_engine_state)
        else:
            if self._active_hook:
                self._active_hook.pause()
                self._active_hook.observe('paused', self._watch_hook_state)

    def resume_measurement(self):
        """Resume the currently paused measurement.

        The request is forwarded to the engine when the main task is running
        and to the active hook (if any) otherwise. An observer is installed
        to be notified when the execution has resumed.

        """
        logger.info('Resuming measurement %s.', self.running_measurement.name)
        self.running_measurement.status = 'RESUMING'
        self._state.clear('paused')
        self._state.set('resuming')
        if self._state.test('running_main'):
            self.engine.resume()
            self.engine.observe('status', self._watch_engine_state)
        else:
            if self._active_hook:
                self._active_hook.resume()
                self._active_hook.observe('resumed', self._watch_hook_state)

    def stop_measurement(self, no_post_exec=False, force=False):
        """Stop the currently active measurement.

        Parameters
        ----------
        no_post_exec : bool, optional
            If True the post-execution hooks are not run.

        force : bool, optional
            Passed to the stop method of the engine or of the active hook.
            Also prevents the post-execution hooks from running.

        """
        if no_post_exec or force:
            self._state.set('no_post_exec')
        self._state.set('stop_attempt')
        if self.running_measurement:
            logger.info('Stopping measurement %s.',
                        self.running_measurement.name)
            self.running_measurement.status = 'STOPPING'
            if self._state.test('running_main'):
                self.engine.stop(force)
            else:
                if self._active_hook:
                    self._active_hook.stop(force)

    def stop_processing(self, no_post_exec=False, force=False):
        """Stop processing the enqueued measurements.

        Parameters
        ----------
        no_post_exec : bool, optional
            If True the post-execution hooks are not run.

        force : bool, optional
            Passed to the stop method of the engine or of the active hook.
            Also prevents the post-execution hooks from running.

        """
        if self.running_measurement:
            logger.info('Stopping measurement %s.',
                        self.running_measurement.name)
        if no_post_exec or force:
            self._state.set('no_post_exec')
        self._state.set('stop_attempt', 'stop_processing')
        self._state.clear('processing')
        if self._state.test('running_main'):
            self.engine.stop(force)
        else:
            if self._active_hook:
                self._active_hook.stop(force)

    # =========================================================================
    # --- Private API ---------------------------------------------------------
    # =========================================================================

    #: Background thread handling the measurement execution
    _thread = Value()

    #: Internal flags used to keep track of the execution state.
    _state = Typed(BitFlag,
                   (('processing', 'running_pre_hooks', 'running_main',
                     'running_post_hooks', 'pause_attempt', 'paused',
                     'resuming', 'stop_attempt', 'stop_processing',
                     'no_post_exec', 'continuous_processing'),)
                   )

    #: Hook currently executed. The value is meaningful only when
    #: 'running_pre_hooks' or 'running_post_hooks' is set.
    _active_hook = Value()

    #: Lock to avoid race condition when pausing.
    _lock = Value(factory=RLock)

    def _run_measurements(self, measurement):
        """Run measurements (either all enqueued or only one)

        This code is executed by a thread (stored in _thread)

        Parameters
        ----------
        measurement : Measurement
            First measurement to run. Other measurements will be run in their
            order of appearance in the queue if the user enable continuous
            processing.

        """
        # If the engine does not exist, create one.
        plugin = self.plugin
        if not self.engine:
            engine = plugin.create('engine', plugin.selected_engine)
            schedule_and_block(setattr, (self, 'engine', engine))

        # Mark that we started processing measurements.
        self._state.set('processing')

        # Process enqueued measurement as long as we are supposed to.
        while not self._state.test('stop_processing'):

            # Clear the internal state to start fresh.
            self._clear_state()

            # If we were provided with a measurement use it, otherwise find the
            # next one.
            if measurement:
                meas = measurement
                measurement = None
            else:
                meas = self.plugin.find_next_measurement()

            # If no measurement remains stop.
            if meas is None:
                break

            # Register the measurement as the running one, update its status
            # and log its execution.
            meas_id = meas.name + '_' + meas.id
            self._set_measurement_state('RUNNING',
                                        'The measurement is being run.',
                                        meas)

            # Log the same identifier as the final report below (the name and
            # the id used to be glued together without any separator).
            logger.info('Starting execution of measurement %s', meas_id)

            status, infos = self._run_measurement(meas)
            # Release runtime dependencies.
            meas.dependencies.release_runtimes()

            # Log the result.
            mess = 'Measurement %s processed, status : %s' % (meas_id, status)
            if infos:
                mess += '\n' + infos
            logger.info(mess)

            # Update the status and infos.
            self._set_measurement_state(status, infos, clear=True)

            # If we are supposed to stop, stop.
            if (not self._state.test('continuous_processing') or
                    self._state.test('stop_processing')):
                break

        if self.engine and self.plugin.engine_policy == 'stop':
            self._stop_engine()

        self._state.clear('processing')
        deferred_call(setattr, self, 'active', False)

    def _run_measurement(self, measurement):
        """Run a single measurement.

        Parameters
        ----------
        measurement : Measurement
            Measurement to run.

        Returns
        -------
        status : str
            One of 'SKIPPED', 'FAILED', 'INTERRUPTED' or 'COMPLETED'.

        infos : str
            Message giving details about the status.

        """
        # Switch to running state.
        measurement.enter_running_state()

        meas_id = measurement.name + '_' + measurement.id

        # Collect runtime dependencies
        res, msg, errors = measurement.dependencies.collect_runtimes()
        if not res:
            status = 'SKIPPED' if 'unavailable' in msg else 'FAILED'
            return status, msg + '\n' + errors_to_msg(errors)

        # Records that we got access to all the runtimes.
        mess = ('The use of all runtime resources have been granted to the '
                'measurement %s' % meas_id)
        logger.info(mess.replace('\n', ' '))

        # Run checks now that we have all the runtimes.
        if not measurement.forced_enqueued:
            res, errors = measurement.run_checks()
            if not res:
                msg = 'Measurement %s failed to pass the checks :\n' % meas_id
                return 'FAILED', msg + errors_to_msg(errors)

        # Now that we know the measurement is going to run save it.
        default_filename = meas_id + '.meas.ini'
        path = os.path.join(measurement.root_task.default_path,
                            default_filename)
        measurement.save(path)

        logger.info('Starting measurement {}.'.format(meas_id))

        # Execute all pre-execution hooks.
        result, errors = self._run_pre_execution(measurement)
        if not result:
            msg = ('Measurement %s failed to run pre-execution hooks :\n' %
                   meas_id)
            return 'FAILED', msg + errors_to_msg(errors)

        result = True
        errors = {}
        if self._check_for_pause_or_stop():

            # Connect new monitors, and start them.
            logger.debug('Connecting monitors for measurement %s', meas_id)
            self._start_monitors(measurement)

            # Assemble the task infos for the engine to run the main task.
            deps = measurement.dependencies
            infos = ExecutionInfos(
                id=meas_id+'-main',
                task=measurement.root_task,
                build_deps=deps.get_build_dependencies().dependencies,
                runtime_deps=deps.get_runtime_dependencies('main'),
                observed_entries=measurement.collect_monitored_entries(),
                checks=not measurement.forced_enqueued,
                )

            # Ask the engine to perform the main task.
            logger.debug('Passing measurement %s to the engine.', meas_id)
            self._state.set('running_main')
            execution_result = self.engine.perform(infos)
            self._state.clear('running_main')

            # Record the result and store engine return value in the
            # measurement for the post execution hooks.
            result &= execution_result.success
            errors.update(execution_result.errors)
            measurement.task_execution_result = execution_result

            # Disconnect monitors.
            logger.debug('Disonnecting monitors for measurement %s', meas_id)
            self._stop_monitors(measurement)

        # Save the stop_attempt state to allow to run post execution if we
        # are supposed to do so.
        state = self._state.test('stop_attempt')
        self._state.clear('stop_attempt')

        # Execute all post-execution hooks if pertinent.
        if not self._state.test('no_post_exec'):
            res, post_errors = self._run_post_execution(measurement)
            result &= res
            # Merge rather than rebind so that the errors of the main task
            # are still part of the report.
            errors.update(post_errors)

        if state:
            self._state.set('stop_attempt')

        if self._state.test('stop_attempt'):
            return ('INTERRUPTED',
                    'The measurement has been interrupted by the user.')

        if not result:
            # The main task was necessarily run if we reach this point: it is
            # skipped only on a stop attempt, which returns above.
            if not execution_result.success:
                msg = 'Execution of the main task failed :\n'
            else:
                msg = 'Some post-execution hook failed to run :\n'

            return 'FAILED', msg + errors_to_msg(errors)

        return 'COMPLETED', 'The measurement successfully completed.'

    def _run_hooks(self, measurement, hooks, state_flag, kind):
        """Run a set of hooks, shared by the pre and post execution steps.

        Parameters
        ----------
        measurement : Measurement
            Measurement the hooks belong to.

        hooks : dict
            Hooks to run, stored by id.

        state_flag : str
            Name of the internal flag set while the hooks are running.

        kind : str
            Kind of hooks ('pre' or 'post'), used only in the log messages.

        Returns
        -------
        result : bool
            Boolean indicating whether or not the operations succeeded.

        report : dict
            Dict storing the formatted traceback of the failing hooks by id.

        """
        result = True
        full_report = {}
        self._state.set(state_flag)
        meas_id = measurement.name + '_' + measurement.id

        for hook_id, hook in hooks.items():
            # A pending pause is honored between hooks; a stop aborts the
            # remaining hooks.
            if not self._check_for_pause_or_stop():
                break
            logger.debug('Calling %s-measurement hook %s for measurement %s',
                         kind, hook_id, meas_id)
            with self._lock:
                self._active_hook = hook

            try:
                hook.run(self.plugin.workbench, self.engine)
            except Exception:
                # A failing hook does not prevent the next ones from running.
                result = False
                full_report[hook_id] = format_exc()

            # Prevent issues with pausing/resuming: drop the observers that
            # pause_measurement/resume_measurement may have installed.
            with self._lock:
                hook.unobserve('paused', self._watch_hook_state)
                hook.unobserve('resumed', self._watch_hook_state)
                self._active_hook = None

        self._state.clear(state_flag)

        return result, full_report

    def _run_pre_execution(self, measurement):
        """Run pre measurement execution operations.

        Parameters
        ----------
        measurement : Measurement
            Measurement whose pre-execution hooks should be run.

        Returns
        -------
        result : bool
            Boolean indicating whether or not the operations succeeded.

        report : dict
            Dict storing the errors by id of the operation in which they
            occurred.

        """
        return self._run_hooks(measurement, measurement.pre_hooks,
                               'running_pre_hooks', 'pre')

    def _run_post_execution(self, measurement):
        """Run post measurement operations.

        Parameters
        ----------
        measurement : Measurement
            Measurement whose post-execution hooks should be run.

        Returns
        -------
        result : bool
            Boolean indicating whether or not the operations succeeded.

        report : dict
            Dict storing the errors by id of the operation in which they
            occurred.

        """
        return self._run_hooks(measurement, measurement.post_hooks,
                               'running_post_hooks', 'post')

    def _start_monitors(self, measurement):
        """Start the monitors attached to a measurement and display them.

        If no dedicated window exists one will be created. For monitors for
        which a dockitem already exists it is re-used.

        """
        def start_monitors(self, measurement):
            """Start the monitors attached to a measurement.

            Called in the main thread.

            """
            workbench = self.plugin.workbench
            if not self.monitors_window:
                with enaml.imports():
                    from .workspace.monitors_window import MonitorsWindow
                self.monitors_window = MonitorsWindow()
            else:
                self.monitors_window.send_to_front()

            self.monitors_window.measurement = measurement

            # Destroy the dock items which do not match a monitor of this
            # measurement and use the first remaining one as anchor for the
            # new tabs.
            dock_area = self.monitors_window.dock_area
            anchor = ''
            for dock_item in dock_area.dock_items():
                if dock_item.name not in measurement.monitors:
                    dock_item.destroy()
                elif not anchor:
                    anchor = dock_item.name

            # We show the window now because otherwise the layout ops are not
            # properly executed.
            if self.plugin.auto_show_monitors:
                self.monitors_window.show()

            ops = []
            for monitor in measurement.monitors.values():
                decl = monitor.declaration
                dock_item = dock_area.find(decl.id)
                if dock_item is None:
                    try:
                        dock_item = decl.create_item(workbench, dock_area)
                    except Exception:
                        msg = 'Failed to create widget for monitor %s :\n %s'
                        logger.error(msg, decl.id, format_exc())
                        continue

                    # Only newly created items need to be laid out.
                    if dock_item is not None:
                        if dock_item.float_default:
                            ops.append(FloatItem(item=decl.id))
                        else:
                            ops.append(InsertTab(item=decl.id,
                                                 target=anchor))

                self.engine.observe('progress', monitor.process_news)
                if dock_item:
                    dock_item.monitor = monitor
                monitor.start()

            if ops:
                dock_area.update_layout(ops)

        # Executed in the main thread to avoid GUI update issues.
        schedule_and_block(start_monitors, (self, measurement), priority=100)

    def _stop_monitors(self, measurement):
        """Disconnect the monitors from the engine and stop them.

        The monitors windows is not hidden as the user may want to check it
        later.

        """
        def stop_monitors(engine, measurement):
            """Stop the monitors.

            Executed on the main thread.

            """
            for monitor in measurement.monitors.values():
                # Remove the observer installed by _start_monitors, which is
                # registered on 'progress' (the previous code targeted 'news'
                # and hence never disconnected the monitors).
                if engine:
                    engine.unobserve('progress', monitor.process_news)
                monitor.stop()

        # Executed in the main thread to avoid GUI update issues.
        schedule_and_block(stop_monitors, (self.engine, measurement),
                           priority=100)

    def _check_for_pause_or_stop(self):
        """Check if a pause or stop request is pending and process it.

        When a pause is pending, this call blocks until the measurement is
        either resumed or stopped.

        Returns
        -------
        should_continue : bool
            True if the execution of the measurement can go on, False if a
            stop has been requested.

        """
        flag = self._state

        if flag.test('stop_attempt'):
            return False

        if flag.test('pause_attempt'):
            flag.clear('pause_attempt')
            self._set_measurement_state('PAUSED', 'The measurement is paused.')
            flag.set('paused')

            # Wait for a resume request while staying responsive to a stop.
            while True:
                if flag.wait(0.1, 'resuming'):
                    flag.clear('resuming')
                    self._set_measurement_state('RUNNING',
                                                'The measurement has resumed.')
                    return True

                if flag.test('stop_attempt'):
                    return False

        return True

    # Those must post update of measurement.status and remove observers
    def _watch_engine_state(self, change):
        """Observe engine state to notify that the engine paused or resumed.

        """
        if change['value'] == 'Paused':
            self._state.clear('pause_attempt')
            self.engine.unobserve('status', self._watch_engine_state)
            self._set_measurement_state('PAUSED', 'The measurement is paused.')
            self._state.set('paused')

        elif change['value'] == 'Running':
            self._state.clear('resuming')
            self.engine.unobserve('status', self._watch_engine_state)
            self._set_measurement_state('RUNNING',
                                        'The measurement has resumed.')

    def _watch_hook_state(self, change):
        """Observe hook paused/resumed events to validate pausing/resuming.

        """
        # The observer is removed from the member it was installed on
        # ('paused' or 'resumed'). The previous code targeted 'status', on
        # which no observer is ever registered for hooks.
        if change['name'] == 'paused':
            self._active_hook.unobserve('paused', self._watch_hook_state)
            self._set_measurement_state('PAUSED', 'The measurement is paused.')
            self._state.clear('pause_attempt')
            self._state.set('paused')

        elif change['name'] == 'resumed':
            self._state.clear('resuming')
            self._active_hook.unobserve('resumed', self._watch_hook_state)
            self._set_measurement_state('RUNNING',
                                        'The measurement has resumed.')

    def _set_measurement_state(self, status, infos, measurement=None,
                               clear=False):
        """Set the measurement status and infos in the main thread.

        Parameters
        ----------
        status : str
            New status of the measurement.

        infos : str
            Message detailing the status.

        measurement : Measurement, optional
            If provided, it is first registered as the running measurement.

        clear : bool, optional
            If True, the running measurement is reset to None once updated.

        """
        def set_state(processor, status, infos, meas, clear):
            if meas:
                processor.running_measurement = meas
            measurement = processor.running_measurement
            measurement.status = status
            measurement.infos = infos
            if clear:
                processor.running_measurement = None

        # Executed in the main thread to avoid GUI update issues.
        schedule_and_block(set_state,
                           (self, status, infos, measurement, clear),
                           priority=100)

    def _stop_engine(self):
        """Stop the engine.

        A shutdown is requested, then the engine status is polled every 0.5 s
        until it is 'Stopped'. After more than 10 polls, a forced shutdown is
        requested at each subsequent poll.

        """
        logger.debug('Stopping engine')
        engine = self.engine
        engine.shutdown()
        i = 0
        while engine and engine.status != 'Stopped':
            sleep(0.5)
            i += 1
            if i > 10:
                engine.shutdown(force=True)

    def _clear_state(self):
        """Clear the state when starting while preserving persistent settings.

        All the flags but 'processing' and 'continuous_processing' are
        cleared.

        """
        flags = list(self._state.flags)
        flags.remove('processing')
        flags.remove('continuous_processing')
        self._state.clear(*flags)

    def _post_setattr_continuous_processing(self, old, new):
        """Make sure the internal bit flag does reflect the real setting.

        """
        if new:
            self._state.set('continuous_processing')
        else:
            self._state.clear('continuous_processing')
def errors_to_msg(errors):
    """Build a human readable report from a dictionary of errors.

    Each entry is rendered on its own line as '- key : value', after a fixed
    header line.

    """
    entries = ['- %s : %s' % item for item in errors.items()]
    return 'The following errors occured :\n' + '\n'.join(entries)