# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Main objects used to represent all the aspects of a measurement (main task,
attached tools, dependencies, ...)
"""
import logging
from collections import OrderedDict, defaultdict
from itertools import chain
from datetime import date, datetime
from atom.api import (Atom, Dict, Str, Typed, ForwardTyped, Bool, Enum,
Value)
from configobj import ConfigObj
from ..tasks.api import RootTask
from ..utils.traceback import format_exc
from ..utils.configobj_ops import include_configobj
from ..utils.atom_util import HasPrefAtom
LOGGER = logging.getLogger(__name__)
[docs]def measurement_plugin():
"""Delayed to avoid circular references.
"""
from .plugin import MeasurementPlugin
return MeasurementPlugin
[docs]class MeasurementDependencies(Atom):
"""Container used to store the dependencies of a measurement.
"""
#: Reference to the Measurement this object is linked to.
measurement = ForwardTyped(lambda: Measurement)
[docs] def collect_runtimes(self):
"""Collect all the runtime needed to execute the measurement.
Those can then be accessed using `get_runtime_dependencies`
Returns
-------
result : bool
Boolean indicating whether or not the collection succeeded. Note
that even if the collection failed, some dependencies may have been
collected (other being unavailable) and must hence be released.
msg : str
String explaning why the operation failed if it failed.
errors : dict
Dictionary describing in details the errors. If some dependencies
does exist but cannot be accessed at the time of the query an entry
'unavailable' will be present.
"""
if self._runtime_dependencies:
return True, '', {}
res = self._analyse_task_runtime(self.measurement.root_task)
if not res[0]:
return res
workbench = self.measurement.plugin.workbench
# Check that we know the dependencies of the hooks
for h_id, h in chain(self.measurement.pre_hooks.items(),
self.measurement.post_hooks.items()):
if h_id not in self._runtime_map:
deps = h.list_runtimes(workbench)
if deps is None:
continue # The hook has no runtime dependencies
if deps.errors:
msg = 'Failed to analyse hook %s runtime dependencies.'
return False, msg % h_id, deps.errors
self._runtime_map[h_id] = deps.dependencies
self._update_runtime_analysis(deps.dependencies)
return self._collect_analysed_runtimes()
[docs] def collect_task_runtimes(self, task):
"""Collect all the runtime needed to execute a single task.
Those can then be accessed using `get_runtime_dependencies`
Returns
-------
result : bool
Boolean indicating whether or not the collection succeeded. Note
that even if the collection failed, some dependencies may have been
collected (other being unavailable) and must hence be released.
msg : str
String explaning why the operation failed if it failed.
errors : dict
Dictionary describing in details the errors. If some dependencies
does exist but cannot be accessed at the time of the query an entry
'unavailable' will be present.
"""
if task is None:
return False, 'No task was given', {}
if self._runtime_dependencies:
return True, '', {}
res = self._analyse_task_runtime(task)
if not res[0]:
return res
return self._collect_analysed_runtimes()
[docs] def release_runtimes(self):
"""Release all the runtimes collected for the execution.
"""
if not self._runtime_dependencies:
return
workbench = self.measurement.plugin.workbench
core = workbench.get_plugin('enaml.workbench.core')
cmd = 'exopy.app.dependencies.release_runtimes'
core.invoke_command(cmd, dict(owner='exopy.measurement',
dependencies=self._runtime_dependencies))
self._runtime_dependencies = None
[docs] def get_build_dependencies(self):
"""Get the build dependencies associated with the main task.
Returns
-------
dependencies : BuildContainer
BuildContainer as returned by 'exopy.app.dependencies.collect'.
The errors member should be checked to detect errors.
"""
workbench = self.measurement.plugin.workbench
core = workbench.get_plugin('enaml.workbench.core')
if not self._build_analysis:
cmd = 'exopy.app.dependencies.analyse'
deps = core.invoke_command(cmd,
{'obj': self.measurement.root_task,
'dependencies': ['build']})
if deps.errors:
return deps
self._build_analysis = deps.dependencies
if not self._build_dependencies:
cmd = 'exopy.app.dependencies.collect'
deps = core.invoke_command(cmd,
dict(dependencies=self._build_analysis,
kind='build'))
if not deps.errors:
self._build_dependencies = deps
else:
deps = self._build_dependencies
return deps
[docs] def get_runtime_dependencies(self, id):
"""Access the runtime dependencies associated with a hook or the main
task.
Those will correspond to the runtime dependencies that were collected.
Dependencies that have not been collected, because they are not
available for example, will not appear in here. However it is
guaranteed that sections corresponding to each kind of runtime
dependencies will be present even if they are empty.
Parameters
----------
id: unicode
Id of the hook for which to retrieve the runtimes or 'main' for
the main task.
Returns
-------
dependencies : dict
Dependencies for the requested measurement component.
Raises
------
RuntimeError :
Raised if this method is called before collect_runtimes.
"""
if self._runtime_dependencies is None:
raise RuntimeError('Runtime dependencies must be collected '
'(calling collect_runtimes) before they can be '
'queried.')
valids = self._runtime_map.get(id)
if not valids:
return {}
deps = self._runtime_dependencies
queried = {}
for runtime_id, r_deps in valids.items():
if runtime_id in deps:
queried[runtime_id] = {k: deps[runtime_id][k] for k in r_deps}
else:
queried[runtime_id] = {}
return queried
[docs] def reset(self):
"""Cleanup all cached values.
"""
if self._runtime_dependencies:
raise RuntimeError('Cannot reset dependencies while holding '
'runtime dependencies')
self._build_analysis.clear()
self._build_dependencies = None
self._runtime_analysis.clear()
self._runtime_map.clear()
# =========================================================================
# --- Private API ---------------------------------------------------------
# =========================================================================
#: Cached build dependencies analysis for the main task.
#: No actual dependency is stored, this dict can be used to collect them
_build_analysis = Dict()
#: Cached build dependencies of the main task.
#: Contains the actual dependencies.
_build_dependencies = Value()
#: Cached runtime dependencies analysis for the main task and the hooks.
#: No actual dependency is stored, this dict can be used to collect them
_runtime_analysis = Typed(defaultdict, (set,))
#: Cached runtime dependencies of the main task and the hooks.
#: Contains the actual dependencies.
#: Set to None when dependencies have not been collected.
_runtime_dependencies = Typed(dict)
#: Mapping determining which component has which dependency.
_runtime_map = Dict()
def _analyse_task_runtime(self, task):
workbench = self.measurement.plugin.workbench
core = workbench.get_plugin('enaml.workbench.core')
# If the dependencies of the main task are not known
if not self._runtime_map.get('main'):
cmd = 'exopy.app.dependencies.analyse'
deps = core.invoke_command(cmd,
{'obj': task,
'dependencies': ['build', 'runtime']})
b_deps, r_deps = deps
msg = 'Failed to analyse main task %s dependencies.'
if b_deps.errors:
return False, msg % 'build', b_deps.errors
if r_deps.errors:
return False, msg % 'runtime', r_deps.errors
self._build_analysis = b_deps.dependencies
self._runtime_map['main'] = r_deps.dependencies
self._update_runtime_analysis(r_deps.dependencies)
return True, '', {}
def _collect_analysed_runtimes(self):
workbench = self.measurement.plugin.workbench
core = workbench.get_plugin('enaml.workbench.core')
cmd = 'exopy.app.dependencies.collect'
deps = core.invoke_command(cmd,
dict(dependencies=self._runtime_analysis,
owner='exopy.measurement',
kind='runtime'))
if deps.errors:
msg = 'Failed to collect some runtime dependencies.'
return False, msg, deps.errors
elif deps.unavailable:
msg = 'Some dependencies are currently unavailable.'
self._runtime_dependencies = deps.dependencies
return False, msg, deps.unavailable
self._runtime_dependencies = deps.dependencies
return True, '', {}
def _update_runtime_analysis(self, new):
"""Update the known runtime dependencies.
"""
analysis = self._runtime_analysis
for k in new:
analysis[k].update(new[k])
[docs]class Measurement(HasPrefAtom):
"""Object representing all the aspects of a measurement.
"""
#: Name of the measurement.
name = Str().tag(pref=True)
#: Id of that particular iteration of the measurement. This value is used
#: when saving the measurement before running it. It is also communicated
#: to the root task
id = Str().tag(pref=True)
#: Current measurement status.
status = Enum('READY', 'RUNNING', 'PAUSING', 'PAUSED', 'RESUMING',
'STOPPING', 'EDITING', 'SKIPPED', 'FAILED', 'COMPLETED',
'INTERRUPTED')
#: Detailed information about the measurement status.
infos = Str()
#: Path to the last file in which that measurement was saved.
path = Str()
#: Root task holding the measurement logic.
root_task = Typed(RootTask)
#: Dict of active monitor for this measurement.
monitors = Typed(OrderedDict, ())
#: Dict of pre-measurement execution routines.
pre_hooks = Typed(OrderedDict, ())
#: Dict of post-measurement execution routines.
post_hooks = Typed(OrderedDict, ())
#: Reference to the measurement plugin managing this measurement.
plugin = ForwardTyped(measurement_plugin)
#: Flag signaling whether the user chose to enqueue the measurement knowing
#: some tests are failing.
forced_enqueued = Bool()
#: Object handling the collection and access to the measurement
#: dependencies.
dependencies = Typed(MeasurementDependencies)
#: Result object returned by the engine when the root_task has been
#: executed. Can be used by post-execution hook to adapt their behavior.
task_execution_result = Value()
def __init__(self, **kwargs):
super(Measurement, self).__init__(**kwargs)
self.add_tool('pre-hook', 'exopy.internal_checks')
[docs] def save(self, path):
"""Save the measurement as a ConfigObj object.
Parameters
----------
path : unicode
Path of the file to which save the measurement.
"""
config = ConfigObj(indent_type=' ', encoding='utf-8')
config.update(self.preferences_from_members())
# First save the task.
core = self.plugin.workbench.get_plugin(u'enaml.workbench.core')
cmd = u'exopy.tasks.save'
task_prefs = core.invoke_command(cmd, {'task': self.root_task,
'mode': 'config'}, self)
config['root_task'] = {}
include_configobj(config['root_task'], task_prefs)
# Save the state of each monitor, pre-hook, post-hook.
for kind in ('monitors', 'pre_hooks', 'post_hooks'):
config[kind] = {}
for id, obj in getattr(self, kind).items():
state = obj.get_state()
config[kind][id] = {}
include_configobj(config[kind][id], state)
with open(path, 'wb') as f:
config.write(f)
self.path = path
[docs] @classmethod
def load(cls, measurement_plugin, path, build_dep=None):
"""Build a measurement from a ConfigObj file.
Parameters
----------
measurement_plugin : MeasurementPlugin
Instance of the MeasurementPlugin storing all declarations.
path : unicode
Path of the file from which to load the measurement.
build_dep : dict, optional
Build dependencies of the main task.
Returns
-------
measurement : Measurement | None
Measurement buil from the config or None if and error occurred.
errors : dict
Dictionary describing the errors that occured.
"""
# Create the measurement.
measurement = cls(plugin=measurement_plugin)
config = ConfigObj(path, encoding='utf-8')
measurement.path = path
measurement.update_members_from_preferences(config)
# Return values storing the errors details.
errors = defaultdict(dict)
# Get the workbench and core plugin.
workbench = measurement_plugin.workbench
core = workbench.get_plugin('enaml.workbench.core')
# Load the task.
cmd = 'exopy.tasks.build_root'
build_dep = build_dep if build_dep else workbench
kwarg = {'mode': 'from config', 'config': config['root_task'],
'build_dep': build_dep}
try:
measurement.root_task = core.invoke_command(cmd, kwarg,
measurement)
except Exception:
msg = 'Building %s, failed to restore task : %s'
errors['main task'] = msg % (config.get('name'), format_exc())
return None, errors
for kind in ('monitor', 'pre-hook', 'post-hook'):
saved = config.get(kind.replace('-', '_')+'s', {})
# Make sure we always have the internal pre-hook in the right
# position.
if kind == 'pre-hook':
if 'exopy.internal_checks' in saved:
del measurement.pre_hooks['exopy.internal_checks']
for id, state in saved.items():
try:
obj = measurement_plugin.create(kind, id, default=False)
obj.set_state(state)
except Exception:
msg = 'Failed to restore {} {}: {}'.format(kind, id,
format_exc())
errors[kind][id] = msg
continue
measurement.add_tool(kind, id, obj)
measurement.name = config.get('name', '')
if errors:
measurement = None
return measurement, errors
[docs] def run_checks(self, **kwargs):
"""Run all measurement checks.
This is done at enqueueing time and before actually executing a
measurement save it it was forcibly enqueued. The dependencies needs to
be collected before calling this method.
Parameters
----------
**kwargs :
Keyword arguments to pass to the pre-operations.
Returns
-------
result : bool
Boolean indicating whether or not the operations succeeded.
report : dict
Dict storing the errors (as dict) by id of the operation in which
they occured.
"""
result = True
full_report = {}
workbench = self.plugin.workbench
self._write_infos_in_task()
self.root_task.run_time = \
self.dependencies.get_runtime_dependencies('main')
msg = 'Running checks for pre-measurement hook %s for measurement %s'
for id, hook in self.pre_hooks.items():
LOGGER.debug(msg, id, self.name)
answer = hook.check(workbench, **kwargs)
if answer is not None:
check, errors = answer
if errors:
full_report[id] = errors
result = result and check
msg = 'Running checks for post-measurement hook %s for measurement %s'
for id, hook in self.post_hooks.items():
LOGGER.debug(msg, id, self.name)
answer = hook.check(workbench, **kwargs)
if answer is not None:
check, errors = answer
if errors:
full_report[id] = errors
result = result and check
self.root_task.run_time.clear()
return result, full_report
[docs] def enter_edition_state(self):
"""Make the the measurement ready to be edited
"""
database = self.root_task.database
for monitor in self.monitors.values():
test = database.has_observer
if not test('notifier', monitor.handle_database_entries_change):
database.observe('notifier',
monitor.handle_database_entries_change)
if not test('nodes_notifier',
monitor.handle_database_nodes_change):
database.observe('nodes_notifier',
monitor.handle_database_nodes_change)
[docs] def enter_running_state(self):
"""Make the measurement ready to run.
"""
database = self.root_task.database
for monitor in self.monitors.values():
if database.has_observer('notifier',
monitor.handle_database_entries_change):
database.unobserve('notifier',
monitor.handle_database_entries_change)
if database.has_observer('nodes_notifier',
monitor.handle_database_nodes_change):
database.unobserve('nodes_notifier',
monitor.handle_database_nodes_change)
[docs] def collect_monitored_entries(self):
"""Get all the entries the monitors ask to be notified about.
Returns
-------
entries : list
List of the entries the engine will to observe.
"""
entries = []
for monitor in self.monitors.values():
entries.extend(monitor.monitored_entries)
return list(set(entries))
# =========================================================================
# --- Private API ---------------------------------------------------------
# =========================================================================
#: Dictionary storing the collected runtime dependencies.
_runtimes = Dict()
def _write_infos_in_task(self):
"""Write all the measurement values in the root_task database.
"""
self.root_task.write_in_database('meas_name', self.name)
self.root_task.write_in_database('meas_id', self.id)
self.root_task.write_in_database('meas_date', str(date.today()))
self.root_task.write_in_database('meas_time', datetime.now().time()
.strftime("%H-%M-%S"))
def _post_setattr_root_task(self, old, new):
"""Add the entries contributed by the measurement to the task database.
"""
entries = new.database_entries.copy()
entries.update({'meas_name': self.name, 'meas_id': self.id,
'meas_date': '', 'meas_time': ''})
new.database_entries = entries
def _default_dependencies(self):
"""Default value for the dependencies member.
"""
return MeasurementDependencies(measurement=self)