Source code for exopy.tasks.tasks.shared_resources

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Thread safe object to use in tasks.

"""
from __future__ import (division, unicode_literals, print_function,
                        absolute_import)

import logging
from contextlib import contextmanager
from collections import defaultdict
from threading import RLock, Lock

from atom.api import Atom, Instance, Value, Int, set_default


class SharedCounter(Atom):
    """Thread-safe integer counter.

    """
    #: Current value of the counter. Callers should rely on
    #: increment/decrement rather than writing this directly.
    count = Int()

    def increment(self):
        """Add one to the counter in a thread-safe fashion.

        """
        with self._lock:
            self.count += 1

    def decrement(self):
        """Subtract one from the counter in a thread-safe fashion.

        """
        with self._lock:
            self.count -= 1

    #: Lock guaranteeing the atomicity of the count updates.
    _lock = Value(factory=Lock)
class SharedDict(Atom):
    """Dict wrapper using a lock to protect access to its values.

    Parameters
    ----------
    default : callable, optional
        Callable to use as argument for defaultdict, if unspecified a regular
        dict is used.

    """
    def __init__(self, default=None, **kwargs):
        super(SharedDict, self).__init__(**kwargs)
        if default is not None:
            self._dict = defaultdict(default)
        else:
            self._dict = {}

    @contextmanager
    def safe_access(self, key):
        """Context manager to safely manipulate a value of the dict.

        The instance lock is held while the managed block runs.

        """
        lock = self._lock
        lock.acquire()
        try:
            yield self._dict[key]
        finally:
            # Release even if the managed block raised, otherwise the lock
            # would never be freed and all later accesses would deadlock.
            lock.release()

    @contextmanager
    def locked(self):
        """Acquire the instance lock.

        """
        self._lock.acquire()
        try:
            yield self
        finally:
            # Same rationale as in safe_access: never leak the lock on error.
            self._lock.release()

    def get(self, key, default=None):
        """Equivalent of dict.get but lock protected.

        """
        with self._lock:
            aux = self._dict.get(key, default)

        return aux

    def items(self):
        """Equivalent of dict.items but lock protected.

        The lock is held for the whole duration of the iteration.

        """
        with self.locked():
            for item in self._dict.items():
                yield item

    # =========================================================================
    # --- Private API ---------------------------------------------------------
    # =========================================================================

    #: Underlying dict.
    _dict = Instance((dict, defaultdict))

    #: Re-entrant lock use to secure the access to the dict.
    _lock = Value(factory=RLock)

    def __getitem__(self, key):
        with self.locked():
            aux = self._dict[key]

        return aux

    def __setitem__(self, key, value):
        with self._lock:
            self._dict[key] = value

    def __delitem__(self, key):
        with self._lock:
            del self._dict[key]

    def __contains__(self, key):
        # NOTE: membership, iteration and len are not lock protected,
        # preserving the original behavior.
        return key in self._dict

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        return len(self._dict)
class ResourceHolder(SharedDict):
    """Base class storing resources and in charge of releasing/resetting them.

    """
    #: Priority determining in which order resources will be released.
    #: Smallest values will be released earlier.
    priority = Int(100)

    def release(self):
        """Release the resources held by this container.

        This method should be safe to call on already released resources.

        """
        raise NotImplementedError()

    def reset(self):
        """Reset the resources without releasing them.

        Typically invoked when resuming a measurement so that the state of
        the resources can be trusted in spite of the interruption.

        """
        pass
class ThreadPoolResource(ResourceHolder):
    """Resource holder specialized to handle threads grouped in pools.

    """
    # Should always be released first. As execution may not yet be complete.
    priority = set_default(-1)

    def __init__(self, default=list, **kwargs):
        # Each pool entry maps to a list of dispatchers (defaultdict(list)).
        super(ThreadPoolResource, self).__init__(default, **kwargs)

    def release(self):
        """Join all the threads still alive.

        Loops until no live dispatcher remains in any pool; dispatchers whose
        stop() raised are dropped from the pools so the loop can terminate.

        """
        while True:
            threads = []
            bugged = []
            with self.locked():
                # Get all the dispatcher we should be waiting upon.
                for pool in self:
                    threads.extend(self[pool])

            # If there is none break.
            if not any(threads):
                break

            # Else stop them.
            # NOTE(review): assumes each dispatcher exposes stop() and an
            # 'inactive' threading.Event — TODO confirm against callers.
            for dispatcher in threads:
                try:
                    dispatcher.stop()
                except Exception:
                    log = logging.getLogger(__name__)
                    mes = 'Failed to join thread %s from pool %s'
                    # NOTE(review): 'pool' here is the last pool iterated in
                    # the collection loop above, not necessarily the pool this
                    # dispatcher belongs to — log attribution may be off.
                    log.exception(mes, dispatcher, pool)
                    bugged.append(dispatcher)

            # Make sure nobody modify the pools and update them by removing
            # the references to the dead threads.
            with self.locked():
                for p in self:
                    self[p] = [d for d in self[p]
                               if d not in bugged and not d.inactive.is_set()]
class InstrsResource(ResourceHolder):
    """Resource holder specialized to handle instruments.

    Each driver instance should be stored as a 2-tuple with its associated
    starter. (driver, starter)

    """
    def release(self):
        """Finalize all the opened connections.

        Errors are logged rather than propagated so that one faulty
        instrument does not prevent closing the others.

        """
        for profile in self:
            try:
                drv, strt = self[profile]
                strt.stop(drv)
            except Exception:
                logger = logging.getLogger(__name__)
                msg = 'Failed to close connection to instr : %s'
                logger.exception(msg, self[profile])

    def reset(self):
        """Clean the cache of all drivers to avoid corrupted value due to
        user interferences.

        """
        for profile in self:
            drv, strt = self[profile]
            strt.reset(drv)
class FilesResource(ResourceHolder):
    """Resource holder specialized in handling standard file descriptors.

    """
    def release(self):
        """Close all the opened files.

        A failure to close one file is logged and does not stop the
        processing of the remaining ones.

        """
        for fid in self:
            try:
                self[fid].close()
            except Exception:
                logger = logging.getLogger(__name__)
                msg = 'Failed to close file handler : %s'
                logger.exception(msg, self[fid])