Source code for exopy.measurement.engines.utils

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Useful tools for engines.

"""
import logging
from threading import Thread
from queue import Empty
from multiprocessing.queues import Queue
from pickle import dumps

from atom.api import Atom, Coerced, Typed

from ...utils.traceback import format_exc
from ...tasks.tasks.database import TaskDatabase


[docs]class MeasureSpy(Atom): """Spy observing a task database and sending values update into a queue. All updates are sent immediatly as no issues have been detected so far. Using a timer based implementation would complicate things. """ #: Set of entries for which to send notifications. observed_entries = Coerced(set) #: Reference to the database that needs to be observed. observed_database = Typed(TaskDatabase) #: Queue in which to send the updates. queue = Typed(Queue) def __init__(self, queue, observed_entries, observed_database): super(MeasureSpy, self).__init__(queue=queue, observed_database=observed_database, observed_entries=observed_entries) self.observed_database.observe('notifier', self.enqueue_update)
[docs] def enqueue_update(self, change): """Put an update in the queue. Notes ----- Change is a tuple as this is connected to a Signal. """ if change[0] in self.observed_entries: try: # Ensure pickling is ok at the cost of a small overhead dumps(change) self.queue.put(change) except Exception: logger = logging.getLogger(__name__) logger.error('Failed to enqueue %s :\n%s' % (change, format_exc()))
[docs] def close(self): """Put a dummy object signaling that no more updates will be sent. """ self.queue.put(('', ''))
[docs]class ThreadMeasureMonitor(Thread): """Thread sending a queue content to the news signal of an engine. """ def __init__(self, engine, queue): super(ThreadMeasureMonitor, self).__init__() self.queue = queue self.engine = engine
[docs] def run(self): """Send the news received from the queue to the engine news signal. """ while True: try: news = self.queue.get() if news not in [(None, None), ('', '')]: # Here progress is a Signal not an Event hence the syntax. self.engine.progress(news) elif news == ('', ''): logger = logging.getLogger(__name__) logger.debug('Spy closed') else: break except Empty: # pragma: no cover continue except Exception: logger = logging.getLogger(__name__) logger.error('Failed to received enqueued object :\n' + format_exc())