# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Generic utility functions for testing.
"""
import os
import gc
import weakref
import inspect
from contextlib import contextmanager
from pprint import pformat
import enaml
from configobj import ConfigObj
from atom.api import Atom, Bool
from enaml.application import timed_call
from enaml.widgets.api import Window, Dialog, PopupView
with enaml.imports():
from enaml.stdlib.message_box import MessageBox
APP_PREFERENCES = os.path.join('app', 'preferences')
APP_DIR_CONFIG = 'app_directory.ini'
[docs]def exopy_path():
"""Get the exopy path as determined by the sys_path fixture.
"""
from .fixtures import EXOPY
assert EXOPY
return EXOPY
[docs]def run_pending_tasks(qtbot, timeout=1000):
"""Run all enaml pending tasks.
WARNING: this may not run the Qt event loop if no task is pending.
This will only deal with tasks schedule through the schedule function
(or Application method)
Parameters
----------
timeout : int, optional
Timeout after which the operation should fail in ms
"""
def check_pending_tasks():
"""Check for pending task on the application.
"""
assert not qtbot.enaml_app.has_pending_tasks()
qtbot.wait_until(check_pending_tasks)
[docs]def get_window(qtbot, cls=Window, timeout=1000):
"""Convenience function running the event loop and returning the first
window found in the set of active windows.
Parameters
----------
cls : type, optional
Type of the window which should be returned.
timeout : int, optional
Timeout after which the operation should fail in ms
Returns
-------
window : Window or None
Return the first window found matching the specified class
Raises
------
AssertionError : raised if no window is found in the given time
"""
def check_window_presence():
"""Try to find a window of the proper type.
"""
assert [w for w in Window.windows if isinstance(w, cls)]
qtbot.wait_until(check_window_presence)
for w in Window.windows:
if isinstance(w, cls):
return w
[docs]def wait_for_window_displayed(qtbot, window, timeout=1000):
"""Wait for a window to be displayed.
This method should be called on already activated windows (the show method
should have been called).
"""
if not window.proxy_is_active or not window.proxy.widget:
msg = 'Window must be activated before waiting for display'
raise RuntimeError(msg)
qtbot.wait_exposed(window.proxy.widget)
[docs]class EventObserver(Atom):
"""Simple observer registering the fact it was called once.
"""
called = Bool()
[docs] def callback(self, change):
"""Callback registering it was called at least once.
"""
self.called = True
[docs] def assert_called(self):
"""Check whether the callback was called.
"""
assert self.called
[docs]def wait_for_destruction(qtbot, widget):
"""Wait for a widget to get destroyed.
"""
if widget.is_destroyed:
return
obs = EventObserver()
widget.observe('destroyed', obs.callback)
qtbot.wait_until(obs.assert_called)
[docs]@contextmanager
def close_all_windows(qtbot):
"""Close all opened windows.
"""
yield
run_pending_tasks(qtbot)
while Window.windows:
windows = list(Window.windows)
# First close non top level windows to avoid a window to lose its
# parent and not remove itself from the set of windows.
non_top_level_windows = [w for w in windows if w.parent is not None]
for window in non_top_level_windows:
close_window_or_popup(qtbot, window)
for window in windows:
close_window_or_popup(qtbot, window)
[docs]class ScheduledClosing(object):
"""Scheduled closing of dialog.
"""
def __init__(self, bot, cls, handler, op, skip_answer):
self.cls = cls
self.handler = handler
self.op = op
self.bot = bot
self.skip_answer = skip_answer
self.called = False
def __call__(self):
self.called = True
from .fixtures import DIALOG_SLEEP
dial = get_window(self.bot, cls=self.cls)
wait_for_window_displayed(self.bot, dial)
self.bot.wait(DIALOG_SLEEP)
obs = EventObserver()
dial.observe('finished', obs.callback)
try:
self.handler(self.bot, dial)
finally:
if not self.skip_answer:
getattr(dial, self.op)()
self.bot.wait_until(obs.assert_called, timeout=10e3)
[docs] def was_called(self):
"""Assert the scheduler was called.
"""
assert self.called
[docs]@contextmanager
def handle_dialog(qtbot, op='accept', handler=lambda qtbot, window: window,
cls=Dialog, time=100, skip_answer=False):
"""Automatically close a dialog opened during the context.
Parameters
----------
op : {'accept', 'reject'}, optional
Whether to accept or reject the dialog.
handler : callable, optional
Callable taking as arguments the bot and the dialog, called before
accepting or rejecting the dialog.
cls : type, optional
Dialog class to identify.
time : float, optional
Time to wait before handling the dialog in ms.
skip_answer : bool, optional
Skip answering to the dialog. If this is True the handler should handle
the answer itself.
"""
sch = ScheduledClosing(qtbot, cls, handler, op, skip_answer)
timed_call(time, sch)
try:
yield
except Exception:
raise
else:
qtbot.wait_until(sch.was_called, 10e3)
[docs]@contextmanager
def handle_question(qtbot, answer):
"""Handle question dialog.
"""
def answer_question(qtbot, dial):
"""Mark the right button as clicked.
"""
if answer in ('yes', 'no'):
dial.buttons[0 if answer == 'yes' else 1].was_clicked = True
with handle_dialog(qtbot, 'accept' if answer == 'yes' else 'reject',
handler=answer_question, cls=MessageBox):
yield
[docs]def set_preferences(workbench, preferences):
"""Set the preferences stored in the preference plugin.
This function must be called before accessing any plugin relying on those
values.
Parameters
----------
workbench :
Application workbench.
preferences : dict
Dictionary describing the preferences.
"""
plugin = workbench.get_plugin('exopy.app.preferences')
plugin._prefs = ConfigObj(preferences, indent_type=' ',
encoding='utf-8')
[docs]class ErrorDialogException(Exception):
"""Error raised when patching the error plugin to raise rather than show a
dialog when exiting error gathering.
"""
pass
[docs]@contextmanager
def signal_error_raise():
"""Make the error plugin raise an exception when signaling.
"""
from exopy.app.errors.plugin import ErrorsPlugin
func = ErrorsPlugin.signal
def raise_for_signal(self, kind, **kwargs):
"""Raise an easy to identify error.
"""
raise ErrorDialogException()
ErrorsPlugin.signal = raise_for_signal
try:
yield
finally:
ErrorsPlugin.signal = func
[docs]def exit_on_err(self):
"""Replacement function for exopy.app.errors plugin exit_error_gathering.
This function will raise instead of displaying a dialog. Useful to catch
unexpected errors.
Should be used in conjunction with the monkeypatch fixture.
"""
self._gathering_counter -= 1
if self._gathering_counter < 1:
self._gathering_counter = 0
if self._delayed:
msg = 'Unexpected exceptions occured :\n'
raise ErrorDialogException(msg + pformat(self._delayed))
[docs]class CallSpy(object):
"""Object simply monitoring how many times it gets called.
"""
__slots__ = ('called', 'args', 'kwargs')
def __init__(self):
self.called = 0
def __call__(self, *args, **kwargs):
self.called += 1
self.args = args
self.kwargs = kwargs
[docs]class ObjectTracker(object):
"""Object tracking instance of a given class exists.
It works by patching __new__ and keeping a WeakSet of the created objects.
So instances created before creating the tracker are not tracked.
If the objects are not weak referenceable you should set has_weakref to
False. By default objects inheriting from Atom are not weak referenceable.
It provides way to list the object referring alive objects to help tracking
ref leaks.
"""
def __init__(self, cls, has_weakref=True):
self._refs = weakref.WeakSet()
# Create a weak referenceable subclass if necessary
if not has_weakref:
class __weakref_cls__(cls):
__slots__ = ('__weakref__',)
else:
__weakref_cls__ = cls
def override_new(original_cls, *args, **kwargs):
"""Function replacing new allowing to track instances.
"""
new = __weakref_cls__.__old_new__(__weakref_cls__, *args, **kwargs)
self._refs.add(new)
return new
__weakref_cls__.__old_new__ = cls.__new__
cls.__new__ = override_new
__weakref_cls__.original_cls = cls
self.cls = __weakref_cls__
[docs] def stop_tracking(self):
"""Use to properly remove tracking.
"""
self.cls.original_cls.__new__ = self.cls.__old_new__
@property
def alive_instances(self):
"""Currently alive instances of the tracked objects.
"""
gc.collect()
return self._refs
[docs] def list_referrers(self, exclude=[], depth=0):
"""List all the referrers of the tracked objects.
Can exlude some objects and go to deeper levels (referrers of the
referrers) in which case reference to the first object are filtered.
References held by frames are also filtered
This function is mostly useful when tracking why an object that is
expected to be released is not.
"""
gc.collect()
def find_referrers(ref_dict, obj, depth=0, exclude=[]):
"""Find the object referring the specified object.
"""
referrers = [ref for ref in gc.get_referrers(obj)
if not (inspect.isframe(ref) or
ref in exclude)]
if depth == 0:
ref_dict[obj] = referrers
else:
deeper = {}
for r in referrers:
find_referrers(deeper, r, depth-1,
exclude + [obj, referrers])
ref_dict[obj] = deeper
tracked = {}
objs = [r for r in self._refs if r not in exclude]
for r in objs:
find_referrers(tracked, r, depth, [objs])
return tracked