Source code for exopy.measurement.workspace.measurement_tracking

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Thread-loke object keeping track of the last edited measurement.

"""
from __future__ import (division, unicode_literals, print_function,
                        absolute_import)

from time import sleep
from threading import Thread, Lock, Event

import enaml
from atom.api import Atom, Value, List, Typed

from ..measurement import Measurement

with enaml.imports():
    from .measurement_edition import MeasurementEditorDockItem
    from .tools_edition import ToolsEditorDockItem


MEASURE_PARENTS = (MeasurementEditorDockItem, ToolsEditorDockItem)


[docs]class MeasurementTracker(Atom): """Object responsible for tracking the currently edited measurement. The tracking relies on the last focus that got focus. """
[docs] def start(self, measurement): """Start the working thread. """ self._selected = measurement self._should_stop.clear() self._buffer_empty.set() self._thread = Thread(target=self.run) self._thread.daemon = True self._thread.start()
[docs] def stop(self): """Stop the working thread. """ self._should_stop.set() self._queue_not_empty.set() # So that the working thread stop waiting self._thread.join()
[docs] def enqueue(self, widget): """Enqueue a newly selected widget. """ with self._lock: self._queue.append(widget) self._queue_not_empty.set()
[docs] def run(self): """Method called by the working thread. """ while True: self._queue_not_empty.wait() if self._should_stop.is_set(): break with self._lock: self._buffer_empty.clear() self._buffer.extend(self._queue[::-1]) self._queue = [] self._queue_not_empty.clear() for w in self._buffer[::-1]: selected = None for p in w.traverse_ancestors(): if isinstance(p, MEASURE_PARENTS): selected = p.measurement break if selected is not None: self._selected = selected break if (self._queue_not_empty.is_set() or self._should_stop.is_set()): break self._buffer = [] self._buffer_empty.set()
[docs] def get_selected_measurement(self): """Get the currently selected measurement. The measurement is returned only when thread in done processing the enqueued widgets. """ while True: self._buffer_empty.wait() if self._queue_not_empty.is_set(): # Cannot be tested in a perfectly deterministic way sleep(0.05) # pragma: no cover else: break return self._selected
[docs] def set_selected_measurement(self, measurement): """Set the currently selected measurement. This is used when the selected measurement does not result from focusing. """ while True: self._buffer_empty.wait() if self._queue_not_empty.is_set(): # Cannot be tested in a perfectly deterministic way sleep(0.05) # pragma: no cover else: break self._selected = measurement
# --- Private API --------------------------------------------------------- #: Background thread processing the last selection. _thread = Value() #: Lock ensuring the thread-safety when modifying the queue. _lock = Value(factory=Lock) #: Widgets that got focus and that have not yet been analysed. _queue = List() #: Widgets that got focus and that can be discarded if a selected #: measurement is found. _buffer = List() #: Event used to signal the working thread it should stop. _should_stop = Value(factory=Event) #: Event used to signal the working thread some widgets are waiting to be #: analysed _queue_not_empty = Value(factory=Event) #: Event signaling that the thread has processed its current buffer. _buffer_empty = Value(factory=Event) #: Current answer to the get_selected_measurement query. _selected = Typed(Measurement)