# Source code for exopy.measurement.plugin

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Plugin handling all measurement related functions.

"""
import logging
import os
from functools import partial

from atom.api import Typed, Str, List, ForwardTyped, Enum, Bool, Dict

from ..utils.plugin_tools import (HasPreferencesPlugin, ExtensionsCollector,
                                  make_extension_validator)
from .engines.api import Engine
from .monitors.api import Monitor
from .hooks.api import PreExecutionHook, PostExecutionHook
from .editors.api import Editor
from .processor import MeasurementProcessor
from .container import MeasurementContainer

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

#: Id of the extension point collected for Engine declarations.
ENGINES_POINT = 'exopy.measurement.engines'

#: Id of the extension point collected for Monitor declarations.
MONITORS_POINT = 'exopy.measurement.monitors'

#: Id of the extension point collected for PreExecutionHook declarations.
PRE_HOOK_POINT = 'exopy.measurement.pre-execution'

#: Id of the extension point collected for PostExecutionHook declarations.
POST_HOOK_POINT = 'exopy.measurement.post-execution'

#: Id of the extension point collected for Editor declarations.
EDITORS_POINT = 'exopy.measurement.editors'


def _workspace():
    """Resolve the MeasurementSpace class lazily.

    The import is done inside the function, so the workspace module is only
    loaded when this resolver is actually called.

    Returns
    -------
    space_class : type
        The MeasurementSpace class.

    """
    from .workspace.workspace import MeasurementSpace as space_class
    return space_class


class TaskRuntimeContext(object):
    """A context manager used to give temporary access to a task runtime
    (e.g. driver) to a task.

    Parameters
    ----------
    dependencies :
        Object used to collect and release the runtimes. It must provide the
        collect_task_runtimes, get_runtime_dependencies, release_runtimes
        and reset methods.

    task :
        Task for which the runtimes are collected. The collected runtimes
        are stored on its root (``task.root.run_time``).

    """
    def __init__(self, dependencies, task):
        self.dependencies = dependencies
        self.task = task

    def __enter__(self):
        """Collect the runtimes of the task and expose them on its root.

        Returns
        -------
        res : bool
            Result of the collection as returned by collect_task_runtimes.

        msg :
            Message returned by collect_task_runtimes.

        errors :
            Errors returned by collect_task_runtimes.

        """
        res, msg, errors = self.dependencies.collect_task_runtimes(self.task)
        if res:
            self.task.root.run_time = \
                self.dependencies.get_runtime_dependencies('main')
        else:
            # The failure is only logged: the caller receives the details in
            # the returned tuple and decides how to proceed.
            logger.error(msg)
            logger.error(errors)
        return res, msg, errors

    def __exit__(self, type, value, traceback):
        """Clear the runtimes of the root task and release them.

        Nothing is returned, so an exception raised inside the with block
        is not suppressed.

        """
        self.task.root.run_time = {}
        self.dependencies.release_runtimes()
        self.dependencies.reset()
class MeasurementPlugin(HasPreferencesPlugin):
    """The measurement plugin is responsible for managing all measurement
    related extensions and handling measurement execution.

    """
    #: Reference to the workspace if any.
    workspace = ForwardTyped(_workspace)

    #: Reference to the last directory from/in which a measurement was
    #: loaded/saved
    path = Str().tag(pref=True)

    #: Currently edited measurements.
    edited_measurements = Typed(MeasurementContainer, ())

    #: Currently enqueued measurements.
    enqueued_measurements = Typed(MeasurementContainer, ())

    #: Measurement processor responsible for measurement execution.
    processor = Typed(MeasurementProcessor)

    #: List of currently available engines.
    engines = List()

    #: Currently selected engine represented by its id.
    selected_engine = Str().tag(pref=True)

    #: What to do of the engine when there is no more measurement to perform.
    engine_policy = Enum('stop', 'sleep').tag(pref=True)

    #: List of currently available pre-execution hooks.
    pre_hooks = List()

    #: Default pre-execution hooks to use for new measurements.
    default_pre_hooks = List().tag(pref=True)

    #: List of currently available monitors.
    monitors = List()

    #: Default monitors to use for new measurements.
    default_monitors = List(default=['exopy.text_monitor']).tag(pref=True)

    #: Always show monitors on measurement startup.
    auto_show_monitors = Bool(True).tag(pref=True)

    #: List of currently available post-execution hooks.
    post_hooks = List()

    #: Default post-execution hooks to use for new measurements.
    default_post_hooks = List().tag(pref=True)

    #: List of currently available editors.
    editors = List()

    # TODO add the possibility to deactivate some editors.

    def start(self):
        """Start the plugin lifecycle by collecting all contributions.

        The extension collectors are started first, the preferences are
        loaded next, then the measurement folders are created if needed and
        the default tools are checked against the available ones.

        """
        core = self.workbench.get_plugin('enaml.workbench.core')
        core.invoke_command('exopy.app.errors.enter_error_gathering')

        checker = make_extension_validator(Engine, ('new',))
        self._engines = ExtensionsCollector(workbench=self.workbench,
                                            point=ENGINES_POINT,
                                            ext_class=Engine,
                                            validate_ext=checker)
        self._engines.start()

        checker = make_extension_validator(Editor, ('new', 'is_meant_for'))
        self._editors = ExtensionsCollector(workbench=self.workbench,
                                            point=EDITORS_POINT,
                                            ext_class=Editor,
                                            validate_ext=checker)
        self._editors.start()

        checker = make_extension_validator(PreExecutionHook, ('new',))
        self._pre_hooks = ExtensionsCollector(workbench=self.workbench,
                                              point=PRE_HOOK_POINT,
                                              ext_class=PreExecutionHook,
                                              validate_ext=checker)
        self._pre_hooks.start()

        checker = make_extension_validator(Monitor, ('new',))
        self._monitors = ExtensionsCollector(workbench=self.workbench,
                                             point=MONITORS_POINT,
                                             ext_class=Monitor,
                                             validate_ext=checker)
        self._monitors.start()

        checker = make_extension_validator(PostExecutionHook, ('new',))
        self._post_hooks = ExtensionsCollector(workbench=self.workbench,
                                               point=POST_HOOK_POINT,
                                               ext_class=PostExecutionHook,
                                               validate_ext=checker)
        self._post_hooks.start()

        # Populate the public lists from the freshly started collectors.
        for contrib in ('engines', 'editors', 'pre_hooks', 'monitors',
                        'post_hooks'):
            self._update_contribs(contrib, None)

        # This call is delayed till there to avoid loading the preferences
        # before discovering the contributions (would be an issue for engine).
        super(MeasurementPlugin, self).start()

        state = core.invoke_command('exopy.app.states.get',
                                    {'state_id': 'exopy.app.directory'})

        m_dir = os.path.join(state.app_directory, 'measurement')
        # Create measurement subfolder if it does not exist.
        if not os.path.isdir(m_dir):
            os.mkdir(m_dir)

        s_dir = os.path.join(m_dir, 'saved_measurements')
        # Create saved_measurements subfolder if it does not exist.
        if not os.path.isdir(s_dir):
            os.mkdir(s_dir)

        # Fall back on the saved measurements folder when the path stored in
        # the preferences is not an existing directory.
        if not os.path.isdir(self.path):
            self.path = s_dir

        # Drop the default tools that are not available and report them.
        cmd = 'exopy.app.errors.signal'
        for contrib in ('pre_hooks', 'monitors', 'post_hooks'):
            default = getattr(self, 'default_'+contrib)
            available = getattr(self, contrib)
            avai_default = [d for d in default if d in available]
            if default != avai_default:
                msg = 'The following {} have not been found : {}'
                missing = set(default) - set(avai_default)
                core.invoke_command(cmd,
                                    dict(kind='error',
                                         message=msg.format(contrib,
                                                            missing)))
                setattr(self, 'default_'+contrib, avai_default)

        # Keep the public lists in sync with the collectors from now on.
        for contrib in ('engines', 'editors', 'pre_hooks', 'monitors',
                        'post_hooks'):
            getattr(self, '_'+contrib).observe('contributions',
                                               partial(self._update_contribs,
                                                       contrib))

        core.invoke_command('exopy.app.errors.exit_error_gathering')

    def stop(self):
        """Stop the plugin: close the monitors window and stop all the
        extension collectors.

        """
        # Close the monitors window.
        if self.processor.monitors_window:
            self.processor.monitors_window.hide()
            self.processor.monitors_window.close()
            self.processor.monitors_window = None

        for contrib in ('engines', 'editors', 'pre_hooks', 'monitors',
                        'post_hooks'):
            getattr(self, '_'+contrib).stop()

    def get_declarations(self, kind, ids):
        """Get the declarations of engines/editors/tools.

        If an id does not correspond to a known declarations it will be
        omitted from the return value, but no error will be raised. This is
        because the user can easily know which declarations exist by looking
        at the appropriate member of the plugin.

        Parameters
        ----------
        kind : {'engine', 'editor', 'pre-hook', 'monitor', 'post-hook'}
            Kind of declarations to retrieve.

        ids : list
            Ids of the declarations to return.

        Returns
        -------
        declarations : dict
            Declarations stored in a dict by id.

        Raises
        ------
        ValueError :
            Raised if the provided kind is incorrect.

        """
        decls = self._get_contributions(kind)
        return {k: v for k, v in decls.items() if k in ids}

    def create(self, kind, id, default=True):
        """Create a new instance of an engine/editor/tool.

        Parameters
        ----------
        kind : {'engine', 'editor', 'pre-hook', 'monitor', 'post-hook'}
            Kind of object to create.

        id : unicode
            Id of the object to create.

        default : bool, optional
            Whether to use default parameters or not when creating the object.

        Returns
        -------
        obj : BaseEngine|BaseMeasurementTool|BaseEditor
            New instance of the requested object.

        Raises
        ------
        ValueError :
            Raised if the provided kind or id in incorrect.

        """
        decls = self._get_contributions(kind)
        if id not in decls:
            raise ValueError('Unknown {} : {}'.format(kind, id))

        return decls[id].new(self.workbench, default)

    def find_next_measurement(self):
        """Find the next runnable measurement in the queue.

        Returns
        -------
        measurement : Measurement|None
            First valid measurement in the queue or None if there is no
            available measurement.

        """
        # Look for a measurement not being currently edited. (Can happen if
        # the user is editing the second measurement when the first
        # measurement ends).
        for measurement in self.enqueued_measurements.measurements:
            if measurement.status == 'READY':
                return measurement

        return None

    def get_task_runtime(self, measurement, task):
        """Give temporary access to a task runtime

        Parameters
        ----------
        measurement: Measurement
            Measurement used to analyse and collect runtime

        task: Task:
            Task whose dependencies are going to be analysed and collected.
            Must be part of the measurement.

        Returns
        -------
        runtime: TaskRuntimeContext
            A context manager that acquires and releases the task
            dependencies.

        """
        return TaskRuntimeContext(measurement.dependencies, task)

    # =========================================================================
    # --- Private API ---------------------------------------------------------
    # =========================================================================

    #: Collector of engines.
    _engines = Typed(ExtensionsCollector)

    #: Collector of editors.
    _editors = Typed(ExtensionsCollector)

    #: Collector of pre-execution hooks.
    _pre_hooks = Typed(ExtensionsCollector)

    #: Collector of monitors.
    _monitors = Typed(ExtensionsCollector)

    #: Collector of post-execution hooks.
    _post_hooks = Typed(ExtensionsCollector)

    #: Workspace state infos kept to preserve layout.
    _workspace_state = Dict()

    def _get_contributions(self, kind):
        """Return the contributions of the collector matching a kind.

        Parameters
        ----------
        kind : {'engine', 'editor', 'pre-hook', 'monitor', 'post-hook'}
            Kind of contributions to retrieve.

        Raises
        ------
        ValueError :
            Raised if the provided kind is incorrect.

        """
        kinds = ('engine', 'editor', 'pre-hook', 'monitor', 'post-hook')
        if kind not in kinds:
            msg = 'Expected kind must be one of {}, not {}.'
            raise ValueError(msg.format(kinds, kind))

        # 'pre-hook' -> '_pre_hooks', 'engine' -> '_engines', ...
        return getattr(self, '_'+kind.replace('-', '_')+'s').contributions

    def _post_setattr_selected_engine(self, old, new):
        """Ensures that the selected engine is informed when it is selected
        and deselected.

        This is always called before notifying the workspace of the change.

        """
        # Destroy old instance if any.
        self.processor.engine = None

        if old in self.engines:
            engine = self._engines.contributions[old]
            engine.react_to_unselection(self.workbench)

        if new and new in self.engines:
            engine = self._engines.contributions[new]
            engine.react_to_selection(self.workbench)

    def _update_contribs(self, name, change):
        """Update the list of available contributions (editors, engines,
        tools) when they change.

        The change argument is not used: the list is always rebuilt from
        the collector contributions.

        """
        setattr(self, name, list(getattr(self, '_'+name).contributions))

    def _default_processor(self):
        """Create a MeasurementProcessor with a reference to the plugin.

        """
        return MeasurementProcessor(plugin=self)