Source code for exopy.instruments.plugin

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
# Distributed under the terms of the BSD license.
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Instrument manager plugin.

"""

import os
import sys
import logging
from functools import partial
from collections import defaultdict

from atom.api import Typed, List, Dict
from enaml.application import deferred_call
from watchdog.observers import Observer

from ..utils.watchdog import SystematicFileUpdater
from ..utils.plugin_tools import (HasPreferencesPlugin, ExtensionsCollector,
                                  DeclaratorsCollector,
                                  make_extension_validator)
from .user import InstrUser
from .starters.base_starter import Starter, BaseStarter
from .drivers.driver_decl import Driver, Drivers
from .connections.base_connection import Connection
from .settings.base_settings import Settings
from .manufacturer_aliases import ManufacturerAlias
from .infos import ManufacturersHolder, ProfileInfos, validate_profile_infos

DRIVERS_POINT = 'exopy.instruments.drivers'

STARTERS_POINT = 'exopy.instruments.starters'

USERS_POINT = 'exopy.instruments.users'

CONNECTIONS_POINT = 'exopy.instruments.connections'

SETTINGS_POINT = 'exopy.instruments.settings'

ALIASES_POINT = 'exopy.instruments.manufacturer_aliases'

logger = logging.getLogger(__name__)

def validate_user(user):
    """Validate an instrument user declaration.

    A user must provide an id and, when its policy is 'releasable', must
    override the ``release_profiles`` method of ``InstrUser``.

    Parameters
    ----------
    user : InstrUser
        Declaration to validate.

    Returns
    -------
    result : bool
        Whether the declaration is valid.

    msg : str
        Explanation of the failure, empty on success.

    """
    if not user.id:
        return False, 'InstrUser must provide an id.'

    if user.policy == 'releasable':
        member = user.release_profiles
        # Unwrap the bound method to reach the underlying function
        # (``im_func`` is the Python 2 spelling, ``__func__`` the Python 3
        # one) so that it can be compared to the base class implementation.
        func = getattr(member, 'im_func', getattr(member, '__func__', None))
        o_func = InstrUser.release_profiles
        if not func or func is o_func:
            msg = ("InstrUser policy is releasable but it does not declare a"
                   " a release_profiles function.")
            return False, msg

    return True, ''
def validate_starter(starter):
    """Validate a starter declaration.

    The declaration must provide an id, a description and a starter instance
    whose class overrides every abstract method of ``BaseStarter``.

    Parameters
    ----------
    starter : Starter
        Declaration to validate.

    Returns
    -------
    result : bool
        Whether the declaration is valid.

    msg : str
        Explanation of the failure, empty on success.

    """
    if not starter.id:
        return False, 'Starter must provide an id.'
    if not starter.description:
        return False, 'Starter must provide a description.'
    if not starter.starter:
        # The member validation ensures we get the right type
        return False, 'Starter must provide a BaseStarter subclass instance.'

    def get_function(cls, method_name):
        """Get the function corresponding to a method.

        """
        meth = getattr(cls, method_name)
        # Python 2 unbound methods wrap the function in ``im_func``; on
        # Python 3 the attribute is already the plain function.
        return getattr(meth, 'im_func', meth)

    st = type(starter.starter)
    for m_name in ('start', 'stop', 'check_infos', 'reset'):
        # A method identical to the base class one has not been overridden.
        if get_function(st, m_name) is get_function(BaseStarter, m_name):
            return False, 'BaseStarter subclass should implement %s' % m_name

    return True, ''
# TODO add a way to specify default values for settings from the preferences
class InstrumentManagerPlugin(HasPreferencesPlugin):
    """The instrument plugin manages the instrument drivers and their use.

    It collects the contributed users, starters, connections, settings,
    manufacturer aliases and drivers, keeps track of the instrument profiles
    found on disk and of which user currently holds which profile.

    """
    #: List of the known instrument profile ids.
    profiles = List()

    #: List of instruments for which at least one driver is declared.
    instruments = List()

    #: List of registered instrument users.
    #: Only registered users can be granted the use of an instrument.
    users = List()

    #: List of registered instrument starters.
    starters = List()

    #: List of registered connection types.
    connections = List()

    #: List of registered settings.
    settings = List()

    #: Currently used profiles.
    #: Maps a profile id to the id of the user currently holding it.
    #: This dict should be edited by user code.
    # NOTE(review): get_profiles/release_profiles maintain this mapping, the
    # sentence above is possibly missing a "not" - confirm.
    used_profiles = Dict()
[docs] def start(self): """Start the plugin lifecycle by collecting all contributions. """ super(InstrumentManagerPlugin, self).start() core = self.workbench.get_plugin('enaml.workbench.core') core.invoke_command('') state = core.invoke_command('', {'state_id': ''}) i_dir = os.path.join(state.app_directory, 'instruments') # Create instruments subfolder if it does not exist. if not os.path.isdir(i_dir): os.mkdir(i_dir) p_dir = os.path.join(i_dir, 'profiles') # Create profiles subfolder if it does not exist. if not os.path.isdir(p_dir): os.mkdir(p_dir) self._profiles_folders = [p_dir] self._users = ExtensionsCollector(workbench=self.workbench, point=USERS_POINT, ext_class=InstrUser, validate_ext=validate_user) self._users.start() self._starters = ExtensionsCollector(workbench=self.workbench, point=STARTERS_POINT, ext_class=Starter, validate_ext=validate_starter) self._starters.start() checker = make_extension_validator(Connection, ('new',), ('id', 'description')) self._connections = ExtensionsCollector(workbench=self.workbench, point=CONNECTIONS_POINT, ext_class=Connection, validate_ext=checker) self._connections.start() checker = make_extension_validator(Settings, ('new',), ('id', 'description')) self._settings = ExtensionsCollector(workbench=self.workbench, point=SETTINGS_POINT, ext_class=Settings, validate_ext=checker) self._settings.start() checker = make_extension_validator(ManufacturerAlias, (), ('id', 'aliases',)) self._aliases = ExtensionsCollector(workbench=self.workbench, point=ALIASES_POINT, ext_class=ManufacturerAlias, validate_ext=checker) self._aliases.start() self._drivers = DeclaratorsCollector(workbench=self.workbench, point=DRIVERS_POINT, ext_class=[Driver, Drivers]) self._drivers.start() for contrib in ('users', 'starters', 'connections', 'settings'): self._update_contribs(contrib, None) err = False details = {} for d_id, d_infos in self._drivers.contributions.items(): res, tb = d_infos.validate(self) if not res: err = True details[d_id] = tb if 
err: core.invoke_command('', {'kind': 'exopy.driver-validation', 'details': details}) # TODO providing in app a way to have a splash screen while starting to # let the user know what is going on would be nice # TODO handle dynamic addition of drivers by observing contributions # and updating the manufacturers infos accordingly. # should also observe manufacturer aliases self._refresh_profiles() self._bind_observers() core.invoke_command('')
[docs] def stop(self): """Stop the plugin and remove all observers. """ self._unbind_observers() for contrib in ('drivers', 'users', 'starters', 'connections', 'settings'): getattr(self, '_'+contrib).stop()
[docs] def create_connection(self, connection_id, infos, read_only=False): """Create a connection and initialize it. Parameters ---------- connection_id : unicode Id of the the connection to instantiate. infos : dict Dictionarry to use to initialize the state of the connection. read_only : bool Should the connection be created as read-only. Returns ------- connection : BaseConnection Ready to use widget. """ c_decl = self._connections.contributions[connection_id] conn =, infos, read_only) if conn.declaration is None: conn.declaration = c_decl return conn
[docs] def create_settings(self, settings_id, infos, read_only=False): """Create a settings and initialize it. Parameters ---------- settings_id : unicode Id of the the settings to instantiate. infos : dict Dictionary to use to initialize the state of the settings. read_only : bool Should the settings be created as read-only. Returns ------- connection : BaseSettings Ready to use widget. """ if settings_id is None: msg = 'No id was found for the settings whose infos are %s' logger.warning(msg, infos) return None s_decl = self._settings.contributions[settings_id] sett =, infos, read_only) if sett.declaration is None: sett.declaration = s_decl return sett
[docs] def get_drivers(self, drivers): """Query drivers class and the associated starters. Parameters ---------- drivers : list List of driver ids for which the matching class should be returned. Returns ------- drivers : dict Requested drivers and associated starter indexed by id. missing : list List of ids which do not correspond to any known valid driver. """ ds = self._drivers.contributions knowns = {d_id: ds[d_id] for d_id in drivers if d_id in ds} missing = list(set(drivers) - set(knowns)) return {d_id: (infos.cls, self._starters.contributions[infos.starter].starter) for d_id, infos in knowns.items()}, missing
[docs] def get_profiles(self, user_id, profiles, try_release=True, partial=False): """Query profiles for use by a declared user. Parameters ---------- user_id : unicode Id of the user which request the authorization to use the instrument. profiles : list Ids of the instrument profiles which are requested. try_release : bool, optional Should we attempt to release currently used profiles. partial : bool, optional Should only a subset of the requested profiles be returned if some profiles are not available. Returns ------- profiles : dict Requested profiles as a dictionary. unavailable : list List of profiles that are not currently available and cannot be released. """ if user_id not in self.users: raise ValueError('Unknown instrument user tried to query profiles') used = [p for p in profiles if p in self.used_profiles] unavailable = [] if used: released = [] if not try_release: unavailable = used else: used_by_owner = defaultdict(set) for p in used: used_by_owner[self.used_profiles[p]].add(p) for o in list(used_by_owner): user = self._users.contributions[o] if user.policy == 'releasable': to_release = used_by_owner[o] r = user.release_profiles(self.workbench, to_release) unavailable.extend(set(to_release) - set(r)) released.extend(r) else: unavailable.extend(used_by_owner[o]) if unavailable and not partial: if released: used = {k: v for k, v in self.used_profiles.items() if k not in released} self.used_profiles = used return {}, unavailable available = ([p for p in profiles if p not in unavailable] if unavailable else profiles) with self.suppress_notifications(): u = self.used_profiles self.used_profiles = {} u.update({p: user_id for p in available}) self.used_profiles = u queried = {} for p in available: queried[p] = self._profiles[p]._config.dict() return queried, unavailable
[docs] def release_profiles(self, user_id, profiles): """Release some previously acquired profiles. The user should not maintain any communication with the instruments whose profiles have been released after calling this method. Parameters ---------- user_id : unicode Id of the user releasing the profiles. profiles : iterable Profiles (ids) which are no longer needed by the user. """ self.used_profiles = {k: v for k, v in self.used_profiles.items() if k not in profiles or v != user_id}
[docs] def get_aliases(self, manufacturer): """List the known aliases of a manufacturer. Parameters ---------- manufacturer : str Name of the manufacturer for which to return the aliases. Returns ------- aliases : list[unicode] Known aliases of the manufacturer. """ aliases = self._aliases.contributions.get(manufacturer, []) if aliases: aliases = aliases.aliases return aliases
# ========================================================================= # --- Private API --------------------------------------------------------- # ========================================================================= #: Collector of drivers. _drivers = Typed(DeclaratorsCollector) #: Collector for the manufacturer aliases. _aliases = Typed(ExtensionsCollector) #: Declared manufacturers storing the corresponding model infos. _manufacturers = Typed(ManufacturersHolder) #: Collector of users. _users = Typed(ExtensionsCollector) #: Collector of starters. _starters = Typed(ExtensionsCollector) #: Collector of connections. _connections = Typed(ExtensionsCollector) #: Collector of settings. _settings = Typed(ExtensionsCollector) #: List of folders in which to search for profiles. # TODO make that list editable and part of the preferences _profiles_folders = List() #: Mapping of profile name to profile infos. _profiles = Dict() #: Watchdog observer tracking changes to the profiles folders. _observer = Typed(Observer) def _update_contribs(self, name, change): """Update the list of available contributions (editors, engines, tools) when they change. """ setattr(self, name, list(getattr(self, '_'+name).contributions)) if name == 'starters': for id_, s in getattr(self, '_'+name).contributions.items(): = id_ def _refresh_profiles(self): """List of profiles living in the profiles folders. 
""" profiles = {} logger = logging.getLogger(__name__) for path in self._profiles_folders: if os.path.isdir(path): filenames = sorted(f for f in os.listdir(path) if f.endswith('.instr.ini') and (os.path.isfile(os.path.join(path, f)))) for filename in filenames: profile_path = os.path.join(path, filename) # Beware redundant names are overwritten name = filename[:-len('.instr.ini')] # TODO should be delayed and lead to a nicer report i = ProfileInfos(path=profile_path, plugin=self) res, msg = validate_profile_infos(i) if res: profiles[name] = i else: logger.warning(msg) else: logger.warning('{} is not a valid directory'.format(path)) self._profiles = profiles def _bind_observers(self): """Start the observers. """ for contrib in ('users', 'starters', 'connections', 'settings'): callback = partial(self._update_contribs, contrib) getattr(self, '_'+contrib).observe('contributions', callback) def update(): """Run the handler on the main thread to avoid GUI issues. """ deferred_call(self._refresh_profiles) self._observer = Observer() for folder in self._profiles_folders: handler = SystematicFileUpdater(update) self._observer.schedule(handler, folder, recursive=True) self._observer.start() def _unbind_observers(self): """Stop the observers. """ for contrib in ('users', 'starters', 'connections', 'settings'): callback = partial(self._update_contribs, contrib) getattr(self, '_'+contrib).observe('contributions', callback) self._observer.unschedule_all() self._observer.stop() try: self._observer.join() except RuntimeError: pass def _post_setattr__profiles(self, old, new): """Automatically update the profiles member. """ self.profiles = sorted(new) def _default__manufacturers(self): """Delayed till this is first needed. """ holder = ManufacturersHolder(plugin=self) valid_drivers = [d for d in self._drivers.contributions.values()] holder.update_manufacturers(valid_drivers) return holder