Source code for exopy.tasks.tasks.util.formula_task
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Task for defining mathematical formulas.
"""
from collections import OrderedDict
from atom.api import (Typed, set_default)
from ....utils.traceback import format_exc
from ....utils.atom_util import (ordered_dict_from_pref, ordered_dict_to_pref)
from ..base_tasks import SimpleTask
[docs]class FormulaTask(SimpleTask):
"""Compute values according to formulas.
Any valid python expression can be evaluated; any valid key entry of the
database will be replaced by it's value.
"""
#: List of formulas.
#: To modify it (add/remove entry) the dictionary must be copied, modified
#: and then reassigned.
formulas = Typed(OrderedDict, ()).tag(pref=[ordered_dict_to_pref,
ordered_dict_from_pref])
wait = set_default({'activated': True}) # Wait on all pools by default.
[docs] def perform(self):
"""Evaluate alll formulas and update the database.
"""
for k, v in self.formulas.items():
value = self.format_and_eval_string(v)
self.write_in_database(k, value)
[docs] def check(self, *args, **kwargs):
"""Validate that all formulas can be evaluated.
"""
traceback = {}
test = True
err_path = self.get_error_path()
for k, v in self.formulas.items():
try:
value = self.format_and_eval_string(v)
self.write_in_database(k, value)
except Exception:
test = False
name = err_path + '-' + k
traceback[name] =\
"Failed to eval the formula {}: {}".format(k, format_exc())
return test, traceback
def _post_setattr_formulas(self, old, new):
"""Observer keeping the database entries in sync with the declared
formulas.
"""
self.database_entries = {key: 1.0 for key in new}