Source code for exopy.testing.measurement.dummies

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Dummy engines, editors and measurement tools used for testing.

Those are contributed by the manifest found in contributions.enaml

"""
from time import sleep
from threading import Event

import enaml
from atom.api import Atom, Bool, List, Value, set_default

from exopy.app.dependencies.plugin import RuntimeContainer
from exopy.measurement.editors.api import BaseEditor
from exopy.measurement.hooks.api import (BasePreExecutionHook,
                                         BasePostExecutionHook)
from exopy.measurement.engines.api import BaseEngine
from exopy.measurement.monitors.api import BaseMonitor


[docs]class DummyEditor(BaseEditor): """Dummy editor used for testing. """ pass
[docs]class DummyEngine(BaseEngine): """Dummy engine used for testing. """ fail_perform = Bool() waiting = Value(factory=Event) go_on = Value(factory=Event) should_pause = Bool() accept_pause = Bool(True) should_resume = Bool() measurement_force_enqueued = Bool() signal_resuming = Value(factory=Event) go_on_resuming = Value(factory=Event) signal_resumed = Value(factory=Event) go_on_resumed = Value(factory=Event) _stop = Bool()
[docs] def perform(self, exec_infos): """Simply return the exec_infos. """ self.measurement_force_enqueued = not exec_infos.checks self.waiting.set() self.progress(('test', True)) self.go_on.wait() if self.accept_pause and self.should_pause: self.status = 'Pausing' sleep(0.001) self.status = 'Paused' while True: if self.should_resume: self.signal_resuming.set() self.status = 'Resuming' self.go_on_resuming.wait() self.status = 'Running' break sleep(0.001) self.signal_resumed.set() self.go_on_resumed.wait() if self._stop: return exec_infos self.waiting.clear() self.go_on.clear() exec_infos.success = False if self.fail_perform else True return exec_infos
[docs] def pause(self): self.should_pause = True
[docs] def resume(self): self.should_resume = True
[docs] def stop(self, force=False): """Stop the execution. """ self._stop = True
[docs] def shutdown(self, force=False): if force: self.status = 'Stopped'
[docs]class DummyHook(Atom): """Base class for dummy mesure hook used for testing. """ fail_check = Bool().tag(pref=True) fail_run = Bool() should_pause = Bool() accept_pause = Bool(True) should_resume = Bool() stop_called = Bool() waiting = Value(factory=Event) go_on = Value(factory=Event) signal_resuming = Value(factory=Event) go_on_resuming = Value(factory=Event) signal_resumed = Value(factory=Event) go_on_resumed = Value(factory=Event)
[docs] def run(self, workbench, engine): """Run method esecuting the hook. """ self.waiting.set() self.go_on.wait() if self.fail_run: raise RuntimeError() if self.accept_pause and self.should_pause: self.paused = True while True: sleep(0.001) if self.should_resume: self.signal_resuming.set() self.go_on_resuming.wait() self.resumed = True break self.signal_resumed.set() self.go_on_resumed.wait() self.waiting.clear() self.go_on.clear()
[docs] def pause(self): """Method to call to pause execution. """ self.should_pause = True
[docs] def resume(self): """Method to call to resume execution. """ self.should_resume = True
[docs] def stop(self, force=False): """Method to call to stop execution. """ self.stop_called = True
[docs]class DummyPreHook(DummyHook, BasePreExecutionHook): """Dummy pre-execution hook used for testing. """
[docs] def check(self, workbench, **kwargs): """Fail the check if the fail_check member is set or 'fail' is found in the kwargs. """ if self.measurement.dependencies._runtime_map.get('main'): assert self.measurement.root_task.run_time if self.fail_check or 'fail' in kwargs: return False, 'pre' return True, ''
[docs] def list_runtimes(self, workbench): """Say that dummy is a dependency. """ with enaml.imports(): from .contributions import Flags deps = RuntimeContainer() if Flags.RUNTIME2_FAIL_ANALYSE: deps.errors[self.declaration.id] = 'rr' else: deps.dependencies = {'dummy2': set(['test'])} return deps
[docs]class DummyMonitor(BaseMonitor): """Dummy monitor used for testing. """ running = Bool() monitored_entries = set_default(['default_path']) received_news = List()
[docs] def start(self): self.running = True
[docs] def stop(self): self.running = False
[docs] def refresh_monitored_entries(self, entries=None): """Do nothing when refreshing. """ pass
[docs] def handle_database_entries_change(self, news): """Add all entries to the monitored ones. """ if news[0] == 'added': self.monitored_entries = self.monitored_entries + [news[1]]
[docs] def handle_database_nodes_change(self, news): """Simply ignore nodes updates. """ pass
[docs] def process_news(self, news): self.received_news.append(news)
[docs]class DummyPostHook(DummyHook, BasePostExecutionHook): """Dummy post execution hook used for testing. """
[docs] def check(self, workbench, **kwargs): """Fail the check if the fail_check member is set or 'fail' is found in the kwargs. """ if self.fail_check or 'fail_post' in kwargs: return False, 'post'
# Check that Measurement.run_checks can handle a None retur value