# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Thread safe object to use in tasks.
"""
from __future__ import (division, unicode_literals, print_function,
absolute_import)
import logging
from contextlib import contextmanager
from collections import defaultdict
from threading import RLock, Lock
from atom.api import Atom, Instance, Value, Int, set_default
class SharedCounter(Atom):
    """Counter whose updates are serialized through a lock.

    """
    #: Current count of the counter. User should not manipulate this directly.
    count = Int()

    def increment(self):
        """Add one to the counter while holding the lock.

        """
        self._shift(1)

    def decrement(self):
        """Subtract one from the counter while holding the lock.

        """
        self._shift(-1)

    def _shift(self, step):
        """Add ``step`` to the count under the protection of the lock.

        """
        with self._lock:
            self.count += step

    #: Simple lock to ensure the thread safety of operations.
    _lock = Value(factory=Lock)
class SharedDict(Atom):
    """Dict wrapper using a lock to protect access to its values.

    Parameters
    ----------
    default : callable, optional
        Callable to use as argument for defaultdict, if unspecified a regular
        dict is used.

    """
    def __init__(self, default=None, **kwargs):
        super(SharedDict, self).__init__(**kwargs)
        if default is not None:
            self._dict = defaultdict(default)
        else:
            self._dict = {}

    @contextmanager
    def safe_access(self, key):
        """Context manager to safely manipulate a value of the dict.

        The instance lock is held for the whole duration of the ``with``
        block and the value stored under ``key`` is yielded.

        Parameters
        ----------
        key :
            Key of the value to access in the underlying dict.

        """
        # Using ``with`` guarantees that the lock is released even if the
        # lookup fails or if the body of the caller's ``with`` block raises.
        with self._lock:
            yield self._dict[key]

    @contextmanager
    def locked(self):
        """Acquire the instance lock.

        The instance itself is yielded and the lock is released when the
        ``with`` block exits, whether normally or through an exception.

        """
        with self._lock:
            yield self

    def get(self, key, default=None):
        """Equivalent of dict.get but lock protected.

        """
        with self._lock:
            aux = self._dict.get(key, default)

        return aux

    def items(self):
        """Equivalent of dict.items but lock protected.

        This is a generator: the lock is acquired when the iteration starts
        and is held until the generator is exhausted or closed.

        """
        with self.locked():
            for item in self._dict.items():
                yield item

    # =========================================================================
    # --- Private API ---------------------------------------------------------
    # =========================================================================

    #: Underlying dict.
    _dict = Instance((dict, defaultdict))

    #: Re-entrant lock used to secure the access to the dict.
    _lock = Value(factory=RLock)

    def __getitem__(self, key):
        with self.locked():
            aux = self._dict[key]

        return aux

    def __setitem__(self, key, value):
        with self._lock:
            self._dict[key] = value

    def __delitem__(self, key):
        with self._lock:
            del self._dict[key]

    # The three methods below query the underlying dict without taking the
    # lock.

    def __contains__(self, key):
        return key in self._dict

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        return len(self._dict)
class ResourceHolder(SharedDict):
    """Base class for containers storing resources.

    Subclasses describe how the stored resources are released and how they
    are reset.

    """
    #: Priority determining in which order resources will be released.
    #: Smallest values will be released earlier.
    priority = Int(100)

    def release(self):
        """Release the resources held by this container.

        Implementations should be safe to call on resources that have
        already been released. The base implementation always raises
        NotImplementedError.

        """
        raise NotImplementedError

    def reset(self):
        """Reset the resources.

        Resetting differs from releasing: this method is typically called
        when resuming a measurement so that the state of the resources can
        be trusted in spite of the interruption. The base implementation does
        nothing.

        """
class ThreadPoolResource(ResourceHolder):
    """Resource holder specialized to handle threads grouped in pools.

    Each key identifies a pool and maps to a list of dispatchers. By default
    the underlying container is a ``defaultdict(list)``.

    """
    # Should always be released first. As execution may not yet be complete.
    priority = set_default(-1)

    def __init__(self, default=list, **kwargs):
        super(ThreadPoolResource, self).__init__(default, **kwargs)

    def release(self):
        """Join all the threads still alive.

        The pools are scanned repeatedly: every dispatcher found is stopped,
        then the pools are pruned of the dispatchers whose ``inactive`` event
        is set and of those whose ``stop`` call raised. The method returns
        once no dispatcher is left in any pool.

        Errors raised by ``stop`` are logged and do not interrupt the
        release of the other dispatchers.

        """
        while True:
            pending = []
            bugged = []
            with self.locked():
                # Get all the dispatchers we should be waiting upon, keeping
                # track of the pool each one belongs to so that failures
                # can be reported against the right pool.
                for pool in self:
                    pending.extend((pool, d) for d in self[pool])

            # If there is none break.
            if not any(d for _, d in pending):
                break

            # Else stop them.
            for pool, dispatcher in pending:
                try:
                    dispatcher.stop()
                except Exception:
                    log = logging.getLogger(__name__)
                    mes = 'Failed to join thread %s from pool %s'
                    log.exception(mes, dispatcher, pool)
                    bugged.append(dispatcher)

            # Make sure nobody modify the pools and update them by removing
            # the references to the dead threads.
            with self.locked():
                for p in self:
                    self[p] = [d for d in self[p]
                               if d not in bugged and not d.inactive.is_set()]
class InstrsResource(ResourceHolder):
    """Resource holder specialized to handle instruments.

    Each driver instance should be stored as a 2-tuple with its associated
    starter. (driver, starter)

    """

    def release(self):
        """Finalize all the opened connections.

        Each stored driver is handed to the ``stop`` method of its starter.
        Failures are logged and do not prevent the other connections from
        being processed.

        """
        for profile_id in self:
            try:
                entry = self[profile_id]
                instr, handler = entry
                handler.stop(instr)
            except Exception:
                logging.getLogger(__name__).exception(
                    'Failed to close connection to instr : %s',
                    self[profile_id])

    def reset(self):
        """Clean the cache of all drivers to avoid corrupted value due to
        user interferences.

        Each stored driver is handed to the ``reset`` method of its starter.

        """
        for profile_id in self:
            entry = self[profile_id]
            instr, handler = entry
            handler.reset(instr)
class FilesResource(ResourceHolder):
    """Resource holder specialized in handling standard file descriptors.

    """

    def release(self):
        """Close all the opened files.

        ``close`` is called on every stored object. Failures are logged and
        do not prevent the remaining files from being closed.

        """
        for handle_id in self:
            try:
                handle = self[handle_id]
                handle.close()
            except Exception:
                logging.getLogger(__name__).exception(
                    'Failed to close file handler : %s', self[handle_id])