Source code for exopy.measurement.editors.database_access_editor.editor_model

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Model driving the database exception editor.

"""
from atom.api import Atom, Typed, List, Dict

from ....tasks.api import RootTask, ComplexTask
from ....tasks.tasks.database import DatabaseNode
from ....utils.container_change import ContainerChange
from ....utils.atom_util import tagged_members


class NodeModel(Atom):
    """State of the database node associated with a ComplexTask.

    """
    #: Reference to the task this node refers to.
    task = Typed(ComplexTask)

    #: Database entries available on the node associated with the task.
    entries = List()

    #: Database exceptions present on the node.
    exceptions = List()

    #: Database entries for which an access exception exists
    has_exceptions = List()

    #: Children nodes
    children = List()

    def sort_nodes(self, nodes=None):
        """Order the children nodes like the tasks they are linked to.

        Parameters
        ----------
        nodes : list, optional
            Nodes replacing the current children. When omitted the existing
            children are simply re-ordered.

        """
        if nodes is None:
            nodes = self.children
        ordered_tasks = [child for child in self.task.gather_children()
                         if isinstance(child, ComplexTask)]

        def task_position(node):
            # Position of the node's task among the ComplexTask children.
            return ordered_tasks.index(node.task)

        self.children = sorted(nodes, key=task_position)

    def add_node(self, node):
        """Insert a node among the children, keeping the task order.

        """
        candidates = list(self.children)
        candidates.append(node)
        self.sort_nodes(candidates)

    def remove_node(self, node):
        """Drop a node from the children.

        Children whose task is no longer a child of the task of this node
        are dropped too (this happens when discarding a ComplexTask that
        itself contains other tasks).

        """
        live_tasks = [child for child in self.task.gather_children()
                      if isinstance(child, ComplexTask)]
        kept = []
        for child_node in self.children:
            if child_node is node or child_node.task not in live_tasks:
                continue
            kept.append(child_node)
        self.sort_nodes(kept)

    def add_exception(self, entry):
        """Add an access exception for an entry if none exists yet.

        Parameters
        ----------
        entry : unicode
            Full name of the entry, as it appears on the node.

        """
        owner, short_name = self._find_task_from_entry(entry)
        if short_name in owner.access_exs:
            return
        owner.add_access_exception(short_name, 1)

    # =========================================================================
    # --- Private API ---------------------------------------------------------
    # =========================================================================

    def _post_setattr_task(self, old, new):
        """Move the children observers from the old task to the new one.

        """
        if new:
            for tag in ('child', 'child_notifier'):
                for member in tagged_members(new, tag):
                    new.observe(member, self._react_to_task_children_event)

        if old:
            for tag in ('child', 'child_notifier'):
                for member in tagged_members(old, tag):
                    old.unobserve(member, self._react_to_task_children_event)

    def _react_to_task_children_event(self, change):
        """Re-order the nodes when children tasks have been moved.

        Only ContainerChange notifications are considered; collapsed changes
        are processed one by one.

        """
        if not isinstance(change, ContainerChange):
            return

        if change.collapsed:
            for sub_change in change.collapsed:
                self._react_to_task_children_event(sub_change)

        if change.moved:
            self.sort_nodes()

    def _find_task_from_entry(self, full_entry):
        """Split a full entry name into its owning task and its short name.

        Parameters
        ----------
        full_entry : unicode
            Entry name prefixed by the name of the task it belongs to.

        Returns
        -------
        task, entry
            The child task whose name prefixes the entry and the entry name
            stripped of that prefix and of the one-character separator.

        """
        candidates = [child for child in self.task.gather_children()
                      if full_entry.startswith(child.name)]

        if len(candidates) <= 1:
            owner = candidates[0]
            return owner, full_entry[len(owner.name)+1:]

        # Several task names prefix the entry: keep the first one that
        # actually declares the resulting short name.
        # NOTE(review): if none declares it, the last candidate is returned
        # as is - confirm this fallback is intended.
        for owner in candidates:
            short_name = full_entry[len(owner.name)+1:]
            if short_name in owner.database_entries:
                break

        return owner, short_name
class EditorModel(Atom):
    """Model driving the database access editor.

    """
    #: Reference to the root task of the currently edited task hierarchy.
    root = Typed(RootTask)

    #: Dictionary storing the nodes for all tasks by path.
    nodes = Dict()

    def increase_exc_level(self, path, entry):
        """Increase the exception level of an access exception.

        Parameters
        ----------
        path : unicode
            Path of the node in which the exception to increase is.

        entry : unicode
            Entry whose access exception should be increased.

        """
        self._modify_exception_level(path, entry, 1)

    def decrease_exc_level(self, path, entry):
        """Decrease the exception level of an access exception.

        Parameters
        ----------
        path : unicode
            Path of the node in which the exception to decrease is.

        entry : unicode
            Entry whose access exception should be decreased.

        """
        self._modify_exception_level(path, entry, -1)

    # =========================================================================
    # --- Private API ---------------------------------------------------------
    # =========================================================================

    def _modify_exception_level(self, path, entry, val):
        """Modify the exception level of an access exception.

        Parameters
        ----------
        path : unicode
            Path of the node in which the exception to modify is.

        entry : unicode
            Entry whose access exception should be modified.

        val : int
            Amount by which to modify the level.

        """
        database_node = self.root.database.go_to_path(path)
        # The access metadata map the entry to the path, relative to this
        # node, of the node it originates from.
        real_path = path + '/' + database_node.meta['access'][entry]
        task, entry = self.nodes[real_path]._find_task_from_entry(entry)
        level = task.access_exs[entry]
        task.modify_access_exception(entry, level + val)

    def _post_setattr_root(self, old, new):
        """Ensure we are observing the right database.

        The node models are rebuilt from the database of the new root.

        """
        if old:
            old.database.unobserve('notifier', self._react_to_entries)
            old.database.unobserve('access_notifier',
                                   self._react_to_exceptions)
            old.database.unobserve('nodes_notifier', self._react_to_nodes)

        if new:
            new.database.observe('notifier', self._react_to_entries)
            new.database.observe('access_notifier',
                                 self._react_to_exceptions)
            new.database.observe('nodes_notifier', self._react_to_nodes)

            database_nodes = new.database.list_nodes()
            nodes = {p: self._model_from_node(p, n)
                     for p, n in database_nodes.items()}

            # Write the has_exceptions member now that all nodes exist.
            for p, n in nodes.items():
                if n.exceptions:
                    access = database_nodes[p].meta['access']
                    for k, v in access.items():
                        nodes[p + '/' + v].has_exceptions.append(k)

            # Set the proper parent for each node knowing there is only one
            # root node whose path contains no '/'. All others contain at
            # least one '/', and stripping the last part of the path gives
            # the path of the parent.
            for p, m in nodes.items():
                if '/' in p:
                    parent_path, _ = p.rsplit('/', 1)
                    nodes[parent_path].children.append(m)

            # Sort nodes now that they all have a parent
            for nmodel in nodes.values():
                nmodel.sort_nodes()

            self.nodes = nodes

    def _react_to_entries(self, news):
        """Handle modification to entries.

        Parameters
        ----------
        news : tuple or list of tuple
            Notification whose first item is the action ('added', 'renamed'
            or 'removed') and second item the full path of the entry. For a
            renaming the third item is the new full path of the entry.

        """
        if isinstance(news, list):
            for n in news:
                self._react_to_entries(n)
            return

        path, entry = news[1].rsplit('/', 1)
        n = self.nodes[path]
        # Always assign a new list so that observers of the member are
        # notified.
        if news[0] == 'added':
            n.entries = n.entries[:] + [entry]

        elif news[0] == 'renamed':
            entries = n.entries[:]
            entries.remove(entry)
            entries.append(news[2].rsplit('/', 1)[1])
            n.entries = entries

        elif news[0] == 'removed':
            entries = n.entries[:]
            entries.remove(entry)
            n.entries = entries

    def _react_to_exceptions(self, news):
        """Handle modifications to the access exceptions.

        Parameters
        ----------
        news : tuple or list of tuple
            Notification whose items are: the action ('added', 'renamed' or
            'removed'), the path of the node holding the exception, the
            path of the node the entry originates from relative to the
            former, the entry name and, for a renaming, the new entry name.

        """
        if isinstance(news, list):
            for n in news:
                self._react_to_exceptions(n)
            return

        path = news[1]
        n = self.nodes[path]
        # When no relative path is provided the node itself is used.
        origin_node = self.nodes[path + '/' + news[2] if news[2] else path]
        if news[0] == 'added':
            n.exceptions = n.exceptions[:] + [news[3]]
            origin_node.has_exceptions = (origin_node.has_exceptions[:] +
                                          [news[3]])

        elif news[0] == 'renamed':
            exceptions = n.exceptions[:]
            exceptions.remove(news[3])
            exceptions.append(news[4])
            n.exceptions = exceptions
            exs = origin_node.has_exceptions[:]
            exs.remove(news[3])
            exs.append(news[4])
            origin_node.has_exceptions = exs

        elif news[0] == 'removed':
            exceptions = n.exceptions[:]
            if news[3]:
                exceptions.remove(news[3])
                n.exceptions = exceptions
                exs = origin_node.has_exceptions[:]
                exs.remove(news[3])
                origin_node.has_exceptions = exs
            else:
                # No entry specified: all the exceptions are discarded.
                n.exceptions = []
                origin_node.has_exceptions = []

    def _react_to_nodes(self, news):
        """Handle modifications of the database nodes.

        Parameters
        ----------
        news : tuple or list of tuple
            Notification whose items are: the action ('added', 'renamed' or
            'removed'), the path of the parent node, the name of the node
            and, depending on the action, the added node or the new name of
            the node.

        """
        if isinstance(news, list):
            for n in news:
                self._react_to_nodes(n)
            return

        path = news[1] + '/' + news[2]
        if news[0] == 'added':
            parent = self.nodes[news[1]]
            model = self._model_from_node(path, news[3])
            parent.add_node(model)
            self.nodes[path] = model

        elif news[0] == 'renamed':
            new_path = news[1] + '/' + news[3]
            # Only the node itself and its descendants must be moved: a bare
            # prefix test would also catch siblings whose name merely starts
            # with the name of the renamed node ('root/a' vs 'root/ab').
            descendants_prefix = path + '/'
            # Iterate on a snapshot as the mapping is modified in the loop.
            for k, v in list(self.nodes.items()):
                if k == path or k.startswith(descendants_prefix):
                    del self.nodes[k]
                    self.nodes[new_path + k[len(path):]] = v

        elif news[0] == 'removed':
            node = self.nodes[path]
            del self.nodes[path]
            parent = self.nodes[path.rsplit('/', 1)[0]]
            parent.remove_node(node)

    def _get_task(self, path):
        """Retrieve the task corresponding to a certain path.

        Parameters
        ----------
        path : unicode
            '/' separated path of the node, the first part designating the
            root.

        Returns
        -------
        task
            The root task if the path contains no '/', otherwise the task
            found by following the names making up the path.

        Raises
        ------
        ValueError
            If no child task matches one of the names of the path.

        """
        if '/' not in path:
            return self.root

        task = self.root
        for name in path.split('/')[1:]:
            for child in task.gather_children():
                if child.name == name:
                    task = child
                    break
            else:
                raise ValueError('No task matching the specified path')

        return task

    def _model_from_node(self, path, node):
        """Build a new model from a node informations.

        Parameters
        ----------
        path : unicode
            Path of the node, used to retrieve the matching task.

        node : DatabaseNode
            Database node from which to extract the entries (values that
            are not themselves nodes) and the access exceptions.

        """
        entries = [k for k, v in node.data.items()
                   if not isinstance(v, DatabaseNode)]
        excs = list(node.meta.get('access', {}))
        return NodeModel(entries=entries, exceptions=excs,
                         task=self._get_task(path))