# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Useful tools for engines.
"""
import logging
from threading import Thread
from queue import Empty
from multiprocessing.queues import Queue
from pickle import dumps
from atom.api import Atom, Coerced, Typed
from ...utils.traceback import format_exc
from ...tasks.tasks.database import TaskDatabase
class MeasureSpy(Atom):
    """Spy observing a task database and sending values update into a queue.

    All updates are sent immediately as no issues have been detected so far.
    Using a timer based implementation would complicate things.

    """
    #: Set of entries for which to send notifications.
    observed_entries = Coerced(set)

    #: Reference to the database that needs to be observed.
    observed_database = Typed(TaskDatabase)

    #: Queue in which to send the updates.
    queue = Typed(Queue)

    def __init__(self, queue, observed_entries, observed_database):
        """Store the members and start observing the database notifier.

        Parameters
        ----------
        queue :
            Queue in which the updates are put.
        observed_entries :
            Entries for which updates should be forwarded (coerced to a set).
        observed_database :
            Database whose 'notifier' member is observed.

        """
        super(MeasureSpy, self).__init__(queue=queue,
                                         observed_database=observed_database,
                                         observed_entries=observed_entries)
        self.observed_database.observe('notifier', self.enqueue_update)

    def enqueue_update(self, change):
        """Put an update in the queue.

        Only changes whose first item belongs to ``observed_entries`` are
        forwarded. Any failure (pickling or queueing) is logged and
        swallowed so that the observed database is never disturbed.

        Notes
        -----
        Change is a tuple as this is connected to a Signal.

        """
        if change[0] in self.observed_entries:
            try:
                # Ensure pickling is ok at the cost of a small overhead
                dumps(change)
                self.queue.put(change)
            except Exception:
                logger = logging.getLogger(__name__)
                logger.error('Failed to enqueue %s :\n%s' % (change,
                                                             format_exc()))

    def close(self):
        """Put a dummy object signaling that no more updates will be sent.

        The dummy object is the tuple ``('', '')``.

        """
        self.queue.put(('', ''))
class ThreadMeasureMonitor(Thread):
    """Thread sending a queue content to the news signal of an engine.

    Parameters
    ----------
    engine :
        Object whose ``progress`` callable is invoked with each update read
        from the queue.
    queue :
        Queue from which the updates are read using a blocking ``get``.

    """
    def __init__(self, engine, queue):
        super(ThreadMeasureMonitor, self).__init__()
        self.queue = queue
        self.engine = engine

    def run(self):
        """Send the news received from the queue to the engine news signal.

        Two sentinel values are recognised:

        - ``('', '')`` is only logged at debug level ('Spy closed') and the
          loop keeps running.
        - ``(None, None)`` stops the loop.

        Any other object is passed to ``engine.progress``. Exceptions raised
        while getting or forwarding an object are logged and the loop goes
        on.

        """
        while True:
            try:
                news = self.queue.get()
                if news not in [(None, None), ('', '')]:
                    # Here progress is a Signal not an Event hence the syntax.
                    self.engine.progress(news)
                elif news == ('', ''):
                    logger = logging.getLogger(__name__)
                    logger.debug('Spy closed')
                else:
                    # (None, None) : nothing more will be sent, stop.
                    break
            except Empty:  # pragma: no cover
                continue
            except Exception:
                # NOTE(review): if get() fails on every call (e.g. the queue
                # is no longer usable) this loop logs forever - confirm
                # whether the thread should stop in that case.
                logger = logging.getLogger(__name__)
                logger.error('Failed to received enqueued object :\n' +
                             format_exc())