Source code for exopy.tasks.tasks.util.sleep_task

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright 2015-2018 by Exopy Authors, see AUTHORS for more details.
#
# Distributed under the terms of the BSD license.
#
# The full license is in the file LICENCE, distributed with this software.
# -----------------------------------------------------------------------------
"""Task that makes the system wait on all multithreading pools
for a set amount of time.

"""
import numbers
from atom.api import (Str, set_default)
from time import sleep

from ..base_tasks import SimpleTask
from ..validators import Feval


[docs]class SleepTask(SimpleTask): """Simply sleeps for the specified amount of time. Wait for any parallel operation before execution by default. """ # Time to wait time = Str().tag(pref=True, feval=Feval(types=numbers.Real)) wait = set_default({'': True}) database_entries = set_default({'time': 1})
[docs] def perform(self): t = self.format_and_eval_string(self.time) self.write_in_database('time', t) sleep(t)
[docs] def check(self, *args, **kwargs): """ Check if time > 0 """ test, tb = super(SleepTask, self).check(*args, **kwargs) if test and self.format_and_eval_string(self.time) < 0: tb[self.get_error_path()] = 'Sleep time must be positive.' test = False return test, tb