Source code for exopy.tasks.tasks.shared_resources

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Thread safe object to use in tasks.

"""
import logging
from contextlib import contextmanager
from collections import defaultdict
from threading import RLock, Lock

from atom.api import Atom, Instance, Value, Int, set_default


class SharedCounter(Atom):
    """Counter whose updates are serialized through a lock.

    Use `increment` and `decrement` to modify the value; both take the
    internal lock before touching `count`.

    """
    #: Current value of the counter. Should not be modified directly, go
    #: through increment/decrement instead.
    count = Int()

    #: Plain (non re-entrant) lock guarding every update of the count.
    _lock = Value(factory=Lock)

    def increment(self):
        """Add one to the counter while holding the lock.

        """
        with self._lock:
            self.count = self.count + 1

    def decrement(self):
        """Subtract one from the counter while holding the lock.

        """
        with self._lock:
            self.count -= 1
class SharedDict(Atom):
    """Dict wrapper using a lock to protect access to its values.

    Parameters
    ----------
    default : callable, optional
        Callable to use as argument for defaultdict, if unspecified a regular
        dict is used.

    """
    def __init__(self, default=None, **kwargs):
        super(SharedDict, self).__init__(**kwargs)
        if default is not None:
            self._dict = defaultdict(default)
        else:
            self._dict = {}

    @contextmanager
    def safe_access(self, key):
        """Context manager to safely manipulate a value of the dict.

        The instance lock is held for the whole duration of the ``with``
        block and is always released on exit, even if the block (or the
        lookup of ``key``) raises.

        Parameters
        ----------
        key :
            Key of the value to access in the underlying dict.

        Yields
        ------
        value :
            The object stored under ``key``.

        Raises
        ------
        KeyError
            If ``key`` is missing and the underlying dict is a regular dict.

        """
        # Using the lock as a context manager guarantees the release even
        # when an exception is thrown into the generator at the yield point.
        with self._lock:
            yield self._dict[key]

    @contextmanager
    def locked(self):
        """Acquire the instance lock.

        The lock is released when leaving the ``with`` block, whether it
        exits normally or through an exception.

        Yields
        ------
        self : SharedDict
            This instance, with its lock held by the calling thread.

        """
        with self._lock:
            yield self

    def get(self, key, default=None):
        """Equivalent of dict.get but lock protected.

        """
        with self._lock:
            return self._dict.get(key, default)

    def items(self):
        """Equivalent of dict.items but lock protected.

        This is a generator: the lock is taken when iteration starts and is
        held until the generator is exhausted or closed.

        """
        with self.locked():
            for item in self._dict.items():
                yield item

    # =========================================================================
    # --- Private API ---------------------------------------------------------
    # =========================================================================

    #: Underlying dict.
    _dict = Instance((dict, defaultdict))

    #: Re-entrant lock use to secure the access to the dict.
    _lock = Value(factory=RLock)

    def __getitem__(self, key):
        with self._lock:
            return self._dict[key]

    def __setitem__(self, key, value):
        with self._lock:
            self._dict[key] = value

    def __delitem__(self, key):
        with self._lock:
            del self._dict[key]

    # NOTE: the three methods below read the underlying dict without taking
    # the lock; callers needing a consistent view should wrap them in
    # `locked()`.
    def __contains__(self, key):
        return key in self._dict

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        return len(self._dict)
class ResourceHolder(SharedDict):
    """Base class for storing resources and handling their release and reset.

    Subclasses must override `release`; `reset` does nothing by default.

    """
    #: Priority determining in which order resources will be released.
    #: Smallest values will be released earlier.
    priority = Int(100)

    def release(self):
        """Release the resources held by this container.

        Implementations should be safe to call on resources that have
        already been released.

        Raises
        ------
        NotImplementedError
            Always, in this base class.

        """
        raise NotImplementedError

    def reset(self):
        """Reset the resources.

        Resetting is not the same thing as releasing. It is typically called
        when resuming a measurement, so that the state of the resources can
        be trusted in spite of the interruption. The base implementation is
        a no-op.

        """
class ThreadPoolResource(ResourceHolder):
    """Resource holder specialized to handle threads grouped in pools.

    Each key is a pool identifier and each value a list of dispatcher
    objects. By default missing pools are created as empty lists.

    """
    # Should always be released first, as execution may not yet be complete.
    priority = set_default(-1)

    def __init__(self, default=list, **kwargs):
        super(ThreadPoolResource, self).__init__(default, **kwargs)

    def release(self):
        """Join all the threads still alive.

        Repeatedly stops every dispatcher found in the pools, then prunes
        from the pools the dispatchers that failed to stop or whose
        ``inactive`` event is set. The loop ends once no dispatcher is left.

        Failures to stop a dispatcher are logged (with the pool the
        dispatcher actually belongs to) and never propagated.

        """
        while True:
            # Snapshot (pool, dispatcher) pairs under the lock so that each
            # dispatcher stays associated with its own pool for logging.
            pending = []
            with self.locked():
                for pool in self:
                    pending.extend((pool, d) for d in self[pool])

            # If there is none break.
            if not any(d for _, d in pending):
                break

            # Else stop them, remembering the ones that failed so that they
            # are dropped rather than retried forever.
            bugged = []
            for pool, dispatcher in pending:
                try:
                    dispatcher.stop()
                except Exception:
                    log = logging.getLogger(__name__)
                    mes = 'Failed to join thread %s from pool %s'
                    log.exception(mes, dispatcher, pool)
                    bugged.append(dispatcher)

            # Make sure nobody modifies the pools and update them by removing
            # the references to the dead threads.
            with self.locked():
                for p in self:
                    self[p] = [d for d in self[p]
                               if d not in bugged and
                               not d.inactive.is_set()]
class InstrsResource(ResourceHolder):
    """Resource holder specialized to handle instruments.

    Every entry is expected to be a 2-tuple pairing a driver instance with
    the starter managing it: (driver, starter).

    """

    def release(self):
        """Finalize all the opened connections.

        Each driver is handed to its starter's ``stop``. Any failure is
        logged and the remaining entries are still processed.

        """
        for profile_id in self:
            try:
                instr, handler = self[profile_id]
                handler.stop(instr)
            except Exception:
                logging.getLogger(__name__).exception(
                    'Failed to close connection to instr : %s',
                    self[profile_id])

    def reset(self):
        """Clean the cache of all drivers to avoid corrupted value due to
        user interferences.

        Each driver is handed to its starter's ``reset``; errors are not
        caught here.

        """
        for profile_id in self:
            entry = self[profile_id]
            instr, handler = entry
            handler.reset(instr)
class FilesResource(ResourceHolder):
    """Resource holder specialized in handling standard file descriptors.

    """

    def release(self):
        """Close all the opened files.

        Calls ``close`` on every stored object. A failure is logged and does
        not prevent the remaining entries from being closed.

        """
        for handle_id in self:
            try:
                handle = self[handle_id]
                handle.close()
            except Exception:
                logging.getLogger(__name__).exception(
                    'Failed to close file handler : %s',
                    self[handle_id])