# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Useful tools for engines.
"""
from __future__ import (division, unicode_literals, print_function,
absolute_import)
import logging
from threading import Thread
from queue import Empty # This is allowed thanks to the future package
from multiprocessing.queues import Queue
from pickle import dumps
from atom.api import Atom, Coerced, Typed
from ...utils.traceback import format_exc
from ...tasks.tasks.database import TaskDatabase
class MeasureSpy(Atom):
    """Spy observing a task database and sending value updates into a queue.

    All updates are sent immediately as no issues have been detected so far.
    Using a timer based implementation would complicate things.

    Parameters
    ----------
    queue : Queue
        Queue in which the updates are put.

    observed_entries : iterable
        Names of the entries for which updates should be forwarded (coerced
        to a set).

    observed_database : TaskDatabase
        Database whose ``notifier`` signal is observed.

    """
    #: Set of entries for which to send notifications.
    observed_entries = Coerced(set)

    #: Reference to the database that needs to be observed.
    observed_database = Typed(TaskDatabase)

    #: Queue in which to send the updates.
    queue = Typed(Queue)

    def __init__(self, queue, observed_entries, observed_database):
        super(MeasureSpy, self).__init__(queue=queue,
                                         observed_database=observed_database,
                                         observed_entries=observed_entries)
        self.observed_database.observe('notifier', self.enqueue_update)

    def enqueue_update(self, change):
        """Put an update in the queue.

        Updates whose first item is not in ``observed_entries`` are ignored.
        Any error raised while pickling or enqueueing is logged and
        swallowed.

        Notes
        -----
        Change is a tuple as this is connected to a Signal.

        """
        if change[0] not in self.observed_entries:
            return

        try:
            # Ensure pickling is ok at the cost of a small overhead
            dumps(change)
            self.queue.put(change)
        except Exception:
            logger = logging.getLogger(__name__)
            logger.error('Failed to enqueue %s :\n%s', change, format_exc())

    def close(self):
        """Put a dummy object signaling that no more updates will be sent.

        The spy first stops observing the database so that no update can be
        enqueued after the closing sentinel.

        """
        self.observed_database.unobserve('notifier', self.enqueue_update)
        self.queue.put(('', ''))
class ThreadMeasureMonitor(Thread):
    """Thread sending a queue content to the progress signal of an engine.

    Parameters
    ----------
    engine : object
        Object whose ``progress`` callable is invoked with each news item
        read from the queue.

    queue : object
        Queue-like object whose ``get`` method provides the news.

    """

    def __init__(self, engine, queue):
        super(ThreadMeasureMonitor, self).__init__()
        self.queue = queue
        self.engine = engine

    def run(self):
        """Send the news received from the queue to the engine.

        The loop handles three kinds of items:

        - ``(None, None)`` stops the thread.
        - ``('', '')`` is only logged (the spy has been closed) and the loop
          keeps waiting for further items.
        - anything else is forwarded to ``engine.progress``.

        Errors raised while getting or forwarding an item are logged and the
        loop goes on.

        """
        logger = logging.getLogger(__name__)
        while True:
            try:
                news = self.queue.get()

                if news == (None, None):
                    break

                if news == ('', ''):
                    logger.debug('Spy closed')
                    continue

                # Here progress is a Signal not Event hence the syntax.
                self.engine.progress(news)

            except Empty:  # pragma: no cover
                continue
            except Exception:
                logger.error('Failed to receive enqueued object :\n%s',
                             format_exc())