# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Dummy engines, editors and measurement tools used for testing.
Those are contributed by the manifest found in contributions.enaml
"""
from time import sleep
from threading import Event
import enaml
from atom.api import Atom, Bool, List, Value, set_default
from exopy.app.dependencies.plugin import RuntimeContainer
from exopy.measurement.editors.api import BaseEditor
from exopy.measurement.hooks.api import (BasePreExecutionHook,
BasePostExecutionHook)
from exopy.measurement.engines.api import BaseEngine
from exopy.measurement.monitors.api import BaseMonitor
[docs]class DummyEditor(BaseEditor):
"""Dummy editor used for testing.
"""
pass
[docs]class DummyEngine(BaseEngine):
"""Dummy engine used for testing.
"""
fail_perform = Bool()
waiting = Value(factory=Event)
go_on = Value(factory=Event)
should_pause = Bool()
accept_pause = Bool(True)
should_resume = Bool()
measurement_force_enqueued = Bool()
signal_resuming = Value(factory=Event)
go_on_resuming = Value(factory=Event)
signal_resumed = Value(factory=Event)
go_on_resumed = Value(factory=Event)
_stop = Bool()
[docs] def pause(self):
self.should_pause = True
[docs] def resume(self):
self.should_resume = True
[docs] def stop(self, force=False):
"""Stop the execution.
"""
self._stop = True
[docs] def shutdown(self, force=False):
if force:
self.status = 'Stopped'
[docs]class DummyHook(Atom):
"""Base class for dummy mesure hook used for testing.
"""
fail_check = Bool().tag(pref=True)
fail_run = Bool()
should_pause = Bool()
accept_pause = Bool(True)
should_resume = Bool()
stop_called = Bool()
waiting = Value(factory=Event)
go_on = Value(factory=Event)
signal_resuming = Value(factory=Event)
go_on_resuming = Value(factory=Event)
signal_resumed = Value(factory=Event)
go_on_resumed = Value(factory=Event)
[docs] def run(self, workbench, engine):
"""Run method esecuting the hook.
"""
self.waiting.set()
self.go_on.wait()
if self.fail_run:
raise RuntimeError()
if self.accept_pause and self.should_pause:
self.paused = True
while True:
sleep(0.001)
if self.should_resume:
self.signal_resuming.set()
self.go_on_resuming.wait()
self.resumed = True
break
self.signal_resumed.set()
self.go_on_resumed.wait()
self.waiting.clear()
self.go_on.clear()
[docs] def pause(self):
"""Method to call to pause execution.
"""
self.should_pause = True
[docs] def resume(self):
"""Method to call to resume execution.
"""
self.should_resume = True
[docs] def stop(self, force=False):
"""Method to call to stop execution.
"""
self.stop_called = True
[docs]class DummyPreHook(DummyHook, BasePreExecutionHook):
"""Dummy pre-execution hook used for testing.
"""
[docs] def check(self, workbench, **kwargs):
"""Fail the check if the fail_check member is set or 'fail' is found in
the kwargs.
"""
if self.measurement.dependencies._runtime_map.get('main'):
assert self.measurement.root_task.run_time
if self.fail_check or 'fail' in kwargs:
return False, 'pre'
return True, ''
[docs] def list_runtimes(self, workbench):
"""Say that dummy is a dependency.
"""
with enaml.imports():
from .contributions import Flags
deps = RuntimeContainer()
if Flags.RUNTIME2_FAIL_ANALYSE:
deps.errors[self.declaration.id] = 'rr'
else:
deps.dependencies = {'dummy2': set(['test'])}
return deps
[docs]class DummyMonitor(BaseMonitor):
"""Dummy monitor used for testing.
"""
running = Bool()
monitored_entries = set_default(['default_path'])
received_news = List()
[docs] def start(self):
self.running = True
[docs] def stop(self):
self.running = False
[docs] def refresh_monitored_entries(self, entries=None):
"""Do nothing when refreshing.
"""
pass
[docs] def handle_database_entries_change(self, news):
"""Add all entries to the monitored ones.
"""
if news[0] == 'added':
self.monitored_entries = self.monitored_entries + [news[1]]
[docs] def handle_database_nodes_change(self, news):
"""Simply ignore nodes updates.
"""
pass
[docs] def process_news(self, news):
self.received_news.append(news)
[docs]class DummyPostHook(DummyHook, BasePostExecutionHook):
"""Dummy post execution hook used for testing.
"""
[docs] def check(self, workbench, **kwargs):
"""Fail the check if the fail_check member is set or 'fail' is found in
the kwargs.
"""
if self.fail_check or 'fail_post' in kwargs:
return False, 'post'
# Check that Measurement.run_checks can handle a None retur value